Phase 2: code cleanup and simplification
All checks were successful
CI / syntax-check (push) Successful in 3s
CI / memory-leak-check (push) Successful in 11s

- Remove unused result_queue from WorkerThread and worker mode
- Remove num_targets abstraction, simplify to single-target mode
- Add _db_context() context manager for database connections
- Refactor 5 call sites to use context manager (finish, init, cleanup_stale, periodic saves)
- Mark _prep_db/_close_db as deprecated
- Add __version__ = '2.0.0' to ppf.py
- Add thread spawn stagger (0-100ms) in worker mode for Tor-friendly startup
This commit is contained in:
Username
2025-12-28 14:31:37 +01:00
parent 72a2dcdaf4
commit d219cc567f
2 changed files with 54 additions and 61 deletions

5
ppf.py
View File

@@ -1,5 +1,7 @@
#!/usr/bin/env python2
__version__ = '2.0.0'
import sys
import os
@@ -481,13 +483,12 @@ def worker_main(config):
# Create shared queues for worker threads
job_queue = proxywatchd.PriorityJobQueue()
result_queue = Queue.Queue()
completion_queue = Queue.Queue()
# Spawn worker threads with stagger to avoid overwhelming Tor
threads = []
for i in range(num_threads):
wt = proxywatchd.WorkerThread('w%d' % i, job_queue, result_queue)
wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
wt.start_thread()
threads.append(wt)
time.sleep(random.random() / 10) # 0-100ms stagger per thread

View File

@@ -17,6 +17,7 @@ import network_stats
import os
import ssl
import contextlib
try:
import Queue
except ImportError:
@@ -1133,13 +1134,12 @@ def socks4_resolve(srvname, server_port):
class ProxyTestState():
"""Thread-safe state for a proxy being tested against multiple targets.
"""Thread-safe state for a proxy being tested.
Results from TargetTestJob instances are aggregated here.
When all tests complete, evaluate() determines final pass/fail.
Holds test results and evaluates final pass/fail status.
"""
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
country, mitm, consecutive_success, asn=None, num_targets=3, oldies=False,
country, mitm, consecutive_success, asn=None, oldies=False,
completion_queue=None, proxy_full=None):
self.ip = ip
self.port = int(port)
@@ -1160,7 +1160,6 @@ class ProxyTestState():
self.consecutive_success = consecutive_success
self.asn = asn
self.isoldies = oldies
self.num_targets = num_targets
self.completion_queue = completion_queue # for signaling completion
# thread-safe result accumulation
@@ -1205,8 +1204,8 @@ class ProxyTestState():
# Track cert errors
if category in ('cert_error', 'ssl_error', 'ssl_mitm'):
self.cert_error = True
# Check completion (inside lock to prevent race)
if not self.completed and len(self.results) >= self.num_targets:
# Check completion (single-target mode)
if not self.completed and len(self.results) >= 1:
self.completed = True
should_signal = True
# Signal outside lock to avoid deadlock
@@ -1220,7 +1219,7 @@ class ProxyTestState():
return True
# Slow path: check with lock (only during transition)
with self.lock:
return len(self.results) >= self.num_targets
return len(self.results) >= 1
@staticmethod
def rwip(ip):
@@ -1313,10 +1312,9 @@ class ProxyTestState():
anon_status = ' anon: anonymous;'
else:
anon_status = ' anon: anonymous;' # default if no header check
_log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s; %d/%d targets' % (
_log('%s://%s:%d (%s) d: %.2f sec(s);%s%s srv: %s; ssl: %s' % (
last_good['proto'], self.ip, self.port, self.country,
last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl']),
num_success, self.num_targets), 'info')
last_good['duration'], torstats, anon_status, last_good['srv'], str(last_good['ssl'])), 'info')
_dbg('PASS: failcount=0', self.proxy)
return (True, None)
@@ -1650,12 +1648,11 @@ class TargetTestJob():
class WorkerThread():
def __init__(self, id, job_queue, result_queue):
def __init__(self, id, job_queue):
self.id = id
self.done = threading.Event()
self.thread = None
self.job_queue = job_queue # shared input queue
self.result_queue = result_queue # shared output queue
def stop(self):
    """Request a cooperative shutdown by setting the done event.

    Does not interrupt an in-progress job; the worker loop is expected
    to observe the event and exit on its own.
    """
    self.done.set()
def term(self):
@@ -1678,7 +1675,6 @@ class WorkerThread():
spent = time.time() - nao
job_count += 1
duration_total += spent
self.result_queue.put(job)
self.job_queue.task_done()
if self.thread and job_count > 0:
avg_t = try_div(duration_total, job_count)
@@ -1709,10 +1705,9 @@ class Proxywatchd():
# Final save of session state before exit
try:
self._prep_db()
dbs.save_session_state(self.mysqlite, self.stats)
dbs.save_stats_snapshot(self.mysqlite, self.stats)
self._close_db()
with self._db_context() as db:
dbs.save_session_state(db, self.stats)
dbs.save_stats_snapshot(db, self.stats)
_log('session state saved', 'watchd')
except Exception as e:
_log('failed to save final session state: %s' % str(e), 'warn')
@@ -1725,11 +1720,24 @@ class Proxywatchd():
_log('failed to save MITM state: %s' % str(e), 'warn')
def _prep_db(self):
    """Deprecated: Use _db_context() instead for new code."""
    # Open a connection to the watchd database and keep it on
    # self.mysqlite until the paired _close_db() call.
    self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)
def _close_db(self):
    """Deprecated: Use _db_context() instead for new code."""
    # Close and clear the connection opened by _prep_db().
    # Safe to call when no connection is open (self.mysqlite falsy).
    if self.mysqlite:
        self.mysqlite.close()
        self.mysqlite = None
@contextlib.contextmanager
def _db_context(self):
    """Context manager for database connections.

    Opens a fresh connection to the watchd database, yields it to the
    caller, and guarantees the connection is closed on exit even if the
    managed block raises. Preferred over the _prep_db()/_close_db()
    pair, which leaks the connection if an exception occurs between
    the two calls.
    """
    db = mysqlite.mysqlite(config.watchd.database, str)
    try:
        yield db
    finally:
        # Always release the connection, success or failure.
        db.close()
def __init__(self):
config.load()
self.in_background = False
@@ -1739,17 +1747,15 @@ class Proxywatchd():
# shared work-stealing queues
self.job_queue = PriorityJobQueue()
self.result_queue = Queue.Queue()
self.completion_queue = Queue.Queue() # completed ProxyTestState objects
# track pending proxy states (for multi-target aggregation)
self.pending_states = {} # dict: proxy -> ProxyTestState (O(1) lookup/removal)
# track pending proxy states
self.pending_states = {} # dict: proxy -> ProxyTestState
self.pending_lock = threading.Lock()
# create table if needed (use dbs.py for canonical schema)
self._prep_db()
dbs.create_table_if_not_exists(self.mysqlite, 'proxylist')
self._close_db()
with self._db_context() as db:
dbs.create_table_if_not_exists(db, 'proxylist')
self.submit_after = config.watchd.submit_after # number of collected jobs before writing db
self.collected = [] # completed ProxyTestState objects ready for DB
@@ -1828,7 +1834,7 @@ class Proxywatchd():
"""Spawn a new worker thread."""
self.thread_id_counter += 1
threadid = 'dyn%d' % self.thread_id_counter
wt = WorkerThread(threadid, self.job_queue, self.result_queue)
wt = WorkerThread(threadid, self.job_queue)
wt.start_thread()
self.threads.append(wt)
return threadid
@@ -1888,7 +1894,6 @@ class Proxywatchd():
rows = self.fetch_rows()
_dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
checktypes = config.watchd.checktypes
num_targets = 1
# Build target pools for each checktype
target_pools = {}
@@ -1918,7 +1923,7 @@ class Proxywatchd():
# country, mitm, consecutive_success, asn, proxy
state = ProxyTestState(
row[0], row[1], row[2], row[3], row[4], row[5],
row[6], row[7], row[8], asn=row[9], num_targets=num_targets,
row[6], row[7], row[8], asn=row[9],
oldies=self.isoldies, completion_queue=self.completion_queue,
proxy_full=row[10]
)
@@ -1928,13 +1933,12 @@ class Proxywatchd():
checktype = random.choice(checktypes)
target_pool = target_pools[checktype]
# select random targets for this proxy
targets = random.sample(target_pool, min(num_targets, len(target_pool)))
# select single target (single-target mode)
target = random.choice(target_pool)
# create one job per target
for target in targets:
job = TargetTestJob(state, target, checktype)
all_jobs.append(job)
# create job for this proxy
job = TargetTestJob(state, target, checktype)
all_jobs.append(job)
# shuffle to interleave tests across different proxies
random.shuffle(all_jobs)
@@ -1959,20 +1963,11 @@ class Proxywatchd():
_dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
now = time.time()
if proxy_count > 0 and (now - self.last_jobs_log) >= self.log_interval:
_log("created %d jobs for %d proxies (%d targets each)" % (
job_count, proxy_count, num_targets), 'watchd')
_log("created %d jobs for %d proxies" % (job_count, proxy_count), 'watchd')
self.last_jobs_log = now
return job_count
def collect_work(self):
# drain results from shared result queue (TargetTestJob objects)
# results are already recorded in their ProxyTestState
while True:
try:
self.result_queue.get_nowait()
except Queue.Empty:
break
# process completed states from completion queue (event-driven, not polling)
# ProxyTestState.record_result() pushes to completion_queue when all targets done
completed_count = 0
@@ -2086,15 +2081,14 @@ class Proxywatchd():
"""Remove proxies that have been dead for too long."""
stale_seconds = config.watchd.stale_days * 86400
cutoff = int(time.time()) - stale_seconds
self._prep_db()
# delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff
result = self.mysqlite.execute(
'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?',
(config.watchd.max_fail, DEAD_PROXY, cutoff)
)
count = result.rowcount if hasattr(result, 'rowcount') else 0
self.mysqlite.commit()
self._close_db()
with self._db_context() as db:
# delete proxies that: (failed >= max_fail OR permanently dead) AND last tested before cutoff
result = db.execute(
'DELETE FROM proxylist WHERE (failed >= ? OR failed = ?) AND tested < ?',
(config.watchd.max_fail, DEAD_PROXY, cutoff)
)
count = result.rowcount if hasattr(result, 'rowcount') else 0
db.commit()
if count > 0:
_log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd')
self.last_cleanup = time.time()
@@ -2313,7 +2307,7 @@ class Proxywatchd():
# create worker threads with shared queues
for i in range(config.watchd.threads):
threadid = ''.join([random.choice(string.ascii_letters) for x in range(5)])
wt = WorkerThread(threadid, self.job_queue, self.result_queue)
wt = WorkerThread(threadid, self.job_queue)
if self.in_background:
wt.start_thread()
self.threads.append(wt)
@@ -2371,9 +2365,8 @@ class Proxywatchd():
# Save session state periodically (every stats_interval, default 5m)
try:
self._prep_db()
dbs.save_session_state(self.mysqlite, self.stats)
self._close_db()
with self._db_context() as db:
dbs.save_session_state(db, self.stats)
except Exception as e:
_log('failed to save session state: %s' % str(e), 'warn')
@@ -2389,9 +2382,8 @@ class Proxywatchd():
self._last_snapshot = now
if (now - self._last_snapshot) >= 3600:
try:
self._prep_db()
dbs.save_stats_snapshot(self.mysqlite, self.stats)
self._close_db()
with self._db_context() as db:
dbs.save_stats_snapshot(db, self.stats)
self._last_snapshot = now
except Exception as e:
_log('failed to save stats snapshot: %s' % str(e), 'warn')