diff --git a/CMakeLists.txt b/CMakeLists.txt
index 72a1d2a..58d5c29 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -47,6 +47,7 @@ SET(MX_TASKING_SRC
     src/mx/tasking/worker.cpp
     src/mx/tasking/task.cpp
     src/mx/tasking/profiling/profiling_task.cpp
+    src/mx/tasking/profiling/tasking_profiler.cpp
     src/mx/util/core_set.cpp
     src/mx/util/random.cpp
     src/mx/memory/dynamic_size_allocator.cpp
diff --git a/src/mx/tasking/config.h b/src/mx/tasking/config.h
index 493a2c9..f32fb4e 100644
--- a/src/mx/tasking/config.h
+++ b/src/mx/tasking/config.h
@@ -29,5 +29,11 @@ public:
     // synchronization by epoch-based reclamation. Otherwise, freeing
     // memory is unsafe.
     static constexpr auto memory_reclamation() { return memory_reclamation_scheme::UpdateEpochPeriodically; }
+
+    static constexpr auto tasking_array_length() { return 90000; }
+
+    static constexpr auto use_task_queue_length() { return true; }
+
+    static constexpr auto use_tasking_profiler() { return true; }
 };
 } // namespace mx::tasking
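For a sense of scale: with tasking_array_length() = 90000, every core gets one task_info array and one queue_info array of that length. A minimal footprint sketch, assuming the record layouts from tasking_profiler.h below (exact sizes depend on padding and the clock's representation) and an illustrative core count that is not part of the patch:

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>

// Mirrors of the record types introduced in tasking_profiler.h below; the
// exact sizes are an assumption and depend on padding and the clock's
// representation on the target toolchain.
struct task_info
{
    std::uint64_t id;
    std::uint32_t type;
    const char *name;
    std::chrono::high_resolution_clock::time_point _start;
    std::chrono::high_resolution_clock::time_point _end;
};

struct queue_info
{
    std::uint64_t id;
    std::chrono::high_resolution_clock::time_point timestamp;
};

int main()
{
    constexpr std::size_t length = 90000U; // config::tasking_array_length()
    constexpr std::size_t cores = 16U;     // illustrative core count, not from the patch

    // One task_info array and one queue_info array are allocated per core.
    const std::size_t per_core = length * (sizeof(task_info) + sizeof(queue_info));
    std::cout << per_core / 1024U << " KiB per core, "
              << (per_core * cores) / (1024U * 1024U) << " MiB for " << cores << " cores\n";
}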
diff --git a/src/mx/tasking/profiling/tasking_profiler.cpp b/src/mx/tasking/profiling/tasking_profiler.cpp
new file mode 100644
index 0000000..a24f0c1
--- /dev/null
+++ b/src/mx/tasking/profiling/tasking_profiler.cpp
@@ -0,0 +1,365 @@
+#include "tasking_profiler.h"
+
+#include <chrono>
+#include <cstdint>
+#include <cxxabi.h>
+#include <iostream>
+
+
+//Numa
+#include <base/heap.h>
+#include <mx/system/environment.h>
+#include <mx/system/topology.h>
+
+#include <mx/tasking/runtime.h>
+
+constexpr std::chrono::time_point<std::chrono::high_resolution_clock> TaskingProfiler::tinit;
+
+class PrefetchTask : public mx::tasking::TaskInterface
+{
+public:
+    PrefetchTask() {}
+    ~PrefetchTask() override = default;
+
+    mx::tasking::TaskResult execute(const std::uint16_t core_id, const std::uint16_t /*channel_id*/) override
+    {
+        TaskingProfiler::task_info** task_data = TaskingProfiler::getInstance().getTaskData();
+        TaskingProfiler::queue_info** queue_data = TaskingProfiler::getInstance().getQueueData();
+        std::chrono::time_point<std::chrono::high_resolution_clock> tinit = TaskingProfiler::getInstance().getTinit();
+        for(size_t j = mx::tasking::config::tasking_array_length(); j-- > 0;)
+        {
+            task_data[core_id][j] = {0, 0, NULL, tinit, tinit};
+            __builtin_prefetch(&task_data[core_id][j], 1, 3);
+        }
+
+        for(size_t j = mx::tasking::config::tasking_array_length(); j-- > 0;)
+        {
+            queue_data[core_id][j] = {0, tinit};
+            __builtin_prefetch(&queue_data[core_id][j], 1, 3);
+        }
+
+        return mx::tasking::TaskResult::make_remove();
+    }
+};
+
+void printFloatUS(std::uint64_t ns)
+{
+    std::uint64_t remainder = ns % 1000;
+    std::uint64_t front = ns / 1000;
+    char strRemainder[4];
+
+    for(int i = 2; i >= 0; i--)
+    {
+        strRemainder[i] = '0' + (remainder % 10);
+        remainder /= 10;
+    }
+    strRemainder[3] = '\0';
+
+    std::cout << front << '.' << strRemainder;
+}
+
+void TaskingProfiler::init(std::uint16_t corenum)
+{
+    Genode::log("Hello from TaskingProfiler::init!");
+    relTime = std::chrono::high_resolution_clock::now();
+
+    corenum++;
+    this->total_cores = corenum;
+    uint16_t cpu_numa_node = 0;
+
+    //create an array of pointers to task_info structs
+    task_data = new task_info*[total_cores];
+
+    for (std::uint16_t i = 0; i < total_cores; i++)
+    {
+        std::uint8_t numa_id = mx::system::topology::node_id(i);
+        Topology::Numa_region const &node = mx::system::Environment::node(numa_id);
+        void *cast_evade = static_cast<void *>(new Genode::Regional_heap(mx::system::Environment::ram(), mx::system::Environment::rm(), const_cast<Topology::Numa_region &>(node)));
+        task_data[i] = static_cast<task_info *>(cast_evade);
+    }
+
+    //create an array of pointers to queue_info structs
+    queue_data = new queue_info*[total_cores];
+    for (std::uint16_t i = 0; i < total_cores; i++)
+    {
+        std::uint8_t numa_id = mx::system::topology::node_id(i);
+        Topology::Numa_region const &node = mx::system::Environment::node(numa_id);
+        void *cast_evade = static_cast<void *>(new Genode::Regional_heap(mx::system::Environment::ram(), mx::system::Environment::rm(), const_cast<Topology::Numa_region &>(node)));
+        queue_data[i] = static_cast<queue_info *>(cast_evade);
+    }
+
+    //make prefetch tasks for each cpu
+    for(int i = 0; i < corenum; i++){
+        auto *prefetchTask = mx::tasking::runtime::new_task<PrefetchTask>(i);
+        //prefetchTask->annotate(i);
+        mx::tasking::runtime::spawn(*prefetchTask);
+    }
+
+    task_id_counter = new std::uint64_t[total_cores]{0};
+    queue_id_counter = new std::uint64_t[total_cores]{0};
+}
+
+std::uint64_t TaskingProfiler::startTask(std::uint16_t cpu_core, std::uint32_t type, const char* name)
+{
+    std::chrono::time_point<std::chrono::high_resolution_clock> start = std::chrono::high_resolution_clock::now();
+    const std::uint16_t cpu_id = cpu_core;
+    const std::uint64_t tid = task_id_counter[cpu_id]++;
+    task_info& ti = task_data[cpu_id][tid];
+
+    ti.id = tid;
+    ti.type = type;
+    ti.name = name;
+    ti._start = start;
+
+    std::chrono::time_point<std::chrono::high_resolution_clock> end = std::chrono::high_resolution_clock::now();
+    overhead += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
+
+    return ti.id;
+}
+
+void TaskingProfiler::endTask(std::uint16_t cpu_core, std::uint64_t id)
+{
+    std::chrono::time_point<std::chrono::high_resolution_clock> start = std::chrono::high_resolution_clock::now();
+
+    task_info& ti = task_data[cpu_core][id];
+    ti._end = std::chrono::high_resolution_clock::now();
+
+    std::chrono::time_point<std::chrono::high_resolution_clock> end = std::chrono::high_resolution_clock::now();
+    overhead += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
+
+    TaskingProfiler::getInstance().saveProfile();
+}
+
+void TaskingProfiler::enqueue(std::uint16_t corenum){
+    std::chrono::time_point<std::chrono::high_resolution_clock> timestamp = std::chrono::high_resolution_clock::now();
+
+    const std::uint64_t qid = __atomic_add_fetch(&queue_id_counter[corenum], 1, __ATOMIC_SEQ_CST);
+
+    queue_info& qi = queue_data[corenum][qid];
+    qi.id = qid;
+    qi.timestamp = timestamp;
+
+    std::chrono::time_point<std::chrono::high_resolution_clock> endTimestamp = std::chrono::high_resolution_clock::now();
+    taskQueueOverhead += std::chrono::duration_cast<std::chrono::nanoseconds>(endTimestamp - timestamp).count();
+}
+
+void TaskingProfiler::saveProfile()
+{
+    Genode::log("Saving Profile");
+    std::uint64_t overhead_ms = overhead/1000000;
+    std::cout << "Overhead-Time: " << overhead << "ns in ms: " << overhead_ms << "ms" << std::endl;
+    std::uint64_t taskQueueOverhead_ms = taskQueueOverhead/1000000;
+    std::cout << "TaskQueueOverhead-Time: " << taskQueueOverhead << "ns in ms: " << taskQueueOverhead_ms << "ms" << std::endl;
+
+    //get the number of tasks overall
+    std::uint64_t tasknum = 0;
+    for (std::uint16_t i = 0; i < total_cores; i++)
+    {
+        tasknum += task_id_counter[i];
+    }
+    std::cout << "Number of tasks: " << tasknum << std::endl;
+    std::cout << "Overhead-Time per Task: " << overhead/tasknum << "ns" << std::endl;
+
+    bool first = false;
+    std::uint64_t firstTime = 0;
+    std::uint64_t throughput = 0;
+    std::uint64_t duration = 0;
+    std::uint64_t lastEndTime = 0;
+    std::uint64_t taskQueueLength;
+
+    std::uint64_t timestamp = 0;
+    std::uint64_t start = 0;
+    std::uint64_t end = 0;
+    char* name;
+
+    std::uint64_t taskCounter = 0;
+    std::uint64_t queueCounter = 0;
+
+    std::cout << "--Save--" << std::endl;
+    std::cout << "{\"traceEvents\":[" << std::endl;
+
+    //Events
+    for(std::uint16_t cpu_id = 0; cpu_id < total_cores; cpu_id++)
+    {
+        //Metadata Events for each core (CPU instead of process as name,...)
+        std::cout << "{\"name\":\"process_name\",\"ph\":\"M\",\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"args\":{\"name\":\"CPU\"}}," << std::endl;
+        std::cout << "{\"name\":\"process_sort_index\",\"ph\":\"M\",\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"args\":{\"name\":" << cpu_id << "}}," << std::endl;
+
+
+
+        if (mx::tasking::config::use_task_queue_length()){
+            taskQueueLength = 0;
+            taskCounter = 0;
+            queueCounter = 1;
+
+            //Initial taskQueueLength is zero
+            std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
+            printFloatUS(0);
+            std::cout << ",\"args\":{\"TaskQueueLength\":" << taskQueueLength << "}}," << std::endl;
+
+            //go through all tasks and queues
+            while(taskCounter < task_id_counter[cpu_id]){
+                queue_info& qi = queue_data[cpu_id][queueCounter];
+                task_info& ti = task_data[cpu_id][taskCounter];
+
+                //get the time from the queue element if existing
+                if(queueCounter <= queue_id_counter[cpu_id]){
+                    timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(qi.timestamp - relTime).count();
+                }
+
+                //get the times from the task element if existing
+                if(taskCounter < task_id_counter[cpu_id]){
+                    start = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._start - relTime).count();
+                    end = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._end - relTime).count();
+                    name = abi::__cxa_demangle(ti.name, 0, 0, 0);
+                }
+
+                //get the first time
+                if(!first){
+                    first = true;
+                    if(timestamp < start){
+                        firstTime = timestamp;
+                    }
+                    else{
+                        firstTime = start;
+                    }
+                }
+                //if the queue element is before the task element, it is an enqueue
+                if(qi.timestamp < ti._start && queueCounter <= queue_id_counter[cpu_id]){
+                    queueCounter++;
+                    taskQueueLength++;
+                    std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
+                    if(timestamp - firstTime == 0){
+                        printFloatUS(10);
+                    }
+                    else{
+                        printFloatUS(timestamp-firstTime);
+                    }
+                    std::cout << ",\"args\":{\"TaskQueueLength\":" << taskQueueLength << "}}," << std::endl;
+
+                }
+                //else we print the task itself and a dequeue
+                else{
+                    taskCounter++;
+                    taskQueueLength--;
+
+                    //taskQueueLength
+                    std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
+                    printFloatUS(start-firstTime);
+                    std::cout << ",\"args\":{\"TaskQueueLength\":" << taskQueueLength << "}}," << std::endl;
+
+                    //if the endtime of the last task is too large (cannot display right)
+                    if(taskCounter == task_id_counter[cpu_id] && ti._end == tinit){
+                        end = start + 1000;
+                    }
+                    //Task itself
+                    std::cout << "{\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"ts\":";
+                    printFloatUS(start-firstTime);
+                    std::cout << ",\"dur\":";
+                    printFloatUS(end-start);
+                    std::cout << ",\"ph\":\"X\",\"name\":\"" << name << "\",\"args\":{\"type\":" << ti.type << "}}," << std::endl;
+
+                    //reset throughput if there is a gap of more than 1us
+                    if (start - lastEndTime > 1000 && lastEndTime != 0){
+                        std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
+                        printFloatUS(lastEndTime - firstTime);
+                        std::cout << ",\"args\":{\"TaskThroughput\":";
+                        //Tasks per microsecond is zero
+                        std::cout << 0;
+                        std::cout << "}}," << std::endl;
+                    }
+                    duration = end - start;
+
+                    //Task Throughput
+                    std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
+                    printFloatUS(start-firstTime);
+                    std::cout << ",\"args\":{\"TaskThroughput\":";
+                    //Tasks per microsecond
+                    throughput = 1000/duration;
+                    std::cout << throughput;
+                    std::cout << "}}," << std::endl;
+                    lastEndTime = end;
+                }
+            }
+        }
+        else{
+            for(std::uint32_t i = 0; i < task_id_counter[cpu_id]; i++){
+                task_info& ti = task_data[cpu_id][i];
+
+                // check if task is valid
+                if(ti._end == tinit)
+                {
+                    continue;
+                }
+                start = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._start - relTime).count();
+                end = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._end - relTime).count();
+                name = abi::__cxa_demangle(ti.name, 0, 0, 0);
+
+                //Task itself
+                std::cout << "{\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"ts\":";
+                printFloatUS(start-firstTime);
+                std::cout << ",\"dur\":";
+                printFloatUS(end-start);
+                std::cout << ",\"ph\":\"X\",\"name\":\"" << name << "\",\"args\":{\"type\":" << ti.type << "}}," << std::endl;
+
+                //reset throughput if there is a gap of more than 1us
+                if (start - lastEndTime > 1000){
+                    std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
+                    printFloatUS(lastEndTime-firstTime);
+                    std::cout << ",\"args\":{\"TaskThroughput\":";
+                    //Tasks per microsecond is zero
+                    std::cout << 0;
+                    std::cout << "}}," << std::endl;
+                }
+                duration = end - start;
+
+                //Task Throughput
+                std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":";
+                printFloatUS(start-firstTime);
+                std::cout << ",\"args\":{\"TaskThroughput\":";
+
+                //Tasks per microsecond
+                throughput = 1000/duration;
+
+                std::cout << throughput;
+                std::cout << "}}," << std::endl;
+                lastEndTime = end;
+            }
+        }
+        lastEndTime = 0;
+    }
+    //sample Task (so we don't need to remove the last comma)
+    std::cout << "{\"name\":\"sample\",\"ph\":\"P\",\"ts\":0,\"pid\":5,\"tid\":0}";
+    std::cout << "]}" << std::endl;
+}
+
+
+
+//Code for the TaskingProfiler::printTP function
+/*
+void TaskingProfiler::printTP(std::uint64_t start, std::uint64_t end)
+{
+    std::uint64_t tp[total_cores]{0};
+
+    for(std::uint16_t cpu_id = 0; cpu_id < total_cores; cpu_id++)
+    {
+        // get the task_info array for the current core
+        task_info* core_data = task_data[cpu_id];
+
+        // get the id counter for the current core
+        for(std::uint64_t i = 0; i < task_id_counter[cpu_id]; i++)
+        {
+            task_info& ti = core_data[i];
+            const std::uint64_t tstart = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._start - relTime).count();
+            const std::uint64_t tend = std::chrono::duration_cast<std::chrono::nanoseconds>(ti._end - relTime).count();
+            if(tstart > start && tend < end) {
+                tp[cpu_id]++;
+            }
+        }
+
+        LOG_INFO("TP " << cpu_id << " " << tp[cpu_id]);
+    }
+    LOG_INFO("TP " << "total " << std::accumulate(tp, tp + total_cores, 0));
+}
+*/
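printFloatUS() formats a nanosecond count as microseconds with three fixed decimal places, which is what the Chrome trace format expects for its "ts" and "dur" fields. A quick standalone check of that formatting (the function body is copied verbatim from the patch):

#include <cstdint>
#include <iostream>

// Copy of the patch's helper: prints a nanosecond count as microseconds
// with exactly three decimal places.
void printFloatUS(std::uint64_t ns)
{
    std::uint64_t remainder = ns % 1000;
    std::uint64_t front = ns / 1000;
    char strRemainder[4];

    for (int i = 2; i >= 0; i--)
    {
        strRemainder[i] = '0' + (remainder % 10);
        remainder /= 10;
    }
    strRemainder[3] = '\0';

    std::cout << front << '.' << strRemainder;
}

int main()
{
    printFloatUS(1234567); // prints "1234.567"
    std::cout << '\n';
    printFloatUS(42);      // prints "0.042"
    std::cout << '\n';
}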
std::cout << ",\"args\":{\"TaskThroughput\":"; + //Tasks per microsecond is zero + std::cout << 0; + std::cout << "}}," << std::endl; + } + duration = end - start; + + //Task Throughput + std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":"; + printFloatUS(start-firstTime); + std::cout << ",\"args\":{\"TaskThroughput\":"; + //Tasks per microsecond + throughput = 1000/duration; + std::cout << throughput; + std::cout << "}}," << std::endl; + lastEndTime = end; + } + } + } + else{ + for(std::uint32_t i = 0; i < task_id_counter[cpu_id]; i++){ + task_info& ti = task_data[cpu_id][i]; + + // check if task is valid + if(ti._end == tinit) + { + continue; + } + start = std::chrono::duration_cast(ti._start - relTime).count(); + end = std::chrono::duration_cast(ti._end - relTime).count(); + name = abi::__cxa_demangle(ti.name, 0, 0, 0); + + //Task itself + std::cout << "{\"pid\":" << cpu_id << ",\"tid\":" << cpu_id << ",\"ts\":"; + printFloatUS(start-firstTime); + std::cout << ",\"dur\":"; + printFloatUS(end-start); + std::cout << ",\"ph\":\"X\",\"name\":\"" << name << "\",\"args\":{\"type\":" << ti.type << "}}," << std::endl; + + //reset throughput if there is a gap of more than 1us + if (start - lastEndTime > 1000){ + std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":"; + printFloatUS(lastEndTime-firstTime); + std::cout << ",\"args\":{\"TaskThroughput\":"; + //Tasks per microsecond is zero + std::cout << 0; + std::cout << "}}," << std::endl; + } + duration = end - start; + + //Task Throughput + std::cout << "{\"pid\":" << cpu_id << ",\"name\":\"CPU" << cpu_id << "\",\"ph\":\"C\",\"ts\":"; + printFloatUS(start-firstTime); + std::cout << ",\"args\":{\"TaskThroughput\":"; + + //Tasks per microsecond + throughput = 1000/duration; + + std::cout << throughput; + std::cout << "}}," << std::endl; + lastEndTime = end; + } + } + lastEndTime = 0; + } + //sample Task (so we dont need to remove the last comma) + std::cout << "{\"name\":\"sample\",\"ph\":\"P\",\"ts\":0,\"pid\":5,\"tid\":0}"; + std::cout << "]}" << std::endl;; +} + + + +//Code for the TaskingProfiler::printTP function +/* +void TaskingProfiler::printTP(std::uint64_t start, std::uint64_t end) +{ + std::uint64_t tp[total_cores]{0}; + + for(std::uint16_t cpu_id = 0; cpu_id < total_cores; cpu_id++) + { + // get the task_info array for the current core + task_info* core_data = task_data[cpu_id]; + + // get the id counter for the current core + for(std::uint64_t i = 0; i < task_id_counter[cpu_id]; i++) + { + task_info& ti = core_data[i]; + const std::uint64_t tstart = std::chrono::duration_cast(ti._start - relTime).count(); + const std::uint64_t tend = std::chrono::duration_cast(ti._end - relTime).count(); + if(tstart > start && tend < end) { + tp[cpu_id]++; + } + } + + LOG_INFO("TP " << cpu_id << " " << tp[cpu_id]); + } + LOG_INFO("TP " << "total " << std::accumulate(tp, tp + total_cores, 0)); +} +*/ diff --git a/src/mx/tasking/profiling/tasking_profiler.h b/src/mx/tasking/profiling/tasking_profiler.h new file mode 100644 index 0000000..8ffe6ef --- /dev/null +++ b/src/mx/tasking/profiling/tasking_profiler.h @@ -0,0 +1,100 @@ +#pragma once +#include +#include +#include + + +class TaskingProfiler +{ +public: + static TaskingProfiler& getInstance() + { + static TaskingProfiler instance; + return instance; + } + + struct task_info + { + std::uint64_t id; + std::uint32_t type; + const char* name; + std::chrono::high_resolution_clock::time_point _start; + 
diff --git a/src/mx/tasking/runtime.h b/src/mx/tasking/runtime.h
index 9d2fb1f..d66dbd9 100644
--- a/src/mx/tasking/runtime.h
+++ b/src/mx/tasking/runtime.h
@@ -83,6 +83,8 @@ public:
         _resource_builder = std::make_unique(*_scheduler, *_resource_allocator);
     }
 
+    //TaskingProfiler::getInstance().init(core_set.max_core_id());
+
     return true;
 }
diff --git a/src/mx/tasking/scheduler.cpp b/src/mx/tasking/scheduler.cpp
index a156cef..f8c48fd 100644
--- a/src/mx/tasking/scheduler.cpp
+++ b/src/mx/tasking/scheduler.cpp
@@ -7,6 +7,8 @@
 #include 
 #include 
+#include "profiling/tasking_profiler.h"
+
 using namespace mx::tasking;
 
 Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t prefetch_distance,
@@ -27,10 +29,16 @@ Scheduler::Scheduler(const mx::util::core_set &core_set, const std::uint16_t pre
                                           prefetch_distance, this->_epoch_manager[worker_id],
                                           this->_epoch_manager.global_epoch(), this->_statistic);
     }
+    if constexpr (config::use_tasking_profiler()){
+        TaskingProfiler::getInstance().init(core_set.max_core_id());
+    }
 }
 
 Scheduler::~Scheduler() noexcept
 {
+    if constexpr (config::use_tasking_profiler()){
+        TaskingProfiler::getInstance().saveProfile();
+    }
     for (auto *worker : this->_worker)
     {
         std::uint8_t node_id = worker->channel().numa_node_id();
@@ -94,6 +102,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
                               resource_channel_id, current_channel_id))
         {
             this->_worker[current_channel_id]->channel().push_back_local(&task);
+            if constexpr (config::use_tasking_profiler()){
+                TaskingProfiler::getInstance().enqueue(current_channel_id);
+            }
             if constexpr (config::task_statistics())
             {
                 this->_statistic.increment(current_channel_id);
@@ -102,7 +113,10 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
         else
         {
             this->_worker[resource_channel_id]->channel().push_back_remote(&task,
-                                                                           this->numa_node_id(current_channel_id));
+                                                                           this->numa_node_id(current_channel_id));
+            if constexpr (config::use_tasking_profiler()){
+                TaskingProfiler::getInstance().enqueue(resource_channel_id);
+            }
             if constexpr (config::task_statistics())
             {
                 this->_statistic.increment(current_channel_id);
@@ -120,6 +134,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
     if (target_channel_id == current_channel_id)
     {
         this->_worker[current_channel_id]->channel().push_back_local(&task);
+        if constexpr (config::use_tasking_profiler()){
+            TaskingProfiler::getInstance().enqueue(current_channel_id);
+        }
         if constexpr (config::task_statistics())
         {
             this->_statistic.increment(current_channel_id);
@@ -128,6 +145,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
     else
     {
         this->_worker[target_channel_id]->channel().push_back_remote(&task, this->numa_node_id(current_channel_id));
+        if constexpr (config::use_tasking_profiler()){
+            TaskingProfiler::getInstance().enqueue(target_channel_id);
+        }
         if constexpr (config::task_statistics())
        {
             this->_statistic.increment(current_channel_id);
@@ -146,6 +166,9 @@ void Scheduler::schedule(TaskInterface &task, const std::uint16_t current_channe
     else
     {
         this->_worker[current_channel_id]->channel().push_back_local(&task);
+        if constexpr (config::use_tasking_profiler()){
+            TaskingProfiler::getInstance().enqueue(current_channel_id);
+        }
         if constexpr (config::task_statistics())
         {
             this->_statistic.increment(current_channel_id);
@@ -164,6 +187,9 @@ void Scheduler::schedule(TaskInterface &task) noexcept
 {
     const auto &annotated_resource = task.annotated_resource();
     this->_worker[annotated_resource.channel_id()]->channel().push_back_remote(&task, 0U);
+    if constexpr (config::use_tasking_profiler()){
+        TaskingProfiler::getInstance().enqueue(annotated_resource.channel_id());
+    }
     if constexpr (config::task_statistics())
     {
         this->_statistic.increment(annotated_resource.channel_id());
@@ -172,6 +198,9 @@ void Scheduler::schedule(TaskInterface &task) noexcept
     else if (task.has_channel_annotated())
     {
         this->_worker[task.annotated_channel()]->channel().push_back_remote(&task, 0U);
+        if constexpr (config::use_tasking_profiler()){
+            TaskingProfiler::getInstance().enqueue(task.annotated_channel());
+        }
         if constexpr (config::task_statistics())
         {
             this->_statistic.increment(task.annotated_channel());
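Every scheduler hook is wrapped in if constexpr (config::use_tasking_profiler()), so setting the toggle to false removes the profiling calls at compile time rather than branching at run time. A minimal, self-contained sketch of that pattern (the names here are illustrative, not taken from the patch):

#include <iostream>

struct config
{
    // Flip to false and the profiling branch below is discarded at compile time.
    static constexpr auto use_tasking_profiler() { return true; }
};

void record_enqueue(int channel_id) { std::cout << "enqueue on channel " << channel_id << '\n'; }

void push_task(int channel_id)
{
    // Compiled only when the toggle is true; with a false toggle no call
    // (and no branch) is emitted, mirroring the scheduler hooks above.
    if constexpr (config::use_tasking_profiler())
    {
        record_enqueue(channel_id);
    }
}

int main() { push_task(3); }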
diff --git a/src/mx/tasking/worker.cpp b/src/mx/tasking/worker.cpp
index a851e8c..6223598 100644
--- a/src/mx/tasking/worker.cpp
+++ b/src/mx/tasking/worker.cpp
@@ -13,6 +13,8 @@
 #include 
 #include 
+#include "profiling/tasking_profiler.h"
+
 using namespace mx::tasking;
 
 Worker::Worker(const std::uint16_t id, const std::uint16_t target_core_id, const std::uint16_t target_numa_node_id,
@@ -98,22 +100,53 @@ void Worker::execute()
             // Based on the annotated resource and its synchronization
             // primitive, we choose the fitting execution context.
             auto result = TaskResult{};
+            auto task_id_profiler = std::uint64_t{0};
             switch (Worker::synchronization_primitive(task))
             {
             case synchronization::primitive::ScheduleWriter:
+                if constexpr (config::use_tasking_profiler()){
+                    task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
+                }
                 result = this->execute_optimistic(core_id, channel_id, task);
+                if constexpr (config::use_tasking_profiler()){
+                    TaskingProfiler::getInstance().endTask(channel_id, task_id_profiler);
+                }
                 break;
             case synchronization::primitive::OLFIT:
+                if constexpr (config::use_tasking_profiler()){
+                    task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
+                }
                 result = this->execute_olfit(core_id, channel_id, task);
+                if constexpr (config::use_tasking_profiler()){
+                    TaskingProfiler::getInstance().endTask(channel_id, task_id_profiler);
+                }
                 break;
             case synchronization::primitive::ScheduleAll:
             case synchronization::primitive::None:
+                if constexpr (config::use_tasking_profiler()){
+                    task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
+                }
                 result = task->execute(core_id, channel_id);
+                if constexpr (config::use_tasking_profiler()){
+                    TaskingProfiler::getInstance().endTask(channel_id, task_id_profiler);
+                }
                 break;
             case synchronization::primitive::ReaderWriterLatch:
+                if constexpr (config::use_tasking_profiler()){
+                    task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
+                }
                 result = Worker::execute_reader_writer_latched(core_id, channel_id, task);
+                if constexpr (config::use_tasking_profiler()){
+                    TaskingProfiler::getInstance().endTask(channel_id, task_id_profiler);
+                }
                 break;
             case synchronization::primitive::ExclusiveLatch:
+                if constexpr (config::use_tasking_profiler()){
+                    task_id_profiler = TaskingProfiler::getInstance().startTask(core_id, 0, typeid(*task).name());
+                }
                 result = Worker::execute_exclusive_latched(core_id, channel_id, task);
+                if constexpr (config::use_tasking_profiler()){
+                    TaskingProfiler::getInstance().endTask(channel_id, task_id_profiler);
+                }
                 break;
             }
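startTask() stores the raw typeid(*task).name() and saveProfile() later demangles it with abi::__cxa_demangle. A self-contained sketch of that call, assuming a GCC/Clang toolchain; note that the returned buffer is malloc-allocated and freeing it is the caller's responsibility (the patch does not free it):

#include <cstdlib>
#include <cxxabi.h>
#include <iostream>
#include <typeinfo>

struct PrefetchTask
{
    virtual ~PrefetchTask() = default;
};

int main()
{
    const PrefetchTask task{};

    // __cxa_demangle returns a malloc'd, human-readable name (or nullptr on failure).
    int status = 0;
    char *name = abi::__cxa_demangle(typeid(task).name(), nullptr, nullptr, &status);

    std::cout << (status == 0 && name != nullptr ? name : typeid(task).name()) << '\n';
    std::free(name); // freeing the demangled name is the caller's job
}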