From ecf9a840e43c315a6daa325baaf8f6ec906ce3a4 Mon Sep 17 00:00:00 2001 From: user Date: Mon, 16 Feb 2026 19:02:43 +0100 Subject: [PATCH] feat: add control API module Lightweight asyncio HTTP handler for runtime inspection and management. Endpoints: /status, /metrics, /pool, /pool/alive, /config (GET) and /reload, /pool/test, /pool/refresh (POST). Raw HTTP/1.1 parsing, JSON responses, no new dependencies. Co-Authored-By: Claude Opus 4.6 --- src/s5p/api.py | 274 +++++++++++++++++++++++++++++++++++++++++++++ src/s5p/metrics.py | 13 +++ 2 files changed, 287 insertions(+) create mode 100644 src/s5p/api.py diff --git a/src/s5p/api.py b/src/s5p/api.py new file mode 100644 index 0000000..fb0d40e --- /dev/null +++ b/src/s5p/api.py @@ -0,0 +1,274 @@ +"""Built-in HTTP control API for runtime inspection and management.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import time + +from .metrics import Metrics + +logger = logging.getLogger("s5p") + + +# -- HTTP helpers ------------------------------------------------------------ + + +def _parse_request(data: bytes) -> tuple[str, str]: + """Extract method and path from an HTTP/1.1 request line. + + Returns (method, path) or ("", "") on parse failure. + """ + try: + line = data.split(b"\r\n", 1)[0].decode("ascii") + except (UnicodeDecodeError, IndexError): + return "", "" + parts = line.split(None, 2) + if len(parts) < 2: + return "", "" + return parts[0].upper(), parts[1].split("?", 1)[0] + + +def _json_response( + writer: asyncio.StreamWriter, + status: int, + body: dict | list, +) -> None: + """Write an HTTP response with JSON body and close.""" + phrases = {200: "OK", 400: "Bad Request", 404: "Not Found", + 405: "Method Not Allowed", 500: "Internal Server Error"} + payload = json.dumps(body, separators=(",", ":")).encode() + header = ( + f"HTTP/1.1 {status} {phrases.get(status, 'Error')}\r\n" + f"Content-Type: application/json\r\n" + f"Content-Length: {len(payload)}\r\n" + f"Connection: close\r\n" + f"\r\n" + ) + writer.write(header.encode() + payload) + + +# -- route handlers ---------------------------------------------------------- + + +def _handle_status(ctx: dict) -> tuple[int, dict]: + """GET /status -- combined runtime summary.""" + metrics: Metrics = ctx["metrics"] + data = { + "uptime": round(time.monotonic() - metrics.started, 1), + "connections": metrics.connections, + "success": metrics.success, + "failed": metrics.failed, + "active": metrics.active, + "bytes_in": metrics.bytes_in, + "bytes_out": metrics.bytes_out, + } + pool = ctx.get("pool") + if pool: + data["pool"] = {"alive": pool.alive_count, "total": pool.count} + config = ctx.get("config") + if config: + data["chain"] = [str(h) for h in config.chain] + return 200, data + + +def _handle_metrics(ctx: dict) -> tuple[int, dict]: + """GET /metrics -- full metrics counters.""" + return 200, ctx["metrics"].to_dict() + + +def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]: + """GET /pool or /pool/alive -- proxy pool state.""" + pool = ctx.get("pool") + if not pool: + return 200, {"alive": 0, "total": 0, "proxies": {}} + + proxies = {} + for key, entry in pool._proxies.items(): + if alive_only and not entry.alive: + continue + proxies[key] = { + "alive": entry.alive, + "fails": entry.fails, + "tests": entry.tests, + "last_ok": entry.last_ok, + "last_test": entry.last_test, + "last_seen": entry.last_seen, + } + return 200, { + "alive": pool.alive_count, + "total": pool.count, + "proxies": proxies, + } + + +def _handle_config(ctx: dict) -> tuple[int, dict]: + """GET /config -- sanitized runtime config.""" + config = ctx.get("config") + if not config: + return 500, {"error": "config unavailable"} + + data: dict = { + "listen": f"{config.listen_host}:{config.listen_port}", + "timeout": config.timeout, + "retries": config.retries, + "log_level": config.log_level, + "max_connections": config.max_connections, + "pool_size": config.pool_size, + "chain": [str(h) for h in config.chain], + } + if config.proxy_pool: + pp = config.proxy_pool + sources = [] + for src in pp.sources: + s: dict = {} + if src.url: + s["url"] = src.url + if src.file: + s["file"] = src.file + sources.append(s) + data["proxy_pool"] = { + "sources": sources, + "refresh": pp.refresh, + "test_interval": pp.test_interval, + "max_fails": pp.max_fails, + } + return 200, data + + +async def _handle_reload(ctx: dict) -> tuple[int, dict]: + """POST /reload -- re-read config (like SIGHUP).""" + reload_fn = ctx.get("reload_fn") + if not reload_fn: + return 500, {"error": "reload not available"} + try: + await reload_fn() + return 200, {"ok": True} + except Exception as e: + return 500, {"error": str(e)} + + +async def _handle_pool_test(ctx: dict) -> tuple[int, dict]: + """POST /pool/test -- trigger immediate health test.""" + pool = ctx.get("pool") + if not pool: + return 400, {"error": "no proxy pool configured"} + asyncio.create_task(pool._run_health_tests()) + return 200, {"ok": True} + + +async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]: + """POST /pool/refresh -- trigger immediate source re-fetch.""" + pool = ctx.get("pool") + if not pool: + return 400, {"error": "no proxy pool configured"} + asyncio.create_task(pool._fetch_all_sources()) + return 200, {"ok": True} + + +# -- routing ----------------------------------------------------------------- + +_GET_ROUTES: dict[str, str] = { + "/status": "status", + "/metrics": "metrics", + "/pool": "pool", + "/pool/alive": "pool_alive", + "/config": "config", +} + +_POST_ROUTES: dict[str, str] = { + "/reload": "reload", + "/pool/test": "pool_test", + "/pool/refresh": "pool_refresh", +} + + +async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]: + """Dispatch request to the appropriate handler.""" + if method == "GET" and path in _GET_ROUTES: + name = _GET_ROUTES[path] + if name == "status": + return _handle_status(ctx) + if name == "metrics": + return _handle_metrics(ctx) + if name == "pool": + return _handle_pool(ctx) + if name == "pool_alive": + return _handle_pool(ctx, alive_only=True) + if name == "config": + return _handle_config(ctx) + + if method == "POST" and path in _POST_ROUTES: + name = _POST_ROUTES[path] + if name == "reload": + return await _handle_reload(ctx) + if name == "pool_test": + return await _handle_pool_test(ctx) + if name == "pool_refresh": + return await _handle_pool_refresh(ctx) + + # wrong method on a known path + if path in _GET_ROUTES or path in _POST_ROUTES: + expected = "GET" if path in _GET_ROUTES else "POST" + return 405, {"error": f"use {expected} for {path}"} + + return 404, {"error": "not found"} + + +# -- server ------------------------------------------------------------------ + + +async def _handle_connection( + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ctx: dict, +) -> None: + """Handle a single HTTP request on the control API.""" + try: + data = await asyncio.wait_for(reader.read(8192), timeout=5.0) + if not data: + return + method, path = _parse_request(data) + if not method: + _json_response(writer, 400, {"error": "bad request"}) + await writer.drain() + return + + status, body = await _route(method, path, ctx) + _json_response(writer, status, body) + await writer.drain() + except (TimeoutError, ConnectionError, OSError): + pass + except Exception: + logger.debug("api: unexpected error handling request", exc_info=True) + finally: + try: + writer.close() + await writer.wait_closed() + except OSError: + pass + + +async def start_api( + host: str, + port: int, + ctx: dict, +) -> asyncio.Server: + """Start the control API HTTP server. + + Args: + host: Bind address. + port: Bind port. + ctx: Shared context dict with config, metrics, pool, reload_fn. + + Returns: + The running asyncio.Server (caller manages lifecycle). + """ + async def handler(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None: + await _handle_connection(r, w, ctx) + + srv = await asyncio.start_server(handler, host, port) + addrs = ", ".join(str(s.getsockname()) for s in srv.sockets) + logger.info("api: listening on %s", addrs) + return srv diff --git a/src/s5p/metrics.py b/src/s5p/metrics.py index 8c97a85..01ea7c3 100644 --- a/src/s5p/metrics.py +++ b/src/s5p/metrics.py @@ -34,6 +34,19 @@ class Metrics: f"up={h}h{m:02d}m{s:02d}s" ) + def to_dict(self) -> dict: + """Return all counters as a dict (for JSON serialization).""" + return { + "connections": self.connections, + "success": self.success, + "failed": self.failed, + "retries": self.retries, + "active": self.active, + "bytes_in": self.bytes_in, + "bytes_out": self.bytes_out, + "uptime": round(time.monotonic() - self.started, 1), + } + def _human_bytes(n: int) -> str: """Format byte count in human-readable form."""