diff --git a/config.py b/config.py index ae5ab61..110942b 100644 --- a/config.py +++ b/config.py @@ -6,6 +6,8 @@ class Config(ComboParser): def load(self): super(Config, self).load() self.torhosts = [ str(i).strip() for i in self.common.tor_hosts.split(',') ] + # threads config = per-host value, multiply by Tor host count + self.watchd.threads = self.watchd.threads * len(self.torhosts) #with open('servers.txt', 'r') as handle: with open(self.watchd.source_file, 'r') as handle: self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0] @@ -39,6 +41,8 @@ class Config(ComboParser): errors.append('watchd.threads must be >= 1') if self.ppf.threads < 1: errors.append('ppf.threads must be >= 1') + if self.scraper.threads < 1: + errors.append('scraper.threads must be >= 1') # Validate max_fail if self.watchd.max_fail < 1: @@ -47,7 +51,7 @@ class Config(ComboParser): errors.append('ppf.max_fail must be >= 1') # Validate checktype - valid_checktypes = {'irc', 'http', 'judges', 'ssl'} + valid_checktypes = {'irc', 'head', 'judges', 'ssl'} if self.watchd.checktype not in valid_checktypes: errors.append('watchd.checktype must be one of: %s' % ', '.join(sorted(valid_checktypes))) @@ -81,6 +85,7 @@ class Config(ComboParser): self.add_item(section, 'tor_hosts', str, '127.0.0.1:9050', 'comma-separated list of tor proxy address(es)', True) self.add_item(section, 'timeout_connect', int, 10, 'connection timeout in seconds (default: 10)', False) self.add_item(section, 'timeout_read', int, 15, 'read timeout in seconds (default: 15)', False) + self.add_item(section, 'profiling', bool, False, 'enable cProfile profiling (default: False)', False) section = 'watchd' self.add_item(section, 'outage_threshold', float, 4.0, 'mininum success percentage required to not drop check results', False) @@ -90,7 +95,7 @@ class Config(ComboParser): self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False) self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False) self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False) - self.add_item(section, 'use_ssl', int, 0, 'whether to use SSL and port 6697 to connect to targets (slower)', False) + self.add_item(section, 'use_ssl', int, 1, 'whether to use SSL (1=always, 0=never, 2=random)', False) self.add_item(section, 'checktime', int, 1800, 'base checking interval for proxies in db in seconds', False) self.add_item(section, 'perfail_checktime', int, 3600, 'additional checking interval for proxies in db in seconds per experienced failure', False) self.add_item(section, 'database', str, 'proxies.sqlite', 'filename of database', True) @@ -101,7 +106,7 @@ class Config(ComboParser): self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False) self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False) self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False) - self.add_item(section, 'checktype', str, 'ssl', 'check type: irc, http, judges, or ssl', False) + self.add_item(section, 'checktype', str, 'ssl', 'check type: irc, head, judges, or ssl', False) section = 'httpd' self.add_item(section, 'listenip', str, '127.0.0.1', 'address for the httpd to listen to (default: 127.0.0.1)', True) @@ -122,6 +127,7 @@ class Config(ComboParser): section = 'scraper' self.add_item(section, 'enabled', bool, True, 'enable search engine scraper (default: True)', False) + self.add_item(section, 'threads', int, 3, 'number of scraper threads (default: 3)', False) self.add_item(section, 'debug', bool, False, 'scraper: whether to print additional debug info', False) self.add_item(section, 'query', str, 'psw', 'build query using Proxies, Search, Websites', False) self.add_item(section, 'backoff_base', int, 30, 'base backoff delay in seconds (default: 30)', False) diff --git a/dbs.py b/dbs.py index 9ef6ff7..b4878d4 100644 --- a/dbs.py +++ b/dbs.py @@ -46,6 +46,16 @@ def _migrate_content_hash_column(sqlite): sqlite.commit() +def _migrate_geolocation_columns(sqlite): + """Add latitude/longitude columns for precise proxy mapping.""" + try: + sqlite.execute('SELECT latitude FROM proxylist LIMIT 1') + except Exception: + sqlite.execute('ALTER TABLE proxylist ADD COLUMN latitude REAL') + sqlite.execute('ALTER TABLE proxylist ADD COLUMN longitude REAL') + sqlite.commit() + + def compute_proxy_list_hash(proxies): """Compute MD5 hash of sorted proxy list for change detection. @@ -267,11 +277,14 @@ def create_table_if_not_exists(sqlite, dbname): latency_samples INT DEFAULT 0, anonymity TEXT, exit_ip TEXT, - asn INT)""") + asn INT, + latitude REAL, + longitude REAL)""") # Migration: add columns to existing databases (must run before creating indexes) _migrate_latency_columns(sqlite) _migrate_anonymity_columns(sqlite) _migrate_asn_column(sqlite) + _migrate_geolocation_columns(sqlite) # Indexes for common query patterns sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)') sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)') @@ -360,7 +373,7 @@ def insert_proxies(proxydb, proxies, url): else: addr, proto = p, None ip, port = addr.split(':') - rows.append((timestamp, addr, ip, port, proto, 3, 0, 0, 0, 0, 0)) + rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0)) proxydb.executemany( 'INSERT OR IGNORE INTO proxylist ' '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success) ' diff --git a/fetch.py b/fetch.py index 97cf0d8..aa30a0e 100644 --- a/fetch.py +++ b/fetch.py @@ -1,5 +1,7 @@ import re, random, time +import threading import rocksock +import network_stats from http2 import RsHttp, _parse_url from soup_parser import soupify from misc import _log @@ -43,6 +45,7 @@ def fetch_contents(url, head=False, proxy=None): retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded') def _fetch_contents(url, head = False, proxy=None): + network_stats.set_category('scraper') host, port, ssl, uri = _parse_url(url) headers=[ 'Accept-Language: en-US,en;q=0.8', @@ -52,47 +55,54 @@ def _fetch_contents(url, head = False, proxy=None): _log("connecting to %s... (header: %s)" % (url, str(head)), "debug") tor_retries = 0 max_tor_retries = 1 - while True: - proxies = [rocksock.RocksockProxyFromURL('socks4://%s' % random.choice( config.torhosts ))] - if proxy: proxies.append( rocksock.RocksockProxyFromURL(proxy)) + http = None + try: + while True: + proxies = [rocksock.RocksockProxyFromURL('socks4://%s' % random.choice( config.torhosts ))] + if proxy: proxies.append( rocksock.RocksockProxyFromURL(proxy)) - http = RsHttp(host,ssl=ssl,port=port, keep_alive=True, timeout=config.ppf.timeout, max_tries=config.ppf.http_retries, follow_redirects=True, auto_set_cookies=True, proxies=proxies, user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0', log_errors=False) - if not http.connect(): - global _last_fail_log - now = time.time() - if (now - _last_fail_log) >= _fail_log_interval: - _log("failed to connect to %s"%url, "ppf") - _last_fail_log = now - e = http.get_last_rocksock_exception() - if not e: - return None - et = e.get_errortype() - ee = e.get_error() - ef = e.get_failedproxy() - if et == rocksock.RS_ET_OWN and \ - ee == rocksock.RS_E_TARGET_CONN_REFUSED \ - and ef == 0: - tor_retries += 1 - if tor_retries >= max_tor_retries: - _log("tor proxy failed after %d retries" % tor_retries, "error") + http = RsHttp(host,ssl=ssl,port=port, keep_alive=True, timeout=config.ppf.timeout, max_tries=config.ppf.http_retries, follow_redirects=True, auto_set_cookies=True, proxies=proxies, user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0', log_errors=False) + if not http.connect(): + global _last_fail_log + now = time.time() + if (now - _last_fail_log) >= _fail_log_interval: + _log("failed to connect to %s"%url, "ppf") + _last_fail_log = now + e = http.get_last_rocksock_exception() + if not e: return None - _log("tor proxy retry %d/%d" % (tor_retries, max_tor_retries), "warn") - time.sleep(5) - continue - return None - break + et = e.get_errortype() + ee = e.get_error() + ef = e.get_failedproxy() + if et == rocksock.RS_ET_OWN and \ + ee == rocksock.RS_E_TARGET_CONN_REFUSED \ + and ef == 0: + http.disconnect() + http = None + tor_retries += 1 + if tor_retries >= max_tor_retries: + _log("tor proxy failed after %d retries" % tor_retries, "error") + return None + _log("tor proxy retry %d/%d" % (tor_retries, max_tor_retries), "warn") + time.sleep(5) + continue + return None + break - ## only request header - if head: - hdr = http.head(uri, headers) - return hdr + ## only request header + if head: + hdr = http.head(uri, headers) + return hdr - hdr, res = http.get(uri, headers) - res = res.encode('utf-8') if isinstance(res, unicode) else res - for retry_message in retry_messages: - if retry_message in res: return None + hdr, res = http.get(uri, headers) + res = res.encode('utf-8') if isinstance(res, unicode) else res + for retry_message in retry_messages: + if retry_message in res: return None - return res + return res + finally: + if http: + http.disconnect() def valid_port(port): """Check if port number is valid (1-65535).""" @@ -176,25 +186,29 @@ def is_usable_proxy(proxy): return False _known_proxies = {} +_known_proxies_lock = threading.Lock() def init_known_proxies(proxydb): """Initialize known proxies cache from database.""" global _known_proxies - if _known_proxies: - return - known = proxydb.execute('SELECT proxy FROM proxylist').fetchall() - for k in known: - _known_proxies[k[0]] = True + with _known_proxies_lock: + if _known_proxies: + return + known = proxydb.execute('SELECT proxy FROM proxylist').fetchall() + for k in known: + _known_proxies[k[0]] = True def add_known_proxies(proxies): """Add proxies to known cache.""" global _known_proxies - for p in proxies: - _known_proxies[p] = True + with _known_proxies_lock: + for p in proxies: + _known_proxies[p] = True def is_known_proxy(proxy): """Check if proxy is in known cache.""" - return proxy in _known_proxies + with _known_proxies_lock: + return proxy in _known_proxies def detect_proto_from_path(url): """Detect proxy protocol from URL path. diff --git a/http2.py b/http2.py index f1e311e..cfae04b 100644 --- a/http2.py +++ b/http2.py @@ -111,6 +111,15 @@ class RsHttp(): def get_last_rocksock_exception(self): return self.last_rs_exception + def disconnect(self): + """Safely close the underlying connection.""" + if hasattr(self, 'conn') and self.conn: + try: + self.conn.disconnect() + except: + pass + self.conn = None + def _err_log(self, s): if self.log_errors: sys.stderr.write(s + '\n') diff --git a/misc.py b/misc.py index 2e55364..891262e 100644 --- a/misc.py +++ b/misc.py @@ -28,6 +28,12 @@ FAIL_CLOSED = 'closed' FAIL_PROXY = 'proxy' FAIL_OTHER = 'other' +# SSL errors - proxy is actively intercepting (still working for MITM proxies) +SSL_ERRORS = frozenset({FAIL_SSL}) + +# Connection errors - proxy might be dead, need secondary verification +CONN_ERRORS = frozenset({FAIL_TIMEOUT, FAIL_REFUSED, FAIL_UNREACHABLE, FAIL_CLOSED, FAIL_DNS}) + # Levels that go to stderr STDERR_LEVELS = ('warn', 'error') diff --git a/ppf.py b/ppf.py index f866d05..2c9c3e3 100644 --- a/ppf.py +++ b/ppf.py @@ -11,7 +11,7 @@ from misc import _log from config import Config import fetch import sys -from soup_parser import soupify, set_nobs +from soup_parser import set_nobs import re import threading import random @@ -49,15 +49,17 @@ def format_duration(seconds): return '%dd %dh' % (d, h) if h else '%dd' % d -def import_from_file(fn, sqlite): - with open(fn, 'r') as f: - urls = [ url for url in f.read().split('\n') if url ] - cinc = 0 - while True: - chunk = urls[cinc:cinc+200] - if chunk: dbs.insert_urls(chunk, 'import.txt', urldb) - else: break - cinc = cinc + 200 +def import_from_file(fn, urldb): + """Import URLs from a text file into the database.""" + try: + with open(fn, 'r') as f: + urls = [url.strip() for url in f if url.strip()] + except IOError: + return # File not found, silently skip + for i in range(0, len(urls), 200): + chunk = urls[i:i+200] + if chunk: + dbs.insert_urls(chunk, 'import.txt', urldb) def get_content_type(url, proxy): @@ -74,38 +76,6 @@ def is_good_content_type(string): if ct.lower() in string.lower(): return True return False -def is_bad_url(uri, domain=None, samedomain=False): - # if uri needs to be from same domain and domains missmatch - if samedomain and str(uri.split('/')[2]).lower() != str(domain).lower(): - return True - for u in urignore: - if re.findall(u, uri): return True - return False - -def extract_urls(html, url): - mytime = int(time.time()) - proto = url.split(':')[0] - domain = url.split('/')[2] - urls = [] - - soup = soupify(html, nohtml=True) - - for a in soup.find_all('a', href=True): - item = a['href'].encode('utf-8') if isinstance(a['href'], unicode) else a['href'] - item = item.strip() - - if item.startswith('www.'): - item = 'http://%s' % item - elif not item.startswith('http'): - if not item.startswith('/'): item = '/%s' % item - item = '%s://%s%s' % (proto,domain,item) - - elif is_bad_url(item, domain=domain, samedomain=config.ppf.extract_samedomain): - continue - if not item in urls: urls.append(item) - - if urls: dbs.insert_urls(urls, url, urldb) #insert_if_not_exists(urls) - def import_proxies_from_file(proxydb, fn): content = open(fn, 'r').read() # Detect protocol from filename (e.g., socks5.txt, http-proxies.txt) @@ -142,84 +112,97 @@ class Leechered(threading.Thread): def run(self): self.status = 'nok' - if not self.content_type: self.content_type = get_content_type(self.url, self.proxy) + try: + if not self.content_type: self.content_type = get_content_type(self.url, self.proxy) - if is_good_content_type(self.content_type): - try: - content = fetch.fetch_contents(self.url, proxy=self.proxy) - except KeyboardInterrupt as e: - raise e - except Exception as e: + if is_good_content_type(self.content_type): try: - err_msg = repr(e) - if isinstance(err_msg, unicode): - err_msg = err_msg.encode('ascii', 'backslashreplace') - except: - err_msg = type(e).__name__ - _log('%s: fetch error: %s' % (self.url.split('/')[2], err_msg), 'error') + content = fetch.fetch_contents(self.url, proxy=self.proxy) + except KeyboardInterrupt as e: + raise e + except Exception as e: + try: + err_msg = repr(e) + if isinstance(err_msg, unicode): + err_msg = err_msg.encode('ascii', 'backslashreplace') + except: + err_msg = type(e).__name__ + _log('%s: fetch error: %s' % (self.url.split('/')[2], err_msg), 'error') + content = '' + else: content = '' - else: - content = '' - # Detect protocol from source URL (e.g., .../socks5/list.txt) - proto = fetch.detect_proto_from_path(self.url) - unique = fetch.extract_proxies(content, filter_known=False, proto=proto) + # Detect protocol from source URL (e.g., .../socks5/list.txt) + proto = fetch.detect_proto_from_path(self.url) + unique = fetch.extract_proxies(content, filter_known=False, proto=proto) - # Compute hash of all extracted proxies for change detection - self.new_hash = dbs.compute_proxy_list_hash(unique) + # Compute hash of all extracted proxies for change detection + self.new_hash = dbs.compute_proxy_list_hash(unique) - # Check if content unchanged (same proxies as last time) - if self.new_hash and self.content_hash and self.new_hash == self.content_hash: - self.hash_unchanged = True - self.proxylist = [] - self.stale_count += 1 - next_check = config.ppf.checktime + (self.error + self.stale_count) * config.ppf.perfail_checktime - _log('%s: unchanged (hash match), next in %s' % (self.url.split('/')[2], format_duration(next_check)), 'stale') - # Content unchanged - increment stale_count, update check_time - self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url) + # Check if content unchanged (same proxies as last time) + if self.new_hash and self.content_hash and self.new_hash == self.content_hash: + self.hash_unchanged = True + self.proxylist = [] + self.stale_count += 1 + next_check = config.ppf.checktime + (self.error + self.stale_count) * config.ppf.perfail_checktime + _log('%s: unchanged (hash match), next in %s' % (self.url.split('/')[2], format_duration(next_check)), 'stale') + # Content unchanged - increment stale_count, update check_time + self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url) + self.status = 'ok' + return + + # Content changed or first fetch - reset stale_count, proceed with normal processing + self.stale_count = 0 + # unique is list of (address, proto) tuples; filter by address, keep tuple + self.proxylist = [(addr, pr) for addr, pr in unique if not fetch.is_known_proxy(addr)] + proxy_count = len(self.proxylist) + + if self.retrievals == 0: # new site + if content and not self.proxylist: # site works but has zero proxy addresses + self.error += 1 + self.stale_count += 1 + elif proxy_count: + self.error = 0 + self.stale_count = 0 + else: + self.error += 2 + self.stale_count += 2 + else: # not a new site + # proxylist is empty + if not proxy_count: + self.stale_count += 1 + # proxylist is not empty: site is working + else: + self.stale_count = 0 + self.error = 0 + # site has no content + if not content: + self.error += 1 + self.stale_count += 1 + # site has proxies + if proxy_count: + self.error = 0 + self.stale_count = 0 + + self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url) self.status = 'ok' - return - # Content changed or first fetch - reset stale_count, proceed with normal processing - self.stale_count = 0 - # unique is list of (address, proto) tuples; filter by address, keep tuple - self.proxylist = [(addr, pr) for addr, pr in unique if not fetch.is_known_proxy(addr)] - proxy_count = len(self.proxylist) - - if self.retrievals == 0: # new site - if content and not self.proxylist: # site works but has zero proxy addresses - self.error += 1 - self.stale_count += 1 - elif proxy_count: - self.error = 0 - self.stale_count = 0 - else: - self.error += 2 - self.stale_count += 2 - else: # not a new site - # proxylist is empty - if not proxy_count: - self.stale_count += 1 - # proxylist is not empty: site is working - else: - self.stale_count = 0 - self.error = 0 - # site has no content - if not content: - self.error += 1 - self.stale_count += 1 - #else: - # self.retrievals += 1 - # self.error = 0 - # self.stale_count = 0 - # site has proxies - if proxy_count: - self.error = 0 - self.stale_count = 0 - extract_urls(content, self.url) - - self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url) - self.status = 'ok' + except KeyboardInterrupt: + raise + except Exception as e: + try: + host = self.url.split('/')[2] if '/' in self.url else self.url + err_msg = repr(e) + if isinstance(err_msg, unicode): + err_msg = err_msg.encode('ascii', 'backslashreplace') + except: + host = 'unknown' + err_msg = type(e).__name__ + _log('%s: thread error: %s' % (host, err_msg), 'error') + # Set error state so site gets retried later + self.error += 1 + self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url) + self.status = 'nok' def main(): @@ -247,12 +230,15 @@ def main(): else: watcherd = None - # start scraper if enabled - scraperd = None + # start scraper threads if enabled + scrapers = [] if config.scraper.enabled: import scraper - scraperd = scraper.Scraper(config) - scraperd.start() + for i in range(config.scraper.threads): + s = scraper.Scraper(config) + s.start() + scrapers.append(s) + _log('started %d scraper thread(s)' % len(scrapers), 'info') qurl = 'SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?) = self.backoff_until[ident]: - available.append((eng, ident)) + with self._lock: + for eng, ident in self.engines: + if ident not in self.backoff_until or now >= self.backoff_until[ident]: + available.append((eng, ident)) return available def mark_success(self, ident): """Reset failure count on success.""" - self.failures[ident] = 0 - self.success_count[ident] = self.success_count.get(ident, 0) + 1 - if ident in self.backoff_until: - del self.backoff_until[ident] + with self._lock: + self.failures[ident] = 0 + self.success_count[ident] = self.success_count.get(ident, 0) + 1 + if ident in self.backoff_until: + del self.backoff_until[ident] self.save_state() def mark_failure(self, ident): """Increment failure count and set exponential backoff.""" - count = self.failures.get(ident, 0) + 1 - self.failures[ident] = count - delay = min(self.base_delay * (2 ** (count - 1)), self.max_delay) - self.backoff_until[ident] = time.time() + delay - now = time.time() - if (now - self.last_rate_log) >= self.log_interval: - name = ident.split('/')[2] if '/' in ident else ident - avail, in_backoff, total = self.get_status() - _log('%d/%d engines in backoff (last: %s)' % (in_backoff, total, name), 'rate') - self.last_rate_log = now + with self._lock: + count = self.failures.get(ident, 0) + 1 + self.failures[ident] = count + delay = min(self.base_delay * (2 ** (count - 1)), self.max_delay) + self.backoff_until[ident] = time.time() + delay + now = time.time() + if (now - self.last_rate_log) >= self.log_interval: + name = ident.split('/')[2] if '/' in ident else ident + avail, in_backoff, total = self.get_status() + _log('%d/%d engines in backoff (last: %s)' % (in_backoff, total, name), 'rate') + self.last_rate_log = now self.save_state() return delay @@ -107,28 +111,31 @@ class EngineTracker(object): def get_stats(self): """Return detailed stats for API/dashboard.""" now = time.time() - available = self.get_available() - available_ids = set(ident for _, ident in available) + with self._lock: + available = self.get_available() + available_ids = set(ident for _, ident in available) - engines_list = [] - for eng, ident in self.engines: - # Shorten identifier for display - if '/' in ident: - name = ident.split('/')[2] # extract domain from URL - else: - name = ident + engines_list = [] + for eng, ident in self.engines: + # Shorten identifier for display + if '/' in ident: + name = ident.split('/')[2] # extract domain from URL + else: + name = ident - backoff_remaining = 0 - if ident in self.backoff_until: - backoff_remaining = max(0, int(self.backoff_until[ident] - now)) + backoff_remaining = 0 + if ident in self.backoff_until: + backoff_remaining = max(0, int(self.backoff_until[ident] - now)) - engines_list.append({ - 'name': name, - 'available': ident in available_ids, - 'successes': self.success_count.get(ident, 0), - 'failures': self.failures.get(ident, 0), - 'backoff_remaining': backoff_remaining - }) + engines_list.append({ + 'name': name, + 'available': ident in available_ids, + 'successes': self.success_count.get(ident, 0), + 'failures': self.failures.get(ident, 0), + 'backoff_remaining': backoff_remaining + }) + + total_successes = sum(self.success_count.values()) # Sort by success count descending engines_list.sort(key=lambda x: -x['successes']) @@ -137,7 +144,7 @@ class EngineTracker(object): 'available': len(available), 'in_backoff': len(self.engines) - len(available), 'total': len(self.engines), - 'total_successes': sum(self.success_count.values()), + 'total_successes': total_successes, 'engines': engines_list[:20] # Top 20 engines } @@ -184,25 +191,32 @@ class EngineTracker(object): if not force and (now - self._last_save) < self._save_interval: return + with self._lock: + try: + # Ensure directory exists + state_dir = os.path.dirname(self.state_file) + if state_dir and not os.path.exists(state_dir): + os.makedirs(state_dir) + + # Copy dicts under lock for thread-safe serialization + data = { + 'failures': dict(self.failures), + 'backoff_until': dict(self.backoff_until), + 'success_count': dict(self.success_count), + 'saved_at': now + } + self._last_save = now + + except (IOError, OSError) as e: + _log('failed to save scraper state: %s' % str(e), 'warn') + return + + # File I/O outside lock to minimize lock hold time try: - # Ensure directory exists - state_dir = os.path.dirname(self.state_file) - if state_dir and not os.path.exists(state_dir): - os.makedirs(state_dir) - - data = { - 'failures': self.failures, - 'backoff_until': self.backoff_until, - 'success_count': self.success_count, - 'saved_at': now - } - - # Atomic write tmp_file = self.state_file + '.tmp' with open(tmp_file, 'w') as f: json.dump(data, f, indent=2) os.rename(tmp_file, self.state_file) - self._last_save = now except (IOError, OSError) as e: _log('failed to save scraper state: %s' % str(e), 'warn')