feat: worker-driven discovery and validation tightening #1
94
httpd.py
94
httpd.py
@@ -503,6 +503,71 @@ def claim_urls(url_db, worker_id, count=5):
|
|||||||
return claimed
|
return claimed
|
||||||
|
|
||||||
|
|
||||||
|
def submit_url_reports(url_db, worker_id, reports):
|
||||||
|
"""Process URL fetch feedback from workers. Returns count of processed reports."""
|
||||||
|
processed = 0
|
||||||
|
now_int = int(time.time())
|
||||||
|
|
||||||
|
for r in reports:
|
||||||
|
url = r.get('url', '')
|
||||||
|
if not url:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Release URL claim
|
||||||
|
with _url_claims_lock:
|
||||||
|
if url in _url_claims:
|
||||||
|
del _url_claims[url]
|
||||||
|
|
||||||
|
try:
|
||||||
|
success = r.get('success', False)
|
||||||
|
content_hash = r.get('content_hash')
|
||||||
|
proxy_count = r.get('proxy_count', 0)
|
||||||
|
changed = r.get('changed', False)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
if changed:
|
||||||
|
# Content changed: reset stale_count, update hash, add proxy count
|
||||||
|
url_db.execute(
|
||||||
|
'''UPDATE uris SET
|
||||||
|
check_time = ?,
|
||||||
|
retrievals = retrievals + 1,
|
||||||
|
error = 0,
|
||||||
|
stale_count = 0,
|
||||||
|
content_hash = ?,
|
||||||
|
proxies_added = proxies_added + ?
|
||||||
|
WHERE url = ?''',
|
||||||
|
(now_int, content_hash, proxy_count, url)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Content unchanged: increment stale_count
|
||||||
|
url_db.execute(
|
||||||
|
'''UPDATE uris SET
|
||||||
|
check_time = ?,
|
||||||
|
retrievals = retrievals + 1,
|
||||||
|
error = 0,
|
||||||
|
stale_count = stale_count + 1,
|
||||||
|
content_hash = ?
|
||||||
|
WHERE url = ?''',
|
||||||
|
(now_int, content_hash, url)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Fetch failed: increment error count
|
||||||
|
url_db.execute(
|
||||||
|
'''UPDATE uris SET
|
||||||
|
check_time = ?,
|
||||||
|
error = error + 1
|
||||||
|
WHERE url = ?''',
|
||||||
|
(now_int, url)
|
||||||
|
)
|
||||||
|
|
||||||
|
processed += 1
|
||||||
|
except Exception as e:
|
||||||
|
_log('submit_url_reports error for %s: %s' % (url, e), 'error')
|
||||||
|
|
||||||
|
url_db.commit()
|
||||||
|
return processed
|
||||||
|
|
||||||
|
|
||||||
_last_workers_save = 0
|
_last_workers_save = 0
|
||||||
|
|
||||||
def submit_results(db, worker_id, results):
|
def submit_results(db, worker_id, results):
|
||||||
@@ -1301,7 +1366,8 @@ class ProxyAPIServer(threading.Thread):
|
|||||||
return [b'Method not allowed']
|
return [b'Method not allowed']
|
||||||
|
|
||||||
# POST only allowed for worker API endpoints
|
# POST only allowed for worker API endpoints
|
||||||
post_endpoints = ('/api/register', '/api/results', '/api/heartbeat')
|
post_endpoints = ('/api/register', '/api/results', '/api/heartbeat',
|
||||||
|
'/api/report-urls', '/api/report-proxies')
|
||||||
if method == 'POST' and path not in post_endpoints:
|
if method == 'POST' and path not in post_endpoints:
|
||||||
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
|
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
|
||||||
return [b'POST not allowed for this endpoint']
|
return [b'POST not allowed for this endpoint']
|
||||||
@@ -1726,6 +1792,32 @@ class ProxyAPIServer(threading.Thread):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
return json.dumps({'error': str(e)}), 'application/json', 500
|
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||||
|
|
||||||
|
elif path == '/api/report-urls':
|
||||||
|
# Worker reports URL fetch results (POST)
|
||||||
|
key = query_params.get('key', '')
|
||||||
|
if not validate_worker_key(key):
|
||||||
|
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
|
||||||
|
worker_id, _ = get_worker_by_key(key)
|
||||||
|
if not worker_id:
|
||||||
|
return json.dumps({'error': 'worker not found'}), 'application/json', 404
|
||||||
|
if not self.url_database:
|
||||||
|
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
|
||||||
|
if not post_data:
|
||||||
|
return json.dumps({'error': 'POST body required'}), 'application/json', 400
|
||||||
|
reports = post_data.get('reports', [])
|
||||||
|
if not reports:
|
||||||
|
return json.dumps({'error': 'no reports provided'}), 'application/json', 400
|
||||||
|
try:
|
||||||
|
url_db = mysqlite.mysqlite(self.url_database, str)
|
||||||
|
processed = submit_url_reports(url_db, worker_id, reports)
|
||||||
|
update_worker_heartbeat(worker_id)
|
||||||
|
return json.dumps({
|
||||||
|
'worker_id': worker_id,
|
||||||
|
'processed': processed,
|
||||||
|
}), 'application/json', 200
|
||||||
|
except Exception as e:
|
||||||
|
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return json.dumps({'error': 'not found'}), 'application/json', 404
|
return json.dumps({'error': 'not found'}), 'application/json', 404
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user