httpd: add /api/report-proxies endpoint
This commit is contained in:
96
httpd.py
96
httpd.py
@@ -568,6 +568,75 @@ def submit_url_reports(url_db, worker_id, reports):
|
||||
return processed
|
||||
|
||||
|
||||
def submit_proxy_reports(db, worker_id, proxies):
|
||||
"""Process working-proxy reports from workers. Returns count of processed proxies.
|
||||
|
||||
Simplified trust-based model: workers report only working proxies.
|
||||
Each proxy is upserted with failed=0, last_seen=now, latency updated.
|
||||
"""
|
||||
global _last_workers_save
|
||||
processed = 0
|
||||
now_int = int(time.time())
|
||||
now = time.time()
|
||||
|
||||
for p in proxies:
|
||||
ip = p.get('ip', '')
|
||||
port = p.get('port', 0)
|
||||
if not ip or not port:
|
||||
continue
|
||||
|
||||
proxy_key = '%s:%s' % (ip, port)
|
||||
proto = p.get('proto', 'http')
|
||||
latency = p.get('latency', 0)
|
||||
source_url = p.get('source_url')
|
||||
|
||||
try:
|
||||
# Upsert: insert new proxy or update existing as working
|
||||
db.execute('''
|
||||
INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, added,
|
||||
avg_latency, last_seen)
|
||||
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
|
||||
ON CONFLICT(proxy) DO UPDATE SET
|
||||
failed = 0,
|
||||
tested = excluded.tested,
|
||||
proto = excluded.proto,
|
||||
avg_latency = excluded.avg_latency,
|
||||
last_seen = excluded.last_seen
|
||||
''', (proxy_key, ip, port, proto, now_int, now_int, latency, now_int))
|
||||
|
||||
# Geolocate if IP2Location available
|
||||
if _geolite and _geodb:
|
||||
try:
|
||||
rec = _geodb.get_all(ip)
|
||||
if rec and rec.country_short and rec.country_short != '-':
|
||||
db.execute(
|
||||
'UPDATE proxylist SET country=? WHERE proxy=?',
|
||||
(rec.country_short, proxy_key))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
processed += 1
|
||||
except Exception as e:
|
||||
_log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
|
||||
|
||||
# Commit database changes
|
||||
db.commit()
|
||||
|
||||
# Update worker stats
|
||||
with _workers_lock:
|
||||
if worker_id in _workers:
|
||||
w = _workers[worker_id]
|
||||
w['proxies_working'] = w.get('proxies_working', 0) + processed
|
||||
w['last_seen'] = now
|
||||
|
||||
# Save workers periodically
|
||||
if now - _last_workers_save > 60:
|
||||
save_workers()
|
||||
_last_workers_save = now
|
||||
|
||||
return processed
|
||||
|
||||
|
||||
_last_workers_save = 0
|
||||
|
||||
def submit_results(db, worker_id, results):
|
||||
@@ -1438,6 +1507,9 @@ class ProxyAPIServer(threading.Thread):
|
||||
'/api/results': 'submit test results (POST, params: key)',
|
||||
'/api/register': 'register as worker (POST)',
|
||||
'/api/workers': 'list connected workers',
|
||||
'/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)',
|
||||
'/api/report-urls': 'report URL fetch results (POST, params: key)',
|
||||
'/api/report-proxies': 'report working proxies (POST, params: key)',
|
||||
'/proxies': 'list working proxies (params: limit, proto, country, asn)',
|
||||
'/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)',
|
||||
'/proxies/count': 'count working proxies',
|
||||
@@ -1818,6 +1890,30 @@ class ProxyAPIServer(threading.Thread):
|
||||
except Exception as e:
|
||||
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||
|
||||
elif path == '/api/report-proxies':
|
||||
# Worker reports working proxies (POST)
|
||||
key = query_params.get('key', '')
|
||||
if not validate_worker_key(key):
|
||||
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
|
||||
worker_id, _ = get_worker_by_key(key)
|
||||
if not worker_id:
|
||||
return json.dumps({'error': 'worker not found'}), 'application/json', 404
|
||||
if not post_data:
|
||||
return json.dumps({'error': 'POST body required'}), 'application/json', 400
|
||||
proxies = post_data.get('proxies', [])
|
||||
if not proxies:
|
||||
return json.dumps({'error': 'no proxies provided'}), 'application/json', 400
|
||||
try:
|
||||
db = mysqlite.mysqlite(self.database, str)
|
||||
processed = submit_proxy_reports(db, worker_id, proxies)
|
||||
update_worker_heartbeat(worker_id)
|
||||
return json.dumps({
|
||||
'worker_id': worker_id,
|
||||
'processed': processed,
|
||||
}), 'application/json', 200
|
||||
except Exception as e:
|
||||
return json.dumps({'error': str(e)}), 'application/json', 500
|
||||
|
||||
else:
|
||||
return json.dumps({'error': 'not found'}), 'application/json', 404
|
||||
|
||||
|
||||
Reference in New Issue
Block a user