Added experimental lock-free MPMC queue as packet queue implementation.

This commit is contained in:
Michael Mueller
2025-01-21 15:20:57 +01:00
parent 7535440e79
commit 991dd5e6c5

View File

@@ -84,7 +84,10 @@ namespace Genode {
class Packet_descriptor; class Packet_descriptor;
template <typename, int> class Packet_descriptor_queue; template <typename, int>
class Mutex_free_packet_descriptor_queue;
template <typename, int>
class Packet_descriptor_queue;
template <typename> class Packet_descriptor_transmitter; template <typename> class Packet_descriptor_transmitter;
template <typename> class Packet_descriptor_receiver; template <typename> class Packet_descriptor_receiver;
@@ -148,6 +151,129 @@ class Genode::Packet_descriptor
Genode::size_t size() const { return _size; } Genode::size_t size() const { return _size; }
}; };
template <typename PACKET_DESCRIPTOR, int QUEUE_SIZE>
class Genode::Mutex_free_packet_descriptor_queue
{
private:
struct cell {
Genode::uint64_t idx;
PACKET_DESCRIPTOR content;
};
struct {
struct cell _queue[QUEUE_SIZE];
};
Genode::uint32_t _length{0};
alignas(64) Genode::uint64_t _head{0U};
alignas(64) Genode::uint64_t _tail{0U};
public:
typedef PACKET_DESCRIPTOR Packet_descriptor;
enum Role { PRODUCER, CONSUMER };
Mutex_free_packet_descriptor_queue(Role role)
{
Genode::log("Creating packet queue");
if (role == PRODUCER)
{
Genode::memset(_queue, 0, sizeof(_queue));
for (auto i = 0U; i < QUEUE_SIZE; ++i)
{
_queue[i].idx = i;
}
}
}
bool add(Packet_descriptor packet)
{
auto pos = __atomic_load_n(&_head, __ATOMIC_RELAXED);
Genode::uint64_t slot = 0;
for (;;) {
Genode::log("Searching for free slot");
slot = pos % QUEUE_SIZE;
const auto idx = __atomic_load_n(&_queue[slot].idx, __ATOMIC_ACQUIRE);
const auto diff = Genode::int64_t(idx) - Genode::int64_t(pos);
if (diff == 0) {
if (__atomic_compare_exchange_n(&_head, &pos, pos+1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
break;
} else if (diff < 0)
return false;
else
pos = __atomic_load_n(&_head, __ATOMIC_RELAXED);
}
_queue[slot].content = packet;
__atomic_store_n(&_queue[slot].idx, pos + 1, __ATOMIC_RELEASE);
return true;
}
bool try_get(Packet_descriptor &packet)
{
auto pos = __atomic_load_n(&_tail, __ATOMIC_RELAXED);
Genode::uint64_t slot = 0;
for (;;) {
slot = pos % QUEUE_SIZE;
const auto idx = __atomic_load_n(&_queue[slot].idx, __ATOMIC_ACQUIRE);
const auto diff = Genode::int64_t(idx) - Genode::int64_t(pos + 1);
if (diff == 0) {
if (__atomic_compare_exchange_n(&_tail, &pos, pos+1, true, __ATOMIC_RELAXED, __ATOMIC_RELAXED))
break;
} else if (diff < 0)
return false;
else
pos = __atomic_load_n(&_tail, __ATOMIC_RELAXED);
}
packet = _queue[slot].content;
__atomic_store_n(&_queue[slot].idx, pos + QUEUE_SIZE, __ATOMIC_RELEASE);
return true;
}
Packet_descriptor get()
{
Packet_descriptor pkt;
while (!try_get(pkt))
Genode::log("trying to get packet");
return pkt;
}
Packet_descriptor peek()
{
return _queue[__atomic_load_n(&_tail, __ATOMIC_ACQUIRE) % QUEUE_SIZE].content;
}
bool empty() {
return __atomic_load_n(&_head, __ATOMIC_RELAXED) == __atomic_load_n(&_tail, __ATOMIC_RELAXED);
}
bool full() { return (_head + 1)%QUEUE_SIZE == _tail; }
bool single_element() {
return (_tail + 1) % QUEUE_SIZE == _head;
}
/*
bool single_slot_free() {
auto pos = __atomic_load_n(&_head, __ATOMIC_RELAXED);
const auto idx = __atomic_load_n(&_queue[pos % QUEUE_SIZE].idx, __ATOMIC_ACQUIRE);
const auto diff = Genode::int64_t(idx) - Genode::int64_t(pos);
Genode::log("Got diff free slots: ", diff);
if (diff == 0)
return true;
else
return false;
}*/
bool single_slot_free() { return (_head + 2)%QUEUE_SIZE == _tail; }
unsigned slots_free() {
Genode::log("Checking free slots");
return static_cast<unsigned>(((_tail > _head) ? _tail - _head
: QUEUE_SIZE - _head + _tail) -
1);
}
};
/** /**
* Ring buffer shared between source and sink, containing packet descriptors * Ring buffer shared between source and sink, containing packet descriptors
@@ -327,7 +453,11 @@ class Genode::Packet_descriptor_transmitter
bool try_tx(typename TX_QUEUE::Packet_descriptor packet) bool try_tx(typename TX_QUEUE::Packet_descriptor packet)
{ {
Genode::Mutex::Guard mutex_guard(_tx_queue_mutex); {
//GENODE_LOG_TSC_NAMED(1, "try_tx: _tx_queue_mutex lock");
_tx_queue_mutex.acquire();
}
//Genode::Mutex::Guard mutex_guard(_tx_queue_mutex);
if (_tx_queue->full()) if (_tx_queue->full())
return false; return false;
@@ -337,12 +467,20 @@ class Genode::Packet_descriptor_transmitter
if (_tx_queue->single_element()) if (_tx_queue->single_element())
_tx_wakeup_needed = true; _tx_wakeup_needed = true;
{
//GENODE_LOG_TSC_NAMED(1, "try_tx: …tx_queue_mutex release");
_tx_queue_mutex.release();
}
return true; return true;
} }
bool tx_wakeup() bool tx_wakeup()
{ {
Genode::Mutex::Guard mutex_guard(_tx_queue_mutex); {
//GENODE_LOG_TSC_NAMED(1, "tx_wakeup: acquire");
_tx_queue_mutex.acquire();
}
//Genode::Mutex::Guard mutex_guard(_tx_queue_mutex);
bool signal_submitted = false; bool signal_submitted = false;
@@ -352,6 +490,10 @@ class Genode::Packet_descriptor_transmitter
} }
_tx_wakeup_needed = false; _tx_wakeup_needed = false;
{
//GENODE_LOG_TSC_NAMED(1, "tx_wakeup: release");
_tx_queue_mutex.release();
}
return signal_submitted; return signal_submitted;
} }