diff --git a/proxywatchd.py b/proxywatchd.py index 89594eb..0940391 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -880,16 +880,6 @@ class Proxywatchd(): except Exception as e: _log('failed to save MITM state: %s' % str(e), 'warn') - def _prep_db(self): - """Deprecated: Use _db_context() instead for new code.""" - self.mysqlite = mysqlite.mysqlite(config.watchd.database, str) - - def _close_db(self): - """Deprecated: Use _db_context() instead for new code.""" - if self.mysqlite: - self.mysqlite.close() - self.mysqlite = None - @contextlib.contextmanager def _db_context(self): """Context manager for database connections.""" @@ -1032,13 +1022,14 @@ class Proxywatchd(): if removed: _log('scaled down: -%d threads' % len(removed), 'scaler') - def fetch_rows(self): + def fetch_rows(self, db): + """Fetch proxy rows due for testing from database.""" self.isoldies = False q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' now = time.time() params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) _dbg('fetch_rows: params=(0, %d, %d, %d, %.0f)' % (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)) - rows = self.mysqlite.execute(q, params).fetchall() + rows = db.execute(q, params).fetchall() _dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads)) # check oldies ? if len(rows) < config.watchd.threads: @@ -1048,14 +1039,17 @@ class Proxywatchd(): self.isoldies = True ## disable tor safeguard for old proxies if self.tor_safeguard: self.tor_safeguard = False - rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall() + rows = db.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall() return rows def prepare_jobs(self): - self._prep_db() ## enable tor safeguard by default self.tor_safeguard = config.watchd.tor_safeguard - rows = self.fetch_rows() + + # Fetch rows within context manager scope + with self._db_context() as db: + rows = self.fetch_rows(db) + _dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes)) checktypes = config.watchd.checktypes @@ -1121,7 +1115,6 @@ class Proxywatchd(): ) self.job_queue.put(job, priority) - self._close_db() proxy_count = len(new_states) job_count = len(all_jobs) _dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count)) @@ -1219,23 +1212,26 @@ class Proxywatchd(): dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else '' _log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd') self.last_update_log = now - self._prep_db() - query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?' - self.mysqlite.executemany(query, args) - # Batch update latency metrics for successful proxies - if latency_updates: - dbs.batch_update_proxy_latency(self.mysqlite, latency_updates) - - # Batch update anonymity for proxies with exit IP data + # Build anonymity updates before DB context anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers) for job in self.collected if job.failcount == 0 and job.exit_ip] - if anonymity_updates: - dbs.batch_update_proxy_anonymity(self.mysqlite, anonymity_updates) - self.mysqlite.commit() - self._close_db() + with self._db_context() as db: + query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?' + db.executemany(query, args) + + # Batch update latency metrics for successful proxies + if latency_updates: + dbs.batch_update_proxy_latency(db, latency_updates) + + # Batch update anonymity for proxies with exit IP data + if anonymity_updates: + dbs.batch_update_proxy_anonymity(db, anonymity_updates) + + db.commit() + self.collected = [] self.totals['submitted'] += len(args) self.totals['success'] += sc @@ -1429,24 +1425,22 @@ class Proxywatchd(): config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd') # Log database status at startup - self._prep_db() - total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] - now = time.time() - due = self.mysqlite.execute( - 'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?', - (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) - ).fetchone()[0] + with self._db_context() as db: + total = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] + now = time.time() + due = db.execute( + 'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?', + (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now) + ).fetchone()[0] - # Create stats persistence tables - dbs.create_table_if_not_exists(self.mysqlite, 'stats_history') - dbs.create_table_if_not_exists(self.mysqlite, 'session_state') + # Create stats persistence tables + dbs.create_table_if_not_exists(db, 'stats_history') + dbs.create_table_if_not_exists(db, 'session_state') - # Load persisted session state - saved_state = dbs.load_session_state(self.mysqlite) - if saved_state: - self.stats.load_state(saved_state) - - self._close_db() + # Load persisted session state + saved_state = dbs.load_session_state(db) + if saved_state: + self.stats.load_state(saved_state) # Load MITM certificate state (same directory as database) db_dir = os.path.dirname(config.watchd.database) or '.'