proxywatchd: add priority queue for job scheduling
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
import threading
|
||||
import time, random, string, re, copy
|
||||
import Queue
|
||||
import heapq
|
||||
try:
|
||||
import IP2Location
|
||||
import os
|
||||
@@ -108,6 +109,91 @@ def try_div(a, b):
|
||||
if b != 0: return a/float(b)
|
||||
return 0
|
||||
|
||||
|
||||
class PriorityJobQueue(object):
|
||||
"""Priority queue for proxy test jobs.
|
||||
|
||||
Lower priority number = higher priority.
|
||||
Priority 0: New proxies (never tested)
|
||||
Priority 1: Recently working (no failures, has successes)
|
||||
Priority 2: Low fail count (< 3 failures)
|
||||
Priority 3: Medium fail count
|
||||
Priority 4: High fail count
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.heap = []
|
||||
self.lock = threading.Lock()
|
||||
self.not_empty = threading.Condition(self.lock)
|
||||
self.counter = 0 # tie-breaker for equal priorities
|
||||
|
||||
def put(self, job, priority=3):
|
||||
"""Add job with priority (lower = higher priority)."""
|
||||
with self.lock:
|
||||
heapq.heappush(self.heap, (priority, self.counter, job))
|
||||
self.counter += 1
|
||||
self.not_empty.notify()
|
||||
|
||||
def get(self, timeout=None):
|
||||
"""Get highest priority job. Raises Queue.Empty on timeout."""
|
||||
with self.not_empty:
|
||||
if timeout is None:
|
||||
while not self.heap:
|
||||
self.not_empty.wait()
|
||||
else:
|
||||
end_time = time.time() + timeout
|
||||
while not self.heap:
|
||||
remaining = end_time - time.time()
|
||||
if remaining <= 0:
|
||||
raise Queue.Empty()
|
||||
self.not_empty.wait(remaining)
|
||||
_, _, job = heapq.heappop(self.heap)
|
||||
return job
|
||||
|
||||
def get_nowait(self):
|
||||
"""Get job without waiting. Raises Queue.Empty if empty."""
|
||||
with self.lock:
|
||||
if not self.heap:
|
||||
raise Queue.Empty()
|
||||
_, _, job = heapq.heappop(self.heap)
|
||||
return job
|
||||
|
||||
def empty(self):
|
||||
"""Check if queue is empty."""
|
||||
with self.lock:
|
||||
return len(self.heap) == 0
|
||||
|
||||
def qsize(self):
|
||||
"""Return queue size."""
|
||||
with self.lock:
|
||||
return len(self.heap)
|
||||
|
||||
def task_done(self):
|
||||
"""Compatibility method (no-op for heap queue)."""
|
||||
pass
|
||||
|
||||
|
||||
def calculate_priority(failcount, success_count, max_fail):
|
||||
"""Calculate job priority based on proxy state.
|
||||
|
||||
Returns:
|
||||
int: Priority 0-4 (lower = higher priority)
|
||||
"""
|
||||
# New proxy (never successfully tested)
|
||||
if success_count == 0 and failcount == 0:
|
||||
return 0
|
||||
# Recently working (no current failures)
|
||||
if failcount == 0:
|
||||
return 1
|
||||
# Low fail count
|
||||
if failcount < 3:
|
||||
return 2
|
||||
# Medium fail count
|
||||
if failcount < max_fail // 2:
|
||||
return 3
|
||||
# High fail count
|
||||
return 4
|
||||
|
||||
def socks4_resolve(srvname, server_port):
|
||||
srv = srvname
|
||||
if srv in cached_dns:
|
||||
@@ -451,7 +537,7 @@ class Proxywatchd():
|
||||
self.stopped = threading.Event()
|
||||
|
||||
# shared work-stealing queues
|
||||
self.job_queue = Queue.Queue()
|
||||
self.job_queue = PriorityJobQueue()
|
||||
self.result_queue = Queue.Queue()
|
||||
|
||||
# track pending proxy states (for multi-target aggregation)
|
||||
@@ -530,9 +616,14 @@ class Proxywatchd():
|
||||
with self.pending_lock:
|
||||
self.pending_states.extend(new_states)
|
||||
|
||||
# queue all jobs
|
||||
# queue all jobs with priority
|
||||
for job in all_jobs:
|
||||
self.job_queue.put(job)
|
||||
priority = calculate_priority(
|
||||
job.proxy_state.failcount,
|
||||
job.proxy_state.success_count,
|
||||
config.watchd.max_fail
|
||||
)
|
||||
self.job_queue.put(job, priority)
|
||||
|
||||
self._close_db()
|
||||
proxy_count = len(new_states)
|
||||
|
||||
Reference in New Issue
Block a user