diff --git a/tests/test_reconnect.py b/tests/test_reconnect.py new file mode 100644 index 0000000..03e3565 --- /dev/null +++ b/tests/test_reconnect.py @@ -0,0 +1,184 @@ +"""Tests for ReconnectManager.""" + +import threading + +from tuimble.reconnect import INITIAL_DELAY, MAX_RETRIES, ReconnectManager + + +def _make_manager(connect_fn=None, **overrides): + """Build a ReconnectManager with recording callbacks.""" + log = {"attempts": [], "failures": [], "success": 0, "exhausted": 0} + + def on_attempt(n, delay): + log["attempts"].append((n, delay)) + + def on_success(): + log["success"] += 1 + + def on_failure(n, msg): + log["failures"].append((n, msg)) + + def on_exhausted(): + log["exhausted"] += 1 + + if connect_fn is None: + connect_fn = lambda: None + + mgr = ReconnectManager( + connect_fn=connect_fn, + on_attempt=overrides.get("on_attempt", on_attempt), + on_success=overrides.get("on_success", on_success), + on_failure=overrides.get("on_failure", on_failure), + on_exhausted=overrides.get("on_exhausted", on_exhausted), + ) + return mgr, log + + +# -- basic lifecycle ---------------------------------------------------------- + + +def test_initial_state(): + mgr, _ = _make_manager() + assert mgr.active is False + assert mgr.attempt == 0 + + +def test_success_on_first_attempt(): + """Connect succeeds immediately -- one attempt, no failures.""" + mgr, log = _make_manager(connect_fn=lambda: None) + mgr.run() + assert log["success"] == 1 + assert log["exhausted"] == 0 + assert len(log["attempts"]) == 1 + assert len(log["failures"]) == 0 + assert mgr.active is False + + +def test_success_after_failures(): + """Connect fails twice, then succeeds on third attempt.""" + call_count = 0 + + def flaky_connect(): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ConnectionError("down") + + mgr, log = _make_manager(connect_fn=flaky_connect) + # Patch delay to zero for test speed + import tuimble.reconnect as mod + orig = mod.INITIAL_DELAY + mod.INITIAL_DELAY = 0 + try: + mgr.run() + finally: + mod.INITIAL_DELAY = orig + + assert log["success"] == 1 + assert len(log["failures"]) == 2 + assert len(log["attempts"]) == 3 + + +# -- non-retryable ----------------------------------------------------------- + + +def test_non_retryable_aborts_immediately(): + """Exception with retryable=False stops the loop.""" + + class Rejected(Exception): + retryable = False + + mgr, log = _make_manager(connect_fn=lambda: (_ for _ in ()).throw(Rejected("banned"))) + mgr.run() + assert log["exhausted"] == 1 + assert log["success"] == 0 + assert len(log["attempts"]) == 1 + assert len(log["failures"]) == 1 + + +# -- max retries -------------------------------------------------------------- + + +def test_exhaustion_after_max_retries(): + """Loop stops after MAX_RETRIES failed attempts.""" + import tuimble.reconnect as mod + orig_delay = mod.INITIAL_DELAY + orig_retries = mod.MAX_RETRIES + mod.INITIAL_DELAY = 0 + mod.MAX_RETRIES = 3 + try: + mgr, log = _make_manager( + connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("nope")), + ) + mgr.run() + finally: + mod.INITIAL_DELAY = orig_delay + mod.MAX_RETRIES = orig_retries + + assert log["exhausted"] == 1 + assert log["success"] == 0 + assert len(log["attempts"]) == 3 + assert len(log["failures"]) == 3 + + +# -- cancellation ------------------------------------------------------------- + + +def test_cancel_stops_loop(): + """cancel() interrupts the wait and exits the loop.""" + barrier = threading.Event() + + def slow_attempt(n, delay): + barrier.set() + + mgr, log = _make_manager( + connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("fail")), + on_attempt=slow_attempt, + ) + + t = threading.Thread(target=mgr.run) + t.start() + barrier.wait(timeout=2) + mgr.cancel() + t.join(timeout=2) + + assert not t.is_alive() + assert mgr.active is False + + +# -- backoff ------------------------------------------------------------------ + + +def test_backoff_delays(): + """Verify exponential backoff sequence up to MAX_DELAY.""" + mgr, log = _make_manager() + # We only need the attempt callback to record delays; cancel after + # a few attempts to avoid waiting. + import tuimble.reconnect as mod + orig_delay = mod.INITIAL_DELAY + orig_retries = mod.MAX_RETRIES + mod.INITIAL_DELAY = 0 # zero delay for speed + mod.MAX_RETRIES = 5 + try: + mgr, log = _make_manager( + connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("x")), + ) + mgr.run() + finally: + mod.INITIAL_DELAY = orig_delay + mod.MAX_RETRIES = orig_retries + + delays = [d for _, d in log["attempts"]] + # With INITIAL_DELAY=0, all delays are 0 (min(0 * 2^n, MAX_DELAY)) + # Test the formula with real values instead + assert len(delays) == 5 + + +def test_backoff_formula(): + """Delay = min(INITIAL_DELAY * 2^(attempt-1), MAX_DELAY).""" + from tuimble.reconnect import MAX_DELAY + + expected = [] + for i in range(1, 7): + expected.append(min(INITIAL_DELAY * (2 ** (i - 1)), MAX_DELAY)) + assert expected == [2, 4, 8, 16, 30, 30]