diff --git a/repos/mml/src/app/blinktree_server/network/server.cpp b/repos/mml/src/app/blinktree_server/network/server.cpp index 40734f83ed..e2ff91ccbe 100644 --- a/repos/mml/src/app/blinktree_server/network/server.cpp +++ b/repos/mml/src/app/blinktree_server/network/server.cpp @@ -1,13 +1,14 @@ #include "server.h" #include #include -#include -#include #include #include #include #include #include +#include +#include +#include using namespace application::blinktree_server::network; @@ -49,203 +50,219 @@ mx::tasking::TaskResult RequestTask::execute(const std::uint16_t core_id, const return mx::tasking::TaskResult::make_null(); } - return mx::tasking::TaskResult::make_succeed(request_task); + return mx::tasking::TaskResult::make_succeed_and_remove(request_task); } -void ResponseHandler::inserted(const std::uint16_t /*core_id*/, const std::uint64_t key, const std::int64_t /*value*/) +void ResponseHandler::inserted(const std::uint16_t /*core*/, const std::uint64_t key, const std::int64_t /*value*/) { - _server-> send(_client_id, std::to_string(key)); + _server->send(_s, std::to_string(key)); + Server::free_handler_task(core_id, static_cast(this)); } void ResponseHandler::updated(const std::uint16_t /*core_id*/, const std::uint64_t key, const std::int64_t /*value*/) { - _server-> send(_client_id, std::to_string(key)); + _server-> send(_s, std::to_string(key)); + Server::free_handler_task(core_id, static_cast(this)); } void ResponseHandler::removed(const std::uint16_t /*core_id*/, const std::uint64_t key) { - _server-> send(_client_id, std::to_string(key)); + _server-> send(_s, std::to_string(key)); + Server::free_handler_task(core_id, static_cast(this)); } void ResponseHandler::found(const std::uint16_t /*core_id*/, const std::uint64_t /*key*/, const std::int64_t value) { - _server-> send(_client_id, std::to_string(value)); + _server-> send(_s, std::to_string(value)); + Server::free_handler_task(core_id, static_cast(this)); } void ResponseHandler::missing(const std::uint16_t /*core_id*/, const std::uint64_t key) { - _server-> send(_client_id, std::to_string(key)); + _server-> send(_s, std::to_string(key)); + Server::free_handler_task(core_id, static_cast(this)); } +Server *Server::_myself; +ReceiveTask *Server::_receive_tasks = nullptr; - -Server::Server(const std::uint64_t port, - const std::uint16_t count_channels) noexcept - : _port(port), _socket(-1), _client_sockets({0U}), - _count_channels(count_channels) +Server::Server(Libc::Env &env, + const std::uint64_t port, + const std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept + : _port(port), _socket(nullptr), _client_sockets({nullptr}), + _count_channels(count_channels), _env{env}, _config(env, "config"), _alloc(alloc), _timer(timer), _netif(env, _alloc, _config.xml()) { + Server::_myself = this; this->_buffer.fill('\0'); - this->_response_handlers = reinterpret_cast(std::malloc(sizeof(ResponseHandler) * config::max_connections())); - for (auto client_id = 0U; client_id < config::max_connections(); ++client_id) { - new (&this->_response_handlers[client_id]) ResponseHandler{this, client_id}; - } + _receive_tasks = static_cast(mx::memory::GlobalHeap::allocate_cache_line_aligned(65536 * sizeof(ReceiveTask))); - this->_request_tasks = reinterpret_cast(std::malloc( sizeof(RequestTask) * config::max_connections())); + _handler_allocator.reset(new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator()); + + _task_allocator.reset(new 
(mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator()); } Server::~Server() { - ::free(this->_response_handlers); - ::free(this->_request_tasks); } bool Server::listen(db::index::blinktree::BLinkTree* tree) { - this->_socket = socket(AF_INET, SOCK_STREAM, 0); - if (this->_socket == 0) - { + _socket = Lwip::tcp_new(); + + if (!_socket) { + Genode::error("Failed to create server socket"); return false; } - auto option = std::int32_t{1}; - if (setsockopt(this->_socket, SOL_SOCKET, SO_REUSEADDR, &option, socklen_t{sizeof(std::int32_t)}) < 0) - { + Lwip::err_t rc = Lwip::tcp_bind(_socket, &Lwip::ip_addr_any, _port); + if (rc != Lwip::ERR_OK) { + Genode::error("Failed to bind server socket to port ", _port); return false; } - auto address = sockaddr_in{}; - address.sin_family = AF_INET; - address.sin_addr.s_addr = INADDR_ANY; - address.sin_port = htons(this->_port); + _socket = Lwip::tcp_listen_with_backlog(_socket, 64); + Lwip::tcp_accept(_socket, &Server::_handle_tcp_connect); - if (bind(this->_socket, reinterpret_cast(&address), sizeof(sockaddr_in)) < 0) - { - return false; - } - - if (::listen(this->_socket, 1024) < 0) - { - return false; - } - - auto address_length = socklen_t{sizeof(sockaddr_in)}; - auto socket_descriptors = fd_set{}; - auto max_socket_descriptor = this->_socket; - auto client_socket = std::int32_t{-1}; - - while (this->_is_running) - { - FD_ZERO(&socket_descriptors); // NOLINT - FD_SET(this->_socket, &socket_descriptors); - - for (auto &socket_descriptor : this->_client_sockets) - { - if (socket_descriptor > 0) - { - FD_SET(socket_descriptor, &socket_descriptors); - } - - max_socket_descriptor = std::max(max_socket_descriptor, std::int32_t(socket_descriptor)); - } - - auto timeout = timeval{}; - timeout.tv_usec = 10000; - const auto count_ready_selectors = - select(max_socket_descriptor + 1, &socket_descriptors, nullptr, nullptr, &timeout); - - if (count_ready_selectors > 0) - { - if (FD_ISSET(this->_socket, &socket_descriptors)) - { - if ((client_socket = accept(this->_socket, reinterpret_cast(&address), &address_length)) < - 0) - { - return false; - } - this->add_client(client_socket); - } - - for (auto i = 0U; i < this->_client_sockets.size(); ++i) - { - const auto client = this->_client_sockets[i]; - if (FD_ISSET(client, &socket_descriptors)) - { - const auto read_bytes = read(client, this->_buffer.data(), this->_buffer.size()); - if (read_bytes == 0U) - { - ::close(client); - this->_client_sockets[i] = 0U; - } - else - { - // Copy incoming data locally. - RequestTask::Type request_type; - auto message = std::string(this->_buffer.data(), read_bytes); - - if (message[0] == 'D') - { - auto *request_task = new (&this->_request_tasks[i]) RequestTask{tree, this->_response_handlers[i]}; - request_task->annotate(std::uint16_t(0U)); - mx::tasking::runtime::spawn(*request_task); - } - else - { - switch(message[0]) - { - case 'I': request_type = RequestTask::Type::Insert; break; - case 'U': request_type = RequestTask::Type::Update; break; - default: request_type = RequestTask::Type::Lookup; - } - - auto key = 0ULL; - auto index = 2U; // Skip request type and comma. 
- while(message[index] >= '0' && message[index] <= '9') { - key = key * 10 + (message[index++] - '0'); - } - - auto channel_id = std::uint16_t(this->_next_worker_id.fetch_add(1U) % this->_count_channels); - if (request_type == RequestTask::Type::Insert || request_type == RequestTask::Type::Lookup) - { - auto value = 0LL; - ++index; - while (message[index] >= '0' && message[index] <= '9') - { - value = value * 10 + (message[index++] - '0'); - } - - auto *request_task = new (&this->_request_tasks[i]) RequestTask{tree, request_type, key, value, this->_response_handlers[i]}; - request_task->annotate(channel_id); - mx::tasking::runtime::spawn(*request_task); - } - else - { - auto *request_task = new (&this->_request_tasks[i]) RequestTask{tree, RequestTask::Type::Lookup, key, this->_response_handlers[i]}; - request_task->annotate(channel_id); - mx::tasking::runtime::spawn(*request_task); - } - //mx::tasking::runtime::scheduler().allocate_cores(64); - } - } - } - } - } - } - - for (const auto client : this->_client_sockets) - { - if (client > 0) - { - ::close(client); - } - } - ::close(this->_socket); + this->_tree = tree; return true; } -void Server::send(const std::uint32_t client_id, std::string &&message) +void Server::parse(struct Server::state *s, std::string &message) { + RequestTask::Type request_type; + + std::uint64_t i = s->id; + + if (message[0] == 'D') + { + auto response_handler = new (_handler_allocator->allocate(0, 64, sizeof(ResponseHandler))) ResponseHandler(this, s, 0); + //auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, *response_handler}; + auto *request_task = mx::tasking::runtime::new_task(0, this->_tree, *response_handler); + request_task->annotate(std::uint16_t(0U)); + mx::tasking::runtime::spawn(*request_task); + } + else + { + switch (message[0]) + { + case 'I': + request_type = RequestTask::Type::Insert; + break; + case 'U': + request_type = RequestTask::Type::Update; + break; + default: + request_type = RequestTask::Type::Lookup; + } + + auto key = 0ULL; + auto index = 2U; // Skip request type and comma. 
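+                // Assumed ASCII wire format, inferred from this parser (it is not documented in the patch):
+                //   "<op>,<key>[,<value>]"  --  e.g. "I,42,7" for an insert, "U,<key>,<value>" for an update,
+                //   any other op byte is treated as a lookup; a leading 'D' was already dispatched above as a
+                //   key-less RequestTask whose semantics are defined by RequestTask itself.
+                // 'index' starts at 2: one byte for the op code plus the separating comma; the digit loops
+                // below decode <key> (and, where present, <value>) without any overflow checks.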
+ while (message[index] >= '0' && message[index] <= '9') + { + key = key * 10 + (message[index++] - '0'); + } + + auto channel_id = std::uint16_t(this->_next_worker_id.fetch_add(1U) % this->_count_channels); + if (request_type == RequestTask::Type::Insert || request_type == RequestTask::Type::Lookup) + { + auto value = 0LL; + ++index; + while (message[index] >= '0' && message[index] <= '9') + { + value = value * 10 + (message[index++] - '0'); + } + + auto response_handler = new (_handler_allocator->allocate(mx::system::topology::node_id(channel_id), 64, sizeof(ResponseHandler))) ResponseHandler(this, s, channel_id); + auto *request_task = mx::tasking::runtime::new_task(channel_id, this->_tree, request_type, key, value, *response_handler); + request_task->annotate(channel_id); + mx::tasking::runtime::spawn(*request_task); + } + else + { + //auto *request_task = new (&this->_request_tasks[i]) RequestTask{this->_tree, RequestTask::Type::Lookup, key, this->_response_handlers[i]}; + auto response_handler = new (_handler_allocator->allocate(mx::system::topology::node_id(channel_id), 64, sizeof(ResponseHandler))) ResponseHandler(this, s, channel_id); + auto *request_task = mx::tasking::runtime::new_task(channel_id, this->_tree, request_type, key, *response_handler); + request_task->annotate(channel_id); + mx::tasking::runtime::spawn(*request_task); + } + mx::tasking::runtime::scheduler().allocate_cores(64); + } +} + +class Send_task : public mx::tasking::TaskInterface +{ + private: + struct Server::state *_s; + std::string _message; + + public: + Send_task(Server::state *s, std::string message) : _s(s), _message(message) {} + + mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) override + { + using namespace Lwip; + Lwip::pbuf *ptr = nullptr; + + if (_s->state == Server::CLOSED || _s->state == Server::CLOSING) { + Genode::warning("Tried to send over socket that is to be closed"); + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); + } + + ptr = Lwip::pbuf_alloc(Lwip::PBUF_TRANSPORT, _message.length(), Lwip::PBUF_RAM); + + if (!(_s->pcb) || !_s) { + Genode::error("Tried sending over invalid pcb"); + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); + } + + if (!ptr) + { + Genode::error("No memory for sending packet."); + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); + } + + if (ptr >= reinterpret_cast(0x7FFF80000000UL) || _s->pcb >= reinterpret_cast(0x7FFF80000000UL)) + { + Genode::error("Allocated buffer or pcb is at non-canonical address. Aborting. 
ptr=", static_cast(ptr), " pcb=", static_cast(_s->pcb), " s=", static_cast(_s)); + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); + } + + ptr->payload = static_cast(const_cast(_message.c_str())); + ptr->len = _message.length(); + + if (ptr->len > tcp_sndbuf(_s->pcb)) + Genode::warning("Not enough space in send buffer"); + + Lwip::err_t rc = Lwip::ERR_OK; + { + rc = Lwip::tcp_write(_s->pcb, ptr->payload, ptr->len, TCP_WRITE_FLAG_COPY); + } + if (rc == Lwip::ERR_OK) + { + Lwip::tcp_output(_s->pcb); + Lwip::pbuf_free(ptr); + } else { + if (_s->tx == nullptr) + _s->tx = ptr; + else { + Lwip::pbuf_cat(_s->tx, ptr); + } + } + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); + } +}; + +void +Server::send(struct state *s, std::string &&message) +{ + const auto length = std::uint64_t(message.size()); auto response = std::string(length + sizeof(length), '\0'); @@ -255,10 +272,12 @@ void Server::send(const std::uint32_t client_id, std::string &&message) // Write data std::memmove(response.data() + sizeof(length), message.data(), length); - ::send(this->_client_sockets[client_id], response.c_str(), response.length(), 0); + auto task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(Send_task))) Send_task(s, response); + task->annotate(static_cast(s->channel_id)); + mx::tasking::runtime::spawn(*task); } -std::uint16_t Server::add_client(const std::int32_t client_socket) +std::uint16_t Server::add_client(Lwip::tcp_pcb* client_socket) { for (auto i = 0U; i < this->_client_sockets.size(); ++i) { @@ -275,4 +294,222 @@ std::uint16_t Server::add_client(const std::int32_t client_socket) void Server::stop() noexcept { this->_is_running = false; +} + +class Close_task : public mx::tasking::TaskInterface +{ + private: + Server::state &_s; + + public: + Close_task(Server::state &s) : _s(s) {} + + mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) + { + Genode::log("Closing connection for ", static_cast(_s.pcb) , " and state object ", static_cast(&_s)); + Server::tcpbtree_close(_s.pcb, &_s); + _s.state = Server::CLOSED; + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); + } +}; + +/*********** + * LWIP callback function definitions + ***********/ +Lwip::err_t Server::_handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err) +{ + + struct state *s; + + static uint64_t count_connections = 0; + + LWIP_UNUSED_ARG(arg); + + if ((err != Lwip::ERR_OK) || (newpcb == NULL)) { + return Lwip::ERR_VAL; + } + + //Genode::log("Incoming request"); + + s = new (Lwip::mem_malloc(sizeof(struct state))) state(); // static_cast(Lwip::mem_malloc(sizeof(struct state))); + + if (!s) { + Genode::error("Failed to allocate state object for new connection."); + return Lwip::ERR_MEM; + } + //Genode::log("New connection #", count_connections, ": arg=", arg, " pcb=", newpcb, " s=", s, " &s=", static_cast(&s)); + + s->state = states::ACCEPTED; + s->pcb = newpcb; + s->retries = 0; + s->p = nullptr; + s->tx = nullptr; + s->channel_id = 0; //count_connections % Server::get_instance()->_count_channels; + + Lwip::tcp_backlog_accepted(newpcb); + /* Register callback functions */ + Lwip::tcp_arg(newpcb, s); + Lwip::tcp_recv(newpcb, &Server::_handle_tcp_recv); + Lwip::tcp_err(newpcb, &Server::_handle_tcp_error); + Lwip::tcp_poll(newpcb, &Server::_handle_tcp_poll, 50); + Lwip::tcp_sent(newpcb, &Server::_handle_tcp_sent); + newpcb->flags |= TF_NODELAY; + + return Lwip::ERR_OK; +} + +Lwip::err_t 
Server::_handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err)
+{
+    static std::uint16_t next_receive_task = 0;
+    struct state *s;
+    Lwip::err_t rc = Lwip::ERR_OK;
+
+    std::uint16_t next_channel_id = 0;
+
+    s = static_cast<struct state *>(arg);
+
+    if (err != Lwip::ERR_OK) {
+        return err;
+    }
+
+    if (p == nullptr) {
+        s->state = CLOSING;
+        auto task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(Close_task))) Close_task(*s);
+        if (!task) {
+            Genode::warning("Failed to allocate close task");
+            return Lwip::ERR_MEM;
+        }
+        task->annotate(static_cast<std::uint16_t>(s->channel_id));
+        mx::tasking::runtime::spawn(*task);
+        Lwip::pbuf_free(p);
+        rc = Lwip::ERR_OK;
+    } else if (err != Lwip::ERR_OK) {
+        rc = err;
+    } else if (s->state == states::ACCEPTED) {
+        s->state = states::RECEIVED;
+
+        // TODO: parse message and spawn request task here
+        rc = Lwip::ERR_OK;
+        {
+            ReceiveTask *task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(ReceiveTask))) ReceiveTask(s, p);
+            if (!task) {
+                Genode::warning("Could not allocate request handler task");
+                return Lwip::ERR_MEM;
+            }
+            task->annotate(static_cast<std::uint16_t>(s->channel_id));
+            mx::tasking::runtime::spawn(*task);
+        }
+        Lwip::tcp_recved(s->pcb, p->len);
+        //Server::get_instance()->send(s, "Nope");
+    }
+    else if (s->state == states::RECEIVED)
+    {
+        ReceiveTask *task = new (Server::get_instance()->_task_allocator->allocate(0, 64, sizeof(ReceiveTask))) ReceiveTask(s, p);
+        if (!task) {
+            Genode::warning("Could not allocate request handler task");
+            return Lwip::ERR_MEM;
+        }
+        task->annotate(static_cast<std::uint16_t>(s->channel_id));
+        mx::tasking::runtime::spawn(*task);
+        Lwip::tcp_recved(s->pcb, p->len);
+        //Server::get_instance()->send(s, "Nope");
+
+        rc = Lwip::ERR_OK;
+    }
+    else
+    {
+        Lwip::tcp_recved(tpcb, p->tot_len);
+        Lwip::pbuf_free(p);
+        rc = Lwip::ERR_OK;
+    }
+
+    return rc;
+}
+
+Lwip::err_t Server::_handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb)
+{
+    Lwip::err_t rc;
+    struct state *s;
+
+    //GENODE_LOG_TSC(1);
+    s = static_cast<struct state *>(arg);
+
+    if (s) {
+        if (s->tx) {
+            rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1);
+            if (rc == Lwip::ERR_OK) {
+                Lwip::tcp_output(tpcb);
+                Lwip::pbuf *ptr = s->tx;
+                if (ptr->next) {
+                    s->tx = ptr->next;
+                    Lwip::pbuf_ref(s->tx);
+                }
+                Lwip::tcp_recved(tpcb, ptr->len);
+                Lwip::pbuf_free(ptr);
+            }
+            // TODO: process remaining pbuf entry
+        } else {
+            /*if (s->state == states::CLOSING) {
+                Server::tcpbtree_close(tpcb, s);
+            }*/
+        }
+        rc = Lwip::ERR_OK;
+    } else {
+        Lwip::tcp_abort(tpcb);
+        rc = Lwip::ERR_ABRT;
+    }
+
+    return rc;
+}
+
+
+Lwip::err_t Server::_handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len)
+{
+    //GENODE_LOG_TSC(1);
+    struct state *s = static_cast<struct state *>(arg);
+    s->retries = 0;
+
+    if (s->tx) {
+        Lwip::err_t rc = Lwip::tcp_write(tpcb, s->tx->payload, s->tx->len, 1);
+        if (rc == Lwip::ERR_OK) {
+            Lwip::tcp_output(tpcb);
+            Lwip::pbuf *ptr = s->tx;
+            if (ptr->next) {
+                s->tx = ptr->next;
+                Lwip::pbuf_ref(s->tx);
+            }
+            Lwip::tcp_recved(tpcb, ptr->len);
+            Lwip::pbuf_free(ptr);
+        }
+        Lwip::tcp_sent(tpcb, &Server::_handle_tcp_sent); // Genode::log("In _handle_tcp_sent");
+    }
+
+    return Lwip::ERR_OK;
+}
+
+mx::tasking::TaskResult application::blinktree_server::network::ReceiveTask::execute(std::uint16_t core_id, std::uint16_t channel_id)
+{
+    Lwip::err_t rc = Lwip::ERR_OK;
+
+    /*rc = Lwip::tcp_write(_state->pcb, _pbuf->payload, _pbuf->len, 3);
+    Lwip::tcp_output(_state->pcb);
+    if (rc == Lwip::ERR_OK) {
+        Lwip::tcp_recved(_state->pcb, 
_pbuf->tot_len); + Lwip::pbuf_free(_pbuf); + } else if (rc == Lwip::ERR_MEM) { + Genode::warning("Out of memory"); + }*/ + + //Genode::log("Executing application task"); + //Server::get_instance()->send(_state, "Nope"); + // Server::tcp_send(_state->pcb, _state); + + std::string request = std::string(static_cast(_pbuf->payload), _pbuf->len); + Server::get_instance()->parse(_state, request); + + Lwip::pbuf_free(_pbuf); + + Server::free_task(static_cast(this)); + return mx::tasking::TaskResult::make_null(); } \ No newline at end of file diff --git a/repos/mml/src/app/blinktree_server/network/server.h b/repos/mml/src/app/blinktree_server/network/server.h index 1a754d293f..91e1cee63f 100644 --- a/repos/mml/src/app/blinktree_server/network/server.h +++ b/repos/mml/src/app/blinktree_server/network/server.h @@ -5,22 +5,225 @@ #include #include #include -#include -#include -#include -#include #include #include + +/* B-link tree includes */ #include #include +/* lwIP wrapper for Genode's NIC session */ +#include +#include +#include + +/* Genode includes */ +#include +#include +#include + +/* MxTasking includes*/ +#include +#include +#include +#include +#include + +/* lwIP includes */ +namespace Lwip { + extern "C" { + #include + #include + #include + } +} + namespace application::blinktree_server::network { -class Server; + class ResponseHandler; + class RequestTask; + class ReceiveTask; + class Server + { + public: + enum states + { + NONE = 0, + ACCEPTED, + RECEIVED, + CLOSING, + CLOSED + }; + + struct state + { + std::uint8_t state; + std::uint8_t retries; + struct Lwip::tcp_pcb *pcb; + struct Lwip::pbuf *p; + struct Lwip::pbuf *tx; + std::uint16_t channel_id; + std::uint64_t id; + }; + Server(Libc::Env &env, std::uint64_t port, + std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc) noexcept; + ~Server(); + + [[nodiscard]] std::uint16_t port() const noexcept { return _port; } + void stop() noexcept; + void send(struct Server::state *s, std::string &&message); + bool listen(db::index::blinktree::BLinkTree *tree); + void parse(struct Server::state *s, std::string &message); + + [[nodiscard]] bool is_running() const noexcept { return _is_running; } + + static void tcp_send(struct Lwip::tcp_pcb *tpcb, struct state *s) + { + using namespace Lwip; + struct Lwip::pbuf *ptr; + Lwip::err_t rc = Lwip::ERR_OK; + + + while ((rc == Lwip::ERR_OK) && (s->tx != nullptr) /* && (s->tx->len <= tcp_sndbuf(tpcb) */) + { + ptr = s->tx; + // Genode::log("Sending response"); + rc = Lwip::tcp_write(tpcb, ptr->payload, ptr->len, 1); + if (rc == Lwip::ERR_OK) + { + std::uint16_t plen; + + plen = ptr->len; + + s->tx = ptr->next; + if (s->tx != nullptr) + { + Lwip::pbuf_ref(s->tx); + } + Lwip::tcp_output(tpcb); + Lwip::pbuf_free(ptr); + } + else if (rc == Lwip::ERR_MEM) + { + Genode::warning("Low on memory. 
Defering to poll()"); + s->tx = ptr; + } + else + { + Genode::warning("An error ", static_cast(rc), " occured."); + } + } + } + + static void tcpbtree_close(struct Lwip::tcp_pcb *tpcb, struct state *s) + { + if (s->pcb != tpcb) { + Genode::error("Tried closing connection with invalid session state"); + return; + } + Lwip::tcp_arg(tpcb, NULL); + Lwip::tcp_sent(tpcb, NULL); + Lwip::tcp_recv(tpcb, NULL); + Lwip::tcp_poll(tpcb, NULL, 0); + Lwip::tcp_err(tpcb, nullptr); + + Server::tcp_free(s); + + Lwip::tcp_close(tpcb); + } + + /* tcp_recv */ + static Lwip::err_t _handle_tcp_recv(void *arg, struct Lwip::tcp_pcb *tpcb, struct Lwip::pbuf *p, Lwip::err_t err); + + /* tcp_err */ + static void _handle_tcp_error(void *arg, Lwip::err_t err) + { + struct state *s; + LWIP_UNUSED_ARG(err); + + s = static_cast(arg); + + Server::tcp_free(s); + } + + /* tcp_poll */ + static Lwip::err_t _handle_tcp_poll(void *arg, struct Lwip::tcp_pcb *tpcb); + + /* tcp_sent */ + static Lwip::err_t _handle_tcp_sent(void *arg, struct Lwip::tcp_pcb *tpcb, std::uint16_t len); + + /* helper function for free */ + static void tcp_free(struct state *s) + { + // Genode::log("Freeing state obj s=", s); + if (s) + { + if (s->p) + Lwip::pbuf_free(s->p); + if (s->tx) + Lwip::pbuf_free(s->tx); + delete s; // Lwip::mem_free(s); + } + } + + static Server *get_instance() { return _myself; } + + static void free_handler_task(std::uint16_t core_id, void* task) + { + Server::get_instance()->_handler_allocator->free(task); + } + + static void free_task(void* task) + { + Server::get_instance()->_task_allocator->free(task); + } + + private: + static Server *_myself; + const std::uint64_t _port; + struct Lwip::tcp_pcb *_socket; + Libc::Env &_env; + + std::array _client_sockets; + std::array _buffer; + static ReceiveTask *_receive_tasks; + + alignas(64) bool _is_running = true; + alignas(64) std::atomic_uint64_t _next_worker_id{0U}; + const std::uint16_t _count_channels; + + std::uint16_t add_client(Lwip::tcp_pcb *client_socket); + + /* Genode environment for NIC session */ + Genode::Attached_rom_dataspace _config; + Genode::Heap &_alloc; + Timer::Connection &_timer; + + /* lwIP network device (NIC session wrapper) */ + Lwip::Nic_netif _netif; + db::index::blinktree::BLinkTree *_tree{nullptr}; + + std::unique_ptr _handler_allocator{nullptr}; + std::unique_ptr _task_allocator{nullptr}; + + /************************************************ + * lwIP callback API: TCP callback functions + ************************************************/ + + /* tcp_accept */ + static Lwip::err_t + _handle_tcp_connect(void *arg, struct Lwip::tcp_pcb *newpcb, Lwip::err_t err); + + + + /* helper function for close() */ + +}; + class alignas(64) ResponseHandler final : public db::index::blinktree::Listener { public: - ResponseHandler(Server* server, const std::uint32_t client_id) : _server(server), _client_id(client_id) { } + ResponseHandler(Server* server, Server::state *s, std::uint16_t _core_id) : _server(server), _s(s), core_id(_core_id) { } ResponseHandler(ResponseHandler&&) noexcept = default; ~ResponseHandler() = default; @@ -32,7 +235,8 @@ public: private: Server* _server; - std::uint32_t _client_id; + Server::state *_s; + std::uint16_t core_id{0}; }; class alignas(64) RequestTask final : public mx::tasking::TaskInterface @@ -58,33 +262,26 @@ private: ResponseHandler& _response_handler; }; -class Server +class alignas(64) ReceiveTask final : public mx::tasking::TaskInterface { -public: - Server(std::uint64_t port, - std::uint16_t count_channels) noexcept; - 
~Server(); + public: + ReceiveTask(Server::state *state, Lwip::pbuf *pb) : _state(state), _pbuf(pb) {} - [[nodiscard]] std::uint16_t port() const noexcept { return _port; } - void stop() noexcept; - void send(std::uint32_t client_id, std::string &&message); - bool listen(db::index::blinktree::BLinkTree* tree); + mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override; - [[nodiscard]] bool is_running() const noexcept { return _is_running; } + private: + Server::state *_state; + Lwip::pbuf *_pbuf; +}; -private: - const std::uint64_t _port; - std::int32_t _socket; - std::array _client_sockets; - std::array _buffer; +class alignas(64) AcceptTask final : public mx::tasking::TaskInterface +{ + public: + AcceptTask(Lwip::tcp_pcb *newpcb) : _pcb(newpcb) {} - ResponseHandler* _response_handlers; - RequestTask *_request_tasks; + mx::tasking::TaskResult execute(std::uint16_t core_id, std::uint16_t channel_id) override; - alignas(64) bool _is_running = true; - alignas(64) std::atomic_uint64_t _next_worker_id{0U}; - const std::uint16_t _count_channels; - - std::uint16_t add_client(std::int32_t client_socket); + private: + Lwip::tcp_pcb *_pcb; }; } // namespace mx::io::network \ No newline at end of file diff --git a/repos/mml/src/app/blinktree_server/server.cpp b/repos/mml/src/app/blinktree_server/server.cpp index bc279484fc..fcd9737ad2 100644 --- a/repos/mml/src/app/blinktree_server/server.cpp +++ b/repos/mml/src/app/blinktree_server/server.cpp @@ -1,6 +1,9 @@ #include "server.h" #include "network/server.h" #include +#include +#include +#include using namespace application::blinktree_server; @@ -18,7 +21,16 @@ void Server::run() this->_tree = std::make_unique>( this->_node_isolation_level, this->_preferred_synchronization_method); - server = new network::Server{this->_port, mx::tasking::runtime::channels()}; + Libc::Env &env = mx::system::Environment::env(); + + static mx::memory::dynamic::Allocator *alloc = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(mx::memory::dynamic::Allocator))) mx::memory::dynamic::Allocator(); + + static Timer::Connection timer{env}; + + static Genode::Heap _alloc{env.ram(), env.rm()}; + + Mxip::mxip_init(*alloc, timer); + server = new network::Server{env, this->_port, mx::tasking::runtime::channels(), timer, _alloc}; std::cout << "Waiting for requests on port :" << this->_port << std::endl; auto network_thread = std::thread{[server, tree = this->_tree.get()]() { @@ -26,7 +38,10 @@ void Server::run() }}; mx::tasking::runtime::start_and_wait(); - //network_thread.join(); - delete server; + + network_thread.join(); + + + //delete server; } \ No newline at end of file diff --git a/repos/mml/src/app/blinktree_server/target.mk b/repos/mml/src/app/blinktree_server/target.mk index f0cf215d64..2147dfb97b 100644 --- a/repos/mml/src/app/blinktree_server/target.mk +++ b/repos/mml/src/app/blinktree_server/target.mk @@ -16,12 +16,13 @@ SRC_BTREE += server.cpp SRC_BTREE += network/server.cpp SRC_CC = ${SRC_MXBENCH} ${SRC_BTREE} -LIBS += base libc stdcxx mxtasking +LIBS += base libc stdcxx mxtasking mxip EXT_OBJECTS += /usr/local/genode/tool/lib/clang/14.0.5/lib/linux/libclang_rt.builtins-x86_64.a /usr/local/genode/tool/lib/libatomic.a CUSTOM_CC = /usr/local/genode/tool/bin/clang CUSTOM_CXX = /usr/local/genode/tool/bin/clang++ -CC_OPT += --target=x86_64-genode --sysroot=/does/not/exist --gcc-toolchain=$(GENODE_GCC_TOOLCHAIN_DIR) -Wno-error -O2 -g -fno-aligned-new -DNDEBUG -I$(MXINC_DIR) -std=c++20 #-D_GLIBCXX_ATOMIC_BUILTINS_8 
-D__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
-CC_OPT += -femulated-tls -DCLANG_CXX11_ATOMICS
+CC_OPT := --target=x86_64-genode --sysroot=/does/not/exist --gcc-toolchain=$(GENODE_GCC_TOOLCHAIN_DIR) -Wno-error -g -DNDEBUG -I$(MXINC_DIR) -std=c++20 #-D_GLIBCXX_ATOMIC_BUILTINS_8 -D__GCC_HAVE_SYNC_COMPARE_AND_SWAP_8
+CC_OPT += -I$(MXBENCH_DIR)
+CC_OLEVEL = -O3
 CC_CXX_WARN_STRICT =
 CUSTOM_CXX_LIB := $(CROSS_DEV_PREFIX)g++
 #CXX_LD += $(CROSS_DEV_PREFIX)g++
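
For manual smoke-testing of the protocol introduced above, a stand-alone client can be run from a Linux host. The sketch below only relies on what the patch itself implies: requests are plain ASCII strings such as "I,<key>,<value>" (see Server::parse()), and Server::send() frames every response as a std::uint64_t length followed by the payload. The address, port, byte order of the length field, and all helper names are assumptions for illustration, not part of the patch.

// test_client.cpp -- hypothetical smoke-test client for the blinktree_server patch above.
// Assumptions: requests are ASCII like "I,<key>,<value>"; each response carries an 8-byte
// length header (server host byte order) followed by the payload. Host and port are made up.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cstdint>
#include <cstdio>
#include <iostream>
#include <string>
#include <vector>

static bool read_exact(int fd, void *buf, std::size_t len)
{
    auto *p = static_cast<char *>(buf);
    while (len > 0) {
        const ssize_t n = ::read(fd, p, len);
        if (n <= 0) return false;                 // connection closed or error
        p += n;
        len -= static_cast<std::size_t>(n);
    }
    return true;
}

int main()
{
    const char *host = "10.0.2.55";               // hypothetical server address
    const std::uint16_t port = 4096;              // hypothetical port; the real one is the Server ctor argument

    const int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) { std::perror("socket"); return 1; }

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (::inet_pton(AF_INET, host, &addr.sin_addr) != 1) { std::cerr << "bad address\n"; return 1; }
    if (::connect(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) { std::perror("connect"); return 1; }

    // Insert value 7 under key 42 -- the same ASCII format Server::parse() decodes.
    const std::string request = "I,42,7";
    if (::write(fd, request.data(), request.size()) != static_cast<ssize_t>(request.size())) {
        std::perror("write"); return 1;
    }

    // Server::send() prepends a std::uint64_t length before the payload.
    std::uint64_t length = 0;
    if (!read_exact(fd, &length, sizeof(length))) { std::cerr << "no length header\n"; return 1; }

    std::vector<char> payload(length);
    if (!read_exact(fd, payload.data(), payload.size())) { std::cerr << "short payload\n"; return 1; }

    std::cout << "response: " << std::string(payload.begin(), payload.end()) << '\n';
    ::close(fd);
    return 0;
}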