diff --git a/plugins/username.py b/plugins/username.py new file mode 100644 index 0000000..0efbf0a --- /dev/null +++ b/plugins/username.py @@ -0,0 +1,334 @@ +"""Plugin: cross-platform username enumeration. + +Check username availability across ~25 services using HTTP probes +and public JSON APIs. Hybrid approach -- official APIs (GitHub, +GitLab, Docker Hub, Keybase, Dev.to, Reddit) where available, +HTTP status probes for the rest. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import re +import ssl +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass + +from derp.plugin import command + +log = logging.getLogger(__name__) + +_USERNAME_RE = re.compile(r"^[a-zA-Z0-9._-]{1,39}$") +_USER_AGENT = "derp/1.0" +_TIMEOUT = 8 +_OVERALL_TIMEOUT = 20.0 + +_pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="username") + + +# -- service definition ------------------------------------------------------ + +@dataclass(slots=True, frozen=True) +class _Service: + """A single service to probe for username existence.""" + + name: str # Display name ("GitHub") + url: str # URL template with {user} placeholder + method: str # "status" | "json" | "body" + category: str # "dev" | "social" | "media" | "other" + + +_SERVICES: tuple[_Service, ...] = ( + # -- dev -- + _Service("GitHub", "https://api.github.com/users/{user}", "json", "dev"), + _Service("GitLab", "https://gitlab.com/api/v4/users?username={user}", "json", "dev"), + _Service("Codeberg", "https://codeberg.org/{user}", "status", "dev"), + _Service("Docker Hub", "https://hub.docker.com/v2/users/{user}/", "json", "dev"), + _Service("PyPI", "https://pypi.org/user/{user}/", "status", "dev"), + _Service("npm", "https://www.npmjs.com/~{user}", "status", "dev"), + _Service( + "Keybase", + "https://keybase.io/_/api/1.0/user/lookup.json?usernames={user}", + "json", "dev", + ), + _Service("Dev.to", "https://dev.to/api/users/by_username?url={user}", "json", "dev"), + _Service("HackerOne", "https://hackerone.com/{user}", "status", "dev"), + # -- social -- + _Service("Reddit", "https://www.reddit.com/user/{user}/about.json", "json", "social"), + _Service("Twitter/X", "https://nitter.net/{user}", "status", "social"), + _Service("Instagram", "https://www.instagram.com/{user}/", "status", "social"), + _Service("TikTok", "https://www.tiktok.com/@{user}", "status", "social"), + _Service("Pinterest", "https://www.pinterest.com/{user}/", "status", "social"), + _Service("Telegram", "https://t.me/{user}", "body", "social"), + _Service("LinkedIn", "https://www.linkedin.com/in/{user}", "status", "social"), + _Service("Medium", "https://medium.com/@{user}", "status", "social"), + # -- media -- + _Service("Twitch", "https://www.twitch.tv/{user}", "status", "media"), + _Service("Spotify", "https://open.spotify.com/user/{user}", "status", "media"), + _Service("SoundCloud", "https://soundcloud.com/{user}", "status", "media"), + _Service("YouTube", "https://www.youtube.com/@{user}", "status", "media"), + # -- other -- + _Service("Steam", "https://steamcommunity.com/id/{user}", "status", "other"), + _Service("Pastebin", "https://pastebin.com/u/{user}", "status", "other"), + _Service("Gravatar", "https://en.gravatar.com/{user}.json", "json", "other"), + _Service("About.me", "https://about.me/{user}", "status", "other"), +) + +# Lookup tables built once at import time. +_BY_NAME: dict[str, _Service] = {s.name.lower(): s for s in _SERVICES} +_BY_CATEGORY: dict[str, list[_Service]] = {} +for _s in _SERVICES: + _BY_CATEGORY.setdefault(_s.category, []).append(_s) + + +# -- HTTP helper (blocking, runs in thread pool) ----------------------------- + +def _http_get(url: str, timeout: int = _TIMEOUT) -> tuple[int, str]: + """Blocking GET. Returns (status_code, body). (0, "") on error.""" + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT}) + try: + with urllib.request.urlopen(req, timeout=timeout, context=ctx) as resp: + body = resp.read().decode("utf-8", errors="replace") + return resp.status, body + except urllib.error.HTTPError as exc: + return exc.code, "" + except Exception: + return 0, "" + + +# -- result classification --------------------------------------------------- + +def classify_status(status: int) -> str: + """Classify HTTP status code into found/not_found/error.""" + if status == 200: + return "found" + if status == 404: + return "not_found" + if status == 0: + return "error" + # 301/302 redirects to login/homepage typically mean not found, + # but some services redirect to the profile (found). Treat + # ambiguous codes as errors to avoid false positives. + return "error" + + +def classify_json(svc: _Service, status: int, body: str) -> str: + """Classify JSON API response with service-specific validation.""" + if status == 404: + return "not_found" + if status == 0 or not body: + return "error" + + try: + data = json.loads(body) + except (json.JSONDecodeError, ValueError): + return "error" + + name = svc.name + + if name == "GitHub": + # 200 with a login field means the user exists. + return "found" if data.get("login") else "not_found" + + if name == "GitLab": + # Returns an array; empty array means not found. + if isinstance(data, list): + return "found" if len(data) > 0 else "not_found" + return "error" + + if name == "Docker Hub": + return "found" if data.get("id") else "not_found" + + if name == "Keybase": + them = data.get("them", []) + if isinstance(them, list) and len(them) > 0: + return "found" + return "not_found" + + if name == "Dev.to": + return "found" if data.get("username") else "not_found" + + if name == "Reddit": + # Reddit returns {"error": 404} for missing users. + if data.get("error"): + return "not_found" + kind = data.get("kind", "") + return "found" if kind == "t2" else "not_found" + + if name == "Gravatar": + return "found" if data.get("entry") else "not_found" + + # Fallback: if we got 200 with parseable JSON, assume found. + return "found" if status == 200 else "error" + + +def classify_body(status: int, body: str) -> str: + """Classify body-based response (Telegram).""" + if status == 0: + return "error" + # Telegram t.me pages show "If you have Telegram" + # when the user does NOT exist (preview page). + if "tgme_page_extra" in body: + return "found" + return "not_found" + + +def classify(svc: _Service, status: int, body: str) -> str: + """Route to the correct classifier based on service method.""" + if svc.method == "json": + return classify_json(svc, status, body) + if svc.method == "body": + return classify_body(status, body) + return classify_status(status) + + +# -- async check ------------------------------------------------------------- + +async def _check(svc: _Service, user: str) -> tuple[str, str]: + """Check a single service. Returns (service_name, result).""" + url = svc.url.format(user=user) + loop = asyncio.get_running_loop() + try: + status, body = await asyncio.wait_for( + loop.run_in_executor(_pool, _http_get, url), + timeout=float(_TIMEOUT), + ) + except TimeoutError: + return svc.name, "error" + except Exception: + return svc.name, "error" + + return svc.name, classify(svc, status, body) + + +# -- formatting --------------------------------------------------------------- + +def _profile_url(svc: _Service, user: str) -> str: + """Build a human-friendly profile URL for display.""" + url = svc.url.format(user=user) + # For API endpoints, return a browser-friendly URL instead. + name = svc.name + if name == "GitHub": + return f"https://github.com/{user}" + if name == "GitLab": + return f"https://gitlab.com/{user}" + if name == "Docker Hub": + return f"https://hub.docker.com/u/{user}" + if name == "Keybase": + return f"https://keybase.io/{user}" + if name == "Dev.to": + return f"https://dev.to/{user}" + if name == "Reddit": + return f"https://www.reddit.com/user/{user}" + if name == "Gravatar": + return f"https://en.gravatar.com/{user}" + # For status/body probes, the URL itself is the profile page. + return url + + +def format_summary(user: str, results: list[tuple[str, str]]) -> list[str]: + """Format full-scan results into IRC output lines.""" + found = [name for name, r in results if r == "found"] + not_found = [name for name, r in results if r == "not_found"] + errors = [name for name, r in results if r == "error"] + + lines = [ + f'{user} -- {len(found)} found, {len(not_found)} not found, ' + f'{len(errors)} errors', + ] + if found: + lines.append(f"Found: {', '.join(found)}") + return lines + + +def format_single(svc: _Service, user: str, result: str) -> str: + """Format a single-service check result.""" + if result == "found": + url = _profile_url(svc, user) + return f"{svc.name}: {user} -> found | {url}" + return f"{svc.name}: {user} -> {result}" + + +def format_list() -> list[str]: + """Format the service list grouped by category.""" + lines = [] + for cat in ("dev", "social", "media", "other"): + services = _BY_CATEGORY.get(cat, []) + names = ", ".join(s.name for s in services) + lines.append(f"{cat.title()}: {names}") + return lines + + +# -- command handler ---------------------------------------------------------- + +@command( + "username", + help="Username OSINT: !username [service] | !username list", +) +async def cmd_username(bot, message): + """Check username availability across multiple services.""" + parts = message.text.split() + + if len(parts) < 2: + await bot.reply( + message, + "Usage: !username | !username | !username list", + ) + return + + subcmd = parts[1] + + # -- list subcommand -- + if subcmd.lower() == "list": + for line in format_list(): + await bot.reply(message, line) + return + + user = subcmd + + # -- input validation -- + if not _USERNAME_RE.match(user): + await bot.reply(message, "Invalid username (alphanumeric, . _ - , max 39 chars)") + return + + # -- single service check -- + if len(parts) >= 3: + svc_name = parts[2].lower() + svc = _BY_NAME.get(svc_name) + if svc is None: + # Try case-insensitive partial match. + matches = [s for key, s in _BY_NAME.items() if svc_name in key] + if len(matches) == 1: + svc = matches[0] + else: + available = ", ".join(s.name for s in _SERVICES) + await bot.reply(message, f"Unknown service '{parts[2]}' -- available: {available}") + return + + name, result = await _check(svc, user) + await bot.reply(message, format_single(svc, user, result)) + return + + # -- full scan -- + total = len(_SERVICES) + await bot.reply(message, f'Checking "{user}" across {total} services...') + + try: + tasks = [_check(svc, user) for svc in _SERVICES] + results: list[tuple[str, str]] = await asyncio.wait_for( + asyncio.gather(*tasks), + timeout=_OVERALL_TIMEOUT, + ) + except TimeoutError: + await bot.reply(message, f"{user} -- scan timed out") + return + + for line in format_summary(user, results): + await bot.reply(message, line)