Files
ppf/proxywatchd.py
Username 1cb7d93a5f proxywatchd: add ssl_only mode and schedule improvements
- ssl_only mode: skip secondary check when SSL handshake fails
- _build_due_sql(): unified query for proxies due testing
- working_checktime/fail_retry_interval: new schedule formula
- fail_retry_backoff: linear backoff option for failing proxies
2026-01-08 09:05:25 +01:00

1837 lines
74 KiB
Python

#!/usr/bin/env python2
from __future__ import division
# Gevent monkey-patching MUST happen before any other imports
# This patches threading, socket, time, etc. for cooperative concurrency
from gevent import monkey
monkey.patch_all()
import threading # Now patched by gevent
import time
import random
import string
import re
import signal
import network_stats
import os
import ssl
import contextlib
try:
import Queue
except ImportError:
import queue as Queue
try:
import IP2Location
geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
geolite = True
except (ImportError, IOError, ValueError):
geolite = False
try:
import pyasn
asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat"))
asn_enabled = True
except (ImportError, IOError, NameError):
asndb = None
asn_enabled = False
from config import Config
import mysqlite
import dbs
import dns
from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error
import rocksock
import connection_pool
from stats import JudgeStats, Stats, regexes, ssl_targets, try_div
from mitm import MITMCertStats, extract_cert_info, get_mitm_certificate
from dns import socks4_resolve
from job import PriorityJobQueue, calculate_priority
# Optional scraper integration for engine stats
try:
import scraper as scraper_module
scraper_available = True
except ImportError:
scraper_module = None
scraper_available = False
# Module-level config; replaced via set_config() when embedded in ppf.py.
config = Config()
def set_config(cfg):
    """Set the config object (used when imported from ppf.py)."""
    global config
    config = cfg
    # Keep the dns helper module's view of the config in sync.
    dns.set_config(cfg)
# True only when this module is run as its own process (set elsewhere).
_run_standalone = False
# Debug mode for proxy check path - set via PPF_DEBUG env or config
_debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy')
# Sampling debug: print detailed diagnostics for every Nth test
_sample_debug_interval = 50 # Print debug for every 50th test (lowered for diagnosis)
# Shared counter + lock protecting it; bumped by _sample_dbg() and job runs.
_sample_debug_counter = 0
_sample_debug_lock = threading.Lock()
def _dbg(msg, proxy=None):
    """Emit a proxy-check debug line via _log; silent unless PPF_DEBUG is on."""
    if not _debug_proxy:
        return
    tag = '[%s] ' % proxy if proxy else ''
    # Use 'dbg' category (shows at info level) instead of 'debug' (filtered by default)
    _log('%s%s' % (tag, msg), 'dbg')
def _sample_dbg(msg, proxy=None, force=False):
    """Log a sampled diagnostic line: every Nth call, or always when forced."""
    global _sample_debug_counter
    emit = force
    if not emit:
        # Bump the shared counter under the lock; fire once per interval.
        with _sample_debug_lock:
            _sample_debug_counter += 1
            if _sample_debug_counter >= _sample_debug_interval:
                _sample_debug_counter = 0
                emit = True
    if not emit:
        return
    tag = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] '
    _log('%s%s' % (tag, msg), 'diag')
def _build_due_sql():
    """Build the SQL WHERE fragment selecting proxies due for testing.

    Returns:
        (sql_condition, params) tuple using the schedule formula:
        - failed = 0: tested + working_checktime < now
        - failed > 0 with fail_retry_backoff: tested + (failed * fail_retry_interval) < now
        - failed > 0 without backoff:         tested + fail_retry_interval < now
        Rows with tested IS NULL (never checked) always match; dead proxies
        (failed = -1) and proxies at/over max_fail are excluded.
    """
    now = time.time()
    # Only the failed>0 arm of the CASE differs between the two modes, so
    # build that expression once instead of duplicating the whole query and
    # the (identical) parameter tuple.
    if config.watchd.fail_retry_backoff:
        # Linear backoff: multiply interval by failure count
        retry_expr = '(failed * ?)'
    else:
        # Fixed interval: same delay regardless of failure count
        retry_expr = '?'
    sql = '''failed >= 0 AND failed < ? AND (
        tested IS NULL OR
        CASE WHEN failed = 0
            THEN tested + ? < ?
            ELSE tested + %s < ?
        END
    )''' % retry_expr
    params = (config.watchd.max_fail, config.watchd.working_checktime, now,
              config.watchd.fail_retry_interval, now)
    return sql, params
# IP pattern for judge services (validates response contains valid IP in body)
# NOTE: matches any dotted quad of 1-3 digit groups; the 0-255 octet range is
# enforced separately by is_valid_ip() on the matched text.
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
def is_valid_ip(ip_str):
    """Return True if ip_str is a dotted-quad IPv4 address with octets 0-255.

    Stricter than int() alone: each octet must consist solely of digits, so
    inputs like '1.2.3.+4' or '1.2. 3.4' (which int() happily parses) are
    rejected.  Non-string input returns False instead of raising.
    """
    try:
        parts = ip_str.split('.')
    except AttributeError:
        # Not a string (e.g. None)
        return False
    # isdigit() rejects sign characters, whitespace and empty octets that a
    # bare int() conversion would otherwise tolerate.
    return len(parts) == 4 and all(p.isdigit() and int(p) <= 255 for p in parts)
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'
# Dead proxy marker - proxies with failed=-1 are permanently dead and never retested
DEAD_PROXY = -1
# Error categories that indicate proxy is definitely dead (not temporary failure)
FATAL_ERROR_CATEGORIES = ('refused', 'unreachable', 'auth')
# Patterns indicating judge is blocking the proxy (not a proxy failure)
# These should NOT count as proxy failures - retry with different judge
JUDGE_BLOCK_PATTERNS = [
    r'HTTP/1\.[01] 403', # Forbidden
    r'HTTP/1\.[01] 429', # Too Many Requests
    r'HTTP/1\.[01] 503', # Service Unavailable
    r'captcha', # Captcha challenge
    r'challenge', # Generic challenge
    r'verify you are human', # Human verification
    r'rate.?limit', # Rate limiting
    r'too many requests', # Rate limiting
    r'access.?denied', # Access denied
    r'blocked', # Explicit block
    r'Checking your browser', # Cloudflare JS challenge
]
# Compiled once at import time; matched against every judge response body
# (hot path), case-insensitively.
JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE)
# Check types: irc, http (header match), judges (body match), ssl (TLS handshake)
# Judge services - return IP in body (plain text, JSON, or HTML)
# Keys are 'host' or 'host/path' (the part before the first '/' is the
# connect host); values are the regex a healthy response body must match.
judges = {
    # Plain text IP
    'api.ipify.org': IP_PATTERN,
    'icanhazip.com': IP_PATTERN,
    'checkip.amazonaws.com': IP_PATTERN,
    'ifconfig.me/ip': IP_PATTERN,
    'ipinfo.io/ip': IP_PATTERN,
    'ip.me': IP_PATTERN,
    'myip.dnsomatic.com': IP_PATTERN,
    'ident.me': IP_PATTERN,
    'ipecho.net/plain': IP_PATTERN,
    'wtfismyip.com/text': IP_PATTERN,
    'eth0.me': IP_PATTERN,
    'l2.io/ip': IP_PATTERN,
    'tnx.nl/ip': IP_PATTERN,
    'wgetip.com': IP_PATTERN,
    'curlmyip.net': IP_PATTERN,
    # JSON responses (IP pattern matches within JSON)
    'httpbin.org/ip': IP_PATTERN,
    'ip-api.com/json': IP_PATTERN,
    'ipapi.co/json': IP_PATTERN,
    'ipwhois.app/json': IP_PATTERN,
    # Header echo - for elite detection (check if proxy adds revealing headers)
    'httpbin.org/headers': IP_PATTERN, # returns request headers as JSON
}
# Global instances
judge_stats = JudgeStats() # per-judge success/block/failure counters
mitm_cert_stats = MITMCertStats() # registry of intercepted-TLS certificates
class ThreadScaler(object):
    """Sizes the worker pool dynamically from queue depth and success rate.

    gevent greenlets are cheap compared to OS threads, so growth is
    aggressive (+5 per decision) and shrinking accelerates when the pool far
    exceeds the ideal size.  A cooldown window keeps decisions from
    oscillating.
    """
    def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5,
                 scale_cooldown=10, scale_threshold=10.0):
        self.min_threads = min_threads
        self.max_threads = max_threads
        self.target_queue_per_thread = target_queue_per_thread
        self.last_scale_time = 0
        self.scale_cooldown = scale_cooldown
        self.success_threshold = scale_threshold

    def should_scale(self, current_threads, queue_size, stats):
        """Return the desired thread count (may equal current_threads).

        Args:
            current_threads: Current number of active threads
            queue_size: Current job queue depth
            stats: Stats object providing .lock, .tested and .passed
        """
        now = time.time()
        # Honor the cooldown window between scaling decisions.
        if (now - self.last_scale_time) < self.scale_cooldown:
            return current_threads
        # Snapshot the counters under the stats lock.
        with stats.lock:
            done = stats.tested
            ok = stats.passed
        success_rate = try_div(ok * 100.0, done) if done > 0 else 50.0
        # Ideal pool size: enough workers for the queue depth, but never more
        # workers than jobs, clamped into [min_threads, max_threads].
        ideal = max(self.min_threads, queue_size // self.target_queue_per_thread)
        ideal = min(ideal, self.max_threads, max(self.min_threads, queue_size))
        desired = current_threads
        if queue_size > current_threads * self.target_queue_per_thread:
            # Deep queue: grow aggressively (greenlets are lightweight).
            desired = min(current_threads + 5, ideal, self.max_threads)
        elif queue_size < current_threads * 2:
            # Shallow queue: shrink; step harder when far above the ideal.
            step = 2 if current_threads <= ideal * 2 else 5
            desired = max(current_threads - step, ideal, self.min_threads)
        if desired != current_threads:
            self.last_scale_time = now
        return desired

    def status_line(self, current_threads, queue_size):
        """Return a one-line summary string for periodic logging."""
        return 'threads=%d queue=%d target_per_thread=%d' % (
            current_threads, queue_size, self.target_queue_per_thread)
class ProxyTestState(object):
    """Thread-safe state for a proxy being tested.

    Holds test results recorded by TargetTestJob workers and evaluates the
    final pass/fail status once all (currently: one) target tests complete.
    All mutable result state is guarded by self.lock; completion is signaled
    exactly once via completion_queue.
    """
    __slots__ = (
        'ip', 'port', 'proxy', 'auth', 'proto', 'failcount', 'checktime',
        'success_count', 'total_duration', 'country', 'mitm', 'consecutive_success',
        'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed',
        'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers',
        'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success',
        'cert_error'
    )
    def __init__(self, ip, port, proto, failcount, success_count, total_duration,
                 country, mitm, consecutive_success, asn=None, oldies=False,
                 completion_queue=None, proxy_full=None):
        self.ip = ip
        self.port = int(port)
        self.proxy = '%s:%s' % (ip, port)
        # Parse auth credentials from full proxy string (user:pass@ip:port)
        self.auth = None
        if proxy_full and '@' in str(proxy_full):
            # rsplit so '@' inside the password does not break parsing
            auth_part = str(proxy_full).rsplit('@', 1)[0]
            if ':' in auth_part:
                self.auth = auth_part # user:pass
        self.proto = proto
        self.failcount = failcount
        self.checktime = None
        self.success_count = success_count
        self.total_duration = total_duration
        self.country = country
        self.mitm = mitm
        self.consecutive_success = consecutive_success
        self.asn = asn
        self.isoldies = oldies
        self.completion_queue = completion_queue # for signaling completion
        # thread-safe result accumulation
        self.lock = threading.Lock()
        self.results = [] # list of (success, proto, duration, srv, tor, ssl)
        self.completed = False
        self.evaluated = False # for evaluate() idempotency
        self.last_latency_ms = None # average latency from successful tests
        self.exit_ip = None # IP seen by target server (for anonymity detection)
        self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers
        self.last_fail_category = None # failure category from last test (for dead detection)
        self.original_failcount = failcount # failcount before this test cycle
        # SSL/TLS tracking
        self.had_ssl_test = False
        self.ssl_success = False
        self.cert_error = False
    def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
        """Record a single target test result. Thread-safe.

        When all target tests complete, signals via completion_queue
        (exactly once - the 'completed' flag guards double signaling).
        """
        _dbg('record: success=%s proto=%s srv=%s cat=%s' % (success, proto, srv, category), self.proxy)
        should_signal = False
        with self.lock:
            self.results.append({
                'success': success,
                'proto': proto,
                'duration': duration,
                'srv': srv,
                'tor': tor,
                'ssl': ssl,
                'category': category,
                'exit_ip': exit_ip,
                'reveals_headers': reveals_headers
            })
            # Track SSL tests
            if ssl:
                self.had_ssl_test = True
                if success:
                    self.ssl_success = True
            # Track cert errors
            if category in ('cert_error', 'ssl_error', 'ssl_mitm'):
                self.cert_error = True
            # Check completion (single-target mode)
            if not self.completed and len(self.results) >= 1:
                self.completed = True
                should_signal = True
        # Signal outside lock to avoid deadlock
        if should_signal and self.completion_queue is not None:
            self.completion_queue.put(self)
    def is_complete(self):
        """Check if all target tests have finished."""
        # Fast path: check completed flag without lock (atomic read)
        if self.completed:
            return True
        # Slow path: check with lock (only during transition)
        with self.lock:
            return len(self.results) >= 1
    @staticmethod
    def rwip(ip):
        """Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10)."""
        return '.'.join(str(int(b)) for b in ip.split('.'))
    def evaluate(self):
        """Evaluate results after all tests complete.

        Idempotent: a second call returns the cached verdict.  On success,
        resets failcount, updates latency/geo/ASN metadata and logs a PASS
        line; on failure, bumps failcount and records the dominant failure
        category.

        Returns:
            (success, category) tuple where success is bool and category is
            the dominant failure type (or None on success)
        """
        with self.lock:
            if self.evaluated:
                # Already evaluated - return cached result
                return (self.failcount == 0, self.last_fail_category)
            self.evaluated = True
            self.checktime = int(time.time())
            successes = [r for r in self.results if r['success']]
            failures = [r for r in self.results if not r['success']]
            num_success = len(successes)
            _dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy)
            # Determine dominant failure category (most frequent wins)
            fail_category = None
            if failures:
                cats = {}
                for f in failures:
                    cat = f.get('category') or 'other'
                    cats[cat] = cats.get(cat, 0) + 1
                if cats:
                    fail_category = max(cats.keys(), key=lambda k: cats[k])
            # require success (single target mode)
            if num_success >= 1:
                # use last successful result for metrics
                last_good = successes[-1]
                # Extract exit IP and header info from successful judge responses
                for s in successes:
                    if s.get('exit_ip'):
                        self.exit_ip = s['exit_ip']
                    if s.get('reveals_headers') is not None:
                        self.reveals_headers = s['reveals_headers']
                # Lazy geo lookup: only on first success with unknown country
                if geolite and self.country is None:
                    self.ip = self.rwip(self.ip)
                    rec = geodb.get_all(self.ip)
                    if rec is not None and rec.country_short:
                        self.country = rec.country_short
                # Lazy ASN lookup (best effort - failures only logged in debug)
                if asn_enabled and self.asn is None:
                    try:
                        asn_result = asndb.lookup(self.ip)
                        if asn_result and asn_result[0]:
                            self.asn = asn_result[0]
                    except Exception as e:
                        if config.watchd.debug:
                            _log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug')
                self.proto = last_good['proto']
                self.failcount = 0
                # Only reset mitm after 3 consecutive clean successes (not on first success)
                # and only if this test didn't detect MITM.
                # NOTE: checked before the increment below, so the reset fires
                # when the PREVIOUS streak length was a multiple of 3.
                if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0 and not self.cert_error:
                    self.mitm = 0
                self.consecutive_success += 1
                self.success_count += 1
                self.total_duration += int(last_good['duration'] * 1000)
                # Calculate average latency from successful tests (in ms)
                durations = [s['duration'] for s in successes if s['duration']]
                if durations:
                    self.last_latency_ms = sum(durations) * 1000.0 / len(durations)
                torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
                # Determine anonymity for status message
                # transparent: exit_ip == proxy_ip
                # anonymous: exit_ip != proxy_ip, adds revealing headers
                # elite: exit_ip != proxy_ip, no revealing headers
                anon_status = ''
                if self.exit_ip:
                    if self.exit_ip == self.ip:
                        anon_status = ' anon: transparent;'
                    elif self.reveals_headers is False:
                        anon_status = ' anon: elite;'
                    elif self.reveals_headers is True:
                        anon_status = ' anon: anonymous;'
                    else:
                        anon_status = ' anon: anonymous;' # default if no header check
                _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s' % (
                    last_good['proto'], self.ip, self.port, self.country,
                    last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl'])), 'info')
                _dbg('PASS: failcount=0', self.proxy)
                return (True, None)
            else:
                # Real proxy failure
                self.failcount += 1
                self.consecutive_success = 0
                self.last_fail_category = fail_category
                _dbg('FAIL: failcount=%d cat=%s' % (self.failcount, fail_category), self.proxy)
                return (False, fail_category)
class TargetTestJob(object):
    """Job to test a single proxy against a single target.

    Multiple TargetTestJob instances share the same ProxyTestState,
    allowing tests to be interleaved with other proxies in the queue.
    """
    __slots__ = ('proxy_state', 'target_srv', 'checktype', 'worker_id')
    def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
        self.proxy_state = proxy_state
        self.target_srv = target_srv
        self.checktype = checktype
        self.worker_id = worker_id
    def run(self):
        """Test the proxy against this job's target server.

        Connects via _connect_and_test(); when a socket is returned, reads
        the response and matches it against the checktype's regex, then
        records the outcome on the shared ProxyTestState.
        """
        # DIAGNOSTIC: Verify run() is being called
        global _sample_debug_counter
        with _sample_debug_lock:
            _sample_debug_counter += 1
            if _sample_debug_counter <= 3 or _sample_debug_counter % 50 == 0:
                _log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter,
                    self.proxy_state.proxy, self.target_srv, self.checktype), 'info')
        network_stats.set_category('proxy')
        ps = self.proxy_state
        _dbg('test start: %s via %s' % (self.target_srv, self.checktype), ps.proxy)
        # NOTE: 'duration' holds the connect START timestamp, not a duration.
        sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
        _dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), ps.proxy)
        if not sock:
            # SSL-only check passed (handshake success, no request needed)
            if err_cat == 'ssl_ok':
                elapsed = time.time() - duration
                ps.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
            elif err_cat == 'ssl_mitm':
                # MITM detected - proxy works but intercepts TLS
                elapsed = time.time() - duration
                ps.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
                ps.mitm = 1
            else:
                ps.record_result(False, category=err_cat, ssl=is_ssl)
            return
        try:
            recv = sock.recv(-1)
            _sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), ps.proxy)
            # Select regex based on check type (or fallback target)
            if 'check.torproject.org' in srv:
                # Tor API fallback (judge using torproject.org)
                regex = r'"IsTor"\s*:\s*true'
            elif self.checktype == 'irc':
                regex = '^(:|NOTICE|ERROR)'
            elif self.checktype == 'judges':
                regex = judges[srv]
            elif self.checktype == 'ssl':
                # Should not reach here - ssl returns before recv
                ps.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
                return
            else: # http
                regex = regexes[srv]
            _dbg('recv %d bytes, regex=%s' % (len(recv), regex[:30]), ps.proxy)
            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - duration
                _dbg('regex MATCH, elapsed=%.2fs' % elapsed, ps.proxy)
                # Extract exit IP from judge/tor response
                exit_ip = None
                reveals_headers = None
                if self.checktype == 'judges' or 'check.torproject.org' in srv:
                    ip_match = re.search(IP_PATTERN, recv)
                    if ip_match and is_valid_ip(ip_match.group(0)):
                        exit_ip = ip_match.group(0)
                if self.checktype == 'judges' and 'check.torproject.org' not in srv:
                    # Check for header echo judge (elite detection)
                    if 'headers' in srv:
                        # If X-Forwarded-For/Via/etc present, proxy reveals chain
                        reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
                    # Record successful judge
                    judge_stats.record_success(srv)
                ps.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip,
                    reveals_headers=reveals_headers
                )
            else:
                _dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], ps.proxy)
                # Check if judge is blocking us (not a proxy failure)
                if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
                    judge_stats.record_block(srv)
                    # Judge block = proxy worked, we got HTTP response, just no IP
                    # Count as success without exit_ip
                    block_elapsed = time.time() - duration
                    _dbg('judge BLOCK detected, counting as success', ps.proxy)
                    ps.record_result(
                        True, proto=proto, duration=block_elapsed,
                        srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
                        reveals_headers=None
                    )
                    if config.watchd.debug:
                        _log('judge %s challenged proxy %s (counted as success)' % (
                            srv, ps.proxy), 'debug')
                else:
                    _dbg('FAIL: no match, no block', ps.proxy)
                    if self.checktype == 'judges':
                        judge_stats.record_failure(srv)
                    ps.record_result(False, category='other')
        except KeyboardInterrupt:
            # BUGFIX: no explicit disconnect here - the finally clause below
            # already disconnects; the previous code disconnected twice.
            raise
        except rocksock.RocksockException as e:
            ps.record_result(False, category=categorize_error(e))
        finally:
            sock.disconnect()
    def _connect_and_test(self):
        """Connect to target through the proxy and send test packet.

        Returns an 8-tuple:
            (sock, proto, start_time, torhost, srv, failinc, is_ssl, err_cat)
        sock is None when no request phase is needed ('ssl_ok'/'ssl_mitm')
        or on failure (err_cat names the failure).

        If ssl_first is enabled:
            1. Try SSL handshake first
            2. If SSL succeeds -> return success
            3. If SSL fails -> try secondary check (configured checktype),
               unless ssl_only is set, in which case fail immediately.
        """
        ps = self.proxy_state
        _dbg('_connect_and_test: target=%s checktype=%s ssl_first=%s' % (
            self.target_srv, self.checktype, config.watchd.ssl_first), ps.proxy)
        # Always log first test to verify code path
        global _sample_debug_counter
        if _sample_debug_counter == 0:
            _log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % (
                ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info')
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
        pool = connection_pool.get_pool()
        # Phase 1: SSL handshake (if ssl_first enabled)
        if config.watchd.ssl_first:
            result = self._try_ssl_handshake(protos, pool)
            if result is not None:
                return result # SSL succeeded or MITM detected
            # SSL failed for all protocols
            if config.watchd.ssl_only:
                # ssl_only mode: skip secondary check, mark as failed
                _dbg('SSL failed, ssl_only mode, skipping secondary check', ps.proxy)
                return (None, None, 0, None, None, 1, 0, 'ssl_only')
            _dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)
        # Phase 2: Secondary check (configured checktype)
        return self._try_secondary_check(protos, pool)
    def _try_ssl_handshake(self, protos, pool):
        """Attempt SSL handshake to verify proxy works with TLS.

        Returns:
            Tuple on success/MITM, None on failure (should try secondary check)
        """
        ps = self.proxy_state
        ssl_target = random.choice(ssl_targets)
        for proto in protos:
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            network_stats.set_tor_node(torhost)
            # SOCKS4 cannot resolve hostnames remotely - resolve locally first
            if proto == 'socks4':
                srv = socks4_resolve(ssl_target, 443)
            else:
                srv = ssl_target
            if not srv:
                continue
            duration = time.time()
            # Build proxy URL, including auth credentials if present
            if ps.auth:
                proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
            else:
                proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
            proxies = [
                rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
                rocksock.RocksockProxyFromURL(proxy_url),
            ]
            # Adaptive timeout: give proxies with failures more time
            adaptive_timeout = config.watchd.timeout + min(
                ps.failcount * config.watchd.timeout_fail_inc,
                config.watchd.timeout_fail_max)
            # Reset per-iteration so the handler below never touches an
            # unbound/stale socket when the constructor raises.
            sock = None
            try:
                sock = rocksock.Rocksock(host=srv, port=443, ssl=1,
                                         proxies=proxies, timeout=adaptive_timeout,
                                         verifycert=True)
                _dbg('SSL handshake: proto=%s tor=%s target=%s' % (proto, torhost, ssl_target), ps.proxy)
                sock.connect()
                # SSL handshake succeeded
                elapsed = time.time() - duration
                if pool:
                    pool.record_success(torhost, elapsed)
                sock.disconnect()
                _dbg('SSL handshake OK', ps.proxy)
                return None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_ok'
            except rocksock.RocksockException as e:
                et = e.get_errortype()
                err = e.get_error()
                if sock is not None:
                    try:
                        sock.disconnect()
                    except Exception:
                        # best effort cleanup; narrowed from a bare except so
                        # KeyboardInterrupt/SystemExit still propagate
                        pass
                if et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # MITM detected - proxy works but intercepts TLS
                    ps.mitm = 1
                    elapsed = time.time() - duration
                    if pool:
                        pool.record_success(torhost, elapsed)
                    _dbg('SSL MITM detected', ps.proxy)
                    return None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_mitm'
                if config.watchd.debug:
                    _log('SSL handshake failed: %s://%s:%d: %s' % (
                        proto, ps.ip, ps.port, e.get_errormessage()), 'debug')
                # Check for Tor connection issues
                if et == rocksock.RS_ET_OWN:
                    if e.get_failedproxy() == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        if pool:
                            pool.record_failure(torhost)
            except KeyboardInterrupt:
                raise
        # All protocols failed SSL
        return None
    def _try_secondary_check(self, protos, pool):
        """Try the configured secondary checktype (head, judges, irc).

        On success returns the still-connected socket so the caller can read
        the response; on failure returns sock=None with the last error
        category.
        """
        ps = self.proxy_state
        _sample_dbg('TEST START: proxy=%s target=%s check=%s' % (
            ps.proxy, self.target_srv, self.checktype), ps.proxy)
        srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
        # For judges, extract host from 'host/path' format
        if self.checktype == 'judges' and '/' in srvname:
            connect_host = srvname.split('/')[0]
        else:
            connect_host = srvname
        # Secondary checks: always use plain HTTP
        use_ssl = 0
        verifycert = False
        if self.checktype == 'irc':
            server_port = 6667
        else:
            server_port = 80
        last_error_category = None
        for proto in protos:
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            network_stats.set_tor_node(torhost)
            if proto == 'socks4':
                srv = socks4_resolve(connect_host, server_port)
            else:
                srv = connect_host
            if not srv:
                continue
            duration = time.time()
            # Build proxy URL, including auth credentials if present
            if ps.auth:
                proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
            else:
                proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
            proxies = [
                rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
                rocksock.RocksockProxyFromURL(proxy_url),
            ]
            # Adaptive timeout: give proxies with failures more time
            adaptive_timeout = config.watchd.timeout + min(
                ps.failcount * config.watchd.timeout_fail_inc,
                config.watchd.timeout_fail_max)
            # BUGFIX: reset per-iteration; previously, when the Rocksock
            # constructor raised, the except handler hit an unbound 'sock'
            # (NameError) or disconnected a socket from an earlier attempt.
            sock = None
            try:
                sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                         proxies=proxies, timeout=adaptive_timeout,
                                         verifycert=verifycert)
                _dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy)
                _sample_dbg('CONNECT: tor=%s -> proxy=%s:%s (%s) -> %s:%d ssl=%d timeout=%.1f' % (
                    torhost, ps.ip, ps.port, proto, srv, server_port, use_ssl, adaptive_timeout), ps.proxy)
                sock.connect()
                _dbg('connected OK', ps.proxy)
                _sample_dbg('CONNECTED OK: %s via %s' % (ps.proxy, proto), ps.proxy)
                if self.checktype == 'irc':
                    sock.send('NICK\n')
                elif self.checktype == 'judges':
                    # GET request to receive body (IP)
                    sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
                        srvname.split('/', 1)[1] if '/' in srvname else '',
                        srvname.split('/')[0]
                    ))
                else: # head - HEAD is sufficient for header checks
                    sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
                # Record success in pool
                if pool:
                    pool.record_success(torhost, time.time() - duration)
                return sock, proto, duration, torhost, srvname, 0, use_ssl, None
            except rocksock.RocksockException as e:
                last_error_category = categorize_error(e)
                if config.watchd.debug:
                    _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port,
                        e.get_errormessage(), last_error_category), 'debug')
                et = e.get_errortype()
                err = e.get_error()
                fp = e.get_failedproxy()
                _sample_dbg('ERROR: %s via %s -> %s (et=%d err=%d fp=%d cat=%s)' % (
                    ps.proxy, proto, e.get_errormessage(), et, err, fp, last_error_category), ps.proxy)
                if sock is not None:
                    sock.disconnect()
                if et == rocksock.RS_ET_OWN:
                    if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                                    err == rocksock.RS_E_HIT_TIMEOUT):
                        break
                    elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        # Tor connection failed - record in pool
                        if pool:
                            pool.record_failure(torhost)
                        # BUGFIX: use // so this stays an int - with
                        # 'from __future__ import division' the old / produced
                        # a float, which random.randint() rejects on Python 2.
                        if random.randint(0, (config.watchd.threads - 1) // 2) == 0:
                            _log("could not connect to tor, sleep 5s", "ERROR")
                            time.sleep(5)
                elif et == rocksock.RS_ET_GAI:
                    _log("could not resolve connection target %s" % connect_host, "ERROR")
                    break
                elif et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # MITM detected - proxy works but intercepts TLS
                    ps.mitm = 1
                    elapsed = time.time() - duration
                    if pool:
                        pool.record_success(torhost, elapsed)
                    # Extract MITM certificate info (async to not block)
                    try:
                        cert_info = get_mitm_certificate(
                            ps.ip, ps.port, proto, torhost,
                            connect_host, server_port, config.watchd.timeout,
                            auth=ps.auth
                        )
                        if cert_info:
                            mitm_cert_stats.add_cert(ps.ip, cert_info)
                            if config.watchd.debug:
                                _log('MITM cert: %s (CN=%s, O=%s)' % (
                                    cert_info.get('fingerprint', ''),
                                    cert_info.get('subject_cn', ''),
                                    cert_info.get('subject_o', '')), 'debug')
                    except Exception as cert_exc:
                        # renamed from 'e' so the outer exception is not shadowed
                        if config.watchd.debug:
                            _log('failed to extract MITM cert: %s' % str(cert_exc), 'debug')
                    return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm'
            except KeyboardInterrupt:
                raise
        _sample_dbg('ALL PROTOS FAILED: %s last_cat=%s' % (ps.proxy, last_error_category), ps.proxy)
        return None, None, None, None, None, 1, use_ssl, last_error_category
class WorkerThread(object):
    """Pulls jobs off the shared queue and runs them until stop() is called.

    Uses a new-style class (explicit object base) for consistency with the
    rest of this module; under Python 2 a bare 'class X():' is an old-style
    class.
    """
    def __init__(self, id, job_queue):
        self.id = id                   # worker id, used for pool affinity and logging
        self.done = threading.Event()  # set to request shutdown
        self.thread = None             # backing (gevent-patched) thread once started
        self.job_queue = job_queue     # shared input queue
    def stop(self):
        """Ask the work loop to exit after the current job."""
        self.done.set()
    def term(self):
        """Join the backing thread (no-op if start_thread was never called)."""
        if self.thread:
            self.thread.join()
    def start_thread(self):
        """Spawn the thread running workloop()."""
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()
    def workloop(self):
        """Main loop: fetch and run jobs; short get-timeout keeps stop() responsive."""
        job_count = 0
        duration_total = 0
        while not self.done.is_set():
            try:
                job = self.job_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            started = time.time()
            # Tag the job with our id so the connection pool can hand this
            # worker a sticky Tor host.
            job.worker_id = self.id
            job.run()
            duration_total += time.time() - started
            job_count += 1
            self.job_queue.task_done()
        if self.thread and job_count > 0:
            avg_t = try_div(duration_total, job_count)
            _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id)
class VerificationThread(threading.Thread):
    """Manager thread that verifies disputed/suspicious proxy results.

    Pulls from the verification queue, tests proxies directly via Tor,
    records authoritative results, and updates worker trust scores.

    BUGFIX: the stop flag is named _stop_event instead of _stop -
    threading.Thread defines an internal _stop() *method* on Python 3, and
    shadowing it with an Event object breaks thread shutdown/join.
    """
    def __init__(self, database, interval=30, batch_size=10):
        """
        Args:
            database: path to the sqlite database
            interval: seconds between verification cycles
            batch_size: max pending verifications processed per cycle
        """
        threading.Thread.__init__(self)
        self.database = database
        self.interval = interval
        self.batch_size = batch_size
        self.daemon = True
        self._stop_event = threading.Event()
        self.verified_count = 0
        self.disagreements_resolved = 0
    def stop(self):
        """Request shutdown; wakes the run() loop out of its wait()."""
        self._stop_event.set()
    def run(self):
        _log('verification thread started (interval=%ds, batch=%d)' % (
            self.interval, self.batch_size), 'verify')
        while not self._stop_event.is_set():
            try:
                self._verification_cycle()
            except Exception as e:
                # Keep the manager alive across transient DB/network errors.
                _log('verification cycle error: %s' % e, 'error')
            # Wait for next cycle (returns early when stop() is called)
            self._stop_event.wait(self.interval)
        _log('verification thread stopped (verified=%d, resolved=%d)' % (
            self.verified_count, self.disagreements_resolved), 'verify')
    def _verification_cycle(self):
        """Process one batch of pending verifications."""
        db = mysqlite.mysqlite(self.database, str)
        try:
            pending = dbs.get_pending_verifications(db, self.batch_size)
            if not pending:
                return
            for item in pending:
                if self._stop_event.is_set():
                    break
                proxy = item['proxy']
                trigger = item['trigger']
                # Test proxy directly
                result = self._test_proxy(proxy)
                if result is None:
                    # Test failed/inconclusive, skip for now
                    continue
                # Record verification result
                self._record_verification(db, item, result)
                self.verified_count += 1
                if trigger == 'disagreement':
                    self.disagreements_resolved += 1
            db.commit()
        finally:
            db.close()
    def _test_proxy(self, proxy):
        """Test proxy ('ip:port') directly using local Tor connection.

        Returns:
            True if proxy works, False if fails, None if inconclusive
            (malformed proxy string or no connection pool available)
        """
        parts = proxy.split(':')
        if len(parts) != 2:
            return None
        ip, port_str = parts
        try:
            port = int(port_str)  # validation only; rocksock takes the full string
        except ValueError:
            return None
        # Try SSL handshake as primary test
        pool = connection_pool.get_pool()
        if not pool:
            return None
        proto_map = {
            'http': rocksock.RS_PT_HTTP,
            'socks4': rocksock.RS_PT_SOCKS4,
            'socks5': rocksock.RS_PT_SOCKS5
        }
        protos = ['http', 'socks5', 'socks4']
        for proto in protos:
            try:
                sock = rocksock.Rocksock(
                    host='www.google.com',
                    port=443,
                    ssl=True,
                    timeout=config.common.timeout_connect,
                    proxies=[
                        (tor_proxy_url(pool.get_host(), proto='socks5'), rocksock.RS_PT_SOCKS5),
                        (proxy, proto_map.get(proto, rocksock.RS_PT_HTTP))
                    ]
                )
                sock.connect()
                sock.disconnect()
                return True
            except Exception:
                continue
        return False
    def _record_verification(self, db, item, result):
        """Record verification result and update worker trust."""
        proxy = item['proxy']
        trigger = item['trigger']
        worker_a = item.get('worker_a')
        worker_b = item.get('worker_b')
        result_a = item.get('result_a')
        result_b = item.get('result_b')
        # Update worker trust based on who agreed with the verification
        if trigger == 'disagreement' and worker_a and worker_b:
            result_int = 1 if result else 0
            dbs.update_worker_trust(db, worker_a, result_a == result_int)
            dbs.update_worker_trust(db, worker_b, result_b == result_int)
        elif worker_a:
            # Single worker trigger (resurrection/sudden_death)
            result_int = 1 if result else 0
            was_correct = (result_a == result_int)
            dbs.update_worker_trust(db, worker_a, was_correct)
        # Update proxy status with authoritative result
        if result:
            db.execute('''
                UPDATE proxylist SET failed = 0, tested = ?
                WHERE proxy = ?
            ''', (int(time.time()), proxy))
        else:
            db.execute('''
                UPDATE proxylist SET failed = failed + 1, tested = ?
                WHERE proxy = ?
            ''', (int(time.time()), proxy))
        # Remove from verification queue
        dbs.remove_from_verification_queue(db, proxy)
        _log('verified %s: %s (trigger=%s)' % (
            proxy, 'PASS' if result else 'FAIL', trigger), 'verify')
class Proxywatchd():
    """Main proxy-testing daemon.

    Owns the worker thread pool, the shared priority job queue and
    completion queue, batched DB result submission, periodic stats
    reporting/persistence, dynamic thread scaling, and the optional HTTP
    API server and verification thread.

    Lifecycle: __init__() -> start() -> run() -> stop() -> finish().
    """
    def stop(self):
        """Request shutdown; teardown itself happens in _cleanup()."""
        _log('halting... (%d thread(s))' % len(self.threads), 'watchd')
        self.stopping.set()
    def _cleanup(self):
        """Stop worker threads, flush pending results, stop helper services."""
        for wt in self.threads:
            wt.stop()
        for wt in self.threads:
            wt.term()
        # Drain completed states and write them out before exiting.
        self.collect_work()
        self.submit_collected()
        if self.httpd_server:
            self.httpd_server.stop()
        if self.verification_thread:
            self.verification_thread.stop()
        self.stopped.set()
    def finish(self):
        """Block until shutdown completes, then persist final session state.

        In foreground mode _cleanup() has not run yet, so it is invoked
        here; in background mode _run() performs it when `stopping` is set.
        """
        if not self.in_background: self._cleanup()
        while not self.stopped.is_set(): time.sleep(0.1)
        success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
        _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd")
        # Final save of session state before exit
        try:
            with self._db_context() as db:
                dbs.save_session_state(db, self.stats)
                dbs.save_stats_snapshot(db, self.stats)
            _log('session state saved', 'watchd')
        except Exception as e:
            _log('failed to save final session state: %s' % str(e), 'warn')
        # Save MITM certificate stats
        try:
            mitm_cert_stats.save_state(self.mitm_state_file)
            _log('MITM cert state saved', 'watchd')
        except Exception as e:
            _log('failed to save MITM state: %s' % str(e), 'warn')
    @contextlib.contextmanager
    def _db_context(self):
        """Context manager for database connections.

        Yields a fresh mysqlite handle on config.watchd.database and
        guarantees it is closed, even on exceptions.
        """
        db = mysqlite.mysqlite(config.watchd.database, str)
        try:
            yield db
        finally:
            db.close()
    def __init__(self):
        """Load config, create queues/state, ensure schema, install SIGHUP."""
        config.load()
        dns.set_config(config)
        self.in_background = False
        self.threads = []
        self.stopping = threading.Event()
        self.stopped = threading.Event()
        # shared work-stealing queues
        self.job_queue = PriorityJobQueue()
        self.completion_queue = Queue.Queue() # completed ProxyTestState objects
        # track pending proxy states
        self.pending_states = {} # dict: proxy -> ProxyTestState
        self.pending_lock = threading.Lock()
        # create table if needed (use dbs.py for canonical schema)
        with self._db_context() as db:
            dbs.create_table_if_not_exists(db, 'proxylist')
        self.submit_after = config.watchd.submit_after # number of collected jobs before writing db
        self.collected = [] # completed ProxyTestState objects ready for DB
        self.totals = {
            'submitted':0,
            'success':0,
        }
        self.stats = Stats()
        self.last_cleanup = time.time()
        self.httpd_server = None
        self.verification_thread = None
        # Dynamic thread scaling
        # Floor: explicit min_threads if configured, else a quarter of the
        # max (but at least 5).
        min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
        self.scaler = ThreadScaler(
            min_threads=min_t,
            max_threads=config.watchd.threads,
            target_queue_per_thread=5,
            scale_cooldown=config.watchd.scale_cooldown,
            scale_threshold=config.watchd.scale_threshold
        )
        self.thread_id_counter = 0
        self.last_jobs_log = 0
        self.last_update_log = 0
        self.log_interval = 60 # seconds between routine log messages
        # Register SIGHUP handler for config reload
        signal.signal(signal.SIGHUP, self._handle_sighup)
    def _handle_sighup(self, signum, frame):
        """Handle SIGHUP signal for config reload."""
        _log('received SIGHUP, reloading config...', 'watchd')
        self.reload_config()
    def reload_config(self):
        """Reload configuration without restart.
        Hot-reloadable settings:
            threads, timeout, checktime, perfail_checktime, submit_after,
            stats_interval, debug, tor_safeguard, stale_days, max_fail,
            outage_threshold
        Requires restart:
            database, source_file, checktype

        Returns True when the new config was accepted, False otherwise
        (validation or load failure keeps the old config active).
        """
        old_threads = config.watchd.threads
        old_checktypes = list(config.watchd.checktypes)
        try:
            config.load()
            errors = config.validate()
            if errors:
                for e in errors:
                    _log('config error: %s' % e, 'error')
                _log('config reload failed, keeping old config', 'error')
                return False
        except Exception as e:
            _log('config reload failed: %s' % str(e), 'error')
            return False
        # Update runtime values
        self.submit_after = config.watchd.submit_after
        # Update scaler limits
        # NOTE(review): unlike __init__, this path ignores
        # config.watchd.min_threads and always derives the floor from
        # threads//4 — confirm this asymmetry is intended.
        self.scaler.min_threads = max(5, config.watchd.threads // 4)
        self.scaler.max_threads = config.watchd.threads
        # Warn about values requiring restart
        if config.watchd.checktypes != old_checktypes:
            _log('checktype changed (%s -> %s), requires restart to take effect' % (
                ','.join(old_checktypes), ','.join(config.watchd.checktypes)), 'warn')
        _log('config reloaded: threads=%d timeout=%d working=%ds fail_interval=%ds backoff=%s' % (
            config.watchd.threads, config.watchd.timeout,
            config.watchd.working_checktime, config.watchd.fail_retry_interval,
            config.watchd.fail_retry_backoff), 'watchd')
        return True
    def _spawn_thread(self):
        """Spawn a new worker thread."""
        self.thread_id_counter += 1
        threadid = 'dyn%d' % self.thread_id_counter
        wt = WorkerThread(threadid, self.job_queue)
        wt.start_thread()
        self.threads.append(wt)
        return threadid
    def _remove_thread(self):
        """Remove one worker thread (graceful shutdown)."""
        # Never shrink below the scaler's floor.
        if len(self.threads) <= self.scaler.min_threads:
            return None
        # Stop the last thread
        wt = self.threads.pop()
        wt.stop()
        # Don't wait for it - let it finish its current job
        return wt.id
    def _adjust_threads(self, target):
        """Adjust thread count to target."""
        current = len(self.threads)
        if target > current:
            added = []
            for _ in range(target - current):
                tid = self._spawn_thread()
                added.append(tid)
            if added:
                _log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler')
        elif target < current:
            removed = []
            for _ in range(current - target):
                tid = self._remove_thread()
                if tid:
                    removed.append(tid)
            if removed:
                _log('scaled down: -%d threads' % len(removed), 'scaler')
    def fetch_rows(self, db):
        """Fetch proxy rows due for testing from database.

        Falls back to "oldies" (dead proxies getting a second chance) when
        fewer due rows than configured threads are found and the oldies
        feature is enabled.  Sets self.isoldies accordingly and may disable
        the tor safeguard for oldie batches.
        """
        self.isoldies = False
        # Build due condition using new schedule formula
        due_sql, due_params = _build_due_sql()
        q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,
               consecutive_success,asn,proxy FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
        _dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % (
            config.watchd.working_checktime, config.watchd.fail_retry_interval,
            config.watchd.fail_retry_backoff, config.watchd.max_fail))
        rows = db.execute(q, due_params).fetchall()
        _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))
        # Check oldies? (dead proxies getting a second chance)
        if len(rows) < config.watchd.threads:
            _dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies)
            rows = []
            if config.watchd.oldies:
                self.isoldies = True
                # Disable tor safeguard for old proxies
                if self.tor_safeguard:
                    self.tor_safeguard = False
                # Oldies use simple checktime-based query (dead proxies)
                now = time.time()
                oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2)
                q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country,
                              mitm,consecutive_success,asn,proxy FROM proxylist
                              WHERE failed >= ? AND failed < ? AND (tested + ?) < ?
                              ORDER BY RANDOM()'''
                rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max,
                                             config.watchd.oldies_checktime, now)).fetchall()
        return rows
    def prepare_jobs(self):
        """Build and enqueue one test job per due proxy.

        Fetches due rows, creates a shared ProxyTestState per proxy, picks
        a random checktype/target, shuffles jobs to interleave proxies and
        queues them with a failure/success-derived priority.  Returns the
        number of jobs queued.
        """
        ## enable tor safeguard by default
        self.tor_safeguard = config.watchd.tor_safeguard
        # Fetch rows within context manager scope
        with self._db_context() as db:
            rows = self.fetch_rows(db)
        _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
        checktypes = config.watchd.checktypes
        # Build target pools for each checktype
        target_pools = {}
        for ct in checktypes:
            if ct == 'irc':
                target_pools[ct] = config.servers
                _dbg('target_pool[irc]: %d servers' % len(config.servers))
            elif ct == 'judges':
                # Filter out judges in cooldown (blocked/rate-limited)
                all_judges = list(judges.keys())
                available = judge_stats.get_available_judges(all_judges)
                target_pools[ct] = available if available else all_judges
            elif ct == 'ssl':
                target_pools[ct] = ssl_targets
                _dbg('target_pool[ssl]: %d targets' % len(ssl_targets))
            else: # head
                target_pools[ct] = list(regexes.keys())
                _dbg('target_pool[%s]: %d targets' % (ct, len(regexes)))
        # create all jobs first, then shuffle for interleaving
        all_jobs = []
        new_states = []
        for row in rows:
            # create shared state for this proxy
            # row: ip, port, proto, failed, success_count, total_duration,
            #      country, mitm, consecutive_success, asn, proxy
            state = ProxyTestState(
                row[0], row[1], row[2], row[3], row[4], row[5],
                row[6], row[7], row[8], asn=row[9],
                oldies=self.isoldies, completion_queue=self.completion_queue,
                proxy_full=row[10]
            )
            new_states.append(state)
            # randomly select checktype for this proxy
            checktype = random.choice(checktypes)
            target_pool = target_pools[checktype]
            # select single target (single-target mode)
            target = random.choice(target_pool)
            # create job for this proxy
            job = TargetTestJob(state, target, checktype)
            all_jobs.append(job)
        # shuffle to interleave tests across different proxies
        random.shuffle(all_jobs)
        # track pending states (dict for O(1) lookup/removal)
        with self.pending_lock:
            for state in new_states:
                self.pending_states[state.proxy] = state
        # queue all jobs with priority
        for job in all_jobs:
            priority = calculate_priority(
                job.proxy_state.failcount,
                job.proxy_state.success_count,
                config.watchd.max_fail
            )
            self.job_queue.put(job, priority)
        proxy_count = len(new_states)
        job_count = len(all_jobs)
        _dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
        now = time.time()
        # Rate-limit the routine "created N jobs" log line.
        if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
            _log("created %d jobs for %d proxies" % (job_count, proxy_count), 'watchd')
            self.last_jobs_log = now
        return job_count
    def collect_work(self):
        """Drain finished ProxyTestStates into self.collected and stats."""
        # process completed states from completion queue (event-driven, not polling)
        # ProxyTestState.record_result() pushes to completion_queue when all targets done
        completed_count = 0
        while True:
            try:
                state = self.completion_queue.get_nowait()
                completed_count += 1
                # evaluate and record stats
                success, category = state.evaluate()
                self.stats.record(success, category, state.proto, state.last_latency_ms,
                                  state.country, state.asn,
                                  ssl_test=state.had_ssl_test, mitm=(state.mitm > 0),
                                  cert_error=state.cert_error)
                self.collected.append(state)
                # remove from pending dict (O(1))
                with self.pending_lock:
                    self.pending_states.pop(state.proxy, None)
            except Queue.Empty:
                break
    def collect_unfinished(self):
        """Discard still-queued jobs; their proxies re-test next cycle."""
        # drain any remaining jobs from job queue
        unfinished_count = 0
        while True:
            try:
                self.job_queue.get_nowait()
                unfinished_count += 1
            except Queue.Empty:
                break
        if unfinished_count > 0:
            _log("discarded %d unfinished jobs" % unfinished_count, "watchd")
            # note: corresponding ProxyTestStates will be incomplete
            # they'll be re-tested in the next cycle
    def submit_collected(self):
        """Write collected results to the DB in one batch.

        Marks proxies dead on fatal error categories or when failures
        exceed max_fail*2.  When the success rate of a large batch drops
        under outage_threshold and the tor safeguard is active, failures
        are withheld (only successes are written) and False is returned so
        the caller can back off; otherwise returns True.
        """
        if len(self.collected) == 0:
            return True
        sc = 0
        dead_count = 0
        args = []
        latency_updates = [] # (proxy, latency_ms) for successful tests
        max_fail = config.watchd.max_fail
        _dbg('submit_collected: %d jobs' % len(self.collected))
        for job in self.collected:
            if job.failcount == 0:
                sc += 1
                _dbg('submit OK: failcount=0', job.proxy)
                if job.last_latency_ms is not None:
                    latency_updates.append((job.proxy, job.last_latency_ms))
            # Check if proxy should be marked as permanently dead
            effective_failcount = job.failcount
            if job.failcount > 0:
                is_fatal = job.last_fail_category in FATAL_ERROR_CATEGORIES
                # Fatal errors (refused/unreachable/auth) = immediately dead
                if is_fatal:
                    effective_failcount = DEAD_PROXY
                    dead_count += 1
                # Non-fatal: mark dead if exceeded max_fail*2
                elif job.failcount >= max_fail * 2:
                    effective_failcount = DEAD_PROXY
                    dead_count += 1
            args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
                         job.success_count, job.total_duration, job.mitm,
                         job.consecutive_success, job.asn, job.proxy))
        success_rate = (float(sc) / len(self.collected)) * 100
        ret = True
        # Tor safeguard: a sudden batch-wide failure usually means our own
        # exit path is blocked, not that the proxies died.
        if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard:
            _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
            if sc == 0:
                return False
            # Rebuild the update list with successes only.
            args = []
            latency_updates = []
            for job in self.collected:
                if job.failcount == 0:
                    args.append((job.failcount, job.checktime, 1, job.country, job.proto,
                                 job.success_count, job.total_duration, job.mitm,
                                 job.consecutive_success, job.asn, job.proxy))
                    if job.last_latency_ms is not None:
                        latency_updates.append((job.proxy, job.last_latency_ms))
            ret = False
        now = time.time()
        if (now - self.last_update_log) >= self.log_interval or success_rate > 0:
            dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else ''
            _log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd')
            self.last_update_log = now
        # Build anonymity updates before DB context
        anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers)
                             for job in self.collected
                             if job.failcount == 0 and job.exit_ip]
        with self._db_context() as db:
            query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
            db.executemany(query, args)
            # Batch update latency metrics for successful proxies
            if latency_updates:
                dbs.batch_update_proxy_latency(db, latency_updates)
            # Batch update anonymity for proxies with exit IP data
            if anonymity_updates:
                dbs.batch_update_proxy_anonymity(db, anonymity_updates)
            db.commit()
        self.collected = []
        self.totals['submitted'] += len(args)
        self.totals['success'] += sc
        return ret
    def cleanup_stale(self):
        """Remove proxies that have been dead for too long."""
        stale_seconds = config.watchd.stale_days * 86400
        cutoff = int(time.time()) - stale_seconds
        with self._db_context() as db:
            # delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff
            # NOTE(review): if DEAD_PROXY >= max_fail the equality clause is
            # redundant with the >= test — confirm sentinel value.
            result = db.execute(
                'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?',
                (config.watchd.max_fail, DEAD_PROXY, cutoff)
            )
            count = result.rowcount if hasattr(result, 'rowcount') else 0
            db.commit()
        if count > 0:
            _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd')
        self.last_cleanup = time.time()
    def start(self):
        """Run inline only as a standalone single-thread process,
        otherwise launch the main loop in a background thread."""
        if config.watchd.threads == 1 and _run_standalone:
            return self._run()
        else:
            return self._run_background()
    def run(self):
        """Idle forever in background mode (the loop runs in its own thread)."""
        if self.in_background:
            while 1: time.sleep(1)
    def _run_background(self):
        """Start _run() on a daemon-style background thread."""
        self.in_background = True
        t = threading.Thread(target=self._run)
        t.start()
    def get_runtime_stats(self):
        """Return runtime statistics for the dashboard.
        Returns:
            dict with tested, passed, success_rate, rate, uptime_seconds,
            threads, failures, tor_pool, and engine stats.
        """
        stats_data = {}
        # Stats from Stats object
        with self.stats.lock:
            stats_data['tested'] = self.stats.tested
            stats_data['passed'] = self.stats.passed
            stats_data['failed'] = self.stats.failed
            elapsed = time.time() - self.stats.start_time
            stats_data['uptime_seconds'] = int(elapsed)
            stats_data['rate'] = try_div(self.stats.tested, elapsed)
            stats_data['success_rate'] = try_div(self.stats.passed * 100.0, self.stats.tested)
            stats_data['failures'] = dict(self.stats.fail_categories)
            # New: Protocol breakdown
            stats_data['by_proto'] = dict(self.stats.by_proto)
            # New: Rate history for sparklines
            stats_data['rate_history'] = list(self.stats.rate_history)
            stats_data['success_rate_history'] = list(self.stats.success_rate_history)
            # New: Peak rate
            stats_data['peak_rate'] = self.stats.peak_rate
        # New: Recent rate (last 60s) and avg latency
        stats_data['recent_rate'] = self.stats.get_recent_rate()
        stats_data['recent_success_rate'] = self.stats.get_recent_success_rate()
        stats_data['avg_latency'] = self.stats.get_avg_latency()
        # Latency stats
        stats_data['min_latency'] = self.stats.min_latency if self.stats.min_latency != float('inf') else 0
        stats_data['max_latency'] = self.stats.max_latency
        stats_data['latency_percentiles'] = self.stats.get_latency_percentiles()
        stats_data['latency_histogram'] = self.stats.get_latency_histogram()
        # Protocol performance (tested, passed, rate per protocol)
        stats_data['proto_stats'] = self.stats.get_proto_stats()
        # Geographic distribution from session
        stats_data['top_countries_session'] = self.stats.get_top_countries(10)
        stats_data['top_asns_session'] = self.stats.get_top_asns(10)
        # SSL/TLS stats
        with self.stats.lock:
            ssl_tested = self.stats.ssl_tested
            ssl_passed = self.stats.ssl_passed
            mitm_stats = mitm_cert_stats.get_stats()
            stats_data['ssl'] = {
                'tested': ssl_tested,
                'passed': ssl_passed,
                'failed': self.stats.ssl_failed,
                'fail_categories': dict(self.stats.ssl_fail_categories),
                'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0,
                'mitm_detected': self.stats.mitm_detected,
                'cert_errors': self.stats.cert_errors,
                'mitm_unique_certs': mitm_stats.get('unique_certs', 0),
                'mitm_unique_proxies': mitm_stats.get('unique_proxies', 0)
            }
        # Thread info
        stats_data['threads'] = len(self.threads)
        stats_data['min_threads'] = self.scaler.min_threads
        stats_data['max_threads'] = self.scaler.max_threads
        stats_data['queue_size'] = self.job_queue.qsize()
        stats_data['checktype'] = ','.join(config.watchd.checktypes)
        stats_data['checktypes'] = config.watchd.checktypes
        stats_data['profiling'] = (
            getattr(config.args, 'profile', False) if hasattr(config, 'args') else False
        ) or getattr(config.common, 'profiling', False)
        stats_data['pass_rate'] = try_div(self.stats.passed, elapsed)
        # Tor pool stats
        pool = connection_pool.get_pool()
        if pool:
            pool_stats = pool.get_stats()
            hosts_list = []
            healthy_count = 0
            latency_sum = 0.0
            latency_count = 0
            for h in pool_stats.get('hosts', []):
                is_healthy = h.get('available', False)
                # avg_latency is reported in seconds; dashboard wants ms.
                lat_ms = h.get('avg_latency', 0) * 1000 if h.get('avg_latency') else None
                hosts_list.append({
                    'address': h.get('host', ''),
                    'healthy': is_healthy,
                    'latency_ms': lat_ms,
                    'success_rate': h.get('success_rate', 0),
                })
                if is_healthy:
                    healthy_count += 1
                if lat_ms and lat_ms > 0:
                    latency_sum += lat_ms
                    latency_count += 1
            pool_info = {
                'hosts': hosts_list,
                'total_requests': pool_stats.get('total_requests', 0),
                'success_rate': pool_stats.get('success_rate', 0),
                'healthy_count': healthy_count,
                'total_count': len(hosts_list),
                'avg_latency': latency_sum / latency_count if latency_count > 0 else 0,
            }
            stats_data['tor_pool'] = pool_info
        else:
            stats_data['tor_pool'] = {
                'hosts': [], 'total_requests': 0, 'success_rate': 0,
                'healthy_count': 0, 'total_count': 0, 'avg_latency': 0
            }
        # Judge stats (when using judges checktype)
        if 'judges' in config.watchd.checktypes:
            js = judge_stats.get_stats()
            stats_data['judges'] = {
                'total': js.get('total', 0),
                'available': js.get('available', 0),
                'in_cooldown': js.get('in_cooldown', 0),
                'top_judges': js.get('top', [])[:5] # top 5 most successful
            }
        else:
            stats_data['judges'] = None
        # Scraper/engine stats
        if scraper_available:
            scraper_stats = scraper_module.get_scraper_stats()
            if scraper_stats:
                stats_data['engines_available'] = scraper_stats.get('available', 0)
                stats_data['engines_total'] = scraper_stats.get('total', 0)
                stats_data['engines_backoff'] = scraper_stats.get('in_backoff', 0)
                stats_data['scraper'] = scraper_stats
            else:
                stats_data['engines_available'] = 0
                stats_data['engines_total'] = 0
                stats_data['engines_backoff'] = 0
                stats_data['scraper'] = None
        else:
            stats_data['engines_available'] = 0
            stats_data['engines_total'] = 0
            stats_data['engines_backoff'] = 0
            stats_data['scraper'] = None
        # Network usage stats
        stats_data['network'] = network_stats.get_stats()
        # MITM certificate stats (detailed)
        stats_data['mitm'] = mitm_cert_stats.get_stats()
        return stats_data
    def _run(self):
        """Main loop: spawn workers, keep the job queue fed, flush results,
        report/persist stats and scale the pool until stop() is called."""
        _log('starting...', 'watchd')
        _log('config: db=%s checktype=%s threads=%d working=%ds fail_interval=%ds backoff=%s max_fail=%d' % (
            config.watchd.database, ','.join(config.watchd.checktypes), config.watchd.threads,
            config.watchd.working_checktime, config.watchd.fail_retry_interval,
            config.watchd.fail_retry_backoff, config.watchd.max_fail), 'watchd')
        # Log database status at startup
        with self._db_context() as db:
            total = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
            due_sql, due_params = _build_due_sql()
            due = db.execute('SELECT COUNT(*) FROM proxylist WHERE ' + due_sql, due_params).fetchone()[0]
            # Create stats persistence tables
            dbs.create_table_if_not_exists(db, 'stats_history')
            dbs.create_table_if_not_exists(db, 'session_state')
            # Load persisted session state
            saved_state = dbs.load_session_state(db)
            if saved_state:
                self.stats.load_state(saved_state)
        # Load MITM certificate state (same directory as database)
        db_dir = os.path.dirname(config.watchd.database) or '.'
        self.mitm_state_file = os.path.join(db_dir, 'mitm_certs.json')
        mitm_cert_stats.load_state(self.mitm_state_file)
        _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd')
        # Initialize Tor connection pool
        connection_pool.init_pool(config.torhosts, warmup=True)
        # Start HTTP API server if enabled
        if config.httpd.enabled:
            from httpd import ProxyAPIServer, configure_schedule
            # Pass schedule config to httpd module
            configure_schedule(
                config.watchd.working_checktime,
                config.watchd.fail_retry_interval,
                config.watchd.fail_retry_backoff,
                config.watchd.max_fail
            )
            self.httpd_server = ProxyAPIServer(
                config.httpd.listenip,
                config.httpd.port,
                config.watchd.database,
                stats_provider=self.get_runtime_stats
            )
            self.httpd_server.start()
        # Start verification thread if enabled (manager-only, checks disputed results)
        if config.verification.enabled and config.watchd.threads > 0:
            self.verification_thread = VerificationThread(
                database=config.watchd.database,
                interval=config.verification.interval,
                batch_size=config.verification.batch_size
            )
            self.verification_thread.start()
            _log('verification thread enabled', 'watchd')
        # create worker threads with shared queues
        for i in range(config.watchd.threads):
            threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
            wt = WorkerThread(threadid, self.job_queue)
            if self.in_background:
                wt.start_thread()
            self.threads.append(wt)
            # Stagger thread starts slightly.
            time.sleep(random.random() / 10)
        sleeptime = 0
        while True:
            if self.stopping.is_set():
                if self.in_background: self._cleanup()
                break
            # Back-off countdown (set on tor trouble or an empty DB).
            if sleeptime > 0:
                time.sleep(1)
                sleeptime -= 1
                continue
            # check if job queue is empty (work-stealing: threads pull as needed)
            if self.job_queue.empty():
                self.collect_work()
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
                else:
                    job_count = self.prepare_jobs()
                    if job_count == 0:
                        # no jobs available, wait before checking again
                        sleeptime = 10
            if not self.in_background: # single_thread scenario
                self.threads[0].workloop()
            self.collect_work()
            if len(self.collected) > self.submit_after:
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
            # Update rate history for sparklines
            self.stats.update_history()
            # periodic stats report
            if self.stats.should_report(config.watchd.stats_interval):
                _log(self.stats.report(), 'stats')
                # Also report pool stats
                pool = connection_pool.get_pool()
                if pool:
                    _log(pool.status_line(), 'stats')
                # Report judge stats (when using judges checktype)
                if 'judges' in config.watchd.checktypes:
                    _log(judge_stats.status_line(), 'stats')
                # Report scaler status
                _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')
                # Save session state periodically (every stats_interval, default 5m)
                try:
                    with self._db_context() as db:
                        dbs.save_session_state(db, self.stats)
                except Exception as e:
                    _log('failed to save session state: %s' % str(e), 'warn')
                # Save MITM certificate stats periodically
                try:
                    mitm_cert_stats.save_state(self.mitm_state_file)
                except Exception as e:
                    _log('failed to save MITM state: %s' % str(e), 'warn')
            # Hourly stats snapshot
            now = time.time()
            if not hasattr(self, '_last_snapshot'):
                self._last_snapshot = now
            if (now - self._last_snapshot) >= 3600:
                try:
                    with self._db_context() as db:
                        dbs.save_stats_snapshot(db, self.stats)
                    self._last_snapshot = now
                except Exception as e:
                    _log('failed to save stats snapshot: %s' % str(e), 'warn')
            # Dynamic thread scaling
            if self.in_background:
                target = self.scaler.should_scale(
                    len(self.threads),
                    self.job_queue.qsize(),
                    self.stats
                )
                if target != len(self.threads):
                    self._adjust_threads(target)
            # periodic stale proxy cleanup (daily)
            if (time.time() - self.last_cleanup) >= 86400:
                self.cleanup_stale()
            time.sleep(1)
if __name__ == '__main__':
    # Standalone entry point: validate config, run the daemon, and make
    # sure shutdown/state-saving happens even on Ctrl-C.
    _run_standalone = True
    config.load()
    errors = config.validate()
    if errors:
        for e in errors:
            _log(e, 'error')
        import sys
        sys.exit(1)
    w = Proxywatchd()
    try:
        w.start()
        w.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop; fall through to cleanup.
        pass
    finally:
        # Always request shutdown and wait for state to be persisted.
        w.stop()
        w.finish()