From 5f2cba19d8206238af8d590222257f679ac5ea5e Mon Sep 17 00:00:00 2001 From: Michael Mueller Date: Tue, 13 Feb 2024 19:43:14 +0100 Subject: [PATCH] Create worker threads using libc to guarantee correct pinning. --- src/mx/tasking/scheduler.cpp | 70 ++++++++++++++++++++++-------------- src/mx/tasking/scheduler.h | 2 +- 2 files changed, 44 insertions(+), 28 deletions(-) diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp index a156cef..1128179 100644 --- a/src/mx/tasking/scheduler.cpp +++ b/src/mx/tasking/scheduler.cpp @@ -6,24 +6,29 @@ #include #include #include +#include +#include +#include using namespace mx::tasking; +std::uint64_t *volatile mx::tasking::runtime::_signal_page; + Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t prefetch_distance, - memory::dynamic::Allocator &resource_allocator) noexcept + memory::dynamic::Allocator &resource_allocator, std::uint64_t * volatile signal_page) noexcept : _core_set(core_set), _count_channels(core_set.size()), _worker({}), _channel_numa_node_map({0U}), _epoch_manager(core_set.size(), resource_allocator, _is_running), _statistic(_count_channels) { this->_worker.fill(nullptr); this->_channel_numa_node_map.fill(0U); - for (auto worker_id = 0U; worker_id < this->_count_channels; ++worker_id) + for (auto worker_id = 0U; worker_id < mx::system::Environment::topo().global_affinity_space().total(); ++worker_id) { const auto core_id = this->_core_set[worker_id]; this->_channel_numa_node_map[worker_id] = system::topology::node_id(core_id); auto ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker)); this->_worker[worker_id] = new (ptr) - Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running, + Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running, prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(), this->_statistic); } @@ -42,41 +47,52 @@ Scheduler::~Scheduler() noexcept void Scheduler::start_and_wait() { // Create threads for worker... - std::vector worker_threads(this->_core_set.size() + + /*std::vector worker_threads(this->_core_set.size() + static_cast(config::memory_reclamation() != config::None)); for (auto channel_id = 0U; channel_id < this->_core_set.size(); ++channel_id) { worker_threads[channel_id] = std::thread([this, channel_id] { this->_worker[channel_id]->execute(); }); //system::thread::pin(worker_threads[channel_id], this->_worker[channel_id]->core_id()); + }*/ + Genode::Affinity::Space space = mx::system::Environment::topo().global_affinity_space(); + + std::vector worker_threads(space.total() + + static_cast(config::memory_reclamation() != config::None)); + + for (auto cpu = 0U; cpu < space.total(); ++cpu) { + Genode::String<32> const name{"worker", cpu}; + Libc::pthread_create_from_session(&worker_threads[cpu], Worker::entry, _worker[cpu], 4 * 4096, name.string(), + &mx::system::Environment::cpu(), space.location_of_index(cpu)); } - // ... and epoch management (if enabled). - if constexpr (config::memory_reclamation() != config::None) - { - // In case we enable memory reclamation: Use an additional thread. - worker_threads[this->_core_set.size()] = - std::thread([this] { this->_epoch_manager.enter_epoch_periodically(); }); - } + + // ... and epoch management (if enabled). + if constexpr (config::memory_reclamation() != config::None) + { + // In case we enable memory reclamation: Use an additional thread. + &worker_threads[space.total()], mx::memory::reclamation::EpochManager::enter, &this->_epoch_manager, + 4 * 4096, "epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total()); + } - // Turning the flag on starts all worker threads to execute tasks. - this->_is_running = true; + // Turning the flag on starts all worker threads to execute tasks. + this->_is_running = true; - // Wait for the worker threads to end. This will only - // reached when the _is_running flag is set to false - // from somewhere in the application. - for (auto &worker_thread : worker_threads) - { - worker_thread.join(); - } + // Wait for the worker threads to end. This will only + // reached when the _is_running flag is set to false + // from somewhere in the application. + for (auto &worker_thread : worker_threads) + { + pthread_join(worker_thread, 0); + } - if constexpr (config::memory_reclamation() != config::None) - { - // At this point, no task will execute on any resource; - // but the epoch manager has joined, too. Therefore, - // we will reclaim all memory manually. - this->_epoch_manager.reclaim_all(); - } + if constexpr (config::memory_reclamation() != config::None) + { + // At this point, no task will execute on any resource; + // but the epoch manager has joined, too. Therefore, + // we will reclaim all memory manually. + this->_epoch_manager.reclaim_all(); + } } void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channel_id) noexcept diff --git a/src/mx/tasking/scheduler.h b/src/mx/tasking/scheduler.h index 55d7b3c..cb1c51e 100644 --- a/src/mx/tasking/scheduler.h +++ b/src/mx/tasking/scheduler.h @@ -27,7 +27,7 @@ class Scheduler { public: Scheduler(const util::core_set &core_set, std::uint16_t prefetch_distance, - memory::dynamic::Allocator &resource_allocator) noexcept; + memory::dynamic::Allocator &resource_allocator, std::uint64_t * volatile signal_page) noexcept; ~Scheduler() noexcept; /**