#!/usr/bin/env python2 # -*- coding: utf-8 -*- from __future__ import division """HTTP API server with advanced web dashboard for PPF.""" import BaseHTTPServer import json import threading import time import os import gc import sys import mysqlite from misc import _log # Static directories (relative to this file) _STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static') _STATIC_LIB_DIR = os.path.join(_STATIC_DIR, 'lib') # Content type mapping for static files _CONTENT_TYPES = { '.js': 'application/javascript; charset=utf-8', '.css': 'text/css; charset=utf-8', '.json': 'application/json; charset=utf-8', '.png': 'image/png', '.svg': 'image/svg+xml', '.woff': 'font/woff', '.woff2': 'font/woff2', } # Cache for static library files (loaded once at startup) _LIB_CACHE = {} # Cache for dashboard static files (HTML, CSS, JS) _STATIC_CACHE = {} # Optional memory profiling (installed via requirements.txt) try: import objgraph _has_objgraph = True except ImportError: _has_objgraph = False try: from pympler import muppy, summary _has_pympler = True except ImportError: _has_pympler = False # Memory tracking for leak detection _memory_samples = [] _memory_sample_max = 60 # Keep last 60 samples (5 min at 5s intervals) _peak_rss = 0 _start_rss = 0 def get_system_stats(): """Collect system resource statistics.""" stats = {} # Load average (1, 5, 15 min) try: load = os.getloadavg() stats['load_1m'] = round(load[0], 2) stats['load_5m'] = round(load[1], 2) stats['load_15m'] = round(load[2], 2) except (OSError, AttributeError): stats['load_1m'] = stats['load_5m'] = stats['load_15m'] = 0 # CPU count try: stats['cpu_count'] = os.sysconf('SC_NPROCESSORS_ONLN') except (ValueError, OSError, AttributeError): stats['cpu_count'] = 1 # Memory from /proc/meminfo (Linux) try: with open('/proc/meminfo', 'r') as f: meminfo = {} for line in f: parts = line.split() if len(parts) >= 2: meminfo[parts[0].rstrip(':')] = int(parts[1]) * 1024 # KB to bytes total = meminfo.get('MemTotal', 0) available = meminfo.get('MemAvailable', meminfo.get('MemFree', 0)) stats['mem_total'] = total stats['mem_available'] = available stats['mem_used'] = total - available stats['mem_pct'] = round((total - available) / total * 100, 1) if total > 0 else 0 except (IOError, KeyError, ZeroDivisionError): stats['mem_total'] = stats['mem_available'] = stats['mem_used'] = 0 stats['mem_pct'] = 0 # Disk usage for data directory try: st = os.statvfs('data' if os.path.exists('data') else '.') total = st.f_blocks * st.f_frsize free = st.f_bavail * st.f_frsize used = total - free stats['disk_total'] = total stats['disk_free'] = free stats['disk_used'] = used stats['disk_pct'] = round(used / total * 100, 1) if total > 0 else 0 except (OSError, ZeroDivisionError): stats['disk_total'] = stats['disk_free'] = stats['disk_used'] = 0 stats['disk_pct'] = 0 # Process stats from /proc/self/status try: with open('/proc/self/status', 'r') as f: for line in f: if line.startswith('VmRSS:'): stats['proc_rss'] = int(line.split()[1]) * 1024 # KB to bytes elif line.startswith('Threads:'): stats['proc_threads'] = int(line.split()[1]) except (IOError, ValueError, IndexError): stats['proc_rss'] = 0 stats['proc_threads'] = 0 # Memory leak detection global _memory_samples, _peak_rss, _start_rss rss = stats.get('proc_rss', 0) if rss > 0: if _start_rss == 0: _start_rss = rss if rss > _peak_rss: _peak_rss = rss _memory_samples.append((time.time(), rss)) if len(_memory_samples) > _memory_sample_max: _memory_samples.pop(0) stats['proc_rss_peak'] = _peak_rss stats['proc_rss_start'] = _start_rss stats['proc_rss_growth'] = rss - _start_rss if _start_rss > 0 else 0 # GC stats for leak detection try: gc_counts = gc.get_count() stats['gc_count_gen0'] = gc_counts[0] stats['gc_count_gen1'] = gc_counts[1] stats['gc_count_gen2'] = gc_counts[2] stats['gc_objects'] = len(gc.get_objects()) except Exception: stats['gc_count_gen0'] = stats['gc_count_gen1'] = stats['gc_count_gen2'] = 0 stats['gc_objects'] = 0 return stats def get_db_health(db): """Get database health and statistics.""" stats = {} try: # Database file size db_path = db.path if hasattr(db, 'path') else 'data/proxies.sqlite' if os.path.exists(db_path): stats['db_size'] = os.path.getsize(db_path) else: stats['db_size'] = 0 # Page stats from pragma row = db.execute('PRAGMA page_count').fetchone() stats['page_count'] = row[0] if row else 0 row = db.execute('PRAGMA page_size').fetchone() stats['page_size'] = row[0] if row else 0 row = db.execute('PRAGMA freelist_count').fetchone() stats['freelist_count'] = row[0] if row else 0 # Anonymity breakdown rows = db.execute( 'SELECT anonymity, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY anonymity' ).fetchall() stats['anonymity'] = {r[0] or 'unknown': r[1] for r in rows} # Latency stats row = db.execute( 'SELECT AVG(avg_latency), MIN(avg_latency), MAX(avg_latency) ' 'FROM proxylist WHERE failed=0 AND avg_latency > 0' ).fetchone() if row and row[0]: stats['db_avg_latency'] = round(row[0], 1) stats['db_min_latency'] = round(row[1], 1) stats['db_max_latency'] = round(row[2], 1) else: stats['db_avg_latency'] = stats['db_min_latency'] = stats['db_max_latency'] = 0 # Recent activity now = int(time.time()) row = db.execute( 'SELECT COUNT(*) FROM proxylist WHERE tested > ?', (now - 3600,) ).fetchone() stats['tested_last_hour'] = row[0] if row else 0 row = db.execute( 'SELECT COUNT(*) FROM proxylist WHERE added > ?', (now - 86400,) ).fetchone() stats['added_last_day'] = row[0] if row else 0 # Dead proxies count (permanently dead = -1, failing = positive) row = db.execute( 'SELECT COUNT(*) FROM proxylist WHERE failed = -1' ).fetchone() stats['dead_count'] = row[0] if row else 0 # Failing proxies count (positive fail count but not permanently dead) row = db.execute( 'SELECT COUNT(*) FROM proxylist WHERE failed > 0' ).fetchone() stats['failing_count'] = row[0] if row else 0 except Exception: pass return stats # Detect if gevent has monkey-patched the environment try: from gevent import monkey GEVENT_PATCHED = monkey.is_module_patched('socket') except ImportError: GEVENT_PATCHED = False if GEVENT_PATCHED: from gevent.pywsgi import WSGIServer def load_static_libs(): """Load static library files into cache at startup.""" global _LIB_CACHE if not os.path.isdir(_STATIC_LIB_DIR): _log('static/lib directory not found: %s' % _STATIC_LIB_DIR, 'warn') return for fname in os.listdir(_STATIC_LIB_DIR): fpath = os.path.join(_STATIC_LIB_DIR, fname) if os.path.isfile(fpath): try: with open(fpath, 'rb') as f: _LIB_CACHE[fname] = f.read() _log('loaded static lib: %s (%d bytes)' % (fname, len(_LIB_CACHE[fname])), 'debug') except IOError as e: _log('failed to load %s: %s' % (fname, e), 'warn') _log('loaded %d static library files' % len(_LIB_CACHE), 'info') def get_static_lib(filename): """Get a cached static library file.""" return _LIB_CACHE.get(filename) def load_static_files(theme): """Load dashboard static files into cache at startup. Args: theme: dict of color name -> color value for CSS variable substitution """ global _STATIC_CACHE files = { 'dashboard.html': 'static/dashboard.html', 'map.html': 'static/map.html', 'mitm.html': 'static/mitm.html', 'style.css': 'static/style.css', 'dashboard.js': 'static/dashboard.js', 'map.js': 'static/map.js', 'mitm.js': 'static/mitm.js', } for key, relpath in files.items(): fpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), relpath) if os.path.isfile(fpath): try: with open(fpath, 'rb') as f: content = f.read() # Apply theme substitution to CSS if key == 'style.css' and theme: for name, val in theme.items(): content = content.replace('{' + name + '}', val) _STATIC_CACHE[key] = content _log('loaded static file: %s (%d bytes)' % (key, len(content)), 'debug') except IOError as e: _log('failed to load %s: %s' % (fpath, e), 'warn') else: _log('static file not found: %s' % fpath, 'warn') _log('loaded %d dashboard static files' % len(_STATIC_CACHE), 'info') def get_static_file(filename): """Get a cached dashboard static file.""" return _STATIC_CACHE.get(filename) # Theme colors - dark tiles on lighter background THEME = { 'bg': '#1e2738', 'card': '#181f2a', 'card_alt': '#212a36', 'border': '#3a4858', 'text': '#e8eef5', 'dim': '#8b929b', 'green': '#3fb950', 'red': '#f85149', 'yellow': '#d29922', 'blue': '#58a6ff', 'purple': '#a371f7', 'cyan': '#39c5cf', 'orange': '#db6d28', 'pink': '#db61a2', 'map_bg': '#1e2738', # Match dashboard background } class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): """HTTP request handler for proxy API.""" database = None stats_provider = None def log_message(self, format, *args): pass def send_response_body(self, body, content_type, status=200): self.send_response(status) self.send_header('Content-Type', content_type) self.send_header('Content-Length', len(body)) self.send_header('Cache-Control', 'no-cache') if content_type == 'application/json': self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(body) def send_json(self, data, status=200): self.send_response_body(json.dumps(data, indent=2), 'application/json', status) def send_text(self, text, status=200): self.send_response_body(text, 'text/plain', status) def send_html(self, html, status=200): self.send_response_body(html, 'text/html; charset=utf-8', status) def send_css(self, css, status=200): self.send_response_body(css, 'text/css; charset=utf-8', status) def send_js(self, js, status=200): self.send_response_body(js, 'application/javascript; charset=utf-8', status) def do_GET(self): path = self.path.split('?')[0] routes = { '/': self.handle_index, '/dashboard': self.handle_dashboard, '/map': self.handle_map, '/static/style.css': self.handle_css, '/static/dashboard.js': self.handle_js, '/api/stats': self.handle_stats, '/api/countries': self.handle_countries, '/proxies': self.handle_proxies, '/proxies/count': self.handle_count, '/health': self.handle_health, } handler = routes.get(path) if handler: handler() else: self.send_json({'error': 'not found'}, 404) def handle_index(self): self.send_json({ 'endpoints': { '/dashboard': 'web dashboard (HTML)', '/api/stats': 'runtime statistics (JSON)', '/proxies': 'list working proxies (params: limit, proto, country, asn)', '/proxies/count': 'count working proxies', '/health': 'health check', } }) def handle_dashboard(self): self.send_html(DASHBOARD_HTML) def handle_map(self): self.send_html(MAP_HTML) def handle_countries(self): """Return all countries with proxy counts.""" try: db = mysqlite.mysqlite(self.database, str) rows = db.execute( 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY c DESC' ).fetchall() countries = {r[0]: r[1] for r in rows} self.send_json({'countries': countries}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_css(self): self.send_css(DASHBOARD_CSS) def handle_js(self): self.send_js(DASHBOARD_JS) def get_db_stats(self): """Get statistics from database.""" try: db = mysqlite.mysqlite(self.database, str) stats = {} # Total counts row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() stats['working'] = row[0] if row else 0 row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() stats['total'] = row[0] if row else 0 # By protocol rows = db.execute( 'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto' ).fetchall() stats['by_proto'] = {r[0] or 'unknown': r[1] for r in rows} # Top countries rows = db.execute( 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY c DESC LIMIT 10' ).fetchall() stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows] # Top ASNs rows = db.execute( 'SELECT asn, COUNT(*) as c FROM proxylist WHERE failed=0 AND asn IS NOT NULL ' 'GROUP BY asn ORDER BY c DESC LIMIT 10' ).fetchall() stats['top_asns'] = [(r[0], r[1]) for r in rows] return stats except Exception as e: return {'error': str(e)} def handle_stats(self): stats = {} # Runtime stats from provider if self.stats_provider: try: stats.update(self.stats_provider()) except Exception as e: _log('stats_provider error: %s' % str(e), 'error') # Add system stats stats['system'] = get_system_stats() # Add database stats try: db = mysqlite.mysqlite(self.database, str) stats['db'] = self.get_db_stats() stats['db_health'] = get_db_health(db) except Exception: pass self.send_json(stats) def handle_proxies(self): params = {} if '?' in self.path: for pair in self.path.split('?')[1].split('&'): if '=' in pair: k, v = pair.split('=', 1) params[k] = v limit = min(int(params.get('limit', 100)), 1000) proto = params.get('proto', '') country = params.get('country', '') asn = params.get('asn', '') fmt = params.get('format', 'json') sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0' args = [] if proto: sql += ' AND proto=?' args.append(proto) if country: sql += ' AND country=?' args.append(country.upper()) if asn: sql += ' AND asn=?' args.append(int(asn)) sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?' args.append(limit) try: db = mysqlite.mysqlite(self.database, str) rows = db.execute(sql, args).fetchall() if fmt == 'plain': self.send_text('\n'.join('%s:%s' % (r[0], r[1]) for r in rows)) else: proxies = [{ 'ip': r[0], 'port': r[1], 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5] } for r in rows] self.send_json({'count': len(proxies), 'proxies': proxies}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_count(self): try: db = mysqlite.mysqlite(self.database, str) row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() self.send_json({'count': row[0] if row else 0}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_health(self): self.send_json({'status': 'ok', 'timestamp': int(time.time())}) class ProxyAPIServer(threading.Thread): """Threaded HTTP API server. Uses gevent's WSGIServer when running in a gevent-patched environment, otherwise falls back to standard BaseHTTPServer. """ def __init__(self, host, port, database, stats_provider=None): threading.Thread.__init__(self) self.host = host self.port = port self.database = database self.stats_provider = stats_provider self.daemon = True self.server = None self._stop_event = threading.Event() if not GEVENT_PATCHED else None # Load static library files into cache load_static_libs() # Load dashboard static files (HTML, CSS, JS) with theme substitution load_static_files(THEME) def _wsgi_app(self, environ, start_response): """WSGI application wrapper for gevent.""" path = environ.get('PATH_INFO', '/').split('?')[0] method = environ.get('REQUEST_METHOD', 'GET') if method != 'GET': start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'Method not allowed'] # Route handling try: response_body, content_type, status = self._handle_route(path) status_line = '%d %s' % (status, 'OK' if status == 200 else 'Error') headers = [ ('Content-Type', content_type), ('Content-Length', str(len(response_body))), ('Cache-Control', 'no-cache'), ] if content_type == 'application/json': headers.append(('Access-Control-Allow-Origin', '*')) start_response(status_line, headers) return [response_body.encode('utf-8') if isinstance(response_body, unicode) else response_body] except Exception as e: error_body = json.dumps({'error': str(e)}) start_response('500 Internal Server Error', [ ('Content-Type', 'application/json'), ('Content-Length', str(len(error_body))), ]) return [error_body] def _handle_route(self, path): """Handle route and return (body, content_type, status).""" if path == '/': body = json.dumps({ 'endpoints': { '/dashboard': 'web dashboard (HTML)', '/map': 'proxy distribution by country (HTML)', '/mitm': 'MITM certificate search (HTML)', '/api/stats': 'runtime statistics (JSON)', '/api/mitm': 'MITM certificate statistics (JSON)', '/api/countries': 'proxy counts by country (JSON)', '/proxies': 'list working proxies (params: limit, proto, country, asn)', '/proxies/count': 'count working proxies', '/health': 'health check', } }, indent=2) return body, 'application/json', 200 elif path == '/dashboard': content = get_static_file('dashboard.html') if content: return content, 'text/html; charset=utf-8', 200 return '{"error": "dashboard.html not loaded"}', 'application/json', 500 elif path == '/map': content = get_static_file('map.html') if content: return content, 'text/html; charset=utf-8', 200 return '{"error": "map.html not loaded"}', 'application/json', 500 elif path == '/mitm': content = get_static_file('mitm.html') if content: return content, 'text/html; charset=utf-8', 200 return '{"error": "mitm.html not loaded"}', 'application/json', 500 elif path == '/static/style.css': content = get_static_file('style.css') if content: return content, 'text/css; charset=utf-8', 200 return '{"error": "style.css not loaded"}', 'application/json', 500 elif path == '/static/dashboard.js': content = get_static_file('dashboard.js') if content: return content, 'application/javascript; charset=utf-8', 200 return '{"error": "dashboard.js not loaded"}', 'application/json', 500 elif path == '/static/map.js': content = get_static_file('map.js') if content: return content, 'application/javascript; charset=utf-8', 200 return '{"error": "map.js not loaded"}', 'application/json', 500 elif path == '/static/mitm.js': content = get_static_file('mitm.js') if content: return content, 'application/javascript; charset=utf-8', 200 return '{"error": "mitm.js not loaded"}', 'application/json', 500 elif path.startswith('/static/lib/'): # Serve static library files from cache filename = path.split('/')[-1] content = get_static_lib(filename) if content: ext = os.path.splitext(filename)[1] content_type = _CONTENT_TYPES.get(ext, 'application/octet-stream') return content, content_type, 200 return '{"error": "not found"}', 'application/json', 404 elif path == '/api/stats': stats = {} if self.stats_provider: stats = self.stats_provider() # Add system stats stats['system'] = get_system_stats() # Add database stats try: db = mysqlite.mysqlite(self.database, str) stats['db'] = self._get_db_stats(db) stats['db_health'] = get_db_health(db) except Exception: pass return json.dumps(stats, indent=2), 'application/json', 200 elif path == '/api/mitm': # MITM certificate statistics if self.stats_provider: try: stats = self.stats_provider() mitm = stats.get('mitm', {}) return json.dumps(mitm, indent=2), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 return json.dumps({'error': 'stats not available'}), 'application/json', 500 elif path == '/api/countries': try: db = mysqlite.mysqlite(self.database, str) rows = db.execute( 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY c DESC' ).fetchall() countries = {r[0]: r[1] for r in rows} return json.dumps({'countries': countries}, indent=2), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/api/locations': # Return proxy locations aggregated by lat/lon grid (0.5 degree cells) try: db = mysqlite.mysqlite(self.database, str) rows = db.execute( 'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, ' 'country, anonymity, COUNT(*) as c FROM proxylist ' 'WHERE failed=0 AND latitude IS NOT NULL AND longitude IS NOT NULL ' 'GROUP BY lat, lon, country, anonymity ORDER BY c DESC' ).fetchall() locations = [{'lat': r[0], 'lon': r[1], 'country': r[2], 'anon': r[3] or 'unknown', 'count': r[4]} for r in rows] return json.dumps({'locations': locations}, indent=2), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/proxies': try: db = mysqlite.mysqlite(self.database, str) rows = db.execute( 'SELECT proxy, proto, country, asn FROM proxylist WHERE failed=0 LIMIT 100' ).fetchall() proxies = [{'proxy': r[0], 'proto': r[1], 'country': r[2], 'asn': r[3]} for r in rows] return json.dumps({'proxies': proxies}, indent=2), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/proxies/count': try: db = mysqlite.mysqlite(self.database, str) row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() return json.dumps({'count': row[0] if row else 0}), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/api/memory': # Memory profiling endpoint try: mem = {} # Process memory from /proc/self/status try: with open('/proc/self/status', 'r') as f: for line in f: if line.startswith('Vm'): parts = line.split() key = parts[0].rstrip(':') mem[key] = int(parts[1]) * 1024 # Convert to bytes except IOError: pass # GC stats gc_stats = { 'collections': gc.get_count(), 'threshold': gc.get_threshold(), 'objects': len(gc.get_objects()), } # Object type counts (top 20) type_counts = {} for obj in gc.get_objects(): t = type(obj).__name__ type_counts[t] = type_counts.get(t, 0) + 1 top_types = sorted(type_counts.items(), key=lambda x: -x[1])[:20] # Memory samples history samples = [] for ts, rss in _memory_samples[-30:]: samples.append({'time': int(ts), 'rss': rss}) result = { 'process': mem, 'gc': gc_stats, 'top_types': [{'type': t, 'count': c} for t, c in top_types], 'samples': samples, 'peak_rss': _peak_rss, 'start_rss': _start_rss, 'has_objgraph': _has_objgraph, 'has_pympler': _has_pympler, } # Objgraph most common types (if available) if _has_objgraph: try: result['objgraph_common'] = objgraph.most_common_types(limit=15) except Exception: pass # Pympler summary (if available) if _has_pympler: try: all_objects = muppy.get_objects() sum_table = summary.summarize(all_objects) result['pympler_summary'] = [ {'type': row[0], 'count': row[1], 'size': row[2]} for row in sum_table[:20] ] except Exception as e: result['pympler_error'] = str(e) return json.dumps(result, indent=2), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/health': return json.dumps({'status': 'ok', 'timestamp': int(time.time())}), 'application/json', 200 else: return json.dumps({'error': 'not found'}), 'application/json', 404 def _get_db_stats(self, db): """Get database statistics.""" stats = {} try: # By protocol rows = db.execute( 'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto' ).fetchall() stats['by_proto'] = {r[0]: r[1] for r in rows if r[0]} # Top countries rows = db.execute( 'SELECT country, COUNT(*) as cnt FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY cnt DESC LIMIT 10' ).fetchall() stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows] # Total counts row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() stats['working'] = row[0] if row else 0 row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() stats['total'] = row[0] if row else 0 except Exception: pass return stats def run(self): ProxyAPIHandler.database = self.database ProxyAPIHandler.stats_provider = self.stats_provider if GEVENT_PATCHED: # Use gevent's WSGIServer for proper async handling self.server = WSGIServer((self.host, self.port), self._wsgi_app, log=None) _log('httpd listening on %s:%d (gevent)' % (self.host, self.port), 'info') self.server.serve_forever() else: # Standard BaseHTTPServer for non-gevent environments self.server = BaseHTTPServer.HTTPServer((self.host, self.port), ProxyAPIHandler) _log('httpd listening on %s:%d' % (self.host, self.port), 'info') self.server.serve_forever() def stop(self): if self.server: if GEVENT_PATCHED: self.server.stop() else: self.server.shutdown() if __name__ == '__main__': import sys host = '127.0.0.1' port = 8081 database = 'data/proxies.sqlite' if len(sys.argv) > 1: database = sys.argv[1] _log('starting test server on %s:%d (db: %s)' % (host, port, database), 'info') server = ProxyAPIServer(host, port, database) server.start() try: while True: time.sleep(1) except KeyboardInterrupt: server.stop() _log('server stopped', 'info')