dbs: add CDN filtering and verification tables

- CDN_PREFIXES: filter Cloudflare, Fastly, Akamai, CloudFront, Google
- is_cdn_ip(): check if IP belongs to known CDN ranges
- insert_proxies(): skip CDN IPs with count in log message
- verification tables: proxy_results, verification_queue, worker_trust
- queue_verification(): add proxies for manager re-testing
- get_verification_stats(): queue size and trigger breakdown
- get_all_worker_trust(): trust scores for all workers
This commit is contained in:
Username
2026-01-08 09:05:13 +01:00
parent 721a602dd9
commit 64b3629585

337
dbs.py
View File

@@ -366,6 +366,37 @@ def create_table_if_not_exists(sqlite, dbname):
sqlite.commit()
# CDN IP ranges to filter out (not real proxies).
# Prefix strings are matched with str.startswith elsewhere in this file;
# consecutive /16 runs are generated rather than spelled out.
CDN_PREFIXES = (
    # Cloudflare
    '141.101.', '172.67.', '172.64.',
    *('104.%d.' % n for n in range(16, 28)),   # 104.16. - 104.27.
    '188.114.', '190.93.', '197.234.',
    '198.41.', '173.245.', '103.21.', '103.22.', '103.31.',
    # Fastly
    '151.101.',
    # Akamai (common ranges)
    *('23.%d.' % n for n in range(32, 68)),    # 23.32. - 23.67.
    # Amazon CloudFront
    '13.32.', '13.33.', '13.35.', '13.224.', '13.225.', '13.226.', '13.227.',
    # Google
    *('34.%d.' % n for n in range(64, 72)),    # 34.64. - 34.71.
)
def is_cdn_ip(ip, prefixes=None):
    """Check if an IP belongs to known CDN ranges.

    Args:
        ip: IPv4 address as a dotted-quad string.
        prefixes: Optional tuple of prefix strings to match against;
            defaults to the module-level CDN_PREFIXES.

    Returns:
        True if the IP starts with any known CDN prefix, else False.
    """
    if prefixes is None:
        prefixes = CDN_PREFIXES
    # str.startswith accepts a tuple of prefixes -- a single C-level
    # call instead of a Python loop over 70+ candidates.
    return ip.startswith(prefixes)
def insert_proxies(proxydb, proxies, url):
"""Insert new proxies into database.
@@ -381,6 +412,7 @@ def insert_proxies(proxydb, proxies, url):
return
timestamp = int(time.time())
rows = []
filtered = 0
for p in proxies:
# Handle tuple (address, proto[, confidence]) and plain string formats
confidence = 30 # Default confidence (CONFIDENCE_REGEX)
@@ -407,6 +439,11 @@ def insert_proxies(proxydb, proxies, url):
# IPv4: ip:port
ip, port = addr_part.rsplit(':', 1)
# Filter out CDN IPs (not real proxies)
if is_cdn_ip(ip):
filtered += 1
continue
rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0, confidence))
proxydb.executemany(
'INSERT OR IGNORE INTO proxylist '
@@ -415,7 +452,10 @@ def insert_proxies(proxydb, proxies, url):
rows
)
proxydb.commit()
_log('+%d proxy/ies from %s' % (len(proxies), url), 'added')
if filtered:
_log('+%d proxy/ies from %s (filtered %d CDN IPs)' % (len(rows), url, filtered), 'added')
else:
_log('+%d proxy/ies from %s' % (len(rows), url), 'added')
def insert_urls(urls, search, sqlite):
@@ -485,8 +525,8 @@ def seed_proxy_sources(sqlite):
)
if sqlite.cursor.rowcount > 0:
added += 1
except Exception:
pass
except Exception as e:
_log('seed_urls insert error for %s: %s' % (url, e), 'warn')
sqlite.commit()
if added > 0:
_log('seeded %d proxy source URLs' % added, 'info')
@@ -628,6 +668,293 @@ def get_stats_history(sqlite, hours=24):
return [dict(zip(cols, row)) for row in rows]
def create_verification_tables(sqlite):
    """Create tables for the verification system.

    Tables:
        proxy_results: Per-test result history for disagreement detection
        verification_queue: Disputed proxies awaiting manager verification
        worker_trust: Worker accuracy tracking and trust scores
    """
    statements = (
        # Per-test result history - stores last N results per proxy per worker
        """CREATE TABLE IF NOT EXISTS proxy_results (
        id INTEGER PRIMARY KEY,
        proxy TEXT NOT NULL,
        worker_id TEXT NOT NULL,
        tested INTEGER NOT NULL,
        working INTEGER NOT NULL,
        latency_ms INTEGER,
        error_category TEXT,
        target TEXT,
        FOREIGN KEY (proxy) REFERENCES proxylist(proxy))""",
        'CREATE INDEX IF NOT EXISTS idx_results_proxy ON proxy_results(proxy, tested DESC)',
        'CREATE INDEX IF NOT EXISTS idx_results_worker ON proxy_results(worker_id, tested DESC)',
        # Verification queue - proxies needing manager verification
        """CREATE TABLE IF NOT EXISTS verification_queue (
        proxy TEXT PRIMARY KEY,
        trigger TEXT NOT NULL,
        priority INTEGER DEFAULT 1,
        queued_at INTEGER NOT NULL,
        worker_a TEXT,
        worker_b TEXT,
        result_a INTEGER,
        result_b INTEGER)""",
        'CREATE INDEX IF NOT EXISTS idx_verify_priority ON verification_queue(priority DESC, queued_at)',
        # Worker trust tracking
        """CREATE TABLE IF NOT EXISTS worker_trust (
        worker_id TEXT PRIMARY KEY,
        verifications INTEGER DEFAULT 0,
        correct INTEGER DEFAULT 0,
        incorrect INTEGER DEFAULT 0,
        trust_score REAL DEFAULT 1.0,
        last_updated INTEGER)""",
    )
    for stmt in statements:
        sqlite.execute(stmt)
    sqlite.commit()
def insert_proxy_result(sqlite, proxy, worker_id, working, latency_ms=None,
                        error_category=None, target=None, max_results=10):
    """Record one proxy test result and cap the per-proxy history.

    Args:
        sqlite: Database connection
        proxy: Proxy address (ip:port)
        worker_id: Worker that tested the proxy
        working: 1 if passed, 0 if failed
        latency_ms: Response latency in milliseconds
        error_category: Error type (timeout, refused, auth, etc.)
        target: Target host that was tested
        max_results: Maximum results to keep per proxy (default 10)

    Note: does not commit; the caller is responsible for committing.
    """
    tested_at = int(time.time())
    record = (proxy, worker_id, tested_at, working, latency_ms, error_category, target)
    sqlite.execute(
        '''INSERT INTO proxy_results
        (proxy, worker_id, tested, working, latency_ms, error_category, target)
        VALUES (?, ?, ?, ?, ?, ?, ?)''',
        record
    )
    # Keep only the newest max_results rows for this proxy; delete the rest.
    sqlite.execute('''
        DELETE FROM proxy_results
        WHERE proxy = ? AND id NOT IN (
            SELECT id FROM proxy_results
            WHERE proxy = ?
            ORDER BY tested DESC
            LIMIT ?
        )
    ''', (proxy, proxy, max_results))
def check_for_disagreement(sqlite, proxy, worker_id, working, window_seconds=300):
    """Check whether this result conflicts with recent results from peers.

    Args:
        sqlite: Database connection
        proxy: Proxy address
        worker_id: Current worker
        working: Current test result (1=pass, 0=fail)
        window_seconds: Time window to check (default 5 minutes)

    Returns:
        (True, other_worker_id, other_result) for the first conflicting
        recent result from another worker, otherwise (False, None, None).
    """
    cutoff = int(time.time()) - window_seconds
    # Only the 3 most recent results from other workers are considered.
    recent = sqlite.execute('''
        SELECT worker_id, working FROM proxy_results
        WHERE proxy = ? AND worker_id != ? AND tested > ?
        ORDER BY tested DESC LIMIT 3
    ''', (proxy, worker_id, cutoff)).fetchall()
    conflict = next(((peer, res) for peer, res in recent if res != working), None)
    if conflict is None:
        return (False, None, None)
    return (True, conflict[0], conflict[1])
def queue_verification(sqlite, proxy, trigger, priority=1,
                       worker_a=None, worker_b=None, result_a=None, result_b=None):
    """Queue a proxy for manager verification.

    An entry already queued at an equal or higher priority is left
    untouched; otherwise the row is (re)written with the new details.

    Args:
        sqlite: Database connection
        proxy: Proxy address
        trigger: Why verification is needed (disagreement, sudden_death, resurrection, spot_check)
        priority: 1=low, 2=medium, 3=high
        worker_a: First worker involved (for disagreements)
        worker_b: Second worker involved (for disagreements)
        result_a: First worker's result
        result_b: Second worker's result
    """
    queued = sqlite.execute(
        'SELECT priority FROM verification_queue WHERE proxy = ?', (proxy,)
    ).fetchone()
    if queued is not None and queued[0] >= priority:
        # The existing entry wins; nothing to update.
        return
    sqlite.execute('''
        INSERT OR REPLACE INTO verification_queue
        (proxy, trigger, priority, queued_at, worker_a, worker_b, result_a, result_b)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ''', (proxy, trigger, priority, int(time.time()),
          worker_a, worker_b, result_a, result_b))
def get_pending_verifications(sqlite, limit=10):
    """Return the highest-priority pending verifications.

    Args:
        sqlite: Database connection
        limit: Maximum number to return

    Returns:
        List of dicts (proxy, trigger, priority, queued_at, worker_a,
        worker_b, result_a, result_b) ordered by priority descending,
        then oldest first within a priority.
    """
    columns = ('proxy', 'trigger', 'priority', 'queued_at',
               'worker_a', 'worker_b', 'result_a', 'result_b')
    cursor = sqlite.execute('''
        SELECT proxy, trigger, priority, queued_at, worker_a, worker_b, result_a, result_b
        FROM verification_queue
        ORDER BY priority DESC, queued_at ASC
        LIMIT ?
    ''', (limit,))
    pending = []
    for record in cursor.fetchall():
        pending.append(dict(zip(columns, record)))
    return pending
def remove_from_verification_queue(sqlite, proxy):
    """Drop *proxy* from the verification queue (no-op if absent).

    Note: does not commit; the caller is responsible for committing.
    """
    sqlite.execute('DELETE FROM verification_queue WHERE proxy = ?', [proxy])
def update_worker_trust(sqlite, worker_id, was_correct):
    """Update a worker's trust score after a verification outcome.

    Uses Bayesian smoothing: trust_score = (correct + 5) / (verifications + 10),
    so a single result moves the score only slightly.

    Args:
        sqlite: Database connection
        worker_id: Worker to update
        was_correct: True if worker's result matched verification
    """
    prev = sqlite.execute(
        'SELECT verifications, correct, incorrect FROM worker_trust WHERE worker_id = ?',
        (worker_id,)
    ).fetchone()
    # Unknown workers start from zeroed counters.
    base_v, base_c, base_i = prev if prev else (0, 0, 0)
    verifications = base_v + 1
    correct = base_c + (1 if was_correct else 0)
    incorrect = base_i + (0 if was_correct else 1)
    # Bayesian smoothing
    trust_score = (correct + 5.0) / (verifications + 10.0)
    sqlite.execute('''
        INSERT OR REPLACE INTO worker_trust
        (worker_id, verifications, correct, incorrect, trust_score, last_updated)
        VALUES (?, ?, ?, ?, ?, ?)
    ''', (worker_id, verifications, correct, incorrect, trust_score, int(time.time())))
def get_worker_trust(sqlite, worker_id):
    """Fetch trust stats for one worker.

    Args:
        sqlite: Database connection
        worker_id: Worker to look up

    Returns:
        Dict with verifications/correct/incorrect/trust_score/last_updated.
        Unknown workers get zeroed counters and trust_score 1.0 (new
        workers start trusted).
    """
    fields = ('verifications', 'correct', 'incorrect', 'trust_score', 'last_updated')
    row = sqlite.execute(
        'SELECT verifications, correct, incorrect, trust_score, last_updated FROM worker_trust WHERE worker_id = ?',
        (worker_id,)
    ).fetchone()
    if row is None:
        return dict(zip(fields, (0, 0, 0, 1.0, None)))
    return dict(zip(fields, row))
def get_all_worker_trust(sqlite):
    """Return trust stats for every known worker.

    Returns:
        Dict mapping worker_id -> stats dict (verifications, correct,
        incorrect, trust_score, last_updated).
    """
    fields = ('verifications', 'correct', 'incorrect', 'trust_score', 'last_updated')
    return {
        record[0]: dict(zip(fields, record[1:]))
        for record in sqlite.execute(
            'SELECT worker_id, verifications, correct, incorrect, trust_score, last_updated FROM worker_trust'
        ).fetchall()
    }
def get_verification_stats(sqlite):
    """Get verification queue statistics.

    Returns:
        Dict with:
            queue_size: total entries in verification_queue
            queue_by_priority: {priority: count}
            queue_by_trigger: {trigger: count}
    """
    # NOTE: the original computed `now` / `today_start` here but never
    # used them; removed along with the docstring's claim of "verified
    # counts, disagreement counts" that were never returned.
    stats = {}
    # Queue size
    row = sqlite.execute('SELECT COUNT(*) FROM verification_queue').fetchone()
    stats['queue_size'] = row[0] if row else 0
    # Queue by priority
    rows = sqlite.execute(
        'SELECT priority, COUNT(*) FROM verification_queue GROUP BY priority'
    ).fetchall()
    stats['queue_by_priority'] = {r[0]: r[1] for r in rows}
    # Queue by trigger type
    rows = sqlite.execute(
        'SELECT trigger, COUNT(*) FROM verification_queue GROUP BY trigger'
    ).fetchall()
    stats['queue_by_trigger'] = {r[0]: r[1] for r in rows}
    return stats
def analyze_database(sqlite):
"""Run ANALYZE to update SQLite query planner statistics.
@@ -699,7 +1026,7 @@ def get_database_stats(sqlite):
row = sqlite.execute('SELECT COUNT(*) FROM uris').fetchone()
stats['uri_count'] = row[0] if row else 0
except Exception:
pass
except Exception as e:
_log('get_database_stats error: %s' % e, 'warn')
return stats