mirror of
https://github.com/mmueller41/mxtasking.git
synced 2026-01-21 12:42:57 +01:00
Small changes to eliminate warnings, added Hello World example, bugfix in HashJoin benchmark.
This commit is contained in:
@@ -10,12 +10,6 @@ public:
|
||||
*/
|
||||
static constexpr auto max_numa_nodes() { return 2U; }
|
||||
|
||||
/**
|
||||
* Decreases the use of memory of external NUMA regions within the allocator.
|
||||
* @return True, when memory usage of external NUMA regions should be less.
|
||||
*/
|
||||
static constexpr auto low_priority_for_external_numa() { return false; }
|
||||
|
||||
/**
|
||||
* @return Interval of each epoch, if memory reclamation is used.
|
||||
*/
|
||||
|
||||
@@ -14,10 +14,10 @@ AllocationBlock::AllocationBlock(const std::uint32_t id, const std::uint8_t numa
|
||||
}
|
||||
|
||||
AllocationBlock::AllocationBlock(AllocationBlock &&other) noexcept
|
||||
: _id(other._id), _numa_node_id(other._numa_node_id), _size(other._size), _allocated_block(other._allocated_block),
|
||||
_free_elements(std::move(other._free_elements)), _available_size(other._available_size)
|
||||
: _id(other._id), _numa_node_id(other._numa_node_id), _size(other._size),
|
||||
_allocated_block(std::exchange(other._allocated_block, nullptr)), _free_elements(std::move(other._free_elements)),
|
||||
_available_size(other._available_size)
|
||||
{
|
||||
other._allocated_block = nullptr;
|
||||
}
|
||||
|
||||
AllocationBlock &AllocationBlock::operator=(AllocationBlock &&other) noexcept
|
||||
|
||||
@@ -55,7 +55,7 @@ public:
|
||||
explicit operator bool() const noexcept { return _memory != nullptr; }
|
||||
|
||||
private:
|
||||
void *_memory = nullptr;
|
||||
void *_memory{nullptr};
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -64,9 +64,11 @@ private:
|
||||
* Internal, the ProcessorHeap bufferes allocated memory
|
||||
* to minimize access to the global heap.
|
||||
*/
|
||||
class ProcessorHeap
|
||||
class alignas(64) ProcessorHeap
|
||||
{
|
||||
public:
|
||||
ProcessorHeap() noexcept = default;
|
||||
|
||||
explicit ProcessorHeap(const std::uint8_t numa_node_id) noexcept : _numa_node_id(numa_node_id)
|
||||
{
|
||||
_allocated_chunks.reserve(1024);
|
||||
@@ -82,10 +84,24 @@ public:
|
||||
|
||||
for (const auto free_chunk : _free_chunk_buffer)
|
||||
{
|
||||
GlobalHeap::free(static_cast<void *>(free_chunk), Chunk::size());
|
||||
if (static_cast<bool>(free_chunk))
|
||||
{
|
||||
GlobalHeap::free(static_cast<void *>(free_chunk), Chunk::size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ProcessorHeap &operator=(ProcessorHeap &&other) noexcept
|
||||
{
|
||||
_numa_node_id = std::exchange(other._numa_node_id, std::numeric_limits<std::uint8_t>::max());
|
||||
_free_chunk_buffer = other._free_chunk_buffer;
|
||||
other._free_chunk_buffer.fill(Chunk{});
|
||||
_next_free_chunk.store(other._next_free_chunk.load());
|
||||
_fill_buffer_flag.store(other._fill_buffer_flag.load());
|
||||
_allocated_chunks = std::move(other._allocated_chunks);
|
||||
return *this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ID of the NUMA node the memory is allocated on.
|
||||
*/
|
||||
@@ -129,7 +145,7 @@ private:
|
||||
inline static constexpr auto CHUNKS = 128U;
|
||||
|
||||
// ID of the NUMA node of this ProcessorHeap.
|
||||
alignas(64) const std::uint8_t _numa_node_id;
|
||||
std::uint8_t _numa_node_id{std::numeric_limits<std::uint8_t>::max()};
|
||||
|
||||
// Buffer for free chunks.
|
||||
std::array<Chunk, CHUNKS> _free_chunk_buffer;
|
||||
@@ -176,10 +192,7 @@ private:
|
||||
template <std::size_t S> class alignas(64) CoreHeap
|
||||
{
|
||||
public:
|
||||
explicit CoreHeap(ProcessorHeap *processor_heap) noexcept
|
||||
: _processor_heap(processor_heap), _numa_node_id(processor_heap->numa_node_id())
|
||||
{
|
||||
}
|
||||
explicit CoreHeap(ProcessorHeap *processor_heap) noexcept : _processor_heap(processor_heap) { fill_buffer(); }
|
||||
|
||||
CoreHeap() noexcept = default;
|
||||
|
||||
@@ -192,26 +205,15 @@ public:
|
||||
*
|
||||
* @return Pointer to the new allocated memory.
|
||||
*/
|
||||
void *allocate() noexcept
|
||||
[[nodiscard]] void *allocate() noexcept
|
||||
{
|
||||
if (empty())
|
||||
{
|
||||
fill_buffer();
|
||||
}
|
||||
|
||||
auto *free_object = _first;
|
||||
_first = free_object->next();
|
||||
|
||||
if constexpr (config::low_priority_for_external_numa())
|
||||
{
|
||||
free_object->numa_node_id(_numa_node_id);
|
||||
|
||||
return reinterpret_cast<void *>(reinterpret_cast<std::uintptr_t>(free_object) + 64U);
|
||||
}
|
||||
else
|
||||
{
|
||||
return static_cast<void *>(free_object);
|
||||
}
|
||||
auto *free_element = std::exchange(_first, _first->next());
|
||||
return static_cast<void *>(free_element);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -224,29 +226,9 @@ public:
|
||||
*/
|
||||
void free(void *pointer) noexcept
|
||||
{
|
||||
if constexpr (config::low_priority_for_external_numa())
|
||||
{
|
||||
const auto address = reinterpret_cast<std::uintptr_t>(pointer);
|
||||
auto *free_object = reinterpret_cast<FreeHeader *>(address - 64U);
|
||||
|
||||
if (free_object->numa_node_id() == _numa_node_id)
|
||||
{
|
||||
free_object->next(_first);
|
||||
_first = free_object;
|
||||
}
|
||||
else
|
||||
{
|
||||
_last->next(free_object);
|
||||
free_object->next(nullptr);
|
||||
_last = free_object;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
auto *free_object = static_cast<FreeHeader *>(pointer);
|
||||
free_object->next(_first);
|
||||
_first = free_object;
|
||||
}
|
||||
auto *free_object = static_cast<FreeHeader *>(pointer);
|
||||
free_object->next(_first);
|
||||
_first = free_object;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -258,7 +240,7 @@ public:
|
||||
auto chunk = _processor_heap->allocate();
|
||||
const auto chunk_address = static_cast<std::uintptr_t>(chunk);
|
||||
|
||||
constexpr auto object_size = config::low_priority_for_external_numa() ? S + 64U : S;
|
||||
constexpr auto object_size = S;
|
||||
constexpr auto count_objects = std::uint64_t{Chunk::size() / object_size};
|
||||
|
||||
auto *first_free = reinterpret_cast<FreeHeader *>(chunk_address);
|
||||
@@ -274,21 +256,14 @@ public:
|
||||
|
||||
last_free->next(nullptr);
|
||||
_first = first_free;
|
||||
_last = last_free;
|
||||
}
|
||||
|
||||
private:
|
||||
// Processor heap to allocate new chunks.
|
||||
ProcessorHeap *_processor_heap = nullptr;
|
||||
|
||||
// ID of the NUMA node the core is placed in.
|
||||
std::uint8_t _numa_node_id = 0U;
|
||||
ProcessorHeap *_processor_heap{nullptr};
|
||||
|
||||
// First element of the list of free memory objects.
|
||||
FreeHeader *_first = nullptr;
|
||||
|
||||
// Last element of the list of free memory objects.
|
||||
FreeHeader *_last = nullptr;
|
||||
FreeHeader *_first{nullptr};
|
||||
|
||||
/**
|
||||
* @return True, when the buffer is empty.
|
||||
@@ -302,41 +277,32 @@ private:
|
||||
template <std::size_t S> class Allocator final : public TaskAllocatorInterface
|
||||
{
|
||||
public:
|
||||
explicit Allocator(const util::core_set &core_set) : _core_heaps(core_set.size())
|
||||
explicit Allocator(const util::core_set &core_set)
|
||||
{
|
||||
_processor_heaps.fill(nullptr);
|
||||
|
||||
for (auto i = 0U; i < core_set.size(); ++i)
|
||||
for (auto node_id = std::uint8_t(0U); node_id < config::max_numa_nodes(); ++node_id)
|
||||
{
|
||||
const auto core_id = core_set[i];
|
||||
const auto node_id = system::topology::node_id(core_id);
|
||||
if (_processor_heaps[node_id] == nullptr)
|
||||
if (core_set.has_core_of_numa_node(node_id))
|
||||
{
|
||||
_processor_heaps[node_id] =
|
||||
new (GlobalHeap::allocate_cache_line_aligned(sizeof(ProcessorHeap))) ProcessorHeap(node_id);
|
||||
_processor_heaps[node_id] = ProcessorHeap{node_id};
|
||||
}
|
||||
|
||||
auto core_heap = CoreHeap<S>{_processor_heaps[node_id]};
|
||||
core_heap.fill_buffer();
|
||||
_core_heaps.insert(std::make_pair(core_id, std::move(core_heap)));
|
||||
}
|
||||
}
|
||||
|
||||
~Allocator() override
|
||||
{
|
||||
for (auto *processor_heap : _processor_heaps)
|
||||
for (const auto core_id : core_set)
|
||||
{
|
||||
delete processor_heap;
|
||||
const auto node_id = system::topology::node_id(core_id);
|
||||
_core_heaps[core_id] = CoreHeap<S>{&_processor_heaps[node_id]};
|
||||
}
|
||||
}
|
||||
|
||||
~Allocator() override = default;
|
||||
|
||||
/**
|
||||
* Allocates memory from the given CoreHeap.
|
||||
*
|
||||
* @param core_id ID of the core.
|
||||
* @return Allocated memory object.
|
||||
*/
|
||||
void *allocate(const std::uint16_t core_id) override { return _core_heaps[core_id].allocate(); }
|
||||
[[nodiscard]] void *allocate(const std::uint16_t core_id) override { return _core_heaps[core_id].allocate(); }
|
||||
|
||||
/**
|
||||
* Frees memory.
|
||||
@@ -348,9 +314,9 @@ public:
|
||||
|
||||
private:
|
||||
// Heap for every processor socket/NUMA region.
|
||||
std::array<ProcessorHeap *, config::max_numa_nodes()> _processor_heaps;
|
||||
std::array<ProcessorHeap, config::max_numa_nodes()> _processor_heaps;
|
||||
|
||||
// Map from core_id to core-local allocator.
|
||||
std::unordered_map<std::uint16_t, CoreHeap<S>> _core_heaps;
|
||||
std::array<CoreHeap<S>, tasking::config::max_cores()> _core_heaps;
|
||||
};
|
||||
} // namespace mx::memory::fixed
|
||||
@@ -12,7 +12,7 @@ namespace mx::memory {
|
||||
template <class T, typename I> class tagged_ptr
|
||||
{
|
||||
public:
|
||||
constexpr tagged_ptr() noexcept : _object_pointer(0U)
|
||||
constexpr tagged_ptr() noexcept
|
||||
{
|
||||
static_assert(sizeof(I) == 2U);
|
||||
static_assert(sizeof(tagged_ptr) == 8U);
|
||||
@@ -81,7 +81,7 @@ private:
|
||||
/**
|
||||
* Pointer to the instance of T, only 48bit are used.
|
||||
*/
|
||||
std::uintptr_t _object_pointer : 48;
|
||||
std::uintptr_t _object_pointer : 48 {0U};
|
||||
|
||||
/**
|
||||
* Information stored within this pointer, remaining 16bit are used.
|
||||
|
||||
@@ -19,7 +19,7 @@ public:
|
||||
* @param core_id Core to allocate memory for.
|
||||
* @return Allocated memory.
|
||||
*/
|
||||
virtual void *allocate(std::uint16_t core_id) = 0;
|
||||
[[nodiscard]] virtual void *allocate(std::uint16_t core_id) = 0;
|
||||
|
||||
/**
|
||||
* Frees the memory at the given core.
|
||||
@@ -36,12 +36,12 @@ template <std::size_t S> class SystemTaskAllocator final : public TaskAllocatorI
|
||||
{
|
||||
public:
|
||||
constexpr SystemTaskAllocator() noexcept = default;
|
||||
virtual ~SystemTaskAllocator() noexcept = default;
|
||||
~SystemTaskAllocator() noexcept override = default;
|
||||
|
||||
/**
|
||||
* @return Allocated memory using systems malloc (but aligned).
|
||||
*/
|
||||
void *allocate(const std::uint16_t /*core_id*/) override { return std::aligned_alloc(64U, S); }
|
||||
[[nodiscard]] void *allocate(const std::uint16_t /*core_id*/) override { return std::aligned_alloc(64U, S); }
|
||||
|
||||
/**
|
||||
* Frees the given memory using systems free.
|
||||
|
||||
@@ -164,7 +164,7 @@ private:
|
||||
class information
|
||||
{
|
||||
public:
|
||||
constexpr information() noexcept : _channel_id(0U), _synchronization_primitive(0U) {}
|
||||
constexpr information() noexcept = default;
|
||||
explicit information(const std::uint16_t channel_id,
|
||||
const synchronization::primitive synchronization_primitive) noexcept
|
||||
: _channel_id(channel_id), _synchronization_primitive(static_cast<std::uint16_t>(synchronization_primitive))
|
||||
@@ -182,8 +182,8 @@ public:
|
||||
information &operator=(const information &other) = default;
|
||||
|
||||
private:
|
||||
std::uint16_t _channel_id : 12;
|
||||
std::uint16_t _synchronization_primitive : 4;
|
||||
std::uint16_t _channel_id : 12 {0U};
|
||||
std::uint16_t _synchronization_primitive : 4 {0U};
|
||||
} __attribute__((packed));
|
||||
|
||||
/**
|
||||
|
||||
@@ -12,7 +12,7 @@ public:
|
||||
};
|
||||
|
||||
// Maximal number of supported cores.
|
||||
static constexpr auto max_cores() { return 64U; }
|
||||
static constexpr auto max_cores() { return 128U; }
|
||||
|
||||
// Maximal size for a single task, will be used for task allocation.
|
||||
static constexpr auto task_size() { return 64U; }
|
||||
@@ -30,4 +30,4 @@ public:
|
||||
// memory is unsafe.
|
||||
static constexpr auto memory_reclamation() { return memory_reclamation_scheme::UpdateEpochPeriodically; }
|
||||
};
|
||||
} // namespace mx::tasking
|
||||
} // namespace mx::tasking
|
||||
|
||||
@@ -260,6 +260,11 @@ public:
|
||||
runtime::init(core_set, prefetch_distance, use_system_allocator);
|
||||
}
|
||||
|
||||
runtime_guard(const util::core_set &core_set, const std::uint16_t prefetch_distance = 0U) noexcept
|
||||
: runtime_guard(false, core_set, prefetch_distance)
|
||||
{
|
||||
}
|
||||
|
||||
~runtime_guard() noexcept { runtime::start_and_wait(); }
|
||||
};
|
||||
} // namespace mx::tasking
|
||||
18
src/mx/tasking/task.cpp
Normal file
18
src/mx/tasking/task.cpp
Normal file
@@ -0,0 +1,18 @@
|
||||
#include "task.h"
|
||||
#include "runtime.h"
|
||||
#include <mx/system/topology.h>
|
||||
|
||||
using namespace mx::tasking;
|
||||
|
||||
TaskResult TaskResult::make_stop() noexcept
|
||||
{
|
||||
auto *stop_task = runtime::new_task<StopTaskingTask>(system::topology::core_id());
|
||||
stop_task->annotate(std::uint16_t{0U});
|
||||
return TaskResult::make_succeed_and_remove(stop_task);
|
||||
}
|
||||
|
||||
TaskResult StopTaskingTask::execute(const std::uint16_t /*core_id*/, const std::uint16_t /*channel_id*/)
|
||||
{
|
||||
runtime::stop();
|
||||
return TaskResult::make_remove();
|
||||
}
|
||||
@@ -16,16 +16,68 @@ enum priority : std::uint8_t
|
||||
};
|
||||
|
||||
class TaskInterface;
|
||||
|
||||
/**
|
||||
* The TaskResult is returned by every task to tell the
|
||||
* runtime what happens next. Possibilities are run a
|
||||
* successor task, remove the returning task or stop
|
||||
* the entire runtime.
|
||||
*/
|
||||
class TaskResult
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Let the runtime know that the given task
|
||||
* should be run as a successor of the current
|
||||
* task. The runtime will schedule that task.
|
||||
*
|
||||
* @param successor_task Task to succeed.
|
||||
* @return A TaskResult that tells the
|
||||
* runtime to run the given task.
|
||||
*/
|
||||
static TaskResult make_succeed(TaskInterface *successor_task) noexcept { return TaskResult{successor_task, false}; }
|
||||
|
||||
/**
|
||||
* Let the runtime know that the given task
|
||||
* should be removed after (successfully)
|
||||
* finishing.
|
||||
*
|
||||
* @return A TaskResult that tells the
|
||||
* runtime to remove the returning task.
|
||||
*/
|
||||
static TaskResult make_remove() noexcept { return TaskResult{nullptr, true}; }
|
||||
|
||||
/**
|
||||
* Let the runtime know that the given task
|
||||
* should be run as a successor of the current
|
||||
* task and the current task should be removed.
|
||||
*
|
||||
* @param successor_task Task to succeed.
|
||||
* @return A TaskResult that tells the runtime
|
||||
* to run the given task and remove the
|
||||
* returning task.
|
||||
*/
|
||||
static TaskResult make_succeed_and_remove(TaskInterface *successor_task) noexcept
|
||||
{
|
||||
return TaskResult{successor_task, true};
|
||||
}
|
||||
static TaskResult make_null() noexcept { return TaskResult{nullptr, false}; }
|
||||
|
||||
/**
|
||||
* Nothing will happen
|
||||
*
|
||||
* @return An empty TaskResult.
|
||||
*/
|
||||
static TaskResult make_null() noexcept { return {}; }
|
||||
|
||||
/**
|
||||
* Let the runtime know to stop after
|
||||
* the returning task.
|
||||
*
|
||||
* @return A TaskResult that tells the
|
||||
* runtime to top.
|
||||
*/
|
||||
static TaskResult make_stop() noexcept;
|
||||
|
||||
constexpr TaskResult() = default;
|
||||
~TaskResult() = default;
|
||||
|
||||
@@ -205,4 +257,13 @@ private:
|
||||
// Tasks annotations.
|
||||
annotation _annotation;
|
||||
};
|
||||
|
||||
class StopTaskingTask final : public TaskInterface
|
||||
{
|
||||
public:
|
||||
constexpr StopTaskingTask() noexcept = default;
|
||||
~StopTaskingTask() override = default;
|
||||
|
||||
TaskResult execute(std::uint16_t /*core_id*/, std::uint16_t /*channel_id*/) override;
|
||||
};
|
||||
} // namespace mx::tasking
|
||||
@@ -28,6 +28,13 @@ public:
|
||||
};
|
||||
|
||||
constexpr core_set() noexcept : _core_identifier({0U}), _numa_nodes(0U) {}
|
||||
explicit core_set(std::initializer_list<std::uint16_t> &&core_ids) noexcept : core_set()
|
||||
{
|
||||
for (const auto core_id : core_ids)
|
||||
{
|
||||
emplace_back(core_id);
|
||||
}
|
||||
}
|
||||
~core_set() noexcept = default;
|
||||
|
||||
core_set &operator=(const core_set &other) noexcept = default;
|
||||
@@ -43,6 +50,8 @@ public:
|
||||
}
|
||||
|
||||
std::uint16_t operator[](const std::uint16_t index) const noexcept { return _core_identifier[index]; }
|
||||
std::uint16_t front() const { return _core_identifier.front(); }
|
||||
std::uint16_t back() const { return _core_identifier.back(); }
|
||||
|
||||
explicit operator bool() const noexcept { return _size > 0U; }
|
||||
|
||||
@@ -101,6 +110,9 @@ public:
|
||||
return _numa_nodes.test(numa_node_id);
|
||||
}
|
||||
|
||||
[[nodiscard]] auto begin() const noexcept { return _core_identifier.begin(); }
|
||||
[[nodiscard]] auto end() const noexcept { return _core_identifier.begin() + _size; }
|
||||
|
||||
private:
|
||||
// List of core identifiers.
|
||||
std::array<std::uint16_t, tasking::config::max_cores()> _core_identifier;
|
||||
|
||||
Reference in New Issue
Block a user