"""Plugin: per-channel RSS/Atom feed subscriptions with periodic polling.""" from __future__ import annotations import asyncio import json import re import urllib.request import xml.etree.ElementTree as ET from datetime import datetime, timezone from urllib.parse import urlparse from derp.http import urlopen as _urlopen from derp.plugin import command, event # -- Constants --------------------------------------------------------------- _NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,19}$") _MAX_SEEN = 200 _MAX_ANNOUNCE = 5 _DEFAULT_INTERVAL = 600 _MAX_INTERVAL = 3600 _FETCH_TIMEOUT = 15 _USER_AGENT = "derp/1.0" _MAX_TITLE_LEN = 80 _MAX_FEEDS = 20 _ATOM_NS = "{http://www.w3.org/2005/Atom}" _DC_NS = "{http://purl.org/dc/elements/1.1/}" # -- Module-level tracking --------------------------------------------------- _pollers: dict[str, asyncio.Task] = {} _feeds: dict[str, dict] = {} _errors: dict[str, int] = {} # -- Pure helpers ------------------------------------------------------------ def _state_key(channel: str, name: str) -> str: """Build composite state key.""" return f"{channel}:{name}" def _validate_name(name: str) -> bool: """Check name against allowed pattern.""" return bool(_NAME_RE.match(name)) def _derive_name(url: str) -> str: """Derive a short feed name from URL hostname.""" try: hostname = urlparse(url).hostname or "" except Exception: hostname = "" # Strip www. prefix, take first label, lowercase hostname = hostname.lower().removeprefix("www.") name = hostname.split(".")[0] if hostname else "feed" # Sanitize to allowed chars name = re.sub(r"[^a-z0-9-]", "", name) if not name or not name[0].isalnum(): name = "feed" return name[:20] def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str: """Truncate text with ellipsis if needed.""" if len(text) <= max_len: return text return text[: max_len - 3].rstrip() + "..." 
# -- State helpers -----------------------------------------------------------

def _save(bot, key: str, data: dict) -> None:
    """Persist feed data to bot.state (namespace "rss") as a JSON string."""
    bot.state.set("rss", key, json.dumps(data))


def _load(bot, key: str) -> dict | None:
    """Load feed data from bot.state.

    Returns None when the key is missing or the stored value does not decode
    to a JSON object, so corrupt state is treated like a missing feed instead
    of crashing callers that index the result as a dict.
    """
    raw = bot.state.get("rss", key)
    if raw is None:
        return None
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return None
    return data if isinstance(data, dict) else None


def _delete(bot, key: str) -> None:
    """Remove feed data from bot.state."""
    bot.state.delete("rss", key)


# -- Feed fetching (blocking, for executor) ----------------------------------

def _fetch_feed(url: str, etag: str = "", last_modified: str = "") -> dict:
    """Blocking HTTP GET for feed content. Run via executor.

    *etag* / *last_modified*, when non-empty, are sent as ``If-None-Match`` /
    ``If-Modified-Since`` so the server can answer 304.

    Never raises. Returns a dict with keys ``status`` (int, 0 if no response),
    ``body`` (bytes), ``etag``, ``last_modified`` and ``error`` (empty string
    on success and on 304).
    """
    result: dict = {
        "status": 0,
        "body": b"",
        "etag": "",
        "last_modified": "",
        "error": "",
    }
    try:
        # Request() itself rejects malformed URLs, so it lives inside the try.
        req = urllib.request.Request(url, method="GET")
        req.add_header("User-Agent", _USER_AGENT)
        if etag:
            req.add_header("If-None-Match", etag)
        if last_modified:
            req.add_header("If-Modified-Since", last_modified)

        resp = _urlopen(req, timeout=_FETCH_TIMEOUT)
        try:
            result["status"] = resp.status
            result["body"] = resp.read()
            result["etag"] = resp.headers.get("ETag", "")
            result["last_modified"] = resp.headers.get("Last-Modified", "")
        finally:
            # Close even when read() fails so the connection is not leaked.
            resp.close()
    except urllib.error.HTTPError as exc:
        result["status"] = exc.code
        if exc.code == 304:
            # Not modified: echo the validators back so callers keep them.
            result["etag"] = etag
            result["last_modified"] = last_modified
        else:
            result["error"] = f"HTTP {exc.code}"
    except urllib.error.URLError as exc:
        result["error"] = str(exc.reason)
    except Exception as exc:
        # Executor boundary: report anything else as a fetch error.
        result["error"] = str(exc)
    return result


# -- Feed parsing ------------------------------------------------------------

def _parse_rss(root: ET.Element) -> tuple[str, list[dict]]:
    """Parse an RSS 2.0 document.

    Returns ``(feed_title, items)``; each item is a dict with ``id`` (guid,
    falling back to link), ``title`` and ``link``. Items without an id are
    skipped. Returns ``("", [])`` when there is no ``<channel>`` element.
    """
    channel = root.find("channel")
    if channel is None:
        return ("", [])
    title = (channel.findtext("title") or "").strip()
    items = []
    for item in channel.findall("item"):
        item_id = item.findtext("guid") or item.findtext("link") or ""
        item_title = (item.findtext("title") or "").strip()
        item_link = (item.findtext("link") or "").strip()
        if item_id:
            items.append({"id": item_id, "title": item_title, "link": item_link})
    return (title, items)


def _parse_atom(root: ET.Element) -> tuple[str, list[dict]]:
    """Parse an Atom document.

    Returns ``(feed_title, items)`` in the same shape as ``_parse_rss``. The
    entry id falls back to the href of the first ``<link>``; entries with
    neither are skipped.
    """
    title = (root.findtext(f"{_ATOM_NS}title") or "").strip()
    items = []
    for entry in root.findall(f"{_ATOM_NS}entry"):
        entry_id = (entry.findtext(f"{_ATOM_NS}id") or "").strip()
        link_el = entry.find(f"{_ATOM_NS}link")
        entry_link = (link_el.get("href", "") if link_el is not None else "").strip()
        if not entry_id:
            entry_id = entry_link
        entry_title = (entry.findtext(f"{_ATOM_NS}title") or "").strip()
        if entry_id:
            items.append({"id": entry_id, "title": entry_title, "link": entry_link})
    return (title, items)


def _parse_feed(body: bytes) -> tuple[str, list[dict]]:
    """Auto-detect RSS/Atom and parse. Returns (feed_title, items).

    Raises ``xml.etree.ElementTree.ParseError`` on malformed XML and
    ``ValueError`` when the root element is neither ``rss`` nor ``feed``.
    """
    # NOTE(security): xml.etree is documented as not hardened against
    # maliciously constructed XML; *body* comes from arbitrary remote servers.
    root = ET.fromstring(body)
    tag = root.tag
    # Drop a "{namespace}" prefix before comparing the local tag name.
    local = tag.split("}")[-1].lower() if "}" in tag else tag.lower()
    if local == "rss":
        return _parse_rss(root)
    if local == "feed":
        return _parse_atom(root)
    raise ValueError(f"Unknown feed format: {root.tag}")


def _merge_seen(seen_list: list, items: list[dict]) -> list:
    """Return the updated seen-id list after a successful parse.

    Every id present in the current feed document is always retained, placed
    last; older ids fill whatever room is left up to ``_MAX_SEEN``. A plain
    ``[-_MAX_SEEN:]`` cut would drop ids that are still in a large feed and
    make them look new again on every poll.
    """
    current = list(dict.fromkeys(item["id"] for item in items))
    in_feed = set(current)
    older = [item_id for item_id in seen_list if item_id not in in_feed]
    room = max(_MAX_SEEN - len(current), 0)
    # older[-0:] would be the whole list, hence the explicit guard.
    kept = older[-room:] if room else []
    return kept + current


# -- Polling -----------------------------------------------------------------

def _record_failure(bot, key: str, data: dict, error: str) -> None:
    """Store *error* on the feed, bump its failure counter and persist."""
    data["last_error"] = error
    _errors[key] = _errors.get(key, 0) + 1
    _feeds[key] = data
    _save(bot, key, data)


async def _poll_once(bot, key: str, announce: bool = True) -> None:
    """Single poll cycle for one feed.

    Fetches the feed in the default executor, records errors, and on success
    announces unseen items to the feed's channel (when *announce* is true)
    and persists the updated state. Does nothing if the feed is unknown.
    """
    data = _feeds.get(key)
    if data is None:
        data = _load(bot, key)
        if data is None:
            return
        _feeds[key] = data

    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,
        _fetch_feed,
        data["url"],
        data.get("etag", ""),
        data.get("last_modified", ""),
    )

    data["last_poll"] = datetime.now(timezone.utc).isoformat()

    if result["error"]:
        _record_failure(bot, key, data, result["error"])
        return

    # HTTP 304 -- not modified
    if result["status"] == 304:
        data["last_error"] = ""
        _errors[key] = 0
        _feeds[key] = data
        _save(bot, key, data)
        return

    try:
        feed_title, items = _parse_feed(result["body"])
    except Exception as exc:
        # Broad on purpose: the body is untrusted remote input. The
        # validators and the error counter are left untouched so repeated
        # parse failures accumulate for back-off and are not masked by a
        # later 304 for the same broken document.
        _record_failure(bot, key, data, f"Parse error: {exc}")
        return

    # Only a fully parsed response counts as a success.
    data["etag"] = result["etag"]
    data["last_modified"] = result["last_modified"]
    data["last_error"] = ""
    _errors[key] = 0

    if feed_title and not data.get("title"):
        data["title"] = feed_title

    seen_list = list(data.get("seen", []))
    known = set(seen_list)
    new_items = []
    for item in items:
        # Adding to `known` as we go also drops duplicate ids within one feed.
        if item["id"] in known:
            continue
        known.add(item["id"])
        new_items.append(item)

    if announce and new_items:
        channel = data["channel"]
        name = data["name"]
        shown = new_items[:_MAX_ANNOUNCE]
        for item in shown:
            title = _truncate(item["title"]) if item["title"] else "(no title)"
            line = f"[{name}] {title}"
            if item["link"]:
                line += f" -- {item['link']}"
            await bot.send(channel, line)
        remaining = len(new_items) - len(shown)
        if remaining > 0:
            await bot.send(channel, f"[{name}] ... and {remaining} more")

    data["seen"] = _merge_seen(seen_list, items)
    _feeds[key] = data
    _save(bot, key, data)


async def _poll_loop(bot, key: str) -> None:
    """Infinite poll loop for one feed.

    Sleeps for the feed's interval (doubled, up to ``_MAX_INTERVAL``, after
    five consecutive errors) and then polls. Ends when the feed disappears
    from both the cache and bot.state, or when the task is cancelled.
    """
    import logging  # local import: keeps this block self-contained

    log = logging.getLogger(__name__)
    try:
        while True:
            data = _feeds.get(key) or _load(bot, key)
            if data is None:
                return
            interval = data.get("interval", _DEFAULT_INTERVAL)
            # Back off on consecutive errors; never shorten an interval that
            # is already above the back-off ceiling.
            if _errors.get(key, 0) >= 5:
                interval = max(interval, min(interval * 2, _MAX_INTERVAL))
            await asyncio.sleep(interval)
            try:
                await _poll_once(bot, key, announce=True)
            except Exception:
                # One failed cycle must not kill the poller for good.
                # CancelledError is a BaseException and is not caught here.
                _errors[key] = _errors.get(key, 0) + 1
                log.exception("rss: poll failed for %s", key)
    except asyncio.CancelledError:
        # Cancellation is the normal shutdown path (see _stop_poller).
        pass


def _start_poller(bot, key: str) -> None:
    """Create and track a poller task unless one is already running."""
    existing = _pollers.get(key)
    if existing and not existing.done():
        return
    _pollers[key] = asyncio.create_task(_poll_loop(bot, key))


def _stop_poller(key: str) -> None:
    """Cancel and remove a poller task and drop its cached state."""
    task = _pollers.pop(key, None)
    if task and not task.done():
        task.cancel()
    _feeds.pop(key, None)
    _errors.pop(key, 0)


# -- Restore on connect -----------------------------------------------------

def _restore(bot) -> None:
    """Rebuild pollers from persisted state, skipping feeds already polling."""
    for key in bot.state.keys("rss"):
        existing = _pollers.get(key)
        if existing and not existing.done():
            continue
        data = _load(bot, key)
        if data is None:
            continue
        _feeds[key] = data
        _start_poller(bot, key)


@event("001")
async def on_connect(bot, message):
    """Restore RSS feed pollers on connect."""
    _restore(bot)


# -- Command handler ---------------------------------------------------------

_USAGE = "Usage: !rss <add|del|list|check> [args]"
_CHANNEL_ONLY = "Use this command in a channel"


# -- list (any user, channel only) -------------------------------------------

async def _rss_list(bot, message, parts: list) -> None:
    """Reply with the names of the feeds subscribed in this channel."""
    if not message.is_channel:
        await bot.reply(message, _CHANNEL_ONLY)
        return
    prefix = f"{message.target}:"
    feeds = []
    for key in bot.state.keys("rss"):
        if not key.startswith(prefix):
            continue
        data = _load(bot, key)
        if not data:
            continue
        name = data["name"]
        feeds.append(f"{name} (error)" if data.get("last_error", "") else name)
    if not feeds:
        await bot.reply(message, "No feeds in this channel")
        return
    await bot.reply(message, f"Feeds: {', '.join(feeds)}")


# -- check (any user, channel only) ------------------------------------------

async def _rss_check(bot, message, parts: list) -> None:
    """Force one poll of the named feed and report the outcome."""
    if not message.is_channel:
        await bot.reply(message, _CHANNEL_ONLY)
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !rss check <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    data = _load(bot, key)
    if data is None:
        await bot.reply(message, f"No feed '{name}' in this channel")
        return
    _feeds[key] = data
    await _poll_once(bot, key, announce=True)
    data = _feeds.get(key, data)
    if data.get("last_error"):
        await bot.reply(message, f"{name}: error -- {data['last_error']}")
    else:
        await bot.reply(message, f"{name}: checked")


# -- add (admin, channel only) -----------------------------------------------

async def _rss_add(bot, message, parts: list) -> None:
    """Validate, test-fetch and subscribe a new feed, then start its poller."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: add requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, _CHANNEL_ONLY)
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !rss add <url> [name]")
        return

    url = parts[2]
    if not url.startswith(("http://", "https://")):
        url = f"https://{url}"

    name = parts[3].lower() if len(parts) > 3 else _derive_name(url)
    if not _validate_name(name):
        await bot.reply(
            message,
            "Invalid name (lowercase alphanumeric + hyphens, 1-20 chars)",
        )
        return

    channel = message.target
    key = _state_key(channel, name)

    # Check for duplicate
    if _load(bot, key) is not None:
        await bot.reply(message, f"Feed '{name}' already exists in this channel")
        return

    # Check per-channel limit
    prefix = f"{channel}:"
    count = sum(1 for k in bot.state.keys("rss") if k.startswith(prefix))
    if count >= _MAX_FEEDS:
        await bot.reply(message, f"Channel feed limit reached ({_MAX_FEEDS})")
        return

    # Test-fetch to validate URL and seed seen list
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, _fetch_feed, url, "", "")
    if result["error"]:
        await bot.reply(message, f"Fetch failed: {result['error']}")
        return

    try:
        feed_title, items = _parse_feed(result["body"])
    except Exception as exc:
        await bot.reply(message, f"Parse failed: {exc}")
        return

    # Seed with every id currently in the feed so nothing that already
    # exists is announced as new on the first poll.
    seen = _merge_seen([], items)

    now = datetime.now(timezone.utc).isoformat()
    data = {
        "url": url,
        "name": name,
        "channel": channel,
        "interval": _DEFAULT_INTERVAL,
        "added_by": message.nick,
        "added_at": now,
        "seen": seen,
        "last_poll": now,
        "last_error": "",
        "etag": result["etag"],
        "last_modified": result["last_modified"],
        "title": feed_title,
    }
    _save(bot, key, data)
    _feeds[key] = data
    _start_poller(bot, key)

    display = feed_title or name
    await bot.reply(
        message,
        f"Subscribed '{name}' ({display}, {len(seen)} existing items)",
    )


# -- del (admin, channel only) -----------------------------------------------

async def _rss_del(bot, message, parts: list) -> None:
    """Stop polling the named feed and delete its persisted state."""
    if not bot._is_admin(message):
        await bot.reply(message, "Permission denied: del requires admin")
        return
    if not message.is_channel:
        await bot.reply(message, _CHANNEL_ONLY)
        return
    if len(parts) < 3:
        await bot.reply(message, "Usage: !rss del <name>")
        return
    name = parts[2].lower()
    key = _state_key(message.target, name)
    if _load(bot, key) is None:
        await bot.reply(message, f"No feed '{name}' in this channel")
        return
    _stop_poller(key)
    _delete(bot, key)
    await bot.reply(message, f"Unsubscribed '{name}'")


_SUBCOMMANDS = {
    "list": _rss_list,
    "check": _rss_check,
    "add": _rss_add,
    "del": _rss_del,
}


@command("rss", help="RSS: !rss add|del|list|check")
async def cmd_rss(bot, message):
    """Per-channel RSS/Atom feed subscriptions.

    Usage:
        !rss add <url> [name]   Subscribe a feed (admin)
        !rss del <name>         Unsubscribe a feed (admin)
        !rss list               List feeds in this channel
        !rss check <name>       Force-poll a feed now
    """
    # At most four fields: "!rss", sub-command, first argument, remainder.
    parts = message.text.split(None, 3)
    handler = _SUBCOMMANDS.get(parts[1].lower()) if len(parts) >= 2 else None
    if handler is None:
        await bot.reply(message, _USAGE)
        return
    await handler(bot, message, parts)