diff --git a/documentation/design-worker-driven-discovery.md b/documentation/design-worker-driven-discovery.md
new file mode 100644
index 0000000..5adf931
--- /dev/null
+++ b/documentation/design-worker-driven-discovery.md
@@ -0,0 +1,572 @@

# Design: Worker-Driven Discovery

## Status

**Proposal** -- Not yet implemented.

## Problem

The current architecture centralizes all proxy list fetching on the master
node (odin). Workers only test proxies handed to them. This creates several
issues:

1. **Single point of fetch** -- If odin can't reach a source (blocked IP,
   transient failure), that source is dead for everyone.
2. **Bandwidth concentration** -- Odin fetches 40 proxy lists every cycle,
   extracts proxies, deduplicates, and stores them before workers ever see
   them.
3. **Wasted vantage points** -- Workers sit behind different Tor exits and
   IPs, but never use that diversity for fetching.
4. **Tight coupling** -- Workers can't operate at all without the master's
   claim queue. If odin restarts, all workers stall.

## Proposed Architecture

Move proxy list fetching to workers. Master becomes a coordinator and
aggregator rather than a fetcher.

```
Current:                           Proposed:

Master                             Master
  Fetch URLs --------+               Manage URL database
  Extract proxies    |               Score URLs from feedback
  Store proxylist    |               Aggregate working proxies
  Serve /api/work ---+-> Workers     Serve /api/claim-urls ----> Workers
    <- /api/results                    <- /api/report-urls ------+
                                       <- /api/report-proxies ---+
```

### Role Changes

```
+--------+---------------------------+----------------------------------+
| Host   | Current Role              | New Role                         |
+--------+---------------------------+----------------------------------+
| odin   | Fetch URLs                | Maintain URL database            |
|        | Extract proxies           | Score URLs from worker feedback  |
|        | Store proxylist           | Aggregate reported proxies       |
|        | Distribute proxy batches  | Distribute URL batches           |
|        | Collect test results      | Collect URL + proxy reports      |
+--------+---------------------------+----------------------------------+
| worker | Claim proxy batch         | Claim URL batch                  |
|        | Test each proxy           | Fetch URL, extract proxies       |
|        | Report pass/fail          | Test extracted proxies           |
|        |                           | Report URL health + proxy results|
+--------+---------------------------+----------------------------------+
```

## Data Flow

### Phase 1: URL Claiming

The worker requests a batch of URLs to process.

```
Worker                           Master
  |                                |
  | GET /api/claim-urls            |
  |   ?key=...&count=5             |
  |------------------------------->|
  |                                | Select due URLs from uris table
  |                                | Mark as claimed (in-memory)
  | [{url, last_hash, proto_hint}, ...]
  |<-------------------------------|
```

**Claim response:**
```json
{
  "worker_id": "abc123",
  "urls": [
    {
      "url": "https://raw.githubusercontent.com/.../http.txt",
      "last_hash": "a1b2c3d4...",
      "proto_hint": "http",
      "priority": 1
    }
  ]
}
```

Fields:
- `last_hash` -- MD5 of the content from the last fetch. The worker can
  skip extraction and report "unchanged" if the hash matches, saving CPU.
- `proto_hint` -- Protocol inferred from the URL path. The worker uses
  this for extraction confidence scoring.
- `priority` -- Higher = fetch sooner. Based on the URL score.

### Phase 2: Fetch and Extract

The worker fetches each URL through Tor and extracts proxies using the
existing `fetch.extract_proxies()` pipeline.

```
Worker
  |
  | For each claimed URL:
  |   1. Fetch through Tor (fetch_contents)
  |   2. Compute content hash (MD5)
  |   3. If hash == last_hash: skip extraction, report unchanged
  |   4. Else: extract_proxies() -> list of (addr, proto, confidence)
  |   5. Queue extracted proxies for testing
  |
```

### Phase 3: URL Feedback

The worker reports fetch results for each URL back to the master.

```
Worker                           Master
  |                                |
  | POST /api/report-urls          |
  |   {reports: [...]}             |
  |------------------------------->|
  |                                | Update uris table:
  |                                |   check_time, error, stale_count,
  |                                |   retrievals, proxies_added,
  |                                |   content_hash, worker_scores
  | {ok: true}                     |
  |<-------------------------------|
```

**URL report payload:**
```json
{
  "reports": [
    {
      "url": "https://...",
      "success": true,
      "content_hash": "a1b2c3d4...",
      "proxy_count": 1523,
      "fetch_time_ms": 2340,
      "changed": true,
      "error": null
    },
    {
      "url": "https://...",
      "success": false,
      "content_hash": null,
      "proxy_count": 0,
      "fetch_time_ms": 0,
      "changed": false,
      "error": "timeout"
    }
  ]
}
```

### Phase 4: Proxy Testing and Reporting

The worker tests extracted proxies locally using the existing
`TargetTestJob` pipeline. **Only working proxies are reported to the
master.** Failed proxies are discarded silently -- there is no point
wasting bandwidth on negatives.

Workers are trusted: if a worker says a proxy works, the master accepts it.
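
As a concrete illustration of this accept-on-report rule, the master-side
upsert can be sketched against an in-memory SQLite table. The simplified
schema and column names below are assumptions for the sketch, not the real
`proxylist` schema:

```python
# Sketch of master-side handling of a working-proxy report.
# The table layout here is illustrative, not the actual proxylist schema.
import sqlite3
import time

def apply_proxy_report(db, proxies, now=None):
    """Accept every reported proxy: upsert, clear the failure flag, bump last_seen."""
    now = int(now if now is not None else time.time())
    for p in proxies:
        db.execute(
            '''INSERT OR REPLACE INTO proxylist
               (ip, port, proto, latency, failed, last_seen)
               VALUES (?, ?, ?, ?, 0, ?)''',
            (p['ip'], p['port'], p['proto'], p['latency'], now))
    db.commit()

db = sqlite3.connect(':memory:')
db.execute('''CREATE TABLE proxylist (
                  ip TEXT, port INT, proto TEXT, latency REAL,
                  failed INT DEFAULT 0, last_seen INT DEFAULT 0,
                  PRIMARY KEY (ip, port))''')

# Two workers confirm the same proxy; the later report simply wins.
apply_proxy_report(db, [{'ip': '1.2.3.4', 'port': 8080,
                         'proto': 'socks5', 'latency': 1.2}], now=1000)
apply_proxy_report(db, [{'ip': '1.2.3.4', 'port': 8080,
                         'proto': 'socks5', 'latency': 1.5}], now=2000)
```

Because `(ip, port)` is the primary key, `INSERT OR REPLACE` keeps exactly
one row per proxy, so repeated confirmations just refresh `last_seen`.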

```
Worker                           Master
  |                                |
  | Test proxies locally           |
  | (same TargetTestJob flow)      |
  | Discard failures               |
  |                                |
  | POST /api/report-proxies       |
  |   {proxies: [...]}             | (working only)
  |------------------------------->|
  |                                | Upsert into proxylist:
  |                                |   INSERT OR REPLACE
  |                                |   Set failed=0, update last_seen
  | {ok: true}                     |
  |<-------------------------------|
```

**Proxy report payload (working only):**
```json
{
  "proxies": [
    {
      "ip": "1.2.3.4",
      "port": 8080,
      "proto": "socks5",
      "source_proto": "socks5",
      "latency": 1.234,
      "exit_ip": "5.6.7.8",
      "anonymity": "elite",
      "source_url": "https://..."
    }
  ]
}
```

No `working` field is needed -- everything in the report is working by
definition. The `source_url` links proxy provenance to the URL that
yielded it, enabling URL quality scoring.

### Complete Cycle

```
Worker main loop:
  1. GET /api/claim-urls          Claim batch of URLs
  2. For each URL:
       a. Fetch through Tor
       b. Extract proxies (or skip if unchanged)
       c. Test extracted proxies
  3. POST /api/report-urls        Report URL health
  4. POST /api/report-proxies     Report proxy results
  5. POST /api/heartbeat          Health check
  6. Sleep, repeat
```

## Master-Side Changes

### URL Scheduling

The current `Leechered` threads fetch URLs on a timer based on the
error/stale counts. Replace this with a scoring system that workers
consume.

**URL score** (higher = fetch sooner):

```
score = base_score
      + freshness_bonus    # High-frequency sources score higher
      - error_penalty      # Consecutive errors reduce score
      - stale_penalty      # Unchanged content reduces score
      + yield_bonus        # URLs that produce many proxies score higher
      + quality_bonus      # URLs whose proxies actually work score higher
```

Concrete formula:

```python
import time

def url_score(url_row):
    now = time.time()
    age = now - url_row.check_time
    base = age / url_row.check_interval          # 1.0 when due

    # Yield: proxies found per fetch (rolling average)
    yield_rate = url_row.proxies_added / max(url_row.retrievals, 1)
    yield_bonus = min(yield_rate / 100.0, 1.0)   # Cap at 1.0

    # Quality: what % of extracted proxies actually worked
    quality_bonus = url_row.working_ratio * 0.5  # 0.0 to 0.5

    # Penalties
    error_penalty = min(url_row.error * 0.3, 2.0)
    stale_penalty = min(url_row.stale_count * 0.1, 1.0)

    return base + yield_bonus + quality_bonus - error_penalty - stale_penalty
```

URLs with `score >= 1.0` are due for fetching. Claimed URLs are locked
in memory for `claim_timeout` seconds (existing pattern).

### New uris Columns

```sql
ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600;
ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0;
ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0;
ALTER TABLE uris ADD COLUMN last_worker TEXT;
ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0;
```

- `check_interval` -- Adaptive: decreases for high-yield URLs, increases
  for stale/erroring ones. Replaces the `checktime + error * perfail`
  formula with a persisted value.
- `working_ratio` -- EMA of (working_proxies / total_proxies) from worker
  feedback. URLs that yield dead proxies get deprioritized.
- `avg_fetch_time` -- EMA of fetch duration in ms. Helps identify slow
  sources.
- `last_worker` -- Which worker last fetched this URL. Useful for
  debugging, and for distributing URLs evenly across workers.
- `yield_rate` -- EMA of proxies extracted per fetch.

### Proxy Aggregation

Trust model: **workers are trusted.** If any worker reports a proxy as
working, the master accepts it. Failed proxies are never reported --
workers discard them locally.

```
Worker A reports: 1.2.3.4:8080 working, latency 1.2s
Worker B reports: 1.2.3.4:8080 working, latency 1.5s

Master action:
  - INSERT OR REPLACE with latest report
  - Update last_seen, latency EMA
  - Set failed = 0
```

No consensus, no voting, no trust scoring. A proxy lives as long as at
least one worker keeps confirming it. It dies when nobody reports it for
`proxy_ttl` seconds.

New `proxylist` column:

```sql
ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0;
```

- `last_seen` -- Unix timestamp of the most recent "working" report.
  Proxies not seen in N hours are expired by the master's periodic
  cleanup.

### Proxy Expiry

Working proxies that haven't been reported by any worker within
`proxy_ttl` (default: 4 hours) are marked stale and re-queued for
testing. After `proxy_ttl * 3` with no reports, they're marked failed.

```python
import time

def expire_stale_proxies(db, proxy_ttl):
    now = int(time.time())
    cutoff_stale = now - proxy_ttl
    cutoff_dead = now - (proxy_ttl * 3)

    # Mark stale proxies for retesting
    db.execute('''
        UPDATE proxylist SET failed = 1
        WHERE failed = 0 AND last_seen < ? AND last_seen > 0
    ''', (cutoff_stale,))

    # Kill proxies not seen in a long time
    db.execute('''
        UPDATE proxylist SET failed = -1
        WHERE failed > 0 AND last_seen < ? AND last_seen > 0
    ''', (cutoff_dead,))
    db.commit()
```

## Worker-Side Changes

### New Worker Loop

Replace the current claim-test-report loop with a two-phase loop:

```python
from time import sleep

# register, verify_tor, claim_urls, report_urls, test_proxies,
# report_proxies, and heartbeat are the worker-side API helpers.
def worker_main_v2(config):
    server, key = config.server, config.key   # assumed config fields
    register()
    verify_tor()

    while True:
        # Phase 1: Fetch URLs and extract proxies
        urls = claim_urls(server, key, count=5)
        url_reports = []
        proxy_batch = []

        for url_info in urls:
            report, proxies = fetch_and_extract(url_info)
            url_reports.append(report)
            proxy_batch.extend(proxies)

        report_urls(server, key, url_reports)

        # Phase 2: Test extracted proxies, report working only
        if proxy_batch:
            working = test_proxies(proxy_batch)
            if working:
                report_proxies(server, key, working)

        heartbeat(server, key)
        sleep(1)
```

### fetch_and_extract()

A new function that combines fetching and extraction on the worker side:

```python
import hashlib
import time

# fetch_contents, extract_proxies, and tor_proxy come from the
# existing fetch module.
def fetch_and_extract(url_info):
    url = url_info['url']
    last_hash = url_info.get('last_hash')
    proto_hint = url_info.get('proto_hint')

    start = time.time()
    try:
        content = fetch_contents(url, head=False, proxy=tor_proxy)
    except Exception as e:
        return {'url': url, 'success': False, 'content_hash': None,
                'proxy_count': 0, 'fetch_time_ms': 0,
                'changed': False, 'error': str(e)}, []

    elapsed = int((time.time() - start) * 1000)
    content_hash = hashlib.md5(content).hexdigest()

    if content_hash == last_hash:
        return {
            'url': url, 'success': True, 'content_hash': content_hash,
            'proxy_count': 0, 'fetch_time_ms': elapsed,
            'changed': False, 'error': None
        }, []

    proxies = extract_proxies(content, url)
    return {
        'url': url, 'success': True, 'content_hash': content_hash,
        'proxy_count': len(proxies), 'fetch_time_ms': elapsed,
        'changed': True, 'error': None
    }, proxies
```

### Deduplication

Workers may extract the same proxies from different URLs, so they
deduplicate locally before testing:

```python
seen = set()
unique = []
for addr, proto, confidence in proxy_batch:
    if addr not in seen:
        seen.add(addr)
        unique.append((addr, proto, confidence))
proxy_batch = unique
```

### Proxy Testing

Reuse the existing `TargetTestJob` / `WorkerThread` pipeline. The only
change: proxies come from local extraction instead of the master's claim
response. The test loop, result collection, and evaluation logic remain
identical.

## API Changes Summary

### New Endpoints

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/api/claim-urls` | GET | Worker claims a batch of due URLs |
| `/api/report-urls` | POST | Worker reports URL fetch results |
| `/api/report-proxies` | POST | Worker reports proxy test results |

### Modified Endpoints

| Endpoint | Change |
|----------|--------|
| `/api/work` | Deprecated but kept for backward compatibility |
| `/api/results` | Deprecated but kept for backward compatibility |

### Unchanged Endpoints

| Endpoint | Reason |
|----------|--------|
| `/api/register` | Same registration flow |
| `/api/heartbeat` | Same health reporting |
| `/dashboard` | Still reads from the same DB |
| `/proxies` | Still reads from proxylist |

## Schema Changes

### uris table additions

```sql
ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600;
ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0;
ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0;
ALTER TABLE uris ADD COLUMN last_worker TEXT;
ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0;
```

### proxylist table additions

```sql
ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0;
```

## Migration Strategy

### Phase 1: Add New Endpoints (non-breaking)

Add `/api/claim-urls`, `/api/report-urls`, and `/api/report-proxies` to
`httpd.py`. Keep all existing endpoints working. The master still runs
its own `Leechered` threads.

Files: `httpd.py`, `dbs.py` (migrations)

### Phase 2: Worker V2 Mode

Add a `--worker-v2` flag to `ppf.py`. When set, the worker uses the new
URL-claiming loop instead of the proxy-claiming loop. Both modes coexist.

Old workers (`--worker`) continue working against `/api/work` and
`/api/results`. New workers (`--worker-v2`) use the new endpoints.

Files: `ppf.py`, `config.py`

### Phase 3: URL Scoring

Implement URL scoring on the master based on worker feedback. Replace the
`Leechered` timer-based scheduling with score-based scheduling. The
master's own fetching becomes a fallback for URLs no worker has claimed
recently.

Files: `httpd.py`, `dbs.py`

### Phase 4: Remove Legacy

Once all workers run V2, remove `/api/work`, `/api/results`, and the
master-side `Leechered` threads. The master no longer fetches proxy lists
directly.

Files: `ppf.py`, `httpd.py`

## Configuration

### New config.ini Options

```ini
[worker]
# V2 mode: worker fetches URLs instead of proxy batches
mode = v2                    # v1 (legacy) or v2 (url-driven)
url_batch_size = 5           # URLs per claim cycle
max_proxies_per_cycle = 500  # Cap on proxies tested per cycle
fetch_timeout = 30           # Timeout for URL fetching (seconds)

[ppf]
# URL scoring weights
score_yield_weight = 1.0
score_quality_weight = 0.5
score_error_penalty = 0.3
score_stale_penalty = 0.1

# Proxy expiry
proxy_ttl = 14400            # Seconds before unseen proxy goes stale (4h)
proxy_ttl_dead = 43200       # Seconds before unseen proxy is killed (12h)

# Fallback: master fetches URLs not claimed by any worker
fallback_fetch = true
fallback_interval = 7200     # Seconds before master fetches unclaimed URL
```

## Risks and Mitigations

| Risk | Impact | Mitigation |
|------|--------|------------|
| Workers extract different proxy counts from the same URL | Inconsistent `proxy_count` in reports | Use `content_hash` for dedup; only update `yield_rate` when the hash changes |
| Tor exit blocks a source for one worker | Worker reports an error for a working URL | Require 2+ consecutive errors before incrementing the URL error count |
| Workers test the same proxies redundantly | Wasted CPU | Master tracks which URLs are assigned to which workers; avoid assigning the same URL to multiple workers in the same cycle |
| Large proxy lists overwhelm worker memory | OOM on worker | Cap `max_proxies_per_cycle`; worker discards excess after dedup |
| Master restart loses claim state | Workers refetch recently fetched URLs | Harmless -- just a redundant fetch; `content_hash` prevents duplicate work |
| `fetch.py` imports unavailable on worker image | ImportError | Verify the worker Dockerfile includes `fetch.py` and its dependencies |

## What Stays the Same

- `rocksock.py` -- No changes to proxy chain logic
- `connection_pool.py` -- Tor host selection unchanged
- `proxywatchd.py` core -- `TargetTestJob`, `WorkerThread`, `ProxyTestState`
  remain identical. Only the job source changes.
- `fetch.py` -- Used on workers now, but the code itself doesn't change
- `httpd.py` dashboard/proxies -- Still reads from the same `proxylist` table
- SQLite as storage -- No database engine change

## Open Questions

1. **Should workers share extracted proxy lists with each other?** Peer
   exchange would reduce redundant fetching but adds protocol complexity.
   Recommendation: no, keep it simple. The master deduplicates via
   `INSERT OR REPLACE`.

2. **Should URL claiming be weighted by worker geography?** Some sources
   may be accessible from certain Tor exits but not others.
   Recommendation: defer. Let natural retries handle this; track
   per-worker URL success rates for future optimization.

3. **What's the right `proxy_ttl`?** Too short and we churn proxies
   needlessly. Too long and we serve stale data. Start with 4 hours and
   tune based on the observed proxy lifetime distribution.
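
## Appendix: EMA Update Sketch

The feedback columns proposed for `uris` (`yield_rate`, `avg_fetch_time`,
`working_ratio`) are all described as EMAs. As a minimal illustration of
how the master might fold worker feedback into them -- the smoothing
factor `alpha`, the dict-shaped row, and both helper names are
assumptions, not existing code:

```python
# Illustrative EMA updates for the per-URL feedback columns.
# `alpha` and the dict-based row are assumptions made for this sketch.
def update_url_stats(row, report, alpha=0.2):
    """Blend a single URL report into the rolling per-URL statistics."""
    if not report['success']:
        return row
    row['avg_fetch_time'] = int((1 - alpha) * row['avg_fetch_time']
                                + alpha * report['fetch_time_ms'])
    if report['changed']:
        # Only changed content says anything new about yield.
        row['yield_rate'] = ((1 - alpha) * row['yield_rate']
                             + alpha * report['proxy_count'])
    return row

def update_working_ratio(row, working, total, alpha=0.2):
    """Fold proxy-test feedback (linked via source_url) into working_ratio."""
    if total > 0:
        row['working_ratio'] = ((1 - alpha) * row['working_ratio']
                                + alpha * (working / total))
    return row

row = {'yield_rate': 100.0, 'avg_fetch_time': 2000, 'working_ratio': 0.5}
row = update_url_stats(row, {'success': True, 'changed': True,
                             'proxy_count': 1523, 'fetch_time_ms': 2340})
row = update_working_ratio(row, working=30, total=100)
```

A small `alpha` keeps scores stable across noisy individual fetches; a
larger one tracks source churn faster.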