Log workers' affinities.

This commit is contained in:
Michael Mueller
2024-10-10 18:39:02 +02:00
parent 28be6142f6
commit 7a2bce669b
3 changed files with 25 additions and 5 deletions

View File

@@ -49,6 +49,7 @@ template <typename P> class InterimResult
public:
InterimResult(const std::uint64_t operation_count, const P &phase, const std::uint16_t iteration,
const std::uint16_t core_count, const std::chrono::milliseconds time,
const std::array<std::uint16_t,mx::tasking::config::max_cores()> &affinities,
std::vector<PerfCounter> &counter, std::unordered_map<std::uint16_t, std::uint64_t> executed_tasks,
std::unordered_map<std::uint16_t, std::uint64_t> executed_reader_tasks,
std::unordered_map<std::uint16_t, std::uint64_t> executed_writer_tasks,
@@ -56,7 +57,7 @@ public:
std::unordered_map<std::uint16_t, std::uint64_t> scheduled_tasks_on_core,
std::unordered_map<std::uint16_t, std::uint64_t> scheduled_tasks_off_core,
std::unordered_map<std::uint16_t, std::uint64_t> worker_fills)
: _operation_count(operation_count), _phase(phase), _iteration(iteration), _core_count(core_count), _time(time),
: _operation_count(operation_count), _phase(phase), _iteration(iteration), _core_count(core_count), _time(time), _affinities(affinities),
_executed_tasks(std::move(executed_tasks)), _executed_reader_tasks(std::move(executed_reader_tasks)),
_executed_writer_tasks(std::move(executed_writer_tasks)), _scheduled_tasks(std::move(scheduled_tasks)),
_scheduled_tasks_on_core(std::move(scheduled_tasks_on_core)),
@@ -75,6 +76,7 @@ public:
std::uint16_t iteration() const noexcept { return _iteration; }
std::uint16_t core_count() const noexcept { return _core_count; }
std::chrono::milliseconds time() const noexcept { return _time; }
const std::array<std::uint16_t,mx::tasking::config::max_cores()> &affinities() const noexcept { return _affinities; }
double throughput() const { return _operation_count / (_time.count() / 1000.0); }
const std::vector<std::pair<std::string, double>> &performance_counter() const noexcept
{
@@ -122,6 +124,7 @@ public:
json["cores"] = core_count();
json["phase"] = phase();
json["throughput"] = throughput();
json["affinities"] = affinities();
for (const auto &[name, value] : performance_counter())
{
json[name] = value / double(operation_count());
@@ -145,6 +148,7 @@ private:
const std::uint16_t _iteration;
const std::uint16_t _core_count;
const std::chrono::milliseconds _time;
const std::array<std::uint16_t,mx::tasking::config::max_cores()> &_affinities;
std::vector<std::pair<std::string, double>> _performance_counter;
const std::unordered_map<std::uint16_t, std::uint64_t> _executed_tasks;
const std::unordered_map<std::uint16_t, std::uint64_t> _executed_reader_tasks;
@@ -194,6 +198,7 @@ public:
_current_iteration,
_core_set.size(),
milliseconds,
mx::tasking::runtime::worker_affinities(),
_perf.counter(),
statistic_map(mx::tasking::profiling::Statistic::Executed),
statistic_map(mx::tasking::profiling::Statistic::ExecutedReader),

View File

@@ -6,16 +6,17 @@
#include <mx/system/builtin.h>
#include <mx/system/topology.h>
#include <mx/util/random.h>
#include <iostream>
using namespace mx::tasking;
Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id,
const util::maybe_atomic<bool> &is_running, const std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch,
const std::atomic<memory::reclamation::epoch_t> &global_epoch, profiling::Statistic &statistic) noexcept
const std::atomic<memory::reclamation::epoch_t> &global_epoch, profiling::Statistic &statistic, synchronization::Spinlock &cout_lock) noexcept
: _target_core_id(target_core_id), _prefetch_distance(prefetch_distance),
_channel(id, target_numa_node_id, prefetch_distance), _local_epoch(local_epoch), _global_epoch(global_epoch),
_statistic(statistic), _is_running(is_running)
_statistic(statistic), _is_running(is_running), _cout_lock(cout_lock)
{
}
@@ -32,8 +33,15 @@ void Worker::execute()
//assert(this->_target_core_id == core_id && "Worker not pinned to correct core.");
const auto channel_id = this->_channel.id();
while (this->_is_running)
_phys_core_id = static_cast<std::uint16_t>(sched_getcpu());
while (true)
{
while (this->_is_running == false)
{
system::builtin::pause();
}
if constexpr (config::memory_reclamation() == config::UpdateEpochPeriodically)
{
this->_local_epoch.enter(this->_global_epoch);

View File

@@ -12,6 +12,7 @@
#include <mx/util/maybe_atomic.h>
#include <variant>
#include <vector>
#include <mx/synchronization/spinlock.h>
namespace mx::tasking {
/**
@@ -23,7 +24,7 @@ public:
Worker(std::uint16_t id, std::uint16_t target_core_id, std::uint16_t target_numa_node_id,
const util::maybe_atomic<bool> &is_running, std::uint16_t prefetch_distance,
memory::reclamation::LocalEpoch &local_epoch, const std::atomic<memory::reclamation::epoch_t> &global_epoch,
profiling::Statistic &statistic) noexcept;
profiling::Statistic &statistic, synchronization::Spinlock &cout_lock) noexcept;
~Worker() noexcept = default;
@@ -40,10 +41,16 @@ public:
[[nodiscard]] Channel &channel() noexcept { return _channel; }
[[nodiscard]] const Channel &channel() const noexcept { return _channel; }
[[nodiscard]] const std::uint16_t phys_core_id() const noexcept { return _phys_core_id; }
private:
// Id of the logical core.
const std::uint16_t _target_core_id;
std::uint16_t _phys_core_id{0};
synchronization::Spinlock &_cout_lock;
// Distance of prefetching tasks.
const std::uint16_t _prefetch_distance;