From 7063f8718e24f67b0f5833b75ae1e05532de0111 Mon Sep 17 00:00:00 2001 From: Username Date: Tue, 23 Dec 2025 22:39:50 +0100 Subject: [PATCH] feat: add observability and CLI enhancements Audit logging: - audit_log table with event tracking - app/audit.py module with log_event(), query_audit_log() - GET /audit endpoint (admin only) - configurable retention and cleanup Prometheus metrics: - app/metrics.py with custom counters - paste create/access/delete, rate limit, PoW, dedup metrics - instrumentation in API routes CLI clipboard integration: - fpaste create -C/--clipboard (read from clipboard) - fpaste create --copy-url (copy result URL) - fpaste get -c/--copy (copy content) - cross-platform: xclip, xsel, pbcopy, wl-copy Shell completions: - completions/ directory with bash/zsh/fish scripts - fpaste completion --shell command --- app/__init__.py | 5 + app/api/__init__.py | 12 ++ app/api/routes.py | 314 +++++++++++++++++++++++++++++++++++----- app/audit.py | 206 ++++++++++++++++++++++++++ app/config.py | 5 + app/database.py | 17 +++ app/metrics.py | 160 ++++++++++++++++++++ completions/fpaste.bash | 202 ++++++++++++++++++++++++++ completions/fpaste.fish | 148 +++++++++++++++++++ completions/fpaste.zsh | 203 ++++++++++++++++++++++++++ fpaste | 287 +++++++++++++++++++++++++++++++++++- tests/test_audit.py | 312 +++++++++++++++++++++++++++++++++++++++ tests/test_metrics.py | 179 +++++++++++++++++++++++ 13 files changed, 2003 insertions(+), 47 deletions(-) create mode 100644 app/audit.py create mode 100644 app/metrics.py create mode 100644 completions/fpaste.bash create mode 100644 completions/fpaste.fish create mode 100644 completions/fpaste.zsh create mode 100644 tests/test_audit.py create mode 100644 tests/test_metrics.py diff --git a/app/__init__.py b/app/__init__.py index efa31b8..0fc5eb9 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -190,6 +190,11 @@ def setup_metrics(app: Flask) -> None: metrics.info("flaskpaste_info", "FlaskPaste application info", version=VERSION) app.extensions["metrics"] = metrics + + # Setup custom metrics + from app.metrics import setup_custom_metrics + + setup_custom_metrics(app) except ImportError: app.logger.warning("prometheus_flask_exporter not available, metrics disabled") diff --git a/app/api/__init__.py b/app/api/__init__.py index a282448..088a30e 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -13,11 +13,13 @@ _cleanup_times: dict[str, float] = { "pastes": 0.0, "hashes": 0.0, "rate_limits": 0.0, + "audit": 0.0, } _CLEANUP_INTERVALS = { "pastes": 3600, # 1 hour "hashes": 900, # 15 minutes "rate_limits": 300, # 5 minutes + "audit": 86400, # 24 hours } @@ -61,5 +63,15 @@ def run_scheduled_cleanup(): if count > 0: current_app.logger.info(f"Cleaned up {count} rate limit entries") + # Cleanup old audit logs + if now - _cleanup_times["audit"] >= _CLEANUP_INTERVALS["audit"]: + _cleanup_times["audit"] = now + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import cleanup_old_audit_logs + + count = cleanup_old_audit_logs() + if count > 0: + current_app.logger.info(f"Cleaned up {count} old audit log entries") + from app.api import routes # noqa: E402, F401 diff --git a/app/api/routes.py b/app/api/routes.py index 1352f43..5b95326 100644 --- a/app/api/routes.py +++ b/app/api/routes.py @@ -19,6 +19,14 @@ from flask.views import MethodView from app.api import bp from app.config import VERSION from app.database import check_content_hash, get_db, hash_password, verify_password +from app.metrics import ( + record_dedup, + record_paste_accessed, + record_paste_created, + record_paste_deleted, + record_pow, + record_rate_limit, +) if TYPE_CHECKING: from sqlite3 import Row @@ -246,6 +254,16 @@ def prefixed_url(path: str) -> str: return f"{url_prefix()}{path}" +def paste_url(paste_id: str) -> str: + """Generate URL for paste metadata endpoint.""" + return prefixed_url(f"/{paste_id}") + + +def paste_raw_url(paste_id: str) -> str: + """Generate URL for raw paste content endpoint.""" + return prefixed_url(f"/{paste_id}/raw") + + def base_url() -> str: """Detect full base URL from request headers.""" scheme = ( @@ -257,6 +275,59 @@ def base_url() -> str: return f"{scheme}://{host}{url_prefix()}" +# ───────────────────────────────────────────────────────────────────────────── +# Response Builders +# ───────────────────────────────────────────────────────────────────────────── + + +def build_paste_metadata( + paste_id: str, + mime_type: str, + size: int, + created_at: int, + *, + owner: str | None = None, + burn_after_read: bool = False, + expires_at: int | None = None, + password_protected: bool = False, + include_owner: bool = False, + last_accessed: int | None = None, +) -> dict[str, Any]: + """Build standardized paste metadata response dict. + + Args: + paste_id: Paste identifier + mime_type: Content MIME type + size: Content size in bytes + created_at: Creation timestamp + owner: Owner fingerprint (included only if include_owner=True) + burn_after_read: Whether paste is burn-after-read + expires_at: Expiration timestamp + password_protected: Whether paste has password + include_owner: Whether to include owner in response + last_accessed: Last access timestamp (optional) + """ + data: dict[str, Any] = { + "id": paste_id, + "mime_type": mime_type, + "size": size, + "created_at": created_at, + "url": paste_url(paste_id), + "raw": paste_raw_url(paste_id), + } + if last_accessed is not None: + data["last_accessed"] = last_accessed + if include_owner and owner: + data["owner"] = owner + if burn_after_read: + data["burn_after_read"] = True + if expires_at: + data["expires_at"] = expires_at + if password_protected: + data["password_protected"] = True + return data + + # ───────────────────────────────────────────────────────────────────────────── # Validation Helpers (used within views) # ───────────────────────────────────────────────────────────────────────────── @@ -703,6 +774,17 @@ class IndexView(MethodView): current_app.logger.warning( "Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client) ) + # Audit log rate limit event + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.RATE_LIMIT, + AuditOutcome.BLOCKED, + client_id=owner, + client_ip=client_ip, + ) + record_rate_limit("blocked") response = error_response( "Rate limit exceeded", 429, @@ -729,8 +811,22 @@ class IndexView(MethodView): current_app.logger.warning( "PoW verification failed: %s from=%s", err, request.remote_addr ) + # Audit log PoW failure + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.POW_FAILURE, + AuditOutcome.FAILURE, + client_id=owner, + client_ip=client_ip, + details={"error": err}, + ) + record_pow("failure") return error_response(f"Proof-of-work failed: {err}", 400) + record_pow("success") + # Size limits (only trusted clients get elevated limits) content_size = len(content) max_size = ( @@ -807,6 +903,18 @@ class IndexView(MethodView): dedup_count, request.remote_addr, ) + # Audit log dedup block + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.DEDUP_BLOCK, + AuditOutcome.BLOCKED, + client_id=owner, + client_ip=client_ip, + details={"hash": content_hash[:16], "count": dedup_count}, + ) + record_dedup("blocked") return error_response( "Duplicate content rate limit exceeded", 429, @@ -814,6 +922,8 @@ class IndexView(MethodView): window_seconds=window, ) + record_dedup("allowed") + # Parse optional headers burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower() burn_after_read = burn_header in ("true", "1", "yes") @@ -883,11 +993,11 @@ class IndexView(MethodView): ) db.commit() - # Build response + # Build response (creation response is intentionally different - no size) response_data: dict[str, Any] = { "id": paste_id, - "url": f"/{paste_id}", - "raw": f"/{paste_id}/raw", + "url": paste_url(paste_id), + "raw": paste_raw_url(paste_id), "mime_type": mime_type, "created_at": now, } @@ -903,6 +1013,21 @@ class IndexView(MethodView): # Record successful paste for anti-flood tracking record_antiflood_request() + # Audit log paste creation + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.PASTE_CREATE, + AuditOutcome.SUCCESS, + paste_id=paste_id, + client_id=owner, + client_ip=client_ip, + details={"size": content_size, "mime_type": mime_type}, + ) + + record_paste_created("authenticated" if owner else "anonymous", "success") + return json_response(response_data, 201) @@ -1135,21 +1260,17 @@ class PasteView(MethodView): row: Row = g.paste g.db.commit() - response_data: dict[str, Any] = { - "id": row["id"], - "mime_type": row["mime_type"], - "size": row["size"], - "created_at": row["created_at"], - "raw": f"/{paste_id}/raw", - } - if row["burn_after_read"]: - response_data["burn_after_read"] = True - if row["expires_at"]: - response_data["expires_at"] = row["expires_at"] - if row["password_hash"]: - response_data["password_protected"] = True - - return json_response(response_data) + return json_response( + build_paste_metadata( + paste_id=row["id"], + mime_type=row["mime_type"], + size=row["size"], + created_at=row["created_at"], + burn_after_read=bool(row["burn_after_read"]), + expires_at=row["expires_at"], + password_protected=bool(row["password_hash"]), + ) + ) def head(self, paste_id: str) -> Response: """Return paste metadata headers only.""" @@ -1254,6 +1375,19 @@ class PasteView(MethodView): db.execute(update_sql, update_params) db.commit() + # Audit log paste update + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.PASTE_UPDATE, + AuditOutcome.SUCCESS, + paste_id=paste_id, + client_id=g.client_id, + client_ip=get_client_ip(), + details={"fields": [f.split(" = ")[0] for f in update_fields]}, + ) + # Fetch updated paste for response updated = db.execute( """SELECT id, mime_type, length(content) as size, expires_at, @@ -1296,6 +1430,24 @@ class PasteRawView(MethodView): db.commit() + # Audit log paste access + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.PASTE_ACCESS, + AuditOutcome.SUCCESS, + paste_id=paste_id, + client_id=get_client_fingerprint(), + client_ip=get_client_ip(), + details={"burn": bool(burn_after_read)}, + ) + + record_paste_accessed( + "authenticated" if get_client_fingerprint() else "anonymous", + bool(burn_after_read), + ) + response = Response(row["content"], mimetype=row["mime_type"]) if row["mime_type"].startswith(("image/", "text/")): response.headers["Content-Disposition"] = "inline" @@ -1350,6 +1502,20 @@ class PasteDeleteView(MethodView): db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) db.commit() + # Audit log paste deletion + if current_app.config.get("AUDIT_ENABLED", True): + from app.audit import AuditEvent, AuditOutcome, log_event + + log_event( + AuditEvent.PASTE_DELETE, + AuditOutcome.SUCCESS, + paste_id=paste_id, + client_id=g.client_id, + client_ip=get_client_ip(), + ) + + record_paste_deleted("authenticated", "success") + return json_response({"message": "Paste deleted"}) @@ -1457,27 +1623,21 @@ class PastesListView(MethodView): if type_filter: rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)] - pastes = [] - for row in rows: - paste: dict[str, Any] = { - "id": row["id"], - "mime_type": row["mime_type"], - "size": row["size"], - "created_at": row["created_at"], - "last_accessed": row["last_accessed"], - "url": f"/{row['id']}", - "raw": f"/{row['id']}/raw", - } - # Include owner for admin view - if show_all and row["owner"]: - paste["owner"] = row["owner"] - if row["burn_after_read"]: - paste["burn_after_read"] = True - if row["expires_at"]: - paste["expires_at"] = row["expires_at"] - if row["password_protected"]: - paste["password_protected"] = True - pastes.append(paste) + pastes = [ + build_paste_metadata( + paste_id=row["id"], + mime_type=row["mime_type"], + size=row["size"], + created_at=row["created_at"], + owner=row["owner"], + burn_after_read=bool(row["burn_after_read"]), + expires_at=row["expires_at"], + password_protected=bool(row["password_protected"]), + include_owner=show_all, + last_accessed=row["last_accessed"], + ) + for row in rows + ] response_data: dict[str, Any] = { "pastes": pastes, @@ -1780,6 +1940,79 @@ class PKIRevokeView(MethodView): return json_response({"message": "Certificate revoked", "serial": serial}) +# ───────────────────────────────────────────────────────────────────────────── +# Audit Log View +# ───────────────────────────────────────────────────────────────────────────── + + +class AuditLogView(MethodView): + """Audit log query endpoint (admin only).""" + + def get(self) -> Response: + """Query audit log with filters. + + Query parameters: + - event_type: Filter by event type + - client_id: Filter by client fingerprint + - paste_id: Filter by paste ID + - outcome: Filter by outcome (success, failure, blocked) + - since: Filter by timestamp >= since + - until: Filter by timestamp <= until + - limit: Maximum results (default 100, max 500) + - offset: Pagination offset + """ + if err := require_auth(): + return err + if not is_admin(): + return error_response("Admin access required", 403) + + from app.audit import query_audit_log + + # Parse query parameters + event_type = request.args.get("event_type", "").strip() or None + client_id = request.args.get("client_id", "").strip() or None + paste_id = request.args.get("paste_id", "").strip() or None + outcome = request.args.get("outcome", "").strip() or None + + try: + since = int(request.args.get("since", 0)) or None + except (ValueError, TypeError): + since = None + try: + until = int(request.args.get("until", 0)) or None + except (ValueError, TypeError): + until = None + try: + limit = min(int(request.args.get("limit", 100)), 500) + except (ValueError, TypeError): + limit = 100 + try: + offset = max(int(request.args.get("offset", 0)), 0) + except (ValueError, TypeError): + offset = 0 + + entries, total = query_audit_log( + event_type=event_type, + client_id=client_id, + paste_id=paste_id, + outcome=outcome, + since=since, + until=until, + limit=limit, + offset=offset, + ) + + return json_response( + { + "entries": entries, + "count": len(entries), + "total": total, + "limit": limit, + "offset": offset, + } + ) + + # ───────────────────────────────────────────────────────────────────────────── # Route Registration # ───────────────────────────────────────────────────────────────────────────── @@ -1817,3 +2050,6 @@ bp.add_url_rule("/pki/certs", view_func=PKICertsView.as_view("pki_certs")) bp.add_url_rule( "/pki/revoke/", view_func=PKIRevokeView.as_view("pki_revoke"), methods=["POST"] ) + +# Audit log endpoint (admin only) +bp.add_url_rule("/audit", view_func=AuditLogView.as_view("audit_log")) diff --git a/app/audit.py b/app/audit.py new file mode 100644 index 0000000..0adc4ad --- /dev/null +++ b/app/audit.py @@ -0,0 +1,206 @@ +"""Audit logging for security events.""" + +from __future__ import annotations + +import json +import time +from enum import Enum +from typing import TYPE_CHECKING, Any + +from flask import current_app, g + +from app.database import get_db + +if TYPE_CHECKING: + from sqlite3 import Row + + +class AuditEvent(str, Enum): + """Security-relevant event types for audit logging.""" + + PASTE_CREATE = "paste_create" + PASTE_ACCESS = "paste_access" + PASTE_DELETE = "paste_delete" + PASTE_UPDATE = "paste_update" + RATE_LIMIT = "rate_limit" + POW_FAILURE = "pow_failure" + DEDUP_BLOCK = "dedup_block" + CERT_ISSUED = "cert_issued" + CERT_REVOKED = "cert_revoked" + AUTH_FAILURE = "auth_failure" + + +class AuditOutcome(str, Enum): + """Outcome types for audit events.""" + + SUCCESS = "success" + FAILURE = "failure" + BLOCKED = "blocked" + + +def log_event( + event_type: AuditEvent | str, + outcome: AuditOutcome | str, + *, + paste_id: str | None = None, + client_id: str | None = None, + client_ip: str | None = None, + details: dict[str, Any] | None = None, +) -> None: + """Log a security-relevant event to the audit log. + + Args: + event_type: Type of event (from AuditEvent enum or string) + outcome: Result of the event (success, failure, blocked) + paste_id: Related paste ID (if applicable) + client_id: Client certificate fingerprint (if authenticated) + client_ip: Client IP address + details: Additional event-specific details (stored as JSON) + """ + if not current_app.config.get("AUDIT_ENABLED", True): + return + + now = int(time.time()) + request_id = getattr(g, "request_id", None) + + # Convert enums to strings + event_type_str = event_type.value if isinstance(event_type, AuditEvent) else event_type + outcome_str = outcome.value if isinstance(outcome, AuditOutcome) else outcome + + # Serialize details to JSON if provided + details_json = json.dumps(details, ensure_ascii=False) if details else None + + db = get_db() + db.execute( + """INSERT INTO audit_log + (timestamp, event_type, client_id, client_ip, paste_id, request_id, outcome, details) + VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + ( + now, + event_type_str, + client_id, + client_ip, + paste_id, + request_id, + outcome_str, + details_json, + ), + ) + db.commit() + + +def query_audit_log( + *, + event_type: str | None = None, + client_id: str | None = None, + paste_id: str | None = None, + outcome: str | None = None, + since: int | None = None, + until: int | None = None, + limit: int = 100, + offset: int = 0, +) -> tuple[list[dict[str, Any]], int]: + """Query audit log with filters. + + Args: + event_type: Filter by event type + client_id: Filter by client fingerprint + paste_id: Filter by paste ID + outcome: Filter by outcome (success, failure, blocked) + since: Filter by timestamp >= since + until: Filter by timestamp <= until + limit: Maximum results to return + offset: Pagination offset + + Returns: + Tuple of (list of audit entries, total count matching filters) + """ + db = get_db() + + where_clauses: list[str] = [] + params: list[Any] = [] + + if event_type: + where_clauses.append("event_type = ?") + params.append(event_type) + if client_id: + where_clauses.append("client_id = ?") + params.append(client_id) + if paste_id: + where_clauses.append("paste_id = ?") + params.append(paste_id) + if outcome: + where_clauses.append("outcome = ?") + params.append(outcome) + if since: + where_clauses.append("timestamp >= ?") + params.append(since) + if until: + where_clauses.append("timestamp <= ?") + params.append(until) + + where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" + + # Get total count + count_row = db.execute( + f"SELECT COUNT(*) as total FROM audit_log WHERE {where_sql}", # noqa: S608 + params, + ).fetchone() + total = count_row["total"] if count_row else 0 + + # Fetch entries + rows: list[Row] = db.execute( + f"""SELECT id, timestamp, event_type, client_id, client_ip, + paste_id, request_id, outcome, details + FROM audit_log + WHERE {where_sql} + ORDER BY timestamp DESC + LIMIT ? OFFSET ?""", # noqa: S608 + [*params, limit, offset], + ).fetchall() + + entries = [] + for row in rows: + entry: dict[str, Any] = { + "id": row["id"], + "timestamp": row["timestamp"], + "event_type": row["event_type"], + "outcome": row["outcome"], + } + if row["client_id"]: + entry["client_id"] = row["client_id"] + if row["client_ip"]: + entry["client_ip"] = row["client_ip"] + if row["paste_id"]: + entry["paste_id"] = row["paste_id"] + if row["request_id"]: + entry["request_id"] = row["request_id"] + if row["details"]: + try: + entry["details"] = json.loads(row["details"]) + except json.JSONDecodeError: + entry["details"] = row["details"] + entries.append(entry) + + return entries, total + + +def cleanup_old_audit_logs(retention_days: int | None = None) -> int: + """Delete audit log entries older than retention period. + + Args: + retention_days: Number of days to keep. If None, uses config. + + Returns: + Number of deleted entries. + """ + if retention_days is None: + retention_days = current_app.config.get("AUDIT_RETENTION_DAYS", 90) + + cutoff = int(time.time()) - (retention_days * 24 * 60 * 60) + + db = get_db() + cursor = db.execute("DELETE FROM audit_log WHERE timestamp < ?", (cutoff,)) + db.commit() + + return cursor.rowcount diff --git a/app/config.py b/app/config.py index 7cdc351..a7a2a82 100644 --- a/app/config.py +++ b/app/config.py @@ -100,6 +100,11 @@ class Config: # Authenticated users get higher limits (multiplier) RATE_LIMIT_AUTH_MULTIPLIER = int(os.environ.get("FLASKPASTE_RATE_AUTH_MULT", "5")) + # Audit Logging + # Track security-relevant events (paste creation, deletion, rate limits, etc.) + AUDIT_ENABLED = os.environ.get("FLASKPASTE_AUDIT", "1").lower() in ("1", "true", "yes") + AUDIT_RETENTION_DAYS = int(os.environ.get("FLASKPASTE_AUDIT_RETENTION", "90")) + # PKI Configuration # Enable PKI endpoints for certificate authority and issuance PKI_ENABLED = os.environ.get("FLASKPASTE_PKI_ENABLED", "0").lower() in ("1", "true", "yes") diff --git a/app/database.py b/app/database.py index fbc60ef..9acbf44 100644 --- a/app/database.py +++ b/app/database.py @@ -73,6 +73,23 @@ CREATE TABLE IF NOT EXISTS issued_certificates ( CREATE INDEX IF NOT EXISTS idx_certs_fingerprint ON issued_certificates(fingerprint_sha1); CREATE INDEX IF NOT EXISTS idx_certs_status ON issued_certificates(status); CREATE INDEX IF NOT EXISTS idx_certs_ca_id ON issued_certificates(ca_id); + +-- Audit log for security events +CREATE TABLE IF NOT EXISTS audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + event_type TEXT NOT NULL, + client_id TEXT, + client_ip TEXT, + paste_id TEXT, + request_id TEXT, + outcome TEXT NOT NULL, + details TEXT +); + +CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp); +CREATE INDEX IF NOT EXISTS idx_audit_event_type ON audit_log(event_type); +CREATE INDEX IF NOT EXISTS idx_audit_client_id ON audit_log(client_id); """ # Password hashing constants diff --git a/app/metrics.py b/app/metrics.py new file mode 100644 index 0000000..559805e --- /dev/null +++ b/app/metrics.py @@ -0,0 +1,160 @@ +"""Custom Prometheus metrics for FlaskPaste.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from flask import Flask + +# Metrics are only initialized when not testing +_paste_created_counter = None +_paste_accessed_counter = None +_paste_deleted_counter = None +_rate_limit_counter = None +_pow_counter = None +_dedup_counter = None +_request_duration_histogram = None + + +def setup_custom_metrics(app: Flask) -> None: + """Initialize custom Prometheus metrics. + + Should be called after prometheus_flask_exporter is set up. + """ + global _paste_created_counter, _paste_accessed_counter, _paste_deleted_counter + global _rate_limit_counter, _pow_counter, _dedup_counter, _request_duration_histogram + + if app.config.get("TESTING"): + return + + try: + from prometheus_client import Counter, Histogram + except ImportError: + app.logger.warning("prometheus_client not available, custom metrics disabled") + return + + _paste_created_counter = Counter( + "flaskpaste_paste_created_total", + "Total number of paste creation attempts", + ["auth_type", "outcome"], + ) + + _paste_accessed_counter = Counter( + "flaskpaste_paste_accessed_total", + "Total number of paste accesses", + ["auth_type", "burn"], + ) + + _paste_deleted_counter = Counter( + "flaskpaste_paste_deleted_total", + "Total number of paste deletion attempts", + ["auth_type", "outcome"], + ) + + _rate_limit_counter = Counter( + "flaskpaste_rate_limit_total", + "Total number of rate limit events", + ["outcome"], + ) + + _pow_counter = Counter( + "flaskpaste_pow_total", + "Total number of proof-of-work validations", + ["outcome"], + ) + + _dedup_counter = Counter( + "flaskpaste_dedup_total", + "Total number of content deduplication events", + ["outcome"], + ) + + _request_duration_histogram = Histogram( + "flaskpaste_request_duration_seconds", + "Request duration in seconds", + ["method", "endpoint", "status"], + buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0), + ) + + app.logger.info("Custom Prometheus metrics initialized") + + +def record_paste_created(auth_type: str, outcome: str) -> None: + """Record a paste creation attempt. + + Args: + auth_type: "authenticated" or "anonymous" + outcome: "success", "failure", or "blocked" + """ + if _paste_created_counter: + _paste_created_counter.labels(auth_type=auth_type, outcome=outcome).inc() + + +def record_paste_accessed(auth_type: str, burn: bool) -> None: + """Record a paste access. + + Args: + auth_type: "authenticated" or "anonymous" + burn: True if paste was burn-after-read + """ + if _paste_accessed_counter: + _paste_accessed_counter.labels(auth_type=auth_type, burn=str(burn).lower()).inc() + + +def record_paste_deleted(auth_type: str, outcome: str) -> None: + """Record a paste deletion attempt. + + Args: + auth_type: "authenticated" or "anonymous" + outcome: "success" or "failure" + """ + if _paste_deleted_counter: + _paste_deleted_counter.labels(auth_type=auth_type, outcome=outcome).inc() + + +def record_rate_limit(outcome: str) -> None: + """Record a rate limit event. + + Args: + outcome: "allowed" or "blocked" + """ + if _rate_limit_counter: + _rate_limit_counter.labels(outcome=outcome).inc() + + +def record_pow(outcome: str) -> None: + """Record a proof-of-work validation. + + Args: + outcome: "success" or "failure" + """ + if _pow_counter: + _pow_counter.labels(outcome=outcome).inc() + + +def record_dedup(outcome: str) -> None: + """Record a content deduplication event. + + Args: + outcome: "allowed" or "blocked" + """ + if _dedup_counter: + _dedup_counter.labels(outcome=outcome).inc() + + +def observe_request_duration( + method: str, endpoint: str, status: int, duration: float +) -> None: + """Record request duration. + + Args: + method: HTTP method (GET, POST, etc.) + endpoint: Request endpoint path + status: HTTP status code + duration: Request duration in seconds + """ + if _request_duration_histogram: + _request_duration_histogram.labels( + method=method, endpoint=endpoint, status=str(status) + ).observe(duration) diff --git a/completions/fpaste.bash b/completions/fpaste.bash new file mode 100644 index 0000000..1c504b8 --- /dev/null +++ b/completions/fpaste.bash @@ -0,0 +1,202 @@ +# Bash completion for fpaste +# Install: source this file or copy to /etc/bash_completion.d/fpaste + +_fpaste_completions() { + local cur prev words cword + _init_completion || return + + local commands="create c new get g delete d rm info i list ls search s find update u export register cert pki completion" + local pki_commands="status issue download dl" + + # Handle command-level completion + if [[ $cword -eq 1 ]]; then + COMPREPLY=($(compgen -W "$commands" -- "$cur")) + return + fi + + local cmd="${words[1]}" + + # PKI subcommand completion + if [[ "$cmd" == "pki" && $cword -eq 2 ]]; then + COMPREPLY=($(compgen -W "$pki_commands" -- "$cur")) + return + fi + + # Option completion based on command + case "$cmd" in + create|c|new) + case "$prev" in + -x|--expiry) + # Suggest common expiry values + COMPREPLY=($(compgen -W "60 300 600 3600 86400 604800" -- "$cur")) + return + ;; + -p|--password) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-E --no-encrypt -b --burn -x --expiry -p --password -r --raw -q --quiet -C --clipboard --copy-url" -- "$cur")) + else + _filedir + fi + ;; + get|g) + case "$prev" in + -o|--output) + _filedir + return + ;; + -p|--password) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-o --output -c --copy -p --password -m --meta" -- "$cur")) + fi + ;; + delete|d|rm) + case "$prev" in + -c|--confirm) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-a --all -c --confirm" -- "$cur")) + fi + ;; + info|i) + # No options + ;; + list|ls) + case "$prev" in + -l|--limit|-o|--offset) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-a --all -l --limit -o --offset --json" -- "$cur")) + fi + ;; + search|s|find) + case "$prev" in + -t|--type) + COMPREPLY=($(compgen -W "text/* image/* application/*" -- "$cur")) + return + ;; + --after|--before) + return + ;; + -l|--limit) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-t --type --after --before -l --limit --json" -- "$cur")) + fi + ;; + update|u) + case "$prev" in + -x|--expiry) + COMPREPLY=($(compgen -W "60 300 600 3600 86400 604800" -- "$cur")) + return + ;; + -p|--password) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-E --no-encrypt -p --password --remove-password -x --expiry -q --quiet" -- "$cur")) + else + _filedir + fi + ;; + export) + case "$prev" in + -o|--output|-k|--keyfile) + _filedir + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-o --output -k --keyfile --manifest -q --quiet" -- "$cur")) + fi + ;; + register) + case "$prev" in + -n|--name|-o|--output) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-n --name -o --output --configure --p12-only -f --force -q --quiet" -- "$cur")) + fi + ;; + cert) + case "$prev" in + -o|--output) + _filedir -d + return + ;; + -a|--algorithm) + COMPREPLY=($(compgen -W "rsa ec" -- "$cur")) + return + ;; + -c|--curve) + COMPREPLY=($(compgen -W "secp256r1 secp384r1 secp521r1" -- "$cur")) + return + ;; + -b|--bits|-d|--days|-n|--name|--password-key) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-o --output -a --algorithm -b --bits -c --curve -d --days -n --name --password-key --configure -f --force" -- "$cur")) + fi + ;; + pki) + local pki_cmd="${words[2]}" + case "$pki_cmd" in + issue) + case "$prev" in + -n|--name|-o|--output) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-n --name -o --output --configure -f --force" -- "$cur")) + fi + ;; + download|dl) + case "$prev" in + -o|--output) + _filedir + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "-o --output --configure" -- "$cur")) + fi + ;; + esac + ;; + completion) + case "$prev" in + --shell) + COMPREPLY=($(compgen -W "bash zsh fish" -- "$cur")) + return + ;; + esac + if [[ "$cur" == -* ]]; then + COMPREPLY=($(compgen -W "--shell" -- "$cur")) + fi + ;; + esac + + # Global options + if [[ "$cur" == -* && $cword -eq 1 ]]; then + COMPREPLY=($(compgen -W "-s --server -h --help" -- "$cur")) + fi +} + +complete -F _fpaste_completions fpaste diff --git a/completions/fpaste.fish b/completions/fpaste.fish new file mode 100644 index 0000000..5840bac --- /dev/null +++ b/completions/fpaste.fish @@ -0,0 +1,148 @@ +# Fish completion for fpaste +# Install: copy to ~/.config/fish/completions/fpaste.fish + +# Disable file completions by default +complete -c fpaste -f + +# Helper function to check if command is specified +function __fpaste_needs_command + set -l cmd (commandline -opc) + if test (count $cmd) -eq 1 + return 0 + end + return 1 +end + +function __fpaste_using_command + set -l cmd (commandline -opc) + if test (count $cmd) -gt 1 + if test $argv[1] = $cmd[2] + return 0 + end + end + return 1 +end + +function __fpaste_using_pki_command + set -l cmd (commandline -opc) + if test (count $cmd) -gt 2 + if test $cmd[2] = 'pki' -a $argv[1] = $cmd[3] + return 0 + end + end + return 1 +end + +# Global options +complete -c fpaste -s s -l server -d 'Server URL' +complete -c fpaste -s h -l help -d 'Show help' + +# Commands +complete -c fpaste -n '__fpaste_needs_command' -a 'create' -d 'Create a new paste' +complete -c fpaste -n '__fpaste_needs_command' -a 'c' -d 'Create a new paste' +complete -c fpaste -n '__fpaste_needs_command' -a 'new' -d 'Create a new paste' +complete -c fpaste -n '__fpaste_needs_command' -a 'get' -d 'Retrieve a paste' +complete -c fpaste -n '__fpaste_needs_command' -a 'g' -d 'Retrieve a paste' +complete -c fpaste -n '__fpaste_needs_command' -a 'delete' -d 'Delete paste(s)' +complete -c fpaste -n '__fpaste_needs_command' -a 'd' -d 'Delete paste(s)' +complete -c fpaste -n '__fpaste_needs_command' -a 'rm' -d 'Delete paste(s)' +complete -c fpaste -n '__fpaste_needs_command' -a 'info' -d 'Show server info' +complete -c fpaste -n '__fpaste_needs_command' -a 'i' -d 'Show server info' +complete -c fpaste -n '__fpaste_needs_command' -a 'list' -d 'List your pastes' +complete -c fpaste -n '__fpaste_needs_command' -a 'ls' -d 'List your pastes' +complete -c fpaste -n '__fpaste_needs_command' -a 'search' -d 'Search your pastes' +complete -c fpaste -n '__fpaste_needs_command' -a 's' -d 'Search your pastes' +complete -c fpaste -n '__fpaste_needs_command' -a 'find' -d 'Search your pastes' +complete -c fpaste -n '__fpaste_needs_command' -a 'update' -d 'Update existing paste' +complete -c fpaste -n '__fpaste_needs_command' -a 'u' -d 'Update existing paste' +complete -c fpaste -n '__fpaste_needs_command' -a 'export' -d 'Export all pastes' +complete -c fpaste -n '__fpaste_needs_command' -a 'register' -d 'Register and get certificate' +complete -c fpaste -n '__fpaste_needs_command' -a 'cert' -d 'Generate client certificate' +complete -c fpaste -n '__fpaste_needs_command' -a 'pki' -d 'PKI operations' +complete -c fpaste -n '__fpaste_needs_command' -a 'completion' -d 'Generate shell completion' + +# create command options +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s E -l no-encrypt -d 'Disable encryption' +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s b -l burn -d 'Burn after read' +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s x -l expiry -d 'Expiry in seconds' -x +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s p -l password -d 'Password protect' -x +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s r -l raw -d 'Output raw URL' +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s q -l quiet -d 'Output ID only' +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s C -l clipboard -d 'Read from clipboard' +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -l copy-url -d 'Copy result URL to clipboard' +complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -F + +# get command options +complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s o -l output -d 'Save to file' -r +complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s c -l copy -d 'Copy content to clipboard' +complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s p -l password -d 'Password' -x +complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s m -l meta -d 'Show metadata only' + +# delete command options +complete -c fpaste -n '__fpaste_using_command delete; or __fpaste_using_command d; or __fpaste_using_command rm' -s a -l all -d 'Delete all pastes' +complete -c fpaste -n '__fpaste_using_command delete; or __fpaste_using_command d; or __fpaste_using_command rm' -s c -l confirm -d 'Confirm count' -x + +# list command options +complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -s a -l all -d 'List all pastes (admin)' +complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -s l -l limit -d 'Max pastes' -x +complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -s o -l offset -d 'Skip first N pastes' -x +complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -l json -d 'Output as JSON' + +# search command options +complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -s t -l type -d 'Filter by MIME type' -x +complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -l after -d 'Created after' -x +complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -l before -d 'Created before' -x +complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -s l -l limit -d 'Max results' -x +complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -l json -d 'Output as JSON' + +# update command options +complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s E -l no-encrypt -d 'Disable encryption' +complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s p -l password -d 'Set/change password' -x +complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -l remove-password -d 'Remove password' +complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s x -l expiry -d 'Extend expiry' -x +complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s q -l quiet -d 'Minimal output' +complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -F + +# export command options +complete -c fpaste -n '__fpaste_using_command export' -s o -l output -d 'Output directory' -r +complete -c fpaste -n '__fpaste_using_command export' -s k -l keyfile -d 'Key file' -r +complete -c fpaste -n '__fpaste_using_command export' -l manifest -d 'Write manifest.json' +complete -c fpaste -n '__fpaste_using_command export' -s q -l quiet -d 'Minimal output' + +# register command options +complete -c fpaste -n '__fpaste_using_command register' -s n -l name -d 'Common name' -x +complete -c fpaste -n '__fpaste_using_command register' -s o -l output -d 'Output directory' -r +complete -c fpaste -n '__fpaste_using_command register' -l configure -d 'Update config file' +complete -c fpaste -n '__fpaste_using_command register' -l p12-only -d 'Save only PKCS#12' +complete -c fpaste -n '__fpaste_using_command register' -s f -l force -d 'Overwrite existing files' +complete -c fpaste -n '__fpaste_using_command register' -s q -l quiet -d 'Minimal output' + +# cert command options +complete -c fpaste -n '__fpaste_using_command cert' -s o -l output -d 'Output directory' -r +complete -c fpaste -n '__fpaste_using_command cert' -s a -l algorithm -d 'Key algorithm' -x -a 'rsa ec' +complete -c fpaste -n '__fpaste_using_command cert' -s b -l bits -d 'RSA key size' -x +complete -c fpaste -n '__fpaste_using_command cert' -s c -l curve -d 'EC curve' -x -a 'secp256r1 secp384r1 secp521r1' +complete -c fpaste -n '__fpaste_using_command cert' -s d -l days -d 'Validity period' -x +complete -c fpaste -n '__fpaste_using_command cert' -s n -l name -d 'Common name' -x +complete -c fpaste -n '__fpaste_using_command cert' -l password-key -d 'Encrypt private key' -x +complete -c fpaste -n '__fpaste_using_command cert' -l configure -d 'Update config file' +complete -c fpaste -n '__fpaste_using_command cert' -s f -l force -d 'Overwrite existing files' + +# pki subcommands +complete -c fpaste -n '__fpaste_using_command pki' -a 'status' -d 'Show PKI status' +complete -c fpaste -n '__fpaste_using_command pki' -a 'issue' -d 'Request certificate from server' +complete -c fpaste -n '__fpaste_using_command pki' -a 'download' -d 'Download CA certificate' +complete -c fpaste -n '__fpaste_using_command pki' -a 'dl' -d 'Download CA certificate' + +# pki issue options +complete -c fpaste -n '__fpaste_using_pki_command issue' -s n -l name -d 'Common name' -x +complete -c fpaste -n '__fpaste_using_pki_command issue' -s o -l output -d 'Output directory' -r +complete -c fpaste -n '__fpaste_using_pki_command issue' -l configure -d 'Update config file' +complete -c fpaste -n '__fpaste_using_pki_command issue' -s f -l force -d 'Overwrite existing files' + +# pki download options +complete -c fpaste -n '__fpaste_using_pki_command download; or __fpaste_using_pki_command dl' -s o -l output -d 'Save to file' -r +complete -c fpaste -n '__fpaste_using_pki_command download; or __fpaste_using_pki_command dl' -l configure -d 'Update config file' + +# completion command options +complete -c fpaste -n '__fpaste_using_command completion' -l shell -d 'Shell type' -x -a 'bash zsh fish' diff --git a/completions/fpaste.zsh b/completions/fpaste.zsh new file mode 100644 index 0000000..81e884c --- /dev/null +++ b/completions/fpaste.zsh @@ -0,0 +1,203 @@ +#compdef fpaste +# Zsh completion for fpaste +# Install: copy to ~/.zfunc/_fpaste and add 'fpath=(~/.zfunc $fpath)' to ~/.zshrc + +_fpaste() { + local curcontext="$curcontext" state line + typeset -A opt_args + + _arguments -C \ + '-s[Server URL]:url:' \ + '--server[Server URL]:url:' \ + '-h[Show help]' \ + '--help[Show help]' \ + '1: :->command' \ + '*:: :->args' + + case $state in + command) + local commands=( + 'create:Create a new paste' + 'c:Create a new paste' + 'new:Create a new paste' + 'get:Retrieve a paste' + 'g:Retrieve a paste' + 'delete:Delete paste(s)' + 'd:Delete paste(s)' + 'rm:Delete paste(s)' + 'info:Show server info' + 'i:Show server info' + 'list:List your pastes' + 'ls:List your pastes' + 'search:Search your pastes' + 's:Search your pastes' + 'find:Search your pastes' + 'update:Update existing paste' + 'u:Update existing paste' + 'export:Export all pastes' + 'register:Register and get certificate' + 'cert:Generate client certificate' + 'pki:PKI operations' + 'completion:Generate shell completion' + ) + _describe -t commands 'fpaste commands' commands + ;; + args) + case $line[1] in + create|c|new) + _arguments \ + '-E[Disable encryption]' \ + '--no-encrypt[Disable encryption]' \ + '-b[Burn after read]' \ + '--burn[Burn after read]' \ + '-x[Expiry in seconds]:seconds:' \ + '--expiry[Expiry in seconds]:seconds:' \ + '-p[Password protect]:password:' \ + '--password[Password protect]:password:' \ + '-r[Output raw URL]' \ + '--raw[Output raw URL]' \ + '-q[Output ID only]' \ + '--quiet[Output ID only]' \ + '-C[Read from clipboard]' \ + '--clipboard[Read from clipboard]' \ + '--copy-url[Copy result URL to clipboard]' \ + '*:file:_files' + ;; + get|g) + _arguments \ + '-o[Save to file]:file:_files' \ + '--output[Save to file]:file:_files' \ + '-c[Copy content to clipboard]' \ + '--copy[Copy content to clipboard]' \ + '-p[Password]:password:' \ + '--password[Password]:password:' \ + '-m[Show metadata only]' \ + '--meta[Show metadata only]' \ + '1:paste ID:' + ;; + delete|d|rm) + _arguments \ + '-a[Delete all pastes]' \ + '--all[Delete all pastes]' \ + '-c[Confirm count]:count:' \ + '--confirm[Confirm count]:count:' \ + '*:paste ID:' + ;; + info|i) + ;; + list|ls) + _arguments \ + '-a[List all pastes (admin)]' \ + '--all[List all pastes (admin)]' \ + '-l[Max pastes]:number:' \ + '--limit[Max pastes]:number:' \ + '-o[Skip first N pastes]:number:' \ + '--offset[Skip first N pastes]:number:' \ + '--json[Output as JSON]' + ;; + search|s|find) + _arguments \ + '-t[Filter by MIME type]:pattern:' \ + '--type[Filter by MIME type]:pattern:' \ + '--after[Created after]:date:' \ + '--before[Created before]:date:' \ + '-l[Max results]:number:' \ + '--limit[Max results]:number:' \ + '--json[Output as JSON]' + ;; + update|u) + _arguments \ + '-E[Disable encryption]' \ + '--no-encrypt[Disable encryption]' \ + '-p[Set/change password]:password:' \ + '--password[Set/change password]:password:' \ + '--remove-password[Remove password]' \ + '-x[Extend expiry]:seconds:' \ + '--expiry[Extend expiry]:seconds:' \ + '-q[Minimal output]' \ + '--quiet[Minimal output]' \ + '1:paste ID:' \ + '*:file:_files' + ;; + export) + _arguments \ + '-o[Output directory]:directory:_files -/' \ + '--output[Output directory]:directory:_files -/' \ + '-k[Key file]:file:_files' \ + '--keyfile[Key file]:file:_files' \ + '--manifest[Write manifest.json]' \ + '-q[Minimal output]' \ + '--quiet[Minimal output]' + ;; + register) + _arguments \ + '-n[Common name]:name:' \ + '--name[Common name]:name:' \ + '-o[Output directory]:directory:_files -/' \ + '--output[Output directory]:directory:_files -/' \ + '--configure[Update config file]' \ + '--p12-only[Save only PKCS#12]' \ + '-f[Overwrite existing files]' \ + '--force[Overwrite existing files]' \ + '-q[Minimal output]' \ + '--quiet[Minimal output]' + ;; + cert) + _arguments \ + '-o[Output directory]:directory:_files -/' \ + '--output[Output directory]:directory:_files -/' \ + '-a[Key algorithm]:algorithm:(rsa ec)' \ + '--algorithm[Key algorithm]:algorithm:(rsa ec)' \ + '-b[RSA key size]:bits:' \ + '--bits[RSA key size]:bits:' \ + '-c[EC curve]:curve:(secp256r1 secp384r1 secp521r1)' \ + '--curve[EC curve]:curve:(secp256r1 secp384r1 secp521r1)' \ + '-d[Validity period]:days:' \ + '--days[Validity period]:days:' \ + '-n[Common name]:name:' \ + '--name[Common name]:name:' \ + '--password-key[Encrypt private key]:password:' \ + '--configure[Update config file]' \ + '-f[Overwrite existing files]' \ + '--force[Overwrite existing files]' + ;; + pki) + local pki_commands=( + 'status:Show PKI status' + 'issue:Request certificate from server' + 'download:Download CA certificate' + 'dl:Download CA certificate' + ) + if (( CURRENT == 2 )); then + _describe -t commands 'pki commands' pki_commands + else + case $line[2] in + issue) + _arguments \ + '-n[Common name]:name:' \ + '--name[Common name]:name:' \ + '-o[Output directory]:directory:_files -/' \ + '--output[Output directory]:directory:_files -/' \ + '--configure[Update config file]' \ + '-f[Overwrite existing files]' \ + '--force[Overwrite existing files]' + ;; + download|dl) + _arguments \ + '-o[Save to file]:file:_files' \ + '--output[Save to file]:file:_files' \ + '--configure[Update config file]' + ;; + esac + fi + ;; + completion) + _arguments \ + '--shell[Shell type]:shell:(bash zsh fish)' + ;; + esac + ;; + esac +} + +_fpaste "$@" diff --git a/fpaste b/fpaste index c3b34d6..ba89258 100755 --- a/fpaste +++ b/fpaste @@ -8,7 +8,9 @@ import base64 import hashlib import json import os +import shutil import ssl +import subprocess import sys import time import urllib.error @@ -431,13 +433,76 @@ def print_paste_list( print(f"\n{summary}") +# ----------------------------------------------------------------------------- +# Clipboard integration +# ----------------------------------------------------------------------------- + +# Clipboard read commands (tool name, command args) +CLIPBOARD_READ_COMMANDS = [ + ("xclip", ["xclip", "-selection", "clipboard", "-o"]), + ("xsel", ["xsel", "--clipboard", "--output"]), + ("pbpaste", ["pbpaste"]), + ("powershell.exe", ["powershell.exe", "-command", "Get-Clipboard"]), + ("wl-paste", ["wl-paste"]), +] + +# Clipboard write commands (tool name, command args) +CLIPBOARD_WRITE_COMMANDS = [ + ("xclip", ["xclip", "-selection", "clipboard", "-i"]), + ("xsel", ["xsel", "--clipboard", "--input"]), + ("pbcopy", ["pbcopy"]), + ("clip.exe", ["clip.exe"]), + ("wl-copy", ["wl-copy"]), +] + + +def find_clipboard_command(commands: list[tuple[str, list[str]]]) -> list[str] | None: + """Find first available clipboard command.""" + for tool_name, cmd in commands: + if shutil.which(tool_name): + return cmd + return None + + +def read_clipboard() -> bytes: + """Read content from system clipboard.""" + cmd = find_clipboard_command(CLIPBOARD_READ_COMMANDS) + if not cmd: + die("no clipboard tool found (install xclip, xsel, or wl-paste)") + + try: + result = subprocess.run(cmd, capture_output=True, check=True) + return result.stdout + except subprocess.CalledProcessError as e: + die(f"clipboard read failed: {e.stderr.decode(errors='replace')}") + except FileNotFoundError: + die(f"clipboard tool not found: {cmd[0]}") + + +def write_clipboard(data: bytes) -> None: + """Write content to system clipboard.""" + cmd = find_clipboard_command(CLIPBOARD_WRITE_COMMANDS) + if not cmd: + die("no clipboard tool found (install xclip, xsel, or wl-copy)") + + try: + subprocess.run(cmd, input=data, check=True) + except subprocess.CalledProcessError as e: + die(f"clipboard write failed: {e.stderr.decode(errors='replace') if e.stderr else ''}") + except FileNotFoundError: + die(f"clipboard tool not found: {cmd[0]}") + + # ----------------------------------------------------------------------------- # Content helpers # ----------------------------------------------------------------------------- -def read_content(file_arg: str | None) -> bytes: - """Read content from file or stdin.""" +def read_content(file_arg: str | None, from_clipboard: bool = False) -> bytes: + """Read content from file, stdin, or clipboard.""" + if from_clipboard: + return read_clipboard() + if file_arg: if file_arg == "-": return sys.stdin.buffer.read() @@ -447,7 +512,7 @@ def read_content(file_arg: str | None) -> bytes: return path.read_bytes() if sys.stdin.isatty(): - die("no input provided (pipe data or specify file)") + die("no input provided (pipe data, specify file, or use -C for clipboard)") return sys.stdin.buffer.read() @@ -504,7 +569,8 @@ def require_auth(config: Mapping[str, Any]) -> None: def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None: """Create a new paste.""" - content = read_content(args.file) + from_clipboard = getattr(args, "clipboard", False) + content = read_content(args.file, from_clipboard=from_clipboard) if not content: die("empty content") @@ -562,11 +628,19 @@ def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None: base_url = config["server"].rstrip("/") if args.raw: - print(base_url + data["raw"] + key_fragment) + result_url = base_url + data["raw"] + key_fragment elif args.quiet: - print(data["id"] + key_fragment) + result_url = data["id"] + key_fragment else: - print(base_url + data["url"] + key_fragment) + result_url = base_url + data["url"] + key_fragment + + print(result_url) + + # Copy URL to clipboard if requested + if getattr(args, "copy_url", False): + write_clipboard(result_url.encode()) + if not args.quiet: + print("(copied to clipboard)", file=sys.stderr) return last_error = parse_error(body, body.decode(errors="replace")) @@ -619,7 +693,11 @@ def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None: if encryption_key: body = decrypt_content(body, encryption_key) - if args.output: + # Copy to clipboard if requested + if getattr(args, "copy", False): + write_clipboard(body) + print("(copied to clipboard)", file=sys.stderr) + elif args.output: Path(args.output).write_bytes(body) print(f"saved: {args.output}", file=sys.stderr) else: @@ -1323,11 +1401,14 @@ def build_parser() -> argparse.ArgumentParser: p_create.add_argument("-p", "--password", metavar="PASS", help="password protect") p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL") p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only") + p_create.add_argument("-C", "--clipboard", action="store_true", help="read from clipboard") + p_create.add_argument("--copy-url", action="store_true", help="copy result URL to clipboard") # get p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste") p_get.add_argument("id", help="paste ID or URL") p_get.add_argument("-o", "--output", help="save to file") + p_get.add_argument("-c", "--copy", action="store_true", help="copy content to clipboard") p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste") p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only") @@ -1419,9 +1500,198 @@ def build_parser() -> argparse.ArgumentParser: "--configure", action="store_true", help="update config file (requires -o)" ) + # completion + p_completion = subparsers.add_parser("completion", help="generate shell completion") + p_completion.add_argument( + "--shell", choices=["bash", "zsh", "fish"], default="bash", help="shell type" + ) + return parser +def cmd_completion(args: argparse.Namespace, config: dict[str, Any]) -> None: + """Output shell completion script.""" + shell = getattr(args, "shell", None) or "bash" + + # Bash completion - full featured + bash_completion = """\ +# Bash completion for fpaste +# Install: source this file or copy to /etc/bash_completion.d/fpaste + +_fpaste_completions() { + local cur prev words cword + _init_completion || return + + local commands="create c new get g delete d rm info i list ls" + commands+=" search s find update u export register cert pki completion" + local pki_commands="status issue download dl" + + if [[ $cword -eq 1 ]]; then + COMPREPLY=($(compgen -W "$commands" -- "$cur")) + return + fi + + local cmd="${words[1]}" + + if [[ "$cmd" == "pki" && $cword -eq 2 ]]; then + COMPREPLY=($(compgen -W "$pki_commands" -- "$cur")) + return + fi + + case "$cmd" in + create|c|new) + local opts="-E --no-encrypt -b --burn -x --expiry -p --password" + opts+=" -r --raw -q --quiet -C --clipboard --copy-url" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir + ;; + get|g) + local opts="-o --output -c --copy -p --password -m --meta" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) + ;; + delete|d|rm) + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "-a --all -c --confirm" -- "$cur")) + ;; + list|ls) + local opts="-a --all -l --limit -o --offset --json" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) + ;; + search|s|find) + local opts="-t --type --after --before -l --limit --json" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) + ;; + update|u) + local opts="-E --no-encrypt -p --password --remove-password -x --expiry -q --quiet" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir + ;; + export) + local opts="-o --output -k --keyfile --manifest -q --quiet" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) + ;; + register) + local opts="-n --name -o --output --configure --p12-only -f --force -q --quiet" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) + ;; + cert) + local opts="-o --output -a --algorithm -b --bits -c --curve" + opts+=" -d --days -n --name --password-key --configure -f --force" + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) + ;; + completion) + [[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "--shell" -- "$cur")) + [[ "$prev" == "--shell" ]] && COMPREPLY=($(compgen -W "bash zsh fish" -- "$cur")) + ;; + esac +} + +complete -F _fpaste_completions fpaste +""" + + # Zsh completion - compact format + zsh_completion = """\ +#compdef fpaste +_fpaste() { + local curcontext="$curcontext" state line + typeset -A opt_args + _arguments -C '-s[Server]:url:' '--server[Server]:url:' '1: :->cmd' '*:: :->args' + case $state in + cmd) + local cmds=('create:Create paste' 'get:Get paste' 'delete:Delete' + 'info:Server info' 'list:List pastes' 'search:Search' + 'update:Update paste' 'export:Export' 'register:Register' + 'cert:Generate cert' 'pki:PKI ops' 'completion:Completions') + _describe -t commands 'commands' cmds + ;; + args) + case $line[1] in + create|c|new) + _arguments '-E[No encrypt]' '-b[Burn]' '-x[Expiry]:sec:' \\ + '-p[Pass]:p:' '-r[Raw]' '-q[Quiet]' '-C[Clipboard]' \\ + '--copy-url' '*:file:_files' ;; + get|g) + _arguments '-o[Out]:f:_files' '-c[Copy]' '-p[Pass]:p:' \\ + '-m[Meta]' '1:ID:' ;; + delete|d|rm) _arguments '-a[All]' '-c[Confirm]:n:' '*:ID:' ;; + list|ls) _arguments '-a[All]' '-l[Limit]:n:' '-o[Off]:n:' '--json' ;; + search|s|find) + _arguments '-t[Type]:p:' '--after:d:' '--before:d:' \\ + '-l[Limit]:n:' '--json' ;; + update|u) + _arguments '-E[No encrypt]' '-p[Pass]:p:' '--remove-password' \\ + '-x[Expiry]:s:' '-q[Quiet]' '1:ID:' '*:file:_files' ;; + export) + _arguments '-o[Dir]:d:_files -/' '-k[Keys]:f:_files' \\ + '--manifest' '-q[Quiet]' ;; + register) + _arguments '-n[Name]:cn:' '-o[Dir]:d:_files -/' '--configure' \\ + '--p12-only' '-f[Force]' '-q[Quiet]' ;; + cert) + _arguments '-o[Dir]:d:_files -/' '-a[Algo]:(rsa ec)' \\ + '-b[Bits]:n:' '-c[Curve]:(secp256r1 secp384r1 secp521r1)' \\ + '-d[Days]:n:' '-n[Name]:cn:' '--configure' '-f[Force]' ;; + pki) (( CURRENT == 2 )) && _describe 'cmd' '(status issue download)' ;; + completion) _arguments '--shell:(bash zsh fish)' ;; + esac ;; + esac +} +_fpaste "$@" +""" + + # Fish completion - compact + fish_completion = """\ +# Fish completion for fpaste +complete -c fpaste -f +complete -c fpaste -n __fish_use_subcommand -a 'create c new' -d 'Create' +complete -c fpaste -n __fish_use_subcommand -a 'get g' -d 'Get paste' +complete -c fpaste -n __fish_use_subcommand -a 'delete d rm' -d 'Delete' +complete -c fpaste -n __fish_use_subcommand -a 'info i' -d 'Server info' +complete -c fpaste -n __fish_use_subcommand -a 'list ls' -d 'List' +complete -c fpaste -n __fish_use_subcommand -a 'search s find' -d 'Search' +complete -c fpaste -n __fish_use_subcommand -a 'update u' -d 'Update' +complete -c fpaste -n __fish_use_subcommand -a 'export' -d 'Export' +complete -c fpaste -n __fish_use_subcommand -a 'register' -d 'Register' +complete -c fpaste -n __fish_use_subcommand -a 'cert' -d 'Gen cert' +complete -c fpaste -n __fish_use_subcommand -a 'pki' -d 'PKI' +complete -c fpaste -n __fish_use_subcommand -a 'completion' -d 'Completions' + +set -l cr '__fish_seen_subcommand_from create c new' +complete -c fpaste -n $cr -s E -l no-encrypt -d 'No encrypt' +complete -c fpaste -n $cr -s b -l burn -d 'Burn' +complete -c fpaste -n $cr -s x -l expiry -d 'Expiry' -x +complete -c fpaste -n $cr -s p -l password -d 'Password' -x +complete -c fpaste -n $cr -s r -l raw -d 'Raw URL' +complete -c fpaste -n $cr -s q -l quiet -d 'Quiet' +complete -c fpaste -n $cr -s C -l clipboard -d 'Clipboard' +complete -c fpaste -n $cr -l copy-url -d 'Copy URL' +complete -c fpaste -n $cr -F + +set -l gt '__fish_seen_subcommand_from get g' +complete -c fpaste -n $gt -s o -l output -d 'Output' -r +complete -c fpaste -n $gt -s c -l copy -d 'Copy' +complete -c fpaste -n $gt -s p -l password -d 'Password' -x +complete -c fpaste -n $gt -s m -l meta -d 'Metadata' + +set -l dl '__fish_seen_subcommand_from delete d rm' +complete -c fpaste -n $dl -s a -l all -d 'All' +complete -c fpaste -n $dl -s c -l confirm -d 'Confirm' -x + +set -l ls '__fish_seen_subcommand_from list ls' +complete -c fpaste -n $ls -s a -l all -d 'All' +complete -c fpaste -n $ls -s l -l limit -d 'Limit' -x +complete -c fpaste -n $ls -s o -l offset -d 'Offset' -x +complete -c fpaste -n $ls -l json -d 'JSON' + +set -l cp '__fish_seen_subcommand_from completion' +complete -c fpaste -n $cp -l shell -d 'Shell' -xa 'bash zsh fish' +""" + + completions = {"bash": bash_completion, "zsh": zsh_completion, "fish": fish_completion} + + if shell not in completions: + die(f"unsupported shell: {shell} (use: bash, zsh, fish)") + + print(completions[shell]) + + # Command dispatch table COMMANDS: dict[str, Any] = { "create": cmd_create, @@ -1444,6 +1714,7 @@ COMMANDS: dict[str, Any] = { "export": cmd_export, "register": cmd_register, "cert": cmd_cert, + "completion": cmd_completion, } PKI_COMMANDS: dict[str, Any] = { diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..21d1f95 --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,312 @@ +"""Tests for audit logging system.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import pytest + +from app import create_app +from app.audit import AuditEvent, AuditOutcome, cleanup_old_audit_logs, log_event, query_audit_log +from app.database import get_db + +if TYPE_CHECKING: + from flask import Flask + from flask.testing import FlaskClient + + +@pytest.fixture +def app() -> Flask: + """Create application configured for testing.""" + app = create_app("testing") + app.config["POW_DIFFICULTY"] = 0 + app.config["RATE_LIMIT_ENABLED"] = False + app.config["AUDIT_ENABLED"] = True + app.config["AUDIT_RETENTION_DAYS"] = 90 + return app + + +@pytest.fixture +def client(app: Flask) -> FlaskClient: + """Create test client.""" + return app.test_client() + + +@pytest.fixture +def admin_client(app: Flask) -> FlaskClient: + """Create test client with admin authentication.""" + client = app.test_client() + fingerprint = "b" * 40 + client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = fingerprint + with app.app_context(): + app.config["PKI_ENABLED"] = True + db = get_db() + now = int(time.time()) + db.execute( + """INSERT OR REPLACE INTO certificate_authority + (id, common_name, certificate_pem, private_key_encrypted, + key_salt, created_at, expires_at, key_algorithm) + VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')""", + (now, now + 86400 * 365), + ) + db.execute( + """INSERT OR REPLACE INTO issued_certificates + (serial, ca_id, common_name, fingerprint_sha1, certificate_pem, + created_at, expires_at, status, is_admin) + VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)""", + ("admin001", fingerprint, now, now + 86400), + ) + db.commit() + return client + + +class TestAuditLogging: + """Test audit logging functions.""" + + def test_log_event_creates_record(self, app: Flask) -> None: + """log_event should create audit log entry.""" + with app.app_context(): + log_event( + AuditEvent.PASTE_CREATE, + AuditOutcome.SUCCESS, + paste_id="abc123", + client_id="a" * 40, + client_ip="192.168.1.1", + details={"size": 1024}, + ) + + entries, total = query_audit_log(paste_id="abc123") + + assert total == 1 + assert len(entries) == 1 + assert entries[0]["event_type"] == "paste_create" + assert entries[0]["outcome"] == "success" + assert entries[0]["paste_id"] == "abc123" + assert entries[0]["client_id"] == "a" * 40 + assert entries[0]["client_ip"] == "192.168.1.1" + assert entries[0]["details"]["size"] == 1024 + + def test_log_event_with_string_types(self, app: Flask) -> None: + """log_event should accept string types as well as enums.""" + with app.app_context(): + log_event( + "custom_event", + "custom_outcome", + paste_id="xyz789", + ) + + entries, total = query_audit_log(event_type="custom_event") + + assert total == 1 + assert entries[0]["event_type"] == "custom_event" + assert entries[0]["outcome"] == "custom_outcome" + + def test_log_event_disabled(self, app: Flask) -> None: + """log_event should skip logging when disabled.""" + app.config["AUDIT_ENABLED"] = False + with app.app_context(): + log_event( + AuditEvent.PASTE_CREATE, + AuditOutcome.SUCCESS, + paste_id="disabled123", + ) + + _entries, total = query_audit_log(paste_id="disabled123") + assert total == 0 + + +class TestAuditQuery: + """Test audit log query functionality.""" + + def test_query_by_event_type(self, app: Flask) -> None: + """Query should filter by event type.""" + with app.app_context(): + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p1") + log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="evt_p2") + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p3") + + entries, _total = query_audit_log(event_type="paste_create") + + # Verify entries matching our test are present + our_entries = [e for e in entries if e.get("paste_id", "").startswith("evt_")] + assert len(our_entries) == 2 + assert all(e["event_type"] == "paste_create" for e in entries) + + def test_query_by_client_id(self, app: Flask) -> None: + """Query should filter by client ID.""" + with app.app_context(): + # Use unique client IDs for this test + client_a = "1" * 40 + client_b = "2" * 40 + log_event( + AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_a, paste_id="cid_p1" + ) + log_event( + AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_b, paste_id="cid_p2" + ) + + entries, total = query_audit_log(client_id=client_a) + + assert total >= 1 + assert all(e["client_id"] == client_a for e in entries) + + def test_query_by_outcome(self, app: Flask) -> None: + """Query should filter by outcome.""" + with app.app_context(): + log_event(AuditEvent.RATE_LIMIT, AuditOutcome.BLOCKED, paste_id="out_test") + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS) + log_event(AuditEvent.POW_FAILURE, AuditOutcome.FAILURE) + + entries, total = query_audit_log(outcome="blocked") + + assert total >= 1 + assert all(e["outcome"] == "blocked" for e in entries) + + def test_query_by_timestamp_range(self, app: Flask) -> None: + """Query should filter by timestamp range.""" + with app.app_context(): + now = int(time.time()) + + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent") + + entries, total = query_audit_log(since=now - 60, until=now + 60) + + assert total >= 1 + assert any(e["paste_id"] == "recent" for e in entries) + + def test_query_pagination(self, app: Flask) -> None: + """Query should support pagination.""" + with app.app_context(): + for i in range(5): + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id=f"p{i}") + + entries1, total = query_audit_log(limit=2, offset=0) + entries2, _ = query_audit_log(limit=2, offset=2) + + assert total >= 5 + assert len(entries1) == 2 + assert len(entries2) == 2 + assert entries1[0]["paste_id"] != entries2[0]["paste_id"] + + +class TestAuditCleanup: + """Test audit log cleanup functionality.""" + + def test_cleanup_old_logs(self, app: Flask) -> None: + """Cleanup should delete old entries.""" + with app.app_context(): + db = get_db() + old_ts = int(time.time()) - (100 * 24 * 60 * 60) # 100 days ago + + # Insert old entry directly + db.execute( + """INSERT INTO audit_log + (timestamp, event_type, outcome) + VALUES (?, 'old_event', 'success')""", + (old_ts,), + ) + db.commit() + + # Insert recent entry + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent") + + # Cleanup with 90 day retention + deleted = cleanup_old_audit_logs(retention_days=90) + + assert deleted == 1 + + entries, _total = query_audit_log() + assert all(e["event_type"] != "old_event" for e in entries) + + +class TestAuditEndpoint: + """Test GET /audit endpoint.""" + + def test_audit_requires_auth(self, client: FlaskClient) -> None: + """Audit endpoint should require authentication.""" + response = client.get("/audit") + assert response.status_code == 401 + + def test_audit_requires_admin(self, app: Flask) -> None: + """Audit endpoint should require admin access.""" + client = app.test_client() + client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = "c" * 40 # Non-admin + + response = client.get("/audit") + assert response.status_code == 403 + + def test_audit_admin_access(self, admin_client: FlaskClient, app: Flask) -> None: + """Admin should be able to query audit log.""" + with app.app_context(): + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="test123") + + response = admin_client.get("/audit") + assert response.status_code == 200 + + data = response.get_json() + assert "entries" in data + assert "total" in data + assert "count" in data + + def test_audit_query_params(self, admin_client: FlaskClient, app: Flask) -> None: + """Audit endpoint should accept query parameters.""" + with app.app_context(): + log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="filtered") + log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="other") + + response = admin_client.get("/audit?event_type=paste_create") + assert response.status_code == 200 + + data = response.get_json() + assert all(e["event_type"] == "paste_create" for e in data["entries"]) + + +class TestAuditIntegration: + """Test audit logging integration with paste operations.""" + + def test_paste_create_logs_event(self, client: FlaskClient, app: Flask) -> None: + """Paste creation should log audit event.""" + response = client.post("/", data=b"test content") + assert response.status_code == 201 + + paste_id = response.get_json()["id"] + + with app.app_context(): + entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_create") + assert len(entries) == 1 + assert entries[0]["outcome"] == "success" + + def test_paste_access_logs_event(self, client: FlaskClient, app: Flask) -> None: + """Paste access should log audit event.""" + # Create paste + create_response = client.post("/", data=b"access test") + paste_id = create_response.get_json()["id"] + + # Access paste + client.get(f"/{paste_id}/raw") + + with app.app_context(): + entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_access") + assert len(entries) == 1 + assert entries[0]["outcome"] == "success" + + def test_rate_limit_logs_event(self, app: Flask) -> None: + """Rate limit block should log audit event.""" + app.config["RATE_LIMIT_ENABLED"] = True + app.config["RATE_LIMIT_WINDOW"] = 60 + app.config["RATE_LIMIT_MAX"] = 1 + + client = app.test_client() + + # First request should succeed + client.post("/", data=b"first") + + # Second request should be rate limited + response = client.post("/", data=b"second") + assert response.status_code == 429 + + with app.app_context(): + entries, _ = query_audit_log(event_type="rate_limit") + assert len(entries) >= 1 + assert entries[0]["outcome"] == "blocked" diff --git a/tests/test_metrics.py b/tests/test_metrics.py new file mode 100644 index 0000000..4e26c2d --- /dev/null +++ b/tests/test_metrics.py @@ -0,0 +1,179 @@ +"""Tests for Prometheus metrics module.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from unittest.mock import patch + +import pytest + +from app import create_app +from app.metrics import ( + record_dedup, + record_paste_accessed, + record_paste_created, + record_paste_deleted, + record_pow, + record_rate_limit, + setup_custom_metrics, +) + +if TYPE_CHECKING: + from flask import Flask + from flask.testing import FlaskClient + + +@pytest.fixture +def app() -> Flask: + """Create application configured for testing.""" + app = create_app("testing") + app.config["POW_DIFFICULTY"] = 0 + app.config["RATE_LIMIT_ENABLED"] = False + return app + + +@pytest.fixture +def client(app: Flask) -> FlaskClient: + """Create test client.""" + return app.test_client() + + +class TestMetricsFunctions: + """Test that metrics functions execute without error.""" + + def test_record_paste_created_no_error(self) -> None: + """record_paste_created should not raise when counters not initialized.""" + # In testing mode, counters are None, should gracefully no-op + record_paste_created("anonymous", "success") + record_paste_created("authenticated", "success") + record_paste_created("anonymous", "failure") + + def test_record_paste_accessed_no_error(self) -> None: + """record_paste_accessed should not raise when counters not initialized.""" + record_paste_accessed("anonymous", False) + record_paste_accessed("authenticated", True) + + def test_record_paste_deleted_no_error(self) -> None: + """record_paste_deleted should not raise when counters not initialized.""" + record_paste_deleted("authenticated", "success") + record_paste_deleted("authenticated", "failure") + + def test_record_rate_limit_no_error(self) -> None: + """record_rate_limit should not raise when counters not initialized.""" + record_rate_limit("allowed") + record_rate_limit("blocked") + + def test_record_pow_no_error(self) -> None: + """record_pow should not raise when counters not initialized.""" + record_pow("success") + record_pow("failure") + + def test_record_dedup_no_error(self) -> None: + """record_dedup should not raise when counters not initialized.""" + record_dedup("allowed") + record_dedup("blocked") + + +class TestMetricsSetup: + """Test metrics initialization.""" + + def test_setup_skipped_in_testing(self, app: Flask) -> None: + """setup_custom_metrics should skip initialization in testing mode.""" + with app.app_context(): + # Should not raise even though prometheus_client is available + setup_custom_metrics(app) + + def test_setup_handles_missing_prometheus(self, app: Flask) -> None: + """setup_custom_metrics should handle missing prometheus_client gracefully.""" + app.config["TESTING"] = False + + with app.app_context(), patch.dict("sys.modules", {"prometheus_client": None}): + # Should not raise, just log warning + setup_custom_metrics(app) + + +class TestMetricsIntegration: + """Test metrics are recorded during API operations.""" + + def test_paste_create_records_metric(self, client: FlaskClient) -> None: + """Paste creation should call record_paste_created.""" + with patch("app.api.routes.record_paste_created") as mock_record: + response = client.post("/", data=b"test content") + assert response.status_code == 201 + mock_record.assert_called_once_with("anonymous", "success") + + def test_paste_access_records_metric(self, client: FlaskClient) -> None: + """Paste access should call record_paste_accessed.""" + # Create paste first + create_response = client.post("/", data=b"access test") + paste_id = create_response.get_json()["id"] + + with patch("app.api.routes.record_paste_accessed") as mock_record: + response = client.get(f"/{paste_id}/raw") + assert response.status_code == 200 + mock_record.assert_called_once_with("anonymous", False) + + def test_burn_after_read_records_metric(self, client: FlaskClient) -> None: + """Burn-after-read access should record burn=True.""" + # Create burn-after-read paste + create_response = client.post( + "/", + data=b"burn test", + headers={"X-Burn-After-Read": "true"}, + ) + paste_id = create_response.get_json()["id"] + + with patch("app.api.routes.record_paste_accessed") as mock_record: + response = client.get(f"/{paste_id}/raw") + assert response.status_code == 200 + mock_record.assert_called_once_with("anonymous", True) + + def test_dedup_allowed_records_metric(self, client: FlaskClient) -> None: + """Successful paste creation should call record_dedup with 'allowed'.""" + with patch("app.api.routes.record_dedup") as mock_record: + response = client.post("/", data=b"unique content for dedup test") + assert response.status_code == 201 + mock_record.assert_called_once_with("allowed") + + def test_pow_success_records_metric(self, app: Flask) -> None: + """Successful PoW verification should call record_pow with 'success'.""" + app.config["POW_DIFFICULTY"] = 8 + client = app.test_client() + + # Get a challenge + challenge_resp = client.get("/challenge") + challenge = challenge_resp.get_json() + + # Solve it (brute force for testing) + import hashlib + + nonce = challenge["nonce"] + difficulty = challenge["difficulty"] + token = challenge["token"] + + solution = 0 + while True: + work = f"{nonce}:{solution}".encode() + hash_bytes = hashlib.sha256(work).digest() + zero_bits = 0 + for byte in hash_bytes: + if byte == 0: + zero_bits += 8 + else: + zero_bits += 8 - byte.bit_length() + break + if zero_bits >= difficulty: + break + solution += 1 + + with patch("app.api.routes.record_pow") as mock_record: + response = client.post( + "/", + data=b"pow test content", + headers={ + "X-PoW-Token": token, + "X-PoW-Solution": str(solution), + }, + ) + assert response.status_code == 201 + mock_record.assert_called_with("success")