diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..6bbecf1 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,2 @@ +# -*- coding: utf-8 -*- +"""PPF unit tests.""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..9847e10 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,129 @@ +# -*- coding: utf-8 -*- +"""Shared pytest fixtures for PPF tests.""" +from __future__ import print_function + +import os +import sys +import tempfile +import shutil + +import pytest + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import mysqlite +import dbs +import misc + + +@pytest.fixture(autouse=True) +def mock_logging(monkeypatch): + """Mock _log to avoid Python 2 print syntax errors in Python 3. + + Must patch in both misc and dbs modules since dbs uses 'from misc import _log'. + """ + def noop_log(msg, level='info'): + pass + monkeypatch.setattr(misc, '_log', noop_log) + monkeypatch.setattr(dbs, '_log', noop_log) + + +@pytest.fixture +def temp_db(): + """Create a temporary SQLite database for testing. + + Yields a tuple of (sqlite_wrapper, db_path). + Database and file are cleaned up after test. + """ + fd, db_path = tempfile.mkstemp(suffix='.sqlite') + os.close(fd) + + sqlite = mysqlite.mysqlite(db_path) + + yield sqlite, db_path + + # Cleanup + sqlite.close() + if os.path.exists(db_path): + os.unlink(db_path) + + +@pytest.fixture +def proxy_db(temp_db): + """Create a temporary database with proxylist table initialized. + + Yields a tuple of (sqlite_wrapper, db_path). + """ + sqlite, db_path = temp_db + dbs.create_table_if_not_exists(sqlite, 'proxylist') + yield sqlite, db_path + + +@pytest.fixture +def uri_db(temp_db): + """Create a temporary database with uris table initialized. + + Yields a tuple of (sqlite_wrapper, db_path). 
+ """ + sqlite, db_path = temp_db + dbs.create_table_if_not_exists(sqlite, 'uris') + yield sqlite, db_path + + +@pytest.fixture +def full_db(temp_db): + """Create a temporary database with both proxylist and uris tables. + + Yields a tuple of (sqlite_wrapper, db_path). + """ + sqlite, db_path = temp_db + dbs.create_table_if_not_exists(sqlite, 'proxylist') + dbs.create_table_if_not_exists(sqlite, 'uris') + yield sqlite, db_path + + +@pytest.fixture +def temp_dir(): + """Create a temporary directory for testing. + + Yields the directory path. Cleaned up after test. + """ + dirpath = tempfile.mkdtemp() + yield dirpath + shutil.rmtree(dirpath, ignore_errors=True) + + +@pytest.fixture +def sample_proxies(): + """Return a list of sample proxy strings for testing.""" + return [ + '1.2.3.4:8080', + '5.6.7.8:3128', + '9.10.11.12:1080', + ] + + +@pytest.fixture +def sample_private_ips(): + """Return a list of private/reserved IP proxies that should be rejected.""" + return [ + '10.0.0.1:8080', # Private class A + '172.16.0.1:8080', # Private class B + '192.168.1.1:8080', # Private class C + '127.0.0.1:8080', # Loopback + '169.254.1.1:8080', # Link-local + '224.0.0.1:8080', # Multicast + '100.64.0.1:8080', # CGNAT + ] + + +@pytest.fixture +def sample_cdn_ips(): + """Return a list of CDN IP proxies that should be filtered.""" + return [ + '141.101.1.1:8080', # Cloudflare + '151.101.1.1:8080', # Fastly + '23.32.1.1:8080', # Akamai + '13.32.1.1:8080', # AWS CloudFront + ] diff --git a/tests/fixtures/sample_html.txt b/tests/fixtures/sample_html.txt new file mode 100644 index 0000000..d7e2533 --- /dev/null +++ b/tests/fixtures/sample_html.txt @@ -0,0 +1,31 @@ + + +

Free Proxy List

+ + + + + + + + + + + + + + + + + + + + + + + + + +
IP AddressPortTypeCountry
1.2.3.48080SOCKS5US
5.6.7.83128HTTPDE
9.10.11.121080SOCKS4FR
+ + diff --git a/tests/fixtures/sample_json.txt b/tests/fixtures/sample_json.txt new file mode 100644 index 0000000..4da6b8b --- /dev/null +++ b/tests/fixtures/sample_json.txt @@ -0,0 +1,5 @@ +[ + {"ip": "1.2.3.4", "port": 8080, "type": "socks5"}, + {"ip": "5.6.7.8", "port": 3128, "type": "http"}, + {"host": "9.10.11.12", "port": 1080, "protocol": "socks4"} +] diff --git a/tests/fixtures/sample_mixed.txt b/tests/fixtures/sample_mixed.txt new file mode 100644 index 0000000..7ba7d8f --- /dev/null +++ b/tests/fixtures/sample_mixed.txt @@ -0,0 +1,24 @@ +Free Proxy List - Updated Daily + +Plain proxies: +1.2.3.4:8080 +5.6.7.8:3128 +9.10.11.12:1080 + +With protocol hints: +socks5 11.22.33.44:1080 +http: 55.66.77.88:8080 +socks4 - 99.100.101.102:1080 + +Auth proxies: +user:pass@111.112.113.114:8080 +socks5://admin:secret@115.116.117.118:1080 + +JSON embedded: +{"data": [{"ip": "121.122.123.124", "port": 8080}]} + +Table format: + + + +
IPPort
131.132.133.1343128
diff --git a/tests/mock_network.py b/tests/mock_network.py new file mode 100644 index 0000000..7f3d7bd --- /dev/null +++ b/tests/mock_network.py @@ -0,0 +1,244 @@ +# -*- coding: utf-8 -*- +"""Network mocking utilities for PPF offline testing. + +Provides mock implementations of rocksock and connection pool for testing +proxy validation logic without actual network calls. +""" +from __future__ import print_function + +import sys +import os + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +# Import rocksock error codes for mock exceptions +try: + import rocksock + RS_E_TARGET_CONN_REFUSED = rocksock.RS_E_TARGET_CONN_REFUSED + RS_E_HIT_TIMEOUT = rocksock.RS_E_HIT_TIMEOUT + RS_E_PROXY_AUTH_FAILED = rocksock.RS_E_PROXY_AUTH_FAILED + RS_E_REMOTE_DISCONNECTED = rocksock.RS_E_REMOTE_DISCONNECTED + RS_E_PROXY_UNEXPECTED_RESPONSE = rocksock.RS_E_PROXY_UNEXPECTED_RESPONSE + RS_ET_OWN = rocksock.RS_ET_OWN + RS_ET_GAI = rocksock.RS_ET_GAI + RS_ET_SSL = rocksock.RS_ET_SSL +except ImportError: + # Define error codes if rocksock not available + RS_E_TARGET_CONN_REFUSED = 20 + RS_E_HIT_TIMEOUT = 5 + RS_E_PROXY_AUTH_FAILED = 7 + RS_E_REMOTE_DISCONNECTED = 24 + RS_E_PROXY_UNEXPECTED_RESPONSE = 11 + RS_ET_OWN = 1 + RS_ET_GAI = 2 + RS_ET_SSL = 3 + + +class MockRocksockException(Exception): + """Mock RocksockException for testing error categorization.""" + + def __init__(self, error=0, errortype=RS_ET_OWN, message=''): + self._error = error + self._errortype = errortype + self._message = message + super(MockRocksockException, self).__init__(message) + + def get_error(self): + return self._error + + def get_errortype(self): + return self._errortype + + def get_errormessage(self): + return self._message + + +# Configurable test scenarios +# Maps (proxy_ip, proxy_port) -> test behavior +MOCK_SCENARIOS = {} + +# Default response for unknown proxies +DEFAULT_RESPONSE = b'192.0.2.1' # TEST-NET-1 IP for testing + + +class MockRocksock: + """Mock 
rocksock.Rocksock for offline testing. + + Simulates network connections based on configured scenarios. + Can be configured to succeed, fail, or return specific responses. + """ + + def __init__(self, host=None, port=None, proxies=None, ssl=False, + timeout=None, **kwargs): + self.host = host + self.port = port + self.proxies = proxies or [] + self.ssl = ssl + self.timeout = timeout + self.connected = False + self._response = DEFAULT_RESPONSE + self._scenario = None + + # Determine scenario based on target proxy (second in chain) + if len(self.proxies) >= 2: + target_proxy = self.proxies[1] + if hasattr(target_proxy, 'hostinfo'): + key = (target_proxy.hostinfo.host, target_proxy.hostinfo.port) + self._scenario = MOCK_SCENARIOS.get(key) + + def connect(self): + """Simulate connection based on scenario.""" + if self._scenario: + if self._scenario.get('fail'): + error = self._scenario.get('error', RS_E_TARGET_CONN_REFUSED) + errortype = self._scenario.get('errortype', RS_ET_OWN) + raise MockRocksockException(error, errortype, 'Mock connection failed') + if self._scenario.get('response'): + self._response = self._scenario['response'] + + self.connected = True + return True + + def send(self, data): + """Simulate sending data.""" + if not self.connected: + raise MockRocksockException(RS_E_REMOTE_DISCONNECTED, RS_ET_OWN, 'Not connected') + return len(data) + + def recv(self, n=-1): + """Return configured response.""" + if not self.connected: + raise MockRocksockException(RS_E_REMOTE_DISCONNECTED, RS_ET_OWN, 'Not connected') + return self._response + + def recvline(self): + """Return response as line.""" + return self.recv() + b'\n' + + def disconnect(self): + """Close mock connection.""" + self.connected = False + + +class MockTorHostState: + """Mock TorHostState for testing.""" + + def __init__(self, host='127.0.0.1:9050'): + self.host = host + self.success_count = 0 + self.failure_count = 0 + self.latency_sum = 0.0 + self.available = True + self.backoff_until = 0 + + 
def record_success(self, latency=0): + self.success_count += 1 + self.latency_sum += latency + + def record_failure(self): + self.failure_count += 1 + + def is_available(self): + return self.available + + def get_latency(self): + if self.success_count == 0: + return 0 + return self.latency_sum / self.success_count + + +class MockTorConnectionPool: + """Mock TorConnectionPool for testing. + + Provides a simplified pool that returns configured Tor hosts + without making actual network connections. + """ + + def __init__(self, tor_hosts=None, warmup=True): + self.tor_hosts = tor_hosts or ['127.0.0.1:9050'] + self.warmup_complete = True + self.host_states = {h: MockTorHostState(h) for h in self.tor_hosts} + self.successes = [] + self.failures = [] + + def get_tor_host(self, worker_id=None): + """Return first available Tor host.""" + for host in self.tor_hosts: + state = self.host_states.get(host) + if state and state.is_available(): + return host + return None + + def record_success(self, host, latency=0): + """Record successful connection.""" + self.successes.append((host, latency)) + if host in self.host_states: + self.host_states[host].record_success(latency) + + def record_failure(self, host): + """Record failed connection.""" + self.failures.append(host) + if host in self.host_states: + self.host_states[host].record_failure() + + def get_stats(self): + """Return pool statistics.""" + return { + 'available_hosts': sum(1 for h in self.host_states.values() if h.is_available()), + 'total_hosts': len(self.tor_hosts), + 'total_successes': sum(h.success_count for h in self.host_states.values()), + 'total_failures': sum(h.failure_count for h in self.host_states.values()), + } + + +def configure_scenario(proxy_ip, proxy_port, **kwargs): + """Configure test scenario for a specific proxy. 
+ + Args: + proxy_ip: Proxy IP address + proxy_port: Proxy port number + **kwargs: Scenario configuration: + - fail: If True, connection will fail + - error: Error code to raise (default: RS_E_TARGET_CONN_REFUSED) + - errortype: Error type (default: RS_ET_OWN) + - response: Bytes to return from recv() + + Example: + # Configure proxy to fail with timeout + configure_scenario('1.2.3.4', 8080, fail=True, error=RS_E_HIT_TIMEOUT) + + # Configure proxy to succeed with specific exit IP + configure_scenario('5.6.7.8', 3128, response=b'203.0.113.50') + """ + MOCK_SCENARIOS[(proxy_ip, proxy_port)] = kwargs + + +def clear_scenarios(): + """Clear all configured test scenarios.""" + MOCK_SCENARIOS.clear() + + +def reset_mock_state(): + """Reset all mock state for clean test runs.""" + clear_scenarios() + + +# Convenience functions for common scenarios +def scenario_timeout(proxy_ip, proxy_port): + """Configure proxy to fail with timeout.""" + configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_HIT_TIMEOUT) + + +def scenario_refused(proxy_ip, proxy_port): + """Configure proxy to fail with connection refused.""" + configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_TARGET_CONN_REFUSED) + + +def scenario_auth_fail(proxy_ip, proxy_port): + """Configure proxy to fail with authentication error.""" + configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_PROXY_AUTH_FAILED) + + +def scenario_success(proxy_ip, proxy_port, exit_ip='192.0.2.1'): + """Configure proxy to succeed with specific exit IP.""" + configure_scenario(proxy_ip, proxy_port, response=exit_ip.encode('utf-8')) diff --git a/tests/test_dbs.py b/tests/test_dbs.py new file mode 100644 index 0000000..2cadb83 --- /dev/null +++ b/tests/test_dbs.py @@ -0,0 +1,427 @@ +# -*- coding: utf-8 -*- +"""Tests for dbs.py database operations.""" +from __future__ import print_function + +import sys +import os + +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + 
+import dbs + + +class TestIsCdnIp: + """Tests for is_cdn_ip() function.""" + + def test_cloudflare_ips(self): + """Cloudflare IPs are detected as CDN.""" + assert dbs.is_cdn_ip('141.101.1.1') is True + assert dbs.is_cdn_ip('141.101.255.255') is True + assert dbs.is_cdn_ip('104.16.1.1') is True + assert dbs.is_cdn_ip('172.64.1.1') is True + + def test_fastly_ips(self): + """Fastly IPs are detected as CDN.""" + assert dbs.is_cdn_ip('151.101.1.1') is True + assert dbs.is_cdn_ip('151.101.128.1') is True + + def test_akamai_ips(self): + """Akamai IPs are detected as CDN.""" + assert dbs.is_cdn_ip('23.32.1.1') is True + assert dbs.is_cdn_ip('23.64.1.1') is True + + def test_cloudfront_ips(self): + """Amazon CloudFront IPs are detected as CDN.""" + assert dbs.is_cdn_ip('13.32.1.1') is True + assert dbs.is_cdn_ip('13.224.1.1') is True + + def test_google_ips(self): + """Google IPs are detected as CDN.""" + assert dbs.is_cdn_ip('34.64.1.1') is True + assert dbs.is_cdn_ip('34.71.1.1') is True + + def test_regular_ips_not_cdn(self): + """Regular public IPs are not CDN.""" + assert dbs.is_cdn_ip('1.2.3.4') is False + assert dbs.is_cdn_ip('8.8.8.8') is False + assert dbs.is_cdn_ip('203.0.113.50') is False + + def test_edge_case_prefix_mismatch(self): + """Similar but non-CDN prefixes are not detected.""" + assert dbs.is_cdn_ip('141.100.1.1') is False # Not 141.101. + assert dbs.is_cdn_ip('104.15.1.1') is False # Not 104.16. 
+ + +class TestComputeProxyListHash: + """Tests for compute_proxy_list_hash() function.""" + + def test_empty_list_returns_none(self): + """Empty list returns None.""" + assert dbs.compute_proxy_list_hash([]) is None + assert dbs.compute_proxy_list_hash(None) is None + + def test_single_proxy_hash(self): + """Single proxy produces consistent hash.""" + hash1 = dbs.compute_proxy_list_hash(['1.2.3.4:8080']) + hash2 = dbs.compute_proxy_list_hash(['1.2.3.4:8080']) + assert hash1 == hash2 + assert len(hash1) == 32 # MD5 hex length + + def test_order_independent(self): + """Hash is order-independent (sorted internally).""" + hash1 = dbs.compute_proxy_list_hash(['1.2.3.4:8080', '5.6.7.8:3128']) + hash2 = dbs.compute_proxy_list_hash(['5.6.7.8:3128', '1.2.3.4:8080']) + assert hash1 == hash2 + + def test_different_lists_different_hash(self): + """Different proxy lists produce different hashes.""" + hash1 = dbs.compute_proxy_list_hash(['1.2.3.4:8080']) + hash2 = dbs.compute_proxy_list_hash(['5.6.7.8:3128']) + assert hash1 != hash2 + + def test_tuple_format(self): + """Handles tuple format (address, proto).""" + hash1 = dbs.compute_proxy_list_hash([('1.2.3.4:8080', 'socks5')]) + hash2 = dbs.compute_proxy_list_hash(['1.2.3.4:8080']) + # Should extract address from tuple + assert hash1 == hash2 + + +class TestCreateTableIfNotExists: + """Tests for create_table_if_not_exists() function.""" + + def test_create_proxylist_table(self, temp_db): + """Creates proxylist table with correct schema.""" + sqlite, _ = temp_db + dbs.create_table_if_not_exists(sqlite, 'proxylist') + + # Verify table exists by querying it + result = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone() + assert result[0] == 0 + + def test_create_uris_table(self, temp_db): + """Creates uris table with correct schema.""" + sqlite, _ = temp_db + dbs.create_table_if_not_exists(sqlite, 'uris') + + # Verify table exists + result = sqlite.execute('SELECT COUNT(*) FROM uris').fetchone() + assert result[0] == 0 + 
+ def test_idempotent_creation(self, temp_db): + """Calling twice doesn't cause error.""" + sqlite, _ = temp_db + dbs.create_table_if_not_exists(sqlite, 'proxylist') + dbs.create_table_if_not_exists(sqlite, 'proxylist') + # No exception means success + + def test_proxylist_has_required_columns(self, proxy_db): + """Proxylist table has all required columns.""" + sqlite, _ = proxy_db + # Insert a row to test columns + sqlite.execute( + 'INSERT INTO proxylist (added, proxy, ip, port, failed) VALUES (?, ?, ?, ?, ?)', + (1234567890, '1.2.3.4:8080', '1.2.3.4', '8080', 0) + ) + sqlite.commit() + + # Verify we can query various columns + row = sqlite.execute( + 'SELECT proxy, ip, port, proto, failed, tested, avg_latency, anonymity ' + 'FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row is not None + assert row[0] == '1.2.3.4:8080' + + +class TestInsertProxies: + """Tests for insert_proxies() function.""" + + def test_insert_plain_strings(self, proxy_db): + """Insert plain proxy strings.""" + sqlite, _ = proxy_db + proxies = ['1.2.3.4:8080', '5.6.7.8:3128'] + dbs.insert_proxies(sqlite, proxies, 'http://test.com') + + count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] + assert count == 2 + + def test_insert_tuples_with_proto(self, proxy_db): + """Insert tuples with protocol.""" + sqlite, _ = proxy_db + proxies = [('1.2.3.4:8080', 'socks5'), ('5.6.7.8:3128', 'http')] + dbs.insert_proxies(sqlite, proxies, 'http://test.com') + + row = sqlite.execute( + 'SELECT proto FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row[0] == 'socks5' + + def test_insert_tuples_with_confidence(self, proxy_db): + """Insert tuples with confidence score.""" + sqlite, _ = proxy_db + proxies = [('1.2.3.4:8080', 'socks5', 85)] + dbs.insert_proxies(sqlite, proxies, 'http://test.com') + + row = sqlite.execute( + 'SELECT confidence FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row[0] == 85 + + def 
test_filters_cdn_ips(self, proxy_db): + """CDN IPs are filtered out.""" + sqlite, _ = proxy_db + proxies = [ + '1.2.3.4:8080', # Regular - should be inserted + '141.101.1.1:8080', # Cloudflare CDN - should be filtered + ] + dbs.insert_proxies(sqlite, proxies, 'http://test.com') + + count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] + assert count == 1 # Only non-CDN proxy inserted + + def test_empty_list_no_error(self, proxy_db): + """Empty list doesn't cause error.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, [], 'http://test.com') + # No exception means success + + def test_duplicate_ignored(self, proxy_db): + """Duplicate proxies are ignored (INSERT OR IGNORE).""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test1.com') + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test2.com') + + count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] + assert count == 1 + + +class TestInsertUrls: + """Tests for insert_urls() function.""" + + def test_insert_new_urls(self, uri_db): + """Insert new URLs returns count of inserted.""" + sqlite, _ = uri_db + urls = ['http://example.com/1', 'http://example.com/2'] + count = dbs.insert_urls(urls, 'test query', sqlite) + assert count == 2 + + def test_duplicate_urls_not_counted(self, uri_db): + """Duplicate URLs not counted in return value.""" + sqlite, _ = uri_db + urls = ['http://example.com/1'] + count1 = dbs.insert_urls(urls, 'test query', sqlite) + count2 = dbs.insert_urls(urls, 'test query', sqlite) + + assert count1 == 1 + assert count2 == 0 + + def test_mixed_new_and_duplicate(self, uri_db): + """Mixed new and duplicate URLs counted correctly.""" + sqlite, _ = uri_db + dbs.insert_urls(['http://example.com/1'], 'test', sqlite) + count = dbs.insert_urls( + ['http://example.com/1', 'http://example.com/2', 'http://example.com/3'], + 'test', sqlite + ) + assert count == 2 # Only 2 new URLs + + def test_empty_list_returns_zero(self, uri_db): + 
"""Empty list returns 0.""" + sqlite, _ = uri_db + count = dbs.insert_urls([], 'test', sqlite) + assert count == 0 + + +class TestUpdateProxyLatency: + """Tests for update_proxy_latency() function.""" + + def test_first_latency_sample(self, proxy_db): + """First latency sample sets avg_latency directly.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com') + dbs.update_proxy_latency(sqlite, '1.2.3.4:8080', 100.0) + sqlite.commit() + + row = sqlite.execute( + 'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row[0] == 100.0 + assert row[1] == 1 + + def test_ema_calculation(self, proxy_db): + """Exponential moving average is calculated correctly.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com') + + # First sample: 100ms + dbs.update_proxy_latency(sqlite, '1.2.3.4:8080', 100.0) + sqlite.commit() + + # Second sample: 50ms + # EMA: alpha = 2/(2+1) = 0.667, new_avg = 0.667*50 + 0.333*100 = 66.67 + dbs.update_proxy_latency(sqlite, '1.2.3.4:8080', 50.0) + sqlite.commit() + + row = sqlite.execute( + 'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row[1] == 2 + # Check EMA is roughly correct (allow for floating point) + assert 65 < row[0] < 68 + + def test_nonexistent_proxy_no_error(self, proxy_db): + """Updating nonexistent proxy doesn't cause error.""" + sqlite, _ = proxy_db + dbs.update_proxy_latency(sqlite, 'nonexistent:8080', 100.0) + # No exception means success + + +class TestBatchUpdateProxyLatency: + """Tests for batch_update_proxy_latency() function.""" + + def test_batch_update_multiple(self, proxy_db): + """Batch update updates multiple proxies.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080', '5.6.7.8:3128'], 'http://test.com') + + updates = [('1.2.3.4:8080', 100.0), ('5.6.7.8:3128', 200.0)] + dbs.batch_update_proxy_latency(sqlite, 
updates) + sqlite.commit() + + row1 = sqlite.execute( + 'SELECT avg_latency FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + row2 = sqlite.execute( + 'SELECT avg_latency FROM proxylist WHERE proxy = ?', + ('5.6.7.8:3128',) + ).fetchone() + + assert row1[0] == 100.0 + assert row2[0] == 200.0 + + def test_empty_list_no_error(self, proxy_db): + """Empty update list doesn't cause error.""" + sqlite, _ = proxy_db + dbs.batch_update_proxy_latency(sqlite, []) + # No exception means success + + +class TestUpdateProxyAnonymity: + """Tests for update_proxy_anonymity() function.""" + + def test_transparent_proxy(self, proxy_db): + """Transparent proxy detected when exit_ip equals proxy_ip.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com') + + dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '1.2.3.4', '1.2.3.4') + sqlite.commit() + + row = sqlite.execute( + 'SELECT anonymity, exit_ip FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row[0] == 'transparent' + assert row[1] == '1.2.3.4' + + def test_elite_proxy(self, proxy_db): + """Elite proxy detected when exit_ip differs and no revealing headers.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com') + + dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '5.6.7.8', '1.2.3.4', + reveals_headers=False) + sqlite.commit() + + row = sqlite.execute( + 'SELECT anonymity FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row[0] == 'elite' + + def test_anonymous_proxy(self, proxy_db): + """Anonymous proxy detected when exit_ip differs but reveals headers.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com') + + dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '5.6.7.8', '1.2.3.4', + reveals_headers=True) + sqlite.commit() + + row = sqlite.execute( + 'SELECT anonymity FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + 
assert row[0] == 'anonymous' + + def test_normalizes_leading_zeros(self, proxy_db): + """IP addresses with leading zeros are normalized.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com') + + # Same IP with leading zeros should be detected as transparent + dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '001.002.003.004', '1.2.3.4') + sqlite.commit() + + row = sqlite.execute( + 'SELECT anonymity FROM proxylist WHERE proxy = ?', + ('1.2.3.4:8080',) + ).fetchone() + assert row[0] == 'transparent' + + +class TestGetDatabaseStats: + """Tests for get_database_stats() function.""" + + def test_empty_database_stats(self, full_db): + """Empty database returns zero counts.""" + sqlite, _ = full_db + stats = dbs.get_database_stats(sqlite) + + assert stats['proxy_count'] == 0 + assert stats['working_count'] == 0 + assert 'page_count' in stats + assert 'total_size' in stats + + def test_stats_after_inserts(self, full_db): + """Stats reflect inserted proxies.""" + sqlite, _ = full_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080', '5.6.7.8:3128'], 'http://test.com') + + stats = dbs.get_database_stats(sqlite) + assert stats['proxy_count'] == 2 + + +class TestAnalyzeVacuum: + """Tests for analyze_database() and vacuum_database() functions.""" + + def test_analyze_no_error(self, proxy_db): + """analyze_database() runs without error.""" + sqlite, _ = proxy_db + dbs.analyze_database(sqlite) + # No exception means success + + def test_vacuum_no_error(self, proxy_db): + """vacuum_database() runs without error.""" + sqlite, _ = proxy_db + dbs.vacuum_database(sqlite) + # No exception means success + + def test_analyze_vacuum_sequence(self, proxy_db): + """Running analyze then vacuum works.""" + sqlite, _ = proxy_db + dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com') + dbs.analyze_database(sqlite) + dbs.vacuum_database(sqlite) + # Database still valid + count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0] + assert 
count == 1 diff --git a/tests/test_fetch.py b/tests/test_fetch.py new file mode 100644 index 0000000..7f1b16b --- /dev/null +++ b/tests/test_fetch.py @@ -0,0 +1,380 @@ +# -*- coding: utf-8 -*- +"""Tests for fetch.py proxy validation and extraction functions.""" +from __future__ import print_function + +import sys +import os + +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +# fetch.py has Python 2 dependencies - skip tests if import fails +try: + import fetch + FETCH_AVAILABLE = True +except ImportError as e: + FETCH_AVAILABLE = False + fetch = None + +pytestmark = pytest.mark.skipif(not FETCH_AVAILABLE, reason="fetch module requires Python 2") + + +class TestValidPort: + """Tests for valid_port() function.""" + + def test_port_zero_invalid(self): + """Port 0 is invalid.""" + assert fetch.valid_port(0) is False + + def test_port_one_valid(self): + """Port 1 is valid.""" + assert fetch.valid_port(1) is True + + def test_port_max_valid(self): + """Port 65535 is valid.""" + assert fetch.valid_port(65535) is True + + def test_port_over_max_invalid(self): + """Port 65536 is invalid.""" + assert fetch.valid_port(65536) is False + + def test_negative_port_invalid(self): + """Negative port is invalid.""" + assert fetch.valid_port(-1) is False + + def test_common_ports_valid(self): + """Common proxy ports are valid.""" + assert fetch.valid_port(80) is True + assert fetch.valid_port(443) is True + assert fetch.valid_port(1080) is True + assert fetch.valid_port(3128) is True + assert fetch.valid_port(8080) is True + assert fetch.valid_port(9050) is True + + +class TestIsValidIpv6: + """Tests for is_valid_ipv6() function.""" + + def test_valid_global_unicast(self): + """Valid global unicast addresses pass.""" + assert fetch.is_valid_ipv6('2001:db8::1') is True + assert fetch.is_valid_ipv6('2001:0db8:0000:0000:0000:ff00:0042:8329') is True + + def test_valid_full_address(self): + """Full 8-segment address is valid.""" + assert 
fetch.is_valid_ipv6('2001:0db8:85a3:0000:0000:8a2e:0370:7334') is True + + def test_reject_loopback(self): + """Loopback ::1 is rejected.""" + assert fetch.is_valid_ipv6('::1') is False + assert fetch.is_valid_ipv6('0:0:0:0:0:0:0:1') is False + + def test_reject_unspecified(self): + """Unspecified :: is rejected.""" + assert fetch.is_valid_ipv6('::') is False + assert fetch.is_valid_ipv6('0:0:0:0:0:0:0:0') is False + + def test_reject_link_local(self): + """Link-local fe80::/10 is rejected.""" + assert fetch.is_valid_ipv6('fe80::1') is False + assert fetch.is_valid_ipv6('fe90::1') is False + assert fetch.is_valid_ipv6('fea0::1') is False + assert fetch.is_valid_ipv6('feb0::1') is False + + def test_reject_unique_local(self): + """Unique local fc00::/7 is rejected.""" + assert fetch.is_valid_ipv6('fc00::1') is False + assert fetch.is_valid_ipv6('fd00::1') is False + assert fetch.is_valid_ipv6('fdff::1') is False + + def test_reject_multicast(self): + """Multicast ff00::/8 is rejected.""" + assert fetch.is_valid_ipv6('ff00::1') is False + assert fetch.is_valid_ipv6('ff02::1') is False + + def test_reject_invalid_format(self): + """Malformed addresses are rejected.""" + assert fetch.is_valid_ipv6('gggg::1') is False + assert fetch.is_valid_ipv6('not-an-ipv6') is False + assert fetch.is_valid_ipv6('') is False + + def test_reject_multiple_double_colon(self): + """Multiple :: in address is invalid.""" + assert fetch.is_valid_ipv6('2001::db8::1') is False + + +class TestIsUsableProxy: + """Tests for is_usable_proxy() function.""" + + def test_valid_public_ipv4(self): + """Valid public IPv4 proxies pass.""" + assert fetch.is_usable_proxy('1.2.3.4:8080') is True + assert fetch.is_usable_proxy('8.8.8.8:3128') is True + assert fetch.is_usable_proxy('203.0.113.50:1080') is True + + def test_reject_private_class_a(self): + """Private 10.0.0.0/8 is rejected.""" + assert fetch.is_usable_proxy('10.0.0.1:8080') is False + assert fetch.is_usable_proxy('10.255.255.255:8080') is 
False + + def test_reject_private_class_b(self): + """Private 172.16.0.0/12 is rejected.""" + assert fetch.is_usable_proxy('172.16.0.1:8080') is False + assert fetch.is_usable_proxy('172.31.255.255:8080') is False + # 172.15.x.x and 172.32.x.x should be valid + assert fetch.is_usable_proxy('172.15.0.1:8080') is True + assert fetch.is_usable_proxy('172.32.0.1:8080') is True + + def test_reject_private_class_c(self): + """Private 192.168.0.0/16 is rejected.""" + assert fetch.is_usable_proxy('192.168.1.1:8080') is False + assert fetch.is_usable_proxy('192.168.0.1:8080') is False + + def test_reject_loopback(self): + """Loopback 127.0.0.0/8 is rejected.""" + assert fetch.is_usable_proxy('127.0.0.1:8080') is False + assert fetch.is_usable_proxy('127.255.255.255:8080') is False + + def test_reject_link_local(self): + """Link-local 169.254.0.0/16 is rejected.""" + assert fetch.is_usable_proxy('169.254.1.1:8080') is False + + def test_reject_cgnat(self): + """CGNAT 100.64.0.0/10 is rejected.""" + assert fetch.is_usable_proxy('100.64.0.1:8080') is False + assert fetch.is_usable_proxy('100.127.255.255:8080') is False + # 100.63.x.x and 100.128.x.x should be valid + assert fetch.is_usable_proxy('100.63.0.1:8080') is True + assert fetch.is_usable_proxy('100.128.0.1:8080') is True + + def test_reject_multicast(self): + """Multicast 224.0.0.0/4 is rejected.""" + assert fetch.is_usable_proxy('224.0.0.1:8080') is False + assert fetch.is_usable_proxy('239.255.255.255:8080') is False + + def test_reject_reserved(self): + """Reserved 240.0.0.0/4 is rejected.""" + assert fetch.is_usable_proxy('240.0.0.1:8080') is False + assert fetch.is_usable_proxy('255.255.255.255:8080') is False + + def test_reject_zero_first_octet(self): + """0.0.0.0/8 is rejected.""" + assert fetch.is_usable_proxy('0.0.0.0:8080') is False + assert fetch.is_usable_proxy('0.1.2.3:8080') is False + + def test_reject_invalid_port_zero(self): + """Port 0 is rejected.""" + assert fetch.is_usable_proxy('1.2.3.4:0') is 
False + + def test_reject_invalid_port_high(self): + """Port > 65535 is rejected.""" + assert fetch.is_usable_proxy('1.2.3.4:65536') is False + assert fetch.is_usable_proxy('1.2.3.4:99999') is False + + def test_reject_malformed_ip(self): + """Malformed IP addresses are rejected.""" + assert fetch.is_usable_proxy('1.2.3:8080') is False + assert fetch.is_usable_proxy('1.2.3.4.5:8080') is False + assert fetch.is_usable_proxy('not-an-ip:8080') is False + assert fetch.is_usable_proxy('1.2.3.256:8080') is False + + def test_reject_no_colon(self): + """String without colon is rejected.""" + assert fetch.is_usable_proxy('1.2.3.4') is False + + def test_auth_format_valid(self): + """Authenticated proxy format is valid.""" + assert fetch.is_usable_proxy('user:pass@1.2.3.4:8080') is True + + def test_auth_private_ip_rejected(self): + """Auth format with private IP is rejected.""" + assert fetch.is_usable_proxy('user:pass@192.168.1.1:8080') is False + + def test_ipv6_valid(self): + """Valid IPv6 proxy is accepted.""" + assert fetch.is_usable_proxy('[2001:db8::1]:8080') is True + + def test_ipv6_loopback_rejected(self): + """IPv6 loopback is rejected.""" + assert fetch.is_usable_proxy('[::1]:8080') is False + + def test_ipv6_malformed_rejected(self): + """Malformed IPv6 is rejected.""" + assert fetch.is_usable_proxy('[not-ipv6]:8080') is False + + +class TestNormalizeProto: + """Tests for _normalize_proto() function.""" + + def test_none_returns_none(self): + """None input returns None.""" + assert fetch._normalize_proto(None) is None + + def test_empty_returns_none(self): + """Empty string returns None.""" + assert fetch._normalize_proto('') is None + + def test_socks5_variants(self): + """SOCKS5 variants normalize to 'socks5'.""" + assert fetch._normalize_proto('socks5') == 'socks5' + assert fetch._normalize_proto('SOCKS5') == 'socks5' + assert fetch._normalize_proto('s5') == 'socks5' + assert fetch._normalize_proto('tor') == 'socks5' + + def test_socks4_variants(self): + 
"""SOCKS4 variants normalize to 'socks4'.""" + assert fetch._normalize_proto('socks4') == 'socks4' + assert fetch._normalize_proto('SOCKS4') == 'socks4' + assert fetch._normalize_proto('socks4a') == 'socks4' + assert fetch._normalize_proto('s4') == 'socks4' + + def test_http_variants(self): + """HTTP variants normalize to 'http'.""" + assert fetch._normalize_proto('http') == 'http' + assert fetch._normalize_proto('HTTP') == 'http' + assert fetch._normalize_proto('https') == 'http' + assert fetch._normalize_proto('connect') == 'http' + assert fetch._normalize_proto('ssl') == 'http' + + def test_unknown_returns_none(self): + """Unknown protocol returns None.""" + assert fetch._normalize_proto('ftp') is None + assert fetch._normalize_proto('unknown') is None + + +class TestDetectProtoFromPath: + """Tests for detect_proto_from_path() function.""" + + def test_socks5_in_path(self): + """Detect socks5 from URL path.""" + assert fetch.detect_proto_from_path('/socks5/') == 'socks5' + assert fetch.detect_proto_from_path('/proxy/socks5.txt') == 'socks5' + assert fetch.detect_proto_from_path('socks5-proxies.txt') == 'socks5' + + def test_socks4_in_path(self): + """Detect socks4 from URL path.""" + assert fetch.detect_proto_from_path('/socks4/') == 'socks4' + assert fetch.detect_proto_from_path('/socks4a/') == 'socks4' + assert fetch.detect_proto_from_path('socks4.txt') == 'socks4' + + def test_http_in_path(self): + """Detect http from URL path.""" + assert fetch.detect_proto_from_path('/http/') == 'http' + assert fetch.detect_proto_from_path('http-proxies.txt') == 'http' + assert fetch.detect_proto_from_path('http_list.txt') == 'http' + assert fetch.detect_proto_from_path('http.txt') == 'http' + + def test_https_ssl_as_http(self): + """HTTPS/SSL paths return 'http' (CONNECT proxies).""" + assert fetch.detect_proto_from_path('/https/') == 'http' + assert fetch.detect_proto_from_path('/ssl/') == 'http' + assert fetch.detect_proto_from_path('/connect/') == 'http' + + def 
test_no_proto_returns_none(self):
+        """No protocol indicator returns None."""
+        assert fetch.detect_proto_from_path('/proxies/') is None
+        assert fetch.detect_proto_from_path('/data/list.txt') is None
+        assert fetch.detect_proto_from_path('') is None
+
+    def test_case_insensitive(self):
+        """Detection is case-insensitive."""
+        assert fetch.detect_proto_from_path('/SOCKS5/') == 'socks5'
+        assert fetch.detect_proto_from_path('/HTTP/') == 'http'
+
+
+class TestCleanhtml:
+    """Tests for cleanhtml() function."""
+
+    def test_strips_tags(self):
+        """HTML tags are replaced with colons."""
+        result = fetch.cleanhtml('<b>1.2.3.4:8080</b>')
+        assert '1.2.3.4' in result
+        assert '8080' in result
+        assert '<b>' not in result
+        assert '</b>' not in result
+
+    def test_replaces_nbsp(self):
+        """&nbsp; is replaced with space."""
+        result = fetch.cleanhtml('1.2.3.4&nbsp;8080')
+        assert '&nbsp;' not in result
+
+    def test_collapses_whitespace(self):
+        """Multiple whitespace becomes single colon."""
+        result = fetch.cleanhtml('1.2.3.4   8080')
+        # Whitespace collapsed to colon
+        assert ' ' not in result
+
+
+class TestExtractAuthProxies:
+    """Tests for extract_auth_proxies() function."""
+
+    def test_basic_auth_format(self):
+        """Extract basic user:pass@ip:port format."""
+        content = 'some text user:pass@1.2.3.4:8080 more text'
+        result = fetch.extract_auth_proxies(content)
+        assert len(result) == 1
+        assert result[0][0] == 'user:pass@1.2.3.4:8080'
+        assert result[0][1] is None
+
+    def test_with_protocol_prefix(self):
+        """Extract with protocol prefix."""
+        content = 'socks5://user:pass@1.2.3.4:8080'
+        result = fetch.extract_auth_proxies(content)
+        assert len(result) == 1
+        assert result[0][0] == 'user:pass@1.2.3.4:8080'
+        assert result[0][1] == 'socks5'
+
+    def test_http_protocol(self):
+        """Extract HTTP auth proxy."""
+        content = 'http://alice:secret@5.6.7.8:3128'
+        result = fetch.extract_auth_proxies(content)
+        assert len(result) == 1
+        assert result[0][1] == 'http'
+
+    def test_multiple_proxies(self):
+        
"""Extract multiple auth proxies.""" + content = ''' + user1:pass1@1.2.3.4:8080 + socks5://user2:pass2@5.6.7.8:1080 + ''' + result = fetch.extract_auth_proxies(content) + assert len(result) == 2 + + def test_normalizes_ip(self): + """Leading zeros in IP are normalized.""" + content = 'user:pass@001.002.003.004:8080' + result = fetch.extract_auth_proxies(content) + assert len(result) == 1 + # IP normalized to remove leading zeros + assert '001' not in result[0][0] + assert '1.2.3.4' in result[0][0] + + def test_empty_content(self): + """Empty content returns empty list.""" + assert fetch.extract_auth_proxies('') == [] + + def test_no_match(self): + """Content without auth proxies returns empty list.""" + assert fetch.extract_auth_proxies('just some text') == [] + + +class TestConfidenceScoring: + """Tests for confidence score constants.""" + + def test_auth_highest_confidence(self): + """Auth proxies have highest confidence.""" + assert fetch.CONFIDENCE_AUTH > fetch.CONFIDENCE_JSON + assert fetch.CONFIDENCE_AUTH > fetch.CONFIDENCE_TABLE + + def test_json_above_table(self): + """JSON has higher confidence than table.""" + assert fetch.CONFIDENCE_JSON > fetch.CONFIDENCE_TABLE + + def test_table_above_hint(self): + """Table has higher confidence than hint.""" + assert fetch.CONFIDENCE_TABLE > fetch.CONFIDENCE_HINT + + def test_hint_above_regex(self): + """Hint has higher confidence than regex.""" + assert fetch.CONFIDENCE_HINT > fetch.CONFIDENCE_REGEX diff --git a/tests/test_misc.py b/tests/test_misc.py new file mode 100644 index 0000000..14f1f1e --- /dev/null +++ b/tests/test_misc.py @@ -0,0 +1,248 @@ +# -*- coding: utf-8 -*- +"""Tests for misc.py utility functions.""" +from __future__ import print_function + +import re +import sys +import os + +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import misc + + +class TestTimestamp: + """Tests for timestamp() function.""" + + def test_timestamp_format(self): + 
"""timestamp() returns HH:MM:SS format.""" + ts = misc.timestamp() + assert re.match(r'^\d{2}:\d{2}:\d{2}$', ts) + + def test_timestamp_valid_hours(self): + """timestamp() hours are 00-23.""" + ts = misc.timestamp() + hours = int(ts.split(':')[0]) + assert 0 <= hours <= 23 + + def test_timestamp_valid_minutes(self): + """timestamp() minutes are 00-59.""" + ts = misc.timestamp() + minutes = int(ts.split(':')[1]) + assert 0 <= minutes <= 59 + + def test_timestamp_valid_seconds(self): + """timestamp() seconds are 00-59.""" + ts = misc.timestamp() + seconds = int(ts.split(':')[2]) + assert 0 <= seconds <= 59 + + +class TestTorProxyUrl: + """Tests for tor_proxy_url() function.""" + + def test_basic_format(self): + """tor_proxy_url() returns socks5:// prefix.""" + result = misc.tor_proxy_url('127.0.0.1:9050') + assert result == 'socks5://127.0.0.1:9050' + + def test_custom_host_port(self): + """tor_proxy_url() works with custom host:port.""" + result = misc.tor_proxy_url('10.200.1.1:9150') + assert result == 'socks5://10.200.1.1:9150' + + def test_ipv6_host(self): + """tor_proxy_url() handles IPv6 addresses.""" + result = misc.tor_proxy_url('[::1]:9050') + assert result == 'socks5://[::1]:9050' + + +class TestLogLevel: + """Tests for set_log_level() and get_log_level() functions.""" + + def setup_method(self): + """Reset log level before each test.""" + misc.set_log_level(1) # Default info level + + def test_get_default_level(self): + """get_log_level() returns default level.""" + assert misc.get_log_level() == 1 + + def test_set_integer_level(self): + """set_log_level() accepts integer.""" + misc.set_log_level(0) + assert misc.get_log_level() == 0 + + misc.set_log_level(2) + assert misc.get_log_level() == 2 + + def test_set_string_debug(self): + """set_log_level('debug') sets level 0.""" + misc.set_log_level('debug') + assert misc.get_log_level() == 0 + + def test_set_string_info(self): + """set_log_level('info') sets level 1.""" + misc.set_log_level('info') + assert 
misc.get_log_level() == 1 + + def test_set_string_warn(self): + """set_log_level('warn') sets level 2.""" + misc.set_log_level('warn') + assert misc.get_log_level() == 2 + + def test_set_string_error(self): + """set_log_level('error') sets level 3.""" + misc.set_log_level('error') + assert misc.get_log_level() == 3 + + def test_set_string_none(self): + """set_log_level('none') suppresses all.""" + misc.set_log_level('none') + assert misc.get_log_level() == 99 + + def test_invalid_string_defaults_to_info(self): + """set_log_level() with invalid string defaults to 1.""" + misc.set_log_level('invalid') + assert misc.get_log_level() == 1 + + +class TestIsSSLProtocolError: + """Tests for is_ssl_protocol_error() function.""" + + def test_none_returns_false(self): + """is_ssl_protocol_error(None) returns False.""" + assert misc.is_ssl_protocol_error(None) is False + + def test_empty_returns_false(self): + """is_ssl_protocol_error('') returns False.""" + assert misc.is_ssl_protocol_error('') is False + + def test_wrong_version_number(self): + """Detects 'wrong version number' as protocol error.""" + assert misc.is_ssl_protocol_error('wrong version number') is True + + def test_unsupported_protocol(self): + """Detects 'unsupported protocol' as protocol error.""" + assert misc.is_ssl_protocol_error('unsupported protocol') is True + + def test_unexpected_eof(self): + """Detects 'unexpected eof' as protocol error.""" + assert misc.is_ssl_protocol_error('unexpected eof') is True + + def test_eof_occurred(self): + """Detects 'eof occurred' as protocol error.""" + assert misc.is_ssl_protocol_error('eof occurred') is True + + def test_alert_handshake_failure(self): + """Detects 'alert handshake failure' as protocol error.""" + assert misc.is_ssl_protocol_error('alert handshake failure') is True + + def test_http_request(self): + """Detects 'http request' as protocol error (sent HTTP to HTTPS).""" + assert misc.is_ssl_protocol_error('http request') is True + + def 
test_no_ciphers_available(self): + """Detects 'no ciphers available' as protocol error.""" + assert misc.is_ssl_protocol_error('no ciphers available') is True + + def test_case_insensitive(self): + """is_ssl_protocol_error() is case-insensitive.""" + assert misc.is_ssl_protocol_error('WRONG VERSION NUMBER') is True + assert misc.is_ssl_protocol_error('Wrong Version Number') is True + + def test_certificate_error_not_protocol(self): + """Certificate errors are not protocol errors.""" + assert misc.is_ssl_protocol_error('certificate verify failed') is False + + def test_hostname_mismatch_not_protocol(self): + """Hostname mismatch is not protocol error.""" + assert misc.is_ssl_protocol_error('hostname mismatch') is False + + def test_expired_cert_not_protocol(self): + """Expired certificate is not protocol error.""" + assert misc.is_ssl_protocol_error('certificate has expired') is False + + def test_embedded_in_message(self): + """Detects pattern embedded in longer message.""" + assert misc.is_ssl_protocol_error( + 'SSL error: wrong version number in record') is True + + +class TestFailureConstants: + """Tests for failure category constants.""" + + def test_constants_are_strings(self): + """Failure constants are strings.""" + assert isinstance(misc.FAIL_TIMEOUT, str) + assert isinstance(misc.FAIL_REFUSED, str) + assert isinstance(misc.FAIL_AUTH, str) + assert isinstance(misc.FAIL_UNREACHABLE, str) + assert isinstance(misc.FAIL_DNS, str) + assert isinstance(misc.FAIL_SSL, str) + assert isinstance(misc.FAIL_CLOSED, str) + assert isinstance(misc.FAIL_PROXY, str) + assert isinstance(misc.FAIL_OTHER, str) + + def test_constants_unique(self): + """Failure constants have unique values.""" + constants = [ + misc.FAIL_TIMEOUT, + misc.FAIL_REFUSED, + misc.FAIL_AUTH, + misc.FAIL_UNREACHABLE, + misc.FAIL_DNS, + misc.FAIL_SSL, + misc.FAIL_CLOSED, + misc.FAIL_PROXY, + misc.FAIL_OTHER, + ] + assert len(constants) == len(set(constants)) + + def test_ssl_errors_contains_ssl(self): + 
"""SSL_ERRORS contains FAIL_SSL.""" + assert misc.FAIL_SSL in misc.SSL_ERRORS + + def test_conn_errors_contents(self): + """CONN_ERRORS contains connection-related failures.""" + assert misc.FAIL_TIMEOUT in misc.CONN_ERRORS + assert misc.FAIL_REFUSED in misc.CONN_ERRORS + assert misc.FAIL_UNREACHABLE in misc.CONN_ERRORS + assert misc.FAIL_CLOSED in misc.CONN_ERRORS + assert misc.FAIL_DNS in misc.CONN_ERRORS + # Auth and proxy errors are not connection errors + assert misc.FAIL_AUTH not in misc.CONN_ERRORS + assert misc.FAIL_PROXY not in misc.CONN_ERRORS + + +class TestLogLevels: + """Tests for LOG_LEVELS dictionary.""" + + def test_debug_is_lowest(self): + """debug is the lowest (most verbose) level.""" + assert misc.LOG_LEVELS['debug'] == 0 + + def test_info_is_one(self): + """info level is 1.""" + assert misc.LOG_LEVELS['info'] == 1 + + def test_warn_is_two(self): + """warn level is 2.""" + assert misc.LOG_LEVELS['warn'] == 2 + + def test_error_is_three(self): + """error level is 3.""" + assert misc.LOG_LEVELS['error'] == 3 + + def test_none_suppresses_all(self): + """none level (99) suppresses all output.""" + assert misc.LOG_LEVELS['none'] == 99 + + def test_aliases_equal_info(self): + """rate, scraper, stats, diag are aliases for info.""" + assert misc.LOG_LEVELS['rate'] == misc.LOG_LEVELS['info'] + assert misc.LOG_LEVELS['scraper'] == misc.LOG_LEVELS['info'] + assert misc.LOG_LEVELS['stats'] == misc.LOG_LEVELS['info'] + assert misc.LOG_LEVELS['diag'] == misc.LOG_LEVELS['info']