Put workers to sleep when runtime is stopped.

This commit is contained in:
Michael Mueller
2024-10-14 15:16:47 +02:00
parent 4935773863
commit f701cb1f8d
4 changed files with 38 additions and 7 deletions

View File

@@ -30,7 +30,7 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre
new (memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker))) new (memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker)))
Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running, Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running,
prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(), prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
this->_statistic, _cout_lock); this->_statistic, _cout_lock, _stop, this->_mutex, _worker_counter);
} }
} }
@@ -64,6 +64,9 @@ void Scheduler::start_and_wait()
// Turning the flag on starts all worker threads to execute tasks. // Turning the flag on starts all worker threads to execute tasks.
this->_is_running = true; this->_is_running = true;
for (auto worker_id = 0; worker_id < this->_core_set.size(); ++worker_id)
this->_stop.SignalAll();
// Wait for the worker threads to end. This will only // Wait for the worker threads to end. This will only
// reached when the _is_running flag is set to false // reached when the _is_running flag is set to false
// from somewhere in the application. // from somewhere in the application.

View File

@@ -18,6 +18,11 @@
#include <mx/util/random.h> #include <mx/util/random.h>
#include <string> #include <string>
#include <mx/synchronization/spinlock.h> #include <mx/synchronization/spinlock.h>
#include <cc/sync.h>
#ifndef SHENANGO
#define SHENANGO
#endif
namespace mx::tasking { namespace mx::tasking {
/** /**
@@ -60,13 +65,19 @@ public:
_worker_affinities[worker_id] = _worker[worker_id]->phys_core_id(); _worker_affinities[worker_id] = _worker[worker_id]->phys_core_id();
} }
this->_profiler.stop(); this->_profiler.stop();
_worker_counter.store(0);
} }
[[nodiscard]] const std::array<std::uint16_t,config::max_cores()> &worker_affinities() { [[nodiscard]] const std::array<std::uint16_t,config::max_cores()> &worker_affinities() {
return _worker_affinities; return _worker_affinities;
} }
void resume() noexcept { _is_running = true; } void resume() noexcept
{
_is_running = true;
for (auto worker_id = 0; worker_id < this->_core_set.size()+15; ++worker_id)
_stop.Signal();
}
/** /**
* @return Core set of this instance. * @return Core set of this instance.
@@ -210,6 +221,11 @@ private:
synchronization::Spinlock _cout_lock{}; synchronization::Spinlock _cout_lock{};
rt::CondVar _stop{};
rt::Mutex _mutex{};
std::atomic<std::uint16_t> _worker_counter{0};
/** /**
* Make a decision whether a task should be scheduled to the local * Make a decision whether a task should be scheduled to the local
* channel or a remote. * channel or a remote.

View File

@@ -13,10 +13,10 @@ using namespace mx::tasking;
Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id, Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id,
const util::maybe_atomic<bool> &is_running, const std::uint16_t prefetch_distance, const util::maybe_atomic<bool> &is_running, const std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch, memory::reclamation::LocalEpoch &local_epoch,
const std::atomic<memory::reclamation::epoch_t> &global_epoch, profiling::Statistic &statistic, synchronization::Spinlock &cout_lock) noexcept const std::atomic<memory::reclamation::epoch_t> &global_epoch, profiling::Statistic &statistic, synchronization::Spinlock &cout_lock, rt::CondVar &stop, rt::Mutex &mutex, std::atomic<std::uint16_t> &counter) noexcept
: _target_core_id(target_core_id), _prefetch_distance(prefetch_distance), : _target_core_id(target_core_id), _prefetch_distance(prefetch_distance),
_channel(id, target_numa_node_id, prefetch_distance), _local_epoch(local_epoch), _global_epoch(global_epoch), _channel(id, target_numa_node_id, prefetch_distance), _local_epoch(local_epoch), _global_epoch(global_epoch),
_statistic(statistic), _is_running(is_running), _cout_lock(cout_lock) _statistic(statistic), _is_running(is_running), _cout_lock(cout_lock), _stop(stop), _mutex(mutex), _counter(counter)
{ {
} }
@@ -39,9 +39,12 @@ void Worker::execute()
{ {
while (this->_is_running == false) while (this->_is_running == false)
{ {
system::builtin::pause(); rt::Mutex mutex{};
_stop.WaitTimed(&mutex, 10);
_counter.fetch_add(1);
//std::cout << "Worker " << _target_core_id << " woke up " << std::endl;
} }
if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically) if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically)
{ {
this->_local_epoch.enter(this->_global_epoch); this->_local_epoch.enter(this->_global_epoch);

View File

@@ -13,7 +13,11 @@
#include <variant> #include <variant>
#include <vector> #include <vector>
#include <mx/synchronization/spinlock.h> #include <mx/synchronization/spinlock.h>
#include <cc/sync.h>
#ifndef SHENANGO
#define SHENANGO
#endif
namespace mx::tasking { namespace mx::tasking {
/** /**
* The worker executes tasks from his own channel, until the "running" flag is false. * The worker executes tasks from his own channel, until the "running" flag is false.
@@ -24,7 +28,7 @@ public:
Worker(std::uint16_t id, std::uint16_t target_core_id, std::uint16_t target_numa_node_id, Worker(std::uint16_t id, std::uint16_t target_core_id, std::uint16_t target_numa_node_id,
const util::maybe_atomic<bool> &is_running, std::uint16_t prefetch_distance, const util::maybe_atomic<bool> &is_running, std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch, const std::atomic<memory::reclamation::epoch_t> &global_epoch, memory::reclamation::LocalEpoch &local_epoch, const std::atomic<memory::reclamation::epoch_t> &global_epoch,
profiling::Statistic &statistic, synchronization::Spinlock &cout_lock) noexcept; profiling::Statistic &statistic, synchronization::Spinlock &cout_lock, rt::CondVar &stop, rt::Mutex &mutex, std::atomic<std::uint16_t> &counter) noexcept;
~Worker() noexcept = default; ~Worker() noexcept = default;
@@ -75,6 +79,11 @@ private:
// Flag for "running" state of MxTasking. // Flag for "running" state of MxTasking.
const util::maybe_atomic<bool> &_is_running; const util::maybe_atomic<bool> &_is_running;
rt::CondVar &_stop;
rt::Mutex &_mutex;
std::atomic<std::uint16_t> &_counter;
/** /**
* Analyzes the given task and chooses the execution method regarding synchronization. * Analyzes the given task and chooses the execution method regarding synchronization.
* @param task Task to be executed. * @param task Task to be executed.