"""Tests for SQLite backlog storage.""" import tempfile from pathlib import Path import pytest from bouncer.backlog import Backlog @pytest.fixture async def backlog(): db_path = Path(tempfile.mktemp(suffix=".db")) bl = Backlog(db_path) await bl.open() yield bl await bl.close() db_path.unlink(missing_ok=True) class TestBacklog: async def test_store_and_replay(self, backlog: Backlog): msg_id = await backlog.store("libera", "#test", "nick", "PRIVMSG", "hello") assert msg_id > 0 entries = await backlog.replay("libera") assert len(entries) == 1 assert entries[0].content == "hello" assert entries[0].sender == "nick" assert entries[0].target == "#test" async def test_replay_since_id(self, backlog: Backlog): id1 = await backlog.store("libera", "#test", "a", "PRIVMSG", "first") id2 = await backlog.store("libera", "#test", "b", "PRIVMSG", "second") id3 = await backlog.store("libera", "#test", "c", "PRIVMSG", "third") entries = await backlog.replay("libera", since_id=id1) assert len(entries) == 2 assert entries[0].id == id2 assert entries[1].id == id3 async def test_replay_empty(self, backlog: Backlog): entries = await backlog.replay("nonexistent") assert entries == [] async def test_network_isolation(self, backlog: Backlog): await backlog.store("libera", "#test", "a", "PRIVMSG", "libera msg") await backlog.store("oftc", "#test", "b", "PRIVMSG", "oftc msg") libera = await backlog.replay("libera") oftc = await backlog.replay("oftc") assert len(libera) == 1 assert len(oftc) == 1 assert libera[0].content == "libera msg" assert oftc[0].content == "oftc msg" async def test_mark_and_get_last_seen(self, backlog: Backlog): assert await backlog.get_last_seen("libera") == 0 await backlog.mark_seen("libera", 42) assert await backlog.get_last_seen("libera") == 42 await backlog.mark_seen("libera", 100) assert await backlog.get_last_seen("libera") == 100 async def test_prune(self, backlog: Backlog): for i in range(10): await backlog.store("libera", "#test", "n", "PRIVMSG", f"msg{i}") deleted = await backlog.prune("libera", keep=3) assert deleted == 7 entries = await backlog.replay("libera") assert len(entries) == 3 assert entries[0].content == "msg7" async def test_prune_zero_keeps_all(self, backlog: Backlog): for i in range(5): await backlog.store("libera", "#test", "n", "PRIVMSG", f"msg{i}") deleted = await backlog.prune("libera", keep=0) assert deleted == 0 assert len(await backlog.replay("libera")) == 5 async def test_record_disconnect(self, backlog: Backlog): await backlog.store("libera", "#test", "n", "PRIVMSG", "msg") await backlog.record_disconnect("libera") last = await backlog.get_last_seen("libera") assert last > 0