mirror of
https://github.com/mmueller41/genode.git
synced 2026-01-21 12:32:56 +01:00
ealanos: Added blinktree-based key-value store server.
This commit is contained in:
79
repos/ealanos/src/app/blinktree_server/README.md
Normal file
79
repos/ealanos/src/app/blinktree_server/README.md
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
# BLinkTree Benchmark
|
||||||
|
The BLinkTree-benchmark stores `8` byte numeric keys and values.
|
||||||
|
Call `./bin/blinktree_benchmark -h` for help and parameters.
|
||||||
|
|
||||||
|
## How to generate YCSB workload
|
||||||
|
* Workload specifications are done by files in `workloads_specification/`.
|
||||||
|
* Call `make ycsb-a` and `make ycsb-c` to generate workloads **A** and **C**.
|
||||||
|
* Workload files are stored in `workloads/`
|
||||||
|
* Use `./bin/blinktree_benchmark -f <fill-file> <mixed-file>` to pass the desired workload.
|
||||||
|
* Default (if not specified) is `-f workloads/fill_randint_workloada workloads/mixed_randint_workloada`.
|
||||||
|
|
||||||
|
## Important CLI arguments
|
||||||
|
* The first argument is the number of cores:
|
||||||
|
* `./bin/blinktree_benchmark 1` for using a single core.
|
||||||
|
* `./bin/blinktree_benchmark 1:24` for using cores `1` up to `24`.
|
||||||
|
* `-i <NUMBER>` specifies the number of repetitions of each workload.
|
||||||
|
* `-s <NUMBER>` steps of the cores:
|
||||||
|
* `-s 1` will increase the used cores by one (core ids: `0,1,2,3,4,5,6,7,..,23`).
|
||||||
|
* `-s 2` will skip every second core (core ids: `0,1,3,5,7,..23`).
|
||||||
|
* `-pd <NUMBER>` specifies the prefetch distance.
|
||||||
|
* `-p` or `--perf` will activate performance counter (result will be printed to console and output file).
|
||||||
|
* `--latched` will enable latches for synchronization (default off).
|
||||||
|
* `--exclusive` forces the tasks to access tree nodes exclusively (e.g. by using spinlocks or core-based sequencing) (default off).
|
||||||
|
* `--sync4me` will use built-in synchronization selection to choose the matching primitive based on annotations.
|
||||||
|
* `-o <FILE>` will write the results in **json** format to the given file.
|
||||||
|
|
||||||
|
## Understanding the output
|
||||||
|
After started, the benchmark will print a summary of configured cores and workload:
|
||||||
|
|
||||||
|
core configuration:
|
||||||
|
1: 0
|
||||||
|
2: 0 1
|
||||||
|
4: 0 1 2 3
|
||||||
|
workload: fill: 5m / readonly: 5m
|
||||||
|
|
||||||
|
Here, we configured the benchmark to use one to four cores; each line of the core configuration displays the number of cores and the core identifiers.
|
||||||
|
|
||||||
|
Following, the benchmark will be started and print the results for every iteration:
|
||||||
|
|
||||||
|
1 1 0 1478 ms 3.38295e+06 op/s
|
||||||
|
1 1 1 1237 ms 4.04204e+06 op/s
|
||||||
|
2 1 0 964 ms 5.18672e+06 op/s
|
||||||
|
2 1 1 675 ms 7.40741e+06 op/s
|
||||||
|
4 1 0 935 ms 5.34759e+06 op/s
|
||||||
|
4 1 1 532 ms 9.3985e+06 op/s
|
||||||
|
|
||||||
|
* The first column is the number of used cores.
|
||||||
|
* The second column displays the iteration of the benchmark (configured by `-i X`).
|
||||||
|
* Thirdly, the phase-identifier will be printed: `0` for initialization phase (which will be only inserts) and `1` for the workload phase (which is read-only here).
|
||||||
|
* After that, the time and throughput are written.
|
||||||
|
* If `--perf` is enabled, the output will be extended by some perf counters, which are labeled (like throughput).
|
||||||
|
|
||||||
|
## Plot the results
|
||||||
|
When using `-o FILE`, the results will be written to the given file, using `JSON` format.
|
||||||
|
The plot script `scripts/plot_blinktree_benchmark INPUT_FILE [INPUT_FILE ...]` will aggregate and plot the results using one or more of those `JSON` files.
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
###### Running workload A using optimistic synchronization
|
||||||
|
|
||||||
|
./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o optimistic.json
|
||||||
|
|
||||||
|
###### Running workload A using best matching synchronization
|
||||||
|
|
||||||
|
./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --sync4me -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o sync4me.json
|
||||||
|
|
||||||
|
###### Running workload A using reader/writer-locks
|
||||||
|
|
||||||
|
./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --latched -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o rwlocked.json
|
||||||
|
|
||||||
|
###### Running workload A using core-based sequencing
|
||||||
|
|
||||||
|
./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --exclusive -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o core-sequenced.json
|
||||||
|
|
||||||
|
###### Running workload A using spin-locks
|
||||||
|
|
||||||
|
./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --latched --exclusive -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o spinlocked.json
|
||||||
|
|
||||||
|
|
||||||
199
repos/ealanos/src/app/blinktree_server/benchmark.cpp
Normal file
199
repos/ealanos/src/app/blinktree_server/benchmark.cpp
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
#include "benchmark.h"
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <iostream>
|
||||||
|
#include <json.hpp>
|
||||||
|
#include <memory>
|
||||||
|
#include <mx/memory/global_heap.h>
|
||||||
|
|
||||||
|
using namespace application::blinktree_benchmark;
|
||||||
|
|
||||||
|
Benchmark::Benchmark(benchmark::Cores &&cores, const std::uint16_t iterations, std::string &&fill_workload_file,
|
||||||
|
std::string &&mixed_workload_file, const bool use_performance_counter,
|
||||||
|
const mx::synchronization::isolation_level node_isolation_level,
|
||||||
|
const mx::synchronization::protocol preferred_synchronization_method,
|
||||||
|
const bool print_tree_statistics, const bool check_tree, std::string &&result_file_name,
|
||||||
|
std::string &&statistic_file_name, std::string &&tree_file_name, const bool profile)
|
||||||
|
: _cores(std::move(cores)), _iterations(iterations), _node_isolation_level(node_isolation_level),
|
||||||
|
_preferred_synchronization_method(preferred_synchronization_method),
|
||||||
|
_print_tree_statistics(print_tree_statistics), _check_tree(check_tree),
|
||||||
|
_result_file_name(std::move(result_file_name)), _statistic_file_name(std::move(statistic_file_name)),
|
||||||
|
_tree_file_name(std::move(tree_file_name)), _profile(profile)
|
||||||
|
{
|
||||||
|
if (use_performance_counter)
|
||||||
|
{
|
||||||
|
this->_chronometer.add(benchmark::Perf::CYCLES);
|
||||||
|
this->_chronometer.add(benchmark::Perf::INSTRUCTIONS);
|
||||||
|
this->_chronometer.add(benchmark::Perf::STALLS_MEM_ANY);
|
||||||
|
this->_chronometer.add(benchmark::Perf::SW_PREFETCH_ACCESS_NTA);
|
||||||
|
this->_chronometer.add(benchmark::Perf::SW_PREFETCH_ACCESS_WRITE);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "core configuration: \n" << this->_cores.dump(2) << std::endl;
|
||||||
|
|
||||||
|
this->_workload.build(fill_workload_file, mixed_workload_file);
|
||||||
|
if (this->_workload.empty(benchmark::phase::FILL) && this->_workload.empty(benchmark::phase::MIXED))
|
||||||
|
{
|
||||||
|
std::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "workload: " << this->_workload << "\n" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Benchmark::start()
|
||||||
|
{
|
||||||
|
// Reset tree.
|
||||||
|
if (this->_tree == nullptr)
|
||||||
|
{
|
||||||
|
this->_tree = std::make_unique<db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>>(
|
||||||
|
this->_node_isolation_level, this->_preferred_synchronization_method);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset request scheduler.
|
||||||
|
if (this->_request_scheduler.empty() == false)
|
||||||
|
{
|
||||||
|
this->_request_scheduler.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create one request scheduler per core.
|
||||||
|
for (auto core_index = 0U; core_index < this->_cores.current().size(); core_index++)
|
||||||
|
{
|
||||||
|
const auto channel_id = core_index;
|
||||||
|
auto *request_scheduler = mx::tasking::runtime::new_task<RequestSchedulerTask>(
|
||||||
|
0U, core_index, channel_id, this->_workload, this->_cores.current(), this->_tree.get(), this);
|
||||||
|
mx::tasking::runtime::spawn(*request_scheduler, 0U);
|
||||||
|
this->_request_scheduler.push_back(request_scheduler);
|
||||||
|
}
|
||||||
|
this->_open_requests = this->_request_scheduler.size();
|
||||||
|
|
||||||
|
// Start measurement.
|
||||||
|
if (this->_profile)
|
||||||
|
{
|
||||||
|
mx::tasking::runtime::profile(this->profile_file_name());
|
||||||
|
}
|
||||||
|
this->_chronometer.start(static_cast<std::uint16_t>(static_cast<benchmark::phase>(this->_workload)),
|
||||||
|
this->_current_iteration + 1, this->_cores.current());
|
||||||
|
}
|
||||||
|
|
||||||
|
const mx::util::core_set &Benchmark::core_set()
|
||||||
|
{
|
||||||
|
if (this->_current_iteration == std::numeric_limits<std::uint16_t>::max())
|
||||||
|
{
|
||||||
|
// This is the very first time we start the benchmark.
|
||||||
|
this->_current_iteration = 0U;
|
||||||
|
return this->_cores.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Switch from fill to mixed phase.
|
||||||
|
if (this->_workload == benchmark::phase::FILL && this->_workload.empty(benchmark::phase::MIXED) == false)
|
||||||
|
{
|
||||||
|
this->_workload.reset(benchmark::phase::MIXED);
|
||||||
|
return this->_cores.current();
|
||||||
|
}
|
||||||
|
this->_workload.reset(benchmark::phase::FILL);
|
||||||
|
|
||||||
|
// Run the next iteration.
|
||||||
|
if (++this->_current_iteration < this->_iterations)
|
||||||
|
{
|
||||||
|
return this->_cores.current();
|
||||||
|
}
|
||||||
|
this->_current_iteration = 0U;
|
||||||
|
|
||||||
|
// At this point, all phases and all iterations for the current core configuration
|
||||||
|
// are done. Increase the cores.
|
||||||
|
return this->_cores.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Benchmark::requests_finished()
|
||||||
|
{
|
||||||
|
const auto open_requests = --this->_open_requests;
|
||||||
|
|
||||||
|
if (open_requests == 0U) // All request schedulers are done.
|
||||||
|
{
|
||||||
|
// Stop and print time (and performance counter).
|
||||||
|
const auto result = this->_chronometer.stop(this->_workload.size());
|
||||||
|
mx::tasking::runtime::stop();
|
||||||
|
std::cout << result << std::endl;
|
||||||
|
|
||||||
|
// Dump results to file.
|
||||||
|
if (this->_result_file_name.empty() == false)
|
||||||
|
{
|
||||||
|
std::ofstream result_file_stream(this->_result_file_name, std::ofstream::app);
|
||||||
|
result_file_stream << result.to_json().dump() << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dump statistics to file.
|
||||||
|
if constexpr (mx::tasking::config::task_statistics())
|
||||||
|
{
|
||||||
|
if (this->_statistic_file_name.empty() == false)
|
||||||
|
{
|
||||||
|
std::ofstream statistic_file_stream(this->_statistic_file_name, std::ofstream::app);
|
||||||
|
nlohmann::json statistic_json;
|
||||||
|
statistic_json["iteration"] = result.iteration();
|
||||||
|
statistic_json["cores"] = result.core_count();
|
||||||
|
statistic_json["phase"] = result.phase();
|
||||||
|
statistic_json["scheduled"] = nlohmann::json();
|
||||||
|
statistic_json["scheduled-on-channel"] = nlohmann::json();
|
||||||
|
statistic_json["scheduled-off-channel"] = nlohmann::json();
|
||||||
|
statistic_json["executed"] = nlohmann::json();
|
||||||
|
statistic_json["executed-reader"] = nlohmann::json();
|
||||||
|
statistic_json["executed-writer"] = nlohmann::json();
|
||||||
|
statistic_json["buffer-fills"] = nlohmann::json();
|
||||||
|
for (auto i = 0U; i < this->_cores.current().size(); i++)
|
||||||
|
{
|
||||||
|
const auto core_id = std::int32_t{this->_cores.current()[i]};
|
||||||
|
const auto core_id_string = std::to_string(core_id);
|
||||||
|
statistic_json["scheduled"][core_id_string] =
|
||||||
|
result.scheduled_tasks(core_id) / double(result.operation_count());
|
||||||
|
statistic_json["scheduled-on-core"][core_id_string] =
|
||||||
|
result.scheduled_tasks_on_core(core_id) / double(result.operation_count());
|
||||||
|
statistic_json["scheduled-off-core"][core_id_string] =
|
||||||
|
result.scheduled_tasks_off_core(core_id) / double(result.operation_count());
|
||||||
|
statistic_json["executed"][core_id_string] =
|
||||||
|
result.executed_tasks(core_id) / double(result.operation_count());
|
||||||
|
statistic_json["executed-reader"][core_id_string] =
|
||||||
|
result.executed_reader_tasks(core_id) / double(result.operation_count());
|
||||||
|
statistic_json["executed-writer"][core_id_string] =
|
||||||
|
result.executed_writer_tasks(core_id) / double(result.operation_count());
|
||||||
|
statistic_json["fill"][core_id_string] =
|
||||||
|
result.worker_fills(core_id) / double(result.operation_count());
|
||||||
|
}
|
||||||
|
|
||||||
|
statistic_file_stream << statistic_json.dump(2) << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check and print the tree.
|
||||||
|
if (this->_check_tree)
|
||||||
|
{
|
||||||
|
this->_tree->check();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this->_print_tree_statistics)
|
||||||
|
{
|
||||||
|
this->_tree->print_statistics();
|
||||||
|
}
|
||||||
|
|
||||||
|
const auto is_last_phase =
|
||||||
|
this->_workload == benchmark::phase::MIXED || this->_workload.empty(benchmark::phase::MIXED);
|
||||||
|
|
||||||
|
// Dump the tree.
|
||||||
|
if (this->_tree_file_name.empty() == false && is_last_phase)
|
||||||
|
{
|
||||||
|
std::ofstream tree_file_stream(this->_tree_file_name);
|
||||||
|
tree_file_stream << static_cast<nlohmann::json>(*(this->_tree)).dump() << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the tree to free the hole memory.
|
||||||
|
if (is_last_phase)
|
||||||
|
{
|
||||||
|
this->_tree.reset(nullptr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string Benchmark::profile_file_name() const
|
||||||
|
{
|
||||||
|
return "profiling-" + std::to_string(this->_cores.current().size()) + "-cores" + "-phase-" +
|
||||||
|
std::to_string(static_cast<std::uint16_t>(static_cast<benchmark::phase>(this->_workload))) + "-iteration-" +
|
||||||
|
std::to_string(this->_current_iteration) + ".json";
|
||||||
|
}
|
||||||
103
repos/ealanos/src/app/blinktree_server/benchmark.h
Normal file
103
repos/ealanos/src/app/blinktree_server/benchmark.h
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "listener.h"
|
||||||
|
#include "request_scheduler.h"
|
||||||
|
#include <array>
|
||||||
|
#include <atomic>
|
||||||
|
#include <benchmark/chronometer.h>
|
||||||
|
#include <benchmark/cores.h>
|
||||||
|
#include <benchmark/workload.h>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <db/index/blinktree/b_link_tree.h>
|
||||||
|
#include <memory>
|
||||||
|
#include <mx/util/core_set.h>
|
||||||
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
namespace application::blinktree_benchmark {
|
||||||
|
/**
|
||||||
|
* Benchmark executing the task-based BLink-Tree.
|
||||||
|
*/
|
||||||
|
class Benchmark final : public Listener
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Benchmark(benchmark::Cores &&, std::uint16_t iterations, std::string &&fill_workload_file,
|
||||||
|
std::string &&mixed_workload_file, bool use_performance_counter,
|
||||||
|
mx::synchronization::isolation_level node_isolation_level,
|
||||||
|
mx::synchronization::protocol preferred_synchronization_method, bool print_tree_statistics,
|
||||||
|
bool check_tree, std::string &&result_file_name, std::string &&statistic_file_name,
|
||||||
|
std::string &&tree_file_name, bool profile);
|
||||||
|
|
||||||
|
~Benchmark() noexcept override = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Core set the benchmark should run in the current iteration.
|
||||||
|
*/
|
||||||
|
const mx::util::core_set &core_set();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback for request tasks to notify they are out of
|
||||||
|
* new requests.
|
||||||
|
*/
|
||||||
|
void requests_finished() override;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts the benchmark after initialization.
|
||||||
|
*/
|
||||||
|
void start();
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Collection of cores the benchmark should run on.
|
||||||
|
benchmark::Cores _cores;
|
||||||
|
|
||||||
|
// Number of iterations the benchmark should use.
|
||||||
|
const std::uint16_t _iterations;
|
||||||
|
|
||||||
|
// Current iteration within the actual core set.
|
||||||
|
std::uint16_t _current_iteration = std::numeric_limits<std::uint16_t>::max();
|
||||||
|
|
||||||
|
// Workload to get requests from.
|
||||||
|
benchmark::Workload _workload;
|
||||||
|
|
||||||
|
// Tree to run requests on.
|
||||||
|
std::unique_ptr<db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>> _tree;
|
||||||
|
|
||||||
|
// The synchronization mechanism to use for tree nodes.
|
||||||
|
const mx::synchronization::isolation_level _node_isolation_level;
|
||||||
|
|
||||||
|
// Preferred synchronization method.
|
||||||
|
const mx::synchronization::protocol _preferred_synchronization_method;
|
||||||
|
|
||||||
|
// If true, the tree statistics (height, number of nodes, ...) will be printed.
|
||||||
|
const bool _print_tree_statistics;
|
||||||
|
|
||||||
|
// If true, the tree will be checked for consistency after each iteration.
|
||||||
|
const bool _check_tree;
|
||||||
|
|
||||||
|
// Name of the file to print results to.
|
||||||
|
const std::string _result_file_name;
|
||||||
|
|
||||||
|
// Name of the file to print further statistics.
|
||||||
|
const std::string _statistic_file_name;
|
||||||
|
|
||||||
|
// Name of the file to serialize the tree to.
|
||||||
|
const std::string _tree_file_name;
|
||||||
|
|
||||||
|
// If true, use idle profiling.
|
||||||
|
const bool _profile;
|
||||||
|
|
||||||
|
// Number of open request tasks; used for tracking the benchmark.
|
||||||
|
alignas(64) std::atomic_uint16_t _open_requests = 0;
|
||||||
|
|
||||||
|
// List of request schedulers.
|
||||||
|
alignas(64) std::vector<RequestSchedulerTask *> _request_scheduler;
|
||||||
|
|
||||||
|
// Chronometer for starting/stopping time and performance counter.
|
||||||
|
alignas(64) benchmark::Chronometer<std::uint16_t> _chronometer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Name of the file to write profiling results to.
|
||||||
|
*/
|
||||||
|
[[nodiscard]] std::string profile_file_name() const;
|
||||||
|
};
|
||||||
|
} // namespace application::blinktree_benchmark
|
||||||
17
repos/ealanos/src/app/blinktree_server/config.h
Normal file
17
repos/ealanos/src/app/blinktree_server/config.h
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace application::blinktree_benchmark {
|
||||||
|
class config
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* @return Number of requests that will be started at a time by the request scheduler.
|
||||||
|
*/
|
||||||
|
static constexpr auto batch_size() noexcept { return 500U; }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Number of maximal open requests, system-wide.
|
||||||
|
*/
|
||||||
|
static constexpr auto max_parallel_requests() noexcept { return 1500U; }
|
||||||
|
};
|
||||||
|
} // namespace application::blinktree_benchmark
|
||||||
15
repos/ealanos/src/app/blinktree_server/listener.h
Normal file
15
repos/ealanos/src/app/blinktree_server/listener.h
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace application::blinktree_benchmark {
|
||||||
|
/**
|
||||||
|
* The listener will be used to notify the benchmark that request tasks are
|
||||||
|
* done and no more work is available.
|
||||||
|
*/
|
||||||
|
class Listener
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
constexpr Listener() = default;
|
||||||
|
virtual ~Listener() = default;
|
||||||
|
virtual void requests_finished() = 0;
|
||||||
|
};
|
||||||
|
} // namespace application::blinktree_benchmark
|
||||||
158
repos/ealanos/src/app/blinktree_server/main.cpp
Normal file
158
repos/ealanos/src/app/blinktree_server/main.cpp
Normal file
@@ -0,0 +1,158 @@
|
|||||||
|
#include "benchmark.h"
|
||||||
|
#include <argparse.hpp>
|
||||||
|
#include <benchmark/cores.h>
|
||||||
|
#include <mx/system/environment.h>
|
||||||
|
#include <mx/tasking/runtime.h>
|
||||||
|
#include <mx/util/core_set.h>
|
||||||
|
#include <tuple>
|
||||||
|
#include "server.h"
|
||||||
|
#include <cstring>
|
||||||
|
#include <cstdio>
|
||||||
|
|
||||||
|
/* Genode includes */
|
||||||
|
#include <libc/component.h>
|
||||||
|
|
||||||
|
using namespace application::blinktree_server;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Instantiates the BLink-Tree server with CLI arguments.
|
||||||
|
* @param count_arguments Number of CLI arguments.
|
||||||
|
* @param arguments Arguments itself.
|
||||||
|
*
|
||||||
|
* @return Instance of the server.
|
||||||
|
*/
|
||||||
|
std::tuple<Server *, std::uint16_t, bool> create_server(int count_arguments, char **arguments);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts the server.
|
||||||
|
*
|
||||||
|
* @param count_arguments Number of CLI arguments.
|
||||||
|
* @param arguments Arguments itself.
|
||||||
|
*
|
||||||
|
* @return Return code of the application.
|
||||||
|
*/
|
||||||
|
int bt_main(int count_arguments, char **arguments)
|
||||||
|
{
|
||||||
|
if (mx::system::Environment::is_numa_balancing_enabled())
|
||||||
|
{
|
||||||
|
std::cout << "[Warn] NUMA balancing may be enabled, set '/proc/sys/kernel/numa_balancing' to '0'" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto [server, prefetch_distance, use_system_allocator] = create_server(count_arguments, arguments);
|
||||||
|
|
||||||
|
if (server != nullptr)
|
||||||
|
{
|
||||||
|
/// Wait for the server to finish.
|
||||||
|
server->run();
|
||||||
|
|
||||||
|
delete server;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::tuple<Server *, std::uint16_t, bool> create_server(int count_arguments, char **arguments)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
// Set up arguments.
|
||||||
|
argparse::ArgumentParser argument_parser("blinktree_server");
|
||||||
|
argument_parser.add_argument("cores")
|
||||||
|
.help("Number of cores to use.")
|
||||||
|
.default_value(std::uint16_t(1))
|
||||||
|
.action([](const std::string &value) { return std::uint16_t(std::stoi(value)); });
|
||||||
|
argument_parser.add_argument("--port")
|
||||||
|
.help("Port of the server")
|
||||||
|
.default_value(std::uint64_t(12345))
|
||||||
|
.action([](const std::string &value) { return std::uint64_t(std::stoi(value)); });
|
||||||
|
argument_parser.add_argument("-sco", "--system-core-order")
|
||||||
|
.help("Use systems core order. If not, cores are ordered by node id (should be preferred).")
|
||||||
|
.implicit_value(true)
|
||||||
|
.default_value(false);
|
||||||
|
argument_parser.add_argument("--exclusive")
|
||||||
|
.help("Are all node accesses exclusive?")
|
||||||
|
.implicit_value(true)
|
||||||
|
.default_value(false);
|
||||||
|
argument_parser.add_argument("--latched")
|
||||||
|
.help("Prefer latch for synchronization?")
|
||||||
|
.implicit_value(true)
|
||||||
|
.default_value(false);
|
||||||
|
argument_parser.add_argument("--olfit")
|
||||||
|
.help("Prefer OLFIT for synchronization?")
|
||||||
|
.implicit_value(true)
|
||||||
|
.default_value(false);
|
||||||
|
argument_parser.add_argument("--sync4me")
|
||||||
|
.help("Let the tasking layer decide the synchronization primitive.")
|
||||||
|
.implicit_value(true)
|
||||||
|
.default_value(false);
|
||||||
|
argument_parser.add_argument("-pd", "--prefetch-distance")
|
||||||
|
.help("Distance of prefetched data objects (0 = disable prefetching).")
|
||||||
|
.default_value(std::uint16_t(0))
|
||||||
|
.action([](const std::string &value) { return std::uint16_t(std::stoi(value)); });
|
||||||
|
argument_parser.add_argument("--system-allocator")
|
||||||
|
.help("Use the systems malloc interface to allocate tasks (default disabled).")
|
||||||
|
.implicit_value(true)
|
||||||
|
.default_value(false);
|
||||||
|
|
||||||
|
// Parse arguments.
|
||||||
|
try
|
||||||
|
{
|
||||||
|
argument_parser.parse_args(count_arguments, arguments);
|
||||||
|
}
|
||||||
|
catch (std::runtime_error &e)
|
||||||
|
{
|
||||||
|
std::cout << argument_parser << std::endl;
|
||||||
|
return {nullptr, 0U, false};
|
||||||
|
}
|
||||||
|
|
||||||
|
auto order =
|
||||||
|
argument_parser.get<bool>("-sco") ? mx::util::core_set::Order::Ascending : mx::util::core_set::Order::NUMAAware;
|
||||||
|
auto cores = mx::util::core_set::build(argument_parser.get<std::uint16_t>("cores")-1, order);
|
||||||
|
const auto isolation_level = argument_parser.get<bool>("--exclusive")
|
||||||
|
? mx::synchronization::isolation_level::Exclusive
|
||||||
|
: mx::synchronization::isolation_level::ExclusiveWriter;
|
||||||
|
auto preferred_synchronization_method = mx::synchronization::protocol::Queue;
|
||||||
|
if (argument_parser.get<bool>("--latched"))
|
||||||
|
{
|
||||||
|
preferred_synchronization_method = mx::synchronization::protocol::Latch;
|
||||||
|
}
|
||||||
|
else if (argument_parser.get<bool>("--olfit"))
|
||||||
|
{
|
||||||
|
preferred_synchronization_method = mx::synchronization::protocol::OLFIT;
|
||||||
|
}
|
||||||
|
else if (argument_parser.get<bool>("--sync4me"))
|
||||||
|
{
|
||||||
|
preferred_synchronization_method = mx::synchronization::protocol::None;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
// Create the benchmark.
|
||||||
|
//auto *server = new Server(argument_parser.get<std::uint64_t>("--port"), std::move(cores), argument_parser.get<std::uint16_t>("-pd"), isolation_level, preferred_synchronization_method);
|
||||||
|
|
||||||
|
auto cores = mx::util::core_set::build(64);
|
||||||
|
|
||||||
|
auto *server = new Server(12345, std::move(cores), 3, mx::synchronization::isolation_level::ExclusiveWriter, mx::synchronization::protocol::OLFIT);
|
||||||
|
|
||||||
|
return {server, 3, false};
|
||||||
|
// return {server, argument_parser.get<std::uint16_t>("-pd"), argument_parser.get<bool>("--system-allocator")};
|
||||||
|
}
|
||||||
|
|
||||||
|
void Libc::Component::construct(Libc::Env &env) {
|
||||||
|
|
||||||
|
mx::system::Environment::set_env(&env);
|
||||||
|
|
||||||
|
auto sys_cores = mx::util::core_set::build(64);
|
||||||
|
mx::system::Environment::set_cores(&sys_cores);
|
||||||
|
|
||||||
|
std::uint16_t cores = 64;
|
||||||
|
//env.cpu().affinity_space().total();
|
||||||
|
|
||||||
|
char cores_arg[10];
|
||||||
|
sprintf(cores_arg, "%d", cores);
|
||||||
|
|
||||||
|
char *args[] = {"blinktree_server", cores_arg};
|
||||||
|
|
||||||
|
Libc::with_libc([&]()
|
||||||
|
{
|
||||||
|
std::cout << "Starting B-link tree server" << std::endl;
|
||||||
|
bt_main(2, args);
|
||||||
|
});
|
||||||
|
}
|
||||||
9
repos/ealanos/src/app/blinktree_server/network/config.h
Normal file
9
repos/ealanos/src/app/blinktree_server/network/config.h
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace application::blinktree_server::network {
|
||||||
|
class config
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static constexpr auto max_connections() noexcept { return 64U; }
|
||||||
|
};
|
||||||
|
} // namespace mx::io::network
|
||||||
529
repos/ealanos/src/app/blinktree_server/network/server.cpp
Normal file
529
repos/ealanos/src/app/blinktree_server/network/server.cpp
Normal file
@@ -0,0 +1,529 @@
|
|||||||
|
#include "server.h"
|
||||||
|
#include "lwip/err.h"
|
||||||
|
#include "lwip/pbuf.h"
|
||||||
|
#include "lwip/tcp.h"
|
||||||
|
#include "mx/memory/global_heap.h"
|
||||||
|
#include <cstring>
|
||||||
|
#include <limits>
|
||||||
|
#include <mx/tasking/runtime.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <db/index/blinktree/lookup_task.h>
|
||||||
|
#include <db/index/blinktree/insert_value_task.h>
|
||||||
|
#include <db/index/blinktree/update_task.h>
|
||||||
|
#include <mx/system/topology.h>
|
||||||
|
#include <tukija/syscall-generic.h>
|
||||||
|
#include <tukija/syscalls.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
|
using namespace application::blinktree_server::network;
|
||||||
|
|
||||||
|
mx::tasking::TaskResult RequestTask::execute(const std::uint16_t core_id, const std::uint16_t channel_id)
|
||||||
|
{
|
||||||
|
mx::tasking::TaskInterface* request_task;
|
||||||
|
|
||||||
|
if (this->_type == Type::Insert)
|
||||||
|
{
|
||||||
|
request_task = mx::tasking::runtime::new_task<
|
||||||
|
db::index::blinktree::InsertValueTask<std::uint64_t, std::int64_t, ResponseHandler>>(
|
||||||
|
core_id, this->_key, this->_value, this->_tree, this->_response_handler);
|
||||||
|
|
||||||
|
request_task->annotate(this->_tree->root(), db::index::blinktree::config::node_size() / 4U);
|
||||||
|
request_task->is_readonly(true);
|
||||||
|
}
|
||||||
|
else if (this->_type == Type::Lookup)
|
||||||
|
{
|
||||||
|
request_task = mx::tasking::runtime::new_task<
|
||||||
|
db::index::blinktree::LookupTask<std::uint64_t, std::int64_t, ResponseHandler>>(
|
||||||
|
core_id, this->_key, this->_response_handler);
|
||||||
|
|
||||||
|
request_task->annotate(this->_tree->root(), db::index::blinktree::config::node_size() / 4U);
|
||||||
|
request_task->is_readonly(true);
|
||||||
|
}
|
||||||
|
else if(this->_type == Type::Update)
|
||||||
|
{
|
||||||
|
request_task = mx::tasking::runtime::new_task<
|
||||||
|
db::index::blinktree::UpdateTask<std::uint64_t, std::int64_t, ResponseHandler>>(
|
||||||
|
core_id, this->_key, this->_value, this->_response_handler);
|
||||||
|
|
||||||
|
request_task->annotate(this->_tree->root(), db::index::blinktree::config::node_size() / 4U);
|
||||||
|
request_task->is_readonly(true);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
this->_tree->check();
|
||||||
|
this->_tree->print_statistics();
|
||||||
|
return mx::tasking::TaskResult::make_null();
|
||||||
|
}
|
||||||
|
|
||||||
|
return mx::tasking::TaskResult::make_succeed_and_remove(request_task);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ResponseHandler::inserted(const std::uint16_t /*core*/, const std::uint64_t key, const std::int64_t /*value*/)
|
||||||
|
{
|
||||||
|
_server->send(_s, std::to_string(key));
|
||||||
|
Server::free_handler_task(core_id, static_cast<void *>(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ResponseHandler::updated(const std::uint16_t /*core_id*/, const std::uint64_t key, const std::int64_t /*value*/)
|
||||||
|
{
|
||||||
|
_server-> send(_s, std::to_string(key));
|
||||||
|
Server::free_handler_task(core_id, static_cast<void *>(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ResponseHandler::removed(const std::uint16_t /*core_id*/, const std::uint64_t key)
|
||||||
|
{
|
||||||
|
_server-> send(_s, std::to_string(key));
|
||||||
|
Server::free_handler_task(core_id, static_cast<void *>(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ResponseHandler::found(const std::uint16_t /*core_id*/, const std::uint64_t /*key*/, const std::int64_t value)
|
||||||
|
{
|
||||||
|
_server-> send(_s, std::to_string(value));
|
||||||
|
Server::free_handler_task(core_id, static_cast<void *>(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ResponseHandler::missing(const std::uint16_t /*core_id*/, const std::uint64_t key)
|
||||||
|
{
|
||||||
|
_server-> send(_s, std::to_string(key));
|
||||||
|
Server::free_handler_task(core_id, static_cast<void *>(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
Server *Server::_myself;
|
||||||
|
|
||||||
|
ReceiveTask *Server::_receive_tasks = nullptr;
|
||||||
|
|
||||||
|
Server::Server(Libc::Env &env,
|
||||||
|
const std::uint64_t port,
|
||||||
|
const std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept
|
||||||
|
: _port(port), _socket(nullptr), _client_sockets({nullptr}),
|
||||||
|
_count_channels(count_channels), _env{env}, _config(env, "config"), _alloc(alloc), _timer(timer), _netif(env, _alloc, _config.xml(), _wakeup_scheduler)
|
||||||
|
{
|
||||||
|
Server::_myself = this;
|
||||||
|
this->_buffer.fill('\0');
|
||||||
|
|
||||||
|
_wakeup_scheduler.set_nic(&_netif);
|
||||||
|
|
||||||
|
_receive_tasks = static_cast<ReceiveTask*>(mx::memory::GlobalHeap::allocate_cache_line_aligned(65536 * sizeof(ReceiveTask)));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
Server::~Server() {
|
||||||
|
}
|
||||||
|
|
||||||
|
bool Server::listen(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>* tree)
|
||||||
|
{
|
||||||
|
_socket = Lwip::tcp_new();
|
||||||
|
|
||||||
|
if (!_socket) {
|
||||||
|
Genode::error("Failed to create server socket");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Lwip::err_t rc = Lwip::tcp_bind(_socket, &Lwip::ip_addr_any, _port);
|
||||||
|
if (rc != Lwip::ERR_OK) {
|
||||||
|
Genode::error("Failed to bind server socket to port ", _port);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_socket = Lwip::tcp_listen_with_backlog(_socket, 64);
|
||||||
|
Lwip::tcp_accept(_socket, &Server::_handle_tcp_connect);
|
||||||
|
|
||||||
|
this->_tree = tree;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Server::parse(struct Server::state *s, std::string &message)
|
||||||
|
{
|
||||||
|
RequestTask::Type request_type;
|
||||||
|
|
||||||
|
std::uint64_t i = s->id;
|
||||||
|
|
||||||
|
if (message[0] == 'D')
|
||||||
|
{
|
||||||
|
auto response_handler = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ResponseHandler))) ResponseHandler(this, s, 0);
|
||||||
|
//auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, *response_handler};
|
||||||
|
auto *request_task = mx::tasking::runtime::new_task<RequestTask>(0, this->_tree, *response_handler);
|
||||||
|
request_task->annotate(std::uint16_t(0U));
|
||||||
|
mx::tasking::runtime::spawn(*request_task);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
switch (message[0])
|
||||||
|
{
|
||||||
|
case 'I':
|
||||||
|
request_type = RequestTask::Type::Insert;
|
||||||
|
break;
|
||||||
|
case 'U':
|
||||||
|
request_type = RequestTask::Type::Update;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
request_type = RequestTask::Type::Lookup;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto key = 0ULL;
|
||||||
|
auto index = 2U; // Skip request type and comma.
|
||||||
|
while (message[index] >= '0' && message[index] <= '9')
|
||||||
|
{
|
||||||
|
key = key * 10 + (message[index++] - '0');
|
||||||
|
}
|
||||||
|
|
||||||
|
auto channel_id = std::uint16_t(this->_next_worker_id.fetch_add(1U) % this->_count_channels);
|
||||||
|
if (request_type == RequestTask::Type::Insert || request_type == RequestTask::Type::Lookup)
|
||||||
|
{
|
||||||
|
auto value = 0LL;
|
||||||
|
++index;
|
||||||
|
while (message[index] >= '0' && message[index] <= '9')
|
||||||
|
{
|
||||||
|
value = value * 10 + (message[index++] - '0');
|
||||||
|
}
|
||||||
|
|
||||||
|
auto response_handler = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ResponseHandler))) ResponseHandler(this, s, channel_id);
|
||||||
|
auto *request_task = mx::tasking::runtime::new_task<RequestTask>(channel_id, this->_tree, request_type, key, value, *response_handler);
|
||||||
|
request_task->annotate(channel_id);
|
||||||
|
mx::tasking::runtime::spawn(*request_task);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
//auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, RequestTask::Type::Lookup, key, this->_response_handlers[i]};
|
||||||
|
auto response_handler = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ResponseHandler))) ResponseHandler(this, s, channel_id);
|
||||||
|
auto *request_task = mx::tasking::runtime::new_task<RequestTask>(channel_id, this->_tree, request_type, key, *response_handler);
|
||||||
|
request_task->annotate(channel_id);
|
||||||
|
mx::tasking::runtime::spawn(*request_task);
|
||||||
|
}
|
||||||
|
mx::tasking::runtime::scheduler().allocate_cores(64);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class Send_task : public mx::tasking::TaskInterface
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
struct Server::state *_s;
|
||||||
|
std::string _message;
|
||||||
|
|
||||||
|
public:
|
||||||
|
Send_task(Server::state *s, std::string message) : _s(s), _message(message) {}
|
||||||
|
|
||||||
|
mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) override
|
||||||
|
{
|
||||||
|
using namespace Lwip;
|
||||||
|
Lwip::pbuf *ptr = nullptr;
|
||||||
|
|
||||||
|
if (_s->state == Server::CLOSED || _s->state == Server::CLOSING ) {
|
||||||
|
Genode::warning("Tried to send over socket that is to be closed");
|
||||||
|
Server::free_task(static_cast<void *>(this));
|
||||||
|
return mx::tasking::TaskResult::make_null();
|
||||||
|
}
|
||||||
|
|
||||||
|
ptr = Lwip::pbuf_alloc(Lwip::PBUF_TRANSPORT, _message.length(), Lwip::PBUF_RAM);
|
||||||
|
|
||||||
|
if (!(_s->pcb) || !_s) {
|
||||||
|
Genode::error("Tried sending over invalid pcb");
|
||||||
|
Lwip::pbuf_free(ptr);
|
||||||
|
Server::free_task(static_cast<void *>(this));
|
||||||
|
return mx::tasking::TaskResult::make_null();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ptr)
|
||||||
|
{
|
||||||
|
Genode::error("No memory for sending packet.");
|
||||||
|
Server::free_task(static_cast<void *>(this));
|
||||||
|
return mx::tasking::TaskResult::make_null();
|
||||||
|
}
|
||||||
|
|
||||||
|
Lwip::pbuf_take(ptr, _message.c_str(), _message.length());
|
||||||
|
//ptr->payload = static_cast<void *>(const_cast<char *>(_message.c_str()));
|
||||||
|
//ptr->len = _message.length();
|
||||||
|
|
||||||
|
if (ptr->len > tcp_sndbuf(_s->pcb))
|
||||||
|
Genode::warning("Not enough space in send buffer");
|
||||||
|
|
||||||
|
Lwip::err_t rc = Lwip::ERR_OK;
|
||||||
|
{
|
||||||
|
rc = Lwip::tcp_write(_s->pcb, ptr->payload, ptr->len, 0);
|
||||||
|
}
|
||||||
|
if (rc == Lwip::ERR_OK)
|
||||||
|
{
|
||||||
|
Lwip::tcp_output(_s->pcb);
|
||||||
|
Lwip::pbuf_free(ptr);
|
||||||
|
} else {
|
||||||
|
Genode::error("LWIP tcp_write error ", static_cast<signed int>(rc));
|
||||||
|
/*if (_s->tx == nullptr)
|
||||||
|
_s->tx = ptr;
|
||||||
|
else {
|
||||||
|
Lwip::pbuf_cat(_s->tx, ptr);
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
Server::free_task(static_cast<void *>(this));
|
||||||
|
return mx::tasking::TaskResult::make_null();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
void
|
||||||
|
Server::send(struct state *s, std::string &&message)
|
||||||
|
{
|
||||||
|
|
||||||
|
const auto length = std::uint64_t(message.size());
|
||||||
|
auto response = std::string(length + sizeof(length), '\0');
|
||||||
|
|
||||||
|
// Write header
|
||||||
|
std::memcpy(response.data(), static_cast<const void *>(&length), sizeof(length));
|
||||||
|
|
||||||
|
// Write data
|
||||||
|
std::memmove(response.data() + sizeof(length), message.data(), length);
|
||||||
|
|
||||||
|
auto task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(Send_task))) Send_task(s, response);
|
||||||
|
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
|
||||||
|
mx::tasking::runtime::spawn(*task);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::uint16_t Server::add_client(Lwip::tcp_pcb* client_socket)
|
||||||
|
{
|
||||||
|
for (auto i = 0U; i < this->_client_sockets.size(); ++i)
|
||||||
|
{
|
||||||
|
if (this->_client_sockets[i] == 0U)
|
||||||
|
{
|
||||||
|
this->_client_sockets[i] = client_socket;
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::numeric_limits<std::uint16_t>::max();
|
||||||
|
}
|
||||||
|
|
||||||
|
void Server::stop() noexcept
|
||||||
|
{
|
||||||
|
this->_is_running = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
class Close_task : public mx::tasking::TaskInterface
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
Server::state &_s;
|
||||||
|
|
||||||
|
public:
|
||||||
|
Close_task(Server::state &s) : _s(s) {}
|
||||||
|
|
||||||
|
mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t)
|
||||||
|
{
|
||||||
|
Genode::log("Closing connection for ", static_cast<void *>(_s.pcb) , " and state object ", static_cast<void*>(&_s));
|
||||||
|
Server::tcpbtree_close(_s.pcb, &_s);
|
||||||
|
_s.state = Server::CLOSED;
|
||||||
|
Server::free_task(static_cast<void *>(this));
|
||||||
|
Genode::log("Closed connection");
|
||||||
|
return mx::tasking::TaskResult::make_null();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/***********
|
||||||
|
* LWIP callback function definitions
|
||||||
|
***********/
|
||||||
|
Lwip::err_t Server::_handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err)
|
||||||
|
{
|
||||||
|
|
||||||
|
struct state *s;
|
||||||
|
|
||||||
|
static uint64_t count_connections = 0;
|
||||||
|
|
||||||
|
LWIP_UNUSED_ARG(arg);
|
||||||
|
|
||||||
|
if ((err != Lwip::ERR_OK) || (newpcb == NULL)) { return Lwip::ERR_VAL; }
|
||||||
|
|
||||||
|
if (newpcb > reinterpret_cast<Lwip::tcp_pcb *>(0x7FFF80000000UL - sizeof(Lwip::tcp_pcb *)))
|
||||||
|
Genode::error("Non-canonical PCB address");
|
||||||
|
|
||||||
|
// Genode::log("Incoming request");
|
||||||
|
|
||||||
|
s = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(struct state)))
|
||||||
|
state(); // static_cast<struct state *>(Lwip::mem_malloc(sizeof(struct state)));
|
||||||
|
|
||||||
|
if (!s) {
|
||||||
|
Genode::error("Failed to allocate state object for new connection.");
|
||||||
|
return Lwip::ERR_MEM;
|
||||||
|
}
|
||||||
|
//Genode::log("New connection #", count_connections, ": arg=", arg, " pcb=", newpcb, " s=", s, " &s=", static_cast<void*>(&s));
|
||||||
|
|
||||||
|
s->state = states::ACCEPTED;
|
||||||
|
s->pcb = newpcb;
|
||||||
|
s->retries = 0;
|
||||||
|
s->p = nullptr;
|
||||||
|
s->tx = nullptr;
|
||||||
|
s->channel_id = 0; //count_connections % Server::get_instance()->_count_channels;
|
||||||
|
|
||||||
|
Lwip::tcp_backlog_accepted(newpcb);
|
||||||
|
/* Register callback functions */
|
||||||
|
Lwip::tcp_arg(newpcb, s);
|
||||||
|
Lwip::tcp_recv(newpcb, &Server::_handle_tcp_recv);
|
||||||
|
Lwip::tcp_err(newpcb, &Server::_handle_tcp_error);
|
||||||
|
Lwip::tcp_poll(newpcb, &Server::_handle_tcp_poll, 50);
|
||||||
|
Lwip::tcp_sent(newpcb, &Server::_handle_tcp_sent);
|
||||||
|
newpcb->flags |= TF_NODELAY;
|
||||||
|
|
||||||
|
return Lwip::ERR_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
Lwip::err_t Server::_handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err)
|
||||||
|
{
|
||||||
|
static std::uint16_t next_receive_task = 0;
|
||||||
|
struct state *s;
|
||||||
|
Lwip::err_t rc = Lwip::ERR_OK;
|
||||||
|
|
||||||
|
std::uint16_t next_channel_id = 0;
|
||||||
|
|
||||||
|
s = static_cast<struct state*>(arg);
|
||||||
|
|
||||||
|
if (!s) {
|
||||||
|
Lwip::pbuf_free(p);
|
||||||
|
return Lwip::ERR_ARG;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (err != Lwip::ERR_OK) {
|
||||||
|
Lwip::pbuf_free(p);
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p == nullptr) {
|
||||||
|
s->state = CLOSING;
|
||||||
|
auto task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(Close_task))) Close_task(*s);
|
||||||
|
if (!task) {
|
||||||
|
Genode::warning("Failed to allocate close task");
|
||||||
|
return Lwip::ERR_MEM;
|
||||||
|
}
|
||||||
|
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
|
||||||
|
mx::tasking::runtime::spawn(*task);
|
||||||
|
rc = Lwip::ERR_OK;
|
||||||
|
} else if (s->state == states::ACCEPTED) {
|
||||||
|
s->state == states::RECEIVED;
|
||||||
|
|
||||||
|
// TODO: parse message and spawn request task here
|
||||||
|
rc = Lwip::ERR_OK;
|
||||||
|
{
|
||||||
|
void *payload = mx::memory::GlobalHeap::allocate_cache_line_aligned(p->len);
|
||||||
|
std::memcpy(payload, p->payload, p->len);
|
||||||
|
ReceiveTask *task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ReceiveTask))) ReceiveTask(s, payload, p->len);
|
||||||
|
if (!task) {
|
||||||
|
Genode::warning("Could not allocate request handler task");
|
||||||
|
return Lwip::ERR_MEM;
|
||||||
|
}
|
||||||
|
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
|
||||||
|
mx::tasking::runtime::spawn(*task);
|
||||||
|
}
|
||||||
|
Lwip::tcp_recved(s->pcb, p->len);
|
||||||
|
//Server::get_instance()->send(s, "Nope");
|
||||||
|
} else if (s->state == states::RECEIVED) {
|
||||||
|
void *payload = mx::memory::GlobalHeap::allocate_cache_line_aligned(p->len);
|
||||||
|
std::memcpy(payload, p->payload, p->len);
|
||||||
|
|
||||||
|
ReceiveTask *task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ReceiveTask))) ReceiveTask(s, payload, p->len);
|
||||||
|
if (!task) {
|
||||||
|
Genode::warning("Could not allocate request handler task");
|
||||||
|
return Lwip::ERR_MEM;
|
||||||
|
}
|
||||||
|
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
|
||||||
|
mx::tasking::runtime::spawn(*task);
|
||||||
|
Lwip::tcp_recved(s->pcb, p->len);
|
||||||
|
//Server::get_instance()->send(s, "Nope");
|
||||||
|
|
||||||
|
|
||||||
|
rc = Lwip::ERR_OK;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Lwip::tcp_recved(tpcb, p->tot_len);
|
||||||
|
rc = Lwip::ERR_OK;
|
||||||
|
}
|
||||||
|
Lwip::pbuf_free(p);
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
Lwip::err_t Server::_handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb)
|
||||||
|
{
|
||||||
|
Lwip::err_t rc;
|
||||||
|
struct state *s;
|
||||||
|
|
||||||
|
//GENODE_LOG_TSC(1);
|
||||||
|
s = static_cast<struct state *>(arg);
|
||||||
|
|
||||||
|
if (s) {
|
||||||
|
if (s->tx) {
|
||||||
|
rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1);
|
||||||
|
if (rc == Lwip::ERR_OK) {
|
||||||
|
Lwip::tcp_output(tpcb);
|
||||||
|
Lwip::pbuf *ptr = s->tx;
|
||||||
|
if (ptr->next) {
|
||||||
|
s->tx = ptr->next;
|
||||||
|
Lwip::pbuf_ref(s->tx);
|
||||||
|
}
|
||||||
|
Lwip::tcp_recved(tpcb, ptr->len);
|
||||||
|
Lwip::pbuf_free(ptr);
|
||||||
|
}
|
||||||
|
// TODO: process remaning pbuf entry
|
||||||
|
} else {
|
||||||
|
/*if (s->state == states::CLOSING) {
|
||||||
|
Server::tcpbtree_close(tpcb, s);
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
rc = Lwip::ERR_OK;
|
||||||
|
} else {
|
||||||
|
Lwip::tcp_abort(tpcb);
|
||||||
|
rc = Lwip::ERR_ABRT;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Lwip::ERR_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
Lwip::err_t Server::_handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len)
|
||||||
|
{
|
||||||
|
//GENODE_LOG_TSC(1);
|
||||||
|
struct state *s = static_cast<struct state *>(arg);
|
||||||
|
|
||||||
|
if (!s)
|
||||||
|
return Lwip::ERR_ARG;
|
||||||
|
|
||||||
|
s->retries = 0;
|
||||||
|
|
||||||
|
if (s->tx) {
|
||||||
|
Lwip::err_t rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1);
|
||||||
|
if (rc == Lwip::ERR_OK) {
|
||||||
|
Lwip::tcp_output(tpcb);
|
||||||
|
Lwip::pbuf *ptr = s->tx;
|
||||||
|
if (ptr->next) {
|
||||||
|
s->tx = ptr->next;
|
||||||
|
Lwip::pbuf_ref(s->tx);
|
||||||
|
}
|
||||||
|
Lwip::tcp_recved(tpcb, ptr->len);
|
||||||
|
Lwip::pbuf_free(ptr);
|
||||||
|
}
|
||||||
|
tcp_sent(tpcb, &Server::_handle_tcp_sent); // Genode::log("In _handle_tcp_sent");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Lwip::ERR_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
mx::tasking::TaskResult application::blinktree_server::network::ReceiveTask::execute(std::uint16_t core_id, std::uint16_t channel_id)
|
||||||
|
{
|
||||||
|
Lwip::err_t rc = Lwip::ERR_OK;
|
||||||
|
|
||||||
|
/*rc = Lwip::tcp_write(_state->pcb, _pbuf->payload, _pbuf->len, 3);
|
||||||
|
Lwip::tcp_output(_state->pcb);
|
||||||
|
if (rc == Lwip::ERR_OK) {
|
||||||
|
Lwip::tcp_recved(_state->pcb, _pbuf->tot_len);
|
||||||
|
Lwip::pbuf_free(_pbuf);
|
||||||
|
} else if (rc == Lwip::ERR_MEM) {
|
||||||
|
Genode::warning("Out of memory");
|
||||||
|
}*/
|
||||||
|
|
||||||
|
//Genode::log("Executing application task");
|
||||||
|
//Server::get_instance()->send(_state, "Nope");
|
||||||
|
// Server::tcp_send(_state->pcb, _state);
|
||||||
|
|
||||||
|
std::string request = std::string(static_cast<char*>(_payload), _length);
|
||||||
|
Server::get_instance()->parse(_state, request);
|
||||||
|
|
||||||
|
mx::memory::GlobalHeap::free(_payload);
|
||||||
|
|
||||||
|
Server::free_task(static_cast<void *>(this));
|
||||||
|
return mx::tasking::TaskResult::make_null();
|
||||||
|
}
|
||||||
301
repos/ealanos/src/app/blinktree_server/network/server.h
Normal file
301
repos/ealanos/src/app/blinktree_server/network/server.h
Normal file
@@ -0,0 +1,301 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "config.h"
|
||||||
|
#include "ealanos/memory/hamstraaja.h"
|
||||||
|
#include "mx/memory/global_heap.h"
|
||||||
|
#include <array>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <memory>
|
||||||
|
#include <optional>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
/* B-link tree includes */
|
||||||
|
#include <db/index/blinktree/b_link_tree.h>
|
||||||
|
#include <db/index/blinktree/listener.h>
|
||||||
|
|
||||||
|
/* lwIP wrapper for Genode's NIC session */
|
||||||
|
#include <mxip/mxnic_netif.h>
|
||||||
|
#include <mxip/genode_init.h>
|
||||||
|
#include <libc/component.h>
|
||||||
|
|
||||||
|
/* Genode includes */
|
||||||
|
#include <timer_session/connection.h>
|
||||||
|
#include <base/heap.h>
|
||||||
|
#include <base/attached_rom_dataspace.h>
|
||||||
|
|
||||||
|
/* MxTasking includes*/
|
||||||
|
#include <mx/memory/fixed_size_allocator.h>
|
||||||
|
#include <mx/memory/dynamic_size_allocator.h>
|
||||||
|
#include <mx/tasking/config.h>
|
||||||
|
#include <mx/tasking/scheduler.h>
|
||||||
|
#include <mx/util/core_set.h>
|
||||||
|
|
||||||
|
/* lwIP includes */
|
||||||
|
namespace Lwip {
|
||||||
|
extern "C" {
|
||||||
|
#include <lwip/opt.h>
|
||||||
|
#include <lwip/tcp.h>
|
||||||
|
#include <lwip/ip_addr.h>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace application::blinktree_server::network {
|
||||||
|
|
||||||
|
class ResponseHandler;
|
||||||
|
class RequestTask;
|
||||||
|
class ReceiveTask;
|
||||||
|
class Server
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
enum states
|
||||||
|
{
|
||||||
|
NONE = 0,
|
||||||
|
ACCEPTED,
|
||||||
|
RECEIVED,
|
||||||
|
CLOSING,
|
||||||
|
CLOSED
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Wakeup_scheduler : Mxip::Nic_netif::Wakeup_scheduler {
|
||||||
|
Mxip::Nic_netif *nic{nullptr};
|
||||||
|
|
||||||
|
void schedule_nic_server_wakeup() override { nic->wakeup_nic_server(); }
|
||||||
|
|
||||||
|
void set_nic(Mxip::Nic_netif *nic) { this->nic = nic; }
|
||||||
|
|
||||||
|
Wakeup_scheduler() = default;
|
||||||
|
} _wakeup_scheduler;
|
||||||
|
|
||||||
|
struct state
|
||||||
|
{
|
||||||
|
std::uint8_t state;
|
||||||
|
std::uint8_t retries;
|
||||||
|
struct Lwip::tcp_pcb *pcb;
|
||||||
|
struct Lwip::pbuf *p;
|
||||||
|
struct Lwip::pbuf *tx;
|
||||||
|
std::uint16_t channel_id;
|
||||||
|
std::uint64_t id;
|
||||||
|
};
|
||||||
|
Server(Libc::Env &env, std::uint64_t port,
|
||||||
|
std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept;
|
||||||
|
~Server();
|
||||||
|
|
||||||
|
[[nodiscard]] std::uint16_t port() const noexcept { return _port; }
|
||||||
|
void stop() noexcept;
|
||||||
|
void send(struct Server::state *s, std::string &&message);
|
||||||
|
bool listen(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t> *tree);
|
||||||
|
void parse(struct Server::state *s, std::string &message);
|
||||||
|
|
||||||
|
[[nodiscard]] bool is_running() const noexcept { return _is_running; }
|
||||||
|
|
||||||
|
static void tcp_send(struct Lwip::tcp_pcb *tpcb, struct state *s)
|
||||||
|
{
|
||||||
|
using namespace Lwip;
|
||||||
|
struct Lwip::pbuf *ptr;
|
||||||
|
Lwip::err_t rc = Lwip::ERR_OK;
|
||||||
|
|
||||||
|
if (!s)
|
||||||
|
return;
|
||||||
|
|
||||||
|
while ((rc == Lwip::ERR_OK) && (s->tx != nullptr) /* && (s->tx->len <= tcp_sndbuf(tpcb) */)
|
||||||
|
{
|
||||||
|
ptr = s->tx;
|
||||||
|
// Genode::log("Sending response");
|
||||||
|
rc = Lwip::tcp_write(tpcb, ptr->payload, ptr->len, 1);
|
||||||
|
if (rc == Lwip::ERR_OK)
|
||||||
|
{
|
||||||
|
std::uint16_t plen;
|
||||||
|
|
||||||
|
plen = ptr->len;
|
||||||
|
|
||||||
|
s->tx = ptr->next;
|
||||||
|
if (s->tx != nullptr)
|
||||||
|
{
|
||||||
|
Lwip::pbuf_ref(s->tx);
|
||||||
|
}
|
||||||
|
Lwip::tcp_output(tpcb);
|
||||||
|
Lwip::pbuf_free(ptr);
|
||||||
|
}
|
||||||
|
else if (rc == Lwip::ERR_MEM)
|
||||||
|
{
|
||||||
|
Genode::warning("Low on memory. Defering to poll()");
|
||||||
|
s->tx = ptr;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Genode::warning("An error ", static_cast<unsigned>(rc), " occured.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tcpbtree_close(struct Lwip::tcp_pcb *tpcb, struct state *s)
|
||||||
|
{
|
||||||
|
if (!s || s->pcb != tpcb) {
|
||||||
|
Genode::error("Tried closing connection with invalid session state");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Lwip::tcp_arg(tpcb, NULL);
|
||||||
|
Lwip::tcp_sent(tpcb, NULL);
|
||||||
|
Lwip::tcp_recv(tpcb, NULL);
|
||||||
|
Lwip::tcp_poll(tpcb, NULL, 0);
|
||||||
|
Lwip::tcp_err(tpcb, nullptr);
|
||||||
|
|
||||||
|
Genode::log("Unregistered handlers");
|
||||||
|
|
||||||
|
Lwip::tcp_close(tpcb);
|
||||||
|
Server::tcp_free(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* tcp_recv */
|
||||||
|
static Lwip::err_t _handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err);
|
||||||
|
|
||||||
|
/* tcp_err */
|
||||||
|
static void _handle_tcp_error(void *arg, Lwip::err_t err)
|
||||||
|
{
|
||||||
|
struct state *s;
|
||||||
|
LWIP_UNUSED_ARG(err);
|
||||||
|
|
||||||
|
s = static_cast<state *>(arg);
|
||||||
|
|
||||||
|
Server::tcp_free(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* tcp_poll */
|
||||||
|
static Lwip::err_t _handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb);
|
||||||
|
|
||||||
|
/* tcp_sent */
|
||||||
|
static Lwip::err_t _handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len);
|
||||||
|
|
||||||
|
/* helper function for free */
|
||||||
|
static void tcp_free(struct state *s)
|
||||||
|
{
|
||||||
|
// Genode::log("Freeing state obj s=", s);
|
||||||
|
if (s)
|
||||||
|
{
|
||||||
|
if (s->p)
|
||||||
|
Lwip::pbuf_free(s->p);
|
||||||
|
if (s->tx) Lwip::pbuf_free(s->tx);
|
||||||
|
Genode::log("Freeing state object ", s);
|
||||||
|
mx::memory::GlobalHeap::free(s); // Lwip::mem_free(s);
|
||||||
|
Genode::log("Freed state object");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static Server *get_instance() { return _myself; }
|
||||||
|
|
||||||
|
static void free_handler_task(std::uint16_t core_id, void* task)
|
||||||
|
{
|
||||||
|
mx::memory::GlobalHeap::free(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void free_task(void* task)
|
||||||
|
{
|
||||||
|
mx::memory::GlobalHeap::free(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
static Server *_myself;
|
||||||
|
const std::uint64_t _port;
|
||||||
|
struct Lwip::tcp_pcb *_socket;
|
||||||
|
Libc::Env &_env;
|
||||||
|
|
||||||
|
std::array<struct Lwip::tcp_pcb *, config::max_connections()> _client_sockets;
|
||||||
|
std::array<char, 2048U> _buffer;
|
||||||
|
static ReceiveTask *_receive_tasks;
|
||||||
|
|
||||||
|
alignas(64) bool _is_running = true;
|
||||||
|
alignas(64) std::atomic_uint64_t _next_worker_id{0U};
|
||||||
|
const std::uint16_t _count_channels;
|
||||||
|
|
||||||
|
std::uint16_t add_client(Lwip::tcp_pcb *client_socket);
|
||||||
|
|
||||||
|
/* Genode environment for NIC session */
|
||||||
|
Genode::Attached_rom_dataspace _config;
|
||||||
|
Genode::Heap &_alloc;
|
||||||
|
Timer::Connection &_timer;
|
||||||
|
|
||||||
|
/* lwIP network device (NIC session wrapper) */
|
||||||
|
Mxip::Nic_netif _netif;
|
||||||
|
db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t> *_tree{nullptr};
|
||||||
|
|
||||||
|
/************************************************
|
||||||
|
* lwIP callback API: TCP callback functions
|
||||||
|
************************************************/
|
||||||
|
|
||||||
|
/* tcp_accept */
|
||||||
|
static Lwip::err_t
|
||||||
|
_handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/* helper function for close() */
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
class alignas(64) ResponseHandler final : public db::index::blinktree::Listener<std::uint64_t, std::int64_t>
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ResponseHandler(Server* server, Server::state *s, std::uint16_t _core_id) : _server(server), _s(s), core_id(_core_id) { }
|
||||||
|
ResponseHandler(ResponseHandler&&) noexcept = default;
|
||||||
|
~ResponseHandler() = default;
|
||||||
|
|
||||||
|
void inserted(std::uint16_t core_id, const std::uint64_t key, const std::int64_t value) override;
|
||||||
|
void updated(std::uint16_t core_id, const std::uint64_t key, const std::int64_t value) override;
|
||||||
|
void removed(std::uint16_t core_id, const std::uint64_t key) override;
|
||||||
|
void found(std::uint16_t core_id, const std::uint64_t key, const std::int64_t value) override;
|
||||||
|
void missing(std::uint16_t core_id, const std::uint64_t key) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Server* _server;
|
||||||
|
Server::state *_s;
|
||||||
|
std::uint16_t core_id{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
class alignas(64) RequestTask final : public mx::tasking::TaskInterface
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
enum Type { Insert, Update, Lookup, Debug };
|
||||||
|
|
||||||
|
RequestTask(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>* tree, const Type type, const std::uint64_t key, ResponseHandler& response_handler) noexcept
|
||||||
|
: _tree(tree), _type(type), _key(key), _response_handler(response_handler) { }
|
||||||
|
RequestTask(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>* tree, const Type type, const std::uint64_t key, const std::int64_t value, ResponseHandler& response_handler) noexcept
|
||||||
|
: _tree(tree), _type(type), _key(key), _value(value), _response_handler(response_handler) { }
|
||||||
|
RequestTask(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>* tree, ResponseHandler& response_handler) noexcept
|
||||||
|
: _tree(tree), _type(Type::Debug), _response_handler(response_handler) { }
|
||||||
|
~RequestTask() noexcept = default;
|
||||||
|
|
||||||
|
mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>* _tree;
|
||||||
|
Type _type;
|
||||||
|
std::uint64_t _key;
|
||||||
|
std::uint64_t _value;
|
||||||
|
ResponseHandler& _response_handler;
|
||||||
|
};
|
||||||
|
|
||||||
|
class alignas(64) ReceiveTask final : public mx::tasking::TaskInterface
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ReceiveTask(Server::state *state, void *pb, std::size_t len) : _state(state), _payload(pb), _length(len) {}
|
||||||
|
|
||||||
|
mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Server::state *_state;
|
||||||
|
void *_payload;
|
||||||
|
std::size_t _length;
|
||||||
|
};
|
||||||
|
|
||||||
|
class alignas(64) AcceptTask final : public mx::tasking::TaskInterface
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
AcceptTask(Lwip::tcp_pcb *newpcb) : _pcb(newpcb) {}
|
||||||
|
|
||||||
|
mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
Lwip::tcp_pcb *_pcb;
|
||||||
|
};
|
||||||
|
} // namespace mx::io::network
|
||||||
252
repos/ealanos/src/app/blinktree_server/request_scheduler.h
Normal file
252
repos/ealanos/src/app/blinktree_server/request_scheduler.h
Normal file
@@ -0,0 +1,252 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "config.h"
|
||||||
|
#include "listener.h"
|
||||||
|
#include <atomic>
|
||||||
|
#include <benchmark/workload.h>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <db/index/blinktree/b_link_tree.h>
|
||||||
|
#include <db/index/blinktree/config.h>
|
||||||
|
#include <db/index/blinktree/insert_value_task.h>
|
||||||
|
#include <db/index/blinktree/lookup_task.h>
|
||||||
|
#include <db/index/blinktree/update_task.h>
|
||||||
|
#include <mx/resource/resource.h>
|
||||||
|
#include <mx/tasking/runtime.h>
|
||||||
|
#include <mx/tasking/task.h>
|
||||||
|
#include <mx/util/core_set.h>
|
||||||
|
#include <mx/util/reference_counter.h>
|
||||||
|
|
||||||
|
namespace application::blinktree_benchmark {
|
||||||
|
|
||||||
|
class RequestIndex
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static RequestIndex make_finished() { return RequestIndex{std::numeric_limits<decltype(_index)>::max(), 0UL}; }
|
||||||
|
static RequestIndex make_no_new() { return RequestIndex{0UL, 0UL}; }
|
||||||
|
|
||||||
|
RequestIndex(const std::uint64_t index, const std::uint64_t count) noexcept : _index(index), _count(count) {}
|
||||||
|
explicit RequestIndex(std::pair<std::uint64_t, std::uint64_t> &&index_and_count) noexcept
|
||||||
|
: _index(std::get<0>(index_and_count)), _count(std::get<1>(index_and_count))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
RequestIndex(RequestIndex &&) noexcept = default;
|
||||||
|
RequestIndex(const RequestIndex &) = default;
|
||||||
|
~RequestIndex() noexcept = default;
|
||||||
|
|
||||||
|
RequestIndex &operator=(RequestIndex &&) noexcept = default;
|
||||||
|
|
||||||
|
[[nodiscard]] std::uint64_t index() const noexcept { return _index; }
|
||||||
|
[[nodiscard]] std::uint64_t count() const noexcept { return _count; }
|
||||||
|
|
||||||
|
[[nodiscard]] bool is_finished() const noexcept { return _index == std::numeric_limits<decltype(_index)>::max(); }
|
||||||
|
[[nodiscard]] bool has_new() const noexcept { return _count > 0UL; }
|
||||||
|
|
||||||
|
RequestIndex &operator-=(const std::uint64_t count) noexcept
|
||||||
|
{
|
||||||
|
_count -= count;
|
||||||
|
_index += count;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::uint64_t _index;
|
||||||
|
std::uint64_t _count;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The RequestContainer manages the workload and allocates new batches of requests
|
||||||
|
* that will be scheduled by the request scheduler.
|
||||||
|
*/
|
||||||
|
class RequestContainer
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RequestContainer(const std::uint16_t core_id, const std::uint64_t max_open_requests,
|
||||||
|
benchmark::Workload &workload) noexcept
|
||||||
|
: _finished_requests(core_id), _local_buffer(workload.next(config::batch_size())),
|
||||||
|
_max_pending_requests(max_open_requests), _workload(workload)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
~RequestContainer() noexcept = default;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocates the next requests to spawn.
|
||||||
|
*
|
||||||
|
* @return Pair of workload-index and number of tuples to request.
|
||||||
|
* When the number is negative, no more requests are available.
|
||||||
|
*/
|
||||||
|
RequestIndex next() noexcept
|
||||||
|
{
|
||||||
|
const auto finished_requests = _finished_requests.load();
|
||||||
|
|
||||||
|
const auto pending_requests = _scheduled_requests - finished_requests;
|
||||||
|
if (pending_requests >= _max_pending_requests)
|
||||||
|
{
|
||||||
|
// Too many open requests somewhere in the system.
|
||||||
|
return RequestIndex::make_no_new();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_local_buffer.has_new() == false)
|
||||||
|
{
|
||||||
|
_local_buffer = RequestIndex{_workload.next(config::batch_size())};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (_local_buffer.has_new())
|
||||||
|
{
|
||||||
|
// How many requests can be scheduled without reaching the request limit?
|
||||||
|
const auto free_requests = _max_pending_requests - pending_requests;
|
||||||
|
|
||||||
|
// Try to spawn all free requests, but at least those in the local buffer.
|
||||||
|
const auto count = std::min(free_requests, _local_buffer.count());
|
||||||
|
|
||||||
|
_scheduled_requests += count;
|
||||||
|
|
||||||
|
const auto index = RequestIndex{_local_buffer.index(), count};
|
||||||
|
_local_buffer -= count;
|
||||||
|
|
||||||
|
return index;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do we have to wait for pending requests or are we finished?
|
||||||
|
return pending_requests > 0UL ? RequestIndex::make_no_new() : RequestIndex::make_finished();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback after inserted a value.
|
||||||
|
*/
|
||||||
|
void inserted(const std::uint16_t core_id, const std::uint64_t /*key*/, const std::int64_t /*value*/) noexcept
|
||||||
|
{
|
||||||
|
task_finished(core_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback after updated a value.
|
||||||
|
*/
|
||||||
|
void updated(const std::uint16_t core_id, const std::uint64_t /*key*/, const std::int64_t /*value*/) noexcept
|
||||||
|
{
|
||||||
|
task_finished(core_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback after removed a value.
|
||||||
|
*/
|
||||||
|
void removed(const std::uint16_t core_id, const std::uint64_t /*key*/) noexcept { task_finished(core_id); }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback after found a value.
|
||||||
|
*/
|
||||||
|
void found(const std::uint16_t core_id, const std::uint64_t /*key*/, const std::int64_t /*value*/) noexcept
|
||||||
|
{
|
||||||
|
task_finished(core_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback on missing a value.
|
||||||
|
*/
|
||||||
|
void missing(const std::uint16_t core_id, const std::uint64_t /*key*/) noexcept { task_finished(core_id); }
|
||||||
|
|
||||||
|
const benchmark::NumericTuple &operator[](const std::size_t index) const noexcept { return _workload[index]; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Number of requests finished by tasks.
|
||||||
|
mx::util::reference_counter_64 _finished_requests;
|
||||||
|
|
||||||
|
// Number of tasks scheduled by the owning request scheduler.
|
||||||
|
std::uint64_t _scheduled_requests = 0UL;
|
||||||
|
|
||||||
|
// Local buffer holding not scheduled, but from global worker owned request items.
|
||||||
|
RequestIndex _local_buffer;
|
||||||
|
|
||||||
|
// Number of requests that can be distributed by this scheduler,
|
||||||
|
// due to system-wide maximal parallel requests.
|
||||||
|
const std::uint64_t _max_pending_requests;
|
||||||
|
|
||||||
|
// Workload to get requests from.
|
||||||
|
benchmark::Workload &_workload;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the counter of finished requests.
|
||||||
|
*/
|
||||||
|
void task_finished(const std::uint16_t core_id) { _finished_requests.add(core_id); }
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The RequestScheduler own its own request container and sets up requests for the BLink-Tree.
|
||||||
|
*/
|
||||||
|
class RequestSchedulerTask final : public mx::tasking::TaskInterface
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RequestSchedulerTask(const std::uint16_t core_id, const std::uint16_t channel_id, benchmark::Workload &workload,
|
||||||
|
const mx::util::core_set &core_set,
|
||||||
|
db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t> *tree, Listener *listener)
|
||||||
|
: _tree(tree), _listener(listener)
|
||||||
|
{
|
||||||
|
this->annotate(mx::tasking::priority::low);
|
||||||
|
this->is_readonly(false);
|
||||||
|
|
||||||
|
const auto container = mx::tasking::runtime::new_resource<RequestContainer>(
|
||||||
|
sizeof(RequestContainer), mx::resource::hint{channel_id}, core_id,
|
||||||
|
config::max_parallel_requests() / core_set.size(), workload);
|
||||||
|
this->annotate(container, sizeof(RequestContainer));
|
||||||
|
}
|
||||||
|
|
||||||
|
~RequestSchedulerTask() final = default;
|
||||||
|
|
||||||
|
mx::tasking::TaskResult execute(const std::uint16_t core_id, const std::uint16_t channel_id) override
|
||||||
|
{
|
||||||
|
// Get some new requests from the container.
|
||||||
|
auto &request_container = *mx::resource::ptr_cast<RequestContainer>(this->annotated_resource());
|
||||||
|
const auto next_requests = request_container.next();
|
||||||
|
|
||||||
|
if (next_requests.has_new())
|
||||||
|
{
|
||||||
|
for (auto i = next_requests.index(); i < next_requests.index() + next_requests.count(); ++i)
|
||||||
|
{
|
||||||
|
mx::tasking::TaskInterface *task{nullptr};
|
||||||
|
const auto &tuple = request_container[i];
|
||||||
|
if (tuple == benchmark::NumericTuple::INSERT)
|
||||||
|
{
|
||||||
|
task = mx::tasking::runtime::new_task<
|
||||||
|
db::index::blinktree::InsertValueTask<std::uint64_t, std::int64_t, RequestContainer>>(
|
||||||
|
core_id, tuple.key(), tuple.value(), _tree, request_container);
|
||||||
|
task->is_readonly(_tree->height() > 1U);
|
||||||
|
}
|
||||||
|
else if (tuple == benchmark::NumericTuple::LOOKUP)
|
||||||
|
{
|
||||||
|
task = mx::tasking::runtime::new_task<
|
||||||
|
db::index::blinktree::LookupTask<std::uint64_t, std::int64_t, RequestContainer>>(
|
||||||
|
core_id, tuple.key(), request_container);
|
||||||
|
|
||||||
|
task->is_readonly(true);
|
||||||
|
}
|
||||||
|
else if (tuple == benchmark::NumericTuple::UPDATE)
|
||||||
|
{
|
||||||
|
task = mx::tasking::runtime::new_task<
|
||||||
|
db::index::blinktree::UpdateTask<std::uint64_t, std::int64_t, RequestContainer>>(
|
||||||
|
core_id, tuple.key(), tuple.value(), request_container);
|
||||||
|
task->is_readonly(_tree->height() > 1U);
|
||||||
|
}
|
||||||
|
|
||||||
|
task->annotate(_tree->root(), db::index::blinktree::config::node_size() / 4U);
|
||||||
|
mx::tasking::runtime::spawn(*task, channel_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (next_requests.is_finished())
|
||||||
|
{
|
||||||
|
// All requests are done. Notify the benchmark and die.
|
||||||
|
_listener->requests_finished();
|
||||||
|
mx::tasking::runtime::delete_resource<RequestContainer>(this->annotated_resource());
|
||||||
|
return mx::tasking::TaskResult::make_remove();
|
||||||
|
}
|
||||||
|
|
||||||
|
return mx::tasking::TaskResult::make_succeed(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// The tree to send requests to.
|
||||||
|
db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t> *_tree;
|
||||||
|
|
||||||
|
// Benchmark listener to notify on requests are done.
|
||||||
|
Listener *_listener;
|
||||||
|
};
|
||||||
|
} // namespace application::blinktree_benchmark
|
||||||
50
repos/ealanos/src/app/blinktree_server/server.cpp
Normal file
50
repos/ealanos/src/app/blinktree_server/server.cpp
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
#include "server.h"
|
||||||
|
#include "mx/memory/global_heap.h"
|
||||||
|
#include "network/server.h"
|
||||||
|
#include <algorithm>
|
||||||
|
#include <iostream>
|
||||||
|
#include <mx/system/environment.h>
|
||||||
|
#include <base/heap.h>
|
||||||
|
#include <timer_session/connection.h>
|
||||||
|
#include <mxip/genode_init.h>
|
||||||
|
|
||||||
|
using namespace application::blinktree_server;
|
||||||
|
|
||||||
|
Server::Server(const std::uint64_t port, mx::util::core_set &&cores, const std::uint16_t prefetch_distance, const mx::synchronization::isolation_level node_isolation_level, const mx::synchronization::protocol preferred_synchronization_method)
|
||||||
|
: _port(port), _cores(std::move(cores)), _prefetch_distance(prefetch_distance), _node_isolation_level(node_isolation_level), _preferred_synchronization_method(preferred_synchronization_method)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void Server::run()
|
||||||
|
{
|
||||||
|
network::Server* server;
|
||||||
|
|
||||||
|
Libc::Env &env = mx::system::Environment::env();
|
||||||
|
mx::tasking::runtime::init(env, this->_cores, this->_prefetch_distance, /* use mx tasking's task allocator*/ false);
|
||||||
|
|
||||||
|
this->_tree = std::make_unique<db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>>(
|
||||||
|
this->_node_isolation_level, this->_preferred_synchronization_method);
|
||||||
|
|
||||||
|
|
||||||
|
static mx::memory::dynamic::Allocator *alloc = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator();
|
||||||
|
|
||||||
|
static Timer::Connection timer{env};
|
||||||
|
|
||||||
|
static Genode::Heap _alloc{env.ram(), env.rm()};
|
||||||
|
|
||||||
|
Mxip::mxip_init(*mx::memory::GlobalHeap::_alloc, timer);
|
||||||
|
server = new network::Server{env, this->_port, mx::tasking::runtime::channels(), timer, _alloc};
|
||||||
|
|
||||||
|
std::cout << "Waiting for requests on port :" << this->_port << std::endl;
|
||||||
|
auto network_thread = std::thread{[server, tree = this->_tree.get()]() {
|
||||||
|
server->listen(tree);
|
||||||
|
}};
|
||||||
|
mx::tasking::runtime::start_and_wait();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
network_thread.join();
|
||||||
|
|
||||||
|
|
||||||
|
//delete server;
|
||||||
|
}
|
||||||
30
repos/ealanos/src/app/blinktree_server/server.h
Normal file
30
repos/ealanos/src/app/blinktree_server/server.h
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <db/index/blinktree/b_link_tree.h>
|
||||||
|
#include <mx/util/core_set.h>
|
||||||
|
|
||||||
|
namespace application::blinktree_server {
|
||||||
|
class Server
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
Server(std::uint64_t port, mx::util::core_set&& cores, std::uint16_t prefetch_distance, mx::synchronization::isolation_level node_isolation_level, mx::synchronization::protocol preferred_synchronization_method);
|
||||||
|
|
||||||
|
void run();
|
||||||
|
private:
|
||||||
|
const std::uint64_t _port;
|
||||||
|
|
||||||
|
const std::uint16_t _prefetch_distance;
|
||||||
|
|
||||||
|
/// Cores.
|
||||||
|
mx::util::core_set _cores;
|
||||||
|
|
||||||
|
// The synchronization mechanism to use for tree nodes.
|
||||||
|
const mx::synchronization::isolation_level _node_isolation_level;
|
||||||
|
|
||||||
|
// Preferred synchronization method.
|
||||||
|
const mx::synchronization::protocol _preferred_synchronization_method;
|
||||||
|
|
||||||
|
/// Tree.
|
||||||
|
std::unique_ptr<db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>> _tree;
|
||||||
|
};
|
||||||
|
}
|
||||||
37
repos/ealanos/src/app/blinktree_server/target.mk
Normal file
37
repos/ealanos/src/app/blinktree_server/target.mk
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
MXINC_DIR=$(REP_DIR)/src/app/blinktree_server
|
||||||
|
MXINC_DIR+=-I$(REP_DIR)/src/app/blinktree
|
||||||
|
GENODE_GCC_TOOLCHAIN_DIR ?= /usr/local/genode/tool/23.05
|
||||||
|
MXBENCH_DIR=$(REP_DIR)/src/lib
|
||||||
|
|
||||||
|
TARGET = blinktree_daemon
|
||||||
|
# soure file for benchmark framework
|
||||||
|
SRC_MXBENCH = ${MXBENCH_DIR}/benchmark/workload_set.cpp
|
||||||
|
SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/workload.cpp
|
||||||
|
SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/cores.cpp
|
||||||
|
SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/string_util.cpp
|
||||||
|
SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/perf.cpp
|
||||||
|
# source files for blinktree benchmark
|
||||||
|
SRC_BTREE += main.cpp
|
||||||
|
SRC_BTREE += server.cpp
|
||||||
|
SRC_BTREE += network/server.cpp
|
||||||
|
|
||||||
|
INC_DIR += /usr/local/genode/tool/lib/clang/14.0.5/include/
|
||||||
|
INC_DIR += $(REP_DIR)/src/lib
|
||||||
|
INC_DIR += $(REP_DIR)/include
|
||||||
|
INC_DIR += $(REP_DIR)/include/ealanos/util
|
||||||
|
INC_DIR += $(call select_from_repositories,src/lib/libc)
|
||||||
|
INC_DIR += $(call select_from_repositories,src/lib/libc)/spec/x86_64
|
||||||
|
vpath %.h ${INC_DIR}
|
||||||
|
LD_OPT += --allow-multiple-definition
|
||||||
|
|
||||||
|
SRC_CC = ${SRC_MXBENCH} ${SRC_BTREE}
|
||||||
|
LIBS += base libc stdcxx mxtasking mxip
|
||||||
|
EXT_OBJECTS += /usr/local/genode/tool/lib/libatomic.a /usr/local/genode/tool/23.05/lib/gcc/x86_64-pc-elf/12.3.0/libgcc_eh.a /usr/local/genode/tool/lib/clang/14.0.5/lib/linux/libclang_rt.builtins-x86_64.a
|
||||||
|
CUSTOM_CC = /usr/local/genode/tool/bin/clang
|
||||||
|
CUSTOM_CXX = /usr/local/genode/tool/bin/clang++
|
||||||
|
CC_OPT := --target=x86_64-genode --sysroot=/does/not/exist --gcc-toolchain=$(GENODE_GCC_TOOLCHAIN_DIR) -Wno-error -g -DNDEBUG -I$(MXINC_DIR) -std=c++20 #-D_GLIBCXX_ATOMIC_BUILTINS_8 -D__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
|
||||||
|
CC_OPT += -I$(MXBENCH_DIR)
|
||||||
|
CC_OLEVEL = -O3
|
||||||
|
CC_CXX_WARN_STRICT =
|
||||||
|
CUSTOM_CXX_LIB := $(CROSS_DEV_PREFIX)g++
|
||||||
|
#CXX_LD += $(CROSS_DEV_PREFIX)g++
|
||||||
Reference in New Issue
Block a user