scraper: add multi-engine support and tracking

 scraper.py | 166

@@ -3,8 +3,10 @@
"""Multi-engine proxy list scraper."""

import dbs
import json
import random
import time
import threading
import urllib
import mysqlite
import proxywatchd
@@ -15,8 +17,14 @@ import engines
import translations
import os

# State file for engine backoff persistence (kept in the data directory so it survives restarts)
STATE_FILE = 'data/scraper_state.json'

config = Config()

# Default search terms (can be overridden by search_terms.txt)
search_terms = ['free proxy list', 'socks5 proxy', 'http proxy']

# Load Searx instances if file exists
searx_instances = []
if os.path.exists('searx.instances'):
@@ -28,12 +36,18 @@ if os.path.exists('searx.instances'):
class EngineTracker(object):
    """Track multiple search engine instances with rate limiting."""

    def __init__(self, engine_names, searx_urls, base_delay=30, max_delay=3600):
    def __init__(self, engine_names, searx_urls, base_delay=30, max_delay=3600,
                 state_file=None):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.failures = {}
        self.backoff_until = {}
        self.success_count = {}
        self.last_rate_log = 0
        self.log_interval = 60  # seconds between rate-limit log messages
        self.state_file = state_file or STATE_FILE
        self._save_interval = 60  # seconds between saves
        self._last_save = 0

        # Build list of (engine_instance, identifier)
        self.engines = []
@@ -49,6 +63,9 @@ class EngineTracker(object):
            else:
                _log('unknown engine: %s' % name, 'warn')

        # Load persisted state
        self.load_state()

    def get_available(self):
        """Return engines not currently in backoff."""
        now = time.time()
@@ -64,6 +81,7 @@ class EngineTracker(object):
        self.success_count[ident] = self.success_count.get(ident, 0) + 1
        if ident in self.backoff_until:
            del self.backoff_until[ident]
        self.save_state()

    def mark_failure(self, ident):
        """Increment failure count and set exponential backoff."""
@@ -71,8 +89,13 @@
        self.failures[ident] = count
        delay = min(self.base_delay * (2 ** (count - 1)), self.max_delay)
        self.backoff_until[ident] = time.time() + delay
        name = ident.split('/')[2] if '/' in ident else ident
        _log('%s: backoff %ds (failures: %d)' % (name, delay, count), 'rate')
        now = time.time()
        if (now - self.last_rate_log) >= self.log_interval:
            name = ident.split('/')[2] if '/' in ident else ident
            avail, in_backoff, total = self.get_status()
            _log('%d/%d engines in backoff (last: %s)' % (in_backoff, total, name), 'rate')
            self.last_rate_log = now
        self.save_state()
        return delay
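
With the default base_delay=30 and max_delay=3600, the delay line above works out to the following schedule (this is just the arithmetic of the formula, not code from the commit):

    failures:   1    2    3     4     5     6     7      8+
    delay (s):  30   60   120   240   480   960   1920   3600 (capped)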

    def get_status(self):
@@ -81,6 +104,72 @@ class EngineTracker(object):
        in_backoff = len(self.engines) - available
        return available, in_backoff, len(self.engines)

    def load_state(self):
        """Load persisted backoff state from JSON file."""
        if not os.path.exists(self.state_file):
            return

        try:
            with open(self.state_file, 'r') as f:
                data = json.load(f)

            now = time.time()
            loaded_backoffs = 0

            # Restore failures
            if 'failures' in data:
                self.failures = data['failures']

            # Restore success counts
            if 'success_count' in data:
                self.success_count = data['success_count']

            # Restore backoff_until (only if still in future)
            if 'backoff_until' in data:
                for ident, until in data['backoff_until'].items():
                    if until > now:
                        self.backoff_until[ident] = until
                        loaded_backoffs += 1

            if loaded_backoffs > 0:
                _log('loaded %d active backoffs from state' % loaded_backoffs, 'info')

        except (IOError, ValueError) as e:
            _log('failed to load scraper state: %s' % str(e), 'warn')

    def save_state(self, force=False):
        """Save backoff state to JSON file.

        Args:
            force: If True, save immediately. Otherwise respect save interval.
        """
        now = time.time()
        if not force and (now - self._last_save) < self._save_interval:
            return

        try:
            # Ensure directory exists
            state_dir = os.path.dirname(self.state_file)
            if state_dir and not os.path.exists(state_dir):
                os.makedirs(state_dir)

            data = {
                'failures': self.failures,
                'backoff_until': self.backoff_until,
                'success_count': self.success_count,
                'saved_at': now
            }

            # Atomic write
            tmp_file = self.state_file + '.tmp'
            with open(tmp_file, 'w') as f:
                json.dump(data, f, indent=2)
            os.rename(tmp_file, self.state_file)
            self._last_save = now

        except (IOError, OSError) as e:
            _log('failed to save scraper state: %s' % str(e), 'warn')
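
For reference, the data/scraper_state.json written by save_state() ends up shaped like the sketch below; the keys match the data dict above, while the identifiers and timestamps are invented for illustration:

    {
      "failures": {"https://searx.example.org/search": 3},
      "backoff_until": {"https://searx.example.org/search": 1700000120.0},
      "success_count": {"https://searx.example.org/search": 42},
      "saved_at": 1700000000.0
    }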


engine_tracker = None
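
A minimal sketch of the call pattern the tracker supports, using only methods visible in this diff. 'someengine' is a placeholder and is assumed to be a name the engines module recognises, with the identifier equal to the engine name (for Searx instances the identifier would be the instance URL instead):

    tracker = EngineTracker(['someengine'], searx_instances)
    delay = tracker.mark_failure('someengine')       # 30s, doubling on each further failure
    usable = tracker.get_available()                 # excludes engines still in backoff
    avail, in_backoff, total = tracker.get_status()  # counts: available / in backoff / total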

@@ -141,7 +230,6 @@ def scrape_engine(engine, ident, query, urignore, sqlite):

        # Check for rate limiting
        if engine.is_rate_limited(content):
            _log('%s: rate limited' % engine.name, 'rate')
            engine_tracker.mark_failure(ident)
            return total_urls

@@ -175,8 +263,6 @@ def scrape_engine(engine, ident, query, urignore, sqlite):
            time.sleep(random.uniform(1.0, 3.0))

    except Exception as e:
        name = ident.split('/')[2] if '/' in ident else ident
        _log('%s: error: %s' % (name, str(e)), 'error')
        engine_tracker.mark_failure(ident)
        return total_urls

@@ -244,6 +330,66 @@ def load_urignore():
    return urignore


class Scraper(threading.Thread):
    """Background thread for search engine scraping."""

    def __init__(self, cfg):
        self.cfg = cfg
        self.running = False
        self.urignore = load_urignore()
        threading.Thread.__init__(self)
        self.daemon = True

    def init_tracker(self):
        """Initialize engine tracker with configured engines."""
        global engine_tracker
        enabled_engines = [e.strip() for e in self.cfg.scraper.engines.split(',')]
        engine_tracker = EngineTracker(
            enabled_engines,
            searx_instances,
            base_delay=self.cfg.scraper.backoff_base,
            max_delay=self.cfg.scraper.backoff_max
        )
        avail, backoff, total = engine_tracker.get_status()
        _log('scraper: %d engine(s) (%s)' % (total, ', '.join(enabled_engines)), 'info')

    def stop(self):
        """Signal the scraper to stop."""
        self.running = False

    def run(self):
        """Main scraper loop."""
        global config
        config = self.cfg
        fetch.set_config(self.cfg)
        translations.set_config(self.cfg)

        self.init_tracker()
        self.running = True

        # Create thread-local database connection
        urldb = mysqlite.mysqlite(self.cfg.ppf.database, str)

        while self.running:
            try:
                proxyfind(urldb, self.urignore)
                time.sleep(random.uniform(5.0, 15.0))
            except Exception as e:
                try:
                    err_msg = repr(e)
                    if isinstance(err_msg, unicode):
                        err_msg = err_msg.encode('ascii', 'backslashreplace')
                except:
                    err_msg = type(e).__name__
                _log('scraper error: %s' % err_msg, 'error')
                time.sleep(30)

        urldb.close()
        engine_tracker.save_state(force=True)
        avail, backoff, total = engine_tracker.get_status()
        _log('scraper stopped (%d/%d engines available)' % (avail, total), 'info')
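
A sketch of how the embedding daemon might drive this thread; cfg is assumed to be an already-loaded Config, and the actual wiring on the proxywatchd side is not part of this diff:

    scraper = Scraper(cfg)
    scraper.start()            # daemon thread: runs proxyfind() rounds until stopped
    # ... later, on shutdown ...
    scraper.stop()             # run() exits after the current round and saves tracker state
    scraper.join()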


if __name__ == '__main__':
    config.load()
    errors = config.validate()
@@ -261,11 +407,12 @@ if __name__ == '__main__':
    urldb = mysqlite.mysqlite(config.ppf.database, str)
    dbs.create_table_if_not_exists(urldb, 'uris')

    # Load search terms
    search_terms = ['free proxy list', 'socks5 proxy', 'http proxy']
    # Load search terms from file if exists
    if os.path.exists('search_terms.txt'):
        with open('search_terms.txt', 'r') as f:
            search_terms = [i.strip() for i in f.read().split('\n') if i.strip()]
            terms = [i.strip() for i in f.read().split('\n') if i.strip()]
            if terms:
                search_terms = terms
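
Given the parsing above (one non-empty line per query), a search_terms.txt override would simply list one query per line, e.g. the same terms as the built-in defaults:

    free proxy list
    socks5 proxy
    http proxy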

    urignore = load_urignore()

@@ -289,5 +436,6 @@ if __name__ == '__main__':
            # Small delay between rounds
            time.sleep(random.uniform(5.0, 15.0))
        except KeyboardInterrupt:
            engine_tracker.save_state(force=True)
            avail, backoff, total = engine_tracker.get_status()
            _log('scraper stopped (engines: %d/%d available)' % (avail, total), 'info')