threadsafe_queue: Add std::stop_token overload to PopWait

Useful for jthreads which make use of the threadsafe queues.
This commit is contained in:
ameerj 2021-09-15 20:18:36 -04:00
parent 081ccc6441
commit c2ddda2f51

View file

@ -14,7 +14,7 @@
#include <utility> #include <utility>
namespace Common { namespace Common {
template <typename T> template <typename T, bool with_stop_token = false>
class SPSCQueue { class SPSCQueue {
public: public:
SPSCQueue() { SPSCQueue() {
@ -84,7 +84,7 @@ public:
void Wait() { void Wait() {
if (Empty()) { if (Empty()) {
std::unique_lock lock{cv_mutex}; std::unique_lock lock{cv_mutex};
cv.wait(lock, [this]() { return !Empty(); }); cv.wait(lock, [this] { return !Empty(); });
} }
} }
@ -95,6 +95,19 @@ public:
return t; return t;
} }
T PopWait(std::stop_token stop_token) {
if (Empty()) {
std::unique_lock lock{cv_mutex};
cv.wait(lock, stop_token, [this] { return !Empty(); });
}
if (stop_token.stop_requested()) {
return T{};
}
T t;
Pop(t);
return t;
}
// not thread-safe // not thread-safe
void Clear() { void Clear() {
size.store(0); size.store(0);
@ -123,13 +136,13 @@ private:
ElementPtr* read_ptr; ElementPtr* read_ptr;
std::atomic_size_t size{0}; std::atomic_size_t size{0};
std::mutex cv_mutex; std::mutex cv_mutex;
std::condition_variable cv; std::conditional_t<with_stop_token, std::condition_variable_any, std::condition_variable> cv;
}; };
// a simple thread-safe, // a simple thread-safe,
// single reader, multiple writer queue // single reader, multiple writer queue
template <typename T> template <typename T, bool with_stop_token = false>
class MPSCQueue { class MPSCQueue {
public: public:
[[nodiscard]] std::size_t Size() const { [[nodiscard]] std::size_t Size() const {
@ -166,13 +179,17 @@ public:
return spsc_queue.PopWait(); return spsc_queue.PopWait();
} }
T PopWait(std::stop_token stop_token) {
return spsc_queue.PopWait(stop_token);
}
// not thread-safe // not thread-safe
void Clear() { void Clear() {
spsc_queue.Clear(); spsc_queue.Clear();
} }
private: private:
SPSCQueue<T> spsc_queue; SPSCQueue<T, with_stop_token> spsc_queue;
std::mutex write_lock; std::mutex write_lock;
}; };
} // namespace Common } // namespace Common