#!/usr/bin/env python2
from __future__ import division

# Gevent monkey-patching MUST happen before any other imports
# This patches threading, socket, time, etc. for cooperative concurrency
from gevent import monkey
monkey.patch_all()

import gevent
from gevent.pool import Pool as GreenPool
from gevent.queue import Queue as GreenQueue
from gevent.event import Event as GreenEvent
from gevent.lock import Semaphore as GreenLock

import threading  # Now patched by gevent
import time
import random
import string
import re
import heapq
import signal
import os

# Py2/Py3 compatibility: expose the stdlib queue module under the Py2 name.
try:
    import Queue
except ImportError:
    import queue as Queue

# Optional GeoIP database; geolite stays False when the lib or data file is missing.
try:
    import IP2Location
    geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
    geolite = True
except (ImportError, IOError, ValueError):
    geolite = False

# Optional IP->ASN lookup; asn_enabled gates all asndb use.
try:
    import pyasn
    asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat"))
    asn_enabled = True
except (ImportError, IOError, NameError):
    asndb = None
    asn_enabled = False

from config import Config
import mysqlite
import dbs
from misc import _log, categorize_error
import rocksock
import connection_pool

# Optional scraper integration for engine stats
try:
    import scraper as scraper_module
    scraper_available = True
except ImportError:
    scraper_module = None
    scraper_available = False

config = Config()
_run_standalone = False
# Module-level DNS memo used by socks4_resolve (hostname -> IPv4 string).
cached_dns = {}

# IP pattern for judge services (validates response contains valid IP in body)
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'
# Dead proxy marker - proxies with failed=-1 are permanently dead and never retested
DEAD_PROXY = -1
# Error categories that indicate proxy is definitely dead (not temporary failure)
FATAL_ERROR_CATEGORIES = ('refused', 'unreachable', 'auth')

# Patterns indicating judge is blocking the proxy (not a proxy failure)
# These should NOT count as proxy failures - retry with different judge
JUDGE_BLOCK_PATTERNS = [
    r'HTTP/1\.[01] 403',        # Forbidden
    r'HTTP/1\.[01] 429',        # Too Many Requests
    r'HTTP/1\.[01] 503',        # Service Unavailable
    r'captcha',                 # Captcha challenge
    r'challenge',               # Generic challenge
    r'verify you are human',    # Human verification
    r'rate.?limit',             # Rate limiting
    r'too many requests',       # Rate limiting
    r'access.?denied',          # Access denied
    r'blocked',                 # Explicit block
    r'Checking your browser',   # Cloudflare JS challenge
]
JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE)

# Check types: irc, http (header match), judges (body match), ssl (TLS handshake)
# Judge services - return IP in body (plain text, JSON, or HTML)
judges = {
    # Plain text IP
    'api.ipify.org': IP_PATTERN,
    'icanhazip.com': IP_PATTERN,
    'checkip.amazonaws.com': IP_PATTERN,
    'ifconfig.me/ip': IP_PATTERN,
    'ipinfo.io/ip': IP_PATTERN,
    'ip.me': IP_PATTERN,
    'myip.dnsomatic.com': IP_PATTERN,
    'ident.me': IP_PATTERN,
    'ipecho.net/plain': IP_PATTERN,
    'wtfismyip.com/text': IP_PATTERN,
    'eth0.me': IP_PATTERN,
    'l2.io/ip': IP_PATTERN,
    'tnx.nl/ip': IP_PATTERN,
    'wgetip.com': IP_PATTERN,
    'curlmyip.net': IP_PATTERN,
    # JSON responses (IP pattern matches within JSON)
    'httpbin.org/ip': IP_PATTERN,
    'ip-api.com/json': IP_PATTERN,
    'ipapi.co/json': IP_PATTERN,
    'ipwhois.app/json': IP_PATTERN,
    # Header echo - for elite detection (check if proxy adds revealing headers)
    'httpbin.org/headers': IP_PATTERN,  # returns request headers as JSON
}


class JudgeStats():
    """Track per-judge success/failure rates for reliability scoring.

    Judges that frequently block or rate-limit are temporarily avoided.
    Stats decay over time to allow recovery.
""" def __init__(self, cooldown_seconds=300, block_threshold=3): self.lock = threading.Lock() self.stats = {} # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp} self.cooldown_seconds = cooldown_seconds # seconds to avoid blocked judges self.block_threshold = block_threshold # consecutive blocks before cooldown def record_success(self, judge): """Record successful judge response.""" with self.lock: if judge not in self.stats: self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} self.stats[judge]['success'] += 1 # Reset block count on success self.stats[judge]['block'] = 0 def record_failure(self, judge): """Record judge failure (proxy failed, not judge block).""" with self.lock: if judge not in self.stats: self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} self.stats[judge]['fail'] += 1 def record_block(self, judge): """Record judge blocking the proxy (403, captcha, rate-limit).""" with self.lock: if judge not in self.stats: self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} self.stats[judge]['block'] += 1 self.stats[judge]['last_block'] = time.time() def is_available(self, judge): """Check if judge is available (not in cooldown).""" with self.lock: if judge not in self.stats: return True s = self.stats[judge] # Check if in cooldown period if s['block'] >= self.block_threshold: if (time.time() - s['last_block']) < self.cooldown_seconds: return False # Cooldown expired, reset block count s['block'] = 0 return True def get_available_judges(self, judge_list): """Return list of judges not in cooldown.""" return [j for j in judge_list if self.is_available(j)] def status_line(self): """Return status summary for logging.""" with self.lock: total = len(self.stats) blocked = sum(1 for s in self.stats.values() if s['block'] >= self.block_threshold and (time.time() - s['last_block']) < self.cooldown_seconds) return 'judges: %d total, %d in cooldown' % (total, blocked) def 
get_stats(self): """Return statistics dict for API/dashboard.""" with self.lock: now = time.time() total = len(self.stats) in_cooldown = sum(1 for s in self.stats.values() if s['block'] >= self.block_threshold and (now - s['last_block']) < self.cooldown_seconds) available = total - in_cooldown # Get top judges by success count top = [] for judge, s in self.stats.items(): total_tests = s['success'] + s['fail'] if total_tests > 0: success_pct = (s['success'] * 100.0) / total_tests top.append({'judge': judge, 'success': s['success'], 'tests': total_tests, 'rate': round(success_pct, 1)}) top.sort(key=lambda x: x['success'], reverse=True) return {'total': total, 'available': available, 'in_cooldown': in_cooldown, 'top': top} # Global judge stats instance judge_stats = JudgeStats() # HTTP targets - check for specific headers regexes = { 'www.facebook.com': 'X-FB-Debug', 'www.fbcdn.net': 'X-FB-Debug', 'www.reddit.com': 'x-clacks-overhead', 'www.twitter.com': 'x-connection-hash', 't.co': 'x-connection-hash', 'www.msn.com': 'x-aspnetmvc-version', 'www.bing.com': 'p3p', 'www.ask.com': 'x-served-by', 'www.hotmail.com': 'x-msedge-ref', 'www.bbc.co.uk': 'x-bbc-edge-cache-status', 'www.skype.com': 'X-XSS-Protection', 'www.alibaba.com': 'object-status', 'www.mozilla.org': 'cf-ray', 'www.cloudflare.com': 'cf-ray', 'www.wikimedia.org': 'x-client-ip', 'www.vk.com': 'x-frontend', 'www.tinypic.com': 'x-amz-cf-pop', 'www.netflix.com': 'X-Netflix.proxy.execution-time', 'www.amazon.de': 'x-amz-cf-id', 'www.reuters.com': 'x-amz-cf-id', 'www.ikea.com': 'x-frame-options', 'www.twitpic.com': 'timing-allow-origin', 'www.digg.com': 'cf-request-id', 'www.wikia.com': 'x-served-by', 'www.wp.com': 'x-ac', 'www.last.fm': 'x-timer', 'www.usps.com': 'x-ruleset-version', 'www.linkedin.com': 'x-li-uuid', 'www.vimeo.com': 'x-timer', 'www.yelp.com': 'x-timer', 'www.ebay.com': 'x-envoy-upstream-service-time', 'www.wikihow.com': 'x-c', 'www.archive.org': 'referrer-policy', 'www.pandora.tv': 
'X-UA-Compatible',
    'www.w3.org': 'x-backend',
    'www.time.com': 'x-amz-cf-pop'
}

# SSL targets - verify TLS handshake only (MITM detection)
# No HTTP request needed - just connect with verifycert=True
ssl_targets = [
    'www.google.com',
    'www.microsoft.com',
    'www.apple.com',
    'www.amazon.com',
    'www.cloudflare.com',
    'www.github.com',
    'www.mozilla.org',
    'www.wikipedia.org',
    'www.reddit.com',
    'www.twitter.com',
    'x.com',
    'www.facebook.com',
    'www.linkedin.com',
    'www.paypal.com',
    'www.stripe.com',
    'www.digicert.com',
    'www.letsencrypt.org',
]


class Stats():
    """Track and report comprehensive runtime statistics."""

    HISTORY_SIZE = 120  # 10 min at 5s intervals
    LATENCY_BUCKETS = [100, 250, 500, 1000, 2000, 5000, 10000]  # ms thresholds

    def __init__(self):
        self.lock = threading.RLock()  # RLock for reentrant access (get_runtime_stats)
        # Lifetime counters
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()
        self.last_report = time.time()
        # Failure category tracking
        self.fail_categories = {}
        # Protocol tracking (tested and passed separately)
        self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0}
        self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0}
        # Legacy alias for compatibility
        self.by_proto = self.proto_passed
        # Time series history (5s intervals)
        self.rate_history = []
        self.success_rate_history = []
        self.latency_history = []
        self.last_history_time = time.time()
        self.last_history_tested = 0
        self.last_history_passed = 0
        # Peak values
        self.peak_rate = 0.0
        self.peak_success_rate = 0.0
        self.min_latency = float('inf')
        self.max_latency = 0.0
        # Latency tracking with percentiles
        self.latency_sum = 0.0
        self.latency_count = 0
        self.latency_samples = []  # Recent samples for percentiles
        self.latency_buckets = {b: 0 for b in self.LATENCY_BUCKETS + [float('inf')]}
        # Recent window (last 60s)
        self.recent_tested = 0
        self.recent_passed = 0
        self.recent_start = time.time()
        # Country/ASN tracking (top N)
        self.country_passed = {}
        self.asn_passed = {}
        # Hourly aggregates
        self.hourly_tested = 0
        self.hourly_passed = 0
        self.hourly_start = time.time()
        self.hours_data = []  # Last 24 hours
        # SSL/TLS tracking
        self.ssl_tested = 0
        self.ssl_passed = 0
        self.ssl_failed = 0
        self.mitm_detected = 0
        self.cert_errors = 0

    def record(self, success, category=None, proto=None, latency_ms=None,
               country=None, asn=None, ssl_test=False, mitm=False, cert_error=False):
        """Record one proxy test outcome into every counter family.

        Args:
            success: whether the proxy passed
            category: failure category string (only used when not success)
            proto: 'http'/'socks4'/'socks5' (unknown protocols are ignored)
            latency_ms: latency of a successful test, in milliseconds
            country/asn: geo attribution for successful tests
            ssl_test/mitm/cert_error: SSL-specific bookkeeping flags
        """
        with self.lock:
            self.tested += 1
            self.recent_tested += 1
            self.hourly_tested += 1
            # Track protocol tests
            if proto and proto in self.proto_tested:
                self.proto_tested[proto] += 1
            if success:
                self.passed += 1
                self.recent_passed += 1
                self.hourly_passed += 1
                if proto and proto in self.proto_passed:
                    self.proto_passed[proto] += 1
                if latency_ms and latency_ms > 0:
                    self.latency_sum += latency_ms
                    self.latency_count += 1
                    # Track min/max
                    if latency_ms < self.min_latency:
                        self.min_latency = latency_ms
                    if latency_ms > self.max_latency:
                        self.max_latency = latency_ms
                    # Keep recent samples for percentiles (max 1000)
                    self.latency_samples.append(latency_ms)
                    if len(self.latency_samples) > 1000:
                        self.latency_samples.pop(0)
                    # Bucket for histogram; for..else falls through to the
                    # overflow bucket when no threshold matched.
                    for bucket in self.LATENCY_BUCKETS:
                        if latency_ms <= bucket:
                            self.latency_buckets[bucket] += 1
                            break
                    else:
                        self.latency_buckets[float('inf')] += 1
                # Track country/ASN
                if country:
                    self.country_passed[country] = self.country_passed.get(country, 0) + 1
                if asn:
                    self.asn_passed[asn] = self.asn_passed.get(asn, 0) + 1
            else:
                self.failed += 1
                if category:
                    self.fail_categories[category] = self.fail_categories.get(category, 0) + 1
            # SSL/TLS tracking
            if ssl_test:
                self.ssl_tested += 1
                if success:
                    self.ssl_passed += 1
                else:
                    self.ssl_failed += 1
                if mitm:
                    self.mitm_detected += 1
                if cert_error:
                    self.cert_errors += 1

    def update_history(self):
        """Update time series history (call periodically)."""
        now = time.time()
        with self.lock:
            elapsed = now - self.last_history_time
            if elapsed >= 5:  # Update every 5 seconds
                # Rate (delta since last snapshot, per second)
                tests_delta = self.tested - self.last_history_tested
                rate = tests_delta / elapsed if elapsed > 0 else 0
                self.rate_history.append(round(rate, 2))
                if len(self.rate_history) > self.HISTORY_SIZE:
                    self.rate_history.pop(0)
                if rate > self.peak_rate:
                    self.peak_rate = rate
                # Success rate over the same interval
                passed_delta = self.passed - self.last_history_passed
                sr = (passed_delta / tests_delta * 100) if tests_delta > 0 else 0
                self.success_rate_history.append(round(sr, 1))
                if len(self.success_rate_history) > self.HISTORY_SIZE:
                    self.success_rate_history.pop(0)
                if sr > self.peak_success_rate:
                    self.peak_success_rate = sr
                # Average latency for this interval
                avg_lat = self.get_avg_latency()
                self.latency_history.append(round(avg_lat, 0))
                if len(self.latency_history) > self.HISTORY_SIZE:
                    self.latency_history.pop(0)
                self.last_history_time = now
                self.last_history_tested = self.tested
                self.last_history_passed = self.passed
            # Reset recent window every 60s
            if now - self.recent_start >= 60:
                self.recent_tested = 0
                self.recent_passed = 0
                self.recent_start = now
            # Hourly aggregation
            if now - self.hourly_start >= 3600:
                self.hours_data.append({
                    'tested': self.hourly_tested,
                    'passed': self.hourly_passed,
                    # NOTE(review): rate divides by a fixed 3600 even if the
                    # elapsed interval overshot the hour - confirm intended.
                    'rate': self.hourly_passed / 3600.0 if self.hourly_tested > 0 else 0,
                    'success_rate': (self.hourly_passed / self.hourly_tested * 100) if self.hourly_tested > 0 else 0,
                })
                if len(self.hours_data) > 24:
                    self.hours_data.pop(0)
                self.hourly_tested = 0
                self.hourly_passed = 0
                self.hourly_start = now

    def get_recent_rate(self):
        """Get rate for last 60 seconds."""
        with self.lock:
            elapsed = time.time() - self.recent_start
            if elapsed > 0:
                return self.recent_tested / elapsed
            return 0.0

    def get_recent_success_rate(self):
        """Get success rate for last 60 seconds."""
        with self.lock:
            if self.recent_tested > 0:
                return (self.recent_passed / self.recent_tested) * 100
            return 0.0

    def get_avg_latency(self):
        """Get average latency in ms."""
        with self.lock:
            if self.latency_count > 0:
                return self.latency_sum / self.latency_count
            return 0.0

    def get_latency_percentiles(self):
        """Get latency percentiles (p50, p90, p99)."""
        with self.lock:
            if not self.latency_samples:
                return {'p50': 0, 'p90': 0, 'p99': 0}
            sorted_samples = sorted(self.latency_samples)
            n = len(sorted_samples)
            return {
                'p50': sorted_samples[int(n * 0.50)] if n > 0 else 0,
                'p90': sorted_samples[int(n * 0.90)] if n > 0 else 0,
                # p99 index can reach n for large n, hence the clamp.
                'p99': sorted_samples[min(int(n * 0.99), n - 1)] if n > 0 else 0,
            }

    def get_latency_histogram(self):
        """Get latency distribution histogram."""
        with self.lock:
            total = sum(self.latency_buckets.values())
            if total == 0:
                return []
            result = []
            prev = 0
            for bucket in self.LATENCY_BUCKETS:
                count = self.latency_buckets[bucket]
                result.append({
                    'range': '%d-%d' % (prev, bucket),
                    'count': count,
                    'pct': round(count / total * 100, 1),
                })
                prev = bucket
            # Over max bucket
            over = self.latency_buckets[float('inf')]
            if over > 0:
                result.append({
                    'range': '>%d' % self.LATENCY_BUCKETS[-1],
                    'count': over,
                    'pct': round(over / total * 100, 1),
                })
            return result

    def get_proto_stats(self):
        """Get protocol-specific success rates."""
        with self.lock:
            result = {}
            for proto in ['http', 'socks4', 'socks5']:
                tested = self.proto_tested[proto]
                passed = self.proto_passed[proto]
                result[proto] = {
                    'tested': tested,
                    'passed': passed,
                    'success_rate': round(passed / tested * 100, 1) if tested > 0 else 0,
                }
            return result

    def get_top_countries(self, limit=10):
        """Get top countries by working proxy count."""
        with self.lock:
            sorted_countries = sorted(self.country_passed.items(), key=lambda x: -x[1])
            return sorted_countries[:limit]

    def get_top_asns(self, limit=10):
        """Get top ASNs by working proxy count."""
        with self.lock:
            sorted_asns = sorted(self.asn_passed.items(), key=lambda x: -x[1])
            return sorted_asns[:limit]

    def get_hourly_data(self):
        """Get last 24 hours of hourly data."""
        with self.lock:
            return list(self.hours_data)

    def load_state(self, state):
        """Load persisted state from a dict (from database).
Args: state: dict from dbs.load_session_state() """ if not state: return with self.lock: self.tested = state.get('tested', 0) self.passed = state.get('passed', 0) self.failed = state.get('failed', 0) self.ssl_tested = state.get('ssl_tested', 0) self.ssl_passed = state.get('ssl_passed', 0) self.ssl_failed = state.get('ssl_failed', 0) self.mitm_detected = state.get('mitm_detected', 0) self.cert_errors = state.get('cert_errors', 0) self.proto_tested['http'] = state.get('proto_http_tested', 0) self.proto_passed['http'] = state.get('proto_http_passed', 0) self.proto_tested['socks4'] = state.get('proto_socks4_tested', 0) self.proto_passed['socks4'] = state.get('proto_socks4_passed', 0) self.proto_tested['socks5'] = state.get('proto_socks5_tested', 0) self.proto_passed['socks5'] = state.get('proto_socks5_passed', 0) self.peak_rate = state.get('peak_rate', 0.0) # Note: start_time is NOT restored - uptime reflects current session # Restore failure categories if state.get('fail_categories'): self.fail_categories = dict(state['fail_categories']) # Restore geo tracking if state.get('country_passed'): self.country_passed = dict(state['country_passed']) if state.get('asn_passed'): # Convert string keys back to int for ASN self.asn_passed = {int(k) if k.isdigit() else k: v for k, v in state['asn_passed'].items()} _log('restored session: %d tested, %d passed' % (self.tested, self.passed), 'info') def should_report(self, interval): return (time.time() - self.last_report) >= interval def report(self): with self.lock: self.last_report = time.time() elapsed = time.time() - self.start_time rate = try_div(self.tested, elapsed) pct = try_div(self.passed * 100.0, self.tested) base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % ( self.tested, self.passed, pct, rate, int(elapsed / 60)) # Add failure breakdown if there are failures if self.fail_categories: cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items())) return '%s [%s]' % (base, cats) return base 
def get_full_stats(self): """Get comprehensive stats dict for API.""" with self.lock: elapsed = time.time() - self.start_time return { 'tested': self.tested, 'passed': self.passed, 'failed': self.failed, 'success_rate': round(self.passed / self.tested * 100, 1) if self.tested > 0 else 0, 'rate': round(self.tested / elapsed, 2) if elapsed > 0 else 0, 'pass_rate': round(self.passed / elapsed, 2) if elapsed > 0 else 0, 'recent_rate': self.get_recent_rate(), 'recent_success_rate': self.get_recent_success_rate(), 'peak_rate': self.peak_rate, 'peak_success_rate': self.peak_success_rate, 'uptime_seconds': int(elapsed), 'rate_history': list(self.rate_history), 'success_rate_history': list(self.success_rate_history), 'latency_history': list(self.latency_history), 'avg_latency': self.get_avg_latency(), 'min_latency': self.min_latency if self.min_latency != float('inf') else 0, 'max_latency': self.max_latency, 'latency_percentiles': self.get_latency_percentiles(), 'latency_histogram': self.get_latency_histogram(), 'by_proto': dict(self.proto_passed), 'proto_stats': self.get_proto_stats(), 'failures': dict(self.fail_categories), 'top_countries': self.get_top_countries(), 'top_asns': self.get_top_asns(), 'hourly_data': self.get_hourly_data(), } def try_div(a, b): if b != 0: return a/float(b) return 0 class PriorityJobQueue(object): """Priority queue for proxy test jobs. Lower priority number = higher priority. 
Priority 0: New proxies (never tested) Priority 1: Recently working (no failures, has successes) Priority 2: Low fail count (< 3 failures) Priority 3: Medium fail count Priority 4: High fail count """ def __init__(self): self.heap = [] self.lock = threading.Lock() self.not_empty = threading.Condition(self.lock) self.counter = 0 # tie-breaker for equal priorities def put(self, job, priority=3): """Add job with priority (lower = higher priority).""" with self.lock: heapq.heappush(self.heap, (priority, self.counter, job)) self.counter += 1 self.not_empty.notify() def get(self, timeout=None): """Get highest priority job. Raises Queue.Empty on timeout.""" with self.not_empty: if timeout is None: while not self.heap: self.not_empty.wait() else: end_time = time.time() + timeout while not self.heap: remaining = end_time - time.time() if remaining <= 0: raise Queue.Empty() self.not_empty.wait(remaining) _, _, job = heapq.heappop(self.heap) return job def get_nowait(self): """Get job without waiting. Raises Queue.Empty if empty.""" with self.lock: if not self.heap: raise Queue.Empty() _, _, job = heapq.heappop(self.heap) return job def empty(self): """Check if queue is empty.""" with self.lock: return len(self.heap) == 0 def qsize(self): """Return queue size.""" with self.lock: return len(self.heap) def task_done(self): """Compatibility method (no-op for heap queue).""" pass def calculate_priority(failcount, success_count, max_fail): """Calculate job priority based on proxy state. Returns: int: Priority 0-4 (lower = higher priority) """ # New proxy (never successfully tested) if success_count == 0 and failcount == 0: return 0 # Recently working (no current failures) if failcount == 0: return 1 # Low fail count if failcount < 3: return 2 # Medium fail count if failcount < max_fail // 2: return 3 # High fail count return 4 class ThreadScaler(object): """Dynamic thread scaling based on queue depth and success rate. 
With gevent greenlets, we can scale much more aggressively than with OS threads since greenlets are cooperative and lightweight. Scales up when: - Queue depth exceeds high watermark - Success rate is above threshold Scales down when: - Queue is nearly empty - Success rate drops below threshold """ def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5): self.min_threads = min_threads self.max_threads = max_threads self.target_queue_per_thread = target_queue_per_thread self.last_scale_time = 0 self.scale_cooldown = 10 # seconds between scaling (faster with greenlets) self.success_threshold = 10.0 # minimum success rate % to scale up def should_scale(self, current_threads, queue_size, stats): """Determine if scaling is needed. Args: current_threads: Current number of active threads queue_size: Current job queue depth stats: Stats object with success/fail counts Returns: int: Target thread count (may be same as current) """ now = time.time() if (now - self.last_scale_time) < self.scale_cooldown: return current_threads # Calculate current success rate with stats.lock: total = stats.tested passed = stats.passed success_rate = try_div(passed * 100.0, total) if total > 0 else 50.0 # Calculate ideal thread count based on queue depth ideal = max(self.min_threads, queue_size // self.target_queue_per_thread) ideal = min(ideal, self.max_threads) target = current_threads # Scale up: queue is deep - scale based on queue depth only # With greenlets, scale more aggressively (+5 instead of +2) if queue_size > current_threads * self.target_queue_per_thread: target = min(current_threads + 5, ideal, self.max_threads) # Scale down: queue is shallow elif queue_size < current_threads * 2: target = max(current_threads - 2, self.min_threads) if target != current_threads: self.last_scale_time = now return target def status_line(self, current_threads, queue_size): """Return status string for logging.""" return 'threads=%d queue=%d target_per_thread=%d' % ( 
current_threads, queue_size, self.target_queue_per_thread) def socks4_resolve(srvname, server_port): srv = srvname if srv in cached_dns: srv = cached_dns[srvname] if config.watchd.debug: _log("using cached ip (%s) for %s"%(srv, srvname), "debug") else: dns_fail = False try: af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True) if sa is not None: cached_dns[srvname] = sa[0] srv = sa[0] else: dns_fail = True except rocksock.RocksockException as e: assert(e.get_errortype() == rocksock.RS_ET_GAI) dns_fail = True if dns_fail: fail_inc = 0 _log("could not resolve connection target %s"%srvname, "ERROR") return False return srv class ProxyTestState(): """Thread-safe state for a proxy being tested against multiple targets. Results from TargetTestJob instances are aggregated here. When all tests complete, evaluate() determines final pass/fail. """ def __init__(self, ip, port, proto, failcount, success_count, total_duration, country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False, completion_queue=None): self.ip = ip self.port = int(port) self.proxy = '%s:%s' % (ip, port) self.proto = proto self.failcount = failcount self.checktime = None self.success_count = success_count self.total_duration = total_duration self.country = country self.mitm = mitm self.consecutive_success = consecutive_success self.asn = asn self.isoldies = oldies self.num_targets = num_targets self.completion_queue = completion_queue # for signaling completion # thread-safe result accumulation self.lock = threading.Lock() self.results = [] # list of (success, proto, duration, srv, tor, ssl) self.completed = False self.last_latency_ms = None # average latency from successful tests self.exit_ip = None # IP seen by target server (for anonymity detection) self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers self.last_fail_category = None # failure category from last test (for dead detection) self.original_failcount = failcount # 
failcount before this test cycle # SSL/TLS tracking self.had_ssl_test = False self.ssl_success = False self.cert_error = False def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None): """Record a single target test result. Thread-safe. When all target tests complete, signals via completion_queue. """ should_signal = False with self.lock: self.results.append({ 'success': success, 'proto': proto, 'duration': duration, 'srv': srv, 'tor': tor, 'ssl': ssl, 'category': category, 'exit_ip': exit_ip, 'reveals_headers': reveals_headers }) # Track SSL tests if ssl: self.had_ssl_test = True if success: self.ssl_success = True # Track cert errors if category == 'cert_error' or category == 'ssl_error': self.cert_error = True # Check completion (inside lock to prevent race) if not self.completed and len(self.results) >= self.num_targets: self.completed = True should_signal = True # Signal outside lock to avoid deadlock if should_signal and self.completion_queue is not None: self.completion_queue.put(self) def is_complete(self): """Check if all target tests have finished.""" # Fast path: check completed flag without lock (atomic read) if self.completed: return True # Slow path: check with lock (only during transition) with self.lock: return len(self.results) >= self.num_targets def rwip(self, ip): """Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10).""" n = [] for b in ip.split('.'): while b[0] == '0' and len(b) > 1: b = b[1:] n.append(b) return '.'.join(n) def evaluate(self): """Evaluate results after all tests complete. 
        Returns:
            (success, category) tuple where success is bool and category
            is the dominant failure type (or None on success)
        """
        with self.lock:
            # NOTE(review): record_result() already sets self.completed when
            # the final result lands, so a caller reacting to the completion
            # queue hits this early return and gets (failcount == 0, None)
            # without the aggregation below running - confirm that the
            # consumer relies on this, it looks suspicious.
            if self.completed:
                return (self.failcount == 0, None)
            self.completed = True
            self.checktime = int(time.time())
            successes = [r for r in self.results if r['success']]
            failures = [r for r in self.results if not r['success']]
            num_success = len(successes)
            # Determine dominant failure category
            fail_category = None
            if failures:
                cats = {}
                for f in failures:
                    cat = f.get('category') or 'other'
                    cats[cat] = cats.get(cat, 0) + 1
                if cats:
                    fail_category = max(cats.keys(), key=lambda k: cats[k])
            # require success (single target mode)
            if num_success >= 1:
                # use last successful result for metrics
                last_good = successes[-1]
                # Extract exit IP and header info from successful judge responses
                for s in successes:
                    if s.get('exit_ip'):
                        self.exit_ip = s['exit_ip']
                    if s.get('reveals_headers') is not None:
                        self.reveals_headers = s['reveals_headers']
                # Lazily fill in geo/ASN data only once per proxy.
                if geolite and self.country is None:
                    self.ip = self.rwip(self.ip)
                    rec = geodb.get_all(self.ip)
                    if rec is not None and rec.country_short:
                        self.country = rec.country_short
                if asn_enabled and self.asn is None:
                    try:
                        asn_result = asndb.lookup(self.ip)
                        if asn_result and asn_result[0]:
                            self.asn = asn_result[0]
                    except Exception:
                        pass
                self.proto = last_good['proto']
                self.failcount = 0
                # Every third consecutive success re-arms the MITM flag
                # (that cycle runs an SSL-handshake check).
                if (self.consecutive_success % 3) == 0:
                    self.mitm = 0
                self.consecutive_success += 1
                self.success_count += 1
                self.total_duration += int(last_good['duration'] * 1000)
                # Calculate average latency from successful tests (in ms)
                durations = [s['duration'] for s in successes if s['duration']]
                if durations:
                    self.last_latency_ms = sum(durations) * 1000.0 / len(durations)
                torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
                # Determine anonymity for status message
                # transparent: exit_ip == proxy_ip
                # anonymous: exit_ip != proxy_ip, adds revealing headers
                # elite: exit_ip != proxy_ip, no revealing headers
                anon_status = ''
                if self.exit_ip:
                    if self.exit_ip == self.ip:
                        anon_status = ' anon: transparent;'
                    elif self.reveals_headers is False:
                        anon_status = ' anon: elite;'
                    elif self.reveals_headers is True:
                        anon_status = ' anon: anonymous;'
                    else:
                        anon_status = ' anon: anonymous;'  # default if no header check
                _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % (
                    last_good['proto'], self.ip, self.port, self.country,
                    last_good['duration'], torstats, anon_status, last_good['srv'],
                    str(last_good['ssl']), num_success, self.num_targets), 'xxxxx')
                return (True, None)
            else:
                # Check if failures were all judge blocks (not proxy's fault)
                judge_blocks = [f for f in failures if f.get('category') == 'judge_block']
                real_failures = [f for f in failures if f.get('category') != 'judge_block']
                if judge_blocks and not real_failures:
                    # All failures were judge blocks - inconclusive, don't penalize proxy
                    # checktime still updated so we don't immediately retest
                    return (False, 'judge_block')
                else:
                    # Real proxy failure
                    self.failcount += 1
                    self.consecutive_success = 0
                    self.last_fail_category = fail_category
                    return (False, fail_category)


class TargetTestJob():
    """Job to test a single proxy against a single target.

    Multiple TargetTestJob instances share the same ProxyTestState,
    allowing tests to be interleaved with other proxies in the queue.
    """

    def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
        # proxy_state: shared ProxyTestState collecting results for this proxy
        # target_srv: key into judges/regexes/ssl_targets, per checktype
        # checktype: 'irc' | 'http' | 'judges' | 'ssl'
        # worker_id: set by the worker thread for connection-pool affinity
        self.proxy_state = proxy_state
        self.target_srv = target_srv
        self.checktype = checktype
        self.worker_id = worker_id

    def run(self):
        """Test the proxy against this job's target server.

        Drives _connect_and_test(), then matches the response against the
        target's expected pattern and records the outcome on proxy_state.
        """
        sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
        if not sock:
            # SSL-only check passed (handshake success, no request needed)
            if err_cat == 'ssl_ok':
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl
                )
            elif err_cat == 'ssl_mitm':
                # MITM detected - proxy works but intercepts TLS
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl
                )
                self.proxy_state.mitm = 1
            else:
                self.proxy_state.record_result(False, category=err_cat)
            return
        try:
            recv = sock.recv(-1)
            # Select regex based on check type
            if self.checktype == 'irc':
                regex = '^(:|NOTICE|ERROR)'
            elif self.checktype == 'judges':
                regex = judges[srv]
            elif self.checktype == 'ssl':
                # Should not reach here - ssl returns before recv
                self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
                return
            else:  # http
                regex = regexes[srv]
            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - duration
                # Extract exit IP from judge response
                exit_ip = None
                reveals_headers = None
                if self.checktype == 'judges':
                    ip_match = re.search(IP_PATTERN, recv)
                    if ip_match:
                        exit_ip = ip_match.group(0)
                    # Check for header echo judge (elite detection)
                    if 'headers' in srv:
                        # If X-Forwarded-For/Via/etc present, proxy reveals chain
                        reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
                    # Record successful judge
                    judge_stats.record_success(srv)
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl,
                    exit_ip=exit_ip, reveals_headers=reveals_headers
                )
            else:
                # Check if judge is blocking us (not a proxy failure)
                if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
                    judge_stats.record_block(srv)
                    # Don't count as proxy failure - judge is blocking
                    self.proxy_state.record_result(False, category='judge_block')
                    if config.watchd.debug:
                        _log('judge %s blocked proxy %s' % (srv, self.proxy_state.proxy), 'debug')
                else:
                    if self.checktype == 'judges':
                        judge_stats.record_failure(srv)
                    self.proxy_state.record_result(False, category='other')
        except KeyboardInterrupt as e:
            sock.disconnect()
            raise e
        except rocksock.RocksockException as e:
            self.proxy_state.record_result(False, category=categorize_error(e))
        finally:
            sock.disconnect()

    def _connect_and_test(self):
        """Connect to target through the proxy and send test packet.

        Returns an 8-tuple:
            (sock, proto, start_time, torhost, srvname, fail_inc, use_ssl, err_cat)
        sock is None when no further reading is needed (SSL-only result or
        failure); err_cat then distinguishes 'ssl_ok'/'ssl_mitm'/failure.
        """
        ps = self.proxy_state
        srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
        # For judges, extract host from 'host/path' format
        if self.checktype == 'judges' and '/' in srvname:
            connect_host = srvname.split('/')[0]
        else:
            connect_host = srvname
        # SSL checktype: always use SSL with certificate verification
        if self.checktype == 'ssl':
            use_ssl = 1
            ssl_only_check = True  # handshake only, no HTTP request
            server_port = 443
            verifycert = True
        else:
            # use_ssl config: 0=never, 1=always, 2=randomize per test
            use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
            ssl_only_check = False
            # minimal SSL test (handshake only, no request)
            if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
                use_ssl = 1
                ssl_only_check = True  # periodic MITM check - handshake is sufficient
            if self.checktype == 'irc':
                server_port = 6697 if use_ssl else 6667
            else:
                server_port = 443 if use_ssl else 80
            verifycert = True if use_ssl else False
        # Unknown protocol: probe all three, most likely first.
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
        last_error_category = None
        # Get Tor host from pool (with worker affinity)
        pool = connection_pool.get_pool()
        for proto in protos:
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            # SOCKS4 has no remote DNS, so resolve locally first.
            if proto == 'socks4':
                srv = socks4_resolve(connect_host,
server_port) else: srv = connect_host if not srv: continue duration = time.time() proxies = [ rocksock.RocksockProxyFromURL('socks5://%s' % torhost), rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)), ] try: sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, proxies=proxies, timeout=config.watchd.timeout, verifycert=verifycert) sock.connect() # SSL-only check: handshake passed, no request needed if ssl_only_check: elapsed = time.time() - duration if pool: pool.record_success(torhost, elapsed) sock.disconnect() return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_ok' if self.checktype == 'irc': sock.send('NICK\n') elif self.checktype == 'judges': # GET request to receive body with IP address sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % ( srvname.split('/', 1)[1] if '/' in srvname else '', srvname.split('/')[0] )) else: # http - HEAD is sufficient for header checks sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) # Record success in pool if pool: pool.record_success(torhost, time.time() - duration) return sock, proto, duration, torhost, srvname, 0, use_ssl, None except rocksock.RocksockException as e: last_error_category = categorize_error(e) if config.watchd.debug: _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port, e.get_errormessage(), last_error_category), 'debug') et = e.get_errortype() err = e.get_error() fp = e.get_failedproxy() sock.disconnect() if et == rocksock.RS_ET_OWN: if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or err == rocksock.RS_E_HIT_TIMEOUT): break elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: # Tor connection failed - record in pool if pool: pool.record_failure(torhost) if random.randint(0, (config.watchd.threads - 1) / 2) == 0: _log("could not connect to tor, sleep 5s", "ERROR") time.sleep(5) elif et == rocksock.RS_ET_GAI: _log("could not resolve connection target %s" % connect_host, "ERROR") break elif err == 
rocksock.RS_E_SSL_CERTIFICATE_ERROR: # MITM detected - proxy works but intercepts TLS ps.mitm = 1 elapsed = time.time() - duration if pool: pool.record_success(torhost, elapsed) return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm' except KeyboardInterrupt as e: raise e return None, None, None, None, None, 1, use_ssl, last_error_category class WorkerThread(): def __init__(self, id, job_queue, result_queue): self.id = id self.done = threading.Event() self.thread = None self.job_queue = job_queue # shared input queue self.result_queue = result_queue # shared output queue def stop(self): self.done.set() def term(self): if self.thread: self.thread.join() def start_thread(self): self.thread = threading.Thread(target=self.workloop) self.thread.start() def workloop(self): job_count = 0 duration_total = 0 while not self.done.is_set(): try: job = self.job_queue.get(timeout=0.5) except Queue.Empty: continue nao = time.time() # Assign worker ID for connection pool affinity job.worker_id = self.id job.run() spent = time.time() - nao job_count += 1 duration_total += spent self.result_queue.put(job) self.job_queue.task_done() if self.thread and job_count > 0: avg_t = try_div(duration_total, job_count) _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id) class Proxywatchd(): def stop(self): _log('halting... 
(%d thread(s))' % len(self.threads), 'watchd') self.stopping.set() def _cleanup(self): for wt in self.threads: wt.stop() for wt in self.threads: wt.term() self.collect_work() self.submit_collected() if self.httpd_server: self.httpd_server.stop() self.stopped.set() def finish(self): if not self.in_background: self._cleanup() while not self.stopped.is_set(): time.sleep(0.1) success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100 _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd") # Final save of session state before exit try: self._prep_db() dbs.save_session_state(self.mysqlite, self.stats) dbs.save_stats_snapshot(self.mysqlite, self.stats) self._close_db() _log('session state saved', 'watchd') except Exception as e: _log('failed to save final session state: %s' % str(e), 'warn') def _prep_db(self): self.mysqlite = mysqlite.mysqlite(config.watchd.database, str) def _close_db(self): if self.mysqlite: self.mysqlite.close() self.mysqlite = None def __init__(self): config.load() self.in_background = False self.threads = [] self.stopping = threading.Event() self.stopped = threading.Event() # shared work-stealing queues self.job_queue = PriorityJobQueue() self.result_queue = Queue.Queue() self.completion_queue = Queue.Queue() # completed ProxyTestState objects # track pending proxy states (for multi-target aggregation) self.pending_states = {} # dict: proxy -> ProxyTestState (O(1) lookup/removal) self.pending_lock = threading.Lock() # create table if needed (use dbs.py for canonical schema) self._prep_db() dbs.create_table_if_not_exists(self.mysqlite, 'proxylist') self._close_db() self.submit_after = config.watchd.submit_after # number of collected jobs before writing db self.collected = [] # completed ProxyTestState objects ready for DB self.totals = { 'submitted':0, 'success':0, } self.stats = Stats() self.last_cleanup = time.time() self.httpd_server = None # Dynamic thread scaling min_t 
= config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4) self.scaler = ThreadScaler( min_threads=min_t, max_threads=config.watchd.threads, # respect configured thread limit target_queue_per_thread=5 ) self.thread_id_counter = 0 self.last_jobs_log = 0 self.last_update_log = 0 self.log_interval = 60 # seconds between routine log messages # Register SIGHUP handler for config reload signal.signal(signal.SIGHUP, self._handle_sighup) def _handle_sighup(self, signum, frame): """Handle SIGHUP signal for config reload.""" _log('received SIGHUP, reloading config...', 'watchd') self.reload_config() def reload_config(self): """Reload configuration without restart. Hot-reloadable settings: threads, timeout, checktime, perfail_checktime, submit_after, stats_interval, debug, tor_safeguard, stale_days, max_fail, outage_threshold Requires restart: database, source_file, checktype """ old_threads = config.watchd.threads old_checktype = config.watchd.checktype try: config.load() errors = config.validate() if errors: for e in errors: _log('config error: %s' % e, 'error') _log('config reload failed, keeping old config', 'error') return False except Exception as e: _log('config reload failed: %s' % str(e), 'error') return False # Update runtime values self.submit_after = config.watchd.submit_after # Update scaler limits self.scaler.min_threads = max(5, config.watchd.threads // 4) self.scaler.max_threads = config.watchd.threads # Warn about values requiring restart if config.watchd.checktype != old_checktype: _log('checktype changed (%s -> %s), requires restart to take effect' % ( old_checktype, config.watchd.checktype), 'warn') _log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % ( config.watchd.threads, config.watchd.timeout, config.watchd.checktime, config.watchd.max_fail), 'watchd') return True def _spawn_thread(self): """Spawn a new worker thread.""" self.thread_id_counter += 1 threadid = 'dyn%d' % 
self.thread_id_counter wt = WorkerThread(threadid, self.job_queue, self.result_queue) wt.start_thread() self.threads.append(wt) return threadid def _remove_thread(self): """Remove one worker thread (graceful shutdown).""" if len(self.threads) <= self.scaler.min_threads: return None # Stop the last thread wt = self.threads.pop() wt.stop() # Don't wait for it - let it finish its current job return wt.id def _adjust_threads(self, target): """Adjust thread count to target.""" current = len(self.threads) if target > current: added = [] for _ in range(target - current): tid = self._spawn_thread() added.append(tid) if added: _log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler') elif target < current: removed = [] for _ in range(current - target): tid = self._remove_thread() if tid: removed.append(tid) if removed: _log('scaled down: -%d threads' % len(removed), 'scaler') def fetch_rows(self): self.isoldies = False q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' now = time.time() params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) if config.watchd.debug: _log('fetch_rows: db=%s checktime=%d perfail=%d max_fail=%d now=%d' % ( config.watchd.database, config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail, int(now)), 'debug') rows = self.mysqlite.execute(q, params).fetchall() if config.watchd.debug: _log('fetch_rows: got %d rows' % len(rows), 'debug') # check oldies ? 
if len(rows) < config.watchd.threads: rows = [] if config.watchd.oldies: self.isoldies = True ## disable tor safeguard for old proxies if self.tor_safeguard: self.tor_safeguard = False rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall() return rows def prepare_jobs(self): self._prep_db() ## enable tor safeguard by default self.tor_safeguard = config.watchd.tor_safeguard rows = self.fetch_rows() checktype = config.watchd.checktype num_targets = 1 # select target pool based on checktype if checktype == 'irc': target_pool = config.servers elif checktype == 'judges': # Filter out judges in cooldown (blocked/rate-limited) all_judges = list(judges.keys()) target_pool = judge_stats.get_available_judges(all_judges) if not target_pool: # All judges in cooldown - use all anyway target_pool = all_judges if config.watchd.debug: _log('all judges in cooldown, using full list', 'debug') elif checktype == 'ssl': target_pool = ssl_targets else: # http target_pool = list(regexes.keys()) # create all jobs first, then shuffle for interleaving all_jobs = [] new_states = [] for row in rows: # create shared state for this proxy state = ProxyTestState( row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], asn=row[9], num_targets=num_targets, oldies=self.isoldies, completion_queue=self.completion_queue ) new_states.append(state) # select random targets for this proxy targets = random.sample(target_pool, min(num_targets, len(target_pool))) # create one job per target for target in targets: job = TargetTestJob(state, target, checktype) all_jobs.append(job) # shuffle to interleave tests across different proxies random.shuffle(all_jobs) # track pending states (dict for O(1) lookup/removal) with self.pending_lock: for state in new_states: self.pending_states[state.proxy] = state # queue all jobs with priority for job in all_jobs: 
priority = calculate_priority( job.proxy_state.failcount, job.proxy_state.success_count, config.watchd.max_fail ) self.job_queue.put(job, priority) self._close_db() proxy_count = len(new_states) job_count = len(all_jobs) now = time.time() if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval: _log("created %d jobs for %d proxies (%d targets each)" % ( job_count, proxy_count, num_targets), 'watchd') self.last_jobs_log = now return job_count def collect_work(self): # drain results from shared result queue (TargetTestJob objects) # results are already recorded in their ProxyTestState while True: try: self.result_queue.get_nowait() except Queue.Empty: break # process completed states from completion queue (event-driven, not polling) # ProxyTestState.record_result() pushes to completion_queue when all targets done completed_count = 0 while True: try: state = self.completion_queue.get_nowait() completed_count += 1 # evaluate and record stats success, category = state.evaluate() self.stats.record(success, category, state.proto, state.last_latency_ms, state.country, state.asn, ssl_test=state.had_ssl_test, mitm=(state.mitm > 0), cert_error=state.cert_error) self.collected.append(state) # remove from pending dict (O(1)) with self.pending_lock: self.pending_states.pop(state.proxy, None) except Queue.Empty: break def collect_unfinished(self): # drain any remaining jobs from job queue unfinished_count = 0 while True: try: self.job_queue.get_nowait() unfinished_count += 1 except Queue.Empty: break if unfinished_count > 0: _log("discarded %d unfinished jobs" % unfinished_count, "watchd") # note: corresponding ProxyTestStates will be incomplete # they'll be re-tested in the next cycle def submit_collected(self): if len(self.collected) == 0: return True sc = 0 dead_count = 0 args = [] latency_updates = [] # (proxy, latency_ms) for successful tests max_fail = config.watchd.max_fail for job in self.collected: if job.failcount == 0: sc += 1 if job.last_latency_ms is 
not None: latency_updates.append((job.proxy, job.last_latency_ms)) # Check if proxy should be marked as permanently dead effective_failcount = job.failcount if job.failcount > 0: is_fatal = job.last_fail_category in FATAL_ERROR_CATEGORIES # Fatal errors (refused/unreachable/auth) = immediately dead if is_fatal: effective_failcount = DEAD_PROXY dead_count += 1 # Non-fatal: mark dead if exceeded max_fail*2 elif job.failcount >= max_fail * 2: effective_failcount = DEAD_PROXY dead_count += 1 args.append((effective_failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.asn, job.proxy)) success_rate = (float(sc) / len(self.collected)) * 100 ret = True if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard: _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR") if sc == 0: return False args = [] latency_updates = [] for job in self.collected: if job.failcount == 0: args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.asn, job.proxy)) if job.last_latency_ms is not None: latency_updates.append((job.proxy, job.last_latency_ms)) ret = False now = time.time() if (now - self.last_update_log) >= self.log_interval or success_rate > 0: dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else '' _log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd') self.last_update_log = now self._prep_db() query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?' 
self.mysqlite.executemany(query, args) # Batch update latency metrics for successful proxies if latency_updates: dbs.batch_update_proxy_latency(self.mysqlite, latency_updates) # Batch update anonymity for proxies with exit IP data anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers) for job in self.collected if job.failcount == 0 and job.exit_ip] if anonymity_updates: dbs.batch_update_proxy_anonymity(self.mysqlite, anonymity_updates) self.mysqlite.commit() self._close_db() self.collected = [] self.totals['submitted'] += len(args) self.totals['success'] += sc return ret def cleanup_stale(self): """Remove proxies that have been dead for too long.""" stale_seconds = config.watchd.stale_days * 86400 cutoff = int(time.time()) - stale_seconds self._prep_db() # delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff result = self.mysqlite.execute( 'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?', (config.watchd.max_fail, DEAD_PROXY, cutoff) ) count = result.rowcount if hasattr(result, 'rowcount') else 0 self.mysqlite.commit() self._close_db() if count > 0: _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd') self.last_cleanup = time.time() def start(self): if config.watchd.threads == 1 and _run_standalone: return self._run() else: return self._run_background() def run(self): if self.in_background: while 1: time.sleep(1) def _run_background(self): self.in_background = True t = threading.Thread(target=self._run) t.start() def get_runtime_stats(self): """Return runtime statistics for the dashboard. Returns: dict with tested, passed, success_rate, rate, uptime_seconds, threads, failures, tor_pool, and engine stats. 
""" stats_data = {} # Stats from Stats object with self.stats.lock: stats_data['tested'] = self.stats.tested stats_data['passed'] = self.stats.passed stats_data['failed'] = self.stats.failed elapsed = time.time() - self.stats.start_time stats_data['uptime_seconds'] = int(elapsed) stats_data['rate'] = try_div(self.stats.tested, elapsed) stats_data['success_rate'] = try_div(self.stats.passed * 100.0, self.stats.tested) stats_data['failures'] = dict(self.stats.fail_categories) # New: Protocol breakdown stats_data['by_proto'] = dict(self.stats.by_proto) # New: Rate history for sparklines stats_data['rate_history'] = list(self.stats.rate_history) stats_data['success_rate_history'] = list(self.stats.success_rate_history) # New: Peak rate stats_data['peak_rate'] = self.stats.peak_rate # New: Recent rate (last 60s) and avg latency stats_data['recent_rate'] = self.stats.get_recent_rate() stats_data['recent_success_rate'] = self.stats.get_recent_success_rate() stats_data['avg_latency'] = self.stats.get_avg_latency() # Latency stats stats_data['min_latency'] = self.stats.min_latency if self.stats.min_latency != float('inf') else 0 stats_data['max_latency'] = self.stats.max_latency stats_data['latency_percentiles'] = self.stats.get_latency_percentiles() stats_data['latency_histogram'] = self.stats.get_latency_histogram() # Protocol performance (tested, passed, rate per protocol) stats_data['proto_stats'] = self.stats.get_proto_stats() # Geographic distribution from session stats_data['top_countries_session'] = self.stats.get_top_countries(10) stats_data['top_asns_session'] = self.stats.get_top_asns(10) # SSL/TLS stats with self.stats.lock: ssl_tested = self.stats.ssl_tested ssl_passed = self.stats.ssl_passed stats_data['ssl'] = { 'tested': ssl_tested, 'passed': ssl_passed, 'failed': self.stats.ssl_failed, 'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0, 'mitm_detected': self.stats.mitm_detected, 'cert_errors': self.stats.cert_errors } # Thread 
info stats_data['threads'] = len(self.threads) stats_data['min_threads'] = self.scaler.min_threads stats_data['max_threads'] = self.scaler.max_threads stats_data['queue_size'] = self.job_queue.qsize() stats_data['checktype'] = config.watchd.checktype stats_data['profiling'] = getattr(config.args, 'profile', False) if hasattr(config, 'args') else False stats_data['pass_rate'] = try_div(self.stats.passed, elapsed) # Tor pool stats pool = connection_pool.get_pool() if pool: pool_stats = pool.get_stats() hosts_list = [] healthy_count = 0 latency_sum = 0.0 latency_count = 0 for h in pool_stats.get('hosts', []): is_healthy = h.get('available', False) lat_ms = h.get('avg_latency', 0) * 1000 if h.get('avg_latency') else None hosts_list.append({ 'address': h.get('host', ''), 'healthy': is_healthy, 'latency_ms': lat_ms, 'success_rate': h.get('success_rate', 0), }) if is_healthy: healthy_count += 1 if lat_ms and lat_ms > 0: latency_sum += lat_ms latency_count += 1 pool_info = { 'hosts': hosts_list, 'total_requests': pool_stats.get('total_requests', 0), 'success_rate': pool_stats.get('success_rate', 0), 'healthy_count': healthy_count, 'total_count': len(hosts_list), 'avg_latency': latency_sum / latency_count if latency_count > 0 else 0, } stats_data['tor_pool'] = pool_info else: stats_data['tor_pool'] = { 'hosts': [], 'total_requests': 0, 'success_rate': 0, 'healthy_count': 0, 'total_count': 0, 'avg_latency': 0 } # Judge stats (when using judges checktype) if config.watchd.checktype == 'judges': js = judge_stats.get_stats() stats_data['judges'] = { 'total': js.get('total', 0), 'available': js.get('available', 0), 'in_cooldown': js.get('in_cooldown', 0), 'top_judges': js.get('top', [])[:5] # top 5 most successful } else: stats_data['judges'] = None # Scraper/engine stats if scraper_available: scraper_stats = scraper_module.get_scraper_stats() if scraper_stats: stats_data['engines_available'] = scraper_stats.get('available', 0) stats_data['engines_total'] = 
scraper_stats.get('total', 0) stats_data['engines_backoff'] = scraper_stats.get('in_backoff', 0) stats_data['scraper'] = scraper_stats else: stats_data['engines_available'] = 0 stats_data['engines_total'] = 0 stats_data['engines_backoff'] = 0 stats_data['scraper'] = None else: stats_data['engines_available'] = 0 stats_data['engines_total'] = 0 stats_data['engines_backoff'] = 0 stats_data['scraper'] = None return stats_data def _run(self): _log('starting...', 'watchd') _log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % ( config.watchd.database, config.watchd.checktype, config.watchd.threads, config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd') # Log database status at startup self._prep_db() total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] now = time.time() due = self.mysqlite.execute( 'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?', (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) ).fetchone()[0] # Create stats persistence tables dbs.create_table_if_not_exists(self.mysqlite, 'stats_history') dbs.create_table_if_not_exists(self.mysqlite, 'session_state') # Load persisted session state saved_state = dbs.load_session_state(self.mysqlite) if saved_state: self.stats.load_state(saved_state) self._close_db() _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd') # Initialize Tor connection pool connection_pool.init_pool(config.torhosts, warmup=True) # Start HTTP API server if enabled if config.httpd.enabled: from httpd import ProxyAPIServer self.httpd_server = ProxyAPIServer( config.httpd.listenip, config.httpd.port, config.watchd.database, stats_provider=self.get_runtime_stats ) self.httpd_server.start() # create worker threads with shared queues for i in range(config.watchd.threads): threadid = ''.join([random.choice(string.ascii_letters) for x in 
range(5)]) wt = WorkerThread(threadid, self.job_queue, self.result_queue) if self.in_background: wt.start_thread() self.threads.append(wt) time.sleep(random.random() / 10) sleeptime = 0 while True: if self.stopping.is_set(): if self.in_background: self._cleanup() break if sleeptime > 0: time.sleep(1) sleeptime -= 1 continue # check if job queue is empty (work-stealing: threads pull as needed) if self.job_queue.empty(): self.collect_work() if not self.submit_collected() and self.tor_safeguard: _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") sleeptime = 60 else: job_count = self.prepare_jobs() if job_count == 0: # no jobs available, wait before checking again sleeptime = 10 if not self.in_background: # single_thread scenario self.threads[0].workloop() self.collect_work() if len(self.collected) > self.submit_after: if not self.submit_collected() and self.tor_safeguard: _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") sleeptime = 60 # Update rate history for sparklines self.stats.update_history() # periodic stats report if self.stats.should_report(config.watchd.stats_interval): _log(self.stats.report(), 'stats') # Also report pool stats pool = connection_pool.get_pool() if pool: _log(pool.status_line(), 'stats') # Report judge stats (when using judges checktype) if config.watchd.checktype == 'judges': _log(judge_stats.status_line(), 'stats') # Report scaler status _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler') # Save session state periodically (every stats_interval, default 5m) try: self._prep_db() dbs.save_session_state(self.mysqlite, self.stats) self._close_db() except Exception as e: _log('failed to save session state: %s' % str(e), 'warn') # Hourly stats snapshot now = time.time() if not hasattr(self, '_last_snapshot'): self._last_snapshot = now if (now - self._last_snapshot) >= 3600: try: self._prep_db() dbs.save_stats_snapshot(self.mysqlite, self.stats) self._close_db() self._last_snapshot = 
now except Exception as e: _log('failed to save stats snapshot: %s' % str(e), 'warn') # Dynamic thread scaling if self.in_background: target = self.scaler.should_scale( len(self.threads), self.job_queue.qsize(), self.stats ) if target != len(self.threads): self._adjust_threads(target) # periodic stale proxy cleanup (daily) if (time.time() - self.last_cleanup) >= 86400: self.cleanup_stale() time.sleep(1) if __name__ == '__main__': _run_standalone = True config.load() errors = config.validate() if errors: for e in errors: _log(e, 'error') import sys sys.exit(1) w = Proxywatchd() try: w.start() w.run() except KeyboardInterrupt as e: pass finally: w.stop() w.finish()