ppf/TODO.md
2025-12-20 16:46:43 +01:00

PPF Implementation Tasks

Legend

[ ] Not started
[~] In progress
[x] Completed
[!] Blocked/needs discussion

Immediate Priority (Next Sprint)

[ ] 1. Unify _known_proxies Cache

Problem: ppf.py and fetch.py each maintain their own _known_proxies dictionary. Updates to one aren't reflected in the other, which can lead to duplicate processing.

Implementation:

```python
# fetch.py - becomes the single source of truth
import threading

_known_proxies = {}
_known_proxies_lock = threading.Lock()

def get_known_proxies():
    """Return a reference to the shared known-proxies dict."""
    return _known_proxies

def is_known_proxy(proxy):
    """Thread-safe check whether a proxy is already known."""
    with _known_proxies_lock:
        return proxy in _known_proxies

def mark_proxy_known(proxy):
    """Thread-safe marking of a proxy as known."""
    with _known_proxies_lock:
        _known_proxies[proxy] = True
```

Files: fetch.py, ppf.py Effort: Low Risk: Low


[ ] 2. Graceful SQLite Error Handling

Problem: SQLite can throw "database is locked" errors under concurrent access. Currently these bubble up and crash the application.

Implementation:

```python
# mysqlite.py
import sqlite3
import time

class mysqlite():
    def execute(self, query, params=None, retries=5):
        for attempt in range(retries):
            try:
                if params:
                    return self.cur.execute(query, params)
                return self.cur.execute(query)
            except sqlite3.OperationalError as e:
                if 'locked' in str(e) and attempt < retries - 1:
                    time.sleep(0.1 * (attempt + 1))  # Linearly increasing backoff
                    continue
                raise
```

Files: mysqlite.py Effort: Low Risk: Low


[ ] 3. Enable SQLite WAL Mode

Problem: Default SQLite journaling mode blocks concurrent readers during writes.

Implementation:

```python
# mysqlite.py - in __init__
def __init__(self, database, rowtype):
    self.conn = sqlite3.connect(database, check_same_thread=False)
    self.conn.execute('PRAGMA journal_mode=WAL')
    self.conn.execute('PRAGMA synchronous=NORMAL')
    # ...
```
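
The effect of the pragma can be verified directly, since `PRAGMA journal_mode=WAL` returns the mode actually in effect. A standalone sketch (not part of mysqlite.py), using a temp file because WAL requires a file-backed database:

```python
import os
import sqlite3
import tempfile

fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
conn = sqlite3.connect(path)
# The pragma returns the journal mode actually in effect
mode = conn.execute('PRAGMA journal_mode=WAL').fetchone()[0]
conn.close()
# WAL leaves -wal/-shm side files next to the database; clean them all up
for suffix in ('', '-wal', '-shm'):
    try:
        os.remove(path + suffix)
    except OSError:
        pass
```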

Files: mysqlite.py Effort: Low Risk: Low (WAL is well-tested)


[ ] 4. Batch Database Inserts

Problem: insert_proxies() and insert_urls() do individual INSERTs, causing excessive disk I/O and lock contention.

Implementation:

```python
# dbs.py
def insert_proxies(proxydb, proxies, source):
    """Batch insert proxies."""
    if not proxies:
        return

    mytime = int(time.time())
    values = [(p, source, mytime) for p in proxies]

    proxydb.executemany(
        'INSERT OR IGNORE INTO proxylist (proxy, source, first_seen) VALUES (?, ?, ?)',
        values
    )
    proxydb.commit()
```
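
The batch path can be exercised end to end against an in-memory database. A self-contained sketch; the real proxylist schema has more columns than the minimal one assumed here:

```python
import sqlite3
import time

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE proxylist '
             '(proxy TEXT PRIMARY KEY, source TEXT, first_seen INTEGER)')

proxies = ['1.2.3.4:8080', '5.6.7.8:1080', '1.2.3.4:8080']  # one duplicate
values = [(p, 'demo', int(time.time())) for p in proxies]

# One executemany() round trip instead of len(proxies) separate INSERTs;
# OR IGNORE silently drops the duplicate primary key
conn.executemany(
    'INSERT OR IGNORE INTO proxylist (proxy, source, first_seen) VALUES (?, ?, ?)',
    values)
conn.commit()
count = conn.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
```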

Files: dbs.py Effort: Low Risk: Low


[ ] 5. Add Database Indexes

Problem: Queries on large tables are slow without proper indexes.

Implementation:

```python
# dbs.py - in create_table_if_not_exists
def create_indexes(db, table):
    if table == 'proxylist':
        db.execute('CREATE INDEX IF NOT EXISTS idx_proxy_failed ON proxylist(failed)')
        db.execute('CREATE INDEX IF NOT EXISTS idx_proxy_proto ON proxylist(proto)')
    elif table == 'uris':
        db.execute('CREATE INDEX IF NOT EXISTS idx_uri_error ON uris(error)')
        db.execute('CREATE INDEX IF NOT EXISTS idx_uri_checktime ON uris(check_time)')
    db.commit()
```
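
Whether a query actually uses one of these indexes can be checked with EXPLAIN QUERY PLAN. A standalone sketch with a minimal assumed schema:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE proxylist (proxy TEXT, proto TEXT, failed INTEGER)')
conn.execute('CREATE INDEX IF NOT EXISTS idx_proxy_failed ON proxylist(failed)')

plan = conn.execute(
    'EXPLAIN QUERY PLAN SELECT proxy FROM proxylist WHERE failed=0'
).fetchall()
plan_text = ' '.join(str(row) for row in plan)
# The plan should name idx_proxy_failed instead of a full table scan
uses_index = 'idx_proxy_failed' in plan_text
```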

Files: dbs.py Effort: Low Risk: Low


Short Term (This Month)

[ ] 6. Standardize Logging

Problem: Inconsistent logging across files. Some use print(), some use _log(), some silently swallow errors.

Implementation:

```python
# misc.py - enhanced logging
import sys

LOG_LEVELS = {'debug': 0, 'info': 1, 'warn': 2, 'error': 3}
_log_level = 1  # Default: info

def set_log_level(level):
    global _log_level
    _log_level = LOG_LEVELS.get(level, 1)

def _log(msg, level='info', module=None):
    if LOG_LEVELS.get(level, 1) < _log_level:
        return
    prefix = '%s/%s' % (timestamp(), level)
    if module:
        prefix = '%s/%s' % (prefix, module)
    output = sys.stderr if level in ('warn', 'error') else sys.stdout
    output.write('\r%s\t%s\n' % (prefix, msg))
```

Files: misc.py, all other files Effort: Medium Risk: Low


[ ] 7. Connection Timeout Standardization

Problem: Timeout values are scattered across config.ppf.timeout, config.watchd.timeout, and hardcoded values in rocksock.py.

Implementation:

  • Add to config.py: [network] section with timeout_connect, timeout_read, timeout_total
  • Pass timeout explicitly to all network functions
  • Remove hardcoded timeout values
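
A possible shape for the unified section, sketched with Python 3's configparser (the codebase itself may load config differently; the option names come from the bullets above, the values are placeholders):

```python
import configparser
import io

# Hypothetical contents of the new [network] section
SAMPLE = """\
[network]
timeout_connect = 5
timeout_read = 15
timeout_total = 30
"""

def load_network_timeouts(fp):
    """Read the [network] timeouts once; callers receive them explicitly."""
    cfg = configparser.ConfigParser()
    cfg.read_file(fp)
    return {
        'connect': cfg.getfloat('network', 'timeout_connect'),
        'read': cfg.getfloat('network', 'timeout_read'),
        'total': cfg.getfloat('network', 'timeout_total'),
    }

timeouts = load_network_timeouts(io.StringIO(SAMPLE))
```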

Files: config.py, fetch.py, proxywatchd.py, rocksock.py Effort: Medium Risk: Low


[ ] 8. Failure Categorization

Problem: All failures are treated equally, even though a timeout, a connection refusal, and an auth failure have different implications.

Implementation:

```python
# proxywatchd.py
class FailureType:
    TIMEOUT = 'timeout'           # Retry later
    REFUSED = 'refused'           # Proxy down, lower priority
    AUTH_FAIL = 'auth_fail'       # Wrong protocol, try others
    TARGET_DOWN = 'target_down'   # Not proxy's fault
    UNKNOWN = 'unknown'

def categorize_failure(exception):
    """Categorize failure type from exception."""
    msg = str(exception).lower()
    if 'timeout' in msg or 'timed out' in msg:
        return FailureType.TIMEOUT
    if 'refused' in msg:
        return FailureType.REFUSED
    if 'auth' in msg or 'handshake' in msg:
        return FailureType.AUTH_FAIL
    return FailureType.UNKNOWN
```

Files: proxywatchd.py Effort: Medium Risk: Low


[ ] 9. Priority Queue for Proxy Testing

Problem: All proxies are tested with equal priority. Testing should prioritize:

  • Recently successful proxies (verify still working)
  • New proxies (determine if usable)
  • Low fail-count proxies

Implementation:

```python
# proxywatchd.py
import heapq
import threading

class PriorityJobQueue:
    """Priority queue wrapper for proxy test jobs."""
    def __init__(self):
        self.heap = []
        self.cond = threading.Condition()

    def put(self, job, priority):
        """Lower priority number = higher priority."""
        with self.cond:
            # id(job) breaks ties so job objects are never compared directly
            heapq.heappush(self.heap, (priority, id(job), job))
            self.cond.notify()

    def get(self, timeout=None):
        """Pop the highest-priority job; block up to timeout, else return None."""
        with self.cond:
            if not self.heap:
                self.cond.wait(timeout)
            if not self.heap:
                return None
            return heapq.heappop(self.heap)[2]
```

Priority calculation:

  • New proxy (retrievals=0): priority 0
  • Recent success (last_success < 1hr): priority 1
  • Low fail count (failed < 3): priority 2
  • Medium fail count: priority 3
  • High fail count: priority 4
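
The tiers above translate directly into a priority function. A sketch; the argument names mirror assumed proxylist columns, and the `failed < 10` boundary between medium and high fail counts is a guess not stated above:

```python
import time

def compute_priority(retrievals, last_success, failed, now=None):
    """Map a proxy's history to a queue priority (lower = tested sooner)."""
    now = time.time() if now is None else now
    if retrievals == 0:
        return 0                              # new proxy: determine if usable
    if last_success and now - last_success < 3600:
        return 1                              # recent success: verify still working
    if failed < 3:
        return 2                              # low fail count
    if failed < 10:
        return 3                              # medium fail count (threshold assumed)
    return 4                                  # high fail count
```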

Files: proxywatchd.py Effort: Medium Risk: Medium


[ ] 10. Periodic Statistics Output

Problem: No visibility into system performance during operation.

Implementation:

```python
# proxywatchd.py
class Stats:
    def __init__(self):
        self.lock = threading.Lock()
        self.tested = 0
        self.passed = 0
        self.failed = 0
        self.start_time = time.time()

    def record(self, success):
        with self.lock:
            self.tested += 1
            if success:
                self.passed += 1
            else:
                self.failed += 1

    def report(self):
        with self.lock:
            elapsed = time.time() - self.start_time
            rate = self.tested / elapsed if elapsed > 0 else 0
            pct = (self.passed * 100.0 / self.tested) if self.tested > 0 else 0
            return 'tested=%d passed=%d (%.1f%%) rate=%.1f/s' % (
                self.tested, self.passed, pct, rate)

# In main loop, every 5 minutes:
if time.time() - last_stats > 300:
    _log(stats.report(), 'stats', 'watchd')
    last_stats = time.time()
```

Files: proxywatchd.py Effort: Low Risk: Low


Medium Term (Next Quarter)

[ ] 11. Tor Connection Pooling

Problem: Each proxy test creates a new Tor connection. Tor circuit establishment is slow (~2-3 seconds).

Implementation:

```python
# new file: connection_pool.py
import random
import threading
import Queue  # Python 2; 'queue' in Python 3

class TorConnectionPool:
    """Pool of reusable Tor SOCKS connections."""

    def __init__(self, tor_hosts, pool_size=10):
        self.tor_hosts = tor_hosts
        self.pool_size = pool_size
        self.connections = Queue.Queue(pool_size)
        self.lock = threading.Lock()

    def get(self):
        """Get a Tor connection from the pool, or create a new one."""
        try:
            # Short poll: don't wait long before falling back to a fresh circuit
            return self.connections.get(timeout=0.1)
        except Queue.Empty:
            return self._create_connection()

    def release(self, conn):
        """Return connection to pool."""
        try:
            self.connections.put_nowait(conn)
        except Queue.Full:
            conn.close()

    def _create_connection(self):
        """Create new Tor SOCKS connection."""
        host = random.choice(self.tor_hosts)
        # ... establish connection
```

Files: new connection_pool.py, proxywatchd.py Effort: High Risk: Medium


[ ] 12. Dynamic Thread Scaling

Problem: Fixed thread count regardless of success rate or system load.

Implementation:

```python
# proxywatchd.py
class ThreadScaler:
    """Dynamically adjust thread count based on performance."""

    def __init__(self, min_threads=5, max_threads=50):
        self.min = min_threads
        self.max = max_threads
        self.current = min_threads
        self.success_rate_window = []

    def record_result(self, success):
        self.success_rate_window.append(success)
        if len(self.success_rate_window) > 100:
            self.success_rate_window.pop(0)

    def recommended_threads(self):
        if len(self.success_rate_window) < 20:
            return self.current

        # float() guards against integer division under Python 2
        success_rate = sum(self.success_rate_window) / float(len(self.success_rate_window))

        # High success rate -> can handle more threads
        if success_rate > 0.7 and self.current < self.max:
            return self.current + 5
        # Low success rate -> reduce load
        elif success_rate < 0.3 and self.current > self.min:
            return self.current - 5

        return self.current
```

Files: proxywatchd.py Effort: Medium Risk: Medium


[ ] 13. Latency Tracking

Problem: No visibility into proxy speed. A working but slow proxy may be less useful than a fast one.

Implementation:

```python
# dbs.py - add columns
# ALTER TABLE proxylist ADD COLUMN avg_latency REAL DEFAULT 0
# ALTER TABLE proxylist ADD COLUMN latency_samples INTEGER DEFAULT 0

def update_proxy_latency(proxydb, proxy, latency):
    """Update rolling average latency for proxy."""
    row = proxydb.execute(
        'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy=?',
        (proxy,)
    ).fetchone()

    if row:
        old_avg, samples = row
        # Running average; once samples caps at 100 this behaves like an
        # exponential moving average with alpha ~= 1/101
        new_avg = (old_avg * samples + latency) / (samples + 1)
        new_samples = min(samples + 1, 100)  # Cap at 100 samples

        proxydb.execute(
            'UPDATE proxylist SET avg_latency=?, latency_samples=? WHERE proxy=?',
            (new_avg, new_samples, proxy)
        )
```
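
To see why the sample cap matters, here is the same update rule without the database. Once the count is capped, each new sample moves the average by only a small fraction of the error, so a latency shift is absorbed gradually rather than instantly:

```python
def rolling_update(avg, samples, latency, cap=100):
    """Same update rule as update_proxy_latency, minus the database."""
    new_avg = (avg * samples + latency) / (samples + 1)
    return new_avg, min(samples + 1, cap)

avg, n = 0.0, 0
for _ in range(100):                 # 100 samples at a steady 50 ms
    avg, n = rolling_update(avg, n, 50.0)
# n is now capped at 100, so one 250 ms sample shifts the average by
# (250 - 50) / 101, i.e. roughly 2 ms
avg_after, n = rolling_update(avg, n, 250.0)
```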

Files: dbs.py, proxywatchd.py Effort: Medium Risk: Low


[ ] 14. Export Functionality

Problem: No easy way to export working proxies for use elsewhere.

Implementation:

```python
# new file: export.py
import json

def export_proxies(proxydb, format='txt', filters=None):
    """Export working proxies to various formats."""
    query = 'SELECT proto, proxy FROM proxylist WHERE failed=0'
    params = []
    if filters and 'proto' in filters:
        query += ' AND proto=?'
        params.append(filters['proto'])

    rows = proxydb.execute(query, params).fetchall()

    if format == 'txt':
        return '\n'.join('%s://%s' % (r[0], r[1]) for r in rows)
    elif format == 'json':
        return json.dumps([{'proto': r[0], 'address': r[1]} for r in rows])
    elif format == 'csv':
        return 'proto,address\n' + '\n'.join('%s,%s' % r for r in rows)

# CLI: python export.py --format json --proto socks5 > proxies.json
```

Files: new export.py Effort: Low Risk: Low


[ ] 15. Unit Test Infrastructure

Problem: No automated tests. Changes can break existing functionality silently.

Implementation:

```
tests/
├── __init__.py
├── test_proxy_utils.py    # Test IP validation, cleansing
├── test_extract.py        # Test proxy/URL extraction
├── test_database.py       # Test DB operations with temp DB
└── mock_network.py        # Mock rocksock for offline testing
```

```python
# tests/test_proxy_utils.py
import os
import sys
import unittest

# Make the project root importable regardless of the working directory
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'))
import fetch

class TestProxyValidation(unittest.TestCase):
    def test_valid_proxy(self):
        self.assertTrue(fetch.is_usable_proxy('8.8.8.8:8080'))

    def test_private_ip_rejected(self):
        self.assertFalse(fetch.is_usable_proxy('192.168.1.1:8080'))
        self.assertFalse(fetch.is_usable_proxy('10.0.0.1:8080'))
        self.assertFalse(fetch.is_usable_proxy('172.16.0.1:8080'))

    def test_invalid_port_rejected(self):
        self.assertFalse(fetch.is_usable_proxy('8.8.8.8:0'))
        self.assertFalse(fetch.is_usable_proxy('8.8.8.8:99999'))

if __name__ == '__main__':
    unittest.main()
```

Files: tests/ directory Effort: High (initial), Low (ongoing) Risk: Low


Long Term (Future)

[ ] 16. Geographic Validation

Verify proxy actually routes through claimed location using IP geolocation.

[ ] 17. HTTPS/SSL Proxy Testing

Add capability to test HTTPS CONNECT proxies.

[ ] 18. Additional Search Engines

Support Google, Bing, DuckDuckGo beyond Searx.

[ ] 19. REST API

Simple HTTP API to query proxy database.

[ ] 20. Web Dashboard

Status page showing live statistics.


Completed

[x] Work-Stealing Queue

  • Implemented shared Queue.Queue() for job distribution
  • Workers pull from shared queue instead of pre-assigned lists
  • Better utilization across threads

[x] Multi-Target Validation

  • Test each proxy against 3 random targets
  • 2/3 majority required for success
  • Reduces false negatives from single target failures
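
The 2-of-3 rule is a plain majority vote over per-target outcomes. A sketch with a hypothetical helper name; the real aggregation lives in ProxyTestState:

```python
def proxy_passes(target_results, required=2):
    """Majority vote over per-target outcomes (True = target test succeeded)."""
    return sum(1 for ok in target_results if ok) >= required

flaky_target = proxy_passes([True, False, True])   # one flaky target: still passes
dead_proxy = proxy_passes([False, True, False])    # only 1 of 3: fails
```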

[x] Interleaved Testing

  • Jobs shuffled across all proxies before queueing
  • Prevents burst of 3 connections to same proxy
  • ProxyTestState accumulates results from TargetTestJobs
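
The shuffle step can be sketched as below; the (proxy, slot) tuples are hypothetical stand-ins for the real TargetTestJob objects:

```python
import random

def build_job_order(proxies, targets_per_proxy=3):
    """One job per (proxy, target slot); shuffling spreads a proxy's
    three tests across the whole queue instead of back to back."""
    jobs = [(proxy, slot)
            for proxy in proxies
            for slot in range(targets_per_proxy)]
    random.shuffle(jobs)
    return jobs

jobs = build_job_order(['1.2.3.4:8080', '5.6.7.8:1080'])
```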

[x] Code Cleanup

  • Removed 93 lines of dead HTTP server code (ppf.py)
  • Removed dead gumbo parser (soup_parser.py)
  • Removed test code (comboparse.py)
  • Removed unused functions (misc.py)
  • Fixed IP/port cleansing (ppf.py)
  • Updated .gitignore