# PPF Implementation Tasks

## Legend

```
[ ] Not started
[~] In progress
[x] Completed
[!] Blocked/needs discussion
```

---

## Immediate Priority (Next Sprint)

### [ ] 1. Unify _known_proxies Cache

**Problem:** Both ppf.py and fetch.py maintain separate `_known_proxies` dictionaries. Updates to one are not reflected in the other, causing potential duplicate processing.

**Implementation:**

```python
# fetch.py - becomes the single source of truth
import threading

_known_proxies = {}
_known_proxies_lock = threading.Lock()

def get_known_proxies():
    """Return a reference to the shared known-proxies dict."""
    return _known_proxies

def is_known_proxy(proxy):
    """Thread-safe check whether proxy is known."""
    with _known_proxies_lock:
        return proxy in _known_proxies

def mark_proxy_known(proxy):
    """Thread-safe: mark proxy as known."""
    with _known_proxies_lock:
        _known_proxies[proxy] = True
```

**Files:** fetch.py, ppf.py
**Effort:** Low
**Risk:** Low

---

### [ ] 2. Graceful SQLite Error Handling

**Problem:** SQLite can throw "database is locked" errors under concurrent access. Currently these bubble up and crash the application.

**Implementation:**

```python
# mysqlite.py
import sqlite3
import time

class mysqlite():
    def execute(self, query, params=None, retries=5):
        for attempt in range(retries):
            try:
                if params:
                    return self.cur.execute(query, params)
                return self.cur.execute(query)
            except sqlite3.OperationalError as e:
                if 'locked' in str(e) and attempt < retries - 1:
                    time.sleep(0.1 * (attempt + 1))  # Linear backoff: 0.1s, 0.2s, ...
                    continue
                raise
```

**Files:** mysqlite.py
**Effort:** Low
**Risk:** Low

---

### [ ] 3. Enable SQLite WAL Mode

**Problem:** The default SQLite journaling mode blocks concurrent readers during writes.

**Implementation:**

```python
# mysqlite.py - in __init__
def __init__(self, database, rowtype):
    self.conn = sqlite3.connect(database, check_same_thread=False)
    self.conn.execute('PRAGMA journal_mode=WAL')
    self.conn.execute('PRAGMA synchronous=NORMAL')
    # ...
```

**Files:** mysqlite.py
**Effort:** Low
**Risk:** Low (WAL is well-tested)

---

### [ ] 4. Batch Database Inserts

**Problem:** insert_proxies() and insert_urls() do individual INSERTs, causing excessive disk I/O and lock contention.

**Implementation:**

```python
# dbs.py
import time

def insert_proxies(proxydb, proxies, source):
    """Batch-insert proxies."""
    if not proxies:
        return
    mytime = int(time.time())
    values = [(p, source, mytime) for p in proxies]
    proxydb.executemany(
        'INSERT OR IGNORE INTO proxylist (proxy, source, first_seen) VALUES (?, ?, ?)',
        values
    )
    proxydb.commit()
```

**Files:** dbs.py
**Effort:** Low
**Risk:** Low

---

### [ ] 5. Add Database Indexes

**Problem:** Queries on large tables are slow without proper indexes.

**Implementation:**

```python
# dbs.py - in create_table_if_not_exists
def create_indexes(db, table):
    if table == 'proxylist':
        db.execute('CREATE INDEX IF NOT EXISTS idx_proxy_failed ON proxylist(failed)')
        db.execute('CREATE INDEX IF NOT EXISTS idx_proxy_proto ON proxylist(proto)')
    elif table == 'uris':
        db.execute('CREATE INDEX IF NOT EXISTS idx_uri_error ON uris(error)')
        db.execute('CREATE INDEX IF NOT EXISTS idx_uri_checktime ON uris(check_time)')
    db.commit()
```

**Files:** dbs.py
**Effort:** Low
**Risk:** Low

---

## Short Term (This Month)

### [ ] 6. Standardize Logging

**Problem:** Logging is inconsistent across files: some use print(), some use _log(), and some silently swallow errors.
**Implementation:**

```python
# misc.py - enhanced logging
import sys

LOG_LEVELS = {'debug': 0, 'info': 1, 'warn': 2, 'error': 3}
_log_level = 1  # Default: info

def set_log_level(level):
    global _log_level
    _log_level = LOG_LEVELS.get(level, 1)

def _log(msg, level='info', module=None):
    if LOG_LEVELS.get(level, 1) < _log_level:
        return
    prefix = '%s/%s' % (timestamp(), level)
    if module:
        prefix = '%s/%s' % (prefix, module)
    output = sys.stderr if level in ('warn', 'error') else sys.stdout
    output.write('\r%s\t%s\n' % (prefix, msg))  # works under Python 2 and 3
```

**Files:** misc.py, all other files
**Effort:** Medium
**Risk:** Low

---

### [ ] 7. Connection Timeout Standardization

**Problem:** Timeout values are scattered: config.ppf.timeout, config.watchd.timeout, and hardcoded values in rocksock.py.

**Implementation:**

- Add to config.py: a `[network]` section with timeout_connect, timeout_read, timeout_total
- Pass timeout explicitly to all network functions
- Remove hardcoded timeout values

**Files:** config.py, fetch.py, proxywatchd.py, rocksock.py
**Effort:** Medium
**Risk:** Low

---

### [ ] 8. Failure Categorization

**Problem:** All failures are treated equally, but timeout vs. connection-refused vs. auth-failure have different implications.

**Implementation:**

```python
# proxywatchd.py
class FailureType:
    TIMEOUT = 'timeout'          # Retry later
    REFUSED = 'refused'          # Proxy down, lower priority
    AUTH_FAIL = 'auth_fail'      # Wrong protocol, try others
    TARGET_DOWN = 'target_down'  # Not proxy's fault
    UNKNOWN = 'unknown'

def categorize_failure(exception):
    """Categorize failure type from exception."""
    msg = str(exception).lower()
    if 'timeout' in msg or 'timed out' in msg:
        return FailureType.TIMEOUT
    if 'refused' in msg:
        return FailureType.REFUSED
    if 'auth' in msg or 'handshake' in msg:
        return FailureType.AUTH_FAIL
    return FailureType.UNKNOWN
```

**Files:** proxywatchd.py
**Effort:** Medium
**Risk:** Low

---

### [ ] 9. Priority Queue for Proxy Testing

**Problem:** All proxies are tested with equal priority.
Should prioritize:

- Recently successful proxies (verify still working)
- New proxies (determine if usable)
- Low fail-count proxies

**Implementation:**

```python
# proxywatchd.py
import heapq
import threading

class PriorityJobQueue:
    """Priority queue wrapper for proxy test jobs."""
    def __init__(self):
        self.heap = []
        self.lock = threading.Lock()

    def put(self, job, priority):
        """Lower priority number = higher priority."""
        with self.lock:
            heapq.heappush(self.heap, (priority, id(job), job))

    def get(self, timeout=None):
        """Get the highest-priority job."""
        # ... implementation with timeout
```

Priority calculation:

- New proxy (retrievals=0): priority 0
- Recent success (last_success < 1hr): priority 1
- Low fail count (failed < 3): priority 2
- Medium fail count: priority 3
- High fail count: priority 4

**Files:** proxywatchd.py
**Effort:** Medium
**Risk:** Medium

---

### [ ] 10. Periodic Statistics Output

**Problem:** No visibility into system performance during operation.

**Implementation:**

```python
# proxywatchd.py
import threading
import time

class Stats:
    def __init__(self):
        self.lock = threading.Lock()
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()

    def record(self, success):
        with self.lock:
            self.tested += 1
            if success:
                self.passed += 1
            else:
                self.failed += 1

    def report(self):
        with self.lock:
            elapsed = time.time() - self.start_time
            rate = self.tested / elapsed if elapsed > 0 else 0
            pct = (self.passed * 100.0 / self.tested) if self.tested > 0 else 0
            return 'tested=%d passed=%d (%.1f%%) rate=%.1f/s' % (
                self.tested, self.passed, pct, rate)

# In the main loop, every 5 minutes:
if time.time() - last_stats > 300:
    _log(stats.report(), 'stats', 'watchd')
    last_stats = time.time()
```

**Files:** proxywatchd.py
**Effort:** Low
**Risk:** Low

---

## Medium Term (Next Quarter)

### [ ] 11. Tor Connection Pooling

**Problem:** Each proxy test creates a new Tor connection, and Tor circuit establishment is slow (~2-3 seconds).
**Implementation:**

```python
# new file: connection_pool.py
import random
import threading
import Queue

class TorConnectionPool:
    """Pool of reusable Tor SOCKS connections."""
    def __init__(self, tor_hosts, pool_size=10):
        self.tor_hosts = tor_hosts
        self.pool_size = pool_size
        self.connections = Queue.Queue(pool_size)
        self.lock = threading.Lock()

    def get(self, timeout=5):
        """Get a Tor connection from the pool, or create a new one."""
        try:
            return self.connections.get(timeout=0.1)
        except Queue.Empty:
            return self._create_connection()

    def release(self, conn):
        """Return a connection to the pool."""
        try:
            self.connections.put_nowait(conn)
        except Queue.Full:
            conn.close()

    def _create_connection(self):
        """Create a new Tor SOCKS connection."""
        host = random.choice(self.tor_hosts)
        # ... establish connection
```

**Files:** new connection_pool.py, proxywatchd.py
**Effort:** High
**Risk:** Medium

---

### [ ] 12. Dynamic Thread Scaling

**Problem:** Fixed thread count regardless of success rate or system load.

**Implementation:**

```python
# proxywatchd.py
class ThreadScaler:
    """Dynamically adjust thread count based on performance."""
    def __init__(self, min_threads=5, max_threads=50):
        self.min = min_threads
        self.max = max_threads
        self.current = min_threads
        self.success_rate_window = []

    def record_result(self, success):
        self.success_rate_window.append(success)
        if len(self.success_rate_window) > 100:
            self.success_rate_window.pop(0)

    def recommended_threads(self):
        if len(self.success_rate_window) < 20:
            return self.current
        # float() avoids integer division under Python 2
        success_rate = sum(self.success_rate_window) / float(len(self.success_rate_window))
        # High success rate -> can handle more threads
        if success_rate > 0.7 and self.current < self.max:
            return self.current + 5
        # Low success rate -> reduce load
        elif success_rate < 0.3 and self.current > self.min:
            return self.current - 5
        return self.current
```

**Files:** proxywatchd.py
**Effort:** Medium
**Risk:** Medium

---

### [ ] 13. Latency Tracking

**Problem:** No visibility into proxy speed. A working but slow proxy may be less useful than a fast one.
**Implementation:**

```python
# dbs.py - add columns
# ALTER TABLE proxylist ADD COLUMN avg_latency REAL DEFAULT 0
# ALTER TABLE proxylist ADD COLUMN latency_samples INTEGER DEFAULT 0

def update_proxy_latency(proxydb, proxy, latency):
    """Update rolling average latency for proxy."""
    row = proxydb.execute(
        'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy=?',
        (proxy,)
    ).fetchone()
    if row:
        old_avg, samples = row
        # Rolling average; capping the sample count makes this behave
        # like an exponential moving average once the cap is reached
        new_avg = (old_avg * samples + latency) / (samples + 1)
        new_samples = min(samples + 1, 100)  # Cap at 100 samples
        proxydb.execute(
            'UPDATE proxylist SET avg_latency=?, latency_samples=? WHERE proxy=?',
            (new_avg, new_samples, proxy)
        )
```

**Files:** dbs.py, proxywatchd.py
**Effort:** Medium
**Risk:** Low

---

### [ ] 14. Export Functionality

**Problem:** No easy way to export working proxies for use elsewhere.

**Implementation:**

```python
# new file: export.py
def export_proxies(proxydb, format='txt', filters=None):
    """Export working proxies to various formats."""
    query = 'SELECT proto, proxy FROM proxylist WHERE failed=0'
    params = []
    if filters:
        if 'proto' in filters:
            query += ' AND proto=?'
            params.append(filters['proto'])
    rows = proxydb.execute(query, params).fetchall()
    if format == 'txt':
        return '\n'.join('%s://%s' % (r[0], r[1]) for r in rows)
    elif format == 'json':
        import json
        return json.dumps([{'proto': r[0], 'address': r[1]} for r in rows])
    elif format == 'csv':
        return 'proto,address\n' + '\n'.join('%s,%s' % (r[0], r[1]) for r in rows)

# CLI: python export.py --format json --proto socks5 > proxies.json
```

**Files:** new export.py
**Effort:** Low
**Risk:** Low

---

### [ ] 15. Unit Test Infrastructure

**Problem:** No automated tests. Changes can break existing functionality silently.
**Implementation:**

```
tests/
├── __init__.py
├── test_proxy_utils.py   # Test IP validation, cleansing
├── test_extract.py       # Test proxy/URL extraction
├── test_database.py      # Test DB operations with temp DB
└── mock_network.py       # Mock rocksock for offline testing
```

```python
# tests/test_proxy_utils.py
import unittest
import sys
sys.path.insert(0, '..')
import fetch

class TestProxyValidation(unittest.TestCase):
    def test_valid_proxy(self):
        self.assertTrue(fetch.is_usable_proxy('8.8.8.8:8080'))

    def test_private_ip_rejected(self):
        self.assertFalse(fetch.is_usable_proxy('192.168.1.1:8080'))
        self.assertFalse(fetch.is_usable_proxy('10.0.0.1:8080'))
        self.assertFalse(fetch.is_usable_proxy('172.16.0.1:8080'))

    def test_invalid_port_rejected(self):
        self.assertFalse(fetch.is_usable_proxy('8.8.8.8:0'))
        self.assertFalse(fetch.is_usable_proxy('8.8.8.8:99999'))

if __name__ == '__main__':
    unittest.main()
```

**Files:** tests/ directory
**Effort:** High (initial), Low (ongoing)
**Risk:** Low

---

## Long Term (Future)

### [ ] 16. Geographic Validation

Verify the proxy actually routes through its claimed location using IP geolocation.

### [ ] 17. HTTPS/SSL Proxy Testing

Add capability to test HTTPS CONNECT proxies.

### [ ] 18. Additional Search Engines

Support Google, Bing, DuckDuckGo beyond Searx.

### [ ] 19. REST API

Simple HTTP API to query the proxy database.

### [ ] 20. Web Dashboard

Status page showing live statistics.
---

## Completed

### [x] Work-Stealing Queue

- Implemented shared Queue.Queue() for job distribution
- Workers pull from the shared queue instead of pre-assigned lists
- Better utilization across threads

### [x] Multi-Target Validation

- Test each proxy against 3 random targets
- 2/3 majority required for success
- Reduces false negatives from single-target failures

### [x] Interleaved Testing

- Jobs shuffled across all proxies before queueing
- Prevents a burst of 3 connections to the same proxy
- ProxyTestState accumulates results from TargetTestJobs

### [x] Code Cleanup

- Removed 93 lines of dead HTTP server code (ppf.py)
- Removed dead gumbo parser (soup_parser.py)
- Removed test code (comboparse.py)
- Removed unused functions (misc.py)
- Fixed IP/port cleansing (ppf.py)
- Updated .gitignore
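The 2/3-majority rule from Multi-Target Validation above boils down to a threshold over per-target booleans; a minimal sketch, where `proxy_passes` is an illustrative name and not the actual ProxyTestState API:

```python
# Hypothetical sketch of the 2/3-majority rule from Multi-Target Validation.
# proxy_passes is an illustrative name, not the actual ProxyTestState API.
def proxy_passes(target_results, required=2):
    """target_results: one boolean per target test (3 targets per proxy)."""
    return sum(1 for ok in target_results if ok) >= required
```

With this rule a single flaky target no longer sinks an otherwise working proxy: `proxy_passes([True, True, False])` is `True`, while `proxy_passes([True, False, False])` is `False`.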