# CDN IP ranges to filter out (not real proxies).
# Each entry ends with '.' so matching is always on a full-octet boundary
# (e.g. '23.32.' matches 23.32.x.x but never 23.320.x.x).
CDN_PREFIXES = (
    # Cloudflare
    '141.101.', '172.67.', '172.64.', '104.16.', '104.17.', '104.18.',
    '104.19.', '104.20.', '104.21.', '104.22.', '104.23.', '104.24.',
    '104.25.', '104.26.', '104.27.', '188.114.', '190.93.', '197.234.',
    '198.41.', '173.245.', '103.21.', '103.22.', '103.31.',
    # Fastly
    '151.101.',
    # Akamai (common ranges)
    '23.32.', '23.33.', '23.34.', '23.35.', '23.36.', '23.37.',
    '23.38.', '23.39.', '23.40.', '23.41.', '23.42.', '23.43.',
    '23.44.', '23.45.', '23.46.', '23.47.', '23.48.', '23.49.',
    '23.50.', '23.51.', '23.52.', '23.53.', '23.54.', '23.55.',
    '23.56.', '23.57.', '23.58.', '23.59.', '23.60.', '23.61.',
    '23.62.', '23.63.', '23.64.', '23.65.', '23.66.', '23.67.',
    # Amazon CloudFront
    '13.32.', '13.33.', '13.35.', '13.224.', '13.225.', '13.226.', '13.227.',
    # Google
    '34.64.', '34.65.', '34.66.', '34.67.', '34.68.', '34.69.', '34.70.', '34.71.',
)


def is_cdn_ip(ip):
    """Check whether *ip* (dotted-quad string) belongs to a known CDN range.

    Args:
        ip: IPv4 address as a string, e.g. '141.101.2.3'.

    Returns:
        True if the address starts with any prefix in CDN_PREFIXES.
    """
    # str.startswith accepts a tuple of prefixes: one C-level call
    # instead of a Python-level loop over CDN_PREFIXES.
    return ip.startswith(CDN_PREFIXES)
def create_verification_tables(sqlite):
    """Create tables for the verification system.

    Tables:
        proxy_results: Per-test result history for disagreement detection
        verification_queue: Disputed proxies awaiting manager verification
        worker_trust: Worker accuracy tracking and trust scores

    Commits the schema changes before returning.
    """
    # Per-test result history - stores last N results per proxy per worker
    sqlite.execute("""CREATE TABLE IF NOT EXISTS proxy_results (
        id INTEGER PRIMARY KEY,
        proxy TEXT NOT NULL,
        worker_id TEXT NOT NULL,
        tested INTEGER NOT NULL,
        working INTEGER NOT NULL,
        latency_ms INTEGER,
        error_category TEXT,
        target TEXT,
        FOREIGN KEY (proxy) REFERENCES proxylist(proxy))""")
    sqlite.execute('CREATE INDEX IF NOT EXISTS idx_results_proxy ON proxy_results(proxy, tested DESC)')
    sqlite.execute('CREATE INDEX IF NOT EXISTS idx_results_worker ON proxy_results(worker_id, tested DESC)')

    # Verification queue - proxies needing manager verification.
    # TRIGGER is an SQL keyword, so the column is quoted here and in every
    # statement that touches it (portability across SQL parsers).
    sqlite.execute("""CREATE TABLE IF NOT EXISTS verification_queue (
        proxy TEXT PRIMARY KEY,
        "trigger" TEXT NOT NULL,
        priority INTEGER DEFAULT 1,
        queued_at INTEGER NOT NULL,
        worker_a TEXT,
        worker_b TEXT,
        result_a INTEGER,
        result_b INTEGER)""")
    sqlite.execute('CREATE INDEX IF NOT EXISTS idx_verify_priority ON verification_queue(priority DESC, queued_at)')

    # Worker trust tracking
    sqlite.execute("""CREATE TABLE IF NOT EXISTS worker_trust (
        worker_id TEXT PRIMARY KEY,
        verifications INTEGER DEFAULT 0,
        correct INTEGER DEFAULT 0,
        incorrect INTEGER DEFAULT 0,
        trust_score REAL DEFAULT 1.0,
        last_updated INTEGER)""")

    sqlite.commit()


def insert_proxy_result(sqlite, proxy, worker_id, working, latency_ms=None,
                        error_category=None, target=None, max_results=10):
    """Insert a proxy test result and prune old results.

    Args:
        sqlite: Database connection
        proxy: Proxy address (ip:port)
        worker_id: Worker that tested the proxy
        working: 1 if passed, 0 if failed
        latency_ms: Response latency in milliseconds
        error_category: Error type (timeout, refused, auth, etc.)
        target: Target host that was tested
        max_results: Maximum results to keep per proxy (default 10)

    NOTE(review): does not commit; presumably the caller batches results
    and commits once - confirm against call sites.
    """
    now = int(time.time())
    sqlite.execute(
        '''INSERT INTO proxy_results
           (proxy, worker_id, tested, working, latency_ms, error_category, target)
           VALUES (?, ?, ?, ?, ?, ?, ?)''',
        (proxy, worker_id, now, working, latency_ms, error_category, target)
    )

    # Prune old results for this proxy (keep last max_results)
    sqlite.execute('''
        DELETE FROM proxy_results
        WHERE proxy = ? AND id NOT IN (
            SELECT id FROM proxy_results
            WHERE proxy = ?
            ORDER BY tested DESC
            LIMIT ?
        )
    ''', (proxy, proxy, max_results))


def check_for_disagreement(sqlite, proxy, worker_id, working, window_seconds=300):
    """Check if this result disagrees with recent results from other workers.

    Args:
        sqlite: Database connection
        proxy: Proxy address
        worker_id: Current worker
        working: Current test result (1=pass, 0=fail)
        window_seconds: Time window to check (default 5 minutes)

    Returns:
        tuple (disagreement_found, other_worker_id, other_result) or (False, None, None)
    """
    now = int(time.time())
    cutoff = now - window_seconds

    # Only the 3 most recent results from OTHER workers inside the window
    # are considered; the first mismatch wins.
    rows = sqlite.execute('''
        SELECT worker_id, working FROM proxy_results
        WHERE proxy = ? AND worker_id != ? AND tested > ?
        ORDER BY tested DESC LIMIT 3
    ''', (proxy, worker_id, cutoff)).fetchall()

    for other_worker, other_result in rows:
        if other_result != working:
            return (True, other_worker, other_result)

    return (False, None, None)


def queue_verification(sqlite, proxy, trigger, priority=1,
                       worker_a=None, worker_b=None, result_a=None, result_b=None):
    """Add a proxy to the verification queue.

    Args:
        sqlite: Database connection
        proxy: Proxy address
        trigger: Why verification is needed (disagreement, sudden_death, resurrection, spot_check)
        priority: 1=low, 2=medium, 3=high
        worker_a: First worker involved (for disagreements)
        worker_b: Second worker involved (for disagreements)
        result_a: First worker's result
        result_b: Second worker's result

    NOTE(review): does not commit; presumably the caller commits - confirm.
    """
    now = int(time.time())
    # Check if already queued with higher priority
    existing = sqlite.execute(
        'SELECT priority FROM verification_queue WHERE proxy = ?', (proxy,)
    ).fetchone()

    if existing and existing[0] >= priority:
        return  # Already queued with equal or higher priority

    # "trigger" quoted: SQL keyword used as a column name.
    sqlite.execute('''
        INSERT OR REPLACE INTO verification_queue
        (proxy, "trigger", priority, queued_at, worker_a, worker_b, result_a, result_b)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ''', (proxy, trigger, priority, now, worker_a, worker_b, result_a, result_b))


def get_pending_verifications(sqlite, limit=10):
    """Get highest priority pending verifications.

    Args:
        sqlite: Database connection
        limit: Maximum number to return

    Returns:
        List of dicts with proxy and verification details
        (highest priority first, oldest first within a priority).
    """
    rows = sqlite.execute('''
        SELECT proxy, "trigger", priority, queued_at, worker_a, worker_b, result_a, result_b
        FROM verification_queue
        ORDER BY priority DESC, queued_at ASC
        LIMIT ?
    ''', (limit,)).fetchall()

    cols = ['proxy', 'trigger', 'priority', 'queued_at',
            'worker_a', 'worker_b', 'result_a', 'result_b']
    return [dict(zip(cols, row)) for row in rows]


def remove_from_verification_queue(sqlite, proxy):
    """Remove a proxy from the verification queue after verification.

    NOTE(review): does not commit; presumably the caller commits - confirm.
    """
    sqlite.execute('DELETE FROM verification_queue WHERE proxy = ?', (proxy,))


def update_worker_trust(sqlite, worker_id, was_correct):
    """Update worker trust score after verification.

    Uses Bayesian smoothing: trust_score = (correct + 5) / (verifications + 10)

    Args:
        sqlite: Database connection
        worker_id: Worker to update
        was_correct: True if worker's result matched verification
    """
    now = int(time.time())

    # Get current stats or initialize
    row = sqlite.execute(
        'SELECT verifications, correct, incorrect FROM worker_trust WHERE worker_id = ?',
        (worker_id,)
    ).fetchone()

    if row:
        verifications = row[0] + 1
        correct = row[1] + (1 if was_correct else 0)
        incorrect = row[2] + (0 if was_correct else 1)
    else:
        verifications = 1
        correct = 1 if was_correct else 0
        incorrect = 0 if was_correct else 1

    # Bayesian smoothing: pseudo-counts of 5 correct / 10 total pull new
    # workers toward 0.5 until they accumulate real history.
    trust_score = (correct + 5.0) / (verifications + 10.0)

    sqlite.execute('''
        INSERT OR REPLACE INTO worker_trust
        (worker_id, verifications, correct, incorrect, trust_score, last_updated)
        VALUES (?, ?, ?, ?, ?, ?)
    ''', (worker_id, verifications, correct, incorrect, trust_score, now))


def get_worker_trust(sqlite, worker_id):
    """Get trust score for a worker.

    Args:
        sqlite: Database connection
        worker_id: Worker to look up

    Returns:
        Dict with trust stats or default values if not found.
    """
    row = sqlite.execute(
        'SELECT verifications, correct, incorrect, trust_score, last_updated FROM worker_trust WHERE worker_id = ?',
        (worker_id,)
    ).fetchone()

    if row:
        return {
            'verifications': row[0],
            'correct': row[1],
            'incorrect': row[2],
            'trust_score': row[3],
            'last_updated': row[4]
        }
    return {
        'verifications': 0,
        'correct': 0,
        'incorrect': 0,
        'trust_score': 1.0,  # New workers start trusted
        'last_updated': None
    }


def get_all_worker_trust(sqlite):
    """Get trust scores for all workers.

    Returns:
        Dict mapping worker_id to trust stats
    """
    rows = sqlite.execute(
        'SELECT worker_id, verifications, correct, incorrect, trust_score, last_updated FROM worker_trust'
    ).fetchall()

    result = {}
    for row in rows:
        result[row[0]] = {
            'verifications': row[1],
            'correct': row[2],
            'incorrect': row[3],
            'trust_score': row[4],
            'last_updated': row[5]
        }
    return result


def get_verification_stats(sqlite):
    """Get verification system statistics.

    Returns:
        Dict with queue_size, queue_by_priority, queue_by_trigger
    """
    stats = {}

    # Queue size
    row = sqlite.execute('SELECT COUNT(*) FROM verification_queue').fetchone()
    stats['queue_size'] = row[0] if row else 0

    # Queue by priority
    rows = sqlite.execute(
        'SELECT priority, COUNT(*) FROM verification_queue GROUP BY priority'
    ).fetchall()
    stats['queue_by_priority'] = {row[0]: row[1] for row in rows}

    # Queue by trigger type ("trigger" quoted: SQL keyword column name)
    rows = sqlite.execute(
        'SELECT "trigger", COUNT(*) FROM verification_queue GROUP BY "trigger"'
    ).fetchall()
    stats['queue_by_trigger'] = {row[0]: row[1] for row in rows}

    return stats