From 2b0e64e06160f554e3ff18dba80f015a76d1721f Mon Sep 17 00:00:00 2001 From: Norman Feske Date: Tue, 25 Jan 2022 15:11:46 +0100 Subject: [PATCH] os: remove blocking semantics from packet stream Fixes #4390 --- repos/os/include/os/packet_stream.h | 143 ++++++------------ repos/os/include/packet_stream_rx/client.h | 3 - .../os/include/packet_stream_rx/rpc_object.h | 11 +- repos/os/include/packet_stream_tx/client.h | 2 - .../os/include/packet_stream_tx/rpc_object.h | 10 +- 5 files changed, 55 insertions(+), 114 deletions(-) diff --git a/repos/os/include/os/packet_stream.h b/repos/os/include/os/packet_stream.h index 2205144b03..bd9545de5b 100644 --- a/repos/os/include/os/packet_stream.h +++ b/repos/os/include/os/packet_stream.h @@ -37,36 +37,33 @@ * This protocol has four corner cases that are handled by signals: * * :submit queue is full: when the source is trying to submit a new packet. - * In this case, the source blocks and waits for the sink to remove packets + * In this case, the source needs to stall until the sink has removed packets * from the submit queue. If the sink observes such a condition (calling * 'get_packet' on a full submit queue, it delivers a 'ready_to_submit' * signal to wake up the source. * * :submit queue is empty: when the sink tries to obtain a packet via - * 'get_packet'. The sink is going to block. If the source places a + * 'get_packet'. In this case, the sink may go idle. If the source places a * packet into an empty submit queue, it delivers a 'packet_avail' * signal to wake up the sink. * * :acknowledgement queue is full: when the sink tries to acknowledge a packet - * using 'acknowledge_packet'. The sink is going to block until the source + * using 'acknowledge_packet'. The sink needs to stall until the source * removes an acknowledged packet from the acknowledgement queue and delivers * a 'ready_to_ack' signal. * * :acknowledgement queue is empty: when the source tries to obtain an * acknowledged packet using 'get_acked_packet'. In this case, the source - * will block until the sink places another acknowledged packet into the + * may go idle until the sink places another acknowledged packet into the * empty acknowledgement queue and delivers a 'ack_avail' signal. * - * These conditions can be avoided by querying the state of the submit and - * acknowledge buffers using the methods 'packet_avail', - * 'ready_to_submit', 'ready_to_ack', and 'ack_avail'. - * - * If bidirectional data exchange between two processes is desired, two pairs - * of 'Packet_stream_source' and 'Packet_stream_sink' should be instantiated. + * These conditions must be queried before interacting with the queues by + * using the methods 'packet_avail', 'ready_to_submit', 'ready_to_ack', and + * 'ack_avail'. */ /* - * Copyright (C) 2009-2017 Genode Labs GmbH + * Copyright (C) 2009-2022 Genode Labs GmbH * * This file is part of the Genode OS framework, which is distributed * under the terms of the GNU Affero General Public License version 3. @@ -271,13 +268,8 @@ class Genode::Packet_descriptor_transmitter { private: - /* facility to receive ready-to-transmit signals */ - Genode::Signal_receiver _tx_ready { }; - Genode::Signal_context _tx_ready_context { }; - Genode::Signal_context_capability _tx_ready_cap; - /* facility to send ready-to-receive signals */ - Genode::Signal_transmitter _rx_ready { }; + Genode::Signal_transmitter _rx_ready { }; Genode::Mutex _tx_queue_mutex { }; TX_QUEUE *_tx_queue; @@ -291,25 +283,16 @@ class Genode::Packet_descriptor_transmitter public: + class Saturated_tx_queue : Exception { }; + /** * Constructor */ Packet_descriptor_transmitter(TX_QUEUE *tx_queue) : - _tx_ready_cap(_tx_ready.manage(&_tx_ready_context)), _tx_queue(tx_queue) { } - ~Packet_descriptor_transmitter() - { - _tx_ready.dissolve(&_tx_ready_context); - } - - Genode::Signal_context_capability tx_ready_cap() - { - return _tx_ready_cap; - } - void register_rx_ready_cap(Genode::Signal_context_capability cap) { _rx_ready.context(cap); @@ -333,18 +316,10 @@ class Genode::Packet_descriptor_transmitter { Genode::Mutex::Guard mutex_guard(_tx_queue_mutex); - do { - /* block for signal if tx queue is full */ - if (_tx_queue->full()) - _tx_ready.wait_for_signal(); + if (_tx_queue->full()) + throw Saturated_tx_queue(); - /* - * It could happen that pending signals do not refer to the - * current queue situation. Therefore, we need to double check - * if the queue insertion succeeds and retry if needed. - */ - - } while (_tx_queue->add(packet) == false); + _tx_queue->add(packet); if (_tx_queue->single_element()) _rx_ready.submit(); @@ -397,13 +372,8 @@ class Genode::Packet_descriptor_receiver { private: - /* facility to receive ready-to-receive signals */ - Genode::Signal_receiver _rx_ready { }; - Genode::Signal_context _rx_ready_context { }; - Genode::Signal_context_capability _rx_ready_cap; - /* facility to send ready-to-transmit signals */ - Genode::Signal_transmitter _tx_ready { }; + Genode::Signal_transmitter _tx_ready { }; Genode::Mutex mutable _rx_queue_mutex { }; RX_QUEUE *_rx_queue; @@ -417,25 +387,16 @@ class Genode::Packet_descriptor_receiver public: + class Empty_rx_queue : Exception { }; + /** * Constructor */ Packet_descriptor_receiver(RX_QUEUE *rx_queue) : - _rx_ready_cap(_rx_ready.manage(&_rx_ready_context)), _rx_queue(rx_queue) { } - ~Packet_descriptor_receiver() - { - _rx_ready.dissolve(&_rx_ready_context); - } - - Genode::Signal_context_capability rx_ready_cap() - { - return _rx_ready_cap; - } - void register_tx_ready_cap(Genode::Signal_context_capability cap) { _tx_ready.context(cap); @@ -459,8 +420,8 @@ class Genode::Packet_descriptor_receiver { Genode::Mutex::Guard mutex_guard(_rx_queue_mutex); - while (_rx_queue->empty()) - _rx_ready.wait_for_signal(); + if (_rx_queue->empty()) + throw Empty_rx_queue(); *out_packet = _rx_queue->get(); @@ -674,7 +635,9 @@ class Genode::Packet_stream_source : private Packet_stream_base /** * Exception type */ - class Packet_alloc_failed { }; + class Packet_alloc_failed : Exception { }; + class Saturated_submit_queue : Exception { }; + class Empty_ack_queue : Exception { }; /** * Constructor @@ -740,24 +703,6 @@ class Genode::Packet_stream_source : private Packet_stream_base _ack_receiver.register_tx_ready_cap(cap); } - /** - * Return signal handler for handling signals indicating that new - * packets can be submitted. - */ - Genode::Signal_context_capability sigh_ready_to_submit() - { - return _submit_transmitter.tx_ready_cap(); - } - - /** - * Return signal handler for handling signals indicating that - * new acknowledgements are available. - */ - Genode::Signal_context_capability sigh_ack_avail() - { - return _ack_receiver.rx_ready_cap(); - } - /** * Allocate packet * @@ -803,6 +748,11 @@ class Genode::Packet_stream_source : private Packet_stream_base */ void submit_packet(Packet_descriptor packet) { + if (!ready_to_submit()) { + error("attempt to add packet into saturated submit queue"); + throw Saturated_submit_queue(); + } + _submit_transmitter.tx(packet); } @@ -840,6 +790,11 @@ class Genode::Packet_stream_source : private Packet_stream_base */ Packet_descriptor get_acked_packet() { + if (!ack_avail()) { + error("attempt to retrieve packet from empty acknowledgement queue"); + throw Empty_ack_queue(); + } + Packet_descriptor packet; _ack_receiver.rx(&packet); return packet; @@ -888,6 +843,12 @@ class Genode::Packet_stream_sink : private Packet_stream_base typedef typename POLICY::Packet_descriptor Packet_descriptor; typedef typename POLICY::Content_type Content_type; + /** + * Exception types + */ + class Saturated_ack_queue : Exception { }; + class Empty_submit_queue : Exception { }; + private: Packet_descriptor_receiver _submit_receiver; @@ -933,24 +894,6 @@ class Genode::Packet_stream_sink : private Packet_stream_base _submit_receiver.register_tx_ready_cap(cap); } - /** - * Return signal handler for handling signals indicating that - * new acknowledgements can be generated. - */ - Genode::Signal_context_capability sigh_ready_to_ack() - { - return _ack_transmitter.tx_ready_cap(); - } - - /** - * Return signal handler for handling signals indicating that - * new packets are available in the submit queue. - */ - Genode::Signal_context_capability sigh_packet_avail() - { - return _submit_receiver.rx_ready_cap(); - } - /** * Return true if a packet is available */ @@ -963,6 +906,11 @@ class Genode::Packet_stream_sink : private Packet_stream_base */ Packet_descriptor get_packet() { + if (!packet_avail()) { + error("attempt to retrieve packet from empty submit queue"); + throw Empty_submit_queue(); + } + Packet_descriptor packet; _submit_receiver.rx(&packet); return packet; @@ -1027,6 +975,11 @@ class Genode::Packet_stream_sink : private Packet_stream_base */ void acknowledge_packet(Packet_descriptor packet) { + if (!ack_slots_free()) { + error("attempt to add packet to saturated acknowledgement queue"); + throw Saturated_ack_queue(); + } + _ack_transmitter.tx(packet); } diff --git a/repos/os/include/packet_stream_rx/client.h b/repos/os/include/packet_stream_rx/client.h index 1d5c3134d1..390efd915a 100644 --- a/repos/os/include/packet_stream_rx/client.h +++ b/repos/os/include/packet_stream_rx/client.h @@ -51,9 +51,6 @@ class Packet_stream_rx::Client : public Genode::Rpc_client /* wire data-flow signals for the packet receiver */ _sink.register_sigh_ack_avail(Base::template call()); _sink.register_sigh_ready_to_submit(Base::template call()); - - sigh_ready_to_ack(_sink.sigh_ready_to_ack()); - sigh_packet_avail(_sink.sigh_packet_avail()); } void sigh_ready_to_ack(Genode::Signal_context_capability sigh) override { diff --git a/repos/os/include/packet_stream_rx/rpc_object.h b/repos/os/include/packet_stream_rx/rpc_object.h index e3b255a54a..0a0233f022 100644 --- a/repos/os/include/packet_stream_rx/rpc_object.h +++ b/repos/os/include/packet_stream_rx/rpc_object.h @@ -29,8 +29,8 @@ class Packet_stream_rx::Rpc_object : public Genode::Rpc_object _cap { }; typename CHANNEL::Source _source; - Genode::Signal_context_capability _sigh_ready_to_submit; - Genode::Signal_context_capability _sigh_ack_avail; + Genode::Signal_context_capability _sigh_ready_to_submit { }; + Genode::Signal_context_capability _sigh_ack_avail { }; public: @@ -48,11 +48,8 @@ class Packet_stream_rx::Rpc_object : public Genode::Rpc_object /* wire data-flow signals for the packet transmitter */ _source.register_sigh_packet_avail(Base::template call()); _source.register_sigh_ready_to_ack(Base::template call()); - sigh_ready_to_submit(_source.sigh_ready_to_submit()); - sigh_ack_avail(_source.sigh_ack_avail()); } void sigh_ready_to_submit(Genode::Signal_context_capability sigh) override { diff --git a/repos/os/include/packet_stream_tx/rpc_object.h b/repos/os/include/packet_stream_tx/rpc_object.h index 8c9abbf14b..f37354af77 100644 --- a/repos/os/include/packet_stream_tx/rpc_object.h +++ b/repos/os/include/packet_stream_tx/rpc_object.h @@ -29,8 +29,8 @@ class Packet_stream_tx::Rpc_object : public Genode::Rpc_object _cap { }; typename CHANNEL::Sink _sink; - Genode::Signal_context_capability _sigh_ready_to_ack; - Genode::Signal_context_capability _sigh_packet_avail; + Genode::Signal_context_capability _sigh_ready_to_ack { }; + Genode::Signal_context_capability _sigh_packet_avail { }; public: @@ -46,11 +46,7 @@ class Packet_stream_tx::Rpc_object : public Genode::Rpc_object