From ad89eb262eadb8dd8c0b9820c3b8c9796751890c Mon Sep 17 00:00:00 2001 From: Username Date: Fri, 26 Dec 2025 19:52:45 +0100 Subject: [PATCH] httpd: add rate limiting and security headers --- httpd.py | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 7 deletions(-) diff --git a/httpd.py b/httpd.py index 494790f..6683183 100644 --- a/httpd.py +++ b/httpd.py @@ -10,9 +10,16 @@ import time import os import gc import sys +from collections import defaultdict import mysqlite from misc import _log +# Rate limiting configuration +_rate_limits = defaultdict(list) +_rate_lock = threading.Lock() +_rate_limit_requests = 60 # requests per window +_rate_limit_window = 60 # window in seconds + # Static directories (relative to this file) _STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') _STATIC_LIB_DIR = os.path.join(_STATIC_DIR, 'lib') @@ -74,6 +81,40 @@ def is_localhost(ip): return False +def check_rate_limit(ip): + """Check if IP is within rate limit. Returns True if allowed.""" + if is_localhost(ip): + return True # No rate limit for localhost + now = time.time() + with _rate_lock: + # Clean old entries + _rate_limits[ip] = [t for t in _rate_limits[ip] if now - t < _rate_limit_window] + if len(_rate_limits[ip]) >= _rate_limit_requests: + return False + _rate_limits[ip].append(now) + return True + + +def get_security_headers(content_type): + """Return security headers for responses.""" + headers = [ + ('X-Content-Type-Options', 'nosniff'), + ('X-Frame-Options', 'DENY'), + ('Referrer-Policy', 'strict-origin-when-cross-origin'), + ] + # Add CSP for HTML pages + if 'text/html' in content_type: + headers.append(( + 'Content-Security-Policy', + "default-src 'self'; " + "script-src 'self' 'unsafe-inline'; " + "style-src 'self' 'unsafe-inline'; " + "img-src 'self' data: https:; " + "connect-src 'self'" + )) + return headers + + def get_system_stats(): """Collect system resource statistics.""" stats = {} @@ -355,8 +396,9 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.send_header('Content-Type', content_type) self.send_header('Content-Length', len(body)) self.send_header('Cache-Control', 'no-cache') - if content_type == 'application/json': - self.send_header('Access-Control-Allow-Origin', '*') + # Add security headers + for header, value in get_security_headers(content_type): + self.send_header(header, value) self.end_headers() self.wfile.write(body) @@ -376,6 +418,18 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): self.send_response_body(js, 'application/javascript; charset=utf-8', status) def do_GET(self): + # Rate limiting check + client_ip = self.client_address[0] if self.client_address else '' + if not check_rate_limit(client_ip): + self.send_response(429) + self.send_header('Content-Type', 'application/json') + self.send_header('Retry-After', str(_rate_limit_window)) + for header, value in get_security_headers('application/json'): + self.send_header(header, value) + self.end_headers() + self.wfile.write(json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window})) + return + path = self.path.split('?')[0] routes = { '/': self.handle_index, @@ -572,14 +626,26 @@ class ProxyAPIServer(threading.Thread): """WSGI application wrapper for gevent.""" path = environ.get('PATH_INFO', '/').split('?')[0] method = environ.get('REQUEST_METHOD', 'GET') + remote_addr = environ.get('REMOTE_ADDR', '') if method != 'GET': start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'Method not allowed'] + # Rate limiting check + if not check_rate_limit(remote_addr): + error_body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window}) + headers = [ + ('Content-Type', 'application/json'), + ('Content-Length', str(len(error_body))), + ('Retry-After', str(_rate_limit_window)), + ] + headers.extend(get_security_headers('application/json')) + start_response('429 Too Many Requests', headers) + return [error_body] + # Route handling try: - remote_addr = environ.get('REMOTE_ADDR', '') response_body, content_type, status = self._handle_route(path, remote_addr) status_line = '%d %s' % (status, 'OK' if status == 200 else 'Error') headers = [ @@ -587,16 +653,17 @@ class ProxyAPIServer(threading.Thread): ('Content-Length', str(len(response_body))), ('Cache-Control', 'no-cache'), ] - if content_type == 'application/json': - headers.append(('Access-Control-Allow-Origin', '*')) + headers.extend(get_security_headers(content_type)) start_response(status_line, headers) return [response_body.encode('utf-8') if isinstance(response_body, unicode) else response_body] except Exception as e: error_body = json.dumps({'error': str(e)}) - start_response('500 Internal Server Error', [ + headers = [ ('Content-Type', 'application/json'), ('Content-Length', str(len(error_body))), - ]) + ] + headers.extend(get_security_headers('application/json')) + start_response('500 Internal Server Error', headers) return [error_body] def _handle_route(self, path, remote_addr=''):