Single-shot (!ask) and conversational (!chat) LLM commands backed by OpenRouter's API. Per-user history (20 msg cap), 5s cooldown, reasoning model fallback, and model switching via subcommands. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
539 lines
18 KiB
Python
539 lines
18 KiB
Python
"""Tests for the OpenRouter LLM chat plugin."""
|
|
|
|
import asyncio
|
|
import importlib.util
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
|
|
from derp.irc import Message
|
|
|
|
# plugins/ is not a Python package -- load the module from file path
|
|
_spec = importlib.util.spec_from_file_location(
|
|
"plugins.llm", Path(__file__).resolve().parent.parent / "plugins" / "llm.py",
|
|
)
|
|
_mod = importlib.util.module_from_spec(_spec)
|
|
sys.modules[_spec.name] = _mod
|
|
_spec.loader.exec_module(_mod)
|
|
|
|
from plugins.llm import ( # noqa: E402
|
|
_COOLDOWN,
|
|
_MAX_HISTORY,
|
|
_MAX_REPLY_LEN,
|
|
_chat_request,
|
|
_check_cooldown,
|
|
_extract_reply,
|
|
_get_api_key,
|
|
_get_model,
|
|
_ps,
|
|
_set_cooldown,
|
|
_truncate,
|
|
cmd_ask,
|
|
cmd_chat,
|
|
)
|
|
|
|
|
|
# -- Helpers -----------------------------------------------------------------
|
|
|
|
class _FakeState:
|
|
"""In-memory stand-in for bot.state."""
|
|
|
|
def __init__(self):
|
|
self._store: dict[str, dict[str, str]] = {}
|
|
|
|
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
|
return self._store.get(plugin, {}).get(key, default)
|
|
|
|
def set(self, plugin: str, key: str, value: str) -> None:
|
|
self._store.setdefault(plugin, {})[key] = value
|
|
|
|
def delete(self, plugin: str, key: str) -> bool:
|
|
try:
|
|
del self._store[plugin][key]
|
|
return True
|
|
except KeyError:
|
|
return False
|
|
|
|
def keys(self, plugin: str) -> list[str]:
|
|
return sorted(self._store.get(plugin, {}).keys())
|
|
|
|
|
|
class _FakeRegistry:
|
|
"""Minimal registry stand-in."""
|
|
|
|
def __init__(self):
|
|
self._modules: dict = {}
|
|
|
|
|
|
class _FakeBot:
|
|
"""Minimal bot stand-in that captures sent/replied messages."""
|
|
|
|
def __init__(self, *, admin: bool = False, config: dict | None = None):
|
|
self.sent: list[tuple[str, str]] = []
|
|
self.actions: list[tuple[str, str]] = []
|
|
self.replied: list[str] = []
|
|
self.state = _FakeState()
|
|
self._pstate: dict = {}
|
|
self.registry = _FakeRegistry()
|
|
self._admin = admin
|
|
self.config = config or {}
|
|
|
|
async def send(self, target: str, text: str) -> None:
|
|
self.sent.append((target, text))
|
|
|
|
async def action(self, target: str, text: str) -> None:
|
|
self.actions.append((target, text))
|
|
|
|
async def reply(self, message, text: str) -> None:
|
|
self.replied.append(text)
|
|
|
|
async def long_reply(self, message, lines, *, label: str = "") -> None:
|
|
for line in lines:
|
|
self.replied.append(line)
|
|
|
|
def _is_admin(self, message) -> bool:
|
|
return self._admin
|
|
|
|
|
|
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
|
|
"""Create a channel PRIVMSG."""
|
|
return Message(
|
|
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
|
command="PRIVMSG", params=[target, text], tags={},
|
|
)
|
|
|
|
|
|
def _pm(text: str, nick: str = "alice") -> Message:
|
|
"""Create a private PRIVMSG."""
|
|
return Message(
|
|
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
|
command="PRIVMSG", params=["botname", text], tags={},
|
|
)
|
|
|
|
|
|
def _api_response(content: str = "Hello!", reasoning: str = "") -> dict:
|
|
"""Build a mock API response."""
|
|
msg = {"role": "assistant", "content": content}
|
|
if reasoning:
|
|
msg["reasoning"] = reasoning
|
|
return {"choices": [{"message": msg}]}
|
|
|
|
|
|
class _FakeResp:
|
|
"""Mock HTTP response."""
|
|
|
|
def __init__(self, data: dict):
|
|
self._data = json.dumps(data).encode()
|
|
|
|
def read(self):
|
|
return self._data
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
|
|
def _clear(bot=None) -> None:
|
|
"""Reset per-bot plugin state between tests."""
|
|
if bot is None:
|
|
return
|
|
ps = _ps(bot)
|
|
ps["histories"].clear()
|
|
ps["cooldowns"].clear()
|
|
ps["model"] = ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestTruncate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTruncate:
|
|
def test_short_text_unchanged(self):
|
|
assert _truncate("hello") == "hello"
|
|
|
|
def test_exact_length_unchanged(self):
|
|
text = "a" * _MAX_REPLY_LEN
|
|
assert _truncate(text) == text
|
|
|
|
def test_long_text_truncated(self):
|
|
text = "a" * 600
|
|
result = _truncate(text)
|
|
assert len(result) == _MAX_REPLY_LEN
|
|
assert result.endswith("...")
|
|
|
|
def test_custom_max(self):
|
|
result = _truncate("abcdefghij", 7)
|
|
assert result == "abcd..."
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestExtractReply
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestExtractReply:
|
|
def test_normal_content(self):
|
|
data = _api_response(content="Hello world")
|
|
assert _extract_reply(data) == "Hello world"
|
|
|
|
def test_empty_content_falls_back_to_reasoning(self):
|
|
data = _api_response(content="", reasoning="Thinking about it")
|
|
assert _extract_reply(data) == "Thinking about it"
|
|
|
|
def test_content_preferred_over_reasoning(self):
|
|
data = _api_response(content="Answer", reasoning="Reasoning")
|
|
assert _extract_reply(data) == "Answer"
|
|
|
|
def test_empty_choices(self):
|
|
assert _extract_reply({"choices": []}) == ""
|
|
|
|
def test_no_choices(self):
|
|
assert _extract_reply({}) == ""
|
|
|
|
def test_whitespace_content_falls_back(self):
|
|
data = _api_response(content=" ", reasoning="Fallback")
|
|
assert _extract_reply(data) == "Fallback"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestGetApiKey
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetApiKey:
|
|
def test_from_env(self):
|
|
bot = _FakeBot()
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "env-key"}):
|
|
assert _get_api_key(bot) == "env-key"
|
|
|
|
def test_from_config(self):
|
|
bot = _FakeBot(config={"openrouter": {"api_key": "cfg-key"}})
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
import os
|
|
os.environ.pop("OPENROUTER_API_KEY", None)
|
|
assert _get_api_key(bot) == "cfg-key"
|
|
|
|
def test_env_takes_precedence(self):
|
|
bot = _FakeBot(config={"openrouter": {"api_key": "cfg-key"}})
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "env-key"}):
|
|
assert _get_api_key(bot) == "env-key"
|
|
|
|
def test_missing_returns_empty(self):
|
|
bot = _FakeBot()
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
import os
|
|
os.environ.pop("OPENROUTER_API_KEY", None)
|
|
assert _get_api_key(bot) == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestGetModel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestGetModel:
|
|
def test_default_model(self):
|
|
bot = _FakeBot()
|
|
assert _get_model(bot) == "openrouter/auto"
|
|
|
|
def test_from_config(self):
|
|
bot = _FakeBot(config={"openrouter": {"model": "some/model"}})
|
|
assert _get_model(bot) == "some/model"
|
|
|
|
def test_runtime_override(self):
|
|
bot = _FakeBot(config={"openrouter": {"model": "some/model"}})
|
|
_ps(bot)["model"] = "override/model"
|
|
assert _get_model(bot) == "override/model"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCooldown
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCooldown:
|
|
def test_first_request_not_limited(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
assert _check_cooldown(bot, "alice") is False
|
|
|
|
def test_second_request_within_cooldown(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
_set_cooldown(bot, "alice")
|
|
assert _check_cooldown(bot, "alice") is True
|
|
|
|
def test_different_users_independent(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
_set_cooldown(bot, "alice")
|
|
assert _check_cooldown(bot, "bob") is False
|
|
|
|
def test_after_cooldown_passes(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
_set_cooldown(bot, "alice")
|
|
# Simulate time passing
|
|
_ps(bot)["cooldowns"]["alice"] = time.monotonic() - _COOLDOWN - 1
|
|
assert _check_cooldown(bot, "alice") is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCmdAsk
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCmdAsk:
|
|
def test_no_args(self):
|
|
bot = _FakeBot()
|
|
asyncio.run(cmd_ask(bot, _msg("!ask")))
|
|
assert "Usage:" in bot.replied[0]
|
|
|
|
def test_empty_args(self):
|
|
bot = _FakeBot()
|
|
asyncio.run(cmd_ask(bot, _msg("!ask ")))
|
|
assert "Usage:" in bot.replied[0]
|
|
|
|
def test_no_api_key(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
import os
|
|
os.environ.pop("OPENROUTER_API_KEY", None)
|
|
asyncio.run(cmd_ask(bot, _msg("!ask what is python")))
|
|
assert "not configured" in bot.replied[0]
|
|
|
|
def test_success(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
resp = _FakeResp(_api_response(content="Python is a programming language."))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask what is python")))
|
|
|
|
assert len(bot.replied) == 1
|
|
assert "Python is a programming language" in bot.replied[0]
|
|
|
|
def test_api_error_429(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
err = urllib.error.HTTPError(
|
|
"url", 429, "Too Many Requests", {}, None,
|
|
)
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", side_effect=err):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
|
|
|
|
assert "Rate limited" in bot.replied[0]
|
|
|
|
def test_api_error_500(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
err = urllib.error.HTTPError(
|
|
"url", 500, "Internal Server Error", {}, None,
|
|
)
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", side_effect=err):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
|
|
|
|
assert "API error" in bot.replied[0]
|
|
assert "500" in bot.replied[0]
|
|
|
|
def test_connection_error(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", side_effect=ConnectionError("fail")):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
|
|
|
|
assert "Request failed" in bot.replied[0]
|
|
|
|
def test_empty_response(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
resp = _FakeResp({"choices": []})
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
|
|
|
|
assert "No response" in bot.replied[0]
|
|
|
|
def test_cooldown(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
resp = _FakeResp(_api_response(content="Hello!"))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask first")))
|
|
bot.replied.clear()
|
|
asyncio.run(cmd_ask(bot, _msg("!ask second")))
|
|
|
|
assert "Cooldown" in bot.replied[0]
|
|
|
|
def test_response_truncation(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
long_text = "a" * 600
|
|
resp = _FakeResp(_api_response(content=long_text))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
|
|
|
|
assert len(bot.replied[0]) == _MAX_REPLY_LEN
|
|
assert bot.replied[0].endswith("...")
|
|
|
|
def test_reasoning_model_fallback(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
resp = _FakeResp(_api_response(content="", reasoning="Deep thought"))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask meaning of life")))
|
|
|
|
assert "Deep thought" in bot.replied[0]
|
|
|
|
def test_multiline_uses_long_reply(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
resp = _FakeResp(_api_response(content="Line one\nLine two\nLine three"))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_ask(bot, _msg("!ask hello")))
|
|
|
|
assert len(bot.replied) == 3
|
|
assert bot.replied[0] == "Line one"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCmdChat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCmdChat:
|
|
def test_no_args(self):
|
|
bot = _FakeBot()
|
|
asyncio.run(cmd_chat(bot, _msg("!chat")))
|
|
assert "Usage:" in bot.replied[0]
|
|
|
|
def test_chat_with_history(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
resp1 = _FakeResp(_api_response(content="I am an assistant."))
|
|
resp2 = _FakeResp(_api_response(content="You asked who I am."))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp1):
|
|
asyncio.run(cmd_chat(bot, _msg("!chat who are you")))
|
|
# Clear cooldown for second request
|
|
_ps(bot)["cooldowns"].clear()
|
|
with patch.object(_mod, "_urlopen", return_value=resp2) as mock_url:
|
|
asyncio.run(cmd_chat(bot, _msg("!chat what did I ask")))
|
|
# Verify the history was sent with the second request
|
|
call_args = mock_url.call_args
|
|
req = call_args[0][0]
|
|
body = json.loads(req.data)
|
|
# System + user1 + assistant1 + user2 = 4 messages
|
|
assert len(body["messages"]) == 4
|
|
assert body["messages"][1]["content"] == "who are you"
|
|
assert body["messages"][2]["content"] == "I am an assistant."
|
|
assert body["messages"][3]["content"] == "what did I ask"
|
|
|
|
assert "I am an assistant" in bot.replied[0]
|
|
assert "You asked who I am" in bot.replied[1]
|
|
|
|
def test_chat_clear(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
# Pre-populate history
|
|
_ps(bot)["histories"]["alice"] = [
|
|
{"role": "user", "content": "hello"},
|
|
{"role": "assistant", "content": "hi"},
|
|
]
|
|
|
|
asyncio.run(cmd_chat(bot, _msg("!chat clear")))
|
|
assert "cleared" in bot.replied[0].lower()
|
|
assert "alice" not in _ps(bot)["histories"]
|
|
|
|
def test_chat_cooldown(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
resp = _FakeResp(_api_response(content="Hello!"))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_chat(bot, _msg("!chat first")))
|
|
bot.replied.clear()
|
|
asyncio.run(cmd_chat(bot, _msg("!chat second")))
|
|
|
|
assert "Cooldown" in bot.replied[0]
|
|
|
|
def test_chat_model_show(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
asyncio.run(cmd_chat(bot, _msg("!chat model")))
|
|
assert "openrouter/auto" in bot.replied[0]
|
|
|
|
def test_chat_model_switch(self):
|
|
bot = _FakeBot(admin=True)
|
|
_clear(bot)
|
|
asyncio.run(cmd_chat(bot, _msg("!chat model meta-llama/llama-3.3-70b-instruct:free")))
|
|
assert "Model set to" in bot.replied[0]
|
|
assert _ps(bot)["model"] == "meta-llama/llama-3.3-70b-instruct:free"
|
|
|
|
def test_chat_models_list(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
asyncio.run(cmd_chat(bot, _msg("!chat models")))
|
|
assert len(bot.replied) >= 3
|
|
assert any("openrouter/auto" in r for r in bot.replied)
|
|
|
|
def test_chat_no_api_key(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
with patch.dict("os.environ", {}, clear=True):
|
|
import os
|
|
os.environ.pop("OPENROUTER_API_KEY", None)
|
|
asyncio.run(cmd_chat(bot, _msg("!chat hello")))
|
|
assert "not configured" in bot.replied[0]
|
|
|
|
def test_history_cap(self):
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
# Pre-populate with MAX_HISTORY messages
|
|
ps = _ps(bot)
|
|
ps["histories"]["alice"] = [
|
|
{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg{i}"}
|
|
for i in range(_MAX_HISTORY)
|
|
]
|
|
|
|
resp = _FakeResp(_api_response(content="Latest reply"))
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", return_value=resp):
|
|
asyncio.run(cmd_chat(bot, _msg("!chat overflow")))
|
|
|
|
history = ps["histories"]["alice"]
|
|
# History should be capped at MAX_HISTORY
|
|
assert len(history) <= _MAX_HISTORY
|
|
|
|
def test_chat_api_error_removes_user_msg(self):
|
|
"""On API failure, the user message should be removed from history."""
|
|
bot = _FakeBot()
|
|
_clear(bot)
|
|
err = urllib.error.HTTPError(
|
|
"url", 500, "Internal Server Error", {}, None,
|
|
)
|
|
|
|
with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test-key"}):
|
|
with patch.object(_mod, "_urlopen", side_effect=err):
|
|
asyncio.run(cmd_chat(bot, _msg("!chat hello")))
|
|
|
|
ps = _ps(bot)
|
|
# History should be empty -- user msg was removed on failure
|
|
assert len(ps["histories"].get("alice", [])) == 0
|