diff --git a/src/mx/system/environment.h b/src/mx/system/environment.h index ce864ea..024f09c 100644 --- a/src/mx/system/environment.h +++ b/src/mx/system/environment.h @@ -53,7 +53,9 @@ public: * @return Libc::Env& */ static Libc::Env &env() { return Environment::get_instance().getenv(); } - + + static Libc::Env *envp() { return Environment::get_instance()._env; } + /** * @brief Set the libc env object * diff --git a/src/mx/system/topology.h b/src/mx/system/topology.h index ecd84ff..94bba75 100644 --- a/src/mx/system/topology.h +++ b/src/mx/system/topology.h @@ -23,7 +23,7 @@ public: static std::uint16_t core_id() { Genode::Affinity::Location loc = Genode::Thread::myself()->affinity(); - unsigned width = Environment::cpu().affinity_space().width(); + unsigned width = Environment::topo().global_affinity_space().total(); return std::uint16_t(loc.xpos() + loc.ypos() * width); } @@ -39,11 +39,11 @@ public: /** * @return The greatest NUMA region identifier. */ - static std::uint8_t max_node_id() { return std::uint8_t(Environment::topo().node_count()-1); } + static std::uint8_t max_node_id() { return 7; /*std::uint8_t(Environment::topo().node_count()-1);*/ } /** * @return Number of available cores. */ - static std::uint16_t count_cores() { return std::uint16_t(Environment::cpu().affinity_space().total()); } + static std::uint16_t count_cores() { return std::uint16_t(Environment::topo().global_affinity_space().total()); } }; } // namespace mx::system \ No newline at end of file diff --git a/src/mx/tasking/config.h b/src/mx/tasking/config.h index 493a2c9..4d0b7b2 100644 --- a/src/mx/tasking/config.h +++ b/src/mx/tasking/config.h @@ -28,6 +28,6 @@ public: // If enabled, memory will be reclaimed while using optimistic // synchronization by epoch-based reclamation. Otherwise, freeing // memory is unsafe. - static constexpr auto memory_reclamation() { return memory_reclamation_scheme::UpdateEpochPeriodically; } + static constexpr auto memory_reclamation() { return memory_reclamation_scheme::None; } }; } // namespace mx::tasking diff --git a/src/mx/tasking/runtime.h b/src/mx/tasking/runtime.h index 42aaaf0..07e6bdf 100644 --- a/src/mx/tasking/runtime.h +++ b/src/mx/tasking/runtime.h @@ -136,6 +136,8 @@ public: */ static void stop() noexcept { _scheduler->interrupt(); } + static void resume() noexcept { _scheduler->resume(); } + /** * Creates a new task. * @param core_id Core to allocate memory from. diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp index 1128179..47b84a0 100644 --- a/src/mx/tasking/scheduler.cpp +++ b/src/mx/tasking/scheduler.cpp @@ -9,6 +9,8 @@ #include #include #include +#include +#include using namespace mx::tasking; @@ -60,19 +62,29 @@ void Scheduler::start_and_wait() std::vector worker_threads(space.total() + static_cast(config::memory_reclamation() != config::None)); - for (auto cpu = 0U; cpu < space.total(); ++cpu) { + Nova::mword_t start_cpu = 0; + Nova::cpu_id(start_cpu); + + for (auto cpu = 1U; cpu < space.total(); ++cpu) { Genode::String<32> const name{"worker", cpu}; Libc::pthread_create_from_session(&worker_threads[cpu], Worker::entry, _worker[cpu], 4 * 4096, name.string(), - &mx::system::Environment::cpu(), space.location_of_index(cpu)); + &mx::system::Environment::envp()->cpu(), space.location_of_index(cpu)); } - - // ... and epoch management (if enabled). - if constexpr (config::memory_reclamation() != config::None) - { - // In case we enable memory reclamation: Use an additional thread. - &worker_threads[space.total()], mx::memory::reclamation::EpochManager::enter, &this->_epoch_manager, - 4 * 4096, "epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total()); + + Genode::log("Creating foreman thread on CPU ", start_cpu); + + Libc::pthread_create_from_session(&worker_threads[0], Worker::entry, _worker[0], 4 * 4096, "foreman", + &mx::system::Environment::envp()->cpu(), space.location_of_index(0) ); + + Genode::log("Created foreman thread"); + + // ... and epoch management (if enabled). + if constexpr (config::memory_reclamation() != config::None) + { + // In case we enable memory reclamation: Use an additional thread. + &worker_threads[space.total()], mx::memory::reclamation::EpochManager::enter, &this->_epoch_manager, 4 * 4096, + "epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total()); } // Turning the flag on starts all worker threads to execute tasks. diff --git a/src/mx/tasking/scheduler.h b/src/mx/tasking/scheduler.h index cb1c51e..7100326 100644 --- a/src/mx/tasking/scheduler.h +++ b/src/mx/tasking/scheduler.h @@ -58,6 +58,9 @@ public: this->_profiler.stop(); } + void resume() noexcept { _is_running = true; + } + /** * @return Core set of this instance. */ diff --git a/src/mx/tasking/worker.cpp b/src/mx/tasking/worker.cpp index 9e9c645..512f0c0 100644 --- a/src/mx/tasking/worker.cpp +++ b/src/mx/tasking/worker.cpp @@ -34,19 +34,26 @@ void Worker::execute() self->pin(loc); }*/ + if (_channel.id() != 0) + sleep(); while (this->_is_running == false) { system::builtin::pause(); } + TaskInterface *task; const auto core_id = system::topology::core_id(); //assert(this->_target_core_id == core_id && "Worker not pinned to correct core."); const auto channel_id = this->_channel.id(); - const auto phys_core_id = system::Environment::topo().phys_id(Genode::Thread::myself()->affinity()); + Nova::mword_t pcpu = 0; + Nova::cpu_id(pcpu); - std::uint64_t *volatile tukija_signal = &_tukija_signal[phys_core_id]; + _phys_core_id = pcpu; + + //Genode::log("Worker ", _channel.id(), "(", _phys_core_id ,")", " woke up"); + std::uint64_t *volatile tukija_signal = &_tukija_signal[_phys_core_id]; while (this->_is_running) { @@ -58,7 +65,9 @@ void Worker::execute() this->_channel_size = this->_channel.fill(); if (this->_channel_size == 0) { - Nova::yield(true); + //Genode::log("Channel ", _channel.id(), " empty. Going to sleep"); + sleep(); + //Genode::log("Worker on CPU ", _phys_core_id, " woke up at ", Genode::Trace::timestamp()); } if constexpr (config::task_statistics()) @@ -138,10 +147,13 @@ void Worker::execute() } if (__atomic_load_n(tukija_signal, __ATOMIC_SEQ_CST)) { - Nova::yield(false); + Genode::log("Got yield signal ", _phys_core_id); + yield(); } } } + //Genode::log("Worker on CPU ", _phys_core_id, " going to stop"); + sleep(); } TaskResult Worker::execute_exclusive_latched(const std::uint16_t core_id, const std::uint16_t channel_id, diff --git a/src/mx/tasking/worker.h b/src/mx/tasking/worker.h index e504320..271f8cc 100644 --- a/src/mx/tasking/worker.h +++ b/src/mx/tasking/worker.h @@ -12,6 +12,8 @@ #include #include #include +#include +#include namespace mx::tasking { /** @@ -42,6 +44,19 @@ public: return nullptr; } + void wake() { _is_sleeping = false; + //Genode::log("Waking worker ", _channel.id(), " on CPU ", _phys_core_id); + if (Nova::wake_core(static_cast(_phys_core_id)) != Nova::NOVA_OK) + ; + //Genode::log("Failed to wake up worker on CPU ", _phys_core_id); + /*Nova::mword_t alloc; + Nova::alloc_cores(1, alloc); + Genode::log("Woke core cmap = ", alloc);*/ + } + + bool sleeping() { return _is_sleeping; } + + /** * @return Id of the logical core this worker runs on. */ @@ -57,6 +72,8 @@ private: // Distance of prefetching tasks. const std::uint16_t _prefetch_distance; + std::uint16_t _phys_core_id{0}; + std::int32_t _channel_size{0U}; // Stack for persisting tasks in optimistic execution. Optimistically @@ -81,6 +98,18 @@ private: // Communication channel to Tukija std::uint64_t *volatile _tukija_signal; + // Flag for "sleeping" state of this worker + util::maybe_atomic _is_sleeping{false}; + + void sleep() { //_is_sleeping = true; + Nova::yield(); + } + + void yield() { _is_sleeping = true; + Nova::yield(false); + } + + /** * Analyzes the given task and chooses the execution method regarding synchronization. * @param task Task to be executed.