From dcf5bb924ddda47d5e9173059e66d4b901cc86ac Mon Sep 17 00:00:00 2001
From: Michael Mueller
Date: Tue, 3 Jun 2025 15:29:37 +0200
Subject: [PATCH] ealanos: Added blinktree-based key-value store server.

---
 .../src/app/blinktree_server/README.md        |  79 +++
 .../src/app/blinktree_server/benchmark.cpp    | 199 +++++++
 .../src/app/blinktree_server/benchmark.h      | 103 ++++
 .../ealanos/src/app/blinktree_server/config.h |  17 +
 .../src/app/blinktree_server/listener.h       |  15 +
 .../ealanos/src/app/blinktree_server/main.cpp | 158 ++++++
 .../src/app/blinktree_server/network/config.h |   9 +
 .../app/blinktree_server/network/server.cpp   | 529 ++++++++++++++++++
 .../src/app/blinktree_server/network/server.h | 301 ++++++++++
 .../app/blinktree_server/request_scheduler.h  | 252 +++++++++
 .../src/app/blinktree_server/server.cpp       |  50 ++
 .../ealanos/src/app/blinktree_server/server.h |  30 +
 .../src/app/blinktree_server/target.mk        |  37 ++
 13 files changed, 1779 insertions(+)
 create mode 100644 repos/ealanos/src/app/blinktree_server/README.md
 create mode 100644 repos/ealanos/src/app/blinktree_server/benchmark.cpp
 create mode 100644 repos/ealanos/src/app/blinktree_server/benchmark.h
 create mode 100644 repos/ealanos/src/app/blinktree_server/config.h
 create mode 100644 repos/ealanos/src/app/blinktree_server/listener.h
 create mode 100644 repos/ealanos/src/app/blinktree_server/main.cpp
 create mode 100644 repos/ealanos/src/app/blinktree_server/network/config.h
 create mode 100644 repos/ealanos/src/app/blinktree_server/network/server.cpp
 create mode 100644 repos/ealanos/src/app/blinktree_server/network/server.h
 create mode 100644 repos/ealanos/src/app/blinktree_server/request_scheduler.h
 create mode 100644 repos/ealanos/src/app/blinktree_server/server.cpp
 create mode 100644 repos/ealanos/src/app/blinktree_server/server.h
 create mode 100644 repos/ealanos/src/app/blinktree_server/target.mk

diff --git a/repos/ealanos/src/app/blinktree_server/README.md b/repos/ealanos/src/app/blinktree_server/README.md
new file mode 100644
index 0000000000..06f1770410
--- /dev/null
+++ b/repos/ealanos/src/app/blinktree_server/README.md
@@ -0,0 +1,79 @@
+# BLinkTree Benchmark
+The BLinkTree benchmark stores `8` byte numeric keys and values.
+Call `./bin/blinktree_benchmark -h` for help and parameters.
+
+## How to generate YCSB workload
+* Workload specifications are defined by files in `workloads_specification/`.
+* Call `make ycsb-a` and `make ycsb-c` to generate workloads **A** and **C**.
+* Workload files are stored in `workloads/`.
+* Use `./bin/blinktree_benchmark -f <fill-workload> <mixed-workload>` to pass the desired workload.
+* The default (if not specified) is `-f workloads/fill_randint_workloada workloads/mixed_randint_workloada`.
+
+## Important CLI arguments
+* The first argument is the number of cores:
+    * `./bin/blinktree_benchmark 1` for using a single core.
+    * `./bin/blinktree_benchmark 1:24` for using cores `1` up to `24`.
+* `-i <N>` specifies the number of repetitions of each workload.
+* `-s <N>` sets the step width of the cores:
+    * `-s 1` will increase the used cores by one (core ids: `0,1,2,3,4,5,6,7,..,23`).
+    * `-s 2` will skip every second core (core ids: `0,1,3,5,7,..,23`).
+* `-pd <N>` specifies the prefetch distance.
+* `-p` or `--perf` will activate performance counters (results will be printed to the console and the output file).
+* `--latched` will enable latches for synchronization (default off).
+* `--exclusive` forces the tasks to access tree nodes exclusively (e.g. by using spinlocks or core-based sequencing) (default off).
+* `--sync4me` will use the built-in synchronization selection to choose the matching primitive based on annotations.
+* `-o <file>` will write the results in **JSON** format to the given file.
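+
+A typical end-to-end run (a sketch; it assumes the workload files generated by `make ycsb-a` above) combines these steps:
+
+    make ycsb-a
+    ./bin/blinktree_benchmark 1:8 -s 2 -i 3 -p -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o results.json
+    scripts/plot_blinktree_benchmark results.json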
+## Understanding the output
+After starting, the benchmark will print a summary of the configured cores and workload:
+
+    core configuration:
+      1: 0
+      2: 0 1
+      4: 0 1 2 3
+    workload: fill: 5m / readonly: 5m
+
+Here, we configured the benchmark to use one to four cores; each line of the core configuration displays the number of cores and the core identifiers.
+
+Afterwards, the benchmark will start and print the results for every iteration:
+
+    1   1   0   1478 ms   3.38295e+06 op/s
+    1   1   1   1237 ms   4.04204e+06 op/s
+    2   1   0    964 ms   5.18672e+06 op/s
+    2   1   1    675 ms   7.40741e+06 op/s
+    4   1   0    935 ms   5.34759e+06 op/s
+    4   1   1    532 ms   9.3985e+06 op/s
+
+* The first column is the number of used cores.
+* The second column displays the iteration of the benchmark (configured by `-i <N>`).
+* The third column is the phase identifier: `0` for the initialization phase (which consists only of inserts) and `1` for the workload phase (which is read-only here).
+* After that, the time and throughput are written.
+* If `--perf` is enabled, the output will be extended by several perf counters, which are labeled (like the throughput).
+
+## Plot the results
+When using `-o <file>`, the results will be written to the given file in `JSON` format.
+The plot script `scripts/plot_blinktree_benchmark INPUT_FILE [INPUT_FILE ...]` will aggregate and plot the results using one or more of those `JSON` files.
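+
+The exact fields of each result line come from the benchmark's result serialization; a line might look roughly like this (field names are illustrative, not authoritative):
+
+    {"cores": 4, "iteration": 1, "phase": 1, "time": 532, "throughput": 9.3985e+06}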
+
+## Examples
+
+###### Running workload A using optimistic synchronization
+
+    ./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o optimistic.json
+
+###### Running workload A using best matching synchronization
+
+    ./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --sync4me -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o sync4me.json
+
+###### Running workload A using reader/writer-locks
+
+    ./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --latched -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o rwlocked.json
+
+###### Running workload A using core-based sequencing
+
+    ./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --exclusive -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o core-sequenced.json
+
+###### Running workload A using spin-locks
+
+    ./bin/blinktree_benchmark 1: -s 2 -i 3 -pd 3 -p --latched --exclusive -f workloads/fill_randint_workloada workloads/mixed_randint_workloada -o spinlocked.json
+
\ No newline at end of file
diff --git a/repos/ealanos/src/app/blinktree_server/benchmark.cpp b/repos/ealanos/src/app/blinktree_server/benchmark.cpp
new file mode 100644
index 0000000000..24117ba79f
--- /dev/null
+++ b/repos/ealanos/src/app/blinktree_server/benchmark.cpp
@@ -0,0 +1,199 @@
+#include "benchmark.h"
+#include <fstream>
+#include <iostream>
+#include <limits>
+#include <mx/tasking/runtime.h>
+#include <nlohmann/json.hpp>
+
+using namespace application::blinktree_benchmark;
+
+Benchmark::Benchmark(benchmark::Cores &&cores, const std::uint16_t iterations, std::string &&fill_workload_file,
+                     std::string &&mixed_workload_file, const bool use_performance_counter,
+                     const mx::synchronization::isolation_level node_isolation_level,
+                     const mx::synchronization::protocol preferred_synchronization_method,
+                     const bool print_tree_statistics, const bool check_tree, std::string &&result_file_name,
+                     std::string &&statistic_file_name, std::string &&tree_file_name, const bool profile)
+    : _cores(std::move(cores)), _iterations(iterations), _node_isolation_level(node_isolation_level),
+      _preferred_synchronization_method(preferred_synchronization_method),
+      _print_tree_statistics(print_tree_statistics), _check_tree(check_tree),
+      _result_file_name(std::move(result_file_name)), _statistic_file_name(std::move(statistic_file_name)),
+      _tree_file_name(std::move(tree_file_name)), _profile(profile)
+{
+    if (use_performance_counter)
+    {
+        this->_chronometer.add(benchmark::Perf::CYCLES);
+        this->_chronometer.add(benchmark::Perf::INSTRUCTIONS);
+        this->_chronometer.add(benchmark::Perf::STALLS_MEM_ANY);
+        this->_chronometer.add(benchmark::Perf::SW_PREFETCH_ACCESS_NTA);
+        this->_chronometer.add(benchmark::Perf::SW_PREFETCH_ACCESS_WRITE);
+    }
+
+    std::cout << "core configuration: \n" << this->_cores.dump(2) << std::endl;
+
+    this->_workload.build(fill_workload_file, mixed_workload_file);
+    if (this->_workload.empty(benchmark::phase::FILL) && this->_workload.empty(benchmark::phase::MIXED))
+    {
+        std::exit(1);
+    }
+
+    std::cout << "workload: " << this->_workload << "\n" << std::endl;
+}
+
+void Benchmark::start()
+{
+    // Reset tree.
+    if (this->_tree == nullptr)
+    {
+        this->_tree = std::make_unique<db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>>(
+            this->_node_isolation_level, this->_preferred_synchronization_method);
+    }
+
+    // Reset request scheduler.
+    if (this->_request_scheduler.empty() == false)
+    {
+        this->_request_scheduler.clear();
+    }
+
+    // Create one request scheduler per core.
+    for (auto core_index = 0U; core_index < this->_cores.current().size(); core_index++)
+    {
+        const auto channel_id = core_index;
+        auto *request_scheduler = mx::tasking::runtime::new_task<RequestSchedulerTask>(
+            0U, core_index, channel_id, this->_workload, this->_cores.current(), this->_tree.get(), this);
+        mx::tasking::runtime::spawn(*request_scheduler, 0U);
+        this->_request_scheduler.push_back(request_scheduler);
+    }
+    this->_open_requests = this->_request_scheduler.size();
+
+    // Start measurement.
+    if (this->_profile)
+    {
+        mx::tasking::runtime::profile(this->profile_file_name());
+    }
+    this->_chronometer.start(static_cast<std::uint8_t>(static_cast<benchmark::phase>(this->_workload)),
+                             this->_current_iteration + 1, this->_cores.current());
+}
+
+const mx::util::core_set &Benchmark::core_set()
+{
+    if (this->_current_iteration == std::numeric_limits<std::uint16_t>::max())
+    {
+        // This is the very first time we start the benchmark.
+        this->_current_iteration = 0U;
+        return this->_cores.next();
+    }
+
+    // Switch from fill to mixed phase.
+    if (this->_workload == benchmark::phase::FILL && this->_workload.empty(benchmark::phase::MIXED) == false)
+    {
+        this->_workload.reset(benchmark::phase::MIXED);
+        return this->_cores.current();
+    }
+    this->_workload.reset(benchmark::phase::FILL);
+
+    // Run the next iteration.
+    if (++this->_current_iteration < this->_iterations)
+    {
+        return this->_cores.current();
+    }
+    this->_current_iteration = 0U;
+
+    // At this point, all phases and all iterations for the current core configuration
+    // are done. Increase the cores.
+    return this->_cores.next();
+}
+
+void Benchmark::requests_finished()
+{
+    const auto open_requests = --this->_open_requests;
+
+    if (open_requests == 0U) // All request schedulers are done.
+    {
+        // Stop and print time (and performance counter).
+        const auto result = this->_chronometer.stop(this->_workload.size());
+        mx::tasking::runtime::stop();
+        std::cout << result << std::endl;
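+
+        // Note: the result file is written in JSON-lines form below (one JSON object
+        // appended per iteration); this is the input consumed by the plot script
+        // described in README.md.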
+        // Dump results to file.
+        if (this->_result_file_name.empty() == false)
+        {
+            std::ofstream result_file_stream(this->_result_file_name, std::ofstream::app);
+            result_file_stream << result.to_json().dump() << std::endl;
+        }
+
+        // Dump statistics to file.
+        if constexpr (mx::tasking::config::task_statistics())
+        {
+            if (this->_statistic_file_name.empty() == false)
+            {
+                std::ofstream statistic_file_stream(this->_statistic_file_name, std::ofstream::app);
+                nlohmann::json statistic_json;
+                statistic_json["iteration"] = result.iteration();
+                statistic_json["cores"] = result.core_count();
+                statistic_json["phase"] = result.phase();
+                statistic_json["scheduled"] = nlohmann::json();
+                statistic_json["scheduled-on-core"] = nlohmann::json();
+                statistic_json["scheduled-off-core"] = nlohmann::json();
+                statistic_json["executed"] = nlohmann::json();
+                statistic_json["executed-reader"] = nlohmann::json();
+                statistic_json["executed-writer"] = nlohmann::json();
+                statistic_json["buffer-fills"] = nlohmann::json();
+                for (auto i = 0U; i < this->_cores.current().size(); i++)
+                {
+                    const auto core_id = std::int32_t{this->_cores.current()[i]};
+                    const auto core_id_string = std::to_string(core_id);
+                    statistic_json["scheduled"][core_id_string] =
+                        result.scheduled_tasks(core_id) / double(result.operation_count());
+                    statistic_json["scheduled-on-core"][core_id_string] =
+                        result.scheduled_tasks_on_core(core_id) / double(result.operation_count());
+                    statistic_json["scheduled-off-core"][core_id_string] =
+                        result.scheduled_tasks_off_core(core_id) / double(result.operation_count());
+                    statistic_json["executed"][core_id_string] =
+                        result.executed_tasks(core_id) / double(result.operation_count());
+                    statistic_json["executed-reader"][core_id_string] =
+                        result.executed_reader_tasks(core_id) / double(result.operation_count());
+                    statistic_json["executed-writer"][core_id_string] =
+                        result.executed_writer_tasks(core_id) / double(result.operation_count());
+                    statistic_json["buffer-fills"][core_id_string] =
+                        result.worker_fills(core_id) / double(result.operation_count());
+                }
+
+                statistic_file_stream << statistic_json.dump(2) << std::endl;
+            }
+        }
+
+        // Check and print the tree.
+        if (this->_check_tree)
+        {
+            this->_tree->check();
+        }
+
+        if (this->_print_tree_statistics)
+        {
+            this->_tree->print_statistics();
+        }
+
+        const auto is_last_phase =
+            this->_workload == benchmark::phase::MIXED || this->_workload.empty(benchmark::phase::MIXED);
+
+        // Dump the tree.
+        if (this->_tree_file_name.empty() == false && is_last_phase)
+        {
+            std::ofstream tree_file_stream(this->_tree_file_name);
+            tree_file_stream << static_cast<nlohmann::json>(*(this->_tree)).dump() << std::endl;
+        }
+
+        // Delete the tree to free the whole memory.
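+        // (start() will lazily re-create the tree for the next iteration or core configuration.)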
+ if (is_last_phase) + { + this->_tree.reset(nullptr); + } + } +} + +std::string Benchmark::profile_file_name() const +{ + return "profiling-" + std::to_string(this->_cores.current().size()) + "-cores" + "-phase-" + + std::to_string(static_cast(static_cast(this->_workload))) + "-iteration-" + + std::to_string(this->_current_iteration) + ".json"; +} \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/benchmark.h b/repos/ealanos/src/app/blinktree_server/benchmark.h new file mode 100644 index 0000000000..ae0b789b4b --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/benchmark.h @@ -0,0 +1,103 @@ +#pragma once + +#include "listener.h" +#include "request_scheduler.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace application::blinktree_benchmark { +/** + * Benchmark executing the task-based BLink-Tree. + */ +class Benchmark final : public Listener +{ +public: + Benchmark(benchmark::Cores &&, std::uint16_t iterations, std::string &&fill_workload_file, + std::string &&mixed_workload_file, bool use_performance_counter, + mx::synchronization::isolation_level node_isolation_level, + mx::synchronization::protocol preferred_synchronization_method, bool print_tree_statistics, + bool check_tree, std::string &&result_file_name, std::string &&statistic_file_name, + std::string &&tree_file_name, bool profile); + + ~Benchmark() noexcept override = default; + + /** + * @return Core set the benchmark should run in the current iteration. + */ + const mx::util::core_set &core_set(); + + /** + * Callback for request tasks to notify they are out of + * new requests. + */ + void requests_finished() override; + + /** + * Starts the benchmark after initialization. + */ + void start(); + +private: + // Collection of cores the benchmark should run on. + benchmark::Cores _cores; + + // Number of iterations the benchmark should use. + const std::uint16_t _iterations; + + // Current iteration within the actual core set. + std::uint16_t _current_iteration = std::numeric_limits::max(); + + // Workload to get requests from. + benchmark::Workload _workload; + + // Tree to run requests on. + std::unique_ptr> _tree; + + // The synchronization mechanism to use for tree nodes. + const mx::synchronization::isolation_level _node_isolation_level; + + // Preferred synchronization method. + const mx::synchronization::protocol _preferred_synchronization_method; + + // If true, the tree statistics (height, number of nodes, ...) will be printed. + const bool _print_tree_statistics; + + // If true, the tree will be checked for consistency after each iteration. + const bool _check_tree; + + // Name of the file to print results to. + const std::string _result_file_name; + + // Name of the file to print further statistics. + const std::string _statistic_file_name; + + // Name of the file to serialize the tree to. + const std::string _tree_file_name; + + // If true, use idle profiling. + const bool _profile; + + // Number of open request tasks; used for tracking the benchmark. + alignas(64) std::atomic_uint16_t _open_requests = 0; + + // List of request schedulers. + alignas(64) std::vector _request_scheduler; + + // Chronometer for starting/stopping time and performance counter. + alignas(64) benchmark::Chronometer _chronometer; + + /** + * @return Name of the file to write profiling results to. 
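+     *         (e.g. "profiling-4-cores-phase-1-iteration-0.json")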
+ */ + [[nodiscard]] std::string profile_file_name() const; +}; +} // namespace application::blinktree_benchmark \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/config.h b/repos/ealanos/src/app/blinktree_server/config.h new file mode 100644 index 0000000000..2144075be8 --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/config.h @@ -0,0 +1,17 @@ +#pragma once + +namespace application::blinktree_benchmark { +class config +{ +public: + /** + * @return Number of requests that will be started at a time by the request scheduler. + */ + static constexpr auto batch_size() noexcept { return 500U; } + + /** + * @return Number of maximal open requests, system-wide. + */ + static constexpr auto max_parallel_requests() noexcept { return 1500U; } +}; +} // namespace application::blinktree_benchmark \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/listener.h b/repos/ealanos/src/app/blinktree_server/listener.h new file mode 100644 index 0000000000..5a911fa6b5 --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/listener.h @@ -0,0 +1,15 @@ +#pragma once + +namespace application::blinktree_benchmark { +/** + * The listener will be used to notify the benchmark that request tasks are + * done and no more work is available. + */ +class Listener +{ +public: + constexpr Listener() = default; + virtual ~Listener() = default; + virtual void requests_finished() = 0; +}; +} // namespace application::blinktree_benchmark \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/main.cpp b/repos/ealanos/src/app/blinktree_server/main.cpp new file mode 100644 index 0000000000..1d231d1890 --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/main.cpp @@ -0,0 +1,158 @@ +#include "benchmark.h" +#include +#include +#include +#include +#include +#include +#include "server.h" +#include +#include + +/* Genode includes */ +#include + +using namespace application::blinktree_server; + +/** + * Instantiates the BLink-Tree server with CLI arguments. + * @param count_arguments Number of CLI arguments. + * @param arguments Arguments itself. + * + * @return Instance of the server. + */ +std::tuple create_server(int count_arguments, char **arguments); + +/** + * Starts the server. + * + * @param count_arguments Number of CLI arguments. + * @param arguments Arguments itself. + * + * @return Return code of the application. + */ +int bt_main(int count_arguments, char **arguments) +{ + if (mx::system::Environment::is_numa_balancing_enabled()) + { + std::cout << "[Warn] NUMA balancing may be enabled, set '/proc/sys/kernel/numa_balancing' to '0'" << std::endl; + } + + auto [server, prefetch_distance, use_system_allocator] = create_server(count_arguments, arguments); + + if (server != nullptr) + { + /// Wait for the server to finish. + server->run(); + + delete server; + } + + return 0; +} + +std::tuple create_server(int count_arguments, char **arguments) +{ + /* + // Set up arguments. + argparse::ArgumentParser argument_parser("blinktree_server"); + argument_parser.add_argument("cores") + .help("Number of cores to use.") + .default_value(std::uint16_t(1)) + .action([](const std::string &value) { return std::uint16_t(std::stoi(value)); }); + argument_parser.add_argument("--port") + .help("Port of the server") + .default_value(std::uint64_t(12345)) + .action([](const std::string &value) { return std::uint64_t(std::stoi(value)); }); + argument_parser.add_argument("-sco", "--system-core-order") + .help("Use systems core order. 
If not, cores are ordered by node id (should be preferred).") + .implicit_value(true) + .default_value(false); + argument_parser.add_argument("--exclusive") + .help("Are all node accesses exclusive?") + .implicit_value(true) + .default_value(false); + argument_parser.add_argument("--latched") + .help("Prefer latch for synchronization?") + .implicit_value(true) + .default_value(false); + argument_parser.add_argument("--olfit") + .help("Prefer OLFIT for synchronization?") + .implicit_value(true) + .default_value(false); + argument_parser.add_argument("--sync4me") + .help("Let the tasking layer decide the synchronization primitive.") + .implicit_value(true) + .default_value(false); + argument_parser.add_argument("-pd", "--prefetch-distance") + .help("Distance of prefetched data objects (0 = disable prefetching).") + .default_value(std::uint16_t(0)) + .action([](const std::string &value) { return std::uint16_t(std::stoi(value)); }); + argument_parser.add_argument("--system-allocator") + .help("Use the systems malloc interface to allocate tasks (default disabled).") + .implicit_value(true) + .default_value(false); + + // Parse arguments. + try + { + argument_parser.parse_args(count_arguments, arguments); + } + catch (std::runtime_error &e) + { + std::cout << argument_parser << std::endl; + return {nullptr, 0U, false}; + } + + auto order = + argument_parser.get("-sco") ? mx::util::core_set::Order::Ascending : mx::util::core_set::Order::NUMAAware; + auto cores = mx::util::core_set::build(argument_parser.get("cores")-1, order); + const auto isolation_level = argument_parser.get("--exclusive") + ? mx::synchronization::isolation_level::Exclusive + : mx::synchronization::isolation_level::ExclusiveWriter; + auto preferred_synchronization_method = mx::synchronization::protocol::Queue; + if (argument_parser.get("--latched")) + { + preferred_synchronization_method = mx::synchronization::protocol::Latch; + } + else if (argument_parser.get("--olfit")) + { + preferred_synchronization_method = mx::synchronization::protocol::OLFIT; + } + else if (argument_parser.get("--sync4me")) + { + preferred_synchronization_method = mx::synchronization::protocol::None; + } + */ + // Create the benchmark. 
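+    // NOTE: The argparse-based configuration above is commented out for now; the server
+    // below is created with hardcoded values (port 12345, 64 cores, prefetch distance 3,
+    // exclusive-writer isolation, OLFIT).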
+ //auto *server = new Server(argument_parser.get("--port"), std::move(cores), argument_parser.get("-pd"), isolation_level, preferred_synchronization_method); + + auto cores = mx::util::core_set::build(64); + + auto *server = new Server(12345, std::move(cores), 3, mx::synchronization::isolation_level::ExclusiveWriter, mx::synchronization::protocol::OLFIT); + + return {server, 3, false}; + // return {server, argument_parser.get("-pd"), argument_parser.get("--system-allocator")}; +} + +void Libc::Component::construct(Libc::Env &env) { + + mx::system::Environment::set_env(&env); + + auto sys_cores = mx::util::core_set::build(64); + mx::system::Environment::set_cores(&sys_cores); + + std::uint16_t cores = 64; + //env.cpu().affinity_space().total(); + + char cores_arg[10]; + sprintf(cores_arg, "%d", cores); + + char *args[] = {"blinktree_server", cores_arg}; + + Libc::with_libc([&]() + { + std::cout << "Starting B-link tree server" << std::endl; + bt_main(2, args); + }); +} \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/network/config.h b/repos/ealanos/src/app/blinktree_server/network/config.h new file mode 100644 index 0000000000..026c610a96 --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/network/config.h @@ -0,0 +1,9 @@ +#pragma once + +namespace application::blinktree_server::network { +class config +{ +public: + static constexpr auto max_connections() noexcept { return 64U; } +}; +} // namespace mx::io::network \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/network/server.cpp b/repos/ealanos/src/app/blinktree_server/network/server.cpp new file mode 100644 index 0000000000..f0b902b247 --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/network/server.cpp @@ -0,0 +1,529 @@ +#include "server.h" +#include "lwip/err.h" +#include "lwip/pbuf.h" +#include "lwip/tcp.h" +#include "mx/memory/global_heap.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace application::blinktree_server::network; + +mx::tasking::TaskResult RequestTask::execute(const std::uint16_t core_id, const std::uint16_t channel_id) +{ + mx::tasking::TaskInterface* request_task; + + if (this->_type == Type::Insert) + { + request_task = mx::tasking::runtime::new_task< + db::index::blinktree::InsertValueTask>( + core_id, this->_key, this->_value, this->_tree, this->_response_handler); + + request_task->annotate(this->_tree->root(), db::index::blinktree::config::node_size() / 4U); + request_task->is_readonly(true); + } + else if (this->_type == Type::Lookup) + { + request_task = mx::tasking::runtime::new_task< + db::index::blinktree::LookupTask>( + core_id, this->_key, this->_response_handler); + + request_task->annotate(this->_tree->root(), db::index::blinktree::config::node_size() / 4U); + request_task->is_readonly(true); + } + else if(this->_type == Type::Update) + { + request_task = mx::tasking::runtime::new_task< + db::index::blinktree::UpdateTask>( + core_id, this->_key, this->_value, this->_response_handler); + + request_task->annotate(this->_tree->root(), db::index::blinktree::config::node_size() / 4U); + request_task->is_readonly(true); + } + else + { + this->_tree->check(); + this->_tree->print_statistics(); + return mx::tasking::TaskResult::make_null(); + } + + return mx::tasking::TaskResult::make_succeed_and_remove(request_task); +} + +void ResponseHandler::inserted(const std::uint16_t /*core*/, const std::uint64_t key, const std::int64_t /*value*/) +{ + 
+    _server->send(_s, std::to_string(key));
+    Server::free_handler_task(core_id, static_cast<void *>(this));
+}
+
+void ResponseHandler::updated(const std::uint16_t /*core_id*/, const std::uint64_t key, const std::int64_t /*value*/)
+{
+    _server->send(_s, std::to_string(key));
+    Server::free_handler_task(core_id, static_cast<void *>(this));
+}
+
+void ResponseHandler::removed(const std::uint16_t /*core_id*/, const std::uint64_t key)
+{
+    _server->send(_s, std::to_string(key));
+    Server::free_handler_task(core_id, static_cast<void *>(this));
+}
+
+void ResponseHandler::found(const std::uint16_t /*core_id*/, const std::uint64_t /*key*/, const std::int64_t value)
+{
+    _server->send(_s, std::to_string(value));
+    Server::free_handler_task(core_id, static_cast<void *>(this));
+}
+
+void ResponseHandler::missing(const std::uint16_t /*core_id*/, const std::uint64_t key)
+{
+    _server->send(_s, std::to_string(key));
+    Server::free_handler_task(core_id, static_cast<void *>(this));
+}
+
+Server *Server::_myself;
+
+ReceiveTask *Server::_receive_tasks = nullptr;
+
+Server::Server(Libc::Env &env,
+               const std::uint64_t port,
+               const std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept
+    : _port(port), _socket(nullptr), _client_sockets({nullptr}),
+      _count_channels(count_channels), _env{env}, _config(env, "config"), _alloc(alloc), _timer(timer),
+      _netif(env, _alloc, _config.xml(), _wakeup_scheduler)
+{
+    Server::_myself = this;
+    this->_buffer.fill('\0');
+
+    _wakeup_scheduler.set_nic(&_netif);
+
+    _receive_tasks = static_cast<ReceiveTask *>(
+        mx::memory::GlobalHeap::allocate_cache_line_aligned(65536 * sizeof(ReceiveTask)));
+}
+
+Server::~Server() {
+}
+
+bool Server::listen(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t> *tree)
+{
+    _socket = Lwip::tcp_new();
+
+    if (!_socket) {
+        Genode::error("Failed to create server socket");
+        return false;
+    }
+
+    Lwip::err_t rc = Lwip::tcp_bind(_socket, &Lwip::ip_addr_any, _port);
+    if (rc != Lwip::ERR_OK) {
+        Genode::error("Failed to bind server socket to port ", _port);
+        return false;
+    }
+
+    _socket = Lwip::tcp_listen_with_backlog(_socket, 64);
+    Lwip::tcp_accept(_socket, &Server::_handle_tcp_connect);
+
+    this->_tree = tree;
+
+    return true;
+}
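+
+/*
+ * Request wire format handled by parse() below (a sketch reconstructed from the parser,
+ * not an authoritative protocol spec). Each message starts with an opcode character,
+ * followed by comma-separated ASCII integers:
+ *
+ *   "I,<key>,<value>"   insert <key> with <value>
+ *   "U,<key>,<value>"   update <key> to <value>
+ *   "D"                 check the tree and dump statistics (debug)
+ *   anything else       lookup, e.g. "L,<key>"
+ *
+ * Responses are framed by send(): an 8-byte length header (in the native byte order of
+ * the server), followed by the payload (the affected key or the looked-up value as a
+ * decimal string).
+ */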
+void Server::parse(struct Server::state *s, std::string &message)
+{
+    RequestTask::Type request_type;
+
+    std::uint64_t i = s->id;
+
+    if (message[0] == 'D')
+    {
+        auto response_handler = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ResponseHandler)))
+            ResponseHandler(this, s, 0);
+        //auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, *response_handler};
+        auto *request_task = mx::tasking::runtime::new_task<RequestTask>(0, this->_tree, *response_handler);
+        request_task->annotate(std::uint16_t(0U));
+        mx::tasking::runtime::spawn(*request_task);
+    }
+    else
+    {
+        switch (message[0])
+        {
+        case 'I':
+            request_type = RequestTask::Type::Insert;
+            break;
+        case 'U':
+            request_type = RequestTask::Type::Update;
+            break;
+        default:
+            request_type = RequestTask::Type::Lookup;
+        }
+
+        auto key = 0ULL;
+        auto index = 2U; // Skip request type and comma.
+        while (message[index] >= '0' && message[index] <= '9')
+        {
+            key = key * 10 + (message[index++] - '0');
+        }
+
+        auto channel_id = std::uint16_t(this->_next_worker_id.fetch_add(1U) % this->_count_channels);
+        if (request_type == RequestTask::Type::Insert || request_type == RequestTask::Type::Update)
+        {
+            auto value = 0LL;
+            ++index; // Skip the comma between key and value.
+            while (message[index] >= '0' && message[index] <= '9')
+            {
+                value = value * 10 + (message[index++] - '0');
+            }
+
+            auto response_handler =
+                new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ResponseHandler)))
+                    ResponseHandler(this, s, channel_id);
+            auto *request_task = mx::tasking::runtime::new_task<RequestTask>(
+                channel_id, this->_tree, request_type, key, value, *response_handler);
+            request_task->annotate(channel_id);
+            mx::tasking::runtime::spawn(*request_task);
+        }
+        else
+        {
+            //auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, RequestTask::Type::Lookup, key, this->_response_handlers[i]};
+            auto response_handler =
+                new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ResponseHandler)))
+                    ResponseHandler(this, s, channel_id);
+            auto *request_task = mx::tasking::runtime::new_task<RequestTask>(
+                channel_id, this->_tree, request_type, key, *response_handler);
+            request_task->annotate(channel_id);
+            mx::tasking::runtime::spawn(*request_task);
+        }
+        mx::tasking::runtime::scheduler().allocate_cores(64);
+    }
+}
+
+class Send_task : public mx::tasking::TaskInterface
+{
+  private:
+    struct Server::state *_s;
+    std::string _message;
+
+  public:
+    Send_task(Server::state *s, std::string message) : _s(s), _message(message) {}
+
+    mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) override
+    {
+        using namespace Lwip;
+        Lwip::pbuf *ptr = nullptr;
+
+        if (_s == nullptr) {
+            Genode::error("Tried sending over invalid session state");
+            Server::free_task(static_cast<void *>(this));
+            return mx::tasking::TaskResult::make_null();
+        }
+
+        if (_s->state == Server::CLOSED || _s->state == Server::CLOSING) {
+            Genode::warning("Tried to send over socket that is to be closed");
+            Server::free_task(static_cast<void *>(this));
+            return mx::tasking::TaskResult::make_null();
+        }
+
+        ptr = Lwip::pbuf_alloc(Lwip::PBUF_TRANSPORT, _message.length(), Lwip::PBUF_RAM);
+        if (ptr == nullptr)
+        {
+            Genode::error("No memory for sending packet.");
+            Server::free_task(static_cast<void *>(this));
+            return mx::tasking::TaskResult::make_null();
+        }
+
+        if (_s->pcb == nullptr) {
+            Genode::error("Tried sending over invalid pcb");
+            Lwip::pbuf_free(ptr);
+            Server::free_task(static_cast<void *>(this));
+            return mx::tasking::TaskResult::make_null();
+        }
+
+        Lwip::pbuf_take(ptr, _message.c_str(), _message.length());
+
+        if (ptr->len > tcp_sndbuf(_s->pcb))
+            Genode::warning("Not enough space in send buffer");
+
+        Lwip::err_t rc = Lwip::ERR_OK;
+        {
+            rc = Lwip::tcp_write(_s->pcb, ptr->payload, ptr->len, 0);
+        }
+        if (rc == Lwip::ERR_OK)
+        {
+            Lwip::tcp_output(_s->pcb);
+            Lwip::pbuf_free(ptr);
+        } else {
+            Genode::error("LWIP tcp_write error ", static_cast<int>(rc));
+            /*if (_s->tx == nullptr)
+                _s->tx = ptr;
+            else {
+                Lwip::pbuf_cat(_s->tx, ptr);
+            }*/
+        }
+        Server::free_task(static_cast<void *>(this));
+        return mx::tasking::TaskResult::make_null();
+    }
+};
+
+void Server::send(struct state *s, std::string &&message)
+{
+    const auto length = std::uint64_t(message.size());
+    auto response = std::string(length + sizeof(length), '\0');
+
+    // Write the length header.
+    std::memcpy(response.data(), &length, sizeof(length));
+
+    // Write the payload.
+    std::memmove(response.data() + sizeof(length), message.data(), length);
+
+    auto task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(Send_task)))
+        Send_task(s, response);
+    task->annotate(static_cast<std::uint16_t>(s->channel_id));
+    mx::tasking::runtime::spawn(*task);
+}
+
+std::uint16_t Server::add_client(Lwip::tcp_pcb *client_socket)
+{
+    for (auto i = 0U; i < this->_client_sockets.size(); ++i)
+    {
+        if (this->_client_sockets[i] == nullptr)
+        {
+            this->_client_sockets[i] = client_socket;
+            return i;
+        }
+    }
+
+    return std::numeric_limits<std::uint16_t>::max();
+}
+
+void Server::stop() noexcept
+{
+    this->_is_running = false;
+}
+
+class Close_task : public mx::tasking::TaskInterface
+{
+  private:
+    Server::state &_s;
+
+  public:
+    Close_task(Server::state &s) : _s(s) {}
+
+    mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) override
+    {
+        Genode::log("Closing connection for ", static_cast<void *>(_s.pcb), " and state object ",
+                    static_cast<void *>(&_s));
+        Server::tcpbtree_close(_s.pcb, &_s);
+        _s.state = Server::CLOSED;
+        Server::free_task(static_cast<void *>(this));
+        Genode::log("Closed connection");
+        return mx::tasking::TaskResult::make_null();
+    }
+};
+
+/***********
+ * LWIP callback function definitions
+ ***********/
+Lwip::err_t Server::_handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err)
+{
+    struct state *s;
+
+    static uint64_t count_connections = 0;
+
+    LWIP_UNUSED_ARG(arg);
+
+    if ((err != Lwip::ERR_OK) || (newpcb == NULL)) { return Lwip::ERR_VAL; }
+
+    if (newpcb > reinterpret_cast<Lwip::tcp_pcb *>(0x7FFF80000000UL - sizeof(Lwip::tcp_pcb *)))
+        Genode::error("Non-canonical PCB address");
+
+    // Genode::log("Incoming request");
+
+    s = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(struct state)))
+        state(); // static_cast<struct state *>(Lwip::mem_malloc(sizeof(struct state)));
+
+    if (!s) {
+        Genode::error("Failed to allocate state object for new connection.");
+        return Lwip::ERR_MEM;
+    }
+    //Genode::log("New connection #", count_connections, ": arg=", arg, " pcb=", newpcb, " s=", s);
+
+    s->state = states::ACCEPTED;
+    s->pcb = newpcb;
+    s->retries = 0;
+    s->p = nullptr;
+    s->tx = nullptr;
+    s->channel_id = 0; //count_connections % Server::get_instance()->_count_channels;
+
+    Lwip::tcp_backlog_accepted(newpcb);
+    /* Register callback functions */
+    Lwip::tcp_arg(newpcb, s);
+    Lwip::tcp_recv(newpcb, &Server::_handle_tcp_recv);
+    Lwip::tcp_err(newpcb, &Server::_handle_tcp_error);
+    Lwip::tcp_poll(newpcb, &Server::_handle_tcp_poll, 50);
+    Lwip::tcp_sent(newpcb, &Server::_handle_tcp_sent);
+    newpcb->flags |= TF_NODELAY;
+
+    return Lwip::ERR_OK;
+}
+
+Lwip::err_t Server::_handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err)
+{
+    static std::uint16_t next_receive_task = 0;
+    struct state *s;
+    Lwip::err_t rc = Lwip::ERR_OK;
+
+    std::uint16_t next_channel_id = 0;
+
+    s = static_cast<struct state *>(arg);
+
+    if (!s) {
+        Lwip::pbuf_free(p);
+        return Lwip::ERR_ARG;
+    }
+
+    if (err != Lwip::ERR_OK) {
+        Lwip::pbuf_free(p);
+        return err;
+    }
+
+    if (p == nullptr) {
+        s->state = CLOSING;
+        auto task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(Close_task))) Close_task(*s);
+        if (!task) {
+            Genode::warning("Failed to allocate close task");
+            return Lwip::ERR_MEM;
+        }
+        task->annotate(static_cast<std::uint16_t>(s->channel_id));
+        mx::tasking::runtime::spawn(*task);
+        rc = Lwip::ERR_OK;
+    } else if (s->state == states::ACCEPTED) {
+        s->state = states::RECEIVED;
+
+        rc = Lwip::ERR_OK;
+        {
+            void *payload = mx::memory::GlobalHeap::allocate_cache_line_aligned(p->len);
+            std::memcpy(payload, p->payload, p->len);
+            ReceiveTask *task = new
(mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ReceiveTask))) ReceiveTask(s, payload, p->len); + if (!task) { + Genode::warning("Could not allocate request handler task"); + return Lwip::ERR_MEM; + } + task->annotate(static_cast(s->channel_id)); + mx::tasking::runtime::spawn(*task); + } + Lwip::tcp_recved(s->pcb, p->len); + //Server::get_instance()->send(s, "Nope"); + } else if (s->state == states::RECEIVED) { + void *payload = mx::memory::GlobalHeap::allocate_cache_line_aligned(p->len); + std::memcpy(payload, p->payload, p->len); + + ReceiveTask *task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ReceiveTask))) ReceiveTask(s, payload, p->len); + if (!task) { + Genode::warning("Could not allocate request handler task"); + return Lwip::ERR_MEM; + } + task->annotate(static_cast(s->channel_id)); + mx::tasking::runtime::spawn(*task); + Lwip::tcp_recved(s->pcb, p->len); + //Server::get_instance()->send(s, "Nope"); + + + rc = Lwip::ERR_OK; + } + else + { + Lwip::tcp_recved(tpcb, p->tot_len); + rc = Lwip::ERR_OK; + } + Lwip::pbuf_free(p); + + return rc; +} + +Lwip::err_t Server::_handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb) +{ + Lwip::err_t rc; + struct state *s; + + //GENODE_LOG_TSC(1); + s = static_cast(arg); + + if (s) { + if (s->tx) { + rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1); + if (rc == Lwip::ERR_OK) { + Lwip::tcp_output(tpcb); + Lwip::pbuf *ptr = s->tx; + if (ptr->next) { + s->tx = ptr->next; + Lwip::pbuf_ref(s->tx); + } + Lwip::tcp_recved(tpcb, ptr->len); + Lwip::pbuf_free(ptr); + } + // TODO: process remaning pbuf entry + } else { + /*if (s->state == states::CLOSING) { + Server::tcpbtree_close(tpcb, s); + }*/ + } + rc = Lwip::ERR_OK; + } else { + Lwip::tcp_abort(tpcb); + rc = Lwip::ERR_ABRT; + } + + return Lwip::ERR_OK; +} + + +Lwip::err_t Server::_handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len) +{ + //GENODE_LOG_TSC(1); + struct state *s = static_cast(arg); + + if (!s) + return Lwip::ERR_ARG; + + s->retries = 0; + + if (s->tx) { + Lwip::err_t rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1); + if (rc == Lwip::ERR_OK) { + Lwip::tcp_output(tpcb); + Lwip::pbuf *ptr = s->tx; + if (ptr->next) { + s->tx = ptr->next; + Lwip::pbuf_ref(s->tx); + } + Lwip::tcp_recved(tpcb, ptr->len); + Lwip::pbuf_free(ptr); + } + tcp_sent(tpcb, &Server::_handle_tcp_sent); // Genode::log("In _handle_tcp_sent"); + } + + return Lwip::ERR_OK; +} + +mx::tasking::TaskResult application::blinktree_server::network::ReceiveTask::execute(std::uint16_t core_id, std::uint16_t channel_id) +{ + Lwip::err_t rc = Lwip::ERR_OK; + + /*rc = Lwip::tcp_write(_state->pcb, _pbuf->payload, _pbuf->len, 3); + Lwip::tcp_output(_state->pcb); + if (rc == Lwip::ERR_OK) { + Lwip::tcp_recved(_state->pcb, _pbuf->tot_len); + Lwip::pbuf_free(_pbuf); + } else if (rc == Lwip::ERR_MEM) { + Genode::warning("Out of memory"); + }*/ + + //Genode::log("Executing application task"); + //Server::get_instance()->send(_state, "Nope"); + // Server::tcp_send(_state->pcb, _state); + + std::string request = std::string(static_cast(_payload), _length); + Server::get_instance()->parse(_state, request); + + mx::memory::GlobalHeap::free(_payload); + + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); +} \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/network/server.h b/repos/ealanos/src/app/blinktree_server/network/server.h new file mode 100644 index 0000000000..5fedd46c47 --- /dev/null +++ 
b/repos/ealanos/src/app/blinktree_server/network/server.h @@ -0,0 +1,301 @@ +#pragma once + +#include "config.h" +#include "ealanos/memory/hamstraaja.h" +#include "mx/memory/global_heap.h" +#include +#include +#include +#include +#include +#include + +/* B-link tree includes */ +#include +#include + +/* lwIP wrapper for Genode's NIC session */ +#include +#include +#include + +/* Genode includes */ +#include +#include +#include + +/* MxTasking includes*/ +#include +#include +#include +#include +#include + +/* lwIP includes */ +namespace Lwip { + extern "C" { + #include + #include + #include + } +} + +namespace application::blinktree_server::network { + + class ResponseHandler; + class RequestTask; + class ReceiveTask; + class Server + { + public: + enum states + { + NONE = 0, + ACCEPTED, + RECEIVED, + CLOSING, + CLOSED + }; + + struct Wakeup_scheduler : Mxip::Nic_netif::Wakeup_scheduler { + Mxip::Nic_netif *nic{nullptr}; + + void schedule_nic_server_wakeup() override { nic->wakeup_nic_server(); } + + void set_nic(Mxip::Nic_netif *nic) { this->nic = nic; } + + Wakeup_scheduler() = default; + } _wakeup_scheduler; + + struct state + { + std::uint8_t state; + std::uint8_t retries; + struct Lwip::tcp_pcb *pcb; + struct Lwip::pbuf *p; + struct Lwip::pbuf *tx; + std::uint16_t channel_id; + std::uint64_t id; + }; + Server(Libc::Env &env, std::uint64_t port, + std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept; + ~Server(); + + [[nodiscard]] std::uint16_t port() const noexcept { return _port; } + void stop() noexcept; + void send(struct Server::state *s, std::string &&message); + bool listen(db::index::blinktree::BLinkTree *tree); + void parse(struct Server::state *s, std::string &message); + + [[nodiscard]] bool is_running() const noexcept { return _is_running; } + + static void tcp_send(struct Lwip::tcp_pcb *tpcb, struct state *s) + { + using namespace Lwip; + struct Lwip::pbuf *ptr; + Lwip::err_t rc = Lwip::ERR_OK; + + if (!s) + return; + + while ((rc == Lwip::ERR_OK) && (s->tx != nullptr) /* && (s->tx->len <= tcp_sndbuf(tpcb) */) + { + ptr = s->tx; + // Genode::log("Sending response"); + rc = Lwip::tcp_write(tpcb, ptr->payload, ptr->len, 1); + if (rc == Lwip::ERR_OK) + { + std::uint16_t plen; + + plen = ptr->len; + + s->tx = ptr->next; + if (s->tx != nullptr) + { + Lwip::pbuf_ref(s->tx); + } + Lwip::tcp_output(tpcb); + Lwip::pbuf_free(ptr); + } + else if (rc == Lwip::ERR_MEM) + { + Genode::warning("Low on memory. 
Defering to poll()"); + s->tx = ptr; + } + else + { + Genode::warning("An error ", static_cast(rc), " occured."); + } + } + } + + static void tcpbtree_close(struct Lwip::tcp_pcb *tpcb, struct state *s) + { + if (!s || s->pcb != tpcb) { + Genode::error("Tried closing connection with invalid session state"); + return; + } + Lwip::tcp_arg(tpcb, NULL); + Lwip::tcp_sent(tpcb, NULL); + Lwip::tcp_recv(tpcb, NULL); + Lwip::tcp_poll(tpcb, NULL, 0); + Lwip::tcp_err(tpcb, nullptr); + + Genode::log("Unregistered handlers"); + + Lwip::tcp_close(tpcb); + Server::tcp_free(s); + } + + /* tcp_recv */ + static Lwip::err_t _handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err); + + /* tcp_err */ + static void _handle_tcp_error(void *arg, Lwip::err_t err) + { + struct state *s; + LWIP_UNUSED_ARG(err); + + s = static_cast(arg); + + Server::tcp_free(s); + } + + /* tcp_poll */ + static Lwip::err_t _handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb); + + /* tcp_sent */ + static Lwip::err_t _handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len); + + /* helper function for free */ + static void tcp_free(struct state *s) + { + // Genode::log("Freeing state obj s=", s); + if (s) + { + if (s->p) + Lwip::pbuf_free(s->p); + if (s->tx) Lwip::pbuf_free(s->tx); + Genode::log("Freeing state object ", s); + mx::memory::GlobalHeap::free(s); // Lwip::mem_free(s); + Genode::log("Freed state object"); + } + } + + static Server *get_instance() { return _myself; } + + static void free_handler_task(std::uint16_t core_id, void* task) + { + mx::memory::GlobalHeap::free(task); + } + + static void free_task(void* task) + { + mx::memory::GlobalHeap::free(task); + } + + private: + static Server *_myself; + const std::uint64_t _port; + struct Lwip::tcp_pcb *_socket; + Libc::Env &_env; + + std::array _client_sockets; + std::array _buffer; + static ReceiveTask *_receive_tasks; + + alignas(64) bool _is_running = true; + alignas(64) std::atomic_uint64_t _next_worker_id{0U}; + const std::uint16_t _count_channels; + + std::uint16_t add_client(Lwip::tcp_pcb *client_socket); + + /* Genode environment for NIC session */ + Genode::Attached_rom_dataspace _config; + Genode::Heap &_alloc; + Timer::Connection &_timer; + + /* lwIP network device (NIC session wrapper) */ + Mxip::Nic_netif _netif; + db::index::blinktree::BLinkTree *_tree{nullptr}; + + /************************************************ + * lwIP callback API: TCP callback functions + ************************************************/ + + /* tcp_accept */ + static Lwip::err_t + _handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err); + + + + /* helper function for close() */ + +}; + +class alignas(64) ResponseHandler final : public db::index::blinktree::Listener +{ +public: + ResponseHandler(Server* server, Server::state *s, std::uint16_t _core_id) : _server(server), _s(s), core_id(_core_id) { } + ResponseHandler(ResponseHandler&&) noexcept = default; + ~ResponseHandler() = default; + + void inserted(std::uint16_t core_id, const std::uint64_t key, const std::int64_t value) override; + void updated(std::uint16_t core_id, const std::uint64_t key, const std::int64_t value) override; + void removed(std::uint16_t core_id, const std::uint64_t key) override; + void found(std::uint16_t core_id, const std::uint64_t key, const std::int64_t value) override; + void missing(std::uint16_t core_id, const std::uint64_t key) override; + +private: + Server* _server; + Server::state *_s; + std::uint16_t core_id{0}; +}; + +class 
alignas(64) RequestTask final : public mx::tasking::TaskInterface +{ +public: + enum Type { Insert, Update, Lookup, Debug }; + + RequestTask(db::index::blinktree::BLinkTree* tree, const Type type, const std::uint64_t key, ResponseHandler& response_handler) noexcept + : _tree(tree), _type(type), _key(key), _response_handler(response_handler) { } + RequestTask(db::index::blinktree::BLinkTree* tree, const Type type, const std::uint64_t key, const std::int64_t value, ResponseHandler& response_handler) noexcept + : _tree(tree), _type(type), _key(key), _value(value), _response_handler(response_handler) { } + RequestTask(db::index::blinktree::BLinkTree* tree, ResponseHandler& response_handler) noexcept + : _tree(tree), _type(Type::Debug), _response_handler(response_handler) { } + ~RequestTask() noexcept = default; + + mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override; + +private: + db::index::blinktree::BLinkTree* _tree; + Type _type; + std::uint64_t _key; + std::uint64_t _value; + ResponseHandler& _response_handler; +}; + +class alignas(64) ReceiveTask final : public mx::tasking::TaskInterface +{ + public: + ReceiveTask(Server::state *state, void *pb, std::size_t len) : _state(state), _payload(pb), _length(len) {} + + mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override; + + private: + Server::state *_state; + void *_payload; + std::size_t _length; +}; + +class alignas(64) AcceptTask final : public mx::tasking::TaskInterface +{ + public: + AcceptTask(Lwip::tcp_pcb *newpcb) : _pcb(newpcb) {} + + mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override; + + private: + Lwip::tcp_pcb *_pcb; +}; +} // namespace mx::io::network \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/request_scheduler.h b/repos/ealanos/src/app/blinktree_server/request_scheduler.h new file mode 100644 index 0000000000..677a29f4d0 --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/request_scheduler.h @@ -0,0 +1,252 @@ +#pragma once + +#include "config.h" +#include "listener.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace application::blinktree_benchmark { + +class RequestIndex +{ +public: + static RequestIndex make_finished() { return RequestIndex{std::numeric_limits::max(), 0UL}; } + static RequestIndex make_no_new() { return RequestIndex{0UL, 0UL}; } + + RequestIndex(const std::uint64_t index, const std::uint64_t count) noexcept : _index(index), _count(count) {} + explicit RequestIndex(std::pair &&index_and_count) noexcept + : _index(std::get<0>(index_and_count)), _count(std::get<1>(index_and_count)) + { + } + RequestIndex(RequestIndex &&) noexcept = default; + RequestIndex(const RequestIndex &) = default; + ~RequestIndex() noexcept = default; + + RequestIndex &operator=(RequestIndex &&) noexcept = default; + + [[nodiscard]] std::uint64_t index() const noexcept { return _index; } + [[nodiscard]] std::uint64_t count() const noexcept { return _count; } + + [[nodiscard]] bool is_finished() const noexcept { return _index == std::numeric_limits::max(); } + [[nodiscard]] bool has_new() const noexcept { return _count > 0UL; } + + RequestIndex &operator-=(const std::uint64_t count) noexcept + { + _count -= count; + _index += count; + return *this; + } + +private: + std::uint64_t _index; + std::uint64_t _count; +}; + +/** + * The RequestContainer manages the workload and allocates new batches 
of requests + * that will be scheduled by the request scheduler. + */ +class RequestContainer +{ +public: + RequestContainer(const std::uint16_t core_id, const std::uint64_t max_open_requests, + benchmark::Workload &workload) noexcept + : _finished_requests(core_id), _local_buffer(workload.next(config::batch_size())), + _max_pending_requests(max_open_requests), _workload(workload) + { + } + + ~RequestContainer() noexcept = default; + + /** + * Allocates the next requests to spawn. + * + * @return Pair of workload-index and number of tuples to request. + * When the number is negative, no more requests are available. + */ + RequestIndex next() noexcept + { + const auto finished_requests = _finished_requests.load(); + + const auto pending_requests = _scheduled_requests - finished_requests; + if (pending_requests >= _max_pending_requests) + { + // Too many open requests somewhere in the system. + return RequestIndex::make_no_new(); + } + + if (_local_buffer.has_new() == false) + { + _local_buffer = RequestIndex{_workload.next(config::batch_size())}; + } + + if (_local_buffer.has_new()) + { + // How many requests can be scheduled without reaching the request limit? + const auto free_requests = _max_pending_requests - pending_requests; + + // Try to spawn all free requests, but at least those in the local buffer. + const auto count = std::min(free_requests, _local_buffer.count()); + + _scheduled_requests += count; + + const auto index = RequestIndex{_local_buffer.index(), count}; + _local_buffer -= count; + + return index; + } + + // Do we have to wait for pending requests or are we finished? + return pending_requests > 0UL ? RequestIndex::make_no_new() : RequestIndex::make_finished(); + } + + /** + * Callback after inserted a value. + */ + void inserted(const std::uint16_t core_id, const std::uint64_t /*key*/, const std::int64_t /*value*/) noexcept + { + task_finished(core_id); + } + + /** + * Callback after updated a value. + */ + void updated(const std::uint16_t core_id, const std::uint64_t /*key*/, const std::int64_t /*value*/) noexcept + { + task_finished(core_id); + } + + /** + * Callback after removed a value. + */ + void removed(const std::uint16_t core_id, const std::uint64_t /*key*/) noexcept { task_finished(core_id); } + + /** + * Callback after found a value. + */ + void found(const std::uint16_t core_id, const std::uint64_t /*key*/, const std::int64_t /*value*/) noexcept + { + task_finished(core_id); + } + + /** + * Callback on missing a value. + */ + void missing(const std::uint16_t core_id, const std::uint64_t /*key*/) noexcept { task_finished(core_id); } + + const benchmark::NumericTuple &operator[](const std::size_t index) const noexcept { return _workload[index]; } + +private: + // Number of requests finished by tasks. + mx::util::reference_counter_64 _finished_requests; + + // Number of tasks scheduled by the owning request scheduler. + std::uint64_t _scheduled_requests = 0UL; + + // Local buffer holding not scheduled, but from global worker owned request items. + RequestIndex _local_buffer; + + // Number of requests that can be distributed by this scheduler, + // due to system-wide maximal parallel requests. + const std::uint64_t _max_pending_requests; + + // Workload to get requests from. + benchmark::Workload &_workload; + + /** + * Updates the counter of finished requests. + */ + void task_finished(const std::uint16_t core_id) { _finished_requests.add(core_id); } +}; + +/** + * The RequestScheduler own its own request container and sets up requests for the BLink-Tree. 
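+ * Each execution spawns up to config::batch_size() new request tasks, keeps at most
+ * config::max_parallel_requests() / <number of cores> requests pending per scheduler,
+ * and then re-schedules itself until the workload is exhausted.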
+ */ +class RequestSchedulerTask final : public mx::tasking::TaskInterface +{ +public: + RequestSchedulerTask(const std::uint16_t core_id, const std::uint16_t channel_id, benchmark::Workload &workload, + const mx::util::core_set &core_set, + db::index::blinktree::BLinkTree *tree, Listener *listener) + : _tree(tree), _listener(listener) + { + this->annotate(mx::tasking::priority::low); + this->is_readonly(false); + + const auto container = mx::tasking::runtime::new_resource( + sizeof(RequestContainer), mx::resource::hint{channel_id}, core_id, + config::max_parallel_requests() / core_set.size(), workload); + this->annotate(container, sizeof(RequestContainer)); + } + + ~RequestSchedulerTask() final = default; + + mx::tasking::TaskResult execute(const std::uint16_t core_id, const std::uint16_t channel_id) override + { + // Get some new requests from the container. + auto &request_container = *mx::resource::ptr_cast(this->annotated_resource()); + const auto next_requests = request_container.next(); + + if (next_requests.has_new()) + { + for (auto i = next_requests.index(); i < next_requests.index() + next_requests.count(); ++i) + { + mx::tasking::TaskInterface *task{nullptr}; + const auto &tuple = request_container[i]; + if (tuple == benchmark::NumericTuple::INSERT) + { + task = mx::tasking::runtime::new_task< + db::index::blinktree::InsertValueTask>( + core_id, tuple.key(), tuple.value(), _tree, request_container); + task->is_readonly(_tree->height() > 1U); + } + else if (tuple == benchmark::NumericTuple::LOOKUP) + { + task = mx::tasking::runtime::new_task< + db::index::blinktree::LookupTask>( + core_id, tuple.key(), request_container); + + task->is_readonly(true); + } + else if (tuple == benchmark::NumericTuple::UPDATE) + { + task = mx::tasking::runtime::new_task< + db::index::blinktree::UpdateTask>( + core_id, tuple.key(), tuple.value(), request_container); + task->is_readonly(_tree->height() > 1U); + } + + task->annotate(_tree->root(), db::index::blinktree::config::node_size() / 4U); + mx::tasking::runtime::spawn(*task, channel_id); + } + } + else if (next_requests.is_finished()) + { + // All requests are done. Notify the benchmark and die. + _listener->requests_finished(); + mx::tasking::runtime::delete_resource(this->annotated_resource()); + return mx::tasking::TaskResult::make_remove(); + } + + return mx::tasking::TaskResult::make_succeed(this); + } + +private: + // The tree to send requests to. + db::index::blinktree::BLinkTree *_tree; + + // Benchmark listener to notify on requests are done. 
+ Listener *_listener; +}; +} // namespace application::blinktree_benchmark \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/server.cpp b/repos/ealanos/src/app/blinktree_server/server.cpp new file mode 100644 index 0000000000..efa5d38801 --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/server.cpp @@ -0,0 +1,50 @@ +#include "server.h" +#include "mx/memory/global_heap.h" +#include "network/server.h" +#include +#include +#include +#include +#include +#include + +using namespace application::blinktree_server; + +Server::Server(const std::uint64_t port, mx::util::core_set &&cores, const std::uint16_t prefetch_distance, const mx::synchronization::isolation_level node_isolation_level, const mx::synchronization::protocol preferred_synchronization_method) + : _port(port), _cores(std::move(cores)), _prefetch_distance(prefetch_distance), _node_isolation_level(node_isolation_level), _preferred_synchronization_method(preferred_synchronization_method) +{ +} + +void Server::run() +{ + network::Server* server; + + Libc::Env &env = mx::system::Environment::env(); + mx::tasking::runtime::init(env, this->_cores, this->_prefetch_distance, /* use mx tasking's task allocator*/ false); + + this->_tree = std::make_unique>( + this->_node_isolation_level, this->_preferred_synchronization_method); + + + static mx::memory::dynamic::Allocator *alloc = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator(); + + static Timer::Connection timer{env}; + + static Genode::Heap _alloc{env.ram(), env.rm()}; + + Mxip::mxip_init(*mx::memory::GlobalHeap::_alloc, timer); + server = new network::Server{env, this->_port, mx::tasking::runtime::channels(), timer, _alloc}; + + std::cout << "Waiting for requests on port :" << this->_port << std::endl; + auto network_thread = std::thread{[server, tree = this->_tree.get()]() { + server->listen(tree); + }}; + mx::tasking::runtime::start_and_wait(); + + + + network_thread.join(); + + + //delete server; +} \ No newline at end of file diff --git a/repos/ealanos/src/app/blinktree_server/server.h b/repos/ealanos/src/app/blinktree_server/server.h new file mode 100644 index 0000000000..af0d868cbf --- /dev/null +++ b/repos/ealanos/src/app/blinktree_server/server.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include + +namespace application::blinktree_server { +class Server +{ +public: + Server(std::uint64_t port, mx::util::core_set&& cores, std::uint16_t prefetch_distance, mx::synchronization::isolation_level node_isolation_level, mx::synchronization::protocol preferred_synchronization_method); + + void run(); +private: + const std::uint64_t _port; + + const std::uint16_t _prefetch_distance; + + /// Cores. + mx::util::core_set _cores; + + // The synchronization mechanism to use for tree nodes. + const mx::synchronization::isolation_level _node_isolation_level; + + // Preferred synchronization method. + const mx::synchronization::protocol _preferred_synchronization_method; + + /// Tree. 
+    std::unique_ptr<db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>> _tree;
+};
+}
\ No newline at end of file
diff --git a/repos/ealanos/src/app/blinktree_server/target.mk b/repos/ealanos/src/app/blinktree_server/target.mk
new file mode 100644
index 0000000000..3255569010
--- /dev/null
+++ b/repos/ealanos/src/app/blinktree_server/target.mk
@@ -0,0 +1,37 @@
+MXINC_DIR=$(REP_DIR)/src/app/blinktree_server
+MXINC_DIR+=-I$(REP_DIR)/src/app/blinktree
+GENODE_GCC_TOOLCHAIN_DIR ?= /usr/local/genode/tool/23.05
+MXBENCH_DIR=$(REP_DIR)/src/lib
+
+TARGET = blinktree_daemon
+# source files for the benchmark framework
+SRC_MXBENCH = ${MXBENCH_DIR}/benchmark/workload_set.cpp
+SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/workload.cpp
+SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/cores.cpp
+SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/string_util.cpp
+SRC_MXBENCH += ${MXBENCH_DIR}/benchmark/perf.cpp
+# source files for the blinktree benchmark
+SRC_BTREE += main.cpp
+SRC_BTREE += server.cpp
+SRC_BTREE += network/server.cpp
+
+INC_DIR += /usr/local/genode/tool/lib/clang/14.0.5/include/
+INC_DIR += $(REP_DIR)/src/lib
+INC_DIR += $(REP_DIR)/include
+INC_DIR += $(REP_DIR)/include/ealanos/util
+INC_DIR += $(call select_from_repositories,src/lib/libc)
+INC_DIR += $(call select_from_repositories,src/lib/libc)/spec/x86_64
+vpath %.h ${INC_DIR}
+LD_OPT += --allow-multiple-definition
+
+SRC_CC = ${SRC_MXBENCH} ${SRC_BTREE}
+LIBS += base libc stdcxx mxtasking mxip
+EXT_OBJECTS += /usr/local/genode/tool/lib/libatomic.a /usr/local/genode/tool/23.05/lib/gcc/x86_64-pc-elf/12.3.0/libgcc_eh.a /usr/local/genode/tool/lib/clang/14.0.5/lib/linux/libclang_rt.builtins-x86_64.a
+CUSTOM_CC = /usr/local/genode/tool/bin/clang
+CUSTOM_CXX = /usr/local/genode/tool/bin/clang++
+CC_OPT := --target=x86_64-genode --sysroot=/does/not/exist --gcc-toolchain=$(GENODE_GCC_TOOLCHAIN_DIR) -Wno-error -g -DNDEBUG -I$(MXINC_DIR) -std=c++20 #-D_GLIBCXX_ATOMIC_BUILTINS_8 -D__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
+CC_OPT += -I$(MXBENCH_DIR)
+CC_OLEVEL = -O3
+CC_CXX_WARN_STRICT =
+CUSTOM_CXX_LIB := $(CROSS_DEV_PREFIX)g++
+#CXX_LD += $(CROSS_DEV_PREFIX)g++