From b88aa2a878f3aa767c409db580d206267dfd3e4f Mon Sep 17 00:00:00 2001
From: Username
Date: Sun, 21 Dec 2025 23:37:48 +0100
Subject: [PATCH] scraper: add multi-engine support and tracking

---
 scraper.py | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 157 insertions(+), 9 deletions(-)

diff --git a/scraper.py b/scraper.py
index 19e82c3..f6ddedf 100755
--- a/scraper.py
+++ b/scraper.py
@@ -3,8 +3,10 @@
 """Multi-engine proxy list scraper."""
 
 import dbs
+import json
 import random
 import time
+import threading
 import urllib
 import mysqlite
 import proxywatchd
@@ -15,8 +17,14 @@ import engines
 import translations
 import os
 
+# State file for engine backoff persistence (kept in the data directory)
+STATE_FILE = 'data/scraper_state.json'
+
 config = Config()
 
+# Default search terms (can be overridden by search_terms.txt)
+search_terms = ['free proxy list', 'socks5 proxy', 'http proxy']
+
 # Load Searx instances if file exists
 searx_instances = []
 if os.path.exists('searx.instances'):
@@ -28,12 +36,18 @@
 class EngineTracker(object):
     """Track multiple search engine instances with rate limiting."""
 
-    def __init__(self, engine_names, searx_urls, base_delay=30, max_delay=3600):
+    def __init__(self, engine_names, searx_urls, base_delay=30, max_delay=3600,
+                 state_file=None):
         self.base_delay = base_delay
         self.max_delay = max_delay
         self.failures = {}
         self.backoff_until = {}
         self.success_count = {}
+        self.last_rate_log = 0
+        self.log_interval = 60  # seconds between rate-limit log messages
+        self.state_file = state_file or STATE_FILE
+        self._save_interval = 60  # seconds between saves
+        self._last_save = 0
 
         # Build list of (engine_instance, identifier)
         self.engines = []
@@ -49,6 +63,9 @@
             else:
                 _log('unknown engine: %s' % name, 'warn')
 
+        # Load persisted state
+        self.load_state()
+
     def get_available(self):
         """Return engines not currently in backoff."""
         now = time.time()
@@ -64,6 +81,7 @@
         self.success_count[ident] = self.success_count.get(ident, 0) + 1
         if ident in self.backoff_until:
             del self.backoff_until[ident]
+        self.save_state()
 
     def mark_failure(self, ident):
         """Increment failure count and set exponential backoff."""
@@ -71,8 +89,13 @@
         self.failures[ident] = count
         delay = min(self.base_delay * (2 ** (count - 1)), self.max_delay)
         self.backoff_until[ident] = time.time() + delay
-        name = ident.split('/')[2] if '/' in ident else ident
-        _log('%s: backoff %ds (failures: %d)' % (name, delay, count), 'rate')
+        now = time.time()
+        if (now - self.last_rate_log) >= self.log_interval:
+            name = ident.split('/')[2] if '/' in ident else ident
+            avail, in_backoff, total = self.get_status()
+            _log('%d/%d engines in backoff (last: %s)' % (in_backoff, total, name), 'rate')
+            self.last_rate_log = now
+        self.save_state()
         return delay
 
     def get_status(self):
@@ -81,6 +104,72 @@
         in_backoff = len(self.engines) - available
         return available, in_backoff, len(self.engines)
 
+    def load_state(self):
+        """Load persisted backoff state from JSON file."""
+        if not os.path.exists(self.state_file):
+            return
+
+        try:
+            with open(self.state_file, 'r') as f:
+                data = json.load(f)
+
+            now = time.time()
+            loaded_backoffs = 0
+
+            # Restore failures
+            if 'failures' in data:
+                self.failures = data['failures']
+
+            # Restore success counts
+            if 'success_count' in data:
+                self.success_count = data['success_count']
+
+            # Restore backoff_until (only if still in future)
+            if 'backoff_until' in data:
+                for ident, until in data['backoff_until'].items():
+                    if until > now:
+                        self.backoff_until[ident] = until
+                        loaded_backoffs += 1
+
+            if loaded_backoffs > 0:
+                _log('loaded %d active backoffs from state' % loaded_backoffs, 'info')
+
+        except (IOError, ValueError) as e:
+            _log('failed to load scraper state: %s' % str(e), 'warn')
+
+    def save_state(self, force=False):
+        """Save backoff state to JSON file.
+
+        Args:
+            force: If True, save immediately. Otherwise respect save interval.
+        """
+        now = time.time()
+        if not force and (now - self._last_save) < self._save_interval:
+            return
+
+        try:
+            # Ensure directory exists
+            state_dir = os.path.dirname(self.state_file)
+            if state_dir and not os.path.exists(state_dir):
+                os.makedirs(state_dir)
+
+            data = {
+                'failures': self.failures,
+                'backoff_until': self.backoff_until,
+                'success_count': self.success_count,
+                'saved_at': now
+            }
+
+            # Atomic write
+            tmp_file = self.state_file + '.tmp'
+            with open(tmp_file, 'w') as f:
+                json.dump(data, f, indent=2)
+            os.rename(tmp_file, self.state_file)
+            self._last_save = now
+
+        except (IOError, OSError) as e:
+            _log('failed to save scraper state: %s' % str(e), 'warn')
+
 
 engine_tracker = None
 
@@ -141,7 +230,6 @@ def scrape_engine(engine, ident, query, urignore, sqlite):
 
     # Check for rate limiting
     if engine.is_rate_limited(content):
-        _log('%s: rate limited' % engine.name, 'rate')
         engine_tracker.mark_failure(ident)
         return total_urls
 
@@ -175,8 +263,6 @@ def scrape_engine(engine, ident, query, urignore, sqlite):
             time.sleep(random.uniform(1.0, 3.0))
 
     except Exception as e:
-        name = ident.split('/')[2] if '/' in ident else ident
-        _log('%s: error: %s' % (name, str(e)), 'error')
         engine_tracker.mark_failure(ident)
         return total_urls
 
@@ -244,6 +330,66 @@ def load_urignore():
     return urignore
 
 
+class Scraper(threading.Thread):
+    """Background thread for search engine scraping."""
+
+    def __init__(self, cfg):
+        self.cfg = cfg
+        self.running = False
+        self.urignore = load_urignore()
+        threading.Thread.__init__(self)
+        self.daemon = True
+
+    def init_tracker(self):
+        """Initialize engine tracker with configured engines."""
+        global engine_tracker
+        enabled_engines = [e.strip() for e in self.cfg.scraper.engines.split(',')]
+        engine_tracker = EngineTracker(
+            enabled_engines,
+            searx_instances,
+            base_delay=self.cfg.scraper.backoff_base,
+            max_delay=self.cfg.scraper.backoff_max
+        )
+        avail, backoff, total = engine_tracker.get_status()
+        _log('scraper: %d engine(s) (%s)' % (total, ', '.join(enabled_engines)), 'info')
+
+    def stop(self):
+        """Signal the scraper to stop."""
+        self.running = False
+
+    def run(self):
+        """Main scraper loop."""
+        global config
+        config = self.cfg
+        fetch.set_config(self.cfg)
+        translations.set_config(self.cfg)
+
+        self.init_tracker()
+        self.running = True
+
+        # Create thread-local database connection
+        urldb = mysqlite.mysqlite(self.cfg.ppf.database, str)
+
+        while self.running:
+            try:
+                proxyfind(urldb, self.urignore)
+                time.sleep(random.uniform(5.0, 15.0))
+            except Exception as e:
+                try:
+                    err_msg = repr(e)
+                    if isinstance(err_msg, unicode):
+                        err_msg = err_msg.encode('ascii', 'backslashreplace')
+                except:
+                    err_msg = type(e).__name__
+                _log('scraper error: %s' % err_msg, 'error')
+                time.sleep(30)
+
+        urldb.close()
+        engine_tracker.save_state(force=True)
+        avail, backoff, total = engine_tracker.get_status()
+        _log('scraper stopped (%d/%d engines available)' % (avail, total), 'info')
+
+
 if __name__ == '__main__':
     config.load()
     errors = config.validate()
@@ -261,11 +407,12 @@ if __name__ == '__main__':
     urldb = mysqlite.mysqlite(config.ppf.database, str)
     dbs.create_table_if_not_exists(urldb, 'uris')
 
-    # Load search terms
-    search_terms = ['free proxy list', 'socks5 proxy', 'http proxy']
+    # Load search terms from file if it exists
    if os.path.exists('search_terms.txt'):
         with open('search_terms.txt', 'r') as f:
-            search_terms = [i.strip() for i in f.read().split('\n') if i.strip()]
+            terms = [i.strip() for i in f.read().split('\n') if i.strip()]
+            if terms:
+                search_terms = terms
 
     urignore = load_urignore()
 
@@ -289,5 +436,6 @@ if __name__ == '__main__':
             # Small delay between rounds
             time.sleep(random.uniform(5.0, 15.0))
     except KeyboardInterrupt:
+        engine_tracker.save_state(force=True)
         avail, backoff, total = engine_tracker.get_status()
         _log('scraper stopped (engines: %d/%d available)' % (avail, total), 'info')
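
Usage sketch (not part of the patch): one way the new Scraper thread could be
driven from a small wrapper process. Reusing the module-level config object and
the join timeout are assumptions here; only Scraper(cfg), start(), stop() and
the state save in run() come from the code above.

    import time
    import scraper

    scraper.config.load()                      # module-level Config(), as in __main__
    worker = scraper.Scraper(scraper.config)   # daemon thread; loops until stop()
    worker.start()
    try:
        while True:
            time.sleep(60)
    except KeyboardInterrupt:
        worker.stop()                          # run() saves engine state on exit
        worker.join(timeout=30)                # may still be mid-round; thread is a daemon

stop() only clears the running flag, so the thread finishes its current round
(including the 5-15 s inter-round sleep) before run() closes the database and
persists the backoff state.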