#!/usr/bin/env python2

__version__ = '2.0.0'

import sys
import os

# Gevent monkey-patch MUST happen before any other imports
# Both master (httpd) and worker modes use gevent for async I/O
from gevent import monkey
monkey.patch_all()

import cProfile
import pstats
import signal
import dbs
import time
import mysqlite
from misc import _log
from config import Config
import fetch
from soup_parser import set_nobs
import threading
import random
import json

# Global profiler for signal handler access
_profiler = None


# Handle SIGTERM gracefully (for container stop)
def sigterm_handler(signum, frame):
    global _profiler
    if _profiler:
        _profiler.disable()
        _profiler.dump_stats('data/profile.stats')
        _log('profile stats written to data/profile.stats (SIGTERM)', 'info')
    raise KeyboardInterrupt

signal.signal(signal.SIGTERM, sigterm_handler)

config = Config()


def reset_state():
    """Clear all proxy and session state for a fresh start.

    Clears:
      - proxylist table (all proxies)
      - session_state table (test counters)
      - stats_history table (historical stats)
      - scraper_state.json (engine backoffs)
      - mitm_certs.json (MITM cert tracking)
    """
    db_path = config.watchd.database
    data_dir = os.path.dirname(db_path) or 'data'

    # Clear database tables
    try:
        db = mysqlite.mysqlite(db_path, str)
        dbs.create_table_if_not_exists(db, 'proxylist')
        db.execute('DELETE FROM proxylist')
        db.commit()
        _log('cleared proxylist table', 'reset')

        # Clear session_state if exists
        try:
            db.execute('DELETE FROM session_state')
            db.commit()
            _log('cleared session_state table', 'reset')
        except Exception:
            pass

        # Clear stats_history if exists
        try:
            db.execute('DELETE FROM stats_history')
            db.commit()
            _log('cleared stats_history table', 'reset')
        except Exception:
            pass

        db.execute('VACUUM')
        db.commit()
        db.close()
    except Exception as e:
        _log('database reset failed: %s' % e, 'error')
        return False

    # Remove state files
    state_files = [
        os.path.join(data_dir, 'scraper_state.json'),
        os.path.join(data_dir, 'mitm_certs.json'),
    ]
    for f in state_files:
        if os.path.exists(f):
            try:
                os.remove(f)
                _log('removed %s' % f, 'reset')
            except Exception as e:
                _log('failed to remove %s: %s' % (f, e), 'error')

    _log('state reset complete', 'reset')
    return True


def format_duration(seconds):
    """Format seconds into compact human-readable duration."""
    if seconds < 60:
        return '%ds' % seconds
    elif seconds < 3600:
        return '%dm' % (seconds // 60)
    elif seconds < 86400:
        h, m = divmod(seconds, 3600)
        m = m // 60
        return '%dh %dm' % (h, m) if m else '%dh' % h
    else:
        d, rem = divmod(seconds, 86400)
        h = rem // 3600
        return '%dd %dh' % (d, h) if h else '%dd' % d


def import_from_file(fn, urldb):
    """Import URLs from a text file into the database."""
    try:
        with open(fn, 'r') as f:
            urls = [url.strip() for url in f if url.strip()]
    except IOError:
        return  # File not found, silently skip
    for i in range(0, len(urls), 200):
        chunk = urls[i:i+200]
        if chunk:
            dbs.insert_urls(chunk, 'import.txt', urldb)


def get_content_type(url, proxy):
    hdr = fetch.fetch_contents(url, head=True, proxy=proxy)
    for h in hdr.split('\n'):
        if h.lower().startswith('content-type: '):
            return h.lower().split(':')[1].strip()
    return ''


def is_good_content_type(string):
    allowed_ct = [
        'text/html',
        'text/plain',
        'atom+xml'
    ]
    for ct in allowed_ct:
        if ct.lower() in string.lower():
            return True
    return False


def import_proxies_from_file(proxydb, fn):
    content = open(fn, 'r').read()
    # Detect protocol from filename (e.g., socks5.txt, http-proxies.txt)
    proto = fetch.detect_proto_from_path(fn)
    unique_count, new = fetch.extract_proxies(content, proxydb, proto=proto)
    if new:
        dbs.insert_proxies(proxydb, new, fn)
        return 0
    return 1


class Leechered(threading.Thread):

    def __init__(self, url, stale_count, error, retrievals, proxies_added,
                 content_type, content_hash, proxy):
        self.status = 'nok'
        self.proxylist = []
        self.running = True
        self.url = url
        self.stale_count = stale_count
        self.error = error
        self.retrievals = retrievals
        self.proxies_added = proxies_added
        self.content_type = content_type
        self.content_hash = content_hash
        self.new_hash = None
        self.hash_unchanged = False
        self.proxy = proxy
        self.execute = ''
        threading.Thread.__init__(self)

    def retrieve(self):
        return (self.url, self.proxylist, self.stale_count, self.error,
                self.retrievals, self.content_type, self.proxies_added, self.execute)

    def run(self):
        self.status = 'nok'
        try:
            if not self.content_type:
                self.content_type = get_content_type(self.url, self.proxy)
            if is_good_content_type(self.content_type):
                try:
                    content = fetch.fetch_contents(self.url, proxy=self.proxy)
                except KeyboardInterrupt as e:
                    raise e
                except Exception as e:
                    try:
                        err_msg = repr(e)
                        if isinstance(err_msg, unicode):
                            err_msg = err_msg.encode('ascii', 'backslashreplace')
                    except:
                        err_msg = type(e).__name__
                    _log('%s: fetch error: %s' % (self.url.split('/')[2], err_msg), 'error')
                    content = ''
            else:
                content = ''

            # Detect protocol from source URL (e.g., .../socks5/list.txt)
            proto = fetch.detect_proto_from_path(self.url)
            unique = fetch.extract_proxies(content, filter_known=False, proto=proto)

            # Compute hash of all extracted proxies for change detection
            self.new_hash = dbs.compute_proxy_list_hash(unique)

            # Check if content unchanged (same proxies as last time)
            if self.new_hash and self.content_hash and self.new_hash == self.content_hash:
                self.hash_unchanged = True
                self.proxylist = []
                self.stale_count += 1
                next_check = config.ppf.checktime + (self.error + self.stale_count) * config.ppf.perfail_checktime
                _log('%s: unchanged (hash match), next in %s' % (
                    self.url.split('/')[2], format_duration(next_check)), 'stale')
                # Content unchanged - increment stale_count, update check_time
                self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals,
                                self.proxies_added, self.content_type, self.url)
                self.status = 'ok'
                return

            # Content changed or first fetch - reset stale_count, proceed with normal processing
            self.stale_count = 0

            # unique is list of (address, proto, confidence) tuples; filter by address, keep tuple
            self.proxylist = [(addr, pr, conf) for addr, pr, conf in unique
                              if not fetch.is_known_proxy(addr)]
            proxy_count = len(self.proxylist)

            if self.retrievals == 0:
                # new site
                if content and not self.proxylist:
                    # site works but has zero proxy addresses
                    self.error += 1
                    self.stale_count += 1
                elif proxy_count:
                    self.error = 0
                    self.stale_count = 0
                else:
                    self.error += 2
                    self.stale_count += 2
            else:
                # not a new site
                # proxylist is empty
                if not proxy_count:
                    self.stale_count += 1
                # proxylist is not empty: site is working
                else:
                    self.stale_count = 0
                    self.error = 0
                # site has no content
                if not content:
                    self.error += 1
                    self.stale_count += 1
                # site has proxies
                if proxy_count:
                    self.error = 0
                    self.stale_count = 0

            self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals,
                            self.proxies_added + len(self.proxylist), self.content_type, self.url)
            self.status = 'ok'
        except KeyboardInterrupt:
            raise
        except Exception as e:
            try:
                host = self.url.split('/')[2] if '/' in self.url else self.url
                err_msg = repr(e)
                if isinstance(err_msg, unicode):
                    err_msg = err_msg.encode('ascii', 'backslashreplace')
            except:
                host = 'unknown'
                err_msg = type(e).__name__
            _log('%s: thread error: %s' % (host, err_msg), 'error')
            # Set error state so site gets retried later
            self.error += 1
            self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals,
                            self.proxies_added, self.content_type, self.url)
            self.status = 'nok'


# Worker mode imports (lazy loaded)
try:
    import urllib2
    import socket
except ImportError:
    urllib2 = None


def worker_register(server_url, name, master_key=''):
    """Register with master server and get credentials."""
    url = server_url.rstrip('/') + '/api/register'
    data = json.dumps({'name': name, 'master_key': master_key})
    req = urllib2.Request(url, data)
    req.add_header('Content-Type', 'application/json')
    try:
        resp = urllib2.urlopen(req, timeout=30)
        result = json.loads(resp.read())
        return result.get('worker_id'), result.get('worker_key')
    except Exception as e:
        _log('registration failed: %s' % e, 'error')
        return None, None


class NeedReregister(Exception):
    """Raised when worker key is invalid and re-registration is needed."""
    pass


def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False, threads=0):
    """Send heartbeat with Tor status to master."""
    url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key)
    data = json.dumps({
        'tor_ok': tor_ok,
        'tor_ip': tor_ip,
        'profiling': profiling,
        'threads': threads,
    })
    req = urllib2.Request(url, data)
    req.add_header('Content-Type', 'application/json')
    try:
        resp = urllib2.urlopen(req, timeout=10)
        return True
    except urllib2.HTTPError as e:
        if e.code == 403:
            raise NeedReregister()
        return False
    except Exception:
        return False


def worker_claim_urls(server_url, worker_key, count=5):
    """Claim batch of URLs for worker mode."""
    url = '%s/api/claim-urls?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
    try:
        resp = urllib2.urlopen(url, timeout=30)
        result = json.loads(resp.read())
        return result.get('urls', [])
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to claim urls: %s' % e, 'error')
        return []
    except Exception as e:
        _log('failed to claim urls: %s' % e, 'error')
        return []


def worker_report_urls(server_url, worker_key, reports):
    """Report URL fetch results to master."""
    url = '%s/api/report-urls?key=%s' % (server_url.rstrip('/'), worker_key)
    data = json.dumps({'reports': reports})
    req = urllib2.Request(url, data)
    req.add_header('Content-Type', 'application/json')
    try:
        resp = urllib2.urlopen(req, timeout=30)
        result = json.loads(resp.read())
        return result.get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to report urls: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to report urls: %s' % e, 'error')
        return 0


def worker_report_proxies(server_url, worker_key, proxies):
    """Report working proxies to master."""
    url = '%s/api/report-proxies?key=%s' % (server_url.rstrip('/'), worker_key)
    data = json.dumps({'proxies': proxies})
    req = urllib2.Request(url, data)
    req.add_header('Content-Type', 'application/json')
    try:
        resp = urllib2.urlopen(req, timeout=30)
        result = json.loads(resp.read())
        return result.get('processed', 0)
    except urllib2.HTTPError as e:
        if e.code == 403:
            _log('worker key rejected (403), need to re-register', 'warn')
            raise NeedReregister()
        _log('failed to report proxies: %s' % e, 'error')
        return 0
    except Exception as e:
        _log('failed to report proxies: %s' % e, 'error')
        return 0


def check_tor_connectivity(tor_hosts):
"""Test Tor connectivity. Returns (working_hosts, tor_ip).""" import socket import socks working = [] tor_ip = None for tor_host in tor_hosts: host, port = tor_host.split(':') port = int(port) try: test_sock = socks.socksocket() test_sock.set_proxy(socks.SOCKS5, host, port) test_sock.settimeout(15) test_sock.connect(('check.torproject.org', 80)) test_sock.send(b'GET /api/ip HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n') resp = test_sock.recv(1024) test_sock.close() if resp and b'HTTP/' in resp: working.append(tor_host) # Extract IP from JSON response body if b'\r\n\r\n' in resp: body = resp.split(b'\r\n\r\n', 1)[1] try: data = json.loads(body) tor_ip = data.get('IP') except Exception: pass except Exception: pass return working, tor_ip def worker_main(config): """Worker mode -- URL-driven discovery. Claims URLs from master, fetches through Tor, extracts and tests proxies, reports working proxies back to master. """ import json global urllib2 try: import Queue except ImportError: import queue as Queue import proxywatchd proxywatchd.set_config(config) server_url = config.args.server if not server_url: _log('--server URL required for worker mode', 'error') sys.exit(1) worker_key = config.args.worker_key worker_name = config.args.worker_name or os.uname()[1] num_threads = config.watchd.threads url_batch_size = config.worker.url_batch_size worker_id = None # Register if --register flag or no key provided if config.args.register or not worker_key: _log('registering with master: %s' % server_url, 'info') worker_id, worker_key = worker_register(server_url, worker_name) if not worker_key: _log('registration failed, exiting', 'error') sys.exit(1) _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info') _log('worker key: %s' % worker_key, 'info') _log('save this key with --worker-key for future runs', 'info') if config.args.register: return _log('starting worker mode (URL-driven)', 'info') _log(' server: %s' % server_url, 'info') _log(' threads: %d' % num_threads, 'info') _log(' url batch: %d' % url_batch_size, 'info') _log(' tor hosts: %s' % config.common.tor_hosts, 'info') # Verify Tor connectivity before starting import socks working_tor_hosts = [] for tor_host in config.torhosts: host, port = tor_host.split(':') port = int(port) try: test_sock = socks.socksocket() test_sock.set_proxy(socks.SOCKS5, host, port) test_sock.settimeout(10) test_sock.connect(('check.torproject.org', 80)) test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n') resp = test_sock.recv(512) test_sock.close() if resp and (b'HTTP/' in resp or len(resp) > 0): status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50] _log('tor host %s:%d OK (%s)' % (host, port, status), 'info') working_tor_hosts.append(tor_host) else: _log('tor host %s:%d no response' % (host, port), 'warn') except Exception as e: _log('tor host %s:%d failed: %s' % (host, port, e), 'warn') if not working_tor_hosts: _log('no working Tor hosts, cannot start worker', 'error') sys.exit(1) _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info') # Create shared queues for worker threads job_queue = proxywatchd.PriorityJobQueue() completion_queue = Queue.Queue() # Spawn worker threads threads = [] for i in range(num_threads): wt = proxywatchd.WorkerThread('w%d' % i, job_queue) wt.start_thread() threads.append(wt) time.sleep(random.random() / 10) _log('spawned %d worker threads' % len(threads), 'info') # Session for fetching URLs through Tor session = fetch.FetchSession() cycles = 0 urls_fetched = 0 
    proxies_found = 0
    proxies_working = 0
    start_time = time.time()
    current_tor_ip = None
    consecutive_tor_failures = 0
    worker_profiling = config.args.profile or config.common.profiling
    wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}

    def do_register():
        """Register with master, with exponential backoff on failure."""
        while True:
            _log('registering with master: %s' % server_url, 'info')
            new_id, new_key = worker_register(server_url, worker_name)
            if new_key:
                wstate['worker_id'] = new_id
                wstate['worker_key'] = new_key
                wstate['backoff'] = 10
                _log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
                return True
            else:
                _log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
                time.sleep(wstate['backoff'])
                wstate['backoff'] = min(wstate['backoff'] * 2, 300)

    def wait_for_tor():
        """Wait for Tor to become available, checking every 30 seconds."""
        check_interval = 30
        while True:
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if working:
                _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip,
                                          worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                return working, tor_ip
            _log('tor still down, retrying in %ds' % check_interval, 'warn')
            try:
                worker_send_heartbeat(server_url, wstate['worker_key'], False, None,
                                      worker_profiling, num_threads)
            except NeedReregister:
                do_register()
            time.sleep(check_interval)

    try:
        while True:
            # Tor connectivity check
            working, tor_ip = check_tor_connectivity(config.torhosts)
            if not working:
                consecutive_tor_failures += 1
                _log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], False, None,
                                          worker_profiling, num_threads)
                except NeedReregister:
                    do_register()
                if consecutive_tor_failures >= 2:
                    _log('tor appears down, waiting before claiming URLs', 'error')
                    working, current_tor_ip = wait_for_tor()
                    consecutive_tor_failures = 0
                else:
                    time.sleep(10)
                continue
            else:
                consecutive_tor_failures = 0
                if tor_ip != current_tor_ip:
                    if current_tor_ip:
                        _log('tor circuit rotated: %s' % tor_ip, 'info')
                    current_tor_ip = tor_ip
                try:
                    worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip,
                                          worker_profiling, num_threads)
                except NeedReregister:
                    do_register()

            # Claim URLs from master
            try:
                url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
            except NeedReregister:
                do_register()
                continue

            if not url_infos:
                _log('no URLs available, sleeping 30s', 'info')
                time.sleep(30)
                continue

            _log('claimed %d URLs to process' % len(url_infos), 'info')

            # Phase 1: Fetch URLs and extract proxies
            url_reports = []
            all_extracted = []  # list of (addr, proto, confidence, source_url)

            for url_info in url_infos:
                url = url_info.get('url', '')
                last_hash = url_info.get('last_hash')
                proto_hint = url_info.get('proto_hint')

                fetch_start = time.time()
                try:
                    content = session.fetch(url)
                except Exception as e:
                    _log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error')
                    content = None
                fetch_time_ms = int((time.time() - fetch_start) * 1000)
                urls_fetched += 1

                if not content:
                    url_reports.append({
                        'url': url,
                        'success': False,
                        'content_hash': None,
                        'proxy_count': 0,
                        'fetch_time_ms': fetch_time_ms,
                        'changed': False,
                        'error': 'fetch failed',
                    })
                    continue

                # Detect protocol from URL path
                proto = fetch.detect_proto_from_path(url) or proto_hint

                # Extract proxies (no filter_known -- workers have no proxydb)
                extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)

                # Compute hash of extracted proxy list
                content_hash = dbs.compute_proxy_list_hash(extracted)

                if content_hash and last_hash and content_hash == last_hash:
                    # Content unchanged
                    url_reports.append({
                        'url': url,
                        'success': True,
                        'content_hash': content_hash,
                        'proxy_count': len(extracted),
                        'fetch_time_ms': fetch_time_ms,
                        'changed': False,
                        'error': None,
                    })
                    host = url.split('/')[2] if '/' in url else url
                    _log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
                    continue

                # Content changed or first fetch
                for addr, pr, conf in extracted:
                    all_extracted.append((addr, pr, conf, url))

                url_reports.append({
                    'url': url,
                    'success': True,
                    'content_hash': content_hash,
                    'proxy_count': len(extracted),
                    'fetch_time_ms': fetch_time_ms,
                    'changed': True,
                    'error': None,
                })
                host = url.split('/')[2] if '/' in url else url
                _log('%s: %d proxies extracted' % (host, len(extracted)), 'info')

            # Report URL health to master
            if url_reports:
                try:
                    worker_report_urls(server_url, wstate['worker_key'], url_reports)
                except NeedReregister:
                    do_register()
                    try:
                        worker_report_urls(server_url, wstate['worker_key'], url_reports)
                    except NeedReregister:
                        _log('still rejected after re-register, discarding url reports', 'error')

            # Deduplicate extracted proxies by address
            seen = set()
            unique_proxies = []
            source_map = {}  # addr -> first source_url
            for addr, pr, conf, source_url in all_extracted:
                if addr not in seen:
                    seen.add(addr)
                    unique_proxies.append((addr, pr, conf))
                    source_map[addr] = source_url
            proxies_found += len(unique_proxies)

            if not unique_proxies:
                cycles += 1
                time.sleep(1)
                continue

            _log('testing %d unique proxies' % len(unique_proxies), 'info')

            # Phase 2: Test extracted proxies using worker thread pool
            pending_states = {}
            all_jobs = []
            checktypes = config.watchd.checktypes

            for addr, pr, conf in unique_proxies:
                # Parse ip:port from addr (may contain auth: user:pass@ip:port)
                addr_part = addr.split('@')[-1] if '@' in addr else addr
                # Handle IPv6 [ipv6]:port
                if addr_part.startswith('['):
                    bracket_end = addr_part.index(']')
                    ip = addr_part[1:bracket_end]
                    port = int(addr_part[bracket_end+2:])
                else:
                    ip, port_str = addr_part.rsplit(':', 1)
                    port = int(port_str)
                proto = pr or 'http'
                proxy_str = '%s:%d' % (ip, port)

                state = proxywatchd.ProxyTestState(
                    ip, port, proto, 0,
                    success_count=0, total_duration=0.0, country=None, mitm=0,
                    consecutive_success=0, asn=None, oldies=False,
                    completion_queue=completion_queue,
                    proxy_full=addr, source_proto=pr
                )
                pending_states[proxy_str] = state

                checktype = random.choice(checktypes)
                if checktype == 'judges':
                    available = proxywatchd.judge_stats.get_available_judges(
                        list(proxywatchd.judges.keys()))
                    target = random.choice(available) if available else random.choice(
                        list(proxywatchd.judges.keys()))
                elif checktype == 'ssl':
                    target = random.choice(proxywatchd.ssl_targets)
                elif checktype == 'irc':
                    target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
                else:
                    # head
                    target = random.choice(list(proxywatchd.regexes.keys()))

                job = proxywatchd.TargetTestJob(state, target, checktype)
                all_jobs.append(job)

            random.shuffle(all_jobs)
            for job in all_jobs:
                job_queue.put(job, priority=0)

            # Wait for completion
            completed = 0
            timeout_start = time.time()
            timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
            working_results = []

            while completed < len(all_jobs):
                try:
                    state = completion_queue.get(timeout=1)
                    completed += 1
                    if state.failcount == 0:
                        latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
                        proxy_addr = state.proxy
                        if state.auth:
                            proxy_addr = '%s@%s' % (state.auth, state.proxy)
                        working_results.append({
                            'ip': state.ip,
                            'port': state.port,
                            'proto': state.proto,
                            'source_proto': state.source_proto,
                            'latency': round(latency_sec, 3),
                            'exit_ip': state.exit_ip,
                            'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
                            'checktype': state.last_check or '',
                            'target': state.last_target or '',
                        })
                    if completed % 50 == 0 or completed == len(all_jobs):
                        _log('tested %d/%d proxies (%d working)' % (
                            completed, len(all_jobs), len(working_results)), 'info')
                except Queue.Empty:
                    if time.time() - timeout_start > timeout_seconds:
                        _log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
                        break
                    continue

            proxies_working += len(working_results)

            # Report working proxies to master
            if working_results:
                try:
                    processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
                except NeedReregister:
                    do_register()
                    try:
                        processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
                    except NeedReregister:
                        _log('still rejected after re-register, discarding proxy reports', 'error')
                        processed = 0
                _log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')

            cycles += 1
            time.sleep(1)
    except KeyboardInterrupt:
        elapsed = time.time() - start_time
        _log('worker stopping...', 'info')
        session.close()
        for wt in threads:
            wt.stop()
        for wt in threads:
            wt.term()
        _log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
        _log(' cycles: %d' % cycles, 'info')
        _log(' urls fetched: %d' % urls_fetched, 'info')
        _log(' proxies found: %d' % proxies_found, 'info')
        _log(' proxies working: %d' % proxies_working, 'info')


def main():
    """Main entry point."""
    global config

    # Handle --reset flag before connecting to databases
    if len(sys.argv) == 2 and sys.argv[1] == "--reset":
        if reset_state():
            _log('use without --reset to start fresh', 'reset')
            sys.exit(0)
        else:
            sys.exit(1)

    # Worker mode: URL-driven discovery
    if config.args.worker or config.args.register:
        worker_main(config)
        return

    proxydb = mysqlite.mysqlite(config.watchd.database, str)
    dbs.create_table_if_not_exists(proxydb, 'proxylist')
    fetch.init_known_proxies(proxydb)

    with open('urignore.txt', 'r') as f:
        urignore = [i.strip() for i in f.read().split('\n') if i.strip()]

    urldb = mysqlite.mysqlite(config.ppf.database, str)
    dbs.create_table_if_not_exists(urldb, 'uris')
    dbs.seed_proxy_sources(urldb)
    import_from_file('import.txt', urldb)

    if len(sys.argv) == 3 and sys.argv[1] == "--file":
        sys.exit(import_proxies_from_file(proxydb, sys.argv[2]))

    # start proxy watcher (import here to avoid gevent dependency in worker mode)
    import proxywatchd
    httpd_server = None
    if config.watchd.threads > 0:
        watcherd = proxywatchd.Proxywatchd()
        watcherd.start()
    else:
        watcherd = None

    # Start httpd independently when watchd is disabled
    if config.httpd.enabled:
        from httpd import ProxyAPIServer, configure_url_scoring
        import network_stats
        configure_url_scoring(
            config.ppf.checktime,
            config.ppf.perfail_checktime,
            config.ppf.max_fail,
            config.ppf.list_max_age_days
        )

        def httpd_stats_provider():
            """Stats provider for httpd-only mode (scraping without testing)."""
            stats = {
                'network': network_stats.get_stats(),
            }
            # Add scraper stats if available
            try:
                import scraper
                scraper_stats = scraper.get_scraper_stats()
                if scraper_stats:
                    stats['scraper'] = scraper_stats
            except Exception:
                pass
            # Add tor pool stats if available
            try:
                import connection_pool
                pool = connection_pool.get_pool()
                if pool:
                    stats['tor_pool'] = pool.get_stats()
            except Exception:
                pass
            return stats

        profiling = config.args.profile or config.common.profiling
        httpd_server = ProxyAPIServer(
            config.httpd.listenip,
            config.httpd.port,
            config.watchd.database,
            stats_provider=httpd_stats_provider,
            profiling=profiling,
            url_database=config.ppf.database,
        )
        httpd_server.start()

    # start scraper threads if enabled
    scrapers = []
    if config.scraper.enabled:
        import scraper
        for i in range(config.scraper.threads):
            s = scraper.Scraper(config)
            s.start()
            scrapers.append(s)
        _log('started %d scraper thread(s)' % len(scrapers), 'info')

    # Filter out stale proxy list URLs:
    # - URLs added more than list_max_age_days ago AND never produced proxies are skipped
    # - URLs that have produced proxies before are always included (regardless of age)
    qurl = '''SELECT url,stale_count,error,retrievals,proxies_added,content_type,content_hash
              FROM uris
              WHERE error < ?
                AND (check_time+?+((error+stale_count)*?) < ?)
                AND (added > ? OR proxies_added > 0)
              ORDER BY RANDOM()'''
    # Query to count skipped old URLs (for logging)
    qurl_skipped = '''SELECT COUNT(*) FROM uris
                      WHERE error < ?
                        AND (check_time+?+((error+stale_count)*?) < ?)
                        AND added <= ? AND proxies_added = 0'''

    threads = []
    rows = []
    reqtime = time.time() - 3600
    statusmsg = time.time()
    list_max_age_seconds = config.ppf.list_max_age_days * 86400
    last_skip_log = 0

    while True:
        try:
            time.sleep(random.random()/10)
            if (time.time() - statusmsg) > 180:
                _log('running %d thread(s) over %d' % (len(threads), config.ppf.threads), 'ppf')
                statusmsg = time.time()

            if not rows:
                if (time.time() - reqtime) > 3:
                    now = int(time.time())
                    min_added = now - list_max_age_seconds
                    rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime,
                                                config.ppf.perfail_checktime, now, min_added)).fetchall()
                    reqtime = time.time()

                    # Log skipped old URLs periodically (every 10 minutes)
                    if (time.time() - last_skip_log) > 600:
                        skipped = urldb.execute(qurl_skipped, (config.ppf.max_fail, config.ppf.checktime,
                                                               config.ppf.perfail_checktime, now, min_added)).fetchone()
                        if skipped and skipped[0] > 0:
                            _log('skipping %d old proxy lists (added >%d days ago, no proxies found)' % (
                                skipped[0], config.ppf.list_max_age_days), 'stale')
                        last_skip_log = time.time()

                if len(rows) < config.ppf.threads:
                    time.sleep(60)
                    rows = []
                else:
                    _log('handing %d job(s) to %d thread(s)' % (
                        len(rows), config.ppf.threads), 'ppf')

            for thread in threads:
                if thread.status == 'ok':
                    url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve()
                    # proxylist is list of (address, proto, confidence) tuples
                    new = [(addr, pr, conf) for addr, pr, conf in proxylist if not fetch.is_known_proxy(addr)]
                    if new:
                        fetch.add_known_proxies([addr for addr, pr, conf in new])
                    # Update content_hash if we have a new one
                    new_hash = thread.new_hash
                    execute = (error, stale_count, int(time.time()), retrievals,
                               proxies_added+len(new), content_type, new_hash, url)
                    urldb.execute('UPDATE uris SET error=?,stale_count=?,check_time=?,retrievals=?,proxies_added=?,content_type=?,content_hash=? where url=?', execute)
                    urldb.commit()
                    if new:
                        dbs.insert_proxies(proxydb, new, url)

            threads = [thread for thread in threads if thread.is_alive()]

            if len(threads) < config.ppf.threads and rows:
                # Only query proxydb when actually starting a new thread (reduces GIL blocking)
                _proxylist = ['%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall()]
                if not _proxylist:
                    _proxylist = None
                p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None

                row = random.choice(rows)
                urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0]))
                urldb.commit()
                rows.remove(row)
                # row: url, stale_count, error, retrievals, proxies_added, content_type, content_hash
                t = Leechered(row[0], row[1], row[2], row[3], row[4], row[5], row[6], p)
                threads.append(t)
                t.start()
        except KeyboardInterrupt:
            for s in scrapers:
                s.stop()
            if watcherd:
                watcherd.stop()
                watcherd.finish()
            if httpd_server:
                httpd_server.stop()
            break

    _log('ppf stopped', 'info')


if __name__ == '__main__':
    config.load()
    errors = config.validate()
    if errors:
        for e in errors:
            _log(e, 'error')
        sys.exit(1)
    fetch.set_config(config)

    # handle flags
    if config.args.nobs:
        set_nobs(True)

    if config.args.profile or config.common.profiling:
        _log('profiling enabled, output to data/profile.stats', 'info')
        _profiler = cProfile.Profile()
        try:
            _profiler.enable()
            main()
        finally:
            _profiler.disable()
            _profiler.dump_stats('data/profile.stats')
            _log('profile stats written to data/profile.stats', 'info')
            stats = pstats.Stats('data/profile.stats')
            stats.strip_dirs().sort_stats('cumulative').print_stats(20)
    else:
        main()
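
# Usage sketch (not executed; for reference only). These invocations are
# inferred from the sys.argv checks and config.args flags handled above.
# The script name "ppf.py" and the exact spelling of the worker-mode flags
# (--worker, --server, --worker-key, --register) are assumptions based on
# the attribute names and log messages; the authoritative definitions live
# in the argument parser inside the config module.
#
#   python2 ppf.py --reset                  # wipe proxy/URL/session state, then exit
#   python2 ppf.py --file socks5.txt        # one-shot proxy import from a local file
#   python2 ppf.py --worker --server http://master:8080 --register
#                                           # register with the master, print a worker key
#   python2 ppf.py --worker --server http://master:8080 --worker-key KEY
#                                           # run URL-driven worker mode through Tor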