# quantiles() returns {count, sum, 0.5, 0.95, 0.99} in seconds for Prometheus
# summary exposition. Companion to the existing stats() (milliseconds).
"""Tests for metrics trackers and helpers."""
from s5p.metrics import LatencyTracker, Metrics, RateTracker, _human_bytes


# -- LatencyTracker ----------------------------------------------------------


class TestLatencyTracker:
    """Test latency sample collection and percentile stats."""

    def test_empty(self):
        tracker = LatencyTracker()
        assert tracker.count == 0
        assert tracker.stats() is None

    def test_single(self):
        tracker = LatencyTracker()
        tracker.record(0.1)
        result = tracker.stats()
        assert result is not None
        assert result["count"] == 1
        # A lone sample collapses every statistic to the same value.
        assert result["min"] == result["max"] == result["avg"] == result["p50"]

    def test_percentiles(self):
        tracker = LatencyTracker()
        # 100 evenly spaced samples: 0.001, 0.002, ..., 0.100
        for sample_idx in range(1, 101):
            tracker.record(sample_idx / 1000)
        result = tracker.stats()
        assert result["count"] == 100
        assert result["min"] == 1.0  # 0.001s = 1.0ms
        assert result["max"] == 100.0  # 0.100s = 100.0ms
        assert 49.0 <= result["avg"] <= 52.0
        assert 50.0 <= result["p50"] <= 52.0
        assert 95.0 <= result["p95"] <= 97.0
        assert 99.0 <= result["p99"] <= 101.0

    def test_bounded(self):
        tracker = LatencyTracker(maxlen=5)
        for sample_idx in range(10):
            tracker.record(sample_idx / 100)
        assert tracker.count == 5
        result = tracker.stats()
        # only the last 5 samples remain: 0.05..0.09
        assert result["min"] == 50.0
        assert result["max"] == 90.0

    def test_milliseconds(self):
        tracker = LatencyTracker()
        tracker.record(0.5)  # 500ms
        result = tracker.stats()
        assert result["min"] == 500.0
        assert result["max"] == 500.0

    def test_quantiles_empty(self):
        tracker = LatencyTracker()
        assert tracker.quantiles() is None

    def test_quantiles_seconds(self):
        tracker = LatencyTracker()
        for sample_idx in range(1, 101):
            tracker.record(sample_idx / 1000)
        quants = tracker.quantiles()
        assert quants is not None
        assert quants["count"] == 100
        # quantiles() reports in seconds, unlike stats() (milliseconds).
        assert 0.050 <= quants["0.5"] <= 0.052
        assert 0.095 <= quants["0.95"] <= 0.097
        assert 0.099 <= quants["0.99"] <= 0.101
        assert "sum" in quants
        assert quants["sum"] > 0


# -- RateTracker -------------------------------------------------------------


class TestRateTracker:
    """Test rolling window event rate calculation."""

    def test_empty(self):
        tracker = RateTracker()
        assert tracker.rate() == 0.0

    def test_single(self):
        tracker = RateTracker()
        tracker.record()
        # A single event gives no interval to measure over.
        assert tracker.rate() == 0.0

    def test_known_rate(self):
        tracker = RateTracker()
        # 11 events at 0.1s intervals = 10/1.0 = 10.0/s
        for tick in range(11):
            tracker.record(now=100.0 + tick * 0.1)
        assert abs(tracker.rate() - 10.0) < 0.01

    def test_bounded(self):
        tracker = RateTracker(maxlen=5)
        for tick in range(10):
            tracker.record(now=float(tick))
        # only last 5 events: t=5..9, span=4, rate=4/4=1.0
        assert abs(tracker.rate() - 1.0) < 0.01

    def test_zero_span(self):
        tracker = RateTracker()
        tracker.record(now=1.0)
        tracker.record(now=1.0)
        # Identical timestamps: a zero span must not divide by zero.
        assert tracker.rate() == 0.0


# -- Metrics -----------------------------------------------------------------


class TestMetrics:
    """Test Metrics aggregation and output."""

    def test_to_dict_includes_rate_and_latency(self):
        metrics = Metrics()
        metrics.connections = 10
        metrics.conn_rate.record(now=0.0)
        metrics.conn_rate.record(now=1.0)
        metrics.latency.record(0.2)
        metrics.latency.record(0.3)
        payload = metrics.to_dict()
        assert "rate" in payload
        assert isinstance(payload["rate"], float)
        assert "latency" in payload
        assert payload["latency"] is not None
        assert payload["latency"]["count"] == 2

    def test_to_dict_latency_none_when_empty(self):
        payload = Metrics().to_dict()
        assert payload["latency"] is None
        assert payload["rate"] == 0.0

    def test_summary_includes_rate(self):
        metrics = Metrics()
        metrics.conn_rate.record(now=0.0)
        metrics.conn_rate.record(now=1.0)
        text = metrics.summary()
        assert "rate=" in text
        assert "/s" in text

    def test_summary_includes_latency(self):
        metrics = Metrics()
        metrics.latency.record(0.2)
        text = metrics.summary()
        assert "p50=" in text
        assert "p95=" in text

    def test_summary_no_latency_when_empty(self):
        text = Metrics().summary()
        assert "p50=" not in text
        assert "p95=" not in text

    def test_listener_latency(self):
        metrics = Metrics()
        metrics.get_listener_latency("0.0.0.0:1080").record(0.5)
        metrics.get_listener_latency("0.0.0.0:1080").record(0.6)
        metrics.get_listener_latency("0.0.0.0:1081").record(0.1)
        payload = metrics.to_dict()
        assert "listener_latency" in payload
        per_listener = payload["listener_latency"]
        assert "0.0.0.0:1080" in per_listener
        assert "0.0.0.0:1081" in per_listener
        assert per_listener["0.0.0.0:1080"]["count"] == 2
        assert per_listener["0.0.0.0:1081"]["count"] == 1

    def test_listener_latency_empty(self):
        payload = Metrics().to_dict()
        assert payload["listener_latency"] == {}


# -- _human_bytes ------------------------------------------------------------


class TestHumanBytes:
    """Test byte count formatting."""

    def test_bytes(self):
        # Below one KiB the value is rendered as a plain byte count.
        assert _human_bytes(0) == "0B"
        assert _human_bytes(512) == "512B"

    def test_kilobytes(self):
        kib = 1024
        assert _human_bytes(kib) == "1.0K"
        assert _human_bytes(kib + kib // 2) == "1.5K"

    def test_megabytes(self):
        assert _human_bytes(1024**2) == "1.0M"

    def test_gigabytes(self):
        assert _human_bytes(1024**3) == "1.0G"

    def test_terabytes(self):
        assert _human_bytes(1024**4) == "1.0T"

    def test_petabytes(self):
        assert _human_bytes(1024**5) == "1.0P"