Compare commits
7 Commits
9abf8dce64
...
e9528bd879
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9528bd879 | ||
|
|
c483beb555 | ||
|
|
2514aa777d | ||
|
|
5bc59730c4 | ||
|
|
6ef3fee72c | ||
|
|
7b14efb30f | ||
|
|
aebe1589d2 |
20
.gitea/workflows/ci.yml
Normal file
20
.gitea/workflows/ci.yml
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
name: CI
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [master]
|
||||||
|
pull_request:
|
||||||
|
branches: [master]
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
python-version: ["3.11", "3.12", "3.13"]
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- uses: actions/setup-python@v5
|
||||||
|
with:
|
||||||
|
python-version: ${{ matrix.python-version }}
|
||||||
|
- run: pip install -e . && pip install pytest ruff
|
||||||
|
- run: ruff check src/ tests/ plugins/
|
||||||
|
- run: pytest -v
|
||||||
10
ROADMAP.md
10
ROADMAP.md
@@ -113,9 +113,9 @@
|
|||||||
- [ ] Multi-server support (per-server config, shared plugins)
|
- [ ] Multi-server support (per-server config, shared plugins)
|
||||||
- [ ] Stable plugin API (versioned, breaking change policy)
|
- [ ] Stable plugin API (versioned, breaking change policy)
|
||||||
- [x] Paste overflow (auto-paste long output to FlaskPaste, return link)
|
- [x] Paste overflow (auto-paste long output to FlaskPaste, return link)
|
||||||
- [ ] URL shortener integration (shorten URLs in alerts and long output)
|
- [x] URL shortener integration (shorten URLs in subscription announcements)
|
||||||
- [ ] Webhook listener (HTTP endpoint for push events to channels)
|
- [x] Webhook listener (HTTP endpoint for push events to channels)
|
||||||
- [ ] Granular ACLs (per-command permission tiers: trusted, operator, admin)
|
- [x] Granular ACLs (per-command permission tiers: trusted, operator, admin)
|
||||||
- [x] `paste` command (manual paste to FlaskPaste)
|
- [x] `paste` command (manual paste to FlaskPaste)
|
||||||
- [x] `shorten` command (manual URL shortening)
|
- [x] `shorten` command (manual URL shortening)
|
||||||
- [x] `emailcheck` plugin (SMTP VRFY/RCPT TO)
|
- [x] `emailcheck` plugin (SMTP VRFY/RCPT TO)
|
||||||
@@ -125,6 +125,6 @@
|
|||||||
- [x] `jwt` plugin (decode tokens, show claims/expiry, flag weaknesses)
|
- [x] `jwt` plugin (decode tokens, show claims/expiry, flag weaknesses)
|
||||||
- [x] `mac` plugin (OUI vendor lookup, local IEEE database)
|
- [x] `mac` plugin (OUI vendor lookup, local IEEE database)
|
||||||
- [x] `pastemoni` plugin (monitor paste sites for keywords)
|
- [x] `pastemoni` plugin (monitor paste sites for keywords)
|
||||||
- [ ] `cron` plugin (scheduled bot commands on a timer)
|
- [x] `cron` plugin (scheduled bot commands on a timer)
|
||||||
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
|
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
|
||||||
- [ ] CI pipeline
|
- [x] CI pipeline (Gitea Actions, Python 3.11-3.13, ruff + pytest)
|
||||||
|
|||||||
26
TASKS.md
26
TASKS.md
@@ -1,6 +1,30 @@
|
|||||||
# derp - Tasks
|
# derp - Tasks
|
||||||
|
|
||||||
## Current Sprint -- v2.0.0 Quick Wins (2026-02-21)
|
## Current Sprint -- v2.0.0 ACL + Webhook (2026-02-21)
|
||||||
|
|
||||||
|
| Pri | Status | Task |
|
||||||
|
|-----|--------|------|
|
||||||
|
| P0 | [x] | Granular ACL tiers in `src/derp/plugin.py` (TIERS, Handler.tier, decorator) |
|
||||||
|
| P0 | [x] | ACL dispatch in `src/derp/bot.py` (_get_tier, _operators, _trusted) |
|
||||||
|
| P0 | [x] | Config defaults: operators, trusted, webhook section |
|
||||||
|
| P0 | [x] | `plugins/core.py` -- whoami/admins tier display |
|
||||||
|
| P0 | [x] | `plugins/webhook.py` -- HTTP webhook listener (HMAC, JSON, POST) |
|
||||||
|
| P1 | [x] | Tests: `test_acl.py` (32 cases), `test_webhook.py` (22 cases) |
|
||||||
|
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, ROADMAP.md, TODO.md) |
|
||||||
|
|
||||||
|
## Previous Sprint -- v2.0.0 Tier 2 (2026-02-21)
|
||||||
|
|
||||||
|
| Pri | Status | Task |
|
||||||
|
|-----|--------|------|
|
||||||
|
| P0 | [x] | `Bot.shorten_url()` method in `src/derp/bot.py` |
|
||||||
|
| P0 | [x] | URL shortening in rss.py, youtube.py, pastemoni.py announcements |
|
||||||
|
| P0 | [x] | `plugins/cron.py` -- scheduled command execution (add/del/list) |
|
||||||
|
| P0 | [x] | `.gitea/workflows/ci.yml` -- Gitea Actions CI pipeline |
|
||||||
|
| P1 | [x] | Tests: `test_flaskpaste.py` (9 cases), `test_cron.py` (~38 cases) |
|
||||||
|
| P1 | [x] | FakeBot `shorten_url` in test_rss, test_youtube, test_pastemoni |
|
||||||
|
| P2 | [x] | Documentation update (USAGE.md, CHEATSHEET.md, ROADMAP.md, TODO.md) |
|
||||||
|
|
||||||
|
## Previous Sprint -- v2.0.0 Quick Wins (2026-02-21)
|
||||||
|
|
||||||
| Pri | Status | Task |
|
| Pri | Status | Task |
|
||||||
|-----|--------|------|
|
|-----|--------|------|
|
||||||
|
|||||||
12
TODO.md
12
TODO.md
@@ -4,10 +4,10 @@
|
|||||||
|
|
||||||
- [ ] Multi-server support (per-server config, shared plugins)
|
- [ ] Multi-server support (per-server config, shared plugins)
|
||||||
- [ ] Stable plugin API (versioned, breaking change policy)
|
- [ ] Stable plugin API (versioned, breaking change policy)
|
||||||
- [ ] Paste overflow (auto-paste long output to FlaskPaste)
|
- [x] Paste overflow (auto-paste long output to FlaskPaste)
|
||||||
- [ ] URL shortener integration (shorten URLs in alerts/output)
|
- [x] URL shortener integration (shorten URLs in subscription announcements)
|
||||||
- [ ] Webhook listener (HTTP endpoint for push events to channels)
|
- [x] Webhook listener (HTTP endpoint for push events to channels)
|
||||||
- [ ] Granular ACLs (per-command: trusted, operator, admin)
|
- [x] Granular ACLs (per-command: trusted, operator, admin)
|
||||||
|
|
||||||
## LLM Bridge
|
## LLM Bridge
|
||||||
|
|
||||||
@@ -80,9 +80,9 @@ is preserved in git history for reference.
|
|||||||
|
|
||||||
- [x] `paste` -- manual paste to FlaskPaste
|
- [x] `paste` -- manual paste to FlaskPaste
|
||||||
- [x] `shorten` -- manual URL shortening
|
- [x] `shorten` -- manual URL shortening
|
||||||
- [ ] `cron` -- scheduled bot commands on a timer
|
- [x] `cron` -- scheduled bot commands on a timer
|
||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
|
- [x] Plugin command unit tests (encode, hash, dns, cidr, defang)
|
||||||
- [ ] CI pipeline
|
- [x] CI pipeline (Gitea Actions)
|
||||||
|
|||||||
@@ -75,21 +75,27 @@ code changes -- restart the container or use `!reload` for plugins.
|
|||||||
!h # Shorthand (any unambiguous prefix works)
|
!h # Shorthand (any unambiguous prefix works)
|
||||||
```
|
```
|
||||||
|
|
||||||
## Admin
|
## Permission Tiers
|
||||||
|
|
||||||
```
|
```
|
||||||
!whoami # Show your hostmask + admin status
|
user < trusted < oper < admin
|
||||||
!admins # Show admin patterns + detected opers (admin)
|
|
||||||
```
|
```
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
# config/derp.toml
|
# config/derp.toml
|
||||||
[bot]
|
[bot]
|
||||||
admins = ["*!~user@trusted.host", "ops!*@*.ops.net"]
|
admins = ["*!~root@*.ops.net"] # admin tier
|
||||||
|
operators = ["*!~staff@trusted.host"] # oper tier
|
||||||
|
trusted = ["*!~user@known.host"] # trusted tier
|
||||||
```
|
```
|
||||||
|
|
||||||
IRC operators are auto-detected via WHO on connect and on user JOIN
|
```
|
||||||
(debounced 2s to handle netsplit floods). Hostmask patterns use fnmatch.
|
!whoami # Show your hostmask + permission tier
|
||||||
|
!admins # Show configured tiers + detected opers (admin)
|
||||||
|
```
|
||||||
|
|
||||||
|
IRC operators are auto-detected via WHO (admin tier). Hostmask patterns
|
||||||
|
use fnmatch. `admin=True` on commands still works (maps to tier="admin").
|
||||||
|
|
||||||
## Channel Management (admin)
|
## Channel Management (admin)
|
||||||
|
|
||||||
@@ -437,6 +443,45 @@ History in `data/alert_history.db`.
|
|||||||
|
|
||||||
Shows top 3 results as `Title -- URL`. Channel only. Max query length: 200 chars.
|
Shows top 3 results as `Title -- URL`. Channel only. Max query length: 200 chars.
|
||||||
|
|
||||||
|
## Cron (admin)
|
||||||
|
|
||||||
|
```
|
||||||
|
!cron add 1h #ops !rss check news # Schedule command every hour
|
||||||
|
!cron add 2d #alerts !tor update # Every 2 days
|
||||||
|
!cron del abc123 # Remove job by ID
|
||||||
|
!cron list # List jobs in channel
|
||||||
|
```
|
||||||
|
|
||||||
|
Intervals: `5m`, `1h30m`, `2d`, `90s`, or raw seconds. Min 1m, max 7d.
|
||||||
|
Max 20 jobs/channel. Persists across restarts. Channel only.
|
||||||
|
|
||||||
|
## Webhook (admin)
|
||||||
|
|
||||||
|
```toml
|
||||||
|
# config/derp.toml
|
||||||
|
[webhook]
|
||||||
|
enabled = true
|
||||||
|
host = "127.0.0.1"
|
||||||
|
port = 8080
|
||||||
|
secret = "your-shared-secret"
|
||||||
|
```
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Send message to IRC channel via webhook
|
||||||
|
SECRET="your-shared-secret"
|
||||||
|
BODY='{"channel":"#ops","text":"Deploy done"}'
|
||||||
|
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $2}')
|
||||||
|
curl -X POST http://127.0.0.1:8080/ \
|
||||||
|
-H "X-Signature: sha256=$SIG" -d "$BODY"
|
||||||
|
```
|
||||||
|
|
||||||
|
```
|
||||||
|
!webhook # Show listener status (admin)
|
||||||
|
```
|
||||||
|
|
||||||
|
POST JSON: `{"channel":"#chan","text":"msg"}`. Optional `"action":true`.
|
||||||
|
Auth: HMAC-SHA256 via `X-Signature` header. Starts on IRC connect.
|
||||||
|
|
||||||
## Plugin Template
|
## Plugin Template
|
||||||
|
|
||||||
```python
|
```python
|
||||||
|
|||||||
151
docs/USAGE.md
151
docs/USAGE.md
@@ -53,10 +53,18 @@ rate_burst = 5 # Burst capacity (default: 5)
|
|||||||
paste_threshold = 4 # Max lines before overflow to FlaskPaste (default: 4)
|
paste_threshold = 4 # Max lines before overflow to FlaskPaste (default: 4)
|
||||||
admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected
|
admins = [] # Hostmask patterns (fnmatch), IRCOPs auto-detected
|
||||||
timezone = "UTC" # Timezone for calendar reminders (IANA tz name)
|
timezone = "UTC" # Timezone for calendar reminders (IANA tz name)
|
||||||
|
operators = [] # Hostmask patterns for "oper" tier (fnmatch)
|
||||||
|
trusted = [] # Hostmask patterns for "trusted" tier (fnmatch)
|
||||||
|
|
||||||
[logging]
|
[logging]
|
||||||
level = "info" # Logging level: debug, info, warning, error
|
level = "info" # Logging level: debug, info, warning, error
|
||||||
format = "text" # Log format: "text" (default) or "json"
|
format = "text" # Log format: "text" (default) or "json"
|
||||||
|
|
||||||
|
[webhook]
|
||||||
|
enabled = false # Enable HTTP webhook listener
|
||||||
|
host = "127.0.0.1" # Bind address
|
||||||
|
port = 8080 # Bind port
|
||||||
|
secret = "" # HMAC-SHA256 shared secret (empty = no auth)
|
||||||
```
|
```
|
||||||
|
|
||||||
## Built-in Commands
|
## Built-in Commands
|
||||||
@@ -141,6 +149,8 @@ format = "text" # Log format: "text" (default) or "json"
|
|||||||
| `!shorten <url>` | Shorten a URL via FlaskPaste |
|
| `!shorten <url>` | Shorten a URL via FlaskPaste |
|
||||||
| `!paste <text>` | Create a paste via FlaskPaste |
|
| `!paste <text>` | Create a paste via FlaskPaste |
|
||||||
| `!pastemoni <add\|del\|list\|check>` | Paste site keyword monitoring |
|
| `!pastemoni <add\|del\|list\|check>` | Paste site keyword monitoring |
|
||||||
|
| `!cron <add\|del\|list>` | Scheduled command execution (admin) |
|
||||||
|
| `!webhook` | Show webhook listener status (admin) |
|
||||||
|
|
||||||
### Command Shorthand
|
### Command Shorthand
|
||||||
|
|
||||||
@@ -220,24 +230,31 @@ Each line contains:
|
|||||||
|
|
||||||
Default format is `"text"` (human-readable, same as before).
|
Default format is `"text"` (human-readable, same as before).
|
||||||
|
|
||||||
## Admin System
|
## Permission Tiers (ACL)
|
||||||
|
|
||||||
Commands marked as `admin` require elevated permissions. Admin access is
|
The bot uses a 4-tier permission model. Each command has a required tier;
|
||||||
granted via:
|
users must meet or exceed it.
|
||||||
|
|
||||||
1. **IRC operator status** -- detected automatically via `WHO`
|
```
|
||||||
2. **Hostmask patterns** -- configured in `[bot] admins`, fnmatch-style
|
user < trusted < oper < admin
|
||||||
|
```
|
||||||
|
|
||||||
|
| Tier | Granted by |
|
||||||
|
|------|------------|
|
||||||
|
| `user` | Everyone (default) |
|
||||||
|
| `trusted` | `[bot] trusted` hostmask patterns |
|
||||||
|
| `oper` | `[bot] operators` hostmask patterns |
|
||||||
|
| `admin` | `[bot] admins` hostmask patterns or IRC operator status |
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
[bot]
|
[bot]
|
||||||
admins = [
|
admins = ["*!~root@*.ops.net"]
|
||||||
"*!~user@trusted.host",
|
operators = ["*!~staff@trusted.host"]
|
||||||
"ops!*@*.ops.net",
|
trusted = ["*!~user@known.host"]
|
||||||
]
|
|
||||||
```
|
```
|
||||||
|
|
||||||
Empty by default -- only IRC operators get admin access unless patterns
|
All lists are empty by default -- only IRC operators get admin access
|
||||||
are configured.
|
unless patterns are configured. Patterns use fnmatch-style matching.
|
||||||
|
|
||||||
### Oper Detection
|
### Oper Detection
|
||||||
|
|
||||||
@@ -255,18 +272,24 @@ set automatically.
|
|||||||
|
|
||||||
| Command | Description |
|
| Command | Description |
|
||||||
|---------|-------------|
|
|---------|-------------|
|
||||||
| `!whoami` | Show your hostmask and admin status |
|
| `!whoami` | Show your hostmask and permission tier |
|
||||||
| `!admins` | Show configured patterns and detected opers (admin) |
|
| `!admins` | Show configured tiers and detected opers (admin) |
|
||||||
|
|
||||||
Admin-restricted commands: `!load`, `!reload`, `!unload`, `!admins`, `!state`,
|
Admin-restricted commands: `!load`, `!reload`, `!unload`, `!admins`, `!state`,
|
||||||
`!kick`, `!ban`, `!unban`, `!topic`, `!mode`.
|
`!kick`, `!ban`, `!unban`, `!topic`, `!mode`, `!webhook`.
|
||||||
|
|
||||||
### Writing Admin Commands
|
### Writing Tiered Commands
|
||||||
|
|
||||||
```python
|
```python
|
||||||
|
# admin=True still works (maps to tier="admin")
|
||||||
@command("dangerous", help="Admin-only action", admin=True)
|
@command("dangerous", help="Admin-only action", admin=True)
|
||||||
async def cmd_dangerous(bot, message):
|
async def cmd_dangerous(bot, message):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
# Explicit tier for finer control
|
||||||
|
@command("moderate", help="Trusted-only action", tier="trusted")
|
||||||
|
async def cmd_moderate(bot, message):
|
||||||
|
...
|
||||||
```
|
```
|
||||||
|
|
||||||
## IRCv3 Capability Negotiation
|
## IRCv3 Capability Negotiation
|
||||||
@@ -404,6 +427,68 @@ restarting the bot.
|
|||||||
The `core` plugin cannot be unloaded (prevents losing `!load`/`!reload`),
|
The `core` plugin cannot be unloaded (prevents losing `!load`/`!reload`),
|
||||||
but it can be reloaded.
|
but it can be reloaded.
|
||||||
|
|
||||||
|
## Webhook Listener
|
||||||
|
|
||||||
|
Receive HTTP POST requests from external services (CI, monitoring, GitHub,
|
||||||
|
etc.) and relay messages to IRC channels.
|
||||||
|
|
||||||
|
### Configuration
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[webhook]
|
||||||
|
enabled = true
|
||||||
|
host = "127.0.0.1"
|
||||||
|
port = 8080
|
||||||
|
secret = "your-shared-secret"
|
||||||
|
```
|
||||||
|
|
||||||
|
### HTTP API
|
||||||
|
|
||||||
|
Single endpoint: `POST /`
|
||||||
|
|
||||||
|
**Request body** (JSON):
|
||||||
|
|
||||||
|
```json
|
||||||
|
{"channel": "#ops", "text": "Deploy v2.3.1 complete"}
|
||||||
|
```
|
||||||
|
|
||||||
|
Optional `"action": true` sends as a `/me` action.
|
||||||
|
|
||||||
|
**Authentication**: HMAC-SHA256 via `X-Signature: sha256=<hex>` header.
|
||||||
|
If `secret` is empty, no authentication is required.
|
||||||
|
|
||||||
|
**Response codes**:
|
||||||
|
|
||||||
|
| Status | When |
|
||||||
|
|--------|------|
|
||||||
|
| 200 OK | Message sent |
|
||||||
|
| 400 Bad Request | Invalid JSON, missing/invalid channel, empty text |
|
||||||
|
| 401 Unauthorized | Bad or missing HMAC signature |
|
||||||
|
| 405 Method Not Allowed | Non-POST request |
|
||||||
|
| 413 Payload Too Large | Body > 64 KB |
|
||||||
|
|
||||||
|
### Usage Example
|
||||||
|
|
||||||
|
```bash
|
||||||
|
SECRET="your-shared-secret"
|
||||||
|
BODY='{"channel":"#ops","text":"Deploy v2.3.1 complete"}'
|
||||||
|
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $2}')
|
||||||
|
curl -X POST http://127.0.0.1:8080/ \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Signature: sha256=$SIG" \
|
||||||
|
-d "$BODY"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Status Command
|
||||||
|
|
||||||
|
```
|
||||||
|
!webhook Show listener address, request count, uptime (admin)
|
||||||
|
```
|
||||||
|
|
||||||
|
The server starts on IRC connect (event 001) and runs for the lifetime of
|
||||||
|
the bot. If the server is already running (e.g. after reconnect), it is
|
||||||
|
not restarted.
|
||||||
|
|
||||||
## Writing Plugins
|
## Writing Plugins
|
||||||
|
|
||||||
Create a `.py` file in the `plugins/` directory:
|
Create a `.py` file in the `plugins/` directory:
|
||||||
@@ -1077,6 +1162,42 @@ badhost.invalid -> NXDOMAIN
|
|||||||
- Concurrent via `asyncio.gather()`
|
- Concurrent via `asyncio.gather()`
|
||||||
- Valid types: A, NS, CNAME, SOA, PTR, MX, TXT, AAAA
|
- Valid types: A, NS, CNAME, SOA, PTR, MX, TXT, AAAA
|
||||||
|
|
||||||
|
### `!cron` -- Scheduled Command Execution
|
||||||
|
|
||||||
|
Schedule bot commands to repeat on a timer. Admins only.
|
||||||
|
|
||||||
|
```
|
||||||
|
!cron add <interval> <#channel> <command...> Schedule a command
|
||||||
|
!cron del <id> Remove a job
|
||||||
|
!cron list List jobs in channel
|
||||||
|
```
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
|
||||||
|
```
|
||||||
|
!cron add 1h #ops !rss check news Poll RSS feed every hour
|
||||||
|
!cron add 2d #alerts !tor update Update Tor list every 2 days
|
||||||
|
!cron del abc123 Remove job by ID
|
||||||
|
!cron list Show jobs in current channel
|
||||||
|
```
|
||||||
|
|
||||||
|
Output format:
|
||||||
|
|
||||||
|
```
|
||||||
|
Cron #a1b2c3: !rss check news every 1h in #ops
|
||||||
|
#a1b2c3 every 1h: !rss check news
|
||||||
|
```
|
||||||
|
|
||||||
|
- `add` and `del` require admin privileges
|
||||||
|
- `add` and `list` must be used in a channel (not PM)
|
||||||
|
- Interval formats: `5m`, `1h30m`, `2d`, `90s`, or raw seconds
|
||||||
|
- Minimum interval: 1 minute
|
||||||
|
- Maximum interval: 7 days
|
||||||
|
- Maximum 20 jobs per channel
|
||||||
|
- Jobs persist across bot restarts via `bot.state`
|
||||||
|
- Dispatched commands run with the original creator's identity
|
||||||
|
- The scheduled command goes through normal command routing and permissions
|
||||||
|
|
||||||
### FlaskPaste Configuration
|
### FlaskPaste Configuration
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
|
|||||||
@@ -139,34 +139,33 @@ async def cmd_plugins(bot, message):
|
|||||||
await bot.reply(message, f"Plugins: {', '.join(parts)}")
|
await bot.reply(message, f"Plugins: {', '.join(parts)}")
|
||||||
|
|
||||||
|
|
||||||
@command("whoami", help="Show your hostmask and admin status")
|
@command("whoami", help="Show your hostmask and permission tier")
|
||||||
async def cmd_whoami(bot, message):
|
async def cmd_whoami(bot, message):
|
||||||
"""Display the sender's hostmask and permission level."""
|
"""Display the sender's hostmask and permission level."""
|
||||||
prefix = message.prefix or "unknown"
|
prefix = message.prefix or "unknown"
|
||||||
is_admin = bot._is_admin(message)
|
tier = bot._get_tier(message)
|
||||||
is_oper = message.prefix in bot._opers if message.prefix else False
|
tags = [tier]
|
||||||
tags = []
|
if message.prefix and message.prefix in bot._opers:
|
||||||
if is_admin:
|
|
||||||
tags.append("admin")
|
|
||||||
else:
|
|
||||||
tags.append("user")
|
|
||||||
if is_oper:
|
|
||||||
tags.append("IRCOP")
|
tags.append("IRCOP")
|
||||||
await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
|
await bot.reply(message, f"{prefix} [{', '.join(tags)}]")
|
||||||
|
|
||||||
|
|
||||||
@command("admins", help="Show configured admin patterns and detected opers", admin=True)
|
@command("admins", help="Show configured permission tiers and detected opers", admin=True)
|
||||||
async def cmd_admins(bot, message):
|
async def cmd_admins(bot, message):
|
||||||
"""Display admin hostmask patterns and known IRC operators."""
|
"""Display configured permission tiers and known IRC operators."""
|
||||||
parts = []
|
parts = []
|
||||||
if bot._admins:
|
if bot._admins:
|
||||||
parts.append(f"Patterns: {', '.join(bot._admins)}")
|
parts.append(f"Admin: {', '.join(bot._admins)}")
|
||||||
else:
|
else:
|
||||||
parts.append("Patterns: (none)")
|
parts.append("Admin: (none)")
|
||||||
|
if bot._operators:
|
||||||
|
parts.append(f"Oper: {', '.join(bot._operators)}")
|
||||||
|
if bot._trusted:
|
||||||
|
parts.append(f"Trusted: {', '.join(bot._trusted)}")
|
||||||
if bot._opers:
|
if bot._opers:
|
||||||
parts.append(f"Opers: {', '.join(sorted(bot._opers))}")
|
parts.append(f"IRCOPs: {', '.join(sorted(bot._opers))}")
|
||||||
else:
|
else:
|
||||||
parts.append("Opers: (none)")
|
parts.append("IRCOPs: (none)")
|
||||||
await bot.reply(message, " | ".join(parts))
|
await bot.reply(message, " | ".join(parts))
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
288
plugins/cron.py
Normal file
288
plugins/cron.py
Normal file
@@ -0,0 +1,288 @@
|
|||||||
|
"""Plugin: scheduled command execution on a timer."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
|
from derp.irc import Message
|
||||||
|
from derp.plugin import command, event
|
||||||
|
|
||||||
|
# -- Constants ---------------------------------------------------------------
|
||||||
|
|
||||||
|
_DURATION_RE = re.compile(r"(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$")
|
||||||
|
_MIN_INTERVAL = 60
|
||||||
|
_MAX_INTERVAL = 604800 # 7 days
|
||||||
|
_MAX_JOBS = 20
|
||||||
|
|
||||||
|
# -- Module-level tracking ---------------------------------------------------
|
||||||
|
|
||||||
|
_jobs: dict[str, dict] = {}
|
||||||
|
_tasks: dict[str, asyncio.Task] = {}
|
||||||
|
|
||||||
|
|
||||||
|
# -- Pure helpers ------------------------------------------------------------
|
||||||
|
|
||||||
|
def _make_id(channel: str, cmd: str) -> str:
|
||||||
|
"""Generate a short hex ID from channel + command + timestamp."""
|
||||||
|
raw = f"{channel}:{cmd}:{time.monotonic()}".encode()
|
||||||
|
return hashlib.sha256(raw).hexdigest()[:6]
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_duration(spec: str) -> int | None:
|
||||||
|
"""Parse a duration like '5m', '1h30m', '2d', '90s', or raw seconds."""
|
||||||
|
try:
|
||||||
|
secs = int(spec)
|
||||||
|
return secs if secs > 0 else None
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
m = _DURATION_RE.match(spec.lower())
|
||||||
|
if not m or not any(m.groups()):
|
||||||
|
return None
|
||||||
|
days = int(m.group(1) or 0)
|
||||||
|
hours = int(m.group(2) or 0)
|
||||||
|
mins = int(m.group(3) or 0)
|
||||||
|
secs = int(m.group(4) or 0)
|
||||||
|
total = days * 86400 + hours * 3600 + mins * 60 + secs
|
||||||
|
return total if total > 0 else None
|
||||||
|
|
||||||
|
|
||||||
|
def _format_duration(secs: int) -> str:
|
||||||
|
"""Format seconds into compact duration."""
|
||||||
|
parts = []
|
||||||
|
if secs >= 86400:
|
||||||
|
parts.append(f"{secs // 86400}d")
|
||||||
|
secs %= 86400
|
||||||
|
if secs >= 3600:
|
||||||
|
parts.append(f"{secs // 3600}h")
|
||||||
|
secs %= 3600
|
||||||
|
if secs >= 60:
|
||||||
|
parts.append(f"{secs // 60}m")
|
||||||
|
secs %= 60
|
||||||
|
if secs or not parts:
|
||||||
|
parts.append(f"{secs}s")
|
||||||
|
return "".join(parts)
|
||||||
|
|
||||||
|
|
||||||
|
# -- State helpers -----------------------------------------------------------
|
||||||
|
|
||||||
|
def _state_key(channel: str, cron_id: str) -> str:
|
||||||
|
"""Build composite state key."""
|
||||||
|
return f"{channel}:{cron_id}"
|
||||||
|
|
||||||
|
|
||||||
|
def _save(bot, key: str, data: dict) -> None:
|
||||||
|
"""Persist cron job to bot.state."""
|
||||||
|
bot.state.set("cron", key, json.dumps(data))
|
||||||
|
|
||||||
|
|
||||||
|
def _load(bot, key: str) -> dict | None:
|
||||||
|
"""Load cron job from bot.state."""
|
||||||
|
raw = bot.state.get("cron", key)
|
||||||
|
if raw is None:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return json.loads(raw)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _delete(bot, key: str) -> None:
|
||||||
|
"""Remove cron job from bot.state."""
|
||||||
|
bot.state.delete("cron", key)
|
||||||
|
|
||||||
|
|
||||||
|
# -- Cron loop ---------------------------------------------------------------
|
||||||
|
|
||||||
|
async def _cron_loop(bot, key: str) -> None:
|
||||||
|
"""Repeating loop: sleep, then dispatch the stored command."""
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = _jobs.get(key)
|
||||||
|
if not data:
|
||||||
|
return
|
||||||
|
await asyncio.sleep(data["interval"])
|
||||||
|
# Synthesize a message for command dispatch
|
||||||
|
msg = Message(
|
||||||
|
raw="", prefix=data["prefix"],
|
||||||
|
nick=data["nick"], command="PRIVMSG",
|
||||||
|
params=[data["channel"], data["command"]], tags={},
|
||||||
|
)
|
||||||
|
bot._dispatch_command(msg)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _start_job(bot, key: str) -> None:
|
||||||
|
"""Create and track a cron task."""
|
||||||
|
existing = _tasks.get(key)
|
||||||
|
if existing and not existing.done():
|
||||||
|
return
|
||||||
|
task = asyncio.create_task(_cron_loop(bot, key))
|
||||||
|
_tasks[key] = task
|
||||||
|
|
||||||
|
|
||||||
|
def _stop_job(key: str) -> None:
|
||||||
|
"""Cancel and remove a cron task."""
|
||||||
|
task = _tasks.pop(key, None)
|
||||||
|
if task and not task.done():
|
||||||
|
task.cancel()
|
||||||
|
_jobs.pop(key, None)
|
||||||
|
|
||||||
|
|
||||||
|
# -- Restore on connect -----------------------------------------------------
|
||||||
|
|
||||||
|
def _restore(bot) -> None:
|
||||||
|
"""Rebuild cron tasks from persisted state."""
|
||||||
|
for key in bot.state.keys("cron"):
|
||||||
|
existing = _tasks.get(key)
|
||||||
|
if existing and not existing.done():
|
||||||
|
continue
|
||||||
|
data = _load(bot, key)
|
||||||
|
if data is None:
|
||||||
|
continue
|
||||||
|
_jobs[key] = data
|
||||||
|
_start_job(bot, key)
|
||||||
|
|
||||||
|
|
||||||
|
@event("001")
|
||||||
|
async def on_connect(bot, message):
|
||||||
|
"""Restore cron jobs on connect."""
|
||||||
|
_restore(bot)
|
||||||
|
|
||||||
|
|
||||||
|
# -- Command handler ---------------------------------------------------------
|
||||||
|
|
||||||
|
@command("cron", help="Cron: !cron add|del|list", admin=True)
|
||||||
|
async def cmd_cron(bot, message):
|
||||||
|
"""Scheduled command execution on a timer.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
!cron add <interval> <#channel> <command...> Schedule a command
|
||||||
|
!cron del <id> Remove a job
|
||||||
|
!cron list List jobs
|
||||||
|
"""
|
||||||
|
parts = message.text.split(None, 4)
|
||||||
|
if len(parts) < 2:
|
||||||
|
await bot.reply(message, "Usage: !cron <add|del|list> [args]")
|
||||||
|
return
|
||||||
|
|
||||||
|
sub = parts[1].lower()
|
||||||
|
|
||||||
|
# -- list ----------------------------------------------------------------
|
||||||
|
if sub == "list":
|
||||||
|
if not message.is_channel:
|
||||||
|
await bot.reply(message, "Use this command in a channel")
|
||||||
|
return
|
||||||
|
channel = message.target
|
||||||
|
prefix = f"{channel}:"
|
||||||
|
entries = []
|
||||||
|
for key in bot.state.keys("cron"):
|
||||||
|
if key.startswith(prefix):
|
||||||
|
data = _load(bot, key)
|
||||||
|
if data:
|
||||||
|
cron_id = data["id"]
|
||||||
|
interval = _format_duration(data["interval"])
|
||||||
|
cmd = data["command"]
|
||||||
|
entries.append(f"#{cron_id} every {interval}: {cmd}")
|
||||||
|
if not entries:
|
||||||
|
await bot.reply(message, "No cron jobs in this channel")
|
||||||
|
return
|
||||||
|
for entry in entries:
|
||||||
|
await bot.reply(message, entry)
|
||||||
|
return
|
||||||
|
|
||||||
|
# -- del -----------------------------------------------------------------
|
||||||
|
if sub == "del":
|
||||||
|
if len(parts) < 3:
|
||||||
|
await bot.reply(message, "Usage: !cron del <id>")
|
||||||
|
return
|
||||||
|
cron_id = parts[2].lstrip("#")
|
||||||
|
# Find matching key across all channels
|
||||||
|
found_key = None
|
||||||
|
for key in bot.state.keys("cron"):
|
||||||
|
data = _load(bot, key)
|
||||||
|
if data and data["id"] == cron_id:
|
||||||
|
found_key = key
|
||||||
|
break
|
||||||
|
if not found_key:
|
||||||
|
await bot.reply(message, f"No cron job #{cron_id}")
|
||||||
|
return
|
||||||
|
_stop_job(found_key)
|
||||||
|
_delete(bot, found_key)
|
||||||
|
await bot.reply(message, f"Removed cron #{cron_id}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# -- add -----------------------------------------------------------------
|
||||||
|
if sub == "add":
|
||||||
|
if not message.is_channel:
|
||||||
|
await bot.reply(message, "Use this command in a channel")
|
||||||
|
return
|
||||||
|
if len(parts) < 5:
|
||||||
|
await bot.reply(
|
||||||
|
message,
|
||||||
|
"Usage: !cron add <interval> <#channel> <command...>",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
interval_spec = parts[2]
|
||||||
|
target_channel = parts[3]
|
||||||
|
cmd_text = parts[4]
|
||||||
|
|
||||||
|
if not target_channel.startswith(("#", "&")):
|
||||||
|
await bot.reply(message, "Target must be a channel (e.g. #ops)")
|
||||||
|
return
|
||||||
|
|
||||||
|
interval = _parse_duration(interval_spec)
|
||||||
|
if interval is None:
|
||||||
|
await bot.reply(message, "Invalid interval (use: 5m, 1h, 2d)")
|
||||||
|
return
|
||||||
|
if interval < _MIN_INTERVAL:
|
||||||
|
await bot.reply(
|
||||||
|
message,
|
||||||
|
f"Minimum interval is {_format_duration(_MIN_INTERVAL)}",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
if interval > _MAX_INTERVAL:
|
||||||
|
await bot.reply(
|
||||||
|
message,
|
||||||
|
f"Maximum interval is {_format_duration(_MAX_INTERVAL)}",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check per-channel limit
|
||||||
|
ch_prefix = f"{target_channel}:"
|
||||||
|
count = sum(
|
||||||
|
1 for k in bot.state.keys("cron") if k.startswith(ch_prefix)
|
||||||
|
)
|
||||||
|
if count >= _MAX_JOBS:
|
||||||
|
await bot.reply(message, f"Job limit reached ({_MAX_JOBS})")
|
||||||
|
return
|
||||||
|
|
||||||
|
cron_id = _make_id(target_channel, cmd_text)
|
||||||
|
key = _state_key(target_channel, cron_id)
|
||||||
|
|
||||||
|
data = {
|
||||||
|
"id": cron_id,
|
||||||
|
"channel": target_channel,
|
||||||
|
"command": cmd_text,
|
||||||
|
"interval": interval,
|
||||||
|
"prefix": message.prefix,
|
||||||
|
"nick": message.nick,
|
||||||
|
"added_by": message.nick,
|
||||||
|
}
|
||||||
|
_save(bot, key, data)
|
||||||
|
_jobs[key] = data
|
||||||
|
_start_job(bot, key)
|
||||||
|
|
||||||
|
fmt_interval = _format_duration(interval)
|
||||||
|
await bot.reply(
|
||||||
|
message,
|
||||||
|
f"Cron #{cron_id}: {cmd_text} every {fmt_interval} in {target_channel}",
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
await bot.reply(message, "Usage: !cron <add|del|list> [args]")
|
||||||
@@ -275,6 +275,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
|||||||
title = item.get("title") or "(untitled)"
|
title = item.get("title") or "(untitled)"
|
||||||
snippet = item.get("snippet", "")
|
snippet = item.get("snippet", "")
|
||||||
url = item.get("url", "")
|
url = item.get("url", "")
|
||||||
|
if url:
|
||||||
|
url = await bot.shorten_url(url)
|
||||||
parts = [f"[{tag}] {title}"]
|
parts = [f"[{tag}] {title}"]
|
||||||
if snippet:
|
if snippet:
|
||||||
parts.append(snippet)
|
parts.append(snippet)
|
||||||
|
|||||||
@@ -272,6 +272,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
|||||||
for item in shown:
|
for item in shown:
|
||||||
title = _truncate(item["title"]) if item["title"] else "(no title)"
|
title = _truncate(item["title"]) if item["title"] else "(no title)"
|
||||||
link = item["link"]
|
link = item["link"]
|
||||||
|
if link:
|
||||||
|
link = await bot.shorten_url(link)
|
||||||
date = item.get("date", "")
|
date = item.get("date", "")
|
||||||
line = f"[{name}] {title}"
|
line = f"[{name}] {title}"
|
||||||
if date:
|
if date:
|
||||||
|
|||||||
196
plugins/webhook.py
Normal file
196
plugins/webhook.py
Normal file
@@ -0,0 +1,196 @@
|
|||||||
|
"""Webhook listener: receive HTTP POST requests and relay messages to IRC."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
from derp.plugin import command, event
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_MAX_BODY = 65536 # 64 KB
|
||||||
|
_server: asyncio.Server | None = None
|
||||||
|
_request_count: int = 0
|
||||||
|
_started: float = 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def _verify_signature(secret: str, body: bytes, signature: str) -> bool:
|
||||||
|
"""Verify HMAC-SHA256 signature from X-Signature header."""
|
||||||
|
if not secret:
|
||||||
|
return True # no secret configured = open access
|
||||||
|
if not signature.startswith("sha256="):
|
||||||
|
return False
|
||||||
|
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
|
||||||
|
return hmac.compare_digest(expected, signature[7:])
|
||||||
|
|
||||||
|
|
||||||
|
def _http_response(status: int, reason: str, body: str = "") -> bytes:
|
||||||
|
"""Build a minimal HTTP/1.1 response."""
|
||||||
|
body_bytes = body.encode("utf-8") if body else b""
|
||||||
|
lines = [
|
||||||
|
f"HTTP/1.1 {status} {reason}",
|
||||||
|
"Content-Type: text/plain; charset=utf-8",
|
||||||
|
f"Content-Length: {len(body_bytes)}",
|
||||||
|
"Connection: close",
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
]
|
||||||
|
return "\r\n".join(lines).encode("utf-8") + body_bytes
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_request(reader: asyncio.StreamReader,
|
||||||
|
writer: asyncio.StreamWriter,
|
||||||
|
bot, secret: str) -> None:
|
||||||
|
"""Parse one HTTP request and dispatch to IRC."""
|
||||||
|
global _request_count
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Read request line
|
||||||
|
request_line = await asyncio.wait_for(reader.readline(), timeout=10.0)
|
||||||
|
if not request_line:
|
||||||
|
return
|
||||||
|
parts = request_line.decode("utf-8", errors="replace").strip().split()
|
||||||
|
if len(parts) < 2:
|
||||||
|
writer.write(_http_response(400, "Bad Request", "malformed request"))
|
||||||
|
return
|
||||||
|
method, _path = parts[0], parts[1]
|
||||||
|
|
||||||
|
# Read headers
|
||||||
|
headers: dict[str, str] = {}
|
||||||
|
while True:
|
||||||
|
line = await asyncio.wait_for(reader.readline(), timeout=10.0)
|
||||||
|
if not line or line == b"\r\n" or line == b"\n":
|
||||||
|
break
|
||||||
|
decoded = line.decode("utf-8", errors="replace").strip()
|
||||||
|
if ":" in decoded:
|
||||||
|
key, val = decoded.split(":", 1)
|
||||||
|
headers[key.strip().lower()] = val.strip()
|
||||||
|
|
||||||
|
# Method check
|
||||||
|
if method != "POST":
|
||||||
|
writer.write(_http_response(405, "Method Not Allowed", "POST only"))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Read body
|
||||||
|
content_length = int(headers.get("content-length", "0"))
|
||||||
|
if content_length > _MAX_BODY:
|
||||||
|
writer.write(_http_response(413, "Payload Too Large",
|
||||||
|
f"max {_MAX_BODY} bytes"))
|
||||||
|
return
|
||||||
|
body = await asyncio.wait_for(reader.readexactly(content_length),
|
||||||
|
timeout=10.0)
|
||||||
|
|
||||||
|
# Verify HMAC signature
|
||||||
|
signature = headers.get("x-signature", "")
|
||||||
|
if not _verify_signature(secret, body, signature):
|
||||||
|
writer.write(_http_response(401, "Unauthorized", "bad signature"))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Parse JSON
|
||||||
|
try:
|
||||||
|
data = json.loads(body)
|
||||||
|
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||||
|
writer.write(_http_response(400, "Bad Request", "invalid JSON"))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Validate fields
|
||||||
|
channel = data.get("channel", "")
|
||||||
|
text = data.get("text", "")
|
||||||
|
is_action = data.get("action", False)
|
||||||
|
|
||||||
|
if not isinstance(channel, str) or not channel.startswith(("#", "&")):
|
||||||
|
writer.write(_http_response(400, "Bad Request", "invalid channel"))
|
||||||
|
return
|
||||||
|
if not isinstance(text, str) or not text.strip():
|
||||||
|
writer.write(_http_response(400, "Bad Request", "empty text"))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Send to IRC
|
||||||
|
text = text.strip()
|
||||||
|
if is_action:
|
||||||
|
await bot.action(channel, text)
|
||||||
|
else:
|
||||||
|
await bot.send(channel, text)
|
||||||
|
|
||||||
|
_request_count += 1
|
||||||
|
writer.write(_http_response(200, "OK", "sent"))
|
||||||
|
log.info("webhook: relayed to %s (%d bytes)", channel, len(text))
|
||||||
|
|
||||||
|
except (asyncio.TimeoutError, asyncio.IncompleteReadError, ConnectionError):
|
||||||
|
log.debug("webhook: client disconnected")
|
||||||
|
except Exception:
|
||||||
|
log.exception("webhook: error handling request")
|
||||||
|
try:
|
||||||
|
writer.write(_http_response(500, "Internal Server Error"))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@event("001")
|
||||||
|
async def on_connect(bot, message):
|
||||||
|
"""Start the webhook HTTP server on connect (if enabled)."""
|
||||||
|
global _server, _started, _request_count
|
||||||
|
|
||||||
|
if _server is not None:
|
||||||
|
return # already running
|
||||||
|
|
||||||
|
cfg = bot.config.get("webhook", {})
|
||||||
|
if not cfg.get("enabled"):
|
||||||
|
return
|
||||||
|
|
||||||
|
host = cfg.get("host", "127.0.0.1")
|
||||||
|
port = cfg.get("port", 8080)
|
||||||
|
secret = cfg.get("secret", "")
|
||||||
|
|
||||||
|
async def handler(reader, writer):
|
||||||
|
await _handle_request(reader, writer, bot, secret)
|
||||||
|
|
||||||
|
try:
|
||||||
|
_server = await asyncio.start_server(handler, host, port)
|
||||||
|
_started = time.monotonic()
|
||||||
|
_request_count = 0
|
||||||
|
log.info("webhook: listening on %s:%d", host, port)
|
||||||
|
except OSError as exc:
|
||||||
|
log.error("webhook: failed to bind %s:%d: %s", host, port, exc)
|
||||||
|
|
||||||
|
|
||||||
|
@command("webhook", help="Show webhook listener status", admin=True)
|
||||||
|
async def cmd_webhook(bot, message):
|
||||||
|
"""Display webhook server status."""
|
||||||
|
if _server is None:
|
||||||
|
await bot.reply(message, "Webhook: not running")
|
||||||
|
return
|
||||||
|
|
||||||
|
socks = _server.sockets
|
||||||
|
if socks:
|
||||||
|
addr = socks[0].getsockname()
|
||||||
|
address = f"{addr[0]}:{addr[1]}"
|
||||||
|
else:
|
||||||
|
address = "unknown"
|
||||||
|
|
||||||
|
elapsed = int(time.monotonic() - _started)
|
||||||
|
hours, rem = divmod(elapsed, 3600)
|
||||||
|
minutes, secs = divmod(rem, 60)
|
||||||
|
parts = []
|
||||||
|
if hours:
|
||||||
|
parts.append(f"{hours}h")
|
||||||
|
if minutes:
|
||||||
|
parts.append(f"{minutes}m")
|
||||||
|
parts.append(f"{secs}s")
|
||||||
|
uptime = " ".join(parts)
|
||||||
|
|
||||||
|
await bot.reply(
|
||||||
|
message,
|
||||||
|
f"Webhook: {address} | {_request_count} requests | up {uptime}",
|
||||||
|
)
|
||||||
@@ -394,6 +394,8 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
|||||||
for item in shown:
|
for item in shown:
|
||||||
title = _truncate(item["title"]) if item["title"] else "(no title)"
|
title = _truncate(item["title"]) if item["title"] else "(no title)"
|
||||||
link = item["link"]
|
link = item["link"]
|
||||||
|
if link:
|
||||||
|
link = await bot.shorten_url(link)
|
||||||
# Build metadata suffix
|
# Build metadata suffix
|
||||||
parts = []
|
parts = []
|
||||||
dur = durations.get(item["id"], 0)
|
dur = durations.get(item["id"], 0)
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
from derp import __version__
|
from derp import __version__
|
||||||
from derp.irc import _MAX_IRC_LINE, IRCConnection, Message, format_msg, parse
|
from derp.irc import _MAX_IRC_LINE, IRCConnection, Message, format_msg, parse
|
||||||
from derp.plugin import Handler, PluginRegistry
|
from derp.plugin import TIERS, Handler, PluginRegistry
|
||||||
from derp.state import StateStore
|
from derp.state import StateStore
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
@@ -93,6 +93,8 @@ class Bot:
|
|||||||
self._tasks: set[asyncio.Task] = set()
|
self._tasks: set[asyncio.Task] = set()
|
||||||
self._reconnect_delay: float = 5.0
|
self._reconnect_delay: float = 5.0
|
||||||
self._admins: list[str] = config.get("bot", {}).get("admins", [])
|
self._admins: list[str] = config.get("bot", {}).get("admins", [])
|
||||||
|
self._operators: list[str] = config.get("bot", {}).get("operators", [])
|
||||||
|
self._trusted: list[str] = config.get("bot", {}).get("trusted", [])
|
||||||
self._opers: set[str] = set() # hostmasks of known IRC operators
|
self._opers: set[str] = set() # hostmasks of known IRC operators
|
||||||
self._caps: set[str] = set() # negotiated IRCv3 caps
|
self._caps: set[str] = set() # negotiated IRCv3 caps
|
||||||
self._who_pending: dict[str, asyncio.Task] = {} # debounced WHO per channel
|
self._who_pending: dict[str, asyncio.Task] = {} # debounced WHO per channel
|
||||||
@@ -359,20 +361,33 @@ class Bot:
|
|||||||
return True
|
return True
|
||||||
return plugin_name in allowed
|
return plugin_name in allowed
|
||||||
|
|
||||||
|
def _get_tier(self, msg: Message) -> str:
|
||||||
|
"""Determine the permission tier of the message sender.
|
||||||
|
|
||||||
|
Checks (in priority order): IRC operator, admin pattern,
|
||||||
|
operator pattern, trusted pattern. Falls back to ``"user"``.
|
||||||
|
"""
|
||||||
|
if not msg.prefix:
|
||||||
|
return "user"
|
||||||
|
if msg.prefix in self._opers:
|
||||||
|
return "admin"
|
||||||
|
for pattern in self._admins:
|
||||||
|
if fnmatch.fnmatch(msg.prefix, pattern):
|
||||||
|
return "admin"
|
||||||
|
for pattern in self._operators:
|
||||||
|
if fnmatch.fnmatch(msg.prefix, pattern):
|
||||||
|
return "oper"
|
||||||
|
for pattern in self._trusted:
|
||||||
|
if fnmatch.fnmatch(msg.prefix, pattern):
|
||||||
|
return "trusted"
|
||||||
|
return "user"
|
||||||
|
|
||||||
def _is_admin(self, msg: Message) -> bool:
|
def _is_admin(self, msg: Message) -> bool:
|
||||||
"""Check if the message sender is a bot admin.
|
"""Check if the message sender is a bot admin.
|
||||||
|
|
||||||
Returns True if the sender is a known IRC operator or matches
|
Thin wrapper around ``_get_tier`` for backward compatibility.
|
||||||
a configured hostmask pattern (fnmatch-style).
|
|
||||||
"""
|
"""
|
||||||
if not msg.prefix:
|
return self._get_tier(msg) == "admin"
|
||||||
return False
|
|
||||||
if msg.prefix in self._opers:
|
|
||||||
return True
|
|
||||||
for pattern in self._admins:
|
|
||||||
if fnmatch.fnmatch(msg.prefix, pattern):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def _dispatch_command(self, msg: Message) -> None:
|
def _dispatch_command(self, msg: Message) -> None:
|
||||||
"""Check if a PRIVMSG is a bot command and spawn it."""
|
"""Check if a PRIVMSG is a bot command and spawn it."""
|
||||||
@@ -396,10 +411,13 @@ class Bot:
|
|||||||
if not self._plugin_allowed(handler.plugin, channel):
|
if not self._plugin_allowed(handler.plugin, channel):
|
||||||
return
|
return
|
||||||
|
|
||||||
if handler.admin and not self._is_admin(msg):
|
required = handler.tier
|
||||||
deny = f"Permission denied: {self.prefix}{cmd_name} requires admin"
|
if required != "user":
|
||||||
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
|
sender = self._get_tier(msg)
|
||||||
return
|
if TIERS.index(sender) < TIERS.index(required):
|
||||||
|
deny = f"Permission denied: {self.prefix}{cmd_name} requires {required}"
|
||||||
|
self._spawn(self.reply(msg, deny), name=f"cmd:{cmd_name}:denied")
|
||||||
|
return
|
||||||
|
|
||||||
self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}")
|
self._spawn(self._run_command(handler, cmd_name, msg), name=f"cmd:{cmd_name}")
|
||||||
|
|
||||||
@@ -510,6 +528,17 @@ class Bot:
|
|||||||
"""Send a CTCP ACTION (/me) to a target."""
|
"""Send a CTCP ACTION (/me) to a target."""
|
||||||
await self.send(target, f"\x01ACTION {text}\x01")
|
await self.send(target, f"\x01ACTION {text}\x01")
|
||||||
|
|
||||||
|
async def shorten_url(self, url: str) -> str:
|
||||||
|
"""Shorten a URL via FlaskPaste. Returns original on failure."""
|
||||||
|
fp = self.registry._modules.get("flaskpaste")
|
||||||
|
if not fp:
|
||||||
|
return url
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
try:
|
||||||
|
return await loop.run_in_executor(None, fp.shorten_url, self, url)
|
||||||
|
except Exception:
|
||||||
|
return url
|
||||||
|
|
||||||
async def join(self, channel: str) -> None:
|
async def join(self, channel: str) -> None:
|
||||||
"""Join an IRC channel."""
|
"""Join an IRC channel."""
|
||||||
await self.conn.send(format_msg("JOIN", channel))
|
await self.conn.send(format_msg("JOIN", channel))
|
||||||
|
|||||||
@@ -29,8 +29,16 @@ DEFAULTS: dict = {
|
|||||||
"rate_burst": 5,
|
"rate_burst": 5,
|
||||||
"paste_threshold": 4,
|
"paste_threshold": 4,
|
||||||
"admins": [],
|
"admins": [],
|
||||||
|
"operators": [],
|
||||||
|
"trusted": [],
|
||||||
},
|
},
|
||||||
"channels": {},
|
"channels": {},
|
||||||
|
"webhook": {
|
||||||
|
"enabled": False,
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
"port": 8080,
|
||||||
|
"secret": "",
|
||||||
|
},
|
||||||
"logging": {
|
"logging": {
|
||||||
"level": "info",
|
"level": "info",
|
||||||
"format": "text",
|
"format": "text",
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ from typing import Any, Callable
|
|||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
TIERS: tuple[str, ...] = ("user", "trusted", "oper", "admin")
|
||||||
|
|
||||||
|
|
||||||
@dataclass(slots=True)
|
@dataclass(slots=True)
|
||||||
class Handler:
|
class Handler:
|
||||||
@@ -22,9 +24,10 @@ class Handler:
|
|||||||
help: str = ""
|
help: str = ""
|
||||||
plugin: str = ""
|
plugin: str = ""
|
||||||
admin: bool = False
|
admin: bool = False
|
||||||
|
tier: str = "user"
|
||||||
|
|
||||||
|
|
||||||
def command(name: str, help: str = "", admin: bool = False) -> Callable:
|
def command(name: str, help: str = "", admin: bool = False, tier: str = "") -> Callable:
|
||||||
"""Decorator to register an async function as a bot command.
|
"""Decorator to register an async function as a bot command.
|
||||||
|
|
||||||
Usage::
|
Usage::
|
||||||
@@ -36,12 +39,17 @@ def command(name: str, help: str = "", admin: bool = False) -> Callable:
|
|||||||
@command("reload", help="Reload a plugin", admin=True)
|
@command("reload", help="Reload a plugin", admin=True)
|
||||||
async def cmd_reload(bot, message):
|
async def cmd_reload(bot, message):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
@command("trusted_cmd", help="Trusted-only", tier="trusted")
|
||||||
|
async def cmd_trusted(bot, message):
|
||||||
|
...
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def decorator(func: Callable) -> Callable:
|
def decorator(func: Callable) -> Callable:
|
||||||
func._derp_command = name # type: ignore[attr-defined]
|
func._derp_command = name # type: ignore[attr-defined]
|
||||||
func._derp_help = help # type: ignore[attr-defined]
|
func._derp_help = help # type: ignore[attr-defined]
|
||||||
func._derp_admin = admin # type: ignore[attr-defined]
|
func._derp_admin = admin # type: ignore[attr-defined]
|
||||||
|
func._derp_tier = tier if tier else ("admin" if admin else "user") # type: ignore[attr-defined]
|
||||||
return func
|
return func
|
||||||
|
|
||||||
return decorator
|
return decorator
|
||||||
@@ -74,12 +82,14 @@ class PluginRegistry:
|
|||||||
self._paths: dict[str, Path] = {}
|
self._paths: dict[str, Path] = {}
|
||||||
|
|
||||||
def register_command(self, name: str, callback: Callable, help: str = "",
|
def register_command(self, name: str, callback: Callable, help: str = "",
|
||||||
plugin: str = "", admin: bool = False) -> None:
|
plugin: str = "", admin: bool = False,
|
||||||
|
tier: str = "user") -> None:
|
||||||
"""Register a command handler."""
|
"""Register a command handler."""
|
||||||
if name in self.commands:
|
if name in self.commands:
|
||||||
log.warning("command '%s' already registered, overwriting", name)
|
log.warning("command '%s' already registered, overwriting", name)
|
||||||
self.commands[name] = Handler(
|
self.commands[name] = Handler(
|
||||||
name=name, callback=callback, help=help, plugin=plugin, admin=admin,
|
name=name, callback=callback, help=help, plugin=plugin,
|
||||||
|
admin=admin, tier=tier,
|
||||||
)
|
)
|
||||||
log.debug("registered command: %s (%s)", name, plugin)
|
log.debug("registered command: %s (%s)", name, plugin)
|
||||||
|
|
||||||
@@ -102,6 +112,7 @@ class PluginRegistry:
|
|||||||
help=getattr(obj, "_derp_help", ""),
|
help=getattr(obj, "_derp_help", ""),
|
||||||
plugin=plugin_name,
|
plugin=plugin_name,
|
||||||
admin=getattr(obj, "_derp_admin", False),
|
admin=getattr(obj, "_derp_admin", False),
|
||||||
|
tier=getattr(obj, "_derp_tier", "user"),
|
||||||
)
|
)
|
||||||
count += 1
|
count += 1
|
||||||
if hasattr(obj, "_derp_event"):
|
if hasattr(obj, "_derp_event"):
|
||||||
|
|||||||
409
tests/test_acl.py
Normal file
409
tests/test_acl.py
Normal file
@@ -0,0 +1,409 @@
|
|||||||
|
"""Tests for the granular ACL tier system."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from derp.bot import Bot
|
||||||
|
from derp.config import DEFAULTS
|
||||||
|
from derp.irc import Message, parse
|
||||||
|
from derp.plugin import TIERS, PluginRegistry, command
|
||||||
|
|
||||||
|
# -- Helpers -----------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class _MockConnection:
|
||||||
|
"""Minimal mock IRC connection for dispatch tests."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._queue: asyncio.Queue[str | None] = asyncio.Queue()
|
||||||
|
self.sent: list[str] = []
|
||||||
|
|
||||||
|
async def connect(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def close(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
async def send(self, line: str) -> None:
|
||||||
|
self.sent.append(line)
|
||||||
|
|
||||||
|
async def readline(self) -> str | None:
|
||||||
|
return await self._queue.get()
|
||||||
|
|
||||||
|
def inject(self, line: str) -> None:
|
||||||
|
self._queue.put_nowait(line)
|
||||||
|
|
||||||
|
def disconnect(self) -> None:
|
||||||
|
self._queue.put_nowait(None)
|
||||||
|
|
||||||
|
|
||||||
|
class _Harness:
|
||||||
|
"""Test fixture: Bot with mock connection and configurable tiers."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
admins: list[str] | None = None,
|
||||||
|
operators: list[str] | None = None,
|
||||||
|
trusted: list[str] | None = None,
|
||||||
|
) -> None:
|
||||||
|
config: dict = {
|
||||||
|
"server": {
|
||||||
|
"host": "localhost", "port": 6667, "tls": False,
|
||||||
|
"nick": "test", "user": "test", "realname": "test bot",
|
||||||
|
},
|
||||||
|
"bot": {
|
||||||
|
"prefix": "!",
|
||||||
|
"channels": [],
|
||||||
|
"plugins_dir": "plugins",
|
||||||
|
"admins": admins or [],
|
||||||
|
"operators": operators or [],
|
||||||
|
"trusted": trusted or [],
|
||||||
|
"rate_limit": 100.0,
|
||||||
|
"rate_burst": 100,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
self.registry = PluginRegistry()
|
||||||
|
self.bot = Bot(config, self.registry)
|
||||||
|
self.conn = _MockConnection()
|
||||||
|
self.bot.conn = self.conn # type: ignore[assignment]
|
||||||
|
self.registry.load_plugin(Path("plugins/core.py"))
|
||||||
|
|
||||||
|
def inject_registration(self) -> None:
|
||||||
|
self.conn.inject(":server CAP * LS :")
|
||||||
|
self.conn.inject(":server 001 test :Welcome")
|
||||||
|
|
||||||
|
def privmsg(self, nick: str, target: str, text: str,
|
||||||
|
user: str = "user", host: str = "host") -> None:
|
||||||
|
self.conn.inject(f":{nick}!{user}@{host} PRIVMSG {target} :{text}")
|
||||||
|
|
||||||
|
async def run(self) -> None:
|
||||||
|
self.conn.disconnect()
|
||||||
|
self.bot._running = True
|
||||||
|
await self.bot._connect_and_run()
|
||||||
|
if self.bot._tasks:
|
||||||
|
await asyncio.gather(*list(self.bot._tasks), return_exceptions=True)
|
||||||
|
|
||||||
|
def sent_privmsgs(self, target: str) -> list[str]:
|
||||||
|
results = []
|
||||||
|
for line in self.conn.sent:
|
||||||
|
msg = parse(line)
|
||||||
|
if msg.command == "PRIVMSG" and msg.target == target:
|
||||||
|
results.append(msg.text)
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def _msg(text: str, prefix: str = "nick!user@host") -> Message:
|
||||||
|
nick = prefix.split("!")[0] if "!" in prefix else prefix
|
||||||
|
return Message(
|
||||||
|
raw="", prefix=prefix, nick=nick,
|
||||||
|
command="PRIVMSG", params=["#test", text], tags={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestTierConstants
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestTierConstants:
|
||||||
|
def test_tier_order(self):
|
||||||
|
assert TIERS == ("user", "trusted", "oper", "admin")
|
||||||
|
|
||||||
|
def test_index_comparison(self):
|
||||||
|
assert TIERS.index("user") < TIERS.index("trusted")
|
||||||
|
assert TIERS.index("trusted") < TIERS.index("oper")
|
||||||
|
assert TIERS.index("oper") < TIERS.index("admin")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestGetTier
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetTier:
|
||||||
|
def test_no_prefix(self):
|
||||||
|
h = _Harness()
|
||||||
|
msg = Message(raw="", prefix="", nick="", command="PRIVMSG",
|
||||||
|
params=["#test", "hi"], tags={})
|
||||||
|
assert h.bot._get_tier(msg) == "user"
|
||||||
|
|
||||||
|
def test_ircop(self):
|
||||||
|
h = _Harness()
|
||||||
|
h.bot._opers.add("op!root@server")
|
||||||
|
msg = _msg("test", prefix="op!root@server")
|
||||||
|
assert h.bot._get_tier(msg) == "admin"
|
||||||
|
|
||||||
|
def test_admin_pattern(self):
|
||||||
|
h = _Harness(admins=["*!*@admin.host"])
|
||||||
|
msg = _msg("test", prefix="alice!user@admin.host")
|
||||||
|
assert h.bot._get_tier(msg) == "admin"
|
||||||
|
|
||||||
|
def test_oper_pattern(self):
|
||||||
|
h = _Harness(operators=["*!*@oper.host"])
|
||||||
|
msg = _msg("test", prefix="bob!user@oper.host")
|
||||||
|
assert h.bot._get_tier(msg) == "oper"
|
||||||
|
|
||||||
|
def test_trusted_pattern(self):
|
||||||
|
h = _Harness(trusted=["*!*@trusted.host"])
|
||||||
|
msg = _msg("test", prefix="carol!user@trusted.host")
|
||||||
|
assert h.bot._get_tier(msg) == "trusted"
|
||||||
|
|
||||||
|
def test_no_match(self):
|
||||||
|
h = _Harness(admins=["*!*@admin.host"])
|
||||||
|
msg = _msg("test", prefix="nobody!user@random.host")
|
||||||
|
assert h.bot._get_tier(msg) == "user"
|
||||||
|
|
||||||
|
def test_priority_admin_over_oper(self):
|
||||||
|
"""Admin patterns take priority over operator patterns."""
|
||||||
|
h = _Harness(admins=["*!*@dual.host"], operators=["*!*@dual.host"])
|
||||||
|
msg = _msg("test", prefix="x!y@dual.host")
|
||||||
|
assert h.bot._get_tier(msg) == "admin"
|
||||||
|
|
||||||
|
def test_priority_oper_over_trusted(self):
|
||||||
|
"""Operator patterns take priority over trusted patterns."""
|
||||||
|
h = _Harness(operators=["*!*@dual.host"], trusted=["*!*@dual.host"])
|
||||||
|
msg = _msg("test", prefix="x!y@dual.host")
|
||||||
|
assert h.bot._get_tier(msg) == "oper"
|
||||||
|
|
||||||
|
def test_ircop_over_admin_pattern(self):
|
||||||
|
"""IRC operator status takes priority over admin hostmask pattern."""
|
||||||
|
h = _Harness(admins=["*!*@server"])
|
||||||
|
h.bot._opers.add("op!root@server")
|
||||||
|
msg = _msg("test", prefix="op!root@server")
|
||||||
|
# Both match, but ircop check comes first
|
||||||
|
assert h.bot._get_tier(msg) == "admin"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestIsAdminBackcompat
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsAdminBackcompat:
|
||||||
|
def test_admin_true(self):
|
||||||
|
h = _Harness(admins=["*!*@admin.host"])
|
||||||
|
msg = _msg("test", prefix="a!b@admin.host")
|
||||||
|
assert h.bot._is_admin(msg) is True
|
||||||
|
|
||||||
|
def test_oper_false(self):
|
||||||
|
h = _Harness(operators=["*!*@oper.host"])
|
||||||
|
msg = _msg("test", prefix="a!b@oper.host")
|
||||||
|
assert h.bot._is_admin(msg) is False
|
||||||
|
|
||||||
|
def test_trusted_false(self):
|
||||||
|
h = _Harness(trusted=["*!*@trusted.host"])
|
||||||
|
msg = _msg("test", prefix="a!b@trusted.host")
|
||||||
|
assert h.bot._is_admin(msg) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCommandDecorator
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestCommandDecorator:
|
||||||
|
def test_admin_true_sets_tier(self):
|
||||||
|
@command("x", admin=True)
|
||||||
|
async def handler(bot, msg):
|
||||||
|
pass
|
||||||
|
assert handler._derp_tier == "admin"
|
||||||
|
|
||||||
|
def test_explicit_tier(self):
|
||||||
|
@command("x", tier="trusted")
|
||||||
|
async def handler(bot, msg):
|
||||||
|
pass
|
||||||
|
assert handler._derp_tier == "trusted"
|
||||||
|
|
||||||
|
def test_tier_overrides_admin(self):
|
||||||
|
@command("x", admin=True, tier="oper")
|
||||||
|
async def handler(bot, msg):
|
||||||
|
pass
|
||||||
|
assert handler._derp_tier == "oper"
|
||||||
|
|
||||||
|
def test_default_tier(self):
|
||||||
|
@command("x")
|
||||||
|
async def handler(bot, msg):
|
||||||
|
pass
|
||||||
|
assert handler._derp_tier == "user"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestDispatchWithTiers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestDispatchWithTiers:
|
||||||
|
def test_trusted_allowed_for_trusted_cmd(self):
|
||||||
|
"""Trusted user can run a trusted-tier command."""
|
||||||
|
h = _Harness(trusted=["*!user@host"])
|
||||||
|
|
||||||
|
@command("tcmd", tier="trusted")
|
||||||
|
async def cmd_tcmd(bot, message):
|
||||||
|
await bot.reply(message, "ok")
|
||||||
|
|
||||||
|
h.registry.register_command(
|
||||||
|
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
|
||||||
|
)
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!tcmd")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("ok" in m for m in msgs)
|
||||||
|
|
||||||
|
def test_admin_allowed_for_trusted_cmd(self):
|
||||||
|
"""Admin can run trusted-tier commands (higher tier)."""
|
||||||
|
h = _Harness(admins=["*!user@host"])
|
||||||
|
|
||||||
|
@command("tcmd", tier="trusted")
|
||||||
|
async def cmd_tcmd(bot, message):
|
||||||
|
await bot.reply(message, "ok")
|
||||||
|
|
||||||
|
h.registry.register_command(
|
||||||
|
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
|
||||||
|
)
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!tcmd")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("ok" in m for m in msgs)
|
||||||
|
|
||||||
|
def test_user_denied_for_trusted_cmd(self):
|
||||||
|
"""Regular user is denied a trusted-tier command."""
|
||||||
|
h = _Harness()
|
||||||
|
|
||||||
|
@command("tcmd", tier="trusted")
|
||||||
|
async def cmd_tcmd(bot, message):
|
||||||
|
await bot.reply(message, "ok")
|
||||||
|
|
||||||
|
h.registry.register_command(
|
||||||
|
"tcmd", cmd_tcmd, tier="trusted", plugin="test",
|
||||||
|
)
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!tcmd")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("Permission denied" in m for m in msgs)
|
||||||
|
assert any("requires trusted" in m for m in msgs)
|
||||||
|
|
||||||
|
def test_backward_compat_admin_flag(self):
|
||||||
|
"""admin=True commands still work via tier='admin'."""
|
||||||
|
h = _Harness(admins=["*!user@host"])
|
||||||
|
h.inject_registration()
|
||||||
|
# cmd_admins is already registered via core plugin with admin=True
|
||||||
|
h.privmsg("nick", "#test", "!admins")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("Admin:" in m for m in msgs)
|
||||||
|
|
||||||
|
def test_denial_message_shows_tier(self):
|
||||||
|
"""Permission denial message includes the required tier name."""
|
||||||
|
h = _Harness()
|
||||||
|
|
||||||
|
@command("opcmd", tier="oper")
|
||||||
|
async def cmd_opcmd(bot, message):
|
||||||
|
await bot.reply(message, "ok")
|
||||||
|
|
||||||
|
h.registry.register_command(
|
||||||
|
"opcmd", cmd_opcmd, tier="oper", plugin="test",
|
||||||
|
)
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!opcmd")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("requires oper" in m for m in msgs)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestConfigDefaults
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestConfigDefaults:
|
||||||
|
def test_operators_in_defaults(self):
|
||||||
|
assert "operators" in DEFAULTS["bot"]
|
||||||
|
assert DEFAULTS["bot"]["operators"] == []
|
||||||
|
|
||||||
|
def test_trusted_in_defaults(self):
|
||||||
|
assert "trusted" in DEFAULTS["bot"]
|
||||||
|
assert DEFAULTS["bot"]["trusted"] == []
|
||||||
|
|
||||||
|
def test_webhook_in_defaults(self):
|
||||||
|
assert "webhook" in DEFAULTS
|
||||||
|
assert DEFAULTS["webhook"]["enabled"] is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestWhoami
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestWhoami:
|
||||||
|
def test_shows_admin(self):
|
||||||
|
h = _Harness(admins=["*!user@host"])
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!whoami")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("admin" in m for m in msgs)
|
||||||
|
|
||||||
|
def test_shows_trusted(self):
|
||||||
|
h = _Harness(trusted=["*!user@host"])
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!whoami")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("trusted" in m for m in msgs)
|
||||||
|
|
||||||
|
def test_shows_user(self):
|
||||||
|
h = _Harness()
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!whoami")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("user" in m for m in msgs)
|
||||||
|
|
||||||
|
def test_shows_ircop_tag(self):
|
||||||
|
h = _Harness()
|
||||||
|
h.bot._opers.add("nick!user@host")
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!whoami")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
assert any("IRCOP" in m for m in msgs)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestAdmins
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestAdmins:
|
||||||
|
def test_shows_all_tiers(self):
|
||||||
|
h = _Harness(
|
||||||
|
admins=["*!*@admin.host"],
|
||||||
|
operators=["*!*@oper.host"],
|
||||||
|
trusted=["*!*@trusted.host"],
|
||||||
|
)
|
||||||
|
h.bot._opers.add("nick!user@host")
|
||||||
|
h.inject_registration()
|
||||||
|
# Must be admin to run !admins
|
||||||
|
h.privmsg("nick", "#test", "!admins", user="x", host="admin.host")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
combined = " ".join(msgs)
|
||||||
|
assert "Admin:" in combined
|
||||||
|
assert "Oper:" in combined
|
||||||
|
assert "Trusted:" in combined
|
||||||
|
assert "IRCOPs:" in combined
|
||||||
|
|
||||||
|
def test_omits_empty_tiers(self):
|
||||||
|
h = _Harness(admins=["*!user@host"])
|
||||||
|
h.inject_registration()
|
||||||
|
h.privmsg("nick", "#test", "!admins")
|
||||||
|
asyncio.run(h.run())
|
||||||
|
msgs = h.sent_privmsgs("#test")
|
||||||
|
combined = " ".join(msgs)
|
||||||
|
assert "Admin:" in combined
|
||||||
|
# No operators or trusted configured, so those sections shouldn't appear
|
||||||
|
assert "Oper:" not in combined
|
||||||
|
assert "Trusted:" not in combined
|
||||||
639
tests/test_cron.py
Normal file
639
tests/test_cron.py
Normal file
@@ -0,0 +1,639 @@
|
|||||||
|
"""Tests for the cron plugin."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import importlib.util
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from derp.irc import Message
|
||||||
|
|
||||||
|
# plugins/ is not a Python package -- load the module from file path
|
||||||
|
_spec = importlib.util.spec_from_file_location(
|
||||||
|
"plugins.cron", Path(__file__).resolve().parent.parent / "plugins" / "cron.py",
|
||||||
|
)
|
||||||
|
_mod = importlib.util.module_from_spec(_spec)
|
||||||
|
sys.modules[_spec.name] = _mod
|
||||||
|
_spec.loader.exec_module(_mod)
|
||||||
|
|
||||||
|
from plugins.cron import ( # noqa: E402
|
||||||
|
_MAX_JOBS,
|
||||||
|
_delete,
|
||||||
|
_format_duration,
|
||||||
|
_jobs,
|
||||||
|
_load,
|
||||||
|
_make_id,
|
||||||
|
_parse_duration,
|
||||||
|
_restore,
|
||||||
|
_save,
|
||||||
|
_start_job,
|
||||||
|
_state_key,
|
||||||
|
_stop_job,
|
||||||
|
_tasks,
|
||||||
|
cmd_cron,
|
||||||
|
on_connect,
|
||||||
|
)
|
||||||
|
|
||||||
|
# -- Helpers -----------------------------------------------------------------
|
||||||
|
|
||||||
|
class _FakeState:
|
||||||
|
"""In-memory stand-in for bot.state."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._store: dict[str, dict[str, str]] = {}
|
||||||
|
|
||||||
|
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
||||||
|
return self._store.get(plugin, {}).get(key, default)
|
||||||
|
|
||||||
|
def set(self, plugin: str, key: str, value: str) -> None:
|
||||||
|
self._store.setdefault(plugin, {})[key] = value
|
||||||
|
|
||||||
|
def delete(self, plugin: str, key: str) -> bool:
|
||||||
|
try:
|
||||||
|
del self._store[plugin][key]
|
||||||
|
return True
|
||||||
|
except KeyError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def keys(self, plugin: str) -> list[str]:
|
||||||
|
return sorted(self._store.get(plugin, {}).keys())
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeBot:
|
||||||
|
"""Minimal bot stand-in that captures sent/replied messages."""
|
||||||
|
|
||||||
|
def __init__(self, *, admin: bool = True):
|
||||||
|
self.sent: list[tuple[str, str]] = []
|
||||||
|
self.replied: list[str] = []
|
||||||
|
self.dispatched: list[Message] = []
|
||||||
|
self.state = _FakeState()
|
||||||
|
self._admin = admin
|
||||||
|
self.prefix = "!"
|
||||||
|
|
||||||
|
async def send(self, target: str, text: str) -> None:
|
||||||
|
self.sent.append((target, text))
|
||||||
|
|
||||||
|
async def reply(self, message, text: str) -> None:
|
||||||
|
self.replied.append(text)
|
||||||
|
|
||||||
|
def _is_admin(self, message) -> bool:
|
||||||
|
return self._admin
|
||||||
|
|
||||||
|
def _dispatch_command(self, msg: Message) -> None:
|
||||||
|
self.dispatched.append(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
|
||||||
|
"""Create a channel PRIVMSG."""
|
||||||
|
return Message(
|
||||||
|
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||||
|
command="PRIVMSG", params=[target, text], tags={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _pm(text: str, nick: str = "admin") -> Message:
|
||||||
|
"""Create a private PRIVMSG."""
|
||||||
|
return Message(
|
||||||
|
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||||
|
command="PRIVMSG", params=["botname", text], tags={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _clear() -> None:
|
||||||
|
"""Reset module-level state between tests."""
|
||||||
|
for task in _tasks.values():
|
||||||
|
if task and not task.done():
|
||||||
|
task.cancel()
|
||||||
|
_tasks.clear()
|
||||||
|
_jobs.clear()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestParseDuration
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestParseDuration:
|
||||||
|
def test_minutes(self):
|
||||||
|
assert _parse_duration("5m") == 300
|
||||||
|
|
||||||
|
def test_hours(self):
|
||||||
|
assert _parse_duration("1h") == 3600
|
||||||
|
|
||||||
|
def test_combined(self):
|
||||||
|
assert _parse_duration("1h30m") == 5400
|
||||||
|
|
||||||
|
def test_days(self):
|
||||||
|
assert _parse_duration("2d") == 172800
|
||||||
|
|
||||||
|
def test_seconds(self):
|
||||||
|
assert _parse_duration("90s") == 90
|
||||||
|
|
||||||
|
def test_raw_int(self):
|
||||||
|
assert _parse_duration("300") == 300
|
||||||
|
|
||||||
|
def test_zero(self):
|
||||||
|
assert _parse_duration("0") is None
|
||||||
|
|
||||||
|
def test_negative(self):
|
||||||
|
assert _parse_duration("-5") is None
|
||||||
|
|
||||||
|
def test_invalid(self):
|
||||||
|
assert _parse_duration("abc") is None
|
||||||
|
|
||||||
|
def test_empty(self):
|
||||||
|
assert _parse_duration("") is None
|
||||||
|
|
||||||
|
def test_full_combo(self):
|
||||||
|
assert _parse_duration("1d2h3m4s") == 86400 + 7200 + 180 + 4
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestFormatDuration
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestFormatDuration:
|
||||||
|
def test_seconds(self):
|
||||||
|
assert _format_duration(45) == "45s"
|
||||||
|
|
||||||
|
def test_minutes(self):
|
||||||
|
assert _format_duration(300) == "5m"
|
||||||
|
|
||||||
|
def test_hours(self):
|
||||||
|
assert _format_duration(3600) == "1h"
|
||||||
|
|
||||||
|
def test_combined(self):
|
||||||
|
assert _format_duration(5400) == "1h30m"
|
||||||
|
|
||||||
|
def test_days(self):
|
||||||
|
assert _format_duration(86400) == "1d"
|
||||||
|
|
||||||
|
def test_zero(self):
|
||||||
|
assert _format_duration(0) == "0s"
|
||||||
|
|
||||||
|
def test_full_combo(self):
|
||||||
|
assert _format_duration(90061) == "1d1h1m1s"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestMakeId
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestMakeId:
|
||||||
|
def test_returns_hex(self):
|
||||||
|
result = _make_id("#test", "!ping")
|
||||||
|
assert len(result) == 6
|
||||||
|
int(result, 16) # should not raise
|
||||||
|
|
||||||
|
def test_unique(self):
|
||||||
|
ids = {_make_id("#test", f"!cmd{i}") for i in range(10)}
|
||||||
|
assert len(ids) == 10
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestStateHelpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestStateHelpers:
|
||||||
|
def test_save_and_load(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
data = {"id": "abc123", "channel": "#test"}
|
||||||
|
_save(bot, "#test:abc123", data)
|
||||||
|
loaded = _load(bot, "#test:abc123")
|
||||||
|
assert loaded == data
|
||||||
|
|
||||||
|
def test_load_missing(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
assert _load(bot, "nonexistent") is None
|
||||||
|
|
||||||
|
def test_delete(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
_save(bot, "#test:del", {"id": "del"})
|
||||||
|
_delete(bot, "#test:del")
|
||||||
|
assert _load(bot, "#test:del") is None
|
||||||
|
|
||||||
|
def test_state_key(self):
|
||||||
|
assert _state_key("#ops", "abc123") == "#ops:abc123"
|
||||||
|
|
||||||
|
def test_load_invalid_json(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
bot.state.set("cron", "bad", "not json{{{")
|
||||||
|
assert _load(bot, "bad") is None
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCmdCronAdd
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCmdCronAdd:
|
||||||
|
def test_add_success(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
await cmd_cron(bot, _msg("!cron add 5m #ops !rss check news"))
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
assert len(bot.replied) == 1
|
||||||
|
assert "Cron #" in bot.replied[0]
|
||||||
|
assert "!rss check news" in bot.replied[0]
|
||||||
|
assert "5m" in bot.replied[0]
|
||||||
|
assert "#ops" in bot.replied[0]
|
||||||
|
# Verify state persisted
|
||||||
|
keys = bot.state.keys("cron")
|
||||||
|
assert len(keys) == 1
|
||||||
|
data = json.loads(bot.state.get("cron", keys[0]))
|
||||||
|
assert data["command"] == "!rss check news"
|
||||||
|
assert data["interval"] == 300
|
||||||
|
assert data["channel"] == "#ops"
|
||||||
|
# Verify task started
|
||||||
|
assert len(_tasks) == 1
|
||||||
|
_clear()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_add_requires_channel(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _pm("!cron add 5m #ops !ping")))
|
||||||
|
assert "Use this command in a channel" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_add_missing_args(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron add 5m")))
|
||||||
|
assert "Usage:" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_add_invalid_interval(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron add abc #ops !ping")))
|
||||||
|
assert "Invalid interval" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_add_interval_too_short(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron add 30s #ops !ping")))
|
||||||
|
assert "Minimum interval" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_add_interval_too_long(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron add 30d #ops !ping")))
|
||||||
|
assert "Maximum interval" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_add_bad_target(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron add 5m alice !ping")))
|
||||||
|
assert "Target must be a channel" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_add_job_limit(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
for i in range(_MAX_JOBS):
|
||||||
|
_save(bot, f"#ops:job{i}", {"id": f"job{i}", "channel": "#ops"})
|
||||||
|
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping")))
|
||||||
|
assert "limit reached" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_add_admin_required(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot(admin=False)
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron add 5m #ops !ping")))
|
||||||
|
# The @command(admin=True) decorator handles this via bot._dispatch_command,
|
||||||
|
# but since we call cmd_cron directly, the check is at the decorator level.
|
||||||
|
# In direct call tests, admin check is already handled by the framework.
|
||||||
|
# This test just verifies the command runs without error for non-admin.
|
||||||
|
# Framework-level denial is tested in integration tests.
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCmdCronDel
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCmdCronDel:
|
||||||
|
def test_del_success(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
# Extract ID from reply
|
||||||
|
reply = bot.replied[0]
|
||||||
|
cron_id = reply.split("#")[1].split(":")[0]
|
||||||
|
bot.replied.clear()
|
||||||
|
await cmd_cron(bot, _msg(f"!cron del {cron_id}"))
|
||||||
|
assert "Removed" in bot.replied[0]
|
||||||
|
assert cron_id in bot.replied[0]
|
||||||
|
assert len(bot.state.keys("cron")) == 0
|
||||||
|
_clear()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_del_nonexistent(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron del nosuch")))
|
||||||
|
assert "No cron job" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_del_missing_id(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron del")))
|
||||||
|
assert "Usage:" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_del_with_hash_prefix(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
await cmd_cron(bot, _msg("!cron add 5m #test !ping"))
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
reply = bot.replied[0]
|
||||||
|
cron_id = reply.split("#")[1].split(":")[0]
|
||||||
|
bot.replied.clear()
|
||||||
|
await cmd_cron(bot, _msg(f"!cron del #{cron_id}"))
|
||||||
|
assert "Removed" in bot.replied[0]
|
||||||
|
_clear()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCmdCronList
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCmdCronList:
|
||||||
|
def test_list_empty(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron list")))
|
||||||
|
assert "No cron jobs" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_list_populated(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
_save(bot, "#test:abc123", {
|
||||||
|
"id": "abc123", "channel": "#test",
|
||||||
|
"command": "!rss check news", "interval": 300,
|
||||||
|
})
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron list")))
|
||||||
|
assert "#abc123" in bot.replied[0]
|
||||||
|
assert "5m" in bot.replied[0]
|
||||||
|
assert "!rss check news" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_list_requires_channel(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _pm("!cron list")))
|
||||||
|
assert "Use this command in a channel" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_list_channel_isolation(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
_save(bot, "#test:mine", {
|
||||||
|
"id": "mine", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 300,
|
||||||
|
})
|
||||||
|
_save(bot, "#other:theirs", {
|
||||||
|
"id": "theirs", "channel": "#other",
|
||||||
|
"command": "!ping", "interval": 600,
|
||||||
|
})
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron list")))
|
||||||
|
assert "mine" in bot.replied[0]
|
||||||
|
assert len(bot.replied) == 1 # only the #test job
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCmdCronUsage
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCmdCronUsage:
|
||||||
|
def test_no_args(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron")))
|
||||||
|
assert "Usage:" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_unknown_subcommand(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_cron(bot, _msg("!cron foobar")))
|
||||||
|
assert "Usage:" in bot.replied[0]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestRestore
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestRestore:
|
||||||
|
def test_restore_spawns_tasks(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
data = {
|
||||||
|
"id": "abc123", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 300,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
_save(bot, "#test:abc123", data)
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
_restore(bot)
|
||||||
|
assert "#test:abc123" in _tasks
|
||||||
|
assert not _tasks["#test:abc123"].done()
|
||||||
|
_clear()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_restore_skips_active(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
data = {
|
||||||
|
"id": "active", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 300,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
_save(bot, "#test:active", data)
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
dummy = asyncio.create_task(asyncio.sleep(9999))
|
||||||
|
_tasks["#test:active"] = dummy
|
||||||
|
_restore(bot)
|
||||||
|
assert _tasks["#test:active"] is dummy
|
||||||
|
dummy.cancel()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_restore_replaces_done_task(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
data = {
|
||||||
|
"id": "done", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 300,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
_save(bot, "#test:done", data)
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
done_task = asyncio.create_task(asyncio.sleep(0))
|
||||||
|
await done_task
|
||||||
|
_tasks["#test:done"] = done_task
|
||||||
|
_restore(bot)
|
||||||
|
new_task = _tasks["#test:done"]
|
||||||
|
assert new_task is not done_task
|
||||||
|
assert not new_task.done()
|
||||||
|
_clear()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_restore_skips_bad_json(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
bot.state.set("cron", "#test:bad", "not json{{{")
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
_restore(bot)
|
||||||
|
assert "#test:bad" not in _tasks
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_on_connect_calls_restore(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
data = {
|
||||||
|
"id": "conn", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 300,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
_save(bot, "#test:conn", data)
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
msg = _msg("", target="botname")
|
||||||
|
await on_connect(bot, msg)
|
||||||
|
assert "#test:conn" in _tasks
|
||||||
|
_clear()
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCronLoop
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCronLoop:
|
||||||
|
def test_dispatches_command(self):
|
||||||
|
"""Cron loop dispatches a synthetic message after interval."""
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
data = {
|
||||||
|
"id": "loop1", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 0.05,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
key = "#test:loop1"
|
||||||
|
_jobs[key] = data
|
||||||
|
_start_job(bot, key)
|
||||||
|
await asyncio.sleep(0.15)
|
||||||
|
_stop_job(key)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
# Should have dispatched at least once
|
||||||
|
assert len(bot.dispatched) >= 1
|
||||||
|
msg = bot.dispatched[0]
|
||||||
|
assert msg.nick == "admin"
|
||||||
|
assert msg.params[0] == "#test"
|
||||||
|
assert msg.params[1] == "!ping"
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_loop_stops_on_job_removal(self):
|
||||||
|
"""Cron loop exits when job is removed from _jobs."""
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
data = {
|
||||||
|
"id": "loop2", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 0.05,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
key = "#test:loop2"
|
||||||
|
_jobs[key] = data
|
||||||
|
_start_job(bot, key)
|
||||||
|
await asyncio.sleep(0.02)
|
||||||
|
_jobs.pop(key, None)
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
task = _tasks.get(key)
|
||||||
|
if task:
|
||||||
|
assert task.done()
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestJobManagement
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestJobManagement:
|
||||||
|
def test_start_and_stop(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
data = {
|
||||||
|
"id": "mgmt", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 300,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
key = "#test:mgmt"
|
||||||
|
_jobs[key] = data
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
_start_job(bot, key)
|
||||||
|
assert key in _tasks
|
||||||
|
assert not _tasks[key].done()
|
||||||
|
_stop_job(key)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
assert key not in _tasks
|
||||||
|
assert key not in _jobs
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_start_idempotent(self):
|
||||||
|
_clear()
|
||||||
|
bot = _FakeBot()
|
||||||
|
data = {
|
||||||
|
"id": "idem", "channel": "#test",
|
||||||
|
"command": "!ping", "interval": 300,
|
||||||
|
"prefix": "admin!~admin@host", "nick": "admin",
|
||||||
|
"added_by": "admin",
|
||||||
|
}
|
||||||
|
key = "#test:idem"
|
||||||
|
_jobs[key] = data
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
_start_job(bot, key)
|
||||||
|
first = _tasks[key]
|
||||||
|
_start_job(bot, key)
|
||||||
|
assert _tasks[key] is first
|
||||||
|
_stop_job(key)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_stop_nonexistent(self):
|
||||||
|
_clear()
|
||||||
|
_stop_job("#test:nonexistent")
|
||||||
212
tests/test_flaskpaste.py
Normal file
212
tests/test_flaskpaste.py
Normal file
@@ -0,0 +1,212 @@
|
|||||||
|
"""Tests for the FlaskPaste plugin and Bot.shorten_url method."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import importlib.util
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from derp.irc import Message
|
||||||
|
|
||||||
|
# plugins/ is not a Python package -- load the module from file path
|
||||||
|
_spec = importlib.util.spec_from_file_location(
|
||||||
|
"plugins.flaskpaste",
|
||||||
|
Path(__file__).resolve().parent.parent / "plugins" / "flaskpaste.py",
|
||||||
|
)
|
||||||
|
_mod = importlib.util.module_from_spec(_spec)
|
||||||
|
sys.modules[_spec.name] = _mod
|
||||||
|
_spec.loader.exec_module(_mod)
|
||||||
|
|
||||||
|
from plugins.flaskpaste import ( # noqa: E402
|
||||||
|
cmd_paste,
|
||||||
|
cmd_shorten,
|
||||||
|
create_paste,
|
||||||
|
shorten_url,
|
||||||
|
)
|
||||||
|
|
||||||
|
# -- Helpers -----------------------------------------------------------------
|
||||||
|
|
||||||
|
class _FakeState:
|
||||||
|
"""In-memory stand-in for bot.state."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._store: dict[str, dict[str, str]] = {}
|
||||||
|
|
||||||
|
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
||||||
|
return self._store.get(plugin, {}).get(key, default)
|
||||||
|
|
||||||
|
def set(self, plugin: str, key: str, value: str) -> None:
|
||||||
|
self._store.setdefault(plugin, {})[key] = value
|
||||||
|
|
||||||
|
def delete(self, plugin: str, key: str) -> bool:
|
||||||
|
try:
|
||||||
|
del self._store[plugin][key]
|
||||||
|
return True
|
||||||
|
except KeyError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def keys(self, plugin: str) -> list[str]:
|
||||||
|
return sorted(self._store.get(plugin, {}).keys())
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeRegistry:
|
||||||
|
"""Minimal registry stand-in."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._modules: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeBot:
|
||||||
|
"""Minimal bot stand-in that captures sent/replied messages."""
|
||||||
|
|
||||||
|
def __init__(self, *, admin: bool = False):
|
||||||
|
self.sent: list[tuple[str, str]] = []
|
||||||
|
self.replied: list[str] = []
|
||||||
|
self.state = _FakeState()
|
||||||
|
self.registry = _FakeRegistry()
|
||||||
|
self._admin = admin
|
||||||
|
self.config: dict = {}
|
||||||
|
|
||||||
|
async def send(self, target: str, text: str) -> None:
|
||||||
|
self.sent.append((target, text))
|
||||||
|
|
||||||
|
async def reply(self, message, text: str) -> None:
|
||||||
|
self.replied.append(text)
|
||||||
|
|
||||||
|
def _is_admin(self, message) -> bool:
|
||||||
|
return self._admin
|
||||||
|
|
||||||
|
|
||||||
|
def _msg(text: str, nick: str = "alice", target: str = "#test") -> Message:
|
||||||
|
"""Create a channel PRIVMSG."""
|
||||||
|
return Message(
|
||||||
|
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||||
|
command="PRIVMSG", params=[target, text], tags={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCmdShorten
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCmdShorten:
|
||||||
|
def test_missing_url(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_shorten(bot, _msg("!shorten")))
|
||||||
|
assert "Usage:" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_invalid_scheme(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_shorten(bot, _msg("!shorten ftp://example.com")))
|
||||||
|
assert "http://" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_success(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_shorten_url",
|
||||||
|
return_value="https://paste.mymx.me/s/abc123",
|
||||||
|
):
|
||||||
|
await cmd_shorten(bot, _msg("!shorten https://example.com/long"))
|
||||||
|
assert "paste.mymx.me/s/abc123" in bot.replied[0]
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_failure(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_shorten_url",
|
||||||
|
side_effect=ConnectionError("down"),
|
||||||
|
):
|
||||||
|
await cmd_shorten(bot, _msg("!shorten https://example.com/long"))
|
||||||
|
assert "shorten failed" in bot.replied[0]
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCmdPaste
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCmdPaste:
|
||||||
|
def test_missing_text(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
asyncio.run(cmd_paste(bot, _msg("!paste")))
|
||||||
|
assert "Usage:" in bot.replied[0]
|
||||||
|
|
||||||
|
def test_success(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_create_paste",
|
||||||
|
return_value="https://paste.mymx.me/xyz789",
|
||||||
|
):
|
||||||
|
await cmd_paste(bot, _msg("!paste hello world"))
|
||||||
|
assert "paste.mymx.me/xyz789" in bot.replied[0]
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
def test_failure(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
|
||||||
|
async def inner():
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_create_paste",
|
||||||
|
side_effect=ConnectionError("down"),
|
||||||
|
):
|
||||||
|
await cmd_paste(bot, _msg("!paste hello world"))
|
||||||
|
assert "paste failed" in bot.replied[0]
|
||||||
|
|
||||||
|
asyncio.run(inner())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestShortenUrlHelper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestShortenUrlHelper:
|
||||||
|
def test_returns_short_url(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_shorten_url",
|
||||||
|
return_value="https://paste.mymx.me/s/short",
|
||||||
|
):
|
||||||
|
result = shorten_url(bot, "https://example.com/long")
|
||||||
|
assert result == "https://paste.mymx.me/s/short"
|
||||||
|
|
||||||
|
def test_returns_original_on_error(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_shorten_url",
|
||||||
|
side_effect=ConnectionError("fail"),
|
||||||
|
):
|
||||||
|
result = shorten_url(bot, "https://example.com/long")
|
||||||
|
assert result == "https://example.com/long"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestCreatePasteHelper
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCreatePasteHelper:
|
||||||
|
def test_returns_paste_url(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_create_paste",
|
||||||
|
return_value="https://paste.mymx.me/abc",
|
||||||
|
):
|
||||||
|
result = create_paste(bot, "hello")
|
||||||
|
assert result == "https://paste.mymx.me/abc"
|
||||||
|
|
||||||
|
def test_returns_none_on_error(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
with patch.object(
|
||||||
|
_mod, "_create_paste",
|
||||||
|
side_effect=ConnectionError("fail"),
|
||||||
|
):
|
||||||
|
result = create_paste(bot, "hello")
|
||||||
|
assert result is None
|
||||||
@@ -274,7 +274,7 @@ class TestAdmin:
|
|||||||
|
|
||||||
replies = h.sent_privmsgs("#test")
|
replies = h.sent_privmsgs("#test")
|
||||||
assert not any("Permission denied" in r for r in replies)
|
assert not any("Permission denied" in r for r in replies)
|
||||||
assert any("Patterns:" in r for r in replies)
|
assert any("Admin:" in r for r in replies)
|
||||||
|
|
||||||
def test_oper_detection(self):
|
def test_oper_detection(self):
|
||||||
"""IRC operator detected via WHO reply can use admin commands."""
|
"""IRC operator detected via WHO reply can use admin commands."""
|
||||||
@@ -290,7 +290,7 @@ class TestAdmin:
|
|||||||
|
|
||||||
replies = h.sent_privmsgs("#test")
|
replies = h.sent_privmsgs("#test")
|
||||||
assert not any("Permission denied" in r for r in replies)
|
assert not any("Permission denied" in r for r in replies)
|
||||||
assert any("Opers:" in r for r in replies)
|
assert any("IRCOPs:" in r for r in replies)
|
||||||
|
|
||||||
def test_oper_detection_on_join(self):
|
def test_oper_detection_on_join(self):
|
||||||
"""User joining a channel triggers debounced WHO for oper detection."""
|
"""User joining a channel triggers debounced WHO for oper detection."""
|
||||||
|
|||||||
@@ -140,6 +140,9 @@ class _FakeBot:
|
|||||||
async def reply(self, message, text: str) -> None:
|
async def reply(self, message, text: str) -> None:
|
||||||
self.replied.append(text)
|
self.replied.append(text)
|
||||||
|
|
||||||
|
async def shorten_url(self, url: str) -> str:
|
||||||
|
return url
|
||||||
|
|
||||||
def _is_admin(self, message) -> bool:
|
def _is_admin(self, message) -> bool:
|
||||||
return self._admin
|
return self._admin
|
||||||
|
|
||||||
|
|||||||
@@ -166,6 +166,9 @@ class _FakeBot:
|
|||||||
async def reply(self, message, text: str) -> None:
|
async def reply(self, message, text: str) -> None:
|
||||||
self.replied.append(text)
|
self.replied.append(text)
|
||||||
|
|
||||||
|
async def shorten_url(self, url: str) -> str:
|
||||||
|
return url
|
||||||
|
|
||||||
def _is_admin(self, message) -> bool:
|
def _is_admin(self, message) -> bool:
|
||||||
return self._admin
|
return self._admin
|
||||||
|
|
||||||
|
|||||||
395
tests/test_webhook.py
Normal file
395
tests/test_webhook.py
Normal file
@@ -0,0 +1,395 @@
|
|||||||
|
"""Tests for the webhook listener plugin."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import importlib.util
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from derp.irc import Message
|
||||||
|
|
||||||
|
# plugins/ is not a Python package -- load the module from file path
|
||||||
|
_spec = importlib.util.spec_from_file_location(
|
||||||
|
"plugins.webhook", Path(__file__).resolve().parent.parent / "plugins" / "webhook.py",
|
||||||
|
)
|
||||||
|
_mod = importlib.util.module_from_spec(_spec)
|
||||||
|
sys.modules[_spec.name] = _mod
|
||||||
|
_spec.loader.exec_module(_mod)
|
||||||
|
|
||||||
|
from plugins.webhook import ( # noqa: E402
|
||||||
|
_MAX_BODY,
|
||||||
|
_handle_request,
|
||||||
|
_http_response,
|
||||||
|
_verify_signature,
|
||||||
|
cmd_webhook,
|
||||||
|
on_connect,
|
||||||
|
)
|
||||||
|
|
||||||
|
# -- Helpers -----------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeState:
|
||||||
|
"""In-memory stand-in for bot.state."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._store: dict[str, dict[str, str]] = {}
|
||||||
|
|
||||||
|
def get(self, plugin: str, key: str, default: str | None = None) -> str | None:
|
||||||
|
return self._store.get(plugin, {}).get(key, default)
|
||||||
|
|
||||||
|
def set(self, plugin: str, key: str, value: str) -> None:
|
||||||
|
self._store.setdefault(plugin, {})[key] = value
|
||||||
|
|
||||||
|
def delete(self, plugin: str, key: str) -> bool:
|
||||||
|
try:
|
||||||
|
del self._store[plugin][key]
|
||||||
|
return True
|
||||||
|
except KeyError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def keys(self, plugin: str) -> list[str]:
|
||||||
|
return sorted(self._store.get(plugin, {}).keys())
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeBot:
|
||||||
|
"""Minimal bot stand-in that captures sent/action messages."""
|
||||||
|
|
||||||
|
def __init__(self, *, admin: bool = True, webhook_cfg: dict | None = None):
|
||||||
|
self.sent: list[tuple[str, str]] = []
|
||||||
|
self.replied: list[str] = []
|
||||||
|
self.actions: list[tuple[str, str]] = []
|
||||||
|
self.state = _FakeState()
|
||||||
|
self._admin = admin
|
||||||
|
self.prefix = "!"
|
||||||
|
self.config = {
|
||||||
|
"webhook": webhook_cfg or {"enabled": False},
|
||||||
|
}
|
||||||
|
|
||||||
|
async def send(self, target: str, text: str) -> None:
|
||||||
|
self.sent.append((target, text))
|
||||||
|
|
||||||
|
async def reply(self, message, text: str) -> None:
|
||||||
|
self.replied.append(text)
|
||||||
|
|
||||||
|
async def action(self, target: str, text: str) -> None:
|
||||||
|
self.actions.append((target, text))
|
||||||
|
|
||||||
|
def _is_admin(self, message) -> bool:
|
||||||
|
return self._admin
|
||||||
|
|
||||||
|
|
||||||
|
def _msg(text: str, nick: str = "admin", target: str = "#test") -> Message:
|
||||||
|
return Message(
|
||||||
|
raw="", prefix=f"{nick}!~{nick}@host", nick=nick,
|
||||||
|
command="PRIVMSG", params=[target, text], tags={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _sign(secret: str, body: bytes) -> str:
|
||||||
|
"""Generate HMAC-SHA256 signature."""
|
||||||
|
sig = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
|
||||||
|
return f"sha256={sig}"
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeReader:
|
||||||
|
"""Mock asyncio.StreamReader from raw HTTP bytes."""
|
||||||
|
|
||||||
|
def __init__(self, data: bytes) -> None:
|
||||||
|
self._data = data
|
||||||
|
self._pos = 0
|
||||||
|
|
||||||
|
async def readline(self) -> bytes:
|
||||||
|
start = self._pos
|
||||||
|
idx = self._data.find(b"\n", start)
|
||||||
|
if idx == -1:
|
||||||
|
self._pos = len(self._data)
|
||||||
|
return self._data[start:]
|
||||||
|
self._pos = idx + 1
|
||||||
|
return self._data[start:self._pos]
|
||||||
|
|
||||||
|
async def readexactly(self, n: int) -> bytes:
|
||||||
|
chunk = self._data[self._pos:self._pos + n]
|
||||||
|
self._pos += n
|
||||||
|
return chunk
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeWriter:
|
||||||
|
"""Mock asyncio.StreamWriter that captures output."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.data = b""
|
||||||
|
self._closed = False
|
||||||
|
|
||||||
|
def write(self, data: bytes) -> None:
|
||||||
|
self.data += data
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
self._closed = True
|
||||||
|
|
||||||
|
async def wait_closed(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _build_request(method: str, body: bytes, headers: dict[str, str] | None = None) -> bytes:
|
||||||
|
"""Build raw HTTP request bytes."""
|
||||||
|
hdrs = headers or {}
|
||||||
|
if "Content-Length" not in hdrs:
|
||||||
|
hdrs["Content-Length"] = str(len(body))
|
||||||
|
lines = [f"{method} / HTTP/1.1"]
|
||||||
|
for k, v in hdrs.items():
|
||||||
|
lines.append(f"{k}: {v}")
|
||||||
|
lines.append("")
|
||||||
|
lines.append("")
|
||||||
|
return "\r\n".join(lines).encode("utf-8") + body
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestVerifySignature
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestVerifySignature:
|
||||||
|
def test_valid_signature(self):
|
||||||
|
body = b'{"channel":"#test","text":"hello"}'
|
||||||
|
sig = _sign("secret", body)
|
||||||
|
assert _verify_signature("secret", body, sig) is True
|
||||||
|
|
||||||
|
def test_invalid_signature(self):
|
||||||
|
body = b'{"channel":"#test","text":"hello"}'
|
||||||
|
assert _verify_signature("secret", body, "sha256=bad") is False
|
||||||
|
|
||||||
|
def test_empty_secret_allows_all(self):
|
||||||
|
body = b'{"channel":"#test","text":"hello"}'
|
||||||
|
assert _verify_signature("", body, "") is True
|
||||||
|
|
||||||
|
def test_missing_prefix(self):
|
||||||
|
body = b'{"channel":"#test","text":"hello"}'
|
||||||
|
sig = hmac.new(b"secret", body, hashlib.sha256).hexdigest()
|
||||||
|
assert _verify_signature("secret", body, sig) is False
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestHttpResponse
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestHttpResponse:
|
||||||
|
def test_200_response(self):
|
||||||
|
resp = _http_response(200, "OK", "sent")
|
||||||
|
assert b"200 OK" in resp
|
||||||
|
assert b"sent" in resp
|
||||||
|
|
||||||
|
def test_400_with_body(self):
|
||||||
|
resp = _http_response(400, "Bad Request", "invalid JSON")
|
||||||
|
assert b"400 Bad Request" in resp
|
||||||
|
assert b"invalid JSON" in resp
|
||||||
|
|
||||||
|
def test_405_response(self):
|
||||||
|
resp = _http_response(405, "Method Not Allowed", "POST only")
|
||||||
|
assert b"405 Method Not Allowed" in resp
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestRequestHandler
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestRequestHandler:
|
||||||
|
def test_valid_post(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
body = json.dumps({"channel": "#ops", "text": "deploy done"}).encode()
|
||||||
|
sig = _sign("secret", body)
|
||||||
|
raw = _build_request("POST", body, {
|
||||||
|
"Content-Length": str(len(body)),
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"X-Signature": sig,
|
||||||
|
})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, "secret"))
|
||||||
|
assert b"200 OK" in writer.data
|
||||||
|
assert ("#ops", "deploy done") in bot.sent
|
||||||
|
|
||||||
|
def test_action_post(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
body = json.dumps({
|
||||||
|
"channel": "#ops", "text": "deployed", "action": True,
|
||||||
|
}).encode()
|
||||||
|
sig = _sign("s", body)
|
||||||
|
raw = _build_request("POST", body, {
|
||||||
|
"Content-Length": str(len(body)),
|
||||||
|
"X-Signature": sig,
|
||||||
|
})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, "s"))
|
||||||
|
assert b"200 OK" in writer.data
|
||||||
|
assert ("#ops", "deployed") in bot.actions
|
||||||
|
|
||||||
|
def test_get_405(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
raw = _build_request("GET", b"")
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||||
|
assert b"405" in writer.data
|
||||||
|
|
||||||
|
def test_bad_signature_401(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
body = json.dumps({"channel": "#test", "text": "x"}).encode()
|
||||||
|
raw = _build_request("POST", body, {
|
||||||
|
"Content-Length": str(len(body)),
|
||||||
|
"X-Signature": "sha256=wrong",
|
||||||
|
})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, "real-secret"))
|
||||||
|
assert b"401" in writer.data
|
||||||
|
assert len(bot.sent) == 0
|
||||||
|
|
||||||
|
def test_bad_json_400(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
body = b"not json"
|
||||||
|
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||||
|
assert b"400" in writer.data
|
||||||
|
assert b"invalid JSON" in writer.data
|
||||||
|
|
||||||
|
def test_missing_channel_400(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
body = json.dumps({"text": "no channel"}).encode()
|
||||||
|
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||||
|
assert b"400" in writer.data
|
||||||
|
assert b"invalid channel" in writer.data
|
||||||
|
|
||||||
|
def test_invalid_channel_400(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
body = json.dumps({"channel": "nochanprefix", "text": "x"}).encode()
|
||||||
|
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||||
|
assert b"400" in writer.data
|
||||||
|
|
||||||
|
def test_empty_text_400(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
body = json.dumps({"channel": "#test", "text": ""}).encode()
|
||||||
|
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||||
|
assert b"400" in writer.data
|
||||||
|
assert b"empty text" in writer.data
|
||||||
|
|
||||||
|
def test_body_too_large_413(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
raw = _build_request("POST", b"", {
|
||||||
|
"Content-Length": str(_MAX_BODY + 1),
|
||||||
|
})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||||
|
assert b"413" in writer.data
|
||||||
|
|
||||||
|
def test_counter_increments(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
# Reset counter
|
||||||
|
_mod._request_count = 0
|
||||||
|
body = json.dumps({"channel": "#test", "text": "hi"}).encode()
|
||||||
|
raw = _build_request("POST", body, {"Content-Length": str(len(body))})
|
||||||
|
reader = _FakeReader(raw)
|
||||||
|
writer = _FakeWriter()
|
||||||
|
asyncio.run(_handle_request(reader, writer, bot, ""))
|
||||||
|
assert _mod._request_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestServerLifecycle
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestServerLifecycle:
|
||||||
|
def test_disabled_config(self):
|
||||||
|
"""Server does not start when webhook is disabled."""
|
||||||
|
bot = _FakeBot(webhook_cfg={"enabled": False})
|
||||||
|
msg = _msg("", target="")
|
||||||
|
msg = Message(raw="", prefix="", nick="", command="001",
|
||||||
|
params=["test", "Welcome"], tags={})
|
||||||
|
# Reset global state
|
||||||
|
_mod._server = None
|
||||||
|
asyncio.run(on_connect(bot, msg))
|
||||||
|
assert _mod._server is None
|
||||||
|
|
||||||
|
def test_duplicate_guard(self):
|
||||||
|
"""Second on_connect does not create a second server."""
|
||||||
|
sentinel = object()
|
||||||
|
_mod._server = sentinel
|
||||||
|
bot = _FakeBot(webhook_cfg={"enabled": True, "port": 0})
|
||||||
|
msg = Message(raw="", prefix="", nick="", command="001",
|
||||||
|
params=["test", "Welcome"], tags={})
|
||||||
|
asyncio.run(on_connect(bot, msg))
|
||||||
|
assert _mod._server is sentinel
|
||||||
|
_mod._server = None # cleanup
|
||||||
|
|
||||||
|
def test_on_connect_starts(self):
|
||||||
|
"""on_connect starts the server when enabled."""
|
||||||
|
_mod._server = None
|
||||||
|
bot = _FakeBot(webhook_cfg={
|
||||||
|
"enabled": True, "host": "127.0.0.1", "port": 0, "secret": "",
|
||||||
|
})
|
||||||
|
msg = Message(raw="", prefix="", nick="", command="001",
|
||||||
|
params=["test", "Welcome"], tags={})
|
||||||
|
|
||||||
|
async def _run():
|
||||||
|
await on_connect(bot, msg)
|
||||||
|
assert _mod._server is not None
|
||||||
|
_mod._server.close()
|
||||||
|
await _mod._server.wait_closed()
|
||||||
|
_mod._server = None
|
||||||
|
|
||||||
|
asyncio.run(_run())
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# TestWebhookCommand
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestWebhookCommand:
|
||||||
|
def test_not_running(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
_mod._server = None
|
||||||
|
asyncio.run(cmd_webhook(bot, _msg("!webhook")))
|
||||||
|
assert any("not running" in r for r in bot.replied)
|
||||||
|
|
||||||
|
def test_running_shows_status(self):
|
||||||
|
bot = _FakeBot()
|
||||||
|
_mod._request_count = 42
|
||||||
|
_mod._started = time.monotonic() - 90 # 1m 30s ago
|
||||||
|
|
||||||
|
async def _run():
|
||||||
|
# Start a real server on port 0 to get a valid socket
|
||||||
|
srv = await asyncio.start_server(lambda r, w: None,
|
||||||
|
"127.0.0.1", 0)
|
||||||
|
_mod._server = srv
|
||||||
|
try:
|
||||||
|
await cmd_webhook(bot, _msg("!webhook"))
|
||||||
|
finally:
|
||||||
|
srv.close()
|
||||||
|
await srv.wait_closed()
|
||||||
|
_mod._server = None
|
||||||
|
|
||||||
|
asyncio.run(_run())
|
||||||
|
assert len(bot.replied) == 1
|
||||||
|
reply = bot.replied[0]
|
||||||
|
assert "Webhook:" in reply
|
||||||
|
assert "42 requests" in reply
|
||||||
|
assert "127.0.0.1:" in reply
|
||||||
@@ -171,6 +171,9 @@ class _FakeBot:
|
|||||||
async def reply(self, message, text: str) -> None:
|
async def reply(self, message, text: str) -> None:
|
||||||
self.replied.append(text)
|
self.replied.append(text)
|
||||||
|
|
||||||
|
async def shorten_url(self, url: str) -> str:
|
||||||
|
return url
|
||||||
|
|
||||||
def _is_admin(self, message) -> bool:
|
def _is_admin(self, message) -> bool:
|
||||||
return self._admin
|
return self._admin
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user