proxywatchd: add startup logging, fix geolocation error handling
This commit is contained in:
462
proxywatchd.py
462
proxywatchd.py
@@ -6,19 +6,28 @@ import random
|
||||
import string
|
||||
import re
|
||||
import heapq
|
||||
import signal
|
||||
|
||||
import os
|
||||
try:
|
||||
import Queue
|
||||
except ImportError:
|
||||
import queue as Queue
|
||||
try:
|
||||
import IP2Location
|
||||
import os
|
||||
geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
|
||||
geolite = True
|
||||
except (ImportError, IOError):
|
||||
except (ImportError, IOError, ValueError):
|
||||
geolite = False
|
||||
|
||||
try:
|
||||
import pyasn
|
||||
asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat"))
|
||||
asn_enabled = True
|
||||
except (ImportError, IOError, NameError):
|
||||
asndb = None
|
||||
asn_enabled = False
|
||||
|
||||
from config import Config
|
||||
|
||||
import mysqlite
|
||||
@@ -32,6 +41,126 @@ config = Config()
|
||||
_run_standalone = False
|
||||
cached_dns = {}
|
||||
|
||||
# Loose dotted-quad matcher: a judge response must contain an IP in its body.
IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
# Proxy-revealing headers echoed back by header-echo judges: if any of these
# appear in the echoed request, the proxy exposes the client chain.
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'

# Signs that the *judge* rejected us (403/429/503, captcha, rate limit, ...).
# A match means the test is inconclusive - retry with a different judge rather
# than counting it as a proxy failure.
JUDGE_BLOCK_PATTERNS = [
    r'HTTP/1\.[01] 403',        # Forbidden
    r'HTTP/1\.[01] 429',        # Too Many Requests
    r'HTTP/1\.[01] 503',        # Service Unavailable
    r'captcha',                 # Captcha challenge
    r'challenge',               # Generic challenge
    r'verify you are human',    # Human verification
    r'rate.?limit',             # Rate limiting
    r'too many requests',       # Rate limiting
    r'access.?denied',          # Access denied
    r'blocked',                 # Explicit block
    r'Checking your browser',   # Cloudflare JS challenge
]
JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE)

# Check types: irc, http (header match), judges (body match), ssl (TLS handshake)
# Judge pool - every entry maps 'host' or 'host/path' to the regex that must
# match somewhere in the response body.  All current judges return the caller's
# IP (plain text, JSON, or HTML), so every value is IP_PATTERN.
_PLAIN_TEXT_JUDGES = (
    'api.ipify.org',
    'icanhazip.com',
    'checkip.amazonaws.com',
    'ifconfig.me/ip',
    'ipinfo.io/ip',
    'ip.me',
    'myip.dnsomatic.com',
    'ident.me',
    'ipecho.net/plain',
    'wtfismyip.com/text',
    'eth0.me',
    'l2.io/ip',
    'tnx.nl/ip',
    'wgetip.com',
    'curlmyip.net',
)
# JSON responses (IP pattern still matches inside the JSON text)
_JSON_JUDGES = (
    'httpbin.org/ip',
    'ip-api.com/json',
    'ipapi.co/json',
    'ipwhois.app/json',
)
# Header echo - used for elite detection (does the proxy add revealing headers?)
_HEADER_ECHO_JUDGES = (
    'httpbin.org/headers',  # returns request headers as JSON
)

judges = dict.fromkeys(
    _PLAIN_TEXT_JUDGES + _JSON_JUDGES + _HEADER_ECHO_JUDGES, IP_PATTERN)
|
||||
|
||||
|
||||
class JudgeStats():
    """Per-judge reliability bookkeeping.

    Counts successes, failures and blocks for each judge service.  A judge
    that accumulates ``block_threshold`` blocks without an intervening
    success goes into a cooldown of ``cooldown_seconds``; a success (or an
    expired cooldown) clears the block streak so the judge can recover.
    All public methods are thread-safe.
    """

    # Template for a judge's counter record.
    _FRESH = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}

    def __init__(self, cooldown_seconds=300, block_threshold=3):
        self.lock = threading.Lock()
        # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': ts}
        self.stats = {}
        self.cooldown_seconds = cooldown_seconds  # seconds to avoid blocked judges
        self.block_threshold = block_threshold    # blocks before cooldown kicks in

    def _entry(self, judge):
        """Return the counter dict for *judge*, creating it on first use.

        Caller must already hold self.lock.
        """
        if judge not in self.stats:
            self.stats[judge] = dict(self._FRESH)
        return self.stats[judge]

    def record_success(self, judge):
        """Count a good response and clear the judge's block streak."""
        with self.lock:
            entry = self._entry(judge)
            entry['success'] += 1
            entry['block'] = 0

    def record_failure(self, judge):
        """Count a proxy-side failure (the judge itself was not blocking)."""
        with self.lock:
            self._entry(judge)['fail'] += 1

    def record_block(self, judge):
        """Count a judge-side rejection (403, captcha, rate-limit)."""
        with self.lock:
            entry = self._entry(judge)
            entry['block'] += 1
            entry['last_block'] = time.time()

    def is_available(self, judge):
        """Return True unless *judge* is currently in cooldown."""
        with self.lock:
            entry = self.stats.get(judge)
            if entry is None:
                # Never seen - assume usable (no record is created here).
                return True
            if entry['block'] >= self.block_threshold:
                if (time.time() - entry['last_block']) < self.cooldown_seconds:
                    return False
                # Cooldown over - forgive the streak.
                entry['block'] = 0
            return True

    def get_available_judges(self, judge_list):
        """Filter *judge_list* down to judges not in cooldown."""
        return [judge for judge in judge_list if self.is_available(judge)]

    def status_line(self):
        """Return a one-line summary ('judges: N total, M in cooldown')."""
        with self.lock:
            now = time.time()
            cooling = sum(
                1 for s in self.stats.values()
                if s['block'] >= self.block_threshold
                and (now - s['last_block']) < self.cooldown_seconds)
            return 'judges: %d total, %d in cooldown' % (len(self.stats), cooling)


# Global judge stats instance
judge_stats = JudgeStats()
|
||||
|
||||
# HTTP targets - check for specific headers
|
||||
regexes = {
|
||||
'www.facebook.com': 'X-FB-Debug',
|
||||
'www.fbcdn.net': 'X-FB-Debug',
|
||||
@@ -71,6 +200,28 @@ regexes = {
|
||||
'www.time.com': 'x-amz-cf-pop'
|
||||
}
|
||||
|
||||
# SSL targets - TLS handshake only, used for MITM detection.  No HTTP request
# is sent; the check just connects with verifycert=True and inspects the
# certificate validation result.
ssl_targets = [
    'www.google.com', 'www.microsoft.com', 'www.apple.com',
    'www.amazon.com', 'www.cloudflare.com', 'www.github.com',
    'www.mozilla.org', 'www.wikipedia.org', 'www.reddit.com',
    'www.twitter.com', 'x.com', 'www.facebook.com',
    'www.linkedin.com', 'www.paypal.com', 'www.stripe.com',
    'www.digicert.com', 'www.letsencrypt.org',
]
|
||||
|
||||
|
||||
class Stats():
|
||||
"""Track and report runtime statistics."""
|
||||
@@ -302,7 +453,7 @@ class ProxyTestState():
|
||||
When all tests complete, evaluate() determines final pass/fail.
|
||||
"""
|
||||
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
|
||||
country, mitm, consecutive_success, num_targets=3, oldies=False):
|
||||
country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False):
|
||||
self.ip = ip
|
||||
self.port = int(port)
|
||||
self.proxy = '%s:%s' % (ip, port)
|
||||
@@ -314,6 +465,7 @@ class ProxyTestState():
|
||||
self.country = country
|
||||
self.mitm = mitm
|
||||
self.consecutive_success = consecutive_success
|
||||
self.asn = asn
|
||||
self.isoldies = oldies
|
||||
self.num_targets = num_targets
|
||||
|
||||
@@ -322,8 +474,10 @@ class ProxyTestState():
|
||||
self.results = [] # list of (success, proto, duration, srv, tor, ssl)
|
||||
self.completed = False
|
||||
self.last_latency_ms = None # average latency from successful tests
|
||||
self.exit_ip = None # IP seen by target server (for anonymity detection)
|
||||
self.reveals_headers = None # True if proxy adds X-Forwarded-For/Via headers
|
||||
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None):
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
|
||||
"""Record a single target test result. Thread-safe."""
|
||||
with self.lock:
|
||||
self.results.append({
|
||||
@@ -333,7 +487,9 @@ class ProxyTestState():
|
||||
'srv': srv,
|
||||
'tor': tor,
|
||||
'ssl': ssl,
|
||||
'category': category
|
||||
'category': category,
|
||||
'exit_ip': exit_ip,
|
||||
'reveals_headers': reveals_headers
|
||||
})
|
||||
|
||||
def is_complete(self):
|
||||
@@ -375,17 +531,32 @@ class ProxyTestState():
|
||||
if cats:
|
||||
fail_category = max(cats.keys(), key=lambda k: cats[k])
|
||||
|
||||
# require majority success (2/3)
|
||||
if num_success >= 2:
|
||||
# require success (single target mode)
|
||||
if num_success >= 1:
|
||||
# use last successful result for metrics
|
||||
last_good = successes[-1]
|
||||
|
||||
# Extract exit IP and header info from successful judge responses
|
||||
for s in successes:
|
||||
if s.get('exit_ip'):
|
||||
self.exit_ip = s['exit_ip']
|
||||
if s.get('reveals_headers') is not None:
|
||||
self.reveals_headers = s['reveals_headers']
|
||||
|
||||
if geolite and self.country is None:
|
||||
self.ip = self.rwip(self.ip)
|
||||
rec = geodb.get_all(self.ip)
|
||||
if rec is not None and rec.country_short:
|
||||
self.country = rec.country_short
|
||||
|
||||
if asn_enabled and self.asn is None:
|
||||
try:
|
||||
asn_result = asndb.lookup(self.ip)
|
||||
if asn_result and asn_result[0]:
|
||||
self.asn = asn_result[0]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self.proto = last_good['proto']
|
||||
self.failcount = 0
|
||||
if (self.consecutive_success % 3) == 0:
|
||||
@@ -400,23 +571,40 @@ class ProxyTestState():
|
||||
self.last_latency_ms = sum(durations) * 1000.0 / len(durations)
|
||||
|
||||
torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
|
||||
_log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % (
|
||||
# Determine anonymity for status message
|
||||
# transparent: exit_ip == proxy_ip
|
||||
# anonymous: exit_ip != proxy_ip, adds revealing headers
|
||||
# elite: exit_ip != proxy_ip, no revealing headers
|
||||
anon_status = ''
|
||||
if self.exit_ip:
|
||||
if self.exit_ip == self.ip:
|
||||
anon_status = ' anon: transparent;'
|
||||
elif self.reveals_headers is False:
|
||||
anon_status = ' anon: elite;'
|
||||
elif self.reveals_headers is True:
|
||||
anon_status = ' anon: anonymous;'
|
||||
else:
|
||||
anon_status = ' anon: anonymous;' # default if no header check
|
||||
_log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % (
|
||||
last_good['proto'], self.ip, self.port, self.country,
|
||||
last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']),
|
||||
last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']),
|
||||
num_success, self.num_targets), 'xxxxx')
|
||||
return (True, None)
|
||||
|
||||
elif num_success == 1:
|
||||
# partial success - don't increment fail, but reset consecutive
|
||||
self.consecutive_success = 0
|
||||
_log('%s:%d partial success %d/%d targets' % (
|
||||
self.ip, self.port, num_success, self.num_targets), 'debug')
|
||||
return (False, fail_category)
|
||||
|
||||
else:
|
||||
self.failcount += 1
|
||||
self.consecutive_success = 0
|
||||
return (False, fail_category)
|
||||
# Check if failures were all judge blocks (not proxy's fault)
|
||||
judge_blocks = [f for f in failures if f.get('category') == 'judge_block']
|
||||
real_failures = [f for f in failures if f.get('category') != 'judge_block']
|
||||
|
||||
if judge_blocks and not real_failures:
|
||||
# All failures were judge blocks - inconclusive, don't penalize proxy
|
||||
# checktime still updated so we don't immediately retest
|
||||
return (False, 'judge_block')
|
||||
else:
|
||||
# Real proxy failure
|
||||
self.failcount += 1
|
||||
self.consecutive_success = 0
|
||||
return (False, fail_category)
|
||||
|
||||
|
||||
class TargetTestJob():
|
||||
@@ -436,21 +624,64 @@ class TargetTestJob():
|
||||
sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
|
||||
|
||||
if not sock:
|
||||
self.proxy_state.record_result(False, category=err_cat)
|
||||
return
|
||||
|
||||
try:
|
||||
recv = sock.recv(-1)
|
||||
regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv]
|
||||
|
||||
if re.search(regex, recv, re.IGNORECASE):
|
||||
# SSL-only check passed (handshake success, no request needed)
|
||||
if err_cat == 'ssl_ok':
|
||||
elapsed = time.time() - duration
|
||||
self.proxy_state.record_result(
|
||||
True, proto=proto, duration=elapsed,
|
||||
srv=srv, tor=tor, ssl=is_ssl
|
||||
)
|
||||
else:
|
||||
self.proxy_state.record_result(False, category='other')
|
||||
self.proxy_state.record_result(False, category=err_cat)
|
||||
return
|
||||
|
||||
try:
|
||||
recv = sock.recv(-1)
|
||||
|
||||
# Select regex based on check type
|
||||
if self.checktype == 'irc':
|
||||
regex = '^(:|NOTICE|ERROR)'
|
||||
elif self.checktype == 'judges':
|
||||
regex = judges[srv]
|
||||
elif self.checktype == 'ssl':
|
||||
# Should not reach here - ssl returns before recv
|
||||
self.proxy_state.record_result(True, proto=proto, srv=srv, ssl=is_ssl)
|
||||
return
|
||||
else: # http
|
||||
regex = regexes[srv]
|
||||
|
||||
if re.search(regex, recv, re.IGNORECASE):
|
||||
elapsed = time.time() - duration
|
||||
# Extract exit IP from judge response
|
||||
exit_ip = None
|
||||
reveals_headers = None
|
||||
if self.checktype == 'judges':
|
||||
ip_match = re.search(IP_PATTERN, recv)
|
||||
if ip_match:
|
||||
exit_ip = ip_match.group(0)
|
||||
# Check for header echo judge (elite detection)
|
||||
if 'headers' in srv:
|
||||
# If X-Forwarded-For/Via/etc present, proxy reveals chain
|
||||
reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
|
||||
# Record successful judge
|
||||
judge_stats.record_success(srv)
|
||||
self.proxy_state.record_result(
|
||||
True, proto=proto, duration=elapsed,
|
||||
srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip,
|
||||
reveals_headers=reveals_headers
|
||||
)
|
||||
else:
|
||||
# Check if judge is blocking us (not a proxy failure)
|
||||
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
|
||||
judge_stats.record_block(srv)
|
||||
# Don't count as proxy failure - judge is blocking
|
||||
self.proxy_state.record_result(False, category='judge_block')
|
||||
if config.watchd.debug:
|
||||
_log('judge %s blocked proxy %s' % (srv, self.proxy_state.proxy), 'debug')
|
||||
else:
|
||||
if self.checktype == 'judges':
|
||||
judge_stats.record_failure(srv)
|
||||
self.proxy_state.record_result(False, category='other')
|
||||
|
||||
except KeyboardInterrupt as e:
|
||||
sock.disconnect()
|
||||
@@ -464,17 +695,31 @@ class TargetTestJob():
|
||||
"""Connect to target through the proxy and send test packet."""
|
||||
ps = self.proxy_state
|
||||
srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
|
||||
|
||||
use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
|
||||
if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
|
||||
use_ssl = 1
|
||||
|
||||
if self.checktype == 'irc':
|
||||
server_port = 6697 if use_ssl else 6667
|
||||
# For judges, extract host from 'host/path' format
|
||||
if self.checktype == 'judges' and '/' in srvname:
|
||||
connect_host = srvname.split('/')[0]
|
||||
else:
|
||||
server_port = 443 if use_ssl else 80
|
||||
connect_host = srvname
|
||||
|
||||
verifycert = True if use_ssl else False
|
||||
# SSL checktype: always use SSL with certificate verification
|
||||
if self.checktype == 'ssl':
|
||||
use_ssl = 1
|
||||
ssl_only_check = True # handshake only, no HTTP request
|
||||
server_port = 443
|
||||
verifycert = True
|
||||
else:
|
||||
use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
|
||||
ssl_only_check = False # minimal SSL test (handshake only, no request)
|
||||
if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
|
||||
use_ssl = 1
|
||||
ssl_only_check = True # periodic MITM check - handshake is sufficient
|
||||
|
||||
if self.checktype == 'irc':
|
||||
server_port = 6697 if use_ssl else 6667
|
||||
else:
|
||||
server_port = 443 if use_ssl else 80
|
||||
|
||||
verifycert = True if use_ssl else False
|
||||
protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
|
||||
last_error_category = None
|
||||
|
||||
@@ -487,9 +732,9 @@ class TargetTestJob():
|
||||
else:
|
||||
torhost = random.choice(config.torhosts)
|
||||
if proto == 'socks4':
|
||||
srv = socks4_resolve(srvname, server_port)
|
||||
srv = socks4_resolve(connect_host, server_port)
|
||||
else:
|
||||
srv = srvname
|
||||
srv = connect_host
|
||||
if not srv:
|
||||
continue
|
||||
|
||||
@@ -504,9 +749,24 @@ class TargetTestJob():
|
||||
proxies=proxies, timeout=config.watchd.timeout,
|
||||
verifycert=verifycert)
|
||||
sock.connect()
|
||||
|
||||
# SSL-only check: handshake passed, no request needed
|
||||
if ssl_only_check:
|
||||
elapsed = time.time() - duration
|
||||
if pool:
|
||||
pool.record_success(torhost, elapsed)
|
||||
sock.disconnect()
|
||||
return None, proto, duration, torhost, srvname, 0, use_ssl, 'ssl_ok'
|
||||
|
||||
if self.checktype == 'irc':
|
||||
sock.send('NICK\n')
|
||||
else:
|
||||
elif self.checktype == 'judges':
|
||||
# GET request to receive body with IP address
|
||||
sock.send('GET /%s HTTP/1.0\r\nHost: %s\r\n\r\n' % (
|
||||
srvname.split('/', 1)[1] if '/' in srvname else '',
|
||||
srvname.split('/')[0]
|
||||
))
|
||||
else: # http - HEAD is sufficient for header checks
|
||||
sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
|
||||
# Record success in pool
|
||||
if pool:
|
||||
@@ -537,7 +797,7 @@ class TargetTestJob():
|
||||
_log("could not connect to tor, sleep 5s", "ERROR")
|
||||
time.sleep(5)
|
||||
elif et == rocksock.RS_ET_GAI:
|
||||
_log("could not resolve connection target %s" % srvname, "ERROR")
|
||||
_log("could not resolve connection target %s" % connect_host, "ERROR")
|
||||
break
|
||||
elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
|
||||
ps.mitm = 1
|
||||
@@ -649,6 +909,60 @@ class Proxywatchd():
|
||||
target_queue_per_thread=10
|
||||
)
|
||||
self.thread_id_counter = 0
|
||||
self.last_jobs_log = 0
|
||||
self.last_update_log = 0
|
||||
self.log_interval = 60 # seconds between routine log messages
|
||||
|
||||
# Register SIGHUP handler for config reload
|
||||
signal.signal(signal.SIGHUP, self._handle_sighup)
|
||||
|
||||
def _handle_sighup(self, signum, frame):
|
||||
"""Handle SIGHUP signal for config reload."""
|
||||
_log('received SIGHUP, reloading config...', 'watchd')
|
||||
self.reload_config()
|
||||
|
||||
def reload_config(self):
|
||||
"""Reload configuration without restart.
|
||||
|
||||
Hot-reloadable settings:
|
||||
threads, timeout, checktime, perfail_checktime, submit_after,
|
||||
stats_interval, debug, tor_safeguard, stale_days, max_fail,
|
||||
outage_threshold
|
||||
|
||||
Requires restart:
|
||||
database, source_file, checktype
|
||||
"""
|
||||
old_threads = config.watchd.threads
|
||||
old_checktype = config.watchd.checktype
|
||||
|
||||
try:
|
||||
config.load()
|
||||
errors = config.validate()
|
||||
if errors:
|
||||
for e in errors:
|
||||
_log('config error: %s' % e, 'error')
|
||||
_log('config reload failed, keeping old config', 'error')
|
||||
return False
|
||||
except Exception as e:
|
||||
_log('config reload failed: %s' % str(e), 'error')
|
||||
return False
|
||||
|
||||
# Update runtime values
|
||||
self.submit_after = config.watchd.submit_after
|
||||
|
||||
# Update scaler limits
|
||||
self.scaler.min_threads = max(2, config.watchd.threads // 4)
|
||||
self.scaler.max_threads = config.watchd.threads * 2
|
||||
|
||||
# Warn about values requiring restart
|
||||
if config.watchd.checktype != old_checktype:
|
||||
_log('checktype changed (%s -> %s), requires restart to take effect' % (
|
||||
old_checktype, config.watchd.checktype), 'warn')
|
||||
|
||||
_log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % (
|
||||
config.watchd.threads, config.watchd.timeout,
|
||||
config.watchd.checktime, config.watchd.max_fail), 'watchd')
|
||||
return True
|
||||
|
||||
def _spawn_thread(self):
|
||||
"""Spawn a new worker thread."""
|
||||
@@ -690,8 +1004,16 @@ class Proxywatchd():
|
||||
|
||||
def fetch_rows(self):
|
||||
self.isoldies = False
|
||||
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
|
||||
rows = self.mysqlite.execute(q, (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, time.time())).fetchall()
|
||||
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
|
||||
now = time.time()
|
||||
params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
||||
if config.watchd.debug:
|
||||
_log('fetch_rows: db=%s checktime=%d perfail=%d max_fail=%d now=%d' % (
|
||||
config.watchd.database, config.watchd.checktime, config.watchd.perfail_checktime,
|
||||
config.watchd.max_fail, int(now)), 'debug')
|
||||
rows = self.mysqlite.execute(q, params).fetchall()
|
||||
if config.watchd.debug:
|
||||
_log('fetch_rows: got %d rows' % len(rows), 'debug')
|
||||
# check oldies ?
|
||||
if len(rows) < config.watchd.threads:
|
||||
rows = []
|
||||
@@ -708,12 +1030,23 @@ class Proxywatchd():
|
||||
self.tor_safeguard = config.watchd.tor_safeguard
|
||||
rows = self.fetch_rows()
|
||||
checktype = config.watchd.checktype
|
||||
num_targets = 3
|
||||
num_targets = 1
|
||||
|
||||
# select target pool based on checktype
|
||||
if checktype == 'irc':
|
||||
target_pool = config.servers
|
||||
else:
|
||||
elif checktype == 'judges':
|
||||
# Filter out judges in cooldown (blocked/rate-limited)
|
||||
all_judges = list(judges.keys())
|
||||
target_pool = judge_stats.get_available_judges(all_judges)
|
||||
if not target_pool:
|
||||
# All judges in cooldown - use all anyway
|
||||
target_pool = all_judges
|
||||
if config.watchd.debug:
|
||||
_log('all judges in cooldown, using full list', 'debug')
|
||||
elif checktype == 'ssl':
|
||||
target_pool = ssl_targets
|
||||
else: # http
|
||||
target_pool = list(regexes.keys())
|
||||
|
||||
# create all jobs first, then shuffle for interleaving
|
||||
@@ -724,7 +1057,7 @@ class Proxywatchd():
|
||||
# create shared state for this proxy
|
||||
state = ProxyTestState(
|
||||
row[0], row[1], row[2], row[3], row[4], row[5],
|
||||
row[6], row[7], row[8], num_targets=num_targets,
|
||||
row[6], row[7], row[8], asn=row[9], num_targets=num_targets,
|
||||
oldies=self.isoldies
|
||||
)
|
||||
new_states.append(state)
|
||||
@@ -756,9 +1089,11 @@ class Proxywatchd():
|
||||
self._close_db()
|
||||
proxy_count = len(new_states)
|
||||
job_count = len(all_jobs)
|
||||
if proxy_count > 0:
|
||||
now = time.time()
|
||||
if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
|
||||
_log("created %d jobs for %d proxies (%d targets each)" % (
|
||||
job_count, proxy_count, num_targets), 'watchd')
|
||||
self.last_jobs_log = now
|
||||
return job_count
|
||||
|
||||
def collect_work(self):
|
||||
@@ -809,7 +1144,7 @@ class Proxywatchd():
|
||||
latency_updates.append((job.proxy, job.last_latency_ms))
|
||||
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
|
||||
job.success_count, job.total_duration, job.mitm,
|
||||
job.consecutive_success, job.proxy))
|
||||
job.consecutive_success, job.asn, job.proxy))
|
||||
|
||||
success_rate = (float(sc) / len(self.collected)) * 100
|
||||
ret = True
|
||||
@@ -823,20 +1158,28 @@ class Proxywatchd():
|
||||
if job.failcount == 0:
|
||||
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
|
||||
job.success_count, job.total_duration, job.mitm,
|
||||
job.consecutive_success, job.proxy))
|
||||
job.consecutive_success, job.asn, job.proxy))
|
||||
if job.last_latency_ms is not None:
|
||||
latency_updates.append((job.proxy, job.last_latency_ms))
|
||||
ret = False
|
||||
|
||||
_log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd')
|
||||
now = time.time()
|
||||
if (now - self.last_update_log) >= self.log_interval or success_rate > 0:
|
||||
_log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd')
|
||||
self.last_update_log = now
|
||||
self._prep_db()
|
||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=? WHERE proxy=?'
|
||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
|
||||
self.mysqlite.executemany(query, args)
|
||||
|
||||
# Update latency metrics for successful proxies
|
||||
for proxy, latency_ms in latency_updates:
|
||||
dbs.update_proxy_latency(self.mysqlite, proxy, latency_ms)
|
||||
|
||||
# Update anonymity for proxies with exit IP data
|
||||
for job in self.collected:
|
||||
if job.failcount == 0 and job.exit_ip:
|
||||
dbs.update_proxy_anonymity(self.mysqlite, job.proxy, job.exit_ip, job.ip, job.reveals_headers)
|
||||
|
||||
self.mysqlite.commit()
|
||||
self._close_db()
|
||||
self.collected = []
|
||||
@@ -878,6 +1221,20 @@ class Proxywatchd():
|
||||
|
||||
def _run(self):
|
||||
_log('starting...', 'watchd')
|
||||
_log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % (
|
||||
config.watchd.database, config.watchd.checktype, config.watchd.threads,
|
||||
config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd')
|
||||
|
||||
# Log database status at startup
|
||||
self._prep_db()
|
||||
total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
|
||||
now = time.time()
|
||||
due = self.mysqlite.execute(
|
||||
'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
|
||||
(config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
||||
).fetchone()[0]
|
||||
self._close_db()
|
||||
_log('database: %d total proxies, %d due for testing' % (total, due), 'watchd')
|
||||
|
||||
# Initialize Tor connection pool
|
||||
connection_pool.init_pool(config.torhosts, warmup=True)
|
||||
@@ -942,6 +1299,9 @@ class Proxywatchd():
|
||||
pool = connection_pool.get_pool()
|
||||
if pool:
|
||||
_log(pool.status_line(), 'stats')
|
||||
# Report judge stats (when using judges checktype)
|
||||
if config.watchd.checktype == 'judges':
|
||||
_log(judge_stats.status_line(), 'stats')
|
||||
# Report scaler status
|
||||
_log(self.scaler.status_line(len(self.threads), self.job_queue.qsize()), 'scaler')
|
||||
|
||||
|
||||
Reference in New Issue
Block a user