proxywatchd: always use a new mysql obj

could prevent memleaks, also helps thread-ownership issues.
This commit is contained in:
rofl0r
2019-01-05 18:14:41 +00:00
parent cb342c3818
commit 74d9d965bb

View File

@@ -132,13 +132,18 @@ class Proxywatchd():
wt.term() wt.term()
self.collect_work() self.collect_work()
self.submit_collected() self.submit_collected()
self.mysqlite.close()
self.stopped.set() self.stopped.set()
def finish(self): def finish(self):
if not self.in_background: self._cleanup() if not self.in_background: self._cleanup()
while not self.stopped.is_set(): time.sleep(0.1) while not self.stopped.is_set(): time.sleep(0.1)
def _prep_db(self):
self.mysqlite = mysqlite.mysqlite(config.database, str)
def _close_db(self):
if self.mysqlite:
self.mysqlite.close()
self.mysqlite = None
def __init__(self): def __init__(self):
config.load() config.load()
self.in_background = False self.in_background = False
@@ -147,11 +152,10 @@ class Proxywatchd():
self.stopped = threading.Event() self.stopped = threading.Event()
# create table if needed # create table if needed
self.mysqlite = mysqlite.mysqlite(config.database, str) self._prep_db()
self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, duration INT)') self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, duration INT)')
self.mysqlite.commit() self.mysqlite.commit()
self.mysqlite.close() self._close_db()
self.mysqlite = None
self.submit_after = 200 # number of collected jobs before writing db self.submit_after = 200 # number of collected jobs before writing db
self.echoise = time.time() - 3600; self.echoise = time.time() - 3600;
@@ -160,21 +164,25 @@ class Proxywatchd():
self.collected = [] self.collected = []
def prepare_jobs(self): def prepare_jobs(self):
self._prep_db()
q = 'SELECT proxy,proto,failed FROM proxylist WHERE failed<? and tested<? ORDER BY RANDOM()' # ' LIMIT ?' q = 'SELECT proxy,proto,failed FROM proxylist WHERE failed<? and tested<? ORDER BY RANDOM()' # ' LIMIT ?'
rows = self.mysqlite.execute(q, (config.maxfail, time.time())).fetchall() rows = self.mysqlite.execute(q, (config.maxfail, time.time())).fetchall()
for row in rows: for row in rows:
job = WorkerJob(row[0], row[1], row[2]) job = WorkerJob(row[0], row[1], row[2])
self.jobs.append(job) self.jobs.append(job)
self._close_db()
def collect_work(self): def collect_work(self):
for wt in self.threads: for wt in self.threads:
self.collected.extend(wt.collect()) self.collected.extend(wt.collect())
def submit_collected(self): def submit_collected(self):
self._prep_db()
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,duration=? WHERE proxy=?' query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,duration=? WHERE proxy=?'
args = [ (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.duration, job.proxy) for job in self.collected ] args = [ (job.failcount, job.nextcheck, 1, 'unknown', job.proto, job.duration, job.proxy) for job in self.collected ]
self.mysqlite.executemany(query, args) self.mysqlite.executemany(query, args)
self.mysqlite.commit() self.mysqlite.commit()
self._close_db()
self.collected = [] self.collected = []
def start(self): def start(self):
@@ -194,7 +202,6 @@ class Proxywatchd():
def _run(self): def _run(self):
_log('Starting proxywatchd..', 'notice') _log('Starting proxywatchd..', 'notice')
self.mysqlite = mysqlite.mysqlite(config.database, str)
for i in range(config.watchd_threads): for i in range(config.watchd_threads):
threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] ) threadid = ''.join( [ random.choice(string.letters) for x in range(5) ] )