Files
ppf/httpd.py
Username ad89eb262e
All checks were successful
CI / syntax-check (push) Successful in 3s
CI / memory-leak-check (push) Successful in 11s
httpd: add rate limiting and security headers
2025-12-26 19:52:45 +01:00

939 lines
35 KiB
Python

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import division
"""HTTP API server with advanced web dashboard for PPF."""
import BaseHTTPServer
import json
import threading
import time
import os
import gc
import sys
from collections import defaultdict
import mysqlite
from misc import _log
# Rate limiting configuration
_rate_limits = defaultdict(list)
_rate_lock = threading.Lock()
_rate_limit_requests = 60 # requests per window
_rate_limit_window = 60 # window in seconds
# Static directories (relative to this file)
_STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
_STATIC_LIB_DIR = os.path.join(_STATIC_DIR, 'lib')
# Content type mapping for static files
_CONTENT_TYPES = {
'.js': 'application/javascript; charset=utf-8',
'.css': 'text/css; charset=utf-8',
'.json': 'application/json; charset=utf-8',
'.png': 'image/png',
'.svg': 'image/svg+xml',
'.woff': 'font/woff',
'.woff2': 'font/woff2',
}
# Cache for static library files (loaded once at startup)
_LIB_CACHE = {}
# Cache for dashboard static files (HTML, CSS, JS)
_STATIC_CACHE = {}
# Optional memory profiling (installed via requirements.txt)
try:
import objgraph
_has_objgraph = True
except ImportError:
_has_objgraph = False
try:
from pympler import muppy, summary
_has_pympler = True
except ImportError:
_has_pympler = False
# Memory tracking for leak detection
_memory_samples = []
_memory_sample_max = 60 # Keep last 60 samples (5 min at 5s intervals)
_peak_rss = 0
_start_rss = 0
# Cache for expensive operations
_gc_objects_cache = {'value': 0, 'time': 0}
_gc_objects_ttl = 30 # seconds
_db_health_cache = {'value': {}, 'time': 0}
_db_health_ttl = 10 # seconds
def is_localhost(ip):
    """Return True when *ip* is a loopback address (127.0.0.0/8 or ::1)."""
    if not ip:
        return False
    # Either the IPv6 loopback or anything in the IPv4 127/8 block.
    return ip == '::1' or ip.startswith('127.')
def check_rate_limit(ip):
    """Check if IP is within rate limit. Returns True if allowed.

    Localhost is exempt.  Non-local IPs are allowed at most
    _rate_limit_requests requests per _rate_limit_window seconds.

    Fix: the _rate_limits defaultdict previously kept an entry for every
    client IP ever seen (timestamps were only pruned for the requesting
    IP), so the table grew without bound.  We now opportunistically sweep
    fully-expired entries once the table gets large.
    """
    if is_localhost(ip):
        return True  # No rate limit for localhost
    now = time.time()
    with _rate_lock:
        # Opportunistic purge of stale IPs to bound memory usage; only
        # runs when the table is large so the common path stays cheap.
        if len(_rate_limits) > 1024:
            for stale_ip in list(_rate_limits):
                times = _rate_limits[stale_ip]
                if not times or now - times[-1] >= _rate_limit_window:
                    del _rate_limits[stale_ip]
        # Drop this IP's timestamps that fell out of the window.
        recent = [t for t in _rate_limits[ip] if now - t < _rate_limit_window]
        if len(recent) >= _rate_limit_requests:
            _rate_limits[ip] = recent
            return False
        recent.append(now)
        _rate_limits[ip] = recent
        return True
def get_security_headers(content_type):
    """Build the list of security headers to attach to a response.

    Every response gets nosniff/frame/referrer hardening; HTML responses
    additionally receive a Content-Security-Policy header.
    """
    result = [
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'DENY'),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
    ]
    if 'text/html' not in content_type:
        return result
    csp = ("default-src 'self'; "
           "script-src 'self' 'unsafe-inline'; "
           "style-src 'self' 'unsafe-inline'; "
           "img-src 'self' data: https:; "
           "connect-src 'self'")
    result.append(('Content-Security-Policy', csp))
    return result
def get_system_stats():
    """Collect system resource statistics.

    Returns a dict of load average, CPU count, memory, disk, process and
    GC statistics.  Every key is always present; values fall back to 0
    when the underlying source (/proc, os.sysconf, ...) is unavailable.
    Also feeds the module-level memory-leak tracking state
    (_memory_samples, _peak_rss, _start_rss).

    Fix: proc_rss/proc_threads are now pre-initialized to 0, so the keys
    exist even when /proc/self/status opens fine but lacks a VmRSS:/
    Threads: line (the except clause only fires on IO/parse errors).
    """
    stats = {}
    # Load average (1, 5, 15 min) - not available on all platforms
    try:
        load = os.getloadavg()
        stats['load_1m'] = round(load[0], 2)
        stats['load_5m'] = round(load[1], 2)
        stats['load_15m'] = round(load[2], 2)
    except (OSError, AttributeError):
        stats['load_1m'] = stats['load_5m'] = stats['load_15m'] = 0
    # CPU count
    try:
        stats['cpu_count'] = os.sysconf('SC_NPROCESSORS_ONLN')
    except (ValueError, OSError, AttributeError):
        stats['cpu_count'] = 1
    # Memory from /proc/meminfo (Linux)
    try:
        with open('/proc/meminfo', 'r') as f:
            meminfo = {}
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    meminfo[parts[0].rstrip(':')] = int(parts[1]) * 1024  # KB to bytes
        total = meminfo.get('MemTotal', 0)
        # MemAvailable is more accurate; fall back to MemFree on old kernels
        available = meminfo.get('MemAvailable', meminfo.get('MemFree', 0))
        stats['mem_total'] = total
        stats['mem_available'] = available
        stats['mem_used'] = total - available
        stats['mem_pct'] = round((total - available) / total * 100, 1) if total > 0 else 0
    except (IOError, KeyError, ZeroDivisionError):
        stats['mem_total'] = stats['mem_available'] = stats['mem_used'] = 0
        stats['mem_pct'] = 0
    # Disk usage for data directory (fall back to cwd)
    try:
        st = os.statvfs('data' if os.path.exists('data') else '.')
        total = st.f_blocks * st.f_frsize
        free = st.f_bavail * st.f_frsize
        used = total - free
        stats['disk_total'] = total
        stats['disk_free'] = free
        stats['disk_used'] = used
        stats['disk_pct'] = round(used / total * 100, 1) if total > 0 else 0
    except (OSError, ZeroDivisionError):
        stats['disk_total'] = stats['disk_free'] = stats['disk_used'] = 0
        stats['disk_pct'] = 0
    # Process stats from /proc/self/status.  Pre-set defaults so both keys
    # exist even when the expected lines are missing from the file.
    stats['proc_rss'] = 0
    stats['proc_threads'] = 0
    try:
        with open('/proc/self/status', 'r') as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    stats['proc_rss'] = int(line.split()[1]) * 1024  # KB to bytes
                elif line.startswith('Threads:'):
                    stats['proc_threads'] = int(line.split()[1])
    except (IOError, ValueError, IndexError):
        stats['proc_rss'] = 0
        stats['proc_threads'] = 0
    # Memory leak detection bookkeeping (module-level state)
    global _memory_samples, _peak_rss, _start_rss
    rss = stats.get('proc_rss', 0)
    if rss > 0:
        if _start_rss == 0:
            _start_rss = rss  # first valid reading becomes the baseline
        if rss > _peak_rss:
            _peak_rss = rss
        _memory_samples.append((time.time(), rss))
        if len(_memory_samples) > _memory_sample_max:
            _memory_samples.pop(0)
    stats['proc_rss_peak'] = _peak_rss
    stats['proc_rss_start'] = _start_rss
    stats['proc_rss_growth'] = rss - _start_rss if _start_rss > 0 else 0
    # GC stats for leak detection
    try:
        gc_counts = gc.get_count()
        stats['gc_count_gen0'] = gc_counts[0]
        stats['gc_count_gen1'] = gc_counts[1]
        stats['gc_count_gen2'] = gc_counts[2]
        # Cache gc.get_objects() - expensive call (~23ms)
        global _gc_objects_cache
        now = time.time()
        if now - _gc_objects_cache['time'] > _gc_objects_ttl:
            _gc_objects_cache['value'] = len(gc.get_objects())
            _gc_objects_cache['time'] = now
        stats['gc_objects'] = _gc_objects_cache['value']
    except Exception:
        stats['gc_count_gen0'] = stats['gc_count_gen1'] = stats['gc_count_gen2'] = 0
        stats['gc_objects'] = 0
    return stats
def get_db_health(db):
    """Get database health and statistics (cached).

    Args:
        db: open database handle exposing execute() (project mysqlite
            wrapper around the sqlite proxylist database).

    Returns:
        dict with file size, pragma page stats, anonymity/latency
        breakdowns and recent-activity counters.  Results are cached
        module-wide for _db_health_ttl seconds.  NOTE(review): the cache
        update is not guarded by a lock; concurrent callers may race,
        which is harmless here (last writer wins).
    """
    global _db_health_cache
    now = time.time()
    # Serve the cached snapshot while it is still fresh.
    if now - _db_health_cache['time'] < _db_health_ttl:
        return _db_health_cache['value']
    stats = {}
    try:
        # Database file size on disk; fall back to the default path when
        # the handle does not expose one.
        db_path = db.path if hasattr(db, 'path') else 'data/proxies.sqlite'
        if os.path.exists(db_path):
            stats['db_size'] = os.path.getsize(db_path)
        else:
            stats['db_size'] = 0
        # Page stats from pragma
        row = db.execute('PRAGMA page_count').fetchone()
        stats['page_count'] = row[0] if row else 0
        row = db.execute('PRAGMA page_size').fetchone()
        stats['page_size'] = row[0] if row else 0
        row = db.execute('PRAGMA freelist_count').fetchone()
        stats['freelist_count'] = row[0] if row else 0
        # Anonymity breakdown
        rows = db.execute(
            'SELECT anonymity, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY anonymity'
        ).fetchall()
        stats['anonymity'] = {r[0] or 'unknown': r[1] for r in rows}
        # Latency stats
        row = db.execute(
            'SELECT AVG(avg_latency), MIN(avg_latency), MAX(avg_latency) '
            'FROM proxylist WHERE failed=0 AND avg_latency > 0'
        ).fetchone()
        if row and row[0]:
            stats['db_avg_latency'] = round(row[0], 1)
            stats['db_min_latency'] = round(row[1], 1)
            stats['db_max_latency'] = round(row[2], 1)
        else:
            stats['db_avg_latency'] = stats['db_min_latency'] = stats['db_max_latency'] = 0
        # Recent activity (`now` is reused here as an int epoch for SQL)
        now = int(time.time())
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE tested > ?', (now - 3600,)
        ).fetchone()
        stats['tested_last_hour'] = row[0] if row else 0
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE added > ?', (now - 86400,)
        ).fetchone()
        stats['added_last_day'] = row[0] if row else 0
        # Dead proxies count (permanently dead = -1, failing = positive)
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE failed = -1'
        ).fetchone()
        stats['dead_count'] = row[0] if row else 0
        # Failing proxies count (positive fail count but not permanently dead)
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE failed > 0'
        ).fetchone()
        stats['failing_count'] = row[0] if row else 0
    except Exception:
        # Best-effort: a failing query leaves stats partially filled;
        # the partial dict is still cached below.
        pass
    # Update cache
    _db_health_cache['value'] = stats
    _db_health_cache['time'] = time.time()
    return stats
# Detect if gevent has monkey-patched the environment
try:
from gevent import monkey
GEVENT_PATCHED = monkey.is_module_patched('socket')
except ImportError:
GEVENT_PATCHED = False
if GEVENT_PATCHED:
from gevent.pywsgi import WSGIServer
def load_static_libs():
    """Populate the module-level _LIB_CACHE from static/lib at startup."""
    global _LIB_CACHE
    if not os.path.isdir(_STATIC_LIB_DIR):
        _log('static/lib directory not found: %s' % _STATIC_LIB_DIR, 'warn')
        return
    for fname in os.listdir(_STATIC_LIB_DIR):
        fpath = os.path.join(_STATIC_LIB_DIR, fname)
        if not os.path.isfile(fpath):
            continue
        try:
            with open(fpath, 'rb') as fh:
                data = fh.read()
            _LIB_CACHE[fname] = data
            _log('loaded static lib: %s (%d bytes)' % (fname, len(data)), 'debug')
        except IOError as e:
            _log('failed to load %s: %s' % (fname, e), 'warn')
    _log('loaded %d static library files' % len(_LIB_CACHE), 'info')
def get_static_lib(filename):
    """Return the cached bytes for a static library file, or None."""
    return _LIB_CACHE.get(filename, None)
def load_static_files(theme):
    """Load dashboard static assets into _STATIC_CACHE at startup.

    Args:
        theme: dict of color name -> color value; each '{name}' placeholder
               in style.css is replaced by the corresponding value.
    """
    global _STATIC_CACHE
    base = os.path.dirname(os.path.abspath(__file__))
    files = {
        'dashboard.html': 'static/dashboard.html',
        'map.html': 'static/map.html',
        'mitm.html': 'static/mitm.html',
        'style.css': 'static/style.css',
        'dashboard.js': 'static/dashboard.js',
        'map.js': 'static/map.js',
        'mitm.js': 'static/mitm.js',
    }
    for key, relpath in files.items():
        fpath = os.path.join(base, relpath)
        if not os.path.isfile(fpath):
            _log('static file not found: %s' % fpath, 'warn')
            continue
        try:
            with open(fpath, 'rb') as fh:
                content = fh.read()
            # Theme substitution applies only to the stylesheet.
            if key == 'style.css' and theme:
                for name, val in theme.items():
                    content = content.replace('{' + name + '}', val)
            _STATIC_CACHE[key] = content
            _log('loaded static file: %s (%d bytes)' % (key, len(content)), 'debug')
        except IOError as e:
            _log('failed to load %s: %s' % (fpath, e), 'warn')
    _log('loaded %d dashboard static files' % len(_STATIC_CACHE), 'info')
def get_static_file(filename):
    """Return a cached dashboard asset by name, or None when not loaded."""
    return _STATIC_CACHE.get(filename, None)
# Theme colors - dark tiles on lighter background
# Values are substituted into static/style.css by load_static_files():
# every '{name}' placeholder in the CSS is replaced by its color value.
THEME = {
    'bg': '#1e2738',
    'card': '#181f2a',
    'card_alt': '#212a36',
    'border': '#3a4858',
    'text': '#e8eef5',
    'dim': '#8b929b',
    'green': '#3fb950',
    'red': '#f85149',
    'yellow': '#d29922',
    'blue': '#58a6ff',
    'purple': '#a371f7',
    'cyan': '#39c5cf',
    'orange': '#db6d28',
    'pink': '#db61a2',
    'map_bg': '#1e2738',  # Match dashboard background
}
class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """HTTP request handler for proxy API (stdlib fallback server).

    Class attributes are injected by ProxyAPIServer.run() before serving:
      database       -- path to the sqlite proxy database
      stats_provider -- optional callable returning a runtime-stats dict

    Fixes vs. previous revision:
      * handle_proxies: numeric query params (limit, asn) are validated so
        a malformed value returns a JSON 400 instead of raising an
        unhandled ValueError; limit is clamped to [0, 1000] so a negative
        value can no longer disable the SQL LIMIT.
      * do_GET: the 429 response now carries a Content-Length header.
    """
    database = None
    stats_provider = None
    def log_message(self, format, *args):
        # Suppress the default per-request stderr logging.
        pass
    def send_response_body(self, body, content_type, status=200):
        """Send status line, standard + security headers, then the body."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', len(body))
        self.send_header('Cache-Control', 'no-cache')
        # Add security headers
        for header, value in get_security_headers(content_type):
            self.send_header(header, value)
        self.end_headers()
        self.wfile.write(body)
    def send_json(self, data, status=200):
        self.send_response_body(json.dumps(data, indent=2), 'application/json', status)
    def send_text(self, text, status=200):
        self.send_response_body(text, 'text/plain', status)
    def send_html(self, html, status=200):
        self.send_response_body(html, 'text/html; charset=utf-8', status)
    def send_css(self, css, status=200):
        self.send_response_body(css, 'text/css; charset=utf-8', status)
    def send_js(self, js, status=200):
        self.send_response_body(js, 'application/javascript; charset=utf-8', status)
    def do_GET(self):
        """Dispatch GET requests; applies per-IP rate limiting first."""
        client_ip = self.client_address[0] if self.client_address else ''
        if not check_rate_limit(client_ip):
            body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window})
            self.send_response(429)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Content-Length', len(body))
            self.send_header('Retry-After', str(_rate_limit_window))
            for header, value in get_security_headers('application/json'):
                self.send_header(header, value)
            self.end_headers()
            self.wfile.write(body)
            return
        path = self.path.split('?')[0]
        routes = {
            '/': self.handle_index,
            '/dashboard': self.handle_dashboard,
            '/map': self.handle_map,
            '/static/style.css': self.handle_css,
            '/static/dashboard.js': self.handle_js,
            '/api/stats': self.handle_stats,
            '/api/countries': self.handle_countries,
            '/proxies': self.handle_proxies,
            '/proxies/count': self.handle_count,
            '/health': self.handle_health,
        }
        handler = routes.get(path)
        if handler:
            handler()
        else:
            self.send_json({'error': 'not found'}, 404)
    def handle_index(self):
        """API discovery document."""
        self.send_json({
            'endpoints': {
                '/dashboard': 'web dashboard (HTML)',
                '/api/stats': 'runtime statistics (JSON)',
                '/proxies': 'list working proxies (params: limit, proto, country, asn)',
                '/proxies/count': 'count working proxies',
                '/health': 'health check',
            }
        })
    def handle_dashboard(self):
        self.send_html(DASHBOARD_HTML)
    def handle_map(self):
        self.send_html(MAP_HTML)
    def handle_countries(self):
        """Return all countries with proxy counts."""
        try:
            db = mysqlite.mysqlite(self.database, str)
            rows = db.execute(
                'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                'GROUP BY country ORDER BY c DESC'
            ).fetchall()
            countries = {r[0]: r[1] for r in rows}
            self.send_json({'countries': countries})
        except Exception as e:
            self.send_json({'error': str(e)}, 500)
    def handle_css(self):
        self.send_css(DASHBOARD_CSS)
    def handle_js(self):
        self.send_js(DASHBOARD_JS)
    def get_db_stats(self):
        """Get statistics from database (counts, protocol/country/ASN breakdown)."""
        try:
            db = mysqlite.mysqlite(self.database, str)
            stats = {}
            # Total counts
            row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
            stats['working'] = row[0] if row else 0
            row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
            stats['total'] = row[0] if row else 0
            # By protocol
            rows = db.execute(
                'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto'
            ).fetchall()
            stats['by_proto'] = {r[0] or 'unknown': r[1] for r in rows}
            # Top countries
            rows = db.execute(
                'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                'GROUP BY country ORDER BY c DESC LIMIT 10'
            ).fetchall()
            stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows]
            # Top ASNs
            rows = db.execute(
                'SELECT asn, COUNT(*) as c FROM proxylist WHERE failed=0 AND asn IS NOT NULL '
                'GROUP BY asn ORDER BY c DESC LIMIT 10'
            ).fetchall()
            stats['top_asns'] = [(r[0], r[1]) for r in rows]
            return stats
        except Exception as e:
            return {'error': str(e)}
    def handle_stats(self):
        """Combined runtime + system + database statistics."""
        stats = {}
        # Runtime stats from provider
        if self.stats_provider:
            try:
                stats.update(self.stats_provider())
            except Exception as e:
                _log('stats_provider error: %s' % str(e), 'error')
        # Add system stats
        stats['system'] = get_system_stats()
        # Add database stats
        try:
            db = mysqlite.mysqlite(self.database, str)
            stats['db'] = self.get_db_stats()
            stats['db_health'] = get_db_health(db)
        except Exception:
            pass
        self.send_json(stats)
    def handle_proxies(self):
        """List working proxies.

        Query params: limit (0..1000), proto, country, asn,
        format=json|plain.
        """
        params = {}
        if '?' in self.path:
            for pair in self.path.split('?')[1].split('&'):
                if '=' in pair:
                    k, v = pair.split('=', 1)
                    params[k] = v
        proto = params.get('proto', '')
        country = params.get('country', '')
        asn = params.get('asn', '')
        fmt = params.get('format', 'json')
        # Validate numeric params up front so a malformed value yields a
        # JSON 400 instead of an unhandled ValueError traceback.  Clamp
        # limit to [0, 1000]: a negative LIMIT would disable it in SQLite.
        try:
            limit = max(0, min(int(params.get('limit', 100)), 1000))
            asn_val = int(asn) if asn else None
        except ValueError:
            self.send_json({'error': 'invalid numeric parameter'}, 400)
            return
        sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0'
        args = []
        if proto:
            sql += ' AND proto=?'
            args.append(proto)
        if country:
            sql += ' AND country=?'
            args.append(country.upper())
        if asn_val is not None:
            sql += ' AND asn=?'
            args.append(asn_val)
        sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
        args.append(limit)
        try:
            db = mysqlite.mysqlite(self.database, str)
            rows = db.execute(sql, args).fetchall()
            if fmt == 'plain':
                self.send_text('\n'.join('%s:%s' % (r[0], r[1]) for r in rows))
            else:
                proxies = [{
                    'ip': r[0], 'port': r[1], 'proto': r[2],
                    'country': r[3], 'asn': r[4], 'latency': r[5]
                } for r in rows]
                self.send_json({'count': len(proxies), 'proxies': proxies})
        except Exception as e:
            self.send_json({'error': str(e)}, 500)
    def handle_count(self):
        """Count of currently-working proxies."""
        try:
            db = mysqlite.mysqlite(self.database, str)
            row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
            self.send_json({'count': row[0] if row else 0})
        except Exception as e:
            self.send_json({'error': str(e)}, 500)
    def handle_health(self):
        self.send_json({'status': 'ok', 'timestamp': int(time.time())})
class ProxyAPIServer(threading.Thread):
    """Threaded HTTP API server.
    Uses gevent's WSGIServer when running in a gevent-patched environment,
    otherwise falls back to standard BaseHTTPServer.
    """
    def __init__(self, host, port, database, stats_provider=None):
        # host/port: bind address; database: sqlite file path;
        # stats_provider: optional callable returning a runtime-stats dict.
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.database = database
        self.stats_provider = stats_provider
        self.daemon = True  # don't block process exit
        self.server = None
        self._stop_event = threading.Event() if not GEVENT_PATCHED else None
        # Load static library files into cache
        load_static_libs()
        # Load dashboard static files (HTML, CSS, JS) with theme substitution
        load_static_files(THEME)
    def _wsgi_app(self, environ, start_response):
        """WSGI application wrapper for gevent (GET-only API)."""
        path = environ.get('PATH_INFO', '/').split('?')[0]
        method = environ.get('REQUEST_METHOD', 'GET')
        remote_addr = environ.get('REMOTE_ADDR', '')
        if method != 'GET':
            start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
            return [b'Method not allowed']
        # Rate limiting check
        if not check_rate_limit(remote_addr):
            error_body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window})
            headers = [
                ('Content-Type', 'application/json'),
                ('Content-Length', str(len(error_body))),
                ('Retry-After', str(_rate_limit_window)),
            ]
            headers.extend(get_security_headers('application/json'))
            start_response('429 Too Many Requests', headers)
            return [error_body]
        # Route handling
        try:
            response_body, content_type, status = self._handle_route(path, remote_addr)
            status_line = '%d %s' % (status, 'OK' if status == 200 else 'Error')
            headers = [
                ('Content-Type', content_type),
                ('Content-Length', str(len(response_body))),
                ('Cache-Control', 'no-cache'),
            ]
            headers.extend(get_security_headers(content_type))
            start_response(status_line, headers)
            # Python 2: encode unicode bodies; str/bytes pass through as-is
            return [response_body.encode('utf-8') if isinstance(response_body, unicode) else response_body]
        except Exception as e:
            # Last-resort handler so the worker never raises into gevent
            error_body = json.dumps({'error': str(e)})
            headers = [
                ('Content-Type', 'application/json'),
                ('Content-Length', str(len(error_body))),
            ]
            headers.extend(get_security_headers('application/json'))
            start_response('500 Internal Server Error', headers)
            return [error_body]
    def _handle_route(self, path, remote_addr=''):
        """Handle route and return (body, content_type, status).

        remote_addr is used only to restrict /api/memory to localhost.
        """
        if path == '/':
            # API discovery document
            body = json.dumps({
                'endpoints': {
                    '/dashboard': 'web dashboard (HTML)',
                    '/map': 'proxy distribution by country (HTML)',
                    '/mitm': 'MITM certificate search (HTML)',
                    '/api/stats': 'runtime statistics (JSON)',
                    '/api/mitm': 'MITM certificate statistics (JSON)',
                    '/api/countries': 'proxy counts by country (JSON)',
                    '/proxies': 'list working proxies (params: limit, proto, country, asn)',
                    '/proxies/count': 'count working proxies',
                    '/health': 'health check',
                }
            }, indent=2)
            return body, 'application/json', 200
        elif path == '/dashboard':
            content = get_static_file('dashboard.html')
            if content:
                return content, 'text/html; charset=utf-8', 200
            return '{"error": "dashboard.html not loaded"}', 'application/json', 500
        elif path == '/map':
            content = get_static_file('map.html')
            if content:
                return content, 'text/html; charset=utf-8', 200
            return '{"error": "map.html not loaded"}', 'application/json', 500
        elif path == '/mitm':
            content = get_static_file('mitm.html')
            if content:
                return content, 'text/html; charset=utf-8', 200
            return '{"error": "mitm.html not loaded"}', 'application/json', 500
        elif path == '/static/style.css':
            content = get_static_file('style.css')
            if content:
                return content, 'text/css; charset=utf-8', 200
            return '{"error": "style.css not loaded"}', 'application/json', 500
        elif path == '/static/dashboard.js':
            content = get_static_file('dashboard.js')
            if content:
                return content, 'application/javascript; charset=utf-8', 200
            return '{"error": "dashboard.js not loaded"}', 'application/json', 500
        elif path == '/static/map.js':
            content = get_static_file('map.js')
            if content:
                return content, 'application/javascript; charset=utf-8', 200
            return '{"error": "map.js not loaded"}', 'application/json', 500
        elif path == '/static/mitm.js':
            content = get_static_file('mitm.js')
            if content:
                return content, 'application/javascript; charset=utf-8', 200
            return '{"error": "mitm.js not loaded"}', 'application/json', 500
        elif path.startswith('/static/lib/'):
            # Serve static library files from cache; only the basename is
            # used, so path traversal is not possible here.
            filename = path.split('/')[-1]
            content = get_static_lib(filename)
            if content:
                ext = os.path.splitext(filename)[1]
                content_type = _CONTENT_TYPES.get(ext, 'application/octet-stream')
                return content, content_type, 200
            return '{"error": "not found"}', 'application/json', 404
        elif path == '/api/stats':
            stats = {}
            if self.stats_provider:
                stats = self.stats_provider()
            # Add system stats
            stats['system'] = get_system_stats()
            # Add database stats (best-effort)
            try:
                db = mysqlite.mysqlite(self.database, str)
                stats['db'] = self._get_db_stats(db)
                stats['db_health'] = get_db_health(db)
            except Exception:
                pass
            return json.dumps(stats, indent=2), 'application/json', 200
        elif path == '/api/mitm':
            # MITM certificate statistics
            if self.stats_provider:
                try:
                    stats = self.stats_provider()
                    mitm = stats.get('mitm', {})
                    return json.dumps(mitm, indent=2), 'application/json', 200
                except Exception as e:
                    return json.dumps({'error': str(e)}), 'application/json', 500
            return json.dumps({'error': 'stats not available'}), 'application/json', 500
        elif path == '/api/countries':
            try:
                db = mysqlite.mysqlite(self.database, str)
                rows = db.execute(
                    'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                    'GROUP BY country ORDER BY c DESC'
                ).fetchall()
                countries = {r[0]: r[1] for r in rows}
                return json.dumps({'countries': countries}, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/api/locations':
            # Return proxy locations aggregated by lat/lon grid (0.5 degree cells)
            try:
                db = mysqlite.mysqlite(self.database, str)
                rows = db.execute(
                    'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, '
                    'country, anonymity, COUNT(*) as c FROM proxylist '
                    'WHERE failed=0 AND latitude IS NOT NULL AND longitude IS NOT NULL '
                    'GROUP BY lat, lon, country, anonymity ORDER BY c DESC'
                ).fetchall()
                locations = [{'lat': r[0], 'lon': r[1], 'country': r[2], 'anon': r[3] or 'unknown', 'count': r[4]} for r in rows]
                return json.dumps({'locations': locations}, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/proxies':
            # NOTE(review): this selects a `proxy` column while the
            # BaseHTTPServer handler selects ip/port -- confirm the schema
            # actually has both representations.
            try:
                db = mysqlite.mysqlite(self.database, str)
                rows = db.execute(
                    'SELECT proxy, proto, country, asn FROM proxylist WHERE failed=0 LIMIT 100'
                ).fetchall()
                proxies = [{'proxy': r[0], 'proto': r[1], 'country': r[2], 'asn': r[3]} for r in rows]
                return json.dumps({'proxies': proxies}, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/proxies/count':
            try:
                db = mysqlite.mysqlite(self.database, str)
                row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
                return json.dumps({'count': row[0] if row else 0}), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/api/memory':
            # Memory profiling endpoint (localhost only; 404 for others so
            # the endpoint's existence is not advertised)
            if not is_localhost(remote_addr):
                return json.dumps({'error': 'not available'}), 'application/json', 404
            try:
                mem = {}
                # Process memory from /proc/self/status
                try:
                    with open('/proc/self/status', 'r') as f:
                        for line in f:
                            if line.startswith('Vm'):
                                parts = line.split()
                                key = parts[0].rstrip(':')
                                mem[key] = int(parts[1]) * 1024  # Convert to bytes
                except IOError:
                    pass
                # GC stats
                gc_stats = {
                    'collections': gc.get_count(),
                    'threshold': gc.get_threshold(),
                    'objects': len(gc.get_objects()),
                }
                # Object type counts (top 20) - expensive full heap walk,
                # acceptable because this endpoint is localhost-only
                type_counts = {}
                for obj in gc.get_objects():
                    t = type(obj).__name__
                    type_counts[t] = type_counts.get(t, 0) + 1
                top_types = sorted(type_counts.items(), key=lambda x: -x[1])[:20]
                # Memory samples history (most recent 30)
                samples = []
                for ts, rss in _memory_samples[-30:]:
                    samples.append({'time': int(ts), 'rss': rss})
                result = {
                    'process': mem,
                    'gc': gc_stats,
                    'top_types': [{'type': t, 'count': c} for t, c in top_types],
                    'samples': samples,
                    'peak_rss': _peak_rss,
                    'start_rss': _start_rss,
                    'has_objgraph': _has_objgraph,
                    'has_pympler': _has_pympler,
                }
                # Objgraph most common types (if available)
                if _has_objgraph:
                    try:
                        result['objgraph_common'] = objgraph.most_common_types(limit=15)
                    except Exception:
                        pass
                # Pympler summary (if available)
                if _has_pympler:
                    try:
                        all_objects = muppy.get_objects()
                        sum_table = summary.summarize(all_objects)
                        result['pympler_summary'] = [
                            {'type': row[0], 'count': row[1], 'size': row[2]}
                            for row in sum_table[:20]
                        ]
                    except Exception as e:
                        result['pympler_error'] = str(e)
                return json.dumps(result, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/health':
            return json.dumps({'status': 'ok', 'timestamp': int(time.time())}), 'application/json', 200
        else:
            return json.dumps({'error': 'not found'}), 'application/json', 404
    def _get_db_stats(self, db):
        """Get database statistics (best-effort; partial dict on failure)."""
        stats = {}
        try:
            # By protocol
            rows = db.execute(
                'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto'
            ).fetchall()
            stats['by_proto'] = {r[0]: r[1] for r in rows if r[0]}
            # Top countries
            rows = db.execute(
                'SELECT country, COUNT(*) as cnt FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                'GROUP BY country ORDER BY cnt DESC LIMIT 10'
            ).fetchall()
            stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows]
            # Total counts
            row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
            stats['working'] = row[0] if row else 0
            row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
            stats['total'] = row[0] if row else 0
        except Exception:
            pass
        return stats
    def run(self):
        # Thread body: publish config to the handler class, then serve
        # forever on the backend matching the runtime environment.
        ProxyAPIHandler.database = self.database
        ProxyAPIHandler.stats_provider = self.stats_provider
        if GEVENT_PATCHED:
            # Use gevent's WSGIServer for proper async handling
            self.server = WSGIServer((self.host, self.port), self._wsgi_app, log=None)
            _log('httpd listening on %s:%d (gevent)' % (self.host, self.port), 'info')
            self.server.serve_forever()
        else:
            # Standard BaseHTTPServer for non-gevent environments
            self.server = BaseHTTPServer.HTTPServer((self.host, self.port), ProxyAPIHandler)
            _log('httpd listening on %s:%d' % (self.host, self.port), 'info')
            self.server.serve_forever()
    def stop(self):
        # Shut down whichever backend run() started (no-op before run()).
        if self.server:
            if GEVENT_PATCHED:
                self.server.stop()
            else:
                self.server.shutdown()
if __name__ == '__main__':
    # Standalone test entry point: serve the API on localhost, reading the
    # database path from the first CLI argument when given.
    import sys
    host, port = '127.0.0.1', 8081
    database = sys.argv[1] if len(sys.argv) > 1 else 'data/proxies.sqlite'
    _log('starting test server on %s:%d (db: %s)' % (host, port, database), 'info')
    server = ProxyAPIServer(host, port, database)
    server.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        server.stop()
        _log('server stopped', 'info')