From 28be6142f65a5aa22db47a5478e860aa7dfc2446 Mon Sep 17 00:00:00 2001 From: Michael Mueller Date: Thu, 10 Oct 2024 18:38:31 +0200 Subject: [PATCH] Avoid complete restart of runtime environment. --- .../blinktree_benchmark/benchmark.cpp | 12 +++++++-- .../blinktree_benchmark/benchmark.h | 25 +++++++++++++++++++ src/mx/tasking/runtime.h | 6 +++++ src/mx/tasking/scheduler.cpp | 3 ++- src/mx/tasking/scheduler.h | 13 ++++++++++ 5 files changed, 56 insertions(+), 3 deletions(-) diff --git a/src/application/blinktree_benchmark/benchmark.cpp b/src/application/blinktree_benchmark/benchmark.cpp index e10b7e9..ad01035 100644 --- a/src/application/blinktree_benchmark/benchmark.cpp +++ b/src/application/blinktree_benchmark/benchmark.cpp @@ -116,18 +116,21 @@ void Benchmark::requests_finished() { const auto open_requests = --this->_open_requests; + //std::cout << "Finished request. " << this->_open_requests << " to go." << std::endl; + if (open_requests == 0U) // All request schedulers are done. { std::uint16_t core_id = mx::system::topology::core_id(); - if (core_id != 0) { + /*if (core_id != 0) { this->_open_requests++; auto *stop_task = mx::tasking::runtime::new_task(0U, *this); stop_task->annotate(static_cast(0)); mx::tasking::runtime::spawn(*stop_task, core_id); return; - } + }*/ // Stop and print time (and performance counter). const auto result = this->_chronometer.stop(this->_workload.size()); + std::cout << "Benchmark finished." << std::endl; mx::tasking::runtime::stop(); std::cout << result << std::endl; @@ -205,6 +208,11 @@ void Benchmark::requests_finished() { this->_tree.reset(nullptr); } + + auto *restart_task = mx::tasking::runtime::new_task(0U, *this); + restart_task->annotate(static_cast(0)); + mx::tasking::runtime::spawn(*restart_task, core_id); + mx::tasking::runtime::resume(); } } diff --git a/src/application/blinktree_benchmark/benchmark.h b/src/application/blinktree_benchmark/benchmark.h index 13a4d67..037cc4e 100644 --- a/src/application/blinktree_benchmark/benchmark.h +++ b/src/application/blinktree_benchmark/benchmark.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace application::blinktree_benchmark { /** @@ -116,6 +117,7 @@ class StartMeasurementTask : public mx::tasking::TaskInterface mx::tasking::TaskResult execute(const std::uint16_t, const std::uint16_t) override { _benchmark._chronometer.start(static_cast(static_cast(_benchmark._workload)), _benchmark._current_iteration + 1, _benchmark._cores.current()); + std::cout << "Started benchmark" << std::endl; return mx::tasking::TaskResult::make_remove(); } }; @@ -134,4 +136,27 @@ class StopMeasurementTask : public mx::tasking::TaskInterface return mx::tasking::TaskResult::make_remove(); } }; +class RestartTask : public mx::tasking::TaskInterface +{ +private: + Benchmark &_benchmark; + +public: + constexpr RestartTask(Benchmark &benchmark) : _benchmark(benchmark) {} + ~RestartTask() override = default; + + mx::tasking::TaskResult execute(const std::uint16_t core_id, const std::uint16_t channel_id) override + { + if (_benchmark.core_set()) + { + _benchmark.start(); + } + else + { + std::cout << "Benchmark finished." << std::endl; + mx::tasking::runtime::stop(); + } + return mx::tasking::TaskResult::make_remove(); + } +}; } // namespace application::blinktree_benchmark diff --git a/src/mx/tasking/runtime.h b/src/mx/tasking/runtime.h index 9d2fb1f..6b4fc43 100644 --- a/src/mx/tasking/runtime.h +++ b/src/mx/tasking/runtime.h @@ -125,6 +125,12 @@ public: */ static void stop() noexcept { _scheduler->interrupt(); } + static void resume() noexcept { _scheduler->resume(); } + + static const std::array &worker_affinities() { + return _scheduler->worker_affinities(); + } + /** * Creates a new task. * @param core_id Core to allocate memory from. diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp index 2bb062f..b79d6b4 100644 --- a/src/mx/tasking/scheduler.cpp +++ b/src/mx/tasking/scheduler.cpp @@ -18,6 +18,7 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre : _core_set(core_set), _count_channels(core_set.size()), _worker({}), _channel_numa_node_map({0U}), _epoch_manager(core_set.size(), resource_allocator, _is_running), _statistic(_count_channels) { + std::cout << "Creating scheduler" << std::endl; this->_worker.fill(nullptr); this->_channel_numa_node_map.fill(0U); @@ -29,7 +30,7 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre new (memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker))) Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running, prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(), - this->_statistic); + this->_statistic, _cout_lock); } } diff --git a/src/mx/tasking/scheduler.h b/src/mx/tasking/scheduler.h index 55d7b3c..6c984f3 100644 --- a/src/mx/tasking/scheduler.h +++ b/src/mx/tasking/scheduler.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace mx::tasking { /** @@ -55,9 +56,18 @@ public: void interrupt() noexcept { _is_running = false; + for (std::uint16_t worker_id = 0; worker_id < _count_channels; worker_id++) { + _worker_affinities[worker_id] = _worker[worker_id]->phys_core_id(); + } this->_profiler.stop(); } + [[nodiscard]] const std::array &worker_affinities() { + return _worker_affinities; + } + + void resume() noexcept { _is_running = true; } + /** * @return Core set of this instance. */ @@ -191,12 +201,15 @@ private: // Epoch manager for memory reclamation, alignas(64) memory::reclamation::EpochManager _epoch_manager; + alignas(64) std::array _worker_affinities{0}; // Profiler for task statistics. profiling::Statistic _statistic; // Profiler for idle times. profiling::Profiler _profiler{}; + synchronization::Spinlock _cout_lock{}; + /** * Make a decision whether a task should be scheduled to the local * channel or a remote.