diff --git a/src/common/logging/backend.cpp b/src/common/logging/backend.cpp index 9c3bb6b49..2915efd7b 100644 --- a/src/common/logging/backend.cpp +++ b/src/common/logging/backend.cpp @@ -81,7 +81,8 @@ private: backend->Write(e); } }; - while (message_queue.PopWait(entry)) { + while (true) { + entry = message_queue.PopWait(); if (entry.final_entry) { break; } diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index 2a3d26577..68955d66d 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -42,7 +42,7 @@ public: template void Push(Arg&& t) { // create the element, add it to the queue - write_ptr->current = std::move(t); + write_ptr->current = std::forward(t); // set the next pointer to a new element ptr // then advance the write pointer ElementPtr* new_ptr = new ElementPtr(); @@ -68,11 +68,10 @@ public: if (Empty()) return false; - ElementPtr* tmpptr = read_ptr; - if (NeedSize) size--; + ElementPtr* tmpptr = read_ptr; read_ptr = tmpptr->next.load(std::memory_order_acquire); t = std::move(tmpptr->current); tmpptr->next.store(nullptr); @@ -80,12 +79,14 @@ public: return true; } - bool PopWait(T& t) { + T PopWait() { if (Empty()) { std::unique_lock lock(cv_mutex); cv.wait(lock, [this]() { return !Empty(); }); } - return Pop(t); + T t; + Pop(t); + return t; } // not thread-safe @@ -151,8 +152,8 @@ public: return spsc_queue.Pop(t); } - bool PopWait(T& t) { - return spsc_queue.PopWait(t); + T PopWait() { + return spsc_queue.PopWait(); } // not thread-safe diff --git a/src/core/rpc/rpc_server.cpp b/src/core/rpc/rpc_server.cpp index 9f156fc22..819880196 100644 --- a/src/core/rpc/rpc_server.cpp +++ b/src/core/rpc/rpc_server.cpp @@ -106,10 +106,7 @@ void RPCServer::HandleRequestsLoop() { LOG_INFO(RPC_Server, "Request handler started."); - while (request_queue.PopWait(request_packet)) { - if (!request_packet) { - break; - } + while ((request_packet = request_queue.PopWait())) { HandleSingleRequest(std::move(request_packet)); } }