#!/usr/bin/env python2 import threading import time import random import string import re import heapq import signal import os try: import Queue except ImportError: import queue as Queue try: import IP2Location geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN")) geolite = True except (ImportError, IOError, ValueError): geolite = False try: import pyasn asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat")) asn_enabled = True except (ImportError, IOError, NameError): asndb = None asn_enabled = False from config import Config import mysqlite import dbs from misc import _log, categorize_error import rocksock import connection_pool config = Config() _run_standalone = False cached_dns = {} # IP pattern for judge services (validates response contains valid IP in body) IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}' # Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)' # Patterns indicating judge is blocking the proxy (not a proxy failure) # These should NOT count as proxy failures - retry with different judge JUDGE_BLOCK_PATTERNS = [ r'HTTP/1\.[01] 403', # Forbidden r'HTTP/1\.[01] 429', # Too Many Requests r'HTTP/1\.[01] 503', # Service Unavailable r'captcha', # Captcha challenge r'challenge', # Generic challenge r'verify you are human', # Human verification r'rate.?limit', # Rate limiting r'too many requests', # Rate limiting r'access.?denied', # Access denied r'blocked', # Explicit block r'Checking your browser', # Cloudflare JS challenge ] JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE) # Check types: irc, http (header match), judges (body match), ssl (TLS handshake) # Judge services - return IP in body (plain text, JSON, or HTML) judges = { # Plain text IP 'api.ipify.org': IP_PATTERN, 'icanhazip.com': IP_PATTERN, 'checkip.amazonaws.com': IP_PATTERN, 'ifconfig.me/ip': IP_PATTERN, 'ipinfo.io/ip': IP_PATTERN, 
'ip.me': IP_PATTERN, 'myip.dnsomatic.com': IP_PATTERN, 'ident.me': IP_PATTERN, 'ipecho.net/plain': IP_PATTERN, 'wtfismyip.com/text': IP_PATTERN, 'eth0.me': IP_PATTERN, 'l2.io/ip': IP_PATTERN, 'tnx.nl/ip': IP_PATTERN, 'wgetip.com': IP_PATTERN, 'curlmyip.net': IP_PATTERN, # JSON responses (IP pattern matches within JSON) 'httpbin.org/ip': IP_PATTERN, 'ip-api.com/json': IP_PATTERN, 'ipapi.co/json': IP_PATTERN, 'ipwhois.app/json': IP_PATTERN, # Header echo - for elite detection (check if proxy adds revealing headers) 'httpbin.org/headers': IP_PATTERN, # returns request headers as JSON } class JudgeStats(): """Track per-judge success/failure rates for reliability scoring. Judges that frequently block or rate-limit are temporarily avoided. Stats decay over time to allow recovery. """ def __init__(self, cooldown_seconds=300, block_threshold=3): self.lock = threading.Lock() self.stats = {} # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp} self.cooldown_seconds = cooldown_seconds # seconds to avoid blocked judges self.block_threshold = block_threshold # consecutive blocks before cooldown def record_success(self, judge): """Record successful judge response.""" with self.lock: if judge not in self.stats: self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} self.stats[judge]['success'] += 1 # Reset block count on success self.stats[judge]['block'] = 0 def record_failure(self, judge): """Record judge failure (proxy failed, not judge block).""" with self.lock: if judge not in self.stats: self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} self.stats[judge]['fail'] += 1 def record_block(self, judge): """Record judge blocking the proxy (403, captcha, rate-limit).""" with self.lock: if judge not in self.stats: self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0} self.stats[judge]['block'] += 1 self.stats[judge]['last_block'] = time.time() def is_available(self, judge): """Check if judge is 
available (not in cooldown).""" with self.lock: if judge not in self.stats: return True s = self.stats[judge] # Check if in cooldown period if s['block'] >= self.block_threshold: if (time.time() - s['last_block']) < self.cooldown_seconds: return False # Cooldown expired, reset block count s['block'] = 0 return True def get_available_judges(self, judge_list): """Return list of judges not in cooldown.""" return [j for j in judge_list if self.is_available(j)] def status_line(self): """Return status summary for logging.""" with self.lock: total = len(self.stats) blocked = sum(1 for s in self.stats.values() if s['block'] >= self.block_threshold and (time.time() - s['last_block']) < self.cooldown_seconds) return 'judges: %d total, %d in cooldown' % (total, blocked) # Global judge stats instance judge_stats = JudgeStats() # HTTP targets - check for specific headers regexes = { 'www.facebook.com': 'X-FB-Debug', 'www.fbcdn.net': 'X-FB-Debug', 'www.reddit.com': 'x-clacks-overhead', 'www.twitter.com': 'x-connection-hash', 't.co': 'x-connection-hash', 'www.msn.com': 'x-aspnetmvc-version', 'www.bing.com': 'p3p', 'www.ask.com': 'x-served-by', 'www.hotmail.com': 'x-msedge-ref', 'www.bbc.co.uk': 'x-bbc-edge-cache-status', 'www.skype.com': 'X-XSS-Protection', 'www.alibaba.com': 'object-status', 'www.mozilla.org': 'cf-ray', 'www.cloudflare.com': 'cf-ray', 'www.wikimedia.org': 'x-client-ip', 'www.vk.com': 'x-frontend', 'www.tinypic.com': 'x-amz-cf-pop', 'www.netflix.com': 'X-Netflix.proxy.execution-time', 'www.amazon.de': 'x-amz-cf-id', 'www.reuters.com': 'x-amz-cf-id', 'www.ikea.com': 'x-frame-options', 'www.twitpic.com': 'timing-allow-origin', 'www.digg.com': 'cf-request-id', 'www.wikia.com': 'x-served-by', 'www.wp.com': 'x-ac', 'www.last.fm': 'x-timer', 'www.usps.com': 'x-ruleset-version', 'www.linkedin.com': 'x-li-uuid', 'www.vimeo.com': 'x-timer', 'www.yelp.com': 'x-timer', 'www.ebay.com': 'x-envoy-upstream-service-time', 'www.wikihow.com': 'x-c', 'www.archive.org': 
'referrer-policy', 'www.pandora.tv': 'X-UA-Compatible', 'www.w3.org': 'x-backend', 'www.time.com': 'x-amz-cf-pop' } # SSL targets - verify TLS handshake only (MITM detection) # No HTTP request needed - just connect with verifycert=True ssl_targets = [ 'www.google.com', 'www.microsoft.com', 'www.apple.com', 'www.amazon.com', 'www.cloudflare.com', 'www.github.com', 'www.mozilla.org', 'www.wikipedia.org', 'www.reddit.com', 'www.twitter.com', 'x.com', 'www.facebook.com', 'www.linkedin.com', 'www.paypal.com', 'www.stripe.com', 'www.digicert.com', 'www.letsencrypt.org', ] class Stats(): """Track and report runtime statistics.""" def __init__(self): self.lock = threading.Lock() self.tested = 0 self.passed = 0 self.failed = 0 self.start_time = time.time() self.last_report = time.time() # Failure category tracking self.fail_categories = {} def record(self, success, category=None): with self.lock: self.tested += 1 if success: self.passed += 1 else: self.failed += 1 if category: self.fail_categories[category] = self.fail_categories.get(category, 0) + 1 def should_report(self, interval): return (time.time() - self.last_report) >= interval def report(self): with self.lock: self.last_report = time.time() elapsed = time.time() - self.start_time rate = try_div(self.tested, elapsed) pct = try_div(self.passed * 100.0, self.tested) base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % ( self.tested, self.passed, pct, rate, int(elapsed / 60)) # Add failure breakdown if there are failures if self.fail_categories: cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items())) return '%s [%s]' % (base, cats) return base def try_div(a, b): if b != 0: return a/float(b) return 0 class PriorityJobQueue(object): """Priority queue for proxy test jobs. Lower priority number = higher priority. 
Priority 0: New proxies (never tested) Priority 1: Recently working (no failures, has successes) Priority 2: Low fail count (< 3 failures) Priority 3: Medium fail count Priority 4: High fail count """ def __init__(self): self.heap = [] self.lock = threading.Lock() self.not_empty = threading.Condition(self.lock) self.counter = 0 # tie-breaker for equal priorities def put(self, job, priority=3): """Add job with priority (lower = higher priority).""" with self.lock: heapq.heappush(self.heap, (priority, self.counter, job)) self.counter += 1 self.not_empty.notify() def get(self, timeout=None): """Get highest priority job. Raises Queue.Empty on timeout.""" with self.not_empty: if timeout is None: while not self.heap: self.not_empty.wait() else: end_time = time.time() + timeout while not self.heap: remaining = end_time - time.time() if remaining <= 0: raise Queue.Empty() self.not_empty.wait(remaining) _, _, job = heapq.heappop(self.heap) return job def get_nowait(self): """Get job without waiting. Raises Queue.Empty if empty.""" with self.lock: if not self.heap: raise Queue.Empty() _, _, job = heapq.heappop(self.heap) return job def empty(self): """Check if queue is empty.""" with self.lock: return len(self.heap) == 0 def qsize(self): """Return queue size.""" with self.lock: return len(self.heap) def task_done(self): """Compatibility method (no-op for heap queue).""" pass def calculate_priority(failcount, success_count, max_fail): """Calculate job priority based on proxy state. Returns: int: Priority 0-4 (lower = higher priority) """ # New proxy (never successfully tested) if success_count == 0 and failcount == 0: return 0 # Recently working (no current failures) if failcount == 0: return 1 # Low fail count if failcount < 3: return 2 # Medium fail count if failcount < max_fail // 2: return 3 # High fail count return 4 class ThreadScaler(object): """Dynamic thread scaling based on queue depth and success rate. 
Scales up when: - Queue depth exceeds high watermark - Success rate is above threshold Scales down when: - Queue is nearly empty - Success rate drops below threshold """ def __init__(self, min_threads=2, max_threads=50, target_queue_per_thread=10): self.min_threads = min_threads self.max_threads = max_threads self.target_queue_per_thread = target_queue_per_thread self.last_scale_time = 0 self.scale_cooldown = 30 # seconds between scaling decisions self.success_threshold = 10.0 # minimum success rate % to scale up def should_scale(self, current_threads, queue_size, stats): """Determine if scaling is needed. Args: current_threads: Current number of active threads queue_size: Current job queue depth stats: Stats object with success/fail counts Returns: int: Target thread count (may be same as current) """ now = time.time() if (now - self.last_scale_time) < self.scale_cooldown: return current_threads # Calculate current success rate with stats.lock: total = stats.tested passed = stats.passed success_rate = try_div(passed * 100.0, total) if total > 0 else 50.0 # Calculate ideal thread count based on queue depth ideal = max(self.min_threads, queue_size // self.target_queue_per_thread) ideal = min(ideal, self.max_threads) target = current_threads # Scale up: queue is deep and success rate is acceptable if queue_size > current_threads * self.target_queue_per_thread * 2: if success_rate >= self.success_threshold: target = min(current_threads + 2, ideal, self.max_threads) # Scale down: queue is shallow or success rate is poor elif queue_size < current_threads * 2: target = max(current_threads - 1, self.min_threads) elif success_rate < self.success_threshold / 2: # Drastic success rate drop - scale down to reduce load target = max(current_threads - 2, self.min_threads) if target != current_threads: self.last_scale_time = now return target def status_line(self, current_threads, queue_size): """Return status string for logging.""" return 'threads=%d queue=%d 
target_per_thread=%d' % ( current_threads, queue_size, self.target_queue_per_thread) def socks4_resolve(srvname, server_port): srv = srvname if srv in cached_dns: srv = cached_dns[srvname] if config.watchd.debug: _log("using cached ip (%s) for %s"%(srv, srvname), "debug") else: dns_fail = False try: af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True) if sa is not None: cached_dns[srvname] = sa[0] srv = sa[0] else: dns_fail = True except rocksock.RocksockException as e: assert(e.get_errortype() == rocksock.RS_ET_GAI) dns_fail = True if dns_fail: fail_inc = 0 _log("could not resolve connection target %s"%srvname, "ERROR") return False return srv class ProxyTestState(): """Thread-safe state for a proxy being tested against multiple targets. Results from TargetTestJob instances are aggregated here. When all tests complete, evaluate() determines final pass/fail. """ def __init__(self, ip, port, proto, failcount, success_count, total_duration, country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False): self.ip = ip self.port = int(port) self.proxy = '%s:%s' % (ip, port) self.proto = proto self.failcount = failcount self.checktime = None self.success_count = success_count self.total_duration = total_duration self.country = country self.mitm = mitm self.consecutive_success = consecutive_success self.asn = asn self.isoldies = oldies self.num_targets = num_targets # thread-safe result accumulation self.lock = threading.Lock() self.results = [] # list of (success, proto, duration, srv, tor, ssl) self.completed = False self.last_latency_ms = None # average latency from successful tests self.exit_ip = None # IP seen by target server (for anonymity detection) self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None): """Record a single target test result. 
Thread-safe.""" with self.lock: self.results.append({ 'success': success, 'proto': proto, 'duration': duration, 'srv': srv, 'tor': tor, 'ssl': ssl, 'category': category, 'exit_ip': exit_ip, 'reveals_headers': reveals_headers }) def is_complete(self): """Check if all target tests have finished.""" with self.lock: return len(self.results) >= self.num_targets def rwip(self, ip): n = [] for b in ip.split('.'): while b[0] == 0 and len(b) > 1: b = b[:1] n.append(b) return '.'.join(n) def evaluate(self): """Evaluate results after all tests complete. Returns: (success, category) tuple where success is bool and category is the dominant failure type (or None on success) """ with self.lock: if self.completed: return (self.failcount == 0, None) self.completed = True self.checktime = int(time.time()) successes = [r for r in self.results if r['success']] failures = [r for r in self.results if not r['success']] num_success = len(successes) # Determine dominant failure category fail_category = None if failures: cats = {} for f in failures: cat = f.get('category') or 'other' cats[cat] = cats.get(cat, 0) + 1 if cats: fail_category = max(cats.keys(), key=lambda k: cats[k]) # require success (single target mode) if num_success >= 1: # use last successful result for metrics last_good = successes[-1] # Extract exit IP and header info from successful judge responses for s in successes: if s.get('exit_ip'): self.exit_ip = s['exit_ip'] if s.get('reveals_headers') is not None: self.reveals_headers = s['reveals_headers'] if geolite and self.country is None: self.ip = self.rwip(self.ip) rec = geodb.get_all(self.ip) if rec is not None and rec.country_short: self.country = rec.country_short if asn_enabled and self.asn is None: try: asn_result = asndb.lookup(self.ip) if asn_result and asn_result[0]: self.asn = asn_result[0] except Exception: pass self.proto = last_good['proto'] self.failcount = 0 if (self.consecutive_success % 3) == 0: self.mitm = 0 self.consecutive_success += 1 
self.success_count += 1 self.total_duration += int(last_good['duration'] * 1000) # Calculate average latency from successful tests (in ms) durations = [s['duration'] for s in successes if s['duration']] if durations: self.last_latency_ms = sum(durations) * 1000.0 / len(durations) torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor'] # Determine anonymity for status message # transparent: exit_ip == proxy_ip # anonymous: exit_ip != proxy_ip, adds revealing headers # elite: exit_ip != proxy_ip, no revealing headers anon_status = '' if self.exit_ip: if self.exit_ip == self.ip: anon_status = ' anon: transparent;' elif self.reveals_headers is False: anon_status = ' anon: elite;' elif self.reveals_headers is True: anon_status = ' anon: anonymous;' else: anon_status = ' anon: anonymous;' # default if no header check _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % ( last_good['proto'], self.ip, self.port, self.country, last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']), num_success, self.num_targets), 'xxxxx') return (True, None) else: # Check if failures were all judge blocks (not proxy's fault) judge_blocks = [f for f in failures if f.get('category') == 'judge_block'] real_failures = [f for f in failures if f.get('category') != 'judge_block'] if judge_blocks and not real_failures: # All failures were judge blocks - inconclusive, don't penalize proxy # checktime still updated so we don't immediately retest return (False, 'judge_block') else: # Real proxy failure self.failcount += 1 self.consecutive_success = 0 return (False, fail_category) class TargetTestJob(): """Job to test a single proxy against a single target. Multiple TargetTestJob instances share the same ProxyTestState, allowing tests to be interleaved with other proxies in the queue. 
    """

    def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
        # proxy_state: shared ProxyTestState aggregating results across targets
        self.proxy_state = proxy_state
        self.target_srv = target_srv
        # checktype: 'irc' | 'judges' | 'ssl' | anything else means http-header check
        self.checktype = checktype
        # worker_id is assigned by WorkerThread for tor-host affinity in the pool
        self.worker_id = worker_id

    def run(self):
        """Test the proxy against this job's target server."""
        sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
        if not sock:
            # SSL-only check passed (handshake success, no request needed)
            if err_cat == 'ssl_ok':
                # 'duration' is the connect start timestamp here, not a delta
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl
                )
            else:
                self.proxy_state.record_result(False, category=err_cat)
            return
        try:
            recv = sock.recv(-1)
            # Select regex based on check type
            if self.checktype == 'irc':
                regex = '^(:|NOTICE|ERROR)'
            elif self.checktype == 'judges':
                regex = judges[srv]
            elif self.checktype == 'ssl':
                # Should not reach here - ssl returns before recv
                self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
                return
            else:
                # http
                regex = regexes[srv]
            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - duration
                # Extract exit IP from judge response
                exit_ip = None
                reveals_headers = None
                if self.checktype == 'judges':
                    ip_match = re.search(IP_PATTERN, recv)
                    if ip_match:
                        exit_ip = ip_match.group(0)
                    # Check for header echo judge (elite detection)
                    if 'headers' in srv:
                        # If X-Forwarded-For/Via/etc present, proxy reveals chain
                        reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
                    # Record successful judge
                    judge_stats.record_success(srv)
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed, srv=srv, tor=tor, ssl=is_ssl,
                    exit_ip=exit_ip, reveals_headers=reveals_headers
                )
            else:
                # Check if judge is blocking us (not a proxy failure)
                if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
                    judge_stats.record_block(srv)
                    # Don't count as proxy failure - judge is blocking
                    self.proxy_state.record_result(False, category='judge_block')
                    if config.watchd.debug:
                        _log('judge %s blocked proxy %s' % (srv, self.proxy_state.proxy), 'debug')
                else:
                    if self.checktype == 'judges':
                        judge_stats.record_failure(srv)
                    self.proxy_state.record_result(False, category='other')
        except KeyboardInterrupt as e:
            # NOTE(review): disconnect here plus the finally below means a
            # double disconnect on Ctrl-C - presumably rocksock tolerates a
            # second disconnect; verify.
            sock.disconnect()
            raise e
        except rocksock.RocksockException as e:
            self.proxy_state.record_result(False, category=categorize_error(e))
        finally:
            sock.disconnect()

    def _connect_and_test(self):
        """Connect to target through the proxy and send test packet.

        Returns an 8-tuple:
            (sock, proto, start_time, torhost, srvname, failinc, use_ssl, err_cat)
        where sock is None on failure/ssl-only success; err_cat is 'ssl_ok'
        for a passed handshake-only check, or an error category string.
        """
        ps = self.proxy_state
        srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
        # For judges, extract host from 'host/path' format
        if self.checktype == 'judges' and '/' in srvname:
            connect_host = srvname.split('/')[0]
        else:
            connect_host = srvname
        # SSL checktype: always use SSL with certificate verification
        if self.checktype == 'ssl':
            use_ssl = 1
            ssl_only_check = True  # handshake only, no HTTP request
            server_port = 443
            verifycert = True
        else:
            # use_ssl config value 2 means "randomize per test"
            use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
            ssl_only_check = False
            # minimal SSL test (handshake only, no request)
            if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
                use_ssl = 1
                ssl_only_check = True  # periodic MITM check - handshake is sufficient
            if self.checktype == 'irc':
                server_port = 6697 if use_ssl else 6667
            else:
                server_port = 443 if use_ssl else 80
            verifycert = True if use_ssl else False
        # Unknown protocol: try all three in order until one works.
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
        last_error_category = None
        # Get Tor host from pool (with worker affinity)
        pool = connection_pool.get_pool()
        for proto in protos:
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            if proto == 'socks4':
                # SOCKS4 has no hostname support - resolve to an IPv4 first
                srv = socks4_resolve(connect_host, server_port)
            else:
                srv = connect_host
            if not srv:
                continue
            duration = time.time()
            # Chain: us -> tor (socks5) -> proxy under test -> target
            proxies = [
                rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
                rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
            ]
            try:
                sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, proxies=proxies, timeout=config.watchd.timeout, verifycert=verifycert)
                sock.connect()
                # SSL-only check: handshake passed, no request needed
                if ssl_only_check:
                    elapsed = time.time() - duration
                    if pool:
                        pool.record_success(torhost, elapsed)
                    sock.disconnect()
                    return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_ok'
                if self.checktype == 'irc':
                    sock.send('NICK\n')
                elif self.checktype == 'judges':
                    # GET request to receive body with IP address
                    sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
                        srvname.split('/', 1)[1] if '/' in srvname else '',
                        srvname.split('/')[0]
                    ))
                else:
                    # http - HEAD is sufficient for header checks
                    sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
                # Record success in pool
                if pool:
                    pool.record_success(torhost, time.time() - duration)
                return sock, proto, duration, torhost, srvname, 0, use_ssl, None
            except rocksock.RocksockException as e:
                last_error_category = categorize_error(e)
                if config.watchd.debug:
                    _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port, e.get_errormessage(), last_error_category), 'debug')
                et = e.get_errortype()
                err = e.get_error()
                fp = e.get_failedproxy()
                # NOTE(review): if the Rocksock() constructor itself raised,
                # `sock` is unbound (or a stale object from a prior loop
                # iteration) here - confirm the constructor cannot raise
                # RocksockException.
                sock.disconnect()
                if et == rocksock.RS_ET_OWN:
                    # failedproxy index 1 is the proxy under test
                    if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or err == rocksock.RS_E_HIT_TIMEOUT):
                        break
                    elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        # Tor connection failed - record in pool
                        if pool:
                            pool.record_failure(torhost)
                        # Only a fraction of workers log + back off, to avoid
                        # a thundering herd of identical messages.
                        if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
                            _log("could not connect to tor, sleep 5s", "ERROR")
                            time.sleep(5)
                elif et == rocksock.RS_ET_GAI:
                    _log("could not resolve connection target %s" % connect_host, "ERROR")
                    break
                elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # Handshake failed cert verification - flag possible MITM.
                    ps.mitm = 1
            except KeyboardInterrupt as e:
                raise e
        return None, None, None, None, None, 1, use_ssl, last_error_category


class WorkerThread():
    # Pulls TargetTestJob objects from the shared job queue and pushes the
    # finished jobs onto the shared result queue.

    def __init__(self, id, job_queue, result_queue):
        self.id = id
        self.done = threading.Event()
        self.thread = None
        self.job_queue = job_queue  # shared input queue
        self.result_queue = result_queue  # shared output queue

    def stop(self):
        # Signal the workloop to exit after its current job.
        self.done.set()

    def term(self):
        # Block until the worker thread has exited.
        if self.thread:
            self.thread.join()

    def start_thread(self):
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()

    def workloop(self):
        # Main loop: fetch job, run it, hand the job object back for collection.
        job_count = 0
        duration_total = 0
        while not self.done.is_set():
            try:
                job = self.job_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            nao = time.time()
            # Assign worker ID for connection pool affinity
            job.worker_id = self.id
            job.run()
            spent = time.time() - nao
            job_count += 1
            duration_total += spent
            self.result_queue.put(job)
            self.job_queue.task_done()
        if self.thread and job_count > 0:
            avg_t = try_div(duration_total, job_count)
            _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id)


class Proxywatchd():
    # Daemon driver: owns the worker pool, the job/result queues and the DB
    # read/write cycle.

    def stop(self):
        # Request shutdown; actual teardown happens in _cleanup().
        _log('halting... (%d thread(s))' % len(self.threads), 'watchd')
        self.stopping.set()

    def _cleanup(self):
        # Stop and join all workers, flush pending results, stop the httpd.
        for wt in self.threads:
            wt.stop()
        for wt in self.threads:
            wt.term()
        self.collect_work()
        self.submit_collected()
        if self.httpd_server:
            self.httpd_server.stop()
        self.stopped.set()

    def finish(self):
        # In foreground mode we clean up ourselves; in background mode we
        # wait for the background thread to set `stopped`.
        if not self.in_background:
            self._cleanup()
        while not self.stopped.is_set():
            time.sleep(0.1)
        success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
        _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd")

    def _prep_db(self):
        # Open a fresh DB handle; closed again after each batch of work.
        self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)

    def _close_db(self):
        if self.mysqlite:
            self.mysqlite.close()
            self.mysqlite = None

    def __init__(self):
        config.load()
        self.in_background = False
        self.threads = []
        self.stopping = threading.Event()
        self.stopped = threading.Event()
        # shared work-stealing queues
        self.job_queue = PriorityJobQueue()
        self.result_queue = Queue.Queue()
        # track pending proxy states (for multi-target aggregation)
        self.pending_states = []  # list of ProxyTestState awaiting completion
        self.pending_lock = threading.Lock()
        # create table if needed (use dbs.py for canonical schema)
        self._prep_db()
        dbs.create_table_if_not_exists(self.mysqlite, 'proxylist')
        self._close_db()
        self.submit_after = config.watchd.submit_after  # number of collected jobs before writing db
        self.collected = []  # completed ProxyTestState objects ready for DB
        self.totals = {
            'submitted':0,
            'success':0,
        }
        self.stats = Stats()
        self.last_cleanup = time.time()
        self.httpd_server = None
        # Dynamic thread scaling
        self.scaler = ThreadScaler(
            min_threads=max(2, config.watchd.threads // 4),
            max_threads=config.watchd.threads * 2,
            target_queue_per_thread=10
        )
        self.thread_id_counter = 0
        self.last_jobs_log = 0
        self.last_update_log = 0
        self.log_interval = 60  # seconds between routine log messages
        # Register SIGHUP handler for config reload
        signal.signal(signal.SIGHUP, self._handle_sighup)

    def _handle_sighup(self, signum, frame):
        """Handle SIGHUP signal for config reload."""
        _log('received SIGHUP, reloading config...', 'watchd')
        self.reload_config()

    def reload_config(self):
        """Reload configuration without restart.
Hot-reloadable settings: threads, timeout, checktime, perfail_checktime, submit_after, stats_interval, debug, tor_safeguard, stale_days, max_fail, outage_threshold Requires restart: database, source_file, checktype """ old_threads = config.watchd.threads old_checktype = config.watchd.checktype try: config.load() errors = config.validate() if errors: for e in errors: _log('config error: %s' % e, 'error') _log('config reload failed, keeping old config', 'error') return False except Exception as e: _log('config reload failed: %s' % str(e), 'error') return False # Update runtime values self.submit_after = config.watchd.submit_after # Update scaler limits self.scaler.min_threads = max(2, config.watchd.threads // 4) self.scaler.max_threads = config.watchd.threads * 2 # Warn about values requiring restart if config.watchd.checktype != old_checktype: _log('checktype changed (%s -> %s), requires restart to take effect' % ( old_checktype, config.watchd.checktype), 'warn') _log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % ( config.watchd.threads, config.watchd.timeout, config.watchd.checktime, config.watchd.max_fail), 'watchd') return True def _spawn_thread(self): """Spawn a new worker thread.""" self.thread_id_counter += 1 threadid = 'dyn%d' % self.thread_id_counter wt = WorkerThread(threadid, self.job_queue, self.result_queue) wt.start_thread() self.threads.append(wt) return threadid def _remove_thread(self): """Remove one worker thread (graceful shutdown).""" if len(self.threads) <= self.scaler.min_threads: return None # Stop the last thread wt = self.threads.pop() wt.stop() # Don't wait for it - let it finish its current job return wt.id def _adjust_threads(self, target): """Adjust thread count to target.""" current = len(self.threads) if target > current: added = [] for _ in range(target - current): tid = self._spawn_thread() added.append(tid) if added: _log('scaled up: +%d threads (%s)' % (len(added), ','.join(added)), 'scaler') elif target < 
current: removed = [] for _ in range(current - target): tid = self._remove_thread() if tid: removed.append(tid) if removed: _log('scaled down: -%d threads' % len(removed), 'scaler') def fetch_rows(self): self.isoldies = False q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' now = time.time() params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) if config.watchd.debug: _log('fetch_rows: db=%s checktime=%d perfail=%d max_fail=%d now=%d' % ( config.watchd.database, config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail, int(now)), 'debug') rows = self.mysqlite.execute(q, params).fetchall() if config.watchd.debug: _log('fetch_rows: got %d rows' % len(rows), 'debug') # check oldies ? if len(rows) < config.watchd.threads: rows = [] if config.watchd.oldies: self.isoldies = True ## disable tor safeguard for old proxies if self.tor_safeguard: self.tor_safeguard = False rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall() return rows def prepare_jobs(self): self._prep_db() ## enable tor safeguard by default self.tor_safeguard = config.watchd.tor_safeguard rows = self.fetch_rows() checktype = config.watchd.checktype num_targets = 1 # select target pool based on checktype if checktype == 'irc': target_pool = config.servers elif checktype == 'judges': # Filter out judges in cooldown (blocked/rate-limited) all_judges = list(judges.keys()) target_pool = judge_stats.get_available_judges(all_judges) if not target_pool: # All judges in cooldown - use all anyway target_pool = all_judges if config.watchd.debug: _log('all judges in cooldown, using full list', 'debug') elif checktype == 'ssl': target_pool = ssl_targets 
        else:  # http
            target_pool = list(regexes.keys())
        # create all jobs first, then shuffle for interleaving
        all_jobs = []
        new_states = []
        for row in rows:
            # create shared state for this proxy
            state = ProxyTestState(
                row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8],
                asn=row[9], num_targets=num_targets, oldies=self.isoldies
            )
            new_states.append(state)
            # select random targets for this proxy
            targets = random.sample(target_pool, min(num_targets, len(target_pool)))
            # create one job per target
            for target in targets:
                job = TargetTestJob(state, target, checktype)
                all_jobs.append(job)
        # shuffle to interleave tests across different proxies
        random.shuffle(all_jobs)
        # track pending states
        with self.pending_lock:
            self.pending_states.extend(new_states)
        # queue all jobs with priority
        for job in all_jobs:
            priority = calculate_priority(
                job.proxy_state.failcount, job.proxy_state.success_count,
                config.watchd.max_fail
            )
            self.job_queue.put(job, priority)
        self._close_db()
        proxy_count = len(new_states)
        job_count = len(all_jobs)
        now = time.time()
        # throttled progress log (at most once per log_interval)
        if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
            _log("created %d jobs for %d proxies (%d targets each)" % (
                job_count, proxy_count, num_targets), 'watchd')
            self.last_jobs_log = now
        return job_count

    def collect_work(self):
        """Drain finished jobs and move completed proxy states to self.collected.

        Individual TargetTestJob results were already recorded into their
        shared ProxyTestState by the workers, so the result queue is simply
        emptied; completed states are evaluated, fed into self.stats and
        appended to self.collected for later DB submission.
        """
        # drain results from shared result queue (TargetTestJob objects)
        # results are already recorded in their ProxyTestState
        while True:
            try:
                self.result_queue.get_nowait()
            except Queue.Empty:
                break
        # check for completed proxy states and evaluate them
        with self.pending_lock:
            still_pending = []
            for state in self.pending_states:
                if state.is_complete():
                    success, category = state.evaluate()
                    self.stats.record(success, category)
                    self.collected.append(state)
                else:
                    still_pending.append(state)
            self.pending_states = still_pending

    def collect_unfinished(self):
        """Discard any jobs still waiting in the job queue (e.g. on shutdown)."""
        # drain any remaining jobs from job queue
        unfinished_count = 0
        while True:
            try:
                self.job_queue.get_nowait()
                unfinished_count += 1
            except Queue.Empty:
                break
        if unfinished_count > 0:
            _log("discarded %d unfinished jobs" % unfinished_count, "watchd")
        # note: corresponding ProxyTestStates will be incomplete
        # they'll be re-tested in the next cycle

    def submit_collected(self):
        """Write collected test results back to the proxylist table.

        Returns True normally; returns False when the tor safeguard trips
        (a batch of >= 100 results whose success rate is at or below
        config.watchd.outage_threshold), in which case only the successful
        rows are written and failures are withheld — a blocked tor circuit
        would otherwise mark every proxy as failed.
        NOTE(review): the method continues past this chunk (DB writes).
        """
        if len(self.collected) == 0:
            return True
        sc = 0
        args = []
        latency_updates = []  # (proxy, latency_ms) for successful tests
        for job in self.collected:
            if job.failcount == 0:
                sc += 1
                if job.last_latency_ms is not None:
                    latency_updates.append((job.proxy, job.last_latency_ms))
            args.append((job.failcount, job.checktime, 1, job.country, job.proto,
                         job.success_count, job.total_duration, job.mitm,
                         job.consecutive_success, job.asn, job.proxy))
        success_rate = (float(sc) / len(self.collected)) * 100
        ret = True
        if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard:
            _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
            if sc == 0:
                return False
            # rebuild the update set with the successful rows only
            args = []
            latency_updates = []
            for job in self.collected:
                if job.failcount == 0:
                    args.append((job.failcount, job.checktime, 1, job.country, job.proto,
                                 job.success_count, job.total_duration, job.mitm,
                                 job.consecutive_success, job.asn, job.proxy))
                    if job.last_latency_ms is not None:
                        latency_updates.append((job.proxy, job.last_latency_ms))
            ret = False
        now = time.time()
        # throttled log for all-fail batches; any success is always reported
        if (now - self.last_update_log) >= self.log_interval or success_rate > 0:
            _log("updating %d DB entries (success rate: %.2f%%)" % (
                len(self.collected), success_rate), 'watchd')
            self.last_update_log = now
        self._prep_db()
        query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
self.mysqlite.executemany(query, args) # Update latency metrics for successful proxies for proxy, latency_ms in latency_updates: dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms) # Update anonymity for proxies with exit IP data for job in self.collected: if job.failcount == 0 and job.exit_ip: dbs.update_proxy_anonymity(self.mysqlite, job.proxy, job.exit_ip, job.ip, job.reveals_headers) self.mysqlite.commit() self._close_db() self.collected = [] self.totals['submitted'] += len(args) self.totals['success'] += sc return ret def cleanup_stale(self): """Remove proxies that have been dead for too long.""" stale_seconds = config.watchd.stale_days * 86400 cutoff = int(time.time()) - stale_seconds self._prep_db() # delete proxies that: failed >= max_fail AND last tested before cutoff result = self.mysqlite.execute( 'DELETE FROM proxylist WHERE failed >= ? AND tested < ?', (config.watchd.max_fail, cutoff) ) count = result.rowcount if hasattr(result, 'rowcount') else 0 self.mysqlite.commit() self._close_db() if count > 0: _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd') self.last_cleanup = time.time() def start(self): if config.watchd.threads == 1 and _run_standalone: return self._run() else: return self._run_background() def run(self): if self.in_background: while 1: time.sleep(1) def _run_background(self): self.in_background = True t = threading.Thread(target=self._run) t.start() def _run(self): _log('starting...', 'watchd') _log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % ( config.watchd.database, config.watchd.checktype, config.watchd.threads, config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd') # Log database status at startup self._prep_db() total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] now = time.time() due = self.mysqlite.execute( 'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? 
and (tested + ? + (failed * ?)) < ?', (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) ).fetchone()[0] self._close_db() _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd') # Initialize Tor connection pool connection_pool.init_pool(config.torhosts, warmup=True) # Start HTTP API server if enabled if config.httpd.enabled: from httpd import ProxyAPIServer self.httpd_server = ProxyAPIServer( config.httpd.listenip, config.httpd.port, config.watchd.database ) self.httpd_server.start() # create worker threads with shared queues for i in range(config.watchd.threads): threadid = ''.join([random.choice(string.letters) for x in range(5)]) wt = WorkerThread(threadid, self.job_queue, self.result_queue) if self.in_background: wt.start_thread() self.threads.append(wt) time.sleep(random.random() / 10) sleeptime = 0 while True: if self.stopping.is_set(): if self.in_background: self._cleanup() break if sleeptime > 0: time.sleep(1) sleeptime -= 1 continue # check if job queue is empty (work-stealing: threads pull as needed) if self.job_queue.empty(): self.collect_work() if not self.submit_collected() and self.tor_safeguard: _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") sleeptime = 60 else: job_count = self.prepare_jobs() if job_count == 0: # no jobs available, wait before checking again sleeptime = 10 if not self.in_background: # single_thread scenario self.threads[0].workloop() self.collect_work() if len(self.collected) > self.submit_after: if not self.submit_collected() and self.tor_safeguard: _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") sleeptime = 60 # periodic stats report if self.stats.should_report(config.watchd.stats_interval): _log(self.stats.report(), 'stats') # Also report pool stats pool = connection_pool.get_pool() if pool: _log(pool.status_line(), 'stats') # Report judge stats (when using judges checktype) if config.watchd.checktype == 'judges': 
_log(judge_stats.status_line(), 'stats') # Report scaler status _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler') # Dynamic thread scaling if self.in_background: target = self.scaler.should_scale( len(self.threads), self.job_queue.qsize(), self.stats ) if target != len(self.threads): self._adjust_threads(target) # periodic stale proxy cleanup (daily) if (time.time() - self.last_cleanup) >= 86400: self.cleanup_stale() time.sleep(1) if __name__ == '__main__': _run_standalone = True config.load() errors = config.validate() if errors: for e in errors: _log(e, 'error') import sys sys.exit(1) w = Proxywatchd() try: w.start() w.run() except KeyboardInterrupt as e: pass finally: w.stop() w.finish()