From 9fdc89a4568e340fd88889235473e1c5b7d34623 Mon Sep 17 00:00:00 2001 From: James Rowe Date: Fri, 23 Feb 2018 00:30:43 -0700 Subject: [PATCH] SPSCQueue: Add PopWait Adds a condition var to SPSCQueue so when a new log is pushed it will wake the consumer thread that is calling PopWait. This only applies to to queues with NeedSize=true --- src/common/threadsafe_queue.h | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/src/common/threadsafe_queue.h b/src/common/threadsafe_queue.h index a0c731e8c..1c676d202 100644 --- a/src/common/threadsafe_queue.h +++ b/src/common/threadsafe_queue.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include "common/common_types.h" @@ -36,6 +37,10 @@ public: T& Front() const { return read_ptr->current; } + + /** + * Push data to the queue. If NeedSize=True then Push will notify the waiting consumer thread + */ template void Push(Arg&& t) { // create the element, add it to the queue @@ -45,8 +50,11 @@ public: ElementPtr* new_ptr = new ElementPtr(); write_ptr->next.store(new_ptr, std::memory_order_release); write_ptr = new_ptr; - if (NeedSize) + if (NeedSize) { + std::lock_guard lock(size_lock); size++; + size_cv.notify_one(); + } } void Pop() { @@ -75,6 +83,25 @@ public: return true; } + /** + * Waits up to timeout for data to be Pushed to the queue. Push uses a condition variable to + * signal the waiting thread, but only if NeedSize = true. Returns false if the timeout is + * triggered. If the condition variable is signalled, returns the value from Pop + * @param T In parameter to store the value if this method returns true + * @param timeout Time in milliseconds to wait for a signal from a Push + */ + bool PopWait(T& t, u64 timeout = 500) { + if (NeedSize) { + std::unique_lock lock(size_lock); + if (size_cv.wait_for(lock, std::chrono::milliseconds(timeout), + [& size = size] { return size > 0; })) { + return Pop(t); + } + return false; + } + return Pop(t); + } + // not thread-safe void Clear() { size.store(0); @@ -102,6 +129,9 @@ private: ElementPtr* write_ptr; ElementPtr* read_ptr; std::atomic size; + + std::mutex size_lock; + std::condition_variable size_cv; }; // a simple thread-safe,