diff --git a/ppf.py b/ppf.py index 83d2d9f..141e367 100644 --- a/ppf.py +++ b/ppf.py @@ -1,5 +1,7 @@ #!/usr/bin/env python2 +__version__ = '2.0.0' + import sys import os @@ -481,13 +483,12 @@ def worker_main(config): # Create shared queues for worker threads job_queue = proxywatchd.PriorityJobQueue() - result_queue = Queue.Queue() completion_queue = Queue.Queue() # Spawn worker threads with stagger to avoid overwhelming Tor threads = [] for i in range(num_threads): - wt = proxywatchd.WorkerThread('w%d' % i, job_queue, result_queue) + wt = proxywatchd.WorkerThread('w%d' % i, job_queue) wt.start_thread() threads.append(wt) time.sleep(random.random() / 10) # 0-100ms stagger per thread diff --git a/proxywatchd.py b/proxywatchd.py index 79a4ccd..f0b0a06 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -17,6 +17,7 @@ import network_stats import os import ssl +import contextlib try: import Queue except ImportError: @@ -1133,13 +1134,12 @@ def socks4_resolve(srvname, server_port): class ProxyTestState(): - """Thread-safe state for a proxy being tested against multiple targets. + """Thread-safe state for a proxy being tested. - Results from TargetTestJob instances are aggregated here. - When all tests complete, evaluate() determines final pass/fail. + Holds test results and evaluates final pass/fail status. """ def __init__(self, ip, port, proto, failcount, success_count, total_duration, - country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False, + country, mitm, consecutive_success, asn=None, oldies=False, completion_queue=None, proxy_full=None): self.ip = ip self.port = int(port) @@ -1160,7 +1160,6 @@ class ProxyTestState(): self.consecutive_success = consecutive_success self.asn = asn self.isoldies = oldies - self.num_targets = num_targets self.completion_queue = completion_queue # for signaling completion # thread-safe result accumulation @@ -1205,8 +1204,8 @@ class ProxyTestState(): # Track cert errors if category in ('cert_error', 'ssl_error', 'ssl_mitm'): self.cert_error = True - # Check completion (inside lock to prevent race) - if not self.completed and len(self.results) >= self.num_targets: + # Check completion (single-target mode) + if not self.completed and len(self.results) >= 1: self.completed = True should_signal = True # Signal outside lock to avoid deadlock @@ -1220,7 +1219,7 @@ class ProxyTestState(): return True # Slow path: check with lock (only during transition) with self.lock: - return len(self.results) >= self.num_targets + return len(self.results) >= 1 @staticmethod def rwip(ip): @@ -1313,10 +1312,9 @@ class ProxyTestState(): anon_status = ' anon: anonymous;' else: anon_status = ' anon: anonymous;' # default if no header check - _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % ( + _log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s' % ( last_good['proto'], self.ip, self.port, self.country, - last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']), - num_success, self.num_targets), 'info') + last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl'])), 'info') _dbg('PASS: failcount=0', self.proxy) return (True, None) @@ -1650,12 +1648,11 @@ class TargetTestJob(): class WorkerThread(): - def __init__(self, id, job_queue, result_queue): + def __init__(self, id, job_queue): self.id = id self.done = threading.Event() self.thread = None self.job_queue = job_queue # shared input queue - self.result_queue = result_queue # shared output queue def stop(self): self.done.set() def term(self): @@ -1678,7 +1675,6 @@ class WorkerThread(): spent = time.time() - nao job_count += 1 duration_total += spent - self.result_queue.put(job) self.job_queue.task_done() if self.thread and job_count > 0: avg_t = try_div(duration_total, job_count) @@ -1709,10 +1705,9 @@ class Proxywatchd(): # Final save of session state before exit try: - self._prep_db() - dbs.save_session_state(self.mysqlite, self.stats) - dbs.save_stats_snapshot(self.mysqlite, self.stats) - self._close_db() + with self._db_context() as db: + dbs.save_session_state(db, self.stats) + dbs.save_stats_snapshot(db, self.stats) _log('session state saved', 'watchd') except Exception as e: _log('failed to save final session state: %s' % str(e), 'warn') @@ -1725,11 +1720,24 @@ class Proxywatchd(): _log('failed to save MITM state: %s' % str(e), 'warn') def _prep_db(self): + """Deprecated: Use _db_context() instead for new code.""" self.mysqlite = mysqlite.mysqlite(config.watchd.database, str) + def _close_db(self): + """Deprecated: Use _db_context() instead for new code.""" if self.mysqlite: self.mysqlite.close() self.mysqlite = None + + @contextlib.contextmanager + def _db_context(self): + """Context manager for database connections.""" + db = mysqlite.mysqlite(config.watchd.database, str) + try: + yield db + finally: + db.close() + def __init__(self): config.load() self.in_background = False @@ -1739,17 +1747,15 @@ class Proxywatchd(): # shared work-stealing queues self.job_queue = PriorityJobQueue() - self.result_queue = Queue.Queue() self.completion_queue = Queue.Queue() # completed ProxyTestState objects - # track pending proxy states (for multi-target aggregation) - self.pending_states = {} # dict: proxy -> ProxyTestState (O(1) lookup/removal) + # track pending proxy states + self.pending_states = {} # dict: proxy -> ProxyTestState self.pending_lock = threading.Lock() # create table if needed (use dbs.py for canonical schema) - self._prep_db() - dbs.create_table_if_not_exists(self.mysqlite, 'proxylist') - self._close_db() + with self._db_context() as db: + dbs.create_table_if_not_exists(db, 'proxylist') self.submit_after = config.watchd.submit_after # number of collected jobs before writing db self.collected = [] # completed ProxyTestState objects ready for DB @@ -1828,7 +1834,7 @@ class Proxywatchd(): """Spawn a new worker thread.""" self.thread_id_counter += 1 threadid = 'dyn%d' % self.thread_id_counter - wt = WorkerThread(threadid, self.job_queue, self.result_queue) + wt = WorkerThread(threadid, self.job_queue) wt.start_thread() self.threads.append(wt) return threadid @@ -1888,7 +1894,6 @@ class Proxywatchd(): rows = self.fetch_rows() _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes)) checktypes = config.watchd.checktypes - num_targets = 1 # Build target pools for each checktype target_pools = {} @@ -1918,7 +1923,7 @@ class Proxywatchd(): # country, mitm, consecutive_success, asn, proxy state = ProxyTestState( row[0], row[1], row[2], row[3], row[4], row[5], - row[6], row[7], row[8], asn=row[9], num_targets=num_targets, + row[6], row[7], row[8], asn=row[9], oldies=self.isoldies, completion_queue=self.completion_queue, proxy_full=row[10] ) @@ -1928,13 +1933,12 @@ class Proxywatchd(): checktype = random.choice(checktypes) target_pool = target_pools[checktype] - # select random targets for this proxy - targets = random.sample(target_pool, min(num_targets, len(target_pool))) + # select single target (single-target mode) + target = random.choice(target_pool) - # create one job per target - for target in targets: - job = TargetTestJob(state, target, checktype) - all_jobs.append(job) + # create job for this proxy + job = TargetTestJob(state, target, checktype) + all_jobs.append(job) # shuffle to interleave tests across different proxies random.shuffle(all_jobs) @@ -1959,20 +1963,11 @@ class Proxywatchd(): _dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count)) now = time.time() if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval: - _log("created %d jobs for %d proxies (%d targets each)" % ( - job_count, proxy_count, num_targets), 'watchd') + _log("created %d jobs for %d proxies" % (job_count, proxy_count), 'watchd') self.last_jobs_log = now return job_count def collect_work(self): - # drain results from shared result queue (TargetTestJob objects) - # results are already recorded in their ProxyTestState - while True: - try: - self.result_queue.get_nowait() - except Queue.Empty: - break - # process completed states from completion queue (event-driven, not polling) # ProxyTestState.record_result() pushes to completion_queue when all targets done completed_count = 0 @@ -2086,15 +2081,14 @@ class Proxywatchd(): """Remove proxies that have been dead for too long.""" stale_seconds = config.watchd.stale_days * 86400 cutoff = int(time.time()) - stale_seconds - self._prep_db() - # delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff - result = self.mysqlite.execute( - 'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?', - (config.watchd.max_fail, DEAD_PROXY, cutoff) - ) - count = result.rowcount if hasattr(result, 'rowcount') else 0 - self.mysqlite.commit() - self._close_db() + with self._db_context() as db: + # delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff + result = db.execute( + 'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?', + (config.watchd.max_fail, DEAD_PROXY, cutoff) + ) + count = result.rowcount if hasattr(result, 'rowcount') else 0 + db.commit() if count > 0: _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd') self.last_cleanup = time.time() @@ -2313,7 +2307,7 @@ class Proxywatchd(): # create worker threads with shared queues for i in range(config.watchd.threads): threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)]) - wt = WorkerThread(threadid, self.job_queue, self.result_queue) + wt = WorkerThread(threadid, self.job_queue) if self.in_background: wt.start_thread() self.threads.append(wt) @@ -2371,9 +2365,8 @@ class Proxywatchd(): # Save session state periodically (every stats_interval, default 5m) try: - self._prep_db() - dbs.save_session_state(self.mysqlite, self.stats) - self._close_db() + with self._db_context() as db: + dbs.save_session_state(db, self.stats) except Exception as e: _log('failed to save session state: %s' % str(e), 'warn') @@ -2389,9 +2382,8 @@ class Proxywatchd(): self._last_snapshot = now if (now - self._last_snapshot) >= 3600: try: - self._prep_db() - dbs.save_stats_snapshot(self.mysqlite, self.stats) - self._close_db() + with self._db_context() as db: + dbs.save_stats_snapshot(db, self.stats) self._last_snapshot = now except Exception as e: _log('failed to save stats snapshot: %s' % str(e), 'warn')