Create worker threads using libc to guarantee correct pinning.
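
For context, the gist of the change: on Genode, std::thread offers no way to choose the CPU a thread runs on, so the scheduler now creates its workers through the libc back end, which accepts an explicit CPU session and affinity location. Below is a minimal sketch of that pattern, assuming a plain C-style entry function and a Genode::Env reference; the helper spawn_pinned_workers and the entry function are illustrative only, while Libc::pthread_create_from_session and the Genode types are taken from the diff further down.

#include <internal/thread_create.h>  /* Libc::pthread_create_from_session */
#include <base/affinity.h>           /* Genode::Affinity */
#include <base/env.h>                /* Genode::Env */
#include <util/string.h>             /* Genode::String */
#include <pthread.h>
#include <vector>

/* Illustrative worker entry point; the commit passes Worker::entry here. */
static void *worker_entry(void *arg)
{
    (void)arg;           /* would run the worker's task loop */
    return nullptr;
}

/* Create one thread per CPU of the affinity space, each pinned to its CPU. */
static void spawn_pinned_workers(Genode::Env &env, Genode::Affinity::Space const space,
                                 std::vector<pthread_t> &threads)
{
    for (unsigned cpu = 0; cpu < space.total(); ++cpu) {
        Genode::String<32> const name { "worker", cpu };

        Libc::pthread_create_from_session(&threads[cpu], worker_entry, nullptr,
                                          4 * 4096 /* stack bytes */, name.string(),
                                          &env.cpu(), space.location_of_index(cpu));
    }
}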

Michael Mueller
2024-02-13 19:43:14 +01:00
parent c177c8cf50
commit 5f2cba19d8
2 changed files with 44 additions and 28 deletions


@@ -6,24 +6,29 @@
 #include <thread>
 #include <vector>
 #include <base/log.h>
+#include <mx/tasking/runtime.h>
+#include <internal/thread_create.h>
+#include <base/affinity.h>

 using namespace mx::tasking;

+std::uint64_t *volatile mx::tasking::runtime::_signal_page;

 Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t prefetch_distance,
-                     memory::dynamic::Allocator &resource_allocator) noexcept
+                     memory::dynamic::Allocator &resource_allocator, std::uint64_t * volatile signal_page) noexcept
     : _core_set(core_set), _count_channels(core_set.size()), _worker({}), _channel_numa_node_map({0U}),
       _epoch_manager(core_set.size(), resource_allocator, _is_running), _statistic(_count_channels)
 {
     this->_worker.fill(nullptr);
     this->_channel_numa_node_map.fill(0U);

-    for (auto worker_id = 0U; worker_id < this->_count_channels; ++worker_id)
+    for (auto worker_id = 0U; worker_id < mx::system::Environment::topo().global_affinity_space().total(); ++worker_id)
     {
         const auto core_id = this->_core_set[worker_id];
         this->_channel_numa_node_map[worker_id] = system::topology::node_id(core_id);

         auto ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker));
         this->_worker[worker_id] =
             new (ptr)
-                Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running,
+                Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running,
                        prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
                        this->_statistic);
     }
@@ -42,41 +47,52 @@ Scheduler::~Scheduler() noexcept
 void Scheduler::start_and_wait()
 {
     // Create threads for worker...
-    std::vector<std::thread> worker_threads(this->_core_set.size() +
+    /*std::vector<std::thread> worker_threads(this->_core_set.size() +
                                             static_cast<std::uint16_t>(config::memory_reclamation() != config::None));
     for (auto channel_id = 0U; channel_id < this->_core_set.size(); ++channel_id)
     {
         worker_threads[channel_id] = std::thread([this, channel_id] { this->_worker[channel_id]->execute(); });
         //system::thread::pin(worker_threads[channel_id], this->_worker[channel_id]->core_id());
-    }
+    }*/
+
+    Genode::Affinity::Space space = mx::system::Environment::topo().global_affinity_space();
+    std::vector<pthread_t> worker_threads(space.total() +
+                                          static_cast<std::uint16_t>(config::memory_reclamation() != config::None));
+    for (auto cpu = 0U; cpu < space.total(); ++cpu) {
+        Genode::String<32> const name{"worker", cpu};
+        Libc::pthread_create_from_session(&worker_threads[cpu], Worker::entry, _worker[cpu], 4 * 4096, name.string(),
+                                          &mx::system::Environment::cpu(), space.location_of_index(cpu));
+    }
     // ... and epoch management (if enabled).
     if constexpr (config::memory_reclamation() != config::None)
     {
         // In case we enable memory reclamation: Use an additional thread.
-        worker_threads[this->_core_set.size()] =
-            std::thread([this] { this->_epoch_manager.enter_epoch_periodically(); });
+        Libc::pthread_create_from_session(
+            &worker_threads[space.total()], mx::memory::reclamation::EpochManager::enter, &this->_epoch_manager,
+            4 * 4096, "epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total()));
     }
     // Turning the flag on starts all worker threads to execute tasks.
     this->_is_running = true;

     // Wait for the worker threads to end. This will only
     // reached when the _is_running flag is set to false
     // from somewhere in the application.
     for (auto &worker_thread : worker_threads)
     {
-        worker_thread.join();
+        pthread_join(worker_thread, 0);
     }

     if constexpr (config::memory_reclamation() != config::None)
     {
         // At this point, no task will execute on any resource;
         // but the epoch manager has joined, too. Therefore,
         // we will reclaim all memory manually.
         this->_epoch_manager.reclaim_all();
     }
 }

 void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channel_id) noexcept


@@ -27,7 +27,7 @@ class Scheduler
 {
   public:
     Scheduler(const util::core_set &core_set, std::uint16_t prefetch_distance,
-              memory::dynamic::Allocator &resource_allocator) noexcept;
+              memory::dynamic::Allocator &resource_allocator, std::uint64_t * volatile signal_page) noexcept;

     ~Scheduler() noexcept;

     /**
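
On the caller side, the widened constructor now takes the signal page as a fourth argument. A minimal usage sketch, assuming the caller already holds a core set, an allocator, and the mapped signal page; the variable names are placeholders, not from the commit:

/* Sketch: constructing the scheduler with the new signal-page parameter.
 * 'cores', 'allocator', and 'signal_page' are assumed to exist in the caller. */
std::uint64_t *volatile signal_page = nullptr;   /* mapped elsewhere by the runtime */

mx::tasking::Scheduler scheduler{cores,          /* util::core_set              */
                                 0U,             /* prefetch_distance           */
                                 allocator,      /* memory::dynamic::Allocator& */
                                 signal_page};

scheduler.start_and_wait();                      /* returns once _is_running is cleared */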