diff --git a/repos/os/include/os/packet_stream.h b/repos/os/include/os/packet_stream.h index c34e78ddaa..f459f9a6a2 100644 --- a/repos/os/include/os/packet_stream.h +++ b/repos/os/include/os/packet_stream.h @@ -281,6 +281,7 @@ class Genode::Packet_descriptor_transmitter Genode::Lock _tx_queue_lock { }; TX_QUEUE *_tx_queue; + bool _tx_wakeup_needed = false; /* * Noncopyable @@ -349,6 +350,36 @@ class Genode::Packet_descriptor_transmitter _rx_ready.submit(); } + bool try_tx(typename TX_QUEUE::Packet_descriptor packet) + { + Genode::Lock::Guard lock_guard(_tx_queue_lock); + + if (_tx_queue->full()) + return false; + + _tx_queue->add(packet); + + if (_tx_queue->single_element()) + _tx_wakeup_needed = true; + + return true; + } + + bool tx_wakeup() + { + Genode::Lock::Guard lock_guard(_tx_queue_lock); + + bool signal_submitted = false; + + if (_tx_wakeup_needed) { + _rx_ready.submit(); + signal_submitted = true; + } + + _tx_wakeup_needed = false; + return signal_submitted; + } + /** * Return number of slots left to be put into the tx queue */ @@ -376,6 +407,7 @@ class Genode::Packet_descriptor_receiver Genode::Lock mutable _rx_queue_lock { }; RX_QUEUE *_rx_queue; + bool _rx_wakeup_needed = false; /* * Noncopyable @@ -436,6 +468,37 @@ class Genode::Packet_descriptor_receiver _tx_ready.submit(); } + typename RX_QUEUE::Packet_descriptor try_rx() + { + Genode::Lock::Guard lock_guard(_rx_queue_lock); + + typename RX_QUEUE::Packet_descriptor packet { }; + + if (!_rx_queue->empty()) + packet = _rx_queue->get(); + + if (_rx_queue->single_slot_free()) + _rx_wakeup_needed = true; + + return packet; + } + + bool rx_wakeup() + { + Genode::Lock::Guard lock_guard(_rx_queue_lock); + + bool signal_submitted = false; + + if (_rx_wakeup_needed) { + _tx_ready.submit(); + signal_submitted = true; + } + + _rx_wakeup_needed = false; + + return signal_submitted; + } + typename RX_QUEUE::Packet_descriptor rx_peek() const { Genode::Lock::Guard lock_guard(_rx_queue_lock); @@ -856,6 +919,28 @@ class Genode::Packet_stream_sink : private Packet_stream_base return packet; } + /** + * Return next packet from source, or an invalid packet + * + * This method never blocks. + */ + Packet_descriptor try_get_packet() + { + return _submit_receiver.try_rx(); + } + + /** + * Wake up the packet source if needed + * + * This method assumes that the same signal handler is used for + * the submit receiver and the ack transmitter. + */ + void wakeup() + { + /* submit only one signal */ + _submit_receiver.rx_wakeup() || _ack_transmitter.tx_wakeup(); + } + /** * Return but do not dequeue next packet * @@ -896,6 +981,18 @@ class Genode::Packet_stream_sink : private Packet_stream_base _ack_transmitter.tx(packet); } + /** + * Acknowledge the specified packet to the client if possible + * + * \return false if the acknowledgement queue is congested + * + * This method never blocks. + */ + bool try_ack_packet(Packet_descriptor packet) + { + return _ack_transmitter.try_tx(packet); + } + void debug_print_buffers() { Packet_stream_base::_debug_print_buffers(); }