taskingProfiler init

This commit is contained in:
2023-04-24 11:18:16 +02:00
parent fcf0a2810b
commit 6fd37d3f73
7 changed files with 532 additions and 1 deletions

View File

@@ -47,6 +47,7 @@ SET(MX_TASKING_SRC
src/mx/tasking/worker.cpp
src/mx/tasking/task.cpp
src/mx/tasking/profiling/profiling_task.cpp
src/mx/tasking/profiling/tasking_profiler.cpp
src/mx/util/core_set.cpp
src/mx/util/random.cpp
src/mx/memory/dynamic_size_allocator.cpp

View File

@@ -29,5 +29,11 @@ public:
// synchronization by epoch-based reclamation. Otherwise, freeing
// memory is unsafe.
static constexpr auto memory_reclamation() { return memory_reclamation_scheme::UpdateEpochPeriodically; }
static constexpr auto tasking_array_length() { return 90000; }
static constexpr auto use_task_queue_length() { return true; }
static constexpr auto use_tasking_profiler() { return true; }
};
} // namespace mx::tasking
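All three switches are constexpr, so call sites can guard profiling with "if constexpr" and a disabled profiler compiles away entirely. A minimal sketch of the guard pattern this commit uses in the scheduler and worker below (the channel_id variable is illustrative):

// The discarded branch is dropped at compile time, so a disabled
// profiler adds zero runtime cost at this call site.
if constexpr (mx::tasking::config::use_tasking_profiler())
{
    TaskingProfiler::getInstance().enqueue(channel_id);
}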

View File

@@ -0,0 +1,365 @@
#include "tasking_profiler.h"
#include <iostream>
#include <chrono>
#include <numeric>
#include <cxxabi.h>
// NUMA topology and runtime environment
#include <mx/tasking/runtime.h>
#include <mx/system/environment.h>
#include <mx/system/topology.h>
#include <base/log.h>
constexpr std::chrono::time_point<std::chrono::high_resolution_clock> TaskingProfiler::tinit;
class PrefetchTask : public mx::tasking::TaskInterface
{
public:
PrefetchTask() {}
~PrefetchTask() override = default;
mx::tasking::TaskResult execute(const std::uint16_t core_id, const std::uint16_t /*channel_id*/) override
{
TaskingProfiler::task_info** task_data = TaskingProfiler::getInstance().getTaskData();
TaskingProfiler::queue_info** queue_data = TaskingProfiler::getInstance().getQueueData();
std::chrono::time_point<std::chrono::high_resolution_clock> tinit = TaskingProfiler::getInstance().getTinit();
// Touch every slot once so the backing pages are mapped and warm
// (j-- > 0 iterates N-1 down to 0, staying within the array bounds).
for(size_t j = mx::tasking::config::tasking_array_length(); j-- > 0;)
{
task_data[core_id][j] = {0, 0, nullptr, tinit, tinit};
__builtin_prefetch(&task_data[core_id][j], 1, 3);
}
for(size_t j = mx::tasking::config::tasking_array_length(); j-- > 0;)
{
queue_data[core_id][j] = {0, tinit};
__builtin_prefetch(&queue_data[core_id][j], 1, 3);
}
return mx::tasking::TaskResult::make_remove();
}
};
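Spawning one PrefetchTask per core means each array is first written by the worker that will later fill it; assuming first-touch page placement applies here, that keeps each buffer on its worker's NUMA node. The same warming idiom in isolation (a sketch, not part of this commit):

#include <cstddef>

// Touch a buffer from the core that will use it: the first write maps
// each page locally, and the prefetch hints write intent (1) and high
// temporal locality (3).
void warm(long *buffer, std::size_t n)
{
    for (std::size_t i = n; i-- > 0;)
    {
        buffer[i] = 0;
        __builtin_prefetch(&buffer[i], 1, 3);
    }
}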
// Print a nanosecond count as microseconds with three decimal places;
// e.g. printFloatUS(1234567) prints "1234.567". Chrome trace timestamps
// ("ts", "dur") are expressed in microseconds.
void printFloatUS(std::uint64_t ns)
{
std::uint64_t remainder = ns % 1000;
std::uint64_t front = ns / 1000;
char strRemainder[4];
for(int i = 2; i >= 0; i--)
{
strRemainder[i] = '0' + (remainder % 10);
remainder /= 10;
}
strRemainder[3] = '\0';
std::cout << front << '.' << strRemainder;
}
void TaskingProfiler::init(std::uint16_t corenum)
{
Genode::log("Hello from TaskingProfiler::init!");
relTime = std::chrono::high_resolution_clock::now();
// init() receives the highest core id; incrementing turns it into a count.
corenum++;
this->total_cores = corenum;
//create an array of pointers to task_info structs
task_data = new task_info*[total_cores];
for (std::uint16_t i = 0; i < total_cores; i++)
{
std::uint8_t numa_id = mx::system::topology::node_id(i);
Topology::Numa_region const &node = mx::system::Environment::node(numa_id);
void *cast_evade = static_cast<void*>(new Genode::Regional_heap(mx::system::Environment::ram(), mx::system::Environment::rm(), const_cast<Topology::Numa_region&>(node)));
task_data[i] = static_cast<task_info*>(cast_evade);
}
//create an array of pointers to queue_info structs
queue_data = new queue_info*[total_cores];
for (std::uint16_t i = 0; i < total_cores; i++)
{
std::uint8_t numa_id = mx::system::topology::node_id(i);
Topology::Numa_region const &node = mx::system::Environment::node(numa_id);
void *cast_evade = static_cast<void*>(new Genode::Regional_heap(mx::system::Environment::ram(), mx::system::Environment::rm(), const_cast<Topology::Numa_region&>(node)));
queue_data[i] = static_cast<queue_info*>(cast_evade);
}
//make prefetch tasks for each cpu
for(std::uint16_t i = 0; i < total_cores; i++){
auto *prefetchTask = mx::tasking::runtime::new_task<PrefetchTask>(i);
//prefetchTask->annotate(i);
mx::tasking::runtime::spawn(*prefetchTask);
}
task_id_counter = new std::uint64_t[total_cores]{0};
queue_id_counter = new std::uint64_t[total_cores]{0};
}
std::uint64_t TaskingProfiler::startTask(std::uint16_t cpu_core, std::uint32_t type, const char* name)
{
std::chrono::time_point<std::chrono::high_resolution_clock> start = std::chrono::high_resolution_clock::now();
const std::uint16_t cpu_id = cpu_core;
const std::uint64_t tid = task_id_counter[cpu_id]++;
task_info& ti = task_data[cpu_id][tid];
ti.id = tid;
ti.type = type;
ti.name = name;
ti._start = start;
std::chrono::time_point<std::chrono::high_resolution_clock> end = std::chrono::high_resolution_clock::now();
overhead += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
return ti.id;
}
void TaskingProfiler::endTask(std::uint16_t cpu_core, std::uint64_t id)
{
std::chrono::time_point<std::chrono::high_resolution_clock> start = std::chrono::high_resolution_clock::now();
task_info& ti = task_data[cpu_core][id];
ti._end = std::chrono::high_resolution_clock::now();
std::chrono::time_point<std::chrono::high_resolution_clock> end = std::chrono::high_resolution_clock::now();
overhead += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
}
void TaskingProfiler::enqueue(std::uint16_t corenum){
std::chrono::time_point<std::chrono::high_resolution_clock> timestamp = std::chrono::high_resolution_clock::now();
const std::uint64_t qid = __atomic_add_fetch(&queue_id_counter[corenum], 1, __ATOMIC_SEQ_CST);
queue_info& qi = queue_data[corenum][qid];
qi.id = qid;
qi.timestamp = timestamp;
std::chrono::time_point<std::chrono::high_resolution_clock> endTimestamp = std::chrono::high_resolution_clock::now();
taskQueueOverhead += std::chrono::duration_cast<std::chrono::nanoseconds>(endTimestamp - timestamp).count();
}
void TaskingProfiler::saveProfile()
{
Genode::log("Saving Profile");
std::uint64_t overhead_ms = overhead/1000000;
std::cout << "Overhead-Time: " << overhead << "ns in ms: " << overhead_ms << "ms" << std::endl;
std::uint64_t taskQueueOverhead_ms = taskQueueOverhead/1000000;
std::cout << "TaskQueueOverhead-Time: " << taskQueueOverhead << "ns in ms: " << taskQueueOverhead_ms << "ms" << std::endl;
//get the overall number of tasks
std::uint64_t tasknum = 0;
for (std::uint16_t i = 0; i < total_cores; i++)
{
tasknum += task_id_counter[i];
}
std::cout << "Number of tasks: " << tasknum << std::endl;
std::cout << "Overhead-Time per Task: " << overhead/tasknum << "ns" << std::endl;
bool first = false;
std::uint64_t firstTime = 0;
std::uint64_t throughput = 0;
std::uint64_t duration = 0;
std::uint64_t lastEndTime = 0;
std::uint64_t taskQueueLength;
std::uint64_t timestamp = 0;
std::uint64_t start = 0;
std::uint64_t end = 0;
char* name = nullptr;
std::uint64_t taskCounter = 0;
std::uint64_t queueCounter = 0;
std::cout << "--Save--" << std::endl;
std::cout << "{\"traceEvents\":[" << std::endl;
//Events
for(std::uint16_t cpu_id = 0; cpu_id < total_cores; cpu_id++)
{
//Metadata Events for each core (CPU instead of process as name,...)
std::cout << "{\"name\":\"process_name\",\"ph\":\"M\",\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"args\":{\"name\":\"CPU\"}}," << std::endl;
std::cout << "{\"name\":\"process_sort_index\",\"ph\":\"M\",\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"args\":{\"name\":" << cpu_id << "}}," << std::endl;
if (mx::tasking::config::use_task_queue_length()){
taskQueueLength = 0;
taskCounter = 0;
queueCounter = 1;
//Initial taskQueueLength is zero
std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
printFloatUS(0);
std::cout << ",\"args\":{\"TaskQueueLength\":" << taskQueueLength << "}}," << std::endl;
//go through all tasks and queues
while(taskCounter<task_id_counter[cpu_id] || queueCounter<queue_id_counter[cpu_id]){
//get the next task and queue
queue_info& qi = queue_data[cpu_id][queueCounter];
task_info& ti = task_data[cpu_id][taskCounter];
//get the time from the queue element if existing
if(queueCounter < queue_id_counter[cpu_id]){
timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(qi.timestamp - relTime).count();
}
//get the times from the task element if existing
if(taskCounter < task_id_counter[cpu_id]){
start = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._start - relTime).count();
end = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._end - relTime).count();
name = abi::__cxa_demangle(ti.name, nullptr, nullptr, nullptr);
}
//get the first time
if(!first){
first = true;
if(timestamp < start){
firstTime = timestamp;
}
else{
firstTime = start;
}
}
//if the queue element is before the task element, it is an enqueue
if(qi.timestamp < ti._start && queueCounter <= queue_id_counter[cpu_id]){
queueCounter++;
taskQueueLength++;
std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
if(timestamp - firstTime == 0){
printFloatUS(10);
}
else{
printFloatUS(timestamp-firstTime);
}
std::cout << ",\"args\":{\"TaskQueueLength\":" << taskQueueLength << "}}," << std::endl;
}
//else we print the task itself and a dequeue
else{
taskCounter++;
taskQueueLength--;
//taskQueueLength
std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
printFloatUS(start-firstTime);
std::cout << ",\"args\":{\"TaskQueueLength\":" << taskQueueLength << "}}," << std::endl;
//if the last task never finished, synthesize an end 1us after its start
if(taskCounter == task_id_counter[cpu_id] && ti._end == tinit){
end = start + 1000;
}
//Task itself
std::cout << "{\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"ts\":";
printFloatUS(start-firstTime);
std::cout << ",\"dur\":";
printFloatUS(end-start);
std::cout << ",\"ph\":\"X\",\"name\":\"" << name << "\",\"args\":{\"type\":" << ti.type << "}}," << std::endl;
//reset throughput if there is a gap of more than 1us
if (start - lastEndTime > 1000 && lastEndTime != 0){
std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
printFloatUS(lastEndTime - firstTime);
std::cout << ",\"args\":{\"TaskThroughput\":";
//Tasks per microsecond is zero
std::cout << 0;
std::cout << "}}," << std::endl;
}
duration = end - start;
//Task Throughput
std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
printFloatUS(start-firstTime);
std::cout << ",\"args\":{\"TaskThroughput\":";
//Tasks per microsecond
throughput = 1000/duration;
std::cout << throughput;
std::cout << "}}," << std::endl;
lastEndTime = end;
}
}
}
else{
for(std::uint32_t i = 0; i < task_id_counter[cpu_id]; i++){
task_info& ti = task_data[cpu_id][i];
// check if task is valid
if(ti._end == tinit)
{
continue;
}
start = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._start - relTime).count();
end = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._end - relTime).count();
name = abi::__cxa_demangle(ti.name, nullptr, nullptr, nullptr);
//Task itself
std::cout << "{\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"ts\":";
printFloatUS(start-firstTime);
std::cout << ",\"dur\":";
printFloatUS(end-start);
std::cout << ",\"ph\":\"X\",\"name\":\"" << name << "\",\"args\":{\"type\":" << ti.type << "}}," << std::endl;
//reset throughput if there is a gap of more than 1us
if (start - lastEndTime > 1000){
std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
printFloatUS(lastEndTime-firstTime);
std::cout << ",\"args\":{\"TaskThroughput\":";
//Tasks per microsecond is zero
std::cout << 0;
std::cout << "}}," << std::endl;
}
duration = end - start;
//Task Throughput
std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
printFloatUS(start-firstTime);
std::cout << ",\"args\":{\"TaskThroughput\":";
//Tasks per microsecond
throughput = 1000/duration;
std::cout << throughput;
std::cout << "}}," << std::endl;
lastEndTime = end;
}
}
lastEndTime = 0;
}
//sample event (so we don't need to remove the trailing comma)
std::cout << "{\"name\":\"sample\",\"ph\":\"P\",\"ts\":0,\"pid\":5,\"tid\":0}";
std::cout << "]}" << std::endl;
}
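saveProfile() emits the Chrome trace event format, so the dump can be loaded directly into chrome://tracing or Perfetto. Counter events ("ph":"C") carry the queue length and throughput, complete events ("ph":"X") carry one task each, and metadata events ("ph":"M") label the per-core rows. With made-up task names and timings, the output has this shape:

{"traceEvents":[
{"name":"process_name","ph":"M","pid":0,"tid":0,"args":{"name":"CPU"}},
{"pid":0,"name":"CPU0","ph":"C","ts":0.000,"args":{"TaskQueueLength":1}},
{"pid":0,"tid":0,"ts":12.500,"dur":3.250,"ph":"X","name":"UpdateTask","args":{"type":0}},
{"pid":0,"name":"CPU0","ph":"C","ts":12.500,"args":{"TaskThroughput":0}},
{"name":"sample","ph":"P","ts":0,"pid":5,"tid":0}
]}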
//Code for the TaskingProfiler::printTP function
/*
void TaskingProfiler::printTP(std::uint64_t start, std::uint64_t end)
{
std::uint64_t tp[total_cores]{0};
for(std::uint16_t cpu_id = 0; cpu_id < total_cores; cpu_id++)
{
// get the task_info array for the current core
task_info* core_data = task_data[cpu_id];
// get the id counter for the current core
for(std::uint64_t i = 0; i < task_id_counter[cpu_id]; i++)
{
task_info& ti = core_data[i];
const std::uint64_t tstart = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._start - relTime).count();
const std::uint64_t tend = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._end - relTime).count();
if(tstart > start && tend < end) {
tp[cpu_id]++;
}
}
LOG_INFO("TP " << cpu_id << " " << tp[cpu_id]);
}
LOG_INFO("TP " << "total " << std::accumulate(tp, tp + total_cores, 0));
}
*/

View File

@@ -0,0 +1,100 @@
#pragma once
#include <chrono>
#include <atomic>
#include <mx/tasking/config.h>
class TaskingProfiler
{
public:
static TaskingProfiler& getInstance()
{
static TaskingProfiler instance;
return instance;
}
struct task_info
{
std::uint64_t id;
std::uint32_t type;
const char* name;
std::chrono::high_resolution_clock::time_point _start;
std::chrono::high_resolution_clock::time_point _end;
};
struct queue_info
{
std::uint64_t id;
std::chrono::high_resolution_clock::time_point timestamp;
};
private:
TaskingProfiler() {}
std::chrono::time_point<std::chrono::high_resolution_clock> relTime;
std::atomic<std::uint64_t> overhead{0};
std::atomic<std::uint64_t> taskQueueOverhead{0};
static constexpr std::chrono::time_point<std::chrono::high_resolution_clock> tinit = std::chrono::time_point<std::chrono::high_resolution_clock>::max();
TaskingProfiler(const TaskingProfiler& copy) = delete;
TaskingProfiler& operator=(const TaskingProfiler& src) = delete;
// total number of cores
std::uint16_t total_cores{0};
// per-core profile data: one array of records per core
task_info** task_data{nullptr};
queue_info** queue_data{nullptr};
// id counters for every core
std::uint64_t* task_id_counter{nullptr};
std::uint64_t* queue_id_counter{nullptr};
public:
/**
* @brief Allocates and warms the per-core profiling buffers.
*
* @param corenum highest core id in use (converted to a count internally)
*/
void init(std::uint16_t corenum);
/**
* @brief Records the start of a task on the given core.
*
* @param cpu_core core the task runs on
* @param type task type tag stored with the record
* @param name (mangled) type name of the task
* @return std::uint64_t record id, to be passed to endTask()
*/
std::uint64_t startTask(std::uint16_t cpu_core, std::uint32_t type, const char* name);
/**
* @brief Records the end of a previously started task.
*
* @param cpu_core core the task ran on
* @param id record id returned by startTask()
*/
void endTask(std::uint16_t cpu_core, std::uint64_t id);
/**
* @brief Records that a task was pushed to the given core's queue.
*
* @param corenum core whose queue received the task
*/
void enqueue(std::uint16_t corenum);
/**
* @brief Writes the collected profile to stdout as Chrome trace JSON.
*/
void saveProfile();
/**
* @brief Prints per-core task throughput within the given window.
*
* @param start window start, in ns since profiler initialization
* @param end window end, in ns since profiler initialization
*/
void printTP(std::uint64_t start, std::uint64_t end);
task_info** getTaskData() { return task_data; }
queue_info** getQueueData() { return queue_data; }
std::chrono::time_point<std::chrono::high_resolution_clock> getTinit() { return tinit; }
};
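Taken together, a client of the profiler follows a simple lifecycle. A minimal usage sketch (the core id, the type tag 0, and SomeTask are illustrative; in this commit, Scheduler and Worker make these calls):

#include <typeinfo>

struct SomeTask {}; // stand-in for a real task type

void example(std::uint16_t core_id)
{
    auto &profiler = TaskingProfiler::getInstance();
    profiler.init(/* highest core id */ 3); // once, at start-up

    const std::uint64_t id = profiler.startTask(core_id, /* type */ 0, typeid(SomeTask).name());
    // ... run the task's work here ...
    profiler.endTask(core_id, id);

    profiler.saveProfile(); // once, at shutdown
}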

View File

@@ -83,6 +83,8 @@ public:
_resource_builder = std::make_unique<resource::Builder>(*_scheduler, *_resource_allocator);
}
//TaskingProfiler::getInstance().init(core_set.max_core_id());
return true;
}

View File

@@ -7,6 +7,8 @@
#include <vector>
#include <base/log.h>
#include "profiling/tasking_profiler.h"
using namespace mx::tasking;
Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t prefetch_distance,
@@ -27,10 +29,16 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre
prefetch_distance, this->_epoch_manager[worker_id], this->_epoch_manager.global_epoch(),
this->_statistic);
}
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().init(core_set.max_core_id());
}
}
Scheduler::~Scheduler() noexcept
{
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().saveProfile();
}
for (auto *worker : this->_worker)
{
std::uint8_t node_id = worker->channel().numa_node_id();
@@ -94,6 +102,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
resource_channel_id, current_channel_id))
{
this->_worker[current_channel_id]->channel().push_back_local(&task);
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().enqueue(current_channel_id);
}
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::ScheduledOnChannel>(current_channel_id);
@@ -102,7 +113,10 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
else
{
this->_worker[resource_channel_id]->channel().push_back_remote(&task,
this->numa_node_id(current_channel_id));
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().enqueue(resource_channel_id);
}
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::ScheduledOffChannel>(current_channel_id);
@@ -120,6 +134,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
if (target_channel_id == current_channel_id)
{
this->_worker[current_channel_id]->channel().push_back_local(&task);
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().enqueue(current_channel_id);
}
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::ScheduledOnChannel>(current_channel_id);
@@ -128,6 +145,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
else
{
this->_worker[target_channel_id]->channel().push_back_remote(&task, this->numa_node_id(current_channel_id));
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().enqueue(target_channel_id);
}
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::ScheduledOffChannel>(current_channel_id);
@@ -146,6 +166,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
else
{
this->_worker[current_channel_id]->channel().push_back_local(&task);
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().enqueue(current_channel_id);
}
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::ScheduledOnChannel>(current_channel_id);
@@ -164,6 +187,9 @@ void Scheduler::schedule(TaskInterface &task) noexcept
{
const auto &annotated_resource = task.annotated_resource();
this->_worker[annotated_resource.channel_id()]->channel().push_back_remote(&task, 0U);
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().enqueue(annotated_resource.channel_id());
}
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::ScheduledOffChannel>(annotated_resource.channel_id());
@@ -172,6 +198,9 @@ void Scheduler::schedule(TaskInterface &task) noexcept
else if (task.has_channel_annotated())
{
this->_worker[task.annotated_channel()]->channel().push_back_remote(&task, 0U);
if constexpr (config::use_tasking_profiler()){
TaskingProfiler::getInstance().enqueue(task.annotated_channel());
}
if constexpr (config::task_statistics())
{
this->_statistic.increment<profiling::Statistic::ScheduledOffChannel>(task.annotated_channel());

View File

@@ -13,6 +13,8 @@
#include <base/log.h>
#include <trace/timestamp.h>
#include "profiling/tasking_profiler.h"
using namespace mx::tasking;
Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id,
@@ -98,22 +100,48 @@ void Worker::execute()
// Based on the annotated resource and its synchronization
// primitive, we choose the fitting execution context.
auto result = TaskResult{};
std::uint64_t task_id_profiler = 0;
switch (Worker::synchronization_primitive(task))
{
case synchronization::primitive::ScheduleWriter:
if constexpr (config::use_tasking_profiler()){
task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
result = this->execute_optimistic(core_id, channel_id, task);
TaskingProfiler::getInstance().endTask(core_id, task_id_profiler);
} else {
result = this->execute_optimistic(core_id, channel_id, task);
}
break;
case synchronization::primitive::OLFIT:
if constexpr (config::use_tasking_profiler()){
task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
result = this->execute_olfit(core_id, channel_id, task);
TaskingProfiler::getInstance().endTask(core_id, task_id_profiler);
} else {
result = this->execute_olfit(core_id, channel_id, task);
}
break;
case synchronization::primitive::ScheduleAll:
case synchronization::primitive::None:
if constexpr (config::use_tasking_profiler()){
task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
result = task->execute(core_id, channel_id);
TaskingProfiler::getInstance().endTask(core_id, task_id_profiler);
} else {
result = task->execute(core_id, channel_id);
}
break;
case synchronization::primitive::ReaderWriterLatch:
if constexpr (config::use_tasking_profiler()){
task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
result = Worker::execute_reader_writer_latched(core_id, channel_id, task);
TaskingProfiler::getInstance().endTask(core_id, task_id_profiler);
} else {
result = Worker::execute_reader_writer_latched(core_id, channel_id, task);
}
break;
case synchronization::primitive::ExclusiveLatch:
if constexpr (config::use_tasking_profiler()){
task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
result = Worker::execute_exclusive_latched(core_id, channel_id, task);
TaskingProfiler::getInstance().endTask(core_id, task_id_profiler);
} else {
result = Worker::execute_exclusive_latched(core_id, channel_id, task);
}
break;
}
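The start/end pair is duplicated in every case label above; a small RAII guard would reduce each case to its execute call. A hedged sketch of such a helper (ProfileScope is not part of this commit):

// Hypothetical helper: profile the enclosing scope. The destructor runs
// when the scope ends, so endTask() is never forgotten on early exits.
class ProfileScope
{
public:
    ProfileScope(const std::uint16_t core_id, const std::uint32_t type, const char *name)
        : _core_id(core_id), _id(TaskingProfiler::getInstance().startTask(core_id, type, name))
    {
    }
    ~ProfileScope() { TaskingProfiler::getInstance().endTask(_core_id, _id); }

private:
    const std::uint16_t _core_id;
    const std::uint64_t _id;
};

Each case would then shrink to a scoped block around its execute call, and the config::use_tasking_profiler() check could move inside the guard instead of being repeated at every call site.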