mxtasking: Optimized work-stealing.

This commit is contained in:
Michael Mueller
2025-08-29 17:16:27 +02:00
parent 21b8726a59
commit 8cdefc4c68
3 changed files with 57 additions and 33 deletions

View File

@@ -74,12 +74,14 @@ void Worker::execute()
self->pin(loc);
}*/
_phys_core_id = Tukija::Cip::cip()->get_cpu_index();
if (_id != 0) {
sleep();
} else if (_id == 0) /* foreman */ {
Genode::log("Foreman starting");
current = _channels.pop_front();
current = _channels.pop_front();
registrate();
}
_is_sleeping = false;
@@ -87,8 +89,6 @@ void Worker::execute()
wait_for_hooter();
//Genode::log("Hooter sounded");
_phys_core_id = Tukija::Cip::cip()->get_cpu_index();
runtime::scheduler().register_worker(this);
//Genode::log("Worker ", _id, "(", _phys_core_id, ")",
// " woke up. is_runnin = ", (_is_running ? "true" : "false"));
@@ -102,9 +102,12 @@ void Worker::execute()
{
handle_resume();
//handle_channel_occupancy();
while (!current)
{
//Genode::log("Worker ",_id,": No queues for me.");
while (!current) {
if (steal(false)) {
current = _channels.pop_front();
break;
}
Genode::warning("Worker ",_id,": No queues for me.");
unsigned long expect = 0;
bool shall_yield = !__atomic_compare_exchange_n(&(Tukija::Cip::cip()->worker_for_location(Genode::Thread::myself()->affinity()).yield_flag), &expect, 2, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
@@ -125,7 +128,13 @@ void Worker::execute()
auto channel_id = current->id();
current->fill();
current->fill();
if (this->current->empty()) {
steal(false);
_channels.push_back(current);
current = _channels.pop_front();
}
if constexpr (config::task_statistics())
{
@@ -207,13 +216,15 @@ void Worker::execute()
}
steal(false);
//steal(false);
handle_yield();
handle_stop();
_channels.push_back(current);
current = _channels.pop_front();
/*
_channels.push_back(current);
current = _channels.pop_front();
*/
}
}

View File

@@ -3,6 +3,7 @@
#include "base/thread.h"
#include "channel.h"
#include "config.h"
#include "debug/helper_functions.h"
#include "profiling/statistic.h"
#include "task.h"
#include "task_stack.h"
@@ -54,7 +55,7 @@ public:
Channel *get(std::uint64_t idx);
inline bool pick_channel(std::uint64_t offset, std::uint64_t limit, bool change_phase_to_normal = false)
{
{
//Genode::Trace::Timestamp deq_start = Genode::Trace::timestamp();
std::uint64_t cidx = _vacant_channels.alloc_randomly( offset, limit);
//Genode::Trace::Timestamp deq_stop = Genode::Trace::timestamp();
@@ -82,32 +83,36 @@ public:
/**
* Tries to acquire channels for this worker through pick_channel().
*
* NOTE(review): this span looks like a diff rendering with the +/- markers
* stripped; adjacent near-duplicate lines appear to be the before/after forms
* of a single statement. Confirm against the real header before relying on it.
*
* @param init  When true, pick_channel(_id, stealing_limit()) is called in a
*              loop guarded by individual_stealing_limit() > 0. When false, a
*              single pick_channel() call with offset 1 is made, and only if
*              _excess_queues is <= 0.
* @return true if any pick_channel() call returned true; false otherwise,
*         including the early exits below.
*/
inline bool steal(bool init=true)
{
// Early exit: stealing is skipped unless both _is_running and _may_steal are set.
if (!_is_running || !_may_steal) {
if (!_is_running || !_may_steal) {
return false;
}

// Early exit: nothing is attempted when _vacant_channels reports no free fields.
if (!_vacant_channels.has_free_fields())
return false;

bool got_loot = false;
/* First steal up to individual stealing limit */
if (init) {
while (individual_stealing_limit() > 0)
{
// Variant 1: leave the loop as soon as one pick succeeds.
if ((got_loot |= pick_channel(_id, stealing_limit())))
break;
// Variant 2: keep picking after a success; stop on the first failure only
// if nothing has been picked so far.
// NOTE(review): got_loot is sticky (|=), so after one success this form
// always takes `continue`; the loop then ends only when
// individual_stealing_limit() drops to <= 0. Verify that a failing
// pick_channel() cannot leave that limit unchanged, otherwise this spins.
if ((got_loot |= pick_channel(_id, stealing_limit()))) continue;
if (!got_loot) break;
}
} else if (_excess_queues.load(std::memory_order_relaxed) <= 0) {
// Third argument is pick_channel's change_phase_to_normal flag.
// The two calls differ only in the limit: the literal 63 versus
// channel_info.count read from the Cip structure.
got_loot |= pick_channel(1, 63, true);
got_loot |= pick_channel(1, Tukija::Cip::cip()->channel_info.count, true);
}
return got_loot;
}
/**
* Returns true when the yield_flag of the Cip worker entry selected by the
* calling thread's affinity currently equals 1.
*
* The two definitions below differ in the `inline` keyword and in the memory
* order of the atomic load (__ATOMIC_SEQ_CST versus __ATOMIC_RELAXED).
* NOTE(review): these look like the before/after lines of one change. A relaxed
* load gives no ordering with respect to other memory accesses; confirm that
* callers do not depend on data published together with the flag.
*/
bool yield_signaled() { return __atomic_load_n(&(Tukija::Cip::cip()->worker_for_location(Genode::Thread::myself()->affinity()).yield_flag), __ATOMIC_SEQ_CST) == 1; }
inline bool yield_signaled() { return __atomic_load_n(&(Tukija::Cip::cip()->worker_for_location(Genode::Thread::myself()->affinity()).yield_flag), __ATOMIC_RELAXED) == 1; }
/**
* Returns channel_info.limit from the Cip structure, converted to std::uint16_t.
*
* NOTE(review): the signature appears twice (with and without `inline`), which
* looks like the before/after pair of a diff. The source type of `limit` is not
* visible here; the cast truncates if it is wider than 16 bits.
*/
std::uint16_t stealing_limit() {
inline std::uint16_t stealing_limit() {
return static_cast<std::uint16_t>(Tukija::Cip::cip()->channel_info.limit);
}
/**
* Returns channel_info.remainder from the Cip structure, converted to
* std::uint16_t.
*
* NOTE(review): the two definitions differ only in the `inline` keyword and
* look like the before/after pair of a diff.
*/
std::uint16_t remaining_queues() { return static_cast<std::uint16_t>(Tukija::Cip::cip()->channel_info.remainder); }
inline std::uint16_t remaining_queues() { return static_cast<std::uint16_t>(Tukija::Cip::cip()->channel_info.remainder); }
/**
* Assign a channel to this worker
@@ -145,7 +150,7 @@ public:
/**
* Returns the value of the `current` member, i.e. the channel pointer this
* worker holds at the moment of the call. No null check is performed here;
* other code in this file tests `current` for null, so callers should expect
* that a null pointer can be returned.
*
* NOTE(review): the two definitions differ only in the `inline` keyword and
* look like the before/after pair of a diff.
*/
Channel *current_channel() { return current; }
inline Channel *current_channel() { return current; }
/**
* Yields a number of channels except the channel given
@@ -315,7 +320,6 @@ private:
deregister();
yield();
//Genode::log("Worker on CPU ", _phys_core_id, " returned.");
registrate();
wait_for_hooter();
_thefts = 0;
/*
@@ -332,14 +336,16 @@ private:
inline void handle_stop()
{
if (!_is_running) {
if (!_is_running) {
_may_steal = true;
unsigned long expect = 0;
if (yield_signaled()) {
handle_yield();
return;
}
//Genode::log("Worker ", _id, ": thefs=", _thefts);
//Genode::log("Worker ", _id, ": attempted thefts: ", dequeues);
//Genode::log("Worker ", _id, ": thefs=", _thefts);
// Genode::log("Worker ", _id, ": thefts=", _thefts, " cost_total=", _stealing_cost, "avg cost_per_theft=",
// _stealing_cost/_thefts, " min cost/theft=", _min_cost, " max cost/theft=", _max_cost, " avg deq=",
// _mean_dequeue_cost/dequeues, " avg enq=", _mean_enqueue_cost/enqueues, " #deqs=", dequeues, " #enqs=",
@@ -349,7 +355,6 @@ private:
deregister();
sleep();
registrate();
if (yield_signaled()) {
handle_yield();
@@ -357,7 +362,8 @@ private:
}
wait_for_hooter();
_thefts = 0;
//_thefts = 0;
//dequeues = 1;
/*_stealing_cost = 0;
_max_cost = 0;
_min_cost = 0;
@@ -372,17 +378,16 @@ private:
/**
* Re-acquires a channel for a worker that has none.
*
* Does nothing when `current` is already set. Otherwise it enables stealing,
* calls steal() with its default argument, decrements _excess_queues, takes the
* front entry of _channels as the new `current`, and calls registrate().
*
* NOTE(review): this span looks like a diff rendering with the +/- markers
* stripped; `while (!steal() )` / `if (!steal() )` and the two fetch_sub lines
* appear to be before/after forms of single statements. Confirm against the
* real header.
*/
inline void handle_resume()
{
if (!current) {
// NOTE(review): `loops` is only touched by the `loops++` below; if that line
// belongs to the removed side of the diff, this variable is unused.
unsigned int loops = 0;
_may_steal = true;
// When steal() fails, the yield and stop handlers are run. With the `while`
// form this repeats until steal() succeeds; with the `if` form it runs once.
while (!steal() )
if (!steal() )
{
loops++;
handle_yield();
handle_stop();
}
// Genode::log("Worker ", _id, " stole ", static_cast<std::uint32_t>(_count_channels), " channels.");
// NOTE(review): with the `if` form the decrement and pop_front() below also
// run when steal() returned false; verify that _channels.pop_front() yielding
// no channel is handled by the caller.
_excess_queues.fetch_sub(1);
_excess_queues.fetch_sub(1);
//Genode::log("Worker ", _id, " stole ", static_cast<std::uint32_t>(_count_channels), " channels.");
current = _channels.pop_front();
registrate();
}
}

View File

@@ -32,6 +32,9 @@ class mx::util::Field_Allocator
std::size_t candidate = offset;
// +rng.next(limit);
if (candidate >= _count)
return 0;
if (candidate > (offset + limit))
return 0;
@@ -52,5 +55,10 @@ class mx::util::Field_Allocator
/**
* Marks the field at index `field` as no longer reserved and increments the
* free_fields counter by one.
*
* @param field  Index into _fields; no bounds check is performed here.
*
* NOTE(review): the doubled closing brace below looks like the before/after
* pair of a whitespace-only diff line, not a real extra brace.
*/
void release(std::size_t field) {
_fields[field].reserved.store(false);
free_fields.fetch_add(1);
}
}
/**
* Reports whether the free_fields counter is currently non-zero.
*
* @return true when the loaded counter value converts to true, false when it
*         is zero.
*/
bool has_free_fields()
{
    const auto remaining = free_fields.load();
    return static_cast<bool>(remaining);
}
};