diff --git a/dns.py b/dns.py new file mode 100644 index 0000000..f356b60 --- /dev/null +++ b/dns.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python2 +"""DNS resolution with caching for PPF proxy testing.""" +from __future__ import division + +import time + +from misc import _log +import rocksock + +# Module-level config reference +config = None + +# DNS cache: {hostname: (ip, timestamp)} +cached_dns = {} +DNS_CACHE_TTL = 3600 # 1 hour + + +def set_config(cfg): + """Set the config object (called from proxywatchd/ppf).""" + global config + config = cfg + + +def socks4_resolve(srvname, server_port): + """Resolve hostname to IP for SOCKS4 (which requires numeric IP). + + Caches results for DNS_CACHE_TTL seconds to avoid repeated lookups. + + Args: + srvname: Hostname to resolve + server_port: Port number (for resolution) + + Returns: + IP address string, or False on failure + """ + now = time.time() + # Check cache with TTL + if srvname in cached_dns: + ip, ts = cached_dns[srvname] + if now - ts < DNS_CACHE_TTL: + if config and config.watchd.debug: + _log("using cached ip (%s) for %s" % (ip, srvname), "debug") + return ip + # Expired - fall through to re-resolve + + # Resolve hostname + dns_fail = False + try: + af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True) + if sa is not None: + cached_dns[srvname] = (sa[0], now) + return sa[0] + else: + dns_fail = True + except rocksock.RocksockException as e: + assert(e.get_errortype() == rocksock.RS_ET_GAI) + dns_fail = True + + if dns_fail: + _log("could not resolve connection target %s" % srvname, "ERROR") + return False + return srvname + + +def clear_cache(): + """Clear the DNS cache.""" + global cached_dns + cached_dns = {} + + +def cache_stats(): + """Return DNS cache statistics. 
+ + Returns: + dict with count, oldest_age, newest_age + """ + if not cached_dns: + return {'count': 0, 'oldest_age': 0, 'newest_age': 0} + + now = time.time() + ages = [now - ts for _, ts in cached_dns.values()] + return { + 'count': len(cached_dns), + 'oldest_age': max(ages), + 'newest_age': min(ages), + } diff --git a/job.py b/job.py new file mode 100644 index 0000000..a8eac17 --- /dev/null +++ b/job.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python2 +"""Job queue and priority handling for PPF proxy testing.""" +from __future__ import division + +import heapq +import threading +try: + import Queue +except ImportError: + import queue as Queue + + +class PriorityJobQueue(object): + """Priority queue for proxy test jobs. + + Lower priority number = higher priority. + Priority 0: New proxies (never tested) + Priority 1: Recently working (no failures, has successes) + Priority 2: Low fail count (< 3 failures) + Priority 3: Medium fail count + Priority 4: High fail count + """ + + def __init__(self): + self.heap = [] + self.lock = threading.Lock() + self.not_empty = threading.Condition(self.lock) + self.counter = 0 # tie-breaker for equal priorities + + def put(self, job, priority=3): + """Add job with priority (lower = higher priority).""" + with self.lock: + # Reset counter when queue was empty to prevent unbounded growth + if not self.heap: + self.counter = 0 + heapq.heappush(self.heap, (priority, self.counter, job)) + self.counter += 1 + self.not_empty.notify() + + def get(self, timeout=None): + """Get highest priority job. Raises Queue.Empty on timeout.""" + with self.not_empty: + if timeout is None: + while not self.heap: + self.not_empty.wait() + else: + end_time = __import__('time').time() + timeout + while not self.heap: + remaining = end_time - __import__('time').time() + if remaining <= 0: + raise Queue.Empty() + self.not_empty.wait(remaining) + _, _, job = heapq.heappop(self.heap) + return job + + def get_nowait(self): + """Get job without waiting. 
Raises Queue.Empty if empty.""" + with self.lock: + if not self.heap: + raise Queue.Empty() + _, _, job = heapq.heappop(self.heap) + return job + + def empty(self): + """Check if queue is empty.""" + with self.lock: + return len(self.heap) == 0 + + def qsize(self): + """Return queue size.""" + with self.lock: + return len(self.heap) + + def task_done(self): + """Compatibility method (no-op for heap queue).""" + pass + + +def calculate_priority(failcount, success_count, max_fail): + """Calculate job priority based on proxy state. + + Args: + failcount: Current failure count + success_count: Lifetime success count + max_fail: Maximum failures before considered dead + + Returns: + int: Priority 0-4 (lower = higher priority) + """ + # New proxy (never successfully tested) + if success_count == 0 and failcount == 0: + return 0 + # Recently working (no current failures) + if failcount == 0: + return 1 + # Low fail count + if failcount < 3: + return 2 + # Medium fail count + if failcount < max_fail // 2: + return 3 + # High fail count + return 4 diff --git a/mitm.py b/mitm.py new file mode 100644 index 0000000..76b0b8d --- /dev/null +++ b/mitm.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python2 +"""MITM certificate detection and tracking for PPF.""" +from __future__ import division + +import threading +import time + +from misc import _log, tor_proxy_url +import rocksock + + +class MITMCertStats(object): + """Track MITM certificate statistics.""" + + def __init__(self): + self.lock = threading.Lock() + self.certs = {} # fingerprint -> cert_info dict + self.by_org = {} # organization -> count + self.by_issuer = {} # issuer CN -> count + self.by_proxy = {} # proxy IP -> list of fingerprints + self.total_count = 0 + self.recent_certs = [] # last N certificates seen + + def add_cert(self, proxy_ip, cert_info): + """Add a MITM certificate to statistics.""" + if not cert_info: + return + fp = cert_info.get('fingerprint', '') + if not fp: + return + + with self.lock: + self.total_count 
class MITMCertStats(object):
    """Track statistics about MITM certificates observed on proxies.

    Every method acquires self.lock, so one instance can be shared
    across worker threads.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.certs = {}         # fingerprint -> cert_info dict
        self.by_org = {}        # organization -> detection count
        self.by_issuer = {}     # issuer CN -> detection count
        self.by_proxy = {}      # proxy IP -> list of fingerprints
        self.total_count = 0    # total detections (non-unique)
        self.recent_certs = []  # most recent detections, bounded at 50

    def add_cert(self, proxy_ip, cert_info):
        """Record one MITM certificate detection for proxy_ip.

        Entries without a fingerprint are ignored (nothing to key on).
        """
        if not cert_info:
            return
        fp = cert_info.get('fingerprint', '')
        if not fp:
            return

        now = time.time()
        with self.lock:
            self.total_count += 1

            # Store unique certs by fingerprint.
            if fp not in self.certs:
                self.certs[fp] = cert_info
                self.certs[fp]['first_seen'] = now
                # Set last_seen on first sighting too, so every entry has
                # a consistent schema (previously only repeats got it).
                self.certs[fp]['last_seen'] = now
                self.certs[fp]['count'] = 1
                self.certs[fp]['proxies'] = [proxy_ip]
            else:
                self.certs[fp]['count'] += 1
                self.certs[fp]['last_seen'] = now
                if proxy_ip not in self.certs[fp]['proxies']:
                    self.certs[fp]['proxies'].append(proxy_ip)

            # Aggregate by organization and issuer.
            org = cert_info.get('subject_o', 'Unknown')
            self.by_org[org] = self.by_org.get(org, 0) + 1
            issuer = cert_info.get('issuer_cn', 'Unknown')
            self.by_issuer[issuer] = self.by_issuer.get(issuer, 0) + 1

            # Track which proxies presented this cert.
            self.by_proxy.setdefault(proxy_ip, [])
            if fp not in self.by_proxy[proxy_ip]:
                self.by_proxy[proxy_ip].append(fp)

            # Keep only the last 50 detections.
            self.recent_certs.append({
                'fingerprint': fp,
                'proxy': proxy_ip,
                'subject_cn': cert_info.get('subject_cn', ''),
                'issuer_cn': cert_info.get('issuer_cn', ''),
                'timestamp': now
            })
            if len(self.recent_certs) > 50:
                self.recent_certs.pop(0)

    def get_stats(self):
        """Get MITM certificate statistics for the API/dashboard."""
        with self.lock:
            top_orgs = sorted(self.by_org.items(), key=lambda x: x[1], reverse=True)[:10]
            top_issuers = sorted(self.by_issuer.items(), key=lambda x: x[1], reverse=True)[:10]

            # Unique certs sorted by detection count, top 20.
            unique_certs = []
            for fp, info in self.certs.items():
                cert_entry = {'fingerprint': fp}
                cert_entry.update(info)
                unique_certs.append(cert_entry)
            unique_certs = sorted(unique_certs, key=lambda x: x.get('count', 0), reverse=True)[:20]

            return {
                'total_detections': self.total_count,
                'unique_certs': len(self.certs),
                'unique_proxies': len(self.by_proxy),
                'top_organizations': [{'name': o, 'count': c} for o, c in top_orgs],
                'top_issuers': [{'name': i, 'count': c} for i, c in top_issuers],
                'certificates': unique_certs,
                'recent': list(self.recent_certs[-20:])
            }

    def save_state(self, filepath):
        """Save MITM stats to a JSON file for persistence (best-effort)."""
        import json
        try:
            # Serialize while holding the lock so the snapshot is
            # consistent, but perform the slow file write outside it.
            with self.lock:
                payload = json.dumps({
                    'certs': self.certs,
                    'by_org': self.by_org,
                    'by_issuer': self.by_issuer,
                    'by_proxy': self.by_proxy,
                    'total_count': self.total_count,
                    'recent_certs': self.recent_certs[-50:],
                })
            with open(filepath, 'w') as f:
                f.write(payload)
        except Exception as e:
            _log('failed to save MITM state: %s' % str(e), 'warn')

    def load_state(self, filepath):
        """Load MITM stats from a JSON file written by save_state()."""
        import json
        try:
            with open(filepath, 'r') as f:
                state = json.load(f)
            with self.lock:
                self.certs = state.get('certs', {})
                self.by_org = state.get('by_org', {})
                self.by_issuer = state.get('by_issuer', {})
                self.by_proxy = state.get('by_proxy', {})
                self.total_count = state.get('total_count', 0)
                self.recent_certs = state.get('recent_certs', [])
            _log('restored MITM state: %d certs, %d detections' % (
                len(self.certs), self.total_count), 'info')
        except IOError:
            pass  # File doesn't exist yet, start fresh.
        except Exception as e:
            _log('failed to load MITM state: %s' % str(e), 'warn')
def extract_cert_info(cert_der):
    """Extract certificate information from a DER-encoded certificate.

    Args:
        cert_der: DER-encoded certificate bytes.

    Returns:
        dict with certificate details, or None if the certificate could
        not be parsed. If pyOpenSSL is unavailable, a dict with the same
        key schema is returned but only the fingerprint fields are
        populated.
    """
    import hashlib
    # Compute the fingerprint once up front; both the full and the
    # fallback result use it.
    fp = hashlib.sha256(cert_der).hexdigest()

    try:
        from OpenSSL import crypto
    except ImportError:
        # pyOpenSSL not installed - degrade to fingerprint-only info,
        # keeping the same keys as the fully-parsed result.
        return {
            'fingerprint': fp[:16],
            'fingerprint_full': fp,
            'subject_cn': '(pyOpenSSL not installed)',
            'subject_o': '',
            'subject_ou': '',
            'subject_c': '',
            'issuer_cn': '',
            'issuer_o': '',
            'serial': '',
            'not_before': '',
            'not_after': '',
            'version': '',
            'sig_algo': '',
        }

    try:
        x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der)
        subject = x509.get_subject()
        issuer = x509.get_issuer()

        # Validity dates are ASN.1 time strings (YYYYMMDDhhmmssZ),
        # returned as bytes on some pyOpenSSL versions.
        not_before = x509.get_notBefore()
        not_after = x509.get_notAfter()
        if isinstance(not_before, bytes):
            not_before = not_before.decode('ascii')
        if isinstance(not_after, bytes):
            not_after = not_after.decode('ascii')

        sig_algo = x509.get_signature_algorithm()
        if isinstance(sig_algo, bytes):
            sig_algo = sig_algo.decode('ascii')
        else:
            sig_algo = str(sig_algo)

        return {
            'fingerprint': fp[:16],  # short fingerprint for display
            'fingerprint_full': fp,
            'subject_cn': subject.CN or '',
            'subject_o': subject.O or '',
            'subject_ou': subject.OU or '',
            'subject_c': subject.C or '',
            'issuer_cn': issuer.CN or '',
            'issuer_o': issuer.O or '',
            'serial': str(x509.get_serial_number()),
            'not_before': not_before,
            'not_after': not_after,
            'version': x509.get_version(),
            'sig_algo': sig_algo,
        }
    except Exception:
        # Malformed or unparseable certificate.
        return None
def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout, auth=None):
    """Connect to target through proxy without cert verification to get the MITM cert.

    Args:
        proxy_ip: Proxy IP address.
        proxy_port: Proxy port.
        proto: Proxy protocol (http, socks4, socks5).
        torhost: Tor SOCKS5 address.
        target_host: Target host for the SSL connection.
        target_port: Target port (usually 443).
        timeout: Connection timeout.
        auth: Optional auth credentials ("user:pass").

    Returns:
        dict with certificate info (see extract_cert_info) or None on
        any connection/parsing failure (this is a best-effort probe).
    """
    try:
        if auth:
            proxy_url = '%s://%s@%s:%s' % (proto, auth, proxy_ip, proxy_port)
        else:
            proxy_url = '%s://%s:%s' % (proto, proxy_ip, proxy_port)
        proxies = [
            rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
            rocksock.RocksockProxyFromURL(proxy_url),
        ]

        # Connect without certificate verification so the proxy's
        # injected certificate (if any) is accepted and observable.
        sock = rocksock.Rocksock(host=target_host, port=target_port, ssl=True,
                                 proxies=proxies, timeout=timeout, verifycert=False)
        sock.connect()
        try:
            cert_der = sock.sock.getpeercert(binary_form=True)
        finally:
            # Always tear the connection down, even if reading the peer
            # certificate raises - otherwise the socket leaks.
            sock.disconnect()

        if cert_der:
            return extract_cert_info(cert_der)
        return None
    except Exception:
        # Best-effort: any failure simply means "no cert captured".
        return None
hour # Debug mode for proxy check path - set via PPF_DEBUG env or config _debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy') @@ -158,879 +161,12 @@ judges = { } -class JudgeStats(): - """Track per-judge success/failure rates for reliability scoring. - Judges that frequently block or rate-limit are temporarily avoided. - Stats decay over time to allow recovery. - """ - - def __init__(self, cooldown_seconds=300, block_threshold=3): - self.lock = threading.Lock() - self.stats = {} # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp} - self.cooldown_seconds = cooldown_seconds # seconds to avoid blocked judges - self.block_threshold = block_threshold # consecutive blocks before cooldown - - def record_success(self, judge): - """Record successful judge response.""" - with self.lock: - if judge not in self.stats: - self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} - self.stats[judge]['success'] += 1 - # Reset block count on success - self.stats[judge]['block'] = 0 - - def record_failure(self, judge): - """Record judge failure (proxy failed, not judge block).""" - with self.lock: - if judge not in self.stats: - self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} - self.stats[judge]['fail'] += 1 - - def record_block(self, judge): - """Record judge blocking the proxy (403, captcha, rate-limit).""" - with self.lock: - if judge not in self.stats: - self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} - self.stats[judge]['block'] += 1 - self.stats[judge]['last_block'] = time.time() - - def is_available(self, judge): - """Check if judge is available (not in cooldown).""" - with self.lock: - if judge not in self.stats: - return True - s = self.stats[judge] - # Check if in cooldown period - if s['block'] >= self.block_threshold: - if (time.time() - s['last_block']) < self.cooldown_seconds: - return False - # Cooldown expired, reset block count - s['block'] = 0 - 
return True - - def get_available_judges(self, judge_list): - """Return list of judges not in cooldown.""" - return [j for j in judge_list if self.is_available(j)] - - def status_line(self): - """Return status summary for logging.""" - with self.lock: - total = len(self.stats) - blocked = sum(1 for s in self.stats.values() - if s['block'] >= self.block_threshold and - (time.time() - s['last_block']) < self.cooldown_seconds) - return 'judges: %d total, %d in cooldown' % (total, blocked) - - def get_stats(self): - """Return statistics dict for API/dashboard.""" - with self.lock: - now = time.time() - total = len(self.stats) - in_cooldown = sum(1 for s in self.stats.values() - if s['block'] >= self.block_threshold and - (now - s['last_block']) < self.cooldown_seconds) - available = total - in_cooldown - # Get top judges by success count - top = [] - for judge, s in self.stats.items(): - total_tests = s['success'] + s['fail'] - if total_tests > 0: - success_pct = (s['success'] * 100.0) / total_tests - top.append({'judge': judge, 'success': s['success'], - 'tests': total_tests, 'rate': round(success_pct, 1)}) - top.sort(key=lambda x: x['success'], reverse=True) - return {'total': total, 'available': available, 'in_cooldown': in_cooldown, 'top': top} - - -# Global judge stats instance +# Global instances judge_stats = JudgeStats() - -# HTTP targets - check for specific headers -regexes = { - 'www.facebook.com': 'X-FB-Debug', - 'www.fbcdn.net': 'X-FB-Debug', - 'www.reddit.com': 'x-clacks-overhead', - 'www.twitter.com': 'x-connection-hash', - 't.co': 'x-connection-hash', - 'www.msn.com': 'x-aspnetmvc-version', - 'www.bing.com': 'p3p', - 'www.ask.com': 'x-served-by', - 'www.hotmail.com': 'x-msedge-ref', - 'www.bbc.co.uk': 'x-bbc-edge-cache-status', - 'www.skype.com': 'X-XSS-Protection', - 'www.alibaba.com': 'object-status', - 'www.mozilla.org': 'cf-ray', - 'www.cloudflare.com': 'cf-ray', - 'www.wikimedia.org': 'x-client-ip', - 'www.vk.com': 'x-frontend', - 
'www.tinypic.com': 'x-amz-cf-pop', - 'www.netflix.com': 'X-Netflix.proxy.execution-time', - 'www.amazon.de': 'x-amz-cf-id', - 'www.reuters.com': 'x-amz-cf-id', - 'www.ikea.com': 'x-frame-options', - 'www.twitpic.com': 'timing-allow-origin', - 'www.digg.com': 'cf-request-id', - 'www.wikia.com': 'x-served-by', - 'www.wp.com': 'x-ac', - 'www.last.fm': 'x-timer', - 'www.usps.com': 'x-ruleset-version', - 'www.linkedin.com': 'x-li-uuid', - 'www.vimeo.com': 'x-timer', - 'www.yelp.com': 'x-timer', - 'www.ebay.com': 'x-envoy-upstream-service-time', - 'www.wikihow.com': 'x-c', - 'www.archive.org': 'referrer-policy', - 'www.pandora.tv': 'X-UA-Compatible', - 'www.w3.org': 'x-backend', - 'www.time.com': 'x-amz-cf-pop' -} - -# SSL targets - verify TLS handshake only (MITM detection) -# No HTTP request needed - just connect with verifycert=True -ssl_targets = [ - 'www.google.com', - 'www.microsoft.com', - 'www.apple.com', - 'www.amazon.com', - 'www.cloudflare.com', - 'www.github.com', - 'www.mozilla.org', - 'www.wikipedia.org', - 'www.reddit.com', - 'www.twitter.com', - 'x.com', - 'www.facebook.com', - 'www.linkedin.com', - 'www.paypal.com', - 'www.stripe.com', - 'www.digicert.com', - 'www.letsencrypt.org', -] - -class Stats(): - """Track and report comprehensive runtime statistics.""" - - HISTORY_SIZE = 120 # 10 min at 5s intervals - LATENCY_BUCKETS = [100, 250, 500, 1000, 2000, 5000, 10000] # ms thresholds - - def __init__(self): - self.lock = threading.RLock() # RLock for reentrant access (get_runtime_stats) - self.tested = 0 - self.passed = 0 - self.failed = 0 - self.start_time = time.time() - self.last_report = time.time() - - # Failure category tracking - self.fail_categories = {} - - # Protocol tracking (tested, passed, and failed separately) - self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0} - self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0} - self.proto_failed = {'http': {}, 'socks4': {}, 'socks5': {}} # Failures by category per proto - self.by_proto = 
self.proto_passed # Alias for dashboard API - - # Time series history (5s intervals) - self.rate_history = [] - self.success_rate_history = [] - self.latency_history = [] - self.last_history_time = time.time() - self.last_history_tested = 0 - self.last_history_passed = 0 - - # Peak values (delayed measurement to avoid startup anomalies) - self.peak_rate = 0.0 - self.peak_success_rate = 0.0 - self.peak_grace_period = 30 # seconds before recording peaks - self.min_latency = float('inf') - self.max_latency = 0.0 - - # Latency tracking with percentiles - self.latency_sum = 0.0 - self.latency_count = 0 - self.latency_samples = [] # Recent samples for percentiles - self.latency_buckets = {b: 0 for b in self.LATENCY_BUCKETS + [float('inf')]} - - # Recent window (last 60s) - self.recent_tested = 0 - self.recent_passed = 0 - self.recent_start = time.time() - - # Country/ASN tracking (top N) - self.country_passed = {} - self.asn_passed = {} - - # Hourly aggregates - self.hourly_tested = 0 - self.hourly_passed = 0 - self.hourly_start = time.time() - self.hours_data = [] # Last 24 hours - - # SSL/TLS tracking - self.ssl_tested = 0 - self.ssl_passed = 0 - self.ssl_failed = 0 - self.ssl_fail_categories = {} # Track SSL failures by category - self.mitm_detected = 0 - self.cert_errors = 0 - - def record(self, success, category=None, proto=None, latency_ms=None, country=None, asn=None, - ssl_test=False, mitm=False, cert_error=False): - with self.lock: - self.tested += 1 - self.recent_tested += 1 - self.hourly_tested += 1 - - # Track protocol tests - if proto and proto in self.proto_tested: - self.proto_tested[proto] += 1 - - if success: - self.passed += 1 - self.recent_passed += 1 - self.hourly_passed += 1 - - if proto and proto in self.proto_passed: - self.proto_passed[proto] += 1 - - if latency_ms and latency_ms > 0: - self.latency_sum += latency_ms - self.latency_count += 1 - # Track min/max - if latency_ms < self.min_latency: - self.min_latency = latency_ms - if latency_ms > 
self.max_latency: - self.max_latency = latency_ms - # Keep recent samples for percentiles (max 1000) - self.latency_samples.append(latency_ms) - if len(self.latency_samples) > 1000: - self.latency_samples.pop(0) - # Bucket for histogram - for bucket in self.LATENCY_BUCKETS: - if latency_ms <= bucket: - self.latency_buckets[bucket] += 1 - break - else: - self.latency_buckets[float('inf')] += 1 - - # Track country/ASN - if country: - self.country_passed[country] = self.country_passed.get(country, 0) + 1 - if asn: - self.asn_passed[asn] = self.asn_passed.get(asn, 0) + 1 - else: - self.failed += 1 - if category: - self.fail_categories[category] = self.fail_categories.get(category, 0) + 1 - # Track failures by protocol - if proto and proto in self.proto_failed: - self.proto_failed[proto][category] = self.proto_failed[proto].get(category, 0) + 1 - # Log failure category breakdown every 1000 failures - if self.failed % 1000 == 0: - top_cats = sorted(self.fail_categories.items(), key=lambda x: -x[1])[:5] - cats_str = ', '.join(['%s:%d' % (c, n) for c, n in top_cats]) - _log('fail breakdown (%d total): %s' % (self.failed, cats_str), 'diag') - - # SSL/TLS tracking - if ssl_test: - self.ssl_tested += 1 - if success: - self.ssl_passed += 1 - else: - self.ssl_failed += 1 - # Track which error caused the SSL failure - if category: - self.ssl_fail_categories[category] = self.ssl_fail_categories.get(category, 0) + 1 - if mitm: - self.mitm_detected += 1 - if cert_error: - self.cert_errors += 1 - - def update_history(self): - """Update time series history (call periodically).""" - now = time.time() - with self.lock: - elapsed = now - self.last_history_time - if elapsed >= 5: # Update every 5 seconds - # Rate - with sanity checks - tests_delta = self.tested - self.last_history_tested - if tests_delta < 0: - # Counter wrapped or corrupted - reset baseline - self.last_history_tested = self.tested - tests_delta = 0 - rate = tests_delta / elapsed if elapsed > 0 else 0 - # Cap at 
reasonable max (100/s is generous for proxy testing) - if rate > 100: - rate = 0 # Discard bogus value - self.rate_history.append(round(rate, 2)) - if len(self.rate_history) > self.HISTORY_SIZE: - self.rate_history.pop(0) - # Only record peaks after grace period (avoid startup anomalies) - uptime = now - self.start_time - if uptime >= self.peak_grace_period and rate > self.peak_rate and rate <= 100: - self.peak_rate = rate - - # Success rate - with sanity checks - passed_delta = self.passed - self.last_history_passed - if passed_delta < 0: - self.last_history_passed = self.passed - passed_delta = 0 - sr = (passed_delta / tests_delta * 100) if tests_delta > 0 else 0 - sr = min(sr, 100.0) # Cap at 100% - self.success_rate_history.append(round(sr, 1)) - if len(self.success_rate_history) > self.HISTORY_SIZE: - self.success_rate_history.pop(0) - if uptime >= self.peak_grace_period and sr > self.peak_success_rate: - self.peak_success_rate = sr - - # Average latency for this interval - avg_lat = self.get_avg_latency() - self.latency_history.append(round(avg_lat, 0)) - if len(self.latency_history) > self.HISTORY_SIZE: - self.latency_history.pop(0) - - self.last_history_time = now - self.last_history_tested = self.tested - self.last_history_passed = self.passed - - # Reset recent window every 60s - if now - self.recent_start >= 60: - self.recent_tested = 0 - self.recent_passed = 0 - self.recent_start = now - - # Hourly aggregation - if now - self.hourly_start >= 3600: - self.hours_data.append({ - 'tested': self.hourly_tested, - 'passed': self.hourly_passed, - 'rate': self.hourly_passed / 3600.0 if self.hourly_tested > 0 else 0, - 'success_rate': (self.hourly_passed / self.hourly_tested * 100) if self.hourly_tested > 0 else 0, - }) - if len(self.hours_data) > 24: - self.hours_data.pop(0) - self.hourly_tested = 0 - self.hourly_passed = 0 - self.hourly_start = now - - def get_recent_rate(self): - """Get rate for last 60 seconds.""" - with self.lock: - elapsed = time.time() - 
self.recent_start - if elapsed > 0: - return self.recent_tested / elapsed - return 0.0 - - def get_recent_success_rate(self): - """Get success rate for last 60 seconds.""" - with self.lock: - if self.recent_tested > 0: - return (self.recent_passed / self.recent_tested) * 100 - return 0.0 - - def get_avg_latency(self): - """Get average latency in ms.""" - with self.lock: - if self.latency_count > 0: - return self.latency_sum / self.latency_count - return 0.0 - - def get_latency_percentiles(self): - """Get latency percentiles (p50, p90, p99).""" - with self.lock: - if not self.latency_samples: - return {'p50': 0, 'p90': 0, 'p99': 0} - sorted_samples = sorted(self.latency_samples) - n = len(sorted_samples) - return { - 'p50': sorted_samples[int(n * 0.50)] if n > 0 else 0, - 'p90': sorted_samples[int(n * 0.90)] if n > 0 else 0, - 'p99': sorted_samples[min(int(n * 0.99), n - 1)] if n > 0 else 0, - } - - def get_latency_histogram(self): - """Get latency distribution histogram.""" - with self.lock: - total = sum(self.latency_buckets.values()) - if total == 0: - return [] - result = [] - prev = 0 - for bucket in self.LATENCY_BUCKETS: - count = self.latency_buckets[bucket] - result.append({ - 'range': '%d-%d' % (prev, bucket), - 'count': count, - 'pct': round(count / total * 100, 1), - }) - prev = bucket - # Over max bucket - over = self.latency_buckets[float('inf')] - if over > 0: - result.append({ - 'range': '>%d' % self.LATENCY_BUCKETS[-1], - 'count': over, - 'pct': round(over / total * 100, 1), - }) - return result - - def get_proto_stats(self): - """Get protocol-specific success rates and failure breakdown.""" - with self.lock: - result = {} - for proto in ['http', 'socks4', 'socks5']: - tested = self.proto_tested[proto] - passed = self.proto_passed[proto] - failed = sum(self.proto_failed[proto].values()) - result[proto] = { - 'tested': tested, - 'passed': passed, - 'failed': failed, - 'success_rate': round(passed / tested * 100, 1) if tested > 0 else 0, - 
'fail_reasons': dict(self.proto_failed[proto]) if self.proto_failed[proto] else {}, - } - return result - - def get_top_countries(self, limit=10): - """Get top countries by working proxy count.""" - with self.lock: - sorted_countries = sorted(self.country_passed.items(), key=lambda x: -x[1]) - return sorted_countries[:limit] - - def get_top_asns(self, limit=10): - """Get top ASNs by working proxy count.""" - with self.lock: - sorted_asns = sorted(self.asn_passed.items(), key=lambda x: -x[1]) - return sorted_asns[:limit] - - def get_hourly_data(self): - """Get last 24 hours of hourly data.""" - with self.lock: - return list(self.hours_data) - - def load_state(self, state): - """Load persisted state from a dict (from database). - - Args: - state: dict from dbs.load_session_state() - """ - if not state: - return - with self.lock: - self.tested = state.get('tested', 0) - self.passed = state.get('passed', 0) - self.failed = state.get('failed', 0) - self.ssl_tested = state.get('ssl_tested', 0) - self.ssl_passed = state.get('ssl_passed', 0) - self.ssl_failed = state.get('ssl_failed', 0) - self.mitm_detected = state.get('mitm_detected', 0) - self.cert_errors = state.get('cert_errors', 0) - self.proto_tested['http'] = state.get('proto_http_tested', 0) - self.proto_passed['http'] = state.get('proto_http_passed', 0) - self.proto_tested['socks4'] = state.get('proto_socks4_tested', 0) - self.proto_passed['socks4'] = state.get('proto_socks4_passed', 0) - self.proto_tested['socks5'] = state.get('proto_socks5_tested', 0) - self.proto_passed['socks5'] = state.get('proto_socks5_passed', 0) - # Note: peak_rate is per-session, not restored (avoids stale/corrupt values) - # Note: start_time is NOT restored - uptime reflects current session - # Restore failure categories - if state.get('fail_categories'): - self.fail_categories = dict(state['fail_categories']) - # Restore SSL failure categories - if state.get('ssl_fail_categories'): - self.ssl_fail_categories = 
dict(state['ssl_fail_categories']) - # Restore protocol failure categories - if state.get('proto_failed'): - for proto in ['http', 'socks4', 'socks5']: - if proto in state['proto_failed']: - self.proto_failed[proto] = dict(state['proto_failed'][proto]) - # Restore geo tracking - if state.get('country_passed'): - self.country_passed = dict(state['country_passed']) - if state.get('asn_passed'): - # Convert string keys back to int for ASN - self.asn_passed = {int(k) if k.isdigit() else k: v - for k, v in state['asn_passed'].items()} - _log('restored session: %d tested, %d passed' % (self.tested, self.passed), 'info') - - def should_report(self, interval): - return (time.time() - self.last_report) >= interval - - def report(self): - with self.lock: - self.last_report = time.time() - elapsed = time.time() - self.start_time - rate = try_div(self.tested, elapsed) - pct = try_div(self.passed * 100.0, self.tested) - base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % ( - self.tested, self.passed, pct, rate, int(elapsed / 60)) - # Add failure breakdown if there are failures - if self.fail_categories: - cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items())) - return '%s [%s]' % (base, cats) - return base - - def get_full_stats(self): - """Get comprehensive stats dict for API.""" - with self.lock: - elapsed = time.time() - self.start_time - return { - 'tested': self.tested, - 'passed': self.passed, - 'failed': self.failed, - 'success_rate': round(self.passed / self.tested * 100, 1) if self.tested > 0 else 0, - 'rate': round(self.tested / elapsed, 2) if elapsed > 0 else 0, - 'pass_rate': round(self.passed / elapsed, 2) if elapsed > 0 else 0, - 'recent_rate': self.get_recent_rate(), - 'recent_success_rate': self.get_recent_success_rate(), - 'peak_rate': self.peak_rate, - 'peak_success_rate': self.peak_success_rate, - 'uptime_seconds': int(elapsed), - 'rate_history': list(self.rate_history), - 'success_rate_history': 
def try_div(a, b):
    """Return a / b as a float, or 0 when b is zero (avoids ZeroDivisionError)."""
    if b != 0:
        return a / float(b)
    return 0


class MITMCertStats(object):
    """Track MITM certificate statistics.

    Thread-safe aggregation of certificates observed while probing proxies
    for TLS interception: unique certs keyed by fingerprint, counts by
    organization and issuer, fingerprints per proxy, and a bounded list of
    recent sightings.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.certs = {}         # fingerprint -> cert_info dict
        self.by_org = {}        # organization -> count
        self.by_issuer = {}     # issuer CN -> count
        self.by_proxy = {}      # proxy IP -> list of fingerprints
        self.total_count = 0    # total detections (non-unique)
        self.recent_certs = []  # last N certificates seen

    def add_cert(self, proxy_ip, cert_info):
        """Add a MITM certificate to statistics.

        Args:
            proxy_ip: IP of the proxy that presented the certificate
            cert_info: dict from extract_cert_info(); ignored when falsy
                or missing a 'fingerprint'

        Fix: 'last_seen' is now also initialised on the first sighting, so
        every stored cert carries the same key set regardless of how often
        it was seen (previously only repeat sightings had 'last_seen',
        which broke consumers reading it and save/load round-trips).
        """
        if not cert_info:
            return
        fp = cert_info.get('fingerprint', '')
        if not fp:
            return

        with self.lock:
            self.total_count += 1
            now = time.time()

            # Store unique certs by fingerprint
            if fp not in self.certs:
                self.certs[fp] = cert_info
                self.certs[fp]['first_seen'] = now
                self.certs[fp]['last_seen'] = now  # fix: consistent schema
                self.certs[fp]['count'] = 1
                self.certs[fp]['proxies'] = [proxy_ip]
            else:
                self.certs[fp]['count'] += 1
                self.certs[fp]['last_seen'] = now
                if proxy_ip not in self.certs[fp]['proxies']:
                    self.certs[fp]['proxies'].append(proxy_ip)

            # Track by organization
            org = cert_info.get('subject_o', 'Unknown')
            self.by_org[org] = self.by_org.get(org, 0) + 1

            # Track by issuer
            issuer = cert_info.get('issuer_cn', 'Unknown')
            self.by_issuer[issuer] = self.by_issuer.get(issuer, 0) + 1

            # Track which fingerprints each proxy presented
            if proxy_ip not in self.by_proxy:
                self.by_proxy[proxy_ip] = []
            if fp not in self.by_proxy[proxy_ip]:
                self.by_proxy[proxy_ip].append(fp)

            # Keep recent certs (last 50)
            self.recent_certs.append({
                'fingerprint': fp,
                'proxy': proxy_ip,
                'subject_cn': cert_info.get('subject_cn', ''),
                'issuer_cn': cert_info.get('issuer_cn', ''),
                'timestamp': now
            })
            if len(self.recent_certs) > 50:
                self.recent_certs.pop(0)

    def get_stats(self):
        """Return MITM certificate statistics for the API/dashboard."""
        with self.lock:
            # Top organizations / issuers by detection count
            top_orgs = sorted(self.by_org.items(), key=lambda x: x[1], reverse=True)[:10]
            top_issuers = sorted(self.by_issuer.items(), key=lambda x: x[1], reverse=True)[:10]
            # Unique certs sorted by how often each was seen
            unique_certs = []
            for fp, info in self.certs.items():
                cert_entry = {'fingerprint': fp}
                cert_entry.update(info)
                unique_certs.append(cert_entry)
            unique_certs = sorted(unique_certs, key=lambda x: x.get('count', 0), reverse=True)[:20]

            return {
                'total_detections': self.total_count,
                'unique_certs': len(self.certs),
                'unique_proxies': len(self.by_proxy),
                'top_organizations': [{'name': o, 'count': c} for o, c in top_orgs],
                'top_issuers': [{'name': i, 'count': c} for i, c in top_issuers],
                'certificates': unique_certs,
                'recent': list(self.recent_certs[-20:])
            }

    def save_state(self, filepath):
        """Persist MITM stats to a JSON file (best-effort; failures are logged)."""
        import json
        with self.lock:
            state = {
                'certs': self.certs,
                'by_org': self.by_org,
                'by_issuer': self.by_issuer,
                'by_proxy': self.by_proxy,
                'total_count': self.total_count,
                'recent_certs': self.recent_certs[-50:],
            }
            try:
                with open(filepath, 'w') as f:
                    json.dump(state, f)
            except Exception as e:
                _log('failed to save MITM state: %s' % str(e), 'warn')

    def load_state(self, filepath):
        """Restore MITM stats from a JSON file written by save_state().

        A missing file is not an error (fresh start); other failures are
        logged and leave current state untouched.
        """
        import json
        try:
            with open(filepath, 'r') as f:
                state = json.load(f)
            with self.lock:
                self.certs = state.get('certs', {})
                self.by_org = state.get('by_org', {})
                self.by_issuer = state.get('by_issuer', {})
                self.by_proxy = state.get('by_proxy', {})
                self.total_count = state.get('total_count', 0)
                self.recent_certs = state.get('recent_certs', [])
            _log('restored MITM state: %d certs, %d detections' % (
                len(self.certs), self.total_count), 'info')
        except IOError:
            pass  # File doesn't exist yet, start fresh
        except Exception as e:
            _log('failed to load MITM state: %s' % str(e), 'warn')
def extract_cert_info(cert_der):
    """Extract certificate information from a DER-encoded certificate.

    Args:
        cert_der: DER-encoded certificate bytes

    Returns:
        dict with certificate details, or None when the bytes cannot be
        parsed as a certificate.

    Fixes: the pyOpenSSL-missing fallback no longer re-imports hashlib
    (it was already imported at function top) and now returns the same
    key set as the full path, so consumers see a consistent schema.
    """
    import hashlib
    # SHA-256 fingerprint is computable regardless of pyOpenSSL availability
    fp = hashlib.sha256(cert_der).hexdigest()
    try:
        from OpenSSL import crypto
        x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der)

        subject = x509.get_subject()
        issuer = x509.get_issuer()

        # Validity dates (format: YYYYMMDDhhmmssZ); bytes on py3 builds
        not_before = x509.get_notBefore()
        not_after = x509.get_notAfter()
        if isinstance(not_before, bytes):
            not_before = not_before.decode('ascii')
        if isinstance(not_after, bytes):
            not_after = not_after.decode('ascii')

        sig_algo = x509.get_signature_algorithm()
        if hasattr(sig_algo, 'decode'):
            sig_algo = sig_algo.decode('ascii')
        else:
            sig_algo = str(sig_algo)

        return {
            'fingerprint': fp[:16],  # short fingerprint for display
            'fingerprint_full': fp,
            'subject_cn': subject.CN or '',
            'subject_o': subject.O or '',
            'subject_ou': subject.OU or '',
            'subject_c': subject.C or '',
            'issuer_cn': issuer.CN or '',
            'issuer_o': issuer.O or '',
            'serial': str(x509.get_serial_number()),
            'not_before': not_before,
            'not_after': not_after,
            'version': x509.get_version(),
            'sig_algo': sig_algo,
        }
    except ImportError:
        # pyOpenSSL not available - hash-only info with the same schema
        return {
            'fingerprint': fp[:16],
            'fingerprint_full': fp,
            'subject_cn': '(pyOpenSSL not installed)',
            'subject_o': '',
            'subject_ou': '',
            'subject_c': '',
            'issuer_cn': '',
            'issuer_o': '',
            'serial': '',
            'not_before': '',
            'not_after': '',
            'version': 0,
            'sig_algo': '',
        }
    except Exception:
        # Unparseable certificate bytes
        return None


def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout, auth=None):
    """Connect to target through proxy without cert verification to get MITM cert.

    Args:
        proxy_ip: Proxy IP address
        proxy_port: Proxy port
        proto: Proxy protocol (http, socks4, socks5)
        torhost: Tor SOCKS5 address
        target_host: Target host for SSL connection
        target_port: Target port (usually 443)
        timeout: Connection timeout
        auth: Optional auth credentials (user:pass)

    Returns:
        dict with certificate info (see extract_cert_info) or None on any
        connect/TLS failure - this is deliberately best-effort.
    """
    try:
        if auth:
            proxy_url = '%s://%s@%s:%s' % (proto, auth, proxy_ip, proxy_port)
        else:
            proxy_url = '%s://%s:%s' % (proto, proxy_ip, proxy_port)
        proxies = [
            rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
            rocksock.RocksockProxyFromURL(proxy_url),
        ]

        # verifycert=False on purpose: we *want* to capture a forged cert
        sock = rocksock.Rocksock(host=target_host, port=target_port, ssl=True,
                                 proxies=proxies, timeout=timeout, verifycert=False)
        sock.connect()

        cert_der = sock.sock.getpeercert(binary_form=True)
        sock.disconnect()

        if cert_der:
            return extract_cert_info(cert_der)
        return None
    except Exception:
        return None
class PriorityJobQueue(object):
    """Priority queue for proxy test jobs.

    Lower priority number = higher priority:
      0 new proxies (never tested), 1 recently working, 2 low fail count
      (< 3), 3 medium fail count, 4 high fail count.
    Jobs of equal priority are served FIFO via a monotonically increasing
    tie-breaker.
    """

    def __init__(self):
        self.heap = []
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.counter = 0  # FIFO tie-breaker for equal priorities

    def put(self, job, priority=3):
        """Add job with priority (lower = higher priority)."""
        with self.lock:
            if not self.heap:
                # Queue drained: safe to rewind the tie-breaker so it
                # cannot grow without bound.
                self.counter = 0
            entry = (priority, self.counter, job)
            self.counter += 1
            heapq.heappush(self.heap, entry)
            self.not_empty.notify()

    def get(self, timeout=None):
        """Pop and return the highest-priority job, blocking.

        Raises Queue.Empty when `timeout` seconds pass without a job
        becoming available; blocks forever when timeout is None.
        """
        with self.not_empty:
            if timeout is None:
                while not self.heap:
                    self.not_empty.wait()
            else:
                deadline = time.time() + timeout
                while not self.heap:
                    left = deadline - time.time()
                    if left <= 0:
                        raise Queue.Empty()
                    self.not_empty.wait(left)
            return heapq.heappop(self.heap)[2]

    def get_nowait(self):
        """Pop the highest-priority job, raising Queue.Empty when none queued."""
        with self.lock:
            if not self.heap:
                raise Queue.Empty()
            return heapq.heappop(self.heap)[2]

    def empty(self):
        """Return True when no jobs are queued."""
        with self.lock:
            return len(self.heap) == 0

    def qsize(self):
        """Return the number of queued jobs."""
        with self.lock:
            return len(self.heap)

    def task_done(self):
        """No-op, kept for Queue.Queue API compatibility."""
        pass
def calculate_priority(failcount, success_count, max_fail):
    """Map a proxy's test record to a scheduling priority.

    Args:
        failcount: consecutive failures recorded so far
        success_count: successful tests recorded so far
        max_fail: configured failure limit before a proxy is dropped

    Returns:
        int: priority 0-4 (lower = scheduled sooner)
    """
    # New proxy (never successfully tested)
    if success_count == 0 and failcount == 0:
        return 0
    # Recently working (no current failures)
    if failcount == 0:
        return 1
    # Low fail count
    if failcount < 3:
        return 2
    # Medium fail count
    if failcount < max_fail // 2:
        return 3
    # High fail count
    return 4


def socks4_resolve(srvname, server_port):
    """Resolve hostname to IP for SOCKS4 (which requires a numeric IP).

    Results are cached in `cached_dns` for DNS_CACHE_TTL seconds to avoid
    repeated lookups.

    Args:
        srvname: hostname to resolve
        server_port: port passed to the resolver hint

    Returns:
        IP address string, or False when resolution fails.
    """
    now = time.time()
    # Check cache with TTL
    if srvname in cached_dns:
        ip, ts = cached_dns[srvname]
        if now - ts < DNS_CACHE_TTL:
            # Fix: guard against config not being initialised yet,
            # consistent with the dns.py version of this function.
            if config and config.watchd.debug:
                _log("using cached ip (%s) for %s" % (ip, srvname), "debug")
            return ip
        # Expired - fall through to re-resolve

    # Resolve hostname
    dns_fail = False
    try:
        af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True)
        if sa is not None:
            cached_dns[srvname] = (sa[0], now)
            return sa[0]
        else:
            dns_fail = True
    except rocksock.RocksockException as e:
        # Only "getaddrinfo failed" is expected from resolve()
        assert(e.get_errortype() == rocksock.RS_ET_GAI)
        dns_fail = True

    if dns_fail:
        _log("could not resolve connection target %s" % srvname, "ERROR")
        return False
    return srvname
def try_div(a, b):
    """Return a / b as a float, or 0 when b is zero (avoids ZeroDivisionError)."""
    if b != 0:
        return a / float(b)
    return 0


class JudgeStats():
    """Track per-judge success/failure rates for reliability scoring.

    Judges that frequently block or rate-limit are temporarily avoided.
    A success resets the block counter, letting judges recover; an expired
    cooldown also resets it.
    """

    def __init__(self, cooldown_seconds=300, block_threshold=3):
        self.lock = threading.Lock()
        self.stats = {}  # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': ts}
        self.cooldown_seconds = cooldown_seconds  # seconds to avoid blocked judges
        self.block_threshold = block_threshold    # consecutive blocks before cooldown

    def _entry(self, judge):
        """Return the stats dict for judge, creating it if needed (caller holds lock)."""
        if judge not in self.stats:
            self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}
        return self.stats[judge]

    def record_success(self, judge):
        """Record successful judge response; resets the block counter."""
        with self.lock:
            s = self._entry(judge)
            s['success'] += 1
            s['block'] = 0  # a success means the judge recovered

    def record_failure(self, judge):
        """Record judge failure (proxy failed, not judge block)."""
        with self.lock:
            self._entry(judge)['fail'] += 1

    def record_block(self, judge):
        """Record judge blocking the proxy (403, captcha, rate-limit)."""
        with self.lock:
            s = self._entry(judge)
            s['block'] += 1
            s['last_block'] = time.time()

    def is_available(self, judge):
        """Check if judge is available (not in cooldown).

        Side effect: an expired cooldown resets the judge's block count.
        """
        with self.lock:
            if judge not in self.stats:
                return True
            s = self.stats[judge]
            # Check if in cooldown period
            if s['block'] >= self.block_threshold:
                if (time.time() - s['last_block']) < self.cooldown_seconds:
                    return False
                # Cooldown expired, reset block count
                s['block'] = 0
            return True

    def get_available_judges(self, judge_list):
        """Return list of judges not in cooldown."""
        return [j for j in judge_list if self.is_available(j)]

    def status_line(self):
        """Return a one-line status summary for logging."""
        with self.lock:
            total = len(self.stats)
            blocked = sum(1 for s in self.stats.values()
                          if s['block'] >= self.block_threshold and
                          (time.time() - s['last_block']) < self.cooldown_seconds)
            return 'judges: %d total, %d in cooldown' % (total, blocked)

    def get_stats(self):
        """Return statistics dict for API/dashboard."""
        with self.lock:
            now = time.time()
            total = len(self.stats)
            in_cooldown = sum(1 for s in self.stats.values()
                              if s['block'] >= self.block_threshold and
                              (now - s['last_block']) < self.cooldown_seconds)
            available = total - in_cooldown
            # Get top judges by success count
            top = []
            for judge, s in self.stats.items():
                total_tests = s['success'] + s['fail']
                if total_tests > 0:
                    success_pct = (s['success'] * 100.0) / total_tests
                    top.append({'judge': judge, 'success': s['success'],
                                'tests': total_tests, 'rate': round(success_pct, 1)})
            top.sort(key=lambda x: x['success'], reverse=True)
            return {'total': total, 'available': available, 'in_cooldown': in_cooldown, 'top': top}


# HTTP targets - check for specific headers
regexes = {
    'www.facebook.com': 'X-FB-Debug',
    'www.fbcdn.net': 'X-FB-Debug',
    'www.reddit.com': 'x-clacks-overhead',
    'www.twitter.com': 'x-connection-hash',
    't.co': 'x-connection-hash',
    'www.msn.com': 'x-aspnetmvc-version',
    'www.bing.com': 'p3p',
    'www.ask.com': 'x-served-by',
    'www.hotmail.com': 'x-msedge-ref',
    'www.bbc.co.uk': 'x-bbc-edge-cache-status',
    'www.skype.com': 'X-XSS-Protection',
    'www.alibaba.com': 'object-status',
    'www.mozilla.org': 'cf-ray',
    'www.cloudflare.com': 'cf-ray',
    'www.wikimedia.org': 'x-client-ip',
    'www.vk.com': 'x-frontend',
    'www.tinypic.com': 'x-amz-cf-pop',
    'www.netflix.com': 'X-Netflix.proxy.execution-time',
    'www.amazon.de': 'x-amz-cf-id',
    'www.reuters.com': 'x-amz-cf-id',
    'www.ikea.com': 'x-frame-options',
    'www.twitpic.com': 'timing-allow-origin',
    'www.digg.com': 'cf-request-id',
    'www.wikia.com': 'x-served-by',
    'www.wp.com': 'x-ac',
    'www.last.fm': 'x-timer',
    'www.usps.com': 'x-ruleset-version',
    'www.linkedin.com': 'x-li-uuid',
    'www.vimeo.com': 'x-timer',
    'www.yelp.com': 'x-timer',
    'www.ebay.com': 'x-envoy-upstream-service-time',
    'www.wikihow.com': 'x-c',
    'www.archive.org': 'referrer-policy',
    'www.pandora.tv': 'X-UA-Compatible',
    'www.w3.org': 'x-backend',
    'www.time.com': 'x-amz-cf-pop'
}

# SSL targets - verify TLS handshake only (MITM detection)
ssl_targets = [
    'www.google.com',
    'www.microsoft.com',
    'www.apple.com',
    'www.amazon.com',
    'www.cloudflare.com',
    'www.github.com',
    'www.mozilla.org',
    'www.wikipedia.org',
    'www.reddit.com',
    'www.twitter.com',
    'x.com',
    'www.facebook.com',
    'www.linkedin.com',
    'www.paypal.com',
    'www.stripe.com',
    'www.digicert.com',
    'www.letsencrypt.org',
]
class Stats():
    """Track and report comprehensive runtime statistics.

    All public methods are thread-safe. An RLock is used because
    get_full_stats() re-enters helper getters that take the same lock.

    Improvement: bounded histories (rate/success/latency series, latency
    samples, hourly aggregates) now use collections.deque(maxlen=...),
    which drops old entries in O(1) instead of list pop(0)'s O(n) and
    removes the manual trimming code. Consumers already receive copies
    via list(...), so the change is invisible to callers.
    """

    HISTORY_SIZE = 120  # 10 min at 5s intervals
    LATENCY_BUCKETS = [100, 250, 500, 1000, 2000, 5000, 10000]  # ms thresholds

    def __init__(self):
        # Local import: the file's import block is not modified here.
        from collections import deque
        self.lock = threading.RLock()  # reentrant: get_full_stats() calls locked getters
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()
        self.last_report = time.time()

        # Failure category tracking
        self.fail_categories = {}

        # Protocol tracking (tested, passed, and failures-by-category)
        self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0}
        self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0}
        self.proto_failed = {'http': {}, 'socks4': {}, 'socks5': {}}
        self.by_proto = self.proto_passed  # alias kept for dashboard API

        # Time series history (5s intervals); maxlen evicts old samples in O(1)
        self.rate_history = deque(maxlen=self.HISTORY_SIZE)
        self.success_rate_history = deque(maxlen=self.HISTORY_SIZE)
        self.latency_history = deque(maxlen=self.HISTORY_SIZE)
        self.last_history_time = time.time()
        self.last_history_tested = 0
        self.last_history_passed = 0

        # Peak values (delayed measurement to avoid startup anomalies)
        self.peak_rate = 0.0
        self.peak_success_rate = 0.0
        self.peak_grace_period = 30  # seconds before recording peaks
        self.min_latency = float('inf')
        self.max_latency = 0.0

        # Latency tracking with percentiles
        self.latency_sum = 0.0
        self.latency_count = 0
        self.latency_samples = deque(maxlen=1000)  # recent samples for percentiles
        self.latency_buckets = {b: 0 for b in self.LATENCY_BUCKETS + [float('inf')]}

        # Recent window (last 60s)
        self.recent_tested = 0
        self.recent_passed = 0
        self.recent_start = time.time()

        # Country/ASN tracking (top N)
        self.country_passed = {}
        self.asn_passed = {}

        # Hourly aggregates (last 24 hours)
        self.hourly_tested = 0
        self.hourly_passed = 0
        self.hourly_start = time.time()
        self.hours_data = deque(maxlen=24)

        # SSL/TLS tracking
        self.ssl_tested = 0
        self.ssl_passed = 0
        self.ssl_failed = 0
        self.ssl_fail_categories = {}  # SSL failures by category
        self.mitm_detected = 0
        self.cert_errors = 0

    def record(self, success, category=None, proto=None, latency_ms=None, country=None, asn=None,
               ssl_test=False, mitm=False, cert_error=False):
        """Record one proxy test result.

        Args:
            success: whether the proxy passed
            category: failure category string (when success is False)
            proto: 'http' | 'socks4' | 'socks5'
            latency_ms: measured latency in ms (recorded on success only)
            country, asn: geo info recorded for working proxies
            ssl_test, mitm, cert_error: SSL/TLS test outcome flags
        """
        with self.lock:
            self.tested += 1
            self.recent_tested += 1
            self.hourly_tested += 1

            # Track protocol tests
            if proto and proto in self.proto_tested:
                self.proto_tested[proto] += 1

            if success:
                self.passed += 1
                self.recent_passed += 1
                self.hourly_passed += 1

                if proto and proto in self.proto_passed:
                    self.proto_passed[proto] += 1

                if latency_ms and latency_ms > 0:
                    self.latency_sum += latency_ms
                    self.latency_count += 1
                    # Track min/max
                    if latency_ms < self.min_latency:
                        self.min_latency = latency_ms
                    if latency_ms > self.max_latency:
                        self.max_latency = latency_ms
                    # Recent samples for percentiles; deque caps at 1000
                    self.latency_samples.append(latency_ms)
                    # Bucket for histogram
                    for bucket in self.LATENCY_BUCKETS:
                        if latency_ms <= bucket:
                            self.latency_buckets[bucket] += 1
                            break
                    else:
                        self.latency_buckets[float('inf')] += 1

                # Track country/ASN
                if country:
                    self.country_passed[country] = self.country_passed.get(country, 0) + 1
                if asn:
                    self.asn_passed[asn] = self.asn_passed.get(asn, 0) + 1
            else:
                self.failed += 1
                if category:
                    self.fail_categories[category] = self.fail_categories.get(category, 0) + 1
                    # Track failures by protocol
                    if proto and proto in self.proto_failed:
                        self.proto_failed[proto][category] = self.proto_failed[proto].get(category, 0) + 1
                # Log failure category breakdown every 1000 failures
                if self.failed % 1000 == 0:
                    top_cats = sorted(self.fail_categories.items(), key=lambda x: -x[1])[:5]
                    cats_str = ', '.join(['%s:%d' % (c, n) for c, n in top_cats])
                    _log('fail breakdown (%d total): %s' % (self.failed, cats_str), 'diag')

            # SSL/TLS tracking
            if ssl_test:
                self.ssl_tested += 1
                if success:
                    self.ssl_passed += 1
                else:
                    self.ssl_failed += 1
                    # Track which error caused the SSL failure
                    if category:
                        self.ssl_fail_categories[category] = self.ssl_fail_categories.get(category, 0) + 1
                if mitm:
                    self.mitm_detected += 1
                if cert_error:
                    self.cert_errors += 1

    def update_history(self):
        """Update time series history (call periodically, e.g. each second)."""
        now = time.time()
        with self.lock:
            elapsed = now - self.last_history_time
            if elapsed >= 5:  # Update every 5 seconds
                # Rate - with sanity checks
                tests_delta = self.tested - self.last_history_tested
                if tests_delta < 0:
                    # Counter wrapped or corrupted - reset baseline
                    self.last_history_tested = self.tested
                    tests_delta = 0
                rate = tests_delta / elapsed if elapsed > 0 else 0
                # Cap at reasonable max (100/s is generous for proxy testing)
                if rate > 100:
                    rate = 0  # Discard bogus value
                self.rate_history.append(round(rate, 2))  # deque evicts oldest
                # Only record peaks after grace period (avoid startup anomalies)
                uptime = now - self.start_time
                if uptime >= self.peak_grace_period and rate > self.peak_rate and rate <= 100:
                    self.peak_rate = rate

                # Success rate - with sanity checks
                passed_delta = self.passed - self.last_history_passed
                if passed_delta < 0:
                    self.last_history_passed = self.passed
                    passed_delta = 0
                sr = (passed_delta / tests_delta * 100) if tests_delta > 0 else 0
                sr = min(sr, 100.0)  # Cap at 100%
                self.success_rate_history.append(round(sr, 1))
                if uptime >= self.peak_grace_period and sr > self.peak_success_rate:
                    self.peak_success_rate = sr

                # Average latency for this interval
                self.latency_history.append(round(self.get_avg_latency(), 0))

                self.last_history_time = now
                self.last_history_tested = self.tested
                self.last_history_passed = self.passed

            # Reset recent window every 60s
            if now - self.recent_start >= 60:
                self.recent_tested = 0
                self.recent_passed = 0
                self.recent_start = now

            # Hourly aggregation; deque keeps only the last 24 entries
            if now - self.hourly_start >= 3600:
                self.hours_data.append({
                    'tested': self.hourly_tested,
                    'passed': self.hourly_passed,
                    'rate': self.hourly_passed / 3600.0 if self.hourly_tested > 0 else 0,
                    'success_rate': (self.hourly_passed / self.hourly_tested * 100) if self.hourly_tested > 0 else 0,
                })
                self.hourly_tested = 0
                self.hourly_passed = 0
                self.hourly_start = now

    def get_recent_rate(self):
        """Get test rate (tests/s) for the current recent window."""
        with self.lock:
            elapsed = time.time() - self.recent_start
            if elapsed > 0:
                return self.recent_tested / elapsed
            return 0.0

    def get_recent_success_rate(self):
        """Get success percentage for the current recent window."""
        with self.lock:
            if self.recent_tested > 0:
                return (self.recent_passed / self.recent_tested) * 100
            return 0.0

    def get_avg_latency(self):
        """Get average latency in ms over all successful tests."""
        with self.lock:
            if self.latency_count > 0:
                return self.latency_sum / self.latency_count
            return 0.0

    def get_latency_percentiles(self):
        """Get latency percentiles (p50, p90, p99) over recent samples."""
        with self.lock:
            if not self.latency_samples:
                return {'p50': 0, 'p90': 0, 'p99': 0}
            sorted_samples = sorted(self.latency_samples)
            n = len(sorted_samples)
            return {
                'p50': sorted_samples[int(n * 0.50)] if n > 0 else 0,
                'p90': sorted_samples[int(n * 0.90)] if n > 0 else 0,
                'p99': sorted_samples[min(int(n * 0.99), n - 1)] if n > 0 else 0,
            }

    def get_latency_histogram(self):
        """Get latency distribution histogram as a list of range/count/pct dicts."""
        with self.lock:
            total = sum(self.latency_buckets.values())
            if total == 0:
                return []
            result = []
            prev = 0
            for bucket in self.LATENCY_BUCKETS:
                count = self.latency_buckets[bucket]
                result.append({
                    'range': '%d-%d' % (prev, bucket),
                    'count': count,
                    'pct': round(count / total * 100, 1),
                })
                prev = bucket
            # Over max bucket
            over = self.latency_buckets[float('inf')]
            if over > 0:
                result.append({
                    'range': '>%d' % self.LATENCY_BUCKETS[-1],
                    'count': over,
                    'pct': round(over / total * 100, 1),
                })
            return result

    def get_proto_stats(self):
        """Get protocol-specific success rates and failure breakdown."""
        with self.lock:
            result = {}
            for proto in ['http', 'socks4', 'socks5']:
                tested = self.proto_tested[proto]
                passed = self.proto_passed[proto]
                failed = sum(self.proto_failed[proto].values())
                result[proto] = {
                    'tested': tested,
                    'passed': passed,
                    'failed': failed,
                    'success_rate': round(passed / tested * 100, 1) if tested > 0 else 0,
                    'fail_reasons': dict(self.proto_failed[proto]) if self.proto_failed[proto] else {},
                }
            return result

    def get_top_countries(self, limit=10):
        """Get top countries by working proxy count as (country, count) pairs."""
        with self.lock:
            sorted_countries = sorted(self.country_passed.items(), key=lambda x: -x[1])
            return sorted_countries[:limit]

    def get_top_asns(self, limit=10):
        """Get top ASNs by working proxy count as (asn, count) pairs."""
        with self.lock:
            sorted_asns = sorted(self.asn_passed.items(), key=lambda x: -x[1])
            return sorted_asns[:limit]

    def get_hourly_data(self):
        """Get last 24 hours of hourly aggregate data (copy)."""
        with self.lock:
            return list(self.hours_data)

    def load_state(self, state):
        """Load persisted counters from a dict (from dbs.load_session_state()).

        start_time and peak values are intentionally NOT restored: uptime
        and peaks reflect the current session only.

        Args:
            state: dict from dbs.load_session_state(); falsy -> no-op
        """
        if not state:
            return
        with self.lock:
            self.tested = state.get('tested', 0)
            self.passed = state.get('passed', 0)
            self.failed = state.get('failed', 0)
            self.ssl_tested = state.get('ssl_tested', 0)
            self.ssl_passed = state.get('ssl_passed', 0)
            self.ssl_failed = state.get('ssl_failed', 0)
            self.mitm_detected = state.get('mitm_detected', 0)
            self.cert_errors = state.get('cert_errors', 0)
            self.proto_tested['http'] = state.get('proto_http_tested', 0)
            self.proto_passed['http'] = state.get('proto_http_passed', 0)
            self.proto_tested['socks4'] = state.get('proto_socks4_tested', 0)
            self.proto_passed['socks4'] = state.get('proto_socks4_passed', 0)
            self.proto_tested['socks5'] = state.get('proto_socks5_tested', 0)
            self.proto_passed['socks5'] = state.get('proto_socks5_passed', 0)
            # Restore failure categories
            if state.get('fail_categories'):
                self.fail_categories = dict(state['fail_categories'])
            # Restore SSL failure categories
            if state.get('ssl_fail_categories'):
                self.ssl_fail_categories = dict(state['ssl_fail_categories'])
            # Restore protocol failure categories
            if state.get('proto_failed'):
                for proto in ['http', 'socks4', 'socks5']:
                    if proto in state['proto_failed']:
                        self.proto_failed[proto] = dict(state['proto_failed'][proto])
            # Restore geo tracking
            if state.get('country_passed'):
                self.country_passed = dict(state['country_passed'])
            if state.get('asn_passed'):
                # Convert string keys back to int for ASN (JSON stringifies keys)
                self.asn_passed = {int(k) if k.isdigit() else k: v
                                   for k, v in state['asn_passed'].items()}
            _log('restored session: %d tested, %d passed' % (self.tested, self.passed), 'info')

    def should_report(self, interval):
        """Return True when at least `interval` seconds passed since last report."""
        return (time.time() - self.last_report) >= interval

    def report(self):
        """Return a one-line progress summary and reset the report timer."""
        with self.lock:
            self.last_report = time.time()
            elapsed = time.time() - self.start_time
            rate = try_div(self.tested, elapsed)
            pct = try_div(self.passed * 100.0, self.tested)
            base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % (
                self.tested, self.passed, pct, rate, int(elapsed / 60))
            # Add failure breakdown if there are failures
            if self.fail_categories:
                cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items()))
                return '%s [%s]' % (base, cats)
            return base

    def get_full_stats(self):
        """Get comprehensive stats dict for API."""
        with self.lock:
            elapsed = time.time() - self.start_time
            return {
                'tested': self.tested,
                'passed': self.passed,
                'failed': self.failed,
                'success_rate': round(self.passed / self.tested * 100, 1) if self.tested > 0 else 0,
                'rate': round(self.tested / elapsed, 2) if elapsed > 0 else 0,
                'pass_rate': round(self.passed / elapsed, 2) if elapsed > 0 else 0,
                'recent_rate': self.get_recent_rate(),
                'recent_success_rate': self.get_recent_success_rate(),
                'peak_rate': self.peak_rate,
                'peak_success_rate': self.peak_success_rate,
                'uptime_seconds': int(elapsed),
                'rate_history': list(self.rate_history),
                'success_rate_history': list(self.success_rate_history),
                'latency_history': list(self.latency_history),
                'avg_latency': self.get_avg_latency(),
                'min_latency': self.min_latency if self.min_latency != float('inf') else 0,
                'max_latency': self.max_latency,
                'latency_percentiles': self.get_latency_percentiles(),
                'latency_histogram': self.get_latency_histogram(),
                'by_proto': dict(self.proto_passed),
                'proto_stats': self.get_proto_stats(),
                'failures': dict(self.fail_categories),
                'top_countries': self.get_top_countries(),
                'top_asns': self.get_top_asns(),
                'hourly_data': self.get_hourly_data(),
            }