Use new allocator for determining victim queues.

Michael Mueller
2024-10-28 13:25:58 +01:00
parent ea6c00de24
commit 2925ae7737
6 changed files with 169 additions and 113 deletions

View File

@@ -39,6 +39,7 @@ public:
}
~Channel() noexcept = default;
/**
* @return Identifier of the channel.
*/
@@ -47,7 +48,10 @@ public:
/**
* @return The next task to be executed.
*/
TaskInterface *next() noexcept { return _task_buffer.next(); }
TaskInterface *next_task() noexcept { return _task_buffer.next(); }
Channel *next() noexcept { return _next; }
void next(Channel *c) { _next = c; }
/**
* Schedules the task to a thread-safe queue with regard to the NUMA region
@@ -177,6 +181,8 @@ private:
// Is the channel currently processed by a worker
alignas(64) std::atomic<tasking::priority> _phase{priority::normal};
alignas(64) Channel *_next{nullptr};
/**
* Fills the task buffer with tasks scheduled with a given priority.
*

View File

@@ -20,13 +20,14 @@ std::uint64_t *volatile mx::tasking::runtime::_signal_page;
Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t prefetch_distance,
memory::dynamic::Allocator &resource_allocator, std::uint64_t * volatile signal_page) noexcept
: _core_set(core_set), _count_channels(core_set.size()), _vacant_channels(_count_channels), _worker({}), _channel_numa_node_map({0U}),
: _core_set(core_set), _count_channels(core_set.size()), _worker({}), _channel_numa_node_map({0U}),
_epoch_manager(core_set.size(), resource_allocator, _is_running), _statistic(_count_channels)
{
this->_worker.fill(nullptr);
this->_channel_numa_node_map.fill(0U);
Genode::log("Initializing scheduler");
for (auto worker_id = 0U; worker_id < mx::system::Environment::topo().global_affinity_space().total(); ++worker_id)
auto worker_count = system::Environment::topo().global_affinity_space().total();
for (auto worker_id = 0U; worker_id < worker_count; ++worker_id)
{
const auto core_id = worker_id;
this->_channel_numa_node_map[worker_id] = system::topology::node_id(core_id);
@@ -34,13 +35,12 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre
auto ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker));
this->_worker[worker_id] =
new (ptr)
Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running, _vacant_channels, _stealing_limit, _remainder_channel_count,
Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running, _vacant_channels_alloc, _remainder_channel_count,
prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
this->_statistic);
ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Channel));
this->_channels[worker_id] =
new (ptr) Channel(worker_id, this->_channel_numa_node_map[worker_id], prefetch_distance);
_vacant_channels.push_back(_channels[worker_id]);
Genode::log("Channel ", worker_id, " created at ", _channels[worker_id]);
}
}
@@ -92,8 +92,8 @@ void Scheduler::start_and_wait()
Genode::log("Worker started in ", (end - start), " cycles");
Genode::log("Creating foreman thread on CPU ", start_cpu);
Channel *qf = _vacant_channels.pop_front();
Channel *qf = _channels[0];
_worker[0]->assign(qf);
Libc::pthread_create_from_session(&worker_threads[0], Worker::entry, _worker[0], 4 * 4096, "foreman",
@@ -111,7 +111,7 @@ void Scheduler::start_and_wait()
Nova::core_allocation(allocation, false);
Genode::log("Initial cores ", allocation);
this->allocate_cores(_core_set.size()-1);
this->allocate_cores(_count_channels);
// ... and epoch management (if enabled).
if constexpr (config::memory_reclamation() != config::None)
@@ -265,8 +265,4 @@ void Scheduler::profile(const std::string &output_file)
Genode::log("Profiling channel ", i, " at ", this->_channels[i]);
this->_profiler.profile(this->_is_running, *(this->_channels[i]));
}
}
Channel* Scheduler::steal_for(Worker *thief)
{
}

View File

@@ -16,8 +16,10 @@
#include <mx/tasking/profiling/statistic.h>
#include <mx/util/core_set.h>
#include <mx/util/random.h>
#include <mx/util/bit_alloc.h>
#include <string>
#include <cmath>
#include <mx/util/field_alloc.h>
namespace mx::tasking {
/**
@@ -57,7 +59,6 @@ public:
{
_is_running = false;
//Genode::log("Stopping runtime");
Genode::log("Waiting for ", _active_worker_count.load(), " workers to finish.");
while (_active_worker_count > 1)
system::builtin::pause();
Nova::mword_t pcpu = 0;
@@ -82,7 +83,7 @@ public:
Nova::core_allocation(allocation, false);
//Genode::log("Allocation before resume ", allocation);
allocate_cores(_core_set.size()-1);
allocate_cores(_count_channels);
_is_running = true;
}
@@ -93,14 +94,10 @@ public:
return _worker_at_core[pcpu];
}
[[nodiscard]] inline Channel *get_channel(std::uint64_t index) { return _channels[index]; }
[[nodiscard]] inline std::uint16_t active_workers() const noexcept { return _active_worker_count; }
Channel *steal_for(Worker *thief);
[[nodiscard]] inline void set_stealing_limit(std::uint16_t workers) {
_stealing_limit = std::ceil(static_cast<float>(_vacant_channels.size()) / static_cast<float>(workers));
}
/**
* @return Core set of this instance.
*/
@@ -232,9 +229,6 @@ public:
}
}
inline void add_vacant_channel(Channel *channel) { _vacant_channels.push_back(channel);
}
/**
* Starts profiling of idle times and specifies the results file.
* @param output_file File to write idle times after stopping MxTasking.
@@ -251,9 +245,8 @@ public:
Nova::mword_t remainder = 0;
Nova::alloc_cores(cores, allocation, remainder);
std::bitset<config::max_cores()> allocated(allocation);
_remainder_channel_count.store(remainder);
_remainder_channel_count.store(allocated.count());
//Genode::log("Allocated ", allocation, " with ", allocated.count(), " workers and ", remainder, " excess queues.");
}
private:
@@ -277,9 +270,8 @@ private:
alignas(64) std::array<Channel *, config::max_cores()> _channels{nullptr};
alignas(64) util::BoundMPMCQueue<Channel *> _vacant_channels{config::max_cores()};
alignas(64) mx::util::Field_Allocator<config::max_cores()> _vacant_channels_alloc{63};
alignas(64) std::atomic<std::int32_t> _remainder_channel_count{0};
alignas(64) util::maybe_atomic<std::uint16_t> _stealing_limit{0};
// Map of channel id to NUMA region id.
alignas(64) std::array<std::uint8_t, config::max_cores()> _channel_numa_node_map{0U};

View File

@@ -20,42 +20,47 @@
using namespace mx::tasking;
Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id, std::uint64_t* volatile tukija_sig,
const util::maybe_atomic<bool> &is_running, util::BoundMPMCQueue<Channel *> &v, util::maybe_atomic<std::uint16_t> &s, std::atomic<std::int32_t> &e, const std::uint16_t prefetch_distance,
const util::maybe_atomic<bool> &is_running, util::Field_Allocator<config::max_cores()> &v, std::atomic<std::int32_t> &e, const std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch,
const std::atomic<memory::reclamation::epoch_t> &global_epoch, profiling::Statistic &statistic) noexcept
: _target_core_id(target_core_id), _target_numa_node_id(target_numa_node_id), _tukija_signal(reinterpret_cast<struct InfoPage*>(tukija_sig)), _prefetch_distance(prefetch_distance),
_id(id), _local_epoch(local_epoch), _global_epoch(global_epoch),
_statistic(statistic), _is_running(is_running), _vacant_channels(v), _stealing_limit(s), _excess_queues(e)
_id(id), _rng(std::mt19937(id)), _local_epoch(local_epoch), _global_epoch(global_epoch),
_statistic(statistic), _is_running(is_running), _vacant_channels(v), _excess_queues(e)
{
}
void Worker::yield_channels(std::uint16_t num, Channel *except) {
Channel *chan;
do {
chan = _channels.pop_front_or(nullptr);
chan = _channels.pop_front();
if (chan && chan != except) {
//Genode::log("Worker ", _id, " returns channel ", chan->id());
_vacant_channels.push_back(chan);
_vacant_channels.release(chan->id());
_count_channels--;
}
num--;
} while (num && chan);
if (current && current != except) {
_vacant_channels.push_back(current);
_vacant_channels.release(current->id());
//Genode::log("Worker ", _id, " returns channel ", current->id());
/* If yield_channels() is called by the foreman, this most likely happened due to a
 * previous call to runtime::stop. In that case the foreman releases all channels
 * except channel 0. However, the foreman may have called runtime::stop while
 * processing a channel other than 0, which no longer belongs to it after this call.
 * To avoid having the foreman keep processing a channel that has already been
 * yielded (and risking nasty race conditions), the foreman takes over the last
 * remaining channel (0). */
if (_id == 0 && current->id() != 0) {
current = _channels.pop_front_or(nullptr);
current = _channels.pop_front();
assert(current != nullptr);
}
else
current = nullptr;
_count_channels--;
}
_count_channels--;
}
Channel* Worker::get(std::uint64_t idx)
{
return runtime::scheduler().get_channel(idx);
}
void Worker::execute()
{
/*{
@@ -73,11 +78,14 @@ void Worker::execute()
if (_id != 0) {
sleep();
} else if (_id == 0) /* foreman */ {
Genode::log("Foreman starting");
current = _channels.pop_front();
}
_is_sleeping = false;
//Genode::log("Waiting for hooter");
wait_for_hooter();
//Genode::log("Hooter sounded");
runtime::scheduler().register_worker(this);
//Genode::log("Worker ", _id, "(", _phys_core_id, ")",
@@ -86,22 +94,20 @@ void Worker::execute()
TaskInterface *task;
const auto core_id = system::topology::core_id();
//assert(this->_target_core_id == core_id && "Worker not pinned to correct core.");
auto phase = priority::normal;
while (true)
{
handle_resume();
while (!current) {
//handle_channel_occupancy();
while (!current)
{
//Genode::log("Worker ",_id,": No queues for me.");
std::uint16_t expect = 0;
bool shall_yield = !__atomic_compare_exchange_n(&_my_page->yield_flag, &expect, 2, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
if (shall_yield) {
handle_yield();
return;
continue;
}
deregister();
sleep();
@@ -116,42 +122,14 @@ void Worker::execute()
auto channel_id = current->id();
if (current->phase() != phase) {
if (phase == priority::normal)
phase = priority::low;
else /* phase == priority::low */ {
phase = priority::normal;
}
//Genode::log("Switched phase to ", (phase == priority::normal)? "normal" : "low");
}
if (phase == priority::normal)
current->fill<priority::normal>();
else {
current->fill<priority::low>();
}
if (phase == priority::normal && current->empty()) {
Genode::Trace::Timestamp thefts_start = Genode::Trace::timestamp();
steal(false);
Genode::Trace::Timestamp theft_stop = Genode::Trace::timestamp();
Genode::Trace::Timestamp cost = theft_stop - thefts_start;
_stealing_cost += cost;
if (cost > _max_cost)
_max_cost = cost;
if ((_min_cost == 0) || (_min_cost > cost))
_min_cost = cost;
_thefts++;
//_channels.push_back(current);
// current = _channels.pop_front_or(nullptr);
}
current->fill();
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::Fill>(channel_id);
}
while ((task = this->current->next()) != nullptr)
while ((task = this->current->next_task()) != nullptr)
{
// Whenever the worker-local task-buffer falls under
// the prefetch distance, we re-fill the buffer to avoid
@@ -221,15 +199,18 @@ void Worker::execute()
{
runtime::delete_task(core_id, task);
}
handle_yield();
}
steal(false);
handle_yield();
handle_stop();
current->switch_phase();
_channels.push_back(current);
current = _channels.pop_front_or(nullptr);
current = _channels.pop_front();
}
}

View File

@@ -15,6 +15,9 @@
#include <nova/syscalls.h>
#include <base/log.h>
#include <mx/util/bound_mpmc_queue.h>
#include <mx/util/field_alloc.h>
#include <mx/util/queue.h>
#include <random>
namespace mx::tasking {
/**
@@ -24,7 +27,7 @@ class alignas(64) Worker
{
public:
Worker(std::uint16_t id, std::uint16_t target_core_id, std::uint16_t target_numa_node_id, std::uint64_t* volatile tukija_sig,
const util::maybe_atomic<bool> &is_running, util::BoundMPMCQueue<Channel *> &v, util::maybe_atomic<std::uint16_t> &s, std::atomic<std::int32_t> &e, std::uint16_t prefetch_distance,
const util::maybe_atomic<bool> &is_running, util::Field_Allocator<config::max_cores()> &v, std::atomic<std::int32_t> &e, std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch, const std::atomic<memory::reclamation::epoch_t> &global_epoch,
profiling::Statistic &statistic) noexcept;
@@ -65,6 +68,34 @@ public:
[[nodiscard]] std::uint16_t number_of_channels() { return _count_channels; }
Channel *get(std::uint64_t idx);
inline bool pick_channel(std::uint64_t offset, std::uint64_t limit, bool change_phase_to_normal = false)
{
//Genode::Trace::Timestamp deq_start = Genode::Trace::timestamp();
std::uint64_t cidx = _vacant_channels.alloc_randomly(offset, limit);
//Genode::Trace::Timestamp deq_stop = Genode::Trace::timestamp();
//_mean_dequeue_cost += deq_stop - deq_start;
//dequeues++;
if (cidx > 0 && cidx < 64) {
//Genode::log("Worker ", _id, "(", _phys_core_id, ") picked channel ", cidx);
Channel *loot = get(cidx);
if (change_phase_to_normal)
loot->phase(priority::normal);
//Genode::Trace::Timestamp enq_start = Genode::Trace::timestamp();
assign(loot);
//Genode::Trace::Timestamp enq_stop = Genode::Trace::timestamp();
//_mean_enqueue_cost += enq_stop - enq_start;
//enqueues++;
return true;
}
return false;
}
inline bool steal(bool init=true)
{
@@ -77,35 +108,11 @@ public:
if (init) {
while (individual_stealing_limit() > 0)
{
Channel *loot = _vacant_channels.pop_front_or(nullptr);
if (loot) {
this->assign(loot);
//Genode::log("Worker ", _id, ": Stole channel ", loot->id());
got_loot |= true;
} else /* no vacant queues anymore, so stop stealing */
if ((got_loot |= pick_channel(_id, stealing_limit())))
break;
}
} else if (_excess_queues.load(std::memory_order_relaxed) <= 0) {
Genode::Trace::Timestamp deq_start = Genode::Trace::timestamp();
Channel *loot = _vacant_channels.pop_front_or(nullptr);
Genode::Trace::Timestamp deq_stop = Genode::Trace::timestamp();
_mean_dequeue_cost += deq_stop - deq_start;
dequeues++;
if (loot)
{
loot->phase(priority::normal);
Genode::Trace::Timestamp enq_start = Genode::Trace::timestamp();
this->assign(loot);
Genode::Trace::Timestamp enq_stop = Genode::Trace::timestamp();
_mean_enqueue_cost += enq_stop - enq_start;
enqueues++;
got_loot |= true;
if (_vacant_channels.size() == 0)
_excess_queues.store(1);
// Genode::log("Worker ", _id, " Took remaining queue ", loot->id(), " with ", _vacant_channels.size(),
// " excess queues");
}
got_loot |= pick_channel(1, 63, true);
}
return got_loot;
@@ -194,12 +201,14 @@ private:
alignas(64) TaskStack _task_stack;
// Channel where tasks are stored for execution.
alignas(64) util::BoundMPMCQueue<Channel *> _channels{config::max_cores()};
alignas(64) util::Queue<Channel> _channels{};
alignas(64) struct InfoPage volatile *_my_page{nullptr};
alignas(64) Channel *current{nullptr};
std::mt19937 _rng;
/**
* Profiling data structures
*/
@@ -225,10 +234,10 @@ private:
const util::maybe_atomic<bool> &_is_running;
// Reference to queue of vacant channels
util::BoundMPMCQueue<Channel *> &_vacant_channels;
util::Field_Allocator<config::max_cores()> &_vacant_channels;
// Limit for stealing
const util::maybe_atomic<std::uint16_t> &_stealing_limit;
// Flag whether this worker may steal channels or not
bool _may_steal{true};
// Global number of excess queues
alignas(64) std::atomic<std::int32_t> &_excess_queues;
@@ -324,6 +333,7 @@ private:
inline void handle_yield()
{
if (yield_signaled()) {
_may_steal = true;
//Genode::log("Got yield signal ", _phys_core_id);
yield_channels(_count_channels, nullptr);
_excess_queues.fetch_sub(1);
@@ -334,13 +344,14 @@ private:
registrate();
wait_for_hooter();
_thefts = 0;
/*
_stealing_cost = 0;
_max_cost = 0;
_min_cost = 0;
_mean_enqueue_cost = 0;
_mean_dequeue_cost = 0;
enqueues = 1;
dequeues = 1;
dequeues = 1;*/
handle_resume();
}
}
@@ -348,14 +359,19 @@ private:
inline void handle_stop()
{
if (!_is_running) {
_may_steal = true;
std::uint16_t expect = 0;
bool shall_yield = !__atomic_compare_exchange_n(&_my_page->yield_flag, &expect, 2, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
if (shall_yield) {
handle_yield();
return;
}
Genode::log("Worker ", _id, ": thefts=", _thefts, " cost_total=", _stealing_cost, "avg cost_per_theft=", _stealing_cost/_thefts, " min cost/theft=", _min_cost, " max cost/theft=", _max_cost, " avg deq=", _mean_dequeue_cost, " avg enq=", _mean_enqueue_cost/enqueues, " #deqs=", dequeues, " #enqs=", enqueues);
// Genode::log("Worker ", _id, " woke up again");
//Genode::log("Worker ", _id, ": thefs=", _thefts);
// Genode::log("Worker ", _id, ": thefts=", _thefts, " cost_total=", _stealing_cost, "avg cost_per_theft=",
// _stealing_cost/_thefts, " min cost/theft=", _min_cost, " max cost/theft=", _max_cost, " avg deq=",
// _mean_dequeue_cost/dequeues, " avg enq=", _mean_enqueue_cost/enqueues, " #deqs=", dequeues, " #enqs=",
// enqueues);
// Genode::log("Worker ", _id, " woke up again");
yield_channels(_count_channels, nullptr);
deregister();
sleep();
@@ -371,13 +387,13 @@ private:
wait_for_hooter();
_thefts = 0;
_stealing_cost = 0;
/*_stealing_cost = 0;
_max_cost = 0;
_min_cost = 0;
_mean_enqueue_cost = 0;
_mean_dequeue_cost = 0;
enqueues = 1;
dequeues = 1;
dequeues = 1;*/
handle_resume();
}
}
@@ -393,10 +409,19 @@ private:
handle_stop();
}
// Genode::log("Worker ", _id, " stole ", static_cast<std::uint32_t>(_count_channels), " channels.");
if (_excess_queues.fetch_sub(1) <= 1)
;
//Genode::log("Entering stealing phase 2");
current = _channels.pop_front_or(nullptr);
_excess_queues.fetch_sub(1);
// if (_excess_queues.fetch_sub(1) <= 1)
// Genode::log("Entering stealing phase 2");
current = _channels.pop_front();
}
}
void handle_channel_occupancy()
{
if (_may_steal && current->has_excessive_usage_prediction()) {
yield_channels(_count_channels - 1, current);
_may_steal = false;
//Genode::log("Worker ", _id, ": Got channel ", current->id(), " with excessive usage prediction.");
}
}

src/mx/util/field_alloc.h Normal file
View File

@@ -0,0 +1,56 @@
#pragma once

#include "random.h"

#include <atomic>
#include <cstddef>

#include <base/log.h>

namespace mx::util {
template <unsigned C> class Field_Allocator;
}

/**
 * Lock-free allocator over a fixed field of C slots (one per channel).
 * Each slot is a cache-line-aligned flag; a slot is claimed by CAS-ing
 * its flag from false to true and handed back by clearing it again.
 * Index 0 is treated as "no allocation" and is never handed out.
 */
template <unsigned C>
class mx::util::Field_Allocator
{
private:
    struct field {
        alignas(64) std::atomic<bool> reserved{false};
    };

    alignas(64) struct field _fields[C]{};
    alignas(64) std::size_t _count;
    std::atomic<std::size_t> free_fields;

public:
    Field_Allocator(std::size_t count) : _count(count), free_fields(count) {}

    /**
     * Tries to claim a free slot in [offset, offset + limit).
     * @return Index of the claimed slot, or 0 if none could be claimed.
     */
    std::size_t alloc_randomly(unsigned offset, unsigned limit) {
        if (free_fields.load(std::memory_order_relaxed) == 0)
            return 0;

        //mx::util::Random rng(Genode::Trace::timestamp());
        //Genode::log("Searching queue");
        std::size_t candidate = offset; // +rng.next(limit);
        if (candidate > (offset + limit))
            return 0;
        if (candidate == 0)
            return 0; /* slot 0 is kept by the foreman */

        /* linear scan; stay within the field array */
        for (; candidate < offset + limit && candidate < C; candidate++) {
            bool expect = false;
            bool success = _fields[candidate].reserved.compare_exchange_strong(
                expect, true, std::memory_order_acquire, std::memory_order_relaxed);
            if (success) {
                free_fields.fetch_sub(1);
                return candidate;
            }
        }
        return 0;
    }

    /** Marks the given slot as vacant again. */
    void release(std::size_t field) {
        _fields[field].reserved.store(false);
        free_fields.fetch_add(1);
    }
};
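
For illustration, a standalone sketch of the slot-claiming scheme this commit introduces: one cache-line-aligned atomic flag per channel, claimed via compare-and-swap before a worker adopts the corresponding victim queue and cleared again when the channel is yielded. The names SlotAllocator and claim are placeholders for this sketch only; the actual class is mx::util::Field_Allocator in src/mx/util/field_alloc.h, driven by pick_channel() and yield_channels() in the worker.

// Standalone sketch (assumed names; not part of the commit).
#include <atomic>
#include <cstddef>
#include <cstdio>

template <unsigned C>
class SlotAllocator
{
    struct Slot { alignas(64) std::atomic<bool> reserved{false}; };

    Slot _slots[C]{};
    std::atomic<std::size_t> _free{C};

public:
    // Claims a free slot in [offset, offset + limit); returns its index,
    // or 0 if nothing could be claimed (index 0 is never handed out,
    // mirroring channel 0 staying with the foreman).
    std::size_t claim(std::size_t offset, std::size_t limit)
    {
        if (_free.load(std::memory_order_relaxed) == 0)
            return 0;
        for (std::size_t i = (offset ? offset : 1); i < offset + limit && i < C; ++i) {
            bool expected = false;
            if (_slots[i].reserved.compare_exchange_strong(expected, true,
                                                           std::memory_order_acquire,
                                                           std::memory_order_relaxed)) {
                _free.fetch_sub(1, std::memory_order_relaxed);
                return i;
            }
        }
        return 0;
    }

    // Marks a slot as vacant again, e.g. when a worker yields the channel.
    void release(std::size_t i)
    {
        _slots[i].reserved.store(false, std::memory_order_release);
        _free.fetch_add(1, std::memory_order_relaxed);
    }
};

int main()
{
    SlotAllocator<64> vacant;                        // one slot per possible channel
    const std::size_t victim = vacant.claim(1, 63);  // worker picks a victim queue id
    if (victim != 0) {
        std::printf("claimed channel %zu\n", victim);
        vacant.release(victim);                      // channel becomes stealable again
    }
    return 0;
}

The worker-facing pick_channel() in the diff follows the same pattern, mapping the claimed index back to a Channel* via get(cidx) before assigning it to the thief.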