"""Tests for metrics trackers and helpers.""" from s5p.metrics import LatencyTracker, Metrics, RateTracker, _human_bytes # -- LatencyTracker ---------------------------------------------------------- class TestLatencyTracker: """Test latency sample collection and percentile stats.""" def test_empty(self): lt = LatencyTracker() assert lt.count == 0 assert lt.stats() is None def test_single(self): lt = LatencyTracker() lt.record(0.1) s = lt.stats() assert s is not None assert s["count"] == 1 assert s["min"] == s["max"] == s["avg"] == s["p50"] def test_percentiles(self): lt = LatencyTracker() # 100 evenly spaced samples: 0.001, 0.002, ..., 0.100 for i in range(1, 101): lt.record(i / 1000) s = lt.stats() assert s["count"] == 100 assert s["min"] == 1.0 # 0.001s = 1.0ms assert s["max"] == 100.0 # 0.100s = 100.0ms assert 49.0 <= s["avg"] <= 52.0 assert 50.0 <= s["p50"] <= 52.0 assert 95.0 <= s["p95"] <= 97.0 assert 99.0 <= s["p99"] <= 101.0 def test_bounded(self): lt = LatencyTracker(maxlen=5) for i in range(10): lt.record(i / 100) assert lt.count == 5 s = lt.stats() # only the last 5 samples remain: 0.05..0.09 assert s["min"] == 50.0 assert s["max"] == 90.0 def test_milliseconds(self): lt = LatencyTracker() lt.record(0.5) # 500ms s = lt.stats() assert s["min"] == 500.0 assert s["max"] == 500.0 # -- RateTracker ------------------------------------------------------------- class TestRateTracker: """Test rolling window event rate calculation.""" def test_empty(self): rt = RateTracker() assert rt.rate() == 0.0 def test_single(self): rt = RateTracker() rt.record() assert rt.rate() == 0.0 def test_known_rate(self): rt = RateTracker() # 11 events at 0.1s intervals = 10/1.0 = 10.0/s for i in range(11): rt.record(now=100.0 + i * 0.1) assert abs(rt.rate() - 10.0) < 0.01 def test_bounded(self): rt = RateTracker(maxlen=5) for i in range(10): rt.record(now=float(i)) # only last 5 events: t=5..9, span=4, rate=4/4=1.0 assert abs(rt.rate() - 1.0) < 0.01 def test_zero_span(self): rt = RateTracker() rt.record(now=1.0) rt.record(now=1.0) assert rt.rate() == 0.0 # -- Metrics ----------------------------------------------------------------- class TestMetrics: """Test Metrics aggregation and output.""" def test_to_dict_includes_rate_and_latency(self): m = Metrics() m.connections = 10 m.conn_rate.record(now=0.0) m.conn_rate.record(now=1.0) m.latency.record(0.2) m.latency.record(0.3) d = m.to_dict() assert "rate" in d assert isinstance(d["rate"], float) assert "latency" in d assert d["latency"] is not None assert d["latency"]["count"] == 2 def test_to_dict_latency_none_when_empty(self): m = Metrics() d = m.to_dict() assert d["latency"] is None assert d["rate"] == 0.0 def test_summary_includes_rate(self): m = Metrics() m.conn_rate.record(now=0.0) m.conn_rate.record(now=1.0) s = m.summary() assert "rate=" in s assert "/s" in s def test_summary_includes_latency(self): m = Metrics() m.latency.record(0.2) s = m.summary() assert "p50=" in s assert "p95=" in s def test_summary_no_latency_when_empty(self): m = Metrics() s = m.summary() assert "p50=" not in s assert "p95=" not in s def test_listener_latency(self): m = Metrics() m.get_listener_latency("0.0.0.0:1080").record(0.5) m.get_listener_latency("0.0.0.0:1080").record(0.6) m.get_listener_latency("0.0.0.0:1081").record(0.1) d = m.to_dict() assert "listener_latency" in d assert "0.0.0.0:1080" in d["listener_latency"] assert "0.0.0.0:1081" in d["listener_latency"] assert d["listener_latency"]["0.0.0.0:1080"]["count"] == 2 assert d["listener_latency"]["0.0.0.0:1081"]["count"] == 1 def test_listener_latency_empty(self): m = Metrics() d = m.to_dict() assert d["listener_latency"] == {} # -- _human_bytes ------------------------------------------------------------ class TestHumanBytes: """Test byte count formatting.""" def test_bytes(self): assert _human_bytes(0) == "0B" assert _human_bytes(512) == "512B" def test_kilobytes(self): assert _human_bytes(1024) == "1.0K" assert _human_bytes(1536) == "1.5K" def test_megabytes(self): assert _human_bytes(1024 * 1024) == "1.0M" def test_gigabytes(self): assert _human_bytes(1024**3) == "1.0G" def test_terabytes(self): assert _human_bytes(1024**4) == "1.0T" def test_petabytes(self): assert _human_bytes(1024**5) == "1.0P"