#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""HTTP API server with advanced web dashboard for PPF."""
# NOTE: docstring moved before `from __future__` so it is a real module
# docstring (a string after an import is just a discarded expression).
from __future__ import division

try:
    from ppf import __version__
except ImportError:
    __version__ = 'unknown'

import BaseHTTPServer
import json
import threading
import time
import os
import gc
import sys
from collections import defaultdict

import mysqlite
from misc import _log
from dbs import (create_verification_tables, insert_proxy_result,
                 check_for_disagreement, queue_verification,
                 get_verification_stats, get_all_worker_trust)

# IP geolocation (optional)
try:
    import IP2Location
    _geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
    _geolite = True
except (ImportError, IOError, ValueError):
    _geodb = None
    _geolite = False

# ASN lookup (optional, lazy-loaded on first use)
# Defers ~3.6s startup cost of parsing ipasn.dat until first ASN lookup.
_asndb = None
_asndb_loaded = False
_asn_dat_path = os.path.join("data", "ipasn.dat")

import socket
import struct
import bisect


class _AsnLookup(object):
    """Pure-Python ASN lookup using ipasn.dat (CIDR/ASN text format)."""

    def __init__(self, path):
        # Entries are (network_start, prefix_len, asn) tuples, sorted so
        # lookup() can binary-search them with bisect.
        self._entries = []
        with open(path) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith(';'):
                    continue
                parts = line.split('\t')
                if len(parts) != 2:
                    continue
                cidr, asn = parts
                ip, prefix = cidr.split('/')
                start = struct.unpack('!I', socket.inet_aton(ip))[0]
                self._entries.append((start, int(prefix), int(asn)))
        self._entries.sort()
        _log('asn: loaded %d prefixes (pure-python)' % len(self._entries), 'info')

    def lookup(self, ip):
        """Return (asn, None) for the longest prefix covering *ip*, else (None, None).

        BUGFIX: the previous implementation examined only the single entry
        with the greatest network start <= ip. A covering, less-specific
        prefix (e.g. 10.0.0.0/8) was therefore missed whenever a
        non-matching, more-specific sibling (e.g. 10.1.0.0/16) sorted
        between it and the queried address. We now probe each possible
        prefix length from most to least specific (32 -> 1), so the longest
        covering prefix always wins, at O(32 log n) per lookup.
        """
        ip_int = struct.unpack('!I', socket.inet_aton(ip))[0]
        for prefix_len in range(32, 0, -1):
            mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
            network = ip_int & mask
            # -1 sorts before any real ASN, so bisect_left lands on the
            # exact (network, prefix_len, asn) entry if one exists.
            idx = bisect.bisect_left(self._entries, (network, prefix_len, -1))
            if idx < len(self._entries):
                start, plen, asn = self._entries[idx]
                if start == network and plen == prefix_len:
                    return (asn, None)
        return (None, None)


def _get_asndb():
    """Lazy-load ASN database on first call. Returns db instance or None.

    Prefers the C-accelerated pyasn package; falls back to the pure-Python
    _AsnLookup parser when pyasn (or its data file) is unavailable.
    """
    global _asndb, _asndb_loaded
    if _asndb_loaded:
        return _asndb
    _asndb_loaded = True
    try:
        import pyasn
        _asndb = pyasn.pyasn(_asn_dat_path)
        return _asndb
    except (ImportError, IOError):
        pass
    if os.path.exists(_asn_dat_path):
        try:
            _asndb = _AsnLookup(_asn_dat_path)
        except Exception as e:
            _log('asn: failed to load %s: %s' % (_asn_dat_path, e), 'warn')
    return _asndb


# Rate limiting configuration
_rate_limits = defaultdict(list)
_rate_lock = threading.Lock()
_rate_limit_requests = 300  # requests per window (increased for workers)
_rate_limit_window = 60  # window in seconds

# Static directories (relative to this file)
_STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
_STATIC_LIB_DIR = os.path.join(_STATIC_DIR, 'lib')

# Content type mapping for static files
_CONTENT_TYPES = {
    '.js': 'application/javascript; charset=utf-8',
    '.css': 'text/css; charset=utf-8',
    '.json': 'application/json; charset=utf-8',
    '.png': 'image/png',
    '.svg': 'image/svg+xml',
    '.woff': 'font/woff',
    '.woff2': 'font/woff2',
}

# Cache for static library files (loaded once at startup)
_LIB_CACHE = {}

# Cache for dashboard static files (HTML, CSS, JS)
_STATIC_CACHE = {}

# Cache for expensive operations
_db_health_cache = {'value': {}, 'time': 0}
_db_health_ttl = 10  # seconds

# Simple RSS tracking for dashboard
_peak_rss = 0
_start_rss = 0

# Worker registry for distributed testing
import hashlib
import random
import string

_workers = {}  # worker_id -> {name, ip, last_seen, jobs_completed, proxies_tested, ...}
_workers_lock = threading.Lock()
_worker_keys = set()  # valid API keys
_master_key = None  # master key for worker registration
_workers_file = 'data/workers.json'  # persistent storage

# URL claim tracking (parallel to proxy claims)
_url_claims = {}  # url -> {worker_id, claimed_at}
_url_claims_lock = threading.Lock()
_url_claim_timeout = 600  # 10 min (URLs take longer to fetch+extract)

# URL scoring: pending proxy counts for working_ratio correlation
_url_pending_counts = {}  # url -> {total, worker_id, time}
_url_pending_lock = threading.Lock()
_url_database_path = None  # set in ProxyAPIServer.__init__ for cross-db access

# URL scoring defaults (overridden by configure_url_scoring)
_url_checktime = 3600
_url_perfail_checktime = 3600
_url_max_fail = 10
_url_list_max_age_days = 7

# Test rate tracking: worker_id -> list of (timestamp, count) tuples
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
_test_history_window = 120  # seconds to keep test history for rate calculation

# Session tracking
_session_start_time = int(time.time())  # when httpd started

# Testing schedule configuration (set from config at startup)
_working_checktime = 300  # retest interval for working proxies (failed=0)
_fail_retry_interval = 60  # retry interval for failing proxies
_fail_retry_backoff = True  # True=linear backoff (60,120,180...), False=fixed (60,60,60...)
_max_fail = 5  # failures before proxy considered dead

# Per-greenlet (or per-thread) SQLite connection cache
# Under gevent, threading.local() is monkey-patched to greenlet-local storage.
# Connections are reused across requests handled by the same greenlet, eliminating
# redundant sqlite3.connect() + PRAGMA calls (~0.5ms each, ~2.7k/session on odin).
_local = threading.local()


def _get_db(path):
    """Get a cached SQLite connection for the proxy database."""
    db = getattr(_local, 'proxy_db', None)
    if db is None:
        db = mysqlite.mysqlite(path, str)
        _local.proxy_db = db
    return db


def _get_url_db(path):
    """Get a cached SQLite connection for the URL database."""
    db = getattr(_local, 'url_db', None)
    if db is None:
        db = mysqlite.mysqlite(path, str)
        _local.url_db = db
    return db


def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backoff, max_fail):
    """Set testing schedule parameters from config."""
    global _working_checktime, _fail_retry_interval, _fail_retry_backoff, _max_fail
    _working_checktime = working_checktime
    _fail_retry_interval = fail_retry_interval
    _fail_retry_backoff = fail_retry_backoff
    _max_fail = max_fail


def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days):
    """Set URL scoring parameters from config."""
    global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days
    _url_checktime = checktime
    _url_perfail_checktime = perfail_checktime
    _url_max_fail = max_fail
    _url_list_max_age_days = list_max_age_days


def _build_due_condition():
    """Build SQL condition for proxy due check.

    Returns (condition_sql, params) where condition_sql is the WHERE clause
    and params is a tuple of parameter values.

    Formula:
    - failed=0: tested + working_checktime < now
    - failed>0 with backoff: tested + (failed * fail_retry_interval) < now
    - failed>0 without backoff: tested + fail_retry_interval < now
    """
    now_int = int(time.time())
    if _fail_retry_backoff:
        # Linear backoff: multiply interval by failure count
        condition = '''
            failed >= 0 AND failed < ?
            AND (tested IS NULL OR
                 CASE WHEN failed = 0
                      THEN tested + ? < ?
                      ELSE tested + (failed * ?) < ?
                 END)
        '''
        params = (_max_fail, _working_checktime, now_int, _fail_retry_interval, now_int)
    else:
        # Fixed interval: same delay regardless of failure count
        condition = '''
            failed >= 0 AND failed < ?
            AND (tested IS NULL OR
                 CASE WHEN failed = 0
                      THEN tested + ? < ?
                      ELSE tested + ? < ?
                 END)
        '''
        params = (_max_fail, _working_checktime, now_int, _fail_retry_interval, now_int)
    return condition, params


def load_workers():
    """Load worker registry from disk."""
    global _workers, _worker_keys
    try:
        if os.path.exists(_workers_file):
            with open(_workers_file, 'r') as f:
                data = json.load(f)
            _workers = data.get('workers', {})
            _worker_keys = set(data.get('keys', []))
            _log('loaded %d workers from %s' % (len(_workers), _workers_file), 'info')
    except Exception as e:
        _log('failed to load workers: %s' % e, 'warn')


def save_workers():
    """Save worker registry to disk (best-effort; failures are logged)."""
    try:
        data = {
            'workers': _workers,
            'keys': list(_worker_keys),
            'saved': time.time(),
        }
        with open(_workers_file, 'w') as f:
            json.dump(data, f, indent=2)
    except Exception as e:
        _log('failed to save workers: %s' % e, 'warn')


# SECURITY FIX: API keys gate worker access, so they must come from a CSPRNG.
# The default `random` module is a predictable Mersenne Twister; SystemRandom
# draws from os.urandom and has the same .choice() interface (works on py2).
_sysrandom = random.SystemRandom()


def generate_worker_key():
    """Generate a random 32-char alphanumeric worker API key (crypto-strength)."""
    chars = string.ascii_letters + string.digits
    return ''.join(_sysrandom.choice(chars) for _ in range(32))


def generate_worker_id(name, ip):
    """Generate a unique worker ID from name and IP (time-salted sha256 prefix)."""
    data = '%s:%s:%s' % (name, ip, time.time())
    return hashlib.sha256(data.encode()).hexdigest()[:16]


def register_worker(name, ip, key=None):
    """Register a new worker and return its credentials as (worker_id, key).

    If a worker with same name exists, update its IP and reset key.
    This allows workers to re-register after restarts.
    """
    # Check for existing worker with same name
    with _workers_lock:
        for wid, info in _workers.items():
            if info.get('name') == name:
                # Re-registration: update IP, generate new key, preserve stats
                new_key = key or generate_worker_key()
                _worker_keys.discard(info.get('key'))  # Remove old key
                info['ip'] = ip
                info['key'] = new_key
                info['last_seen'] = time.time()
                _worker_keys.add(new_key)
                save_workers()
                return wid, new_key
    # New registration
    worker_id = generate_worker_id(name, ip)
    worker_key = key or generate_worker_key()
    with _workers_lock:
        _workers[worker_id] = {
            'name': name,
            'ip': ip,
            'key': worker_key,
            'registered': time.time(),
            'last_seen': time.time(),
            'jobs_completed': 0,
            'proxies_tested': 0,
            'proxies_working': 0,
            'proxies_failed': 0,
            'total_latency': 0,
            'last_batch_size': 0,
            'last_batch_working': 0,
        }
        _worker_keys.add(worker_key)
        save_workers()
    return worker_id, worker_key


def validate_worker_key(key):
    """Check if worker key is valid. The master key passes all checks."""
    if not key:
        return False
    # Master key allows all operations
    if _master_key and key == _master_key:
        return True
    return key in _worker_keys


def get_worker_by_key(key):
    """Get worker info by API key; returns (worker_id, info) or (None, None)."""
    with _workers_lock:
        for wid, info in _workers.items():
            if info.get('key') == key:
                return wid, info
    return None, None


def update_worker_heartbeat(worker_id):
    """Update worker's last_seen timestamp."""
    with _workers_lock:
        if worker_id in _workers:
            _workers[worker_id]['last_seen'] = time.time()


def record_test_rate(worker_id, count):
    """Record test submission for rate calculation."""
    now = time.time()
    with _worker_test_history_lock:
        if worker_id not in _worker_test_history:
            _worker_test_history[worker_id] = []
        _worker_test_history[worker_id].append((now, count))
        # Prune old entries
        cutoff = now - _test_history_window
        _worker_test_history[worker_id] = [
            (t, c) for t, c in _worker_test_history[worker_id] if t > cutoff
        ]


def get_worker_test_rate(worker_id):
    """Calculate worker's test rate (tests/sec) over recent window.

    Returns 0.0 for unknown workers, empty history, or when the samples
    span less than one second (rate would be meaningless/huge).
    """
    now = time.time()
    with _worker_test_history_lock:
        if worker_id not in _worker_test_history:
            return 0.0
        history = _worker_test_history[worker_id]
        if not history:
            return 0.0
        # Sum tests in window and calculate rate
        cutoff = now - _test_history_window
        in_window = [(t, c) for t, c in history if t > cutoff]
        if not in_window:
            return 0.0
        total_tests = sum(c for t, c in in_window)
        oldest = min(t for t, c in in_window)
        elapsed = now - oldest
        if elapsed < 1:
            return 0.0
        return total_tests / elapsed
def _get_proto_boost():
    """Calculate protocol scarcity boost for URL scoring.

    Returns a value 0.0-1.0 to boost SOCKS sources when SOCKS proxies are
    underrepresented relative to HTTP. Returns 0.0 when balanced, 0.5 when
    there is no data to decide from.
    """
    try:
        if not _proxy_database:
            return 0.0
        db = _get_db(_proxy_database)
        if not db:
            return 0.0
        row = db.execute(
            "SELECT "
            " SUM(CASE WHEN proto='http' THEN 1 ELSE 0 END),"
            " SUM(CASE WHEN proto IN ('socks4','socks5') THEN 1 ELSE 0 END)"
            " FROM proxylist WHERE failed=0"
        ).fetchone()
        # BUGFIX: previously `not row[0]` treated "zero HTTP proxies" as
        # "no data" and returned the default 0.5 boost even when SOCKS
        # proxies dominated the pool (boosting the wrong protocol). Only
        # fall back to the default when BOTH counts are absent/zero.
        if not row or (not row[0] and not row[1]):
            return 0.5  # no data, default mild boost
        http_count, socks_count = row[0] or 0, row[1] or 0
        total = http_count + socks_count
        if total == 0:
            return 0.5
        socks_ratio = float(socks_count) / total
        # Boost SOCKS sources when socks_ratio < 40%
        if socks_ratio >= 0.4:
            return 0.0
        return min((0.4 - socks_ratio) * 2.5, 1.0)  # 0.0-1.0 scale
    except Exception:
        # Best-effort heuristic: any DB problem just disables the boost.
        return 0.0


# Global reference to proxy database path (set by ProxyAPIServer.__init__)
_proxy_database = None


def claim_urls(url_db, worker_id, count=5):
    """Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.

    Uses score-based scheduling: high-yield URLs checked more often,
    stale/broken ones less.

    Score components:
    - age/interval: 1.0 when due, >1.0 when overdue
    - yield_bonus: capped at 1.0 for high-yield sources
    - quality_bonus: 0-0.5 based on working_ratio
    - error_penalty: 0-4.0 based on consecutive errors
    - stale_penalty: 0-1.5 based on unchanged fetches
    - proto_boost: 0-1.0 for SOCKS sources when SOCKS underrepresented
    """
    now = time.time()
    now_int = int(now)
    # Import here to avoid circular dependency at module level
    try:
        from fetch import detect_proto_from_path
    except ImportError:
        detect_proto_from_path = None
    # Clean expired URL claims and pending counts
    with _url_claims_lock:
        stale = [k for k, v in _url_claims.items()
                 if now - v['claimed_at'] > _url_claim_timeout]
        for k in stale:
            del _url_claims[k]
        claimed_urls = set(_url_claims.keys())
    with _url_pending_lock:
        stale_pending = [k for k, v in _url_pending_counts.items()
                         if now - v['time'] > 600]
        for k in stale_pending:
            del _url_pending_counts[k]
    list_max_age_seconds = _url_list_max_age_days * 86400
    min_added = now_int - list_max_age_seconds
    # Boost SOCKS sources when protocol pool is imbalanced
    proto_boost = _get_proto_boost()
    try:
        # Over-fetch (count * 3) so claimed/duplicate URLs can be skipped
        # while still filling the batch.
        rows = url_db.execute(
            '''SELECT url, content_hash,
                      (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
                      + MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
                      + COALESCE(working_ratio, 0) * 0.5
                      - MIN(error * 0.5, 4.0)
                      - MIN(stale_count * 0.2, 1.5)
                      + CASE WHEN LOWER(url) LIKE '%socks5%'
                                  OR LOWER(url) LIKE '%socks4%'
                             THEN ? ELSE 0 END AS score
               FROM uris
               WHERE error < ?
                 AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) >= 0.8
                 AND (added > ? OR proxies_added > 0)
               ORDER BY score DESC LIMIT ?''',
            (now_int, proto_boost, _url_max_fail, now_int, min_added, count * 3)
        ).fetchall()
    except Exception as e:
        _log('claim_urls query error: %s' % e, 'error')
        return []
    # Filter out already-claimed URLs and lock new claims
    claimed = []
    with _url_claims_lock:
        for row in rows:
            url = row[0]
            if url in _url_claims or url in claimed_urls:
                continue
            _url_claims[url] = {'worker_id': worker_id, 'claimed_at': now}
            proto_hint = None
            if detect_proto_from_path:
                proto_hint = detect_proto_from_path(url)
            claimed.append({
                'url': url,
                'last_hash': row[1],
                'proto_hint': proto_hint,
            })
            if len(claimed) >= count:
                break
    if claimed:
        _log('claim_urls: %d URLs to %s' % (len(claimed), worker_id[:8]), 'info')
    return claimed


def submit_url_reports(url_db, worker_id, reports):
    """Process URL fetch feedback from workers. Returns count of processed reports.

    Updates EMA metrics per URL:
    - avg_fetch_time: exponential moving average of fetch latency
    - check_interval: adaptive interval (shrinks for productive URLs, grows for stale)
    - yield_rate: EMA of proxy count per fetch (on changed content)
    - last_worker: worker that last fetched this URL

    Stores pending proxy count for working_ratio correlation in
    submit_proxy_reports.
    """
    processed = 0
    now_int = int(time.time())
    alpha = 0.3  # EMA smoothing factor
    for r in reports:
        url = r.get('url', '')
        if not url:
            continue
        # Release URL claim
        with _url_claims_lock:
            if url in _url_claims:
                del _url_claims[url]
        try:
            success = r.get('success', False)
            content_hash = r.get('content_hash')
            proxy_count = r.get('proxy_count', 0)
            changed = r.get('changed', False)
            fetch_time_ms = r.get('fetch_time_ms', 0)
            # Fetch current row for EMA computation
            row = url_db.execute(
                '''SELECT check_interval, avg_fetch_time, yield_rate
                   FROM uris WHERE url = ?''', (url,)
            ).fetchone()
            if not row:
                processed += 1
                continue
            old_interval = row[0] if row[0] is not None else 3600
            old_fetch_time = row[1] if row[1] is not None else 0
            old_yield = row[2] if row[2] is not None else 0.0
            # EMA: avg_fetch_time (seeded with first sample when no history)
            if old_fetch_time > 0 and fetch_time_ms > 0:
                new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time)
            elif fetch_time_ms > 0:
                new_fetch_time = fetch_time_ms
            else:
                new_fetch_time = old_fetch_time
            if success:
                if changed and proxy_count > 0:
                    # Success + changed + proxies: converge interval toward 15min
                    new_interval = max(900, int(old_interval * 0.9))
                    # EMA: yield_rate
                    new_yield = alpha * proxy_count + (1 - alpha) * old_yield
                    url_db.execute(
                        '''UPDATE uris SET check_time = ?, retrievals = retrievals + 1,
                                           error = 0, stale_count = 0, content_hash = ?,
                                           proxies_added = proxies_added + ?,
                                           check_interval = ?, avg_fetch_time = ?,
                                           yield_rate = ?, last_worker = ?
                           WHERE url = ?''',
                        (now_int, content_hash, proxy_count, new_interval,
                         new_fetch_time, new_yield, worker_id, url)
                    )
                    # Store pending count for working_ratio correlation
                    with _url_pending_lock:
                        _url_pending_counts[url] = {
                            'total': proxy_count,
                            'worker_id': worker_id,
                            'time': time.time(),
                        }
                else:
                    # Success + unchanged (or no proxies): drift interval toward 24h
                    new_interval = min(86400, int(old_interval * 1.25))
                    url_db.execute(
                        '''UPDATE uris SET check_time = ?, retrievals = retrievals + 1,
                                           error = 0, stale_count = stale_count + 1,
                                           content_hash = ?, check_interval = ?,
                                           avg_fetch_time = ?, last_worker = ?
                           WHERE url = ?''',
                        (now_int, content_hash, new_interval, new_fetch_time,
                         worker_id, url)
                    )
            else:
                # Failure: back off faster
                new_interval = min(86400, int(old_interval * 1.5))
                url_db.execute(
                    '''UPDATE uris SET check_time = ?, error = error + 1,
                                       check_interval = ?, avg_fetch_time = ?,
                                       last_worker = ?
                       WHERE url = ?''',
                    (now_int, new_interval, new_fetch_time, worker_id, url)
                )
            processed += 1
        except Exception as e:
            _log('submit_url_reports error for %s: %s' % (url, e), 'error')
    url_db.commit()
    return processed


def _update_url_working_ratios(url_working_counts):
    """Correlate working proxy counts with pending totals to update working_ratio.

    Called after submit_proxy_reports processes all proxies.

    For each source_url with a pending entry from submit_url_reports, computes:
        ratio = working_count / pending_total
        working_ratio = alpha * ratio + (1 - alpha) * old_working_ratio
    """
    if not url_working_counts or not _url_database_path:
        return
    alpha = 0.3
    settled = []
    with _url_pending_lock:
        pending_snapshot = dict(_url_pending_counts)
    try:
        url_db = _get_url_db(_url_database_path)
        for url, working_count in url_working_counts.items():
            pending = pending_snapshot.get(url)
            if not pending or pending['total'] <= 0:
                continue
            ratio = min(float(working_count) / pending['total'], 1.0)
            row = url_db.execute(
                'SELECT working_ratio FROM uris WHERE url = ?', (url,)
            ).fetchone()
            old_ratio = row[0] if row and row[0] is not None else 0.0
            new_ratio = alpha * ratio + (1 - alpha) * old_ratio
            url_db.execute(
                'UPDATE uris SET working_ratio = ? WHERE url = ?',
                (new_ratio, url)
            )
            settled.append(url)
        url_db.commit()
    except Exception as e:
        _log('_update_url_working_ratios error: %s' % e, 'error')
    # Remove settled entries from pending
    if settled:
        with _url_pending_lock:
            for url in settled:
                _url_pending_counts.pop(url, None)
# Timestamp of the last periodic worker-registry flush by submit_proxy_reports.
# ROBUSTNESS FIX: submit_proxy_reports declares `global _last_workers_save` and
# reads it, but no initialization was visible, which would raise NameError on
# the first report batch. NOTE(review): if this is also assigned elsewhere in
# the module, this defensive default is harmless (worst case: one early save).
_last_workers_save = 0


def submit_proxy_reports(db, worker_id, proxies):
    """Process working-proxy reports from workers. Returns count of processed proxies.

    Simplified trust-based model: workers report only working proxies.
    Each proxy is upserted with failed=0, last_seen=now, latency updated.

    Also tracks per-URL working counts for working_ratio correlation.
    """
    global _last_workers_save
    processed = 0
    now_int = int(time.time())
    now = time.time()
    url_working_counts = {}  # source_url -> working count
    for p in proxies:
        ip = p.get('ip', '')
        port = p.get('port', 0)
        if not ip or not port:
            continue
        proxy_key = '%s:%s' % (ip, port)
        proto = p.get('proto', 'http')
        latency = p.get('latency', 0)
        source_url = p.get('source_url')
        checktype = p.get('checktype', '')
        target = p.get('target', '')
        try:
            # Upsert: insert new proxy or update existing as working
            db.execute('''
                INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, added,
                                       avg_latency, last_seen, success_count,
                                       consecutive_success, last_check, last_target)
                VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?, 1, 1, ?, ?)
                ON CONFLICT(proxy) DO UPDATE SET
                    failed = 0,
                    tested = excluded.tested,
                    proto = excluded.proto,
                    avg_latency = excluded.avg_latency,
                    last_seen = excluded.last_seen,
                    success_count = COALESCE(success_count, 0) + 1,
                    consecutive_success = COALESCE(consecutive_success, 0) + 1,
                    last_check = excluded.last_check,
                    last_target = excluded.last_target
                ''', (proxy_key, ip, port, proto, now_int, now_int, latency,
                      now_int, checktype, target))
            # Geolocate and ASN lookup (both optional/best-effort)
            if _geolite and _geodb:
                try:
                    rec = _geodb.get_all(ip)
                    if rec and rec.country_short and rec.country_short != '-':
                        db.execute(
                            'UPDATE proxylist SET country=? WHERE proxy=?',
                            (rec.country_short, proxy_key))
                except Exception:
                    pass
            asndb = _get_asndb()
            if asndb:
                try:
                    asn_result = asndb.lookup(ip)
                    if asn_result and asn_result[0]:
                        db.execute(
                            'UPDATE proxylist SET asn=? WHERE proxy=?',
                            (asn_result[0], proxy_key))
                except Exception:
                    pass
            # Track per-URL working count for working_ratio
            if source_url:
                url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1
            processed += 1
        except Exception as e:
            _log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
    # Commit database changes
    db.commit()
    # Update working_ratio for source URLs
    if url_working_counts:
        _update_url_working_ratios(url_working_counts)
    # Update worker stats
    with _workers_lock:
        if worker_id in _workers:
            w = _workers[worker_id]
            w['proxies_working'] = w.get('proxies_working', 0) + processed
            w['last_seen'] = now
    # Save workers periodically (at most once per minute)
    if now - _last_workers_save > 60:
        save_workers()
        _last_workers_save = now
    return processed


def is_localhost(ip):
    """Check if IP is localhost (127.0.0.0/8 or ::1)."""
    if not ip:
        return False
    # IPv6 localhost
    if ip == '::1':
        return True
    # IPv4 localhost (127.0.0.0/8)
    if ip.startswith('127.'):
        return True
    return False


def check_rate_limit(ip):
    """Check if IP is within rate limit. Returns True if allowed.

    Uses a sliding window: up to _rate_limit_requests requests per
    _rate_limit_window seconds per client IP. Localhost is exempt.
    """
    if is_localhost(ip):
        return True  # No rate limit for localhost
    now = time.time()
    with _rate_lock:
        # Clean old entries
        _rate_limits[ip] = [t for t in _rate_limits[ip] if now - t < _rate_limit_window]
        if len(_rate_limits[ip]) >= _rate_limit_requests:
            return False
        _rate_limits[ip].append(now)
        return True
def get_security_headers(content_type):
    """Return security headers (list of (name, value) pairs) for responses.

    HTML responses additionally get a Content-Security-Policy header.
    """
    headers = [
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'DENY'),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
    ]
    # Add CSP for HTML pages
    if 'text/html' in content_type:
        headers.append((
            'Content-Security-Policy',
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "connect-src 'self'"
        ))
    return headers


def get_system_stats():
    """Collect system resource statistics (load, CPU, memory, disk, process, GC).

    Best-effort: every probe is wrapped so the function degrades to zeros on
    platforms lacking /proc, os.getloadavg or os.statvfs instead of raising.
    """
    stats = {}
    # Load average (1, 5, 15 min)
    try:
        load = os.getloadavg()
        stats['load_1m'] = round(load[0], 2)
        stats['load_5m'] = round(load[1], 2)
        stats['load_15m'] = round(load[2], 2)
    except (OSError, AttributeError):
        stats['load_1m'] = stats['load_5m'] = stats['load_15m'] = 0
    # CPU count
    try:
        stats['cpu_count'] = os.sysconf('SC_NPROCESSORS_ONLN')
    except (ValueError, OSError, AttributeError):
        stats['cpu_count'] = 1
    # Memory from /proc/meminfo (Linux)
    try:
        with open('/proc/meminfo', 'r') as f:
            meminfo = {}
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    meminfo[parts[0].rstrip(':')] = int(parts[1]) * 1024  # KB to bytes
        total = meminfo.get('MemTotal', 0)
        available = meminfo.get('MemAvailable', meminfo.get('MemFree', 0))
        stats['mem_total'] = total
        stats['mem_available'] = available
        stats['mem_used'] = total - available
        stats['mem_pct'] = round((total - available) / total * 100, 1) if total > 0 else 0
    except (IOError, KeyError, ZeroDivisionError):
        stats['mem_total'] = stats['mem_available'] = stats['mem_used'] = 0
        stats['mem_pct'] = 0
    # Disk usage for data directory
    # ROBUSTNESS FIX: AttributeError added — os.statvfs does not exist on
    # Windows, which previously crashed this function instead of degrading.
    try:
        st = os.statvfs('data' if os.path.exists('data') else '.')
        total = st.f_blocks * st.f_frsize
        free = st.f_bavail * st.f_frsize
        used = total - free
        stats['disk_total'] = total
        stats['disk_free'] = free
        stats['disk_used'] = used
        stats['disk_pct'] = round(used / total * 100, 1) if total > 0 else 0
    except (OSError, ZeroDivisionError, AttributeError):
        stats['disk_total'] = stats['disk_free'] = stats['disk_used'] = 0
        stats['disk_pct'] = 0
    # Process stats from /proc/self/status
    # ROBUSTNESS FIX: defaults are set up-front so a partial parse (e.g.
    # VmRSS read OK, Threads line malformed) keeps the value that was
    # successfully parsed and always leaves both keys present.
    stats['proc_rss'] = 0
    stats['proc_threads'] = 0
    try:
        with open('/proc/self/status', 'r') as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    stats['proc_rss'] = int(line.split()[1]) * 1024  # KB to bytes
                elif line.startswith('Threads:'):
                    stats['proc_threads'] = int(line.split()[1])
    except (IOError, ValueError, IndexError):
        pass
    # Simple RSS tracking for dashboard
    global _peak_rss, _start_rss
    rss = stats.get('proc_rss', 0)
    if rss > 0:
        if _start_rss == 0:
            _start_rss = rss
        if rss > _peak_rss:
            _peak_rss = rss
    stats['proc_rss_peak'] = _peak_rss
    stats['proc_rss_start'] = _start_rss
    stats['proc_rss_growth'] = rss - _start_rss if _start_rss > 0 else 0
    # GC generation counts (fast - no object iteration)
    try:
        gc_counts = gc.get_count()
        stats['gc_count_gen0'] = gc_counts[0]
        stats['gc_count_gen1'] = gc_counts[1]
        stats['gc_count_gen2'] = gc_counts[2]
    except Exception:
        stats['gc_count_gen0'] = stats['gc_count_gen1'] = stats['gc_count_gen2'] = 0
    return stats


def get_db_health(db):
    """Get database health and statistics (cached for _db_health_ttl seconds)."""
    global _db_health_cache
    now = time.time()
    if now - _db_health_cache['time'] < _db_health_ttl:
        return _db_health_cache['value']
    stats = {}
    try:
        # Database file size
        db_path = db.path if hasattr(db, 'path') else 'data/proxies.sqlite'
        if os.path.exists(db_path):
            stats['db_size'] = os.path.getsize(db_path)
        else:
            stats['db_size'] = 0
        # Page stats from pragma
        row = db.execute('PRAGMA page_count').fetchone()
        stats['page_count'] = row[0] if row else 0
        row = db.execute('PRAGMA page_size').fetchone()
        stats['page_size'] = row[0] if row else 0
        row = db.execute('PRAGMA freelist_count').fetchone()
        stats['freelist_count'] = row[0] if row else 0
        # Anonymity breakdown
        rows = db.execute(
            'SELECT anonymity, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY anonymity'
        ).fetchall()
        stats['anonymity'] = {r[0] or 'unknown': r[1] for r in rows}
        # Latency stats
        row = db.execute(
            'SELECT AVG(avg_latency), MIN(avg_latency), MAX(avg_latency) '
            'FROM proxylist WHERE failed=0 AND avg_latency > 0'
        ).fetchone()
        if row and row[0]:
            stats['db_avg_latency'] = round(row[0], 1)
            stats['db_min_latency'] = round(row[1], 1)
            stats['db_max_latency'] = round(row[2], 1)
        else:
            stats['db_avg_latency'] = stats['db_min_latency'] = stats['db_max_latency'] = 0
        # Recent activity
        now = int(time.time())
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE tested > ?', (now - 3600,)
        ).fetchone()
        stats['tested_last_hour'] = row[0] if row else 0
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE added > ?', (now - 86400,)
        ).fetchone()
        stats['added_last_day'] = row[0] if row else 0
        # Dead proxies count (permanently dead = -1, failing = positive)
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE failed = -1'
        ).fetchone()
        stats['dead_count'] = row[0] if row else 0
        # Failing proxies count (positive fail count but not permanently dead)
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE failed > 0'
        ).fetchone()
        stats['failing_count'] = row[0] if row else 0
    except Exception as e:
        _log('get_db_health error: %s' % e, 'warn')
    # Update cache
    _db_health_cache['value'] = stats
    _db_health_cache['time'] = time.time()
    return stats


# Detect if gevent has monkey-patched the environment
try:
    from gevent import monkey
    GEVENT_PATCHED = monkey.is_module_patched('socket')
except ImportError:
    GEVENT_PATCHED = False

if GEVENT_PATCHED:
    # Imported here for use by the server setup elsewhere in this module.
    from gevent.pywsgi import WSGIServer


def load_static_libs():
    """Load static library files into cache at startup."""
    global _LIB_CACHE
    if not os.path.isdir(_STATIC_LIB_DIR):
        _log('static/lib directory not found: %s' % _STATIC_LIB_DIR, 'warn')
        return
    for fname in os.listdir(_STATIC_LIB_DIR):
        fpath = os.path.join(_STATIC_LIB_DIR, fname)
        if os.path.isfile(fpath):
            try:
                with open(fpath, 'rb') as f:
                    _LIB_CACHE[fname] = f.read()
                _log('loaded static lib: %s (%d bytes)' % (fname, len(_LIB_CACHE[fname])), 'debug')
            except IOError as e:
                _log('failed to load %s: %s' % (fname, e), 'warn')
    _log('loaded %d static library files' % len(_LIB_CACHE), 'info')


def get_static_lib(filename):
    """Get a cached static library file (bytes) or None."""
    return _LIB_CACHE.get(filename)


def load_static_files(theme):
    """Load dashboard static files into cache at startup.

    Args:
        theme: dict of color name -> color value for CSS variable substitution
    """
    global _STATIC_CACHE
    files = {
        'dashboard.html': 'static/dashboard.html',
        'map.html': 'static/map.html',
        'mitm.html': 'static/mitm.html',
        'workers.html': 'static/workers.html',
        'style.css': 'static/style.css',
        'dashboard.js': 'static/dashboard.js',
        'map.js': 'static/map.js',
        'mitm.js': 'static/mitm.js',
    }
    for key, relpath in files.items():
        fpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), relpath)
        if os.path.isfile(fpath):
            try:
                with open(fpath, 'rb') as f:
                    content = f.read()
                # Apply theme substitution to CSS ({name} placeholders)
                if key == 'style.css' and theme:
                    for name, val in theme.items():
                        content = content.replace('{' + name + '}', val)
                _STATIC_CACHE[key] = content
                _log('loaded static file: %s (%d bytes)' % (key, len(content)), 'debug')
            except IOError as e:
                _log('failed to load %s: %s' % (fpath, e), 'warn')
        else:
            _log('static file not found: %s' % fpath, 'warn')
    _log('loaded %d dashboard static files' % len(_STATIC_CACHE), 'info')


def get_static_file(filename):
    """Get a cached dashboard static file (bytes) or None."""
    return _STATIC_CACHE.get(filename)


# Theme colors - dark tiles on lighter background
THEME = {
    'bg': '#1e2738',
    'card': '#181f2a',
    'card_alt': '#212a36',
    'border': '#3a4858',
    'text': '#e8eef5',
    'dim': '#8b929b',
    'green': '#3fb950',
    'red': '#f85149',
    'yellow': '#d29922',
    'blue': '#58a6ff',
    'purple': '#a371f7',
    'cyan': '#39c5cf',
    'orange': '#db6d28',
    'pink': '#db61a2',
    'map_bg': '#1e2738',  # Match dashboard background
}
'blue': '#58a6ff', 'purple': '#a371f7', 'cyan': '#39c5cf', 'orange': '#db6d28', 'pink': '#db61a2', 'map_bg': '#1e2738', # Match dashboard background } class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): """HTTP request handler for proxy API.""" database = None stats_provider = None def log_message(self, format, *args): pass def send_response_body(self, body, content_type, status=200): self.send_response(status) self.send_header('Content-Type', content_type) self.send_header('Content-Length', len(body)) self.send_header('Cache-Control', 'no-cache') # Add security headers for header, value in get_security_headers(content_type): self.send_header(header, value) self.end_headers() self.wfile.write(body) def send_json(self, data, status=200): self.send_response_body(json.dumps(data, indent=2), 'application/json', status) def send_text(self, text, status=200): self.send_response_body(text, 'text/plain', status) def send_html(self, html, status=200): self.send_response_body(html, 'text/html; charset=utf-8', status) def send_css(self, css, status=200): self.send_response_body(css, 'text/css; charset=utf-8', status) def send_js(self, js, status=200): self.send_response_body(js, 'application/javascript; charset=utf-8', status) def send_download(self, body, content_type, filename): """Send response with Content-Disposition for download.""" self.send_response(200) self.send_header('Content-Type', content_type) self.send_header('Content-Length', len(body)) self.send_header('Content-Disposition', 'attachment; filename="%s"' % filename) for header, value in get_security_headers(content_type): self.send_header(header, value) self.end_headers() self.wfile.write(body) def do_GET(self): # Rate limiting check client_ip = self.client_address[0] if self.client_address else '' if not check_rate_limit(client_ip): self.send_response(429) self.send_header('Content-Type', 'application/json') self.send_header('Retry-After', str(_rate_limit_window)) for header, value in 
get_security_headers('application/json'): self.send_header(header, value) self.end_headers() self.wfile.write(json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window})) return path = self.path.split('?')[0] routes = { '/': self.handle_index, '/dashboard': self.handle_dashboard, '/map': self.handle_map, '/static/style.css': self.handle_css, '/static/dashboard.js': self.handle_js, '/api/stats': self.handle_stats, '/api/stats/export': self.handle_stats_export, '/api/countries': self.handle_countries, '/proxies': self.handle_proxies, '/proxies/all': self.handle_proxies_all, '/proxies/count': self.handle_count, '/health': self.handle_health, } handler = routes.get(path) if handler: handler() else: self.send_json({'error': 'not found'}, 404) def handle_index(self): self.send_json({ 'endpoints': { '/dashboard': 'web dashboard (HTML)', '/api/stats': 'runtime statistics (JSON)', '/api/stats/export': 'export stats (params: format=json|csv)', '/proxies': 'list working proxies (params: limit, proto, country, asn)', '/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)', '/proxies/count': 'count working proxies', '/health': 'health check', } }) def handle_dashboard(self): self.send_html(DASHBOARD_HTML) def handle_map(self): self.send_html(MAP_HTML) def handle_countries(self): """Return all countries with proxy counts.""" try: db = _get_db(self.database) rows = db.execute( 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY c DESC' ).fetchall() countries = {r[0]: r[1] for r in rows} self.send_json({'countries': countries}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_css(self): self.send_css(DASHBOARD_CSS) def handle_js(self): self.send_js(DASHBOARD_JS) def get_db_stats(self): """Get statistics from database.""" try: db = _get_db(self.database) stats = {} # Total counts row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT 
NULL').fetchone() stats['working'] = row[0] if row else 0 row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() stats['total'] = row[0] if row else 0 # By protocol rows = db.execute( 'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto' ).fetchall() stats['by_proto'] = {r[0] or 'unknown': r[1] for r in rows} # Top countries rows = db.execute( 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY c DESC LIMIT 10' ).fetchall() stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows] # Top ASNs rows = db.execute( 'SELECT asn, COUNT(*) as c FROM proxylist WHERE failed=0 AND asn IS NOT NULL ' 'GROUP BY asn ORDER BY c DESC LIMIT 10' ).fetchall() stats['top_asns'] = [(r[0], r[1]) for r in rows] return stats except Exception as e: return {'error': str(e)} def handle_stats(self): stats = {} # Runtime stats from provider if self.stats_provider: try: stats.update(self.stats_provider()) except Exception as e: _log('stats_provider error: %s' % str(e), 'error') # Add system stats stats['system'] = get_system_stats() # Add database stats try: db = _get_db(self.database) stats['db'] = self.get_db_stats() stats['db_health'] = get_db_health(db) except Exception as e: _log('handle_stats db error: %s' % e, 'warn') self.send_json(stats) def handle_stats_export(self): """Export stats as JSON or CSV with download header.""" params = {} if '?' 
in self.path: for pair in self.path.split('?')[1].split('&'): if '=' in pair: k, v = pair.split('=', 1) params[k] = v fmt = params.get('format', 'json').lower() timestamp = time.strftime('%Y%m%d_%H%M%S') # Gather stats stats = {} if self.stats_provider: try: stats.update(self.stats_provider()) except Exception as e: _log('stats_provider error in export: %s' % e, 'warn') stats['system'] = get_system_stats() stats['exported_at'] = time.strftime('%Y-%m-%d %H:%M:%S') if fmt == 'csv': # Flatten stats to CSV rows lines = ['key,value'] def flatten(obj, prefix=''): if isinstance(obj, dict): for k, v in sorted(obj.items()): flatten(v, '%s%s.' % (prefix, k) if prefix else '%s.' % k) elif isinstance(obj, (list, tuple)): for i, v in enumerate(obj): flatten(v, '%s%d.' % (prefix, i)) else: key = prefix.rstrip('.') val = str(obj).replace('"', '""') lines.append('"%s","%s"' % (key, val)) flatten(stats) body = '\n'.join(lines) self.send_download(body, 'text/csv', 'ppf_stats_%s.csv' % timestamp) else: body = json.dumps(stats, indent=2) self.send_download(body, 'application/json', 'ppf_stats_%s.json' % timestamp) def handle_proxies(self): params = {} if '?' in self.path: for pair in self.path.split('?')[1].split('&'): if '=' in pair: k, v = pair.split('=', 1) params[k] = v limit = min(int(params.get('limit', 100)), 1000) proto = params.get('proto', '') country = params.get('country', '') asn = params.get('asn', '') mitm_filter = params.get('mitm', '') fmt = params.get('format', 'json') sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600' args = [] if proto: sql += ' AND proto=?' args.append(proto) if country: sql += ' AND country=?' args.append(country.upper()) if asn: sql += ' AND asn=?' args.append(int(asn)) if mitm_filter == '0': sql += ' AND mitm=0' elif mitm_filter == '1': sql += ' AND mitm=1' sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?' 
args.append(limit) try: db = _get_db(self.database) rows = db.execute(sql, args).fetchall() if fmt == 'plain': self.send_text('\n'.join('%s:%s' % (r[0], r[1]) for r in rows)) else: proxies = [{ 'ip': r[0], 'port': r[1], 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5], 'protos': r[6].split(',') if r[6] else [r[2]] } for r in rows] self.send_json({'count': len(proxies), 'proxies': proxies}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_proxies_all(self): params = {} if '?' in self.path: for pair in self.path.split('?')[1].split('&'): if '=' in pair: k, v = pair.split('=', 1) params[k] = v proto = params.get('proto', '') country = params.get('country', '') asn = params.get('asn', '') mitm_filter = params.get('mitm', '') fmt = params.get('format', 'json') sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600' args = [] if proto: sql += ' AND proto=?' args.append(proto) if country: sql += ' AND country=?' args.append(country.upper()) if asn: sql += ' AND asn=?' 
args.append(int(asn)) if mitm_filter == '0': sql += ' AND mitm=0' elif mitm_filter == '1': sql += ' AND mitm=1' sql += ' ORDER BY avg_latency ASC, tested DESC' try: db = _get_db(self.database) rows = db.execute(sql, args).fetchall() if fmt == 'plain': self.send_text('\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows)) else: proxies = [{ 'ip': r[0], 'port': r[1], 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5], 'protos': r[6].split(',') if r[6] else [r[2]] } for r in rows] self.send_json({'count': len(proxies), 'proxies': proxies}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_count(self): try: db = _get_db(self.database) row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone() self.send_json({'count': row[0] if row else 0}) except Exception as e: self.send_json({'error': str(e)}, 500) def handle_health(self): self.send_json({'status': 'ok', 'version': __version__, 'timestamp': int(time.time())}) class ProxyAPIServer(threading.Thread): """Threaded HTTP API server. Uses gevent's WSGIServer when running in a gevent-patched environment, otherwise falls back to standard BaseHTTPServer. 
""" def __init__(self, host, port, database, stats_provider=None, profiling=False, url_database=None): threading.Thread.__init__(self) self.host = host self.port = port self.database = database self.url_database = url_database self.stats_provider = stats_provider self.profiling = profiling self.daemon = True global _url_database_path, _proxy_database _url_database_path = url_database _proxy_database = database self.server = None self._stop_event = threading.Event() if not GEVENT_PATCHED else None # Load static library files into cache load_static_libs() # Load dashboard static files (HTML, CSS, JS) with theme substitution load_static_files(THEME) # Load worker registry from disk load_workers() # Backfill ASN for existing proxies missing it (triggers lazy-load) if _get_asndb(): self._backfill_asn() # Create verification tables if they don't exist try: db = _get_db(self.database) create_verification_tables(db) _log('verification tables initialized', 'debug') except Exception as e: _log('failed to create verification tables: %s' % e, 'warn') def _backfill_asn(self): """One-time backfill of ASN for proxies that have ip but no ASN.""" try: db = _get_db(self.database) rows = db.execute( 'SELECT proxy, ip FROM proxylist WHERE asn IS NULL AND ip IS NOT NULL' ).fetchall() if not rows: return updated = 0 for proxy_key, ip in rows: try: result = _get_asndb().lookup(ip) if result and result[0]: db.execute('UPDATE proxylist SET asn=? 
WHERE proxy=?', (result[0], proxy_key)) updated += 1 except Exception: pass db.commit() if updated: _log('asn: backfilled %d/%d proxies' % (updated, len(rows)), 'info') except Exception as e: _log('asn backfill error: %s' % e, 'warn') def _wsgi_app(self, environ, start_response): """WSGI application wrapper for gevent.""" path = environ.get('PATH_INFO', '/').split('?')[0] query_string = environ.get('QUERY_STRING', '') method = environ.get('REQUEST_METHOD', 'GET') remote_addr = environ.get('REMOTE_ADDR', '') # Parse query parameters query_params = {} if query_string: for param in query_string.split('&'): if '=' in param: k, v = param.split('=', 1) query_params[k] = v # Only allow GET and POST if method not in ('GET', 'POST'): start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'Method not allowed'] # POST only allowed for worker API endpoints post_endpoints = ('/api/register', '/api/heartbeat', '/api/report-urls', '/api/report-proxies') if method == 'POST' and path not in post_endpoints: start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'POST not allowed for this endpoint'] # Parse POST body post_data = None if method == 'POST': try: content_length = int(environ.get('CONTENT_LENGTH', 0)) if content_length > 0: body = environ['wsgi.input'].read(content_length) post_data = json.loads(body) except Exception: error_body = json.dumps({'error': 'invalid JSON body'}) headers = [('Content-Type', 'application/json')] start_response('400 Bad Request', headers) return [error_body.encode('utf-8')] # Rate limiting check if not check_rate_limit(remote_addr): error_body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window}) headers = [ ('Content-Type', 'application/json'), ('Content-Length', str(len(error_body))), ('Retry-After', str(_rate_limit_window)), ] headers.extend(get_security_headers('application/json')) start_response('429 Too Many Requests', headers) return [error_body] # Route 
handling try: response_body, content_type, status = self._handle_route(path, remote_addr, query_params, post_data) status_line = '%d %s' % (status, 'OK' if status == 200 else 'Error') headers = [ ('Content-Type', content_type), ('Content-Length', str(len(response_body))), ('Cache-Control', 'no-cache'), ] headers.extend(get_security_headers(content_type)) start_response(status_line, headers) return [response_body.encode('utf-8') if isinstance(response_body, unicode) else response_body] except Exception as e: error_body = json.dumps({'error': str(e)}) headers = [ ('Content-Type', 'application/json'), ('Content-Length', str(len(error_body))), ] headers.extend(get_security_headers('application/json')) start_response('500 Internal Server Error', headers) return [error_body] def _handle_route(self, path, remote_addr='', query_params=None, post_data=None): """Handle route and return (body, content_type, status).""" if query_params is None: query_params = {} if path == '/': body = json.dumps({ 'endpoints': { '/dashboard': 'web dashboard (HTML)', '/map': 'proxy distribution by country (HTML)', '/mitm': 'MITM certificate search (HTML)', '/api/dashboard': 'batch endpoint: stats + workers + countries (JSON)', '/api/stats': 'runtime statistics (JSON)', '/api/mitm': 'MITM certificate statistics (JSON)', '/api/countries': 'proxy counts by country (JSON)', '/api/register': 'register as worker (POST)', '/api/workers': 'list connected workers', '/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)', '/api/report-urls': 'report URL fetch results (POST, params: key)', '/api/report-proxies': 'report working proxies (POST, params: key)', '/proxies': 'list working proxies (params: limit, proto, country, asn)', '/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)', '/proxies/count': 'count working proxies', '/health': 'health check', } }, indent=2) return body, 'application/json', 200 elif path == '/dashboard': content = 
get_static_file('dashboard.html')
            if content:
                return content, 'text/html; charset=utf-8', 200
            return '{"error": "dashboard.html not loaded"}', 'application/json', 500
        elif path == '/map':
            content = get_static_file('map.html')
            if content:
                return content, 'text/html; charset=utf-8', 200
            return '{"error": "map.html not loaded"}', 'application/json', 500
        elif path == '/mitm':
            content = get_static_file('mitm.html')
            if content:
                return content, 'text/html; charset=utf-8', 200
            return '{"error": "mitm.html not loaded"}', 'application/json', 500
        elif path == '/workers':
            content = get_static_file('workers.html')
            if content:
                return content, 'text/html; charset=utf-8', 200
            return '{"error": "workers.html not loaded"}', 'application/json', 500
        elif path == '/static/style.css':
            content = get_static_file('style.css')
            if content:
                return content, 'text/css; charset=utf-8', 200
            return '{"error": "style.css not loaded"}', 'application/json', 500
        elif path == '/static/dashboard.js':
            content = get_static_file('dashboard.js')
            if content:
                return content, 'application/javascript; charset=utf-8', 200
            return '{"error": "dashboard.js not loaded"}', 'application/json', 500
        elif path == '/static/map.js':
            content = get_static_file('map.js')
            if content:
                return content, 'application/javascript; charset=utf-8', 200
            return '{"error": "map.js not loaded"}', 'application/json', 500
        elif path == '/static/mitm.js':
            content = get_static_file('mitm.js')
            if content:
                return content, 'application/javascript; charset=utf-8', 200
            return '{"error": "mitm.js not loaded"}', 'application/json', 500
        elif path.startswith('/static/lib/'):
            # Serve static library files from cache
            filename = path.split('/')[-1]
            content = get_static_lib(filename)
            if content:
                ext = os.path.splitext(filename)[1]
                content_type = _CONTENT_TYPES.get(ext, 'application/octet-stream')
                return content, content_type, 200
            return '{"error": "not found"}', 'application/json', 404
        elif path == '/api/stats':
            stats = {}
            if self.stats_provider:
                # NOTE: unlike /api/dashboard, a stats_provider failure here
                # propagates to the caller's try/except (-> JSON 500).
                stats = self.stats_provider()
            # Add system stats
            stats['system'] = get_system_stats()
            # Add database stats
            try:
                db = _get_db(self.database)
                stats['db'] = self._get_db_stats(db)
                stats['db_health'] = get_db_health(db)
            except Exception as e:
                _log('api/stats db error: %s' % e, 'warn')
            # Add URL pipeline stats
            url_stats = self._get_url_stats()
            if url_stats is not None:
                stats['urls'] = url_stats
            # Add profiling flag (from constructor or stats_provider)
            if 'profiling' not in stats:
                stats['profiling'] = self.profiling
            return json.dumps(stats, indent=2), 'application/json', 200
        elif path == '/api/dashboard':
            # Batch endpoint combining stats + workers + countries
            # Reduces RTT for dashboard - single request instead of multiple
            result = {}
            # 1. Runtime stats (same as /api/stats)
            stats = {}
            if self.stats_provider:
                try:
                    stats = self.stats_provider()
                except Exception as e:
                    _log('api/dashboard stats_provider error: %s' % e, 'warn')
            stats['system'] = get_system_stats()
            if 'profiling' not in stats:
                stats['profiling'] = self.profiling
            result['stats'] = stats
            # 2. Database stats and health
            try:
                db = _get_db(self.database)
                result['stats']['db'] = self._get_db_stats(db)
                result['stats']['db_health'] = get_db_health(db)
                # 3. Countries (same as /api/countries)
                rows = db.execute(
                    'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                    'GROUP BY country ORDER BY c DESC'
                ).fetchall()
                result['countries'] = {r[0]: r[1] for r in rows}
                # 4. Workers (same as /api/workers)
                result['workers'] = self._get_workers_data(db)
            except Exception as e:
                # Any db failure falls back to empty countries/workers so the
                # dashboard still renders the runtime stats.
                _log('api/dashboard db error: %s' % e, 'warn')
                result['countries'] = {}
                result['workers'] = {'workers': [], 'total': 0, 'active': 0}
            return json.dumps(result, indent=2), 'application/json', 200
        elif path == '/api/mitm':
            # MITM certificate statistics
            if self.stats_provider:
                try:
                    stats = self.stats_provider()
                    mitm = stats.get('mitm', {})
                    return json.dumps(mitm, indent=2), 'application/json', 200
                except Exception as e:
                    return json.dumps({'error': str(e)}), 'application/json', 500
            return json.dumps({'error': 'stats not available'}), 'application/json', 500
        elif path == '/api/countries':
            try:
                db = _get_db(self.database)
                rows = db.execute(
                    'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                    'GROUP BY country ORDER BY c DESC'
                ).fetchall()
                countries = {r[0]: r[1] for r in rows}
                return json.dumps({'countries': countries}, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/api/locations':
            # Return proxy locations aggregated by lat/lon grid (0.5 degree cells)
            try:
                db = _get_db(self.database)
                rows = db.execute(
                    'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, '
                    'country, anonymity, COUNT(*) as c FROM proxylist '
                    'WHERE failed=0 AND latitude IS NOT NULL AND longitude IS NOT NULL '
                    'GROUP BY lat, lon, country, anonymity ORDER BY c DESC'
                ).fetchall()
                locations = [{'lat': r[0], 'lon': r[1], 'country': r[2],
                              'anon': r[3] or 'unknown', 'count': r[4]} for r in rows]
                return json.dumps({'locations': locations}, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/proxies':
            try:
                # int() conversions are inside the try: malformed params -> 500
                limit = min(int(query_params.get('limit', 100)), 1000)
                proto = query_params.get('proto', '')
                country = query_params.get('country', '')
                asn = query_params.get('asn', '')
                mitm_filter = query_params.get('mitm', '')
                fmt = query_params.get('format', 'json')
                sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working, mitm FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
                args = []
                if proto:
                    sql += ' AND proto=?'
                    args.append(proto)
                if country:
                    sql += ' AND country=?'
                    args.append(country.upper())
                if asn:
                    sql += ' AND asn=?'
                    args.append(int(asn))
                if mitm_filter == '0':
                    sql += ' AND mitm=0'
                elif mitm_filter == '1':
                    sql += ' AND mitm=1'
                sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
                args.append(limit)
                db = _get_db(self.database)
                rows = db.execute(sql, args).fetchall()
                if fmt == 'plain':
                    return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
                proxies = [{
                    'proxy': '%s:%s' % (r[0], r[1]),
                    'proto': r[2],
                    'country': r[3],
                    'asn': r[4],
                    'latency': r[5],
                    'protos': r[6].split(',') if r[6] else [r[2]],
                    'mitm': bool(r[7]),
                } for r in rows]
                return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/proxies/all':
            # Same as /proxies but without a LIMIT clause.
            try:
                proto = query_params.get('proto', '')
                country = query_params.get('country', '')
                asn = query_params.get('asn', '')
                mitm_filter = query_params.get('mitm', '')
                fmt = query_params.get('format', 'json')
                sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working, mitm FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
                args = []
                if proto:
                    sql += ' AND proto=?'
                    args.append(proto)
                if country:
                    sql += ' AND country=?'
                    args.append(country.upper())
                if asn:
                    sql += ' AND asn=?'
                    args.append(int(asn))
                if mitm_filter == '0':
                    sql += ' AND mitm=0'
                elif mitm_filter == '1':
                    sql += ' AND mitm=1'
                sql += ' ORDER BY avg_latency ASC, tested DESC'
                db = _get_db(self.database)
                rows = db.execute(sql, args).fetchall()
                if fmt == 'plain':
                    return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
                proxies = [{
                    'proxy': '%s:%s' % (r[0], r[1]),
                    'proto': r[2],
                    'country': r[3],
                    'asn': r[4],
                    'latency': r[5],
                    'protos': r[6].split(',') if r[6] else [r[2]],
                    'mitm': bool(r[7]),
                } for r in rows]
                return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/proxies/count':
            try:
                mitm_filter = query_params.get('mitm', '')
                sql = 'SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
                if mitm_filter == '0':
                    sql += ' AND mitm=0'
                elif mitm_filter == '1':
                    sql += ' AND mitm=1'
                db = _get_db(self.database)
                row = db.execute(sql).fetchone()
                return json.dumps({'count': row[0] if row else 0}), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/health':
            return json.dumps({'status': 'ok', 'version': __version__, 'timestamp': int(time.time())}), 'application/json', 200
        # Worker API endpoints
        elif path == '/api/register':
            # Register a new worker (POST)
            if not post_data:
                return json.dumps({'error': 'POST body required'}), 'application/json', 400
            name = post_data.get('name', 'worker-%s' % remote_addr)
            master_key = post_data.get('master_key', '')
            # Require master key for registration (if set)
            if _master_key and master_key != _master_key:
                return json.dumps({'error': 'invalid master key'}), 'application/json', 403
            worker_id, worker_key = register_worker(name, remote_addr)
            _log('worker registered: %s (%s) from %s' % (name, worker_id, remote_addr), 'info')
            return json.dumps({
                'worker_id': worker_id,
                'worker_key': worker_key,
                'message': 'registered successfully',
            }), 'application/json', 200
        elif path == '/api/heartbeat':
            # Worker heartbeat with Tor status (POST)
            key = query_params.get('key', '')
            if not validate_worker_key(key):
                return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
            worker_id, _ = get_worker_by_key(key)
            if not worker_id:
                return json.dumps({'error': 'worker not found'}), 'application/json', 404
            # Update worker Tor, profiling, and thread status
            tor_ok = post_data.get('tor_ok', True) if post_data else True
            tor_ip = post_data.get('tor_ip') if post_data else None
            profiling = post_data.get('profiling', False) if post_data else False
            threads = post_data.get('threads', 0) if post_data else 0
            now = time.time()
            with _workers_lock:
                # NOTE: if the worker id is no longer in _workers the update
                # silently no-ops but we still answer 200.
                if worker_id in _workers:
                    _workers[worker_id]['last_seen'] = now
                    _workers[worker_id]['tor_ok'] = tor_ok
                    _workers[worker_id]['tor_ip'] = tor_ip
                    _workers[worker_id]['tor_last_check'] = now
                    _workers[worker_id]['profiling'] = profiling
                    _workers[worker_id]['threads'] = threads
            return json.dumps({
                'worker_id': worker_id,
                'message': 'heartbeat received',
            }), 'application/json', 200
        elif path == '/api/workers':
            # List connected workers
            try:
                db = _get_db(self.database)
                workers_data = self._get_workers_data(db)
                return json.dumps(workers_data, indent=2), 'application/json', 200
            except Exception as e:
                _log('api/workers error: %s' % e, 'warn')
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/api/claim-urls':
            # Worker claims batch of URLs for fetching (GET)
            key = query_params.get('key', '')
            if not validate_worker_key(key):
                return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
            worker_id, _ = get_worker_by_key(key)
            if not worker_id:
                return json.dumps({'error': 'worker not found'}), 'application/json', 404
            if not self.url_database:
                return json.dumps({'error': 'url database not configured'}), 'application/json', 500
            # Batch size clamped to 20.
            count = min(int(query_params.get('count', 5)), 20)
            try:
                url_db = _get_url_db(self.url_database)
                urls = claim_urls(url_db, worker_id, count)
                update_worker_heartbeat(worker_id)
                return json.dumps({
                    'worker_id': worker_id,
                    'count': len(urls),
                    'urls': urls,
                }), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/api/report-urls':
            # Worker reports URL fetch results (POST)
            key = query_params.get('key', '')
            if not validate_worker_key(key):
                return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
            worker_id, _ = get_worker_by_key(key)
            if not worker_id:
                return json.dumps({'error': 'worker not found'}), 'application/json', 404
            if not self.url_database:
                return json.dumps({'error': 'url database not configured'}), 'application/json', 500
            if not post_data:
                return json.dumps({'error': 'POST body required'}), 'application/json', 400
            reports = post_data.get('reports', [])
            if not reports:
                return json.dumps({'error': 'no reports provided'}), 'application/json', 400
            try:
                url_db = _get_url_db(self.url_database)
                processed = submit_url_reports(url_db, worker_id, reports)
                update_worker_heartbeat(worker_id)
                return json.dumps({
                    'worker_id': worker_id,
                    'processed': processed,
                }), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        elif path == '/api/report-proxies':
            # Worker reports working proxies (POST)
            key = query_params.get('key', '')
            if not validate_worker_key(key):
                return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
            worker_id, _ = get_worker_by_key(key)
            if not worker_id:
                return json.dumps({'error': 'worker not found'}), 'application/json', 404
            if not post_data:
                return json.dumps({'error': 'POST body required'}), 'application/json', 400
            proxies = post_data.get('proxies', [])
            if not proxies:
                return json.dumps({'error': 'no proxies provided'}), 'application/json', 400
            try:
                db = _get_db(self.database)
                processed = submit_proxy_reports(db, worker_id, proxies)
                update_worker_heartbeat(worker_id)
                return json.dumps({
                    'worker_id': worker_id,
                    'processed': processed,
                }), 'application/json', 200
            except Exception as e:
                _log('report-proxies error from %s: %s' % (worker_id, e), 'error')
                return json.dumps({'error': str(e)}), 'application/json', 500
        else:
            return json.dumps({'error': 'not found'}), 'application/json', 404

    def _get_db_stats(self, db):
        """Get database statistics.

        Returns a dict (possibly partial if a query fails; errors are logged,
        never raised).
        """
        stats = {}
        try:
            # By protocol
            rows = db.execute(
                'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto'
            ).fetchall()
            stats['by_proto'] = {r[0]: r[1] for r in rows if r[0]}
            # Top countries
            rows = db.execute(
                'SELECT country, COUNT(*) as cnt FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                'GROUP BY country ORDER BY cnt DESC LIMIT 10'
            ).fetchall()
            stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows]
            # Total counts
            row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
            stats['working'] = row[0] if row else 0
            row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
            stats['total'] = row[0] if row else 0
            # Proxies due for testing (eligible for worker claims)
            due_condition, due_params = _build_due_condition()
            row = db.execute(
                'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition, due_params).fetchone()
            due_total = row[0] if row else 0
            stats['due'] = due_total
            stats['claimed'] = 0
        except Exception as e:
            _log('_get_db_stats error: %s' % e, 'warn')
        return stats

    def _get_url_stats(self):
        """Get URL pipeline statistics from the websites database.

        Returns a stats dict, or None when no url database is configured or
        a query fails.
        """
        if not self.url_database:
            return None
        try:
            db = _get_url_db(self.url_database)
            stats = {}
            now = int(time.time())
            # Total URLs and health breakdown
            row = db.execute('SELECT COUNT(*) FROM uris').fetchone()
            stats['total'] = row[0] if row else 0
            row = db.execute('SELECT COUNT(*) FROM uris WHERE error >= 10').fetchone()
            stats['dead'] = row[0] if row else 0
            row = db.execute('SELECT COUNT(*) FROM uris WHERE error > 0 AND error < 10').fetchone()
            stats['erroring'] = row[0] if row else 0
            row = db.execute('SELECT COUNT(*) FROM uris WHERE error = 0').fetchone()
            stats['healthy'] = row[0] if row else 0
            # Recently active (fetched in last hour)
            row = db.execute(
                'SELECT COUNT(*) FROM uris WHERE check_time >= ?', (now - 3600,)).fetchone()
            stats['fetched_last_hour'] = row[0] if row else 0
            # Productive sources (have produced working proxies)
            row = db.execute(
                'SELECT COUNT(*) FROM uris WHERE working_ratio > 0'
            ).fetchone()
            stats['productive'] = row[0] if row else 0
            # Aggregate yield
            row = db.execute(
                'SELECT SUM(proxies_added), SUM(retrievals) FROM uris'
            ).fetchone()
            # Precedence note: parses as (row[0] or 0) if row else 0 -- SUM()
            # yields NULL/None on an empty table, hence the "or 0".
            stats['total_proxies_extracted'] = row[0] or 0 if row else 0
            stats['total_fetches'] = row[1] or 0 if row else 0
            # Currently claimed
            with _url_claims_lock:
                stats['claimed'] = len(_url_claims)
            # Top sources by working_ratio (productive URLs only)
            rows = db.execute(
                'SELECT url, working_ratio, yield_rate, proxies_added, retrievals '
                'FROM uris WHERE working_ratio > 0 AND retrievals > 0 '
                'ORDER BY working_ratio DESC LIMIT 10'
            ).fetchall()
            stats['top_sources'] = [{
                'url': r[0],
                'working_ratio': round(r[1], 3),
                'yield_rate': round(r[2], 1),
                'proxies_added': r[3],
                'fetches': r[4],
            } for r in rows]
            return stats
        except Exception as e:
            _log('_get_url_stats error: %s' % e, 'warn')
            return None

    def _get_workers_data(self, db):
        """Get worker status data.
Used by /api/workers and /api/dashboard.""" now = time.time() with _workers_lock: workers = [] total_tested = 0 total_working = 0 total_failed = 0 for wid, info in _workers.items(): tested = info.get('proxies_tested', 0) working = info.get('proxies_working', 0) failed = info.get('proxies_failed', 0) total_latency = info.get('total_latency', 0) avg_latency = round(total_latency / working, 2) if working > 0 else 0 success_rate = round(100 * working / tested, 1) if tested > 0 else 0 total_tested += tested total_working += working total_failed += failed # Tor status with age check tor_ok = info.get('tor_ok', True) tor_ip = info.get('tor_ip') tor_last_check = info.get('tor_last_check', 0) tor_age = int(now - tor_last_check) if tor_last_check else None worker_profiling = info.get('profiling', False) worker_threads = info.get('threads', 0) # Calculate test rate for this worker test_rate = get_worker_test_rate(wid) workers.append({ 'id': wid, 'name': info['name'], 'ip': info['ip'], 'registered': int(info['registered']), 'last_seen': int(info['last_seen']), 'age': int(now - info['last_seen']), 'jobs_completed': info.get('jobs_completed', 0), 'proxies_tested': tested, 'proxies_working': working, 'proxies_failed': failed, 'success_rate': success_rate, 'avg_latency': avg_latency, 'last_batch_size': info.get('last_batch_size', 0), 'last_batch_working': info.get('last_batch_working', 0), 'active': (now - info['last_seen']) < 120, 'test_rate': round(test_rate, 2), 'tor_ok': tor_ok, 'tor_ip': tor_ip, 'tor_age': tor_age, 'profiling': worker_profiling, 'threads': worker_threads, }) # Sort by name for consistent display workers.sort(key=lambda w: w['name']) # Get queue status from database queue_stats = { 'pending': 0, 'claimed': 0, 'due': 0, 'total': 0, 'untested': 0, 'session_tested': 0, 'session_pct': 0, 'session_start': _session_start_time } try: # Total proxies in database row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() queue_stats['total'] = row[0] if row else 0 # 
Never tested (tested IS NULL) row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested IS NULL').fetchone() queue_stats['untested'] = row[0] if row else 0 # Tested this session (since httpd started) row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested >= ?', (_session_start_time,)).fetchone() queue_stats['session_tested'] = row[0] if row else 0 # Pending = total eligible (not dead) row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed >= 0 AND failed < 5').fetchone() queue_stats['pending'] = row[0] if row else 0 # Session progress percentage (tested / total, capped at 100%) if queue_stats['total'] > 0: pct = 100.0 * queue_stats['session_tested'] / queue_stats['total'] queue_stats['session_pct'] = round(min(pct, 100.0), 1) queue_stats['claimed'] = 0 # Due = ready for testing (respecting cooldown) due_condition, due_params = _build_due_condition() row = db.execute( 'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition, due_params).fetchone() queue_stats['due'] = row[0] if row else 0 except Exception as e: _log('_get_workers_data queue stats error: %s' % e, 'warn') # Calculate combined test rate from active workers combined_rate = sum(w['test_rate'] for w in workers if w['active']) # Get manager's own stats (local proxy testing) manager_stats = None if self.stats_provider: try: stats = self.stats_provider() threads = stats.get('threads', 0) if threads > 0: # Manager has local testing enabled manager_stats = { 'threads': threads, 'tested': stats.get('tested', 0), 'passed': stats.get('passed', 0), 'rate': round(stats.get('recent_rate', 0), 2), 'success_rate': round(stats.get('success_rate', 0), 1), 'uptime': stats.get('uptime_seconds', 0), 'queue_size': stats.get('queue_size', 0), } except Exception as e: _log('_get_workers_data manager stats error: %s' % e, 'warn') # Get verification stats and worker trust verification_stats = {'queue_size': 0, 'queue_by_trigger': {}} worker_trust_data = {} try: verification_stats = get_verification_stats(db) 
worker_trust_data = get_all_worker_trust(db) except Exception as e: _log('_get_workers_data verification stats error: %s' % e, 'warn') # Add trust scores to workers for w in workers: wid = w['id'] if wid in worker_trust_data: trust = worker_trust_data[wid] w['trust_score'] = round(trust['trust_score'], 2) w['verifications'] = trust['verifications'] w['trust_correct'] = trust['correct'] w['trust_incorrect'] = trust['incorrect'] else: w['trust_score'] = 1.0 # Default for new workers w['verifications'] = 0 w['trust_correct'] = 0 w['trust_incorrect'] = 0 return { 'workers': workers, 'total': len(workers), 'active': sum(1 for w in workers if w['active']), 'manager': manager_stats, 'summary': { 'total_tested': total_tested, 'total_working': total_working, 'total_failed': total_failed, 'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0, 'combined_rate': round(combined_rate, 2), }, 'queue': queue_stats, 'verification': verification_stats, } def run(self): ProxyAPIHandler.database = self.database ProxyAPIHandler.stats_provider = self.stats_provider if GEVENT_PATCHED: # Use gevent's WSGIServer for proper async handling self.server = WSGIServer((self.host, self.port), self._wsgi_app, log=None) _log('httpd listening on %s:%d (gevent)' % (self.host, self.port), 'info') self.server.serve_forever() else: # Standard BaseHTTPServer for non-gevent environments self.server = BaseHTTPServer.HTTPServer((self.host, self.port), ProxyAPIHandler) _log('httpd listening on %s:%d' % (self.host, self.port), 'info') self.server.serve_forever() def stop(self): if self.server: if GEVENT_PATCHED: self.server.stop() else: self.server.shutdown() if __name__ == '__main__': import sys host = '127.0.0.1' port = 8081 database = 'data/proxies.sqlite' if len(sys.argv) > 1: database = sys.argv[1] _log('starting test server on %s:%d (db: %s)' % (host, port, database), 'info') server = ProxyAPIServer(host, port, database) server.start() try: while True: 
time.sleep(1) except KeyboardInterrupt: server.stop() _log('server stopped', 'info')