refactor core modules, integrate network stats

This commit is contained in:
Username
2025-12-25 11:13:20 +01:00
parent 2201515b10
commit 269fed55ff
8 changed files with 270 additions and 219 deletions

View File

@@ -6,6 +6,8 @@ class Config(ComboParser):
def load(self): def load(self):
super(Config, self).load() super(Config, self).load()
self.torhosts = [ str(i).strip() for i in self.common.tor_hosts.split(',') ] self.torhosts = [ str(i).strip() for i in self.common.tor_hosts.split(',') ]
# threads config = per-host value, multiply by Tor host count
self.watchd.threads = self.watchd.threads * len(self.torhosts)
#with open('servers.txt', 'r') as handle: #with open('servers.txt', 'r') as handle:
with open(self.watchd.source_file, 'r') as handle: with open(self.watchd.source_file, 'r') as handle:
self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0] self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0]
@@ -39,6 +41,8 @@ class Config(ComboParser):
errors.append('watchd.threads must be >= 1') errors.append('watchd.threads must be >= 1')
if self.ppf.threads < 1: if self.ppf.threads < 1:
errors.append('ppf.threads must be >= 1') errors.append('ppf.threads must be >= 1')
if self.scraper.threads < 1:
errors.append('scraper.threads must be >= 1')
# Validate max_fail # Validate max_fail
if self.watchd.max_fail < 1: if self.watchd.max_fail < 1:
@@ -47,7 +51,7 @@ class Config(ComboParser):
errors.append('ppf.max_fail must be >= 1') errors.append('ppf.max_fail must be >= 1')
# Validate checktype # Validate checktype
valid_checktypes = {'irc', 'http', 'judges', 'ssl'} valid_checktypes = {'irc', 'head', 'judges', 'ssl'}
if self.watchd.checktype not in valid_checktypes: if self.watchd.checktype not in valid_checktypes:
errors.append('watchd.checktype must be one of: %s' % ', '.join(sorted(valid_checktypes))) errors.append('watchd.checktype must be one of: %s' % ', '.join(sorted(valid_checktypes)))
@@ -81,6 +85,7 @@ class Config(ComboParser):
self.add_item(section, 'tor_hosts', str, '127.0.0.1:9050', 'comma-separated list of tor proxy address(es)', True) self.add_item(section, 'tor_hosts', str, '127.0.0.1:9050', 'comma-separated list of tor proxy address(es)', True)
self.add_item(section, 'timeout_connect', int, 10, 'connection timeout in seconds (default: 10)', False) self.add_item(section, 'timeout_connect', int, 10, 'connection timeout in seconds (default: 10)', False)
self.add_item(section, 'timeout_read', int, 15, 'read timeout in seconds (default: 15)', False) self.add_item(section, 'timeout_read', int, 15, 'read timeout in seconds (default: 15)', False)
self.add_item(section, 'profiling', bool, False, 'enable cProfile profiling (default: False)', False)
section = 'watchd' section = 'watchd'
self.add_item(section, 'outage_threshold', float, 4.0, 'mininum success percentage required to not drop check results', False) self.add_item(section, 'outage_threshold', float, 4.0, 'mininum success percentage required to not drop check results', False)
@@ -90,7 +95,7 @@ class Config(ComboParser):
self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False) self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False)
self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False) self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False)
self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False) self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False)
self.add_item(section, 'use_ssl', int, 0, 'whether to use SSL and port 6697 to connect to targets (slower)', False) self.add_item(section, 'use_ssl', int, 1, 'whether to use SSL (1=always, 0=never, 2=random)', False)
self.add_item(section, 'checktime', int, 1800, 'base checking interval for proxies in db in seconds', False) self.add_item(section, 'checktime', int, 1800, 'base checking interval for proxies in db in seconds', False)
self.add_item(section, 'perfail_checktime', int, 3600, 'additional checking interval for proxies in db in seconds per experienced failure', False) self.add_item(section, 'perfail_checktime', int, 3600, 'additional checking interval for proxies in db in seconds per experienced failure', False)
self.add_item(section, 'database', str, 'proxies.sqlite', 'filename of database', True) self.add_item(section, 'database', str, 'proxies.sqlite', 'filename of database', True)
@@ -101,7 +106,7 @@ class Config(ComboParser):
self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False) self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False)
self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False) self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False)
self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False) self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False)
self.add_item(section, 'checktype', str, 'ssl', 'check type: irc, http, judges, or ssl', False) self.add_item(section, 'checktype', str, 'ssl', 'check type: irc, head, judges, or ssl', False)
section = 'httpd' section = 'httpd'
self.add_item(section, 'listenip', str, '127.0.0.1', 'address for the httpd to listen to (default: 127.0.0.1)', True) self.add_item(section, 'listenip', str, '127.0.0.1', 'address for the httpd to listen to (default: 127.0.0.1)', True)
@@ -122,6 +127,7 @@ class Config(ComboParser):
section = 'scraper' section = 'scraper'
self.add_item(section, 'enabled', bool, True, 'enable search engine scraper (default: True)', False) self.add_item(section, 'enabled', bool, True, 'enable search engine scraper (default: True)', False)
self.add_item(section, 'threads', int, 3, 'number of scraper threads (default: 3)', False)
self.add_item(section, 'debug', bool, False, 'scraper: whether to print additional debug info', False) self.add_item(section, 'debug', bool, False, 'scraper: whether to print additional debug info', False)
self.add_item(section, 'query', str, 'psw', 'build query using Proxies, Search, Websites', False) self.add_item(section, 'query', str, 'psw', 'build query using Proxies, Search, Websites', False)
self.add_item(section, 'backoff_base', int, 30, 'base backoff delay in seconds (default: 30)', False) self.add_item(section, 'backoff_base', int, 30, 'base backoff delay in seconds (default: 30)', False)

17
dbs.py
View File

@@ -46,6 +46,16 @@ def _migrate_content_hash_column(sqlite):
sqlite.commit() sqlite.commit()
def _migrate_geolocation_columns(sqlite):
"""Add latitude/longitude columns for precise proxy mapping."""
try:
sqlite.execute('SELECT latitude FROM proxylist LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE proxylist ADD COLUMN latitude REAL')
sqlite.execute('ALTER TABLE proxylist ADD COLUMN longitude REAL')
sqlite.commit()
def compute_proxy_list_hash(proxies): def compute_proxy_list_hash(proxies):
"""Compute MD5 hash of sorted proxy list for change detection. """Compute MD5 hash of sorted proxy list for change detection.
@@ -267,11 +277,14 @@ def create_table_if_not_exists(sqlite, dbname):
latency_samples INT DEFAULT 0, latency_samples INT DEFAULT 0,
anonymity TEXT, anonymity TEXT,
exit_ip TEXT, exit_ip TEXT,
asn INT)""") asn INT,
latitude REAL,
longitude REAL)""")
# Migration: add columns to existing databases (must run before creating indexes) # Migration: add columns to existing databases (must run before creating indexes)
_migrate_latency_columns(sqlite) _migrate_latency_columns(sqlite)
_migrate_anonymity_columns(sqlite) _migrate_anonymity_columns(sqlite)
_migrate_asn_column(sqlite) _migrate_asn_column(sqlite)
_migrate_geolocation_columns(sqlite)
# Indexes for common query patterns # Indexes for common query patterns
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)') sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)')
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)') sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)')
@@ -360,7 +373,7 @@ def insert_proxies(proxydb, proxies, url):
else: else:
addr, proto = p, None addr, proto = p, None
ip, port = addr.split(':') ip, port = addr.split(':')
rows.append((timestamp, addr, ip, port, proto, 3, 0, 0, 0, 0, 0)) rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0))
proxydb.executemany( proxydb.executemany(
'INSERT OR IGNORE INTO proxylist ' 'INSERT OR IGNORE INTO proxylist '
'(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success) ' '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success) '

102
fetch.py
View File

@@ -1,5 +1,7 @@
import re, random, time import re, random, time
import threading
import rocksock import rocksock
import network_stats
from http2 import RsHttp, _parse_url from http2 import RsHttp, _parse_url
from soup_parser import soupify from soup_parser import soupify
from misc import _log from misc import _log
@@ -43,6 +45,7 @@ def fetch_contents(url, head=False, proxy=None):
retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded') retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded')
def _fetch_contents(url, head = False, proxy=None): def _fetch_contents(url, head = False, proxy=None):
network_stats.set_category('scraper')
host, port, ssl, uri = _parse_url(url) host, port, ssl, uri = _parse_url(url)
headers=[ headers=[
'Accept-Language: en-US,en;q=0.8', 'Accept-Language: en-US,en;q=0.8',
@@ -52,47 +55,54 @@ def _fetch_contents(url, head = False, proxy=None):
_log("connecting to %s... (header: %s)" % (url, str(head)), "debug") _log("connecting to %s... (header: %s)" % (url, str(head)), "debug")
tor_retries = 0 tor_retries = 0
max_tor_retries = 1 max_tor_retries = 1
while True: http = None
proxies = [rocksock.RocksockProxyFromURL('socks4://%s' % random.choice( config.torhosts ))] try:
if proxy: proxies.append( rocksock.RocksockProxyFromURL(proxy)) while True:
proxies = [rocksock.RocksockProxyFromURL('socks4://%s' % random.choice( config.torhosts ))]
if proxy: proxies.append( rocksock.RocksockProxyFromURL(proxy))
http = RsHttp(host,ssl=ssl,port=port, keep_alive=True, timeout=config.ppf.timeout, max_tries=config.ppf.http_retries, follow_redirects=True, auto_set_cookies=True, proxies=proxies, user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0', log_errors=False) http = RsHttp(host,ssl=ssl,port=port, keep_alive=True, timeout=config.ppf.timeout, max_tries=config.ppf.http_retries, follow_redirects=True, auto_set_cookies=True, proxies=proxies, user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0', log_errors=False)
if not http.connect(): if not http.connect():
global _last_fail_log global _last_fail_log
now = time.time() now = time.time()
if (now - _last_fail_log) >= _fail_log_interval: if (now - _last_fail_log) >= _fail_log_interval:
_log("failed to connect to %s"%url, "ppf") _log("failed to connect to %s"%url, "ppf")
_last_fail_log = now _last_fail_log = now
e = http.get_last_rocksock_exception() e = http.get_last_rocksock_exception()
if not e: if not e:
return None
et = e.get_errortype()
ee = e.get_error()
ef = e.get_failedproxy()
if et == rocksock.RS_ET_OWN and \
ee == rocksock.RS_E_TARGET_CONN_REFUSED \
and ef == 0:
tor_retries += 1
if tor_retries >= max_tor_retries:
_log("tor proxy failed after %d retries" % tor_retries, "error")
return None return None
_log("tor proxy retry %d/%d" % (tor_retries, max_tor_retries), "warn") et = e.get_errortype()
time.sleep(5) ee = e.get_error()
continue ef = e.get_failedproxy()
return None if et == rocksock.RS_ET_OWN and \
break ee == rocksock.RS_E_TARGET_CONN_REFUSED \
and ef == 0:
http.disconnect()
http = None
tor_retries += 1
if tor_retries >= max_tor_retries:
_log("tor proxy failed after %d retries" % tor_retries, "error")
return None
_log("tor proxy retry %d/%d" % (tor_retries, max_tor_retries), "warn")
time.sleep(5)
continue
return None
break
## only request header ## only request header
if head: if head:
hdr = http.head(uri, headers) hdr = http.head(uri, headers)
return hdr return hdr
hdr, res = http.get(uri, headers) hdr, res = http.get(uri, headers)
res = res.encode('utf-8') if isinstance(res, unicode) else res res = res.encode('utf-8') if isinstance(res, unicode) else res
for retry_message in retry_messages: for retry_message in retry_messages:
if retry_message in res: return None if retry_message in res: return None
return res return res
finally:
if http:
http.disconnect()
def valid_port(port): def valid_port(port):
"""Check if port number is valid (1-65535).""" """Check if port number is valid (1-65535)."""
@@ -176,25 +186,29 @@ def is_usable_proxy(proxy):
return False return False
_known_proxies = {} _known_proxies = {}
_known_proxies_lock = threading.Lock()
def init_known_proxies(proxydb): def init_known_proxies(proxydb):
"""Initialize known proxies cache from database.""" """Initialize known proxies cache from database."""
global _known_proxies global _known_proxies
if _known_proxies: with _known_proxies_lock:
return if _known_proxies:
known = proxydb.execute('SELECT proxy FROM proxylist').fetchall() return
for k in known: known = proxydb.execute('SELECT proxy FROM proxylist').fetchall()
_known_proxies[k[0]] = True for k in known:
_known_proxies[k[0]] = True
def add_known_proxies(proxies): def add_known_proxies(proxies):
"""Add proxies to known cache.""" """Add proxies to known cache."""
global _known_proxies global _known_proxies
for p in proxies: with _known_proxies_lock:
_known_proxies[p] = True for p in proxies:
_known_proxies[p] = True
def is_known_proxy(proxy): def is_known_proxy(proxy):
"""Check if proxy is in known cache.""" """Check if proxy is in known cache."""
return proxy in _known_proxies with _known_proxies_lock:
return proxy in _known_proxies
def detect_proto_from_path(url): def detect_proto_from_path(url):
"""Detect proxy protocol from URL path. """Detect proxy protocol from URL path.

View File

@@ -111,6 +111,15 @@ class RsHttp():
def get_last_rocksock_exception(self): def get_last_rocksock_exception(self):
return self.last_rs_exception return self.last_rs_exception
def disconnect(self):
    """Safely close the underlying connection.

    Best-effort teardown: ordinary errors raised by the connection's own
    disconnect() are ignored, and the ``conn`` attribute is reset to None
    afterwards. Does nothing when ``conn`` is absent or already falsy, so
    it is safe to call repeatedly.
    """
    conn = getattr(self, 'conn', None)
    if not conn:
        return
    try:
        conn.disconnect()
    except Exception:
        # Ignore teardown failures, but let KeyboardInterrupt/SystemExit
        # propagate instead of being swallowed by a bare except.
        pass
    finally:
        # Always drop the handle, even if an interrupt is propagating.
        self.conn = None
def _err_log(self, s): def _err_log(self, s):
if self.log_errors: if self.log_errors:
sys.stderr.write(s + '\n') sys.stderr.write(s + '\n')

View File

@@ -28,6 +28,12 @@ FAIL_CLOSED = 'closed'
FAIL_PROXY = 'proxy' FAIL_PROXY = 'proxy'
FAIL_OTHER = 'other' FAIL_OTHER = 'other'
# Failure-category groupings, kept as immutable frozensets for membership tests.
# SSL errors - proxy is actively intercepting (still working for MITM proxies)
SSL_ERRORS = frozenset({FAIL_SSL})
# Connection errors - proxy might be dead, need secondary verification
# (timeout, refused, unreachable, closed, and DNS failure categories)
CONN_ERRORS = frozenset({FAIL_TIMEOUT, FAIL_REFUSED, FAIL_UNREACHABLE, FAIL_CLOSED, FAIL_DNS})
# Levels that go to stderr # Levels that go to stderr
STDERR_LEVELS = ('warn', 'error') STDERR_LEVELS = ('warn', 'error')

224
ppf.py
View File

@@ -11,7 +11,7 @@ from misc import _log
from config import Config from config import Config
import fetch import fetch
import sys import sys
from soup_parser import soupify, set_nobs from soup_parser import set_nobs
import re import re
import threading import threading
import random import random
@@ -49,15 +49,17 @@ def format_duration(seconds):
return '%dd %dh' % (d, h) if h else '%dd' % d return '%dd %dh' % (d, h) if h else '%dd' % d
def import_from_file(fn, sqlite): def import_from_file(fn, urldb):
with open(fn, 'r') as f: """Import URLs from a text file into the database."""
urls = [ url for url in f.read().split('\n') if url ] try:
cinc = 0 with open(fn, 'r') as f:
while True: urls = [url.strip() for url in f if url.strip()]
chunk = urls[cinc:cinc+200] except IOError:
if chunk: dbs.insert_urls(chunk, 'import.txt', urldb) return # File not found, silently skip
else: break for i in range(0, len(urls), 200):
cinc = cinc + 200 chunk = urls[i:i+200]
if chunk:
dbs.insert_urls(chunk, 'import.txt', urldb)
def get_content_type(url, proxy): def get_content_type(url, proxy):
@@ -74,38 +76,6 @@ def is_good_content_type(string):
if ct.lower() in string.lower(): return True if ct.lower() in string.lower(): return True
return False return False
def is_bad_url(uri, domain=None, samedomain=False):
# if uri needs to be from same domain and domains missmatch
if samedomain and str(uri.split('/')[2]).lower() != str(domain).lower():
return True
for u in urignore:
if re.findall(u, uri): return True
return False
def extract_urls(html, url):
mytime = int(time.time())
proto = url.split(':')[0]
domain = url.split('/')[2]
urls = []
soup = soupify(html, nohtml=True)
for a in soup.find_all('a', href=True):
item = a['href'].encode('utf-8') if isinstance(a['href'], unicode) else a['href']
item = item.strip()
if item.startswith('www.'):
item = 'http://%s' % item
elif not item.startswith('http'):
if not item.startswith('/'): item = '/%s' % item
item = '%s://%s%s' % (proto,domain,item)
elif is_bad_url(item, domain=domain, samedomain=config.ppf.extract_samedomain):
continue
if not item in urls: urls.append(item)
if urls: dbs.insert_urls(urls, url, urldb) #insert_if_not_exists(urls)
def import_proxies_from_file(proxydb, fn): def import_proxies_from_file(proxydb, fn):
content = open(fn, 'r').read() content = open(fn, 'r').read()
# Detect protocol from filename (e.g., socks5.txt, http-proxies.txt) # Detect protocol from filename (e.g., socks5.txt, http-proxies.txt)
@@ -142,84 +112,97 @@ class Leechered(threading.Thread):
def run(self): def run(self):
self.status = 'nok' self.status = 'nok'
if not self.content_type: self.content_type = get_content_type(self.url, self.proxy) try:
if not self.content_type: self.content_type = get_content_type(self.url, self.proxy)
if is_good_content_type(self.content_type): if is_good_content_type(self.content_type):
try:
content = fetch.fetch_contents(self.url, proxy=self.proxy)
except KeyboardInterrupt as e:
raise e
except Exception as e:
try: try:
err_msg = repr(e) content = fetch.fetch_contents(self.url, proxy=self.proxy)
if isinstance(err_msg, unicode): except KeyboardInterrupt as e:
err_msg = err_msg.encode('ascii', 'backslashreplace') raise e
except: except Exception as e:
err_msg = type(e).__name__ try:
_log('%s: fetch error: %s' % (self.url.split('/')[2], err_msg), 'error') err_msg = repr(e)
if isinstance(err_msg, unicode):
err_msg = err_msg.encode('ascii', 'backslashreplace')
except:
err_msg = type(e).__name__
_log('%s: fetch error: %s' % (self.url.split('/')[2], err_msg), 'error')
content = ''
else:
content = '' content = ''
else:
content = ''
# Detect protocol from source URL (e.g., .../socks5/list.txt) # Detect protocol from source URL (e.g., .../socks5/list.txt)
proto = fetch.detect_proto_from_path(self.url) proto = fetch.detect_proto_from_path(self.url)
unique = fetch.extract_proxies(content, filter_known=False, proto=proto) unique = fetch.extract_proxies(content, filter_known=False, proto=proto)
# Compute hash of all extracted proxies for change detection # Compute hash of all extracted proxies for change detection
self.new_hash = dbs.compute_proxy_list_hash(unique) self.new_hash = dbs.compute_proxy_list_hash(unique)
# Check if content unchanged (same proxies as last time) # Check if content unchanged (same proxies as last time)
if self.new_hash and self.content_hash and self.new_hash == self.content_hash: if self.new_hash and self.content_hash and self.new_hash == self.content_hash:
self.hash_unchanged = True self.hash_unchanged = True
self.proxylist = [] self.proxylist = []
self.stale_count += 1 self.stale_count += 1
next_check = config.ppf.checktime + (self.error + self.stale_count) * config.ppf.perfail_checktime next_check = config.ppf.checktime + (self.error + self.stale_count) * config.ppf.perfail_checktime
_log('%s: unchanged (hash match), next in %s' % (self.url.split('/')[2], format_duration(next_check)), 'stale') _log('%s: unchanged (hash match), next in %s' % (self.url.split('/')[2], format_duration(next_check)), 'stale')
# Content unchanged - increment stale_count, update check_time # Content unchanged - increment stale_count, update check_time
self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url) self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url)
self.status = 'ok'
return
# Content changed or first fetch - reset stale_count, proceed with normal processing
self.stale_count = 0
# unique is list of (address, proto) tuples; filter by address, keep tuple
self.proxylist = [(addr, pr) for addr, pr in unique if not fetch.is_known_proxy(addr)]
proxy_count = len(self.proxylist)
if self.retrievals == 0: # new site
if content and not self.proxylist: # site works but has zero proxy addresses
self.error += 1
self.stale_count += 1
elif proxy_count:
self.error = 0
self.stale_count = 0
else:
self.error += 2
self.stale_count += 2
else: # not a new site
# proxylist is empty
if not proxy_count:
self.stale_count += 1
# proxylist is not empty: site is working
else:
self.stale_count = 0
self.error = 0
# site has no content
if not content:
self.error += 1
self.stale_count += 1
# site has proxies
if proxy_count:
self.error = 0
self.stale_count = 0
self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url)
self.status = 'ok' self.status = 'ok'
return
# Content changed or first fetch - reset stale_count, proceed with normal processing except KeyboardInterrupt:
self.stale_count = 0 raise
# unique is list of (address, proto) tuples; filter by address, keep tuple except Exception as e:
self.proxylist = [(addr, pr) for addr, pr in unique if not fetch.is_known_proxy(addr)] try:
proxy_count = len(self.proxylist) host = self.url.split('/')[2] if '/' in self.url else self.url
err_msg = repr(e)
if self.retrievals == 0: # new site if isinstance(err_msg, unicode):
if content and not self.proxylist: # site works but has zero proxy addresses err_msg = err_msg.encode('ascii', 'backslashreplace')
self.error += 1 except:
self.stale_count += 1 host = 'unknown'
elif proxy_count: err_msg = type(e).__name__
self.error = 0 _log('%s: thread error: %s' % (host, err_msg), 'error')
self.stale_count = 0 # Set error state so site gets retried later
else: self.error += 1
self.error += 2 self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url)
self.stale_count += 2 self.status = 'nok'
else: # not a new site
# proxylist is empty
if not proxy_count:
self.stale_count += 1
# proxylist is not empty: site is working
else:
self.stale_count = 0
self.error = 0
# site has no content
if not content:
self.error += 1
self.stale_count += 1
#else:
# self.retrievals += 1
# self.error = 0
# self.stale_count = 0
# site has proxies
if proxy_count:
self.error = 0
self.stale_count = 0
extract_urls(content, self.url)
self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url)
self.status = 'ok'
def main(): def main():
@@ -247,12 +230,15 @@ def main():
else: else:
watcherd = None watcherd = None
# start scraper if enabled # start scraper threads if enabled
scraperd = None scrapers = []
if config.scraper.enabled: if config.scraper.enabled:
import scraper import scraper
scraperd = scraper.Scraper(config) for i in range(config.scraper.threads):
scraperd.start() s = scraper.Scraper(config)
s.start()
scrapers.append(s)
_log('started %d scraper thread(s)' % len(scrapers), 'info')
qurl = 'SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?) <?) ORDER BY RANDOM()' qurl = 'SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?) <?) ORDER BY RANDOM()'
threads = [] threads = []
@@ -305,8 +291,8 @@ def main():
t.start() t.start()
except KeyboardInterrupt: except KeyboardInterrupt:
if scraperd: for s in scrapers:
scraperd.stop() s.stop()
if watcherd: if watcherd:
watcherd.stop() watcherd.stop()
watcherd.finish() watcherd.finish()
@@ -328,7 +314,7 @@ if __name__ == '__main__':
if config.args.nobs: if config.args.nobs:
set_nobs(True) set_nobs(True)
if config.args.profile: if config.args.profile or config.common.profiling:
_log('profiling enabled, output to data/profile.stats', 'info') _log('profiling enabled, output to data/profile.stats', 'info')
_profiler = cProfile.Profile() _profiler = cProfile.Profile()
try: try:

View File

@@ -19,6 +19,7 @@
# https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html # https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html
import socket, ssl, select, copy, errno import socket, ssl, select, copy, errno
import network_stats
# rs_proxyType # rs_proxyType
RS_PT_NONE = 0 RS_PT_NONE = 0
@@ -310,6 +311,7 @@ class Rocksock():
if self.sock is None: if self.sock is None:
raise(RocksockException(RS_E_NO_SOCKET, failedproxy=self._failed_proxy(pnum))) raise(RocksockException(RS_E_NO_SOCKET, failedproxy=self._failed_proxy(pnum)))
try: try:
network_stats.add_tx(len(buf))
return self.sock.sendall(buf) return self.sock.sendall(buf)
except socket.error as e: except socket.error as e:
raise(self._translate_socket_error(e, pnum)) raise(self._translate_socket_error(e, pnum))
@@ -340,6 +342,7 @@ class Rocksock():
raise(RocksockException(RS_E_SSL_GENERIC, failedproxy=s, errortype=RS_ET_SSL)) raise(RocksockException(RS_E_SSL_GENERIC, failedproxy=s, errortype=RS_ET_SSL))
if len(chunk) == 0: if len(chunk) == 0:
raise(RocksockException(RS_E_REMOTE_DISCONNECTED, failedproxy=self._failed_proxy(pnum))) raise(RocksockException(RS_E_REMOTE_DISCONNECTED, failedproxy=self._failed_proxy(pnum)))
network_stats.add_rx(len(chunk))
data += chunk data += chunk
if count == -1: break if count == -1: break
else: count -= len(chunk) else: count -= len(chunk)

View File

@@ -48,6 +48,7 @@ class EngineTracker(object):
self.state_file = state_file or STATE_FILE self.state_file = state_file or STATE_FILE
self._save_interval = 60 # seconds between saves self._save_interval = 60 # seconds between saves
self._last_save = 0 self._last_save = 0
self._lock = threading.RLock() # Reentrant lock for nested calls
# Build list of (engine_instance, identifier) # Build list of (engine_instance, identifier)
self.engines = [] self.engines = []
@@ -70,31 +71,34 @@ class EngineTracker(object):
"""Return engines not currently in backoff.""" """Return engines not currently in backoff."""
now = time.time() now = time.time()
available = [] available = []
for eng, ident in self.engines: with self._lock:
if ident not in self.backoff_until or now >= self.backoff_until[ident]: for eng, ident in self.engines:
available.append((eng, ident)) if ident not in self.backoff_until or now >= self.backoff_until[ident]:
available.append((eng, ident))
return available return available
def mark_success(self, ident): def mark_success(self, ident):
"""Reset failure count on success.""" """Reset failure count on success."""
self.failures[ident] = 0 with self._lock:
self.success_count[ident] = self.success_count.get(ident, 0) + 1 self.failures[ident] = 0
if ident in self.backoff_until: self.success_count[ident] = self.success_count.get(ident, 0) + 1
del self.backoff_until[ident] if ident in self.backoff_until:
del self.backoff_until[ident]
self.save_state() self.save_state()
def mark_failure(self, ident): def mark_failure(self, ident):
"""Increment failure count and set exponential backoff.""" """Increment failure count and set exponential backoff."""
count = self.failures.get(ident, 0) + 1 with self._lock:
self.failures[ident] = count count = self.failures.get(ident, 0) + 1
delay = min(self.base_delay * (2 ** (count - 1)), self.max_delay) self.failures[ident] = count
self.backoff_until[ident] = time.time() + delay delay = min(self.base_delay * (2 ** (count - 1)), self.max_delay)
now = time.time() self.backoff_until[ident] = time.time() + delay
if (now - self.last_rate_log) >= self.log_interval: now = time.time()
name = ident.split('/')[2] if '/' in ident else ident if (now - self.last_rate_log) >= self.log_interval:
avail, in_backoff, total = self.get_status() name = ident.split('/')[2] if '/' in ident else ident
_log('%d/%d engines in backoff (last: %s)' % (in_backoff, total, name), 'rate') avail, in_backoff, total = self.get_status()
self.last_rate_log = now _log('%d/%d engines in backoff (last: %s)' % (in_backoff, total, name), 'rate')
self.last_rate_log = now
self.save_state() self.save_state()
return delay return delay
@@ -107,28 +111,31 @@ class EngineTracker(object):
def get_stats(self): def get_stats(self):
"""Return detailed stats for API/dashboard.""" """Return detailed stats for API/dashboard."""
now = time.time() now = time.time()
available = self.get_available() with self._lock:
available_ids = set(ident for _, ident in available) available = self.get_available()
available_ids = set(ident for _, ident in available)
engines_list = [] engines_list = []
for eng, ident in self.engines: for eng, ident in self.engines:
# Shorten identifier for display # Shorten identifier for display
if '/' in ident: if '/' in ident:
name = ident.split('/')[2] # extract domain from URL name = ident.split('/')[2] # extract domain from URL
else: else:
name = ident name = ident
backoff_remaining = 0 backoff_remaining = 0
if ident in self.backoff_until: if ident in self.backoff_until:
backoff_remaining = max(0, int(self.backoff_until[ident] - now)) backoff_remaining = max(0, int(self.backoff_until[ident] - now))
engines_list.append({ engines_list.append({
'name': name, 'name': name,
'available': ident in available_ids, 'available': ident in available_ids,
'successes': self.success_count.get(ident, 0), 'successes': self.success_count.get(ident, 0),
'failures': self.failures.get(ident, 0), 'failures': self.failures.get(ident, 0),
'backoff_remaining': backoff_remaining 'backoff_remaining': backoff_remaining
}) })
total_successes = sum(self.success_count.values())
# Sort by success count descending # Sort by success count descending
engines_list.sort(key=lambda x: -x['successes']) engines_list.sort(key=lambda x: -x['successes'])
@@ -137,7 +144,7 @@ class EngineTracker(object):
'available': len(available), 'available': len(available),
'in_backoff': len(self.engines) - len(available), 'in_backoff': len(self.engines) - len(available),
'total': len(self.engines), 'total': len(self.engines),
'total_successes': sum(self.success_count.values()), 'total_successes': total_successes,
'engines': engines_list[:20] # Top 20 engines 'engines': engines_list[:20] # Top 20 engines
} }
@@ -184,25 +191,32 @@ class EngineTracker(object):
if not force and (now - self._last_save) < self._save_interval: if not force and (now - self._last_save) < self._save_interval:
return return
with self._lock:
try:
# Ensure directory exists
state_dir = os.path.dirname(self.state_file)
if state_dir and not os.path.exists(state_dir):
os.makedirs(state_dir)
# Copy dicts under lock for thread-safe serialization
data = {
'failures': dict(self.failures),
'backoff_until': dict(self.backoff_until),
'success_count': dict(self.success_count),
'saved_at': now
}
self._last_save = now
except (IOError, OSError) as e:
_log('failed to save scraper state: %s' % str(e), 'warn')
return
# File I/O outside lock to minimize lock hold time
try: try:
# Ensure directory exists
state_dir = os.path.dirname(self.state_file)
if state_dir and not os.path.exists(state_dir):
os.makedirs(state_dir)
data = {
'failures': self.failures,
'backoff_until': self.backoff_until,
'success_count': self.success_count,
'saved_at': now
}
# Atomic write
tmp_file = self.state_file + '.tmp' tmp_file = self.state_file + '.tmp'
with open(tmp_file, 'w') as f: with open(tmp_file, 'w') as f:
json.dump(data, f, indent=2) json.dump(data, f, indent=2)
os.rename(tmp_file, self.state_file) os.rename(tmp_file, self.state_file)
self._last_save = now
except (IOError, OSError) as e: except (IOError, OSError) as e:
_log('failed to save scraper state: %s' % str(e), 'warn') _log('failed to save scraper state: %s' % str(e), 'warn')