update to steal a whole queue

This commit is contained in:
2023-12-15 12:44:44 +01:00
parent f0134e9196
commit 3284440c93
2 changed files with 33 additions and 16 deletions

View File

@@ -140,14 +140,21 @@ public:
std::uint8_t numa_node_id() { return _numa_node_id; }
TaskInterface* steal_task() noexcept {
std::unique_ptr<mx::util::Queue<TaskInterface>> steal() noexcept {
std::unique_ptr<mx::util::Queue<TaskInterface>> stolenQueue = nullptr;
if (!_local_queues[mx::tasking::priority::normal].empty()) {
return _local_queues[mx::tasking::priority::normal].pop_front();
stolenQueue = std::make_unique<mx::util::Queue<TaskInterface>>();
while (!_local_queues[mx::tasking::priority::normal].empty()) {
stolenQueue->push_back(_local_queues[mx::tasking::priority::normal].pop_front());
}
}
else if (!_local_queues[mx::tasking::priority::low].empty()) {
return _local_queues[mx::tasking::priority::low].pop_front();
stolenQueue = std::make_unique<mx::util::Queue<TaskInterface>>();
while (!_local_queues[mx::tasking::priority::low].empty()) {
stolenQueue->push_back(_local_queues[mx::tasking::priority::low].pop_front());
}
return nullptr;
}
return stolenQueue;
}
private:

View File

@@ -44,8 +44,6 @@ void Worker::execute()
assert(this->_target_core_id == core_id && "Worker not pinned to correct core.");
const auto channel_id = this->_channel.id();
while (this->_is_running)
{
if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically)
@@ -136,28 +134,40 @@ void Worker::execute()
}
void Worker::stealTasks(){
std::cout << "stealTasks" << std::endl;
// This assumes that the worker has access to the other workers through
// a data structure named "allWorkers". You will need to adjust this
// based on your program's structure.
// Lock all workers during the stealing process
std::lock_guard<std::mutex> lock(_scheduler->getAllWorkersMutex());
for (auto& worker : _scheduler->getAllWorkers())
{
if (worker == this)
// Skip the current worker and the workers that are already running.
if (worker == this || worker->_channel.empty())
{
continue; // Skip the current worker.
//skip current worker
continue;
}
TaskInterface* stolenTask = worker->_channel.steal_task();
if (stolenTask != nullptr)
Genode::log("trying to steal from worker ", worker->_channel.id(), " to worker ", this->_channel.id(), " ...");
//dont steal from workers that are running or have no tasks
// Try to steal the entire queue from the other worker but only if it is not running.
auto stolenQueue = worker->_channel.steal();
if (stolenQueue != nullptr)
{
// If the stolen task is not null, push it to the current worker's channel
this->_channel.push_back_local(stolenTask);
Genode::log("stealing from worker ", worker->_channel.id(), " to worker ", this->_channel.id(), " ...");
// If the stolen queue is not null, move all tasks to the current worker's channel
while (!stolenQueue->empty()) {
this->_channel.push_back_local(stolenQueue->pop_front());
}
break;
}
}
}
TaskResult Worker::execute_exclusive_latched(const std::uint16_t core_id, const std::uint16_t channel_id,
mx::tasking::TaskInterface *const task)
{