diff --git a/CLAUDE.md b/CLAUDE.md index 091c629..a1e4987 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -7,9 +7,9 @@ │ Host │ Role │ Notes ├──────────┼─────────────┼────────────────────────────────────────────────────────┤ │ odin │ Master │ Scrapes proxy lists, verifies conflicts, port 8081 -│ forge │ Worker │ Tests proxies, reports to master via WireGuard -│ hermes │ Worker │ Tests proxies, reports to master via WireGuard -│ janus │ Worker │ Tests proxies, reports to master via WireGuard +│ cassius │ Worker │ Tests proxies, reports to master via WireGuard +│ edge │ Worker │ Tests proxies, reports to master via WireGuard +│ sentinel │ Worker │ Tests proxies, reports to master via WireGuard └──────────┴─────────────┴────────────────────────────────────────────────────────┘ ``` @@ -43,20 +43,21 @@ cd /opt/ansible && source venv/bin/activate ```bash # Check worker status -ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m shell -a "hostname" +ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "hostname" # Check worker config -ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m shell -a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini" +ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini" -# Check worker logs -ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge -m shell -a "sudo -u podman journalctl --user -u ppf-worker -n 20" +# Check worker logs (dynamic UID) +ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius -m raw \ + -a "uid=\$(id -u podman) && sudo -u podman podman logs --tail 20 ppf-worker" # Modify config option -ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m lineinfile -a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'" +ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m lineinfile -a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'" -# Restart workers (different UIDs!) 
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible janus,forge -m raw -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/996 systemctl --user restart ppf-worker" -ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible hermes -m raw -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/1001 systemctl --user restart ppf-worker" +# Restart workers (dynamic UID discovery) +ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m raw \ + -a "uid=\$(id -u podman) && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman restart ppf-worker" ``` ## Full Deployment Procedure @@ -78,11 +79,11 @@ ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m synchronize \ -a "src=/home/user/git/ppf/ dest=/home/podman/ppf/ rsync_opts='--include=*.py,--include=servers.txt,--exclude=*'" # Deploy to WORKERS (ppf/src/ subdirectory) -ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m synchronize \ +ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m synchronize \ -a "src=/home/user/git/ppf/ dest=/home/podman/ppf/src/ rsync_opts='--include=*.py,--include=servers.txt,--exclude=*'" # CRITICAL: Fix ownership on ALL hosts (rsync uses ansible user, containers need podman) -ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,forge,hermes,janus -m raw \ +ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \ -a "chown -R podman:podman /home/podman/ppf/" ``` @@ -95,11 +96,9 @@ ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,forge,hermes,janus -m raw \ ansible odin -m raw \ -a "cd /tmp && XDG_RUNTIME_DIR=/run/user/1005 runuser -u podman -- podman restart ppf" -# Restart WORKERS (note different UIDs) -ansible janus,forge -m raw \ - -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/996 systemctl --user restart ppf-worker" -ansible hermes -m raw \ - -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/1001 systemctl --user restart ppf-worker" +# Restart WORKERS (dynamic UID discovery) +ansible cassius,edge,sentinel -m raw \ + -a "uid=\$(id -u podman) && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman restart ppf-worker" ``` ### Step 4: Verify All Running @@ -109,11 +108,9 @@ ansible hermes -m raw \ ansible odin -m raw \ -a "cd /tmp && XDG_RUNTIME_DIR=/run/user/1005 runuser -u podman -- podman ps" -# Check workers -ansible janus,forge -m raw \ - -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/996 systemctl --user is-active ppf-worker" -ansible hermes -m raw \ - -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/1001 systemctl --user is-active ppf-worker" +# Check workers (dynamic UID discovery) +ansible cassius,edge,sentinel -m raw \ + -a "uid=\$(id -u podman) && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman ps --format '{{.Names}} {{.Status}}'" ``` ## Podman User IDs @@ -123,12 +120,14 @@ ansible hermes -m raw \ │ Host │ UID │ XDG_RUNTIME_DIR ├──────────┼───────┼─────────────────────────────┤ │ odin │ 1005 │ /run/user/1005 -│ hermes │ 1001 │ /run/user/1001 -│ janus │ 996 │ /run/user/996 -│ forge │ 996 │ /run/user/996 +│ cassius │ 993 │ /run/user/993 +│ edge │ 993 │ /run/user/993 +│ sentinel │ 992 │ /run/user/992 └──────────┴───────┴─────────────────────────────┘ ``` +**Prefer dynamic UID discovery** (`uid=$(id -u podman)`) over hardcoded values. 
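+
+A minimal sketch of the same pattern for an ad-hoc SSH session (assumes the rootless user is named `podman`, as in the table above; the `podman ps` subcommand is only an example):
+
+```bash
+# Resolve the podman user's UID at runtime instead of hardcoding it
+uid=$(id -u podman)
+# Point XDG_RUNTIME_DIR at the matching runtime dir before any rootless podman call
+sudo -u podman XDG_RUNTIME_DIR=/run/user/$uid podman ps
+```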
+ ## Configuration ### Odin config.ini @@ -186,41 +185,26 @@ batch_size = clamp(fair_share, min=100, max=1000) - Workers shuffle their batch locally to avoid testing same proxies simultaneously - Claims expire after 5 minutes if not completed -## Worker systemd Unit +## Worker Container -Located at `/home/podman/.config/systemd/user/ppf-worker.service`: +Workers run as podman containers with `--restart=unless-stopped`: -```ini -[Unit] -Description=PPF Worker Container -After=network-online.target tor.service - -[Service] -Type=simple -Restart=on-failure -RestartSec=10 -WorkingDirectory=%h -ExecStartPre=-/usr/bin/podman stop -t 10 ppf-worker -ExecStartPre=-/usr/bin/podman rm -f ppf-worker -ExecStart=/usr/bin/podman run \ - --name ppf-worker --rm --log-driver=journald --network=host \ - -v %h/ppf/src:/app:ro \ - -v %h/ppf/data:/app/data \ - -v %h/ppf/config.ini:/app/config.ini:ro \ - -e PYTHONUNBUFFERED=1 \ - localhost/ppf-worker:latest \ - python -u ppf.py --worker --server http://10.200.1.250:8081 -ExecStop=/usr/bin/podman stop -t 10 ppf-worker - -[Install] -WantedBy=default.target +```bash +podman run -d --name ppf-worker --network=host --restart=unless-stopped \ + -e PYTHONUNBUFFERED=1 \ + -v /home/podman/ppf/src:/app:ro,Z \ + -v /home/podman/ppf/data:/app/data:Z \ + -v /home/podman/ppf/config.ini:/app/config.ini:ro,Z \ + -v /home/podman/ppf/servers.txt:/app/servers.txt:ro,Z \ + localhost/ppf-worker:latest \ + python -u ppf.py --worker --server http://10.200.1.250:8081 ``` ## Rebuilding Images ```bash # Workers - from ppf/ directory (Dockerfile copies from src/) -ansible forge,hermes,janus -m raw \ +ansible cassius,edge,sentinel -m raw \ -a "cd /home/podman/ppf && sudo -u podman podman build -t localhost/ppf-worker:latest ." # Odin - from ppf/ directory @@ -231,14 +215,16 @@ ansible odin -m raw \ ## API Endpoints ``` -/dashboard Web UI with live statistics -/map Interactive world map -/health Health check -/api/stats Runtime statistics (JSON) -/api/workers Connected worker status -/api/memory Memory profiling data -/api/countries Proxy counts by country -/proxies Working proxies list +/dashboard Web UI with live statistics +/map Interactive world map +/health Health check +/api/stats Runtime statistics (JSON) +/api/workers Connected worker status +/api/countries Proxy counts by country +/api/claim-urls Claim URL batch for worker-driven fetching (GET) +/api/report-urls Report URL fetch results (POST) +/api/report-proxies Report working proxies (POST) +/proxies Working proxies list ``` ## Troubleshooting @@ -247,7 +233,7 @@ ansible odin -m raw \ Workers need `servers.txt` in src/: ```bash -ansible forge,hermes,janus -m copy \ +ansible cassius,edge,sentinel -m copy \ -a "src=/home/user/git/ppf/servers.txt dest=/home/podman/ppf/src/servers.txt owner=podman group=podman" ``` @@ -270,15 +256,17 @@ ansible odin -m raw -a "cd /tmp; sudo -u podman podman restart ppf" ### Worker Keeps Crashing -1. Check systemd status with correct UID -2. Verify servers.txt exists in src/ -3. Check ownership -4. Run manually to see error: +1. Check container status: `sudo -u podman podman ps -a` +2. Check logs: `sudo -u podman podman logs --tail 50 ppf-worker` +3. Verify servers.txt exists in src/ +4. Check ownership: `ls -la /home/podman/ppf/src/` +5. 
Run manually to see error: ```bash sudo -u podman podman run --rm --network=host \ - -v /home/podman/ppf/src:/app:ro \ - -v /home/podman/ppf/data:/app/data \ - -v /home/podman/ppf/config.ini:/app/config.ini:ro \ + -v /home/podman/ppf/src:/app:ro,Z \ + -v /home/podman/ppf/data:/app/data:Z \ + -v /home/podman/ppf/config.ini:/app/config.ini:ro,Z \ + -v /home/podman/ppf/servers.txt:/app/servers.txt:ro,Z \ localhost/ppf-worker:latest \ python -u ppf.py --worker --server http://10.200.1.250:8081 ``` diff --git a/ROADMAP.md b/ROADMAP.md index fd95607..c058549 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -45,83 +45,15 @@ PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework design --- -## Phase 1: Stability & Code Quality +## Open Work -**Objective:** Establish a solid, maintainable codebase - -### 1.1 Error Handling Improvements +### Validation | Task | Description | File(s) | |------|-------------|---------| -| Add connection retry logic | Implement exponential backoff for failed connections | rocksock.py, fetch.py | -| Graceful database errors | Handle SQLite lock/busy errors with retry | mysqlite.py | -| Timeout standardization | Consistent timeout handling across all network ops | proxywatchd.py, fetch.py | -| Exception logging | Log exceptions with context, not just silently catch | all files | - -### 1.2 Code Consolidation - -| Task | Description | File(s) | -|------|-------------|---------| -| Unify _known_proxies | Single source of truth for known proxy cache | ppf.py, fetch.py | -| Extract proxy utils | Create proxy_utils.py with cleanse/validate functions | new file | -| Remove global config pattern | Pass config explicitly instead of set_config() | fetch.py | -| Standardize logging | Consistent _log() usage with levels across all modules | all files | - -### 1.3 Testing Infrastructure - -| Task | Description | File(s) | -|------|-------------|---------| -| Add unit tests | Test proxy parsing, URL extraction, IP validation | tests/ | -| Mock network layer | Allow testing without live network/Tor | tests/ | -| Validation test suite | Verify multi-target voting logic | tests/ | - ---- - -## Phase 2: Performance Optimization - -**Objective:** Improve throughput and resource efficiency - -### 2.1 Connection Pooling - -| Task | Description | File(s) | -|------|-------------|---------| -| Tor connection reuse | Pool Tor SOCKS connections instead of reconnecting | proxywatchd.py | -| HTTP keep-alive | Reuse connections to same target servers | http2.py | -| Connection warm-up | Pre-establish connections before job assignment | proxywatchd.py | - -### 2.2 Database Optimization - -| Task | Description | File(s) | -|------|-------------|---------| -| Batch inserts | Group INSERT operations (already partial) | dbs.py | -| Index optimization | Add indexes for frequent query patterns | dbs.py | -| WAL mode | Enable Write-Ahead Logging for better concurrency | mysqlite.py | -| Prepared statements | Cache compiled SQL statements | mysqlite.py | - -### 2.3 Threading Improvements - -| Task | Description | File(s) | -|------|-------------|---------| -| Dynamic thread scaling | Adjust thread count based on success rate | proxywatchd.py | -| Priority queue | Test high-value proxies (low fail count) first | proxywatchd.py | -| Stale proxy cleanup | Background thread to remove long-dead proxies | proxywatchd.py | - ---- - -## Phase 3: Reliability & Accuracy - -**Objective:** Improve proxy validation accuracy and system reliability - -### 3.1 Enhanced Validation - -| Task | Description | 
File(s) | -|------|-------------|---------| -| Latency tracking | Store and use connection latency metrics | proxywatchd.py, dbs.py | -| Geographic validation | Verify proxy actually routes through claimed location | proxywatchd.py | | Protocol fingerprinting | Better SOCKS4/SOCKS5/HTTP detection | rocksock.py | -| HTTPS/SSL testing | Validate SSL proxy capabilities | proxywatchd.py | -### 3.2 Target Management +### Target Management | Task | Description | File(s) | |------|-------------|---------| @@ -129,283 +61,31 @@ PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework design | Target health tracking | Remove unresponsive targets from pool | proxywatchd.py | | Geographic target spread | Ensure targets span multiple regions | config.py | -### 3.3 Failure Analysis - -| Task | Description | File(s) | -|------|-------------|---------| -| Failure categorization | Distinguish timeout vs refused vs auth-fail | proxywatchd.py | -| Retry strategies | Different retry logic per failure type | proxywatchd.py | -| Dead proxy quarantine | Separate storage for likely-dead proxies | dbs.py | - ---- - -## Phase 4: Features & Usability - -**Objective:** Add useful features while maintaining simplicity - -### 4.1 Reporting & Monitoring - -| Task | Description | File(s) | -|------|-------------|---------| -| Statistics collection | Track success rates, throughput, latency | proxywatchd.py | -| Periodic status output | Log summary stats every N minutes | ppf.py, proxywatchd.py | -| Export functionality | Export working proxies to file (txt, json) | new: export.py | - -### 4.2 Configuration - -| Task | Description | File(s) | -|------|-------------|---------| -| Config validation | Validate config.ini on startup | config.py | -| Runtime reconfiguration | Reload config without restart (SIGHUP) | proxywatchd.py | -| Sensible defaults | Document and improve default values | config.py | - -### 4.3 Proxy Source Expansion - -| Task | Description | File(s) | -|------|-------------|---------| -| Additional scrapers | Support more search engines beyond Searx | scraper.py | -| API sources | Integrate free proxy API endpoints | new: api_sources.py | -| Import formats | Support various proxy list formats | ppf.py | - ---- - -## Implementation Priority - -``` -┌─────────────────────────────────────────────────────────────────────────────┐ -│ Priority Matrix │ -├──────────────────────────┬──────────────────────────────────────────────────┤ -│ HIGH IMPACT / LOW EFFORT │ HIGH IMPACT / HIGH EFFORT │ -│ │ │ -│ [x] Unify _known_proxies │ [x] Connection pooling │ -│ [x] Graceful DB errors │ [x] Dynamic thread scaling │ -│ [x] Batch inserts │ [x] Unit test infrastructure │ -│ [x] WAL mode for SQLite │ [x] Latency tracking │ -│ │ │ -├──────────────────────────┼──────────────────────────────────────────────────┤ -│ LOW IMPACT / LOW EFFORT │ LOW IMPACT / HIGH EFFORT │ -│ │ │ -│ [x] Standardize logging │ [x] Geographic validation │ -│ [x] Config validation │ [x] Additional scrapers (Bing, Yahoo, Mojeek) │ -│ [x] Export functionality │ [ ] API sources │ -│ [x] Status output │ [ ] Protocol fingerprinting │ -│ │ │ -└──────────────────────────┴──────────────────────────────────────────────────┘ -``` - ---- - -## Completed Work - -### Multi-Target Validation (Done) -- [x] Work-stealing queue with shared Queue.Queue() -- [x] Multi-target validation (2/3 majority voting) -- [x] Interleaved testing (jobs shuffled across proxies) -- [x] ProxyTestState and TargetTestJob classes - -### Code Cleanup (Done) -- [x] Removed 
dead HTTP server code from ppf.py -- [x] Removed dead gumbo code from soup_parser.py -- [x] Removed test code from comboparse.py -- [x] Removed unused functions from misc.py -- [x] Fixed IP/port cleansing in ppf.py extract_proxies() -- [x] Updated .gitignore, removed .pyc files - -### Database Optimization (Done) -- [x] Enable SQLite WAL mode for better concurrency -- [x] Add indexes for common query patterns (failed, tested, proto, error, check_time) -- [x] Optimize batch inserts (remove redundant SELECT before INSERT OR IGNORE) - -### Dependency Reduction (Done) -- [x] Make lxml optional (removed from requirements) -- [x] Make IP2Location optional (graceful fallback) -- [x] Add --nobs flag for stdlib HTMLParser fallback (bs4 optional) - -### Rate Limiting & Stability (Done) -- [x] InstanceTracker class in scraper.py with exponential backoff -- [x] Configurable backoff_base, backoff_max, fail_threshold -- [x] Exception logging with context (replaced bare except blocks) -- [x] Unified _known_proxies cache in fetch.py - -### Monitoring & Maintenance (Done) -- [x] Stats class in proxywatchd.py (tested/passed/failed tracking) -- [x] Periodic stats reporting (configurable stats_interval) -- [x] Stale proxy cleanup (cleanup_stale() with configurable stale_days) -- [x] Timeout config options (timeout_connect, timeout_read) - -### Connection Pooling (Done) -- [x] TorHostState class tracking per-host health and latency -- [x] TorConnectionPool with worker affinity for circuit reuse -- [x] Exponential backoff (5s, 10s, 20s, 40s, max 60s) on failures -- [x] Pool warmup and health status reporting - -### Priority Queue (Done) -- [x] PriorityJobQueue class with heap-based ordering -- [x] calculate_priority() assigns priority 0-4 by proxy state -- [x] New proxies tested first, high-fail proxies last - -### Dynamic Thread Scaling (Done) -- [x] ThreadScaler class adjusts thread count dynamically -- [x] Scales up when queue deep and success rate acceptable -- [x] Scales down when queue shallow or success rate drops -- [x] Respects min/max bounds with cooldown period - -### Latency Tracking (Done) -- [x] avg_latency, latency_samples columns in proxylist -- [x] Exponential moving average calculation -- [x] Migration function for existing databases -- [x] Latency recorded for successful proxy tests - -### Container Support (Done) -- [x] Dockerfile with Python 2.7-slim base -- [x] docker-compose.yml for local development -- [x] Rootless podman deployment documentation -- [x] Volume mounts for persistent data - -### Code Style (Done) -- [x] Normalized indentation (4-space, no tabs) -- [x] Removed dead code and unused imports -- [x] Added docstrings to classes and functions -- [x] Python 2/3 compatible imports (Queue/queue) - -### Geographic Validation (Done) -- [x] IP2Location integration for country lookup -- [x] pyasn integration for ASN lookup -- [x] Graceful fallback when database files missing -- [x] Country codes displayed in test output: `(US)`, `(IN)`, etc. 
-- [x] Data files: IP2LOCATION-LITE-DB1.BIN, ipasn.dat - -### SSL Proxy Testing (Done) -- [x] Default checktype changed to 'ssl' -- [x] ssl_targets list with major HTTPS sites -- [x] TLS handshake validation with certificate verification -- [x] Detects MITM proxies that intercept SSL connections - -### Export Functionality (Done) -- [x] export.py CLI tool for exporting working proxies -- [x] Multiple formats: txt, json, csv, len (length-prefixed) -- [x] Filters: proto, country, anonymity, max_latency -- [x] Sort options: latency, added, tested, success -- [x] Output to stdout or file - -### Web Dashboard (Done) -- [x] /dashboard endpoint with dark theme HTML UI -- [x] /api/stats endpoint for JSON runtime statistics -- [x] Auto-refresh with JavaScript fetch every 3 seconds -- [x] Stats provider callback from proxywatchd.py to httpd.py -- [x] Displays: tested/passed/success rate, thread count, uptime -- [x] Tor pool health: per-host latency, success rate, availability -- [x] Failure categories breakdown: timeout, proxy, ssl, closed - -### Dashboard Enhancements v2 (Done) -- [x] Prominent check type badge in header (SSL/JUDGES/HTTP/IRC) -- [x] System monitor bar: load, memory, disk, process RSS -- [x] Anonymity breakdown: elite/anonymous/transparent counts -- [x] Database health: size, tested/hour, added/day, dead count -- [x] Enhanced Tor pool stats: requests, success rate, healthy nodes, latency -- [x] SQLite ANALYZE/VACUUM functions for query optimization -- [x] Lightweight design: client-side polling, minimal DOM updates - -### Dashboard Enhancements v3 (Done) -- [x] Electric cyan theme with translucent glass-morphism effects -- [x] Unified wrapper styling (.chart-wrap, .histo-wrap, .stats-wrap, .lb-wrap, .pie-wrap) -- [x] Consistent backdrop-filter blur and electric glow borders -- [x] Tor Exit Nodes cards with hover effects (.tor-card) -- [x] Lighter background/tile color scheme (#1e2738 bg, #181f2a card) -- [x] Map endpoint restyled to match dashboard (electric cyan theme) -- [x] Map markers updated from gold to cyan for approximate locations - -### Memory Profiling & Analysis (Done) -- [x] /api/memory endpoint with comprehensive memory stats -- [x] objgraph integration for object type counting -- [x] pympler integration for memory summaries -- [x] Memory sample history tracking (RSS over time) -- [x] Process memory from /proc/self/status (VmRSS, VmPeak, VmData, etc.) 
-- [x] GC statistics (collections, objects, thresholds) - -### MITM Detection Optimization (Done) -- [x] MITM re-test skip optimization - avoid redundant SSL checks for known MITM proxies -- [x] mitm_retest_skipped stats counter for tracking optimization effectiveness -- [x] Content hash deduplication for stale proxy list detection -- [x] stale_count reset when content hash changes - -### Distributed Workers (Done) -- [x] Worker registration and heartbeat system -- [x] /api/workers endpoint for worker status monitoring -- [x] Tor connectivity check before workers claim work -- [x] Worker test rate tracking with sliding window calculation -- [x] Combined rate aggregation across all workers -- [x] Dashboard worker cards showing per-worker stats - -### Dashboard Performance (Done) -- [x] Keyboard shortcuts: r=refresh, 1-9=tabs, t=theme, p=pause -- [x] Tab-aware chart rendering - skip expensive renders for hidden tabs -- [x] Visibility API - pause polling when browser tab hidden -- [x] Dark/muted-dark/light theme cycling -- [x] Stats export endpoint (/api/stats/export?format=json|csv) - -### Proxy Validation Cache (Done) -- [x] LRU cache for is_usable_proxy() using OrderedDict -- [x] Thread-safe with lock for concurrent access -- [x] Proper LRU eviction (move_to_end on hits, popitem oldest when full) - -### Database Context Manager (Done) -- [x] Refactored all DB operations to use `_db_context()` context manager -- [x] Connections guaranteed to close even on exceptions -- [x] Removed deprecated `_prep_db()` and `_close_db()` methods -- [x] `fetch_rows()` now accepts db parameter for cleaner dependency injection - -### Additional Search Engines (Done) -- [x] Bing and Yahoo engine implementations in scraper.py -- [x] Engine rotation for rate limit avoidance -- [x] engines.py module with SearchEngine base class - -### Worker Health Improvements (Done) -- [x] Tor connectivity check before workers claim work -- [x] Fixed interval Tor check (30s) instead of exponential backoff -- [x] Graceful handling when Tor unavailable - -### Memory Optimization (Done) -- [x] `__slots__` on ProxyTestState (27 attrs) and TargetTestJob (4 attrs) -- [x] Reduced per-object memory overhead for hot path objects - ---- - -## Technical Debt - -| Item | Description | Risk | -|------|-------------|------| -| ~~Dual _known_proxies~~ | ~~ppf.py and fetch.py maintain separate caches~~ | **Resolved** | -| Global config in fetch.py | set_config() pattern is fragile | Low - works but not clean | -| ~~No input validation~~ | ~~Proxy strings parsed without validation~~ | **Resolved** | -| ~~Silent exception catching~~ | ~~Some except: pass patterns hide errors~~ | **Resolved** | -| ~~Hardcoded timeouts~~ | ~~Various timeout values scattered in code~~ | **Resolved** | - --- ## File Reference -| File | Purpose | Status | -|------|---------|--------| -| ppf.py | Main URL harvester daemon | Active, cleaned | -| proxywatchd.py | Proxy validation daemon | Active, enhanced | -| scraper.py | Searx search integration | Active, cleaned | -| fetch.py | HTTP fetching with proxy support | Active, LRU cache | -| dbs.py | Database schema and inserts | Active | -| mysqlite.py | SQLite wrapper | Active | -| rocksock.py | Socket/proxy abstraction (3rd party) | Stable | -| http2.py | HTTP client implementation | Stable | -| httpd.py | Web dashboard and REST API server | Active, enhanced | -| config.py | Configuration management | Active | -| comboparse.py | Config/arg parser framework | Stable, cleaned | -| soup_parser.py | BeautifulSoup wrapper | 
Stable, cleaned | -| misc.py | Utilities (timestamp, logging) | Stable, cleaned | -| export.py | Proxy export CLI tool | Active | -| engines.py | Search engine implementations | Active | -| connection_pool.py | Tor connection pooling | Active | -| network_stats.py | Network statistics tracking | Active | -| dns.py | DNS resolution with caching | Active | -| mitm.py | MITM certificate detection | Active | -| job.py | Priority job queue | Active | -| static/dashboard.js | Dashboard frontend logic | Active, enhanced | -| static/dashboard.html | Dashboard HTML template | Active | +| File | Purpose | +|------|---------| +| ppf.py | Main URL harvester daemon | +| proxywatchd.py | Proxy validation daemon | +| scraper.py | Searx search integration | +| fetch.py | HTTP fetching with proxy support | +| dbs.py | Database schema and inserts | +| mysqlite.py | SQLite wrapper | +| rocksock.py | Socket/proxy abstraction (3rd party) | +| http2.py | HTTP client implementation | +| httpd.py | Web dashboard and REST API server | +| config.py | Configuration management | +| comboparse.py | Config/arg parser framework | +| soup_parser.py | BeautifulSoup wrapper | +| misc.py | Utilities (timestamp, logging) | +| export.py | Proxy export CLI tool | +| engines.py | Search engine implementations | +| connection_pool.py | Tor connection pooling | +| network_stats.py | Network statistics tracking | +| dns.py | DNS resolution with caching | +| mitm.py | MITM certificate detection | +| job.py | Priority job queue | +| static/dashboard.js | Dashboard frontend logic | +| static/dashboard.html | Dashboard HTML template | diff --git a/TODO.md b/TODO.md index 2108387..b8ac101 100644 --- a/TODO.md +++ b/TODO.md @@ -1,866 +1,73 @@ -# PPF Implementation Tasks +# PPF TODO -## Legend +## Optimization -``` -[ ] Not started -[~] In progress -[x] Completed -[!] Blocked/needs discussion -``` +### [ ] JSON Stats Response Caching ---- - -## Immediate Priority (Next Sprint) - -### [x] 1. Unify _known_proxies Cache - -**Completed.** Added `init_known_proxies()`, `add_known_proxies()`, `is_known_proxy()` -to fetch.py. Updated ppf.py to use these functions instead of local cache. - ---- - -### [x] 2. Graceful SQLite Error Handling - -**Completed.** mysqlite.py now retries on "locked" errors with exponential backoff. - ---- - -### [x] 3. Enable SQLite WAL Mode - -**Completed.** mysqlite.py enables WAL mode and NORMAL synchronous on init. - ---- - -### [x] 4. Batch Database Inserts - -**Completed.** dbs.py uses executemany() for batch inserts. - ---- - -### [x] 5. Add Database Indexes - -**Completed.** dbs.py creates indexes on failed, tested, proto, error, check_time. - ---- - -## Short Term (This Month) - -### [x] 6. Log Level Filtering - -**Completed.** Added log level filtering with -q/--quiet and -v/--verbose CLI flags. -- misc.py: LOG_LEVELS dict, set_log_level(), get_log_level() -- config.py: Added -q/--quiet and -v/--verbose arguments -- Log levels: debug=0, info=1, warn=2, error=3 -- --quiet: only show warn/error -- --verbose: show debug messages - ---- - -### [x] 7. Connection Timeout Standardization - -**Completed.** Added timeout_connect and timeout_read to [common] section in config.py. - ---- - -### [x] 8. Failure Categorization - -**Completed.** Added failure categorization for proxy errors. 
-- misc.py: categorize_error() function, FAIL_* constants -- Categories: timeout, refused, auth, unreachable, dns, ssl, closed, proxy, other -- proxywatchd.py: Stats.record() now accepts category parameter -- Stats.report() shows failure breakdown by category -- ProxyTestState.evaluate() returns (success, category) tuple - ---- - -### [x] 9. Priority Queue for Proxy Testing - -**Completed.** Added priority-based job scheduling for proxy tests. -- PriorityJobQueue class with heap-based ordering -- calculate_priority() assigns priority 0-4 based on proxy state -- Priority 0: New proxies (never tested) -- Priority 1: Working proxies (no failures) -- Priority 2: Low fail count (< 3) -- Priority 3-4: Medium/high fail count -- Integrated into prepare_jobs() for automatic prioritization - ---- - -### [x] 10. Periodic Statistics Output - -**Completed.** Added Stats class to proxywatchd.py with record(), should_report(), -and report() methods. Integrated into main loop with configurable stats_interval. - ---- - -## Medium Term (Next Quarter) - -### [x] 11. Tor Connection Pooling - -**Completed.** Added connection pooling with worker-Tor affinity and health monitoring. -- connection_pool.py: TorHostState class tracks per-host health, latency, backoff -- connection_pool.py: TorConnectionPool with worker affinity, warmup, statistics -- proxywatchd.py: Workers get consistent Tor host assignment for circuit reuse -- Success/failure tracking with exponential backoff (5s, 10s, 20s, 40s, max 60s) -- Latency tracking with rolling averages -- Pool status reported alongside periodic stats - ---- - -### [x] 12. Dynamic Thread Scaling - -**Completed.** Added dynamic thread scaling based on queue depth and success rate. -- ThreadScaler class in proxywatchd.py with should_scale(), status_line() -- Scales up when queue is deep (2x target) and success rate > 10% -- Scales down when queue is shallow or success rate drops -- Min/max threads derived from config.watchd.threads (1/4x to 2x) -- 30-second cooldown between scaling decisions -- _spawn_thread(), _remove_thread(), _adjust_threads() helper methods -- Scaler status reported alongside periodic stats - ---- - -### [x] 13. Latency Tracking - -**Completed.** Added per-proxy latency tracking with exponential moving average. -- dbs.py: avg_latency, latency_samples columns added to proxylist schema -- dbs.py: _migrate_latency_columns() for backward-compatible migration -- dbs.py: update_proxy_latency() with EMA (alpha = 2/(samples+1)) -- proxywatchd.py: ProxyTestState.last_latency_ms field -- proxywatchd.py: evaluate() calculates average latency from successful tests -- proxywatchd.py: submit_collected() records latency for passing proxies - ---- - -### [x] 14. Export Functionality - -**Completed.** Added export.py CLI tool for exporting working proxies. -- Formats: txt (default), json, csv, len (length-prefixed) -- Filters: --proto, --country, --anonymity, --max-latency -- Options: --sort (latency, added, tested, success), --limit, --pretty -- Output: stdout or --output file -- Usage: `python export.py --proto http --country US --sort latency --limit 100` - ---- - -### [x] 15. Unit Test Infrastructure - -**Completed.** Added pytest-based test suite with comprehensive coverage. -- tests/conftest.py: Shared fixtures (temp_db, proxy_db, sample_proxies, etc.) 
-- tests/test_dbs.py: 40 tests for database operations (CDN filtering, latency, stats) -- tests/test_fetch.py: 60 tests for proxy validation (skipped in Python 3) -- tests/test_misc.py: 39 tests for utilities (timestamp, log levels, SSL errors) -- tests/mock_network.py: Network mocking infrastructure - -``` -Test Results: 79 passed, 60 skipped (Python 2 only) -Run with: python3 -m pytest tests/ -v -``` - ---- - -## Long Term (Future) - -### [x] 16. Geographic Validation - -**Completed.** Added IP2Location and pyasn for proxy geolocation. -- requirements.txt: Added IP2Location package -- proxywatchd.py: IP2Location for country lookup, pyasn for ASN lookup -- proxywatchd.py: Fixed ValueError handling when database files missing -- data/: IP2LOCATION-LITE-DB1.BIN (2.7M), ipasn.dat (23M) -- Output shows country codes: `http://1.2.3.4:8080 (US)` or `(IN)`, `(DE)`, etc. - ---- - -### [x] 17. SSL Proxy Testing - -**Completed.** Added SSL checktype for TLS handshake validation. -- config.py: Default checktype changed to 'ssl' -- proxywatchd.py: ssl_targets list with major HTTPS sites -- Validates TLS handshake with certificate verification -- Detects MITM proxies that intercept SSL connections - -### [x] 18. Additional Search Engines - -**Completed.** Added modular search engine architecture. -- engines.py: SearchEngine base class with build_url(), extract_urls(), is_rate_limited() -- Engines: DuckDuckGo, Startpage, Mojeek (UK), Qwant (FR), Yandex (RU), Ecosia, Brave -- Git hosters: GitHub, GitLab, Codeberg, Gitea -- scraper.py: EngineTracker class for multi-engine rate limiting -- Config: [scraper] engines, max_pages settings -- searx.instances: Updated with 51 active SearXNG instances - -### [x] 19. REST API - -**Completed.** Added HTTP API server for querying working proxies. -- httpd.py: ProxyAPIServer class with BaseHTTPServer -- Endpoints: /proxies, /proxies/count, /health -- Params: limit, proto, country, format (json/plain) -- Integrated into proxywatchd.py (starts when httpd.enabled=True) -- Config: [httpd] section with listenip, port, enabled - -### [x] 20. Web Dashboard - -**Completed.** Added web dashboard with live statistics. -- httpd.py: DASHBOARD_HTML template with dark theme UI -- Endpoint: /dashboard (HTML page with auto-refresh) -- Endpoint: /api/stats (JSON runtime statistics) -- Stats include: tested/passed counts, success rate, thread count, uptime -- Tor pool health: per-host latency, success rate, availability -- Failure categories: timeout, proxy, ssl, closed, etc. -- proxywatchd.py: get_runtime_stats() method provides stats callback - -### [x] 21. Dashboard Enhancements (v2) - -**Completed.** Major dashboard improvements for better visibility. -- Prominent check type badge in header (SSL/JUDGES/HTTP/IRC with color coding) -- System monitor bar: load average, memory usage, disk usage, process RSS -- Anonymity breakdown: elite/anonymous/transparent proxy counts -- Database health indicators: size, tested/hour, added/day, dead count -- Enhanced Tor pool: total requests, success rate, healthy nodes, avg latency -- SQLite ANALYZE/VACUUM functions for query optimization (dbs.py) -- Database statistics API (get_database_stats()) - -### [x] 22. Completion Queue Optimization - -**Completed.** Eliminated polling bottleneck in proxy test collection. 
-- Added `completion_queue` for event-driven state signaling -- `ProxyTestState.record_result()` signals when all targets complete -- `collect_work()` drains queue instead of polling all pending states -- Changed `pending_states` from list to dict for O(1) removal -- Result: `is_complete()` eliminated from hot path, `collect_work()` 54x faster - ---- - -### [x] 23. Batch API Endpoint - -**Completed.** Added `/api/dashboard` batch endpoint combining stats, workers, and countries. - -**Implementation:** -- `httpd.py`: New `/api/dashboard` endpoint returns combined data in single response -- `httpd.py`: Refactored `/api/workers` to use `_get_workers_data()` helper method -- `dashboard.js`: Updated `fetchStats()` to use batch endpoint instead of multiple calls - -**Response Structure:** -```json -{ - "stats": { /* same as /api/stats */ }, - "workers": { /* same as /api/workers */ }, - "countries": { /* same as /api/countries */ } -} -``` - -**Benefit:** -- Reduces dashboard polling from 2 HTTP requests to 1 per poll cycle -- Lower RTT impact over SSH tunnels and high-latency connections -- Single database connection serves all data - ---- - -## Profiling-Based Performance Optimizations - -**Baseline:** 30-minute profiling session, 25.6M function calls, 1842s runtime - -The following optimizations were identified through cProfile analysis. Each is -assessed for real-world impact based on measured data. - -### [x] 1. SQLite Query Batching - -**Completed.** Added batch update functions and optimized submit_collected(). - -**Implementation:** -- `batch_update_proxy_latency()`: Single SELECT with IN clause, compute EMA in Python, - batch UPDATE with executemany() -- `batch_update_proxy_anonymity()`: Batch all anonymity updates in single executemany() -- `submit_collected()`: Uses batch functions instead of per-proxy loops - -**Previous State:** -- 18,182 execute() calls consuming 50.6s (2.7% of runtime) -- Individual UPDATE for each proxy latency and anonymity - -**Improvement:** -- Reduced from N execute() + N commit() to 1 SELECT + 1 executemany() per batch -- Estimated 15-25% reduction in SQLite overhead - ---- - -### [x] 2. Proxy Validation Caching - -**Completed.** Converted is_usable_proxy() cache to proper LRU with OrderedDict. - -**Implementation:** -- fetch.py: Changed _proxy_valid_cache from dict to OrderedDict -- Added thread-safe _proxy_valid_cache_lock -- move_to_end() on cache hits to maintain LRU order -- Evict oldest entries when cache reaches max size (10,000) -- Proper LRU eviction instead of stopping inserts when full - ---- - -### [x] 3. Regex Pattern Pre-compilation - -**Completed.** Pre-compiled proxy extraction pattern at module load. - -**Implementation:** -- `fetch.py`: Added `PROXY_PATTERN = re.compile(r'...')` at module level -- `extract_proxies()`: Changed `re.findall(pattern, ...)` to `PROXY_PATTERN.findall(...)` -- Pattern compiled once at import, not on each call - -**Previous State:** -- `extract_proxies()`: 166 calls, 2.87s total (17.3ms each) -- Pattern recompiled on each call - -**Improvement:** -- Eliminated per-call regex compilation overhead -- Estimated 30-50% reduction in extract_proxies() time - ---- - -### [ ] 4. 
JSON Stats Response Caching - -**Current State:** -- 1.9M calls to JSON encoder functions -- `_iterencode_dict`: 1.4s, `_iterencode_list`: 0.8s -- Dashboard polls every 3 seconds = 600 requests per 30min -- Most stats data unchanged between requests - -**Proposed Change:** -- Cache serialized JSON response with short TTL (1-2 seconds) +- Cache serialized JSON response with short TTL (1-2s) - Only regenerate when underlying stats change - Use ETag/If-None-Match for client-side caching +- Savings: ~7-9s/hour. Low priority, only matters with frequent dashboard access. -**Assessment:** -``` -Current cost: ~5.5s per 30min (JSON encoding overhead) -Potential saving: 60-80% = 3.3-4.4s per 30min = 6.6-8.8s/hour -Effort: Medium (add caching layer to httpd.py) -Risk: Low (stale stats for 1-2 seconds acceptable) -``` +### [ ] Object Pooling for Test States -**Verdict:** LOW PRIORITY. Only matters with frequent dashboard access. +- Pool ProxyTestState and TargetTestJob, reset and reuse +- Savings: ~11-15s/hour. **Not recommended** - high effort, medium risk, modest gain. + +### [ ] SQLite Connection Reuse + +- Persistent connection per thread with health checks +- Savings: ~0.3s/hour. **Not recommended** - negligible benefit. --- -### [ ] 5. Object Pooling for Test States +## Dashboard -**Current State:** -- `__new__` calls: 43,413 at 10.1s total -- `ProxyTestState.__init__`: 18,150 calls, 0.87s -- `TargetTestJob` creation: similar overhead -- Objects created and discarded each test cycle +### [ ] Performance -**Proposed Change:** -- Implement object pool for ProxyTestState and TargetTestJob -- Reset and reuse objects instead of creating new -- Pool size: 2x thread count +- Cache expensive DB queries (top countries, protocol breakdown) +- Lazy-load historical data (only when scrolled into view) +- WebSocket option for push updates (reduce polling overhead) +- Configurable refresh interval via URL param or localStorage -**Assessment:** -``` -Current cost: ~11s per 30min = 22s/hour = 14.7min/day -Potential saving: 50-70% = 5.5-7.7s per 30min = 11-15s/hour = 7-10min/day -Effort: High (significant refactoring, reset logic needed) -Risk: Medium (state leakage bugs if reset incomplete) -``` +### [ ] Features -**Verdict:** NOT RECOMMENDED. High effort, medium risk, modest gain. -Python's object creation is already optimized. Focus elsewhere. +- Historical graphs (24h, 7d) using stats_history table +- Per-ASN performance analysis +- Alert thresholds (success rate < X%, MITM detected) +- Mobile-responsive improvements --- -### [ ] 6. SQLite Connection Reuse +## Memory -**Current State:** -- 718 connection opens in 30min session -- Each open: 0.26ms (total 0.18s for connects) -- Connection per operation pattern in mysqlite.py - -**Proposed Change:** -- Maintain persistent connection per thread -- Implement connection pool with health checks -- Reuse connections across operations - -**Assessment:** -``` -Current cost: 0.18s per 30min (connection overhead only) -Potential saving: 90% = 0.16s per 30min = 0.32s/hour -Effort: Medium (thread-local storage, lifecycle management) -Risk: Medium (connection state, locking issues) -``` - -**Verdict:** NOT RECOMMENDED. Negligible time savings (0.16s per 30min). -SQLite's lightweight connections don't justify pooling complexity. 
- ---- - -### Summary: Optimization Priority Matrix - -``` -┌─────────────────────────────────────┬────────┬────────┬─────────┬───────────┐ -│ Optimization │ Effort │ Risk │ Savings │ Status -├─────────────────────────────────────┼────────┼────────┼─────────┼───────────┤ -│ 1. SQLite Query Batching │ Low │ Low │ 20-34s/h│ DONE -│ 2. Proxy Validation Caching │ V.Low │ None │ 5-8s/h │ DONE -│ 3. Regex Pre-compilation │ Low │ None │ 5-8s/h │ DONE -│ 4. JSON Response Caching │ Medium │ Low │ 7-9s/h │ Later -│ 5. Object Pooling │ High │ Medium │ 11-15s/h│ Skip -│ 6. SQLite Connection Reuse │ Medium │ Medium │ 0.3s/h │ Skip -└─────────────────────────────────────┴────────┴────────┴─────────┴───────────┘ - -Completed: 1 (SQLite Batching), 2 (Proxy Caching), 3 (Regex Pre-compilation) -Remaining: 4 (JSON Caching - Later) - -Realized savings from completed optimizations: - Per hour: 25-42 seconds saved - Per day: 10-17 minutes saved - Per week: 1.2-2.0 hours saved - -Note: 68.7% of runtime is socket I/O (recv/send) which cannot be optimized -without changing the fundamental network architecture. The optimizations -above target the remaining 31.3% of CPU-bound operations. -``` - ---- - -## Potential Dashboard Improvements - -### [ ] Dashboard Performance Optimizations - -**Goal:** Ensure dashboard remains lightweight and doesn't impact system performance. - -**Current safeguards:** -- No polling on server side (client-initiated via fetch) -- 3-second refresh interval (configurable) -- Minimal DOM updates (targeted element updates, not full re-render) -- Static CSS/JS (no server-side templating per request) -- No persistent connections (stateless HTTP) - -**Future considerations:** -- [x] Add rate limiting on /api/stats endpoint (300 req/60s sliding window) -- [ ] Cache expensive DB queries (top countries, protocol breakdown) -- [ ] Lazy-load historical data (only when scrolled into view) -- [ ] WebSocket option for push updates (reduce polling overhead) -- [ ] Configurable refresh interval via URL param or localStorage -- [x] Pause polling when browser tab not visible (Page Visibility API) -- [x] Skip chart rendering for inactive dashboard tabs (reduces CPU) -- [ ] Batch API endpoints - combine /api/stats, /api/workers, /api/countries into - single /api/dashboard call to reduce round-trips (helps SSH tunnel latency) - -### [ ] Dashboard Feature Ideas - -**Low priority - consider when time permits:** -- [x] Geographic map visualization - /map endpoint with Leaflet.js -- [x] Dark/light/muted theme toggle - t key cycles themes -- [x] Export stats as CSV/JSON from dashboard (/api/stats/export?format=json|csv) -- [ ] Historical graphs (24h, 7d) using stats_history table -- [ ] Per-ASN performance analysis -- [ ] Alert thresholds (success rate < X%, MITM detected) -- [ ] Mobile-responsive improvements -- [x] Keyboard shortcuts (r=refresh, 1-9=tabs, t=theme, p=pause) - -### [x] Local JS Library Serving - -**Completed.** All JavaScript libraries now served locally from /static/lib/ endpoint. 
- -**Bundled libraries (static/lib/):** -- Leaflet.js 1.9.4 (leaflet.js, leaflet.css) -- Leaflet.markercluster (MarkerCluster.Default.css) -- Chart.js 4.x (chart.umd.min.js) -- uPlot (uPlot.iife.min.js, uPlot.min.css) - -**Candidate libraries for future enhancements:** - -``` -┌─────────────────┬─────────┬───────────────────────────────────────────────┐ -│ Library │ Size │ Use Case -├─────────────────┼─────────┼───────────────────────────────────────────────┤ -│ Chart.js │ 65 KB │ Line/bar/pie charts (simpler API than D3) -│ uPlot │ 15 KB │ Fast time-series charts (minimal, performant) -│ ApexCharts │ 125 KB │ Modern charts with animations -│ Frappe Charts │ 25 KB │ Simple, modern SVG charts -│ Sparkline │ 2 KB │ Tiny inline charts (already have custom impl) -├─────────────────┼─────────┼───────────────────────────────────────────────┤ -│ D3.js │ 85 KB │ Full control, complex visualizations -│ D3-geo │ 30 KB │ Geographic projections (alternative to Leaflet) -├─────────────────┼─────────┼───────────────────────────────────────────────┤ -│ Leaflet │ 40 KB │ Interactive maps (already using) -│ Leaflet.heat │ 5 KB │ Heatmap layer for proxy density -│ Leaflet.cluster │ 10 KB │ Marker clustering for many points -└─────────────────┴─────────┴───────────────────────────────────────────────┘ - -Recommendations: - ● uPlot - Best for time-series (rate history, success rate history) - ● Chart.js - Best for pie/bar charts (failure breakdown, protocol stats) - ● Leaflet - Keep for maps, add heatmap plugin for density viz -``` - -**Current custom implementations (no library):** -- Sparkline charts (Test Rate History, Success Rate History) - inline SVG -- Histogram bars (Response Time Distribution) - CSS divs -- Pie charts (Failure Breakdown, Protocol Stats) - CSS conic-gradient - -**Decision:** Current custom implementations are lightweight and sufficient. -Add libraries only when custom becomes unmaintainable or new features needed. - -### [ ] Memory Optimization Candidates - -**Based on memory analysis (production metrics):** -``` -Current State (260k queue): - Start RSS: 442 MB - Current RSS: 1,615 MB - Per-job: ~4.5 KB overhead - -Object Distribution: - 259,863 TargetTestJob (1 per job) - 259,863 ProxyTestState (1 per job) - 259,950 LockType (1 per job - threading locks) - 523,395 dict (2 per job - state + metadata) - 522,807 list (2 per job - results + targets) -``` - -**Potential optimizations:** - [ ] Lock consolidation - reduce per-proxy locks (260k LockType objects) - [ ] Leaner state objects - reduce dict/list count per job -- [x] Slot-based classes - use `__slots__` on ProxyTestState (27 attrs), TargetTestJob (4 attrs) -- [ ] Object pooling - reuse ProxyTestState/TargetTestJob objects (not recommended) -**Verdict:** Memory scales linearly with queue (~4.5 KB/job). No leaks detected. -Current usage acceptable for production workloads. Optimize only if memory -becomes a constraint. +Memory scales linearly with queue (~4.5 KB/job). No leaks detected. +Optimize only if memory becomes a constraint. --- -## Completed +## Known Issues -### [x] Work-Stealing Queue -- Implemented shared Queue.Queue() for job distribution -- Workers pull from shared queue instead of pre-assigned lists -- Better utilization across threads +### [!] 
Podman Container Metadata Disappears -### [x] Multi-Target Validation -- Test each proxy against 3 random targets -- 2/3 majority required for success -- Reduces false negatives from single target failures - -### [x] Interleaved Testing -- Jobs shuffled across all proxies before queueing -- Prevents burst of 3 connections to same proxy -- ProxyTestState accumulates results from TargetTestJobs - -### [x] Code Cleanup -- Removed 93 lines dead HTTP server code (ppf.py) -- Removed dead gumbo parser (soup_parser.py) -- Removed test code (comboparse.py) -- Removed unused functions (misc.py) -- Fixed IP/port cleansing (ppf.py) -- Updated .gitignore - -### [x] Rate Limiting & Instance Tracking (scraper.py) -- InstanceTracker class with exponential backoff -- Configurable backoff_base, backoff_max, fail_threshold -- Instance cycling when rate limited - -### [x] Exception Logging with Context -- Replaced bare `except:` with typed exceptions across all files -- Added context logging to exception handlers (e.g., URL, error message) - -### [x] Timeout Standardization -- Added timeout_connect, timeout_read to [common] config section -- Added stale_days, stats_interval to [watchd] config section - -### [x] Periodic Stats & Stale Cleanup (proxywatchd.py) -- Stats class tracks tested/passed/failed with thread-safe counters -- Configurable stats_interval (default: 300s) -- cleanup_stale() removes dead proxies older than stale_days (default: 30) - -### [x] Unified Proxy Cache -- Moved _known_proxies to fetch.py with helper functions -- init_known_proxies(), add_known_proxies(), is_known_proxy() -- ppf.py now uses shared cache via fetch module - -### [x] Config Validation -- config.py: validate() method checks config values on startup -- Validates: port ranges, timeout values, thread counts, engine names -- Warns on missing source_file, unknown engines -- Errors on unwritable database directories -- Integrated into ppf.py, proxywatchd.py, scraper.py main entry points - -### [x] Profiling Support -- config.py: Added --profile CLI argument -- ppf.py: Refactored main logic into main() function -- ppf.py: cProfile wrapper with stats output to profile.stats -- Prints top 20 functions by cumulative time on exit -- Usage: `python2 ppf.py --profile` - -### [x] SIGTERM Graceful Shutdown -- ppf.py: Added signal handler converting SIGTERM to KeyboardInterrupt -- Ensures profile stats are written before container exit -- Allows clean thread shutdown in containerized environments -- Podman stop now triggers proper cleanup instead of SIGKILL - -### [x] Unicode Exception Handling (Python 2) -- Problem: `repr(e)` on exceptions with unicode content caused encoding errors -- Files affected: ppf.py, scraper.py (3 exception handlers) -- Solution: Check `isinstance(err_msg, unicode)` then encode with 'backslashreplace' -- Pattern applied: - ```python - try: - err_msg = repr(e) - if isinstance(err_msg, unicode): - err_msg = err_msg.encode('ascii', 'backslashreplace') - except: - err_msg = type(e).__name__ - ``` -- Handles Korean/CJK characters in search queries without crashing - -### [x] Interactive World Map (/map endpoint) -- Added Leaflet.js interactive map showing proxy distribution by country -- Modern glassmorphism UI with `backdrop-filter: blur(12px)` -- CartoDB dark tiles for dark theme -- Circle markers sized proportionally to proxy count per country -- Hover effects with smooth transitions -- Stats overlay showing total countries/proxies -- Legend with proxy count scale -- Country coordinates and names lookup tables - 
-### [x] Dashboard v3 - Electric Cyan Theme -- Translucent glass-morphism effects with `backdrop-filter: blur()` -- Electric cyan glow borders `rgba(56,189,248,...)` on all graph wrappers -- Gradient overlays using `::before` pseudo-elements -- Unified styling across: .chart-wrap, .histo-wrap, .stats-wrap, .lb-wrap, .pie-wrap -- New .tor-card wrapper for Tor Exit Nodes with hover effects -- Lighter background color scheme (#1e2738 bg, #181f2a card) - -### [x] Map Endpoint Styling Update -- Converted from gold/bronze theme (#c8b48c) to electric cyan (#38bdf8) -- Glass panels with electric glow matching dashboard -- Map markers for approximate locations now cyan instead of gold -- Unified map_bg color with dashboard background (#1e2738) -- Updated Leaflet controls, popups, and legend to cyan theme - -### [x] MITM Re-test Optimization -- Skip redundant SSL checks for proxies already known to be MITM -- Added `mitm_retest_skipped` counter to Stats class -- Optimization in `_try_ssl_check()` checks existing MITM flag before testing -- Avoids 6k+ unnecessary re-tests per session (based on production metrics) - -### [x] Memory Profiling Endpoint -- /api/memory endpoint with comprehensive memory analysis -- objgraph integration for object type distribution -- pympler integration for memory summaries -- Memory sample history tracking (RSS over time) -- Process memory from /proc/self/status -- GC statistics and collection counts - -### [x] Database Context Manager Refactoring -- Refactored all DB operations to use `_db_context()` context manager -- `prepare_jobs()`, `submit_collected()`, `_run()` now use `with self._db_context() as db:` -- `fetch_rows()` accepts db parameter for dependency injection -- Removed deprecated `_prep_db()` and `_close_db()` methods -- Connections guaranteed to close even on exceptions +`podman ps -a` shows empty even though process is running. Service functions +correctly despite missing metadata. Monitor via `ss -tlnp`, `ps aux`, or +`curl localhost:8081/health`. Low impact. --- -## Deployment Troubleshooting Log - -### [x] Container Crash on Startup (2024-12-24) - -**Symptoms:** -- Container starts then immediately disappears -- `podman ps` shows no running containers -- `podman logs ppf` returns "no such container" -- Port 8081 not listening - -**Debugging Process:** - -1. **Initial diagnosis** - SSH to odin, checked container state: - ```bash - sudo -u podman podman ps -a # Empty - sudo ss -tlnp | grep 8081 # Nothing listening - ``` - -2. **Ran container in foreground** to capture output: - ```bash - sudo -u podman bash -c 'cd /home/podman/ppf && \ - timeout 25 podman run --rm --name ppf --network=host \ - -v ./src:/app:ro -v ./data:/app/data \ - -v ./config.ini:/app/config.ini:ro \ - localhost/ppf python2 -u proxywatchd.py 2>&1' - ``` - -3. **Found the error** in httpd thread startup: - ``` - error: [Errno 98] Address already in use: ('0.0.0.0', 8081) - ``` - Container started, httpd failed to bind, process continued but HTTP unavailable. - -4. **Identified root cause** - orphaned processes from previous debug attempts: - ```bash - ps aux | grep -E "[p]pf|[p]roxy" - # Found: python2 ppf.py (PID 6421) still running, holding port 8081 - # Found: conmon, timeout, bash processes from stale container - ``` - -5. 
**Why orphans existed:** - - Previous `timeout 15 podman run` commands timed out - - `podman rm -f` doesn't kill processes when container metadata is corrupted - - Orphaned python2 process kept running with port bound - -**Root Cause:** -Stale container processes from interrupted debug sessions held port 8081. -The container started successfully but httpd thread failed to bind, -causing silent failure (no HTTP endpoints) while proxy testing continued. - -**Fix Applied:** -```bash -# Force kill all orphaned processes -sudo pkill -9 -f "ppf.py" -sudo pkill -9 -f "proxywatchd.py" -sudo pkill -9 -f "conmon.*ppf" -sleep 2 - -# Verify port is free -sudo ss -tlnp | grep 8081 # Should show nothing - -# Clean podman state -sudo -u podman podman rm -f -a -sudo -u podman podman container prune -f - -# Start fresh -sudo -u podman bash -c 'cd /home/podman/ppf && \ - podman run -d --rm --name ppf --network=host \ - -v ./src:/app:ro -v ./data:/app/data \ - -v ./config.ini:/app/config.ini:ro \ - localhost/ppf python2 -u proxywatchd.py' -``` - -**Verification:** -```bash -curl -sf http://localhost:8081/health -# {"status": "ok", "timestamp": 1766573885} -``` - -**Prevention:** -- Use `podman-compose` for reliable container management -- Use `pkill -9 -f` to kill orphaned processes before restart -- Check port availability before starting: `ss -tlnp | grep 8081` -- Run container foreground first to capture startup errors - -**Correct Deployment Procedure:** -```bash -# As root or with sudo -sudo -i -u podman bash -cd /home/podman/ppf -podman-compose down -podman-compose up -d -podman ps -podman logs -f ppf -``` - -**docker-compose.yml (updated):** -```yaml -version: '3.8' - -services: - ppf: - image: localhost/ppf:latest - container_name: ppf - network_mode: host - volumes: - - ./src:/app:ro - - ./data:/app/data - - ./config.ini:/app/config.ini:ro - command: python2 -u proxywatchd.py - restart: unless-stopped - environment: - - PYTHONUNBUFFERED=1 -``` - ---- - -### [x] SSH Connection Flooding / fail2ban (2024-12-24) - -**Symptoms:** -- SSH connections timing out or reset -- "Connection refused" errors -- Intermittent access to odin - -**Root Cause:** -Multiple individual SSH commands triggered fail2ban rate limiting. - -**Fix Applied:** -Created `~/.claude/rules/ssh-usage.md` with batching best practices. - -**Key Pattern:** -```bash -# BAD: 5 separate connections -ssh host 'cmd1' -ssh host 'cmd2' -ssh host 'cmd3' - -# GOOD: 1 connection, all commands -ssh host bash <<'EOF' -cmd1 -cmd2 -cmd3 -EOF -``` - ---- - -### [!] Podman Container Metadata Disappears (2024-12-24) - -**Symptoms:** -- `podman ps -a` shows empty even though process is running -- `podman logs ppf` returns "no such container" -- Port is listening and service responds to health checks - -**Observed Behavior:** -``` -# Container starts -podman run -d --name ppf ... -# Returns container ID: dc55f0a218b7... - -# Immediately after -podman ps -a # Empty! -ss -tlnp | grep 8081 # Shows python2 listening -curl localhost:8081/health # {"status": "ok"} -``` - -**Analysis:** -- The process runs correctly inside the container namespace -- Container metadata in podman's database is lost/corrupted -- May be related to `--rm` flag interaction with detached mode -- Rootless podman with overlayfs can have state sync issues - -**Workaround:** -Service works despite missing metadata. Monitor via: -- `ss -tlnp | grep 8081` - port listening -- `ps aux | grep proxywatchd` - process running -- `curl localhost:8081/health` - service responding - -**Impact:** Low. 
Service functions correctly. Only `podman logs` unavailable. - ---- - -### Container Debugging Checklist - -When container fails to start or crashes: +## Container Debugging Checklist ``` -┌───┬─────────────────────────────────────────────────────────────────────────┐ -│ 1 │ Check for orphans: ps aux | grep -E "[p]rocess_name" -│ 2 │ Check port conflicts: ss -tlnp | grep PORT -│ 3 │ Run foreground: podman run --rm (no -d) to see output -│ 4 │ Check podman state: podman ps -a -│ 5 │ Clean stale: pkill -9 -f "pattern" && podman rm -f -a -│ 6 │ Verify deps: config files, data dirs, volumes exist -│ 7 │ Check logs: podman logs container_name 2>&1 | tail -50 -│ 8 │ Health check: curl -sf http://localhost:PORT/health -└───┴─────────────────────────────────────────────────────────────────────────┘ - -Note: If podman ps shows empty but port is listening and health check passes, -the service is running correctly despite metadata issues. See "Podman Container -Metadata Disappears" section above. +1. Check for orphans: ps aux | grep -E "[p]rocess_name" +2. Check port conflicts: ss -tlnp | grep PORT +3. Run foreground: podman run --rm (no -d) to see output +4. Check podman state: podman ps -a +5. Clean stale: pkill -9 -f "pattern" && podman rm -f -a +6. Verify deps: config files, data dirs, volumes exist +7. Check logs: podman logs container_name 2>&1 | tail -50 +8. Health check: curl -sf http://localhost:PORT/health ``` diff --git a/compose.master.yml b/compose.master.yml new file mode 100644 index 0000000..0355ccd --- /dev/null +++ b/compose.master.yml @@ -0,0 +1,30 @@ +# PPF master node (odin) +# +# Scrapes proxy sources, runs verification, serves API/dashboard. +# No routine proxy testing -- workers handle that. +# +# Prerequisites: +# - config.ini (not tracked, host-specific) +# - data/ (created automatically) +# +# Usage: +# podman-compose -f compose.master.yml up -d +# podman-compose -f compose.master.yml logs -f +# podman-compose -f compose.master.yml down + +services: + ppf: + container_name: ppf + image: localhost/ppf:latest + build: . + network_mode: host + restart: unless-stopped + stop_signal: SIGTERM + stop_grace_period: 30s + environment: + PYTHONUNBUFFERED: "1" + volumes: + - .:/app:ro,Z + - ./data:/app/data:Z + - ./config.ini:/app/config.ini:ro,Z + command: python -u ppf.py diff --git a/compose.worker.yml b/compose.worker.yml new file mode 100644 index 0000000..cac7fa5 --- /dev/null +++ b/compose.worker.yml @@ -0,0 +1,38 @@ +# PPF worker node (cassius, edge, sentinel, ...) +# +# Tests proxies and reports results to master via WireGuard. +# Each worker uses only local Tor (127.0.0.1:9050). +# +# Prerequisites: +# - config.ini (not tracked, host-specific) +# - servers.txt (deploy from repo) +# - src/ (deploy *.py from repo root into src/) +# - data/ (created automatically) +# +# Usage: +# PPF_MASTER_URL=http://10.200.1.250:8081 podman-compose -f compose.worker.yml up -d +# podman-compose -f compose.worker.yml logs -f +# podman-compose -f compose.worker.yml down +# +# The master URL defaults to http://10.200.1.250:8081 (odin via WireGuard). +# Override with PPF_MASTER_URL env var or edit .env file. + +services: + ppf-worker: + container_name: ppf-worker + image: localhost/ppf-worker:latest + build: . 
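+    # network_mode: host (below) shares the host's network namespace, so the
+    # container can reach the local Tor SOCKS endpoint on 127.0.0.1:9050 and the
+    # master's WireGuard address directly, without publishing any ports.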
+ network_mode: host + restart: unless-stopped + stop_signal: SIGTERM + stop_grace_period: 30s + logging: + driver: k8s-file + environment: + PYTHONUNBUFFERED: "1" + volumes: + - ./src:/app:ro,Z + - ./data:/app/data:Z + - ./config.ini:/app/config.ini:ro,Z + - ./servers.txt:/app/servers.txt:ro,Z + command: python -u ppf.py --worker-v2 --server ${PPF_MASTER_URL:-http://10.200.1.250:8081} diff --git a/config.py b/config.py index 0af12fc..04e985c 100644 --- a/config.py +++ b/config.py @@ -11,7 +11,12 @@ class Config(ComboParser): with open(self.watchd.source_file, 'r') as handle: self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0] # Parse checktypes as comma-separated list - self.watchd.checktypes = [t.strip() for t in self.watchd.checktype.split(',') if t.strip()] + # Normalize: 'false'/'off'/'disabled' -> 'none' (SSL-only mode) + raw_types = [t.strip().lower() for t in self.watchd.checktype.split(',') if t.strip()] + self.watchd.checktypes = ['none' if t in ('false', 'off', 'disabled') else t for t in raw_types] + # SSL-only mode: force ssl_first when secondary check is disabled + if self.watchd.checktypes == ['none']: + self.watchd.ssl_first = True # Apply log level from CLI flags if self.args.quiet: set_log_level('warn') @@ -52,12 +57,15 @@ class Config(ComboParser): errors.append('ppf.max_fail must be >= 1') # Validate checktypes (secondary check types, ssl is handled by ssl_first) - valid_checktypes = {'irc', 'head', 'judges'} + # 'none' = SSL-only mode (no secondary check) + valid_checktypes = {'irc', 'head', 'judges', 'none'} for ct in self.watchd.checktypes: if ct not in valid_checktypes: errors.append('watchd.checktype "%s" invalid, must be one of: %s' % (ct, ', '.join(sorted(valid_checktypes)))) if not self.watchd.checktypes: errors.append('watchd.checktype must specify at least one valid type') + if 'none' in self.watchd.checktypes and len(self.watchd.checktypes) > 1: + errors.append('watchd.checktype "none" cannot be combined with other types') # Validate engine names valid_engines = {'duckduckgo', 'startpage', 'brave', 'ecosia', @@ -112,7 +120,7 @@ class Config(ComboParser): self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False) self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False) self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False) - self.add_item(section, 'checktype', str, 'head', 'secondary check type: irc, head, judges (used when ssl_first fails)', False) + self.add_item(section, 'checktype', str, 'head', 'secondary check type: head, irc, judges, none/false (none = SSL-only)', False) self.add_item(section, 'ssl_first', bool, True, 'try SSL handshake first, fallback to checktype on failure (default: True)', False) self.add_item(section, 'ssl_only', bool, False, 'when ssl_first enabled, skip secondary check on SSL failure (default: False)', False) self.add_item(section, 'scale_cooldown', int, 10, 'seconds between thread scaling decisions (default: 10)', False) @@ -161,6 +169,8 @@ class Config(ComboParser): self.add_item(section, 'batch_size', int, 100, 'proxies per work batch (default: 100)', False) self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False) self.add_item(section, 'claim_timeout', int, 300, 'seconds before unclaimed work is released (default: 300)', False) + self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle for V2 mode 
(default: 5)', False) + self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching in V2 mode (default: 30)', False) self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False) self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False) @@ -172,3 +182,4 @@ class Config(ComboParser): self.aparser.add_argument("--worker-key", help="worker authentication key", type=str, default='') self.aparser.add_argument("--register", help="register as worker with master server", action='store_true', default=False) self.aparser.add_argument("--worker-name", help="worker name for registration (default: hostname)", type=str, default='') + self.aparser.add_argument("--worker-v2", help="run as V2 worker (URL-driven fetching)", action='store_true', default=False) diff --git a/dbs.py b/dbs.py index f543efa..811750e 100644 --- a/dbs.py +++ b/dbs.py @@ -66,6 +66,83 @@ def _migrate_confidence_column(sqlite): sqlite.commit() +def _migrate_source_proto(sqlite): + """Add source_proto columns to preserve scraper-detected protocol intelligence.""" + try: + sqlite.execute('SELECT source_proto FROM proxylist LIMIT 1') + except Exception: + # source_proto: protocol detected by scraper (never overwritten by tests) + sqlite.execute('ALTER TABLE proxylist ADD COLUMN source_proto TEXT') + # source_confidence: scraper confidence score (0-100) + sqlite.execute('ALTER TABLE proxylist ADD COLUMN source_confidence INT DEFAULT 0') + sqlite.commit() + + +def _migrate_protos_working(sqlite): + """Add protos_working column for multi-protocol storage.""" + try: + sqlite.execute('SELECT protos_working FROM proxylist LIMIT 1') + except Exception: + # protos_working: comma-separated list of working protos (e.g. 
"http,socks5") + sqlite.execute('ALTER TABLE proxylist ADD COLUMN protos_working TEXT') + sqlite.commit() + + +def _migrate_last_seen(sqlite): + """Add last_seen column for worker-reported proxy freshness tracking.""" + try: + sqlite.execute('SELECT last_seen FROM proxylist LIMIT 1') + except Exception: + # last_seen: unix timestamp of most recent "working" report from any worker + sqlite.execute('ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0') + sqlite.commit() + + +def _migrate_uri_check_interval(sqlite): + """Add adaptive check_interval column to uris table.""" + try: + sqlite.execute('SELECT check_interval FROM uris LIMIT 1') + except Exception: + sqlite.execute('ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600') + sqlite.commit() + + +def _migrate_uri_working_ratio(sqlite): + """Add working_ratio column to uris table for proxy quality tracking.""" + try: + sqlite.execute('SELECT working_ratio FROM uris LIMIT 1') + except Exception: + sqlite.execute('ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0') + sqlite.commit() + + +def _migrate_uri_avg_fetch_time(sqlite): + """Add avg_fetch_time column to uris table for fetch latency EMA.""" + try: + sqlite.execute('SELECT avg_fetch_time FROM uris LIMIT 1') + except Exception: + sqlite.execute('ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0') + sqlite.commit() + + +def _migrate_uri_last_worker(sqlite): + """Add last_worker column to uris table.""" + try: + sqlite.execute('SELECT last_worker FROM uris LIMIT 1') + except Exception: + sqlite.execute('ALTER TABLE uris ADD COLUMN last_worker TEXT') + sqlite.commit() + + +def _migrate_uri_yield_rate(sqlite): + """Add yield_rate column to uris table for proxy yield EMA.""" + try: + sqlite.execute('SELECT yield_rate FROM uris LIMIT 1') + except Exception: + sqlite.execute('ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0') + sqlite.commit() + + def compute_proxy_list_hash(proxies): """Compute MD5 hash of sorted proxy list for change detection. 
@@ -290,13 +367,20 @@ def create_table_if_not_exists(sqlite, dbname): asn INT, latitude REAL, longitude REAL, - confidence INT DEFAULT 30)""") + confidence INT DEFAULT 30, + source_proto TEXT, + source_confidence INT DEFAULT 0, + protos_working TEXT, + last_seen INT DEFAULT 0)""") # Migration: add columns to existing databases (must run before creating indexes) _migrate_latency_columns(sqlite) _migrate_anonymity_columns(sqlite) _migrate_asn_column(sqlite) _migrate_geolocation_columns(sqlite) _migrate_confidence_column(sqlite) + _migrate_source_proto(sqlite) + _migrate_protos_working(sqlite) + _migrate_last_seen(sqlite) # Indexes for common query patterns sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)') sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)') @@ -317,6 +401,11 @@ def create_table_if_not_exists(sqlite, dbname): content_hash TEXT)""") # Migration for existing databases _migrate_content_hash_column(sqlite) + _migrate_uri_check_interval(sqlite) + _migrate_uri_working_ratio(sqlite) + _migrate_uri_avg_fetch_time(sqlite) + _migrate_uri_last_worker(sqlite) + _migrate_uri_yield_rate(sqlite) # Indexes for common query patterns sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)') sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)') @@ -444,11 +533,11 @@ def insert_proxies(proxydb, proxies, url): filtered += 1 continue - rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0, confidence)) + rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0, confidence, proto, confidence)) proxydb.executemany( 'INSERT OR IGNORE INTO proxylist ' - '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success,confidence) ' - 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?)', + '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success,confidence,source_proto,source_confidence) ' + 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)', rows ) proxydb.commit() @@ -508,6 +597,32 @@ PROXY_SOURCES = [ 'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all', 'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks4&timeout=10000&country=all', 'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks5&timeout=10000&country=all', + # proxifly/free-proxy-list - 5 min updates (jsDelivr CDN) + 'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/http/data.txt', + 'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/socks4/data.txt', + 'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/socks5/data.txt', + # vakhov/fresh-proxy-list - 5-20 min updates (GitHub Pages) + 'https://vakhov.github.io/fresh-proxy-list/http.txt', + 'https://vakhov.github.io/fresh-proxy-list/socks4.txt', + 'https://vakhov.github.io/fresh-proxy-list/socks5.txt', + # prxchk/proxy-list - 10 min updates + 'https://raw.githubusercontent.com/prxchk/proxy-list/main/http.txt', + 'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks4.txt', + 'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks5.txt', + # sunny9577/proxy-scraper - 3 hour updates (GitHub Pages) + 'https://sunny9577.github.io/proxy-scraper/generated/http_proxies.txt', + 'https://sunny9577.github.io/proxy-scraper/generated/socks4_proxies.txt', + 'https://sunny9577.github.io/proxy-scraper/generated/socks5_proxies.txt', + # officialputuid/KangProxy - 
4-6 hour updates + 'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/http/http.txt', + 'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks4/socks4.txt', + 'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks5/socks5.txt', + # hookzof/socks5_list - hourly updates + 'https://raw.githubusercontent.com/hookzof/socks5_list/master/proxy.txt', + # iplocate/free-proxy-list - 30 min updates + 'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/http.txt', + 'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks4.txt', + 'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks5.txt', ] diff --git a/docker-compose.yml b/docker-compose.yml deleted file mode 100644 index 519cf80..0000000 --- a/docker-compose.yml +++ /dev/null @@ -1,11 +0,0 @@ -version: '3.8' - -services: - ppf: - build: . - volumes: - - .:/app - working_dir: /app - command: python ppf.py - environment: - - PYTHONUNBUFFERED=1 \ No newline at end of file diff --git a/documentation/design-worker-driven-discovery.md b/documentation/design-worker-driven-discovery.md new file mode 100644 index 0000000..5adf931 --- /dev/null +++ b/documentation/design-worker-driven-discovery.md @@ -0,0 +1,572 @@ +# Design: Worker-Driven Discovery + +## Status + +**Proposal** -- Not yet implemented. + +## Problem + +The current architecture centralizes all proxy list fetching on the master +node (odin). Workers only test proxies handed to them. This creates several +issues: + +1. **Single point of fetch** -- If odin can't reach a source (blocked IP, + transient failure), that source is dead for everyone. +2. **Bandwidth concentration** -- Odin fetches 40 proxy lists every cycle, + extracts proxies, deduplicates, and stores them before workers ever see + them. +3. **Wasted vantage points** -- Workers sit behind different Tor exits and + IPs, but never use that diversity for fetching. +4. **Tight coupling** -- Workers can't operate at all without the master's + claim queue. If odin restarts, all workers stall. + +## Proposed Architecture + +Move proxy list fetching to workers. Master becomes a coordinator and +aggregator rather than a fetcher. + +``` +Current: Proposed: + +Master Master + Fetch URLs --------+ Manage URL database + Extract proxies | Score URLs from feedback + Store proxylist | Aggregate working proxies + Serve /api/work ---+-> Workers Serve /api/claim-urls ----> Workers + <- /api/results | + <- /api/report-urls ------+ + <- /api/report-proxies ---+ +``` + +### Role Changes + +``` ++--------+---------------------------+----------------------------------+ +| Host | Current Role | New Role | ++--------+---------------------------+----------------------------------+ +| odin | Fetch URLs | Maintain URL database | +| | Extract proxies | Score URLs from worker feedback | +| | Store proxylist | Aggregate reported proxies | +| | Distribute proxy batches | Distribute URL batches | +| | Collect test results | Collect URL + proxy reports | ++--------+---------------------------+----------------------------------+ +| worker | Claim proxy batch | Claim URL batch | +| | Test each proxy | Fetch URL, extract proxies | +| | Report pass/fail | Test extracted proxies | +| | | Report URL health + proxy results| ++--------+---------------------------+----------------------------------+ +``` + +## Data Flow + +### Phase 1: URL Claiming + +Worker requests a batch of URLs to process. 
+ +``` +Worker Master + | | + | GET /api/claim-urls | + | ?key=...&count=5 | + |------------------------------>| + | | Select due URLs from uris table + | | Mark as claimed (in-memory) + | [{url, last_hash, proto_hint}, ...] + |<------------------------------| +``` + +**Claim response:** +```json +{ + "worker_id": "abc123", + "urls": [ + { + "url": "https://raw.githubusercontent.com/.../http.txt", + "last_hash": "a1b2c3d4...", + "proto_hint": "http", + "priority": 1 + } + ] +} +``` + +Fields: +- `last_hash` -- MD5 of last extracted proxy list. Worker can skip + extraction and report "unchanged" if hash matches, saving CPU. +- `proto_hint` -- Protocol inferred from URL path. Worker uses this for + extraction confidence scoring. +- `priority` -- Higher = fetch sooner. Based on URL score. + +### Phase 2: Fetch and Extract + +Worker fetches each URL through Tor, extracts proxies using the existing +`fetch.extract_proxies()` pipeline. + +``` +Worker + | + | For each claimed URL: + | 1. Fetch through Tor (fetch_contents) + | 2. Compute content hash (MD5) + | 3. If hash == last_hash: skip extraction, report unchanged + | 4. Else: extract_proxies() -> list of (addr, proto, confidence) + | 5. Queue extracted proxies for testing + | +``` + +### Phase 3: URL Feedback + +Worker reports fetch results for each URL back to master. + +``` +Worker Master + | | + | POST /api/report-urls | + | {reports: [...]} | + |------------------------------>| + | | Update uris table: + | | check_time, error, stale_count, + | | retrievals, proxies_added, + | | content_hash, worker_scores + | {ok: true} | + |<------------------------------| +``` + +**URL report payload:** +```json +{ + "reports": [ + { + "url": "https://...", + "success": true, + "content_hash": "a1b2c3d4...", + "proxy_count": 1523, + "fetch_time_ms": 2340, + "changed": true, + "error": null + }, + { + "url": "https://...", + "success": false, + "content_hash": null, + "proxy_count": 0, + "fetch_time_ms": 0, + "changed": false, + "error": "timeout" + } + ] +} +``` + +### Phase 4: Proxy Testing and Reporting + +Worker tests extracted proxies locally using the existing `TargetTestJob` +pipeline. **Only working proxies are reported to master.** Failed proxies +are discarded silently -- no point wasting bandwidth on negatives. + +Workers are trusted. If a worker says a proxy works, master accepts it. + +``` +Worker Master + | | + | Test proxies locally | + | (same TargetTestJob flow) | + | Discard failures | + | | + | POST /api/report-proxies | + | {proxies: [...]} | (working only) + |------------------------------>| + | | Upsert into proxylist: + | | INSERT OR REPLACE + | | Set failed=0, update last_seen + | {ok: true} | + |<------------------------------| +``` + +**Proxy report payload (working only):** +```json +{ + "proxies": [ + { + "ip": "1.2.3.4", + "port": 8080, + "proto": "socks5", + "source_proto": "socks5", + "latency": 1.234, + "exit_ip": "5.6.7.8", + "anonymity": "elite", + "source_url": "https://..." + } + ] +} +``` + +No `working` field needed -- everything in the report is working by +definition. The `source_url` links proxy provenance to the URL that +yielded it, enabling URL quality scoring. + +### Complete Cycle + +``` +Worker main loop: + 1. GET /api/claim-urls Claim batch of URLs + 2. For each URL: + a. Fetch through Tor + b. Extract proxies (or skip if unchanged) + c. Test extracted proxies + 3. POST /api/report-urls Report URL health + 4. POST /api/report-proxies Report proxy results + 5. POST /api/heartbeat Health check + 6. 
Sleep, repeat +``` + +## Master-Side Changes + +### URL Scheduling + +Current `Leechered` threads fetch URLs on a timer based on error/stale +count. Replace with a scoring system that workers consume. + +**URL score** (higher = fetch sooner): + +``` +score = base_score + + freshness_bonus # High-frequency sources score higher + - error_penalty # Consecutive errors reduce score + - stale_penalty # Unchanged content reduces score + + yield_bonus # URLs that produce many proxies score higher + + quality_bonus # URLs whose proxies actually work score higher +``` + +Concrete formula: + +```python +def url_score(url_row): + age = now - url_row.check_time + base = age / url_row.check_interval # 1.0 when due + + # Yield: proxies found per fetch (rolling average) + yield_rate = url_row.proxies_added / max(url_row.retrievals, 1) + yield_bonus = min(yield_rate / 100.0, 1.0) # Cap at 1.0 + + # Quality: what % of extracted proxies actually worked + quality_bonus = url_row.working_ratio * 0.5 # 0.0 to 0.5 + + # Penalties + error_penalty = min(url_row.error * 0.3, 2.0) + stale_penalty = min(url_row.stale_count * 0.1, 1.0) + + return base + yield_bonus + quality_bonus - error_penalty - stale_penalty +``` + +URLs with `score >= 1.0` are due for fetching. Claimed URLs are locked +in memory for `claim_timeout` seconds (existing pattern). + +### New uris Columns + +```sql +ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600; +ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0; +ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0; +ALTER TABLE uris ADD COLUMN last_worker TEXT; +ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0; +``` + +- `check_interval` -- Adaptive: decreases for high-yield URLs, increases + for stale/erroring ones. Replaces the `checktime + error * perfail` + formula with a persisted value. +- `working_ratio` -- EMA of (working_proxies / total_proxies) from worker + feedback. URLs that yield dead proxies get deprioritized. +- `avg_fetch_time` -- EMA of fetch duration in ms. Helps identify slow + sources. +- `last_worker` -- Which worker last fetched this URL. Useful for + debugging, and to distribute URLs across workers evenly. +- `yield_rate` -- EMA of proxies extracted per fetch. + +### Proxy Aggregation + +Trust model: **workers are trusted.** If any worker reports a proxy as +working, master accepts it. Failed proxies are never reported -- workers +discard them locally. + +``` +Worker A reports: 1.2.3.4:8080 working, latency 1.2s +Worker B reports: 1.2.3.4:8080 working, latency 1.5s + +Master action: + - INSERT OR REPLACE with latest report + - Update last_seen, latency EMA + - Set failed = 0 +``` + +No consensus, no voting, no trust scoring. A proxy lives as long as at +least one worker keeps confirming it. It dies when nobody reports it for +`proxy_ttl` seconds. + +New `proxylist` column: + +```sql +ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0; +``` + +- `last_seen` -- Unix timestamp of most recent "working" report. Proxies + not seen in N hours are expired by the master's periodic cleanup. + +### Proxy Expiry + +Working proxies that haven't been reported by any worker within +`proxy_ttl` (default: 4 hours) are marked stale and re-queued for +testing. After `proxy_ttl * 3` with no reports, they're marked failed. 
+ +```python +def expire_stale_proxies(db, proxy_ttl): + cutoff_stale = now - proxy_ttl + cutoff_dead = now - (proxy_ttl * 3) + + # Mark stale proxies for retesting + db.execute(''' + UPDATE proxylist SET failed = 1 + WHERE failed = 0 AND last_seen < ? AND last_seen > 0 + ''', (cutoff_stale,)) + + # Kill proxies not seen in a long time + db.execute(''' + UPDATE proxylist SET failed = -1 + WHERE failed > 0 AND last_seen < ? AND last_seen > 0 + ''', (cutoff_dead,)) +``` + +## Worker-Side Changes + +### New Worker Loop + +Replace the current claim-test-report loop with a two-phase loop: + +```python +def worker_main_v2(config): + register() + verify_tor() + + while True: + # Phase 1: Fetch URLs and extract proxies + urls = claim_urls(server, key, count=5) + url_reports = [] + proxy_batch = [] + + for url_info in urls: + report, proxies = fetch_and_extract(url_info) + url_reports.append(report) + proxy_batch.extend(proxies) + + report_urls(server, key, url_reports) + + # Phase 2: Test extracted proxies, report working only + if proxy_batch: + working = test_proxies(proxy_batch) + if working: + report_proxies(server, key, working) + + heartbeat(server, key) + sleep(1) +``` + +### fetch_and_extract() + +New function that combines fetching + extraction on the worker side: + +```python +def fetch_and_extract(url_info): + url = url_info['url'] + last_hash = url_info.get('last_hash') + proto_hint = url_info.get('proto_hint') + + start = time.time() + try: + content = fetch_contents(url, head=False, proxy=tor_proxy) + except Exception as e: + return {'url': url, 'success': False, 'error': str(e)}, [] + + elapsed = int((time.time() - start) * 1000) + content_hash = hashlib.md5(content).hexdigest() + + if content_hash == last_hash: + return { + 'url': url, 'success': True, 'content_hash': content_hash, + 'proxy_count': 0, 'fetch_time_ms': elapsed, + 'changed': False, 'error': None + }, [] + + proxies = extract_proxies(content, url) + return { + 'url': url, 'success': True, 'content_hash': content_hash, + 'proxy_count': len(proxies), 'fetch_time_ms': elapsed, + 'changed': True, 'error': None + }, proxies +``` + +### Deduplication + +Workers may extract the same proxies from different URLs. Local +deduplication before testing: + +```python +seen = set() +unique = [] +for addr, proto, confidence in proxy_batch: + if addr not in seen: + seen.add(addr) + unique.append((addr, proto, confidence)) +proxy_batch = unique +``` + +### Proxy Testing + +Reuse the existing `TargetTestJob` / `WorkerThread` pipeline. The only +change: proxies come from local extraction instead of master's claim +response. The test loop, result collection, and evaluation logic remain +identical. 
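+
+The working-only filter and the Phase 4 payload shape can be illustrated with a
+small sketch. `run_test` below is a hypothetical stand-in for the existing
+`TargetTestJob` / `WorkerThread` pipeline (its signature and result dict are
+assumptions made for this sketch, not proxywatchd's actual API); the real
+pipeline runs tests across worker threads, while the sketch is sequential for
+clarity. The report fields mirror the Phase 4 payload.
+
+```python
+def collect_working_reports(proxy_batch, run_test):
+    """Build /api/report-proxies entries from locally extracted proxies.
+
+    proxy_batch: iterable of (addr, proto, confidence, source_url) tuples,
+                 already deduplicated as shown above.
+    run_test:    callable standing in for the existing test pipeline; assumed
+                 to return None on failure, or a dict with 'latency' (and
+                 optionally 'exit_ip', 'anonymity') on success.
+    """
+    reports = []
+    for addr, proto, confidence, source_url in proxy_batch:
+        result = run_test(addr, proto)
+        if not result:
+            continue  # failed proxies are discarded locally, never reported
+        ip, port = addr.rsplit(':', 1)
+        reports.append({
+            'ip': ip,
+            'port': int(port),
+            'proto': proto,
+            # the worker extracted this proxy itself, so the extraction
+            # protocol doubles as source_proto
+            'source_proto': proto,
+            'latency': result.get('latency', 0),
+            'exit_ip': result.get('exit_ip'),
+            'anonymity': result.get('anonymity'),
+            'source_url': source_url,
+        })
+    return reports
+```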
+ +## API Changes Summary + +### New Endpoints + +| Endpoint | Method | Purpose | +|----------|--------|---------| +| `/api/claim-urls` | GET | Worker claims batch of due URLs | +| `/api/report-urls` | POST | Worker reports URL fetch results | +| `/api/report-proxies` | POST | Worker reports proxy test results | + +### Modified Endpoints + +| Endpoint | Change | +|----------|--------| +| `/api/work` | Deprecated but kept for backward compatibility | +| `/api/results` | Deprecated but kept for backward compatibility | + +### Unchanged Endpoints + +| Endpoint | Reason | +|----------|--------| +| `/api/register` | Same registration flow | +| `/api/heartbeat` | Same health reporting | +| `/dashboard` | Still reads from same DB | +| `/proxies` | Still reads from proxylist | + +## Schema Changes + +### uris table additions + +```sql +ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600; +ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0; +ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0; +ALTER TABLE uris ADD COLUMN last_worker TEXT; +ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0; +``` + +### proxylist table additions + +```sql +ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0; +``` + +## Migration Strategy + +### Phase 1: Add New Endpoints (non-breaking) + +Add `/api/claim-urls`, `/api/report-urls`, `/api/report-proxies` to +`httpd.py`. Keep all existing endpoints working. Master still runs its +own `Leechered` threads. + +Files: `httpd.py`, `dbs.py` (migrations) + +### Phase 2: Worker V2 Mode + +Add `--worker-v2` flag to `ppf.py`. When set, worker uses the new +URL-claiming loop instead of the proxy-claiming loop. Both modes coexist. + +Old workers (`--worker`) continue working against `/api/work` and +`/api/results`. New workers (`--worker-v2`) use the new endpoints. + +Files: `ppf.py`, `config.py` + +### Phase 3: URL Scoring + +Implement URL scoring in master based on worker feedback. Replace +`Leechered` timer-based scheduling with score-based scheduling. Master's +own fetching becomes a fallback for URLs no worker has claimed recently. + +Files: `httpd.py`, `dbs.py` + +### Phase 4: Remove Legacy + +Once all workers run V2, remove `/api/work`, `/api/results`, and +master-side `Leechered` threads. Master no longer fetches proxy lists +directly. 
+ +Files: `ppf.py`, `httpd.py` + +## Configuration + +### New config.ini Options + +```ini +[worker] +# V2 mode: worker fetches URLs instead of proxy batches +mode = v2 # v1 (legacy) or v2 (url-driven) +url_batch_size = 5 # URLs per claim cycle +max_proxies_per_cycle = 500 # Cap on proxies tested per cycle +fetch_timeout = 30 # Timeout for URL fetching (seconds) + +[ppf] +# URL scoring weights +score_yield_weight = 1.0 +score_quality_weight = 0.5 +score_error_penalty = 0.3 +score_stale_penalty = 0.1 + +# Proxy expiry +proxy_ttl = 14400 # Seconds before unseen proxy goes stale (4h) +proxy_ttl_dead = 43200 # Seconds before unseen proxy is killed (12h) + +# Fallback: master fetches URLs not claimed by any worker +fallback_fetch = true +fallback_interval = 7200 # Seconds before master fetches unclaimed URL +``` + +## Risks and Mitigations + +| Risk | Impact | Mitigation | +|------|--------|------------| +| Workers extract different proxy counts from same URL | Inconsistent proxy_count in reports | Use content_hash for dedup; only update yield_rate when hash changes | +| Tor exit blocks a source for one worker | Worker reports error for a working URL | Require 2+ consecutive errors before incrementing URL error count | +| Workers test same proxies redundantly | Wasted CPU | Master tracks which URLs are assigned to which workers; avoid assigning same URL to multiple workers in same cycle | +| Large proxy lists overwhelm worker memory | OOM on worker | Cap `max_proxies_per_cycle`; worker discards excess after dedup | +| Master restart loses claim state | Workers refetch recently-fetched URLs | Harmless -- just a redundant fetch. content_hash prevents duplicate work | +| `fetch.py` imports unavailable on worker image | ImportError | Verify worker Dockerfile includes fetch.py and dependencies | + +## What Stays the Same + +- `rocksock.py` -- No changes to proxy chain logic +- `connection_pool.py` -- Tor host selection unchanged +- `proxywatchd.py` core -- `TargetTestJob`, `WorkerThread`, `ProxyTestState` + remain identical. Only the job source changes. +- `fetch.py` -- Used on workers now, but the code itself doesn't change +- `httpd.py` dashboard/proxies -- Still reads from same `proxylist` table +- SQLite as storage -- No database engine change + +## Open Questions + +1. **Should workers share extracted proxy lists with each other?** Peer + exchange would reduce redundant fetching but adds protocol complexity. + Recommendation: no, keep it simple. Master deduplicates via + `INSERT OR REPLACE`. + +2. **Should URL claiming be weighted by worker geography?** Some sources + may be accessible from certain Tor exits but not others. + Recommendation: defer. Let natural retries handle this; track + per-worker URL success rates for future optimization. + +3. **What's the right `proxy_ttl`?** Too short and we churn proxies + needlessly. Too long and we serve stale data. Start with 4 hours, + tune based on observed proxy lifetime distribution. 
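+
+## Appendix: Weighted URL Scoring (Sketch)
+
+The scoring-weight options introduced in the Configuration section
+(`score_yield_weight`, `score_quality_weight`, `score_error_penalty`,
+`score_stale_penalty`) correspond to the constants hardcoded in the
+`url_score()` example under URL Scheduling. A minimal sketch of the
+parametrized variant, assuming the options surface as plain float attributes
+on the config object (the `cfg.score_*` attribute path is illustrative, not
+the actual config API):
+
+```python
+def url_score(url_row, cfg, now):
+    """url_score() with configurable weights; the defaults (1.0, 0.5, 0.3, 0.1)
+    reproduce the hardcoded formula above."""
+    # 1.0 when the URL is due, >1.0 when overdue
+    base = (now - url_row.check_time) / float(max(url_row.check_interval or 3600, 1))
+
+    # Yield: proxies found per fetch, capped before weighting
+    yield_rate = url_row.proxies_added / float(max(url_row.retrievals, 1))
+    yield_bonus = min(yield_rate / 100.0, 1.0) * cfg.score_yield_weight
+
+    # Quality: share of extracted proxies that actually worked
+    quality_bonus = (url_row.working_ratio or 0.0) * cfg.score_quality_weight
+
+    # Penalties, clamped as in the original formula
+    error_penalty = min(url_row.error * cfg.score_error_penalty, 2.0)
+    stale_penalty = min(url_row.stale_count * cfg.score_stale_penalty, 1.0)
+
+    return base + yield_bonus + quality_bonus - error_penalty - stale_penalty
+```
+
+With the default values the result is numerically identical to the unweighted
+formula; the weights only change behaviour once tuned in config.ini.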
diff --git a/fetch.py b/fetch.py index fead0e5..d57da6d 100644 --- a/fetch.py +++ b/fetch.py @@ -56,6 +56,8 @@ class FetchSession(object): def fetch(self, url, head=False): """Fetch URL, reusing connection if possible.""" network_stats.set_category('scraper') + if isinstance(url, unicode): + url = url.encode('utf-8') host, port, ssl, uri = _parse_url(url) # Check if we can reuse existing connection @@ -489,6 +491,8 @@ def fetch_contents(url, head=False, proxy=None): retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded') def _fetch_contents(url, head = False, proxy=None): network_stats.set_category('scraper') + if isinstance(url, unicode): + url = url.encode('utf-8') host, port, ssl, uri = _parse_url(url) headers=[ 'Accept-Language: en-US,en;q=0.8', diff --git a/httpd.py b/httpd.py index 8432312..57de23f 100644 --- a/httpd.py +++ b/httpd.py @@ -80,14 +80,30 @@ _master_key = None # master key for worker registration _claim_timeout = 300 # seconds before unclaimed work is released _workers_file = 'data/workers.json' # persistent storage +# URL claim tracking (parallel to proxy claims) +_url_claims = {} # url -> {worker_id, claimed_at} +_url_claims_lock = threading.Lock() +_url_claim_timeout = 600 # 10 min (URLs take longer to fetch+extract) + +# URL scoring: pending proxy counts for working_ratio correlation +_url_pending_counts = {} # url -> {total, worker_id, time} +_url_pending_lock = threading.Lock() +_url_database_path = None # set in ProxyAPIServer.__init__ for cross-db access + +# URL scoring defaults (overridden by configure_url_scoring) +_url_checktime = 3600 +_url_perfail_checktime = 3600 +_url_max_fail = 10 +_url_list_max_age_days = 7 + # Test rate tracking: worker_id -> list of (timestamp, count) tuples _worker_test_history = {} _worker_test_history_lock = threading.Lock() _test_history_window = 120 # seconds to keep test history for rate calculation # Fair distribution settings -_min_batch_size = 100 # minimum proxies per batch -_max_batch_size = 1000 # maximum proxies per batch +_min_batch_size = 1 # minimum proxies per batch +_max_batch_size = 10000 # maximum proxies per batch _worker_timeout = 120 # seconds before worker considered inactive # Session tracking @@ -109,6 +125,15 @@ def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backof _max_fail = max_fail +def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days): + """Set URL scoring parameters from config.""" + global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days + _url_checktime = checktime + _url_perfail_checktime = perfail_checktime + _url_max_fail = max_fail + _url_list_max_age_days = list_max_age_days + + def _build_due_condition(): """Build SQL condition for proxy due check. @@ -170,19 +195,22 @@ def get_due_proxy_count(db): def calculate_fair_batch_size(db, worker_id): - """Calculate fair batch size based on active workers and queue size.""" + """Calculate fair batch size based on active workers and queue size. + + Divides due work evenly among active workers. No artificial floor — + if only 6 proxies are due with 3 workers, each gets 2. 
+ """ active_workers = max(1, get_active_worker_count()) due_count = get_due_proxy_count(db) if due_count == 0: - return _min_batch_size + return 0 - # Fair share: divide due work among active workers - # Add 20% buffer for speed variations between workers - fair_share = int((due_count / active_workers) * 1.2) + # Fair share: divide due work evenly among active workers + fair_share = max(1, int(due_count / active_workers)) - # Clamp to bounds - batch_size = max(_min_batch_size, min(fair_share, _max_batch_size)) + # Clamp to upper bound only + batch_size = min(fair_share, _max_batch_size) _log('fair_batch: due=%d workers=%d share=%d batch=%d' % ( due_count, active_workers, fair_share, batch_size), 'debug') @@ -381,7 +409,7 @@ def claim_work(db, worker_id, count=100): priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval] query = ''' - SELECT ip, port, proto, failed, + SELECT ip, port, proto, failed, source_proto, CASE WHEN tested IS NULL THEN 0 WHEN (%s) > 3600 THEN 1 @@ -413,6 +441,7 @@ def claim_work(db, worker_id, count=100): 'port': row[1], 'proto': row[2], 'failed': row[3], + 'source_proto': row[4], }) if claimed: @@ -421,6 +450,343 @@ def claim_work(db, worker_id, count=100): return claimed +def claim_urls(url_db, worker_id, count=5): + """Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts. + + Uses score-based scheduling: high-yield URLs checked more often, + stale/broken ones less. Score components: + - age/interval: 1.0 when due, >1.0 when overdue + - yield_bonus: capped at 1.0 for high-yield sources + - quality_bonus: 0-0.5 based on working_ratio + - error_penalty: 0-2.0 based on consecutive errors + - stale_penalty: 0-1.0 based on unchanged fetches + """ + now = time.time() + now_int = int(now) + + # Import here to avoid circular dependency at module level + try: + from fetch import detect_proto_from_path + except ImportError: + detect_proto_from_path = None + + # Clean expired URL claims and pending counts + with _url_claims_lock: + stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout] + for k in stale: + del _url_claims[k] + claimed_urls = set(_url_claims.keys()) + + with _url_pending_lock: + stale_pending = [k for k, v in _url_pending_counts.items() if now - v['time'] > 600] + for k in stale_pending: + del _url_pending_counts[k] + + list_max_age_seconds = _url_list_max_age_days * 86400 + min_added = now_int - list_max_age_seconds + + try: + rows = url_db.execute( + '''SELECT url, content_hash, + (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) + + MIN(COALESCE(yield_rate, 0) / 100.0, 1.0) + + COALESCE(working_ratio, 0) * 0.5 + - MIN(error * 0.3, 2.0) + - MIN(stale_count * 0.1, 1.0) + AS score + FROM uris + WHERE error < ? + AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) >= 0.8 + AND (added > ? 
OR proxies_added > 0) + ORDER BY score DESC + LIMIT ?''', + (now_int, _url_max_fail, now_int, min_added, count * 3) + ).fetchall() + except Exception as e: + _log('claim_urls query error: %s' % e, 'error') + return [] + + # Filter out already-claimed URLs and lock new claims + claimed = [] + with _url_claims_lock: + for row in rows: + url = row[0] + if url in _url_claims or url in claimed_urls: + continue + _url_claims[url] = {'worker_id': worker_id, 'claimed_at': now} + + proto_hint = None + if detect_proto_from_path: + proto_hint = detect_proto_from_path(url) + + claimed.append({ + 'url': url, + 'last_hash': row[1], + 'proto_hint': proto_hint, + }) + if len(claimed) >= count: + break + + if claimed: + _log('claim_urls: %d URLs to %s' % (len(claimed), worker_id[:8]), 'info') + + return claimed + + +def submit_url_reports(url_db, worker_id, reports): + """Process URL fetch feedback from workers. Returns count of processed reports. + + Updates EMA metrics per URL: + - avg_fetch_time: exponential moving average of fetch latency + - check_interval: adaptive interval (shrinks for productive URLs, grows for stale) + - yield_rate: EMA of proxy count per fetch (on changed content) + - last_worker: worker that last fetched this URL + + Stores pending proxy count for working_ratio correlation in submit_proxy_reports. + """ + processed = 0 + now_int = int(time.time()) + alpha = 0.3 # EMA smoothing factor + + for r in reports: + url = r.get('url', '') + if not url: + continue + + # Release URL claim + with _url_claims_lock: + if url in _url_claims: + del _url_claims[url] + + try: + success = r.get('success', False) + content_hash = r.get('content_hash') + proxy_count = r.get('proxy_count', 0) + changed = r.get('changed', False) + fetch_time_ms = r.get('fetch_time_ms', 0) + + # Fetch current row for EMA computation + row = url_db.execute( + '''SELECT check_interval, avg_fetch_time, yield_rate + FROM uris WHERE url = ?''', (url,) + ).fetchone() + if not row: + processed += 1 + continue + + old_interval = row[0] if row[0] is not None else 3600 + old_fetch_time = row[1] if row[1] is not None else 0 + old_yield = row[2] if row[2] is not None else 0.0 + + # EMA: avg_fetch_time + if old_fetch_time > 0 and fetch_time_ms > 0: + new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time) + elif fetch_time_ms > 0: + new_fetch_time = fetch_time_ms + else: + new_fetch_time = old_fetch_time + + if success: + if changed and proxy_count > 0: + # Success + changed + proxies: converge interval toward 15min + new_interval = max(900, int(old_interval * 0.9)) + # EMA: yield_rate + new_yield = alpha * proxy_count + (1 - alpha) * old_yield + + url_db.execute( + '''UPDATE uris SET + check_time = ?, + retrievals = retrievals + 1, + error = 0, + stale_count = 0, + content_hash = ?, + proxies_added = proxies_added + ?, + check_interval = ?, + avg_fetch_time = ?, + yield_rate = ?, + last_worker = ? 
+ WHERE url = ?''', + (now_int, content_hash, proxy_count, new_interval, + new_fetch_time, new_yield, worker_id, url) + ) + + # Store pending count for working_ratio correlation + with _url_pending_lock: + _url_pending_counts[url] = { + 'total': proxy_count, + 'worker_id': worker_id, + 'time': time.time(), + } + else: + # Success + unchanged (or no proxies): drift interval toward 24h + new_interval = min(86400, int(old_interval * 1.25)) + + url_db.execute( + '''UPDATE uris SET + check_time = ?, + retrievals = retrievals + 1, + error = 0, + stale_count = stale_count + 1, + content_hash = ?, + check_interval = ?, + avg_fetch_time = ?, + last_worker = ? + WHERE url = ?''', + (now_int, content_hash, new_interval, + new_fetch_time, worker_id, url) + ) + else: + # Failure: back off faster + new_interval = min(86400, int(old_interval * 1.5)) + + url_db.execute( + '''UPDATE uris SET + check_time = ?, + error = error + 1, + check_interval = ?, + avg_fetch_time = ?, + last_worker = ? + WHERE url = ?''', + (now_int, new_interval, new_fetch_time, worker_id, url) + ) + + processed += 1 + except Exception as e: + _log('submit_url_reports error for %s: %s' % (url, e), 'error') + + url_db.commit() + return processed + + +def _update_url_working_ratios(url_working_counts): + """Correlate working proxy counts with pending totals to update working_ratio. + + Called after submit_proxy_reports processes all proxies. For each source_url + with a pending entry from submit_url_reports, computes: + ratio = working_count / pending_total + working_ratio = alpha * ratio + (1 - alpha) * old_working_ratio + """ + if not url_working_counts or not _url_database_path: + return + + alpha = 0.3 + settled = [] + + with _url_pending_lock: + pending_snapshot = dict(_url_pending_counts) + + try: + url_db = mysqlite.mysqlite(_url_database_path, str) + for url, working_count in url_working_counts.items(): + pending = pending_snapshot.get(url) + if not pending or pending['total'] <= 0: + continue + + ratio = min(float(working_count) / pending['total'], 1.0) + + row = url_db.execute( + 'SELECT working_ratio FROM uris WHERE url = ?', (url,) + ).fetchone() + old_ratio = row[0] if row and row[0] is not None else 0.0 + new_ratio = alpha * ratio + (1 - alpha) * old_ratio + + url_db.execute( + 'UPDATE uris SET working_ratio = ? WHERE url = ?', + (new_ratio, url) + ) + settled.append(url) + + url_db.commit() + url_db.close() + except Exception as e: + _log('_update_url_working_ratios error: %s' % e, 'error') + + # Remove settled entries from pending + if settled: + with _url_pending_lock: + for url in settled: + _url_pending_counts.pop(url, None) + + +def submit_proxy_reports(db, worker_id, proxies): + """Process working-proxy reports from workers. Returns count of processed proxies. + + Simplified trust-based model: workers report only working proxies. + Each proxy is upserted with failed=0, last_seen=now, latency updated. + Also tracks per-URL working counts for working_ratio correlation. 
+ """ + global _last_workers_save + processed = 0 + now_int = int(time.time()) + now = time.time() + url_working_counts = {} # source_url -> working count + + for p in proxies: + ip = p.get('ip', '') + port = p.get('port', 0) + if not ip or not port: + continue + + proxy_key = '%s:%s' % (ip, port) + proto = p.get('proto', 'http') + latency = p.get('latency', 0) + source_url = p.get('source_url') + + try: + # Upsert: insert new proxy or update existing as working + db.execute(''' + INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, added, + avg_latency, last_seen) + VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?) + ON CONFLICT(proxy) DO UPDATE SET + failed = 0, + tested = excluded.tested, + proto = excluded.proto, + avg_latency = excluded.avg_latency, + last_seen = excluded.last_seen + ''', (proxy_key, ip, port, proto, now_int, now_int, latency, now_int)) + + # Geolocate if IP2Location available + if _geolite and _geodb: + try: + rec = _geodb.get_all(ip) + if rec and rec.country_short and rec.country_short != '-': + db.execute( + 'UPDATE proxylist SET country=? WHERE proxy=?', + (rec.country_short, proxy_key)) + except Exception: + pass + + # Track per-URL working count for working_ratio + if source_url: + url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1 + + processed += 1 + except Exception as e: + _log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error') + + # Commit database changes + db.commit() + + # Update working_ratio for source URLs + if url_working_counts: + _update_url_working_ratios(url_working_counts) + + # Update worker stats + with _workers_lock: + if worker_id in _workers: + w = _workers[worker_id] + w['proxies_working'] = w.get('proxies_working', 0) + processed + w['last_seen'] = now + + # Save workers periodically + if now - _last_workers_save > 60: + save_workers() + _last_workers_save = now + + return processed + + _last_workers_save = 0 def submit_results(db, worker_id, results): @@ -914,6 +1280,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): '/api/stats/export': self.handle_stats_export, '/api/countries': self.handle_countries, '/proxies': self.handle_proxies, + '/proxies/all': self.handle_proxies_all, '/proxies/count': self.handle_count, '/health': self.handle_health, } @@ -930,6 +1297,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): '/api/stats': 'runtime statistics (JSON)', '/api/stats/export': 'export stats (params: format=json|csv)', '/proxies': 'list working proxies (params: limit, proto, country, asn)', + '/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)', '/proxies/count': 'count working proxies', '/health': 'health check', } @@ -967,7 +1335,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): stats = {} # Total counts - row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone() stats['working'] = row[0] if row else 0 row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() stats['total'] = row[0] if row else 0 @@ -1076,7 +1444,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): asn = params.get('asn', '') fmt = params.get('format', 'json') - sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0' + sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL' args = [] if proto: @@ -1101,7 +1469,52 @@ class 
ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): else: proxies = [{ 'ip': r[0], 'port': r[1], 'proto': r[2], - 'country': r[3], 'asn': r[4], 'latency': r[5] + 'country': r[3], 'asn': r[4], 'latency': r[5], + 'protos': r[6].split(',') if r[6] else [r[2]] + } for r in rows] + self.send_json({'count': len(proxies), 'proxies': proxies}) + except Exception as e: + self.send_json({'error': str(e)}, 500) + + def handle_proxies_all(self): + params = {} + if '?' in self.path: + for pair in self.path.split('?')[1].split('&'): + if '=' in pair: + k, v = pair.split('=', 1) + params[k] = v + + proto = params.get('proto', '') + country = params.get('country', '') + asn = params.get('asn', '') + fmt = params.get('format', 'json') + + sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL' + args = [] + + if proto: + sql += ' AND proto=?' + args.append(proto) + if country: + sql += ' AND country=?' + args.append(country.upper()) + if asn: + sql += ' AND asn=?' + args.append(int(asn)) + + sql += ' ORDER BY avg_latency ASC, tested DESC' + + try: + db = mysqlite.mysqlite(self.database, str) + rows = db.execute(sql, args).fetchall() + + if fmt == 'plain': + self.send_text('\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows)) + else: + proxies = [{ + 'ip': r[0], 'port': r[1], 'proto': r[2], + 'country': r[3], 'asn': r[4], 'latency': r[5], + 'protos': r[6].split(',') if r[6] else [r[2]] } for r in rows] self.send_json({'count': len(proxies), 'proxies': proxies}) except Exception as e: @@ -1110,7 +1523,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler): def handle_count(self): try: db = mysqlite.mysqlite(self.database, str) - row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone() self.send_json({'count': row[0] if row else 0}) except Exception as e: self.send_json({'error': str(e)}, 500) @@ -1126,14 +1539,17 @@ class ProxyAPIServer(threading.Thread): otherwise falls back to standard BaseHTTPServer. 
""" - def __init__(self, host, port, database, stats_provider=None, profiling=False): + def __init__(self, host, port, database, stats_provider=None, profiling=False, url_database=None): threading.Thread.__init__(self) self.host = host self.port = port self.database = database + self.url_database = url_database self.stats_provider = stats_provider self.profiling = profiling self.daemon = True + global _url_database_path + _url_database_path = url_database self.server = None self._stop_event = threading.Event() if not GEVENT_PATCHED else None # Load static library files into cache @@ -1171,7 +1587,8 @@ class ProxyAPIServer(threading.Thread): return [b'Method not allowed'] # POST only allowed for worker API endpoints - post_endpoints = ('/api/register', '/api/results', '/api/heartbeat') + post_endpoints = ('/api/register', '/api/results', '/api/heartbeat', + '/api/report-urls', '/api/report-proxies') if method == 'POST' and path not in post_endpoints: start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) return [b'POST not allowed for this endpoint'] @@ -1242,7 +1659,11 @@ class ProxyAPIServer(threading.Thread): '/api/results': 'submit test results (POST, params: key)', '/api/register': 'register as worker (POST)', '/api/workers': 'list connected workers', + '/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)', + '/api/report-urls': 'report URL fetch results (POST, params: key)', + '/api/report-proxies': 'report working proxies (POST, params: key)', '/proxies': 'list working proxies (params: limit, proto, country, asn)', + '/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)', '/proxies/count': 'count working proxies', '/health': 'health check', } @@ -1392,18 +1813,76 @@ class ProxyAPIServer(threading.Thread): return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/proxies': try: + limit = min(int(query_params.get('limit', 100)), 1000) + proto = query_params.get('proto', '') + country = query_params.get('country', '') + asn = query_params.get('asn', '') + fmt = query_params.get('format', 'json') + + sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL' + args = [] + if proto: + sql += ' AND proto=?' + args.append(proto) + if country: + sql += ' AND country=?' + args.append(country.upper()) + if asn: + sql += ' AND asn=?' + args.append(int(asn)) + sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?' 
+ args.append(limit) + db = mysqlite.mysqlite(self.database, str) - rows = db.execute( - 'SELECT proxy, proto, country, asn FROM proxylist WHERE failed=0 LIMIT 100' - ).fetchall() - proxies = [{'proxy': r[0], 'proto': r[1], 'country': r[2], 'asn': r[3]} for r in rows] - return json.dumps({'proxies': proxies}, indent=2), 'application/json', 200 + rows = db.execute(sql, args).fetchall() + + if fmt == 'plain': + return '\n'.join('%s:%s' % (r[0], r[1]) for r in rows), 'text/plain', 200 + proxies = [{ + 'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3], + 'asn': r[4], 'latency': r[5], + 'protos': r[6].split(',') if r[6] else [r[2]] + } for r in rows] + return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + elif path == '/proxies/all': + try: + proto = query_params.get('proto', '') + country = query_params.get('country', '') + asn = query_params.get('asn', '') + fmt = query_params.get('format', 'json') + + sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL' + args = [] + if proto: + sql += ' AND proto=?' + args.append(proto) + if country: + sql += ' AND country=?' + args.append(country.upper()) + if asn: + sql += ' AND asn=?' + args.append(int(asn)) + sql += ' ORDER BY avg_latency ASC, tested DESC' + + db = mysqlite.mysqlite(self.database, str) + rows = db.execute(sql, args).fetchall() + + if fmt == 'plain': + return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200 + proxies = [{ + 'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3], + 'asn': r[4], 'latency': r[5], + 'protos': r[6].split(',') if r[6] else [r[2]] + } for r in rows] + return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 elif path == '/proxies/count': try: db = mysqlite.mysqlite(self.database, str) - row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone() return json.dumps({'count': row[0] if row else 0}), 'application/json', 200 except Exception as e: return json.dumps({'error': str(e)}), 'application/json', 500 @@ -1514,6 +1993,79 @@ class ProxyAPIServer(threading.Thread): _log('api/workers error: %s' % e, 'warn') return json.dumps({'error': str(e)}), 'application/json', 500 + elif path == '/api/claim-urls': + # Worker claims batch of URLs for fetching (GET) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + if not self.url_database: + return json.dumps({'error': 'url database not configured'}), 'application/json', 500 + count = min(int(query_params.get('count', 5)), 20) + try: + url_db = mysqlite.mysqlite(self.url_database, str) + urls = claim_urls(url_db, worker_id, count) + update_worker_heartbeat(worker_id) + return json.dumps({ + 'worker_id': worker_id, + 'count': len(urls), + 'urls': urls, + }), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + + elif path == '/api/report-urls': + # Worker reports URL fetch 
results (POST) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + if not self.url_database: + return json.dumps({'error': 'url database not configured'}), 'application/json', 500 + if not post_data: + return json.dumps({'error': 'POST body required'}), 'application/json', 400 + reports = post_data.get('reports', []) + if not reports: + return json.dumps({'error': 'no reports provided'}), 'application/json', 400 + try: + url_db = mysqlite.mysqlite(self.url_database, str) + processed = submit_url_reports(url_db, worker_id, reports) + update_worker_heartbeat(worker_id) + return json.dumps({ + 'worker_id': worker_id, + 'processed': processed, + }), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + + elif path == '/api/report-proxies': + # Worker reports working proxies (POST) + key = query_params.get('key', '') + if not validate_worker_key(key): + return json.dumps({'error': 'invalid worker key'}), 'application/json', 403 + worker_id, _ = get_worker_by_key(key) + if not worker_id: + return json.dumps({'error': 'worker not found'}), 'application/json', 404 + if not post_data: + return json.dumps({'error': 'POST body required'}), 'application/json', 400 + proxies = post_data.get('proxies', []) + if not proxies: + return json.dumps({'error': 'no proxies provided'}), 'application/json', 400 + try: + db = mysqlite.mysqlite(self.database, str) + processed = submit_proxy_reports(db, worker_id, proxies) + update_worker_heartbeat(worker_id) + return json.dumps({ + 'worker_id': worker_id, + 'processed': processed, + }), 'application/json', 200 + except Exception as e: + return json.dumps({'error': str(e)}), 'application/json', 500 + else: return json.dumps({'error': 'not found'}), 'application/json', 404 @@ -1535,7 +2087,7 @@ class ProxyAPIServer(threading.Thread): stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows] # Total counts - row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone() + row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone() stats['working'] = row[0] if row else 0 row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone() stats['total'] = row[0] if row else 0 diff --git a/ppf.py b/ppf.py index 748a37b..d9a28ce 100644 --- a/ppf.py +++ b/ppf.py @@ -5,10 +5,10 @@ __version__ = '2.0.0' import sys import os -# Worker mode requires gevent - must monkey-patch before other imports -if '--worker' in sys.argv or '--register' in sys.argv: - from gevent import monkey - monkey.patch_all() +# Gevent monkey-patch MUST happen before any other imports +# Both master (httpd) and worker modes use gevent for async I/O +from gevent import monkey +monkey.patch_all() import cProfile import pstats @@ -369,6 +369,71 @@ def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling return False +def worker_claim_urls(server_url, worker_key, count=5): + """Claim batch of URLs for V2 worker mode.""" + url = '%s/api/claim-urls?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count) + + try: + resp = urllib2.urlopen(url, timeout=30) + result = json.loads(resp.read()) + return result.get('urls', []) + except urllib2.HTTPError as e: + if e.code == 403: + _log('worker key rejected (403), need to re-register', 
'warn') + raise NeedReregister() + _log('failed to claim urls: %s' % e, 'error') + return [] + except Exception as e: + _log('failed to claim urls: %s' % e, 'error') + return [] + + +def worker_report_urls(server_url, worker_key, reports): + """Report URL fetch results to master.""" + url = '%s/api/report-urls?key=%s' % (server_url.rstrip('/'), worker_key) + data = json.dumps({'reports': reports}) + + req = urllib2.Request(url, data) + req.add_header('Content-Type', 'application/json') + + try: + resp = urllib2.urlopen(req, timeout=30) + result = json.loads(resp.read()) + return result.get('processed', 0) + except urllib2.HTTPError as e: + if e.code == 403: + _log('worker key rejected (403), need to re-register', 'warn') + raise NeedReregister() + _log('failed to report urls: %s' % e, 'error') + return 0 + except Exception as e: + _log('failed to report urls: %s' % e, 'error') + return 0 + + +def worker_report_proxies(server_url, worker_key, proxies): + """Report working proxies to master.""" + url = '%s/api/report-proxies?key=%s' % (server_url.rstrip('/'), worker_key) + data = json.dumps({'proxies': proxies}) + + req = urllib2.Request(url, data) + req.add_header('Content-Type', 'application/json') + + try: + resp = urllib2.urlopen(req, timeout=30) + result = json.loads(resp.read()) + return result.get('processed', 0) + except urllib2.HTTPError as e: + if e.code == 403: + _log('worker key rejected (403), need to re-register', 'warn') + raise NeedReregister() + _log('failed to report proxies: %s' % e, 'error') + return 0 + except Exception as e: + _log('failed to report proxies: %s' % e, 'error') + return 0 + + def check_tor_connectivity(tor_hosts): """Test Tor connectivity. Returns (working_hosts, tor_ip).""" import socket @@ -428,6 +493,7 @@ def worker_main(config): worker_name = config.args.worker_name or os.uname()[1] batch_size = config.worker.batch_size num_threads = config.watchd.threads + worker_id = None # Register if --register flag or no key provided if config.args.register or not worker_key: @@ -598,6 +664,7 @@ def worker_main(config): port = proxy_info['port'] proto = proxy_info.get('proto', 'http') failed = proxy_info.get('failed', 0) + source_proto = proxy_info.get('source_proto') proxy_str = '%s:%d' % (ip, port) # Create state for this proxy @@ -607,7 +674,7 @@ def worker_main(config): country=None, mitm=0, consecutive_success=0, asn=None, oldies=False, completion_queue=completion_queue, - proxy_full=proxy_str + proxy_full=proxy_str, source_proto=source_proto ) pending_states[proxy_str] = state @@ -706,6 +773,412 @@ def worker_main(config): _log(' proxies tested: %d' % proxies_tested, 'info') +def worker_v2_main(config): + """V2 worker mode -- URL-driven discovery. + + Claims URLs from master, fetches through Tor, extracts and tests proxies, + reports working proxies back to master. 
+ """ + import json + global urllib2 + + try: + import Queue + except ImportError: + import queue as Queue + + import proxywatchd + proxywatchd.set_config(config) + + server_url = config.args.server + if not server_url: + _log('--server URL required for worker mode', 'error') + sys.exit(1) + + worker_key = config.args.worker_key + worker_name = config.args.worker_name or os.uname()[1] + num_threads = config.watchd.threads + url_batch_size = config.worker.url_batch_size + worker_id = None + + # Register if --register flag or no key provided + if config.args.register or not worker_key: + _log('registering with master: %s' % server_url, 'info') + worker_id, worker_key = worker_register(server_url, worker_name) + if not worker_key: + _log('registration failed, exiting', 'error') + sys.exit(1) + _log('registered as %s (id: %s)' % (worker_name, worker_id), 'info') + _log('worker key: %s' % worker_key, 'info') + _log('save this key with --worker-key for future runs', 'info') + + if config.args.register: + return + + _log('starting worker V2 mode (URL-driven)', 'info') + _log(' server: %s' % server_url, 'info') + _log(' threads: %d' % num_threads, 'info') + _log(' url batch: %d' % url_batch_size, 'info') + _log(' tor hosts: %s' % config.common.tor_hosts, 'info') + + # Verify Tor connectivity before starting + import socks + working_tor_hosts = [] + for tor_host in config.torhosts: + host, port = tor_host.split(':') + port = int(port) + try: + test_sock = socks.socksocket() + test_sock.set_proxy(socks.SOCKS5, host, port) + test_sock.settimeout(10) + test_sock.connect(('check.torproject.org', 80)) + test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n') + resp = test_sock.recv(512) + test_sock.close() + if resp and (b'HTTP/' in resp or len(resp) > 0): + status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50] + _log('tor host %s:%d OK (%s)' % (host, port, status), 'info') + working_tor_hosts.append(tor_host) + else: + _log('tor host %s:%d no response' % (host, port), 'warn') + except Exception as e: + _log('tor host %s:%d failed: %s' % (host, port, e), 'warn') + + if not working_tor_hosts: + _log('no working Tor hosts, cannot start worker', 'error') + sys.exit(1) + + _log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info') + + # Create shared queues for worker threads + job_queue = proxywatchd.PriorityJobQueue() + completion_queue = Queue.Queue() + + # Spawn worker threads + threads = [] + for i in range(num_threads): + wt = proxywatchd.WorkerThread('w%d' % i, job_queue) + wt.start_thread() + threads.append(wt) + time.sleep(random.random() / 10) + + _log('spawned %d worker threads' % len(threads), 'info') + + # Session for fetching URLs through Tor + session = fetch.FetchSession() + + cycles = 0 + urls_fetched = 0 + proxies_found = 0 + proxies_working = 0 + start_time = time.time() + current_tor_ip = None + consecutive_tor_failures = 0 + worker_profiling = config.args.profile or config.common.profiling + wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10} + + def do_register(): + """Register with master, with exponential backoff on failure.""" + while True: + _log('registering with master: %s' % server_url, 'info') + new_id, new_key = worker_register(server_url, worker_name) + if new_key: + wstate['worker_id'] = new_id + wstate['worker_key'] = new_key + wstate['backoff'] = 10 + _log('registered as %s (id: %s)' % (worker_name, new_id), 'info') + return True + else: + _log('registration failed, retrying in %ds' % 
wstate['backoff'], 'warn') + time.sleep(wstate['backoff']) + wstate['backoff'] = min(wstate['backoff'] * 2, 300) + + def wait_for_tor(): + """Wait for Tor to become available, checking every 30 seconds.""" + check_interval = 30 + while True: + working, tor_ip = check_tor_connectivity(config.torhosts) + if working: + _log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info') + try: + worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads) + except NeedReregister: + do_register() + return working, tor_ip + _log('tor still down, retrying in %ds' % check_interval, 'warn') + try: + worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads) + except NeedReregister: + do_register() + time.sleep(check_interval) + + try: + while True: + # Tor connectivity check + working, tor_ip = check_tor_connectivity(config.torhosts) + if not working: + consecutive_tor_failures += 1 + _log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn') + try: + worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads) + except NeedReregister: + do_register() + if consecutive_tor_failures >= 2: + _log('tor appears down, waiting before claiming URLs', 'error') + working, current_tor_ip = wait_for_tor() + consecutive_tor_failures = 0 + else: + time.sleep(10) + continue + else: + consecutive_tor_failures = 0 + if tor_ip != current_tor_ip: + if current_tor_ip: + _log('tor circuit rotated: %s' % tor_ip, 'info') + current_tor_ip = tor_ip + try: + worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads) + except NeedReregister: + do_register() + + # Claim URLs from master + try: + url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size) + except NeedReregister: + do_register() + continue + + if not url_infos: + _log('no URLs available, sleeping 30s', 'info') + time.sleep(30) + continue + + _log('claimed %d URLs to process' % len(url_infos), 'info') + + # Phase 1: Fetch URLs and extract proxies + url_reports = [] + all_extracted = [] # list of (addr, proto, confidence, source_url) + + for url_info in url_infos: + url = url_info.get('url', '') + last_hash = url_info.get('last_hash') + proto_hint = url_info.get('proto_hint') + + fetch_start = time.time() + try: + content = session.fetch(url) + except Exception as e: + _log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error') + content = None + + fetch_time_ms = int((time.time() - fetch_start) * 1000) + urls_fetched += 1 + + if not content: + url_reports.append({ + 'url': url, + 'success': False, + 'content_hash': None, + 'proxy_count': 0, + 'fetch_time_ms': fetch_time_ms, + 'changed': False, + 'error': 'fetch failed', + }) + continue + + # Detect protocol from URL path + proto = fetch.detect_proto_from_path(url) or proto_hint + + # Extract proxies (no filter_known -- workers have no proxydb) + extracted = fetch.extract_proxies(content, filter_known=False, proto=proto) + + # Compute hash of extracted proxy list + content_hash = dbs.compute_proxy_list_hash(extracted) + + if content_hash and last_hash and content_hash == last_hash: + # Content unchanged + url_reports.append({ + 'url': url, + 'success': True, + 'content_hash': content_hash, + 'proxy_count': len(extracted), + 'fetch_time_ms': fetch_time_ms, + 'changed': False, + 'error': None, + }) + host = url.split('/')[2] if '/' in url else url + _log('%s: unchanged (%d 
proxies, hash match)' % (host, len(extracted)), 'stale') + continue + + # Content changed or first fetch + for addr, pr, conf in extracted: + all_extracted.append((addr, pr, conf, url)) + + url_reports.append({ + 'url': url, + 'success': True, + 'content_hash': content_hash, + 'proxy_count': len(extracted), + 'fetch_time_ms': fetch_time_ms, + 'changed': True, + 'error': None, + }) + + host = url.split('/')[2] if '/' in url else url + _log('%s: %d proxies extracted' % (host, len(extracted)), 'info') + + # Report URL health to master + if url_reports: + try: + worker_report_urls(server_url, wstate['worker_key'], url_reports) + except NeedReregister: + do_register() + try: + worker_report_urls(server_url, wstate['worker_key'], url_reports) + except NeedReregister: + _log('still rejected after re-register, discarding url reports', 'error') + + # Deduplicate extracted proxies by address + seen = set() + unique_proxies = [] + source_map = {} # addr -> first source_url + for addr, pr, conf, source_url in all_extracted: + if addr not in seen: + seen.add(addr) + unique_proxies.append((addr, pr, conf)) + source_map[addr] = source_url + + proxies_found += len(unique_proxies) + + if not unique_proxies: + cycles += 1 + time.sleep(1) + continue + + _log('testing %d unique proxies' % len(unique_proxies), 'info') + + # Phase 2: Test extracted proxies using worker thread pool + pending_states = {} + all_jobs = [] + checktypes = config.watchd.checktypes + + for addr, pr, conf in unique_proxies: + # Parse ip:port from addr (may contain auth: user:pass@ip:port) + addr_part = addr.split('@')[-1] if '@' in addr else addr + + # Handle IPv6 [ipv6]:port + if addr_part.startswith('['): + bracket_end = addr_part.index(']') + ip = addr_part[1:bracket_end] + port = int(addr_part[bracket_end+2:]) + else: + ip, port_str = addr_part.rsplit(':', 1) + port = int(port_str) + + proto = pr or 'http' + proxy_str = '%s:%d' % (ip, port) + + state = proxywatchd.ProxyTestState( + ip, port, proto, 0, + success_count=0, total_duration=0.0, + country=None, mitm=0, consecutive_success=0, + asn=None, oldies=False, + completion_queue=completion_queue, + proxy_full=addr, source_proto=pr + ) + pending_states[proxy_str] = state + + checktype = random.choice(checktypes) + + if checktype == 'judges': + available = proxywatchd.judge_stats.get_available_judges( + list(proxywatchd.judges.keys())) + target = random.choice(available) if available else random.choice( + list(proxywatchd.judges.keys())) + elif checktype == 'ssl': + target = random.choice(proxywatchd.ssl_targets) + elif checktype == 'irc': + target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667' + else: # head + target = random.choice(list(proxywatchd.regexes.keys())) + + job = proxywatchd.TargetTestJob(state, target, checktype) + all_jobs.append(job) + + random.shuffle(all_jobs) + for job in all_jobs: + job_queue.put(job, priority=0) + + # Wait for completion + completed = 0 + timeout_start = time.time() + timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5) + working_results = [] + + while completed < len(all_jobs): + try: + state = completion_queue.get(timeout=1) + completed += 1 + + if state.failcount == 0: + latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0 + proxy_addr = state.proxy + if state.auth: + proxy_addr = '%s@%s' % (state.auth, state.proxy) + + working_results.append({ + 'ip': state.ip, + 'port': state.port, + 'proto': state.proto, + 'source_proto': state.source_proto, + 'latency': 
round(latency_sec, 3), + 'exit_ip': state.exit_ip, + 'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''), + }) + + if completed % 50 == 0 or completed == len(all_jobs): + _log('tested %d/%d proxies (%d working)' % ( + completed, len(all_jobs), len(working_results)), 'info') + + except Queue.Empty: + if time.time() - timeout_start > timeout_seconds: + _log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn') + break + continue + + proxies_working += len(working_results) + + # Report working proxies to master + if working_results: + try: + processed = worker_report_proxies(server_url, wstate['worker_key'], working_results) + except NeedReregister: + do_register() + try: + processed = worker_report_proxies(server_url, wstate['worker_key'], working_results) + except NeedReregister: + _log('still rejected after re-register, discarding proxy reports', 'error') + processed = 0 + _log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info') + + cycles += 1 + time.sleep(1) + + except KeyboardInterrupt: + elapsed = time.time() - start_time + _log('worker V2 stopping...', 'info') + session.close() + for wt in threads: + wt.stop() + for wt in threads: + wt.term() + _log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info') + _log(' cycles: %d' % cycles, 'info') + _log(' urls fetched: %d' % urls_fetched, 'info') + _log(' proxies found: %d' % proxies_found, 'info') + _log(' proxies working: %d' % proxies_working, 'info') + + def main(): """Main entry point.""" global config @@ -718,7 +1191,12 @@ def main(): else: sys.exit(1) - # Worker mode: connect to master server instead of running locally + # V2 worker mode: URL-driven discovery + if config.args.worker_v2: + worker_v2_main(config) + return + + # V1 worker mode: connect to master server instead of running locally if config.args.worker or config.args.register: worker_main(config) return @@ -747,8 +1225,14 @@ def main(): watcherd = None # Start httpd independently when watchd is disabled if config.httpd.enabled: - from httpd import ProxyAPIServer + from httpd import ProxyAPIServer, configure_url_scoring import network_stats + configure_url_scoring( + config.ppf.checktime, + config.ppf.perfail_checktime, + config.ppf.max_fail, + config.ppf.list_max_age_days + ) def httpd_stats_provider(): """Stats provider for httpd-only mode (scraping without testing).""" @@ -780,6 +1264,7 @@ def main(): config.watchd.database, stats_provider=httpd_stats_provider, profiling=profiling, + url_database=config.ppf.database, ) httpd_server.start() @@ -838,9 +1323,6 @@ def main(): else: _log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf') - _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ] - if not _proxylist: _proxylist = None - for thread in threads: if thread.status == 'ok': url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve() @@ -857,6 +1339,9 @@ def main(): threads = [ thread for thread in threads if thread.is_alive() ] if len(threads) < config.ppf.threads and rows: + # Only query proxydb when actually starting a new thread (reduces GIL blocking) + _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ] + if not _proxylist: 
_proxylist = None p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None row = random.choice(rows) urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0])) diff --git a/proxywatchd.py b/proxywatchd.py index 0755887..5c5f73c 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -142,6 +142,20 @@ def is_valid_ip(ip_str): except (ValueError, AttributeError): return False +def is_public_ip(ip_str): + """Validate IP is a public, globally routable address.""" + if not is_valid_ip(ip_str): + return False + parts = [int(p) for p in ip_str.split('.')] + if parts[0] == 0: return False # 0.0.0.0/8 + if parts[0] == 10: return False # 10.0.0.0/8 + if parts[0] == 127: return False # 127.0.0.0/8 + if parts[0] == 169 and parts[1] == 254: return False # link-local + if parts[0] == 172 and 16 <= parts[1] <= 31: return False # 172.16/12 + if parts[0] == 192 and parts[1] == 168: return False # 192.168/16 + if parts[0] >= 224: return False # multicast + reserved + return True + # Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)' @@ -285,12 +299,12 @@ class ProxyTestState(object): 'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed', 'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers', 'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success', - 'cert_error' + 'cert_error', 'source_proto', 'protos_working' ) def __init__(self, ip, port, proto, failcount, success_count, total_duration, country, mitm, consecutive_success, asn=None, oldies=False, - completion_queue=None, proxy_full=None): + completion_queue=None, proxy_full=None, source_proto=None): self.ip = ip self.port = int(port) self.proxy = '%s:%s' % (ip, port) @@ -326,6 +340,9 @@ class ProxyTestState(object): self.had_ssl_test = False self.ssl_success = False self.cert_error = False + # Protocol fingerprinting + self.source_proto = source_proto + self.protos_working = None def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None): """Record a single target test result. Thread-safe. 
@@ -390,10 +407,20 @@ class ProxyTestState(object): self.evaluated = True self.checktime = int(time.time()) - successes = [r for r in self.results if r['success']] - failures = [r for r in self.results if not r['success']] + # Filter out judge_block results (inconclusive, neither pass nor fail) + real_results = [r for r in self.results if r.get('category') != 'judge_block'] + successes = [r for r in real_results if r['success']] + failures = [r for r in real_results if not r['success']] num_success = len(successes) - _dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy) + judge_blocks = len(self.results) - len(real_results) + _dbg('evaluate: %d success, %d fail, %d judge_block, results=%d' % ( + num_success, len(failures), judge_blocks, len(self.results)), self.proxy) + + # All results were judge blocks: inconclusive, preserve current state + if not real_results and self.results: + _dbg('all results inconclusive (judge_block), no state change', self.proxy) + self.failcount = self.original_failcount + return (self.original_failcount == 0, None) # Determine dominant failure category fail_category = None @@ -432,7 +459,22 @@ class ProxyTestState(object): if config.watchd.debug: _log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug') - self.proto = last_good['proto'] + # Collect all distinct working protocols + working_protos = set() + for s in successes: + if s.get('proto'): + working_protos.add(s['proto']) + if working_protos: + self.protos_working = ','.join(sorted(working_protos)) + # Pick most specific protocol: socks5 > socks4 > http + for best in ('socks5', 'socks4', 'http'): + if best in working_protos: + self.proto = best + break + else: + self.proto = last_good['proto'] + else: + self.proto = last_good['proto'] self.failcount = 0 # Only reset mitm after 3 consecutive clean successes (not on first success) # and only if this test didn't detect MITM @@ -529,6 +571,12 @@ class TargetTestJob(object): recv = sock.recv(-1) _sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy) + # Validate HTTP response for non-IRC checks + if self.checktype != 'irc' and not recv.startswith('HTTP/'): + _dbg('not an HTTP response, failing (first 40: %r)' % recv[:40], self.proxy_state.proxy) + self.proxy_state.record_result(False, category='bad_response') + return + # Select regex based on check type (or fallback target) if 'check.torproject.org' in srv: # Tor API fallback (judge using torproject.org) @@ -553,7 +601,7 @@ class TargetTestJob(object): reveals_headers = None if self.checktype == 'judges' or 'check.torproject.org' in srv: ip_match = re.search(IP_PATTERN, recv) - if ip_match and is_valid_ip(ip_match.group(0)): + if ip_match and is_public_ip(ip_match.group(0)): exit_ip = ip_match.group(0) if self.checktype == 'judges' and 'check.torproject.org' not in srv: # Check for header echo judge (elite detection) @@ -572,17 +620,14 @@ class TargetTestJob(object): # Check if judge is blocking us (not a proxy failure) if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv): judge_stats.record_block(srv) - # Judge block = proxy worked, we got HTTP response, just no IP - # Count as success without exit_ip - block_elapsed = time.time() - duration - _dbg('judge BLOCK detected, counting as success', self.proxy_state.proxy) + # Judge block = inconclusive, not a pass or fail + _dbg('judge BLOCK detected, skipping (neutral)', self.proxy_state.proxy) self.proxy_state.record_result( - True, 
proto=proto, duration=block_elapsed, - srv=srv, tor=tor, ssl=is_ssl, exit_ip=None, - reveals_headers=None + False, category='judge_block', proto=proto, + srv=srv, tor=tor, ssl=is_ssl ) if config.watchd.debug: - _log('judge %s challenged proxy %s (counted as success)' % ( + _log('judge %s challenged proxy %s (neutral, skipped)' % ( srv, self.proxy_state.proxy), 'debug') else: _dbg('FAIL: no match, no block', self.proxy_state.proxy) @@ -598,6 +643,45 @@ class TargetTestJob(object): finally: sock.disconnect() + def _build_proto_order(self): + """Build smart protocol test order based on available intelligence. + + Priority: + 1. Previously successful proto (if set) + 2. Source-detected proto (if different, confidence >= 60) + 3. Remaining protos in default order: socks5, socks4, http + + For failing proxies (failcount > 0 and proto known), only retest + with the known proto to save resources. + """ + ps = self.proxy_state + default_order = ['socks5', 'socks4', 'http'] + + # Known proto from previous test: only retest that + if ps.proto is not None: + # For failing proxies, skip multi-proto discovery + if ps.failcount > 0: + return [ps.proto] + # For working proxies, lead with known proto but try others + protos = [ps.proto] + # Add source hint if different + if ps.source_proto and ps.source_proto != ps.proto: + protos.append(ps.source_proto) + # Fill remaining + for p in default_order: + if p not in protos: + protos.append(p) + return protos + + # Unknown proto: use source hint if available + protos = [] + if ps.source_proto: + protos.append(ps.source_proto) + for p in default_order: + if p not in protos: + protos.append(p) + return protos + def _connect_and_test(self): """Connect to target through the proxy and send test packet. @@ -615,18 +699,17 @@ class TargetTestJob(object): _log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % ( ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info') - protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto] + protos = self._build_proto_order() pool = connection_pool.get_pool() - # Phase 1: SSL handshake (if ssl_first enabled) - if config.watchd.ssl_first: + # Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode) + if config.watchd.ssl_first or self.checktype == 'none': result = self._try_ssl_handshake(protos, pool) if result is not None: return result # SSL succeeded or MITM detected # SSL failed for all protocols - if config.watchd.ssl_only: - # ssl_only mode: skip secondary check, mark as failed - _dbg('SSL failed, ssl_only mode, skipping secondary check', ps.proxy) + if config.watchd.ssl_only or self.checktype == 'none': + _dbg('SSL failed, no secondary check', ps.proxy) return (None, None, 0, None, None, 1, 0, 'ssl_only') _dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy) @@ -882,7 +965,15 @@ class WorkerThread(): nao = time.time() # Assign worker ID for connection pool affinity job.worker_id = self.id - job.run() + try: + job.run() + except Exception as e: + # Ensure state completes on unexpected exceptions (prevents memory leak) + _log('job exception: %s' % e, 'error') + try: + job.proxy_state.record_result(False, category='exception') + except Exception: + pass # State may already be completed spent = time.time() - nao job_count += 1 duration_total += spent @@ -1251,7 +1342,7 @@ class Proxywatchd(): # Build due condition using new schedule formula due_sql, due_params = _build_due_sql() q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm, - 
consecutive_success,asn,proxy FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql + consecutive_success,asn,proxy,source_proto FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql _dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % ( config.watchd.working_checktime, config.watchd.fail_retry_interval, config.watchd.fail_retry_backoff, config.watchd.max_fail)) @@ -1271,7 +1362,7 @@ class Proxywatchd(): now = time.time() oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2) q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country, - mitm,consecutive_success,asn,proxy FROM proxylist + mitm,consecutive_success,asn,proxy,source_proto FROM proxylist WHERE failed >= ? AND failed < ? AND (tested + ?) < ? ORDER BY RANDOM()''' rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max, @@ -1292,7 +1383,11 @@ class Proxywatchd(): # Build target pools for each checktype target_pools = {} for ct in checktypes: - if ct == 'irc': + if ct == 'none': + # SSL-only mode: use ssl_targets as placeholder + target_pools[ct] = ssl_targets + _dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets)) + elif ct == 'irc': target_pools[ct] = config.servers _dbg('target_pool[irc]: %d servers' % len(config.servers)) elif ct == 'judges': @@ -1314,12 +1409,12 @@ class Proxywatchd(): for row in rows: # create shared state for this proxy # row: ip, port, proto, failed, success_count, total_duration, - # country, mitm, consecutive_success, asn, proxy + # country, mitm, consecutive_success, asn, proxy, source_proto state = ProxyTestState( row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], asn=row[9], oldies=self.isoldies, completion_queue=self.completion_queue, - proxy_full=row[10] + proxy_full=row[10], source_proto=row[11] ) new_states.append(state) @@ -1424,7 +1519,7 @@ class Proxywatchd(): dead_count += 1 args.append((effective_failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, - job.consecutive_success, job.asn, job.proxy)) + job.consecutive_success, job.asn, job.protos_working, job.proxy)) success_rate = (float(sc) / len(self.collected)) * 100 ret = True @@ -1438,7 +1533,7 @@ class Proxywatchd(): if job.failcount == 0: args.append((job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, - job.consecutive_success, job.asn, job.proxy)) + job.consecutive_success, job.asn, job.protos_working, job.proxy)) if job.last_latency_ms is not None: latency_updates.append((job.proxy, job.last_latency_ms)) ret = False @@ -1455,7 +1550,7 @@ class Proxywatchd(): if job.failcount == 0 and job.exit_ip] with self._db_context() as db: - query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?' + query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=? WHERE proxy=?' 
db.executemany(query, args) # Batch update latency metrics for successful proxies @@ -1687,7 +1782,7 @@ class Proxywatchd(): # Start HTTP API server if enabled if config.httpd.enabled: - from httpd import ProxyAPIServer, configure_schedule + from httpd import ProxyAPIServer, configure_schedule, configure_url_scoring # Pass schedule config to httpd module configure_schedule( config.watchd.working_checktime, @@ -1695,11 +1790,18 @@ class Proxywatchd(): config.watchd.fail_retry_backoff, config.watchd.max_fail ) + configure_url_scoring( + config.ppf.checktime, + config.ppf.perfail_checktime, + config.ppf.max_fail, + config.ppf.list_max_age_days + ) self.httpd_server = ProxyAPIServer( config.httpd.listenip, config.httpd.port, config.watchd.database, - stats_provider=self.get_runtime_stats + stats_provider=self.get_runtime_stats, + url_database=config.ppf.database, ) self.httpd_server.start() @@ -1734,27 +1836,32 @@ class Proxywatchd(): sleeptime -= 1 continue - # check if job queue is empty (work-stealing: threads pull as needed) - if self.job_queue.empty(): + # Skip job processing when threads=0 (master-only mode) + if config.watchd.threads > 0: + # check if job queue is empty (work-stealing: threads pull as needed) + if self.job_queue.empty(): + self.collect_work() + if not self.submit_collected() and self.tor_safeguard: + _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") + sleeptime = 60 + else: + job_count = self.prepare_jobs() + if job_count == 0: + # no jobs available, wait before checking again + sleeptime = 10 + + if not self.in_background: # single_thread scenario + self.threads[0].workloop() + self.collect_work() - if not self.submit_collected() and self.tor_safeguard: - _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") - sleeptime = 60 - else: - job_count = self.prepare_jobs() - if job_count == 0: - # no jobs available, wait before checking again - sleeptime = 10 - if not self.in_background: # single_thread scenario - self.threads[0].workloop() - - self.collect_work() - - if len(self.collected) > self.submit_after: - if not self.submit_collected() and self.tor_safeguard: - _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") - sleeptime = 60 + if len(self.collected) > self.submit_after: + if not self.submit_collected() and self.tor_safeguard: + _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") + sleeptime = 60 + else: + # Master-only mode: sleep to avoid busy loop + sleeptime = 10 # Update rate history for sparklines self.stats.update_history() diff --git a/stats.py b/stats.py index a24396c..1b6dc95 100644 --- a/stats.py +++ b/stats.py @@ -107,11 +107,9 @@ regexes = { 'www.twitter.com': 'x-connection-hash', 't.co': 'x-connection-hash', 'www.msn.com': 'x-aspnetmvc-version', - 'www.bing.com': 'p3p', 'www.ask.com': 'x-served-by', 'www.hotmail.com': 'x-msedge-ref', 'www.bbc.co.uk': 'x-bbc-edge-cache-status', - 'www.skype.com': 'X-XSS-Protection', 'www.alibaba.com': 'object-status', 'www.mozilla.org': 'cf-ray', 'www.cloudflare.com': 'cf-ray', @@ -121,7 +119,6 @@ regexes = { 'www.netflix.com': 'X-Netflix.proxy.execution-time', 'www.amazon.de': 'x-amz-cf-id', 'www.reuters.com': 'x-amz-cf-id', - 'www.ikea.com': 'x-frame-options', 'www.twitpic.com': 'timing-allow-origin', 'www.digg.com': 'cf-request-id', 'www.wikia.com': 'x-served-by', @@ -133,8 +130,6 @@ regexes = { 'www.yelp.com': 'x-timer', 'www.ebay.com': 'x-envoy-upstream-service-time', 'www.wikihow.com': 'x-c', - 'www.archive.org': 'referrer-policy', - 'www.pandora.tv': 
'X-UA-Compatible', 'www.w3.org': 'x-backend', 'www.time.com': 'x-amz-cf-pop' }
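
The three worker endpoints added above (`/api/claim-urls`, `/api/report-urls`, `/api/report-proxies`) form one claim, fetch, report cycle. Below is a minimal client-side sketch of that round trip; the endpoint paths and payload field names come from the patch, while the server URL, worker key, and report values are placeholders, and the real worker additionally fetches through Tor and re-registers on a 403.

```python
# Minimal sketch of one V2 worker cycle against the master API.
# SERVER and KEY are placeholders; field names match the patch above.
import json
import urllib.request

SERVER = 'http://10.0.0.1:8081'   # placeholder master URL
KEY = 'worker-key-from-register'  # placeholder worker key

def _get(path):
    with urllib.request.urlopen(SERVER + path, timeout=30) as resp:
        return json.loads(resp.read())

def _post(path, payload):
    req = urllib.request.Request(SERVER + path,
                                 data=json.dumps(payload).encode(),
                                 headers={'Content-Type': 'application/json'})
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())

# 1. Claim a batch of URLs (worker key passed as a query parameter).
batch = _get('/api/claim-urls?key=%s&count=5' % KEY)

# 2. Fetch each URL (through Tor in the real worker), then report URL health.
#    Values below are placeholders; the key set mirrors the url_reports dicts.
reports = [{'url': u['url'], 'success': True, 'content_hash': None,
            'proxy_count': 0, 'fetch_time_ms': 0, 'changed': False,
            'error': None} for u in batch.get('urls', [])]
if reports:
    _post('/api/report-urls?key=%s' % KEY, {'reports': reports})

# 3. Report proxies that tested as working (same keys as working_results).
working = [{'ip': '203.0.113.7', 'port': 8080, 'proto': 'http',
            'source_proto': None, 'latency': 0.42, 'exit_ip': '203.0.113.7',
            'source_url': 'https://example.com/proxies.txt'}]
_post('/api/report-proxies?key=%s' % KEY, {'proxies': working})
```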
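
The new `_build_proto_order()` helper decides which protocols to attempt and in what order: the previously working protocol first, then the source-detected hint, then the remaining defaults, and only the known protocol when the proxy is already failing. A standalone paraphrase of that rule, with the `ProxyTestState` fields passed as plain arguments:

```python
# Standalone paraphrase of the protocol-ordering rule from _build_proto_order().
DEFAULT_ORDER = ['socks5', 'socks4', 'http']

def build_proto_order(known_proto, source_proto, failcount):
    if known_proto is not None:
        if failcount > 0:
            return [known_proto]           # failing proxy: retest known proto only
        order = [known_proto]              # working proxy: lead with what worked
        if source_proto and source_proto not in order:
            order.append(source_proto)     # then the source list's hint
    else:
        order = [source_proto] if source_proto else []
    order += [p for p in DEFAULT_ORDER if p not in order]
    return order

assert build_proto_order(None, None, 0) == ['socks5', 'socks4', 'http']
assert build_proto_order(None, 'socks4', 0) == ['socks4', 'socks5', 'http']
assert build_proto_order('http', 'socks5', 0) == ['http', 'socks5', 'socks4']
assert build_proto_order('http', None, 3) == ['http']
```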
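
`is_public_ip()` rejects exit IPs in private, link-local, loopback, multicast, and reserved ranges before they are taken from judge responses. For reference, a roughly equivalent check built on the stdlib `ipaddress` module (not part of the patch); it also excludes ranges such as 100.64.0.0/10 that the explicit octet checks let through:

```python
# Cross-check of the hand-rolled is_public_ip() using the stdlib ipaddress
# module; is_global already excludes RFC1918, loopback, link-local, and
# reserved space, and the explicit multicast test covers 224.0.0.0/4.
import ipaddress

def is_public_ip(ip_str):
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    return addr.is_global and not addr.is_multicast

for ip, expected in [('8.8.8.8', True), ('10.1.2.3', False),
                     ('192.168.1.1', False), ('169.254.0.5', False),
                     ('224.0.0.1', False)]:
    assert is_public_ip(ip) is expected
```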