diff --git a/httpd.py b/httpd.py
index 0ffb092..57de23f 100644
--- a/httpd.py
+++ b/httpd.py
@@ -85,6 +85,18 @@ _url_claims = {}  # url -> {worker_id, claimed_at}
 _url_claims_lock = threading.Lock()
 _url_claim_timeout = 600  # 10 min (URLs take longer to fetch+extract)
 
+# URL scoring: pending proxy counts for working_ratio correlation
+_url_pending_counts = {}  # url -> {total, worker_id, time}
+_url_pending_lock = threading.Lock()
+_url_pending_timeout = 600  # seconds an unsettled pending count is kept
+_url_database_path = None  # set in ProxyAPIServer.__init__ for cross-db access
+
+# URL scoring defaults (overridden by configure_url_scoring)
+_url_checktime = 3600  # fallback interval when uris.check_interval is NULL
+_url_perfail_checktime = 3600  # NOTE(review): not read by any hunk in this patch
+_url_max_fail = 10
+_url_list_max_age_days = 7
+
 # Test rate tracking: worker_id -> list of (timestamp, count) tuples
 _worker_test_history = {}
 _worker_test_history_lock = threading.Lock()
@@ -114,6 +126,20 @@ def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backof
     _max_fail = max_fail
 
 
+def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days):
+    """Set URL scoring parameters from config.
+
+    checktime is the fallback check interval (seconds) for rows without a
+    check_interval; max_fail and list_max_age_days feed the claim_urls
+    eligibility filter. perfail_checktime is stored only.
+    """
+    global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days
+    _url_checktime = checktime
+    _url_perfail_checktime = perfail_checktime
+    _url_max_fail = max_fail
+    _url_list_max_age_days = list_max_age_days
+
+
 def _build_due_condition():
     """Build SQL condition for proxy due check.
 
@@ -431,7 +457,17 @@ def claim_work(db, worker_id, count=100):
     return claimed
 
 def claim_urls(url_db, worker_id, count=5):
-    """Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts."""
+    """Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
+
+    Uses score-based scheduling: high-yield URLs checked more often,
+    stale/broken ones less. A URL becomes eligible once its age reaches
+    0.8 of its check interval. Score components:
+    - age/interval: 1.0 when due, >1.0 when overdue
+    - yield_bonus: capped at 1.0 for high-yield sources
+    - quality_bonus: 0-0.5 based on working_ratio
+    - error_penalty: 0-2.0 based on consecutive errors
+    - stale_penalty: 0-1.0 based on unchanged fetches
+    """
     now = time.time()
     now_int = int(now)
 
@@ -441,36 +477,37 @@ def claim_urls(url_db, worker_id, count=5):
     except ImportError:
         detect_proto_from_path = None
 
-    # Clean expired URL claims
+    # Clean expired URL claims and pending counts
     with _url_claims_lock:
         stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout]
         for k in stale:
             del _url_claims[k]
         claimed_urls = set(_url_claims.keys())
 
-    # Reuse the same scheduling formula as ppf.py main loop (line 800-805):
-    #   WHERE error < max_fail
-    #   AND (check_time + checktime + ((error + stale_count) * perfail_checktime) < now)
-    #   AND (added > min_added OR proxies_added > 0)
-    # Use defaults matching config.ppf: checktime=3600, perfail_checktime=3600, max_fail=10
-    # list_max_age_days=30
-    checktime = 3600
-    perfail_checktime = 3600
-    max_fail = 10
-    list_max_age_seconds = 30 * 86400
+    with _url_pending_lock:
+        stale_pending = [k for k, v in _url_pending_counts.items() if now - v['time'] > _url_pending_timeout]
+        for k in stale_pending:
+            del _url_pending_counts[k]
+
+    list_max_age_seconds = _url_list_max_age_days * 86400
     min_added = now_int - list_max_age_seconds
 
     try:
         rows = url_db.execute(
-            '''SELECT url, content_hash, check_time, error, stale_count,
-                      retrievals, proxies_added
-               FROM uris
-               WHERE error < ?
-               AND (check_time + ? + ((error + stale_count) * ?) < ?)
-               AND (added > ? OR proxies_added > 0)
-               ORDER BY RANDOM()
-               LIMIT ?''',
-            (max_fail, checktime, perfail_checktime, now_int, min_added, count * 3)
+            '''SELECT url, content_hash,
+                      (? - check_time) * 1.0 / MAX(COALESCE(check_interval, ?), 1)
+                      + MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
+                      + COALESCE(working_ratio, 0) * 0.5
+                      - MIN(error * 0.3, 2.0)
+                      - MIN(stale_count * 0.1, 1.0)
+                      AS score
+               FROM uris
+               WHERE error < ?
+               AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, ?), 1) >= 0.8
+               AND (added > ? OR proxies_added > 0)
+               ORDER BY score DESC
+               LIMIT ?''',
+            (now_int, _url_checktime, _url_max_fail, now_int, _url_checktime, min_added, count * 3)
         ).fetchall()
     except Exception as e:
         _log('claim_urls query error: %s' % e, 'error')
@@ -504,9 +541,19 @@ def claim_urls(url_db, worker_id, count=5):
 
 
 def submit_url_reports(url_db, worker_id, reports):
-    """Process URL fetch feedback from workers. Returns count of processed reports."""
+    """Process URL fetch feedback from workers. Returns count of processed reports.
+
+    Updates EMA metrics per URL:
+    - avg_fetch_time: exponential moving average of fetch latency
+    - check_interval: adaptive interval (shrinks for productive URLs, grows for stale)
+    - yield_rate: EMA of proxy count per fetch (on changed content)
+    - last_worker: worker that last fetched this URL
+
+    Stores pending proxy count for working_ratio correlation in submit_proxy_reports.
+    """
     processed = 0
     now_int = int(time.time())
+    alpha = 0.3  # EMA smoothing factor
 
     for r in reports:
         url = r.get('url', '')
@@ -523,10 +570,36 @@ def submit_url_reports(url_db, worker_id, reports):
             content_hash = r.get('content_hash')
             proxy_count = r.get('proxy_count', 0)
             changed = r.get('changed', False)
+            fetch_time_ms = r.get('fetch_time_ms', 0)
+
+            # Fetch current row for EMA computation
+            row = url_db.execute(
+                '''SELECT check_interval, avg_fetch_time, yield_rate
+                   FROM uris WHERE url = ?''', (url,)
+            ).fetchone()
+            if not row:
+                processed += 1
+                continue
+
+            old_interval = row[0] if row[0] is not None else _url_checktime
+            old_fetch_time = row[1] if row[1] is not None else 0
+            old_yield = row[2] if row[2] is not None else 0.0
+
+            # EMA: avg_fetch_time
+            if old_fetch_time > 0 and fetch_time_ms > 0:
+                new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time)
+            elif fetch_time_ms > 0:
+                new_fetch_time = fetch_time_ms
+            else:
+                new_fetch_time = old_fetch_time
 
             if success:
-                if changed:
-                    # Content changed: reset stale_count, update hash, add proxy count
+                if changed and proxy_count > 0:
+                    # Success + changed + proxies: converge interval toward 15min
+                    new_interval = max(900, int(old_interval * 0.9))
+                    # EMA: yield_rate
+                    new_yield = alpha * proxy_count + (1 - alpha) * old_yield
+
                     url_db.execute(
                         '''UPDATE uris SET
                            check_time = ?,
@@ -534,30 +607,54 @@ def submit_url_reports(url_db, worker_id, reports):
                            error = 0,
                            stale_count = 0,
                            content_hash = ?,
-                           proxies_added = proxies_added + ?
+                           proxies_added = proxies_added + ?,
+                           check_interval = ?,
+                           avg_fetch_time = ?,
+                           yield_rate = ?,
+                           last_worker = ?
                            WHERE url = ?''',
-                        (now_int, content_hash, proxy_count, url)
+                        (now_int, content_hash, proxy_count, new_interval,
+                         new_fetch_time, new_yield, worker_id, url)
                     )
+
+                    # Store pending count for working_ratio correlation
+                    with _url_pending_lock:
+                        _url_pending_counts[url] = {
+                            'total': proxy_count,
+                            'worker_id': worker_id,
+                            'time': time.time(),
+                        }
                 else:
-                    # Content unchanged: increment stale_count
+                    # Success + unchanged (or no proxies): drift interval toward 24h
+                    new_interval = min(86400, int(old_interval * 1.25))
+
                     url_db.execute(
                         '''UPDATE uris SET
                            check_time = ?,
                            retrievals = retrievals + 1,
                            error = 0,
                            stale_count = stale_count + 1,
-                           content_hash = ?
+                           content_hash = ?,
+                           check_interval = ?,
+                           avg_fetch_time = ?,
+                           last_worker = ?
                            WHERE url = ?''',
-                        (now_int, content_hash, url)
+                        (now_int, content_hash, new_interval,
+                         new_fetch_time, worker_id, url)
                     )
             else:
-                # Fetch failed: increment error count
+                # Failure: back off faster
+                new_interval = min(86400, int(old_interval * 1.5))
+
                 url_db.execute(
                     '''UPDATE uris SET
                        check_time = ?,
-                       error = error + 1
+                       error = error + 1,
+                       check_interval = ?,
+                       avg_fetch_time = ?,
+                       last_worker = ?
                        WHERE url = ?''',
-                    (now_int, url)
+                    (now_int, new_interval, new_fetch_time, worker_id, url)
                 )
 
             processed += 1
@@ -568,16 +665,78 @@ def submit_url_reports(url_db, worker_id, reports):
     return processed
 
 
+def _update_url_working_ratios(url_working_counts):
+    """Correlate working proxy counts with pending totals to update working_ratio.
+
+    Called after submit_proxy_reports processes all proxies. For each source_url
+    with a pending entry from submit_url_reports, computes:
+        ratio = working_count / pending_total
+        working_ratio = alpha * ratio + (1 - alpha) * old_working_ratio
+
+    NOTE(review): a pending entry is settled by the first report that mentions
+    its URL; confirm workers send all proxies of one URL in a single batch.
+    """
+    if not url_working_counts or not _url_database_path:
+        return
+
+    alpha = 0.3
+    settled = []
+
+    with _url_pending_lock:
+        pending_snapshot = dict(_url_pending_counts)
+
+    url_db = None
+    try:
+        url_db = mysqlite.mysqlite(_url_database_path, str)
+        for url, working_count in url_working_counts.items():
+            pending = pending_snapshot.get(url)
+            if not pending or pending['total'] <= 0:
+                continue
+
+            ratio = min(float(working_count) / pending['total'], 1.0)
+
+            row = url_db.execute(
+                'SELECT working_ratio FROM uris WHERE url = ?', (url,)
+            ).fetchone()
+            old_ratio = row[0] if row and row[0] is not None else 0.0
+            new_ratio = alpha * ratio + (1 - alpha) * old_ratio
+
+            url_db.execute(
+                'UPDATE uris SET working_ratio = ? WHERE url = ?',
+                (new_ratio, url)
+            )
+            settled.append(url)
+
+        url_db.commit()
+    except Exception as e:
+        _log('_update_url_working_ratios error: %s' % e, 'error')
+    finally:
+        # Release the handle on every path, including query/commit failures
+        if url_db is not None:
+            try:
+                url_db.close()
+            except Exception as e:
+                _log('_update_url_working_ratios close error: %s' % e, 'error')
+
+    # Remove settled entries from pending
+    if settled:
+        with _url_pending_lock:
+            for url in settled:
+                _url_pending_counts.pop(url, None)
+
+
 def submit_proxy_reports(db, worker_id, proxies):
     """Process working-proxy reports from workers. Returns count of processed proxies.
 
     Simplified trust-based model: workers report only working proxies.
     Each proxy is upserted with failed=0, last_seen=now, latency updated.
+    Also tracks per-URL working counts for working_ratio correlation.
     """
     global _last_workers_save
     processed = 0
     now_int = int(time.time())
     now = time.time()
+    url_working_counts = {}  # source_url -> working count
 
     for p in proxies:
         ip = p.get('ip', '')
@@ -615,6 +774,10 @@ def submit_proxy_reports(db, worker_id, proxies):
                 except Exception:
                     pass
 
+            # Track per-URL working count for working_ratio
+            if source_url:
+                url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1
+
             processed += 1
         except Exception as e:
             _log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
@@ -622,6 +785,10 @@ def submit_proxy_reports(db, worker_id, proxies):
     # Commit database changes
     db.commit()
 
+    # Update working_ratio for source URLs
+    if url_working_counts:
+        _update_url_working_ratios(url_working_counts)
+
     # Update worker stats
     with _workers_lock:
         if worker_id in _workers:
@@ -1398,6 +1565,8 @@ class ProxyAPIServer(threading.Thread):
         self.stats_provider = stats_provider
         self.profiling = profiling
         self.daemon = True
+        global _url_database_path
+        _url_database_path = url_database
         self.server = None
         self._stop_event = threading.Event() if not GEVENT_PATCHED else None
         # Load static library files into cache