From 3284440c93d9100a9ebe4add512ccc52a7579d92 Mon Sep 17 00:00:00 2001 From: Denshooter Date: Fri, 15 Dec 2023 12:44:44 +0100 Subject: [PATCH] update to steal a whole queue --- src/mx/tasking/channel.h | 15 +++++++++++---- src/mx/tasking/worker.cpp | 34 ++++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/mx/tasking/channel.h b/src/mx/tasking/channel.h index 7ead1b4..efe849d 100644 --- a/src/mx/tasking/channel.h +++ b/src/mx/tasking/channel.h @@ -140,14 +140,21 @@ public: std::uint8_t numa_node_id() { return _numa_node_id; } - TaskInterface* steal_task() noexcept { + std::unique_ptr> steal() noexcept { + std::unique_ptr> stolenQueue = nullptr; if (!_local_queues[mx::tasking::priority::normal].empty()) { - return _local_queues[mx::tasking::priority::normal].pop_front(); + stolenQueue = std::make_unique>(); + while (!_local_queues[mx::tasking::priority::normal].empty()) { + stolenQueue->push_back(_local_queues[mx::tasking::priority::normal].pop_front()); + } } else if (!_local_queues[mx::tasking::priority::low].empty()) { - return _local_queues[mx::tasking::priority::low].pop_front(); + stolenQueue = std::make_unique>(); + while (!_local_queues[mx::tasking::priority::low].empty()) { + stolenQueue->push_back(_local_queues[mx::tasking::priority::low].pop_front()); + } } - return nullptr; + return stolenQueue; } private: diff --git a/src/mx/tasking/worker.cpp b/src/mx/tasking/worker.cpp index 3496375..ed764a4 100644 --- a/src/mx/tasking/worker.cpp +++ b/src/mx/tasking/worker.cpp @@ -44,8 +44,6 @@ void Worker::execute() assert(this->_target_core_id == core_id && "Worker not pinned to correct core."); const auto channel_id = this->_channel.id(); - - while (this->_is_running) { if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically) @@ -136,28 +134,40 @@ void Worker::execute() } void Worker::stealTasks(){ - std::cout << "stealTasks" << std::endl; - // This assumes that the worker has access to the other workers through - // a data structure named "allWorkers". You will need to adjust this - // based on your program's structure. + // Lock all workers during the stealing process std::lock_guard lock(_scheduler->getAllWorkersMutex()); + for (auto& worker : _scheduler->getAllWorkers()) { - if (worker == this) + // Skip the current worker and the workers that are already running. + if (worker == this || worker->_channel.empty()) { - continue; // Skip the current worker. + //skip current worker + continue; } - TaskInterface* stolenTask = worker->_channel.steal_task(); - if (stolenTask != nullptr) + Genode::log("trying to steal from worker ", worker->_channel.id(), " to worker ", this->_channel.id(), " ..."); + + //dont steal from workers that are running or have no tasks + + + + // Try to steal the entire queue from the other worker but only if it is not running. + auto stolenQueue = worker->_channel.steal(); + + if (stolenQueue != nullptr) { - // If the stolen task is not null, push it to the current worker's channel - this->_channel.push_back_local(stolenTask); + Genode::log("stealing from worker ", worker->_channel.id(), " to worker ", this->_channel.id(), " ..."); + // If the stolen queue is not null, move all tasks to the current worker's channel + while (!stolenQueue->empty()) { + this->_channel.push_back_local(stolenQueue->pop_front()); + } break; } } } + TaskResult Worker::execute_exclusive_latched(const std::uint16_t core_id, const std::uint16_t channel_id, mx::tasking::TaskInterface *const task) {