From 2925ae7737c7ee7dab006d14af199c7e92564420 Mon Sep 17 00:00:00 2001 From: Michael Mueller Date: Mon, 28 Oct 2024 13:25:58 +0100 Subject: [PATCH] Use new allocator for determining victim queues. --- src/mx/tasking/channel.h | 8 ++- src/mx/tasking/scheduler.cpp | 18 +++--- src/mx/tasking/scheduler.h | 22 +++----- src/mx/tasking/worker.cpp | 73 +++++++++--------------- src/mx/tasking/worker.h | 105 ++++++++++++++++++++++------------- src/mx/util/field_alloc.h | 56 +++++++++++++++++++ 6 files changed, 169 insertions(+), 113 deletions(-) create mode 100644 src/mx/util/field_alloc.h diff --git a/src/mx/tasking/channel.h b/src/mx/tasking/channel.h index f4a9493..72addc1 100644 --- a/src/mx/tasking/channel.h +++ b/src/mx/tasking/channel.h @@ -39,6 +39,7 @@ public: } ~Channel() noexcept = default; + /** * @return Identifier of the channel. */ @@ -47,7 +48,10 @@ public: /** * @return The next task to be executed. */ - TaskInterface *next() noexcept { return _task_buffer.next(); } + TaskInterface *next_task() noexcept { return _task_buffer.next(); } + + Channel *next() noexcept { return _next; } + void next(Channel *c) { _next = c; } /** * Schedules the task to thread-safe queue with regard to the NUMA region @@ -177,6 +181,8 @@ private: // Is the channel currently processed by a worker alignas(64) std::atomic _phase{priority::normal}; + alignas(64) Channel *_next{nullptr}; + /** * Fills the task buffer with tasks scheduled with a given priority. * diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp index 47c7545..1a8cf51 100644 --- a/src/mx/tasking/scheduler.cpp +++ b/src/mx/tasking/scheduler.cpp @@ -20,13 +20,14 @@ std::uint64_t *volatile mx::tasking::runtime::_signal_page; Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t prefetch_distance, memory::dynamic::Allocator &resource_allocator, std::uint64_t * volatile signal_page) noexcept - : _core_set(core_set), _count_channels(core_set.size()), _vacant_channels(_count_channels), _worker({}), _channel_numa_node_map({0U}), + : _core_set(core_set), _count_channels(core_set.size()), _worker({}), _channel_numa_node_map({0U}), _epoch_manager(core_set.size(), resource_allocator, _is_running), _statistic(_count_channels) { this->_worker.fill(nullptr); this->_channel_numa_node_map.fill(0U); Genode::log("Initializing scheduler"); - for (auto worker_id = 0U; worker_id < mx::system::Environment::topo().global_affinity_space().total(); ++worker_id) + auto worker_count = system::Environment::topo().global_affinity_space().total(); + for (auto worker_id = 0U; worker_id < worker_count; ++worker_id) { const auto core_id = worker_id; this->_channel_numa_node_map[worker_id] = system::topology::node_id(core_id); @@ -34,13 +35,12 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre auto ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker)); this->_worker[worker_id] = new (ptr) - Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running, _vacant_channels, _stealing_limit, _remainder_channel_count, + Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running, _vacant_channels_alloc, _remainder_channel_count, prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(), this->_statistic); ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Channel)); this->_channels[worker_id] = new (ptr) Channel(worker_id, this->_channel_numa_node_map[worker_id], prefetch_distance); - _vacant_channels.push_back(_channels[worker_id]); Genode::log("Channel ", worker_id, " created at ", _channels[worker_id]); } } @@ -92,8 +92,8 @@ void Scheduler::start_and_wait() Genode::log("Worker started in ", (end - start), " cycles"); Genode::log("Creating foreman thread on CPU ", start_cpu); - - Channel *qf = _vacant_channels.pop_front(); + + Channel *qf = _channels[0]; _worker[0]->assign(qf); Libc::pthread_create_from_session(&worker_threads[0], Worker::entry, _worker[0], 4 * 4096, "foreman", @@ -111,7 +111,7 @@ void Scheduler::start_and_wait() Nova::core_allocation(allocation, false); Genode::log("Initial cores ", allocation); - this->allocate_cores(_core_set.size()-1); + this->allocate_cores(_count_channels); // ... and epoch management (if enabled). if constexpr (config::memory_reclamation() != config::None) @@ -265,8 +265,4 @@ void Scheduler::profile(const std::string &output_file) Genode::log("Profiling channel ", i, " at ", this->_channels[i]); this->_profiler.profile(this->_is_running, *(this->_channels[i])); } -} - -Channel* Scheduler::steal_for(Worker *thief) -{ } \ No newline at end of file diff --git a/src/mx/tasking/scheduler.h b/src/mx/tasking/scheduler.h index 61137e2..3152b75 100644 --- a/src/mx/tasking/scheduler.h +++ b/src/mx/tasking/scheduler.h @@ -16,8 +16,10 @@ #include #include #include +#include #include #include +#include namespace mx::tasking { /** @@ -57,7 +59,6 @@ public: { _is_running = false; //Genode::log("Stopping runtime"); - Genode::log("Waiting for ", _active_worker_count.load(), " workers to finish."); while (_active_worker_count > 1) system::builtin::pause(); Nova::mword_t pcpu = 0; @@ -82,7 +83,7 @@ public: Nova::core_allocation(allocation, false); //Genode::log("Allocation before resume ", allocation); - allocate_cores(_core_set.size()-1); + allocate_cores(_count_channels); _is_running = true; } @@ -93,14 +94,10 @@ public: return _worker_at_core[pcpu]; } + [[nodiscard]] inline Channel *get_channel(std::uint64_t index) { return _channels[index]; } + [[nodiscard]] inline std::uint16_t active_workers() const noexcept { return _active_worker_count; } - Channel *steal_for(Worker *thief); - - [[nodiscard]] inline void set_stealing_limit(std::uint16_t workers) { - _stealing_limit = std::ceil(static_cast(_vacant_channels.size()) / static_cast(workers)); - } - /** * @return Core set of this instance. */ @@ -232,9 +229,6 @@ public: } } - inline void add_vacant_channel(Channel *channel) { _vacant_channels.push_back(channel); - } - /** * Starts profiling of idle times and specifies the results file. * @param output_file File to write idle times after stopping MxTasking. @@ -251,9 +245,8 @@ public: Nova::mword_t remainder = 0; Nova::alloc_cores(cores, allocation, remainder); std::bitset allocated(allocation); - _remainder_channel_count.store(remainder); + _remainder_channel_count.store(allocated.count()); - //Genode::log("Allocated ", allocation, " with ", allocated.count(), " workers and ", remainder, " excess queues."); } private: @@ -277,9 +270,8 @@ private: alignas(64) std::array _channels{nullptr}; - alignas(64) util::BoundMPMCQueue _vacant_channels{config::max_cores()}; + alignas(64) mx::util::Field_Allocator _vacant_channels_alloc{63}; alignas(64) std::atomic _remainder_channel_count{0}; - alignas(64) util::maybe_atomic _stealing_limit{0}; // Map of channel id to NUMA region id. alignas(64) std::array _channel_numa_node_map{0U}; diff --git a/src/mx/tasking/worker.cpp b/src/mx/tasking/worker.cpp index db4095a..3390adc 100644 --- a/src/mx/tasking/worker.cpp +++ b/src/mx/tasking/worker.cpp @@ -20,42 +20,47 @@ using namespace mx::tasking; Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id, std::uint64_t* volatile tukija_sig, - const util::maybe_atomic &is_running, util::BoundMPMCQueue &v, util::maybe_atomic &s, std::atomic &e, const std::uint16_t prefetch_distance, + const util::maybe_atomic &is_running, util::Field_Allocator &v, std::atomic &e, const std::uint16_t prefetch_distance, memory::reclamation::LocalEpoch &local_epoch, const std::atomic &global_epoch, profiling::Statistic &statistic) noexcept : _target_core_id(target_core_id), _target_numa_node_id(target_numa_node_id), _tukija_signal(reinterpret_cast(tukija_sig)), _prefetch_distance(prefetch_distance), - _id(id), _local_epoch(local_epoch), _global_epoch(global_epoch), - _statistic(statistic), _is_running(is_running), _vacant_channels(v), _stealing_limit(s), _excess_queues(e) + _id(id), _rng(std::mt19937(id)), _local_epoch(local_epoch), _global_epoch(global_epoch), + _statistic(statistic), _is_running(is_running), _vacant_channels(v), _excess_queues(e) { } void Worker::yield_channels(std::uint16_t num, Channel *except) { Channel *chan; do { - chan = _channels.pop_front_or(nullptr); + chan = _channels.pop_front(); if (chan && chan != except) { //Genode::log("Worker ", _id, " returns channel ", chan->id()); - _vacant_channels.push_back(chan); + _vacant_channels.release(chan->id()); _count_channels--; } num--; } while (num && chan); if (current && current != except) { - _vacant_channels.push_back(current); + _vacant_channels.release(current->id()); //Genode::log("Worker ", _id, " returns channel ", current->id()); /* If yield_channels is called by the foreman, it is likely that this happenend due to a previous call to runtime::stop. In this case the foreman will release all channels but channel 0. However, it is possible that the foreman called runtime::stop while processing another channel than 0 which after this call will not belong to it anymore. Thus, to avoid having the foreman process a channel that has been yielded and risking nasty race conditions, the foreman takes the last remaining channel (0). */ if (_id == 0 && current->id() != 0) { - current = _channels.pop_front_or(nullptr); + current = _channels.pop_front(); assert(current != nullptr); } else current = nullptr; + _count_channels--; } - _count_channels--; } +Channel* Worker::get(std::uint64_t idx) +{ + return runtime::scheduler().get_channel(idx); +} + void Worker::execute() { /*{ @@ -73,11 +78,14 @@ void Worker::execute() if (_id != 0) { sleep(); } else if (_id == 0) /* foreman */ { + Genode::log("Foreman starting"); current = _channels.pop_front(); } _is_sleeping = false; + //Genode::log("Waiting for hooter"); wait_for_hooter(); + //Genode::log("Hooter sounded"); runtime::scheduler().register_worker(this); //Genode::log("Worker ", _id, "(", _phys_core_id, ")", @@ -86,22 +94,20 @@ void Worker::execute() TaskInterface *task; const auto core_id = system::topology::core_id(); - //assert(this->_target_core_id == core_id && "Worker not pinned to correct core."); - - auto phase = priority::normal; while (true) { handle_resume(); - - while (!current) { + //handle_channel_occupancy(); + while (!current) + { //Genode::log("Worker ",_id,": No queues for me."); std::uint16_t expect = 0; bool shall_yield = !__atomic_compare_exchange_n(&_my_page->yield_flag, &expect, 2, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED); if (shall_yield) { handle_yield(); - return; + continue; } deregister(); sleep(); @@ -116,42 +122,14 @@ void Worker::execute() auto channel_id = current->id(); - if (current->phase() != phase) { - if (phase == priority::normal) - phase = priority::low; - else /* phase == priority::low */ { - phase = priority::normal; - } - //Genode::log("Switched phase to ", (phase == priority::normal)? "normal" : "low"); - } - - if (phase == priority::normal) - current->fill(); - else { - current->fill(); - } - - if (phase == priority::normal && current->empty()) { - Genode::Trace::Timestamp thefts_start = Genode::Trace::timestamp(); - steal(false); - Genode::Trace::Timestamp theft_stop = Genode::Trace::timestamp(); - Genode::Trace::Timestamp cost = theft_stop - thefts_start; - _stealing_cost += cost; - if (cost > _max_cost) - _max_cost = cost; - if ((_min_cost == 0) || (_min_cost > cost)) - _min_cost = cost; - _thefts++; - //_channels.push_back(current); - // current = _channels.pop_front_or(nullptr); - } + current->fill(); if constexpr (config::task_statistics()) { this->_statistic.increment(channel_id); } - while ((task = this->current->next()) != nullptr) + while ((task = this->current->next_task()) != nullptr) { // Whenever the worker-local task-buffer falls under // the prefetch distance, we re-fill the buffer to avoid @@ -221,15 +199,18 @@ void Worker::execute() { runtime::delete_task(core_id, task); } + handle_yield(); + } + steal(false); + handle_yield(); handle_stop(); - current->switch_phase(); _channels.push_back(current); - current = _channels.pop_front_or(nullptr); + current = _channels.pop_front(); } } diff --git a/src/mx/tasking/worker.h b/src/mx/tasking/worker.h index 0e41564..df37743 100644 --- a/src/mx/tasking/worker.h +++ b/src/mx/tasking/worker.h @@ -15,6 +15,9 @@ #include #include #include +#include +#include +#include namespace mx::tasking { /** @@ -24,7 +27,7 @@ class alignas(64) Worker { public: Worker(std::uint16_t id, std::uint16_t target_core_id, std::uint16_t target_numa_node_id, std::uint64_t* volatile tukija_sig, - const util::maybe_atomic &is_running, util::BoundMPMCQueue &v, util::maybe_atomic &s, std::atomic &e, std::uint16_t prefetch_distance, + const util::maybe_atomic &is_running, util::Field_Allocator &v, std::atomic &e, std::uint16_t prefetch_distance, memory::reclamation::LocalEpoch &local_epoch, const std::atomic &global_epoch, profiling::Statistic &statistic) noexcept; @@ -65,6 +68,34 @@ public: [[nodiscard]] std::uint16_t number_of_channels() { return _count_channels; } + Channel *get(std::uint64_t idx); + + inline bool pick_channel(std::uint64_t offset, std::uint64_t limit, bool change_phase_to_normal = false) + { + //Genode::Trace::Timestamp deq_start = Genode::Trace::timestamp(); + std::uint64_t cidx = _vacant_channels.alloc_randomly( offset, limit); + //Genode::Trace::Timestamp deq_stop = Genode::Trace::timestamp(); + //_mean_dequeue_cost += deq_stop - deq_start; + //dequeues++; + + + if (cidx > 0 && cidx < 64) { + //Genode::log("Worker ", _id, "(", _phys_core_id, ") picked channel ", cidx); + Channel *loot = get(cidx); + + if (change_phase_to_normal) + loot->phase(priority::normal); + + //Genode::Trace::Timestamp enq_start = Genode::Trace::timestamp(); + assign(loot); + //Genode::Trace::Timestamp enq_stop = Genode::Trace::timestamp(); + //_mean_enqueue_cost += enq_stop - enq_start; + //enqueues++; + + return true; + } + return false; + } inline bool steal(bool init=true) { @@ -77,35 +108,11 @@ public: if (init) { while (individual_stealing_limit() > 0) { - Channel *loot = _vacant_channels.pop_front_or(nullptr); - if (loot) { - this->assign(loot); - //Genode::log("Worker ", _id, ": Stole channel ", loot->id()); - got_loot |= true; - } else /* no vacant queues anymore, so stop stealing */ + if ((got_loot |= pick_channel(_id, stealing_limit()))) break; } } else if (_excess_queues.load(std::memory_order_relaxed) <= 0) { - Genode::Trace::Timestamp deq_start = Genode::Trace::timestamp(); - Channel *loot = _vacant_channels.pop_front_or(nullptr); - Genode::Trace::Timestamp deq_stop = Genode::Trace::timestamp(); - _mean_dequeue_cost += deq_stop - deq_start; - dequeues++; - if (loot) - { - loot->phase(priority::normal); - Genode::Trace::Timestamp enq_start = Genode::Trace::timestamp(); - this->assign(loot); - Genode::Trace::Timestamp enq_stop = Genode::Trace::timestamp(); - _mean_enqueue_cost += enq_stop - enq_start; - enqueues++; - - got_loot |= true; - if (_vacant_channels.size() == 0) - _excess_queues.store(1); - // Genode::log("Worker ", _id, " Took remaining queue ", loot->id(), " with ", _vacant_channels.size(), - // " excess queues"); - } + got_loot |= pick_channel(1, 63, true); } return got_loot; @@ -194,12 +201,14 @@ private: alignas(64) TaskStack _task_stack; // Channel where tasks are stored for execution. - alignas(64) util::BoundMPMCQueue _channels{config::max_cores()}; + alignas(64) util::Queue _channels{}; alignas(64) struct InfoPage volatile *_my_page{nullptr}; alignas(64) Channel *current{nullptr}; + std::mt19937 _rng; + /** * Profiling data structures */ @@ -225,10 +234,10 @@ private: const util::maybe_atomic &_is_running; // Reference to queue of vacant channels - util::BoundMPMCQueue &_vacant_channels; + util::Field_Allocator &_vacant_channels; - // Limit for stealing - const util::maybe_atomic &_stealing_limit; + // Flag whether this worker may steal channels or not + bool _may_steal{true}; // Global number of excess queues alignas(64) std::atomic &_excess_queues; @@ -324,6 +333,7 @@ private: inline void handle_yield() { if (yield_signaled()) { + _may_steal = true; //Genode::log("Got yield signal ", _phys_core_id); yield_channels(_count_channels, nullptr); _excess_queues.fetch_sub(1); @@ -334,13 +344,14 @@ private: registrate(); wait_for_hooter(); _thefts = 0; + /* _stealing_cost = 0; _max_cost = 0; _min_cost = 0; _mean_enqueue_cost = 0; _mean_dequeue_cost = 0; enqueues = 1; - dequeues = 1; + dequeues = 1;*/ handle_resume(); } } @@ -348,14 +359,19 @@ private: inline void handle_stop() { if (!_is_running) { + _may_steal = true; std::uint16_t expect = 0; bool shall_yield = !__atomic_compare_exchange_n(&_my_page->yield_flag, &expect, 2, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED); if (shall_yield) { handle_yield(); return; } - Genode::log("Worker ", _id, ": thefts=", _thefts, " cost_total=", _stealing_cost, "avg cost_per_theft=", _stealing_cost/_thefts, " min cost/theft=", _min_cost, " max cost/theft=", _max_cost, " avg deq=", _mean_dequeue_cost, " avg enq=", _mean_enqueue_cost/enqueues, " #deqs=", dequeues, " #enqs=", enqueues); - // Genode::log("Worker ", _id, " woke up again"); + //Genode::log("Worker ", _id, ": thefs=", _thefts); + // Genode::log("Worker ", _id, ": thefts=", _thefts, " cost_total=", _stealing_cost, "avg cost_per_theft=", + // _stealing_cost/_thefts, " min cost/theft=", _min_cost, " max cost/theft=", _max_cost, " avg deq=", + // _mean_dequeue_cost/dequeues, " avg enq=", _mean_enqueue_cost/enqueues, " #deqs=", dequeues, " #enqs=", + // enqueues); + // Genode::log("Worker ", _id, " woke up again"); yield_channels(_count_channels, nullptr); deregister(); sleep(); @@ -371,13 +387,13 @@ private: wait_for_hooter(); _thefts = 0; - _stealing_cost = 0; + /*_stealing_cost = 0; _max_cost = 0; _min_cost = 0; _mean_enqueue_cost = 0; _mean_dequeue_cost = 0; enqueues = 1; - dequeues = 1; + dequeues = 1;*/ handle_resume(); } } @@ -393,10 +409,19 @@ private: handle_stop(); } // Genode::log("Worker ", _id, " stole ", static_cast(_count_channels), " channels."); - if (_excess_queues.fetch_sub(1) <= 1) - ; - //Genode::log("Entering stealing phase 2"); - current = _channels.pop_front_or(nullptr); + _excess_queues.fetch_sub(1); + // if (_excess_queues.fetch_sub(1) <= 1) + // Genode::log("Entering stealing phase 2"); + current = _channels.pop_front(); + } + } + + void handle_channel_occupancy() + { + if (_may_steal && current->has_excessive_usage_prediction()) { + yield_channels(_count_channels - 1, current); + _may_steal = false; + //Genode::log("Worker ", _id, ": Got channel ", current->id(), " with excessive usage prediction."); } } diff --git a/src/mx/util/field_alloc.h b/src/mx/util/field_alloc.h new file mode 100644 index 0000000..533ee87 --- /dev/null +++ b/src/mx/util/field_alloc.h @@ -0,0 +1,56 @@ +#pragma once +#include "random.h" +#include +#include + +namespace mx::util { + template class Field_Allocator; +} + +template +class mx::util::Field_Allocator +{ + private: + struct field { + alignas(64) std::atomic reserved{false}; + }; + + alignas(64) struct field _fields[C]{}; + alignas(64) std::size_t _count; + std::atomic free_fields; + + public: + Field_Allocator(std::size_t count) : _count(count), free_fields(count) {} + + std::size_t alloc_randomly(unsigned offset, unsigned limit) { + if (free_fields.load(std::memory_order_relaxed) <= 0) + return 0; + + //mx::util::Random rng(Genode::Trace::timestamp()); + + //Genode::log("Searching queue"); + std::size_t candidate = offset; + // +rng.next(limit); + + if (candidate > (offset + limit)) + return 0; + + if (candidate == 0) + return 0; + + for (; candidate < offset + limit; candidate++) { + bool expect = false; + bool success = _fields[candidate].reserved.compare_exchange_strong(expect, true, std::memory_order_acquire, std::memory_order_relaxed); + if (success) { + free_fields.fetch_sub(1); + return candidate; + } + } + return 0; + } + + void release(std::size_t field) { + _fields[field].reserved.store(false); + free_fields.fetch_add(1); + } +}; \ No newline at end of file