pytest-based test suite with fixtures for database testing. Covers misc.py utilities, dbs.py operations, and fetch.py validation. Includes mock_network.py for future network testing.
245 lines
7.8 KiB
Python
245 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Network mocking utilities for PPF offline testing.
|
|
|
|
Provides mock implementations of rocksock and connection pool for testing
|
|
proxy validation logic without actual network calls.
|
|
"""
|
|
from __future__ import print_function
|
|
|
|
import sys
|
|
import os
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
# Import rocksock error codes for mock exceptions
|
|
try:
|
|
import rocksock
|
|
RS_E_TARGET_CONN_REFUSED = rocksock.RS_E_TARGET_CONN_REFUSED
|
|
RS_E_HIT_TIMEOUT = rocksock.RS_E_HIT_TIMEOUT
|
|
RS_E_PROXY_AUTH_FAILED = rocksock.RS_E_PROXY_AUTH_FAILED
|
|
RS_E_REMOTE_DISCONNECTED = rocksock.RS_E_REMOTE_DISCONNECTED
|
|
RS_E_PROXY_UNEXPECTED_RESPONSE = rocksock.RS_E_PROXY_UNEXPECTED_RESPONSE
|
|
RS_ET_OWN = rocksock.RS_ET_OWN
|
|
RS_ET_GAI = rocksock.RS_ET_GAI
|
|
RS_ET_SSL = rocksock.RS_ET_SSL
|
|
except ImportError:
|
|
# Define error codes if rocksock not available
|
|
RS_E_TARGET_CONN_REFUSED = 20
|
|
RS_E_HIT_TIMEOUT = 5
|
|
RS_E_PROXY_AUTH_FAILED = 7
|
|
RS_E_REMOTE_DISCONNECTED = 24
|
|
RS_E_PROXY_UNEXPECTED_RESPONSE = 11
|
|
RS_ET_OWN = 1
|
|
RS_ET_GAI = 2
|
|
RS_ET_SSL = 3
|
|
|
|
|
|
class MockRocksockException(Exception):
|
|
"""Mock RocksockException for testing error categorization."""
|
|
|
|
def __init__(self, error=0, errortype=RS_ET_OWN, message=''):
|
|
self._error = error
|
|
self._errortype = errortype
|
|
self._message = message
|
|
super(MockRocksockException, self).__init__(message)
|
|
|
|
def get_error(self):
|
|
return self._error
|
|
|
|
def get_errortype(self):
|
|
return self._errortype
|
|
|
|
def get_errormessage(self):
|
|
return self._message
|
|
|
|
|
|
# Configurable test scenarios
|
|
# Maps (proxy_ip, proxy_port) -> test behavior
|
|
MOCK_SCENARIOS = {}
|
|
|
|
# Default response for unknown proxies
|
|
DEFAULT_RESPONSE = b'192.0.2.1' # TEST-NET-1 IP for testing
|
|
|
|
|
|
class MockRocksock:
|
|
"""Mock rocksock.Rocksock for offline testing.
|
|
|
|
Simulates network connections based on configured scenarios.
|
|
Can be configured to succeed, fail, or return specific responses.
|
|
"""
|
|
|
|
def __init__(self, host=None, port=None, proxies=None, ssl=False,
|
|
timeout=None, **kwargs):
|
|
self.host = host
|
|
self.port = port
|
|
self.proxies = proxies or []
|
|
self.ssl = ssl
|
|
self.timeout = timeout
|
|
self.connected = False
|
|
self._response = DEFAULT_RESPONSE
|
|
self._scenario = None
|
|
|
|
# Determine scenario based on target proxy (second in chain)
|
|
if len(self.proxies) >= 2:
|
|
target_proxy = self.proxies[1]
|
|
if hasattr(target_proxy, 'hostinfo'):
|
|
key = (target_proxy.hostinfo.host, target_proxy.hostinfo.port)
|
|
self._scenario = MOCK_SCENARIOS.get(key)
|
|
|
|
def connect(self):
|
|
"""Simulate connection based on scenario."""
|
|
if self._scenario:
|
|
if self._scenario.get('fail'):
|
|
error = self._scenario.get('error', RS_E_TARGET_CONN_REFUSED)
|
|
errortype = self._scenario.get('errortype', RS_ET_OWN)
|
|
raise MockRocksockException(error, errortype, 'Mock connection failed')
|
|
if self._scenario.get('response'):
|
|
self._response = self._scenario['response']
|
|
|
|
self.connected = True
|
|
return True
|
|
|
|
def send(self, data):
|
|
"""Simulate sending data."""
|
|
if not self.connected:
|
|
raise MockRocksockException(RS_E_REMOTE_DISCONNECTED, RS_ET_OWN, 'Not connected')
|
|
return len(data)
|
|
|
|
def recv(self, n=-1):
|
|
"""Return configured response."""
|
|
if not self.connected:
|
|
raise MockRocksockException(RS_E_REMOTE_DISCONNECTED, RS_ET_OWN, 'Not connected')
|
|
return self._response
|
|
|
|
def recvline(self):
|
|
"""Return response as line."""
|
|
return self.recv() + b'\n'
|
|
|
|
def disconnect(self):
|
|
"""Close mock connection."""
|
|
self.connected = False
|
|
|
|
|
|
class MockTorHostState:
|
|
"""Mock TorHostState for testing."""
|
|
|
|
def __init__(self, host='127.0.0.1:9050'):
|
|
self.host = host
|
|
self.success_count = 0
|
|
self.failure_count = 0
|
|
self.latency_sum = 0.0
|
|
self.available = True
|
|
self.backoff_until = 0
|
|
|
|
def record_success(self, latency=0):
|
|
self.success_count += 1
|
|
self.latency_sum += latency
|
|
|
|
def record_failure(self):
|
|
self.failure_count += 1
|
|
|
|
def is_available(self):
|
|
return self.available
|
|
|
|
def get_latency(self):
|
|
if self.success_count == 0:
|
|
return 0
|
|
return self.latency_sum / self.success_count
|
|
|
|
|
|
class MockTorConnectionPool:
|
|
"""Mock TorConnectionPool for testing.
|
|
|
|
Provides a simplified pool that returns configured Tor hosts
|
|
without making actual network connections.
|
|
"""
|
|
|
|
def __init__(self, tor_hosts=None, warmup=True):
|
|
self.tor_hosts = tor_hosts or ['127.0.0.1:9050']
|
|
self.warmup_complete = True
|
|
self.host_states = {h: MockTorHostState(h) for h in self.tor_hosts}
|
|
self.successes = []
|
|
self.failures = []
|
|
|
|
def get_tor_host(self, worker_id=None):
|
|
"""Return first available Tor host."""
|
|
for host in self.tor_hosts:
|
|
state = self.host_states.get(host)
|
|
if state and state.is_available():
|
|
return host
|
|
return None
|
|
|
|
def record_success(self, host, latency=0):
|
|
"""Record successful connection."""
|
|
self.successes.append((host, latency))
|
|
if host in self.host_states:
|
|
self.host_states[host].record_success(latency)
|
|
|
|
def record_failure(self, host):
|
|
"""Record failed connection."""
|
|
self.failures.append(host)
|
|
if host in self.host_states:
|
|
self.host_states[host].record_failure()
|
|
|
|
def get_stats(self):
|
|
"""Return pool statistics."""
|
|
return {
|
|
'available_hosts': sum(1 for h in self.host_states.values() if h.is_available()),
|
|
'total_hosts': len(self.tor_hosts),
|
|
'total_successes': sum(h.success_count for h in self.host_states.values()),
|
|
'total_failures': sum(h.failure_count for h in self.host_states.values()),
|
|
}
|
|
|
|
|
|
def configure_scenario(proxy_ip, proxy_port, **kwargs):
|
|
"""Configure test scenario for a specific proxy.
|
|
|
|
Args:
|
|
proxy_ip: Proxy IP address
|
|
proxy_port: Proxy port number
|
|
**kwargs: Scenario configuration:
|
|
- fail: If True, connection will fail
|
|
- error: Error code to raise (default: RS_E_TARGET_CONN_REFUSED)
|
|
- errortype: Error type (default: RS_ET_OWN)
|
|
- response: Bytes to return from recv()
|
|
|
|
Example:
|
|
# Configure proxy to fail with timeout
|
|
configure_scenario('1.2.3.4', 8080, fail=True, error=RS_E_HIT_TIMEOUT)
|
|
|
|
# Configure proxy to succeed with specific exit IP
|
|
configure_scenario('5.6.7.8', 3128, response=b'203.0.113.50')
|
|
"""
|
|
MOCK_SCENARIOS[(proxy_ip, proxy_port)] = kwargs
|
|
|
|
|
|
def clear_scenarios():
|
|
"""Clear all configured test scenarios."""
|
|
MOCK_SCENARIOS.clear()
|
|
|
|
|
|
def reset_mock_state():
|
|
"""Reset all mock state for clean test runs."""
|
|
clear_scenarios()
|
|
|
|
|
|
# Convenience functions for common scenarios
|
|
def scenario_timeout(proxy_ip, proxy_port):
|
|
"""Configure proxy to fail with timeout."""
|
|
configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_HIT_TIMEOUT)
|
|
|
|
|
|
def scenario_refused(proxy_ip, proxy_port):
|
|
"""Configure proxy to fail with connection refused."""
|
|
configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_TARGET_CONN_REFUSED)
|
|
|
|
|
|
def scenario_auth_fail(proxy_ip, proxy_port):
|
|
"""Configure proxy to fail with authentication error."""
|
|
configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_PROXY_AUTH_FAILED)
|
|
|
|
|
|
def scenario_success(proxy_ip, proxy_port, exit_ip='192.0.2.1'):
|
|
"""Configure proxy to succeed with specific exit IP."""
|
|
configure_scenario(proxy_ip, proxy_port, response=exit_ip.encode('utf-8'))
|