mirror of
https://github.com/mmueller41/mxtasking.git
synced 2026-01-21 12:42:57 +01:00
mxtasking: Integration with Tukija.
This commit is contained in:
@@ -54,6 +54,8 @@ public:
|
||||
*/
|
||||
static Libc::Env &env() { return Environment::get_instance().getenv(); }
|
||||
|
||||
static Libc::Env *envp() { return Environment::get_instance()._env; }
|
||||
|
||||
/**
|
||||
* @brief Set the libc env object
|
||||
*
|
||||
|
||||
@@ -23,7 +23,7 @@ public:
|
||||
static std::uint16_t core_id()
|
||||
{
|
||||
Genode::Affinity::Location loc = Genode::Thread::myself()->affinity();
|
||||
unsigned width = Environment::cpu().affinity_space().width();
|
||||
unsigned width = Environment::topo().global_affinity_space().total();
|
||||
|
||||
return std::uint16_t(loc.xpos() + loc.ypos() * width);
|
||||
}
|
||||
@@ -39,11 +39,11 @@ public:
|
||||
/**
|
||||
* @return The greatest NUMA region identifier.
|
||||
*/
|
||||
static std::uint8_t max_node_id() { return std::uint8_t(Environment::topo().node_count()-1); }
|
||||
static std::uint8_t max_node_id() { return 7; /*std::uint8_t(Environment::topo().node_count()-1);*/ }
|
||||
|
||||
/**
|
||||
* @return Number of available cores.
|
||||
*/
|
||||
static std::uint16_t count_cores() { return std::uint16_t(Environment::cpu().affinity_space().total()); }
|
||||
static std::uint16_t count_cores() { return std::uint16_t(Environment::topo().global_affinity_space().total()); }
|
||||
};
|
||||
} // namespace mx::system
|
||||
@@ -28,6 +28,6 @@ public:
|
||||
// If enabled, memory will be reclaimed while using optimistic
|
||||
// synchronization by epoch-based reclamation. Otherwise, freeing
|
||||
// memory is unsafe.
|
||||
static constexpr auto memory_reclamation() { return memory_reclamation_scheme::UpdateEpochPeriodically; }
|
||||
static constexpr auto memory_reclamation() { return memory_reclamation_scheme::None; }
|
||||
};
|
||||
} // namespace mx::tasking
|
||||
|
||||
@@ -136,6 +136,8 @@ public:
|
||||
*/
|
||||
static void stop() noexcept { _scheduler->interrupt(); }
|
||||
|
||||
static void resume() noexcept { _scheduler->resume(); }
|
||||
|
||||
/**
|
||||
* Creates a new task.
|
||||
* @param core_id Core to allocate memory from.
|
||||
|
||||
@@ -9,6 +9,8 @@
|
||||
#include <mx/tasking/runtime.h>
|
||||
#include <internal/thread_create.h>
|
||||
#include <base/affinity.h>
|
||||
#include <base/thread.h>
|
||||
#include <nova/syscalls.h>
|
||||
|
||||
using namespace mx::tasking;
|
||||
|
||||
@@ -60,19 +62,29 @@ void Scheduler::start_and_wait()
|
||||
std::vector<pthread_t> worker_threads(space.total() +
|
||||
static_cast<std::uint16_t>(config::memory_reclamation() != config::None));
|
||||
|
||||
for (auto cpu = 0U; cpu < space.total(); ++cpu) {
|
||||
Nova::mword_t start_cpu = 0;
|
||||
Nova::cpu_id(start_cpu);
|
||||
|
||||
for (auto cpu = 1U; cpu < space.total(); ++cpu) {
|
||||
Genode::String<32> const name{"worker", cpu};
|
||||
Libc::pthread_create_from_session(&worker_threads[cpu], Worker::entry, _worker[cpu], 4 * 4096, name.string(),
|
||||
&mx::system::Environment::cpu(), space.location_of_index(cpu));
|
||||
&mx::system::Environment::envp()->cpu(), space.location_of_index(cpu));
|
||||
}
|
||||
|
||||
|
||||
// ... and epoch management (if enabled).
|
||||
if constexpr (config::memory_reclamation() != config::None)
|
||||
{
|
||||
// In case we enable memory reclamation: Use an additional thread.
|
||||
&worker_threads[space.total()], mx::memory::reclamation::EpochManager::enter, &this->_epoch_manager,
|
||||
4 * 4096, "epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total());
|
||||
Genode::log("Creating foreman thread on CPU ", start_cpu);
|
||||
|
||||
Libc::pthread_create_from_session(&worker_threads[0], Worker::entry, _worker[0], 4 * 4096, "foreman",
|
||||
&mx::system::Environment::envp()->cpu(), space.location_of_index(0) );
|
||||
|
||||
Genode::log("Created foreman thread");
|
||||
|
||||
// ... and epoch management (if enabled).
|
||||
if constexpr (config::memory_reclamation() != config::None)
|
||||
{
|
||||
// In case we enable memory reclamation: Use an additional thread.
|
||||
&worker_threads[space.total()], mx::memory::reclamation::EpochManager::enter, &this->_epoch_manager, 4 * 4096,
|
||||
"epoch_manager", &mx::system::Environment::cpu(), space.location_of_index(space.total());
|
||||
}
|
||||
|
||||
// Turning the flag on starts all worker threads to execute tasks.
|
||||
|
||||
@@ -58,6 +58,9 @@ public:
|
||||
this->_profiler.stop();
|
||||
}
|
||||
|
||||
void resume() noexcept { _is_running = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Core set of this instance.
|
||||
*/
|
||||
|
||||
@@ -34,19 +34,26 @@ void Worker::execute()
|
||||
|
||||
self->pin(loc);
|
||||
}*/
|
||||
if (_channel.id() != 0)
|
||||
sleep();
|
||||
|
||||
while (this->_is_running == false)
|
||||
{
|
||||
system::builtin::pause();
|
||||
}
|
||||
|
||||
|
||||
TaskInterface *task;
|
||||
const auto core_id = system::topology::core_id();
|
||||
//assert(this->_target_core_id == core_id && "Worker not pinned to correct core.");
|
||||
const auto channel_id = this->_channel.id();
|
||||
const auto phys_core_id = system::Environment::topo().phys_id(Genode::Thread::myself()->affinity());
|
||||
Nova::mword_t pcpu = 0;
|
||||
Nova::cpu_id(pcpu);
|
||||
|
||||
std::uint64_t *volatile tukija_signal = &_tukija_signal[phys_core_id];
|
||||
_phys_core_id = pcpu;
|
||||
|
||||
//Genode::log("Worker ", _channel.id(), "(", _phys_core_id ,")", " woke up");
|
||||
std::uint64_t *volatile tukija_signal = &_tukija_signal[_phys_core_id];
|
||||
|
||||
while (this->_is_running)
|
||||
{
|
||||
@@ -58,7 +65,9 @@ void Worker::execute()
|
||||
this->_channel_size = this->_channel.fill();
|
||||
|
||||
if (this->_channel_size == 0) {
|
||||
Nova::yield(true);
|
||||
//Genode::log("Channel ", _channel.id(), " empty. Going to sleep");
|
||||
sleep();
|
||||
//Genode::log("Worker on CPU ", _phys_core_id, " woke up at ", Genode::Trace::timestamp());
|
||||
}
|
||||
|
||||
if constexpr (config::task_statistics())
|
||||
@@ -138,10 +147,13 @@ void Worker::execute()
|
||||
}
|
||||
|
||||
if (__atomic_load_n(tukija_signal, __ATOMIC_SEQ_CST)) {
|
||||
Nova::yield(false);
|
||||
Genode::log("Got yield signal ", _phys_core_id);
|
||||
yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
//Genode::log("Worker on CPU ", _phys_core_id, " going to stop");
|
||||
sleep();
|
||||
}
|
||||
|
||||
TaskResult Worker::execute_exclusive_latched(const std::uint16_t core_id, const std::uint16_t channel_id,
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
#include <mx/util/maybe_atomic.h>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
#include <nova/syscalls.h>
|
||||
#include <base/log.h>
|
||||
|
||||
namespace mx::tasking {
|
||||
/**
|
||||
@@ -42,6 +44,19 @@ public:
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void wake() { _is_sleeping = false;
|
||||
//Genode::log("Waking worker ", _channel.id(), " on CPU ", _phys_core_id);
|
||||
if (Nova::wake_core(static_cast<Nova::mword_t>(_phys_core_id)) != Nova::NOVA_OK)
|
||||
;
|
||||
//Genode::log("Failed to wake up worker on CPU ", _phys_core_id);
|
||||
/*Nova::mword_t alloc;
|
||||
Nova::alloc_cores(1, alloc);
|
||||
Genode::log("Woke core cmap = ", alloc);*/
|
||||
}
|
||||
|
||||
bool sleeping() { return _is_sleeping; }
|
||||
|
||||
|
||||
/**
|
||||
* @return Id of the logical core this worker runs on.
|
||||
*/
|
||||
@@ -57,6 +72,8 @@ private:
|
||||
// Distance of prefetching tasks.
|
||||
const std::uint16_t _prefetch_distance;
|
||||
|
||||
std::uint16_t _phys_core_id{0};
|
||||
|
||||
std::int32_t _channel_size{0U};
|
||||
|
||||
// Stack for persisting tasks in optimistic execution. Optimistically
|
||||
@@ -81,6 +98,18 @@ private:
|
||||
// Communication channel to Tukija
|
||||
std::uint64_t *volatile _tukija_signal;
|
||||
|
||||
// Flag for "sleeping" state of this worker
|
||||
util::maybe_atomic<bool> _is_sleeping{false};
|
||||
|
||||
void sleep() { //_is_sleeping = true;
|
||||
Nova::yield();
|
||||
}
|
||||
|
||||
void yield() { _is_sleeping = true;
|
||||
Nova::yield(false);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Analyzes the given task and chooses the execution method regarding synchronization.
|
||||
* @param task Task to be executed.
|
||||
|
||||
Reference in New Issue
Block a user