proxywatchd: gevent concurrency and comprehensive stats

Major features:
- gevent monkey-patching for cooperative concurrency
- Stats class with latency percentiles, histograms, geo tracking
- session state persistence across restarts
- JudgeStats with cooldown for blocked/rate-limited judges
- ThreadScaler for dynamic greenlet pool sizing
- SIGHUP handler for config hot-reload
- SSL/TLS tracking with MITM detection
- anonymity detection (transparent/anonymous/elite)

Bug fixes:
- rwip(): fix string comparison (b[0] == '0' not == 0)
- rwip(): fix strip logic (b[1:] not b[:1])
- use string.ascii_letters for Py2/3 compatibility
This commit is contained in:
Username
2025-12-23 17:23:45 +01:00
parent e7478de79e
commit c0dbba7b45

View File

@@ -1,6 +1,17 @@
#!/usr/bin/env python2
import threading
# Gevent monkey-patching MUST happen before any other imports
# This patches threading, socket, time, etc. for cooperative concurrency
from gevent import monkey
monkey.patch_all()
import gevent
from gevent.pool import Pool as GreenPool
from gevent.queue import Queue as GreenQueue
from gevent.event import Event as GreenEvent
from gevent.lock import Semaphore as GreenLock
import threading # Now patched by gevent
import time
import random
import string
@@ -36,6 +47,14 @@ from misc import _log, categorize_error
import rocksock
import connection_pool
# Optional scraper integration for engine stats
try:
    # Optional dependency: engine stats come from the scraper when present.
    import scraper as scraper_module
    scraper_available = True
except ImportError:
    # Scraper is optional; get_runtime_stats() degrades engine stats to
    # zeros / None when it is absent.
    scraper_module = None
    scraper_available = False

config = Config()
# NOTE(review): appears to flag standalone (script) execution mode — confirm
# where it is toggled before relying on it.
_run_standalone = False
@@ -156,6 +175,26 @@ class JudgeStats():
(time.time() - s['last_block']) < self.cooldown_seconds)
return 'judges: %d total, %d in cooldown' % (total, blocked)
def get_stats(self):
    """Build a statistics summary for the API/dashboard.

    Returns:
        dict with 'total', 'available' and 'in_cooldown' judge counts,
        plus 'top': per-judge records sorted by success count, descending.
    """
    with self.lock:
        now = time.time()
        total = len(self.stats)
        # A judge is cooling down once it crossed the block threshold
        # and the last block was seen within the cooldown window.
        in_cooldown = 0
        for entry in self.stats.values():
            if (entry['block'] >= self.block_threshold
                    and (now - entry['last_block']) < self.cooldown_seconds):
                in_cooldown += 1
        # Rank judges that have at least one recorded test.
        top = [
            {'judge': judge,
             'success': entry['success'],
             'tests': entry['success'] + entry['fail'],
             'rate': round(entry['success'] * 100.0 / (entry['success'] + entry['fail']), 1)}
            for judge, entry in self.stats.items()
            if entry['success'] + entry['fail'] > 0
        ]
        top.sort(key=lambda rec: rec['success'], reverse=True)
        return {'total': total,
                'available': total - in_cooldown,
                'in_cooldown': in_cooldown,
                'top': top}
# Global judge stats instance
judge_stats = JudgeStats()
@@ -224,28 +263,316 @@ ssl_targets = [
class Stats():
"""Track and report runtime statistics."""
"""Track and report comprehensive runtime statistics."""
HISTORY_SIZE = 120 # 10 min at 5s intervals
LATENCY_BUCKETS = [100, 250, 500, 1000, 2000, 5000, 10000] # ms thresholds
def __init__(self):
    """Initialize all counters, histories and tracking structures.

    Uses an RLock (not a plain Lock) because get_runtime_stats() holds
    the lock while calling getter helpers that re-acquire it.
    """
    self.lock = threading.RLock()
    # Core counters
    self.tested = 0
    self.passed = 0
    self.failed = 0
    self.start_time = time.time()
    self.last_report = time.time()
    # Failure category tracking
    self.fail_categories = {}
    # Protocol tracking (tested and passed separately)
    self.proto_tested = {'http': 0, 'socks4': 0, 'socks5': 0}
    self.proto_passed = {'http': 0, 'socks4': 0, 'socks5': 0}
    # Legacy alias for compatibility (same dict object, not a copy)
    self.by_proto = self.proto_passed
    # Time series history (5s intervals)
    self.rate_history = []
    self.success_rate_history = []
    self.latency_history = []
    self.last_history_time = time.time()
    self.last_history_tested = 0
    self.last_history_passed = 0
    # Peak values
    self.peak_rate = 0.0
    self.peak_success_rate = 0.0
    self.min_latency = float('inf')
    self.max_latency = 0.0
    # Latency tracking with percentiles
    self.latency_sum = 0.0
    self.latency_count = 0
    self.latency_samples = []  # recent samples for percentiles (bounded at 1000)
    self.latency_buckets = {b: 0 for b in self.LATENCY_BUCKETS + [float('inf')]}
    # Recent window (last 60s)
    self.recent_tested = 0
    self.recent_passed = 0
    self.recent_start = time.time()
    # Country/ASN tracking (top N)
    self.country_passed = {}
    self.asn_passed = {}
    # Hourly aggregates
    self.hourly_tested = 0
    self.hourly_passed = 0
    self.hourly_start = time.time()
    self.hours_data = []  # last 24 hours
    # SSL/TLS tracking
    self.ssl_tested = 0
    self.ssl_passed = 0
    self.ssl_failed = 0
    self.mitm_detected = 0
    self.cert_errors = 0
def record(self, success, category=None, proto=None, latency_ms=None, country=None, asn=None,
           ssl_test=False, mitm=False, cert_error=False):
    """Record the outcome of one proxy test. Thread-safe.

    Args:
        success: True if the proxy passed.
        category: failure category name (counted only on failure).
        proto: 'http' / 'socks4' / 'socks5'; other values are ignored.
        latency_ms: positive latency sample in ms (successes only).
        country, asn: geo attribution keys (successes only).
        ssl_test: result came from an SSL/TLS target.
        mitm / cert_error: SSL anomaly flags, counted on failed SSL tests.
    """
    with self.lock:
        self.tested += 1
        self.recent_tested += 1
        self.hourly_tested += 1
        if proto in self.proto_tested:
            self.proto_tested[proto] += 1
        if not success:
            self.failed += 1
            if category:
                self.fail_categories[category] = self.fail_categories.get(category, 0) + 1
        else:
            self.passed += 1
            self.recent_passed += 1
            self.hourly_passed += 1
            if proto in self.proto_passed:
                self.proto_passed[proto] += 1
            if latency_ms and latency_ms > 0:
                self.latency_sum += latency_ms
                self.latency_count += 1
                self.min_latency = min(self.min_latency, latency_ms)
                self.max_latency = max(self.max_latency, latency_ms)
                # Bounded sample window for percentile estimation.
                self.latency_samples.append(latency_ms)
                if len(self.latency_samples) > 1000:
                    del self.latency_samples[0]
                # Histogram: first bucket whose upper bound fits, else overflow.
                slot = next((b for b in self.LATENCY_BUCKETS if latency_ms <= b),
                            float('inf'))
                self.latency_buckets[slot] += 1
            if country:
                self.country_passed[country] = self.country_passed.get(country, 0) + 1
            if asn:
                self.asn_passed[asn] = self.asn_passed.get(asn, 0) + 1
        # SSL/TLS outcome tracking
        if ssl_test:
            self.ssl_tested += 1
            if success:
                self.ssl_passed += 1
            else:
                self.ssl_failed += 1
                if mitm:
                    self.mitm_detected += 1
                if cert_error:
                    self.cert_errors += 1
def update_history(self):
    """Roll counters into time-series history; call periodically.

    Appends one sample per >=5s interval, resets the 60s recent window,
    and aggregates hourly buckets (keeping the last 24 hours).
    """
    now = time.time()
    with self.lock:
        elapsed = now - self.last_history_time
        if elapsed >= 5:  # Update every 5 seconds
            # Tests-per-second rate over the interval
            tests_delta = self.tested - self.last_history_tested
            rate = tests_delta / elapsed if elapsed > 0 else 0
            self.rate_history.append(round(rate, 2))
            if len(self.rate_history) > self.HISTORY_SIZE:
                self.rate_history.pop(0)
            if rate > self.peak_rate:
                self.peak_rate = rate
            # Success rate over the interval. 100.0 forces float division:
            # under the file's python2 shebang, int/int floors to 0 or 100.
            passed_delta = self.passed - self.last_history_passed
            sr = (passed_delta * 100.0 / tests_delta) if tests_delta > 0 else 0
            self.success_rate_history.append(round(sr, 1))
            if len(self.success_rate_history) > self.HISTORY_SIZE:
                self.success_rate_history.pop(0)
            if sr > self.peak_success_rate:
                self.peak_success_rate = sr
            # Average latency for this interval
            avg_lat = self.get_avg_latency()
            self.latency_history.append(round(avg_lat, 0))
            if len(self.latency_history) > self.HISTORY_SIZE:
                self.latency_history.pop(0)
            self.last_history_time = now
            self.last_history_tested = self.tested
            self.last_history_passed = self.passed
        # Reset recent window every 60s
        if now - self.recent_start >= 60:
            self.recent_tested = 0
            self.recent_passed = 0
            self.recent_start = now
        # Hourly aggregation
        if now - self.hourly_start >= 3600:
            self.hours_data.append({
                'tested': self.hourly_tested,
                'passed': self.hourly_passed,
                # NOTE(review): rate divides by the nominal 3600s, not the
                # actual elapsed window — confirm this is intended.
                'rate': self.hourly_passed / 3600.0 if self.hourly_tested > 0 else 0,
                # 100.0 again forces float division on python2.
                'success_rate': (self.hourly_passed * 100.0 / self.hourly_tested) if self.hourly_tested > 0 else 0,
            })
            if len(self.hours_data) > 24:
                self.hours_data.pop(0)
            self.hourly_tested = 0
            self.hourly_passed = 0
            self.hourly_start = now
def get_recent_rate(self):
    """Tests per second over the current ~60s window."""
    with self.lock:
        window = time.time() - self.recent_start
        return self.recent_tested / window if window > 0 else 0.0
def get_recent_success_rate(self):
    """Success rate (percent) over the current ~60s window.

    Multiplies by 100.0 before dividing: under the file's python2
    shebang, (passed / tested) * 100 floors int/int to 0 or 100.
    """
    with self.lock:
        if self.recent_tested > 0:
            return (self.recent_passed * 100.0) / self.recent_tested
        return 0.0
def get_avg_latency(self):
    """Mean latency in ms over all recorded samples (0.0 when none)."""
    with self.lock:
        # latency_sum is a float, so the division is exact on py2 as well.
        return self.latency_sum / self.latency_count if self.latency_count else 0.0
def get_latency_percentiles(self):
    """Estimate p50/p90/p99 from the bounded recent-sample window."""
    with self.lock:
        if not self.latency_samples:
            return {'p50': 0, 'p90': 0, 'p99': 0}
        ordered = sorted(self.latency_samples)
        n = len(ordered)
        # int(n * q) < n for q < 1, so p50/p90 never overflow; clamp p99.
        return {
            'p50': ordered[int(n * 0.50)],
            'p90': ordered[int(n * 0.90)],
            'p99': ordered[min(int(n * 0.99), n - 1)],
        }
def get_latency_histogram(self):
    """Latency distribution as a list of {'range','count','pct'} rows.

    Uses `count * 100.0 / total` so the percentage survives python2's
    floor division of int/int (the original `count / total * 100` is
    always 0 there for count < total).
    """
    with self.lock:
        total = sum(self.latency_buckets.values())
        if total == 0:
            return []
        result = []
        lower = 0
        for upper in self.LATENCY_BUCKETS:
            count = self.latency_buckets[upper]
            result.append({
                'range': '%d-%d' % (lower, upper),
                'count': count,
                'pct': round(count * 100.0 / total, 1),
            })
            lower = upper
        # Samples beyond the last explicit bucket
        overflow = self.latency_buckets[float('inf')]
        if overflow > 0:
            result.append({
                'range': '>%d' % self.LATENCY_BUCKETS[-1],
                'count': overflow,
                'pct': round(overflow * 100.0 / total, 1),
            })
        return result
def get_proto_stats(self):
    """Per-protocol tested/passed counts and success percentage.

    `passed * 100.0 / tested` keeps the division in float on python2
    (the original `passed / tested * 100` floors int/int to 0 or 100).
    """
    with self.lock:
        result = {}
        for proto in ['http', 'socks4', 'socks5']:
            tested = self.proto_tested[proto]
            passed = self.proto_passed[proto]
            result[proto] = {
                'tested': tested,
                'passed': passed,
                'success_rate': round(passed * 100.0 / tested, 1) if tested > 0 else 0,
            }
        return result
def get_top_countries(self, limit=10):
    """Top `limit` countries by working-proxy count, descending."""
    with self.lock:
        # reverse=True is stable, so ties keep their insertion order,
        # exactly like sorting on the negated count.
        ranked = sorted(self.country_passed.items(),
                        key=lambda kv: kv[1], reverse=True)
        return ranked[:limit]
def get_top_asns(self, limit=10):
    """Top `limit` ASNs by working-proxy count, descending."""
    with self.lock:
        # Stable descending sort; ties keep insertion order.
        ranked = sorted(self.asn_passed.items(),
                        key=lambda kv: kv[1], reverse=True)
        return ranked[:limit]
def get_hourly_data(self):
    """Snapshot copy of the rolling last-24h hourly aggregates."""
    with self.lock:
        # Shallow copy so callers cannot mutate the live list.
        return self.hours_data[:]
def load_state(self, state):
    """Load persisted state from a dict (from database).

    Restores cumulative counters, per-protocol counts, peak rate,
    failure categories and geo tracking from a previous run so the
    dashboard numbers survive a daemon restart. Time-series histories
    and the recent/hourly windows are intentionally NOT restored.

    Args:
        state: dict from dbs.load_session_state(); falsy -> no-op.
    """
    if not state:
        return
    with self.lock:
        # Cumulative counters
        self.tested = state.get('tested', 0)
        self.passed = state.get('passed', 0)
        self.failed = state.get('failed', 0)
        # SSL/TLS counters
        self.ssl_tested = state.get('ssl_tested', 0)
        self.ssl_passed = state.get('ssl_passed', 0)
        self.ssl_failed = state.get('ssl_failed', 0)
        self.mitm_detected = state.get('mitm_detected', 0)
        self.cert_errors = state.get('cert_errors', 0)
        # Per-protocol counters are flattened into individual keys
        self.proto_tested['http'] = state.get('proto_http_tested', 0)
        self.proto_passed['http'] = state.get('proto_http_passed', 0)
        self.proto_tested['socks4'] = state.get('proto_socks4_tested', 0)
        self.proto_passed['socks4'] = state.get('proto_socks4_passed', 0)
        self.proto_tested['socks5'] = state.get('proto_socks5_tested', 0)
        self.proto_passed['socks5'] = state.get('proto_socks5_passed', 0)
        self.peak_rate = state.get('peak_rate', 0.0)
        # Restore start_time to preserve uptime calculation
        if state.get('start_time'):
            self.start_time = state['start_time']
        # Restore failure categories
        if state.get('fail_categories'):
            self.fail_categories = dict(state['fail_categories'])
        # Restore geo tracking
        if state.get('country_passed'):
            self.country_passed = dict(state['country_passed'])
        if state.get('asn_passed'):
            # Convert string keys back to int for ASN
            # NOTE(review): assumes keys are str (e.g. a JSON round-trip);
            # int keys would raise AttributeError on .isdigit() — confirm
            # the serializer used by dbs.save_session_state().
            self.asn_passed = {int(k) if k.isdigit() else k: v
                               for k, v in state['asn_passed'].items()}
        _log('restored session: %d tested, %d passed' % (self.tested, self.passed), 'info')
def should_report(self, interval):
    """True once `interval` seconds have elapsed since the last report."""
    since_last = time.time() - self.last_report
    return since_last >= interval
@@ -263,6 +590,38 @@ class Stats():
return '%s [%s]' % (base, cats)
return base
def get_full_stats(self):
    """Get comprehensive stats dict for API.

    Holds the (reentrant) stats lock while delegating to the get_*
    helpers so every field comes from one consistent snapshot.

    Fix: success_rate now uses `passed * 100.0 / tested` — under the
    file's python2 shebang `passed / tested * 100` floors int/int,
    yielding only 0 or 100.
    """
    with self.lock:
        elapsed = time.time() - self.start_time
        return {
            'tested': self.tested,
            'passed': self.passed,
            'failed': self.failed,
            'success_rate': round(self.passed * 100.0 / self.tested, 1) if self.tested > 0 else 0,
            # elapsed is a float, so these divisions are safe on py2 too
            'rate': round(self.tested / elapsed, 2) if elapsed > 0 else 0,
            'pass_rate': round(self.passed / elapsed, 2) if elapsed > 0 else 0,
            'recent_rate': self.get_recent_rate(),
            'recent_success_rate': self.get_recent_success_rate(),
            'peak_rate': self.peak_rate,
            'peak_success_rate': self.peak_success_rate,
            'uptime_seconds': int(elapsed),
            'rate_history': list(self.rate_history),
            'success_rate_history': list(self.success_rate_history),
            'latency_history': list(self.latency_history),
            'avg_latency': self.get_avg_latency(),
            # inf sentinel means "no sample yet"; report 0 instead
            'min_latency': self.min_latency if self.min_latency != float('inf') else 0,
            'max_latency': self.max_latency,
            'latency_percentiles': self.get_latency_percentiles(),
            'latency_histogram': self.get_latency_histogram(),
            'by_proto': dict(self.proto_passed),
            'proto_stats': self.get_proto_stats(),
            'failures': dict(self.fail_categories),
            'top_countries': self.get_top_countries(),
            'top_asns': self.get_top_asns(),
            'hourly_data': self.get_hourly_data(),
        }
def try_div(a, b):
if b != 0: return a/float(b)
@@ -357,6 +716,9 @@ def calculate_priority(failcount, success_count, max_fail):
class ThreadScaler(object):
"""Dynamic thread scaling based on queue depth and success rate.
With gevent greenlets, we can scale much more aggressively than with
OS threads since greenlets are cooperative and lightweight.
Scales up when:
- Queue depth exceeds high watermark
- Success rate is above threshold
@@ -365,12 +727,12 @@ class ThreadScaler(object):
- Success rate drops below threshold
"""
def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5):
    """Configure scaling bounds and pacing.

    Args:
        min_threads: floor for the worker pool size.
        max_threads: ceiling for the worker pool size.
        target_queue_per_thread: desired queue depth per worker.
    """
    self.min_threads = min_threads
    self.max_threads = max_threads
    self.target_queue_per_thread = target_queue_per_thread
    self.last_scale_time = 0
    # Greenlets are cheap, so re-evaluate scaling every 10s (was 30s
    # with OS threads).
    self.scale_cooldown = 10
    self.success_threshold = 10.0  # minimum success rate % to scale up
def should_scale(self, current_threads, queue_size, stats):
@@ -401,16 +763,17 @@ class ThreadScaler(object):
target = current_threads
# Scale up: queue is deep and success rate is acceptable
if queue_size > current_threads * self.target_queue_per_thread * 2:
# With greenlets, scale more aggressively (+5 instead of +2)
if queue_size > current_threads * self.target_queue_per_thread:
if success_rate >= self.success_threshold:
target = min(current_threads + 2, ideal, self.max_threads)
target = min(current_threads + 5, ideal, self.max_threads)
# Scale down: queue is shallow or success rate is poor
elif queue_size < current_threads * 2:
target = max(current_threads - 1, self.min_threads)
target = max(current_threads - 2, self.min_threads)
elif success_rate < self.success_threshold / 2:
# Drastic success rate drop - scale down to reduce load
target = max(current_threads - 2, self.min_threads)
target = max(current_threads - 5, self.min_threads)
if target != current_threads:
self.last_scale_time = now
@@ -476,6 +839,10 @@ class ProxyTestState():
self.last_latency_ms = None # average latency from successful tests
self.exit_ip = None # IP seen by target server (for anonymity detection)
self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers
# SSL/TLS tracking
self.had_ssl_test = False
self.ssl_success = False
self.cert_error = False
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
"""Record a single target test result. Thread-safe."""
@@ -491,6 +858,14 @@ class ProxyTestState():
'exit_ip': exit_ip,
'reveals_headers': reveals_headers
})
# Track SSL tests
if ssl:
self.had_ssl_test = True
if success:
self.ssl_success = True
# Track cert errors
if category == 'cert_error' or category == 'ssl_error':
self.cert_error = True
def is_complete(self):
    """Whether every scheduled target test has reported a result."""
    finished = len(self.results)
    return finished >= self.num_targets
def rwip(self, ip):
    """Remove leading zeros from IP octets (e.g. 192.168.001.010 -> 192.168.1.10).

    Fixes the original which compared the character to the integer 0
    (always False) and kept b[:1] instead of b[1:] (which would have
    truncated the octet to its first digit).

    Args:
        ip: dotted-quad IPv4 string.
    Returns:
        The same address with each octet's leading zeros stripped; a
        lone '0' octet is preserved.
    """
    octets = []
    for octet in ip.split('.'):
        # Strip leading '0' characters but never empty the octet.
        while octet[0] == '0' and len(octet) > 1:
            octet = octet[1:]
        octets.append(octet)
    return '.'.join(octets)
@@ -866,6 +1243,16 @@ class Proxywatchd():
success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
_log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd")
# Final save of session state before exit
try:
self._prep_db()
dbs.save_session_state(self.mysqlite, self.stats)
dbs.save_stats_snapshot(self.mysqlite, self.stats)
self._close_db()
_log('session state saved', 'watchd')
except Exception as e:
_log('failed to save final session state: %s' % str(e), 'warn')
def _prep_db(self):
    # Open a fresh handle to the watchd sqlite database; callers pair
    # this with _close_db() around each batch of queries.
    self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)
def _close_db(self):
@@ -902,11 +1289,12 @@ class Proxywatchd():
self.last_cleanup = time.time()
self.httpd_server = None
# Dynamic thread scaling
# Dynamic thread scaling (with gevent, greenlets are cheap - use higher limits)
min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 2)
self.scaler = ThreadScaler(
min_threads=max(2, config.watchd.threads // 4),
max_threads=config.watchd.threads * 2,
target_queue_per_thread=10
min_threads=min_t,
max_threads=config.watchd.threads * 10, # greenlets allow much higher concurrency
target_queue_per_thread=5 # lower threshold for faster scaling
)
self.thread_id_counter = 0
self.last_jobs_log = 0
@@ -950,9 +1338,9 @@ class Proxywatchd():
# Update runtime values
self.submit_after = config.watchd.submit_after
# Update scaler limits
self.scaler.min_threads = max(2, config.watchd.threads // 4)
self.scaler.max_threads = config.watchd.threads * 2
# Update scaler limits (greenlets allow higher concurrency)
self.scaler.min_threads = max(5, config.watchd.threads // 2)
self.scaler.max_threads = config.watchd.threads * 10
# Warn about values requiring restart
if config.watchd.checktype != old_checktype:
@@ -1111,7 +1499,10 @@ class Proxywatchd():
for state in self.pending_states:
if state.is_complete():
success, category = state.evaluate()
self.stats.record(success, category)
self.stats.record(success, category, state.proto, state.last_latency_ms,
state.country, state.asn,
ssl_test=state.had_ssl_test, mitm=(state.mitm > 0),
cert_error=state.cert_error)
self.collected.append(state)
else:
still_pending.append(state)
@@ -1219,6 +1610,125 @@ class Proxywatchd():
t = threading.Thread(target=self._run)
t.start()
def get_runtime_stats(self):
    """Return runtime statistics for the dashboard.

    Passed to the HTTP API server as `stats_provider`, so it may run on
    a different greenlet than the test loop.

    Returns:
        dict with tested, passed, success_rate, rate, uptime_seconds,
        threads, failures, tor_pool, and engine stats.
    """
    stats_data = {}
    # Stats from Stats object.  stats.lock is an RLock, so the get_*
    # helpers called below may safely re-acquire it while we hold it.
    with self.stats.lock:
        stats_data['tested'] = self.stats.tested
        stats_data['passed'] = self.stats.passed
        stats_data['failed'] = self.stats.failed
        elapsed = time.time() - self.stats.start_time
        stats_data['uptime_seconds'] = int(elapsed)
        stats_data['rate'] = try_div(self.stats.tested, elapsed)
        # 100.0 multiplier keeps the division in float on python2
        stats_data['success_rate'] = try_div(self.stats.passed * 100.0, self.stats.tested)
        stats_data['failures'] = dict(self.stats.fail_categories)
        # New: Protocol breakdown (by_proto aliases proto_passed)
        stats_data['by_proto'] = dict(self.stats.by_proto)
        # New: Rate history for sparklines (copies so the API consumer
        # cannot mutate live history lists)
        stats_data['rate_history'] = list(self.stats.rate_history)
        stats_data['success_rate_history'] = list(self.stats.success_rate_history)
        # New: Peak rate
        stats_data['peak_rate'] = self.stats.peak_rate
        # New: Recent rate (last 60s) and avg latency
        stats_data['recent_rate'] = self.stats.get_recent_rate()
        stats_data['recent_success_rate'] = self.stats.get_recent_success_rate()
        stats_data['avg_latency'] = self.stats.get_avg_latency()
        # Latency stats (inf sentinel means "no samples yet" -> report 0)
        stats_data['min_latency'] = self.stats.min_latency if self.stats.min_latency != float('inf') else 0
        stats_data['max_latency'] = self.stats.max_latency
        stats_data['latency_percentiles'] = self.stats.get_latency_percentiles()
        stats_data['latency_histogram'] = self.stats.get_latency_histogram()
        # Protocol performance (tested, passed, rate per protocol)
        stats_data['proto_stats'] = self.stats.get_proto_stats()
        # Geographic distribution from session
        stats_data['top_countries_session'] = self.stats.get_top_countries(10)
        stats_data['top_asns_session'] = self.stats.get_top_asns(10)
    # SSL/TLS stats (separate lock section; counters are read together
    # so the SSL block is internally consistent)
    with self.stats.lock:
        ssl_tested = self.stats.ssl_tested
        ssl_passed = self.stats.ssl_passed
        stats_data['ssl'] = {
            'tested': ssl_tested,
            'passed': ssl_passed,
            'failed': self.stats.ssl_failed,
            'success_rate': try_div(ssl_passed * 100.0, ssl_tested) if ssl_tested > 0 else 0,
            'mitm_detected': self.stats.mitm_detected,
            'cert_errors': self.stats.cert_errors
        }
    # Thread info
    stats_data['threads'] = len(self.threads)
    stats_data['min_threads'] = self.scaler.min_threads
    stats_data['max_threads'] = self.scaler.max_threads
    stats_data['queue_size'] = self.job_queue.qsize()
    stats_data['checktype'] = config.watchd.checktype
    # pass_rate reuses `elapsed` captured under the first lock section
    stats_data['pass_rate'] = try_div(self.stats.passed, elapsed)
    # Tor pool stats (None when no pool is configured)
    pool = connection_pool.get_pool()
    if pool:
        pool_stats = pool.get_stats()
        pool_info = {
            'hosts': [],
            'total_requests': pool_stats.get('total_requests', 0),
            'success_rate': pool_stats.get('success_rate', 0)
        }
        for h in pool_stats.get('hosts', []):
            pool_info['hosts'].append({
                'address': h.get('host', ''),
                'healthy': h.get('available', False),
                # NOTE(review): avg_latency appears to be seconds here
                # (hence *1000 to ms) — confirm against connection_pool
                'latency_ms': h.get('avg_latency', 0) * 1000 if h.get('avg_latency') else None,
                'success_rate': h.get('success_rate', 0),
            })
        stats_data['tor_pool'] = pool_info
    else:
        stats_data['tor_pool'] = {'hosts': [], 'total_requests': 0, 'success_rate': 0}
    # Judge stats (when using judges checktype)
    if config.watchd.checktype == 'judges':
        js = judge_stats.get_stats()
        stats_data['judges'] = {
            'total': js.get('total', 0),
            'available': js.get('available', 0),
            'in_cooldown': js.get('in_cooldown', 0),
            'top_judges': js.get('top', [])[:5]  # top 5 most successful
        }
    else:
        stats_data['judges'] = None
    # Scraper/engine stats (optional module; zeros/None when absent)
    if scraper_available:
        scraper_stats = scraper_module.get_scraper_stats()
        if scraper_stats:
            stats_data['engines_available'] = scraper_stats.get('available', 0)
            stats_data['engines_total'] = scraper_stats.get('total', 0)
            stats_data['engines_backoff'] = scraper_stats.get('in_backoff', 0)
            stats_data['scraper'] = scraper_stats
        else:
            stats_data['engines_available'] = 0
            stats_data['engines_total'] = 0
            stats_data['engines_backoff'] = 0
            stats_data['scraper'] = None
    else:
        stats_data['engines_available'] = 0
        stats_data['engines_total'] = 0
        stats_data['engines_backoff'] = 0
        stats_data['scraper'] = None
    return stats_data
def _run(self):
_log('starting...', 'watchd')
_log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % (
@@ -1233,6 +1743,16 @@ class Proxywatchd():
'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
(config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
).fetchone()[0]
# Create stats persistence tables
dbs.create_table_if_not_exists(self.mysqlite, 'stats_history')
dbs.create_table_if_not_exists(self.mysqlite, 'session_state')
# Load persisted session state
saved_state = dbs.load_session_state(self.mysqlite)
if saved_state:
self.stats.load_state(saved_state)
self._close_db()
_log('database: %d total proxies, %d due for testing' % (total, due), 'watchd')
@@ -1245,13 +1765,14 @@ class Proxywatchd():
self.httpd_server = ProxyAPIServer(
config.httpd.listenip,
config.httpd.port,
config.watchd.database
config.watchd.database,
stats_provider=self.get_runtime_stats
)
self.httpd_server.start()
# create worker threads with shared queues
for i in range(config.watchd.threads):
threadid = ''.join([random.choice(string.letters) for x in range(5)])
threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
wt = WorkerThread(threadid, self.job_queue, self.result_queue)
if self.in_background:
wt.start_thread()
@@ -1292,6 +1813,9 @@ class Proxywatchd():
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
# Update rate history for sparklines
self.stats.update_history()
# periodic stats report
if self.stats.should_report(config.watchd.stats_interval):
_log(self.stats.report(), 'stats')
@@ -1305,6 +1829,27 @@ class Proxywatchd():
# Report scaler status
_log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')
# Save session state periodically (every stats_interval, default 5m)
try:
self._prep_db()
dbs.save_session_state(self.mysqlite, self.stats)
self._close_db()
except Exception as e:
_log('failed to save session state: %s' % str(e), 'warn')
# Hourly stats snapshot
now = time.time()
if not hasattr(self, '_last_snapshot'):
self._last_snapshot = now
if (now - self._last_snapshot) >= 3600:
try:
self._prep_db()
dbs.save_stats_snapshot(self.mysqlite, self.stats)
self._close_db()
self._last_snapshot = now
except Exception as e:
_log('failed to save stats snapshot: %s' % str(e), 'warn')
# Dynamic thread scaling
if self.in_background:
target = self.scaler.should_scale(