Generalizes JudgeStats into TargetStats with cooldown-based filtering for head targets, SSL targets, and IRC servers. Targets that repeatedly block or fail are temporarily avoided, preventing unfair proxy failures when a target goes down. Exposes per-pool health via /api/stats.
2131 lines
87 KiB
Python
#!/usr/bin/env python2
|
|
from __future__ import division
|
|
|
|
# Gevent monkey-patching MUST happen before any other imports
|
|
# This patches threading, socket, time, etc. for cooperative concurrency
|
|
from gevent import monkey
|
|
monkey.patch_all()
|
|
|
|
import threading # Now patched by gevent
|
|
import time
|
|
import random
|
|
import string
|
|
import re
|
|
import signal
|
|
import network_stats
|
|
|
|
import os
|
|
import ssl
|
|
import contextlib
|
|
try:
|
|
import Queue
|
|
except ImportError:
|
|
import queue as Queue
|
|
try:
|
|
import IP2Location
|
|
geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
|
|
geolite = True
|
|
except (ImportError, IOError, ValueError):
|
|
geolite = False
|
|
|
|
try:
|
|
import pyasn
|
|
asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat"))
|
|
asn_enabled = True
|
|
except (ImportError, IOError, NameError):
|
|
asndb = None
|
|
asn_enabled = False
|
|
|
|
from config import Config
|
|
|
|
import mysqlite
|
|
import dbs
|
|
import dns
|
|
from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error
|
|
import rocksock
|
|
import connection_pool
|
|
from stats import TargetStats, JudgeStats, Stats, regexes, ssl_targets, try_div
|
|
from mitm import MITMCertStats, extract_cert_info, get_mitm_certificate
|
|
from dns import socks4_resolve
|
|
from job import PriorityJobQueue, calculate_priority
|
|
|
|
# Optional scraper integration for engine stats
|
|
try:
|
|
import scraper as scraper_module
|
|
scraper_available = True
|
|
except ImportError:
|
|
scraper_module = None
|
|
scraper_available = False
|
|
|
|
config = Config()
|
|
|
|
def set_config(cfg):
    """Set the config object (used when imported from ppf.py).

    Replaces this module's global ``config`` and forwards the same
    object to the dns helper module so both stay in sync.
    """
    global config
    config = cfg
    dns.set_config(cfg)
|
|
|
|
# True only when this module is executed directly (not imported by ppf.py)
_run_standalone = False


# Debug mode for proxy check path - set via PPF_DEBUG env or config
_debug_proxy = os.environ.get('PPF_DEBUG', '').lower() in ('1', 'true', 'proxy')


# Sampling debug: print detailed diagnostics for every Nth test
_sample_debug_interval = 50  # Print debug for every 50th test (lowered for diagnosis)
# Shared counter incremented by _sample_dbg() and TargetTestJob.run();
# guarded by _sample_debug_lock below.
_sample_debug_counter = 0
_sample_debug_lock = threading.Lock()
|
|
|
|
def _dbg(msg, proxy=None):
    """Debug log for proxy check path. Only logs when PPF_DEBUG=1."""
    if not _debug_proxy:
        return
    # Use 'dbg' category (shows at info level) instead of 'debug'
    # (filtered by default)
    tag = '[%s] ' % proxy if proxy else ''
    _log('%s%s' % (tag, msg), 'dbg')
|
|
|
|
def _sample_dbg(msg, proxy=None, force=False):
    """Sampled debug: log every Nth test for diagnostics without flooding.

    With force=True the message is always emitted and the shared sample
    counter is left untouched.
    """
    global _sample_debug_counter
    if not force:
        # Bump the shared counter under the lock; only every
        # _sample_debug_interval-th call gets through.
        with _sample_debug_lock:
            _sample_debug_counter += 1
            if _sample_debug_counter < _sample_debug_interval:
                return
            _sample_debug_counter = 0
    tag = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] '
    _log('%s%s' % (tag, msg), 'diag')
|
|
|
|
|
|
def _build_due_sql():
    """Build SQL WHERE clause for proxies due for testing.

    Returns (sql_condition, params) using new schedule formula:
    - failed=0: tested + working_checktime < now
    - failed>0 with backoff: tested + (failed * fail_retry_interval) < now
    - failed>0 without backoff: tested + fail_retry_interval < now

    Note: ``failed >= 0`` excludes dead proxies (failed == DEAD_PROXY == -1),
    and ``now`` is baked into the params, so the clause is only valid at the
    moment it is built.
    """
    now = time.time()
    if config.watchd.fail_retry_backoff:
        # Linear backoff: multiply interval by failure count
        sql = '''failed >= 0 AND failed < ? AND (
            tested IS NULL OR
            CASE WHEN failed = 0
                THEN tested + ? < ?
                ELSE tested + (failed * ?) < ?
            END
        )'''
        params = (config.watchd.max_fail, config.watchd.working_checktime, now,
                  config.watchd.fail_retry_interval, now)
    else:
        # Fixed interval: same delay regardless of failure count.
        # Params are identical to the backoff branch; only the ELSE
        # expression in the SQL differs.
        sql = '''failed >= 0 AND failed < ? AND (
            tested IS NULL OR
            CASE WHEN failed = 0
                THEN tested + ? < ?
                ELSE tested + ? < ?
            END
        )'''
        params = (config.watchd.max_fail, config.watchd.working_checktime, now,
                  config.watchd.fail_retry_interval, now)
    return sql, params
|
|
|
|
|
|
# IP pattern for judge services (validates response contains valid IP in body)
# NOTE: matches any dotted quad, including octets > 255; the octet range is
# enforced separately by is_valid_ip()/is_public_ip().
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
|
|
|
|
def is_valid_ip(ip_str):
    """Validate IP address octets are 0-255.

    Returns False for non-strings, wrong octet counts, and
    non-numeric or out-of-range octets.
    """
    try:
        octets = ip_str.split('.')
    except AttributeError:
        # Not a string (e.g. None)
        return False
    if len(octets) != 4:
        return False
    try:
        return all(0 <= int(octet) <= 255 for octet in octets)
    except ValueError:
        return False
|
|
|
|
def is_public_ip(ip_str):
    """Validate IP is a public, globally routable IPv4 address.

    Rejects the standard non-routable ranges: "this network", RFC 1918
    private space, loopback, link-local, RFC 6598 shared/CGNAT space,
    and multicast/reserved blocks.

    Args:
        ip_str: dotted-quad string (leading zeros in octets are accepted)

    Returns:
        bool: True only for a syntactically valid, globally routable address.
    """
    if not is_valid_ip(ip_str):
        return False
    parts = [int(p) for p in ip_str.split('.')]
    if parts[0] == 0: return False   # 0.0.0.0/8 "this network"
    if parts[0] == 10: return False  # 10.0.0.0/8 private
    # 100.64.0.0/10 carrier-grade NAT shared space (RFC 6598) - not
    # globally routable; previously passed this check incorrectly.
    if parts[0] == 100 and 64 <= parts[1] <= 127: return False
    if parts[0] == 127: return False  # 127.0.0.0/8 loopback
    if parts[0] == 169 and parts[1] == 254: return False  # link-local
    if parts[0] == 172 and 16 <= parts[1] <= 31: return False  # 172.16/12
    if parts[0] == 192 and parts[1] == 168: return False  # 192.168/16
    if parts[0] >= 224: return False  # multicast + reserved
    return True
|
|
|
|
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
# (used against header-echo judges such as httpbin.org/headers for elite detection)
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'


# Dead proxy marker - proxies with failed=-1 are permanently dead and never retested
# (the due-SQL filter uses "failed >= 0", which excludes this sentinel)
DEAD_PROXY = -1

# Error categories that indicate proxy is definitely dead (not temporary failure)
FATAL_ERROR_CATEGORIES = ('refused', 'unreachable', 'auth')
|
|
|
|
# Patterns indicating HTTP target is blocking the proxy (not a proxy failure)
# These should NOT count as proxy failures - applies to judges and head targets
HTTP_BLOCK_PATTERNS = [
    r'HTTP/1\.[01] 403',      # Forbidden
    r'HTTP/1\.[01] 429',      # Too Many Requests
    r'HTTP/1\.[01] 503',      # Service Unavailable
    r'captcha',               # Captcha challenge
    r'challenge',             # Generic challenge
    r'verify you are human',  # Human verification
    r'rate.?limit',           # Rate limiting
    r'too many requests',     # Rate limiting
    r'access.?denied',        # Access denied
    r'blocked',               # Explicit block
    r'Checking your browser', # Cloudflare JS challenge
]
# Compiled once at import time as a single case-insensitive alternation;
# matched against whole response bodies in TargetTestJob.run().
HTTP_BLOCK_RE = re.compile('|'.join(HTTP_BLOCK_PATTERNS), re.IGNORECASE)
|
|
|
|
# Check types: irc, http (header match), judges (body match), ssl (TLS handshake)

# Judge services - return IP in body (plain text, JSON, or HTML).
# Keys are host (optionally host/path); values are the regex that must
# match the response body for the test to count as a success.
judges = {
    # Plain text IP
    'api.ipify.org': IP_PATTERN,
    'icanhazip.com': IP_PATTERN,
    'checkip.amazonaws.com': IP_PATTERN,
    'ifconfig.me/ip': IP_PATTERN,
    'ipinfo.io/ip': IP_PATTERN,
    'ip.me': IP_PATTERN,
    'myip.dnsomatic.com': IP_PATTERN,
    'ident.me': IP_PATTERN,
    'ipecho.net/plain': IP_PATTERN,
    'wtfismyip.com/text': IP_PATTERN,
    'eth0.me': IP_PATTERN,
    'l2.io/ip': IP_PATTERN,
    'tnx.nl/ip': IP_PATTERN,
    'wgetip.com': IP_PATTERN,
    'curlmyip.net': IP_PATTERN,
    # JSON responses (IP pattern matches within JSON)
    'httpbin.org/ip': IP_PATTERN,
    'ip-api.com/json': IP_PATTERN,
    'ipapi.co/json': IP_PATTERN,
    'ipwhois.app/json': IP_PATTERN,
    # Header echo - for elite detection (check if proxy adds revealing headers)
    'httpbin.org/headers': IP_PATTERN,  # returns request headers as JSON
}
|
|
|
|
|
|
|
|
# Global instances

# Judge pool keeps the dedicated JudgeStats tracker
judge_stats = JudgeStats()
# Per-pool cooldown trackers: a target that repeatedly blocks or fails
# (block_threshold hits) is avoided for cooldown_seconds, so a down
# target does not unfairly fail proxies.
head_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3)
ssl_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3)
irc_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3)
# Tracks certificates seen during TLS MITM detection
mitm_cert_stats = MITMCertStats()
|
|
|
|
|
|
class ThreadScaler(object):
    """Dynamic thread scaling based on queue depth.

    With gevent greenlets, we can scale much more aggressively than with
    OS threads since greenlets are cooperative and lightweight.

    Scales up when the queue depth exceeds the per-thread watermark and
    scales down when the queue is nearly empty.  The ``stats`` argument
    of should_scale() is retained for interface compatibility; the
    success-rate gating implied by ``success_threshold`` is not applied
    (the previous implementation computed a success rate under
    ``stats.lock`` on every call but never used the value - that dead
    work and lock acquisition have been removed).
    """

    def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5,
                 scale_cooldown=10, scale_threshold=10.0):
        self.min_threads = min_threads
        self.max_threads = max_threads
        self.target_queue_per_thread = target_queue_per_thread
        self.last_scale_time = 0
        self.scale_cooldown = scale_cooldown
        # Retained for interface compatibility; not consulted by should_scale()
        self.success_threshold = scale_threshold

    def should_scale(self, current_threads, queue_size, stats):
        """Determine if scaling is needed.

        Args:
            current_threads: Current number of active threads
            queue_size: Current job queue depth
            stats: Stats object (unused; kept for interface compatibility)

        Returns:
            int: Target thread count (may be same as current)
        """
        now = time.time()
        # Cooldown between scaling decisions prevents thrashing
        if (now - self.last_scale_time) < self.scale_cooldown:
            return current_threads

        # Calculate ideal thread count based on queue depth.
        # Never spawn more threads than jobs available.
        ideal = max(self.min_threads, queue_size // self.target_queue_per_thread)
        ideal = min(ideal, self.max_threads, max(self.min_threads, queue_size))

        target = current_threads

        # Scale up: queue is deep - scale based on queue depth only.
        # With greenlets, scale more aggressively (+5 instead of +2).
        if queue_size > current_threads * self.target_queue_per_thread:
            target = min(current_threads + 5, ideal, self.max_threads)

        # Scale down: queue is shallow
        elif queue_size < current_threads * 2:
            # Scale down faster when threads far exceed ideal
            reduction = 2 if current_threads <= ideal * 2 else 5
            target = max(current_threads - reduction, ideal, self.min_threads)

        if target != current_threads:
            self.last_scale_time = now

        return target

    def status_line(self, current_threads, queue_size):
        """Return status string for logging."""
        return 'threads=%d queue=%d target_per_thread=%d' % (
            current_threads, queue_size, self.target_queue_per_thread)
|
|
|
|
class ProxyTestState(object):
    """Thread-safe state for a proxy being tested.

    Holds test results and evaluates final pass/fail status.  One
    instance is shared by all TargetTestJob objects testing the same
    proxy; record_result() accumulates per-target outcomes and
    evaluate() folds them into the final verdict.
    """
    # __slots__: many instances exist concurrently; skip per-instance __dict__
    __slots__ = (
        'ip', 'port', 'proxy', 'auth', 'proto', 'failcount', 'checktime',
        'success_count', 'total_duration', 'country', 'mitm', 'consecutive_success',
        'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed',
        'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers',
        'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success',
        'cert_error', 'source_proto', 'protos_working',
        'last_check', 'last_target'
    )

    def __init__(self, ip, port, proto, failcount, success_count, total_duration,
                 country, mitm, consecutive_success, asn=None, oldies=False,
                 completion_queue=None, proxy_full=None, source_proto=None):
        self.ip = ip
        self.port = int(port)
        self.proxy = '%s:%s' % (ip, port)
        # Parse auth credentials from full proxy string (user:pass@ip:port)
        self.auth = None
        if proxy_full and '@' in str(proxy_full):
            auth_part = str(proxy_full).rsplit('@', 1)[0]
            if ':' in auth_part:
                self.auth = auth_part  # user:pass
        self.proto = proto
        self.failcount = failcount
        self.checktime = None  # set by evaluate() (unix timestamp)
        self.success_count = success_count
        self.total_duration = total_duration
        self.country = country
        self.mitm = mitm
        self.consecutive_success = consecutive_success
        self.asn = asn
        self.isoldies = oldies
        self.completion_queue = completion_queue  # for signaling completion

        # thread-safe result accumulation
        self.lock = threading.Lock()
        self.results = []  # list of (success, proto, duration, srv, tor, ssl)
        self.completed = False
        self.evaluated = False  # for evaluate() idempotency
        self.last_latency_ms = None  # average latency from successful tests
        self.exit_ip = None  # IP seen by target server (for anonymity detection)
        self.reveals_headers = None  # True if proxy adds X-Forwarded-For/Via headers
        self.last_fail_category = None  # failure category from last test (for dead detection)
        self.original_failcount = failcount  # failcount before this test cycle
        # SSL/TLS tracking
        self.had_ssl_test = False
        self.ssl_success = False
        self.cert_error = False
        # Protocol fingerprinting
        self.source_proto = source_proto
        self.protos_working = None
        # Test provenance
        self.last_check = None
        self.last_target = None

    def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
        """Record a single target test result. Thread-safe.

        When all target tests complete, signals via completion_queue.
        NOTE: the ``ssl`` parameter shadows the module-level ssl import
        inside this method (intentional; the module is not used here).
        """
        _dbg('record: success=%s proto=%s srv=%s cat=%s' % (success, proto, srv, category), self.proxy)
        should_signal = False
        with self.lock:
            self.results.append({
                'success': success,
                'proto': proto,
                'duration': duration,
                'srv': srv,
                'tor': tor,
                'ssl': ssl,
                'category': category,
                'exit_ip': exit_ip,
                'reveals_headers': reveals_headers
            })
            # Track SSL tests
            if ssl:
                self.had_ssl_test = True
                if success:
                    self.ssl_success = True
            # Track cert errors
            if category in ('cert_error', 'ssl_error', 'ssl_mitm'):
                self.cert_error = True
            # Check completion (single-target mode)
            if not self.completed and len(self.results) >= 1:
                self.completed = True
                should_signal = True
        # Signal outside lock to avoid deadlock
        if should_signal and self.completion_queue is not None:
            self.completion_queue.put(self)

    def is_complete(self):
        """Check if all target tests have finished."""
        # Fast path: check completed flag without lock (atomic read)
        if self.completed:
            return True
        # Slow path: check with lock (only during transition)
        with self.lock:
            return len(self.results) >= 1

    @staticmethod
    def rwip(ip):
        """Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10)."""
        return '.'.join(str(int(b)) for b in ip.split('.'))

    def evaluate(self):
        """Evaluate results after all tests complete.

        Returns:
            (success, category) tuple where success is bool and category is
            the dominant failure type (or None on success)
        """
        with self.lock:
            if self.evaluated:
                # Already evaluated - return cached result
                return (self.failcount == 0, self.last_fail_category)
            self.evaluated = True
            self.checktime = int(time.time())

            # Filter out target_block results (inconclusive, neither pass nor fail)
            block_cats = ('judge_block', 'target_block')
            real_results = [r for r in self.results if r.get('category') not in block_cats]
            successes = [r for r in real_results if r['success']]
            failures = [r for r in real_results if not r['success']]
            num_success = len(successes)
            target_blocks = len(self.results) - len(real_results)
            _dbg('evaluate: %d success, %d fail, %d target_block, results=%d' % (
                num_success, len(failures), target_blocks, len(self.results)), self.proxy)

            # All results were target blocks: inconclusive, preserve current state
            if not real_results and self.results:
                _dbg('all results inconclusive (target_block), no state change', self.proxy)
                self.failcount = self.original_failcount
                return (self.original_failcount == 0, None)

            # Determine dominant failure category (most frequent wins)
            fail_category = None
            if failures:
                cats = {}
                for f in failures:
                    cat = f.get('category') or 'other'
                    cats[cat] = cats.get(cat, 0) + 1
                if cats:
                    fail_category = max(cats.keys(), key=lambda k: cats[k])

            # require success (single target mode)
            if num_success >= 1:
                # use last successful result for metrics
                last_good = successes[-1]

                # Extract exit IP and header info from successful judge responses
                for s in successes:
                    if s.get('exit_ip'):
                        self.exit_ip = s['exit_ip']
                    if s.get('reveals_headers') is not None:
                        self.reveals_headers = s['reveals_headers']

                # Lazy geo lookup: only on first success with unknown country
                if geolite and self.country is None:
                    self.ip = self.rwip(self.ip)
                    rec = geodb.get_all(self.ip)
                    if rec is not None and rec.country_short:
                        self.country = rec.country_short

                # Lazy ASN lookup (best-effort; failures only logged in debug)
                if asn_enabled and self.asn is None:
                    try:
                        asn_result = asndb.lookup(self.ip)
                        if asn_result and asn_result[0]:
                            self.asn = asn_result[0]
                    except Exception as e:
                        if config.watchd.debug:
                            _log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug')

                # Collect all distinct working protocols
                working_protos = set()
                for s in successes:
                    if s.get('proto'):
                        working_protos.add(s['proto'])
                if working_protos:
                    self.protos_working = ','.join(sorted(working_protos))
                    # Pick most specific protocol: socks5 > socks4 > http
                    for best in ('socks5', 'socks4', 'http'):
                        if best in working_protos:
                            self.proto = best
                            break
                    else:
                        self.proto = last_good['proto']
                else:
                    self.proto = last_good['proto']
                self.failcount = 0
                # Only reset mitm after 3 consecutive clean successes (not on first success)
                # and only if this test didn't detect MITM
                if self.consecutive_success > 0 and (self.consecutive_success % 3) == 0 and not self.cert_error:
                    self.mitm = 0
                # "or 0" guards against NULL values loaded from the database
                self.consecutive_success = (self.consecutive_success or 0) + 1
                self.success_count = (self.success_count or 0) + 1
                self.total_duration = (self.total_duration or 0) + int(last_good['duration'] * 1000)

                # Calculate average latency from successful tests (in ms)
                durations = [s['duration'] for s in successes if s['duration']]
                if durations:
                    self.last_latency_ms = sum(durations) * 1000.0 / len(durations)

                torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
                # Determine anonymity for status message
                # transparent: exit_ip == proxy_ip
                # anonymous: exit_ip != proxy_ip, adds revealing headers
                # elite: exit_ip != proxy_ip, no revealing headers
                anon_status = ''
                if self.exit_ip:
                    if self.exit_ip == self.ip:
                        anon_status = ' anon: transparent;'
                    elif self.reveals_headers is False:
                        anon_status = ' anon: elite;'
                    elif self.reveals_headers is True:
                        anon_status = ' anon: anonymous;'
                    else:
                        anon_status = ' anon: anonymous;'  # default if no header check
                _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s' % (
                    last_good['proto'], self.ip, self.port, self.country,
                    last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl'])), 'info')
                _dbg('PASS: failcount=0', self.proxy)
                return (True, None)

            else:
                # Real proxy failure
                self.failcount += 1
                self.consecutive_success = 0
                self.last_fail_category = fail_category
                _dbg('FAIL: failcount=%d cat=%s' % (self.failcount, fail_category), self.proxy)
                return (False, fail_category)
|
|
|
|
|
|
class TargetTestJob(object):
    """Job to test a single proxy against a single target.

    Multiple TargetTestJob instances share the same ProxyTestState,
    allowing tests to be interleaved with other proxies in the queue.
    """
    # Jobs are created in bulk; __slots__ avoids a per-instance __dict__
    __slots__ = ('proxy_state', 'target_srv', 'checktype', 'worker_id')

    def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
        # Shared ProxyTestState that accumulates results across jobs
        self.proxy_state = proxy_state
        # Target host (optionally host/path) this job tests against
        self.target_srv = target_srv
        # Check type: 'irc', 'http', 'judges', 'ssl' (and 'none' for
        # SSL-only mode - see _connect_and_test)
        self.checktype = checktype
        # Used to pin a tor host from the connection pool (may be None)
        self.worker_id = worker_id
|
|
|
|
    def run(self):
        """Test the proxy against this job's target server.

        Connects through the proxy (via _connect_and_test), reads the
        response, matches it against the check-type-specific regex and
        records the outcome on the shared ProxyTestState.  Target-side
        blocks (captcha/403/429/...) are recorded as the neutral
        'target_block' category so they do not count as proxy failures.
        """
        # DIAGNOSTIC: Verify run() is being called
        global _sample_debug_counter
        with _sample_debug_lock:
            _sample_debug_counter += 1
            if _sample_debug_counter <= 3 or _sample_debug_counter % 50 == 0:
                _log('JOB RUN #%d: %s -> %s (%s)' % (_sample_debug_counter,
                    self.proxy_state.proxy, self.target_srv, self.checktype), 'info')
        network_stats.set_category('proxy')
        # Track test provenance (overwritten on each attempt, last success wins)
        self.proxy_state.last_check = self.checktype
        self.proxy_state.last_target = self.target_srv
        _dbg('test start: %s via %s' % (self.target_srv, self.checktype), self.proxy_state.proxy)
        # NOTE: 'duration' is the connect START timestamp, not an elapsed
        # time; elapsed is computed below as time.time() - duration.
        sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
        _dbg('connect result: sock=%s proto=%s err=%s' % (bool(sock), proto, err_cat), self.proxy_state.proxy)

        if not sock:
            # SSL-only check passed (handshake success, no request needed)
            if err_cat == 'ssl_ok':
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
            elif err_cat == 'ssl_mitm':
                # MITM detected - proxy works but intercepts TLS
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
                self.proxy_state.mitm = 1
            else:
                self.proxy_state.record_result(False, category=err_cat, ssl=is_ssl)
            return

        try:
            recv = sock.recv(-1)
            _sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy)

            # Validate HTTP response for non-IRC checks
            if self.checktype != 'irc' and not recv.startswith('HTTP/'):
                _dbg('not an HTTP response, failing (first 40: %r)' % recv[:40], self.proxy_state.proxy)
                self.proxy_state.record_result(False, category='bad_response')
                return

            # Select regex based on check type (or fallback target)
            if 'check.torproject.org' in srv:
                # Tor API fallback (judge using torproject.org)
                regex = r'"IsTor"\s*:\s*true'
            elif self.checktype == 'irc':
                regex = '^(:|NOTICE|ERROR)'
            elif self.checktype == 'judges':
                regex = judges[srv]
            elif self.checktype == 'ssl':
                # Should not reach here - ssl returns before recv
                self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
                return
            else:  # http
                regex = regexes[srv]

            _dbg('recv %d bytes, regex=%s' % (len(recv), regex[:30]), self.proxy_state.proxy)
            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - duration
                _dbg('regex MATCH, elapsed=%.2fs' % elapsed, self.proxy_state.proxy)
                # Extract exit IP from judge/tor response
                exit_ip = None
                reveals_headers = None
                if self.checktype == 'judges' or 'check.torproject.org' in srv:
                    ip_match = re.search(IP_PATTERN, recv)
                    if ip_match and is_public_ip(ip_match.group(0)):
                        exit_ip = ip_match.group(0)
                if self.checktype == 'judges' and 'check.torproject.org' not in srv:
                    # Check for header echo judge (elite detection)
                    if 'headers' in srv:
                        # If X-Forwarded-For/Via/etc present, proxy reveals chain
                        reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
                    # Record successful judge
                    judge_stats.record_success(srv)
                elif self.checktype == 'head':
                    head_target_stats.record_success(srv)
                elif self.checktype == 'irc':
                    irc_target_stats.record_success(srv)
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip,
                    reveals_headers=reveals_headers
                )
            else:
                _dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], self.proxy_state.proxy)
                # Check if HTTP target is blocking us (not a proxy failure)
                if self.checktype in ('judges', 'head') and HTTP_BLOCK_RE.search(recv):
                    if self.checktype == 'judges':
                        judge_stats.record_block(srv)
                    else:
                        head_target_stats.record_block(srv)
                    _dbg('target BLOCK detected, skipping (neutral)', self.proxy_state.proxy)
                    self.proxy_state.record_result(
                        False, category='target_block', proto=proto,
                        srv=srv, tor=tor, ssl=is_ssl
                    )
                    if config.watchd.debug:
                        _log('%s %s challenged proxy %s (neutral, skipped)' % (
                            self.checktype, srv, self.proxy_state.proxy), 'debug')
                else:
                    _dbg('FAIL: no match, no block', self.proxy_state.proxy)
                    if self.checktype == 'judges':
                        judge_stats.record_failure(srv)
                    elif self.checktype == 'head':
                        head_target_stats.record_failure(srv)
                    elif self.checktype == 'irc':
                        irc_target_stats.record_failure(srv)
                    self.proxy_state.record_result(False, category='other')

        except KeyboardInterrupt as e:
            sock.disconnect()
            raise e
        except rocksock.RocksockException as e:
            self.proxy_state.record_result(False, category=categorize_error(e))
        finally:
            # Always release the socket (disconnect may run twice on the
            # KeyboardInterrupt path - presumably idempotent; verify)
            sock.disconnect()
|
|
|
|
def _build_proto_order(self):
|
|
"""Build smart protocol test order based on available intelligence.
|
|
|
|
Priority:
|
|
1. Previously successful proto (if set)
|
|
2. Source-detected proto (if different, confidence >= 60)
|
|
3. Remaining protos in default order: socks5, socks4, http
|
|
|
|
For failing proxies (failcount > 0 and proto known), only retest
|
|
with the known proto to save resources.
|
|
"""
|
|
ps = self.proxy_state
|
|
default_order = ['socks5', 'socks4', 'http']
|
|
|
|
# Known proto from previous test: only retest that
|
|
if ps.proto is not None:
|
|
# For failing proxies, skip multi-proto discovery
|
|
if ps.failcount > 0:
|
|
return [ps.proto]
|
|
# For working proxies, lead with known proto but try others
|
|
protos = [ps.proto]
|
|
# Add source hint if different
|
|
if ps.source_proto and ps.source_proto != ps.proto:
|
|
protos.append(ps.source_proto)
|
|
# Fill remaining
|
|
for p in default_order:
|
|
if p not in protos:
|
|
protos.append(p)
|
|
return protos
|
|
|
|
# Unknown proto: use source hint if available
|
|
protos = []
|
|
if ps.source_proto:
|
|
protos.append(ps.source_proto)
|
|
for p in default_order:
|
|
if p not in protos:
|
|
protos.append(p)
|
|
return protos
|
|
|
|
def _fingerprint_protocol(self, pool):
|
|
"""Identify proxy protocol via lightweight handshake probes.
|
|
|
|
Sends protocol-specific greeting bytes directly to the proxy
|
|
and identifies the protocol from the response pattern.
|
|
|
|
Returns: 'socks5', 'socks4', 'http', or None
|
|
"""
|
|
ps = self.proxy_state
|
|
fp_timeout = min(config.watchd.timeout, 5)
|
|
torhost = pool.get_tor_host(self.worker_id) if pool else random.choice(config.torhosts)
|
|
|
|
for probe_fn, name in (
|
|
(self._probe_socks5, 'socks5'),
|
|
(self._probe_socks4, 'socks4'),
|
|
(self._probe_http, 'http'),
|
|
):
|
|
result = probe_fn(ps, torhost, fp_timeout)
|
|
if result:
|
|
_sample_dbg('fingerprint: %s detected' % result, ps.proxy)
|
|
return result
|
|
return None
|
|
|
|
def _probe_socks5(self, ps, torhost, timeout):
|
|
"""Probe for SOCKS5 protocol. Returns 'socks5' or None."""
|
|
try:
|
|
sock = rocksock.Rocksock(
|
|
host=ps.ip, port=int(ps.port),
|
|
proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))],
|
|
timeout=timeout
|
|
)
|
|
sock.connect()
|
|
sock.send('\x05\x01\x00')
|
|
res = sock.recv(2)
|
|
sock.disconnect()
|
|
if len(res) >= 1 and res[0] == '\x05':
|
|
return 'socks5'
|
|
except rocksock.RocksockException:
|
|
pass
|
|
except KeyboardInterrupt:
|
|
raise
|
|
return None
|
|
|
|
def _probe_socks4(self, ps, torhost, timeout):
|
|
"""Probe for SOCKS4 protocol. Returns 'socks4' or None."""
|
|
try:
|
|
sock = rocksock.Rocksock(
|
|
host=ps.ip, port=int(ps.port),
|
|
proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))],
|
|
timeout=timeout
|
|
)
|
|
sock.connect()
|
|
# CONNECT 1.1.1.1:80
|
|
sock.send('\x04\x01\x00\x50\x01\x01\x01\x01\x00')
|
|
res = sock.recv(2)
|
|
sock.disconnect()
|
|
if len(res) >= 2 and ord(res[0]) == 0 and ord(res[1]) in (0x5a, 0x5b, 0x5c, 0x5d):
|
|
return 'socks4'
|
|
except rocksock.RocksockException:
|
|
pass
|
|
except KeyboardInterrupt:
|
|
raise
|
|
return None
|
|
|
|
def _probe_http(self, ps, torhost, timeout):
|
|
"""Probe for HTTP CONNECT protocol. Returns 'http' or None."""
|
|
try:
|
|
sock = rocksock.Rocksock(
|
|
host=ps.ip, port=int(ps.port),
|
|
proxies=[rocksock.RocksockProxyFromURL(tor_proxy_url(torhost))],
|
|
timeout=timeout
|
|
)
|
|
sock.connect()
|
|
sock.send('CONNECT 1.1.1.1:80 HTTP/1.1\r\nHost: 1.1.1.1:80\r\n\r\n')
|
|
res = sock.recv(13)
|
|
sock.disconnect()
|
|
if res.startswith('HTTP/'):
|
|
return 'http'
|
|
except rocksock.RocksockException:
|
|
pass
|
|
except KeyboardInterrupt:
|
|
raise
|
|
return None
|
|
|
|
    def _connect_and_test(self):
        """Connect to target through the proxy and send test packet.

        If ssl_first is enabled:
        1. Try SSL handshake first
        2. If SSL succeeds -> return success
        3. If SSL fails -> try secondary check (configured checktype)

        Returns an 8-tuple
        (sock, proto, start_ts, torhost, srv, failinc, is_ssl, err_cat)
        - the same shape produced by _try_ssl_handshake and
        _try_secondary_check; sock is None when no request phase is
        needed (SSL-only outcomes and hard failures).
        """
        ps = self.proxy_state
        _dbg('_connect_and_test: target=%s checktype=%s ssl_first=%s' % (
            self.target_srv, self.checktype, config.watchd.ssl_first), ps.proxy)
        # Always log first test to verify code path
        global _sample_debug_counter
        if _sample_debug_counter == 0:
            _log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % (
                ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info')

        protos = self._build_proto_order()
        pool = connection_pool.get_pool()

        # Fingerprint unknown proxies to avoid brute-force protocol guessing
        if ps.proto is None and config.watchd.fingerprint:
            detected = self._fingerprint_protocol(pool)
            if detected:
                # Move the detected protocol to the front of the order
                protos = [detected] + [p for p in protos if p != detected]

        # Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode)
        ssl_reason = None
        if config.watchd.ssl_first or self.checktype == 'none':
            result, ssl_reason = self._try_ssl_handshake(protos, pool)
            if result is not None:
                return result  # SSL succeeded or MITM detected
            # SSL failed for all protocols
            if config.watchd.ssl_only or self.checktype == 'none':
                _dbg('SSL failed, no secondary check', ps.proxy)
                return (None, None, 0, None, None, 1, 0, 'ssl_only')
            _dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)

        # Phase 2: Secondary check (configured checktype); ssl_reason is
        # passed through so the secondary check can pick SSL vs plain.
        return self._try_secondary_check(protos, pool, ssl_reason)
|
|
|
|
def _try_ssl_handshake(self, protos, pool):
    """Attempt SSL handshake to verify proxy works with TLS.

    Connects to an SSL target on port 443 through tor -> proxy with
    certificate verification enabled, so a certificate error is treated
    as evidence of TLS interception (MITM) rather than a plain failure.

    Returns:
        (result, ssl_reason) where result is a tuple on success/MITM
        or None on failure, and ssl_reason is the last SSL error reason
        string (for secondary check SSL/plain decision).
        The result tuple is (None, proto, start_time, torhost,
        ssl_target, 0, 1, 'ssl_ok' | 'ssl_mitm').
    """
    ps = self.proxy_state
    # Prefer SSL targets not in cooldown; fall back to the full list
    available_ssl = ssl_target_stats.get_available(ssl_targets) or ssl_targets
    # One target is picked and reused across all protocol attempts
    ssl_target = random.choice(available_ssl)
    last_error_category = None  # NOTE(review): assigned below but never returned
    last_ssl_reason = None

    for proto in protos:
        if pool:
            torhost = pool.get_tor_host(self.worker_id)
        else:
            torhost = random.choice(config.torhosts)
        network_stats.set_tor_node(torhost)

        # socks4 cannot resolve hostnames remotely: resolve locally first
        if proto == 'socks4':
            srv = socks4_resolve(ssl_target, 443)
        else:
            srv = ssl_target
        if not srv:
            continue

        duration = time.time()  # start timestamp (returned as-is in tuple)
        # Build proxy URL, including auth credentials if present
        if ps.auth:
            proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
        else:
            proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
        # Chain: local tor node -> proxy under test -> SSL target
        proxies = [
            rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
            rocksock.RocksockProxyFromURL(proxy_url),
        ]

        # Adaptive timeout: proxies with a failure history get extra time,
        # capped at timeout_fail_max
        adaptive_timeout = config.watchd.timeout + min(
            ps.failcount * config.watchd.timeout_fail_inc,
            config.watchd.timeout_fail_max)

        try:
            sock = rocksock.Rocksock(host=srv, port=443, ssl=1,
                                     proxies=proxies, timeout=adaptive_timeout,
                                     verifycert=True)
            _dbg('SSL handshake: proto=%s tor=%s target=%s' % (proto, torhost, ssl_target), ps.proxy)
            sock.connect()

            # SSL handshake succeeded
            elapsed = time.time() - duration
            if pool:
                pool.record_success(torhost, elapsed)
            ssl_target_stats.record_success(ssl_target)
            sock.disconnect()
            _dbg('SSL handshake OK', ps.proxy)
            return (None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_ok'), None

        except rocksock.RocksockException as e:
            last_error_category = categorize_error(e)
            et = e.get_errortype()
            err = e.get_error()

            # Track SSL reason for secondary check decision
            # NOTE(review): reuses get_failedproxy() as the reason carrier
            # for SSL errors -- confirm against the rocksock API.
            if et == rocksock.RS_ET_SSL:
                reason = e.get_failedproxy()
                if isinstance(reason, str):
                    last_ssl_reason = reason

            # Best-effort close; sock may be unbound if the ctor raised
            try:
                sock.disconnect()
            except:
                pass

            if et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                # MITM detected - proxy works but intercepts TLS
                ps.mitm = 1
                elapsed = time.time() - duration
                if pool:
                    pool.record_success(torhost, elapsed)
                _dbg('SSL MITM detected', ps.proxy)
                return (None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_mitm'), None

            if config.watchd.debug:
                _log('SSL handshake failed: %s://%s:%d: %s' % (
                    proto, ps.ip, ps.port, e.get_errormessage()), 'debug')

            # Check for Tor connection issues
            if et == rocksock.RS_ET_OWN:
                fp = e.get_failedproxy()
                if fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                    # first hop (tor) refused -> penalize the tor host
                    if pool:
                        pool.record_failure(torhost)
                elif fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                                  err == rocksock.RS_E_HIT_TIMEOUT):
                    # Target-side failure
                    ssl_target_stats.record_failure(ssl_target)
            elif et == rocksock.RS_ET_GAI:
                # DNS failure -- target unresolvable
                ssl_target_stats.record_block(ssl_target)

        except KeyboardInterrupt:
            raise

    # All protocols failed SSL
    return None, last_ssl_reason
|
|
|
|
def _try_secondary_check(self, protos, pool, ssl_reason=None):
    """Try the configured secondary checktype (head, judges, irc).

    ssl_reason: last SSL error reason from _try_ssl_handshake, used to
    decide whether to use SSL or plain HTTP for the secondary check.
    Protocol errors (proxy doesn't speak TLS) -> plain HTTP.
    Other errors (cert, timeout, etc.) -> SSL without cert verification.

    Returns an 8-tuple:
    (sock_or_None, proto, start_time, torhost, target, failflag,
     ssl_flag, error_category). On success, sock is the connected
    Rocksock with the request already sent.
    """
    ps = self.proxy_state
    _sample_dbg('TEST START: proxy=%s target=%s check=%s' % (
        ps.proxy, self.target_srv, self.checktype), ps.proxy)
    # IRC server names may carry stray whitespace from config
    srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv

    # For judges, extract host from 'host/path' format
    if self.checktype == 'judges' and '/' in srvname:
        connect_host = srvname.split('/')[0]
    else:
        connect_host = srvname

    # Decide SSL based on why the primary handshake failed:
    # - protocol error (proxy can't TLS) -> plain HTTP
    # - other error (cert, timeout) -> SSL without cert verification
    # - no ssl_reason (ssl_first off) -> plain HTTP (no prior info)
    protocol_error = is_ssl_protocol_error(ssl_reason) if ssl_reason else True
    verifycert = False
    if protocol_error:
        use_ssl = 0
        if self.checktype == 'irc':
            server_port = 6667
        else:
            server_port = 80
        _dbg('secondary: plain (ssl protocol error)', ps.proxy)
    else:
        use_ssl = 1
        if self.checktype == 'irc':
            server_port = 6697
        else:
            server_port = 443
        _dbg('secondary: ssl/no-verify (non-protocol ssl error)', ps.proxy)

    last_error_category = None

    for proto in protos:
        sock = None  # so the error path can safely attempt a disconnect
        if pool:
            torhost = pool.get_tor_host(self.worker_id)
        else:
            torhost = random.choice(config.torhosts)
        network_stats.set_tor_node(torhost)
        # socks4 has no remote DNS: resolve the target locally first
        if proto == 'socks4':
            srv = socks4_resolve(connect_host, server_port)
        else:
            srv = connect_host
        if not srv:
            continue

        duration = time.time()  # start timestamp; elapsed derived by callers
        # Build proxy URL, including auth credentials if present
        if ps.auth:
            proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
        else:
            proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
        # Chain: local tor -> proxy under test -> target
        proxies = [
            rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
            rocksock.RocksockProxyFromURL(proxy_url),
        ]

        # Adaptive timeout: give proxies with failures more time
        adaptive_timeout = config.watchd.timeout + min(
            ps.failcount * config.watchd.timeout_fail_inc,
            config.watchd.timeout_fail_max)

        try:
            sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                     proxies=proxies, timeout=adaptive_timeout,
                                     verifycert=verifycert)
            _dbg('connecting: proto=%s tor=%s ssl=%d' % (proto, torhost, use_ssl), ps.proxy)
            _sample_dbg('CONNECT: tor=%s -> proxy=%s:%s (%s) -> %s:%d ssl=%d timeout=%.1f' % (
                torhost, ps.ip, ps.port, proto, srv, server_port, use_ssl, adaptive_timeout), ps.proxy)
            sock.connect()
            _dbg('connected OK', ps.proxy)
            _sample_dbg('CONNECTED OK: %s via %s' % (ps.proxy, proto), ps.proxy)

            if self.checktype == 'irc':
                sock.send('NICK\n')
            elif self.checktype == 'judges':
                # GET request to receive body (IP)
                sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
                    srvname.split('/', 1)[1] if '/' in srvname else '',
                    srvname.split('/')[0]
                ))
            else:  # head - HEAD is sufficient for header checks
                sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
            # Record success in pool
            if pool:
                pool.record_success(torhost, time.time() - duration)
            return sock, proto, duration, torhost, srvname, 0, use_ssl, None

        except rocksock.RocksockException as e:
            last_error_category = categorize_error(e)
            if config.watchd.debug:
                _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port,
                    e.get_errormessage(), last_error_category), 'debug')

            et = e.get_errortype()
            err = e.get_error()
            fp = e.get_failedproxy()
            _sample_dbg('ERROR: %s via %s -> %s (et=%d err=%d fp=%d cat=%s)' % (
                ps.proxy, proto, e.get_errormessage(), et, err, fp, last_error_category), ps.proxy)

            # BUGFIX: guard the disconnect (as _try_ssl_handshake does).
            # If Rocksock() itself raised, sock is still None and a bare
            # sock.disconnect() would raise and mask the real error.
            if sock is not None:
                try:
                    sock.disconnect()
                except Exception:
                    pass

            if et == rocksock.RS_ET_OWN:
                if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                                err == rocksock.RS_E_HIT_TIMEOUT):
                    # Target-side failure -- proxy reached target but it's down
                    if self.checktype == 'head':
                        head_target_stats.record_failure(srvname)
                    elif self.checktype == 'irc':
                        irc_target_stats.record_failure(srvname)
                    break
                elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                    # Tor connection failed - record in pool
                    if pool:
                        pool.record_failure(torhost)
                    # BUGFIX: use floor division. With
                    # `from __future__ import division`, `/` yields a float
                    # here and random.randint() rejects non-integer bounds.
                    if random.randint(0, (config.watchd.threads - 1) // 2) == 0:
                        _log("could not connect to tor, sleep 5s", "ERROR")
                    time.sleep(5)
            elif et == rocksock.RS_ET_GAI:
                # DNS failure -- target hostname unresolvable (hard failure)
                if self.checktype == 'head':
                    head_target_stats.record_block(connect_host)
                elif self.checktype == 'irc':
                    irc_target_stats.record_block(srvname)
                _log("could not resolve connection target %s" % connect_host, "ERROR")
                break
            elif et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                # MITM detected - proxy works but intercepts TLS
                ps.mitm = 1
                elapsed = time.time() - duration
                if pool:
                    pool.record_success(torhost, elapsed)
                # Extract MITM certificate info (async to not block)
                try:
                    cert_info = get_mitm_certificate(
                        ps.ip, ps.port, proto, torhost,
                        connect_host, server_port, config.watchd.timeout,
                        auth=ps.auth
                    )
                    if cert_info:
                        mitm_cert_stats.add_cert(ps.ip, cert_info)
                        if config.watchd.debug:
                            _log('MITM cert: %s (CN=%s, O=%s)' % (
                                cert_info.get('fingerprint', ''),
                                cert_info.get('subject_cn', ''),
                                cert_info.get('subject_o', '')), 'debug')
                except Exception as cert_exc:
                    if config.watchd.debug:
                        _log('failed to extract MITM cert: %s' % str(cert_exc), 'debug')
                return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_mitm'
        except KeyboardInterrupt:
            raise

    _sample_dbg('ALL PROTOS FAILED: %s last_cat=%s' % (ps.proxy, last_error_category), ps.proxy)
    return None, None, None, None, None, 1, use_ssl, last_error_category
|
|
|
|
|
|
class WorkerThread():
    """Worker that pulls test jobs off a shared queue and runs them.

    The loop exits once stop() sets the done event; at exit it logs how
    many jobs it ran and their average duration.
    """

    def __init__(self, id, job_queue):
        self.id = id
        self.done = threading.Event()
        self.thread = None
        self.job_queue = job_queue  # shared input queue

    def stop(self):
        """Request loop termination (takes effect within ~0.5s)."""
        self.done.set()

    def term(self):
        """Join the worker thread if one was started."""
        if self.thread:
            self.thread.join()

    def start_thread(self):
        """Launch workloop() in its own thread."""
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()

    def workloop(self):
        """Main loop: pop jobs, run them, account for time spent."""
        jobs_done = 0
        total_elapsed = 0
        while not self.done.is_set():
            try:
                job = self.job_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            started = time.time()
            # Assign worker ID for connection pool affinity
            job.worker_id = self.id
            try:
                job.run()
            except Exception as e:
                # Ensure state completes on unexpected exceptions (prevents memory leak)
                _log('job exception: %s' % e, 'error')
                try:
                    job.proxy_state.record_result(False, category='exception')
                except Exception:
                    pass  # State may already be completed
            jobs_done += 1
            total_elapsed += time.time() - started
            self.job_queue.task_done()
        if self.thread and jobs_done > 0:
            avg_t = try_div(total_elapsed, jobs_done)
            _log("terminated, %d jobs, avg.time %.2f" % (jobs_done, avg_t), self.id)
|
|
|
|
|
|
class VerificationThread(threading.Thread):
    """Manager thread that verifies disputed/suspicious proxy results.

    Pulls from verification_queue, tests proxies directly via Tor,
    records authoritative results, and updates worker trust scores.
    Runs as a daemon thread; stop() ends the loop at the next cycle.
    """

    def __init__(self, database, interval=30, batch_size=10):
        """database: path passed to mysqlite; interval: seconds between
        cycles; batch_size: max pending items processed per cycle."""
        threading.Thread.__init__(self)
        self.database = database
        self.interval = interval
        self.batch_size = batch_size
        self.daemon = True
        self._stop = threading.Event()
        # counters reported on shutdown
        self.verified_count = 0
        self.disagreements_resolved = 0

    def stop(self):
        """Request thread shutdown (loop exits after its current wait)."""
        self._stop.set()

    def run(self):
        """Run verification cycles until stopped; errors are logged, not fatal."""
        _log('verification thread started (interval=%ds, batch=%d)' % (
            self.interval, self.batch_size), 'verify')

        while not self._stop.is_set():
            try:
                self._verification_cycle()
            except Exception as e:
                _log('verification cycle error: %s' % e, 'error')

            # Wait for next cycle (wakes early when stop() is called)
            self._stop.wait(self.interval)

        _log('verification thread stopped (verified=%d, resolved=%d)' % (
            self.verified_count, self.disagreements_resolved), 'verify')

    def _verification_cycle(self):
        """Process pending verifications.

        Opens a fresh DB handle per cycle, fetches up to batch_size
        pending items, retests each proxy, records the authoritative
        result, and commits once at the end.
        """
        db = mysqlite.mysqlite(self.database, str)
        try:
            pending = dbs.get_pending_verifications(db, self.batch_size)
            if not pending:
                return

            for item in pending:
                if self._stop.is_set():
                    break

                proxy = item['proxy']
                trigger = item['trigger']

                # Test proxy directly
                result = self._test_proxy(proxy)

                if result is None:
                    # Test failed/inconclusive, skip for now
                    continue

                # Record verification result
                self._record_verification(db, item, result)
                self.verified_count += 1

                if trigger == 'disagreement':
                    self.disagreements_resolved += 1

            db.commit()

        finally:
            db.close()

    def _test_proxy(self, proxy):
        """Test proxy directly using local Tor connection.

        proxy is an 'ip:port' string; each known protocol is attempted
        until one connects.

        Returns:
            True if proxy works, False if fails, None if inconclusive
        """
        parts = proxy.split(':')
        if len(parts) != 2:
            return None

        ip, port_str = parts
        try:
            port = int(port_str)
        except ValueError:
            return None

        # Try SSL handshake as primary test; without a tor pool we cannot
        # build the chain, so the result is inconclusive.
        pool = connection_pool.get_pool()
        if not pool:
            return None

        proto_map = {
            'http': rocksock.RS_PT_HTTP,
            'socks4': rocksock.RS_PT_SOCKS4,
            'socks5': rocksock.RS_PT_SOCKS5
        }
        protos = ['http', 'socks5', 'socks4']

        for proto in protos:
            try:
                # NOTE(review): proxies are passed here as (url, type)
                # tuples, while the watchd test paths use
                # RocksockProxyFromURL objects -- confirm rocksock accepts
                # both forms.
                sock = rocksock.Rocksock(
                    host='www.google.com',
                    port=443,
                    ssl=True,
                    timeout=config.common.timeout_connect,
                    proxies=[
                        (tor_proxy_url(pool.get_host(), proto='socks5'), rocksock.RS_PT_SOCKS5),
                        (proxy, proto_map.get(proto, rocksock.RS_PT_HTTP))
                    ]
                )
                sock.connect()
                sock.disconnect()
                return True
            except Exception:
                continue

        return False

    def _record_verification(self, db, item, result):
        """Record verification result and update worker trust.

        item carries the queued verification (proxy, trigger, the
        worker id(s) and their reported result(s)); result is the
        authoritative boolean from _test_proxy().
        """
        proxy = item['proxy']
        trigger = item['trigger']
        worker_a = item.get('worker_a')
        worker_b = item.get('worker_b')
        result_a = item.get('result_a')
        result_b = item.get('result_b')

        # Update worker trust based on who was correct
        if trigger == 'disagreement' and worker_a and worker_b:
            # Check which worker(s) agreed with verification
            result_int = 1 if result else 0

            if result_a == result_int:
                dbs.update_worker_trust(db, worker_a, True)
            else:
                dbs.update_worker_trust(db, worker_a, False)

            if result_b == result_int:
                dbs.update_worker_trust(db, worker_b, True)
            else:
                dbs.update_worker_trust(db, worker_b, False)

        elif worker_a:
            # Single worker trigger (resurrection/sudden_death)
            result_int = 1 if result else 0
            was_correct = (result_a == result_int)
            dbs.update_worker_trust(db, worker_a, was_correct)

        # Update proxy status with authoritative result
        if result:
            db.execute('''
                UPDATE proxylist SET failed = 0, tested = ?
                WHERE proxy = ?
            ''', (int(time.time()), proxy))
        else:
            db.execute('''
                UPDATE proxylist SET failed = failed + 1, tested = ?
                WHERE proxy = ?
            ''', (int(time.time()), proxy))

        # Remove from verification queue
        dbs.remove_from_verification_queue(db, proxy)

        _log('verified %s: %s (trigger=%s)' % (
            proxy, 'PASS' if result else 'FAIL', trigger), 'verify')
|
|
|
|
|
|
class Proxywatchd():
|
|
|
|
def stop(self):
    """Request shutdown of the watchdog; _cleanup() does the actual work."""
    thread_count = len(self.threads)
    _log('halting... (%d thread(s))' % thread_count, 'watchd')
    self.stopping.set()
|
|
|
|
def _cleanup(self):
    """Stop workers, flush pending results, and shut down aux services."""
    # signal every worker first, then join them
    for worker in self.threads:
        worker.stop()
    for worker in self.threads:
        worker.term()
    # flush anything the workers completed before stopping
    self.collect_work()
    self.submit_collected()
    if self.httpd_server:
        self.httpd_server.stop()
    if self.verification_thread:
        self.verification_thread.stop()
    self.stopped.set()
|
|
|
|
def finish(self):
    """Wait for shutdown to complete, log totals, persist final state.

    In foreground mode this runs _cleanup() itself; in background mode
    the _run thread is expected to do it, and we only wait for the
    `stopped` event. State saves are best-effort: failures are logged
    as warnings, never raised.
    """
    if not self.in_background:
        self._cleanup()
    # Block on the event instead of polling it every 100ms
    self.stopped.wait()
    success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
    _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd")

    # Final save of session state before exit
    try:
        with self._db_context() as db:
            dbs.save_session_state(db, self.stats)
            dbs.save_stats_snapshot(db, self.stats)
            _log('session state saved', 'watchd')
    except Exception as e:
        _log('failed to save final session state: %s' % str(e), 'warn')

    # Save MITM certificate stats
    try:
        mitm_cert_stats.save_state(self.mitm_state_file)
        _log('MITM cert state saved', 'watchd')
    except Exception as e:
        _log('failed to save MITM state: %s' % str(e), 'warn')
|
|
|
|
@contextlib.contextmanager
def _db_context(self):
    """Yield a database handle that is always closed on exit."""
    with contextlib.closing(mysqlite.mysqlite(config.watchd.database, str)) as db:
        yield db
|
|
|
|
def __init__(self):
    """Load config, build queues and shared state, ensure the DB schema
    exists, set up dynamic thread scaling, and install the SIGHUP handler
    for live config reload."""
    config.load()
    dns.set_config(config)
    self.in_background = False
    self.threads = []
    self.stopping = threading.Event()
    self.stopped = threading.Event()

    # shared work-stealing queues
    self.job_queue = PriorityJobQueue()
    self.completion_queue = Queue.Queue() # completed ProxyTestState objects

    # track pending proxy states
    self.pending_states = {} # dict: proxy -> ProxyTestState
    self.pending_lock = threading.Lock()

    # create table if needed (use dbs.py for canonical schema)
    with self._db_context() as db:
        dbs.create_table_if_not_exists(db, 'proxylist')

    self.submit_after = config.watchd.submit_after # number of collected jobs before writing db
    self.collected = [] # completed ProxyTestState objects ready for DB
    self.totals = {
        'submitted':0,
        'success':0,
    }
    self.stats = Stats()
    self.last_cleanup = time.time()
    self.httpd_server = None
    self.verification_thread = None

    # Dynamic thread scaling.
    # min_threads <= 0 in config means "auto": a quarter of max, floor 5.
    min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
    self.scaler = ThreadScaler(
        min_threads=min_t,
        max_threads=config.watchd.threads,
        target_queue_per_thread=5,
        scale_cooldown=config.watchd.scale_cooldown,
        scale_threshold=config.watchd.scale_threshold
    )
    self.thread_id_counter = 0
    # timestamps used to throttle routine log lines
    self.last_jobs_log = 0
    self.last_update_log = 0
    self.log_interval = 60 # seconds between routine log messages

    # Register SIGHUP handler for config reload
    signal.signal(signal.SIGHUP, self._handle_sighup)
|
|
|
|
def _handle_sighup(self, signum, frame):
    """SIGHUP handler: trigger a live configuration reload."""
    _log('received SIGHUP, reloading config...', 'watchd')
    self.reload_config()
|
|
|
|
def reload_config(self):
    """Reload configuration without restart.

    Hot-reloadable settings:
    threads, timeout, checktime, perfail_checktime, submit_after,
    stats_interval, debug, tor_safeguard, stale_days, max_fail,
    outage_threshold

    Requires restart:
    database, source_file, checktype

    Returns:
        True when the new config validated and was applied, False when
        loading/validation failed (old config stays in effect).
    """
    old_checktypes = list(config.watchd.checktypes)

    try:
        config.load()
        errors = config.validate()
        if errors:
            for e in errors:
                _log('config error: %s' % e, 'error')
            _log('config reload failed, keeping old config', 'error')
            return False
    except Exception as e:
        _log('config reload failed: %s' % str(e), 'error')
        return False

    # Update runtime values
    self.submit_after = config.watchd.submit_after

    # Update scaler limits.
    # BUGFIX: honor a configured min_threads the same way __init__ does,
    # instead of always deriving the minimum from the thread count.
    self.scaler.min_threads = (config.watchd.min_threads
                               if config.watchd.min_threads > 0
                               else max(5, config.watchd.threads // 4))
    self.scaler.max_threads = config.watchd.threads

    # Warn about values requiring restart
    if config.watchd.checktypes != old_checktypes:
        _log('checktype changed (%s -> %s), requires restart to take effect' % (
            ','.join(old_checktypes), ','.join(config.watchd.checktypes)), 'warn')

    _log('config reloaded: threads=%d timeout=%d working=%ds fail_interval=%ds backoff=%s' % (
        config.watchd.threads, config.watchd.timeout,
        config.watchd.working_checktime, config.watchd.fail_retry_interval,
        config.watchd.fail_retry_backoff), 'watchd')
    return True
|
|
|
|
def _spawn_thread(self):
    """Create, start, and register one new worker thread; return its id."""
    self.thread_id_counter += 1
    threadid = 'dyn%d' % self.thread_id_counter
    worker = WorkerThread(threadid, self.job_queue)
    worker.start_thread()
    self.threads.append(worker)
    return threadid
|
|
|
|
def _remove_thread(self):
    """Retire one worker thread, never dropping below the scaler minimum.

    Returns the retired thread's id, or None when already at minimum.
    """
    if len(self.threads) <= self.scaler.min_threads:
        return None
    worker = self.threads.pop()
    worker.stop()
    # deliberately not joined: the worker finishes its current job and exits
    return worker.id
|
|
|
|
def _adjust_threads(self, target):
    """Grow or shrink the worker pool to `target` threads."""
    current = len(self.threads)
    if target > current:
        # spawn the difference; _spawn_thread always yields an id
        added = [self._spawn_thread() for _ in range(target - current)]
        if added:
            _log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler')
    elif target < current:
        removed = []
        for _ in range(current - target):
            tid = self._remove_thread()
            if tid:
                removed.append(tid)
        if removed:
            _log('scaled down: -%d threads' % len(removed), 'scaler')
|
|
|
|
def fetch_rows(self, db):
    """Fetch proxy rows due for testing from database.

    Sets self.isoldies: True when the normal due-set was smaller than the
    thread count and the "oldies" fallback (dead proxies getting a second
    chance) was used instead. Oldies mode also disables the tor safeguard,
    since a low success rate is expected there.

    Returns:
        list of rows (ip, port, proto, failed, success_count,
        total_duration, country, mitm, consecutive_success, asn, proxy,
        source_proto), randomly ordered.
    """
    self.isoldies = False

    # Build due condition using new schedule formula
    due_sql, due_params = _build_due_sql()
    q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,
consecutive_success,asn,proxy,source_proto FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
    _dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % (
        config.watchd.working_checktime, config.watchd.fail_retry_interval,
        config.watchd.fail_retry_backoff, config.watchd.max_fail))
    rows = db.execute(q, due_params).fetchall()
    _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))

    # Check oldies? (dead proxies getting a second chance)
    # Fewer due rows than workers -> not worth a cycle: either idle this
    # round (rows cleared) or fall back to long-dead proxies.
    if len(rows) < config.watchd.threads:
        _dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies)
        rows = []
        if config.watchd.oldies:
            self.isoldies = True
            # Disable tor safeguard for old proxies
            if self.tor_safeguard:
                self.tor_safeguard = False
            # Oldies use simple checktime-based query (dead proxies)
            now = time.time()
            # retry window: max_fail <= failed < max_fail * 1.5
            oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2)
            q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country,
mitm,consecutive_success,asn,proxy,source_proto FROM proxylist
WHERE failed >= ? AND failed < ? AND (tested + ?) < ?
ORDER BY RANDOM()'''
            rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max,
                config.watchd.oldies_checktime, now)).fetchall()
    return rows
|
|
|
|
def prepare_jobs(self):
    """Build one test job per due proxy and queue them by priority.

    Flow: fetch due rows -> build a target pool per checktype (skipping
    targets in cooldown, falling back to the full list so a pool is
    never empty) -> create ProxyTestState + TargetTestJob per row ->
    shuffle for interleaving -> enqueue with priority.

    Returns:
        number of jobs queued.
    """
    ## enable tor safeguard by default
    self.tor_safeguard = config.watchd.tor_safeguard

    # Fetch rows within context manager scope
    with self._db_context() as db:
        rows = self.fetch_rows(db)

    _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
    checktypes = config.watchd.checktypes

    # Build target pools for each checktype (filter out targets in cooldown)
    target_pools = {}
    for ct in checktypes:
        if ct == 'none':
            # SSL-only mode: use ssl_targets as placeholder
            target_pools[ct] = ssl_targets
            _dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets))
        elif ct == 'irc':
            all_servers = config.servers
            available = irc_target_stats.get_available(all_servers)
            target_pools[ct] = available if available else all_servers
            _dbg('target_pool[irc]: %d/%d servers available' % (len(target_pools[ct]), len(all_servers)))
        elif ct == 'judges':
            all_judges = list(judges.keys())
            available = judge_stats.get_available(all_judges)
            target_pools[ct] = available if available else all_judges
        elif ct == 'ssl':
            available = ssl_target_stats.get_available(ssl_targets)
            target_pools[ct] = available if available else ssl_targets
            _dbg('target_pool[ssl]: %d/%d targets available' % (len(target_pools[ct]), len(ssl_targets)))
        else: # head
            all_targets = list(regexes.keys())
            available = head_target_stats.get_available(all_targets)
            target_pools[ct] = available if available else all_targets
            _dbg('target_pool[%s]: %d/%d targets available' % (ct, len(target_pools[ct]), len(all_targets)))

    # create all jobs first, then shuffle for interleaving
    all_jobs = []
    new_states = []

    for row in rows:
        # create shared state for this proxy
        # row: ip, port, proto, failed, success_count, total_duration,
        # country, mitm, consecutive_success, asn, proxy, source_proto
        state = ProxyTestState(
            row[0], row[1], row[2], row[3], row[4], row[5],
            row[6], row[7], row[8], asn=row[9],
            oldies=self.isoldies, completion_queue=self.completion_queue,
            proxy_full=row[10], source_proto=row[11]
        )
        new_states.append(state)

        # randomly select checktype for this proxy
        checktype = random.choice(checktypes)
        target_pool = target_pools[checktype]

        # select single target (single-target mode)
        target = random.choice(target_pool)

        # create job for this proxy
        job = TargetTestJob(state, target, checktype)
        all_jobs.append(job)

    # shuffle to interleave tests across different proxies
    random.shuffle(all_jobs)

    # track pending states (dict for O(1) lookup/removal)
    with self.pending_lock:
        for state in new_states:
            self.pending_states[state.proxy] = state

    # queue all jobs with priority
    for job in all_jobs:
        priority = calculate_priority(
            job.proxy_state.failcount,
            job.proxy_state.success_count,
            config.watchd.max_fail
        )
        self.job_queue.put(job, priority)

    proxy_count = len(new_states)
    job_count = len(all_jobs)
    _dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
    now = time.time()
    # throttle the info-level log line to once per log_interval
    if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
        _log("created %d jobs for %d proxies" % (job_count, proxy_count), 'watchd')
        self.last_jobs_log = now
    return job_count
|
|
|
|
def collect_work(self):
    """Drain the completion queue: evaluate each finished ProxyTestState,
    record its stats, stage it for DB submission, and drop it from the
    pending map.

    Event-driven rather than polling: ProxyTestState.record_result()
    pushes to completion_queue once all targets are done.
    """
    while True:
        # keep the try body minimal: only get_nowait() raises Queue.Empty
        try:
            state = self.completion_queue.get_nowait()
        except Queue.Empty:
            break
        # evaluate and record stats
        success, category = state.evaluate()
        self.stats.record(success, category, state.proto, state.last_latency_ms,
                          state.country, state.asn,
                          ssl_test=state.had_ssl_test, mitm=(state.mitm > 0),
                          cert_error=state.cert_error)
        self.collected.append(state)
        # remove from pending dict (O(1))
        with self.pending_lock:
            self.pending_states.pop(state.proxy, None)
|
|
|
|
def collect_unfinished(self):
    """Discard jobs still sitting in the job queue.

    The corresponding ProxyTestStates stay incomplete and will simply be
    re-tested in the next cycle.
    """
    discarded = 0
    try:
        while True:
            self.job_queue.get_nowait()
            discarded += 1
    except Queue.Empty:
        pass
    if discarded > 0:
        _log("discarded %d unfinished jobs" % discarded, "watchd")
|
|
|
|
def submit_collected(self):
    """Write collected test results back to the database.

    Applies the tor safeguard: when a batch of >= 100 results has a
    success rate at or below outage_threshold, the tor circuit is
    presumed blocked and only the successful entries are submitted.
    Proxies with fatal error categories, or twice max_fail failures,
    are marked DEAD_PROXY; entries at/over max_fail are deleted.

    Returns:
        True on a normal submit, False when the safeguard suppressed
        failures (or had no successes to submit at all).
    """
    if len(self.collected) == 0:
        return True
    sc = 0           # successes in this batch
    dead_count = 0   # entries promoted to DEAD_PROXY
    args = []
    latency_updates = [] # (proxy, latency_ms) for successful tests
    max_fail = config.watchd.max_fail
    _dbg('submit_collected: %d jobs' % len(self.collected))
    for job in self.collected:
        if job.failcount == 0:
            sc += 1
            _dbg('submit OK: failcount=0', job.proxy)
            if job.last_latency_ms is not None:
                latency_updates.append((job.proxy, job.last_latency_ms))
        # Check if proxy should be marked as permanently dead
        effective_failcount = job.failcount
        if job.failcount > 0:
            is_fatal = job.last_fail_category in FATAL_ERROR_CATEGORIES
            # Fatal errors (refused/unreachable/auth) = immediately dead
            if is_fatal:
                effective_failcount = DEAD_PROXY
                dead_count += 1
            # Non-fatal: mark dead if exceeded max_fail*2
            elif job.failcount >= max_fail * 2:
                effective_failcount = DEAD_PROXY
                dead_count += 1
        args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
                     job.success_count, job.total_duration, job.mitm,
                     job.consecutive_success, job.asn, job.protos_working,
                     job.last_check, job.last_target, job.proxy))

    success_rate = (float(sc) / len(self.collected)) * 100
    ret = True
    if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard:
        _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
        if sc == 0:
            return False
        # Rebuild args with only the successful entries; this batch's
        # failures are discarded.
        # NOTE(review): dead_count still reflects the full batch here even
        # though no dead entries are submitted -- confirm intent.
        args = []
        latency_updates = []
        for job in self.collected:
            if job.failcount == 0:
                args.append((job.failcount, job.checktime, 1, job.country, job.proto,
                             job.success_count, job.total_duration, job.mitm,
                             job.consecutive_success, job.asn, job.protos_working,
                             job.last_check, job.last_target, job.proxy))
                if job.last_latency_ms is not None:
                    latency_updates.append((job.proxy, job.last_latency_ms))
        ret = False

    now = time.time()
    # routine log is throttled; any success logs immediately
    if (now - self.last_update_log) >= self.log_interval or success_rate > 0:
        dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else ''
        _log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd')
        self.last_update_log = now

    # Build anonymity updates before DB context
    anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers)
                         for job in self.collected
                         if job.failcount == 0 and job.exit_ip]

    # Separate dead proxies for deletion
    dead_proxies = [a[-1] for a in args if a[0] == DEAD_PROXY or a[0] >= max_fail]
    live_args = [a for a in args if a[0] != DEAD_PROXY and a[0] < max_fail]

    with self._db_context() as db:
        query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=?,last_check=?,last_target=? WHERE proxy=?'
        if live_args:
            db.executemany(query, live_args)

        # Delete proxies that reached max_fail
        if dead_proxies:
            db.executemany('DELETE FROM proxylist WHERE proxy=?',
                           [(p,) for p in dead_proxies])
            _log('deleted %d dead proxies' % len(dead_proxies), 'watchd')

        # Batch update latency metrics for successful proxies
        if latency_updates:
            dbs.batch_update_proxy_latency(db, latency_updates)

        # Batch update anonymity for proxies with exit IP data
        if anonymity_updates:
            dbs.batch_update_proxy_anonymity(db, anonymity_updates)

        db.commit()

    self.collected = []
    self.totals['submitted'] += len(args)
    self.totals['success'] += sc
    return ret
|
|
|
|
def cleanup_stale(self):
    """Remove proxies that have been dead for too long.

    Deletes rows that are failed at/over max_fail (or permanently dead)
    and were last tested before the stale_days cutoff.
    """
    cutoff = int(time.time()) - config.watchd.stale_days * 86400
    with self._db_context() as db:
        result = db.execute(
            'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?',
            (config.watchd.max_fail, DEAD_PROXY, cutoff)
        )
        count = getattr(result, 'rowcount', 0)
        db.commit()
    if count > 0:
        _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd')
    self.last_cleanup = time.time()
|
|
|
|
def start(self):
    """Start the watchdog.

    Runs the main loop inline when configured for a single worker thread
    and executed as a standalone script; otherwise launches the main
    loop in a background thread.

    Returns:
        Whatever _run() / _run_background() returns (normally None).
    """
    # BUGFIX: _run_standalone is only bound inside the
    # `if __name__ == '__main__'` block at the bottom of this file.
    # When this module is imported and start() is called with
    # watchd.threads == 1, referencing it directly raised NameError.
    # Default to False for the imported-module case.
    if config.watchd.threads == 1 and globals().get('_run_standalone', False):
        return self._run()
    return self._run_background()
|
|
|
|
def run(self):
    """Park the calling thread while background workers do the work.

    In background mode this blocks forever (until the process is
    interrupted); in foreground mode it returns immediately because
    the main loop already ran inline.
    """
    if not self.in_background:
        return
    while True:
        time.sleep(1)
|
|
|
|
def _run_background(self):
|
|
self.in_background = True
|
|
t = threading.Thread(target=self._run)
|
|
t.start()
|
|
|
|
def get_runtime_stats(self):
    """Return runtime statistics for the dashboard.

    Aggregates counters from the Stats object, the Tor connection pool,
    judge/target health trackers, the scraper (if available), network
    usage, and MITM certificate stats into a single dict consumed by
    the HTTP API (/api/stats).

    Returns:
        dict with tested, passed, success_rate, rate, uptime_seconds,
        threads, failures, tor_pool, and engine stats.
    """
    stats_data = {}

    # Stats from Stats object
    # Snapshot core counters under the stats lock so they are mutually
    # consistent with each other.
    with self.stats.lock:
        stats_data['tested'] = self.stats.tested
        stats_data['passed'] = self.stats.passed
        stats_data['failed'] = self.stats.failed
        elapsed = time.time() - self.stats.start_time
        stats_data['uptime_seconds'] = int(elapsed)
        stats_data['rate'] = try_div(self.stats.tested, elapsed)
        stats_data['success_rate'] = try_div(self.stats.passed * 100.0, self.stats.tested)
        stats_data['failures'] = dict(self.stats.fail_categories)
        # New: Protocol breakdown
        stats_data['by_proto'] = dict(self.stats.by_proto)
        # New: Rate history for sparklines
        stats_data['rate_history'] = list(self.stats.rate_history)
        stats_data['success_rate_history'] = list(self.stats.success_rate_history)
        # New: Peak rate
        stats_data['peak_rate'] = self.stats.peak_rate

    # New: Recent rate (last 60s) and avg latency
    # NOTE(review): these getters are called outside the lock —
    # presumably they synchronize internally; confirm in stats.py.
    stats_data['recent_rate'] = self.stats.get_recent_rate()
    stats_data['recent_success_rate'] = self.stats.get_recent_success_rate()
    stats_data['avg_latency'] = self.stats.get_avg_latency()

    # Latency stats
    # min_latency starts at +inf before any sample; report 0 in that case
    stats_data['min_latency'] = self.stats.min_latency if self.stats.min_latency != float('inf') else 0
    stats_data['max_latency'] = self.stats.max_latency
    stats_data['latency_percentiles'] = self.stats.get_latency_percentiles()
    stats_data['latency_histogram'] = self.stats.get_latency_histogram()

    # Protocol performance (tested, passed, rate per protocol)
    stats_data['proto_stats'] = self.stats.get_proto_stats()

    # Geographic distribution from session
    stats_data['top_countries_session'] = self.stats.get_top_countries(10)
    stats_data['top_asns_session'] = self.stats.get_top_asns(10)

    # SSL/TLS stats
    with self.stats.lock:
        ssl_tested = self.stats.ssl_tested
        ssl_passed = self.stats.ssl_passed
        mitm_stats = mitm_cert_stats.get_stats()
        stats_data['ssl'] = {
            'tested': ssl_tested,
            'passed': ssl_passed,
            'failed': self.stats.ssl_failed,
            'fail_categories': dict(self.stats.ssl_fail_categories),
            'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0,
            'mitm_detected': self.stats.mitm_detected,
            'cert_errors': self.stats.cert_errors,
            'mitm_unique_certs': mitm_stats.get('unique_certs', 0),
            'mitm_unique_proxies': mitm_stats.get('unique_proxies', 0)
        }

    # Thread info
    stats_data['threads'] = len(self.threads)
    stats_data['min_threads'] = self.scaler.min_threads
    stats_data['max_threads'] = self.scaler.max_threads
    stats_data['queue_size'] = self.job_queue.qsize()
    stats_data['checktype'] = ','.join(config.watchd.checktypes)
    stats_data['checktypes'] = config.watchd.checktypes
    # profiling is on if either the CLI flag or the config option set it
    stats_data['profiling'] = (
        getattr(config.args, 'profile', False) if hasattr(config, 'args') else False
    ) or getattr(config.common, 'profiling', False)
    # uses `elapsed` captured under the first lock acquisition above
    stats_data['pass_rate'] = try_div(self.stats.passed, elapsed)

    # Tor pool stats
    pool = connection_pool.get_pool()
    if pool:
        pool_stats = pool.get_stats()
        hosts_list = []
        healthy_count = 0
        latency_sum = 0.0
        latency_count = 0
        for h in pool_stats.get('hosts', []):
            is_healthy = h.get('available', False)
            # convert seconds -> milliseconds; None when no latency sample yet
            lat_ms = h.get('avg_latency', 0) * 1000 if h.get('avg_latency') else None
            hosts_list.append({
                'address': h.get('host', ''),
                'healthy': is_healthy,
                'latency_ms': lat_ms,
                'success_rate': h.get('success_rate', 0),
            })
            if is_healthy:
                healthy_count += 1
                # only healthy hosts with a positive latency enter the average
                if lat_ms and lat_ms > 0:
                    latency_sum += lat_ms
                    latency_count += 1
        pool_info = {
            'hosts': hosts_list,
            'total_requests': pool_stats.get('total_requests', 0),
            'success_rate': pool_stats.get('success_rate', 0),
            'healthy_count': healthy_count,
            'total_count': len(hosts_list),
            'avg_latency': latency_sum / latency_count if latency_count > 0 else 0,
        }
        stats_data['tor_pool'] = pool_info
    else:
        # no pool initialized yet: emit an empty-but-well-formed structure
        stats_data['tor_pool'] = {
            'hosts': [], 'total_requests': 0, 'success_rate': 0,
            'healthy_count': 0, 'total_count': 0, 'avg_latency': 0
        }

    # Judge stats (when using judges checktype)
    if 'judges' in config.watchd.checktypes:
        js = judge_stats.get_stats()
        # Remap 'target' -> 'judge' for dashboard compatibility
        top = [dict(j, judge=j['target']) for j in js.get('top', [])[:5]]
        stats_data['judges'] = {
            'total': js.get('total', 0),
            'available': js.get('available', 0),
            'in_cooldown': js.get('in_cooldown', 0),
            'top_judges': top,
        }
    else:
        stats_data['judges'] = None

    # Target health stats (all target pools)
    stats_data['target_health'] = {
        'head': head_target_stats.get_stats(),
        'ssl': ssl_target_stats.get_stats(),
        'irc': irc_target_stats.get_stats(),
        'judges': judge_stats.get_stats(),
    }

    # Scraper/engine stats
    if scraper_available:
        scraper_stats = scraper_module.get_scraper_stats()
        if scraper_stats:
            stats_data['engines_available'] = scraper_stats.get('available', 0)
            stats_data['engines_total'] = scraper_stats.get('total', 0)
            stats_data['engines_backoff'] = scraper_stats.get('in_backoff', 0)
            stats_data['scraper'] = scraper_stats
        else:
            stats_data['engines_available'] = 0
            stats_data['engines_total'] = 0
            stats_data['engines_backoff'] = 0
            stats_data['scraper'] = None
    else:
        stats_data['engines_available'] = 0
        stats_data['engines_total'] = 0
        stats_data['engines_backoff'] = 0
        stats_data['scraper'] = None

    # Network usage stats
    stats_data['network'] = network_stats.get_stats()

    # MITM certificate stats (detailed)
    stats_data['mitm'] = mitm_cert_stats.get_stats()

    return stats_data
|
|
|
|
def _run(self):
    """Main daemon loop: initialize subsystems, then schedule/collect work.

    Startup sequence: log config, prime the database (stats tables,
    session state), load MITM cert state, initialize the Tor pool,
    optionally start the HTTP API and verification thread, then spawn
    worker threads. Afterwards loops forever dispatching jobs and
    doing periodic reporting, persistence, scaling, and cleanup until
    self.stopping is set.
    """
    _log('starting...', 'watchd')
    _log('config: db=%s checktype=%s threads=%d working=%ds fail_interval=%ds backoff=%s max_fail=%d' % (
        config.watchd.database, ','.join(config.watchd.checktypes), config.watchd.threads,
        config.watchd.working_checktime, config.watchd.fail_retry_interval,
        config.watchd.fail_retry_backoff, config.watchd.max_fail), 'watchd')

    # Log database status at startup
    with self._db_context() as db:
        total = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        due_sql, due_params = _build_due_sql()
        due = db.execute('SELECT COUNT(*) FROM proxylist WHERE ' + due_sql, due_params).fetchone()[0]

        # Create stats persistence tables
        dbs.create_table_if_not_exists(db, 'stats_history')
        dbs.create_table_if_not_exists(db, 'session_state')

        # Load persisted session state
        saved_state = dbs.load_session_state(db)
        if saved_state:
            self.stats.load_state(saved_state)

    # Load MITM certificate state (same directory as database)
    db_dir = os.path.dirname(config.watchd.database) or '.'
    self.mitm_state_file = os.path.join(db_dir, 'mitm_certs.json')
    mitm_cert_stats.load_state(self.mitm_state_file)
    _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd')

    # Initialize Tor connection pool
    connection_pool.init_pool(config.torhosts, warmup=True)

    # Start HTTP API server if enabled
    if config.httpd.enabled:
        # imported lazily so the HTTP stack is only loaded when enabled
        from httpd import ProxyAPIServer, configure_schedule, configure_url_scoring
        # Pass schedule config to httpd module
        configure_schedule(
            config.watchd.working_checktime,
            config.watchd.fail_retry_interval,
            config.watchd.fail_retry_backoff,
            config.watchd.max_fail
        )
        configure_url_scoring(
            config.ppf.checktime,
            config.ppf.perfail_checktime,
            config.ppf.max_fail,
            config.ppf.list_max_age_days
        )
        self.httpd_server = ProxyAPIServer(
            config.httpd.listenip,
            config.httpd.port,
            config.watchd.database,
            stats_provider=self.get_runtime_stats,
            url_database=config.ppf.database,
        )
        self.httpd_server.start()

    # Start verification thread if enabled (manager-only, checks disputed results)
    if config.verification.enabled and config.watchd.threads > 0:
        self.verification_thread = VerificationThread(
            database=config.watchd.database,
            interval=config.verification.interval,
            batch_size=config.verification.batch_size
        )
        self.verification_thread.start()
        _log('verification thread enabled', 'watchd')

    # create worker threads with shared queues
    for i in range(config.watchd.threads):
        # random 5-letter id used to tag each worker's log lines
        threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
        wt = WorkerThread(threadid, self.job_queue)
        if self.in_background:
            wt.start_thread()
        self.threads.append(wt)
        # small stagger to avoid a thundering herd at startup
        time.sleep(random.random() / 10)

    sleeptime = 0
    while True:

        if self.stopping.is_set():
            if self.in_background: self._cleanup()
            break

        # countdown-style backoff: burn off pending sleep one second at a
        # time so stop requests are still noticed promptly
        if sleeptime > 0:
            time.sleep(1)
            sleeptime -= 1
            continue

        # Skip job processing when threads=0 (master-only mode)
        if config.watchd.threads > 0:
            # check if job queue is empty (work-stealing: threads pull as needed)
            if self.job_queue.empty():
                self.collect_work()
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
                else:
                    job_count = self.prepare_jobs()
                    if job_count == 0:
                        # no jobs available, wait before checking again
                        sleeptime = 10

            if not self.in_background: # single_thread scenario
                self.threads[0].workloop()

            self.collect_work()

            # flush finished results to the DB once enough have piled up
            if len(self.collected) > self.submit_after:
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
        else:
            # Master-only mode: sleep to avoid busy loop
            sleeptime = 10

        # Update rate history for sparklines
        self.stats.update_history()

        # periodic stats report
        if self.stats.should_report(config.watchd.stats_interval):
            _log(self.stats.report(), 'stats')
            # Also report pool stats
            pool = connection_pool.get_pool()
            if pool:
                _log(pool.status_line(), 'stats')
            # Report judge stats (when using judges checktype)
            if 'judges' in config.watchd.checktypes:
                _log(judge_stats.status_line(), 'stats')
            # Report scaler status
            _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')

            # Save session state periodically (every stats_interval, default 5m)
            # persistence is best-effort: a failed save must not kill the loop
            try:
                with self._db_context() as db:
                    dbs.save_session_state(db, self.stats)
            except Exception as e:
                _log('failed to save session state: %s' % str(e), 'warn')

            # Save MITM certificate stats periodically
            try:
                mitm_cert_stats.save_state(self.mitm_state_file)
            except Exception as e:
                _log('failed to save MITM state: %s' % str(e), 'warn')

            # Hourly stats snapshot
            now = time.time()
            if not hasattr(self, '_last_snapshot'):
                self._last_snapshot = now
            if (now - self._last_snapshot) >= 3600:
                try:
                    with self._db_context() as db:
                        dbs.save_stats_snapshot(db, self.stats)
                    self._last_snapshot = now
                except Exception as e:
                    _log('failed to save stats snapshot: %s' % str(e), 'warn')

        # Dynamic thread scaling
        if self.in_background:
            target = self.scaler.should_scale(
                len(self.threads),
                self.job_queue.qsize(),
                self.stats
            )
            if target != len(self.threads):
                self._adjust_threads(target)

        # periodic stale proxy cleanup (daily)
        if (time.time() - self.last_cleanup) >= 86400:
            self.cleanup_stale()

        time.sleep(1)
|
|
|
|
|
|
if __name__ == '__main__':
    # Marks script execution so start() may run the loop inline when
    # configured with a single worker thread.
    _run_standalone = True

    config.load()
    errors = config.validate()
    if errors:
        for e in errors:
            _log(e, 'error')
        import sys
        sys.exit(1)

    w = Proxywatchd()
    try:
        w.start()
        w.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to shut down; fall through to cleanup.
        # FIX: dropped the unused `as e` binding (it also shadowed the
        # validation-error loop variable above).
        pass
    finally:
        # always attempt an orderly shutdown, even after an exception
        w.stop()
        w.finish()
|