# -*- coding: utf-8 -*-
"""Network mocking utilities for PPF offline testing.

Provides mock implementations of rocksock and connection pool for testing
proxy validation logic without actual network calls.
"""

from __future__ import print_function

import sys
import os

# Put the parent of this file's directory first on the import path
# (presumably so project modules such as ``rocksock`` resolve when tests are
# run from a subdirectory -- confirm against the project layout).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import rocksock error codes for mock exceptions
try:
    import rocksock
    RS_E_TARGET_CONN_REFUSED = rocksock.RS_E_TARGET_CONN_REFUSED
    RS_E_HIT_TIMEOUT = rocksock.RS_E_HIT_TIMEOUT
    RS_E_PROXY_AUTH_FAILED = rocksock.RS_E_PROXY_AUTH_FAILED
    RS_E_REMOTE_DISCONNECTED = rocksock.RS_E_REMOTE_DISCONNECTED
    RS_E_PROXY_UNEXPECTED_RESPONSE = rocksock.RS_E_PROXY_UNEXPECTED_RESPONSE
    RS_ET_OWN = rocksock.RS_ET_OWN
    RS_ET_GAI = rocksock.RS_ET_GAI
    RS_ET_SSL = rocksock.RS_ET_SSL
except ImportError:
    # Define error codes if rocksock not available
    RS_E_TARGET_CONN_REFUSED = 20
    RS_E_HIT_TIMEOUT = 5
    RS_E_PROXY_AUTH_FAILED = 7
    RS_E_REMOTE_DISCONNECTED = 24
    RS_E_PROXY_UNEXPECTED_RESPONSE = 11
    RS_ET_OWN = 1
    RS_ET_GAI = 2
    RS_ET_SSL = 3


class MockRocksockException(Exception):
    """Mock RocksockException for testing error categorization.

    Args:
        error: Numeric error code (one of the ``RS_E_*`` constants).
        errortype: Numeric error type (one of the ``RS_ET_*`` constants).
        message: Human-readable message, also passed to ``Exception``.
    """

    def __init__(self, error=0, errortype=RS_ET_OWN, message=''):
        self._error = error
        self._errortype = errortype
        self._message = message
        super(MockRocksockException, self).__init__(message)

    def get_error(self):
        """Return the error code given at construction."""
        return self._error

    def get_errortype(self):
        """Return the error type given at construction."""
        return self._errortype

    def get_errormessage(self):
        """Return the message given at construction."""
        return self._message


# Configurable test scenarios
# Maps (proxy_ip, proxy_port) -> test behavior
MOCK_SCENARIOS = {}

# Default response for unknown proxies
DEFAULT_RESPONSE = b'192.0.2.1'  # TEST-NET-1 IP for testing


class MockRocksock(object):
    """Mock rocksock.Rocksock for offline testing.

    Simulates network connections based on configured scenarios.
    Can be configured to succeed, fail, or return specific responses.

    The scenario is looked up once, at construction time, using the second
    entry of ``proxies`` (the target proxy). Chains with fewer than two
    entries, or whose second entry has no ``hostinfo`` attribute, get the
    default behavior: connect succeeds and ``recv()`` returns
    ``DEFAULT_RESPONSE``.
    """

    def __init__(self, host=None, port=None, proxies=None, ssl=False,
                 timeout=None, **kwargs):
        """Store connection parameters and resolve the matching scenario.

        Extra keyword arguments are accepted and ignored.
        """
        self.host = host
        self.port = port
        self.proxies = proxies or []
        self.ssl = ssl
        self.timeout = timeout
        self.connected = False
        self._response = DEFAULT_RESPONSE
        self._scenario = None

        # Determine scenario based on target proxy (second in chain)
        if len(self.proxies) >= 2:
            target_proxy = self.proxies[1]
            if hasattr(target_proxy, 'hostinfo'):
                key = (target_proxy.hostinfo.host, target_proxy.hostinfo.port)
                self._scenario = MOCK_SCENARIOS.get(key)

    def connect(self):
        """Simulate connection based on scenario.

        Returns:
            True once the mock is marked as connected.

        Raises:
            MockRocksockException: If the scenario has a truthy ``fail``
                entry. Uses the scenario's ``error`` / ``errortype`` or the
                defaults ``RS_E_TARGET_CONN_REFUSED`` / ``RS_ET_OWN``.
        """
        scenario = self._scenario
        if scenario:
            if scenario.get('fail'):
                error = scenario.get('error', RS_E_TARGET_CONN_REFUSED)
                errortype = scenario.get('errortype', RS_ET_OWN)
                raise MockRocksockException(
                    error, errortype, 'Mock connection failed')
            # Compare against None rather than testing truthiness so that an
            # explicitly configured empty response (b'') is honored instead
            # of silently falling back to DEFAULT_RESPONSE.
            response = scenario.get('response')
            if response is not None:
                self._response = response
        self.connected = True
        return True

    def send(self, data):
        """Simulate sending data.

        Returns:
            ``len(data)``; nothing is actually transmitted.

        Raises:
            MockRocksockException: With ``RS_E_REMOTE_DISCONNECTED`` if the
                mock is not connected.
        """
        if not self.connected:
            raise MockRocksockException(
                RS_E_REMOTE_DISCONNECTED, RS_ET_OWN, 'Not connected')
        return len(data)

    def recv(self, n=-1):
        """Return configured response.

        The ``n`` argument is accepted for signature compatibility but is
        ignored: the full response is returned on every call and is never
        consumed.

        Raises:
            MockRocksockException: With ``RS_E_REMOTE_DISCONNECTED`` if the
                mock is not connected.
        """
        if not self.connected:
            raise MockRocksockException(
                RS_E_REMOTE_DISCONNECTED, RS_ET_OWN, 'Not connected')
        return self._response

    def recvline(self):
        """Return response as line (the response followed by ``b'\\n'``)."""
        return self.recv() + b'\n'

    def disconnect(self):
        """Close mock connection."""
        self.connected = False


class MockTorHostState(object):
    """Mock TorHostState for testing.

    Tracks success/failure counters and accumulated latency for one host.
    Availability is a plain flag (``available``) that tests may toggle.
    """

    def __init__(self, host='127.0.0.1:9050'):
        self.host = host
        self.success_count = 0
        self.failure_count = 0
        self.latency_sum = 0.0
        self.available = True
        self.backoff_until = 0

    def record_success(self, latency=0):
        """Count one success and add ``latency`` to the running sum."""
        self.success_count += 1
        self.latency_sum += latency

    def record_failure(self):
        """Count one failure."""
        self.failure_count += 1

    def is_available(self):
        """Return the current value of the ``available`` flag."""
        return self.available

    def get_latency(self):
        """Return mean latency over recorded successes, or 0 if none."""
        if self.success_count == 0:
            return 0
        return self.latency_sum / self.success_count


class MockTorConnectionPool(object):
    """Mock TorConnectionPool for testing.

    Provides a simplified pool that returns configured Tor hosts without
    making actual network connections.

    Args:
        tor_hosts: List of host strings; defaults to ``['127.0.0.1:9050']``.
        warmup: Accepted for signature compatibility; unused by the mock,
            which always reports ``warmup_complete`` as True.
    """

    def __init__(self, tor_hosts=None, warmup=True):
        self.tor_hosts = tor_hosts or ['127.0.0.1:9050']
        self.warmup_complete = True
        self.host_states = {h: MockTorHostState(h) for h in self.tor_hosts}
        # Chronological logs of every record_success / record_failure call,
        # including calls for hosts the pool does not know about.
        self.successes = []
        self.failures = []

    def get_tor_host(self, worker_id=None):
        """Return first available Tor host, or None if none is available.

        ``worker_id`` is accepted but does not influence the selection.
        """
        for host in self.tor_hosts:
            state = self.host_states.get(host)
            if state and state.is_available():
                return host
        return None

    def record_success(self, host, latency=0):
        """Record successful connection."""
        self.successes.append((host, latency))
        if host in self.host_states:
            self.host_states[host].record_success(latency)

    def record_failure(self, host):
        """Record failed connection."""
        self.failures.append(host)
        if host in self.host_states:
            self.host_states[host].record_failure()

    def get_stats(self):
        """Return pool statistics.

        Returns:
            Dict with keys ``available_hosts``, ``total_hosts``,
            ``total_successes`` and ``total_failures``. The success and
            failure totals are summed over the known host states only.
        """
        states = list(self.host_states.values())
        return {
            'available_hosts': sum(1 for h in states if h.is_available()),
            'total_hosts': len(self.tor_hosts),
            'total_successes': sum(h.success_count for h in states),
            'total_failures': sum(h.failure_count for h in states),
        }


def configure_scenario(proxy_ip, proxy_port, **kwargs):
    """Configure test scenario for a specific proxy.

    Any scenario previously stored for the same ``(proxy_ip, proxy_port)``
    key is replaced.

    Args:
        proxy_ip: Proxy IP address
        proxy_port: Proxy port number
        **kwargs: Scenario configuration:
            - fail: If True, connection will fail
            - error: Error code to raise (default: RS_E_TARGET_CONN_REFUSED)
            - errortype: Error type (default: RS_ET_OWN)
            - response: Bytes to return from recv() (None keeps the default)

    Example:
        # Configure proxy to fail with timeout
        configure_scenario('1.2.3.4', 8080, fail=True, error=RS_E_HIT_TIMEOUT)

        # Configure proxy to succeed with specific exit IP
        configure_scenario('5.6.7.8', 3128, response=b'203.0.113.50')
    """
    MOCK_SCENARIOS[(proxy_ip, proxy_port)] = kwargs


def clear_scenarios():
    """Clear all configured test scenarios."""
    MOCK_SCENARIOS.clear()


def reset_mock_state():
    """Reset all mock state for clean test runs."""
    clear_scenarios()


# Convenience functions for common scenarios

def scenario_timeout(proxy_ip, proxy_port):
    """Configure proxy to fail with timeout."""
    configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_HIT_TIMEOUT)


def scenario_refused(proxy_ip, proxy_port):
    """Configure proxy to fail with connection refused."""
    configure_scenario(proxy_ip, proxy_port, fail=True,
                       error=RS_E_TARGET_CONN_REFUSED)


def scenario_auth_fail(proxy_ip, proxy_port):
    """Configure proxy to fail with authentication error."""
    configure_scenario(proxy_ip, proxy_port, fail=True,
                       error=RS_E_PROXY_AUTH_FAILED)


def scenario_success(proxy_ip, proxy_port, exit_ip='192.0.2.1'):
    """Configure proxy to succeed with specific exit IP."""
    configure_scenario(proxy_ip, proxy_port, response=exit_ip.encode('utf-8'))