Files
ppf/httpd.py
Username 6ba4b3e1e9 httpd: exclude untested proxies from results
Filter out entries with proto IS NULL from /proxies and /proxies/count
endpoints. These are proxies added to the database but never validated,
leaking into results with null proto, asn, and zero latency.
2026-02-15 04:02:00 +01:00

1773 lines
70 KiB
Python

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import division
# NOTE(review): this string follows the __future__ import, so it is NOT the
# module docstring (__doc__ stays None); harmless, but worth confirming intent.
"""HTTP API server with advanced web dashboard for PPF."""
try:
    from ppf import __version__
except ImportError:
    # Running outside the installed package (e.g. straight from a checkout)
    __version__ = 'unknown'
import BaseHTTPServer
import json
import threading
import time
import os
import gc
import sys
from collections import defaultdict
import mysqlite
from misc import _log
from dbs import (create_verification_tables, insert_proxy_result,
                 check_for_disagreement, queue_verification,
                 get_verification_stats, get_all_worker_trust)
# IP geolocation (optional): database loaded once at import time; any
# failure simply disables geolocation (_geolite=False) rather than crashing.
try:
    import IP2Location
    _geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN"))
    _geolite = True
except (ImportError, IOError, ValueError):
    _geodb = None
    _geolite = False
# Rate limiting configuration
_rate_limits = defaultdict(list)  # ip -> list of request timestamps in window
_rate_lock = threading.Lock()
_rate_limit_requests = 300 # requests per window (increased for workers)
_rate_limit_window = 60 # window in seconds
# Static directories (relative to this file)
_STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
_STATIC_LIB_DIR = os.path.join(_STATIC_DIR, 'lib')
# Content type mapping for static files
_CONTENT_TYPES = {
    '.js': 'application/javascript; charset=utf-8',
    '.css': 'text/css; charset=utf-8',
    '.json': 'application/json; charset=utf-8',
    '.png': 'image/png',
    '.svg': 'image/svg+xml',
    '.woff': 'font/woff',
    '.woff2': 'font/woff2',
}
# Cache for static library files (loaded once at startup)
_LIB_CACHE = {}
# Cache for dashboard static files (HTML, CSS, JS)
_STATIC_CACHE = {}
# Cache for expensive operations
_db_health_cache = {'value': {}, 'time': 0}
_db_health_ttl = 10 # seconds
# Simple RSS tracking for dashboard
_peak_rss = 0
_start_rss = 0
# Worker registry for distributed testing
import hashlib
import random
import string
_workers = {} # worker_id -> {name, ip, last_seen, jobs_completed, proxies_tested, ...}
_workers_lock = threading.Lock()
_work_claims = {} # proxy_key ("ip:port") -> {worker_id, claimed_at}
_work_claims_lock = threading.Lock()
_worker_keys = set() # valid API keys
_master_key = None # master key for worker registration
_claim_timeout = 300 # seconds before unclaimed work is released
_workers_file = 'data/workers.json' # persistent storage
# Test rate tracking: worker_id -> list of (timestamp, count) tuples
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
_test_history_window = 120 # seconds to keep test history for rate calculation
# Fair distribution settings
_min_batch_size = 100 # minimum proxies per batch
_max_batch_size = 1000 # maximum proxies per batch
_worker_timeout = 120 # seconds before worker considered inactive
# Session tracking
_session_start_time = int(time.time()) # when httpd started
# Testing schedule configuration (set from config at startup via configure_schedule)
_working_checktime = 300 # retest interval for working proxies (failed=0)
_fail_retry_interval = 60 # retry interval for failing proxies
_fail_retry_backoff = True # True=linear backoff (60,120,180...), False=fixed (60,60,60...)
_max_fail = 5 # failures before proxy considered dead
def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backoff, max_fail):
    """Install testing-schedule parameters (from config) into module globals."""
    global _working_checktime, _fail_retry_interval, _fail_retry_backoff, _max_fail
    (_working_checktime, _fail_retry_interval,
     _fail_retry_backoff, _max_fail) = (working_checktime, fail_retry_interval,
                                        fail_retry_backoff, max_fail)
def _build_due_condition():
    """Build the SQL WHERE fragment selecting proxies due for testing.

    Returns (condition_sql, params) where condition_sql is the WHERE clause
    and params is a tuple of parameter values.
    Formula:
      - failed=0: tested + working_checktime < now
      - failed>0 with backoff: tested + (failed * fail_retry_interval) < now
      - failed>0 without backoff: tested + fail_retry_interval < now
    """
    now_int = int(time.time())
    # Only the retry term differs between linear backoff and fixed interval;
    # the bound-parameter order is identical for both variants.
    retry_term = '(failed * ?)' if _fail_retry_backoff else '?'
    condition = '''
        failed >= 0 AND failed < ?
        AND (tested IS NULL OR
            CASE WHEN failed = 0
            THEN tested + ? < ?
            ELSE tested + %s < ?
            END)
    ''' % retry_term
    params = (_max_fail, _working_checktime, now_int,
              _fail_retry_interval, now_int)
    return condition, params
def get_active_worker_count():
    """Count workers whose last heartbeat falls inside the activity window."""
    cutoff = time.time() - _worker_timeout
    with _workers_lock:
        active = [w for w in _workers.values() if w.get('last_seen', 0) > cutoff]
    return len(active)
def get_due_proxy_count(db):
    """Count proxies due for testing, excluding those currently claimed.

    Best-effort: any query failure is treated as "nothing due" (returns 0).
    """
    with _work_claims_lock:
        claimed_count = len(_work_claims)
    try:
        condition, params = _build_due_condition()
        row = db.execute('SELECT COUNT(*) FROM proxylist WHERE ' + condition,
                         params).fetchone()
    except Exception:
        return 0
    total_due = row[0] if row else 0
    return max(0, total_due - claimed_count)
def calculate_fair_batch_size(db, worker_id):
    """Compute a fair per-worker batch size from queue depth and worker count.

    Splits the due queue evenly across active workers, adds a 20% buffer for
    speed differences, and clamps the result to [_min_batch_size, _max_batch_size].
    worker_id is currently unused but kept for interface stability.
    """
    workers = max(1, get_active_worker_count())
    due = get_due_proxy_count(db)
    if due == 0:
        return _min_batch_size
    share = int((due / workers) * 1.2)
    batch = min(max(share, _min_batch_size), _max_batch_size)
    _log('fair_batch: due=%d workers=%d share=%d batch=%d' % (
        due, workers, share, batch), 'debug')
    return batch
def load_workers():
    """Restore the worker registry and API-key set from the JSON state file.

    Missing or unreadable state is logged and ignored; the in-memory
    registry is only replaced on a successful load.
    """
    global _workers, _worker_keys
    try:
        if not os.path.exists(_workers_file):
            return
        with open(_workers_file, 'r') as f:
            state = json.load(f)
        _workers = state.get('workers', {})
        _worker_keys = set(state.get('keys', []))
        _log('loaded %d workers from %s' % (len(_workers), _workers_file), 'info')
    except Exception as e:
        _log('failed to load workers: %s' % e, 'warn')
def save_workers():
    """Persist the worker registry and API-key set to the JSON state file.

    Write failures are logged as warnings, never raised.
    """
    snapshot = {
        'workers': _workers,
        'keys': list(_worker_keys),
        'saved': time.time(),
    }
    try:
        with open(_workers_file, 'w') as f:
            json.dump(snapshot, f, indent=2)
    except Exception as e:
        _log('failed to save workers: %s' % e, 'warn')
def generate_worker_key():
    """Return a random 32-character alphanumeric worker API key."""
    alphabet = string.ascii_letters + string.digits
    picks = [random.choice(alphabet) for _ in range(32)]
    return ''.join(picks)
def generate_worker_id(name, ip):
    """Derive a 16-hex-character worker id from name, ip and current time."""
    seed = '%s:%s:%s' % (name, ip, time.time())
    digest = hashlib.sha256(seed.encode()).hexdigest()
    return digest[:16]
def register_worker(name, ip, key=None):
    """Register a worker and return (worker_id, api_key).

    A worker re-registering under an existing name keeps its id and
    accumulated stats, but gets a fresh key and updated IP. This allows
    workers to re-register after restarts.
    """
    with _workers_lock:
        for wid, info in _workers.items():
            if info.get('name') != name:
                continue
            # Same name -> re-registration after a restart.
            fresh_key = key or generate_worker_key()
            _worker_keys.discard(info.get('key'))  # invalidate the old key
            info['ip'] = ip
            info['key'] = fresh_key
            info['last_seen'] = time.time()
            _worker_keys.add(fresh_key)
            save_workers()
            return wid, fresh_key
    # Unknown name: create a brand-new registration with zeroed stats.
    worker_id = generate_worker_id(name, ip)
    worker_key = key or generate_worker_key()
    record = {
        'name': name,
        'ip': ip,
        'key': worker_key,
        'registered': time.time(),
        'last_seen': time.time(),
        'jobs_completed': 0,
        'proxies_tested': 0,
        'proxies_working': 0,
        'proxies_failed': 0,
        'total_latency': 0,
        'last_batch_size': 0,
        'last_batch_working': 0,
    }
    with _workers_lock:
        _workers[worker_id] = record
        _worker_keys.add(worker_key)
        save_workers()
    return worker_id, worker_key
def validate_worker_key(key):
    """Return True when *key* is the master key or a registered worker key."""
    if not key:
        return False
    # The master key is accepted for every operation.
    if _master_key and key == _master_key:
        return True
    return key in _worker_keys
def get_worker_by_key(key):
    """Look up (worker_id, info) by API key; (None, None) when unknown."""
    with _workers_lock:
        matches = [(wid, info) for wid, info in _workers.items()
                   if info.get('key') == key]
    if matches:
        return matches[0]
    return None, None
def update_worker_heartbeat(worker_id):
    """Refresh a worker's last_seen timestamp; unknown ids are ignored."""
    with _workers_lock:
        info = _workers.get(worker_id)
        if info is not None:
            info['last_seen'] = time.time()
def record_test_rate(worker_id, count):
    """Append a (timestamp, count) sample and prune entries beyond the window."""
    now = time.time()
    cutoff = now - _test_history_window
    with _worker_test_history_lock:
        samples = _worker_test_history.setdefault(worker_id, [])
        samples.append((now, count))
        # Keep only samples young enough to matter for the rate calculation.
        _worker_test_history[worker_id] = [s for s in samples if s[0] > cutoff]
def get_worker_test_rate(worker_id):
    """Return the worker's recent throughput in tests/second (0.0 if idle).

    Rate = total tests in the window / time since the oldest in-window
    sample; less than one second of history yields 0.0 to avoid spikes.
    """
    now = time.time()
    cutoff = now - _test_history_window
    with _worker_test_history_lock:
        samples = _worker_test_history.get(worker_id, [])
        recent = [(ts, cnt) for ts, cnt in samples if ts > cutoff]
    if not recent:
        return 0.0
    elapsed = now - min(ts for ts, _ in recent)
    if elapsed < 1:
        return 0.0
    return sum(cnt for _, cnt in recent) / elapsed
def claim_work(db, worker_id, count=100):
    """Claim a batch of due proxies for a worker.

    Args:
        db: open database handle (mysqlite wrapper).
        worker_id: id of the claiming worker; recorded in the claim table.
        count: legacy hint, unused; the batch size comes from
            calculate_fair_batch_size().

    Returns:
        List of dicts with keys ip, port, proto, failed. Empty list when
        nothing is due or the query fails.
    """
    now = time.time()
    now_int = int(now)
    # Fair batch size: due_proxies / active_workers, clamped to bounds.
    target_count = calculate_fair_batch_size(db, worker_id)
    # Drop stale claims, then snapshot the claimed set for SQL exclusion.
    with _work_claims_lock:
        stale = [k for k, v in _work_claims.items()
                 if now - v['claimed_at'] > _claim_timeout]
        for k in stale:
            del _work_claims[k]
        claimed_keys = set(_work_claims.keys())
    # Get proxies that need testing.
    # Priority: untested first, then oldest due - with randomness within tiers.
    try:
        # SQLite caps bound parameters (historically 999 via
        # SQLITE_MAX_VARIABLE_NUMBER), and the query already spends several
        # parameters on the priority/due/limit terms. Previously a large
        # claimed set overflowed the limit, the query raised, and the worker
        # got an empty batch. Only inline the exclusion list when it fits;
        # otherwise skip SQL-side exclusion, over-fetch, and let the
        # in-Python claim check below filter out already-claimed proxies.
        use_sql_exclusion = len(claimed_keys) <= 900
        if claimed_keys and use_sql_exclusion:
            # ip||':'||port matches our claim key format
            placeholders = ','.join('?' for _ in claimed_keys)
            exclude_clause = "AND (ip || ':' || port) NOT IN (%s)" % placeholders
            exclude_params = list(claimed_keys)
        else:
            exclude_clause = ""
            exclude_params = []
        # Build due condition using the configured schedule formula.
        due_condition, due_params = _build_due_condition()
        # Priority tiers: 0=untested, 1=very overdue (>1hr), 2=recently due.
        if _fail_retry_backoff:
            overdue_calc = '''
                CASE WHEN failed = 0
                THEN ? - (tested + ?)
                ELSE ? - (tested + (failed * ?))
                END
            '''
        else:
            overdue_calc = '''
                CASE WHEN failed = 0
                THEN ? - (tested + ?)
                ELSE ? - (tested + ?)
                END
            '''
        priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
        # Without SQL-side exclusion, fetch extra rows so filtering out the
        # claimed proxies can still leave a full batch.
        if use_sql_exclusion:
            fetch_limit = target_count
        else:
            fetch_limit = target_count + len(claimed_keys)
        query = '''
            SELECT ip, port, proto, failed,
                CASE
                    WHEN tested IS NULL THEN 0
                    WHEN (%s) > 3600 THEN 1
                    ELSE 2
                END as priority
            FROM proxylist
            WHERE %s
            %s
            ORDER BY priority, RANDOM()
            LIMIT ?
        ''' % (overdue_calc, due_condition, exclude_clause)
        params = priority_params + list(due_params) + exclude_params + [fetch_limit]
        rows = db.execute(query, params).fetchall()
    except Exception as e:
        _log('claim_work query error: %s' % e, 'error')
        return []
    # Register claims; re-check under the lock so a proxy claimed by another
    # worker between the query and here is not handed out twice.
    claimed = []
    with _work_claims_lock:
        for row in rows:
            if len(claimed) >= target_count:
                break  # over-fetch protection: never exceed the fair batch
            proxy_key = '%s:%s' % (row[0], row[1])
            if proxy_key not in _work_claims:
                _work_claims[proxy_key] = {'worker_id': worker_id, 'claimed_at': now}
                claimed.append({
                    'ip': row[0],
                    'port': row[1],
                    'proto': row[2],
                    'failed': row[3],
                })
    if claimed:
        _log('claim_work: %d proxies to %s (pool: %d claimed)' % (
            len(claimed), worker_id[:8], len(_work_claims)), 'info')
    return claimed
_last_workers_save = 0  # wall-clock timestamp of the last periodic save_workers() flush
def submit_results(db, worker_id, results):
    """Process test results from a worker. Returns count of processed results.

    For each result dict ({ip, port, working, latency, error_category, proto}
    -- schema assumed from usage, confirm against worker client): releases
    the work claim, upserts/updates proxylist, records the outcome for the
    cross-worker verification system, and queues verification jobs on
    disagreement / resurrection / sudden death. Commits once at the end and
    persists the worker registry at most every 60 seconds.
    """
    global _last_workers_save
    processed = 0
    working_count = 0
    total_latency = 0
    now = time.time()
    # Heartbeat: mark the worker alive before processing its batch.
    with _workers_lock:
        if worker_id in _workers:
            _workers[worker_id]['last_seen'] = now
    for r in results:
        proxy_key = '%s:%s' % (r.get('ip', ''), r.get('port', ''))
        # Release claim
        with _work_claims_lock:
            if proxy_key in _work_claims:
                del _work_claims[proxy_key]
        # Update database - trust workers, add missing proxies if working
        try:
            working = 1 if r.get('working') else 0
            latency_ms = r.get('latency', 0) if working else None
            error_cat = r.get('error_category') if not working else None
            if working:
                # Use INSERT OR REPLACE to add working proxies that don't exist
                db.execute('''
                    INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, avg_latency, added)
                    VALUES (?, ?, ?, ?, 0, ?, ?, ?)
                    ON CONFLICT(proxy) DO UPDATE SET
                        failed = 0,
                        tested = excluded.tested,
                        avg_latency = excluded.avg_latency
                ''', (proxy_key, r['ip'], r['port'], r.get('proto', 'http'), int(now),
                      latency_ms, int(now)))
                working_count += 1
                total_latency += latency_ms or 0
                # Geolocate working proxy if IP2Location available
                if _geolite and _geodb:
                    try:
                        rec = _geodb.get_all(r['ip'])
                        if rec and rec.country_short and rec.country_short != '-':
                            db.execute(
                                'UPDATE proxylist SET country=? WHERE proxy=?',
                                (rec.country_short, proxy_key))
                    except Exception:
                        pass # Geolocation is best-effort
            else:
                # For failures, only update if exists (don't add non-working proxies)
                db.execute('''
                    UPDATE proxylist SET
                        failed = failed + 1,
                        tested = ?
                    WHERE ip = ? AND port = ?
                ''', (int(now), r['ip'], r['port']))
            # Record result for verification system
            insert_proxy_result(db, proxy_key, worker_id, working,
                                latency_ms=latency_ms, error_category=error_cat)
            # Check for disagreement with other workers
            disagreement, other_worker, other_result = check_for_disagreement(
                db, proxy_key, worker_id, working)
            if disagreement:
                # Queue for manager verification (priority 3 = high)
                queue_verification(db, proxy_key, 'disagreement', priority=3,
                                   worker_a=worker_id, worker_b=other_worker,
                                   result_a=working, result_b=other_result)
            elif working:
                # Check for resurrection: was dead (failed >= 3), now working
                # NOTE(review): the upsert above has already reset failed to 0
                # for this proxy, so this SELECT may never see failed >= 3 --
                # confirm whether the pre-update value should be read instead.
                row = db.execute(
                    'SELECT failed FROM proxylist WHERE proxy = ?', (proxy_key,)
                ).fetchone()
                if row and row[0] >= 3:
                    queue_verification(db, proxy_key, 'resurrection', priority=3,
                                       worker_a=worker_id, result_a=1)
            else:
                # Check for sudden death: was working (consecutive_success >= 3), now failed
                row = db.execute(
                    'SELECT consecutive_success FROM proxylist WHERE proxy = ?', (proxy_key,)
                ).fetchone()
                if row and row[0] and row[0] >= 3:
                    queue_verification(db, proxy_key, 'sudden_death', priority=2,
                                       worker_a=worker_id, result_a=0)
            processed += 1
        except Exception as e:
            # Per-result isolation: one bad row must not sink the batch.
            _log('submit_results db error for %s: %s' % (proxy_key, e), 'error')
    # Update worker stats
    with _workers_lock:
        if worker_id in _workers:
            w = _workers[worker_id]
            w['jobs_completed'] += 1
            w['proxies_tested'] += processed
            w['proxies_working'] = w.get('proxies_working', 0) + working_count
            w['proxies_failed'] = w.get('proxies_failed', 0) + (processed - working_count)
            w['total_latency'] = w.get('total_latency', 0) + total_latency
            w['last_batch_size'] = len(results)
            w['last_batch_working'] = working_count
    # Commit database changes
    db.commit()
    # Record for test rate calculation
    record_test_rate(worker_id, processed)
    # Save workers periodically (every 60s)
    if now - _last_workers_save > 60:
        save_workers()
        _last_workers_save = now
    return processed
def is_localhost(ip):
    """Return True for IPv4 loopback (127.0.0.0/8) or the IPv6 loopback ::1."""
    if not ip:
        return False
    return ip == '::1' or ip.startswith('127.')
def check_rate_limit(ip):
    """Sliding-window rate limiter; True when the request may proceed.

    Localhost clients are never throttled. Per-IP timestamps older than
    the window are discarded on every check.
    """
    if is_localhost(ip):
        return True
    now = time.time()
    window_start = now - _rate_limit_window
    with _rate_lock:
        recent = [t for t in _rate_limits[ip] if t > window_start]
        if len(recent) >= _rate_limit_requests:
            _rate_limits[ip] = recent
            return False
        recent.append(now)
        _rate_limits[ip] = recent
    return True
def get_security_headers(content_type):
    """Return the standard security headers; HTML responses also get a CSP."""
    headers = [
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'DENY'),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
    ]
    if 'text/html' not in content_type:
        return headers
    # Content-Security-Policy only makes sense for rendered pages.
    csp = ("default-src 'self'; "
           "script-src 'self' 'unsafe-inline'; "
           "style-src 'self' 'unsafe-inline'; "
           "img-src 'self' data: https:; "
           "connect-src 'self'")
    headers.append(('Content-Security-Policy', csp))
    return headers
def get_system_stats():
    """Collect system resource statistics (Linux-oriented, best-effort).

    Reads load average, CPU count, /proc/meminfo, statvfs of the data
    directory and /proc/self/status. Every source is wrapped so that an
    unavailable platform feature yields zeroed fields instead of raising.
    Also updates the module-level RSS start/peak trackers for the dashboard.
    """
    stats = {}
    # Load average (1, 5, 15 min) -- AttributeError on non-POSIX platforms
    try:
        load = os.getloadavg()
        stats['load_1m'] = round(load[0], 2)
        stats['load_5m'] = round(load[1], 2)
        stats['load_15m'] = round(load[2], 2)
    except (OSError, AttributeError):
        stats['load_1m'] = stats['load_5m'] = stats['load_15m'] = 0
    # CPU count
    try:
        stats['cpu_count'] = os.sysconf('SC_NPROCESSORS_ONLN')
    except (ValueError, OSError, AttributeError):
        stats['cpu_count'] = 1
    # Memory from /proc/meminfo (Linux)
    try:
        with open('/proc/meminfo', 'r') as f:
            meminfo = {}
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    meminfo[parts[0].rstrip(':')] = int(parts[1]) * 1024 # KB to bytes
        # MemAvailable is absent on old kernels; fall back to MemFree.
        total = meminfo.get('MemTotal', 0)
        available = meminfo.get('MemAvailable', meminfo.get('MemFree', 0))
        stats['mem_total'] = total
        stats['mem_available'] = available
        stats['mem_used'] = total - available
        stats['mem_pct'] = round((total - available) / total * 100, 1) if total > 0 else 0
    except (IOError, KeyError, ZeroDivisionError):
        stats['mem_total'] = stats['mem_available'] = stats['mem_used'] = 0
        stats['mem_pct'] = 0
    # Disk usage for data directory (falls back to cwd when data/ is absent)
    try:
        st = os.statvfs('data' if os.path.exists('data') else '.')
        total = st.f_blocks * st.f_frsize
        free = st.f_bavail * st.f_frsize
        used = total - free
        stats['disk_total'] = total
        stats['disk_free'] = free
        stats['disk_used'] = used
        stats['disk_pct'] = round(used / total * 100, 1) if total > 0 else 0
    except (OSError, ZeroDivisionError):
        stats['disk_total'] = stats['disk_free'] = stats['disk_used'] = 0
        stats['disk_pct'] = 0
    # Process stats from /proc/self/status
    try:
        with open('/proc/self/status', 'r') as f:
            for line in f:
                if line.startswith('VmRSS:'):
                    stats['proc_rss'] = int(line.split()[1]) * 1024 # KB to bytes
                elif line.startswith('Threads:'):
                    stats['proc_threads'] = int(line.split()[1])
    except (IOError, ValueError, IndexError):
        stats['proc_rss'] = 0
        stats['proc_threads'] = 0
    # Simple RSS tracking for dashboard: remember first and highest readings
    global _peak_rss, _start_rss
    rss = stats.get('proc_rss', 0)
    if rss > 0:
        if _start_rss == 0:
            _start_rss = rss
        if rss > _peak_rss:
            _peak_rss = rss
    stats['proc_rss_peak'] = _peak_rss
    stats['proc_rss_start'] = _start_rss
    stats['proc_rss_growth'] = rss - _start_rss if _start_rss > 0 else 0
    # GC generation counts (fast - no object iteration)
    try:
        gc_counts = gc.get_count()
        stats['gc_count_gen0'] = gc_counts[0]
        stats['gc_count_gen1'] = gc_counts[1]
        stats['gc_count_gen2'] = gc_counts[2]
    except Exception:
        stats['gc_count_gen0'] = stats['gc_count_gen1'] = stats['gc_count_gen2'] = 0
    return stats
def get_db_health(db):
    """Get database health and statistics (cached for _db_health_ttl seconds).

    Returns a dict with file size, SQLite page/freelist counts, anonymity
    and latency breakdowns, recent activity counters and dead/failing
    proxy counts. Query failures leave the affected fields absent; the
    partial dict is still cached.
    """
    global _db_health_cache
    now = time.time()
    # Serve the cached snapshot while it is fresh.
    if now - _db_health_cache['time'] < _db_health_ttl:
        return _db_health_cache['value']
    stats = {}
    try:
        # Database file size -- path attribute is optional on the wrapper
        db_path = db.path if hasattr(db, 'path') else 'data/proxies.sqlite'
        if os.path.exists(db_path):
            stats['db_size'] = os.path.getsize(db_path)
        else:
            stats['db_size'] = 0
        # Page stats from pragma
        row = db.execute('PRAGMA page_count').fetchone()
        stats['page_count'] = row[0] if row else 0
        row = db.execute('PRAGMA page_size').fetchone()
        stats['page_size'] = row[0] if row else 0
        row = db.execute('PRAGMA freelist_count').fetchone()
        stats['freelist_count'] = row[0] if row else 0
        # Anonymity breakdown
        rows = db.execute(
            'SELECT anonymity, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY anonymity'
        ).fetchall()
        stats['anonymity'] = {r[0] or 'unknown': r[1] for r in rows}
        # Latency stats
        row = db.execute(
            'SELECT AVG(avg_latency), MIN(avg_latency), MAX(avg_latency) '
            'FROM proxylist WHERE failed=0 AND avg_latency > 0'
        ).fetchone()
        if row and row[0]:
            stats['db_avg_latency'] = round(row[0], 1)
            stats['db_min_latency'] = round(row[1], 1)
            stats['db_max_latency'] = round(row[2], 1)
        else:
            stats['db_avg_latency'] = stats['db_min_latency'] = stats['db_max_latency'] = 0
        # Recent activity (note: rebinds `now` from float to int for the
        # epoch comparisons below)
        now = int(time.time())
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE tested > ?', (now - 3600,)
        ).fetchone()
        stats['tested_last_hour'] = row[0] if row else 0
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE added > ?', (now - 86400,)
        ).fetchone()
        stats['added_last_day'] = row[0] if row else 0
        # Dead proxies count (permanently dead = -1, failing = positive)
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE failed = -1'
        ).fetchone()
        stats['dead_count'] = row[0] if row else 0
        # Failing proxies count (positive fail count but not permanently dead)
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE failed > 0'
        ).fetchone()
        stats['failing_count'] = row[0] if row else 0
    except Exception as e:
        _log('get_db_health error: %s' % e, 'warn')
    # Update cache
    _db_health_cache['value'] = stats
    _db_health_cache['time'] = time.time()
    return stats
# Detect if gevent has monkey-patched the environment; when it has, the
# server must use gevent's WSGI server instead of blocking BaseHTTPServer.
try:
    from gevent import monkey
    GEVENT_PATCHED = monkey.is_module_patched('socket')
except ImportError:
    GEVENT_PATCHED = False
if GEVENT_PATCHED:
    # Imported lazily: only needed (and only importable) when gevent is present.
    from gevent.pywsgi import WSGIServer
def load_static_libs():
    """Read every file under static/lib into the in-memory cache at startup."""
    global _LIB_CACHE
    if not os.path.isdir(_STATIC_LIB_DIR):
        _log('static/lib directory not found: %s' % _STATIC_LIB_DIR, 'warn')
        return
    for fname in os.listdir(_STATIC_LIB_DIR):
        fpath = os.path.join(_STATIC_LIB_DIR, fname)
        if not os.path.isfile(fpath):
            continue
        try:
            with open(fpath, 'rb') as f:
                _LIB_CACHE[fname] = f.read()
        except IOError as e:
            _log('failed to load %s: %s' % (fname, e), 'warn')
            continue
        _log('loaded static lib: %s (%d bytes)' % (fname, len(_LIB_CACHE[fname])), 'debug')
    _log('loaded %d static library files' % len(_LIB_CACHE), 'info')
def get_static_lib(filename):
    """Return the cached bytes for a static/lib file, or None when absent."""
    return _LIB_CACHE.get(filename, None)
def load_static_files(theme):
    """Load dashboard assets into the cache, applying theme colors to CSS.

    Args:
        theme: mapping of color name -> value; each '{name}' placeholder in
            style.css is replaced with the corresponding value.
    """
    global _STATIC_CACHE
    files = {
        'dashboard.html': 'static/dashboard.html',
        'map.html': 'static/map.html',
        'mitm.html': 'static/mitm.html',
        'workers.html': 'static/workers.html',
        'style.css': 'static/style.css',
        'dashboard.js': 'static/dashboard.js',
        'map.js': 'static/map.js',
        'mitm.js': 'static/mitm.js',
    }
    base_dir = os.path.dirname(os.path.abspath(__file__))
    for key, relpath in files.items():
        fpath = os.path.join(base_dir, relpath)
        if not os.path.isfile(fpath):
            _log('static file not found: %s' % fpath, 'warn')
            continue
        try:
            with open(fpath, 'rb') as f:
                content = f.read()
        except IOError as e:
            _log('failed to load %s: %s' % (fpath, e), 'warn')
            continue
        # Theme substitution applies to the stylesheet only.
        if key == 'style.css' and theme:
            for name, val in theme.items():
                content = content.replace('{' + name + '}', val)
        _STATIC_CACHE[key] = content
        _log('loaded static file: %s (%d bytes)' % (key, len(content)), 'debug')
    _log('loaded %d dashboard static files' % len(_STATIC_CACHE), 'info')
def get_static_file(filename):
    """Return a cached dashboard asset, or None when it was never loaded."""
    return _STATIC_CACHE.get(filename, None)
# Theme colors - dark tiles on lighter background.
# Hex values are substituted into style.css placeholders by load_static_files().
THEME = {
    'bg': '#1e2738',
    'card': '#181f2a',
    'card_alt': '#212a36',
    'border': '#3a4858',
    'text': '#e8eef5',
    'dim': '#8b929b',
    'green': '#3fb950',
    'red': '#f85149',
    'yellow': '#d29922',
    'blue': '#58a6ff',
    'purple': '#a371f7',
    'cyan': '#39c5cf',
    'orange': '#db6d28',
    'pink': '#db61a2',
    'map_bg': '#1e2738', # Match dashboard background
}
class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""HTTP request handler for proxy API."""
database = None
stats_provider = None
def log_message(self, format, *args):
pass
def send_response_body(self, body, content_type, status=200):
self.send_response(status)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', len(body))
self.send_header('Cache-Control', 'no-cache')
# Add security headers
for header, value in get_security_headers(content_type):
self.send_header(header, value)
self.end_headers()
self.wfile.write(body)
def send_json(self, data, status=200):
self.send_response_body(json.dumps(data, indent=2), 'application/json', status)
def send_text(self, text, status=200):
self.send_response_body(text, 'text/plain', status)
def send_html(self, html, status=200):
self.send_response_body(html, 'text/html; charset=utf-8', status)
def send_css(self, css, status=200):
self.send_response_body(css, 'text/css; charset=utf-8', status)
def send_js(self, js, status=200):
self.send_response_body(js, 'application/javascript; charset=utf-8', status)
def send_download(self, body, content_type, filename):
"""Send response with Content-Disposition for download."""
self.send_response(200)
self.send_header('Content-Type', content_type)
self.send_header('Content-Length', len(body))
self.send_header('Content-Disposition', 'attachment; filename="%s"' % filename)
for header, value in get_security_headers(content_type):
self.send_header(header, value)
self.end_headers()
self.wfile.write(body)
def do_GET(self):
# Rate limiting check
client_ip = self.client_address[0] if self.client_address else ''
if not check_rate_limit(client_ip):
self.send_response(429)
self.send_header('Content-Type', 'application/json')
self.send_header('Retry-After', str(_rate_limit_window))
for header, value in get_security_headers('application/json'):
self.send_header(header, value)
self.end_headers()
self.wfile.write(json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window}))
return
path = self.path.split('?')[0]
routes = {
'/': self.handle_index,
'/dashboard': self.handle_dashboard,
'/map': self.handle_map,
'/static/style.css': self.handle_css,
'/static/dashboard.js': self.handle_js,
'/api/stats': self.handle_stats,
'/api/stats/export': self.handle_stats_export,
'/api/countries': self.handle_countries,
'/proxies': self.handle_proxies,
'/proxies/count': self.handle_count,
'/health': self.handle_health,
}
handler = routes.get(path)
if handler:
handler()
else:
self.send_json({'error': 'not found'}, 404)
def handle_index(self):
self.send_json({
'endpoints': {
'/dashboard': 'web dashboard (HTML)',
'/api/stats': 'runtime statistics (JSON)',
'/api/stats/export': 'export stats (params: format=json|csv)',
'/proxies': 'list working proxies (params: limit, proto, country, asn)',
'/proxies/count': 'count working proxies',
'/health': 'health check',
}
})
def handle_dashboard(self):
self.send_html(DASHBOARD_HTML)
def handle_map(self):
self.send_html(MAP_HTML)
def handle_countries(self):
"""Return all countries with proxy counts."""
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC'
).fetchall()
countries = {r[0]: r[1] for r in rows}
self.send_json({'countries': countries})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_css(self):
self.send_css(DASHBOARD_CSS)
def handle_js(self):
self.send_js(DASHBOARD_JS)
def get_db_stats(self):
"""Get statistics from database."""
try:
db = mysqlite.mysqlite(self.database, str)
stats = {}
# Total counts
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
stats['working'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
stats['total'] = row[0] if row else 0
# By protocol
rows = db.execute(
'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto'
).fetchall()
stats['by_proto'] = {r[0] or 'unknown': r[1] for r in rows}
# Top countries
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC LIMIT 10'
).fetchall()
stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows]
# Top ASNs
rows = db.execute(
'SELECT asn, COUNT(*) as c FROM proxylist WHERE failed=0 AND asn IS NOT NULL '
'GROUP BY asn ORDER BY c DESC LIMIT 10'
).fetchall()
stats['top_asns'] = [(r[0], r[1]) for r in rows]
return stats
except Exception as e:
return {'error': str(e)}
def handle_stats(self):
stats = {}
# Runtime stats from provider
if self.stats_provider:
try:
stats.update(self.stats_provider())
except Exception as e:
_log('stats_provider error: %s' % str(e), 'error')
# Add system stats
stats['system'] = get_system_stats()
# Add database stats
try:
db = mysqlite.mysqlite(self.database, str)
stats['db'] = self.get_db_stats()
stats['db_health'] = get_db_health(db)
except Exception as e:
_log('handle_stats db error: %s' % e, 'warn')
self.send_json(stats)
def handle_stats_export(self):
"""Export stats as JSON or CSV with download header."""
params = {}
if '?' in self.path:
for pair in self.path.split('?')[1].split('&'):
if '=' in pair:
k, v = pair.split('=', 1)
params[k] = v
fmt = params.get('format', 'json').lower()
timestamp = time.strftime('%Y%m%d_%H%M%S')
# Gather stats
stats = {}
if self.stats_provider:
try:
stats.update(self.stats_provider())
except Exception as e:
_log('stats_provider error in export: %s' % e, 'warn')
stats['system'] = get_system_stats()
stats['exported_at'] = time.strftime('%Y-%m-%d %H:%M:%S')
if fmt == 'csv':
# Flatten stats to CSV rows
lines = ['key,value']
def flatten(obj, prefix=''):
if isinstance(obj, dict):
for k, v in sorted(obj.items()):
flatten(v, '%s%s.' % (prefix, k) if prefix else '%s.' % k)
elif isinstance(obj, (list, tuple)):
for i, v in enumerate(obj):
flatten(v, '%s%d.' % (prefix, i))
else:
key = prefix.rstrip('.')
val = str(obj).replace('"', '""')
lines.append('"%s","%s"' % (key, val))
flatten(stats)
body = '\n'.join(lines)
self.send_download(body, 'text/csv', 'ppf_stats_%s.csv' % timestamp)
else:
body = json.dumps(stats, indent=2)
self.send_download(body, 'application/json', 'ppf_stats_%s.json' % timestamp)
def handle_proxies(self):
params = {}
if '?' in self.path:
for pair in self.path.split('?')[1].split('&'):
if '=' in pair:
k, v = pair.split('=', 1)
params[k] = v
limit = min(int(params.get('limit', 100)), 1000)
proto = params.get('proto', '')
country = params.get('country', '')
asn = params.get('asn', '')
fmt = params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
args.append(limit)
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
self.send_text('\n'.join('%s:%s' % (r[0], r[1]) for r in rows))
else:
proxies = [{
'ip': r[0], 'port': r[1], 'proto': r[2],
'country': r[3], 'asn': r[4], 'latency': r[5]
} for r in rows]
self.send_json({'count': len(proxies), 'proxies': proxies})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_count(self):
try:
db = mysqlite.mysqlite(self.database, str)
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
self.send_json({'count': row[0] if row else 0})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_health(self):
    """Serve /health: liveness probe reporting version and current time."""
    payload = {
        'status': 'ok',
        'version': __version__,
        'timestamp': int(time.time()),
    }
    self.send_json(payload)
class ProxyAPIServer(threading.Thread):
    """Threaded HTTP API server.

    Uses gevent's WSGIServer when running in a gevent-patched environment,
    otherwise falls back to standard BaseHTTPServer.
    """
    def __init__(self, host, port, database, stats_provider=None, profiling=False):
        """Set up the server thread and warm module-level caches.

        host/port       -- bind address for the HTTP listener.
        database        -- path to the sqlite proxy database.
        stats_provider  -- optional callable returning a runtime-stats dict,
                           queried by /api/stats and /api/dashboard.
        profiling       -- flag echoed in stats when the provider omits it.
        """
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.database = database
        self.stats_provider = stats_provider
        self.profiling = profiling
        self.daemon = True          # die with the main process
        self.server = None          # created lazily in run()
        # gevent's WSGIServer has its own stop(); the Event is only
        # needed for the BaseHTTPServer fallback.
        self._stop_event = threading.Event() if not GEVENT_PATCHED else None
        # Load static library files into cache
        load_static_libs()
        # Load dashboard static files (HTML, CSS, JS) with theme substitution
        load_static_files(THEME)
        # Load worker registry from disk
        load_workers()
        # Create verification tables if they don't exist.
        # Best-effort: a failure here is logged but does not prevent the
        # server from starting (verification endpoints will then error).
        try:
            db = mysqlite.mysqlite(self.database, str)
            create_verification_tables(db)
            _log('verification tables initialized', 'debug')
        except Exception as e:
            _log('failed to create verification tables: %s' % e, 'warn')
def _wsgi_app(self, environ, start_response):
    """WSGI application wrapper for gevent.

    Validates the method, enforces rate limiting *before* reading the
    request body (so throttled clients cannot make us do JSON parsing
    work), parses query string and POST body, then dispatches to
    _handle_route().  Returns the response body as a single-item list
    per the WSGI protocol.
    """
    path = environ.get('PATH_INFO', '/').split('?')[0]
    query_string = environ.get('QUERY_STRING', '')
    method = environ.get('REQUEST_METHOD', 'GET')
    remote_addr = environ.get('REMOTE_ADDR', '')
    # Parse query parameters
    query_params = {}
    if query_string:
        for param in query_string.split('&'):
            if '=' in param:
                k, v = param.split('=', 1)
                query_params[k] = v
    # Only allow GET and POST
    if method not in ('GET', 'POST'):
        start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
        return [b'Method not allowed']
    # POST only allowed for worker API endpoints
    post_endpoints = ('/api/register', '/api/results', '/api/heartbeat')
    if method == 'POST' and path not in post_endpoints:
        start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
        return [b'POST not allowed for this endpoint']
    # Rate limiting check -- moved ahead of body parsing so a throttled
    # client gets its 429 before we read or decode any payload.
    if not check_rate_limit(remote_addr):
        error_body = json.dumps({'error': 'rate limited', 'retry_after': _rate_limit_window})
        headers = [
            ('Content-Type', 'application/json'),
            ('Content-Length', str(len(error_body))),
            ('Retry-After', str(_rate_limit_window)),
        ]
        headers.extend(get_security_headers('application/json'))
        start_response('429 Too Many Requests', headers)
        return [error_body]
    # Parse POST body
    post_data = None
    if method == 'POST':
        try:
            content_length = int(environ.get('CONTENT_LENGTH', 0))
            if content_length > 0:
                body = environ['wsgi.input'].read(content_length)
                post_data = json.loads(body)
        except Exception:
            error_body = json.dumps({'error': 'invalid JSON body'})
            headers = [('Content-Type', 'application/json')]
            start_response('400 Bad Request', headers)
            return [error_body.encode('utf-8')]
    # Route handling
    try:
        response_body, content_type, status = self._handle_route(path, remote_addr, query_params, post_data)
        # Use the standard reason phrase (e.g. '404 Not Found') instead of
        # the previous generic '<code> Error'.
        reason = BaseHTTPServer.BaseHTTPRequestHandler.responses.get(status, ('Error', ''))[0]
        status_line = '%d %s' % (status, reason)
        headers = [
            ('Content-Type', content_type),
            ('Content-Length', str(len(response_body))),
            ('Cache-Control', 'no-cache'),
        ]
        headers.extend(get_security_headers(content_type))
        start_response(status_line, headers)
        return [response_body.encode('utf-8') if isinstance(response_body, unicode) else response_body]
    except Exception as e:
        error_body = json.dumps({'error': str(e)})
        headers = [
            ('Content-Type', 'application/json'),
            ('Content-Length', str(len(error_body))),
        ]
        headers.extend(get_security_headers('application/json'))
        start_response('500 Internal Server Error', headers)
        return [error_body]
def _handle_route(self, path, remote_addr='', query_params=None, post_data=None):
    """Handle route and return (body, content_type, status).

    Dispatches on the request path: static dashboard assets, JSON
    stats/countries/locations endpoints, public proxy listings, and the
    key-authenticated worker API (register/work/results/heartbeat).
    The caller is responsible for encoding the body and emitting headers.
    """
    if query_params is None:
        query_params = {}
    if path == '/':
        # API index: human-readable map of available endpoints.
        body = json.dumps({
            'endpoints': {
                '/dashboard': 'web dashboard (HTML)',
                '/map': 'proxy distribution by country (HTML)',
                '/mitm': 'MITM certificate search (HTML)',
                '/api/dashboard': 'batch endpoint: stats + workers + countries (JSON)',
                '/api/stats': 'runtime statistics (JSON)',
                '/api/mitm': 'MITM certificate statistics (JSON)',
                '/api/countries': 'proxy counts by country (JSON)',
                '/api/work': 'get work batch for worker (params: key, count)',
                '/api/results': 'submit test results (POST, params: key)',
                '/api/register': 'register as worker (POST)',
                '/api/workers': 'list connected workers',
                '/proxies': 'list working proxies (params: limit, proto, country, asn)',
                '/proxies/count': 'count working proxies',
                '/health': 'health check',
            }
        }, indent=2)
        return body, 'application/json', 200
    elif path == '/dashboard':
        content = get_static_file('dashboard.html')
        if content:
            return content, 'text/html; charset=utf-8', 200
        return '{"error": "dashboard.html not loaded"}', 'application/json', 500
    elif path == '/map':
        content = get_static_file('map.html')
        if content:
            return content, 'text/html; charset=utf-8', 200
        return '{"error": "map.html not loaded"}', 'application/json', 500
    elif path == '/mitm':
        content = get_static_file('mitm.html')
        if content:
            return content, 'text/html; charset=utf-8', 200
        return '{"error": "mitm.html not loaded"}', 'application/json', 500
    elif path == '/workers':
        content = get_static_file('workers.html')
        if content:
            return content, 'text/html; charset=utf-8', 200
        return '{"error": "workers.html not loaded"}', 'application/json', 500
    elif path == '/static/style.css':
        content = get_static_file('style.css')
        if content:
            return content, 'text/css; charset=utf-8', 200
        return '{"error": "style.css not loaded"}', 'application/json', 500
    elif path == '/static/dashboard.js':
        content = get_static_file('dashboard.js')
        if content:
            return content, 'application/javascript; charset=utf-8', 200
        return '{"error": "dashboard.js not loaded"}', 'application/json', 500
    elif path == '/static/map.js':
        content = get_static_file('map.js')
        if content:
            return content, 'application/javascript; charset=utf-8', 200
        return '{"error": "map.js not loaded"}', 'application/json', 500
    elif path == '/static/mitm.js':
        content = get_static_file('mitm.js')
        if content:
            return content, 'application/javascript; charset=utf-8', 200
        return '{"error": "mitm.js not loaded"}', 'application/json', 500
    elif path.startswith('/static/lib/'):
        # Serve static library files from cache
        filename = path.split('/')[-1]
        content = get_static_lib(filename)
        if content:
            ext = os.path.splitext(filename)[1]
            content_type = _CONTENT_TYPES.get(ext, 'application/octet-stream')
            return content, content_type, 200
        return '{"error": "not found"}', 'application/json', 404
    elif path == '/api/stats':
        stats = {}
        if self.stats_provider:
            stats = self.stats_provider()
        # Add system stats
        stats['system'] = get_system_stats()
        # Add database stats
        # NOTE(review): this db handle is not closed explicitly (unlike
        # /api/dashboard below); presumably reclaimed by GC -- confirm
        # mysqlite connection semantics.
        try:
            db = mysqlite.mysqlite(self.database, str)
            stats['db'] = self._get_db_stats(db)
            stats['db_health'] = get_db_health(db)
        except Exception as e:
            _log('api/stats db error: %s' % e, 'warn')
        # Add profiling flag (from constructor or stats_provider)
        if 'profiling' not in stats:
            stats['profiling'] = self.profiling
        return json.dumps(stats, indent=2), 'application/json', 200
    elif path == '/api/dashboard':
        # Batch endpoint combining stats + workers + countries
        # Reduces RTT for dashboard - single request instead of multiple
        result = {}
        # 1. Runtime stats (same as /api/stats)
        stats = {}
        if self.stats_provider:
            try:
                stats = self.stats_provider()
            except Exception as e:
                _log('api/dashboard stats_provider error: %s' % e, 'warn')
        stats['system'] = get_system_stats()
        if 'profiling' not in stats:
            stats['profiling'] = self.profiling
        result['stats'] = stats
        # 2. Database stats and health
        try:
            db = mysqlite.mysqlite(self.database, str)
            result['stats']['db'] = self._get_db_stats(db)
            result['stats']['db_health'] = get_db_health(db)
            # 3. Countries (same as /api/countries)
            rows = db.execute(
                'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                'GROUP BY country ORDER BY c DESC'
            ).fetchall()
            result['countries'] = {r[0]: r[1] for r in rows}
            # 4. Workers (same as /api/workers)
            result['workers'] = self._get_workers_data(db)
            db.close()
        except Exception as e:
            _log('api/dashboard db error: %s' % e, 'warn')
            result['countries'] = {}
            result['workers'] = {'workers': [], 'total': 0, 'active': 0}
        return json.dumps(result, indent=2), 'application/json', 200
    elif path == '/api/mitm':
        # MITM certificate statistics
        if self.stats_provider:
            try:
                stats = self.stats_provider()
                mitm = stats.get('mitm', {})
                return json.dumps(mitm, indent=2), 'application/json', 200
            except Exception as e:
                return json.dumps({'error': str(e)}), 'application/json', 500
        return json.dumps({'error': 'stats not available'}), 'application/json', 500
    elif path == '/api/countries':
        try:
            db = mysqlite.mysqlite(self.database, str)
            rows = db.execute(
                'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
                'GROUP BY country ORDER BY c DESC'
            ).fetchall()
            countries = {r[0]: r[1] for r in rows}
            return json.dumps({'countries': countries}, indent=2), 'application/json', 200
        except Exception as e:
            return json.dumps({'error': str(e)}), 'application/json', 500
    elif path == '/api/locations':
        # Return proxy locations aggregated by lat/lon grid (0.5 degree cells)
        try:
            db = mysqlite.mysqlite(self.database, str)
            rows = db.execute(
                'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, '
                'country, anonymity, COUNT(*) as c FROM proxylist '
                'WHERE failed=0 AND latitude IS NOT NULL AND longitude IS NOT NULL '
                'GROUP BY lat, lon, country, anonymity ORDER BY c DESC'
            ).fetchall()
            locations = [{'lat': r[0], 'lon': r[1], 'country': r[2], 'anon': r[3] or 'unknown', 'count': r[4]} for r in rows]
            return json.dumps({'locations': locations}, indent=2), 'application/json', 200
        except Exception as e:
            return json.dumps({'error': str(e)}), 'application/json', 500
    elif path == '/proxies':
        # proto IS NOT NULL keeps never-validated entries out of results.
        try:
            limit = min(int(query_params.get('limit', 100)), 1000)
            proto = query_params.get('proto', '')
            country = query_params.get('country', '')
            asn = query_params.get('asn', '')
            fmt = query_params.get('format', 'json')
            sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
            args = []
            if proto:
                sql += ' AND proto=?'
                args.append(proto)
            if country:
                sql += ' AND country=?'
                args.append(country.upper())
            if asn:
                sql += ' AND asn=?'
                args.append(int(asn))
            sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
            args.append(limit)
            db = mysqlite.mysqlite(self.database, str)
            rows = db.execute(sql, args).fetchall()
            if fmt == 'plain':
                return '\n'.join('%s:%s' % (r[0], r[1]) for r in rows), 'text/plain', 200
            proxies = [{'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3], 'asn': r[4], 'latency': r[5]} for r in rows]
            return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
        except Exception as e:
            return json.dumps({'error': str(e)}), 'application/json', 500
    elif path == '/proxies/count':
        try:
            db = mysqlite.mysqlite(self.database, str)
            row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
            return json.dumps({'count': row[0] if row else 0}), 'application/json', 200
        except Exception as e:
            return json.dumps({'error': str(e)}), 'application/json', 500
    elif path == '/health':
        return json.dumps({'status': 'ok', 'version': __version__, 'timestamp': int(time.time())}), 'application/json', 200
    # Worker API endpoints
    elif path == '/api/register':
        # Register a new worker (POST)
        if not post_data:
            return json.dumps({'error': 'POST body required'}), 'application/json', 400
        name = post_data.get('name', 'worker-%s' % remote_addr)
        master_key = post_data.get('master_key', '')
        # Require master key for registration (if set)
        if _master_key and master_key != _master_key:
            return json.dumps({'error': 'invalid master key'}), 'application/json', 403
        worker_id, worker_key = register_worker(name, remote_addr)
        _log('worker registered: %s (%s) from %s' % (name, worker_id, remote_addr), 'info')
        return json.dumps({
            'worker_id': worker_id,
            'worker_key': worker_key,
            'message': 'registered successfully',
        }), 'application/json', 200
    elif path == '/api/work':
        # Get batch of proxies to test (GET)
        key = query_params.get('key', '')
        if not validate_worker_key(key):
            return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
        worker_id, _ = get_worker_by_key(key)
        if not worker_id:
            return json.dumps({'error': 'worker not found'}), 'application/json', 404
        count = int(query_params.get('count', 100))
        count = min(count, 500)  # Cap at 500
        try:
            db = mysqlite.mysqlite(self.database, str)
            proxies = claim_work(db, worker_id, count)
            update_worker_heartbeat(worker_id)
            return json.dumps({
                'worker_id': worker_id,
                'count': len(proxies),
                'proxies': proxies,
            }), 'application/json', 200
        except Exception as e:
            return json.dumps({'error': str(e)}), 'application/json', 500
    elif path == '/api/results':
        # Submit test results (POST)
        key = query_params.get('key', '')
        if not validate_worker_key(key):
            return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
        worker_id, _ = get_worker_by_key(key)
        if not worker_id:
            return json.dumps({'error': 'worker not found'}), 'application/json', 404
        if not post_data:
            return json.dumps({'error': 'POST body required'}), 'application/json', 400
        results = post_data.get('results', [])
        if not results:
            return json.dumps({'error': 'no results provided'}), 'application/json', 400
        working = sum(1 for r in results if r.get('working'))
        _log('results: %d from %s (%d working)' % (len(results), worker_id[:8], working), 'info')
        try:
            db = mysqlite.mysqlite(self.database, str)
            processed = submit_results(db, worker_id, results)
            return json.dumps({
                'worker_id': worker_id,
                'processed': processed,
                'message': 'results submitted',
            }), 'application/json', 200
        except Exception as e:
            return json.dumps({'error': str(e)}), 'application/json', 500
    elif path == '/api/heartbeat':
        # Worker heartbeat with Tor status (POST)
        key = query_params.get('key', '')
        if not validate_worker_key(key):
            return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
        worker_id, _ = get_worker_by_key(key)
        if not worker_id:
            return json.dumps({'error': 'worker not found'}), 'application/json', 404
        # Update worker Tor, profiling, and thread status
        tor_ok = post_data.get('tor_ok', True) if post_data else True
        tor_ip = post_data.get('tor_ip') if post_data else None
        profiling = post_data.get('profiling', False) if post_data else False
        threads = post_data.get('threads', 0) if post_data else 0
        now = time.time()
        with _workers_lock:
            if worker_id in _workers:
                _workers[worker_id]['last_seen'] = now
                _workers[worker_id]['tor_ok'] = tor_ok
                _workers[worker_id]['tor_ip'] = tor_ip
                _workers[worker_id]['tor_last_check'] = now
                _workers[worker_id]['profiling'] = profiling
                _workers[worker_id]['threads'] = threads
        return json.dumps({
            'worker_id': worker_id,
            'message': 'heartbeat received',
        }), 'application/json', 200
    elif path == '/api/workers':
        # List connected workers
        try:
            db = mysqlite.mysqlite(self.database, str)
            workers_data = self._get_workers_data(db)
            db.close()
            return json.dumps(workers_data, indent=2), 'application/json', 200
        except Exception as e:
            _log('api/workers error: %s' % e, 'warn')
            return json.dumps({'error': str(e)}), 'application/json', 500
    else:
        return json.dumps({'error': 'not found'}), 'application/json', 404
def _get_db_stats(self, db):
    """Collect proxylist statistics from the given database handle.

    Returns a dict with per-protocol counts, top-10 countries,
    working/total totals, and the number of proxies currently due
    for testing (minus those claimed by workers).  Best-effort:
    on error, whatever was gathered so far is returned.
    """
    out = {}
    try:
        # Working proxies broken down by protocol (NULL proto skipped).
        proto_rows = db.execute(
            'SELECT proto, COUNT(*) FROM proxylist WHERE failed=0 GROUP BY proto'
        ).fetchall()
        out['by_proto'] = dict((r[0], r[1]) for r in proto_rows if r[0])
        # Ten most common countries among working proxies.
        country_rows = db.execute(
            'SELECT country, COUNT(*) as cnt FROM proxylist WHERE failed=0 AND country IS NOT NULL '
            'GROUP BY country ORDER BY cnt DESC LIMIT 10'
        ).fetchall()
        out['top_countries'] = [{'code': code, 'count': cnt} for code, cnt in country_rows]
        # Overall totals: validated-and-working vs. every known proxy.
        working_row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
        out['working'] = working_row[0] if working_row else 0
        total_row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
        out['total'] = total_row[0] if total_row else 0
        # Proxies due for testing (eligible for worker claims).
        condition, cond_params = _build_due_condition()
        due_row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE ' + condition,
            cond_params).fetchone()
        due_total = due_row[0] if due_row else 0
        # Exclude proxies currently claimed by a worker.
        with _work_claims_lock:
            in_flight = len(_work_claims)
        out['due'] = max(0, due_total - in_flight)
        out['claimed'] = in_flight
    except Exception as e:
        _log('_get_db_stats error: %s' % e, 'warn')
    return out
def _get_workers_data(self, db):
    """Get worker status data. Used by /api/workers and /api/dashboard.

    Aggregates the in-memory worker registry (_workers) into per-worker
    and summary stats, adds queue/session progress counts from the
    database, the manager's own local-testing stats (if any), and
    per-worker trust scores from the verification tables.
    """
    now = time.time()
    # Snapshot worker info under the registry lock.
    with _workers_lock:
        workers = []
        total_tested = 0
        total_working = 0
        total_failed = 0
        for wid, info in _workers.items():
            tested = info.get('proxies_tested', 0)
            working = info.get('proxies_working', 0)
            failed = info.get('proxies_failed', 0)
            total_latency = info.get('total_latency', 0)
            # true division is in effect (__future__ import at file top)
            avg_latency = round(total_latency / working, 2) if working > 0 else 0
            success_rate = round(100 * working / tested, 1) if tested > 0 else 0
            total_tested += tested
            total_working += working
            total_failed += failed
            # Tor status with age check
            tor_ok = info.get('tor_ok', True)
            tor_ip = info.get('tor_ip')
            tor_last_check = info.get('tor_last_check', 0)
            tor_age = int(now - tor_last_check) if tor_last_check else None
            worker_profiling = info.get('profiling', False)
            worker_threads = info.get('threads', 0)
            # Calculate test rate for this worker
            test_rate = get_worker_test_rate(wid)
            workers.append({
                'id': wid,
                'name': info['name'],
                'ip': info['ip'],
                'registered': int(info['registered']),
                'last_seen': int(info['last_seen']),
                'age': int(now - info['last_seen']),
                'jobs_completed': info.get('jobs_completed', 0),
                'proxies_tested': tested,
                'proxies_working': working,
                'proxies_failed': failed,
                'success_rate': success_rate,
                'avg_latency': avg_latency,
                'last_batch_size': info.get('last_batch_size', 0),
                'last_batch_working': info.get('last_batch_working', 0),
                # a worker is "active" if seen within the last 2 minutes
                'active': (now - info['last_seen']) < 120,
                'test_rate': round(test_rate, 2),
                'tor_ok': tor_ok,
                'tor_ip': tor_ip,
                'tor_age': tor_age,
                'profiling': worker_profiling,
                'threads': worker_threads,
            })
    # Sort by name for consistent display
    workers.sort(key=lambda w: w['name'])
    # Get queue status from database
    queue_stats = {
        'pending': 0, 'claimed': 0, 'due': 0, 'total': 0,
        'untested': 0, 'session_tested': 0, 'session_pct': 0,
        'session_start': _session_start_time
    }
    try:
        # Total proxies in database
        row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
        queue_stats['total'] = row[0] if row else 0
        # Never tested (tested IS NULL)
        row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested IS NULL').fetchone()
        queue_stats['untested'] = row[0] if row else 0
        # Tested this session (since httpd started)
        row = db.execute('SELECT COUNT(*) FROM proxylist WHERE tested >= ?', (_session_start_time,)).fetchone()
        queue_stats['session_tested'] = row[0] if row else 0
        # Pending = total eligible (not dead)
        # NOTE(review): "< 5" looks like the dead-proxy failure threshold;
        # confirm it matches the tester's retirement cutoff.
        row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed >= 0 AND failed < 5').fetchone()
        queue_stats['pending'] = row[0] if row else 0
        # Session progress percentage (tested / total, capped at 100%)
        if queue_stats['total'] > 0:
            pct = 100.0 * queue_stats['session_tested'] / queue_stats['total']
            queue_stats['session_pct'] = round(min(pct, 100.0), 1)
        # Claimed = currently being tested by workers
        with _work_claims_lock:
            queue_stats['claimed'] = len(_work_claims)
        # Due = ready for testing (respecting cooldown)
        due_condition, due_params = _build_due_condition()
        row = db.execute(
            'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
            due_params).fetchone()
        due_total = row[0] if row else 0
        queue_stats['due'] = max(0, due_total - queue_stats['claimed'])
    except Exception as e:
        _log('_get_workers_data queue stats error: %s' % e, 'warn')
    # Calculate combined test rate from active workers
    combined_rate = sum(w['test_rate'] for w in workers if w['active'])
    # Get manager's own stats (local proxy testing)
    manager_stats = None
    if self.stats_provider:
        try:
            stats = self.stats_provider()
            threads = stats.get('threads', 0)
            if threads > 0:  # Manager has local testing enabled
                manager_stats = {
                    'threads': threads,
                    'tested': stats.get('tested', 0),
                    'passed': stats.get('passed', 0),
                    'rate': round(stats.get('recent_rate', 0), 2),
                    'success_rate': round(stats.get('success_rate', 0), 1),
                    'uptime': stats.get('uptime_seconds', 0),
                    'queue_size': stats.get('queue_size', 0),
                }
        except Exception as e:
            _log('_get_workers_data manager stats error: %s' % e, 'warn')
    # Get verification stats and worker trust
    verification_stats = {'queue_size': 0, 'queue_by_trigger': {}}
    worker_trust_data = {}
    try:
        verification_stats = get_verification_stats(db)
        worker_trust_data = get_all_worker_trust(db)
    except Exception as e:
        _log('_get_workers_data verification stats error: %s' % e, 'warn')
    # Add trust scores to workers
    for w in workers:
        wid = w['id']
        if wid in worker_trust_data:
            trust = worker_trust_data[wid]
            w['trust_score'] = round(trust['trust_score'], 2)
            w['verifications'] = trust['verifications']
            w['trust_correct'] = trust['correct']
            w['trust_incorrect'] = trust['incorrect']
        else:
            w['trust_score'] = 1.0  # Default for new workers
            w['verifications'] = 0
            w['trust_correct'] = 0
            w['trust_incorrect'] = 0
    return {
        'workers': workers,
        'total': len(workers),
        'active': sum(1 for w in workers if w['active']),
        'manager': manager_stats,
        'summary': {
            'total_tested': total_tested,
            'total_working': total_working,
            'total_failed': total_failed,
            'overall_success_rate': round(100 * total_working / total_tested, 1) if total_tested > 0 else 0,
            'combined_rate': round(combined_rate, 2),
        },
        'queue': queue_stats,
        'verification': verification_stats,
    }
def run(self):
    """Thread body: bind and serve until stopped.

    Publishes database/stats_provider on the handler class (used by
    the BaseHTTPServer fallback), then serves forever with either
    gevent's WSGIServer or the plain HTTPServer.
    """
    ProxyAPIHandler.database = self.database
    ProxyAPIHandler.stats_provider = self.stats_provider
    if not GEVENT_PATCHED:
        # Standard BaseHTTPServer for non-gevent environments
        self.server = BaseHTTPServer.HTTPServer((self.host, self.port), ProxyAPIHandler)
        _log('httpd listening on %s:%d' % (self.host, self.port), 'info')
        self.server.serve_forever()
        return
    # Use gevent's WSGIServer for proper async handling
    self.server = WSGIServer((self.host, self.port), self._wsgi_app, log=None)
    _log('httpd listening on %s:%d (gevent)' % (self.host, self.port), 'info')
    self.server.serve_forever()
def stop(self):
if self.server:
if GEVENT_PATCHED:
self.server.stop()
else:
self.server.shutdown()
if __name__ == '__main__':
    # Standalone test mode: serve the API against an existing database.
    # (redundant local "import sys" removed -- sys is imported at the
    # top of the file)
    host = '127.0.0.1'
    port = 8081
    database = 'data/proxies.sqlite'
    # Optional first argument overrides the database path.
    if len(sys.argv) > 1:
        database = sys.argv[1]
    _log('starting test server on %s:%d (db: %s)' % (host, port, database), 'info')
    server = ProxyAPIServer(host, port, database)
    server.start()
    try:
        # Keep the main thread alive; the server runs as a daemon thread.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        server.stop()
        _log('server stopped', 'info')