routes: add url shortener endpoints

This commit is contained in:
Username
2026-02-16 20:26:54 +01:00
parent 965a6eac0e
commit 727fc84784

View File

@@ -8,10 +8,12 @@ import json
import math
import re
import secrets
import string
import threading
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse
from flask import Response, current_app, g, request
from flask.views import MethodView
@@ -27,6 +29,9 @@ from app.metrics import (
record_paste_deleted,
record_pow,
record_rate_limit,
record_url_accessed,
record_url_created,
record_url_deleted,
)
if TYPE_CHECKING:
@@ -36,6 +41,9 @@ if TYPE_CHECKING:
PASTE_ID_PATTERN = re.compile(r"^[a-f0-9]+$")
CLIENT_ID_PATTERN = re.compile(r"^[a-f0-9]{40}$")
MIME_PATTERN = re.compile(r"^[a-z0-9][a-z0-9!#$&\-^_.+]*/[a-z0-9][a-z0-9!#$&\-^_.+]*$")
SHORT_ID_PATTERN = re.compile(r"^[a-zA-Z0-9]+$")
SHORT_ID_ALPHABET = string.ascii_letters + string.digits
ALLOWED_URL_SCHEMES = frozenset({"http", "https"})
# NOTE: Magic byte detection commented out - using text/binary detection only.
# Security headers (X-Content-Type-Options: nosniff, CSP) prevent MIME confusion.
@@ -406,6 +414,16 @@ def paste_raw_url(paste_id: str) -> str:
return prefixed_url(f"/{paste_id}/raw")
def short_url_path(short_id: str) -> str:
"""Generate path for short URL redirect endpoint."""
return prefixed_url(f"/s/{short_id}")
def short_url_info_path(short_id: str) -> str:
"""Generate path for short URL info endpoint."""
return prefixed_url(f"/s/{short_id}/info")
def base_url() -> str:
"""Detect full base URL from request headers."""
scheme = (
@@ -539,6 +557,88 @@ def fetch_paste(paste_id: str, check_password: bool = True) -> Response | None:
return None
def generate_short_id() -> str:
"""Generate a random base62 short ID."""
length = current_app.config["SHORT_ID_LENGTH"]
return "".join(secrets.choice(SHORT_ID_ALPHABET) for _ in range(length))
def validate_short_id(short_id: str) -> Response | None:
"""Validate short URL ID format. Returns error response or None if valid."""
expected_length = current_app.config["SHORT_ID_LENGTH"]
if len(short_id) != expected_length or not SHORT_ID_PATTERN.match(short_id):
return error_response("Invalid short URL ID", 400)
return None
def validate_target_url(url: str) -> Response | None:
"""Validate target URL for shortening. Returns error response or None if valid."""
max_length = current_app.config["SHORT_URL_MAX_LENGTH"]
if len(url) > max_length:
return error_response(
"URL too long", 400, max_length=max_length, length=len(url)
)
parsed = urlparse(url)
if parsed.scheme not in ALLOWED_URL_SCHEMES:
return error_response(
"Invalid URL scheme", 400, allowed=list(ALLOWED_URL_SCHEMES)
)
if not parsed.netloc:
return error_response("Invalid URL: missing host", 400)
return None
def fetch_short_url(short_id: str, increment_counter: bool = True) -> Response | None:
"""Fetch short URL and store in g.short_url. Returns error response or None if OK."""
client_ip = get_client_ip()
allowed, retry_after = check_lookup_rate_limit(client_ip)
if not allowed:
response = error_response(
f"Lookup rate limit exceeded. Retry after {retry_after} seconds.",
429,
retry_after=retry_after,
)
response.headers["Retry-After"] = str(retry_after)
return response
db = get_db()
now = int(time.time())
if increment_counter:
db.execute(
"UPDATE short_urls SET last_accessed = ?, access_count = access_count + 1 WHERE id = ?",
(now, short_id),
)
else:
db.execute(
"UPDATE short_urls SET last_accessed = ? WHERE id = ?",
(now, short_id),
)
row = db.execute(
"""SELECT id, target_url, owner, created_at, last_accessed,
access_count, expires_at
FROM short_urls WHERE id = ?""",
(short_id,),
).fetchone()
if row is None:
db.commit()
return error_response("Short URL not found", 404)
# Check expiry
if row["expires_at"] and row["expires_at"] < now:
db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,))
db.commit()
return error_response("Short URL expired", 404)
db.commit()
g.short_url = row
return None
def require_auth() -> Response | None:
"""Check authentication for ownership operations.
@@ -862,6 +962,11 @@ class IndexView(MethodView):
f"DELETE {prefixed_url('/<id>')}": "Delete paste (owner only)",
f"GET {prefixed_url('/register/challenge')}": "Get registration challenge",
f"POST {prefixed_url('/register')}": "Register for client certificate",
f"POST {prefixed_url('/s')}": "Create short URL (PoW required)",
f"GET {prefixed_url('/s')}": "List your short URLs (cert required)",
f"GET {prefixed_url('/s/<id>')}": "Redirect to target URL",
f"GET {prefixed_url('/s/<id>/info')}": "Short URL metadata",
f"DELETE {prefixed_url('/s/<id>')}": "Delete short URL (owner only)",
}
if pki_enabled:
@@ -1859,6 +1964,284 @@ class PastesListView(MethodView):
return json_response(response_data)
# ─────────────────────────────────────────────────────────────────────────────
# URL Shortener Views
# ─────────────────────────────────────────────────────────────────────────────
class ShortURLCreateView(MethodView):
"""Create short URLs."""
def post(self) -> Response:
"""Create a new short URL."""
# Parse URL from request body
target_url: str | None = None
if request.is_json:
data = request.get_json(silent=True)
if data and isinstance(data.get("url"), str):
target_url = data["url"].strip()
else:
raw = request.get_data(as_text=True).strip()
if raw:
target_url = raw
if not target_url:
return error_response("No URL provided", 400)
# Validate URL
if err := validate_target_url(target_url):
return err
# Auth and rate limiting
trusted_client = get_client_id()
owner = get_client_fingerprint()
client_ip = get_client_ip()
allowed, remaining, limit, reset_timestamp = check_rate_limit(
client_ip, authenticated=bool(trusted_client)
)
if not allowed:
record_rate_limit("blocked")
retry_after = max(1, reset_timestamp - int(time.time()))
response = error_response("Rate limit exceeded", 429, retry_after=retry_after)
response.headers["Retry-After"] = str(retry_after)
add_rate_limit_headers(response, 0, limit, reset_timestamp)
return response
# Proof-of-work
difficulty = current_app.config["POW_DIFFICULTY"]
if difficulty > 0:
token = request.headers.get("X-PoW-Token", "")
solution = request.headers.get("X-PoW-Solution", "")
if not token or not solution:
return error_response(
"Proof-of-work required", 400, hint="GET /challenge for a new challenge"
)
valid, err_msg = verify_pow(token, solution)
if not valid:
record_pow("failure")
return error_response(f"Proof-of-work failed: {err_msg}", 400)
record_pow("success")
# Dedup check (same URL within window)
url_hash = hashlib.sha256(target_url.encode("utf-8")).hexdigest()
is_allowed, dedup_count = check_content_hash(url_hash)
if not is_allowed:
record_dedup("blocked")
window = current_app.config["CONTENT_DEDUP_WINDOW"]
return error_response(
"Duplicate URL rate limit exceeded",
429,
count=dedup_count,
window_seconds=window,
)
record_dedup("allowed")
# Parse optional expiry
expires_at = None
expiry_header = request.headers.get("X-Expiry", "").strip()
if expiry_header:
try:
expiry_seconds = int(expiry_header)
if expiry_seconds > 0:
max_expiry = current_app.config.get("MAX_EXPIRY_SECONDS", 0)
if max_expiry > 0:
expiry_seconds = min(expiry_seconds, max_expiry)
expires_at = int(time.time()) + expiry_seconds
except ValueError:
pass
# Generate short ID and insert
short_id = generate_short_id()
now = int(time.time())
db = get_db()
db.execute(
"""INSERT INTO short_urls
(id, target_url, url_hash, owner, created_at, last_accessed, expires_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(short_id, target_url, url_hash, owner, now, now, expires_at),
)
db.commit()
record_antiflood_request()
log_event(
AuditEvent.URL_CREATE,
AuditOutcome.SUCCESS,
client_id=owner,
client_ip=client_ip,
details={"short_id": short_id, "target": target_url[:128]},
)
record_url_created("authenticated" if owner else "anonymous", "success")
response_data: dict[str, Any] = {
"id": short_id,
"url": short_url_path(short_id),
"target_url": target_url,
"created_at": now,
}
if owner:
response_data["owner"] = owner
if expires_at:
response_data["expires_at"] = expires_at
response = json_response(response_data, 201)
add_rate_limit_headers(response, remaining, limit, reset_timestamp)
return response
class ShortURLRedirectView(MethodView):
"""Redirect short URLs to their targets."""
def get(self, short_id: str) -> Response:
"""302 redirect to target URL."""
if err := validate_short_id(short_id):
return err
if err := fetch_short_url(short_id, increment_counter=True):
return err
row = g.short_url
log_event(
AuditEvent.URL_ACCESS,
AuditOutcome.SUCCESS,
client_ip=get_client_ip(),
details={"short_id": short_id},
)
record_url_accessed("authenticated" if get_client_fingerprint() else "anonymous")
response = Response(status=302)
response.headers["Location"] = row["target_url"]
response.headers["Cache-Control"] = "no-cache"
return response
def head(self, short_id: str) -> Response:
"""HEAD redirect to target URL."""
return self.get(short_id)
class ShortURLInfoView(MethodView):
"""Short URL metadata."""
def get(self, short_id: str) -> Response:
"""Return short URL metadata without incrementing counter."""
if err := validate_short_id(short_id):
return err
if err := fetch_short_url(short_id, increment_counter=False):
return err
row = g.short_url
data: dict[str, Any] = {
"id": row["id"],
"target_url": row["target_url"],
"created_at": row["created_at"],
"last_accessed": row["last_accessed"],
"access_count": row["access_count"],
"url": short_url_path(short_id),
}
if row["owner"]:
data["owner"] = row["owner"]
if row["expires_at"]:
data["expires_at"] = row["expires_at"]
return json_response(data)
class ShortURLDeleteView(MethodView):
"""Delete short URLs."""
def delete(self, short_id: str) -> Response:
"""Delete a short URL. Requires ownership or admin."""
if err := validate_short_id(short_id):
return err
if err := require_auth():
return err
db = get_db()
row = db.execute("SELECT owner FROM short_urls WHERE id = ?", (short_id,)).fetchone()
if row is None:
return error_response("Short URL not found", 404)
if row["owner"] != g.client_id and not is_admin():
return error_response("Permission denied", 403)
db.execute("DELETE FROM short_urls WHERE id = ?", (short_id,))
db.commit()
log_event(
AuditEvent.URL_DELETE,
AuditOutcome.SUCCESS,
client_id=g.client_id,
client_ip=get_client_ip(),
details={"short_id": short_id},
)
record_url_deleted("authenticated", "success")
return json_response({"message": "Short URL deleted"})
class ShortURLsListView(MethodView):
"""List short URLs owned by authenticated user."""
def get(self) -> Response:
"""List owned short URLs with pagination."""
if err := require_auth():
return err
client_id = g.client_id
try:
limit = min(int(request.args.get("limit", 50)), 200)
offset = max(int(request.args.get("offset", 0)), 0)
except (ValueError, TypeError):
limit, offset = 50, 0
db = get_db()
count_row = db.execute(
"SELECT COUNT(*) as total FROM short_urls WHERE owner = ?",
(client_id,),
).fetchone()
total = count_row["total"] if count_row else 0
rows = db.execute(
"""SELECT id, target_url, created_at, last_accessed, access_count, expires_at
FROM short_urls
WHERE owner = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?""",
(client_id, limit, offset),
).fetchall()
urls = []
for row in rows:
entry: dict[str, Any] = {
"id": row["id"],
"target_url": row["target_url"],
"created_at": row["created_at"],
"access_count": row["access_count"],
"url": short_url_path(row["id"]),
}
if row["expires_at"]:
entry["expires_at"] = row["expires_at"]
urls.append(entry)
return json_response({
"urls": urls,
"count": len(urls),
"total": total,
"limit": limit,
"offset": offset,
})
# ─────────────────────────────────────────────────────────────────────────────
# PKI Views (Certificate Authority)
# ─────────────────────────────────────────────────────────────────────────────
@@ -2284,6 +2667,21 @@ bp.add_url_rule(
"/<paste_id>", view_func=PasteDeleteView.as_view("paste_delete"), methods=["DELETE"]
)
# URL shortener endpoints
bp.add_url_rule("/s", view_func=ShortURLCreateView.as_view("short_url_create"), methods=["POST"])
bp.add_url_rule("/s", view_func=ShortURLsListView.as_view("short_urls_list"), methods=["GET"])
bp.add_url_rule(
"/s/<short_id>",
view_func=ShortURLRedirectView.as_view("short_url_redirect"),
methods=["GET", "HEAD"],
)
bp.add_url_rule("/s/<short_id>/info", view_func=ShortURLInfoView.as_view("short_url_info"))
bp.add_url_rule(
"/s/<short_id>",
view_func=ShortURLDeleteView.as_view("short_url_delete"),
methods=["DELETE"],
)
# PKI endpoints
bp.add_url_rule("/pki", view_func=PKIStatusView.as_view("pki_status"))
bp.add_url_rule("/pki/ca", view_func=PKICAGenerateView.as_view("pki_ca_generate"))