mirror of
https://github.com/mmueller41/mxtasking.git
synced 2026-01-21 12:42:57 +01:00
Manage channels seperate from workers. This enables a channels to be migrated between worker threads without impacting the former's functionality.
This commit is contained in:
@@ -82,6 +82,7 @@ public:
|
||||
{
|
||||
size = fill<priority::low>(config::task_buffer_size());
|
||||
}
|
||||
_size = size;
|
||||
|
||||
return size;
|
||||
}
|
||||
@@ -140,6 +141,10 @@ public:
|
||||
|
||||
std::uint8_t numa_node_id() { return _numa_node_id; }
|
||||
|
||||
std::uint32_t size() { return _size; }
|
||||
|
||||
std::uint32_t decrement() { return --_size; }
|
||||
|
||||
private:
|
||||
// Backend queues for multiple produces in different NUMA regions and different priorities,
|
||||
alignas(64)
|
||||
@@ -154,6 +159,9 @@ private:
|
||||
// Id of this channel.
|
||||
const std::uint16_t _id;
|
||||
|
||||
// Size of this channel
|
||||
std::int32_t _size{0U};
|
||||
|
||||
// NUMA id of the worker thread owning this channel.
|
||||
const std::uint8_t _numa_node_id;
|
||||
|
||||
|
||||
@@ -33,6 +33,11 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre
|
||||
Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running,
|
||||
prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
|
||||
this->_statistic);
|
||||
ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Channel));
|
||||
this->_channels[worker_id] =
|
||||
new (ptr) Channel(worker_id, this->_channel_numa_node_map[worker_id], prefetch_distance);
|
||||
this->_worker[worker_id]->assign(this->_channels[worker_id]);
|
||||
Genode::log("Channel ", worker_id, " created at ", _channels[worker_id]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,10 +45,17 @@ Scheduler::~Scheduler() noexcept
|
||||
{
|
||||
for (auto *worker : this->_worker)
|
||||
{
|
||||
std::uint8_t node_id = worker->channel().numa_node_id();
|
||||
std::uint8_t node_id = worker->numa_id();
|
||||
worker->~Worker();
|
||||
memory::GlobalHeap::free(worker, sizeof(Worker), node_id);
|
||||
}
|
||||
|
||||
for (auto *channel : this->_channels)
|
||||
{
|
||||
std::uint8_t node_id = channel->numa_node_id();
|
||||
channel->~Channel();
|
||||
memory::GlobalHeap::free(channel, sizeof(Channel), node_id);
|
||||
}
|
||||
}
|
||||
|
||||
void Scheduler::start_and_wait()
|
||||
@@ -64,13 +76,16 @@ void Scheduler::start_and_wait()
|
||||
|
||||
Nova::mword_t start_cpu = 0;
|
||||
Nova::cpu_id(start_cpu);
|
||||
|
||||
for (auto cpu = 1U; cpu < space.total(); ++cpu) {
|
||||
|
||||
Genode::Trace::Timestamp start = Genode::Trace::timestamp();
|
||||
for (auto cpu = 1U; cpu < space.total(); ++cpu)
|
||||
{
|
||||
Genode::String<32> const name{"worker", cpu};
|
||||
Libc::pthread_create_from_session(&worker_threads[cpu], Worker::entry, _worker[cpu], 4 * 4096, name.string(),
|
||||
&mx::system::Environment::envp()->cpu(), space.location_of_index(cpu));
|
||||
}
|
||||
|
||||
Genode::Trace::Timestamp end = Genode::Trace::timestamp();
|
||||
Genode::log("Worker started in ", (end - start), " cycles");
|
||||
|
||||
Genode::log("Creating foreman thread on CPU ", start_cpu);
|
||||
|
||||
@@ -83,28 +98,29 @@ void Scheduler::start_and_wait()
|
||||
if constexpr (config::memory_reclamation() != config::None)
|
||||
{
|
||||
// In case we enable memory reclamation: Use an additional thread.
|
||||
Libc::pthread_create_from_session(
|
||||
&worker_threads[space.total()], mx::memory::reclamation::EpochManager::enter, &this->_epoch_manager, 4 * 4096,
|
||||
"epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total());
|
||||
}
|
||||
"epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total()));
|
||||
}
|
||||
|
||||
// Turning the flag on starts all worker threads to execute tasks.
|
||||
this->_is_running = true;
|
||||
// Turning the flag on starts all worker threads to execute tasks.
|
||||
this->_is_running = true;
|
||||
|
||||
// Wait for the worker threads to end. This will only
|
||||
// reached when the _is_running flag is set to false
|
||||
// from somewhere in the application.
|
||||
for (auto &worker_thread : worker_threads)
|
||||
{
|
||||
pthread_join(worker_thread, 0);
|
||||
}
|
||||
// Wait for the worker threads to end. This will only
|
||||
// reached when the _is_running flag is set to false
|
||||
// from somewhere in the application.
|
||||
for (auto &worker_thread : worker_threads)
|
||||
{
|
||||
pthread_join(worker_thread, 0);
|
||||
}
|
||||
|
||||
if constexpr (config::memory_reclamation() != config::None)
|
||||
{
|
||||
// At this point, no task will execute on any resource;
|
||||
// but the epoch manager has joined, too. Therefore,
|
||||
// we will reclaim all memory manually.
|
||||
this->_epoch_manager.reclaim_all();
|
||||
}
|
||||
if constexpr (config::memory_reclamation() != config::None)
|
||||
{
|
||||
// At this point, no task will execute on any resource;
|
||||
// but the epoch manager has joined, too. Therefore,
|
||||
// we will reclaim all memory manually.
|
||||
this->_epoch_manager.reclaim_all();
|
||||
}
|
||||
}
|
||||
|
||||
void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channel_id) noexcept
|
||||
@@ -121,7 +137,7 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
|
||||
if (Scheduler::keep_task_local(task.is_readonly(), annotated_resource.synchronization_primitive(),
|
||||
resource_channel_id, current_channel_id))
|
||||
{
|
||||
this->_worker[current_channel_id]->channel().push_back_local(&task);
|
||||
this->_channels[current_channel_id]->push_back_local(&task);
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
this->_statistic.increment<profiling::Statistic::ScheduledOnChannel>(current_channel_id);
|
||||
@@ -129,7 +145,7 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
|
||||
}
|
||||
else
|
||||
{
|
||||
this->_worker[resource_channel_id]->channel().push_back_remote(&task,
|
||||
this->_channels[resource_channel_id]->push_back_remote(&task,
|
||||
this->numa_node_id(current_channel_id));
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
@@ -147,7 +163,7 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
|
||||
// whenever possible to spawn the task.
|
||||
if (target_channel_id == current_channel_id)
|
||||
{
|
||||
this->_worker[current_channel_id]->channel().push_back_local(&task);
|
||||
this->_channels[current_channel_id]->push_back_local(&task);
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
this->_statistic.increment<profiling::Statistic::ScheduledOnChannel>(current_channel_id);
|
||||
@@ -155,7 +171,7 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
|
||||
}
|
||||
else
|
||||
{
|
||||
this->_worker[target_channel_id]->channel().push_back_remote(&task, this->numa_node_id(current_channel_id));
|
||||
this->_channels[target_channel_id]->push_back_remote(&task, this->numa_node_id(current_channel_id));
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
this->_statistic.increment<profiling::Statistic::ScheduledOffChannel>(current_channel_id);
|
||||
@@ -173,7 +189,7 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
|
||||
// The task can run everywhere.
|
||||
else
|
||||
{
|
||||
this->_worker[current_channel_id]->channel().push_back_local(&task);
|
||||
this->_channels[current_channel_id]->push_back_local(&task);
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
this->_statistic.increment<profiling::Statistic::ScheduledOnChannel>(current_channel_id);
|
||||
@@ -191,7 +207,7 @@ void Scheduler::schedule(TaskInterface &task) noexcept
|
||||
if (task.has_resource_annotated())
|
||||
{
|
||||
const auto &annotated_resource = task.annotated_resource();
|
||||
this->_worker[annotated_resource.channel_id()]->channel().push_back_remote(&task, 0U);
|
||||
this->_channels[annotated_resource.channel_id()]->push_back_remote(&task, 0U);
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
this->_statistic.increment<profiling::Statistic::ScheduledOffChannel>(annotated_resource.channel_id());
|
||||
@@ -199,7 +215,7 @@ void Scheduler::schedule(TaskInterface &task) noexcept
|
||||
}
|
||||
else if (task.has_channel_annotated())
|
||||
{
|
||||
this->_worker[task.annotated_channel()]->channel().push_back_remote(&task, 0U);
|
||||
this->_channels[task.annotated_channel()]->push_back_remote(&task, 0U);
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
this->_statistic.increment<profiling::Statistic::ScheduledOffChannel>(task.annotated_channel());
|
||||
@@ -227,6 +243,7 @@ void Scheduler::profile(const std::string &output_file)
|
||||
this->_profiler.profile(output_file);
|
||||
for (auto i = 0U; i < this->_count_channels; ++i)
|
||||
{
|
||||
this->_profiler.profile(this->_is_running, this->_worker[i]->channel());
|
||||
Genode::log("Profiling channel ", i, " at ", this->_channels[i]);
|
||||
this->_profiler.profile(this->_is_running, *(this->_channels[i]));
|
||||
}
|
||||
}
|
||||
@@ -98,7 +98,7 @@ public:
|
||||
*/
|
||||
void predict_usage(const std::uint16_t channel_id, const resource::hint::expected_access_frequency usage) noexcept
|
||||
{
|
||||
_worker[channel_id]->channel().predict_usage(usage);
|
||||
_channels[channel_id]->predict_usage(usage);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -111,7 +111,7 @@ public:
|
||||
const resource::hint::expected_access_frequency old_prediction,
|
||||
const resource::hint::expected_access_frequency new_prediction) noexcept
|
||||
{
|
||||
_worker[channel_id]->channel().modify_predicted_usage(old_prediction, new_prediction);
|
||||
_channels[channel_id]->modify_predicted_usage(old_prediction, new_prediction);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -120,7 +120,7 @@ public:
|
||||
*/
|
||||
[[nodiscard]] bool has_excessive_usage_prediction(const std::uint16_t channel_id) const noexcept
|
||||
{
|
||||
return _worker[channel_id]->channel().has_excessive_usage_prediction();
|
||||
return _channels[channel_id]->has_excessive_usage_prediction();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -179,7 +179,7 @@ private:
|
||||
const util::core_set _core_set;
|
||||
|
||||
// Number of all channels.
|
||||
const std::uint16_t _count_channels;
|
||||
std::uint16_t _count_channels;
|
||||
|
||||
// Flag for the worker threads. If false, the worker threads will stop.
|
||||
// This is atomic for hardware that does not guarantee atomic reads/writes of booleans.
|
||||
@@ -188,6 +188,8 @@ private:
|
||||
// All initialized workers.
|
||||
alignas(64) std::array<Worker *, config::max_cores()> _worker{nullptr};
|
||||
|
||||
alignas(64) std::array<Channel *, config::max_cores()> _channels{nullptr};
|
||||
|
||||
// Map of channel id to NUMA region id.
|
||||
alignas(64) std::array<std::uint8_t, config::max_cores()> _channel_numa_node_map{0U};
|
||||
|
||||
|
||||
@@ -20,8 +20,8 @@ Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const
|
||||
const util::maybe_atomic<bool> &is_running, const std::uint16_t prefetch_distance,
|
||||
memory::reclamation::LocalEpoch &local_epoch,
|
||||
const std::atomic<memory::reclamation::epoch_t> &global_epoch, profiling::Statistic &statistic) noexcept
|
||||
: _target_core_id(target_core_id), _tukija_signal(tukija_sig), _prefetch_distance(prefetch_distance),
|
||||
_channel(id, target_numa_node_id, prefetch_distance), _local_epoch(local_epoch), _global_epoch(global_epoch),
|
||||
: _target_core_id(target_core_id), _target_numa_node_id(target_numa_node_id), _tukija_signal(tukija_sig), _prefetch_distance(prefetch_distance),
|
||||
_id(id), _local_epoch(local_epoch), _global_epoch(global_epoch),
|
||||
_statistic(statistic), _is_running(is_running)
|
||||
{
|
||||
}
|
||||
@@ -34,7 +34,7 @@ void Worker::execute()
|
||||
|
||||
self->pin(loc);
|
||||
}*/
|
||||
if (_channel.id() != 0)
|
||||
if (_id != 0)
|
||||
sleep();
|
||||
|
||||
while (this->_is_running == false)
|
||||
@@ -46,7 +46,6 @@ void Worker::execute()
|
||||
TaskInterface *task;
|
||||
const auto core_id = system::topology::core_id();
|
||||
//assert(this->_target_core_id == core_id && "Worker not pinned to correct core.");
|
||||
const auto channel_id = this->_channel.id();
|
||||
Nova::mword_t pcpu = 0;
|
||||
Nova::cpu_id(pcpu);
|
||||
|
||||
@@ -55,6 +54,9 @@ void Worker::execute()
|
||||
//Genode::log("Worker ", _channel.id(), "(", _phys_core_id ,")", " woke up");
|
||||
std::uint64_t *volatile tukija_signal = &_tukija_signal[_phys_core_id];
|
||||
|
||||
this->current = this->_channels.pop_front();
|
||||
auto channel_id = current->id();
|
||||
|
||||
while (this->_is_running)
|
||||
{
|
||||
if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically)
|
||||
@@ -62,9 +64,9 @@ void Worker::execute()
|
||||
this->_local_epoch.enter(this->_global_epoch);
|
||||
}
|
||||
|
||||
this->_channel_size = this->_channel.fill();
|
||||
this->current->fill();
|
||||
|
||||
if (this->_channel_size == 0) {
|
||||
if (this->current->size() == 0) {
|
||||
//Genode::log("Channel ", _channel.id(), " empty. Going to sleep");
|
||||
sleep();
|
||||
//Genode::log("Worker on CPU ", _phys_core_id, " woke up at ", Genode::Trace::timestamp());
|
||||
@@ -75,19 +77,19 @@ void Worker::execute()
|
||||
this->_statistic.increment<profiling::Statistic::Fill>(channel_id);
|
||||
}
|
||||
|
||||
while ((task = this->_channel.next()) != nullptr)
|
||||
while ((task = this->current->next()) != nullptr)
|
||||
{
|
||||
// Whenever the worker-local task-buffer falls under
|
||||
// the prefetch distance, we re-fill the buffer to avoid
|
||||
// empty slots in the prefetch-buffer.
|
||||
if (--this->_channel_size <= this->_prefetch_distance)
|
||||
if (this->current->decrement() <= this->_prefetch_distance)
|
||||
{
|
||||
if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically)
|
||||
{
|
||||
this->_local_epoch.enter(this->_global_epoch);
|
||||
}
|
||||
|
||||
this->_channel_size = this->_channel.fill();
|
||||
this->current->fill();
|
||||
if constexpr (config::task_statistics())
|
||||
{
|
||||
this->_statistic.increment<profiling::Statistic::Fill>(channel_id);
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <vector>
|
||||
#include <nova/syscalls.h>
|
||||
#include <base/log.h>
|
||||
#include <mx/util/bound_mpmc_queue.h>
|
||||
|
||||
namespace mx::tasking {
|
||||
/**
|
||||
@@ -56,32 +57,49 @@ public:
|
||||
|
||||
bool sleeping() { return _is_sleeping; }
|
||||
|
||||
/**
|
||||
* Steal a channel from another worker
|
||||
*/
|
||||
bool steal();
|
||||
|
||||
/**
|
||||
* Assign a channel to this worker
|
||||
* This method is called by the scheduler to assign each worker an initial channel upon start. That's because initially none of the workers has any channel assigned yet. If it would try to steal one from another worker upon initialization it would not find a channel to begin with. Since this would apply for all channels the application would just stall forever, never finding a channel to steal.
|
||||
*/
|
||||
void assign(Channel *channel) { _channels.push_back(channel); }
|
||||
|
||||
/**
|
||||
* @return Id of the logical core this worker runs on.
|
||||
*/
|
||||
[[nodiscard]] std::uint16_t core_id() const noexcept { return _target_core_id; }
|
||||
|
||||
[[nodiscard]] Channel &channel() noexcept { return _channel; }
|
||||
[[nodiscard]] const Channel &channel() const noexcept { return _channel; }
|
||||
/*[[nodiscard]] Channel &channel() noexcept { return _channel; }
|
||||
[[nodiscard]] const Channel &channel() const noexcept { return _channel; }*/
|
||||
|
||||
[[nodiscard]] std::uint16_t numa_id() const noexcept { return _target_numa_node_id; }
|
||||
|
||||
private:
|
||||
// Id of the logical core.
|
||||
const std::uint16_t _target_core_id;
|
||||
|
||||
const std::uint16_t _target_numa_node_id;
|
||||
|
||||
// Distance of prefetching tasks.
|
||||
const std::uint16_t _prefetch_distance;
|
||||
|
||||
std::uint16_t _phys_core_id{0};
|
||||
|
||||
std::int32_t _channel_size{0U};
|
||||
std::uint16_t _id{0};
|
||||
// std::int32_t _channel_size{0U};
|
||||
|
||||
// Stack for persisting tasks in optimistic execution. Optimistically
|
||||
// executed tasks may fail and be restored after execution.
|
||||
alignas(64) TaskStack _task_stack;
|
||||
|
||||
// Channel where tasks are stored for execution.
|
||||
alignas(64) Channel _channel;
|
||||
alignas(64) util::BoundMPMCQueue<Channel *> _channels{config::max_cores()};
|
||||
|
||||
alignas(64) Channel *current{nullptr};
|
||||
|
||||
// Local epoch of this worker.
|
||||
memory::reclamation::LocalEpoch &_local_epoch;
|
||||
|
||||
Reference in New Issue
Block a user