Files
derp/tests/test_urltitle.py
user 073659607e feat: add multi-server support
Connect to multiple IRC servers concurrently from a single config file.
Plugins are loaded once and shared; per-server state is isolated via
separate SQLite databases and per-bot runtime state (bot._pstate).

- Add build_server_configs() for [servers.*] config layout
- Bot.__init__ gains name parameter, _pstate dict for plugin isolation
- cli.py runs multiple bots via asyncio.gather
- 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern
- Backward compatible: legacy [server] config works unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 19:04:20 +01:00

479 lines
16 KiB
Python

"""Tests for the URL title preview plugin."""
import asyncio
import importlib.util
import sys
import time
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
# so tests can reach its private helpers (_extract_urls, _ps, ...).
_spec = importlib.util.spec_from_file_location(
    "plugins.urltitle",
    Path(__file__).resolve().parent.parent / "plugins" / "urltitle.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register the module *before* executing it so any imports of
# "plugins.urltitle" during exec_module resolve to this instance.
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
# Re-import the public-ish names under test; E402 silenced because this
# import necessarily follows the loader boilerplate above.
from plugins.urltitle import (  # noqa: E402, I001
    _TitleParser,
    _check_cooldown,
    _clean_url,
    _extract_urls,
    _fetch_title,
    _is_ignored_url,
    _ps,
    on_privmsg,
)
# -- Helpers -----------------------------------------------------------------
class _FakeBot:
"""Minimal bot stand-in that captures sent messages."""
def __init__(self):
self.sent: list[tuple[str, str]] = []
self.nick = "derp"
self.prefix = "!"
self._pstate: dict = {}
self.config = {
"flaskpaste": {"url": "https://paste.mymx.me"},
"urltitle": {},
}
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
    """Build a channel PRIVMSG from *nick* addressed to *target*."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=[target, text],
        tags={},
    )
def _pm(text: str, nick: str = "alice") -> Message:
    """Build a private PRIVMSG from *nick* addressed to the bot itself."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=["derp", text],
        tags={},
    )
class _FakeResp:
"""Fake HTTP response for mocking _urlopen."""
def __init__(self, data: bytes = b"", content_type: str = "text/html",
status: int = 200):
self._data = data
self.headers = {"Content-Type": content_type}
self.status = status
def read(self, n: int = -1) -> bytes:
if n == -1:
return self._data
return self._data[:n]
def close(self) -> None:
pass
# ---------------------------------------------------------------------------
# TestExtractUrls
# ---------------------------------------------------------------------------
class TestExtractUrls:
    """URL extraction from free-form chat text via _extract_urls."""

    def test_single_url(self):
        urls = _extract_urls("check https://example.com please")
        assert urls == ["https://example.com"]

    def test_multiple_urls(self):
        urls = _extract_urls("see https://a.com and http://b.com ok")
        assert urls == ["https://a.com", "http://b.com"]

    def test_max_limit(self):
        # max_urls caps the number of URLs returned from one message.
        text = "https://a.com https://b.com https://c.com https://d.com"
        urls = _extract_urls(text, max_urls=2)
        assert len(urls) == 2

    def test_trailing_punctuation(self):
        # Sentence-ending period is not part of the URL.
        urls = _extract_urls("visit https://example.com.")
        assert urls == ["https://example.com"]

    def test_trailing_comma(self):
        urls = _extract_urls("https://example.com, check it")
        assert urls == ["https://example.com"]

    def test_balanced_parens(self):
        # Wikipedia-style URLs keep their balanced parentheses.
        urls = _extract_urls("https://en.wikipedia.org/wiki/Foo_(bar)")
        assert urls == ["https://en.wikipedia.org/wiki/Foo_(bar)"]

    def test_unbalanced_paren_stripped(self):
        # A close-paren belonging to surrounding prose is stripped.
        urls = _extract_urls("(https://example.com)")
        assert urls == ["https://example.com"]

    def test_suppressed_url(self):
        # A leading "!" lets users opt a URL out of previewing.
        urls = _extract_urls("!https://example.com")
        assert urls == []

    def test_suppressed_mixed(self):
        # Suppression applies per-URL, not per-message.
        urls = _extract_urls("!https://skip.com https://keep.com")
        assert urls == ["https://keep.com"]

    def test_no_urls(self):
        urls = _extract_urls("no urls here")
        assert urls == []

    def test_dedup(self):
        # Repeating the same URL yields a single entry.
        urls = _extract_urls("https://a.com https://a.com")
        assert urls == ["https://a.com"]
# ---------------------------------------------------------------------------
# TestCleanUrl
# ---------------------------------------------------------------------------
class TestCleanUrl:
    """Trailing-punctuation stripping behaviour of _clean_url."""

    def test_no_trailing(self):
        assert _clean_url("https://example.com") == "https://example.com"

    def test_strip_period(self):
        assert _clean_url("https://example.com.") == "https://example.com"

    def test_strip_semicolon(self):
        assert _clean_url("https://example.com;") == "https://example.com"

    def test_preserve_balanced_parens(self):
        # Parens that are part of the path (balanced) must survive.
        url = "https://en.wikipedia.org/wiki/Foo_(bar)"
        assert _clean_url(url) == url

    def test_strip_trailing_paren_unbalanced(self):
        assert _clean_url("https://example.com)") == "https://example.com"

    def test_multiple_trailing(self):
        # Several trailing punctuation characters are all removed.
        assert _clean_url("https://example.com..;") == "https://example.com"
# ---------------------------------------------------------------------------
# TestTitleParser
# ---------------------------------------------------------------------------
class TestTitleParser:
    """HTML parsing: title/description selection rules of _TitleParser."""

    def test_og_title_priority(self):
        # og:title wins over the <title> element when both are present.
        parser = _TitleParser()
        parser.feed("""
        <html><head>
        <meta property="og:title" content="OG Title">
        <title>Page Title</title>
        </head></html>
        """)
        assert parser.best_title == "OG Title"

    def test_title_fallback(self):
        parser = _TitleParser()
        parser.feed("<html><head><title>Fallback Title</title></head></html>")
        assert parser.best_title == "Fallback Title"

    def test_og_description(self):
        # og:description wins over <meta name="description">.
        parser = _TitleParser()
        parser.feed("""
        <meta property="og:description" content="OG Desc">
        <meta name="description" content="Meta Desc">
        """)
        assert parser.best_description == "OG Desc"

    def test_meta_description_fallback(self):
        parser = _TitleParser()
        parser.feed('<meta name="description" content="Meta Desc">')
        assert parser.best_description == "Meta Desc"

    def test_whitespace_collapse(self):
        # Internal runs of whitespace collapse to single spaces.
        parser = _TitleParser()
        parser.feed("<title>  Hello   World  </title>")
        assert parser.title == "Hello World"

    def test_no_title(self):
        parser = _TitleParser()
        parser.feed("<html><body>No title here</body></html>")
        assert parser.best_title == ""

    def test_multipart_title(self):
        parser = _TitleParser()
        parser.feed("<title>Part 1 <em>Part 2</em> Part 3</title>")
        # The parser collects text data; <em> triggers start/end but
        # its text is still captured by handle_data
        assert "Part 1" in parser.title

    def test_empty_og_title(self):
        # An empty og:title must not shadow a real <title>.
        parser = _TitleParser()
        parser.feed("""
        <meta property="og:title" content="">
        <title>Real Title</title>
        """)
        assert parser.best_title == "Real Title"
# ---------------------------------------------------------------------------
# TestIsIgnoredUrl
# ---------------------------------------------------------------------------
class TestIsIgnoredUrl:
    """Host- and extension-based skip rules of _is_ignored_url."""

    def test_paste_host(self):
        # Hosts in the ignore set (e.g. the bot's own pastebin) are skipped.
        assert _is_ignored_url(
            "https://paste.mymx.me/abc", {"paste.mymx.me"},
        ) is True

    def test_image_extension(self):
        assert _is_ignored_url(
            "https://example.com/photo.png", set(),
        ) is True

    def test_pdf_extension(self):
        assert _is_ignored_url(
            "https://example.com/doc.pdf", set(),
        ) is True

    def test_zip_extension(self):
        assert _is_ignored_url(
            "https://example.com/archive.zip", set(),
        ) is True

    def test_normal_url_passes(self):
        assert _is_ignored_url(
            "https://example.com/page", set(),
        ) is False

    def test_html_extension_passes(self):
        # .html is a page, not a binary -- must not be ignored.
        assert _is_ignored_url(
            "https://example.com/page.html", set(),
        ) is False

    def test_custom_ignore_host(self):
        assert _is_ignored_url(
            "https://private.local/x", {"private.local"},
        ) is True
# ---------------------------------------------------------------------------
# TestFetchTitle
# ---------------------------------------------------------------------------
class TestFetchTitle:
    """_fetch_title HEAD/GET flow with a mocked _urlopen."""

    def test_successful_html(self):
        # Normal path: HEAD confirms text/html, GET body yields the title.
        html = b"<html><head><title>Test Page</title></head></html>"
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html; charset=utf-8")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com")
        assert title == "Test Page"

    def test_non_html_content_type_bails(self):
        # A non-HTML HEAD response short-circuits; no GET is attempted.
        head_resp = _FakeResp(b"", content_type="application/json")
        with patch.object(_mod, "_urlopen", return_value=head_resp):
            title, desc = _fetch_title("https://example.com/api")
        assert title == ""
        assert desc == ""

    def test_head_fail_falls_through_to_get(self):
        # Some servers reject HEAD; the fetcher retries with GET.
        html = b"<html><head><title>Recovered</title></head></html>"
        get_resp = _FakeResp(html, content_type="text/html")
        def side_effect(req, **kw):
            if req.get_method() == "HEAD":
                raise ConnectionError("HEAD not supported")
            return get_resp
        with patch.object(_mod, "_urlopen", side_effect=side_effect):
            title, desc = _fetch_title("https://example.com")
        assert title == "Recovered"

    def test_network_error_returns_empty(self):
        # Total network failure is swallowed: empty title/desc, no raise.
        with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
            title, desc = _fetch_title("https://example.com")
        assert title == ""
        assert desc == ""

    def test_og_tags_extracted(self):
        html = (
            b'<html><head>'
            b'<meta property="og:title" content="OG Title">'
            b'<meta property="og:description" content="OG Desc">'
            b'</head></html>'
        )
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com")
        assert title == "OG Title"
        assert desc == "OG Desc"

    def test_get_non_html_bails(self):
        """HEAD returns html but GET returns non-html (redirect to binary)."""
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(b"\x89PNG", content_type="image/png")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com/img")
        assert title == ""
# ---------------------------------------------------------------------------
# TestCooldown
# ---------------------------------------------------------------------------
class TestCooldown:
    """Per-URL repeat suppression via _check_cooldown and _ps state."""

    def setup_method(self):
        # Fresh bot per test so _pstate (and the "seen" cache) is isolated.
        self.bot = _FakeBot()

    def test_first_access_not_cooled(self):
        assert _check_cooldown(self.bot, "https://a.com", 300) is False

    def test_second_access_within_window(self):
        # The first call records the URL; the second is within the window.
        _check_cooldown(self.bot, "https://b.com", 300)
        assert _check_cooldown(self.bot, "https://b.com", 300) is True

    def test_after_cooldown_expires(self):
        # Backdate the entry beyond the 300s window -> no longer cooled.
        seen = _ps(self.bot)["seen"]
        seen["https://c.com"] = time.monotonic() - 400
        assert _check_cooldown(self.bot, "https://c.com", 300) is False

    def test_pruning(self):
        """Cache is pruned when it exceeds max size."""
        seen = _ps(self.bot)["seen"]
        old = time.monotonic() - 600
        for i in range(600):
            seen[f"https://stale-{i}.com"] = old
        _check_cooldown(self.bot, "https://new.com", 300)
        # Stale entries were evicted; cache shrank below the fill size.
        assert len(seen) < 600
# ---------------------------------------------------------------------------
# TestOnPrivmsg
# ---------------------------------------------------------------------------
class TestOnPrivmsg:
    """End-to-end tests of the on_privmsg handler using _FakeBot."""

    def test_channel_url_previewed(self):
        # A plain channel message with a URL produces one "↳ <title>" reply.
        # Fix: the original defined an inner() closure and immediately called
        # it once, for no effect; the patch/run is now inlined like every
        # other test in this class.
        bot = _FakeBot()
        html = b"<html><head><title>Example</title></head></html>"
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("check https://example.com")))
        assert len(bot.sent) == 1
        assert bot.sent[0][0] == "#test"
        assert "\u21b3 Example" in bot.sent[0][1]

    def test_pm_ignored(self):
        # URLs in private messages are never previewed.
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _pm("https://example.com")))
        assert len(bot.sent) == 0

    def test_bot_nick_ignored(self):
        # The bot does not preview URLs it posted itself.
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("https://example.com", nick="derp")))
        assert len(bot.sent) == 0

    def test_command_ignored(self):
        # Messages starting with the command prefix are left to the command
        # dispatcher, even if they contain URLs.
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("!shorten https://example.com")))
        assert len(bot.sent) == 0

    def test_suppressed_url(self):
        # "!" directly before a URL opts it out of previewing.
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("!https://example.com")))
        assert len(bot.sent) == 0

    def test_paste_host_ignored(self):
        # The configured flaskpaste host is skipped.
        bot = _FakeBot()
        asyncio.run(on_privmsg(
            bot, _msg("https://paste.mymx.me/some-paste"),
        ))
        assert len(bot.sent) == 0

    def test_empty_title_skipped(self):
        # Pages without a <title> produce no reply at all.
        bot = _FakeBot()
        html = b"<html><body>No title here</body></html>"
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://notitle.com")))
        assert len(bot.sent) == 0

    def test_image_url_skipped(self):
        # Binary extensions are filtered before any network access.
        bot = _FakeBot()
        asyncio.run(on_privmsg(
            bot, _msg("https://example.com/photo.png"),
        ))
        assert len(bot.sent) == 0

    def test_title_with_description(self):
        # Title and meta description are joined with " -- ".
        bot = _FakeBot()
        html = (
            b'<html><head>'
            b'<title>My Page</title>'
            b'<meta name="description" content="A great page">'
            b'</head></html>'
        )
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://example.com")))
        assert len(bot.sent) == 1
        assert "My Page -- A great page" in bot.sent[0][1]

    def test_cooldown_prevents_repeat(self):
        # The same URL within the cooldown window is previewed only once.
        bot = _FakeBot()
        html = b"<html><head><title>Example</title></head></html>"
        def make_calls():
            return iter([
                _FakeResp(b"", content_type="text/html"),
                _FakeResp(html, content_type="text/html"),
            ])
        calls = make_calls()
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://cooldown.com")))
        assert len(bot.sent) == 1
        bot.sent.clear()
        # Same URL again -- should be suppressed by cooldown
        calls = make_calls()
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://cooldown.com")))
        assert len(bot.sent) == 0