diff --git a/proxywatchd.py b/proxywatchd.py index 53bd15a..043d06f 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -6,12 +6,6 @@ from __future__ import division from gevent import monkey monkey.patch_all() -import gevent -from gevent.pool import Pool as GreenPool -from gevent.queue import Queue as GreenQueue -from gevent.event import Event as GreenEvent -from gevent.lock import Semaphore as GreenLock - import threading # Now patched by gevent import time import random @@ -868,7 +862,7 @@ def extract_cert_info(cert_der): return None -def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout): +def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout, auth=None): """Connect to target through proxy without cert verification to get MITM cert. Args: @@ -879,14 +873,19 @@ def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, targ target_host: Target host for SSL connection target_port: Target port (usually 443) timeout: Connection timeout + auth: Optional auth credentials (user:pass) Returns: dict with certificate info or None on failure """ try: + if auth: + proxy_url = '%s://%s@%s:%s' % (proto, auth, proxy_ip, proxy_port) + else: + proxy_url = '%s://%s:%s' % (proto, proxy_ip, proxy_port) proxies = [ rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), - rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, proxy_ip, proxy_port)), + rocksock.RocksockProxyFromURL(proxy_url), ] # Connect without certificate verification @@ -1038,8 +1037,9 @@ class ThreadScaler(object): success_rate = try_div(passed * 100.0, total) if total > 0 else 50.0 # Calculate ideal thread count based on queue depth + # Never spawn more threads than jobs available ideal = max(self.min_threads, queue_size // self.target_queue_per_thread) - ideal = min(ideal, self.max_threads) + ideal = min(ideal, self.max_threads, max(self.min_threads, queue_size)) target = current_threads @@ -1050,7 +1050,9 @@ class ThreadScaler(object): # Scale down: queue is shallow elif queue_size < current_threads * 2: - target = max(current_threads - 2, self.min_threads) + # Scale down faster when threads far exceed ideal + reduction = 2 if current_threads <= ideal * 2 else 5 + target = max(current_threads - reduction, ideal, self.min_threads) if target != current_threads: self.last_scale_time = now @@ -1094,10 +1096,16 @@ class ProxyTestState(): """ def __init__(self, ip, port, proto, failcount, success_count, total_duration, country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False, - completion_queue=None): + completion_queue=None, proxy_full=None): self.ip = ip self.port = int(port) self.proxy = '%s:%s' % (ip, port) + # Parse auth credentials from full proxy string (user:pass@ip:port) + self.auth = None + if proxy_full and '@' in str(proxy_full): + auth_part = str(proxy_full).rsplit('@', 1)[0] + if ':' in auth_part: + self.auth = auth_part # user:pass self.proto = proto self.failcount = failcount self.checktime = None @@ -1425,9 +1433,14 @@ class TargetTestJob(): continue duration = time.time() + # Build proxy URL, including auth credentials if present + if ps.auth: + proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port) + else: + proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port) proxies = [ rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), - rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)), + rocksock.RocksockProxyFromURL(proxy_url), ] # Adaptive timeout: give proxies with failures slightly more time @@ -1499,7 +1512,8 @@ class TargetTestJob(): try: cert_info = get_mitm_certificate( ps.ip, ps.port, proto, torhost, - connect_host, server_port, config.watchd.timeout + connect_host, server_port, config.watchd.timeout, + auth=ps.auth ) if cert_info: mitm_cert_stats.add_cert(ps.ip, cert_info) @@ -1526,9 +1540,13 @@ class TargetTestJob(): try: http_port = 80 # New Tor credentials = new circuit + if ps.auth: + fallback_proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port) + else: + fallback_proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port) http_proxies = [ rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), - rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)), + rocksock.RocksockProxyFromURL(fallback_proxy_url), ] http_sock = rocksock.Rocksock(host=connect_host, port=http_port, ssl=0, proxies=http_proxies, timeout=adaptive_timeout) @@ -1762,7 +1780,7 @@ class Proxywatchd(): def fetch_rows(self): self.isoldies = False - q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' + q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' now = time.time() params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) if config.watchd.debug: @@ -1811,10 +1829,13 @@ class Proxywatchd(): for row in rows: # create shared state for this proxy + # row: ip, port, proto, failed, success_count, total_duration, + # country, mitm, consecutive_success, asn, proxy state = ProxyTestState( row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], asn=row[9], num_targets=num_targets, - oldies=self.isoldies, completion_queue=self.completion_queue + oldies=self.isoldies, completion_queue=self.completion_queue, + proxy_full=row[10] ) new_states.append(state)