diff --git a/fetch.py b/fetch.py
index 9f2674c..891481f 100644
--- a/fetch.py
+++ b/fetch.py
@@ -15,6 +15,109 @@ def tor_proxy_url(torhost):
     user = ''.join(random.choice(chars) for _ in range(8))
     passwd = ''.join(random.choice(chars) for _ in range(8))
     return 'socks5://%s:%s@%s' % (user, passwd, torhost)
+
+
+class FetchSession(object):
+    """Reusable fetch session with persistent Tor circuit.
+
+    Maintains HTTP connection and Tor credentials across multiple requests.
+    Call cycle() to get a new Tor circuit when blocked.
+    """
+
+    def __init__(self):
+        self.http = None
+        self.current_host = None
+        self.current_port = None
+        self.current_ssl = None
+        self.tor_url = None
+        self._new_circuit()
+
+    def _new_circuit(self):
+        """Generate new Tor credentials for a fresh circuit."""
+        if config and config.torhosts:
+            torhost = random.choice(config.torhosts)
+            self.tor_url = tor_proxy_url(torhost)
+
+    def cycle(self):
+        """Cycle to a new Tor circuit (call when blocked)."""
+        self.close()
+        self._new_circuit()
+
+    def close(self):
+        """Close current connection."""
+        if self.http:
+            try:
+                self.http.disconnect()
+            except Exception:
+                pass
+        self.http = None
+        self.current_host = None
+
+    def fetch(self, url, head=False):
+        """Fetch URL, reusing connection if possible."""
+        network_stats.set_category('scraper')
+        host, port, ssl, uri = _parse_url(url)
+
+        # Check if we can reuse existing connection
+        if (self.http and self.current_host == host and
+                self.current_port == port and self.current_ssl == ssl):
+            # Reuse existing connection
+            try:
+                if head:
+                    return self.http.head(uri, [
+                        'Accept-Language: en-US,en;q=0.8',
+                        'Cache-Control: max-age=0',
+                    ])
+                hdr, res = self.http.get(uri, [
+                    'Accept-Language: en-US,en;q=0.8',
+                    'Cache-Control: max-age=0',
+                ])
+                res = res.encode('utf-8') if isinstance(res, unicode) else res
+                return res
+            except Exception:
+                # Connection died, close and reconnect
+                self.close()
+
+        # Need new connection
+        self.close()
+        if not self.tor_url:
+            self._new_circuit()
+
+        # tor_url stays None when no Tor hosts are configured; fall back to direct
+        proxies = [rocksock.RocksockProxyFromURL(self.tor_url)] if self.tor_url else None
+        self.http = RsHttp(
+            host, ssl=ssl, port=port, keep_alive=True,
+            timeout=config.ppf.timeout, max_tries=config.ppf.http_retries,
+            follow_redirects=True, auto_set_cookies=True, proxies=proxies,
+            user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0',
+            log_errors=False
+        )
+
+        if not self.http.connect():
+            self.close()
+            return None
+
+        self.current_host = host
+        self.current_port = port
+        self.current_ssl = ssl
+
+        try:
+            if head:
+                return self.http.head(uri, [
+                    'Accept-Language: en-US,en;q=0.8',
+                    'Cache-Control: max-age=0',
+                ])
+            hdr, res = self.http.get(uri, [
+                'Accept-Language: en-US,en;q=0.8',
+                'Cache-Control: max-age=0',
+            ])
+            res = res.encode('utf-8') if isinstance(res, unicode) else res
+            return res
+        except Exception:
+            self.close()
+            return None
+
+
 
 _last_fail_log = 0
 _fail_log_interval = 60
diff --git a/scraper.py b/scraper.py
index 9566ba5..1b75a5a 100755
--- a/scraper.py
+++ b/scraper.py
@@ -277,52 +277,64 @@ def scrape_engine(engine, ident, query, urignore, sqlite):
     consecutive_empty = 0
     total_urls = 0
 
-    for page in range(max_pages):
-        try:
-            url = engine.build_url(query, page)
-
-            if config.scraper.debug:
-                _log('%s page %d: %s' % (engine.name, page, url), 'debug')
-
-            content = fetch.fetch_contents(url)
-
-            # Check for rate limiting
-            if engine.is_rate_limited(content):
-                engine_tracker.mark_failure(ident)
-                return total_urls
-
-            if not content:
-                consecutive_empty += 1
-                if consecutive_empty >= config.scraper.fail_threshold:
-                    engine_tracker.mark_failure(ident)
-                    return total_urls
-                continue
-
-            # Extract URLs
-            urls = engine.extract_urls(content, urignore)
-
-            if not urls:
-                # Empty results on first page likely means rate limited
-                if page == 0:
-                    engine_tracker.mark_failure(ident)
-                    return total_urls
-
-            # Success
-            engine_tracker.mark_success(ident)
-            consecutive_empty = 0
-
-            # Deduplicate and insert
-            urls = list(set(urls))
-            source = '%s (page %d, query: %s)' % (engine.name, page, query[:50])
-            new_count = dbs.insert_urls(urls, source, sqlite)
-            total_urls += new_count
-
-            # Small delay between pages
-            time.sleep(random.uniform(1.0, 3.0))
-
-        except Exception as e:
-            engine_tracker.mark_failure(ident)
-            return total_urls
+    # Use session for connection reuse within engine
+    session = fetch.FetchSession()
+
+    try:
+        for page in range(max_pages):
+            try:
+                url = engine.build_url(query, page)
+
+                if config.scraper.debug:
+                    _log('%s page %d: %s' % (engine.name, page, url), 'debug')
+
+                content = session.fetch(url)
+
+                # Check for rate limiting
+                if engine.is_rate_limited(content):
+                    engine_tracker.mark_failure(ident)
+                    # Cycle to new circuit for next attempt
+                    session.cycle()
+                    return total_urls
+
+                if not content:
+                    consecutive_empty += 1
+                    if consecutive_empty >= config.scraper.fail_threshold:
+                        engine_tracker.mark_failure(ident)
+                        # Cycle to new circuit for next attempt
+                        session.cycle()
+                        return total_urls
+                    continue
+
+                # Extract URLs
+                urls = engine.extract_urls(content, urignore)
+
+                if not urls:
+                    # Empty results on first page likely means rate limited
+                    if page == 0:
+                        engine_tracker.mark_failure(ident)
+                        session.cycle()
+                        return total_urls
+
+                # Success
+                engine_tracker.mark_success(ident)
+                consecutive_empty = 0
+
+                # Deduplicate and insert
+                urls = list(set(urls))
+                source = '%s (page %d, query: %s)' % (engine.name, page, query[:50])
+                new_count = dbs.insert_urls(urls, source, sqlite)
+                total_urls += new_count
+
+                # Small delay between pages
+                time.sleep(random.uniform(1.0, 3.0))
+
+            except Exception as e:
+                engine_tracker.mark_failure(ident)
+                session.cycle()
+                return total_urls
+    finally:
+        session.close()
 
     return total_urls
 