httpd: cache sqlite connections per-greenlet, lazy-load ASN, sharpen URL scoring
- threading.local() caches proxy_db and url_db per greenlet (eliminates ~2.7k redundant sqlite3.connect + PRAGMA calls per session on odin)
- ASN database is now lazy-loaded on first lookup (defers ~3.6s startup cost)
- URL claim error penalty increased from 0.3*error (cap 2) to 0.5*error (cap 4) and stale penalty from 0.1*stale (cap 1) to 0.2*stale (cap 1.5) to reduce worker cycles wasted on erroring URLs (71% of 7,158 URLs erroring)
This commit is contained in:
181
httpd.py
181
httpd.py
@@ -31,55 +31,67 @@ except (ImportError, IOError, ValueError):
|
||||
_geodb = None
|
||||
_geolite = False
|
||||
|
||||
# ASN lookup (optional) - try pyasn first, fall back to pure-Python reader
|
||||
# ASN lookup (optional, lazy-loaded on first use)
|
||||
# Defers ~3.6s startup cost of parsing ipasn.dat until first ASN lookup.
|
||||
_asndb = None
|
||||
_asndb_loaded = False
|
||||
_asn_dat_path = os.path.join("data", "ipasn.dat")
|
||||
try:
|
||||
import pyasn
|
||||
_asndb = pyasn.pyasn(_asn_dat_path)
|
||||
except (ImportError, IOError):
|
||||
pass
|
||||
|
||||
if _asndb is None and os.path.exists(_asn_dat_path):
|
||||
import socket
|
||||
import struct
|
||||
import bisect
|
||||
import socket
|
||||
import struct
|
||||
import bisect
|
||||
|
||||
class _AsnLookup(object):
|
||||
"""Pure-Python ASN lookup using ipasn.dat (CIDR/ASN text format)."""
|
||||
|
||||
def __init__(self, path):
|
||||
self._entries = []
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith(';'):
|
||||
continue
|
||||
parts = line.split('\t')
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
cidr, asn = parts
|
||||
ip, prefix = cidr.split('/')
|
||||
start = struct.unpack('!I', socket.inet_aton(ip))[0]
|
||||
self._entries.append((start, int(prefix), int(asn)))
|
||||
self._entries.sort()
|
||||
_log('asn: loaded %d prefixes (pure-python)' % len(self._entries), 'info')
|
||||
class _AsnLookup(object):
|
||||
"""Pure-Python ASN lookup using ipasn.dat (CIDR/ASN text format)."""
|
||||
|
||||
def lookup(self, ip):
|
||||
ip_int = struct.unpack('!I', socket.inet_aton(ip))[0]
|
||||
idx = bisect.bisect_right(self._entries, (ip_int, 33, 0)) - 1
|
||||
if idx < 0:
|
||||
return (None, None)
|
||||
start, prefix_len, asn = self._entries[idx]
|
||||
mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
|
||||
if (ip_int & mask) == (start & mask):
|
||||
return (asn, None)
|
||||
def __init__(self, path):
|
||||
self._entries = []
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith(';'):
|
||||
continue
|
||||
parts = line.split('\t')
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
cidr, asn = parts
|
||||
ip, prefix = cidr.split('/')
|
||||
start = struct.unpack('!I', socket.inet_aton(ip))[0]
|
||||
self._entries.append((start, int(prefix), int(asn)))
|
||||
self._entries.sort()
|
||||
_log('asn: loaded %d prefixes (pure-python)' % len(self._entries), 'info')
|
||||
|
||||
def lookup(self, ip):
|
||||
ip_int = struct.unpack('!I', socket.inet_aton(ip))[0]
|
||||
idx = bisect.bisect_right(self._entries, (ip_int, 33, 0)) - 1
|
||||
if idx < 0:
|
||||
return (None, None)
|
||||
start, prefix_len, asn = self._entries[idx]
|
||||
mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
|
||||
if (ip_int & mask) == (start & mask):
|
||||
return (asn, None)
|
||||
return (None, None)
|
||||
|
||||
|
||||
def _get_asndb():
    """Return the ASN database, loading it on the first call.

    The first call tries ``pyasn`` and, if that import or load fails with
    ImportError/IOError, falls back to the pure-Python ``_AsnLookup`` reader
    when the data file exists. The outcome (including ``None``) is remembered
    in the module globals, so later calls never retry the load.

    Returns:
        The loaded database object, or None if nothing could be loaded.
    """
    global _asndb, _asndb_loaded
    if _asndb_loaded:
        return _asndb
    # Mark as attempted up front so a failed load is not repeated.
    _asndb_loaded = True

    try:
        import pyasn
        _asndb = pyasn.pyasn(_asn_dat_path)
    except (ImportError, IOError):
        pass
    else:
        return _asndb

    if not os.path.exists(_asn_dat_path):
        return _asndb

    try:
        _asndb = _AsnLookup(_asn_dat_path)
    except Exception as err:
        _log('asn: failed to load %s: %s' % (_asn_dat_path, err), 'warn')
    return _asndb
|
||||
|
||||
# Rate limiting configuration
|
||||
_rate_limits = defaultdict(list)
|
||||
@@ -157,6 +169,30 @@ _fail_retry_interval = 60 # retry interval for failing proxies
|
||||
_fail_retry_backoff = True # True=linear backoff (60,120,180...), False=fixed (60,60,60...)
|
||||
_max_fail = 5 # failures before proxy considered dead
|
||||
|
||||
# Per-greenlet (or per-thread) SQLite connection cache
|
||||
# Under gevent, threading.local() is monkey-patched to greenlet-local storage.
|
||||
# Connections are reused across requests handled by the same greenlet, eliminating
|
||||
# redundant sqlite3.connect() + PRAGMA calls (~0.5ms each, ~2.7k/session on odin).
|
||||
_local = threading.local()
|
||||
|
||||
|
||||
def _get_db(path):
    """Get a cached SQLite connection for the proxy database.

    The connection is stored on the module-level ``_local`` object and handed
    back on later calls. The path it was opened for is remembered alongside
    it, so a call with a different *path* opens a new connection instead of
    returning one that points at another file.

    Args:
        path: filesystem path of the proxy database.

    Returns:
        The object returned by ``mysqlite.mysqlite(path, str)``.
    """
    db = getattr(_local, 'proxy_db', None)
    # A cached connection with no recorded path is trusted as-is.
    if db is None or getattr(_local, 'proxy_db_path', path) != path:
        # The previous connection (if any) is not closed here: a caller may
        # still be holding a reference to it.
        db = mysqlite.mysqlite(path, str)
        _local.proxy_db = db
    _local.proxy_db_path = path
    return db
|
||||
|
||||
|
||||
def _get_url_db(path):
    """Get a cached SQLite connection for the URL database.

    The connection is stored on the module-level ``_local`` object and handed
    back on later calls. The path it was opened for is remembered alongside
    it, so a call with a different *path* opens a new connection instead of
    returning one that points at another file.

    Args:
        path: filesystem path of the URL database.

    Returns:
        The object returned by ``mysqlite.mysqlite(path, str)``.
    """
    db = getattr(_local, 'url_db', None)
    # A cached connection with no recorded path is trusted as-is.
    if db is None or getattr(_local, 'url_db_path', path) != path:
        # The previous connection (if any) is not closed here: a caller may
        # still be holding a reference to it.
        db = mysqlite.mysqlite(path, str)
        _local.url_db = db
    _local.url_db_path = path
    return db
|
||||
|
||||
|
||||
def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backoff, max_fail):
|
||||
"""Set testing schedule parameters from config."""
|
||||
@@ -357,7 +393,9 @@ def _get_proto_boost():
|
||||
are underrepresented relative to HTTP. Returns 0.0 when balanced.
|
||||
"""
|
||||
try:
|
||||
db = mysqlite.mysqlite(_proxy_database, str) if _proxy_database else None
|
||||
if not _proxy_database:
|
||||
return 0.0
|
||||
db = _get_db(_proxy_database)
|
||||
if not db:
|
||||
return 0.0
|
||||
row = db.execute(
|
||||
@@ -429,8 +467,8 @@ def claim_urls(url_db, worker_id, count=5):
|
||||
(? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
|
||||
+ MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
|
||||
+ COALESCE(working_ratio, 0) * 0.5
|
||||
- MIN(error * 0.3, 2.0)
|
||||
- MIN(stale_count * 0.1, 1.0)
|
||||
- MIN(error * 0.5, 4.0)
|
||||
- MIN(stale_count * 0.2, 1.5)
|
||||
+ CASE WHEN LOWER(url) LIKE '%socks5%' OR LOWER(url) LIKE '%socks4%'
|
||||
THEN ? ELSE 0 END
|
||||
AS score
|
||||
@@ -616,7 +654,7 @@ def _update_url_working_ratios(url_working_counts):
|
||||
pending_snapshot = dict(_url_pending_counts)
|
||||
|
||||
try:
|
||||
url_db = mysqlite.mysqlite(_url_database_path, str)
|
||||
url_db = _get_url_db(_url_database_path)
|
||||
for url, working_count in url_working_counts.items():
|
||||
pending = pending_snapshot.get(url)
|
||||
if not pending or pending['total'] <= 0:
|
||||
@@ -637,7 +675,6 @@ def _update_url_working_ratios(url_working_counts):
|
||||
settled.append(url)
|
||||
|
||||
url_db.commit()
|
||||
url_db.close()
|
||||
except Exception as e:
|
||||
_log('_update_url_working_ratios error: %s' % e, 'error')
|
||||
|
||||
@@ -704,9 +741,10 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
(rec.country_short, proxy_key))
|
||||
except Exception:
|
||||
pass
|
||||
if _asndb:
|
||||
asndb = _get_asndb()
|
||||
if asndb:
|
||||
try:
|
||||
asn_result = _asndb.lookup(ip)
|
||||
asn_result = asndb.lookup(ip)
|
||||
if asn_result and asn_result[0]:
|
||||
db.execute(
|
||||
'UPDATE proxylist SET asn=? WHERE proxy=?',
|
||||
@@ -1149,7 +1187,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||
def handle_countries(self):
|
||||
"""Return all countries with proxy counts."""
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(
|
||||
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
|
||||
'GROUP BY country ORDER BY c DESC'
|
||||
@@ -1168,7 +1206,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||
def get_db_stats(self):
|
||||
"""Get statistics from database."""
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
stats = {}
|
||||
|
||||
# Total counts
|
||||
@@ -1216,7 +1254,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||
|
||||
# Add database stats
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
stats['db'] = self.get_db_stats()
|
||||
stats['db_health'] = get_db_health(db)
|
||||
except Exception as e:
|
||||
@@ -1303,7 +1341,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||
args.append(limit)
|
||||
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(sql, args).fetchall()
|
||||
|
||||
if fmt == 'plain':
|
||||
@@ -1352,7 +1390,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||
sql += ' ORDER BY avg_latency ASC, tested DESC'
|
||||
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(sql, args).fetchall()
|
||||
|
||||
if fmt == 'plain':
|
||||
@@ -1369,7 +1407,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
||||
|
||||
def handle_count(self):
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
|
||||
self.send_json({'count': row[0] if row else 0})
|
||||
except Exception as e:
|
||||
@@ -1406,12 +1444,12 @@ class ProxyAPIServer(threading.Thread):
|
||||
load_static_files(THEME)
|
||||
# Load worker registry from disk
|
||||
load_workers()
|
||||
# Backfill ASN for existing proxies missing it
|
||||
if _asndb:
|
||||
# Backfill ASN for existing proxies missing it (triggers lazy-load)
|
||||
if _get_asndb():
|
||||
self._backfill_asn()
|
||||
# Create verification tables if they don't exist
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
create_verification_tables(db)
|
||||
_log('verification tables initialized', 'debug')
|
||||
except Exception as e:
|
||||
@@ -1420,7 +1458,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
def _backfill_asn(self):
|
||||
"""One-time backfill of ASN for proxies that have ip but no ASN."""
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(
|
||||
'SELECT proxy, ip FROM proxylist WHERE asn IS NULL AND ip IS NOT NULL'
|
||||
).fetchall()
|
||||
@@ -1429,7 +1467,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
updated = 0
|
||||
for proxy_key, ip in rows:
|
||||
try:
|
||||
result = _asndb.lookup(ip)
|
||||
result = _get_asndb().lookup(ip)
|
||||
if result and result[0]:
|
||||
db.execute('UPDATE proxylist SET asn=? WHERE proxy=?',
|
||||
(result[0], proxy_key))
|
||||
@@ -1600,7 +1638,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
stats['system'] = get_system_stats()
|
||||
# Add database stats
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
stats['db'] = self._get_db_stats(db)
|
||||
stats['db_health'] = get_db_health(db)
|
||||
except Exception as e:
|
||||
@@ -1633,7 +1671,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
|
||||
# 2. Database stats and health
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
result['stats']['db'] = self._get_db_stats(db)
|
||||
result['stats']['db_health'] = get_db_health(db)
|
||||
|
||||
@@ -1646,8 +1684,6 @@ class ProxyAPIServer(threading.Thread):
|
||||
|
||||
# 4. Workers (same as /api/workers)
|
||||
result['workers'] = self._get_workers_data(db)
|
||||
|
||||
db.close()
|
||||
except Exception as e:
|
||||
_log('api/dashboard db error: %s' % e, 'warn')
|
||||
result['countries'] = {}
|
||||
@@ -1666,7 +1702,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
return json.dumps({'error': 'stats not available'}), 'application/json', 500
|
||||
elif path == '/api/countries':
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(
|
||||
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
|
||||
'GROUP BY country ORDER BY c DESC'
|
||||
@@ -1678,7 +1714,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
elif path == '/api/locations':
|
||||
# Return proxy locations aggregated by lat/lon grid (0.5 degree cells)
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(
|
||||
'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, '
|
||||
'country, anonymity, COUNT(*) as c FROM proxylist '
|
||||
@@ -1716,7 +1752,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
|
||||
args.append(limit)
|
||||
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(sql, args).fetchall()
|
||||
|
||||
if fmt == 'plain':
|
||||
@@ -1755,7 +1791,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
sql += ' AND mitm=1'
|
||||
sql += ' ORDER BY avg_latency ASC, tested DESC'
|
||||
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
rows = db.execute(sql, args).fetchall()
|
||||
|
||||
if fmt == 'plain':
|
||||
@@ -1777,7 +1813,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
sql += ' AND mitm=0'
|
||||
elif mitm_filter == '1':
|
||||
sql += ' AND mitm=1'
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
row = db.execute(sql).fetchone()
|
||||
return json.dumps({'count': row[0] if row else 0}), 'application/json', 200
|
||||
except Exception as e:
|
||||
@@ -1833,9 +1869,8 @@ class ProxyAPIServer(threading.Thread):
|
||||
elif path == '/api/workers':
|
||||
# List connected workers
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
workers_data = self._get_workers_data(db)
|
||||
db.close()
|
||||
return json.dumps(workers_data, indent=2), 'application/json', 200
|
||||
except Exception as e:
|
||||
_log('api/workers error: %s' % e, 'warn')
|
||||
@@ -1853,7 +1888,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
|
||||
count = min(int(query_params.get('count', 5)), 20)
|
||||
try:
|
||||
url_db = mysqlite.mysqlite(self.url_database, str)
|
||||
url_db = _get_url_db(self.url_database)
|
||||
urls = claim_urls(url_db, worker_id, count)
|
||||
update_worker_heartbeat(worker_id)
|
||||
return json.dumps({
|
||||
@@ -1880,7 +1915,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
if not reports:
|
||||
return json.dumps({'error': 'no reports provided'}), 'application/json', 400
|
||||
try:
|
||||
url_db = mysqlite.mysqlite(self.url_database, str)
|
||||
url_db = _get_url_db(self.url_database)
|
||||
processed = submit_url_reports(url_db, worker_id, reports)
|
||||
update_worker_heartbeat(worker_id)
|
||||
return json.dumps({
|
||||
@@ -1904,7 +1939,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
if not proxies:
|
||||
return json.dumps({'error': 'no proxies provided'}), 'application/json', 400
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
db = _get_db(self.database)
|
||||
processed = submit_proxy_reports(db, worker_id, proxies)
|
||||
update_worker_heartbeat(worker_id)
|
||||
return json.dumps({
|
||||
@@ -1958,7 +1993,7 @@ class ProxyAPIServer(threading.Thread):
|
||||
if not self.url_database:
|
||||
return None
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.url_database, str)
|
||||
db = _get_url_db(self.url_database)
|
||||
stats = {}
|
||||
now = int(time.time())
|
||||
|
||||
|
||||
Reference in New Issue
Block a user