feat: add observability and CLI enhancements
Some checks failed
CI / Lint & Format (push) Failing after 16s
CI / Tests (push) Has been skipped
CI / Security Scan (push) Failing after 20s

Audit logging:
- audit_log table with event tracking
- app/audit.py module with log_event(), query_audit_log()
- GET /audit endpoint (admin only)
- configurable retention and cleanup

Prometheus metrics:
- app/metrics.py with custom counters
- paste create/access/delete, rate limit, PoW, dedup metrics
- instrumentation in API routes

CLI clipboard integration:
- fpaste create -C/--clipboard (read from clipboard)
- fpaste create --copy-url (copy result URL)
- fpaste get -c/--copy (copy content)
- cross-platform: xclip, xsel, pbcopy, wl-copy

Shell completions:
- completions/ directory with bash/zsh/fish scripts
- fpaste completion --shell command
This commit is contained in:
Username
2025-12-23 22:39:50 +01:00
parent 4d08a4467d
commit 7063f8718e
13 changed files with 2003 additions and 47 deletions

View File

@@ -190,6 +190,11 @@ def setup_metrics(app: Flask) -> None:
metrics.info("flaskpaste_info", "FlaskPaste application info", version=VERSION)
app.extensions["metrics"] = metrics
# Setup custom metrics
from app.metrics import setup_custom_metrics
setup_custom_metrics(app)
except ImportError:
app.logger.warning("prometheus_flask_exporter not available, metrics disabled")

View File

@@ -13,11 +13,13 @@ _cleanup_times: dict[str, float] = {
"pastes": 0.0,
"hashes": 0.0,
"rate_limits": 0.0,
"audit": 0.0,
}
_CLEANUP_INTERVALS = {
"pastes": 3600, # 1 hour
"hashes": 900, # 15 minutes
"rate_limits": 300, # 5 minutes
"audit": 86400, # 24 hours
}
@@ -61,5 +63,15 @@ def run_scheduled_cleanup():
if count > 0:
current_app.logger.info(f"Cleaned up {count} rate limit entries")
# Cleanup old audit logs
if now - _cleanup_times["audit"] >= _CLEANUP_INTERVALS["audit"]:
_cleanup_times["audit"] = now
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import cleanup_old_audit_logs
count = cleanup_old_audit_logs()
if count > 0:
current_app.logger.info(f"Cleaned up {count} old audit log entries")
from app.api import routes # noqa: E402, F401

View File

@@ -19,6 +19,14 @@ from flask.views import MethodView
from app.api import bp
from app.config import VERSION
from app.database import check_content_hash, get_db, hash_password, verify_password
from app.metrics import (
record_dedup,
record_paste_accessed,
record_paste_created,
record_paste_deleted,
record_pow,
record_rate_limit,
)
if TYPE_CHECKING:
from sqlite3 import Row
@@ -246,6 +254,16 @@ def prefixed_url(path: str) -> str:
return f"{url_prefix()}{path}"
def paste_url(paste_id: str) -> str:
"""Generate URL for paste metadata endpoint."""
return prefixed_url(f"/{paste_id}")
def paste_raw_url(paste_id: str) -> str:
"""Generate URL for raw paste content endpoint."""
return prefixed_url(f"/{paste_id}/raw")
def base_url() -> str:
"""Detect full base URL from request headers."""
scheme = (
@@ -257,6 +275,59 @@ def base_url() -> str:
return f"{scheme}://{host}{url_prefix()}"
# ─────────────────────────────────────────────────────────────────────────────
# Response Builders
# ─────────────────────────────────────────────────────────────────────────────
def build_paste_metadata(
paste_id: str,
mime_type: str,
size: int,
created_at: int,
*,
owner: str | None = None,
burn_after_read: bool = False,
expires_at: int | None = None,
password_protected: bool = False,
include_owner: bool = False,
last_accessed: int | None = None,
) -> dict[str, Any]:
"""Build standardized paste metadata response dict.
Args:
paste_id: Paste identifier
mime_type: Content MIME type
size: Content size in bytes
created_at: Creation timestamp
owner: Owner fingerprint (included only if include_owner=True)
burn_after_read: Whether paste is burn-after-read
expires_at: Expiration timestamp
password_protected: Whether paste has password
include_owner: Whether to include owner in response
last_accessed: Last access timestamp (optional)
"""
data: dict[str, Any] = {
"id": paste_id,
"mime_type": mime_type,
"size": size,
"created_at": created_at,
"url": paste_url(paste_id),
"raw": paste_raw_url(paste_id),
}
if last_accessed is not None:
data["last_accessed"] = last_accessed
if include_owner and owner:
data["owner"] = owner
if burn_after_read:
data["burn_after_read"] = True
if expires_at:
data["expires_at"] = expires_at
if password_protected:
data["password_protected"] = True
return data
# ─────────────────────────────────────────────────────────────────────────────
# Validation Helpers (used within views)
# ─────────────────────────────────────────────────────────────────────────────
@@ -703,6 +774,17 @@ class IndexView(MethodView):
current_app.logger.warning(
"Rate limit exceeded: ip=%s trusted=%s", client_ip, bool(trusted_client)
)
# Audit log rate limit event
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.RATE_LIMIT,
AuditOutcome.BLOCKED,
client_id=owner,
client_ip=client_ip,
)
record_rate_limit("blocked")
response = error_response(
"Rate limit exceeded",
429,
@@ -729,8 +811,22 @@ class IndexView(MethodView):
current_app.logger.warning(
"PoW verification failed: %s from=%s", err, request.remote_addr
)
# Audit log PoW failure
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.POW_FAILURE,
AuditOutcome.FAILURE,
client_id=owner,
client_ip=client_ip,
details={"error": err},
)
record_pow("failure")
return error_response(f"Proof-of-work failed: {err}", 400)
record_pow("success")
# Size limits (only trusted clients get elevated limits)
content_size = len(content)
max_size = (
@@ -807,6 +903,18 @@ class IndexView(MethodView):
dedup_count,
request.remote_addr,
)
# Audit log dedup block
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.DEDUP_BLOCK,
AuditOutcome.BLOCKED,
client_id=owner,
client_ip=client_ip,
details={"hash": content_hash[:16], "count": dedup_count},
)
record_dedup("blocked")
return error_response(
"Duplicate content rate limit exceeded",
429,
@@ -814,6 +922,8 @@ class IndexView(MethodView):
window_seconds=window,
)
record_dedup("allowed")
# Parse optional headers
burn_header = request.headers.get("X-Burn-After-Read", "").strip().lower()
burn_after_read = burn_header in ("true", "1", "yes")
@@ -883,11 +993,11 @@ class IndexView(MethodView):
)
db.commit()
# Build response
# Build response (creation response is intentionally different - no size)
response_data: dict[str, Any] = {
"id": paste_id,
"url": f"/{paste_id}",
"raw": f"/{paste_id}/raw",
"url": paste_url(paste_id),
"raw": paste_raw_url(paste_id),
"mime_type": mime_type,
"created_at": now,
}
@@ -903,6 +1013,21 @@ class IndexView(MethodView):
# Record successful paste for anti-flood tracking
record_antiflood_request()
# Audit log paste creation
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.PASTE_CREATE,
AuditOutcome.SUCCESS,
paste_id=paste_id,
client_id=owner,
client_ip=client_ip,
details={"size": content_size, "mime_type": mime_type},
)
record_paste_created("authenticated" if owner else "anonymous", "success")
return json_response(response_data, 201)
@@ -1135,21 +1260,17 @@ class PasteView(MethodView):
row: Row = g.paste
g.db.commit()
response_data: dict[str, Any] = {
"id": row["id"],
"mime_type": row["mime_type"],
"size": row["size"],
"created_at": row["created_at"],
"raw": f"/{paste_id}/raw",
}
if row["burn_after_read"]:
response_data["burn_after_read"] = True
if row["expires_at"]:
response_data["expires_at"] = row["expires_at"]
if row["password_hash"]:
response_data["password_protected"] = True
return json_response(response_data)
return json_response(
build_paste_metadata(
paste_id=row["id"],
mime_type=row["mime_type"],
size=row["size"],
created_at=row["created_at"],
burn_after_read=bool(row["burn_after_read"]),
expires_at=row["expires_at"],
password_protected=bool(row["password_hash"]),
)
)
def head(self, paste_id: str) -> Response:
"""Return paste metadata headers only."""
@@ -1254,6 +1375,19 @@ class PasteView(MethodView):
db.execute(update_sql, update_params)
db.commit()
# Audit log paste update
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.PASTE_UPDATE,
AuditOutcome.SUCCESS,
paste_id=paste_id,
client_id=g.client_id,
client_ip=get_client_ip(),
details={"fields": [f.split(" = ")[0] for f in update_fields]},
)
# Fetch updated paste for response
updated = db.execute(
"""SELECT id, mime_type, length(content) as size, expires_at,
@@ -1296,6 +1430,24 @@ class PasteRawView(MethodView):
db.commit()
# Audit log paste access
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.PASTE_ACCESS,
AuditOutcome.SUCCESS,
paste_id=paste_id,
client_id=get_client_fingerprint(),
client_ip=get_client_ip(),
details={"burn": bool(burn_after_read)},
)
record_paste_accessed(
"authenticated" if get_client_fingerprint() else "anonymous",
bool(burn_after_read),
)
response = Response(row["content"], mimetype=row["mime_type"])
if row["mime_type"].startswith(("image/", "text/")):
response.headers["Content-Disposition"] = "inline"
@@ -1350,6 +1502,20 @@ class PasteDeleteView(MethodView):
db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,))
db.commit()
# Audit log paste deletion
if current_app.config.get("AUDIT_ENABLED", True):
from app.audit import AuditEvent, AuditOutcome, log_event
log_event(
AuditEvent.PASTE_DELETE,
AuditOutcome.SUCCESS,
paste_id=paste_id,
client_id=g.client_id,
client_ip=get_client_ip(),
)
record_paste_deleted("authenticated", "success")
return json_response({"message": "Paste deleted"})
@@ -1457,27 +1623,21 @@ class PastesListView(MethodView):
if type_filter:
rows = [r for r in rows if fnmatch.fnmatch(r["mime_type"], type_filter)]
pastes = []
for row in rows:
paste: dict[str, Any] = {
"id": row["id"],
"mime_type": row["mime_type"],
"size": row["size"],
"created_at": row["created_at"],
"last_accessed": row["last_accessed"],
"url": f"/{row['id']}",
"raw": f"/{row['id']}/raw",
}
# Include owner for admin view
if show_all and row["owner"]:
paste["owner"] = row["owner"]
if row["burn_after_read"]:
paste["burn_after_read"] = True
if row["expires_at"]:
paste["expires_at"] = row["expires_at"]
if row["password_protected"]:
paste["password_protected"] = True
pastes.append(paste)
pastes = [
build_paste_metadata(
paste_id=row["id"],
mime_type=row["mime_type"],
size=row["size"],
created_at=row["created_at"],
owner=row["owner"],
burn_after_read=bool(row["burn_after_read"]),
expires_at=row["expires_at"],
password_protected=bool(row["password_protected"]),
include_owner=show_all,
last_accessed=row["last_accessed"],
)
for row in rows
]
response_data: dict[str, Any] = {
"pastes": pastes,
@@ -1780,6 +1940,79 @@ class PKIRevokeView(MethodView):
return json_response({"message": "Certificate revoked", "serial": serial})
# ─────────────────────────────────────────────────────────────────────────────
# Audit Log View
# ─────────────────────────────────────────────────────────────────────────────
class AuditLogView(MethodView):
"""Audit log query endpoint (admin only)."""
def get(self) -> Response:
"""Query audit log with filters.
Query parameters:
- event_type: Filter by event type
- client_id: Filter by client fingerprint
- paste_id: Filter by paste ID
- outcome: Filter by outcome (success, failure, blocked)
- since: Filter by timestamp >= since
- until: Filter by timestamp <= until
- limit: Maximum results (default 100, max 500)
- offset: Pagination offset
"""
if err := require_auth():
return err
if not is_admin():
return error_response("Admin access required", 403)
from app.audit import query_audit_log
# Parse query parameters
event_type = request.args.get("event_type", "").strip() or None
client_id = request.args.get("client_id", "").strip() or None
paste_id = request.args.get("paste_id", "").strip() or None
outcome = request.args.get("outcome", "").strip() or None
try:
since = int(request.args.get("since", 0)) or None
except (ValueError, TypeError):
since = None
try:
until = int(request.args.get("until", 0)) or None
except (ValueError, TypeError):
until = None
try:
limit = min(int(request.args.get("limit", 100)), 500)
except (ValueError, TypeError):
limit = 100
try:
offset = max(int(request.args.get("offset", 0)), 0)
except (ValueError, TypeError):
offset = 0
entries, total = query_audit_log(
event_type=event_type,
client_id=client_id,
paste_id=paste_id,
outcome=outcome,
since=since,
until=until,
limit=limit,
offset=offset,
)
return json_response(
{
"entries": entries,
"count": len(entries),
"total": total,
"limit": limit,
"offset": offset,
}
)
# ─────────────────────────────────────────────────────────────────────────────
# Route Registration
# ─────────────────────────────────────────────────────────────────────────────
@@ -1817,3 +2050,6 @@ bp.add_url_rule("/pki/certs", view_func=PKICertsView.as_view("pki_certs"))
bp.add_url_rule(
"/pki/revoke/<serial>", view_func=PKIRevokeView.as_view("pki_revoke"), methods=["POST"]
)
# Audit log endpoint (admin only)
bp.add_url_rule("/audit", view_func=AuditLogView.as_view("audit_log"))

206
app/audit.py Normal file
View File

@@ -0,0 +1,206 @@
"""Audit logging for security events."""
from __future__ import annotations
import json
import time
from enum import Enum
from typing import TYPE_CHECKING, Any
from flask import current_app, g
from app.database import get_db
if TYPE_CHECKING:
from sqlite3 import Row
class AuditEvent(str, Enum):
"""Security-relevant event types for audit logging."""
PASTE_CREATE = "paste_create"
PASTE_ACCESS = "paste_access"
PASTE_DELETE = "paste_delete"
PASTE_UPDATE = "paste_update"
RATE_LIMIT = "rate_limit"
POW_FAILURE = "pow_failure"
DEDUP_BLOCK = "dedup_block"
CERT_ISSUED = "cert_issued"
CERT_REVOKED = "cert_revoked"
AUTH_FAILURE = "auth_failure"
class AuditOutcome(str, Enum):
"""Outcome types for audit events."""
SUCCESS = "success"
FAILURE = "failure"
BLOCKED = "blocked"
def log_event(
event_type: AuditEvent | str,
outcome: AuditOutcome | str,
*,
paste_id: str | None = None,
client_id: str | None = None,
client_ip: str | None = None,
details: dict[str, Any] | None = None,
) -> None:
"""Log a security-relevant event to the audit log.
Args:
event_type: Type of event (from AuditEvent enum or string)
outcome: Result of the event (success, failure, blocked)
paste_id: Related paste ID (if applicable)
client_id: Client certificate fingerprint (if authenticated)
client_ip: Client IP address
details: Additional event-specific details (stored as JSON)
"""
if not current_app.config.get("AUDIT_ENABLED", True):
return
now = int(time.time())
request_id = getattr(g, "request_id", None)
# Convert enums to strings
event_type_str = event_type.value if isinstance(event_type, AuditEvent) else event_type
outcome_str = outcome.value if isinstance(outcome, AuditOutcome) else outcome
# Serialize details to JSON if provided
details_json = json.dumps(details, ensure_ascii=False) if details else None
db = get_db()
db.execute(
"""INSERT INTO audit_log
(timestamp, event_type, client_id, client_ip, paste_id, request_id, outcome, details)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
now,
event_type_str,
client_id,
client_ip,
paste_id,
request_id,
outcome_str,
details_json,
),
)
db.commit()
def query_audit_log(
*,
event_type: str | None = None,
client_id: str | None = None,
paste_id: str | None = None,
outcome: str | None = None,
since: int | None = None,
until: int | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict[str, Any]], int]:
"""Query audit log with filters.
Args:
event_type: Filter by event type
client_id: Filter by client fingerprint
paste_id: Filter by paste ID
outcome: Filter by outcome (success, failure, blocked)
since: Filter by timestamp >= since
until: Filter by timestamp <= until
limit: Maximum results to return
offset: Pagination offset
Returns:
Tuple of (list of audit entries, total count matching filters)
"""
db = get_db()
where_clauses: list[str] = []
params: list[Any] = []
if event_type:
where_clauses.append("event_type = ?")
params.append(event_type)
if client_id:
where_clauses.append("client_id = ?")
params.append(client_id)
if paste_id:
where_clauses.append("paste_id = ?")
params.append(paste_id)
if outcome:
where_clauses.append("outcome = ?")
params.append(outcome)
if since:
where_clauses.append("timestamp >= ?")
params.append(since)
if until:
where_clauses.append("timestamp <= ?")
params.append(until)
where_sql = " AND ".join(where_clauses) if where_clauses else "1=1"
# Get total count
count_row = db.execute(
f"SELECT COUNT(*) as total FROM audit_log WHERE {where_sql}", # noqa: S608
params,
).fetchone()
total = count_row["total"] if count_row else 0
# Fetch entries
rows: list[Row] = db.execute(
f"""SELECT id, timestamp, event_type, client_id, client_ip,
paste_id, request_id, outcome, details
FROM audit_log
WHERE {where_sql}
ORDER BY timestamp DESC
LIMIT ? OFFSET ?""", # noqa: S608
[*params, limit, offset],
).fetchall()
entries = []
for row in rows:
entry: dict[str, Any] = {
"id": row["id"],
"timestamp": row["timestamp"],
"event_type": row["event_type"],
"outcome": row["outcome"],
}
if row["client_id"]:
entry["client_id"] = row["client_id"]
if row["client_ip"]:
entry["client_ip"] = row["client_ip"]
if row["paste_id"]:
entry["paste_id"] = row["paste_id"]
if row["request_id"]:
entry["request_id"] = row["request_id"]
if row["details"]:
try:
entry["details"] = json.loads(row["details"])
except json.JSONDecodeError:
entry["details"] = row["details"]
entries.append(entry)
return entries, total
def cleanup_old_audit_logs(retention_days: int | None = None) -> int:
"""Delete audit log entries older than retention period.
Args:
retention_days: Number of days to keep. If None, uses config.
Returns:
Number of deleted entries.
"""
if retention_days is None:
retention_days = current_app.config.get("AUDIT_RETENTION_DAYS", 90)
cutoff = int(time.time()) - (retention_days * 24 * 60 * 60)
db = get_db()
cursor = db.execute("DELETE FROM audit_log WHERE timestamp < ?", (cutoff,))
db.commit()
return cursor.rowcount

View File

@@ -100,6 +100,11 @@ class Config:
# Authenticated users get higher limits (multiplier)
RATE_LIMIT_AUTH_MULTIPLIER = int(os.environ.get("FLASKPASTE_RATE_AUTH_MULT", "5"))
# Audit Logging
# Track security-relevant events (paste creation, deletion, rate limits, etc.)
AUDIT_ENABLED = os.environ.get("FLASKPASTE_AUDIT", "1").lower() in ("1", "true", "yes")
AUDIT_RETENTION_DAYS = int(os.environ.get("FLASKPASTE_AUDIT_RETENTION", "90"))
# PKI Configuration
# Enable PKI endpoints for certificate authority and issuance
PKI_ENABLED = os.environ.get("FLASKPASTE_PKI_ENABLED", "0").lower() in ("1", "true", "yes")

View File

@@ -73,6 +73,23 @@ CREATE TABLE IF NOT EXISTS issued_certificates (
CREATE INDEX IF NOT EXISTS idx_certs_fingerprint ON issued_certificates(fingerprint_sha1);
CREATE INDEX IF NOT EXISTS idx_certs_status ON issued_certificates(status);
CREATE INDEX IF NOT EXISTS idx_certs_ca_id ON issued_certificates(ca_id);
-- Audit log for security events
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
event_type TEXT NOT NULL,
client_id TEXT,
client_ip TEXT,
paste_id TEXT,
request_id TEXT,
outcome TEXT NOT NULL,
details TEXT
);
CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp);
CREATE INDEX IF NOT EXISTS idx_audit_event_type ON audit_log(event_type);
CREATE INDEX IF NOT EXISTS idx_audit_client_id ON audit_log(client_id);
"""
# Password hashing constants

160
app/metrics.py Normal file
View File

@@ -0,0 +1,160 @@
"""Custom Prometheus metrics for FlaskPaste."""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from flask import Flask
# Metrics are only initialized when not testing
_paste_created_counter = None
_paste_accessed_counter = None
_paste_deleted_counter = None
_rate_limit_counter = None
_pow_counter = None
_dedup_counter = None
_request_duration_histogram = None
def setup_custom_metrics(app: Flask) -> None:
"""Initialize custom Prometheus metrics.
Should be called after prometheus_flask_exporter is set up.
"""
global _paste_created_counter, _paste_accessed_counter, _paste_deleted_counter
global _rate_limit_counter, _pow_counter, _dedup_counter, _request_duration_histogram
if app.config.get("TESTING"):
return
try:
from prometheus_client import Counter, Histogram
except ImportError:
app.logger.warning("prometheus_client not available, custom metrics disabled")
return
_paste_created_counter = Counter(
"flaskpaste_paste_created_total",
"Total number of paste creation attempts",
["auth_type", "outcome"],
)
_paste_accessed_counter = Counter(
"flaskpaste_paste_accessed_total",
"Total number of paste accesses",
["auth_type", "burn"],
)
_paste_deleted_counter = Counter(
"flaskpaste_paste_deleted_total",
"Total number of paste deletion attempts",
["auth_type", "outcome"],
)
_rate_limit_counter = Counter(
"flaskpaste_rate_limit_total",
"Total number of rate limit events",
["outcome"],
)
_pow_counter = Counter(
"flaskpaste_pow_total",
"Total number of proof-of-work validations",
["outcome"],
)
_dedup_counter = Counter(
"flaskpaste_dedup_total",
"Total number of content deduplication events",
["outcome"],
)
_request_duration_histogram = Histogram(
"flaskpaste_request_duration_seconds",
"Request duration in seconds",
["method", "endpoint", "status"],
buckets=(0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0),
)
app.logger.info("Custom Prometheus metrics initialized")
def record_paste_created(auth_type: str, outcome: str) -> None:
"""Record a paste creation attempt.
Args:
auth_type: "authenticated" or "anonymous"
outcome: "success", "failure", or "blocked"
"""
if _paste_created_counter:
_paste_created_counter.labels(auth_type=auth_type, outcome=outcome).inc()
def record_paste_accessed(auth_type: str, burn: bool) -> None:
"""Record a paste access.
Args:
auth_type: "authenticated" or "anonymous"
burn: True if paste was burn-after-read
"""
if _paste_accessed_counter:
_paste_accessed_counter.labels(auth_type=auth_type, burn=str(burn).lower()).inc()
def record_paste_deleted(auth_type: str, outcome: str) -> None:
"""Record a paste deletion attempt.
Args:
auth_type: "authenticated" or "anonymous"
outcome: "success" or "failure"
"""
if _paste_deleted_counter:
_paste_deleted_counter.labels(auth_type=auth_type, outcome=outcome).inc()
def record_rate_limit(outcome: str) -> None:
"""Record a rate limit event.
Args:
outcome: "allowed" or "blocked"
"""
if _rate_limit_counter:
_rate_limit_counter.labels(outcome=outcome).inc()
def record_pow(outcome: str) -> None:
"""Record a proof-of-work validation.
Args:
outcome: "success" or "failure"
"""
if _pow_counter:
_pow_counter.labels(outcome=outcome).inc()
def record_dedup(outcome: str) -> None:
"""Record a content deduplication event.
Args:
outcome: "allowed" or "blocked"
"""
if _dedup_counter:
_dedup_counter.labels(outcome=outcome).inc()
def observe_request_duration(
method: str, endpoint: str, status: int, duration: float
) -> None:
"""Record request duration.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: Request endpoint path
status: HTTP status code
duration: Request duration in seconds
"""
if _request_duration_histogram:
_request_duration_histogram.labels(
method=method, endpoint=endpoint, status=str(status)
).observe(duration)

202
completions/fpaste.bash Normal file
View File

@@ -0,0 +1,202 @@
# Bash completion for fpaste
# Install: source this file or copy to /etc/bash_completion.d/fpaste
_fpaste_completions() {
local cur prev words cword
_init_completion || return
local commands="create c new get g delete d rm info i list ls search s find update u export register cert pki completion"
local pki_commands="status issue download dl"
# Handle command-level completion
if [[ $cword -eq 1 ]]; then
COMPREPLY=($(compgen -W "$commands" -- "$cur"))
return
fi
local cmd="${words[1]}"
# PKI subcommand completion
if [[ "$cmd" == "pki" && $cword -eq 2 ]]; then
COMPREPLY=($(compgen -W "$pki_commands" -- "$cur"))
return
fi
# Option completion based on command
case "$cmd" in
create|c|new)
case "$prev" in
-x|--expiry)
# Suggest common expiry values
COMPREPLY=($(compgen -W "60 300 600 3600 86400 604800" -- "$cur"))
return
;;
-p|--password)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-E --no-encrypt -b --burn -x --expiry -p --password -r --raw -q --quiet -C --clipboard --copy-url" -- "$cur"))
else
_filedir
fi
;;
get|g)
case "$prev" in
-o|--output)
_filedir
return
;;
-p|--password)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-o --output -c --copy -p --password -m --meta" -- "$cur"))
fi
;;
delete|d|rm)
case "$prev" in
-c|--confirm)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-a --all -c --confirm" -- "$cur"))
fi
;;
info|i)
# No options
;;
list|ls)
case "$prev" in
-l|--limit|-o|--offset)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-a --all -l --limit -o --offset --json" -- "$cur"))
fi
;;
search|s|find)
case "$prev" in
-t|--type)
COMPREPLY=($(compgen -W "text/* image/* application/*" -- "$cur"))
return
;;
--after|--before)
return
;;
-l|--limit)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-t --type --after --before -l --limit --json" -- "$cur"))
fi
;;
update|u)
case "$prev" in
-x|--expiry)
COMPREPLY=($(compgen -W "60 300 600 3600 86400 604800" -- "$cur"))
return
;;
-p|--password)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-E --no-encrypt -p --password --remove-password -x --expiry -q --quiet" -- "$cur"))
else
_filedir
fi
;;
export)
case "$prev" in
-o|--output|-k|--keyfile)
_filedir
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-o --output -k --keyfile --manifest -q --quiet" -- "$cur"))
fi
;;
register)
case "$prev" in
-n|--name|-o|--output)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-n --name -o --output --configure --p12-only -f --force -q --quiet" -- "$cur"))
fi
;;
cert)
case "$prev" in
-o|--output)
_filedir -d
return
;;
-a|--algorithm)
COMPREPLY=($(compgen -W "rsa ec" -- "$cur"))
return
;;
-c|--curve)
COMPREPLY=($(compgen -W "secp256r1 secp384r1 secp521r1" -- "$cur"))
return
;;
-b|--bits|-d|--days|-n|--name|--password-key)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-o --output -a --algorithm -b --bits -c --curve -d --days -n --name --password-key --configure -f --force" -- "$cur"))
fi
;;
pki)
local pki_cmd="${words[2]}"
case "$pki_cmd" in
issue)
case "$prev" in
-n|--name|-o|--output)
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-n --name -o --output --configure -f --force" -- "$cur"))
fi
;;
download|dl)
case "$prev" in
-o|--output)
_filedir
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "-o --output --configure" -- "$cur"))
fi
;;
esac
;;
completion)
case "$prev" in
--shell)
COMPREPLY=($(compgen -W "bash zsh fish" -- "$cur"))
return
;;
esac
if [[ "$cur" == -* ]]; then
COMPREPLY=($(compgen -W "--shell" -- "$cur"))
fi
;;
esac
# Global options
if [[ "$cur" == -* && $cword -eq 1 ]]; then
COMPREPLY=($(compgen -W "-s --server -h --help" -- "$cur"))
fi
}
complete -F _fpaste_completions fpaste

148
completions/fpaste.fish Normal file
View File

@@ -0,0 +1,148 @@
# Fish completion for fpaste
# Install: copy to ~/.config/fish/completions/fpaste.fish
# Disable file completions by default
complete -c fpaste -f
# Helper function to check if command is specified
function __fpaste_needs_command
set -l cmd (commandline -opc)
if test (count $cmd) -eq 1
return 0
end
return 1
end
function __fpaste_using_command
set -l cmd (commandline -opc)
if test (count $cmd) -gt 1
if test $argv[1] = $cmd[2]
return 0
end
end
return 1
end
function __fpaste_using_pki_command
set -l cmd (commandline -opc)
if test (count $cmd) -gt 2
if test $cmd[2] = 'pki' -a $argv[1] = $cmd[3]
return 0
end
end
return 1
end
# Global options
complete -c fpaste -s s -l server -d 'Server URL'
complete -c fpaste -s h -l help -d 'Show help'
# Commands
complete -c fpaste -n '__fpaste_needs_command' -a 'create' -d 'Create a new paste'
complete -c fpaste -n '__fpaste_needs_command' -a 'c' -d 'Create a new paste'
complete -c fpaste -n '__fpaste_needs_command' -a 'new' -d 'Create a new paste'
complete -c fpaste -n '__fpaste_needs_command' -a 'get' -d 'Retrieve a paste'
complete -c fpaste -n '__fpaste_needs_command' -a 'g' -d 'Retrieve a paste'
complete -c fpaste -n '__fpaste_needs_command' -a 'delete' -d 'Delete paste(s)'
complete -c fpaste -n '__fpaste_needs_command' -a 'd' -d 'Delete paste(s)'
complete -c fpaste -n '__fpaste_needs_command' -a 'rm' -d 'Delete paste(s)'
complete -c fpaste -n '__fpaste_needs_command' -a 'info' -d 'Show server info'
complete -c fpaste -n '__fpaste_needs_command' -a 'i' -d 'Show server info'
complete -c fpaste -n '__fpaste_needs_command' -a 'list' -d 'List your pastes'
complete -c fpaste -n '__fpaste_needs_command' -a 'ls' -d 'List your pastes'
complete -c fpaste -n '__fpaste_needs_command' -a 'search' -d 'Search your pastes'
complete -c fpaste -n '__fpaste_needs_command' -a 's' -d 'Search your pastes'
complete -c fpaste -n '__fpaste_needs_command' -a 'find' -d 'Search your pastes'
complete -c fpaste -n '__fpaste_needs_command' -a 'update' -d 'Update existing paste'
complete -c fpaste -n '__fpaste_needs_command' -a 'u' -d 'Update existing paste'
complete -c fpaste -n '__fpaste_needs_command' -a 'export' -d 'Export all pastes'
complete -c fpaste -n '__fpaste_needs_command' -a 'register' -d 'Register and get certificate'
complete -c fpaste -n '__fpaste_needs_command' -a 'cert' -d 'Generate client certificate'
complete -c fpaste -n '__fpaste_needs_command' -a 'pki' -d 'PKI operations'
complete -c fpaste -n '__fpaste_needs_command' -a 'completion' -d 'Generate shell completion'
# create command options
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s E -l no-encrypt -d 'Disable encryption'
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s b -l burn -d 'Burn after read'
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s x -l expiry -d 'Expiry in seconds' -x
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s p -l password -d 'Password protect' -x
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s r -l raw -d 'Output raw URL'
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s q -l quiet -d 'Output ID only'
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -s C -l clipboard -d 'Read from clipboard'
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -l copy-url -d 'Copy result URL to clipboard'
complete -c fpaste -n '__fpaste_using_command create; or __fpaste_using_command c; or __fpaste_using_command new' -F
# get command options
complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s o -l output -d 'Save to file' -r
complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s c -l copy -d 'Copy content to clipboard'
complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s p -l password -d 'Password' -x
complete -c fpaste -n '__fpaste_using_command get; or __fpaste_using_command g' -s m -l meta -d 'Show metadata only'
# delete command options
complete -c fpaste -n '__fpaste_using_command delete; or __fpaste_using_command d; or __fpaste_using_command rm' -s a -l all -d 'Delete all pastes'
complete -c fpaste -n '__fpaste_using_command delete; or __fpaste_using_command d; or __fpaste_using_command rm' -s c -l confirm -d 'Confirm count' -x
# list command options
complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -s a -l all -d 'List all pastes (admin)'
complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -s l -l limit -d 'Max pastes' -x
complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -s o -l offset -d 'Skip first N pastes' -x
complete -c fpaste -n '__fpaste_using_command list; or __fpaste_using_command ls' -l json -d 'Output as JSON'
# search command options
complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -s t -l type -d 'Filter by MIME type' -x
complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -l after -d 'Created after' -x
complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -l before -d 'Created before' -x
complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -s l -l limit -d 'Max results' -x
complete -c fpaste -n '__fpaste_using_command search; or __fpaste_using_command s; or __fpaste_using_command find' -l json -d 'Output as JSON'
# update command options
complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s E -l no-encrypt -d 'Disable encryption'
complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s p -l password -d 'Set/change password' -x
complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -l remove-password -d 'Remove password'
complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s x -l expiry -d 'Extend expiry' -x
complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -s q -l quiet -d 'Minimal output'
complete -c fpaste -n '__fpaste_using_command update; or __fpaste_using_command u' -F
# export command options
complete -c fpaste -n '__fpaste_using_command export' -s o -l output -d 'Output directory' -r
complete -c fpaste -n '__fpaste_using_command export' -s k -l keyfile -d 'Key file' -r
complete -c fpaste -n '__fpaste_using_command export' -l manifest -d 'Write manifest.json'
complete -c fpaste -n '__fpaste_using_command export' -s q -l quiet -d 'Minimal output'
# register command options
complete -c fpaste -n '__fpaste_using_command register' -s n -l name -d 'Common name' -x
complete -c fpaste -n '__fpaste_using_command register' -s o -l output -d 'Output directory' -r
complete -c fpaste -n '__fpaste_using_command register' -l configure -d 'Update config file'
complete -c fpaste -n '__fpaste_using_command register' -l p12-only -d 'Save only PKCS#12'
complete -c fpaste -n '__fpaste_using_command register' -s f -l force -d 'Overwrite existing files'
complete -c fpaste -n '__fpaste_using_command register' -s q -l quiet -d 'Minimal output'
# cert command options
complete -c fpaste -n '__fpaste_using_command cert' -s o -l output -d 'Output directory' -r
complete -c fpaste -n '__fpaste_using_command cert' -s a -l algorithm -d 'Key algorithm' -x -a 'rsa ec'
complete -c fpaste -n '__fpaste_using_command cert' -s b -l bits -d 'RSA key size' -x
complete -c fpaste -n '__fpaste_using_command cert' -s c -l curve -d 'EC curve' -x -a 'secp256r1 secp384r1 secp521r1'
complete -c fpaste -n '__fpaste_using_command cert' -s d -l days -d 'Validity period' -x
complete -c fpaste -n '__fpaste_using_command cert' -s n -l name -d 'Common name' -x
complete -c fpaste -n '__fpaste_using_command cert' -l password-key -d 'Encrypt private key' -x
complete -c fpaste -n '__fpaste_using_command cert' -l configure -d 'Update config file'
complete -c fpaste -n '__fpaste_using_command cert' -s f -l force -d 'Overwrite existing files'
# pki subcommands
complete -c fpaste -n '__fpaste_using_command pki' -a 'status' -d 'Show PKI status'
complete -c fpaste -n '__fpaste_using_command pki' -a 'issue' -d 'Request certificate from server'
complete -c fpaste -n '__fpaste_using_command pki' -a 'download' -d 'Download CA certificate'
complete -c fpaste -n '__fpaste_using_command pki' -a 'dl' -d 'Download CA certificate'
# pki issue options
complete -c fpaste -n '__fpaste_using_pki_command issue' -s n -l name -d 'Common name' -x
complete -c fpaste -n '__fpaste_using_pki_command issue' -s o -l output -d 'Output directory' -r
complete -c fpaste -n '__fpaste_using_pki_command issue' -l configure -d 'Update config file'
complete -c fpaste -n '__fpaste_using_pki_command issue' -s f -l force -d 'Overwrite existing files'
# pki download options
complete -c fpaste -n '__fpaste_using_pki_command download; or __fpaste_using_pki_command dl' -s o -l output -d 'Save to file' -r
complete -c fpaste -n '__fpaste_using_pki_command download; or __fpaste_using_pki_command dl' -l configure -d 'Update config file'
# completion command options
complete -c fpaste -n '__fpaste_using_command completion' -l shell -d 'Shell type' -x -a 'bash zsh fish'

203
completions/fpaste.zsh Normal file
View File

@@ -0,0 +1,203 @@
#compdef fpaste
# Zsh completion for fpaste
# Install: copy to ~/.zfunc/_fpaste and add 'fpath=(~/.zfunc $fpath)' to ~/.zshrc
_fpaste() {
local curcontext="$curcontext" state line
typeset -A opt_args
_arguments -C \
'-s[Server URL]:url:' \
'--server[Server URL]:url:' \
'-h[Show help]' \
'--help[Show help]' \
'1: :->command' \
'*:: :->args'
case $state in
command)
local commands=(
'create:Create a new paste'
'c:Create a new paste'
'new:Create a new paste'
'get:Retrieve a paste'
'g:Retrieve a paste'
'delete:Delete paste(s)'
'd:Delete paste(s)'
'rm:Delete paste(s)'
'info:Show server info'
'i:Show server info'
'list:List your pastes'
'ls:List your pastes'
'search:Search your pastes'
's:Search your pastes'
'find:Search your pastes'
'update:Update existing paste'
'u:Update existing paste'
'export:Export all pastes'
'register:Register and get certificate'
'cert:Generate client certificate'
'pki:PKI operations'
'completion:Generate shell completion'
)
_describe -t commands 'fpaste commands' commands
;;
args)
case $line[1] in
create|c|new)
_arguments \
'-E[Disable encryption]' \
'--no-encrypt[Disable encryption]' \
'-b[Burn after read]' \
'--burn[Burn after read]' \
'-x[Expiry in seconds]:seconds:' \
'--expiry[Expiry in seconds]:seconds:' \
'-p[Password protect]:password:' \
'--password[Password protect]:password:' \
'-r[Output raw URL]' \
'--raw[Output raw URL]' \
'-q[Output ID only]' \
'--quiet[Output ID only]' \
'-C[Read from clipboard]' \
'--clipboard[Read from clipboard]' \
'--copy-url[Copy result URL to clipboard]' \
'*:file:_files'
;;
get|g)
_arguments \
'-o[Save to file]:file:_files' \
'--output[Save to file]:file:_files' \
'-c[Copy content to clipboard]' \
'--copy[Copy content to clipboard]' \
'-p[Password]:password:' \
'--password[Password]:password:' \
'-m[Show metadata only]' \
'--meta[Show metadata only]' \
'1:paste ID:'
;;
delete|d|rm)
_arguments \
'-a[Delete all pastes]' \
'--all[Delete all pastes]' \
'-c[Confirm count]:count:' \
'--confirm[Confirm count]:count:' \
'*:paste ID:'
;;
info|i)
;;
list|ls)
_arguments \
'-a[List all pastes (admin)]' \
'--all[List all pastes (admin)]' \
'-l[Max pastes]:number:' \
'--limit[Max pastes]:number:' \
'-o[Skip first N pastes]:number:' \
'--offset[Skip first N pastes]:number:' \
'--json[Output as JSON]'
;;
search|s|find)
_arguments \
'-t[Filter by MIME type]:pattern:' \
'--type[Filter by MIME type]:pattern:' \
'--after[Created after]:date:' \
'--before[Created before]:date:' \
'-l[Max results]:number:' \
'--limit[Max results]:number:' \
'--json[Output as JSON]'
;;
update|u)
_arguments \
'-E[Disable encryption]' \
'--no-encrypt[Disable encryption]' \
'-p[Set/change password]:password:' \
'--password[Set/change password]:password:' \
'--remove-password[Remove password]' \
'-x[Extend expiry]:seconds:' \
'--expiry[Extend expiry]:seconds:' \
'-q[Minimal output]' \
'--quiet[Minimal output]' \
'1:paste ID:' \
'*:file:_files'
;;
export)
_arguments \
'-o[Output directory]:directory:_files -/' \
'--output[Output directory]:directory:_files -/' \
'-k[Key file]:file:_files' \
'--keyfile[Key file]:file:_files' \
'--manifest[Write manifest.json]' \
'-q[Minimal output]' \
'--quiet[Minimal output]'
;;
register)
_arguments \
'-n[Common name]:name:' \
'--name[Common name]:name:' \
'-o[Output directory]:directory:_files -/' \
'--output[Output directory]:directory:_files -/' \
'--configure[Update config file]' \
'--p12-only[Save only PKCS#12]' \
'-f[Overwrite existing files]' \
'--force[Overwrite existing files]' \
'-q[Minimal output]' \
'--quiet[Minimal output]'
;;
cert)
_arguments \
'-o[Output directory]:directory:_files -/' \
'--output[Output directory]:directory:_files -/' \
'-a[Key algorithm]:algorithm:(rsa ec)' \
'--algorithm[Key algorithm]:algorithm:(rsa ec)' \
'-b[RSA key size]:bits:' \
'--bits[RSA key size]:bits:' \
'-c[EC curve]:curve:(secp256r1 secp384r1 secp521r1)' \
'--curve[EC curve]:curve:(secp256r1 secp384r1 secp521r1)' \
'-d[Validity period]:days:' \
'--days[Validity period]:days:' \
'-n[Common name]:name:' \
'--name[Common name]:name:' \
'--password-key[Encrypt private key]:password:' \
'--configure[Update config file]' \
'-f[Overwrite existing files]' \
'--force[Overwrite existing files]'
;;
pki)
local pki_commands=(
'status:Show PKI status'
'issue:Request certificate from server'
'download:Download CA certificate'
'dl:Download CA certificate'
)
if (( CURRENT == 2 )); then
_describe -t commands 'pki commands' pki_commands
else
case $line[2] in
issue)
_arguments \
'-n[Common name]:name:' \
'--name[Common name]:name:' \
'-o[Output directory]:directory:_files -/' \
'--output[Output directory]:directory:_files -/' \
'--configure[Update config file]' \
'-f[Overwrite existing files]' \
'--force[Overwrite existing files]'
;;
download|dl)
_arguments \
'-o[Save to file]:file:_files' \
'--output[Save to file]:file:_files' \
'--configure[Update config file]'
;;
esac
fi
;;
completion)
_arguments \
'--shell[Shell type]:shell:(bash zsh fish)'
;;
esac
;;
esac
}
_fpaste "$@"

287
fpaste
View File

@@ -8,7 +8,9 @@ import base64
import hashlib
import json
import os
import shutil
import ssl
import subprocess
import sys
import time
import urllib.error
@@ -431,13 +433,76 @@ def print_paste_list(
print(f"\n{summary}")
# -----------------------------------------------------------------------------
# Clipboard integration
# -----------------------------------------------------------------------------
# Clipboard read commands (tool name, command args)
CLIPBOARD_READ_COMMANDS = [
("xclip", ["xclip", "-selection", "clipboard", "-o"]),
("xsel", ["xsel", "--clipboard", "--output"]),
("pbpaste", ["pbpaste"]),
("powershell.exe", ["powershell.exe", "-command", "Get-Clipboard"]),
("wl-paste", ["wl-paste"]),
]
# Clipboard write commands (tool name, command args)
CLIPBOARD_WRITE_COMMANDS = [
("xclip", ["xclip", "-selection", "clipboard", "-i"]),
("xsel", ["xsel", "--clipboard", "--input"]),
("pbcopy", ["pbcopy"]),
("clip.exe", ["clip.exe"]),
("wl-copy", ["wl-copy"]),
]
def find_clipboard_command(commands: list[tuple[str, list[str]]]) -> list[str] | None:
"""Find first available clipboard command."""
for tool_name, cmd in commands:
if shutil.which(tool_name):
return cmd
return None
def read_clipboard() -> bytes:
"""Read content from system clipboard."""
cmd = find_clipboard_command(CLIPBOARD_READ_COMMANDS)
if not cmd:
die("no clipboard tool found (install xclip, xsel, or wl-paste)")
try:
result = subprocess.run(cmd, capture_output=True, check=True)
return result.stdout
except subprocess.CalledProcessError as e:
die(f"clipboard read failed: {e.stderr.decode(errors='replace')}")
except FileNotFoundError:
die(f"clipboard tool not found: {cmd[0]}")
def write_clipboard(data: bytes) -> None:
"""Write content to system clipboard."""
cmd = find_clipboard_command(CLIPBOARD_WRITE_COMMANDS)
if not cmd:
die("no clipboard tool found (install xclip, xsel, or wl-copy)")
try:
subprocess.run(cmd, input=data, check=True)
except subprocess.CalledProcessError as e:
die(f"clipboard write failed: {e.stderr.decode(errors='replace') if e.stderr else ''}")
except FileNotFoundError:
die(f"clipboard tool not found: {cmd[0]}")
# -----------------------------------------------------------------------------
# Content helpers
# -----------------------------------------------------------------------------
def read_content(file_arg: str | None) -> bytes:
"""Read content from file or stdin."""
def read_content(file_arg: str | None, from_clipboard: bool = False) -> bytes:
"""Read content from file, stdin, or clipboard."""
if from_clipboard:
return read_clipboard()
if file_arg:
if file_arg == "-":
return sys.stdin.buffer.read()
@@ -447,7 +512,7 @@ def read_content(file_arg: str | None) -> bytes:
return path.read_bytes()
if sys.stdin.isatty():
die("no input provided (pipe data or specify file)")
die("no input provided (pipe data, specify file, or use -C for clipboard)")
return sys.stdin.buffer.read()
@@ -504,7 +569,8 @@ def require_auth(config: Mapping[str, Any]) -> None:
def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None:
"""Create a new paste."""
content = read_content(args.file)
from_clipboard = getattr(args, "clipboard", False)
content = read_content(args.file, from_clipboard=from_clipboard)
if not content:
die("empty content")
@@ -562,11 +628,19 @@ def cmd_create(args: argparse.Namespace, config: dict[str, Any]) -> None:
base_url = config["server"].rstrip("/")
if args.raw:
print(base_url + data["raw"] + key_fragment)
result_url = base_url + data["raw"] + key_fragment
elif args.quiet:
print(data["id"] + key_fragment)
result_url = data["id"] + key_fragment
else:
print(base_url + data["url"] + key_fragment)
result_url = base_url + data["url"] + key_fragment
print(result_url)
# Copy URL to clipboard if requested
if getattr(args, "copy_url", False):
write_clipboard(result_url.encode())
if not args.quiet:
print("(copied to clipboard)", file=sys.stderr)
return
last_error = parse_error(body, body.decode(errors="replace"))
@@ -619,7 +693,11 @@ def cmd_get(args: argparse.Namespace, config: dict[str, Any]) -> None:
if encryption_key:
body = decrypt_content(body, encryption_key)
if args.output:
# Copy to clipboard if requested
if getattr(args, "copy", False):
write_clipboard(body)
print("(copied to clipboard)", file=sys.stderr)
elif args.output:
Path(args.output).write_bytes(body)
print(f"saved: {args.output}", file=sys.stderr)
else:
@@ -1323,11 +1401,14 @@ def build_parser() -> argparse.ArgumentParser:
p_create.add_argument("-p", "--password", metavar="PASS", help="password protect")
p_create.add_argument("-r", "--raw", action="store_true", help="output raw URL")
p_create.add_argument("-q", "--quiet", action="store_true", help="output ID only")
p_create.add_argument("-C", "--clipboard", action="store_true", help="read from clipboard")
p_create.add_argument("--copy-url", action="store_true", help="copy result URL to clipboard")
# get
p_get = subparsers.add_parser("get", aliases=["g"], help="retrieve paste")
p_get.add_argument("id", help="paste ID or URL")
p_get.add_argument("-o", "--output", help="save to file")
p_get.add_argument("-c", "--copy", action="store_true", help="copy content to clipboard")
p_get.add_argument("-p", "--password", metavar="PASS", help="password for protected paste")
p_get.add_argument("-m", "--meta", action="store_true", help="show metadata only")
@@ -1419,9 +1500,198 @@ def build_parser() -> argparse.ArgumentParser:
"--configure", action="store_true", help="update config file (requires -o)"
)
# completion
p_completion = subparsers.add_parser("completion", help="generate shell completion")
p_completion.add_argument(
"--shell", choices=["bash", "zsh", "fish"], default="bash", help="shell type"
)
return parser
def cmd_completion(args: argparse.Namespace, config: dict[str, Any]) -> None:
"""Output shell completion script."""
shell = getattr(args, "shell", None) or "bash"
# Bash completion - full featured
bash_completion = """\
# Bash completion for fpaste
# Install: source this file or copy to /etc/bash_completion.d/fpaste
_fpaste_completions() {
local cur prev words cword
_init_completion || return
local commands="create c new get g delete d rm info i list ls"
commands+=" search s find update u export register cert pki completion"
local pki_commands="status issue download dl"
if [[ $cword -eq 1 ]]; then
COMPREPLY=($(compgen -W "$commands" -- "$cur"))
return
fi
local cmd="${words[1]}"
if [[ "$cmd" == "pki" && $cword -eq 2 ]]; then
COMPREPLY=($(compgen -W "$pki_commands" -- "$cur"))
return
fi
case "$cmd" in
create|c|new)
local opts="-E --no-encrypt -b --burn -x --expiry -p --password"
opts+=" -r --raw -q --quiet -C --clipboard --copy-url"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir
;;
get|g)
local opts="-o --output -c --copy -p --password -m --meta"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
delete|d|rm)
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "-a --all -c --confirm" -- "$cur"))
;;
list|ls)
local opts="-a --all -l --limit -o --offset --json"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
search|s|find)
local opts="-t --type --after --before -l --limit --json"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
update|u)
local opts="-E --no-encrypt -p --password --remove-password -x --expiry -q --quiet"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur")) || _filedir
;;
export)
local opts="-o --output -k --keyfile --manifest -q --quiet"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
register)
local opts="-n --name -o --output --configure --p12-only -f --force -q --quiet"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
cert)
local opts="-o --output -a --algorithm -b --bits -c --curve"
opts+=" -d --days -n --name --password-key --configure -f --force"
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "$opts" -- "$cur"))
;;
completion)
[[ "$cur" == -* ]] && COMPREPLY=($(compgen -W "--shell" -- "$cur"))
[[ "$prev" == "--shell" ]] && COMPREPLY=($(compgen -W "bash zsh fish" -- "$cur"))
;;
esac
}
complete -F _fpaste_completions fpaste
"""
# Zsh completion - compact format
zsh_completion = """\
#compdef fpaste
_fpaste() {
local curcontext="$curcontext" state line
typeset -A opt_args
_arguments -C '-s[Server]:url:' '--server[Server]:url:' '1: :->cmd' '*:: :->args'
case $state in
cmd)
local cmds=('create:Create paste' 'get:Get paste' 'delete:Delete'
'info:Server info' 'list:List pastes' 'search:Search'
'update:Update paste' 'export:Export' 'register:Register'
'cert:Generate cert' 'pki:PKI ops' 'completion:Completions')
_describe -t commands 'commands' cmds
;;
args)
case $line[1] in
create|c|new)
_arguments '-E[No encrypt]' '-b[Burn]' '-x[Expiry]:sec:' \\
'-p[Pass]:p:' '-r[Raw]' '-q[Quiet]' '-C[Clipboard]' \\
'--copy-url' '*:file:_files' ;;
get|g)
_arguments '-o[Out]:f:_files' '-c[Copy]' '-p[Pass]:p:' \\
'-m[Meta]' '1:ID:' ;;
delete|d|rm) _arguments '-a[All]' '-c[Confirm]:n:' '*:ID:' ;;
list|ls) _arguments '-a[All]' '-l[Limit]:n:' '-o[Off]:n:' '--json' ;;
search|s|find)
_arguments '-t[Type]:p:' '--after:d:' '--before:d:' \\
'-l[Limit]:n:' '--json' ;;
update|u)
_arguments '-E[No encrypt]' '-p[Pass]:p:' '--remove-password' \\
'-x[Expiry]:s:' '-q[Quiet]' '1:ID:' '*:file:_files' ;;
export)
_arguments '-o[Dir]:d:_files -/' '-k[Keys]:f:_files' \\
'--manifest' '-q[Quiet]' ;;
register)
_arguments '-n[Name]:cn:' '-o[Dir]:d:_files -/' '--configure' \\
'--p12-only' '-f[Force]' '-q[Quiet]' ;;
cert)
_arguments '-o[Dir]:d:_files -/' '-a[Algo]:(rsa ec)' \\
'-b[Bits]:n:' '-c[Curve]:(secp256r1 secp384r1 secp521r1)' \\
'-d[Days]:n:' '-n[Name]:cn:' '--configure' '-f[Force]' ;;
pki) (( CURRENT == 2 )) && _describe 'cmd' '(status issue download)' ;;
completion) _arguments '--shell:(bash zsh fish)' ;;
esac ;;
esac
}
_fpaste "$@"
"""
# Fish completion - compact
fish_completion = """\
# Fish completion for fpaste
complete -c fpaste -f
complete -c fpaste -n __fish_use_subcommand -a 'create c new' -d 'Create'
complete -c fpaste -n __fish_use_subcommand -a 'get g' -d 'Get paste'
complete -c fpaste -n __fish_use_subcommand -a 'delete d rm' -d 'Delete'
complete -c fpaste -n __fish_use_subcommand -a 'info i' -d 'Server info'
complete -c fpaste -n __fish_use_subcommand -a 'list ls' -d 'List'
complete -c fpaste -n __fish_use_subcommand -a 'search s find' -d 'Search'
complete -c fpaste -n __fish_use_subcommand -a 'update u' -d 'Update'
complete -c fpaste -n __fish_use_subcommand -a 'export' -d 'Export'
complete -c fpaste -n __fish_use_subcommand -a 'register' -d 'Register'
complete -c fpaste -n __fish_use_subcommand -a 'cert' -d 'Gen cert'
complete -c fpaste -n __fish_use_subcommand -a 'pki' -d 'PKI'
complete -c fpaste -n __fish_use_subcommand -a 'completion' -d 'Completions'
set -l cr '__fish_seen_subcommand_from create c new'
complete -c fpaste -n $cr -s E -l no-encrypt -d 'No encrypt'
complete -c fpaste -n $cr -s b -l burn -d 'Burn'
complete -c fpaste -n $cr -s x -l expiry -d 'Expiry' -x
complete -c fpaste -n $cr -s p -l password -d 'Password' -x
complete -c fpaste -n $cr -s r -l raw -d 'Raw URL'
complete -c fpaste -n $cr -s q -l quiet -d 'Quiet'
complete -c fpaste -n $cr -s C -l clipboard -d 'Clipboard'
complete -c fpaste -n $cr -l copy-url -d 'Copy URL'
complete -c fpaste -n $cr -F
set -l gt '__fish_seen_subcommand_from get g'
complete -c fpaste -n $gt -s o -l output -d 'Output' -r
complete -c fpaste -n $gt -s c -l copy -d 'Copy'
complete -c fpaste -n $gt -s p -l password -d 'Password' -x
complete -c fpaste -n $gt -s m -l meta -d 'Metadata'
set -l dl '__fish_seen_subcommand_from delete d rm'
complete -c fpaste -n $dl -s a -l all -d 'All'
complete -c fpaste -n $dl -s c -l confirm -d 'Confirm' -x
set -l ls '__fish_seen_subcommand_from list ls'
complete -c fpaste -n $ls -s a -l all -d 'All'
complete -c fpaste -n $ls -s l -l limit -d 'Limit' -x
complete -c fpaste -n $ls -s o -l offset -d 'Offset' -x
complete -c fpaste -n $ls -l json -d 'JSON'
set -l cp '__fish_seen_subcommand_from completion'
complete -c fpaste -n $cp -l shell -d 'Shell' -xa 'bash zsh fish'
"""
completions = {"bash": bash_completion, "zsh": zsh_completion, "fish": fish_completion}
if shell not in completions:
die(f"unsupported shell: {shell} (use: bash, zsh, fish)")
print(completions[shell])
# Command dispatch table
COMMANDS: dict[str, Any] = {
"create": cmd_create,
@@ -1444,6 +1714,7 @@ COMMANDS: dict[str, Any] = {
"export": cmd_export,
"register": cmd_register,
"cert": cmd_cert,
"completion": cmd_completion,
}
PKI_COMMANDS: dict[str, Any] = {

312
tests/test_audit.py Normal file
View File

@@ -0,0 +1,312 @@
"""Tests for audit logging system."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import pytest
from app import create_app
from app.audit import AuditEvent, AuditOutcome, cleanup_old_audit_logs, log_event, query_audit_log
from app.database import get_db
if TYPE_CHECKING:
from flask import Flask
from flask.testing import FlaskClient
@pytest.fixture
def app() -> Flask:
"""Create application configured for testing."""
app = create_app("testing")
app.config["POW_DIFFICULTY"] = 0
app.config["RATE_LIMIT_ENABLED"] = False
app.config["AUDIT_ENABLED"] = True
app.config["AUDIT_RETENTION_DAYS"] = 90
return app
@pytest.fixture
def client(app: Flask) -> FlaskClient:
"""Create test client."""
return app.test_client()
@pytest.fixture
def admin_client(app: Flask) -> FlaskClient:
"""Create test client with admin authentication."""
client = app.test_client()
fingerprint = "b" * 40
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = fingerprint
with app.app_context():
app.config["PKI_ENABLED"] = True
db = get_db()
now = int(time.time())
db.execute(
"""INSERT OR REPLACE INTO certificate_authority
(id, common_name, certificate_pem, private_key_encrypted,
key_salt, created_at, expires_at, key_algorithm)
VALUES ('default', 'Test CA', 'CERT', X'00', X'00', ?, ?, 'rsa')""",
(now, now + 86400 * 365),
)
db.execute(
"""INSERT OR REPLACE INTO issued_certificates
(serial, ca_id, common_name, fingerprint_sha1, certificate_pem,
created_at, expires_at, status, is_admin)
VALUES (?, 'default', 'admin', ?, 'CERT', ?, ?, 'valid', 1)""",
("admin001", fingerprint, now, now + 86400),
)
db.commit()
return client
class TestAuditLogging:
"""Test audit logging functions."""
def test_log_event_creates_record(self, app: Flask) -> None:
"""log_event should create audit log entry."""
with app.app_context():
log_event(
AuditEvent.PASTE_CREATE,
AuditOutcome.SUCCESS,
paste_id="abc123",
client_id="a" * 40,
client_ip="192.168.1.1",
details={"size": 1024},
)
entries, total = query_audit_log(paste_id="abc123")
assert total == 1
assert len(entries) == 1
assert entries[0]["event_type"] == "paste_create"
assert entries[0]["outcome"] == "success"
assert entries[0]["paste_id"] == "abc123"
assert entries[0]["client_id"] == "a" * 40
assert entries[0]["client_ip"] == "192.168.1.1"
assert entries[0]["details"]["size"] == 1024
def test_log_event_with_string_types(self, app: Flask) -> None:
"""log_event should accept string types as well as enums."""
with app.app_context():
log_event(
"custom_event",
"custom_outcome",
paste_id="xyz789",
)
entries, total = query_audit_log(event_type="custom_event")
assert total == 1
assert entries[0]["event_type"] == "custom_event"
assert entries[0]["outcome"] == "custom_outcome"
def test_log_event_disabled(self, app: Flask) -> None:
"""log_event should skip logging when disabled."""
app.config["AUDIT_ENABLED"] = False
with app.app_context():
log_event(
AuditEvent.PASTE_CREATE,
AuditOutcome.SUCCESS,
paste_id="disabled123",
)
_entries, total = query_audit_log(paste_id="disabled123")
assert total == 0
class TestAuditQuery:
"""Test audit log query functionality."""
def test_query_by_event_type(self, app: Flask) -> None:
"""Query should filter by event type."""
with app.app_context():
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p1")
log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="evt_p2")
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="evt_p3")
entries, _total = query_audit_log(event_type="paste_create")
# Verify entries matching our test are present
our_entries = [e for e in entries if e.get("paste_id", "").startswith("evt_")]
assert len(our_entries) == 2
assert all(e["event_type"] == "paste_create" for e in entries)
def test_query_by_client_id(self, app: Flask) -> None:
"""Query should filter by client ID."""
with app.app_context():
# Use unique client IDs for this test
client_a = "1" * 40
client_b = "2" * 40
log_event(
AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_a, paste_id="cid_p1"
)
log_event(
AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, client_id=client_b, paste_id="cid_p2"
)
entries, total = query_audit_log(client_id=client_a)
assert total >= 1
assert all(e["client_id"] == client_a for e in entries)
def test_query_by_outcome(self, app: Flask) -> None:
"""Query should filter by outcome."""
with app.app_context():
log_event(AuditEvent.RATE_LIMIT, AuditOutcome.BLOCKED, paste_id="out_test")
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS)
log_event(AuditEvent.POW_FAILURE, AuditOutcome.FAILURE)
entries, total = query_audit_log(outcome="blocked")
assert total >= 1
assert all(e["outcome"] == "blocked" for e in entries)
def test_query_by_timestamp_range(self, app: Flask) -> None:
"""Query should filter by timestamp range."""
with app.app_context():
now = int(time.time())
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent")
entries, total = query_audit_log(since=now - 60, until=now + 60)
assert total >= 1
assert any(e["paste_id"] == "recent" for e in entries)
def test_query_pagination(self, app: Flask) -> None:
"""Query should support pagination."""
with app.app_context():
for i in range(5):
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id=f"p{i}")
entries1, total = query_audit_log(limit=2, offset=0)
entries2, _ = query_audit_log(limit=2, offset=2)
assert total >= 5
assert len(entries1) == 2
assert len(entries2) == 2
assert entries1[0]["paste_id"] != entries2[0]["paste_id"]
class TestAuditCleanup:
"""Test audit log cleanup functionality."""
def test_cleanup_old_logs(self, app: Flask) -> None:
"""Cleanup should delete old entries."""
with app.app_context():
db = get_db()
old_ts = int(time.time()) - (100 * 24 * 60 * 60) # 100 days ago
# Insert old entry directly
db.execute(
"""INSERT INTO audit_log
(timestamp, event_type, outcome)
VALUES (?, 'old_event', 'success')""",
(old_ts,),
)
db.commit()
# Insert recent entry
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="recent")
# Cleanup with 90 day retention
deleted = cleanup_old_audit_logs(retention_days=90)
assert deleted == 1
entries, _total = query_audit_log()
assert all(e["event_type"] != "old_event" for e in entries)
class TestAuditEndpoint:
"""Test GET /audit endpoint."""
def test_audit_requires_auth(self, client: FlaskClient) -> None:
"""Audit endpoint should require authentication."""
response = client.get("/audit")
assert response.status_code == 401
def test_audit_requires_admin(self, app: Flask) -> None:
"""Audit endpoint should require admin access."""
client = app.test_client()
client.environ_base["HTTP_X_SSL_CLIENT_SHA1"] = "c" * 40 # Non-admin
response = client.get("/audit")
assert response.status_code == 403
def test_audit_admin_access(self, admin_client: FlaskClient, app: Flask) -> None:
"""Admin should be able to query audit log."""
with app.app_context():
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="test123")
response = admin_client.get("/audit")
assert response.status_code == 200
data = response.get_json()
assert "entries" in data
assert "total" in data
assert "count" in data
def test_audit_query_params(self, admin_client: FlaskClient, app: Flask) -> None:
"""Audit endpoint should accept query parameters."""
with app.app_context():
log_event(AuditEvent.PASTE_CREATE, AuditOutcome.SUCCESS, paste_id="filtered")
log_event(AuditEvent.PASTE_DELETE, AuditOutcome.SUCCESS, paste_id="other")
response = admin_client.get("/audit?event_type=paste_create")
assert response.status_code == 200
data = response.get_json()
assert all(e["event_type"] == "paste_create" for e in data["entries"])
class TestAuditIntegration:
"""Test audit logging integration with paste operations."""
def test_paste_create_logs_event(self, client: FlaskClient, app: Flask) -> None:
"""Paste creation should log audit event."""
response = client.post("/", data=b"test content")
assert response.status_code == 201
paste_id = response.get_json()["id"]
with app.app_context():
entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_create")
assert len(entries) == 1
assert entries[0]["outcome"] == "success"
def test_paste_access_logs_event(self, client: FlaskClient, app: Flask) -> None:
"""Paste access should log audit event."""
# Create paste
create_response = client.post("/", data=b"access test")
paste_id = create_response.get_json()["id"]
# Access paste
client.get(f"/{paste_id}/raw")
with app.app_context():
entries, _ = query_audit_log(paste_id=paste_id, event_type="paste_access")
assert len(entries) == 1
assert entries[0]["outcome"] == "success"
def test_rate_limit_logs_event(self, app: Flask) -> None:
"""Rate limit block should log audit event."""
app.config["RATE_LIMIT_ENABLED"] = True
app.config["RATE_LIMIT_WINDOW"] = 60
app.config["RATE_LIMIT_MAX"] = 1
client = app.test_client()
# First request should succeed
client.post("/", data=b"first")
# Second request should be rate limited
response = client.post("/", data=b"second")
assert response.status_code == 429
with app.app_context():
entries, _ = query_audit_log(event_type="rate_limit")
assert len(entries) >= 1
assert entries[0]["outcome"] == "blocked"

179
tests/test_metrics.py Normal file
View File

@@ -0,0 +1,179 @@
"""Tests for Prometheus metrics module."""
from __future__ import annotations
from typing import TYPE_CHECKING
from unittest.mock import patch
import pytest
from app import create_app
from app.metrics import (
record_dedup,
record_paste_accessed,
record_paste_created,
record_paste_deleted,
record_pow,
record_rate_limit,
setup_custom_metrics,
)
if TYPE_CHECKING:
from flask import Flask
from flask.testing import FlaskClient
@pytest.fixture
def app() -> Flask:
"""Create application configured for testing."""
app = create_app("testing")
app.config["POW_DIFFICULTY"] = 0
app.config["RATE_LIMIT_ENABLED"] = False
return app
@pytest.fixture
def client(app: Flask) -> FlaskClient:
"""Create test client."""
return app.test_client()
class TestMetricsFunctions:
"""Test that metrics functions execute without error."""
def test_record_paste_created_no_error(self) -> None:
"""record_paste_created should not raise when counters not initialized."""
# In testing mode, counters are None, should gracefully no-op
record_paste_created("anonymous", "success")
record_paste_created("authenticated", "success")
record_paste_created("anonymous", "failure")
def test_record_paste_accessed_no_error(self) -> None:
"""record_paste_accessed should not raise when counters not initialized."""
record_paste_accessed("anonymous", False)
record_paste_accessed("authenticated", True)
def test_record_paste_deleted_no_error(self) -> None:
"""record_paste_deleted should not raise when counters not initialized."""
record_paste_deleted("authenticated", "success")
record_paste_deleted("authenticated", "failure")
def test_record_rate_limit_no_error(self) -> None:
"""record_rate_limit should not raise when counters not initialized."""
record_rate_limit("allowed")
record_rate_limit("blocked")
def test_record_pow_no_error(self) -> None:
"""record_pow should not raise when counters not initialized."""
record_pow("success")
record_pow("failure")
def test_record_dedup_no_error(self) -> None:
"""record_dedup should not raise when counters not initialized."""
record_dedup("allowed")
record_dedup("blocked")
class TestMetricsSetup:
"""Test metrics initialization."""
def test_setup_skipped_in_testing(self, app: Flask) -> None:
"""setup_custom_metrics should skip initialization in testing mode."""
with app.app_context():
# Should not raise even though prometheus_client is available
setup_custom_metrics(app)
def test_setup_handles_missing_prometheus(self, app: Flask) -> None:
"""setup_custom_metrics should handle missing prometheus_client gracefully."""
app.config["TESTING"] = False
with app.app_context(), patch.dict("sys.modules", {"prometheus_client": None}):
# Should not raise, just log warning
setup_custom_metrics(app)
class TestMetricsIntegration:
"""Test metrics are recorded during API operations."""
def test_paste_create_records_metric(self, client: FlaskClient) -> None:
"""Paste creation should call record_paste_created."""
with patch("app.api.routes.record_paste_created") as mock_record:
response = client.post("/", data=b"test content")
assert response.status_code == 201
mock_record.assert_called_once_with("anonymous", "success")
def test_paste_access_records_metric(self, client: FlaskClient) -> None:
"""Paste access should call record_paste_accessed."""
# Create paste first
create_response = client.post("/", data=b"access test")
paste_id = create_response.get_json()["id"]
with patch("app.api.routes.record_paste_accessed") as mock_record:
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 200
mock_record.assert_called_once_with("anonymous", False)
def test_burn_after_read_records_metric(self, client: FlaskClient) -> None:
"""Burn-after-read access should record burn=True."""
# Create burn-after-read paste
create_response = client.post(
"/",
data=b"burn test",
headers={"X-Burn-After-Read": "true"},
)
paste_id = create_response.get_json()["id"]
with patch("app.api.routes.record_paste_accessed") as mock_record:
response = client.get(f"/{paste_id}/raw")
assert response.status_code == 200
mock_record.assert_called_once_with("anonymous", True)
def test_dedup_allowed_records_metric(self, client: FlaskClient) -> None:
"""Successful paste creation should call record_dedup with 'allowed'."""
with patch("app.api.routes.record_dedup") as mock_record:
response = client.post("/", data=b"unique content for dedup test")
assert response.status_code == 201
mock_record.assert_called_once_with("allowed")
def test_pow_success_records_metric(self, app: Flask) -> None:
"""Successful PoW verification should call record_pow with 'success'."""
app.config["POW_DIFFICULTY"] = 8
client = app.test_client()
# Get a challenge
challenge_resp = client.get("/challenge")
challenge = challenge_resp.get_json()
# Solve it (brute force for testing)
import hashlib
nonce = challenge["nonce"]
difficulty = challenge["difficulty"]
token = challenge["token"]
solution = 0
while True:
work = f"{nonce}:{solution}".encode()
hash_bytes = hashlib.sha256(work).digest()
zero_bits = 0
for byte in hash_bytes:
if byte == 0:
zero_bits += 8
else:
zero_bits += 8 - byte.bit_length()
break
if zero_bits >= difficulty:
break
solution += 1
with patch("app.api.routes.record_pow") as mock_record:
response = client.post(
"/",
data=b"pow test content",
headers={
"X-PoW-Token": token,
"X-PoW-Solution": str(solution),
},
)
assert response.status_code == 201
mock_record.assert_called_with("success")