diff --git a/httpd.py b/httpd.py index fedc7ba..0ffb092 100644 --- a/httpd.py +++ b/httpd.py @@ -568,6 +568,75 @@ def submit_url_reports(url_db, worker_id, reports): return processed +def submit_proxy_reports(db, worker_id, proxies): + """Process working-proxy reports from workers. Returns count of processed proxies. + + Simplified trust-based model: workers report only working proxies. + Each proxy is upserted with failed=0, last_seen=now, latency updated. + """ + global _last_workers_save + processed = 0 + now_int = int(time.time()) + now = time.time() + + for p in proxies: + ip = p.get('ip', '') + port = p.get('port', 0) + if not ip or not port: + continue + + proxy_key = '%s:%s' % (ip, port) + proto = p.get('proto', 'http') + latency = p.get('latency', 0) + source_url = p.get('source_url') + + try: + # Upsert: insert new proxy or update existing as working + db.execute(''' + INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, added, + avg_latency, last_seen) + VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?) + ON CONFLICT(proxy) DO UPDATE SET + failed = 0, + tested = excluded.tested, + proto = excluded.proto, + avg_latency = excluded.avg_latency, + last_seen = excluded.last_seen + ''', (proxy_key, ip, port, proto, now_int, now_int, latency, now_int)) + + # Geolocate if IP2Location available + if _geolite and _geodb: + try: + rec = _geodb.get_all(ip) + if rec and rec.country_short and rec.country_short != '-': + db.execute( + 'UPDATE proxylist SET country=? WHERE proxy=?', + (rec.country_short, proxy_key)) + except Exception: + pass + + processed += 1 + except Exception as e: + _log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error') + + # Commit database changes + db.commit() + + # Update worker stats + with _workers_lock: + if worker_id in _workers: + w = _workers[worker_id] + w['proxies_working'] = w.get('proxies_working', 0) + processed + w['last_seen'] = now + + # Save workers periodically + if now - _last_workers_save > 60: + save_workers() + _last_workers_save = now + + return processed + + _last_workers_save = 0 def submit_results(db, worker_id, results): @@ -1438,6 +1507,9 @@ class ProxyAPIServer(threading.Thread): '/api/results': 'submit test results (POST, params: key)', '/api/register': 'register as worker (POST)', '/api/workers': 'list connected workers', + '/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)', + '/api/report-urls': 'report URL fetch results (POST, params: key)', + '/api/report-proxies': 'report working proxies (POST, params: key)', '/proxies': 'list working proxies (params: limit, proto, country, asn)', '/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)', '/proxies/count': 'count working proxies', @@ -1818,6 +1890,30 @@ class ProxyAPIServer(threading.Thread): except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 + elif path == '/api/report-proxies': + # Worker reports working proxies (POST) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + if not post_data: + return json.dumps({'error': 'POST body required'}), 'application/json', 400 + proxies = post_data.get('proxies', []) + if not proxies: + return json.dumps({'error': 'no proxies provided'}), 'application/json', 400 + try: + db = mysqlite.mysqlite(self.database, str) + processed = submit_proxy_reports(db, worker_id, proxies) + update_worker_heartbeat(worker_id) + return json.dumps({ + 'worker_id': worker_id, + 'processed': processed, + }), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + else: return json.dumps({'error': 'not found'}), 'application/json', 404