1345 lines
50 KiB
Python
1345 lines
50 KiB
Python
#!/usr/bin/env python2
|
|
|
|
import threading
|
|
import time
|
|
import random
|
|
import string
|
|
import re
|
|
import heapq
|
|
import signal
|
|
|
|
import os
|
|
try:
|
|
import Queue
|
|
except ImportError:
|
|
import queue as Queue
|
|
try:
|
|
import IP2Location
|
|
geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
|
|
geolite = True
|
|
except (ImportError, IOError, ValueError):
|
|
geolite = False
|
|
|
|
try:
|
|
import pyasn
|
|
asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat"))
|
|
asn_enabled = True
|
|
except (ImportError, IOError, NameError):
|
|
asndb = None
|
|
asn_enabled = False
|
|
|
|
from config import Config
|
|
|
|
import mysqlite
|
|
import dbs
|
|
from misc import _log, categorize_error
|
|
import rocksock
|
|
import connection_pool
|
|
|
|
config = Config()
|
|
|
|
# Module flag; never modified in this chunk — presumably toggled by a
# launcher/entry point elsewhere in the file. TODO confirm.
_run_standalone = False

# Cache of hostname -> resolved IPv4 string, filled by socks4_resolve()
# so each target is resolved at most once per process.
cached_dns = {}

# IP pattern for judge services (validates response contains valid IP in body)
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'
|
|
|
|
# Patterns indicating judge is blocking the proxy (not a proxy failure)
# These should NOT count as proxy failures - retry with different judge
JUDGE_BLOCK_PATTERNS = [
    r'HTTP/1\.[01] 403',        # Forbidden
    r'HTTP/1\.[01] 429',        # Too Many Requests
    r'HTTP/1\.[01] 503',        # Service Unavailable
    r'captcha',                 # Captcha challenge
    r'challenge',               # Generic challenge
    r'verify you are human',    # Human verification
    r'rate.?limit',             # Rate limiting
    r'too many requests',       # Rate limiting
    r'access.?denied',          # Access denied
    r'blocked',                 # Explicit block
    r'Checking your browser',   # Cloudflare JS challenge
]
# Pre-compiled alternation of all block patterns, matched case-insensitively
# against raw judge responses (see TargetTestJob.run).
JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE)
|
|
|
|
# Check types: irc, http (header match), judges (body match), ssl (TLS handshake)
# Judge services - return IP in body (plain text, JSON, or HTML)
# Keys may be bare hosts or 'host/path'; _connect_and_test() splits on '/'
# to build the HTTP request. Values are the regex the body must match.
judges = {
    # Plain text IP
    'api.ipify.org': IP_PATTERN,
    'icanhazip.com': IP_PATTERN,
    'checkip.amazonaws.com': IP_PATTERN,
    'ifconfig.me/ip': IP_PATTERN,
    'ipinfo.io/ip': IP_PATTERN,
    'ip.me': IP_PATTERN,
    'myip.dnsomatic.com': IP_PATTERN,
    'ident.me': IP_PATTERN,
    'ipecho.net/plain': IP_PATTERN,
    'wtfismyip.com/text': IP_PATTERN,
    'eth0.me': IP_PATTERN,
    'l2.io/ip': IP_PATTERN,
    'tnx.nl/ip': IP_PATTERN,
    'wgetip.com': IP_PATTERN,
    'curlmyip.net': IP_PATTERN,
    # JSON responses (IP pattern matches within JSON)
    'httpbin.org/ip': IP_PATTERN,
    'ip-api.com/json': IP_PATTERN,
    'ipapi.co/json': IP_PATTERN,
    'ipwhois.app/json': IP_PATTERN,
    # Header echo - for elite detection (check if proxy adds revealing headers)
    'httpbin.org/headers': IP_PATTERN,  # returns request headers as JSON
}
|
|
|
|
|
|
class JudgeStats():
    """Track per-judge success/failure rates for reliability scoring.

    Judges that frequently block or rate-limit are temporarily avoided.
    Stats decay over time to allow recovery.
    """

    def __init__(self, cooldown_seconds=300, block_threshold=3):
        self.lock = threading.Lock()
        # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp}
        self.stats = {}
        self.cooldown_seconds = cooldown_seconds  # seconds to avoid blocked judges
        self.block_threshold = block_threshold    # consecutive blocks before cooldown

    def _entry(self, judge):
        """Return (creating if needed) the stats dict for a judge.

        Caller must hold self.lock.
        """
        if judge not in self.stats:
            self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}
        return self.stats[judge]

    def record_success(self, judge):
        """Record successful judge response."""
        with self.lock:
            entry = self._entry(judge)
            entry['success'] += 1
            # A success clears the consecutive-block streak
            entry['block'] = 0

    def record_failure(self, judge):
        """Record judge failure (proxy failed, not judge block)."""
        with self.lock:
            self._entry(judge)['fail'] += 1

    def record_block(self, judge):
        """Record judge blocking the proxy (403, captcha, rate-limit)."""
        with self.lock:
            entry = self._entry(judge)
            entry['block'] += 1
            entry['last_block'] = time.time()

    def is_available(self, judge):
        """Check if judge is available (not in cooldown)."""
        with self.lock:
            entry = self.stats.get(judge)
            if entry is None:
                # Never seen before: assume available
                return True
            if entry['block'] >= self.block_threshold:
                if (time.time() - entry['last_block']) < self.cooldown_seconds:
                    return False
                # Cooldown expired, reset block count
                entry['block'] = 0
            return True

    def get_available_judges(self, judge_list):
        """Return list of judges not in cooldown."""
        return [j for j in judge_list if self.is_available(j)]

    def status_line(self):
        """Return status summary for logging."""
        with self.lock:
            now = time.time()
            cooling = [s for s in self.stats.values()
                       if s['block'] >= self.block_threshold
                       and (now - s['last_block']) < self.cooldown_seconds]
            return 'judges: %d total, %d in cooldown' % (len(self.stats), len(cooling))
|
|
|
|
|
|
# Global judge stats instance, shared by all worker threads
# (JudgeStats serializes access internally with its own lock).
judge_stats = JudgeStats()
|
|
|
|
# HTTP targets - check for specific headers
# host -> header name that this site is known to emit; a HEAD response
# containing the header (case-insensitive search) proves the proxy really
# reached the site rather than returning a fake/interstitial page.
regexes = {
    'www.facebook.com': 'X-FB-Debug',
    'www.fbcdn.net': 'X-FB-Debug',
    'www.reddit.com': 'x-clacks-overhead',
    'www.twitter.com': 'x-connection-hash',
    't.co': 'x-connection-hash',
    'www.msn.com': 'x-aspnetmvc-version',
    'www.bing.com': 'p3p',
    'www.ask.com': 'x-served-by',
    'www.hotmail.com': 'x-msedge-ref',
    'www.bbc.co.uk': 'x-bbc-edge-cache-status',
    'www.skype.com': 'X-XSS-Protection',
    'www.alibaba.com': 'object-status',
    'www.mozilla.org': 'cf-ray',
    'www.cloudflare.com': 'cf-ray',
    'www.wikimedia.org': 'x-client-ip',
    'www.vk.com': 'x-frontend',
    'www.tinypic.com': 'x-amz-cf-pop',
    'www.netflix.com': 'X-Netflix.proxy.execution-time',
    'www.amazon.de': 'x-amz-cf-id',
    'www.reuters.com': 'x-amz-cf-id',
    'www.ikea.com': 'x-frame-options',
    'www.twitpic.com': 'timing-allow-origin',
    'www.digg.com': 'cf-request-id',
    'www.wikia.com': 'x-served-by',
    'www.wp.com': 'x-ac',
    'www.last.fm': 'x-timer',
    'www.usps.com': 'x-ruleset-version',
    'www.linkedin.com': 'x-li-uuid',
    'www.vimeo.com': 'x-timer',
    'www.yelp.com': 'x-timer',
    'www.ebay.com': 'x-envoy-upstream-service-time',
    'www.wikihow.com': 'x-c',
    'www.archive.org': 'referrer-policy',
    'www.pandora.tv': 'X-UA-Compatible',
    'www.w3.org': 'x-backend',
    'www.time.com': 'x-amz-cf-pop'
}
|
|
|
|
# SSL targets - verify TLS handshake only (MITM detection)
# No HTTP request needed - just connect with verifycert=True
# A certificate-verification failure through the proxy marks it as MITM
# (see the RS_E_SSL_CERTIFICATE_ERROR handling in TargetTestJob).
ssl_targets = [
    'www.google.com',
    'www.microsoft.com',
    'www.apple.com',
    'www.amazon.com',
    'www.cloudflare.com',
    'www.github.com',
    'www.mozilla.org',
    'www.wikipedia.org',
    'www.reddit.com',
    'www.twitter.com',
    'x.com',
    'www.facebook.com',
    'www.linkedin.com',
    'www.paypal.com',
    'www.stripe.com',
    'www.digicert.com',
    'www.letsencrypt.org',
]
|
|
|
|
|
|
class Stats():
    """Track and report runtime statistics."""

    def __init__(self):
        self.lock = threading.Lock()
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()
        self.last_report = time.time()
        # failure category name -> count of failures in that category
        self.fail_categories = {}

    def record(self, success, category=None):
        """Count one test outcome; `category` tags the failure reason."""
        with self.lock:
            self.tested += 1
            if not success:
                self.failed += 1
                if category:
                    self.fail_categories[category] = self.fail_categories.get(category, 0) + 1
            else:
                self.passed += 1

    def should_report(self, interval):
        """True when at least `interval` seconds passed since the last report."""
        return (time.time() - self.last_report) >= interval

    def report(self):
        """Build a one-line progress summary and reset the report timer."""
        with self.lock:
            self.last_report = time.time()
            elapsed = time.time() - self.start_time
            summary = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % (
                self.tested, self.passed,
                try_div(self.passed * 100.0, self.tested),
                try_div(self.tested, elapsed),
                int(elapsed / 60))
            if not self.fail_categories:
                return summary
            # Append failure breakdown sorted by category name
            breakdown = ' '.join('%s=%d' % kv for kv in sorted(self.fail_categories.items()))
            return '%s [%s]' % (summary, breakdown)
|
|
|
|
|
|
def try_div(a, b):
    """Float division of a by b, returning 0 instead of raising when b is 0."""
    return a / float(b) if b != 0 else 0
|
|
|
|
|
|
class PriorityJobQueue(object):
    """Priority queue for proxy test jobs.

    Lower priority number = higher priority.
    Priority 0: New proxies (never tested)
    Priority 1: Recently working (no failures, has successes)
    Priority 2: Low fail count (< 3 failures)
    Priority 3: Medium fail count
    Priority 4: High fail count
    """

    def __init__(self):
        self.heap = []
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        # Monotonic tie-breaker so jobs with equal priority pop FIFO
        self.counter = 0

    def put(self, job, priority=3):
        """Add job with priority (lower = higher priority)."""
        with self.lock:
            heapq.heappush(self.heap, (priority, self.counter, job))
            self.counter += 1
            self.not_empty.notify()

    def get(self, timeout=None):
        """Get highest priority job. Raises Queue.Empty on timeout."""
        with self.not_empty:
            if timeout is None:
                # Block indefinitely until a job arrives
                while not self.heap:
                    self.not_empty.wait()
            else:
                deadline = time.time() + timeout
                while not self.heap:
                    left = deadline - time.time()
                    if left <= 0:
                        raise Queue.Empty()
                    self.not_empty.wait(left)
            return heapq.heappop(self.heap)[2]

    def get_nowait(self):
        """Get job without waiting. Raises Queue.Empty if empty."""
        with self.lock:
            if not self.heap:
                raise Queue.Empty()
            return heapq.heappop(self.heap)[2]

    def empty(self):
        """Check if queue is empty."""
        with self.lock:
            return len(self.heap) == 0

    def qsize(self):
        """Return queue size."""
        with self.lock:
            return len(self.heap)

    def task_done(self):
        """Compatibility method (no-op for heap queue)."""
        pass
|
|
|
|
|
|
def calculate_priority(failcount, success_count, max_fail):
    """Calculate job priority based on proxy state.

    Returns:
        int: Priority 0-4 (lower = higher priority)
    """
    if failcount == 0:
        # Never-tested proxies beat recently-working ones
        return 0 if success_count == 0 else 1
    if failcount < 3:
        # Low fail count
        return 2
    # Medium (below half of max_fail) vs. high fail count
    return 3 if failcount < max_fail // 2 else 4
|
|
|
|
|
|
class ThreadScaler(object):
    """Dynamic thread scaling based on queue depth and success rate.

    Scales up when:
    - Queue depth exceeds high watermark
    - Success rate is above threshold
    Scales down when:
    - Queue is nearly empty
    - Success rate drops below threshold
    """

    def __init__(self, min_threads=2, max_threads=50, target_queue_per_thread=10):
        self.min_threads = min_threads
        self.max_threads = max_threads
        self.target_queue_per_thread = target_queue_per_thread
        self.last_scale_time = 0
        self.scale_cooldown = 30        # seconds between scaling decisions
        self.success_threshold = 10.0   # minimum success rate % to scale up

    def should_scale(self, current_threads, queue_size, stats):
        """Determine if scaling is needed.

        Args:
            current_threads: Current number of active threads
            queue_size: Current job queue depth
            stats: Stats object with success/fail counts

        Returns:
            int: Target thread count (may be same as current)
        """
        now = time.time()
        # Honor the cooldown: no decisions until it has elapsed
        if (now - self.last_scale_time) < self.scale_cooldown:
            return current_threads

        # Snapshot counters under the stats lock
        with stats.lock:
            tested = stats.tested
            passed = stats.passed
        # With no data yet, assume a neutral 50% success rate
        rate = try_div(passed * 100.0, tested) if tested > 0 else 50.0

        # Ideal thread count derived from queue depth, clamped to limits
        per_thread = self.target_queue_per_thread
        ideal = min(max(self.min_threads, queue_size // per_thread), self.max_threads)

        target = current_threads
        if queue_size > current_threads * per_thread * 2:
            # Deep queue: scale up only while success rate is acceptable
            if rate >= self.success_threshold:
                target = min(current_threads + 2, ideal, self.max_threads)
        elif queue_size < current_threads * 2:
            # Shallow queue: shed one thread
            target = max(current_threads - 1, self.min_threads)
        elif rate < self.success_threshold / 2:
            # Drastic success rate drop - scale down to reduce load
            target = max(current_threads - 2, self.min_threads)

        if target != current_threads:
            self.last_scale_time = now
        return target

    def status_line(self, current_threads, queue_size):
        """Return status string for logging."""
        return 'threads=%d queue=%d target_per_thread=%d' % (
            current_threads, queue_size, self.target_queue_per_thread)
|
|
|
|
def socks4_resolve(srvname, server_port):
    """Resolve `srvname` to an IPv4 address for SOCKS4 connections.

    SOCKS4 carries a raw IPv4 address rather than a hostname, so the target
    must be resolved locally.  Successful lookups are memoized in the
    module-level `cached_dns` dict so each hostname is resolved at most once
    per process.

    Args:
        srvname: hostname of the connection target
        server_port: target port (passed through to the resolver)

    Returns:
        IPv4 address string on success, False on resolution failure.
    """
    if srvname in cached_dns:
        srv = cached_dns[srvname]
        if config.watchd.debug:
            _log("using cached ip (%s) for %s"%(srv, srvname), "debug")
        return srv

    sa = None
    try:
        # want_v4: SOCKS4 can only address IPv4 targets
        af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True)
    except rocksock.RocksockException as e:
        # Only address-resolution (GAI) errors are expected here; anything
        # else is a programming error.  NOTE: assert is stripped under -O.
        assert(e.get_errortype() == rocksock.RS_ET_GAI)

    if sa is None:
        _log("could not resolve connection target %s"%srvname, "ERROR")
        return False

    cached_dns[srvname] = sa[0]
    return sa[0]
|
|
|
|
|
|
class ProxyTestState():
    """Thread-safe state for a proxy being tested against multiple targets.

    Results from TargetTestJob instances are aggregated here.
    When all tests complete, evaluate() determines final pass/fail.
    """

    def __init__(self, ip, port, proto, failcount, success_count, total_duration,
                 country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False):
        self.ip = ip
        self.port = int(port)
        self.proxy = '%s:%s' % (ip, port)
        self.proto = proto
        self.failcount = failcount
        self.checktime = None  # set to int(time.time()) by evaluate()
        self.success_count = success_count
        self.total_duration = total_duration
        self.country = country
        self.mitm = mitm
        self.consecutive_success = consecutive_success
        self.asn = asn
        self.isoldies = oldies
        self.num_targets = num_targets

        # thread-safe result accumulation
        self.lock = threading.Lock()
        self.results = []  # list of result dicts appended by record_result()
        self.completed = False
        self.last_latency_ms = None  # average latency from successful tests
        self.exit_ip = None  # IP seen by target server (for anonymity detection)
        self.reveals_headers = None  # True if proxy adds X-Forwarded-For/Via headers

    def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
        """Record a single target test result. Thread-safe."""
        with self.lock:
            self.results.append({
                'success': success,
                'proto': proto,
                'duration': duration,
                'srv': srv,
                'tor': tor,
                'ssl': ssl,
                'category': category,
                'exit_ip': exit_ip,
                'reveals_headers': reveals_headers
            })

    def is_complete(self):
        """Check if all target tests have finished."""
        with self.lock:
            return len(self.results) >= self.num_targets

    def rwip(self, ip):
        """Strip leading zeros from every octet of a dotted-quad IP string.

        E.g. '001.020.3.4' -> '1.20.3.4'; used to canonicalize the address
        before the geo-DB lookup.

        Bugfix: the original compared the first character with the integer 0
        (always False, so nothing was ever stripped) and truncated with
        b[:1] (keep first char) instead of b[1:] (drop first char).
        """
        n = []
        for b in ip.split('.'):
            # Drop leading zeros but keep a lone '0' octet intact
            while b[0] == '0' and len(b) > 1:
                b = b[1:]
            n.append(b)
        return '.'.join(n)

    def evaluate(self):
        """Evaluate results after all tests complete.

        Returns:
            (success, category) tuple where success is bool and category is
            the dominant failure type (or None on success)
        """
        with self.lock:
            if self.completed:
                # Already evaluated - report based on the recorded outcome
                return (self.failcount == 0, None)
            self.completed = True
            self.checktime = int(time.time())

            successes = [r for r in self.results if r['success']]
            failures = [r for r in self.results if not r['success']]
            num_success = len(successes)

            # Determine dominant failure category
            fail_category = None
            if failures:
                cats = {}
                for f in failures:
                    cat = f.get('category') or 'other'
                    cats[cat] = cats.get(cat, 0) + 1
                if cats:
                    fail_category = max(cats.keys(), key=lambda k: cats[k])

            # require success (single target mode)
            if num_success >= 1:
                # use last successful result for metrics
                last_good = successes[-1]

                # Extract exit IP and header info from successful judge responses
                for s in successes:
                    if s.get('exit_ip'):
                        self.exit_ip = s['exit_ip']
                    if s.get('reveals_headers') is not None:
                        self.reveals_headers = s['reveals_headers']

                # Lazily fill in geo country on first success
                if geolite and self.country is None:
                    self.ip = self.rwip(self.ip)
                    rec = geodb.get_all(self.ip)
                    if rec is not None and rec.country_short:
                        self.country = rec.country_short

                # Lazily fill in ASN; lookup errors are non-fatal
                if asn_enabled and self.asn is None:
                    try:
                        asn_result = asndb.lookup(self.ip)
                        if asn_result and asn_result[0]:
                            self.asn = asn_result[0]
                    except Exception:
                        pass

                self.proto = last_good['proto']
                self.failcount = 0
                # Every 3rd consecutive success clears the MITM flag
                # (a periodic verified-SSL check is forced at that cadence)
                if (self.consecutive_success % 3) == 0:
                    self.mitm = 0
                self.consecutive_success += 1
                self.success_count += 1
                self.total_duration += int(last_good['duration'] * 1000)

                # Calculate average latency from successful tests (in ms)
                durations = [s['duration'] for s in successes if s['duration']]
                if durations:
                    self.last_latency_ms = sum(durations) * 1000.0 / len(durations)

                torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
                # Determine anonymity for status message
                # transparent: exit_ip == proxy_ip
                # anonymous: exit_ip != proxy_ip, adds revealing headers
                # elite: exit_ip != proxy_ip, no revealing headers
                anon_status = ''
                if self.exit_ip:
                    if self.exit_ip == self.ip:
                        anon_status = ' anon: transparent;'
                    elif self.reveals_headers is False:
                        anon_status = ' anon: elite;'
                    elif self.reveals_headers is True:
                        anon_status = ' anon: anonymous;'
                    else:
                        anon_status = ' anon: anonymous;'  # default if no header check
                _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % (
                    last_good['proto'], self.ip, self.port, self.country,
                    last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']),
                    num_success, self.num_targets), 'xxxxx')
                return (True, None)

            else:
                # Check if failures were all judge blocks (not proxy's fault)
                judge_blocks = [f for f in failures if f.get('category') == 'judge_block']
                real_failures = [f for f in failures if f.get('category') != 'judge_block']

                if judge_blocks and not real_failures:
                    # All failures were judge blocks - inconclusive, don't penalize proxy
                    # checktime still updated so we don't immediately retest
                    return (False, 'judge_block')
                else:
                    # Real proxy failure
                    self.failcount += 1
                    self.consecutive_success = 0
                    return (False, fail_category)
|
|
|
|
|
|
class TargetTestJob():
    """Job to test a single proxy against a single target.

    Multiple TargetTestJob instances share the same ProxyTestState,
    allowing tests to be interleaved with other proxies in the queue.
    """

    def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
        # Shared ProxyTestState that aggregates results across targets
        self.proxy_state = proxy_state
        # Target host (or 'host/path' for judge services)
        self.target_srv = target_srv
        # One of 'irc', 'http', 'judges', 'ssl'
        self.checktype = checktype
        # Set by the worker thread before run() for connection-pool affinity
        self.worker_id = worker_id

    def run(self):
        """Test the proxy against this job's target server.

        Connects through the proxy, reads the response and records a
        success/failure result into the shared ProxyTestState.
        """
        sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()

        if not sock:
            # SSL-only check passed (handshake success, no request needed)
            if err_cat == 'ssl_ok':
                # `duration` holds the connect start time here, so this is
                # elapsed wall time since the connection attempt began
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
            else:
                self.proxy_state.record_result(False, category=err_cat)
            return

        try:
            # Read the whole response (rocksock recv(-1) semantics)
            recv = sock.recv(-1)

            # Select regex based on check type
            if self.checktype == 'irc':
                regex = '^(:|NOTICE|ERROR)'
            elif self.checktype == 'judges':
                regex = judges[srv]
            elif self.checktype == 'ssl':
                # Should not reach here - ssl returns before recv
                self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
                return
            else: # http
                regex = regexes[srv]

            if re.search(regex, recv, re.IGNORECASE):
                elapsed = time.time() - duration
                # Extract exit IP from judge response
                exit_ip = None
                reveals_headers = None
                if self.checktype == 'judges':
                    ip_match = re.search(IP_PATTERN, recv)
                    if ip_match:
                        exit_ip = ip_match.group(0)
                    # Check for header echo judge (elite detection)
                    if 'headers' in srv:
                        # If X-Forwarded-For/Via/etc present, proxy reveals chain
                        reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
                    # Record successful judge
                    judge_stats.record_success(srv)
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip,
                    reveals_headers=reveals_headers
                )
            else:
                # Check if judge is blocking us (not a proxy failure)
                if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
                    judge_stats.record_block(srv)
                    # Don't count as proxy failure - judge is blocking
                    self.proxy_state.record_result(False, category='judge_block')
                    if config.watchd.debug:
                        _log('judge %s blocked proxy %s' % (srv, self.proxy_state.proxy), 'debug')
                else:
                    if self.checktype == 'judges':
                        judge_stats.record_failure(srv)
                    self.proxy_state.record_result(False, category='other')

        except KeyboardInterrupt as e:
            # NOTE(review): the finally block below disconnects again;
            # presumably rocksock tolerates a double disconnect - confirm.
            sock.disconnect()
            raise e
        except rocksock.RocksockException as e:
            self.proxy_state.record_result(False, category=categorize_error(e))
        finally:
            sock.disconnect()

    def _connect_and_test(self):
        """Connect to target through the proxy and send test packet.

        Returns an 8-tuple:
            (sock, proto, start_time, torhost, srvname, fail_inc, use_ssl, err_cat)
        where sock is None on failure (err_cat set) or for a completed
        SSL-only check (err_cat == 'ssl_ok').
        """
        ps = self.proxy_state
        srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
        # For judges, extract host from 'host/path' format
        if self.checktype == 'judges' and '/' in srvname:
            connect_host = srvname.split('/')[0]
        else:
            connect_host = srvname

        # SSL checktype: always use SSL with certificate verification
        if self.checktype == 'ssl':
            use_ssl = 1
            ssl_only_check = True # handshake only, no HTTP request
            server_port = 443
            verifycert = True
        else:
            # use_ssl == 2 in config means "randomize per attempt"
            use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
            ssl_only_check = False # minimal SSL test (handshake only, no request)
            if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
                use_ssl = 1
                ssl_only_check = True # periodic MITM check - handshake is sufficient

            if self.checktype == 'irc':
                server_port = 6697 if use_ssl else 6667
            else:
                server_port = 443 if use_ssl else 80

            verifycert = True if use_ssl else False
        # Unknown protocol: probe all three; known: test only that one
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
        last_error_category = None

        # Get Tor host from pool (with worker affinity)
        pool = connection_pool.get_pool()

        for proto in protos:
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            if proto == 'socks4':
                # SOCKS4 needs a raw IP; returns False on resolve failure
                srv = socks4_resolve(connect_host, server_port)
            else:
                srv = connect_host
            if not srv:
                continue

            duration = time.time()
            # Chain: us -> tor (socks5) -> tested proxy -> target
            proxies = [
                rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
                rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
            ]

            try:
                sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                         proxies=proxies, timeout=config.watchd.timeout,
                                         verifycert=verifycert)
                sock.connect()

                # SSL-only check: handshake passed, no request needed
                if ssl_only_check:
                    elapsed = time.time() - duration
                    if pool:
                        pool.record_success(torhost, elapsed)
                    sock.disconnect()
                    return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_ok'

                if self.checktype == 'irc':
                    sock.send('NICK\n')
                elif self.checktype == 'judges':
                    # GET request to receive body with IP address
                    sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
                        srvname.split('/', 1)[1] if '/' in srvname else '',
                        srvname.split('/')[0]
                    ))
                else: # http - HEAD is sufficient for header checks
                    sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
                # Record success in pool
                if pool:
                    pool.record_success(torhost, time.time() - duration)
                return sock, proto, duration, torhost, srvname, 0, use_ssl, None

            except rocksock.RocksockException as e:
                last_error_category = categorize_error(e)
                if config.watchd.debug:
                    _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port,
                         e.get_errormessage(), last_error_category), 'debug')

                et = e.get_errortype()
                err = e.get_error()
                # Which hop of the proxy chain failed: 0 = tor, 1 = tested proxy
                fp = e.get_failedproxy()

                # NOTE(review): if the Rocksock constructor itself raised,
                # `sock` may be unbound (or a stale socket from a previous
                # iteration) here - confirm constructor cannot raise.
                sock.disconnect()

                if et == rocksock.RS_ET_OWN:
                    if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                                    err == rocksock.RS_E_HIT_TIMEOUT):
                        # The tested proxy itself is dead - no point trying
                        # other protocols
                        break
                    elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        # Tor connection failed - record in pool
                        if pool:
                            pool.record_failure(torhost)
                        # Log from roughly one thread only to limit spam
                        # NOTE(review): `/` is integer division on python2
                        # (see shebang) but float on python3 - randint would
                        # fail on py3; confirm intended interpreter.
                        if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
                            _log("could not connect to tor, sleep 5s", "ERROR")
                        time.sleep(5)
                elif et == rocksock.RS_ET_GAI:
                    _log("could not resolve connection target %s" % connect_host, "ERROR")
                    break
                elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # Certificate did not verify through the proxy - flag MITM
                    ps.mitm = 1

            except KeyboardInterrupt as e:
                raise e

        # All protocols exhausted without success
        return None, None, None, None, None, 1, use_ssl, last_error_category
|
|
|
|
|
|
class WorkerThread():
    """Runs jobs from a shared input queue and pushes them to a result queue."""

    def __init__(self, id, job_queue, result_queue):
        self.id = id
        self.done = threading.Event()     # set to request shutdown
        self.thread = None                # created by start_thread()
        self.job_queue = job_queue        # shared input queue
        self.result_queue = result_queue  # shared output queue

    def stop(self):
        """Signal the work loop to exit after its current job."""
        self.done.set()

    def term(self):
        """Join the worker thread if it was ever started."""
        if self.thread:
            self.thread.join()

    def start_thread(self):
        """Launch workloop() in a background thread."""
        self.thread = threading.Thread(target=self.workloop)
        self.thread.start()

    def workloop(self):
        """Main loop: fetch a job, run it, hand it to the result queue."""
        jobs_run = 0
        time_spent = 0
        while not self.done.is_set():
            try:
                # Short timeout so the done flag is re-checked regularly
                job = self.job_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            started = time.time()
            # Assign worker ID for connection pool affinity
            job.worker_id = self.id
            job.run()
            jobs_run += 1
            time_spent += time.time() - started
            self.result_queue.put(job)
            self.job_queue.task_done()
        if self.thread and jobs_run > 0:
            avg_t = try_div(time_spent, jobs_run)
            _log("terminated, %d jobs, avg.time %.2f" % (jobs_run, avg_t), self.id)
|
|
|
|
class Proxywatchd():
|
|
|
|
    def stop(self):
        """Request shutdown; worker teardown happens later in _cleanup()."""
        _log('halting... (%d thread(s))' % len(self.threads), 'watchd')
        self.stopping.set()
|
|
|
|
    def _cleanup(self):
        """Stop all workers, flush remaining results, stop httpd, mark stopped."""
        # Signal every worker first so they wind down in parallel...
        for wt in self.threads:
            wt.stop()
        # ...then join them one by one.
        for wt in self.threads:
            wt.term()
        # Drain outstanding results and persist them (collect_work /
        # submit_collected are defined later in this file).
        self.collect_work()
        self.submit_collected()
        if self.httpd_server:
            self.httpd_server.stop()
        self.stopped.set()
|
|
|
|
    def finish(self):
        """Block until shutdown completes, then log overall success totals."""
        # In foreground mode nothing else runs _cleanup(), so do it here;
        # in background mode another path presumably cleans up - confirm.
        if not self.in_background: self._cleanup()
        while not self.stopped.is_set(): time.sleep(0.1)
        success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
        _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd")
|
|
|
|
    def _prep_db(self):
        """Open the configured proxy database via the mysqlite wrapper."""
        self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)
|
|
    def _close_db(self):
        """Close the database handle if open; safe to call repeatedly."""
        if self.mysqlite:
            self.mysqlite.close()
            self.mysqlite = None
|
|
    def __init__(self):
        """Load config, prepare queues/DB/scaler and install the SIGHUP handler."""
        config.load()
        self.in_background = False
        self.threads = []                      # live WorkerThread objects
        self.stopping = threading.Event()      # set by stop()
        self.stopped = threading.Event()       # set once _cleanup() finished

        # shared work-stealing queues
        self.job_queue = PriorityJobQueue()
        self.result_queue = Queue.Queue()

        # track pending proxy states (for multi-target aggregation)
        self.pending_states = [] # list of ProxyTestState awaiting completion
        self.pending_lock = threading.Lock()

        # create table if needed (use dbs.py for canonical schema)
        self._prep_db()
        dbs.create_table_if_not_exists(self.mysqlite, 'proxylist')
        self._close_db()

        self.submit_after = config.watchd.submit_after # number of collected jobs before writing db
        self.collected = [] # completed ProxyTestState objects ready for DB
        self.totals = {
            'submitted':0,
            'success':0,
        }
        self.stats = Stats()
        self.last_cleanup = time.time()
        self.httpd_server = None               # optional status httpd, set elsewhere

        # Dynamic thread scaling: allow bursting to 2x configured threads,
        # never dropping below a quarter (minimum 2)
        self.scaler = ThreadScaler(
            min_threads=max(2, config.watchd.threads // 4),
            max_threads=config.watchd.threads * 2,
            target_queue_per_thread=10
        )
        self.thread_id_counter = 0             # for unique dynamic thread IDs
        self.last_jobs_log = 0
        self.last_update_log = 0
        self.log_interval = 60 # seconds between routine log messages

        # Register SIGHUP handler for config reload
        signal.signal(signal.SIGHUP, self._handle_sighup)
|
|
|
|
    def _handle_sighup(self, signum, frame):
        """Handle SIGHUP signal for config reload.

        Args:
            signum: signal number (unused)
            frame: interrupted stack frame (unused)
        """
        _log('received SIGHUP, reloading config...', 'watchd')
        self.reload_config()
|
|
|
|
    def reload_config(self):
        """Reload configuration without restart.

        Hot-reloadable settings:
            threads, timeout, checktime, perfail_checktime, submit_after,
            stats_interval, debug, tor_safeguard, stale_days, max_fail,
            outage_threshold

        Requires restart:
            database, source_file, checktype

        Returns:
            bool: True when the new config was accepted, False otherwise.
        """
        old_threads = config.watchd.threads
        old_checktype = config.watchd.checktype

        try:
            config.load()
            errors = config.validate()
            if errors:
                # Invalid new config: report each problem and keep running
                # with the previously loaded values
                for e in errors:
                    _log('config error: %s' % e, 'error')
                _log('config reload failed, keeping old config', 'error')
                return False
        except Exception as e:
            _log('config reload failed: %s' % str(e), 'error')
            return False

        # Update runtime values
        self.submit_after = config.watchd.submit_after

        # Update scaler limits (same formulas as __init__)
        self.scaler.min_threads = max(2, config.watchd.threads // 4)
        self.scaler.max_threads = config.watchd.threads * 2

        # Warn about values requiring restart
        if config.watchd.checktype != old_checktype:
            _log('checktype changed (%s -> %s), requires restart to take effect' % (
                old_checktype, config.watchd.checktype), 'warn')

        _log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % (
            config.watchd.threads, config.watchd.timeout,
            config.watchd.checktime, config.watchd.max_fail), 'watchd')
        return True
|
|
|
|
    def _spawn_thread(self):
        """Spawn a new worker thread.

        Returns:
            str: the new thread's identifier ('dynN').
        """
        self.thread_id_counter += 1
        threadid = 'dyn%d' % self.thread_id_counter
        wt = WorkerThread(threadid, self.job_queue, self.result_queue)
        wt.start_thread()
        self.threads.append(wt)
        return threadid
|
|
|
|
def _remove_thread(self):
    """Ask the most recently added worker thread to stop.

    Refuses to shrink below the scaler's minimum. Returns the removed
    worker's id, or None when nothing could be removed.
    """
    if len(self.threads) <= self.scaler.min_threads:
        return None
    worker = self.threads.pop()
    worker.stop()
    # Deliberately no join here: the worker may finish its current job.
    return worker.id
|
|
|
|
def _adjust_threads(self, target):
    """Grow or shrink the worker pool until it matches *target*."""
    current = len(self.threads)
    delta = target - current
    if delta > 0:
        spawned = [self._spawn_thread() for _ in range(delta)]
        if spawned:
            _log('scaled up: +%d threads (%s)' % (len(spawned), ','.join(spawned)), 'scaler')
    elif delta < 0:
        stopped = []
        for _ in range(-delta):
            tid = self._remove_thread()
            if tid:
                stopped.append(tid)
        if stopped:
            _log('scaled down: -%d threads' % len(stopped), 'scaler')
|
|
|
|
def fetch_rows(self):
    """Fetch proxies that are due for (re-)testing from the database.

    A proxy is due when tested + checktime + failed * perfail_checktime
    has elapsed, so the recheck interval stretches with each failure.
    When fewer rows than worker threads are due and the "oldies" feature
    is enabled, falls back to re-testing long-dead proxies (failed in
    [max_fail, max_fail + max_fail/2)) with the tor safeguard disabled.

    Side effects: sets self.isoldies, may clear self.tor_safeguard.
    Returns the raw rows from the proxylist table.
    """
    self.isoldies = False
    # Parameterized so the same query serves both the regular and the
    # oldies pass; RANDOM() spreads load across the proxy list.
    q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
    now = time.time()
    params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
    if config.watchd.debug:
        _log('fetch_rows: db=%s checktime=%d perfail=%d max_fail=%d now=%d' % (
            config.watchd.database, config.watchd.checktime, config.watchd.perfail_checktime,
            config.watchd.max_fail, int(now)), 'debug')
    rows = self.mysqlite.execute(q, params).fetchall()
    if config.watchd.debug:
        _log('fetch_rows: got %d rows' % len(rows), 'debug')
    # check oldies ?
    if len(rows) < config.watchd.threads:
        rows = []
        if config.watchd.oldies:
            self.isoldies = True
            ## disable tor safeguard for old proxies
            # (oldies are expected to fail en masse - don't trip the
            # outage detection on them)
            if self.tor_safeguard: self.tor_safeguard = False
            # NOTE: on Python 2, max_fail/2 is integer division before
            # round() - the upper bound is max_fail * 1.5 rounded down.
            rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall()
    return rows
|
|
|
|
def prepare_jobs(self):
    """Build and enqueue test jobs for all proxies due for testing.

    Fetches due rows, creates one ProxyTestState per proxy plus one
    TargetTestJob per (proxy, target) pair, shuffles the jobs to
    interleave proxies, and puts them on the priority job queue.
    Target pool selection depends on config.watchd.checktype
    (irc / judges / ssl / http).

    Returns the number of jobs queued.
    """
    self._prep_db()
    ## enable tor safeguard by default
    # (fetch_rows may clear it again for the oldies pass)
    self.tor_safeguard = config.watchd.tor_safeguard
    rows = self.fetch_rows()
    checktype = config.watchd.checktype
    # one target per proxy per cycle
    num_targets = 1

    # select target pool based on checktype
    if checktype == 'irc':
        target_pool = config.servers
    elif checktype == 'judges':
        # Filter out judges in cooldown (blocked/rate-limited)
        all_judges = list(judges.keys())
        target_pool = judge_stats.get_available_judges(all_judges)
        if not target_pool:
            # All judges in cooldown - use all anyway
            target_pool = all_judges
            if config.watchd.debug:
                _log('all judges in cooldown, using full list', 'debug')
    elif checktype == 'ssl':
        target_pool = ssl_targets
    else: # http
        target_pool = list(regexes.keys())

    # create all jobs first, then shuffle for interleaving
    all_jobs = []
    new_states = []

    for row in rows:
        # create shared state for this proxy
        # (row layout matches the SELECT in fetch_rows: ip, port, proto,
        # failed, success_count, total_duration, country, mitm,
        # consecutive_success, asn)
        state = ProxyTestState(
            row[0], row[1], row[2], row[3], row[4], row[5],
            row[6], row[7], row[8], asn=row[9], num_targets=num_targets,
            oldies=self.isoldies
        )
        new_states.append(state)

        # select random targets for this proxy
        targets = random.sample(target_pool, min(num_targets, len(target_pool)))

        # create one job per target
        for target in targets:
            job = TargetTestJob(state, target, checktype)
            all_jobs.append(job)

    # shuffle to interleave tests across different proxies
    random.shuffle(all_jobs)

    # track pending states
    with self.pending_lock:
        self.pending_states.extend(new_states)

    # queue all jobs with priority (derived from past fail/success counts)
    for job in all_jobs:
        priority = calculate_priority(
            job.proxy_state.failcount,
            job.proxy_state.success_count,
            config.watchd.max_fail
        )
        self.job_queue.put(job, priority)

    self._close_db()
    proxy_count = len(new_states)
    job_count = len(all_jobs)
    now = time.time()
    # rate-limited routine log
    if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
        _log("created %d jobs for %d proxies (%d targets each)" % (
            job_count, proxy_count, num_targets), 'watchd')
        self.last_jobs_log = now
    return job_count
|
|
|
|
def collect_work(self):
    """Drain worker results and move completed proxy states to collected.

    Workers record each result inside the shared ProxyTestState, so the
    result queue only needs to be emptied here; evaluation happens on
    the states themselves.
    """
    while True:
        try:
            self.result_queue.get_nowait()
        except Queue.Empty:
            break

    # Evaluate every fully-tested state, record the outcome, and keep
    # the rest pending for the next pass.
    with self.pending_lock:
        remaining = []
        for state in self.pending_states:
            if not state.is_complete():
                remaining.append(state)
                continue
            ok, category = state.evaluate()
            self.stats.record(ok, category)
            self.collected.append(state)
        self.pending_states = remaining
|
|
|
|
def collect_unfinished(self):
    """Discard any jobs still sitting in the job queue.

    The matching ProxyTestStates remain incomplete and will simply be
    re-queued on the next cycle.
    """
    dropped = 0
    while True:
        try:
            self.job_queue.get_nowait()
        except Queue.Empty:
            break
        dropped += 1
    if dropped > 0:
        _log("discarded %d unfinished jobs" % dropped, "watchd")
|
|
|
|
def _build_update_args(self, successes_only=False):
    """Build DB update tuples from self.collected.

    Returns (args, latency_updates, success_count): one UPDATE-parameter
    tuple per collected state (only the successful ones when
    successes_only is True), plus (proxy, latency_ms) pairs for
    successful tests that measured a latency.
    """
    args = []
    latency_updates = []
    sc = 0
    for job in self.collected:
        ok = (job.failcount == 0)
        if ok:
            sc += 1
            if job.last_latency_ms is not None:
                latency_updates.append((job.proxy, job.last_latency_ms))
        if ok or not successes_only:
            args.append((job.failcount, job.checktime, 1, job.country, job.proto,
                         job.success_count, job.total_duration, job.mitm,
                         job.consecutive_success, job.asn, job.proxy))
    return args, latency_updates, sc

def submit_collected(self):
    """Write collected test results to the database.

    Returns True normally. When the tor safeguard detects a suspiciously
    low success rate (large batch, rate at/below outage_threshold), only
    the successful results are written and False is returned so the
    caller can back off - a blocked tor circuit must not mass-fail the
    proxy list.
    """
    if len(self.collected) == 0:
        return True
    # Single pass over collected results (previously duplicated for the
    # safeguard path below).
    args, latency_updates, sc = self._build_update_args()

    success_rate = (float(sc) / len(self.collected)) * 100
    ret = True
    if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard:
        _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
        if sc == 0:
            return False
        # Submit only the successes; the failures are presumed to be
        # tor's fault, not the proxies'.
        args, latency_updates, sc = self._build_update_args(successes_only=True)
        ret = False

    now = time.time()
    # NOTE(review): the `or success_rate > 0` clause bypasses the
    # rate-limit whenever anything succeeded - confirm that is intended.
    if (now - self.last_update_log) >= self.log_interval or success_rate > 0:
        _log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd')
        self.last_update_log = now
    self._prep_db()
    query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
    self.mysqlite.executemany(query, args)

    # Update latency metrics for successful proxies
    for proxy, latency_ms in latency_updates:
        dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms)

    # Update anonymity for proxies with exit IP data
    for job in self.collected:
        if job.failcount == 0 and job.exit_ip:
            dbs.update_proxy_anonymity(self.mysqlite, job.proxy, job.exit_ip, job.ip, job.reveals_headers)

    self.mysqlite.commit()
    self._close_db()
    self.collected = []
    self.totals['submitted'] += len(args)
    self.totals['success'] += sc
    return ret
|
|
|
|
def cleanup_stale(self):
    """Remove proxies that have been dead for too long.

    Purges rows that are both at/over the failure cap and untouched
    since stale_days ago, then records the cleanup timestamp.
    """
    cutoff = int(time.time()) - config.watchd.stale_days * 86400
    self._prep_db()
    cursor = self.mysqlite.execute(
        'DELETE FROM proxylist WHERE failed >= ? AND tested < ?',
        (config.watchd.max_fail, cutoff)
    )
    # rowcount may be missing depending on the DB wrapper
    removed = getattr(cursor, 'rowcount', 0)
    self.mysqlite.commit()
    self._close_db()
    if removed > 0:
        _log('removed %d stale proxies (not seen in %d days)' % (removed, config.watchd.stale_days), 'watchd')
    self.last_cleanup = time.time()
|
|
|
|
def start(self):
    """Run the main loop inline (single-thread standalone) or in a
    background thread otherwise."""
    run_inline = config.watchd.threads == 1 and _run_standalone
    if run_inline:
        return self._run()
    return self._run_background()
|
|
|
|
def run(self):
    """Block the calling thread forever while the background loop works."""
    if not self.in_background:
        return
    while True:
        time.sleep(1)
|
|
|
|
def _run_background(self):
    """Mark background mode and launch the main loop in its own thread."""
    self.in_background = True
    worker = threading.Thread(target=self._run)
    worker.start()
|
|
|
|
def _run(self):
    """Main watchdog loop.

    Logs startup/database status, initializes the tor connection pool
    and optional HTTP API, spawns the worker threads, then loops:
    schedule jobs when the queue drains, collect and submit results,
    back off on tor-safeguard trips, report stats, scale the thread
    pool, and run the daily stale-proxy cleanup. Exits when
    self.stopping is set.
    """
    _log('starting...', 'watchd')
    _log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % (
        config.watchd.database, config.watchd.checktype, config.watchd.threads,
        config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd')

    # Log database status at startup
    self._prep_db()
    total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
    now = time.time()
    due = self.mysqlite.execute(
        'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
        (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
    ).fetchone()[0]
    self._close_db()
    _log('database: %d total proxies, %d due for testing' % (total, due), 'watchd')

    # Initialize Tor connection pool
    connection_pool.init_pool(config.torhosts, warmup=True)

    # Start HTTP API server if enabled
    if config.httpd.enabled:
        from httpd import ProxyAPIServer
        self.httpd_server = ProxyAPIServer(
            config.httpd.listenip,
            config.httpd.port,
            config.watchd.database
        )
        self.httpd_server.start()

    # create worker threads with shared queues
    for i in range(config.watchd.threads):
        # string.ascii_letters instead of string.letters: the latter is
        # Python-2-only and crashes under the py3 fallback this file
        # supports (see the Queue import at the top).
        threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
        wt = WorkerThread(threadid, self.job_queue, self.result_queue)
        if self.in_background:
            wt.start_thread()
        self.threads.append(wt)
        # small jitter so workers don't start in lockstep
        time.sleep(random.random() / 10)

    sleeptime = 0
    while True:

        if self.stopping.is_set():
            if self.in_background: self._cleanup()
            break

        # back-off countdown (set on tor-safeguard trips / empty DB)
        if sleeptime > 0:
            time.sleep(1)
            sleeptime -= 1
            continue

        # check if job queue is empty (work-stealing: threads pull as needed)
        if self.job_queue.empty():
            self.collect_work()
            if not self.submit_collected() and self.tor_safeguard:
                _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                sleeptime = 60
            else:
                job_count = self.prepare_jobs()
                if job_count == 0:
                    # no jobs available, wait before checking again
                    sleeptime = 10

        if not self.in_background: # single_thread scenario
            self.threads[0].workloop()

        self.collect_work()

        if len(self.collected) > self.submit_after:
            if not self.submit_collected() and self.tor_safeguard:
                _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                sleeptime = 60

        # periodic stats report
        if self.stats.should_report(config.watchd.stats_interval):
            _log(self.stats.report(), 'stats')
            # Also report pool stats
            pool = connection_pool.get_pool()
            if pool:
                _log(pool.status_line(), 'stats')
            # Report judge stats (when using judges checktype)
            if config.watchd.checktype == 'judges':
                _log(judge_stats.status_line(), 'stats')
            # Report scaler status
            _log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')

        # Dynamic thread scaling
        if self.in_background:
            target = self.scaler.should_scale(
                len(self.threads),
                self.job_queue.qsize(),
                self.stats
            )
            if target != len(self.threads):
                self._adjust_threads(target)

        # periodic stale proxy cleanup (daily)
        if (time.time() - self.last_cleanup) >= 86400:
            self.cleanup_stale()

        time.sleep(1)
|
|
|
|
|
|
if __name__ == '__main__':
    # Flag standalone mode so start() can run inline when single-threaded.
    _run_standalone = True

    config.load()
    errors = config.validate()
    if errors:
        for e in errors:
            _log(e, 'error')
        import sys
        sys.exit(1)

    w = Proxywatchd()
    try:
        w.start()
        w.run()
    # no binding needed: Ctrl-C just falls through to shutdown
    except KeyboardInterrupt:
        pass
    finally:
        w.stop()
        w.finish()
|