test: add ReconnectManager unit tests
Covers success, retry, non-retryable abort, max-retry exhaustion, cancel from another thread, and backoff formula.
This commit is contained in:
184
tests/test_reconnect.py
Normal file
184
tests/test_reconnect.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Tests for ReconnectManager."""
|
||||
|
||||
import threading
|
||||
|
||||
from tuimble.reconnect import INITIAL_DELAY, MAX_RETRIES, ReconnectManager
|
||||
|
||||
|
||||
def _make_manager(connect_fn=None, **overrides):
|
||||
"""Build a ReconnectManager with recording callbacks."""
|
||||
log = {"attempts": [], "failures": [], "success": 0, "exhausted": 0}
|
||||
|
||||
def on_attempt(n, delay):
|
||||
log["attempts"].append((n, delay))
|
||||
|
||||
def on_success():
|
||||
log["success"] += 1
|
||||
|
||||
def on_failure(n, msg):
|
||||
log["failures"].append((n, msg))
|
||||
|
||||
def on_exhausted():
|
||||
log["exhausted"] += 1
|
||||
|
||||
if connect_fn is None:
|
||||
connect_fn = lambda: None
|
||||
|
||||
mgr = ReconnectManager(
|
||||
connect_fn=connect_fn,
|
||||
on_attempt=overrides.get("on_attempt", on_attempt),
|
||||
on_success=overrides.get("on_success", on_success),
|
||||
on_failure=overrides.get("on_failure", on_failure),
|
||||
on_exhausted=overrides.get("on_exhausted", on_exhausted),
|
||||
)
|
||||
return mgr, log
|
||||
|
||||
|
||||
# -- basic lifecycle ----------------------------------------------------------
|
||||
|
||||
|
||||
def test_initial_state():
|
||||
mgr, _ = _make_manager()
|
||||
assert mgr.active is False
|
||||
assert mgr.attempt == 0
|
||||
|
||||
|
||||
def test_success_on_first_attempt():
|
||||
"""Connect succeeds immediately -- one attempt, no failures."""
|
||||
mgr, log = _make_manager(connect_fn=lambda: None)
|
||||
mgr.run()
|
||||
assert log["success"] == 1
|
||||
assert log["exhausted"] == 0
|
||||
assert len(log["attempts"]) == 1
|
||||
assert len(log["failures"]) == 0
|
||||
assert mgr.active is False
|
||||
|
||||
|
||||
def test_success_after_failures():
|
||||
"""Connect fails twice, then succeeds on third attempt."""
|
||||
call_count = 0
|
||||
|
||||
def flaky_connect():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count < 3:
|
||||
raise ConnectionError("down")
|
||||
|
||||
mgr, log = _make_manager(connect_fn=flaky_connect)
|
||||
# Patch delay to zero for test speed
|
||||
import tuimble.reconnect as mod
|
||||
orig = mod.INITIAL_DELAY
|
||||
mod.INITIAL_DELAY = 0
|
||||
try:
|
||||
mgr.run()
|
||||
finally:
|
||||
mod.INITIAL_DELAY = orig
|
||||
|
||||
assert log["success"] == 1
|
||||
assert len(log["failures"]) == 2
|
||||
assert len(log["attempts"]) == 3
|
||||
|
||||
|
||||
# -- non-retryable -----------------------------------------------------------
|
||||
|
||||
|
||||
def test_non_retryable_aborts_immediately():
|
||||
"""Exception with retryable=False stops the loop."""
|
||||
|
||||
class Rejected(Exception):
|
||||
retryable = False
|
||||
|
||||
mgr, log = _make_manager(connect_fn=lambda: (_ for _ in ()).throw(Rejected("banned")))
|
||||
mgr.run()
|
||||
assert log["exhausted"] == 1
|
||||
assert log["success"] == 0
|
||||
assert len(log["attempts"]) == 1
|
||||
assert len(log["failures"]) == 1
|
||||
|
||||
|
||||
# -- max retries --------------------------------------------------------------
|
||||
|
||||
|
||||
def test_exhaustion_after_max_retries():
|
||||
"""Loop stops after MAX_RETRIES failed attempts."""
|
||||
import tuimble.reconnect as mod
|
||||
orig_delay = mod.INITIAL_DELAY
|
||||
orig_retries = mod.MAX_RETRIES
|
||||
mod.INITIAL_DELAY = 0
|
||||
mod.MAX_RETRIES = 3
|
||||
try:
|
||||
mgr, log = _make_manager(
|
||||
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("nope")),
|
||||
)
|
||||
mgr.run()
|
||||
finally:
|
||||
mod.INITIAL_DELAY = orig_delay
|
||||
mod.MAX_RETRIES = orig_retries
|
||||
|
||||
assert log["exhausted"] == 1
|
||||
assert log["success"] == 0
|
||||
assert len(log["attempts"]) == 3
|
||||
assert len(log["failures"]) == 3
|
||||
|
||||
|
||||
# -- cancellation -------------------------------------------------------------
|
||||
|
||||
|
||||
def test_cancel_stops_loop():
|
||||
"""cancel() interrupts the wait and exits the loop."""
|
||||
barrier = threading.Event()
|
||||
|
||||
def slow_attempt(n, delay):
|
||||
barrier.set()
|
||||
|
||||
mgr, log = _make_manager(
|
||||
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("fail")),
|
||||
on_attempt=slow_attempt,
|
||||
)
|
||||
|
||||
t = threading.Thread(target=mgr.run)
|
||||
t.start()
|
||||
barrier.wait(timeout=2)
|
||||
mgr.cancel()
|
||||
t.join(timeout=2)
|
||||
|
||||
assert not t.is_alive()
|
||||
assert mgr.active is False
|
||||
|
||||
|
||||
# -- backoff ------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_backoff_delays():
|
||||
"""Verify exponential backoff sequence up to MAX_DELAY."""
|
||||
mgr, log = _make_manager()
|
||||
# We only need the attempt callback to record delays; cancel after
|
||||
# a few attempts to avoid waiting.
|
||||
import tuimble.reconnect as mod
|
||||
orig_delay = mod.INITIAL_DELAY
|
||||
orig_retries = mod.MAX_RETRIES
|
||||
mod.INITIAL_DELAY = 0 # zero delay for speed
|
||||
mod.MAX_RETRIES = 5
|
||||
try:
|
||||
mgr, log = _make_manager(
|
||||
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("x")),
|
||||
)
|
||||
mgr.run()
|
||||
finally:
|
||||
mod.INITIAL_DELAY = orig_delay
|
||||
mod.MAX_RETRIES = orig_retries
|
||||
|
||||
delays = [d for _, d in log["attempts"]]
|
||||
# With INITIAL_DELAY=0, all delays are 0 (min(0 * 2^n, MAX_DELAY))
|
||||
# Test the formula with real values instead
|
||||
assert len(delays) == 5
|
||||
|
||||
|
||||
def test_backoff_formula():
|
||||
"""Delay = min(INITIAL_DELAY * 2^(attempt-1), MAX_DELAY)."""
|
||||
from tuimble.reconnect import MAX_DELAY
|
||||
|
||||
expected = []
|
||||
for i in range(1, 7):
|
||||
expected.append(min(INITIAL_DELAY * (2 ** (i - 1)), MAX_DELAY))
|
||||
assert expected == [2, 4, 8, 16, 30, 30]
|
||||
Reference in New Issue
Block a user