fix: add comprehensive type annotations for mypy

- database.py: add type hints for Path, Flask, Any, BaseException
- pki.py: add assertions to narrow Optional types after has_ca() checks
- routes.py: annotate config values to avoid Any return types
- api/__init__.py: use float for cleanup timestamps (time.time())
- __init__.py: remove unused return from setup_rate_limiting
This commit is contained in:
Username
2025-12-22 19:11:11 +01:00
parent 680b068c00
commit ca9342e92d
5 changed files with 36 additions and 20 deletions

View File

@@ -9,10 +9,10 @@ bp = Blueprint("api", __name__)
# Thread-safe cleanup scheduling
_cleanup_lock = threading.Lock()
_cleanup_times = {
"pastes": 0,
"hashes": 0,
"rate_limits": 0,
_cleanup_times: dict[str, float] = {
"pastes": 0.0,
"hashes": 0.0,
"rate_limits": 0.0,
}
_CLEANUP_INTERVALS = {
"pastes": 3600, # 1 hour
@@ -25,7 +25,7 @@ def reset_cleanup_times() -> None:
"""Reset cleanup timestamps. For testing only."""
with _cleanup_lock:
for key in _cleanup_times:
_cleanup_times[key] = 0
_cleanup_times[key] = 0.0
@bp.before_request

View File

@@ -64,11 +64,12 @@ _antiflood_last_increase: float = 0 # Last time difficulty was increased
def get_dynamic_difficulty() -> int:
"""Get current PoW difficulty including anti-flood adjustment."""
base = current_app.config["POW_DIFFICULTY"]
base: int = current_app.config["POW_DIFFICULTY"]
if base == 0 or not current_app.config.get("ANTIFLOOD_ENABLED", True):
return base
with _antiflood_lock:
return min(base + _antiflood_difficulty, current_app.config["ANTIFLOOD_MAX"])
max_diff: int = current_app.config["ANTIFLOOD_MAX"]
return min(base + _antiflood_difficulty, max_diff)
def record_antiflood_request() -> None:
@@ -236,7 +237,8 @@ def error_response(message: str, status: int, **extra: Any) -> Response:
def url_prefix() -> str:
"""Get configured URL prefix for reverse proxy deployments."""
return current_app.config.get("URL_PREFIX", "")
prefix: str = current_app.config.get("URL_PREFIX", "")
return prefix
def prefixed_url(path: str) -> str:
@@ -403,7 +405,7 @@ def is_admin() -> bool:
def get_pow_secret() -> bytes:
"""Get or generate PoW signing secret."""
global _pow_secret_cache
configured = current_app.config.get("POW_SECRET", "")
configured: str = current_app.config.get("POW_SECRET", "")
if configured:
return configured.encode()
if _pow_secret_cache is None:
@@ -1056,6 +1058,7 @@ class RegisterView(MethodView):
# Load CA cert for PKCS#12 (reuse ca_info from above, or refresh if it was just generated)
if ca_info is None or "certificate_pem" not in ca_info:
ca_info = get_ca_info(skip_enabled_check=True)
assert ca_info is not None # CA was just generated or exists
ca_cert = x509.load_pem_x509_certificate(ca_info["certificate_pem"].encode())
client_cert = x509.load_pem_x509_certificate(cert_info["certificate_pem"].encode())
client_key = serialization.load_pem_private_key(