"""Tests for the URL title preview plugin."""

import asyncio
import importlib.util
import sys
import time
from pathlib import Path
from unittest.mock import patch

from derp.irc import Message

# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
    "plugins.urltitle",
    Path(__file__).resolve().parent.parent / "plugins" / "urltitle.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)

from plugins.urltitle import (  # noqa: E402, I001
    _TitleParser,
    _check_cooldown,
    _clean_url,
    _extract_urls,
    _fetch_title,
    _is_ignored_url,
    _ps,
    on_privmsg,
)


# -- Helpers -----------------------------------------------------------------


class _FakeBot:
    """Minimal bot stand-in that captures sent messages."""

    def __init__(self):
        # Every (target, text) pair passed to send() is recorded here.
        self.sent: list[tuple[str, str]] = []
        self.nick = "derp"
        self.prefix = "!"
        self._pstate: dict = {}
        self.config = {
            "flaskpaste": {"url": "https://paste.mymx.me"},
            "urltitle": {},
        }

    async def send(self, target: str, text: str) -> None:
        self.sent.append((target, text))


def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
    """Create a channel PRIVMSG."""
    return Message(
        raw="",
        prefix=f"{nick}!~{nick}@host",
        nick=nick,
        command="PRIVMSG",
        params=[target, text],
        tags={},
    )


def _pm(text: str, nick: str = "alice") -> Message:
    """Create a private PRIVMSG."""
    return Message(
        raw="",
        prefix=f"{nick}!~{nick}@host",
        nick=nick,
        command="PRIVMSG",
        params=["derp", text],
        tags={},
    )


class _FakeResp:
    """Fake HTTP response for mocking _urlopen."""

    def __init__(self, data: bytes = b"", content_type: str = "text/html", status: int = 200):
        self._data = data
        self.headers = {"Content-Type": content_type}
        self.status = status

    def read(self, n: int = -1) -> bytes:
        # Mirror file-like semantics: read(-1) drains, read(n) is a prefix.
        if n == -1:
            return self._data
        return self._data[:n]

    def close(self) -> None:
        pass


# ---------------------------------------------------------------------------
# TestExtractUrls
# ---------------------------------------------------------------------------
class TestExtractUrls:
    """URL extraction from free-form message text."""

    def test_single_url(self):
        urls = _extract_urls("check https://example.com please")
        assert urls == ["https://example.com"]

    def test_multiple_urls(self):
        urls = _extract_urls("see https://a.com and http://b.com ok")
        assert urls == ["https://a.com", "http://b.com"]

    def test_max_limit(self):
        text = "https://a.com https://b.com https://c.com https://d.com"
        urls = _extract_urls(text, max_urls=2)
        assert len(urls) == 2

    def test_trailing_punctuation(self):
        urls = _extract_urls("visit https://example.com.")
        assert urls == ["https://example.com"]

    def test_trailing_comma(self):
        urls = _extract_urls("https://example.com, check it")
        assert urls == ["https://example.com"]

    def test_balanced_parens(self):
        urls = _extract_urls("https://en.wikipedia.org/wiki/Foo_(bar)")
        assert urls == ["https://en.wikipedia.org/wiki/Foo_(bar)"]

    def test_unbalanced_paren_stripped(self):
        urls = _extract_urls("(https://example.com)")
        assert urls == ["https://example.com"]

    def test_suppressed_url(self):
        # A leading "!" opts a URL out of previewing.
        urls = _extract_urls("!https://example.com")
        assert urls == []

    def test_suppressed_mixed(self):
        urls = _extract_urls("!https://skip.com https://keep.com")
        assert urls == ["https://keep.com"]

    def test_no_urls(self):
        urls = _extract_urls("no urls here")
        assert urls == []

    def test_dedup(self):
        urls = _extract_urls("https://a.com https://a.com")
        assert urls == ["https://a.com"]


# ---------------------------------------------------------------------------
# TestCleanUrl
# ---------------------------------------------------------------------------


class TestCleanUrl:
    """Trailing-punctuation stripping on extracted URLs."""

    def test_no_trailing(self):
        assert _clean_url("https://example.com") == "https://example.com"

    def test_strip_period(self):
        assert _clean_url("https://example.com.") == "https://example.com"

    def test_strip_semicolon(self):
        assert _clean_url("https://example.com;") == "https://example.com"

    def test_preserve_balanced_parens(self):
        url = "https://en.wikipedia.org/wiki/Foo_(bar)"
        assert _clean_url(url) == url

    def test_strip_trailing_paren_unbalanced(self):
        assert _clean_url("https://example.com)") == "https://example.com"

    def test_multiple_trailing(self):
        assert _clean_url("https://example.com..;") == "https://example.com"


# ---------------------------------------------------------------------------
# TestTitleParser
# ---------------------------------------------------------------------------


class TestTitleParser:
    """HTML title / og-tag extraction.

    NOTE(review): the markup inside these feed() calls was reconstructed
    from the assertions -- the original tags were lost to whitespace
    mangling. Confirm against the plugin's expected tag names.
    """

    def test_og_title_priority(self):
        parser = _TitleParser()
        parser.feed(
            '<meta property="og:title" content="OG Title">'
            "<title>Page Title</title>"
        )
        assert parser.best_title == "OG Title"

    def test_title_fallback(self):
        parser = _TitleParser()
        parser.feed("<title>Fallback Title</title>")
        assert parser.best_title == "Fallback Title"

    def test_og_description(self):
        parser = _TitleParser()
        parser.feed('<meta property="og:description" content="OG Desc">')
        assert parser.best_description == "OG Desc"

    def test_meta_description_fallback(self):
        parser = _TitleParser()
        parser.feed('<meta name="description" content="Meta Desc">')
        assert parser.best_description == "Meta Desc"

    def test_whitespace_collapse(self):
        parser = _TitleParser()
        parser.feed("<title>  Hello \n  World  </title>")
        assert parser.title == "Hello World"

    def test_no_title(self):
        parser = _TitleParser()
        parser.feed("<p>No title here</p>")
        assert parser.best_title == ""

    def test_multipart_title(self):
        parser = _TitleParser()
        parser.feed("<title>Part 1 <em>Part 2</em> Part 3</title>")
        # The <em> element triggers start/end handlers, but its text is
        # still captured by handle_data.
        assert "Part 1" in parser.title

    def test_empty_og_title(self):
        parser = _TitleParser()
        parser.feed(
            '<meta property="og:title" content="">'
            "<title>Real Title</title>"
        )
        assert parser.best_title == "Real Title"


# ---------------------------------------------------------------------------
# TestIsIgnoredUrl
# ---------------------------------------------------------------------------


class TestIsIgnoredUrl:
    """Host and extension based skip rules."""

    def test_paste_host(self):
        assert _is_ignored_url(
            "https://paste.mymx.me/abc",
            {"paste.mymx.me"},
        ) is True

    def test_image_extension(self):
        assert _is_ignored_url(
            "https://example.com/photo.png",
            set(),
        ) is True

    def test_pdf_extension(self):
        assert _is_ignored_url(
            "https://example.com/doc.pdf",
            set(),
        ) is True

    def test_zip_extension(self):
        assert _is_ignored_url(
            "https://example.com/archive.zip",
            set(),
        ) is True

    def test_normal_url_passes(self):
        assert _is_ignored_url(
            "https://example.com/page",
            set(),
        ) is False

    def test_html_extension_passes(self):
        assert _is_ignored_url(
            "https://example.com/page.html",
            set(),
        ) is False

    def test_custom_ignore_host(self):
        assert _is_ignored_url(
            "https://private.local/x",
            {"private.local"},
        ) is True


# ---------------------------------------------------------------------------
# TestFetchTitle
# ---------------------------------------------------------------------------


class TestFetchTitle:
    """_fetch_title with a mocked _urlopen (HEAD probe then GET)."""

    def test_successful_html(self):
        html = b"<title>Test Page</title>"
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html; charset=utf-8")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com")
        assert title == "Test Page"

    def test_non_html_content_type_bails(self):
        head_resp = _FakeResp(b"", content_type="application/json")
        with patch.object(_mod, "_urlopen", return_value=head_resp):
            title, desc = _fetch_title("https://example.com/api")
        assert title == ""
        assert desc == ""

    def test_head_fail_falls_through_to_get(self):
        html = b"<title>Recovered</title>"
        get_resp = _FakeResp(html, content_type="text/html")

        def side_effect(req, **kw):
            # Some servers reject HEAD; the plugin must retry with GET.
            if req.get_method() == "HEAD":
                raise ConnectionError("HEAD not supported")
            return get_resp

        with patch.object(_mod, "_urlopen", side_effect=side_effect):
            title, desc = _fetch_title("https://example.com")
        assert title == "Recovered"

    def test_network_error_returns_empty(self):
        with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
            title, desc = _fetch_title("https://example.com")
        assert title == ""
        assert desc == ""

    def test_og_tags_extracted(self):
        html = (
            b"<html><head>"
            b'<meta property="og:title" content="OG Title">'
            b'<meta property="og:description" content="OG Desc">'
            b"</head></html>"
        )
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com")
        assert title == "OG Title"
        assert desc == "OG Desc"

    def test_get_non_html_bails(self):
        """HEAD returns html but GET returns non-html (redirect to binary)."""
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(b"\x89PNG", content_type="image/png")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com/img")
        assert title == ""


# ---------------------------------------------------------------------------
# TestCooldown
# ---------------------------------------------------------------------------


class TestCooldown:
    """Per-URL repost cooldown tracking."""

    def setup_method(self):
        self.bot = _FakeBot()

    def test_first_access_not_cooled(self):
        assert _check_cooldown(self.bot, "https://a.com", 300) is False

    def test_second_access_within_window(self):
        _check_cooldown(self.bot, "https://b.com", 300)
        assert _check_cooldown(self.bot, "https://b.com", 300) is True

    def test_after_cooldown_expires(self):
        seen = _ps(self.bot)["seen"]
        # Backdate the entry past the 300s window.
        seen["https://c.com"] = time.monotonic() - 400
        assert _check_cooldown(self.bot, "https://c.com", 300) is False

    def test_pruning(self):
        """Cache is pruned when it exceeds max size."""
        seen = _ps(self.bot)["seen"]
        old = time.monotonic() - 600
        for i in range(600):
            seen[f"https://stale-{i}.com"] = old
        _check_cooldown(self.bot, "https://new.com", 300)
        assert len(seen) < 600


# ---------------------------------------------------------------------------
# TestOnPrivmsg
# ---------------------------------------------------------------------------


class TestOnPrivmsg:
    """End-to-end behavior of the PRIVMSG handler."""

    def test_channel_url_previewed(self):
        bot = _FakeBot()
        html = b"<title>Example</title>"
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("check https://example.com")))
        assert len(bot.sent) == 1
        assert bot.sent[0][0] == "#test"
        assert "\u21b3 Example" in bot.sent[0][1]

    def test_pm_ignored(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _pm("https://example.com")))
        assert len(bot.sent) == 0

    def test_bot_nick_ignored(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("https://example.com", nick="derp")))
        assert len(bot.sent) == 0

    def test_command_ignored(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("!shorten https://example.com")))
        assert len(bot.sent) == 0

    def test_suppressed_url(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("!https://example.com")))
        assert len(bot.sent) == 0

    def test_paste_host_ignored(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(
            bot,
            _msg("https://paste.mymx.me/some-paste"),
        ))
        assert len(bot.sent) == 0

    def test_empty_title_skipped(self):
        bot = _FakeBot()
        html = b"<p>No title here</p>"
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://notitle.com")))
        assert len(bot.sent) == 0

    def test_image_url_skipped(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(
            bot,
            _msg("https://example.com/photo.png"),
        ))
        assert len(bot.sent) == 0

    def test_title_with_description(self):
        bot = _FakeBot()
        html = (
            b"<html><head>"
            b"<title>My Page</title>"
            b'<meta name="description" content="A great page">'
            b"</head></html>"
        )
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://example.com")))
        assert len(bot.sent) == 1
        assert "My Page -- A great page" in bot.sent[0][1]

    def test_cooldown_prevents_repeat(self):
        bot = _FakeBot()
        html = b"<title>Example</title>"

        def make_calls():
            return iter([
                _FakeResp(b"", content_type="text/html"),
                _FakeResp(html, content_type="text/html"),
            ])

        calls = make_calls()
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://cooldown.com")))
        assert len(bot.sent) == 1
        bot.sent.clear()

        # Same URL again -- should be suppressed by cooldown
        calls = make_calls()
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://cooldown.com")))
        assert len(bot.sent) == 0