httpd: add score-based url scheduling with EMA tracking
Replace ORDER BY RANDOM() in claim_urls with composite score: age/interval ratio, yield bonus, quality bonus, error/stale penalties. Rewrite submit_url_reports with adaptive check_interval and EMA for avg_fetch_time and yield_rate. Add working_ratio correlation in submit_proxy_reports via pending count tracking.
This commit is contained in:
216
httpd.py
216
httpd.py
@@ -85,6 +85,17 @@ _url_claims = {} # url -> {worker_id, claimed_at}
|
||||
_url_claims_lock = threading.Lock()
_url_claim_timeout = 600  # 10 min (URLs take longer to fetch+extract)

# URL scoring: pending proxy counts for working_ratio correlation.
# Written by submit_url_reports, consumed by _update_url_working_ratios.
_url_pending_counts = {}  # url -> {total, worker_id, time}
_url_pending_lock = threading.Lock()
_url_database_path = None  # set in ProxyAPIServer.__init__ for cross-db access

# URL scoring defaults (overridden by configure_url_scoring)
_url_checktime = 3600  # base recheck interval, seconds
_url_perfail_checktime = 3600  # extra backoff per error/stale count, seconds
_url_max_fail = 10  # URLs with error >= this are no longer scheduled
_url_list_max_age_days = 7  # unproductive URLs older than this are skipped

# Test rate tracking: worker_id -> list of (timestamp, count) tuples
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
|
||||
@@ -114,6 +125,15 @@ def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backof
|
||||
_max_fail = max_fail
|
||||
|
||||
|
||||
def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days):
    """Override the module-level URL scoring defaults from config.

    Args:
        checktime: base recheck interval in seconds.
        perfail_checktime: additional backoff per error/stale count, seconds.
        max_fail: error count at which a URL stops being scheduled.
        list_max_age_days: age cutoff for URLs that never yielded proxies.
    """
    global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days
    _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days = (
        checktime, perfail_checktime, max_fail, list_max_age_days)
|
||||
|
||||
|
||||
def _build_due_condition():
|
||||
"""Build SQL condition for proxy due check.
|
||||
|
||||
@@ -431,7 +451,16 @@ def claim_work(db, worker_id, count=100):
|
||||
return claimed
|
||||
|
||||
def claim_urls(url_db, worker_id, count=5):
|
||||
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts."""
|
||||
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
|
||||
|
||||
Uses score-based scheduling: high-yield URLs checked more often,
|
||||
stale/broken ones less. Score components:
|
||||
- age/interval: 1.0 when due, >1.0 when overdue
|
||||
- yield_bonus: capped at 1.0 for high-yield sources
|
||||
- quality_bonus: 0-0.5 based on working_ratio
|
||||
- error_penalty: 0-2.0 based on consecutive errors
|
||||
- stale_penalty: 0-1.0 based on unchanged fetches
|
||||
"""
|
||||
now = time.time()
|
||||
now_int = int(now)
|
||||
|
||||
@@ -441,36 +470,37 @@ def claim_urls(url_db, worker_id, count=5):
|
||||
except ImportError:
|
||||
detect_proto_from_path = None
|
||||
|
||||
# Clean expired URL claims
|
||||
# Clean expired URL claims and pending counts
|
||||
with _url_claims_lock:
|
||||
stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout]
|
||||
for k in stale:
|
||||
del _url_claims[k]
|
||||
claimed_urls = set(_url_claims.keys())
|
||||
|
||||
# Reuse the same scheduling formula as ppf.py main loop (line 800-805):
|
||||
# WHERE error < max_fail
|
||||
# AND (check_time + checktime + ((error + stale_count) * perfail_checktime) < now)
|
||||
# AND (added > min_added OR proxies_added > 0)
|
||||
# Use defaults matching config.ppf: checktime=3600, perfail_checktime=3600, max_fail=10
|
||||
# list_max_age_days=30
|
||||
checktime = 3600
|
||||
perfail_checktime = 3600
|
||||
max_fail = 10
|
||||
list_max_age_seconds = 30 * 86400
|
||||
with _url_pending_lock:
|
||||
stale_pending = [k for k, v in _url_pending_counts.items() if now - v['time'] > 600]
|
||||
for k in stale_pending:
|
||||
del _url_pending_counts[k]
|
||||
|
||||
list_max_age_seconds = _url_list_max_age_days * 86400
|
||||
min_added = now_int - list_max_age_seconds
|
||||
|
||||
try:
|
||||
rows = url_db.execute(
|
||||
'''SELECT url, content_hash, check_time, error, stale_count,
|
||||
retrievals, proxies_added
|
||||
FROM uris
|
||||
WHERE error < ?
|
||||
AND (check_time + ? + ((error + stale_count) * ?) < ?)
|
||||
AND (added > ? OR proxies_added > 0)
|
||||
ORDER BY RANDOM()
|
||||
LIMIT ?''',
|
||||
(max_fail, checktime, perfail_checktime, now_int, min_added, count * 3)
|
||||
'''SELECT url, content_hash,
|
||||
(? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
|
||||
+ MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
|
||||
+ COALESCE(working_ratio, 0) * 0.5
|
||||
- MIN(error * 0.3, 2.0)
|
||||
- MIN(stale_count * 0.1, 1.0)
|
||||
AS score
|
||||
FROM uris
|
||||
WHERE error < ?
|
||||
AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) >= 0.8
|
||||
AND (added > ? OR proxies_added > 0)
|
||||
ORDER BY score DESC
|
||||
LIMIT ?''',
|
||||
(now_int, _url_max_fail, now_int, min_added, count * 3)
|
||||
).fetchall()
|
||||
except Exception as e:
|
||||
_log('claim_urls query error: %s' % e, 'error')
|
||||
@@ -504,9 +534,19 @@ def claim_urls(url_db, worker_id, count=5):
|
||||
|
||||
|
||||
def submit_url_reports(url_db, worker_id, reports):
|
||||
"""Process URL fetch feedback from workers. Returns count of processed reports."""
|
||||
"""Process URL fetch feedback from workers. Returns count of processed reports.
|
||||
|
||||
Updates EMA metrics per URL:
|
||||
- avg_fetch_time: exponential moving average of fetch latency
|
||||
- check_interval: adaptive interval (shrinks for productive URLs, grows for stale)
|
||||
- yield_rate: EMA of proxy count per fetch (on changed content)
|
||||
- last_worker: worker that last fetched this URL
|
||||
|
||||
Stores pending proxy count for working_ratio correlation in submit_proxy_reports.
|
||||
"""
|
||||
processed = 0
|
||||
now_int = int(time.time())
|
||||
alpha = 0.3 # EMA smoothing factor
|
||||
|
||||
for r in reports:
|
||||
url = r.get('url', '')
|
||||
@@ -523,10 +563,36 @@ def submit_url_reports(url_db, worker_id, reports):
|
||||
content_hash = r.get('content_hash')
|
||||
proxy_count = r.get('proxy_count', 0)
|
||||
changed = r.get('changed', False)
|
||||
fetch_time_ms = r.get('fetch_time_ms', 0)
|
||||
|
||||
# Fetch current row for EMA computation
|
||||
row = url_db.execute(
|
||||
'''SELECT check_interval, avg_fetch_time, yield_rate
|
||||
FROM uris WHERE url = ?''', (url,)
|
||||
).fetchone()
|
||||
if not row:
|
||||
processed += 1
|
||||
continue
|
||||
|
||||
old_interval = row[0] if row[0] is not None else 3600
|
||||
old_fetch_time = row[1] if row[1] is not None else 0
|
||||
old_yield = row[2] if row[2] is not None else 0.0
|
||||
|
||||
# EMA: avg_fetch_time
|
||||
if old_fetch_time > 0 and fetch_time_ms > 0:
|
||||
new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time)
|
||||
elif fetch_time_ms > 0:
|
||||
new_fetch_time = fetch_time_ms
|
||||
else:
|
||||
new_fetch_time = old_fetch_time
|
||||
|
||||
if success:
|
||||
if changed:
|
||||
# Content changed: reset stale_count, update hash, add proxy count
|
||||
if changed and proxy_count > 0:
|
||||
# Success + changed + proxies: converge interval toward 15min
|
||||
new_interval = max(900, int(old_interval * 0.9))
|
||||
# EMA: yield_rate
|
||||
new_yield = alpha * proxy_count + (1 - alpha) * old_yield
|
||||
|
||||
url_db.execute(
|
||||
'''UPDATE uris SET
|
||||
check_time = ?,
|
||||
@@ -534,30 +600,54 @@ def submit_url_reports(url_db, worker_id, reports):
|
||||
error = 0,
|
||||
stale_count = 0,
|
||||
content_hash = ?,
|
||||
proxies_added = proxies_added + ?
|
||||
proxies_added = proxies_added + ?,
|
||||
check_interval = ?,
|
||||
avg_fetch_time = ?,
|
||||
yield_rate = ?,
|
||||
last_worker = ?
|
||||
WHERE url = ?''',
|
||||
(now_int, content_hash, proxy_count, url)
|
||||
(now_int, content_hash, proxy_count, new_interval,
|
||||
new_fetch_time, new_yield, worker_id, url)
|
||||
)
|
||||
|
||||
# Store pending count for working_ratio correlation
|
||||
with _url_pending_lock:
|
||||
_url_pending_counts[url] = {
|
||||
'total': proxy_count,
|
||||
'worker_id': worker_id,
|
||||
'time': time.time(),
|
||||
}
|
||||
else:
|
||||
# Content unchanged: increment stale_count
|
||||
# Success + unchanged (or no proxies): drift interval toward 24h
|
||||
new_interval = min(86400, int(old_interval * 1.25))
|
||||
|
||||
url_db.execute(
|
||||
'''UPDATE uris SET
|
||||
check_time = ?,
|
||||
retrievals = retrievals + 1,
|
||||
error = 0,
|
||||
stale_count = stale_count + 1,
|
||||
content_hash = ?
|
||||
content_hash = ?,
|
||||
check_interval = ?,
|
||||
avg_fetch_time = ?,
|
||||
last_worker = ?
|
||||
WHERE url = ?''',
|
||||
(now_int, content_hash, url)
|
||||
(now_int, content_hash, new_interval,
|
||||
new_fetch_time, worker_id, url)
|
||||
)
|
||||
else:
|
||||
# Fetch failed: increment error count
|
||||
# Failure: back off faster
|
||||
new_interval = min(86400, int(old_interval * 1.5))
|
||||
|
||||
url_db.execute(
|
||||
'''UPDATE uris SET
|
||||
check_time = ?,
|
||||
error = error + 1
|
||||
error = error + 1,
|
||||
check_interval = ?,
|
||||
avg_fetch_time = ?,
|
||||
last_worker = ?
|
||||
WHERE url = ?''',
|
||||
(now_int, url)
|
||||
(now_int, new_interval, new_fetch_time, worker_id, url)
|
||||
)
|
||||
|
||||
processed += 1
|
||||
@@ -568,16 +658,68 @@ def submit_url_reports(url_db, worker_id, reports):
|
||||
return processed
|
||||
|
||||
|
||||
def _update_url_working_ratios(url_working_counts):
    """Correlate working proxy counts with pending totals to update working_ratio.

    Called after submit_proxy_reports processes all proxies. For each source_url
    with a pending entry recorded by submit_url_reports, computes:
        ratio = min(working_count / pending_total, 1.0)
        working_ratio = alpha * ratio + (1 - alpha) * old_working_ratio

    Args:
        url_working_counts: dict mapping source_url -> number of working
            proxies reported for that URL in the current batch.

    Best-effort: DB errors are logged, never raised. On failure the pending
    entries are kept so the correlation can be retried with the next batch.
    """
    if not url_working_counts or not _url_database_path:
        return

    alpha = 0.3  # EMA smoothing factor (same as submit_url_reports)
    settled = []

    # Snapshot under the lock so the DB work below runs lock-free.
    with _url_pending_lock:
        pending_snapshot = dict(_url_pending_counts)

    url_db = None
    try:
        url_db = mysqlite.mysqlite(_url_database_path, str)
        for url, working_count in url_working_counts.items():
            pending = pending_snapshot.get(url)
            if not pending or pending['total'] <= 0:
                continue

            # Cap at 1.0: workers may report more working proxies than the
            # fetch that produced the pending total (e.g. cross-source dups).
            ratio = min(float(working_count) / pending['total'], 1.0)

            row = url_db.execute(
                'SELECT working_ratio FROM uris WHERE url = ?', (url,)
            ).fetchone()
            old_ratio = row[0] if row and row[0] is not None else 0.0
            new_ratio = alpha * ratio + (1 - alpha) * old_ratio

            url_db.execute(
                'UPDATE uris SET working_ratio = ? WHERE url = ?',
                (new_ratio, url)
            )
            settled.append(url)

        url_db.commit()
    except Exception as e:
        _log('_update_url_working_ratios error: %s' % e, 'error')
        # Commit did not happen (or a write failed): keep the pending
        # entries instead of dropping counts that were never persisted.
        settled = []
    finally:
        # Always release the DB handle, even when execute/commit raised.
        if url_db is not None:
            try:
                url_db.close()
            except Exception:
                pass

    # Remove only successfully persisted entries from pending
    if settled:
        with _url_pending_lock:
            for url in settled:
                _url_pending_counts.pop(url, None)
|
||||
|
||||
|
||||
def submit_proxy_reports(db, worker_id, proxies):
|
||||
"""Process working-proxy reports from workers. Returns count of processed proxies.
|
||||
|
||||
Simplified trust-based model: workers report only working proxies.
|
||||
Each proxy is upserted with failed=0, last_seen=now, latency updated.
|
||||
Also tracks per-URL working counts for working_ratio correlation.
|
||||
"""
|
||||
global _last_workers_save
|
||||
processed = 0
|
||||
now_int = int(time.time())
|
||||
now = time.time()
|
||||
url_working_counts = {} # source_url -> working count
|
||||
|
||||
for p in proxies:
|
||||
ip = p.get('ip', '')
|
||||
@@ -615,6 +757,10 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Track per-URL working count for working_ratio
|
||||
if source_url:
|
||||
url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1
|
||||
|
||||
processed += 1
|
||||
except Exception as e:
|
||||
_log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
|
||||
@@ -622,6 +768,10 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
# Commit database changes
|
||||
db.commit()
|
||||
|
||||
# Update working_ratio for source URLs
|
||||
if url_working_counts:
|
||||
_update_url_working_ratios(url_working_counts)
|
||||
|
||||
# Update worker stats
|
||||
with _workers_lock:
|
||||
if worker_id in _workers:
|
||||
@@ -1398,6 +1548,8 @@ class ProxyAPIServer(threading.Thread):
|
||||
self.stats_provider = stats_provider
|
||||
self.profiling = profiling
|
||||
self.daemon = True
|
||||
global _url_database_path
|
||||
_url_database_path = url_database
|
||||
self.server = None
|
||||
self._stop_event = threading.Event() if not GEVENT_PATCHED else None
|
||||
# Load static library files into cache
|
||||
|
||||
Reference in New Issue
Block a user