feat: add multi-server support
Connect to multiple IRC servers concurrently from a single config file. Plugins are loaded once and shared; per-server state is isolated via separate SQLite databases and per-bot runtime state (bot._pstate). - Add build_server_configs() for [servers.*] config layout - Bot.__init__ gains name parameter, _pstate dict for plugin isolation - cli.py runs multiple bots via asyncio.gather - 9 stateful plugins migrated from module-level dicts to _ps(bot) pattern - Backward compatible: legacy [server] config works unchanged Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
102
plugins/alert.py
102
plugins/alert.py
@@ -77,12 +77,19 @@ _DEVTO_API = "https://dev.to/api/articles"
|
||||
_MEDIUM_FEED_URL = "https://medium.com/feed/tag"
|
||||
_HUGGINGFACE_API = "https://huggingface.co/api/models"
|
||||
|
||||
# -- Module-level tracking ---------------------------------------------------
|
||||
# -- Per-bot plugin runtime state --------------------------------------------
|
||||
|
||||
_pollers: dict[str, asyncio.Task] = {}
|
||||
_subscriptions: dict[str, dict] = {}
|
||||
_errors: dict[str, dict[str, int]] = {}
|
||||
_poll_count: dict[str, int] = {}
|
||||
|
||||
def _ps(bot):
|
||||
"""Per-bot plugin runtime state."""
|
||||
return bot._pstate.setdefault("alert", {
|
||||
"pollers": {},
|
||||
"subs": {},
|
||||
"errors": {},
|
||||
"poll_count": {},
|
||||
"db_conn": None,
|
||||
"db_path": "data/alert_history.db",
|
||||
})
|
||||
|
||||
# -- Concurrent fetch helper -------------------------------------------------
|
||||
|
||||
@@ -121,18 +128,16 @@ def _fetch_many(targets, *, build_req, timeout, parse):
|
||||
|
||||
# -- History database --------------------------------------------------------
|
||||
|
||||
_DB_PATH = Path("data/alert_history.db")
|
||||
_conn: sqlite3.Connection | None = None
|
||||
|
||||
|
||||
def _db() -> sqlite3.Connection:
|
||||
def _db(bot) -> sqlite3.Connection:
|
||||
"""Lazy-init the history database connection and schema."""
|
||||
global _conn
|
||||
if _conn is not None:
|
||||
return _conn
|
||||
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
_conn = sqlite3.connect(str(_DB_PATH))
|
||||
_conn.execute("""
|
||||
ps = _ps(bot)
|
||||
if ps["db_conn"] is not None:
|
||||
return ps["db_conn"]
|
||||
db_path = Path(ps.get("db_path", "data/alert_history.db"))
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS results (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
channel TEXT NOT NULL,
|
||||
@@ -152,34 +157,35 @@ def _db() -> sqlite3.Connection:
|
||||
("extra", "''"),
|
||||
]:
|
||||
try:
|
||||
_conn.execute(
|
||||
conn.execute(
|
||||
f"ALTER TABLE results ADD COLUMN {col} TEXT NOT NULL DEFAULT {default}"
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
pass # column already exists
|
||||
_conn.execute(
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_results_alert ON results(channel, alert)"
|
||||
)
|
||||
_conn.execute(
|
||||
conn.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_results_short_id ON results(short_id)"
|
||||
)
|
||||
# Backfill short_id for rows that predate the column
|
||||
for row_id, backend, item_id in _conn.execute(
|
||||
for row_id, backend, item_id in conn.execute(
|
||||
"SELECT id, backend, item_id FROM results WHERE short_id = ''"
|
||||
).fetchall():
|
||||
_conn.execute(
|
||||
conn.execute(
|
||||
"UPDATE results SET short_id = ? WHERE id = ?",
|
||||
(_make_short_id(backend, item_id), row_id),
|
||||
)
|
||||
_conn.commit()
|
||||
return _conn
|
||||
conn.commit()
|
||||
ps["db_conn"] = conn
|
||||
return conn
|
||||
|
||||
|
||||
def _save_result(channel: str, alert: str, backend: str, item: dict,
|
||||
def _save_result(bot, channel: str, alert: str, backend: str, item: dict,
|
||||
short_url: str = "") -> str:
|
||||
"""Persist a matched result to the history database. Returns short_id."""
|
||||
short_id = _make_short_id(backend, item.get("id", ""))
|
||||
db = _db()
|
||||
db = _db(bot)
|
||||
db.execute(
|
||||
"INSERT INTO results"
|
||||
" (channel, alert, backend, item_id, title, url, date, found_at,"
|
||||
@@ -1814,19 +1820,20 @@ def _delete(bot, key: str) -> None:
|
||||
|
||||
async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
||||
"""Single poll cycle for one alert subscription (all backends)."""
|
||||
data = _subscriptions.get(key)
|
||||
ps = _ps(bot)
|
||||
data = ps["subs"].get(key)
|
||||
if data is None:
|
||||
data = _load(bot, key)
|
||||
if data is None:
|
||||
return
|
||||
_subscriptions[key] = data
|
||||
ps["subs"][key] = data
|
||||
|
||||
keyword = data["keyword"]
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
data["last_poll"] = now
|
||||
|
||||
cycle = _poll_count[key] = _poll_count.get(key, 0) + 1
|
||||
tag_errors = _errors.setdefault(key, {})
|
||||
cycle = ps["poll_count"][key] = ps["poll_count"].get(key, 0) + 1
|
||||
tag_errors = ps["errors"].setdefault(key, {})
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
for tag, backend in _BACKENDS.items():
|
||||
@@ -1917,7 +1924,7 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
||||
except Exception:
|
||||
pass
|
||||
short_id = _save_result(
|
||||
channel, name, tag, item, short_url=short_url,
|
||||
bot, channel, name, tag, item, short_url=short_url,
|
||||
)
|
||||
title = item["title"] or "(no title)"
|
||||
extra = item.get("extra", "")
|
||||
@@ -1938,7 +1945,7 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
|
||||
seen_list = seen_list[-_MAX_SEEN:]
|
||||
data.setdefault("seen", {})[tag] = seen_list
|
||||
|
||||
_subscriptions[key] = data
|
||||
ps["subs"][key] = data
|
||||
_save(bot, key, data)
|
||||
|
||||
|
||||
@@ -1946,7 +1953,7 @@ async def _poll_loop(bot, key: str) -> None:
|
||||
"""Infinite poll loop for one alert subscription."""
|
||||
try:
|
||||
while True:
|
||||
data = _subscriptions.get(key) or _load(bot, key)
|
||||
data = _ps(bot)["subs"].get(key) or _load(bot, key)
|
||||
if data is None:
|
||||
return
|
||||
interval = data.get("interval", _DEFAULT_INTERVAL)
|
||||
@@ -1958,35 +1965,38 @@ async def _poll_loop(bot, key: str) -> None:
|
||||
|
||||
def _start_poller(bot, key: str) -> None:
|
||||
"""Create and track a poller task."""
|
||||
existing = _pollers.get(key)
|
||||
ps = _ps(bot)
|
||||
existing = ps["pollers"].get(key)
|
||||
if existing and not existing.done():
|
||||
return
|
||||
task = asyncio.create_task(_poll_loop(bot, key))
|
||||
_pollers[key] = task
|
||||
ps["pollers"][key] = task
|
||||
|
||||
|
||||
def _stop_poller(key: str) -> None:
|
||||
def _stop_poller(bot, key: str) -> None:
|
||||
"""Cancel and remove a poller task."""
|
||||
task = _pollers.pop(key, None)
|
||||
ps = _ps(bot)
|
||||
task = ps["pollers"].pop(key, None)
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
_subscriptions.pop(key, None)
|
||||
_errors.pop(key, None)
|
||||
_poll_count.pop(key, None)
|
||||
ps["subs"].pop(key, None)
|
||||
ps["errors"].pop(key, None)
|
||||
ps["poll_count"].pop(key, None)
|
||||
|
||||
|
||||
# -- Restore on connect -----------------------------------------------------
|
||||
|
||||
def _restore(bot) -> None:
|
||||
"""Rebuild pollers from persisted state."""
|
||||
ps = _ps(bot)
|
||||
for key in bot.state.keys("alert"):
|
||||
existing = _pollers.get(key)
|
||||
existing = ps["pollers"].get(key)
|
||||
if existing and not existing.done():
|
||||
continue
|
||||
data = _load(bot, key)
|
||||
if data is None:
|
||||
continue
|
||||
_subscriptions[key] = data
|
||||
ps["subs"][key] = data
|
||||
_start_poller(bot, key)
|
||||
|
||||
|
||||
@@ -2056,9 +2066,9 @@ async def cmd_alert(bot, message):
|
||||
if data is None:
|
||||
await bot.reply(message, f"No alert '{name}' in this channel")
|
||||
return
|
||||
_subscriptions[key] = data
|
||||
_ps(bot)["subs"][key] = data
|
||||
await _poll_once(bot, key, announce=True)
|
||||
data = _subscriptions.get(key, data)
|
||||
data = _ps(bot)["subs"].get(key, data)
|
||||
errs = data.get("last_errors", {})
|
||||
if errs:
|
||||
tags = ", ".join(sorted(errs))
|
||||
@@ -2087,7 +2097,7 @@ async def cmd_alert(bot, message):
|
||||
limit = max(1, min(int(parts[3]), 20))
|
||||
except ValueError:
|
||||
limit = 5
|
||||
db = _db()
|
||||
db = _db(bot)
|
||||
rows = db.execute(
|
||||
"SELECT id, backend, title, url, date, found_at, short_id,"
|
||||
" short_url, extra FROM results"
|
||||
@@ -2141,7 +2151,7 @@ async def cmd_alert(bot, message):
|
||||
return
|
||||
short_id = parts[2].lower()
|
||||
channel = message.target
|
||||
db = _db()
|
||||
db = _db(bot)
|
||||
row = db.execute(
|
||||
"SELECT alert, backend, title, url, date, found_at, short_id,"
|
||||
" extra"
|
||||
@@ -2216,7 +2226,7 @@ async def cmd_alert(bot, message):
|
||||
"seen": {},
|
||||
}
|
||||
_save(bot, key, data)
|
||||
_subscriptions[key] = data
|
||||
_ps(bot)["subs"][key] = data
|
||||
|
||||
# Seed seen IDs in background (silent poll), then start the poller
|
||||
async def _seed():
|
||||
@@ -2251,7 +2261,7 @@ async def cmd_alert(bot, message):
|
||||
await bot.reply(message, f"No alert '{name}' in this channel")
|
||||
return
|
||||
|
||||
_stop_poller(key)
|
||||
_stop_poller(bot, key)
|
||||
_delete(bot, key)
|
||||
await bot.reply(message, f"Removed '{name}'")
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user