Files
ppf/dbs.py
Username 53f37510f3 dashboard: add system monitoring and enhanced stats
- prominent check type badge in header (SSL/judges/http/irc)
- system monitor bar: load, memory, disk, process RSS
- anonymity breakdown: elite/anonymous/transparent counts
- database health: size, recent activity, dead proxy count
- enhanced Tor pool stats: requests, success rate, latency
- SQLite ANALYZE/VACUUM functions for query optimization
- database statistics API functions
2025-12-23 17:47:12 +01:00

556 lines
19 KiB
Python

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Database table creation and insertion utilities."""
import time
from misc import _log
def _migrate_latency_columns(sqlite):
"""Add latency columns to existing databases."""
try:
sqlite.execute('SELECT avg_latency FROM proxylist LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE proxylist ADD COLUMN avg_latency REAL DEFAULT 0')
sqlite.execute('ALTER TABLE proxylist ADD COLUMN latency_samples INT DEFAULT 0')
sqlite.commit()
def _migrate_anonymity_columns(sqlite):
"""Add anonymity detection columns to existing databases."""
try:
sqlite.execute('SELECT anonymity FROM proxylist LIMIT 1')
except Exception:
# anonymity: transparent, anonymous, elite, or NULL (unknown)
sqlite.execute('ALTER TABLE proxylist ADD COLUMN anonymity TEXT')
# exit_ip: the IP seen by the target server
sqlite.execute('ALTER TABLE proxylist ADD COLUMN exit_ip TEXT')
sqlite.commit()
def _migrate_asn_column(sqlite):
"""Add ASN column to existing databases."""
try:
sqlite.execute('SELECT asn FROM proxylist LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE proxylist ADD COLUMN asn INT')
sqlite.commit()
def _migrate_content_hash_column(sqlite):
"""Add content_hash column to uris table for duplicate detection."""
try:
sqlite.execute('SELECT content_hash FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN content_hash TEXT')
sqlite.commit()
def compute_proxy_list_hash(proxies):
    """Compute MD5 hash of sorted proxy list for change detection.

    Args:
        proxies: List of proxy strings (ip:port) or tuples (address, proto)
    Returns:
        Hexadecimal MD5 hash string, or None if list is empty
    """
    if not proxies:
        return None
    import hashlib
    # Accept both (address, proto) tuples and bare address strings
    addresses = []
    for entry in proxies:
        addresses.append(entry[0] if isinstance(entry, tuple) else entry)
    addresses.sort()
    joined = '\n'.join(addresses)
    # py2/py3 compat: hash bytes when the string can be encoded
    if hasattr(joined, 'encode'):
        joined = joined.encode('utf-8')
    return hashlib.md5(joined).hexdigest()
def update_proxy_latency(sqlite, proxy, latency_ms):
    """Fold one latency sample into a proxy's rolling average.

    Uses an exponential moving average whose sample counter is capped
    at 100 so recent measurements keep a meaningful weight. Unknown
    proxies are ignored; the caller commits the transaction.

    Args:
        sqlite: Database connection
        proxy: Proxy address (ip:port)
        latency_ms: Response latency in milliseconds
    """
    found = sqlite.execute(
        'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy=?',
        (proxy,)
    ).fetchone()
    if not found:
        return
    prev_avg = found[0] or 0
    prev_samples = found[1] or 0
    capped = min(prev_samples + 1, 100)
    if prev_samples == 0:
        # first observation seeds the average directly
        updated_avg = latency_ms
    else:
        # EMA weight shrinks as the (capped) sample count grows
        weight = 2.0 / (capped + 1)
        updated_avg = weight * latency_ms + (1 - weight) * prev_avg
    sqlite.execute(
        'UPDATE proxylist SET avg_latency=?, latency_samples=? WHERE proxy=?',
        (updated_avg, capped, proxy)
    )
def update_proxy_anonymity(sqlite, proxy, exit_ip, proxy_ip, reveals_headers=None):
    """Classify and store a proxy's anonymity level.

    Levels:
        transparent: exit_ip == proxy_ip (proxy reveals itself)
        elite:       different exit IP and no revealing headers
        anonymous:   different exit IP with revealing headers, or when
                     no header check was performed (conservative default)

    Args:
        sqlite: Database connection
        proxy: Proxy address (ip:port)
        exit_ip: IP address seen by target server
        proxy_ip: Proxy's IP address
        reveals_headers: True if proxy adds revealing headers, False if
            not, None if unknown
    """
    if not exit_ip:
        return

    def _canonical(dotted):
        # Strip leading zeros per octet; None for anything malformed.
        if not dotted:
            return None
        octets = dotted.strip().split('.')
        if len(octets) != 4:
            return None
        try:
            return '.'.join(str(int(o)) for o in octets)
        except ValueError:
            return None

    exit_ip = _canonical(exit_ip)
    proxy_ip = _canonical(proxy_ip)
    if not exit_ip:
        return
    if exit_ip == proxy_ip:
        level = 'transparent'
    elif reveals_headers is False:
        level = 'elite'
    else:
        # headers present, or header check not performed: stay conservative
        level = 'anonymous'
    sqlite.execute(
        'UPDATE proxylist SET anonymity=?, exit_ip=? WHERE proxy=?',
        (level, exit_ip, proxy)
    )
def create_table_if_not_exists(sqlite, dbname):
    """Create the named table, run its migrations, and build its indexes.

    Known names: proxylist, uris, stats_history, session_state. Any
    other name is a no-op apart from the trailing commit. Migrations on
    pre-existing databases run before index creation so indexes over
    migrated columns do not fail.
    """

    def _proxylist():
        sqlite.execute("""CREATE TABLE IF NOT EXISTS proxylist (
            proxy BLOB UNIQUE,
            country BLOB,
            added INT,
            failed INT,
            tested INT,
            dronebl INT,
            proto TEXT,
            mitm INT,
            success_count INT,
            ip TEXT,
            port INT,
            consecutive_success INT,
            total_duration INT,
            avg_latency REAL DEFAULT 0,
            latency_samples INT DEFAULT 0,
            anonymity TEXT,
            exit_ip TEXT,
            asn INT)""")
        # Migrations first: the indexes below reference migrated columns
        _migrate_latency_columns(sqlite)
        _migrate_anonymity_columns(sqlite)
        _migrate_asn_column(sqlite)
        # One index per frequently-filtered column
        for column in ('failed', 'tested', 'proto', 'anonymity', 'asn'):
            sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_%s ON proxylist(%s)'
                           % (column, column))

    def _uris():
        sqlite.execute("""CREATE TABLE IF NOT EXISTS uris (
            url TEXT UNIQUE,
            content_type TEXT,
            check_time INT,
            error INT,
            stale_count INT,
            retrievals INT,
            proxies_added INT,
            added INT,
            content_hash TEXT)""")
        # Migration for databases created before content_hash existed
        _migrate_content_hash_column(sqlite)
        sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)')
        sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)')

    def _stats_history():
        # Hourly snapshots backing the historical dashboard graphs
        sqlite.execute("""CREATE TABLE IF NOT EXISTS stats_history (
            timestamp INT PRIMARY KEY,
            tested INT,
            passed INT,
            failed INT,
            success_rate REAL,
            avg_latency REAL,
            ssl_tested INT,
            ssl_passed INT,
            mitm_detected INT,
            proto_http INT,
            proto_socks4 INT,
            proto_socks5 INT)""")
        sqlite.execute('CREATE INDEX IF NOT EXISTS idx_stats_history_ts ON stats_history(timestamp)')

    def _session_state():
        # Single-row table persisting session counters across restarts
        sqlite.execute("""CREATE TABLE IF NOT EXISTS session_state (
            id INT PRIMARY KEY DEFAULT 1,
            tested INT,
            passed INT,
            failed INT,
            ssl_tested INT,
            ssl_passed INT,
            ssl_failed INT,
            mitm_detected INT,
            cert_errors INT,
            proto_http_tested INT,
            proto_http_passed INT,
            proto_socks4_tested INT,
            proto_socks4_passed INT,
            proto_socks5_tested INT,
            proto_socks5_passed INT,
            peak_rate REAL,
            start_time INT,
            last_save INT,
            fail_categories TEXT,
            country_passed TEXT,
            asn_passed TEXT)""")

    builders = {
        'proxylist': _proxylist,
        'uris': _uris,
        'stats_history': _stats_history,
        'session_state': _session_state,
    }
    builder = builders.get(dbname)
    if builder is not None:
        builder()
    sqlite.commit()
def insert_proxies(proxydb, proxies, url):
    """Insert new proxies into database.

    A malformed address (one with no ':' separator) previously raised
    ValueError from split() and aborted the whole batch; such entries
    are now skipped. The port is split off the right-hand side so a
    host part containing ':' cannot raise either.

    Args:
        proxydb: Database connection
        proxies: List of (address, proto) tuples or plain address strings
        url: Source URL for logging
    """
    if not proxies:
        return
    timestamp = int(time.time())
    rows = []
    for p in proxies:
        # Handle both tuple (address, proto) and plain string formats
        if isinstance(p, tuple):
            addr, proto = p
        else:
            addr, proto = p, None
        if ':' not in addr:
            # no port component: skip rather than crash the batch
            continue
        # rsplit: the port is always the last colon-separated field
        ip, port = addr.rsplit(':', 1)
        rows.append((timestamp, addr, ip, port, proto, 3, 0, 0, 0, 0, 0))
    if not rows:
        return
    proxydb.executemany(
        'INSERT OR IGNORE INTO proxylist '
        '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success) '
        'VALUES (?,?,?,?,?,?,?,?,?,?,?)',
        rows
    )
    proxydb.commit()
    # Log the number of rows actually submitted, not the raw input size
    _log('+%d proxy/ies from %s' % (len(rows), url), 'added')
def insert_urls(urls, search, sqlite):
    """Insert newly discovered URLs into the uris table.

    Args:
        urls: Iterable of URL strings (duplicates are ignored by the DB)
        search: Origin of the URLs, used only for logging
        sqlite: Database connection
    """
    if not urls:
        return
    now = int(time.time())
    # NOTE(review): new URLs start with error=1 while seeded sources use
    # error=0 — presumably flags "not yet fetched"; confirm against checker
    sqlite.executemany(
        'INSERT OR IGNORE INTO uris '
        '(added,url,check_time,error,stale_count,retrievals,proxies_added) '
        'VALUES (?,?,?,?,?,?,?)',
        [(now, u, 0, 1, 0, 0, 0) for u in urls]
    )
    sqlite.commit()
    _log('+%d url(s) from %s' % (len(urls), search), 'added')
# Known proxy list sources (GitHub raw lists, APIs).
# Consumed by seed_proxy_sources(); per-entry comments note each
# upstream project's advertised refresh cadence.
PROXY_SOURCES = [
    # TheSpeedX/PROXY-List - large, hourly updates
    'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt',
    'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/socks4.txt',
    'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/socks5.txt',
    # clarketm/proxy-list - curated, daily
    'https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list-raw.txt',
    # monosans/proxy-list - hourly updates
    'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt',
    'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks4.txt',
    'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks5.txt',
    # jetkai/proxy-list - 10 min updates
    'https://raw.githubusercontent.com/jetkai/proxy-list/main/online-proxies/txt/proxies.txt',
    # roosterkid/openproxylist
    'https://raw.githubusercontent.com/roosterkid/openproxylist/main/HTTPS_RAW.txt',
    'https://raw.githubusercontent.com/roosterkid/openproxylist/main/SOCKS4_RAW.txt',
    'https://raw.githubusercontent.com/roosterkid/openproxylist/main/SOCKS5_RAW.txt',
    # ShiftyTR/Proxy-List
    'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/http.txt',
    'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/socks4.txt',
    'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/socks5.txt',
    # mmpx12/proxy-list
    'https://raw.githubusercontent.com/mmpx12/proxy-list/master/http.txt',
    'https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks4.txt',
    'https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks5.txt',
    # proxyscrape API
    'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all',
    'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks4&timeout=10000&country=all',
    'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks5&timeout=10000&country=all',
]
def seed_proxy_sources(sqlite):
    """Seed known proxy list sources into uris table.

    INSERT OR IGNORE makes reseeding harmless; only rows actually
    inserted are counted and reported.

    Args:
        sqlite: Database connection
    """
    timestamp = int(time.time())
    added = 0
    for url in PROXY_SOURCES:
        try:
            # execute() returns the cursor; rowcount is 0 when OR IGNORE
            # skipped a duplicate. The previous `sqlite.cursor.rowcount`
            # raised AttributeError on a plain sqlite3.Connection (cursor
            # is a method there), so `added` never advanced.
            cur = sqlite.execute(
                'INSERT OR IGNORE INTO uris '
                '(added,url,check_time,error,stale_count,retrievals,proxies_added) '
                'VALUES (?,?,?,?,?,?,?)',
                (timestamp, url, 0, 0, 0, 0, 0)
            )
            if cur.rowcount > 0:
                added += 1
        except Exception:
            # best-effort: one bad source must not stop the seeding pass
            pass
    sqlite.commit()
    if added > 0:
        _log('seeded %d proxy source URLs' % added, 'info')
def save_session_state(sqlite, stats):
    """Persist live session counters so a restart can resume them.

    Args:
        sqlite: Database connection
        stats: Stats object from proxywatchd
    """
    import json
    now = int(time.time())
    # Counter dicts travel as JSON text columns
    serialized = (
        json.dumps(dict(stats.fail_categories)),
        json.dumps(dict(stats.country_passed)),
        json.dumps(dict(stats.asn_passed)),
    )
    values = (
        stats.tested, stats.passed, stats.failed,
        stats.ssl_tested, stats.ssl_passed, stats.ssl_failed,
        stats.mitm_detected, stats.cert_errors,
        stats.proto_tested.get('http', 0), stats.proto_passed.get('http', 0),
        stats.proto_tested.get('socks4', 0), stats.proto_passed.get('socks4', 0),
        stats.proto_tested.get('socks5', 0), stats.proto_passed.get('socks5', 0),
        stats.peak_rate, int(stats.start_time), now,
    ) + serialized
    # id is fixed at 1: OR REPLACE keeps the table single-row
    sqlite.execute('''INSERT OR REPLACE INTO session_state
        (id, tested, passed, failed, ssl_tested, ssl_passed, ssl_failed,
        mitm_detected, cert_errors, proto_http_tested, proto_http_passed,
        proto_socks4_tested, proto_socks4_passed, proto_socks5_tested, proto_socks5_passed,
        peak_rate, start_time, last_save, fail_categories, country_passed, asn_passed)
        VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
        values)
    sqlite.commit()
def load_session_state(sqlite):
    """Restore persisted session counters from the database.

    Args:
        sqlite: Database connection
    Returns:
        dict with state fields, or None if no saved state
    """
    import json
    # Column order must mirror the session_state schema (SELECT *)
    cols = ('id', 'tested', 'passed', 'failed', 'ssl_tested', 'ssl_passed',
            'ssl_failed', 'mitm_detected', 'cert_errors',
            'proto_http_tested', 'proto_http_passed',
            'proto_socks4_tested', 'proto_socks4_passed',
            'proto_socks5_tested', 'proto_socks5_passed',
            'peak_rate', 'start_time', 'last_save',
            'fail_categories', 'country_passed', 'asn_passed')
    try:
        row = sqlite.execute(
            'SELECT * FROM session_state WHERE id=1'
        ).fetchone()
        if not row:
            return None
        state = dict(zip(cols, row))
        # JSON text columns become dicts; NULL/empty becomes {}
        for key in ('fail_categories', 'country_passed', 'asn_passed'):
            raw = state.get(key)
            state[key] = json.loads(raw) if raw else {}
        return state
    except Exception as e:
        _log('failed to load session state: %s' % str(e), 'warn')
        return None
def save_stats_snapshot(sqlite, stats):
    """Record an hourly stats snapshot for the history graphs.

    The timestamp is floored to the hour, so repeated calls within the
    same hour replace one row instead of accumulating.

    Args:
        sqlite: Database connection
        stats: Stats object from proxywatchd
    """
    hour_ts = (int(time.time()) // 3600) * 3600
    success_rate = (stats.passed * 100.0) / stats.tested if stats.tested > 0 else 0
    avg_latency = stats.latency_sum / stats.latency_count if stats.latency_count > 0 else 0
    sqlite.execute('''INSERT OR REPLACE INTO stats_history
        (timestamp, tested, passed, failed, success_rate, avg_latency,
        ssl_tested, ssl_passed, mitm_detected, proto_http, proto_socks4, proto_socks5)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
        (hour_ts, stats.tested, stats.passed, stats.failed,
         success_rate, avg_latency,
         stats.ssl_tested, stats.ssl_passed, stats.mitm_detected,
         stats.proto_passed.get('http', 0),
         stats.proto_passed.get('socks4', 0),
         stats.proto_passed.get('socks5', 0)))
    sqlite.commit()
def get_stats_history(sqlite, hours=24):
    """Fetch hourly snapshots covering the last N hours.

    Args:
        sqlite: Database connection
        hours: Number of hours of history to retrieve
    Returns:
        List of dicts with hourly stats, oldest first
    """
    # Column order must mirror the stats_history schema (SELECT *)
    cols = ('timestamp', 'tested', 'passed', 'failed', 'success_rate',
            'avg_latency', 'ssl_tested', 'ssl_passed', 'mitm_detected',
            'proto_http', 'proto_socks4', 'proto_socks5')
    cutoff = int(time.time()) - hours * 3600
    cursor = sqlite.execute(
        'SELECT * FROM stats_history WHERE timestamp >= ? ORDER BY timestamp',
        (cutoff,)
    )
    return [dict(zip(cols, row)) for row in cursor.fetchall()]
def analyze_database(sqlite):
    """Run ANALYZE so the query planner has fresh statistics.

    Intended to run periodically (e.g. hourly); failures are logged,
    never raised.

    Args:
        sqlite: Database connection
    """
    try:
        # Bound the rows examined per index so ANALYZE stays cheap
        sqlite.execute('PRAGMA analysis_limit=1000')
        sqlite.execute('ANALYZE')
        sqlite.commit()
        _log('database ANALYZE completed', 'debug')
    except Exception as exc:
        _log('database ANALYZE failed: %s' % str(exc), 'warn')
def vacuum_database(sqlite):
    """Run VACUUM to reclaim free pages and defragment the database.

    Expensive — intended for infrequent (daily/weekly) runs and requires
    no open transaction. Failures are logged, never raised.

    Args:
        sqlite: Database connection
    """
    try:
        sqlite.execute('VACUUM')
        _log('database VACUUM completed', 'info')
    except Exception as exc:
        _log('database VACUUM failed: %s' % str(exc), 'warn')
def get_database_stats(sqlite):
    """Collect size and row-count statistics for monitoring.

    Best-effort: on any error the dict is returned with whatever was
    gathered up to that point.

    Args:
        sqlite: Database connection
    Returns:
        Dict with database statistics
    """
    stats = {}

    def _scalar(sql, fallback):
        # First column of the first row, or the fallback when empty
        row = sqlite.execute(sql).fetchone()
        return row[0] if row else fallback

    try:
        stats['page_count'] = _scalar('PRAGMA page_count', 0)
        stats['page_size'] = _scalar('PRAGMA page_size', 4096)
        stats['freelist_count'] = _scalar('PRAGMA freelist_count', 0)
        # Derived byte sizes
        stats['total_size'] = stats['page_count'] * stats['page_size']
        stats['free_size'] = stats['freelist_count'] * stats['page_size']
        stats['used_size'] = stats['total_size'] - stats['free_size']
        # Table row counts
        stats['proxy_count'] = _scalar('SELECT COUNT(*) FROM proxylist', 0)
        stats['working_count'] = _scalar('SELECT COUNT(*) FROM proxylist WHERE failed=0', 0)
        stats['uri_count'] = _scalar('SELECT COUNT(*) FROM uris', 0)
    except Exception:
        # best-effort: partial stats beat no stats
        pass
    return stats