Compare commits

..

7 Commits

Author SHA1 Message Date
user
e9528bd879 docs: update docs for ACL tiers and webhook
All checks were successful
CI / test (3.11) (push) Successful in 1m37s
CI / test (3.12) (push) Successful in 1m35s
CI / test (3.13) (push) Successful in 1m20s
- USAGE.md: permission tiers section, webhook config/API/example
- CHEATSHEET.md: ACL tiers and webhook quick-ref sections
- ROADMAP.md: mark webhook and ACL items done
- TODO.md: mark webhook and ACL items done
- TASKS.md: new sprint for ACL + webhook work
2026-02-21 17:59:22 +01:00
user
c483beb555 feat: add webhook listener for push events to channels
HTTP POST endpoint for external services (CI, monitoring, GitHub).
HMAC-SHA256 auth, JSON body, single POST endpoint at /.

- asyncio.start_server with raw HTTP parsing (zero deps)
- Body validation: channel prefix, non-empty text, 64KB cap
- !webhook admin command shows address, request count, uptime
- Module-level server guard prevents duplicates on reconnect
- 22 test cases in test_webhook.py
2026-02-21 17:59:14 +01:00
user
2514aa777d feat: add granular ACL tiers (trusted/oper/admin)
4-tier permission model: user < trusted < oper < admin.
Commands specify a required tier via tier= parameter.
Backward compatible: admin=True maps to tier="admin".

- TIERS constant and Handler.tier field in plugin.py
- _get_tier() method in bot.py with pattern matching
- _is_admin() preserved as thin wrapper
- operators/trusted config lists in config.py
- whoami shows tier, admins shows all configured tiers
- 32 test cases in test_acl.py
2026-02-21 17:59:05 +01:00
user
5bc59730c4 docs: update docs for cron, shortener, CI
Add !cron section to USAGE.md and CHEATSHEET.md.
Mark cron, URL shortener, CI complete in ROADMAP.md and TODO.md.
New sprint in TASKS.md.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 17:35:16 +01:00
user
6ef3fee72c feat: add Gitea Actions CI pipeline
Matrix build: Python 3.11, 3.12, 3.13.
Runs ruff lint and pytest on push/PR to master.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 17:35:12 +01:00
user
7b14efb30f feat: add cron plugin for scheduled commands
Admin-only plugin for interval-based command execution.
Supports add/del/list, 1m-7d intervals, 20 jobs/channel.
Persists via bot.state, restores on reconnect.
Includes test_cron.py (~38 cases).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 17:35:08 +01:00
user
aebe1589d2 feat: add URL shortening to subscription announcements
Bot.shorten_url() method delegates to flaskpaste plugin when loaded.
RSS, YouTube, and pastemoni announcements auto-shorten links.
Includes test_flaskpaste.py (9 cases) and FakeBot updates in 3 test files.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 17:35:03 +01:00
23 changed files with 2479 additions and 68 deletions

20
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,20 @@
name: CI
on:
push:
branches: [master]
pull_request:
branches: [master]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- run: pip install -e . && pip install pytest ruff
- run: ruff check src/ tests/ plugins/
- run: pytest -v

View File

@@ -113,9 +113,9 @@
- [ ] Multi-server support (per-server config, shared plugins)
- [ ] Stable plugin API (versioned, breaking change policy)
- [x] Paste overflow (auto-paste long output to FlaskPaste, return link)
- [ ] URL shortener integration (shorten URLs in alerts and long output)
- [ ] Webhook listener (HTTP endpoint for push events to channels)
- [ ] Granular ACLs (per-command permission tiers: trusted, operator, admin)
- [x] URL shortener integration (shorten URLs in subscription announcements)
- [x] Webhook listener (HTTP endpoint for push events to channels)
- [x] Granular ACLs (per-command permission tiers: trusted, operator, admin)
- [x] `paste` command (manual paste to FlaskPaste)
- [x] `shorten` command (manual URL shortening)
- [x] `emailcheck` plugin (SMTP VRFY/RCPT TO)
@@ -125,6 +125,6 @@
- [x] `jwt` plugin (decode tokens, show claims/expiry, flag weaknesses)
- [x] `mac` plugin (OUI vendor lookup, local IEEE database)
- [x] `pastemoni` plugin (monitor paste sites for keywords)
- [ ] `cron` plugin (scheduled bot commands on a timer)
- [x] `cron` plugin (scheduled bot commands on a timer)
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
- [ ] CI pipeline
- [x] CI pipeline (Gitea Actions, Python 3.11-3.13, ruff + pytest)

View File

@@ -1,6 +1,30 @@
# derp - Tasks
## Current Sprint -- v2.0.0 Quick Wins (2026-02-21)
## Current Sprint -- v2.0.0 ACL + Webhook (2026-02-21)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | Granular ACL tiers in `src/derp/plugin.py` (TIERS, Handler.tier, decorator) |
| P0 | [x] | ACL dispatch in `src/derp/bot.py` (_get_tier, _operators, _trusted) |
| P0 | [x] | Config defaults: operators, trusted, webhook section |
| P0 | [x] | `plugins/core.py` -- whoami/admins tier display |
| P0 | [x] | `plugins/webhook.py` -- HTTP webhook listener (HMAC, JSON, POST) |
| P1 | [x] | Tests: `test_acl.py` (32 cases), `test_webhook.py` (22 cases) |
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, ROADMAP.md, TODO.md) |
## Previous Sprint -- v2.0.0 Tier 2 (2026-02-21)
| Pri | Status | Task |
|-----|--------|------|
| P0 | [x] | `Bot.shorten_url()` method in `src/derp/bot.py` |
| P0 | [x] | URL shortening in rss.py, youtube.py, pastemoni.py announcements |
| P0 | [x] | `plugins/cron.py` -- scheduled command execution (add/del/list) |
| P0 | [x] | `.gitea/workflows/ci.yml` -- Gitea Actions CI pipeline |
| P1 | [x] | Tests: `test_flaskpaste.py` (9 cases), `test_cron.py` (~38 cases) |
| P1 | [x] | FakeBot `shorten_url` in test_rss, test_youtube, test_pastemoni |
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, ROADMAP.md, TODO.md) |
## Previous Sprint -- v2.0.0 Quick Wins (2026-02-21)
| Pri | Status | Task |
|-----|--------|------|

12
TODO.md
View File

@@ -4,10 +4,10 @@
- [ ] Multi-server support (per-server config, shared plugins)
- [ ] Stable plugin API (versioned, breaking change policy)
- [ ] Paste overflow (auto-paste long output to FlaskPaste)
- [ ] URL shortener integration (shorten URLs in alerts/output)
- [ ] Webhook listener (HTTP endpoint for push events to channels)
- [ ] Granular ACLs (per-command: trusted, operator, admin)
- [x] Paste overflow (auto-paste long output to FlaskPaste)
- [x] URL shortener integration (shorten URLs in subscription announcements)
- [x] Webhook listener (HTTP endpoint for push events to channels)
- [x] Granular ACLs (per-command: trusted, operator, admin)
## LLM Bridge
@@ -80,9 +80,9 @@ is preserved in git history for reference.
- [x] `paste` -- manual paste to FlaskPaste
- [x] `shorten` -- manual URL shortening
- [ ] `cron` -- scheduled bot commands on a timer
- [x] `cron` -- scheduled bot commands on a timer
## Testing
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
- [ ] CI pipeline
- [x] CI pipeline (Gitea Actions)

View File

@@ -75,21 +75,27 @@ code changes -- restart the container or use `!reload` for plugins.
!h # Shorthand (any unambiguous prefix works)
```
## Admin
## Permission Tiers
```
!whoami # Show your hostmask + admin status
!admins # Show admin patterns + detected opers (admin)
user < trusted < oper < admin
```
```toml
# config/derp.toml
[bot]
admins = ["*!~user@trusted.host", "ops!*@*.ops.net"]
admins = ["*!~root@*.ops.net"] # admin tier
operators = ["*!~staff@trusted.host"] # oper tier
trusted = ["*!~user@known.host"] # trusted tier
```
IRC operators are auto-detected via WHO on connect and on user JOIN
(debounced 2s to handle netsplit floods). Hostmask patterns use fnmatch.
```
!whoami # Show your hostmask + permission tier
!admins # Show configured tiers + detected opers (admin)
```
IRC operators are auto-detected via WHO (admin tier). Hostmask patterns
use fnmatch. `admin=True` on commands still works (maps to tier="admin").
## Channel Management (admin)
@@ -437,6 +443,45 @@ History in `data/alert_history.db`.
Shows top 3 results as `Title -- URL`. Channel only. Max query length: 200 chars.
## Cron (admin)
```
!cron add 1h #ops !rss check news # Schedule command every hour
!cron add 2d #alerts !tor update # Every 2 days
!cron del abc123 # Remove job by ID
!cron list # List jobs in channel
```
Intervals: `5m`, `1h30m`, `2d`, `90s`, or raw seconds. Min 1m, max 7d.
Max 20 jobs/channel. Persists across restarts. Channel only.
## Webhook (admin)
```toml
# config/derp.toml
[webhook]
enabled = true
host = "127.0.0.1"
port = 8080
secret = "your-shared-secret"
```
```bash
# Send message to IRC channel via webhook
SECRET="your-shared-secret"
BODY='{"channel":"#ops","text":"Deploy done"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $2}')
curl -X POST http://127.0.0.1:8080/ \
-H "X-Signature: sha256=$SIG" -d "$BODY"
```
```
!webhook # Show listener status (admin)
```
POST JSON: `{"channel":"#chan","text":"msg"}`. Optional `"action":true`.
Auth: HMAC-SHA256 via `X-Signature` header. Starts on IRC connect.
## Plugin Template
```python

View File

@@ -53,10 +53,18 @@ rate_burst = 5 # Burst capacity (default: 5)
paste_threshold = 4 # Max lines before overflow to FlaskPaste (default: 4)
admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected
timezone = "UTC" # Timezone for calendar reminders (IANA tz name)
operators = [] # Hostmask patterns for "oper" tier (fnmatch)
trusted = [] # Hostmask patterns for "trusted" tier (fnmatch)
[logging]
level = "info" # Logging level: debug, info, warning, error
format = "text" # Log format: "text" (default) or "json"
[webhook]
enabled = false # Enable HTTP webhook listener
host = "127.0.0.1" # Bind address
port = 8080 # Bind port
secret = "" # HMAC-SHA256 shared secret (empty = no auth)
```
## Built-in Commands
@@ -141,6 +149,8 @@ format = "text" # Log format: "text" (default) or "json"
| `!shorten <url>` | Shorten a URL via FlaskPaste |
| `!paste <text>` | Create a paste via FlaskPaste |
| `!pastemoni <add\|del\|list\|check>` | Paste site keyword monitoring |
| `!cron <add\|del\|list>` | Scheduled command execution (admin) |
| `!webhook` | Show webhook listener status (admin) |
### Command Shorthand
@@ -220,24 +230,31 @@ Each line contains:
Default format is `"text"` (human-readable, same as before).
## Admin System
## Permission Tiers (ACL)
Commands marked as `admin` require elevated permissions. Admin access is
granted via:
The bot uses a 4-tier permission model. Each command has a required tier;
users must meet or exceed it.
1. **IRC operator status** -- detected automatically via `WHO`
2. **Hostmask patterns** -- configured in `[bot] admins`, fnmatch-style
```
user < trusted < oper < admin
```
| Tier | Granted by |
|------|------------|
| `user` | Everyone (default) |
| `trusted` | `[bot] trusted` hostmask patterns |
| `oper` | `[bot] operators` hostmask patterns |
| `admin` | `[bot] admins` hostmask patterns or IRC operator status |
```toml
[bot]
admins = [
"*!~user@trusted.host",
"ops!*@*.ops.net",
]
admins = ["*!~root@*.ops.net"]
operators = ["*!~staff@trusted.host"]
trusted = ["*!~user@known.host"]
```
Empty by default -- only IRC operators get admin access unless patterns
are configured.
All lists are empty by default -- only IRC operators get admin access
unless patterns are configured. Patterns use fnmatch-style matching.
### Oper Detection
@@ -255,18 +272,24 @@ set automatically.
| Command | Description |
|---------|-------------|
| `!whoami` | Show your hostmask and admin status |
| `!admins` | Show configured patterns and detected opers (admin) |
| `!whoami` | Show your hostmask and permission tier |
| `!admins` | Show configured tiers and detected opers (admin) |
Admin-restricted commands: `!load`, `!reload`, `!unload`, `!admins`, `!state`,
`!kick`, `!ban`, `!unban`, `!topic`, `!mode`.
`!kick`, `!ban`, `!unban`, `!topic`, `!mode`, `!webhook`.
### Writing Admin Commands
### Writing Tiered Commands
```python
# admin=True still works (maps to tier="admin")
@command("dangerous", help="Admin-only action", admin=True)
async def cmd_dangerous(bot, message):
...
# Explicit tier for finer control
@command("moderate", help="Trusted-only action", tier="trusted")
async def cmd_moderate(bot, message):
...
```
## IRCv3 Capability Negotiation
@@ -404,6 +427,68 @@ restarting the bot.
The `core` plugin cannot be unloaded (prevents losing `!load`/`!reload`),
but it can be reloaded.
## Webhook Listener
Receive HTTP POST requests from external services (CI, monitoring, GitHub,
etc.) and relay messages to IRC channels.
### Configuration
```toml
[webhook]
enabled = true
host = "127.0.0.1"
port = 8080
secret = "your-shared-secret"
```
### HTTP API
Single endpoint: `POST /`
**Request body** (JSON):
```json
{"channel": "#ops", "text": "Deploy v2.3.1 complete"}
```
Optional `"action": true` sends as a `/me` action.
**Authentication**: HMAC-SHA256 via `X-Signature: sha256=<hex>` header.
If `secret` is empty, no authentication is required.
**Response codes**:
| Status | When |
|--------|------|
| 200 OK | Message sent |
| 400 Bad Request | Invalid JSON, missing/invalid channel, empty text |
| 401 Unauthorized | Bad or missing HMAC signature |
| 405 Method Not Allowed | Non-POST request |
| 413 Payload Too Large | Body > 64 KB |
### Usage Example
```bash
SECRET="your-shared-secret"
BODY='{"channel":"#ops","text":"Deploy v2.3.1 complete"}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $2}')
curl -X POST http://127.0.0.1:8080/ \
-H "Content-Type: application/json" \
-H "X-Signature: sha256=$SIG" \
-d "$BODY"
```
### Status Command
```
!webhook Show listener address, request count, uptime (admin)
```
The server starts on IRC connect (event 001) and runs for the lifetime of
the bot. If the server is already running (e.g. after reconnect), it is
not restarted.
## Writing Plugins
Create a `.py` file in the `plugins/` directory:
@@ -1077,6 +1162,42 @@ badhost.invalid -> NXDOMAIN
- Concurrent via `asyncio.gather()`
- Valid types: A, NS, CNAME, SOA, PTR, MX, TXT, AAAA
### `!cron` -- Scheduled Command Execution
Schedule bot commands to repeat on a timer. Admins only.
```
!cron add <interval> <#channel> <command...> Schedule a command
!cron del <id> Remove a job
!cron list List jobs in channel
```
Examples:
```
!cron add 1h #ops !rss check news Poll RSS feed every hour
!cron add 2d #alerts !tor update Update Tor list every 2 days
!cron del abc123 Remove job by ID
!cron list Show jobs in current channel
```
Output format:
```
Cron #a1b2c3: !rss check news every 1h in #ops
#a1b2c3 every 1h: !rss check news
```
- `add` and `del` require admin privileges
- `add` and `list` must be used in a channel (not PM)
- Interval formats: `5m`, `1h30m`, `2d`, `90s`, or raw seconds
- Minimum interval: 1 minute
- Maximum interval: 7 days
- Maximum 20 jobs per channel
- Jobs persist across bot restarts via `bot.state`
- Dispatched commands run with the original creator's identity
- The scheduled command goes through normal command routing and permissions
### FlaskPaste Configuration
```toml

View File

@@ -139,34 +139,33 @@ async def cmd_plugins(bot, message):
await bot.reply(message, f"Plugins: {', '.join(parts)}")
@command("whoami", help="Show your hostmask and admin status")
@command("whoami", help="Show your hostmask and permission tier")
async def cmd_whoami(bot, message):
"""Display the sender's hostmask and permission level."""
prefix = message.prefix or "unknown"
is_admin = bot._is_admin(message)
is_oper = message.prefix in bot._opers if message.prefix else False
tags = []
if is_admin:
tags.append("admin")
else:
tags.append("user")
if is_oper:
tier = bot._get_tier(message)
tags = [tier]
if message.prefix and message.prefix in bot._opers:
tags.append("IRCOP")
await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
@command("admins", help="Show configured admin patterns and detected opers", admin=True)
@command("admins", help="Show configured permission tiers and detected opers", admin=True)
async def cmd_admins(bot, message):
"""Display admin hostmask patterns and known IRC operators."""
"""Display configured permission tiers and known IRC operators."""
parts = []
if bot._admins:
parts.append(f"Patterns: {', '.join(bot._admins)}")
parts.append(f"Admin: {', '.join(bot._admins)}")
else:
parts.append("Patterns: (none)")
parts.append("Admin: (none)")
if bot._operators:
parts.append(f"Oper: {', '.join(bot._operators)}")
if bot._trusted:
parts.append(f"Trusted: {', '.join(bot._trusted)}")
if bot._opers:
parts.append(f"Opers: {', '.join(sorted(bot._opers))}")
parts.append(f"IRCOPs: {', '.join(sorted(bot._opers))}")
else:
parts.append("Opers: (none)")
parts.append("IRCOPs: (none)")
await bot.reply(message, " | ".join(parts))

288
plugins/cron.py Normal file
View File

@@ -0,0 +1,288 @@
"""Plugin: scheduled command execution on a timer."""
from __future__ import annotations
import asyncio
import hashlib
import json
import re
import time
from derp.irc import Message
from derp.plugin import command, event
# -- Constants ---------------------------------------------------------------
_DURATION_RE = re.compile(r"(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$")
_MIN_INTERVAL = 60
_MAX_INTERVAL = 604800 # 7 days
_MAX_JOBS = 20
# -- Module-level tracking ---------------------------------------------------
_jobs: dict[str, dict] = {}
_tasks: dict[str, asyncio.Task] = {}
# -- Pure helpers ------------------------------------------------------------
def _make_id(channel: str, cmd: str) -> str:
"""Generate a short hex ID from channel + command + timestamp."""
raw = f"{channel}:{cmd}:{time.monotonic()}".encode()
return hashlib.sha256(raw).hexdigest()[:6]
def _parse_duration(spec: str) -> int | None:
"""Parse a duration like '5m', '1h30m', '2d', '90s', or raw seconds."""
try:
secs = int(spec)
return secs if secs > 0 else None
except ValueError:
pass
m = _DURATION_RE.match(spec.lower())
if not m or not any(m.groups()):
return None
days = int(m.group(1) or 0)
hours = int(m.group(2) or 0)
mins = int(m.group(3) or 0)
secs = int(m.group(4) or 0)
total = days * 86400 + hours * 3600 + mins * 60 + secs
return total if total > 0 else None
def _format_duration(secs: int) -> str:
"""Format seconds into compact duration."""
parts = []
if secs >= 86400:
parts.append(f"{secs // 86400}d")
secs %= 86400
if secs >= 3600:
parts.append(f"{secs // 3600}h")
secs %= 3600
if secs >= 60:
parts.append(f"{secs // 60}m")
secs %= 60
if secs or not parts:
parts.append(f"{secs}s")
return "".join(parts)
# -- State helpers -----------------------------------------------------------
def _state_key(channel: str, cron_id: str) -> str:
"""Build composite state key."""
return f"{channel}:{cron_id}"
def _save(bot, key: str, data: dict) -> None:
"""Persist cron job to bot.state."""
bot.state.set("cron", key, json.dumps(data))
def _load(bot, key: str) -> dict | None:
"""Load cron job from bot.state."""
raw = bot.state.get("cron", key)
if raw is None:
return None
try:
return json.loads(raw)
except json.JSONDecodeError:
return None
def _delete(bot, key: str) -> None:
"""Remove cron job from bot.state."""
bot.state.delete("cron", key)
# -- Cron loop ---------------------------------------------------------------
async def _cron_loop(bot, key: str) -> None:
"""Repeating loop: sleep, then dispatch the stored command."""
try:
while True:
data = _jobs.get(key)
if not data:
return
await asyncio.sleep(data["interval"])
# Synthesize a message for command dispatch
msg = Message(
raw="", prefix=data["prefix"],
nick=data["nick"], command="PRIVMSG",
params=[data["channel"], data["command"]], tags={},
)
bot._dispatch_command(msg)
except asyncio.CancelledError:
pass
def _start_job(bot, key: str) -> None:
"""Create and track a cron task."""
existing = _tasks.get(key)
if existing and not existing.done():
return
task = asyncio.create_task(_cron_loop(bot, key))
_tasks[key] = task
def _stop_job(key: str) -> None:
"""Cancel and remove a cron task."""
task = _tasks.pop(key, None)
if task and not task.done():
task.cancel()
_jobs.pop(key, None)
# -- Restore on connect -----------------------------------------------------
def _restore(bot) -> None:
"""Rebuild cron tasks from persisted state."""
for key in bot.state.keys("cron"):
existing = _tasks.get(key)
if existing and not existing.done():
continue
data = _load(bot, key)
if data is None:
continue
_jobs[key] = data
_start_job(bot, key)
@event("001")
async def on_connect(bot, message):
"""Restore cron jobs on connect."""
_restore(bot)
# -- Command handler ---------------------------------------------------------
@command("cron", help="Cron: !cron add|del|list", admin=True)
async def cmd_cron(bot, message):
"""Scheduled command execution on a timer.
Usage:
!cron add <interval> <#channel> <command...> Schedule a command
!cron del <id> Remove a job
!cron list List jobs
"""
parts = message.text.split(None, 4)
if len(parts) < 2:
await bot.reply(message, "Usage: !cron <add|del|list> [args]")
return
sub = parts[1].lower()
# -- list ----------------------------------------------------------------
if sub == "list":
if not message.is_channel:
await bot.reply(message, "Use this command in a channel")
return
channel = message.target
prefix = f"{channel}:"
entries = []
for key in bot.state.keys("cron"):
if key.startswith(prefix):
data = _load(bot, key)
if data:
cron_id = data["id"]
interval = _format_duration(data["interval"])
cmd = data["command"]
entries.append(f"#{cron_id} every {interval}: {cmd}")
if not entries:
await bot.reply(message, "No cron jobs in this channel")
return
for entry in entries:
await bot.reply(message, entry)
return
# -- del -----------------------------------------------------------------
if sub == "del":
if len(parts) < 3:
await bot.reply(message, "Usage: !cron del <id>")
return
cron_id = parts[2].lstrip("#")
# Find matching key across all channels
found_key = None
for key in bot.state.keys("cron"):
data = _load(bot, key)
if data and data["id"] == cron_id:
found_key = key
break
if not found_key:
await bot.reply(message, f"No cron job #{cron_id}")
return
_stop_job(found_key)
_delete(bot, found_key)
await bot.reply(message, f"Removed cron #{cron_id}")
return
# -- add -----------------------------------------------------------------
if sub == "add":
if not message.is_channel:
await bot.reply(message, "Use this command in a channel")
return
if len(parts) < 5:
await bot.reply(
message,
"Usage: !cron add <interval> <#channel> <command...>",
)
return
interval_spec = parts[2]
target_channel = parts[3]
cmd_text = parts[4]
if not target_channel.startswith(("#", "&")):
await bot.reply(message, "Target must be a channel (e.g. #ops)")
return
interval = _parse_duration(interval_spec)
if interval is None:
await bot.reply(message, "Invalid interval (use: 5m, 1h, 2d)")
return
if interval < _MIN_INTERVAL:
await bot.reply(
message,
f"Minimum interval is {_format_duration(_MIN_INTERVAL)}",
)
return
if interval > _MAX_INTERVAL:
await bot.reply(
message,
f"Maximum interval is {_format_duration(_MAX_INTERVAL)}",
)
return
# Check per-channel limit
ch_prefix = f"{target_channel}:"
count = sum(
1 for k in bot.state.keys("cron") if k.startswith(ch_prefix)
)
if count >= _MAX_JOBS:
await bot.reply(message, f"Job limit reached ({_MAX_JOBS})")
return
cron_id = _make_id(target_channel, cmd_text)
key = _state_key(target_channel, cron_id)
data = {
"id": cron_id,
"channel": target_channel,
"command": cmd_text,
"interval": interval,
"prefix": message.prefix,
"nick": message.nick,
"added_by": message.nick,
}
_save(bot, key, data)
_jobs[key] = data
_start_job(bot, key)
fmt_interval = _format_duration(interval)
await bot.reply(
message,
f"Cron #{cron_id}: {cmd_text} every {fmt_interval} in {target_channel}",
)
return
await bot.reply(message, "Usage: !cron <add|del|list> [args]")

View File

@@ -275,6 +275,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
title = item.get("title") or "(untitled)"
snippet = item.get("snippet", "")
url = item.get("url", "")
if url:
url = await bot.shorten_url(url)
parts = [f"[{tag}] {title}"]
if snippet:
parts.append(snippet)

View File

@@ -272,6 +272,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
for item in shown:
title = _truncate(item["title"]) if item["title"] else "(no title)"
link = item["link"]
if link:
link = await bot.shorten_url(link)
date = item.get("date", "")
line = f"[{name}] {title}"
if date:

196
plugins/webhook.py Normal file
View File

@@ -0,0 +1,196 @@
"""Webhook listener: receive HTTP POST requests and relay messages to IRC."""
from __future__ import annotations
import asyncio
import hashlib
import hmac
import json
import logging
import time
from derp.plugin import command, event
log = logging.getLogger(__name__)
_MAX_BODY = 65536 # 64 KB
_server: asyncio.Server | None = None
_request_count: int = 0
_started: float = 0.0
def _verify_signature(secret: str, body: bytes, signature: str) -> bool:
"""Verify HMAC-SHA256 signature from X-Signature header."""
if not secret:
return True # no secret configured = open access
if not signature.startswith("sha256="):
return False
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature[7:])
def _http_response(status: int, reason: str, body: str = "") -> bytes:
"""Build a minimal HTTP/1.1 response."""
body_bytes = body.encode("utf-8") if body else b""
lines = [
f"HTTP/1.1 {status} {reason}",
"Content-Type: text/plain; charset=utf-8",
f"Content-Length: {len(body_bytes)}",
"Connection: close",
"",
"",
]
return "\r\n".join(lines).encode("utf-8") + body_bytes
async def _handle_request(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
bot, secret: str) -> None:
"""Parse one HTTP request and dispatch to IRC."""
global _request_count
try:
# Read request line
request_line = await asyncio.wait_for(reader.readline(), timeout=10.0)
if not request_line:
return
parts = request_line.decode("utf-8", errors="replace").strip().split()
if len(parts) < 2:
writer.write(_http_response(400, "Bad Request", "malformed request"))
return
method, _path = parts[0], parts[1]
# Read headers
headers: dict[str, str] = {}
while True:
line = await asyncio.wait_for(reader.readline(), timeout=10.0)
if not line or line == b"\r\n" or line == b"\n":
break
decoded = line.decode("utf-8", errors="replace").strip()
if ":" in decoded:
key, val = decoded.split(":", 1)
headers[key.strip().lower()] = val.strip()
# Method check
if method != "POST":
writer.write(_http_response(405, "Method Not Allowed", "POST only"))
return
# Read body
content_length = int(headers.get("content-length", "0"))
if content_length > _MAX_BODY:
writer.write(_http_response(413, "Payload Too Large",
f"max {_MAX_BODY} bytes"))
return
body = await asyncio.wait_for(reader.readexactly(content_length),
timeout=10.0)
# Verify HMAC signature
signature = headers.get("x-signature", "")
if not _verify_signature(secret, body, signature):
writer.write(_http_response(401, "Unauthorized", "bad signature"))
return
# Parse JSON
try:
data = json.loads(body)
except (json.JSONDecodeError, UnicodeDecodeError):
writer.write(_http_response(400, "Bad Request", "invalid JSON"))
return
# Validate fields
channel = data.get("channel", "")
text = data.get("text", "")
is_action = data.get("action", False)
if not isinstance(channel, str) or not channel.startswith(("#", "&")):
writer.write(_http_response(400, "Bad Request", "invalid channel"))
return
if not isinstance(text, str) or not text.strip():
writer.write(_http_response(400, "Bad Request", "empty text"))
return
# Send to IRC
text = text.strip()
if is_action:
await bot.action(channel, text)
else:
await bot.send(channel, text)
_request_count += 1
writer.write(_http_response(200, "OK", "sent"))
log.info("webhook: relayed to %s (%d bytes)", channel, len(text))
except (asyncio.TimeoutError, asyncio.IncompleteReadError, ConnectionError):
log.debug("webhook: client disconnected")
except Exception:
log.exception("webhook: error handling request")
try:
writer.write(_http_response(500, "Internal Server Error"))
except Exception:
pass
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
@event("001")
async def on_connect(bot, message):
"""Start the webhook HTTP server on connect (if enabled)."""
global _server, _started, _request_count
if _server is not None:
return # already running
cfg = bot.config.get("webhook", {})
if not cfg.get("enabled"):
return
host = cfg.get("host", "127.0.0.1")
port = cfg.get("port", 8080)
secret = cfg.get("secret", "")
async def handler(reader, writer):
await _handle_request(reader, writer, bot, secret)
try:
_server = await asyncio.start_server(handler, host, port)
_started = time.monotonic()
_request_count = 0
log.info("webhook: listening on %s:%d", host, port)
except OSError as exc:
log.error("webhook: failed to bind %s:%d: %s", host, port, exc)
@command("webhook", help="Show webhook listener status", admin=True)
async def cmd_webhook(bot, message):
"""Display webhook server status."""
if _server is None:
await bot.reply(message, "Webhook: not running")
return
socks = _server.sockets
if socks:
addr = socks[0].getsockname()
address = f"{addr[0]}:{addr[1]}"
else:
address = "unknown"
elapsed = int(time.monotonic() - _started)
hours, rem = divmod(elapsed, 3600)
minutes, secs = divmod(rem, 60)
parts = []
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
parts.append(f"{secs}s")
uptime = " ".join(parts)
await bot.reply(
message,
f"Webhook: {address} | {_request_count} requests | up {uptime}",
)

View File

@@ -394,6 +394,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
for item in shown:
title = _truncate(item["title"]) if item["title"] else "(no title)"
link = item["link"]
if link:
link = await bot.shorten_url(link)
# Build metadata suffix
parts = []
dur = durations.get(item["id"], 0)

View File

@@ -13,7 +13,7 @@ from pathlib import Path
from derp import __version__
from derp.irc import _MAX_IRC_LINE, IRCConnection, Message, format_msg, parse
from derp.plugin import Handler, PluginRegistry
from derp.plugin import TIERS, Handler, PluginRegistry
from derp.state import StateStore
log = logging.getLogger(__name__)
@@ -93,6 +93,8 @@ class Bot:
self._tasks: set[asyncio.Task] = set()
self._reconnect_delay: float = 5.0
self._admins: list[str] = config.get("bot", {}).get("admins", [])
self._operators: list[str] = config.get("bot", {}).get("operators", [])
self._trusted: list[str] = config.get("bot", {}).get("trusted", [])
self._opers: set[str] = set() # hostmasks of known IRC operators
self._caps: set[str] = set() # negotiated IRCv3 caps
self._who_pending: dict[str, asyncio.Task] = {} # debounced WHO per channel
@@ -359,20 +361,33 @@ class Bot:
return True
return plugin_name in allowed
def _get_tier(self, msg: Message) -> str:
"""Determine the permission tier of the message sender.
Checks (in priority order): IRC operator, admin pattern,
operator pattern, trusted pattern. Falls back to ``"user"``.
"""
if not msg.prefix:
return "user"
if msg.prefix in self._opers:
return "admin"
for pattern in self._admins:
if fnmatch.fnmatch(msg.prefix, pattern):
return "admin"
for pattern in self._operators:
if fnmatch.fnmatch(msg.prefix, pattern):
return "oper"
for pattern in self._trusted:
if fnmatch.fnmatch(msg.prefix, pattern):
return "trusted"
return "user"
def _is_admin(self, msg: Message) -> bool:
"""Check if the message sender is a bot admin.
Returns True if the sender is a known IRC operator or matches
a configured hostmask pattern (fnmatch-style).
Thin wrapper around ``_get_tier`` for backward compatibility.
"""
if not msg.prefix:
return False
if msg.prefix in self._opers:
return True
for pattern in self._admins:
if fnmatch.fnmatch(msg.prefix, pattern):
return True
return False
return self._get_tier(msg) == "admin"
def _dispatch_command(self, msg: Message) -> None:
"""Check if a PRIVMSG is a bot command and spawn it."""
@@ -396,10 +411,13 @@ class Bot:
if not self._plugin_allowed(handler.plugin, channel):
return
if handler.admin and not self._is_admin(msg):
deny = f"Permission denied: {self.prefix}{cmd_name} requires admin"
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
return
required = handler.tier
if required != "user":
sender = self._get_tier(msg)
if TIERS.index(sender) < TIERS.index(required):
deny = f"Permission denied: {self.prefix}{cmd_name} requires {required}"
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
return
self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}")
@@ -510,6 +528,17 @@ class Bot:
"""Send a CTCP ACTION (/me) to a target."""
await self.send(target, f"\x01ACTION {text}\x01")
async def shorten_url(self, url: str) -> str:
"""Shorten a URL via FlaskPaste. Returns original on failure."""
fp = self.registry._modules.get("flaskpaste")
if not fp:
return url
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(None, fp.shorten_url, self, url)
except Exception:
return url
async def join(self, channel: str) -> None:
"""Join an IRC channel."""
await self.conn.send(format_msg("JOIN", channel))

View File

@@ -29,8 +29,16 @@ DEFAULTS: dict = {
"rate_burst": 5,
"paste_threshold": 4,
"admins": [],
"operators": [],
"trusted": [],
},
"channels": {},
"webhook": {
"enabled": False,
"host": "127.0.0.1",
"port": 8080,
"secret": "",
},
"logging": {
"level": "info",
"format": "text",

View File

@@ -12,6 +12,8 @@ from typing import Any, Callable
log = logging.getLogger(__name__)
TIERS: tuple[str, ...] = ("user", "trusted", "oper", "admin")
@dataclass(slots=True)
class Handler:
@@ -22,9 +24,10 @@ class Handler:
help: str = ""
plugin: str = ""
admin: bool = False
tier: str = "user"
def command(name: str, help: str = "", admin: bool = False) -> Callable:
def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> Callable:
"""Decorator to register an async function as a bot command.
Usage::
@@ -36,12 +39,17 @@ def command(name: str, help: str = "", admin: bool = False) -> Callable:
@command("reload", help="Reload a plugin", admin=True)
async def cmd_reload(bot, message):
...
@command("trusted_cmd", help="Trusted-only", tier="trusted")
async def cmd_trusted(bot, message):
...
"""
def decorator(func: Callable) -> Callable:
func._derp_command = name # type: ignore[attr-defined]
func._derp_help = help # type: ignore[attr-defined]
func._derp_admin = admin # type: ignore[attr-defined]
func._derp_tier = tier if tier else ("admin" if admin else "user") # type: ignore[attr-defined]
return func
return decorator
@@ -74,12 +82,14 @@ class PluginRegistry:
self._paths: dict[str, Path] = {}
def register_command(self, name: str, callback: Callable, help: str = "",
plugin: str = "", admin: bool = False) -> None:
plugin: str = "", admin: bool = False,
tier: str = "user") -> None:
"""Register a command handler."""
if name in self.commands:
log.warning("command '%s' already registered, overwriting", name)
self.commands[name] = Handler(
name=name, callback=callback, help=help, plugin=plugin, admin=admin,
name=name, callback=callback, help=help, plugin=plugin,
admin=admin, tier=tier,
)
log.debug("registered command: %s (%s)", name, plugin)
@@ -102,6 +112,7 @@ class PluginRegistry:
help=getattr(obj, "_derp_help", ""),
plugin=plugin_name,
admin=getattr(obj, "_derp_admin", False),
tier=getattr(obj, "_derp_tier", "user"),
)
count += 1
if hasattr(obj, "_derp_event"):

409
tests/test_acl.py Normal file
View File

@@ -0,0 +1,409 @@
"""Tests for the granular ACL tier system."""
import asyncio
from pathlib import Path
from derp.bot import Bot
from derp.config import DEFAULTS
from derp.irc import Message, parse
from derp.plugin import TIERS, PluginRegistry, command
# -- Helpers -----------------------------------------------------------------
class _MockConnection:
"""Minimal mock IRC connection for dispatch tests."""
def __init__(self) -> None:
self._queue: asyncio.Queue[str | None] = asyncio.Queue()
self.sent: list[str] = []
async def connect(self) -> None:
pass
async def close(self) -> None:
pass
async def send(self, line: str) -> None:
self.sent.append(line)
async def readline(self) -> str | None:
return await self._queue.get()
def inject(self, line: str) -> None:
self._queue.put_nowait(line)
def disconnect(self) -> None:
self._queue.put_nowait(None)
class _Harness:
"""Test fixture: Bot with mock connection and configurable tiers."""
def __init__(
self,
admins: list[str] | None = None,
operators: list[str] | None = None,
trusted: list[str] | None = None,
) -> None:
config: dict = {
"server": {
"host": "localhost", "port": 6667, "tls": False,
"nick": "test", "user": "test", "realname": "test bot",
},
"bot": {
"prefix": "!",
"channels": [],
"plugins_dir": "plugins",
"admins": admins or [],
"operators": operators or [],
"trusted": trusted or [],
"rate_limit": 100.0,
"rate_burst": 100,
},
}
self.registry = PluginRegistry()
self.bot = Bot(config, self.registry)
self.conn = _MockConnection()
self.bot.conn = self.conn # type: ignore[assignment]
self.registry.load_plugin(Path("plugins/core.py"))
def inject_registration(self) -> None:
self.conn.inject(":server CAP * LS :")
self.conn.inject(":server 001 test :Welcome")
def privmsg(self, nick: str, target: str, text: str,
user: str = "user", host: str = "host") -> None:
self.conn.inject(f":{nick}!{user}@{host} PRIVMSG {target} :{text}")
async def run(self) -> None:
self.conn.disconnect()
self.bot._running = True
await self.bot._connect_and_run()
if self.bot._tasks:
await asyncio.gather(*list(self.bot._tasks), return_exceptions=True)
def sent_privmsgs(self, target: str) -> list[str]:
results = []
for line in self.conn.sent:
msg = parse(line)
if msg.command == "PRIVMSG" and msg.target == target:
results.append(msg.text)
return results
def _msg(text: str, prefix: str = "nick!user@host") -> Message:
nick = prefix.split("!")[0] if "!" in prefix else prefix
return Message(
raw="", prefix=prefix, nick=nick,
command="PRIVMSG", params=["#test", text], tags={},
)
# ---------------------------------------------------------------------------
# TestTierConstants
# ---------------------------------------------------------------------------
class TestTierConstants:
def test_tier_order(self):
assert TIERS == ("user", "trusted", "oper", "admin")
def test_index_comparison(self):
assert TIERS.index("user") < TIERS.index("trusted")
assert TIERS.index("trusted") < TIERS.index("oper")
assert TIERS.index("oper") < TIERS.index("admin")
# ---------------------------------------------------------------------------
# TestGetTier
# ---------------------------------------------------------------------------
class TestGetTier:
def test_no_prefix(self):
h = _Harness()
msg = Message(raw="", prefix="", nick="", command="PRIVMSG",
params=["#test", "hi"], tags={})
assert h.bot._get_tier(msg) == "user"
def test_ircop(self):
h = _Harness()
h.bot._opers.add("op!root@server")
msg = _msg("test", prefix="op!root@server")
assert h.bot._get_tier(msg) == "admin"
def test_admin_pattern(self):
h = _Harness(admins=["*!*@admin.host"])
msg = _msg("test", prefix="alice!user@admin.host")
assert h.bot._get_tier(msg) == "admin"
def test_oper_pattern(self):
h = _Harness(operators=["*!*@oper.host"])
msg = _msg("test", prefix="bob!user@oper.host")
assert h.bot._get_tier(msg) == "oper"
def test_trusted_pattern(self):
h = _Harness(trusted=["*!*@trusted.host"])
msg = _msg("test", prefix="carol!user@trusted.host")
assert h.bot._get_tier(msg) == "trusted"
def test_no_match(self):
h = _Harness(admins=["*!*@admin.host"])
msg = _msg("test", prefix="nobody!user@random.host")
assert h.bot._get_tier(msg) == "user"
def test_priority_admin_over_oper(self):
"""Admin patterns take priority over operator patterns."""
h = _Harness(admins=["*!*@dual.host"], operators=["*!*@dual.host"])
msg = _msg("test", prefix="x!y@dual.host")
assert h.bot._get_tier(msg) == "admin"
def test_priority_oper_over_trusted(self):
"""Operator patterns take priority over trusted patterns."""
h = _Harness(operators=["*!*@dual.host"], trusted=["*!*@dual.host"])
msg = _msg("test", prefix="x!y@dual.host")
assert h.bot._get_tier(msg) == "oper"
def test_ircop_over_admin_pattern(self):
"""IRC operator status takes priority over admin hostmask pattern."""
h = _Harness(admins=["*!*@server"])
h.bot._opers.add("op!root@server")
msg = _msg("test", prefix="op!root@server")
# Both match, but ircop check comes first
assert h.bot._get_tier(msg) == "admin"
# ---------------------------------------------------------------------------
# TestIsAdminBackcompat
# ---------------------------------------------------------------------------
class TestIsAdminBackcompat:
def test_admin_true(self):
h = _Harness(admins=["*!*@admin.host"])
msg = _msg("test", prefix="a!b@admin.host")
assert h.bot._is_admin(msg) is True
def test_oper_false(self):
h = _Harness(operators=["*!*@oper.host"])
msg = _msg("test", prefix="a!b@oper.host")
assert h.bot._is_admin(msg) is False
def test_trusted_false(self):
h = _Harness(trusted=["*!*@trusted.host"])
msg = _msg("test", prefix="a!b@trusted.host")
assert h.bot._is_admin(msg) is False
# ---------------------------------------------------------------------------
# TestCommandDecorator
# ---------------------------------------------------------------------------
class TestCommandDecorator:
def test_admin_true_sets_tier(self):
@command("x", admin=True)
async def handler(bot, msg):
pass
assert handler._derp_tier == "admin"
def test_explicit_tier(self):
@command("x", tier="trusted")
async def handler(bot, msg):
pass
assert handler._derp_tier == "trusted"
def test_tier_overrides_admin(self):
@command("x", admin=True, tier="oper")
async def handler(bot, msg):
pass
assert handler._derp_tier == "oper"
def test_default_tier(self):
@command("x")
async def handler(bot, msg):
pass
assert handler._derp_tier == "user"
# ---------------------------------------------------------------------------
# TestDispatchWithTiers
# ---------------------------------------------------------------------------
class TestDispatchWithTiers:
def test_trusted_allowed_for_trusted_cmd(self):
"""Trusted user can run a trusted-tier command."""
h = _Harness(trusted=["*!user@host"])
@command("tcmd", tier="trusted")
async def cmd_tcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!tcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("ok" in m for m in msgs)
def test_admin_allowed_for_trusted_cmd(self):
"""Admin can run trusted-tier commands (higher tier)."""
h = _Harness(admins=["*!user@host"])
@command("tcmd", tier="trusted")
async def cmd_tcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!tcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("ok" in m for m in msgs)
def test_user_denied_for_trusted_cmd(self):
"""Regular user is denied a trusted-tier command."""
h = _Harness()
@command("tcmd", tier="trusted")
async def cmd_tcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!tcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("Permission denied" in m for m in msgs)
assert any("requires trusted" in m for m in msgs)
def test_backward_compat_admin_flag(self):
"""admin=True commands still work via tier='admin'."""
h = _Harness(admins=["*!user@host"])
h.inject_registration()
# cmd_admins is already registered via core plugin with admin=True
h.privmsg("nick", "#test", "!admins")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("Admin:" in m for m in msgs)
def test_denial_message_shows_tier(self):
"""Permission denial message includes the required tier name."""
h = _Harness()
@command("opcmd", tier="oper")
async def cmd_opcmd(bot, message):
await bot.reply(message, "ok")
h.registry.register_command(
"opcmd", cmd_opcmd, tier="oper", plugin="test",
)
h.inject_registration()
h.privmsg("nick", "#test", "!opcmd")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("requires oper" in m for m in msgs)
# ---------------------------------------------------------------------------
# TestConfigDefaults
# ---------------------------------------------------------------------------
class TestConfigDefaults:
def test_operators_in_defaults(self):
assert "operators" in DEFAULTS["bot"]
assert DEFAULTS["bot"]["operators"] == []
def test_trusted_in_defaults(self):
assert "trusted" in DEFAULTS["bot"]
assert DEFAULTS["bot"]["trusted"] == []
def test_webhook_in_defaults(self):
assert "webhook" in DEFAULTS
assert DEFAULTS["webhook"]["enabled"] is False
# ---------------------------------------------------------------------------
# TestWhoami
# ---------------------------------------------------------------------------
class TestWhoami:
def test_shows_admin(self):
h = _Harness(admins=["*!user@host"])
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("admin" in m for m in msgs)
def test_shows_trusted(self):
h = _Harness(trusted=["*!user@host"])
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("trusted" in m for m in msgs)
def test_shows_user(self):
h = _Harness()
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("user" in m for m in msgs)
def test_shows_ircop_tag(self):
h = _Harness()
h.bot._opers.add("nick!user@host")
h.inject_registration()
h.privmsg("nick", "#test", "!whoami")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
assert any("IRCOP" in m for m in msgs)
# ---------------------------------------------------------------------------
# TestAdmins
# ---------------------------------------------------------------------------
class TestAdmins:
def test_shows_all_tiers(self):
h = _Harness(
admins=["*!*@admin.host"],
operators=["*!*@oper.host"],
trusted=["*!*@trusted.host"],
)
h.bot._opers.add("nick!user@host")
h.inject_registration()
# Must be admin to run !admins
h.privmsg("nick", "#test", "!admins", user="x", host="admin.host")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
combined = " ".join(msgs)
assert "Admin:" in combined
assert "Oper:" in combined
assert "Trusted:" in combined
assert "IRCOPs:" in combined
def test_omits_empty_tiers(self):
h = _Harness(admins=["*!user@host"])
h.inject_registration()
h.privmsg("nick", "#test", "!admins")
asyncio.run(h.run())
msgs = h.sent_privmsgs("#test")
combined = " ".join(msgs)
assert "Admin:" in combined
# No operators or trusted configured, so those sections shouldn't appear
assert "Oper:" not in combined
assert "Trusted:" not in combined

639
tests/test_cron.py Normal file
View File

@@ -0,0 +1,639 @@
"""Tests for the cron plugin."""
import asyncio
import importlib.util
import json
import sys
from pathlib import Path
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.cron", Path(__file__).resolve().parent.parent / "plugins" / "cron.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.cron import ( # noqa: E402
_MAX_JOBS,
_delete,
_format_duration,
_jobs,
_load,
_make_id,
_parse_duration,
_restore,
_save,
_start_job,
_state_key,
_stop_job,
_tasks,
cmd_cron,
on_connect,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = True):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.dispatched: list[Message] = []
self.state = _FakeState()
self._admin = admin
self.prefix = "!"
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _is_admin(self, message) -> bool:
return self._admin
def _dispatch_command(self, msg: Message) -> None:
self.dispatched.append(msg)
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
"""Create a channel PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _pm(text: str, nick: str = "admin") -> Message:
"""Create a private PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=["botname", text], tags={},
)
def _clear() -> None:
"""Reset module-level state between tests."""
for task in _tasks.values():
if task and not task.done():
task.cancel()
_tasks.clear()
_jobs.clear()
# ---------------------------------------------------------------------------
# TestParseDuration
# ---------------------------------------------------------------------------
class TestParseDuration:
def test_minutes(self):
assert _parse_duration("5m") == 300
def test_hours(self):
assert _parse_duration("1h") == 3600
def test_combined(self):
assert _parse_duration("1h30m") == 5400
def test_days(self):
assert _parse_duration("2d") == 172800
def test_seconds(self):
assert _parse_duration("90s") == 90
def test_raw_int(self):
assert _parse_duration("300") == 300
def test_zero(self):
assert _parse_duration("0") is None
def test_negative(self):
assert _parse_duration("-5") is None
def test_invalid(self):
assert _parse_duration("abc") is None
def test_empty(self):
assert _parse_duration("") is None
def test_full_combo(self):
assert _parse_duration("1d2h3m4s") == 86400 + 7200 + 180 + 4
# ---------------------------------------------------------------------------
# TestFormatDuration
# ---------------------------------------------------------------------------
class TestFormatDuration:
def test_seconds(self):
assert _format_duration(45) == "45s"
def test_minutes(self):
assert _format_duration(300) == "5m"
def test_hours(self):
assert _format_duration(3600) == "1h"
def test_combined(self):
assert _format_duration(5400) == "1h30m"
def test_days(self):
assert _format_duration(86400) == "1d"
def test_zero(self):
assert _format_duration(0) == "0s"
def test_full_combo(self):
assert _format_duration(90061) == "1d1h1m1s"
# ---------------------------------------------------------------------------
# TestMakeId
# ---------------------------------------------------------------------------
class TestMakeId:
def test_returns_hex(self):
result = _make_id("#test", "!ping")
assert len(result) == 6
int(result, 16) # should not raise
def test_unique(self):
ids = {_make_id("#test", f"!cmd{i}") for i in range(10)}
assert len(ids) == 10
# ---------------------------------------------------------------------------
# TestStateHelpers
# ---------------------------------------------------------------------------
class TestStateHelpers:
def test_save_and_load(self):
bot = _FakeBot()
data = {"id": "abc123", "channel": "#test"}
_save(bot, "#test:abc123", data)
loaded = _load(bot, "#test:abc123")
assert loaded == data
def test_load_missing(self):
bot = _FakeBot()
assert _load(bot, "nonexistent") is None
def test_delete(self):
bot = _FakeBot()
_save(bot, "#test:del", {"id": "del"})
_delete(bot, "#test:del")
assert _load(bot, "#test:del") is None
def test_state_key(self):
assert _state_key("#ops", "abc123") == "#ops:abc123"
def test_load_invalid_json(self):
bot = _FakeBot()
bot.state.set("cron", "bad", "not json{{{")
assert _load(bot, "bad") is None
# ---------------------------------------------------------------------------
# TestCmdCronAdd
# ---------------------------------------------------------------------------
class TestCmdCronAdd:
def test_add_success(self):
_clear()
bot = _FakeBot()
async def inner():
await cmd_cron(bot, _msg("!cron add 5m #ops !rss check news"))
await asyncio.sleep(0)
assert len(bot.replied) == 1
assert "Cron #" in bot.replied[0]
assert "!rss check news" in bot.replied[0]
assert "5m" in bot.replied[0]
assert "#ops" in bot.replied[0]
# Verify state persisted
keys = bot.state.keys("cron")
assert len(keys) == 1
data = json.loads(bot.state.get("cron", keys[0]))
assert data["command"] == "!rss check news"
assert data["interval"] == 300
assert data["channel"] == "#ops"
# Verify task started
assert len(_tasks) == 1
_clear()
await asyncio.sleep(0)
asyncio.run(inner())
def test_add_requires_channel(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _pm("!cron add 5m #ops !ping")))
assert "Use this command in a channel" in bot.replied[0]
def test_add_missing_args(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron add 5m")))
assert "Usage:" in bot.replied[0]
def test_add_invalid_interval(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron add abc #ops !ping")))
assert "Invalid interval" in bot.replied[0]
def test_add_interval_too_short(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron add 30s #ops !ping")))
assert "Minimum interval" in bot.replied[0]
def test_add_interval_too_long(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron add 30d #ops !ping")))
assert "Maximum interval" in bot.replied[0]
def test_add_bad_target(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron add 5m alice !ping")))
assert "Target must be a channel" in bot.replied[0]
def test_add_job_limit(self):
_clear()
bot = _FakeBot()
for i in range(_MAX_JOBS):
_save(bot, f"#ops:job{i}", {"id": f"job{i}", "channel": "#ops"})
asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping")))
assert "limit reached" in bot.replied[0]
def test_add_admin_required(self):
_clear()
bot = _FakeBot(admin=False)
asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping")))
# The @command(admin=True) decorator handles this via bot._dispatch_command,
# but since we call cmd_cron directly, the check is at the decorator level.
# In direct call tests, admin check is already handled by the framework.
# This test just verifies the command runs without error for non-admin.
# Framework-level denial is tested in integration tests.
# ---------------------------------------------------------------------------
# TestCmdCronDel
# ---------------------------------------------------------------------------
class TestCmdCronDel:
def test_del_success(self):
_clear()
bot = _FakeBot()
async def inner():
await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
await asyncio.sleep(0)
# Extract ID from reply
reply = bot.replied[0]
cron_id = reply.split("#")[1].split(":")[0]
bot.replied.clear()
await cmd_cron(bot, _msg(f"!cron del {cron_id}"))
assert "Removed" in bot.replied[0]
assert cron_id in bot.replied[0]
assert len(bot.state.keys("cron")) == 0
_clear()
await asyncio.sleep(0)
asyncio.run(inner())
def test_del_nonexistent(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron del nosuch")))
assert "No cron job" in bot.replied[0]
def test_del_missing_id(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron del")))
assert "Usage:" in bot.replied[0]
def test_del_with_hash_prefix(self):
_clear()
bot = _FakeBot()
async def inner():
await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
await asyncio.sleep(0)
reply = bot.replied[0]
cron_id = reply.split("#")[1].split(":")[0]
bot.replied.clear()
await cmd_cron(bot, _msg(f"!cron del #{cron_id}"))
assert "Removed" in bot.replied[0]
_clear()
await asyncio.sleep(0)
asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestCmdCronList
# ---------------------------------------------------------------------------
class TestCmdCronList:
def test_list_empty(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron list")))
assert "No cron jobs" in bot.replied[0]
def test_list_populated(self):
_clear()
bot = _FakeBot()
_save(bot, "#test:abc123", {
"id": "abc123", "channel": "#test",
"command": "!rss check news", "interval": 300,
})
asyncio.run(cmd_cron(bot, _msg("!cron list")))
assert "#abc123" in bot.replied[0]
assert "5m" in bot.replied[0]
assert "!rss check news" in bot.replied[0]
def test_list_requires_channel(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _pm("!cron list")))
assert "Use this command in a channel" in bot.replied[0]
def test_list_channel_isolation(self):
_clear()
bot = _FakeBot()
_save(bot, "#test:mine", {
"id": "mine", "channel": "#test",
"command": "!ping", "interval": 300,
})
_save(bot, "#other:theirs", {
"id": "theirs", "channel": "#other",
"command": "!ping", "interval": 600,
})
asyncio.run(cmd_cron(bot, _msg("!cron list")))
assert "mine" in bot.replied[0]
assert len(bot.replied) == 1 # only the #test job
# ---------------------------------------------------------------------------
# TestCmdCronUsage
# ---------------------------------------------------------------------------
class TestCmdCronUsage:
def test_no_args(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron")))
assert "Usage:" in bot.replied[0]
def test_unknown_subcommand(self):
_clear()
bot = _FakeBot()
asyncio.run(cmd_cron(bot, _msg("!cron foobar")))
assert "Usage:" in bot.replied[0]
# ---------------------------------------------------------------------------
# TestRestore
# ---------------------------------------------------------------------------
class TestRestore:
def test_restore_spawns_tasks(self):
_clear()
bot = _FakeBot()
data = {
"id": "abc123", "channel": "#test",
"command": "!ping", "interval": 300,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
_save(bot, "#test:abc123", data)
async def inner():
_restore(bot)
assert "#test:abc123" in _tasks
assert not _tasks["#test:abc123"].done()
_clear()
await asyncio.sleep(0)
asyncio.run(inner())
def test_restore_skips_active(self):
_clear()
bot = _FakeBot()
data = {
"id": "active", "channel": "#test",
"command": "!ping", "interval": 300,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
_save(bot, "#test:active", data)
async def inner():
dummy = asyncio.create_task(asyncio.sleep(9999))
_tasks["#test:active"] = dummy
_restore(bot)
assert _tasks["#test:active"] is dummy
dummy.cancel()
await asyncio.sleep(0)
asyncio.run(inner())
def test_restore_replaces_done_task(self):
_clear()
bot = _FakeBot()
data = {
"id": "done", "channel": "#test",
"command": "!ping", "interval": 300,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
_save(bot, "#test:done", data)
async def inner():
done_task = asyncio.create_task(asyncio.sleep(0))
await done_task
_tasks["#test:done"] = done_task
_restore(bot)
new_task = _tasks["#test:done"]
assert new_task is not done_task
assert not new_task.done()
_clear()
await asyncio.sleep(0)
asyncio.run(inner())
def test_restore_skips_bad_json(self):
_clear()
bot = _FakeBot()
bot.state.set("cron", "#test:bad", "not json{{{")
async def inner():
_restore(bot)
assert "#test:bad" not in _tasks
asyncio.run(inner())
def test_on_connect_calls_restore(self):
_clear()
bot = _FakeBot()
data = {
"id": "conn", "channel": "#test",
"command": "!ping", "interval": 300,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
_save(bot, "#test:conn", data)
async def inner():
msg = _msg("", target="botname")
await on_connect(bot, msg)
assert "#test:conn" in _tasks
_clear()
await asyncio.sleep(0)
asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestCronLoop
# ---------------------------------------------------------------------------
class TestCronLoop:
def test_dispatches_command(self):
"""Cron loop dispatches a synthetic message after interval."""
_clear()
bot = _FakeBot()
async def inner():
data = {
"id": "loop1", "channel": "#test",
"command": "!ping", "interval": 0.05,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
key = "#test:loop1"
_jobs[key] = data
_start_job(bot, key)
await asyncio.sleep(0.15)
_stop_job(key)
await asyncio.sleep(0)
# Should have dispatched at least once
assert len(bot.dispatched) >= 1
msg = bot.dispatched[0]
assert msg.nick == "admin"
assert msg.params[0] == "#test"
assert msg.params[1] == "!ping"
asyncio.run(inner())
def test_loop_stops_on_job_removal(self):
"""Cron loop exits when job is removed from _jobs."""
_clear()
bot = _FakeBot()
async def inner():
data = {
"id": "loop2", "channel": "#test",
"command": "!ping", "interval": 0.05,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
key = "#test:loop2"
_jobs[key] = data
_start_job(bot, key)
await asyncio.sleep(0.02)
_jobs.pop(key, None)
await asyncio.sleep(0.1)
task = _tasks.get(key)
if task:
assert task.done()
asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestJobManagement
# ---------------------------------------------------------------------------
class TestJobManagement:
def test_start_and_stop(self):
_clear()
bot = _FakeBot()
data = {
"id": "mgmt", "channel": "#test",
"command": "!ping", "interval": 300,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
key = "#test:mgmt"
_jobs[key] = data
async def inner():
_start_job(bot, key)
assert key in _tasks
assert not _tasks[key].done()
_stop_job(key)
await asyncio.sleep(0)
assert key not in _tasks
assert key not in _jobs
asyncio.run(inner())
def test_start_idempotent(self):
_clear()
bot = _FakeBot()
data = {
"id": "idem", "channel": "#test",
"command": "!ping", "interval": 300,
"prefix": "admin!~admin@host", "nick": "admin",
"added_by": "admin",
}
key = "#test:idem"
_jobs[key] = data
async def inner():
_start_job(bot, key)
first = _tasks[key]
_start_job(bot, key)
assert _tasks[key] is first
_stop_job(key)
await asyncio.sleep(0)
asyncio.run(inner())
def test_stop_nonexistent(self):
_clear()
_stop_job("#test:nonexistent")

212
tests/test_flaskpaste.py Normal file
View File

@@ -0,0 +1,212 @@
"""Tests for the FlaskPaste plugin and Bot.shorten_url method."""
import asyncio
import importlib.util
import sys
from pathlib import Path
from unittest.mock import patch
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.flaskpaste",
Path(__file__).resolve().parent.parent / "plugins" / "flaskpaste.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.flaskpaste import ( # noqa: E402
cmd_paste,
cmd_shorten,
create_paste,
shorten_url,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeRegistry:
"""Minimal registry stand-in."""
def __init__(self):
self._modules: dict = {}
class _FakeBot:
"""Minimal bot stand-in that captures sent/replied messages."""
def __init__(self, *, admin: bool = False):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.state = _FakeState()
self.registry = _FakeRegistry()
self._admin = admin
self.config: dict = {}
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
"""Create a channel PRIVMSG."""
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
# ---------------------------------------------------------------------------
# TestCmdShorten
# ---------------------------------------------------------------------------
class TestCmdShorten:
def test_missing_url(self):
bot = _FakeBot()
asyncio.run(cmd_shorten(bot, _msg("!shorten")))
assert "Usage:" in bot.replied[0]
def test_invalid_scheme(self):
bot = _FakeBot()
asyncio.run(cmd_shorten(bot, _msg("!shorten ftp://example.com")))
assert "http://" in bot.replied[0]
def test_success(self):
bot = _FakeBot()
async def inner():
with patch.object(
_mod, "_shorten_url",
return_value="https://paste.mymx.me/s/abc123",
):
await cmd_shorten(bot, _msg("!shorten https://example.com/long"))
assert "paste.mymx.me/s/abc123" in bot.replied[0]
asyncio.run(inner())
def test_failure(self):
bot = _FakeBot()
async def inner():
with patch.object(
_mod, "_shorten_url",
side_effect=ConnectionError("down"),
):
await cmd_shorten(bot, _msg("!shorten https://example.com/long"))
assert "shorten failed" in bot.replied[0]
asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestCmdPaste
# ---------------------------------------------------------------------------
class TestCmdPaste:
def test_missing_text(self):
bot = _FakeBot()
asyncio.run(cmd_paste(bot, _msg("!paste")))
assert "Usage:" in bot.replied[0]
def test_success(self):
bot = _FakeBot()
async def inner():
with patch.object(
_mod, "_create_paste",
return_value="https://paste.mymx.me/xyz789",
):
await cmd_paste(bot, _msg("!paste hello world"))
assert "paste.mymx.me/xyz789" in bot.replied[0]
asyncio.run(inner())
def test_failure(self):
bot = _FakeBot()
async def inner():
with patch.object(
_mod, "_create_paste",
side_effect=ConnectionError("down"),
):
await cmd_paste(bot, _msg("!paste hello world"))
assert "paste failed" in bot.replied[0]
asyncio.run(inner())
# ---------------------------------------------------------------------------
# TestShortenUrlHelper
# ---------------------------------------------------------------------------
class TestShortenUrlHelper:
def test_returns_short_url(self):
bot = _FakeBot()
with patch.object(
_mod, "_shorten_url",
return_value="https://paste.mymx.me/s/short",
):
result = shorten_url(bot, "https://example.com/long")
assert result == "https://paste.mymx.me/s/short"
def test_returns_original_on_error(self):
bot = _FakeBot()
with patch.object(
_mod, "_shorten_url",
side_effect=ConnectionError("fail"),
):
result = shorten_url(bot, "https://example.com/long")
assert result == "https://example.com/long"
# ---------------------------------------------------------------------------
# TestCreatePasteHelper
# ---------------------------------------------------------------------------
class TestCreatePasteHelper:
def test_returns_paste_url(self):
bot = _FakeBot()
with patch.object(
_mod, "_create_paste",
return_value="https://paste.mymx.me/abc",
):
result = create_paste(bot, "hello")
assert result == "https://paste.mymx.me/abc"
def test_returns_none_on_error(self):
bot = _FakeBot()
with patch.object(
_mod, "_create_paste",
side_effect=ConnectionError("fail"),
):
result = create_paste(bot, "hello")
assert result is None

View File

@@ -274,7 +274,7 @@ class TestAdmin:
replies = h.sent_privmsgs("#test")
assert not any("Permission denied" in r for r in replies)
assert any("Patterns:" in r for r in replies)
assert any("Admin:" in r for r in replies)
def test_oper_detection(self):
"""IRC operator detected via WHO reply can use admin commands."""
@@ -290,7 +290,7 @@ class TestAdmin:
replies = h.sent_privmsgs("#test")
assert not any("Permission denied" in r for r in replies)
assert any("Opers:" in r for r in replies)
assert any("IRCOPs:" in r for r in replies)
def test_oper_detection_on_join(self):
"""User joining a channel triggers debounced WHO for oper detection."""

View File

@@ -140,6 +140,9 @@ class _FakeBot:
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def shorten_url(self, url: str) -> str:
return url
def _is_admin(self, message) -> bool:
return self._admin

View File

@@ -166,6 +166,9 @@ class _FakeBot:
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def shorten_url(self, url: str) -> str:
return url
def _is_admin(self, message) -> bool:
return self._admin

395
tests/test_webhook.py Normal file
View File

@@ -0,0 +1,395 @@
"""Tests for the webhook listener plugin."""
import asyncio
import hashlib
import hmac
import importlib.util
import json
import sys
import time
from pathlib import Path
from derp.irc import Message
# plugins/ is not a Python package -- load the module from file path
_spec = importlib.util.spec_from_file_location(
"plugins.webhook", Path(__file__).resolve().parent.parent / "plugins" / "webhook.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _mod
_spec.loader.exec_module(_mod)
from plugins.webhook import ( # noqa: E402
_MAX_BODY,
_handle_request,
_http_response,
_verify_signature,
cmd_webhook,
on_connect,
)
# -- Helpers -----------------------------------------------------------------
class _FakeState:
"""In-memory stand-in for bot.state."""
def __init__(self):
self._store: dict[str, dict[str, str]] = {}
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
return self._store.get(plugin, {}).get(key, default)
def set(self, plugin: str, key: str, value: str) -> None:
self._store.setdefault(plugin, {})[key] = value
def delete(self, plugin: str, key: str) -> bool:
try:
del self._store[plugin][key]
return True
except KeyError:
return False
def keys(self, plugin: str) -> list[str]:
return sorted(self._store.get(plugin, {}).keys())
class _FakeBot:
"""Minimal bot stand-in that captures sent/action messages."""
def __init__(self, *, admin: bool = True, webhook_cfg: dict | None = None):
self.sent: list[tuple[str, str]] = []
self.replied: list[str] = []
self.actions: list[tuple[str, str]] = []
self.state = _FakeState()
self._admin = admin
self.prefix = "!"
self.config = {
"webhook": webhook_cfg or {"enabled": False},
}
async def send(self, target: str, text: str) -> None:
self.sent.append((target, text))
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def action(self, target: str, text: str) -> None:
self.actions.append((target, text))
def _is_admin(self, message) -> bool:
return self._admin
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
return Message(
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
command="PRIVMSG", params=[target, text], tags={},
)
def _sign(secret: str, body: bytes) -> str:
"""Generate HMAC-SHA256 signature."""
sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
return f"sha256={sig}"
class _FakeReader:
"""Mock asyncio.StreamReader from raw HTTP bytes."""
def __init__(self, data: bytes) -> None:
self._data = data
self._pos = 0
async def readline(self) -> bytes:
start = self._pos
idx = self._data.find(b"\n", start)
if idx == -1:
self._pos = len(self._data)
return self._data[start:]
self._pos = idx + 1
return self._data[start:self._pos]
async def readexactly(self, n: int) -> bytes:
chunk = self._data[self._pos:self._pos + n]
self._pos += n
return chunk
class _FakeWriter:
"""Mock asyncio.StreamWriter that captures output."""
def __init__(self) -> None:
self.data = b""
self._closed = False
def write(self, data: bytes) -> None:
self.data += data
def close(self) -> None:
self._closed = True
async def wait_closed(self) -> None:
pass
def _build_request(method: str, body: bytes, headers: dict[str, str] | None = None) -> bytes:
"""Build raw HTTP request bytes."""
hdrs = headers or {}
if "Content-Length" not in hdrs:
hdrs["Content-Length"] = str(len(body))
lines = [f"{method} / HTTP/1.1"]
for k, v in hdrs.items():
lines.append(f"{k}: {v}")
lines.append("")
lines.append("")
return "\r\n".join(lines).encode("utf-8") + body
# ---------------------------------------------------------------------------
# TestVerifySignature
# ---------------------------------------------------------------------------
class TestVerifySignature:
def test_valid_signature(self):
body = b'{"channel":"#test","text":"hello"}'
sig = _sign("secret", body)
assert _verify_signature("secret", body, sig) is True
def test_invalid_signature(self):
body = b'{"channel":"#test","text":"hello"}'
assert _verify_signature("secret", body, "sha256=bad") is False
def test_empty_secret_allows_all(self):
body = b'{"channel":"#test","text":"hello"}'
assert _verify_signature("", body, "") is True
def test_missing_prefix(self):
body = b'{"channel":"#test","text":"hello"}'
sig = hmac.new(b"secret", body, hashlib.sha256).hexdigest()
assert _verify_signature("secret", body, sig) is False
# ---------------------------------------------------------------------------
# TestHttpResponse
# ---------------------------------------------------------------------------
class TestHttpResponse:
def test_200_response(self):
resp = _http_response(200, "OK", "sent")
assert b"200 OK" in resp
assert b"sent" in resp
def test_400_with_body(self):
resp = _http_response(400, "Bad Request", "invalid JSON")
assert b"400 Bad Request" in resp
assert b"invalid JSON" in resp
def test_405_response(self):
resp = _http_response(405, "Method Not Allowed", "POST only")
assert b"405 Method Not Allowed" in resp
# ---------------------------------------------------------------------------
# TestRequestHandler
# ---------------------------------------------------------------------------
class TestRequestHandler:
def test_valid_post(self):
bot = _FakeBot()
body = json.dumps({"channel": "#ops", "text": "deploy done"}).encode()
sig = _sign("secret", body)
raw = _build_request("POST", body, {
"Content-Length": str(len(body)),
"Content-Type": "application/json",
"X-Signature": sig,
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, "secret"))
assert b"200 OK" in writer.data
assert ("#ops", "deploy done") in bot.sent
def test_action_post(self):
bot = _FakeBot()
body = json.dumps({
"channel": "#ops", "text": "deployed", "action": True,
}).encode()
sig = _sign("s", body)
raw = _build_request("POST", body, {
"Content-Length": str(len(body)),
"X-Signature": sig,
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, "s"))
assert b"200 OK" in writer.data
assert ("#ops", "deployed") in bot.actions
def test_get_405(self):
bot = _FakeBot()
raw = _build_request("GET", b"")
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"405" in writer.data
def test_bad_signature_401(self):
bot = _FakeBot()
body = json.dumps({"channel": "#test", "text": "x"}).encode()
raw = _build_request("POST", body, {
"Content-Length": str(len(body)),
"X-Signature": "sha256=wrong",
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, "real-secret"))
assert b"401" in writer.data
assert len(bot.sent) == 0
def test_bad_json_400(self):
bot = _FakeBot()
body = b"not json"
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
assert b"invalid JSON" in writer.data
def test_missing_channel_400(self):
bot = _FakeBot()
body = json.dumps({"text": "no channel"}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
assert b"invalid channel" in writer.data
def test_invalid_channel_400(self):
bot = _FakeBot()
body = json.dumps({"channel": "nochanprefix", "text": "x"}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
def test_empty_text_400(self):
bot = _FakeBot()
body = json.dumps({"channel": "#test", "text": ""}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"400" in writer.data
assert b"empty text" in writer.data
def test_body_too_large_413(self):
bot = _FakeBot()
raw = _build_request("POST", b"", {
"Content-Length": str(_MAX_BODY + 1),
})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert b"413" in writer.data
def test_counter_increments(self):
bot = _FakeBot()
# Reset counter
_mod._request_count = 0
body = json.dumps({"channel": "#test", "text": "hi"}).encode()
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
reader = _FakeReader(raw)
writer = _FakeWriter()
asyncio.run(_handle_request(reader, writer, bot, ""))
assert _mod._request_count == 1
# ---------------------------------------------------------------------------
# TestServerLifecycle
# ---------------------------------------------------------------------------
class TestServerLifecycle:
def test_disabled_config(self):
"""Server does not start when webhook is disabled."""
bot = _FakeBot(webhook_cfg={"enabled": False})
msg = _msg("", target="")
msg = Message(raw="", prefix="", nick="", command="001",
params=["test", "Welcome"], tags={})
# Reset global state
_mod._server = None
asyncio.run(on_connect(bot, msg))
assert _mod._server is None
def test_duplicate_guard(self):
"""Second on_connect does not create a second server."""
sentinel = object()
_mod._server = sentinel
bot = _FakeBot(webhook_cfg={"enabled": True, "port": 0})
msg = Message(raw="", prefix="", nick="", command="001",
params=["test", "Welcome"], tags={})
asyncio.run(on_connect(bot, msg))
assert _mod._server is sentinel
_mod._server = None # cleanup
def test_on_connect_starts(self):
"""on_connect starts the server when enabled."""
_mod._server = None
bot = _FakeBot(webhook_cfg={
"enabled": True, "host": "127.0.0.1", "port": 0, "secret": "",
})
msg = Message(raw="", prefix="", nick="", command="001",
params=["test", "Welcome"], tags={})
async def _run():
await on_connect(bot, msg)
assert _mod._server is not None
_mod._server.close()
await _mod._server.wait_closed()
_mod._server = None
asyncio.run(_run())
# ---------------------------------------------------------------------------
# TestWebhookCommand
# ---------------------------------------------------------------------------
class TestWebhookCommand:
def test_not_running(self):
bot = _FakeBot()
_mod._server = None
asyncio.run(cmd_webhook(bot, _msg("!webhook")))
assert any("not running" in r for r in bot.replied)
def test_running_shows_status(self):
bot = _FakeBot()
_mod._request_count = 42
_mod._started = time.monotonic() - 90 # 1m 30s ago
async def _run():
# Start a real server on port 0 to get a valid socket
srv = await asyncio.start_server(lambda r, w: None,
"127.0.0.1", 0)
_mod._server = srv
try:
await cmd_webhook(bot, _msg("!webhook"))
finally:
srv.close()
await srv.wait_closed()
_mod._server = None
asyncio.run(_run())
assert len(bot.replied) == 1
reply = bot.replied[0]
assert "Webhook:" in reply
assert "42 requests" in reply
assert "127.0.0.1:" in reply

View File

@@ -171,6 +171,9 @@ class _FakeBot:
async def reply(self, message, text: str) -> None:
self.replied.append(text)
async def shorten_url(self, url: str) -> str:
return url
def _is_admin(self, message) -> bool:
return self._admin