diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp index b79d6b4..9f0dd06 100644 --- a/src/mx/tasking/scheduler.cpp +++ b/src/mx/tasking/scheduler.cpp @@ -30,7 +30,7 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre new (memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker))) Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running, prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(), - this->_statistic, _cout_lock); + this->_statistic, _cout_lock, _stop, this->_mutex, _worker_counter); } } @@ -64,6 +64,9 @@ void Scheduler::start_and_wait() // Turning the flag on starts all worker threads to execute tasks. this->_is_running = true; + for (auto worker_id = 0; worker_id < this->_core_set.size(); ++worker_id) + this->_stop.SignalAll(); + // Wait for the worker threads to end. This will only // reached when the _is_running flag is set to false // from somewhere in the application. diff --git a/src/mx/tasking/scheduler.h b/src/mx/tasking/scheduler.h index 6c984f3..629ce63 100644 --- a/src/mx/tasking/scheduler.h +++ b/src/mx/tasking/scheduler.h @@ -18,6 +18,11 @@ #include #include #include +#include + +#ifndef SHENANGO +#define SHENANGO +#endif namespace mx::tasking { /** @@ -60,13 +65,19 @@ public: _worker_affinities[worker_id] = _worker[worker_id]->phys_core_id(); } this->_profiler.stop(); + _worker_counter.store(0); } [[nodiscard]] const std::array &worker_affinities() { return _worker_affinities; } - void resume() noexcept { _is_running = true; } + void resume() noexcept + { + _is_running = true; + for (auto worker_id = 0; worker_id < this->_core_set.size()+15; ++worker_id) + _stop.Signal(); + } /** * @return Core set of this instance. @@ -210,6 +221,11 @@ private: synchronization::Spinlock _cout_lock{}; + rt::CondVar _stop{}; + rt::Mutex _mutex{}; + + std::atomic _worker_counter{0}; + /** * Make a decision whether a task should be scheduled to the local * channel or a remote. diff --git a/src/mx/tasking/worker.cpp b/src/mx/tasking/worker.cpp index 195614d..3b4196d 100644 --- a/src/mx/tasking/worker.cpp +++ b/src/mx/tasking/worker.cpp @@ -13,10 +13,10 @@ using namespace mx::tasking; Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id, const util::maybe_atomic &is_running, const std::uint16_t prefetch_distance, memory::reclamation::LocalEpoch &local_epoch, - const std::atomic &global_epoch, profiling::Statistic &statistic, synchronization::Spinlock &cout_lock) noexcept + const std::atomic &global_epoch, profiling::Statistic &statistic, synchronization::Spinlock &cout_lock, rt::CondVar &stop, rt::Mutex &mutex, std::atomic &counter) noexcept : _target_core_id(target_core_id), _prefetch_distance(prefetch_distance), _channel(id, target_numa_node_id, prefetch_distance), _local_epoch(local_epoch), _global_epoch(global_epoch), - _statistic(statistic), _is_running(is_running), _cout_lock(cout_lock) + _statistic(statistic), _is_running(is_running), _cout_lock(cout_lock), _stop(stop), _mutex(mutex), _counter(counter) { } @@ -39,9 +39,12 @@ void Worker::execute() { while (this->_is_running == false) { - system::builtin::pause(); + rt::Mutex mutex{}; + _stop.WaitTimed(&mutex, 10); + _counter.fetch_add(1); + //std::cout << "Worker " << _target_core_id << " woke up " << std::endl; } - + if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically) { this->_local_epoch.enter(this->_global_epoch); diff --git a/src/mx/tasking/worker.h b/src/mx/tasking/worker.h index 68d917f..3ef4f0b 100644 --- a/src/mx/tasking/worker.h +++ b/src/mx/tasking/worker.h @@ -13,7 +13,11 @@ #include #include #include +#include +#ifndef SHENANGO +#define SHENANGO +#endif namespace mx::tasking { /** * The worker executes tasks from his own channel, until the "running" flag is false. @@ -24,7 +28,7 @@ public: Worker(std::uint16_t id, std::uint16_t target_core_id, std::uint16_t target_numa_node_id, const util::maybe_atomic &is_running, std::uint16_t prefetch_distance, memory::reclamation::LocalEpoch &local_epoch, const std::atomic &global_epoch, - profiling::Statistic &statistic, synchronization::Spinlock &cout_lock) noexcept; + profiling::Statistic &statistic, synchronization::Spinlock &cout_lock, rt::CondVar &stop, rt::Mutex &mutex, std::atomic &counter) noexcept; ~Worker() noexcept = default; @@ -75,6 +79,11 @@ private: // Flag for "running" state of MxTasking. const util::maybe_atomic &_is_running; + rt::CondVar &_stop; + rt::Mutex &_mutex; + + std::atomic &_counter; + /** * Analyzes the given task and chooses the execution method regarding synchronization. * @param task Task to be executed.