diff --git a/app/api/routes.py b/app/api/routes.py index fed674e..0706280 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -8,10 +8,12 @@ import json import math import re import secrets +import string import threading import time from collections import defaultdict from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse from flask import Response, current_app, g, request from flask.views import MethodView @@ -27,6 +29,9 @@ from app.metrics import ( record_paste_deleted, record_pow, record_rate_limit, + record_url_accessed, + record_url_created, + record_url_deleted, ) if TYPE_CHECKING: @@ -36,6 +41,9 @@ if TYPE_CHECKING: PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$") CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$") MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$") +SHORT_ID_PATTERN = re.compile(r"^[a-zA-Z0-9]+$") +SHORT_ID_ALPHABET = string.ascii_letters + string.digits +ALLOWED_URL_SCHEMES = frozenset({"http", "https"}) # NOTE: Magic byte detection commented out - using text/binary detection only. # Security headers (X-Content-Type-Options: nosniff, CSP) prevent MIME confusion. 
def short_url_path(short_id: str) -> str:
    """Generate path for short URL redirect endpoint."""
    return prefixed_url(f"/s/{short_id}")


def short_url_info_path(short_id: str) -> str:
    """Generate path for short URL info endpoint."""
    return prefixed_url(f"/s/{short_id}/info")


def generate_short_id() -> str:
    """Generate a random short ID.

    Draws ``SHORT_ID_LENGTH`` (app config) characters from
    ``SHORT_ID_ALPHABET`` using the ``secrets`` module.
    """
    length = current_app.config["SHORT_ID_LENGTH"]
    return "".join(secrets.choice(SHORT_ID_ALPHABET) for _ in range(length))


def validate_short_id(short_id: str) -> Response | None:
    """Validate short URL ID format.

    The ID must be exactly ``SHORT_ID_LENGTH`` characters long and consist
    solely of characters accepted by ``SHORT_ID_PATTERN``.

    Returns:
        A 400 error response, or None if the ID is well-formed.
    """
    expected_length = current_app.config["SHORT_ID_LENGTH"]
    # fullmatch() rather than match(): the pattern ends in "$", which with
    # match() also succeeds just before a trailing "\n", so an ID made of
    # (length - 1) valid characters plus a newline would slip through.
    if len(short_id) != expected_length or not SHORT_ID_PATTERN.fullmatch(short_id):
        return error_response("Invalid short URL ID", 400)
    return None


def validate_target_url(url: str) -> Response | None:
    """Validate target URL for shortening.

    Checks, in order: maximum length (``SHORT_URL_MAX_LENGTH``), absence of
    ASCII control characters, parseability, an allowed scheme
    (``ALLOWED_URL_SCHEMES``), and the presence of a host.

    Returns:
        A 400 error response describing the first failed check, or None if
        the URL is acceptable.
    """
    max_length = current_app.config["SHORT_URL_MAX_LENGTH"]
    if len(url) > max_length:
        return error_response(
            "URL too long", 400, max_length=max_length, length=len(url)
        )

    # urlparse() silently drops tab/CR/LF (and leading control characters)
    # before parsing, so such a URL would validate cleanly while the raw
    # string -- control characters included -- is what gets stored and later
    # written into the Location header. Reject them up front.
    if any(ord(ch) < 0x20 or ord(ch) == 0x7F for ch in url):
        return error_response("Invalid URL: contains control characters", 400)

    try:
        parsed = urlparse(url)
        hostname = parsed.hostname
    except ValueError:
        # Raised for malformed authorities, e.g. an unbalanced "[" in an
        # IPv6 literal; this is bad client input, not a server error.
        return error_response("Invalid URL", 400)

    if parsed.scheme not in ALLOWED_URL_SCHEMES:
        # sorted() keeps the reported list stable; iteration order of a
        # frozenset of strings is not.
        return error_response(
            "Invalid URL scheme", 400, allowed=sorted(ALLOWED_URL_SCHEMES)
        )
    # hostname, not netloc: netloc is non-empty for host-less authorities
    # such as "http://:80/" or "http://user@/".
    if not hostname:
        return error_response("Invalid URL: missing host", 400)

    return None


def fetch_short_url(short_id: str, increment_counter: bool = True) -> Response | None:
    """Fetch short URL and store in g.short_url.

    Enforces the lookup rate limit for the client IP, stamps the row's
    ``last_accessed`` (additionally bumping ``access_count`` when
    *increment_counter* is true), then loads the row. A row whose
    ``expires_at`` lies in the past is deleted and reported as expired.

    Args:
        short_id: ID of the short URL to load.
        increment_counter: Whether this lookup counts as an access.

    Returns:
        An error response (429 when rate limited, 404 when the ID is unknown
        or expired), or None if OK -- in which case the row is available as
        ``g.short_url``.
    """
    client_ip = get_client_ip()
    allowed, retry_after = check_lookup_rate_limit(client_ip)
    if not allowed:
        response = error_response(
            f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
            429,
            retry_after=retry_after,
        )
        response.headers["Retry-After"] = str(retry_after)
        return response

    db = get_db()
    now = int(time.time())

    # The UPDATE runs before the SELECT so the returned row already reflects
    # this access; it is a no-op when the ID does not exist.
    if increment_counter:
        db.execute(
            "UPDATE short_urls SET last_accessed = ?, access_count = access_count + 1 WHERE id = ?",
            (now, short_id),
        )
    else:
        db.execute(
            "UPDATE short_urls SET last_accessed = ? WHERE id = ?",
            (now, short_id),
        )

    row = db.execute(
        """SELECT id, target_url, owner, created_at, last_accessed,
                  access_count, expires_at
           FROM short_urls WHERE id = ?""",
        (short_id,),
    ).fetchone()

    if row is None:
        db.commit()
        return error_response("Short URL not found", 404)

    # Check expiry; a falsy expires_at means the URL never expires.
    if row["expires_at"] and row["expires_at"] < now:
        db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,))
        db.commit()
        return error_response("Short URL expired", 404)

    db.commit()
    g.short_url = row
    return None
# NOTE: the same change adds five entries to IndexView's endpoint listing.
# The enclosing dict literal is outside this excerpt, so they are recorded
# here as a comment only (the ID placeholder was lost in extraction and is
# shown as <id>):
#   f"POST {prefixed_url('/s')}": "Create short URL (PoW required)",
#   f"GET {prefixed_url('/s')}": "List your short URLs (cert required)",
#   f"GET {prefixed_url('/s/<id>')}": "Redirect to target URL",
#   f"GET {prefixed_url('/s/<id>/info')}": "Short URL metadata",
#   f"DELETE {prefixed_url('/s/<id>')}": "Delete short URL (owner only)",


# ─────────────────────────────────────────────────────────────────────────────
# URL Shortener Views
# ─────────────────────────────────────────────────────────────────────────────


class ShortURLCreateView(MethodView):
    """Create short URLs."""

    def post(self) -> Response:
        """Create a new short URL.

        The target URL is read from the ``url`` field of a JSON object body,
        or from the raw request body otherwise. The request then passes
        through URL validation, the per-IP rate limit, proof-of-work (when
        ``POW_DIFFICULTY`` > 0) and the duplicate-URL limiter before the row
        is inserted. An optional ``X-Expiry`` header (seconds) sets an
        expiry, capped by ``MAX_EXPIRY_SECONDS`` when that is positive.

        Returns:
            201 with the new short URL's metadata, or a 400/429 error
            response from whichever check failed.
        """
        # Parse URL from request body
        target_url: str | None = None

        if request.is_json:
            data = request.get_json(silent=True)
            # Only a JSON object can carry "url". Checking the type first
            # keeps a list/string/number body from raising AttributeError
            # on .get() and surfacing as a 500.
            if isinstance(data, dict) and isinstance(data.get("url"), str):
                target_url = data["url"].strip()
        else:
            raw = request.get_data(as_text=True).strip()
            if raw:
                target_url = raw

        if not target_url:
            return error_response("No URL provided", 400)

        # Validate URL
        if err := validate_target_url(target_url):
            return err

        # Auth and rate limiting
        trusted_client = get_client_id()
        owner = get_client_fingerprint()
        client_ip = get_client_ip()

        allowed, remaining, limit, reset_timestamp = check_rate_limit(
            client_ip, authenticated=bool(trusted_client)
        )
        if not allowed:
            record_rate_limit("blocked")
            # Never advertise a zero/negative wait, even if the window has
            # just rolled over.
            retry_after = max(1, reset_timestamp - int(time.time()))
            response = error_response("Rate limit exceeded", 429, retry_after=retry_after)
            response.headers["Retry-After"] = str(retry_after)
            add_rate_limit_headers(response, 0, limit, reset_timestamp)
            return response

        # Proof-of-work
        difficulty = current_app.config["POW_DIFFICULTY"]
        if difficulty > 0:
            token = request.headers.get("X-PoW-Token", "")
            solution = request.headers.get("X-PoW-Solution", "")

            if not token or not solution:
                return error_response(
                    "Proof-of-work required", 400, hint="GET /challenge for a new challenge"
                )

            valid, err_msg = verify_pow(token, solution)
            if not valid:
                record_pow("failure")
                return error_response(f"Proof-of-work failed: {err_msg}", 400)
            record_pow("success")

        # Dedup check (same URL within window)
        url_hash = hashlib.sha256(target_url.encode("utf-8")).hexdigest()
        is_allowed, dedup_count = check_content_hash(url_hash)
        if not is_allowed:
            record_dedup("blocked")
            window = current_app.config["CONTENT_DEDUP_WINDOW"]
            return error_response(
                "Duplicate URL rate limit exceeded",
                429,
                count=dedup_count,
                window_seconds=window,
            )
        record_dedup("allowed")

        # Parse optional expiry
        expires_at = None
        expiry_header = request.headers.get("X-Expiry", "").strip()
        if expiry_header:
            try:
                expiry_seconds = int(expiry_header)
            except ValueError:
                # Deliberately lenient: an unparsable X-Expiry is treated
                # like an absent one (no expiry) rather than rejected.
                expiry_seconds = 0
            if expiry_seconds > 0:
                max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
                if max_expiry > 0:
                    expiry_seconds = min(expiry_seconds, max_expiry)
                expires_at = int(time.time()) + expiry_seconds

        # Generate short ID and insert
        short_id = generate_short_id()
        now = int(time.time())

        # NOTE(review): a randomly generated ID that collides with an
        # existing row would presumably fail this INSERT with an integrity
        # error -- confirm against the short_urls schema and consider a
        # bounded retry.
        db = get_db()
        db.execute(
            """INSERT INTO short_urls
               (id, target_url, url_hash, owner, created_at, last_accessed, expires_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (short_id, target_url, url_hash, owner, now, now, expires_at),
        )
        db.commit()

        record_antiflood_request()

        log_event(
            AuditEvent.URL_CREATE,
            AuditOutcome.SUCCESS,
            client_id=owner,
            client_ip=client_ip,
            # Truncated so an oversized target cannot bloat the audit log.
            details={"short_id": short_id, "target": target_url[:128]},
        )

        record_url_created("authenticated" if owner else "anonymous", "success")

        response_data: dict[str, Any] = {
            "id": short_id,
            "url": short_url_path(short_id),
            "target_url": target_url,
            "created_at": now,
        }
        if owner:
            response_data["owner"] = owner
        if expires_at:
            response_data["expires_at"] = expires_at

        response = json_response(response_data, 201)
        add_rate_limit_headers(response, remaining, limit, reset_timestamp)
        return response


class ShortURLRedirectView(MethodView):
    """Redirect short URLs to their targets."""

    def get(self, short_id: str) -> Response:
        """302 redirect to target URL.

        Validates the ID, loads the row (counting the access), records the
        access in the audit log and metrics, and redirects.
        """
        if err := validate_short_id(short_id):
            return err
        if err := fetch_short_url(short_id, increment_counter=True):
            return err

        row = g.short_url

        log_event(
            AuditEvent.URL_ACCESS,
            AuditOutcome.SUCCESS,
            client_ip=get_client_ip(),
            details={"short_id": short_id},
        )
        record_url_accessed("authenticated" if get_client_fingerprint() else "anonymous")

        response = Response(status=302)
        response.headers["Location"] = row["target_url"]
        response.headers["Cache-Control"] = "no-cache"
        return response

    def head(self, short_id: str) -> Response:
        """HEAD redirect to target URL (handled exactly like GET)."""
        return self.get(short_id)


class ShortURLInfoView(MethodView):
    """Short URL metadata."""

    def get(self, short_id: str) -> Response:
        """Return short URL metadata without incrementing counter.

        ``owner`` and ``expires_at`` are included only when set.
        """
        if err := validate_short_id(short_id):
            return err
        if err := fetch_short_url(short_id, increment_counter=False):
            return err

        row = g.short_url

        data: dict[str, Any] = {
            "id": row["id"],
            "target_url": row["target_url"],
            "created_at": row["created_at"],
            "last_accessed": row["last_accessed"],
            "access_count": row["access_count"],
            "url": short_url_path(short_id),
        }
        if row["owner"]:
            data["owner"] = row["owner"]
        if row["expires_at"]:
            data["expires_at"] = row["expires_at"]

        return json_response(data)


class ShortURLDeleteView(MethodView):
    """Delete short URLs."""

    def delete(self, short_id: str) -> Response:
        """Delete a short URL. Requires ownership or admin.

        Returns:
            A confirmation message, or an error response: 400 for a
            malformed ID, whatever ``require_auth`` returns when
            authentication fails, 404 when the ID is unknown, 403 when the
            caller is neither the owner nor an admin.
        """
        if err := validate_short_id(short_id):
            return err
        if err := require_auth():
            return err

        db = get_db()
        row = db.execute("SELECT owner FROM short_urls WHERE id = ?", (short_id,)).fetchone()

        if row is None:
            return error_response("Short URL not found", 404)

        if row["owner"] != g.client_id and not is_admin():
            return error_response("Permission denied", 403)

        db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,))
        db.commit()

        log_event(
            AuditEvent.URL_DELETE,
            AuditOutcome.SUCCESS,
            client_id=g.client_id,
            client_ip=get_client_ip(),
            details={"short_id": short_id},
        )
        record_url_deleted("authenticated", "success")

        return json_response({"message": "Short URL deleted"})


class ShortURLsListView(MethodView):
    """List short URLs owned by authenticated user."""

    def get(self) -> Response:
        """List owned short URLs with pagination.

        Query parameters:
            limit: Page size, clamped to 0..200 (default 50).
            offset: Rows to skip, clamped to >= 0 (default 0).

        If either parameter is not an integer, both fall back to their
        defaults.
        """
        if err := require_auth():
            return err

        client_id = g.client_id

        try:
            # Clamp from below as well: SQLite treats a negative LIMIT as
            # "no limit", which would let ?limit=-1 bypass the 200-row cap.
            limit = min(max(int(request.args.get("limit", 50)), 0), 200)
            offset = max(int(request.args.get("offset", 0)), 0)
        except (ValueError, TypeError):
            limit, offset = 50, 0

        db = get_db()

        count_row = db.execute(
            "SELECT COUNT(*) as total FROM short_urls WHERE owner = ?",
            (client_id,),
        ).fetchone()
        total = count_row["total"] if count_row else 0

        rows = db.execute(
            """SELECT id, target_url, created_at, last_accessed, access_count, expires_at
               FROM short_urls
               WHERE owner = ?
               ORDER BY created_at DESC
               LIMIT ? OFFSET ?""",
            (client_id, limit, offset),
        ).fetchall()

        urls = []
        for row in rows:
            entry: dict[str, Any] = {
                "id": row["id"],
                "target_url": row["target_url"],
                "created_at": row["created_at"],
                # Selected by the query and reported by the info endpoint;
                # included here so both views expose the same fields.
                "last_accessed": row["last_accessed"],
                "access_count": row["access_count"],
                "url": short_url_path(row["id"]),
            }
            if row["expires_at"]:
                entry["expires_at"] = row["expires_at"]
            urls.append(entry)

        return json_response({
            "urls": urls,
            "count": len(urls),
            "total": total,
            "limit": limit,
            "offset": offset,
        })


# URL shortener endpoints
# The redirect/info/delete views all take a ``short_id`` argument, so their
# rules must declare the matching <short_id> URL variable.
bp.add_url_rule("/s", view_func=ShortURLCreateView.as_view("short_url_create"), methods=["POST"])
bp.add_url_rule("/s", view_func=ShortURLsListView.as_view("short_urls_list"), methods=["GET"])
bp.add_url_rule(
    "/s/<short_id>",
    view_func=ShortURLRedirectView.as_view("short_url_redirect"),
    methods=["GET", "HEAD"],
)
bp.add_url_rule("/s/<short_id>/info", view_func=ShortURLInfoView.as_view("short_url_info"))
bp.add_url_rule(
    "/s/<short_id>",
    view_func=ShortURLDeleteView.as_view("short_url_delete"),
    methods=["DELETE"],
)