feat: worker-driven discovery and validation tightening #1
101
httpd.py
101
httpd.py
@@ -80,6 +80,11 @@ _master_key = None # master key for worker registration
|
||||
_claim_timeout = 300  # seconds before unclaimed proxy work is released back to the pool
_workers_file = 'data/workers.json'  # persistent storage for registered-worker state

# URL claim tracking (parallel to the proxy claim tracking above).
_url_claims = {}  # url -> {worker_id, claimed_at}; guarded by _url_claims_lock
_url_claims_lock = threading.Lock()  # protects all reads/writes of _url_claims
_url_claim_timeout = 600  # 10 min (URLs take longer to fetch+extract than a proxy test)

# Test rate tracking: worker_id -> list of (timestamp, count) tuples;
# guarded by _worker_test_history_lock.
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
|
||||
@@ -425,6 +430,79 @@ def claim_work(db, worker_id, count=100):
|
||||
|
||||
return claimed
|
||||
|
||||
def claim_urls(url_db, worker_id, count=5):
    """Claim a batch of source URLs for worker-driven fetching.

    Selects due URLs from the ``uris`` table using the same scheduling
    formula as the ppf.py main loop, skips URLs already claimed by any
    worker, records the new claims in the module-level ``_url_claims``
    map, and returns up to ``count`` dicts of the form
    ``{'url', 'last_hash', 'proto_hint'}``.

    url_db    -- open handle to the URL database (mysqlite wrapper)
    worker_id -- id of the claiming worker, stored with each claim
    count     -- maximum number of URLs to hand out (default 5)

    Returns [] on any database error (the error is logged).
    """
    now = time.time()
    now_int = int(now)

    # Imported lazily to avoid a circular dependency at module level.
    try:
        from fetch import detect_proto_from_path
    except ImportError:
        detect_proto_from_path = None

    # Release claims older than _url_claim_timeout so URLs held by
    # crashed/vanished workers become claimable again.
    with _url_claims_lock:
        stale = [k for k, v in _url_claims.items()
                 if now - v['claimed_at'] > _url_claim_timeout]
        for k in stale:
            del _url_claims[k]

    # Reuse the same scheduling formula as the ppf.py main loop:
    #   error < max_fail
    #   check_time + checktime + ((error + stale_count) * perfail_checktime) < now
    #   added > min_added OR proxies_added > 0
    # Defaults match config.ppf: checktime=3600, perfail_checktime=3600,
    # max_fail=10, list_max_age_days=30.
    checktime = 3600
    perfail_checktime = 3600
    max_fail = 10
    list_max_age_seconds = 30 * 86400
    min_added = now_int - list_max_age_seconds

    try:
        # Over-select (count * 3) so the already-claimed filter below
        # still usually leaves a full batch.
        rows = url_db.execute(
            '''SELECT url, content_hash, check_time, error, stale_count,
                      retrievals, proxies_added
               FROM uris
               WHERE error < ?
                 AND (check_time + ? + ((error + stale_count) * ?) < ?)
                 AND (added > ? OR proxies_added > 0)
               ORDER BY RANDOM()
               LIMIT ?''',
            (max_fail, checktime, perfail_checktime, now_int, min_added, count * 3)
        ).fetchall()
    except Exception as e:
        _log('claim_urls query error: %s' % e, 'error')
        return []

    # Lock in new claims.  Fix: check the live _url_claims dict only,
    # instead of also consulting a snapshot taken under the first lock.
    # The snapshot was both redundant (everything in it was also in
    # _url_claims at snapshot time) and stale: a URL whose claim was
    # released between the two critical sections would have been
    # skipped for a whole cycle.
    claimed = []
    with _url_claims_lock:
        for row in rows:
            url = row[0]
            if url in _url_claims:
                continue
            _url_claims[url] = {'worker_id': worker_id, 'claimed_at': now}

            # Best-effort protocol hint derived from the URL path.
            proto_hint = None
            if detect_proto_from_path:
                proto_hint = detect_proto_from_path(url)

            claimed.append({
                'url': url,
                'last_hash': row[1],
                'proto_hint': proto_hint,
            })
            if len(claimed) >= count:
                break

    if claimed:
        _log('claim_urls: %d URLs to %s' % (len(claimed), worker_id[:8]), 'info')

    return claimed
|
||||
|
||||
|
||||
# Epoch timestamp of the most recent save of worker state
# (presumably flushed to _workers_file; consumer not visible here — TODO confirm).
_last_workers_save = 0
|
||||
|
||||
def submit_results(db, worker_id, results):
|
||||
@@ -1625,6 +1703,29 @@ class ProxyAPIServer(threading.Thread):
|
||||
_log('api/workers error: %s' % e, 'warn')
|
||||
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||
|
||||
elif path == '/api/claim-urls':
|
||||
# Worker claims batch of URLs for fetching (GET)
|
||||
key = query_params.get('key', '')
|
||||
if not validate_worker_key(key):
|
||||
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
|
||||
worker_id, _ = get_worker_by_key(key)
|
||||
if not worker_id:
|
||||
return json.dumps({'error': 'worker not found'}), 'application/json', 404
|
||||
if not self.url_database:
|
||||
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
|
||||
count = min(int(query_params.get('count', 5)), 20)
|
||||
try:
|
||||
url_db = mysqlite.mysqlite(self.url_database, str)
|
||||
urls = claim_urls(url_db, worker_id, count)
|
||||
update_worker_heartbeat(worker_id)
|
||||
return json.dumps({
|
||||
'worker_id': worker_id,
|
||||
'count': len(urls),
|
||||
'urls': urls,
|
||||
}), 'application/json', 200
|
||||
except Exception as e:
|
||||
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||
|
||||
else:
|
||||
return json.dumps({'error': 'not found'}), 'application/json', 404
|
||||
|
||||
|
||||
Reference in New Issue
Block a user