proxywatchd: use context manager for all DB operations
This commit is contained in:
@@ -880,16 +880,6 @@ class Proxywatchd():
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
_log('failed to save MITM state: %s' % str(e), 'warn')
|
_log('failed to save MITM state: %s' % str(e), 'warn')
|
||||||
|
|
||||||
def _prep_db(self):
|
|
||||||
"""Deprecated: Use _db_context() instead for new code."""
|
|
||||||
self.mysqlite = mysqlite.mysqlite(config.watchd.database, str)
|
|
||||||
|
|
||||||
def _close_db(self):
|
|
||||||
"""Deprecated: Use _db_context() instead for new code."""
|
|
||||||
if self.mysqlite:
|
|
||||||
self.mysqlite.close()
|
|
||||||
self.mysqlite = None
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _db_context(self):
|
def _db_context(self):
|
||||||
"""Context manager for database connections."""
|
"""Context manager for database connections."""
|
||||||
@@ -1032,13 +1022,14 @@ class Proxywatchd():
|
|||||||
if removed:
|
if removed:
|
||||||
_log('scaled down: -%d threads' % len(removed), 'scaler')
|
_log('scaled down: -%d threads' % len(removed), 'scaler')
|
||||||
|
|
||||||
def fetch_rows(self):
|
def fetch_rows(self, db):
|
||||||
|
"""Fetch proxy rows due for testing from database."""
|
||||||
self.isoldies = False
|
self.isoldies = False
|
||||||
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
|
q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success,asn,proxy FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()'
|
||||||
now = time.time()
|
now = time.time()
|
||||||
params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
params = (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
||||||
_dbg('fetch_rows: params=(0, %d, %d, %d, %.0f)' % (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now))
|
_dbg('fetch_rows: params=(0, %d, %d, %d, %.0f)' % (config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now))
|
||||||
rows = self.mysqlite.execute(q, params).fetchall()
|
rows = db.execute(q, params).fetchall()
|
||||||
_dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))
|
_dbg('fetch_rows: got %d rows, threads=%d' % (len(rows), config.watchd.threads))
|
||||||
# check oldies ?
|
# check oldies ?
|
||||||
if len(rows) < config.watchd.threads:
|
if len(rows) < config.watchd.threads:
|
||||||
@@ -1048,14 +1039,17 @@ class Proxywatchd():
|
|||||||
self.isoldies = True
|
self.isoldies = True
|
||||||
## disable tor safeguard for old proxies
|
## disable tor safeguard for old proxies
|
||||||
if self.tor_safeguard: self.tor_safeguard = False
|
if self.tor_safeguard: self.tor_safeguard = False
|
||||||
rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall()
|
rows = db.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall()
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
def prepare_jobs(self):
|
def prepare_jobs(self):
|
||||||
self._prep_db()
|
|
||||||
## enable tor safeguard by default
|
## enable tor safeguard by default
|
||||||
self.tor_safeguard = config.watchd.tor_safeguard
|
self.tor_safeguard = config.watchd.tor_safeguard
|
||||||
rows = self.fetch_rows()
|
|
||||||
|
# Fetch rows within context manager scope
|
||||||
|
with self._db_context() as db:
|
||||||
|
rows = self.fetch_rows(db)
|
||||||
|
|
||||||
_dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
|
_dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
|
||||||
checktypes = config.watchd.checktypes
|
checktypes = config.watchd.checktypes
|
||||||
|
|
||||||
@@ -1121,7 +1115,6 @@ class Proxywatchd():
|
|||||||
)
|
)
|
||||||
self.job_queue.put(job, priority)
|
self.job_queue.put(job, priority)
|
||||||
|
|
||||||
self._close_db()
|
|
||||||
proxy_count = len(new_states)
|
proxy_count = len(new_states)
|
||||||
job_count = len(all_jobs)
|
job_count = len(all_jobs)
|
||||||
_dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
|
_dbg('prepare_jobs: created %d jobs for %d proxies' % (job_count, proxy_count))
|
||||||
@@ -1219,23 +1212,26 @@ class Proxywatchd():
|
|||||||
dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else ''
|
dead_msg = ', %d marked dead' % dead_count if dead_count > 0 else ''
|
||||||
_log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd')
|
_log("updating %d DB entries (success rate: %.2f%%%s)" % (len(self.collected), success_rate, dead_msg), 'watchd')
|
||||||
self.last_update_log = now
|
self.last_update_log = now
|
||||||
self._prep_db()
|
|
||||||
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
|
|
||||||
self.mysqlite.executemany(query, args)
|
|
||||||
|
|
||||||
# Batch update latency metrics for successful proxies
|
# Build anonymity updates before DB context
|
||||||
if latency_updates:
|
|
||||||
dbs.batch_update_proxy_latency(self.mysqlite, latency_updates)
|
|
||||||
|
|
||||||
# Batch update anonymity for proxies with exit IP data
|
|
||||||
anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers)
|
anonymity_updates = [(job.proxy, job.exit_ip, job.ip, job.reveals_headers)
|
||||||
for job in self.collected
|
for job in self.collected
|
||||||
if job.failcount == 0 and job.exit_ip]
|
if job.failcount == 0 and job.exit_ip]
|
||||||
if anonymity_updates:
|
|
||||||
dbs.batch_update_proxy_anonymity(self.mysqlite, anonymity_updates)
|
|
||||||
|
|
||||||
self.mysqlite.commit()
|
with self._db_context() as db:
|
||||||
self._close_db()
|
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
|
||||||
|
db.executemany(query, args)
|
||||||
|
|
||||||
|
# Batch update latency metrics for successful proxies
|
||||||
|
if latency_updates:
|
||||||
|
dbs.batch_update_proxy_latency(db, latency_updates)
|
||||||
|
|
||||||
|
# Batch update anonymity for proxies with exit IP data
|
||||||
|
if anonymity_updates:
|
||||||
|
dbs.batch_update_proxy_anonymity(db, anonymity_updates)
|
||||||
|
|
||||||
|
db.commit()
|
||||||
|
|
||||||
self.collected = []
|
self.collected = []
|
||||||
self.totals['submitted'] += len(args)
|
self.totals['submitted'] += len(args)
|
||||||
self.totals['success'] += sc
|
self.totals['success'] += sc
|
||||||
@@ -1429,24 +1425,22 @@ class Proxywatchd():
|
|||||||
config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd')
|
config.watchd.checktime, config.watchd.perfail_checktime, config.watchd.max_fail), 'watchd')
|
||||||
|
|
||||||
# Log database status at startup
|
# Log database status at startup
|
||||||
self._prep_db()
|
with self._db_context() as db:
|
||||||
total = self.mysqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
|
total = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
|
||||||
now = time.time()
|
now = time.time()
|
||||||
due = self.mysqlite.execute(
|
due = db.execute(
|
||||||
'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
|
'SELECT COUNT(*) FROM proxylist WHERE failed >= 0 and failed < ? and (tested + ? + (failed * ?)) < ?',
|
||||||
(config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
(config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, now)
|
||||||
).fetchone()[0]
|
).fetchone()[0]
|
||||||
|
|
||||||
# Create stats persistence tables
|
# Create stats persistence tables
|
||||||
dbs.create_table_if_not_exists(self.mysqlite, 'stats_history')
|
dbs.create_table_if_not_exists(db, 'stats_history')
|
||||||
dbs.create_table_if_not_exists(self.mysqlite, 'session_state')
|
dbs.create_table_if_not_exists(db, 'session_state')
|
||||||
|
|
||||||
# Load persisted session state
|
# Load persisted session state
|
||||||
saved_state = dbs.load_session_state(self.mysqlite)
|
saved_state = dbs.load_session_state(db)
|
||||||
if saved_state:
|
if saved_state:
|
||||||
self.stats.load_state(saved_state)
|
self.stats.load_state(saved_state)
|
||||||
|
|
||||||
self._close_db()
|
|
||||||
|
|
||||||
# Load MITM certificate state (same directory as database)
|
# Load MITM certificate state (same directory as database)
|
||||||
db_dir = os.path.dirname(config.watchd.database) or '.'
|
db_dir = os.path.dirname(config.watchd.database) or '.'
|
||||||
|
|||||||
Reference in New Issue
Block a user