diff --git a/proxywatchd.py b/proxywatchd.py index 1c6f936..f661590 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -3,6 +3,7 @@ import threading import time, random, string, re, copy import Queue +import heapq try: import IP2Location import os @@ -108,6 +109,91 @@ def try_div(a, b): if b != 0: return a/float(b) return 0 + +class PriorityJobQueue(object): + """Priority queue for proxy test jobs. + + Lower priority number = higher priority. + Priority 0: New proxies (never tested) + Priority 1: Recently working (no failures, has successes) + Priority 2: Low fail count (< 3 failures) + Priority 3: Medium fail count + Priority 4: High fail count + """ + + def __init__(self): + self.heap = [] + self.lock = threading.Lock() + self.not_empty = threading.Condition(self.lock) + self.counter = 0 # tie-breaker for equal priorities + + def put(self, job, priority=3): + """Add job with priority (lower = higher priority).""" + with self.lock: + heapq.heappush(self.heap, (priority, self.counter, job)) + self.counter += 1 + self.not_empty.notify() + + def get(self, timeout=None): + """Get highest priority job. Raises Queue.Empty on timeout.""" + with self.not_empty: + if timeout is None: + while not self.heap: + self.not_empty.wait() + else: + end_time = time.time() + timeout + while not self.heap: + remaining = end_time - time.time() + if remaining <= 0: + raise Queue.Empty() + self.not_empty.wait(remaining) + _, _, job = heapq.heappop(self.heap) + return job + + def get_nowait(self): + """Get job without waiting. Raises Queue.Empty if empty.""" + with self.lock: + if not self.heap: + raise Queue.Empty() + _, _, job = heapq.heappop(self.heap) + return job + + def empty(self): + """Check if queue is empty.""" + with self.lock: + return len(self.heap) == 0 + + def qsize(self): + """Return queue size.""" + with self.lock: + return len(self.heap) + + def task_done(self): + """Compatibility method (no-op for heap queue).""" + pass + + +def calculate_priority(failcount, success_count, max_fail): + """Calculate job priority based on proxy state. + + Returns: + int: Priority 0-4 (lower = higher priority) + """ + # New proxy (never successfully tested) + if success_count == 0 and failcount == 0: + return 0 + # Recently working (no current failures) + if failcount == 0: + return 1 + # Low fail count + if failcount < 3: + return 2 + # Medium fail count + if failcount < max_fail // 2: + return 3 + # High fail count + return 4 + def socks4_resolve(srvname, server_port): srv = srvname if srv in cached_dns: @@ -451,7 +537,7 @@ class Proxywatchd(): self.stopped = threading.Event() # shared work-stealing queues - self.job_queue = Queue.Queue() + self.job_queue = PriorityJobQueue() self.result_queue = Queue.Queue() # track pending proxy states (for multi-target aggregation) @@ -530,9 +616,14 @@ class Proxywatchd(): with self.pending_lock: self.pending_states.extend(new_states) - # queue all jobs + # queue all jobs with priority for job in all_jobs: - self.job_queue.put(job) + priority = calculate_priority( + job.proxy_state.failcount, + job.proxy_state.success_count, + config.watchd.max_fail + ) + self.job_queue.put(job, priority) self._close_db() proxy_count = len(new_states)