WIP: MxIP TCP echo server

This commit is contained in:
Michael Mueller
2025-06-20 17:44:50 +02:00
parent 7910439f37
commit e6261ce31d
2 changed files with 31 additions and 19 deletions

View File

@@ -1,4 +1,5 @@
#include "server.h" #include "server.h"
#include "base/log.h"
#include "ealanos/memory/hamstraaja.h" #include "ealanos/memory/hamstraaja.h"
#include "lwip/err.h" #include "lwip/err.h"
#include "lwip/pbuf.h" #include "lwip/pbuf.h"
@@ -6,6 +7,7 @@
#include "lwip/tcpbase.h" #include "lwip/tcpbase.h"
#include "mx/memory/global_heap.h" #include "mx/memory/global_heap.h"
#include "mx/tasking/task.h" #include "mx/tasking/task.h"
#include "mxip_lock.h"
#include "mxnic_netif.h" #include "mxnic_netif.h"
#include <cstring> #include <cstring>
#include <limits> #include <limits>
@@ -74,7 +76,7 @@ class Send_task : public mx::tasking::TaskInterface
Send_task(Server::state *s, std::string message) : _s(s), _message(message) {} Send_task(Server::state *s, std::string message) : _s(s), _message(message) {}
mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) override mx::tasking::TaskResult execute(std::uint16_t, std::uint16_t) override
{ {
using namespace Lwip; using namespace Lwip;
if (_s->state == Server::CLOSED || _s->state == Server::CLOSING) { if (_s->state == Server::CLOSED || _s->state == Server::CLOSING) {
@@ -101,24 +103,19 @@ class Send_task : public mx::tasking::TaskInterface
} }
err_t rc = ERR_OK; err_t rc = ERR_OK;
{ {
rc = tcp_write(_s->pcb, ptr->payload, ptr->len, TCP_WRITE_FLAG_COPY); rc = tcp_write(_s->pcb, ptr->payload, ptr->len, TCP_WRITE_FLAG_COPY);
} }
if (rc == ERR_OK) if (rc == ERR_OK)
{ {
tcp_output(_s->pcb); tcp_output(_s->pcb);
pbuf_free(ptr); pbuf_free(ptr);
} else { } else {
Genode::warning("LWIP tcp_write error ", static_cast<signed int>(rc)); if (_s->tx == nullptr)
if (_s->state == Server::CLOSED || _s->state == Server::CLOSING)
return mx::tasking::TaskResult::make_remove();
pbuf_free(ptr);
/*if (_s->tx == nullptr)
_s->tx = ptr; _s->tx = ptr;
else { else {
pbuf_cat(_s->tx, ptr); pbuf_cat(_s->tx, ptr);
}*/ }
} }
//Server::free_task(static_cast<void *>(this)); //Server::free_task(static_cast<void *>(this));
return mx::tasking::TaskResult::make_remove(); return mx::tasking::TaskResult::make_remove();
@@ -128,7 +125,7 @@ class Send_task : public mx::tasking::TaskInterface
void void
Server::send(Server::state *s, std::string &&message) Server::send(Server::state *s, std::string &&message)
{ {
//GENODE_LOG_TSC(1);
const auto length = std::uint64_t(message.size()); const auto length = std::uint64_t(message.size());
auto response = std::string(length + sizeof(length), '\0'); auto response = std::string(length + sizeof(length), '\0');
@@ -139,13 +136,13 @@ Server::send(Server::state *s, std::string &&message)
std::memmove(response.data() + sizeof(length), message.data(), length); std::memmove(response.data() + sizeof(length), message.data(), length);
auto task = mx::tasking::runtime::new_task<Send_task>(0, s, response);//new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(Send_task))) Send_task(s, response); auto task = mx::tasking::runtime::new_task<Send_task>(0, s, response);//new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(Send_task))) Send_task(s, response);
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id)); task->annotate(static_cast<mx::tasking::TaskInterface::channel>(0));
mx::tasking::runtime::spawn(*task); mx::tasking::runtime::spawn(*task);
} }
void Server::stop() noexcept void Server::stop() noexcept
{ {
this->_is_running = false; this->_is_running = false;
} }
class Close_task : public mx::tasking::TaskInterface class Close_task : public mx::tasking::TaskInterface
@@ -163,6 +160,7 @@ class Close_task : public mx::tasking::TaskInterface
_s.state = Server::CLOSED; _s.state = Server::CLOSED;
//Server::free_task(static_cast<void *>(this)); //Server::free_task(static_cast<void *>(this));
Genode::log("Closed connection"); Genode::log("Closed connection");
return mx::tasking::TaskResult::make_remove(); return mx::tasking::TaskResult::make_remove();
} }
}; };
@@ -211,7 +209,7 @@ err_t Server::_handle_tcp_connect(void *arg, struct tcp_pcb *newpcb, err_t err)
err_t Server::_handle_tcp_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err) err_t Server::_handle_tcp_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *p, err_t err)
{ {
static std::uint16_t next_receive_task = 0; static std::uint16_t next_receive_task = 0;
struct state *s; struct state *s;
err_t rc = ERR_OK; err_t rc = ERR_OK;
@@ -244,10 +242,12 @@ err_t Server::_handle_tcp_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *p,
rc = err; rc = err;
} else if (s->state == states::ACCEPTED) { } else if (s->state == states::ACCEPTED) {
//s->state = states::RECEIVED; //s->state = states::RECEIVED;
rc = ERR_OK; rc = ERR_OK;
{ {
auto task = mx::tasking::runtime::new_task<ReceiveTask>(0, s, p, p->len); ReceiveTask *task = nullptr;
{
task = mx::tasking::runtime::new_task<ReceiveTask>(0, s, p, p->len);
}
//ReceiveTask *task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ReceiveTask))) ReceiveTask(s, payload, p->len); //ReceiveTask *task = new (mx::memory::GlobalHeap::allocate_cache_line_aligned(sizeof(ReceiveTask))) ReceiveTask(s, payload, p->len);
if (!task) { if (!task) {
Genode::warning("Could not allocate request handler task"); Genode::warning("Could not allocate request handler task");
@@ -255,8 +255,11 @@ err_t Server::_handle_tcp_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *p,
} }
task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id)); task->annotate(static_cast<mx::tasking::TaskInterface::channel>(s->channel_id));
mx::tasking::runtime::spawn(*task); mx::tasking::runtime::spawn(*task);
}
{
//GENODE_LOG_TSC_NAMED(1, "tcp_recved");
tcp_recved(s->pcb, p->len);
} }
tcp_recved(s->pcb, p->len);
//pbuf_free(p); //pbuf_free(p);
//Server::get_instance()->send(s, "Nope"); //Server::get_instance()->send(s, "Nope");
@@ -294,7 +297,6 @@ err_t Server::_handle_tcp_poll(void *arg, struct tcp_pcb *tpcb)
err_t rc; err_t rc;
struct state *s; struct state *s;
//GENODE_LOG_TSC(1);
s = static_cast<struct state *>(arg); s = static_cast<struct state *>(arg);
if (s) { if (s) {
@@ -358,6 +360,7 @@ mx::tasking::TaskResult application::echo_server::network::ReceiveTask::execute(
{ {
err_t rc = ERR_OK; err_t rc = ERR_OK;
//GENODE_LOG_TSC_NAMED(1, "ReceiveTask");
if (!_payload || !_payload->payload) if (!_payload || !_payload->payload)
return mx::tasking::TaskResult::make_remove(); return mx::tasking::TaskResult::make_remove();

View File

@@ -1,5 +1,6 @@
#pragma once #pragma once
#include "base/output.h"
#include "config.h" #include "config.h"
#include "ealanos/memory/hamstraaja.h" #include "ealanos/memory/hamstraaja.h"
#include "lwip/pbuf.h" #include "lwip/pbuf.h"
@@ -80,7 +81,9 @@ namespace application::echo_server::network {
struct pbuf *tx; struct pbuf *tx;
std::uint16_t channel_id; std::uint16_t channel_id;
std::uint64_t id; std::uint64_t id;
}; };
Server(Libc::Env &env, std::uint64_t port, Server(Libc::Env &env, std::uint64_t port,
std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc, Ealan::Memory::Hamstraaja<128, 4*4096> *talloc, Mxip::Nic_netif::Payload_allocator *palloc) noexcept; std::uint16_t count_channels, Timer::Connection &timer, Genode::Heap &alloc, Ealan::Memory::Hamstraaja<128, 4*4096> *talloc, Mxip::Nic_netif::Payload_allocator *palloc) noexcept;
~Server(); ~Server();
@@ -183,6 +186,12 @@ namespace application::echo_server::network {
Genode::log("Freeing state object ", s); Genode::log("Freeing state object ", s);
mx::memory::GlobalHeap::free(s); // mem_free(s); mx::memory::GlobalHeap::free(s); // mem_free(s);
Genode::log("Freed state object"); Genode::log("Freed state object");
static bool printed = false;
if (!printed) {
//get_instance()->_netif.print_stats();
printed = true;
}
} }
} }