httpd: add ASN enrichment for worker-reported proxies
All checks were successful
CI / validate (push) Successful in 20s
All checks were successful
CI / validate (push) Successful in 20s
Load pyasn database in httpd and look up ASN when workers report working proxies. Falls back to a pure-Python ipasn.dat reader when the pyasn C extension is unavailable (Python 2.7 containers). Backfills existing proxies with null ASN on startup.
This commit is contained in:
89
httpd.py
89
httpd.py
@@ -31,6 +31,56 @@ except (ImportError, IOError, ValueError):
|
||||
_geodb = None
|
||||
_geolite = False
|
||||
|
||||
# ASN lookup (optional) - try pyasn first, fall back to pure-Python reader
|
||||
_asndb = None
|
||||
_asn_dat_path = os.path.join("data", "ipasn.dat")
|
||||
try:
|
||||
import pyasn
|
||||
_asndb = pyasn.pyasn(_asn_dat_path)
|
||||
except (ImportError, IOError):
|
||||
pass
|
||||
|
||||
if _asndb is None and os.path.exists(_asn_dat_path):
|
||||
import socket
|
||||
import struct
|
||||
import bisect
|
||||
|
||||
class _AsnLookup(object):
|
||||
"""Pure-Python ASN lookup using ipasn.dat (CIDR/ASN text format)."""
|
||||
|
||||
def __init__(self, path):
|
||||
self._entries = []
|
||||
with open(path) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith(';'):
|
||||
continue
|
||||
parts = line.split('\t')
|
||||
if len(parts) != 2:
|
||||
continue
|
||||
cidr, asn = parts
|
||||
ip, prefix = cidr.split('/')
|
||||
start = struct.unpack('!I', socket.inet_aton(ip))[0]
|
||||
self._entries.append((start, int(prefix), int(asn)))
|
||||
self._entries.sort()
|
||||
_log('asn: loaded %d prefixes (pure-python)' % len(self._entries), 'info')
|
||||
|
||||
def lookup(self, ip):
|
||||
ip_int = struct.unpack('!I', socket.inet_aton(ip))[0]
|
||||
idx = bisect.bisect_right(self._entries, (ip_int, 33, 0)) - 1
|
||||
if idx < 0:
|
||||
return (None, None)
|
||||
start, prefix_len, asn = self._entries[idx]
|
||||
mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
|
||||
if (ip_int & mask) == (start & mask):
|
||||
return (asn, None)
|
||||
return (None, None)
|
||||
|
||||
try:
|
||||
_asndb = _AsnLookup(_asn_dat_path)
|
||||
except Exception as e:
|
||||
_log('asn: failed to load %s: %s' % (_asn_dat_path, e), 'warn')
|
||||
|
||||
# Rate limiting configuration
|
||||
_rate_limits = defaultdict(list)
|
||||
_rate_lock = threading.Lock()
|
||||
@@ -604,7 +654,7 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
''', (proxy_key, ip, port, proto, now_int, now_int, latency, now_int,
|
||||
checktype, target))
|
||||
|
||||
# Geolocate if IP2Location available
|
||||
# Geolocate and ASN lookup
|
||||
if _geolite and _geodb:
|
||||
try:
|
||||
rec = _geodb.get_all(ip)
|
||||
@@ -614,6 +664,15 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
(rec.country_short, proxy_key))
|
||||
except Exception:
|
||||
pass
|
||||
if _asndb:
|
||||
try:
|
||||
asn_result = _asndb.lookup(ip)
|
||||
if asn_result and asn_result[0]:
|
||||
db.execute(
|
||||
'UPDATE proxylist SET asn=? WHERE proxy=?',
|
||||
(asn_result[0], proxy_key))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Track per-URL working count for working_ratio
|
||||
if source_url:
|
||||
@@ -1306,6 +1365,9 @@ class ProxyAPIServer(threading.Thread):
|
||||
load_static_files(THEME)
|
||||
# Load worker registry from disk
|
||||
load_workers()
|
||||
# Backfill ASN for existing proxies missing it
|
||||
if _asndb:
|
||||
self._backfill_asn()
|
||||
# Create verification tables if they don't exist
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
@@ -1314,6 +1376,31 @@ class ProxyAPIServer(threading.Thread):
|
||||
except Exception as e:
|
||||
_log('failed to create verification tables: %s' % e, 'warn')
|
||||
|
||||
def _backfill_asn(self):
    """One-time backfill of ASN for proxies that have ip but no ASN."""
    try:
        db = mysqlite.mysqlite(self.database, str)
        pending = db.execute(
            'SELECT proxy, ip FROM proxylist WHERE asn IS NULL AND ip IS NOT NULL'
        ).fetchall()
        if not pending:
            return
        filled = 0
        for proxy_key, addr in pending:
            # Best-effort per row: a single bad address or failed UPDATE
            # must not abort the rest of the backfill.
            try:
                hit = _asndb.lookup(addr)
                if hit and hit[0]:
                    db.execute('UPDATE proxylist SET asn=? WHERE proxy=?',
                               (hit[0], proxy_key))
                    filled += 1
            except Exception:
                pass
        db.commit()
        if filled:
            _log('asn: backfilled %d/%d proxies' % (filled, len(pending)), 'info')
    except Exception as e:
        _log('asn backfill error: %s' % e, 'warn')
|
||||
|
||||
def _wsgi_app(self, environ, start_response):
|
||||
"""WSGI application wrapper for gevent."""
|
||||
path = environ.get('PATH_INFO', '/').split('?')[0]
|
||||
|
||||
Reference in New Issue
Block a user