diff --git a/httpd.py b/httpd.py index ece8f91..70ebc4d 100644 --- a/httpd.py +++ b/httpd.py @@ -31,55 +31,67 @@ except (ImportError, IOError, ValueError): _geodb = None _geolite = False -# ASN lookup (optional) - try pyasn first, fall back to pure-Python reader +# ASN lookup (optional, lazy-loaded on first use) +# Defers ~3.6s startup cost of parsing ipasn.dat until first ASN lookup. _asndb = None +_asndb_loaded = False _asn_dat_path = os.path.join("data", "ipasn.dat") -try: - import pyasn - _asndb = pyasn.pyasn(_asn_dat_path) -except (ImportError, IOError): - pass -if _asndb is None and os.path.exists(_asn_dat_path): - import socket - import struct - import bisect +import socket +import struct +import bisect - class _AsnLookup(object): - """Pure-Python ASN lookup using ipasn.dat (CIDR/ASN text format).""" - def __init__(self, path): - self._entries = [] - with open(path) as f: - for line in f: - line = line.strip() - if not line or line.startswith(';'): - continue - parts = line.split('\t') - if len(parts) != 2: - continue - cidr, asn = parts - ip, prefix = cidr.split('/') - start = struct.unpack('!I', socket.inet_aton(ip))[0] - self._entries.append((start, int(prefix), int(asn))) - self._entries.sort() - _log('asn: loaded %d prefixes (pure-python)' % len(self._entries), 'info') +class _AsnLookup(object): + """Pure-Python ASN lookup using ipasn.dat (CIDR/ASN text format).""" - def lookup(self, ip): - ip_int = struct.unpack('!I', socket.inet_aton(ip))[0] - idx = bisect.bisect_right(self._entries, (ip_int, 33, 0)) - 1 - if idx < 0: - return (None, None) - start, prefix_len, asn = self._entries[idx] - mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF - if (ip_int & mask) == (start & mask): - return (asn, None) + def __init__(self, path): + self._entries = [] + with open(path) as f: + for line in f: + line = line.strip() + if not line or line.startswith(';'): + continue + parts = line.split('\t') + if len(parts) != 2: + continue + cidr, asn = parts + ip, prefix = cidr.split('/') + start = struct.unpack('!I', socket.inet_aton(ip))[0] + self._entries.append((start, int(prefix), int(asn))) + self._entries.sort() + _log('asn: loaded %d prefixes (pure-python)' % len(self._entries), 'info') + + def lookup(self, ip): + ip_int = struct.unpack('!I', socket.inet_aton(ip))[0] + idx = bisect.bisect_right(self._entries, (ip_int, 33, 0)) - 1 + if idx < 0: return (None, None) + start, prefix_len, asn = self._entries[idx] + mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF + if (ip_int & mask) == (start & mask): + return (asn, None) + return (None, None) + +def _get_asndb(): + """Lazy-load ASN database on first call. Returns db instance or None.""" + global _asndb, _asndb_loaded + if _asndb_loaded: + return _asndb + _asndb_loaded = True try: - _asndb = _AsnLookup(_asn_dat_path) - except Exception as e: - _log('asn: failed to load %s: %s' % (_asn_dat_path, e), 'warn') + import pyasn + _asndb = pyasn.pyasn(_asn_dat_path) + return _asndb + except (ImportError, IOError): + pass + if os.path.exists(_asn_dat_path): + try: + _asndb = _AsnLookup(_asn_dat_path) + except Exception as e: + _log('asn: failed to load %s: %s' % (_asn_dat_path, e), 'warn') + return _asndb # Rate limiting configuration _rate_limits = defaultdict(list) @@ -157,6 +169,30 @@ _fail_retry_interval = 60 # retry interval for failing proxies _fail_retry_backoff = True # True=linear backoff (60,120,180...), False=fixed (60,60,60...) _max_fail = 5 # failures before proxy considered dead +# Per-greenlet (or per-thread) SQLite connection cache +# Under gevent, threading.local() is monkey-patched to greenlet-local storage. +# Connections are reused across requests handled by the same greenlet, eliminating +# redundant sqlite3.connect() + PRAGMA calls (~0.5ms each, ~2.7k/session on odin). +_local = threading.local() + + +def _get_db(path): + """Get a cached SQLite connection for the proxy database.""" + db = getattr(_local, 'proxy_db', None) + if db is None: + db = mysqlite.mysqlite(path, str) + _local.proxy_db = db + return db + + +def _get_url_db(path): + """Get a cached SQLite connection for the URL database.""" + db = getattr(_local, 'url_db', None) + if db is None: + db = mysqlite.mysqlite(path, str) + _local.url_db = db + return db + def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backoff, max_fail): """Set testing schedule parameters from config.""" @@ -357,7 +393,9 @@ def _get_proto_boost(): are underrepresented relative to HTTP. Returns 0.0 when balanced. """ try: - db = mysqlite.mysqlite(_proxy_database, str) if _proxy_database else None + if not _proxy_database: + return 0.0 + db = _get_db(_proxy_database) if not db: return 0.0 row = db.execute( @@ -429,8 +467,8 @@ def claim_urls(url_db, worker_id, count=5): (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) + MIN(COALESCE(yield_rate, 0) / 100.0, 1.0) + COALESCE(working_ratio, 0) * 0.5 - - MIN(error * 0.3, 2.0) - - MIN(stale_count * 0.1, 1.0) + - MIN(error * 0.5, 4.0) + - MIN(stale_count * 0.2, 1.5) + CASE WHEN LOWER(url) LIKE '%socks5%' OR LOWER(url) LIKE '%socks4%' THEN ? ELSE 0 END AS score @@ -616,7 +654,7 @@ def _update_url_working_ratios(url_working_counts): pending_snapshot = dict(_url_pending_counts) try: - url_db = mysqlite.mysqlite(_url_database_path, str) + url_db = _get_url_db(_url_database_path) for url, working_count in url_working_counts.items(): pending = pending_snapshot.get(url) if not pending or pending['total'] <= 0: @@ -637,7 +675,6 @@ def _update_url_working_ratios(url_working_counts): settled.append(url) url_db.commit() - url_db.close() except Exception as e: _log('_update_url_working_ratios error: %s' % e, 'error') @@ -704,9 +741,10 @@ def submit_proxy_reports(db, worker_id, proxies): (rec.country_short, proxy_key)) except Exception: pass - if _asndb: + asndb = _get_asndb() + if asndb: try: - asn_result = _asndb.lookup(ip) + asn_result = asndb.lookup(ip) if asn_result and asn_result[0]: db.execute( 'UPDATE proxylist SET asn=? WHERE proxy=?', @@ -1149,7 +1187,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): def handle_countries(self): """Return all countries with proxy counts.""" try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute( 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY c DESC' @@ -1168,7 +1206,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): def get_db_stats(self): """Get statistics from database.""" try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) stats = {} # Total counts @@ -1216,7 +1254,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): # Add database stats try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) stats['db'] = self.get_db_stats() stats['db_health'] = get_db_health(db) except Exception as e: @@ -1303,7 +1341,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): args.append(limit) try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute(sql, args).fetchall() if fmt == 'plain': @@ -1352,7 +1390,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): sql += ' ORDER BY avg_latency ASC, tested DESC' try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute(sql, args).fetchall() if fmt == 'plain': @@ -1369,7 +1407,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): def handle_count(self): try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone() self.send_json({'count': row[0] if row else 0}) except Exception as e: @@ -1406,12 +1444,12 @@ class ProxyAPIServer(threading.Thread): load_static_files(THEME) # Load worker registry from disk load_workers() - # Backfill ASN for existing proxies missing it - if _asndb: + # Backfill ASN for existing proxies missing it (triggers lazy-load) + if _get_asndb(): self._backfill_asn() # Create verification tables if they don't exist try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) create_verification_tables(db) _log('verification tables initialized', 'debug') except Exception as e: @@ -1420,7 +1458,7 @@ class ProxyAPIServer(threading.Thread): def _backfill_asn(self): """One-time backfill of ASN for proxies that have ip but no ASN.""" try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute( 'SELECT proxy, ip FROM proxylist WHERE asn IS NULL AND ip IS NOT NULL' ).fetchall() @@ -1429,7 +1467,7 @@ class ProxyAPIServer(threading.Thread): updated = 0 for proxy_key, ip in rows: try: - result = _asndb.lookup(ip) + result = _get_asndb().lookup(ip) if result and result[0]: db.execute('UPDATE proxylist SET asn=? WHERE proxy=?', (result[0], proxy_key)) @@ -1600,7 +1638,7 @@ class ProxyAPIServer(threading.Thread): stats['system'] = get_system_stats() # Add database stats try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) stats['db'] = self._get_db_stats(db) stats['db_health'] = get_db_health(db) except Exception as e: @@ -1633,7 +1671,7 @@ class ProxyAPIServer(threading.Thread): # 2. Database stats and health try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) result['stats']['db'] = self._get_db_stats(db) result['stats']['db_health'] = get_db_health(db) @@ -1646,8 +1684,6 @@ class ProxyAPIServer(threading.Thread): # 4. Workers (same as /api/workers) result['workers'] = self._get_workers_data(db) - - db.close() except Exception as e: _log('api/dashboard db error: %s' % e, 'warn') result['countries'] = {} @@ -1666,7 +1702,7 @@ class ProxyAPIServer(threading.Thread): return json.dumps({'error': 'stats not available'}), 'application/json', 500 elif path == '/api/countries': try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute( 'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL ' 'GROUP BY country ORDER BY c DESC' @@ -1678,7 +1714,7 @@ class ProxyAPIServer(threading.Thread): elif path == '/api/locations': # Return proxy locations aggregated by lat/lon grid (0.5 degree cells) try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute( 'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, ' 'country, anonymity, COUNT(*) as c FROM proxylist ' @@ -1716,7 +1752,7 @@ class ProxyAPIServer(threading.Thread): sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?' args.append(limit) - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute(sql, args).fetchall() if fmt == 'plain': @@ -1755,7 +1791,7 @@ class ProxyAPIServer(threading.Thread): sql += ' AND mitm=1' sql += ' ORDER BY avg_latency ASC, tested DESC' - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) rows = db.execute(sql, args).fetchall() if fmt == 'plain': @@ -1777,7 +1813,7 @@ class ProxyAPIServer(threading.Thread): sql += ' AND mitm=0' elif mitm_filter == '1': sql += ' AND mitm=1' - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) row = db.execute(sql).fetchone() return json.dumps({'count': row[0] if row else 0}), 'application/json', 200 except Exception as e: @@ -1833,9 +1869,8 @@ class ProxyAPIServer(threading.Thread): elif path == '/api/workers': # List connected workers try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) workers_data = self._get_workers_data(db) - db.close() return json.dumps(workers_data, indent=2), 'application/json', 200 except Exception as e: _log('api/workers error: %s' % e, 'warn') @@ -1853,7 +1888,7 @@ class ProxyAPIServer(threading.Thread): return json.dumps({'error': 'url database not configured'}), 'application/json', 500 count = min(int(query_params.get('count', 5)), 20) try: - url_db = mysqlite.mysqlite(self.url_database, str) + url_db = _get_url_db(self.url_database) urls = claim_urls(url_db, worker_id, count) update_worker_heartbeat(worker_id) return json.dumps({ @@ -1880,7 +1915,7 @@ class ProxyAPIServer(threading.Thread): if not reports: return json.dumps({'error': 'no reports provided'}), 'application/json', 400 try: - url_db = mysqlite.mysqlite(self.url_database, str) + url_db = _get_url_db(self.url_database) processed = submit_url_reports(url_db, worker_id, reports) update_worker_heartbeat(worker_id) return json.dumps({ @@ -1904,7 +1939,7 @@ class ProxyAPIServer(threading.Thread): if not proxies: return json.dumps({'error': 'no proxies provided'}), 'application/json', 400 try: - db = mysqlite.mysqlite(self.database, str) + db = _get_db(self.database) processed = submit_proxy_reports(db, worker_id, proxies) update_worker_heartbeat(worker_id) return json.dumps({ @@ -1958,7 +1993,7 @@ class ProxyAPIServer(threading.Thread): if not self.url_database: return None try: - db = mysqlite.mysqlite(self.url_database, str) + db = _get_url_db(self.url_database) stats = {} now = int(time.time())