Compare commits

...

31 Commits

Author SHA1 Message Date
Username
c091216afc docs: add ppf-db to README operations toolkit
Some checks failed
CI / syntax-check (push) Failing after 1s
CI / memory-leak-check (push) Successful in 16s
2026-02-18 00:29:28 +01:00
Username
4cefdf976c docs: update CLAUDE.md for ppf-db and corrected odin role 2026-02-18 00:29:25 +01:00
Username
98c2e74412 ppf: skip URL cycling when ppf.threads = 0 2026-02-18 00:28:37 +01:00
Username
24d6f345f6 tools: add ppf-db for database operations 2026-02-18 00:28:27 +01:00
Username
1ca096c78a ppf-service: use down+up for restart to pick up code changes 2026-02-18 00:22:55 +01:00
Username
15a7f0bb6a ppf-common: fix compose_cmd to run as podman user 2026-02-18 00:22:52 +01:00
Username
b6045bd05c tools: use down+up in deploy handler to pick up code changes 2026-02-18 00:22:48 +01:00
Username
d7b004f0ac httpd: include protocol in /proxies plain text format 2026-02-18 00:18:58 +01:00
Username
00952b7947 fix: call evaluate() in worker mode before checking results
failcount was initialized to 0 and never updated because
evaluate() was skipped, causing all proxies to pass.
2026-02-18 00:16:35 +01:00
Username
6800995361 docs: reflect podman-compose on all nodes
Remove stale systemd unit and standalone podman run references.
All nodes now managed exclusively via compose.
2026-02-17 23:44:35 +01:00
Username
7a271896a8 ppf-common: fix ad-hoc ansible for toolkit inventory
Add --become to ansible_cmd (needed when connecting as
ansible user). Add cd /tmp to podman_cmd so sudo -u podman
doesn't fail on inaccessible /home/ansible cwd.
2026-02-17 23:38:13 +01:00
Username
8779979780 tools: use compose up -d for ppf-service restart 2026-02-17 23:23:10 +01:00
Username
195d25c653 tools: use compose up -d instead of restart in handler
compose restart reuses the existing container config; up -d
recreates from compose.yml, picking up changes like renamed
CLI flags.
2026-02-17 23:22:33 +01:00
Username
9b8be9d302 tools: use toolkit inventory for all ansible commands
Route ansible_cmd through ppf inventory instead of /opt/ansible
default. Eliminates dynamic inventory warnings and connects
via WireGuard IPs.
2026-02-17 23:22:29 +01:00
Username
9eff4496d6 docs: update README and ROADMAP for playbook deployment 2026-02-17 23:19:59 +01:00
Username
b1de91a969 docs: update CLAUDE.md for playbook-based deployment
Document WireGuard connectivity, playbook architecture, --check
flag, parallel execution, and updated ad-hoc ansible commands
using toolkit inventory.
2026-02-17 23:19:54 +01:00
Username
df2078c7f7 tools: fix symlink resolution in ppf-logs and ppf-service 2026-02-17 23:18:50 +01:00
Username
782deab95d tools: rewrite ppf-deploy as playbook wrapper
Replace sequential ansible ad-hoc calls with ansible-playbook.
Add ansible_playbook_cmd to shared library. Supports --check
for dry runs.
2026-02-17 23:18:46 +01:00
Username
8208670fc1 tools: add ansible deploy playbook
Parallel execution across hosts, handler-based restart on change,
role-aware paths via group_vars. Connects over WireGuard with
dedicated inventory and SSH key.
2026-02-17 23:18:41 +01:00
Username
d902ecafff docs: add tools to ROADMAP.md file reference 2026-02-17 22:53:01 +01:00
Username
fdb761f9f1 docs: add operations toolkit to README.md 2026-02-17 22:52:58 +01:00
Username
12f6b1d8eb docs: update CLAUDE.md for operations toolkit
Replace verbose ansible deployment commands with ppf-deploy,
ppf-logs, and ppf-service references. Keep raw ansible only
for ad-hoc config operations not covered by tools.
2026-02-17 22:52:54 +01:00
Username
1f14173595 tools: add ppf-service
Status, start, stop, restart for PPF containers. Status includes
compose ps, master health check, and worker API query.
2026-02-17 22:50:42 +01:00
Username
2128814a41 tools: add ppf-logs
View container logs with -f follow and -n line count. Resolves
dynamic UID and container name per node role.
2026-02-17 22:50:38 +01:00
Username
7f59cae05c tools: add ppf-deploy
Validates syntax, rsyncs code, copies compose files, fixes ownership,
restarts containers. Supports --no-restart and per-host targeting.
2026-02-17 22:50:34 +01:00
Username
9b7ca20728 tools: add shared library ppf-common.sh
Host topology, ansible/podman/compose wrappers, color helpers,
syntax validation, and target resolution for the PPF ops toolkit.
2026-02-17 22:50:30 +01:00
Username
82c909d7c0 rename --worker-v2 to --worker
No V1 means no need for the suffix. Update flag, function name,
compose command, log messages, and docs.
2026-02-17 22:30:09 +01:00
Username
cb52a978e9 todo: mark V1 worker deprecation complete 2026-02-17 22:13:49 +01:00
Username
224d3642f9 config: remove V1 worker options
Drop --worker flag, batch_size, and claim_timeout. V2 is the
only worker protocol; --worker-v2 and --register remain.
2026-02-17 22:13:31 +01:00
Username
d184dc2926 httpd: remove V1 work distribution and result submission
Drop _work_claims tracking, claim_work(), submit_results(),
get_due_proxy_count(), calculate_fair_batch_size(), and the
/api/work + /api/results endpoint handlers.
2026-02-17 22:12:57 +01:00
Username
2782e6d754 ppf: remove V1 worker functions and main loop
Drop worker_get_work(), worker_submit_results(), and the entire
worker_main() V1 loop. Rewire --register to use worker_v2_main().
2026-02-17 22:10:38 +01:00
18 changed files with 953 additions and 842 deletions

CLAUDE.md (217 lines changed)

@@ -6,7 +6,7 @@
┌──────────┬─────────────┬────────────────────────────────────────────────────────┐
│ Host │ Role │ Notes
├──────────┼─────────────┼────────────────────────────────────────────────────────┤
│ odin │ Master │ Scrapes proxy lists, verifies conflicts, port 8081
│ odin │ Master │ API server, serves URLs to workers, port 8081
│ cassius │ Worker │ Tests proxies, reports to master via WireGuard
│ edge │ Worker │ Tests proxies, reports to master via WireGuard
│ sentinel │ Worker │ Tests proxies, reports to master via WireGuard
@@ -15,8 +15,8 @@
### Role Separation
- **Odin (Master)**: Scrapes proxy sources, does verification tests only. No routine testing. Local Tor only.
- **Workers**: All routine proxy testing. Each uses only local Tor (127.0.0.1:9050).
- **Odin (Master)**: API server only. No proxy testing, no URL cycling. Serves `/api/claim-urls` to workers, receives results. Local Tor only.
- **Workers**: All URL fetching (via `/api/claim-urls`) and proxy testing. Each uses only local Tor (127.0.0.1:9050).
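For illustration, the worker side of this split reduces to a claim/report loop against the master API. A minimal sketch with `curl` (the endpoints and the `key`/`count` query parameters are confirmed elsewhere in this change; `$WORKER_KEY` and `$REPORT_JSON` are placeholders, and the report payload shape is an assumption):
```bash
MASTER=http://10.200.1.250:8081

# Claim a batch of URLs to fetch through Tor (GET /api/claim-urls, params: key, count)
curl -s "$MASTER/api/claim-urls?key=$WORKER_KEY&count=5"

# Report extracted/working proxies back (POST /api/report-proxies; JSON body assumed)
curl -s -X POST -H 'Content-Type: application/json' \
     -d "$REPORT_JSON" "$MASTER/api/report-proxies?key=$WORKER_KEY"
```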
## CRITICAL: Directory Structure Differences
@@ -25,95 +25,103 @@
│ Host │ Code Location │ Container Mount
├──────────┼─────────────────────────┼──────────────────────────────────────────┤
│ odin │ /home/podman/ppf/*.py │ Mounts ppf/ directly to /app
│ workers │ /home/podman/ppf/src/ │ Mounts ppf/src/ to /app (via systemd)
│ workers │ /home/podman/ppf/src/ │ Mounts ppf/src/ to /app (via compose)
└──────────┴─────────────────────────┴──────────────────────────────────────────┘
```
**ODIN uses root ppf/ directory. WORKERS use ppf/src/ subdirectory.**
## Host Access
## Operations Toolkit
**ALWAYS use Ansible from `/opt/ansible` with venv activated:**
All deployment and service management is handled by `tools/`:
```
tools/
lib/ppf-common.sh shared library (hosts, wrappers, colors)
ppf-deploy deploy wrapper (local validation + playbook)
ppf-logs view container logs
ppf-service manage containers (status/start/stop/restart)
ppf-db database operations (stats/purge-proxies/vacuum)
playbooks/
deploy.yml ansible playbook (sync, compose, restart)
inventory.ini hosts with WireGuard IPs + SSH key
group_vars/
all.yml shared vars (ppf_base, ppf_owner)
master.yml odin paths + compose file
workers.yml worker paths + compose file
```
Symlinked to `~/.local/bin/` for direct use.
### Connectivity
All tools connect over WireGuard (`10.200.1.0/24`) as user `ansible`
with the SSH key at `/opt/ansible/secrets/ssh/ansible`.
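A quick manual check of the same path the tools use (sketch; the IP is cassius's WireGuard address from the toolkit inventory):
```bash
ssh -i /opt/ansible/secrets/ssh/ansible ansible@10.200.1.13 hostname
```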
### Deployment
`ppf-deploy` validates syntax locally, then runs the Ansible playbook.
Hosts execute in parallel; containers restart only when files change.
```bash
ppf-deploy # all nodes: validate, sync, restart
ppf-deploy odin # master only
ppf-deploy workers # cassius, edge, sentinel
ppf-deploy cassius edge # specific hosts
ppf-deploy --no-restart # sync only, skip restart
ppf-deploy --check # dry run (ansible --check --diff)
ppf-deploy -v # verbose ansible output
```
Playbook steps (per host, in parallel):
1. Rsync `*.py` + `servers.txt` (role-aware destination via group_vars)
2. Copy compose file per role (`compose.master.yml` / `compose.worker.yml`)
3. Fix ownership (`podman:podman`, recursive)
4. Restart containers via handler (only if files changed)
5. Show container status
### Container Logs
```bash
ppf-logs # last 40 lines from odin
ppf-logs cassius # specific worker
ppf-logs -f edge # follow mode
ppf-logs -n 100 sentinel # last N lines
```
### Service Management
```bash
ppf-service status # all nodes: compose ps + health
ppf-service status workers # workers only
ppf-service restart odin # restart master
ppf-service stop cassius # stop specific worker
ppf-service start workers # start all workers
```
### Database Management
```bash
ppf-db stats # proxy and URL counts
ppf-db purge-proxies # stop odin, delete all proxies, restart
ppf-db vacuum # reclaim disk space
```
### Direct Ansible (for operations not covered by tools)
Use the toolkit inventory for ad-hoc commands over WireGuard:
```bash
cd /opt/ansible && source venv/bin/activate
```
### Quick Reference Commands
```bash
# Check worker status
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "hostname"
INV=/home/user/git/ppf/tools/playbooks/inventory.ini
# Check worker config
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m shell -a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini"
# Check worker logs (dynamic UID)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius -m raw \
-a "uid=\$(id -u podman) && sudo -u podman podman logs --tail 20 ppf-worker"
ansible -i $INV workers -m shell \
-a "grep -E 'threads|timeout|ssl' /home/podman/ppf/config.ini"
# Modify config option
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m lineinfile -a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'"
# Restart workers via compose
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m raw \
-a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
```
## Full Deployment Procedure
All hosts use `podman-compose` with `compose.yml` for container management.
Rsync deploys code; compose handles restart.
### Step 1: Validate Syntax Locally
```bash
cd /home/user/git/ppf
for f in *.py; do python3 -m py_compile "$f" && echo "OK: $f"; done
```
### Step 2: Deploy to ALL Hosts
```bash
cd /opt/ansible && source venv/bin/activate
# Deploy to ODIN (root ppf/ directory + compose.master.yml as compose.yml)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m synchronize \
-a "src=/home/user/git/ppf/ dest=/home/podman/ppf/ rsync_opts='--include=*.py,--include=servers.txt,--include=Dockerfile,--exclude=*'"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m copy \
-a "src=/home/user/git/ppf/compose.master.yml dest=/home/podman/ppf/compose.yml owner=podman group=podman"
# Deploy to WORKERS (ppf/src/ subdirectory + compose.worker.yml as compose.yml)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m synchronize \
-a "src=/home/user/git/ppf/ dest=/home/podman/ppf/src/ rsync_opts='--include=*.py,--include=servers.txt,--include=Dockerfile,--exclude=*'"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m copy \
-a "src=/home/user/git/ppf/compose.worker.yml dest=/home/podman/ppf/compose.yml owner=podman group=podman"
# CRITICAL: Fix ownership on ALL hosts (rsync uses ansible user, containers need podman)
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \
-a "chown -R podman:podman /home/podman/ppf/"
```
**Note:** Ownership must be fixed after every deploy. rsync runs as ansible user, but containers run as podman user. Missing ownership fix causes `ImportError: No module named X` errors.
### Step 3: Restart Services
```bash
# Restart ODIN via compose
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m raw \
-a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
# Restart WORKERS via compose
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible cassius,edge,sentinel -m raw \
-a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
```
### Step 4: Verify All Running
```bash
# Check all hosts via compose
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \
-a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose ps"
ansible -i $INV workers -m lineinfile \
-a "path=/home/podman/ppf/config.ini line='ssl_only = 1' insertafter='ssl_first'"
```
## Podman User IDs
@@ -140,11 +148,15 @@ ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin,cassius,edge,sentinel -m raw \
tor_hosts = 127.0.0.1:9050 # Local Tor ONLY
[watchd]
threads = 0 # NO routine testing
database = data/ppf.sqlite
threads = 0 # NO proxy testing
database = data/proxies.sqlite
[ppf]
threads = 0 # NO URL cycling (workers handle it)
database = data/websites.sqlite
[scraper]
threads = 10
enabled = 0 # Disabled on master
```
### Worker config.ini
@@ -188,20 +200,15 @@ batch_size = clamp(fair_share, min=100, max=1000)
- Workers shuffle their batch locally to avoid testing same proxies simultaneously
- Claims expire after 5 minutes if not completed
## Worker Container
## Container Management
Workers run as podman containers with `--restart=unless-stopped`:
All nodes run via `podman-compose` with role-specific compose files:
```bash
podman run -d --name ppf-worker --network=host --restart=unless-stopped \
-e PYTHONUNBUFFERED=1 \
-v /home/podman/ppf/src:/app:ro,Z \
-v /home/podman/ppf/data:/app/data:Z \
-v /home/podman/ppf/config.ini:/app/config.ini:ro,Z \
-v /home/podman/ppf/servers.txt:/app/servers.txt:ro,Z \
localhost/ppf-worker:latest \
python -u ppf.py --worker --server http://10.200.1.250:8081
```
- **Odin**: `compose.master.yml` -> deployed as `compose.yml`
- **Workers**: `compose.worker.yml` -> deployed as `compose.yml`
Containers are managed exclusively through compose. No systemd user
services or standalone `podman run` commands.
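Run by hand on a node, compose follows the same pattern the toolkit wrappers and the deploy playbook use, with the rootless `podman` user's runtime dir resolved from its dynamic UID:
```bash
# As root on the node: run compose as the podman user
uid=$(id -u podman)
sudo -u podman bash -c "export XDG_RUNTIME_DIR=/run/user/$uid && cd /home/podman/ppf && podman-compose ps"
```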
## Rebuilding Images
@@ -234,10 +241,9 @@ ansible odin -m raw \
### Missing servers.txt
Workers need `servers.txt` in src/:
Redeploy syncs `servers.txt` automatically:
```bash
ansible cassius,edge,sentinel -m copy \
-a "src=/home/user/git/ppf/servers.txt dest=/home/podman/ppf/src/servers.txt owner=podman group=podman"
ppf-deploy workers
```
### Exit Code 126 (Permission/Storage)
@@ -249,22 +255,17 @@ sudo -u podman podman system reset --force
### Dashboard Shows NaN or Missing Data
Odin likely running old code. Redeploy to odin:
Odin likely running old code:
```bash
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m synchronize \
-a "src=/home/user/git/ppf/ dest=/home/podman/ppf/ rsync_opts='--include=*.py,--include=servers.txt,--exclude=*'"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m raw -a "chown -R podman:podman /home/podman/ppf/"
ANSIBLE_REMOTE_TMP=/tmp/.ansible ansible odin -m raw \
-a "uid=\$(id -u podman) && cd /home/podman/ppf && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid podman-compose restart"
ppf-deploy odin
```
### Worker Keeps Crashing
1. Check container status: `sudo -u podman podman ps -a`
2. Check logs: `sudo -u podman podman logs --tail 50 ppf-worker`
3. Verify servers.txt exists in src/
4. Check ownership: `ls -la /home/podman/ppf/src/`
5. Run manually to see error:
1. Check status: `ppf-service status workers`
2. Check logs: `ppf-logs -n 50 cassius`
3. Redeploy (fixes ownership + servers.txt): `ppf-deploy cassius`
4. If still failing, run manually on the host to see error:
```bash
sudo -u podman podman run --rm --network=host \
-v /home/podman/ppf/src:/app:ro,Z \

View File

@@ -197,46 +197,39 @@ stale_count INT -- checks without new proxies
## Deployment
### Systemd Service
```ini
[Unit]
Description=PPF Python Proxy Finder
After=network-online.target tor.service
Wants=network-online.target
[Service]
Type=simple
User=ppf
WorkingDirectory=/opt/ppf
# ppf.py is the main entry point (runs harvester + validator)
ExecStart=/usr/bin/python2 ppf.py
Restart=on-failure
RestartSec=30
[Install]
WantedBy=multi-user.target
```
### Container Deployment
All nodes use `podman-compose` with role-specific compose files
(rootless, as `podman` user). `--network=host` required for Tor
access at 127.0.0.1:9050.
```sh
# Build
# Build image
podman build -t ppf:latest .
# Run with persistent storage
# IMPORTANT: Use ppf.py as entry point (runs both harvester + validator)
podman run -d --name ppf \
--network=host \
-v ./data:/app/data:Z \
-v ./config.ini:/app/config.ini:ro \
ppf:latest python ppf.py
# Start via compose
podman-compose up -d
# Generate systemd unit
podman generate systemd --name ppf --files --new
# View logs / stop
podman-compose logs -f
podman-compose down
```
Note: `--network=host` required for Tor access at 127.0.0.1:9050.
### Operations Toolkit
The `tools/` directory provides CLI wrappers for multi-node operations.
Deployment uses an Ansible playbook over WireGuard for parallel execution
and handler-based restarts.
```sh
ppf-deploy [targets...] # validate + deploy + restart (playbook)
ppf-deploy --check # dry run with diff
ppf-logs [node] # view container logs (-f to follow)
ppf-service <cmd> [nodes...] # status / start / stop / restart
ppf-db <cmd> # stats / purge-proxies / vacuum
```
See `--help` on each tool.
## Troubleshooting

View File

@@ -89,3 +89,9 @@ PPF (Proxy Fetcher) is a Python 2 proxy scraping and validation framework design
| job.py | Priority job queue |
| static/dashboard.js | Dashboard frontend logic |
| static/dashboard.html | Dashboard HTML template |
| tools/lib/ppf-common.sh | Shared ops library (hosts, wrappers, colors) |
| tools/ppf-deploy | Deploy wrapper (validation + playbook) |
| tools/ppf-logs | View container logs |
| tools/ppf-service | Container lifecycle management |
| tools/playbooks/deploy.yml | Ansible deploy playbook |
| tools/playbooks/inventory.ini | Host inventory (WireGuard IPs) |

TODO.md (10 lines changed)

@@ -51,13 +51,11 @@ Optimize only if memory becomes a constraint.
## Deprecation
### [ ] Remove V1 worker protocol
### [x] Remove V1 worker protocol
- V2 workers (URL-driven) are the standard; no V1 workers remain active
- Remove `--worker` flag and V1 code path in ppf.py
- Remove `/api/claim`, `/api/submit` V1 endpoints in httpd.py
- Remove V1 heartbeat/registration handling
- Clean up any V1-specific state tracking in proxywatchd.py
Completed. Removed `--worker` flag, `worker_main()`, `claim_work()`,
`submit_results()`, `/api/work`, `/api/results`, and related config
options. `--worker` now routes to the URL-driven protocol.
---

View File

@@ -35,4 +35,4 @@ services:
- ./data:/app/data:Z
- ./config.ini:/app/config.ini:ro,Z
- ./servers.txt:/app/servers.txt:ro,Z
command: python -u ppf.py --worker-v2 --server ${PPF_MASTER_URL:-http://10.200.1.250:8081}
command: python -u ppf.py --worker --server ${PPF_MASTER_URL:-http://10.200.1.250:8081}

View File

@@ -167,20 +167,17 @@ class Config(ComboParser):
self.add_item(section, 'spot_check_pct', float, 1.0, 'percent of working proxies to spot-check (default: 1.0)', False)
section = 'worker'
self.add_item(section, 'batch_size', int, 100, 'proxies per work batch (default: 100)', False)
self.add_item(section, 'heartbeat', int, 60, 'heartbeat interval in seconds (default: 60)', False)
self.add_item(section, 'claim_timeout', int, 300, 'seconds before unclaimed work is released (default: 300)', False)
self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle for V2 mode (default: 5)', False)
self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching in V2 mode (default: 30)', False)
self.add_item(section, 'url_batch_size', int, 5, 'URLs per claim cycle (default: 5)', False)
self.add_item(section, 'fetch_timeout', int, 30, 'timeout for URL fetching (default: 30)', False)
self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False)
self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False)
self.aparser.add_argument("-q", "--quiet", help="suppress info messages, show warnings and errors only", action='store_true', default=False)
self.aparser.add_argument("-v", "--verbose", help="show debug messages", action='store_true', default=False)
self.aparser.add_argument("--profile", help="enable cProfile profiling, output to profile.stats", action='store_true', default=False)
self.aparser.add_argument("--worker", help="run as worker node", action='store_true', default=False)
self.aparser.add_argument("--server", help="master server URL (e.g., https://master:8081)", type=str, default='')
self.aparser.add_argument("--worker-key", help="worker authentication key", type=str, default='')
self.aparser.add_argument("--register", help="register as worker with master server", action='store_true', default=False)
self.aparser.add_argument("--worker-name", help="worker name for registration (default: hostname)", type=str, default='')
self.aparser.add_argument("--worker-v2", help="run as V2 worker (URL-driven fetching)", action='store_true', default=False)
self.aparser.add_argument("--worker", help="run as worker node", action='store_true', default=False)

httpd.py (338 lines changed)

@@ -73,11 +73,8 @@ import string
_workers = {} # worker_id -> {name, ip, last_seen, jobs_completed, proxies_tested, ...}
_workers_lock = threading.Lock()
_work_claims = {} # proxy_key -> {worker_id, claimed_at}
_work_claims_lock = threading.Lock()
_worker_keys = set() # valid API keys
_master_key = None # master key for worker registration
_claim_timeout = 300 # seconds before unclaimed work is released
_workers_file = 'data/workers.json' # persistent storage
# URL claim tracking (parallel to proxy claims)
@@ -101,11 +98,6 @@ _worker_test_history = {}
_worker_test_history_lock = threading.Lock()
_test_history_window = 120 # seconds to keep test history for rate calculation
# Fair distribution settings
_min_batch_size = 1 # minimum proxies per batch
_max_batch_size = 10000 # maximum proxies per batch
_worker_timeout = 120 # seconds before worker considered inactive
# Session tracking
_session_start_time = int(time.time()) # when httpd started
@@ -171,53 +163,6 @@ def _build_due_condition():
return condition, params
def get_active_worker_count():
"""Count workers seen within timeout window."""
now = time.time()
with _workers_lock:
return sum(1 for w in _workers.values()
if (now - w.get('last_seen', 0)) < _worker_timeout)
def get_due_proxy_count(db):
"""Count proxies due for testing (not claimed)."""
with _work_claims_lock:
claimed_count = len(_work_claims)
try:
condition, params = _build_due_condition()
query = 'SELECT COUNT(*) FROM proxylist WHERE ' + condition
result = db.execute(query, params).fetchone()
total_due = result[0] if result else 0
return max(0, total_due - claimed_count)
except Exception:
return 0
def calculate_fair_batch_size(db, worker_id):
"""Calculate fair batch size based on active workers and queue size.
Divides due work evenly among active workers. No artificial floor —
if only 6 proxies are due with 3 workers, each gets 2.
"""
active_workers = max(1, get_active_worker_count())
due_count = get_due_proxy_count(db)
if due_count == 0:
return 0
# Fair share: divide due work evenly among active workers
fair_share = max(1, int(due_count / active_workers))
# Clamp to upper bound only
batch_size = min(fair_share, _max_batch_size)
_log('fair_batch: due=%d workers=%d share=%d batch=%d' % (
due_count, active_workers, fair_share, batch_size), 'debug')
return batch_size
def load_workers():
"""Load worker registry from disk."""
global _workers, _worker_keys
@@ -355,101 +300,6 @@ def get_worker_test_rate(worker_id):
return 0.0
return total_tests / elapsed
def claim_work(db, worker_id, count=100):
"""Claim a batch of proxies for testing. Returns list of proxy dicts."""
now = time.time()
now_int = int(now)
# Calculate fair batch size based on active workers and queue size
# Distributes work evenly: due_proxies / active_workers (with bounds)
target_count = calculate_fair_batch_size(db, worker_id)
# Clean up stale claims and get current claimed set
with _work_claims_lock:
stale = [k for k, v in _work_claims.items() if now - v['claimed_at'] > _claim_timeout]
for k in stale:
del _work_claims[k]
# Copy current claims to exclude from query
claimed_keys = set(_work_claims.keys())
# Get proxies that need testing
# Priority: untested first, then oldest due - with randomness within tiers
try:
# Build exclusion clause for already-claimed proxies
# Use ip||':'||port to match our claim key format
if claimed_keys:
# SQLite placeholder limit is ~999, chunk if needed
placeholders = ','.join('?' for _ in claimed_keys)
exclude_clause = "AND (ip || ':' || port) NOT IN (%s)" % placeholders
exclude_params = list(claimed_keys)
else:
exclude_clause = ""
exclude_params = []
# Build due condition using new schedule formula
due_condition, due_params = _build_due_condition()
# Priority tiers: 0=untested, 1=very overdue (>1hr), 2=recently due
# Calculate overdue time based on new formula
if _fail_retry_backoff:
overdue_calc = '''
CASE WHEN failed = 0
THEN ? - (tested + ?)
ELSE ? - (tested + (failed * ?))
END
'''
priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
else:
overdue_calc = '''
CASE WHEN failed = 0
THEN ? - (tested + ?)
ELSE ? - (tested + ?)
END
'''
priority_params = [now_int, _working_checktime, now_int, _fail_retry_interval]
query = '''
SELECT ip, port, proto, failed, source_proto,
CASE
WHEN tested IS NULL THEN 0
WHEN (%s) > 3600 THEN 1
ELSE 2
END as priority
FROM proxylist
WHERE %s
%s
ORDER BY priority, RANDOM()
LIMIT ?
''' % (overdue_calc, due_condition, exclude_clause)
params = priority_params + list(due_params) + exclude_params + [target_count]
rows = db.execute(query, params).fetchall()
except Exception as e:
_log('claim_work query error: %s' % e, 'error')
return []
# Claim the fetched proxies (already filtered by query)
claimed = []
with _work_claims_lock:
for row in rows:
proxy_key = '%s:%s' % (row[0], row[1])
# Double-check not claimed (race condition protection)
if proxy_key not in _work_claims:
_work_claims[proxy_key] = {'worker_id': worker_id, 'claimed_at': now}
claimed.append({
'ip': row[0],
'port': row[1],
'proto': row[2],
'failed': row[3],
'source_proto': row[4],
})
if claimed:
_log('claim_work: %d proxies to %s (pool: %d claimed)' % (
len(claimed), worker_id[:8], len(_work_claims)), 'info')
return claimed
def claim_urls(url_db, worker_id, count=5):
"""Claim a batch of URLs for worker-driven fetching. Returns list of URL dicts.
@@ -795,126 +645,6 @@ def submit_proxy_reports(db, worker_id, proxies):
return processed
_last_workers_save = 0
def submit_results(db, worker_id, results):
"""Process test results from a worker. Returns count of processed results."""
global _last_workers_save
processed = 0
working_count = 0
total_latency = 0
now = time.time()
with _workers_lock:
if worker_id in _workers:
_workers[worker_id]['last_seen'] = now
for r in results:
proxy_key = '%s:%s' % (r.get('ip', ''), r.get('port', ''))
# Release claim
with _work_claims_lock:
if proxy_key in _work_claims:
del _work_claims[proxy_key]
# Update database - trust workers, add missing proxies if working
try:
working = 1 if r.get('working') else 0
latency_ms = r.get('latency', 0) if working else None
error_cat = r.get('error_category') if not working else None
if working:
# Use INSERT OR REPLACE to add working proxies that don't exist
db.execute('''
INSERT INTO proxylist (proxy, ip, port, proto, failed, tested, avg_latency, added)
VALUES (?, ?, ?, ?, 0, ?, ?, ?)
ON CONFLICT(proxy) DO UPDATE SET
failed = 0,
tested = excluded.tested,
avg_latency = excluded.avg_latency
''', (proxy_key, r['ip'], r['port'], r.get('proto', 'http'), int(now),
latency_ms, int(now)))
working_count += 1
total_latency += latency_ms or 0
# Geolocate working proxy if IP2Location available
if _geolite and _geodb:
try:
rec = _geodb.get_all(r['ip'])
if rec and rec.country_short and rec.country_short != '-':
db.execute(
'UPDATE proxylist SET country=? WHERE proxy=?',
(rec.country_short, proxy_key))
except Exception:
pass # Geolocation is best-effort
else:
# For failures, only update if exists (don't add non-working proxies)
db.execute('''
UPDATE proxylist SET
failed = failed + 1,
tested = ?
WHERE ip = ? AND port = ?
''', (int(now), r['ip'], r['port']))
# Record result for verification system
insert_proxy_result(db, proxy_key, worker_id, working,
latency_ms=latency_ms, error_category=error_cat)
# Check for disagreement with other workers
disagreement, other_worker, other_result = check_for_disagreement(
db, proxy_key, worker_id, working)
if disagreement:
# Queue for manager verification (priority 3 = high)
queue_verification(db, proxy_key, 'disagreement', priority=3,
worker_a=worker_id, worker_b=other_worker,
result_a=working, result_b=other_result)
elif working:
# Check for resurrection: was dead (failed >= 3), now working
row = db.execute(
'SELECT failed FROM proxylist WHERE proxy = ?', (proxy_key,)
).fetchone()
if row and row[0] >= 3:
queue_verification(db, proxy_key, 'resurrection', priority=3,
worker_a=worker_id, result_a=1)
else:
# Check for sudden death: was working (consecutive_success >= 3), now failed
row = db.execute(
'SELECT consecutive_success FROM proxylist WHERE proxy = ?', (proxy_key,)
).fetchone()
if row and row[0] and row[0] >= 3:
queue_verification(db, proxy_key, 'sudden_death', priority=2,
worker_a=worker_id, result_a=0)
processed += 1
except Exception as e:
_log('submit_results db error for %s: %s' % (proxy_key, e), 'error')
# Update worker stats
with _workers_lock:
if worker_id in _workers:
w = _workers[worker_id]
w['jobs_completed'] += 1
w['proxies_tested'] += processed
w['proxies_working'] = w.get('proxies_working', 0) + working_count
w['proxies_failed'] = w.get('proxies_failed', 0) + (processed - working_count)
w['total_latency'] = w.get('total_latency', 0) + total_latency
w['last_batch_size'] = len(results)
w['last_batch_working'] = working_count
# Commit database changes
db.commit()
# Record for test rate calculation
record_test_rate(worker_id, processed)
# Save workers periodically (every 60s)
if now - _last_workers_save > 60:
save_workers()
_last_workers_save = now
return processed
def is_localhost(ip):
"""Check if IP is localhost (127.0.0.0/8 or ::1)."""
if not ip:
@@ -1605,7 +1335,7 @@ class ProxyAPIServer(threading.Thread):
return [b'Method not allowed']
# POST only allowed for worker API endpoints
post_endpoints = ('/api/register', '/api/results', '/api/heartbeat',
post_endpoints = ('/api/register', '/api/heartbeat',
'/api/report-urls', '/api/report-proxies')
if method == 'POST' and path not in post_endpoints:
start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')])
@@ -1673,8 +1403,6 @@ class ProxyAPIServer(threading.Thread):
'/api/stats': 'runtime statistics (JSON)',
'/api/mitm': 'MITM certificate statistics (JSON)',
'/api/countries': 'proxy counts by country (JSON)',
'/api/work': 'get work batch for worker (params: key, count)',
'/api/results': 'submit test results (POST, params: key)',
'/api/register': 'register as worker (POST)',
'/api/workers': 'list connected workers',
'/api/claim-urls': 'claim URL batch for worker-driven fetching (GET, params: key, count)',
@@ -1860,7 +1588,7 @@ class ProxyAPIServer(threading.Thread):
rows = db.execute(sql, args).fetchall()
if fmt == 'plain':
return '\n'.join('%s:%s' % (r[0], r[1]) for r in rows), 'text/plain', 200
return '\n'.join('%s://%s:%s' % (r[2] or 'http', r[0], r[1]) for r in rows), 'text/plain', 200
proxies = [{
'proxy': '%s:%s' % (r[0], r[1]), 'proto': r[2], 'country': r[3],
'asn': r[4], 'latency': r[5],
@@ -1941,54 +1669,6 @@ class ProxyAPIServer(threading.Thread):
'message': 'registered successfully',
}), 'application/json', 200
elif path == '/api/work':
# Get batch of proxies to test (GET)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
count = int(query_params.get('count', 100))
count = min(count, 500) # Cap at 500
try:
db = mysqlite.mysqlite(self.database, str)
proxies = claim_work(db, worker_id, count)
update_worker_heartbeat(worker_id)
return json.dumps({
'worker_id': worker_id,
'count': len(proxies),
'proxies': proxies,
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/results':
# Submit test results (POST)
key = query_params.get('key', '')
if not validate_worker_key(key):
return json.dumps({'error': 'invalid worker key'}), 'application/json', 403
worker_id, _ = get_worker_by_key(key)
if not worker_id:
return json.dumps({'error': 'worker not found'}), 'application/json', 404
if not post_data:
return json.dumps({'error': 'POST body required'}), 'application/json', 400
results = post_data.get('results', [])
if not results:
return json.dumps({'error': 'no results provided'}), 'application/json', 400
working = sum(1 for r in results if r.get('working'))
_log('results: %d from %s (%d working)' % (len(results), worker_id[:8], working), 'info')
try:
db = mysqlite.mysqlite(self.database, str)
processed = submit_results(db, worker_id, results)
return json.dumps({
'worker_id': worker_id,
'processed': processed,
'message': 'results submitted',
}), 'application/json', 200
except Exception as e:
return json.dumps({'error': str(e)}), 'application/json', 500
elif path == '/api/heartbeat':
# Worker heartbeat with Tor status (POST)
key = query_params.get('key', '')
@@ -2132,11 +1812,8 @@ class ProxyAPIServer(threading.Thread):
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
due_params).fetchone()
due_total = row[0] if row else 0
# Subtract currently claimed
with _work_claims_lock:
claimed_count = len(_work_claims)
stats['due'] = max(0, due_total - claimed_count)
stats['claimed'] = claimed_count
stats['due'] = due_total
stats['claimed'] = 0
except Exception as e:
_log('_get_db_stats error: %s' % e, 'warn')
return stats
@@ -2218,16 +1895,13 @@ class ProxyAPIServer(threading.Thread):
if queue_stats['total'] > 0:
pct = 100.0 * queue_stats['session_tested'] / queue_stats['total']
queue_stats['session_pct'] = round(min(pct, 100.0), 1)
# Claimed = currently being tested by workers
with _work_claims_lock:
queue_stats['claimed'] = len(_work_claims)
queue_stats['claimed'] = 0
# Due = ready for testing (respecting cooldown)
due_condition, due_params = _build_due_condition()
row = db.execute(
'SELECT COUNT(*) FROM proxylist WHERE ' + due_condition,
due_params).fetchone()
due_total = row[0] if row else 0
queue_stats['due'] = max(0, due_total - queue_stats['claimed'])
queue_stats['due'] = row[0] if row else 0
except Exception as e:
_log('_get_workers_data queue stats error: %s' % e, 'warn')

ppf.py (370 lines changed)

@@ -303,48 +303,6 @@ class NeedReregister(Exception):
pass
def worker_get_work(server_url, worker_key, count=100):
"""Fetch batch of proxies from master."""
url = '%s/api/work?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
try:
resp = urllib2.urlopen(url, timeout=30)
result = json.loads(resp.read())
return result.get('proxies', [])
except urllib2.HTTPError as e:
if e.code == 403:
_log('worker key rejected (403), need to re-register', 'warn')
raise NeedReregister()
_log('failed to get work: %s' % e, 'error')
return []
except Exception as e:
_log('failed to get work: %s' % e, 'error')
return []
def worker_submit_results(server_url, worker_key, results):
"""Submit test results to master."""
url = '%s/api/results?key=%s' % (server_url.rstrip('/'), worker_key)
data = json.dumps({'results': results})
req = urllib2.Request(url, data)
req.add_header('Content-Type', 'application/json')
try:
resp = urllib2.urlopen(req, timeout=30)
result = json.loads(resp.read())
return result.get('processed', 0)
except urllib2.HTTPError as e:
if e.code == 403:
_log('worker key rejected (403), need to re-register', 'warn')
raise NeedReregister()
_log('failed to submit results: %s' % e, 'error')
return 0
except Exception as e:
_log('failed to submit results: %s' % e, 'error')
return 0
def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False, threads=0):
"""Send heartbeat with Tor status to master."""
url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key)
@@ -370,7 +328,7 @@ def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling
def worker_claim_urls(server_url, worker_key, count=5):
"""Claim batch of URLs for V2 worker mode."""
"""Claim batch of URLs for worker mode."""
url = '%s/api/claim-urls?key=%s&count=%d' % (server_url.rstrip('/'), worker_key, count)
try:
@@ -471,310 +429,7 @@ def check_tor_connectivity(tor_hosts):
def worker_main(config):
"""Worker mode main loop - uses proxywatchd multi-threaded testing."""
import json
global urllib2
try:
import Queue
except ImportError:
import queue as Queue
# Import proxywatchd for multi-threaded testing (gevent already patched at top)
import proxywatchd
proxywatchd.set_config(config)
server_url = config.args.server
if not server_url:
_log('--server URL required for worker mode', 'error')
sys.exit(1)
worker_key = config.args.worker_key
worker_name = config.args.worker_name or os.uname()[1]
batch_size = config.worker.batch_size
num_threads = config.watchd.threads
worker_id = None
# Register if --register flag or no key provided
if config.args.register or not worker_key:
_log('registering with master: %s' % server_url, 'info')
worker_id, worker_key = worker_register(server_url, worker_name)
if not worker_key:
_log('registration failed, exiting', 'error')
sys.exit(1)
_log('registered as %s (id: %s)' % (worker_name, worker_id), 'info')
_log('worker key: %s' % worker_key, 'info')
_log('save this key with --worker-key for future runs', 'info')
if config.args.register:
# Just register and exit
return
_log('starting worker mode', 'info')
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' batch size: %d' % batch_size, 'info')
_log(' tor hosts: %s' % config.common.tor_hosts, 'info')
# Verify Tor connectivity before claiming work
import socket
import socks
working_tor_hosts = []
for tor_host in config.torhosts:
host, port = tor_host.split(':')
port = int(port)
try:
# Test SOCKS connection
test_sock = socks.socksocket()
test_sock.set_proxy(socks.SOCKS5, host, port)
test_sock.settimeout(10)
test_sock.connect(('check.torproject.org', 80))
test_sock.send(b'GET / HTTP/1.0\r\nHost: check.torproject.org\r\n\r\n')
resp = test_sock.recv(512)
test_sock.close()
# Accept any HTTP response (200, 301, 302, etc.)
if resp and (b'HTTP/' in resp or len(resp) > 0):
status = resp.split(b'\r\n')[0] if b'\r\n' in resp else resp[:50]
_log('tor host %s:%d OK (%s)' % (host, port, status), 'info')
working_tor_hosts.append(tor_host)
else:
_log('tor host %s:%d no response (recv=%d bytes)' % (host, port, len(resp) if resp else 0), 'warn')
except Exception as e:
_log('tor host %s:%d failed: %s' % (host, port, e), 'warn')
if not working_tor_hosts:
_log('no working Tor hosts, cannot start worker', 'error')
sys.exit(1)
_log('%d/%d Tor hosts verified' % (len(working_tor_hosts), len(config.torhosts)), 'info')
# Create shared queues for worker threads
job_queue = proxywatchd.PriorityJobQueue()
completion_queue = Queue.Queue()
# Spawn worker threads with stagger to avoid overwhelming Tor
threads = []
for i in range(num_threads):
wt = proxywatchd.WorkerThread('w%d' % i, job_queue)
wt.start_thread()
threads.append(wt)
time.sleep(random.random() / 10) # 0-100ms stagger per thread
_log('spawned %d worker threads' % len(threads), 'info')
jobs_completed = 0
proxies_tested = 0
start_time = time.time()
current_tor_ip = None
consecutive_tor_failures = 0
worker_profiling = config.args.profile or config.common.profiling
# Use dict to allow mutation in nested function (Python 2 compatible)
wstate = {'worker_key': worker_key, 'worker_id': worker_id, 'backoff': 10}
def do_register():
"""Register with master, with exponential backoff on failure."""
while True:
_log('registering with master: %s' % server_url, 'info')
new_id, new_key = worker_register(server_url, worker_name)
if new_key:
wstate['worker_id'] = new_id
wstate['worker_key'] = new_key
wstate['backoff'] = 10 # Reset backoff on success
_log('registered as %s (id: %s)' % (worker_name, new_id), 'info')
return True
else:
_log('registration failed, retrying in %ds' % wstate['backoff'], 'warn')
time.sleep(wstate['backoff'])
wstate['backoff'] = min(wstate['backoff'] * 2, 300) # Max 5 min backoff
def wait_for_tor():
"""Wait for Tor to become available, checking every 30 seconds."""
check_interval = 30
while True:
working, tor_ip = check_tor_connectivity(config.torhosts)
if working:
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
# Send heartbeat to manager
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
return working, tor_ip
_log('tor still down, retrying in %ds' % check_interval, 'warn')
# Send heartbeat with tor_ok=False
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
time.sleep(check_interval)
try:
while True:
# Tor check before claiming work - don't claim if Tor is down
working, tor_ip = check_tor_connectivity(config.torhosts)
if not working:
consecutive_tor_failures += 1
_log('tor down before claiming work (consecutive: %d)' % consecutive_tor_failures, 'warn')
try:
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
except NeedReregister:
do_register()
if consecutive_tor_failures >= 2:
_log('tor appears down, waiting before claiming work', 'error')
working, current_tor_ip = wait_for_tor()
consecutive_tor_failures = 0
else:
time.sleep(10)
continue
else:
consecutive_tor_failures = 0
if tor_ip != current_tor_ip:
if current_tor_ip:
_log('tor circuit rotated: %s' % tor_ip, 'info')
current_tor_ip = tor_ip
# Send heartbeat to manager
try:
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
except NeedReregister:
do_register()
# Get work from master
try:
proxies = worker_get_work(server_url, wstate['worker_key'], batch_size)
except NeedReregister:
do_register()
continue
if not proxies:
_log('no work available, sleeping 30s', 'info')
time.sleep(30)
continue
_log('received %d proxies to test' % len(proxies), 'info')
# Create ProxyTestState and jobs for each proxy
pending_states = {}
all_jobs = []
# Get checktype(s) from config
checktypes = config.watchd.checktypes
for proxy_info in proxies:
ip = proxy_info['ip']
port = proxy_info['port']
proto = proxy_info.get('proto', 'http')
failed = proxy_info.get('failed', 0)
source_proto = proxy_info.get('source_proto')
proxy_str = '%s:%d' % (ip, port)
# Create state for this proxy
state = proxywatchd.ProxyTestState(
ip, port, proto, failed,
success_count=0, total_duration=0.0,
country=None, mitm=0, consecutive_success=0,
asn=None, oldies=False,
completion_queue=completion_queue,
proxy_full=proxy_str, source_proto=source_proto
)
pending_states[proxy_str] = state
# Select random checktype
checktype = random.choice(checktypes)
# Get target for this checktype
if checktype == 'judges':
available = proxywatchd.judge_stats.get_available_judges(
list(proxywatchd.judges.keys()))
target = random.choice(available) if available else random.choice(
list(proxywatchd.judges.keys()))
elif checktype == 'ssl':
target = random.choice(proxywatchd.ssl_targets)
elif checktype == 'irc':
target = random.choice(config.servers) if config.servers else 'irc.libera.chat:6667'
else: # head
target = random.choice(list(proxywatchd.regexes.keys()))
job = proxywatchd.TargetTestJob(state, target, checktype)
all_jobs.append(job)
# Shuffle and queue jobs
random.shuffle(all_jobs)
for job in all_jobs:
job_queue.put(job, priority=0)
# Wait for all jobs to complete
completed = 0
results = []
timeout_start = time.time()
timeout_seconds = config.watchd.timeout * 2 + 30 # generous timeout
while completed < len(proxies):
try:
state = completion_queue.get(timeout=1)
completed += 1
# Build result from state (failcount == 0 means success)
is_working = state.failcount == 0
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
result = {
'ip': state.ip,
'port': state.port,
'proto': state.proto,
'working': is_working,
'latency': round(latency_sec, 3) if is_working else 0,
'error': None if is_working else 'failed',
}
results.append(result)
# Progress logging
if completed % 20 == 0 or completed == len(proxies):
working = sum(1 for r in results if r.get('working'))
_log('tested %d/%d proxies (%d working)' % (
completed, len(proxies), working), 'info')
except Queue.Empty:
if time.time() - timeout_start > timeout_seconds:
_log('batch timeout, %d/%d completed' % (completed, len(proxies)), 'warn')
break
continue
# Submit results
try:
processed = worker_submit_results(server_url, wstate['worker_key'], results)
except NeedReregister:
do_register()
# Retry submission with new key
try:
processed = worker_submit_results(server_url, wstate['worker_key'], results)
except NeedReregister:
_log('still rejected after re-register, discarding batch', 'error')
processed = 0
jobs_completed += 1
proxies_tested += len(results)
working = sum(1 for r in results if r.get('working'))
_log('batch %d: %d/%d working, submitted %d' % (
jobs_completed, working, len(results), processed), 'info')
# Brief pause between batches
time.sleep(1)
except KeyboardInterrupt:
elapsed = time.time() - start_time
_log('worker stopping...', 'info')
# Stop threads
for wt in threads:
wt.stop()
for wt in threads:
wt.term()
_log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
_log(' jobs completed: %d' % jobs_completed, 'info')
_log(' proxies tested: %d' % proxies_tested, 'info')
def worker_v2_main(config):
"""V2 worker mode -- URL-driven discovery.
"""Worker mode -- URL-driven discovery.
Claims URLs from master, fetches through Tor, extracts and tests proxies,
reports working proxies back to master.
@@ -815,7 +470,7 @@ def worker_v2_main(config):
if config.args.register:
return
_log('starting worker V2 mode (URL-driven)', 'info')
_log('starting worker mode (URL-driven)', 'info')
_log(' server: %s' % server_url, 'info')
_log(' threads: %d' % num_threads, 'info')
_log(' url batch: %d' % url_batch_size, 'info')
@@ -1120,7 +775,8 @@ def worker_v2_main(config):
state = completion_queue.get(timeout=1)
completed += 1
if state.failcount == 0:
success, _ = state.evaluate()
if success:
latency_sec = (state.last_latency_ms / 1000.0) if state.last_latency_ms else 0
proxy_addr = state.proxy
if state.auth:
@@ -1168,13 +824,13 @@ def worker_v2_main(config):
except KeyboardInterrupt:
elapsed = time.time() - start_time
_log('worker V2 stopping...', 'info')
_log('worker stopping...', 'info')
session.close()
for wt in threads:
wt.stop()
for wt in threads:
wt.term()
_log('worker V2 stopped after %s' % format_duration(int(elapsed)), 'info')
_log('worker stopped after %s' % format_duration(int(elapsed)), 'info')
_log(' cycles: %d' % cycles, 'info')
_log(' urls fetched: %d' % urls_fetched, 'info')
_log(' proxies found: %d' % proxies_found, 'info')
@@ -1193,12 +849,7 @@ def main():
else:
sys.exit(1)
# V2 worker mode: URL-driven discovery
if config.args.worker_v2:
worker_v2_main(config)
return
# V1 worker mode: connect to master server instead of running locally
# Worker mode: URL-driven discovery
if config.args.worker or config.args.register:
worker_main(config)
return
@@ -1303,6 +954,11 @@ def main():
last_skip_log = 0
while True:
try:
# When ppf threads = 0, skip URL fetching (workers handle it via /api/claim-urls)
if config.ppf.threads == 0:
time.sleep(60)
continue
time.sleep(random.random()/10)
if (time.time() - statusmsg) > 180:
_log('running %d thread(s) over %d' % (len(threads), config.ppf.threads), 'ppf')

tools/lib/ppf-common.sh (new file, 170 lines)

@@ -0,0 +1,170 @@
#!/bin/bash
# ppf-common.sh -- shared library for PPF operations toolkit
# Source this file; do not execute directly.
set -eu
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
PPF_DIR="${PPF_DIR:-$HOME/git/ppf}"
ANSIBLE_DIR="/opt/ansible"
ANSIBLE_VENV="${ANSIBLE_DIR}/venv/bin/activate"
PPF_INVENTORY="${PPF_DIR}/tools/playbooks/inventory.ini"
# ---------------------------------------------------------------------------
# Host topology
# ---------------------------------------------------------------------------
MASTER="odin"
WORKERS="cassius edge sentinel"
ALL_HOSTS="odin cassius edge sentinel"
# Container names per role
MASTER_CONTAINER="ppf"
WORKER_CONTAINER="ppf-worker"
# ---------------------------------------------------------------------------
# Colors (respects NO_COLOR -- https://no-color.org)
# ---------------------------------------------------------------------------
if [ -z "${NO_COLOR:-}" ] && [ -t 1 ]; then
C_RST='\033[0m'
C_DIM='\033[2m'
C_BOLD='\033[1m'
C_RED='\033[38;5;167m'
C_GREEN='\033[38;5;114m'
C_YELLOW='\033[38;5;180m'
C_BLUE='\033[38;5;110m'
C_CYAN='\033[38;5;116m'
else
C_RST='' C_DIM='' C_BOLD='' C_RED='' C_GREEN=''
C_YELLOW='' C_BLUE='' C_CYAN=''
fi
# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------
log_ok() { printf "${C_GREEN}${C_RST} %s\n" "$*"; }
log_err() { printf "${C_RED}${C_RST} %s\n" "$*" >&2; }
log_warn() { printf "${C_YELLOW}${C_RST} %s\n" "$*"; }
log_info() { printf "${C_BLUE}${C_RST} %s\n" "$*"; }
log_dim() { printf "${C_DIM} %s${C_RST}\n" "$*"; }
die() { log_err "$@"; exit 1; }
# Section header
section() {
printf "\n${C_BOLD}${C_CYAN} %s${C_RST}\n" "$*"
}
# ---------------------------------------------------------------------------
# Host resolution helpers
# ---------------------------------------------------------------------------
is_master() { [ "$1" = "$MASTER" ]; }
is_worker() {
local h
for h in $WORKERS; do [ "$h" = "$1" ] && return 0; done
return 1
}
container_name() {
if is_master "$1"; then echo "$MASTER_CONTAINER"; else echo "$WORKER_CONTAINER"; fi
}
# Expand target aliases into host list
# "all" -> all hosts
# "workers" -> worker hosts
# "odin" -> just odin
# Multiple args are concatenated with comma
resolve_targets() {
local targets=""
local arg
for arg in "$@"; do
case "$arg" in
all) targets="${targets:+$targets }$ALL_HOSTS" ;;
workers) targets="${targets:+$targets }$WORKERS" ;;
master) targets="${targets:+$targets }$MASTER" ;;
*) targets="${targets:+$targets }$arg" ;;
esac
done
# Deduplicate while preserving order
echo "$targets" | tr ' ' '\n' | awk '!seen[$0]++' | tr '\n' ' ' | sed 's/ $//'
}
# Convert space-separated host list to comma-separated for ansible
hosts_csv() {
echo "$*" | tr ' ' ','
}
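# Examples (illustrative):
#   resolve_targets workers odin  -> "cassius edge sentinel odin"
#   hosts_csv cassius edge        -> "cassius,edge"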
# ---------------------------------------------------------------------------
# Ansible wrapper
# ---------------------------------------------------------------------------
# Runs ansible with toolkit inventory via venv.
# Usage: ansible_cmd <ansible args...>
ansible_cmd() {
(
# shellcheck disable=SC1090
. "$ANSIBLE_VENV"
cd "$ANSIBLE_DIR"
ansible -i "$PPF_INVENTORY" --become "$@"
)
}
# Runs ansible-playbook with toolkit inventory via venv.
# Usage: ansible_playbook_cmd <ansible-playbook args...>
ansible_playbook_cmd() {
(
# shellcheck disable=SC1090
. "$ANSIBLE_VENV"
cd "$ANSIBLE_DIR"
ansible-playbook "$@"
)
}
# ---------------------------------------------------------------------------
# Remote podman/compose wrappers
# ---------------------------------------------------------------------------
# Run a podman command on a remote host as the podman user.
# Uses dynamic UID discovery.
# Usage: podman_cmd HOST "podman subcommand..."
podman_cmd() {
local host="$1"; shift
local cmd="$*"
ansible_cmd "$host" -m raw -a \
"uid=\$(id -u podman) && cd /tmp && sudo -u podman XDG_RUNTIME_DIR=/run/user/\$uid $cmd"
}
# Run a podman-compose subcommand on a remote host.
# Usage: compose_cmd HOST "subcommand [args]"
compose_cmd() {
local host="$1"; shift
local cmd="$*"
ansible_cmd "$host" -m raw -a \
"uid=\$(id -u podman) && sudo -u podman bash -c 'export XDG_RUNTIME_DIR=/run/user/'\$uid' && cd /home/podman/ppf && podman-compose $cmd'"
}
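# Example (illustrative): compose_cmd odin "ps"
#   runs 'podman-compose ps' in /home/podman/ppf on odin as the podman user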
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
validate_syntax() {
local errors=0
local f
section "Validating Python syntax"
for f in "$PPF_DIR"/*.py; do
[ -f "$f" ] || continue
if python3 -m py_compile "$f" 2>/dev/null; then
log_dim "$(basename "$f")"
else
log_err "$(basename "$f")"
errors=$((errors + 1))
fi
done
if [ "$errors" -gt 0 ]; then
die "$errors file(s) failed syntax check"
fi
log_ok "All files valid"
}
# ---------------------------------------------------------------------------
# Version
# ---------------------------------------------------------------------------
PPF_TOOLS_VERSION="1.0.0"

View File

@@ -0,0 +1,58 @@
---
- name: Deploy PPF code
hosts: ppf
gather_facts: false
become: true
tasks:
- name: Sync Python code and support files
ansible.posix.synchronize:
src: "{{ ppf_src }}/"
dest: "{{ ppf_code_dest }}"
rsync_opts:
- "--include=*.py"
- "--include=servers.txt"
- "--include=Dockerfile"
- "--exclude=*"
register: sync_result
notify: restart containers
- name: Deploy compose file
ansible.builtin.copy:
src: "{{ ppf_src }}/{{ ppf_compose_src }}"
dest: "{{ ppf_base }}/compose.yml"
owner: "{{ ppf_owner }}"
group: "{{ ppf_owner }}"
register: compose_result
notify: restart containers
- name: Fix file ownership
ansible.builtin.file:
path: "{{ ppf_base }}"
owner: "{{ ppf_owner }}"
group: "{{ ppf_owner }}"
recurse: true
- name: Flush handlers before status check
ansible.builtin.meta: flush_handlers
- name: Wait for containers to settle
ansible.builtin.pause:
seconds: 2
when: >-
ppf_restart | bool and
(sync_result is changed or compose_result is changed)
- name: Check container status
ansible.builtin.raw: "uid=$(id -u {{ ppf_owner }}) && sudo -u {{ ppf_owner }} bash -c 'export XDG_RUNTIME_DIR=/run/user/'$uid' && cd {{ ppf_base }} && podman-compose ps'"
register: status_result
changed_when: false
- name: Show container status
ansible.builtin.debug:
msg: "{{ status_result.stdout_lines | default([]) }}"
handlers:
- name: restart containers
ansible.builtin.raw: "uid=$(id -u {{ ppf_owner }}) && sudo -u {{ ppf_owner }} bash -c 'export XDG_RUNTIME_DIR=/run/user/'$uid' && cd {{ ppf_base }} && podman-compose down && podman-compose up -d'"
when: ppf_restart | bool
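Outside the `ppf-deploy` wrapper, the playbook can also be run directly with the toolkit inventory. A sketch (the `-e` overrides are assumptions: `ppf_src` must point at the local checkout, and `ppf_restart=false` is presumably how a sync-only run like `--no-restart` is expressed):
```bash
ansible-playbook -i tools/playbooks/inventory.ini tools/playbooks/deploy.yml \
    -e ppf_src=/home/user/git/ppf --check --diff
ansible-playbook -i tools/playbooks/inventory.ini tools/playbooks/deploy.yml \
    -e ppf_src=/home/user/git/ppf -e ppf_restart=false
```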

View File

@@ -0,0 +1,3 @@
ppf_base: /home/podman/ppf
ppf_owner: podman
ppf_restart: true

View File

@@ -0,0 +1,2 @@
ppf_code_dest: /home/podman/ppf/
ppf_compose_src: compose.master.yml

View File

@@ -0,0 +1,2 @@
ppf_code_dest: /home/podman/ppf/src/
ppf_compose_src: compose.worker.yml

View File

@@ -0,0 +1,16 @@
[master]
odin ansible_host=10.200.1.250
[workers]
cassius ansible_host=10.200.1.13
edge ansible_host=10.200.1.254
sentinel ansible_host=10.200.1.1
[ppf:children]
master
workers
[ppf:vars]
ansible_user=ansible
ansible_ssh_private_key_file=/opt/ansible/secrets/ssh/ansible
ansible_remote_tmp=~/.ansible/tmp

tools/ppf-db (new executable file, 154 lines)

@@ -0,0 +1,154 @@
#!/bin/bash
# ppf-db -- manage PPF databases
#
# Usage:
# ppf-db <command> [options]
#
# Commands: stats, purge-proxies, vacuum
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
PROXY_DB="/home/podman/ppf/data/proxies.sqlite"
URL_DB="/home/podman/ppf/data/websites.sqlite"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-db <command> [options]
Manage PPF databases on odin (master).
Commands:
  stats           show proxy and URL counts
  purge-proxies   delete all proxies (keeps URLs)
  vacuum          VACUUM the proxy database to reclaim disk space
Options:
  --help          show this help
  --version       show version
Examples:
ppf-db stats
ppf-db purge-proxies
ppf-db vacuum
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
run_sql() {
local db="$1" sql="$2"
ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman sqlite3 '$db' \"$sql\"" 2>/dev/null \
| grep -vE '^\s*$|^odin|Shared connection|CHANGED|SUCCESS'
}
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
cmd_stats() {
section "Database stats (odin)"
local proxies total_urls active_urls working
proxies=$(run_sql "$PROXY_DB" "SELECT COUNT(*) FROM proxylist;")
working=$(run_sql "$PROXY_DB" "SELECT COUNT(*) FROM proxylist WHERE failed=0 AND proto IS NOT NULL;")
total_urls=$(run_sql "$URL_DB" "SELECT COUNT(*) FROM uris;")
active_urls=$(run_sql "$URL_DB" "SELECT COUNT(*) FROM uris WHERE error=0;")
log_info "Proxies: ${proxies} total, ${working} working"
log_info "URLs: ${total_urls} total, ${active_urls} active"
}
cmd_purge_proxies() {
section "Purging proxies from odin"
# Get counts before
local before
before=$(run_sql "$PROXY_DB" "SELECT COUNT(*) FROM proxylist;")
log_info "Proxies before: $before"
# Stop container
log_info "Stopping container..."
compose_cmd "$MASTER" "down" > /dev/null 2>&1 \
&& log_ok "Container stopped" \
|| die "Failed to stop container"
# Delete proxies
log_info "Deleting proxylist rows..."
run_sql "$PROXY_DB" "DELETE FROM proxylist;" > /dev/null 2>&1
log_ok "Proxylist purged"
# Vacuum to reclaim space
log_info "Vacuuming database..."
run_sql "$PROXY_DB" "VACUUM;" > /dev/null 2>&1
log_ok "Database vacuumed"
# Verify URLs intact
local urls_after
urls_after=$(run_sql "$URL_DB" "SELECT COUNT(*) FROM uris;")
log_ok "URLs preserved: $urls_after"
# Start container
log_info "Starting container..."
compose_cmd "$MASTER" "up -d" > /dev/null 2>&1 \
&& log_ok "Container started" \
|| die "Failed to start container"
}
cmd_vacuum() {
section "Vacuuming database (odin)"
local before after
# ls -lh output (plus the ansible raw header) contains other numbers before
# the size, so pick field 5 of the line that ends with the database path.
before=$(ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman ls -lh '$PROXY_DB'" 2>/dev/null \
| awk -v db="$PROXY_DB" '$NF == db {print $5; exit}')
run_sql "$PROXY_DB" "VACUUM;" > /dev/null 2>&1
after=$(ansible_cmd "$MASTER" -m raw -a \
"sudo -u podman ls -lh '$PROXY_DB'" 2>/dev/null \
| awk -v db="$PROXY_DB" '$NF == db {print $5; exit}')
log_ok "Vacuumed: ${before:-?} -> ${after:-?}"
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
[ $# -eq 0 ] && usage
COMMAND=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-db $PPF_TOOLS_VERSION"; exit 0 ;;
stats|purge-proxies|vacuum)
[ -n "$COMMAND" ] && die "Multiple commands given"
COMMAND="$1"
;;
-*) die "Unknown option: $1" ;;
*) die "Unknown command: $1" ;;
esac
shift
done
[ -z "$COMMAND" ] && die "No command given. Use: stats, purge-proxies, vacuum"
case "$COMMAND" in
stats) cmd_stats ;;
purge-proxies) cmd_purge_proxies ;;
vacuum) cmd_vacuum ;;
esac
printf "\n"
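
For one-off queries that ppf-db does not wrap, the same pattern run_sql uses can be issued as an ad-hoc raw call; a sketch, assuming the toolkit inventory at tools/playbooks/inventory.ini and the proxylist columns referenced above:

  ansible -i tools/playbooks/inventory.ini odin --become -m raw \
    -a "sudo -u podman sqlite3 /home/podman/ppf/data/proxies.sqlite 'SELECT proto, COUNT(*) FROM proxylist GROUP BY proto;'"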

115
tools/ppf-deploy Executable file
View File

@@ -0,0 +1,115 @@
#!/bin/bash
# ppf-deploy -- deploy PPF code to nodes
#
# Usage:
# ppf-deploy [options] [targets...]
#
# Targets:
# all odin + all workers (default)
# workers cassius, edge, sentinel
# master odin
# <hostname> specific host(s)
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
PLAYBOOK_DIR="$SCRIPT_DIR/playbooks"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-deploy [options] [targets...]
Deploy PPF code to nodes via Ansible playbook.
Targets:
  all            odin + all workers (default)
  workers        cassius, edge, sentinel
  master         odin
  <hostname>     specific host(s)
Options:
  --no-restart   sync files only, skip container restart
  --check        dry run (ansible --check --diff)
  -v             verbose ansible output
  --help         show this help
  --version      show version
Steps performed:
  1. Validate Python syntax locally
  2. Rsync *.py, servers.txt and Dockerfile (role-aware destinations)
  3. Copy compose file per role
  4. Fix ownership (podman:podman)
  5. Restart containers on change (unless --no-restart)
  6. Show container status
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
DO_RESTART=1
CHECK_MODE=0
VERBOSE=""
TARGETS=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-deploy $PPF_TOOLS_VERSION"; exit 0 ;;
--no-restart) DO_RESTART=0 ;;
--check) CHECK_MODE=1 ;;
-v) VERBOSE="-v" ;;
-*) die "Unknown option: $1" ;;
*) TARGETS="${TARGETS:+$TARGETS }$1" ;;
esac
shift
done
TARGETS="${TARGETS:-all}"
# ---------------------------------------------------------------------------
# Pre-flight: local syntax validation
# ---------------------------------------------------------------------------
validate_syntax
# ---------------------------------------------------------------------------
# Build ansible-playbook arguments
# ---------------------------------------------------------------------------
ARGS=(-i "$PLAYBOOK_DIR/inventory.ini")
ARGS+=(-e "ppf_src=$PPF_DIR")
if [ "$DO_RESTART" -eq 0 ]; then
ARGS+=(-e "ppf_restart=false")
fi
if [ "$CHECK_MODE" -eq 1 ]; then
ARGS+=(--check --diff)
fi
[ -n "$VERBOSE" ] && ARGS+=("$VERBOSE")
# Target resolution: map aliases to ansible --limit
case "$TARGETS" in
all) ;; # no --limit = all hosts in inventory
*)
LIMIT=$(resolve_targets $TARGETS | tr ' ' ',')
ARGS+=(--limit "$LIMIT")
;;
esac
ARGS+=("$PLAYBOOK_DIR/deploy.yml")
# ---------------------------------------------------------------------------
# Run playbook
# ---------------------------------------------------------------------------
section "Deploying to ${TARGETS}"
ansible_playbook_cmd "${ARGS[@]}"
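
In practice a deploy cycle with this wrapper might look like the following, using the targets described in the usage text above:

  ppf-deploy --check workers     # preview changes on cassius, edge, sentinel
  ppf-deploy workers             # sync code and recreate the worker containers
  ppf-deploy --no-restart odin   # stage files on the master without restarting it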

80
tools/ppf-logs Executable file
View File

@@ -0,0 +1,80 @@
#!/bin/bash
# ppf-logs -- view PPF container logs
#
# Usage:
# ppf-logs [options] [node]
#
# Defaults to odin if no node specified.
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-logs [options] [node]
View PPF container logs.
Nodes:
  odin, cassius, edge, sentinel (default: odin)
Options:
  -f           follow log output
  -n LINES     number of lines to show (default: 40)
  --help       show this help
  --version    show version
Examples:
  ppf-logs                   last 40 lines from odin
  ppf-logs cassius           last 40 lines from cassius
  ppf-logs -f edge           follow edge worker logs
  ppf-logs -n 100 sentinel   last 100 lines from sentinel
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
FOLLOW=0
LINES=40
NODE=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-logs $PPF_TOOLS_VERSION"; exit 0 ;;
-f) FOLLOW=1 ;;
-n) shift; LINES="${1:?'-n' requires a number}" ;;
-*) die "Unknown option: $1" ;;
*) NODE="$1" ;;
esac
shift
done
NODE="${NODE:-$MASTER}"
# Validate node
is_master "$NODE" || is_worker "$NODE" || die "Unknown node: $NODE"
CNAME=$(container_name "$NODE")
# ---------------------------------------------------------------------------
# Build podman logs command
# ---------------------------------------------------------------------------
CMD="podman logs --tail $LINES"
[ "$FOLLOW" -eq 1 ] && CMD="$CMD -f"
CMD="$CMD $CNAME"
section "$NODE ($CNAME)"
# Run with raw output -- logs go straight to terminal
podman_cmd "$NODE" "$CMD"

186
tools/ppf-service Executable file
View File

@@ -0,0 +1,186 @@
#!/bin/bash
# ppf-service -- manage PPF containers
#
# Usage:
# ppf-service <command> [nodes...]
#
# Commands: status, start, stop, restart
set -eu
# Resolve to real path (handles symlinks from ~/.local/bin/)
SCRIPT_PATH="$(cd "$(dirname "$0")" && pwd)/$(basename "$0")"
SCRIPT_DIR="$(dirname "$(readlink -f "$SCRIPT_PATH")")"
# shellcheck disable=SC1091
. "$SCRIPT_DIR/lib/ppf-common.sh"
ODIN_URL="http://10.200.1.250:8081"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
usage() {
cat <<EOF
Usage: ppf-service <command> [nodes...]
Manage PPF containers on remote nodes.
Commands:
  status    show container state + health (default nodes: all)
  start     start containers (compose up -d)
  stop      stop containers (compose stop)
  restart   recreate containers (compose down + up -d)
Nodes:
  all           odin + all workers (default)
  workers       cassius, edge, sentinel
  master        odin
  <hostname>    specific host(s)
Options:
  --help        show this help
  --version     show version
Examples:
  ppf-service status
  ppf-service restart workers
  ppf-service stop cassius edge
  ppf-service start odin
EOF
exit 0
}
# ---------------------------------------------------------------------------
# Status helpers
# ---------------------------------------------------------------------------
show_health() {
local result
result=$(ansible_cmd "$MASTER" -m raw -a \
"curl -sf --max-time 5 ${ODIN_URL}/health 2>/dev/null || echo UNREACHABLE" \
2>/dev/null) || true
if echo "$result" | grep -qi "ok\|healthy"; then
log_ok "master health: ok"
elif echo "$result" | grep -qi "UNREACHABLE"; then
log_err "master health: unreachable"
else
log_warn "master health: $result"
fi
}
show_workers_api() {
local result
result=$(ansible_cmd "$MASTER" -m raw -a \
"curl -sf --max-time 5 ${ODIN_URL}/api/workers 2>/dev/null || echo '{}'" \
2>/dev/null) || true
# Show the raw output, trimmed of blank lines and ansible raw noise
local data
data=$(echo "$result" | grep -vE '^\s*$|^[A-Z]|^odin' | head -20)
if [ -n "$data" ]; then
log_info "Worker API response:"
echo "$data" | while IFS= read -r line; do
log_dim "$line"
done
fi
}
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
cmd_status() {
local hosts="$1"
section "Container status"
for host in $hosts; do
local output
output=$(compose_cmd "$host" "ps" 2>/dev/null) || true
if echo "$output" | grep -qi "up\|running"; then
log_ok "$host"
elif echo "$output" | grep -qi "exit"; then
log_err "$host (exited)"
else
log_warn "$host (unknown)"
fi
echo "$output" | grep -v '^\s*$' | while IFS= read -r line; do
log_dim "$line"
done
done
# Show health/worker info if master is in target list
local h
for h in $hosts; do
if is_master "$h"; then
section "Master health"
show_health
show_workers_api
break
fi
done
}
cmd_start() {
local hosts="$1"
section "Starting containers"
for host in $hosts; do
compose_cmd "$host" "up -d" > /dev/null 2>&1 \
&& log_ok "$host started" \
|| log_err "$host start failed"
done
}
cmd_stop() {
local hosts="$1"
section "Stopping containers"
for host in $hosts; do
compose_cmd "$host" "stop" > /dev/null 2>&1 \
&& log_ok "$host stopped" \
|| log_err "$host stop failed"
done
}
cmd_restart() {
local hosts="$1"
section "Restarting containers"
for host in $hosts; do
compose_cmd "$host" "down" > /dev/null 2>&1 \
&& compose_cmd "$host" "up -d" > /dev/null 2>&1 \
&& log_ok "$host restarted" \
|| log_err "$host restart failed"
done
}
# ---------------------------------------------------------------------------
# Parse args
# ---------------------------------------------------------------------------
[ $# -eq 0 ] && usage
COMMAND=""
TARGETS=""
while [ $# -gt 0 ]; do
case "$1" in
--help|-h) usage ;;
--version|-V) echo "ppf-service $PPF_TOOLS_VERSION"; exit 0 ;;
status|start|stop|restart)
[ -n "$COMMAND" ] && die "Multiple commands given"
COMMAND="$1"
;;
-*) die "Unknown option: $1" ;;
*) TARGETS="${TARGETS:+$TARGETS }$1" ;;
esac
shift
done
[ -z "$COMMAND" ] && die "No command given. Use: status, start, stop, restart"
TARGETS="${TARGETS:-all}"
HOSTS=$(resolve_targets $TARGETS)
[ -z "$HOSTS" ] && die "No valid targets"
case "$COMMAND" in
status) cmd_status "$HOSTS" ;;
start) cmd_start "$HOSTS" ;;
stop) cmd_stop "$HOSTS" ;;
restart) cmd_restart "$HOSTS" ;;
esac
printf "\n"