# NOTE(review): the lines below are pasted commit-message / page metadata, not
# Python — kept as a comment so the file parses.
# Connect to multiple IRC servers concurrently from a single config file.
# Plugins are loaded once and shared; per-server state is isolated via
# separate SQLite databases and per-bot runtime state (bot._pstate).
# - Add build_server_configs() for [servers.*] config layout
# - Bot.__init__ gains name parameter, _pstate dict for plugin isolation
# - cli.py runs multiple bots via asyncio.gather
# - 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern
# - Backward compatible: legacy [server] config works unchanged
"""Tests for the URL title preview plugin."""
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from derp.irc import Message
|
|
|
|
# plugins/ is not a Python package -- load the module from file path
# so the tests can import plugins/urltitle.py without packaging changes.
_spec = importlib.util.spec_from_file_location(
    "plugins.urltitle",
    Path(__file__).resolve().parent.parent / "plugins" / "urltitle.py",
)
_mod = importlib.util.module_from_spec(_spec)
# Register under the dotted name *before* exec_module so the later
# "from plugins.urltitle import ..." resolves to this same module object
# (and patch.object(_mod, ...) patches the module the tests actually use).
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
|
|
|
|
from plugins.urltitle import ( # noqa: E402, I001
|
|
_TitleParser,
|
|
_check_cooldown,
|
|
_clean_url,
|
|
_extract_urls,
|
|
_fetch_title,
|
|
_is_ignored_url,
|
|
_ps,
|
|
on_privmsg,
|
|
)
|
|
|
|
|
|
# -- Helpers -----------------------------------------------------------------
|
|
|
|
|
|
class _FakeBot:
|
|
"""Minimal bot stand-in that captures sent messages."""
|
|
|
|
def __init__(self):
|
|
self.sent: list[tuple[str, str]] = []
|
|
self.nick = "derp"
|
|
self.prefix = "!"
|
|
self._pstate: dict = {}
|
|
self.config = {
|
|
"flaskpaste": {"url": "https://paste.mymx.me"},
|
|
"urltitle": {},
|
|
}
|
|
|
|
async def send(self, target: str, text: str) -> None:
|
|
self.sent.append((target, text))
|
|
|
|
|
|
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
    """Build a channel PRIVMSG from *nick* to *target*."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=[target, text],
        tags={},
    )
|
|
|
|
|
|
def _pm(text: str, nick: str = "alice") -> Message:
    """Build a private PRIVMSG addressed directly to the bot's nick."""
    hostmask = f"{nick}!~{nick}@host"
    return Message(
        raw="",
        prefix=hostmask,
        nick=nick,
        command="PRIVMSG",
        params=["derp", text],
        tags={},
    )
|
|
|
|
|
|
class _FakeResp:
|
|
"""Fake HTTP response for mocking _urlopen."""
|
|
|
|
def __init__(self, data: bytes = b"", content_type: str = "text/html",
|
|
status: int = 200):
|
|
self._data = data
|
|
self.headers = {"Content-Type": content_type}
|
|
self.status = status
|
|
|
|
def read(self, n: int = -1) -> bytes:
|
|
if n == -1:
|
|
return self._data
|
|
return self._data[:n]
|
|
|
|
def close(self) -> None:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestExtractUrls
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestExtractUrls:
    """URL extraction from chat text: limits, punctuation, suppression, dedup."""

    def test_single_url(self):
        assert _extract_urls("check https://example.com please") == ["https://example.com"]

    def test_multiple_urls(self):
        got = _extract_urls("see https://a.com and http://b.com ok")
        assert got == ["https://a.com", "http://b.com"]

    def test_max_limit(self):
        many = "https://a.com https://b.com https://c.com https://d.com"
        assert len(_extract_urls(many, max_urls=2)) == 2

    def test_trailing_punctuation(self):
        assert _extract_urls("visit https://example.com.") == ["https://example.com"]

    def test_trailing_comma(self):
        assert _extract_urls("https://example.com, check it") == ["https://example.com"]

    def test_balanced_parens(self):
        wiki = "https://en.wikipedia.org/wiki/Foo_(bar)"
        assert _extract_urls(wiki) == [wiki]

    def test_unbalanced_paren_stripped(self):
        assert _extract_urls("(https://example.com)") == ["https://example.com"]

    def test_suppressed_url(self):
        # A leading "!" marks the URL as not-to-be-previewed.
        assert _extract_urls("!https://example.com") == []

    def test_suppressed_mixed(self):
        assert _extract_urls("!https://skip.com https://keep.com") == ["https://keep.com"]

    def test_no_urls(self):
        assert _extract_urls("no urls here") == []

    def test_dedup(self):
        assert _extract_urls("https://a.com https://a.com") == ["https://a.com"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCleanUrl
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCleanUrl:
    """Trailing-punctuation stripping performed by _clean_url()."""

    def test_no_trailing(self):
        url = "https://example.com"
        assert _clean_url(url) == url

    def test_strip_period(self):
        assert _clean_url("https://example.com.") == "https://example.com"

    def test_strip_semicolon(self):
        assert _clean_url("https://example.com;") == "https://example.com"

    def test_preserve_balanced_parens(self):
        # Wikipedia-style URLs keep their balanced trailing paren.
        wiki = "https://en.wikipedia.org/wiki/Foo_(bar)"
        assert _clean_url(wiki) == wiki

    def test_strip_trailing_paren_unbalanced(self):
        assert _clean_url("https://example.com)") == "https://example.com"

    def test_multiple_trailing(self):
        assert _clean_url("https://example.com..;") == "https://example.com"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestTitleParser
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTitleParser:
    """Extraction of titles and descriptions by the _TitleParser HTML parser."""

    def test_og_title_priority(self):
        # best_title prefers og:title over the <title> element.
        parser = _TitleParser()
        parser.feed("""
        <html><head>
        <meta property="og:title" content="OG Title">
        <title>Page Title</title>
        </head></html>
        """)
        assert parser.best_title == "OG Title"

    def test_title_fallback(self):
        # Without og:title, best_title falls back to the <title> text.
        parser = _TitleParser()
        parser.feed("<html><head><title>Fallback Title</title></head></html>")
        assert parser.best_title == "Fallback Title"

    def test_og_description(self):
        # best_description prefers og:description over <meta name="description">.
        parser = _TitleParser()
        parser.feed("""
        <meta property="og:description" content="OG Desc">
        <meta name="description" content="Meta Desc">
        """)
        assert parser.best_description == "OG Desc"

    def test_meta_description_fallback(self):
        parser = _TitleParser()
        parser.feed('<meta name="description" content="Meta Desc">')
        assert parser.best_description == "Meta Desc"

    def test_whitespace_collapse(self):
        # Surrounding/internal whitespace in <title> is normalized.
        parser = _TitleParser()
        parser.feed("<title> Hello World </title>")
        assert parser.title == "Hello World"

    def test_no_title(self):
        parser = _TitleParser()
        parser.feed("<html><body>No title here</body></html>")
        assert parser.best_title == ""

    def test_multipart_title(self):
        parser = _TitleParser()
        parser.feed("<title>Part 1 <em>Part 2</em> Part 3</title>")
        # The parser collects text data; <em> triggers start/end but
        # its text is still captured by handle_data
        assert "Part 1" in parser.title

    def test_empty_og_title(self):
        # An empty og:title content must not mask the real <title>.
        parser = _TitleParser()
        parser.feed("""
        <meta property="og:title" content="">
        <title>Real Title</title>
        """)
        assert parser.best_title == "Real Title"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestIsIgnoredUrl
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestIsIgnoredUrl:
    """Host-based and extension-based filtering in _is_ignored_url()."""

    def test_paste_host(self):
        ignored = _is_ignored_url("https://paste.mymx.me/abc", {"paste.mymx.me"})
        assert ignored is True

    def test_image_extension(self):
        assert _is_ignored_url("https://example.com/photo.png", set()) is True

    def test_pdf_extension(self):
        assert _is_ignored_url("https://example.com/doc.pdf", set()) is True

    def test_zip_extension(self):
        assert _is_ignored_url("https://example.com/archive.zip", set()) is True

    def test_normal_url_passes(self):
        assert _is_ignored_url("https://example.com/page", set()) is False

    def test_html_extension_passes(self):
        # .html is a previewable document, not a binary extension.
        assert _is_ignored_url("https://example.com/page.html", set()) is False

    def test_custom_ignore_host(self):
        assert _is_ignored_url("https://private.local/x", {"private.local"}) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestFetchTitle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestFetchTitle:
    """_fetch_title() behaviour with _urlopen mocked out.

    The plugin issues a HEAD request first (content-type probe) and then a
    GET; each test queues the fake responses in that order via an iterator.
    """

    def test_successful_html(self):
        # HEAD confirms text/html, GET supplies the page; <title> is parsed.
        html = b"<html><head><title>Test Page</title></head></html>"
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html; charset=utf-8")
        calls = iter([head_resp, get_resp])

        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com")
        assert title == "Test Page"

    def test_non_html_content_type_bails(self):
        # A non-HTML HEAD response short-circuits: no GET is attempted.
        head_resp = _FakeResp(b"", content_type="application/json")

        with patch.object(_mod, "_urlopen", return_value=head_resp):
            title, desc = _fetch_title("https://example.com/api")
        assert title == ""
        assert desc == ""

    def test_head_fail_falls_through_to_get(self):
        # Servers that reject HEAD are still previewed via plain GET.
        html = b"<html><head><title>Recovered</title></head></html>"
        get_resp = _FakeResp(html, content_type="text/html")

        def side_effect(req, **kw):
            if req.get_method() == "HEAD":
                raise ConnectionError("HEAD not supported")
            return get_resp

        with patch.object(_mod, "_urlopen", side_effect=side_effect):
            title, desc = _fetch_title("https://example.com")
        assert title == "Recovered"

    def test_network_error_returns_empty(self):
        # Network failures yield empty strings, never propagate an exception.
        with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
            title, desc = _fetch_title("https://example.com")
        assert title == ""
        assert desc == ""

    def test_og_tags_extracted(self):
        # og:title / og:description are honored when present.
        html = (
            b'<html><head>'
            b'<meta property="og:title" content="OG Title">'
            b'<meta property="og:description" content="OG Desc">'
            b'</head></html>'
        )
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(html, content_type="text/html")
        calls = iter([head_resp, get_resp])

        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com")
        assert title == "OG Title"
        assert desc == "OG Desc"

    def test_get_non_html_bails(self):
        """HEAD returns html but GET returns non-html (redirect to binary)."""
        head_resp = _FakeResp(b"", content_type="text/html")
        get_resp = _FakeResp(b"\x89PNG", content_type="image/png")
        calls = iter([head_resp, get_resp])

        with patch.object(_mod, "_urlopen", side_effect=lambda *a, **kw: next(calls)):
            title, desc = _fetch_title("https://example.com/img")
        assert title == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCooldown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCooldown:
    """Per-bot cooldown bookkeeping in _check_cooldown()/_ps()."""

    def setup_method(self):
        self.bot = _FakeBot()

    def test_first_access_not_cooled(self):
        cooled = _check_cooldown(self.bot, "https://a.com", 300)
        assert cooled is False

    def test_second_access_within_window(self):
        url = "https://b.com"
        _check_cooldown(self.bot, url, 300)
        assert _check_cooldown(self.bot, url, 300) is True

    def test_after_cooldown_expires(self):
        # Backdate the entry past the 300s window.
        cache = _ps(self.bot)["seen"]
        cache["https://c.com"] = time.monotonic() - 400
        assert _check_cooldown(self.bot, "https://c.com", 300) is False

    def test_pruning(self):
        """Cache is pruned when it exceeds max size."""
        cache = _ps(self.bot)["seen"]
        stale_ts = time.monotonic() - 600
        for i in range(600):
            cache[f"https://stale-{i}.com"] = stale_ts
        _check_cooldown(self.bot, "https://new.com", 300)
        assert len(cache) < 600
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestOnPrivmsg
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestOnPrivmsg:
    """End-to-end behaviour of the on_privmsg handler with mocked HTTP."""

    @staticmethod
    def _responses(html: bytes):
        """Fresh HEAD+GET fake-response pair for a single URL fetch."""
        return iter([
            _FakeResp(b"", content_type="text/html"),
            _FakeResp(html, content_type="text/html"),
        ])

    def test_channel_url_previewed(self):
        bot = _FakeBot()
        calls = self._responses(b"<html><head><title>Example</title></head></html>")
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("check https://example.com")))
        assert len(bot.sent) == 1
        target, line = bot.sent[0]
        assert target == "#test"
        assert "\u21b3 Example" in line

    def test_pm_ignored(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _pm("https://example.com")))
        assert bot.sent == []

    def test_bot_nick_ignored(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("https://example.com", nick="derp")))
        assert bot.sent == []

    def test_command_ignored(self):
        # Lines starting with the command prefix are left to other plugins.
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("!shorten https://example.com")))
        assert bot.sent == []

    def test_suppressed_url(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("!https://example.com")))
        assert bot.sent == []

    def test_paste_host_ignored(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("https://paste.mymx.me/some-paste")))
        assert bot.sent == []

    def test_empty_title_skipped(self):
        bot = _FakeBot()
        calls = self._responses(b"<html><body>No title here</body></html>")
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://notitle.com")))
        assert bot.sent == []

    def test_image_url_skipped(self):
        bot = _FakeBot()
        asyncio.run(on_privmsg(bot, _msg("https://example.com/photo.png")))
        assert bot.sent == []

    def test_title_with_description(self):
        bot = _FakeBot()
        page = (
            b'<html><head>'
            b'<title>My Page</title>'
            b'<meta name="description" content="A great page">'
            b'</head></html>'
        )
        calls = self._responses(page)
        with patch.object(_mod, "_urlopen",
                          side_effect=lambda *a, **kw: next(calls)):
            asyncio.run(on_privmsg(bot, _msg("https://example.com")))
        assert len(bot.sent) == 1
        assert "My Page -- A great page" in bot.sent[0][1]

    def test_cooldown_prevents_repeat(self):
        bot = _FakeBot()
        page = b"<html><head><title>Example</title></head></html>"

        # First mention is previewed; the immediate repeat is suppressed.
        for expected in (1, 0):
            calls = self._responses(page)
            with patch.object(_mod, "_urlopen",
                              side_effect=lambda *a, **kw: next(calls)):
                asyncio.run(on_privmsg(bot, _msg("https://cooldown.com")))
            assert len(bot.sent) == expected
            bot.sent.clear()
|