diff --git a/proxywatchd.py b/proxywatchd.py
index bcaeb23..0755887 100644
--- a/proxywatchd.py
+++ b/proxywatchd.py
@@ -96,6 +96,41 @@ def _sample_dbg(msg, proxy=None, force=False):
     prefix = '[SAMPLE %s] ' % proxy if proxy else '[SAMPLE] '
     _log('%s%s' % (prefix, msg), 'diag')
 
+
+def _build_due_sql():
+    """Build the SQL WHERE condition selecting proxies due for testing.
+
+    A proxy qualifies when ``0 <= failed < max_fail`` and it either has no
+    ``tested`` timestamp yet or its retest delay has elapsed:
+    - failed=0: tested + working_checktime < now
+    - failed>0 with backoff: tested + (failed * fail_retry_interval) < now
+    - failed>0 without backoff: tested + fail_retry_interval < now
+
+    Returns:
+        (sql_condition, params): the condition text with ``?`` placeholders
+        and the matching parameter tuple. ``now`` is sampled once per call.
+    """
+    now = time.time()
+    if config.watchd.fail_retry_backoff:
+        # Linear backoff: multiply interval by failure count
+        fail_delay = '(failed * ?)'
+    else:
+        # Fixed interval: same delay regardless of failure count
+        fail_delay = '?'
+    # Both modes bind the same parameters in the same order; only the
+    # failed>0 delay expression differs, so the statement is built once.
+    sql = '''failed >= 0 AND failed < ? AND (
+        tested IS NULL OR
+        CASE WHEN failed = 0
+             THEN tested + ? < ?
+             ELSE tested + %s < ?
+        END
+    )''' % fail_delay
+    params = (config.watchd.max_fail, config.watchd.working_checktime, now,
+              config.watchd.fail_retry_interval, now)
+    return sql, params
+
+
 
 # IP pattern for judge services (validates response contains valid IP in body)
 IP_PATTERN = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'
@@ -588,7 +623,11 @@ class TargetTestJob(object):
             result = self._try_ssl_handshake(protos, pool)
             if result is not None:
                 return result # SSL succeeded or MITM detected
-            # SSL failed for all protocols, continue to secondary check
+            # SSL failed for all protocols
+            if config.watchd.ssl_only:
+                # ssl_only mode: skip secondary check, mark as failed
+                _dbg('SSL failed, ssl_only mode, skipping secondary check', ps.proxy)
+                return (None, None, 0, None, None, 1, 0, 'ssl_only')
             _dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)
 
         # Phase 2: Secondary check (configured checktype)
@@ -852,6 +891,194 @@ class WorkerThread():
         avg_t = try_div(duration_total, job_count)
         _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id)
 
+
+class VerificationThread(threading.Thread):
+    """Manager thread that verifies disputed/suspicious proxy results.
+
+    Pulls from verification_queue, tests proxies directly via Tor,
+    records authoritative results, and updates worker trust scores.
+    """
+
+    def __init__(self, database, interval=30, batch_size=10):
+        """Set up the thread.
+
+        database: passed to mysqlite.mysqlite() at the start of every cycle.
+        interval: seconds to wait between verification cycles.
+        batch_size: limit passed to dbs.get_pending_verifications().
+        """
+        threading.Thread.__init__(self)
+        self.database = database
+        self.interval = interval
+        self.batch_size = batch_size
+        self.daemon = True
+        # Must not be named `_stop`: on Python 3.0-3.12 that would shadow the
+        # internal threading.Thread._stop() method and make join()/is_alive()
+        # raise TypeError once the thread has finished.
+        self._stop_event = threading.Event()
+        self.verified_count = 0
+        self.disagreements_resolved = 0
+
+    def stop(self):
+        """Ask run() to exit; also interrupts the wait between cycles."""
+        self._stop_event.set()
+
+    def run(self):
+        """Run verification cycles until stop() is called."""
+        _log('verification thread started (interval=%ds, batch=%d)' % (
+            self.interval, self.batch_size), 'verify')
+
+        while not self._stop_event.is_set():
+            try:
+                self._verification_cycle()
+            except Exception as e:
+                # Keep the thread alive; one bad cycle must not end verification.
+                _log('verification cycle error: %s' % e, 'error')
+
+            # Wait for next cycle
+            self._stop_event.wait(self.interval)
+
+        _log('verification thread stopped (verified=%d, resolved=%d)' % (
+            self.verified_count, self.disagreements_resolved), 'verify')
+
+    def _verification_cycle(self):
+        """Process pending verifications."""
+        db = mysqlite.mysqlite(self.database, str)
+        try:
+            pending = dbs.get_pending_verifications(db, self.batch_size)
+            if not pending:
+                return
+
+            for item in pending:
+                if self._stop_event.is_set():
+                    break
+
+                proxy = item['proxy']
+                trigger = item['trigger']
+
+                # Test proxy directly
+                result = self._test_proxy(proxy)
+
+                if result is None:
+                    # Test failed/inconclusive, skip for now
+                    # NOTE(review): the item stays queued; confirm that
+                    # get_pending_verifications() cannot keep returning the
+                    # same inconclusive entries ahead of testable ones.
+                    continue
+
+                # Record verification result
+                self._record_verification(db, item, result)
+                self.verified_count += 1
+
+                if trigger == 'disagreement':
+                    self.disagreements_resolved += 1
+
+            db.commit()
+
+        finally:
+            db.close()
+
+    def _test_proxy(self, proxy):
+        """Test proxy directly using local Tor connection.
+
+        Attempts an SSL connection to www.google.com:443 through Tor and then
+        the proxy, trying the proxy as http, socks5 and socks4 in turn.
+
+        Returns:
+            True if proxy works, False if fails, None if inconclusive
+            (malformed 'ip:port' string or no connection pool available)
+        """
+        parts = proxy.split(':')
+        if len(parts) != 2:
+            return None
+
+        try:
+            # Validation only; the original 'ip:port' string is what gets used.
+            int(parts[1])
+        except ValueError:
+            return None
+
+        # Try SSL handshake as primary test
+        pool = connection_pool.get_pool()
+        if not pool:
+            return None
+
+        proto_map = {
+            'http': rocksock.RS_PT_HTTP,
+            'socks4': rocksock.RS_PT_SOCKS4,
+            'socks5': rocksock.RS_PT_SOCKS5
+        }
+        protos = ['http', 'socks5', 'socks4']
+
+        for proto in protos:
+            try:
+                # NOTE(review): proxies are passed as (address, type) tuples;
+                # confirm this matches what this rocksock version accepts.
+                sock = rocksock.Rocksock(
+                    host='www.google.com',
+                    port=443,
+                    ssl=True,
+                    timeout=config.common.timeout_connect,
+                    proxies=[
+                        (tor_proxy_url(pool.get_host(), proto='socks5'), rocksock.RS_PT_SOCKS5),
+                        (proxy, proto_map.get(proto, rocksock.RS_PT_HTTP))
+                    ]
+                )
+                sock.connect()
+                sock.disconnect()
+                return True
+            except Exception as e:
+                # Best effort: any error counts as "this protocol failed", but
+                # leave a trace so systematic errors do not look like dead proxies.
+                _dbg('verification via %s failed: %s' % (proto, e), proxy)
+
+        return False
+
+    def _record_verification(self, db, item, result):
+        """Record verification result and update worker trust.
+
+        db: open database handle; the caller commits.
+        item: mapping with 'proxy' and 'trigger', optionally 'worker_a',
+            'worker_b', 'result_a', 'result_b'.
+        result: authoritative outcome from _test_proxy() (True/False).
+        """
+        proxy = item['proxy']
+        trigger = item['trigger']
+        worker_a = item.get('worker_a')
+        worker_b = item.get('worker_b')
+        result_a = item.get('result_a')
+        result_b = item.get('result_b')
+        # Worker results are compared as 1 (working) / 0 (failed)
+        result_int = 1 if result else 0
+
+        # Update worker trust based on who was correct
+        if trigger == 'disagreement' and worker_a and worker_b:
+            # Check which worker(s) agreed with verification
+            dbs.update_worker_trust(db, worker_a, result_a == result_int)
+            dbs.update_worker_trust(db, worker_b, result_b == result_int)
+        elif worker_a:
+            # Single worker trigger (resurrection/sudden_death)
+            dbs.update_worker_trust(db, worker_a, result_a == result_int)
+
+        # Update proxy status with authoritative result
+        now = int(time.time())
+        if result:
+            db.execute('''
+                UPDATE proxylist SET failed = 0, tested = ?
+                WHERE proxy = ?
+            ''', (now, proxy))
+        else:
+            db.execute('''
+                UPDATE proxylist SET failed = failed + 1, tested = ?
+                WHERE proxy = ?
+            ''', (now, proxy))
+
+        # Remove from verification queue
+        dbs.remove_from_verification_queue(db, proxy)
+
+        _log('verified %s: %s (trigger=%s)' % (
+            proxy, 'PASS' if result else 'FAIL', trigger), 'verify')
+
+
 
 class Proxywatchd():
     def stop(self):
@@ -867,6 +1094,8 @@ class Proxywatchd():
         self.submit_collected()
         if self.httpd_server:
             self.httpd_server.stop()
+        if self.verification_thread:
+            self.verification_thread.stop()
         self.stopped.set()
 
     def finish(self):
@@ -929,6 +1158,7 @@ class Proxywatchd():
         self.stats = Stats()
         self.last_cleanup = time.time()
         self.httpd_server = None
+        self.verification_thread = None
 
         # Dynamic thread scaling
         min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
@@ -990,9 +1220,10 @@ class Proxywatchd():
             _log('checktype changed (%s -> %s), requires restart to take effect' % (
                 ','.join(old_checktypes), ','.join(config.watchd.checktypes)), 'warn')
 
-        _log('config reloaded: threads=%d timeout=%d checktime=%d max_fail=%d' % (
+        _log('config reloaded: threads=%d timeout=%d working=%ds fail_interval=%ds backoff=%s max_fail=%d' % (
             config.watchd.threads, config.watchd.timeout,
-            config.watchd.checktime, config.watchd.max_fail), 'watchd')
+            config.watchd.working_checktime, config.watchd.fail_retry_interval,
+            config.watchd.fail_retry_backoff, config.watchd.max_fail), 'watchd')
         return True
 
     def _spawn_thread(self):
@@ -1036,21 +1267,35 @@ class Proxywatchd():
     def fetch_rows(self, db):
         """Fetch proxy rows due for testing from database."""
         self.isoldies = False
-        q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
-        now = time.time()
-        params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
-        _dbg('fetch_rows: params=(0, %d, %d, %d, %.0f)' % (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now))
-        rows = db.execute(q, params).fetchall()
+
+        # Build due condition using new schedule formula
+        due_sql, due_params = _build_due_sql()
+        q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,
+               consecutive_success,asn,proxy FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
+        _dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % (
+            config.watchd.working_checktime, config.watchd.fail_retry_interval,
+            config.watchd.fail_retry_backoff, config.watchd.max_fail))
+        rows = db.execute(q, due_params).fetchall()
         _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))
-        # check oldies ?
+
+        # Check oldies? (dead proxies getting a second chance)
         if len(rows) < config.watchd.threads:
             _dbg('fetch_rows: rows < threads, clearing rows (oldies=%s)' % config.watchd.oldies)
             rows = []
             if config.watchd.oldies:
                 self.isoldies = True
-                ## disable tor safeguard for old proxies
-                if self.tor_safeguard: self.tor_safeguard = False
-                rows = db.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall()
+                # Disable tor safeguard for old proxies
+                if self.tor_safeguard:
+                    self.tor_safeguard = False
+                # Oldies use simple checktime-based query (dead proxies)
+                now = time.time()
+                oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2)
+                q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country,
+                    mitm,consecutive_success,asn,proxy FROM proxylist
+                    WHERE failed >= ? AND failed < ? AND (tested + ?) < ?
+                    ORDER BY RANDOM()'''
+                rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max,
+                                             config.watchd.oldies_checktime, now)).fetchall()
         return rows
 
     def prepare_jobs(self):
@@ -1431,18 +1676,16 @@ class Proxywatchd():
 
     def _run(self):
         _log('starting...', 'watchd')
-        _log('config: db=%s checktype=%s threads=%d checktime=%d perfail=%d max_fail=%d' % (
+        _log('config: db=%s checktype=%s threads=%d working=%ds fail_interval=%ds backoff=%s max_fail=%d' % (
             config.watchd.database, ','.join(config.watchd.checktypes), config.watchd.threads,
-            config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd')
+            config.watchd.working_checktime, config.watchd.fail_retry_interval,
+            config.watchd.fail_retry_backoff, config.watchd.max_fail), 'watchd')
 
         # Log database status at startup
         with self._db_context() as db:
             total = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
-            now = time.time()
-            due = db.execute(
-                'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
-                (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
-            ).fetchone()[0]
+            due_sql, due_params = _build_due_sql()
+            due = db.execute('SELECT COUNT(*) FROM proxylist WHERE ' + due_sql, due_params).fetchone()[0]
 
             # Create stats persistence tables
             dbs.create_table_if_not_exists(db, 'stats_history')
@@ -1464,7 +1707,14 @@ class Proxywatchd():
 
         # Start HTTP API server if enabled
         if config.httpd.enabled:
-            from httpd import ProxyAPIServer
+            from httpd import ProxyAPIServer, configure_schedule
+            # Pass schedule config to httpd module
+            configure_schedule(
+                config.watchd.working_checktime,
+                config.watchd.fail_retry_interval,
+                config.watchd.fail_retry_backoff,
+                config.watchd.max_fail
+            )
             self.httpd_server = ProxyAPIServer(
                 config.httpd.listenip,
                 config.httpd.port,
@@ -1473,6 +1723,16 @@ class Proxywatchd():
             )
             self.httpd_server.start()
 
+        # Start verification thread if enabled (manager-only, checks disputed results)
+        if config.verification.enabled and config.watchd.threads > 0:
+            self.verification_thread = VerificationThread(
+                database=config.watchd.database,
+                interval=config.verification.interval,
+                batch_size=config.verification.batch_size
+            )
+            self.verification_thread.start()
+            _log('verification thread enabled', 'watchd')
+
         # create worker threads with shared queues
         for i in range(config.watchd.threads):
             threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])