proxywatchd: cap threads by queue size, faster scale-down

This commit is contained in:
Username
2025-12-26 18:12:39 +01:00
parent 8ae639cb94
commit 07262e8b50

View File

@@ -6,12 +6,6 @@ from __future__ import division
from gevent import monkey
monkey.patch_all()
import gevent
from gevent.pool import Pool as GreenPool
from gevent.queue import Queue as GreenQueue
from gevent.event import Event as GreenEvent
from gevent.lock import Semaphore as GreenLock
import threading # Now patched by gevent
import time
import random
@@ -868,7 +862,7 @@ def extract_cert_info(cert_der):
return None
def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout):
def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout, auth=None):
"""Connect to target through proxy without cert verification to get MITM cert.
Args:
@@ -879,14 +873,19 @@ def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, targ
target_host: Target host for SSL connection
target_port: Target port (usually 443)
timeout: Connection timeout
auth: Optional auth credentials (user:pass)
Returns:
dict with certificate info or None on failure
"""
try:
if auth:
proxy_url = '%s://%s@%s:%s' % (proto, auth, proxy_ip, proxy_port)
else:
proxy_url = '%s://%s:%s' % (proto, proxy_ip, proxy_port)
proxies = [
rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, proxy_ip, proxy_port)),
rocksock.RocksockProxyFromURL(proxy_url),
]
# Connect without certificate verification
@@ -1038,8 +1037,9 @@ class ThreadScaler(object):
success_rate = try_div(passed * 100.0, total) if total > 0 else 50.0
# Calculate ideal thread count based on queue depth
# Never spawn more threads than jobs available
ideal = max(self.min_threads, queue_size // self.target_queue_per_thread)
ideal = min(ideal, self.max_threads)
ideal = min(ideal, self.max_threads, max(self.min_threads, queue_size))
target = current_threads
@@ -1050,7 +1050,9 @@ class ThreadScaler(object):
# Scale down: queue is shallow
elif queue_size < current_threads * 2:
target = max(current_threads - 2, self.min_threads)
# Scale down faster when threads far exceed ideal
reduction = 2 if current_threads <= ideal * 2 else 5
target = max(current_threads - reduction, ideal, self.min_threads)
if target != current_threads:
self.last_scale_time = now
@@ -1094,10 +1096,16 @@ class ProxyTestState():
"""
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False,
completion_queue=None):
completion_queue=None, proxy_full=None):
self.ip = ip
self.port = int(port)
self.proxy = '%s:%s' % (ip, port)
# Parse auth credentials from full proxy string (user:pass@ip:port)
self.auth = None
if proxy_full and '@' in str(proxy_full):
auth_part = str(proxy_full).rsplit('@', 1)[0]
if ':' in auth_part:
self.auth = auth_part # user:pass
self.proto = proto
self.failcount = failcount
self.checktime = None
@@ -1425,9 +1433,14 @@ class TargetTestJob():
continue
duration = time.time()
# Build proxy URL, including auth credentials if present
if ps.auth:
proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
else:
proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
proxies = [
rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
rocksock.RocksockProxyFromURL(proxy_url),
]
# Adaptive timeout: give proxies with failures slightly more time
@@ -1499,7 +1512,8 @@ class TargetTestJob():
try:
cert_info = get_mitm_certificate(
ps.ip, ps.port, proto, torhost,
connect_host, server_port, config.watchd.timeout
connect_host, server_port, config.watchd.timeout,
auth=ps.auth
)
if cert_info:
mitm_cert_stats.add_cert(ps.ip, cert_info)
@@ -1526,9 +1540,13 @@ class TargetTestJob():
try:
http_port = 80
# New Tor credentials = new circuit
if ps.auth:
fallback_proxy_url = '%s://%s@%s:%s' % (proto, ps.auth, ps.ip, ps.port)
else:
fallback_proxy_url = '%s://%s:%s' % (proto, ps.ip, ps.port)
http_proxies = [
rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)),
rocksock.RocksockProxyFromURL(fallback_proxy_url),
]
http_sock = rocksock.Rocksock(host=connect_host, port=http_port, ssl=0,
proxies=http_proxies, timeout=adaptive_timeout)
@@ -1762,7 +1780,7 @@ class Proxywatchd():
def fetch_rows(self):
self.isoldies = False
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
now = time.time()
params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
if config.watchd.debug:
@@ -1811,10 +1829,13 @@ class Proxywatchd():
for row in rows:
# create shared state for this proxy
# row: ip, port, proto, failed, success_count, total_duration,
# country, mitm, consecutive_success, asn, proxy
state = ProxyTestState(
row[0], row[1], row[2], row[3], row[4], row[5],
row[6], row[7], row[8], asn=row[9], num_targets=num_targets,
oldies=self.isoldies, completion_queue=self.completion_queue
oldies=self.isoldies, completion_queue=self.completion_queue,
proxy_full=row[10]
)
new_states.append(state)