#!/usr/bin/env python3
"""Simple TCP debug server for validating raw payload forwarding.

Usage examples:
  python tools/tcp_debug_server.py --host 0.0.0.0 --port 8081
  python tools/tcp_debug_server.py --port 8081 --echo

Features:
- Listen as a plain TCP server
- Print connect/disconnect events
- Print received payload as text and hex
- Optional echo mode
- Optional stdin -> socket sender for manual testing
"""

from __future__ import annotations

import argparse
import select
import socket
import sys
import threading
from datetime import datetime

# Seconds a single select() call may wait before the caller re-checks its
# loop condition (stop flag, remaining bytes).
POLL_INTERVAL = 0.2

# Maximum number of bytes requested from the client per recv() call.
RECV_SIZE = 4096


def ts() -> str:
    """Return the current local time as ``HH:MM:SS.mmm``."""
    # %f yields six microsecond digits; dropping the last three leaves
    # millisecond precision.
    return datetime.now().strftime("%H:%M:%S.%f")[:-3]


def hex_bytes(data: bytes) -> str:
    """Return *data* as space-separated, two-digit uppercase hex bytes."""
    return " ".join(f"{b:02X}" for b in data)


def text_view(data: bytes) -> str:
    """Return *data* as text, replacing non-printable-ASCII bytes with ``.``.

    Bytes in the range 32..126 are shown as their ASCII character; every
    other byte becomes a single dot, so the result has ``len(data)`` chars.
    """
    return "".join(chr(b) if 32 <= b < 127 else "." for b in data)


def send_all(conn: socket.socket, payload: bytes) -> None:
    """Send every byte of *payload* on *conn*, even if *conn* is non-blocking.

    ``socket.sendall`` raises ``BlockingIOError`` on a non-blocking socket as
    soon as the kernel send buffer is full, possibly after a partial write.
    This helper instead waits until the socket is writable again and resumes
    from the first unsent byte.

    Raises:
        OSError: if the underlying ``send`` fails for any reason other than
            the socket being temporarily unable to accept more data.
    """
    remaining = memoryview(payload)  # slicing a memoryview does not copy
    while remaining.nbytes:
        try:
            sent = conn.send(remaining)
        except BlockingIOError:
            # Send buffer is full: wait (bounded) for writability, then retry.
            select.select([], [conn], [], POLL_INTERVAL)
            continue
        remaining = remaining[sent:]


def sender_loop(conn: socket.socket, stop_event: threading.Event) -> None:
    """Forward lines typed on stdin to *conn* until EOF, error, or stop.

    Each line (including its trailing newline) is UTF-8 encoded and sent.
    *stop_event* is set when stdin reaches EOF or a send fails, which also
    tells the receive loop in ``serve`` to finish.
    """
    print(f"[{ts()}] stdin sender ready. Type text and press Enter to send.")
    while not stop_event.is_set():
        line = sys.stdin.readline()
        if line == "":
            # readline() returns an empty string only at EOF.
            stop_event.set()
            break

        payload = line.encode("utf-8", errors="replace")
        try:
            send_all(conn, payload)
        except OSError as exc:
            print(f"[{ts()}] TX failed: {exc}")
            stop_event.set()
            break
        print(
            f"[{ts()}] TX {len(payload)} bytes | text={payload!r} | hex={hex_bytes(payload)}"
        )


def serve(host: str, port: int, echo: bool, no_stdin: bool) -> int:
    """Accept one TCP client on *host*:*port* and log everything it sends.

    Args:
        host: Address to bind the listening socket to.
        port: TCP port to listen on.
        echo: When true, send every received chunk back to the client.
        no_stdin: When true, do not start the stdin -> socket sender thread.

    Returns:
        Always ``0``; the function returns once the single client session
        ends (disconnect, socket error, or the sender thread requesting stop).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        print(f"[{ts()}] Listening on {host}:{port}")

        conn, addr = server.accept()
        with conn:
            print(f"[{ts()}] Client connected from {addr[0]}:{addr[1]}")
            stop_event = threading.Event()
            sender_thread: threading.Thread | None = None

            if not no_stdin:
                # Daemon thread: it may be parked in a blocking stdin read,
                # which must not keep the process alive after serve() returns.
                sender_thread = threading.Thread(
                    target=sender_loop, args=(conn, stop_event), daemon=True
                )
                sender_thread.start()

            # Non-blocking so recv() can never stall the loop; select() with
            # a timeout lets stop_event be re-checked every POLL_INTERVAL.
            conn.setblocking(False)
            while not stop_event.is_set():
                ready, _, _ = select.select([conn], [], [], POLL_INTERVAL)
                if not ready:
                    continue

                try:
                    data = conn.recv(RECV_SIZE)
                except BlockingIOError:
                    continue
                except OSError as exc:
                    print(f"[{ts()}] RX failed: {exc}")
                    break

                if not data:
                    # An empty read means the peer closed the connection.
                    print(f"[{ts()}] Client disconnected")
                    break

                print(
                    f"[{ts()}] RX {len(data)} bytes | text={text_view(data)} | hex={hex_bytes(data)}"
                )

                if echo:
                    try:
                        # send_all, not sendall: the socket is non-blocking.
                        send_all(conn, data)
                    except OSError as exc:
                        print(f"[{ts()}] Echo failed: {exc}")
                        break
                    print(f"[{ts()}] ECHO {len(data)} bytes")

            stop_event.set()
            if sender_thread is not None:
                # Bounded wait only: the thread may still be blocked on stdin.
                sender_thread.join(timeout=0.5)

    return 0


def parse_args() -> argparse.Namespace:
    """Parse the command-line options (``--host``, ``--port``, ``--echo``, ``--no-stdin``)."""
    parser = argparse.ArgumentParser(description="Minimal raw TCP debug server")
    parser.add_argument(
        "--host", default="0.0.0.0", help="Listen host, default: 0.0.0.0"
    )
    parser.add_argument(
        "--port", type=int, default=8081, help="Listen port, default: 8081"
    )
    parser.add_argument(
        "--echo", action="store_true", help="Echo received payload back to client"
    )
    parser.add_argument(
        "--no-stdin",
        action="store_true",
        help="Disable stdin sender thread (receive-only mode)",
    )
    return parser.parse_args()


def main() -> int:
    """Run the server with CLI options; return the process exit code."""
    args = parse_args()
    try:
        return serve(args.host, args.port, args.echo, args.no_stdin)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the tool, so exit cleanly.
        print(f"\n[{ts()}] Stopped by user")
        return 0


if __name__ == "__main__":
    raise SystemExit(main())