proxywatchd: add stats tracking and httpd integration
- Stats class with failure category tracking - Configurable stats_interval for periodic reports - Optional httpd server startup when enabled - cleanup_stale() for removing dead proxies
This commit is contained in:
146
proxywatchd.py
146
proxywatchd.py
@@ -8,13 +8,13 @@ try:
|
||||
import os
|
||||
geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
|
||||
geolite = True
|
||||
except:
|
||||
except (ImportError, IOError):
|
||||
geolite = False
|
||||
|
||||
from config import Config
|
||||
|
||||
import mysqlite
|
||||
from misc import _log
|
||||
from misc import _log, categorize_error
|
||||
import rocksock
|
||||
|
||||
config = Config()
|
||||
@@ -61,6 +61,48 @@ regexes = {
|
||||
'www.time.com': 'x-amz-cf-pop'
|
||||
}
|
||||
|
||||
|
||||
class Stats():
|
||||
"""Track and report runtime statistics."""
|
||||
|
||||
def __init__(self):
|
||||
self.lock = threading.Lock()
|
||||
self.tested = 0
|
||||
self.passed = 0
|
||||
self.failed = 0
|
||||
self.start_time = time.time()
|
||||
self.last_report = time.time()
|
||||
# Failure category tracking
|
||||
self.fail_categories = {}
|
||||
|
||||
def record(self, success, category=None):
|
||||
with self.lock:
|
||||
self.tested += 1
|
||||
if success:
|
||||
self.passed += 1
|
||||
else:
|
||||
self.failed += 1
|
||||
if category:
|
||||
self.fail_categories[category] = self.fail_categories.get(category, 0) + 1
|
||||
|
||||
def should_report(self, interval):
|
||||
return (time.time() - self.last_report) >= interval
|
||||
|
||||
def report(self):
|
||||
with self.lock:
|
||||
self.last_report = time.time()
|
||||
elapsed = time.time() - self.start_time
|
||||
rate = try_div(self.tested, elapsed)
|
||||
pct = try_div(self.passed * 100.0, self.tested)
|
||||
base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % (
|
||||
self.tested, self.passed, pct, rate, int(elapsed / 60))
|
||||
# Add failure breakdown if there are failures
|
||||
if self.fail_categories:
|
||||
cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items()))
|
||||
return '%s [%s]' % (base, cats)
|
||||
return base
|
||||
|
||||
|
||||
def try_div(a, b):
|
||||
if b != 0: return a/float(b)
|
||||
return 0
|
||||
@@ -116,7 +158,7 @@ class ProxyTestState():
|
||||
self.results = [] # list of (success, proto, duration, srv, tor, ssl)
|
||||
self.completed = False
|
||||
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None):
|
||||
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None):
|
||||
"""Record a single target test result. Thread-safe."""
|
||||
with self.lock:
|
||||
self.results.append({
|
||||
@@ -125,7 +167,8 @@ class ProxyTestState():
|
||||
'duration': duration,
|
||||
'srv': srv,
|
||||
'tor': tor,
|
||||
'ssl': ssl
|
||||
'ssl': ssl,
|
||||
'category': category
|
||||
})
|
||||
|
||||
def is_complete(self):
|
||||
@@ -141,16 +184,32 @@ class ProxyTestState():
|
||||
return '.'.join(n)
|
||||
|
||||
def evaluate(self):
|
||||
"""Evaluate results after all tests complete. Returns True if proxy is valid."""
|
||||
"""Evaluate results after all tests complete.
|
||||
|
||||
Returns:
|
||||
(success, category) tuple where success is bool and category is
|
||||
the dominant failure type (or None on success)
|
||||
"""
|
||||
with self.lock:
|
||||
if self.completed:
|
||||
return self.failcount == 0
|
||||
return (self.failcount == 0, None)
|
||||
self.completed = True
|
||||
self.checktime = int(time.time())
|
||||
|
||||
successes = [r for r in self.results if r['success']]
|
||||
failures = [r for r in self.results if not r['success']]
|
||||
num_success = len(successes)
|
||||
|
||||
# Determine dominant failure category
|
||||
fail_category = None
|
||||
if failures:
|
||||
cats = {}
|
||||
for f in failures:
|
||||
cat = f.get('category') or 'other'
|
||||
cats[cat] = cats.get(cat, 0) + 1
|
||||
if cats:
|
||||
fail_category = max(cats.keys(), key=lambda k: cats[k])
|
||||
|
||||
# require majority success (2/3)
|
||||
if num_success >= 2:
|
||||
# use last successful result for metrics
|
||||
@@ -175,19 +234,19 @@ class ProxyTestState():
|
||||
last_good['proto'], self.ip, self.port, self.country,
|
||||
last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']),
|
||||
num_success, self.num_targets), 'xxxxx')
|
||||
return True
|
||||
return (True, None)
|
||||
|
||||
elif num_success == 1:
|
||||
# partial success - don't increment fail, but reset consecutive
|
||||
self.consecutive_success = 0
|
||||
_log('%s:%d partial success %d/%d targets' % (
|
||||
self.ip, self.port, num_success, self.num_targets), 'debug')
|
||||
return False
|
||||
return (False, fail_category)
|
||||
|
||||
else:
|
||||
self.failcount += 1
|
||||
self.consecutive_success = 0
|
||||
return False
|
||||
return (False, fail_category)
|
||||
|
||||
|
||||
class TargetTestJob():
|
||||
@@ -203,10 +262,10 @@ class TargetTestJob():
|
||||
|
||||
def run(self):
|
||||
"""Test the proxy against this job's target server."""
|
||||
sock, proto, duration, tor, srv, failinc, is_ssl = self._connect_and_test()
|
||||
sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test()
|
||||
|
||||
if not sock:
|
||||
self.proxy_state.record_result(False)
|
||||
self.proxy_state.record_result(False, category=err_cat)
|
||||
return
|
||||
|
||||
try:
|
||||
@@ -220,13 +279,13 @@ class TargetTestJob():
|
||||
srv=srv, tor=tor, ssl=is_ssl
|
||||
)
|
||||
else:
|
||||
self.proxy_state.record_result(False)
|
||||
self.proxy_state.record_result(False, category='other')
|
||||
|
||||
except KeyboardInterrupt as e:
|
||||
sock.disconnect()
|
||||
raise e
|
||||
except rocksock.RocksockException:
|
||||
self.proxy_state.record_result(False)
|
||||
except rocksock.RocksockException as e:
|
||||
self.proxy_state.record_result(False, category=categorize_error(e))
|
||||
finally:
|
||||
sock.disconnect()
|
||||
|
||||
@@ -246,6 +305,7 @@ class TargetTestJob():
|
||||
|
||||
verifycert = True if use_ssl else False
|
||||
protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
|
||||
last_error_category = None
|
||||
|
||||
for proto in protos:
|
||||
torhost = random.choice(config.torhosts)
|
||||
@@ -271,12 +331,13 @@ class TargetTestJob():
|
||||
sock.send('NICK\n')
|
||||
else:
|
||||
sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname)
|
||||
return sock, proto, duration, torhost, srvname, 0, use_ssl
|
||||
return sock, proto, duration, torhost, srvname, 0, use_ssl, None
|
||||
|
||||
except rocksock.RocksockException as e:
|
||||
last_error_category = categorize_error(e)
|
||||
if config.watchd.debug:
|
||||
_log("proxy failed: %s://%s:%d: %s" % (proto, ps.ip, ps.port,
|
||||
e.get_errormessage()), 'debug')
|
||||
_log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port,
|
||||
e.get_errormessage(), last_error_category), 'debug')
|
||||
|
||||
et = e.get_errortype()
|
||||
err = e.get_error()
|
||||
@@ -301,7 +362,7 @@ class TargetTestJob():
|
||||
except KeyboardInterrupt as e:
|
||||
raise e
|
||||
|
||||
return None, None, None, None, None, 1, use_ssl
|
||||
return None, None, None, None, None, 1, use_ssl, last_error_category
|
||||
|
||||
|
||||
class WorkerThread():
|
||||
@@ -350,6 +411,8 @@ class Proxywatchd():
|
||||
wt.term()
|
||||
self.collect_work()
|
||||
self.submit_collected()
|
||||
if self.httpd_server:
|
||||
self.httpd_server.stop()
|
||||
self.stopped.set()
|
||||
|
||||
def finish(self):
|
||||
@@ -391,6 +454,9 @@ class Proxywatchd():
|
||||
'submitted':0,
|
||||
'success':0,
|
||||
}
|
||||
self.stats = Stats()
|
||||
self.last_cleanup = time.time()
|
||||
self.httpd_server = None
|
||||
|
||||
def fetch_rows(self):
|
||||
self.isoldies = False
|
||||
@@ -474,7 +540,8 @@ class Proxywatchd():
|
||||
still_pending = []
|
||||
for state in self.pending_states:
|
||||
if state.is_complete():
|
||||
state.evaluate()
|
||||
success, category = state.evaluate()
|
||||
self.stats.record(success, category)
|
||||
self.collected.append(state)
|
||||
else:
|
||||
still_pending.append(state)
|
||||
@@ -524,6 +591,23 @@ class Proxywatchd():
|
||||
self.totals['success'] += sc
|
||||
return ret
|
||||
|
||||
def cleanup_stale(self):
|
||||
"""Remove proxies that have been dead for too long."""
|
||||
stale_seconds = config.watchd.stale_days * 86400
|
||||
cutoff = int(time.time()) - stale_seconds
|
||||
self._prep_db()
|
||||
# delete proxies that: failed >= max_fail AND last tested before cutoff
|
||||
result = self.mysqlite.execute(
|
||||
'DELETE FROM proxylist WHERE failed >= ? AND tested < ?',
|
||||
(config.watchd.max_fail, cutoff)
|
||||
)
|
||||
count = result.rowcount if hasattr(result, 'rowcount') else 0
|
||||
self.mysqlite.commit()
|
||||
self._close_db()
|
||||
if count > 0:
|
||||
_log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd')
|
||||
self.last_cleanup = time.time()
|
||||
|
||||
def start(self):
|
||||
if config.watchd.threads == 1 and _run_standalone:
|
||||
return self._run()
|
||||
@@ -542,6 +626,16 @@ class Proxywatchd():
|
||||
def _run(self):
|
||||
_log('starting...', 'watchd')
|
||||
|
||||
# Start HTTP API server if enabled
|
||||
if config.httpd.enabled:
|
||||
from httpd import ProxyAPIServer
|
||||
self.httpd_server = ProxyAPIServer(
|
||||
config.httpd.listenip,
|
||||
config.httpd.port,
|
||||
config.watchd.database
|
||||
)
|
||||
self.httpd_server.start()
|
||||
|
||||
# create worker threads with shared queues
|
||||
for i in range(config.watchd.threads):
|
||||
threadid = ''.join([random.choice(string.letters) for x in range(5)])
|
||||
@@ -585,6 +679,14 @@ class Proxywatchd():
|
||||
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
|
||||
sleeptime = 60
|
||||
|
||||
# periodic stats report
|
||||
if self.stats.should_report(config.watchd.stats_interval):
|
||||
_log(self.stats.report(), 'stats')
|
||||
|
||||
# periodic stale proxy cleanup (daily)
|
||||
if (time.time() - self.last_cleanup) >= 86400:
|
||||
self.cleanup_stale()
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
@@ -592,6 +694,12 @@ if __name__ == '__main__':
|
||||
_run_standalone = True
|
||||
|
||||
config.load()
|
||||
errors = config.validate()
|
||||
if errors:
|
||||
for e in errors:
|
||||
_log(e, 'error')
|
||||
import sys
|
||||
sys.exit(1)
|
||||
|
||||
w = Proxywatchd()
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user