Files
ppf/proxywatchd.py
Username e24f68500c style: normalize indentation and improve code style
- convert tabs to 4-space indentation
- add docstrings to modules and classes
- remove unused import (copy)
- use explicit object inheritance
- use 'while True' over 'while 1'
- use 'while args' over 'while len(args)'
- use '{}' over 'dict()'
- consistent string formatting
- Python 2/3 compatible Queue import
2025-12-20 23:18:45 +01:00

834 lines
29 KiB
Python

#!/usr/bin/env python2
# Proxy watchdog daemon: periodically re-tests known proxies through Tor
# and records results in the proxylist database.
import threading
import time
import random
import string
import re
import heapq
# Python 2/3 compatible queue import (Py2: Queue, Py3: queue)
try:
    import Queue
except ImportError:
    import queue as Queue
# Optional GeoIP support: if the IP2Location module and its database file are
# available, proxies get a country code on successful tests (see
# ProxyTestState.evaluate); otherwise geolite stays False and lookups are skipped.
try:
    import IP2Location
    import os
    geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
    geolite = True
except (ImportError, IOError):
    geolite = False
from config import Config
import mysqlite
from misc import _log, categorize_error
import rocksock
import connection_pool
# Module-level shared state.
config = Config()
# True only when executed as a script (set in the __main__ guard).
_run_standalone = False
# hostname -> resolved IPv4 cache used by socks4_resolve().
cached_dns = {}
# Check-target hostname -> a response header distinctive enough to prove the
# proxy really reached that site (matched case-insensitively against the raw
# HEAD response in TargetTestJob.run).
regexes = {
    'www.facebook.com': 'X-FB-Debug',
    'www.fbcdn.net': 'X-FB-Debug',
    'www.reddit.com': 'x-clacks-overhead',
    'www.twitter.com': 'x-connection-hash',
    't.co': 'x-connection-hash',
    'www.msn.com': 'x-aspnetmvc-version',
    'www.bing.com': 'p3p',
    'www.ask.com': 'x-served-by',
    'www.hotmail.com': 'x-msedge-ref',
    'www.bbc.co.uk': 'x-bbc-edge-cache-status',
    'www.skype.com': 'X-XSS-Protection',
    'www.alibaba.com': 'object-status',
    'www.mozilla.org': 'cf-ray',
    'www.cloudflare.com': 'cf-ray',
    'www.wikimedia.org': 'x-client-ip',
    'www.vk.com': 'x-frontend',
    'www.tinypic.com': 'x-amz-cf-pop',
    'www.netflix.com': 'X-Netflix.proxy.execution-time',
    'www.amazon.de': 'x-amz-cf-id',
    'www.reuters.com': 'x-amz-cf-id',
    'www.ikea.com': 'x-frame-options',
    'www.twitpic.com': 'timing-allow-origin',
    'www.digg.com': 'cf-request-id',
    'www.wikia.com': 'x-served-by',
    'www.wp.com': 'x-ac',
    'www.last.fm': 'x-timer',
    'www.usps.com': 'x-ruleset-version',
    'www.linkedin.com': 'x-li-uuid',
    'www.vimeo.com': 'x-timer',
    'www.yelp.com': 'x-timer',
    'www.ebay.com': 'x-envoy-upstream-service-time',
    'www.wikihow.com': 'x-c',
    'www.archive.org': 'referrer-policy',
    'www.pandora.tv': 'X-UA-Compatible',
    'www.w3.org': 'x-backend',
    'www.time.com': 'x-amz-cf-pop'
}
class Stats():
    """Track and report runtime statistics."""

    def __init__(self):
        self.lock = threading.Lock()
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()
        self.last_report = time.time()
        # per-category failure tallies (category name -> count)
        self.fail_categories = {}

    def record(self, success, category=None):
        """Count one completed test; tally the failure category on failures."""
        with self.lock:
            self.tested += 1
            if success:
                self.passed += 1
            else:
                self.failed += 1
                if category:
                    seen = self.fail_categories.get(category, 0)
                    self.fail_categories[category] = seen + 1

    def should_report(self, interval):
        """True once at least `interval` seconds elapsed since the last report."""
        return (time.time() - self.last_report) >= interval

    def report(self):
        """Build a one-line summary string and reset the report timer."""
        with self.lock:
            self.last_report = time.time()
            elapsed = time.time() - self.start_time
            summary = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % (
                self.tested, self.passed,
                try_div(self.passed * 100.0, self.tested),
                try_div(self.tested, elapsed),
                int(elapsed / 60))
            if not self.fail_categories:
                return summary
            # append a failure breakdown, sorted by category name
            breakdown = ' '.join(
                '%s=%d' % item for item in sorted(self.fail_categories.items()))
            return '%s [%s]' % (summary, breakdown)
def try_div(a, b):
    """Return a / b as a float, or 0 when b is zero (avoids ZeroDivisionError)."""
    if b == 0:
        return 0
    return a / float(b)
class PriorityJobQueue(object):
    """Priority queue for proxy test jobs.

    Lower priority number = higher priority.
    Priority 0: New proxies (never tested)
    Priority 1: Recently working (no failures, has successes)
    Priority 2: Low fail count (< 3 failures)
    Priority 3: Medium fail count
    Priority 4: High fail count
    """

    def __init__(self):
        self.heap = []
        self.lock = threading.Lock()
        # condition shares self.lock, so acquiring either guards the heap
        self.not_empty = threading.Condition(self.lock)
        self.counter = 0  # monotonically increasing tie-breaker (FIFO within a priority)

    def put(self, job, priority=3):
        """Add job with priority (lower = higher priority)."""
        with self.not_empty:
            heapq.heappush(self.heap, (priority, self.counter, job))
            self.counter += 1
            self.not_empty.notify()

    def get(self, timeout=None):
        """Get highest priority job. Raises Queue.Empty on timeout."""
        with self.not_empty:
            if timeout is None:
                # block indefinitely until a job arrives
                while not self.heap:
                    self.not_empty.wait()
            else:
                deadline = time.time() + timeout
                while not self.heap:
                    left = deadline - time.time()
                    if left <= 0:
                        raise Queue.Empty()
                    self.not_empty.wait(left)
            return heapq.heappop(self.heap)[2]

    def get_nowait(self):
        """Get job without waiting. Raises Queue.Empty if empty."""
        with self.lock:
            if not self.heap:
                raise Queue.Empty()
            return heapq.heappop(self.heap)[2]

    def empty(self):
        """Check if queue is empty."""
        with self.lock:
            return not self.heap

    def qsize(self):
        """Return queue size."""
        with self.lock:
            return len(self.heap)

    def task_done(self):
        """Compatibility method (no-op for heap queue)."""
        pass
def calculate_priority(failcount, success_count, max_fail):
    """Calculate job priority based on proxy state.

    Returns:
        int: Priority 0-4 (lower = higher priority)
    """
    if failcount == 0:
        # never-tested proxies come first, then recently-working ones
        return 0 if success_count == 0 else 1
    if failcount < 3:
        # only a few failures so far
        return 2
    # medium vs. high failure count, split at half of max_fail
    return 3 if failcount < max_fail // 2 else 4
def socks4_resolve(srvname, server_port):
    """Resolve `srvname` to an IPv4 address for SOCKS4 connections.

    SOCKS4 cannot resolve hostnames itself, so the name is resolved locally
    and cached in the module-level `cached_dns` dict.

    Returns:
        str: the IPv4 address, or False when resolution failed.
    """
    srv = srvname
    if srv in cached_dns:
        srv = cached_dns[srvname]
        if config.watchd.debug:
            _log("using cached ip (%s) for %s" % (srv, srvname), "debug")
    else:
        dns_fail = False
        try:
            af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True)
            if sa is not None:
                cached_dns[srvname] = sa[0]
                srv = sa[0]
            else:
                dns_fail = True
        except rocksock.RocksockException as e:
            # Only GAI (resolver) errors are expected here; the assert is a
            # sanity check and is stripped under -O, in which case any
            # RocksockException is treated as a resolution failure.
            assert(e.get_errortype() == rocksock.RS_ET_GAI)
            dns_fail = True
        if dns_fail:
            _log("could not resolve connection target %s" % srvname, "ERROR")
            return False
    return srv
class ProxyTestState():
    """Thread-safe state for a proxy being tested against multiple targets.

    Results from TargetTestJob instances are aggregated here.
    When all tests complete, evaluate() determines final pass/fail.
    """

    def __init__(self, ip, port, proto, failcount, success_count, total_duration,
                 country, mitm, consecutive_success, num_targets=3, oldies=False):
        self.ip = ip
        self.port = int(port)
        self.proxy = '%s:%s' % (ip, port)  # DB key: "ip:port"
        self.proto = proto  # known protocol, or None to probe http/socks5/socks4
        self.failcount = failcount
        self.checktime = None  # set to now() when evaluate() runs
        self.success_count = success_count
        self.total_duration = total_duration  # cumulative test time in ms
        self.country = country
        self.mitm = mitm
        self.consecutive_success = consecutive_success
        self.isoldies = oldies
        self.num_targets = num_targets
        # thread-safe result accumulation
        self.lock = threading.Lock()
        self.results = []  # result dicts: success/proto/duration/srv/tor/ssl/category
        self.completed = False

    def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None):
        """Record a single target test result. Thread-safe."""
        with self.lock:
            self.results.append({
                'success': success,
                'proto': proto,
                'duration': duration,
                'srv': srv,
                'tor': tor,
                'ssl': ssl,
                'category': category
            })

    def is_complete(self):
        """Check if all target tests have finished."""
        with self.lock:
            return len(self.results) >= self.num_targets

    def rwip(self, ip):
        """Strip leading zeros from each octet: '10.001.02.3' -> '10.1.2.3'.

        Fixed: the previous loop compared a character against the integer 0
        (`b[0] == 0`, never true) and used `b = b[:1]` (truncation) instead of
        `b[1:]`, so zero-padded octets passed through unchanged and could
        break the GeoIP lookup in evaluate().
        """
        octets = []
        for b in ip.split('.'):
            while b.startswith('0') and len(b) > 1:
                b = b[1:]
            octets.append(b)
        return '.'.join(octets)

    def evaluate(self):
        """Evaluate results after all tests complete.

        Returns:
            (success, category) tuple where success is bool and category is
            the dominant failure type (or None on success)
        """
        with self.lock:
            # idempotent: a second call just reports the already-decided outcome
            if self.completed:
                return (self.failcount == 0, None)
            self.completed = True
            self.checktime = int(time.time())
            successes = [r for r in self.results if r['success']]
            failures = [r for r in self.results if not r['success']]
            num_success = len(successes)
            # Determine dominant failure category (most frequent one)
            fail_category = None
            if failures:
                cats = {}
                for f in failures:
                    cat = f.get('category') or 'other'
                    cats[cat] = cats.get(cat, 0) + 1
                if cats:
                    fail_category = max(cats.keys(), key=lambda k: cats[k])
            # require majority success (2/3)
            if num_success >= 2:
                # use last successful result for metrics
                last_good = successes[-1]
                if geolite and self.country is None:
                    self.ip = self.rwip(self.ip)
                    rec = geodb.get_all(self.ip)
                    if rec is not None and rec.country_short:
                        self.country = rec.country_short
                self.proto = last_good['proto']
                self.failcount = 0
                # periodically clear the MITM flag so the forced-SSL re-check
                # (every 3rd consecutive success) can refresh it
                if (self.consecutive_success % 3) == 0:
                    self.mitm = 0
                self.consecutive_success += 1
                self.success_count += 1
                self.total_duration += int(last_good['duration'] * 1000)
                torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor']
                _log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % (
                    last_good['proto'], self.ip, self.port, self.country,
                    last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']),
                    num_success, self.num_targets), 'xxxxx')
                return (True, None)
            elif num_success == 1:
                # partial success - don't increment fail, but reset consecutive
                self.consecutive_success = 0
                _log('%s:%d partial success %d/%d targets' % (
                    self.ip, self.port, num_success, self.num_targets), 'debug')
                return (False, fail_category)
            else:
                self.failcount += 1
                self.consecutive_success = 0
                return (False, fail_category)
class TargetTestJob():
    """Job to test a single proxy against a single target.

    Multiple TargetTestJob instances share the same ProxyTestState,
    allowing tests to be interleaved with other proxies in the queue.
    """
    def __init__(self, proxy_state, target_srv, checktype, worker_id=None):
        self.proxy_state = proxy_state  # shared ProxyTestState accumulator
        self.target_srv = target_srv    # hostname of the check target
        self.checktype = checktype      # 'irc' or anything else for HTTP-style checks
        self.worker_id = worker_id      # filled in by WorkerThread for pool affinity
    def run(self):
        """Test the proxy against this job's target server."""
        sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
        if not sock:
            self.proxy_state.record_result(False, category=err_cat)
            return
        try:
            recv = sock.recv(-1)
            # IRC servers open with a numeric/NOTICE/ERROR line; HTTP targets are
            # verified by a header distinctive for that site (module-level regexes)
            regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv]
            if re.search(regex, recv, re.IGNORECASE):
                # duration holds the connect start timestamp, so this is total elapsed
                elapsed = time.time() - duration
                self.proxy_state.record_result(
                    True, proto=proto, duration=elapsed,
                    srv=srv, tor=tor, ssl=is_ssl
                )
            else:
                self.proxy_state.record_result(False, category='other')
        except KeyboardInterrupt as e:
            # NOTE(review): the finally clause disconnects again after this —
            # presumably a second disconnect is harmless in rocksock; confirm
            sock.disconnect()
            raise e
        except rocksock.RocksockException as e:
            self.proxy_state.record_result(False, category=categorize_error(e))
        finally:
            sock.disconnect()
    def _connect_and_test(self):
        """Connect to target through the proxy and send test packet.

        The chain is: local -> Tor (socks5) -> proxy under test -> target.
        Returns (sock, proto, start_time, torhost, srvname, fail_inc, use_ssl,
        error_category); sock is None when every protocol attempt failed.
        """
        ps = self.proxy_state
        srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv
        # use_ssl config value 2 means "randomly mix plain and SSL checks"
        use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl
        # every 3rd consecutive success forces an SSL check (pairs with the
        # mitm-flag reset in ProxyTestState.evaluate)
        if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0:
            use_ssl = 1
        if self.checktype == 'irc':
            server_port = 6697 if use_ssl else 6667
        else:
            server_port = 443 if use_ssl else 80
        verifycert = True if use_ssl else False
        # probe all protocols for unknown proxies, else only the known one
        protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
        last_error_category = None
        # Get Tor host from pool (with worker affinity)
        pool = connection_pool.get_pool()
        for proto in protos:
            if pool:
                torhost = pool.get_tor_host(self.worker_id)
            else:
                torhost = random.choice(config.torhosts)
            if proto == 'socks4':
                # SOCKS4 cannot resolve hostnames; resolve locally first
                srv = socks4_resolve(srvname, server_port)
            else:
                srv = srvname
            if not srv:
                continue
            duration = time.time()
            proxies = [
                rocksock.RocksockProxyFromURL('socks5://%s' % torhost),
                rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
            ]
            try:
                sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl,
                                         proxies=proxies, timeout=config.watchd.timeout,
                                         verifycert=verifycert)
                sock.connect()
                if self.checktype == 'irc':
                    sock.send('NICK\n')
                else:
                    sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
                # Record success in pool
                if pool:
                    pool.record_success(torhost, time.time() - duration)
                return sock, proto, duration, torhost, srvname, 0, use_ssl, None
            except rocksock.RocksockException as e:
                last_error_category = categorize_error(e)
                if config.watchd.debug:
                    _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port,
                         e.get_errormessage(), last_error_category), 'debug')
                et = e.get_errortype()
                err = e.get_error()
                fp = e.get_failedproxy()
                # NOTE(review): if the Rocksock constructor itself raised, 'sock'
                # is unbound here (or stale from a previous iteration) — confirm
                # whether the constructor can raise RocksockException
                sock.disconnect()
                if et == rocksock.RS_ET_OWN:
                    # failedproxy index 1 is the proxy under test, 0 is the Tor hop
                    if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
                            err == rocksock.RS_E_HIT_TIMEOUT):
                        # proxy under test is dead; trying other protocols is pointless
                        break
                    elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
                        # Tor connection failed - record in pool
                        if pool:
                            pool.record_failure(torhost)
                        # stagger the warning/backoff so not every thread logs+sleeps
                        # NOTE(review): '/' gives a float on Py3 and randint needs
                        # ints — confirm the intended runtime (shebang says python2)
                        if random.randint(0, (config.watchd.threads - 1) / 2) == 0:
                            _log("could not connect to tor, sleep 5s", "ERROR")
                            time.sleep(5)
                elif et == rocksock.RS_ET_GAI:
                    _log("could not resolve connection target %s" % srvname, "ERROR")
                    break
                elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
                    # certificate mismatch through this proxy -> possible MITM
                    ps.mitm = 1
            except KeyboardInterrupt as e:
                raise e
        return None, None, None, None, None, 1, use_ssl, last_error_category
class WorkerThread():
    """Worker that pulls jobs off a shared queue and forwards finished jobs."""

    def __init__(self, id, job_queue, result_queue):
        self.id = id
        self.done = threading.Event()
        self.thread = None
        self.job_queue = job_queue        # shared input queue
        self.result_queue = result_queue  # shared output queue

    def stop(self):
        """Signal the work loop to exit after the current job."""
        self.done.set()

    def term(self):
        """Join the background thread, if one was started."""
        if self.thread:
            self.thread.join()

    def start_thread(self):
        """Spawn the background thread running workloop()."""
        worker = threading.Thread(target=self.workloop)
        self.thread = worker
        worker.start()

    def workloop(self):
        """Main loop: pop a job, run it, push it to the result queue."""
        completed = 0
        time_spent = 0
        while not self.done.is_set():
            try:
                job = self.job_queue.get(timeout=0.5)
            except Queue.Empty:
                continue
            started = time.time()
            # Assign worker ID for connection pool affinity
            job.worker_id = self.id
            job.run()
            completed += 1
            time_spent += time.time() - started
            self.result_queue.put(job)
            self.job_queue.task_done()
        # only log when running threaded and something was actually processed
        if self.thread and completed > 0:
            avg_t = try_div(time_spent, completed)
            _log("terminated, %d jobs, avg.time %.2f" % (completed, avg_t), self.id)
class Proxywatchd():
    """Main daemon: schedules proxy re-tests, aggregates results and writes
    them back to the proxylist database.

    Fix: thread IDs were generated with `string.letters`, which only exists on
    Python 2; `string.ascii_letters` is equivalent there and also works on
    Python 3 (matching this file's Py2/3-compatible Queue import). Also uses
    `while True` consistently instead of `while 1`.
    """

    def stop(self):
        """Request shutdown; cleanup happens in the run loop / finish()."""
        _log('halting... (%d thread(s))' % len(self.threads), 'watchd')
        self.stopping.set()

    def _cleanup(self):
        """Stop and join workers, flush pending results, stop the HTTP API."""
        for wt in self.threads:
            wt.stop()
        for wt in self.threads:
            wt.term()
        self.collect_work()
        self.submit_collected()
        if self.httpd_server:
            self.httpd_server.stop()
        self.stopped.set()

    def finish(self):
        """Block until shutdown completed, then log lifetime totals."""
        if not self.in_background:
            self._cleanup()
        while not self.stopped.is_set():
            time.sleep(0.1)
        success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100
        _log("total results: %d/%d (%.2f%%)" % (self.totals['success'], self.totals['submitted'], success_rate), "watchd")

    def _prep_db(self):
        """Open a fresh DB handle; handles are opened/closed around each batch."""
        self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)

    def _close_db(self):
        """Close the DB handle if open."""
        if self.mysqlite:
            self.mysqlite.close()
            self.mysqlite = None

    def __init__(self):
        config.load()
        self.in_background = False
        self.threads = []
        self.stopping = threading.Event()
        self.stopped = threading.Event()
        # shared work-stealing queues
        self.job_queue = PriorityJobQueue()
        self.result_queue = Queue.Queue()
        # track pending proxy states (for multi-target aggregation)
        self.pending_states = []  # list of ProxyTestState awaiting completion
        self.pending_lock = threading.Lock()
        # create table if needed
        self._prep_db()
        self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, success_count INT, total_duration INT, ip TEXT, port INT)')
        self.mysqlite.commit()
        self._close_db()
        self.submit_after = config.watchd.submit_after  # number of collected jobs before writing db
        self.collected = []  # completed ProxyTestState objects ready for DB
        self.totals = {
            'submitted': 0,
            'success': 0,
        }
        self.stats = Stats()
        self.last_cleanup = time.time()
        self.httpd_server = None

    def fetch_rows(self):
        """Select proxies due for re-testing; falls back to 'oldies' (long-dead
        proxies) when too few regular candidates remain."""
        self.isoldies = False
        q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
        rows = self.mysqlite.execute(q, (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, time.time())).fetchall()
        # check oldies ?
        if len(rows) < config.watchd.threads:
            rows = []
            if config.watchd.oldies:
                self.isoldies = True
                ## disable tor safeguard for old proxies
                if self.tor_safeguard:
                    self.tor_safeguard = False
                rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round(config.watchd.max_fail / 2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall()
        return rows

    def prepare_jobs(self):
        """Fetch due proxies and enqueue one prioritized job per (proxy, target).

        Returns the number of jobs queued."""
        self._prep_db()
        ## enable tor safeguard by default
        self.tor_safeguard = config.watchd.tor_safeguard
        rows = self.fetch_rows()
        checktype = config.watchd.checktype
        num_targets = 3
        # select target pool based on checktype
        if checktype == 'irc':
            target_pool = config.servers
        else:
            target_pool = list(regexes.keys())
        # create all jobs first, then shuffle for interleaving
        all_jobs = []
        new_states = []
        for row in rows:
            # create shared state for this proxy
            state = ProxyTestState(
                row[0], row[1], row[2], row[3], row[4], row[5],
                row[6], row[7], row[8], num_targets=num_targets,
                oldies=self.isoldies
            )
            new_states.append(state)
            # select random targets for this proxy
            targets = random.sample(target_pool, min(num_targets, len(target_pool)))
            # create one job per target
            for target in targets:
                all_jobs.append(TargetTestJob(state, target, checktype))
        # shuffle to interleave tests across different proxies
        random.shuffle(all_jobs)
        # track pending states
        with self.pending_lock:
            self.pending_states.extend(new_states)
        # queue all jobs with priority
        for job in all_jobs:
            priority = calculate_priority(
                job.proxy_state.failcount,
                job.proxy_state.success_count,
                config.watchd.max_fail
            )
            self.job_queue.put(job, priority)
        self._close_db()
        proxy_count = len(new_states)
        job_count = len(all_jobs)
        if proxy_count > 0:
            _log("created %d jobs for %d proxies (%d targets each)" % (
                job_count, proxy_count, num_targets), 'watchd')
        return job_count

    def collect_work(self):
        """Drain finished jobs and move fully-evaluated proxies to `collected`."""
        # drain results from shared result queue (TargetTestJob objects)
        # results are already recorded in their ProxyTestState
        while True:
            try:
                self.result_queue.get_nowait()
            except Queue.Empty:
                break
        # check for completed proxy states and evaluate them
        with self.pending_lock:
            still_pending = []
            for state in self.pending_states:
                if state.is_complete():
                    success, category = state.evaluate()
                    self.stats.record(success, category)
                    self.collected.append(state)
                else:
                    still_pending.append(state)
            self.pending_states = still_pending

    def collect_unfinished(self):
        """Discard jobs still waiting in the queue (e.g. at shutdown)."""
        unfinished_count = 0
        while True:
            try:
                self.job_queue.get_nowait()
                unfinished_count += 1
            except Queue.Empty:
                break
        if unfinished_count > 0:
            _log("discarded %d unfinished jobs" % unfinished_count, "watchd")
        # note: corresponding ProxyTestStates will be incomplete
        # they'll be re-tested in the next cycle

    def submit_collected(self):
        """Write collected results to the DB.

        Returns False when the tor safeguard suppressed failure updates
        (suspiciously low success rate suggests a blocked tor circuit)."""
        if len(self.collected) == 0:
            return True
        sc = 0
        args = []
        for job in self.collected:
            if job.failcount == 0:
                sc += 1
            args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy))
        success_rate = (float(sc) / len(self.collected)) * 100
        ret = True
        if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard:
            _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails" % success_rate, "ERROR")
            if sc == 0:
                return False
            # keep only the successes; fails are likely a tor-side outage
            args = []
            for job in self.collected:
                if job.failcount == 0:
                    args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy))
            ret = False
        _log("updating %d DB entries (success rate: %.2f%%)" % (len(self.collected), success_rate), 'watchd')
        self._prep_db()
        query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=? WHERE proxy=?'
        self.mysqlite.executemany(query, args)
        self.mysqlite.commit()
        self._close_db()
        self.collected = []
        self.totals['submitted'] += len(args)
        self.totals['success'] += sc
        return ret

    def cleanup_stale(self):
        """Remove proxies that have been dead for too long."""
        stale_seconds = config.watchd.stale_days * 86400
        cutoff = int(time.time()) - stale_seconds
        self._prep_db()
        # delete proxies that: failed >= max_fail AND last tested before cutoff
        result = self.mysqlite.execute(
            'DELETE FROM proxylist WHERE failed >= ? AND tested < ?',
            (config.watchd.max_fail, cutoff)
        )
        count = result.rowcount if hasattr(result, 'rowcount') else 0
        self.mysqlite.commit()
        self._close_db()
        if count > 0:
            _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd')
        self.last_cleanup = time.time()

    def start(self):
        """Run inline (single-thread standalone) or in a background thread."""
        if config.watchd.threads == 1 and _run_standalone:
            return self._run()
        else:
            return self._run_background()

    def run(self):
        """Foreground idle loop used when the daemon runs in the background."""
        if self.in_background:
            while True:
                time.sleep(1)

    def _run_background(self):
        """Launch the main loop in a daemon-style background thread."""
        self.in_background = True
        t = threading.Thread(target=self._run)
        t.start()

    def _run(self):
        """Main loop: spin up workers, schedule jobs, collect and submit results."""
        _log('starting...', 'watchd')
        # Initialize Tor connection pool
        connection_pool.init_pool(config.torhosts, warmup=True)
        # Start HTTP API server if enabled
        if config.httpd.enabled:
            from httpd import ProxyAPIServer
            self.httpd_server = ProxyAPIServer(
                config.httpd.listenip,
                config.httpd.port,
                config.watchd.database
            )
            self.httpd_server.start()
        # create worker threads with shared queues
        for i in range(config.watchd.threads):
            # ascii_letters works on both Python 2 and 3 (string.letters is Py2-only)
            threadid = ''.join(random.choice(string.ascii_letters) for _ in range(5))
            wt = WorkerThread(threadid, self.job_queue, self.result_queue)
            if self.in_background:
                wt.start_thread()
            self.threads.append(wt)
            time.sleep(random.random() / 10)
        sleeptime = 0
        while True:
            if self.stopping.is_set():
                if self.in_background:
                    self._cleanup()
                break
            if sleeptime > 0:
                time.sleep(1)
                sleeptime -= 1
                continue
            # check if job queue is empty (work-stealing: threads pull as needed)
            if self.job_queue.empty():
                self.collect_work()
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
                else:
                    job_count = self.prepare_jobs()
                    if job_count == 0:
                        # no jobs available, wait before checking again
                        sleeptime = 10
            if not self.in_background:  # single_thread scenario
                self.threads[0].workloop()
            self.collect_work()
            if len(self.collected) > self.submit_after:
                if not self.submit_collected() and self.tor_safeguard:
                    _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
                    sleeptime = 60
            # periodic stats report
            if self.stats.should_report(config.watchd.stats_interval):
                _log(self.stats.report(), 'stats')
                # Also report pool stats
                pool = connection_pool.get_pool()
                if pool:
                    _log(pool.status_line(), 'stats')
            # periodic stale proxy cleanup (daily)
            if (time.time() - self.last_cleanup) >= 86400:
                self.cleanup_stale()
            time.sleep(1)
if __name__ == '__main__':
    # mark standalone mode so Proxywatchd.start() may run inline
    _run_standalone = True
    config.load()
    errors = config.validate()
    if errors:
        for e in errors:
            _log(e, 'error')
        import sys
        sys.exit(1)
    w = Proxywatchd()
    try:
        w.start()
        w.run()
    except KeyboardInterrupt:
        # Ctrl-C triggers a clean shutdown via the finally block
        pass
    finally:
        w.stop()
        w.finish()