diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp index 73d7c10..eefde76 100644 --- a/src/mx/tasking/scheduler.cpp +++ b/src/mx/tasking/scheduler.cpp @@ -41,22 +41,17 @@ Scheduler::~Scheduler() noexcept void Scheduler::start_and_wait() { // Create threads for worker... - // TODO: Use Genode's thread interface instead of POSIX threads - std::vector worker_threads(this->_core_set.size() + - static_cast(config::memory_reclamation() != config::None)); + // Done: Use Genode's thread interface instead of POSIX threads for (auto channel_id = 0U; channel_id < this->_core_set.size(); ++channel_id) { this->_worker[channel_id]->start(); - - system::thread::pin(worker_threads[channel_id], this->_worker[channel_id]->core_id()); } // ... and epoch management (if enabled). if constexpr (config::memory_reclamation() != config::None) { // In case we enable memory reclamation: Use an additional thread. - worker_threads[this->_core_set.size()] = - std::thread([this] { this->_epoch_manager.enter_epoch_periodically(); }); + this->_epoch_manager.start(); } // Turning the flag on starts all worker threads to execute tasks. @@ -65,13 +60,14 @@ void Scheduler::start_and_wait() // Wait for the worker threads to end. This will only // reached when the _is_running flag is set to false // from somewhere in the application. - for (auto &worker_thread : worker_threads) + for (auto &worker : this->_worker) { - worker_thread.join(); + worker.join(); } if constexpr (config::memory_reclamation() != config::None) { + this->_epoch_manager.join(); // At this point, no task will execute on any resource; // but the epoch manager has joined, too. Therefore, // we will reclaim all memory manually. @@ -138,7 +134,7 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe // The developer assigned a fixed NUMA region to the task. else if (task.has_node_annotated()) { - // TODO: Select random channel @ node, based on load + // TODO: @jmuehlig Select random channel @ node, based on load assert(false && "NOT IMPLEMENTED: Task scheduling for node."); } @@ -179,7 +175,7 @@ void Scheduler::schedule(TaskInterface &task) noexcept } else if (task.has_node_annotated()) { - // TODO: Select random channel @ node, based on load + // TODO: @jmuehlig Select random channel @ node, based on load assert(false && "NOT IMPLEMENTED: Task scheduling for node."); } else diff --git a/src/mx/tasking/worker.cpp b/src/mx/tasking/worker.cpp index d49cf61..1e414ed 100644 --- a/src/mx/tasking/worker.cpp +++ b/src/mx/tasking/worker.cpp @@ -13,7 +13,8 @@ Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const const util::maybe_atomic &is_running, const std::uint16_t prefetch_distance, memory::reclamation::LocalEpoch &local_epoch, const std::atomic &global_epoch, profiling::Statistic &statistic) noexcept - : _target_core_id(target_core_id), _prefetch_distance(prefetch_distance), + : Thread(system::Environment::env, Name("Worker ", id), 4*4096, system::Environment::env.cpu().affinity_space().location_of_index(target_core_id), Weight(), system::Environment::env.cpu()), + _target_core_id(target_core_id), _prefetch_distance(prefetch_distance), _channel(id, target_numa_node_id, prefetch_distance), _local_epoch(local_epoch), _global_epoch(global_epoch), _statistic(statistic), _is_running(is_running) {