From 1c8e3062b714c4447cb17f2529dd271b106bbf2b Mon Sep 17 00:00:00 2001
From: Username
Date: Sun, 28 Dec 2025 14:37:32 +0100
Subject: [PATCH] add configurable thread scaling and queue counter reset

- Add scale_cooldown and scale_threshold config options
- ThreadScaler now reads scaling params from config
- Reset priority queue counter when queue empties to prevent unbounded growth
---
 config.ini.sample | 20 ++++++++++++++++++++
 config.py         | 19 ++++++++++++++++---
 proxywatchd.py    | 16 +++++++++++-----
 3 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/config.ini.sample b/config.ini.sample
index 9547195..dfd904e 100644
--- a/config.ini.sample
+++ b/config.ini.sample
@@ -67,6 +67,10 @@ source_file = servers.txt
 # Enable Tor circuit health monitoring
 tor_safeguard = 1
 
+# Thread scaling
+scale_cooldown = 10
+scale_threshold = 10.0
+
 # Debug output
 debug = 0
 
@@ -95,6 +99,10 @@ max_fail = 5
 # Only extract URLs from same domain
 extract_samedomain = 0
 
+# Max age in days for proxy list URLs
+# URLs older than this that never produced proxies are skipped
+list_max_age_days = 7
+
 # Debug output
 debug = 0
 
@@ -140,3 +148,15 @@ listenip = 127.0.0.1
 
 # Listen port
 port = 8081
+
+[worker]
+# Distributed worker settings (for --worker mode)
+
+# Proxies per work batch
+batch_size = 100
+
+# Heartbeat interval in seconds
+heartbeat = 60
+
+# Seconds before unclaimed work is released
+claim_timeout = 300
diff --git a/config.py b/config.py
index fd29b19..715d126 100644
--- a/config.py
+++ b/config.py
@@ -37,9 +37,9 @@ class Config(ComboParser):
         if self.ppf.timeout <= 0:
             errors.append('ppf.timeout must be > 0')
 
-        # Validate thread counts
-        if self.watchd.threads < 1:
-            errors.append('watchd.threads must be >= 1')
+        # Validate thread counts (0 allowed for watchd to disable local testing)
+        if self.watchd.threads < 0:
+            errors.append('watchd.threads must be >= 0')
         if self.ppf.threads < 1:
             errors.append('ppf.threads must be >= 1')
         if self.scraper.threads < 1:
@@ -112,6 +112,8 @@ class Config(ComboParser):
         self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False)
         self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False)
         self.add_item(section, 'checktype', str, 'ssl', 'check type(s): irc, head, judges, ssl (comma-separated for random)', False)
+        self.add_item(section, 'scale_cooldown', int, 10, 'seconds between thread scaling decisions (default: 10)', False)
+        self.add_item(section, 'scale_threshold', float, 10.0, 'min success rate % to scale up threads (default: 10.0)', False)
 
         section = 'httpd'
         self.add_item(section, 'listenip', str, '127.0.0.1', 'address for the httpd to listen to (default: 127.0.0.1)', True)
@@ -129,6 +131,7 @@ class Config(ComboParser):
         self.add_item(section, 'max_fail', int, 5, 'number of fails after which an url is considered dead', False)
         self.add_item(section, 'database', str, 'proxies.sqlite', 'filename of database', True)
         self.add_item(section, 'extract_samedomain', bool, False, 'extract only url from same domains? (default: False)', False)
+        self.add_item(section, 'list_max_age_days', int, 7, 'max age in days for proxy list URLs (default: 7)', False)
 
         section = 'scraper'
         self.add_item(section, 'enabled', bool, True, 'enable search engine scraper (default: True)', False)
@@ -143,8 +146,18 @@ class Config(ComboParser):
         self.add_item(section, 'libretranslate_url', str, 'https://lt.mymx.me/translate', 'LibreTranslate API URL (default: https://lt.mymx.me/translate)', False)
         self.add_item(section, 'libretranslate_enabled', bool, False, 'enable LibreTranslate for dynamic translations (default: False)', False)
 
+        section = 'worker'
+        self.add_item(section, 'batch_size', int, 100, 'proxies per work batch (default: 100)', False)
+        self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False)
+        self.add_item(section, 'claim_timeout', int, 300, 'seconds before unclaimed work is released (default: 300)', False)
+
         self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False)
         self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False)
         self.aparser.add_argument("-q", "--quiet", help="suppress info messages, show warnings and errors only", action='store_true', default=False)
         self.aparser.add_argument("-v", "--verbose", help="show debug messages", action='store_true', default=False)
         self.aparser.add_argument("--profile", help="enable cProfile profiling, output to profile.stats", action='store_true', default=False)
+        self.aparser.add_argument("--worker", help="run as worker node", action='store_true', default=False)
+        self.aparser.add_argument("--server", help="master server URL (e.g., https://master:8081)", type=str, default='')
+        self.aparser.add_argument("--worker-key", help="worker authentication key", type=str, default='')
+        self.aparser.add_argument("--register", help="register as worker with master server", action='store_true', default=False)
+        self.aparser.add_argument("--worker-name", help="worker name for registration (default: hostname)", type=str, default='')
diff --git a/proxywatchd.py b/proxywatchd.py
index f0b0a06..5919f31 100644
--- a/proxywatchd.py
+++ b/proxywatchd.py
@@ -963,6 +963,9 @@ class PriorityJobQueue(object):
     def put(self, job, priority=3):
         """Add job with priority (lower = higher priority)."""
         with self.lock:
+            # Reset counter when queue was empty to prevent unbounded growth
+            if not self.heap:
+                self.counter = 0
             heapq.heappush(self.heap, (priority, self.counter, job))
             self.counter += 1
             self.not_empty.notify()
@@ -1042,13 +1045,14 @@ class ThreadScaler(object):
     - Success rate drops below threshold
     """
 
-    def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5):
+    def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5,
+                 scale_cooldown=10, scale_threshold=10.0):
         self.min_threads = min_threads
         self.max_threads = max_threads
         self.target_queue_per_thread = target_queue_per_thread
         self.last_scale_time = 0
-        self.scale_cooldown = 10  # seconds between scaling (faster with greenlets)
-        self.success_threshold = 10.0  # minimum success rate % to scale up
+        self.scale_cooldown = scale_cooldown
+        self.success_threshold = scale_threshold
 
     def should_scale(self, current_threads, queue_size, stats):
         """Determine if scaling is needed.
@@ -1771,8 +1775,10 @@ class Proxywatchd():
         min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
         self.scaler = ThreadScaler(
             min_threads=min_t,
-            max_threads=config.watchd.threads,  # respect configured thread limit
-            target_queue_per_thread=5
+            max_threads=config.watchd.threads,
+            target_queue_per_thread=5,
+            scale_cooldown=config.watchd.scale_cooldown,
+            scale_threshold=config.watchd.scale_threshold
         )
         self.thread_id_counter = 0
         self.last_jobs_log = 0