From df4316af2512ead61a6376ba875f244045249648 Mon Sep 17 00:00:00 2001 From: Michael Mueller Date: Wed, 23 Apr 2025 18:40:32 +0200 Subject: [PATCH] ealanos: Ported wait-free queue from MxTasking. --- .../ealanos/include/ealanos/util/mpsc_queue.h | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 repos/ealanos/include/ealanos/util/mpsc_queue.h diff --git a/repos/ealanos/include/ealanos/util/mpsc_queue.h b/repos/ealanos/include/ealanos/util/mpsc_queue.h new file mode 100644 index 0000000000..eee0d10f52 --- /dev/null +++ b/repos/ealanos/include/ealanos/util/mpsc_queue.h @@ -0,0 +1,124 @@ + + +#ifndef __EALANOS__INCLUDE__UTIL__MPSC_QUEUE_H_ +#define __EALANOS__INCLUDE__UTIL__MPSC_QUEUE_H_ + +namespace Ealan::util { +/** + * Multi producer, single consumer queue with unlimited slots. + * Every thread can push values into the queue without using latches. + * + * Inspired by http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue + */ +template class MPSCQueue +{ +public: + constexpr MPSCQueue() noexcept + : _head(reinterpret_cast(&_stub)), _tail(reinterpret_cast(&_stub)), + _end(reinterpret_cast(&_stub)) + { + reinterpret_cast(_stub).next(nullptr); + } + ~MPSCQueue() noexcept = default; + + /** + * Inserts the given item into the queue. + * @param item Item to insert. + */ + void push_back(T *item) noexcept + { + item->next(nullptr); + auto *prev = __atomic_exchange_n(&_head, item, __ATOMIC_RELAXED); + prev->next(item); + } + + /** + * Inserts all items between begin and end into the queue. + * Items must be linked among themselves. + * @param begin First item to insert. + * @param end Last item to insert. + */ + void push_back(T *begin, T *end) noexcept + { + end->next(nullptr); + auto *old_head = __atomic_exchange_n(&_head, end, __ATOMIC_RELAXED); + old_head->next(begin); + } + + /** + * @return End of the queue. + */ + [[nodiscard]] const T *end() const noexcept { return _end; } + + [[nodiscard]] T *head() noexcept { return _head; } + + /** + * @return True, when the queue is empty. + */ + [[nodiscard]] bool empty() const noexcept + { + return _tail == _end && reinterpret_cast(_stub).next() == nullptr; + } + + /** + * @return Takes and removes the first item from the queue. + */ + T *pop_front() noexcept; + +private: + // Head of the queue (accessed by every producer). + alignas(64) T *_head; + + // Tail of the queue (accessed by the consumer and producers if queue is empty)- + alignas(64) T *_tail; + + // Pointer to the end. + alignas(16) T *const _end; + + // Dummy item for empty queue. + alignas(64) char _stub[sizeof(T)]; +}; + +template T *MPSCQueue::pop_front() noexcept +{ + auto *tail = this->_tail; + auto *next = tail->next(); + + if (tail == this->_end) + { + if (next == nullptr) + { + return nullptr; + } + + this->_tail = next; + tail = next; + next = next->next(); + } + + if (next != nullptr) + { + this->_tail = next; + return tail; + } + + const auto *head = this->_head; + if (tail != head) + { + return nullptr; + } + + this->push_back(this->_end); + + next = tail->next(); + if (next != nullptr) + { + this->_tail = next; + return tail; + } + + return nullptr; +} +} // namespace Ealan::util + +#endif /* __EALANOS__INCLUDE__UTIL__MPSC_QUEUE_H_ */ \ No newline at end of file