diff --git a/src/mx/tasking/channel.h b/src/mx/tasking/channel.h index ed51949..7ead1b4 100644 --- a/src/mx/tasking/channel.h +++ b/src/mx/tasking/channel.h @@ -140,6 +140,16 @@ public: std::uint8_t numa_node_id() { return _numa_node_id; } + TaskInterface* steal_task() noexcept { + if (!_local_queues[mx::tasking::priority::normal].empty()) { + return _local_queues[mx::tasking::priority::normal].pop_front(); + } + else if (!_local_queues[mx::tasking::priority::low].empty()) { + return _local_queues[mx::tasking::priority::low].pop_front(); + } + return nullptr; + } + private: // Backend queues for multiple produces in different NUMA regions and different priorities, alignas(64) diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp index a156cef..8cb04f9 100644 --- a/src/mx/tasking/scheduler.cpp +++ b/src/mx/tasking/scheduler.cpp @@ -26,6 +26,8 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running, prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(), this->_statistic); + this->_worker[worker_id]->setScheduler(this); + allWorkers.push_back(this->_worker[worker_id]); } } diff --git a/src/mx/tasking/scheduler.h b/src/mx/tasking/scheduler.h index 55d7b3c..c15917e 100644 --- a/src/mx/tasking/scheduler.h +++ b/src/mx/tasking/scheduler.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace mx::tasking { /** @@ -171,6 +172,16 @@ public: bool operator!=(const util::core_set &cores) const noexcept { return _core_set != cores; } + /** + * @return allWorkers + */ + std::vector& getAllWorkers() noexcept { return allWorkers; } + + /** + * @return allWorkersMutex + */ + std::mutex& getAllWorkersMutex() noexcept { return allWorkersMutex; } + private: // Cores to run the worker threads on. const util::core_set _core_set; @@ -197,6 +208,12 @@ private: // Profiler for idle times. profiling::Profiler _profiler{}; + // All initialized workers. + std::vector allWorkers; + + // Mutex for the worker vector. + std::mutex allWorkersMutex; + /** * Make a decision whether a task should be scheduled to the local * channel or a remote. diff --git a/src/mx/tasking/worker.cpp b/src/mx/tasking/worker.cpp index a851e8c..3496375 100644 --- a/src/mx/tasking/worker.cpp +++ b/src/mx/tasking/worker.cpp @@ -131,6 +131,30 @@ void Worker::execute() runtime::delete_task(core_id, task); } } + stealTasks(); + } +} + +void Worker::stealTasks(){ + std::cout << "stealTasks" << std::endl; + // This assumes that the worker has access to the other workers through + // a data structure named "allWorkers". You will need to adjust this + // based on your program's structure. + std::lock_guard lock(_scheduler->getAllWorkersMutex()); + for (auto& worker : _scheduler->getAllWorkers()) + { + if (worker == this) + { + continue; // Skip the current worker. + } + + TaskInterface* stolenTask = worker->_channel.steal_task(); + if (stolenTask != nullptr) + { + // If the stolen task is not null, push it to the current worker's channel + this->_channel.push_back_local(stolenTask); + break; + } } } diff --git a/src/mx/tasking/worker.h b/src/mx/tasking/worker.h index 1854357..39ff7e5 100644 --- a/src/mx/tasking/worker.h +++ b/src/mx/tasking/worker.h @@ -14,6 +14,9 @@ #include namespace mx::tasking { + +class Scheduler; + /** * The worker executes tasks from his own channel, until the "running" flag is false. */ @@ -40,6 +43,8 @@ public: [[nodiscard]] Channel &channel() noexcept { return _channel; } [[nodiscard]] const Channel &channel() const noexcept { return _channel; } + void setScheduler(Scheduler* scheduler) { _scheduler = scheduler; } + private: // Id of the logical core. const std::uint16_t _target_core_id; @@ -68,6 +73,9 @@ private: // Flag for "running" state of MxTasking. const util::maybe_atomic &_is_running; + // Scheduler + Scheduler* _scheduler; + /** * Analyzes the given task and chooses the execution method regarding synchronization. * @param task Task to be executed. @@ -126,5 +134,8 @@ private: */ TaskResult execute_optimistic_read(std::uint16_t core_id, std::uint16_t channel_id, resource::ResourceInterface *resource, TaskInterface *task); + + void stealTasks(); + }; } // namespace mx::tasking \ No newline at end of file