Compare commits


31 Commits

Author SHA1 Message Date
Username
c091216afc docs: add ppf-db to README operations toolkit
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 16s
2026-02-18 00:29:28 +01:00
Username
4cefdf976c docs: update CLAUDE.md for ppf-db and corrected odin role 2026-02-18 00:29:25 +01:00
Username
98c2e74412 ppf: skip URL cycling when ppf.threads = 0 2026-02-18 00:28:37 +01:00
Username
24d6f345f6 tools: add ppf-db for database operations 2026-02-18 00:28:27 +01:00
Username
1ca096c78a ppf-service: use down+up for restart to pick up code changes 2026-02-18 00:22:55 +01:00
Username
15a7f0bb6a ppf-common: fix compose_cmd to run as podman user 2026-02-18 00:22:52 +01:00
Username
b6045bd05c tools: use down+up in deploy handler to pick up code changes 2026-02-18 00:22:48 +01:00
Username
d7b004f0ac httpd: include protocol in /proxies plain text format 2026-02-18 00:18:58 +01:00
Username
00952b7947 fix: call evaluate() in worker mode before checking results
failcount was initialized to 0 and never updated because
evaluate() was skipped, causing all proxies to pass.
2026-02-18 00:16:35 +01:00
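A minimal sketch of the failure mode this commit describes. Only `failcount` and `evaluate()` come from the commit message; the class `ProxyJob`, its `results` list, and the `max_fail` threshold are hypothetical names used for illustration. If the pass/fail check reads `failcount` before `evaluate()` has run, the counter still holds its initial 0 and every proxy appears to pass.

```python
class ProxyJob(object):
    """Hypothetical stand-in for a proxy test job; not the real PPF class."""

    def __init__(self, results):
        self.results = results  # list of booleans, one per test attempt
        self.failcount = 0      # starts at 0; only evaluate() updates it

    def evaluate(self):
        """Fold the raw test results into failcount."""
        self.failcount = sum(1 for ok in self.results if not ok)

    def passed(self, max_fail=0):
        return self.failcount <= max_fail


def check_buggy(job):
    # Bug: evaluate() is skipped, so failcount is still 0.
    return job.passed()


def check_fixed(job):
    # Fix: evaluate first, then read the result.
    job.evaluate()
    return job.passed()
```

With a job whose attempts all failed, `check_buggy` still reports a pass, while `check_fixed` reports a failure.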
Username
6800995361 docs: reflect podman-compose on all nodes
Remove stale systemd unit and standalone podman run references.
All nodes now managed exclusively via compose.
2026-02-17 23:44:35 +01:00
Username
7a271896a8 ppf-common: fix ad-hoc ansible for toolkit inventory
Add --become to ansible_cmd (needed when connecting as
ansible user). Add cd /tmp to podman_cmd so sudo -u podman
doesn't fail on inaccessible /home/ansible cwd.
2026-02-17 23:38:13 +01:00
Username
8779979780 tools: use compose up -d for ppf-service restart 2026-02-17 23:23:10 +01:00
Username
195d25c653 tools: use compose up -d instead of restart in handler
compose restart reuses the existing container config; up -d
recreates from compose.yml, picking up changes like renamed
CLI flags.
2026-02-17 23:22:33 +01:00
Username
9b8be9d302 tools: use toolkit inventory for all ansible commands
Route ansible_cmd through ppf inventory instead of /opt/ansible
default. Eliminates dynamic inventory warnings and connects
via WireGuard IPs.
2026-02-17 23:22:29 +01:00
Username
9eff4496d6 docs: update README and ROADMAP for playbook deployment 2026-02-17 23:19:59 +01:00
Username
b1de91a969 docs: update CLAUDE.md for playbook-based deployment
Document WireGuard connectivity, playbook architecture, --check
flag, parallel execution, and updated ad-hoc ansible commands
using toolkit inventory.
2026-02-17 23:19:54 +01:00
Username
df2078c7f7 tools: fix symlink resolution in ppf-logs and ppf-service 2026-02-17 23:18:50 +01:00
Username
782deab95d tools: rewrite ppf-deploy as playbook wrapper
Replace sequential ansible ad-hoc calls with ansible-playbook.
Add ansible_playbook_cmd to shared library. Supports --check
for dry runs.
2026-02-17 23:18:46 +01:00
Username
8208670fc1 tools: add ansible deploy playbook
Parallel execution across hosts, handler-based restart on change,
role-aware paths via group_vars. Connects over WireGuard with
dedicated inventory and SSH key.
2026-02-17 23:18:41 +01:00
Username
d902ecafff docs: add tools to ROADMAP.md file reference 2026-02-17 22:53:01 +01:00
Username
fdb761f9f1 docs: add operations toolkit to README.md 2026-02-17 22:52:58 +01:00
Username
12f6b1d8eb docs: update CLAUDE.md for operations toolkit
Replace verbose ansible deployment commands with ppf-deploy,
ppf-logs, and ppf-service references. Keep raw ansible only
for ad-hoc config operations not covered by tools.
2026-02-17 22:52:54 +01:00
Username
1f14173595 tools: add ppf-service
Status, start, stop, restart for PPF containers. Status includes
compose ps, master health check, and worker API query.
2026-02-17 22:50:42 +01:00
Username
2128814a41 tools: add ppf-logs
View container logs with -f follow and -n line count. Resolves
dynamic UID and container name per node role.
2026-02-17 22:50:38 +01:00
Username
7f59cae05c tools: add ppf-deploy
Validates syntax, rsyncs code, copies compose files, fixes ownership,
restarts containers. Supports --no-restart and per-host targeting.
2026-02-17 22:50:34 +01:00
Username
9b7ca20728 tools: add shared library ppf-common.sh
Host topology, ansible/podman/compose wrappers, color helpers,
syntax validation, and target resolution for the PPF ops toolkit.
2026-02-17 22:50:30 +01:00
Username
82c909d7c0 rename --worker-v2 to --worker
No V1 means no need for the suffix. Update flag, function name,
compose command, log messages, and docs.
2026-02-17 22:30:09 +01:00
Username
cb52a978e9 todo: mark V1 worker deprecation complete 2026-02-17 22:13:49 +01:00
Username
224d3642f9 config: remove V1 worker options
Drop --worker flag, batch_size, and claim_timeout. V2 is the
only worker protocol; --worker-v2 and --register remain.
2026-02-17 22:13:31 +01:00
Username
d184dc2926 httpd: remove V1 work distribution and result submission
Drop _work_claims tracking, claim_work(), submit_results(),
get_due_proxy_count(), calculate_fair_batch_size(), and the
/api/work + /api/results endpoint handlers.
2026-02-17 22:12:57 +01:00
Username
2782e6d754 ppf: remove V1 worker functions and main loop
Drop worker_get_work(), worker_submit_results(), and the entire
worker_main() V1 loop. Rewire --register to use worker_v2_main().
2026-02-17 22:10:38 +01:00
18 changed files with 953 additions and 842 deletions

217
CLAUDE.md

@@ -6,7 +6,7 @@
 ┌──────────┬─────────────┬────────────────────────────────────────────────────────┐
 │ Host │ Role │ Notes
 ├──────────┼─────────────┼────────────────────────────────────────────────────────┤
-│ odin │ Master │ Scrapes proxy lists, verifies conflicts, port 8081
+│ odin │ Master │ API server, serves URLs to workers, port 8081
 │ cassius │ Worker │ Tests proxies, reports to master via WireGuard
 │ edge │ Worker │ Tests proxies, reports to master via WireGuard
 │ sentinel │ Worker │ Tests proxies, reports to master via WireGuard
@@ -15,8 +15,8 @@
 ### Role Separation
-- **Odin (Master)**: Scrapes proxy sources, does verification tests only. No routine testing. Local Tor only.
-- **Workers**: All routine proxy testing. Each uses only local Tor (127.0.0.1:9050).
+- **Odin (Master)**: API server only. No proxy testing, no URL cycling. Serves `/api/claim-urls` to workers, receives results. Local Tor only.
+- **Workers**: All URL fetching (via `/api/claim-urls`) and proxy testing. Each uses only local Tor (127.0.0.1:9050).
 ## CRITICAL: Directory Structure Differences
@@ -25,95 +25,103 @@
 │ Host │ Code Location │ Container Mount
 ├──────────┼─────────────────────────┼──────────────────────────────────────────┤
 │ odin │ /home/podman/ppf/*.py │ Mounts ppf/ directly to /app
-│ workers │ /home/podman/ppf/src/ │ Mounts ppf/src/ to /app (via systemd)
+│ workers │ /home/podman/ppf/src/ │ Mounts ppf/src/ to /app (via compose)
 └──────────┴─────────────────────────┴──────────────────────────────────────────┘
 ```
 **ODIN uses root ppf/ directory. WORKERS use ppf/src/ subdirectory.**
-## Host Access
-**ALWAYS use Ansible from `/opt/ansible` with venv activated:**
+## Operations Toolkit
+All deployment and service management is handled by `tools/`:
+```
+tools/
+  lib/ppf-common.sh    shared library (hosts, wrappers, colors)
+  ppf-deploy           deploy wrapper (local validation + playbook)
+  ppf-logs             view container logs
+  ppf-service          manage containers (status/start/stop/restart)
+  ppf-db               database operations (stats/purge-proxies/vacuum)
+  playbooks/
+    deploy.yml         ansible playbook (sync, compose, restart)
+    inventory.ini      hosts with WireGuard IPs + SSH key
+    group_vars/
+      all.yml          shared vars (ppf_base, ppf_owner)
+      master.yml       odin paths + compose file
+      workers.yml      worker paths + compose file
+```
+Symlinked to `~/.local/bin/` for direct use.
+### Connectivity
+All tools connect over WireGuard (`10.200.1.0/24`) as user `ansible`
+with the SSH key at `/opt/ansible/secrets/ssh/ansible`.
+### Deployment
+`ppf-deploy` validates syntax locally, then runs the Ansible playbook.
+Hosts execute in parallel; containers restart only when files change.
+```bash
+ppf-deploy # all nodes: validate, sync, restart
+ppf-deploy odin # master only
+ppf-deploy workers # cassius, edge, sentinel
+ppf-deploy cassius edge # specific hosts
+ppf-deploy --no-restart # sync only, skip restart
+ppf-deploy --check # dry run (ansible --check --diff)
+ppf-deploy -v # verbose ansible output
+```
+Playbook steps (per host, in parallel):
+1. Rsync `*.py` + `servers.txt` (role-aware destination via group_vars)
+2. Copy compose file per role (`compose.master.yml` / `compose.worker.yml`)
+3. Fix ownership (`podman:podman`, recursive)
+4. Restart containers via handler (only if files changed)
+5. Show container status
+### Container Logs
+```bash
+ppf-logs # last 40 lines from odin
+ppf-logs cassius # specific worker
+ppf-logs -f edge # follow mode
+ppf-logs -n 100 sentinel # last N lines
+```
+### Service Management
+```bash
+ppf-service status # all nodes: compose ps + health
+ppf-service status workers # workers only
+ppf-service restart odin # restart master
+ppf-service stop cassius # stop specific worker
+ppf-service start workers # start all workers
+```
+### Database Management
+```bash
+ppf-db stats # proxy and URL counts
+ppf-db purge-proxies # stop odin, delete all proxies, restart
+ppf-db vacuum # reclaim disk space
+```
+### Direct Ansible (for operations not covered by tools)
+Use the toolkit inventory for ad-hoc commands over WireGuard:
 ```bash
 cd /opt/ansible && source venv/bin/activate
-```
-### Quick Reference Commands
-```bash
-# Check worker status
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "hostname"
+INV=/home/user/git/ppf/tools/playbooks/inventory.ini
 # Check worker config
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini"
-# Check worker logs (dynamic UID)
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius -m raw \
-  -a "uid=\$(id -u podman) && sudo -u podman podman logs --tail 20 ppf-worker"
+ansible -i $INV workers -m shell \
+  -a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini"
 # Modify config option
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m lineinfile -a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'"
-# Restart workers via compose
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m raw \
-  -a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
-```
-## Full Deployment Procedure
-All hosts use `podman-compose` with `compose.yml` for container management.
-Rsync deploys code; compose handles restart.
-### Step 1: Validate Syntax Locally
-```bash
-cd /home/user/git/ppf
-for f in *.py; do python3 -m py_compile "$f" && echo "OK: $f"; done
-```
-### Step 2: Deploy to ALL Hosts
-```bash
-cd /opt/ansible && source venv/bin/activate
-# Deploy to ODIN (root ppf/ directory + compose.master.yml as compose.yml)
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m synchronize \
-  -a "src=/home/user/git/ppf/ dest=/home/podman/ppf/ rsync_opts='--include=*.py,--include=servers.txt,--include=Dockerfile,--exclude=*'"
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m copy \
-  -a "src=/home/user/git/ppf/compose.master.yml dest=/home/podman/ppf/compose.yml owner=podman group=podman"
-# Deploy to WORKERS (ppf/src/ subdirectory + compose.worker.yml as compose.yml)
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m synchronize \
-  -a "src=/home/user/git/ppf/ dest=/home/podman/ppf/src/ rsync_opts='--include=*.py,--include=servers.txt,--include=Dockerfile,--exclude=*'"
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m copy \
-  -a "src=/home/user/git/ppf/compose.worker.yml dest=/home/podman/ppf/compose.yml owner=podman group=podman"
-# CRITICAL: Fix ownership on ALL hosts (rsync uses ansible user, containers need podman)
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \
-  -a "chown -R podman:podman /home/podman/ppf/"
-```
-**Note:** Ownership must be fixed after every deploy. rsync runs as ansible user, but containers run as podman user. Missing ownership fix causes `ImportError: No module named X` errors.
-### Step 3: Restart Services
-```bash
-# Restart ODIN via compose
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m raw \
-  -a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
-# Restart WORKERS via compose
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m raw \
-  -a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
-```
-### Step 4: Verify All Running
-```bash
-# Check all hosts via compose
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \
-  -a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose ps"
+ansible -i $INV workers -m lineinfile \
+  -a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'"
 ```
 ## Podman User IDs
@@ -140,11 +148,15 @@ ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \
 tor_hosts = 127.0.0.1:9050 # Local Tor ONLY
 [watchd]
-threads = 0 # NO routine testing
-database = data/ppf.sqlite
+threads = 0 # NO proxy testing
+database = data/proxies.sqlite
+[ppf]
+threads = 0 # NO URL cycling (workers handle it)
+database = data/websites.sqlite
 [scraper]
-threads = 10
+enabled = 0 # Disabled on master
 ```
 ### Worker config.ini
@@ -188,20 +200,15 @@ batch_size = clamp(fair_share, min=100, max=1000)
 - Workers shuffle their batch locally to avoid testing same proxies simultaneously
 - Claims expire after 5 minutes if not completed
-## Worker Container
-Workers run as podman containers with `--restart=unless-stopped`:
-```bash
-podman run -d --name ppf-worker --network=host --restart=unless-stopped \
-  -e PYTHONUNBUFFERED=1 \
-  -v /home/podman/ppf/src:/app:ro,Z \
-  -v /home/podman/ppf/data:/app/data:Z \
-  -v /home/podman/ppf/config.ini:/app/config.ini:ro,Z \
-  -v /home/podman/ppf/servers.txt:/app/servers.txt:ro,Z \
-  localhost/ppf-worker:latest \
-  python -u ppf.py --worker --server http://10.200.1.250:8081
-```
+## Container Management
+All nodes run via `podman-compose` with role-specific compose files:
+- **Odin**: `compose.master.yml` -> deployed as `compose.yml`
+- **Workers**: `compose.worker.yml` -> deployed as `compose.yml`
+Containers are managed exclusively through compose. No systemd user
+services or standalone `podman run` commands.
 ## Rebuilding Images
@@ -234,10 +241,9 @@ ansible odin -m raw \
 ### Missing servers.txt
-Workers need `servers.txt` in src/:
+Redeploy syncs `servers.txt` automatically:
 ```bash
-ansible cassius,edge,sentinel -m copy \
-  -a "src=/home/user/git/ppf/servers.txt dest=/home/podman/ppf/src/servers.txt owner=podman group=podman"
+ppf-deploy workers
 ```
 ### Exit Code 126 (Permission/Storage)
@@ -249,22 +255,17 @@ sudo -u podman podman system reset --force
 ### Dashboard Shows NaN or Missing Data
-Odin likely running old code. Redeploy to odin:
+Odin likely running old code:
 ```bash
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m synchronize \
-  -a "src=/home/user/git/ppf/ dest=/home/podman/ppf/ rsync_opts='--include=*.py,--include=servers.txt,--exclude=*'"
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m raw -a "chown -R podman:podman /home/podman/ppf/"
-ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m raw \
-  -a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
+ppf-deploy odin
 ```
 ### Worker Keeps Crashing
-1. Check container status: `sudo -u podman podman ps -a`
-2. Check logs: `sudo -u podman podman logs --tail 50 ppf-worker`
-3. Verify servers.txt exists in src/
-4. Check ownership: `ls -la /home/podman/ppf/src/`
-5. Run manually to see error:
+1. Check status: `ppf-service status workers`
+2. Check logs: `ppf-logs -n 50 cassius`
+3. Redeploy (fixes ownership + servers.txt): `ppf-deploy cassius`
+4. If still failing, run manually on the host to see error:
 ```bash
 sudo -u podman podman run --rm --network=host \
   -v /home/podman/ppf/src:/app:ro,Z \
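As a reading aid for the `ppf-deploy` target arguments documented in the CLAUDE.md changes above (no argument for all nodes, `odin`, `workers`, or individual hosts), here is a rough sketch of how such targets could expand. The host names and the `workers` alias come from the diff; the function name `resolve_targets` and the error handling are assumptions. The actual logic lives in the shell library `tools/lib/ppf-common.sh`, which this page does not show.

```python
MASTER = ["odin"]
WORKERS = ["cassius", "edge", "sentinel"]
ALL_HOSTS = MASTER + WORKERS


def resolve_targets(args):
    """Expand CLI targets into an ordered, de-duplicated host list."""
    if not args:
        return list(ALL_HOSTS)  # no targets means every node
    hosts = []
    for arg in args:
        if arg == "workers":
            expanded = WORKERS
        elif arg in ALL_HOSTS:
            expanded = [arg]
        else:
            raise ValueError("unknown target: %s" % arg)
        for host in expanded:
            if host not in hosts:  # keep first occurrence only
                hosts.append(host)
    return hosts
```

Under these assumptions, `resolve_targets(["cassius", "edge"])` would correspond to `ppf-deploy cassius edge`.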


@@ -197,46 +197,39 @@ stale_count INT -- checks without new proxies
 ## Deployment
-### Systemd Service
-```ini
-[Unit]
-Description=PPF Python Proxy Finder
-After=network-online.target tor.service
-Wants=network-online.target
-[Service]
-Type=simple
-User=ppf
-WorkingDirectory=/opt/ppf
-# ppf.py is the main entry point (runs harvester + validator)
-ExecStart=/usr/bin/python2 ppf.py
-Restart=on-failure
-RestartSec=30
-[Install]
-WantedBy=multi-user.target
-```
 ### Container Deployment
+All nodes use `podman-compose` with role-specific compose files
+(rootless, as `podman` user). `--network=host` required for Tor
+access at 127.0.0.1:9050.
 ```sh
-# Build
+# Build image
 podman build -t ppf:latest .
-# Run with persistent storage
-# IMPORTANT: Use ppf.py as entry point (runs both harvester + validator)
-podman run -d --name ppf \
-  --network=host \
-  -v ./data:/app/data:Z \
-  -v ./config.ini:/app/config.ini:ro \
-  ppf:latest python ppf.py
-# Generate systemd unit
-podman generate systemd --name ppf --files --new
+# Start via compose
+podman-compose up -d
+# View logs / stop
+podman-compose logs -f
+podman-compose down
 ```
-Note: `--network=host` required for Tor access at 127.0.0.1:9050.
+### Operations Toolkit
+The `tools/` directory provides CLI wrappers for multi-node operations.
+Deployment uses an Ansible playbook over WireGuard for parallel execution
+and handler-based restarts.
+```sh
+ppf-deploy [targets...] # validate + deploy + restart (playbook)
+ppf-deploy --check # dry run with diff
+ppf-logs [node] # view container logs (-f to follow)
+ppf-service <cmd> [nodes...] # status / start / stop / restart
+ppf-db <cmd> # stats / purge-proxies / vacuum
+```
+See `--help` on each tool.
 ## Troubleshooting


@@ -89,3 +89,9 @@ PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework design
 | job.py | Priority job queue |
 | static/dashboard.js | Dashboard frontend logic |
 | static/dashboard.html | Dashboard HTML template |
+| tools/lib/ppf-common.sh | Shared ops library (hosts, wrappers, colors) |
+| tools/ppf-deploy | Deploy wrapper (validation + playbook) |
+| tools/ppf-logs | View container logs |
+| tools/ppf-service | Container lifecycle management |
+| tools/playbooks/deploy.yml | Ansible deploy playbook |
+| tools/playbooks/inventory.ini | Host inventory (WireGuard IPs) |

10
TODO.md

@@ -51,13 +51,11 @@ Optimize only if memory becomes a constraint.
 ## Deprecation
-### [ ] Remove V1 worker protocol
-- V2 workers (URL-driven) are the standard; no V1 workers remain active
-- Remove `--worker` flag and V1 code path in ppf.py
-- Remove `/api/claim`, `/api/submit` V1 endpoints in httpd.py
-- Remove V1 heartbeat/registration handling
-- Clean up any V1-specific state tracking in proxywatchd.py
+### [x] Remove V1 worker protocol
+Completed. Removed `--worker` flag, `worker_main()`, `claim_work()`,
+`submit_results()`, `/api/work`, `/api/results`, and related config
+options. `--worker` now routes to the URL-driven protocol.
 ---


@@ -35,4 +35,4 @@ services:
       - ./data:/app/data:Z
       - ./config.ini:/app/config.ini:ro,Z
       - ./servers.txt:/app/servers.txt:ro,Z
-    command: python -u ppf.py --worker-v2 --server ${PPF_MASTER_URL:-http://10.200.1.250:8081}
+    command: python -u ppf.py --worker --server ${PPF_MASTER_URL:-http://10.200.1.250:8081}
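The `${PPF_MASTER_URL:-...}` expression in the compose command uses the shell-style `:-` default, which applies the fallback when the variable is unset or empty. A small sketch of the same rule in Python, where the helper name `master_url` is hypothetical and only the variable name and default URL are taken from the diff:

```python
import os

DEFAULT_MASTER_URL = "http://10.200.1.250:8081"


def master_url(environ=None):
    """Mimic ${PPF_MASTER_URL:-default}: unset and empty both fall back."""
    env = os.environ if environ is None else environ
    # `or` treats an empty string like a missing key, matching `:-`.
    return env.get("PPF_MASTER_URL") or DEFAULT_MASTER_URL
```

Passing a dict instead of reading `os.environ` directly keeps the helper easy to exercise without touching the real environment.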


@@ -167,20 +167,17 @@ class Config(ComboParser):
         self.add_item(section, 'spot_check_pct', float, 1.0, 'percent of working proxies to spot-check (default: 1.0)', False)
         section = 'worker'
-        self.add_item(section, 'batch_size', int, 100, 'proxies per work batch (default: 100)', False)
         self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False)
-        self.add_item(section, 'claim_timeout', int, 300, 'seconds before unclaimed work is released (default: 300)', False)
-        self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle for V2 mode (default: 5)', False)
-        self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching in V2 mode (default: 30)', False)
+        self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle (default: 5)', False)
+        self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching (default: 30)', False)
         self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False)
         self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False)
         self.aparser.add_argument("-q", "--quiet", help="suppress info messages, show warnings and errors only", action='store_true', default=False)
         self.aparser.add_argument("-v", "--verbose", help="show debug messages", action='store_true', default=False)
         self.aparser.add_argument("--profile", help="enable cProfile profiling, output to profile.stats", action='store_true', default=False)
-        self.aparser.add_argument("--worker", help="run as worker node", action='store_true', default=False)
         self.aparser.add_argument("--server", help="master server URL (e.g., https://master:8081)", type=str, default='')
         self.aparser.add_argument("--worker-key", help="worker authentication key", type=str, default='')
         self.aparser.add_argument("--register", help="register as worker with master server", action='store_true', default=False)
         self.aparser.add_argument("--worker-name", help="worker name for registration (default: hostname)", type=str, default='')
-        self.aparser.add_argument("--worker-v2", help="run as V2 worker (URL-driven fetching)", action='store_true', default=False)
+        self.aparser.add_argument("--worker", help="run as worker node", action='store_true', default=False)
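To illustrate the effect of the rename, the sketch below rebuilds a subset of the worker flags with plain `argparse`. It assumes that `self.aparser` behaves like an `argparse.ArgumentParser`, which the `add_argument` calls suggest but the diff does not confirm; `build_parser` is a hypothetical helper, not part of `config.py`.

```python
import argparse


def build_parser():
    """Worker-related flags as they stand after the rename (subset only)."""
    parser = argparse.ArgumentParser(prog="ppf.py")
    parser.add_argument("--worker", help="run as worker node",
                        action="store_true", default=False)
    parser.add_argument("--server", help="master server URL",
                        type=str, default="")
    parser.add_argument("--register", help="register as worker with master server",
                        action="store_true", default=False)
    return parser


# Same flags as the updated compose command.
args = build_parser().parse_args(
    ["--worker", "--server", "http://10.200.1.250:8081"])
```

In this sketch the old `--worker-v2` spelling is no longer defined, so `argparse` rejects it as an unrecognized argument. That is the kind of mismatch a stale compose command would run into.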

338
httpd.py

@@ -73,11 +73,8 @@ import string
 _workers = {} # worker_id -> {name, ip, last_seen, jobs_completed, proxies_tested, ...}
 _workers_lock = threading.Lock()
-_work_claims = {} # proxy_key -> {worker_id, claimed_at}
-_work_claims_lock = threading.Lock()
 _worker_keys = set() # valid API keys
 _master_key = None # master key for worker registration
-_claim_timeout = 300 # seconds before unclaimed work is released
 _workers_file = 'data/workers.json' # persistent storage
 # URL claim tracking (parallel to proxy claims)
@@ -101,11 +98,6 @@ _worker_test_history = {}
 _worker_test_history_lock = threading.Lock()
 _test_history_window = 120 # seconds to keep test history for rate calculation
-# Fair distribution settings
-_min_batch_size = 1 # minimum proxies per batch
-_max_batch_size = 10000 # maximum proxies per batch
-_worker_timeout = 120 # seconds before worker considered inactive
 # Session tracking
 _session_start_time = int(time.time()) # when httpd started
@@ -171,53 +163,6 @@ def _build_due_condition():
     return condition, params
-def get_active_worker_count():
-    """Count workers seen within timeout window."""
-    now = time.time()
-    with _workers_lock:
-        return sum(1 for w in _workers.values()
-                   if (now - w.get('last_seen', 0)) < _worker_timeout)
-def get_due_proxy_count(db):
-    """Count proxies due for testing (not claimed)."""
-    with _work_claims_lock:
-        claimed_count = len(_work_claims)
-    try:
-        condition, params = _build_due_condition()
-        query = 'SELECT COUNT(*) FROM proxylist WHERE ' + condition
-        result = db.execute(query, params).fetchone()
-        total_due = result[0] if result else 0
-        return max(0, total_due - claimed_count)
-    except Exception:
-        return 0
-def calculate_fair_batch_size(db, worker_id):
-    """Calculate fair batch size based on active workers and queue size.
-    Divides due work evenly among active workers. No artificial floor —
-    if only 6 proxies are due with 3 workers, each gets 2.
-    """
-    active_workers = max(1, get_active_worker_count())
-    due_count = get_due_proxy_count(db)
-    if due_count == 0:
-        return 0
-    # Fair share: divide due work evenly among active workers
-    fair_share = max(1, int(due_count / active_workers))
-    # Clamp to upper bound only
-    batch_size = min(fair_share, _max_batch_size)
-    _log('fair_batch: due=%d workers=%d share=%d batch=%d' % (
-        due_count, active_workers, fair_share, batch_size), 'debug')
-    return batch_size
 def load_workers():
     """Load worker registry from disk."""
     global _workers, _worker_keys
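For readers comparing the removed V1 logic with the documentation, here is a self-contained sketch of the fair-share calculation from the hunk above, without the database and lock dependencies. The removed code clamped only to the upper bound (`_max_batch_size = 10000`), while a CLAUDE.md hunk header mentions `clamp(fair_share, min=100, max=1000)`, so both bounds are exposed as parameters here. The function names and signatures are illustrative, not the originals.

```python
def active_worker_count(workers, now, timeout=120):
    """Count workers whose last_seen falls inside the timeout window."""
    return sum(1 for w in workers.values()
               if (now - w.get("last_seen", 0)) < timeout)


def fair_batch_size(due_count, active_workers, min_batch=1, max_batch=10000):
    """Split due work evenly across active workers, then clamp."""
    if due_count <= 0:
        return 0
    workers = max(1, active_workers)          # avoid division by zero
    fair_share = max(1, due_count // workers)
    return max(min_batch, min(fair_share, max_batch))
```

With the defaults, 6 due proxies and 3 active workers give each worker 2, matching the example in the removed docstring. Passing `min_batch=100, max_batch=1000` reproduces the documented clamp instead.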
@@ -355,101 +300,6 @@ def get_worker_test_rate(worker_id):
         return 0.0
     return total_tests / elapsed
-def claim_work(db, worker_id, count=100):
-    """Claim a batch of proxies for testing. Returns list of proxy dicts."""
-    now = time.time()
-    now_int = int(now)
-    # Calculate fair batch size based on active workers and queue size
-    # Distributes work evenly: due_proxies / active_workers (with bounds)
-    target_count = calculate_fair_batch_size(db, worker_id)
-    # Clean up stale claims and get current claimed set
-    with _work_claims_lock:
-        stale = [k for k, v in _work_claims.items() if now - v['claimed_at'] > _claim_timeout]
-        for k in stale:
-            del _work_claims[k]
-        # Copy current claims to exclude from query
-        claimed_keys = set(_work_claims.keys())
-    # Get proxies that need testing
-    # Priority: untested first, then oldest due - with randomness within tiers
-    try:
-        # Build exclusion clause for already-claimed proxies
-        # Use ip||':'||port to match our claim key format
-        if claimed_keys:
-            # SQLite placeholder limit is ~999, chunk if needed
-            placeholders = ','.join('?' for _ in claimed_keys)
-            exclude_clause = "AND (ip || ':' || port) NOT IN (%s)" % placeholders
-            exclude_params = list(claimed_keys)
-        else:
-            exclude_clause = ""
-            exclude_params = []
-        # Build due condition using new schedule formula
-        due_condition, due_params = _build_due_condition()
-        # Priority tiers: 0=untested, 1=very overdue (>1hr), 2=recently due
-        # Calculate overdue time based on new formula
-        if _fail_retry_backoff:
-            overdue_calc = '''
-                CASE WHEN failed = 0
-                     THEN ? - (tested + ?)
-                     ELSE ? - (tested + (failed * ?))
-                END
-            '''
-            priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
-        else:
-            overdue_calc = '''
-                CASE WHEN failed = 0
-                     THEN ? - (tested + ?)
-                     ELSE ? - (tested + ?)
-                END
-            '''
-            priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
-        query = '''
-            SELECT ip, port, proto, failed, source_proto,
-                   CASE
-                       WHEN tested IS NULL THEN 0
-                       WHEN (%s) > 3600 THEN 1
-                       ELSE 2
-                   END as priority
-            FROM proxylist
-            WHERE %s
-            %s
-            ORDER BY priority, RANDOM()
-            LIMIT ?
-        ''' % (overdue_calc, due_condition, exclude_clause)
-        params = priority_params + list(due_params) + exclude_params + [target_count]
-        rows = db.execute(query, params).fetchall()
-    except Exception as e:
-        _log('claim_work query error: %s' % e, 'error')
-        return []
-    # Claim the fetched proxies (already filtered by query)
-    claimed = []
-    with _work_claims_lock:
-        for row in rows:
-            proxy_key = '%s:%s' % (row[0], row[1])
-            # Double-check not claimed (race condition protection)
-            if proxy_key not in _work_claims:
-                _work_claims[proxy_key] = {'worker_id': worker_id, 'claimed_at': now}
-                claimed.append({
-                    'ip': row[0],
-                    'port': row[1],
-                    'proto': row[2],
-                    'failed': row[3],
-                    'source_proto': row[4],
-                })
-    if claimed:
-        _log('claim_work: %d proxies to %s (pool: %d claimed)' % (
-            len(claimed), worker_id[:8], len(_work_claims)), 'info')
-    return claimed
 def claim_urls(url_db, worker_id, count=5):
     """Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
@@ -795,126 +645,6 @@ def submit_proxy_reports(db, worker_id, proxies):
     return processed
-_last_workers_save = 0
-def submit_results(db, worker_id, results):
-    """Process test results from a worker. Returns count of processed results."""
-    global _last_workers_save
-    processed = 0
-    working_count = 0
-    total_latency = 0
-    now = time.time()
-    with _workers_lock:
-        if worker_id in _workers:
-            _workers[worker_id]['last_seen'] = now
-    for r in results:
-        proxy_key = '%s:%s' % (r.get('ip', ''), r.get('port', ''))
-        # Release claim
-        with _work_claims_lock:
-            if proxy_key in _work_claims:
-                del _work_claims[proxy_key]
-        # Update database - trust workers, add missing proxies if working
-        try:
-            working = 1 if r.get('working') else 0
-            latency_ms = r.get('latency', 0) if working else None
-            error_cat = r.get('error_category') if not working else None
-            if working:
-                # Use INSERT OR REPLACE to add working proxies that don't exist
-                db.execute('''
-                    INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, avg_latency, added)
-                    VALUES (?, ?, ?, ?, 0, ?, ?, ?)
-                    ON CONFLICT(proxy) DO UPDATE SET
-                        failed = 0,
-                        tested = excluded.tested,
-                        avg_latency = excluded.avg_latency
-                ''', (proxy_key, r['ip'], r['port'], r.get('proto', 'http'), int(now),
-                      latency_ms, int(now)))
-                working_count += 1
-                total_latency += latency_ms or 0
-                # Geolocate working proxy if IP2Location available
-                if _geolite and _geodb:
-                    try:
-                        rec = _geodb.get_all(r['ip'])
-                        if rec and rec.country_short and rec.country_short != '-':
-                            db.execute(
-                                'UPDATE proxylist SET country=? WHERE proxy=?',
-                                (rec.country_short, proxy_key))
-                    except Exception:
-                        pass # Geolocation is best-effort
-            else:
-                # For failures, only update if exists (don't add non-working proxies)
-                db.execute('''
-                    UPDATE proxylist SET
-                        failed = failed + 1,
-                        tested = ?
-                    WHERE ip = ? AND port = ?
-                ''', (int(now), r['ip'], r['port']))
-            # Record result for verification system
-            insert_proxy_result(db, proxy_key, worker_id, working,
-                                latency_ms=latency_ms, error_category=error_cat)
-            # Check for disagreement with other workers
-            disagreement, other_worker, other_result = check_for_disagreement(
-                db, proxy_key, worker_id, working)
-            if disagreement:
-                # Queue for manager verification (priority 3 = high)
-                queue_verification(db, proxy_key, 'disagreement', priority=3,
-                                   worker_a=worker_id, worker_b=other_worker,
-                                   result_a=working, result_b=other_result)
-            elif working:
-                # Check for resurrection: was dead (failed >= 3), now working
-                row = db.execute(
-                    'SELECT failed FROM proxylist WHERE proxy = ?', (proxy_key,)
-                ).fetchone()
-                if row and row[0] >= 3:
-                    queue_verification(db, proxy_key, 'resurrection', priority=3,
-                                       worker_a=worker_id, result_a=1)
-            else:
-                # Check for sudden death: was working (consecutive_success >= 3), now failed
-                row = db.execute(
-                    'SELECT consecutive_success FROM proxylist WHERE proxy = ?', (proxy_key,)
-                ).fetchone()
-                if row and row[0] and row[0] >= 3:
-                    queue_verification(db, proxy_key, 'sudden_death', priority=2,
-                                       worker_a=worker_id, result_a=0)
-            processed += 1
-        except Exception as e:
-            _log('submit_results db error for %s: %s' % (proxy_key, e), 'error')
-    # Update worker stats
with _workers_lock:
if worker_id in _workers:
w = _workers[worker_id]
w['jobs_completed'] += 1
w['proxies_tested'] += processed
w['proxies_working'] = w.get('proxies_working', 0) + working_count
w['proxies_failed'] = w.get('proxies_failed', 0) + (processed - working_count)
w['total_latency'] = w.get('total_latency', 0) + total_latency
w['last_batch_size'] = len(results)
w['last_batch_working'] = working_count
# Commit database changes
db.commit()
# Record for test rate calculation
record_test_rate(worker_id, processed)
# Save workers periodically (every 60s)
if now - _last_workers_save > 60:
save_workers()
_last_workers_save = now
return processed
def is_localhost(ip):
"""Check if IP is localhost (127.0.0.0/8 or ::1)."""
if not ip:
@@ -1605,7 +1335,7 @@ class ProxyAPIServer(threading.Thread):
return [b'Method not allowed']
# POST only allowed for worker API endpoints
-post_endpoints = ('/api/register', '/api/results', '/api/heartbeat',
+post_endpoints = ('/api/register', '/api/heartbeat',
'/api/report-urls', '/api/report-proxies')
if method == 'POST' and path not in post_endpoints:
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
@@ -1673,8 +1403,6 @@ class ProxyAPIServer(threading.Thread):
'/api/stats': 'runtime statistics (JSON)',
'/api/mitm': 'MITM certificate statistics (JSON)',
'/api/countries': 'proxy counts by country (JSON)',
-'/api/work': 'get work batch for worker (params: key, count)',
-'/api/results': 'submit test results (POST, params: key)',
'/api/register': 'register as worker (POST)',
'/api/workers': 'list connected workers',
'/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)',
@@ -1860,7 +1588,7 @@ class ProxyAPIServer(threading.Thread):
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
-return '\n'.join('%s:%s' % (r[0], r[1]) for r in rows), 'text/plain', 200
+return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
@@ -1941,54 +1669,6 @@ class ProxyAPIServer(threading.Thread):
'message': 'registered successfully',
}), 'application/json', 200
elif path == '/api/work':
# Get batch of proxies to test (GET)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
count = int(query_params.get('count', 100))
count = min(count, 500) # Cap at 500
try:
db = mysqlite.mysqlite(self.database, str)
proxies = claim_work(db, worker_id, count)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'count': len(proxies),
'proxies': proxies,
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/results':
# Submit test results (POST)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not post_data:
return json.dumps({'error': 'POST body required'}), 'application/json', 400
results = post_data.get('results', [])
if not results:
return json.dumps({'error': 'no results provided'}), 'application/json', 400
working = sum(1 for r in results if r.get('working'))
_log('results: %d from %s (%d working)' % (len(results), worker_id[:8], working), 'info')
try:
db = mysqlite.mysqlite(self.database, str)
processed = submit_results(db, worker_id, results)
return json.dumps({
'worker_id': worker_id,
'processed': processed,
'message': 'results submitted',
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/heartbeat':
# Worker heartbeat with Tor status (POST)
key = query_params.get('key', '')
@@ -2132,11 +1812,8 @@ class ProxyAPIServer(threading.Thread):
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
due_params).fetchone()
due_total = row[0] if row else 0
-# Subtract currently claimed
-with _work_claims_lock:
-claimed_count = len(_work_claims)
-stats['due'] = max(0, due_total - claimed_count)
-stats['claimed'] = claimed_count
+stats['due'] = due_total
+stats['claimed'] = 0
except Exception as e:
_log('_get_db_stats error: %s' % e, 'warn')
return stats
@@ -2218,16 +1895,13 @@ class ProxyAPIServer(threading.Thread):
if queue_stats['total'] > 0:
pct = 100.0 * queue_stats['session_tested'] / queue_stats['total']
queue_stats['session_pct'] = round(min(pct, 100.0), 1)
-# Claimed = currently being tested by workers
-with _work_claims_lock:
-queue_stats['claimed'] = len(_work_claims)
+queue_stats['claimed'] = 0
# Due = ready for testing (respecting cooldown)
due_condition, due_params = _build_due_condition()
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
due_params).fetchone()
-due_total = row[0] if row else 0
-queue_stats['due'] = max(0, due_total - queue_stats['claimed'])
+queue_stats['due'] = row[0] if row else 0
except Exception as e:
_log('_get_workers_data queue stats error: %s' % e, 'warn')
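
The plain-text change to /proxies in the hunk above can be tried in isolation. This is a minimal sketch, assuming each row is a tuple whose first three fields are (ip, port, proto) as the diff suggests; `format_plain` is a hypothetical helper name used only for illustration and does not exist in the codebase.

```python
def format_plain(rows):
    """Render rows as one proto://ip:port entry per line.

    Hypothetical helper: assumes row[0]=ip, row[1]=port, row[2]=proto,
    and falls back to 'http' when proto is empty or None.
    """
    return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows)


# Made-up sample rows, not data from the project.
rows = [('10.0.0.1', 8080, 'socks5'), ('10.0.0.2', 3128, None)]
print(format_plain(rows))
```

Consumers that previously split each line on ':' to get ip and port would likely need to strip the scheme first, since every line now starts with `proto://`.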

ppf.py

@@ -303,48 +303,6 @@ class NeedReregister(Exception):
pass
def worker_get_work(server_url, worker_key, count=100):
"""Fetch batch of proxies from master."""
url = '%s/api/work?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
try:
resp = urllib2.urlopen(url, timeout=30)
result = json.loads(resp.read())
return result.get('proxies', [])
except urllib2.HTTPError as e:
if e.code == 403:
_log('worker key rejected (403), need to re-register', 'warn')
raise NeedReregister()
_log('failed to get work: %s' % e, 'error')
return []
except Exception as e:
_log('failed to get work: %s' % e, 'error')
return []
def worker_submit_results(server_url, worker_key, results):
"""Submit test results to master."""
url = '%s/api/results?key=%s' % (server_url.rstrip('/'), worker_key)
data = json.dumps({'results': results})
req = urllib2.Request(url, data)
req.add_header('Content-Type', 'application/json')
try:
resp = urllib2.urlopen(req, timeout=30)
result = json.loads(resp.read())
return result.get('processed', 0)
except urllib2.HTTPError as e:
if e.code == 403:
_log('worker key rejected (403), need to re-register', 'warn')
raise NeedReregister()
_log('failed to submit results: %s' % e, 'error')
return 0
except Exception as e:
_log('failed to submit results: %s' % e, 'error')
return 0
def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False, threads=0):
"""Send heartbeat with Tor status to master."""
url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key)
@@ -370,7 +328,7 @@ def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling
def worker_claim_urls(server_url, worker_key, count=5):
-"""Claim batch of URLs for V2 worker mode."""
+"""Claim batch of URLs for worker mode."""
url = '%s/api/claim-urls?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
try:
@@ -471,310 +429,7 @@ def check_tor_connectivity(tor_hosts):
def worker_main(config):
-"""Worker mode main loop - uses proxywatchd multi-threaded testing."""
+"""Worker mode -- URL-driven discovery.
import json
global urllib2
try:
import Queue
except ImportError:
import queue as Queue
# Import proxywatchd for multi-threaded testing (gevent already patched at top)
import proxywatchd
proxywatchd.set_config(config)
server_url = config.args.server
if not server_url:
_log('--server URL required for worker mode', 'error')
sys.exit(1)
worker_key = config.args.worker_key
worker_name = config.args.worker_name or os.uname()[1]
batch_size = config.worker.batch_size
num_threads = config.watchd.threads
worker_id = None
# Register if --register flag or no key provided
if config.args.register or not worker_key:
_log('registering with master: %s' % server_url, 'info')
worker_id, worker_key = worker_register(server_url, worker_name)
if not worker_key:
_log('registration failed, exiting', 'error')
sys.exit(1)
_log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
_log('worker key: %s' % worker_key, 'info')
_log('save this key with --worker-key for future runs', 'info')
if config.args.register:
# Just register and exit
return
_log('starting worker mode', 'info')
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' batch size: %d' % batch_size, 'info')
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
# Verify Tor connectivity before claiming work
import socket
import socks
working_tor_hosts = []
for tor_host in config.torhosts:
host, port = tor_host.split(':')
port = int(port)
try:
# Test SOCKS connection
test_sock = socks.socksocket()
test_sock.set_proxy(socks.SOCKS5, host, port)
test_sock.settimeout(10)
test_sock.connect(('check.torproject.org', 80))
test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
resp = test_sock.recv(512)
test_sock.close()
# Accept any HTTP response (200, 301, 302, etc.)
if resp and (b'HTTP/' in resp or len(resp) > 0):
status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
_log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
working_tor_hosts.append(tor_host)
else:
_log('tor host %s:%d no response (recv=%d bytes)' % (host, port, len(resp) if resp else 0), 'warn')
except Exception as e:
_log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
if not working_tor_hosts:
_log('no working Tor hosts, cannot start worker', 'error')
sys.exit(1)
_log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
# Create shared queues for worker threads
job_queue = proxywatchd.PriorityJobQueue()
completion_queue = Queue.Queue()
# Spawn worker threads with stagger to avoid overwhelming Tor
threads = []
for i in range(num_threads):
wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
wt.start_thread()
threads.append(wt)
time.sleep(random.random() / 10) # 0-100ms stagger per thread
_log('spawned %d worker threads' % len(threads), 'info')
jobs_completed = 0
proxies_tested = 0
start_time = time.time()
current_tor_ip = None
consecutive_tor_failures = 0
worker_profiling = config.args.profile or config.common.profiling
# Use dict to allow mutation in nested function (Python 2 compatible)
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
def do_register():
"""Register with master, with exponential backoff on failure."""
while True:
_log('registering with master: %s' % server_url, 'info')
new_id, new_key = worker_register(server_url, worker_name)
if new_key:
wstate['worker_id'] = new_id
wstate['worker_key'] = new_key
wstate['backoff'] = 10 # Reset backoff on success
_log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
return True
else:
_log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
time.sleep(wstate['backoff'])
wstate['backoff'] = min(wstate['backoff'] * 2, 300) # Max 5 min backoff
def wait_for_tor():
"""Wait for Tor to become available, checking every 30 seconds."""
check_interval = 30
while True:
working, tor_ip = check_tor_connectivity(config.torhosts)
if working:
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
# Send heartbeat to manager
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
return working, tor_ip
_log('tor still down, retrying in %ds' % check_interval, 'warn')
# Send heartbeat with tor_ok=False
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
time.sleep(check_interval)
try:
while True:
# Tor check before claiming work - don't claim if Tor is down
working, tor_ip = check_tor_connectivity(config.torhosts)
if not working:
consecutive_tor_failures += 1
_log('tor down before claiming work (consecutive: %d)' % consecutive_tor_failures, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
if consecutive_tor_failures >= 2:
_log('tor appears down, waiting before claiming work', 'error')
working, current_tor_ip = wait_for_tor()
consecutive_tor_failures = 0
else:
time.sleep(10)
continue
else:
consecutive_tor_failures = 0
if tor_ip != current_tor_ip:
if current_tor_ip:
_log('tor circuit rotated: %s' % tor_ip, 'info')
current_tor_ip = tor_ip
# Send heartbeat to manager
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
# Get work from master
try:
proxies = worker_get_work(server_url, wstate['worker_key'], batch_size)
except NeedReregister:
do_register()
continue
if not proxies:
_log('no work available, sleeping 30s', 'info')
time.sleep(30)
continue
_log('received %d proxies to test' % len(proxies), 'info')
# Create ProxyTestState and jobs for each proxy
pending_states = {}
all_jobs = []
# Get checktype(s) from config
checktypes = config.watchd.checktypes
for proxy_info in proxies:
ip = proxy_info['ip']
port = proxy_info['port']
proto = proxy_info.get('proto', 'http')
failed = proxy_info.get('failed', 0)
source_proto = proxy_info.get('source_proto')
proxy_str = '%s:%d' % (ip, port)
# Create state for this proxy
state = proxywatchd.ProxyTestState(
ip, port, proto, failed,
success_count=0, total_duration=0.0,
country=None, mitm=0, consecutive_success=0,
asn=None, oldies=False,
completion_queue=completion_queue,
proxy_full=proxy_str, source_proto=source_proto
)
pending_states[proxy_str] = state
# Select random checktype
checktype = random.choice(checktypes)
# Get target for this checktype
if checktype == 'judges':
available = proxywatchd.judge_stats.get_available_judges(
list(proxywatchd.judges.keys()))
target = random.choice(available) if available else random.choice(
list(proxywatchd.judges.keys()))
elif checktype == 'ssl':
target = random.choice(proxywatchd.ssl_targets)
elif checktype == 'irc':
target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
else: # head
target = random.choice(list(proxywatchd.regexes.keys()))
job = proxywatchd.TargetTestJob(state, target, checktype)
all_jobs.append(job)
# Shuffle and queue jobs
random.shuffle(all_jobs)
for job in all_jobs:
job_queue.put(job, priority=0)
# Wait for all jobs to complete
completed = 0
results = []
timeout_start = time.time()
timeout_seconds = config.watchd.timeout * 2 + 30 # generous timeout
while completed < len(proxies):
try:
state = completion_queue.get(timeout=1)
completed += 1
# Build result from state (failcount == 0 means success)
is_working = state.failcount == 0
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
result = {
'ip': state.ip,
'port': state.port,
'proto': state.proto,
'working': is_working,
'latency': round(latency_sec, 3) if is_working else 0,
'error': None if is_working else 'failed',
}
results.append(result)
# Progress logging
if completed % 20 == 0 or completed == len(proxies):
working = sum(1 for r in results if r.get('working'))
_log('tested %d/%d proxies (%d working)' % (
completed, len(proxies), working), 'info')
except Queue.Empty:
if time.time() - timeout_start > timeout_seconds:
_log('batch timeout, %d/%d completed' % (completed, len(proxies)), 'warn')
break
continue
# Submit results
try:
processed = worker_submit_results(server_url, wstate['worker_key'], results)
except NeedReregister:
do_register()
# Retry submission with new key
try:
processed = worker_submit_results(server_url, wstate['worker_key'], results)
except NeedReregister:
_log('still rejected after re-register, discarding batch', 'error')
processed = 0
jobs_completed += 1
proxies_tested += len(results)
working = sum(1 for r in results if r.get('working'))
_log('batch %d: %d/%d working, submitted %d' % (
jobs_completed, working, len(results), processed), 'info')
# Brief pause between batches
time.sleep(1)
except KeyboardInterrupt:
elapsed = time.time() - start_time
_log('worker stopping...', 'info')
# Stop threads
for wt in threads:
wt.stop()
for wt in threads:
wt.term()
_log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
_log(' jobs completed: %d' % jobs_completed, 'info')
_log(' proxies tested: %d' % proxies_tested, 'info')
def worker_v2_main(config):
"""V2 worker mode -- URL-driven discovery.
Claims URLs from master, fetches through Tor, extracts and tests proxies,
reports working proxies back to master.
@@ -815,7 +470,7 @@ def worker_v2_main(config):
if config.args.register:
return
-_log('starting worker V2 mode (URL-driven)', 'info')
+_log('starting worker mode (URL-driven)', 'info')
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' url batch: %d' % url_batch_size, 'info')
@@ -1120,7 +775,8 @@ def worker_v2_main(config):
state = completion_queue.get(timeout=1)
completed += 1
-if state.failcount == 0:
+success, _ = state.evaluate()
+if success:
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
proxy_addr = state.proxy
if state.auth:
@@ -1168,13 +824,13 @@ def worker_v2_main(config):
except KeyboardInterrupt:
elapsed = time.time() - start_time
-_log('worker V2 stopping...', 'info')
+_log('worker stopping...', 'info')
session.close()
for wt in threads:
wt.stop()
for wt in threads:
wt.term()
-_log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
+_log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
_log(' cycles: %d' % cycles, 'info')
_log(' urls fetched: %d' % urls_fetched, 'info')
_log(' proxies found: %d' % proxies_found, 'info')
@@ -1193,12 +849,7 @@ def main():
else:
sys.exit(1)
-# V2 worker mode: URL-driven discovery
-if config.args.worker_v2:
-worker_v2_main(config)
-return
-# V1 worker mode: connect to master server instead of running locally
+# Worker mode: URL-driven discovery
if config.args.worker or config.args.register:
worker_main(config)
return
@@ -1303,6 +954,11 @@ def main():
last_skip_log = 0
while True:
try:
+# When ppf threads = 0, skip URL fetching (workers handle it via /api/claim-urls)
+if config.ppf.threads == 0:
+time.sleep(60)
+continue
time.sleep(random.random()/10)
if (time.time() - statusmsg) > 180:
_log('running %d thread(s) over %d' % (len(threads), config.ppf.threads), 'ppf')
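
The switch from `state.failcount == 0` to `state.evaluate()` in the worker loop above addresses the problem described in commit 00952b7947: failcount starts at 0 and is only updated by evaluate(), so checking it directly lets every proxy pass. The toy model below illustrates that failure mode. `ToyState` is a stand-in written for this sketch and is an assumption throughout; the real proxywatchd.ProxyTestState and its evaluate() logic are not shown in this diff.

```python
class ToyState(object):
    """Stand-in for a test state; NOT the real proxywatchd.ProxyTestState."""

    def __init__(self):
        self.failcount = 0   # starts at 0, as the commit message describes
        self.results = []    # raw per-target outcomes (True = check passed)

    def record(self, ok):
        """Store a raw outcome without touching failcount."""
        self.results.append(ok)

    def evaluate(self):
        """Fold raw outcomes into failcount; returns (success, category)."""
        success = bool(self.results) and all(self.results)
        if not success:
            self.failcount += 1
        return success, (None if success else 'failed')


state = ToyState()
state.record(False)              # the proxy failed its check

naive = state.failcount == 0     # old check: failcount was never updated
success, _ = state.evaluate()    # new check: evaluate() applies the outcome
print(naive, success)
```

In this model the old check reports the failed proxy as working, while the evaluate() result reports it as failed, which matches the behavior the commit message attributes to the bug.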

tools/lib/ppf-common.sh Normal file

@@ -0,0 +1,170 @@
#!/bin/bash
# ppf-common.sh -- shared library for PPF operations toolkit
# Source this file; do not execute directly.
set -eu
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
PPF_DIR="${PPF_DIR:-$HOME/git/ppf}"
ANSIBLE_DIR="/opt/ansible"
ANSIBLE_VENV="${ANSIBLE_DIR}/venv/bin/activate"
PPF_INVENTORY="${PPF_DIR}/tools/playbooks/inventory.ini"
# ---------------------------------------------------------------------------
# Host topology
# ---------------------------------------------------------------------------
MASTER="odin"
WORKERS="cassius edge sentinel"
ALL_HOSTS="odin cassius edge sentinel"
# Container names per role
MASTER_CONTAINER="ppf"
WORKER_CONTAINER="ppf-worker"
# ---------------------------------------------------------------------------
# Colors (respects NO_COLOR -- https://no-color.org)
# ---------------------------------------------------------------------------
if [ -z "${NO_COLOR:-}" ] && [ -t 1 ]; then
C_RST='\033[0m'
C_DIM='\033[2m'
C_BOLD='\033[1m'
C_RED='\033[38;5;167m'
C_GREEN='\033[38;5;114m'
C_YELLOW='\033[38;5;180m'
C_BLUE='\033[38;5;110m'
C_CYAN='\033[38;5;116m'
else
C_RST='' C_DIM='' C_BOLD='' C_RED='' C_GREEN=''
C_YELLOW='' C_BLUE='' C_CYAN=''
fi
# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------
log_ok() { printf "${C_GREEN}${C_RST} %s\n" "$*"; }
log_err() { printf "${C_RED}${C_RST} %s\n" "$*" >&2; }
log_warn() { printf "${C_YELLOW}${C_RST} %s\n" "$*"; }
log_info() { printf "${C_BLUE}${C_RST} %s\n" "$*"; }
log_dim() { printf "${C_DIM} %s${C_RST}\n" "$*"; }
die() { log_err "$@"; exit 1; }
# Section header
section() {
printf "\n${C_BOLD}${C_CYAN} %s${C_RST}\n" "$*"
}
# ---------------------------------------------------------------------------
# Host resolution helpers
# ---------------------------------------------------------------------------
is_master() { [ "$1" = "$MASTER" ]; }
is_worker() {
local h
for h in $WORKERS; do [ "$h" = "$1" ] && return 0; done
return 1
}
container_name() {
if is_master "$1"; then echo "$MASTER_CONTAINER"; else echo "$WORKER_CONTAINER"; fi
}
# Expand target aliases into host list
# "all" -> all hosts
# "workers" -> worker hosts
# "odin" -> just odin
# Multiple args are joined with spaces and deduplicated (use hosts_csv for the comma form)
resolve_targets() {
local targets=""
local arg
for arg in "$@"; do
case "$arg" in
all) targets="${targets:+$targets }$ALL_HOSTS" ;;
workers) targets="${targets:+$targets }$WORKERS" ;;
master) targets="${targets:+$targets }$MASTER" ;;
*) targets="${targets:+$targets }$arg" ;;
esac
done
# Deduplicate while preserving order
echo "$targets" | tr ' ' '\n' | awk '!seen[$0]++' | tr '\n' ' ' | sed 's/ $//'
}
# Convert space-separated host list to comma-separated for ansible
hosts_csv() {
echo "$*" | tr ' ' ','
}
# ---------------------------------------------------------------------------
# Ansible wrapper
# ---------------------------------------------------------------------------
# Runs ansible with toolkit inventory via venv.
# Usage: ansible_cmd <ansible args...>
ansible_cmd() {
(
# shellcheck disable=SC1090
. "$ANSIBLE_VENV"
cd "$ANSIBLE_DIR"
ansible -i "$PPF_INVENTORY" --become "$@"
)
}
# Runs ansible-playbook with toolkit inventory via venv.
# Usage: ansible_playbook_cmd <ansible-playbook args...>
ansible_playbook_cmd() {
(
# shellcheck disable=SC1090
. "$ANSIBLE_VENV"
cd "$ANSIBLE_DIR"
ansible-playbook "$@"
)
}
# ---------------------------------------------------------------------------
# Remote podman/compose wrappers
# ---------------------------------------------------------------------------
# Run a podman command on a remote host as the podman user.
# Uses dynamic UID discovery.
# Usage: podman_cmd HOST "podman subcommand..."
podman_cmd() {
local host="$1"; shift
local cmd="$*"
ansible_cmd "$host" -m raw -a \
"uid=\$(id -u podman) && cd /tmp && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid $cmd"
}
# Run a podman-compose subcommand on a remote host.
# Usage: compose_cmd HOST "subcommand [args]"
compose_cmd() {
local host="$1"; shift
local cmd="$*"
ansible_cmd "$host" -m raw -a \
"uid=\$(id -u podman) && sudo -u podman bash -c 'export XDG_RUNTIME_DIR=/run/user/'\$uid' && cd /home/podman/ppf && podman-compose $cmd'"
}
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
validate_syntax() {
local errors=0
local f
section "Validating Python syntax"
for f in "$PPF_DIR"/*.py; do
[ -f "$f" ] || continue
if python3 -m py_compile "$f" 2>/dev/null; then
log_dim "$(basename "$f")"
else
log_err "$(basename "$f")"
errors=$((errors + 1))
fi
done
if [ "$errors" -gt 0 ]; then
die "$errors file(s) failed syntax check"
fi
log_ok "All files valid"
}
# ---------------------------------------------------------------------------
# Version
# ---------------------------------------------------------------------------
PPF_TOOLS_VERSION="1.0.0"
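
For readers who want to check the alias expansion without a shell, the behavior of resolve_targets and hosts_csv can be approximated in Python. This is a sketch under assumptions: it mirrors the host names and aliases defined in ppf-common.sh, but the Python functions themselves are illustrative and are not part of the toolkit.

```python
# Host topology copied from ppf-common.sh.
MASTER = 'odin'
WORKERS = ['cassius', 'edge', 'sentinel']
ALL_HOSTS = [MASTER] + WORKERS


def resolve_targets(*args):
    """Expand the 'all', 'workers' and 'master' aliases.

    Unknown names pass through unchanged; duplicates are dropped while
    keeping first-seen order, like the awk '!seen[$0]++' filter.
    """
    aliases = {'all': ALL_HOSTS, 'workers': WORKERS, 'master': [MASTER]}
    seen, hosts = set(), []
    for arg in args:
        for host in aliases.get(arg, [arg]):
            if host not in seen:
                seen.add(host)
                hosts.append(host)
    return hosts


def hosts_csv(hosts):
    """Join hosts with commas, the form passed to ansible as a host pattern."""
    return ','.join(hosts)


print(hosts_csv(resolve_targets('workers', 'odin', 'edge')))
```

Note that the shell version returns a space-separated string and leaves the comma conversion to hosts_csv; the sketch keeps the same two-step split.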


@@ -0,0 +1,58 @@
---
- name: Deploy PPF code
hosts: ppf
gather_facts: false
become: true
tasks:
- name: Sync Python code and support files
ansible.posix.synchronize:
src: "{{ ppf_src }}/"
dest: "{{ ppf_code_dest }}"
rsync_opts:
- "--include=*.py"
- "--include=servers.txt"
- "--include=Dockerfile"
- "--exclude=*"
register: sync_result
notify: restart containers
- name: Deploy compose file
ansible.builtin.copy:
src: "{{ ppf_src }}/{{ ppf_compose_src }}"
dest: "{{ ppf_base }}/compose.yml"
owner: "{{ ppf_owner }}"
group: "{{ ppf_owner }}"
register: compose_result
notify: restart containers
- name: Fix file ownership
ansible.builtin.file:
path: "{{ ppf_base }}"
owner: "{{ ppf_owner }}"
group: "{{ ppf_owner }}"
recurse: true
- name: Flush handlers before status check
ansible.builtin.meta: flush_handlers
- name: Wait for containers to settle
ansible.builtin.pause:
seconds: 2
when: >-
ppf_restart | bool and
(sync_result is changed or compose_result is changed)
- name: Check container status
ansible.builtin.raw: "uid=$(id -u {{ ppf_owner }}) && sudo -u {{ ppf_owner }} bash -c 'export XDG_RUNTIME_DIR=/run/user/'$uid' && cd {{ ppf_base }} && podman-compose ps'"
register: status_result
changed_when: false
- name: Show container status
ansible.builtin.debug:
msg: "{{ status_result.stdout_lines | default([]) }}"
handlers:
- name: restart containers
ansible.builtin.raw: "uid=$(id -u {{ ppf_owner }}) && sudo -u {{ ppf_owner }} bash -c 'export XDG_RUNTIME_DIR=/run/user/'$uid' && cd {{ ppf_base }} && podman-compose down && podman-compose up -d'"
when: ppf_restart | bool


@@ -0,0 +1,3 @@
ppf_base: /home/podman/ppf
ppf_owner: podman
ppf_restart: true


@@ -0,0 +1,2 @@
ppf_code_dest: /home/podman/ppf/
ppf_compose_src: compose.master.yml


@@ -0,0 +1,2 @@
ppf_code_dest: /home/podman/ppf/src/
ppf_compose_src: compose.worker.yml


@@ -0,0 +1,16 @@
[master]
odin ansible_host=10.200.1.250
[workers]
cassius ansible_host=10.200.1.13
edge ansible_host=10.200.1.254
sentinel ansible_host=10.200.1.1
[ppf:children]
master
workers
[ppf:vars]
ansible_user=ansible
ansible_ssh_private_key_file=/opt/ansible/secrets/ssh/ansible
ansible_remote_tmp=~/.ansible/tmp

tools/ppf-db Executable file

@@ -0,0 +1,154 @@
#!/bin/bash
# ppf-db -- manage PPF databases
#
# Usage:
# ppf-db <command> [options]
#
# Commands: stats, purge-proxies, vacuum
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
PROXY_DB="/home/podman/ppf/data/proxies.sqlite"
URL_DB="/home/podman/ppf/data/websites.sqlite"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-db <command> [options]
Manage PPF databases on odin (master).
Commands:
stats show proxy and URL counts
purge-proxies delete all proxies (keeps URLs)
vacuum reclaim disk space after purge
Options:
--help show this help
--version show version
Examples:
ppf-db stats
ppf-db purge-proxies
ppf-db vacuum
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
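run_sql() {
local db="$1" sql="$2"
# grep -v exits 1 when every line is filtered out (DELETE and VACUUM print
# nothing), which would abort the script under set -e. Treat an empty
# result as success.
ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman sqlite3 '$db' \"$sql\"" 2>/dev/null \
| { grep -vE '^\s*$|^odin|Shared connection|CHANGED|SUCCESS' || true; }
}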
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
cmd_stats() {
section "Database stats (odin)"
local proxies total_urls active_urls working
proxies=$(run_sql "$PROXY_DB" "SELECT COUNT(*) FROM proxylist;")
working=$(run_sql "$PROXY_DB" "SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL;")
total_urls=$(run_sql "$URL_DB" "SELECT COUNT(*) FROM uris;")
active_urls=$(run_sql "$URL_DB" "SELECT COUNT(*) FROM uris WHERE error=0;")
log_info "Proxies: ${proxies} total, ${working} working"
log_info "URLs: ${total_urls} total, ${active_urls} active"
}
cmd_purge_proxies() {
section "Purging proxies from odin"
# Get counts before
local before
before=$(run_sql "$PROXY_DB" "SELECT COUNT(*) FROM proxylist;")
log_info "Proxies before: $before"
# Stop container
log_info "Stopping container..."
compose_cmd "$MASTER" "down" > /dev/null 2>&1 \
&& log_ok "Container stopped" \
|| die "Failed to stop container"
# Delete proxies
log_info "Deleting proxylist rows..."
run_sql "$PROXY_DB" "DELETE FROM proxylist;" > /dev/null 2>&1
log_ok "Proxylist purged"
# Vacuum to reclaim space
log_info "Vacuuming database..."
run_sql "$PROXY_DB" "VACUUM;" > /dev/null 2>&1
log_ok "Database vacuumed"
# Verify URLs intact
local urls_after
urls_after=$(run_sql "$URL_DB" "SELECT COUNT(*) FROM uris;")
log_ok "URLs preserved: $urls_after"
# Start container
log_info "Starting container..."
compose_cmd "$MASTER" "up -d" > /dev/null 2>&1 \
&& log_ok "Container started" \
|| die "Failed to start container"
}
cmd_vacuum() {
section "Vacuuming database (odin)"
local before after
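# ls -lh prints the size in column 5. Select it from the line that names the
# DB file: Ansible's ad-hoc output normally starts with a status line such as
# "odin | CHANGED | rc=0 >>", so the first number on the stream is not the size.
before=$(ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman ls -lh '$PROXY_DB'" 2>/dev/null \
| awk -v f="$PROXY_DB" 'index($0, f) {print $5; exit}')
run_sql "$PROXY_DB" "VACUUM;" > /dev/null 2>&1
after=$(ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman ls -lh '$PROXY_DB'" 2>/dev/null \
| awk -v f="$PROXY_DB" 'index($0, f) {print $5; exit}')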
log_ok "Vacuumed: ${before:-?} -> ${after:-?}"
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
[ $# -eq 0 ] && usage
COMMAND=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-db $PPF_TOOLS_VERSION"; exit 0 ;;
stats|purge-proxies|vacuum)
[ -n "$COMMAND" ] && die "Multiple commands given"
COMMAND="$1"
;;
-*) die "Unknown option: $1" ;;
*) die "Unknown command: $1" ;;
esac
shift
done
[ -z "$COMMAND" ] && die "No command given. Use: stats, purge-proxies, vacuum"
case "$COMMAND" in
stats) cmd_stats ;;
purge-proxies) cmd_purge_proxies ;;
vacuum) cmd_vacuum ;;
esac
printf "\n"
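
The run_sql helper in ppf-db relies on grep -v to strip Ansible's status lines from the raw output. A minimal sketch of that filtering step in isolation shows how it behaves when a statement such as DELETE or VACUUM prints nothing: grep exits 1 when it selects no lines, which matters in a script running under set -e. The function name filter_ansible_noise and the sample input lines are hypothetical and not part of the toolkit.

```shell
#!/bin/bash
set -eu

# Hypothetical helper: strip Ansible ad-hoc status lines from stdin.
# The pattern mirrors the one used by run_sql; [[:space:]] replaces \s
# for portability across grep implementations.
filter_ansible_noise() {
    # grep -v exits 1 when no line survives the filter; "|| true" keeps
    # an empty result from being treated as a failure under set -e.
    grep -vE '^[[:space:]]*$|^odin|Shared connection|CHANGED|SUCCESS' || true
}

# A SELECT-style result: one data line is left after filtering.
printf 'odin | CHANGED | rc=0 >>\n42\n' | filter_ansible_noise

# A DELETE-style result: nothing is left, and the helper still succeeds.
printf 'odin | CHANGED | rc=0 >>\n' | filter_ansible_noise
echo "status: $?"
```

The trade-off of "|| true" is that a genuine grep error is masked as well, so a stricter variant might check for exit status 2 separately.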

115
tools/ppf-deploy Executable file

@@ -0,0 +1,115 @@
#!/bin/bash
# ppf-deploy -- deploy PPF code to nodes
#
# Usage:
# ppf-deploy [options] [targets...]
#
# Targets:
# all odin + all workers (default)
# workers cassius, edge, sentinel
# master odin
# <hostname> specific host(s)
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
PLAYBOOK_DIR="$SCRIPT_DIR/playbooks"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-deploy [options] [targets...]
Deploy PPF code to nodes via Ansible playbook.
Targets:
all odin + all workers (default)
workers cassius, edge, sentinel
master odin
<hostname> specific host(s)
Options:
--no-restart sync files only, skip container restart
--check dry run (ansible --check --diff)
-v verbose ansible output
--help show this help
--version show version
Steps performed:
1. Validate Python syntax locally
2. Rsync *.py + servers.txt (role-aware destinations)
3. Copy compose file per role
4. Fix ownership (podman:podman)
5. Restart containers on change (unless --no-restart)
6. Show container status
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
DO_RESTART=1
CHECK_MODE=0
VERBOSE=""
TARGETS=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-deploy $PPF_TOOLS_VERSION"; exit 0 ;;
--no-restart) DO_RESTART=0 ;;
--check) CHECK_MODE=1 ;;
-v) VERBOSE="-v" ;;
-*) die "Unknown option: $1" ;;
*) TARGETS="${TARGETS:+$TARGETS }$1" ;;
esac
shift
done
TARGETS="${TARGETS:-all}"
# ---------------------------------------------------------------------------
# Pre-flight: local syntax validation
# ---------------------------------------------------------------------------
validate_syntax
# ---------------------------------------------------------------------------
# Build ansible-playbook arguments
# ---------------------------------------------------------------------------
ARGS=(-i "$PLAYBOOK_DIR/inventory.ini")
ARGS+=(-e "ppf_src=$PPF_DIR")
if [ "$DO_RESTART" -eq 0 ]; then
ARGS+=(-e "ppf_restart=false")
fi
if [ "$CHECK_MODE" -eq 1 ]; then
ARGS+=(--check --diff)
fi
[ -n "$VERBOSE" ] && ARGS+=("$VERBOSE")
# Target resolution: map aliases to ansible --limit
case "$TARGETS" in
all) ;; # no --limit = all hosts in inventory
*)
LIMIT=$(resolve_targets $TARGETS | tr ' ' ',')
ARGS+=(--limit "$LIMIT")
;;
esac
ARGS+=("$PLAYBOOK_DIR/deploy.yml")
# ---------------------------------------------------------------------------
# Run playbook
# ---------------------------------------------------------------------------
section "Deploying to ${TARGETS}"
ansible_playbook_cmd "${ARGS[@]}"
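
ppf-deploy turns its target arguments into an ansible-playbook --limit value by calling resolve_targets and joining the result with commas. That function lives in lib/ppf-common.sh, which is not shown here, so the following is only a sketch of how such a mapping could work, assuming the host names from inventory.ini. The names resolve_targets_sketch, limit_for, MASTER_SKETCH and WORKERS_SKETCH are hypothetical.

```shell
#!/bin/bash
set -eu

# Assumed host lists, taken from the [master] and [workers] inventory groups.
MASTER_SKETCH="odin"
WORKERS_SKETCH="cassius edge sentinel"

# Hypothetical stand-in for resolve_targets: expand aliases to host names
# and pass anything else through unchanged.
resolve_targets_sketch() {
    local out="" t
    for t in "$@"; do
        case "$t" in
            all)     out="$out $MASTER_SKETCH $WORKERS_SKETCH" ;;
            master)  out="$out $MASTER_SKETCH" ;;
            workers) out="$out $WORKERS_SKETCH" ;;
            *)       out="$out $t" ;;
        esac
    done
    # Deliberate word splitting: normalizes the whitespace between names.
    # shellcheck disable=SC2086
    set -- $out
    echo "$*"
}

# Join the resolved hosts with commas, as ppf-deploy does for --limit.
limit_for() {
    resolve_targets_sketch "$@" | tr ' ' ','
}

limit_for workers
limit_for master cassius
```

This sketch does not deduplicate hosts or reject unknown names; whether the real helper does either is not visible in this diff.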

80
tools/ppf-logs Executable file

@@ -0,0 +1,80 @@
#!/bin/bash
# ppf-logs -- view PPF container logs
#
# Usage:
# ppf-logs [options] [node]
#
# Defaults to odin if no node specified.
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-logs [options] [node]
View PPF container logs.
Nodes:
odin, cassius, edge, sentinel (default: odin)
Options:
-f follow log output
-n LINES number of lines to show (default: 40)
--help show this help
--version show version
Examples:
ppf-logs last 40 lines from odin
ppf-logs cassius last 40 lines from cassius
ppf-logs -f edge follow edge worker logs
ppf-logs -n 100 sentinel last 100 lines from sentinel
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
FOLLOW=0
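# LINES is a bash special variable (bash resets it from the terminal size
# when checkwinsize is on), so keep the line count under a private name.
TAIL_LINES=40
NODE=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-logs $PPF_TOOLS_VERSION"; exit 0 ;;
-f) FOLLOW=1 ;;
-n) shift; TAIL_LINES="${1:?'-n' requires a number}" ;;
-*) die "Unknown option: $1" ;;
*) NODE="$1" ;;
esac
shift
done
# The value is interpolated into a remote command, so accept digits only
case "$TAIL_LINES" in
''|*[!0-9]*) die "-n requires a number, got: $TAIL_LINES" ;;
esac
NODE="${NODE:-$MASTER}"
# Validate node
is_master "$NODE" || is_worker "$NODE" || die "Unknown node: $NODE"
CNAME=$(container_name "$NODE")
# ---------------------------------------------------------------------------
# Build podman logs command
# ---------------------------------------------------------------------------
CMD="podman logs --tail $TAIL_LINES"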
[ "$FOLLOW" -eq 1 ] && CMD="$CMD -f"
CMD="$CMD $CNAME"
section "$NODE ($CNAME)"
# Run with raw output -- logs go straight to terminal
podman_cmd "$NODE" "$CMD"
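
The command assembled by ppf-logs is a plain string that is later executed on the remote node, so the line count deserves a sanity check before it is spliced in. A small sketch of that assembly step as a standalone function follows. The function name build_logs_cmd and the container name ppf-example are hypothetical; the --tail and -f flags are the ones the script already passes to podman logs.

```shell
#!/bin/bash
set -eu

# Hypothetical helper: print the podman logs command for a container.
#   $1 = number of lines, $2 = 1 to follow the log, $3 = container name
build_logs_cmd() {
    local lines="$1" follow="$2" cname="$3"
    # Reject anything that is not a plain non-negative integer, since the
    # value ends up inside a command string run on the remote host.
    case "$lines" in
        ''|*[!0-9]*) echo "invalid line count: $lines" >&2; return 1 ;;
    esac
    local cmd="podman logs --tail $lines"
    if [ "$follow" -eq 1 ]; then
        cmd="$cmd -f"
    fi
    echo "$cmd $cname"
}

build_logs_cmd 40 0 ppf-example
build_logs_cmd 100 1 ppf-example
```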

186
tools/ppf-service Executable file

@@ -0,0 +1,186 @@
#!/bin/bash
# ppf-service -- manage PPF containers
#
# Usage:
# ppf-service <command> [nodes...]
#
# Commands: status, start, stop, restart
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
ODIN_URL="http://10.200.1.250:8081"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-service <command> [nodes...]
Manage PPF containers on remote nodes.
Commands:
status show container state + health (default nodes: all)
start start containers (compose up -d)
stop stop containers (compose stop)
restart recreate containers (compose down, then up -d)
Nodes:
all odin + all workers (default)
workers cassius, edge, sentinel
master odin
<hostname> specific host(s)
Options:
--help show this help
--version show version
Examples:
ppf-service status
ppf-service restart workers
ppf-service stop cassius edge
ppf-service start odin
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Status helpers
# ---------------------------------------------------------------------------
show_health() {
local result
result=$(ansible_cmd "$MASTER" -m raw -a \
"curl -sf --max-time 5 ${ODIN_URL}/health 2>/dev/null || echo UNREACHABLE" \
2>/dev/null) || true
if echo "$result" | grep -qi "ok\|healthy"; then
log_ok "master health: ok"
elif echo "$result" | grep -qi "UNREACHABLE"; then
log_err "master health: unreachable"
else
log_warn "master health: $result"
fi
}
show_workers_api() {
local result
result=$(ansible_cmd "$MASTER" -m raw -a \
"curl -sf --max-time 5 ${ODIN_URL}/api/workers 2>/dev/null || echo '{}'" \
2>/dev/null) || true
# Just show the raw output, trimmed
local data
data=$(echo "$result" | grep -v '^\s*$' | grep -v '^[A-Z]' | head -20)
if [ -n "$data" ]; then
log_info "Worker API response:"
echo "$data" | while IFS= read -r line; do
log_dim "$line"
done
fi
}
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
cmd_status() {
local hosts="$1"
section "Container status"
for host in $hosts; do
local output
output=$(compose_cmd "$host" "ps" 2>/dev/null) || true
if echo "$output" | grep -qi "up\|running"; then
log_ok "$host"
elif echo "$output" | grep -qi "exit"; then
log_err "$host (exited)"
else
log_warn "$host (unknown)"
fi
echo "$output" | grep -v '^\s*$' | while IFS= read -r line; do
log_dim "$line"
done
done
# Show health/worker info if master is in target list
local h
for h in $hosts; do
if is_master "$h"; then
section "Master health"
show_health
show_workers_api
break
fi
done
}
cmd_start() {
local hosts="$1"
section "Starting containers"
for host in $hosts; do
compose_cmd "$host" "up -d" > /dev/null 2>&1 \
&& log_ok "$host started" \
|| log_err "$host start failed"
done
}
cmd_stop() {
local hosts="$1"
section "Stopping containers"
for host in $hosts; do
compose_cmd "$host" "stop" > /dev/null 2>&1 \
&& log_ok "$host stopped" \
|| log_err "$host stop failed"
done
}
cmd_restart() {
local hosts="$1"
section "Restarting containers"
for host in $hosts; do
compose_cmd "$host" "down" > /dev/null 2>&1 \
&& compose_cmd "$host" "up -d" > /dev/null 2>&1 \
&& log_ok "$host restarted" \
|| log_err "$host restart failed"
done
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
[ $# -eq 0 ] && usage
COMMAND=""
TARGETS=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-service $PPF_TOOLS_VERSION"; exit 0 ;;
status|start|stop|restart)
[ -n "$COMMAND" ] && die "Multiple commands given"
COMMAND="$1"
;;
-*) die "Unknown option: $1" ;;
*) TARGETS="${TARGETS:+$TARGETS }$1" ;;
esac
shift
done
[ -z "$COMMAND" ] && die "No command given. Use: status, start, stop, restart"
TARGETS="${TARGETS:-all}"
HOSTS=$(resolve_targets $TARGETS)
[ -z "$HOSTS" ] && die "No valid targets"
case "$COMMAND" in
status) cmd_status "$HOSTS" ;;
start) cmd_start "$HOSTS" ;;
stop) cmd_stop "$HOSTS" ;;
restart) cmd_restart "$HOSTS" ;;
esac
printf "\n"
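
cmd_status in ppf-service decides between ok, exited and unknown by searching the compose ps output for keywords. The sketch below isolates that decision so it can be exercised without a remote node. The function name classify_ps and the sample ps lines are hypothetical; the keyword checks follow the ones in cmd_status.

```shell
#!/bin/bash
set -eu

# Hypothetical helper: map "compose ps" output to a coarse state using the
# same case-insensitive keyword checks as cmd_status.
classify_ps() {
    local output="$1"
    if printf '%s\n' "$output" | grep -qiE 'up|running'; then
        echo ok
    elif printf '%s\n' "$output" | grep -qi 'exit'; then
        echo exited
    else
        echo unknown
    fi
}

classify_ps "ppf-example  Up 3 hours"
classify_ps "ppf-example  Exited (1) 2 minutes ago"
classify_ps ""
```

Because the match is a substring test, a container whose name contains "up" (for example "backup") would be reported as ok even when it has exited, so matching on the status column alone would be a stricter alternative.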