feat: add control API module

Lightweight asyncio HTTP handler for runtime inspection and management.
Endpoints: /status, /metrics, /pool, /pool/alive, /config (GET) and
/reload, /pool/test, /pool/refresh (POST). Raw HTTP/1.1 parsing, JSON
responses, no new dependencies.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
user
2026-02-16 19:02:43 +01:00
parent 66fc76ceb3
commit ecf9a840e4
2 changed files with 287 additions and 0 deletions

274
src/s5p/api.py Normal file
View File

@@ -0,0 +1,274 @@
"""Built-in HTTP control API for runtime inspection and management."""
from __future__ import annotations
import asyncio
import json
import logging
import time
from .metrics import Metrics
logger = logging.getLogger("s5p")
# -- HTTP helpers ------------------------------------------------------------
def _parse_request(data: bytes) -> tuple[str, str]:
"""Extract method and path from an HTTP/1.1 request line.
Returns (method, path) or ("", "") on parse failure.
"""
try:
line = data.split(b"\r\n", 1)[0].decode("ascii")
except (UnicodeDecodeError, IndexError):
return "", ""
parts = line.split(None, 2)
if len(parts) < 2:
return "", ""
return parts[0].upper(), parts[1].split("?", 1)[0]
def _json_response(
writer: asyncio.StreamWriter,
status: int,
body: dict | list,
) -> None:
"""Write an HTTP response with JSON body and close."""
phrases = {200: "OK", 400: "Bad Request", 404: "Not Found",
405: "Method Not Allowed", 500: "Internal Server Error"}
payload = json.dumps(body, separators=(",", ":")).encode()
header = (
f"HTTP/1.1 {status} {phrases.get(status, 'Error')}\r\n"
f"Content-Type: application/json\r\n"
f"Content-Length: {len(payload)}\r\n"
f"Connection: close\r\n"
f"\r\n"
)
writer.write(header.encode() + payload)
# -- route handlers ----------------------------------------------------------
def _handle_status(ctx: dict) -> tuple[int, dict]:
"""GET /status -- combined runtime summary."""
metrics: Metrics = ctx["metrics"]
data = {
"uptime": round(time.monotonic() - metrics.started, 1),
"connections": metrics.connections,
"success": metrics.success,
"failed": metrics.failed,
"active": metrics.active,
"bytes_in": metrics.bytes_in,
"bytes_out": metrics.bytes_out,
}
pool = ctx.get("pool")
if pool:
data["pool"] = {"alive": pool.alive_count, "total": pool.count}
config = ctx.get("config")
if config:
data["chain"] = [str(h) for h in config.chain]
return 200, data
def _handle_metrics(ctx: dict) -> tuple[int, dict]:
"""GET /metrics -- full metrics counters."""
return 200, ctx["metrics"].to_dict()
def _handle_pool(ctx: dict, alive_only: bool = False) -> tuple[int, dict]:
"""GET /pool or /pool/alive -- proxy pool state."""
pool = ctx.get("pool")
if not pool:
return 200, {"alive": 0, "total": 0, "proxies": {}}
proxies = {}
for key, entry in pool._proxies.items():
if alive_only and not entry.alive:
continue
proxies[key] = {
"alive": entry.alive,
"fails": entry.fails,
"tests": entry.tests,
"last_ok": entry.last_ok,
"last_test": entry.last_test,
"last_seen": entry.last_seen,
}
return 200, {
"alive": pool.alive_count,
"total": pool.count,
"proxies": proxies,
}
def _handle_config(ctx: dict) -> tuple[int, dict]:
"""GET /config -- sanitized runtime config."""
config = ctx.get("config")
if not config:
return 500, {"error": "config unavailable"}
data: dict = {
"listen": f"{config.listen_host}:{config.listen_port}",
"timeout": config.timeout,
"retries": config.retries,
"log_level": config.log_level,
"max_connections": config.max_connections,
"pool_size": config.pool_size,
"chain": [str(h) for h in config.chain],
}
if config.proxy_pool:
pp = config.proxy_pool
sources = []
for src in pp.sources:
s: dict = {}
if src.url:
s["url"] = src.url
if src.file:
s["file"] = src.file
sources.append(s)
data["proxy_pool"] = {
"sources": sources,
"refresh": pp.refresh,
"test_interval": pp.test_interval,
"max_fails": pp.max_fails,
}
return 200, data
async def _handle_reload(ctx: dict) -> tuple[int, dict]:
"""POST /reload -- re-read config (like SIGHUP)."""
reload_fn = ctx.get("reload_fn")
if not reload_fn:
return 500, {"error": "reload not available"}
try:
await reload_fn()
return 200, {"ok": True}
except Exception as e:
return 500, {"error": str(e)}
async def _handle_pool_test(ctx: dict) -> tuple[int, dict]:
"""POST /pool/test -- trigger immediate health test."""
pool = ctx.get("pool")
if not pool:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._run_health_tests())
return 200, {"ok": True}
async def _handle_pool_refresh(ctx: dict) -> tuple[int, dict]:
"""POST /pool/refresh -- trigger immediate source re-fetch."""
pool = ctx.get("pool")
if not pool:
return 400, {"error": "no proxy pool configured"}
asyncio.create_task(pool._fetch_all_sources())
return 200, {"ok": True}
# -- routing -----------------------------------------------------------------
# Route tables map URL path -> internal handler name.  GET and POST are kept
# in separate tables so the dispatcher can answer 405 (wrong verb on a known
# path) instead of a misleading 404.
_GET_ROUTES: dict[str, str] = {
    "/status": "status",
    "/metrics": "metrics",
    "/pool": "pool",
    "/pool/alive": "pool_alive",
    "/config": "config",
}
# Mutating endpoints; the handlers behind these names are coroutines.
_POST_ROUTES: dict[str, str] = {
    "/reload": "reload",
    "/pool/test": "pool_test",
    "/pool/refresh": "pool_refresh",
}
async def _route(method: str, path: str, ctx: dict) -> tuple[int, dict]:
    """Dispatch request to the appropriate handler.

    Returns the (status, body) pair produced by the matched handler; 405
    when the path exists under the other HTTP method, 404 otherwise.
    """
    if method == "GET" and path in _GET_ROUTES:
        get_handlers = {
            "status": _handle_status,
            "metrics": _handle_metrics,
            "pool": _handle_pool,
            "pool_alive": lambda c: _handle_pool(c, alive_only=True),
            "config": _handle_config,
        }
        return get_handlers[_GET_ROUTES[path]](ctx)
    if method == "POST" and path in _POST_ROUTES:
        post_handlers = {
            "reload": _handle_reload,
            "pool_test": _handle_pool_test,
            "pool_refresh": _handle_pool_refresh,
        }
        return await post_handlers[_POST_ROUTES[path]](ctx)
    # Known path reached with the wrong verb -> 405 with a hint.
    if path in _GET_ROUTES or path in _POST_ROUTES:
        expected = "GET" if path in _GET_ROUTES else "POST"
        return 405, {"error": f"use {expected} for {path}"}
    return 404, {"error": "not found"}
# -- server ------------------------------------------------------------------
async def _handle_connection(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    ctx: dict,
) -> None:
    """Handle a single HTTP request on the control API.

    Reads at most one 8 KiB chunk (ample for the request line of every
    supported endpoint), dispatches it through _route(), writes a JSON
    response, and always closes the connection afterwards.
    """
    try:
        data = await asyncio.wait_for(reader.read(8192), timeout=5.0)
        if not data:
            return  # client disconnected without sending anything
        method, path = _parse_request(data)
        if not method:
            _json_response(writer, 400, {"error": "bad request"})
            await writer.drain()
            return
        status, body = await _route(method, path, ctx)
        _json_response(writer, status, body)
        await writer.drain()
    # asyncio.TimeoutError is an alias of builtin TimeoutError on 3.11+,
    # but a distinct class on older interpreters; catching it explicitly
    # makes a slow client drop silently there too, instead of being logged
    # below as "unexpected".
    except (asyncio.TimeoutError, TimeoutError, ConnectionError, OSError):
        pass
    except Exception:
        logger.debug("api: unexpected error handling request", exc_info=True)
    finally:
        try:
            writer.close()
            await writer.wait_closed()
        except OSError:
            pass
async def start_api(
    host: str,
    port: int,
    ctx: dict,
) -> asyncio.Server:
    """Start the control API HTTP server.

    Args:
        host: Bind address.
        port: Bind port.
        ctx: Shared context dict with config, metrics, pool, reload_fn.

    Returns:
        The running asyncio.Server (caller manages lifecycle).
    """
    # Closure binds the shared context into the per-connection callback.
    async def _on_connect(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        await _handle_connection(reader, writer, ctx)

    server = await asyncio.start_server(_on_connect, host, port)
    bound = ", ".join(str(sock.getsockname()) for sock in server.sockets)
    logger.info("api: listening on %s", bound)
    return server

View File

@@ -34,6 +34,19 @@ class Metrics:
f"up={h}h{m:02d}m{s:02d}s"
)
def to_dict(self) -> dict:
    """Return all counters as a dict (for JSON serialization)."""
    counter_names = (
        "connections",
        "success",
        "failed",
        "retries",
        "active",
        "bytes_in",
        "bytes_out",
    )
    snapshot = {name: getattr(self, name) for name in counter_names}
    # Uptime is derived from the monotonic start timestamp, not stored.
    snapshot["uptime"] = round(time.monotonic() - self.started, 1)
    return snapshot
def _human_bytes(n: int) -> str:
"""Format byte count in human-readable form."""