// Copyright 2010 Dolphin Emulator Project // Licensed under GPLv2+ // Refer to the license.txt file included. #pragma once // a simple lockless thread-safe, // single reader, single writer queue #include #include #include #include #include #include "common/common_types.h" namespace Common { template class SPSCQueue { public: SPSCQueue() : size(0) { write_ptr = read_ptr = new ElementPtr(); } ~SPSCQueue() { // this will empty out the whole queue delete read_ptr; } u32 Size() const { static_assert(NeedSize, "using Size() on FifoQueue without NeedSize"); return size.load(); } bool Empty() const { return !read_ptr->next.load(); } T& Front() const { return read_ptr->current; } /** * Push data to the queue. If NeedSize=True then Push will notify the waiting consumer thread */ template void Push(Arg&& t) { // create the element, add it to the queue write_ptr->current = std::forward(t); // set the next pointer to a new element ptr // then advance the write pointer ElementPtr* new_ptr = new ElementPtr(); write_ptr->next.store(new_ptr, std::memory_order_release); write_ptr = new_ptr; if (NeedSize) { std::lock_guard lock(size_lock); size++; size_cv.notify_one(); } } void Pop() { if (NeedSize) size--; ElementPtr* tmpptr = read_ptr; // advance the read pointer read_ptr = tmpptr->next.load(); // set the next element to nullptr to stop the recursive deletion tmpptr->next.store(nullptr); delete tmpptr; // this also deletes the element } bool Pop(T& t) { if (Empty()) return false; if (NeedSize) size--; ElementPtr* tmpptr = read_ptr; read_ptr = tmpptr->next.load(std::memory_order_acquire); t = std::move(tmpptr->current); tmpptr->next.store(nullptr); delete tmpptr; return true; } /** * Waits up to timeout for data to be Pushed to the queue. Push uses a condition variable to * signal the waiting thread, but only if NeedSize = true. Returns false if the timeout is * triggered. If the condition variable is signalled, returns the value from Pop * @param T In parameter to store the value if this method returns true * @param timeout Time in milliseconds to wait for a signal from a Push */ bool PopWait(T& t, u64 timeout = 500) { if (NeedSize) { std::unique_lock lock(size_lock); if (size_cv.wait_for(lock, std::chrono::milliseconds(timeout), [& size = size] { return size > 0; })) { return Pop(t); } return false; } return Pop(t); } // not thread-safe void Clear() { size.store(0); delete read_ptr; write_ptr = read_ptr = new ElementPtr(); } private: // stores a pointer to element // and a pointer to the next ElementPtr class ElementPtr { public: ElementPtr() : next(nullptr) {} ~ElementPtr() { ElementPtr* next_ptr = next.load(); if (next_ptr) delete next_ptr; } T current; std::atomic next; }; ElementPtr* write_ptr; ElementPtr* read_ptr; std::atomic size; std::mutex size_lock; std::condition_variable size_cv; }; // a simple thread-safe, // single reader, multiple writer queue template class MPSCQueue : public SPSCQueue { public: template void Push(Arg&& t) { std::lock_guard lock(write_lock); SPSCQueue::Push(t); } private: std::mutex write_lock; }; } // namespace Common