mirror of
https://github.com/mmueller41/mxtasking.git
synced 2026-01-21 12:42:57 +01:00
Worker and channel allocation adapted to Genode's memory management.
@@ -138,6 +138,8 @@ public:
         return _occupancy.has_excessive_usage_prediction();
     }
 
+    std::uint8_t numa_node_id() { return _numa_node_id; }
+
 private:
     // Backend queues for multiple produces in different NUMA regions and different priorities,
     alignas(64)
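Note on this hunk: the channel caches the NUMA node it was created on and now exposes it, so teardown code can recover the node id from the object itself instead of querying the topology again (the scheduler destructor below relies on this). A minimal self-contained sketch of that pattern; the surrounding class is not shown in the diff, so everything besides the accessor line is illustrative:

#include <cstdint>
#include <iostream>

class Channel
{
public:
    explicit Channel(std::uint8_t numa_node_id) : _numa_node_id(numa_node_id) {}

    // The accessor added by this commit.
    std::uint8_t numa_node_id() { return _numa_node_id; }

private:
    std::uint8_t _numa_node_id;
};

int main()
{
    Channel channel{1};
    std::cout << "channel lives on NUMA node " << unsigned{channel.numa_node_id()} << '\n';
}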
@@ -2,10 +2,10 @@
 #include <cassert>
 #include <mx/memory/global_heap.h>
 #include <mx/synchronization/synchronization.h>
-#include <mx/system/thread.h>
 #include <mx/system/topology.h>
 #include <thread>
 #include <vector>
+#include <base/log.h>
 
 using namespace mx::tasking;
 
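The include swap mirrors the port: mx/system/thread.h loses its only call site (the pin() call is commented out below), while Genode's logging header comes in. For reference, this is the kind of call <base/log.h> enables; the diff shows no call site, so the motivation is assumed, and the snippet only builds inside a Genode environment:

#include <base/log.h>

void report_channels(unsigned count)
{
    // Genode::log() takes a variadic argument list and emits one LOG line.
    Genode::log("scheduler configured with ", count, " channels");
}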
@@ -16,13 +16,14 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre
 {
     this->_worker.fill(nullptr);
     this->_channel_numa_node_map.fill(0U);
 
     for (auto worker_id = 0U; worker_id < this->_count_channels; ++worker_id)
     {
         const auto core_id = this->_core_set[worker_id];
         this->_channel_numa_node_map[worker_id] = system::topology::node_id(core_id);
+        auto ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker));
         this->_worker[worker_id] =
-            new (memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker)))
+            new (ptr)
                 Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], this->_is_running,
                        prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
                        this->_statistic);
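Hoisting the allocation out of the placement-new expression makes the two-phase construction explicit: first reserve node-local raw memory, then construct the Worker in that storage. A self-contained sketch of the pattern, with plain malloc standing in for memory::GlobalHeap::allocate (an assumption; only part of that interface is visible in this diff):

#include <cstdint>
#include <cstdlib>
#include <new>

struct Worker
{
    explicit Worker(std::uint16_t id) : _id(id) {}
    std::uint16_t _id;
};

// Stand-in for memory::GlobalHeap::allocate(node, size); assumed to return
// `size` bytes backed by memory on the given NUMA node.
void *node_allocate(std::uint8_t /*numa_node_id*/, std::size_t size)
{
    return std::malloc(size);
}

int main()
{
    const std::uint8_t node_id = 0;

    // Phase 1: reserve raw, node-local storage.
    void *ptr = node_allocate(node_id, sizeof(Worker));
    if (ptr == nullptr)
        return 1;

    // Phase 2: construct the object in place, exactly as the constructor
    // hunk does with `new (ptr) Worker(...)`.
    Worker *worker = new (ptr) Worker(0);

    worker->~Worker();
    std::free(ptr);
}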
@@ -33,8 +33,9 @@ Scheduler::~Scheduler() noexcept
 {
     for (auto *worker : this->_worker)
     {
+        std::uint8_t node_id = worker->channel().numa_node_id();
         worker->~Worker();
-        memory::GlobalHeap::free(worker, sizeof(Worker));
+        memory::GlobalHeap::free(worker, sizeof(Worker), node_id);
     }
 }
 
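The ordering in this hunk matters: the node id is read while the Worker is still alive, because invoking a member function after ~Worker() has run is undefined behavior; afterwards only the pointer value may be passed to the free. A self-contained sketch with stand-ins for the GlobalHeap calls (assumed semantics):

#include <cstdint>
#include <cstdlib>
#include <new>

struct Worker
{
    explicit Worker(std::uint8_t node) : _node(node) {}
    std::uint8_t numa_node_id() { return _node; }
    std::uint8_t _node;
};

// Stand-in for memory::GlobalHeap::free(ptr, size, node); assumed to return
// the block to the allocator of the given NUMA node.
void node_free(void *ptr, std::size_t /*size*/, std::uint8_t /*numa_node_id*/)
{
    std::free(ptr);
}

int main()
{
    void *raw = std::malloc(sizeof(Worker));
    if (raw == nullptr)
        return 1;
    Worker *worker = new (raw) Worker(0);

    // Read the node id while the object is still alive ...
    const std::uint8_t node_id = worker->numa_node_id();
    // ... then destroy it; from here on only the pointer value is used.
    worker->~Worker();
    node_free(worker, sizeof(Worker), node_id);
}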
@@ -47,7 +48,7 @@ void Scheduler::start_and_wait()
     {
         worker_threads[channel_id] = std::thread([this, channel_id] { this->_worker[channel_id]->execute(); });
 
-        system::thread::pin(worker_threads[channel_id], this->_worker[channel_id]->core_id());
+        //system::thread::pin(worker_threads[channel_id], this->_worker[channel_id]->core_id());
     }
 
     // ... and epoch management (if enabled).
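With the pin call disabled, worker threads are left to the host scheduler. For context, a Linux-style pin() helper typically wraps pthread affinity as sketched below; this is an assumption about system::thread::pin, whose implementation is not part of this diff. Genode assigns CPU affinity through its own session interfaces rather than pthreads, which would explain disabling the call in this port:

// Assumed Linux-style implementation of a pin() helper; not the actual
// mxtasking code. pthread_setaffinity_np needs _GNU_SOURCE, which g++
// predefines when compiling C++.
#include <pthread.h>
#include <sched.h>
#include <thread>

static void pin(std::thread &thread, unsigned core_id)
{
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(core_id, &set);
    // On Linux/glibc, std::thread::native_handle() is a pthread_t.
    pthread_setaffinity_np(thread.native_handle(), sizeof(cpu_set_t), &set);
}

int main()
{
    std::thread worker([] { /* worker loop would run here */ });
    pin(worker, 0); // keep the worker on core 0
    worker.join();
}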