diff --git a/src/core/rpc/rpc_server.cpp b/src/core/rpc/rpc_server.cpp index 0a3b046ec..9f156fc22 100644 --- a/src/core/rpc/rpc_server.cpp +++ b/src/core/rpc/rpc_server.cpp @@ -106,10 +106,8 @@ void RPCServer::HandleRequestsLoop() { LOG_INFO(RPC_Server, "Request handler started."); - while (true) { - std::unique_lock lock(request_queue_mutex); - request_queue_cv.wait(lock, [&] { return !running || request_queue.Pop(request_packet); }); - if (!running) { + while (request_queue.PopWait(request_packet)) { + if (!request_packet) { break; } HandleSingleRequest(std::move(request_packet)); @@ -117,23 +115,18 @@ void RPCServer::HandleRequestsLoop() { } void RPCServer::QueueRequest(std::unique_ptr request) { - std::unique_lock lock(request_queue_mutex); request_queue.Push(std::move(request)); - request_queue_cv.notify_one(); } void RPCServer::Start() { - running = true; const auto threadFunction = [this]() { HandleRequestsLoop(); }; request_handler_thread = std::thread(threadFunction); server.Start(); } void RPCServer::Stop() { - running = false; - request_queue_cv.notify_one(); - request_handler_thread.join(); server.Stop(); + request_handler_thread.join(); } }; // namespace RPC diff --git a/src/core/rpc/rpc_server.h b/src/core/rpc/rpc_server.h index 62fdb739c..bb57bcdae 100644 --- a/src/core/rpc/rpc_server.h +++ b/src/core/rpc/rpc_server.h @@ -31,10 +31,7 @@ private: Server server; Common::SPSCQueue> request_queue; - bool running = false; std::thread request_handler_thread; - std::mutex request_queue_mutex; - std::condition_variable request_queue_cv; }; } // namespace RPC diff --git a/src/core/rpc/server.cpp b/src/core/rpc/server.cpp index 950881e9b..0ba052017 100644 --- a/src/core/rpc/server.cpp +++ b/src/core/rpc/server.cpp @@ -1,6 +1,5 @@ #include -#include "common/threadsafe_queue.h" #include "core/core.h" #include "core/rpc/rpc_server.h" #include "core/rpc/server.h" diff --git a/src/core/rpc/zmq_server.cpp b/src/core/rpc/zmq_server.cpp index 4825108d7..47885973c 100644 --- a/src/core/rpc/zmq_server.cpp +++ b/src/core/rpc/zmq_server.cpp @@ -52,7 +52,8 @@ void ZMQServer::WorkerLoop() { LOG_WARNING(RPC_Server, "Failed to receive data on ZeroMQ socket"); } } - + std::unique_ptr end_packet = nullptr; + new_request_callback(std::move(end_packet)); // Destroying the socket must be done by this thread. zmq_socket.reset(); }