Use MxIP for blinktree server.

This commit is contained in:
Michael Mueller
2025-01-21 15:19:02 +01:00
parent 0eacac72e5
commit dd41406bcd
4 changed files with 640 additions and 190 deletions

View File

@@ -1,13 +1,14 @@
#include "server.h"
#include <limits>
#include <mx/tasking/runtime.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <db/index/blinktree/lookup_task.h>
#include <db/index/blinktree/insert_value_task.h>
#include <db/index/blinktree/update_task.h>
#include <mx/system/topology.h>
#include <nova/syscall-generic.h>
#include <nova/syscalls.h>
#include <iostream>
using namespace application::blinktree_server::network;
@@ -49,203 +50,219 @@ mx::tasking::TaskResult RequestTask::execute(const std::uint16_t core_id, const
return mx::tasking::TaskResult::make_null();
}
return mx::tasking::TaskResult::make_succeed(request_task);
return mx::tasking::TaskResult::make_succeed_and_remove(request_task);
}
void ResponseHandler::inserted(const std::uint16_t /*core_id*/, const std::uint64_t key, const std::int64_t /*value*/)
void ResponseHandler::inserted(const std::uint16_t /*core*/, const std::uint64_t key, const std::int64_t /*value*/)
{
_server-> send(_client_id, std::to_string(key));
_server->send(_s, std::to_string(key));
Server::free_handler_task(core_id, static_cast<void *>(this));
}
void ResponseHandler::updated(const std::uint16_t /*core_id*/, const std::uint64_t key, const std::int64_t /*value*/)
{
_server-> send(_client_id, std::to_string(key));
_server-> send(_s, std::to_string(key));
Server::free_handler_task(core_id, static_cast<void *>(this));
}
void ResponseHandler::removed(const std::uint16_t /*core_id*/, const std::uint64_t key)
{
_server-> send(_client_id, std::to_string(key));
_server-> send(_s, std::to_string(key));
Server::free_handler_task(core_id, static_cast<void *>(this));
}
void ResponseHandler::found(const std::uint16_t /*core_id*/, const std::uint64_t /*key*/, const std::int64_t value)
{
_server-> send(_client_id, std::to_string(value));
_server-> send(_s, std::to_string(value));
Server::free_handler_task(core_id, static_cast<void *>(this));
}
void ResponseHandler::missing(const std::uint16_t /*core_id*/, const std::uint64_t key)
{
_server-> send(_client_id, std::to_string(key));
_server-> send(_s, std::to_string(key));
Server::free_handler_task(core_id, static_cast<void *>(this));
}
Server *Server::_myself;
ReceiveTask *Server::_receive_tasks = nullptr;
Server::Server(const std::uint64_t port,
const std::uint16_t count_channels) noexcept
: _port(port), _socket(-1), _client_sockets({0U}),
_count_channels(count_channels)
Server::Server(Libc::Env &env,
const std::uint64_t port,
const std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept
: _port(port), _socket(nullptr), _client_sockets({nullptr}),
_count_channels(count_channels), _env{env}, _config(env, "config"), _alloc(alloc), _timer(timer), _netif(env, _alloc, _config.xml())
{
Server::_myself = this;
this->_buffer.fill('\0');
this->_response_handlers = reinterpret_cast<ResponseHandler*>(std::malloc(sizeof(ResponseHandler) * config::max_connections()));
for (auto client_id = 0U; client_id < config::max_connections(); ++client_id) {
new (&this->_response_handlers[client_id]) ResponseHandler{this, client_id};
}
_receive_tasks = static_cast<ReceiveTask*>(mx::memory::GlobalHeap::allocate_cache_line_aligned(65536 * sizeof(ReceiveTask)));
this->_request_tasks = reinterpret_cast<RequestTask*>(std::malloc( sizeof(RequestTask) * config::max_connections()));
_handler_allocator.reset(new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator());
_task_allocator.reset(new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator());
}
Server::~Server() {
::free(this->_response_handlers);
::free(this->_request_tasks);
}
bool Server::listen(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>* tree)
{
this->_socket = socket(AF_INET, SOCK_STREAM, 0);
if (this->_socket == 0)
{
_socket = Lwip::tcp_new();
if (!_socket) {
Genode::error("Failed to create server socket");
return false;
}
auto option = std::int32_t{1};
if (setsockopt(this->_socket, SOL_SOCKET, SO_REUSEADDR, &option, socklen_t{sizeof(std::int32_t)}) < 0)
{
Lwip::err_t rc = Lwip::tcp_bind(_socket, &Lwip::ip_addr_any, _port);
if (rc != Lwip::ERR_OK) {
Genode::error("Failed to bind server socket to port ", _port);
return false;
}
auto address = sockaddr_in{};
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(this->_port);
_socket = Lwip::tcp_listen_with_backlog(_socket, 64);
Lwip::tcp_accept(_socket, &Server::_handle_tcp_connect);
if (bind(this->_socket, reinterpret_cast<sockaddr *>(&address), sizeof(sockaddr_in)) < 0)
{
return false;
}
if (::listen(this->_socket, 1024) < 0)
{
return false;
}
auto address_length = socklen_t{sizeof(sockaddr_in)};
auto socket_descriptors = fd_set{};
auto max_socket_descriptor = this->_socket;
auto client_socket = std::int32_t{-1};
while (this->_is_running)
{
FD_ZERO(&socket_descriptors); // NOLINT
FD_SET(this->_socket, &socket_descriptors);
for (auto &socket_descriptor : this->_client_sockets)
{
if (socket_descriptor > 0)
{
FD_SET(socket_descriptor, &socket_descriptors);
}
max_socket_descriptor = std::max(max_socket_descriptor, std::int32_t(socket_descriptor));
}
auto timeout = timeval{};
timeout.tv_usec = 10000;
const auto count_ready_selectors =
select(max_socket_descriptor + 1, &socket_descriptors, nullptr, nullptr, &timeout);
if (count_ready_selectors > 0)
{
if (FD_ISSET(this->_socket, &socket_descriptors))
{
if ((client_socket = accept(this->_socket, reinterpret_cast<sockaddr *>(&address), &address_length)) <
0)
{
return false;
}
this->add_client(client_socket);
}
for (auto i = 0U; i < this->_client_sockets.size(); ++i)
{
const auto client = this->_client_sockets[i];
if (FD_ISSET(client, &socket_descriptors))
{
const auto read_bytes = read(client, this->_buffer.data(), this->_buffer.size());
if (read_bytes == 0U)
{
::close(client);
this->_client_sockets[i] = 0U;
}
else
{
// Copy incoming data locally.
RequestTask::Type request_type;
auto message = std::string(this->_buffer.data(), read_bytes);
if (message[0] == 'D')
{
auto *request_task = new (&this->_request_tasks[i]) RequestTask{tree, this->_response_handlers[i]};
request_task->annotate(std::uint16_t(0U));
mx::tasking::runtime::spawn(*request_task);
}
else
{
switch(message[0])
{
case 'I': request_type = RequestTask::Type::Insert; break;
case 'U': request_type = RequestTask::Type::Update; break;
default: request_type = RequestTask::Type::Lookup;
}
auto key = 0ULL;
auto index = 2U; // Skip request type and comma.
while(message[index] >= '0' && message[index] <= '9') {
key = key * 10 + (message[index++] - '0');
}
auto channel_id = std::uint16_t(this->_next_worker_id.fetch_add(1U) % this->_count_channels);
if (request_type == RequestTask::Type::Insert || request_type == RequestTask::Type::Lookup)
{
auto value = 0LL;
++index;
while (message[index] >= '0' && message[index] <= '9')
{
value = value * 10 + (message[index++] - '0');
}
auto *request_task = new (&this->_request_tasks[i]) RequestTask{tree, request_type, key, value, this->_response_handlers[i]};
request_task->annotate(channel_id);
mx::tasking::runtime::spawn(*request_task);
}
else
{
auto *request_task = new (&this->_request_tasks[i]) RequestTask{tree, RequestTask::Type::Lookup, key, this->_response_handlers[i]};
request_task->annotate(channel_id);
mx::tasking::runtime::spawn(*request_task);
}
//mx::tasking::runtime::scheduler().allocate_cores(64);
}
}
}
}
}
}
for (const auto client : this->_client_sockets)
{
if (client > 0)
{
::close(client);
}
}
::close(this->_socket);
this->_tree = tree;
return true;
}
void Server::send(const std::uint32_t client_id, std::string &&message)
void Server::parse(struct Server::state *s, std::string &message)
{
RequestTask::Type request_type;
std::uint64_t i = s->id;
if (message[0] == 'D')
{
auto response_handler = new (_handler_allocator->allocate(0, 64, sizeof(ResponseHandler))) ResponseHandler(this, s, 0);
//auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, *response_handler};
auto *request_task = mx::tasking::runtime::new_task<RequestTask>(0, this->_tree, *response_handler);
request_task->annotate(std::uint16_t(0U));
mx::tasking::runtime::spawn(*request_task);
}
else
{
switch (message[0])
{
case 'I':
request_type = RequestTask::Type::Insert;
break;
case 'U':
request_type = RequestTask::Type::Update;
break;
default:
request_type = RequestTask::Type::Lookup;
}
auto key = 0ULL;
auto index = 2U; // Skip request type and comma.
while (message[index] >= '0' && message[index] <= '9')
{
key = key * 10 + (message[index++] - '0');
}
auto channel_id = std::uint16_t(this->_next_worker_id.fetch_add(1U) % this->_count_channels);
if (request_type == RequestTask::Type::Insert || request_type == RequestTask::Type::Lookup)
{
auto value = 0LL;
++index;
while (message[index] >= '0' && message[index] <= '9')
{
value = value * 10 + (message[index++] - '0');
}
auto response_handler = new (_handler_allocator->allocate(mx::system::topology::node_id(channel_id), 64, sizeof(ResponseHandler))) ResponseHandler(this, s, channel_id);
auto *request_task = mx::tasking::runtime::new_task<RequestTask>(channel_id, this->_tree, request_type, key, value, *response_handler);
request_task->annotate(channel_id);
mx::tasking::runtime::spawn(*request_task);
}
else
{
//auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, RequestTask::Type::Lookup, key, this->_response_handlers[i]};
auto response_handler = new (_handler_allocator->allocate(mx::system::topology::node_id(channel_id), 64, sizeof(ResponseHandler))) ResponseHandler(this, s, channel_id);
auto *request_task = mx::tasking::runtime::new_task<RequestTask>(channel_id, this->_tree, request_type, key, *response_handler);
request_task->annotate(channel_id);
mx::tasking::runtime::spawn(*request_task);
}
mx::tasking::runtime::scheduler().allocate_cores(64);
}
}
class Send_task : public mx::tasking::TaskInterface
{
private:
struct Server::state *_s;
std::string _message;
public:
Send_task(Server::state *s, std::string message) : _s(s), _message(message) {}
mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) override
{
using namespace Lwip;
Lwip::pbuf *ptr = nullptr;
if (_s->state == Server::CLOSED || _s->state == Server::CLOSING) {
Genode::warning("Tried to send over socket that is to be closed");
Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_null();
}
ptr = Lwip::pbuf_alloc(Lwip::PBUF_TRANSPORT, _message.length(), Lwip::PBUF_RAM);
if (!(_s->pcb) || !_s) {
Genode::error("Tried sending over invalid pcb");
Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_null();
}
if (!ptr)
{
Genode::error("No memory for sending packet.");
Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_null();
}
if (ptr >= reinterpret_cast<void *>(0x7FFF80000000UL) || _s->pcb >= reinterpret_cast<void *>(0x7FFF80000000UL))
{
Genode::error("Allocated buffer or pcb is at non-canonical address. Aborting. ptr=", static_cast<void*>(ptr), " pcb=", static_cast<void*>(_s->pcb), " s=", static_cast<void*>(_s));
Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_null();
}
ptr->payload = static_cast<void *>(const_cast<char *>(_message.c_str()));
ptr->len = _message.length();
if (ptr->len > tcp_sndbuf(_s->pcb))
Genode::warning("Not enough space in send buffer");
Lwip::err_t rc = Lwip::ERR_OK;
{
rc = Lwip::tcp_write(_s->pcb, ptr->payload, ptr->len, TCP_WRITE_FLAG_COPY);
}
if (rc == Lwip::ERR_OK)
{
Lwip::tcp_output(_s->pcb);
Lwip::pbuf_free(ptr);
} else {
if (_s->tx == nullptr)
_s->tx = ptr;
else {
Lwip::pbuf_cat(_s->tx, ptr);
}
}
Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_null();
}
};
void
Server::send(struct state *s, std::string &&message)
{
const auto length = std::uint64_t(message.size());
auto response = std::string(length + sizeof(length), '\0');
@@ -255,10 +272,12 @@ void Server::send(const std::uint32_t client_id, std::string &&message)
// Write data
std::memmove(response.data() + sizeof(length), message.data(), length);
::send(this->_client_sockets[client_id], response.c_str(), response.length(), 0);
auto task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(Send_task))) Send_task(s, response);
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
mx::tasking::runtime::spawn(*task);
}
std::uint16_t Server::add_client(const std::int32_t client_socket)
std::uint16_t Server::add_client(Lwip::tcp_pcb* client_socket)
{
for (auto i = 0U; i < this->_client_sockets.size(); ++i)
{
@@ -275,4 +294,222 @@ std::uint16_t Server::add_client(const std::int32_t client_socket)
void Server::stop() noexcept
{
this->_is_running = false;
}
class Close_task : public mx::tasking::TaskInterface
{
private:
Server::state &_s;
public:
Close_task(Server::state &s) : _s(s) {}
mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t)
{
Genode::log("Closing connection for ", static_cast<void *>(_s.pcb) , " and state object ", static_cast<void*>(&_s));
Server::tcpbtree_close(_s.pcb, &_s);
_s.state = Server::CLOSED;
Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_null();
}
};
/***********
* LWIP callback function definitions
***********/
Lwip::err_t Server::_handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err)
{
struct state *s;
static uint64_t count_connections = 0;
LWIP_UNUSED_ARG(arg);
if ((err != Lwip::ERR_OK) || (newpcb == NULL)) {
return Lwip::ERR_VAL;
}
//Genode::log("Incoming request");
s = new (Lwip::mem_malloc(sizeof(struct state))) state(); // static_cast<struct state *>(Lwip::mem_malloc(sizeof(struct state)));
if (!s) {
Genode::error("Failed to allocate state object for new connection.");
return Lwip::ERR_MEM;
}
//Genode::log("New connection #", count_connections, ": arg=", arg, " pcb=", newpcb, " s=", s, " &s=", static_cast<void*>(&s));
s->state = states::ACCEPTED;
s->pcb = newpcb;
s->retries = 0;
s->p = nullptr;
s->tx = nullptr;
s->channel_id = 0; //count_connections % Server::get_instance()->_count_channels;
Lwip::tcp_backlog_accepted(newpcb);
/* Register callback functions */
Lwip::tcp_arg(newpcb, s);
Lwip::tcp_recv(newpcb, &Server::_handle_tcp_recv);
Lwip::tcp_err(newpcb, &Server::_handle_tcp_error);
Lwip::tcp_poll(newpcb, &Server::_handle_tcp_poll, 50);
Lwip::tcp_sent(newpcb, &Server::_handle_tcp_sent);
newpcb->flags |= TF_NODELAY;
return Lwip::ERR_OK;
}
Lwip::err_t Server::_handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err)
{
static std::uint16_t next_receive_task = 0;
struct state *s;
Lwip::err_t rc = Lwip::ERR_OK;
std::uint16_t next_channel_id = 0;
s = static_cast<struct state*>(arg);
if (err != Lwip::ERR_OK) {
return err;
}
if (p == nullptr) {
s->state = CLOSING;
auto task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(Close_task))) Close_task(*s);
if (!task) {
Genode::warning("Failed to allocate close task");
return Lwip::ERR_MEM;
}
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
mx::tasking::runtime::spawn(*task);
Lwip::pbuf_free(p);
rc = Lwip::ERR_OK;
} else if (err != Lwip::ERR_OK) {
rc = err;
} else if (s->state == states::ACCEPTED) {
s->state == states::RECEIVED;
// TODO: parse message and spawn request task here
rc = Lwip::ERR_OK;
{
ReceiveTask *task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(ReceiveTask))) ReceiveTask(s, p);
if (!task) {
Genode::warning("Could not allocate request handler task");
return Lwip::ERR_MEM;
}
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
mx::tasking::runtime::spawn(*task);
}
Lwip::tcp_recved(s->pcb, p->len);
//Server::get_instance()->send(s, "Nope");
}
else if (s->state == states::RECEIVED)
{
ReceiveTask *task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(ReceiveTask))) ReceiveTask(s, p);
if (!task) {
Genode::warning("Could not allocate request handler task");
return Lwip::ERR_MEM;
}
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
mx::tasking::runtime::spawn(*task);
Lwip::tcp_recved(s->pcb, p->len);
//Server::get_instance()->send(s, "Nope");
rc = Lwip::ERR_OK;
}
else
{
Lwip::tcp_recved(tpcb, p->tot_len);
Lwip::pbuf_free(p);
rc = Lwip::ERR_OK;
}
return rc;
}
Lwip::err_t Server::_handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb)
{
Lwip::err_t rc;
struct state *s;
//GENODE_LOG_TSC(1);
s = static_cast<struct state *>(arg);
if (s) {
if (s->tx) {
rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1);
if (rc == Lwip::ERR_OK) {
Lwip::tcp_output(tpcb);
Lwip::pbuf *ptr = s->tx;
if (ptr->next) {
s->tx = ptr->next;
Lwip::pbuf_ref(s->tx);
}
Lwip::tcp_recved(tpcb, ptr->len);
Lwip::pbuf_free(ptr);
}
// TODO: process remaning pbuf entry
} else {
/*if (s->state == states::CLOSING) {
Server::tcpbtree_close(tpcb, s);
}*/
}
rc = Lwip::ERR_OK;
} else {
Lwip::tcp_abort(tpcb);
rc = Lwip::ERR_ABRT;
}
return Lwip::ERR_OK;
}
Lwip::err_t Server::_handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len)
{
//GENODE_LOG_TSC(1);
struct state *s = static_cast<struct state *>(arg);
s->retries = 0;
if (s->tx) {
Lwip::err_t rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1);
if (rc == Lwip::ERR_OK) {
Lwip::tcp_output(tpcb);
Lwip::pbuf *ptr = s->tx;
if (ptr->next) {
s->tx = ptr->next;
Lwip::pbuf_ref(s->tx);
}
Lwip::tcp_recved(tpcb, ptr->len);
Lwip::pbuf_free(ptr);
}
tcp_sent(tpcb, &Server::_handle_tcp_sent); // Genode::log("In _handle_tcp_sent");
}
return Lwip::ERR_OK;
}
mx::tasking::TaskResult application::blinktree_server::network::ReceiveTask::execute(std::uint16_t core_id, std::uint16_t channel_id)
{
Lwip::err_t rc = Lwip::ERR_OK;
/*rc = Lwip::tcp_write(_state->pcb, _pbuf->payload, _pbuf->len, 3);
Lwip::tcp_output(_state->pcb);
if (rc == Lwip::ERR_OK) {
Lwip::tcp_recved(_state->pcb, _pbuf->tot_len);
Lwip::pbuf_free(_pbuf);
} else if (rc == Lwip::ERR_MEM) {
Genode::warning("Out of memory");
}*/
//Genode::log("Executing application task");
//Server::get_instance()->send(_state, "Nope");
// Server::tcp_send(_state->pcb, _state);
std::string request = std::string(static_cast<char*>(_pbuf->payload), _pbuf->len);
Server::get_instance()->parse(_state, request);
Lwip::pbuf_free(_pbuf);
Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_null();
}

View File

@@ -5,22 +5,225 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mx/memory/fixed_size_allocator.h>
#include <mx/tasking/config.h>
#include <mx/tasking/scheduler.h>
#include <mx/util/core_set.h>
#include <optional>
#include <string>
/* B-link tree includes */
#include <db/index/blinktree/b_link_tree.h>
#include <db/index/blinktree/listener.h>
/* lwIP wrapper for Genode's NIC session */
#include <mxip/mxnic_netif.h>
#include <mxip/genode_init.h>
#include <libc/component.h>
/* Genode includes */
#include <timer_session/connection.h>
#include <base/heap.h>
#include <base/attached_rom_dataspace.h>
/* MxTasking includes*/
#include <mx/memory/fixed_size_allocator.h>
#include <mx/memory/dynamic_size_allocator.h>
#include <mx/tasking/config.h>
#include <mx/tasking/scheduler.h>
#include <mx/util/core_set.h>
/* lwIP includes */
namespace Lwip {
extern "C" {
#include <lwip/opt.h>
#include <lwip/tcp.h>
#include <lwip/ip_addr.h>
}
}
namespace application::blinktree_server::network {
class Server;
class ResponseHandler;
class RequestTask;
class ReceiveTask;
class Server
{
public:
enum states
{
NONE = 0,
ACCEPTED,
RECEIVED,
CLOSING,
CLOSED
};
struct state
{
std::uint8_t state;
std::uint8_t retries;
struct Lwip::tcp_pcb *pcb;
struct Lwip::pbuf *p;
struct Lwip::pbuf *tx;
std::uint16_t channel_id;
std::uint64_t id;
};
Server(Libc::Env &env, std::uint64_t port,
std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept;
~Server();
[[nodiscard]] std::uint16_t port() const noexcept { return _port; }
void stop() noexcept;
void send(struct Server::state *s, std::string &&message);
bool listen(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t> *tree);
void parse(struct Server::state *s, std::string &message);
[[nodiscard]] bool is_running() const noexcept { return _is_running; }
static void tcp_send(struct Lwip::tcp_pcb *tpcb, struct state *s)
{
using namespace Lwip;
struct Lwip::pbuf *ptr;
Lwip::err_t rc = Lwip::ERR_OK;
while ((rc == Lwip::ERR_OK) && (s->tx != nullptr) /* && (s->tx->len <= tcp_sndbuf(tpcb) */)
{
ptr = s->tx;
// Genode::log("Sending response");
rc = Lwip::tcp_write(tpcb, ptr->payload, ptr->len, 1);
if (rc == Lwip::ERR_OK)
{
std::uint16_t plen;
plen = ptr->len;
s->tx = ptr->next;
if (s->tx != nullptr)
{
Lwip::pbuf_ref(s->tx);
}
Lwip::tcp_output(tpcb);
Lwip::pbuf_free(ptr);
}
else if (rc == Lwip::ERR_MEM)
{
Genode::warning("Low on memory. Defering to poll()");
s->tx = ptr;
}
else
{
Genode::warning("An error ", static_cast<unsigned>(rc), " occured.");
}
}
}
static void tcpbtree_close(struct Lwip::tcp_pcb *tpcb, struct state *s)
{
if (s->pcb != tpcb) {
Genode::error("Tried closing connection with invalid session state");
return;
}
Lwip::tcp_arg(tpcb, NULL);
Lwip::tcp_sent(tpcb, NULL);
Lwip::tcp_recv(tpcb, NULL);
Lwip::tcp_poll(tpcb, NULL, 0);
Lwip::tcp_err(tpcb, nullptr);
Server::tcp_free(s);
Lwip::tcp_close(tpcb);
}
/* tcp_recv */
static Lwip::err_t _handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err);
/* tcp_err */
static void _handle_tcp_error(void *arg, Lwip::err_t err)
{
struct state *s;
LWIP_UNUSED_ARG(err);
s = static_cast<state *>(arg);
Server::tcp_free(s);
}
/* tcp_poll */
static Lwip::err_t _handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb);
/* tcp_sent */
static Lwip::err_t _handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len);
/* helper function for free */
static void tcp_free(struct state *s)
{
// Genode::log("Freeing state obj s=", s);
if (s)
{
if (s->p)
Lwip::pbuf_free(s->p);
if (s->tx)
Lwip::pbuf_free(s->tx);
delete s; // Lwip::mem_free(s);
}
}
static Server *get_instance() { return _myself; }
static void free_handler_task(std::uint16_t core_id, void* task)
{
Server::get_instance()->_handler_allocator->free(task);
}
static void free_task(void* task)
{
Server::get_instance()->_task_allocator->free(task);
}
private:
static Server *_myself;
const std::uint64_t _port;
struct Lwip::tcp_pcb *_socket;
Libc::Env &_env;
std::array<struct Lwip::tcp_pcb *, config::max_connections()> _client_sockets;
std::array<char, 2048U> _buffer;
static ReceiveTask *_receive_tasks;
alignas(64) bool _is_running = true;
alignas(64) std::atomic_uint64_t _next_worker_id{0U};
const std::uint16_t _count_channels;
std::uint16_t add_client(Lwip::tcp_pcb *client_socket);
/* Genode environment for NIC session */
Genode::Attached_rom_dataspace _config;
Genode::Heap &_alloc;
Timer::Connection &_timer;
/* lwIP network device (NIC session wrapper) */
Lwip::Nic_netif _netif;
db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t> *_tree{nullptr};
std::unique_ptr<mx::memory::dynamic::Allocator> _handler_allocator{nullptr};
std::unique_ptr<mx::memory::dynamic::Allocator> _task_allocator{nullptr};
/************************************************
* lwIP callback API: TCP callback functions
************************************************/
/* tcp_accept */
static Lwip::err_t
_handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err);
/* helper function for close() */
};
class alignas(64) ResponseHandler final : public db::index::blinktree::Listener<std::uint64_t, std::int64_t>
{
public:
ResponseHandler(Server* server, const std::uint32_t client_id) : _server(server), _client_id(client_id) { }
ResponseHandler(Server* server, Server::state *s, std::uint16_t _core_id) : _server(server), _s(s), core_id(_core_id) { }
ResponseHandler(ResponseHandler&&) noexcept = default;
~ResponseHandler() = default;
@@ -32,7 +235,8 @@ public:
private:
Server* _server;
std::uint32_t _client_id;
Server::state *_s;
std::uint16_t core_id{0};
};
class alignas(64) RequestTask final : public mx::tasking::TaskInterface
@@ -58,33 +262,26 @@ private:
ResponseHandler& _response_handler;
};
class Server
class alignas(64) ReceiveTask final : public mx::tasking::TaskInterface
{
public:
Server(std::uint64_t port,
std::uint16_t count_channels) noexcept;
~Server();
public:
ReceiveTask(Server::state *state, Lwip::pbuf *pb) : _state(state), _pbuf(pb) {}
[[nodiscard]] std::uint16_t port() const noexcept { return _port; }
void stop() noexcept;
void send(std::uint32_t client_id, std::string &&message);
bool listen(db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>* tree);
mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override;
[[nodiscard]] bool is_running() const noexcept { return _is_running; }
private:
Server::state *_state;
Lwip::pbuf *_pbuf;
};
private:
const std::uint64_t _port;
std::int32_t _socket;
std::array<std::uint32_t, config::max_connections()> _client_sockets;
std::array<char, 2048U> _buffer;
class alignas(64) AcceptTask final : public mx::tasking::TaskInterface
{
public:
AcceptTask(Lwip::tcp_pcb *newpcb) : _pcb(newpcb) {}
ResponseHandler* _response_handlers;
RequestTask *_request_tasks;
mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override;
alignas(64) bool _is_running = true;
alignas(64) std::atomic_uint64_t _next_worker_id{0U};
const std::uint16_t _count_channels;
std::uint16_t add_client(std::int32_t client_socket);
private:
Lwip::tcp_pcb *_pcb;
};
} // namespace mx::io::network

View File

@@ -1,6 +1,9 @@
#include "server.h"
#include "network/server.h"
#include <iostream>
#include <mx/system/environment.h>
#include <base/heap.h>
#include <timer_session/connection.h>
using namespace application::blinktree_server;
@@ -18,7 +21,16 @@ void Server::run()
this->_tree = std::make_unique<db::index::blinktree::BLinkTree<std::uint64_t, std::int64_t>>(
this->_node_isolation_level, this->_preferred_synchronization_method);
server = new network::Server{this->_port, mx::tasking::runtime::channels()};
Libc::Env &env = mx::system::Environment::env();
static mx::memory::dynamic::Allocator *alloc = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator();
static Timer::Connection timer{env};
static Genode::Heap _alloc{env.ram(), env.rm()};
Mxip::mxip_init(*alloc, timer);
server = new network::Server{env, this->_port, mx::tasking::runtime::channels(), timer, _alloc};
std::cout << "Waiting for requests on port :" << this->_port << std::endl;
auto network_thread = std::thread{[server, tree = this->_tree.get()]() {
@@ -26,7 +38,10 @@ void Server::run()
}};
mx::tasking::runtime::start_and_wait();
//network_thread.join();
delete server;
network_thread.join();
//delete server;
}

View File

@@ -16,12 +16,13 @@ SRC_BTREE += server.cpp
SRC_BTREE += network/server.cpp
SRC_CC = ${SRC_MXBENCH} ${SRC_BTREE}
LIBS += base libc stdcxx mxtasking
LIBS += base libc stdcxx mxtasking mxip
EXT_OBJECTS += /usr/local/genode/tool/lib/clang/14.0.5/lib/linux/libclang_rt.builtins-x86_64.a /usr/local/genode/tool/lib/libatomic.a
CUSTOM_CC = /usr/local/genode/tool/bin/clang
CUSTOM_CXX = /usr/local/genode/tool/bin/clang++
CC_OPT += --target=x86_64-genode --sysroot=/does/not/exist --gcc-toolchain=$(GENODE_GCC_TOOLCHAIN_DIR) -Wno-error -O2 -g -fno-aligned-new -DNDEBUG -I$(MXINC_DIR) -std=c++20 #-D_GLIBCXX_ATOMIC_BUILTINS_8 -D__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
CC_OPT += -femulated-tls -DCLANG_CXX11_ATOMICS
CC_OPT := --target=x86_64-genode --sysroot=/does/not/exist --gcc-toolchain=$(GENODE_GCC_TOOLCHAIN_DIR) -Wno-error -g -DNDEBUG -I$(MXINC_DIR) -std=c++20 #-D_GLIBCXX_ATOMIC_BUILTINS_8 -D__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
CC_OPT += -I$(MXBENCH_DIR)
CC_OLEVEL = -O3
CC_CXX_WARN_STRICT =
CUSTOM_CXX_LIB := $(CROSS_DEV_PREFIX)g++
#CXX_LD += $(CROSS_DEV_PREFIX)g++