Compare commits
10 Commits
0685c2bc4c
...
fab1e1d110
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fab1e1d110 | ||
|
|
716d60898b | ||
|
|
2e3ce149f9 | ||
|
|
1236ddbd2d | ||
|
|
0311abb46a | ||
|
|
e74782ad3f | ||
|
|
c710555aad | ||
|
|
c5287073bf | ||
|
|
66441f9292 | ||
|
|
862eeed5c8 |
30
compose.master.yml
Normal file
30
compose.master.yml
Normal file
@@ -0,0 +1,30 @@
|
||||
# PPF master node (odin)
|
||||
#
|
||||
# Scrapes proxy sources, runs verification, serves API/dashboard.
|
||||
# No routine proxy testing -- workers handle that.
|
||||
#
|
||||
# Prerequisites:
|
||||
# - config.ini (not tracked, host-specific)
|
||||
# - data/ (created automatically)
|
||||
#
|
||||
# Usage:
|
||||
# podman-compose -f compose.master.yml up -d
|
||||
# podman-compose -f compose.master.yml logs -f
|
||||
# podman-compose -f compose.master.yml down
|
||||
|
||||
services:
|
||||
ppf:
|
||||
container_name: ppf
|
||||
image: localhost/ppf:latest
|
||||
build: .
|
||||
network_mode: host
|
||||
restart: unless-stopped
|
||||
stop_signal: SIGTERM
|
||||
stop_grace_period: 30s
|
||||
environment:
|
||||
PYTHONUNBUFFERED: "1"
|
||||
volumes:
|
||||
- .:/app:ro,Z
|
||||
- ./data:/app/data:Z
|
||||
- ./config.ini:/app/config.ini:ro,Z
|
||||
command: python -u ppf.py
|
||||
38
compose.worker.yml
Normal file
38
compose.worker.yml
Normal file
@@ -0,0 +1,38 @@
|
||||
# PPF worker node (cassius, edge, sentinel, ...)
|
||||
#
|
||||
# Tests proxies and reports results to master via WireGuard.
|
||||
# Each worker uses only local Tor (127.0.0.1:9050).
|
||||
#
|
||||
# Prerequisites:
|
||||
# - config.ini (not tracked, host-specific)
|
||||
# - servers.txt (deploy from repo)
|
||||
# - src/ (deploy *.py from repo root into src/)
|
||||
# - data/ (created automatically)
|
||||
#
|
||||
# Usage:
|
||||
# PPF_MASTER_URL=http://10.200.1.250:8081 podman-compose -f compose.worker.yml up -d
|
||||
# podman-compose -f compose.worker.yml logs -f
|
||||
# podman-compose -f compose.worker.yml down
|
||||
#
|
||||
# The master URL defaults to http://10.200.1.250:8081 (odin via WireGuard).
|
||||
# Override with PPF_MASTER_URL env var or edit .env file.
|
||||
|
||||
services:
|
||||
ppf-worker:
|
||||
container_name: ppf-worker
|
||||
image: localhost/ppf-worker:latest
|
||||
build: .
|
||||
network_mode: host
|
||||
restart: unless-stopped
|
||||
stop_signal: SIGTERM
|
||||
stop_grace_period: 30s
|
||||
logging:
|
||||
driver: k8s-file
|
||||
environment:
|
||||
PYTHONUNBUFFERED: "1"
|
||||
volumes:
|
||||
- ./src:/app:ro,Z
|
||||
- ./data:/app/data:Z
|
||||
- ./config.ini:/app/config.ini:ro,Z
|
||||
- ./servers.txt:/app/servers.txt:ro,Z
|
||||
command: python -u ppf.py --worker-v2 --server ${PPF_MASTER_URL:-http://10.200.1.250:8081}
|
||||
14
config.py
14
config.py
@@ -11,7 +11,12 @@ class Config(ComboParser):
|
||||
with open(self.watchd.source_file, 'r') as handle:
|
||||
self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0]
|
||||
# Parse checktypes as comma-separated list
|
||||
self.watchd.checktypes = [t.strip() for t in self.watchd.checktype.split(',') if t.strip()]
|
||||
# Normalize: 'false'/'off'/'disabled' -> 'none' (SSL-only mode)
|
||||
raw_types = [t.strip().lower() for t in self.watchd.checktype.split(',') if t.strip()]
|
||||
self.watchd.checktypes = ['none' if t in ('false', 'off', 'disabled') else t for t in raw_types]
|
||||
# SSL-only mode: force ssl_first when secondary check is disabled
|
||||
if self.watchd.checktypes == ['none']:
|
||||
self.watchd.ssl_first = True
|
||||
# Apply log level from CLI flags
|
||||
if self.args.quiet:
|
||||
set_log_level('warn')
|
||||
@@ -52,12 +57,15 @@ class Config(ComboParser):
|
||||
errors.append('ppf.max_fail must be >= 1')
|
||||
|
||||
# Validate checktypes (secondary check types, ssl is handled by ssl_first)
|
||||
valid_checktypes = {'irc', 'head', 'judges'}
|
||||
# 'none' = SSL-only mode (no secondary check)
|
||||
valid_checktypes = {'irc', 'head', 'judges', 'none'}
|
||||
for ct in self.watchd.checktypes:
|
||||
if ct not in valid_checktypes:
|
||||
errors.append('watchd.checktype "%s" invalid, must be one of: %s' % (ct, ', '.join(sorted(valid_checktypes))))
|
||||
if not self.watchd.checktypes:
|
||||
errors.append('watchd.checktype must specify at least one valid type')
|
||||
if 'none' in self.watchd.checktypes and len(self.watchd.checktypes) > 1:
|
||||
errors.append('watchd.checktype "none" cannot be combined with other types')
|
||||
|
||||
# Validate engine names
|
||||
valid_engines = {'duckduckgo', 'startpage', 'brave', 'ecosia',
|
||||
@@ -112,7 +120,7 @@ class Config(ComboParser):
|
||||
self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False)
|
||||
self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False)
|
||||
self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False)
|
||||
self.add_item(section, 'checktype', str, 'head', 'secondary check type: irc, head, judges (used when ssl_first fails)', False)
|
||||
self.add_item(section, 'checktype', str, 'head', 'secondary check type: head, irc, judges, none/false (none = SSL-only)', False)
|
||||
self.add_item(section, 'ssl_first', bool, True, 'try SSL handshake first, fallback to checktype on failure (default: True)', False)
|
||||
self.add_item(section, 'ssl_only', bool, False, 'when ssl_first enabled, skip secondary check on SSL failure (default: False)', False)
|
||||
self.add_item(section, 'scale_cooldown', int, 10, 'seconds between thread scaling decisions (default: 10)', False)
|
||||
|
||||
50
dbs.py
50
dbs.py
@@ -98,6 +98,51 @@ def _migrate_last_seen(sqlite):
|
||||
sqlite.commit()
|
||||
|
||||
|
||||
def _migrate_uri_check_interval(sqlite):
|
||||
"""Add adaptive check_interval column to uris table."""
|
||||
try:
|
||||
sqlite.execute('SELECT check_interval FROM uris LIMIT 1')
|
||||
except Exception:
|
||||
sqlite.execute('ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600')
|
||||
sqlite.commit()
|
||||
|
||||
|
||||
def _migrate_uri_working_ratio(sqlite):
|
||||
"""Add working_ratio column to uris table for proxy quality tracking."""
|
||||
try:
|
||||
sqlite.execute('SELECT working_ratio FROM uris LIMIT 1')
|
||||
except Exception:
|
||||
sqlite.execute('ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0')
|
||||
sqlite.commit()
|
||||
|
||||
|
||||
def _migrate_uri_avg_fetch_time(sqlite):
|
||||
"""Add avg_fetch_time column to uris table for fetch latency EMA."""
|
||||
try:
|
||||
sqlite.execute('SELECT avg_fetch_time FROM uris LIMIT 1')
|
||||
except Exception:
|
||||
sqlite.execute('ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0')
|
||||
sqlite.commit()
|
||||
|
||||
|
||||
def _migrate_uri_last_worker(sqlite):
|
||||
"""Add last_worker column to uris table."""
|
||||
try:
|
||||
sqlite.execute('SELECT last_worker FROM uris LIMIT 1')
|
||||
except Exception:
|
||||
sqlite.execute('ALTER TABLE uris ADD COLUMN last_worker TEXT')
|
||||
sqlite.commit()
|
||||
|
||||
|
||||
def _migrate_uri_yield_rate(sqlite):
|
||||
"""Add yield_rate column to uris table for proxy yield EMA."""
|
||||
try:
|
||||
sqlite.execute('SELECT yield_rate FROM uris LIMIT 1')
|
||||
except Exception:
|
||||
sqlite.execute('ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0')
|
||||
sqlite.commit()
|
||||
|
||||
|
||||
def compute_proxy_list_hash(proxies):
|
||||
"""Compute MD5 hash of sorted proxy list for change detection.
|
||||
|
||||
@@ -356,6 +401,11 @@ def create_table_if_not_exists(sqlite, dbname):
|
||||
content_hash TEXT)""")
|
||||
# Migration for existing databases
|
||||
_migrate_content_hash_column(sqlite)
|
||||
_migrate_uri_check_interval(sqlite)
|
||||
_migrate_uri_working_ratio(sqlite)
|
||||
_migrate_uri_avg_fetch_time(sqlite)
|
||||
_migrate_uri_last_worker(sqlite)
|
||||
_migrate_uri_yield_rate(sqlite)
|
||||
# Indexes for common query patterns
|
||||
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)')
|
||||
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)')
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
version: '3.8'
|
||||
|
||||
services:
|
||||
ppf:
|
||||
build: .
|
||||
volumes:
|
||||
- .:/app
|
||||
working_dir: /app
|
||||
command: python ppf.py
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
4
fetch.py
4
fetch.py
@@ -56,6 +56,8 @@ class FetchSession(object):
|
||||
def fetch(self, url, head=False):
|
||||
"""Fetch URL, reusing connection if possible."""
|
||||
network_stats.set_category('scraper')
|
||||
if isinstance(url, unicode):
|
||||
url = url.encode('utf-8')
|
||||
host, port, ssl, uri = _parse_url(url)
|
||||
|
||||
# Check if we can reuse existing connection
|
||||
@@ -489,6 +491,8 @@ def fetch_contents(url, head=False, proxy=None):
|
||||
retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded')
|
||||
def _fetch_contents(url, head = False, proxy=None):
|
||||
network_stats.set_category('scraper')
|
||||
if isinstance(url, unicode):
|
||||
url = url.encode('utf-8')
|
||||
host, port, ssl, uri = _parse_url(url)
|
||||
headers=[
|
||||
'Accept-Language: en-US,en;q=0.8',
|
||||
|
||||
216
httpd.py
216
httpd.py
@@ -85,6 +85,17 @@ _url_claims = {} # url -> {worker_id, claimed_at}
|
||||
_url_claims_lock = threading.Lock()
|
||||
_url_claim_timeout = 600 # 10 min (URLs take longer to fetch+extract)
|
||||
|
||||
# URL scoring: pending proxy counts for working_ratio correlation
|
||||
_url_pending_counts = {} # url -> {total, worker_id, time}
|
||||
_url_pending_lock = threading.Lock()
|
||||
_url_database_path = None # set in ProxyAPIServer.__init__ for cross-db access
|
||||
|
||||
# URL scoring defaults (overridden by configure_url_scoring)
|
||||
_url_checktime = 3600
|
||||
_url_perfail_checktime = 3600
|
||||
_url_max_fail = 10
|
||||
_url_list_max_age_days = 7
|
||||
|
||||
# Test rate tracking: worker_id -> list of (timestamp, count) tuples
|
||||
_worker_test_history = {}
|
||||
_worker_test_history_lock = threading.Lock()
|
||||
@@ -114,6 +125,15 @@ def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backof
|
||||
_max_fail = max_fail
|
||||
|
||||
|
||||
def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days):
    """Install URL scoring parameters from config into module globals.

    These globals drive URL claim scheduling: base check interval,
    per-failure backoff interval, consecutive-failure cutoff, and the
    maximum age (days) of a source list.
    """
    global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days
    # Independent assignments; order is irrelevant.
    _url_max_fail = max_fail
    _url_list_max_age_days = list_max_age_days
    _url_checktime = checktime
    _url_perfail_checktime = perfail_checktime
|
||||
|
||||
|
||||
def _build_due_condition():
|
||||
"""Build SQL condition for proxy due check.
|
||||
|
||||
@@ -431,7 +451,16 @@ def claim_work(db, worker_id, count=100):
|
||||
return claimed
|
||||
|
||||
def claim_urls(url_db, worker_id, count=5):
|
||||
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts."""
|
||||
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
|
||||
|
||||
Uses score-based scheduling: high-yield URLs checked more often,
|
||||
stale/broken ones less. Score components:
|
||||
- age/interval: 1.0 when due, >1.0 when overdue
|
||||
- yield_bonus: capped at 1.0 for high-yield sources
|
||||
- quality_bonus: 0-0.5 based on working_ratio
|
||||
- error_penalty: 0-2.0 based on consecutive errors
|
||||
- stale_penalty: 0-1.0 based on unchanged fetches
|
||||
"""
|
||||
now = time.time()
|
||||
now_int = int(now)
|
||||
|
||||
@@ -441,36 +470,37 @@ def claim_urls(url_db, worker_id, count=5):
|
||||
except ImportError:
|
||||
detect_proto_from_path = None
|
||||
|
||||
# Clean expired URL claims
|
||||
# Clean expired URL claims and pending counts
|
||||
with _url_claims_lock:
|
||||
stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout]
|
||||
for k in stale:
|
||||
del _url_claims[k]
|
||||
claimed_urls = set(_url_claims.keys())
|
||||
|
||||
# Reuse the same scheduling formula as ppf.py main loop (line 800-805):
|
||||
# WHERE error < max_fail
|
||||
# AND (check_time + checktime + ((error + stale_count) * perfail_checktime) < now)
|
||||
# AND (added > min_added OR proxies_added > 0)
|
||||
# Use defaults matching config.ppf: checktime=3600, perfail_checktime=3600, max_fail=10
|
||||
# list_max_age_days=30
|
||||
checktime = 3600
|
||||
perfail_checktime = 3600
|
||||
max_fail = 10
|
||||
list_max_age_seconds = 30 * 86400
|
||||
with _url_pending_lock:
|
||||
stale_pending = [k for k, v in _url_pending_counts.items() if now - v['time'] > 600]
|
||||
for k in stale_pending:
|
||||
del _url_pending_counts[k]
|
||||
|
||||
list_max_age_seconds = _url_list_max_age_days * 86400
|
||||
min_added = now_int - list_max_age_seconds
|
||||
|
||||
try:
|
||||
rows = url_db.execute(
|
||||
'''SELECT url, content_hash, check_time, error, stale_count,
|
||||
retrievals, proxies_added
|
||||
FROM uris
|
||||
WHERE error < ?
|
||||
AND (check_time + ? + ((error + stale_count) * ?) < ?)
|
||||
AND (added > ? OR proxies_added > 0)
|
||||
ORDER BY RANDOM()
|
||||
LIMIT ?''',
|
||||
(max_fail, checktime, perfail_checktime, now_int, min_added, count * 3)
|
||||
'''SELECT url, content_hash,
|
||||
(? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
|
||||
+ MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
|
||||
+ COALESCE(working_ratio, 0) * 0.5
|
||||
- MIN(error * 0.3, 2.0)
|
||||
- MIN(stale_count * 0.1, 1.0)
|
||||
AS score
|
||||
FROM uris
|
||||
WHERE error < ?
|
||||
AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) >= 0.8
|
||||
AND (added > ? OR proxies_added > 0)
|
||||
ORDER BY score DESC
|
||||
LIMIT ?''',
|
||||
(now_int, _url_max_fail, now_int, min_added, count * 3)
|
||||
).fetchall()
|
||||
except Exception as e:
|
||||
_log('claim_urls query error: %s' % e, 'error')
|
||||
@@ -504,9 +534,19 @@ def claim_urls(url_db, worker_id, count=5):
|
||||
|
||||
|
||||
def submit_url_reports(url_db, worker_id, reports):
|
||||
"""Process URL fetch feedback from workers. Returns count of processed reports."""
|
||||
"""Process URL fetch feedback from workers. Returns count of processed reports.
|
||||
|
||||
Updates EMA metrics per URL:
|
||||
- avg_fetch_time: exponential moving average of fetch latency
|
||||
- check_interval: adaptive interval (shrinks for productive URLs, grows for stale)
|
||||
- yield_rate: EMA of proxy count per fetch (on changed content)
|
||||
- last_worker: worker that last fetched this URL
|
||||
|
||||
Stores pending proxy count for working_ratio correlation in submit_proxy_reports.
|
||||
"""
|
||||
processed = 0
|
||||
now_int = int(time.time())
|
||||
alpha = 0.3 # EMA smoothing factor
|
||||
|
||||
for r in reports:
|
||||
url = r.get('url', '')
|
||||
@@ -523,10 +563,36 @@ def submit_url_reports(url_db, worker_id, reports):
|
||||
content_hash = r.get('content_hash')
|
||||
proxy_count = r.get('proxy_count', 0)
|
||||
changed = r.get('changed', False)
|
||||
fetch_time_ms = r.get('fetch_time_ms', 0)
|
||||
|
||||
# Fetch current row for EMA computation
|
||||
row = url_db.execute(
|
||||
'''SELECT check_interval, avg_fetch_time, yield_rate
|
||||
FROM uris WHERE url = ?''', (url,)
|
||||
).fetchone()
|
||||
if not row:
|
||||
processed += 1
|
||||
continue
|
||||
|
||||
old_interval = row[0] if row[0] is not None else 3600
|
||||
old_fetch_time = row[1] if row[1] is not None else 0
|
||||
old_yield = row[2] if row[2] is not None else 0.0
|
||||
|
||||
# EMA: avg_fetch_time
|
||||
if old_fetch_time > 0 and fetch_time_ms > 0:
|
||||
new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time)
|
||||
elif fetch_time_ms > 0:
|
||||
new_fetch_time = fetch_time_ms
|
||||
else:
|
||||
new_fetch_time = old_fetch_time
|
||||
|
||||
if success:
|
||||
if changed:
|
||||
# Content changed: reset stale_count, update hash, add proxy count
|
||||
if changed and proxy_count > 0:
|
||||
# Success + changed + proxies: converge interval toward 15min
|
||||
new_interval = max(900, int(old_interval * 0.9))
|
||||
# EMA: yield_rate
|
||||
new_yield = alpha * proxy_count + (1 - alpha) * old_yield
|
||||
|
||||
url_db.execute(
|
||||
'''UPDATE uris SET
|
||||
check_time = ?,
|
||||
@@ -534,30 +600,54 @@ def submit_url_reports(url_db, worker_id, reports):
|
||||
error = 0,
|
||||
stale_count = 0,
|
||||
content_hash = ?,
|
||||
proxies_added = proxies_added + ?
|
||||
proxies_added = proxies_added + ?,
|
||||
check_interval = ?,
|
||||
avg_fetch_time = ?,
|
||||
yield_rate = ?,
|
||||
last_worker = ?
|
||||
WHERE url = ?''',
|
||||
(now_int, content_hash, proxy_count, url)
|
||||
(now_int, content_hash, proxy_count, new_interval,
|
||||
new_fetch_time, new_yield, worker_id, url)
|
||||
)
|
||||
|
||||
# Store pending count for working_ratio correlation
|
||||
with _url_pending_lock:
|
||||
_url_pending_counts[url] = {
|
||||
'total': proxy_count,
|
||||
'worker_id': worker_id,
|
||||
'time': time.time(),
|
||||
}
|
||||
else:
|
||||
# Content unchanged: increment stale_count
|
||||
# Success + unchanged (or no proxies): drift interval toward 24h
|
||||
new_interval = min(86400, int(old_interval * 1.25))
|
||||
|
||||
url_db.execute(
|
||||
'''UPDATE uris SET
|
||||
check_time = ?,
|
||||
retrievals = retrievals + 1,
|
||||
error = 0,
|
||||
stale_count = stale_count + 1,
|
||||
content_hash = ?
|
||||
content_hash = ?,
|
||||
check_interval = ?,
|
||||
avg_fetch_time = ?,
|
||||
last_worker = ?
|
||||
WHERE url = ?''',
|
||||
(now_int, content_hash, url)
|
||||
(now_int, content_hash, new_interval,
|
||||
new_fetch_time, worker_id, url)
|
||||
)
|
||||
else:
|
||||
# Fetch failed: increment error count
|
||||
# Failure: back off faster
|
||||
new_interval = min(86400, int(old_interval * 1.5))
|
||||
|
||||
url_db.execute(
|
||||
'''UPDATE uris SET
|
||||
check_time = ?,
|
||||
error = error + 1
|
||||
error = error + 1,
|
||||
check_interval = ?,
|
||||
avg_fetch_time = ?,
|
||||
last_worker = ?
|
||||
WHERE url = ?''',
|
||||
(now_int, url)
|
||||
(now_int, new_interval, new_fetch_time, worker_id, url)
|
||||
)
|
||||
|
||||
processed += 1
|
||||
@@ -568,16 +658,68 @@ def submit_url_reports(url_db, worker_id, reports):
|
||||
return processed
|
||||
|
||||
|
||||
def _update_url_working_ratios(url_working_counts):
    """Correlate working proxy counts with pending totals to update working_ratio.

    Called after submit_proxy_reports has processed all proxies. For every
    source_url that has a pending entry recorded by submit_url_reports:

        observed      = working_count / pending_total     (capped at 1.0)
        working_ratio = alpha * observed + (1 - alpha) * old_working_ratio

    URLs whose ratio was updated are removed from the pending map.
    """
    if not url_working_counts or not _url_database_path:
        return

    alpha = 0.3
    resolved = []

    # Snapshot under the lock so DB work happens without holding it.
    with _url_pending_lock:
        pending_snapshot = dict(_url_pending_counts)

    try:
        url_db = mysqlite.mysqlite(_url_database_path, str)
        for url, working_count in url_working_counts.items():
            entry = pending_snapshot.get(url)
            if not entry or entry['total'] <= 0:
                continue

            observed = min(float(working_count) / entry['total'], 1.0)

            row = url_db.execute(
                'SELECT working_ratio FROM uris WHERE url = ?', (url,)
            ).fetchone()
            previous = 0.0
            if row and row[0] is not None:
                previous = row[0]
            blended = alpha * observed + (1 - alpha) * previous

            url_db.execute(
                'UPDATE uris SET working_ratio = ? WHERE url = ?',
                (blended, url)
            )
            resolved.append(url)

        url_db.commit()
        url_db.close()
    except Exception as e:
        _log('_update_url_working_ratios error: %s' % e, 'error')

    # Drop settled entries; leave the rest for a later correlation pass.
    if resolved:
        with _url_pending_lock:
            for url in resolved:
                _url_pending_counts.pop(url, None)
|
||||
|
||||
|
||||
def submit_proxy_reports(db, worker_id, proxies):
|
||||
"""Process working-proxy reports from workers. Returns count of processed proxies.
|
||||
|
||||
Simplified trust-based model: workers report only working proxies.
|
||||
Each proxy is upserted with failed=0, last_seen=now, latency updated.
|
||||
Also tracks per-URL working counts for working_ratio correlation.
|
||||
"""
|
||||
global _last_workers_save
|
||||
processed = 0
|
||||
now_int = int(time.time())
|
||||
now = time.time()
|
||||
url_working_counts = {} # source_url -> working count
|
||||
|
||||
for p in proxies:
|
||||
ip = p.get('ip', '')
|
||||
@@ -615,6 +757,10 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Track per-URL working count for working_ratio
|
||||
if source_url:
|
||||
url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1
|
||||
|
||||
processed += 1
|
||||
except Exception as e:
|
||||
_log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
|
||||
@@ -622,6 +768,10 @@ def submit_proxy_reports(db, worker_id, proxies):
|
||||
# Commit database changes
|
||||
db.commit()
|
||||
|
||||
# Update working_ratio for source URLs
|
||||
if url_working_counts:
|
||||
_update_url_working_ratios(url_working_counts)
|
||||
|
||||
# Update worker stats
|
||||
with _workers_lock:
|
||||
if worker_id in _workers:
|
||||
@@ -1398,6 +1548,8 @@ class ProxyAPIServer(threading.Thread):
|
||||
self.stats_provider = stats_provider
|
||||
self.profiling = profiling
|
||||
self.daemon = True
|
||||
global _url_database_path
|
||||
_url_database_path = url_database
|
||||
self.server = None
|
||||
self._stop_event = threading.Event() if not GEVENT_PATCHED else None
|
||||
# Load static library files into cache
|
||||
|
||||
422
ppf.py
422
ppf.py
@@ -493,6 +493,7 @@ def worker_main(config):
|
||||
worker_name = config.args.worker_name or os.uname()[1]
|
||||
batch_size = config.worker.batch_size
|
||||
num_threads = config.watchd.threads
|
||||
worker_id = None
|
||||
|
||||
# Register if --register flag or no key provided
|
||||
if config.args.register or not worker_key:
|
||||
@@ -772,6 +773,412 @@ def worker_main(config):
|
||||
_log(' proxies tested: %d' % proxies_tested, 'info')
|
||||
|
||||
|
||||
def worker_v2_main(config):
|
||||
"""V2 worker mode -- URL-driven discovery.
|
||||
|
||||
Claims URLs from master, fetches through Tor, extracts and tests proxies,
|
||||
reports working proxies back to master.
|
||||
"""
|
||||
import json
|
||||
global urllib2
|
||||
|
||||
try:
|
||||
import Queue
|
||||
except ImportError:
|
||||
import queue as Queue
|
||||
|
||||
import proxywatchd
|
||||
proxywatchd.set_config(config)
|
||||
|
||||
server_url = config.args.server
|
||||
if not server_url:
|
||||
_log('--server URL required for worker mode', 'error')
|
||||
sys.exit(1)
|
||||
|
||||
worker_key = config.args.worker_key
|
||||
worker_name = config.args.worker_name or os.uname()[1]
|
||||
num_threads = config.watchd.threads
|
||||
url_batch_size = config.worker.url_batch_size
|
||||
worker_id = None
|
||||
|
||||
# Register if --register flag or no key provided
|
||||
if config.args.register or not worker_key:
|
||||
_log('registering with master: %s' % server_url, 'info')
|
||||
worker_id, worker_key = worker_register(server_url, worker_name)
|
||||
if not worker_key:
|
||||
_log('registration failed, exiting', 'error')
|
||||
sys.exit(1)
|
||||
_log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
|
||||
_log('worker key: %s' % worker_key, 'info')
|
||||
_log('save this key with --worker-key for future runs', 'info')
|
||||
|
||||
if config.args.register:
|
||||
return
|
||||
|
||||
_log('starting worker V2 mode (URL-driven)', 'info')
|
||||
_log(' server: %s' % server_url, 'info')
|
||||
_log(' threads: %d' % num_threads, 'info')
|
||||
_log(' url batch: %d' % url_batch_size, 'info')
|
||||
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
|
||||
|
||||
# Verify Tor connectivity before starting
|
||||
import socks
|
||||
working_tor_hosts = []
|
||||
for tor_host in config.torhosts:
|
||||
host, port = tor_host.split(':')
|
||||
port = int(port)
|
||||
try:
|
||||
test_sock = socks.socksocket()
|
||||
test_sock.set_proxy(socks.SOCKS5, host, port)
|
||||
test_sock.settimeout(10)
|
||||
test_sock.connect(('check.torproject.org', 80))
|
||||
test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
|
||||
resp = test_sock.recv(512)
|
||||
test_sock.close()
|
||||
if resp and (b'HTTP/' in resp or len(resp) > 0):
|
||||
status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
|
||||
_log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
|
||||
working_tor_hosts.append(tor_host)
|
||||
else:
|
||||
_log('tor host %s:%d no response' % (host, port), 'warn')
|
||||
except Exception as e:
|
||||
_log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
|
||||
|
||||
if not working_tor_hosts:
|
||||
_log('no working Tor hosts, cannot start worker', 'error')
|
||||
sys.exit(1)
|
||||
|
||||
_log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
|
||||
|
||||
# Create shared queues for worker threads
|
||||
job_queue = proxywatchd.PriorityJobQueue()
|
||||
completion_queue = Queue.Queue()
|
||||
|
||||
# Spawn worker threads
|
||||
threads = []
|
||||
for i in range(num_threads):
|
||||
wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
|
||||
wt.start_thread()
|
||||
threads.append(wt)
|
||||
time.sleep(random.random() / 10)
|
||||
|
||||
_log('spawned %d worker threads' % len(threads), 'info')
|
||||
|
||||
# Session for fetching URLs through Tor
|
||||
session = fetch.FetchSession()
|
||||
|
||||
cycles = 0
|
||||
urls_fetched = 0
|
||||
proxies_found = 0
|
||||
proxies_working = 0
|
||||
start_time = time.time()
|
||||
current_tor_ip = None
|
||||
consecutive_tor_failures = 0
|
||||
worker_profiling = config.args.profile or config.common.profiling
|
||||
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
|
||||
|
||||
def do_register():
|
||||
"""Register with master, with exponential backoff on failure."""
|
||||
while True:
|
||||
_log('registering with master: %s' % server_url, 'info')
|
||||
new_id, new_key = worker_register(server_url, worker_name)
|
||||
if new_key:
|
||||
wstate['worker_id'] = new_id
|
||||
wstate['worker_key'] = new_key
|
||||
wstate['backoff'] = 10
|
||||
_log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
|
||||
return True
|
||||
else:
|
||||
_log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
|
||||
time.sleep(wstate['backoff'])
|
||||
wstate['backoff'] = min(wstate['backoff'] * 2, 300)
|
||||
|
||||
def wait_for_tor():
|
||||
"""Wait for Tor to become available, checking every 30 seconds."""
|
||||
check_interval = 30
|
||||
while True:
|
||||
working, tor_ip = check_tor_connectivity(config.torhosts)
|
||||
if working:
|
||||
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
|
||||
try:
|
||||
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
|
||||
except NeedReregister:
|
||||
do_register()
|
||||
return working, tor_ip
|
||||
_log('tor still down, retrying in %ds' % check_interval, 'warn')
|
||||
try:
|
||||
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
|
||||
except NeedReregister:
|
||||
do_register()
|
||||
time.sleep(check_interval)
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Tor connectivity check
|
||||
working, tor_ip = check_tor_connectivity(config.torhosts)
|
||||
if not working:
|
||||
consecutive_tor_failures += 1
|
||||
_log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
|
||||
try:
|
||||
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
|
||||
except NeedReregister:
|
||||
do_register()
|
||||
if consecutive_tor_failures >= 2:
|
||||
_log('tor appears down, waiting before claiming URLs', 'error')
|
||||
working, current_tor_ip = wait_for_tor()
|
||||
consecutive_tor_failures = 0
|
||||
else:
|
||||
time.sleep(10)
|
||||
continue
|
||||
else:
|
||||
consecutive_tor_failures = 0
|
||||
if tor_ip != current_tor_ip:
|
||||
if current_tor_ip:
|
||||
_log('tor circuit rotated: %s' % tor_ip, 'info')
|
||||
current_tor_ip = tor_ip
|
||||
try:
|
||||
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
|
||||
except NeedReregister:
|
||||
do_register()
|
||||
|
||||
# Claim URLs from master
|
||||
try:
|
||||
url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
|
||||
except NeedReregister:
|
||||
do_register()
|
||||
continue
|
||||
|
||||
if not url_infos:
|
||||
_log('no URLs available, sleeping 30s', 'info')
|
||||
time.sleep(30)
|
||||
continue
|
||||
|
||||
_log('claimed %d URLs to process' % len(url_infos), 'info')
|
||||
|
||||
# Phase 1: Fetch URLs and extract proxies
|
||||
url_reports = []
|
||||
all_extracted = [] # list of (addr, proto, confidence, source_url)
|
||||
|
||||
for url_info in url_infos:
|
||||
url = url_info.get('url', '')
|
||||
last_hash = url_info.get('last_hash')
|
||||
proto_hint = url_info.get('proto_hint')
|
||||
|
||||
fetch_start = time.time()
|
||||
try:
|
||||
content = session.fetch(url)
|
||||
except Exception as e:
|
||||
_log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error')
|
||||
content = None
|
||||
|
||||
fetch_time_ms = int((time.time() - fetch_start) * 1000)
|
||||
urls_fetched += 1
|
||||
|
||||
if not content:
|
||||
url_reports.append({
|
||||
'url': url,
|
||||
'success': False,
|
||||
'content_hash': None,
|
||||
'proxy_count': 0,
|
||||
'fetch_time_ms': fetch_time_ms,
|
||||
'changed': False,
|
||||
'error': 'fetch failed',
|
||||
})
|
||||
continue
|
||||
|
||||
# Detect protocol from URL path
|
||||
proto = fetch.detect_proto_from_path(url) or proto_hint
|
||||
|
||||
# Extract proxies (no filter_known -- workers have no proxydb)
|
||||
extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)
|
||||
|
||||
# Compute hash of extracted proxy list
|
||||
content_hash = dbs.compute_proxy_list_hash(extracted)
|
||||
|
||||
if content_hash and last_hash and content_hash == last_hash:
|
||||
# Content unchanged
|
||||
url_reports.append({
|
||||
'url': url,
|
||||
'success': True,
|
||||
'content_hash': content_hash,
|
||||
'proxy_count': len(extracted),
|
||||
'fetch_time_ms': fetch_time_ms,
|
||||
'changed': False,
|
||||
'error': None,
|
||||
})
|
||||
host = url.split('/')[2] if '/' in url else url
|
||||
_log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
|
||||
continue
|
||||
|
||||
# Content changed or first fetch
|
||||
for addr, pr, conf in extracted:
|
||||
all_extracted.append((addr, pr, conf, url))
|
||||
|
||||
url_reports.append({
|
||||
'url': url,
|
||||
'success': True,
|
||||
'content_hash': content_hash,
|
||||
'proxy_count': len(extracted),
|
||||
'fetch_time_ms': fetch_time_ms,
|
||||
'changed': True,
|
||||
'error': None,
|
||||
})
|
||||
|
||||
host = url.split('/')[2] if '/' in url else url
|
||||
_log('%s: %d proxies extracted' % (host, len(extracted)), 'info')
|
||||
|
||||
# Report URL health to master
|
||||
if url_reports:
|
||||
try:
|
||||
worker_report_urls(server_url, wstate['worker_key'], url_reports)
|
||||
except NeedReregister:
|
||||
do_register()
|
||||
try:
|
||||
worker_report_urls(server_url, wstate['worker_key'], url_reports)
|
||||
except NeedReregister:
|
||||
_log('still rejected after re-register, discarding url reports', 'error')
|
||||
|
||||
# Deduplicate extracted proxies by address
|
||||
seen = set()
|
||||
unique_proxies = []
|
||||
source_map = {} # addr -> first source_url
|
||||
for addr, pr, conf, source_url in all_extracted:
|
||||
if addr not in seen:
|
||||
seen.add(addr)
|
||||
unique_proxies.append((addr, pr, conf))
|
||||
source_map[addr] = source_url
|
||||
|
||||
proxies_found += len(unique_proxies)
|
||||
|
||||
if not unique_proxies:
|
||||
cycles += 1
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
_log('testing %d unique proxies' % len(unique_proxies), 'info')
|
||||
|
||||
# Phase 2: Test extracted proxies using worker thread pool
|
||||
pending_states = {}
|
||||
all_jobs = []
|
||||
checktypes = config.watchd.checktypes
|
||||
|
||||
for addr, pr, conf in unique_proxies:
|
||||
# Parse ip:port from addr (may contain auth: user:pass@ip:port)
|
||||
addr_part = addr.split('@')[-1] if '@' in addr else addr
|
||||
|
||||
# Handle IPv6 [ipv6]:port
|
||||
if addr_part.startswith('['):
|
||||
bracket_end = addr_part.index(']')
|
||||
ip = addr_part[1:bracket_end]
|
||||
port = int(addr_part[bracket_end+2:])
|
||||
else:
|
||||
ip, port_str = addr_part.rsplit(':', 1)
|
||||
port = int(port_str)
|
||||
|
||||
proto = pr or 'http'
|
||||
proxy_str = '%s:%d' % (ip, port)
|
||||
|
||||
state = proxywatchd.ProxyTestState(
|
||||
ip, port, proto, 0,
|
||||
success_count=0, total_duration=0.0,
|
||||
country=None, mitm=0, consecutive_success=0,
|
||||
asn=None, oldies=False,
|
||||
completion_queue=completion_queue,
|
||||
proxy_full=addr, source_proto=pr
|
||||
)
|
||||
pending_states[proxy_str] = state
|
||||
|
||||
checktype = random.choice(checktypes)
|
||||
|
||||
if checktype == 'judges':
|
||||
available = proxywatchd.judge_stats.get_available_judges(
|
||||
list(proxywatchd.judges.keys()))
|
||||
target = random.choice(available) if available else random.choice(
|
||||
list(proxywatchd.judges.keys()))
|
||||
elif checktype == 'ssl':
|
||||
target = random.choice(proxywatchd.ssl_targets)
|
||||
elif checktype == 'irc':
|
||||
target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
|
||||
else: # head
|
||||
target = random.choice(list(proxywatchd.regexes.keys()))
|
||||
|
||||
job = proxywatchd.TargetTestJob(state, target, checktype)
|
||||
all_jobs.append(job)
|
||||
|
||||
random.shuffle(all_jobs)
|
||||
for job in all_jobs:
|
||||
job_queue.put(job, priority=0)
|
||||
|
||||
# Wait for completion
|
||||
completed = 0
|
||||
timeout_start = time.time()
|
||||
timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
|
||||
working_results = []
|
||||
|
||||
while completed < len(all_jobs):
|
||||
try:
|
||||
state = completion_queue.get(timeout=1)
|
||||
completed += 1
|
||||
|
||||
if state.failcount == 0:
|
||||
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
|
||||
proxy_addr = state.proxy
|
||||
if state.auth:
|
||||
proxy_addr = '%s@%s' % (state.auth, state.proxy)
|
||||
|
||||
working_results.append({
|
||||
'ip': state.ip,
|
||||
'port': state.port,
|
||||
'proto': state.proto,
|
||||
'source_proto': state.source_proto,
|
||||
'latency': round(latency_sec, 3),
|
||||
'exit_ip': state.exit_ip,
|
||||
'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
|
||||
})
|
||||
|
||||
if completed % 50 == 0 or completed == len(all_jobs):
|
||||
_log('tested %d/%d proxies (%d working)' % (
|
||||
completed, len(all_jobs), len(working_results)), 'info')
|
||||
|
||||
except Queue.Empty:
|
||||
if time.time() - timeout_start > timeout_seconds:
|
||||
_log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
|
||||
break
|
||||
continue
|
||||
|
||||
proxies_working += len(working_results)
|
||||
|
||||
# Report working proxies to master
|
||||
if working_results:
|
||||
try:
|
||||
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
|
||||
except NeedReregister:
|
||||
do_register()
|
||||
try:
|
||||
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
|
||||
except NeedReregister:
|
||||
_log('still rejected after re-register, discarding proxy reports', 'error')
|
||||
processed = 0
|
||||
_log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
|
||||
|
||||
cycles += 1
|
||||
time.sleep(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
elapsed = time.time() - start_time
|
||||
_log('worker V2 stopping...', 'info')
|
||||
session.close()
|
||||
for wt in threads:
|
||||
wt.stop()
|
||||
for wt in threads:
|
||||
wt.term()
|
||||
_log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
|
||||
_log(' cycles: %d' % cycles, 'info')
|
||||
_log(' urls fetched: %d' % urls_fetched, 'info')
|
||||
_log(' proxies found: %d' % proxies_found, 'info')
|
||||
_log(' proxies working: %d' % proxies_working, 'info')
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point."""
|
||||
global config
|
||||
@@ -784,7 +1191,12 @@ def main():
|
||||
else:
|
||||
sys.exit(1)
|
||||
|
||||
# Worker mode: connect to master server instead of running locally
|
||||
# V2 worker mode: URL-driven discovery
|
||||
if config.args.worker_v2:
|
||||
worker_v2_main(config)
|
||||
return
|
||||
|
||||
# V1 worker mode: connect to master server instead of running locally
|
||||
if config.args.worker or config.args.register:
|
||||
worker_main(config)
|
||||
return
|
||||
@@ -813,8 +1225,14 @@ def main():
|
||||
watcherd = None
|
||||
# Start httpd independently when watchd is disabled
|
||||
if config.httpd.enabled:
|
||||
from httpd import ProxyAPIServer
|
||||
from httpd import ProxyAPIServer, configure_url_scoring
|
||||
import network_stats
|
||||
configure_url_scoring(
|
||||
config.ppf.checktime,
|
||||
config.ppf.perfail_checktime,
|
||||
config.ppf.max_fail,
|
||||
config.ppf.list_max_age_days
|
||||
)
|
||||
|
||||
def httpd_stats_provider():
|
||||
"""Stats provider for httpd-only mode (scraping without testing)."""
|
||||
|
||||
@@ -142,6 +142,20 @@ def is_valid_ip(ip_str):
|
||||
except (ValueError, AttributeError):
|
||||
return False
|
||||
|
||||
def is_public_ip(ip_str):
|
||||
"""Validate IP is a public, globally routable address."""
|
||||
if not is_valid_ip(ip_str):
|
||||
return False
|
||||
parts = [int(p) for p in ip_str.split('.')]
|
||||
if parts[0] == 0: return False # 0.0.0.0/8
|
||||
if parts[0] == 10: return False # 10.0.0.0/8
|
||||
if parts[0] == 127: return False # 127.0.0.0/8
|
||||
if parts[0] == 169 and parts[1] == 254: return False # link-local
|
||||
if parts[0] == 172 and 16 <= parts[1] <= 31: return False # 172.16/12
|
||||
if parts[0] == 192 and parts[1] == 168: return False # 192.168/16
|
||||
if parts[0] >= 224: return False # multicast + reserved
|
||||
return True
|
||||
|
||||
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
|
||||
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'
|
||||
|
||||
@@ -393,10 +407,20 @@ class ProxyTestState(object):
|
||||
self.evaluated = True
|
||||
self.checktime = int(time.time())
|
||||
|
||||
successes = [r for r in self.results if r['success']]
|
||||
failures = [r for r in self.results if not r['success']]
|
||||
# Filter out judge_block results (inconclusive, neither pass nor fail)
|
||||
real_results = [r for r in self.results if r.get('category') != 'judge_block']
|
||||
successes = [r for r in real_results if r['success']]
|
||||
failures = [r for r in real_results if not r['success']]
|
||||
num_success = len(successes)
|
||||
_dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy)
|
||||
judge_blocks = len(self.results) - len(real_results)
|
||||
_dbg('evaluate: %d success, %d fail, %d judge_block, results=%d' % (
|
||||
num_success, len(failures), judge_blocks, len(self.results)), self.proxy)
|
||||
|
||||
# All results were judge blocks: inconclusive, preserve current state
|
||||
if not real_results and self.results:
|
||||
_dbg('all results inconclusive (judge_block), no state change', self.proxy)
|
||||
self.failcount = self.original_failcount
|
||||
return (self.original_failcount == 0, None)
|
||||
|
||||
# Determine dominant failure category
|
||||
fail_category = None
|
||||
@@ -547,6 +571,12 @@ class TargetTestJob(object):
|
||||
recv = sock.recv(-1)
|
||||
_sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy)
|
||||
|
||||
# Validate HTTP response for non-IRC checks
|
||||
if self.checktype != 'irc' and not recv.startswith('HTTP/'):
|
||||
_dbg('not an HTTP response, failing (first 40: %r)' % recv[:40], self.proxy_state.proxy)
|
||||
self.proxy_state.record_result(False, category='bad_response')
|
||||
return
|
||||
|
||||
# Select regex based on check type (or fallback target)
|
||||
if 'check.torproject.org' in srv:
|
||||
# Tor API fallback (judge using torproject.org)
|
||||
@@ -571,7 +601,7 @@ class TargetTestJob(object):
|
||||
reveals_headers = None
|
||||
if self.checktype == 'judges' or 'check.torproject.org' in srv:
|
||||
ip_match = re.search(IP_PATTERN, recv)
|
||||
if ip_match and is_valid_ip(ip_match.group(0)):
|
||||
if ip_match and is_public_ip(ip_match.group(0)):
|
||||
exit_ip = ip_match.group(0)
|
||||
if self.checktype == 'judges' and 'check.torproject.org' not in srv:
|
||||
# Check for header echo judge (elite detection)
|
||||
@@ -590,17 +620,14 @@ class TargetTestJob(object):
|
||||
# Check if judge is blocking us (not a proxy failure)
|
||||
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
|
||||
judge_stats.record_block(srv)
|
||||
# Judge block = proxy worked, we got HTTP response, just no IP
|
||||
# Count as success without exit_ip
|
||||
block_elapsed = time.time() - duration
|
||||
_dbg('judge BLOCK detected, counting as success', self.proxy_state.proxy)
|
||||
# Judge block = inconclusive, not a pass or fail
|
||||
_dbg('judge BLOCK detected, skipping (neutral)', self.proxy_state.proxy)
|
||||
self.proxy_state.record_result(
|
||||
True, proto=proto, duration=block_elapsed,
|
||||
srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
|
||||
reveals_headers=None
|
||||
False, category='judge_block', proto=proto,
|
||||
srv=srv, tor=tor, ssl=is_ssl
|
||||
)
|
||||
if config.watchd.debug:
|
||||
_log('judge %s challenged proxy %s (counted as success)' % (
|
||||
_log('judge %s challenged proxy %s (neutral, skipped)' % (
|
||||
srv, self.proxy_state.proxy), 'debug')
|
||||
else:
|
||||
_dbg('FAIL: no match, no block', self.proxy_state.proxy)
|
||||
@@ -675,15 +702,14 @@ class TargetTestJob(object):
|
||||
protos = self._build_proto_order()
|
||||
pool = connection_pool.get_pool()
|
||||
|
||||
# Phase 1: SSL handshake (if ssl_first enabled)
|
||||
if config.watchd.ssl_first:
|
||||
# Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode)
|
||||
if config.watchd.ssl_first or self.checktype == 'none':
|
||||
result = self._try_ssl_handshake(protos, pool)
|
||||
if result is not None:
|
||||
return result # SSL succeeded or MITM detected
|
||||
# SSL failed for all protocols
|
||||
if config.watchd.ssl_only:
|
||||
# ssl_only mode: skip secondary check, mark as failed
|
||||
_dbg('SSL failed, ssl_only mode, skipping secondary check', ps.proxy)
|
||||
if config.watchd.ssl_only or self.checktype == 'none':
|
||||
_dbg('SSL failed, no secondary check', ps.proxy)
|
||||
return (None, None, 0, None, None, 1, 0, 'ssl_only')
|
||||
_dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)
|
||||
|
||||
@@ -1357,7 +1383,11 @@ class Proxywatchd():
|
||||
# Build target pools for each checktype
|
||||
target_pools = {}
|
||||
for ct in checktypes:
|
||||
if ct == 'irc':
|
||||
if ct == 'none':
|
||||
# SSL-only mode: use ssl_targets as placeholder
|
||||
target_pools[ct] = ssl_targets
|
||||
_dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets))
|
||||
elif ct == 'irc':
|
||||
target_pools[ct] = config.servers
|
||||
_dbg('target_pool[irc]: %d servers' % len(config.servers))
|
||||
elif ct == 'judges':
|
||||
@@ -1752,7 +1782,7 @@ class Proxywatchd():
|
||||
|
||||
# Start HTTP API server if enabled
|
||||
if config.httpd.enabled:
|
||||
from httpd import ProxyAPIServer, configure_schedule
|
||||
from httpd import ProxyAPIServer, configure_schedule, configure_url_scoring
|
||||
# Pass schedule config to httpd module
|
||||
configure_schedule(
|
||||
config.watchd.working_checktime,
|
||||
@@ -1760,6 +1790,12 @@ class Proxywatchd():
|
||||
config.watchd.fail_retry_backoff,
|
||||
config.watchd.max_fail
|
||||
)
|
||||
configure_url_scoring(
|
||||
config.ppf.checktime,
|
||||
config.ppf.perfail_checktime,
|
||||
config.ppf.max_fail,
|
||||
config.ppf.list_max_age_days
|
||||
)
|
||||
self.httpd_server = ProxyAPIServer(
|
||||
config.httpd.listenip,
|
||||
config.httpd.port,
|
||||
|
||||
5
stats.py
5
stats.py
@@ -107,11 +107,9 @@ regexes = {
|
||||
'www.twitter.com': 'x-connection-hash',
|
||||
't.co': 'x-connection-hash',
|
||||
'www.msn.com': 'x-aspnetmvc-version',
|
||||
'www.bing.com': 'p3p',
|
||||
'www.ask.com': 'x-served-by',
|
||||
'www.hotmail.com': 'x-msedge-ref',
|
||||
'www.bbc.co.uk': 'x-bbc-edge-cache-status',
|
||||
'www.skype.com': 'X-XSS-Protection',
|
||||
'www.alibaba.com': 'object-status',
|
||||
'www.mozilla.org': 'cf-ray',
|
||||
'www.cloudflare.com': 'cf-ray',
|
||||
@@ -121,7 +119,6 @@ regexes = {
|
||||
'www.netflix.com': 'X-Netflix.proxy.execution-time',
|
||||
'www.amazon.de': 'x-amz-cf-id',
|
||||
'www.reuters.com': 'x-amz-cf-id',
|
||||
'www.ikea.com': 'x-frame-options',
|
||||
'www.twitpic.com': 'timing-allow-origin',
|
||||
'www.digg.com': 'cf-request-id',
|
||||
'www.wikia.com': 'x-served-by',
|
||||
@@ -133,8 +130,6 @@ regexes = {
|
||||
'www.yelp.com': 'x-timer',
|
||||
'www.ebay.com': 'x-envoy-upstream-service-time',
|
||||
'www.wikihow.com': 'x-c',
|
||||
'www.archive.org': 'referrer-policy',
|
||||
'www.pandora.tv': 'X-UA-Compatible',
|
||||
'www.w3.org': 'x-backend',
|
||||
'www.time.com': 'x-amz-cf-pop'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user