first tests for work stealing

This commit is contained in:
2023-07-10 10:19:34 +02:00
parent fcf0a2810b
commit f0134e9196
5 changed files with 64 additions and 0 deletions

View File

@@ -140,6 +140,16 @@ public:
std::uint8_t numa_node_id() { return _numa_node_id; }
TaskInterface* steal_task() noexcept {
if (!_local_queues[mx::tasking::priority::normal].empty()) {
return _local_queues[mx::tasking::priority::normal].pop_front();
}
else if (!_local_queues[mx::tasking::priority::low].empty()) {
return _local_queues[mx::tasking::priority::low].pop_front();
}
return nullptr;
}
private:
// Backend queues for multiple produces in different NUMA regions and different priorities,
alignas(64)

View File

@@ -26,6 +26,8 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre
Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running,
prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
this->_statistic);
this->_worker[worker_id]->setScheduler(this);
allWorkers.push_back(this->_worker[worker_id]);
}
}

View File

@@ -17,6 +17,7 @@
#include <mx/util/core_set.h>
#include <mx/util/random.h>
#include <string>
#include <mutex>
namespace mx::tasking {
/**
@@ -171,6 +172,16 @@ public:
bool operator!=(const util::core_set &cores) const noexcept { return _core_set != cores; }
/**
* @return allWorkers
*/
std::vector<Worker *>& getAllWorkers() noexcept { return allWorkers; }
/**
* @return allWorkersMutex
*/
std::mutex& getAllWorkersMutex() noexcept { return allWorkersMutex; }
private:
// Cores to run the worker threads on.
const util::core_set _core_set;
@@ -197,6 +208,12 @@ private:
// Profiler for idle times.
profiling::Profiler _profiler{};
// All initialized workers.
std::vector<Worker *> allWorkers;
// Mutex for the worker vector.
std::mutex allWorkersMutex;
/**
* Make a decision whether a task should be scheduled to the local
* channel or a remote.

View File

@@ -131,6 +131,30 @@ void Worker::execute()
runtime::delete_task(core_id, task);
}
}
stealTasks();
}
}
void Worker::stealTasks(){
std::cout << "stealTasks" << std::endl;
// This assumes that the worker has access to the other workers through
// a data structure named "allWorkers". You will need to adjust this
// based on your program's structure.
std::lock_guard<std::mutex> lock(_scheduler->getAllWorkersMutex());
for (auto& worker : _scheduler->getAllWorkers())
{
if (worker == this)
{
continue; // Skip the current worker.
}
TaskInterface* stolenTask = worker->_channel.steal_task();
if (stolenTask != nullptr)
{
// If the stolen task is not null, push it to the current worker's channel
this->_channel.push_back_local(stolenTask);
break;
}
}
}

View File

@@ -14,6 +14,9 @@
#include <vector>
namespace mx::tasking {
class Scheduler;
/**
* The worker executes tasks from his own channel, until the "running" flag is false.
*/
@@ -40,6 +43,8 @@ public:
[[nodiscard]] Channel &channel() noexcept { return _channel; }
[[nodiscard]] const Channel &channel() const noexcept { return _channel; }
void setScheduler(Scheduler* scheduler) { _scheduler = scheduler; }
private:
// Id of the logical core.
const std::uint16_t _target_core_id;
@@ -68,6 +73,9 @@ private:
// Flag for "running" state of MxTasking.
const util::maybe_atomic<bool> &_is_running;
// Scheduler
Scheduler* _scheduler;
/**
* Analyzes the given task and chooses the execution method regarding synchronization.
* @param task Task to be executed.
@@ -126,5 +134,8 @@ private:
*/
TaskResult execute_optimistic_read(std::uint16_t core_id, std::uint16_t channel_id,
resource::ResourceInterface *resource, TaskInterface *task);
void stealTasks();
};
} // namespace mx::tasking