Add PySocks dependency and shared src/derp/http.py module providing proxy-aware urlopen() and build_opener() that route through socks5h://127.0.0.1:1080. Subclassed SocksiPyHandler passes SSL context through to HTTPS connections. Swapped 14 external-facing plugins to use the proxied helpers. Local-only traffic (SearXNG, raw DNS/TLS sockets) stays direct. Updated test mocks in test_twitch and test_alert accordingly.
336 lines
11 KiB
Python
336 lines
11 KiB
Python
"""Plugin: cross-platform username enumeration.
|
|
|
|
Check username availability across ~25 services using HTTP probes
|
|
and public JSON APIs. Hybrid approach -- official APIs (GitHub,
|
|
GitLab, Docker Hub, Keybase, Dev.to, Reddit) where available,
|
|
HTTP status probes for the rest.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import re
|
|
import ssl
|
|
import urllib.request
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from dataclasses import dataclass
|
|
|
|
from derp.http import urlopen as _urlopen
|
|
from derp.plugin import command
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
_USERNAME_RE = re.compile(r"^[a-zA-Z0-9._-]{1,39}$")
|
|
_USER_AGENT = "derp/1.0"
|
|
_TIMEOUT = 8
|
|
_OVERALL_TIMEOUT = 20.0
|
|
|
|
_pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="username")
|
|
|
|
|
|
# -- service definition ------------------------------------------------------
|
|
|
|
@dataclass(slots=True, frozen=True)
|
|
class _Service:
|
|
"""A single service to probe for username existence."""
|
|
|
|
name: str # Display name ("GitHub")
|
|
url: str # URL template with {user} placeholder
|
|
method: str # "status" | "json" | "body"
|
|
category: str # "dev" | "social" | "media" | "other"
|
|
|
|
|
|
_SERVICES: tuple[_Service, ...] = (
|
|
# -- dev --
|
|
_Service("GitHub", "https://api.github.com/users/{user}", "json", "dev"),
|
|
_Service("GitLab", "https://gitlab.com/api/v4/users?username={user}", "json", "dev"),
|
|
_Service("Codeberg", "https://codeberg.org/{user}", "status", "dev"),
|
|
_Service("Docker Hub", "https://hub.docker.com/v2/users/{user}/", "json", "dev"),
|
|
_Service("PyPI", "https://pypi.org/user/{user}/", "status", "dev"),
|
|
_Service("npm", "https://www.npmjs.com/~{user}", "status", "dev"),
|
|
_Service(
|
|
"Keybase",
|
|
"https://keybase.io/_/api/1.0/user/lookup.json?usernames={user}",
|
|
"json", "dev",
|
|
),
|
|
_Service("Dev.to", "https://dev.to/api/users/by_username?url={user}", "json", "dev"),
|
|
_Service("HackerOne", "https://hackerone.com/{user}", "status", "dev"),
|
|
# -- social --
|
|
_Service("Reddit", "https://www.reddit.com/user/{user}/about.json", "json", "social"),
|
|
_Service("Twitter/X", "https://nitter.net/{user}", "status", "social"),
|
|
_Service("Instagram", "https://www.instagram.com/{user}/", "status", "social"),
|
|
_Service("TikTok", "https://www.tiktok.com/@{user}", "status", "social"),
|
|
_Service("Pinterest", "https://www.pinterest.com/{user}/", "status", "social"),
|
|
_Service("Telegram", "https://t.me/{user}", "body", "social"),
|
|
_Service("LinkedIn", "https://www.linkedin.com/in/{user}", "status", "social"),
|
|
_Service("Medium", "https://medium.com/@{user}", "status", "social"),
|
|
# -- media --
|
|
_Service("Twitch", "https://www.twitch.tv/{user}", "status", "media"),
|
|
_Service("Spotify", "https://open.spotify.com/user/{user}", "status", "media"),
|
|
_Service("SoundCloud", "https://soundcloud.com/{user}", "status", "media"),
|
|
_Service("YouTube", "https://www.youtube.com/@{user}", "status", "media"),
|
|
# -- other --
|
|
_Service("Steam", "https://steamcommunity.com/id/{user}", "status", "other"),
|
|
_Service("Pastebin", "https://pastebin.com/u/{user}", "status", "other"),
|
|
_Service("Gravatar", "https://en.gravatar.com/{user}.json", "json", "other"),
|
|
_Service("About.me", "https://about.me/{user}", "status", "other"),
|
|
)
|
|
|
|
# Lookup tables built once at import time.
|
|
_BY_NAME: dict[str, _Service] = {s.name.lower(): s for s in _SERVICES}
|
|
_BY_CATEGORY: dict[str, list[_Service]] = {}
|
|
for _s in _SERVICES:
|
|
_BY_CATEGORY.setdefault(_s.category, []).append(_s)
|
|
|
|
|
|
# -- HTTP helper (blocking, runs in thread pool) -----------------------------
|
|
|
|
def _http_get(url: str, timeout: int = _TIMEOUT) -> tuple[int, str]:
|
|
"""Blocking GET. Returns (status_code, body). (0, "") on error."""
|
|
ctx = ssl.create_default_context()
|
|
ctx.check_hostname = False
|
|
ctx.verify_mode = ssl.CERT_NONE
|
|
|
|
req = urllib.request.Request(url, headers={"User-Agent": _USER_AGENT})
|
|
try:
|
|
with _urlopen(req, timeout=timeout, context=ctx) as resp:
|
|
body = resp.read().decode("utf-8", errors="replace")
|
|
return resp.status, body
|
|
except urllib.error.HTTPError as exc:
|
|
return exc.code, ""
|
|
except Exception:
|
|
return 0, ""
|
|
|
|
|
|
# -- result classification ---------------------------------------------------
|
|
|
|
def classify_status(status: int) -> str:
|
|
"""Classify HTTP status code into found/not_found/error."""
|
|
if status == 200:
|
|
return "found"
|
|
if status == 404:
|
|
return "not_found"
|
|
if status == 0:
|
|
return "error"
|
|
# 301/302 redirects to login/homepage typically mean not found,
|
|
# but some services redirect to the profile (found). Treat
|
|
# ambiguous codes as errors to avoid false positives.
|
|
return "error"
|
|
|
|
|
|
def classify_json(svc: _Service, status: int, body: str) -> str:
|
|
"""Classify JSON API response with service-specific validation."""
|
|
if status == 404:
|
|
return "not_found"
|
|
if status == 0 or not body:
|
|
return "error"
|
|
|
|
try:
|
|
data = json.loads(body)
|
|
except (json.JSONDecodeError, ValueError):
|
|
return "error"
|
|
|
|
name = svc.name
|
|
|
|
if name == "GitHub":
|
|
# 200 with a login field means the user exists.
|
|
return "found" if data.get("login") else "not_found"
|
|
|
|
if name == "GitLab":
|
|
# Returns an array; empty array means not found.
|
|
if isinstance(data, list):
|
|
return "found" if len(data) > 0 else "not_found"
|
|
return "error"
|
|
|
|
if name == "Docker Hub":
|
|
return "found" if data.get("id") else "not_found"
|
|
|
|
if name == "Keybase":
|
|
them = data.get("them", [])
|
|
if isinstance(them, list) and len(them) > 0:
|
|
return "found"
|
|
return "not_found"
|
|
|
|
if name == "Dev.to":
|
|
return "found" if data.get("username") else "not_found"
|
|
|
|
if name == "Reddit":
|
|
# Reddit returns {"error": 404} for missing users.
|
|
if data.get("error"):
|
|
return "not_found"
|
|
kind = data.get("kind", "")
|
|
return "found" if kind == "t2" else "not_found"
|
|
|
|
if name == "Gravatar":
|
|
return "found" if data.get("entry") else "not_found"
|
|
|
|
# Fallback: if we got 200 with parseable JSON, assume found.
|
|
return "found" if status == 200 else "error"
|
|
|
|
|
|
def classify_body(status: int, body: str) -> str:
|
|
"""Classify body-based response (Telegram)."""
|
|
if status == 0:
|
|
return "error"
|
|
# Telegram t.me pages show "If you have <strong>Telegram</strong>"
|
|
# when the user does NOT exist (preview page).
|
|
if "tgme_page_extra" in body:
|
|
return "found"
|
|
return "not_found"
|
|
|
|
|
|
def classify(svc: _Service, status: int, body: str) -> str:
|
|
"""Route to the correct classifier based on service method."""
|
|
if svc.method == "json":
|
|
return classify_json(svc, status, body)
|
|
if svc.method == "body":
|
|
return classify_body(status, body)
|
|
return classify_status(status)
|
|
|
|
|
|
# -- async check -------------------------------------------------------------
|
|
|
|
async def _check(svc: _Service, user: str) -> tuple[str, str]:
|
|
"""Check a single service. Returns (service_name, result)."""
|
|
url = svc.url.format(user=user)
|
|
loop = asyncio.get_running_loop()
|
|
try:
|
|
status, body = await asyncio.wait_for(
|
|
loop.run_in_executor(_pool, _http_get, url),
|
|
timeout=float(_TIMEOUT),
|
|
)
|
|
except TimeoutError:
|
|
return svc.name, "error"
|
|
except Exception:
|
|
return svc.name, "error"
|
|
|
|
return svc.name, classify(svc, status, body)
|
|
|
|
|
|
# -- formatting ---------------------------------------------------------------
|
|
|
|
def _profile_url(svc: _Service, user: str) -> str:
|
|
"""Build a human-friendly profile URL for display."""
|
|
url = svc.url.format(user=user)
|
|
# For API endpoints, return a browser-friendly URL instead.
|
|
name = svc.name
|
|
if name == "GitHub":
|
|
return f"https://github.com/{user}"
|
|
if name == "GitLab":
|
|
return f"https://gitlab.com/{user}"
|
|
if name == "Docker Hub":
|
|
return f"https://hub.docker.com/u/{user}"
|
|
if name == "Keybase":
|
|
return f"https://keybase.io/{user}"
|
|
if name == "Dev.to":
|
|
return f"https://dev.to/{user}"
|
|
if name == "Reddit":
|
|
return f"https://www.reddit.com/user/{user}"
|
|
if name == "Gravatar":
|
|
return f"https://en.gravatar.com/{user}"
|
|
# For status/body probes, the URL itself is the profile page.
|
|
return url
|
|
|
|
|
|
def format_summary(user: str, results: list[tuple[str, str]]) -> list[str]:
|
|
"""Format full-scan results into IRC output lines."""
|
|
found = [name for name, r in results if r == "found"]
|
|
not_found = [name for name, r in results if r == "not_found"]
|
|
errors = [name for name, r in results if r == "error"]
|
|
|
|
lines = [
|
|
f'{user} -- {len(found)} found, {len(not_found)} not found, '
|
|
f'{len(errors)} errors',
|
|
]
|
|
if found:
|
|
lines.append(f"Found: {', '.join(found)}")
|
|
return lines
|
|
|
|
|
|
def format_single(svc: _Service, user: str, result: str) -> str:
|
|
"""Format a single-service check result."""
|
|
if result == "found":
|
|
url = _profile_url(svc, user)
|
|
return f"{svc.name}: {user} -> found | {url}"
|
|
return f"{svc.name}: {user} -> {result}"
|
|
|
|
|
|
def format_list() -> list[str]:
|
|
"""Format the service list grouped by category."""
|
|
lines = []
|
|
for cat in ("dev", "social", "media", "other"):
|
|
services = _BY_CATEGORY.get(cat, [])
|
|
names = ", ".join(s.name for s in services)
|
|
lines.append(f"{cat.title()}: {names}")
|
|
return lines
|
|
|
|
|
|
# -- command handler ----------------------------------------------------------
|
|
|
|
@command(
|
|
"username",
|
|
help="Username OSINT: !username <user> [service] | !username list",
|
|
)
|
|
async def cmd_username(bot, message):
|
|
"""Check username availability across multiple services."""
|
|
parts = message.text.split()
|
|
|
|
if len(parts) < 2:
|
|
await bot.reply(
|
|
message,
|
|
"Usage: !username <user> | !username <user> <service> | !username list",
|
|
)
|
|
return
|
|
|
|
subcmd = parts[1]
|
|
|
|
# -- list subcommand --
|
|
if subcmd.lower() == "list":
|
|
for line in format_list():
|
|
await bot.reply(message, line)
|
|
return
|
|
|
|
user = subcmd
|
|
|
|
# -- input validation --
|
|
if not _USERNAME_RE.match(user):
|
|
await bot.reply(message, "Invalid username (alphanumeric, . _ - , max 39 chars)")
|
|
return
|
|
|
|
# -- single service check --
|
|
if len(parts) >= 3:
|
|
svc_name = parts[2].lower()
|
|
svc = _BY_NAME.get(svc_name)
|
|
if svc is None:
|
|
# Try case-insensitive partial match.
|
|
matches = [s for key, s in _BY_NAME.items() if svc_name in key]
|
|
if len(matches) == 1:
|
|
svc = matches[0]
|
|
else:
|
|
available = ", ".join(s.name for s in _SERVICES)
|
|
await bot.reply(message, f"Unknown service '{parts[2]}' -- available: {available}")
|
|
return
|
|
|
|
name, result = await _check(svc, user)
|
|
await bot.reply(message, format_single(svc, user, result))
|
|
return
|
|
|
|
# -- full scan --
|
|
total = len(_SERVICES)
|
|
await bot.reply(message, f'Checking "{user}" across {total} services...')
|
|
|
|
try:
|
|
tasks = [_check(svc, user) for svc in _SERVICES]
|
|
results: list[tuple[str, str]] = await asyncio.wait_for(
|
|
asyncio.gather(*tasks),
|
|
timeout=_OVERALL_TIMEOUT,
|
|
)
|
|
except TimeoutError:
|
|
await bot.reply(message, f"{user} -- scan timed out")
|
|
return
|
|
|
|
for line in format_summary(user, results):
|
|
await bot.reply(message, line)
|