Compare commits

...

31 Commits

Author SHA1 Message Date
Username
361b70ace9 dbs: expand seed sources to 111 URLs
All checks were successful
CI / validate (push) Successful in 20s
Add 21 new proxy source URLs: missing protocol variants from
existing repos, 4 new GitHub repos, openproxylist.xyz and
spys.me APIs, 5 web scraper targets, 2 Telegram channels.
2026-02-22 17:14:47 +01:00
Username
9c7b7ba070 add compose-based test runner for Python 2.7
All checks were successful
CI / validate (push) Successful in 20s
Dockerfile.test builds production image with pytest baked in.
compose.test.yml mounts source as volume for fast iteration.
Usage: podman-compose -f compose.test.yml run --rm test
2026-02-22 15:38:00 +01:00
Username
0669b38782 docs: update roadmap and tasklist with completed items 2026-02-22 15:37:54 +01:00
Username
6130b196b1 dbs: add SOCKS5-specific proxy sources
New sources: zevtyardt, UptimerBot, Anonym0usWork1221, ErcinDedeoglu,
proxy-list.download API, sockslist.us, mtpro.xyz, proxy-tools.com.
Addresses structural SOCKS5 coverage gap (78% HTTP in pool).
2026-02-22 15:37:50 +01:00
Username
ce2d28ab07 httpd: cache sqlite connections per-greenlet, lazy-load ASN, sharpen URL scoring
- threading.local() caches proxy_db and url_db per greenlet (eliminates
  ~2.7k redundant sqlite3.connect + PRAGMA calls per session on odin)
- ASN database now lazy-loaded on first lookup (defers ~3.6s startup cost)
- URL claim error penalty increased from 0.3*error(cap 2) to 0.5*error(cap 4)
  and stale penalty from 0.1*stale(cap 1) to 0.2*stale(cap 1.5) to reduce
  worker cycles wasted on erroring URLs (71% of 7,158 URLs erroring)
2026-02-22 15:37:43 +01:00
Username
93eb395727 docs: update roadmap, todo, and add tasklist
All checks were successful
CI / validate (push) Successful in 21s
Restructure roadmap into phases. Clean up todo as intake buffer.
Add execution tasklist with prioritized items.
2026-02-22 13:58:37 +01:00
Username
f9d237fe0d httpd: add protocol-aware source weighting
Boost SOCKS sources in claim_urls scoring when SOCKS proxies
are underrepresented (<40% of pool). Dynamic 0-1.0 boost based
on current protocol distribution.
2026-02-22 13:58:32 +01:00
Username
0f1fe981ef dbs: expand seed sources from 37 to 100+
Add GitHub raw lists, API endpoints, web scrapers, and Telegram
channels. Extra SOCKS5 sources to address protocol imbalance.
2026-02-22 13:58:26 +01:00
Username
0a53e4457f rocksock: skip shutdown on never-connected sockets
Track connection state with _connected flag. Only call
socket.shutdown() on successfully connected sockets.
Saves ~39s/session on workers (974k disconnect calls).
2026-02-22 13:58:20 +01:00
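
The rocksock.py diff itself is not among the files shown below; the change amounts to guarding `socket.shutdown()` behind a connection flag. A minimal sketch, assuming a wrapper class and a `_connected` attribute as named in the commit message (the real rocksock API may differ):

```python
import socket

class RocksockLite(object):
    """Illustrative connection wrapper: only shut down sockets that actually connected."""

    def __init__(self):
        self.sock = None
        self._connected = False  # set only after connect() succeeds

    def connect(self, host, port, timeout=7):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.settimeout(timeout)
        self.sock.connect((host, port))
        self._connected = True

    def disconnect(self):
        if self.sock is None:
            return
        try:
            # shutdown() on a never-connected socket just raises ENOTCONN and,
            # at ~974k disconnect calls per session, wastes measurable wall time
            if self._connected:
                self.sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass
        finally:
            self.sock.close()
            self.sock = None
            self._connected = False
```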
Username
2ea7eb41b7 tests: add extraction short-circuit and integration tests
All checks were successful
CI / validate (push) Successful in 19s
Cover short-circuit guards, table/JSON/hint extraction,
and full extract_proxies() integration (82 tests, all passing).
2026-02-22 13:50:34 +01:00
Username
98b232f3d3 fetch: add short-circuit guards to extraction functions
Skip expensive regex scans when content lacks required markers:
- extract_auth_proxies: skip if no '@' in content
- extract_proxies_from_table: skip if no '<table' tag
- extract_proxies_from_json: skip if no '{' or '['
- Hoist table regexes to module-level precompiled constants
2026-02-22 13:50:29 +01:00
Username
b300afed6c httpd: expose URL pipeline stats in /api/stats
All checks were successful
CI / validate (push) Successful in 19s
Add urls section with total/healthy/dead/erroring counts, fetch
activity, productive source count, aggregate yield, and top sources
ranked by working_ratio.
2026-02-22 11:53:57 +01:00
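
The `_get_url_stats()` helper referenced in the httpd.py diff below is not included in the shown hunks. A rough sketch of what such a stats section could look like, using the `uris` columns (`error`, `proxies_added`, `working_ratio`) that appear elsewhere in this compare; the returned field names are assumptions:

```python
def _get_url_stats(url_db, max_fail=5):
    """Illustrative shape of the /api/stats 'urls' section (field names assumed)."""
    total = url_db.execute('SELECT COUNT(*) FROM uris').fetchone()[0]
    erroring = url_db.execute(
        'SELECT COUNT(*) FROM uris WHERE error > 0 AND error < ?', (max_fail,)).fetchone()[0]
    dead = url_db.execute(
        'SELECT COUNT(*) FROM uris WHERE error >= ?', (max_fail,)).fetchone()[0]
    productive = url_db.execute(
        'SELECT COUNT(*) FROM uris WHERE proxies_added > 0').fetchone()[0]
    top = url_db.execute(
        'SELECT url, working_ratio FROM uris WHERE working_ratio > 0 '
        'ORDER BY working_ratio DESC LIMIT 5').fetchall()
    return {
        'total': total,
        'healthy': total - erroring - dead,
        'erroring': erroring,
        'dead': dead,
        'productive_sources': productive,
        'top_sources': [{'url': u, 'working_ratio': r} for u, r in top],
    }
```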
Username
eeadf656f5 httpd: add ASN enrichment for worker-reported proxies
All checks were successful
CI / validate (push) Successful in 20s
Load pyasn database in httpd and look up ASN when workers report
working proxies. Falls back to a pure-Python ipasn.dat reader when
the pyasn C extension is unavailable (Python 2.7 containers).
Backfills existing proxies with null ASN on startup.
2026-02-22 11:30:01 +01:00
Username
7ae0ac0c26 ppf: add periodic re-seeding of proxy source URLs
Seed sources that error out are permanently excluded from claiming.
Over time this starves the pipeline. Re-seed every 6 hours with
error reset for exhausted sources, preventing the starvation loop
that caused the previous outage.
2026-02-22 11:18:45 +01:00
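
The dbs.py diff below adds the `reset_errors` flag to `seed_proxy_sources()`; the 6-hour scheduling itself is not shown. A minimal sketch of such a loop (the surrounding function and parameter names are assumptions):

```python
import dbs  # project module providing seed_proxy_sources()

RESEED_INTERVAL = 6 * 3600  # re-seed every 6 hours, per the commit message

def reseed_loop(open_url_db, stop_event):
    """Periodically re-insert seed URLs and reset errored-out seed sources."""
    while not stop_event.wait(RESEED_INTERVAL):
        db = open_url_db()
        try:
            # reset_errors=True clears error/stale counters on exhausted
            # seed sources so they become claimable again (see dbs.py diff)
            dbs.seed_proxy_sources(db, reset_errors=True)
        finally:
            db.close()
```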
Username
35285a84bf watchd: update last_seen on successful proxy verification
All checks were successful
CI / validate (push) Successful in 20s
Serving endpoints filter by last_seen >= now - 3600, but watchd
never set last_seen -- only worker reports did. This caused the
API to return 0 proxies despite 70+ passing verification.
2026-02-22 10:04:28 +01:00
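
The watchd change is not part of the diffs shown below; the essence is refreshing `last_seen` in the verification success path. A sketch against the `proxylist` schema used elsewhere in this compare (function name illustrative):

```python
import time

def mark_verified(db, proxy_key):
    """Refresh last_seen so the serving filter (last_seen >= now - 3600)
    keeps returning proxies that watchd just verified."""
    db.execute('UPDATE proxylist SET last_seen=? WHERE proxy=?',
               (int(time.time()), proxy_key))
    db.commit()
```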
Username
438e956be9 httpd: log report-proxies 500 errors
All checks were successful
CI / validate (push) Successful in 19s
2026-02-20 09:30:53 +01:00
Username
5dd9060c2b ppf: add periodic heartbeat and proxy reporting during test phase
All checks were successful
CI / validate (push) Successful in 19s
2026-02-19 18:17:19 +01:00
Username
304cdb3b4c comboparse: replace SafeConfigParser with ConfigParser
All checks were successful
CI / validate (push) Successful in 19s
2026-02-18 21:01:10 +01:00
Username
9f926f4ab5 ci: consolidate jobs, expand import check, add yaml lint
Some checks failed
CI / validate (push) Failing after 19s
2026-02-18 20:59:49 +01:00
Username
7705ef54f6 ci: run syntax-check in container (fix missing git)
All checks were successful
CI / syntax-check (push) Successful in 17s
CI / memory-leak-check (push) Successful in 16s
2026-02-18 18:34:54 +01:00
Username
f5b9037763 docs: update roadmap with completed target health tracking
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 17s
2026-02-18 18:23:58 +01:00
Username
56accde90d httpd: add mitm field to JSON proxy list endpoints
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 16s
2026-02-18 18:21:58 +01:00
Username
e985f52fe6 watchd: add target health tracking for all target pools
Generalizes JudgeStats into TargetStats with cooldown-based filtering
for head targets, SSL targets, and IRC servers. Targets that repeatedly
block or fail are temporarily avoided, preventing unfair proxy failures
when a target goes down. Exposes per-pool health via /api/stats.
2026-02-18 18:21:53 +01:00
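
The proxywatchd.py diff is not shown in this compare. A minimal sketch of cooldown-based target filtering as described (class layout and thresholds are illustrative, not the actual TargetStats implementation):

```python
import time

class TargetStats(object):
    """Track per-target failures and put misbehaving targets on cooldown."""

    def __init__(self, fail_threshold=5, cooldown=600):
        self.fail_threshold = fail_threshold
        self.cooldown = cooldown
        self.failures = {}       # target -> consecutive failure count
        self.blocked_until = {}  # target -> unix time when usable again

    def record(self, target, success):
        if success:
            self.failures[target] = 0
            return
        self.failures[target] = self.failures.get(target, 0) + 1
        if self.failures[target] >= self.fail_threshold:
            # avoid the target for a while instead of failing proxies against it
            self.blocked_until[target] = time.time() + self.cooldown
            self.failures[target] = 0

    def usable(self, targets):
        now = time.time()
        return [t for t in targets if self.blocked_until.get(t, 0) <= now]
```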
Username
3e5c486e7e watchd: adaptive ssl for secondary checks
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 18s
Use SSL error reason from primary handshake to decide whether
the secondary check should use SSL or plain HTTP. Protocol errors
(proxy can't TLS) fall back to plain HTTP; other failures retry
with SSL sans cert verification.
2026-02-18 09:49:40 +01:00
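
A rough sketch of the decision described above, assuming the primary handshake error is available as an exception object; the reason-string matching below is an assumption, not the actual classification watchd performs:

```python
# Substring tags are assumptions; the real code inspects the SSL error
# reason reported by the primary handshake.
_PROTOCOL_ERRORS = ('WRONG_VERSION_NUMBER', 'UNKNOWN_PROTOCOL', 'HTTP_REQUEST')

def secondary_check_mode(primary_ssl_error):
    """Pick the secondary-check transport from the primary SSL failure.

    Returns (scheme, verify_cert): ('ssl', True) when the handshake worked,
    ('plain', False) when the proxy cannot speak TLS at all, and
    ('ssl', False) to retry without certificate verification otherwise."""
    if primary_ssl_error is None:
        return 'ssl', True
    reason = getattr(primary_ssl_error, 'reason', None) or str(primary_ssl_error)
    if any(tag in reason for tag in _PROTOCOL_ERRORS):
        return 'plain', False
    return 'ssl', False
```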
Username
727ed86692 compose: add k8s-file logging driver to master
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 17s
2026-02-18 08:35:04 +01:00
Username
821ade95ef worker: add local proxy test cache
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 16s
Skip redundant proxy tests across URL batches using a memory-only
TTL cache (default 300s, configurable via worker.cache_ttl).
2026-02-18 01:37:09 +01:00
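
The worker-side diff for this change is not shown here. A minimal sketch of a memory-only TTL cache keyed by proxy address, with the 300s default and `worker.cache_ttl` semantics from the commit message (class name illustrative):

```python
import time

class TestResultCache(object):
    """Memory-only cache so the same proxy is not re-tested across URL batches."""

    def __init__(self, ttl=300):  # worker.cache_ttl; 0 disables caching
        self.ttl = ttl
        self._store = {}  # addr -> (expiry_time, result)

    def get(self, addr):
        if self.ttl <= 0:
            return None
        entry = self._store.get(addr)
        if entry is None:
            return None
        expires, result = entry
        if expires < time.time():
            del self._store[addr]
            return None
        return result

    def put(self, addr, result):
        if self.ttl > 0:
            self._store[addr] = (time.time() + self.ttl, result)
```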
Username
01b91836c4 tools: fix ansible output filter in ppf-db
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 16s
2026-02-18 01:02:45 +01:00
Username
04fb362181 tools: add ppf-status cluster overview 2026-02-18 01:02:42 +01:00
Username
304830e382 watchd: delete proxies immediately on max_fail instead of marking stale
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 17s
2026-02-18 00:50:00 +01:00
Username
752ef359b5 docs: update odin role to SSL-only verification
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 17s
2026-02-18 00:46:51 +01:00
Username
af6e27bd77 config: allow ppf.threads = 0 to disable URL cycling
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 17s
2026-02-18 00:33:16 +01:00
21 changed files with 1429 additions and 385 deletions

View File

@@ -8,11 +8,14 @@ on:
workflow_dispatch:
jobs:
syntax-check:
validate:
runs-on: dotfiles
container:
image: python:3-slim
steps:
- name: Checkout
run: |
apt-get update -qq && apt-get install -y -qq git >/dev/null
git clone --depth 1 --branch "${GITHUB_REF_NAME}" \
"https://oauth2:${{ github.token }}@${GITHUB_SERVER_URL#https://}/${GITHUB_REPOSITORY}.git" .
@@ -30,70 +33,30 @@ jobs:
done
exit $failed
memory-leak-check:
runs-on: dotfiles
container:
image: python:3-slim
steps:
- name: Checkout
- name: Import validation
run: |
apt-get update && apt-get install -y git
git clone --depth 1 --branch "${GITHUB_REF_NAME}" \
"https://oauth2:${{ github.token }}@${GITHUB_SERVER_URL#https://}/${GITHUB_REPOSITORY}.git" .
- name: Check for memory leak patterns
run: |
echo "Scanning for common memory leak patterns..."
echo "Verifying module imports..."
failed=0
# Check for unbounded list/dict growth without limits
echo "Checking for unbounded collections..."
for f in ppf.py proxywatchd.py scraper.py httpd.py; do
if [ -f "$f" ]; then
# Look for .append() without corresponding size limits
if grep -n "\.append(" "$f" | grep -v "# bounded" | grep -v "_max\|max_\|limit\|[:]\|pop(" > /tmp/unbounded 2>/dev/null; then
count=$(wc -l < /tmp/unbounded)
if [ "$count" -gt 20 ]; then
echo "WARN $f: $count potential unbounded appends"
fi
fi
for mod in comboparse config dbs job misc mysqlite network_stats stats translations; do
if python3 -c "import sys; sys.path.insert(0,'.'); import $mod; print('OK $mod')"; then
:
else
echo "FAIL $mod"
failed=1
fi
done
exit $failed
# Check for circular references
echo "Checking for potential circular references..."
for f in ppf.py proxywatchd.py scraper.py httpd.py connection_pool.py; do
if [ -f "$f" ]; then
if grep -n "self\.\w* = self" "$f" 2>/dev/null; then
echo "WARN $f: potential self-reference"
fi
fi
done
# Check for __del__ methods (often problematic)
echo "Checking for __del__ methods..."
for f in *.py; do
if grep -n "def __del__" "$f" 2>/dev/null; then
echo "WARN $f: has __del__ method (may cause leaks)"
fi
done
# Check that gc is imported where needed
echo "Checking gc module usage..."
for f in proxywatchd.py httpd.py; do
if [ -f "$f" ]; then
if ! grep -q "^import gc" "$f" && ! grep -q "^from gc" "$f"; then
echo "INFO $f: gc module not imported"
fi
fi
done
echo "Memory leak pattern scan complete"
- name: Static import check
- name: YAML lint
run: |
echo "Verifying imports..."
python3 -c "import sys; sys.path.insert(0,'.'); import config; print('OK config')" || echo "FAIL config"
python3 -c "import sys; sys.path.insert(0,'.'); import misc; print('OK misc')" || echo "FAIL misc"
python3 -c "import sys; sys.path.insert(0,'.'); import mysqlite; print('OK mysqlite')" || echo "FAIL mysqlite"
echo "Checking YAML files for tabs..."
failed=0
for f in compose.master.yml compose.worker.yml .gitea/workflows/ci.yml; do
if grep -qP '\t' "$f"; then
echo "FAIL $f: contains tabs"
failed=1
else
echo "OK $f"
fi
done
exit $failed

View File

@@ -6,7 +6,7 @@
┌──────────┬─────────────┬────────────────────────────────────────────────────────┐
│ Host │ Role │ Notes
├──────────┼─────────────┼────────────────────────────────────────────────────────┤
│ odin │ Master │ API server, serves URLs to workers, port 8081
│ odin │ Master │ API server + SSL-only proxy verification, port 8081
│ cassius │ Worker │ Tests proxies, reports to master via WireGuard
│ edge │ Worker │ Tests proxies, reports to master via WireGuard
│ sentinel │ Worker │ Tests proxies, reports to master via WireGuard
@@ -15,7 +15,7 @@
### Role Separation
- **Odin (Master)**: API server only. No proxy testing, no URL cycling. Serves `/api/claim-urls` to workers, receives results. Local Tor only.
- **Odin (Master)**: API server + SSL-only proxy verification (10 threads). No URL cycling (workers handle it via `/api/claim-urls`). Local Tor only.
- **Workers**: All URL fetching (via `/api/claim-urls`) and proxy testing. Each uses only local Tor (127.0.0.1:9050).
## CRITICAL: Directory Structure Differences
@@ -42,6 +42,7 @@ tools/
ppf-logs view container logs
ppf-service manage containers (status/start/stop/restart)
ppf-db database operations (stats/purge-proxies/vacuum)
ppf-status cluster overview (containers, workers, queue)
playbooks/
deploy.yml ansible playbook (sync, compose, restart)
inventory.ini hosts with WireGuard IPs + SSH key
@@ -107,6 +108,13 @@ ppf-db purge-proxies # stop odin, delete all proxies, restart
ppf-db vacuum # reclaim disk space
```
### Cluster Status
```bash
ppf-status # full overview: containers, DB, workers, queue
ppf-status --json # raw JSON from odin API
```
### Direct Ansible (for operations not covered by tools)
Use the toolkit inventory for ad-hoc commands over WireGuard:
@@ -148,7 +156,11 @@ ansible -i $INV workers -m lineinfile \
tor_hosts = 127.0.0.1:9050 # Local Tor ONLY
[watchd]
threads = 0 # NO proxy testing
threads = 10 # SSL-only verification of worker-reported proxies
timeout = 7
checktype = none # No secondary check
ssl_first = 1
ssl_only = 1
database = data/proxies.sqlite
[ppf]

29
Dockerfile.test Normal file
View File

@@ -0,0 +1,29 @@
FROM python:2.7-slim
WORKDIR /app
RUN sed -i 's/deb.debian.org/archive.debian.org/g' /etc/apt/sources.list && \
sed -i 's/security.debian.org/archive.debian.org/g' /etc/apt/sources.list && \
sed -i '/buster-updates/d' /etc/apt/sources.list && \
echo 'deb http://archive.debian.org/debian-security buster/updates main' >> /etc/apt/sources.list && \
apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends gcc libc-dev && \
rm -rf /var/lib/apt/lists/*
RUN pip install --upgrade "pip<21" "setuptools<45" "wheel<0.38"
COPY requirements.txt .
RUN pip install -r requirements.txt || true
RUN pip install pytest
RUN mkdir -p /app/data && \
python -c "import pyasn" 2>/dev/null && \
pyasn_util_download.py --latest && \
pyasn_util_convert.py --single rib.*.bz2 /app/data/ipasn.dat && \
rm -f rib.*.bz2 || \
echo "pyasn database setup skipped"
RUN apt-get purge -y gcc libc-dev && apt-get autoremove -y || true
CMD ["python", "-m", "pytest", "tests/", "-v", "--tb=short"]

View File

@@ -227,6 +227,7 @@ ppf-deploy --check # dry run with diff
ppf-logs [node] # view container logs (-f to follow)
ppf-service <cmd> [nodes...] # status / start / stop / restart
ppf-db <cmd> # stats / purge-proxies / vacuum
ppf-status # cluster overview (containers, workers, queue)
```
See `--help` on each tool.

View File

@@ -1,65 +1,100 @@
# PPF Project Roadmap
# PPF Roadmap
## Project Purpose
PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework designed to:
1. **Discover** proxy addresses by crawling websites and search engines
2. **Validate** proxies through multi-target testing via Tor
3. **Maintain** a database of working proxies with protocol detection (SOCKS4/SOCKS5/HTTP)
## Architecture Overview
## Architecture
```
                              PPF Architecture

  ┌─────────────┐      ┌───────────────┐      ┌─────────────┐
  │ scraper.py  │      │ ppf.py        │      │ proxywatchd │
  │ Searx query │─────>│ URL harvest   │─────>│ Proxy test  │
  │ URL finding │      │ Proxy extract │      │ Validation  │
  └──────┬──────┘      └───────┬───────┘      └──────┬──────┘
         │                     │                     │
         v                     v                     v
  SQLite databases:  uris.db (URLs)   proxies.db (proxy list)
  Network layer:     rocksock.py ── Tor SOCKS ── Test Proxy ── Target Server

         ┌──────────────────────────────────────────┐
         │ Odin (Master)                            │
         │ httpd.py ─ API + SSL-only verification   │
         │ proxywatchd.py ─ proxy recheck daemon    │
         │ SQLite: proxies.db, websites.db          │
         └──────────┬───────────────────────────────┘
                    │ WireGuard (10.200.1.0/24)
      ┌─────────────┼─────────────┐
      v             v             v
┌───────────┐ ┌───────────┐ ┌───────────┐
│  cassius  │ │   edge    │ │ sentinel  │
│  Worker   │ │  Worker   │ │  Worker   │
│  ppf.py   │ │  ppf.py   │ │  ppf.py   │
└───────────┘ └───────────┘ └───────────┘
```
Workers claim URLs, extract proxies, test them, report back.
Master verifies (SSL-only), serves API, coordinates distribution.
## Constraints
- **Python 2.7** compatibility required
- **Minimal external dependencies** (avoid adding new modules)
- Current dependencies: beautifulsoup4, pyasn, IP2Location
- Data files: IP2LOCATION-LITE-DB1.BIN (country), ipasn.dat (ASN)
- Python 2.7 runtime (container-based)
- Minimal external dependencies
- All traffic via Tor
---
## Open Work
## Phase 1: Performance and Quality (current)
### Validation
Profiling-driven optimizations and source pipeline hardening.
| Task | Description | File(s) |
|------|-------------|---------|
| Protocol fingerprinting | Better SOCKS4/SOCKS5/HTTP detection | rocksock.py |
| Item | Status | Description |
|------|--------|-------------|
| Extraction short-circuits | done | Guard clauses in fetch.py extractors |
| Skip shutdown on failed sockets | done | Track _connected flag, skip shutdown on dead sockets |
| SQLite connection reuse (odin) | done | Per-greenlet cached handles via threading.local |
| Lazy-load ASN database | done | Defer ipasn.dat parsing to first lookup |
| Add more seed sources (100+) | done | Expanded to 120+ URLs with SOCKS5-specific sources |
| Protocol-aware source weighting | done | Dynamic SOCKS boost in claim_urls scoring |
| Sharpen error penalty in URL scoring | done | Reduce erroring URL claim frequency |
### Target Management
## Phase 2: Proxy Diversity and Consumer API
| Task | Description | File(s) |
|------|-------------|---------|
| Dynamic target pool | Auto-discover and rotate validation targets | proxywatchd.py |
| Target health tracking | Remove unresponsive targets from pool | proxywatchd.py |
| Geographic target spread | Ensure targets span multiple regions | config.py |
Address customer-reported quality gaps.
| Item | Status | Description |
|------|--------|-------------|
| ASN diversity scoring | pending | Deprioritize over-represented ASNs in testing |
| Graduated recheck intervals | pending | Fresh proxies rechecked more often than stale |
| API filters (proto/country/ASN/latency) | pending | Consumer-facing query parameters on /proxies |
| Latency-based ranking | pending | Expose latency percentiles per proxy |
## Phase 3: Self-Expanding Source Pool
Worker-driven link discovery from productive pages.
| Item | Status | Description |
|------|--------|-------------|
| Link extraction from productive pages | pending | Parse HTML for links when page yields proxies |
| Report discovered URLs to master | pending | New endpoint for worker URL submissions |
| Conditional discovery | pending | Only extract links from confirmed-productive pages |
## Phase 4: Long-Term
| Item | Status | Description |
|------|--------|-------------|
| Python 3 migration | deferred | Unblocks modern deps, security patches, pyasn native |
| Worker trust scoring | pending | Activate spot-check verification framework |
| Dynamic target pool | pending | Auto-discover and rotate validation targets |
| Geographic target spread | pending | Ensure targets span multiple regions |
---
## Completed
| Item | Date | Description |
|------|------|-------------|
| Sharpen URL error penalty | 2026-02-22 | error*0.5 cap 4.0 + stale*0.2 cap 1.5 |
| SOCKS5 source expansion | 2026-02-22 | Added 10 new SOCKS5-specific sources |
| SQLite connection reuse | 2026-02-22 | Per-greenlet cached handles via threading.local |
| Lazy-load ASN database | 2026-02-22 | Deferred ipasn.dat to first lookup |
| Socket shutdown skip | 2026-02-22 | _connected flag, skip shutdown on dead sockets |
| Protocol-aware weighting | 2026-02-22 | Dynamic SOCKS boost in claim_urls scoring |
| Seed sources expanded | 2026-02-22 | 37 -> 120+ URLs |
| last_seen freshness fix | 2026-02-22 | Watchd updates last_seen on verification |
| Periodic re-seeding | 2026-02-22 | Reset errored sources every 6h |
| ASN enrichment | 2026-02-22 | Pure-Python ipasn.dat reader + backfill |
| URL pipeline stats | 2026-02-22 | /api/stats exposes source health metrics |
| Extraction short-circuits | 2026-02-22 | Guard clauses + precompiled table regexes |
| Target health tracking | prior | Cooldown-based health for all target pools |
| MITM field in proxy list | prior | Expose mitm boolean in JSON endpoints |
| V1 worker protocol removal | prior | Cleaned up legacy --worker code path |
---
@@ -67,31 +102,12 @@ PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework design
| File | Purpose |
|------|---------|
| ppf.py | Main URL harvester daemon |
| ppf.py | URL harvester, worker main loop |
| proxywatchd.py | Proxy validation daemon |
| scraper.py | Searx search integration |
| fetch.py | HTTP fetching with proxy support |
| dbs.py | Database schema and inserts |
| mysqlite.py | SQLite wrapper |
| rocksock.py | Socket/proxy abstraction (3rd party) |
| http2.py | HTTP client implementation |
| httpd.py | Web dashboard and REST API server |
| fetch.py | HTTP fetching, proxy extraction |
| httpd.py | API server, worker coordination |
| dbs.py | Database schema, seed sources |
| config.py | Configuration management |
| comboparse.py | Config/arg parser framework |
| soup_parser.py | BeautifulSoup wrapper |
| misc.py | Utilities (timestamp, logging) |
| export.py | Proxy export CLI tool |
| engines.py | Search engine implementations |
| connection_pool.py | Tor connection pooling |
| network_stats.py | Network statistics tracking |
| dns.py | DNS resolution with caching |
| mitm.py | MITM certificate detection |
| job.py | Priority job queue |
| static/dashboard.js | Dashboard frontend logic |
| static/dashboard.html | Dashboard HTML template |
| tools/lib/ppf-common.sh | Shared ops library (hosts, wrappers, colors) |
| tools/ppf-deploy | Deploy wrapper (validation + playbook) |
| tools/ppf-logs | View container logs |
| tools/ppf-service | Container lifecycle management |
| tools/playbooks/deploy.yml | Ansible deploy playbook |
| tools/playbooks/inventory.ini | Host inventory (WireGuard IPs) |
| rocksock.py | Socket/proxy abstraction |
| http2.py | HTTP client implementation |
| tools/ppf-deploy | Deployment wrapper |

34
TASKLIST.md Normal file
View File

@@ -0,0 +1,34 @@
# PPF Tasklist
Active execution queue. Ordered by priority.
---
## In Progress
| # | Task | File(s) | Notes |
|---|------|---------|-------|
## Queued
| # | Task | File(s) | Notes |
|---|------|---------|-------|
| 12 | API filters on /proxies (proto/country/ASN) | httpd.py | Consumer query params |
| 8 | Graduated recheck intervals | proxywatchd.py | Fresh proxies checked more often |
## Done
| # | Task | Date |
|---|------|------|
| - | Sharpen URL error penalty scoring | 2026-02-22 |
| - | Add SOCKS5-specific sources (10 new) | 2026-02-22 |
| 3 | Lazy-load ASN database | 2026-02-22 |
| 2 | SQLite connection reuse on odin | 2026-02-22 |
| 1 | Skip socket.shutdown on failed connections | 2026-02-22 |
| 4 | Add more seed sources (100+) | 2026-02-22 |
| 6 | Protocol-aware source weighting | 2026-02-22 |
| - | Extraction short-circuits | 2026-02-22 |
| - | last_seen freshness fix | 2026-02-22 |
| - | Periodic re-seeding | 2026-02-22 |
| - | ASN enrichment | 2026-02-22 |
| - | URL pipeline stats | 2026-02-22 |

82
TODO.md
View File

@@ -1,83 +1,35 @@
# PPF TODO
## Optimization
### [ ] JSON Stats Response Caching
- Cache serialized JSON response with short TTL (1-2s)
- Only regenerate when underlying stats change
- Use ETag/If-None-Match for client-side caching
- Savings: ~7-9s/hour. Low priority, only matters with frequent dashboard access.
### [ ] Object Pooling for Test States
- Pool ProxyTestState and TargetTestJob, reset and reuse
- Savings: ~11-15s/hour. **Not recommended** - high effort, medium risk, modest gain.
### [ ] SQLite Connection Reuse
- Persistent connection per thread with health checks
- Savings: ~0.3s/hour. **Not recommended** - negligible benefit.
Intake buffer. Items refined here move to TASKLIST.md.
---
## Dashboard
### [ ] Performance
- Cache expensive DB queries (top countries, protocol breakdown)
- Lazy-load historical data (only when scrolled into view)
- WebSocket option for push updates (reduce polling overhead)
- Configurable refresh interval via URL param or localStorage
### [ ] Features
- Historical graphs (24h, 7d) using stats_history table
- Per-ASN performance analysis
- Alert thresholds (success rate < X%, MITM detected)
- Mobile-responsive improvements
---
- [ ] Cache expensive DB queries (top countries, protocol breakdown)
- [ ] Historical graphs (24h, 7d) using stats_history table
- [ ] Per-ASN performance analysis
- [ ] Alert thresholds (success rate < X%, MITM detected)
- [ ] WebSocket push updates (reduce polling overhead)
- [ ] Mobile-responsive improvements
## Memory
- [ ] Lock consolidation - reduce per-proxy locks (260k LockType objects)
- [ ] Leaner state objects - reduce dict/list count per job
- [ ] Lock consolidation (260k LockType objects at scale)
- [ ] Leaner state objects per job
Memory scales linearly with queue (~4.5 KB/job). No leaks detected.
Optimize only if memory becomes a constraint.
Memory scales ~4.5 KB/job. No leaks detected. Optimize only if constrained.
---
## Source Pipeline
## Deprecation
### [x] Remove V1 worker protocol
Completed. Removed `--worker` flag, `worker_main()`, `claim_work()`,
`submit_results()`, `/api/work`, `/api/results`, and related config
options. `--worker` now routes to the URL-driven protocol.
---
- [ ] PasteBin/GitHub API scrapers for proxy lists
- [ ] Telegram channel scrapers (beyond t.me/s/ HTML)
- [ ] Source quality decay tracking (flag sources going stale)
- [ ] Deduplication of sources across different URL forms
## Known Issues
### [!] Podman Container Metadata Disappears
`podman ps -a` shows empty even though process is running. Service functions
correctly despite missing metadata. Monitor via `ss -tlnp`, `ps aux`, or
`curl localhost:8081/health`. Low impact.
---
## Container Debugging Checklist
```
1. Check for orphans: ps aux | grep -E "[p]rocess_name"
2. Check port conflicts: ss -tlnp | grep PORT
3. Run foreground: podman run --rm (no -d) to see output
4. Check podman state: podman ps -a
5. Clean stale: pkill -9 -f "pattern" && podman rm -f -a
6. Verify deps: config files, data dirs, volumes exist
7. Check logs: podman logs container_name 2>&1 | tail -50
8. Health check: curl -sf http://localhost:PORT/health
```
`podman ps -a` shows empty even though process is running.
Monitor via `ss -tlnp`, `ps aux`, or `curl localhost:8081/health`.

View File

@@ -3,9 +3,9 @@
"""Combined config file and argument parser."""
try:
from ConfigParser import SafeConfigParser, NoOptionError
from ConfigParser import SafeConfigParser as ConfigParser, NoOptionError
except ImportError:
from configparser import SafeConfigParser, NoOptionError
from configparser import ConfigParser, NoOptionError
from argparse import ArgumentParser
import sys
@@ -23,7 +23,7 @@ class ComboParser(object):
def __init__(self, ini):
self.items = []
self.cparser = SafeConfigParser()
self.cparser = ConfigParser()
self.aparser = ArgumentParser()
self.ini = ini
self.loaded = False

View File

@@ -19,6 +19,8 @@ services:
build: .
network_mode: host
restart: unless-stopped
logging:
driver: k8s-file
stop_signal: SIGTERM
stop_grace_period: 30s
environment:

18
compose.test.yml Normal file
View File

@@ -0,0 +1,18 @@
# PPF test runner (Python 2.7, production deps + pytest)
#
# Mounts source and tests as volumes so no rebuild needed between runs.
#
# Usage:
# podman-compose -f compose.test.yml run --rm test
# podman-compose -f compose.test.yml run --rm test python -m pytest tests/test_fetch.py -v
services:
test:
container_name: ppf-test
build:
context: .
dockerfile: Dockerfile.test
volumes:
- .:/app:ro,Z
working_dir: /app
command: python -m pytest tests/ -v --tb=short

View File

@@ -45,10 +45,10 @@ class Config(ComboParser):
# Validate thread counts (0 allowed for watchd to disable local testing)
if self.watchd.threads < 0:
errors.append('watchd.threads must be >= 0')
if self.ppf.threads < 1:
errors.append('ppf.threads must be >= 1')
if self.scraper.threads < 1:
errors.append('scraper.threads must be >= 1')
if self.ppf.threads < 0:
errors.append('ppf.threads must be >= 0')
if self.scraper.enabled and self.scraper.threads < 1:
errors.append('scraper.threads must be >= 1 when scraper is enabled')
# Validate max_fail
if self.watchd.max_fail < 1:
@@ -170,6 +170,7 @@ class Config(ComboParser):
self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False)
self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle (default: 5)', False)
self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching (default: 30)', False)
self.add_item(section, 'cache_ttl', int, 300, 'local proxy test cache TTL in seconds, 0 to disable (default: 300)', False)
self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False)
self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False)

202
dbs.py
View File

@@ -582,34 +582,107 @@ def insert_urls(urls, search, sqlite):
# Known proxy list sources (GitHub raw lists, APIs)
PROXY_SOURCES = [
# --- GitHub raw lists (sorted by update frequency) ---
# TheSpeedX/PROXY-List - large, hourly updates
'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/http.txt',
'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/socks4.txt',
'https://raw.githubusercontent.com/TheSpeedX/PROXY-List/master/socks5.txt',
# clarketm/proxy-list - curated, daily
'https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list-raw.txt',
# monosans/proxy-list - hourly updates
'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/http.txt',
'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks4.txt',
'https://raw.githubusercontent.com/monosans/proxy-list/main/proxies/socks5.txt',
# prxchk/proxy-list - 10 min updates
'https://raw.githubusercontent.com/prxchk/proxy-list/main/http.txt',
'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks4.txt',
'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks5.txt',
# jetkai/proxy-list - 10 min updates
'https://raw.githubusercontent.com/jetkai/proxy-list/main/online-proxies/txt/proxies.txt',
# roosterkid/openproxylist
'https://raw.githubusercontent.com/roosterkid/openproxylist/main/HTTPS_RAW.txt',
'https://raw.githubusercontent.com/roosterkid/openproxylist/main/SOCKS4_RAW.txt',
'https://raw.githubusercontent.com/roosterkid/openproxylist/main/SOCKS5_RAW.txt',
# ShiftyTR/Proxy-List
'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/http.txt',
'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/socks4.txt',
'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/socks5.txt',
# hookzof/socks5_list - hourly, SOCKS5 focused
'https://raw.githubusercontent.com/hookzof/socks5_list/master/proxy.txt',
# mmpx12/proxy-list
'https://raw.githubusercontent.com/mmpx12/proxy-list/master/http.txt',
'https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks4.txt',
'https://raw.githubusercontent.com/mmpx12/proxy-list/master/socks5.txt',
# proxyscrape API
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all',
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks4&timeout=10000&country=all',
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks5&timeout=10000&country=all',
# ShiftyTR/Proxy-List
'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/http.txt',
'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/socks4.txt',
'https://raw.githubusercontent.com/ShiftyTR/Proxy-List/master/socks5.txt',
# roosterkid/openproxylist
'https://raw.githubusercontent.com/roosterkid/openproxylist/main/HTTPS_RAW.txt',
'https://raw.githubusercontent.com/roosterkid/openproxylist/main/SOCKS4_RAW.txt',
'https://raw.githubusercontent.com/roosterkid/openproxylist/main/SOCKS5_RAW.txt',
# clarketm/proxy-list - curated, daily
'https://raw.githubusercontent.com/clarketm/proxy-list/master/proxy-list-raw.txt',
# officialputuid/KangProxy - 4-6 hour updates
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/http/http.txt',
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/https/https.txt',
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks4/socks4.txt',
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks5/socks5.txt',
# iplocate/free-proxy-list - 30 min updates
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/http.txt',
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks4.txt',
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks5.txt',
# ErcinDedeworken/proxy-list - hourly
'https://raw.githubusercontent.com/ErcinDedeworken/proxy-list/main/proxy-list/data.txt',
# MuRongPIG/Proxy-Master - 10 min updates
'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/http.txt',
'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/socks4.txt',
'https://raw.githubusercontent.com/MuRongPIG/Proxy-Master/main/socks5.txt',
# zloi-user/hideip.me - hourly
'https://raw.githubusercontent.com/zloi-user/hideip.me/main/http.txt',
'https://raw.githubusercontent.com/zloi-user/hideip.me/main/socks4.txt',
'https://raw.githubusercontent.com/zloi-user/hideip.me/main/socks5.txt',
# FLAVIEN-music/proxy-list - 30 min updates
'https://raw.githubusercontent.com/FLAVIEN-music/proxy-list/main/proxies/http.txt',
'https://raw.githubusercontent.com/FLAVIEN-music/proxy-list/main/proxies/socks4.txt',
'https://raw.githubusercontent.com/FLAVIEN-music/proxy-list/main/proxies/socks5.txt',
# Zaeem20/FREE_PROXIES_LIST - 30 min updates
'https://raw.githubusercontent.com/Zaeem20/FREE_PROXIES_LIST/master/http.txt',
'https://raw.githubusercontent.com/Zaeem20/FREE_PROXIES_LIST/master/https.txt',
'https://raw.githubusercontent.com/Zaeem20/FREE_PROXIES_LIST/master/socks4.txt',
'https://raw.githubusercontent.com/Zaeem20/FREE_PROXIES_LIST/master/socks5.txt',
# r00tee/Proxy-List - hourly
'https://raw.githubusercontent.com/r00tee/Proxy-List/main/Https.txt',
'https://raw.githubusercontent.com/r00tee/Proxy-List/main/Socks4.txt',
'https://raw.githubusercontent.com/r00tee/Proxy-List/main/Socks5.txt',
# casals-ar/proxy-list
'https://raw.githubusercontent.com/casals-ar/proxy-list/main/http',
'https://raw.githubusercontent.com/casals-ar/proxy-list/main/socks4',
'https://raw.githubusercontent.com/casals-ar/proxy-list/main/socks5',
# yemixzy/proxy-list
'https://raw.githubusercontent.com/yemixzy/proxy-list/main/proxies/http.txt',
'https://raw.githubusercontent.com/yemixzy/proxy-list/main/proxies/socks4.txt',
'https://raw.githubusercontent.com/yemixzy/proxy-list/main/proxies/socks5.txt',
# opsxcq/proxy-list
'https://raw.githubusercontent.com/opsxcq/proxy-list/master/list.txt',
# im-razvan/proxy_list - 10 min updates
'https://raw.githubusercontent.com/im-razvan/proxy_list/main/http.txt',
'https://raw.githubusercontent.com/im-razvan/proxy_list/main/socks4.txt',
'https://raw.githubusercontent.com/im-razvan/proxy_list/main/socks5.txt',
# zevtyardt/proxy-list - daily SOCKS5
'https://raw.githubusercontent.com/zevtyardt/proxy-list/main/socks5.txt',
# UptimerBot/proxy-list - 15 min updates
'https://raw.githubusercontent.com/UptimerBot/proxy-list/main/proxies/socks5.txt',
# Anonym0usWork1221/Free-Proxies
'https://raw.githubusercontent.com/Anonym0usWork1221/Free-Proxies/main/proxy_files/https_proxies.txt',
'https://raw.githubusercontent.com/Anonym0usWork1221/Free-Proxies/main/proxy_files/socks4_proxies.txt',
'https://raw.githubusercontent.com/Anonym0usWork1221/Free-Proxies/main/proxy_files/socks5_proxies.txt',
# ErcinDedeoglu/proxies - hourly
'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/http.txt',
'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/socks4.txt',
'https://raw.githubusercontent.com/ErcinDedeoglu/proxies/main/proxies/socks5.txt',
# dinoz0rg/proxy-list - daily, all protocols
'https://raw.githubusercontent.com/dinoz0rg/proxy-list/main/all.txt',
# elliottophellia/proxylist - SOCKS5
'https://raw.githubusercontent.com/elliottophellia/proxylist/master/results/socks5/global/socks5_len.txt',
# gfpcom/free-proxy-list - SOCKS5
'https://raw.githubusercontent.com/gfpcom/free-proxy-list/main/socks5.txt',
# databay-labs/free-proxy-list - SOCKS5
'https://raw.githubusercontent.com/databay-labs/free-proxy-list/master/socks5.txt',
# --- GitHub Pages / CDN hosted ---
# proxifly/free-proxy-list - 5 min updates (jsDelivr CDN)
'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/http/data.txt',
'https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/protocols/socks4/data.txt',
@@ -618,31 +691,86 @@ PROXY_SOURCES = [
'https://vakhov.github.io/fresh-proxy-list/http.txt',
'https://vakhov.github.io/fresh-proxy-list/socks4.txt',
'https://vakhov.github.io/fresh-proxy-list/socks5.txt',
# prxchk/proxy-list - 10 min updates
'https://raw.githubusercontent.com/prxchk/proxy-list/main/http.txt',
'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks4.txt',
'https://raw.githubusercontent.com/prxchk/proxy-list/main/socks5.txt',
# sunny9577/proxy-scraper - 3 hour updates (GitHub Pages)
'https://sunny9577.github.io/proxy-scraper/generated/http_proxies.txt',
'https://sunny9577.github.io/proxy-scraper/generated/socks4_proxies.txt',
'https://sunny9577.github.io/proxy-scraper/generated/socks5_proxies.txt',
# officialputuid/KangProxy - 4-6 hour updates
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/http/http.txt',
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks4/socks4.txt',
'https://raw.githubusercontent.com/officialputuid/KangProxy/KangProxy/socks5/socks5.txt',
# hookzof/socks5_list - hourly updates
'https://raw.githubusercontent.com/hookzof/socks5_list/master/proxy.txt',
# iplocate/free-proxy-list - 30 min updates
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/http.txt',
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks4.txt',
'https://raw.githubusercontent.com/iplocate/free-proxy-list/main/protocols/socks5.txt',
# --- API endpoints ---
# proxyscrape
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all',
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks4&timeout=10000&country=all',
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks5&timeout=10000&country=all',
# proxy-list.download - SOCKS5 API
'https://www.proxy-list.download/api/v1/get?type=socks5',
'https://www.proxy-list.download/api/v1/get?type=socks4',
# openproxylist.xyz - plain text
'https://api.openproxylist.xyz/http.txt',
'https://api.openproxylist.xyz/socks4.txt',
'https://api.openproxylist.xyz/socks5.txt',
# spys.me - plain text, 30 min updates
'http://spys.me/proxy.txt',
'http://spys.me/socks.txt',
# --- Web scrapers (HTML pages) ---
# spys.one - mixed protocols, requires parsing
'https://spys.one/en/free-proxy-list/',
'https://spys.one/en/socks-proxy-list/',
'https://spys.one/en/https-ssl-proxy/',
# free-proxy-list.net
'https://free-proxy-list.net/',
'https://www.sslproxies.org/',
'https://www.socks-proxy.net/',
# sockslist.us - SOCKS5 focused
'https://sockslist.us/',
# mtpro.xyz - SOCKS5, updated every 5 min
'https://mtpro.xyz/socks5',
# proxy-tools.com - SOCKS5 filtered
'https://proxy-tools.com/proxy/socks5',
# hidemy.name - all protocols, paginated
'https://hide.mn/en/proxy-list/',
# advanced.name - SOCKS5 filtered
'https://advanced.name/freeproxy?type=socks5',
# proxynova.com - by country
'https://www.proxynova.com/proxy-server-list/',
# freeproxy.world - SOCKS5 filtered
'https://www.freeproxy.world/?type=socks5',
# proxydb.net - all protocols
'http://proxydb.net/',
# geonode
'https://proxylist.geonode.com/api/proxy-list?limit=500&page=1&sort_by=lastChecked&sort_type=desc&protocols=http',
'https://proxylist.geonode.com/api/proxy-list?limit=500&page=1&sort_by=lastChecked&sort_type=desc&protocols=socks4',
'https://proxylist.geonode.com/api/proxy-list?limit=500&page=1&sort_by=lastChecked&sort_type=desc&protocols=socks5',
# openproxy.space
'https://openproxy.space/list/http',
'https://openproxy.space/list/socks4',
'https://openproxy.space/list/socks5',
# --- Telegram channels (public HTML view) ---
'https://t.me/s/spys_one',
'https://t.me/s/proxyfree1',
'https://t.me/s/proxylist4free',
'https://t.me/s/proxy_lists',
'https://t.me/s/Proxies4ForYou',
]
def seed_proxy_sources(sqlite):
"""Seed known proxy list sources into uris table."""
def seed_proxy_sources(sqlite, reset_errors=False):
"""Seed known proxy list sources into uris table.
Args:
sqlite: Database connection
reset_errors: If True, reset error/stale counts on existing seed
sources that have errored out, allowing them to be
retried. Safe to call periodically.
"""
timestamp = int(time.time())
added = 0
reset = 0
for url in PROXY_SOURCES:
try:
sqlite.execute(
@@ -653,11 +781,21 @@ def seed_proxy_sources(sqlite):
)
if sqlite.cursor.rowcount > 0:
added += 1
elif reset_errors:
# Reset errored-out seed sources so they get reclaimed
sqlite.execute(
'UPDATE uris SET error = 0, stale_count = 0, '
'check_interval = 3600, check_time = 0 '
'WHERE url = ? AND error >= 5',
(url,)
)
if sqlite.cursor.rowcount > 0:
reset += 1
except Exception as e:
_log('seed_urls insert error for %s: %s' % (url, e), 'warn')
sqlite.commit()
if added > 0:
_log('seeded %d proxy source URLs' % added, 'info')
if added > 0 or reset > 0:
_log('seed sources: %d new, %d reset' % (added, reset), 'info')
def save_session_state(sqlite, stats):

View File

@@ -221,6 +221,10 @@ def extract_auth_proxies(content):
"""
proxies = []
# Short-circuit: auth proxies always contain @
if '@' not in content:
return proxies
# IPv4 auth proxies
for match in AUTH_PROXY_PATTERN.finditer(content):
proto_str, user, passwd, ip, port = match.groups()
@@ -256,6 +260,12 @@ TABLE_PORT_HEADERS = ('port',)
TABLE_PROTO_HEADERS = ('type', 'protocol', 'proto', 'scheme')
_TABLE_PATTERN = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
_ROW_PATTERN = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
_CELL_PATTERN = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
_TAG_STRIP = re.compile(r'<[^>]+>')
def extract_proxies_from_table(content):
"""Extract proxies from HTML tables with IP/Port/Protocol columns.
@@ -269,26 +279,23 @@ def extract_proxies_from_table(content):
"""
proxies = []
# Simple regex-based table parsing (works without BeautifulSoup)
# Find all tables
table_pattern = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
row_pattern = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
cell_pattern = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
tag_strip = re.compile(r'<[^>]+>')
# Short-circuit: no HTML tables in plain text content
if '<table' not in content and '<TABLE' not in content:
return proxies
for table_match in table_pattern.finditer(content):
for table_match in _TABLE_PATTERN.finditer(content):
table_html = table_match.group(1)
rows = row_pattern.findall(table_html)
rows = _ROW_PATTERN.findall(table_html)
if not rows:
continue
# Parse header row to find column indices
ip_col = port_col = proto_col = -1
header_row = rows[0]
headers = cell_pattern.findall(header_row)
headers = _CELL_PATTERN.findall(header_row)
for i, cell in enumerate(headers):
cell_text = tag_strip.sub('', cell).strip().lower()
cell_text = _TAG_STRIP.sub('', cell).strip().lower()
if ip_col < 0 and any(h in cell_text for h in TABLE_IP_HEADERS):
ip_col = i
elif port_col < 0 and any(h in cell_text for h in TABLE_PORT_HEADERS):
@@ -302,11 +309,11 @@ def extract_proxies_from_table(content):
# Parse data rows
for row in rows[1:]:
cells = cell_pattern.findall(row)
cells = _CELL_PATTERN.findall(row)
if len(cells) <= ip_col:
continue
ip_cell = tag_strip.sub('', cells[ip_col]).strip()
ip_cell = _TAG_STRIP.sub('', cells[ip_col]).strip()
# Check if IP cell contains port (ip:port format)
if ':' in ip_cell and port_col < 0:
@@ -315,7 +322,7 @@ def extract_proxies_from_table(content):
ip, port = match.groups()
proto = None
if proto_col >= 0 and len(cells) > proto_col:
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
addr = '%s:%s' % (ip, port)
if is_usable_proxy(addr):
proxies.append((addr, proto))
@@ -323,7 +330,7 @@ def extract_proxies_from_table(content):
# Separate IP and Port columns
if port_col >= 0 and len(cells) > port_col:
port_cell = tag_strip.sub('', cells[port_col]).strip()
port_cell = _TAG_STRIP.sub('', cells[port_col]).strip()
try:
port = int(port_cell)
except ValueError:
@@ -335,7 +342,7 @@ def extract_proxies_from_table(content):
proto = None
if proto_col >= 0 and len(cells) > proto_col:
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
addr = '%s:%d' % (ip_cell, port)
if is_usable_proxy(addr):
@@ -358,6 +365,10 @@ def extract_proxies_from_json(content):
"""
proxies = []
# Short-circuit: content must contain JSON delimiters
if '{' not in content and '[' not in content:
return proxies
# Try to find JSON in content (may be embedded in HTML)
json_matches = []

293
httpd.py
View File

@@ -31,6 +31,68 @@ except (ImportError, IOError, ValueError):
_geodb = None
_geolite = False
# ASN lookup (optional, lazy-loaded on first use)
# Defers ~3.6s startup cost of parsing ipasn.dat until first ASN lookup.
_asndb = None
_asndb_loaded = False
_asn_dat_path = os.path.join("data", "ipasn.dat")
import socket
import struct
import bisect
class _AsnLookup(object):
"""Pure-Python ASN lookup using ipasn.dat (CIDR/ASN text format)."""
def __init__(self, path):
self._entries = []
with open(path) as f:
for line in f:
line = line.strip()
if not line or line.startswith(';'):
continue
parts = line.split('\t')
if len(parts) != 2:
continue
cidr, asn = parts
ip, prefix = cidr.split('/')
start = struct.unpack('!I', socket.inet_aton(ip))[0]
self._entries.append((start, int(prefix), int(asn)))
self._entries.sort()
_log('asn: loaded %d prefixes (pure-python)' % len(self._entries), 'info')
def lookup(self, ip):
ip_int = struct.unpack('!I', socket.inet_aton(ip))[0]
idx = bisect.bisect_right(self._entries, (ip_int, 33, 0)) - 1
if idx < 0:
return (None, None)
start, prefix_len, asn = self._entries[idx]
mask = (0xFFFFFFFF << (32 - prefix_len)) & 0xFFFFFFFF
if (ip_int & mask) == (start & mask):
return (asn, None)
return (None, None)
def _get_asndb():
"""Lazy-load ASN database on first call. Returns db instance or None."""
global _asndb, _asndb_loaded
if _asndb_loaded:
return _asndb
_asndb_loaded = True
try:
import pyasn
_asndb = pyasn.pyasn(_asn_dat_path)
return _asndb
except (ImportError, IOError):
pass
if os.path.exists(_asn_dat_path):
try:
_asndb = _AsnLookup(_asn_dat_path)
except Exception as e:
_log('asn: failed to load %s: %s' % (_asn_dat_path, e), 'warn')
return _asndb
# Rate limiting configuration
_rate_limits = defaultdict(list)
_rate_lock = threading.Lock()
@@ -107,6 +169,30 @@ _fail_retry_interval = 60 # retry interval for failing proxies
_fail_retry_backoff = True # True=linear backoff (60,120,180...), False=fixed (60,60,60...)
_max_fail = 5 # failures before proxy considered dead
# Per-greenlet (or per-thread) SQLite connection cache
# Under gevent, threading.local() is monkey-patched to greenlet-local storage.
# Connections are reused across requests handled by the same greenlet, eliminating
# redundant sqlite3.connect() + PRAGMA calls (~0.5ms each, ~2.7k/session on odin).
_local = threading.local()
def _get_db(path):
"""Get a cached SQLite connection for the proxy database."""
db = getattr(_local, 'proxy_db', None)
if db is None:
db = mysqlite.mysqlite(path, str)
_local.proxy_db = db
return db
def _get_url_db(path):
"""Get a cached SQLite connection for the URL database."""
db = getattr(_local, 'url_db', None)
if db is None:
db = mysqlite.mysqlite(path, str)
_local.url_db = db
return db
def configure_schedule(working_checktime, fail_retry_interval, fail_retry_backoff, max_fail):
"""Set testing schedule parameters from config."""
@@ -300,6 +386,42 @@ def get_worker_test_rate(worker_id):
return 0.0
return total_tests / elapsed
def _get_proto_boost():
"""Calculate protocol scarcity boost for URL scoring.
Returns a value 0.0-1.0 to boost SOCKS sources when SOCKS proxies
are underrepresented relative to HTTP. Returns 0.0 when balanced.
"""
try:
if not _proxy_database:
return 0.0
db = _get_db(_proxy_database)
if not db:
return 0.0
row = db.execute(
"SELECT "
" SUM(CASE WHEN proto='http' THEN 1 ELSE 0 END),"
" SUM(CASE WHEN proto IN ('socks4','socks5') THEN 1 ELSE 0 END)"
" FROM proxylist WHERE failed=0"
).fetchone()
if not row or not row[0]:
return 0.5 # no data, default mild boost
http_count, socks_count = row[0] or 0, row[1] or 0
total = http_count + socks_count
if total == 0:
return 0.5
socks_ratio = float(socks_count) / total
# Boost SOCKS sources when socks_ratio < 40%
if socks_ratio >= 0.4:
return 0.0
return min((0.4 - socks_ratio) * 2.5, 1.0) # 0.0-1.0 scale
except Exception:
return 0.0
# Global reference to proxy database path (set by ProxyAPIServer.__init__)
_proxy_database = None
def claim_urls(url_db, worker_id, count=5):
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
@@ -310,6 +432,7 @@ def claim_urls(url_db, worker_id, count=5):
- quality_bonus: 0-0.5 based on working_ratio
- error_penalty: 0-2.0 based on consecutive errors
- stale_penalty: 0-1.0 based on unchanged fetches
- proto_boost: 0-1.0 for SOCKS sources when SOCKS underrepresented
"""
now = time.time()
now_int = int(now)
@@ -335,14 +458,19 @@ def claim_urls(url_db, worker_id, count=5):
list_max_age_seconds = _url_list_max_age_days * 86400
min_added = now_int - list_max_age_seconds
# Boost SOCKS sources when protocol pool is imbalanced
proto_boost = _get_proto_boost()
try:
rows = url_db.execute(
'''SELECT url, content_hash,
(? - check_time) * 1.0 / MAX(COALESCE(check_interval, 3600), 1)
+ MIN(COALESCE(yield_rate, 0) / 100.0, 1.0)
+ COALESCE(working_ratio, 0) * 0.5
- MIN(error * 0.3, 2.0)
- MIN(stale_count * 0.1, 1.0)
- MIN(error * 0.5, 4.0)
- MIN(stale_count * 0.2, 1.5)
+ CASE WHEN LOWER(url) LIKE '%socks5%' OR LOWER(url) LIKE '%socks4%'
THEN ? ELSE 0 END
AS score
FROM uris
WHERE error < ?
@@ -350,7 +478,7 @@ def claim_urls(url_db, worker_id, count=5):
AND (added > ? OR proxies_added > 0)
ORDER BY score DESC
LIMIT ?''',
(now_int, _url_max_fail, now_int, min_added, count * 3)
(now_int, proto_boost, _url_max_fail, now_int, min_added, count * 3)
).fetchall()
except Exception as e:
_log('claim_urls query error: %s' % e, 'error')
@@ -526,7 +654,7 @@ def _update_url_working_ratios(url_working_counts):
pending_snapshot = dict(_url_pending_counts)
try:
url_db = mysqlite.mysqlite(_url_database_path, str)
url_db = _get_url_db(_url_database_path)
for url, working_count in url_working_counts.items():
pending = pending_snapshot.get(url)
if not pending or pending['total'] <= 0:
@@ -547,7 +675,6 @@ def _update_url_working_ratios(url_working_counts):
settled.append(url)
url_db.commit()
url_db.close()
except Exception as e:
_log('_update_url_working_ratios error: %s' % e, 'error')
@@ -604,7 +731,7 @@ def submit_proxy_reports(db, worker_id, proxies):
''', (proxy_key, ip, port, proto, now_int, now_int, latency, now_int,
checktype, target))
# Geolocate if IP2Location available
# Geolocate and ASN lookup
if _geolite and _geodb:
try:
rec = _geodb.get_all(ip)
@@ -614,6 +741,16 @@ def submit_proxy_reports(db, worker_id, proxies):
(rec.country_short, proxy_key))
except Exception:
pass
asndb = _get_asndb()
if asndb:
try:
asn_result = asndb.lookup(ip)
if asn_result and asn_result[0]:
db.execute(
'UPDATE proxylist SET asn=? WHERE proxy=?',
(asn_result[0], proxy_key))
except Exception:
pass
# Track per-URL working count for working_ratio
if source_url:
@@ -1050,7 +1187,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def handle_countries(self):
"""Return all countries with proxy counts."""
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC'
@@ -1069,7 +1206,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def get_db_stats(self):
"""Get statistics from database."""
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
stats = {}
# Total counts
@@ -1117,7 +1254,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
# Add database stats
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
stats['db'] = self.get_db_stats()
stats['db_health'] = get_db_health(db)
except Exception as e:
@@ -1204,7 +1341,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
args.append(limit)
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
@@ -1253,7 +1390,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
sql += ' ORDER BY avg_latency ASC, tested DESC'
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
@@ -1270,7 +1407,7 @@ class ProxyAPIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def handle_count(self):
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
row = db.execute('SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL').fetchone()
self.send_json({'count': row[0] if row else 0})
except Exception as e:
@@ -1296,8 +1433,9 @@ class ProxyAPIServer(threading.Thread):
self.stats_provider = stats_provider
self.profiling = profiling
self.daemon = True
global _url_database_path
global _url_database_path, _proxy_database
_url_database_path = url_database
_proxy_database = database
self.server = None
self._stop_event = threading.Event() if not GEVENT_PATCHED else None
# Load static library files into cache
@@ -1306,14 +1444,42 @@ class ProxyAPIServer(threading.Thread):
load_static_files(THEME)
# Load worker registry from disk
load_workers()
# Backfill ASN for existing proxies missing it (triggers lazy-load)
if _get_asndb():
self._backfill_asn()
# Create verification tables if they don't exist
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
create_verification_tables(db)
_log('verification tables initialized', 'debug')
except Exception as e:
_log('failed to create verification tables: %s' % e, 'warn')
def _backfill_asn(self):
"""One-time backfill of ASN for proxies that have ip but no ASN."""
try:
db = _get_db(self.database)
rows = db.execute(
'SELECT proxy, ip FROM proxylist WHERE asn IS NULL AND ip IS NOT NULL'
).fetchall()
if not rows:
return
updated = 0
for proxy_key, ip in rows:
try:
result = _get_asndb().lookup(ip)
if result and result[0]:
db.execute('UPDATE proxylist SET asn=? WHERE proxy=?',
(result[0], proxy_key))
updated += 1
except Exception:
pass
db.commit()
if updated:
_log('asn: backfilled %d/%d proxies' % (updated, len(rows)), 'info')
except Exception as e:
_log('asn backfill error: %s' % e, 'warn')
def _wsgi_app(self, environ, start_response):
"""WSGI application wrapper for gevent."""
path = environ.get('PATH_INFO', '/').split('?')[0]
@@ -1472,11 +1638,15 @@ class ProxyAPIServer(threading.Thread):
stats['system'] = get_system_stats()
# Add database stats
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
stats['db'] = self._get_db_stats(db)
stats['db_health'] = get_db_health(db)
except Exception as e:
_log('api/stats db error: %s' % e, 'warn')
# Add URL pipeline stats
url_stats = self._get_url_stats()
if url_stats is not None:
stats['urls'] = url_stats
# Add profiling flag (from constructor or stats_provider)
if 'profiling' not in stats:
stats['profiling'] = self.profiling
@@ -1501,7 +1671,7 @@ class ProxyAPIServer(threading.Thread):
# 2. Database stats and health
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
result['stats']['db'] = self._get_db_stats(db)
result['stats']['db_health'] = get_db_health(db)
@@ -1514,8 +1684,6 @@ class ProxyAPIServer(threading.Thread):
# 4. Workers (same as /api/workers)
result['workers'] = self._get_workers_data(db)
db.close()
except Exception as e:
_log('api/dashboard db error: %s' % e, 'warn')
result['countries'] = {}
@@ -1534,7 +1702,7 @@ class ProxyAPIServer(threading.Thread):
return json.dumps({'error': 'stats not available'}), 'application/json', 500
elif path == '/api/countries':
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
rows = db.execute(
'SELECT country, COUNT(*) as c FROM proxylist WHERE failed=0 AND country IS NOT NULL '
'GROUP BY country ORDER BY c DESC'
@@ -1546,7 +1714,7 @@ class ProxyAPIServer(threading.Thread):
elif path == '/api/locations':
# Return proxy locations aggregated by lat/lon grid (0.5 degree cells)
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
rows = db.execute(
'SELECT ROUND(latitude, 1) as lat, ROUND(longitude, 1) as lon, '
'country, anonymity, COUNT(*) as c FROM proxylist '
@@ -1566,7 +1734,7 @@ class ProxyAPIServer(threading.Thread):
mitm_filter = query_params.get('mitm', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working, mitm FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
args = []
if proto:
sql += ' AND proto=?'
@@ -1584,7 +1752,7 @@ class ProxyAPIServer(threading.Thread):
sql += ' ORDER BY avg_latency ASC, tested DESC LIMIT ?'
args.append(limit)
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
@@ -1592,7 +1760,8 @@ class ProxyAPIServer(threading.Thread):
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
'protos': r[6].split(',') if r[6] else [r[2]],
'mitm': bool(r[7]),
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
@@ -1605,7 +1774,7 @@ class ProxyAPIServer(threading.Thread):
mitm_filter = query_params.get('mitm', '')
fmt = query_params.get('format', 'json')
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
sql = 'SELECT ip, port, proto, country, asn, avg_latency, protos_working, mitm FROM proxylist WHERE failed=0 AND proto IS NOT NULL AND last_seen >= strftime("%s","now") - 3600'
args = []
if proto:
sql += ' AND proto=?'
@@ -1622,7 +1791,7 @@ class ProxyAPIServer(threading.Thread):
sql += ' AND mitm=1'
sql += ' ORDER BY avg_latency ASC, tested DESC'
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
@@ -1630,7 +1799,8 @@ class ProxyAPIServer(threading.Thread):
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
'protos': r[6].split(',') if r[6] else [r[2]]
'protos': r[6].split(',') if r[6] else [r[2]],
'mitm': bool(r[7]),
} for r in rows]
return json.dumps({'count': len(proxies), 'proxies': proxies}, indent=2), 'application/json', 200
except Exception as e:
@@ -1643,7 +1813,7 @@ class ProxyAPIServer(threading.Thread):
sql += ' AND mitm=0'
elif mitm_filter == '1':
sql += ' AND mitm=1'
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
row = db.execute(sql).fetchone()
return json.dumps({'count': row[0] if row else 0}), 'application/json', 200
except Exception as e:
@@ -1699,9 +1869,8 @@ class ProxyAPIServer(threading.Thread):
elif path == '/api/workers':
# List connected workers
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
workers_data = self._get_workers_data(db)
db.close()
return json.dumps(workers_data, indent=2), 'application/json', 200
except Exception as e:
_log('api/workers error: %s' % e, 'warn')
@@ -1719,7 +1888,7 @@ class ProxyAPIServer(threading.Thread):
return json.dumps({'error': 'url database not configured'}), 'application/json', 500
count = min(int(query_params.get('count', 5)), 20)
try:
url_db = mysqlite.mysqlite(self.url_database, str)
url_db = _get_url_db(self.url_database)
urls = claim_urls(url_db, worker_id, count)
update_worker_heartbeat(worker_id)
return json.dumps({
@@ -1746,7 +1915,7 @@ class ProxyAPIServer(threading.Thread):
if not reports:
return json.dumps({'error': 'no reports provided'}), 'application/json', 400
try:
url_db = mysqlite.mysqlite(self.url_database, str)
url_db = _get_url_db(self.url_database)
processed = submit_url_reports(url_db, worker_id, reports)
update_worker_heartbeat(worker_id)
return json.dumps({
@@ -1770,7 +1939,7 @@ class ProxyAPIServer(threading.Thread):
if not proxies:
return json.dumps({'error': 'no proxies provided'}), 'application/json', 400
try:
db = mysqlite.mysqlite(self.database, str)
db = _get_db(self.database)
processed = submit_proxy_reports(db, worker_id, proxies)
update_worker_heartbeat(worker_id)
return json.dumps({
@@ -1778,6 +1947,7 @@ class ProxyAPIServer(threading.Thread):
'processed': processed,
}), 'application/json', 200
except Exception as e:
_log('report-proxies error from %s: %s' % (worker_id, e), 'error')
return json.dumps({'error': str(e)}), 'application/json', 500
else:
@@ -1818,6 +1988,65 @@ class ProxyAPIServer(threading.Thread):
_log('_get_db_stats error: %s' % e, 'warn')
return stats
def _get_url_stats(self):
"""Get URL pipeline statistics from the websites database."""
if not self.url_database:
return None
try:
db = _get_url_db(self.url_database)
stats = {}
now = int(time.time())
# Total URLs and health breakdown
row = db.execute('SELECT COUNT(*) FROM uris').fetchone()
stats['total'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM uris WHERE error >= 10').fetchone()
stats['dead'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM uris WHERE error > 0 AND error < 10').fetchone()
stats['erroring'] = row[0] if row else 0
row = db.execute('SELECT COUNT(*) FROM uris WHERE error = 0').fetchone()
stats['healthy'] = row[0] if row else 0
# Recently active (fetched in last hour)
row = db.execute(
'SELECT COUNT(*) FROM uris WHERE check_time >= ?',
(now - 3600,)).fetchone()
stats['fetched_last_hour'] = row[0] if row else 0
# Productive sources (have produced working proxies)
row = db.execute(
'SELECT COUNT(*) FROM uris WHERE working_ratio > 0'
).fetchone()
stats['productive'] = row[0] if row else 0
# Aggregate yield
row = db.execute(
'SELECT SUM(proxies_added), SUM(retrievals) FROM uris'
).fetchone()
stats['total_proxies_extracted'] = (row[0] or 0) if row else 0
stats['total_fetches'] = (row[1] or 0) if row else 0
# Currently claimed
with _url_claims_lock:
stats['claimed'] = len(_url_claims)
# Top sources by working_ratio (productive URLs only)
rows = db.execute(
'SELECT url, working_ratio, yield_rate, proxies_added, retrievals '
'FROM uris WHERE working_ratio > 0 AND retrievals > 0 '
'ORDER BY working_ratio DESC LIMIT 10'
).fetchall()
stats['top_sources'] = [{
'url': r[0], 'working_ratio': round(r[1], 3),
'yield_rate': round(r[2], 1), 'proxies_added': r[3],
'fetches': r[4],
} for r in rows]
return stats
except Exception as e:
_log('_get_url_stats error: %s' % e, 'warn')
return None
def _get_workers_data(self, db):
"""Get worker status data. Used by /api/workers and /api/dashboard."""
now = time.time()
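The urls block assembled above is served through /api/stats; a small client-side sketch for reading it (host and port borrowed from the ppf-status tool later in this diff, Python 2 style to match the codebase):

import json
import urllib2  # Python 2 stdlib

resp = urllib2.urlopen('http://127.0.0.1:8081/api/stats', timeout=5)
stats = json.load(resp)
urls = stats.get('urls') or {}
print('%d healthy / %d erroring / %d dead of %d URLs' % (
    urls.get('healthy', 0), urls.get('erroring', 0),
    urls.get('dead', 0), urls.get('total', 0)))
for src in urls.get('top_sources', []):
    print('  %s  ratio=%.3f  yield=%.1f' % (
        src['url'], src['working_ratio'], src['yield_rate']))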

ppf.py

@@ -474,6 +474,7 @@ def worker_main(config):
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' url batch: %d' % url_batch_size, 'info')
_log(' cache ttl: %s' % ('%ds' % config.worker.cache_ttl if config.worker.cache_ttl > 0 else 'disabled'), 'info')
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
# Verify Tor connectivity before starting
@@ -532,6 +533,10 @@ def worker_main(config):
worker_profiling = config.args.profile or config.common.profiling
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
# Local proxy test cache: addr -> (timestamp, success, result_dict_or_None)
cache_ttl = config.worker.cache_ttl
proxy_cache = {} if cache_ttl > 0 else None
def do_register():
"""Register with master, with exponential backoff on failure."""
while True:
@@ -710,6 +715,33 @@ def worker_main(config):
time.sleep(1)
continue
# Filter against local test cache
cached_working = []
if proxy_cache is not None:
now = time.time()
uncached = []
cache_hits = 0
for addr, pr, conf in unique_proxies:
# Normalize to ip:port for cache lookup (strip auth prefix)
cache_key = addr.split('@')[-1] if '@' in addr else addr
entry = proxy_cache.get(cache_key)
if entry and (now - entry[0]) < cache_ttl:
cache_hits += 1
if entry[1]: # cached success
cached_working.append(entry[2])
else:
uncached.append((addr, pr, conf))
if cache_hits:
_log('%d cached (%d working), %d to test' % (
cache_hits, len(cached_working), len(uncached)), 'info')
unique_proxies = uncached
if not unique_proxies:
# All proxies were cached, nothing to test
cycles += 1
time.sleep(1)
continue
_log('testing %d unique proxies' % len(unique_proxies), 'info')
# Phase 2: Test extracted proxies using worker thread pool
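The cache key normalization above keeps one entry per host:port, so a proxy seen both with and without credentials hits the same cache slot. For example:

addr = 'user:pass@1.2.3.4:8080'
cache_key = addr.split('@')[-1] if '@' in addr else addr
assert cache_key == '1.2.3.4:8080'  # same key the result loop stores under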
@@ -769,6 +801,8 @@ def worker_main(config):
timeout_start = time.time()
timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5)
working_results = []
last_heartbeat = time.time()
last_report = time.time()
while completed < len(all_jobs):
try:
@@ -802,8 +836,55 @@ def worker_main(config):
if time.time() - timeout_start > timeout_seconds:
_log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn')
break
# Periodic heartbeat to prevent stale detection
now = time.time()
if now - last_heartbeat >= 60:
try:
worker_send_heartbeat(server_url, wstate['worker_key'],
True, current_tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
last_heartbeat = now
# Periodic proxy report (flush working results every 5 minutes)
if working_results and now - last_report >= 300:
reported = False
try:
processed = worker_report_proxies(server_url, wstate['worker_key'],
working_results)
if processed > 0:
_log('interim report: %d proxies (%d submitted)' % (
len(working_results), processed), 'info')
reported = True
except NeedReregister:
do_register()
try:
processed = worker_report_proxies(server_url, wstate['worker_key'],
working_results)
if processed > 0:
reported = True
except NeedReregister:
pass
if reported:
working_results = []
last_report = now
continue
# Populate proxy test cache from results
if proxy_cache is not None:
now = time.time()
working_addrs = set()
for r in working_results:
addr = '%s:%d' % (r['ip'], r['port'])
proxy_cache[addr] = (now, True, r)
working_addrs.add(addr)
# Cache failures for tested proxies that didn't succeed
for proxy_str in pending_states:
if proxy_str not in working_addrs:
proxy_cache[proxy_str] = (now, False, None)
proxies_working += len(working_results)
# Report working proxies to master
@@ -820,6 +901,16 @@ def worker_main(config):
_log('reported %d working proxies (submitted %d)' % (len(working_results), processed), 'info')
cycles += 1
# Periodic cache cleanup: evict expired entries every 10 cycles
if proxy_cache is not None and cycles % 10 == 0:
now = time.time()
expired = [k for k, v in proxy_cache.items() if (now - v[0]) >= cache_ttl]
if expired:
for k in expired:
del proxy_cache[k]
_log('cache cleanup: evicted %d expired, %d remaining' % (len(expired), len(proxy_cache)), 'info')
time.sleep(1)
except KeyboardInterrupt:
@@ -835,6 +926,8 @@ def worker_main(config):
_log(' urls fetched: %d' % urls_fetched, 'info')
_log(' proxies found: %d' % proxies_found, 'info')
_log(' proxies working: %d' % proxies_working, 'info')
if proxy_cache is not None:
_log(' cache entries: %d' % len(proxy_cache), 'info')
def main():
@@ -952,8 +1045,15 @@ def main():
statusmsg = time.time()
list_max_age_seconds = config.ppf.list_max_age_days * 86400
last_skip_log = 0
last_reseed = time.time()
reseed_interval = 6 * 3600 # re-seed sources every 6 hours
while True:
try:
# Periodic re-seeding: reset errored-out seed sources
if time.time() - last_reseed >= reseed_interval:
dbs.seed_proxy_sources(urldb, reset_errors=True)
last_reseed = time.time()
# When ppf threads = 0, skip URL fetching (workers handle it via /api/claim-urls)
if config.ppf.threads == 0:
time.sleep(60)


@@ -44,7 +44,7 @@ import dns
from misc import _log, categorize_error, tor_proxy_url, is_ssl_protocol_error
import rocksock
import connection_pool
from stats import JudgeStats, Stats, regexes, ssl_targets, try_div
from stats import TargetStats, JudgeStats, Stats, regexes, ssl_targets, try_div
from mitm import MITMCertStats, extract_cert_info, get_mitm_certificate
from dns import socks4_resolve
from job import PriorityJobQueue, calculate_priority
@@ -164,9 +164,9 @@ DEAD_PROXY = -1
# Error categories that indicate proxy is definitely dead (not temporary failure)
FATAL_ERROR_CATEGORIES = ('refused', 'unreachable', 'auth')
# Patterns indicating judge is blocking the proxy (not a proxy failure)
# These should NOT count as proxy failures - retry with different judge
JUDGE_BLOCK_PATTERNS = [
# Patterns indicating HTTP target is blocking the proxy (not a proxy failure)
# These should NOT count as proxy failures - applies to judges and head targets
HTTP_BLOCK_PATTERNS = [
r'HTTP/1\.[01] 403', # Forbidden
r'HTTP/1\.[01] 429', # Too Many Requests
r'HTTP/1\.[01] 503', # Service Unavailable
@@ -179,7 +179,7 @@ JUDGE_BLOCK_PATTERNS = [
r'blocked', # Explicit block
r'Checking your browser', # Cloudflare JS challenge
]
JUDGE_BLOCK_RE = re.compile('|'.join(JUDGE_BLOCK_PATTERNS), re.IGNORECASE)
HTTP_BLOCK_RE = re.compile('|'.join(HTTP_BLOCK_PATTERNS), re.IGNORECASE)
# Check types: irc, http (header match), judges (body match), ssl (TLS handshake)
# Judge services - return IP in body (plain text, JSON, or HTML)
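To illustrate what the renamed pattern set treats as a neutral, non-proxy failure: a rate-limit status line or a Cloudflare challenge page matches, while a normal judge response does not (abbreviated copy of the patterns, for illustration only):

import re

block_re = re.compile(r'HTTP/1\.[01] (403|429|503)|Checking your browser|blocked',
                      re.IGNORECASE)
assert block_re.search('HTTP/1.1 429 Too Many Requests\r\n\r\n')
assert block_re.search('<title>Checking your browser before accessing example.com</title>')
assert not block_re.search('HTTP/1.1 200 OK\r\n\r\n1.2.3.4')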
@@ -213,6 +213,9 @@ judges = {
# Global instances
judge_stats = JudgeStats()
head_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3)
ssl_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3)
irc_target_stats = TargetStats(cooldown_seconds=300, block_threshold=3)
mitm_cert_stats = MITMCertStats()
@@ -411,18 +414,19 @@ class ProxyTestState(object):
self.evaluated = True
self.checktime = int(time.time())
# Filter out judge_block results (inconclusive, neither pass nor fail)
real_results = [r for r in self.results if r.get('category') != 'judge_block']
# Filter out target_block results (inconclusive, neither pass nor fail)
block_cats = ('judge_block', 'target_block')
real_results = [r for r in self.results if r.get('category') not in block_cats]
successes = [r for r in real_results if r['success']]
failures = [r for r in real_results if not r['success']]
num_success = len(successes)
judge_blocks = len(self.results) - len(real_results)
_dbg('evaluate: %d success, %d fail, %d judge_block, results=%d' % (
num_success, len(failures), judge_blocks, len(self.results)), self.proxy)
target_blocks = len(self.results) - len(real_results)
_dbg('evaluate: %d success, %d fail, %d target_block, results=%d' % (
num_success, len(failures), target_blocks, len(self.results)), self.proxy)
# All results were judge blocks: inconclusive, preserve current state
# All results were target blocks: inconclusive, preserve current state
if not real_results and self.results:
_dbg('all results inconclusive (judge_block), no state change', self.proxy)
_dbg('all results inconclusive (target_block), no state change', self.proxy)
self.failcount = self.original_failcount
return (self.original_failcount == 0, None)
@@ -617,6 +621,10 @@ class TargetTestJob(object):
reveals_headers = bool(re.search(HEADER_REVEAL_PATTERN, recv, re.IGNORECASE))
# Record successful judge
judge_stats.record_success(srv)
elif self.checktype == 'head':
head_target_stats.record_success(srv)
elif self.checktype == 'irc':
irc_target_stats.record_success(srv)
self.proxy_state.record_result(
True, proto=proto, duration=elapsed,
srv=srv, tor=tor, ssl=is_ssl, exit_ip=exit_ip,
@@ -624,22 +632,28 @@ class TargetTestJob(object):
)
else:
_dbg('regex NO MATCH, recv[:100]=%r' % recv[:100], self.proxy_state.proxy)
# Check if judge is blocking us (not a proxy failure)
if self.checktype == 'judges' and JUDGE_BLOCK_RE.search(recv):
judge_stats.record_block(srv)
# Judge block = inconclusive, not a pass or fail
_dbg('judge BLOCK detected, skipping (neutral)', self.proxy_state.proxy)
# Check if HTTP target is blocking us (not a proxy failure)
if self.checktype in ('judges', 'head') and HTTP_BLOCK_RE.search(recv):
if self.checktype == 'judges':
judge_stats.record_block(srv)
else:
head_target_stats.record_block(srv)
_dbg('target BLOCK detected, skipping (neutral)', self.proxy_state.proxy)
self.proxy_state.record_result(
False, category='judge_block', proto=proto,
False, category='target_block', proto=proto,
srv=srv, tor=tor, ssl=is_ssl
)
if config.watchd.debug:
_log('judge %s challenged proxy %s (neutral, skipped)' % (
srv, self.proxy_state.proxy), 'debug')
_log('%s %s challenged proxy %s (neutral, skipped)' % (
self.checktype, srv, self.proxy_state.proxy), 'debug')
else:
_dbg('FAIL: no match, no block', self.proxy_state.proxy)
if self.checktype == 'judges':
judge_stats.record_failure(srv)
elif self.checktype == 'head':
head_target_stats.record_failure(srv)
elif self.checktype == 'irc':
irc_target_stats.record_failure(srv)
self.proxy_state.record_result(False, category='other')
except KeyboardInterrupt as e:
@@ -800,8 +814,9 @@ class TargetTestJob(object):
protos = [detected] + [p for p in protos if p != detected]
# Phase 1: SSL handshake (if ssl_first enabled or SSL-only mode)
ssl_reason = None
if config.watchd.ssl_first or self.checktype == 'none':
result = self._try_ssl_handshake(protos, pool)
result, ssl_reason = self._try_ssl_handshake(protos, pool)
if result is not None:
return result # SSL succeeded or MITM detected
# SSL failed for all protocols
@@ -811,17 +826,21 @@ class TargetTestJob(object):
_dbg('SSL failed, trying secondary check: %s' % self.checktype, ps.proxy)
# Phase 2: Secondary check (configured checktype)
return self._try_secondary_check(protos, pool)
return self._try_secondary_check(protos, pool, ssl_reason)
def _try_ssl_handshake(self, protos, pool):
"""Attempt SSL handshake to verify proxy works with TLS.
Returns:
Tuple on success/MITM, None on failure (should try secondary check)
(result, ssl_reason) where result is a tuple on success/MITM
or None on failure, and ssl_reason is the last SSL error reason
string (for secondary check SSL/plain decision).
"""
ps = self.proxy_state
ssl_target = random.choice(ssl_targets)
available_ssl = ssl_target_stats.get_available(ssl_targets) or ssl_targets
ssl_target = random.choice(available_ssl)
last_error_category = None
last_ssl_reason = None
for proto in protos:
if pool:
@@ -862,15 +881,22 @@ class TargetTestJob(object):
elapsed = time.time() - duration
if pool:
pool.record_success(torhost, elapsed)
ssl_target_stats.record_success(ssl_target)
sock.disconnect()
_dbg('SSL handshake OK', ps.proxy)
return None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_ok'
return (None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_ok'), None
except rocksock.RocksockException as e:
last_error_category = categorize_error(e)
et = e.get_errortype()
err = e.get_error()
# Track SSL reason for secondary check decision
if et == rocksock.RS_ET_SSL:
reason = e.get_failedproxy()
if isinstance(reason, str):
last_ssl_reason = reason
try:
sock.disconnect()
except:
@@ -883,7 +909,7 @@ class TargetTestJob(object):
if pool:
pool.record_success(torhost, elapsed)
_dbg('SSL MITM detected', ps.proxy)
return None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_mitm'
return (None, proto, duration, torhost, ssl_target, 0, 1, 'ssl_mitm'), None
if config.watchd.debug:
_log('SSL handshake failed: %s://%s:%d: %s' % (
@@ -891,18 +917,32 @@ class TargetTestJob(object):
# Check for Tor connection issues
if et == rocksock.RS_ET_OWN:
if e.get_failedproxy() == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
fp = e.get_failedproxy()
if fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
if pool:
pool.record_failure(torhost)
elif fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
err == rocksock.RS_E_HIT_TIMEOUT):
# Target-side failure
ssl_target_stats.record_failure(ssl_target)
elif et == rocksock.RS_ET_GAI:
# DNS failure -- target unresolvable
ssl_target_stats.record_block(ssl_target)
except KeyboardInterrupt:
raise
# All protocols failed SSL
return None
return None, last_ssl_reason
def _try_secondary_check(self, protos, pool):
"""Try the configured secondary checktype (head, judges, irc)."""
def _try_secondary_check(self, protos, pool, ssl_reason=None):
"""Try the configured secondary checktype (head, judges, irc).
ssl_reason: last SSL error reason from _try_ssl_handshake, used to
decide whether to use SSL or plain HTTP for the secondary check.
Protocol errors (proxy doesn't speak TLS) -> plain HTTP.
Other errors (cert, timeout, etc.) -> SSL without cert verification.
"""
ps = self.proxy_state
_sample_dbg('TEST START: proxy=%s target=%s check=%s' % (
ps.proxy, self.target_srv, self.checktype), ps.proxy)
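is_ssl_protocol_error() comes from misc and is not part of this diff; for the branch below it only has to separate "the proxy cannot speak TLS at all" from other handshake failures. One plausible shape, stated purely as an assumption:

def is_ssl_protocol_error(reason):
    # Hypothetical sketch; the real helper lives in misc.py. OpenSSL
    # reports a non-TLS peer with reason strings like these.
    reason = (reason or '').lower()
    return any(tok in reason for tok in (
        'wrong version number', 'unknown protocol', 'http request'))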
@@ -914,13 +954,26 @@ class TargetTestJob(object):
else:
connect_host = srvname
# Secondary checks: always use plain HTTP
use_ssl = 0
# Decide SSL based on why the primary handshake failed:
# - protocol error (proxy can't TLS) -> plain HTTP
# - other error (cert, timeout) -> SSL without cert verification
# - no ssl_reason (ssl_first off) -> plain HTTP (no prior info)
protocol_error = is_ssl_protocol_error(ssl_reason) if ssl_reason else True
verifycert = False
if self.checktype == 'irc':
server_port = 6667
if protocol_error:
use_ssl = 0
if self.checktype == 'irc':
server_port = 6667
else:
server_port = 80
_dbg('secondary: plain (ssl protocol error)', ps.proxy)
else:
server_port = 80
use_ssl = 1
if self.checktype == 'irc':
server_port = 6697
else:
server_port = 443
_dbg('secondary: ssl/no-verify (non-protocol ssl error)', ps.proxy)
last_error_category = None
@@ -996,6 +1049,11 @@ class TargetTestJob(object):
if et == rocksock.RS_ET_OWN:
if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or
err == rocksock.RS_E_HIT_TIMEOUT):
# Target-side failure -- proxy reached target but it's down
if self.checktype == 'head':
head_target_stats.record_failure(srvname)
elif self.checktype == 'irc':
irc_target_stats.record_failure(srvname)
break
elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED:
# Tor connection failed - record in pool
@@ -1005,6 +1063,11 @@ class TargetTestJob(object):
_log("could not connect to tor, sleep 5s", "ERROR")
time.sleep(5)
elif et == rocksock.RS_ET_GAI:
# DNS failure -- target hostname unresolvable (hard failure)
if self.checktype == 'head':
head_target_stats.record_block(connect_host)
elif self.checktype == 'irc':
irc_target_stats.record_block(srvname)
_log("could not resolve connection target %s" % connect_host, "ERROR")
break
elif et == rocksock.RS_ET_SSL and err == rocksock.RS_E_SSL_CERTIFICATE_ERROR:
@@ -1229,16 +1292,17 @@ class VerificationThread(threading.Thread):
dbs.update_worker_trust(db, worker_a, was_correct)
# Update proxy status with authoritative result
now_int = int(time.time())
if result:
db.execute('''
UPDATE proxylist SET failed = 0, tested = ?
UPDATE proxylist SET failed = 0, tested = ?, last_seen = ?
WHERE proxy = ?
''', (int(time.time()), proxy))
''', (now_int, now_int, proxy))
else:
db.execute('''
UPDATE proxylist SET failed = failed + 1, tested = ?
WHERE proxy = ?
''', (int(time.time()), proxy))
''', (now_int, proxy))
# Remove from verification queue
dbs.remove_from_verification_queue(db, proxy)
@@ -1477,7 +1541,7 @@ class Proxywatchd():
_dbg('prepare_jobs: %d rows, checktypes=%s' % (len(rows), config.watchd.checktypes))
checktypes = config.watchd.checktypes
# Build target pools for each checktype
# Build target pools for each checktype (filter out targets in cooldown)
target_pools = {}
for ct in checktypes:
if ct == 'none':
@@ -1485,19 +1549,23 @@ class Proxywatchd():
target_pools[ct] = ssl_targets
_dbg('target_pool[none]: SSL-only mode, %d ssl targets' % len(ssl_targets))
elif ct == 'irc':
target_pools[ct] = config.servers
_dbg('target_pool[irc]: %d servers' % len(config.servers))
all_servers = config.servers
available = irc_target_stats.get_available(all_servers)
target_pools[ct] = available if available else all_servers
_dbg('target_pool[irc]: %d/%d servers available' % (len(target_pools[ct]), len(all_servers)))
elif ct == 'judges':
# Filter out judges in cooldown (blocked/rate-limited)
all_judges = list(judges.keys())
available = judge_stats.get_available_judges(all_judges)
available = judge_stats.get_available(all_judges)
target_pools[ct] = available if available else all_judges
elif ct == 'ssl':
target_pools[ct] = ssl_targets
_dbg('target_pool[ssl]: %d targets' % len(ssl_targets))
available = ssl_target_stats.get_available(ssl_targets)
target_pools[ct] = available if available else ssl_targets
_dbg('target_pool[ssl]: %d/%d targets available' % (len(target_pools[ct]), len(ssl_targets)))
else: # head
target_pools[ct] = list(regexes.keys())
_dbg('target_pool[%s]: %d targets' % (ct, len(regexes)))
all_targets = list(regexes.keys())
available = head_target_stats.get_available(all_targets)
target_pools[ct] = available if available else all_targets
_dbg('target_pool[%s]: %d/%d targets available' % (ct, len(target_pools[ct]), len(all_targets)))
# create all jobs first, then shuffle for interleaving
all_jobs = []
@@ -1617,7 +1685,7 @@ class Proxywatchd():
args.append((effective_failcount, job.checktime, 1, job.country, job.proto,
job.success_count, job.total_duration, job.mitm,
job.consecutive_success, job.asn, job.protos_working,
job.last_check, job.last_target, job.proxy))
job.last_check, job.last_target, effective_failcount, job.proxy))
success_rate = (float(sc) / len(self.collected)) * 100
ret = True
@@ -1632,7 +1700,7 @@ class Proxywatchd():
args.append((job.failcount, job.checktime, 1, job.country, job.proto,
job.success_count, job.total_duration, job.mitm,
job.consecutive_success, job.asn, job.protos_working,
job.last_check, job.last_target, job.proxy))
job.last_check, job.last_target, job.failcount, job.proxy))
if job.last_latency_ms is not None:
latency_updates.append((job.proxy, job.last_latency_ms))
ret = False
@@ -1648,9 +1716,20 @@ class Proxywatchd():
for job in self.collected
if job.failcount == 0 and job.exit_ip]
# Separate dead proxies for deletion
dead_proxies = [a[-1] for a in args if a[0] == DEAD_PROXY or a[0] >= max_fail]
live_args = [a for a in args if a[0] != DEAD_PROXY and a[0] < max_fail]
with self._db_context() as db:
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=?,last_check=?,last_target=? WHERE proxy=?'
db.executemany(query, args)
query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=?,asn=?,protos_working=?,last_check=?,last_target=?,last_seen=CASE WHEN ?=0 THEN strftime("%s","now") ELSE last_seen END WHERE proxy=?'
if live_args:
db.executemany(query, live_args)
# Delete proxies that reached max_fail
if dead_proxies:
db.executemany('DELETE FROM proxylist WHERE proxy=?',
[(p,) for p in dead_proxies])
_log('deleted %d dead proxies' % len(dead_proxies), 'watchd')
# Batch update latency metrics for successful proxies
if latency_updates:
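Note the extra failcount value appended to each args tuple in the hunks above: it binds the ? inside the CASE, so last_seen only advances when that value is 0 (a passing check). The SQL expression in Python terms:

def new_last_seen(bound_failcount, old_last_seen, now):
    # CASE WHEN ?=0 THEN strftime('%s','now') ELSE last_seen END
    return now if bound_failcount == 0 else old_last_seen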
@@ -1812,15 +1891,25 @@ class Proxywatchd():
# Judge stats (when using judges checktype)
if 'judges' in config.watchd.checktypes:
js = judge_stats.get_stats()
# Remap 'target' -> 'judge' for dashboard compatibility
top = [dict(j, judge=j['target']) for j in js.get('top', [])[:5]]
stats_data['judges'] = {
'total': js.get('total', 0),
'available': js.get('available', 0),
'in_cooldown': js.get('in_cooldown', 0),
'top_judges': js.get('top', [])[:5] # top 5 most successful
'top_judges': top,
}
else:
stats_data['judges'] = None
# Target health stats (all target pools)
stats_data['target_health'] = {
'head': head_target_stats.get_stats(),
'ssl': ssl_target_stats.get_stats(),
'irc': irc_target_stats.get_stats(),
'judges': judge_stats.get_stats(),
}
# Scraper/engine stats
if scraper_available:
scraper_stats = scraper_module.get_scraper_stats()


@@ -242,6 +242,7 @@ class Rocksock():
target = RocksockProxy(host, port, RS_PT_NONE)
self.proxychain.append(target)
self.sock = None
self._connected = False
self.timeout = timeout
def _translate_socket_error(self, e, pnum):
@@ -302,15 +303,18 @@ class Rocksock():
select.select([], [self.sock], [])
"""
self._connected = True
def disconnect(self):
if self.sock is None: return
try:
self.sock.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
if self._connected:
try:
self.sock.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
self.sock.close()
self.sock = None
self._connected = False
def canread(self):
return select.select([self.sock], [], [], 0)[0]


@@ -14,60 +14,64 @@ def try_div(a, b):
return 0
class JudgeStats():
"""Track per-judge success/failure rates for reliability scoring.
class TargetStats():
"""Track per-target success/failure rates with cooldown.
Judges that frequently block or rate-limit are temporarily avoided.
Stats decay over time to allow recovery.
Targets that frequently block or fail are temporarily avoided.
Block counters reset on success or cooldown expiry.
Used for all target pools: judges, head targets, SSL targets, IRC servers.
"""
def __init__(self, cooldown_seconds=300, block_threshold=3):
self.lock = threading.Lock()
self.stats = {} # judge -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp}
self.cooldown_seconds = cooldown_seconds # seconds to avoid blocked judges
self.block_threshold = block_threshold # consecutive blocks before cooldown
self.stats = {} # target -> {'success': n, 'fail': n, 'block': n, 'last_block': timestamp}
self.cooldown_seconds = cooldown_seconds
self.block_threshold = block_threshold
def record_success(self, judge):
"""Record successful judge response."""
with self.lock:
if judge not in self.stats:
self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}
self.stats[judge]['success'] += 1
# Reset block count on success
self.stats[judge]['block'] = 0
def _ensure(self, target):
if target not in self.stats:
self.stats[target] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}
def record_failure(self, judge):
"""Record judge failure (proxy failed, not judge block)."""
def record_success(self, target):
"""Record successful target response."""
with self.lock:
if judge not in self.stats:
self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}
self.stats[judge]['fail'] += 1
self._ensure(target)
self.stats[target]['success'] += 1
self.stats[target]['block'] = 0
def record_block(self, judge):
"""Record judge blocking the proxy (403, captcha, rate-limit)."""
def record_failure(self, target):
"""Record target failure (soft -- doesn't trigger cooldown)."""
with self.lock:
if judge not in self.stats:
self.stats[judge] = {'success': 0, 'fail': 0, 'block': 0, 'last_block': 0}
self.stats[judge]['block'] += 1
self.stats[judge]['last_block'] = time.time()
self._ensure(target)
self.stats[target]['fail'] += 1
def is_available(self, judge):
"""Check if judge is available (not in cooldown)."""
def record_block(self, target):
"""Record target block (403, captcha, DNS failure, rate-limit)."""
with self.lock:
if judge not in self.stats:
self._ensure(target)
self.stats[target]['block'] += 1
self.stats[target]['last_block'] = time.time()
def is_available(self, target):
"""Check if target is available (not in cooldown)."""
with self.lock:
if target not in self.stats:
return True
s = self.stats[judge]
# Check if in cooldown period
s = self.stats[target]
if s['block'] >= self.block_threshold:
if (time.time() - s['last_block']) < self.cooldown_seconds:
return False
# Cooldown expired, reset block count
s['block'] = 0
return True
def get_available(self, target_list):
"""Return targets not in cooldown."""
return [t for t in target_list if self.is_available(t)]
def get_available_judges(self, judge_list):
"""Return list of judges not in cooldown."""
return [j for j in judge_list if self.is_available(j)]
"""Compat alias for get_available()."""
return self.get_available(judge_list)
def status_line(self):
"""Return status summary for logging."""
@@ -76,7 +80,7 @@ class JudgeStats():
blocked = sum(1 for s in self.stats.values()
if s['block'] >= self.block_threshold and
(time.time() - s['last_block']) < self.cooldown_seconds)
return 'judges: %d total, %d in cooldown' % (total, blocked)
return '%d total, %d in cooldown' % (total, blocked)
def get_stats(self):
"""Return statistics dict for API/dashboard."""
@@ -87,18 +91,21 @@ class JudgeStats():
if s['block'] >= self.block_threshold and
(now - s['last_block']) < self.cooldown_seconds)
available = total - in_cooldown
# Get top judges by success count
top = []
for judge, s in self.stats.items():
for target, s in self.stats.items():
total_tests = s['success'] + s['fail']
if total_tests > 0:
success_pct = (s['success'] * 100.0) / total_tests
top.append({'judge': judge, 'success': s['success'],
top.append({'target': target, 'success': s['success'],
'tests': total_tests, 'rate': round(success_pct, 1)})
top.sort(key=lambda x: x['success'], reverse=True)
return {'total': total, 'available': available, 'in_cooldown': in_cooldown, 'top': top}
# Backwards-compatible alias
JudgeStats = TargetStats
# HTTP targets - check for specific headers
regexes = {
'www.facebook.com': 'X-FB-Debug',
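A minimal usage sketch of the cooldown behavior defined by TargetStats above (hostnames are placeholders):

ts = TargetStats(cooldown_seconds=300, block_threshold=3)
for _ in range(3):
    ts.record_block('judge.example.com')       # third block starts the cooldown
assert not ts.is_available('judge.example.com')
ts.record_success('ifconfig.example.net')      # success resets that target's block count
print(ts.get_available(['judge.example.com', 'ifconfig.example.net']))
# -> ['ifconfig.example.net'] until the 300-second cooldown expires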


@@ -359,6 +359,198 @@ class TestExtractAuthProxies:
assert fetch.extract_auth_proxies('just some text') == []
class TestExtractAuthProxiesShortCircuit:
"""Tests for extract_auth_proxies() short-circuit on missing @."""
def test_no_at_sign_returns_empty(self):
"""Content without @ skips regex entirely."""
content = '1.2.3.4:8080 socks5://5.6.7.8:1080 plain text'
assert fetch.extract_auth_proxies(content) == []
def test_at_sign_still_extracts(self):
"""Content with @ still finds auth proxies."""
content = 'user:pass@1.2.3.4:8080'
result = fetch.extract_auth_proxies(content)
assert len(result) == 1
assert result[0][0] == 'user:pass@1.2.3.4:8080'
def test_at_sign_no_match_returns_empty(self):
"""Content with @ but no auth proxy pattern returns empty."""
content = 'email@example.com has no proxy'
assert fetch.extract_auth_proxies(content) == []
class TestExtractProxiesFromTable:
"""Tests for extract_proxies_from_table() with precompiled regexes."""
def test_no_table_returns_empty(self):
"""Plain text without <table> returns empty."""
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
assert fetch.extract_proxies_from_table(content) == []
def test_simple_table(self):
"""Basic HTML table with IP/Port columns is parsed."""
content = '''
<table>
<tr><th>IP</th><th>Port</th><th>Type</th></tr>
<tr><td>1.2.3.4</td><td>8080</td><td>HTTP</td></tr>
<tr><td>5.6.7.8</td><td>1080</td><td>SOCKS5</td></tr>
</table>
'''
result = fetch.extract_proxies_from_table(content)
assert len(result) == 2
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
assert '5.6.7.8:1080' in addrs
def test_uppercase_table_tag(self):
"""<TABLE> (uppercase) is also detected."""
content = '''
<TABLE>
<TR><TH>IP</TH><TH>Port</TH></TR>
<TR><TD>1.2.3.4</TD><TD>8080</TD></TR>
</TABLE>
'''
result = fetch.extract_proxies_from_table(content)
assert len(result) == 1
def test_empty_table(self):
"""Table with headers but no data rows returns empty."""
content = '''
<table>
<tr><th>IP</th><th>Port</th></tr>
</table>
'''
result = fetch.extract_proxies_from_table(content)
assert result == []
class TestExtractProxiesFromJson:
"""Tests for extract_proxies_from_json() short-circuit."""
def test_no_braces_returns_empty(self):
"""Content without { or [ skips JSON parsing."""
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
assert fetch.extract_proxies_from_json(content) == []
def test_json_array_of_objects(self):
"""JSON array with ip/port objects is parsed."""
content = '[{"ip": "1.2.3.4", "port": 8080}]'
result = fetch.extract_proxies_from_json(content)
assert len(result) >= 1
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
def test_json_array_of_strings(self):
"""JSON array of ip:port strings is parsed."""
content = '["1.2.3.4:8080", "5.6.7.8:3128"]'
result = fetch.extract_proxies_from_json(content)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
assert '5.6.7.8:3128' in addrs
def test_plain_html_skips_json(self):
"""HTML without JSON delimiters returns empty."""
content = '<html><body>1.2.3.4:8080</body></html>'
# HTML has < and > but this function checks for { and [
# The < > chars won't trigger JSON parsing
result = fetch.extract_proxies_from_json(content)
# May or may not find anything depending on HTML structure
# but should not crash
assert isinstance(result, list)
class TestExtractProxiesWithHints:
"""Tests for extract_proxies_with_hints()."""
def test_proto_before_ip(self):
"""Protocol keyword before IP:PORT is detected."""
content = 'socks5 1.2.3.4:8080'
result = fetch.extract_proxies_with_hints(content)
assert '1.2.3.4:8080' in result
assert result['1.2.3.4:8080'] == 'socks5'
def test_proto_after_ip(self):
"""Protocol keyword after IP:PORT is detected."""
content = '1.2.3.4:8080 socks5'
result = fetch.extract_proxies_with_hints(content)
assert '1.2.3.4:8080' in result
def test_no_hints_returns_empty(self):
"""Plain IP:PORT without protocol hints returns empty."""
content = '1.2.3.4:8080'
result = fetch.extract_proxies_with_hints(content)
assert result == {}
class TestExtractProxiesIntegration:
"""Integration tests for extract_proxies() combining all extractors."""
def test_plain_text_proxy_list(self):
"""Plain text IP:PORT list extracts correctly."""
content = '1.2.3.4:8080\n5.6.7.8:3128\n9.10.11.12:1080\n'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
assert '5.6.7.8:3128' in addrs
assert '9.10.11.12:1080' in addrs
def test_auth_proxies_extracted(self):
"""Auth proxies found in mixed content."""
content = 'user:pass@1.2.3.4:8080\n5.6.7.8:3128\n'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert 'user:pass@1.2.3.4:8080' in addrs
assert '5.6.7.8:3128' in addrs
def test_html_table_extraction(self):
"""Proxies extracted from HTML table."""
content = '''
<table>
<tr><th>IP</th><th>Port</th></tr>
<tr><td>1.2.3.4</td><td>8080</td></tr>
</table>
'''
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
def test_json_extraction(self):
"""Proxies extracted from JSON content."""
content = '[{"ip": "1.2.3.4", "port": 8080}]'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
def test_empty_content(self):
"""Empty content returns no proxies."""
result = fetch.extract_proxies('', filter_known=False)
assert result == []
def test_private_ips_filtered(self):
"""Private IPs are not returned."""
content = '10.0.0.1:8080\n192.168.1.1:3128\n1.2.3.4:8080\n'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '10.0.0.1:8080' not in addrs
assert '192.168.1.1:3128' not in addrs
assert '1.2.3.4:8080' in addrs
def test_proto_from_hints(self):
"""Protocol hints are picked up."""
content = 'socks5 1.2.3.4:8080\n'
result = fetch.extract_proxies(content, filter_known=False)
protos = {r[0]: r[1] for r in result}
assert protos.get('1.2.3.4:8080') == 'socks5'
def test_proto_from_arg(self):
"""Fallback proto from argument is used."""
content = '1.2.3.4:8080\n'
result = fetch.extract_proxies(content, filter_known=False, proto='socks4')
protos = {r[0]: r[1] for r in result}
assert protos.get('1.2.3.4:8080') == 'socks4'
class TestConfidenceScoring:
"""Tests for confidence score constants."""


@@ -50,7 +50,7 @@ run_sql() {
local db="$1" sql="$2"
ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman sqlite3 '$db' \"$sql\"" 2>/dev/null \
| grep -vE '^\s*$|^odin|Shared connection|CHANGED|SUCCESS'
| sed 's/Shared connection.*//; /^\s*$/d; /^odin/d; /CHANGED/d; /SUCCESS/d'
}
# ---------------------------------------------------------------------------

tools/ppf-status Executable file

@@ -0,0 +1,246 @@
#!/bin/bash
# ppf-status -- PPF cluster overview
#
# Usage:
# ppf-status [options]
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
ODIN_URL="http://127.0.0.1:8081"
PROXY_DB="/home/podman/ppf/data/proxies.sqlite"
URL_DB="/home/podman/ppf/data/websites.sqlite"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-status [options]
Show PPF cluster overview.
Options:
--json raw JSON from API
--help show this help
--version show version
Displays:
- Container health per node
- Worker stats (tested, working, rate, active)
- Odin manager stats (verification, queue)
- Database counts (proxies, URLs)
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
RAW_JSON=0
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-status $PPF_TOOLS_VERSION"; exit 0 ;;
--json) RAW_JSON=1 ;;
-*) die "Unknown option: $1" ;;
*) die "Unknown argument: $1" ;;
esac
shift
done
# ---------------------------------------------------------------------------
# Fetch API data from odin (run on odin via curl to localhost)
# ---------------------------------------------------------------------------
api_json=$(ansible_cmd "$MASTER" -m raw -a \
"curl -sf --max-time 5 ${ODIN_URL}/api/workers 2>/dev/null || echo '{}'" \
2>/dev/null | sed 's/Shared connection.*closed\.\?//; /^\s*$/d; /^odin/d; /CHANGED/d; /SUCCESS/d')
if [ "$RAW_JSON" -eq 1 ]; then
echo "$api_json"
exit 0
fi
# Check if we got valid data
if ! echo "$api_json" | python3 -c "import sys,json; json.load(sys.stdin)" 2>/dev/null; then
die "Failed to fetch API data from odin"
fi
# ---------------------------------------------------------------------------
# Container health
# ---------------------------------------------------------------------------
section "Containers"
for host in $ALL_HOSTS; do
output=$(compose_cmd "$host" "ps" 2>/dev/null) || true
if echo "$output" | grep -qi "up\|running"; then
log_ok "$host"
elif echo "$output" | grep -qi "exit"; then
log_err "$host (exited)"
else
log_warn "$host (unknown)"
fi
done
# ---------------------------------------------------------------------------
# Database summary (quick counts from odin)
# ---------------------------------------------------------------------------
section "Database"
proxy_count=$(ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman sqlite3 '$PROXY_DB' 'SELECT COUNT(*) FROM proxylist;'" 2>/dev/null \
| sed 's/Shared connection.*//; /^\s*$/d; /^odin/d; /CHANGED/d; /SUCCESS/d' || echo '?')
working_count=$(ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman sqlite3 '$PROXY_DB' 'SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL;'" 2>/dev/null \
| sed 's/Shared connection.*//; /^\s*$/d; /^odin/d; /CHANGED/d; /SUCCESS/d' || echo '?')
url_count=$(ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman sqlite3 '$URL_DB' 'SELECT COUNT(*) FROM uris;'" 2>/dev/null \
| sed 's/Shared connection.*//; /^\s*$/d; /^odin/d; /CHANGED/d; /SUCCESS/d' || echo '?')
log_info "Proxies: ${proxy_count} total, ${working_count} working"
log_info "URLs: ${url_count}"
# ---------------------------------------------------------------------------
# Parse and display via Python for clean formatting
# ---------------------------------------------------------------------------
echo "$api_json" | python3 -c "
import sys, json
NO_COLOR = __import__('os').environ.get('NO_COLOR', '')
# Colors
if not NO_COLOR and sys.stdout.isatty():
RST = '\033[0m'
DIM = '\033[2m'
BOLD = '\033[1m'
RED = '\033[38;5;167m'
GREEN = '\033[38;5;114m'
YELLOW = '\033[38;5;180m'
BLUE = '\033[38;5;110m'
CYAN = '\033[38;5;116m'
else:
RST = DIM = BOLD = RED = GREEN = YELLOW = BLUE = CYAN = ''
def ok(s): return GREEN + s + RST
def err(s): return RED + s + RST
def warn(s): return YELLOW + s + RST
def dim(s): return DIM + s + RST
def bold(s): return BOLD + CYAN + s + RST
try:
data = json.load(sys.stdin)
except:
sys.exit(0)
workers = data.get('workers', [])
summary = data.get('summary', {})
queue = data.get('queue', {})
manager = data.get('manager', {})
# Workers table
print()
print(bold(' Workers'))
if workers:
# Header
print(dim(' %-12s %7s %9s %9s %7s %6s %s' % (
'NAME', 'TESTED', 'WORKING', 'FAILED', 'RATE', 'ACT', 'STATUS')))
for w in sorted(workers, key=lambda x: x.get('name', '')):
name = w.get('name', w.get('ip', '?'))
tested = w.get('proxies_tested', 0)
working = w.get('proxies_working', 0)
failed = w.get('proxies_failed', 0)
rate = w.get('success_rate', 0)
active = w.get('active', False)
threads = w.get('threads', 0)
# Format numbers compactly
def fmt(n):
if n >= 1000000: return '%.1fM' % (n / 1000000)
if n >= 1000: return '%.1fk' % (n / 1000)
return str(n)
act_str = ok('yes') if active else err('no')
if rate >= 30:
rate_str = ok('%.1f%%' % rate)
elif rate >= 10:
rate_str = warn('%.1f%%' % rate)
else:
rate_str = err('%.1f%%' % rate)
age = w.get('age', 0)
if age > 300 and not active:
status = err('stale (%dm)' % (age // 60))
elif active:
status = ok('testing')
else:
status = dim('idle')
print(' %-12s %7s %9s %9s %7s %6s %s' % (
name, fmt(tested), fmt(working), fmt(failed),
rate_str, act_str, status))
# Summary line
total_t = summary.get('total_tested', 0)
total_w = summary.get('total_working', 0)
total_f = summary.get('total_failed', 0)
overall = summary.get('overall_success_rate', 0)
active_count = data.get('active', 0)
total_count = data.get('total', 0)
print(dim(' %-12s %7s %9s %9s %7s %6s' % (
'TOTAL',
fmt(total_t) if total_t else '-',
fmt(total_w) if total_w else '-',
fmt(total_f) if total_f else '-',
'%.1f%%' % overall,
'%d/%d' % (active_count, total_count))))
else:
print(err(' no workers connected'))
# Manager (odin verification)
if manager:
print()
print(bold(' Odin Verification'))
m_rate = manager.get('success_rate', 0)
m_tested = manager.get('tested', 0)
m_passed = manager.get('passed', 0)
m_threads = manager.get('threads', 0)
m_speed = manager.get('rate', 0)
m_queue = manager.get('queue_size', 0)
m_uptime = manager.get('uptime', 0)
def fmt_time(s):
if s >= 3600: return '%dh%dm' % (s // 3600, (s % 3600) // 60)
if s >= 60: return '%dm%ds' % (s // 60, s % 60)
return '%ds' % s
if m_rate >= 30:
rate_str = ok('%.1f%%' % m_rate)
elif m_rate >= 10:
rate_str = warn('%.1f%%' % m_rate)
else:
rate_str = err('%.1f%%' % m_rate)
print(' threads: %d rate: %.2f/s uptime: %s' % (m_threads, m_speed, fmt_time(m_uptime)))
print(' tested: %s passed: %s success: %s' % (fmt(m_tested), fmt(m_passed), rate_str))
print(' queue: %d jobs' % m_queue)
# Queue
if queue:
print()
print(bold(' Proxy Queue'))
print(' total: %d due: %d pending: %d claimed: %d' % (
queue.get('total', 0), queue.get('due', 0),
queue.get('pending', 0), queue.get('claimed', 0)))
sess_tested = queue.get('session_tested', 0)
sess_pct = queue.get('session_pct', 0)
if sess_tested:
print(' session: %s tested (%.1f%%)' % (fmt(sess_tested), sess_pct))
print()
"