add project roadmap and task list

This commit is contained in:
Username
2025-12-20 16:46:43 +01:00
parent a4e2c2ed7c
commit d9cea386e9
2 changed files with 804 additions and 0 deletions

TODO.md Normal file

@@ -0,0 +1,562 @@
# PPF Implementation Tasks
## Legend
```
[ ] Not started
[~] In progress
[x] Completed
[!] Blocked/needs discussion
```
---
## Immediate Priority (Next Sprint)
### [ ] 1. Unify _known_proxies Cache
**Problem:** Both ppf.py and fetch.py maintain separate `_known_proxies` dictionaries.
Updates to one are not reflected in the other, which can cause duplicate processing.
**Implementation:**
```python
# fetch.py - becomes the single source of truth
import threading

_known_proxies = {}
_known_proxies_lock = threading.Lock()

def get_known_proxies():
    """Return reference to shared known proxies dict."""
    return _known_proxies

def is_known_proxy(proxy):
    """Thread-safe check if proxy is known."""
    with _known_proxies_lock:
        return proxy in _known_proxies

def mark_proxy_known(proxy):
    """Thread-safe mark proxy as known."""
    with _known_proxies_lock:
        _known_proxies[proxy] = True
```
**Files:** fetch.py, ppf.py
**Effort:** Low
**Risk:** Low
---
### [ ] 2. Graceful SQLite Error Handling
**Problem:** SQLite can throw "database is locked" errors under concurrent access.
Currently these bubble up and crash the application.
**Implementation:**
```python
# mysqlite.py
import sqlite3
import time

class mysqlite():
    def execute(self, query, params=None, retries=5):
        for attempt in range(retries):
            try:
                if params:
                    return self.cur.execute(query, params)
                return self.cur.execute(query)
            except sqlite3.OperationalError as e:
                if 'locked' in str(e) and attempt < retries - 1:
                    time.sleep(0.1 * (attempt + 1))  # Linear backoff
                    continue
                raise
```
**Files:** mysqlite.py
**Effort:** Low
**Risk:** Low
---
### [ ] 3. Enable SQLite WAL Mode
**Problem:** Default SQLite journaling mode blocks concurrent readers during writes.
**Implementation:**
```python
# mysqlite.py - in __init__
def __init__(self, database, rowtype):
    self.conn = sqlite3.connect(database, check_same_thread=False)
    self.conn.execute('PRAGMA journal_mode=WAL')
    self.conn.execute('PRAGMA synchronous=NORMAL')
    # ...
```
**Files:** mysqlite.py
**Effort:** Low
**Risk:** Low (WAL is well-tested)
---
### [ ] 4. Batch Database Inserts
**Problem:** insert_proxies() and insert_urls() do individual INSERTs, causing
excessive disk I/O and lock contention.
**Implementation:**
```python
# dbs.py
def insert_proxies(proxydb, proxies, source):
    """Batch insert proxies."""
    if not proxies:
        return
    mytime = int(time.time())
    values = [(p, source, mytime) for p in proxies]
    proxydb.executemany(
        'INSERT OR IGNORE INTO proxylist (proxy, source, first_seen) VALUES (?, ?, ?)',
        values
    )
    proxydb.commit()
```
**Files:** dbs.py
**Effort:** Low
**Risk:** Low
---
### [ ] 5. Add Database Indexes
**Problem:** Queries on large tables are slow without proper indexes.
**Implementation:**
```python
# dbs.py - in create_table_if_not_exists
def create_indexes(db, table):
    if table == 'proxylist':
        db.execute('CREATE INDEX IF NOT EXISTS idx_proxy_failed ON proxylist(failed)')
        db.execute('CREATE INDEX IF NOT EXISTS idx_proxy_proto ON proxylist(proto)')
    elif table == 'uris':
        db.execute('CREATE INDEX IF NOT EXISTS idx_uri_error ON uris(error)')
        db.execute('CREATE INDEX IF NOT EXISTS idx_uri_checktime ON uris(check_time)')
    db.commit()
```
**Files:** dbs.py
**Effort:** Low
**Risk:** Low
---
## Short Term (This Month)
### [ ] 6. Standardize Logging
**Problem:** Inconsistent logging across files. Some use print(), some use _log(),
some silently swallow errors.
**Implementation:**
```python
# misc.py - enhanced logging
import sys

LOG_LEVELS = {'debug': 0, 'info': 1, 'warn': 2, 'error': 3}
_log_level = 1  # Default: info

def set_log_level(level):
    global _log_level
    _log_level = LOG_LEVELS.get(level, 1)

def _log(msg, level='info', module=None):
    if LOG_LEVELS.get(level, 1) < _log_level:
        return
    prefix = '%s/%s' % (timestamp(), level)
    if module:
        prefix = '%s/%s' % (prefix, module)
    output = sys.stderr if level in ('warn', 'error') else sys.stdout
    print >> output, '\r%s\t%s' % (prefix, msg)
```
**Files:** misc.py, all other files
**Effort:** Medium
**Risk:** Low
---
### [ ] 7. Connection Timeout Standardization
**Problem:** Timeout values are scattered: config.ppf.timeout, config.watchd.timeout,
hardcoded values in rocksock.py.
**Implementation:**
- Add to config.py: `[network]` section with timeout_connect, timeout_read, timeout_total
- Pass timeout explicitly to all network functions
- Remove hardcoded timeout values
**Files:** config.py, fetch.py, proxywatchd.py, rocksock.py
**Effort:** Medium
**Risk:** Low
---
### [ ] 8. Failure Categorization
**Problem:** All failures treated equally. Timeout vs connection-refused vs
auth-failure have different implications.
**Implementation:**
```python
# proxywatchd.py
class FailureType:
    TIMEOUT = 'timeout'          # Retry later
    REFUSED = 'refused'          # Proxy down, lower priority
    AUTH_FAIL = 'auth_fail'      # Wrong protocol, try others
    TARGET_DOWN = 'target_down'  # Not proxy's fault
    UNKNOWN = 'unknown'

def categorize_failure(exception):
    """Categorize failure type from exception."""
    msg = str(exception).lower()
    if 'timeout' in msg or 'timed out' in msg:
        return FailureType.TIMEOUT
    if 'refused' in msg:
        return FailureType.REFUSED
    if 'auth' in msg or 'handshake' in msg:
        return FailureType.AUTH_FAIL
    return FailureType.UNKNOWN
```
**Files:** proxywatchd.py
**Effort:** Medium
**Risk:** Low
---
### [ ] 9. Priority Queue for Proxy Testing
**Problem:** All proxies tested with equal priority. Should prioritize:
- Recently successful proxies (verify still working)
- New proxies (determine if usable)
- Low fail-count proxies
**Implementation:**
```python
# proxywatchd.py
import heapq
import threading
import time

class PriorityJobQueue:
    """Priority queue wrapper for proxy test jobs."""
    def __init__(self):
        self.heap = []
        self.lock = threading.Lock()

    def put(self, job, priority):
        """Lower priority number = higher priority; id() breaks ties."""
        with self.lock:
            heapq.heappush(self.heap, (priority, id(job), job))

    def get(self, timeout=None):
        """Get highest priority job, or None if still empty after timeout."""
        deadline = None if timeout is None else time.time() + timeout
        while True:
            with self.lock:
                if self.heap:
                    return heapq.heappop(self.heap)[2]
            if deadline is not None and time.time() >= deadline:
                return None
            time.sleep(0.05)
```
Priority calculation:
- New proxy (retrievals=0): priority 0
- Recent success (last_success < 1hr): priority 1
- Low fail count (failed < 3): priority 2
- Medium fail count: priority 3
- High fail count: priority 4
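The mapping above can be sketched as a small helper. The field names (`retrievals`, `failed`, `last_success`) and the medium/high fail-count boundary are assumptions about the row shape, since the list does not pin them down:
```python
import time

def job_priority(proxy_row, now=None):
    """Map a proxy's history to a queue priority (lower = tested sooner)."""
    now = now if now is not None else time.time()
    if proxy_row.get('retrievals', 0) == 0:
        return 0  # New proxy: determine if usable
    if now - proxy_row.get('last_success', 0) < 3600:
        return 1  # Recent success: verify still working
    failed = proxy_row.get('failed', 0)
    if failed < 3:
        return 2  # Low fail count
    if failed < 10:
        return 3  # Medium fail count (boundary of 10 is an assumption)
    return 4  # High fail count
```
The result plugs straight into `PriorityJobQueue.put(job, priority)`.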
**Files:** proxywatchd.py
**Effort:** Medium
**Risk:** Medium
---
### [ ] 10. Periodic Statistics Output
**Problem:** No visibility into system performance during operation.
**Implementation:**
```python
# proxywatchd.py
import threading
import time

class Stats:
    def __init__(self):
        self.lock = threading.Lock()
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()

    def record(self, success):
        with self.lock:
            self.tested += 1
            if success:
                self.passed += 1
            else:
                self.failed += 1

    def report(self):
        with self.lock:
            elapsed = time.time() - self.start_time
            rate = self.tested / elapsed if elapsed > 0 else 0
            pct = (self.passed * 100.0 / self.tested) if self.tested > 0 else 0
            return 'tested=%d passed=%d (%.1f%%) rate=%.1f/s' % (
                self.tested, self.passed, pct, rate)

# In main loop, every 5 minutes:
if time.time() - last_stats > 300:
    _log(stats.report(), 'stats', 'watchd')
    last_stats = time.time()
```
**Files:** proxywatchd.py
**Effort:** Low
**Risk:** Low
---
## Medium Term (Next Quarter)
### [ ] 11. Tor Connection Pooling
**Problem:** Each proxy test creates a new Tor connection. Tor circuit establishment
is slow (~2-3 seconds).
**Implementation:**
```python
# new file: connection_pool.py
import random
import threading
import Queue

class TorConnectionPool:
    """Pool of reusable Tor SOCKS connections."""
    def __init__(self, tor_hosts, pool_size=10):
        self.tor_hosts = tor_hosts
        self.pool_size = pool_size
        self.connections = Queue.Queue(pool_size)
        self.lock = threading.Lock()

    def get(self, timeout=5):
        """Get a Tor connection from pool, or create new."""
        try:
            return self.connections.get(timeout=0.1)
        except Queue.Empty:
            return self._create_connection()

    def release(self, conn):
        """Return connection to pool."""
        try:
            self.connections.put_nowait(conn)
        except Queue.Full:
            conn.close()

    def _create_connection(self):
        """Create new Tor SOCKS connection."""
        host = random.choice(self.tor_hosts)
        # ... establish connection
```
**Files:** new connection_pool.py, proxywatchd.py
**Effort:** High
**Risk:** Medium
---
### [ ] 12. Dynamic Thread Scaling
**Problem:** Fixed thread count regardless of success rate or system load.
**Implementation:**
```python
# proxywatchd.py
class ThreadScaler:
    """Dynamically adjust thread count based on performance."""
    def __init__(self, min_threads=5, max_threads=50):
        self.min = min_threads
        self.max = max_threads
        self.current = min_threads
        self.success_rate_window = []

    def record_result(self, success):
        self.success_rate_window.append(success)
        if len(self.success_rate_window) > 100:
            self.success_rate_window.pop(0)

    def recommended_threads(self):
        if len(self.success_rate_window) < 20:
            return self.current
        # float() avoids integer truncation under Python 2
        success_rate = sum(self.success_rate_window) / float(len(self.success_rate_window))
        # High success rate -> can handle more threads
        if success_rate > 0.7 and self.current < self.max:
            return self.current + 5
        # Low success rate -> reduce load
        elif success_rate < 0.3 and self.current > self.min:
            return self.current - 5
        return self.current
```
**Files:** proxywatchd.py
**Effort:** Medium
**Risk:** Medium
---
### [ ] 13. Latency Tracking
**Problem:** No visibility into proxy speed. A working but slow proxy may be
less useful than a fast one.
**Implementation:**
```python
# dbs.py - add columns
# ALTER TABLE proxylist ADD COLUMN avg_latency REAL DEFAULT 0
# ALTER TABLE proxylist ADD COLUMN latency_samples INTEGER DEFAULT 0
def update_proxy_latency(proxydb, proxy, latency):
    """Update rolling average latency for proxy."""
    row = proxydb.execute(
        'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy=?',
        (proxy,)
    ).fetchone()
    if row:
        old_avg, samples = row
        # Running average; capping samples makes it behave like an
        # exponential moving average once the cap is reached
        new_avg = (old_avg * samples + latency) / (samples + 1)
        new_samples = min(samples + 1, 100)  # Cap at 100 samples
        proxydb.execute(
            'UPDATE proxylist SET avg_latency=?, latency_samples=? WHERE proxy=?',
            (new_avg, new_samples, proxy)
        )
```
**Files:** dbs.py, proxywatchd.py
**Effort:** Medium
**Risk:** Low
---
### [ ] 14. Export Functionality
**Problem:** No easy way to export working proxies for use elsewhere.
**Implementation:**
```python
# new file: export.py
def export_proxies(proxydb, format='txt', filters=None):
    """Export working proxies to various formats."""
    query = 'SELECT proto, proxy FROM proxylist WHERE failed=0'
    params = []
    if filters and 'proto' in filters:
        query += ' AND proto=?'
        params.append(filters['proto'])
    rows = proxydb.execute(query, params).fetchall()
    if format == 'txt':
        return '\n'.join('%s://%s' % (r[0], r[1]) for r in rows)
    elif format == 'json':
        import json
        return json.dumps([{'proto': r[0], 'address': r[1]} for r in rows])
    elif format == 'csv':
        return 'proto,address\n' + '\n'.join('%s,%s' % (r[0], r[1]) for r in rows)

# CLI: python export.py --format json --proto socks5 > proxies.json
```
**Files:** new export.py
**Effort:** Low
**Risk:** Low
---
### [ ] 15. Unit Test Infrastructure
**Problem:** No automated tests. Changes can break existing functionality silently.
**Implementation:**
```
tests/
├── __init__.py
├── test_proxy_utils.py # Test IP validation, cleansing
├── test_extract.py # Test proxy/URL extraction
├── test_database.py # Test DB operations with temp DB
└── mock_network.py # Mock rocksock for offline testing
```
```python
# tests/test_proxy_utils.py
import unittest
import sys
sys.path.insert(0, '..')
import fetch

class TestProxyValidation(unittest.TestCase):
    def test_valid_proxy(self):
        self.assertTrue(fetch.is_usable_proxy('8.8.8.8:8080'))

    def test_private_ip_rejected(self):
        self.assertFalse(fetch.is_usable_proxy('192.168.1.1:8080'))
        self.assertFalse(fetch.is_usable_proxy('10.0.0.1:8080'))
        self.assertFalse(fetch.is_usable_proxy('172.16.0.1:8080'))

    def test_invalid_port_rejected(self):
        self.assertFalse(fetch.is_usable_proxy('8.8.8.8:0'))
        self.assertFalse(fetch.is_usable_proxy('8.8.8.8:99999'))

if __name__ == '__main__':
    unittest.main()
```
**Files:** tests/ directory
**Effort:** High (initial), Low (ongoing)
**Risk:** Low
---
## Long Term (Future)
### [ ] 16. Geographic Validation
Verify proxy actually routes through claimed location using IP geolocation.
### [ ] 17. HTTPS/SSL Proxy Testing
Add capability to test HTTPS CONNECT proxies.
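A possible starting point: an HTTPS proxy test sends a `CONNECT host:port` request and checks the status line for a 2xx before attempting a TLS handshake through the tunnel. A minimal sketch of the request building and response parsing (socket wiring omitted; the helper names are illustrative):
```python
def build_connect_request(host, port, user_agent='ppf'):
    """Build an HTTP CONNECT request for tunnelling through a proxy."""
    return ('CONNECT %s:%d HTTP/1.1\r\n'
            'Host: %s:%d\r\n'
            'User-Agent: %s\r\n'
            '\r\n' % (host, port, host, port, user_agent))

def connect_succeeded(response):
    """True if the proxy's status line reports a 2xx for the tunnel."""
    line = response.split('\r\n', 1)[0]
    parts = line.split(' ', 2)
    return (len(parts) >= 2 and parts[0].startswith('HTTP/')
            and parts[1].startswith('2'))
```
The actual test would send the request over the existing rocksock connection, then wrap the socket in TLS only when `connect_succeeded` returns True.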
### [ ] 18. Additional Search Engines
Support Google, Bing, DuckDuckGo beyond Searx.
### [ ] 19. REST API
Simple HTTP API to query proxy database.
### [ ] 20. Web Dashboard
Status page showing live statistics.
---
## Completed
### [x] Work-Stealing Queue
- Implemented shared Queue.Queue() for job distribution
- Workers pull from shared queue instead of pre-assigned lists
- Better utilization across threads
### [x] Multi-Target Validation
- Test each proxy against 3 random targets
- 2/3 majority required for success
- Reduces false negatives from single target failures
### [x] Interleaved Testing
- Jobs shuffled across all proxies before queueing
- Prevents burst of 3 connections to same proxy
- ProxyTestState accumulates results from TargetTestJobs
### [x] Code Cleanup
- Removed 93 lines dead HTTP server code (ppf.py)
- Removed dead gumbo parser (soup_parser.py)
- Removed test code (comboparse.py)
- Removed unused functions (misc.py)
- Fixed IP/port cleansing (ppf.py)
- Updated .gitignore