From 821ade95ef05851c533525edb72391df7af77aeb Mon Sep 17 00:00:00 2001 From: Username Date: Wed, 18 Feb 2026 01:37:09 +0100 Subject: [PATCH] worker: add local proxy test cache Skip redundant proxy tests across URL batches using a memory-only TTL cache (default 300s, configurable via worker.cache_ttl). --- config.py | 1 + ppf.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/config.py b/config.py index 499b047..593d405 100644 --- a/config.py +++ b/config.py @@ -170,6 +170,7 @@ class Config(ComboParser): self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False) self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle (default: 5)', False) self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching (default: 30)', False) + self.add_item(section, 'cache_ttl', int, 300, 'local proxy test cache TTL in seconds, 0 to disable (default: 300)', False) self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False) self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False) diff --git a/ppf.py b/ppf.py index baffbd6..92b86e2 100644 --- a/ppf.py +++ b/ppf.py @@ -474,6 +474,7 @@ def worker_main(config): _log(' server: %s' % server_url, 'info') _log(' threads: %d' % num_threads, 'info') _log(' url batch: %d' % url_batch_size, 'info') + _log(' cache ttl: %s' % ('%ds' % config.worker.cache_ttl if config.worker.cache_ttl > 0 else 'disabled'), 'info') _log(' tor hosts: %s' % config.common.tor_hosts, 'info') # Verify Tor connectivity before starting @@ -532,6 +533,10 @@ def worker_main(config): worker_profiling = config.args.profile or config.common.profiling wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10} + # Local proxy test cache: addr -> (timestamp, success, result_dict_or_None) + cache_ttl = config.worker.cache_ttl + proxy_cache = {} if cache_ttl > 0 else None + def do_register(): """Register with master, with exponential backoff on failure.""" while True: @@ -710,6 +715,33 @@ def worker_main(config): time.sleep(1) continue + # Filter against local test cache + cached_working = [] + if proxy_cache is not None: + now = time.time() + uncached = [] + cache_hits = 0 + for addr, pr, conf in unique_proxies: + # Normalize to ip:port for cache lookup (strip auth prefix) + cache_key = addr.split('@')[-1] if '@' in addr else addr + entry = proxy_cache.get(cache_key) + if entry and (now - entry[0]) < cache_ttl: + cache_hits += 1 + if entry[1]: # cached success + cached_working.append(entry[2]) + else: + uncached.append((addr, pr, conf)) + if cache_hits: + _log('%d cached (%d working), %d to test' % ( + cache_hits, len(cached_working), len(uncached)), 'info') + unique_proxies = uncached + + if not unique_proxies: + # All proxies were cached, nothing to test + cycles += 1 + time.sleep(1) + continue + _log('testing %d unique proxies' % len(unique_proxies), 'info') # Phase 2: Test extracted proxies using worker thread pool @@ -804,6 +836,19 @@ def worker_main(config): break continue + # Populate proxy test cache from results + if proxy_cache is not None: + now = time.time() + working_addrs = set() + for r in working_results: + addr = '%s:%d' % (r['ip'], r['port']) + proxy_cache[addr] = (now, True, r) + working_addrs.add(addr) + # Cache failures for tested proxies that didn't succeed + for proxy_str in pending_states: + if proxy_str not in working_addrs: + proxy_cache[proxy_str] = (now, False, None) + proxies_working += len(working_results) # Report working proxies to master @@ -820,6 +865,16 @@ def worker_main(config): _log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info') cycles += 1 + + # Periodic cache cleanup: evict expired entries every 10 cycles + if proxy_cache is not None and cycles % 10 == 0: + now = time.time() + expired = [k for k, v in proxy_cache.items() if (now - v[0]) >= cache_ttl] + if expired: + for k in expired: + del proxy_cache[k] + _log('cache cleanup: evicted %d expired, %d remaining' % (len(expired), len(proxy_cache)), 'info') + time.sleep(1) except KeyboardInterrupt: @@ -835,6 +890,8 @@ def worker_main(config): _log(' urls fetched: %d' % urls_fetched, 'info') _log(' proxies found: %d' % proxies_found, 'info') _log(' proxies working: %d' % proxies_working, 'info') + if proxy_cache is not None: + _log(' cache entries: %d' % len(proxy_cache), 'info') def main():