Seed sources that error out are permanently excluded from claiming. Over time this starves the pipeline. Re-seed every 6 hours with error reset for exhausted sources, preventing the starvation loop that caused the previous outage.
1153 lines
44 KiB
Python
#!/usr/bin/env python2
|
|
|
|
__version__ = '2.0.0'
|
|
|
|
import sys
|
|
import os
|
|
|
|
# Gevent monkey-patch MUST happen before any other imports
|
|
# Both master (httpd) and worker modes use gevent for async I/O
|
|
from gevent import monkey
|
|
monkey.patch_all()
|
|
|
|
import cProfile
|
|
import pstats
|
|
import signal
|
|
import dbs
|
|
import time
|
|
import mysqlite
|
|
from misc import _log
|
|
from config import Config
|
|
import fetch
|
|
from soup_parser import set_nobs
|
|
import threading
|
|
import random
|
|
import json
|
|
|
|
# Global profiler for signal handler access
|
|
_profiler = None
|
|
|
|
# Handle SIGTERM gracefully (for container stop)
|
|
def sigterm_handler(signum, frame):
    """Translate SIGTERM into KeyboardInterrupt so shutdown paths run.

    If profiling is active, flush the profiler's stats to disk first so a
    container stop does not lose the collected profile.
    """
    global _profiler
    profiler = _profiler
    if profiler:
        profiler.disable()
        profiler.dump_stats('data/profile.stats')
        _log('profile stats written to data/profile.stats (SIGTERM)', 'info')
    raise KeyboardInterrupt
|
|
|
|
signal.signal(signal.SIGTERM, sigterm_handler)
|
|
|
|
config = Config()
|
|
|
|
|
|
def reset_state():
    """Clear all proxy and session state for a fresh start.

    Clears:
    - proxylist table (all proxies)
    - session_state table (test counters)
    - stats_history table (historical stats)
    - scraper_state.json (engine backoffs)
    - mitm_certs.json (MITM cert tracking)

    Returns True on success, False when the database reset failed.
    """
    db_path = config.watchd.database
    data_dir = os.path.dirname(db_path) or 'data'

    # Wipe the database tables first; any failure here aborts the reset.
    try:
        db = mysqlite.mysqlite(db_path, str)
        dbs.create_table_if_not_exists(db, 'proxylist')

        db.execute('DELETE FROM proxylist')
        db.commit()
        _log('cleared proxylist table', 'reset')

        # Optional tables -- tolerate their absence in older databases.
        for table in ('session_state', 'stats_history'):
            try:
                db.execute('DELETE FROM %s' % table)
                db.commit()
                _log('cleared %s table' % table, 'reset')
            except Exception:
                pass

        # Reclaim the freed pages before closing.
        db.execute('VACUUM')
        db.commit()
        db.close()
    except Exception as e:
        _log('database reset failed: %s' % e, 'error')
        return False

    # Remove on-disk state files; individual failures are logged but non-fatal.
    for fname in ('scraper_state.json', 'mitm_certs.json'):
        path = os.path.join(data_dir, fname)
        if os.path.exists(path):
            try:
                os.remove(path)
                _log('removed %s' % path, 'reset')
            except Exception as e:
                _log('failed to remove %s: %s' % (path, e), 'error')

    _log('state reset complete', 'reset')
    return True
|
|
|
|
|
|
def format_duration(seconds):
    """Render a second count as a short human-readable string (e.g. '2h 5m')."""
    if seconds < 60:
        return '%ds' % seconds
    if seconds < 3600:
        return '%dm' % (seconds // 60)
    if seconds < 86400:
        hours = seconds // 3600
        minutes = (seconds % 3600) // 60
        # Drop the minutes part when it is zero ('3h', not '3h 0m').
        if minutes:
            return '%dh %dm' % (hours, minutes)
        return '%dh' % hours
    days = seconds // 86400
    hours = (seconds % 86400) // 3600
    if hours:
        return '%dd %dh' % (days, hours)
    return '%dd' % days
|
|
|
|
|
|
def import_from_file(fn, urldb):
    """Import URLs from a text file into the database.

    Missing files are silently ignored; URLs are inserted in batches of 200.
    """
    try:
        with open(fn, 'r') as fh:
            urls = [line.strip() for line in fh if line.strip()]
    except IOError:
        # File not present -- nothing to import.
        return
    batch = 200
    for start in range(0, len(urls), batch):
        chunk = urls[start:start + batch]
        if chunk:
            dbs.insert_urls(chunk, 'import.txt', urldb)
|
|
|
|
|
|
def get_content_type(url, proxy):
    """HEAD-request *url* and return its lowercased Content-Type value ('' if absent)."""
    headers = fetch.fetch_contents(url, head=True, proxy=proxy)

    for line in headers.split('\n'):
        lowered = line.lower()
        if lowered.startswith('content-type: '):
            # Everything after the first colon, trimmed (keeps charset suffix).
            return lowered.split(':')[1].strip()

    return ''
|
|
|
|
def is_good_content_type(string):
    """Return True when *string* names a content type we are willing to parse."""
    # Substring match so parameters like '; charset=utf-8' do not matter.
    return any(ct in string.lower() for ct in ('text/html', 'text/plain', 'atom+xml'))
|
|
|
|
def import_proxies_from_file(proxydb, fn):
    """Import proxies from a text file into the proxy database.

    Returns 0 when new proxies were inserted, 1 otherwise -- the value is
    used directly as the process exit code by main().
    Raises IOError if the file cannot be opened (propagates to the caller).
    """
    # Bug fix: the original leaked the file handle (open().read() with no
    # close); a context manager guarantees it is closed even on read errors.
    with open(fn, 'r') as f:
        content = f.read()
    # Detect protocol from filename (e.g., socks5.txt, http-proxies.txt)
    proto = fetch.detect_proto_from_path(fn)
    unique_count, new = fetch.extract_proxies(content, proxydb, proto=proto)
    if new:
        dbs.insert_proxies(proxydb, new, fn)
        return 0
    return 1
|
|
|
|
class Leechered(threading.Thread):
    """Thread that fetches one proxy-list URL and extracts proxy addresses.

    The caller constructs it with the URL's current bookkeeping counters
    (stale_count, error, retrievals, ...), starts it, and later reads the
    updated counters back via retrieve(). `status` is 'ok' only when run()
    finished without an unexpected exception; `execute` holds the tuple of
    column values the caller writes back to the `uris` table.
    """

    def __init__(self, url, stale_count, error, retrievals, proxies_added, content_type, content_hash, proxy):
        # 'nok' until run() completes successfully
        self.status = 'nok'
        # Newly discovered (addr, proto, confidence) tuples, filled by run()
        self.proxylist = []
        self.running = True
        self.url = url
        # Consecutive fetches that yielded no change / no proxies
        self.stale_count = stale_count
        # Consecutive failure counter for this source
        self.error = error
        # How many times this source has been fetched before (0 == new site)
        self.retrievals = retrievals
        # Lifetime count of proxies this source has produced
        self.proxies_added = proxies_added
        # Cached Content-Type from a previous HEAD; '' triggers a fresh HEAD
        self.content_type = content_type
        # Hash of the proxy list from the previous fetch (change detection)
        self.content_hash = content_hash
        # Hash computed for this fetch (set by run())
        self.new_hash = None
        # True when this fetch produced the same proxy set as last time
        self.hash_unchanged = False
        self.proxy = proxy
        # DB update tuple; '' until run() has decided the new counter values
        self.execute = ''
        threading.Thread.__init__(self)

    def retrieve(self):
        # Hand the (possibly updated) bookkeeping back to the caller.
        return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.proxies_added, self.execute

    def run(self):
        self.status = 'nok'

        try:
            # Lazily determine the Content-Type with a HEAD request if the
            # caller did not supply a cached one.
            if not self.content_type: self.content_type = get_content_type(self.url, self.proxy)

            if is_good_content_type(self.content_type):
                try:
                    content = fetch.fetch_contents(self.url, proxy=self.proxy)
                except KeyboardInterrupt as e:
                    raise e
                except Exception as e:
                    # Build an ASCII-safe error message (Python 2 `unicode`
                    # repr can contain non-ASCII characters).
                    try:
                        err_msg = repr(e)
                        if isinstance(err_msg, unicode):
                            err_msg = err_msg.encode('ascii', 'backslashreplace')
                    except:
                        err_msg = type(e).__name__
                    _log('%s: fetch error: %s' % (self.url.split('/')[2], err_msg), 'error')
                    content = ''
            else:
                # Unsupported content type: treat as an empty fetch.
                content = ''

            # Detect protocol from source URL (e.g., .../socks5/list.txt)
            proto = fetch.detect_proto_from_path(self.url)
            unique = fetch.extract_proxies(content, filter_known=False, proto=proto)

            # Compute hash of all extracted proxies for change detection
            self.new_hash = dbs.compute_proxy_list_hash(unique)

            # Check if content unchanged (same proxies as last time)
            if self.new_hash and self.content_hash and self.new_hash == self.content_hash:
                self.hash_unchanged = True
                self.proxylist = []
                self.stale_count += 1
                # Next check is pushed out proportionally to error+stale count.
                next_check = config.ppf.checktime + (self.error + self.stale_count) * config.ppf.perfail_checktime
                _log('%s: unchanged (hash match), next in %s' % (self.url.split('/')[2], format_duration(next_check)), 'stale')
                # Content unchanged - increment stale_count, update check_time
                self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url)
                self.status = 'ok'
                return

            # Content changed or first fetch - reset stale_count, proceed with normal processing
            self.stale_count = 0
            # unique is list of (address, proto, confidence) tuples; filter by address, keep tuple
            self.proxylist = [(addr, pr, conf) for addr, pr, conf in unique if not fetch.is_known_proxy(addr)]
            proxy_count = len(self.proxylist)

            if self.retrievals == 0: # new site
                if content and not self.proxylist: # site works but has zero proxy addresses
                    self.error += 1
                    self.stale_count += 1
                elif proxy_count:
                    self.error = 0
                    self.stale_count = 0
                else:
                    # No content at all on a brand-new site: penalize harder.
                    self.error += 2
                    self.stale_count += 2
            else: # not a new site
                # NOTE: the following checks intentionally run in sequence and
                # may overwrite each other; the final `proxy_count` check wins.
                # proxylist is empty
                if not proxy_count:
                    self.stale_count += 1
                # proxylist is not empty: site is working
                else:
                    self.stale_count = 0
                    self.error = 0
                # site has no content
                if not content:
                    self.error += 1
                    self.stale_count += 1
                # site has proxies
                if proxy_count:
                    self.error = 0
                    self.stale_count = 0

            self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url)
            self.status = 'ok'

        except KeyboardInterrupt:
            raise
        except Exception as e:
            # Defensive logging: even building the message must not raise.
            try:
                host = self.url.split('/')[2] if '/' in self.url else self.url
                err_msg = repr(e)
                if isinstance(err_msg, unicode):
                    err_msg = err_msg.encode('ascii', 'backslashreplace')
            except:
                host = 'unknown'
                err_msg = type(e).__name__
            _log('%s: thread error: %s' % (host, err_msg), 'error')
            # Set error state so site gets retried later
            self.error += 1
            self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added, self.content_type, self.url)
            self.status = 'nok'
|
|
|
|
|
|
# Worker mode imports (lazy loaded)
|
|
try:
|
|
import urllib2
|
|
import socket
|
|
except ImportError:
|
|
urllib2 = None
|
|
|
|
|
|
def worker_register(server_url, name, master_key=''):
    """Register with master server and get credentials.

    Returns (worker_id, worker_key), or (None, None) on any failure.
    """
    endpoint = server_url.rstrip('/') + '/api/register'
    payload = json.dumps({'name': name, 'master_key': master_key})

    request = urllib2.Request(endpoint, payload)
    request.add_header('Content-Type', 'application/json')

    try:
        response = urllib2.urlopen(request, timeout=30)
        parsed = json.loads(response.read())
        return parsed.get('worker_id'), parsed.get('worker_key')
    except Exception as e:
        _log('registration failed: %s' % e, 'error')
        return None, None
|
|
|
|
|
|
class NeedReregister(Exception):
    """Raised when the master rejects our worker key and re-registration is needed."""
|
|
|
|
|
|
def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False, threads=0):
    """Send heartbeat with Tor status to master.

    Returns True on success, False on failure.
    Raises NeedReregister when the master answers HTTP 403 (key invalid).
    """
    endpoint = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key)
    payload = json.dumps({
        'tor_ok': tor_ok,
        'tor_ip': tor_ip,
        'profiling': profiling,
        'threads': threads,
    })

    request = urllib2.Request(endpoint, payload)
    request.add_header('Content-Type', 'application/json')

    try:
        urllib2.urlopen(request, timeout=10)
    except urllib2.HTTPError as e:
        # 403 means our key is no longer valid -- caller must re-register.
        if e.code == 403:
            raise NeedReregister()
        return False
    except Exception:
        return False
    return True
|
|
|
|
|
|
def worker_claim_urls(server_url, worker_key, count=5):
    """Claim batch of URLs for worker mode.

    Returns a list of URL-info dicts (empty on failure).
    Raises NeedReregister when the master answers HTTP 403 (key invalid).
    """
    endpoint = '%s/api/claim-urls?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)

    try:
        body = urllib2.urlopen(endpoint, timeout=30).read()
        return json.loads(body).get('urls', [])
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to claim urls: %s' % e, 'error')
        return []
    except Exception as e:
        _log('failed to claim urls: %s' % e, 'error')
        return []
|
|
|
|
|
|
def worker_report_urls(server_url, worker_key, reports):
    """Report URL fetch results to master.

    Returns the number of reports the master processed (0 on failure).
    Raises NeedReregister when the master answers HTTP 403 (key invalid).
    """
    endpoint = '%s/api/report-urls?key=%s' % (server_url.rstrip('/'), worker_key)

    request = urllib2.Request(endpoint, json.dumps({'reports': reports}))
    request.add_header('Content-Type', 'application/json')

    try:
        body = urllib2.urlopen(request, timeout=30).read()
        return json.loads(body).get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to report urls: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to report urls: %s' % e, 'error')
        return 0
|
|
|
|
|
|
def worker_report_proxies(server_url, worker_key, proxies):
    """Report working proxies to master.

    Returns the number of proxies the master processed (0 on failure).
    Raises NeedReregister when the master answers HTTP 403 (key invalid).
    """
    endpoint = '%s/api/report-proxies?key=%s' % (server_url.rstrip('/'), worker_key)

    request = urllib2.Request(endpoint, json.dumps({'proxies': proxies}))
    request.add_header('Content-Type', 'application/json')

    try:
        body = urllib2.urlopen(request, timeout=30).read()
        return json.loads(body).get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to report proxies: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to report proxies: %s' % e, 'error')
        return 0
|
|
|
|
|
|
def check_tor_connectivity(tor_hosts):
    """Test Tor connectivity. Returns (working_hosts, tor_ip).

    Each 'host:port' entry in *tor_hosts* is probed as a SOCKS5 proxy by
    requesting check.torproject.org/api/ip through it; hosts that answer
    are collected, and the last successfully parsed exit IP is returned.
    """
    import socket
    import socks

    working = []
    tor_ip = None

    for tor_host in tor_hosts:
        host, port_str = tor_host.split(':')
        port = int(port_str)
        try:
            sock = socks.socksocket()
            sock.set_proxy(socks.SOCKS5, host, port)
            sock.settimeout(15)
            sock.connect(('check.torproject.org', 80))
            sock.send(b'GET /api/ip HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
            resp = sock.recv(1024)
            sock.close()
        except Exception:
            # Unreachable or misbehaving host -- just skip it.
            continue

        if resp and b'HTTP/' in resp:
            working.append(tor_host)
            # Extract the exit IP from the JSON response body, if present.
            if b'\r\n\r\n' in resp:
                body = resp.split(b'\r\n\r\n', 1)[1]
                try:
                    tor_ip = json.loads(body).get('IP')
                except Exception:
                    pass

    return working, tor_ip
|
|
|
|
|
|
def worker_main(config):
    """Worker mode -- URL-driven discovery.

    Claims URLs from master, fetches through Tor, extracts and tests proxies,
    reports working proxies back to master.
    """
    import json
    global urllib2

    # Py2/Py3 queue module compatibility.
    try:
        import Queue
    except ImportError:
        import queue as Queue

    import proxywatchd
    proxywatchd.set_config(config)

    server_url = config.args.server
    if not server_url:
        _log('--server URL required for worker mode', 'error')
        sys.exit(1)

    worker_key = config.args.worker_key
    worker_name = config.args.worker_name or os.uname()[1]
    num_threads = config.watchd.threads
    url_batch_size = config.worker.url_batch_size
    worker_id = None

    # Register if --register flag or no key provided
    if config.args.register or not worker_key:
        _log('registering with master: %s' % server_url, 'info')
        worker_id, worker_key = worker_register(server_url, worker_name)
        if not worker_key:
            _log('registration failed, exiting', 'error')
            sys.exit(1)
        _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
        _log('worker key: %s' % worker_key, 'info')
        _log('save this key with --worker-key for future runs', 'info')

        # --register alone means "register and exit".
        if config.args.register:
            return

    _log('starting worker mode (URL-driven)', 'info')
    _log(' server: %s' % server_url, 'info')
    _log(' threads: %d' % num_threads, 'info')
    _log(' url batch: %d' % url_batch_size, 'info')
    _log(' cache ttl: %s' % ('%ds' % config.worker.cache_ttl if config.worker.cache_ttl > 0 else 'disabled'), 'info')
    _log(' tor hosts: %s' % config.common.tor_hosts, 'info')

    # Verify Tor connectivity before starting
    import socks
    working_tor_hosts = []
    for tor_host in config.torhosts:
        host, port = tor_host.split(':')
        port = int(port)
        try:
            test_sock = socks.socksocket()
            test_sock.set_proxy(socks.SOCKS5, host, port)
            test_sock.settimeout(10)
            test_sock.connect(('check.torproject.org', 80))
            test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
            resp = test_sock.recv(512)
            test_sock.close()
            if resp and (b'HTTP/' in resp or len(resp) > 0):
                status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
                _log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
                working_tor_hosts.append(tor_host)
            else:
                _log('tor host %s:%d no response' % (host, port), 'warn')
        except Exception as e:
            _log('tor host %s:%d failed: %s' % (host, port, e), 'warn')

    if not working_tor_hosts:
        _log('no working Tor hosts, cannot start worker', 'error')
        sys.exit(1)

    _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')

    # Create shared queues for worker threads
    job_queue = proxywatchd.PriorityJobQueue()
    completion_queue = Queue.Queue()

    # Spawn worker threads (small random stagger to avoid a thundering herd)
    threads = []
    for i in range(num_threads):
        wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
        wt.start_thread()
        threads.append(wt)
        time.sleep(random.random() / 10)

    _log('spawned %d worker threads' % len(threads), 'info')

    # Session for fetching URLs through Tor
    session = fetch.FetchSession()

    # Lifetime counters reported at shutdown.
    cycles = 0
    urls_fetched = 0
    proxies_found = 0
    proxies_working = 0
    start_time = time.time()
    current_tor_ip = None
    consecutive_tor_failures = 0
    worker_profiling = config.args.profile or config.common.profiling
    # Mutable dict so the nested closures below can rebind key/id/backoff.
    wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}

    # Local proxy test cache: addr -> (timestamp, success, result_dict_or_None)
    cache_ttl = config.worker.cache_ttl
    proxy_cache = {} if cache_ttl > 0 else None

    def do_register():
        """Register with master, with exponential backoff on failure."""
        while True:
            _log('registering with master: %s' % server_url, 'info')
            new_id, new_key = worker_register(server_url, worker_name)
            if new_key:
                wstate['worker_id'] = new_id
                wstate['worker_key'] = new_key
                wstate['backoff'] = 10
                _log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
                return True
            else:
                _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
                time.sleep(wstate['backoff'])
                # Exponential backoff, capped at 5 minutes.
                wstate['backoff'] = min(wstate['backoff'] * 2, 300)

    def wait_for_tor():
        """Wait for Tor to become available, checking every 30 seconds."""
        check_interval = 30
        while True:
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if working:
                _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                return working, tor_ip
            _log('tor still down, retrying in %ds' % check_interval, 'warn')
            # Keep heartbeating (tor_ok=False) so the master knows we're alive.
            try:
                worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
            except NeedReregister:
                do_register()
            time.sleep(check_interval)

    try:
        while True:
            # Tor connectivity check
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if not working:
                consecutive_tor_failures += 1
                _log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                # Two failures in a row -> block until Tor is back.
                if consecutive_tor_failures >= 2:
                    _log('tor appears down, waiting before claiming URLs', 'error')
                    working, current_tor_ip = wait_for_tor()
                    consecutive_tor_failures = 0
                else:
                    time.sleep(10)
                    continue
            else:
                consecutive_tor_failures = 0
                if tor_ip != current_tor_ip:
                    if current_tor_ip:
                        _log('tor circuit rotated: %s' % tor_ip, 'info')
                    current_tor_ip = tor_ip
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
                except NeedReregister:
                    do_register()

            # Claim URLs from master
            try:
                url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
            except NeedReregister:
                do_register()
                continue

            if not url_infos:
                _log('no URLs available, sleeping 30s', 'info')
                time.sleep(30)
                continue

            _log('claimed %d URLs to process' % len(url_infos), 'info')

            # Phase 1: Fetch URLs and extract proxies
            url_reports = []
            all_extracted = [] # list of (addr, proto, confidence, source_url)

            for url_info in url_infos:
                url = url_info.get('url', '')
                last_hash = url_info.get('last_hash')
                proto_hint = url_info.get('proto_hint')

                fetch_start = time.time()
                try:
                    content = session.fetch(url)
                except Exception as e:
                    _log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error')
                    content = None

                fetch_time_ms = int((time.time() - fetch_start) * 1000)
                urls_fetched += 1

                if not content:
                    url_reports.append({
                        'url': url,
                        'success': False,
                        'content_hash': None,
                        'proxy_count': 0,
                        'fetch_time_ms': fetch_time_ms,
                        'changed': False,
                        'error': 'fetch failed',
                    })
                    continue

                # Detect protocol from URL path
                proto = fetch.detect_proto_from_path(url) or proto_hint

                # Extract proxies (no filter_known -- workers have no proxydb)
                extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)

                # Compute hash of extracted proxy list
                content_hash = dbs.compute_proxy_list_hash(extracted)

                if content_hash and last_hash and content_hash == last_hash:
                    # Content unchanged
                    url_reports.append({
                        'url': url,
                        'success': True,
                        'content_hash': content_hash,
                        'proxy_count': len(extracted),
                        'fetch_time_ms': fetch_time_ms,
                        'changed': False,
                        'error': None,
                    })
                    host = url.split('/')[2] if '/' in url else url
                    _log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
                    continue

                # Content changed or first fetch
                for addr, pr, conf in extracted:
                    all_extracted.append((addr, pr, conf, url))

                url_reports.append({
                    'url': url,
                    'success': True,
                    'content_hash': content_hash,
                    'proxy_count': len(extracted),
                    'fetch_time_ms': fetch_time_ms,
                    'changed': True,
                    'error': None,
                })

                host = url.split('/')[2] if '/' in url else url
                _log('%s: %d proxies extracted' % (host, len(extracted)), 'info')

            # Report URL health to master (one retry after re-registering)
            if url_reports:
                try:
                    worker_report_urls(server_url, wstate['worker_key'], url_reports)
                except NeedReregister:
                    do_register()
                    try:
                        worker_report_urls(server_url, wstate['worker_key'], url_reports)
                    except NeedReregister:
                        _log('still rejected after re-register, discarding url reports', 'error')

            # Deduplicate extracted proxies by address
            seen = set()
            unique_proxies = []
            source_map = {} # addr -> first source_url
            for addr, pr, conf, source_url in all_extracted:
                if addr not in seen:
                    seen.add(addr)
                    unique_proxies.append((addr, pr, conf))
                    source_map[addr] = source_url

            proxies_found += len(unique_proxies)

            if not unique_proxies:
                cycles += 1
                time.sleep(1)
                continue

            # Filter against local test cache
            cached_working = []
            if proxy_cache is not None:
                now = time.time()
                uncached = []
                cache_hits = 0
                for addr, pr, conf in unique_proxies:
                    # Normalize to ip:port for cache lookup (strip auth prefix)
                    cache_key = addr.split('@')[-1] if '@' in addr else addr
                    entry = proxy_cache.get(cache_key)
                    if entry and (now - entry[0]) < cache_ttl:
                        cache_hits += 1
                        if entry[1]: # cached success
                            cached_working.append(entry[2])
                    else:
                        uncached.append((addr, pr, conf))
                if cache_hits:
                    _log('%d cached (%d working), %d to test' % (
                        cache_hits, len(cached_working), len(uncached)), 'info')
                unique_proxies = uncached
                # NOTE(review): cached_working is collected but never reported
                # to the master afterwards -- confirm whether that is intended.

            if not unique_proxies:
                # All proxies were cached, nothing to test
                cycles += 1
                time.sleep(1)
                continue

            _log('testing %d unique proxies' % len(unique_proxies), 'info')

            # Phase 2: Test extracted proxies using worker thread pool
            pending_states = {}
            all_jobs = []
            checktypes = config.watchd.checktypes

            for addr, pr, conf in unique_proxies:
                # Parse ip:port from addr (may contain auth: user:pass@ip:port)
                addr_part = addr.split('@')[-1] if '@' in addr else addr

                # Handle IPv6 [ipv6]:port
                if addr_part.startswith('['):
                    bracket_end = addr_part.index(']')
                    ip = addr_part[1:bracket_end]
                    port = int(addr_part[bracket_end+2:])
                else:
                    ip, port_str = addr_part.rsplit(':', 1)
                    port = int(port_str)

                proto = pr or 'http'
                proxy_str = '%s:%d' % (ip, port)

                state = proxywatchd.ProxyTestState(
                    ip, port, proto, 0,
                    success_count=0, total_duration=0.0,
                    country=None, mitm=0, consecutive_success=0,
                    asn=None, oldies=False,
                    completion_queue=completion_queue,
                    proxy_full=addr, source_proto=pr
                )
                pending_states[proxy_str] = state

                # Pick a random check type, then a random target for that type.
                checktype = random.choice(checktypes)

                if checktype == 'judges':
                    available = proxywatchd.judge_stats.get_available_judges(
                        list(proxywatchd.judges.keys()))
                    target = random.choice(available) if available else random.choice(
                        list(proxywatchd.judges.keys()))
                elif checktype == 'ssl':
                    target = random.choice(proxywatchd.ssl_targets)
                elif checktype == 'irc':
                    target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
                else: # head
                    target = random.choice(list(proxywatchd.regexes.keys()))

                job = proxywatchd.TargetTestJob(state, target, checktype)
                all_jobs.append(job)

            random.shuffle(all_jobs)
            for job in all_jobs:
                job_queue.put(job, priority=0)

            # Wait for completion
            completed = 0
            timeout_start = time.time()
            timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
            working_results = []
            last_heartbeat = time.time()
            last_report = time.time()

            while completed < len(all_jobs):
                try:
                    state = completion_queue.get(timeout=1)
                    completed += 1

                    success, _ = state.evaluate()
                    if success:
                        latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
                        proxy_addr = state.proxy
                        if state.auth:
                            proxy_addr = '%s@%s' % (state.auth, state.proxy)

                        working_results.append({
                            'ip': state.ip,
                            'port': state.port,
                            'proto': state.proto,
                            'source_proto': state.source_proto,
                            'latency': round(latency_sec, 3),
                            'exit_ip': state.exit_ip,
                            'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
                            'checktype': state.last_check or '',
                            'target': state.last_target or '',
                        })

                    if completed % 50 == 0 or completed == len(all_jobs):
                        _log('tested %d/%d proxies (%d working)' % (
                            completed, len(all_jobs), len(working_results)), 'info')

                except Queue.Empty:
                    if time.time() - timeout_start > timeout_seconds:
                        _log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
                        break

                    # Periodic heartbeat to prevent stale detection
                    # NOTE(review): placement reconstructed -- assumed to run
                    # inside the Queue.Empty handler (idle ticks only); confirm
                    # against the original file's indentation.
                    now = time.time()
                    if now - last_heartbeat >= 60:
                        try:
                            worker_send_heartbeat(server_url, wstate['worker_key'],
                                True, current_tor_ip, worker_profiling, num_threads)
                        except NeedReregister:
                            do_register()
                        last_heartbeat = now

                    # Periodic proxy report (flush working results every 5 minutes)
                    if working_results and now - last_report >= 300:
                        reported = False
                        try:
                            processed = worker_report_proxies(server_url, wstate['worker_key'],
                                working_results)
                            if processed > 0:
                                _log('interim report: %d proxies (%d submitted)' % (
                                    len(working_results), processed), 'info')
                                reported = True
                        except NeedReregister:
                            do_register()
                            try:
                                processed = worker_report_proxies(server_url, wstate['worker_key'],
                                    working_results)
                                if processed > 0:
                                    reported = True
                            except NeedReregister:
                                pass
                        # Only drop results that were actually accepted.
                        if reported:
                            working_results = []
                        last_report = now

                    continue

            # Populate proxy test cache from results
            if proxy_cache is not None:
                now = time.time()
                working_addrs = set()
                for r in working_results:
                    addr = '%s:%d' % (r['ip'], r['port'])
                    proxy_cache[addr] = (now, True, r)
                    working_addrs.add(addr)
                # Cache failures for tested proxies that didn't succeed
                for proxy_str in pending_states:
                    if proxy_str not in working_addrs:
                        proxy_cache[proxy_str] = (now, False, None)

            proxies_working += len(working_results)

            # Report working proxies to master (one retry after re-registering)
            if working_results:
                try:
                    processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
                except NeedReregister:
                    do_register()
                    try:
                        processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
                    except NeedReregister:
                        _log('still rejected after re-register, discarding proxy reports', 'error')
                        processed = 0
                _log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')

            cycles += 1

            # Periodic cache cleanup: evict expired entries every 10 cycles
            if proxy_cache is not None and cycles % 10 == 0:
                now = time.time()
                expired = [k for k, v in proxy_cache.items() if (now - v[0]) >= cache_ttl]
                if expired:
                    for k in expired:
                        del proxy_cache[k]
                    _log('cache cleanup: evicted %d expired, %d remaining' % (len(expired), len(proxy_cache)), 'info')

            time.sleep(1)

    except KeyboardInterrupt:
        # Graceful shutdown: close the fetch session, stop worker threads,
        # then print lifetime statistics.
        elapsed = time.time() - start_time
        _log('worker stopping...', 'info')
        session.close()
        for wt in threads:
            wt.stop()
        for wt in threads:
            wt.term()
        _log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
        _log(' cycles: %d' % cycles, 'info')
        _log(' urls fetched: %d' % urls_fetched, 'info')
        _log(' proxies found: %d' % proxies_found, 'info')
        _log(' proxies working: %d' % proxies_working, 'info')
        if proxy_cache is not None:
            _log(' cache entries: %d' % len(proxy_cache), 'info')
|
|
|
|
|
|
def main():
    """Main entry point.

    Dispatches between three run modes:

    * ``--reset``     : wipe persisted state via :func:`reset_state`, then exit.
    * worker/register : delegate to ``worker_main(config)`` and return.
    * master (default): open the proxy and URL databases, start the optional
      watcher / httpd / scraper components, then run the URL-fetch scheduling
      loop forever until ``KeyboardInterrupt`` triggers an orderly shutdown.
    """
    global config

    # Handle --reset flag before connecting to databases
    if len(sys.argv) == 2 and sys.argv[1] == "--reset":
        if reset_state():
            _log('use without --reset to start fresh', 'reset')
            sys.exit(0)
        else:
            sys.exit(1)

    # Worker mode: URL-driven discovery
    if config.args.worker or config.args.register:
        worker_main(config)
        return

    # Master mode: open the proxy database and prime the known-proxy cache
    # used for deduplication below.
    proxydb = mysqlite.mysqlite(config.watchd.database, str)
    dbs.create_table_if_not_exists(proxydb, 'proxylist')
    fetch.init_known_proxies(proxydb)

    # NOTE(review): urignore is loaded but never referenced again in this
    # function -- presumably consumed elsewhere (or dead); confirm before
    # removing the read of urignore.txt.
    with open('urignore.txt', 'r') as f:
        urignore = [ i.strip() for i in f.read().split('\n') if i.strip() ]

    # URL source database: ensure schema, seed the built-in source list,
    # and merge any operator-supplied URLs from import.txt.
    urldb = mysqlite.mysqlite(config.ppf.database, str)
    dbs.create_table_if_not_exists(urldb, 'uris')
    dbs.seed_proxy_sources(urldb)
    import_from_file('import.txt', urldb)
    # One-shot mode: import proxies from a file, then exit with its status.
    if len(sys.argv) == 3 and sys.argv[1] == "--file":
        sys.exit(import_proxies_from_file(proxydb, sys.argv[2]))

    # start proxy watcher (import here to avoid gevent dependency in worker mode)
    import proxywatchd
    httpd_server = None
    if config.watchd.threads > 0:
        watcherd = proxywatchd.Proxywatchd()
        watcherd.start()
    else:
        watcherd = None
        # Start httpd independently when watchd is disabled
        if config.httpd.enabled:
            from httpd import ProxyAPIServer, configure_url_scoring
            import network_stats
            configure_url_scoring(
                config.ppf.checktime,
                config.ppf.perfail_checktime,
                config.ppf.max_fail,
                config.ppf.list_max_age_days
            )

            def httpd_stats_provider():
                """Stats provider for httpd-only mode (scraping without testing)."""
                stats = {
                    'network': network_stats.get_stats(),
                }
                # Add scraper stats if available
                try:
                    import scraper
                    scraper_stats = scraper.get_scraper_stats()
                    if scraper_stats:
                        stats['scraper'] = scraper_stats
                except Exception:
                    pass
                # Add tor pool stats if available
                try:
                    import connection_pool
                    pool = connection_pool.get_pool()
                    if pool:
                        stats['tor_pool'] = pool.get_stats()
                except Exception:
                    pass
                return stats

            profiling = config.args.profile or config.common.profiling
            httpd_server = ProxyAPIServer(
                config.httpd.listenip,
                config.httpd.port,
                config.watchd.database,
                stats_provider=httpd_stats_provider,
                profiling=profiling,
                url_database=config.ppf.database,
            )
            httpd_server.start()

    # start scraper threads if enabled
    scrapers = []
    if config.scraper.enabled:
        import scraper
        for i in range(config.scraper.threads):
            s = scraper.Scraper(config)
            s.start()
            scrapers.append(s)
        _log('started %d scraper thread(s)' % len(scrapers), 'info')

    # Filter out stale proxy list URLs:
    # - URLs added more than list_max_age_days ago AND never produced proxies are skipped
    # - URLs that have produced proxies before are always included (regardless of age)
    # Due-ness check: check_time + checktime + (error+stale_count)*perfail_checktime
    # must lie in the past, i.e. failures push the next fetch further out.
    qurl = '''SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash
              FROM uris
              WHERE error < ?
              AND (check_time+?+((error+stale_count)*?) < ?)
              AND (added > ? OR proxies_added > 0)
              ORDER BY RANDOM()'''
    # Query to count skipped old URLs (for logging)
    qurl_skipped = '''SELECT COUNT(*) FROM uris
                      WHERE error < ?
                      AND (check_time+?+((error+stale_count)*?) < ?)
                      AND added <= ?
                      AND proxies_added = 0'''
    threads = []                    # live Leechered fetcher threads
    rows = []                       # pending URL jobs from the last qurl query
    reqtime = time.time() - 3600    # last qurl query time (start "expired")
    statusmsg = time.time()         # last status-line log time
    list_max_age_seconds = config.ppf.list_max_age_days * 86400
    last_skip_log = 0               # last stale-URL summary log time
    last_reseed = time.time()
    reseed_interval = 6 * 3600  # re-seed sources every 6 hours
    while True:
        try:
            # Periodic re-seeding: reset errored-out seed sources
            # (without this, sources that hit max_fail are excluded forever
            # and the pipeline eventually starves).
            if time.time() - last_reseed >= reseed_interval:
                dbs.seed_proxy_sources(urldb, reset_errors=True)
                last_reseed = time.time()

            # When ppf threads = 0, skip URL fetching (workers handle it via /api/claim-urls)
            if config.ppf.threads == 0:
                time.sleep(60)
                continue

            # Small jitter so the loop doesn't spin in lockstep.
            time.sleep(random.random()/10)
            # Status line at most every 3 minutes.
            if (time.time() - statusmsg) > 180:
                _log('running %d thread(s) over %d' % (len(threads), config.ppf.threads), 'ppf')
                statusmsg = time.time()
            if not rows:
                # Re-query for due URLs, throttled to once every 3 seconds.
                if (time.time() - reqtime) > 3:
                    now = int(time.time())
                    min_added = now - list_max_age_seconds
                    rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchall()
                    reqtime = time.time()
                    # Log skipped old URLs periodically (every 10 minutes)
                    if (time.time() - last_skip_log) > 600:
                        skipped = urldb.execute(qurl_skipped, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, now, min_added)).fetchone()
                        if skipped and skipped[0] > 0:
                            _log('skipping %d old proxy lists (added >%d days ago, no proxies found)' % (skipped[0], config.ppf.list_max_age_days), 'stale')
                        last_skip_log = time.time()
                    # Not enough work to keep every thread busy: back off
                    # for a minute and discard the partial batch.
                    if len(rows) < config.ppf.threads:
                        time.sleep(60)
                        rows = []
                    else:
                        _log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf')

            # Harvest finished fetchers: dedupe their proxies against the
            # known-proxy cache and persist per-URL bookkeeping.
            for thread in threads:
                if thread.status == 'ok':
                    url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve()
                    # proxylist is list of (address, proto, confidence) tuples
                    new = [(addr, pr, conf) for addr, pr, conf in proxylist if not fetch.is_known_proxy(addr)]
                    if new:
                        fetch.add_known_proxies([addr for addr, pr, conf in new])
                    # Update content_hash if we have a new one
                    new_hash = thread.new_hash
                    # NOTE(review): the `execute` value returned by
                    # thread.retrieve() is discarded and rebuilt here --
                    # confirm the returned tuple is intentionally unused.
                    execute = (error, stale_count, int(time.time()), retrievals, proxies_added+len(new), content_type, new_hash, url)
                    urldb.execute('UPDATE uris SET error=?,stale_count=?,check_time=?,retrievals=?,proxies_added=?,content_type=?,content_hash=? where url=?', execute)
                    urldb.commit()
                    if new: dbs.insert_proxies(proxydb, new, url)

            # Drop dead threads; top up to the configured thread count.
            threads = [ thread for thread in threads if thread.is_alive() ]
            if len(threads) < config.ppf.threads and rows:
                # Only query proxydb when actually starting a new thread (reduces GIL blocking)
                _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
                if not _proxylist: _proxylist = None
                # Hand the fetcher up to 5 random working proxies (or None).
                p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None
                row = random.choice(rows)
                # Stamp check_time now so other schedulers skip this URL.
                urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0]))
                urldb.commit()
                rows.remove(row)
                # row: url, stale_count, error, retrievals, proxies_added, content_type, content_hash
                t = Leechered(row[0], row[1], row[2], row[3], row[4], row[5], row[6], p)
                threads.append(t)
                t.start()

        except KeyboardInterrupt:
            # Orderly shutdown: scrapers first, then watcher, then httpd.
            for s in scrapers:
                s.stop()
            if watcherd:
                watcherd.stop()
                watcherd.finish()
            if httpd_server:
                httpd_server.stop()
            break

    _log('ppf stopped', 'info')
|
|
|
|
|
|
if __name__ == '__main__':
    # Load configuration and bail out early on validation failures.
    config.load()
    validation_errors = config.validate()
    if validation_errors:
        for err in validation_errors:
            _log(err, 'error')
        sys.exit(1)
    fetch.set_config(config)

    # Apply the no-BeautifulSoup parsing flag before any fetching starts.
    if config.args.nobs:
        set_nobs(True)

    want_profiling = config.args.profile or config.common.profiling
    if not want_profiling:
        main()
    else:
        # Wrap main() in cProfile; dump and summarize stats on exit
        # (also reachable via the SIGTERM handler at module top).
        _log('profiling enabled, output to data/profile.stats', 'info')
        _profiler = cProfile.Profile()
        try:
            _profiler.enable()
            main()
        finally:
            _profiler.disable()
            _profiler.dump_stats('data/profile.stats')
            _log('profile stats written to data/profile.stats', 'info')
            stats = pstats.Stats('data/profile.stats')
            stats.strip_dirs().sort_stats('cumulative').print_stats(20)
|