#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Multi-engine proxy list scraper."""
import json
import os
import random
import threading
import time

import dbs
import engines
import fetch
import mysqlite
import proxywatchd
import translations
from config import Config
from misc import _log

# State file for engine backoff persistence (kept in the data directory so it
# survives restarts)
STATE_FILE = 'data/scraper_state.json'

config = Config()

# Default search terms (can be overridden by search_terms.txt)
search_terms = ['free proxy list', 'socks5 proxy', 'http proxy']

# Load Searx instances if the file exists
searx_instances = []
if os.path.exists('searx.instances'):
    with open('searx.instances') as h:
        searx_instances = [line.strip() for line in h
                           if line.strip().lower().startswith('http')]


class EngineTracker(object):
    """Track multiple search engine instances with rate limiting."""

    def __init__(self, engine_names, searx_urls, base_delay=30,
                 max_delay=3600, state_file=None):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.failures = {}
        self.backoff_until = {}
        self.success_count = {}
        self.last_rate_log = 0
        self.log_interval = 60  # seconds between rate-limit log messages
        self.state_file = state_file or STATE_FILE
        self._save_interval = 60  # seconds between saves
        self._last_save = 0

        # Build list of (engine_instance, identifier)
        self.engines = []
        for name in engine_names:
            name = name.strip().lower()
            if name == 'searx':
                for url in searx_urls:
                    eng = engines.Searx(url)
                    self.engines.append((eng, url))
            elif name in engines.ENGINES:
                eng = engines.get_engine(name)
                self.engines.append((eng, name))
            else:
                _log('unknown engine: %s' % name, 'warn')

        # Load persisted state
        self.load_state()

    def get_available(self):
        """Return engines not currently in backoff."""
        now = time.time()
        available = []
        for eng, ident in self.engines:
            if ident not in self.backoff_until or now >= self.backoff_until[ident]:
                available.append((eng, ident))
        return available

    def mark_success(self, ident):
        """Reset failure count on success."""
        self.failures[ident] = 0
        self.success_count[ident] = self.success_count.get(ident, 0) + 1
        if ident in self.backoff_until:
            del self.backoff_until[ident]
        self.save_state()

    def mark_failure(self, ident):
        """Increment failure count and set exponential backoff."""
        count = self.failures.get(ident, 0) + 1
        self.failures[ident] = count
        delay = min(self.base_delay * (2 ** (count - 1)), self.max_delay)
        self.backoff_until[ident] = time.time() + delay

        now = time.time()
        if (now - self.last_rate_log) >= self.log_interval:
            name = ident.split('/')[2] if '/' in ident else ident
            avail, in_backoff, total = self.get_status()
            _log('%d/%d engines in backoff (last: %s)'
                 % (in_backoff, total, name), 'rate')
            self.last_rate_log = now

        self.save_state()
        return delay
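
    # With the default base_delay=30 and max_delay=3600, mark_failure()
    # yields this schedule per consecutive failure:
    #   1: 30s, 2: 60s, 3: 120s, 4: 240s, 5: 480s, 6: 960s, 7: 1920s,
    #   8+: 3600s (30 * 2**7 = 3840 already exceeds the cap)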

    def get_status(self):
        """Return (available, in_backoff, total) summary."""
        available = len(self.get_available())
        in_backoff = len(self.engines) - available
        return available, in_backoff, len(self.engines)

    def get_stats(self):
        """Return detailed stats for the API/dashboard."""
        now = time.time()
        available = self.get_available()
        available_ids = set(ident for _, ident in available)

        engines_list = []
        for eng, ident in self.engines:
            # Shorten identifier for display (extract domain from URL)
            if '/' in ident:
                name = ident.split('/')[2]
            else:
                name = ident
            backoff_remaining = 0
            if ident in self.backoff_until:
                backoff_remaining = max(0, int(self.backoff_until[ident] - now))
            engines_list.append({
                'name': name,
                'available': ident in available_ids,
                'successes': self.success_count.get(ident, 0),
                'failures': self.failures.get(ident, 0),
                'backoff_remaining': backoff_remaining
            })

        # Sort by success count, descending
        engines_list.sort(key=lambda x: -x['successes'])

        return {
            'available': len(available),
            'in_backoff': len(self.engines) - len(available),
            'total': len(self.engines),
            'total_successes': sum(self.success_count.values()),
            'engines': engines_list[:20]  # Top 20 engines
        }

    def load_state(self):
        """Load persisted backoff state from the JSON file."""
        if not os.path.exists(self.state_file):
            return
        try:
            with open(self.state_file, 'r') as f:
                data = json.load(f)

            now = time.time()
            loaded_backoffs = 0

            # Restore failure and success counters
            if 'failures' in data:
                self.failures = data['failures']
            if 'success_count' in data:
                self.success_count = data['success_count']

            # Restore backoff_until (only entries still in the future)
            if 'backoff_until' in data:
                for ident, until in data['backoff_until'].items():
                    if until > now:
                        self.backoff_until[ident] = until
                        loaded_backoffs += 1

            if loaded_backoffs > 0:
                _log('loaded %d active backoffs from state' % loaded_backoffs,
                     'info')
        except (IOError, ValueError) as e:
            _log('failed to load scraper state: %s' % str(e), 'warn')

    def save_state(self, force=False):
        """Save backoff state to the JSON file.

        Args:
            force: If True, save immediately. Otherwise respect the save
                interval.
        """
        now = time.time()
        if not force and (now - self._last_save) < self._save_interval:
            return
        try:
            # Ensure the state directory exists
            state_dir = os.path.dirname(self.state_file)
            if state_dir and not os.path.exists(state_dir):
                os.makedirs(state_dir)

            data = {
                'failures': self.failures,
                'backoff_until': self.backoff_until,
                'success_count': self.success_count,
                'saved_at': now
            }

            # Atomic write: dump to a temp file, then rename over the target
            tmp_file = self.state_file + '.tmp'
            with open(tmp_file, 'w') as f:
                json.dump(data, f, indent=2)
            os.rename(tmp_file, self.state_file)
            self._last_save = now
        except (IOError, OSError) as e:
            _log('failed to save scraper state: %s' % str(e), 'warn')
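
# Shape of the persisted state file (data/scraper_state.json). The engine
# identifiers and values below are illustrative only:
#
#   {
#     "failures": {"duckduckgo": 2, "https://searx.example/": 0},
#     "backoff_until": {"duckduckgo": 1712345678.9},
#     "success_count": {"mojeek": 41, "duckduckgo": 17},
#     "saved_at": 1712345600.0
#   }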

engine_tracker = None


def get_scraper_stats():
    """Get scraper stats for the API/dashboard."""
    if engine_tracker is None:
        return None
    return engine_tracker.get_stats()


def build_search_query(sqlite=None):
    """Build a search query using the configured sources."""
    search = ''

    # Search by working proxy
    if 'p' in config.scraper.query:
        proxydb = mysqlite.mysqlite(config.watchd.database, str)
        proxies = [i[0] for i in proxydb.execute(
            'SELECT proxy FROM proxylist WHERE failed=0 '
            'ORDER BY RANDOM() LIMIT 10'
        ).fetchall()]
        if proxies and random.random() < 0.5:
            # Never ask random.sample for more items than we have
            search = ' '.join(random.sample(
                proxies, min(len(proxies), random.randint(1, 2))))

    # Search by known website
    if 'w' in config.scraper.query and (not search or random.random() < 0.5):
        if sqlite is None:
            sqlite = mysqlite.mysqlite(config.ppf.database, str)
        uris = [i[0] for i in sqlite.execute(
            'SELECT url FROM uris WHERE error=0 AND url NOT LIKE "%github%" '
            'ORDER BY RANDOM() LIMIT 10'
        ).fetchall()]
        if uris and random.random() < 0.5:
            if search:
                search = '%s OR ' % search
            search = search + 'site:%s' % random.choice(uris).split('/')[2]

    # Search by term (multi-lingual)
    if 's' in config.scraper.query and (not search or random.random() < 0.5):
        if search:
            search = '%s OR ' % search
        # 70% chance of a non-English term
        if random.random() < 0.7:
            term = translations.get_random_search_term()
        else:
            term = random.choice(search_terms)
        search = search + term

    return search


def scrape_engine(engine, ident, query, urignore, sqlite):
    """Scrape a single engine for proxy list URLs."""
    max_pages = config.scraper.max_pages
    consecutive_empty = 0
    total_urls = 0

    for page in range(max_pages):
        try:
            url = engine.build_url(query, page)
            if config.scraper.debug:
                _log('%s page %d: %s' % (engine.name, page, url), 'debug')

            content = fetch.fetch_contents(url)

            # Check for rate limiting
            if engine.is_rate_limited(content):
                engine_tracker.mark_failure(ident)
                return total_urls

            if not content:
                consecutive_empty += 1
                if consecutive_empty >= config.scraper.fail_threshold:
                    engine_tracker.mark_failure(ident)
                    return total_urls
                continue

            # Extract URLs
            urls = engine.extract_urls(content, urignore)
            if not urls:
                # Empty results on the first page likely mean rate limiting
                if page == 0:
                    engine_tracker.mark_failure(ident)
                return total_urls

            # Success
            engine_tracker.mark_success(ident)
            consecutive_empty = 0

            # Deduplicate and insert
            urls = list(set(urls))
            source = '%s (page %d, query: %s)' % (engine.name, page, query[:50])
            dbs.insert_urls(urls, source, sqlite)
            total_urls += len(urls)

            # Small delay between pages
            time.sleep(random.uniform(1.0, 3.0))
        except Exception as e:
            _log('%s: %s' % (engine.name, repr(e)), 'warn')
            engine_tracker.mark_failure(ident)
            return total_urls

    return total_urls


def proxyfind(sqlite=None, urignore=None):
    """Find proxy list URLs using the available search engines."""
    # Get available engines
    available = engine_tracker.get_available()
    if not available:
        avail, backoff, total = engine_tracker.get_status()
        _log('all %d engines in backoff, sleeping 60s' % total, 'rate')
        time.sleep(60)
        return

    # Build search query
    query = build_search_query(sqlite)
    if not query:
        return
    if config.scraper.debug:
        _log('query: %s' % query, 'debug')

    # Shuffle and pick engines; use 1-3 engines per round
    random.shuffle(available)
    num_engines = min(len(available), random.randint(1, 3))

    for engine, ident in available[:num_engines]:
        total = scrape_engine(engine, ident, query, urignore, sqlite)
        if total > 0:
            name = ident.split('/')[2] if '/' in ident else ident
            _log('%s: found %d URLs' % (name, total), 'scraper')
        # Delay between engines
        time.sleep(random.uniform(2.0, 5.0))


def load_urignore():
    """Load URL ignore patterns."""
    urignore = []

    # Load from file
    if os.path.exists('urignore.txt'):
        with open('urignore.txt', 'r') as f:
            urignore = [i.strip() for i in f.read().split('\n') if i.strip()]

    # Ignore the Searx instances themselves (avoid loops)
    for i in searx_instances:
        urignore.append(i.split('/')[2])

    # Ignore known search engine domains
    urignore.extend([
        'duckduckgo.com', 'startpage.com', 'mojeek.com', 'qwant.com',
        'yandex.com', 'yandex.ru', 'ecosia.org', 'brave.com',
        'google.com', 'bing.com', 'yahoo.com',
    ])

    return urignore
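
# Sketch of how the thread below is typically embedded in a host process.
# The driver code here is hypothetical; the actual daemon wiring lives
# outside this module:
#
#   cfg = Config()
#   cfg.load()
#   scraper = Scraper(cfg)
#   scraper.start()   # daemon thread: dies with the process
#   ...
#   scraper.stop()    # ask the loop to exit after the current round
#   scraper.join()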

class Scraper(threading.Thread):
    """Background thread for search engine scraping."""

    def __init__(self, cfg):
        self.cfg = cfg
        self.running = False
        self.urignore = load_urignore()
        threading.Thread.__init__(self)
        self.daemon = True

    def init_tracker(self):
        """Initialize the engine tracker with the configured engines."""
        global engine_tracker
        enabled_engines = [e.strip() for e in self.cfg.scraper.engines.split(',')]
        engine_tracker = EngineTracker(
            enabled_engines, searx_instances,
            base_delay=self.cfg.scraper.backoff_base,
            max_delay=self.cfg.scraper.backoff_max
        )
        avail, backoff, total = engine_tracker.get_status()
        _log('scraper: %d engine(s) (%s)' % (total, ', '.join(enabled_engines)),
             'info')

    def stop(self):
        """Signal the scraper to stop."""
        self.running = False

    def run(self):
        """Main scraper loop."""
        global config
        config = self.cfg
        fetch.set_config(self.cfg)
        translations.set_config(self.cfg)
        self.init_tracker()
        self.running = True

        # Create a thread-local database connection
        urldb = mysqlite.mysqlite(self.cfg.ppf.database, str)

        while self.running:
            try:
                proxyfind(urldb, self.urignore)
                time.sleep(random.uniform(5.0, 15.0))
            except Exception as e:
                try:
                    err_msg = repr(e)
                    if isinstance(err_msg, unicode):
                        err_msg = err_msg.encode('ascii', 'backslashreplace')
                except Exception:
                    err_msg = type(e).__name__
                _log('scraper error: %s' % err_msg, 'error')
                time.sleep(30)

        urldb.close()
        engine_tracker.save_state(force=True)
        avail, backoff, total = engine_tracker.get_status()
        _log('scraper stopped (%d/%d engines available)' % (avail, total),
             'info')


if __name__ == '__main__':
    config.load()
    errors = config.validate()
    if errors:
        for e in errors:
            _log(e, 'error')
        import sys
        sys.exit(1)

    fetch.set_config(config)
    translations.set_config(config)

    proxydb = mysqlite.mysqlite(config.watchd.database, str)
    dbs.create_table_if_not_exists(proxydb, 'proxylist')
    urldb = mysqlite.mysqlite(config.ppf.database, str)
    dbs.create_table_if_not_exists(urldb, 'uris')

    # Load search terms from file, if it exists
    if os.path.exists('search_terms.txt'):
        with open('search_terms.txt', 'r') as f:
            terms = [i.strip() for i in f.read().split('\n') if i.strip()]
        if terms:
            search_terms = terms

    urignore = load_urignore()

    # Parse enabled engines from config and initialize the tracker
    enabled_engines = [e.strip() for e in config.scraper.engines.split(',')]
    engine_tracker = EngineTracker(
        enabled_engines, searx_instances,
        base_delay=config.scraper.backoff_base,
        max_delay=config.scraper.backoff_max
    )
    avail, backoff, total = engine_tracker.get_status()
    _log('loaded %d engine instances (%s)' % (total, ', '.join(enabled_engines)),
         'info')

    try:
        while True:
            proxyfind(urldb, urignore)
            # Small delay between rounds
            time.sleep(random.uniform(5.0, 15.0))
    except KeyboardInterrupt:
        engine_tracker.save_state(force=True)
        avail, backoff, total = engine_tracker.get_status()
        _log('scraper stopped (engines: %d/%d available)' % (avail, total),
             'info')