add configurable thread scaling and queue counter reset
- Add scale_cooldown and scale_threshold config options
- ThreadScaler now reads scaling params from config
- Reset priority queue counter when queue empties to prevent unbounded growth
This commit is contained in:
@@ -67,6 +67,10 @@ source_file = servers.txt
|
||||
# Enable Tor circuit health monitoring
|
||||
tor_safeguard = 1
|
||||
|
||||
# Thread scaling
|
||||
scale_cooldown = 10
|
||||
scale_threshold = 10.0
|
||||
|
||||
# Debug output
|
||||
debug = 0
|
||||
|
||||
@@ -95,6 +99,10 @@ max_fail = 5
|
||||
# Only extract URLs from same domain
|
||||
extract_samedomain = 0
|
||||
|
||||
# Max age in days for proxy list URLs
|
||||
# URLs older than this that never produced proxies are skipped
|
||||
list_max_age_days = 7
|
||||
|
||||
# Debug output
|
||||
debug = 0
|
||||
|
||||
@@ -140,3 +148,15 @@ listenip = 127.0.0.1
|
||||
|
||||
# Listen port
|
||||
port = 8081
|
||||
|
||||
[worker]
|
||||
# Distributed worker settings (for --worker mode)
|
||||
|
||||
# Proxies per work batch
|
||||
batch_size = 100
|
||||
|
||||
# Heartbeat interval in seconds
|
||||
heartbeat = 60
|
||||
|
||||
# Seconds before unclaimed work is released
|
||||
claim_timeout = 300
|
||||
|
||||
19
config.py
19
config.py
@@ -37,9 +37,9 @@ class Config(ComboParser):
|
||||
if self.ppf.timeout <= 0:
|
||||
errors.append('ppf.timeout must be > 0')
|
||||
|
||||
# Validate thread counts
|
||||
if self.watchd.threads < 1:
|
||||
errors.append('watchd.threads must be >= 1')
|
||||
# Validate thread counts (0 allowed for watchd to disable local testing)
|
||||
if self.watchd.threads < 0:
|
||||
errors.append('watchd.threads must be >= 0')
|
||||
if self.ppf.threads < 1:
|
||||
errors.append('ppf.threads must be >= 1')
|
||||
if self.scraper.threads < 1:
|
||||
@@ -112,6 +112,8 @@ class Config(ComboParser):
|
||||
self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False)
|
||||
self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False)
|
||||
self.add_item(section, 'checktype', str, 'ssl', 'check type(s): irc, head, judges, ssl (comma-separated for random)', False)
|
||||
self.add_item(section, 'scale_cooldown', int, 10, 'seconds between thread scaling decisions (default: 10)', False)
|
||||
self.add_item(section, 'scale_threshold', float, 10.0, 'min success rate % to scale up threads (default: 10.0)', False)
|
||||
|
||||
section = 'httpd'
|
||||
self.add_item(section, 'listenip', str, '127.0.0.1', 'address for the httpd to listen to (default: 127.0.0.1)', True)
|
||||
@@ -129,6 +131,7 @@ class Config(ComboParser):
|
||||
self.add_item(section, 'max_fail', int, 5, 'number of fails after which an url is considered dead', False)
|
||||
self.add_item(section, 'database', str, 'proxies.sqlite', 'filename of database', True)
|
||||
self.add_item(section, 'extract_samedomain', bool, False, 'extract only url from same domains? (default: False)', False)
|
||||
self.add_item(section, 'list_max_age_days', int, 7, 'max age in days for proxy list URLs (default: 7)', False)
|
||||
|
||||
section = 'scraper'
|
||||
self.add_item(section, 'enabled', bool, True, 'enable search engine scraper (default: True)', False)
|
||||
@@ -143,8 +146,18 @@ class Config(ComboParser):
|
||||
self.add_item(section, 'libretranslate_url', str, 'https://lt.mymx.me/translate', 'LibreTranslate API URL (default: https://lt.mymx.me/translate)', False)
|
||||
self.add_item(section, 'libretranslate_enabled', bool, False, 'enable LibreTranslate for dynamic translations (default: False)', False)
|
||||
|
||||
section = 'worker'
|
||||
self.add_item(section, 'batch_size', int, 100, 'proxies per work batch (default: 100)', False)
|
||||
self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False)
|
||||
self.add_item(section, 'claim_timeout', int, 300, 'seconds before unclaimed work is released (default: 300)', False)
|
||||
|
||||
self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False)
|
||||
self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False)
|
||||
self.aparser.add_argument("-q", "--quiet", help="suppress info messages, show warnings and errors only", action='store_true', default=False)
|
||||
self.aparser.add_argument("-v", "--verbose", help="show debug messages", action='store_true', default=False)
|
||||
self.aparser.add_argument("--profile", help="enable cProfile profiling, output to profile.stats", action='store_true', default=False)
|
||||
self.aparser.add_argument("--worker", help="run as worker node", action='store_true', default=False)
|
||||
self.aparser.add_argument("--server", help="master server URL (e.g., https://master:8081)", type=str, default='')
|
||||
self.aparser.add_argument("--worker-key", help="worker authentication key", type=str, default='')
|
||||
self.aparser.add_argument("--register", help="register as worker with master server", action='store_true', default=False)
|
||||
self.aparser.add_argument("--worker-name", help="worker name for registration (default: hostname)", type=str, default='')
|
||||
|
||||
@@ -963,6 +963,9 @@ class PriorityJobQueue(object):
|
||||
def put(self, job, priority=3):
|
||||
"""Add job with priority (lower = higher priority)."""
|
||||
with self.lock:
|
||||
# Reset counter when queue was empty to prevent unbounded growth
|
||||
if not self.heap:
|
||||
self.counter = 0
|
||||
heapq.heappush(self.heap, (priority, self.counter, job))
|
||||
self.counter += 1
|
||||
self.not_empty.notify()
|
||||
@@ -1042,13 +1045,14 @@ class ThreadScaler(object):
|
||||
- Success rate drops below threshold
|
||||
"""
|
||||
|
||||
def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5):
|
||||
def __init__(self, min_threads=5, max_threads=100, target_queue_per_thread=5,
|
||||
scale_cooldown=10, scale_threshold=10.0):
|
||||
self.min_threads = min_threads
|
||||
self.max_threads = max_threads
|
||||
self.target_queue_per_thread = target_queue_per_thread
|
||||
self.last_scale_time = 0
|
||||
self.scale_cooldown = 10 # seconds between scaling (faster with greenlets)
|
||||
self.success_threshold = 10.0 # minimum success rate % to scale up
|
||||
self.scale_cooldown = scale_cooldown
|
||||
self.success_threshold = scale_threshold
|
||||
|
||||
def should_scale(self, current_threads, queue_size, stats):
|
||||
"""Determine if scaling is needed.
|
||||
@@ -1771,8 +1775,10 @@ class Proxywatchd():
|
||||
min_t = config.watchd.min_threads if config.watchd.min_threads > 0 else max(5, config.watchd.threads // 4)
|
||||
self.scaler = ThreadScaler(
|
||||
min_threads=min_t,
|
||||
max_threads=config.watchd.threads, # respect configured thread limit
|
||||
target_queue_per_thread=5
|
||||
max_threads=config.watchd.threads,
|
||||
target_queue_per_thread=5,
|
||||
scale_cooldown=config.watchd.scale_cooldown,
|
||||
scale_threshold=config.watchd.scale_threshold
|
||||
)
|
||||
self.thread_id_counter = 0
|
||||
self.last_jobs_log = 0
|
||||
|
||||
Reference in New Issue
Block a user