feat: worker-driven discovery and validation tightening #1

Merged
username merged 24 commits from feature/worker-driven-discovery into master 2026-02-17 17:39:49 +00:00
14 changed files with 2130 additions and 1357 deletions

CLAUDE.md

@@ -7,9 +7,9 @@
│ Host     │ Role        │ Notes
├──────────┼─────────────┼────────────────────────────────────────────────────────┤
│ odin     │ Master      │ Scrapes proxy lists, verifies conflicts, port 8081
│ forge    │ Worker      │ Tests proxies, reports to master via WireGuard
│ hermes   │ Worker      │ Tests proxies, reports to master via WireGuard
│ janus    │ Worker      │ Tests proxies, reports to master via WireGuard
│ cassius  │ Worker      │ Tests proxies, reports to master via WireGuard
│ edge     │ Worker      │ Tests proxies, reports to master via WireGuard
│ sentinel │ Worker      │ Tests proxies, reports to master via WireGuard
└──────────┴─────────────┴────────────────────────────────────────────────────────┘
```
@@ -43,20 +43,21 @@ cd /opt/ansible && source venv/bin/activate
```bash
# Check worker status
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m shell -a "hostname"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "hostname"
# Check worker config
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m shell -a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini"
# Check worker logs
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge -m shell -a "sudo -u podman journalctl --user -u ppf-worker -n 20"
# Check worker logs (dynamic UID)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius -m raw \
-a "uid=\$(id -u podman) && sudo -u podman podman logs --tail 20 ppf-worker"
# Modify config option
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m lineinfile -a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m lineinfile -a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'"
# Restart workers (different UIDs!)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible janus,forge -m raw -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/996 systemctl --user restart ppf-worker"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible hermes -m raw -a "sudo -u podman XDG_RUNTIME_DIR=/run/user/1001 systemctl --user restart ppf-worker"
# Restart workers (dynamic UID discovery)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m raw \
-a "uid=\$(id -u podman) && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman restart ppf-worker"
```
## Full Deployment Procedure
@@ -78,11 +79,11 @@ ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m synchronize \
-a "src=/home/user/git/ppf/ dest=/home/podman/ppf/ rsync_opts='--include=*.py,--include=servers.txt,--exclude=*'"
# Deploy to WORKERS (ppf/src/ subdirectory)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible forge,hermes,janus -m synchronize \
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m synchronize \
-a "src=/home/user/git/ppf/ dest=/home/podman/ppf/src/ rsync_opts='--include=*.py,--include=servers.txt,--exclude=*'"
# CRITICAL: Fix ownership on ALL hosts (rsync uses ansible user, containers need podman)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,forge,hermes,janus -m raw \
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \
-a "chown -R podman:podman /home/podman/ppf/"
```
@@ -95,11 +96,9 @@ ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,forge,hermes,janus -m raw \
ansible odin -m raw \
-a "cd /tmp && XDG_RUNTIME_DIR=/run/user/1005 runuser -u podman -- podman restart ppf"
# Restart WORKERS (note different UIDs)
ansible janus,forge -m raw \
-a "sudo -u podman XDG_RUNTIME_DIR=/run/user/996 systemctl --user restart ppf-worker"
ansible hermes -m raw \
-a "sudo -u podman XDG_RUNTIME_DIR=/run/user/1001 systemctl --user restart ppf-worker"
# Restart WORKERS (dynamic UID discovery)
ansible cassius,edge,sentinel -m raw \
-a "uid=\$(id -u podman) && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman restart ppf-worker"
```
### Step 4: Verify All Running
@@ -109,11 +108,9 @@ ansible hermes -m raw \
ansible odin -m raw \
-a "cd /tmp && XDG_RUNTIME_DIR=/run/user/1005 runuser -u podman -- podman ps"
# Check workers
ansible janus,forge -m raw \
-a "sudo -u podman XDG_RUNTIME_DIR=/run/user/996 systemctl --user is-active ppf-worker"
ansible hermes -m raw \
-a "sudo -u podman XDG_RUNTIME_DIR=/run/user/1001 systemctl --user is-active ppf-worker"
# Check workers (dynamic UID discovery)
ansible cassius,edge,sentinel -m raw \
-a "uid=\$(id -u podman) && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman ps --format '{{.Names}} {{.Status}}'"
```
## Podman User IDs
@@ -123,12 +120,14 @@ ansible hermes -m raw \
│ Host     │ UID   │ XDG_RUNTIME_DIR
├──────────┼───────┼─────────────────────────────┤
│ odin     │ 1005  │ /run/user/1005
│ hermes   │ 1001  │ /run/user/1001
│ janus    │ 996   │ /run/user/996
│ forge    │ 996   │ /run/user/996
│ cassius  │ 993   │ /run/user/993
│ edge     │ 993   │ /run/user/993
│ sentinel │ 992   │ /run/user/992
└──────────┴───────┴─────────────────────────────┘
```
**Prefer dynamic UID discovery** (`uid=$(id -u podman)`) over hardcoded values.
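The dynamic-discovery pattern can also be mirrored host-side when building commands programmatically. A Python 3 sketch (function names are illustrative; the `podman` user is assumed to exist on the target host, so the example below uses `root` only because it exists everywhere):

```python
import pwd

def runtime_dir_for(username):
    """Resolve a user's XDG_RUNTIME_DIR from its UID instead of a hardcoded table."""
    uid = pwd.getpwnam(username).pw_uid
    return uid, "/run/user/%d" % uid

def restart_command(username, unit="ppf-worker"):
    """Build the podman restart one-liner for any worker host."""
    uid, xdg = runtime_dir_for(username)
    return "sudo -u %s XDG_RUNTIME_DIR=%s podman restart %s" % (username, xdg, unit)
```

This performs the same lookup as `uid=$(id -u podman)` in the shell one-liners above.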
## Configuration
### Odin config.ini
@@ -186,41 +185,26 @@ batch_size = clamp(fair_share, min=100, max=1000)
- Workers shuffle their batch locally to avoid testing the same proxies simultaneously
- Claims expire after 5 minutes if not completed
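The batch sizing and claim expiry rules reduce to a few lines. A minimal Python 3 model (`CLAIM_TTL` and the function names are hypothetical; only the 100/1000 clamp bounds and the 5-minute TTL come from the rules above):

```python
import random
import time

CLAIM_TTL = 300  # seconds; claims expire after 5 minutes

def clamp(value, lo=100, hi=1000):
    """batch_size = clamp(fair_share, min=100, max=1000)"""
    return max(lo, min(hi, value))

def make_batch(pending, n_workers):
    """Take one worker's fair share, then shuffle it locally so workers
    do not all test the same proxies at the same time."""
    fair_share = len(pending) // max(1, n_workers)
    batch = pending[:clamp(fair_share)]
    random.shuffle(batch)
    return batch

def claim_expired(claimed_at, now=None):
    """A claim not completed within CLAIM_TTL seconds is released."""
    if now is None:
        now = time.time()
    return now - claimed_at > CLAIM_TTL
```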
## Worker systemd Unit
## Worker Container
Located at `/home/podman/.config/systemd/user/ppf-worker.service`:
Workers run as podman containers with `--restart=unless-stopped`:
```ini
[Unit]
Description=PPF Worker Container
After=network-online.target tor.service
[Service]
Type=simple
Restart=on-failure
RestartSec=10
WorkingDirectory=%h
ExecStartPre=-/usr/bin/podman stop -t 10 ppf-worker
ExecStartPre=-/usr/bin/podman rm -f ppf-worker
ExecStart=/usr/bin/podman run \
--name ppf-worker --rm --log-driver=journald --network=host \
-v %h/ppf/src:/app:ro \
-v %h/ppf/data:/app/data \
-v %h/ppf/config.ini:/app/config.ini:ro \
-e PYTHONUNBUFFERED=1 \
localhost/ppf-worker:latest \
python -u ppf.py --worker --server http://10.200.1.250:8081
ExecStop=/usr/bin/podman stop -t 10 ppf-worker
[Install]
WantedBy=default.target
```
```bash
podman run -d --name ppf-worker --network=host --restart=unless-stopped \
-e PYTHONUNBUFFERED=1 \
-v /home/podman/ppf/src:/app:ro,Z \
-v /home/podman/ppf/data:/app/data:Z \
-v /home/podman/ppf/config.ini:/app/config.ini:ro,Z \
-v /home/podman/ppf/servers.txt:/app/servers.txt:ro,Z \
localhost/ppf-worker:latest \
python -u ppf.py --worker --server http://10.200.1.250:8081
```
## Rebuilding Images
```bash
# Workers - from ppf/ directory (Dockerfile copies from src/)
ansible forge,hermes,janus -m raw \
ansible cassius,edge,sentinel -m raw \
-a "cd /home/podman/ppf && sudo -u podman podman build -t localhost/ppf-worker:latest ."
# Odin - from ppf/ directory
@@ -231,14 +215,16 @@ ansible odin -m raw \
## API Endpoints
```
/dashboard Web UI with live statistics
/map Interactive world map
/health Health check
/api/stats Runtime statistics (JSON)
/api/workers Connected worker status
/api/memory Memory profiling data
/api/countries Proxy counts by country
/proxies Working proxies list
/dashboard Web UI with live statistics
/map Interactive world map
/health Health check
/api/stats Runtime statistics (JSON)
/api/workers Connected worker status
/api/countries Proxy counts by country
/api/claim-urls Claim URL batch for worker-driven fetching (GET)
/api/report-urls Report URL fetch results (POST)
/api/report-proxies Report working proxies (POST)
/proxies Working proxies list
```
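The worker-driven endpoints (`/api/claim-urls`, `/api/report-urls`) imply master-side claim bookkeeping roughly like the following toy model. Class and field names are invented for illustration; the real handlers speak HTTP:

```python
import time

class UrlClaims:
    """In-memory model of claim/report bookkeeping (illustrative, not the real handler)."""
    def __init__(self, urls, ttl=300):
        self.pending = list(urls)
        self.claims = {}        # url -> (worker, claimed_at)
        self.results = {}       # url -> fetch outcome
        self.ttl = ttl

    def claim_urls(self, worker, n):
        """GET /api/claim-urls: hand a batch to a worker and remember the claim."""
        batch = self.pending[:n]
        del self.pending[:n]
        now = time.time()
        for url in batch:
            self.claims[url] = (worker, now)
        return batch

    def report_urls(self, worker, results):
        """POST /api/report-urls: record outcomes and release the claims."""
        for url, outcome in results.items():
            if url in self.claims:
                del self.claims[url]
                self.results[url] = outcome
```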
## Troubleshooting
@@ -247,7 +233,7 @@ ansible odin -m raw \
Workers need `servers.txt` in src/:
```bash
ansible forge,hermes,janus -m copy \
ansible cassius,edge,sentinel -m copy \
-a "src=/home/user/git/ppf/servers.txt dest=/home/podman/ppf/src/servers.txt owner=podman group=podman"
```
@@ -270,15 +256,17 @@ ansible odin -m raw -a "cd /tmp; sudo -u podman podman restart ppf"
### Worker Keeps Crashing
1. Check systemd status with correct UID
2. Verify servers.txt exists in src/
3. Check ownership
4. Run manually to see error:
1. Check container status: `sudo -u podman podman ps -a`
2. Check logs: `sudo -u podman podman logs --tail 50 ppf-worker`
3. Verify servers.txt exists in src/
4. Check ownership: `ls -la /home/podman/ppf/src/`
5. Run manually to see error:
```bash
sudo -u podman podman run --rm --network=host \
-v /home/podman/ppf/src:/app:ro \
-v /home/podman/ppf/data:/app/data \
-v /home/podman/ppf/config.ini:/app/config.ini:ro \
-v /home/podman/ppf/src:/app:ro,Z \
-v /home/podman/ppf/data:/app/data:Z \
-v /home/podman/ppf/config.ini:/app/config.ini:ro,Z \
-v /home/podman/ppf/servers.txt:/app/servers.txt:ro,Z \
localhost/ppf-worker:latest \
python -u ppf.py --worker --server http://10.200.1.250:8081
```


@@ -45,83 +45,15 @@ PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework design
---
## Phase 1: Stability & Code Quality
## Open Work
**Objective:** Establish a solid, maintainable codebase
### 1.1 Error Handling Improvements
### Validation
| Task | Description | File(s) |
|------|-------------|---------|
| Add connection retry logic | Implement exponential backoff for failed connections | rocksock.py, fetch.py |
| Graceful database errors | Handle SQLite lock/busy errors with retry | mysqlite.py |
| Timeout standardization | Consistent timeout handling across all network ops | proxywatchd.py, fetch.py |
| Exception logging | Log exceptions with context, not just silently catch | all files |
### 1.2 Code Consolidation
| Task | Description | File(s) |
|------|-------------|---------|
| Unify _known_proxies | Single source of truth for known proxy cache | ppf.py, fetch.py |
| Extract proxy utils | Create proxy_utils.py with cleanse/validate functions | new file |
| Remove global config pattern | Pass config explicitly instead of set_config() | fetch.py |
| Standardize logging | Consistent _log() usage with levels across all modules | all files |
### 1.3 Testing Infrastructure
| Task | Description | File(s) |
|------|-------------|---------|
| Add unit tests | Test proxy parsing, URL extraction, IP validation | tests/ |
| Mock network layer | Allow testing without live network/Tor | tests/ |
| Validation test suite | Verify multi-target voting logic | tests/ |
---
## Phase 2: Performance Optimization
**Objective:** Improve throughput and resource efficiency
### 2.1 Connection Pooling
| Task | Description | File(s) |
|------|-------------|---------|
| Tor connection reuse | Pool Tor SOCKS connections instead of reconnecting | proxywatchd.py |
| HTTP keep-alive | Reuse connections to same target servers | http2.py |
| Connection warm-up | Pre-establish connections before job assignment | proxywatchd.py |
### 2.2 Database Optimization
| Task | Description | File(s) |
|------|-------------|---------|
| Batch inserts | Group INSERT operations (already partial) | dbs.py |
| Index optimization | Add indexes for frequent query patterns | dbs.py |
| WAL mode | Enable Write-Ahead Logging for better concurrency | mysqlite.py |
| Prepared statements | Cache compiled SQL statements | mysqlite.py |
### 2.3 Threading Improvements
| Task | Description | File(s) |
|------|-------------|---------|
| Dynamic thread scaling | Adjust thread count based on success rate | proxywatchd.py |
| Priority queue | Test high-value proxies (low fail count) first | proxywatchd.py |
| Stale proxy cleanup | Background thread to remove long-dead proxies | proxywatchd.py |
---
## Phase 3: Reliability & Accuracy
**Objective:** Improve proxy validation accuracy and system reliability
### 3.1 Enhanced Validation
| Task | Description | File(s) |
|------|-------------|---------|
| Latency tracking | Store and use connection latency metrics | proxywatchd.py, dbs.py |
| Geographic validation | Verify proxy actually routes through claimed location | proxywatchd.py |
| Protocol fingerprinting | Better SOCKS4/SOCKS5/HTTP detection | rocksock.py |
| HTTPS/SSL testing | Validate SSL proxy capabilities | proxywatchd.py |
### 3.2 Target Management
### Target Management
| Task | Description | File(s) |
|------|-------------|---------|
@@ -129,283 +61,31 @@ PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework design
| Target health tracking | Remove unresponsive targets from pool | proxywatchd.py |
| Geographic target spread | Ensure targets span multiple regions | config.py |
### 3.3 Failure Analysis
| Task | Description | File(s) |
|------|-------------|---------|
| Failure categorization | Distinguish timeout vs refused vs auth-fail | proxywatchd.py |
| Retry strategies | Different retry logic per failure type | proxywatchd.py |
| Dead proxy quarantine | Separate storage for likely-dead proxies | dbs.py |
---
## Phase 4: Features & Usability
**Objective:** Add useful features while maintaining simplicity
### 4.1 Reporting & Monitoring
| Task | Description | File(s) |
|------|-------------|---------|
| Statistics collection | Track success rates, throughput, latency | proxywatchd.py |
| Periodic status output | Log summary stats every N minutes | ppf.py, proxywatchd.py |
| Export functionality | Export working proxies to file (txt, json) | new: export.py |
### 4.2 Configuration
| Task | Description | File(s) |
|------|-------------|---------|
| Config validation | Validate config.ini on startup | config.py |
| Runtime reconfiguration | Reload config without restart (SIGHUP) | proxywatchd.py |
| Sensible defaults | Document and improve default values | config.py |
### 4.3 Proxy Source Expansion
| Task | Description | File(s) |
|------|-------------|---------|
| Additional scrapers | Support more search engines beyond Searx | scraper.py |
| API sources | Integrate free proxy API endpoints | new: api_sources.py |
| Import formats | Support various proxy list formats | ppf.py |
---
## Implementation Priority
```
┌─────────────────────────────────────────────────────────────────────────────┐
│ Priority Matrix │
├──────────────────────────┬──────────────────────────────────────────────────┤
│ HIGH IMPACT / LOW EFFORT │ HIGH IMPACT / HIGH EFFORT │
│ │ │
│ [x] Unify _known_proxies │ [x] Connection pooling │
│ [x] Graceful DB errors │ [x] Dynamic thread scaling │
│ [x] Batch inserts │ [x] Unit test infrastructure │
│ [x] WAL mode for SQLite │ [x] Latency tracking │
│ │ │
├──────────────────────────┼──────────────────────────────────────────────────┤
│ LOW IMPACT / LOW EFFORT │ LOW IMPACT / HIGH EFFORT │
│ │ │
│ [x] Standardize logging │ [x] Geographic validation │
│ [x] Config validation │ [x] Additional scrapers (Bing, Yahoo, Mojeek) │
│ [x] Export functionality │ [ ] API sources │
│ [x] Status output │ [ ] Protocol fingerprinting │
│ │ │
└──────────────────────────┴──────────────────────────────────────────────────┘
```
---
## Completed Work
### Multi-Target Validation (Done)
- [x] Work-stealing queue with shared Queue.Queue()
- [x] Multi-target validation (2/3 majority voting)
- [x] Interleaved testing (jobs shuffled across proxies)
- [x] ProxyTestState and TargetTestJob classes
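The 2/3 voting and interleaving can be sketched minimally (Python 3; function names are illustrative stand-ins for the real ProxyTestState logic):

```python
import random

def majority_pass(results, quorum=2):
    """A proxy passes when at least `quorum` of its target tests succeeded (2/3 voting)."""
    return sum(1 for ok in results if ok) >= quorum

def interleave_jobs(proxies, targets, seed=None):
    """Build (proxy, target) jobs and shuffle them so one slow target
    does not serialize a whole proxy's tests."""
    jobs = [(p, t) for p in proxies for t in targets]
    random.Random(seed).shuffle(jobs)
    return jobs
```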
### Code Cleanup (Done)
- [x] Removed dead HTTP server code from ppf.py
- [x] Removed dead gumbo code from soup_parser.py
- [x] Removed test code from comboparse.py
- [x] Removed unused functions from misc.py
- [x] Fixed IP/port cleansing in ppf.py extract_proxies()
- [x] Updated .gitignore, removed .pyc files
### Database Optimization (Done)
- [x] Enable SQLite WAL mode for better concurrency
- [x] Add indexes for common query patterns (failed, tested, proto, error, check_time)
- [x] Optimize batch inserts (remove redundant SELECT before INSERT OR IGNORE)
### Dependency Reduction (Done)
- [x] Make lxml optional (removed from requirements)
- [x] Make IP2Location optional (graceful fallback)
- [x] Add --nobs flag for stdlib HTMLParser fallback (bs4 optional)
### Rate Limiting & Stability (Done)
- [x] InstanceTracker class in scraper.py with exponential backoff
- [x] Configurable backoff_base, backoff_max, fail_threshold
- [x] Exception logging with context (replaced bare except blocks)
- [x] Unified _known_proxies cache in fetch.py
### Monitoring & Maintenance (Done)
- [x] Stats class in proxywatchd.py (tested/passed/failed tracking)
- [x] Periodic stats reporting (configurable stats_interval)
- [x] Stale proxy cleanup (cleanup_stale() with configurable stale_days)
- [x] Timeout config options (timeout_connect, timeout_read)
### Connection Pooling (Done)
- [x] TorHostState class tracking per-host health and latency
- [x] TorConnectionPool with worker affinity for circuit reuse
- [x] Exponential backoff (5s, 10s, 20s, 40s, max 60s) on failures
- [x] Pool warmup and health status reporting
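The backoff ladder and worker affinity both fit in a few lines. A sketch (the hash-based assignment is an illustrative stand-in for however TorConnectionPool actually maps workers to hosts; the ladder values come from the list above):

```python
import hashlib

BACKOFFS = [5, 10, 20, 40, 60]  # seconds: 5s, 10s, 20s, 40s, capped at 60s

def backoff_for(failures):
    """Exponential backoff per consecutive failure, capped at the last rung."""
    if failures <= 0:
        return 0
    return BACKOFFS[min(failures, len(BACKOFFS)) - 1]

def assign_host(worker_id, hosts):
    """Stable worker -> Tor-host affinity so a worker keeps reusing the same circuits."""
    digest = hashlib.md5(worker_id.encode()).hexdigest()
    return hosts[int(digest, 16) % len(hosts)]
```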
### Priority Queue (Done)
- [x] PriorityJobQueue class with heap-based ordering
- [x] calculate_priority() assigns priority 0-4 by proxy state
- [x] New proxies tested first, high-fail proxies last
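A minimal sketch of the heap-based queue and the 0-4 priority mapping (the exact threshold separating priorities 3 and 4 is a guess; the rest follows the scheme above):

```python
import heapq
import itertools

def calculate_priority(tested, failcount):
    """Map proxy state to priority 0 (test first) .. 4 (test last)."""
    if not tested:
        return 0                 # never tested
    if failcount == 0:
        return 1                 # known working
    if failcount < 3:
        return 2                 # low fail count
    return 3 if failcount < 6 else 4   # medium/high fail count (split is a guess)

class PriorityJobQueue:
    def __init__(self):
        self._heap = []
        self._seq = itertools.count()   # tie-breaker keeps FIFO order per priority

    def push(self, priority, job):
        heapq.heappush(self._heap, (priority, next(self._seq), job))

    def pop(self):
        return heapq.heappop(self._heap)[2]
```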
### Dynamic Thread Scaling (Done)
- [x] ThreadScaler class adjusts thread count dynamically
- [x] Scales up when queue deep and success rate acceptable
- [x] Scales down when queue shallow or success rate drops
- [x] Respects min/max bounds with cooldown period
### Latency Tracking (Done)
- [x] avg_latency, latency_samples columns in proxylist
- [x] Exponential moving average calculation
- [x] Migration function for existing databases
- [x] Latency recorded for successful proxy tests
### Container Support (Done)
- [x] Dockerfile with Python 2.7-slim base
- [x] docker-compose.yml for local development
- [x] Rootless podman deployment documentation
- [x] Volume mounts for persistent data
### Code Style (Done)
- [x] Normalized indentation (4-space, no tabs)
- [x] Removed dead code and unused imports
- [x] Added docstrings to classes and functions
- [x] Python 2/3 compatible imports (Queue/queue)
### Geographic Validation (Done)
- [x] IP2Location integration for country lookup
- [x] pyasn integration for ASN lookup
- [x] Graceful fallback when database files missing
- [x] Country codes displayed in test output: `(US)`, `(IN)`, etc.
- [x] Data files: IP2LOCATION-LITE-DB1.BIN, ipasn.dat
### SSL Proxy Testing (Done)
- [x] Default checktype changed to 'ssl'
- [x] ssl_targets list with major HTTPS sites
- [x] TLS handshake validation with certificate verification
- [x] Detects MITM proxies that intercept SSL connections
### Export Functionality (Done)
- [x] export.py CLI tool for exporting working proxies
- [x] Multiple formats: txt, json, csv, len (length-prefixed)
- [x] Filters: proto, country, anonymity, max_latency
- [x] Sort options: latency, added, tested, success
- [x] Output to stdout or file
### Web Dashboard (Done)
- [x] /dashboard endpoint with dark theme HTML UI
- [x] /api/stats endpoint for JSON runtime statistics
- [x] Auto-refresh with JavaScript fetch every 3 seconds
- [x] Stats provider callback from proxywatchd.py to httpd.py
- [x] Displays: tested/passed/success rate, thread count, uptime
- [x] Tor pool health: per-host latency, success rate, availability
- [x] Failure categories breakdown: timeout, proxy, ssl, closed
### Dashboard Enhancements v2 (Done)
- [x] Prominent check type badge in header (SSL/JUDGES/HTTP/IRC)
- [x] System monitor bar: load, memory, disk, process RSS
- [x] Anonymity breakdown: elite/anonymous/transparent counts
- [x] Database health: size, tested/hour, added/day, dead count
- [x] Enhanced Tor pool stats: requests, success rate, healthy nodes, latency
- [x] SQLite ANALYZE/VACUUM functions for query optimization
- [x] Lightweight design: client-side polling, minimal DOM updates
### Dashboard Enhancements v3 (Done)
- [x] Electric cyan theme with translucent glass-morphism effects
- [x] Unified wrapper styling (.chart-wrap, .histo-wrap, .stats-wrap, .lb-wrap, .pie-wrap)
- [x] Consistent backdrop-filter blur and electric glow borders
- [x] Tor Exit Nodes cards with hover effects (.tor-card)
- [x] Lighter background/tile color scheme (#1e2738 bg, #181f2a card)
- [x] Map endpoint restyled to match dashboard (electric cyan theme)
- [x] Map markers updated from gold to cyan for approximate locations
### Memory Profiling & Analysis (Done)
- [x] /api/memory endpoint with comprehensive memory stats
- [x] objgraph integration for object type counting
- [x] pympler integration for memory summaries
- [x] Memory sample history tracking (RSS over time)
- [x] Process memory from /proc/self/status (VmRSS, VmPeak, VmData, etc.)
- [x] GC statistics (collections, objects, thresholds)
### MITM Detection Optimization (Done)
- [x] MITM re-test skip optimization - avoid redundant SSL checks for known MITM proxies
- [x] mitm_retest_skipped stats counter for tracking optimization effectiveness
- [x] Content hash deduplication for stale proxy list detection
- [x] stale_count reset when content hash changes
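The content-hash staleness check is small enough to sketch whole (class name invented; SHA-256 as the hash is an assumption):

```python
import hashlib

class StaleTracker:
    """Detect unchanged proxy-list pages by content hash; reset the stale
    counter whenever the content actually changes."""
    def __init__(self):
        self.last_hash = None
        self.stale_count = 0

    def observe(self, content):
        h = hashlib.sha256(content.encode()).hexdigest()
        if h == self.last_hash:
            self.stale_count += 1    # same page again: one more stale fetch
        else:
            self.last_hash = h
            self.stale_count = 0     # content changed: reset
        return self.stale_count
```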
### Distributed Workers (Done)
- [x] Worker registration and heartbeat system
- [x] /api/workers endpoint for worker status monitoring
- [x] Tor connectivity check before workers claim work
- [x] Worker test rate tracking with sliding window calculation
- [x] Combined rate aggregation across all workers
- [x] Dashboard worker cards showing per-worker stats
### Dashboard Performance (Done)
- [x] Keyboard shortcuts: r=refresh, 1-9=tabs, t=theme, p=pause
- [x] Tab-aware chart rendering - skip expensive renders for hidden tabs
- [x] Visibility API - pause polling when browser tab hidden
- [x] Dark/muted-dark/light theme cycling
- [x] Stats export endpoint (/api/stats/export?format=json|csv)
### Proxy Validation Cache (Done)
- [x] LRU cache for is_usable_proxy() using OrderedDict
- [x] Thread-safe with lock for concurrent access
- [x] Proper LRU eviction (move_to_end on hits, popitem oldest when full)
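A minimal version of that cache (Python 3; the real one stores `is_usable_proxy()` results keyed by proxy string):

```python
import threading
from collections import OrderedDict

class LRUCache:
    def __init__(self, maxsize=1024):
        self.maxsize = maxsize
        self._data = OrderedDict()
        self._lock = threading.Lock()

    def get(self, key, default=None):
        with self._lock:
            if key not in self._data:
                return default
            self._data.move_to_end(key)         # mark as most recently used
            return self._data[key]

    def put(self, key, value):
        with self._lock:
            self._data[key] = value
            self._data.move_to_end(key)
            if len(self._data) > self.maxsize:
                self._data.popitem(last=False)  # evict the oldest entry
```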
### Database Context Manager (Done)
- [x] Refactored all DB operations to use `_db_context()` context manager
- [x] Connections guaranteed to close even on exceptions
- [x] Removed deprecated `_prep_db()` and `_close_db()` methods
- [x] `fetch_rows()` now accepts db parameter for cleaner dependency injection
### Additional Search Engines (Done)
- [x] Bing and Yahoo engine implementations in scraper.py
- [x] Engine rotation for rate limit avoidance
- [x] engines.py module with SearchEngine base class
### Worker Health Improvements (Done)
- [x] Tor connectivity check before workers claim work
- [x] Fixed interval Tor check (30s) instead of exponential backoff
- [x] Graceful handling when Tor unavailable
### Memory Optimization (Done)
- [x] `__slots__` on ProxyTestState (27 attrs) and TargetTestJob (4 attrs)
- [x] Reduced per-object memory overhead for hot path objects
---
## Technical Debt
| Item | Description | Risk |
|------|-------------|------|
| ~~Dual _known_proxies~~ | ~~ppf.py and fetch.py maintain separate caches~~ | **Resolved** |
| Global config in fetch.py | set_config() pattern is fragile | Low - works but not clean |
| ~~No input validation~~ | ~~Proxy strings parsed without validation~~ | **Resolved** |
| ~~Silent exception catching~~ | ~~Some except: pass patterns hide errors~~ | **Resolved** |
| ~~Hardcoded timeouts~~ | ~~Various timeout values scattered in code~~ | **Resolved** |
---
## File Reference
| File | Purpose | Status |
|------|---------|--------|
| ppf.py | Main URL harvester daemon | Active, cleaned |
| proxywatchd.py | Proxy validation daemon | Active, enhanced |
| scraper.py | Searx search integration | Active, cleaned |
| fetch.py | HTTP fetching with proxy support | Active, LRU cache |
| dbs.py | Database schema and inserts | Active |
| mysqlite.py | SQLite wrapper | Active |
| rocksock.py | Socket/proxy abstraction (3rd party) | Stable |
| http2.py | HTTP client implementation | Stable |
| httpd.py | Web dashboard and REST API server | Active, enhanced |
| config.py | Configuration management | Active |
| comboparse.py | Config/arg parser framework | Stable, cleaned |
| soup_parser.py | BeautifulSoup wrapper | Stable, cleaned |
| misc.py | Utilities (timestamp, logging) | Stable, cleaned |
| export.py | Proxy export CLI tool | Active |
| engines.py | Search engine implementations | Active |
| connection_pool.py | Tor connection pooling | Active |
| network_stats.py | Network statistics tracking | Active |
| dns.py | DNS resolution with caching | Active |
| mitm.py | MITM certificate detection | Active |
| job.py | Priority job queue | Active |
| static/dashboard.js | Dashboard frontend logic | Active, enhanced |
| static/dashboard.html | Dashboard HTML template | Active |
| File | Purpose |
|------|---------|
| ppf.py | Main URL harvester daemon |
| proxywatchd.py | Proxy validation daemon |
| scraper.py | Searx search integration |
| fetch.py | HTTP fetching with proxy support |
| dbs.py | Database schema and inserts |
| mysqlite.py | SQLite wrapper |
| rocksock.py | Socket/proxy abstraction (3rd party) |
| http2.py | HTTP client implementation |
| httpd.py | Web dashboard and REST API server |
| config.py | Configuration management |
| comboparse.py | Config/arg parser framework |
| soup_parser.py | BeautifulSoup wrapper |
| misc.py | Utilities (timestamp, logging) |
| export.py | Proxy export CLI tool |
| engines.py | Search engine implementations |
| connection_pool.py | Tor connection pooling |
| network_stats.py | Network statistics tracking |
| dns.py | DNS resolution with caching |
| mitm.py | MITM certificate detection |
| job.py | Priority job queue |
| static/dashboard.js | Dashboard frontend logic |
| static/dashboard.html | Dashboard HTML template |

TODO.md

@@ -1,866 +1,73 @@
# PPF Implementation Tasks
# PPF TODO
## Legend
## Optimization
```
[ ] Not started
[~] In progress
[x] Completed
[!] Blocked/needs discussion
```
### [ ] JSON Stats Response Caching
---
## Immediate Priority (Next Sprint)
### [x] 1. Unify _known_proxies Cache
**Completed.** Added `init_known_proxies()`, `add_known_proxies()`, `is_known_proxy()`
to fetch.py. Updated ppf.py to use these functions instead of local cache.
---
### [x] 2. Graceful SQLite Error Handling
**Completed.** mysqlite.py now retries on "locked" errors with exponential backoff.
---
### [x] 3. Enable SQLite WAL Mode
**Completed.** mysqlite.py enables WAL mode and NORMAL synchronous on init.
---
### [x] 4. Batch Database Inserts
**Completed.** dbs.py uses executemany() for batch inserts.
---
### [x] 5. Add Database Indexes
**Completed.** dbs.py creates indexes on failed, tested, proto, error, check_time.
---
## Short Term (This Month)
### [x] 6. Log Level Filtering
**Completed.** Added log level filtering with -q/--quiet and -v/--verbose CLI flags.
- misc.py: LOG_LEVELS dict, set_log_level(), get_log_level()
- config.py: Added -q/--quiet and -v/--verbose arguments
- Log levels: debug=0, info=1, warn=2, error=3
- --quiet: only show warn/error
- --verbose: show debug messages
---
### [x] 7. Connection Timeout Standardization
**Completed.** Added timeout_connect and timeout_read to [common] section in config.py.
---
### [x] 8. Failure Categorization
**Completed.** Added failure categorization for proxy errors.
- misc.py: categorize_error() function, FAIL_* constants
- Categories: timeout, refused, auth, unreachable, dns, ssl, closed, proxy, other
- proxywatchd.py: Stats.record() now accepts category parameter
- Stats.report() shows failure breakdown by category
- ProxyTestState.evaluate() returns (success, category) tuple
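The category mapping can be sketched with exception types. Which exception maps to which category here is an assumption; only the category names come from the list above:

```python
import errno
import socket
import ssl

# Constants mirroring the FAIL_* names (values are the category strings).
FAIL_TIMEOUT, FAIL_REFUSED, FAIL_DNS, FAIL_SSL, FAIL_OTHER = \
    "timeout", "refused", "dns", "ssl", "other"

def categorize_error(exc):
    """Distinguish timeout vs refused vs DNS vs SSL failures; order matters
    because these exception types overlap in the OSError hierarchy."""
    if isinstance(exc, socket.timeout):
        return FAIL_TIMEOUT
    if isinstance(exc, ssl.SSLError):
        return FAIL_SSL
    if isinstance(exc, socket.gaierror):
        return FAIL_DNS
    if isinstance(exc, OSError) and exc.errno == errno.ECONNREFUSED:
        return FAIL_REFUSED
    return FAIL_OTHER
```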
---
### [x] 9. Priority Queue for Proxy Testing
**Completed.** Added priority-based job scheduling for proxy tests.
- PriorityJobQueue class with heap-based ordering
- calculate_priority() assigns priority 0-4 based on proxy state
- Priority 0: New proxies (never tested)
- Priority 1: Working proxies (no failures)
- Priority 2: Low fail count (< 3)
- Priority 3-4: Medium/high fail count
- Integrated into prepare_jobs() for automatic prioritization
---
### [x] 10. Periodic Statistics Output
**Completed.** Added Stats class to proxywatchd.py with record(), should_report(),
and report() methods. Integrated into main loop with configurable stats_interval.
---
## Medium Term (Next Quarter)
### [x] 11. Tor Connection Pooling
**Completed.** Added connection pooling with worker-Tor affinity and health monitoring.
- connection_pool.py: TorHostState class tracks per-host health, latency, backoff
- connection_pool.py: TorConnectionPool with worker affinity, warmup, statistics
- proxywatchd.py: Workers get consistent Tor host assignment for circuit reuse
- Success/failure tracking with exponential backoff (5s, 10s, 20s, 40s, max 60s)
- Latency tracking with rolling averages
- Pool status reported alongside periodic stats
---
### [x] 12. Dynamic Thread Scaling
**Completed.** Added dynamic thread scaling based on queue depth and success rate.
- ThreadScaler class in proxywatchd.py with should_scale(), status_line()
- Scales up when queue is deep (2x target) and success rate > 10%
- Scales down when queue is shallow or success rate drops
- Min/max threads derived from config.watchd.threads (1/4x to 2x)
- 30-second cooldown between scaling decisions
- _spawn_thread(), _remove_thread(), _adjust_threads() helper methods
- Scaler status reported alongside periodic stats
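The scaling rule reduces to a small decision function. A sketch assuming the bounds and cooldown listed above (method names are illustrative):

```python
import time

class ThreadScaler:
    """Grow when the queue is deep (>= 2x target) and success rate > 10%,
    shrink when shallow, within 1/4x..2x of the configured thread count,
    with a 30-second cooldown between decisions."""
    def __init__(self, base_threads, cooldown=30):
        self.min_threads = max(1, base_threads // 4)
        self.max_threads = base_threads * 2
        self.cooldown = cooldown
        self.last_change = 0.0

    def should_scale(self, current, queue_depth, target_depth, success_rate, now=None):
        now = time.time() if now is None else now
        if now - self.last_change < self.cooldown:
            return 0                                   # still cooling down
        if (queue_depth >= 2 * target_depth and success_rate > 0.10
                and current < self.max_threads):
            self.last_change = now
            return +1
        if queue_depth < target_depth and current > self.min_threads:
            self.last_change = now
            return -1
        return 0
```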
---
### [x] 13. Latency Tracking
**Completed.** Added per-proxy latency tracking with exponential moving average.
- dbs.py: avg_latency, latency_samples columns added to proxylist schema
- dbs.py: _migrate_latency_columns() for backward-compatible migration
- dbs.py: update_proxy_latency() with EMA (alpha = 2/(samples+1))
- proxywatchd.py: ProxyTestState.last_latency_ms field
- proxywatchd.py: evaluate() calculates average latency from successful tests
- proxywatchd.py: submit_collected() records latency for passing proxies
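The EMA update with `alpha = 2/(samples+1)` works out to the following (a Python 3 sketch, not the actual dbs.py code):

```python
def update_latency(avg, samples, new_ms):
    """Return the new (avg_latency, latency_samples) pair after one measurement,
    using an exponential moving average with alpha = 2/(samples+1)."""
    samples += 1
    alpha = 2.0 / (samples + 1)
    if samples == 1:
        return new_ms, samples            # first sample seeds the average
    return avg + alpha * (new_ms - avg), samples
```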
---
### [x] 14. Export Functionality
**Completed.** Added export.py CLI tool for exporting working proxies.
- Formats: txt (default), json, csv, len (length-prefixed)
- Filters: --proto, --country, --anonymity, --max-latency
- Options: --sort (latency, added, tested, success), --limit, --pretty
- Output: stdout or --output file
- Usage: `python export.py --proto http --country US --sort latency --limit 100`
---
### [x] 15. Unit Test Infrastructure
**Completed.** Added pytest-based test suite with comprehensive coverage.
- tests/conftest.py: Shared fixtures (temp_db, proxy_db, sample_proxies, etc.)
- tests/test_dbs.py: 40 tests for database operations (CDN filtering, latency, stats)
- tests/test_fetch.py: 60 tests for proxy validation (skipped in Python 3)
- tests/test_misc.py: 39 tests for utilities (timestamp, log levels, SSL errors)
- tests/mock_network.py: Network mocking infrastructure
```
Test Results: 79 passed, 60 skipped (Python 2 only)
Run with: python3 -m pytest tests/ -v
```
---
## Long Term (Future)
### [x] 16. Geographic Validation
**Completed.** Added IP2Location and pyasn for proxy geolocation.
- requirements.txt: Added IP2Location package
- proxywatchd.py: IP2Location for country lookup, pyasn for ASN lookup
- proxywatchd.py: Fixed ValueError handling when database files missing
- data/: IP2LOCATION-LITE-DB1.BIN (2.7M), ipasn.dat (23M)
- Output shows country codes: `http://1.2.3.4:8080 (US)` or `(IN)`, `(DE)`, etc.
---
### [x] 17. SSL Proxy Testing
**Completed.** Added SSL checktype for TLS handshake validation.
- config.py: Default checktype changed to 'ssl'
- proxywatchd.py: ssl_targets list with major HTTPS sites
- Validates TLS handshake with certificate verification
- Detects MITM proxies that intercept SSL connections
### [x] 18. Additional Search Engines
**Completed.** Added modular search engine architecture.
- engines.py: SearchEngine base class with build_url(), extract_urls(), is_rate_limited()
- Engines: DuckDuckGo, Startpage, Mojeek (UK), Qwant (FR), Yandex (RU), Ecosia, Brave
- Git hosters: GitHub, GitLab, Codeberg, Gitea
- scraper.py: EngineTracker class for multi-engine rate limiting
- Config: [scraper] engines, max_pages settings
- searx.instances: Updated with 51 active SearXNG instances
### [x] 19. REST API
**Completed.** Added HTTP API server for querying working proxies.
- httpd.py: ProxyAPIServer class with BaseHTTPServer
- Endpoints: /proxies, /proxies/count, /health
- Params: limit, proto, country, format (json/plain)
- Integrated into proxywatchd.py (starts when httpd.enabled=True)
- Config: [httpd] section with listenip, port, enabled
### [x] 20. Web Dashboard
**Completed.** Added web dashboard with live statistics.
- httpd.py: DASHBOARD_HTML template with dark theme UI
- Endpoint: /dashboard (HTML page with auto-refresh)
- Endpoint: /api/stats (JSON runtime statistics)
- Stats include: tested/passed counts, success rate, thread count, uptime
- Tor pool health: per-host latency, success rate, availability
- Failure categories: timeout, proxy, ssl, closed, etc.
- proxywatchd.py: get_runtime_stats() method provides stats callback
### [x] 21. Dashboard Enhancements (v2)
**Completed.** Major dashboard improvements for better visibility.
- Prominent check type badge in header (SSL/JUDGES/HTTP/IRC with color coding)
- System monitor bar: load average, memory usage, disk usage, process RSS
- Anonymity breakdown: elite/anonymous/transparent proxy counts
- Database health indicators: size, tested/hour, added/day, dead count
- Enhanced Tor pool: total requests, success rate, healthy nodes, avg latency
- SQLite ANALYZE/VACUUM functions for query optimization (dbs.py)
- Database statistics API (get_database_stats())
### [x] 22. Completion Queue Optimization
**Completed.** Eliminated polling bottleneck in proxy test collection.
- Added `completion_queue` for event-driven state signaling
- `ProxyTestState.record_result()` signals when all targets complete
- `collect_work()` drains queue instead of polling all pending states
- Changed `pending_states` from list to dict for O(1) removal
- Result: `is_complete()` eliminated from hot path, `collect_work()` 54x faster
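The event-driven pattern above can be sketched as follows. This is a minimal illustration, not the project's actual code: the class and function names mirror those mentioned in the bullets, but the fields and signatures are assumptions.

```python
import queue
import threading

class ProxyTestState:
    """Sketch: accumulates per-target results for one proxy."""
    def __init__(self, proxy, n_targets, completion_queue):
        self.proxy = proxy
        self.n_targets = n_targets
        self.results = []
        self.lock = threading.Lock()
        self.completion_queue = completion_queue

    def record_result(self, ok):
        # Signal the collector exactly once, when the final target reports in.
        with self.lock:
            self.results.append(ok)
            if len(self.results) == self.n_targets:
                self.completion_queue.put(self)

def collect_work(completion_queue, pending_states):
    """Drain only finished states instead of polling every pending one."""
    done = []
    while True:
        try:
            state = completion_queue.get_nowait()
        except queue.Empty:
            break
        pending_states.pop(state.proxy, None)  # dict gives O(1) removal
        done.append(state)
    return done
```

The key point: `collect_work()` touches only completed states, so its cost scales with completions per cycle, not with total pending jobs.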
---
### [x] 23. Batch API Endpoint
**Completed.** Added `/api/dashboard` batch endpoint combining stats, workers, and countries.
**Implementation:**
- `httpd.py`: New `/api/dashboard` endpoint returns combined data in single response
- `httpd.py`: Refactored `/api/workers` to use `_get_workers_data()` helper method
- `dashboard.js`: Updated `fetchStats()` to use batch endpoint instead of multiple calls
**Response Structure:**
```json
{
"stats": { /* same as /api/stats */ },
"workers": { /* same as /api/workers */ },
"countries": { /* same as /api/countries */ }
}
```
**Benefit:**
- Reduces dashboard polling from 2 HTTP requests to 1 per poll cycle
- Lower RTT impact over SSH tunnels and high-latency connections
- Single database connection serves all data
---
## Profiling-Based Performance Optimizations
**Baseline:** 30-minute profiling session, 25.6M function calls, 1842s runtime
The following optimizations were identified through cProfile analysis. Each is
assessed for real-world impact based on measured data.
### [x] 1. SQLite Query Batching
**Completed.** Added batch update functions and optimized submit_collected().
**Implementation:**
- `batch_update_proxy_latency()`: Single SELECT with IN clause, compute EMA in Python,
batch UPDATE with executemany()
- `batch_update_proxy_anonymity()`: Batch all anonymity updates in single executemany()
- `submit_collected()`: Uses batch functions instead of per-proxy loops
**Previous State:**
- 18,182 execute() calls consuming 50.6s (2.7% of runtime)
- Individual UPDATE for each proxy latency and anonymity
**Improvement:**
- Reduced from N execute() + N commit() to 1 SELECT + 1 executemany() per batch
- Estimated 15-25% reduction in SQLite overhead
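The batched pattern can be illustrated with an in-memory sketch. The function name matches the bullet above, but the schema, EMA constant, and signature are assumptions for illustration only:

```python
import sqlite3

EMA_ALPHA = 0.3  # assumed smoothing factor; the real value lives in config

def batch_update_proxy_latency(db, samples):
    """One SELECT with IN, EMA computed in Python, one executemany()
    UPDATE -- replacing a per-proxy execute()/commit() loop.

    samples: {proxy_id: new_latency_ms}
    """
    ids = list(samples)
    placeholders = ','.join('?' * len(ids))
    rows = db.execute(
        'SELECT id, latency FROM proxylist WHERE id IN (%s)' % placeholders,
        ids).fetchall()
    updates = []
    for pid, old in rows:
        new = samples[pid]
        # Exponential moving average; seed with the raw sample when unset.
        ema = new if not old else EMA_ALPHA * new + (1 - EMA_ALPHA) * old
        updates.append((ema, pid))
    db.executemany('UPDATE proxylist SET latency = ? WHERE id = ?', updates)
    db.commit()
```

Whatever the batch size, SQLite sees two statements and one commit instead of 2N statements and N commits.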
---
### [x] 2. Proxy Validation Caching
**Completed.** Converted is_usable_proxy() cache to proper LRU with OrderedDict.
**Implementation:**
- fetch.py: Changed _proxy_valid_cache from dict to OrderedDict
- Added thread-safe _proxy_valid_cache_lock
- move_to_end() on cache hits to maintain LRU order
- Evict oldest entries when cache reaches max size (10,000)
- Proper LRU eviction instead of stopping inserts when full
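A minimal sketch of the OrderedDict-backed LRU described above (Python 3 idiom; the wrapper name and `validate` callback are illustrative, not the actual fetch.py API):

```python
import threading
from collections import OrderedDict

_CACHE_MAX = 10000
_proxy_valid_cache = OrderedDict()
_proxy_valid_cache_lock = threading.Lock()

def cached_is_usable(addr, validate):
    """LRU-cached validity check: hits move to the end of the OrderedDict,
    and the oldest entry is evicted when full (instead of refusing inserts)."""
    with _proxy_valid_cache_lock:
        if addr in _proxy_valid_cache:
            _proxy_valid_cache.move_to_end(addr)
            return _proxy_valid_cache[addr]
    result = validate(addr)  # run the real check outside the lock
    with _proxy_valid_cache_lock:
        _proxy_valid_cache[addr] = result
        _proxy_valid_cache.move_to_end(addr)
        if len(_proxy_valid_cache) > _CACHE_MAX:
            _proxy_valid_cache.popitem(last=False)  # evict oldest
    return result
```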
---
### [x] 3. Regex Pattern Pre-compilation
**Completed.** Pre-compiled proxy extraction pattern at module load.
**Implementation:**
- `fetch.py`: Added `PROXY_PATTERN = re.compile(r'...')` at module level
- `extract_proxies()`: Changed `re.findall(pattern, ...)` to `PROXY_PATTERN.findall(...)`
- Pattern compiled once at import, not on each call
**Previous State:**
- `extract_proxies()`: 166 calls, 2.87s total (17.3ms each)
- Pattern recompiled on each call
**Improvement:**
- Eliminated per-call regex compilation overhead
- Estimated 30-50% reduction in extract_proxies() time
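The pattern is simply module-level compilation. The regex below is a simplified stand-in, the real extraction pattern is stricter:

```python
import re

# Compiled once at import time, not on each extract_proxies() call.
PROXY_PATTERN = re.compile(r'\b(\d{1,3}(?:\.\d{1,3}){3}):(\d{2,5})\b')

def extract_proxies(text):
    """Find ip:port pairs using the pre-compiled module-level pattern."""
    return ['%s:%s' % (ip, port) for ip, port in PROXY_PATTERN.findall(text)]
```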
---
### [ ] 4. JSON Stats Response Caching
**Current State:**
- 1.9M calls to JSON encoder functions
- `_iterencode_dict`: 1.4s, `_iterencode_list`: 0.8s
- Dashboard polls every 3 seconds = 600 requests per 30min
- Most stats data unchanged between requests
**Proposed Change:**
- Cache serialized JSON response with short TTL (1-2 seconds)
- Only regenerate when underlying stats change
- Use ETag/If-None-Match for client-side caching
**Assessment:**
```
Current cost: ~5.5s per 30min (JSON encoding overhead)
Potential saving: 60-80% = 3.3-4.4s per 30min = 6.6-8.8s/hour
Effort: Medium (add caching layer to httpd.py)
Risk: Low (stale stats for 1-2 seconds acceptable)
```
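If implemented, the TTL cache could be as small as this sketch (class name and wiring are assumptions; only the 1-2 s TTL idea comes from the proposal above):

```python
import json
import time

class StatsCache:
    """TTL cache for the serialized stats response.

    The JSON encoder runs at most once per `ttl` seconds; polls landing
    inside the window are served the cached bytes.
    """
    def __init__(self, ttl=2.0):
        self.ttl = ttl
        self._body = None
        self._expires = 0.0

    def get(self, build_stats):
        now = time.monotonic()
        if self._body is None or now >= self._expires:
            self._body = json.dumps(build_stats()).encode()
            self._expires = now + self.ttl
        return self._body
```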
---
### [ ] 5. Object Pooling for Test States
**Current State:**
- `__new__` calls: 43,413 at 10.1s total
- `ProxyTestState.__init__`: 18,150 calls, 0.87s
- `TargetTestJob` creation: similar overhead
- Objects created and discarded each test cycle
**Proposed Change:**
- Implement object pool for ProxyTestState and TargetTestJob
- Reset and reuse objects instead of creating new
- Pool size: 2x thread count
**Assessment:**
```
Current cost: ~11s per 30min = 22s/hour = 8.8min/day
Potential saving: 50-70% = 5.5-7.7s per 30min = 11-15s/hour = 4.4-6.2min/day
Effort: High (significant refactoring, reset logic needed)
Risk: Medium (state leakage bugs if reset incomplete)
```
**Verdict:** NOT RECOMMENDED. High effort, medium risk, modest gain.
Python's object creation is already optimized. Focus elsewhere.
---
### [ ] 6. SQLite Connection Reuse
**Current State:**
- 718 connection opens in 30min session
- Each open: 0.26ms (total 0.18s for connects)
- Connection per operation pattern in mysqlite.py
**Proposed Change:**
- Maintain persistent connection per thread
- Implement connection pool with health checks
- Reuse connections across operations
**Assessment:**
```
Current cost: 0.18s per 30min (connection overhead only)
Potential saving: 90% = 0.16s per 30min = 0.32s/hour
Effort: Medium (thread-local storage, lifecycle management)
Risk: Medium (connection state, locking issues)
```
**Verdict:** NOT RECOMMENDED. Negligible time savings (0.16s per 30min).
SQLite's lightweight connections don't justify pooling complexity.
---
### Summary: Optimization Priority Matrix
```
┌─────────────────────────────────────┬────────┬────────┬─────────┬───────────┐
│ Optimization │ Effort │ Risk │ Savings │ Status
├─────────────────────────────────────┼────────┼────────┼─────────┼───────────┤
│ 1. SQLite Query Batching │ Low │ Low │ 20-34s/h│ DONE
│ 2. Proxy Validation Caching │ V.Low │ None │ 5-8s/h │ DONE
│ 3. Regex Pre-compilation │ Low │ None │ 5-8s/h │ DONE
│ 4. JSON Response Caching │ Medium │ Low │ 7-9s/h │ Later
│ 5. Object Pooling │ High │ Medium │ 11-15s/h│ Skip
│ 6. SQLite Connection Reuse │ Medium │ Medium │ 0.3s/h │ Skip
└─────────────────────────────────────┴────────┴────────┴─────────┴───────────┘
Completed: 1 (SQLite Batching), 2 (Proxy Caching), 3 (Regex Pre-compilation)
Remaining: 4 (JSON Caching - Later)
Realized savings from completed optimizations:
Per hour: 25-42 seconds saved
Per day: 10-17 minutes saved
Per week: 1.2-2.0 hours saved
Note: 68.7% of runtime is socket I/O (recv/send) which cannot be optimized
without changing the fundamental network architecture. The optimizations
above target the remaining 31.3% of CPU-bound operations.
```
---
## Potential Dashboard Improvements
### [ ] Dashboard Performance Optimizations
**Goal:** Ensure dashboard remains lightweight and doesn't impact system performance.
**Current safeguards:**
- No polling on server side (client-initiated via fetch)
- 3-second refresh interval (configurable)
- Minimal DOM updates (targeted element updates, not full re-render)
- Static CSS/JS (no server-side templating per request)
- No persistent connections (stateless HTTP)
**Future considerations:**
- [x] Add rate limiting on /api/stats endpoint (300 req/60s sliding window)
- [ ] Cache expensive DB queries (top countries, protocol breakdown)
- [ ] Lazy-load historical data (only when scrolled into view)
- [ ] WebSocket option for push updates (reduce polling overhead)
- [ ] Configurable refresh interval via URL param or localStorage
- [x] Pause polling when browser tab not visible (Page Visibility API)
- [x] Skip chart rendering for inactive dashboard tabs (reduces CPU)
- [x] Batch API endpoints - combine /api/stats, /api/workers, /api/countries into
  single /api/dashboard call to reduce round-trips (helps SSH tunnel latency)
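The sliding-window rate limiter from the checklist above (300 req/60s on /api/stats) might look like this sketch; only the limits come from the checklist, the implementation is an assumption:

```python
import time
from collections import deque

class SlidingWindowLimiter:
    """Allow at most max_requests hits inside a trailing time window."""
    def __init__(self, max_requests=300, window=60.0):
        self.max_requests = max_requests
        self.window = window
        self.hits = deque()  # timestamps of accepted requests

    def allow(self, now=None):
        now = time.monotonic() if now is None else now
        # Drop timestamps that slid out of the window.
        while self.hits and self.hits[0] <= now - self.window:
            self.hits.popleft()
        if len(self.hits) >= self.max_requests:
            return False
        self.hits.append(now)
        return True
```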
### [ ] Dashboard Feature Ideas
**Low priority - consider when time permits:**
- [x] Geographic map visualization - /map endpoint with Leaflet.js
- [x] Dark/light/muted theme toggle - t key cycles themes
- [x] Export stats as CSV/JSON from dashboard (/api/stats/export?format=json|csv)
- [ ] Historical graphs (24h, 7d) using stats_history table
- [ ] Per-ASN performance analysis
- [ ] Alert thresholds (success rate < X%, MITM detected)
- [ ] Mobile-responsive improvements
- [x] Keyboard shortcuts (r=refresh, 1-9=tabs, t=theme, p=pause)
### [x] Local JS Library Serving
**Completed.** All JavaScript libraries now served locally from /static/lib/ endpoint.
**Bundled libraries (static/lib/):**
- Leaflet.js 1.9.4 (leaflet.js, leaflet.css)
- Leaflet.markercluster (MarkerCluster.Default.css)
- Chart.js 4.x (chart.umd.min.js)
- uPlot (uPlot.iife.min.js, uPlot.min.css)
**Candidate libraries for future enhancements:**
```
┌─────────────────┬─────────┬───────────────────────────────────────────────┐
│ Library │ Size │ Use Case
├─────────────────┼─────────┼───────────────────────────────────────────────┤
│ Chart.js │ 65 KB │ Line/bar/pie charts (simpler API than D3)
│ uPlot │ 15 KB │ Fast time-series charts (minimal, performant)
│ ApexCharts │ 125 KB │ Modern charts with animations
│ Frappe Charts │ 25 KB │ Simple, modern SVG charts
│ Sparkline │ 2 KB │ Tiny inline charts (already have custom impl)
├─────────────────┼─────────┼───────────────────────────────────────────────┤
│ D3.js │ 85 KB │ Full control, complex visualizations
│ D3-geo │ 30 KB │ Geographic projections (alternative to Leaflet)
├─────────────────┼─────────┼───────────────────────────────────────────────┤
│ Leaflet │ 40 KB │ Interactive maps (already using)
│ Leaflet.heat │ 5 KB │ Heatmap layer for proxy density
│ Leaflet.cluster │ 10 KB │ Marker clustering for many points
└─────────────────┴─────────┴───────────────────────────────────────────────┘
Recommendations:
● uPlot - Best for time-series (rate history, success rate history)
● Chart.js - Best for pie/bar charts (failure breakdown, protocol stats)
● Leaflet - Keep for maps, add heatmap plugin for density viz
```
**Current custom implementations (no library):**
- Sparkline charts (Test Rate History, Success Rate History) - inline SVG
- Histogram bars (Response Time Distribution) - CSS divs
- Pie charts (Failure Breakdown, Protocol Stats) - CSS conic-gradient
**Decision:** Current custom implementations are lightweight and sufficient.
Add libraries only when custom becomes unmaintainable or new features needed.
### [ ] Memory Optimization Candidates
**Based on memory analysis (production metrics):**
```
Current State (260k queue):
Start RSS: 442 MB
Current RSS: 1,615 MB
Per-job: ~4.5 KB overhead
Object Distribution:
259,863 TargetTestJob (1 per job)
259,863 ProxyTestState (1 per job)
259,950 LockType (1 per job - threading locks)
523,395 dict (2 per job - state + metadata)
522,807 list (2 per job - results + targets)
```
**Potential optimizations:**
- [ ] Lock consolidation - reduce per-proxy locks (260k LockType objects)
- [ ] Leaner state objects - reduce dict/list count per job
- [x] Slot-based classes - use `__slots__` on ProxyTestState (27 attrs), TargetTestJob (4 attrs)
- [ ] Object pooling - reuse ProxyTestState/TargetTestJob objects (not recommended)
**Verdict:** Memory scales linearly with queue (~4.5 KB/job). No leaks detected.
Current usage acceptable for production workloads. Optimize only if memory
becomes a constraint.
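The `__slots__` change above works because it removes the per-instance `__dict__`, which dominates the per-job overhead when ~260k state objects are alive. A minimal illustration (toy classes, not the real ones):

```python
class WithDict(object):
    def __init__(self):
        self.proxy = '1.2.3.4:8080'
        self.results = []

class WithSlots(object):
    __slots__ = ('proxy', 'results')  # no per-instance __dict__
    def __init__(self):
        self.proxy = '1.2.3.4:8080'
        self.results = []

plain = WithDict()
slotted = WithSlots()
```

The trade-off: slotted instances reject attributes not declared in `__slots__`, so every field must be listed up front (27 on ProxyTestState, 4 on TargetTestJob).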
---
## Completed
### [x] Work-Stealing Queue
- Implemented shared Queue.Queue() for job distribution
- Workers pull from shared queue instead of pre-assigned lists
- Better utilization across threads
### [x] Multi-Target Validation
- Test each proxy against 3 random targets
- 2/3 majority required for success
- Reduces false negatives from single target failures
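The majority rule above is tiny but worth pinning down; this sketch uses illustrative helper names (the real code lives in the test state machinery):

```python
import random

def pick_targets(all_targets, k=3):
    """Choose k distinct random targets for one proxy."""
    return random.sample(all_targets, k)

def majority_pass(results, required=2):
    """2-of-3 majority: one flaky target cannot fail a working proxy,
    and one lucky fluke cannot pass a dead one."""
    return sum(1 for ok in results if ok) >= required
```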
### [x] Interleaved Testing
- Jobs shuffled across all proxies before queueing
- Prevents burst of 3 connections to same proxy
- ProxyTestState accumulates results from TargetTestJobs
### [x] Code Cleanup
- Removed 93 lines of dead HTTP server code (ppf.py)
- Removed dead gumbo parser (soup_parser.py)
- Removed test code (comboparse.py)
- Removed unused functions (misc.py)
- Fixed IP/port cleansing (ppf.py)
- Updated .gitignore
### [x] Rate Limiting & Instance Tracking (scraper.py)
- InstanceTracker class with exponential backoff
- Configurable backoff_base, backoff_max, fail_threshold
- Instance cycling when rate limited
### [x] Exception Logging with Context
- Replaced bare `except:` with typed exceptions across all files
- Added context logging to exception handlers (e.g., URL, error message)
### [x] Timeout Standardization
- Added timeout_connect, timeout_read to [common] config section
- Added stale_days, stats_interval to [watchd] config section
### [x] Periodic Stats & Stale Cleanup (proxywatchd.py)
- Stats class tracks tested/passed/failed with thread-safe counters
- Configurable stats_interval (default: 300s)
- cleanup_stale() removes dead proxies older than stale_days (default: 30)
### [x] Unified Proxy Cache
- Moved _known_proxies to fetch.py with helper functions
- init_known_proxies(), add_known_proxies(), is_known_proxy()
- ppf.py now uses shared cache via fetch module
### [x] Config Validation
- config.py: validate() method checks config values on startup
- Validates: port ranges, timeout values, thread counts, engine names
- Warns on missing source_file, unknown engines
- Errors on unwritable database directories
- Integrated into ppf.py, proxywatchd.py, scraper.py main entry points
### [x] Profiling Support
- config.py: Added --profile CLI argument
- ppf.py: Refactored main logic into main() function
- ppf.py: cProfile wrapper with stats output to profile.stats
- Prints top 20 functions by cumulative time on exit
- Usage: `python2 ppf.py --profile`
### [x] SIGTERM Graceful Shutdown
- ppf.py: Added signal handler converting SIGTERM to KeyboardInterrupt
- Ensures profile stats are written before container exit
- Allows clean thread shutdown in containerized environments
- Podman stop now triggers proper cleanup instead of SIGKILL
### [x] Unicode Exception Handling (Python 2)
- Problem: `repr(e)` on exceptions with unicode content caused encoding errors
- Files affected: ppf.py, scraper.py (3 exception handlers)
- Solution: Check `isinstance(err_msg, unicode)` then encode with 'backslashreplace'
- Pattern applied:
```python
try:
err_msg = repr(e)
if isinstance(err_msg, unicode):
err_msg = err_msg.encode('ascii', 'backslashreplace')
except:
err_msg = type(e).__name__
```
- Handles Korean/CJK characters in search queries without crashing
### [x] Interactive World Map (/map endpoint)
- Added Leaflet.js interactive map showing proxy distribution by country
- Modern glassmorphism UI with `backdrop-filter: blur(12px)`
- CartoDB dark tiles for dark theme
- Circle markers sized proportionally to proxy count per country
- Hover effects with smooth transitions
- Stats overlay showing total countries/proxies
- Legend with proxy count scale
- Country coordinates and names lookup tables
### [x] Dashboard v3 - Electric Cyan Theme
- Translucent glass-morphism effects with `backdrop-filter: blur()`
- Electric cyan glow borders `rgba(56,189,248,...)` on all graph wrappers
- Gradient overlays using `::before` pseudo-elements
- Unified styling across: .chart-wrap, .histo-wrap, .stats-wrap, .lb-wrap, .pie-wrap
- New .tor-card wrapper for Tor Exit Nodes with hover effects
- Lighter background color scheme (#1e2738 bg, #181f2a card)
### [x] Map Endpoint Styling Update
- Converted from gold/bronze theme (#c8b48c) to electric cyan (#38bdf8)
- Glass panels with electric glow matching dashboard
- Map markers for approximate locations now cyan instead of gold
- Unified map_bg color with dashboard background (#1e2738)
- Updated Leaflet controls, popups, and legend to cyan theme
### [x] MITM Re-test Optimization
- Skip redundant SSL checks for proxies already known to be MITM
- Added `mitm_retest_skipped` counter to Stats class
- Optimization in `_try_ssl_check()` checks existing MITM flag before testing
- Avoids 6k+ unnecessary re-tests per session (based on production metrics)
### [x] Memory Profiling Endpoint
- /api/memory endpoint with comprehensive memory analysis
- objgraph integration for object type distribution
- pympler integration for memory summaries
- Memory sample history tracking (RSS over time)
- Process memory from /proc/self/status
- GC statistics and collection counts
### [x] Database Context Manager Refactoring
- Refactored all DB operations to use `_db_context()` context manager
- `prepare_jobs()`, `submit_collected()`, `_run()` now use `with self._db_context() as db:`
- `fetch_rows()` accepts db parameter for dependency injection
- Removed deprecated `_prep_db()` and `_close_db()` methods
- Connections guaranteed to close even on exceptions
---
## Deployment Troubleshooting Log
### [x] Container Crash on Startup (2024-12-24)
**Symptoms:**
- Container starts then immediately disappears
- `podman ps` shows no running containers
- `podman logs ppf` returns "no such container"
- Port 8081 not listening
**Debugging Process:**
1. **Initial diagnosis** - SSH to odin, checked container state:
```bash
sudo -u podman podman ps -a # Empty
sudo ss -tlnp | grep 8081 # Nothing listening
```
2. **Ran container in foreground** to capture output:
```bash
sudo -u podman bash -c 'cd /home/podman/ppf && \
timeout 25 podman run --rm --name ppf --network=host \
-v ./src:/app:ro -v ./data:/app/data \
-v ./config.ini:/app/config.ini:ro \
localhost/ppf python2 -u proxywatchd.py 2>&1'
```
3. **Found the error** in httpd thread startup:
```
error: [Errno 98] Address already in use: ('0.0.0.0', 8081)
```
Container started, httpd failed to bind, process continued but HTTP unavailable.
4. **Identified root cause** - orphaned processes from previous debug attempts:
```bash
ps aux | grep -E "[p]pf|[p]roxy"
# Found: python2 ppf.py (PID 6421) still running, holding port 8081
# Found: conmon, timeout, bash processes from stale container
```
5. **Why orphans existed:**
- Previous `timeout 15 podman run` commands timed out
- `podman rm -f` doesn't kill processes when container metadata is corrupted
- Orphaned python2 process kept running with port bound
**Root Cause:**
Stale container processes from interrupted debug sessions held port 8081.
The container started successfully but httpd thread failed to bind,
causing silent failure (no HTTP endpoints) while proxy testing continued.
**Fix Applied:**
```bash
# Force kill all orphaned processes
sudo pkill -9 -f "ppf.py"
sudo pkill -9 -f "proxywatchd.py"
sudo pkill -9 -f "conmon.*ppf"
sleep 2
# Verify port is free
sudo ss -tlnp | grep 8081 # Should show nothing
# Clean podman state
sudo -u podman podman rm -f -a
sudo -u podman podman container prune -f
# Start fresh
sudo -u podman bash -c 'cd /home/podman/ppf && \
podman run -d --rm --name ppf --network=host \
-v ./src:/app:ro -v ./data:/app/data \
-v ./config.ini:/app/config.ini:ro \
localhost/ppf python2 -u proxywatchd.py'
```
**Verification:**
```bash
curl -sf http://localhost:8081/health
# {"status": "ok", "timestamp": 1766573885}
```
**Prevention:**
- Use `podman-compose` for reliable container management
- Use `pkill -9 -f` to kill orphaned processes before restart
- Check port availability before starting: `ss -tlnp | grep 8081`
- Run container foreground first to capture startup errors
**Correct Deployment Procedure:**
```bash
# As root or with sudo
sudo -i -u podman bash
cd /home/podman/ppf
podman-compose down
podman-compose up -d
podman ps
podman logs -f ppf
```
**docker-compose.yml (updated):**
```yaml
version: '3.8'
services:
ppf:
image: localhost/ppf:latest
container_name: ppf
network_mode: host
volumes:
- ./src:/app:ro
- ./data:/app/data
- ./config.ini:/app/config.ini:ro
command: python2 -u proxywatchd.py
restart: unless-stopped
environment:
- PYTHONUNBUFFERED=1
```
---
### [x] SSH Connection Flooding / fail2ban (2024-12-24)
**Symptoms:**
- SSH connections timing out or reset
- "Connection refused" errors
- Intermittent access to odin
**Root Cause:**
Multiple individual SSH commands triggered fail2ban rate limiting.
**Fix Applied:**
Created `~/.claude/rules/ssh-usage.md` with batching best practices.
**Key Pattern:**
```bash
# BAD: 5 separate connections
ssh host 'cmd1'
ssh host 'cmd2'
ssh host 'cmd3'
# GOOD: 1 connection, all commands
ssh host bash <<'EOF'
cmd1
cmd2
cmd3
EOF
```
---
### [!] Podman Container Metadata Disappears (2024-12-24)
**Symptoms:**
- `podman ps -a` shows empty even though process is running
- `podman logs ppf` returns "no such container"
- Port is listening and service responds to health checks
**Observed Behavior:**
```
# Container starts
podman run -d --name ppf ...
# Returns container ID: dc55f0a218b7...
# Immediately after
podman ps -a # Empty!
ss -tlnp | grep 8081 # Shows python2 listening
curl localhost:8081/health # {"status": "ok"}
```
**Analysis:**
- The process runs correctly inside the container namespace
- Container metadata in podman's database is lost/corrupted
- May be related to `--rm` flag interaction with detached mode
- Rootless podman with overlayfs can have state sync issues
**Workaround:**
Service works despite missing metadata. Monitor via:
- `ss -tlnp | grep 8081` - port listening
- `ps aux | grep proxywatchd` - process running
- `curl localhost:8081/health` - service responding
**Impact:** Low. Service functions correctly. Only `podman logs` unavailable.
---
### Container Debugging Checklist
When container fails to start or crashes:
```
┌───┬─────────────────────────────────────────────────────────────────────────┐
│ 1 │ Check for orphans: ps aux | grep -E "[p]rocess_name"
│ 2 │ Check port conflicts: ss -tlnp | grep PORT
│ 3 │ Run foreground: podman run --rm (no -d) to see output
│ 4 │ Check podman state: podman ps -a
│ 5 │ Clean stale: pkill -9 -f "pattern" && podman rm -f -a
│ 6 │ Verify deps: config files, data dirs, volumes exist
│ 7 │ Check logs: podman logs container_name 2>&1 | tail -50
│ 8 │ Health check: curl -sf http://localhost:PORT/health
└───┴─────────────────────────────────────────────────────────────────────────┘
Note: If podman ps shows empty but port is listening and health check passes,
the service is running correctly despite metadata issues. See "Podman Container
Metadata Disappears" section above.
```

compose.master.yml (new file)
@@ -0,0 +1,30 @@
# PPF master node (odin)
#
# Scrapes proxy sources, runs verification, serves API/dashboard.
# No routine proxy testing -- workers handle that.
#
# Prerequisites:
# - config.ini (not tracked, host-specific)
# - data/ (created automatically)
#
# Usage:
# podman-compose -f compose.master.yml up -d
# podman-compose -f compose.master.yml logs -f
# podman-compose -f compose.master.yml down
services:
ppf:
container_name: ppf
image: localhost/ppf:latest
build: .
network_mode: host
restart: unless-stopped
stop_signal: SIGTERM
stop_grace_period: 30s
environment:
PYTHONUNBUFFERED: "1"
volumes:
- .:/app:ro,Z
- ./data:/app/data:Z
- ./config.ini:/app/config.ini:ro,Z
command: python -u ppf.py

compose.worker.yml (new file)
@@ -0,0 +1,38 @@
# PPF worker node (cassius, edge, sentinel, ...)
#
# Tests proxies and reports results to master via WireGuard.
# Each worker uses only local Tor (127.0.0.1:9050).
#
# Prerequisites:
# - config.ini (not tracked, host-specific)
# - servers.txt (deploy from repo)
# - src/ (deploy *.py from repo root into src/)
# - data/ (created automatically)
#
# Usage:
# PPF_MASTER_URL=http://10.200.1.250:8081 podman-compose -f compose.worker.yml up -d
# podman-compose -f compose.worker.yml logs -f
# podman-compose -f compose.worker.yml down
#
# The master URL defaults to http://10.200.1.250:8081 (odin via WireGuard).
# Override with PPF_MASTER_URL env var or edit .env file.
services:
ppf-worker:
container_name: ppf-worker
image: localhost/ppf-worker:latest
build: .
network_mode: host
restart: unless-stopped
stop_signal: SIGTERM
stop_grace_period: 30s
logging:
driver: k8s-file
environment:
PYTHONUNBUFFERED: "1"
volumes:
- ./src:/app:ro,Z
- ./data:/app/data:Z
- ./config.ini:/app/config.ini:ro,Z
- ./servers.txt:/app/servers.txt:ro,Z
command: python -u ppf.py --worker-v2 --server ${PPF_MASTER_URL:-http://10.200.1.250:8081}

config.py
@@ -11,7 +11,12 @@ class Config(ComboParser):
with open(self.watchd.source_file, 'r') as handle:
self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0]
# Parse checktypes as comma-separated list
self.watchd.checktypes = [t.strip() for t in self.watchd.checktype.split(',') if t.strip()]
# Normalize: 'false'/'off'/'disabled' -> 'none' (SSL-only mode)
raw_types = [t.strip().lower() for t in self.watchd.checktype.split(',') if t.strip()]
self.watchd.checktypes = ['none' if t in ('false', 'off', 'disabled') else t for t in raw_types]
# SSL-only mode: force ssl_first when secondary check is disabled
if self.watchd.checktypes == ['none']:
self.watchd.ssl_first = True
# Apply log level from CLI flags
if self.args.quiet:
set_log_level('warn')
@@ -52,12 +57,15 @@ class Config(ComboParser):
errors.append('ppf.max_fail must be >= 1')
# Validate checktypes (secondary check types, ssl is handled by ssl_first)
valid_checktypes = {'irc', 'head', 'judges'}
# 'none' = SSL-only mode (no secondary check)
valid_checktypes = {'irc', 'head', 'judges', 'none'}
for ct in self.watchd.checktypes:
if ct not in valid_checktypes:
errors.append('watchd.checktype "%s" invalid, must be one of: %s' % (ct, ', '.join(sorted(valid_checktypes))))
if not self.watchd.checktypes:
errors.append('watchd.checktype must specify at least one valid type')
if 'none' in self.watchd.checktypes and len(self.watchd.checktypes) > 1:
errors.append('watchd.checktype "none" cannot be combined with other types')
# Validate engine names
valid_engines = {'duckduckgo', 'startpage', 'brave', 'ecosia',
@@ -112,7 +120,7 @@ class Config(ComboParser):
self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False)
self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False)
self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False)
self.add_item(section, 'checktype', str, 'head', 'secondary check type: irc, head, judges (used when ssl_first fails)', False)
self.add_item(section, 'checktype', str, 'head', 'secondary check type: head, irc, judges, none/false (none = SSL-only)', False)
self.add_item(section, 'ssl_first', bool, True, 'try SSL handshake first, fallback to checktype on failure (default: True)', False)
self.add_item(section, 'ssl_only', bool, False, 'when ssl_first enabled, skip secondary check on SSL failure (default: False)', False)
self.add_item(section, 'scale_cooldown', int, 10, 'seconds between thread scaling decisions (default: 10)', False)
@@ -161,6 +169,8 @@ class Config(ComboParser):
self.add_item(section, 'batch_size', int, 100, 'proxies per work batch (default: 100)', False)
self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False)
self.add_item(section, 'claim_timeout', int, 300, 'seconds before unclaimed work is released (default: 300)', False)
self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle for V2 mode (default: 5)', False)
self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching in V2 mode (default: 30)', False)
self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False)
self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False)
@@ -172,3 +182,4 @@ class Config(ComboParser):
self.aparser.add_argument("--worker-key", help="worker authentication key", type=str, default='')
self.aparser.add_argument("--register", help="register as worker with master server", action='store_true', default=False)
self.aparser.add_argument("--worker-name", help="worker name for registration (default: hostname)", type=str, default='')
self.aparser.add_argument("--worker-v2", help="run as V2 worker (URL-driven fetching)", action='store_true', default=False)

dbs.py (123 changed lines)
@@ -66,6 +66,83 @@ def _migrate_confidence_column(sqlite):
sqlite.commit()
def _migrate_source_proto(sqlite):
"""Add source_proto columns to preserve scraper-detected protocol intelligence."""
try:
sqlite.execute('SELECT source_proto FROM proxylist LIMIT 1')
except Exception:
# source_proto: protocol detected by scraper (never overwritten by tests)
sqlite.execute('ALTER TABLE proxylist ADD COLUMN source_proto TEXT')
# source_confidence: scraper confidence score (0-100)
sqlite.execute('ALTER TABLE proxylist ADD COLUMN source_confidence INT DEFAULT 0')
sqlite.commit()
def _migrate_protos_working(sqlite):
"""Add protos_working column for multi-protocol storage."""
try:
sqlite.execute('SELECT protos_working FROM proxylist LIMIT 1')
except Exception:
# protos_working: comma-separated list of working protos (e.g. "http,socks5")
sqlite.execute('ALTER TABLE proxylist ADD COLUMN protos_working TEXT')
sqlite.commit()
def _migrate_last_seen(sqlite):
"""Add last_seen column for worker-reported proxy freshness tracking."""
try:
sqlite.execute('SELECT last_seen FROM proxylist LIMIT 1')
except Exception:
# last_seen: unix timestamp of most recent "working" report from any worker
sqlite.execute('ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0')
sqlite.commit()
def _migrate_uri_check_interval(sqlite):
"""Add adaptive check_interval column to uris table."""
try:
sqlite.execute('SELECT check_interval FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600')
sqlite.commit()
def _migrate_uri_working_ratio(sqlite):
"""Add working_ratio column to uris table for proxy quality tracking."""
try:
sqlite.execute('SELECT working_ratio FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0')
sqlite.commit()
def _migrate_uri_avg_fetch_time(sqlite):
"""Add avg_fetch_time column to uris table for fetch latency EMA."""
try:
sqlite.execute('SELECT avg_fetch_time FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0')
sqlite.commit()
def _migrate_uri_last_worker(sqlite):
"""Add last_worker column to uris table."""
try:
sqlite.execute('SELECT last_worker FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN last_worker TEXT')
sqlite.commit()
def _migrate_uri_yield_rate(sqlite):
"""Add yield_rate column to uris table for proxy yield EMA."""
try:
sqlite.execute('SELECT yield_rate FROM uris LIMIT 1')
except Exception:
sqlite.execute('ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0')
sqlite.commit()
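The migration helpers above all repeat the same probe-then-alter pattern. A generic helper could express it once; a minimal sketch, assuming a plain `sqlite3` connection (`_ensure_column` is a hypothetical name, not part of this change):

```python
import sqlite3

def _ensure_column(conn, table, column, decl):
    """Add column to table if missing (probe-then-alter, as in the migrations above)."""
    try:
        # Selecting a missing column raises OperationalError
        conn.execute('SELECT %s FROM %s LIMIT 1' % (column, table))
    except sqlite3.OperationalError:
        conn.execute('ALTER TABLE %s ADD COLUMN %s %s' % (table, column, decl))
        conn.commit()

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE uris (url TEXT)')
_ensure_column(conn, 'uris', 'yield_rate', 'REAL DEFAULT 0.0')
_ensure_column(conn, 'uris', 'yield_rate', 'REAL DEFAULT 0.0')  # second call is a no-op
cols = [row[1] for row in conn.execute('PRAGMA table_info(uris)')]
print(cols)
```

Note the real helpers catch bare `Exception`, which also swallows "no such table"; `OperationalError` is the narrower choice.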
def compute_proxy_list_hash(proxies):
"""Compute MD5 hash of sorted proxy list for change detection.
@@ -290,13 +367,20 @@ def create_table_if_not_exists(sqlite, dbname):
asn INT,
latitude REAL,
longitude REAL,
-            confidence INT DEFAULT 30)""")
+            confidence INT DEFAULT 30,
+            source_proto TEXT,
+            source_confidence INT DEFAULT 0,
+            protos_working TEXT,
+            last_seen INT DEFAULT 0)""")
# Migration: add columns to existing databases (must run before creating indexes)
_migrate_latency_columns(sqlite)
_migrate_anonymity_columns(sqlite)
_migrate_asn_column(sqlite)
_migrate_geolocation_columns(sqlite)
_migrate_confidence_column(sqlite)
_migrate_source_proto(sqlite)
_migrate_protos_working(sqlite)
_migrate_last_seen(sqlite)
# Indexes for common query patterns
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)')
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)')
@@ -317,6 +401,11 @@ def create_table_if_not_exists(sqlite, dbname):
content_hash TEXT)""")
# Migration for existing databases
_migrate_content_hash_column(sqlite)
_migrate_uri_check_interval(sqlite)
_migrate_uri_working_ratio(sqlite)
_migrate_uri_avg_fetch_time(sqlite)
_migrate_uri_last_worker(sqlite)
_migrate_uri_yield_rate(sqlite)
# Indexes for common query patterns
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)')
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)')
@@ -444,11 +533,11 @@ def insert_proxies(proxydb, proxies, url):
filtered += 1
continue
-        rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0, confidence))
+        rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0, confidence, proto, confidence))
proxydb.executemany(
'INSERT OR IGNORE INTO proxylist '
-        '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success,confidence) '
-        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?)',
+        '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success,confidence,source_proto,source_confidence) '
+        'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
rows
)
proxydb.commit()
@@ -508,6 +597,32 @@ PROXY_SOURCES = [
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all',
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks4&timeout=10000&country=all',
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks5&timeout=10000&country=all',
# proxifly/free-proxy-list - 5 min updates (jsDelivr CDN)
'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/http/data.txt',
'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/socks4/data.txt',
'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/socks5/data.txt',
# vakhov/fresh-proxy-list - 5-20 min updates (GitHub Pages)
'https://vakhov.github.io/fresh-proxy-list/http.txt',
'https://vakhov.github.io/fresh-proxy-list/socks4.txt',
'https://vakhov.github.io/fresh-proxy-list/socks5.txt',
# prxchk/proxy-list - 10 min updates
'https://raw.githubusercontent.com/prxchk/proxy-list/main/http.txt',
'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks4.txt',
'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks5.txt',
# sunny9577/proxy-scraper - 3 hour updates (GitHub Pages)
'https://sunny9577.github.io/proxy-scraper/generated/http_proxies.txt',
'https://sunny9577.github.io/proxy-scraper/generated/socks4_proxies.txt',
'https://sunny9577.github.io/proxy-scraper/generated/socks5_proxies.txt',
# officialputuid/KangProxy - 4-6 hour updates
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/http/http.txt',
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks4/socks4.txt',
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks5/socks5.txt',
# hookzof/socks5_list - hourly updates
'https://raw.githubusercontent.com/hookzof/socks5_list/master/proxy.txt',
# iplocate/free-proxy-list - 30 min updates
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/http.txt',
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks4.txt',
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks5.txt',
]


@@ -1,11 +0,0 @@
version: '3.8'
services:
ppf:
build: .
volumes:
- .:/app
working_dir: /app
command: python ppf.py
environment:
- PYTHONUNBUFFERED=1


@@ -0,0 +1,572 @@
# Design: Worker-Driven Discovery
## Status
**Proposal** -- Not yet implemented.
## Problem
The current architecture centralizes all proxy list fetching on the master
node (odin). Workers only test proxies handed to them. This creates several
issues:
1. **Single point of fetch** -- If odin can't reach a source (blocked IP,
transient failure), that source is dead for everyone.
2. **Bandwidth concentration** -- Odin fetches 40 proxy lists every cycle,
extracts proxies, deduplicates, and stores them before workers ever see
them.
3. **Wasted vantage points** -- Workers sit behind different Tor exits and
IPs, but never use that diversity for fetching.
4. **Tight coupling** -- Workers can't operate at all without the master's
claim queue. If odin restarts, all workers stall.
## Proposed Architecture
Move proxy list fetching to workers. Master becomes a coordinator and
aggregator rather than a fetcher.
```
Current: Proposed:
Master Master
Fetch URLs --------+ Manage URL database
Extract proxies | Score URLs from feedback
Store proxylist | Aggregate working proxies
Serve /api/work ---+-> Workers Serve /api/claim-urls ----> Workers
<- /api/results |
<- /api/report-urls ------+
<- /api/report-proxies ---+
```
### Role Changes
```
+--------+---------------------------+----------------------------------+
| Host | Current Role | New Role |
+--------+---------------------------+----------------------------------+
| odin | Fetch URLs | Maintain URL database |
| | Extract proxies | Score URLs from worker feedback |
| | Store proxylist | Aggregate reported proxies |
| | Distribute proxy batches | Distribute URL batches |
| | Collect test results | Collect URL + proxy reports |
+--------+---------------------------+----------------------------------+
| worker | Claim proxy batch | Claim URL batch |
| | Test each proxy | Fetch URL, extract proxies |
| | Report pass/fail | Test extracted proxies |
| | | Report URL health + proxy results|
+--------+---------------------------+----------------------------------+
```
## Data Flow
### Phase 1: URL Claiming
Worker requests a batch of URLs to process.
```
Worker Master
| |
| GET /api/claim-urls |
| ?key=...&count=5 |
|------------------------------>|
| | Select due URLs from uris table
| | Mark as claimed (in-memory)
| [{url, last_hash, proto_hint}, ...]
|<------------------------------|
```
**Claim response:**
```json
{
"worker_id": "abc123",
"urls": [
{
"url": "https://raw.githubusercontent.com/.../http.txt",
"last_hash": "a1b2c3d4...",
"proto_hint": "http",
"priority": 1
}
]
}
```
Fields:
- `last_hash` -- MD5 of last extracted proxy list. Worker can skip
extraction and report "unchanged" if hash matches, saving CPU.
- `proto_hint` -- Protocol inferred from URL path. Worker uses this for
extraction confidence scoring.
- `priority` -- Higher = fetch sooner. Based on URL score.
### Phase 2: Fetch and Extract
Worker fetches each URL through Tor, extracts proxies using the existing
`fetch.extract_proxies()` pipeline.
```
Worker
|
| For each claimed URL:
| 1. Fetch through Tor (fetch_contents)
| 2. Compute content hash (MD5)
| 3. If hash == last_hash: skip extraction, report unchanged
| 4. Else: extract_proxies() -> list of (addr, proto, confidence)
| 5. Queue extracted proxies for testing
|
```
### Phase 3: URL Feedback
Worker reports fetch results for each URL back to master.
```
Worker Master
| |
| POST /api/report-urls |
| {reports: [...]} |
|------------------------------>|
| | Update uris table:
| | check_time, error, stale_count,
| | retrievals, proxies_added,
| | content_hash, worker_scores
| {ok: true} |
|<------------------------------|
```
**URL report payload:**
```json
{
"reports": [
{
"url": "https://...",
"success": true,
"content_hash": "a1b2c3d4...",
"proxy_count": 1523,
"fetch_time_ms": 2340,
"changed": true,
"error": null
},
{
"url": "https://...",
"success": false,
"content_hash": null,
"proxy_count": 0,
"fetch_time_ms": 0,
"changed": false,
"error": "timeout"
}
]
}
```
### Phase 4: Proxy Testing and Reporting
Worker tests extracted proxies locally using the existing `TargetTestJob`
pipeline. **Only working proxies are reported to master.** Failed proxies
are discarded silently -- no point wasting bandwidth on negatives.
Workers are trusted. If a worker says a proxy works, master accepts it.
```
Worker Master
| |
| Test proxies locally |
| (same TargetTestJob flow) |
| Discard failures |
| |
| POST /api/report-proxies |
| {proxies: [...]} | (working only)
|------------------------------>|
| | Upsert into proxylist:
| | INSERT OR REPLACE
| | Set failed=0, update last_seen
| {ok: true} |
|<------------------------------|
```
**Proxy report payload (working only):**
```json
{
"proxies": [
{
"ip": "1.2.3.4",
"port": 8080,
"proto": "socks5",
"source_proto": "socks5",
"latency": 1.234,
"exit_ip": "5.6.7.8",
"anonymity": "elite",
"source_url": "https://..."
}
]
}
```
No `working` field needed -- everything in the report is working by
definition. The `source_url` links proxy provenance to the URL that
yielded it, enabling URL quality scoring.
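Building that payload from local test results might look like the following sketch (`build_proxy_report` is a hypothetical helper; the `working` flag stands in for whatever the local test loop records):

```python
def build_proxy_report(results):
    """Keep only working proxies; failures are dropped locally, never reported."""
    proxies = []
    for r in results:
        if not r.get('working'):
            continue
        proxies.append({
            'ip': r['ip'], 'port': r['port'], 'proto': r['proto'],
            'source_proto': r.get('source_proto', r['proto']),
            'latency': r['latency'], 'source_url': r.get('source_url'),
        })
    return {'proxies': proxies}

report = build_proxy_report([
    {'ip': '1.2.3.4', 'port': 8080, 'proto': 'socks5', 'latency': 1.2, 'working': True},
    {'ip': '5.6.7.8', 'port': 3128, 'proto': 'http', 'latency': 0.0, 'working': False},
])
print(len(report['proxies']))  # 1
```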
### Complete Cycle
```
Worker main loop:
1. GET /api/claim-urls Claim batch of URLs
2. For each URL:
a. Fetch through Tor
b. Extract proxies (or skip if unchanged)
c. Test extracted proxies
3. POST /api/report-urls Report URL health
4. POST /api/report-proxies Report proxy results
5. POST /api/heartbeat Health check
6. Sleep, repeat
```
## Master-Side Changes
### URL Scheduling
Current `Leechered` threads fetch URLs on a timer based on error/stale
count. Replace with a scoring system that workers consume.
**URL score** (higher = fetch sooner):
```
score = base_score
+ freshness_bonus # High-frequency sources score higher
- error_penalty # Consecutive errors reduce score
- stale_penalty # Unchanged content reduces score
+ yield_bonus # URLs that produce many proxies score higher
+ quality_bonus # URLs whose proxies actually work score higher
```
Concrete formula:
```python
def url_score(url_row):
    now = time.time()
    age = now - url_row.check_time
    base = age / url_row.check_interval  # 1.0 when due
# Yield: proxies found per fetch (rolling average)
yield_rate = url_row.proxies_added / max(url_row.retrievals, 1)
yield_bonus = min(yield_rate / 100.0, 1.0) # Cap at 1.0
# Quality: what % of extracted proxies actually worked
quality_bonus = url_row.working_ratio * 0.5 # 0.0 to 0.5
# Penalties
error_penalty = min(url_row.error * 0.3, 2.0)
stale_penalty = min(url_row.stale_count * 0.1, 1.0)
return base + yield_bonus + quality_bonus - error_penalty - stale_penalty
```
URLs with `score >= 1.0` are due for fetching. Claimed URLs are locked
in memory for `claim_timeout` seconds (existing pattern).
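Plugging illustrative numbers into the formula: a URL two hours past a one-hour interval, averaging 50 proxies per fetch with 40% of them working and one recent error, scores well above the 1.0 threshold (same formula as above, flattened to plain arguments for the example):

```python
def url_score(check_time, check_interval, proxies_added, retrievals,
              working_ratio, error, stale_count, now):
    base = (now - check_time) / check_interval       # 2.0: two hours vs a 1h interval
    yield_rate = proxies_added / max(retrievals, 1)  # 50.0 proxies per fetch
    yield_bonus = min(yield_rate / 100.0, 1.0)       # 0.5
    quality_bonus = working_ratio * 0.5              # 0.2
    error_penalty = min(error * 0.3, 2.0)            # 0.3
    stale_penalty = min(stale_count * 0.1, 1.0)      # 0.0
    return base + yield_bonus + quality_bonus - error_penalty - stale_penalty

score = url_score(check_time=0, check_interval=3600, proxies_added=500,
                  retrievals=10, working_ratio=0.4, error=1, stale_count=0,
                  now=7200)
print(round(score, 2))  # 2.4
```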
### New uris Columns
```sql
ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600;
ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0;
ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0;
ALTER TABLE uris ADD COLUMN last_worker TEXT;
ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0;
```
- `check_interval` -- Adaptive: decreases for high-yield URLs, increases
for stale/erroring ones. Replaces the `checktime + error * perfail`
formula with a persisted value.
- `working_ratio` -- EMA of (working_proxies / total_proxies) from worker
feedback. URLs that yield dead proxies get deprioritized.
- `avg_fetch_time` -- EMA of fetch duration in ms. Helps identify slow
sources.
- `last_worker` -- Which worker last fetched this URL. Useful for
  debugging and for distributing URLs evenly across workers.
- `yield_rate` -- EMA of proxies extracted per fetch.
### Proxy Aggregation
Trust model: **workers are trusted.** If any worker reports a proxy as
working, master accepts it. Failed proxies are never reported -- workers
discard them locally.
```
Worker A reports: 1.2.3.4:8080 working, latency 1.2s
Worker B reports: 1.2.3.4:8080 working, latency 1.5s
Master action:
- INSERT OR REPLACE with latest report
- Update last_seen, latency EMA
- Set failed = 0
```
No consensus, no voting, no trust scoring. A proxy lives as long as at
least one worker keeps confirming it. It dies when nobody reports it for
`proxy_ttl` seconds.
New `proxylist` column:
```sql
ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0;
```
- `last_seen` -- Unix timestamp of most recent "working" report. Proxies
not seen in N hours are expired by the master's periodic cleanup.
### Proxy Expiry
Working proxies that haven't been reported by any worker within
`proxy_ttl` (default: 4 hours) are marked stale and re-queued for
testing. After `proxy_ttl * 3` with no reports, they're marked failed.
```python
def expire_stale_proxies(db, proxy_ttl):
    now = int(time.time())
    cutoff_stale = now - proxy_ttl
    cutoff_dead = now - (proxy_ttl * 3)
# Mark stale proxies for retesting
db.execute('''
UPDATE proxylist SET failed = 1
WHERE failed = 0 AND last_seen < ? AND last_seen > 0
''', (cutoff_stale,))
# Kill proxies not seen in a long time
db.execute('''
UPDATE proxylist SET failed = -1
WHERE failed > 0 AND last_seen < ? AND last_seen > 0
''', (cutoff_dead,))
```
## Worker-Side Changes
### New Worker Loop
Replace the current claim-test-report loop with a two-phase loop:
```python
def worker_main_v2(config):
register()
verify_tor()
while True:
# Phase 1: Fetch URLs and extract proxies
urls = claim_urls(server, key, count=5)
url_reports = []
proxy_batch = []
for url_info in urls:
report, proxies = fetch_and_extract(url_info)
url_reports.append(report)
proxy_batch.extend(proxies)
report_urls(server, key, url_reports)
# Phase 2: Test extracted proxies, report working only
if proxy_batch:
working = test_proxies(proxy_batch)
if working:
report_proxies(server, key, working)
heartbeat(server, key)
sleep(1)
```
### fetch_and_extract()
New function that combines fetching + extraction on the worker side:
```python
import hashlib
import time

def fetch_and_extract(url_info):
url = url_info['url']
last_hash = url_info.get('last_hash')
proto_hint = url_info.get('proto_hint')
start = time.time()
try:
content = fetch_contents(url, head=False, proxy=tor_proxy)
except Exception as e:
return {'url': url, 'success': False, 'error': str(e)}, []
elapsed = int((time.time() - start) * 1000)
content_hash = hashlib.md5(content).hexdigest()
if content_hash == last_hash:
return {
'url': url, 'success': True, 'content_hash': content_hash,
'proxy_count': 0, 'fetch_time_ms': elapsed,
'changed': False, 'error': None
}, []
proxies = extract_proxies(content, url)
return {
'url': url, 'success': True, 'content_hash': content_hash,
'proxy_count': len(proxies), 'fetch_time_ms': elapsed,
'changed': True, 'error': None
}, proxies
```
### Deduplication
Workers may extract the same proxies from different URLs. Local
deduplication before testing:
```python
seen = set()
unique = []
for addr, proto, confidence in proxy_batch:
if addr not in seen:
seen.add(addr)
unique.append((addr, proto, confidence))
proxy_batch = unique
```
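A caveat with first-seen dedup: a low-confidence duplicate can shadow a higher-confidence entry for the same address. A variant that keeps the best-scoring entry instead (a sketch, not part of the proposal):

```python
def dedup_keep_best(proxy_batch):
    """Deduplicate by address, keeping the highest-confidence tuple."""
    best = {}
    for addr, proto, confidence in proxy_batch:
        if addr not in best or confidence > best[addr][2]:
            best[addr] = (addr, proto, confidence)
    return list(best.values())

batch = [('1.2.3.4:8080', 'http', 30), ('1.2.3.4:8080', 'socks5', 90),
         ('5.6.7.8:1080', 'socks5', 60)]
print(sorted(dedup_keep_best(batch)))
```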
### Proxy Testing
Reuse the existing `TargetTestJob` / `WorkerThread` pipeline. The only
change: proxies come from local extraction instead of master's claim
response. The test loop, result collection, and evaluation logic remain
identical.
## API Changes Summary
### New Endpoints
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/api/claim-urls` | GET | Worker claims batch of due URLs |
| `/api/report-urls` | POST | Worker reports URL fetch results |
| `/api/report-proxies` | POST | Worker reports proxy test results |
### Modified Endpoints
| Endpoint | Change |
|----------|--------|
| `/api/work` | Deprecated but kept for backward compatibility |
| `/api/results` | Deprecated but kept for backward compatibility |
### Unchanged Endpoints
| Endpoint | Reason |
|----------|--------|
| `/api/register` | Same registration flow |
| `/api/heartbeat` | Same health reporting |
| `/dashboard` | Still reads from same DB |
| `/proxies` | Still reads from proxylist |
## Schema Changes
### uris table additions
```sql
ALTER TABLE uris ADD COLUMN check_interval INT DEFAULT 3600;
ALTER TABLE uris ADD COLUMN working_ratio REAL DEFAULT 0.0;
ALTER TABLE uris ADD COLUMN avg_fetch_time INT DEFAULT 0;
ALTER TABLE uris ADD COLUMN last_worker TEXT;
ALTER TABLE uris ADD COLUMN yield_rate REAL DEFAULT 0.0;
```
### proxylist table additions
```sql
ALTER TABLE proxylist ADD COLUMN last_seen INT DEFAULT 0;
```
## Migration Strategy
### Phase 1: Add New Endpoints (non-breaking)
Add `/api/claim-urls`, `/api/report-urls`, `/api/report-proxies` to
`httpd.py`. Keep all existing endpoints working. Master still runs its
own `Leechered` threads.
Files: `httpd.py`, `dbs.py` (migrations)
### Phase 2: Worker V2 Mode
Add `--worker-v2` flag to `ppf.py`. When set, worker uses the new
URL-claiming loop instead of the proxy-claiming loop. Both modes coexist.
Old workers (`--worker`) continue working against `/api/work` and
`/api/results`. New workers (`--worker-v2`) use the new endpoints.
Files: `ppf.py`, `config.py`
### Phase 3: URL Scoring
Implement URL scoring in master based on worker feedback. Replace
`Leechered` timer-based scheduling with score-based scheduling. Master's
own fetching becomes a fallback for URLs no worker has claimed recently.
Files: `httpd.py`, `dbs.py`
### Phase 4: Remove Legacy
Once all workers run V2, remove `/api/work`, `/api/results`, and
master-side `Leechered` threads. Master no longer fetches proxy lists
directly.
Files: `ppf.py`, `httpd.py`
## Configuration
### New config.ini Options
```ini
[worker]
# V2 mode: worker fetches URLs instead of proxy batches
mode = v2 # v1 (legacy) or v2 (url-driven)
url_batch_size = 5 # URLs per claim cycle
max_proxies_per_cycle = 500 # Cap on proxies tested per cycle
fetch_timeout = 30 # Timeout for URL fetching (seconds)
[ppf]
# URL scoring weights
score_yield_weight = 1.0
score_quality_weight = 0.5
score_error_penalty = 0.3
score_stale_penalty = 0.1
# Proxy expiry
proxy_ttl = 14400 # Seconds before unseen proxy goes stale (4h)
proxy_ttl_dead = 43200 # Seconds before unseen proxy is killed (12h)
# Fallback: master fetches URLs not claimed by any worker
fallback_fetch = true
fallback_interval = 7200 # Seconds before master fetches unclaimed URL
```
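These options parse cleanly with a stock INI reader; a quick sketch (stdlib `configparser`, whereas the codebase uses its own `ComboParser`) confirming that the defaults keep `proxy_ttl_dead` at three times `proxy_ttl`, matching the expiry rule above:

```python
import configparser

sample = """\
[worker]
mode = v2
url_batch_size = 5
fetch_timeout = 30

[ppf]
proxy_ttl = 14400
proxy_ttl_dead = 43200
fallback_fetch = true
"""
cp = configparser.ConfigParser()
cp.read_string(sample)
proxy_ttl = cp.getint('ppf', 'proxy_ttl')
print(cp.get('worker', 'mode'), proxy_ttl, proxy_ttl * 3 == cp.getint('ppf', 'proxy_ttl_dead'))
```

Note the inline `#` comments shown in the config block above would need `inline_comment_prefixes='#'` when constructing the parser; the sample omits them.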
## Risks and Mitigations
| Risk | Impact | Mitigation |
|------|--------|------------|
| Workers extract different proxy counts from same URL | Inconsistent proxy_count in reports | Use content_hash for dedup; only update yield_rate when hash changes |
| Tor exit blocks a source for one worker | Worker reports error for a working URL | Require 2+ consecutive errors before incrementing URL error count |
| Workers test same proxies redundantly | Wasted CPU | Master tracks which URLs are assigned to which workers; avoid assigning same URL to multiple workers in same cycle |
| Large proxy lists overwhelm worker memory | OOM on worker | Cap `max_proxies_per_cycle`; worker discards excess after dedup |
| Master restart loses claim state | Workers refetch recently-fetched URLs | Harmless -- just a redundant fetch. content_hash prevents duplicate work |
| `fetch.py` imports unavailable on worker image | ImportError | Verify worker Dockerfile includes fetch.py and dependencies |
## What Stays the Same
- `rocksock.py` -- No changes to proxy chain logic
- `connection_pool.py` -- Tor host selection unchanged
- `proxywatchd.py` core -- `TargetTestJob`, `WorkerThread`, `ProxyTestState`
remain identical. Only the job source changes.
- `fetch.py` -- Used on workers now, but the code itself doesn't change
- `httpd.py` dashboard/proxies -- Still reads from same `proxylist` table
- SQLite as storage -- No database engine change
## Open Questions
1. **Should workers share extracted proxy lists with each other?** Peer
exchange would reduce redundant fetching but adds protocol complexity.
Recommendation: no, keep it simple. Master deduplicates via
`INSERT OR REPLACE`.
2. **Should URL claiming be weighted by worker geography?** Some sources
may be accessible from certain Tor exits but not others.
Recommendation: defer. Let natural retries handle this; track
per-worker URL success rates for future optimization.
3. **What's the right `proxy_ttl`?** Too short and we churn proxies
needlessly. Too long and we serve stale data. Start with 4 hours,
tune based on observed proxy lifetime distribution.


@@ -56,6 +56,8 @@ class FetchSession(object):
def fetch(self, url, head=False):
"""Fetch URL, reusing connection if possible."""
network_stats.set_category('scraper')
if isinstance(url, unicode):
url = url.encode('utf-8')
host, port, ssl, uri = _parse_url(url)
# Check if we can reuse existing connection
@@ -489,6 +491,8 @@ def fetch_contents(url, head=False, proxy=None):
retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded')
def _fetch_contents(url, head = False, proxy=None):
network_stats.set_category('scraper')
if isinstance(url, unicode):
url = url.encode('utf-8')
host, port, ssl, uri = _parse_url(url)
headers=[
'Accept-Language: en-US,en;q=0.8',

httpd.py

@@ -80,14 +80,30 @@ _master_key = None # master key for worker registration
_claim_timeout = 300 # seconds before unclaimed work is released
_workers_file = 'data/workers.json' # persistent storage
# URL claim tracking (parallel to proxy claims)
_url_claims = {} # url -> {worker_id, claimed_at}
_url_claims_lock = threading.Lock()
_url_claim_timeout = 600 # 10 min (URLs take longer to fetch+extract)
# URL scoring: pending proxy counts for working_ratio correlation
_url_pending_counts = {} # url -> {total, worker_id, time}
_url_pending_lock = threading.Lock()
_url_database_path = None # set in ProxyAPIServer.__init__ for cross-db access
# URL scoring defaults (overridden by configure_url_scoring)
_url_checktime = 3600
_url_perfail_checktime = 3600
_url_max_fail = 10
_url_list_max_age_days = 7
# Test rate tracking: worker_id -> list of (timestamp, count) tuples
_worker_test_history = {}
_worker_test_history_lock = threading.Lock()
_test_history_window = 120 # seconds to keep test history for rate calculation
# Fair distribution settings
-_min_batch_size = 100 # minimum proxies per batch
-_max_batch_size = 1000 # maximum proxies per batch
+_min_batch_size = 1 # minimum proxies per batch
+_max_batch_size = 10000 # maximum proxies per batch
_worker_timeout = 120 # seconds before worker considered inactive
# Session tracking
@@ -109,6 +125,15 @@ def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backof
_max_fail = max_fail
def configure_url_scoring(checktime, perfail_checktime, max_fail, list_max_age_days):
"""Set URL scoring parameters from config."""
global _url_checktime, _url_perfail_checktime, _url_max_fail, _url_list_max_age_days
_url_checktime = checktime
_url_perfail_checktime = perfail_checktime
_url_max_fail = max_fail
_url_list_max_age_days = list_max_age_days
def _build_due_condition():
"""Build SQL condition for proxy due check.
@@ -170,19 +195,22 @@ def get_due_proxy_count(db):
def calculate_fair_batch_size(db, worker_id):
-    """Calculate fair batch size based on active workers and queue size."""
+    """Calculate fair batch size based on active workers and queue size.
+    Divides due work evenly among active workers. No artificial floor —
+    if only 6 proxies are due with 3 workers, each gets 2.
+    """
active_workers = max(1, get_active_worker_count())
due_count = get_due_proxy_count(db)
if due_count == 0:
-        return _min_batch_size
+        return 0
-    # Fair share: divide due work among active workers
-    # Add 20% buffer for speed variations between workers
-    fair_share = int((due_count / active_workers) * 1.2)
+    # Fair share: divide due work evenly among active workers
+    fair_share = max(1, int(due_count / active_workers))
-    # Clamp to bounds
-    batch_size = max(_min_batch_size, min(fair_share, _max_batch_size))
+    # Clamp to upper bound only
+    batch_size = min(fair_share, _max_batch_size)
_log('fair_batch: due=%d workers=%d share=%d batch=%d' % (
due_count, active_workers, fair_share, batch_size), 'debug')
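The revised sizing rule in isolation (a hypothetical standalone function mirroring the logic above): small queues split evenly instead of being padded to a 100-proxy floor, and an empty queue claims nothing:

```python
def fair_batch(due_count, active_workers, max_batch=10000):
    if due_count == 0:
        return 0                                     # nothing due: workers idle
    share = max(1, due_count // max(active_workers, 1))
    return min(share, max_batch)                     # clamp to upper bound only

print(fair_batch(6, 3))      # 2
print(fair_batch(0, 3))      # 0
print(fair_batch(50000, 3))  # 10000
```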
@@ -381,7 +409,7 @@ def claim_work(db, worker_id, count=100):
priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
query = '''
-    SELECT ip, port, proto, failed,
+    SELECT ip, port, proto, failed, source_proto,
CASE
WHEN tested IS NULL THEN 0
WHEN (%s) > 3600 THEN 1
@@ -413,6 +441,7 @@ def claim_work(db, worker_id, count=100):
'port': row[1],
'proto': row[2],
'failed': row[3],
'source_proto': row[4],
})
if claimed:
@@ -421,6 +450,343 @@ def claim_work(db, worker_id, count=100):
return claimed
def claim_urls(url_db, worker_id, count=5):
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
Uses score-based scheduling: high-yield URLs checked more often,
stale/broken ones less. Score components:
- age/interval: 1.0 when due, >1.0 when overdue
- yield_bonus: capped at 1.0 for high-yield sources
- quality_bonus: 0-0.5 based on working_ratio
- error_penalty: 0-2.0 based on consecutive errors
- stale_penalty: 0-1.0 based on unchanged fetches
"""
now = time.time()
now_int = int(now)
# Import here to avoid circular dependency at module level
try:
from fetch import detect_proto_from_path
except ImportError:
detect_proto_from_path = None
# Clean expired URL claims and pending counts
with _url_claims_lock:
stale = [k for k, v in _url_claims.items() if now - v['claimed_at'] > _url_claim_timeout]
for k in stale:
del _url_claims[k]
claimed_urls = set(_url_claims.keys())
with _url_pending_lock:
stale_pending = [k for k, v in _url_pending_counts.items() if now - v['time'] > 600]
for k in stale_pending:
del _url_pending_counts[k]
list_max_age_seconds = _url_list_max_age_days * 86400
min_added = now_int - list_max_age_seconds
try:
rows = url_db.execute(
'''SELECT url, content_hash,
(? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
+ MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
+ COALESCE(working_ratio, 0) * 0.5
- MIN(error * 0.3, 2.0)
- MIN(stale_count * 0.1, 1.0)
AS score
FROM uris
WHERE error < ?
AND (? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1) >= 0.8
AND (added > ? OR proxies_added > 0)
ORDER BY score DESC
LIMIT ?''',
(now_int, _url_max_fail, now_int, min_added, count * 3)
).fetchall()
except Exception as e:
_log('claim_urls query error: %s' % e, 'error')
return []
# Filter out already-claimed URLs and lock new claims
claimed = []
with _url_claims_lock:
for row in rows:
url = row[0]
if url in _url_claims or url in claimed_urls:
continue
_url_claims[url] = {'worker_id': worker_id, 'claimed_at': now}
proto_hint = None
if detect_proto_from_path:
proto_hint = detect_proto_from_path(url)
claimed.append({
'url': url,
'last_hash': row[1],
'proto_hint': proto_hint,
})
if len(claimed) >= count:
break
if claimed:
_log('claim_urls: %d URLs to %s' % (len(claimed), worker_id[:8]), 'info')
return claimed
def submit_url_reports(url_db, worker_id, reports):
"""Process URL fetch feedback from workers. Returns count of processed reports.
Updates EMA metrics per URL:
- avg_fetch_time: exponential moving average of fetch latency
- check_interval: adaptive interval (shrinks for productive URLs, grows for stale)
- yield_rate: EMA of proxy count per fetch (on changed content)
- last_worker: worker that last fetched this URL
Stores pending proxy count for working_ratio correlation in submit_proxy_reports.
"""
processed = 0
now_int = int(time.time())
alpha = 0.3 # EMA smoothing factor
for r in reports:
url = r.get('url', '')
if not url:
continue
# Release URL claim
with _url_claims_lock:
if url in _url_claims:
del _url_claims[url]
try:
success = r.get('success', False)
content_hash = r.get('content_hash')
proxy_count = r.get('proxy_count', 0)
changed = r.get('changed', False)
fetch_time_ms = r.get('fetch_time_ms', 0)
# Fetch current row for EMA computation
row = url_db.execute(
'''SELECT check_interval, avg_fetch_time, yield_rate
FROM uris WHERE url = ?''', (url,)
).fetchone()
if not row:
processed += 1
continue
old_interval = row[0] if row[0] is not None else 3600
old_fetch_time = row[1] if row[1] is not None else 0
old_yield = row[2] if row[2] is not None else 0.0
# EMA: avg_fetch_time
if old_fetch_time > 0 and fetch_time_ms > 0:
new_fetch_time = int(alpha * fetch_time_ms + (1 - alpha) * old_fetch_time)
elif fetch_time_ms > 0:
new_fetch_time = fetch_time_ms
else:
new_fetch_time = old_fetch_time
if success:
if changed and proxy_count > 0:
# Success + changed + proxies: converge interval toward 15min
new_interval = max(900, int(old_interval * 0.9))
# EMA: yield_rate
new_yield = alpha * proxy_count + (1 - alpha) * old_yield
url_db.execute(
'''UPDATE uris SET
check_time = ?,
retrievals = retrievals + 1,
error = 0,
stale_count = 0,
content_hash = ?,
proxies_added = proxies_added + ?,
check_interval = ?,
avg_fetch_time = ?,
yield_rate = ?,
last_worker = ?
WHERE url = ?''',
(now_int, content_hash, proxy_count, new_interval,
new_fetch_time, new_yield, worker_id, url)
)
# Store pending count for working_ratio correlation
with _url_pending_lock:
_url_pending_counts[url] = {
'total': proxy_count,
'worker_id': worker_id,
'time': time.time(),
}
else:
# Success + unchanged (or no proxies): drift interval toward 24h
new_interval = min(86400, int(old_interval * 1.25))
url_db.execute(
'''UPDATE uris SET
check_time = ?,
retrievals = retrievals + 1,
error = 0,
stale_count = stale_count + 1,
content_hash = ?,
check_interval = ?,
avg_fetch_time = ?,
last_worker = ?
WHERE url = ?''',
(now_int, content_hash, new_interval,
new_fetch_time, worker_id, url)
)
else:
# Failure: back off faster
new_interval = min(86400, int(old_interval * 1.5))
url_db.execute(
'''UPDATE uris SET
check_time = ?,
error = error + 1,
check_interval = ?,
avg_fetch_time = ?,
last_worker = ?
WHERE url = ?''',
(now_int, new_interval, new_fetch_time, worker_id, url)
)
processed += 1
except Exception as e:
_log('submit_url_reports error for %s: %s' % (url, e), 'error')
url_db.commit()
return processed
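The adaptive scheduling in `submit_url_reports` can be condensed into a small pure function. This is an illustrative sketch only (the standalone function name is mine; the 900s floor, 86400s ceiling, and the 0.9/1.25/1.5 factors are taken from the code above):

```python
def next_check_interval(old_interval, success, changed, proxy_count):
    """Sketch of the interval policy used in submit_url_reports."""
    if success and changed and proxy_count > 0:
        # productive source: converge toward the 15-minute floor
        return max(900, int(old_interval * 0.9))
    if success:
        # unchanged content (or no proxies): drift toward the 24-hour ceiling
        return min(86400, int(old_interval * 1.25))
    # fetch failure: back off faster
    return min(86400, int(old_interval * 1.5))
```

A source that keeps yielding new proxies ratchets down to a 15-minute cadence, while dead or static sources decay toward a daily check.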
def _update_url_working_ratios(url_working_counts):
"""Correlate working proxy counts with pending totals to update working_ratio.
Called after submit_proxy_reports processes all proxies. For each source_url
with a pending entry from submit_url_reports, computes:
ratio = working_count / pending_total
working_ratio = alpha * ratio + (1 - alpha) * old_working_ratio
"""
if not url_working_counts or not _url_database_path:
return
alpha = 0.3
settled = []
with _url_pending_lock:
pending_snapshot = dict(_url_pending_counts)
try:
url_db = mysqlite.mysqlite(_url_database_path, str)
for url, working_count in url_working_counts.items():
pending = pending_snapshot.get(url)
if not pending or pending['total'] <= 0:
continue
ratio = min(float(working_count) / pending['total'], 1.0)
row = url_db.execute(
'SELECT working_ratio FROM uris WHERE url = ?', (url,)
).fetchone()
old_ratio = row[0] if row and row[0] is not None else 0.0
new_ratio = alpha * ratio + (1 - alpha) * old_ratio
url_db.execute(
'UPDATE uris SET working_ratio = ? WHERE url = ?',
(new_ratio, url)
)
settled.append(url)
url_db.commit()
url_db.close()
except Exception as e:
_log('_update_url_working_ratios error: %s' % e, 'error')
# Remove settled entries from pending
if settled:
with _url_pending_lock:
for url in settled:
_url_pending_counts.pop(url, None)
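As a concrete illustration of the smoothing above (all values hypothetical): a source whose stored `working_ratio` is 0.5, and whose latest batch produced 8 working proxies out of 40 pending, settles like this:

```python
alpha = 0.3
old_ratio = 0.5          # stored working_ratio (hypothetical)
working, total = 8, 40   # latest batch counts (hypothetical)

ratio = min(float(working) / total, 1.0)            # 0.2 for this batch
new_ratio = alpha * ratio + (1 - alpha) * old_ratio  # 0.3*0.2 + 0.7*0.5
```

One bad batch only pulls the ratio part of the way down, so a single noisy test cycle cannot tank a historically good source.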
def submit_proxy_reports(db, worker_id, proxies):
"""Process working-proxy reports from workers. Returns count of processed proxies.
Simplified trust-based model: workers report only working proxies.
Each proxy is upserted with failed=0, last_seen=now, latency updated.
Also tracks per-URL working counts for working_ratio correlation.
"""
global _last_workers_save
processed = 0
now_int = int(time.time())
now = time.time()
url_working_counts = {} # source_url -> working count
for p in proxies:
ip = p.get('ip', '')
port = p.get('port', 0)
if not ip or not port:
continue
proxy_key = '%s:%s' % (ip, port)
proto = p.get('proto', 'http')
latency = p.get('latency', 0)
source_url = p.get('source_url')
try:
# Upsert: insert new proxy or update existing as working
db.execute('''
INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, added,
avg_latency, last_seen)
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
ON CONFLICT(proxy) DO UPDATE SET
failed = 0,
tested = excluded.tested,
proto = excluded.proto,
avg_latency = excluded.avg_latency,
last_seen = excluded.last_seen
''', (proxy_key, ip, port, proto, now_int, now_int, latency, now_int))
# Geolocate if IP2Location available
if _geolite and _geodb:
try:
rec = _geodb.get_all(ip)
if rec and rec.country_short and rec.country_short != '-':
db.execute(
'UPDATE proxylist SET country=? WHERE proxy=?',
(rec.country_short, proxy_key))
except Exception:
pass
# Track per-URL working count for working_ratio
if source_url:
url_working_counts[source_url] = url_working_counts.get(source_url, 0) + 1
processed += 1
except Exception as e:
_log('submit_proxy_reports error for %s: %s' % (proxy_key, e), 'error')
# Commit database changes
db.commit()
# Update working_ratio for source URLs
if url_working_counts:
_update_url_working_ratios(url_working_counts)
# Update worker stats
with _workers_lock:
if worker_id in _workers:
w = _workers[worker_id]
w['proxies_working'] = w.get('proxies_working', 0) + processed
w['last_seen'] = now
# Save workers periodically
if now - _last_workers_save > 60:
save_workers()
_last_workers_save = now
return processed
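The `INSERT ... ON CONFLICT(proxy) DO UPDATE` upsert used above is standard SQLite syntax (available since SQLite 3.24); it needs a unique constraint on `proxy`, which the real schema is assumed to declare. A minimal standalone demonstration with the stdlib `sqlite3` module and a trimmed-down schema:

```python
import sqlite3

db = sqlite3.connect(':memory:')
db.execute('''CREATE TABLE proxylist (
    proxy TEXT PRIMARY KEY, failed INTEGER, last_seen INTEGER)''')

def report_working(proxy, now):
    # Insert a new proxy, or reset an existing row to "working";
    # excluded.* refers to the values the INSERT tried to write.
    db.execute('''INSERT INTO proxylist (proxy, failed, last_seen)
                  VALUES (?, 0, ?)
                  ON CONFLICT(proxy) DO UPDATE SET
                      failed = 0,
                      last_seen = excluded.last_seen''', (proxy, now))

report_working('1.2.3.4:8080', 100)
db.execute('UPDATE proxylist SET failed = 3 WHERE proxy = ?', ('1.2.3.4:8080',))
report_working('1.2.3.4:8080', 200)  # same key: updates in place, no duplicate row
```

This is what makes the trust-based model idempotent: a worker re-reporting a known proxy simply refreshes `last_seen` and clears `failed`.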
_last_workers_save = 0
def submit_results(db, worker_id, results):
@@ -914,6 +1280,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
'/api/stats/export': self.handle_stats_export,
'/api/countries': self.handle_countries,
'/proxies': self.handle_proxies,
'/proxies/all': self.handle_proxies_all,
'/proxies/count': self.handle_count,
'/health': self.handle_health,
}
@@ -930,6 +1297,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
'/api/stats': 'runtime statistics (JSON)',
'/api/stats/export': 'export stats (params: format=json|csv)',
'/proxies': 'list working proxies (params: limit, proto, country, asn)',
'/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)',
'/proxies/count': 'count working proxies',
'/health': 'health check',
}
@@ -967,7 +1335,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
stats = {}
# Total counts
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
stats['working'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
stats['total'] = row[0] if row else 0
@@ -1076,7 +1444,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
asn = params.get('asn', '')
fmt = params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency FROM proxylist WHERE failed=0'
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
@@ -1101,7 +1469,52 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
else:
proxies = [{
'ip': r[0], 'port': r[1], 'proto': r[2],
'country': r[3], 'asn': r[4], 'latency': r[5]
'country': r[3], 'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
self.send_json({'count': len(proxies), 'proxies': proxies})
except Exception as e:
self.send_json({'error': str(e)}, 500)
def handle_proxies_all(self):
params = {}
if '?' in self.path:
for pair in self.path.split('?')[1].split('&'):
if '=' in pair:
k, v = pair.split('=', 1)
params[k] = v
proto = params.get('proto', '')
country = params.get('country', '')
asn = params.get('asn', '')
fmt = params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
sql += ' ORDER BY avg_latency ASC, tested DESC'
try:
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
self.send_text('\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows))
else:
proxies = [{
'ip': r[0], 'port': r[1], 'proto': r[2],
'country': r[3], 'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
self.send_json({'count': len(proxies), 'proxies': proxies})
except Exception as e:
@@ -1110,7 +1523,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def handle_count(self):
try:
db = mysqlite.mysqlite(self.database, str)
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
self.send_json({'count': row[0] if row else 0})
except Exception as e:
self.send_json({'error': str(e)}, 500)
@@ -1126,14 +1539,17 @@ class ProxyAPIServer(threading.Thread):
otherwise falls back to standard BaseHTTPServer.
"""
def __init__(self, host, port, database, stats_provider=None, profiling=False):
def __init__(self, host, port, database, stats_provider=None, profiling=False, url_database=None):
threading.Thread.__init__(self)
self.host = host
self.port = port
self.database = database
self.url_database = url_database
self.stats_provider = stats_provider
self.profiling = profiling
self.daemon = True
global _url_database_path
_url_database_path = url_database
self.server = None
self._stop_event = threading.Event() if not GEVENT_PATCHED else None
# Load static library files into cache
@@ -1171,7 +1587,8 @@ class ProxyAPIServer(threading.Thread):
return [b'Method not allowed']
# POST only allowed for worker API endpoints
post_endpoints = ('/api/register', '/api/results', '/api/heartbeat')
post_endpoints = ('/api/register', '/api/results', '/api/heartbeat',
'/api/report-urls', '/api/report-proxies')
if method == 'POST' and path not in post_endpoints:
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
return [b'POST not allowed for this endpoint']
@@ -1242,7 +1659,11 @@ class ProxyAPIServer(threading.Thread):
'/api/results': 'submit test results (POST, params: key)',
'/api/register': 'register as worker (POST)',
'/api/workers': 'list connected workers',
'/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)',
'/api/report-urls': 'report URL fetch results (POST, params: key)',
'/api/report-proxies': 'report working proxies (POST, params: key)',
'/proxies': 'list working proxies (params: limit, proto, country, asn)',
'/proxies/all': 'list ALL working proxies (params: proto, country, asn, format)',
'/proxies/count': 'count working proxies',
'/health': 'health check',
}
@@ -1392,18 +1813,76 @@ class ProxyAPIServer(threading.Thread):
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/proxies':
try:
limit = min(int(query_params.get('limit', 100)), 1000)
proto = query_params.get('proto', '')
country = query_params.get('country', '')
asn = query_params.get('asn', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
args.append(limit)
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(
'SELECT proxy, proto, country, asn FROM proxylist WHERE failed=0 LIMIT 100'
).fetchall()
proxies = [{'proxy': r[0], 'proto': r[1], 'country': r[2], 'asn': r[3]} for r in rows]
return json.dumps({'proxies': proxies}, indent=2), 'application/json', 200
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
return '\n'.join('%s:%s' % (r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/proxies/all':
try:
proto = query_params.get('proto', '')
country = query_params.get('country', '')
asn = query_params.get('asn', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL'
args = []
if proto:
sql += ' AND proto=?'
args.append(proto)
if country:
sql += ' AND country=?'
args.append(country.upper())
if asn:
sql += ' AND asn=?'
args.append(int(asn))
sql += ' ORDER BY avg_latency ASC, tested DESC'
db = mysqlite.mysqlite(self.database, str)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/proxies/count':
try:
db = mysqlite.mysqlite(self.database, str)
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
return json.dumps({'count': row[0] if row else 0}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
@@ -1514,6 +1993,79 @@ class ProxyAPIServer(threading.Thread):
_log('api/workers error: %s' % e, 'warn')
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/claim-urls':
# Worker claims a batch of URLs for fetching (GET)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not self.url_database:
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
count = min(int(query_params.get('count', 5)), 20)
try:
url_db = mysqlite.mysqlite(self.url_database, str)
urls = claim_urls(url_db, worker_id, count)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'count': len(urls),
'urls': urls,
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/report-urls':
# Worker reports URL fetch results (POST)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not self.url_database:
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
if not post_data:
return json.dumps({'error': 'POST body required'}), 'application/json', 400
reports = post_data.get('reports', [])
if not reports:
return json.dumps({'error': 'no reports provided'}), 'application/json', 400
try:
url_db = mysqlite.mysqlite(self.url_database, str)
processed = submit_url_reports(url_db, worker_id, reports)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'processed': processed,
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/report-proxies':
# Worker reports working proxies (POST)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not post_data:
return json.dumps({'error': 'POST body required'}), 'application/json', 400
proxies = post_data.get('proxies', [])
if not proxies:
return json.dumps({'error': 'no proxies provided'}), 'application/json', 400
try:
db = mysqlite.mysqlite(self.database, str)
processed = submit_proxy_reports(db, worker_id, proxies)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'processed': processed,
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
else:
return json.dumps({'error': 'not found'}), 'application/json', 404
@@ -1535,7 +2087,7 @@ class ProxyAPIServer(threading.Thread):
stats['top_countries'] = [{'code': r[0], 'count': r[1]} for r in rows]
# Total counts
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0').fetchone()
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
stats['working'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM proxylist').fetchone()
stats['total'] = row[0] if row else 0

505
ppf.py
View File

@@ -5,10 +5,10 @@ __version__ = '2.0.0'
import sys
import os
# Worker mode requires gevent - must monkey-patch before other imports
if '--worker' in sys.argv or '--register' in sys.argv:
from gevent import monkey
monkey.patch_all()
# Gevent monkey-patch MUST happen before any other imports
# Both master (httpd) and worker modes use gevent for async I/O
from gevent import monkey
monkey.patch_all()
import cProfile
import pstats
@@ -369,6 +369,71 @@ def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling
return False
def worker_claim_urls(server_url, worker_key, count=5):
"""Claim batch of URLs for V2 worker mode."""
url = '%s/api/claim-urls?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
try:
resp = urllib2.urlopen(url, timeout=30)
result = json.loads(resp.read())
return result.get('urls', [])
except urllib2.HTTPError as e:
if e.code == 403:
_log('worker key rejected (403), need to re-register', 'warn')
raise NeedReregister()
_log('failed to claim urls: %s' % e, 'error')
return []
except Exception as e:
_log('failed to claim urls: %s' % e, 'error')
return []
def worker_report_urls(server_url, worker_key, reports):
"""Report URL fetch results to master."""
url = '%s/api/report-urls?key=%s' % (server_url.rstrip('/'), worker_key)
data = json.dumps({'reports': reports})
req = urllib2.Request(url, data)
req.add_header('Content-Type', 'application/json')
try:
resp = urllib2.urlopen(req, timeout=30)
result = json.loads(resp.read())
return result.get('processed', 0)
except urllib2.HTTPError as e:
if e.code == 403:
_log('worker key rejected (403), need to re-register', 'warn')
raise NeedReregister()
_log('failed to report urls: %s' % e, 'error')
return 0
except Exception as e:
_log('failed to report urls: %s' % e, 'error')
return 0
def worker_report_proxies(server_url, worker_key, proxies):
"""Report working proxies to master."""
url = '%s/api/report-proxies?key=%s' % (server_url.rstrip('/'), worker_key)
data = json.dumps({'proxies': proxies})
req = urllib2.Request(url, data)
req.add_header('Content-Type', 'application/json')
try:
resp = urllib2.urlopen(req, timeout=30)
result = json.loads(resp.read())
return result.get('processed', 0)
except urllib2.HTTPError as e:
if e.code == 403:
_log('worker key rejected (403), need to re-register', 'warn')
raise NeedReregister()
_log('failed to report proxies: %s' % e, 'error')
return 0
except Exception as e:
_log('failed to report proxies: %s' % e, 'error')
return 0
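For reference, the JSON bodies these helpers POST are the same shapes that `submit_url_reports` and `submit_proxy_reports` unpack on the master. A sketch of both payloads, with every value hypothetical:

```python
import json

url_report = {'reports': [{
    'url': 'http://example.com/proxies.txt',   # hypothetical source URL
    'success': True,
    'content_hash': 'abc123',                  # hash of the extracted list
    'proxy_count': 42,
    'fetch_time_ms': 850,
    'changed': True,                           # hash differed from last fetch
    'error': None,
}]}

proxy_report = {'proxies': [{
    'ip': '1.2.3.4', 'port': 8080, 'proto': 'http',
    'latency': 0.412,                          # seconds
    'source_url': 'http://example.com/proxies.txt',
}]}

body = json.dumps(url_report)  # POSTed to /api/report-urls?key=...
```

Missing optional fields degrade gracefully on the master side (e.g. `source_url` defaults to no working-ratio attribution).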
def check_tor_connectivity(tor_hosts):
"""Test Tor connectivity. Returns (working_hosts, tor_ip)."""
import socket
@@ -428,6 +493,7 @@ def worker_main(config):
worker_name = config.args.worker_name or os.uname()[1]
batch_size = config.worker.batch_size
num_threads = config.watchd.threads
worker_id = None
# Register if --register flag or no key provided
if config.args.register or not worker_key:
@@ -598,6 +664,7 @@ def worker_main(config):
port = proxy_info['port']
proto = proxy_info.get('proto', 'http')
failed = proxy_info.get('failed', 0)
source_proto = proxy_info.get('source_proto')
proxy_str = '%s:%d' % (ip, port)
# Create state for this proxy
@@ -607,7 +674,7 @@ def worker_main(config):
country=None, mitm=0, consecutive_success=0,
asn=None, oldies=False,
completion_queue=completion_queue,
proxy_full=proxy_str
proxy_full=proxy_str, source_proto=source_proto
)
pending_states[proxy_str] = state
@@ -706,6 +773,412 @@ def worker_main(config):
_log(' proxies tested: %d' % proxies_tested, 'info')
def worker_v2_main(config):
"""V2 worker mode -- URL-driven discovery.
Claims URLs from the master, fetches them through Tor, extracts and tests
the proxies they contain, and reports working proxies back to the master.
"""
import json
global urllib2
try:
import Queue
except ImportError:
import queue as Queue
import proxywatchd
proxywatchd.set_config(config)
server_url = config.args.server
if not server_url:
_log('--server URL required for worker mode', 'error')
sys.exit(1)
worker_key = config.args.worker_key
worker_name = config.args.worker_name or os.uname()[1]
num_threads = config.watchd.threads
url_batch_size = config.worker.url_batch_size
worker_id = None
# Register if --register flag or no key provided
if config.args.register or not worker_key:
_log('registering with master: %s' % server_url, 'info')
worker_id, worker_key = worker_register(server_url, worker_name)
if not worker_key:
_log('registration failed, exiting', 'error')
sys.exit(1)
_log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
_log('worker key: %s' % worker_key, 'info')
_log('save this key with --worker-key for future runs', 'info')
if config.args.register:
return
_log('starting worker V2 mode (URL-driven)', 'info')
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' url batch: %d' % url_batch_size, 'info')
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
# Verify Tor connectivity before starting
import socks
working_tor_hosts = []
for tor_host in config.torhosts:
host, port = tor_host.split(':')
port = int(port)
try:
test_sock = socks.socksocket()
test_sock.set_proxy(socks.SOCKS5, host, port)
test_sock.settimeout(10)
test_sock.connect(('check.torproject.org', 80))
test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
resp = test_sock.recv(512)
test_sock.close()
if resp:  # any bytes back means the circuit through this Tor host works
status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
_log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
working_tor_hosts.append(tor_host)
else:
_log('tor host %s:%d no response' % (host, port), 'warn')
except Exception as e:
_log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
if not working_tor_hosts:
_log('no working Tor hosts, cannot start worker', 'error')
sys.exit(1)
_log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
# Create shared queues for worker threads
job_queue = proxywatchd.PriorityJobQueue()
completion_queue = Queue.Queue()
# Spawn worker threads
threads = []
for i in range(num_threads):
wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
wt.start_thread()
threads.append(wt)
time.sleep(random.random() / 10)
_log('spawned %d worker threads' % len(threads), 'info')
# Session for fetching URLs through Tor
session = fetch.FetchSession()
cycles = 0
urls_fetched = 0
proxies_found = 0
proxies_working = 0
start_time = time.time()
current_tor_ip = None
consecutive_tor_failures = 0
worker_profiling = config.args.profile or config.common.profiling
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
def do_register():
"""Register with master, with exponential backoff on failure."""
while True:
_log('registering with master: %s' % server_url, 'info')
new_id, new_key = worker_register(server_url, worker_name)
if new_key:
wstate['worker_id'] = new_id
wstate['worker_key'] = new_key
wstate['backoff'] = 10
_log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
return True
else:
_log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
time.sleep(wstate['backoff'])
wstate['backoff'] = min(wstate['backoff'] * 2, 300)
def wait_for_tor():
"""Wait for Tor to become available, checking every 30 seconds."""
check_interval = 30
while True:
working, tor_ip = check_tor_connectivity(config.torhosts)
if working:
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
return working, tor_ip
_log('tor still down, retrying in %ds' % check_interval, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
time.sleep(check_interval)
try:
while True:
# Tor connectivity check
working, tor_ip = check_tor_connectivity(config.torhosts)
if not working:
consecutive_tor_failures += 1
_log('tor down before claiming URLs (consecutive: %d)' % consecutive_tor_failures, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
if consecutive_tor_failures >= 2:
_log('tor appears down, waiting before claiming URLs', 'error')
working, current_tor_ip = wait_for_tor()
consecutive_tor_failures = 0
else:
time.sleep(10)
continue
else:
consecutive_tor_failures = 0
if tor_ip != current_tor_ip:
if current_tor_ip:
_log('tor circuit rotated: %s' % tor_ip, 'info')
current_tor_ip = tor_ip
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
# Claim URLs from master
try:
url_infos = worker_claim_urls(server_url, wstate['worker_key'], url_batch_size)
except NeedReregister:
do_register()
continue
if not url_infos:
_log('no URLs available, sleeping 30s', 'info')
time.sleep(30)
continue
_log('claimed %d URLs to process' % len(url_infos), 'info')
# Phase 1: Fetch URLs and extract proxies
url_reports = []
all_extracted = [] # list of (addr, proto, confidence, source_url)
for url_info in url_infos:
url = url_info.get('url', '')
last_hash = url_info.get('last_hash')
proto_hint = url_info.get('proto_hint')
fetch_start = time.time()
try:
content = session.fetch(url)
except Exception as e:
_log('%s: fetch error: %s' % (url.split('/')[2] if '/' in url else url, e), 'error')
content = None
fetch_time_ms = int((time.time() - fetch_start) * 1000)
urls_fetched += 1
if not content:
url_reports.append({
'url': url,
'success': False,
'content_hash': None,
'proxy_count': 0,
'fetch_time_ms': fetch_time_ms,
'changed': False,
'error': 'fetch failed',
})
continue
# Detect protocol from URL path
proto = fetch.detect_proto_from_path(url) or proto_hint
# Extract proxies (no filter_known -- workers have no proxydb)
extracted = fetch.extract_proxies(content, filter_known=False, proto=proto)
# Compute hash of extracted proxy list
content_hash = dbs.compute_proxy_list_hash(extracted)
if content_hash and last_hash and content_hash == last_hash:
# Content unchanged
url_reports.append({
'url': url,
'success': True,
'content_hash': content_hash,
'proxy_count': len(extracted),
'fetch_time_ms': fetch_time_ms,
'changed': False,
'error': None,
})
host = url.split('/')[2] if '/' in url else url
_log('%s: unchanged (%d proxies, hash match)' % (host, len(extracted)), 'stale')
continue
# Content changed or first fetch
for addr, pr, conf in extracted:
all_extracted.append((addr, pr, conf, url))
url_reports.append({
'url': url,
'success': True,
'content_hash': content_hash,
'proxy_count': len(extracted),
'fetch_time_ms': fetch_time_ms,
'changed': True,
'error': None,
})
host = url.split('/')[2] if '/' in url else url
_log('%s: %d proxies extracted' % (host, len(extracted)), 'info')
# Report URL health to master
if url_reports:
try:
worker_report_urls(server_url, wstate['worker_key'], url_reports)
except NeedReregister:
do_register()
try:
worker_report_urls(server_url, wstate['worker_key'], url_reports)
except NeedReregister:
_log('still rejected after re-register, discarding url reports', 'error')
# Deduplicate extracted proxies by address
seen = set()
unique_proxies = []
source_map = {} # addr -> first source_url
for addr, pr, conf, source_url in all_extracted:
if addr not in seen:
seen.add(addr)
unique_proxies.append((addr, pr, conf))
source_map[addr] = source_url
proxies_found += len(unique_proxies)
if not unique_proxies:
cycles += 1
time.sleep(1)
continue
_log('testing %d unique proxies' % len(unique_proxies), 'info')
# Phase 2: Test extracted proxies using worker thread pool
pending_states = {}
all_jobs = []
checktypes = config.watchd.checktypes
for addr, pr, conf in unique_proxies:
# Parse ip:port from addr (may contain auth: user:pass@ip:port)
addr_part = addr.split('@')[-1] if '@' in addr else addr
# Handle IPv6 [ipv6]:port
if addr_part.startswith('['):
bracket_end = addr_part.index(']')
ip = addr_part[1:bracket_end]
port = int(addr_part[bracket_end+2:])
else:
ip, port_str = addr_part.rsplit(':', 1)
port = int(port_str)
proto = pr or 'http'
proxy_str = '%s:%d' % (ip, port)
state = proxywatchd.ProxyTestState(
ip, port, proto, 0,
success_count=0, total_duration=0.0,
country=None, mitm=0, consecutive_success=0,
asn=None, oldies=False,
completion_queue=completion_queue,
proxy_full=addr, source_proto=pr
)
pending_states[proxy_str] = state
checktype = random.choice(checktypes)
if checktype == 'judges':
available = proxywatchd.judge_stats.get_available_judges(
list(proxywatchd.judges.keys()))
target = random.choice(available) if available else random.choice(
list(proxywatchd.judges.keys()))
elif checktype == 'ssl':
target = random.choice(proxywatchd.ssl_targets)
elif checktype == 'irc':
target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
else: # head
target = random.choice(list(proxywatchd.regexes.keys()))
job = proxywatchd.TargetTestJob(state, target, checktype)
all_jobs.append(job)
random.shuffle(all_jobs)
for job in all_jobs:
job_queue.put(job, priority=0)
# Wait for completion
completed = 0
timeout_start = time.time()
timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
working_results = []
while completed < len(all_jobs):
try:
state = completion_queue.get(timeout=1)
completed += 1
if state.failcount == 0:
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
proxy_addr = state.proxy
if state.auth:
proxy_addr = '%s@%s' % (state.auth, state.proxy)
working_results.append({
'ip': state.ip,
'port': state.port,
'proto': state.proto,
'source_proto': state.source_proto,
'latency': round(latency_sec, 3),
'exit_ip': state.exit_ip,
'source_url': source_map.get(proxy_addr) or source_map.get(state.proxy, ''),
})
if completed % 50 == 0 or completed == len(all_jobs):
_log('tested %d/%d proxies (%d working)' % (
completed, len(all_jobs), len(working_results)), 'info')
except Queue.Empty:
if time.time() - timeout_start > timeout_seconds:
_log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
break
continue
proxies_working += len(working_results)
# Report working proxies to master
if working_results:
try:
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
except NeedReregister:
do_register()
try:
processed = worker_report_proxies(server_url, wstate['worker_key'], working_results)
except NeedReregister:
_log('still rejected after re-register, discarding proxy reports', 'error')
processed = 0
_log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
cycles += 1
time.sleep(1)
except KeyboardInterrupt:
elapsed = time.time() - start_time
_log('worker V2 stopping...', 'info')
session.close()
for wt in threads:
wt.stop()
for wt in threads:
wt.term()
_log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
_log(' cycles: %d' % cycles, 'info')
_log(' urls fetched: %d' % urls_fetched, 'info')
_log(' proxies found: %d' % proxies_found, 'info')
_log(' proxies working: %d' % proxies_working, 'info')
def main():
"""Main entry point."""
global config
@@ -718,7 +1191,12 @@ def main():
else:
sys.exit(1)
# Worker mode: connect to master server instead of running locally
# V2 worker mode: URL-driven discovery
if config.args.worker_v2:
worker_v2_main(config)
return
# V1 worker mode: connect to master server instead of running locally
if config.args.worker or config.args.register:
worker_main(config)
return
@@ -747,8 +1225,14 @@ def main():
watcherd = None
# Start httpd independently when watchd is disabled
if config.httpd.enabled:
from httpd import ProxyAPIServer
from httpd import ProxyAPIServer, configure_url_scoring
import network_stats
configure_url_scoring(
config.ppf.checktime,
config.ppf.perfail_checktime,
config.ppf.max_fail,
config.ppf.list_max_age_days
)
def httpd_stats_provider():
"""Stats provider for httpd-only mode (scraping without testing)."""
@@ -780,6 +1264,7 @@ def main():
config.watchd.database,
stats_provider=httpd_stats_provider,
profiling=profiling,
url_database=config.ppf.database,
)
httpd_server.start()
@@ -838,9 +1323,6 @@ def main():
else:
_log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf')
_proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
if not _proxylist: _proxylist = None
for thread in threads:
if thread.status == 'ok':
url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve()
@@ -857,6 +1339,9 @@ def main():
threads = [ thread for thread in threads if thread.is_alive() ]
if len(threads) < config.ppf.threads and rows:
# Only query proxydb when actually starting a new thread (reduces GIL blocking)
_proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute("SELECT proto,proxy from proxylist where failed=0 AND tested IS NOT NULL AND proto IN ('http','socks4','socks5')").fetchall() ]
if not _proxylist: _proxylist = None
p = random.sample(_proxylist, min(5, len(_proxylist))) if _proxylist else None
row = random.choice(rows)
urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0]))

View File

@@ -142,6 +142,20 @@ def is_valid_ip(ip_str):
except (ValueError, AttributeError):
return False
def is_public_ip(ip_str):
"""Validate IP is a public, globally routable address."""
if not is_valid_ip(ip_str):
return False
parts = [int(p) for p in ip_str.split('.')]
if parts[0] == 0: return False # 0.0.0.0/8
if parts[0] == 10: return False # 10.0.0.0/8
if parts[0] == 127: return False # 127.0.0.0/8
if parts[0] == 169 and parts[1] == 254: return False # link-local
if parts[0] == 172 and 16 <= parts[1] <= 31: return False # 172.16/12
if parts[0] == 192 and parts[1] == 168: return False # 192.168/16
if parts[0] >= 224: return False # multicast + reserved
return True
# Pattern for header echo - if X-Forwarded-For or Via present, proxy reveals chain
HEADER_REVEAL_PATTERN = r'(X-Forwarded-For|Via|X-Real-Ip|Forwarded)'
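The manual octet checks in `is_public_ip()` can be cross-checked against the standard library: Python's `ipaddress` module classifies the same RFC ranges. A sketch of an equivalent check (not part of the patch; note that `is_global` alone does not exclude multicast on older Pythons, hence the explicit test):

```python
import ipaddress

def is_public_ip_stdlib(ip_str):
    """Rough stdlib equivalent of the manual is_public_ip() range checks."""
    try:
        addr = ipaddress.ip_address(ip_str)
    except ValueError:
        return False
    # is_global rejects private, loopback, link-local, reserved and CGNAT;
    # multicast (224.0.0.0/4) must be excluded explicitly on older versions
    return addr.is_global and not addr.is_multicast
```

Unlike the hand-rolled version, the stdlib check also rejects CGNAT (100.64.0.0/10) and the TEST-NET documentation ranges, so the two are not bit-for-bit identical on edge cases.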
@@ -285,12 +299,12 @@ class ProxyTestState(object):
'asn', 'isoldies', 'completion_queue', 'lock', 'results', 'completed',
'evaluated', 'last_latency_ms', 'exit_ip', 'reveals_headers',
'last_fail_category', 'original_failcount', 'had_ssl_test', 'ssl_success',
'cert_error'
'cert_error', 'source_proto', 'protos_working'
)
def __init__(self, ip, port, proto, failcount, success_count, total_duration,
country, mitm, consecutive_success, asn=None, oldies=False,
completion_queue=None, proxy_full=None):
completion_queue=None, proxy_full=None, source_proto=None):
self.ip = ip
self.port = int(port)
self.proxy = '%s:%s' % (ip, port)
@@ -326,6 +340,9 @@ class ProxyTestState(object):
self.had_ssl_test = False
self.ssl_success = False
self.cert_error = False
# Protocol fingerprinting
self.source_proto = source_proto
self.protos_working = None
def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None, exit_ip=None, reveals_headers=None):
"""Record a single target test result. Thread-safe.
@@ -390,10 +407,20 @@ class ProxyTestState(object):
self.evaluated = True
self.checktime = int(time.time())
successes = [r for r in self.results if r['success']]
failures = [r for r in self.results if not r['success']]
# Filter out judge_block results (inconclusive, neither pass nor fail)
real_results = [r for r in self.results if r.get('category') != 'judge_block']
successes = [r for r in real_results if r['success']]
failures = [r for r in real_results if not r['success']]
num_success = len(successes)
_dbg('evaluate: %d success, %d fail, results=%d' % (num_success, len(failures), len(self.results)), self.proxy)
judge_blocks = len(self.results) - len(real_results)
_dbg('evaluate: %d success, %d fail, %d judge_block, results=%d' % (
num_success, len(failures), judge_blocks, len(self.results)), self.proxy)
# All results were judge blocks: inconclusive, preserve current state
if not real_results and self.results:
_dbg('all results inconclusive (judge_block), no state change', self.proxy)
self.failcount = self.original_failcount
return (self.original_failcount == 0, None)
# Determine dominant failure category
fail_category = None
@@ -432,7 +459,22 @@ class ProxyTestState(object):
if config.watchd.debug:
_log('ASN lookup failed for %s: %s' % (self.ip, e), 'debug')
self.proto = last_good['proto']
# Collect all distinct working protocols
working_protos = set()
for s in successes:
if s.get('proto'):
working_protos.add(s['proto'])
if working_protos:
self.protos_working = ','.join(sorted(working_protos))
# Pick most specific protocol: socks5 > socks4 > http
for best in ('socks5', 'socks4', 'http'):
if best in working_protos:
self.proto = best
break
else:
self.proto = last_good['proto']
else:
self.proto = last_good['proto']
self.failcount = 0
# Only reset mitm after 3 consecutive clean successes (not on first success)
# and only if this test didn't detect MITM
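The evaluation change above does two things: it drops inconclusive `judge_block` results before counting, and among the protocols that actually succeeded it prefers the most capable one. Isolated as a sketch (`pick_proto` and the sample result dicts are illustrative names, not code from the patch):

```python
def pick_proto(results, last_good_proto):
    """Drop judge_block results, then choose socks5 > socks4 > http
    among the protocols that actually succeeded."""
    real = [r for r in results if r.get('category') != 'judge_block']
    working = {r['proto'] for r in real if r['success'] and r.get('proto')}
    for best in ('socks5', 'socks4', 'http'):
        if best in working:
            return best, ','.join(sorted(working))
    return last_good_proto, None

results = [
    {'success': True,  'proto': 'http'},
    {'success': True,  'proto': 'socks5'},
    {'success': False, 'proto': None, 'category': 'judge_block'},  # filtered out
]
proto, protos_working = pick_proto(results, 'http')
# proto == 'socks5', protos_working == 'http,socks5'
```

If every result was a judge block, `working` is empty and the previous protocol is kept unchanged, mirroring the "inconclusive, preserve current state" branch.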
@@ -529,6 +571,12 @@ class TargetTestJob(object):
recv = sock.recv(-1)
_sample_dbg('RECV: %d bytes from %s, first 80: %r' % (len(recv), srv, recv[:80]), self.proxy_state.proxy)
# Validate HTTP response for non-IRC checks
if self.checktype != 'irc' and not recv.startswith('HTTP/'):
_dbg('not an HTTP response, failing (first 40: %r)' % recv[:40], self.proxy_state.proxy)
self.proxy_state.record_result(False, category='bad_response')
return
# Select regex based on check type (or fallback target)
if 'check.torproject.org' in srv:
# Tor API fallback (judge using torproject.org)
@@ -553,7 +601,7 @@ class TargetTestJob(object):
reveals_headers = None
if self.checktype == 'judges' or 'check.torproject.org' in srv:
ip_match = re.search(IP_PATTERN, recv)
if ip_match and is_valid_ip(ip_match.group(0)):
if ip_match and is_public_ip(ip_match.group(0)):
exit_ip = ip_match.group(0)
if self.checktype == 'judges' and 'check.torproject.org' not in srv:
# Check for header echo judge (elite detection)
@@ -572,17 +620,14 @@ class TargetTestJob(object):
# Check if judge is blocking us (not a proxy failure)
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
judge_stats.record_block(srv)
# Judge block = proxy worked, we got HTTP response, just no IP
# Count as success without exit_ip
block_elapsed = time.time() - duration
_dbg('judge BLOCK detected, counting as success', self.proxy_state.proxy)
# Judge block = inconclusive, not a pass or fail
_dbg('judge BLOCK detected, skipping (neutral)', self.proxy_state.proxy)
self.proxy_state.record_result(
True, proto=proto, duration=block_elapsed,
srv=srv, tor=tor, ssl=is_ssl, exit_ip=None,
reveals_headers=None
False, category='judge_block', proto=proto,
srv=srv, tor=tor, ssl=is_ssl
)
if config.watchd.debug:
_log('judge %s challenged proxy %s (counted as success)' % (
_log('judge %s challenged proxy %s (neutral, skipped)' % (
srv, self.proxy_state.proxy), 'debug')
else:
_dbg('FAIL: no match, no block', self.proxy_state.proxy)
@@ -598,6 +643,45 @@ class TargetTestJob(object):
finally:
sock.disconnect()
def _build_proto_order(self):
"""Build smart protocol test order based on available intelligence.
Priority:
1. Previously successful proto (if set)
2. Source-detected proto (if different, confidence >= 60)
3. Remaining protos in default order: socks5, socks4, http
For failing proxies (failcount > 0 and proto known), only retest
with the known proto to save resources.
"""
ps = self.proxy_state
default_order = ['socks5', 'socks4', 'http']
# Known proto from previous test: only retest that
if ps.proto is not None:
# For failing proxies, skip multi-proto discovery
if ps.failcount > 0:
return [ps.proto]
# For working proxies, lead with known proto but try others
protos = [ps.proto]
# Add source hint if different
if ps.source_proto and ps.source_proto != ps.proto:
protos.append(ps.source_proto)
# Fill remaining
for p in default_order:
if p not in protos:
protos.append(p)
return protos
# Unknown proto: use source hint if available
protos = []
if ps.source_proto:
protos.append(ps.source_proto)
for p in default_order:
if p not in protos:
protos.append(p)
return protos
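The ordering rules of `_build_proto_order()` can be exercised without a `ProxyTestState`. A standalone mirror with a few expected orderings (`build_proto_order` is a hypothetical helper, written under the assumption that it reproduces the method above):

```python
def build_proto_order(proto, failcount, source_proto,
                      default_order=('socks5', 'socks4', 'http')):
    """Standalone mirror of _build_proto_order()."""
    if proto is not None:
        if failcount > 0:            # failing proxy: retest known proto only
            return [proto]
        order = [proto]              # working proxy: lead with known proto
        if source_proto and source_proto not in order:
            order.append(source_proto)
    else:
        order = [source_proto] if source_proto else []
    for p in default_order:          # fill remaining protocols
        if p not in order:
            order.append(p)
    return order

# build_proto_order('http', 2, None)      -> ['http']
# build_proto_order('http', 0, 'socks5')  -> ['http', 'socks5', 'socks4']
# build_proto_order(None, 0, 'socks4')    -> ['socks4', 'socks5', 'http']
```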
def _connect_and_test(self):
"""Connect to target through the proxy and send test packet.
@@ -615,18 +699,17 @@ class TargetTestJob(object):
_log('FIRST TEST: proxy=%s target=%s check=%s ssl_first=%s' % (
ps.proxy, self.target_srv, self.checktype, config.watchd.ssl_first), 'info')
protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto]
protos = self._build_proto_order()
pool = connection_pool.get_pool()
# Phase 1: SSL handshake (if ssl_first enabled)
if config.watchd.ssl_first:
# Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode)
if config.watchd.ssl_first or self.checktype == 'none':
result = self._try_ssl_handshake(protos, pool)
if result is not None:
return result # SSL succeeded or MITM detected
# SSL failed for all protocols
if config.watchd.ssl_only:
# ssl_only mode: skip secondary check, mark as failed
_dbg('SSL failed, ssl_only mode, skipping secondary check', ps.proxy)
if config.watchd.ssl_only or self.checktype == 'none':
_dbg('SSL failed, no secondary check', ps.proxy)
return (None, None, 0, None, None, 1, 0, 'ssl_only')
_dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)
@@ -882,7 +965,15 @@ class WorkerThread():
nao = time.time()
# Assign worker ID for connection pool affinity
job.worker_id = self.id
job.run()
try:
job.run()
except Exception as e:
# Ensure state completes on unexpected exceptions (prevents memory leak)
_log('job exception: %s' % e, 'error')
try:
job.proxy_state.record_result(False, category='exception')
except Exception:
pass # State may already be completed
spent = time.time() - nao
job_count += 1
duration_total += spent
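The try/except added around `job.run()` follows a general pattern: on any unexpected exception, force a failure result so the shared state still reaches the completion queue instead of leaking. A generic sketch of that pattern (`JobState` and `run_job` are illustrative stand-ins, not names from the patch):

```python
import queue

class JobState:
    """Minimal stand-in for ProxyTestState: signals completion exactly once."""
    def __init__(self, completion_queue):
        self.completion_queue = completion_queue
        self.completed = False

    def record_result(self, success, category=None):
        if self.completed:
            raise RuntimeError('already completed')
        self.completed = True
        self.completion_queue.put((success, category))

def run_job(state, job_fn):
    """Run job_fn; on any exception force a failure result so the state
    still completes (otherwise the collector would wait on it forever)."""
    try:
        job_fn(state)
    except Exception:
        try:
            state.record_result(False, category='exception')
        except Exception:
            pass  # state may already have completed before the crash

q = queue.Queue()
state = JobState(q)
run_job(state, lambda s: 1 / 0)   # job blows up mid-run
# q.get() -> (False, 'exception')
```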
@@ -1251,7 +1342,7 @@ class Proxywatchd():
# Build due condition using new schedule formula
due_sql, due_params = _build_due_sql()
q = '''SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,
consecutive_success,asn,proxy FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
consecutive_success,asn,proxy,source_proto FROM proxylist WHERE %s ORDER BY RANDOM()''' % due_sql
_dbg('fetch_rows: working=%d fail_interval=%d backoff=%s max_fail=%d' % (
config.watchd.working_checktime, config.watchd.fail_retry_interval,
config.watchd.fail_retry_backoff, config.watchd.max_fail))
@@ -1271,7 +1362,7 @@ class Proxywatchd():
now = time.time()
oldies_max = config.watchd.max_fail + round(config.watchd.max_fail / 2)
q_oldies = '''SELECT ip,port,proto,failed,success_count,total_duration,country,
mitm,consecutive_success,asn,proxy FROM proxylist
mitm,consecutive_success,asn,proxy,source_proto FROM proxylist
WHERE failed >= ? AND failed < ? AND (tested + ?) < ?
ORDER BY RANDOM()'''
rows = db.execute(q_oldies, (config.watchd.max_fail, oldies_max,
@@ -1292,7 +1383,11 @@ class Proxywatchd():
# Build target pools for each checktype
target_pools = {}
for ct in checktypes:
if ct == 'irc':
if ct == 'none':
# SSL-only mode: use ssl_targets as placeholder
target_pools[ct] = ssl_targets
_dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets))
elif ct == 'irc':
target_pools[ct] = config.servers
_dbg('target_pool[irc]: %d servers' % len(config.servers))
elif ct == 'judges':
@@ -1314,12 +1409,12 @@ class Proxywatchd():
for row in rows:
# create shared state for this proxy
# row: ip, port, proto, failed, success_count, total_duration,
# country, mitm, consecutive_success, asn, proxy
# country, mitm, consecutive_success, asn, proxy, source_proto
state = ProxyTestState(
row[0], row[1], row[2], row[3], row[4], row[5],
row[6], row[7], row[8], asn=row[9],
oldies=self.isoldies, completion_queue=self.completion_queue,
proxy_full=row[10]
proxy_full=row[10], source_proto=row[11]
)
new_states.append(state)
@@ -1424,7 +1519,7 @@ class Proxywatchd():
dead_count += 1
args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
job.success_count, job.total_duration, job.mitm,
job.consecutive_success, job.asn, job.proxy))
job.consecutive_success, job.asn, job.protos_working, job.proxy))
success_rate = (float(sc) / len(self.collected)) * 100
ret = True
@@ -1438,7 +1533,7 @@ class Proxywatchd():
if job.failcount == 0:
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
job.success_count, job.total_duration, job.mitm,
job.consecutive_success, job.asn, job.proxy))
job.consecutive_success, job.asn, job.protos_working, job.proxy))
if job.last_latency_ms is not None:
latency_updates.append((job.proxy, job.last_latency_ms))
ret = False
@@ -1455,7 +1550,7 @@ class Proxywatchd():
if job.failcount == 0 and job.exit_ip]
with self._db_context() as db:
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=? WHERE proxy=?'
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=? WHERE proxy=?'
db.executemany(query, args)
# Batch update latency metrics for successful proxies
@@ -1687,7 +1782,7 @@ class Proxywatchd():
# Start HTTP API server if enabled
if config.httpd.enabled:
from httpd import ProxyAPIServer, configure_schedule
from httpd import ProxyAPIServer, configure_schedule, configure_url_scoring
# Pass schedule config to httpd module
configure_schedule(
config.watchd.working_checktime,
@@ -1695,11 +1790,18 @@ class Proxywatchd():
config.watchd.fail_retry_backoff,
config.watchd.max_fail
)
configure_url_scoring(
config.ppf.checktime,
config.ppf.perfail_checktime,
config.ppf.max_fail,
config.ppf.list_max_age_days
)
self.httpd_server = ProxyAPIServer(
config.httpd.listenip,
config.httpd.port,
config.watchd.database,
stats_provider=self.get_runtime_stats
stats_provider=self.get_runtime_stats,
url_database=config.ppf.database,
)
self.httpd_server.start()
@@ -1734,27 +1836,32 @@ class Proxywatchd():
sleeptime -= 1
continue
# check if job queue is empty (work-stealing: threads pull as needed)
if self.job_queue.empty():
# Skip job processing when threads=0 (master-only mode)
if config.watchd.threads > 0:
# check if job queue is empty (work-stealing: threads pull as needed)
if self.job_queue.empty():
self.collect_work()
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
else:
job_count = self.prepare_jobs()
if job_count == 0:
# no jobs available, wait before checking again
sleeptime = 10
if not self.in_background: # single_thread scenario
self.threads[0].workloop()
self.collect_work()
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
else:
job_count = self.prepare_jobs()
if job_count == 0:
# no jobs available, wait before checking again
sleeptime = 10
if not self.in_background: # single_thread scenario
self.threads[0].workloop()
self.collect_work()
if len(self.collected) > self.submit_after:
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
if len(self.collected) > self.submit_after:
if not self.submit_collected() and self.tor_safeguard:
_log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd")
sleeptime = 60
else:
# Master-only mode: sleep to avoid busy loop
sleeptime = 10
# Update rate history for sparklines
self.stats.update_history()
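Stripped of the tor-safeguard and submit branches, the restructured loop reduces to one guard: with `threads == 0` the host is a master that only scrapes and serves results, so the whole job pipeline is skipped and the loop idles. A schematic of just that control flow, with stand-in names:

```python
def loop_step(threads, job_queue_empty, prepare_jobs):
    """One scheduler iteration reduced to the threads=0 guard.
    Returns the seconds to sleep before the next iteration."""
    if threads > 0:
        if job_queue_empty:
            if prepare_jobs() == 0:
                return 10   # no jobs due yet, back off briefly
        return 0            # workers are busy pulling jobs
    # Master-only mode: nothing to test locally, avoid a busy loop
    return 10
```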

View File

@@ -107,11 +107,9 @@ regexes = {
'www.twitter.com': 'x-connection-hash',
't.co': 'x-connection-hash',
'www.msn.com': 'x-aspnetmvc-version',
'www.bing.com': 'p3p',
'www.ask.com': 'x-served-by',
'www.hotmail.com': 'x-msedge-ref',
'www.bbc.co.uk': 'x-bbc-edge-cache-status',
'www.skype.com': 'X-XSS-Protection',
'www.alibaba.com': 'object-status',
'www.mozilla.org': 'cf-ray',
'www.cloudflare.com': 'cf-ray',
@@ -121,7 +119,6 @@ regexes = {
'www.netflix.com': 'X-Netflix.proxy.execution-time',
'www.amazon.de': 'x-amz-cf-id',
'www.reuters.com': 'x-amz-cf-id',
'www.ikea.com': 'x-frame-options',
'www.twitpic.com': 'timing-allow-origin',
'www.digg.com': 'cf-request-id',
'www.wikia.com': 'x-served-by',
@@ -133,8 +130,6 @@ regexes = {
'www.yelp.com': 'x-timer',
'www.ebay.com': 'x-envoy-upstream-service-time',
'www.wikihow.com': 'x-c',
'www.archive.org': 'referrer-policy',
'www.pandora.tv': 'X-UA-Compatible',
'www.w3.org': 'x-backend',
'www.time.com': 'x-amz-cf-pop'
}
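Each entry in the `regexes` map above pairs a target site with a response header the real site reliably sends; a response fetched through a proxy that lacks the expected header has likely been rewritten or substituted. A minimal check along those lines (a sketch — `looks_tampered` and the `headers` dict shape are simplifications, not the project's actual matching code):

```python
# Subset of the fingerprint map shown in the diff
fingerprints = {
    'www.cloudflare.com': 'cf-ray',
    'www.w3.org': 'x-backend',
}

def looks_tampered(host, headers):
    """True if the site's known fingerprint header is missing from the
    response, suggesting the proxy substituted or rewrote the page."""
    expected = fingerprints.get(host)
    if expected is None:
        return False          # no fingerprint known for this host
    return expected.lower() not in {h.lower() for h in headers}
```

The entries removed in this hunk (`p3p`, `X-XSS-Protection`, `x-frame-options`, `referrer-policy`) were generic security headers that many middleboxes also emit, making them weak fingerprints.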