scraper: reuse connections, cycle circuit on block
This commit is contained in:
102
fetch.py
102
fetch.py
@@ -15,6 +15,108 @@ def tor_proxy_url(torhost):
|
||||
user = ''.join(random.choice(chars) for _ in range(8))
|
||||
passwd = ''.join(random.choice(chars) for _ in range(8))
|
||||
return 'socks5://%s:%s@%s' % (user, passwd, torhost)
|
||||
|
||||
|
||||
class FetchSession(object):
    """Reusable fetch session with a persistent Tor circuit.

    Maintains one HTTP connection and one set of Tor proxy credentials
    across multiple requests, so repeated fetches against the same
    host/port/ssl endpoint reuse a single socket.  Call cycle() to drop
    the connection and force a fresh Tor circuit when the remote side
    starts blocking us.
    """

    # Headers sent with every HEAD/GET request.
    _HEADERS = [
        'Accept-Language: en-US,en;q=0.8',
        'Cache-Control: max-age=0',
    ]

    def __init__(self):
        self.http = None           # live RsHttp connection, or None
        self.current_host = None   # endpoint of the live connection
        self.current_port = None
        self.current_ssl = None
        self.tor_url = None        # socks5 URL carrying per-circuit credentials
        self._new_circuit()

    def _new_circuit(self):
        """Generate new Tor credentials for a fresh circuit.

        Tor isolates circuits by SOCKS username/password, so the random
        credentials produced by tor_proxy_url() select a new circuit.
        NOTE(review): when config/config.torhosts is falsy this leaves
        self.tor_url unchanged (possibly still None), and fetch() would
        then hand None to RocksockProxyFromURL — confirm config always
        provides torhosts in deployment.
        """
        if config and config.torhosts:
            torhost = random.choice(config.torhosts)
            self.tor_url = tor_proxy_url(torhost)

    def cycle(self):
        """Cycle to a new Tor circuit (call when blocked)."""
        self.close()
        self._new_circuit()

    def close(self):
        """Close the current connection, swallowing disconnect errors."""
        if self.http:
            try:
                self.http.disconnect()
            except Exception:
                # Best effort: the socket may already be dead.
                pass
        self.http = None
        self.current_host = None

    def _request(self, uri, head):
        """Issue one HEAD or GET for uri on the live connection.

        Returns the raw HEAD response object, or the GET body encoded to
        UTF-8 bytes.  Propagates whatever the underlying RsHttp call
        raises; callers decide whether to reconnect.
        """
        if head:
            return self.http.head(uri, list(self._HEADERS))
        hdr, res = self.http.get(uri, list(self._HEADERS))
        return res.encode('utf-8') if isinstance(res, unicode) else res

    def fetch(self, url, head=False):
        """Fetch url, reusing the open connection when the endpoint matches.

        Returns the response (UTF-8 bytes for GET, header object for
        HEAD), or None when connecting or the request fails.
        """
        network_stats.set_category('scraper')
        host, port, ssl, uri = _parse_url(url)

        # Fast path: reuse the existing connection if it targets the same
        # host/port/ssl.  Any failure here means the connection died —
        # close it and fall through to the reconnect path below.
        if (self.http and self.current_host == host and
                self.current_port == port and self.current_ssl == ssl):
            try:
                return self._request(uri, head)
            except Exception:
                self.close()

        # Need a new connection.
        self.close()
        if not self.tor_url:
            self._new_circuit()

        proxies = [rocksock.RocksockProxyFromURL(self.tor_url)]
        self.http = RsHttp(
            host, ssl=ssl, port=port, keep_alive=True,
            timeout=config.ppf.timeout, max_tries=config.ppf.http_retries,
            follow_redirects=True, auto_set_cookies=True, proxies=proxies,
            user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0',
            log_errors=False
        )

        if not self.http.connect():
            self.close()
            return None

        self.current_host = host
        self.current_port = port
        self.current_ssl = ssl

        try:
            return self._request(uri, head)
        except Exception:
            self.close()
            return None
|
||||
|
||||
|
||||
# Rate-limiting state for failure logging; use site is outside this chunk.
# presumably epoch seconds of the last emitted failure log — confirm at use site
_last_fail_log = 0
# minimum interval (seconds, presumably) between failure log lines — confirm
_fail_log_interval = 60
|
||||
|
||||
|
||||
86
scraper.py
86
scraper.py
@@ -277,52 +277,64 @@ def scrape_engine(engine, ident, query, urignore, sqlite):
|
||||
consecutive_empty = 0
|
||||
total_urls = 0
|
||||
|
||||
for page in range(max_pages):
|
||||
try:
|
||||
url = engine.build_url(query, page)
|
||||
# Use session for connection reuse within engine
|
||||
session = fetch.FetchSession()
|
||||
|
||||
if config.scraper.debug:
|
||||
_log('%s page %d: %s' % (engine.name, page, url), 'debug')
|
||||
try:
|
||||
for page in range(max_pages):
|
||||
try:
|
||||
url = engine.build_url(query, page)
|
||||
|
||||
content = fetch.fetch_contents(url)
|
||||
if config.scraper.debug:
|
||||
_log('%s page %d: %s' % (engine.name, page, url), 'debug')
|
||||
|
||||
# Check for rate limiting
|
||||
if engine.is_rate_limited(content):
|
||||
engine_tracker.mark_failure(ident)
|
||||
return total_urls
|
||||
content = session.fetch(url)
|
||||
|
||||
if not content:
|
||||
consecutive_empty += 1
|
||||
if consecutive_empty >= config.scraper.fail_threshold:
|
||||
# Check for rate limiting
|
||||
if engine.is_rate_limited(content):
|
||||
engine_tracker.mark_failure(ident)
|
||||
# Cycle to new circuit for next attempt
|
||||
session.cycle()
|
||||
return total_urls
|
||||
continue
|
||||
|
||||
# Extract URLs
|
||||
urls = engine.extract_urls(content, urignore)
|
||||
if not content:
|
||||
consecutive_empty += 1
|
||||
if consecutive_empty >= config.scraper.fail_threshold:
|
||||
engine_tracker.mark_failure(ident)
|
||||
# Cycle to new circuit for next attempt
|
||||
session.cycle()
|
||||
return total_urls
|
||||
continue
|
||||
|
||||
if not urls:
|
||||
# Empty results on first page likely means rate limited
|
||||
if page == 0:
|
||||
engine_tracker.mark_failure(ident)
|
||||
# Extract URLs
|
||||
urls = engine.extract_urls(content, urignore)
|
||||
|
||||
if not urls:
|
||||
# Empty results on first page likely means rate limited
|
||||
if page == 0:
|
||||
engine_tracker.mark_failure(ident)
|
||||
session.cycle()
|
||||
return total_urls
|
||||
|
||||
# Success
|
||||
engine_tracker.mark_success(ident)
|
||||
consecutive_empty = 0
|
||||
|
||||
# Deduplicate and insert
|
||||
urls = list(set(urls))
|
||||
source = '%s (page %d, query: %s)' % (engine.name, page, query[:50])
|
||||
new_count = dbs.insert_urls(urls, source, sqlite)
|
||||
total_urls += new_count
|
||||
|
||||
# Small delay between pages
|
||||
time.sleep(random.uniform(1.0, 3.0))
|
||||
|
||||
except Exception as e:
|
||||
engine_tracker.mark_failure(ident)
|
||||
session.cycle()
|
||||
return total_urls
|
||||
|
||||
# Success
|
||||
engine_tracker.mark_success(ident)
|
||||
consecutive_empty = 0
|
||||
|
||||
# Deduplicate and insert
|
||||
urls = list(set(urls))
|
||||
source = '%s (page %d, query: %s)' % (engine.name, page, query[:50])
|
||||
new_count = dbs.insert_urls(urls, source, sqlite)
|
||||
total_urls += new_count
|
||||
|
||||
# Small delay between pages
|
||||
time.sleep(random.uniform(1.0, 3.0))
|
||||
|
||||
except Exception as e:
|
||||
engine_tracker.mark_failure(ident)
|
||||
return total_urls
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
return total_urls
|
||||
|
||||
|
||||
Reference in New Issue
Block a user