def get_stats(self):
    """Summarise judge health for the API/dashboard.

    Returns:
        dict with keys:
          'total'       - number of judges tracked in self.stats
          'in_cooldown' - judges whose block count reached block_threshold
                          and whose last block is younger than
                          cooldown_seconds
          'available'   - total minus in_cooldown
          'top'         - one entry per judge with at least one recorded
                          test ({'judge', 'success', 'tests', 'rate'}),
                          ordered by success count, highest first
    """
    with self.lock:
        current_time = time.time()
        judge_count = len(self.stats)

        cooling = 0
        ranking = []
        for name, entry in self.stats.items():
            blocked_enough = entry['block'] >= self.block_threshold
            if blocked_enough and (current_time - entry['last_block']) < self.cooldown_seconds:
                cooling += 1

            attempts = entry['success'] + entry['fail']
            if attempts <= 0:
                # Judges that were never exercised are left out of the ranking.
                continue
            ranking.append({
                'judge': name,
                'success': entry['success'],
                'tests': attempts,
                'rate': round((entry['success'] * 100.0) / attempts, 1),
            })

        ranking = sorted(ranking, key=lambda item: item['success'], reverse=True)
        return {
            'total': judge_count,
            'available': judge_count - cooling,
            'in_cooldown': cooling,
            'top': ranking,
        }
class Stats():
    """Track and report comprehensive runtime statistics.

    Every counter is guarded by ``self.lock``. The lock is re-entrant because
    get_full_stats() calls the other getters while it already holds it.
    """

    # NOTE(review): this block was reconstructed from a whitespace-collapsed
    # diff. report() lives in the part of the class the diff does not show and
    # is therefore not reproduced here - keep the existing implementation.

    HISTORY_SIZE = 120  # 10 min at 5s intervals
    LATENCY_BUCKETS = [100, 250, 500, 1000, 2000, 5000, 10000]  # ms thresholds
    MAX_LATENCY_SAMPLES = 1000  # sliding window used for percentiles
    PROTOCOLS = ('http', 'socks4', 'socks5')

    def __init__(self):
        self.lock = threading.RLock()  # RLock: getters nest inside get_full_stats()
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()
        self.last_report = time.time()

        # Failure category tracking
        self.fail_categories = {}

        # Protocol tracking (tested and passed separately)
        self.proto_tested = dict.fromkeys(self.PROTOCOLS, 0)
        self.proto_passed = dict.fromkeys(self.PROTOCOLS, 0)
        # Legacy alias for compatibility. It is the same dict object, so
        # proto_passed must only ever be mutated in place, never rebound.
        self.by_proto = self.proto_passed

        # Time series history (5s intervals)
        self.rate_history = []
        self.success_rate_history = []
        self.latency_history = []
        self.last_history_time = time.time()
        self.last_history_tested = 0
        self.last_history_passed = 0

        # Peak values
        self.peak_rate = 0.0
        self.peak_success_rate = 0.0
        self.min_latency = float('inf')
        self.max_latency = 0.0

        # Latency tracking with percentiles
        self.latency_sum = 0.0
        self.latency_count = 0
        self.latency_samples = []  # Recent samples for percentiles
        self.latency_buckets = {b: 0 for b in self.LATENCY_BUCKETS + [float('inf')]}

        # Recent window (last 60s)
        self.recent_tested = 0
        self.recent_passed = 0
        self.recent_start = time.time()

        # Country/ASN tracking (top N)
        self.country_passed = {}
        self.asn_passed = {}

        # Hourly aggregates
        self.hourly_tested = 0
        self.hourly_passed = 0
        self.hourly_start = time.time()
        self.hours_data = []  # Last 24 hours

        # SSL/TLS tracking
        self.ssl_tested = 0
        self.ssl_passed = 0
        self.ssl_failed = 0
        self.mitm_detected = 0
        self.cert_errors = 0

    @staticmethod
    def _pct(part, whole):
        """Return part/whole as a percentage; ``whole`` must be non-zero.

        Forces float division: under Python 2 ``int / int`` truncates, which
        would turn every percentage into 0 or 100.
        """
        return float(part) / whole * 100

    def _push(self, series, value):
        """Append value to a history list, keeping at most HISTORY_SIZE items."""
        series.append(value)
        if len(series) > self.HISTORY_SIZE:
            series.pop(0)

    def record(self, success, category=None, proto=None, latency_ms=None, country=None, asn=None,
               ssl_test=False, mitm=False, cert_error=False):
        """Record the outcome of one proxy test.

        Args:
            success: truthy if the proxy passed.
            category: failure category; counted only for failed tests.
            proto: protocol name; counted only if it is a known protocol.
            latency_ms: latency in ms; used only for passed tests and only
                when greater than zero.
            country, asn: counted per passed test when truthy.
            ssl_test: truthy if the test involved SSL/TLS.
            mitm: truthy if a MITM was detected.
            cert_error: truthy if a certificate/SSL error occurred.
        """
        with self.lock:
            self.tested += 1
            self.recent_tested += 1
            self.hourly_tested += 1

            # Track protocol tests
            if proto and proto in self.proto_tested:
                self.proto_tested[proto] += 1

            if success:
                self.passed += 1
                self.recent_passed += 1
                self.hourly_passed += 1

                if proto and proto in self.proto_passed:
                    self.proto_passed[proto] += 1

                if latency_ms and latency_ms > 0:
                    self.latency_sum += latency_ms
                    self.latency_count += 1
                    # Track min/max
                    if latency_ms < self.min_latency:
                        self.min_latency = latency_ms
                    if latency_ms > self.max_latency:
                        self.max_latency = latency_ms
                    # Keep recent samples for percentiles
                    self.latency_samples.append(latency_ms)
                    if len(self.latency_samples) > self.MAX_LATENCY_SAMPLES:
                        self.latency_samples.pop(0)
                    # Histogram: first bucket whose upper bound fits, else overflow
                    for bucket in self.LATENCY_BUCKETS:
                        if latency_ms <= bucket:
                            self.latency_buckets[bucket] += 1
                            break
                    else:
                        self.latency_buckets[float('inf')] += 1

                # Track country/ASN
                if country:
                    self.country_passed[country] = self.country_passed.get(country, 0) + 1
                if asn:
                    self.asn_passed[asn] = self.asn_passed.get(asn, 0) + 1
            else:
                self.failed += 1
                if category:
                    self.fail_categories[category] = self.fail_categories.get(category, 0) + 1

            # SSL/TLS tracking
            if ssl_test:
                self.ssl_tested += 1
                if success:
                    self.ssl_passed += 1
                else:
                    self.ssl_failed += 1
            # NOTE(review): mitm/cert_error are counted independently of
            # ssl_test here; nesting was reconstructed - confirm.
            if mitm:
                self.mitm_detected += 1
            if cert_error:
                self.cert_errors += 1

    def update_history(self):
        """Update time series history (call periodically).

        Appends a sample to the rate / success-rate / latency histories at
        most once every 5 seconds, resets the "recent" window every 60
        seconds and rolls the hourly aggregate every 3600 seconds.
        """
        now = time.time()
        with self.lock:
            elapsed = now - self.last_history_time
            if elapsed >= 5:  # Update every 5 seconds
                tests_delta = self.tested - self.last_history_tested
                passed_delta = self.passed - self.last_history_passed

                # Rate (elapsed is a float >= 5, so this is float division)
                rate = tests_delta / elapsed
                self._push(self.rate_history, round(rate, 2))
                if rate > self.peak_rate:
                    self.peak_rate = rate

                # Success rate within this interval
                sr = self._pct(passed_delta, tests_delta) if tests_delta > 0 else 0
                self._push(self.success_rate_history, round(sr, 1))
                if sr > self.peak_success_rate:
                    self.peak_success_rate = sr

                # Cumulative average latency at this point in time (this is
                # the running average since start, not a per-interval value)
                self._push(self.latency_history, round(self.get_avg_latency(), 0))

                self.last_history_time = now
                self.last_history_tested = self.tested
                self.last_history_passed = self.passed

            # Reset recent window every 60s
            if now - self.recent_start >= 60:
                self.recent_tested = 0
                self.recent_passed = 0
                self.recent_start = now

            # Hourly aggregation
            if now - self.hourly_start >= 3600:
                have_tests = self.hourly_tested > 0
                self.hours_data.append({
                    'tested': self.hourly_tested,
                    'passed': self.hourly_passed,
                    'rate': self.hourly_passed / 3600.0 if have_tests else 0,
                    'success_rate': self._pct(self.hourly_passed, self.hourly_tested) if have_tests else 0,
                })
                if len(self.hours_data) > 24:
                    self.hours_data.pop(0)
                self.hourly_tested = 0
                self.hourly_passed = 0
                self.hourly_start = now

    def get_recent_rate(self):
        """Get rate (tests per second) for the current recent window."""
        with self.lock:
            elapsed = time.time() - self.recent_start
            if elapsed > 0:
                return self.recent_tested / elapsed
            return 0.0

    def get_recent_success_rate(self):
        """Get success rate (percent) for the current recent window."""
        with self.lock:
            if self.recent_tested > 0:
                return self._pct(self.recent_passed, self.recent_tested)
            return 0.0

    def get_avg_latency(self):
        """Get average latency in ms over all recorded samples (0.0 if none)."""
        with self.lock:
            if self.latency_count > 0:
                return self.latency_sum / self.latency_count
            return 0.0

    def get_latency_percentiles(self):
        """Get latency percentiles (p50, p90, p99) over the recent samples."""
        with self.lock:
            if not self.latency_samples:
                return {'p50': 0, 'p90': 0, 'p99': 0}
            ordered = sorted(self.latency_samples)
            last = len(ordered) - 1

            def pick(fraction):
                # Clamp so the index can never run past the final sample.
                return ordered[min(int(len(ordered) * fraction), last)]

            return {'p50': pick(0.50), 'p90': pick(0.90), 'p99': pick(0.99)}

    def get_latency_histogram(self):
        """Get latency distribution histogram.

        Returns:
            [] when no latency was recorded, otherwise a list of
            {'range', 'count', 'pct'} dicts, one per LATENCY_BUCKETS entry,
            plus a trailing '>max' entry when the overflow bucket is non-empty.
        """
        with self.lock:
            total = sum(self.latency_buckets.values())
            if total == 0:
                return []
            result = []
            prev = 0
            for bucket in self.LATENCY_BUCKETS:
                count = self.latency_buckets[bucket]
                result.append({
                    'range': '%d-%d' % (prev, bucket),
                    'count': count,
                    'pct': round(self._pct(count, total), 1),
                })
                prev = bucket
            # Over max bucket
            over = self.latency_buckets[float('inf')]
            if over > 0:
                result.append({
                    'range': '>%d' % self.LATENCY_BUCKETS[-1],
                    'count': over,
                    'pct': round(self._pct(over, total), 1),
                })
            return result

    def get_proto_stats(self):
        """Get protocol-specific tested/passed counts and success rates."""
        with self.lock:
            result = {}
            for proto in self.PROTOCOLS:
                tested = self.proto_tested[proto]
                passed = self.proto_passed[proto]
                result[proto] = {
                    'tested': tested,
                    'passed': passed,
                    'success_rate': round(self._pct(passed, tested), 1) if tested > 0 else 0,
                }
            return result

    def get_top_countries(self, limit=10):
        """Get top countries by working proxy count as (country, count) pairs."""
        with self.lock:
            sorted_countries = sorted(self.country_passed.items(), key=lambda x: -x[1])
            return sorted_countries[:limit]

    def get_top_asns(self, limit=10):
        """Get top ASNs by working proxy count as (asn, count) pairs."""
        with self.lock:
            sorted_asns = sorted(self.asn_passed.items(), key=lambda x: -x[1])
            return sorted_asns[:limit]

    def get_hourly_data(self):
        """Get a copy of the stored hourly aggregates (at most 24 entries)."""
        with self.lock:
            return list(self.hours_data)

    def load_state(self, state):
        """Load persisted state from a dict (from database).

        Args:
            state: dict from dbs.load_session_state(); a falsy value is a no-op.
        """
        if not state:
            return
        with self.lock:
            self.tested = state.get('tested', 0)
            self.passed = state.get('passed', 0)
            self.failed = state.get('failed', 0)
            self.ssl_tested = state.get('ssl_tested', 0)
            self.ssl_passed = state.get('ssl_passed', 0)
            self.ssl_failed = state.get('ssl_failed', 0)
            self.mitm_detected = state.get('mitm_detected', 0)
            self.cert_errors = state.get('cert_errors', 0)
            # Mutate in place so the by_proto alias keeps pointing at live data.
            for proto in self.PROTOCOLS:
                self.proto_tested[proto] = state.get('proto_%s_tested' % proto, 0)
                self.proto_passed[proto] = state.get('proto_%s_passed' % proto, 0)
            self.peak_rate = state.get('peak_rate', 0.0)
            # Re-base the history deltas on the restored totals. Without this
            # the next update_history() would treat the whole restored count
            # as work done in the last few seconds and record a bogus spike
            # (which would also overwrite peak_rate).
            self.last_history_tested = self.tested
            self.last_history_passed = self.passed
            # Restore start_time to preserve uptime calculation
            if state.get('start_time'):
                self.start_time = state['start_time']
            # Restore failure categories
            if state.get('fail_categories'):
                self.fail_categories = dict(state['fail_categories'])
            # Restore geo tracking
            if state.get('country_passed'):
                self.country_passed = dict(state['country_passed'])
            if state.get('asn_passed'):
                # Convert digit-only string keys back to int for ASN; keys
                # that are not strings (e.g. already ints) pass through.
                self.asn_passed = {
                    (int(k) if hasattr(k, 'isdigit') and k.isdigit() else k): v
                    for k, v in state['asn_passed'].items()
                }
            _log('restored session: %d tested, %d passed' % (self.tested, self.passed), 'info')

    def should_report(self, interval):
        """Return True once at least ``interval`` seconds passed since last_report."""
        return (time.time() - self.last_report) >= interval

    def get_full_stats(self):
        """Get comprehensive stats dict for API."""
        with self.lock:
            elapsed = time.time() - self.start_time
            return {
                'tested': self.tested,
                'passed': self.passed,
                'failed': self.failed,
                'success_rate': round(self._pct(self.passed, self.tested), 1) if self.tested > 0 else 0,
                'rate': round(self.tested / elapsed, 2) if elapsed > 0 else 0,
                'pass_rate': round(self.passed / elapsed, 2) if elapsed > 0 else 0,
                'recent_rate': self.get_recent_rate(),
                'recent_success_rate': self.get_recent_success_rate(),
                'peak_rate': self.peak_rate,
                'peak_success_rate': self.peak_success_rate,
                'uptime_seconds': int(elapsed),
                'rate_history': list(self.rate_history),
                'success_rate_history': list(self.success_rate_history),
                'latency_history': list(self.latency_history),
                'avg_latency': self.get_avg_latency(),
                # inf is the "no sample yet" sentinel; report it as 0
                'min_latency': self.min_latency if self.min_latency != float('inf') else 0,
                'max_latency': self.max_latency,
                'latency_percentiles': self.get_latency_percentiles(),
                'latency_histogram': self.get_latency_histogram(),
                'by_proto': dict(self.proto_passed),
                'proto_stats': self.get_proto_stats(),
                'failures': dict(self.fail_categories),
                'top_countries': self.get_top_countries(),
                'top_asns': self.get_top_asns(),
                'hourly_data': self.get_hourly_data(),
            }
def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5):
    """Store the scaling bounds and initialise the scaler's tuning values.

    Args:
        min_threads: value stored as self.min_threads.
        max_threads: value stored as self.max_threads.
        target_queue_per_thread: value stored as
            self.target_queue_per_thread.
    """
    # Fixed tuning values (not exposed through the constructor).
    self.success_threshold = 10.0  # minimum success rate % to scale up
    self.scale_cooldown = 10  # seconds between scaling (faster with greenlets)
    self.last_scale_time = 0

    # Caller-supplied limits.
    self.target_queue_per_thread = target_queue_per_thread
    self.max_threads = max_threads
    self.min_threads = min_threads
def rwip(self, ip):
    """Remove leading zeros from IP octets (e.g., 192.168.001.010 -> 192.168.1.10).

    An octet made only of zeros collapses to a single '0'. Empty octets
    (as in '' or '1..2') are passed through unchanged instead of raising
    IndexError.

    Args:
        ip: dotted string to normalise.

    Returns:
        The string with leading zeros stripped from every dot-separated part.
    """
    # lstrip() yields '' for an all-zero octet; octet[-1:] then restores the
    # final '0' (and stays '' for an empty octet, where indexing would fail).
    return '.'.join(octet.lstrip('0') or octet[-1:] for octet in ip.split('.'))
def get_runtime_stats(self):
    """Assemble the runtime statistics dict handed to the dashboard.

    Returns:
        dict combining counters and histories from self.stats, thread and
        queue information, Tor connection-pool stats, judge stats (only
        when the checktype is 'judges', otherwise None) and scraper/engine
        stats (zeros/None when the scraper module is unavailable or returns
        nothing).
    """
    # NOTE(review): lock scopes were reconstructed from a whitespace-collapsed
    # diff; self.stats.lock is re-entrant, so results do not depend on them.
    st = self.stats
    data = {}

    # Plain counters and histories, read under the stats lock.
    with st.lock:
        data['tested'] = st.tested
        data['passed'] = st.passed
        data['failed'] = st.failed
        elapsed = time.time() - st.start_time
        data['uptime_seconds'] = int(elapsed)
        data['rate'] = try_div(st.tested, elapsed)
        data['success_rate'] = try_div(st.passed * 100.0, st.tested)
        data['failures'] = dict(st.fail_categories)
        data['by_proto'] = dict(st.by_proto)
        data['rate_history'] = list(st.rate_history)
        data['success_rate_history'] = list(st.success_rate_history)
        data['peak_rate'] = st.peak_rate

    # Derived values; each getter takes the lock itself.
    data['recent_rate'] = st.get_recent_rate()
    data['recent_success_rate'] = st.get_recent_success_rate()
    data['avg_latency'] = st.get_avg_latency()

    lowest = st.min_latency
    if lowest == float('inf'):
        # inf means no latency sample was recorded yet
        lowest = 0
    data['min_latency'] = lowest
    data['max_latency'] = st.max_latency
    data['latency_percentiles'] = st.get_latency_percentiles()
    data['latency_histogram'] = st.get_latency_histogram()

    data['proto_stats'] = st.get_proto_stats()

    data['top_countries_session'] = st.get_top_countries(10)
    data['top_asns_session'] = st.get_top_asns(10)

    # SSL/TLS counters
    with st.lock:
        ssl_tested = st.ssl_tested
        ssl_passed = st.ssl_passed
        if ssl_tested > 0:
            ssl_rate = try_div(ssl_passed * 100.0, ssl_tested)
        else:
            ssl_rate = 0
        data['ssl'] = {
            'tested': ssl_tested,
            'passed': ssl_passed,
            'failed': st.ssl_failed,
            'success_rate': ssl_rate,
            'mitm_detected': st.mitm_detected,
            'cert_errors': st.cert_errors
        }

    # Worker / queue information
    data['threads'] = len(self.threads)
    data['min_threads'] = self.scaler.min_threads
    data['max_threads'] = self.scaler.max_threads
    data['queue_size'] = self.job_queue.qsize()
    data['checktype'] = config.watchd.checktype
    data['pass_rate'] = try_div(st.passed, elapsed)

    # Tor pool stats
    pool = connection_pool.get_pool()
    if not pool:
        data['tor_pool'] = {'hosts': [], 'total_requests': 0, 'success_rate': 0}
    else:
        raw = pool.get_stats()
        hosts = []
        for host in raw.get('hosts', []):
            if host.get('avg_latency'):
                latency_ms = host.get('avg_latency', 0) * 1000
            else:
                latency_ms = None
            hosts.append({
                'address': host.get('host', ''),
                'healthy': host.get('available', False),
                'latency_ms': latency_ms,
                'success_rate': host.get('success_rate', 0),
            })
        data['tor_pool'] = {
            'hosts': hosts,
            'total_requests': raw.get('total_requests', 0),
            'success_rate': raw.get('success_rate', 0)
        }

    # Judge stats (when using judges checktype)
    if config.watchd.checktype != 'judges':
        data['judges'] = None
    else:
        js = judge_stats.get_stats()
        data['judges'] = {
            'total': js.get('total', 0),
            'available': js.get('available', 0),
            'in_cooldown': js.get('in_cooldown', 0),
            'top_judges': js.get('top', [])[:5]  # first five entries only
        }

    # Scraper/engine stats; a missing module and an empty result both fall
    # back to the same zeroed values.
    scraper_stats = scraper_module.get_scraper_stats() if scraper_available else None
    if scraper_stats:
        data['engines_available'] = scraper_stats.get('available', 0)
        data['engines_total'] = scraper_stats.get('total', 0)
        data['engines_backoff'] = scraper_stats.get('in_backoff', 0)
        data['scraper'] = scraper_stats
    else:
        data['engines_available'] = 0
        data['engines_total'] = 0
        data['engines_backoff'] = 0
        data['scraper'] = None

    return data
stats_interval, default 5m) + try: + self._prep_db() + dbs.save_session_state(self.mysqlite, self.stats) + self._close_db() + except Exception as e: + _log('failed to save session state: %s' % str(e), 'warn') + + # Hourly stats snapshot + now = time.time() + if not hasattr(self, '_last_snapshot'): + self._last_snapshot = now + if (now - self._last_snapshot) >= 3600: + try: + self._prep_db() + dbs.save_stats_snapshot(self.mysqlite, self.stats) + self._close_db() + self._last_snapshot = now + except Exception as e: + _log('failed to save stats snapshot: %s' % str(e), 'warn') + # Dynamic thread scaling if self.in_background: target = self.scaler.should_scale(