httpd: add rate limiting and security headers
All checks were successful
CI / syntax-check (push) Successful in 3s
CI / memory-leak-check (push) Successful in 11s

This commit is contained in:
Username
2025-12-26 19:52:45 +01:00
parent 59fe2c4a14
commit ad89eb262e

View File

@@ -10,9 +10,16 @@ import time
import os
import gc
import sys
from collections import defaultdict
import mysqlite
from misc import _log
# Rate limiting configuration
_rate_limits = defaultdict(list)
_rate_lock = threading.Lock()
_rate_limit_requests = 60 # requests per window
_rate_limit_window = 60 # window in seconds
# Static directories (relative to this file)
_STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
_STATIC_LIB_DIR = os.path.join(_STATIC_DIR, 'lib')
@@ -74,6 +81,40 @@ def is_localhost(ip):
return False
def check_rate_limit(ip):
"""Check if IP is within rate limit. Returns True if allowed."""
if is_localhost(ip):
return True # No rate limit for localhost
now = time.time()
with _rate_lock:
# Clean old entries
_rate_limits[ip] = [t for t in _rate_limits[ip] if now - t < _rate_limit_window]
if len(_rate_limits[ip]) >= _rate_limit_requests:
return False
_rate_limits[ip].append(now)
return True
def get_security_headers(content_type):
"""Return security headers for responses."""
headers = [
('X-Content-Type-Options', 'nosniff'),
('X-Frame-Options', 'DENY'),
('Referrer-Policy', 'strict-origin-when-cross-origin'),
]
# Add CSP for HTML pages
if 'text/html' in content_type:
headers.append((
'Content-Security-Policy',
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"connect-src 'self'"
))
return headers
def get_system_stats():
"""Collect system resource statistics."""
stats = {}
@@ -355,8 +396,9 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', len(body))
self.send_header('Cache-Control', 'no-cache')
if content_type == 'application/json':
self.send_header('Access-Control-Allow-Origin', '*')
# Add security headers
for header, value in get_security_headers(content_type):
self.send_header(header, value)
self.end_headers()
self.wfile.write(body)
@@ -376,6 +418,18 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
self.send_response_body(js, 'application/javascript; charset=utf-8', status)
def do_GET(self):
# Rate limiting check
client_ip = self.client_address[0] if self.client_address else ''
if not check_rate_limit(client_ip):
self.send_response(429)
self.send_header('Content-Type', 'application/json')
self.send_header('Retry-After', str(_rate_limit_window))
for header, value in get_security_headers('application/json'):
self.send_header(header, value)
self.end_headers()
self.wfile.write(json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window}))
return
path = self.path.split('?')[0]
routes = {
'/': self.handle_index,
@@ -572,14 +626,26 @@ class ProxyAPIServer(threading.Thread):
"""WSGI application wrapper for gevent."""
path = environ.get('PATH_INFO', '/').split('?')[0]
method = environ.get('REQUEST_METHOD', 'GET')
remote_addr = environ.get('REMOTE_ADDR', '')
if method != 'GET':
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
return [b'Method not allowed']
# Rate limiting check
if not check_rate_limit(remote_addr):
error_body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window})
headers = [
('Content-Type', 'application/json'),
('Content-Length', str(len(error_body))),
('Retry-After', str(_rate_limit_window)),
]
headers.extend(get_security_headers('application/json'))
start_response('429 Too Many Requests', headers)
return [error_body]
# Route handling
try:
remote_addr = environ.get('REMOTE_ADDR', '')
response_body, content_type, status = self._handle_route(path, remote_addr)
status_line = '%d %s' % (status, 'OK' if status == 200 else 'Error')
headers = [
@@ -587,16 +653,17 @@ class ProxyAPIServer(threading.Thread):
('Content-Length', str(len(response_body))),
('Cache-Control', 'no-cache'),
]
if content_type == 'application/json':
headers.append(('Access-Control-Allow-Origin', '*'))
headers.extend(get_security_headers(content_type))
start_response(status_line, headers)
return [response_body.encode('utf-8') if isinstance(response_body, unicode) else response_body]
except Exception as e:
error_body = json.dumps({'error': str(e)})
start_response('500 Internal Server Error', [
headers = [
('Content-Type', 'application/json'),
('Content-Length', str(len(error_body))),
])
]
headers.extend(get_security_headers('application/json'))
start_response('500 Internal Server Error', headers)
return [error_body]
def _handle_route(self, path, remote_addr=''):