diff --git a/CMakeLists.txt b/CMakeLists.txt index ee0bf0c..19ab355 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,7 @@ SET(MX_TASKING_SRC src/mx/resource/builder.cpp src/mx/tasking/scheduler.cpp src/mx/tasking/worker.cpp + src/mx/tasking/task.cpp src/mx/tasking/profiling/profiling_task.cpp src/mx/util/core_set.cpp src/mx/util/random.cpp @@ -69,6 +70,7 @@ add_executable(blinktree_benchmark src/application/blinktree_benchmark/main.cpp src/application/blinktree_benchmark/benchmark.cpp ) +target_link_libraries(blinktree_benchmark pthread numa atomic mxtasking mxbenchmarking) add_executable(hashjoin_benchmark src/application/hashjoin_benchmark/main.cpp @@ -77,11 +79,11 @@ add_executable(hashjoin_benchmark src/application/hashjoin_benchmark/tpch_table_reader.cpp src/application/hashjoin_benchmark/notifier.cpp ) - -# Link executables -target_link_libraries(blinktree_benchmark pthread numa atomic mxtasking mxbenchmarking) target_link_libraries(hashjoin_benchmark pthread numa atomic mxtasking mxbenchmarking) +add_executable(hello_world src/application/hello_world/main.cpp) +target_link_libraries(hello_world pthread numa atomic mxtasking mxbenchmarking) + # Add tests if (GTEST) set(TESTS @@ -104,4 +106,4 @@ endif() # Custom targets add_custom_target(ycsb-a ${CMAKE_SOURCE_DIR}/scripts/generate_ycsb a randint) -add_custom_target(ycsb-c ${CMAKE_SOURCE_DIR}/scripts/generate_ycsb c randint) \ No newline at end of file +add_custom_target(ycsb-c ${CMAKE_SOURCE_DIR}/scripts/generate_ycsb c randint) diff --git a/README.md b/README.md index cc5bdee..3900dd8 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,10 @@ For detailed information please see README files in `src/application/` fold * [B Link Tree benchmark](src/application/blinktree_benchmark/README.md) (`src/application/blinktree_benchmark`) * [Hash Join benchmark](src/application/hashjoin_benchmark/README.md) (`src/application/hashjoin_benchmark`) +## Example +We build a small `Hello World!` example, located in `src/application/hello_world`. +You might take a look to see how to use `MxTasking`. + ### Simple example for B Link Tree * Call `make ycsb-a` to generate the default workload * Call `./bin/blinktree_benchmark 1:4` to run benchmark for one to four cores. diff --git a/src/application/blinktree_benchmark/README.md b/src/application/blinktree_benchmark/README.md index 132536d..06f1770 100644 --- a/src/application/blinktree_benchmark/README.md +++ b/src/application/blinktree_benchmark/README.md @@ -50,6 +50,10 @@ Following, the benchmark will be started and print the results for every iterati * After that, the time and throughput are written. * If `--perf` is enabled, the output will be extended by some perf counters, which are labeled (like throughput). +## Plot the results +When using `-o FILE`, the results will be written to the given file, using `JSON` format. +The plot script `scripts/plot_blinktree_benchmark INPUT_FILE [INPUT_FILE ...]` will aggregate and plot the results using one or more of those `JSON` files. + ## Examples ###### Running workload A using optimistic synchronization diff --git a/src/application/hashjoin_benchmark/README.md b/src/application/hashjoin_benchmark/README.md index 8f5dcab..07429d2 100644 --- a/src/application/hashjoin_benchmark/README.md +++ b/src/application/hashjoin_benchmark/README.md @@ -43,3 +43,7 @@ Following, the benchmark will be started and print the results for every iterati * Thirdly, the granularity of how many records per task will be processed. * After that, the time and throughput are written. * If `--perf` is enabled, the output will be extended by some perf counters, which are labeled (like throughput). + +## Plot the results +When using `-o FILE`, the results will be written to the given file, using `JSON` format. +The plot script `scripts/plot_hashjoin_benchmark INPUT_FILE` will aggregate and plot the results using one `JSON` file. diff --git a/src/application/hashjoin_benchmark/benchmark.cpp b/src/application/hashjoin_benchmark/benchmark.cpp index 2b51bd7..f5e30e3 100644 --- a/src/application/hashjoin_benchmark/benchmark.cpp +++ b/src/application/hashjoin_benchmark/benchmark.cpp @@ -186,11 +186,6 @@ std::uint64_t Benchmark::tuples_per_core(const std::uint64_t count_join_keys, co { const auto cache_lines = (count_join_keys * sizeof(std::uint32_t)) / 64U; const auto cache_lines_per_core = cache_lines / count_cores; - auto p = 1U; - while (p < cache_lines_per_core) - { - p += 64U; - } - return p * (64U / sizeof(std::uint32_t)); + return cache_lines_per_core * (64U / sizeof(std::uint32_t)); } diff --git a/src/application/hello_world/main.cpp b/src/application/hello_world/main.cpp new file mode 100644 index 0000000..bf2c79a --- /dev/null +++ b/src/application/hello_world/main.cpp @@ -0,0 +1,41 @@ +#include +#include + +class HelloWorldTask : public mx::tasking::TaskInterface +{ +public: + constexpr HelloWorldTask() = default; + ~HelloWorldTask() override = default; + + mx::tasking::TaskResult execute(const std::uint16_t /*core_id*/, const std::uint16_t /*channel_id*/) override + { + std::cout << "Hello World" << std::endl; + + // Stop MxTasking runtime after this task. + return mx::tasking::TaskResult::make_stop(); + } +}; + +int main() +{ + // Define which cores will be used (1 core here). + const auto cores = mx::util::core_set::build(1); + + { // Scope for the MxTasking runtime. + + // Create a runtime for the given cores. + mx::tasking::runtime_guard _{cores}; + + // Create an instance of the HelloWorldTask with the current core as first + // parameter. The core is required for memory allocation. + auto *hello_world_task = mx::tasking::runtime::new_task(cores.front()); + + // Annotate the task to run on the first core. + hello_world_task->annotate(cores.front()); + + // Schedule the task. + mx::tasking::runtime::spawn(*hello_world_task); + } + + return 0; +} \ No newline at end of file diff --git a/src/mx/memory/config.h b/src/mx/memory/config.h index d6e2208..0be22ff 100644 --- a/src/mx/memory/config.h +++ b/src/mx/memory/config.h @@ -10,12 +10,6 @@ public: */ static constexpr auto max_numa_nodes() { return 2U; } - /** - * Decreases the use of memory of external NUMA regions within the allocator. - * @return True, when memory usage of external NUMA regions should be less. - */ - static constexpr auto low_priority_for_external_numa() { return false; } - /** * @return Interval of each epoch, if memory reclamation is used. */ diff --git a/src/mx/memory/dynamic_size_allocator.cpp b/src/mx/memory/dynamic_size_allocator.cpp index d8e2cee..03ac0ce 100644 --- a/src/mx/memory/dynamic_size_allocator.cpp +++ b/src/mx/memory/dynamic_size_allocator.cpp @@ -14,10 +14,10 @@ AllocationBlock::AllocationBlock(const std::uint32_t id, const std::uint8_t numa } AllocationBlock::AllocationBlock(AllocationBlock &&other) noexcept - : _id(other._id), _numa_node_id(other._numa_node_id), _size(other._size), _allocated_block(other._allocated_block), - _free_elements(std::move(other._free_elements)), _available_size(other._available_size) + : _id(other._id), _numa_node_id(other._numa_node_id), _size(other._size), + _allocated_block(std::exchange(other._allocated_block, nullptr)), _free_elements(std::move(other._free_elements)), + _available_size(other._available_size) { - other._allocated_block = nullptr; } AllocationBlock &AllocationBlock::operator=(AllocationBlock &&other) noexcept diff --git a/src/mx/memory/fixed_size_allocator.h b/src/mx/memory/fixed_size_allocator.h index 967db44..b412bee 100644 --- a/src/mx/memory/fixed_size_allocator.h +++ b/src/mx/memory/fixed_size_allocator.h @@ -55,7 +55,7 @@ public: explicit operator bool() const noexcept { return _memory != nullptr; } private: - void *_memory = nullptr; + void *_memory{nullptr}; }; /** @@ -64,9 +64,11 @@ private: * Internal, the ProcessorHeap bufferes allocated memory * to minimize access to the global heap. */ -class ProcessorHeap +class alignas(64) ProcessorHeap { public: + ProcessorHeap() noexcept = default; + explicit ProcessorHeap(const std::uint8_t numa_node_id) noexcept : _numa_node_id(numa_node_id) { _allocated_chunks.reserve(1024); @@ -82,10 +84,24 @@ public: for (const auto free_chunk : _free_chunk_buffer) { - GlobalHeap::free(static_cast(free_chunk), Chunk::size()); + if (static_cast(free_chunk)) + { + GlobalHeap::free(static_cast(free_chunk), Chunk::size()); + } } } + ProcessorHeap &operator=(ProcessorHeap &&other) noexcept + { + _numa_node_id = std::exchange(other._numa_node_id, std::numeric_limits::max()); + _free_chunk_buffer = other._free_chunk_buffer; + other._free_chunk_buffer.fill(Chunk{}); + _next_free_chunk.store(other._next_free_chunk.load()); + _fill_buffer_flag.store(other._fill_buffer_flag.load()); + _allocated_chunks = std::move(other._allocated_chunks); + return *this; + } + /** * @return ID of the NUMA node the memory is allocated on. */ @@ -129,7 +145,7 @@ private: inline static constexpr auto CHUNKS = 128U; // ID of the NUMA node of this ProcessorHeap. - alignas(64) const std::uint8_t _numa_node_id; + std::uint8_t _numa_node_id{std::numeric_limits::max()}; // Buffer for free chunks. std::array _free_chunk_buffer; @@ -176,10 +192,7 @@ private: template class alignas(64) CoreHeap { public: - explicit CoreHeap(ProcessorHeap *processor_heap) noexcept - : _processor_heap(processor_heap), _numa_node_id(processor_heap->numa_node_id()) - { - } + explicit CoreHeap(ProcessorHeap *processor_heap) noexcept : _processor_heap(processor_heap) { fill_buffer(); } CoreHeap() noexcept = default; @@ -192,26 +205,15 @@ public: * * @return Pointer to the new allocated memory. */ - void *allocate() noexcept + [[nodiscard]] void *allocate() noexcept { if (empty()) { fill_buffer(); } - auto *free_object = _first; - _first = free_object->next(); - - if constexpr (config::low_priority_for_external_numa()) - { - free_object->numa_node_id(_numa_node_id); - - return reinterpret_cast(reinterpret_cast(free_object) + 64U); - } - else - { - return static_cast(free_object); - } + auto *free_element = std::exchange(_first, _first->next()); + return static_cast(free_element); } /** @@ -224,29 +226,9 @@ public: */ void free(void *pointer) noexcept { - if constexpr (config::low_priority_for_external_numa()) - { - const auto address = reinterpret_cast(pointer); - auto *free_object = reinterpret_cast(address - 64U); - - if (free_object->numa_node_id() == _numa_node_id) - { - free_object->next(_first); - _first = free_object; - } - else - { - _last->next(free_object); - free_object->next(nullptr); - _last = free_object; - } - } - else - { - auto *free_object = static_cast(pointer); - free_object->next(_first); - _first = free_object; - } + auto *free_object = static_cast(pointer); + free_object->next(_first); + _first = free_object; } /** @@ -258,7 +240,7 @@ public: auto chunk = _processor_heap->allocate(); const auto chunk_address = static_cast(chunk); - constexpr auto object_size = config::low_priority_for_external_numa() ? S + 64U : S; + constexpr auto object_size = S; constexpr auto count_objects = std::uint64_t{Chunk::size() / object_size}; auto *first_free = reinterpret_cast(chunk_address); @@ -274,21 +256,14 @@ public: last_free->next(nullptr); _first = first_free; - _last = last_free; } private: // Processor heap to allocate new chunks. - ProcessorHeap *_processor_heap = nullptr; - - // ID of the NUMA node the core is placed in. - std::uint8_t _numa_node_id = 0U; + ProcessorHeap *_processor_heap{nullptr}; // First element of the list of free memory objects. - FreeHeader *_first = nullptr; - - // Last element of the list of free memory objects. - FreeHeader *_last = nullptr; + FreeHeader *_first{nullptr}; /** * @return True, when the buffer is empty. @@ -302,41 +277,32 @@ private: template class Allocator final : public TaskAllocatorInterface { public: - explicit Allocator(const util::core_set &core_set) : _core_heaps(core_set.size()) + explicit Allocator(const util::core_set &core_set) { - _processor_heaps.fill(nullptr); - - for (auto i = 0U; i < core_set.size(); ++i) + for (auto node_id = std::uint8_t(0U); node_id < config::max_numa_nodes(); ++node_id) { - const auto core_id = core_set[i]; - const auto node_id = system::topology::node_id(core_id); - if (_processor_heaps[node_id] == nullptr) + if (core_set.has_core_of_numa_node(node_id)) { - _processor_heaps[node_id] = - new (GlobalHeap::allocate_cache_line_aligned(sizeof(ProcessorHeap))) ProcessorHeap(node_id); + _processor_heaps[node_id] = ProcessorHeap{node_id}; } - - auto core_heap = CoreHeap{_processor_heaps[node_id]}; - core_heap.fill_buffer(); - _core_heaps.insert(std::make_pair(core_id, std::move(core_heap))); } - } - ~Allocator() override - { - for (auto *processor_heap : _processor_heaps) + for (const auto core_id : core_set) { - delete processor_heap; + const auto node_id = system::topology::node_id(core_id); + _core_heaps[core_id] = CoreHeap{&_processor_heaps[node_id]}; } } + ~Allocator() override = default; + /** * Allocates memory from the given CoreHeap. * * @param core_id ID of the core. * @return Allocated memory object. */ - void *allocate(const std::uint16_t core_id) override { return _core_heaps[core_id].allocate(); } + [[nodiscard]] void *allocate(const std::uint16_t core_id) override { return _core_heaps[core_id].allocate(); } /** * Frees memory. @@ -348,9 +314,9 @@ public: private: // Heap for every processor socket/NUMA region. - std::array _processor_heaps; + std::array _processor_heaps; // Map from core_id to core-local allocator. - std::unordered_map> _core_heaps; + std::array, tasking::config::max_cores()> _core_heaps; }; } // namespace mx::memory::fixed \ No newline at end of file diff --git a/src/mx/memory/tagged_ptr.h b/src/mx/memory/tagged_ptr.h index f5509f5..bdac79c 100644 --- a/src/mx/memory/tagged_ptr.h +++ b/src/mx/memory/tagged_ptr.h @@ -12,7 +12,7 @@ namespace mx::memory { template class tagged_ptr { public: - constexpr tagged_ptr() noexcept : _object_pointer(0U) + constexpr tagged_ptr() noexcept { static_assert(sizeof(I) == 2U); static_assert(sizeof(tagged_ptr) == 8U); @@ -81,7 +81,7 @@ private: /** * Pointer to the instance of T, only 48bit are used. */ - std::uintptr_t _object_pointer : 48; + std::uintptr_t _object_pointer : 48 {0U}; /** * Information stored within this pointer, remaining 16bit are used. diff --git a/src/mx/memory/task_allocator_interface.h b/src/mx/memory/task_allocator_interface.h index 5190818..e92a910 100644 --- a/src/mx/memory/task_allocator_interface.h +++ b/src/mx/memory/task_allocator_interface.h @@ -19,7 +19,7 @@ public: * @param core_id Core to allocate memory for. * @return Allocated memory. */ - virtual void *allocate(std::uint16_t core_id) = 0; + [[nodiscard]] virtual void *allocate(std::uint16_t core_id) = 0; /** * Frees the memory at the given core. @@ -36,12 +36,12 @@ template class SystemTaskAllocator final : public TaskAllocatorI { public: constexpr SystemTaskAllocator() noexcept = default; - virtual ~SystemTaskAllocator() noexcept = default; + ~SystemTaskAllocator() noexcept override = default; /** * @return Allocated memory using systems malloc (but aligned). */ - void *allocate(const std::uint16_t /*core_id*/) override { return std::aligned_alloc(64U, S); } + [[nodiscard]] void *allocate(const std::uint16_t /*core_id*/) override { return std::aligned_alloc(64U, S); } /** * Frees the given memory using systems free. diff --git a/src/mx/resource/resource.h b/src/mx/resource/resource.h index 3060b99..c95e2a2 100644 --- a/src/mx/resource/resource.h +++ b/src/mx/resource/resource.h @@ -164,7 +164,7 @@ private: class information { public: - constexpr information() noexcept : _channel_id(0U), _synchronization_primitive(0U) {} + constexpr information() noexcept = default; explicit information(const std::uint16_t channel_id, const synchronization::primitive synchronization_primitive) noexcept : _channel_id(channel_id), _synchronization_primitive(static_cast(synchronization_primitive)) @@ -182,8 +182,8 @@ public: information &operator=(const information &other) = default; private: - std::uint16_t _channel_id : 12; - std::uint16_t _synchronization_primitive : 4; + std::uint16_t _channel_id : 12 {0U}; + std::uint16_t _synchronization_primitive : 4 {0U}; } __attribute__((packed)); /** diff --git a/src/mx/tasking/config.h b/src/mx/tasking/config.h index 56b3447..493a2c9 100644 --- a/src/mx/tasking/config.h +++ b/src/mx/tasking/config.h @@ -12,7 +12,7 @@ public: }; // Maximal number of supported cores. - static constexpr auto max_cores() { return 64U; } + static constexpr auto max_cores() { return 128U; } // Maximal size for a single task, will be used for task allocation. static constexpr auto task_size() { return 64U; } @@ -30,4 +30,4 @@ public: // memory is unsafe. static constexpr auto memory_reclamation() { return memory_reclamation_scheme::UpdateEpochPeriodically; } }; -} // namespace mx::tasking \ No newline at end of file +} // namespace mx::tasking diff --git a/src/mx/tasking/runtime.h b/src/mx/tasking/runtime.h index dbbf201..9d2fb1f 100644 --- a/src/mx/tasking/runtime.h +++ b/src/mx/tasking/runtime.h @@ -260,6 +260,11 @@ public: runtime::init(core_set, prefetch_distance, use_system_allocator); } + runtime_guard(const util::core_set &core_set, const std::uint16_t prefetch_distance = 0U) noexcept + : runtime_guard(false, core_set, prefetch_distance) + { + } + ~runtime_guard() noexcept { runtime::start_and_wait(); } }; } // namespace mx::tasking \ No newline at end of file diff --git a/src/mx/tasking/task.cpp b/src/mx/tasking/task.cpp new file mode 100644 index 0000000..1e9779a --- /dev/null +++ b/src/mx/tasking/task.cpp @@ -0,0 +1,18 @@ +#include "task.h" +#include "runtime.h" +#include + +using namespace mx::tasking; + +TaskResult TaskResult::make_stop() noexcept +{ + auto *stop_task = runtime::new_task(system::topology::core_id()); + stop_task->annotate(std::uint16_t{0U}); + return TaskResult::make_succeed_and_remove(stop_task); +} + +TaskResult StopTaskingTask::execute(const std::uint16_t /*core_id*/, const std::uint16_t /*channel_id*/) +{ + runtime::stop(); + return TaskResult::make_remove(); +} \ No newline at end of file diff --git a/src/mx/tasking/task.h b/src/mx/tasking/task.h index 86b4084..919aaef 100644 --- a/src/mx/tasking/task.h +++ b/src/mx/tasking/task.h @@ -16,16 +16,68 @@ enum priority : std::uint8_t }; class TaskInterface; + +/** + * The TaskResult is returned by every task to tell the + * runtime what happens next. Possibilities are run a + * successor task, remove the returning task or stop + * the entire runtime. + */ class TaskResult { public: + /** + * Let the runtime know that the given task + * should be run as a successor of the current + * task. The runtime will schedule that task. + * + * @param successor_task Task to succeed. + * @return A TaskResult that tells the + * runtime to run the given task. + */ static TaskResult make_succeed(TaskInterface *successor_task) noexcept { return TaskResult{successor_task, false}; } + + /** + * Let the runtime know that the given task + * should be removed after (successfully) + * finishing. + * + * @return A TaskResult that tells the + * runtime to remove the returning task. + */ static TaskResult make_remove() noexcept { return TaskResult{nullptr, true}; } + + /** + * Let the runtime know that the given task + * should be run as a successor of the current + * task and the current task should be removed. + * + * @param successor_task Task to succeed. + * @return A TaskResult that tells the runtime + * to run the given task and remove the + * returning task. + */ static TaskResult make_succeed_and_remove(TaskInterface *successor_task) noexcept { return TaskResult{successor_task, true}; } - static TaskResult make_null() noexcept { return TaskResult{nullptr, false}; } + + /** + * Nothing will happen + * + * @return An empty TaskResult. + */ + static TaskResult make_null() noexcept { return {}; } + + /** + * Let the runtime know to stop after + * the returning task. + * + * @return A TaskResult that tells the + * runtime to top. + */ + static TaskResult make_stop() noexcept; + constexpr TaskResult() = default; ~TaskResult() = default; @@ -205,4 +257,13 @@ private: // Tasks annotations. annotation _annotation; }; + +class StopTaskingTask final : public TaskInterface +{ +public: + constexpr StopTaskingTask() noexcept = default; + ~StopTaskingTask() override = default; + + TaskResult execute(std::uint16_t /*core_id*/, std::uint16_t /*channel_id*/) override; +}; } // namespace mx::tasking \ No newline at end of file diff --git a/src/mx/util/core_set.h b/src/mx/util/core_set.h index 9edf650..99ded80 100644 --- a/src/mx/util/core_set.h +++ b/src/mx/util/core_set.h @@ -28,6 +28,13 @@ public: }; constexpr core_set() noexcept : _core_identifier({0U}), _numa_nodes(0U) {} + explicit core_set(std::initializer_list &&core_ids) noexcept : core_set() + { + for (const auto core_id : core_ids) + { + emplace_back(core_id); + } + } ~core_set() noexcept = default; core_set &operator=(const core_set &other) noexcept = default; @@ -43,6 +50,8 @@ public: } std::uint16_t operator[](const std::uint16_t index) const noexcept { return _core_identifier[index]; } + std::uint16_t front() const { return _core_identifier.front(); } + std::uint16_t back() const { return _core_identifier.back(); } explicit operator bool() const noexcept { return _size > 0U; } @@ -101,6 +110,9 @@ public: return _numa_nodes.test(numa_node_id); } + [[nodiscard]] auto begin() const noexcept { return _core_identifier.begin(); } + [[nodiscard]] auto end() const noexcept { return _core_identifier.begin() + _size; } + private: // List of core identifiers. std::array _core_identifier;