#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""HTTP API server with advanced web dashboard for PPF."""
from __future__ import division
try:
from ppf import __version__
except ImportError:
__version__ = 'unknown'
import BaseHTTPServer
import json
import threading
import time
import os
import gc
import sys
from collections import defaultdict
import mysqlite
from misc import _log
from dbs import (create_verification_tables, insert_proxy_result,
check_for_disagreement, queue_verification,
get_verification_stats, get_all_worker_trust)
# IP geolocation (optional)
try:
import IP2Location
_geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
_geolite = True
except (ImportError, IOError, ValueError):
_geodb = None
_geolite = False
# ASN lookup (optional)
try:
import pyasn
_asndb = pyasn.pyasn(os.path.join("data", "ipasn.dat"))
except (ImportError, IOError):
_asndb = None
# Rate limiting configuration
_rate_limits = defaultdict(list)
_rate_lock = threading.Lock()
_rate_limit_requests = 300 # requests per window (increased for workers)
_rate_limit_window = 60 # window in seconds
# Static directories (relative to this file)
_STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
_STATIC_LIB_DIR = os.path.join(_STATIC_DIR, 'lib')
# Content type mapping for static files
_CONTENT_TYPES = {
'.js': 'application/javascript; charset=utf-8',
'.css': 'text/css; charset=utf-8',
'.json': 'application/json; charset=utf-8',
'.png': 'image/png',
'.svg': 'image/svg+xml',
'.woff': 'font/woff',
'.woff2': 'font/woff2',
}
# Cache for static library files (loaded once at startup)
_LIB_CACHE = {}
# Cache for dashboard static files (HTML, CSS, JS)
_STATIC_CACHE = {}
# Cache for expensive operations
_db_health_cache = {'value': {}, 'time': 0}
_db_health_ttl = 10 # seconds
# Simple RSS tracking for dashboard
_peak_rss = 0
_start_rss = 0
# Worker registry for distributed testing
import hashlib
import random
import string
_workers = {} # worker_id -> {name, ip, last_seen, jobs_completed, proxies_tested, ...}
_workers_lock = threading.Lock()
_worker_keys = set() # valid API keys
_master_key = None # master key for worker registration
_workers_file = 'data/workers.json' # persistent storage
_last_workers_save = 0 # last time submit_proxy_reports persisted worker stats
# URL claim tracking (parallel to proxy claims)
_url_claims = {} # url -> {worker_id, claimed_at}
_url_claims_lock = threading.Lock()
_url_claim_timeout = 600 # 10 min (URLs take longer to fetch+extract)
# URL scoring: pending proxy counts for working_ratio correlation
_url_pending_counts = {} # url -> {total, worker_id, time}
_url_pending_lock = threading.Lock()
_url_database_path = None # set in ProxyAPIServer.__init__ for cross-db access
# URL scoring defaults (overridden by configure_url_scoring)
_url_checktime = 3600
_url_perfail_checktime = 3600
_url_max_fail = 10
_url_list_max_age_days = 7
# Test rate tracking: worker_id -> list of (timestamp, count) tuples
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
_test_history_window = 120 # seconds to keep test history for rate calculation
# Session tracking
_session_start_time = int(time.time()) # when httpd started
# Testing schedule configuration (set from config at startup)
_working_checktime = 300 # retest interval for working proxies (failed=0)
_fail_retry_interval = 60 # retry interval for failing proxies
_fail_retry_backoff = True # True=linear backoff (60,120,180...), False=fixed (60,60,60...)
_max_fail = 5 # failures before proxy considered dead
def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backoff, max_fail):
"""Set testing schedule parameters from config."""
global _working_checktime, _fail_retry_interval, _fail_retry_backoff, _max_fail
_working_checktime = working_checktime
_fail_retry_interval = fail_retry_interval
_fail_retry_backoff = fail_retry_backoff
_max_fail = max_fail
def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days):
"""Set URL scoring parameters from config."""
global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days
_url_checktime = checktime
_url_perfail_checktime = perfail_checktime
_url_max_fail = max_fail
_url_list_max_age_days = list_max_age_days
def _build_due_condition():
"""Build SQL condition for proxy due check.
Returns (condition_sql, params) where condition_sql is the WHERE clause
and params is a tuple of parameter values.
Formula:
- failed=0: tested + working_checktime < now
- failed>0 with backoff: tested + (failed * fail_retry_interval) < now
- failed>0 without backoff: tested + fail_retry_interval < now
"""
now_int = int(time.time())
if _fail_retry_backoff:
# Linear backoff: multiply interval by failure count
condition = '''
failed >= 0 AND failed < ?
AND (tested IS NULL OR
CASE WHEN failed = 0
THEN tested + ? < ?
ELSE tested + (failed * ?) < ?
END)
'''
params = (_max_fail, _working_checktime, now_int, _fail_retry_interval, now_int)
else:
# Fixed interval: same delay regardless of failure count
condition = '''
failed >= 0 AND failed < ?
AND (tested IS NULL OR
CASE WHEN failed = 0
THEN tested + ? < ?
ELSE tested + ? < ?
END)
'''
params = (_max_fail, _working_checktime, now_int, _fail_retry_interval, now_int)
return condition, params
def load_workers():
"""Load worker registry from disk."""
global _workers, _worker_keys
try:
if os.path.exists(_workers_file):
with open(_workers_file, 'r') as f:
data = json.load(f)
_workers = data.get('workers', {})
_worker_keys = set(data.get('keys', []))
_log('loaded %d workers from %s' % (len(_workers), _workers_file), 'info')
except Exception as e:
_log('failed to load workers: %s' % e, 'warn')
def save_workers():
"""Save worker registry to disk."""
try:
data = {
'workers': _workers,
'keys': list(_worker_keys),
'saved': time.time(),
}
with open(_workers_file, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
_log('failed to save workers: %s' % e, 'warn')
def generate_worker_key():
"""Generate a random worker API key."""
chars = string.ascii_letters + string.digits
return ''.join(random.choice(chars) for _ in range(32))
def generate_worker_id(name, ip):
"""Generate a unique worker ID from name and IP."""
data = '%s:%s:%s' % (name, ip, time.time())
return hashlib.sha256(data.encode()).hexdigest()[:16]
def register_worker(name, ip, key=None):
"""Register a new worker and return its credentials.
If a worker with same name exists, update its IP and reset key.
This allows workers to re-register after restarts.
"""
# Check for existing worker with same name
with _workers_lock:
for wid, info in _workers.items():
if info.get('name') == name:
# Re-registration: update IP, generate new key, preserve stats
new_key = key or generate_worker_key()
_worker_keys.discard(info.get('key')) # Remove old key
info['ip'] = ip
info['key'] = new_key
info['last_seen'] = time.time()
_worker_keys.add(new_key)
save_workers()
return wid, new_key
# New registration
worker_id = generate_worker_id(name, ip)
worker_key = key or generate_worker_key()
with _workers_lock:
_workers[worker_id] = {
'name': name,
'ip': ip,
'key': worker_key,
'registered': time.time(),
'last_seen': time.time(),
'jobs_completed': 0,
'proxies_tested': 0,
'proxies_working': 0,
'proxies_failed': 0,
'total_latency': 0,
'last_batch_size': 0,
'last_batch_working': 0,
}
_worker_keys.add(worker_key)
save_workers()
return worker_id, worker_key
def validate_worker_key(key):
"""Check if worker key is valid."""
if not key:
return False
# Master key allows all operations
if _master_key and key == _master_key:
return True
return key in _worker_keys
def get_worker_by_key(key):
"""Get worker info by API key."""
with _workers_lock:
for wid, info in _workers.items():
if info.get('key') == key:
return wid, info
return None, None
def update_worker_heartbeat(worker_id):
"""Update worker's last_seen timestamp."""
with _workers_lock:
if worker_id in _workers:
_workers[worker_id]['last_seen'] = time.time()
def record_test_rate(worker_id, count):
"""Record test submission for rate calculation."""
now = time.time()
with _worker_test_history_lock:
if worker_id not in _worker_test_history:
_worker_test_history[worker_id] = []
_worker_test_history[worker_id].append((now, count))
# Prune old entries
cutoff = now - _test_history_window
_worker_test_history[worker_id] = [
(t, c) for t, c in _worker_test_history[worker_id] if t > cutoff
]
def get_worker_test_rate(worker_id):
"""Calculate worker's test rate (tests/sec) over recent window."""
now = time.time()
with _worker_test_history_lock:
if worker_id not in _worker_test_history:
return 0.0
history = _worker_test_history[worker_id]
if not history:
return 0.0
# Sum tests in window and calculate rate
cutoff = now - _test_history_window
in_window = [(t, c) for t, c in history if t > cutoff]
if not in_window:
return 0.0
total_tests = sum(c for t, c in in_window)
oldest = min(t for t, c in in_window)
elapsed = now - oldest
if elapsed < 1:
return 0.0
return total_tests / elapsed
def claim_urls(url_db, worker_id, count=5):
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
Uses score-based scheduling: high-yield URLs checked more often,
stale/broken ones less. Score components:
- age/interval: 1.0 when due, >1.0 when overdue
- yield_bonus: capped at 1.0 for high-yield sources
- quality_bonus: 0-0.5 based on working_ratio
- error_penalty: 0-2.0 based on consecutive errors
- stale_penalty: 0-1.0 based on unchanged fetches
"""
now = time.time()
now_int = int(now)
# Import here to avoid circular dependency at module level
try:
from fetch import detect_proto_from_path
except ImportError:
detect_proto_from_path = None
# Clean expired URL claims and pending counts
with _url_claims_lock:
stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout]
for k in stale:
del _url_claims[k]
claimed_urls = set(_url_claims.keys())
with _url_pending_lock:
stale_pending = [k for k, v in _url_pending_counts.items() if now - v['time'] > 600]
for k in stale_pending:
del _url_pending_counts[k]
list_max_age_seconds = _url_list_max_age_days * 86400
min_added = now_int - list_max_age_seconds
try:
rows = url_db.execute(
'''SELECT url, content_hash,
(? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
+ MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
+ COALESCE(working_ratio, 0) * 0.5
- MIN(error * 0.3, 2.0)
- MIN(stale_count * 0.1, 1.0)
AS score
FROM uris
WHERE error < ?
AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) >= 0.8
AND (added > ? OR proxies_added > 0)
ORDER BY score DESC
LIMIT ?''',
(now_int, _url_max_fail, now_int, min_added, count * 3)
).fetchall()
except Exception as e:
_log('claim_urls query error: %s' % e, 'error')
return []
# Filter out already-claimed URLs and lock new claims
claimed = []
with _url_claims_lock:
for row in rows:
url = row[0]
if url in _url_claims or url in claimed_urls:
continue
_url_claims[url] = {'worker_id': worker_id, 'claimed_at': now}
proto_hint = None
if detect_proto_from_path:
proto_hint = detect_proto_from_path(url)
claimed.append({
'url': url,
'last_hash': row[1],
'proto_hint': proto_hint,
})
if len(claimed) >= count:
break
if claimed:
_log('claim_urls: %d URLs to %s' % (len(claimed), worker_id[:8]), 'info')
return claimed
def submit_url_reports(url_db, worker_id, reports):
"""Process URL fetch feedback from workers. Returns count of processed reports.
Updates EMA metrics per URL:
- avg_fetch_time: exponential moving average of fetch latency
- check_interval: adaptive interval (shrinks for productive URLs, grows for stale)
- yield_rate: EMA of proxy count per fetch (on changed content)
- last_worker: worker that last fetched this URL
Stores pending proxy count for working_ratio correlation in submit_proxy_reports.
"""
processed = 0
now_int = int(time.time())
alpha = 0.3 # EMA smoothing factor
for r in reports:
url = r.get('url', '')
if not url:
continue
# Release URL claim
with _url_claims_lock:
if url in _url_claims:
del _url_claims[url]
try:
success = r.get('success', False)
content_hash = r.get('content_hash')
proxy_count = r.get('proxy_count', 0)
changed = r.get('changed', False)
fetch_time_ms = r.get('fetch_time_ms', 0)
# Fetch current row for EMA computation
row = url_db.execute(
'''SELECT check_interval, avg_fetch_time, yield_rate
FROM uris WHERE url = ?''', (url,)
).fetchone()
if not row:
processed += 1
continue
old_interval = row[0] if row[0] is not None else 3600
old_fetch_time = row[1] if row[1] is not None else 0
old_yield = row[2] if row[2] is not None else 0.0
# EMA: avg_fetch_time
if old_fetch_time > 0 and fetch_time_ms > 0:
new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time)
elif fetch_time_ms > 0:
new_fetch_time = fetch_time_ms
else:
new_fetch_time = old_fetch_time
if success:
if changed and proxy_count > 0:
# Success + changed + proxies: converge interval toward 15min
new_interval = max(900, int(old_interval * 0.9))
# EMA: yield_rate
new_yield = alpha * proxy_count + (1 - alpha) * old_yield
url_db.execute(
'''UPDATE uris SET
check_time = ?,
retrievals = retrievals + 1,
error = 0,
stale_count = 0,
content_hash = ?,
proxies_added = proxies_added + ?,
check_interval = ?,
avg_fetch_time = ?,
yield_rate = ?,
last_worker = ?
WHERE url = ?''',
(now_int, content_hash, proxy_count, new_interval,
new_fetch_time, new_yield, worker_id, url)
)
# Store pending count for working_ratio correlation
with _url_pending_lock:
_url_pending_counts[url] = {
'total': proxy_count,
'worker_id': worker_id,
'time': time.time(),
}
else:
# Success + unchanged (or no proxies): drift interval toward 24h
new_interval = min(86400, int(old_interval * 1.25))
url_db.execute(
'''UPDATE uris SET
check_time = ?,
retrievals = retrievals + 1,
error = 0,
stale_count = stale_count + 1,
content_hash = ?,
check_interval = ?,
avg_fetch_time = ?,
last_worker = ?
WHERE url = ?''',
(now_int, content_hash, new_interval,
new_fetch_time, worker_id, url)
)
else:
# Failure: back off faster
new_interval = min(86400, int(old_interval * 1.5))
url_db.execute(
'''UPDATE uris SET
check_time = ?,
error = error + 1,
check_interval = ?,
avg_fetch_time = ?,
last_worker = ?
WHERE url = ?''',
(now_int, new_interval, new_fetch_time, worker_id, url)
)
processed += 1
except Exception as e:
_log('submit_url_reports error for %s: %s' % (url, e), 'error')
url_db.commit()
return processed
def _update_url_working_ratios(url_working_counts):
"""Correlate working proxy counts with pending totals to update working_ratio.
Called after submit_proxy_reports processes all proxies. For each source_url
with a pending entry from submit_url_reports, computes:
ratio = working_count / pending_total
working_ratio = alpha * ratio + (1 - alpha) * old_working_ratio
"""
if not url_working_counts or not _url_database_path:
return
alpha = 0.3
settled = []
with _url_pending_lock:
pending_snapshot = dict(_url_pending_counts)
try:
url_db = mysqlite.mysqlite(_url_database_path, str)
for url, working_count in url_working_counts.items():
pending = pending_snapshot.get(url)
if not pending or pending['total'] <= 0:
continue
ratio = min(float(working_count) / pending['total'], 1.0)
row = url_db.execute(
'SELECT working_ratio FROM uris WHERE url = ?', (url,)
).fetchone()
old_ratio = row[0] if row and row[0] is not None else 0.0
new_ratio = alpha * ratio + (1 - alpha) * old_ratio
url_db.execute(
'UPDATE uris SET working_ratio = ? WHERE url = ?',
(new_ratio, url)
)
settled.append(url)
url_db.commit()
url_db.close()
except Exception as e:
_log('_update_url_working_ratios error: %s' % e, 'error')
# Remove settled entries from pending
if settled:
with _url_pending_lock:
for url in settled:
_url_pending_counts.pop(url, None)
def submit_proxy_reports(db, worker_id, proxies):
"""Process working-proxy reports from workers. Returns count of processed proxies.
Simplified trust-based model: workers report only working proxies.
Each proxy is upserted with failed=0, last_seen=now, latency updated.
Also tracks per-URL working counts for working_ratio correlation.
"""
global _last_workers_save
processed = 0
now_int = int(time.time())
now = time.time()
url_working_counts = {} # source_url -> working count
for p in proxies:
ip = p.get('ip', '')
port = p.get('port', 0)
if not ip or not port:
continue
proxy_key = '%s:%s' % (ip, port)
proto = p.get('proto', 'http')
latency = p.get('latency', 0)
source_url = p.get('source_url')
checktype = p.get('checktype', '')
target = p.get('target', '')
try:
# Upsert: insert new proxy or update existing as working
db.execute('''
INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, added,
avg_latency, last_seen, success_count,
consecutive_success, last_check, last_target)
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?, 1, 1, ?, ?)
ON CONFLICT(proxy) DO UPDATE SET
failed = 0,
tested = excluded.tested,
proto = excluded.proto,
avg_latency = excluded.avg_latency,
last_seen = excluded.last_seen,
success_count = COALESCE(success_count, 0) + 1,
consecutive_success = COALESCE(consecutive_success, 0) + 1,
last_check = excluded.last_check,
last_target = excluded.last_target
''', (proxy_key, ip, port, proto, now_int, now_int, latency, now_int,
checktype, target))
# Geolocate and ASN lookup
if _geolite and _geodb:
try:
rec = _geodb.get_all(ip)
if rec and rec.country_short and rec.country_short != '-':
db.execute(
'UPDATE proxylist SET country=? WHERE proxy=?',
(rec.country_short, proxy_key))
except Exception:
pass
if _asndb:
try:
asn_result = _asndb.lookup(ip)
if asn_result and asn_result[0]:
db.execute(
'UPDATE proxylist SET asn=? WHERE proxy=?',
(asn_result[0], proxy_key))
except Exception:
pass
# Track per-URL working count for working_ratio
if source_url:
url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1
processed += 1
except Exception as e:
_log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
# Commit database changes
db.commit()
# Update working_ratio for source URLs
if url_working_counts:
_update_url_working_ratios(url_working_counts)
# Update worker stats
with _workers_lock:
if worker_id in _workers:
w = _workers[worker_id]
w['proxies_working'] = w.get('proxies_working', 0) + processed
w['last_seen'] = now
# Save workers periodically
if now - _last_workers_save > 60:
save_workers()
_last_workers_save = now
return processed
def is_localhost(ip):
"""Check if IP is localhost (127.0.0.0/8 or ::1)."""
if not ip:
return False
# IPv6 localhost
if ip == '::1':
return True
# IPv4 localhost (127.0.0.0/8)
if ip.startswith('127.'):
return True
return False
def check_rate_limit(ip):
"""Check if IP is within rate limit. Returns True if allowed."""
if is_localhost(ip):
return True # No rate limit for localhost
now = time.time()
with _rate_lock:
# Clean old entries
_rate_limits[ip] = [t for t in _rate_limits[ip] if now - t < _rate_limit_window]
if len(_rate_limits[ip]) >= _rate_limit_requests:
return False
_rate_limits[ip].append(now)
return True
def get_security_headers(content_type):
"""Return security headers for responses."""
headers = [
('X-Content-Type-Options', 'nosniff'),
('X-Frame-Options', 'DENY'),
('Referrer-Policy', 'strict-origin-when-cross-origin'),
]
# Add CSP for HTML pages
if 'text/html' in content_type:
headers.append((
'Content-Security-Policy',
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"connect-src 'self'"
))
return headers
def get_system_stats():
"""Collect system resource statistics."""
stats = {}
# Load average (1, 5, 15 min)
try:
load = os.getloadavg()
stats['load_1m'] = round(load[0], 2)
stats['load_5m'] = round(load[1], 2)
stats['load_15m'] = round(load[2], 2)
except (OSError, AttributeError):
stats['load_1m'] = stats['load_5m'] = stats['load_15m'] = 0
# CPU count
try:
stats['cpu_count'] = os.sysconf('SC_NPROCESSORS_ONLN')
except (ValueError, OSError, AttributeError):
stats['cpu_count'] = 1
# Memory from /proc/meminfo (Linux)
try:
with open('/proc/meminfo', 'r') as f:
meminfo = {}
for line in f:
parts = line.split()
if len(parts) >= 2:
meminfo[parts[0].rstrip(':')] = int(parts[1]) * 1024 # KB to bytes
total = meminfo.get('MemTotal', 0)
available = meminfo.get('MemAvailable', meminfo.get('MemFree', 0))
stats['mem_total'] = total
stats['mem_available'] = available
stats['mem_used'] = total - available
stats['mem_pct'] = round((total - available) / total * 100, 1) if total > 0 else 0
except (IOError, KeyError, ZeroDivisionError):
stats['mem_total'] = stats['mem_available'] = stats['mem_used'] = 0
stats['mem_pct'] = 0
# Disk usage for data directory
try:
st = os.statvfs('data' if os.path.exists('data') else '.')
total = st.f_blocks * st.f_frsize
free = st.f_bavail * st.f_frsize
used = total - free
stats['disk_total'] = total
stats['disk_free'] = free
stats['disk_used'] = used
stats['disk_pct'] = round(used / total * 100, 1) if total > 0 else 0
except (OSError, ZeroDivisionError):
stats['disk_total'] = stats['disk_free'] = stats['disk_used'] = 0
stats['disk_pct'] = 0
# Process stats from /proc/self/status
try:
with open('/proc/self/status', 'r') as f:
for line in f:
if line.startswith('VmRSS:'):
stats['proc_rss'] = int(line.split()[1]) * 1024 # KB to bytes
elif line.startswith('Threads:'):
stats['proc_threads'] = int(line.split()[1])
except (IOError, ValueError, IndexError):
stats['proc_rss'] = 0
stats['proc_threads'] = 0
# Simple RSS tracking for dashboard
global _peak_rss, _start_rss
rss = stats.get('proc_rss', 0)
if rss > 0:
if _start_rss == 0:
_start_rss = rss
if rss > _peak_rss:
_peak_rss = rss
stats['proc_rss_peak'] = _peak_rss
stats['proc_rss_start'] = _start_rss
stats['proc_rss_growth'] = rss - _start_rss if _start_rss > 0 else 0
# GC generation counts (fast - no object iteration)
try:
gc_counts = gc.get_count()
stats['gc_count_gen0'] = gc_counts[0]
stats['gc_count_gen1'] = gc_counts[1]
stats['gc_count_gen2'] = gc_counts[2]
except Exception:
stats['gc_count_gen0'] = stats['gc_count_gen1'] = stats['gc_count_gen2'] = 0
return stats
def get_db_health(db):
"""Get database health and statistics (cached)."""
global _db_health_cache
now = time.time()
if now - _db_health_cache['time'] < _db_health_ttl:
return _db_health_cache['value']
stats = {}
try:
# Database file size
db_path = db.path if hasattr(db, 'path') else 'data/proxies.sqlite'
if os.path.exists(db_path):
stats['db_size'] = os.path.getsize(db_path)
else:
stats['db_size'] = 0
# Page stats from pragma
row = db.execute('PRAGMA page_count').fetchone()
stats['page_count'] = row[0] if row else 0
row = db.execute('PRAGMA page_size').fetchone()
stats['page_size'] = row[0] if row else 0
row = db.execute('PRAGMA freelist_count').fetchone()
stats['freelist_count'] = row[0] if row else 0
# Anonymity breakdown
rows = db.execute(
'SELECT anonymity, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY anonymity'
).fetchall()
stats['anonymity'] = {r[0] or 'unknown': r[1] for r in rows}
# Latency stats
row = db.execute(
'SELECT AVG(avg_latency), MIN(avg_latency), MAX(avg_latency) '
'FROM proxylist WHERE failed=0 AND avg_latency > 0'
).fetchone()
if row and row[0]:
stats['db_avg_latency'] = round(row[0], 1)
stats['db_min_latency'] = round(row[1], 1)
stats['db_max_latency'] = round(row[2], 1)
else:
stats['db_avg_latency'] = stats['db_min_latency'] = stats['db_max_latency'] = 0
# Recent activity
now = int(time.time())
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE tested > ?', (now - 3600,)
).fetchone()
stats['tested_last_hour'] = row[0] if row else 0
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE added > ?', (now - 86400,)
).fetchone()
stats['added_last_day'] = row[0] if row else 0
# Dead proxies count (permanently dead = -1, failing = positive)
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE failed = -1'
).fetchone()
stats['dead_count'] = row[0] if row else 0
# Failing proxies count (positive fail count but not permanently dead)
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE failed > 0'
).fetchone()
stats['failing_count'] = row[0] if row else 0
except Exception as e:
_log('get_db_health error: %s' % e, 'warn')
# Update cache
_db_health_cache['value'] = stats
_db_health_cache['time'] = time.time()
return stats
# Detect if gevent has monkey-patched the environment
try:
from gevent import monkey
GEVENT_PATCHED = monkey.is_module_patched('socket')
except ImportError:
GEVENT_PATCHED = False
if GEVENT_PATCHED:
from gevent.pywsgi import WSGIServer
def load_static_libs():
"""Load static library files into cache at startup."""
global _LIB_CACHE
if not os.path.isdir(_STATIC_LIB_DIR):
_log('static/lib directory not found: %s' % _STATIC_LIB_DIR, 'warn')
return
for fname in os.listdir(_STATIC_LIB_DIR):
fpath = os.path.join(_STATIC_LIB_DIR, fname)
if os.path.isfile(fpath):
try:
with open(fpath, 'rb') as f:
_LIB_CACHE[fname] = f.read()
_log('loaded static lib: %s (%d bytes)' % (fname, len(_LIB_CACHE[fname])), 'debug')
except IOError as e:
_log('failed to load %s: %s' % (fname, e), 'warn')
_log('loaded %d static library files' % len(_LIB_CACHE), 'info')
def get_static_lib(filename):
"""Get a cached static library file."""
return _LIB_CACHE.get(filename)
def load_static_files(theme):
"""Load dashboard static files into cache at startup.
Args:
theme: dict of color name -> color value for CSS variable substitution
"""
global _STATIC_CACHE
files = {
'dashboard.html': 'static/dashboard.html',
'map.html': 'static/map.html',
'mitm.html': 'static/mitm.html',
'workers.html': 'static/workers.html',
'style.css': 'static/style.css',
'dashboard.js': 'static/dashboard.js',
'map.js': 'static/map.js',
'mitm.js': 'static/mitm.js',
}
for key, relpath in files.items():
fpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), relpath)
if os.path.isfile(fpath):
try:
with open(fpath, 'rb') as f:
content = f.read()
# Apply theme substitution to CSS
if key == 'style.css' and theme:
for name, val in theme.items():
content = content.replace('{' + name + '}', val)
_STATIC_CACHE[key] = content
_log('loaded static file: %s (%d bytes)' % (key, len(content)), 'debug')
except IOError as e:
_log('failed to load %s: %s' % (fpath, e), 'warn')
else:
_log('static file not found: %s' % fpath, 'warn')
_log('loaded %d dashboard static files' % len(_STATIC_CACHE), 'info')
def get_static_file(filename):
"""Get a cached dashboard static file."""
return _STATIC_CACHE.get(filename)
# Theme colors - dark tiles on lighter background
THEME = {
'bg': '#1e2738',
'card': '#181f2a',
'card_alt': '#212a36',
'border': '#3a4858',
'text': '#e8eef5',
'dim': '#8b929b',
'green': '#3fb950',
'red': '#f85149',
'yellow': '#d29922',
'blue': '#58a6ff',
'purple': '#a371f7',
'cyan': '#39c5cf',
'orange': '#db6d28',
'pink': '#db61a2',
'map_bg': '#1e2738', # Match dashboard background
}
class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""HTTP request handler for proxy API."""
database = None
stats_provider = None
def log_message(self, format, *args):
pass
def send_response_body(self, body, content_type, status=200):
self.send_response(status)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', len(body))
self.send_header('Cache-Control', 'no-cache')
# Add security headers
for header, value in get_security_headers(content_type):
self.send_header(header, value)
self.end_headers()
self.wfile.write(body)
def send_json(self, data, status=200):
self.send_response_body(json.dumps(data, indent=2), 'application/json', status)
def send_text(self, text, status=200):
self.send_response_body(text, 'text/plain', status)
def send_html(self, html, status=200):
self.send_response_body(html, 'text/html; charset=utf-8', status)
def send_css(self, css, status=200):
self.send_response_body(css, 'text/css; charset=utf-8', status)
def send_js(self, js, status=200):
self.send_response_body(js, 'application/javascript; charset=utf-8', status)
def send_download(self, body, content_type, filename):
"""Send response with Content-Disposition for download."""
self.send_response(200)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', len(body))
self.send_header('Content-Disposition', 'attachment; filename="%s"' % filename)
for header, value in get_security_headers(content_type):
self.send_header(header, value)
self.end_headers()
self.wfile.write(body)
def do_GET(self):
# Rate limiting check
client_ip = self.client_address[0] if self.client_address else ''
if not check_rate_limit(client_ip):
self.send_response(429)
self.send_header('Content-Type', 'application/json')
self.send_header('Retry-After', str(_rate_limit_window))
for header, value in get_security_headers('application/json'):
self.send_header(header, value)
self.end_headers()
self.wfile.write(json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window}))
return
path = self.path.split('?')[0]
routes = {
'/': self.handle_index,
'/dashboard': self.handle_dashboard,
'/map': self.handle_map,
'/static/style.css': self.handle_css,
'/static/dashboard.js': self.handle_js,
'/api/stats': self.handle_stats,
'/api/stats/export': self.handle_stats_export,
'/api/countries': self.handle_countries,
'/proxies': self.handle_proxies,
'/proxies/all': self.handle_proxies_all,
'/proxies/count': self.handle_count,
'/health': self.handle_health,
}
handler = routes.get(path)
if handler:
handler()
else:
self.send_json({'error': 'not found'}, 404)
def handle_index(self):
self.send_json({
'endpoints': {
'/dashboard': 'web dashboard (HTML)',
'/api/stats': 'runtime statistics (JSON)',
'/api/stats/export': 'export stats (params: format=json|csv)',
'/proxies': 'list working proxies (params: limit, proto, country, asn)',
'/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)',
'/proxies/count': 'count working proxies',
'/health': 'health check',
}
})
def handle_dashboard(self):
self.send_html(DASHBOARD_HTML)
def handle_map(self):
self.send_html(MAP_HTML)
def handle_countries(self):
"""Return all countries with proxy counts."""
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC'
).fetchall()
countries = {r[0]: r[1] for r in rows}
self.send_json({'countries': countries})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_css(self):
self.send_css(DASHBOARD_CSS)
def handle_js(self):
self.send_js(DASHBOARD_JS)
def get_db_stats(self):
"""Get statistics from database."""
try:
db = mysqlite.mysqlite(self.database, str)
stats = {}
# Total counts
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
stats['working'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
stats['total'] = row[0] if row else 0
# By protocol
rows = db.execute(
'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto'
).fetchall()
stats['by_proto'] = {r[0] or 'unknown': r[1] for r in rows}
# Top countries
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC LIMIT 10'
).fetchall()
stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows]
# Top ASNs
rows = db.execute(
'SELECT asn, COUNT(*) as c FROM proxylist WHERE failed=0 AND asn IS NOT NULL '
'GROUP BY asn ORDER BY c DESC LIMIT 10'
).fetchall()
stats['top_asns'] = [(r[0], r[1]) for r in rows]
return stats
except Exception as e:
return {'error': str(e)}
def handle_stats(self):
stats = {}
# Runtime stats from provider
if self.stats_provider:
try:
stats.update(self.stats_provider())
except Exception as e:
_log('stats_provider error: %s' % str(e), 'error')
# Add system stats
stats['system'] = get_system_stats()
# Add database stats
try:
db = mysqlite.mysqlite(self.database, str)
stats['db'] = self.get_db_stats()
stats['db_health'] = get_db_health(db)
except Exception as e:
_log('handle_stats db error: %s' % e, 'warn')
self.send_json(stats)
def handle_stats_export(self):
"""Export stats as JSON or CSV with download header."""
params = {}
if '?' in self.path:
for pair in self.path.split('?')[1].split('&'):
if '=' in pair:
k, v = pair.split('=', 1)
params[k] = v
fmt = params.get('format', 'json').lower()
timestamp = time.strftime('%Y%m%d_%H%M%S')
# Gather stats
stats = {}
if self.stats_provider:
try:
stats.update(self.stats_provider())
except Exception as e:
_log('stats_provider error in export: %s' % e, 'warn')
stats['system'] = get_system_stats()
stats['exported_at'] = time.strftime('%Y-%m-%d %H:%M:%S')
if fmt == 'csv':
# Flatten stats to CSV rows
lines = ['key,value']
def flatten(obj, prefix=''):
if isinstance(obj, dict):
for k, v in sorted(obj.items()):
flatten(v, '%s%s.' % (prefix, k) if prefix else '%s.' % k)
elif isinstance(obj, (list, tuple)):
for i, v in enumerate(obj):
flatten(v, '%s%d.' % (prefix, i))
else:
key = prefix.rstrip('.')
val = str(obj).replace('"', '""')
lines.append('"%s","%s"' % (key, val))
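        # e.g. {"system": {"load_1m": 0.42}} flattens to the row "system.load_1m","0.42"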
flatten(stats)
body = '\n'.join(lines)
self.send_download(body, 'text/csv', 'ppf_stats_%s.csv' % timestamp)
else:
body = json.dumps(stats, indent=2)
self.send_download(body, 'application/json', 'ppf_stats_%s.json' % timestamp)
def handle_proxies(self):
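        # Example queries (illustrative values):
        #   /proxies?limit=50&proto=socks5&country=DE
        #   /proxies?asn=13335&format=plain   -> one "ip:port" line per proxy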
params = {}
if '?' in self.path:
for pair in self.path.split('?')[1].split('&'):
if '=' in pair:
k, v = pair.split('=', 1)
params[k] = v
limit = min(int(params.get('limit', 100)), 1000)
proto = params.get('proto', '')
country = params.get('country', '')
asn = params.get('asn', '')
mitm_filter = params.get('mitm', '')
fmt = params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
if mitm_filter == '0':
sql += ' AND mitm=0'
elif mitm_filter == '1':
sql += ' AND mitm=1'
sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
args.append(limit)
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
self.send_text('\n'.join('%s:%s' % (r[0], r[1]) for r in rows))
else:
proxies = [{
'ip': r[0], 'port': r[1], 'proto': r[2],
'country': r[3], 'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
self.send_json({'count': len(proxies), 'proxies': proxies})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_proxies_all(self):
params = {}
if '?' in self.path:
for pair in self.path.split('?')[1].split('&'):
if '=' in pair:
k, v = pair.split('=', 1)
params[k] = v
proto = params.get('proto', '')
country = params.get('country', '')
asn = params.get('asn', '')
mitm_filter = params.get('mitm', '')
fmt = params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
if mitm_filter == '0':
sql += ' AND mitm=0'
elif mitm_filter == '1':
sql += ' AND mitm=1'
sql += ' ORDER BY avg_latency ASC, tested DESC'
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
self.send_text('\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows))
else:
proxies = [{
'ip': r[0], 'port': r[1], 'proto': r[2],
'country': r[3], 'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
self.send_json({'count': len(proxies), 'proxies': proxies})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_count(self):
try:
db = mysqlite.mysqlite(self.database, str)
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
self.send_json({'count': row[0] if row else 0})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_health(self):
self.send_json({'status': 'ok', 'version': __version__, 'timestamp': int(time.time())})
class ProxyAPIServer(threading.Thread):
"""Threaded HTTP API server.
Uses gevent's WSGIServer when running in a gevent-patched environment,
otherwise falls back to standard BaseHTTPServer.
"""
def __init__(self, host, port, database, stats_provider=None, profiling=False, url_database=None):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.database = database
self.url_database = url_database
self.stats_provider = stats_provider
self.profiling = profiling
self.daemon = True
global _url_database_path
_url_database_path = url_database
self.server = None
self._stop_event = threading.Event() if not GEVENT_PATCHED else None
# Load static library files into cache
load_static_libs()
# Load dashboard static files (HTML, CSS, JS) with theme substitution
load_static_files(THEME)
# Load worker registry from disk
load_workers()
# Create verification tables if they don't exist
try:
db = mysqlite.mysqlite(self.database, str)
create_verification_tables(db)
_log('verification tables initialized', 'debug')
except Exception as e:
_log('failed to create verification tables: %s' % e, 'warn')
def _wsgi_app(self, environ, start_response):
"""WSGI application wrapper for gevent."""
path = environ.get('PATH_INFO', '/').split('?')[0]
query_string = environ.get('QUERY_STRING', '')
method = environ.get('REQUEST_METHOD', 'GET')
remote_addr = environ.get('REMOTE_ADDR', '')
# Parse query parameters
query_params = {}
if query_string:
for param in query_string.split('&'):
if '=' in param:
k, v = param.split('=', 1)
query_params[k] = v
# Only allow GET and POST
if method not in ('GET', 'POST'):
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
return [b'Method not allowed']
# POST only allowed for worker API endpoints
post_endpoints = ('/api/register', '/api/heartbeat',
'/api/report-urls', '/api/report-proxies')
if method == 'POST' and path not in post_endpoints:
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
return [b'POST not allowed for this endpoint']
# Parse POST body
post_data = None
if method == 'POST':
try:
content_length = int(environ.get('CONTENT_LENGTH', 0))
if content_length > 0:
body = environ['wsgi.input'].read(content_length)
post_data = json.loads(body)
except Exception:
error_body = json.dumps({'error': 'invalid JSON body'})
headers = [('Content-Type', 'application/json')]
start_response('400 Bad Request', headers)
return [error_body.encode('utf-8')]
# Rate limiting check
if not check_rate_limit(remote_addr):
error_body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window})
headers = [
('Content-Type', 'application/json'),
('Content-Length', str(len(error_body))),
('Retry-After', str(_rate_limit_window)),
]
headers.extend(get_security_headers('application/json'))
start_response('429 Too Many Requests', headers)
return [error_body]
# Route handling
try:
response_body, content_type, status = self._handle_route(path, remote_addr, query_params, post_data)
status_line = '%d %s' % (status, 'OK' if status == 200 else 'Error')
headers = [
('Content-Type', content_type),
('Content-Length', str(len(response_body))),
('Cache-Control', 'no-cache'),
]
headers.extend(get_security_headers(content_type))
start_response(status_line, headers)
return [response_body.encode('utf-8') if isinstance(response_body, unicode) else response_body]
except Exception as e:
error_body = json.dumps({'error': str(e)})
headers = [
('Content-Type', 'application/json'),
('Content-Length', str(len(error_body))),
]
headers.extend(get_security_headers('application/json'))
start_response('500 Internal Server Error', headers)
return [error_body]
def _handle_route(self, path, remote_addr='', query_params=None, post_data=None):
"""Handle route and return (body, content_type, status)."""
if query_params is None:
query_params = {}
if path == '/':
body = json.dumps({
'endpoints': {
'/dashboard': 'web dashboard (HTML)',
'/map': 'proxy distribution by country (HTML)',
'/mitm': 'MITM certificate search (HTML)',
'/api/dashboard': 'batch endpoint: stats + workers + countries (JSON)',
'/api/stats': 'runtime statistics (JSON)',
'/api/mitm': 'MITM certificate statistics (JSON)',
'/api/countries': 'proxy counts by country (JSON)',
'/api/register': 'register as worker (POST)',
'/api/workers': 'list connected workers',
'/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)',
'/api/report-urls': 'report URL fetch results (POST, params: key)',
'/api/report-proxies': 'report working proxies (POST, params: key)',
'/proxies': 'list working proxies (params: limit, proto, country, asn)',
'/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)',
'/proxies/count': 'count working proxies',
'/health': 'health check',
}
}, indent=2)
return body, 'application/json', 200
elif path == '/dashboard':
content = get_static_file('dashboard.html')
if content:
return content, 'text/html; charset=utf-8', 200
return '{"error": "dashboard.html not loaded"}', 'application/json', 500
elif path == '/map':
content = get_static_file('map.html')
if content:
return content, 'text/html; charset=utf-8', 200
return '{"error": "map.html not loaded"}', 'application/json', 500
elif path == '/mitm':
content = get_static_file('mitm.html')
if content:
return content, 'text/html; charset=utf-8', 200
return '{"error": "mitm.html not loaded"}', 'application/json', 500
elif path == '/workers':
content = get_static_file('workers.html')
if content:
return content, 'text/html; charset=utf-8', 200
return '{"error": "workers.html not loaded"}', 'application/json', 500
elif path == '/static/style.css':
content = get_static_file('style.css')
if content:
return content, 'text/css; charset=utf-8', 200
return '{"error": "style.css not loaded"}', 'application/json', 500
elif path == '/static/dashboard.js':
content = get_static_file('dashboard.js')
if content:
return content, 'application/javascript; charset=utf-8', 200
return '{"error": "dashboard.js not loaded"}', 'application/json', 500
elif path == '/static/map.js':
content = get_static_file('map.js')
if content:
return content, 'application/javascript; charset=utf-8', 200
return '{"error": "map.js not loaded"}', 'application/json', 500
elif path == '/static/mitm.js':
content = get_static_file('mitm.js')
if content:
return content, 'application/javascript; charset=utf-8', 200
return '{"error": "mitm.js not loaded"}', 'application/json', 500
elif path.startswith('/static/lib/'):
# Serve static library files from cache
filename = path.split('/')[-1]
content = get_static_lib(filename)
if content:
ext = os.path.splitext(filename)[1]
content_type = _CONTENT_TYPES.get(ext, 'application/octet-stream')
return content, content_type, 200
return '{"error": "not found"}', 'application/json', 404
elif path == '/api/stats':
stats = {}
if self.stats_provider:
stats = self.stats_provider()
# Add system stats
stats['system'] = get_system_stats()
# Add database stats
try:
db = mysqlite.mysqlite(self.database, str)
stats['db'] = self._get_db_stats(db)
stats['db_health'] = get_db_health(db)
except Exception as e:
_log('api/stats db error: %s' % e, 'warn')
# Add profiling flag (from constructor or stats_provider)
if 'profiling' not in stats:
stats['profiling'] = self.profiling
return json.dumps(stats, indent=2), 'application/json', 200
elif path == '/api/dashboard':
# Batch endpoint combining stats + workers + countries
# Reduces RTT for dashboard - single request instead of multiple
result = {}
# 1. Runtime stats (same as /api/stats)
stats = {}
if self.stats_provider:
try:
stats = self.stats_provider()
except Exception as e:
_log('api/dashboard stats_provider error: %s' % e, 'warn')
stats['system'] = get_system_stats()
if 'profiling' not in stats:
stats['profiling'] = self.profiling
result['stats'] = stats
# 2. Database stats and health
try:
db = mysqlite.mysqlite(self.database, str)
result['stats']['db'] = self._get_db_stats(db)
result['stats']['db_health'] = get_db_health(db)
# 3. Countries (same as /api/countries)
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC'
).fetchall()
result['countries'] = {r[0]: r[1] for r in rows}
# 4. Workers (same as /api/workers)
result['workers'] = self._get_workers_data(db)
db.close()
except Exception as e:
_log('api/dashboard db error: %s' % e, 'warn')
result['countries'] = {}
result['workers'] = {'workers': [], 'total': 0, 'active': 0}
return json.dumps(result, indent=2), 'application/json', 200
elif path == '/api/mitm':
# MITM certificate statistics
if self.stats_provider:
try:
stats = self.stats_provider()
mitm = stats.get('mitm', {})
return json.dumps(mitm, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
return json.dumps({'error': 'stats not available'}), 'application/json', 500
elif path == '/api/countries':
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC'
).fetchall()
countries = {r[0]: r[1] for r in rows}
return json.dumps({'countries': countries}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/locations':
            # Return proxy locations aggregated by lat/lon grid (0.1 degree cells)
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(
'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, '
'country, anonymity, COUNT(*) as c FROM proxylist '
'WHERE failed=0 AND latitude IS NOT NULL AND longitude IS NOT NULL '
'GROUP BY lat, lon, country, anonymity ORDER BY c DESC'
).fetchall()
locations = [{'lat': r[0], 'lon': r[1], 'country': r[2], 'anon': r[3] or 'unknown', 'count': r[4]} for r in rows]
return json.dumps({'locations': locations}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/proxies':
try:
limit = min(int(query_params.get('limit', 100)), 1000)
proto = query_params.get('proto', '')
country = query_params.get('country', '')
asn = query_params.get('asn', '')
mitm_filter = query_params.get('mitm', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working, mitm FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
if mitm_filter == '0':
sql += ' AND mitm=0'
elif mitm_filter == '1':
sql += ' AND mitm=1'
sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
args.append(limit)
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]],
'mitm': bool(r[7]),
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/proxies/all':
try:
proto = query_params.get('proto', '')
country = query_params.get('country', '')
asn = query_params.get('asn', '')
mitm_filter = query_params.get('mitm', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working, mitm FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
if mitm_filter == '0':
sql += ' AND mitm=0'
elif mitm_filter == '1':
sql += ' AND mitm=1'
sql += ' ORDER BY avg_latency ASC, tested DESC'
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]],
'mitm': bool(r[7]),
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/proxies/count':
try:
mitm_filter = query_params.get('mitm', '')
sql = 'SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
if mitm_filter == '0':
sql += ' AND mitm=0'
elif mitm_filter == '1':
sql += ' AND mitm=1'
db = mysqlite.mysqlite(self.database, str)
row = db.execute(sql).fetchone()
return json.dumps({'count': row[0] if row else 0}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/health':
return json.dumps({'status': 'ok', 'version': __version__, 'timestamp': int(time.time())}), 'application/json', 200
# Worker API endpoints
elif path == '/api/register':
# Register a new worker (POST)
if not post_data:
return json.dumps({'error': 'POST body required'}), 'application/json', 400
name = post_data.get('name', 'worker-%s' % remote_addr)
master_key = post_data.get('master_key', '')
# Require master key for registration (if set)
if _master_key and master_key != _master_key:
return json.dumps({'error': 'invalid master key'}), 'application/json', 403
worker_id, worker_key = register_worker(name, remote_addr)
_log('worker registered: %s (%s) from %s' % (name, worker_id, remote_addr), 'info')
return json.dumps({
'worker_id': worker_id,
'worker_key': worker_key,
'message': 'registered successfully',
}), 'application/json', 200
elif path == '/api/heartbeat':
# Worker heartbeat with Tor status (POST)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
# Update worker Tor, profiling, and thread status
tor_ok = post_data.get('tor_ok', True) if post_data else True
tor_ip = post_data.get('tor_ip') if post_data else None
profiling = post_data.get('profiling', False) if post_data else False
threads = post_data.get('threads', 0) if post_data else 0
now = time.time()
with _workers_lock:
if worker_id in _workers:
_workers[worker_id]['last_seen'] = now
_workers[worker_id]['tor_ok'] = tor_ok
_workers[worker_id]['tor_ip'] = tor_ip
_workers[worker_id]['tor_last_check'] = now
_workers[worker_id]['profiling'] = profiling
_workers[worker_id]['threads'] = threads
return json.dumps({
'worker_id': worker_id,
'message': 'heartbeat received',
}), 'application/json', 200
elif path == '/api/workers':
# List connected workers
try:
db = mysqlite.mysqlite(self.database, str)
workers_data = self._get_workers_data(db)
db.close()
return json.dumps(workers_data, indent=2), 'application/json', 200
except Exception as e:
_log('api/workers error: %s' % e, 'warn')
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/claim-urls':
# Worker claims batch of URLs for fetching (GET)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not self.url_database:
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
count = min(int(query_params.get('count', 5)), 20)
try:
url_db = mysqlite.mysqlite(self.url_database, str)
urls = claim_urls(url_db, worker_id, count)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'count': len(urls),
'urls': urls,
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/report-urls':
# Worker reports URL fetch results (POST)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not self.url_database:
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
if not post_data:
return json.dumps({'error': 'POST body required'}), 'application/json', 400
reports = post_data.get('reports', [])
if not reports:
return json.dumps({'error': 'no reports provided'}), 'application/json', 400
try:
url_db = mysqlite.mysqlite(self.url_database, str)
processed = submit_url_reports(url_db, worker_id, reports)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'processed': processed,
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/report-proxies':
# Worker reports working proxies (POST)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not post_data:
return json.dumps({'error': 'POST body required'}), 'application/json', 400
proxies = post_data.get('proxies', [])
if not proxies:
return json.dumps({'error': 'no proxies provided'}), 'application/json', 400
try:
db = mysqlite.mysqlite(self.database, str)
processed = submit_proxy_reports(db, worker_id, proxies)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'processed': processed,
}), 'application/json', 200
except Exception as e:
_log('report-proxies error from %s: %s' % (worker_id, e), 'error')
return json.dumps({'error': str(e)}), 'application/json', 500
else:
return json.dumps({'error': 'not found'}), 'application/json', 404
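    # Typical worker flow against the endpoints above (illustrative; payloads
    # abbreviated):
    #   POST /api/register                         {"name": "worker-1", "master_key": "..."}
    #     -> {"worker_id": "...", "worker_key": "..."}
    #   GET  /api/claim-urls?key=<worker_key>&count=5
    #   POST /api/report-urls?key=<worker_key>     {"reports": [...]}
    #   POST /api/report-proxies?key=<worker_key>  {"proxies": [...]}
    #   POST /api/heartbeat?key=<worker_key>       {"tor_ok": true, "threads": 8}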
def _get_db_stats(self, db):
"""Get database statistics."""
stats = {}
try:
# By protocol
rows = db.execute(
'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto'
).fetchall()
stats['by_proto'] = {r[0]: r[1] for r in rows if r[0]}
# Top countries
rows = db.execute(
'SELECT country, COUNT(*) as cnt FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY cnt DESC LIMIT 10'
).fetchall()
stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows]
# Total counts
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
stats['working'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
stats['total'] = row[0] if row else 0
# Proxies due for testing (eligible for worker claims)
due_condition, due_params = _build_due_condition()
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
due_params).fetchone()
due_total = row[0] if row else 0
stats['due'] = due_total
stats['claimed'] = 0
except Exception as e:
_log('_get_db_stats error: %s' % e, 'warn')
return stats
def _get_workers_data(self, db):
"""Get worker status data. Used by /api/workers and /api/dashboard."""
now = time.time()
with _workers_lock:
workers = []
total_tested = 0
total_working = 0
total_failed = 0
for wid, info in _workers.items():
tested = info.get('proxies_tested', 0)
working = info.get('proxies_working', 0)
failed = info.get('proxies_failed', 0)
total_latency = info.get('total_latency', 0)
avg_latency = round(total_latency / working, 2) if working > 0 else 0
success_rate = round(100 * working / tested, 1) if tested > 0 else 0
total_tested += tested
total_working += working
total_failed += failed
# Tor status with age check
tor_ok = info.get('tor_ok', True)
tor_ip = info.get('tor_ip')
tor_last_check = info.get('tor_last_check', 0)
tor_age = int(now - tor_last_check) if tor_last_check else None
worker_profiling = info.get('profiling', False)
worker_threads = info.get('threads', 0)
# Calculate test rate for this worker
test_rate = get_worker_test_rate(wid)
workers.append({
'id': wid,
'name': info['name'],
'ip': info['ip'],
'registered': int(info['registered']),
'last_seen': int(info['last_seen']),
'age': int(now - info['last_seen']),
'jobs_completed': info.get('jobs_completed', 0),
'proxies_tested': tested,
'proxies_working': working,
'proxies_failed': failed,
'success_rate': success_rate,
'avg_latency': avg_latency,
'last_batch_size': info.get('last_batch_size', 0),
'last_batch_working': info.get('last_batch_working', 0),
'active': (now - info['last_seen']) < 120,
'test_rate': round(test_rate, 2),
'tor_ok': tor_ok,
'tor_ip': tor_ip,
'tor_age': tor_age,
'profiling': worker_profiling,
'threads': worker_threads,
})
# Sort by name for consistent display
workers.sort(key=lambda w: w['name'])
# Get queue status from database
queue_stats = {
'pending': 0, 'claimed': 0, 'due': 0, 'total': 0,
'untested': 0, 'session_tested': 0, 'session_pct': 0,
'session_start': _session_start_time
}
try:
# Total proxies in database
row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
queue_stats['total'] = row[0] if row else 0
# Never tested (tested IS NULL)
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested IS NULL').fetchone()
queue_stats['untested'] = row[0] if row else 0
# Tested this session (since httpd started)
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested >= ?', (_session_start_time,)).fetchone()
queue_stats['session_tested'] = row[0] if row else 0
            # Pending = total eligible (not dead, below the failure cutoff)
            row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed >= 0 AND failed < ?',
                             (_max_fail,)).fetchone()
queue_stats['pending'] = row[0] if row else 0
# Session progress percentage (tested / total, capped at 100%)
if queue_stats['total'] > 0:
pct = 100.0 * queue_stats['session_tested'] / queue_stats['total']
queue_stats['session_pct'] = round(min(pct, 100.0), 1)
queue_stats['claimed'] = 0
# Due = ready for testing (respecting cooldown)
due_condition, due_params = _build_due_condition()
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
due_params).fetchone()
queue_stats['due'] = row[0] if row else 0
except Exception as e:
_log('_get_workers_data queue stats error: %s' % e, 'warn')
# Calculate combined test rate from active workers
combined_rate = sum(w['test_rate'] for w in workers if w['active'])
# Get manager's own stats (local proxy testing)
manager_stats = None
if self.stats_provider:
try:
stats = self.stats_provider()
threads = stats.get('threads', 0)
if threads > 0: # Manager has local testing enabled
manager_stats = {
'threads': threads,
'tested': stats.get('tested', 0),
'passed': stats.get('passed', 0),
'rate': round(stats.get('recent_rate', 0), 2),
'success_rate': round(stats.get('success_rate', 0), 1),
'uptime': stats.get('uptime_seconds', 0),
'queue_size': stats.get('queue_size', 0),
}
except Exception as e:
_log('_get_workers_data manager stats error: %s' % e, 'warn')
# Get verification stats and worker trust
verification_stats = {'queue_size': 0, 'queue_by_trigger': {}}
worker_trust_data = {}
try:
verification_stats = get_verification_stats(db)
worker_trust_data = get_all_worker_trust(db)
except Exception as e:
_log('_get_workers_data verification stats error: %s' % e, 'warn')
# Add trust scores to workers
for w in workers:
wid = w['id']
if wid in worker_trust_data:
trust = worker_trust_data[wid]
w['trust_score'] = round(trust['trust_score'], 2)
w['verifications'] = trust['verifications']
w['trust_correct'] = trust['correct']
w['trust_incorrect'] = trust['incorrect']
else:
w['trust_score'] = 1.0 # Default for new workers
w['verifications'] = 0
w['trust_correct'] = 0
w['trust_incorrect'] = 0
return {
'workers': workers,
'total': len(workers),
'active': sum(1 for w in workers if w['active']),
'manager': manager_stats,
'summary': {
'total_tested': total_tested,
'total_working': total_working,
'total_failed': total_failed,
'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0,
'combined_rate': round(combined_rate, 2),
},
'queue': queue_stats,
'verification': verification_stats,
}
def run(self):
ProxyAPIHandler.database = self.database
ProxyAPIHandler.stats_provider = self.stats_provider
if GEVENT_PATCHED:
# Use gevent's WSGIServer for proper async handling
self.server = WSGIServer((self.host, self.port), self._wsgi_app, log=None)
_log('httpd listening on %s:%d (gevent)' % (self.host, self.port), 'info')
self.server.serve_forever()
else:
# Standard BaseHTTPServer for non-gevent environments
self.server = BaseHTTPServer.HTTPServer((self.host, self.port), ProxyAPIHandler)
_log('httpd listening on %s:%d' % (self.host, self.port), 'info')
self.server.serve_forever()
def stop(self):
if self.server:
if GEVENT_PATCHED:
self.server.stop()
else:
self.server.shutdown()
if __name__ == '__main__':
host = '127.0.0.1'
port = 8081
database = 'data/proxies.sqlite'
if len(sys.argv) > 1:
database = sys.argv[1]
_log('starting test server on %s:%d (db: %s)' % (host, port, database), 'info')
server = ProxyAPIServer(host, port, database)
server.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop()
_log('server stopped', 'info')