Very first version of task queue stealing.

This commit is contained in:
Michael Mueller
2024-08-28 15:21:07 +02:00
parent 9f6e618d2b
commit ea6c00de24
7 changed files with 492 additions and 47 deletions

View File

@@ -145,6 +145,12 @@ public:
std::uint32_t decrement() { return --_size; }
tasking::priority phase() { return _phase; }
void switch_phase() { _phase = _phase == priority::normal ? priority::low : priority::normal; }
void phase(tasking::priority phase) { _phase = phase; }
std::uint16_t id() { return _id; }
private:
// Backend queues for multiple produces in different NUMA regions and different priorities,
alignas(64)
@@ -168,6 +174,9 @@ private:
// Holder of resource predictions of this channel.
alignas(64) ChannelOccupancy _occupancy{};
// Is the channel currently processed by a worker
alignas(64) std::atomic<tasking::priority> _phase{priority::normal};
/**
* Fills the task buffer with tasks scheduled with a given priority.
*

View File

@@ -39,7 +39,8 @@ public:
_signal_page = static_cast<std::uint64_t *>(mx::system::Environment::rm().attach(ds));
std::memset(_signal_page, 0, 4096);
Nova::mxinit(0, 0, reinterpret_cast<Nova::mword_t>(_signal_page));
Nova::mxinit(mx::system::Environment::topo().global_affinity_space().total()-1, 0, reinterpret_cast<Nova::mword_t>(_signal_page));
Genode::log("Initialized MxVisor interface");
}
// Are we ready to re-initialize the scheduler?
if (_scheduler != nullptr && _scheduler->is_running())
@@ -50,6 +51,7 @@ public:
// Create a new resource allocator.
if (_resource_allocator == nullptr)
{
Genode::log("Creating resource allocator");
_resource_allocator.reset(new (memory::GlobalHeap::allocate_cache_line_aligned(
sizeof(memory::dynamic::Allocator))) memory::dynamic::Allocator());
}
@@ -71,6 +73,7 @@ public:
}
else
{
Genode::log("Creating task allocator");
_task_allocator.reset(new (
memory::GlobalHeap::allocate_cache_line_aligned(sizeof(memory::fixed::Allocator<config::task_size()>)))
memory::fixed::Allocator<config::task_size()>(system::Environment::cores()));
@@ -245,6 +248,14 @@ public:
return _scheduler->statistic(counter, channel_id);
}
static Scheduler &scheduler() { return *_scheduler; }
static std::uint16_t my_id() { return _scheduler->my_self()->id(); }
static std::uint16_t my_channel() { return _scheduler->my_self()->current_channel()->id(); }
static std::uint16_t workers_count() { return _scheduler->active_workers(); }
private:
// Scheduler to spawn tasks.
inline static std::unique_ptr<Scheduler> _scheduler = {nullptr};

View File

@@ -11,6 +11,8 @@
#include <base/affinity.h>
#include <base/thread.h>
#include <nova/syscalls.h>
#include <cstdlib>
#include <cmath>
using namespace mx::tasking;
@@ -18,25 +20,27 @@ std::uint64_t *volatile mx::tasking::runtime::_signal_page;
Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t prefetch_distance,
memory::dynamic::Allocator &resource_allocator, std::uint64_t * volatile signal_page) noexcept
: _core_set(core_set), _count_channels(core_set.size()), _worker({}), _channel_numa_node_map({0U}),
: _core_set(core_set), _count_channels(core_set.size()), _vacant_channels(_count_channels), _worker({}), _channel_numa_node_map({0U}),
_epoch_manager(core_set.size(), resource_allocator, _is_running), _statistic(_count_channels)
{
this->_worker.fill(nullptr);
this->_channel_numa_node_map.fill(0U);
Genode::log("Initializing scheduler");
for (auto worker_id = 0U; worker_id < mx::system::Environment::topo().global_affinity_space().total(); ++worker_id)
{
const auto core_id = this->_core_set[worker_id];
const auto core_id = worker_id;
this->_channel_numa_node_map[worker_id] = system::topology::node_id(core_id);
Genode::log("Creating worker ", worker_id, " at node ", this->_channel_numa_node_map[worker_id]);
auto ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Worker));
this->_worker[worker_id] =
new (ptr)
Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running,
Worker(worker_id, core_id, this->_channel_numa_node_map[worker_id], signal_page, this->_is_running, _vacant_channels, _stealing_limit, _remainder_channel_count,
prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
this->_statistic);
ptr = memory::GlobalHeap::allocate(this->_channel_numa_node_map[worker_id], sizeof(Channel));
this->_channels[worker_id] =
new (ptr) Channel(worker_id, this->_channel_numa_node_map[worker_id], prefetch_distance);
this->_worker[worker_id]->assign(this->_channels[worker_id]);
_vacant_channels.push_back(_channels[worker_id]);
Genode::log("Channel ", worker_id, " created at ", _channels[worker_id]);
}
}
@@ -89,11 +93,26 @@ void Scheduler::start_and_wait()
Genode::log("Creating foreman thread on CPU ", start_cpu);
Channel *qf = _vacant_channels.pop_front();
_worker[0]->assign(qf);
Libc::pthread_create_from_session(&worker_threads[0], Worker::entry, _worker[0], 4 * 4096, "foreman",
&mx::system::Environment::envp()->cpu(), space.location_of_index(0) );
/* Always assign the first channel to the foreman, so that it is guaranteed
that channel 0 is always processed by worker 0 and, thus, always on the same CPU core.
This is very useful for benchmarks relying on the TSC. Furthermore, this channel will always be processed. */
Genode::log("Created foreman thread");
Genode::log("Allocating ", _core_set.size(), " initial workers.");
Nova::mword_t allocation = 0;
Nova::core_allocation(allocation, false);
Genode::log("Initial cores ", allocation);
this->allocate_cores(_core_set.size()-1);
// ... and epoch management (if enabled).
if constexpr (config::memory_reclamation() != config::None)
{
@@ -247,3 +266,7 @@ void Scheduler::profile(const std::string &output_file)
this->_profiler.profile(this->_is_running, *(this->_channels[i]));
}
}
Channel* Scheduler::steal_for(Worker *thief)
{
}

View File

@@ -17,6 +17,7 @@
#include <mx/util/core_set.h>
#include <mx/util/random.h>
#include <string>
#include <cmath>
namespace mx::tasking {
/**
@@ -55,10 +56,49 @@ public:
void interrupt() noexcept
{
_is_running = false;
this->_profiler.stop();
//Genode::log("Stopping runtime");
Genode::log("Waiting for ", _active_worker_count.load(), " workers to finish.");
while (_active_worker_count > 1)
system::builtin::pause();
Nova::mword_t pcpu = 0;
Nova::cpu_id(pcpu);
Worker *me = _worker_at_core[pcpu];
/* We assume that the runtime is always stopped by the foreman.
* So we should check whether this is truly the case here.
*/
assert(me->current_channel()->id() == 0 && "Channel is not 0.");
assert(me->id() == 0 && "Stop called by worker.");
me->yield_channels(config::max_cores(), _channels[0]);
//Genode::log("Got ", _vacant_channels.size(), " vacant channels.");
// this->_profiler.stop();
}
void resume() noexcept { _is_running = true;
void resume() noexcept {
Nova::mword_t allocation = 0;
Nova::core_allocation(allocation, false);
//Genode::log("Allocation before resume ", allocation);
allocate_cores(_core_set.size()-1);
_is_running = true;
}
[[nodiscard]] inline Worker *my_self() noexcept {
Nova::mword_t pcpu = 0;
Nova::cpu_id(pcpu);
return _worker_at_core[pcpu];
}
[[nodiscard]] inline std::uint16_t active_workers() const noexcept { return _active_worker_count; }
Channel *steal_for(Worker *thief);
[[nodiscard]] inline void set_stealing_limit(std::uint16_t workers) {
_stealing_limit = std::ceil(static_cast<float>(_vacant_channels.size()) / static_cast<float>(workers));
}
/**
@@ -99,6 +139,25 @@ public:
void predict_usage(const std::uint16_t channel_id, const resource::hint::expected_access_frequency usage) noexcept
{
_channels[channel_id]->predict_usage(usage);
/*if (usage == resource::hint::expected_access_frequency::excessive) {
Worker *owner = _owner_of_channel[channel_id];
if (owner) {
owner->prohibit_stealing();
//Genode::log("Worker ", owner->core_id(), " is going to have excessive load.");
_overloaded_workers.push_back(owner);
Nova::mword_t allocation;
std::uint16_t channels = owner->count_channels();
std::uint16_t cores_needed = (channels > 1) ? channels : 1;
Nova::alloc_cores(cores_needed + _vacant_channel_count.load(), allocation);
}
else
{
Nova::mword_t allocation;
Nova::alloc_cores(_vacant_channel_count, allocation);
}
}*/
}
/**
@@ -128,6 +187,15 @@ public:
*/
void reset() noexcept;
/**
* Register a worker in the core to worker map
*/
void register_worker(Worker *worker) { _worker_at_core[worker->phys_core_id()] = worker;
_active_worker_count.fetch_add(1);
}
void deregister_worker(Worker *worker) { _active_worker_count.fetch_sub(1); }
/**
* Aggregates the counter for all cores.
* @param counter Statistic counter.
@@ -164,6 +232,9 @@ public:
}
}
inline void add_vacant_channel(Channel *channel) { _vacant_channels.push_back(channel);
}
/**
* Starts profiling of idle times and specifies the results file.
* @param output_file File to write idle times after stopping MxTasking.
@@ -174,6 +245,17 @@ public:
bool operator!=(const util::core_set &cores) const noexcept { return _core_set != cores; }
inline void allocate_cores(std :: uint16_t cores)
{
Nova::mword_t allocation = 0;
Nova::mword_t remainder = 0;
Nova::alloc_cores(cores, allocation, remainder);
std::bitset<config::max_cores()> allocated(allocation);
_remainder_channel_count.store(remainder);
//Genode::log("Allocated ", allocation, " with ", allocated.count(), " workers and ", remainder, " excess queues.");
}
private:
// Cores to run the worker threads on.
const util::core_set _core_set;
@@ -181,15 +263,24 @@ private:
// Number of all channels.
std::uint16_t _count_channels;
alignas(64) std::atomic<bool> _loot_available{true};
// Flag for the worker threads. If false, the worker threads will stop.
// This is atomic for hardware that does not guarantee atomic reads/writes of booleans.
alignas(64) util::maybe_atomic<bool> _is_running{false};
alignas(64) std::atomic<std::uint16_t> _active_worker_count{0};
// All initialized workers.
alignas(64) std::array<Worker *, config::max_cores()> _worker{nullptr};
alignas(64) std::array<Worker *, config::max_cores()> _worker_at_core{nullptr};
alignas(64) std::array<Channel *, config::max_cores()> _channels{nullptr};
alignas(64) util::BoundMPMCQueue<Channel *> _vacant_channels{config::max_cores()};
alignas(64) std::atomic<std::int32_t> _remainder_channel_count{0};
alignas(64) util::maybe_atomic<std::uint16_t> _stealing_limit{0};
// Map of channel id to NUMA region id.
alignas(64) std::array<std::uint8_t, config::max_cores()> _channel_numa_node_map{0U};

View File

@@ -5,6 +5,8 @@
#include <cassert>
#include <mx/system/builtin.h>
#include <mx/system/environment.h>
#include <mx/tasking/scheduler.h>
#include <mx/tasking/runtime.h>
#include <mx/system/topology.h>
#include <mx/util/random.h>
#include <base/thread.h>
@@ -14,18 +16,46 @@
#include <trace/timestamp.h>
#include <nova/syscalls.h>
using namespace mx::tasking;
Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id, std::uint64_t* volatile tukija_sig,
const util::maybe_atomic<bool> &is_running, const std::uint16_t prefetch_distance,
const util::maybe_atomic<bool> &is_running, util::BoundMPMCQueue<Channel *> &v, util::maybe_atomic<std::uint16_t> &s, std::atomic<std::int32_t> &e, const std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch,
const std::atomic<memory::reclamation::epoch_t> &global_epoch, profiling::Statistic &statistic) noexcept
: _target_core_id(target_core_id), _target_numa_node_id(target_numa_node_id), _tukija_signal(tukija_sig), _prefetch_distance(prefetch_distance),
: _target_core_id(target_core_id), _target_numa_node_id(target_numa_node_id), _tukija_signal(reinterpret_cast<struct InfoPage*>(tukija_sig)), _prefetch_distance(prefetch_distance),
_id(id), _local_epoch(local_epoch), _global_epoch(global_epoch),
_statistic(statistic), _is_running(is_running)
_statistic(statistic), _is_running(is_running), _vacant_channels(v), _stealing_limit(s), _excess_queues(e)
{
}
void Worker::yield_channels(std::uint16_t num, Channel *except) {
Channel *chan;
do {
chan = _channels.pop_front_or(nullptr);
if (chan && chan != except) {
//Genode::log("Worker ", _id, " returns channel ", chan->id());
_vacant_channels.push_back(chan);
_count_channels--;
}
num--;
} while (num && chan);
if (current && current != except) {
_vacant_channels.push_back(current);
//Genode::log("Worker ", _id, " returns channel ", current->id());
/* If yield_channels is called by the foreman, it is likely that this happenend due to a previous call to runtime::stop. In this case the foreman will release all channels but channel 0. However, it is possible that the foreman called runtime::stop while processing another channel than 0 which after this call will not belong to it anymore. Thus, to avoid having the foreman process a channel that has been yielded and risking nasty race conditions, the foreman takes the last remaining channel (0). */
if (_id == 0 && current->id() != 0) {
current = _channels.pop_front_or(nullptr);
assert(current != nullptr);
}
else
current = nullptr;
}
_count_channels--;
}
void Worker::execute()
{
/*{
@@ -34,42 +64,86 @@ void Worker::execute()
self->pin(loc);
}*/
if (_id != 0)
sleep();
Nova::mword_t pcpu = 0;
Nova::cpu_id(pcpu);
while (this->_is_running == false)
{
system::builtin::pause();
_phys_core_id = pcpu;
_my_page = &_tukija_signal[phys_core_id()];
if (_id != 0) {
sleep();
} else if (_id == 0) /* foreman */ {
current = _channels.pop_front();
}
_is_sleeping = false;
wait_for_hooter();
runtime::scheduler().register_worker(this);
//Genode::log("Worker ", _id, "(", _phys_core_id, ")",
// " woke up. is_runnin = ", (_is_running ? "true" : "false"));
TaskInterface *task;
const auto core_id = system::topology::core_id();
//assert(this->_target_core_id == core_id && "Worker not pinned to correct core.");
Nova::mword_t pcpu = 0;
Nova::cpu_id(pcpu);
_phys_core_id = pcpu;
auto phase = priority::normal;
//Genode::log("Worker ", _channel.id(), "(", _phys_core_id ,")", " woke up");
std::uint64_t *volatile tukija_signal = &_tukija_signal[_phys_core_id];
this->current = this->_channels.pop_front();
auto channel_id = current->id();
while (this->_is_running)
while (true)
{
handle_resume();
while (!current) {
//Genode::log("Worker ",_id,": No queues for me.");
std::uint16_t expect = 0;
bool shall_yield = !__atomic_compare_exchange_n(&_my_page->yield_flag, &expect, 2, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
if (shall_yield) {
handle_yield();
return;
}
deregister();
sleep();
handle_resume();
}
//Genode::log("Worker ", _id, " is now processing channel ", this->current->id());
if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically)
{
this->_local_epoch.enter(this->_global_epoch);
}
this->current->fill();
auto channel_id = current->id();
if (this->current->size() == 0) {
//Genode::log("Channel ", _channel.id(), " empty. Going to sleep");
sleep();
//Genode::log("Worker on CPU ", _phys_core_id, " woke up at ", Genode::Trace::timestamp());
if (current->phase() != phase) {
if (phase == priority::normal)
phase = priority::low;
else /* phase == priority::low */ {
phase = priority::normal;
}
//Genode::log("Switched phase to ", (phase == priority::normal)? "normal" : "low");
}
if (phase == priority::normal)
current->fill<priority::normal>();
else {
current->fill<priority::low>();
}
if (phase == priority::normal && current->empty()) {
Genode::Trace::Timestamp thefts_start = Genode::Trace::timestamp();
steal(false);
Genode::Trace::Timestamp theft_stop = Genode::Trace::timestamp();
Genode::Trace::Timestamp cost = theft_stop - thefts_start;
_stealing_cost += cost;
if (cost > _max_cost)
_max_cost = cost;
if ((_min_cost == 0) || (_min_cost > cost))
_min_cost = cost;
_thefts++;
//_channels.push_back(current);
// current = _channels.pop_front_or(nullptr);
}
if constexpr (config::task_statistics())
@@ -147,15 +221,24 @@ void Worker::execute()
{
runtime::delete_task(core_id, task);
}
handle_yield();
}
if (__atomic_load_n(tukija_signal, __ATOMIC_SEQ_CST)) {
Genode::log("Got yield signal ", _phys_core_id);
yield();
handle_yield();
handle_stop();
current->switch_phase();
_channels.push_back(current);
current = _channels.pop_front_or(nullptr);
}
}
void Worker::registrate() {
runtime::scheduler().register_worker(this);
}
//Genode::log("Worker on CPU ", _phys_core_id, " going to stop");
sleep();
void Worker::deregister() {
runtime::scheduler().deregister_worker(this);
}
TaskResult Worker::execute_exclusive_latched(const std::uint16_t core_id, const std::uint16_t channel_id,

View File

@@ -24,7 +24,7 @@ class alignas(64) Worker
{
public:
Worker(std::uint16_t id, std::uint16_t target_core_id, std::uint16_t target_numa_node_id, std::uint64_t* volatile tukija_sig,
const util::maybe_atomic<bool> &is_running, std::uint16_t prefetch_distance,
const util::maybe_atomic<bool> &is_running, util::BoundMPMCQueue<Channel *> &v, util::maybe_atomic<std::uint16_t> &s, std::atomic<std::int32_t> &e, std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch, const std::atomic<memory::reclamation::epoch_t> &global_epoch,
profiling::Statistic &statistic) noexcept;
@@ -55,18 +55,77 @@ public:
Genode::log("Woke core cmap = ", alloc);*/
}
bool sleeping() { return _is_sleeping; }
bool has_loot() {
_channel_lock.lock();
bool wealthy = _count_channels > 1 || _is_sleeping;
_channel_lock.unlock();
/**
* Steal a channel from another worker
*/
bool steal();
return wealthy;
}
[[nodiscard]] std::uint16_t number_of_channels() { return _count_channels; }
inline bool steal(bool init=true)
{
if (!_is_running) {
return false;
}
bool got_loot = false;
/* First steal up to individual stealing limit */
if (init) {
while (individual_stealing_limit() > 0)
{
Channel *loot = _vacant_channels.pop_front_or(nullptr);
if (loot) {
this->assign(loot);
//Genode::log("Worker ", _id, ": Stole channel ", loot->id());
got_loot |= true;
} else /* no vacant queues anymore, so stop stealing */
break;
}
} else if (_excess_queues.load(std::memory_order_relaxed) <= 0) {
Genode::Trace::Timestamp deq_start = Genode::Trace::timestamp();
Channel *loot = _vacant_channels.pop_front_or(nullptr);
Genode::Trace::Timestamp deq_stop = Genode::Trace::timestamp();
_mean_dequeue_cost += deq_stop - deq_start;
dequeues++;
if (loot)
{
loot->phase(priority::normal);
Genode::Trace::Timestamp enq_start = Genode::Trace::timestamp();
this->assign(loot);
Genode::Trace::Timestamp enq_stop = Genode::Trace::timestamp();
_mean_enqueue_cost += enq_stop - enq_start;
enqueues++;
got_loot |= true;
if (_vacant_channels.size() == 0)
_excess_queues.store(1);
// Genode::log("Worker ", _id, " Took remaining queue ", loot->id(), " with ", _vacant_channels.size(),
// " excess queues");
}
}
return got_loot;
}
bool yield_signaled() { return __atomic_load_n(&_my_page->yield_flag, __ATOMIC_SEQ_CST) == 1; }
std::uint16_t stealing_limit() {
return static_cast<std::uint16_t>(_my_page->limit);
}
std::uint16_t remaining_queues() { return static_cast<std::uint16_t>(_my_page->remainder); }
/**
* Assign a channel to this worker
* This method is called by the scheduler to assign each worker an initial channel upon start. That's because initially none of the workers has any channel assigned yet. If it would try to steal one from another worker upon initialization it would not find a channel to begin with. Since this would apply for all channels the application would just stall forever, never finding a channel to steal.
*/
void assign(Channel *channel) { _channels.push_back(channel); }
void assign(Channel *channel) { _channels.push_back(channel);
_count_channels++;
}
/**
* @return Id of the logical core this worker runs on.
@@ -78,7 +137,42 @@ public:
[[nodiscard]] std::uint16_t numa_id() const noexcept { return _target_numa_node_id; }
/**
* @return Id of the physical core this worker runs on.
*/
[[nodiscard]] std::uint16_t phys_core_id() const noexcept { return _phys_core_id; }
/**
* @return Id of this worker
*/
[[nodiscard]] std::uint16_t id() const noexcept { return _id; }
/**
* @return the number of channels this worker currently owns.
*/
[[nodiscard]] std::uint16_t count_channels() { return _count_channels; }
/**
*
*/
Channel *current_channel() { return current; }
/**
* Yields a number of channels except the channel given
* @param num, the number of channels to yield
* @param channel, the channel to keep
*/
void yield_channels(std::uint16_t num, Channel *except);
private:
struct InfoPage {
volatile std::uint16_t yield_flag;
volatile std::uint16_t limit;
volatile std::uint16_t remainder;
std::uint16_t padding;
unsigned long pad[7];
};
// Id of the logical core.
const std::uint16_t _target_core_id;
@@ -90,6 +184,9 @@ private:
std::uint16_t _phys_core_id{0};
std::uint16_t _id{0};
mx::synchronization::Spinlock _channel_lock{};
// std::int32_t _channel_size{0U};
// Stack for persisting tasks in optimistic execution. Optimistically
@@ -99,8 +196,22 @@ private:
// Channel where tasks are stored for execution.
alignas(64) util::BoundMPMCQueue<Channel *> _channels{config::max_cores()};
alignas(64) struct InfoPage volatile *_my_page{nullptr};
alignas(64) Channel *current{nullptr};
/**
* Profiling data structures
*/
alignas(64) unsigned long _thefts{0};
alignas(64) Genode::Trace::Timestamp _stealing_cost{0};
alignas(64) Genode::Trace::Timestamp _max_cost{0};
alignas(64) Genode::Trace::Timestamp _min_cost{0};
alignas(64) Genode::Trace::Timestamp _mean_enqueue_cost{0};
alignas(64) Genode::Trace::Timestamp _mean_dequeue_cost{0};
alignas(64) unsigned long enqueues{1};
alignas(64) unsigned long dequeues{1};
// Local epoch of this worker.
memory::reclamation::LocalEpoch &_local_epoch;
@@ -113,13 +224,25 @@ private:
// Flag for "running" state of MxTasking.
const util::maybe_atomic<bool> &_is_running;
// Reference to queue of vacant channels
util::BoundMPMCQueue<Channel *> &_vacant_channels;
// Limit for stealing
const util::maybe_atomic<std::uint16_t> &_stealing_limit;
// Global number of excess queues
alignas(64) std::atomic<std::int32_t> &_excess_queues;
// Communication channel to Tukija
std::uint64_t *volatile _tukija_signal;
struct InfoPage *volatile _tukija_signal;
// Number of channels currently owned by this worker
std::atomic<std::uint16_t> _count_channels{0};
// Flag for "sleeping" state of this worker
util::maybe_atomic<bool> _is_sleeping{false};
void sleep() { //_is_sleeping = true;
void sleep() { _is_sleeping = true;
Nova::yield();
}
@@ -127,6 +250,9 @@ private:
Nova::yield(false);
}
inline std::int32_t individual_stealing_limit() { std::int32_t limit = static_cast<std::int32_t>(stealing_limit()) - static_cast<std::int32_t>(_count_channels);
return limit;
}
/**
* Analyzes the given task and chooses the execution method regarding synchronization.
@@ -186,5 +312,95 @@ private:
*/
TaskResult execute_optimistic_read(std::uint16_t core_id, std::uint16_t channel_id,
resource::ResourceInterface *resource, TaskInterface *task);
inline void wait_for_hooter()
{
while (this->_is_running == false)
{
system::builtin::pause();
}
}
inline void handle_yield()
{
if (yield_signaled()) {
//Genode::log("Got yield signal ", _phys_core_id);
yield_channels(_count_channels, nullptr);
_excess_queues.fetch_sub(1);
deregister();
yield();
//Genode::log("Worker on CPU ", _phys_core_id, " returned.");
registrate();
wait_for_hooter();
_thefts = 0;
_stealing_cost = 0;
_max_cost = 0;
_min_cost = 0;
_mean_enqueue_cost = 0;
_mean_dequeue_cost = 0;
enqueues = 1;
dequeues = 1;
handle_resume();
}
}
inline void handle_stop()
{
if (!_is_running) {
std::uint16_t expect = 0;
bool shall_yield = !__atomic_compare_exchange_n(&_my_page->yield_flag, &expect, 2, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
if (shall_yield) {
handle_yield();
return;
}
Genode::log("Worker ", _id, ": thefts=", _thefts, " cost_total=", _stealing_cost, "avg cost_per_theft=", _stealing_cost/_thefts, " min cost/theft=", _min_cost, " max cost/theft=", _max_cost, " avg deq=", _mean_dequeue_cost, " avg enq=", _mean_enqueue_cost/enqueues, " #deqs=", dequeues, " #enqs=", enqueues);
// Genode::log("Worker ", _id, " woke up again");
yield_channels(_count_channels, nullptr);
deregister();
sleep();
expect = 2;
shall_yield = !__atomic_compare_exchange_n(&_my_page->yield_flag, &expect, 0, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
registrate();
if (shall_yield) {
handle_yield();
return;
}
wait_for_hooter();
_thefts = 0;
_stealing_cost = 0;
_max_cost = 0;
_min_cost = 0;
_mean_enqueue_cost = 0;
_mean_dequeue_cost = 0;
enqueues = 1;
dequeues = 1;
handle_resume();
}
}
inline void handle_resume()
{
if (!current) {
unsigned int loops = 0;
while (!steal() )
{
loops++;
handle_yield();
handle_stop();
}
// Genode::log("Worker ", _id, " stole ", static_cast<std::uint32_t>(_count_channels), " channels.");
if (_excess_queues.fetch_sub(1) <= 1)
;
//Genode::log("Entering stealing phase 2");
current = _channels.pop_front_or(nullptr);
}
}
void registrate();
void deregister();
};
} // namespace mx::tasking

View File

@@ -7,6 +7,7 @@
#include <cstring>
#include <mx/memory/global_heap.h>
#include <mx/system/builtin.h>
#include <mx/synchronization/spinlock.h>
namespace mx::util {
/**
@@ -50,6 +51,7 @@ public:
{
system::builtin::pause();
}
_length.fetch_add(1);
}
/**
@@ -64,6 +66,7 @@ public:
{
system::builtin::pause();
}
_length.fetch_sub(1);
return item;
}
@@ -78,6 +81,7 @@ public:
T item;
if ( try_pop_front(item))
{
_length.fetch_sub(1);
return item;
}
else
@@ -158,6 +162,10 @@ public:
return true;
}
std::uint32_t size() {
return _length;
}
private:
// Capacity of the queue.
const std::uint32_t _capacity;
@@ -165,10 +173,14 @@ private:
// Array of status flags and data slots.
std::pair<std::atomic_uint64_t, T> *_storage;
std::atomic<std::uint32_t> _length{0};
// Index of the head.
alignas(64) std::atomic_uint64_t _head{0U};
// Index of the tail.
alignas(64) std::atomic_uint64_t _tail{0U};
alignas(64) mx::synchronization::Spinlock _lock{};
};
} // namespace mx::util