worker: add local proxy test cache
Skip redundant proxy tests across URL batches using an in-memory TTL cache (default 300s, configurable via worker.cache_ttl; 0 disables the cache).
This commit is contained in:
@@ -170,6 +170,7 @@ class Config(ComboParser):
|
||||
self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False)
|
||||
self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle (default: 5)', False)
|
||||
self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching (default: 30)', False)
|
||||
self.add_item(section, 'cache_ttl', int, 300, 'local proxy test cache TTL in seconds, 0 to disable (default: 300)', False)
|
||||
|
||||
self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False)
|
||||
self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False)
|
||||
|
||||
57
ppf.py
57
ppf.py
@@ -474,6 +474,7 @@ def worker_main(config):
|
||||
_log(' server: %s' % server_url, 'info')
|
||||
_log(' threads: %d' % num_threads, 'info')
|
||||
_log(' url batch: %d' % url_batch_size, 'info')
|
||||
_log(' cache ttl: %s' % ('%ds' % config.worker.cache_ttl if config.worker.cache_ttl > 0 else 'disabled'), 'info')
|
||||
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
|
||||
|
||||
# Verify Tor connectivity before starting
|
||||
@@ -532,6 +533,10 @@ def worker_main(config):
|
||||
worker_profiling = config.args.profile or config.common.profiling
|
||||
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
|
||||
|
||||
# Local proxy test cache: addr -> (timestamp, success, result_dict_or_None)
|
||||
cache_ttl = config.worker.cache_ttl
|
||||
proxy_cache = {} if cache_ttl > 0 else None
|
||||
|
||||
def do_register():
|
||||
"""Register with master, with exponential backoff on failure."""
|
||||
while True:
|
||||
@@ -710,6 +715,33 @@ def worker_main(config):
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
# Filter against local test cache
|
||||
cached_working = []
|
||||
if proxy_cache is not None:
|
||||
now = time.time()
|
||||
uncached = []
|
||||
cache_hits = 0
|
||||
for addr, pr, conf in unique_proxies:
|
||||
# Normalize to ip:port for cache lookup (strip auth prefix)
|
||||
cache_key = addr.split('@')[-1] if '@' in addr else addr
|
||||
entry = proxy_cache.get(cache_key)
|
||||
if entry and (now - entry[0]) < cache_ttl:
|
||||
cache_hits += 1
|
||||
if entry[1]: # cached success
|
||||
cached_working.append(entry[2])
|
||||
else:
|
||||
uncached.append((addr, pr, conf))
|
||||
if cache_hits:
|
||||
_log('%d cached (%d working), %d to test' % (
|
||||
cache_hits, len(cached_working), len(uncached)), 'info')
|
||||
unique_proxies = uncached
|
||||
|
||||
if not unique_proxies:
|
||||
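# All proxies were cached, nothing to test
# NOTE(review): cached_working is not reported on this path -- confirm cached successes need no re-report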
|
||||
cycles += 1
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
_log('testing %d unique proxies' % len(unique_proxies), 'info')
|
||||
|
||||
# Phase 2: Test extracted proxies using worker thread pool
|
||||
@@ -804,6 +836,19 @@ def worker_main(config):
|
||||
break
|
||||
continue
|
||||
|
||||
# Populate proxy test cache from results
|
||||
if proxy_cache is not None:
|
||||
now = time.time()
|
||||
working_addrs = set()
|
||||
for r in working_results:
|
||||
addr = '%s:%d' % (r['ip'], r['port'])
|
||||
proxy_cache[addr] = (now, True, r)
|
||||
working_addrs.add(addr)
|
||||
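# Cache failures for tested proxies that didn't succeed
# NOTE(review): keys here are the raw pending_states keys, while lookups strip any auth prefix before '@' and successes are stored as 'ip:port' -- confirm pending_states keys are plain ip:port, otherwise failure entries never hit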
|
||||
for proxy_str in pending_states:
|
||||
if proxy_str not in working_addrs:
|
||||
proxy_cache[proxy_str] = (now, False, None)
|
||||
|
||||
proxies_working += len(working_results)
|
||||
|
||||
# Report working proxies to master
|
||||
@@ -820,6 +865,16 @@ def worker_main(config):
|
||||
_log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
|
||||
|
||||
cycles += 1
|
||||
|
||||
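# Periodic cache cleanup: evict expired entries when the cycle count is a multiple of 10
# NOTE: cycles is also incremented on the early-continue path that skips this check, so cleanup can run less often than every 10 cycles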
|
||||
if proxy_cache is not None and cycles % 10 == 0:
|
||||
now = time.time()
|
||||
expired = [k for k, v in proxy_cache.items() if (now - v[0]) >= cache_ttl]
|
||||
if expired:
|
||||
for k in expired:
|
||||
del proxy_cache[k]
|
||||
_log('cache cleanup: evicted %d expired, %d remaining' % (len(expired), len(proxy_cache)), 'info')
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
@@ -835,6 +890,8 @@ def worker_main(config):
|
||||
_log(' urls fetched: %d' % urls_fetched, 'info')
|
||||
_log(' proxies found: %d' % proxies_found, 'info')
|
||||
_log(' proxies working: %d' % proxies_working, 'info')
|
||||
if proxy_cache is not None:
|
||||
_log(' cache entries: %d' % len(proxy_cache), 'info')
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
Reference in New Issue
Block a user