From 3162c655495d62a942b71d74a69f8532d1caea05 Mon Sep 17 00:00:00 2001
From: Username
Date: Tue, 17 Feb 2026 13:42:59 +0100
Subject: [PATCH] httpd: add /api/claim-urls endpoint

---
NOTE(review): count parsing (`int(query_params.get('count', 5))`) sits outside
the try block — a non-numeric ?count= raises ValueError out of the handler;
presumably the dispatcher catches it, but verify or move it inside the try.
NOTE(review): the per-request mysqlite handle is never explicitly closed —
confirm mysqlite releases it on GC.

 httpd.py | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 101 insertions(+)

diff --git a/httpd.py b/httpd.py
index 420f111..b84f2f2 100644
--- a/httpd.py
+++ b/httpd.py
@@ -80,6 +80,11 @@
 _master_key = None  # master key for worker registration
 _claim_timeout = 300  # seconds before unclaimed work is released
 _workers_file = 'data/workers.json'  # persistent storage
 
+# URL claim tracking (parallel to proxy claims)
+_url_claims = {}  # url -> {worker_id, claimed_at}
+_url_claims_lock = threading.Lock()
+_url_claim_timeout = 600  # 10 min (URLs take longer to fetch+extract)
+
 # Test rate tracking: worker_id -> list of (timestamp, count) tuples
 _worker_test_history = {}
 _worker_test_history_lock = threading.Lock()
@@ -425,6 +430,79 @@ def claim_work(db, worker_id, count=100):
 
     return claimed
 
+def claim_urls(url_db, worker_id, count=5):
+    """Claim a batch of URLs for worker-driven fetching.
+    Returns list of URL dicts."""
+    now = time.time()
+    now_int = int(now)
+
+    # Import here to avoid circular dependency at module level
+    try:
+        from fetch import detect_proto_from_path
+    except ImportError:
+        detect_proto_from_path = None
+
+    # Clean expired URL claims
+    with _url_claims_lock:
+        stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout]
+        for k in stale:
+            del _url_claims[k]
+        claimed_urls = set(_url_claims.keys())
+
+    # Reuse the same scheduling formula as ppf.py main loop (line 800-805):
+    #   WHERE error < max_fail
+    #     AND (check_time + checktime + ((error + stale_count) * perfail_checktime) < now)
+    #     AND (added > min_added OR proxies_added > 0)
+    # Use defaults matching config.ppf: checktime=3600, perfail_checktime=3600, max_fail=10
+    # list_max_age_days=30
+    checktime = 3600
+    perfail_checktime = 3600
+    max_fail = 10
+    list_max_age_seconds = 30 * 86400
+    min_added = now_int - list_max_age_seconds
+
+    try:
+        rows = url_db.execute(
+            '''SELECT url, content_hash, check_time, error, stale_count,
+                      retrievals, proxies_added
+               FROM uris
+               WHERE error < ?
+                 AND (check_time + ? + ((error + stale_count) * ?) < ?)
+                 AND (added > ?
+                      OR proxies_added > 0)
+               ORDER BY RANDOM()
+               LIMIT ?''',
+            (max_fail, checktime, perfail_checktime, now_int, min_added, count * 3)
+        ).fetchall()
+    except Exception as e:
+        _log('claim_urls query error: %s' % e, 'error')
+        return []
+
+    # Filter out already-claimed URLs and lock new claims
+    claimed = []
+    with _url_claims_lock:
+        for row in rows:
+            url = row[0]
+            if url in _url_claims or url in claimed_urls:
+                continue
+            _url_claims[url] = {'worker_id': worker_id, 'claimed_at': now}
+            proto_hint = None
+            if detect_proto_from_path:
+                proto_hint = detect_proto_from_path(url)
+            claimed.append({
+                'url': url,
+                'last_hash': row[1],
+                'proto_hint': proto_hint,
+            })
+            if len(claimed) >= count:
+                break
+
+    if claimed:
+        _log('claim_urls: %d URLs to %s' % (len(claimed), worker_id[:8]), 'info')
+
+    return claimed
+
+
 _last_workers_save = 0
 
 def submit_results(db, worker_id, results):
@@ -1625,6 +1703,29 @@ class ProxyAPIServer(threading.Thread):
                 _log('api/workers error: %s' % e, 'warn')
                 return json.dumps({'error': str(e)}), 'application/json', 500
 
+        elif path == '/api/claim-urls':
+            # Worker claims batch of URLs for fetching (GET)
+            key = query_params.get('key', '')
+            if not validate_worker_key(key):
+                return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
+            worker_id, _ = get_worker_by_key(key)
+            if not worker_id:
+                return json.dumps({'error': 'worker not found'}), 'application/json', 404
+            if not self.url_database:
+                return json.dumps({'error': 'url database not configured'}), 'application/json', 500
+            count = min(int(query_params.get('count', 5)), 20)
+            try:
+                url_db = mysqlite.mysqlite(self.url_database, str)
+                urls = claim_urls(url_db, worker_id, count)
+                update_worker_heartbeat(worker_id)
+                return json.dumps({
+                    'worker_id': worker_id,
+                    'count': len(urls),
+                    'urls': urls,
+                }), 'application/json', 200
+            except Exception as e:
+                return json.dumps({'error': str(e)}), 'application/json', 500
+
         else:
             return json.dumps({'error': 'not found'}),
                 'application/json', 404