Compare commits
7 Commits
9abf8dce64
...
e9528bd879
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9528bd879 | ||
|
|
c483beb555 | ||
|
|
2514aa777d | ||
|
|
5bc59730c4 | ||
|
|
6ef3fee72c | ||
|
|
7b14efb30f | ||
|
|
aebe1589d2 |
20
.gitea/workflows/ci.yml
Normal file
20
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,20 @@
|
||||
name: CI
|
||||
on:
|
||||
push:
|
||||
branches: [master]
|
||||
pull_request:
|
||||
branches: [master]
|
||||
jobs:
|
||||
test:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ["3.11", "3.12", "3.13"]
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
- run: pip install -e . && pip install pytest ruff
|
||||
- run: ruff check src/ tests/ plugins/
|
||||
- run: pytest -v
|
||||
10
ROADMAP.md
10
ROADMAP.md
@@ -113,9 +113,9 @@
|
||||
- [ ] Multi-server support (per-server config, shared plugins)
|
||||
- [ ] Stable plugin API (versioned, breaking change policy)
|
||||
- [x] Paste overflow (auto-paste long output to FlaskPaste, return link)
|
||||
- [ ] URL shortener integration (shorten URLs in alerts and long output)
|
||||
- [ ] Webhook listener (HTTP endpoint for push events to channels)
|
||||
- [ ] Granular ACLs (per-command permission tiers: trusted, operator, admin)
|
||||
- [x] URL shortener integration (shorten URLs in subscription announcements)
|
||||
- [x] Webhook listener (HTTP endpoint for push events to channels)
|
||||
- [x] Granular ACLs (per-command permission tiers: trusted, operator, admin)
|
||||
- [x] `paste` command (manual paste to FlaskPaste)
|
||||
- [x] `shorten` command (manual URL shortening)
|
||||
- [x] `emailcheck` plugin (SMTP VRFY/RCPT TO)
|
||||
@@ -125,6 +125,6 @@
|
||||
- [x] `jwt` plugin (decode tokens, show claims/expiry, flag weaknesses)
|
||||
- [x] `mac` plugin (OUI vendor lookup, local IEEE database)
|
||||
- [x] `pastemoni` plugin (monitor paste sites for keywords)
|
||||
- [ ] `cron` plugin (scheduled bot commands on a timer)
|
||||
- [x] `cron` plugin (scheduled bot commands on a timer)
|
||||
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
|
||||
- [ ] CI pipeline
|
||||
- [x] CI pipeline (Gitea Actions, Python 3.11-3.13, ruff + pytest)
|
||||
|
||||
26
TASKS.md
26
TASKS.md
@@ -1,6 +1,30 @@
|
||||
# derp - Tasks
|
||||
|
||||
## Current Sprint -- v2.0.0 Quick Wins (2026-02-21)
|
||||
## Current Sprint -- v2.0.0 ACL + Webhook (2026-02-21)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
| P0 | [x] | Granular ACL tiers in `src/derp/plugin.py` (TIERS, Handler.tier, decorator) |
|
||||
| P0 | [x] | ACL dispatch in `src/derp/bot.py` (_get_tier, _operators, _trusted) |
|
||||
| P0 | [x] | Config defaults: operators, trusted, webhook section |
|
||||
| P0 | [x] | `plugins/core.py` -- whoami/admins tier display |
|
||||
| P0 | [x] | `plugins/webhook.py` -- HTTP webhook listener (HMAC, JSON, POST) |
|
||||
| P1 | [x] | Tests: `test_acl.py` (32 cases), `test_webhook.py` (22 cases) |
|
||||
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, ROADMAP.md, TODO.md) |
|
||||
|
||||
## Previous Sprint -- v2.0.0 Tier 2 (2026-02-21)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
| P0 | [x] | `Bot.shorten_url()` method in `src/derp/bot.py` |
|
||||
| P0 | [x] | URL shortening in rss.py, youtube.py, pastemoni.py announcements |
|
||||
| P0 | [x] | `plugins/cron.py` -- scheduled command execution (add/del/list) |
|
||||
| P0 | [x] | `.gitea/workflows/ci.yml` -- Gitea Actions CI pipeline |
|
||||
| P1 | [x] | Tests: `test_flaskpaste.py` (9 cases), `test_cron.py` (~38 cases) |
|
||||
| P1 | [x] | FakeBot `shorten_url` in test_rss, test_youtube, test_pastemoni |
|
||||
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, ROADMAP.md, TODO.md) |
|
||||
|
||||
## Previous Sprint -- v2.0.0 Quick Wins (2026-02-21)
|
||||
|
||||
| Pri | Status | Task |
|
||||
|-----|--------|------|
|
||||
|
||||
12
TODO.md
12
TODO.md
@@ -4,10 +4,10 @@
|
||||
|
||||
- [ ] Multi-server support (per-server config, shared plugins)
|
||||
- [ ] Stable plugin API (versioned, breaking change policy)
|
||||
- [ ] Paste overflow (auto-paste long output to FlaskPaste)
|
||||
- [ ] URL shortener integration (shorten URLs in alerts/output)
|
||||
- [ ] Webhook listener (HTTP endpoint for push events to channels)
|
||||
- [ ] Granular ACLs (per-command: trusted, operator, admin)
|
||||
- [x] Paste overflow (auto-paste long output to FlaskPaste)
|
||||
- [x] URL shortener integration (shorten URLs in subscription announcements)
|
||||
- [x] Webhook listener (HTTP endpoint for push events to channels)
|
||||
- [x] Granular ACLs (per-command: trusted, operator, admin)
|
||||
|
||||
## LLM Bridge
|
||||
|
||||
@@ -80,9 +80,9 @@ is preserved in git history for reference.
|
||||
|
||||
- [x] `paste` -- manual paste to FlaskPaste
|
||||
- [x] `shorten` -- manual URL shortening
|
||||
- [ ] `cron` -- scheduled bot commands on a timer
|
||||
- [x] `cron` -- scheduled bot commands on a timer
|
||||
|
||||
## Testing
|
||||
|
||||
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
|
||||
- [ ] CI pipeline
|
||||
- [x] CI pipeline (Gitea Actions)
|
||||
|
||||
@@ -75,21 +75,27 @@ code changes -- restart the container or use `!reload` for plugins.
|
||||
!h # Shorthand (any unambiguous prefix works)
|
||||
```
|
||||
|
||||
## Admin
|
||||
## Permission Tiers
|
||||
|
||||
```
|
||||
!whoami # Show your hostmask + admin status
|
||||
!admins # Show admin patterns + detected opers (admin)
|
||||
user < trusted < oper < admin
|
||||
```
|
||||
|
||||
```toml
|
||||
# config/derp.toml
|
||||
[bot]
|
||||
admins = ["*!~user@trusted.host", "ops!*@*.ops.net"]
|
||||
admins = ["*!~root@*.ops.net"] # admin tier
|
||||
operators = ["*!~staff@trusted.host"] # oper tier
|
||||
trusted = ["*!~user@known.host"] # trusted tier
|
||||
```
|
||||
|
||||
IRC operators are auto-detected via WHO on connect and on user JOIN
|
||||
(debounced 2s to handle netsplit floods). Hostmask patterns use fnmatch.
|
||||
```
|
||||
!whoami # Show your hostmask + permission tier
|
||||
!admins # Show configured tiers + detected opers (admin)
|
||||
```
|
||||
|
||||
IRC operators are auto-detected via WHO (admin tier). Hostmask patterns
|
||||
use fnmatch. `admin=True` on commands still works (maps to tier="admin").
|
||||
|
||||
## Channel Management (admin)
|
||||
|
||||
@@ -437,6 +443,45 @@ History in `data/alert_history.db`.
|
||||
|
||||
Shows top 3 results as `Title -- URL`. Channel only. Max query length: 200 chars.
|
||||
|
||||
## Cron (admin)
|
||||
|
||||
```
|
||||
!cron add 1h #ops !rss check news # Schedule command every hour
|
||||
!cron add 2d #alerts !tor update # Every 2 days
|
||||
!cron del abc123 # Remove job by ID
|
||||
!cron list # List jobs in channel
|
||||
```
|
||||
|
||||
Intervals: `5m`, `1h30m`, `2d`, `90s`, or raw seconds. Min 1m, max 7d.
|
||||
Max 20 jobs/channel. Persists across restarts. Channel only.
|
||||
|
||||
## Webhook (admin)
|
||||
|
||||
```toml
|
||||
# config/derp.toml
|
||||
[webhook]
|
||||
enabled = true
|
||||
host = "127.0.0.1"
|
||||
port = 8080
|
||||
secret = "your-shared-secret"
|
||||
```
|
||||
|
||||
```bash
|
||||
# Send message to IRC channel via webhook
|
||||
SECRET="your-shared-secret"
|
||||
BODY='{"channel":"#ops","text":"Deploy done"}'
|
||||
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $2}')
|
||||
curl -X POST http://127.0.0.1:8080/ \
|
||||
-H "X-Signature: sha256=$SIG" -d "$BODY"
|
||||
```
|
||||
|
||||
```
|
||||
!webhook # Show listener status (admin)
|
||||
```
|
||||
|
||||
POST JSON: `{"channel":"#chan","text":"msg"}`. Optional `"action":true`.
|
||||
Auth: HMAC-SHA256 via `X-Signature` header. Starts on IRC connect.
|
||||
|
||||
## Plugin Template
|
||||
|
||||
```python
|
||||
|
||||
151
docs/USAGE.md
151
docs/USAGE.md
@@ -53,10 +53,18 @@ rate_burst = 5 # Burst capacity (default: 5)
|
||||
paste_threshold = 4 # Max lines before overflow to FlaskPaste (default: 4)
|
||||
admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected
|
||||
timezone = "UTC" # Timezone for calendar reminders (IANA tz name)
|
||||
operators = [] # Hostmask patterns for "oper" tier (fnmatch)
|
||||
trusted = [] # Hostmask patterns for "trusted" tier (fnmatch)
|
||||
|
||||
[logging]
|
||||
level = "info" # Logging level: debug, info, warning, error
|
||||
format = "text" # Log format: "text" (default) or "json"
|
||||
|
||||
[webhook]
|
||||
enabled = false # Enable HTTP webhook listener
|
||||
host = "127.0.0.1" # Bind address
|
||||
port = 8080 # Bind port
|
||||
secret = "" # HMAC-SHA256 shared secret (empty = no auth)
|
||||
```
|
||||
|
||||
## Built-in Commands
|
||||
@@ -141,6 +149,8 @@ format = "text" # Log format: "text" (default) or "json"
|
||||
| `!shorten <url>` | Shorten a URL via FlaskPaste |
|
||||
| `!paste <text>` | Create a paste via FlaskPaste |
|
||||
| `!pastemoni <add\|del\|list\|check>` | Paste site keyword monitoring |
|
||||
| `!cron <add\|del\|list>` | Scheduled command execution (admin) |
|
||||
| `!webhook` | Show webhook listener status (admin) |
|
||||
|
||||
### Command Shorthand
|
||||
|
||||
@@ -220,24 +230,31 @@ Each line contains:
|
||||
|
||||
Default format is `"text"` (human-readable, same as before).
|
||||
|
||||
## Admin System
|
||||
## Permission Tiers (ACL)
|
||||
|
||||
Commands marked as `admin` require elevated permissions. Admin access is
|
||||
granted via:
|
||||
The bot uses a 4-tier permission model. Each command has a required tier;
|
||||
users must meet or exceed it.
|
||||
|
||||
1. **IRC operator status** -- detected automatically via `WHO`
|
||||
2. **Hostmask patterns** -- configured in `[bot] admins`, fnmatch-style
|
||||
```
|
||||
user < trusted < oper < admin
|
||||
```
|
||||
|
||||
| Tier | Granted by |
|
||||
|------|------------|
|
||||
| `user` | Everyone (default) |
|
||||
| `trusted` | `[bot] trusted` hostmask patterns |
|
||||
| `oper` | `[bot] operators` hostmask patterns |
|
||||
| `admin` | `[bot] admins` hostmask patterns or IRC operator status |
|
||||
|
||||
```toml
|
||||
[bot]
|
||||
admins = [
|
||||
"*!~user@trusted.host",
|
||||
"ops!*@*.ops.net",
|
||||
]
|
||||
admins = ["*!~root@*.ops.net"]
|
||||
operators = ["*!~staff@trusted.host"]
|
||||
trusted = ["*!~user@known.host"]
|
||||
```
|
||||
|
||||
Empty by default -- only IRC operators get admin access unless patterns
|
||||
are configured.
|
||||
All lists are empty by default -- only IRC operators get admin access
|
||||
unless patterns are configured. Patterns use fnmatch-style matching.
|
||||
|
||||
### Oper Detection
|
||||
|
||||
@@ -255,18 +272,24 @@ set automatically.
|
||||
|
||||
| Command | Description |
|
||||
|---------|-------------|
|
||||
| `!whoami` | Show your hostmask and admin status |
|
||||
| `!admins` | Show configured patterns and detected opers (admin) |
|
||||
| `!whoami` | Show your hostmask and permission tier |
|
||||
| `!admins` | Show configured tiers and detected opers (admin) |
|
||||
|
||||
Admin-restricted commands: `!load`, `!reload`, `!unload`, `!admins`, `!state`,
|
||||
`!kick`, `!ban`, `!unban`, `!topic`, `!mode`.
|
||||
`!kick`, `!ban`, `!unban`, `!topic`, `!mode`, `!webhook`.
|
||||
|
||||
### Writing Admin Commands
|
||||
### Writing Tiered Commands
|
||||
|
||||
```python
|
||||
# admin=True still works (maps to tier="admin")
|
||||
@command("dangerous", help="Admin-only action", admin=True)
|
||||
async def cmd_dangerous(bot, message):
|
||||
...
|
||||
|
||||
# Explicit tier for finer control
|
||||
@command("moderate", help="Trusted-only action", tier="trusted")
|
||||
async def cmd_moderate(bot, message):
|
||||
...
|
||||
```
|
||||
|
||||
## IRCv3 Capability Negotiation
|
||||
@@ -404,6 +427,68 @@ restarting the bot.
|
||||
The `core` plugin cannot be unloaded (prevents losing `!load`/`!reload`),
|
||||
but it can be reloaded.
|
||||
|
||||
## Webhook Listener
|
||||
|
||||
Receive HTTP POST requests from external services (CI, monitoring, GitHub,
|
||||
etc.) and relay messages to IRC channels.
|
||||
|
||||
### Configuration
|
||||
|
||||
```toml
|
||||
[webhook]
|
||||
enabled = true
|
||||
host = "127.0.0.1"
|
||||
port = 8080
|
||||
secret = "your-shared-secret"
|
||||
```
|
||||
|
||||
### HTTP API
|
||||
|
||||
Single endpoint: `POST /`
|
||||
|
||||
**Request body** (JSON):
|
||||
|
||||
```json
|
||||
{"channel": "#ops", "text": "Deploy v2.3.1 complete"}
|
||||
```
|
||||
|
||||
Optional `"action": true` sends as a `/me` action.
|
||||
|
||||
**Authentication**: HMAC-SHA256 via `X-Signature: sha256=<hex>` header.
|
||||
If `secret` is empty, no authentication is required.
|
||||
|
||||
**Response codes**:
|
||||
|
||||
| Status | When |
|
||||
|--------|------|
|
||||
| 200 OK | Message sent |
|
||||
| 400 Bad Request | Invalid JSON, missing/invalid channel, empty text |
|
||||
| 401 Unauthorized | Bad or missing HMAC signature |
|
||||
| 405 Method Not Allowed | Non-POST request |
|
||||
| 413 Payload Too Large | Body > 64 KB |
|
||||
|
||||
### Usage Example
|
||||
|
||||
```bash
|
||||
SECRET="your-shared-secret"
|
||||
BODY='{"channel":"#ops","text":"Deploy v2.3.1 complete"}'
|
||||
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $2}')
|
||||
curl -X POST http://127.0.0.1:8080/ \
|
||||
-H "Content-Type: application/json" \
|
||||
-H "X-Signature: sha256=$SIG" \
|
||||
-d "$BODY"
|
||||
```
|
||||
|
||||
### Status Command
|
||||
|
||||
```
|
||||
!webhook Show listener address, request count, uptime (admin)
|
||||
```
|
||||
|
||||
The server starts on IRC connect (event 001) and runs for the lifetime of
|
||||
the bot. If the server is already running (e.g. after reconnect), it is
|
||||
not restarted.
|
||||
|
||||
## Writing Plugins
|
||||
|
||||
Create a `.py` file in the `plugins/` directory:
|
||||
@@ -1077,6 +1162,42 @@ badhost.invalid -> NXDOMAIN
|
||||
- Concurrent via `asyncio.gather()`
|
||||
- Valid types: A, NS, CNAME, SOA, PTR, MX, TXT, AAAA
|
||||
|
||||
### `!cron` -- Scheduled Command Execution
|
||||
|
||||
Schedule bot commands to repeat on a timer. Admins only.
|
||||
|
||||
```
|
||||
!cron add <interval> <#channel> <command...> Schedule a command
|
||||
!cron del <id> Remove a job
|
||||
!cron list List jobs in channel
|
||||
```
|
||||
|
||||
Examples:
|
||||
|
||||
```
|
||||
!cron add 1h #ops !rss check news Poll RSS feed every hour
|
||||
!cron add 2d #alerts !tor update Update Tor list every 2 days
|
||||
!cron del abc123 Remove job by ID
|
||||
!cron list Show jobs in current channel
|
||||
```
|
||||
|
||||
Output format:
|
||||
|
||||
```
|
||||
Cron #a1b2c3: !rss check news every 1h in #ops
|
||||
#a1b2c3 every 1h: !rss check news
|
||||
```
|
||||
|
||||
- `add` and `del` require admin privileges
|
||||
- `add` and `list` must be used in a channel (not PM)
|
||||
- Interval formats: `5m`, `1h30m`, `2d`, `90s`, or raw seconds
|
||||
- Minimum interval: 1 minute
|
||||
- Maximum interval: 7 days
|
||||
- Maximum 20 jobs per channel
|
||||
- Jobs persist across bot restarts via `bot.state`
|
||||
- Dispatched commands run with the original creator's identity
|
||||
- The scheduled command goes through normal command routing and permissions
|
||||
|
||||
### FlaskPaste Configuration
|
||||
|
||||
```toml
|
||||
|
||||
@@ -139,34 +139,33 @@ async def cmd_plugins(bot, message):
|
||||
await bot.reply(message, f"Plugins: {', '.join(parts)}")
|
||||
|
||||
|
||||
@command("whoami", help="Show your hostmask and admin status")
|
||||
@command("whoami", help="Show your hostmask and permission tier")
|
||||
async def cmd_whoami(bot, message):
|
||||
"""Display the sender's hostmask and permission level."""
|
||||
prefix = message.prefix or "unknown"
|
||||
is_admin = bot._is_admin(message)
|
||||
is_oper = message.prefix in bot._opers if message.prefix else False
|
||||
tags = []
|
||||
if is_admin:
|
||||
tags.append("admin")
|
||||
else:
|
||||
tags.append("user")
|
||||
if is_oper:
|
||||
tier = bot._get_tier(message)
|
||||
tags = [tier]
|
||||
if message.prefix and message.prefix in bot._opers:
|
||||
tags.append("IRCOP")
|
||||
await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
|
||||
|
||||
|
||||
@command("admins", help="Show configured admin patterns and detected opers", admin=True)
|
||||
@command("admins", help="Show configured permission tiers and detected opers", admin=True)
|
||||
async def cmd_admins(bot, message):
|
||||
"""Display admin hostmask patterns and known IRC operators."""
|
||||
"""Display configured permission tiers and known IRC operators."""
|
||||
parts = []
|
||||
if bot._admins:
|
||||
parts.append(f"Patterns: {', '.join(bot._admins)}")
|
||||
parts.append(f"Admin: {', '.join(bot._admins)}")
|
||||
else:
|
||||
parts.append("Patterns: (none)")
|
||||
parts.append("Admin: (none)")
|
||||
if bot._operators:
|
||||
parts.append(f"Oper: {', '.join(bot._operators)}")
|
||||
if bot._trusted:
|
||||
parts.append(f"Trusted: {', '.join(bot._trusted)}")
|
||||
if bot._opers:
|
||||
parts.append(f"Opers: {', '.join(sorted(bot._opers))}")
|
||||
parts.append(f"IRCOPs: {', '.join(sorted(bot._opers))}")
|
||||
else:
|
||||
parts.append("Opers: (none)")
|
||||
parts.append("IRCOPs: (none)")
|
||||
await bot.reply(message, " | ".join(parts))
|
||||
|
||||
|
||||
|
||||
288
plugins/cron.py
Normal file
288
plugins/cron.py
Normal file
@@ -0,0 +1,288 @@
|
||||
"""Plugin: scheduled command execution on a timer."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import time
|
||||
|
||||
from derp.irc import Message
|
||||
from derp.plugin import command, event
|
||||
|
||||
# -- Constants ---------------------------------------------------------------
|
||||
|
||||
_DURATION_RE = re.compile(r"(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$")
|
||||
_MIN_INTERVAL = 60
|
||||
_MAX_INTERVAL = 604800 # 7 days
|
||||
_MAX_JOBS = 20
|
||||
|
||||
# -- Module-level tracking ---------------------------------------------------
|
||||
|
||||
_jobs: dict[str, dict] = {}
|
||||
_tasks: dict[str, asyncio.Task] = {}
|
||||
|
||||
|
||||
# -- Pure helpers ------------------------------------------------------------
|
||||
|
||||
def _make_id(channel: str, cmd: str) -> str:
|
||||
"""Generate a short hex ID from channel + command + timestamp."""
|
||||
raw = f"{channel}:{cmd}:{time.monotonic()}".encode()
|
||||
return hashlib.sha256(raw).hexdigest()[:6]
|
||||
|
||||
|
||||
def _parse_duration(spec: str) -> int | None:
|
||||
"""Parse a duration like '5m', '1h30m', '2d', '90s', or raw seconds."""
|
||||
try:
|
||||
secs = int(spec)
|
||||
return secs if secs > 0 else None
|
||||
except ValueError:
|
||||
pass
|
||||
m = _DURATION_RE.match(spec.lower())
|
||||
if not m or not any(m.groups()):
|
||||
return None
|
||||
days = int(m.group(1) or 0)
|
||||
hours = int(m.group(2) or 0)
|
||||
mins = int(m.group(3) or 0)
|
||||
secs = int(m.group(4) or 0)
|
||||
total = days * 86400 + hours * 3600 + mins * 60 + secs
|
||||
return total if total > 0 else None
|
||||
|
||||
|
||||
def _format_duration(secs: int) -> str:
|
||||
"""Format seconds into compact duration."""
|
||||
parts = []
|
||||
if secs >= 86400:
|
||||
parts.append(f"{secs // 86400}d")
|
||||
secs %= 86400
|
||||
if secs >= 3600:
|
||||
parts.append(f"{secs // 3600}h")
|
||||
secs %= 3600
|
||||
if secs >= 60:
|
||||
parts.append(f"{secs // 60}m")
|
||||
secs %= 60
|
||||
if secs or not parts:
|
||||
parts.append(f"{secs}s")
|
||||
return "".join(parts)
|
||||
|
||||
|
||||
# -- State helpers -----------------------------------------------------------
|
||||
|
||||
def _state_key(channel: str, cron_id: str) -> str:
|
||||
"""Build composite state key."""
|
||||
return f"{channel}:{cron_id}"
|
||||
|
||||
|
||||
def _save(bot, key: str, data: dict) -> None:
|
||||
"""Persist cron job to bot.state."""
|
||||
bot.state.set("cron", key, json.dumps(data))
|
||||
|
||||
|
||||
def _load(bot, key: str) -> dict | None:
|
||||
"""Load cron job from bot.state."""
|
||||
raw = bot.state.get("cron", key)
|
||||
if raw is None:
|
||||
return None
|
||||
try:
|
||||
return json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
|
||||
def _delete(bot, key: str) -> None:
|
||||
"""Remove cron job from bot.state."""
|
||||
bot.state.delete("cron", key)
|
||||
|
||||
|
||||
# -- Cron loop ---------------------------------------------------------------
|
||||
|
||||
async def _cron_loop(bot, key: str) -> None:
|
||||
"""Repeating loop: sleep, then dispatch the stored command."""
|
||||
try:
|
||||
while True:
|
||||
data = _jobs.get(key)
|
||||
if not data:
|
||||
return
|
||||
await asyncio.sleep(data["interval"])
|
||||
# Synthesize a message for command dispatch
|
||||
msg = Message(
|
||||
raw="", prefix=data["prefix"],
|
||||
nick=data["nick"], command="PRIVMSG",
|
||||
params=[data["channel"], data["command"]], tags={},
|
||||
)
|
||||
bot._dispatch_command(msg)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
def _start_job(bot, key: str) -> None:
|
||||
"""Create and track a cron task."""
|
||||
existing = _tasks.get(key)
|
||||
if existing and not existing.done():
|
||||
return
|
||||
task = asyncio.create_task(_cron_loop(bot, key))
|
||||
_tasks[key] = task
|
||||
|
||||
|
||||
def _stop_job(key: str) -> None:
|
||||
"""Cancel and remove a cron task."""
|
||||
task = _tasks.pop(key, None)
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
_jobs.pop(key, None)
|
||||
|
||||
|
||||
# -- Restore on connect -----------------------------------------------------
|
||||
|
||||
def _restore(bot) -> None:
|
||||
"""Rebuild cron tasks from persisted state."""
|
||||
for key in bot.state.keys("cron"):
|
||||
existing = _tasks.get(key)
|
||||
if existing and not existing.done():
|
||||
continue
|
||||
data = _load(bot, key)
|
||||
if data is None:
|
||||
continue
|
||||
_jobs[key] = data
|
||||
_start_job(bot, key)
|
||||
|
||||
|
||||
@event("001")
|
||||
async def on_connect(bot, message):
|
||||
"""Restore cron jobs on connect."""
|
||||
_restore(bot)
|
||||
|
||||
|
||||
# -- Command handler ---------------------------------------------------------
|
||||
|
||||
@command("cron", help="Cron: !cron add|del|list", admin=True)
|
||||
async def cmd_cron(bot, message):
|
||||
"""Scheduled command execution on a timer.
|
||||
|
||||
Usage:
|
||||
!cron add <interval> <#channel> <command...> Schedule a command
|
||||
!cron del <id> Remove a job
|
||||
!cron list List jobs
|
||||
"""
|
||||
parts = message.text.split(None, 4)
|
||||
if len(parts) < 2:
|
||||
await bot.reply(message, "Usage: !cron <add|del|list> [args]")
|
||||
return
|
||||
|
||||
sub = parts[1].lower()
|
||||
|
||||
# -- list ----------------------------------------------------------------
|
||||
if sub == "list":
|
||||
if not message.is_channel:
|
||||
await bot.reply(message, "Use this command in a channel")
|
||||
return
|
||||
channel = message.target
|
||||
prefix = f"{channel}:"
|
||||
entries = []
|
||||
for key in bot.state.keys("cron"):
|
||||
if key.startswith(prefix):
|
||||
data = _load(bot, key)
|
||||
if data:
|
||||
cron_id = data["id"]
|
||||
interval = _format_duration(data["interval"])
|
||||
cmd = data["command"]
|
||||
entries.append(f"#{cron_id} every {interval}: {cmd}")
|
||||
if not entries:
|
||||
await bot.reply(message, "No cron jobs in this channel")
|
||||
return
|
||||
for entry in entries:
|
||||
await bot.reply(message, entry)
|
||||
return
|
||||
|
||||
# -- del -----------------------------------------------------------------
|
||||
if sub == "del":
|
||||
if len(parts) < 3:
|
||||
await bot.reply(message, "Usage: !cron del <id>")
|
||||
return
|
||||
cron_id = parts[2].lstrip("#")
|
||||
# Find matching key across all channels
|
||||
found_key = None
|
||||
for key in bot.state.keys("cron"):
|
||||
data = _load(bot, key)
|
||||
if data and data["id"] == cron_id:
|
||||
found_key = key
|
||||
break
|
||||
if not found_key:
|
||||
await bot.reply(message, f"No cron job #{cron_id}")
|
||||
return
|
||||
_stop_job(found_key)
|
||||
_delete(bot, found_key)
|
||||
await bot.reply(message, f"Removed cron #{cron_id}")
|
||||
return
|
||||
|
||||
# -- add -----------------------------------------------------------------
|
||||
if sub == "add":
|
||||
if not message.is_channel:
|
||||
await bot.reply(message, "Use this command in a channel")
|
||||
return
|
||||
if len(parts) < 5:
|
||||
await bot.reply(
|
||||
message,
|
||||
"Usage: !cron add <interval> <#channel> <command...>",
|
||||
)
|
||||
return
|
||||
|
||||
interval_spec = parts[2]
|
||||
target_channel = parts[3]
|
||||
cmd_text = parts[4]
|
||||
|
||||
if not target_channel.startswith(("#", "&")):
|
||||
await bot.reply(message, "Target must be a channel (e.g. #ops)")
|
||||
return
|
||||
|
||||
interval = _parse_duration(interval_spec)
|
||||
if interval is None:
|
||||
await bot.reply(message, "Invalid interval (use: 5m, 1h, 2d)")
|
||||
return
|
||||
if interval < _MIN_INTERVAL:
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Minimum interval is {_format_duration(_MIN_INTERVAL)}",
|
||||
)
|
||||
return
|
||||
if interval > _MAX_INTERVAL:
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Maximum interval is {_format_duration(_MAX_INTERVAL)}",
|
||||
)
|
||||
return
|
||||
|
||||
# Check per-channel limit
|
||||
ch_prefix = f"{target_channel}:"
|
||||
count = sum(
|
||||
1 for k in bot.state.keys("cron") if k.startswith(ch_prefix)
|
||||
)
|
||||
if count >= _MAX_JOBS:
|
||||
await bot.reply(message, f"Job limit reached ({_MAX_JOBS})")
|
||||
return
|
||||
|
||||
cron_id = _make_id(target_channel, cmd_text)
|
||||
key = _state_key(target_channel, cron_id)
|
||||
|
||||
data = {
|
||||
"id": cron_id,
|
||||
"channel": target_channel,
|
||||
"command": cmd_text,
|
||||
"interval": interval,
|
||||
"prefix": message.prefix,
|
||||
"nick": message.nick,
|
||||
"added_by": message.nick,
|
||||
}
|
||||
_save(bot, key, data)
|
||||
_jobs[key] = data
|
||||
_start_job(bot, key)
|
||||
|
||||
fmt_interval = _format_duration(interval)
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Cron #{cron_id}: {cmd_text} every {fmt_interval} in {target_channel}",
|
||||
)
|
||||
return
|
||||
|
||||
await bot.reply(message, "Usage: !cron <add|del|list> [args]")
|
||||
@@ -275,6 +275,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
||||
title = item.get("title") or "(untitled)"
|
||||
snippet = item.get("snippet", "")
|
||||
url = item.get("url", "")
|
||||
if url:
|
||||
url = await bot.shorten_url(url)
|
||||
parts = [f"[{tag}] {title}"]
|
||||
if snippet:
|
||||
parts.append(snippet)
|
||||
|
||||
@@ -272,6 +272,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
||||
for item in shown:
|
||||
title = _truncate(item["title"]) if item["title"] else "(no title)"
|
||||
link = item["link"]
|
||||
if link:
|
||||
link = await bot.shorten_url(link)
|
||||
date = item.get("date", "")
|
||||
line = f"[{name}] {title}"
|
||||
if date:
|
||||
|
||||
196
plugins/webhook.py
Normal file
196
plugins/webhook.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""Webhook listener: receive HTTP POST requests and relay messages to IRC."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
|
||||
from derp.plugin import command, event
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
_MAX_BODY = 65536 # 64 KB
|
||||
_server: asyncio.Server | None = None
|
||||
_request_count: int = 0
|
||||
_started: float = 0.0
|
||||
|
||||
|
||||
def _verify_signature(secret: str, body: bytes, signature: str) -> bool:
|
||||
"""Verify HMAC-SHA256 signature from X-Signature header."""
|
||||
if not secret:
|
||||
return True # no secret configured = open access
|
||||
if not signature.startswith("sha256="):
|
||||
return False
|
||||
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
|
||||
return hmac.compare_digest(expected, signature[7:])
|
||||
|
||||
|
||||
def _http_response(status: int, reason: str, body: str = "") -> bytes:
|
||||
"""Build a minimal HTTP/1.1 response."""
|
||||
body_bytes = body.encode("utf-8") if body else b""
|
||||
lines = [
|
||||
f"HTTP/1.1 {status} {reason}",
|
||||
"Content-Type: text/plain; charset=utf-8",
|
||||
f"Content-Length: {len(body_bytes)}",
|
||||
"Connection: close",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
return "\r\n".join(lines).encode("utf-8") + body_bytes
|
||||
|
||||
|
||||
async def _handle_request(reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
bot, secret: str) -> None:
|
||||
"""Parse one HTTP request and dispatch to IRC."""
|
||||
global _request_count
|
||||
|
||||
try:
|
||||
# Read request line
|
||||
request_line = await asyncio.wait_for(reader.readline(), timeout=10.0)
|
||||
if not request_line:
|
||||
return
|
||||
parts = request_line.decode("utf-8", errors="replace").strip().split()
|
||||
if len(parts) < 2:
|
||||
writer.write(_http_response(400, "Bad Request", "malformed request"))
|
||||
return
|
||||
method, _path = parts[0], parts[1]
|
||||
|
||||
# Read headers
|
||||
headers: dict[str, str] = {}
|
||||
while True:
|
||||
line = await asyncio.wait_for(reader.readline(), timeout=10.0)
|
||||
if not line or line == b"\r\n" or line == b"\n":
|
||||
break
|
||||
decoded = line.decode("utf-8", errors="replace").strip()
|
||||
if ":" in decoded:
|
||||
key, val = decoded.split(":", 1)
|
||||
headers[key.strip().lower()] = val.strip()
|
||||
|
||||
# Method check
|
||||
if method != "POST":
|
||||
writer.write(_http_response(405, "Method Not Allowed", "POST only"))
|
||||
return
|
||||
|
||||
# Read body
|
||||
content_length = int(headers.get("content-length", "0"))
|
||||
if content_length > _MAX_BODY:
|
||||
writer.write(_http_response(413, "Payload Too Large",
|
||||
f"max {_MAX_BODY} bytes"))
|
||||
return
|
||||
body = await asyncio.wait_for(reader.readexactly(content_length),
|
||||
timeout=10.0)
|
||||
|
||||
# Verify HMAC signature
|
||||
signature = headers.get("x-signature", "")
|
||||
if not _verify_signature(secret, body, signature):
|
||||
writer.write(_http_response(401, "Unauthorized", "bad signature"))
|
||||
return
|
||||
|
||||
# Parse JSON
|
||||
try:
|
||||
data = json.loads(body)
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
writer.write(_http_response(400, "Bad Request", "invalid JSON"))
|
||||
return
|
||||
|
||||
# Validate fields
|
||||
channel = data.get("channel", "")
|
||||
text = data.get("text", "")
|
||||
is_action = data.get("action", False)
|
||||
|
||||
if not isinstance(channel, str) or not channel.startswith(("#", "&")):
|
||||
writer.write(_http_response(400, "Bad Request", "invalid channel"))
|
||||
return
|
||||
if not isinstance(text, str) or not text.strip():
|
||||
writer.write(_http_response(400, "Bad Request", "empty text"))
|
||||
return
|
||||
|
||||
# Send to IRC
|
||||
text = text.strip()
|
||||
if is_action:
|
||||
await bot.action(channel, text)
|
||||
else:
|
||||
await bot.send(channel, text)
|
||||
|
||||
_request_count += 1
|
||||
writer.write(_http_response(200, "OK", "sent"))
|
||||
log.info("webhook: relayed to %s (%d bytes)", channel, len(text))
|
||||
|
||||
except (asyncio.TimeoutError, asyncio.IncompleteReadError, ConnectionError):
|
||||
log.debug("webhook: client disconnected")
|
||||
except Exception:
|
||||
log.exception("webhook: error handling request")
|
||||
try:
|
||||
writer.write(_http_response(500, "Internal Server Error"))
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@event("001")
|
||||
async def on_connect(bot, message):
|
||||
"""Start the webhook HTTP server on connect (if enabled)."""
|
||||
global _server, _started, _request_count
|
||||
|
||||
if _server is not None:
|
||||
return # already running
|
||||
|
||||
cfg = bot.config.get("webhook", {})
|
||||
if not cfg.get("enabled"):
|
||||
return
|
||||
|
||||
host = cfg.get("host", "127.0.0.1")
|
||||
port = cfg.get("port", 8080)
|
||||
secret = cfg.get("secret", "")
|
||||
|
||||
async def handler(reader, writer):
|
||||
await _handle_request(reader, writer, bot, secret)
|
||||
|
||||
try:
|
||||
_server = await asyncio.start_server(handler, host, port)
|
||||
_started = time.monotonic()
|
||||
_request_count = 0
|
||||
log.info("webhook: listening on %s:%d", host, port)
|
||||
except OSError as exc:
|
||||
log.error("webhook: failed to bind %s:%d: %s", host, port, exc)
|
||||
|
||||
|
||||
@command("webhook", help="Show webhook listener status", admin=True)
|
||||
async def cmd_webhook(bot, message):
|
||||
"""Display webhook server status."""
|
||||
if _server is None:
|
||||
await bot.reply(message, "Webhook: not running")
|
||||
return
|
||||
|
||||
socks = _server.sockets
|
||||
if socks:
|
||||
addr = socks[0].getsockname()
|
||||
address = f"{addr[0]}:{addr[1]}"
|
||||
else:
|
||||
address = "unknown"
|
||||
|
||||
elapsed = int(time.monotonic() - _started)
|
||||
hours, rem = divmod(elapsed, 3600)
|
||||
minutes, secs = divmod(rem, 60)
|
||||
parts = []
|
||||
if hours:
|
||||
parts.append(f"{hours}h")
|
||||
if minutes:
|
||||
parts.append(f"{minutes}m")
|
||||
parts.append(f"{secs}s")
|
||||
uptime = " ".join(parts)
|
||||
|
||||
await bot.reply(
|
||||
message,
|
||||
f"Webhook: {address} | {_request_count} requests | up {uptime}",
|
||||
)
|
||||
@@ -394,6 +394,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
||||
for item in shown:
|
||||
title = _truncate(item["title"]) if item["title"] else "(no title)"
|
||||
link = item["link"]
|
||||
if link:
|
||||
link = await bot.shorten_url(link)
|
||||
# Build metadata suffix
|
||||
parts = []
|
||||
dur = durations.get(item["id"], 0)
|
||||
|
||||
@@ -13,7 +13,7 @@ from pathlib import Path
|
||||
|
||||
from derp import __version__
|
||||
from derp.irc import _MAX_IRC_LINE, IRCConnection, Message, format_msg, parse
|
||||
from derp.plugin import Handler, PluginRegistry
|
||||
from derp.plugin import TIERS, Handler, PluginRegistry
|
||||
from derp.state import StateStore
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -93,6 +93,8 @@ class Bot:
|
||||
self._tasks: set[asyncio.Task] = set()
|
||||
self._reconnect_delay: float = 5.0
|
||||
self._admins: list[str] = config.get("bot", {}).get("admins", [])
|
||||
self._operators: list[str] = config.get("bot", {}).get("operators", [])
|
||||
self._trusted: list[str] = config.get("bot", {}).get("trusted", [])
|
||||
self._opers: set[str] = set() # hostmasks of known IRC operators
|
||||
self._caps: set[str] = set() # negotiated IRCv3 caps
|
||||
self._who_pending: dict[str, asyncio.Task] = {} # debounced WHO per channel
|
||||
@@ -359,20 +361,33 @@ class Bot:
|
||||
return True
|
||||
return plugin_name in allowed
|
||||
|
||||
def _get_tier(self, msg: Message) -> str:
|
||||
"""Determine the permission tier of the message sender.
|
||||
|
||||
Checks (in priority order): IRC operator, admin pattern,
|
||||
operator pattern, trusted pattern. Falls back to ``"user"``.
|
||||
"""
|
||||
if not msg.prefix:
|
||||
return "user"
|
||||
if msg.prefix in self._opers:
|
||||
return "admin"
|
||||
for pattern in self._admins:
|
||||
if fnmatch.fnmatch(msg.prefix, pattern):
|
||||
return "admin"
|
||||
for pattern in self._operators:
|
||||
if fnmatch.fnmatch(msg.prefix, pattern):
|
||||
return "oper"
|
||||
for pattern in self._trusted:
|
||||
if fnmatch.fnmatch(msg.prefix, pattern):
|
||||
return "trusted"
|
||||
return "user"
|
||||
|
||||
def _is_admin(self, msg: Message) -> bool:
|
||||
"""Check if the message sender is a bot admin.
|
||||
|
||||
Returns True if the sender is a known IRC operator or matches
|
||||
a configured hostmask pattern (fnmatch-style).
|
||||
Thin wrapper around ``_get_tier`` for backward compatibility.
|
||||
"""
|
||||
if not msg.prefix:
|
||||
return False
|
||||
if msg.prefix in self._opers:
|
||||
return True
|
||||
for pattern in self._admins:
|
||||
if fnmatch.fnmatch(msg.prefix, pattern):
|
||||
return True
|
||||
return False
|
||||
return self._get_tier(msg) == "admin"
|
||||
|
||||
def _dispatch_command(self, msg: Message) -> None:
|
||||
"""Check if a PRIVMSG is a bot command and spawn it."""
|
||||
@@ -396,10 +411,13 @@ class Bot:
|
||||
if not self._plugin_allowed(handler.plugin, channel):
|
||||
return
|
||||
|
||||
if handler.admin and not self._is_admin(msg):
|
||||
deny = f"Permission denied: {self.prefix}{cmd_name} requires admin"
|
||||
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
|
||||
return
|
||||
required = handler.tier
|
||||
if required != "user":
|
||||
sender = self._get_tier(msg)
|
||||
if TIERS.index(sender) < TIERS.index(required):
|
||||
deny = f"Permission denied: {self.prefix}{cmd_name} requires {required}"
|
||||
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
|
||||
return
|
||||
|
||||
self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}")
|
||||
|
||||
@@ -510,6 +528,17 @@ class Bot:
|
||||
"""Send a CTCP ACTION (/me) to a target."""
|
||||
await self.send(target, f"\x01ACTION {text}\x01")
|
||||
|
||||
async def shorten_url(self, url: str) -> str:
|
||||
"""Shorten a URL via FlaskPaste. Returns original on failure."""
|
||||
fp = self.registry._modules.get("flaskpaste")
|
||||
if not fp:
|
||||
return url
|
||||
loop = asyncio.get_running_loop()
|
||||
try:
|
||||
return await loop.run_in_executor(None, fp.shorten_url, self, url)
|
||||
except Exception:
|
||||
return url
|
||||
|
||||
async def join(self, channel: str) -> None:
|
||||
"""Join an IRC channel."""
|
||||
await self.conn.send(format_msg("JOIN", channel))
|
||||
|
||||
@@ -29,8 +29,16 @@ DEFAULTS: dict = {
|
||||
"rate_burst": 5,
|
||||
"paste_threshold": 4,
|
||||
"admins": [],
|
||||
"operators": [],
|
||||
"trusted": [],
|
||||
},
|
||||
"channels": {},
|
||||
"webhook": {
|
||||
"enabled": False,
|
||||
"host": "127.0.0.1",
|
||||
"port": 8080,
|
||||
"secret": "",
|
||||
},
|
||||
"logging": {
|
||||
"level": "info",
|
||||
"format": "text",
|
||||
|
||||
@@ -12,6 +12,8 @@ from typing import Any, Callable
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
TIERS: tuple[str, ...] = ("user", "trusted", "oper", "admin")
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Handler:
|
||||
@@ -22,9 +24,10 @@ class Handler:
|
||||
help: str = ""
|
||||
plugin: str = ""
|
||||
admin: bool = False
|
||||
tier: str = "user"
|
||||
|
||||
|
||||
def command(name: str, help: str = "", admin: bool = False) -> Callable:
|
||||
def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> Callable:
|
||||
"""Decorator to register an async function as a bot command.
|
||||
|
||||
Usage::
|
||||
@@ -36,12 +39,17 @@ def command(name: str, help: str = "", admin: bool = False) -> Callable:
|
||||
@command("reload", help="Reload a plugin", admin=True)
|
||||
async def cmd_reload(bot, message):
|
||||
...
|
||||
|
||||
@command("trusted_cmd", help="Trusted-only", tier="trusted")
|
||||
async def cmd_trusted(bot, message):
|
||||
...
|
||||
"""
|
||||
|
||||
def decorator(func: Callable) -> Callable:
|
||||
func._derp_command = name # type: ignore[attr-defined]
|
||||
func._derp_help = help # type: ignore[attr-defined]
|
||||
func._derp_admin = admin # type: ignore[attr-defined]
|
||||
func._derp_tier = tier if tier else ("admin" if admin else "user") # type: ignore[attr-defined]
|
||||
return func
|
||||
|
||||
return decorator
|
||||
@@ -74,12 +82,14 @@ class PluginRegistry:
|
||||
self._paths: dict[str, Path] = {}
|
||||
|
||||
def register_command(self, name: str, callback: Callable, help: str = "",
|
||||
plugin: str = "", admin: bool = False) -> None:
|
||||
plugin: str = "", admin: bool = False,
|
||||
tier: str = "user") -> None:
|
||||
"""Register a command handler."""
|
||||
if name in self.commands:
|
||||
log.warning("command '%s' already registered, overwriting", name)
|
||||
self.commands[name] = Handler(
|
||||
name=name, callback=callback, help=help, plugin=plugin, admin=admin,
|
||||
name=name, callback=callback, help=help, plugin=plugin,
|
||||
admin=admin, tier=tier,
|
||||
)
|
||||
log.debug("registered command: %s (%s)", name, plugin)
|
||||
|
||||
@@ -102,6 +112,7 @@ class PluginRegistry:
|
||||
help=getattr(obj, "_derp_help", ""),
|
||||
plugin=plugin_name,
|
||||
admin=getattr(obj, "_derp_admin", False),
|
||||
tier=getattr(obj, "_derp_tier", "user"),
|
||||
)
|
||||
count += 1
|
||||
if hasattr(obj, "_derp_event"):
|
||||
|
||||
409
tests/test_acl.py
Normal file
409
tests/test_acl.py
Normal file
@@ -0,0 +1,409 @@
|
||||
"""Tests for the granular ACL tier system."""
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
from derp.bot import Bot
|
||||
from derp.config import DEFAULTS
|
||||
from derp.irc import Message, parse
|
||||
from derp.plugin import TIERS, PluginRegistry, command
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
|
||||
class _MockConnection:
|
||||
"""Minimal mock IRC connection for dispatch tests."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._queue: asyncio.Queue[str | None] = asyncio.Queue()
|
||||
self.sent: list[str] = []
|
||||
|
||||
async def connect(self) -> None:
|
||||
pass
|
||||
|
||||
async def close(self) -> None:
|
||||
pass
|
||||
|
||||
async def send(self, line: str) -> None:
|
||||
self.sent.append(line)
|
||||
|
||||
async def readline(self) -> str | None:
|
||||
return await self._queue.get()
|
||||
|
||||
def inject(self, line: str) -> None:
|
||||
self._queue.put_nowait(line)
|
||||
|
||||
def disconnect(self) -> None:
|
||||
self._queue.put_nowait(None)
|
||||
|
||||
|
||||
class _Harness:
|
||||
"""Test fixture: Bot with mock connection and configurable tiers."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
admins: list[str] | None = None,
|
||||
operators: list[str] | None = None,
|
||||
trusted: list[str] | None = None,
|
||||
) -> None:
|
||||
config: dict = {
|
||||
"server": {
|
||||
"host": "localhost", "port": 6667, "tls": False,
|
||||
"nick": "test", "user": "test", "realname": "test bot",
|
||||
},
|
||||
"bot": {
|
||||
"prefix": "!",
|
||||
"channels": [],
|
||||
"plugins_dir": "plugins",
|
||||
"admins": admins or [],
|
||||
"operators": operators or [],
|
||||
"trusted": trusted or [],
|
||||
"rate_limit": 100.0,
|
||||
"rate_burst": 100,
|
||||
},
|
||||
}
|
||||
self.registry = PluginRegistry()
|
||||
self.bot = Bot(config, self.registry)
|
||||
self.conn = _MockConnection()
|
||||
self.bot.conn = self.conn # type: ignore[assignment]
|
||||
self.registry.load_plugin(Path("plugins/core.py"))
|
||||
|
||||
def inject_registration(self) -> None:
|
||||
self.conn.inject(":server CAP * LS :")
|
||||
self.conn.inject(":server 001 test :Welcome")
|
||||
|
||||
def privmsg(self, nick: str, target: str, text: str,
|
||||
user: str = "user", host: str = "host") -> None:
|
||||
self.conn.inject(f":{nick}!{user}@{host} PRIVMSG {target} :{text}")
|
||||
|
||||
async def run(self) -> None:
|
||||
self.conn.disconnect()
|
||||
self.bot._running = True
|
||||
await self.bot._connect_and_run()
|
||||
if self.bot._tasks:
|
||||
await asyncio.gather(*list(self.bot._tasks), return_exceptions=True)
|
||||
|
||||
def sent_privmsgs(self, target: str) -> list[str]:
|
||||
results = []
|
||||
for line in self.conn.sent:
|
||||
msg = parse(line)
|
||||
if msg.command == "PRIVMSG" and msg.target == target:
|
||||
results.append(msg.text)
|
||||
return results
|
||||
|
||||
|
||||
def _msg(text: str, prefix: str = "nick!user@host") -> Message:
|
||||
nick = prefix.split("!")[0] if "!" in prefix else prefix
|
||||
return Message(
|
||||
raw="", prefix=prefix, nick=nick,
|
||||
command="PRIVMSG", params=["#test", text], tags={},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestTierConstants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestTierConstants:
|
||||
def test_tier_order(self):
|
||||
assert TIERS == ("user", "trusted", "oper", "admin")
|
||||
|
||||
def test_index_comparison(self):
|
||||
assert TIERS.index("user") < TIERS.index("trusted")
|
||||
assert TIERS.index("trusted") < TIERS.index("oper")
|
||||
assert TIERS.index("oper") < TIERS.index("admin")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestGetTier
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetTier:
|
||||
def test_no_prefix(self):
|
||||
h = _Harness()
|
||||
msg = Message(raw="", prefix="", nick="", command="PRIVMSG",
|
||||
params=["#test", "hi"], tags={})
|
||||
assert h.bot._get_tier(msg) == "user"
|
||||
|
||||
def test_ircop(self):
|
||||
h = _Harness()
|
||||
h.bot._opers.add("op!root@server")
|
||||
msg = _msg("test", prefix="op!root@server")
|
||||
assert h.bot._get_tier(msg) == "admin"
|
||||
|
||||
def test_admin_pattern(self):
|
||||
h = _Harness(admins=["*!*@admin.host"])
|
||||
msg = _msg("test", prefix="alice!user@admin.host")
|
||||
assert h.bot._get_tier(msg) == "admin"
|
||||
|
||||
def test_oper_pattern(self):
|
||||
h = _Harness(operators=["*!*@oper.host"])
|
||||
msg = _msg("test", prefix="bob!user@oper.host")
|
||||
assert h.bot._get_tier(msg) == "oper"
|
||||
|
||||
def test_trusted_pattern(self):
|
||||
h = _Harness(trusted=["*!*@trusted.host"])
|
||||
msg = _msg("test", prefix="carol!user@trusted.host")
|
||||
assert h.bot._get_tier(msg) == "trusted"
|
||||
|
||||
def test_no_match(self):
|
||||
h = _Harness(admins=["*!*@admin.host"])
|
||||
msg = _msg("test", prefix="nobody!user@random.host")
|
||||
assert h.bot._get_tier(msg) == "user"
|
||||
|
||||
def test_priority_admin_over_oper(self):
|
||||
"""Admin patterns take priority over operator patterns."""
|
||||
h = _Harness(admins=["*!*@dual.host"], operators=["*!*@dual.host"])
|
||||
msg = _msg("test", prefix="x!y@dual.host")
|
||||
assert h.bot._get_tier(msg) == "admin"
|
||||
|
||||
def test_priority_oper_over_trusted(self):
|
||||
"""Operator patterns take priority over trusted patterns."""
|
||||
h = _Harness(operators=["*!*@dual.host"], trusted=["*!*@dual.host"])
|
||||
msg = _msg("test", prefix="x!y@dual.host")
|
||||
assert h.bot._get_tier(msg) == "oper"
|
||||
|
||||
def test_ircop_over_admin_pattern(self):
|
||||
"""IRC operator status takes priority over admin hostmask pattern."""
|
||||
h = _Harness(admins=["*!*@server"])
|
||||
h.bot._opers.add("op!root@server")
|
||||
msg = _msg("test", prefix="op!root@server")
|
||||
# Both match, but ircop check comes first
|
||||
assert h.bot._get_tier(msg) == "admin"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestIsAdminBackcompat
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIsAdminBackcompat:
|
||||
def test_admin_true(self):
|
||||
h = _Harness(admins=["*!*@admin.host"])
|
||||
msg = _msg("test", prefix="a!b@admin.host")
|
||||
assert h.bot._is_admin(msg) is True
|
||||
|
||||
def test_oper_false(self):
|
||||
h = _Harness(operators=["*!*@oper.host"])
|
||||
msg = _msg("test", prefix="a!b@oper.host")
|
||||
assert h.bot._is_admin(msg) is False
|
||||
|
||||
def test_trusted_false(self):
|
||||
h = _Harness(trusted=["*!*@trusted.host"])
|
||||
msg = _msg("test", prefix="a!b@trusted.host")
|
||||
assert h.bot._is_admin(msg) is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCommandDecorator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCommandDecorator:
|
||||
def test_admin_true_sets_tier(self):
|
||||
@command("x", admin=True)
|
||||
async def handler(bot, msg):
|
||||
pass
|
||||
assert handler._derp_tier == "admin"
|
||||
|
||||
def test_explicit_tier(self):
|
||||
@command("x", tier="trusted")
|
||||
async def handler(bot, msg):
|
||||
pass
|
||||
assert handler._derp_tier == "trusted"
|
||||
|
||||
def test_tier_overrides_admin(self):
|
||||
@command("x", admin=True, tier="oper")
|
||||
async def handler(bot, msg):
|
||||
pass
|
||||
assert handler._derp_tier == "oper"
|
||||
|
||||
def test_default_tier(self):
|
||||
@command("x")
|
||||
async def handler(bot, msg):
|
||||
pass
|
||||
assert handler._derp_tier == "user"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestDispatchWithTiers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDispatchWithTiers:
|
||||
def test_trusted_allowed_for_trusted_cmd(self):
|
||||
"""Trusted user can run a trusted-tier command."""
|
||||
h = _Harness(trusted=["*!user@host"])
|
||||
|
||||
@command("tcmd", tier="trusted")
|
||||
async def cmd_tcmd(bot, message):
|
||||
await bot.reply(message, "ok")
|
||||
|
||||
h.registry.register_command(
|
||||
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
|
||||
)
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!tcmd")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("ok" in m for m in msgs)
|
||||
|
||||
def test_admin_allowed_for_trusted_cmd(self):
|
||||
"""Admin can run trusted-tier commands (higher tier)."""
|
||||
h = _Harness(admins=["*!user@host"])
|
||||
|
||||
@command("tcmd", tier="trusted")
|
||||
async def cmd_tcmd(bot, message):
|
||||
await bot.reply(message, "ok")
|
||||
|
||||
h.registry.register_command(
|
||||
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
|
||||
)
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!tcmd")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("ok" in m for m in msgs)
|
||||
|
||||
def test_user_denied_for_trusted_cmd(self):
|
||||
"""Regular user is denied a trusted-tier command."""
|
||||
h = _Harness()
|
||||
|
||||
@command("tcmd", tier="trusted")
|
||||
async def cmd_tcmd(bot, message):
|
||||
await bot.reply(message, "ok")
|
||||
|
||||
h.registry.register_command(
|
||||
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
|
||||
)
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!tcmd")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("Permission denied" in m for m in msgs)
|
||||
assert any("requires trusted" in m for m in msgs)
|
||||
|
||||
def test_backward_compat_admin_flag(self):
|
||||
"""admin=True commands still work via tier='admin'."""
|
||||
h = _Harness(admins=["*!user@host"])
|
||||
h.inject_registration()
|
||||
# cmd_admins is already registered via core plugin with admin=True
|
||||
h.privmsg("nick", "#test", "!admins")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("Admin:" in m for m in msgs)
|
||||
|
||||
def test_denial_message_shows_tier(self):
|
||||
"""Permission denial message includes the required tier name."""
|
||||
h = _Harness()
|
||||
|
||||
@command("opcmd", tier="oper")
|
||||
async def cmd_opcmd(bot, message):
|
||||
await bot.reply(message, "ok")
|
||||
|
||||
h.registry.register_command(
|
||||
"opcmd", cmd_opcmd, tier="oper", plugin="test",
|
||||
)
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!opcmd")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("requires oper" in m for m in msgs)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestConfigDefaults
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestConfigDefaults:
|
||||
def test_operators_in_defaults(self):
|
||||
assert "operators" in DEFAULTS["bot"]
|
||||
assert DEFAULTS["bot"]["operators"] == []
|
||||
|
||||
def test_trusted_in_defaults(self):
|
||||
assert "trusted" in DEFAULTS["bot"]
|
||||
assert DEFAULTS["bot"]["trusted"] == []
|
||||
|
||||
def test_webhook_in_defaults(self):
|
||||
assert "webhook" in DEFAULTS
|
||||
assert DEFAULTS["webhook"]["enabled"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestWhoami
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWhoami:
|
||||
def test_shows_admin(self):
|
||||
h = _Harness(admins=["*!user@host"])
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!whoami")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("admin" in m for m in msgs)
|
||||
|
||||
def test_shows_trusted(self):
|
||||
h = _Harness(trusted=["*!user@host"])
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!whoami")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("trusted" in m for m in msgs)
|
||||
|
||||
def test_shows_user(self):
|
||||
h = _Harness()
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!whoami")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("user" in m for m in msgs)
|
||||
|
||||
def test_shows_ircop_tag(self):
|
||||
h = _Harness()
|
||||
h.bot._opers.add("nick!user@host")
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!whoami")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
assert any("IRCOP" in m for m in msgs)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestAdmins
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAdmins:
|
||||
def test_shows_all_tiers(self):
|
||||
h = _Harness(
|
||||
admins=["*!*@admin.host"],
|
||||
operators=["*!*@oper.host"],
|
||||
trusted=["*!*@trusted.host"],
|
||||
)
|
||||
h.bot._opers.add("nick!user@host")
|
||||
h.inject_registration()
|
||||
# Must be admin to run !admins
|
||||
h.privmsg("nick", "#test", "!admins", user="x", host="admin.host")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
combined = " ".join(msgs)
|
||||
assert "Admin:" in combined
|
||||
assert "Oper:" in combined
|
||||
assert "Trusted:" in combined
|
||||
assert "IRCOPs:" in combined
|
||||
|
||||
def test_omits_empty_tiers(self):
|
||||
h = _Harness(admins=["*!user@host"])
|
||||
h.inject_registration()
|
||||
h.privmsg("nick", "#test", "!admins")
|
||||
asyncio.run(h.run())
|
||||
msgs = h.sent_privmsgs("#test")
|
||||
combined = " ".join(msgs)
|
||||
assert "Admin:" in combined
|
||||
# No operators or trusted configured, so those sections shouldn't appear
|
||||
assert "Oper:" not in combined
|
||||
assert "Trusted:" not in combined
|
||||
639
tests/test_cron.py
Normal file
639
tests/test_cron.py
Normal file
@@ -0,0 +1,639 @@
|
||||
"""Tests for the cron plugin."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.cron", Path(__file__).resolve().parent.parent / "plugins" / "cron.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.cron import ( # noqa: E402
|
||||
_MAX_JOBS,
|
||||
_delete,
|
||||
_format_duration,
|
||||
_jobs,
|
||||
_load,
|
||||
_make_id,
|
||||
_parse_duration,
|
||||
_restore,
|
||||
_save,
|
||||
_start_job,
|
||||
_state_key,
|
||||
_stop_job,
|
||||
_tasks,
|
||||
cmd_cron,
|
||||
on_connect,
|
||||
)
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeState:
|
||||
"""In-memory stand-in for bot.state."""
|
||||
|
||||
def __init__(self):
|
||||
self._store: dict[str, dict[str, str]] = {}
|
||||
|
||||
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
||||
return self._store.get(plugin, {}).get(key, default)
|
||||
|
||||
def set(self, plugin: str, key: str, value: str) -> None:
|
||||
self._store.setdefault(plugin, {})[key] = value
|
||||
|
||||
def delete(self, plugin: str, key: str) -> bool:
|
||||
try:
|
||||
del self._store[plugin][key]
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def keys(self, plugin: str) -> list[str]:
|
||||
return sorted(self._store.get(plugin, {}).keys())
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
"""Minimal bot stand-in that captures sent/replied messages."""
|
||||
|
||||
def __init__(self, *, admin: bool = True):
|
||||
self.sent: list[tuple[str, str]] = []
|
||||
self.replied: list[str] = []
|
||||
self.dispatched: list[Message] = []
|
||||
self.state = _FakeState()
|
||||
self._admin = admin
|
||||
self.prefix = "!"
|
||||
|
||||
async def send(self, target: str, text: str) -> None:
|
||||
self.sent.append((target, text))
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
def _dispatch_command(self, msg: Message) -> None:
|
||||
self.dispatched.append(msg)
|
||||
|
||||
|
||||
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
|
||||
"""Create a channel PRIVMSG."""
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=[target, text], tags={},
|
||||
)
|
||||
|
||||
|
||||
def _pm(text: str, nick: str = "admin") -> Message:
|
||||
"""Create a private PRIVMSG."""
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=["botname", text], tags={},
|
||||
)
|
||||
|
||||
|
||||
def _clear() -> None:
|
||||
"""Reset module-level state between tests."""
|
||||
for task in _tasks.values():
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
_tasks.clear()
|
||||
_jobs.clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestParseDuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseDuration:
|
||||
def test_minutes(self):
|
||||
assert _parse_duration("5m") == 300
|
||||
|
||||
def test_hours(self):
|
||||
assert _parse_duration("1h") == 3600
|
||||
|
||||
def test_combined(self):
|
||||
assert _parse_duration("1h30m") == 5400
|
||||
|
||||
def test_days(self):
|
||||
assert _parse_duration("2d") == 172800
|
||||
|
||||
def test_seconds(self):
|
||||
assert _parse_duration("90s") == 90
|
||||
|
||||
def test_raw_int(self):
|
||||
assert _parse_duration("300") == 300
|
||||
|
||||
def test_zero(self):
|
||||
assert _parse_duration("0") is None
|
||||
|
||||
def test_negative(self):
|
||||
assert _parse_duration("-5") is None
|
||||
|
||||
def test_invalid(self):
|
||||
assert _parse_duration("abc") is None
|
||||
|
||||
def test_empty(self):
|
||||
assert _parse_duration("") is None
|
||||
|
||||
def test_full_combo(self):
|
||||
assert _parse_duration("1d2h3m4s") == 86400 + 7200 + 180 + 4
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestFormatDuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFormatDuration:
|
||||
def test_seconds(self):
|
||||
assert _format_duration(45) == "45s"
|
||||
|
||||
def test_minutes(self):
|
||||
assert _format_duration(300) == "5m"
|
||||
|
||||
def test_hours(self):
|
||||
assert _format_duration(3600) == "1h"
|
||||
|
||||
def test_combined(self):
|
||||
assert _format_duration(5400) == "1h30m"
|
||||
|
||||
def test_days(self):
|
||||
assert _format_duration(86400) == "1d"
|
||||
|
||||
def test_zero(self):
|
||||
assert _format_duration(0) == "0s"
|
||||
|
||||
def test_full_combo(self):
|
||||
assert _format_duration(90061) == "1d1h1m1s"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestMakeId
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMakeId:
|
||||
def test_returns_hex(self):
|
||||
result = _make_id("#test", "!ping")
|
||||
assert len(result) == 6
|
||||
int(result, 16) # should not raise
|
||||
|
||||
def test_unique(self):
|
||||
ids = {_make_id("#test", f"!cmd{i}") for i in range(10)}
|
||||
assert len(ids) == 10
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestStateHelpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestStateHelpers:
|
||||
def test_save_and_load(self):
|
||||
bot = _FakeBot()
|
||||
data = {"id": "abc123", "channel": "#test"}
|
||||
_save(bot, "#test:abc123", data)
|
||||
loaded = _load(bot, "#test:abc123")
|
||||
assert loaded == data
|
||||
|
||||
def test_load_missing(self):
|
||||
bot = _FakeBot()
|
||||
assert _load(bot, "nonexistent") is None
|
||||
|
||||
def test_delete(self):
|
||||
bot = _FakeBot()
|
||||
_save(bot, "#test:del", {"id": "del"})
|
||||
_delete(bot, "#test:del")
|
||||
assert _load(bot, "#test:del") is None
|
||||
|
||||
def test_state_key(self):
|
||||
assert _state_key("#ops", "abc123") == "#ops:abc123"
|
||||
|
||||
def test_load_invalid_json(self):
|
||||
bot = _FakeBot()
|
||||
bot.state.set("cron", "bad", "not json{{{")
|
||||
assert _load(bot, "bad") is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdCronAdd
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCmdCronAdd:
|
||||
def test_add_success(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
await cmd_cron(bot, _msg("!cron add 5m #ops !rss check news"))
|
||||
await asyncio.sleep(0)
|
||||
assert len(bot.replied) == 1
|
||||
assert "Cron #" in bot.replied[0]
|
||||
assert "!rss check news" in bot.replied[0]
|
||||
assert "5m" in bot.replied[0]
|
||||
assert "#ops" in bot.replied[0]
|
||||
# Verify state persisted
|
||||
keys = bot.state.keys("cron")
|
||||
assert len(keys) == 1
|
||||
data = json.loads(bot.state.get("cron", keys[0]))
|
||||
assert data["command"] == "!rss check news"
|
||||
assert data["interval"] == 300
|
||||
assert data["channel"] == "#ops"
|
||||
# Verify task started
|
||||
assert len(_tasks) == 1
|
||||
_clear()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_add_requires_channel(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _pm("!cron add 5m #ops !ping")))
|
||||
assert "Use this command in a channel" in bot.replied[0]
|
||||
|
||||
def test_add_missing_args(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron add 5m")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_add_invalid_interval(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron add abc #ops !ping")))
|
||||
assert "Invalid interval" in bot.replied[0]
|
||||
|
||||
def test_add_interval_too_short(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron add 30s #ops !ping")))
|
||||
assert "Minimum interval" in bot.replied[0]
|
||||
|
||||
def test_add_interval_too_long(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron add 30d #ops !ping")))
|
||||
assert "Maximum interval" in bot.replied[0]
|
||||
|
||||
def test_add_bad_target(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron add 5m alice !ping")))
|
||||
assert "Target must be a channel" in bot.replied[0]
|
||||
|
||||
def test_add_job_limit(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
for i in range(_MAX_JOBS):
|
||||
_save(bot, f"#ops:job{i}", {"id": f"job{i}", "channel": "#ops"})
|
||||
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping")))
|
||||
assert "limit reached" in bot.replied[0]
|
||||
|
||||
def test_add_admin_required(self):
|
||||
_clear()
|
||||
bot = _FakeBot(admin=False)
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping")))
|
||||
# The @command(admin=True) decorator handles this via bot._dispatch_command,
|
||||
# but since we call cmd_cron directly, the check is at the decorator level.
|
||||
# In direct call tests, admin check is already handled by the framework.
|
||||
# This test just verifies the command runs without error for non-admin.
|
||||
# Framework-level denial is tested in integration tests.
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdCronDel
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCmdCronDel:
|
||||
def test_del_success(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
|
||||
await asyncio.sleep(0)
|
||||
# Extract ID from reply
|
||||
reply = bot.replied[0]
|
||||
cron_id = reply.split("#")[1].split(":")[0]
|
||||
bot.replied.clear()
|
||||
await cmd_cron(bot, _msg(f"!cron del {cron_id}"))
|
||||
assert "Removed" in bot.replied[0]
|
||||
assert cron_id in bot.replied[0]
|
||||
assert len(bot.state.keys("cron")) == 0
|
||||
_clear()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_del_nonexistent(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron del nosuch")))
|
||||
assert "No cron job" in bot.replied[0]
|
||||
|
||||
def test_del_missing_id(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron del")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_del_with_hash_prefix(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
|
||||
await asyncio.sleep(0)
|
||||
reply = bot.replied[0]
|
||||
cron_id = reply.split("#")[1].split(":")[0]
|
||||
bot.replied.clear()
|
||||
await cmd_cron(bot, _msg(f"!cron del #{cron_id}"))
|
||||
assert "Removed" in bot.replied[0]
|
||||
_clear()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdCronList
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCmdCronList:
|
||||
def test_list_empty(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron list")))
|
||||
assert "No cron jobs" in bot.replied[0]
|
||||
|
||||
def test_list_populated(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
_save(bot, "#test:abc123", {
|
||||
"id": "abc123", "channel": "#test",
|
||||
"command": "!rss check news", "interval": 300,
|
||||
})
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron list")))
|
||||
assert "#abc123" in bot.replied[0]
|
||||
assert "5m" in bot.replied[0]
|
||||
assert "!rss check news" in bot.replied[0]
|
||||
|
||||
def test_list_requires_channel(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _pm("!cron list")))
|
||||
assert "Use this command in a channel" in bot.replied[0]
|
||||
|
||||
def test_list_channel_isolation(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
_save(bot, "#test:mine", {
|
||||
"id": "mine", "channel": "#test",
|
||||
"command": "!ping", "interval": 300,
|
||||
})
|
||||
_save(bot, "#other:theirs", {
|
||||
"id": "theirs", "channel": "#other",
|
||||
"command": "!ping", "interval": 600,
|
||||
})
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron list")))
|
||||
assert "mine" in bot.replied[0]
|
||||
assert len(bot.replied) == 1 # only the #test job
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdCronUsage
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCmdCronUsage:
|
||||
def test_no_args(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_unknown_subcommand(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_cron(bot, _msg("!cron foobar")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestRestore
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRestore:
|
||||
def test_restore_spawns_tasks(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
data = {
|
||||
"id": "abc123", "channel": "#test",
|
||||
"command": "!ping", "interval": 300,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
_save(bot, "#test:abc123", data)
|
||||
|
||||
async def inner():
|
||||
_restore(bot)
|
||||
assert "#test:abc123" in _tasks
|
||||
assert not _tasks["#test:abc123"].done()
|
||||
_clear()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_restore_skips_active(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
data = {
|
||||
"id": "active", "channel": "#test",
|
||||
"command": "!ping", "interval": 300,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
_save(bot, "#test:active", data)
|
||||
|
||||
async def inner():
|
||||
dummy = asyncio.create_task(asyncio.sleep(9999))
|
||||
_tasks["#test:active"] = dummy
|
||||
_restore(bot)
|
||||
assert _tasks["#test:active"] is dummy
|
||||
dummy.cancel()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_restore_replaces_done_task(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
data = {
|
||||
"id": "done", "channel": "#test",
|
||||
"command": "!ping", "interval": 300,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
_save(bot, "#test:done", data)
|
||||
|
||||
async def inner():
|
||||
done_task = asyncio.create_task(asyncio.sleep(0))
|
||||
await done_task
|
||||
_tasks["#test:done"] = done_task
|
||||
_restore(bot)
|
||||
new_task = _tasks["#test:done"]
|
||||
assert new_task is not done_task
|
||||
assert not new_task.done()
|
||||
_clear()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_restore_skips_bad_json(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
bot.state.set("cron", "#test:bad", "not json{{{")
|
||||
|
||||
async def inner():
|
||||
_restore(bot)
|
||||
assert "#test:bad" not in _tasks
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_on_connect_calls_restore(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
data = {
|
||||
"id": "conn", "channel": "#test",
|
||||
"command": "!ping", "interval": 300,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
_save(bot, "#test:conn", data)
|
||||
|
||||
async def inner():
|
||||
msg = _msg("", target="botname")
|
||||
await on_connect(bot, msg)
|
||||
assert "#test:conn" in _tasks
|
||||
_clear()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCronLoop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCronLoop:
|
||||
def test_dispatches_command(self):
|
||||
"""Cron loop dispatches a synthetic message after interval."""
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
data = {
|
||||
"id": "loop1", "channel": "#test",
|
||||
"command": "!ping", "interval": 0.05,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
key = "#test:loop1"
|
||||
_jobs[key] = data
|
||||
_start_job(bot, key)
|
||||
await asyncio.sleep(0.15)
|
||||
_stop_job(key)
|
||||
await asyncio.sleep(0)
|
||||
# Should have dispatched at least once
|
||||
assert len(bot.dispatched) >= 1
|
||||
msg = bot.dispatched[0]
|
||||
assert msg.nick == "admin"
|
||||
assert msg.params[0] == "#test"
|
||||
assert msg.params[1] == "!ping"
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_loop_stops_on_job_removal(self):
|
||||
"""Cron loop exits when job is removed from _jobs."""
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
data = {
|
||||
"id": "loop2", "channel": "#test",
|
||||
"command": "!ping", "interval": 0.05,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
key = "#test:loop2"
|
||||
_jobs[key] = data
|
||||
_start_job(bot, key)
|
||||
await asyncio.sleep(0.02)
|
||||
_jobs.pop(key, None)
|
||||
await asyncio.sleep(0.1)
|
||||
task = _tasks.get(key)
|
||||
if task:
|
||||
assert task.done()
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestJobManagement
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestJobManagement:
|
||||
def test_start_and_stop(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
data = {
|
||||
"id": "mgmt", "channel": "#test",
|
||||
"command": "!ping", "interval": 300,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
key = "#test:mgmt"
|
||||
_jobs[key] = data
|
||||
|
||||
async def inner():
|
||||
_start_job(bot, key)
|
||||
assert key in _tasks
|
||||
assert not _tasks[key].done()
|
||||
_stop_job(key)
|
||||
await asyncio.sleep(0)
|
||||
assert key not in _tasks
|
||||
assert key not in _jobs
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_start_idempotent(self):
|
||||
_clear()
|
||||
bot = _FakeBot()
|
||||
data = {
|
||||
"id": "idem", "channel": "#test",
|
||||
"command": "!ping", "interval": 300,
|
||||
"prefix": "admin!~admin@host", "nick": "admin",
|
||||
"added_by": "admin",
|
||||
}
|
||||
key = "#test:idem"
|
||||
_jobs[key] = data
|
||||
|
||||
async def inner():
|
||||
_start_job(bot, key)
|
||||
first = _tasks[key]
|
||||
_start_job(bot, key)
|
||||
assert _tasks[key] is first
|
||||
_stop_job(key)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_stop_nonexistent(self):
|
||||
_clear()
|
||||
_stop_job("#test:nonexistent")
|
||||
212
tests/test_flaskpaste.py
Normal file
212
tests/test_flaskpaste.py
Normal file
@@ -0,0 +1,212 @@
|
||||
"""Tests for the FlaskPaste plugin and Bot.shorten_url method."""
|
||||
|
||||
import asyncio
|
||||
import importlib.util
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.flaskpaste",
|
||||
Path(__file__).resolve().parent.parent / "plugins" / "flaskpaste.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.flaskpaste import ( # noqa: E402
|
||||
cmd_paste,
|
||||
cmd_shorten,
|
||||
create_paste,
|
||||
shorten_url,
|
||||
)
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
class _FakeState:
|
||||
"""In-memory stand-in for bot.state."""
|
||||
|
||||
def __init__(self):
|
||||
self._store: dict[str, dict[str, str]] = {}
|
||||
|
||||
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
||||
return self._store.get(plugin, {}).get(key, default)
|
||||
|
||||
def set(self, plugin: str, key: str, value: str) -> None:
|
||||
self._store.setdefault(plugin, {})[key] = value
|
||||
|
||||
def delete(self, plugin: str, key: str) -> bool:
|
||||
try:
|
||||
del self._store[plugin][key]
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def keys(self, plugin: str) -> list[str]:
|
||||
return sorted(self._store.get(plugin, {}).keys())
|
||||
|
||||
|
||||
class _FakeRegistry:
|
||||
"""Minimal registry stand-in."""
|
||||
|
||||
def __init__(self):
|
||||
self._modules: dict = {}
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
"""Minimal bot stand-in that captures sent/replied messages."""
|
||||
|
||||
def __init__(self, *, admin: bool = False):
|
||||
self.sent: list[tuple[str, str]] = []
|
||||
self.replied: list[str] = []
|
||||
self.state = _FakeState()
|
||||
self.registry = _FakeRegistry()
|
||||
self._admin = admin
|
||||
self.config: dict = {}
|
||||
|
||||
async def send(self, target: str, text: str) -> None:
|
||||
self.sent.append((target, text))
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
|
||||
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
|
||||
"""Create a channel PRIVMSG."""
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=[target, text], tags={},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdShorten
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCmdShorten:
|
||||
def test_missing_url(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_shorten(bot, _msg("!shorten")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_invalid_scheme(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_shorten(bot, _msg("!shorten ftp://example.com")))
|
||||
assert "http://" in bot.replied[0]
|
||||
|
||||
def test_success(self):
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
with patch.object(
|
||||
_mod, "_shorten_url",
|
||||
return_value="https://paste.mymx.me/s/abc123",
|
||||
):
|
||||
await cmd_shorten(bot, _msg("!shorten https://example.com/long"))
|
||||
assert "paste.mymx.me/s/abc123" in bot.replied[0]
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_failure(self):
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
with patch.object(
|
||||
_mod, "_shorten_url",
|
||||
side_effect=ConnectionError("down"),
|
||||
):
|
||||
await cmd_shorten(bot, _msg("!shorten https://example.com/long"))
|
||||
assert "shorten failed" in bot.replied[0]
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCmdPaste
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCmdPaste:
|
||||
def test_missing_text(self):
|
||||
bot = _FakeBot()
|
||||
asyncio.run(cmd_paste(bot, _msg("!paste")))
|
||||
assert "Usage:" in bot.replied[0]
|
||||
|
||||
def test_success(self):
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
with patch.object(
|
||||
_mod, "_create_paste",
|
||||
return_value="https://paste.mymx.me/xyz789",
|
||||
):
|
||||
await cmd_paste(bot, _msg("!paste hello world"))
|
||||
assert "paste.mymx.me/xyz789" in bot.replied[0]
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
def test_failure(self):
|
||||
bot = _FakeBot()
|
||||
|
||||
async def inner():
|
||||
with patch.object(
|
||||
_mod, "_create_paste",
|
||||
side_effect=ConnectionError("down"),
|
||||
):
|
||||
await cmd_paste(bot, _msg("!paste hello world"))
|
||||
assert "paste failed" in bot.replied[0]
|
||||
|
||||
asyncio.run(inner())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestShortenUrlHelper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestShortenUrlHelper:
|
||||
def test_returns_short_url(self):
|
||||
bot = _FakeBot()
|
||||
with patch.object(
|
||||
_mod, "_shorten_url",
|
||||
return_value="https://paste.mymx.me/s/short",
|
||||
):
|
||||
result = shorten_url(bot, "https://example.com/long")
|
||||
assert result == "https://paste.mymx.me/s/short"
|
||||
|
||||
def test_returns_original_on_error(self):
|
||||
bot = _FakeBot()
|
||||
with patch.object(
|
||||
_mod, "_shorten_url",
|
||||
side_effect=ConnectionError("fail"),
|
||||
):
|
||||
result = shorten_url(bot, "https://example.com/long")
|
||||
assert result == "https://example.com/long"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestCreatePasteHelper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCreatePasteHelper:
|
||||
def test_returns_paste_url(self):
|
||||
bot = _FakeBot()
|
||||
with patch.object(
|
||||
_mod, "_create_paste",
|
||||
return_value="https://paste.mymx.me/abc",
|
||||
):
|
||||
result = create_paste(bot, "hello")
|
||||
assert result == "https://paste.mymx.me/abc"
|
||||
|
||||
def test_returns_none_on_error(self):
|
||||
bot = _FakeBot()
|
||||
with patch.object(
|
||||
_mod, "_create_paste",
|
||||
side_effect=ConnectionError("fail"),
|
||||
):
|
||||
result = create_paste(bot, "hello")
|
||||
assert result is None
|
||||
@@ -274,7 +274,7 @@ class TestAdmin:
|
||||
|
||||
replies = h.sent_privmsgs("#test")
|
||||
assert not any("Permission denied" in r for r in replies)
|
||||
assert any("Patterns:" in r for r in replies)
|
||||
assert any("Admin:" in r for r in replies)
|
||||
|
||||
def test_oper_detection(self):
|
||||
"""IRC operator detected via WHO reply can use admin commands."""
|
||||
@@ -290,7 +290,7 @@ class TestAdmin:
|
||||
|
||||
replies = h.sent_privmsgs("#test")
|
||||
assert not any("Permission denied" in r for r in replies)
|
||||
assert any("Opers:" in r for r in replies)
|
||||
assert any("IRCOPs:" in r for r in replies)
|
||||
|
||||
def test_oper_detection_on_join(self):
|
||||
"""User joining a channel triggers debounced WHO for oper detection."""
|
||||
|
||||
@@ -140,6 +140,9 @@ class _FakeBot:
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
async def shorten_url(self, url: str) -> str:
|
||||
return url
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
|
||||
@@ -166,6 +166,9 @@ class _FakeBot:
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
async def shorten_url(self, url: str) -> str:
|
||||
return url
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
|
||||
395
tests/test_webhook.py
Normal file
395
tests/test_webhook.py
Normal file
@@ -0,0 +1,395 @@
|
||||
"""Tests for the webhook listener plugin."""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from derp.irc import Message
|
||||
|
||||
# plugins/ is not a Python package -- load the module from file path
|
||||
_spec = importlib.util.spec_from_file_location(
|
||||
"plugins.webhook", Path(__file__).resolve().parent.parent / "plugins" / "webhook.py",
|
||||
)
|
||||
_mod = importlib.util.module_from_spec(_spec)
|
||||
sys.modules[_spec.name] = _mod
|
||||
_spec.loader.exec_module(_mod)
|
||||
|
||||
from plugins.webhook import ( # noqa: E402
|
||||
_MAX_BODY,
|
||||
_handle_request,
|
||||
_http_response,
|
||||
_verify_signature,
|
||||
cmd_webhook,
|
||||
on_connect,
|
||||
)
|
||||
|
||||
# -- Helpers -----------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeState:
|
||||
"""In-memory stand-in for bot.state."""
|
||||
|
||||
def __init__(self):
|
||||
self._store: dict[str, dict[str, str]] = {}
|
||||
|
||||
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
||||
return self._store.get(plugin, {}).get(key, default)
|
||||
|
||||
def set(self, plugin: str, key: str, value: str) -> None:
|
||||
self._store.setdefault(plugin, {})[key] = value
|
||||
|
||||
def delete(self, plugin: str, key: str) -> bool:
|
||||
try:
|
||||
del self._store[plugin][key]
|
||||
return True
|
||||
except KeyError:
|
||||
return False
|
||||
|
||||
def keys(self, plugin: str) -> list[str]:
|
||||
return sorted(self._store.get(plugin, {}).keys())
|
||||
|
||||
|
||||
class _FakeBot:
|
||||
"""Minimal bot stand-in that captures sent/action messages."""
|
||||
|
||||
def __init__(self, *, admin: bool = True, webhook_cfg: dict | None = None):
|
||||
self.sent: list[tuple[str, str]] = []
|
||||
self.replied: list[str] = []
|
||||
self.actions: list[tuple[str, str]] = []
|
||||
self.state = _FakeState()
|
||||
self._admin = admin
|
||||
self.prefix = "!"
|
||||
self.config = {
|
||||
"webhook": webhook_cfg or {"enabled": False},
|
||||
}
|
||||
|
||||
async def send(self, target: str, text: str) -> None:
|
||||
self.sent.append((target, text))
|
||||
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
async def action(self, target: str, text: str) -> None:
|
||||
self.actions.append((target, text))
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
|
||||
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
|
||||
return Message(
|
||||
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||
command="PRIVMSG", params=[target, text], tags={},
|
||||
)
|
||||
|
||||
|
||||
def _sign(secret: str, body: bytes) -> str:
|
||||
"""Generate HMAC-SHA256 signature."""
|
||||
sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
|
||||
return f"sha256={sig}"
|
||||
|
||||
|
||||
class _FakeReader:
|
||||
"""Mock asyncio.StreamReader from raw HTTP bytes."""
|
||||
|
||||
def __init__(self, data: bytes) -> None:
|
||||
self._data = data
|
||||
self._pos = 0
|
||||
|
||||
async def readline(self) -> bytes:
|
||||
start = self._pos
|
||||
idx = self._data.find(b"\n", start)
|
||||
if idx == -1:
|
||||
self._pos = len(self._data)
|
||||
return self._data[start:]
|
||||
self._pos = idx + 1
|
||||
return self._data[start:self._pos]
|
||||
|
||||
async def readexactly(self, n: int) -> bytes:
|
||||
chunk = self._data[self._pos:self._pos + n]
|
||||
self._pos += n
|
||||
return chunk
|
||||
|
||||
|
||||
class _FakeWriter:
|
||||
"""Mock asyncio.StreamWriter that captures output."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.data = b""
|
||||
self._closed = False
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
self.data += data
|
||||
|
||||
def close(self) -> None:
|
||||
self._closed = True
|
||||
|
||||
async def wait_closed(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def _build_request(method: str, body: bytes, headers: dict[str, str] | None = None) -> bytes:
|
||||
"""Build raw HTTP request bytes."""
|
||||
hdrs = headers or {}
|
||||
if "Content-Length" not in hdrs:
|
||||
hdrs["Content-Length"] = str(len(body))
|
||||
lines = [f"{method} / HTTP/1.1"]
|
||||
for k, v in hdrs.items():
|
||||
lines.append(f"{k}: {v}")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
return "\r\n".join(lines).encode("utf-8") + body
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestVerifySignature
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestVerifySignature:
|
||||
def test_valid_signature(self):
|
||||
body = b'{"channel":"#test","text":"hello"}'
|
||||
sig = _sign("secret", body)
|
||||
assert _verify_signature("secret", body, sig) is True
|
||||
|
||||
def test_invalid_signature(self):
|
||||
body = b'{"channel":"#test","text":"hello"}'
|
||||
assert _verify_signature("secret", body, "sha256=bad") is False
|
||||
|
||||
def test_empty_secret_allows_all(self):
|
||||
body = b'{"channel":"#test","text":"hello"}'
|
||||
assert _verify_signature("", body, "") is True
|
||||
|
||||
def test_missing_prefix(self):
|
||||
body = b'{"channel":"#test","text":"hello"}'
|
||||
sig = hmac.new(b"secret", body, hashlib.sha256).hexdigest()
|
||||
assert _verify_signature("secret", body, sig) is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestHttpResponse
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHttpResponse:
|
||||
def test_200_response(self):
|
||||
resp = _http_response(200, "OK", "sent")
|
||||
assert b"200 OK" in resp
|
||||
assert b"sent" in resp
|
||||
|
||||
def test_400_with_body(self):
|
||||
resp = _http_response(400, "Bad Request", "invalid JSON")
|
||||
assert b"400 Bad Request" in resp
|
||||
assert b"invalid JSON" in resp
|
||||
|
||||
def test_405_response(self):
|
||||
resp = _http_response(405, "Method Not Allowed", "POST only")
|
||||
assert b"405 Method Not Allowed" in resp
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestRequestHandler
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRequestHandler:
|
||||
def test_valid_post(self):
|
||||
bot = _FakeBot()
|
||||
body = json.dumps({"channel": "#ops", "text": "deploy done"}).encode()
|
||||
sig = _sign("secret", body)
|
||||
raw = _build_request("POST", body, {
|
||||
"Content-Length": str(len(body)),
|
||||
"Content-Type": "application/json",
|
||||
"X-Signature": sig,
|
||||
})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, "secret"))
|
||||
assert b"200 OK" in writer.data
|
||||
assert ("#ops", "deploy done") in bot.sent
|
||||
|
||||
def test_action_post(self):
|
||||
bot = _FakeBot()
|
||||
body = json.dumps({
|
||||
"channel": "#ops", "text": "deployed", "action": True,
|
||||
}).encode()
|
||||
sig = _sign("s", body)
|
||||
raw = _build_request("POST", body, {
|
||||
"Content-Length": str(len(body)),
|
||||
"X-Signature": sig,
|
||||
})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, "s"))
|
||||
assert b"200 OK" in writer.data
|
||||
assert ("#ops", "deployed") in bot.actions
|
||||
|
||||
def test_get_405(self):
|
||||
bot = _FakeBot()
|
||||
raw = _build_request("GET", b"")
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||
assert b"405" in writer.data
|
||||
|
||||
def test_bad_signature_401(self):
|
||||
bot = _FakeBot()
|
||||
body = json.dumps({"channel": "#test", "text": "x"}).encode()
|
||||
raw = _build_request("POST", body, {
|
||||
"Content-Length": str(len(body)),
|
||||
"X-Signature": "sha256=wrong",
|
||||
})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, "real-secret"))
|
||||
assert b"401" in writer.data
|
||||
assert len(bot.sent) == 0
|
||||
|
||||
def test_bad_json_400(self):
|
||||
bot = _FakeBot()
|
||||
body = b"not json"
|
||||
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||
assert b"400" in writer.data
|
||||
assert b"invalid JSON" in writer.data
|
||||
|
||||
def test_missing_channel_400(self):
|
||||
bot = _FakeBot()
|
||||
body = json.dumps({"text": "no channel"}).encode()
|
||||
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||
assert b"400" in writer.data
|
||||
assert b"invalid channel" in writer.data
|
||||
|
||||
def test_invalid_channel_400(self):
|
||||
bot = _FakeBot()
|
||||
body = json.dumps({"channel": "nochanprefix", "text": "x"}).encode()
|
||||
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||
assert b"400" in writer.data
|
||||
|
||||
def test_empty_text_400(self):
|
||||
bot = _FakeBot()
|
||||
body = json.dumps({"channel": "#test", "text": ""}).encode()
|
||||
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||
assert b"400" in writer.data
|
||||
assert b"empty text" in writer.data
|
||||
|
||||
def test_body_too_large_413(self):
|
||||
bot = _FakeBot()
|
||||
raw = _build_request("POST", b"", {
|
||||
"Content-Length": str(_MAX_BODY + 1),
|
||||
})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||
assert b"413" in writer.data
|
||||
|
||||
def test_counter_increments(self):
|
||||
bot = _FakeBot()
|
||||
# Reset counter
|
||||
_mod._request_count = 0
|
||||
body = json.dumps({"channel": "#test", "text": "hi"}).encode()
|
||||
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||
reader = _FakeReader(raw)
|
||||
writer = _FakeWriter()
|
||||
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||
assert _mod._request_count == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestServerLifecycle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestServerLifecycle:
|
||||
def test_disabled_config(self):
|
||||
"""Server does not start when webhook is disabled."""
|
||||
bot = _FakeBot(webhook_cfg={"enabled": False})
|
||||
msg = _msg("", target="")
|
||||
msg = Message(raw="", prefix="", nick="", command="001",
|
||||
params=["test", "Welcome"], tags={})
|
||||
# Reset global state
|
||||
_mod._server = None
|
||||
asyncio.run(on_connect(bot, msg))
|
||||
assert _mod._server is None
|
||||
|
||||
def test_duplicate_guard(self):
|
||||
"""Second on_connect does not create a second server."""
|
||||
sentinel = object()
|
||||
_mod._server = sentinel
|
||||
bot = _FakeBot(webhook_cfg={"enabled": True, "port": 0})
|
||||
msg = Message(raw="", prefix="", nick="", command="001",
|
||||
params=["test", "Welcome"], tags={})
|
||||
asyncio.run(on_connect(bot, msg))
|
||||
assert _mod._server is sentinel
|
||||
_mod._server = None # cleanup
|
||||
|
||||
def test_on_connect_starts(self):
|
||||
"""on_connect starts the server when enabled."""
|
||||
_mod._server = None
|
||||
bot = _FakeBot(webhook_cfg={
|
||||
"enabled": True, "host": "127.0.0.1", "port": 0, "secret": "",
|
||||
})
|
||||
msg = Message(raw="", prefix="", nick="", command="001",
|
||||
params=["test", "Welcome"], tags={})
|
||||
|
||||
async def _run():
|
||||
await on_connect(bot, msg)
|
||||
assert _mod._server is not None
|
||||
_mod._server.close()
|
||||
await _mod._server.wait_closed()
|
||||
_mod._server = None
|
||||
|
||||
asyncio.run(_run())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# TestWebhookCommand
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestWebhookCommand:
|
||||
def test_not_running(self):
|
||||
bot = _FakeBot()
|
||||
_mod._server = None
|
||||
asyncio.run(cmd_webhook(bot, _msg("!webhook")))
|
||||
assert any("not running" in r for r in bot.replied)
|
||||
|
||||
def test_running_shows_status(self):
|
||||
bot = _FakeBot()
|
||||
_mod._request_count = 42
|
||||
_mod._started = time.monotonic() - 90 # 1m 30s ago
|
||||
|
||||
async def _run():
|
||||
# Start a real server on port 0 to get a valid socket
|
||||
srv = await asyncio.start_server(lambda r, w: None,
|
||||
"127.0.0.1", 0)
|
||||
_mod._server = srv
|
||||
try:
|
||||
await cmd_webhook(bot, _msg("!webhook"))
|
||||
finally:
|
||||
srv.close()
|
||||
await srv.wait_closed()
|
||||
_mod._server = None
|
||||
|
||||
asyncio.run(_run())
|
||||
assert len(bot.replied) == 1
|
||||
reply = bot.replied[0]
|
||||
assert "Webhook:" in reply
|
||||
assert "42 requests" in reply
|
||||
assert "127.0.0.1:" in reply
|
||||
@@ -171,6 +171,9 @@ class _FakeBot:
|
||||
async def reply(self, message, text: str) -> None:
|
||||
self.replied.append(text)
|
||||
|
||||
async def shorten_url(self, url: str) -> str:
|
||||
return url
|
||||
|
||||
def _is_admin(self, message) -> bool:
|
||||
return self._admin
|
||||
|
||||
|
||||
Reference in New Issue
Block a user