From 245d98f58e2cc0c2bea79bb03491c6520146e2c6 Mon Sep 17 00:00:00 2001 From: xiao Date: Fri, 8 May 2026 05:52:45 +0800 Subject: [PATCH] fix(tcp): add low-RAM delayed-ack buffering for TCP bridge --- App/tcp_client.c | 137 ++++++++++++++++++++++++++++++++++++++++----- App/tcp_client.h | 5 +- App/tcp_server.c | 143 ++++++++++++++++++++++++++++++++++++++++++----- App/tcp_server.h | 6 +- 4 files changed, 259 insertions(+), 32 deletions(-) diff --git a/App/tcp_client.c b/App/tcp_client.c index 58b987a..26884a3 100644 --- a/App/tcp_client.c +++ b/App/tcp_client.c @@ -17,6 +17,8 @@ typedef struct { uint8_t rx_ring[TCP_CLIENT_RX_BUFFER_SIZE]; uint16_t rx_head; uint16_t rx_tail; + struct pbuf *hold_pbuf; + uint16_t hold_offset; uint32_t next_retry_ms; uint8_t index; tcp_client_instance_config_t config; @@ -30,10 +32,65 @@ static uint16_t ring_free(uint16_t head, uint16_t tail, uint16_t size) return (head >= tail) ? (uint16_t)(size - head + tail - 1u) : (uint16_t)(tail - head - 1u); } +static uint16_t ring_used(uint16_t head, uint16_t tail, uint16_t size) +{ + return (head >= tail) ? (uint16_t)(head - tail) : (uint16_t)(size - tail + head); +} + +static void tcp_client_reset_rx_state(tcp_client_ctx_t *ctx) +{ + if (ctx == NULL) { + return; + } + if (ctx->hold_pbuf != NULL) { + pbuf_free(ctx->hold_pbuf); + ctx->hold_pbuf = NULL; + } + ctx->hold_offset = 0u; + ctx->rx_head = 0u; + ctx->rx_tail = 0u; +} + +static void tcp_client_fill_ring_from_pbuf(tcp_client_ctx_t *ctx) +{ + struct pbuf *q; + uint16_t offset; + + if (ctx == NULL || ctx->hold_pbuf == NULL) { + return; + } + + q = ctx->hold_pbuf; + offset = ctx->hold_offset; + while (q != NULL && offset >= q->len) { + offset = (uint16_t)(offset - q->len); + q = q->next; + } + + while (q != NULL) { + const uint8_t *src = (const uint8_t *)q->payload; + for (uint16_t i = offset; i < q->len; ++i) { + if (ring_free(ctx->rx_head, ctx->rx_tail, TCP_CLIENT_RX_BUFFER_SIZE) == 0u) { + ctx->hold_offset = (uint16_t)(ctx->hold_offset + i - offset); + return; + } + ctx->rx_ring[ctx->rx_head] = src[i]; + ctx->rx_head = (uint16_t)((ctx->rx_head + 1u) % TCP_CLIENT_RX_BUFFER_SIZE); + ctx->status.rx_bytes++; + } + ctx->hold_offset = (uint16_t)(ctx->hold_offset + q->len - offset); + offset = 0u; + q = q->next; + } + + pbuf_free(ctx->hold_pbuf); + ctx->hold_pbuf = NULL; + ctx->hold_offset = 0u; +} + static err_t tcp_client_on_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err) { tcp_client_ctx_t *ctx = (tcp_client_ctx_t *)arg; - struct pbuf *q; if (ctx == NULL) { if (p != NULL) { @@ -59,21 +116,16 @@ static err_t tcp_client_on_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *p, return ERR_ABRT; } - for (q = p; q != NULL; q = q->next) { - const uint8_t *src = (const uint8_t *)q->payload; - for (uint16_t i = 0; i < q->len; ++i) { - if (ring_free(ctx->rx_head, ctx->rx_tail, TCP_CLIENT_RX_BUFFER_SIZE) == 0u) { - ctx->status.errors++; - break; - } - ctx->rx_ring[ctx->rx_head] = src[i]; - ctx->rx_head = (uint16_t)((ctx->rx_head + 1u) % TCP_CLIENT_RX_BUFFER_SIZE); - ctx->status.rx_bytes++; - } + if (ctx->hold_pbuf != NULL) { + ctx->status.errors++; + return ERR_MEM; } - tcp_recved(pcb, p->tot_len); + pbuf_ref(p); + ctx->hold_pbuf = p; + ctx->hold_offset = 0u; pbuf_free(p); + tcp_client_fill_ring_from_pbuf(ctx); return ERR_OK; } @@ -93,6 +145,7 @@ static void tcp_client_on_err(void *arg, err_t err) if (ctx == NULL) { return; } + tcp_client_reset_rx_state(ctx); ctx->pcb = NULL; ctx->status.state = TCP_CLIENT_STATE_DISCONNECTED; ctx->status.errors++; @@ -213,6 +266,7 @@ int tcp_client_disconnect(uint8_t instance) } ctx = &g_clients[instance]; if (ctx->pcb != NULL) { + tcp_client_reset_rx_state(ctx); tcp_arg(ctx->pcb, NULL); tcp_recv(ctx->pcb, NULL); tcp_sent(ctx->pcb, NULL); @@ -221,8 +275,7 @@ int tcp_client_disconnect(uint8_t instance) ctx->pcb = NULL; } ctx->status.state = TCP_CLIENT_STATE_DISCONNECTED; - ctx->rx_head = 0u; - ctx->rx_tail = 0u; + tcp_client_reset_rx_state(ctx); return 0; } @@ -272,13 +325,66 @@ int tcp_client_recv(uint8_t instance, uint8_t *data, uint16_t max_len) return -1; } ctx = &g_clients[instance]; + tcp_client_fill_ring_from_pbuf(ctx); while (copied < max_len && ctx->rx_tail != ctx->rx_head) { data[copied++] = ctx->rx_ring[ctx->rx_tail]; ctx->rx_tail = (uint16_t)((ctx->rx_tail + 1u) % TCP_CLIENT_RX_BUFFER_SIZE); } + if (copied > 0u && ctx->pcb != NULL) { + tcp_recved(ctx->pcb, copied); + } return (int)copied; } +uint16_t tcp_client_rx_available(uint8_t instance) +{ + if (instance >= TCP_CLIENT_INSTANCE_COUNT) { + return 0u; + } + tcp_client_fill_ring_from_pbuf(&g_clients[instance]); + return ring_used(g_clients[instance].rx_head, g_clients[instance].rx_tail, TCP_CLIENT_RX_BUFFER_SIZE); +} + +uint16_t tcp_client_peek(uint8_t instance, uint8_t *data, uint16_t max_len) +{ + uint16_t copied = 0u; + uint16_t tail; + tcp_client_ctx_t *ctx; + + if (instance >= TCP_CLIENT_INSTANCE_COUNT || data == NULL || max_len == 0u) { + return 0u; + } + + ctx = &g_clients[instance]; + tcp_client_fill_ring_from_pbuf(ctx); + tail = ctx->rx_tail; + while (copied < max_len && tail != ctx->rx_head) { + data[copied++] = ctx->rx_ring[tail]; + tail = (uint16_t)((tail + 1u) % TCP_CLIENT_RX_BUFFER_SIZE); + } + return copied; +} + +void tcp_client_drop(uint8_t instance, uint16_t len) +{ + tcp_client_ctx_t *ctx; + uint16_t dropped = 0u; + + if (instance >= TCP_CLIENT_INSTANCE_COUNT || len == 0u) { + return; + } + + ctx = &g_clients[instance]; + while (dropped < len && ctx->rx_tail != ctx->rx_head) { + ctx->rx_tail = (uint16_t)((ctx->rx_tail + 1u) % TCP_CLIENT_RX_BUFFER_SIZE); + dropped++; + } + if (dropped > 0u && ctx->pcb != NULL) { + tcp_recved(ctx->pcb, dropped); + } + tcp_client_fill_ring_from_pbuf(ctx); +} + bool tcp_client_is_connected(uint8_t instance) { return (instance < TCP_CLIENT_INSTANCE_COUNT) && @@ -299,6 +405,7 @@ void tcp_client_poll(void) for (uint8_t i = 0; i < TCP_CLIENT_INSTANCE_COUNT; ++i) { tcp_client_ctx_t *ctx = &g_clients[i]; + tcp_client_fill_ring_from_pbuf(ctx); if (!ctx->config.enabled || !ctx->config.auto_reconnect || tcp_client_is_connected(i)) { continue; } diff --git a/App/tcp_client.h b/App/tcp_client.h index ecf4532..bcf027b 100644 --- a/App/tcp_client.h +++ b/App/tcp_client.h @@ -14,7 +14,7 @@ extern "C" { #endif #define TCP_CLIENT_INSTANCE_COUNT 2u -#define TCP_CLIENT_RX_BUFFER_SIZE 512u +#define TCP_CLIENT_RX_BUFFER_SIZE 480u #define TCP_CLIENT_RECONNECT_DELAY_MS 3000u typedef enum { @@ -48,6 +48,9 @@ int tcp_client_connect(uint8_t instance); int tcp_client_disconnect(uint8_t instance); int tcp_client_send(uint8_t instance, const uint8_t *data, uint16_t len); int tcp_client_recv(uint8_t instance, uint8_t *data, uint16_t max_len); +uint16_t tcp_client_rx_available(uint8_t instance); +uint16_t tcp_client_peek(uint8_t instance, uint8_t *data, uint16_t max_len); +void tcp_client_drop(uint8_t instance, uint16_t len); bool tcp_client_is_connected(uint8_t instance); void tcp_client_get_status(uint8_t instance, tcp_client_status_t *status); void tcp_client_poll(void); diff --git a/App/tcp_server.c b/App/tcp_server.c index 8936e1a..097401f 100644 --- a/App/tcp_server.c +++ b/App/tcp_server.c @@ -18,6 +18,8 @@ typedef struct { uint8_t rx_ring[TCP_SERVER_RX_BUFFER_SIZE]; uint16_t rx_head; uint16_t rx_tail; + struct pbuf *hold_pbuf; + uint16_t hold_offset; uint8_t index; tcp_server_instance_config_t config; tcp_server_status_t status; @@ -30,10 +32,65 @@ static uint16_t ring_free(uint16_t head, uint16_t tail, uint16_t size) return (head >= tail) ? (uint16_t)(size - head + tail - 1u) : (uint16_t)(tail - head - 1u); } +static uint16_t ring_used(uint16_t head, uint16_t tail, uint16_t size) +{ + return (head >= tail) ? (uint16_t)(head - tail) : (uint16_t)(size - tail + head); +} + +static void tcp_server_reset_rx_state(tcp_server_ctx_t *ctx) +{ + if (ctx == NULL) { + return; + } + if (ctx->hold_pbuf != NULL) { + pbuf_free(ctx->hold_pbuf); + ctx->hold_pbuf = NULL; + } + ctx->hold_offset = 0u; + ctx->rx_head = 0u; + ctx->rx_tail = 0u; +} + +static void tcp_server_fill_ring_from_pbuf(tcp_server_ctx_t *ctx) +{ + struct pbuf *q; + uint16_t offset; + + if (ctx == NULL || ctx->hold_pbuf == NULL) { + return; + } + + q = ctx->hold_pbuf; + offset = ctx->hold_offset; + while (q != NULL && offset >= q->len) { + offset = (uint16_t)(offset - q->len); + q = q->next; + } + + while (q != NULL) { + const uint8_t *src = (const uint8_t *)q->payload; + for (uint16_t i = offset; i < q->len; ++i) { + if (ring_free(ctx->rx_head, ctx->rx_tail, TCP_SERVER_RX_BUFFER_SIZE) == 0u) { + ctx->hold_offset = (uint16_t)(ctx->hold_offset + i - offset); + return; + } + ctx->rx_ring[ctx->rx_head] = src[i]; + ctx->rx_head = (uint16_t)((ctx->rx_head + 1u) % TCP_SERVER_RX_BUFFER_SIZE); + ctx->status.rx_bytes++; + } + ctx->hold_offset = (uint16_t)(ctx->hold_offset + q->len - offset); + offset = 0u; + q = q->next; + } + + pbuf_free(ctx->hold_pbuf); + ctx->hold_pbuf = NULL; + ctx->hold_offset = 0u; +} + static err_t tcp_server_on_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *p, err_t err) { tcp_server_ctx_t *ctx = (tcp_server_ctx_t *)arg; - struct pbuf *q; if (ctx == NULL) { if (p != NULL) { @@ -58,21 +115,16 @@ static err_t tcp_server_on_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *p, return ERR_ABRT; } - for (q = p; q != NULL; q = q->next) { - const uint8_t *src = (const uint8_t *)q->payload; - for (uint16_t i = 0; i < q->len; ++i) { - if (ring_free(ctx->rx_head, ctx->rx_tail, TCP_SERVER_RX_BUFFER_SIZE) == 0u) { - ctx->status.errors++; - break; - } - ctx->rx_ring[ctx->rx_head] = src[i]; - ctx->rx_head = (uint16_t)((ctx->rx_head + 1u) % TCP_SERVER_RX_BUFFER_SIZE); - ctx->status.rx_bytes++; - } + if (ctx->hold_pbuf != NULL) { + ctx->status.errors++; + return ERR_MEM; } - tcp_recved(pcb, p->tot_len); + pbuf_ref(p); + ctx->hold_pbuf = p; + ctx->hold_offset = 0u; pbuf_free(p); + tcp_server_fill_ring_from_pbuf(ctx); return ERR_OK; } @@ -92,6 +144,7 @@ static void tcp_server_on_err(void *arg, err_t err) if (ctx == NULL) { return; } + tcp_server_reset_rx_state(ctx); ctx->client_pcb = NULL; ctx->status.state = ctx->config.enabled ? TCP_SERVER_STATE_LISTENING : TCP_SERVER_STATE_IDLE; ctx->status.errors++; @@ -193,6 +246,7 @@ int tcp_server_stop(uint8_t instance) ctx = &g_servers[instance]; if (ctx->client_pcb != NULL) { + tcp_server_reset_rx_state(ctx); tcp_arg(ctx->client_pcb, NULL); tcp_recv(ctx->client_pcb, NULL); tcp_sent(ctx->client_pcb, NULL); @@ -210,8 +264,7 @@ int tcp_server_stop(uint8_t instance) } ctx->status.state = TCP_SERVER_STATE_IDLE; - ctx->rx_head = 0u; - ctx->rx_tail = 0u; + tcp_server_reset_rx_state(ctx); return 0; } @@ -262,13 +315,66 @@ int tcp_server_recv(uint8_t instance, uint8_t *data, uint16_t max_len) return -1; } ctx = &g_servers[instance]; + tcp_server_fill_ring_from_pbuf(ctx); while (copied < max_len && ctx->rx_tail != ctx->rx_head) { data[copied++] = ctx->rx_ring[ctx->rx_tail]; ctx->rx_tail = (uint16_t)((ctx->rx_tail + 1u) % TCP_SERVER_RX_BUFFER_SIZE); } + if (copied > 0u && ctx->client_pcb != NULL) { + tcp_recved(ctx->client_pcb, copied); + } return (int)copied; } +uint16_t tcp_server_rx_available(uint8_t instance) +{ + if (instance >= TCP_SERVER_INSTANCE_COUNT) { + return 0u; + } + tcp_server_fill_ring_from_pbuf(&g_servers[instance]); + return ring_used(g_servers[instance].rx_head, g_servers[instance].rx_tail, TCP_SERVER_RX_BUFFER_SIZE); +} + +uint16_t tcp_server_peek(uint8_t instance, uint8_t *data, uint16_t max_len) +{ + uint16_t copied = 0u; + uint16_t tail; + tcp_server_ctx_t *ctx; + + if (instance >= TCP_SERVER_INSTANCE_COUNT || data == NULL || max_len == 0u) { + return 0u; + } + + ctx = &g_servers[instance]; + tcp_server_fill_ring_from_pbuf(ctx); + tail = ctx->rx_tail; + while (copied < max_len && tail != ctx->rx_head) { + data[copied++] = ctx->rx_ring[tail]; + tail = (uint16_t)((tail + 1u) % TCP_SERVER_RX_BUFFER_SIZE); + } + return copied; +} + +void tcp_server_drop(uint8_t instance, uint16_t len) +{ + tcp_server_ctx_t *ctx; + uint16_t dropped = 0u; + + if (instance >= TCP_SERVER_INSTANCE_COUNT || len == 0u) { + return; + } + + ctx = &g_servers[instance]; + while (dropped < len && ctx->rx_tail != ctx->rx_head) { + ctx->rx_tail = (uint16_t)((ctx->rx_tail + 1u) % TCP_SERVER_RX_BUFFER_SIZE); + dropped++; + } + if (dropped > 0u && ctx->client_pcb != NULL) { + tcp_recved(ctx->client_pcb, dropped); + } + tcp_server_fill_ring_from_pbuf(ctx); +} + bool tcp_server_is_connected(uint8_t instance) { return (instance < TCP_SERVER_INSTANCE_COUNT) && (g_servers[instance].client_pcb != NULL); @@ -280,3 +386,10 @@ void tcp_server_get_status(uint8_t instance, tcp_server_status_t *status) *status = g_servers[instance].status; } } + +void tcp_server_poll(void) +{ + for (uint8_t i = 0; i < TCP_SERVER_INSTANCE_COUNT; ++i) { + tcp_server_fill_ring_from_pbuf(&g_servers[i]); + } +} diff --git a/App/tcp_server.h b/App/tcp_server.h index e5f5d64..094ee02 100644 --- a/App/tcp_server.h +++ b/App/tcp_server.h @@ -14,7 +14,7 @@ extern "C" { #endif #define TCP_SERVER_INSTANCE_COUNT 2u -#define TCP_SERVER_RX_BUFFER_SIZE 512u +#define TCP_SERVER_RX_BUFFER_SIZE 480u typedef enum { TCP_SERVER_STATE_IDLE = 0, @@ -42,8 +42,12 @@ int tcp_server_start(uint8_t instance); int tcp_server_stop(uint8_t instance); int tcp_server_send(uint8_t instance, const uint8_t *data, uint16_t len); int tcp_server_recv(uint8_t instance, uint8_t *data, uint16_t max_len); +uint16_t tcp_server_rx_available(uint8_t instance); +uint16_t tcp_server_peek(uint8_t instance, uint8_t *data, uint16_t max_len); +void tcp_server_drop(uint8_t instance, uint16_t len); bool tcp_server_is_connected(uint8_t instance); void tcp_server_get_status(uint8_t instance, tcp_server_status_t *status); +void tcp_server_poll(void); #ifdef __cplusplus }