When ssl_first=1 (the default), proxy validation first attempts an SSL handshake. If that handshake fails, validation falls back to the configured secondary check (head, judges, or irc). This separates SSL capability detection from basic connectivity testing. New config options: - ssl_first: enable the SSL-first pattern (default: 1) - checktype: secondary check type (head, judges, or irc)
2489 lines
100 KiB
Python
2489 lines
100 KiB
Python
#!/usr/bin/env python2
|
|
from __future__ import division
|
|
|
|
# Gevent monkey-patching MUST happen before any other imports
|
|
# This patches threading, socket, time, etc. for cooperative concurrency
|
|
from gevent import monkey
|
|
monkey.patch_all()
|
|
|
|
import threading # Now patched by gevent
|
|
import time
|
|
import random
|
|
import string
|
|
import re
|
|
import heapq
|
|
import signal
|
|
import network_stats
|
|
|
|
import os
|
|
import ssl
|
|
import contextlib
|
|
try:
|
|
import Queue
|
|
except ImportError:
|
|
import queue as Queue
|
|
try:
|
|
import IP2Location
|
|
geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
|
|
geolite = True
|
|
except (ImportError, IOError, ValueError):
|
|
geolite = False
|
|
|
|
try:
|
|
import pyasn
|
|
asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat"))
|
|
asn_enabled = True
|
|
except (ImportError, IOError, NameError):
|
|
asndb = None
|
|
asn_enabled = False
|
|
|
|
from config import Config
|
|
|
|
import mysqlite
|
|
import dbs
|
|
from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error
|
|
import rocksock
|
|
import connection_pool
|
|
|
|
# Optional scraper integration for engine stats
|
|
try:
|
|
import scraper as scraper_module
|
|
scraper_available = True
|
|
except ImportError:
|
|
scraper_module = None
|
|
scraper_available = False
|
|
|
|
config = Config()
|
|
|
|
def set_config(cfg):
    """Replace the module-level config object.

    Used when this module is imported from ppf.py so that both share a
    single Config instance instead of each constructing their own.
    """
    global config
    config = cfg
|
|
|
|
# Presumably flipped when the module is run directly rather than imported —
# TODO(review): confirm against the __main__ handling (not visible in this chunk)
_run_standalone = False
cached_dns = {} # {hostname: (ip, timestamp)}
DNS_CACHE_TTL = 3600 # 1 hour

# Debug mode for proxy check path - set via PPF_DEBUG env or config
_debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy')

# Sampling debug: print detailed diagnostics for every Nth test
_sample_debug_interval = 50 # Print debug for every 50th test (lowered for diagnosis)
_sample_debug_counter = 0   # guarded by _sample_debug_lock (see _sample_dbg)
_sample_debug_lock = threading.Lock()
|
|
|
|
def _dbg(msg, proxy=None):
    """Debug log for proxy check path. Only logs when PPF_DEBUG=1."""
    if not _debug_proxy:
        return
    tag = '[%s] ' % proxy if proxy else ''
    # Use 'dbg' category (shows at info level) instead of 'debug' (filtered by default)
    _log('%s%s' % (tag, msg), 'dbg')
|
|
|
|
def _sample_dbg(msg, proxy=None, force=False):
    """Sampled debug: log every Nth test for diagnostics without flooding.

    With force=True the message is always emitted and the sample counter
    is left untouched.
    """
    global _sample_debug_counter
    emit = force
    if not emit:
        # Bump the shared counter under the lock; fire once per interval
        with _sample_debug_lock:
            _sample_debug_counter += 1
            if _sample_debug_counter >= _sample_debug_interval:
                _sample_debug_counter = 0
                emit = True
    if not emit:
        return
    tag = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] '
    _log('%s%s' % (tag, msg), 'diag')
|
|
|
|
# IP pattern for judge services (validates response contains valid IP in body)
# NOTE: loose — matches octets up to 999; use is_valid_ip() for strict range checks
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
|
|
|
|
def is_valid_ip(ip_str):
    """Validate a dotted-quad IPv4 string (four octets, each 0-255).

    Fix over the previous version: int() silently accepts signed or
    whitespace-padded octets (e.g. '+1', ' 1'), so strings like
    '+1.2.3.4' used to validate. isdigit() rejects those forms.

    Returns True/False; never raises (non-string input returns False).
    """
    try:
        parts = ip_str.split('.')
        # isdigit() guards against '+1', '-1', ' 1' which int() would accept
        return len(parts) == 4 and all(p.isdigit() and 0 <= int(p) <= 255 for p in parts)
    except (ValueError, AttributeError):
        return False
|
|
|
|
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'

# Dead proxy marker - proxies with failed=-1 are permanently dead and never retested
DEAD_PROXY = -1
# Error categories that indicate proxy is definitely dead (not temporary failure)
FATAL_ERROR_CATEGORIES = ('refused', 'unreachable', 'auth')

# Patterns indicating judge is blocking the proxy (not a proxy failure)
# These should NOT count as proxy failures - retry with different judge
JUDGE_BLOCK_PATTERNS = [
    r'HTTP/1\.[01] 403', # Forbidden
    r'HTTP/1\.[01] 429', # Too Many Requests
    r'HTTP/1\.[01] 503', # Service Unavailable
    r'captcha', # Captcha challenge
    r'challenge', # Generic challenge
    r'verify you are human', # Human verification
    r'rate.?limit', # Rate limiting
    r'too many requests', # Rate limiting
    r'access.?denied', # Access denied
    r'blocked', # Explicit block
    r'Checking your browser', # Cloudflare JS challenge
]
# Combined matcher, compiled once at import time; case-insensitive
JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE)

# Check types: irc, http (header match), judges (body match), ssl (TLS handshake)
# Judge services - return IP in body (plain text, JSON, or HTML)
# Maps judge host/path -> regex the response body must match
judges = {
    # Plain text IP
    'api.ipify.org': IP_PATTERN,
    'icanhazip.com': IP_PATTERN,
    'checkip.amazonaws.com': IP_PATTERN,
    'ifconfig.me/ip': IP_PATTERN,
    'ipinfo.io/ip': IP_PATTERN,
    'ip.me': IP_PATTERN,
    'myip.dnsomatic.com': IP_PATTERN,
    'ident.me': IP_PATTERN,
    'ipecho.net/plain': IP_PATTERN,
    'wtfismyip.com/text': IP_PATTERN,
    'eth0.me': IP_PATTERN,
    'l2.io/ip': IP_PATTERN,
    'tnx.nl/ip': IP_PATTERN,
    'wgetip.com': IP_PATTERN,
    'curlmyip.net': IP_PATTERN,
    # JSON responses (IP pattern matches within JSON)
    'httpbin.org/ip': IP_PATTERN,
    'ip-api.com/json': IP_PATTERN,
    'ipapi.co/json': IP_PATTERN,
    'ipwhois.app/json': IP_PATTERN,
    # Header echo - for elite detection (check if proxy adds revealing headers)
    'httpbin.org/headers': IP_PATTERN, # returns request headers as JSON
}
|
|
|
|
|
|
class JudgeStats(object):
    """Track per-judge success/failure rates for reliability scoring.

    Judges that frequently block or rate-limit are temporarily avoided.
    Stats decay over time to allow recovery.

    Thread-safe: all access to self.stats happens under self.lock.
    Changes: lazy entry initialization deduplicated into _entry();
    time.time() hoisted out of the per-judge loops; new-style class base
    for consistency with MITMCertStats.
    """

    def __init__(self, cooldown_seconds=300, block_threshold=3):
        self.lock = threading.Lock()
        # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp}
        self.stats = {}
        self.cooldown_seconds = cooldown_seconds  # seconds to avoid blocked judges
        self.block_threshold = block_threshold    # consecutive blocks before cooldown

    def _entry(self, judge):
        """Return (creating if needed) the stats dict for judge. Caller holds lock."""
        if judge not in self.stats:
            self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}
        return self.stats[judge]

    def record_success(self, judge):
        """Record successful judge response (also resets the block streak)."""
        with self.lock:
            s = self._entry(judge)
            s['success'] += 1
            # Reset block count on success
            s['block'] = 0

    def record_failure(self, judge):
        """Record judge failure (proxy failed, not judge block)."""
        with self.lock:
            self._entry(judge)['fail'] += 1

    def record_block(self, judge):
        """Record judge blocking the proxy (403, captcha, rate-limit)."""
        with self.lock:
            s = self._entry(judge)
            s['block'] += 1
            s['last_block'] = time.time()

    def is_available(self, judge):
        """Check if judge is available (not in cooldown)."""
        with self.lock:
            if judge not in self.stats:
                return True
            s = self.stats[judge]
            # Check if in cooldown period
            if s['block'] >= self.block_threshold:
                if (time.time() - s['last_block']) < self.cooldown_seconds:
                    return False
                # Cooldown expired, reset block count
                s['block'] = 0
            return True

    def get_available_judges(self, judge_list):
        """Return list of judges not in cooldown."""
        return [j for j in judge_list if self.is_available(j)]

    def status_line(self):
        """Return status summary for logging."""
        with self.lock:
            now = time.time()  # hoisted: one clock read for the whole scan
            total = len(self.stats)
            blocked = sum(1 for s in self.stats.values()
                          if s['block'] >= self.block_threshold and
                          (now - s['last_block']) < self.cooldown_seconds)
            return 'judges: %d total, %d in cooldown' % (total, blocked)

    def get_stats(self):
        """Return statistics dict for API/dashboard."""
        with self.lock:
            now = time.time()
            total = len(self.stats)
            in_cooldown = sum(1 for s in self.stats.values()
                              if s['block'] >= self.block_threshold and
                              (now - s['last_block']) < self.cooldown_seconds)
            available = total - in_cooldown
            # Get top judges by success count
            top = []
            for judge, s in self.stats.items():
                total_tests = s['success'] + s['fail']
                if total_tests > 0:
                    success_pct = (s['success'] * 100.0) / total_tests
                    top.append({'judge': judge, 'success': s['success'],
                                'tests': total_tests, 'rate': round(success_pct, 1)})
            top.sort(key=lambda x: x['success'], reverse=True)
            return {'total': total, 'available': available,
                    'in_cooldown': in_cooldown, 'top': top}
|
|
|
|
|
|
# Global judge stats instance
judge_stats = JudgeStats()

# HTTP targets - check for specific headers
# Maps target host -> response-header name expected in that site's reply
regexes = {
    'www.facebook.com': 'X-FB-Debug',
    'www.fbcdn.net': 'X-FB-Debug',
    'www.reddit.com': 'x-clacks-overhead',
    'www.twitter.com': 'x-connection-hash',
    't.co': 'x-connection-hash',
    'www.msn.com': 'x-aspnetmvc-version',
    'www.bing.com': 'p3p',
    'www.ask.com': 'x-served-by',
    'www.hotmail.com': 'x-msedge-ref',
    'www.bbc.co.uk': 'x-bbc-edge-cache-status',
    'www.skype.com': 'X-XSS-Protection',
    'www.alibaba.com': 'object-status',
    'www.mozilla.org': 'cf-ray',
    'www.cloudflare.com': 'cf-ray',
    'www.wikimedia.org': 'x-client-ip',
    'www.vk.com': 'x-frontend',
    'www.tinypic.com': 'x-amz-cf-pop',
    'www.netflix.com': 'X-Netflix.proxy.execution-time',
    'www.amazon.de': 'x-amz-cf-id',
    'www.reuters.com': 'x-amz-cf-id',
    'www.ikea.com': 'x-frame-options',
    'www.twitpic.com': 'timing-allow-origin',
    'www.digg.com': 'cf-request-id',
    'www.wikia.com': 'x-served-by',
    'www.wp.com': 'x-ac',
    'www.last.fm': 'x-timer',
    'www.usps.com': 'x-ruleset-version',
    'www.linkedin.com': 'x-li-uuid',
    'www.vimeo.com': 'x-timer',
    'www.yelp.com': 'x-timer',
    'www.ebay.com': 'x-envoy-upstream-service-time',
    'www.wikihow.com': 'x-c',
    'www.archive.org': 'referrer-policy',
    'www.pandora.tv': 'X-UA-Compatible',
    'www.w3.org': 'x-backend',
    'www.time.com': 'x-amz-cf-pop'
}

# SSL targets - verify TLS handshake only (MITM detection)
# No HTTP request needed - just connect with verifycert=True
ssl_targets = [
    'www.google.com',
    'www.microsoft.com',
    'www.apple.com',
    'www.amazon.com',
    'www.cloudflare.com',
    'www.github.com',
    'www.mozilla.org',
    'www.wikipedia.org',
    'www.reddit.com',
    'www.twitter.com',
    'x.com',
    'www.facebook.com',
    'www.linkedin.com',
    'www.paypal.com',
    'www.stripe.com',
    'www.digicert.com',
    'www.letsencrypt.org',
]
|
|
|
|
class Stats():
    """Track and report comprehensive runtime statistics.

    All counters are guarded by a single reentrant lock so that methods
    holding the lock may call other locked methods (e.g. update_history
    -> get_avg_latency).
    """

    HISTORY_SIZE = 120 # 10 min at 5s intervals
    LATENCY_BUCKETS = [100, 250, 500, 1000, 2000, 5000, 10000] # ms thresholds

    def __init__(self):
        self.lock = threading.RLock() # RLock for reentrant access (get_runtime_stats)
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()
        self.last_report = time.time()

        # Failure category tracking
        self.fail_categories = {}

        # Protocol tracking (tested, passed, and failed separately)
        self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0}
        self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0}
        self.proto_failed = {'http': {}, 'socks4': {}, 'socks5': {}} # Failures by category per proto
        self.by_proto = self.proto_passed # Alias for dashboard API

        # Time series history (5s intervals)
        self.rate_history = []
        self.success_rate_history = []
        self.latency_history = []
        self.last_history_time = time.time()
        self.last_history_tested = 0
        self.last_history_passed = 0

        # Peak values (delayed measurement to avoid startup anomalies)
        self.peak_rate = 0.0
        self.peak_success_rate = 0.0
        self.peak_grace_period = 30 # seconds before recording peaks
        self.min_latency = float('inf')
        self.max_latency = 0.0

        # Latency tracking with percentiles
        self.latency_sum = 0.0
        self.latency_count = 0
        self.latency_samples = [] # Recent samples for percentiles
        self.latency_buckets = {b: 0 for b in self.LATENCY_BUCKETS + [float('inf')]}

        # Recent window (last 60s)
        self.recent_tested = 0
        self.recent_passed = 0
        self.recent_start = time.time()

        # Country/ASN tracking (top N)
        self.country_passed = {}
        self.asn_passed = {}

        # Hourly aggregates
        self.hourly_tested = 0
        self.hourly_passed = 0
        self.hourly_start = time.time()
        self.hours_data = [] # Last 24 hours

        # SSL/TLS tracking
        self.ssl_tested = 0
        self.ssl_passed = 0
        self.ssl_failed = 0
        self.ssl_fail_categories = {} # Track SSL failures by category
        self.mitm_detected = 0
        self.cert_errors = 0

    def record(self, success, category=None, proto=None, latency_ms=None, country=None, asn=None,
               ssl_test=False, mitm=False, cert_error=False):
        """Record one proxy test result into all aggregates.

        Args:
            success: whether the proxy passed the check
            category: failure category string (only used when success is False)
            proto: 'http', 'socks4' or 'socks5' (others are ignored)
            latency_ms: positive latency in ms (tracked only on success)
            country, asn: geo attribution (tracked only on success)
            ssl_test: this result came from an SSL handshake check
            mitm: a MITM was detected
            cert_error: a certificate error occurred
        """
        with self.lock:
            self.tested += 1
            self.recent_tested += 1
            self.hourly_tested += 1

            # Track protocol tests
            if proto and proto in self.proto_tested:
                self.proto_tested[proto] += 1

            if success:
                self.passed += 1
                self.recent_passed += 1
                self.hourly_passed += 1

                if proto and proto in self.proto_passed:
                    self.proto_passed[proto] += 1

                if latency_ms and latency_ms > 0:
                    self.latency_sum += latency_ms
                    self.latency_count += 1
                    # Track min/max
                    if latency_ms < self.min_latency:
                        self.min_latency = latency_ms
                    if latency_ms > self.max_latency:
                        self.max_latency = latency_ms
                    # Keep recent samples for percentiles (max 1000)
                    self.latency_samples.append(latency_ms)
                    if len(self.latency_samples) > 1000:
                        self.latency_samples.pop(0)
                    # Bucket for histogram
                    for bucket in self.LATENCY_BUCKETS:
                        if latency_ms <= bucket:
                            self.latency_buckets[bucket] += 1
                            break
                    else:
                        # for/else: no bucket matched, latency exceeds the largest threshold
                        self.latency_buckets[float('inf')] += 1

                # Track country/ASN
                if country:
                    self.country_passed[country] = self.country_passed.get(country, 0) + 1
                if asn:
                    self.asn_passed[asn] = self.asn_passed.get(asn, 0) + 1
            else:
                self.failed += 1
                if category:
                    self.fail_categories[category] = self.fail_categories.get(category, 0) + 1
                    # Track failures by protocol
                    if proto and proto in self.proto_failed:
                        self.proto_failed[proto][category] = self.proto_failed[proto].get(category, 0) + 1
                # Log failure category breakdown every 1000 failures
                if self.failed % 1000 == 0:
                    top_cats = sorted(self.fail_categories.items(), key=lambda x: -x[1])[:5]
                    cats_str = ', '.join(['%s:%d' % (c, n) for c, n in top_cats])
                    _log('fail breakdown (%d total): %s' % (self.failed, cats_str), 'diag')

            # SSL/TLS tracking
            if ssl_test:
                self.ssl_tested += 1
                if success:
                    self.ssl_passed += 1
                else:
                    self.ssl_failed += 1
                    # Track which error caused the SSL failure
                    if category:
                        self.ssl_fail_categories[category] = self.ssl_fail_categories.get(category, 0) + 1
            if mitm:
                self.mitm_detected += 1
            if cert_error:
                self.cert_errors += 1

    def update_history(self):
        """Update time series history (call periodically)."""
        now = time.time()
        with self.lock:
            elapsed = now - self.last_history_time
            if elapsed >= 5: # Update every 5 seconds
                # Rate - with sanity checks
                tests_delta = self.tested - self.last_history_tested
                if tests_delta < 0:
                    # Counter wrapped or corrupted - reset baseline
                    self.last_history_tested = self.tested
                    tests_delta = 0
                rate = tests_delta / elapsed if elapsed > 0 else 0
                # Cap at reasonable max (100/s is generous for proxy testing)
                if rate > 100:
                    rate = 0 # Discard bogus value
                self.rate_history.append(round(rate, 2))
                if len(self.rate_history) > self.HISTORY_SIZE:
                    self.rate_history.pop(0)
                # Only record peaks after grace period (avoid startup anomalies)
                uptime = now - self.start_time
                if uptime >= self.peak_grace_period and rate > self.peak_rate and rate <= 100:
                    self.peak_rate = rate

                # Success rate - with sanity checks
                passed_delta = self.passed - self.last_history_passed
                if passed_delta < 0:
                    self.last_history_passed = self.passed
                    passed_delta = 0
                sr = (passed_delta / tests_delta * 100) if tests_delta > 0 else 0
                sr = min(sr, 100.0) # Cap at 100%
                self.success_rate_history.append(round(sr, 1))
                if len(self.success_rate_history) > self.HISTORY_SIZE:
                    self.success_rate_history.pop(0)
                if uptime >= self.peak_grace_period and sr > self.peak_success_rate:
                    self.peak_success_rate = sr

                # Average latency for this interval
                # (safe to call while holding the lock: self.lock is an RLock)
                avg_lat = self.get_avg_latency()
                self.latency_history.append(round(avg_lat, 0))
                if len(self.latency_history) > self.HISTORY_SIZE:
                    self.latency_history.pop(0)

                self.last_history_time = now
                self.last_history_tested = self.tested
                self.last_history_passed = self.passed

            # Reset recent window every 60s
            if now - self.recent_start >= 60:
                self.recent_tested = 0
                self.recent_passed = 0
                self.recent_start = now

            # Hourly aggregation
            if now - self.hourly_start >= 3600:
                self.hours_data.append({
                    'tested': self.hourly_tested,
                    'passed': self.hourly_passed,
                    'rate': self.hourly_passed / 3600.0 if self.hourly_tested > 0 else 0,
                    'success_rate': (self.hourly_passed / self.hourly_tested * 100) if self.hourly_tested > 0 else 0,
                })
                if len(self.hours_data) > 24:
                    self.hours_data.pop(0)
                self.hourly_tested = 0
                self.hourly_passed = 0
                self.hourly_start = now

    def get_recent_rate(self):
        """Get rate for last 60 seconds."""
        with self.lock:
            elapsed = time.time() - self.recent_start
            if elapsed > 0:
                return self.recent_tested / elapsed
            return 0.0

    def get_recent_success_rate(self):
        """Get success rate for last 60 seconds."""
        with self.lock:
            if self.recent_tested > 0:
                return (self.recent_passed / self.recent_tested) * 100
            return 0.0

    def get_avg_latency(self):
        """Get average latency in ms."""
        with self.lock:
            if self.latency_count > 0:
                return self.latency_sum / self.latency_count
            return 0.0

    def get_latency_percentiles(self):
        """Get latency percentiles (p50, p90, p99) from the recent sample window."""
        with self.lock:
            if not self.latency_samples:
                return {'p50': 0, 'p90': 0, 'p99': 0}
            sorted_samples = sorted(self.latency_samples)
            n = len(sorted_samples)
            return {
                'p50': sorted_samples[int(n * 0.50)] if n > 0 else 0,
                'p90': sorted_samples[int(n * 0.90)] if n > 0 else 0,
                'p99': sorted_samples[min(int(n * 0.99), n - 1)] if n > 0 else 0,
            }

    def get_latency_histogram(self):
        """Get latency distribution histogram as a list of range/count/pct dicts."""
        with self.lock:
            total = sum(self.latency_buckets.values())
            if total == 0:
                return []
            result = []
            prev = 0
            for bucket in self.LATENCY_BUCKETS:
                count = self.latency_buckets[bucket]
                result.append({
                    'range': '%d-%d' % (prev, bucket),
                    'count': count,
                    'pct': round(count / total * 100, 1),
                })
                prev = bucket
            # Over max bucket
            over = self.latency_buckets[float('inf')]
            if over > 0:
                result.append({
                    'range': '>%d' % self.LATENCY_BUCKETS[-1],
                    'count': over,
                    'pct': round(over / total * 100, 1),
                })
            return result

    def get_proto_stats(self):
        """Get protocol-specific success rates and failure breakdown."""
        with self.lock:
            result = {}
            for proto in ['http', 'socks4', 'socks5']:
                tested = self.proto_tested[proto]
                passed = self.proto_passed[proto]
                failed = sum(self.proto_failed[proto].values())
                result[proto] = {
                    'tested': tested,
                    'passed': passed,
                    'failed': failed,
                    'success_rate': round(passed / tested * 100, 1) if tested > 0 else 0,
                    'fail_reasons': dict(self.proto_failed[proto]) if self.proto_failed[proto] else {},
                }
            return result

    def get_top_countries(self, limit=10):
        """Get top countries by working proxy count."""
        with self.lock:
            sorted_countries = sorted(self.country_passed.items(), key=lambda x: -x[1])
            return sorted_countries[:limit]

    def get_top_asns(self, limit=10):
        """Get top ASNs by working proxy count."""
        with self.lock:
            sorted_asns = sorted(self.asn_passed.items(), key=lambda x: -x[1])
            return sorted_asns[:limit]

    def get_hourly_data(self):
        """Get last 24 hours of hourly data."""
        with self.lock:
            return list(self.hours_data)

    def load_state(self, state):
        """Load persisted state from a dict (from database).

        Args:
            state: dict from dbs.load_session_state()
        """
        if not state:
            return
        with self.lock:
            self.tested = state.get('tested', 0)
            self.passed = state.get('passed', 0)
            self.failed = state.get('failed', 0)
            self.ssl_tested = state.get('ssl_tested', 0)
            self.ssl_passed = state.get('ssl_passed', 0)
            self.ssl_failed = state.get('ssl_failed', 0)
            self.mitm_detected = state.get('mitm_detected', 0)
            self.cert_errors = state.get('cert_errors', 0)
            self.proto_tested['http'] = state.get('proto_http_tested', 0)
            self.proto_passed['http'] = state.get('proto_http_passed', 0)
            self.proto_tested['socks4'] = state.get('proto_socks4_tested', 0)
            self.proto_passed['socks4'] = state.get('proto_socks4_passed', 0)
            self.proto_tested['socks5'] = state.get('proto_socks5_tested', 0)
            self.proto_passed['socks5'] = state.get('proto_socks5_passed', 0)
            # Note: peak_rate is per-session, not restored (avoids stale/corrupt values)
            # Note: start_time is NOT restored - uptime reflects current session
            # Restore failure categories
            if state.get('fail_categories'):
                self.fail_categories = dict(state['fail_categories'])
            # Restore SSL failure categories
            if state.get('ssl_fail_categories'):
                self.ssl_fail_categories = dict(state['ssl_fail_categories'])
            # Restore protocol failure categories
            if state.get('proto_failed'):
                for proto in ['http', 'socks4', 'socks5']:
                    if proto in state['proto_failed']:
                        self.proto_failed[proto] = dict(state['proto_failed'][proto])
            # Restore geo tracking
            if state.get('country_passed'):
                self.country_passed = dict(state['country_passed'])
            if state.get('asn_passed'):
                # Convert string keys back to int for ASN
                self.asn_passed = {int(k) if k.isdigit() else k: v
                                   for k, v in state['asn_passed'].items()}
            _log('restored session: %d tested, %d passed' % (self.tested, self.passed), 'info')

    def should_report(self, interval):
        # True when at least `interval` seconds have passed since the last report()
        return (time.time() - self.last_report) >= interval

    def report(self):
        """Return a one-line human-readable summary and reset the report timer."""
        with self.lock:
            self.last_report = time.time()
            elapsed = time.time() - self.start_time
            rate = try_div(self.tested, elapsed)
            pct = try_div(self.passed * 100.0, self.tested)
            base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % (
                self.tested, self.passed, pct, rate, int(elapsed / 60))
            # Add failure breakdown if there are failures
            if self.fail_categories:
                cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items()))
                return '%s [%s]' % (base, cats)
            return base

    def get_full_stats(self):
        """Get comprehensive stats dict for API."""
        with self.lock:
            elapsed = time.time() - self.start_time
            return {
                'tested': self.tested,
                'passed': self.passed,
                'failed': self.failed,
                'success_rate': round(self.passed / self.tested * 100, 1) if self.tested > 0 else 0,
                'rate': round(self.tested / elapsed, 2) if elapsed > 0 else 0,
                'pass_rate': round(self.passed / elapsed, 2) if elapsed > 0 else 0,
                'recent_rate': self.get_recent_rate(),
                'recent_success_rate': self.get_recent_success_rate(),
                'peak_rate': self.peak_rate,
                'peak_success_rate': self.peak_success_rate,
                'uptime_seconds': int(elapsed),
                'rate_history': list(self.rate_history),
                'success_rate_history': list(self.success_rate_history),
                'latency_history': list(self.latency_history),
                'avg_latency': self.get_avg_latency(),
                'min_latency': self.min_latency if self.min_latency != float('inf') else 0,
                'max_latency': self.max_latency,
                'latency_percentiles': self.get_latency_percentiles(),
                'latency_histogram': self.get_latency_histogram(),
                'by_proto': dict(self.proto_passed),
                'proto_stats': self.get_proto_stats(),
                'failures': dict(self.fail_categories),
                'top_countries': self.get_top_countries(),
                'top_asns': self.get_top_asns(),
                'hourly_data': self.get_hourly_data(),
            }
|
|
|
|
|
|
def try_div(a, b):
    """Return a / b as a float, or 0 when b is zero (avoids ZeroDivisionError)."""
    if b == 0:
        return 0
    return a / float(b)
|
|
|
|
|
|
class MITMCertStats(object):
    """Track MITM certificate statistics.

    Aggregates every forged certificate seen through proxies: unique certs
    by fingerprint, counts by organization and issuer, which proxies served
    which certs, plus a rolling window of recent sightings.
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.certs = {}         # fingerprint -> cert_info dict
        self.by_org = {}        # organization -> count
        self.by_issuer = {}     # issuer CN -> count
        self.by_proxy = {}      # proxy IP -> list of fingerprints
        self.total_count = 0
        self.recent_certs = []  # last N certificates seen

    def add_cert(self, proxy_ip, cert_info):
        """Add a MITM certificate to statistics."""
        if not cert_info:
            return
        fp = cert_info.get('fingerprint', '')
        if not fp:
            return

        with self.lock:
            self.total_count += 1

            # Store unique certs by fingerprint; the info dict itself is kept
            # and annotated in place on first sighting
            known = self.certs.get(fp)
            if known is None:
                cert_info['first_seen'] = time.time()
                cert_info['count'] = 1
                cert_info['proxies'] = [proxy_ip]
                self.certs[fp] = cert_info
            else:
                known['count'] += 1
                known['last_seen'] = time.time()
                if proxy_ip not in known['proxies']:
                    known['proxies'].append(proxy_ip)

            # Track by organization
            org = cert_info.get('subject_o', 'Unknown')
            self.by_org[org] = self.by_org.get(org, 0) + 1

            # Track by issuer
            issuer = cert_info.get('issuer_cn', 'Unknown')
            self.by_issuer[issuer] = self.by_issuer.get(issuer, 0) + 1

            # Track proxies using this cert
            proxy_fps = self.by_proxy.setdefault(proxy_ip, [])
            if fp not in proxy_fps:
                proxy_fps.append(fp)

            # Keep recent certs (last 50)
            self.recent_certs.append({
                'fingerprint': fp,
                'proxy': proxy_ip,
                'subject_cn': cert_info.get('subject_cn', ''),
                'issuer_cn': cert_info.get('issuer_cn', ''),
                'timestamp': time.time()
            })
            if len(self.recent_certs) > 50:
                self.recent_certs.pop(0)

    def get_stats(self):
        """Get MITM certificate statistics for API."""
        with self.lock:
            # Top organizations / issuers by detection count
            top_orgs = sorted(self.by_org.items(), key=lambda kv: kv[1], reverse=True)[:10]
            top_issuers = sorted(self.by_issuer.items(), key=lambda kv: kv[1], reverse=True)[:10]

            # Unique certs, most-seen first, capped at 20
            unique_certs = []
            for fp, info in self.certs.items():
                row = {'fingerprint': fp}
                row.update(info)
                unique_certs.append(row)
            unique_certs.sort(key=lambda c: c.get('count', 0), reverse=True)
            unique_certs = unique_certs[:20]

            return {
                'total_detections': self.total_count,
                'unique_certs': len(self.certs),
                'unique_proxies': len(self.by_proxy),
                'top_organizations': [{'name': o, 'count': c} for o, c in top_orgs],
                'top_issuers': [{'name': i, 'count': c} for i, c in top_issuers],
                'certificates': unique_certs,
                'recent': list(self.recent_certs[-20:])
            }

    def save_state(self, filepath):
        """Save MITM stats to JSON file for persistence."""
        import json
        with self.lock:
            snapshot = {
                'certs': self.certs,
                'by_org': self.by_org,
                'by_issuer': self.by_issuer,
                'by_proxy': self.by_proxy,
                'total_count': self.total_count,
                'recent_certs': self.recent_certs[-50:],
            }
            try:
                with open(filepath, 'w') as f:
                    json.dump(snapshot, f)
            except Exception as e:
                _log('failed to save MITM state: %s' % str(e), 'warn')

    def load_state(self, filepath):
        """Load MITM stats from JSON file."""
        import json
        try:
            with open(filepath, 'r') as f:
                loaded = json.load(f)
            with self.lock:
                self.certs = loaded.get('certs', {})
                self.by_org = loaded.get('by_org', {})
                self.by_issuer = loaded.get('by_issuer', {})
                self.by_proxy = loaded.get('by_proxy', {})
                self.total_count = loaded.get('total_count', 0)
                self.recent_certs = loaded.get('recent_certs', [])
            _log('restored MITM state: %d certs, %d detections' % (
                len(self.certs), self.total_count), 'info')
        except IOError:
            pass # File doesn't exist yet, start fresh
        except Exception as e:
            _log('failed to load MITM state: %s' % str(e), 'warn')
|
|
|
|
|
|
def extract_cert_info(cert_der):
    """Extract certificate information from DER-encoded certificate.

    Args:
        cert_der: DER-encoded certificate bytes

    Returns:
        dict with certificate details; a reduced dict (fingerprint only)
        when pyOpenSSL is not installed; or None on parse failure.

    Fixes: hashlib was imported twice and the fingerprint/signature
    algorithm were computed redundantly; the unused exception variable
    is dropped.
    """
    import hashlib
    try:
        # Fingerprint is computed once and shared by both return paths
        fp = hashlib.sha256(cert_der).hexdigest()

        # Decode DER to get certificate details (pyOpenSSL is optional)
        from OpenSSL import crypto
        x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der)

        subject = x509.get_subject()
        issuer = x509.get_issuer()

        # Parse dates (format: YYYYMMDDhhmmssZ)
        not_before = x509.get_notBefore()
        not_after = x509.get_notAfter()
        if isinstance(not_before, bytes):
            not_before = not_before.decode('ascii')
        if isinstance(not_after, bytes):
            not_after = not_after.decode('ascii')

        # Normalize signature algorithm to str (bytes on py3, str on py2)
        sig_algo = x509.get_signature_algorithm()
        sig_algo = sig_algo.decode('ascii') if hasattr(sig_algo, 'decode') else str(sig_algo)

        return {
            'fingerprint': fp[:16], # Short fingerprint for display
            'fingerprint_full': fp,
            'subject_cn': subject.CN or '',
            'subject_o': subject.O or '',
            'subject_ou': subject.OU or '',
            'subject_c': subject.C or '',
            'issuer_cn': issuer.CN or '',
            'issuer_o': issuer.O or '',
            'serial': str(x509.get_serial_number()),
            'not_before': not_before,
            'not_after': not_after,
            'version': x509.get_version(),
            'sig_algo': sig_algo,
        }
    except ImportError:
        # Fallback if pyOpenSSL not available - fingerprint-only info
        # (fp was computed before the OpenSSL import, so it is in scope here)
        return {
            'fingerprint': fp[:16],
            'fingerprint_full': fp,
            'subject_cn': '(pyOpenSSL not installed)',
            'subject_o': '',
            'issuer_cn': '',
            'issuer_o': '',
            'serial': '',
            'not_before': '',
            'not_after': '',
        }
    except Exception:
        return None
|
|
|
|
|
|
def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout, auth=None):
    """Connect to target through proxy without cert verification to get MITM cert.

    Args:
        proxy_ip: Proxy IP address
        proxy_port: Proxy port
        proto: Proxy protocol (http, socks4, socks5)
        torhost: Tor SOCKS5 address
        target_host: Target host for SSL connection
        target_port: Target port (usually 443)
        timeout: Connection timeout
        auth: Optional auth credentials (user:pass)

    Returns:
        dict with certificate info or None on failure
    """
    try:
        if auth:
            proxy_url = '%s://%s@%s:%s' % (proto, auth, proxy_ip, proxy_port)
        else:
            proxy_url = '%s://%s:%s' % (proto, proxy_ip, proxy_port)
        # Chain: local -> tor -> proxy under test -> target
        proxies = [
            rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
            rocksock.RocksockProxyFromURL(proxy_url),
        ]

        # Connect WITHOUT certificate verification so the interceptor's
        # certificate is returned instead of a handshake failure.
        sock = rocksock.Rocksock(host=target_host, port=target_port, ssl=True,
                                 proxies=proxies, timeout=timeout, verifycert=False)
        sock.connect()
        try:
            # Get peer certificate in DER form
            cert_der = sock.sock.getpeercert(binary_form=True)
        finally:
            # Always release the socket, even if getpeercert() raises
            sock.disconnect()

        if cert_der:
            return extract_cert_info(cert_der)
        return None
    except Exception:
        # Best-effort helper: any failure simply yields no cert info
        return None
|
|
|
|
|
|
# Global MITM cert stats instance - module-level singleton that collects
# certificates observed from TLS-intercepting proxies (MITMCertStats is
# defined earlier in this file; its state is saved/loaded elsewhere).
mitm_cert_stats = MITMCertStats()
|
|
|
|
|
|
class PriorityJobQueue(object):
    """Priority queue for proxy test jobs.

    Lower priority number = higher priority.
    Priority 0: New proxies (never tested)
    Priority 1: Recently working (no failures, has successes)
    Priority 2: Low fail count (< 3 failures)
    Priority 3: Medium fail count
    Priority 4: High fail count
    """

    def __init__(self):
        self.heap = []
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        # Monotonic tie-breaker keeps equal-priority jobs in FIFO order
        self.counter = 0

    def put(self, job, priority=3):
        """Add job with priority (lower = higher priority)."""
        with self.lock:
            if not self.heap:
                # Queue drained: restart the tie-breaker so it cannot
                # grow without bound over a long run
                self.counter = 0
            entry = (priority, self.counter, job)
            self.counter += 1
            heapq.heappush(self.heap, entry)
            self.not_empty.notify()

    def get(self, timeout=None):
        """Get highest priority job. Raises Queue.Empty on timeout."""
        with self.not_empty:
            if timeout is None:
                # Block until a producer notifies us
                while not self.heap:
                    self.not_empty.wait()
            else:
                deadline = time.time() + timeout
                while not self.heap:
                    left = deadline - time.time()
                    if left <= 0:
                        raise Queue.Empty()
                    self.not_empty.wait(left)
            return heapq.heappop(self.heap)[2]

    def get_nowait(self):
        """Get job without waiting. Raises Queue.Empty if empty."""
        with self.lock:
            if not self.heap:
                raise Queue.Empty()
            return heapq.heappop(self.heap)[2]

    def empty(self):
        """Check if queue is empty."""
        with self.lock:
            return not self.heap

    def qsize(self):
        """Return queue size."""
        with self.lock:
            return len(self.heap)

    def task_done(self):
        """Compatibility no-op (heap queue does not track task counts)."""
        pass
|
|
|
|
|
|
def calculate_priority(failcount, success_count, max_fail):
    """Calculate job priority based on proxy state.

    Returns:
        int: Priority 0-4 (lower = higher priority)
    """
    if failcount == 0:
        # A proxy with no failures is either brand new (never passed a
        # test) or currently working; new proxies go first.
        return 0 if success_count == 0 else 1
    if failcount < 3:
        # Only a couple of recent failures
        return 2
    # Medium vs. high fail count, relative to the configured maximum
    return 3 if failcount < max_fail // 2 else 4
|
|
|
|
|
|
class ThreadScaler(object):
    """Dynamic thread scaling based on queue depth and success rate.

    With gevent greenlets, we can scale much more aggressively than with
    OS threads since greenlets are cooperative and lightweight.

    Scales up when:
    - Queue depth exceeds high watermark
    - Success rate is above threshold
    Scales down when:
    - Queue is nearly empty
    - Success rate drops below threshold

    NOTE(review): the success-rate criteria above describe intent; the
    current should_scale() implementation decides on queue depth only
    (see note inside) - confirm whether success_rate should gate scaling.
    """

    def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5,
                 scale_cooldown=10, scale_threshold=10.0):
        # Hard bounds on worker count
        self.min_threads = min_threads
        self.max_threads = max_threads
        # Desired ratio of queued jobs per worker
        self.target_queue_per_thread = target_queue_per_thread
        # Timestamp of the last resize; enforces the cooldown below
        self.last_scale_time = 0
        self.scale_cooldown = scale_cooldown
        self.success_threshold = scale_threshold

    def should_scale(self, current_threads, queue_size, stats):
        """Determine if scaling is needed.

        Args:
            current_threads: Current number of active threads
            queue_size: Current job queue depth
            stats: Stats object with success/fail counts

        Returns:
            int: Target thread count (may be same as current)
        """
        now = time.time()
        # Respect cooldown: never resize more often than scale_cooldown
        if (now - self.last_scale_time) < self.scale_cooldown:
            return current_threads

        # Calculate current success rate
        # NOTE(review): success_rate is computed but not consulted by the
        # decisions below - it is currently inert.
        with stats.lock:
            total = stats.tested
            passed = stats.passed
            success_rate = try_div(passed * 100.0, total) if total > 0 else 50.0

        # Calculate ideal thread count based on queue depth
        # Never spawn more threads than jobs available
        ideal = max(self.min_threads, queue_size // self.target_queue_per_thread)
        ideal = min(ideal, self.max_threads, max(self.min_threads, queue_size))

        target = current_threads

        # Scale up: queue is deep - scale based on queue depth only
        # With greenlets, scale more aggressively (+5 instead of +2)
        if queue_size > current_threads * self.target_queue_per_thread:
            target = min(current_threads + 5, ideal, self.max_threads)

        # Scale down: queue is shallow
        elif queue_size < current_threads * 2:
            # Scale down faster when threads far exceed ideal
            reduction = 2 if current_threads <= ideal * 2 else 5
            target = max(current_threads - reduction, ideal, self.min_threads)

        # Start a new cooldown window only when we actually resize
        if target != current_threads:
            self.last_scale_time = now

        return target

    def status_line(self, current_threads, queue_size):
        """Return status string for logging."""
        return 'threads=%d queue=%d target_per_thread=%d' % (
            current_threads, queue_size, self.target_queue_per_thread)
|
|
|
|
def socks4_resolve(srvname, server_port):
    """Resolve hostname to IP for SOCKS4 (which requires numeric IP).

    Caches results for DNS_CACHE_TTL seconds to avoid repeated lookups.

    Returns:
        The resolved IPv4 address string on success, or False when the
        name cannot be resolved.
    """
    now = time.time()
    # Check cache with TTL
    if srvname in cached_dns:
        ip, ts = cached_dns[srvname]
        if now - ts < DNS_CACHE_TTL:
            if config.watchd.debug:
                _log("using cached ip (%s) for %s" % (ip, srvname), "debug")
            return ip
        # Expired - fall through to re-resolve

    # Resolve hostname (IPv4 only; SOCKS4 cannot carry v6 addresses)
    dns_fail = False
    try:
        af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True)
        if sa is not None:
            # Cache (ip, timestamp) for subsequent lookups
            cached_dns[srvname] = (sa[0], now)
            return sa[0]
        else:
            dns_fail = True
    except rocksock.RocksockException as e:
        # Only getaddrinfo-class errors are expected from resolve()
        assert(e.get_errortype() == rocksock.RS_ET_GAI)
        dns_fail = True

    if dns_fail:
        _log("could not resolve connection target %s" % srvname, "ERROR")
        return False
    # NOTE(review): every path above either returns or sets dns_fail, so
    # this fallback return appears unreachable - confirm before relying
    # on it.
    return srvname
|
|
|
|
|
|
class ProxyTestState():
    """Thread-safe state for a proxy being tested.

    Holds test results and evaluates final pass/fail status.  One
    instance is shared by the TargetTestJob(s) for a single proxy;
    record_result() accumulates outcomes and evaluate() produces the
    final verdict and updated proxy metadata.
    """
    def __init__(self, ip, port, proto, failcount, success_count, total_duration,
                 country, mitm, consecutive_success, asn=None, oldies=False,
                 completion_queue=None, proxy_full=None):
        # ip/port identify the proxy; proto may be None (=> probe all)
        self.ip = ip
        self.port = int(port)
        self.proxy = '%s:%s' % (ip, port)
        # Parse auth credentials from full proxy string (user:pass@ip:port)
        self.auth = None
        if proxy_full and '@' in str(proxy_full):
            auth_part = str(proxy_full).rsplit('@', 1)[0]
            if ':' in auth_part:
                self.auth = auth_part  # user:pass
        self.proto = proto
        self.failcount = failcount
        self.checktime = None
        self.success_count = success_count
        self.total_duration = total_duration
        self.country = country
        self.mitm = mitm
        self.consecutive_success = consecutive_success
        self.asn = asn
        self.isoldies = oldies
        self.completion_queue = completion_queue  # for signaling completion

        # thread-safe result accumulation
        self.lock = threading.Lock()
        self.results = []  # list of (success, proto, duration, srv, tor, ssl)
        self.completed = False
        self.evaluated = False  # for evaluate() idempotency
        self.last_latency_ms = None  # average latency from successful tests
        self.exit_ip = None  # IP seen by target server (for anonymity detection)
        self.reveals_headers = None  # True if proxy adds X-Forwarded-For/Via headers
        self.last_fail_category = None  # failure category from last test (for dead detection)
        self.original_failcount = failcount  # failcount before this test cycle
        # SSL/TLS tracking
        self.had_ssl_test = False
        self.ssl_success = False
        self.cert_error = False

    def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
        """Record a single target test result. Thread-safe.

        When all target tests complete, signals via completion_queue.
        """
        _dbg('record: success=%s proto=%s srv=%s cat=%s' % (success, proto, srv, category), self.proxy)
        should_signal = False
        with self.lock:
            self.results.append({
                'success': success,
                'proto': proto,
                'duration': duration,
                'srv': srv,
                'tor': tor,
                'ssl': ssl,
                'category': category,
                'exit_ip': exit_ip,
                'reveals_headers': reveals_headers
            })
            # Track SSL tests
            if ssl:
                self.had_ssl_test = True
                if success:
                    self.ssl_success = True
            # Track cert errors
            if category in ('cert_error', 'ssl_error', 'ssl_mitm'):
                self.cert_error = True
            # Check completion (single-target mode)
            if not self.completed and len(self.results) >= 1:
                self.completed = True
                should_signal = True
        # Signal outside lock to avoid deadlock
        if should_signal and self.completion_queue is not None:
            self.completion_queue.put(self)

    def is_complete(self):
        """Check if all target tests have finished."""
        # Fast path: check completed flag without lock (atomic read)
        if self.completed:
            return True
        # Slow path: check with lock (only during transition)
        with self.lock:
            return len(self.results) >= 1

    @staticmethod
    def rwip(ip):
        """Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10)."""
        return '.'.join(str(int(b)) for b in ip.split('.'))

    def evaluate(self):
        """Evaluate results after all tests complete.

        Returns:
            (success, category) tuple where success is bool and category is
            the dominant failure type (or None on success)
        """
        with self.lock:
            if self.evaluated:
                # Already evaluated - return cached result
                return (self.failcount == 0, self.last_fail_category)
            self.evaluated = True
        self.checktime = int(time.time())

        successes = [r for r in self.results if r['success']]
        failures = [r for r in self.results if not r['success']]
        num_success = len(successes)
        _dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy)

        # Determine dominant failure category (most frequent one)
        fail_category = None
        if failures:
            cats = {}
            for f in failures:
                cat = f.get('category') or 'other'
                cats[cat] = cats.get(cat, 0) + 1
            if cats:
                fail_category = max(cats.keys(), key=lambda k: cats[k])

        # require success (single target mode)
        if num_success >= 1:
            # use last successful result for metrics
            last_good = successes[-1]

            # Extract exit IP and header info from successful judge responses
            for s in successes:
                if s.get('exit_ip'):
                    self.exit_ip = s['exit_ip']
                if s.get('reveals_headers') is not None:
                    self.reveals_headers = s['reveals_headers']

            # Lazy geo lookup on first success only
            if geolite and self.country is None:
                self.ip = self.rwip(self.ip)
                rec = geodb.get_all(self.ip)
                if rec is not None and rec.country_short:
                    self.country = rec.country_short

            # Lazy ASN lookup on first success only
            if asn_enabled and self.asn is None:
                try:
                    asn_result = asndb.lookup(self.ip)
                    if asn_result and asn_result[0]:
                        self.asn = asn_result[0]
                except Exception as e:
                    if config.watchd.debug:
                        _log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug')

            # Pin the protocol that actually worked
            self.proto = last_good['proto']
            self.failcount = 0
            # Only reset mitm after 3 consecutive clean successes (not on first success)
            # and only if this test didn't detect MITM
            if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0 and not self.cert_error:
                self.mitm = 0
            self.consecutive_success += 1
            self.success_count += 1
            self.total_duration += int(last_good['duration'] * 1000)

            # Calculate average latency from successful tests (in ms)
            durations = [s['duration'] for s in successes if s['duration']]
            if durations:
                self.last_latency_ms = sum(durations) * 1000.0 / len(durations)

            # Only mention the tor node when there are several to choose from
            torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
            # Determine anonymity for status message
            # transparent: exit_ip == proxy_ip
            # anonymous: exit_ip != proxy_ip, adds revealing headers
            # elite: exit_ip != proxy_ip, no revealing headers
            anon_status = ''
            if self.exit_ip:
                if self.exit_ip == self.ip:
                    anon_status = ' anon: transparent;'
                elif self.reveals_headers is False:
                    anon_status = ' anon: elite;'
                elif self.reveals_headers is True:
                    anon_status = ' anon: anonymous;'
                else:
                    anon_status = ' anon: anonymous;'  # default if no header check
            _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s' % (
                last_good['proto'], self.ip, self.port, self.country,
                last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl'])), 'info')
            _dbg('PASS: failcount=0', self.proxy)
            return (True, None)

        else:
            # Real proxy failure
            self.failcount += 1
            self.consecutive_success = 0
            self.last_fail_category = fail_category
            _dbg('FAIL: failcount=%d cat=%s' % (self.failcount, fail_category), self.proxy)
            return (False, fail_category)
|
|
|
|
|
|
class TargetTestJob():
    """Job to test a single proxy against a single target.

    Multiple TargetTestJob instances share the same ProxyTestState,
    allowing tests to be interleaved with other proxies in the queue.
    """
    def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
        # proxy_state: shared ProxyTestState accumulating results
        # target_srv: target host (or 'host/path' for judges)
        # checktype: secondary check kind ('head', 'judges' or 'irc')
        # worker_id: assigned by the worker thread for tor-pool affinity
        self.proxy_state = proxy_state
        self.target_srv = target_srv
        self.checktype = checktype
        self.worker_id = worker_id

    def run(self):
        """Test the proxy against this job's target server."""
        # DIAGNOSTIC: Verify run() is being called
        global _sample_debug_counter
        with _sample_debug_lock:
            _sample_debug_counter += 1
            if _sample_debug_counter <= 3 or _sample_debug_counter % 50 == 0:
                _log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter,
                    self.proxy_state.proxy, self.target_srv, self.checktype), 'info')
        network_stats.set_category('proxy')
        _dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy)
        sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
        _dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), self.proxy_state.proxy)

        # No socket: either a request-less SSL outcome or a hard failure
        if not sock:
            # SSL-only check passed (handshake success, no request needed)
            if err_cat == 'ssl_ok':
                # 'duration' holds the connect start time here
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
            elif err_cat == 'ssl_mitm':
                # MITM detected - proxy works but intercepts TLS
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
                self.proxy_state.mitm = 1
            else:
                self.proxy_state.record_result(False, category=err_cat, ssl=is_ssl)
            return

        try:
            # Read the full response from the target
            recv = sock.recv(-1)
            _sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy)

            # Select regex based on check type (or fallback target)
            if 'check.torproject.org' in srv:
                # Tor API fallback (judge using torproject.org)
                regex = r'"IsTor"\s*:\s*true'
            elif self.checktype == 'irc':
                regex = '^(:|NOTICE|ERROR)'
            elif self.checktype == 'judges':
                regex = judges[srv]
            elif self.checktype == 'ssl':
                # Should not reach here - ssl returns before recv
                self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
                return
            else:  # http
                regex = regexes[srv]

            _dbg('recv %d bytes, regex=%s' % (len(recv), regex[:30]), self.proxy_state.proxy)
            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - duration
                _dbg('regex MATCH, elapsed=%.2fs' % elapsed, self.proxy_state.proxy)
                # Extract exit IP from judge/tor response
                exit_ip = None
                reveals_headers = None
                if self.checktype == 'judges' or 'check.torproject.org' in srv:
                    ip_match = re.search(IP_PATTERN, recv)
                    if ip_match and is_valid_ip(ip_match.group(0)):
                        exit_ip = ip_match.group(0)
                if self.checktype == 'judges' and 'check.torproject.org' not in srv:
                    # Check for header echo judge (elite detection)
                    if 'headers' in srv:
                        # If X-Forwarded-For/Via/etc present, proxy reveals chain
                        reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
                    # Record successful judge
                    judge_stats.record_success(srv)
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip,
                    reveals_headers=reveals_headers
                )
            else:
                _dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], self.proxy_state.proxy)
                # Check if judge is blocking us (not a proxy failure)
                if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
                    judge_stats.record_block(srv)
                    # Judge block = proxy worked, we got HTTP response, just no IP
                    # Count as success without exit_ip
                    block_elapsed = time.time() - duration
                    _dbg('judge BLOCK detected, counting as success', self.proxy_state.proxy)
                    self.proxy_state.record_result(
                        True, proto=proto, duration=block_elapsed,
                        srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
                        reveals_headers=None
                    )
                    if config.watchd.debug:
                        _log('judge %s challenged proxy %s (counted as success)' % (
                            srv, self.proxy_state.proxy), 'debug')
                else:
                    _dbg('FAIL: no match, no block', self.proxy_state.proxy)
                    if self.checktype == 'judges':
                        judge_stats.record_failure(srv)
                    self.proxy_state.record_result(False, category='other')

        except KeyboardInterrupt as e:
            sock.disconnect()
            raise e
        except rocksock.RocksockException as e:
            self.proxy_state.record_result(False, category=categorize_error(e))
        finally:
            # NOTE(review): on the KeyboardInterrupt path disconnect() runs
            # twice (above and here) - presumably rocksock tolerates a
            # double disconnect; confirm.
            sock.disconnect()

    def _connect_and_test(self):
        """Connect to target through the proxy and send test packet.

        If ssl_first is enabled:
        1. Try SSL handshake first
        2. If SSL succeeds -> return success
        3. If SSL fails -> try secondary check (configured checktype)

        Returns:
            (sock, proto, duration, tor, srv, failinc, is_ssl, err_cat);
            sock is None for request-less outcomes (ssl_ok / ssl_mitm)
            and for total failure.
        """
        ps = self.proxy_state
        _dbg('_connect_and_test: target=%s checktype=%s ssl_first=%s' % (
            self.target_srv, self.checktype, config.watchd.ssl_first), ps.proxy)
        # Always log first test to verify code path
        global _sample_debug_counter
        if _sample_debug_counter == 0:
            _log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % (
                ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info')

        # Unknown protocol: probe all three in order of likelihood
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
        pool = connection_pool.get_pool()

        # Phase 1: SSL handshake (if ssl_first enabled)
        if config.watchd.ssl_first:
            result = self._try_ssl_handshake(protos, pool)
            if result is not None:
                return result  # SSL succeeded or MITM detected
            # SSL failed for all protocols, continue to secondary check
            _dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)

        # Phase 2: Secondary check (configured checktype)
        return self._try_secondary_check(protos, pool)

    def _try_ssl_handshake(self, protos, pool):
        """Attempt SSL handshake to verify proxy works with TLS.

        Returns:
            Tuple on success/MITM, None on failure (should try secondary check)
        """
        ps = self.proxy_state
        ssl_target = random.choice(ssl_targets)
        last_error_category = None

        for proto in protos:
            # Pick a tor entry point: pool affinity if available, else random
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            network_stats.set_tor_node(torhost)

            # SOCKS4 cannot carry hostnames - resolve to numeric IP first
            if proto == 'socks4':
                srv = socks4_resolve(ssl_target, 443)
            else:
                srv = ssl_target
            if not srv:
                continue

            # 'duration' is the connect start time; callers derive elapsed
            duration = time.time()
            if ps.auth:
                proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
            else:
                proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
            # Chain: local -> tor -> proxy under test -> target
            proxies = [
                rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
                rocksock.RocksockProxyFromURL(proxy_url),
            ]

            # Adaptive timeout: proxies with past failures get more time
            adaptive_timeout = config.watchd.timeout + min(
                ps.failcount * config.watchd.timeout_fail_inc,
                config.watchd.timeout_fail_max)

            try:
                sock = rocksock.Rocksock(host=srv, port=443, ssl=1,
                                         proxies=proxies, timeout=adaptive_timeout,
                                         verifycert=True)
                _dbg('SSL handshake: proto=%s tor=%s target=%s' % (proto, torhost, ssl_target), ps.proxy)
                sock.connect()

                # SSL handshake succeeded
                elapsed = time.time() - duration
                if pool:
                    pool.record_success(torhost, elapsed)
                sock.disconnect()
                _dbg('SSL handshake OK', ps.proxy)
                return None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_ok'

            except rocksock.RocksockException as e:
                last_error_category = categorize_error(e)
                et = e.get_errortype()
                err = e.get_error()

                # Best-effort cleanup; sock may not have fully connected
                try:
                    sock.disconnect()
                except:
                    pass

                if et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # MITM detected - proxy works but intercepts TLS
                    ps.mitm = 1
                    elapsed = time.time() - duration
                    if pool:
                        pool.record_success(torhost, elapsed)
                    _dbg('SSL MITM detected', ps.proxy)
                    return None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_mitm'

                if config.watchd.debug:
                    _log('SSL handshake failed: %s://%s:%d: %s' % (
                        proto, ps.ip, ps.port, e.get_errormessage()), 'debug')

                # Check for Tor connection issues
                if et == rocksock.RS_ET_OWN:
                    if e.get_failedproxy() == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        if pool:
                            pool.record_failure(torhost)

            except KeyboardInterrupt:
                raise

        # All protocols failed SSL
        return None

    def _try_secondary_check(self, protos, pool):
        """Try the configured secondary checktype (head, judges, irc)."""
        ps = self.proxy_state
        _sample_dbg('TEST START: proxy=%s target=%s check=%s' % (
            ps.proxy, self.target_srv, self.checktype), ps.proxy)
        srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv

        # For judges, extract host from 'host/path' format
        if self.checktype == 'judges' and '/' in srvname:
            connect_host = srvname.split('/')[0]
        else:
            connect_host = srvname

        # Secondary checks: always use plain HTTP
        use_ssl = 0
        verifycert = False
        if self.checktype == 'irc':
            server_port = 6667
        else:
            server_port = 80

        last_error_category = None

        for proto in protos:
            # Pick a tor entry point: pool affinity if available, else random
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            network_stats.set_tor_node(torhost)
            # SOCKS4 cannot carry hostnames - resolve to numeric IP first
            if proto == 'socks4':
                srv = socks4_resolve(connect_host, server_port)
            else:
                srv = connect_host
            if not srv:
                continue

            # 'duration' is the connect start time; callers derive elapsed
            duration = time.time()
            # Build proxy URL, including auth credentials if present
            if ps.auth:
                proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
            else:
                proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
            # Chain: local -> tor -> proxy under test -> target
            proxies = [
                rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
                rocksock.RocksockProxyFromURL(proxy_url),
            ]

            # Adaptive timeout: give proxies with failures more time
            adaptive_timeout = config.watchd.timeout + min(
                ps.failcount * config.watchd.timeout_fail_inc,
                config.watchd.timeout_fail_max)

            try:
                sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                         proxies=proxies, timeout=adaptive_timeout,
                                         verifycert=verifycert)
                _dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy)
                _sample_dbg('CONNECT: tor=%s -> proxy=%s:%s (%s) -> %s:%d ssl=%d timeout=%.1f' % (
                    torhost, ps.ip, ps.port, proto, srv, server_port, use_ssl, adaptive_timeout), ps.proxy)
                sock.connect()
                _dbg('connected OK', ps.proxy)
                _sample_dbg('CONNECTED OK: %s via %s' % (ps.proxy, proto), ps.proxy)

                # Send the check-type-specific probe; caller reads the reply
                if self.checktype == 'irc':
                    sock.send('NICK\n')
                elif self.checktype == 'judges':
                    # GET request to receive body (IP)
                    sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
                        srvname.split('/', 1)[1] if '/' in srvname else '',
                        srvname.split('/')[0]
                    ))
                else:  # head - HEAD is sufficient for header checks
                    sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
                # Record success in pool
                if pool:
                    pool.record_success(torhost, time.time() - duration)
                return sock, proto, duration, torhost, srvname, 0, use_ssl, None

            except rocksock.RocksockException as e:
                last_error_category = categorize_error(e)
                if config.watchd.debug:
                    _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port,
                        e.get_errormessage(), last_error_category), 'debug')

                et = e.get_errortype()
                err = e.get_error()
                # failedproxy index: 0 = tor hop, 1 = proxy under test
                fp = e.get_failedproxy()
                _sample_dbg('ERROR: %s via %s -> %s (et=%d err=%d fp=%d cat=%s)' % (
                    ps.proxy, proto, e.get_errormessage(), et, err, fp, last_error_category), ps.proxy)

                sock.disconnect()

                if et == rocksock.RS_ET_OWN:
                    if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                                    err == rocksock.RS_E_HIT_TIMEOUT):
                        # The proxy itself is unresponsive; no point trying
                        # the remaining protocols
                        break
                    elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        # Tor connection failed - record in pool
                        if pool:
                            pool.record_failure(torhost)
                        # NOTE(review): true division (__future__) makes this
                        # argument a float; randint may reject floats on
                        # py3 - consider // here.
                        if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
                            _log("could not connect to tor, sleep 5s", "ERROR")
                        time.sleep(5)
                elif et == rocksock.RS_ET_GAI:
                    _log("could not resolve connection target %s" % connect_host, "ERROR")
                    break
                elif et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # MITM detected - proxy works but intercepts TLS
                    ps.mitm = 1
                    elapsed = time.time() - duration
                    if pool:
                        pool.record_success(torhost, elapsed)
                    # Extract MITM certificate info (async to not block)
                    try:
                        cert_info = get_mitm_certificate(
                            ps.ip, ps.port, proto, torhost,
                            connect_host, server_port, config.watchd.timeout,
                            auth=ps.auth
                        )
                        if cert_info:
                            mitm_cert_stats.add_cert(ps.ip, cert_info)
                            if config.watchd.debug:
                                _log('MITM cert: %s (CN=%s, O=%s)' % (
                                    cert_info.get('fingerprint', ''),
                                    cert_info.get('subject_cn', ''),
                                    cert_info.get('subject_o', '')), 'debug')
                    except Exception as e:
                        if config.watchd.debug:
                            _log('failed to extract MITM cert: %s' % str(e), 'debug')
                    return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm'
            except KeyboardInterrupt as e:
                raise e

        _sample_dbg('ALL PROTOS FAILED: %s last_cat=%s' % (ps.proxy, last_error_category), ps.proxy)
        return None, None, None, None, None, 1, use_ssl, last_error_category
|
|
|
|
|
|
class WorkerThread():
    """Worker that drains test jobs from a shared queue until stopped."""

    def __init__(self, id, job_queue):
        self.id = id
        self.done = threading.Event()
        self.thread = None
        # Shared input queue all workers consume cooperatively
        self.job_queue = job_queue

    def stop(self):
        """Ask the work loop to exit after its current job."""
        self.done.set()

    def term(self):
        """Block until the worker thread (if started) has exited."""
        if self.thread:
            self.thread.join()

    def start_thread(self):
        """Launch the work loop in a (gevent-patched) thread."""
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()

    def workloop(self):
        """Main loop: pull jobs, run them, track per-worker timing."""
        jobs_run = 0
        total_spent = 0
        while not self.done.is_set():
            try:
                job = self.job_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            started = time.time()
            # Tag the job with this worker's id for connection-pool affinity
            job.worker_id = self.id
            job.run()
            total_spent += time.time() - started
            jobs_run += 1
            self.job_queue.task_done()
        if self.thread and jobs_run > 0:
            avg_t = try_div(total_spent, jobs_run)
            _log("terminated, %d jobs, avg.time %.2f" % (jobs_run, avg_t), self.id)
|
|
|
|
class Proxywatchd():
|
|
|
|
    def stop(self):
        """Request shutdown; the actual teardown runs via _cleanup()/finish()."""
        _log('halting... (%d thread(s))' % len(self.threads), 'watchd')
        self.stopping.set()
|
|
|
    def _cleanup(self):
        """Stop all workers, flush pending results, and stop the HTTP server."""
        # First ask every worker to stop...
        for wt in self.threads:
            wt.stop()
        # ...then wait for each one to exit
        for wt in self.threads:
            wt.term()
        # Flush any remaining completed results to the database
        self.collect_work()
        self.submit_collected()
        if self.httpd_server:
            self.httpd_server.stop()
        # Let finish() know teardown is complete
        self.stopped.set()
|
|
|
|
    def finish(self):
        """Wait for shutdown to complete, log totals, and persist state."""
        # Foreground mode cleans up synchronously; background mode relies on
        # another path setting self.stopped.
        if not self.in_background: self._cleanup()
        while not self.stopped.is_set(): time.sleep(0.1)
        success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
        _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd")

        # Final save of session state before exit
        try:
            with self._db_context() as db:
                dbs.save_session_state(db, self.stats)
                dbs.save_stats_snapshot(db, self.stats)
            _log('session state saved', 'watchd')
        except Exception as e:
            _log('failed to save final session state: %s' % str(e), 'warn')

        # Save MITM certificate stats
        try:
            mitm_cert_stats.save_state(self.mitm_state_file)
            _log('MITM cert state saved', 'watchd')
        except Exception as e:
            _log('failed to save MITM state: %s' % str(e), 'warn')
|
|
|
|
    def _prep_db(self):
        """Deprecated: Use _db_context() instead for new code.

        Opens a long-lived sqlite wrapper on self.mysqlite; pair with
        _close_db().
        """
        self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)
|
|
|
|
    def _close_db(self):
        """Deprecated: Use _db_context() instead for new code.

        Closes the connection opened by _prep_db(); safe to call when no
        connection is open.
        """
        if self.mysqlite:
            self.mysqlite.close()
            self.mysqlite = None
|
|
|
|
    @contextlib.contextmanager
    def _db_context(self):
        """Context manager for database connections.

        Opens a fresh sqlite wrapper per use and guarantees it is closed
        even when the body raises.
        """
        db = mysqlite.mysqlite(config.watchd.database, str)
        try:
            yield db
        finally:
            db.close()
|
|
|
|
    def __init__(self):
        """Initialize watchdog state: queues, DB schema, scaler, signals."""
        config.load()
        self.in_background = False
        self.threads = []
        self.stopping = threading.Event()
        self.stopped = threading.Event()

        # shared work-stealing queues
        self.job_queue = PriorityJobQueue()
        self.completion_queue = Queue.Queue()  # completed ProxyTestState objects

        # track pending proxy states
        self.pending_states = {}  # dict: proxy -> ProxyTestState
        self.pending_lock = threading.Lock()

        # create table if needed (use dbs.py for canonical schema)
        with self._db_context() as db:
            dbs.create_table_if_not_exists(db, 'proxylist')

        self.submit_after = config.watchd.submit_after  # number of collected jobs before writing db
        self.collected = []  # completed ProxyTestState objects ready for DB
        self.totals = {
            'submitted':0,
            'success':0,
        }
        self.stats = Stats()
        self.last_cleanup = time.time()
        self.httpd_server = None

        # Dynamic thread scaling: explicit min_threads wins, otherwise
        # derive a floor from the configured maximum
        min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
        self.scaler = ThreadScaler(
            min_threads=min_t,
            max_threads=config.watchd.threads,
            target_queue_per_thread=5,
            scale_cooldown=config.watchd.scale_cooldown,
            scale_threshold=config.watchd.scale_threshold
        )
        self.thread_id_counter = 0
        self.last_jobs_log = 0
        self.last_update_log = 0
        self.log_interval = 60  # seconds between routine log messages

        # Register SIGHUP handler for config reload
        signal.signal(signal.SIGHUP, self._handle_sighup)
|
|
|
|
def _handle_sighup(self, signum, frame):
    """SIGHUP handler: trigger a live configuration reload."""
    # Log first so the reload attempt is visible even if reload fails.
    _log('received SIGHUP, reloading config...', 'watchd')
    self.reload_config()
|
|
|
|
def reload_config(self):
    """Reload configuration without restart.

    Hot-reloadable settings:
        threads, timeout, checktime, perfail_checktime, submit_after,
        stats_interval, debug, tor_safeguard, stale_days, max_fail,
        outage_threshold

    Requires restart:
        database, source_file, checktype

    Returns:
        True when the new config was loaded and applied; False when it was
        rejected (validation errors or load exception), in which case the
        previously applied runtime values remain in effect.
    """
    # Remember the old checktypes so we can warn about restart-only changes.
    old_checktypes = list(config.watchd.checktypes)

    try:
        config.load()
        errors = config.validate()
        if errors:
            for e in errors:
                _log('config error: %s' % e, 'error')
            _log('config reload failed, keeping old config', 'error')
            return False
    except Exception as e:
        _log('config reload failed: %s' % str(e), 'error')
        return False

    # Update runtime values
    self.submit_after = config.watchd.submit_after

    # Update scaler limits.
    # Fix: honor an explicit min_threads setting, using the same rule as
    # __init__ (previously this always recomputed max(5, threads // 4)).
    self.scaler.min_threads = (config.watchd.min_threads
                               if config.watchd.min_threads > 0
                               else max(5, config.watchd.threads // 4))
    self.scaler.max_threads = config.watchd.threads

    # Warn about values requiring restart
    if config.watchd.checktypes != old_checktypes:
        _log('checktype changed (%s -> %s), requires restart to take effect' % (
            ','.join(old_checktypes), ','.join(config.watchd.checktypes)), 'warn')

    _log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % (
        config.watchd.threads, config.watchd.timeout,
        config.watchd.checktime, config.watchd.max_fail), 'watchd')
    return True
|
|
|
|
def _spawn_thread(self):
    """Spawn one new worker thread, register it, and return its id."""
    self.thread_id_counter += 1
    worker_id = 'dyn%d' % self.thread_id_counter
    worker = WorkerThread(worker_id, self.job_queue)
    worker.start_thread()
    self.threads.append(worker)
    return worker_id
|
|
|
|
def _remove_thread(self):
|
|
"""Remove one worker thread (graceful shutdown)."""
|
|
if len(self.threads) <= self.scaler.min_threads:
|
|
return None
|
|
# Stop the last thread
|
|
wt = self.threads.pop()
|
|
wt.stop()
|
|
# Don't wait for it - let it finish its current job
|
|
return wt.id
|
|
|
|
def _adjust_threads(self, target):
|
|
"""Adjust thread count to target."""
|
|
current = len(self.threads)
|
|
if target > current:
|
|
added = []
|
|
for _ in range(target - current):
|
|
tid = self._spawn_thread()
|
|
added.append(tid)
|
|
if added:
|
|
_log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler')
|
|
elif target < current:
|
|
removed = []
|
|
for _ in range(current - target):
|
|
tid = self._remove_thread()
|
|
if tid:
|
|
removed.append(tid)
|
|
if removed:
|
|
_log('scaled down: -%d threads' % len(removed), 'scaler')
|
|
|
|
def fetch_rows(self):
    """Select proxies due for a check; fall back to 'oldies' when too few.

    Sets self.isoldies so callers know which pool was used, and disables
    the tor safeguard when re-testing long-dead proxies.
    """
    self.isoldies = False
    q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
    now = time.time()
    params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
    _dbg('fetch_rows: params=(0, %d, %d, %d, %.0f)' % (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now))
    rows = self.mysqlite.execute(q, params).fetchall()
    _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))
    # Enough fresh work for the whole pool? Use it as-is.
    if len(rows) >= config.watchd.threads:
        return rows
    _dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies)
    rows = []
    if config.watchd.oldies:
        self.isoldies = True
        ## disable tor safeguard for old proxies
        if self.tor_safeguard:
            self.tor_safeguard = False
        # Re-run the same query over the "oldies" failure band with the
        # slower oldies re-check interval.
        oldies_params = (
            config.watchd.max_fail,
            config.watchd.max_fail + round(config.watchd.max_fail / 2),
            config.watchd.checktime,
            config.watchd.oldies_checktime,
            time.time(),
        )
        rows = self.mysqlite.execute(q, oldies_params).fetchall()
    return rows
|
|
|
|
def prepare_jobs(self):
    """Build one test job per due proxy and enqueue them by priority.

    Returns the number of jobs created (0 when nothing is due).
    """
    self._prep_db()
    ## enable tor safeguard by default
    self.tor_safeguard = config.watchd.tor_safeguard
    rows = self.fetch_rows()
    _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
    checktypes = config.watchd.checktypes

    # Build target pools for each checktype
    target_pools = {}
    for ct in checktypes:
        if ct == 'irc':
            target_pools[ct] = config.servers
            _dbg('target_pool[irc]: %d servers' % len(config.servers))
        elif ct == 'judges':
            # Filter out judges in cooldown (blocked/rate-limited)
            all_judges = list(judges.keys())
            available = judge_stats.get_available_judges(all_judges)
            # Fall back to the full judge list if everything is cooling down.
            target_pools[ct] = available if available else all_judges
        elif ct == 'ssl':
            target_pools[ct] = ssl_targets
            _dbg('target_pool[ssl]: %d targets' % len(ssl_targets))
        else: # head
            target_pools[ct] = list(regexes.keys())
            _dbg('target_pool[%s]: %d targets' % (ct, len(regexes)))

    # create all jobs first, then shuffle for interleaving
    all_jobs = []
    new_states = []

    for row in rows:
        # create shared state for this proxy
        # row: ip, port, proto, failed, success_count, total_duration,
        # country, mitm, consecutive_success, asn, proxy
        state = ProxyTestState(
            row[0], row[1], row[2], row[3], row[4], row[5],
            row[6], row[7], row[8], asn=row[9],
            oldies=self.isoldies, completion_queue=self.completion_queue,
            proxy_full=row[10]
        )
        new_states.append(state)

        # randomly select checktype for this proxy
        checktype = random.choice(checktypes)
        target_pool = target_pools[checktype]

        # select single target (single-target mode)
        target = random.choice(target_pool)

        # create job for this proxy
        job = TargetTestJob(state, target, checktype)
        all_jobs.append(job)

    # shuffle to interleave tests across different proxies
    random.shuffle(all_jobs)

    # track pending states (dict for O(1) lookup/removal)
    with self.pending_lock:
        for state in new_states:
            self.pending_states[state.proxy] = state

    # queue all jobs with priority
    for job in all_jobs:
        priority = calculate_priority(
            job.proxy_state.failcount,
            job.proxy_state.success_count,
            config.watchd.max_fail
        )
        self.job_queue.put(job, priority)

    self._close_db()
    proxy_count = len(new_states)
    job_count = len(all_jobs)
    _dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
    now = time.time()
    # Rate-limit the routine "created N jobs" log line to log_interval.
    if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
        _log("created %d jobs for %d proxies" % (job_count, proxy_count), 'watchd')
        self.last_jobs_log = now
    return job_count
|
|
|
|
def collect_work(self):
    """Drain the completion queue: record stats and stage states for the DB.

    ProxyTestState.record_result() pushes a state onto completion_queue
    once all its targets are done, so this is event-driven, not polling.
    """
    drained = 0
    while True:
        try:
            done = self.completion_queue.get_nowait()
        except Queue.Empty:
            return
        drained += 1
        # Evaluate the finished state and feed the session stats.
        ok, category = done.evaluate()
        self.stats.record(ok, category, done.proto, done.last_latency_ms,
                          done.country, done.asn,
                          ssl_test=done.had_ssl_test, mitm=(done.mitm > 0),
                          cert_error=done.cert_error)
        self.collected.append(done)
        # Drop the pending entry; pop with a default keeps this O(1) and safe.
        with self.pending_lock:
            self.pending_states.pop(done.proxy, None)
|
|
|
|
def collect_unfinished(self):
    """Discard every job still sitting in the job queue.

    The corresponding ProxyTestStates stay incomplete; those proxies are
    simply re-tested in the next cycle.
    """
    dropped = 0
    while True:
        try:
            self.job_queue.get_nowait()
        except Queue.Empty:
            break
        dropped += 1
    if dropped:
        _log("discarded %d unfinished jobs" % dropped, "watchd")
|
|
|
|
def submit_collected(self):
    """Flush collected test results to the database.

    Returns True on a normal flush (or nothing to do). Returns False when
    the tor safeguard trips (suspiciously low success rate over a large
    batch): in that case only successes are written, so a blocked tor
    circuit cannot mass-fail the proxy list.
    """
    if len(self.collected) == 0:
        return True
    sc = 0
    dead_count = 0
    args = []
    latency_updates = [] # (proxy, latency_ms) for successful tests
    max_fail = config.watchd.max_fail
    _dbg('submit_collected: %d jobs' % len(self.collected))
    for job in self.collected:
        if job.failcount == 0:
            sc += 1
            _dbg('submit OK: failcount=0', job.proxy)
            if job.last_latency_ms is not None:
                latency_updates.append((job.proxy, job.last_latency_ms))
        # Check if proxy should be marked as permanently dead
        effective_failcount = job.failcount
        if job.failcount > 0:
            is_fatal = job.last_fail_category in FATAL_ERROR_CATEGORIES
            # Fatal errors (refused/unreachable/auth) = immediately dead
            if is_fatal:
                effective_failcount = DEAD_PROXY
                dead_count += 1
            # Non-fatal: mark dead if exceeded max_fail*2
            elif job.failcount >= max_fail * 2:
                effective_failcount = DEAD_PROXY
                dead_count += 1
        args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
                     job.success_count, job.total_duration, job.mitm,
                     job.consecutive_success, job.asn, job.proxy))

    success_rate = (float(sc) / len(self.collected)) * 100
    ret = True
    # Tor safeguard: a large batch with an abnormally low success rate most
    # likely means our tor circuit is blocked, not that the proxies died.
    if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard:
        _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
        if sc == 0:
            # Nothing succeeded at all: write nothing, signal the caller.
            return False
        # Rebuild the update set with successes only; failures are ignored.
        args = []
        latency_updates = []
        for job in self.collected:
            if job.failcount == 0:
                args.append((job.failcount, job.checktime, 1, job.country, job.proto,
                             job.success_count, job.total_duration, job.mitm,
                             job.consecutive_success, job.asn, job.proxy))
                if job.last_latency_ms is not None:
                    latency_updates.append((job.proxy, job.last_latency_ms))
        ret = False

    now = time.time()
    # Rate-limited progress line (always logged when anything succeeded).
    if (now - self.last_update_log) >= self.log_interval or success_rate > 0:
        dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else ''
        _log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd')
        self.last_update_log = now
    self._prep_db()
    query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
    self.mysqlite.executemany(query, args)

    # Batch update latency metrics for successful proxies
    if latency_updates:
        dbs.batch_update_proxy_latency(self.mysqlite, latency_updates)

    # Batch update anonymity for proxies with exit IP data
    anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers)
                         for job in self.collected
                         if job.failcount == 0 and job.exit_ip]
    if anonymity_updates:
        dbs.batch_update_proxy_anonymity(self.mysqlite, anonymity_updates)

    self.mysqlite.commit()
    self._close_db()
    self.collected = []
    self.totals['submitted'] += len(args)
    self.totals['success'] += sc
    return ret
|
|
|
|
def cleanup_stale(self):
    """Remove proxies that have been dead for too long."""
    # Anything last tested before this instant is considered stale.
    cutoff = int(time.time()) - config.watchd.stale_days * 86400
    with self._db_context() as db:
        # delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff
        result = db.execute(
            'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?',
            (config.watchd.max_fail, DEAD_PROXY, cutoff)
        )
        removed = getattr(result, 'rowcount', 0)
        db.commit()
    if removed > 0:
        _log('removed %d stale proxies (not seen in %d days)' % (removed, config.watchd.stale_days), 'watchd')
    self.last_cleanup = time.time()
|
|
|
|
def start(self):
    """Run inline for single-threaded standalone use; else in background."""
    run_inline = config.watchd.threads == 1 and _run_standalone
    return self._run() if run_inline else self._run_background()
|
|
|
|
def run(self):
    """Block forever while the watchdog loop runs in a background thread."""
    if not self.in_background:
        return
    while True:
        time.sleep(1)
|
|
|
|
def _run_background(self):
|
|
self.in_background = True
|
|
t = threading.Thread(target=self._run)
|
|
t.start()
|
|
|
|
def get_runtime_stats(self):
    """Return runtime statistics for the dashboard.

    Returns:
        dict with tested, passed, success_rate, rate, uptime_seconds,
        threads, failures, tor_pool, and engine stats.
    """
    stats_data = {}

    # Stats from Stats object
    # Counters read under the stats lock for a consistent snapshot.
    with self.stats.lock:
        stats_data['tested'] = self.stats.tested
        stats_data['passed'] = self.stats.passed
        stats_data['failed'] = self.stats.failed
        elapsed = time.time() - self.stats.start_time
        stats_data['uptime_seconds'] = int(elapsed)
        stats_data['rate'] = try_div(self.stats.tested, elapsed)
        stats_data['success_rate'] = try_div(self.stats.passed * 100.0, self.stats.tested)
        stats_data['failures'] = dict(self.stats.fail_categories)
        # New: Protocol breakdown
        stats_data['by_proto'] = dict(self.stats.by_proto)
        # New: Rate history for sparklines
        stats_data['rate_history'] = list(self.stats.rate_history)
        stats_data['success_rate_history'] = list(self.stats.success_rate_history)
        # New: Peak rate
        stats_data['peak_rate'] = self.stats.peak_rate

    # New: Recent rate (last 60s) and avg latency
    # NOTE(review): these Stats getters are called outside the lock block
    # above - presumably they take the lock internally; verify in Stats.
    stats_data['recent_rate'] = self.stats.get_recent_rate()
    stats_data['recent_success_rate'] = self.stats.get_recent_success_rate()
    stats_data['avg_latency'] = self.stats.get_avg_latency()

    # Latency stats
    stats_data['min_latency'] = self.stats.min_latency if self.stats.min_latency != float('inf') else 0
    stats_data['max_latency'] = self.stats.max_latency
    stats_data['latency_percentiles'] = self.stats.get_latency_percentiles()
    stats_data['latency_histogram'] = self.stats.get_latency_histogram()

    # Protocol performance (tested, passed, rate per protocol)
    stats_data['proto_stats'] = self.stats.get_proto_stats()

    # Geographic distribution from session
    stats_data['top_countries_session'] = self.stats.get_top_countries(10)
    stats_data['top_asns_session'] = self.stats.get_top_asns(10)

    # SSL/TLS stats
    with self.stats.lock:
        ssl_tested = self.stats.ssl_tested
        ssl_passed = self.stats.ssl_passed
    mitm_stats = mitm_cert_stats.get_stats()
    stats_data['ssl'] = {
        'tested': ssl_tested,
        'passed': ssl_passed,
        'failed': self.stats.ssl_failed,
        'fail_categories': dict(self.stats.ssl_fail_categories),
        'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0,
        'mitm_detected': self.stats.mitm_detected,
        'cert_errors': self.stats.cert_errors,
        'mitm_unique_certs': mitm_stats.get('unique_certs', 0),
        'mitm_unique_proxies': mitm_stats.get('unique_proxies', 0)
    }

    # Thread info
    stats_data['threads'] = len(self.threads)
    stats_data['min_threads'] = self.scaler.min_threads
    stats_data['max_threads'] = self.scaler.max_threads
    stats_data['queue_size'] = self.job_queue.qsize()
    stats_data['checktype'] = ','.join(config.watchd.checktypes)
    stats_data['checktypes'] = config.watchd.checktypes
    # Profiling flag from either CLI args (if present) or the common section.
    stats_data['profiling'] = (
        getattr(config.args, 'profile', False) if hasattr(config, 'args') else False
    ) or getattr(config.common, 'profiling', False)
    stats_data['pass_rate'] = try_div(self.stats.passed, elapsed)

    # Tor pool stats
    pool = connection_pool.get_pool()
    if pool:
        pool_stats = pool.get_stats()
        hosts_list = []
        healthy_count = 0
        latency_sum = 0.0
        latency_count = 0
        for h in pool_stats.get('hosts', []):
            is_healthy = h.get('available', False)
            # avg_latency comes in seconds; expose milliseconds (None if absent).
            lat_ms = h.get('avg_latency', 0) * 1000 if h.get('avg_latency') else None
            hosts_list.append({
                'address': h.get('host', ''),
                'healthy': is_healthy,
                'latency_ms': lat_ms,
                'success_rate': h.get('success_rate', 0),
            })
            # Only healthy hosts contribute to the pool-wide latency average.
            if is_healthy:
                healthy_count += 1
                if lat_ms and lat_ms > 0:
                    latency_sum += lat_ms
                    latency_count += 1
        pool_info = {
            'hosts': hosts_list,
            'total_requests': pool_stats.get('total_requests', 0),
            'success_rate': pool_stats.get('success_rate', 0),
            'healthy_count': healthy_count,
            'total_count': len(hosts_list),
            'avg_latency': latency_sum / latency_count if latency_count > 0 else 0,
        }
        stats_data['tor_pool'] = pool_info
    else:
        # No pool initialized: emit an empty-but-well-formed structure.
        stats_data['tor_pool'] = {
            'hosts': [], 'total_requests': 0, 'success_rate': 0,
            'healthy_count': 0, 'total_count': 0, 'avg_latency': 0
        }

    # Judge stats (when using judges checktype)
    if 'judges' in config.watchd.checktypes:
        js = judge_stats.get_stats()
        stats_data['judges'] = {
            'total': js.get('total', 0),
            'available': js.get('available', 0),
            'in_cooldown': js.get('in_cooldown', 0),
            'top_judges': js.get('top', [])[:5] # top 5 most successful
        }
    else:
        stats_data['judges'] = None

    # Scraper/engine stats
    if scraper_available:
        scraper_stats = scraper_module.get_scraper_stats()
        if scraper_stats:
            stats_data['engines_available'] = scraper_stats.get('available', 0)
            stats_data['engines_total'] = scraper_stats.get('total', 0)
            stats_data['engines_backoff'] = scraper_stats.get('in_backoff', 0)
            stats_data['scraper'] = scraper_stats
        else:
            stats_data['engines_available'] = 0
            stats_data['engines_total'] = 0
            stats_data['engines_backoff'] = 0
            stats_data['scraper'] = None
    else:
        stats_data['engines_available'] = 0
        stats_data['engines_total'] = 0
        stats_data['engines_backoff'] = 0
        stats_data['scraper'] = None

    # Network usage stats
    stats_data['network'] = network_stats.get_stats()

    # MITM certificate stats (detailed)
    stats_data['mitm'] = mitm_cert_stats.get_stats()

    return stats_data
|
|
|
|
def _run(self):
    """Main watchdog loop: prepare jobs, collect results, flush, report.

    Runs until self.stopping is set. Started either inline (standalone,
    single thread) or in a background thread via _run_background().
    """
    _log('starting...', 'watchd')
    _log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % (
        config.watchd.database, ','.join(config.watchd.checktypes), config.watchd.threads,
        config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd')

    # Log database status at startup
    self._prep_db()
    total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
    now = time.time()
    due = self.mysqlite.execute(
        'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
        (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
    ).fetchone()[0]

    # Create stats persistence tables
    dbs.create_table_if_not_exists(self.mysqlite, 'stats_history')
    dbs.create_table_if_not_exists(self.mysqlite, 'session_state')

    # Load persisted session state
    saved_state = dbs.load_session_state(self.mysqlite)
    if saved_state:
        self.stats.load_state(saved_state)

    self._close_db()

    # Load MITM certificate state (same directory as database)
    db_dir = os.path.dirname(config.watchd.database) or '.'
    self.mitm_state_file = os.path.join(db_dir, 'mitm_certs.json')
    mitm_cert_stats.load_state(self.mitm_state_file)
    _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd')

    # Initialize Tor connection pool
    connection_pool.init_pool(config.torhosts, warmup=True)

    # Start HTTP API server if enabled
    if config.httpd.enabled:
        from httpd import ProxyAPIServer
        self.httpd_server = ProxyAPIServer(
            config.httpd.listenip,
            config.httpd.port,
            config.watchd.database,
            stats_provider=self.get_runtime_stats
        )
        self.httpd_server.start()

    # create worker threads with shared queues
    for i in range(config.watchd.threads):
        # Random 5-letter id for each startup worker.
        threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
        wt = WorkerThread(threadid, self.job_queue)
        if self.in_background:
            wt.start_thread()
        self.threads.append(wt)
        # Stagger thread startup slightly.
        time.sleep(random.random() / 10)

    # sleeptime > 0 pauses the loop in 1s ticks (tor backoff / idle wait).
    sleeptime = 0
    while True:

        if self.stopping.is_set():
            if self.in_background: self._cleanup()
            break

        if sleeptime > 0:
            time.sleep(1)
            sleeptime -= 1
            continue

        # check if job queue is empty (work-stealing: threads pull as needed)
        if self.job_queue.empty():
            self.collect_work()
            if not self.submit_collected() and self.tor_safeguard:
                _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                sleeptime = 60
            else:
                job_count = self.prepare_jobs()
                if job_count == 0:
                    # no jobs available, wait before checking again
                    sleeptime = 10

        if not self.in_background: # single_thread scenario
            self.threads[0].workloop()

        self.collect_work()

        # Flush to DB once enough results have accumulated.
        if len(self.collected) > self.submit_after:
            if not self.submit_collected() and self.tor_safeguard:
                _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                sleeptime = 60

        # Update rate history for sparklines
        self.stats.update_history()

        # periodic stats report
        if self.stats.should_report(config.watchd.stats_interval):
            _log(self.stats.report(), 'stats')
            # Also report pool stats
            pool = connection_pool.get_pool()
            if pool:
                _log(pool.status_line(), 'stats')
            # Report judge stats (when using judges checktype)
            if 'judges' in config.watchd.checktypes:
                _log(judge_stats.status_line(), 'stats')
            # Report scaler status
            _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')

            # Save session state periodically (every stats_interval, default 5m)
            # Persistence is best-effort: failures are logged, never fatal.
            try:
                with self._db_context() as db:
                    dbs.save_session_state(db, self.stats)
            except Exception as e:
                _log('failed to save session state: %s' % str(e), 'warn')

            # Save MITM certificate stats periodically
            try:
                mitm_cert_stats.save_state(self.mitm_state_file)
            except Exception as e:
                _log('failed to save MITM state: %s' % str(e), 'warn')

        # Hourly stats snapshot
        now = time.time()
        if not hasattr(self, '_last_snapshot'):
            self._last_snapshot = now
        if (now - self._last_snapshot) >= 3600:
            try:
                with self._db_context() as db:
                    dbs.save_stats_snapshot(db, self.stats)
                self._last_snapshot = now
            except Exception as e:
                _log('failed to save stats snapshot: %s' % str(e), 'warn')

        # Dynamic thread scaling
        if self.in_background:
            target = self.scaler.should_scale(
                len(self.threads),
                self.job_queue.qsize(),
                self.stats
            )
            if target != len(self.threads):
                self._adjust_threads(target)

        # periodic stale proxy cleanup (daily)
        if (time.time() - self.last_cleanup) >= 86400:
            self.cleanup_stale()

        time.sleep(1)
|
|
|
|
|
|
if __name__ == '__main__':
    # Standalone entry point: validate config, run the watchdog until
    # interrupted, then shut down cleanly.
    _run_standalone = True

    config.load()
    errors = config.validate()
    if errors:
        for e in errors:
            _log(e, 'error')
        import sys
        sys.exit(1)

    w = Proxywatchd()
    try:
        w.start()
        w.run()
    except KeyboardInterrupt:
        # Fix: drop the unused 'as e' binding; Ctrl-C just falls through
        # to the cleanup below.
        pass
    finally:
        w.stop()
        w.finish()
|