diff --git a/dbs.py b/dbs.py
index d9df0c4..aed53bd 100644
--- a/dbs.py
+++ b/dbs.py
@@ -50,7 +50,7 @@ def compute_proxy_list_hash(proxies):
     """Compute MD5 hash of sorted proxy list for change detection.
 
     Args:
-        proxies: List of proxy strings (ip:port format)
+        proxies: List of proxy strings (ip:port) or tuples (address, proto)
 
     Returns:
         Hexadecimal MD5 hash string, or None if list is empty
@@ -58,7 +58,9 @@ def compute_proxy_list_hash(proxies):
     if not proxies:
         return None
     import hashlib
-    sorted_list = '\n'.join(sorted(proxies))
+    # Handle both tuple (address, proto) and plain string formats
+    addresses = [p[0] if isinstance(p, tuple) else p for p in proxies]
+    sorted_list = '\n'.join(sorted(addresses))
     return hashlib.md5(sorted_list.encode('utf-8') if hasattr(sorted_list, 'encode') else sorted_list).hexdigest()
 
 
@@ -117,7 +119,10 @@ def update_proxy_anonymity(sqlite, proxy, exit_ip, proxy_ip, reveals_headers=Non
         parts = ip.strip().split('.')
         if len(parts) != 4:
             return None
-        return '.'.join(str(int(p)) for p in parts)
+        try:
+            return '.'.join(str(int(p)) for p in parts)
+        except ValueError:
+            return None
 
     exit_ip = normalize_ip(exit_ip)
     proxy_ip = normalize_ip(proxy_ip)
@@ -192,22 +197,82 @@ def create_table_if_not_exists(sqlite, dbname):
         sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)')
         sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)')
 
+    elif dbname == 'stats_history':
+        # Hourly stats snapshots for historical graphs
+        sqlite.execute("""CREATE TABLE IF NOT EXISTS stats_history (
+            timestamp INT PRIMARY KEY,
+            tested INT,
+            passed INT,
+            failed INT,
+            success_rate REAL,
+            avg_latency REAL,
+            ssl_tested INT,
+            ssl_passed INT,
+            mitm_detected INT,
+            proto_http INT,
+            proto_socks4 INT,
+            proto_socks5 INT)""")
+        sqlite.execute('CREATE INDEX IF NOT EXISTS idx_stats_history_ts ON stats_history(timestamp)')
+
+    elif dbname == 'session_state':
+        # Single-row table for persisting session state across restarts
+        sqlite.execute("""CREATE TABLE IF NOT EXISTS session_state (
+            id INT PRIMARY KEY DEFAULT 1,
+            tested INT,
+            passed INT,
+            failed INT,
+            ssl_tested INT,
+            ssl_passed INT,
+            ssl_failed INT,
+            mitm_detected INT,
+            cert_errors INT,
+            proto_http_tested INT,
+            proto_http_passed INT,
+            proto_socks4_tested INT,
+            proto_socks4_passed INT,
+            proto_socks5_tested INT,
+            proto_socks5_passed INT,
+            peak_rate REAL,
+            start_time INT,
+            last_save INT,
+            fail_categories TEXT,
+            country_passed TEXT,
+            asn_passed TEXT)""")
+
     sqlite.commit()
 
 
 def insert_proxies(proxydb, proxies, url):
-    """Insert new proxies into database."""
+    """Insert new proxies into database.
+
+    Entries whose address is not in ip:port form are logged and skipped.
+
+    Args:
+        proxydb: Database connection
+        proxies: List of (address, proto) tuples or plain address strings
+        url: Source URL for logging
+    """
     if not proxies:
         return
     timestamp = int(time.time())
     rows = []
     for p in proxies:
-        ip, port = p.split(':')
-        rows.append((timestamp, p, ip, port, 3, 0, 0, 0, 0, 0))
+        # Handle both tuple (address, proto) and plain string formats
+        if isinstance(p, tuple):
+            addr, proto = p
+        else:
+            addr, proto = p, None
+        try:
+            ip, port = addr.split(':')
+        except ValueError:
+            # One malformed entry must not abort the whole batch
+            _log('skipping malformed proxy %r from %s' % (addr, url), 'warn')
+            continue
+        rows.append((timestamp, addr, ip, port, proto, 3, 0, 0, 0, 0, 0))
     proxydb.executemany(
         'INSERT OR IGNORE INTO proxylist '
-        '(added,proxy,ip,port,failed,tested,success_count,total_duration,mitm,consecutive_success) '
-        'VALUES (?,?,?,?,?,?,?,?,?,?)',
+        '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success) '
+        'VALUES (?,?,?,?,?,?,?,?,?,?,?)',
         rows
     )
     proxydb.commit()
@@ -282,3 +347,134 @@ def seed_proxy_sources(sqlite):
     sqlite.commit()
     if added > 0:
         _log('seeded %d proxy source URLs' % added, 'info')
+
+
+def save_session_state(sqlite, stats):
+    """Save session state to database for persistence across restarts.
+
+    Args:
+        sqlite: Database connection
+        stats: Stats object from proxywatchd
+    """
+    import json
+    now = int(time.time())
+
+    # Serialize dicts as JSON
+    fail_cats_json = json.dumps(dict(stats.fail_categories))
+    country_json = json.dumps(dict(stats.country_passed))
+    asn_json = json.dumps(dict(stats.asn_passed))
+
+    sqlite.execute('''INSERT OR REPLACE INTO session_state
+        (id, tested, passed, failed, ssl_tested, ssl_passed, ssl_failed,
+        mitm_detected, cert_errors, proto_http_tested, proto_http_passed,
+        proto_socks4_tested, proto_socks4_passed, proto_socks5_tested, proto_socks5_passed,
+        peak_rate, start_time, last_save, fail_categories, country_passed, asn_passed)
+        VALUES (1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
+        (stats.tested, stats.passed, stats.failed,
+         stats.ssl_tested, stats.ssl_passed, stats.ssl_failed,
+         stats.mitm_detected, stats.cert_errors,
+         stats.proto_tested.get('http', 0), stats.proto_passed.get('http', 0),
+         stats.proto_tested.get('socks4', 0), stats.proto_passed.get('socks4', 0),
+         stats.proto_tested.get('socks5', 0), stats.proto_passed.get('socks5', 0),
+         stats.peak_rate, int(stats.start_time), now,
+         fail_cats_json, country_json, asn_json))
+    sqlite.commit()
+
+
+def load_session_state(sqlite):
+    """Load session state from database.
+
+    Args:
+        sqlite: Database connection
+
+    Returns:
+        dict with state fields, or None if no saved state
+    """
+    import json
+    # Columns are selected by name so the mapping below cannot silently
+    # misalign if the table's physical column order ever changes
+    cols = ['id', 'tested', 'passed', 'failed', 'ssl_tested', 'ssl_passed',
+            'ssl_failed', 'mitm_detected', 'cert_errors',
+            'proto_http_tested', 'proto_http_passed',
+            'proto_socks4_tested', 'proto_socks4_passed',
+            'proto_socks5_tested', 'proto_socks5_passed',
+            'peak_rate', 'start_time', 'last_save',
+            'fail_categories', 'country_passed', 'asn_passed']
+    try:
+        row = sqlite.execute(
+            'SELECT %s FROM session_state WHERE id=1' % ', '.join(cols)
+        ).fetchone()
+        if not row:
+            return None
+
+        state = dict(zip(cols, row))
+
+        # Parse JSON fields; NULL or empty text becomes an empty dict
+        for key in ('fail_categories', 'country_passed', 'asn_passed'):
+            state[key] = json.loads(state[key]) if state[key] else {}
+
+        return state
+    except Exception as e:
+        _log('failed to load session state: %s' % str(e), 'warn')
+        return None
+
+
+def save_stats_snapshot(sqlite, stats):
+    """Save hourly stats snapshot for historical graphs.
+
+    Args:
+        sqlite: Database connection
+        stats: Stats object from proxywatchd
+    """
+    now = int(time.time())
+    # Truncate to the start of the current hour; saves within the same
+    # hour share this key, so INSERT OR REPLACE overwrites the same row
+    hour_ts = (now // 3600) * 3600
+
+    success_rate = 0
+    if stats.tested > 0:
+        success_rate = (stats.passed * 100.0) / stats.tested
+
+    avg_latency = 0
+    if stats.latency_count > 0:
+        avg_latency = stats.latency_sum / stats.latency_count
+
+    sqlite.execute('''INSERT OR REPLACE INTO stats_history
+        (timestamp, tested, passed, failed, success_rate, avg_latency,
+        ssl_tested, ssl_passed, mitm_detected, proto_http, proto_socks4, proto_socks5)
+        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''',
+        (hour_ts, stats.tested, stats.passed, stats.failed,
+         success_rate, avg_latency,
+         stats.ssl_tested, stats.ssl_passed, stats.mitm_detected,
+         stats.proto_passed.get('http', 0),
+         stats.proto_passed.get('socks4', 0),
+         stats.proto_passed.get('socks5', 0)))
+    sqlite.commit()
+
+
+def get_stats_history(sqlite, hours=24):
+    """Get historical stats for the last N hours.
+
+    Args:
+        sqlite: Database connection
+        hours: Number of hours of history to retrieve
+
+    Returns:
+        List of dicts with hourly stats, oldest first
+    """
+    now = int(time.time())
+    since = now - (hours * 3600)
+
+    # Select columns by name so each dict key matches its value even if
+    # the table's physical column order changes
+    cols = ['timestamp', 'tested', 'passed', 'failed', 'success_rate',
+            'avg_latency', 'ssl_tested', 'ssl_passed', 'mitm_detected',
+            'proto_http', 'proto_socks4', 'proto_socks5']
+
+    rows = sqlite.execute(
+        'SELECT %s FROM stats_history WHERE timestamp >= ? ORDER BY timestamp'
+        % ', '.join(cols),
+        (since,)
+    ).fetchall()
+
+    return [dict(zip(cols, row)) for row in rows]