Compare commits


10 Commits

Author SHA1 Message Date
Username
fab1e1d110 compose: rewrite master and worker compose files
Drop the deprecated version key; add SELinux volume labels, SIGTERM
handling with a 30s grace period, a configurable master URL via the
PPF_MASTER_URL env var, and usage documentation in the file headers.
2026-02-17 18:37:49 +01:00
Username
716d60898b config: allow checktype = none to disable secondary check
Accepts none/false/off/disabled as checktype value, normalized to
'none' internally. When set, ssl_first is forced on and no Phase 2
check runs -- only successful TLS handshakes count as working.
2026-02-17 18:37:44 +01:00
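The alias handling in this commit reduces to a small normalization step. A minimal standalone sketch (the real code lives in the Config diff further down and additionally forces ssl_first on):

```python
# Sketch of the checktype normalization above: split the comma-separated
# value and map the disable aliases to the canonical 'none'.
DISABLE_ALIASES = ('none', 'false', 'off', 'disabled')

def normalize_checktypes(raw):
    types = [t.strip().lower() for t in raw.split(',') if t.strip()]
    return ['none' if t in DISABLE_ALIASES else t for t in types]

print(normalize_checktypes('Off'))          # ['none']
print(normalize_checktypes('head, irc'))    # ['head', 'irc']
```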
Username
2e3ce149f9 watchd: tighten secondary check validation
- judge blocks record as neutral (judge_block category), not success;
  evaluate() filters them out so they affect neither pass nor fail count
- require HTTP/1.x response line for non-IRC checks; non-HTTP garbage
  (captive portals, proxy error pages) fails immediately
- add is_public_ip() rejecting RFC 1918, loopback, link-local, and
  multicast ranges from judge exit IP extraction
- remove 5 weak HEAD regex targets whose fingerprint headers appear on
  error pages and captive portals (p3p, X-XSS-Protection,
  x-frame-options, referrer-policy, X-UA-Compatible)
2026-02-17 18:37:38 +01:00
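The HTTP/1.x response-line requirement can be illustrated with a short sketch; the regex here is an assumption for demonstration, and the actual validation in watchd may be stricter:

```python
import re

# Responses must open with an HTTP/1.x status line; captive portals and
# proxy error pages that return arbitrary bytes fail immediately.
_STATUS_LINE = re.compile(br'^HTTP/1\.[01] [1-5][0-9][0-9]')

def looks_like_http_response(data):
    return bool(_STATUS_LINE.match(data))

print(looks_like_http_response(b'HTTP/1.1 200 OK\r\n'))   # True
print(looks_like_http_response(b'<html>login required'))  # False
```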
Username
1236ddbd2d add compose files for container management
Replace raw podman run with declarative compose.yml per host type.
Master (odin) gets compose.master.yml, workers get compose.worker.yml.
2026-02-17 18:17:12 +01:00
Username
0311abb46a fetch: encode unicode URLs to bytes before HTTP/SOCKS ops
When URLs arrive as unicode (e.g. from JSON API responses), the unicode
type propagates through _parse_url into the SOCKS5 packet construction
in rocksock. Port bytes > 127 formatted via %c in a unicode string
produce non-ASCII characters that fail on socket sendall() implicit
ASCII encode.

Encode URLs to UTF-8 bytes at fetch entry points to keep the entire
request pipeline in str (bytes) domain.
2026-02-17 16:43:26 +01:00
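The failure mode is easy to reproduce; here is a Python 3 rendering of the same text-vs-bytes mismatch, with the SOCKS5 packet simplified to a single port byte:

```python
# A port byte > 127 formatted with %c in the text domain yields a
# non-ASCII character; the implicit ASCII encode at send time then fails.
bad_byte = 0x9f

text_field = '%c' % bad_byte            # text domain: '\x9f'
try:
    text_field.encode('ascii')          # what sendall() did implicitly in py2
    print('ok')
except UnicodeEncodeError:
    print('implicit ASCII encode fails')

bytes_field = b'%c' % bad_byte          # bytes domain: no encode step at all
print(bytes_field == b'\x9f')           # True
```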
Username
e74782ad3f ppf: fix worker_id undefined when using --worker-key
2026-02-17 16:15:04 +01:00
Username
c710555aad ppf: pass url scoring config to httpd module
2026-02-17 15:20:15 +01:00
Username
c5287073bf httpd: add score-based url scheduling with EMA tracking
Replace ORDER BY RANDOM() in claim_urls with composite score:
age/interval ratio, yield bonus, quality bonus, error/stale penalties.

Rewrite submit_url_reports with adaptive check_interval and EMA for
avg_fetch_time and yield_rate. Add working_ratio correlation in
submit_proxy_reports via pending count tracking.
2026-02-17 15:20:07 +01:00
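The EMA update used for avg_fetch_time and yield_rate reduces to one line; alpha = 0.3 matches the diff further down, while the seeding rule and field names here are illustrative:

```python
ALPHA = 0.3  # smoothing factor: 30% weight on the newest sample

def ema(sample, old):
    """Exponential moving average; the first sample seeds the value."""
    return sample if old <= 0 else ALPHA * sample + (1 - ALPHA) * old

avg = 0
for fetch_ms in (1000, 400, 400, 400):
    avg = ema(fetch_ms, avg)
print(round(avg))  # drifts from 1000 toward 400
```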
Username
66441f9292 dbs: add url scoring columns to uris table
Migration functions for check_interval, working_ratio, avg_fetch_time,
last_worker, and yield_rate columns with sensible defaults.
2026-02-17 15:19:59 +01:00
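The migrations follow a probe-then-ALTER pattern, shown self-contained below; the real code catches a broader Exception and runs through the project's mysqlite wrapper:

```python
import sqlite3

def migrate_check_interval(conn):
    """Idempotently add check_interval to uris (default 3600)."""
    try:
        conn.execute('SELECT check_interval FROM uris LIMIT 1')
    except sqlite3.OperationalError:   # column missing: add it once
        conn.execute('ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600')
        conn.commit()

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE uris (url TEXT PRIMARY KEY)')
migrate_check_interval(conn)
migrate_check_interval(conn)   # second run is a no-op
conn.execute("INSERT INTO uris (url) VALUES ('http://example.com/list')")
print(conn.execute('SELECT check_interval FROM uris').fetchone()[0])  # 3600
```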
Username
862eeed5c8 ppf: add worker_v2_main() for URL-driven discovery
2026-02-17 14:23:58 +01:00
10 changed files with 792 additions and 72 deletions

compose.master.yml (new file)

@@ -0,0 +1,30 @@
# PPF master node (odin)
#
# Scrapes proxy sources, runs verification, serves API/dashboard.
# No routine proxy testing -- workers handle that.
#
# Prerequisites:
# - config.ini (not tracked, host-specific)
# - data/ (created automatically)
#
# Usage:
# podman-compose -f compose.master.yml up -d
# podman-compose -f compose.master.yml logs -f
# podman-compose -f compose.master.yml down
services:
ppf:
container_name: ppf
image: localhost/ppf:latest
build: .
network_mode: host
restart: unless-stopped
stop_signal: SIGTERM
stop_grace_period: 30s
environment:
PYTHONUNBUFFERED: "1"
volumes:
- .:/app:ro,Z
- ./data:/app/data:Z
- ./config.ini:/app/config.ini:ro,Z
command: python -u ppf.py

compose.worker.yml (new file)

@@ -0,0 +1,38 @@
# PPF worker node (cassius, edge, sentinel, ...)
#
# Tests proxies and reports results to master via WireGuard.
# Each worker uses only local Tor (127.0.0.1:9050).
#
# Prerequisites:
# - config.ini (not tracked, host-specific)
# - servers.txt (deploy from repo)
# - src/ (deploy *.py from repo root into src/)
# - data/ (created automatically)
#
# Usage:
# PPF_MASTER_URL=http://10.200.1.250:8081 podman-compose -f compose.worker.yml up -d
# podman-compose -f compose.worker.yml logs -f
# podman-compose -f compose.worker.yml down
#
# The master URL defaults to http://10.200.1.250:8081 (odin via WireGuard).
# Override with PPF_MASTER_URL env var or edit .env file.
services:
ppf-worker:
container_name: ppf-worker
image: localhost/ppf-worker:latest
build: .
network_mode: host
restart: unless-stopped
stop_signal: SIGTERM
stop_grace_period: 30s
logging:
driver: k8s-file
environment:
PYTHONUNBUFFERED: "1"
volumes:
- ./src:/app:ro,Z
- ./data:/app/data:Z
- ./config.ini:/app/config.ini:ro,Z
- ./servers.txt:/app/servers.txt:ro,Z
command: python -u ppf.py --worker-v2 --server ${PPF_MASTER_URL:-http://10.200.1.250:8081}


@@ -11,7 +11,12 @@ class Config(ComboParser):
with open(self.watchd.source_file, 'r') as handle:
self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0]
# Parse checktypes as comma-separated list
self.watchd.checktypes = [t.strip() for t in self.watchd.checktype.split(',') if t.strip()]
# Normalize: 'false'/'off'/'disabled' -> 'none' (SSL-only mode)
raw_types = [t.strip().lower() for t in self.watchd.checktype.split(',') if t.strip()]
self.watchd.checktypes = ['none' if t in ('false', 'off', 'disabled') else t for t in raw_types]
# SSL-only mode: force ssl_first when secondary check is disabled
if self.watchd.checktypes == ['none']:
self.watchd.ssl_first = True
# Apply log level from CLI flags
if self.args.quiet:
set_log_level('warn')
@@ -52,12 +57,15 @@ class Config(ComboParser):
errors.append('ppf.max_fail must be >= 1')
# Validate checktypes (secondary check types, ssl is handled by ssl_first)
valid_checktypes = {'irc', 'head', 'judges'}
# 'none' = SSL-only mode (no secondary check)
valid_checktypes = {'irc', 'head', 'judges', 'none'}
for ct in self.watchd.checktypes:
if ct not in valid_checktypes:
errors.append('watchd.checktype "%s" invalid, must be one of: %s' % (ct, ', '.join(sorted(valid_checktypes))))
if not self.watchd.checktypes:
errors.append('watchd.checktype must specify at least one valid type')
if 'none' in self.watchd.checktypes and len(self.watchd.checktypes) > 1:
errors.append('watchd.checktype "none" cannot be combined with other types')
# Validate engine names
valid_engines = {'duckduckgo', 'startpage', 'brave', 'ecosia',
@@ -112,7 +120,7 @@ class Config(ComboParser):
self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False)
self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False)
self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False)
self.add_item(section, 'checktype', str, 'head', 'secondary check type: irc, head, judges (used when ssl_first fails)', False)
self.add_item(section, 'checktype', str, 'head', 'secondary check type: head, irc, judges, none/false (none = SSL-only)', False)
self.add_item(section, 'ssl_first', bool, True, 'try SSL handshake first, fallback to checktype on failure (default: True)', False)
self.add_item(section, 'ssl_only', bool, False, 'when ssl_first enabled, skip secondary check on SSL failure (default: False)', False)
self.add_item(section, 'scale_cooldown', int, 10, 'seconds between thread scaling decisions (default: 10)', False)

dbs.py

@@ -98,6 +98,51 @@ def _migrate_last_seen(sqlite):
sqlite.commit()
def _migrate_uri_check_interval(sqlite):
"""Add adaptive check_interval column to uris table."""
try:
sqlite.execute('SELECT check_interval FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600')
sqlite.commit()
def _migrate_uri_working_ratio(sqlite):
"""Add working_ratio column to uris table for proxy quality tracking."""
try:
sqlite.execute('SELECT working_ratio FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0')
sqlite.commit()
def _migrate_uri_avg_fetch_time(sqlite):
"""Add avg_fetch_time column to uris table for fetch latency EMA."""
try:
sqlite.execute('SELECT avg_fetch_time FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0')
sqlite.commit()
def _migrate_uri_last_worker(sqlite):
"""Add last_worker column to uris table."""
try:
sqlite.execute('SELECT last_worker FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN last_worker TEXT')
sqlite.commit()
def _migrate_uri_yield_rate(sqlite):
"""Add yield_rate column to uris table for proxy yield EMA."""
try:
sqlite.execute('SELECT yield_rate FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0')
sqlite.commit()
def compute_proxy_list_hash(proxies):
"""Compute MD5 hash of sorted proxy list for change detection.
@@ -356,6 +401,11 @@ def create_table_if_not_exists(sqlite, dbname):
content_hash TEXT)""")
# Migration for existing databases
_migrate_content_hash_column(sqlite)
_migrate_uri_check_interval(sqlite)
_migrate_uri_working_ratio(sqlite)
_migrate_uri_avg_fetch_time(sqlite)
_migrate_uri_last_worker(sqlite)
_migrate_uri_yield_rate(sqlite)
# Indexes for common query patterns
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)')
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)')


@@ -1,11 +0,0 @@
version: '3.8'
services:
ppf:
build: .
volumes:
- .:/app
working_dir: /app
command: python ppf.py
environment:
- PYTHONUNBUFFERED=1


@@ -56,6 +56,8 @@ class FetchSession(object):
def fetch(self, url, head=False):
"""Fetch URL, reusing connection if possible."""
network_stats.set_category('scraper')
if isinstance(url, unicode):
url = url.encode('utf-8')
host, port, ssl, uri = _parse_url(url)
# Check if we can reuse existing connection
@@ -489,6 +491,8 @@ def fetch_contents(url, head=False, proxy=None):
retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded')
def _fetch_contents(url, head = False, proxy=None):
network_stats.set_category('scraper')
if isinstance(url, unicode):
url = url.encode('utf-8')
host, port, ssl, uri = _parse_url(url)
headers=[
'Accept-Language: en-US,en;q=0.8',

httpd.py

@@ -85,6 +85,17 @@ _url_claims = {} # url -> {worker_id, claimed_at}
_url_claims_lock = threading.Lock()
_url_claim_timeout = 600 # 10 min (URLs take longer to fetch+extract)
# URL scoring: pending proxy counts for working_ratio correlation
_url_pending_counts = {} # url -> {total, worker_id, time}
_url_pending_lock = threading.Lock()
_url_database_path = None # set in ProxyAPIServer.__init__ for cross-db access
# URL scoring defaults (overridden by configure_url_scoring)
_url_checktime = 3600
_url_perfail_checktime = 3600
_url_max_fail = 10
_url_list_max_age_days = 7
# Test rate tracking: worker_id -> list of (timestamp, count) tuples
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
@@ -114,6 +125,15 @@ def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backof
_max_fail = max_fail
def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days):
"""Set URL scoring parameters from config."""
global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days
_url_checktime = checktime
_url_perfail_checktime = perfail_checktime
_url_max_fail = max_fail
_url_list_max_age_days = list_max_age_days
def _build_due_condition():
"""Build SQL condition for proxy due check.
@@ -431,7 +451,16 @@ def claim_work(db, worker_id, count=100):
return claimed
def claim_urls(url_db, worker_id, count=5):
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts."""
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
Uses score-based scheduling: high-yield URLs checked more often,
stale/broken ones less. Score components:
- age/interval: 1.0 when due, >1.0 when overdue
- yield_bonus: capped at 1.0 for high-yield sources
- quality_bonus: 0-0.5 based on working_ratio
- error_penalty: 0-2.0 based on consecutive errors
- stale_penalty: 0-1.0 based on unchanged fetches
"""
now = time.time()
now_int = int(now)
@@ -441,36 +470,37 @@ def claim_urls(url_db, worker_id, count=5):
except ImportError:
detect_proto_from_path = None
# Clean expired URL claims
# Clean expired URL claims and pending counts
with _url_claims_lock:
stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout]
for k in stale:
del _url_claims[k]
claimed_urls = set(_url_claims.keys())
# Reuse the same scheduling formula as ppf.py main loop (line 800-805):
# WHERE error < max_fail
# AND (check_time + checktime + ((error + stale_count) * perfail_checktime) < now)
# AND (added > min_added OR proxies_added > 0)
# Use defaults matching config.ppf: checktime=3600, perfail_checktime=3600, max_fail=10
# list_max_age_days=30
checktime = 3600
perfail_checktime = 3600
max_fail = 10
list_max_age_seconds = 30 * 86400
with _url_pending_lock:
stale_pending = [k for k, v in _url_pending_counts.items() if now - v['time'] > 600]
for k in stale_pending:
del _url_pending_counts[k]
list_max_age_seconds = _url_list_max_age_days * 86400
min_added = now_int - list_max_age_seconds
try:
rows = url_db.execute(
'''SELECT url, content_hash, check_time, error, stale_count,
retrievals, proxies_added
FROM uris
WHERE error < ?
AND (check_time + ? + ((error + stale_count) * ?) < ?)
AND (added > ? OR proxies_added > 0)
ORDER BY RANDOM()
LIMIT ?''',
(max_fail, checktime, perfail_checktime, now_int, min_added, count * 3)
'''SELECT url, content_hash,
(? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
+ MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
+ COALESCE(working_ratio, 0) * 0.5
- MIN(error * 0.3, 2.0)
- MIN(stale_count * 0.1, 1.0)
AS score
FROM uris
WHERE error < ?
AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) >= 0.8
AND (added > ? OR proxies_added > 0)
ORDER BY score DESC
LIMIT ?''',
(now_int, _url_max_fail, now_int, min_added, count * 3)
).fetchall()
except Exception as e:
_log('claim_urls query error: %s' % e, 'error')
@@ -504,9 +534,19 @@ def claim_urls(url_db, worker_id, count=5):
def submit_url_reports(url_db, worker_id, reports):
"""Process URL fetch feedback from workers. Returns count of processed reports."""
"""Process URL fetch feedback from workers. Returns count of processed reports.
Updates EMA metrics per URL:
- avg_fetch_time: exponential moving average of fetch latency
- check_interval: adaptive interval (shrinks for productive URLs, grows for stale)
- yield_rate: EMA of proxy count per fetch (on changed content)
- last_worker: worker that last fetched this URL
Stores pending proxy count for working_ratio correlation in submit_proxy_reports.
"""
processed = 0
now_int = int(time.time())
alpha = 0.3 # EMA smoothing factor
for r in reports:
url = r.get('url', '')
@@ -523,10 +563,36 @@ def submit_url_reports(url_db, worker_id, reports):
content_hash = r.get('content_hash')
proxy_count = r.get('proxy_count', 0)
changed = r.get('changed', False)
fetch_time_ms = r.get('fetch_time_ms', 0)
# Fetch current row for EMA computation
row = url_db.execute(
'''SELECT check_interval, avg_fetch_time, yield_rate
FROM uris WHERE url = ?''', (url,)
).fetchone()
if not row:
processed += 1
continue
old_interval = row[0] if row[0] is not None else 3600
old_fetch_time = row[1] if row[1] is not None else 0
old_yield = row[2] if row[2] is not None else 0.0
# EMA: avg_fetch_time
if old_fetch_time > 0 and fetch_time_ms > 0:
new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time)
elif fetch_time_ms > 0:
new_fetch_time = fetch_time_ms
else:
new_fetch_time = old_fetch_time
if success:
if changed:
# Content changed: reset stale_count, update hash, add proxy count
if changed and proxy_count > 0:
# Success + changed + proxies: converge interval toward 15min
new_interval = max(900, int(old_interval * 0.9))
# EMA: yield_rate
new_yield = alpha * proxy_count + (1 - alpha) * old_yield
url_db.execute(
'''UPDATE uris SET
check_time = ?,
@@ -534,30 +600,54 @@ def submit_url_reports(url_db, worker_id, reports):
error = 0,
stale_count = 0,
content_hash = ?,
proxies_added = proxies_added + ?
proxies_added = proxies_added + ?,
check_interval = ?,
avg_fetch_time = ?,
yield_rate = ?,
last_worker = ?
WHERE url = ?''',
(now_int, content_hash, proxy_count, url)
(now_int, content_hash, proxy_count, new_interval,
new_fetch_time, new_yield, worker_id, url)
)
# Store pending count for working_ratio correlation
with _url_pending_lock:
_url_pending_counts[url] = {
'total': proxy_count,
'worker_id': worker_id,
'time': time.time(),
}
else:
# Content unchanged: increment stale_count
# Success + unchanged (or no proxies): drift interval toward 24h
new_interval = min(86400, int(old_interval * 1.25))
url_db.execute(
'''UPDATE uris SET
check_time = ?,
retrievals = retrievals + 1,
error = 0,
stale_count = stale_count + 1,
content_hash = ?
content_hash = ?,
check_interval = ?,
avg_fetch_time = ?,
last_worker = ?
WHERE url = ?''',
(now_int, content_hash, url)
(now_int, content_hash, new_interval,
new_fetch_time, worker_id, url)
)
else:
# Fetch failed: increment error count
# Failure: back off faster
new_interval = min(86400, int(old_interval * 1.5))
url_db.execute(
'''UPDATE uris SET
check_time = ?,
error = error + 1
error = error + 1,
check_interval = ?,
avg_fetch_time = ?,
last_worker = ?
WHERE url = ?''',
(now_int, url)
(now_int, new_interval, new_fetch_time, worker_id, url)
)
processed += 1
@@ -568,16 +658,68 @@ def submit_url_reports(url_db, worker_id, reports):
return processed
def _update_url_working_ratios(url_working_counts):
"""Correlate working proxy counts with pending totals to update working_ratio.
Called after submit_proxy_reports processes all proxies. For each source_url
with a pending entry from submit_url_reports, computes:
ratio = working_count / pending_total
working_ratio = alpha * ratio + (1 - alpha) * old_working_ratio
"""
if not url_working_counts or not _url_database_path:
return
alpha = 0.3
settled = []
with _url_pending_lock:
pending_snapshot = dict(_url_pending_counts)
try:
url_db = mysqlite.mysqlite(_url_database_path, str)
for url, working_count in url_working_counts.items():
pending = pending_snapshot.get(url)
if not pending or pending['total'] <= 0:
continue
ratio = min(float(working_count) / pending['total'], 1.0)
row = url_db.execute(
'SELECT working_ratio FROM uris WHERE url = ?', (url,)
).fetchone()
old_ratio = row[0] if row and row[0] is not None else 0.0
new_ratio = alpha * ratio + (1 - alpha) * old_ratio
url_db.execute(
'UPDATE uris SET working_ratio = ? WHERE url = ?',
(new_ratio, url)
)
settled.append(url)
url_db.commit()
url_db.close()
except Exception as e:
_log('_update_url_working_ratios error: %s' % e, 'error')
# Remove settled entries from pending
if settled:
with _url_pending_lock:
for url in settled:
_url_pending_counts.pop(url, None)
def submit_proxy_reports(db, worker_id, proxies):
"""Process working-proxy reports from workers. Returns count of processed proxies.
Simplified trust-based model: workers report only working proxies.
Each proxy is upserted with failed=0, last_seen=now, latency updated.
Also tracks per-URL working counts for working_ratio correlation.
"""
global _last_workers_save
processed = 0
now_int = int(time.time())
now = time.time()
url_working_counts = {} # source_url -> working count
for p in proxies:
ip = p.get('ip', '')
@@ -615,6 +757,10 @@ def submit_proxy_reports(db, worker_id, proxies):
except Exception:
pass
# Track per-URL working count for working_ratio
if source_url:
url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1
processed += 1
except Exception as e:
_log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
@@ -622,6 +768,10 @@ def submit_proxy_reports(db, worker_id, proxies):
# Commit database changes
db.commit()
# Update working_ratio for source URLs
if url_working_counts:
_update_url_working_ratios(url_working_counts)
# Update worker stats
with _workers_lock:
if worker_id in _workers:
@@ -1398,6 +1548,8 @@ class ProxyAPIServer(threading.Thread):
self.stats_provider = stats_provider
self.profiling = profiling
self.daemon = True
global _url_database_path
_url_database_path = url_database
self.server = None
self._stop_event = threading.Event() if not GEVENT_PATCHED else None
# Load static library files into cache

ppf.py

@@ -493,6 +493,7 @@ def worker_main(config):
worker_name = config.args.worker_name or os.uname()[1]
batch_size = config.worker.batch_size
num_threads = config.watchd.threads
worker_id = None
# Register if --register flag or no key provided
if config.args.register or not worker_key:
@@ -772,6 +773,412 @@ def worker_main(config):
_log(' proxies tested: %d' % proxies_tested, 'info')
def worker_v2_main(config):
"""V2 worker mode -- URL-driven discovery.
Claims URLs from master, fetches through Tor, extracts and tests proxies,
reports working proxies back to master.
"""
import json
global urllib2
try:
import Queue
except ImportError:
import queue as Queue
import proxywatchd
proxywatchd.set_config(config)
server_url = config.args.server
if not server_url:
_log('--server URL required for worker mode', 'error')
sys.exit(1)
worker_key = config.args.worker_key
worker_name = config.args.worker_name or os.uname()[1]
num_threads = config.watchd.threads
url_batch_size = config.worker.url_batch_size
worker_id = None
# Register if --register flag or no key provided
if config.args.register or not worker_key:
_log('registering with master: %s' % server_url, 'info')
worker_id, worker_key = worker_register(server_url, worker_name)
if not worker_key:
_log('registration failed, exiting', 'error')
sys.exit(1)
_log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
_log('worker key: %s' % worker_key, 'info')
_log('save this key with --worker-key for future runs', 'info')
if config.args.register:
return
_log('starting worker V2 mode (URL-driven)', 'info')
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' url batch: %d' % url_batch_size, 'info')
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
# Verify Tor connectivity before starting
import socks
working_tor_hosts = []
for tor_host in config.torhosts:
host, port = tor_host.split(':')
port = int(port)
try:
test_sock = socks.socksocket()
test_sock.set_proxy(socks.SOCKS5, host, port)
test_sock.settimeout(10)
test_sock.connect(('check.torproject.org', 80))
test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
resp = test_sock.recv(512)
test_sock.close()
if resp:
status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
_log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
working_tor_hosts.append(tor_host)
else:
_log('tor host %s:%d no response' % (host, port), 'warn')
except Exception as e:
_log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
if not working_tor_hosts:
_log('no working Tor hosts, cannot start worker', 'error')
sys.exit(1)
_log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
# Create shared queues for worker threads
job_queue = proxywatchd.PriorityJobQueue()
completion_queue = Queue.Queue()
# Spawn worker threads
threads = []
for i in range(num_threads):
wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
wt.start_thread()
threads.append(wt)
time.sleep(random.random() / 10)
_log('spawned %d worker threads' % len(threads), 'info')
# Session for fetching URLs through Tor
session = fetch.FetchSession()
cycles = 0
urls_fetched = 0
proxies_found = 0
proxies_working = 0
start_time = time.time()
current_tor_ip = None
consecutive_tor_failures = 0
worker_profiling = config.args.profile or config.common.profiling
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
def do_register():
"""Register with master, with exponential backoff on failure."""
while True:
_log('registering with master: %s' % server_url, 'info')
new_id, new_key = worker_register(server_url, worker_name)
if new_key:
wstate['worker_id'] = new_id
wstate['worker_key'] = new_key
wstate['backoff'] = 10
_log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
return True
else:
_log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
time.sleep(wstate['backoff'])
wstate['backoff'] = min(wstate['backoff'] * 2, 300)
def wait_for_tor():
"""Wait for Tor to become available, checking every 30 seconds."""
check_interval = 30
while True:
working, tor_ip = check_tor_connectivity(config.torhosts)
if working:
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
return working, tor_ip
_log('tor still down, retrying in %ds' % check_interval, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
time.sleep(check_interval)
try:
while True:
# Tor connectivity check
working, tor_ip = check_tor_connectivity(config.torhosts)
if not working:
consecutive_tor_failures += 1
_log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
if consecutive_tor_failures >= 2:
_log('tor appears down, waiting before claiming URLs', 'error')
working, current_tor_ip = wait_for_tor()
consecutive_tor_failures = 0
else:
time.sleep(10)
continue
else:
consecutive_tor_failures = 0
if tor_ip != current_tor_ip:
if current_tor_ip:
_log('tor circuit rotated: %s' % tor_ip, 'info')
current_tor_ip = tor_ip
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
# Claim URLs from master
try:
url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
except NeedReregister:
do_register()
continue
if not url_infos:
_log('no URLs available, sleeping 30s', 'info')
time.sleep(30)
continue
_log('claimed %d URLs to process' % len(url_infos), 'info')
# Phase 1: Fetch URLs and extract proxies
url_reports = []
all_extracted = [] # list of (addr, proto, confidence, source_url)
for url_info in url_infos:
url = url_info.get('url', '')
last_hash = url_info.get('last_hash')
proto_hint = url_info.get('proto_hint')
fetch_start = time.time()
try:
content = session.fetch(url)
except Exception as e:
_log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error')
content = None
fetch_time_ms = int((time.time() - fetch_start) * 1000)
urls_fetched += 1
if not content:
url_reports.append({
'url': url,
'success': False,
'content_hash': None,
'proxy_count': 0,
'fetch_time_ms': fetch_time_ms,
'changed': False,
'error': 'fetch failed',
})
continue
# Detect protocol from URL path
proto = fetch.detect_proto_from_path(url) or proto_hint
# Extract proxies (no filter_known -- workers have no proxydb)
extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)
# Compute hash of extracted proxy list
content_hash = dbs.compute_proxy_list_hash(extracted)
if content_hash and last_hash and content_hash == last_hash:
# Content unchanged
url_reports.append({
'url': url,
'success': True,
'content_hash': content_hash,
'proxy_count': len(extracted),
'fetch_time_ms': fetch_time_ms,
'changed': False,
'error': None,
})
host = url.split('/')[2] if '/' in url else url
_log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
continue
# Content changed or first fetch
for addr, pr, conf in extracted:
all_extracted.append((addr, pr, conf, url))
url_reports.append({
'url': url,
'success': True,
'content_hash': content_hash,
'proxy_count': len(extracted),
'fetch_time_ms': fetch_time_ms,
'changed': True,
'error': None,
})
host = url.split('/')[2] if '/' in url else url
_log('%s: %d proxies extracted' % (host, len(extracted)), 'info')
# Report URL health to master
if url_reports:
try:
worker_report_urls(server_url, wstate['worker_key'], url_reports)
except NeedReregister:
do_register()
try:
worker_report_urls(server_url, wstate['worker_key'], url_reports)
except NeedReregister:
_log('still rejected after re-register, discarding url reports', 'error')
# Deduplicate extracted proxies by address
seen = set()
unique_proxies = []
source_map = {} # addr -> first source_url
for addr, pr, conf, source_url in all_extracted:
if addr not in seen:
seen.add(addr)
unique_proxies.append((addr, pr, conf))
source_map[addr] = source_url
proxies_found += len(unique_proxies)
if not unique_proxies:
cycles += 1
time.sleep(1)
continue
_log('testing %d unique proxies' % len(unique_proxies), 'info')
# Phase 2: Test extracted proxies using worker thread pool
pending_states = {}
all_jobs = []
checktypes = config.watchd.checktypes
for addr, pr, conf in unique_proxies:
# Parse ip:port from addr (may contain auth: user:pass@ip:port)
addr_part = addr.split('@')[-1] if '@' in addr else addr
# Handle IPv6 [ipv6]:port
if addr_part.startswith('['):
bracket_end = addr_part.index(']')
ip = addr_part[1:bracket_end]
port = int(addr_part[bracket_end+2:])
else:
ip, port_str = addr_part.rsplit(':', 1)
port = int(port_str)
proto = pr or 'http'
proxy_str = '%s:%d' % (ip, port)
state = proxywatchd.ProxyTestState(
ip, port, proto, 0,
success_count=0, total_duration=0.0,
country=None, mitm=0, consecutive_success=0,
asn=None, oldies=False,
completion_queue=completion_queue,
proxy_full=addr, source_proto=pr
)
pending_states[proxy_str] = state
checktype = random.choice(checktypes)
if checktype == 'judges':
available = proxywatchd.judge_stats.get_available_judges(
list(proxywatchd.judges.keys()))
target = random.choice(available) if available else random.choice(
list(proxywatchd.judges.keys()))
elif checktype == 'ssl':
target = random.choice(proxywatchd.ssl_targets)
elif checktype == 'irc':
target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
else: # head
target = random.choice(list(proxywatchd.regexes.keys()))
job = proxywatchd.TargetTestJob(state, target, checktype)
all_jobs.append(job)
random.shuffle(all_jobs)
for job in all_jobs:
job_queue.put(job, priority=0)
# Wait for completion
completed = 0
timeout_start = time.time()
timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
working_results = []
while completed < len(all_jobs):
try:
state = completion_queue.get(timeout=1)
completed += 1
if state.failcount == 0:
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
proxy_addr = state.proxy
if state.auth:
proxy_addr = '%s@%s' % (state.auth, state.proxy)
working_results.append({
'ip': state.ip,
'port': state.port,
'proto': state.proto,
'source_proto': state.source_proto,
'latency': round(latency_sec, 3),
'exit_ip': state.exit_ip,
'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
})
if completed % 50 == 0 or completed == len(all_jobs):
_log('tested %d/%d proxies (%d working)' % (
completed, len(all_jobs), len(working_results)), 'info')
except Queue.Empty:
if time.time() - timeout_start > timeout_seconds:
_log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
break
continue
proxies_working += len(working_results)
# Report working proxies to master
if working_results:
try:
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
except NeedReregister:
do_register()
try:
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
except NeedReregister:
_log('still rejected after re-register, discarding proxy reports', 'error')
processed = 0
_log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
cycles += 1
time.sleep(1)
except KeyboardInterrupt:
elapsed = time.time() - start_time
_log('worker V2 stopping...', 'info')
session.close()
for wt in threads:
wt.stop()
for wt in threads:
wt.term()
_log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
_log(' cycles: %d' % cycles, 'info')
_log(' urls fetched: %d' % urls_fetched, 'info')
_log(' proxies found: %d' % proxies_found, 'info')
_log(' proxies working: %d' % proxies_working, 'info')
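The completion-wait loop above is bounded twice: by job count and by an overall deadline (`max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)`, so it scales with job count). A minimal standalone sketch of the same pattern, using the Python 3 `queue` module rather than the Python 2 `Queue` the diff targets; `drain_completions` is an illustrative name, not from the codebase:

```python
# Hypothetical standalone version of the wait loop above: drain a
# completion queue until every job has reported back, but give up
# once an overall deadline passes.
import queue
import time

def drain_completions(completion_queue, num_jobs, timeout_seconds):
    """Collect up to num_jobs results, stopping after timeout_seconds."""
    results = []
    deadline = time.time() + timeout_seconds
    while len(results) < num_jobs:
        try:
            results.append(completion_queue.get(timeout=1))
        except queue.Empty:
            if time.time() > deadline:
                break  # overall timeout: return what we have so far
    return results

q = queue.Queue()
for i in range(3):
    q.put(i)
print(len(drain_completions(q, 3, 5)))  # prints 3
```

The short per-`get` timeout keeps the loop responsive so the deadline is checked at least once a second even when no results arrive.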
def main():
"""Main entry point."""
global config
@@ -784,7 +1191,12 @@ def main():
else:
sys.exit(1)
-# Worker mode: connect to master server instead of running locally
+# V2 worker mode: URL-driven discovery
+if config.args.worker_v2:
+worker_v2_main(config)
+return
+# V1 worker mode: connect to master server instead of running locally
if config.args.worker or config.args.register:
worker_main(config)
return
@@ -813,8 +1225,14 @@ def main():
watcherd = None
# Start httpd independently when watchd is disabled
if config.httpd.enabled:
-from httpd import ProxyAPIServer
+from httpd import ProxyAPIServer, configure_url_scoring
+import network_stats
+configure_url_scoring(
+config.ppf.checktime,
+config.ppf.perfail_checktime,
+config.ppf.max_fail,
+config.ppf.list_max_age_days
+)
def httpd_stats_provider():
"""Stats provider for httpd-only mode (scraping without testing)."""

View File

@@ -142,6 +142,20 @@ def is_valid_ip(ip_str):
except (ValueError, AttributeError):
return False
+def is_public_ip(ip_str):
+"""Validate IP is a public, globally routable address."""
+if not is_valid_ip(ip_str):
+return False
+parts = [int(p) for p in ip_str.split('.')]
+if parts[0] == 0: return False # 0.0.0.0/8
+if parts[0] == 10: return False # 10.0.0.0/8
+if parts[0] == 127: return False # 127.0.0.0/8
+if parts[0] == 169 and parts[1] == 254: return False # link-local
+if parts[0] == 172 and 16 <= parts[1] <= 31: return False # 172.16/12
+if parts[0] == 192 and parts[1] == 168: return False # 192.168/16
+if parts[0] >= 224: return False # multicast + reserved
+return True
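Since `is_public_ip()` is pure string/integer logic it can be exercised standalone. The sketch below reproduces the filter and adds a simplified `is_valid_ip()` stand-in (the real helper is defined earlier in the file) so the example is self-contained:

```python
# Self-contained sketch of the is_public_ip() filter above; the
# is_valid_ip() stand-in is simplified from the real helper.
def is_valid_ip(ip_str):
    parts = ip_str.split('.') if isinstance(ip_str, str) else []
    return len(parts) == 4 and all(p.isdigit() and int(p) <= 255 for p in parts)

def is_public_ip(ip_str):
    """Validate IP is a public, globally routable address."""
    if not is_valid_ip(ip_str):
        return False
    parts = [int(p) for p in ip_str.split('.')]
    if parts[0] == 0: return False                             # 0.0.0.0/8
    if parts[0] == 10: return False                            # 10.0.0.0/8
    if parts[0] == 127: return False                           # loopback
    if parts[0] == 169 and parts[1] == 254: return False       # link-local
    if parts[0] == 172 and 16 <= parts[1] <= 31: return False  # 172.16/12
    if parts[0] == 192 and parts[1] == 168: return False       # 192.168/16
    if parts[0] >= 224: return False                           # multicast + reserved
    return True

for ip in ('93.184.216.34', '10.0.0.1', '172.20.1.1', '169.254.1.1', '239.1.1.1'):
    print(ip, is_public_ip(ip))
```

Note the filter covers exactly the ranges named in the commit message (RFC 1918, loopback, link-local, multicast); shared address space such as 100.64.0.0/10 (RFC 6598, carrier-grade NAT) still passes, which only matters if a judge could report a CGNAT exit IP.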
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'
@@ -393,10 +407,20 @@ class ProxyTestState(object):
self.evaluated = True
self.checktime = int(time.time())
-successes = [r for r in self.results if r['success']]
-failures = [r for r in self.results if not r['success']]
+# Filter out judge_block results (inconclusive, neither pass nor fail)
+real_results = [r for r in self.results if r.get('category') != 'judge_block']
+successes = [r for r in real_results if r['success']]
+failures = [r for r in real_results if not r['success']]
num_success = len(successes)
-_dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy)
+judge_blocks = len(self.results) - len(real_results)
+_dbg('evaluate: %d success, %d fail, %d judge_block, results=%d' % (
+num_success, len(failures), judge_blocks, len(self.results)), self.proxy)
+# All results were judge blocks: inconclusive, preserve current state
+if not real_results and self.results:
+_dbg('all results inconclusive (judge_block), no state change', self.proxy)
+self.failcount = self.original_failcount
+return (self.original_failcount == 0, None)
# Determine dominant failure category
fail_category = None
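The neutral-result bookkeeping in `evaluate()` reduces to a small tally. A minimal model (the `tally()` helper is hypothetical, not the actual method), assuming each result is a dict like those `record_result()` appends:

```python
# Minimal model of the evaluate() filtering above: judge_block results
# are neutral, so they count toward neither successes nor failures.
def tally(results):
    real = [r for r in results if r.get('category') != 'judge_block']
    successes = sum(1 for r in real if r['success'])
    return successes, len(real) - successes, len(results) - len(real)

results = [
    {'success': True,  'category': None},
    {'success': False, 'category': 'judge_block'},  # neutral: filtered out
    {'success': False, 'category': 'timeout'},
]
print(tally(results))  # prints (1, 1, 1)
```

When every result is neutral, the tally is (0, 0, n), which is exactly the case where `evaluate()` restores `original_failcount` instead of changing state.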
@@ -547,6 +571,12 @@ class TargetTestJob(object):
recv = sock.recv(-1)
_sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy)
+# Validate HTTP response for non-IRC checks
+if self.checktype != 'irc' and not recv.startswith('HTTP/'):
+_dbg('not an HTTP response, failing (first 40: %r)' % recv[:40], self.proxy_state.proxy)
+self.proxy_state.record_result(False, category='bad_response')
+return
# Select regex based on check type (or fallback target)
if 'check.torproject.org' in srv:
# Tor API fallback (judge using torproject.org)
@@ -571,7 +601,7 @@ class TargetTestJob(object):
reveals_headers = None
if self.checktype == 'judges' or 'check.torproject.org' in srv:
ip_match = re.search(IP_PATTERN, recv)
-if ip_match and is_valid_ip(ip_match.group(0)):
+if ip_match and is_public_ip(ip_match.group(0)):
exit_ip = ip_match.group(0)
if self.checktype == 'judges' and 'check.torproject.org' not in srv:
# Check for header echo judge (elite detection)
@@ -590,17 +620,14 @@ class TargetTestJob(object):
# Check if judge is blocking us (not a proxy failure)
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
judge_stats.record_block(srv)
-# Judge block = proxy worked, we got HTTP response, just no IP
-# Count as success without exit_ip
-block_elapsed = time.time() - duration
-_dbg('judge BLOCK detected, counting as success', self.proxy_state.proxy)
+# Judge block = inconclusive, not a pass or fail
+_dbg('judge BLOCK detected, skipping (neutral)', self.proxy_state.proxy)
self.proxy_state.record_result(
-True, proto=proto, duration=block_elapsed,
-srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
-reveals_headers=None
+False, category='judge_block', proto=proto,
+srv=srv, tor=tor, ssl=is_ssl
)
if config.watchd.debug:
-_log('judge %s challenged proxy %s (counted as success)' % (
+_log('judge %s challenged proxy %s (neutral, skipped)' % (
srv, self.proxy_state.proxy), 'debug')
else:
_dbg('FAIL: no match, no block', self.proxy_state.proxy)
@@ -675,15 +702,14 @@ class TargetTestJob(object):
protos = self._build_proto_order()
pool = connection_pool.get_pool()
-# Phase 1: SSL handshake (if ssl_first enabled)
-if config.watchd.ssl_first:
+# Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode)
+if config.watchd.ssl_first or self.checktype == 'none':
result = self._try_ssl_handshake(protos, pool)
if result is not None:
return result # SSL succeeded or MITM detected
# SSL failed for all protocols
-if config.watchd.ssl_only:
-# ssl_only mode: skip secondary check, mark as failed
-_dbg('SSL failed, ssl_only mode, skipping secondary check', ps.proxy)
+if config.watchd.ssl_only or self.checktype == 'none':
+_dbg('SSL failed, no secondary check', ps.proxy)
return (None, None, 0, None, None, 1, 0, 'ssl_only')
_dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)
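The gating between Phase 1 (TLS handshake) and the Phase 2 secondary check can be summarized as two predicates. A sketch under the assumption that the config flags behave as plain booleans; the function names are illustrative only:

```python
# Sketch of the phase gating above: checktype == 'none' both forces the
# SSL handshake phase and suppresses any secondary check on failure.
def runs_ssl_phase(ssl_first, checktype):
    return ssl_first or checktype == 'none'

def secondary_after_ssl_fail(ssl_only, checktype):
    return not (ssl_only or checktype == 'none')

print(runs_ssl_phase(False, 'none'), secondary_after_ssl_fail(False, 'none'))
print(runs_ssl_phase(True, 'judges'), secondary_after_ssl_fail(False, 'judges'))
```

This matches the commit message for `checktype = none`: ssl_first is effectively forced on, and only successful TLS handshakes count as working.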
@@ -1357,7 +1383,11 @@ class Proxywatchd():
# Build target pools for each checktype
target_pools = {}
for ct in checktypes:
-if ct == 'irc':
+if ct == 'none':
+# SSL-only mode: use ssl_targets as placeholder
+target_pools[ct] = ssl_targets
+_dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets))
+elif ct == 'irc':
target_pools[ct] = config.servers
_dbg('target_pool[irc]: %d servers' % len(config.servers))
elif ct == 'judges':
@@ -1752,7 +1782,7 @@ class Proxywatchd():
# Start HTTP API server if enabled
if config.httpd.enabled:
-from httpd import ProxyAPIServer, configure_schedule
+from httpd import ProxyAPIServer, configure_schedule, configure_url_scoring
# Pass schedule config to httpd module
configure_schedule(
config.watchd.working_checktime,
@@ -1760,6 +1790,12 @@ class Proxywatchd():
config.watchd.fail_retry_backoff,
config.watchd.max_fail
)
+configure_url_scoring(
+config.ppf.checktime,
+config.ppf.perfail_checktime,
+config.ppf.max_fail,
+config.ppf.list_max_age_days
+)
self.httpd_server = ProxyAPIServer(
config.httpd.listenip,
config.httpd.port,

View File

@@ -107,11 +107,9 @@ regexes = {
'www.twitter.com': 'x-connection-hash',
't.co': 'x-connection-hash',
'www.msn.com': 'x-aspnetmvc-version',
-'www.bing.com': 'p3p',
'www.ask.com': 'x-served-by',
'www.hotmail.com': 'x-msedge-ref',
'www.bbc.co.uk': 'x-bbc-edge-cache-status',
-'www.skype.com': 'X-XSS-Protection',
'www.alibaba.com': 'object-status',
'www.mozilla.org': 'cf-ray',
'www.cloudflare.com': 'cf-ray',
@@ -121,7 +119,6 @@ regexes = {
'www.netflix.com': 'X-Netflix.proxy.execution-time',
'www.amazon.de': 'x-amz-cf-id',
'www.reuters.com': 'x-amz-cf-id',
-'www.ikea.com': 'x-frame-options',
'www.twitpic.com': 'timing-allow-origin',
'www.digg.com': 'cf-request-id',
'www.wikia.com': 'x-served-by',
@@ -133,8 +130,6 @@ regexes = {
'www.yelp.com': 'x-timer',
'www.ebay.com': 'x-envoy-upstream-service-time',
'www.wikihow.com': 'x-c',
-'www.archive.org': 'referrer-policy',
-'www.pandora.tv': 'X-UA-Compatible',
'www.w3.org': 'x-backend',
'www.time.com': 'x-amz-cf-pop'
}
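The dict above maps each HEAD-check target to a fingerprint header expected in its response. The matching code is not part of this diff, but a plausible sketch of the idea (case-insensitive header-name match anchored at the start of a response line; the `has_fingerprint` helper is hypothetical) looks like:

```python
# Hypothetical matcher for the fingerprint headers above; the real
# lookup code lives elsewhere in the codebase and may differ.
import re

def has_fingerprint(response, header):
    # "Header-Name:" anchored at the start of a response line
    pat = re.compile(r'^%s\s*:' % re.escape(header), re.IGNORECASE | re.MULTILINE)
    return bool(pat.search(response))

resp = 'HTTP/1.1 200 OK\r\nCF-RAY: 8abc-FRA\r\nContent-Length: 0\r\n\r\n'
print(has_fingerprint(resp, 'cf-ray'))       # prints True
print(has_fingerprint(resp, 'x-amz-cf-id'))  # prints False
```

Weak fingerprints like `p3p` or `x-frame-options` also show up on error pages and captive portals, which is why the five entries were dropped in this change.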