diff --git a/httpd.py b/httpd.py index b84f2f2..fedc7ba 100644 --- a/httpd.py +++ b/httpd.py @@ -503,6 +503,71 @@ def claim_urls(url_db, worker_id, count=5): return claimed +def submit_url_reports(url_db, worker_id, reports): + """Process URL fetch feedback from workers. Returns count of processed reports.""" + processed = 0 + now_int = int(time.time()) + + for r in reports: + url = r.get('url', '') + if not url: + continue + + # Release URL claim + with _url_claims_lock: + if url in _url_claims: + del _url_claims[url] + + try: + success = r.get('success', False) + content_hash = r.get('content_hash') + proxy_count = r.get('proxy_count', 0) + changed = r.get('changed', False) + + if success: + if changed: + # Content changed: reset stale_count, update hash, add proxy count + url_db.execute( + '''UPDATE uris SET + check_time = ?, + retrievals = retrievals + 1, + error = 0, + stale_count = 0, + content_hash = ?, + proxies_added = proxies_added + ? + WHERE url = ?''', + (now_int, content_hash, proxy_count, url) + ) + else: + # Content unchanged: increment stale_count + url_db.execute( + '''UPDATE uris SET + check_time = ?, + retrievals = retrievals + 1, + error = 0, + stale_count = stale_count + 1, + content_hash = ? + WHERE url = ?''', + (now_int, content_hash, url) + ) + else: + # Fetch failed: increment error count + url_db.execute( + '''UPDATE uris SET + check_time = ?, + error = error + 1 + WHERE url = ?''', + (now_int, url) + ) + + processed += 1 + except Exception as e: + _log('submit_url_reports error for %s: %s' % (url, e), 'error') + + url_db.commit() + return processed + + _last_workers_save = 0 def submit_results(db, worker_id, results): @@ -1301,7 +1366,8 @@ class ProxyAPIServer(threading.Thread): return [b'Method not allowed'] # POST only allowed for worker API endpoints - post_endpoints = ('/api/register', '/api/results', '/api/heartbeat') + post_endpoints = ('/api/register', '/api/results', '/api/heartbeat', + '/api/report-urls', '/api/report-proxies') if method == 'POST' and path not in post_endpoints: start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'POST not allowed for this endpoint'] @@ -1726,6 +1792,32 @@ class ProxyAPIServer(threading.Thread): except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 + elif path == '/api/report-urls': + # Worker reports URL fetch results (POST) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + if not self.url_database: + return json.dumps({'error': 'url database not configured'}), 'application/json', 500 + if not post_data: + return json.dumps({'error': 'POST body required'}), 'application/json', 400 + reports = post_data.get('reports', []) + if not reports: + return json.dumps({'error': 'no reports provided'}), 'application/json', 400 + try: + url_db = mysqlite.mysqlite(self.url_database, str) + processed = submit_url_reports(url_db, worker_id, reports) + update_worker_heartbeat(worker_id) + return json.dumps({ + 'worker_id': worker_id, + 'processed': processed, + }), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + else: return json.dumps({'error': 'not found'}), 'application/json', 404