feat: connection rate and chain latency metrics
Add RateTracker (rolling deque, events/sec) and LatencyTracker (circular buffer, p50/p95/p99 in ms) to the Metrics class. Both are recorded in _handle_client and exposed via summary(), to_dict(), /status, and /metrics. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -25,7 +25,7 @@
|
||||
- [x] Built-in control API (runtime metrics, pool state, config reload)
|
||||
- [ ] SOCKS5 server authentication (username/password)
|
||||
- [x] Tor control port integration (circuit renewal via NEWNYM)
|
||||
- [ ] Metrics (connections/sec, bytes relayed, hop latency)
|
||||
- [x] Metrics (connections/sec, bytes relayed, hop latency)
|
||||
|
||||
## v0.3.0
|
||||
|
||||
|
||||
1
TASKS.md
1
TASKS.md
@@ -48,6 +48,7 @@
|
||||
- [x] Replace HTTP health check with TLS handshake (round-robin targets, no httpbin dependency)
|
||||
|
||||
- [x] Multi-listener with configurable proxy chaining (per-port chain depth)
|
||||
- [x] Connection rate and chain latency metrics (rate/s, p50/p95/p99)
|
||||
|
||||
## Next
|
||||
- [ ] Integration tests with mock proxy server
|
||||
|
||||
@@ -164,7 +164,22 @@ python -m pstats ~/.cache/s5p/s5p.prof # container profile output
|
||||
## Metrics Log
|
||||
|
||||
```
|
||||
metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s pool=42/65
|
||||
metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4.72/s p50=198.3ms p95=890.1ms up=1h01m01s pool=42/65
|
||||
```
|
||||
|
||||
## Metrics JSON (`/metrics`)
|
||||
|
||||
```bash
|
||||
curl -s http://127.0.0.1:1081/metrics | jq .
|
||||
```
|
||||
|
||||
```json
|
||||
{
|
||||
"connections": 1842,
|
||||
"success": 1790,
|
||||
"rate": 4.72,
|
||||
"latency": {"count": 1000, "min": 45.2, "max": 2841.7, "avg": 312.4, "p50": 198.3, "p95": 890.1, "p99": 1523.6}
|
||||
}
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
@@ -307,7 +307,7 @@ s5p --api 127.0.0.1:1081 -c config/s5p.yaml
|
||||
| Method | Path | Description |
|
||||
|--------|------|-------------|
|
||||
| `GET` | `/status` | Combined summary: uptime, metrics, pool stats, chain |
|
||||
| `GET` | `/metrics` | Full metrics counters (connections, bytes, uptime) |
|
||||
| `GET` | `/metrics` | Full metrics counters (connections, bytes, rate, latency) |
|
||||
| `GET` | `/pool` | All proxies with per-entry state |
|
||||
| `GET` | `/pool/alive` | Alive proxies only |
|
||||
| `GET` | `/config` | Current runtime config (sanitized) |
|
||||
@@ -461,7 +461,7 @@ s5p tracks connection metrics and logs a summary every 60 seconds and on
|
||||
shutdown:
|
||||
|
||||
```
|
||||
metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s pool=42/65
|
||||
metrics: conn=1842 ok=1790 fail=52 retries=67 active=3 in=50.0M out=1.0G rate=4.72/s p50=198.3ms p95=890.1ms up=1h01m01s pool=42/65
|
||||
```
|
||||
|
||||
| Counter | Meaning |
|
||||
@@ -473,9 +473,48 @@ metrics: conn=142 ok=98 fail=44 retries=88 active=3 in=1.2M out=4.5M up=0h05m12s
|
||||
| `active` | Currently relaying |
|
||||
| `in` | Bytes client -> remote |
|
||||
| `out` | Bytes remote -> client |
|
||||
| `rate` | Connection rate (events/sec, rolling window) |
|
||||
| `p50` | Median chain setup latency in ms |
|
||||
| `p95` | 95th percentile chain setup latency in ms |
|
||||
| `up` | Server uptime |
|
||||
| `pool` | Alive/total proxies (only when pool is active) |
|
||||
|
||||
### `/metrics` JSON response
|
||||
|
||||
`GET /metrics` returns all counters plus rate and latency percentiles:
|
||||
|
||||
```json
|
||||
{
|
||||
"connections": 1842,
|
||||
"success": 1790,
|
||||
"failed": 52,
|
||||
"retries": 67,
|
||||
"active": 3,
|
||||
"bytes_in": 52428800,
|
||||
"bytes_out": 1073741824,
|
||||
"uptime": 3661.2,
|
||||
"rate": 4.72,
|
||||
"latency": {
|
||||
"count": 1000,
|
||||
"min": 45.2,
|
||||
"max": 2841.7,
|
||||
"avg": 312.4,
|
||||
"p50": 198.3,
|
||||
"p95": 890.1,
|
||||
"p99": 1523.6
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
| Field | Type | Description |
|
||||
|-------|------|-------------|
|
||||
| `rate` | float | Connections/sec (rolling window of last 256 events) |
|
||||
| `latency` | object/null | Chain setup latency stats in ms (null if no samples) |
|
||||
| `latency.count` | int | Number of samples in buffer (max 1000) |
|
||||
| `latency.p50` | float | Median latency (ms) |
|
||||
| `latency.p95` | float | 95th percentile (ms) |
|
||||
| `latency.p99` | float | 99th percentile (ms) |
|
||||
|
||||
## Profiling
|
||||
|
||||
```bash
|
||||
|
||||
@@ -63,6 +63,8 @@ def _handle_status(ctx: dict) -> tuple[int, dict]:
|
||||
"active": metrics.active,
|
||||
"bytes_in": metrics.bytes_in,
|
||||
"bytes_out": metrics.bytes_out,
|
||||
"rate": round(metrics.conn_rate.rate(), 2),
|
||||
"latency": metrics.latency.stats(),
|
||||
}
|
||||
pool = ctx.get("pool")
|
||||
if pool:
|
||||
|
||||
@@ -3,6 +3,69 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
|
||||
class RateTracker:
    """Rolling-window event rate in events per second.

    Keeps at most ``maxlen`` monotonic timestamps in a deque; the rate is
    derived lazily on each :meth:`rate` call as ``(n - 1) / span`` across
    the stored window, so no background timer is needed.
    """

    def __init__(self, maxlen: int = 256) -> None:
        # Oldest timestamps fall off automatically once maxlen is reached.
        self._stamps: deque[float] = deque(maxlen=maxlen)

    def record(self, now: float | None = None) -> None:
        """Append one event timestamp (*now*, or ``time.monotonic()``)."""
        if now is None:
            now = time.monotonic()
        self._stamps.append(now)

    def rate(self) -> float:
        """Events/sec over the current window; 0.0 with fewer than 2 events."""
        stamps = self._stamps
        if len(stamps) < 2:
            return 0.0
        span = stamps[-1] - stamps[0]
        # Identical timestamps (clock granularity) would divide by zero.
        return (len(stamps) - 1) / span if span > 0 else 0.0
|
||||
|
||||
|
||||
class LatencyTracker:
    """Bounded buffer of latency samples with percentile statistics.

    Holds at most ``maxlen`` float-second samples. :meth:`stats` sorts a
    copy on demand (roughly 0.1 ms for 1000 floats) and reports every
    value in milliseconds.
    """

    def __init__(self, maxlen: int = 1000) -> None:
        # deque(maxlen=...) gives us the circular-buffer eviction for free.
        self._buf: deque[float] = deque(maxlen=maxlen)

    def record(self, seconds: float) -> None:
        """Store one latency sample, given in seconds."""
        self._buf.append(seconds)

    @property
    def count(self) -> int:
        """How many samples are currently buffered."""
        return len(self._buf)

    def stats(self) -> dict | None:
        """Return ``{count, min, max, avg, p50, p95, p99}`` in ms, or None."""
        n = len(self._buf)
        if not n:
            return None
        # Convert to ms first; scaling by a positive constant preserves order.
        ms = sorted(v * 1000 for v in self._buf)

        def pick(q: float) -> float:
            # Nearest-rank index, clamped so q close to 1.0 stays in range.
            return round(ms[min(int(n * q), n - 1)], 1)

        return {
            "count": n,
            "min": round(ms[0], 1),
            "max": round(ms[-1], 1),
            "avg": round(sum(ms) / n, 1),
            "p50": pick(0.50),
            "p95": pick(0.95),
            "p99": pick(0.99),
        }
|
||||
|
||||
|
||||
class Metrics:
|
||||
@@ -21,16 +84,23 @@ class Metrics:
|
||||
self.bytes_out: int = 0
|
||||
self.active: int = 0
|
||||
self.started: float = time.monotonic()
|
||||
self.conn_rate: RateTracker = RateTracker()
|
||||
self.latency: LatencyTracker = LatencyTracker()
|
||||
|
||||
def summary(self) -> str:
    """Render all counters as a single log-friendly line."""
    elapsed = int(time.monotonic() - self.started)
    hours, rem = divmod(elapsed, 3600)
    mins, secs = divmod(rem, 60)
    lat = self.latency.stats()
    # Latency percentiles only appear once at least one sample exists.
    pcts = f" p50={lat['p50']:.1f}ms p95={lat['p95']:.1f}ms" if lat else ""
    parts = [
        f"conn={self.connections} ok={self.success} fail={self.failed}",
        f"retries={self.retries} active={self.active}",
        f"in={_human_bytes(self.bytes_in)} out={_human_bytes(self.bytes_out)}",
        f"rate={self.conn_rate.rate():.2f}/s{pcts}",
        f"up={hours}h{mins:02d}m{secs:02d}s",
    ]
    return " ".join(parts)
|
||||
|
||||
@@ -45,6 +115,8 @@ class Metrics:
|
||||
"bytes_in": self.bytes_in,
|
||||
"bytes_out": self.bytes_out,
|
||||
"uptime": round(time.monotonic() - self.started, 1),
|
||||
"rate": round(self.conn_rate.rate(), 2),
|
||||
"latency": self.latency.stats(),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -73,6 +73,7 @@ async def _handle_client(
|
||||
|
||||
if metrics:
|
||||
metrics.connections += 1
|
||||
metrics.conn_rate.record()
|
||||
|
||||
try:
|
||||
# -- greeting --
|
||||
@@ -126,6 +127,8 @@ async def _handle_client(
|
||||
)
|
||||
dt = time.monotonic() - t0
|
||||
logger.debug("[%s] chain up in %.0fms", tag, dt * 1000)
|
||||
if metrics:
|
||||
metrics.latency.record(dt)
|
||||
break
|
||||
except (ProtoError, TimeoutError, ConnectionError, OSError) as e:
|
||||
last_err = e
|
||||
|
||||
@@ -109,6 +109,8 @@ class TestHandleStatus:
|
||||
assert body["connections"] == 10
|
||||
assert body["success"] == 8
|
||||
assert "uptime" in body
|
||||
assert "rate" in body
|
||||
assert "latency" in body
|
||||
|
||||
def test_with_pool(self):
|
||||
pool = MagicMock()
|
||||
@@ -152,6 +154,8 @@ class TestHandleMetrics:
|
||||
assert body["connections"] == 42
|
||||
assert body["bytes_in"] == 1024
|
||||
assert "uptime" in body
|
||||
assert "rate" in body
|
||||
assert "latency" in body
|
||||
|
||||
|
||||
class TestHandlePool:
|
||||
|
||||
164
tests/test_metrics.py
Normal file
164
tests/test_metrics.py
Normal file
@@ -0,0 +1,164 @@
|
||||
"""Tests for metrics trackers and helpers."""
|
||||
|
||||
from s5p.metrics import LatencyTracker, Metrics, RateTracker, _human_bytes
|
||||
|
||||
# -- LatencyTracker ----------------------------------------------------------
|
||||
|
||||
|
||||
class TestLatencyTracker:
    """Latency sample collection and percentile statistics."""

    def test_empty(self):
        tracker = LatencyTracker()
        assert tracker.count == 0
        assert tracker.stats() is None

    def test_single(self):
        tracker = LatencyTracker()
        tracker.record(0.1)
        stats = tracker.stats()
        assert stats is not None
        assert stats["count"] == 1
        # With one sample every statistic collapses to the same value.
        assert stats["min"] == stats["max"] == stats["avg"] == stats["p50"]

    def test_percentiles(self):
        tracker = LatencyTracker()
        # 100 evenly spaced samples: 1 ms, 2 ms, ..., 100 ms.
        for ms in range(1, 101):
            tracker.record(ms / 1000)
        stats = tracker.stats()
        assert stats["count"] == 100
        assert stats["min"] == 1.0
        assert stats["max"] == 100.0
        assert 49.0 <= stats["avg"] <= 52.0
        assert 50.0 <= stats["p50"] <= 52.0
        assert 95.0 <= stats["p95"] <= 97.0
        assert 99.0 <= stats["p99"] <= 101.0

    def test_bounded(self):
        tracker = LatencyTracker(maxlen=5)
        for i in range(10):
            tracker.record(i / 100)
        assert tracker.count == 5
        stats = tracker.stats()
        # Only the newest 5 samples survive: 0.05 .. 0.09 seconds.
        assert stats["min"] == 50.0
        assert stats["max"] == 90.0

    def test_milliseconds(self):
        tracker = LatencyTracker()
        tracker.record(0.5)  # half a second -> 500 ms
        stats = tracker.stats()
        assert stats["min"] == 500.0
        assert stats["max"] == 500.0
|
||||
|
||||
|
||||
# -- RateTracker -------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRateTracker:
    """Rolling-window event rate computation."""

    def test_empty(self):
        tracker = RateTracker()
        assert tracker.rate() == 0.0

    def test_single(self):
        tracker = RateTracker()
        tracker.record()
        # One event defines no interval, so the rate stays zero.
        assert tracker.rate() == 0.0

    def test_known_rate(self):
        tracker = RateTracker()
        # 11 events spaced 0.1 s apart: 10 intervals over 1.0 s -> 10.0/s.
        for step in range(11):
            tracker.record(now=100.0 + step * 0.1)
        assert abs(tracker.rate() - 10.0) < 0.01

    def test_bounded(self):
        tracker = RateTracker(maxlen=5)
        for t in range(10):
            tracker.record(now=float(t))
        # Window keeps t=5..9: 5 events, 4 intervals over 4 s -> 1.0/s.
        assert abs(tracker.rate() - 1.0) < 0.01

    def test_zero_span(self):
        tracker = RateTracker()
        tracker.record(now=1.0)
        tracker.record(now=1.0)
        # Identical timestamps must not divide by zero.
        assert tracker.rate() == 0.0
|
||||
|
||||
|
||||
# -- Metrics -----------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMetrics:
    """Metrics aggregation and serialized output."""

    def test_to_dict_includes_rate_and_latency(self):
        metrics = Metrics()
        metrics.connections = 10
        metrics.conn_rate.record(now=0.0)
        metrics.conn_rate.record(now=1.0)
        metrics.latency.record(0.2)
        metrics.latency.record(0.3)
        payload = metrics.to_dict()
        assert "rate" in payload
        assert isinstance(payload["rate"], float)
        assert "latency" in payload
        assert payload["latency"] is not None
        assert payload["latency"]["count"] == 2

    def test_to_dict_latency_none_when_empty(self):
        payload = Metrics().to_dict()
        # No samples -> latency is explicitly null, rate is zero.
        assert payload["latency"] is None
        assert payload["rate"] == 0.0

    def test_summary_includes_rate(self):
        metrics = Metrics()
        metrics.conn_rate.record(now=0.0)
        metrics.conn_rate.record(now=1.0)
        line = metrics.summary()
        assert "rate=" in line
        assert "/s" in line

    def test_summary_includes_latency(self):
        metrics = Metrics()
        metrics.latency.record(0.2)
        line = metrics.summary()
        assert "p50=" in line
        assert "p95=" in line

    def test_summary_no_latency_when_empty(self):
        line = Metrics().summary()
        assert "p50=" not in line
        assert "p95=" not in line
|
||||
|
||||
|
||||
# -- _human_bytes ------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHumanBytes:
    """Human-readable byte-count formatting."""

    def test_bytes(self):
        assert _human_bytes(0) == "0B"
        assert _human_bytes(512) == "512B"

    def test_kilobytes(self):
        assert _human_bytes(1024) == "1.0K"
        assert _human_bytes(1536) == "1.5K"

    def test_megabytes(self):
        assert _human_bytes(1024 * 1024) == "1.0M"

    def test_gigabytes(self):
        assert _human_bytes(1024**3) == "1.0G"

    def test_terabytes(self):
        assert _human_bytes(1024**4) == "1.0T"

    def test_petabytes(self):
        assert _human_bytes(1024**5) == "1.0P"
|
||||
Reference in New Issue
Block a user