diff --git a/plugins/alert.py b/plugins/alert.py
index fad1425..9c2cedc 100644
--- a/plugins/alert.py
+++ b/plugins/alert.py
@@ -58,34 +58,52 @@ def _truncate(text: str, max_len: int = _MAX_TITLE_LEN) -> str:
     return text[: max_len - 3].rstrip() + "..."
 
 
+_DATE_PROPS = {
+    "article:published_time", "og:article:published_time",
+    "og:updated_time", "date", "dc.date", "dcterms.date",
+    "sailthru.date",
+}
+
+
 class _OGParser(HTMLParser):
-    """Extract og:title and og:description from <meta> tags."""
+    """Extract og:title, og:description, and published date from <meta> tags."""
 
     def __init__(self):
         super().__init__()
         self.og_title = ""
         self.og_description = ""
+        self.published = ""
 
     def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
         if tag != "meta":
             return
         attr_map = {k.lower(): (v or "") for k, v in attrs}
-        prop = attr_map.get("property", "")
+        prop = attr_map.get("property", "").lower()
+        name = attr_map.get("name", "").lower()
         content = attr_map.get("content", "")
         if prop == "og:title":
             self.og_title = content
         elif prop == "og:description":
             self.og_description = content
+        if not self.published and content:
+            if prop in _DATE_PROPS or name in _DATE_PROPS:
+                self.published = content
 
 
 _OG_TIMEOUT = 10
 _OG_MAX_BYTES = 64 * 1024  # Only read first 64 KB (OG tags are in <head>)
 
 
-def _fetch_og(url: str) -> tuple[str, str]:
-    """Fetch og:title and og:description from a URL. Blocking.
+def _parse_date(raw: str) -> str:
+    """Try to extract a YYYY-MM-DD date from a raw date string."""
+    m = re.search(r"\d{4}-\d{2}-\d{2}", raw)
+    return m.group(0) if m else ""
 
-    Returns (og_title, og_description). Empty strings on failure.
+
+def _fetch_og(url: str) -> tuple[str, str, str]:
+    """Fetch og:title, og:description, and published date from a URL.
+
+    Returns (og_title, og_description, date). Empty strings on failure.
     """
     try:
         req = urllib.request.Request(url, method="GET")
@@ -96,10 +114,11 @@ def _fetch_og(url: str) -> tuple[str, str]:
         html = raw.decode("utf-8", errors="replace")
         parser = _OGParser()
         parser.feed(html)
-        return parser.og_title, parser.og_description
+        date = _parse_date(parser.published)
+        return parser.og_title, parser.og_description, date
     except Exception as exc:
         _log.debug("og fetch failed for %s: %s", url, exc)
-        return "", ""
+        return "", "", ""
 
 
 # -- YouTube InnerTube search (blocking) ------------------------------------
@@ -125,10 +144,18 @@ def _extract_videos(obj: object, depth: int = 0) -> list[dict]:
             else:
                 title = ""
             if title:
+                # Extract relative publish time (e.g. "2 days ago")
+                pub_obj = obj.get("publishedTimeText")
+                date = ""
+                if isinstance(pub_obj, dict):
+                    date = pub_obj.get("simpleText", "")
+                elif isinstance(pub_obj, str):
+                    date = pub_obj
                 results.append({
                     "id": video_id,
                     "title": title,
                     "url": f"https://www.youtube.com/watch?v={video_id}",
+                    "date": date,
                     "extra": "",
                 })
         for val in obj.values():
@@ -220,6 +247,7 @@ def _search_twitch(keyword: str) -> list[dict]:
                 "id": f"stream:{stream_id}",
                 "title": line,
                 "url": f"https://twitch.tv/{login}",
+                "date": "",
                 "extra": "",
             })
 
@@ -234,6 +262,7 @@ def _search_twitch(keyword: str) -> list[dict]:
                 "id": f"vod:{vod_id}",
                 "title": title,
                 "url": f"https://twitch.tv/videos/{vod_id}",
+                "date": "",
                 "extra": "",
             })
 
@@ -259,10 +288,12 @@ def _search_searx(keyword: str) -> list[dict]:
     for item in data.get("results", []):
         item_url = item.get("url", "")
         title = item.get("title", "")
+        date = _parse_date(item.get("publishedDate", ""))
         results.append({
             "id": item_url,
             "title": title,
             "url": item_url,
+            "date": date,
             "extra": "",
         })
     return results
@@ -338,19 +369,27 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
             title_l = item.get("title", "").lower()
             url_l = item.get("url", "").lower()
             if kw_lower in title_l or kw_lower in url_l:
+                # Fetch OG tags for date if backend didn't provide one
+                if not item.get("date") and item.get("url"):
+                    _, _, og_date = await loop.run_in_executor(
+                        None, _fetch_og, item["url"],
+                    )
+                    if og_date:
+                        item["date"] = og_date
                 matched.append(item)
                 continue
             # Fetch OG tags for items that didn't match on title/URL
             item_url = item.get("url", "")
             if item_url:
-                og_title, og_desc = await loop.run_in_executor(
+                og_title, og_desc, og_date = await loop.run_in_executor(
                     None, _fetch_og, item_url,
                 )
                 if (kw_lower in og_title.lower()
                         or kw_lower in og_desc.lower()):
-                    # Use og:title as display title if richer
                     if og_title and len(og_title) > len(item.get("title", "")):
                         item["title"] = og_title
+                    if og_date and not item.get("date"):
+                        item["date"] = og_date
                     matched.append(item)
 
         if announce and matched:
@@ -360,7 +399,11 @@ async def _poll_once(bot, key: str, announce: bool = True) -> None:
             for item in shown:
                 title = _truncate(item["title"]) if item["title"] else "(no title)"
                 url = item["url"]
-                line = f"[{name}/{tag}] {title}"
+                date = item.get("date", "")
+                line = f"[{name}/{tag}]"
+                if date:
+                    line += f" ({date})"
+                line += f" {title}"
                 if url:
                     line += f" -- {url}"
                 await bot.send(channel, line)