tests: add unit test infrastructure

pytest-based test suite with fixtures for database testing.
Covers misc.py utilities, dbs.py operations, and fetch.py validation.
Includes mock_network.py for future network testing.
This commit is contained in:
Username
2026-01-08 01:42:38 +01:00
parent c1ec5d593b
commit 44604f1ce3
9 changed files with 1490 additions and 0 deletions

2
tests/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
"""PPF unit tests."""

129
tests/conftest.py Normal file
View File

@@ -0,0 +1,129 @@
# -*- coding: utf-8 -*-
"""Shared pytest fixtures for PPF tests."""
from __future__ import print_function
import os
import sys
import tempfile
import shutil
import pytest
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import mysqlite
import dbs
import misc
@pytest.fixture(autouse=True)
def mock_logging(monkeypatch):
    """Silence _log for every test.

    Mocks _log to avoid Python 2 print syntax errors under Python 3.  The
    patch is applied to both misc and dbs because dbs binds the function at
    import time via 'from misc import _log'.
    """
    def _silent(msg, level='info'):
        pass
    for module in (misc, dbs):
        monkeypatch.setattr(module, '_log', _silent)
@pytest.fixture
def temp_db():
    """Create a temporary SQLite database for testing.

    Yields a tuple of (sqlite_wrapper, db_path).

    Teardown runs in try/finally so the wrapper is closed and the backing
    file removed even when the test body raises, and the unlink is guarded
    so a failing close() cannot leak the temp file.
    """
    fd, db_path = tempfile.mkstemp(suffix='.sqlite')
    os.close(fd)
    sqlite = mysqlite.mysqlite(db_path)
    try:
        yield sqlite, db_path
    finally:
        # Cleanup
        try:
            sqlite.close()
        finally:
            if os.path.exists(db_path):
                os.unlink(db_path)
@pytest.fixture
def proxy_db(temp_db):
    """Temporary database with the proxylist table already created.

    Yields a tuple of (sqlite_wrapper, db_path).
    """
    handle = temp_db
    dbs.create_table_if_not_exists(handle[0], 'proxylist')
    yield handle
@pytest.fixture
def uri_db(temp_db):
    """Temporary database with the uris table already created.

    Yields a tuple of (sqlite_wrapper, db_path).
    """
    handle = temp_db
    dbs.create_table_if_not_exists(handle[0], 'uris')
    yield handle
@pytest.fixture
def full_db(temp_db):
    """Temporary database with both proxylist and uris tables created.

    Yields a tuple of (sqlite_wrapper, db_path).
    """
    sqlite, db_path = temp_db
    for table_name in ('proxylist', 'uris'):
        dbs.create_table_if_not_exists(sqlite, table_name)
    yield sqlite, db_path
@pytest.fixture
def temp_dir():
    """Yield a scratch directory path; the tree is removed after the test."""
    scratch = tempfile.mkdtemp()
    yield scratch
    # Best-effort removal: ignore_errors avoids masking the test outcome.
    shutil.rmtree(scratch, ignore_errors=True)
@pytest.fixture
def sample_proxies():
    """Well-formed public proxy strings usable as happy-path input."""
    return ['1.2.3.4:8080', '5.6.7.8:3128', '9.10.11.12:1080']
@pytest.fixture
def sample_private_ips():
    """Private/reserved IP proxies that validation must reject."""
    return [
        '10.0.0.1:8080',      # RFC 1918 class A
        '172.16.0.1:8080',    # RFC 1918 class B
        '192.168.1.1:8080',   # RFC 1918 class C
        '127.0.0.1:8080',     # loopback
        '169.254.1.1:8080',   # link-local
        '224.0.0.1:8080',     # multicast
        '100.64.0.1:8080',    # CGNAT (RFC 6598)
    ]
@pytest.fixture
def sample_cdn_ips():
    """CDN-owned IP proxies that the pipeline should filter out."""
    return [
        '141.101.1.1:8080',   # Cloudflare
        '151.101.1.1:8080',   # Fastly
        '23.32.1.1:8080',     # Akamai
        '13.32.1.1:8080',     # AWS CloudFront
    ]

31
tests/fixtures/sample_html.txt vendored Normal file
View File

@@ -0,0 +1,31 @@
<html>
<body>
<h1>Free Proxy List</h1>
<table>
<tr>
<th>IP Address</th>
<th>Port</th>
<th>Type</th>
<th>Country</th>
</tr>
<tr>
<td>1.2.3.4</td>
<td>8080</td>
<td>SOCKS5</td>
<td>US</td>
</tr>
<tr>
<td>5.6.7.8</td>
<td>3128</td>
<td>HTTP</td>
<td>DE</td>
</tr>
<tr>
<td>9.10.11.12</td>
<td>1080</td>
<td>SOCKS4</td>
<td>FR</td>
</tr>
</table>
</body>
</html>

5
tests/fixtures/sample_json.txt vendored Normal file
View File

@@ -0,0 +1,5 @@
[
{"ip": "1.2.3.4", "port": 8080, "type": "socks5"},
{"ip": "5.6.7.8", "port": 3128, "type": "http"},
{"host": "9.10.11.12", "port": 1080, "protocol": "socks4"}
]

24
tests/fixtures/sample_mixed.txt vendored Normal file
View File

@@ -0,0 +1,24 @@
Free Proxy List - Updated Daily
Plain proxies:
1.2.3.4:8080
5.6.7.8:3128
9.10.11.12:1080
With protocol hints:
socks5 11.22.33.44:1080
http: 55.66.77.88:8080
socks4 - 99.100.101.102:1080
Auth proxies:
user:pass@111.112.113.114:8080
socks5://admin:secret@115.116.117.118:1080
JSON embedded:
{"data": [{"ip": "121.122.123.124", "port": 8080}]}
Table format:
<table>
<tr><th>IP</th><th>Port</th></tr>
<tr><td>131.132.133.134</td><td>3128</td></tr>
</table>

244
tests/mock_network.py Normal file
View File

@@ -0,0 +1,244 @@
# -*- coding: utf-8 -*-
"""Network mocking utilities for PPF offline testing.
Provides mock implementations of rocksock and connection pool for testing
proxy validation logic without actual network calls.
"""
from __future__ import print_function
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Import rocksock error codes for mock exceptions
# Error codes used by the mocks.  Taken from rocksock when it is importable,
# otherwise pinned to the library's known numeric values so the test suite
# still runs fully offline.
try:
    import rocksock
    RS_E_TARGET_CONN_REFUSED = rocksock.RS_E_TARGET_CONN_REFUSED
    RS_E_HIT_TIMEOUT = rocksock.RS_E_HIT_TIMEOUT
    RS_E_PROXY_AUTH_FAILED = rocksock.RS_E_PROXY_AUTH_FAILED
    RS_E_REMOTE_DISCONNECTED = rocksock.RS_E_REMOTE_DISCONNECTED
    RS_E_PROXY_UNEXPECTED_RESPONSE = rocksock.RS_E_PROXY_UNEXPECTED_RESPONSE
    RS_ET_OWN = rocksock.RS_ET_OWN
    RS_ET_GAI = rocksock.RS_ET_GAI
    RS_ET_SSL = rocksock.RS_ET_SSL
except ImportError:
    (RS_E_TARGET_CONN_REFUSED, RS_E_HIT_TIMEOUT, RS_E_PROXY_AUTH_FAILED,
     RS_E_REMOTE_DISCONNECTED, RS_E_PROXY_UNEXPECTED_RESPONSE) = (20, 5, 7, 24, 11)
    RS_ET_OWN, RS_ET_GAI, RS_ET_SSL = 1, 2, 3


class MockRocksockException(Exception):
    """Mock RocksockException exposing the same accessor API.

    Used to exercise error-categorization code without a real socket error.
    """
    def __init__(self, error=0, errortype=RS_ET_OWN, message=''):
        Exception.__init__(self, message)
        self._error = error
        self._errortype = errortype
        self._message = message

    def get_error(self):
        """Numeric error code (RS_E_*)."""
        return self._error

    def get_errortype(self):
        """Error-type category (RS_ET_*)."""
        return self._errortype

    def get_errormessage(self):
        """Human-readable message passed at construction."""
        return self._message
# Configurable test scenarios: maps (proxy_ip, proxy_port) -> behavior dict.
MOCK_SCENARIOS = {}
# Response returned when no scenario matches (TEST-NET-1 address).
DEFAULT_RESPONSE = b'192.0.2.1'


class MockRocksock:
    """Mock rocksock.Rocksock for offline testing.

    Simulates network connections according to configured scenarios; each
    instance can be set up to succeed, fail, or return specific responses.
    """
    def __init__(self, host=None, port=None, proxies=None, ssl=False,
                 timeout=None, **kwargs):
        self.host = host
        self.port = port
        self.proxies = proxies or []
        self.ssl = ssl
        self.timeout = timeout
        self.connected = False
        self._response = DEFAULT_RESPONSE
        self._scenario = self._find_scenario()

    def _find_scenario(self):
        # Scenario is keyed by the target proxy, i.e. the second entry of the
        # proxy chain when one is present.
        if len(self.proxies) < 2:
            return None
        target = self.proxies[1]
        if not hasattr(target, 'hostinfo'):
            return None
        return MOCK_SCENARIOS.get((target.hostinfo.host, target.hostinfo.port))

    def connect(self):
        """Simulate a connection attempt according to the active scenario."""
        scenario = self._scenario
        if scenario:
            if scenario.get('fail'):
                raise MockRocksockException(
                    scenario.get('error', RS_E_TARGET_CONN_REFUSED),
                    scenario.get('errortype', RS_ET_OWN),
                    'Mock connection failed')
            configured = scenario.get('response')
            if configured:
                self._response = configured
        self.connected = True
        return True

    def send(self, data):
        """Pretend to send *data*; report the number of bytes accepted."""
        self._require_connection()
        return len(data)

    def recv(self, n=-1):
        """Return the configured response bytes."""
        self._require_connection()
        return self._response

    def recvline(self):
        """Return the response terminated with a newline."""
        return self.recv() + b'\n'

    def disconnect(self):
        """Drop the simulated connection."""
        self.connected = False

    def _require_connection(self):
        # Mirror real socket behaviour: operations on a closed socket fail.
        if not self.connected:
            raise MockRocksockException(RS_E_REMOTE_DISCONNECTED, RS_ET_OWN, 'Not connected')
class MockTorHostState:
    """Mock TorHostState tracking per-host success/failure statistics."""
    def __init__(self, host='127.0.0.1:9050'):
        self.host = host
        self.success_count = 0
        self.failure_count = 0
        self.latency_sum = 0.0
        self.available = True
        self.backoff_until = 0

    def record_success(self, latency=0):
        """Count one success and accumulate its latency."""
        self.success_count += 1
        self.latency_sum += latency

    def record_failure(self):
        """Count one failure."""
        self.failure_count += 1

    def is_available(self):
        """Whether this host may currently be handed out."""
        return self.available

    def get_latency(self):
        """Mean latency over recorded successes (0 when none recorded)."""
        return self.latency_sum / self.success_count if self.success_count else 0


class MockTorConnectionPool:
    """Mock TorConnectionPool for testing.

    Hands out configured Tor hosts and records outcomes without making any
    network connections.
    """
    def __init__(self, tor_hosts=None, warmup=True):
        # warmup is accepted for interface parity; the mock is always "warm".
        self.tor_hosts = tor_hosts or ['127.0.0.1:9050']
        self.warmup_complete = True
        self.host_states = dict((h, MockTorHostState(h)) for h in self.tor_hosts)
        self.successes = []
        self.failures = []

    def get_tor_host(self, worker_id=None):
        """Return the first host whose state reports available, else None."""
        for candidate in self.tor_hosts:
            state = self.host_states.get(candidate)
            if state is not None and state.is_available():
                return candidate
        return None

    def record_success(self, host, latency=0):
        """Log a success and forward it to the host's state."""
        self.successes.append((host, latency))
        state = self.host_states.get(host)
        if state is not None:
            state.record_success(latency)

    def record_failure(self, host):
        """Log a failure and forward it to the host's state."""
        self.failures.append(host)
        state = self.host_states.get(host)
        if state is not None:
            state.record_failure()

    def get_stats(self):
        """Aggregate counters across all host states."""
        states = list(self.host_states.values())
        return {
            'available_hosts': len([s for s in states if s.is_available()]),
            'total_hosts': len(self.tor_hosts),
            'total_successes': sum(s.success_count for s in states),
            'total_failures': sum(s.failure_count for s in states),
        }
def configure_scenario(proxy_ip, proxy_port, **kwargs):
    """Register mock behaviour for one proxy endpoint.

    Args:
        proxy_ip: Proxy IP address
        proxy_port: Proxy port number
        **kwargs: Scenario configuration:
            - fail: If True, connection will fail
            - error: Error code to raise (default: RS_E_TARGET_CONN_REFUSED)
            - errortype: Error type (default: RS_ET_OWN)
            - response: Bytes to return from recv()

    Example:
        # Configure proxy to fail with timeout
        configure_scenario('1.2.3.4', 8080, fail=True, error=RS_E_HIT_TIMEOUT)
        # Configure proxy to succeed with specific exit IP
        configure_scenario('5.6.7.8', 3128, response=b'203.0.113.50')
    """
    MOCK_SCENARIOS[(proxy_ip, proxy_port)] = dict(kwargs)


def clear_scenarios():
    """Drop every registered scenario."""
    MOCK_SCENARIOS.clear()


def reset_mock_state():
    """Restore all mock state to a pristine, scenario-free condition."""
    clear_scenarios()


# Shorthand constructors for the most common scenarios.

def scenario_timeout(proxy_ip, proxy_port):
    """Make the proxy fail with a timeout error."""
    configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_HIT_TIMEOUT)


def scenario_refused(proxy_ip, proxy_port):
    """Make the proxy fail with connection refused."""
    configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_TARGET_CONN_REFUSED)


def scenario_auth_fail(proxy_ip, proxy_port):
    """Make the proxy fail with an authentication error."""
    configure_scenario(proxy_ip, proxy_port, fail=True, error=RS_E_PROXY_AUTH_FAILED)


def scenario_success(proxy_ip, proxy_port, exit_ip='192.0.2.1'):
    """Make the proxy succeed, returning *exit_ip* from recv()."""
    configure_scenario(proxy_ip, proxy_port, response=exit_ip.encode('utf-8'))

427
tests/test_dbs.py Normal file
View File

@@ -0,0 +1,427 @@
# -*- coding: utf-8 -*-
"""Tests for dbs.py database operations."""
from __future__ import print_function
import sys
import os
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import dbs
class TestIsCdnIp:
    """Tests for is_cdn_ip() function."""

    @staticmethod
    def _all_cdn(addresses):
        # Every address must be classified as CDN.
        for addr in addresses:
            assert dbs.is_cdn_ip(addr) is True

    @staticmethod
    def _none_cdn(addresses):
        # No address may be classified as CDN.
        for addr in addresses:
            assert dbs.is_cdn_ip(addr) is False

    def test_cloudflare_ips(self):
        """Cloudflare IPs are detected as CDN."""
        self._all_cdn(['141.101.1.1', '141.101.255.255', '104.16.1.1', '172.64.1.1'])

    def test_fastly_ips(self):
        """Fastly IPs are detected as CDN."""
        self._all_cdn(['151.101.1.1', '151.101.128.1'])

    def test_akamai_ips(self):
        """Akamai IPs are detected as CDN."""
        self._all_cdn(['23.32.1.1', '23.64.1.1'])

    def test_cloudfront_ips(self):
        """Amazon CloudFront IPs are detected as CDN."""
        self._all_cdn(['13.32.1.1', '13.224.1.1'])

    def test_google_ips(self):
        """Google IPs are detected as CDN."""
        self._all_cdn(['34.64.1.1', '34.71.1.1'])

    def test_regular_ips_not_cdn(self):
        """Regular public IPs are not CDN."""
        self._none_cdn(['1.2.3.4', '8.8.8.8', '203.0.113.50'])

    def test_edge_case_prefix_mismatch(self):
        """Near-miss prefixes (141.100., 104.15.) are not detected."""
        self._none_cdn(['141.100.1.1', '104.15.1.1'])
class TestComputeProxyListHash:
    """Tests for compute_proxy_list_hash() function."""

    def test_empty_list_returns_none(self):
        """Empty or missing list yields None."""
        for empty_input in ([], None):
            assert dbs.compute_proxy_list_hash(empty_input) is None

    def test_single_proxy_hash(self):
        """Single proxy produces a stable 32-char (MD5 hex) digest."""
        first = dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
        second = dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
        assert first == second
        assert len(first) == 32

    def test_order_independent(self):
        """Hash does not depend on input order (sorted internally)."""
        forward = dbs.compute_proxy_list_hash(['1.2.3.4:8080', '5.6.7.8:3128'])
        backward = dbs.compute_proxy_list_hash(['5.6.7.8:3128', '1.2.3.4:8080'])
        assert forward == backward

    def test_different_lists_different_hash(self):
        """Distinct proxy lists hash differently."""
        assert (dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
                != dbs.compute_proxy_list_hash(['5.6.7.8:3128']))

    def test_tuple_format(self):
        """(address, proto) tuples hash identically to the bare address."""
        from_tuple = dbs.compute_proxy_list_hash([('1.2.3.4:8080', 'socks5')])
        from_string = dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
        # The address is extracted from the tuple before hashing.
        assert from_tuple == from_string
class TestCreateTableIfNotExists:
    """Tests for create_table_if_not_exists() function."""

    def test_create_proxylist_table(self, temp_db):
        """Creates proxylist table with correct schema."""
        sqlite, _ = temp_db
        dbs.create_table_if_not_exists(sqlite, 'proxylist')
        # Verify table exists by querying it
        result = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()
        assert result[0] == 0

    def test_create_uris_table(self, temp_db):
        """Creates uris table with correct schema."""
        sqlite, _ = temp_db
        dbs.create_table_if_not_exists(sqlite, 'uris')
        # Verify table exists
        result = sqlite.execute('SELECT COUNT(*) FROM uris').fetchone()
        assert result[0] == 0

    def test_idempotent_creation(self, temp_db):
        """Calling twice doesn't cause error."""
        sqlite, _ = temp_db
        dbs.create_table_if_not_exists(sqlite, 'proxylist')
        dbs.create_table_if_not_exists(sqlite, 'proxylist')
        # No exception means success

    def test_proxylist_has_required_columns(self, proxy_db):
        """Proxylist table has all required columns."""
        sqlite, _ = proxy_db
        # Insert a row to test columns
        sqlite.execute(
            'INSERT INTO proxylist (added, proxy, ip, port, failed) VALUES (?, ?, ?, ?, ?)',
            (1234567890, '1.2.3.4:8080', '1.2.3.4', '8080', 0)
        )
        sqlite.commit()
        # Verify we can query various columns
        # (a missing column would make this SELECT raise, failing the test)
        row = sqlite.execute(
            'SELECT proxy, ip, port, proto, failed, tested, avg_latency, anonymity '
            'FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row is not None
        assert row[0] == '1.2.3.4:8080'
class TestInsertProxies:
    """Tests for insert_proxies() function."""

    def test_insert_plain_strings(self, proxy_db):
        """Insert plain proxy strings."""
        sqlite, _ = proxy_db
        proxies = ['1.2.3.4:8080', '5.6.7.8:3128']
        dbs.insert_proxies(sqlite, proxies, 'http://test.com')
        count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert count == 2

    def test_insert_tuples_with_proto(self, proxy_db):
        """Insert tuples with protocol."""
        sqlite, _ = proxy_db
        proxies = [('1.2.3.4:8080', 'socks5'), ('5.6.7.8:3128', 'http')]
        dbs.insert_proxies(sqlite, proxies, 'http://test.com')
        row = sqlite.execute(
            'SELECT proto FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[0] == 'socks5'

    def test_insert_tuples_with_confidence(self, proxy_db):
        """Insert tuples with confidence score."""
        sqlite, _ = proxy_db
        # 3-tuple form: (address, proto, confidence)
        proxies = [('1.2.3.4:8080', 'socks5', 85)]
        dbs.insert_proxies(sqlite, proxies, 'http://test.com')
        row = sqlite.execute(
            'SELECT confidence FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[0] == 85

    def test_filters_cdn_ips(self, proxy_db):
        """CDN IPs are filtered out."""
        sqlite, _ = proxy_db
        proxies = [
            '1.2.3.4:8080',      # Regular - should be inserted
            '141.101.1.1:8080',  # Cloudflare CDN - should be filtered
        ]
        dbs.insert_proxies(sqlite, proxies, 'http://test.com')
        count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert count == 1  # Only non-CDN proxy inserted

    def test_empty_list_no_error(self, proxy_db):
        """Empty list doesn't cause error."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, [], 'http://test.com')
        # No exception means success

    def test_duplicate_ignored(self, proxy_db):
        """Duplicate proxies are ignored (INSERT OR IGNORE)."""
        sqlite, _ = proxy_db
        # Same proxy from two different source URLs; second insert is a no-op.
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test1.com')
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test2.com')
        count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert count == 1
class TestInsertUrls:
    """Tests for insert_urls() function.

    Note: insert_urls takes (urls, query, sqlite) — argument order differs
    from insert_proxies.
    """

    def test_insert_new_urls(self, uri_db):
        """Insert new URLs returns count of inserted."""
        sqlite, _ = uri_db
        urls = ['http://example.com/1', 'http://example.com/2']
        count = dbs.insert_urls(urls, 'test query', sqlite)
        assert count == 2

    def test_duplicate_urls_not_counted(self, uri_db):
        """Duplicate URLs not counted in return value."""
        sqlite, _ = uri_db
        urls = ['http://example.com/1']
        count1 = dbs.insert_urls(urls, 'test query', sqlite)
        count2 = dbs.insert_urls(urls, 'test query', sqlite)
        assert count1 == 1
        assert count2 == 0

    def test_mixed_new_and_duplicate(self, uri_db):
        """Mixed new and duplicate URLs counted correctly."""
        sqlite, _ = uri_db
        dbs.insert_urls(['http://example.com/1'], 'test', sqlite)
        count = dbs.insert_urls(
            ['http://example.com/1', 'http://example.com/2', 'http://example.com/3'],
            'test', sqlite
        )
        assert count == 2  # Only 2 new URLs

    def test_empty_list_returns_zero(self, uri_db):
        """Empty list returns 0."""
        sqlite, _ = uri_db
        count = dbs.insert_urls([], 'test', sqlite)
        assert count == 0
class TestUpdateProxyLatency:
    """Tests for update_proxy_latency() function."""

    def test_first_latency_sample(self, proxy_db):
        """First latency sample sets avg_latency directly."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_latency(sqlite, '1.2.3.4:8080', 100.0)
        sqlite.commit()
        row = sqlite.execute(
            'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[0] == 100.0
        assert row[1] == 1

    def test_ema_calculation(self, proxy_db):
        """Exponential moving average is calculated correctly."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com')
        # First sample: 100ms
        dbs.update_proxy_latency(sqlite, '1.2.3.4:8080', 100.0)
        sqlite.commit()
        # Second sample: 50ms
        # EMA: alpha = 2/(2+1) = 0.667, new_avg = 0.667*50 + 0.333*100 = 66.67
        dbs.update_proxy_latency(sqlite, '1.2.3.4:8080', 50.0)
        sqlite.commit()
        row = sqlite.execute(
            'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[1] == 2
        # Check EMA is roughly correct (allow for floating point)
        assert 65 < row[0] < 68

    def test_nonexistent_proxy_no_error(self, proxy_db):
        """Updating nonexistent proxy doesn't cause error."""
        sqlite, _ = proxy_db
        # The UPDATE simply matches zero rows; no exception expected.
        dbs.update_proxy_latency(sqlite, 'nonexistent:8080', 100.0)
        # No exception means success
class TestBatchUpdateProxyLatency:
    """Tests for batch_update_proxy_latency() function."""

    def test_batch_update_multiple(self, proxy_db):
        """Batch update updates multiple proxies."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080', '5.6.7.8:3128'], 'http://test.com')
        # Batch input is a list of (proxy, latency_ms) pairs.
        updates = [('1.2.3.4:8080', 100.0), ('5.6.7.8:3128', 200.0)]
        dbs.batch_update_proxy_latency(sqlite, updates)
        sqlite.commit()
        row1 = sqlite.execute(
            'SELECT avg_latency FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        row2 = sqlite.execute(
            'SELECT avg_latency FROM proxylist WHERE proxy = ?',
            ('5.6.7.8:3128',)
        ).fetchone()
        assert row1[0] == 100.0
        assert row2[0] == 200.0

    def test_empty_list_no_error(self, proxy_db):
        """Empty update list doesn't cause error."""
        sqlite, _ = proxy_db
        dbs.batch_update_proxy_latency(sqlite, [])
        # No exception means success
class TestUpdateProxyAnonymity:
    """Tests for update_proxy_anonymity() function.

    NOTE(review): based on the calls below, the positional signature looks
    like (sqlite, proxy, exit_ip, proxy_ip, reveals_headers=...) — confirm
    against dbs.py.
    """

    def test_transparent_proxy(self, proxy_db):
        """Transparent proxy detected when exit_ip equals proxy_ip."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '1.2.3.4', '1.2.3.4')
        sqlite.commit()
        row = sqlite.execute(
            'SELECT anonymity, exit_ip FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[0] == 'transparent'
        assert row[1] == '1.2.3.4'

    def test_elite_proxy(self, proxy_db):
        """Elite proxy detected when exit_ip differs and no revealing headers."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '5.6.7.8', '1.2.3.4',
                                   reveals_headers=False)
        sqlite.commit()
        row = sqlite.execute(
            'SELECT anonymity FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[0] == 'elite'

    def test_anonymous_proxy(self, proxy_db):
        """Anonymous proxy detected when exit_ip differs but reveals headers."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '5.6.7.8', '1.2.3.4',
                                   reveals_headers=True)
        sqlite.commit()
        row = sqlite.execute(
            'SELECT anonymity FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[0] == 'anonymous'

    def test_normalizes_leading_zeros(self, proxy_db):
        """IP addresses with leading zeros are normalized."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com')
        # Same IP with leading zeros should be detected as transparent
        dbs.update_proxy_anonymity(sqlite, '1.2.3.4:8080', '001.002.003.004', '1.2.3.4')
        sqlite.commit()
        row = sqlite.execute(
            'SELECT anonymity FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert row[0] == 'transparent'
class TestGetDatabaseStats:
    """Tests for get_database_stats() function."""

    def test_empty_database_stats(self, full_db):
        """Empty database returns zero counts."""
        sqlite, _ = full_db
        stats = dbs.get_database_stats(sqlite)
        assert stats['proxy_count'] == 0
        assert stats['working_count'] == 0
        # Size-related keys must be present even when empty.
        assert 'page_count' in stats
        assert 'total_size' in stats

    def test_stats_after_inserts(self, full_db):
        """Stats reflect inserted proxies."""
        sqlite, _ = full_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080', '5.6.7.8:3128'], 'http://test.com')
        stats = dbs.get_database_stats(sqlite)
        assert stats['proxy_count'] == 2
class TestAnalyzeVacuum:
    """Tests for analyze_database() and vacuum_database() functions."""

    def test_analyze_no_error(self, proxy_db):
        """analyze_database() runs without error."""
        sqlite, _ = proxy_db
        dbs.analyze_database(sqlite)
        # No exception means success

    def test_vacuum_no_error(self, proxy_db):
        """vacuum_database() runs without error."""
        sqlite, _ = proxy_db
        dbs.vacuum_database(sqlite)
        # No exception means success

    def test_analyze_vacuum_sequence(self, proxy_db):
        """Running analyze then vacuum works."""
        sqlite, _ = proxy_db
        dbs.insert_proxies(sqlite, ['1.2.3.4:8080'], 'http://test.com')
        dbs.analyze_database(sqlite)
        dbs.vacuum_database(sqlite)
        # Database still valid: data survives maintenance operations.
        count = sqlite.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert count == 1

380
tests/test_fetch.py Normal file
View File

@@ -0,0 +1,380 @@
# -*- coding: utf-8 -*-
"""Tests for fetch.py proxy validation and extraction functions."""
from __future__ import print_function
import sys
import os
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# fetch.py has Python 2 dependencies - skip tests if import fails
try:
    import fetch
    FETCH_AVAILABLE = True
except ImportError:
    # The exception object itself is unused (was bound as 'e'); the flag
    # alone drives the module-wide skip marker below.
    FETCH_AVAILABLE = False
    fetch = None
# Module-level pytest marker: skips every test in this file when the fetch
# module could not be imported above.
pytestmark = pytest.mark.skipif(not FETCH_AVAILABLE, reason="fetch module requires Python 2")
class TestValidPort:
    """Tests for valid_port() function."""

    def test_port_zero_invalid(self):
        """Port 0 is invalid."""
        assert fetch.valid_port(0) is False

    def test_port_one_valid(self):
        """Port 1 is the lowest valid port."""
        assert fetch.valid_port(1) is True

    def test_port_max_valid(self):
        """Port 65535 is the highest valid port."""
        assert fetch.valid_port(65535) is True

    def test_port_over_max_invalid(self):
        """Port 65536 is out of range."""
        assert fetch.valid_port(65536) is False

    def test_negative_port_invalid(self):
        """Negative ports are invalid."""
        assert fetch.valid_port(-1) is False

    def test_common_ports_valid(self):
        """Well-known proxy ports all validate."""
        for port in (80, 443, 1080, 3128, 8080, 9050):
            assert fetch.valid_port(port) is True
class TestIsValidIpv6:
    """Tests for is_valid_ipv6() function."""

    @staticmethod
    def _accepted(addresses):
        # Every address must be recognized as a valid, routable IPv6.
        for addr in addresses:
            assert fetch.is_valid_ipv6(addr) is True

    @staticmethod
    def _rejected(addresses):
        # Every address must be rejected.
        for addr in addresses:
            assert fetch.is_valid_ipv6(addr) is False

    def test_valid_global_unicast(self):
        """Valid global unicast addresses pass."""
        self._accepted(['2001:db8::1', '2001:0db8:0000:0000:0000:ff00:0042:8329'])

    def test_valid_full_address(self):
        """Full 8-segment address is valid."""
        self._accepted(['2001:0db8:85a3:0000:0000:8a2e:0370:7334'])

    def test_reject_loopback(self):
        """Loopback ::1 is rejected, compressed or not."""
        self._rejected(['::1', '0:0:0:0:0:0:0:1'])

    def test_reject_unspecified(self):
        """Unspecified :: is rejected, compressed or not."""
        self._rejected(['::', '0:0:0:0:0:0:0:0'])

    def test_reject_link_local(self):
        """Link-local fe80::/10 is rejected across its sub-ranges."""
        self._rejected(['fe80::1', 'fe90::1', 'fea0::1', 'feb0::1'])

    def test_reject_unique_local(self):
        """Unique local fc00::/7 is rejected."""
        self._rejected(['fc00::1', 'fd00::1', 'fdff::1'])

    def test_reject_multicast(self):
        """Multicast ff00::/8 is rejected."""
        self._rejected(['ff00::1', 'ff02::1'])

    def test_reject_invalid_format(self):
        """Malformed addresses are rejected."""
        self._rejected(['gggg::1', 'not-an-ipv6', ''])

    def test_reject_multiple_double_colon(self):
        """More than one :: in an address is invalid."""
        self._rejected(['2001::db8::1'])
class TestIsUsableProxy:
    """Tests for is_usable_proxy() function.

    Covers IPv4 reserved-range rejection (including exact /12 and /10
    boundaries), port validation, auth-format parsing, and bracketed IPv6.
    """

    def test_valid_public_ipv4(self):
        """Valid public IPv4 proxies pass."""
        assert fetch.is_usable_proxy('1.2.3.4:8080') is True
        assert fetch.is_usable_proxy('8.8.8.8:3128') is True
        assert fetch.is_usable_proxy('203.0.113.50:1080') is True

    def test_reject_private_class_a(self):
        """Private 10.0.0.0/8 is rejected."""
        assert fetch.is_usable_proxy('10.0.0.1:8080') is False
        assert fetch.is_usable_proxy('10.255.255.255:8080') is False

    def test_reject_private_class_b(self):
        """Private 172.16.0.0/12 is rejected."""
        assert fetch.is_usable_proxy('172.16.0.1:8080') is False
        assert fetch.is_usable_proxy('172.31.255.255:8080') is False
        # 172.15.x.x and 172.32.x.x should be valid
        assert fetch.is_usable_proxy('172.15.0.1:8080') is True
        assert fetch.is_usable_proxy('172.32.0.1:8080') is True

    def test_reject_private_class_c(self):
        """Private 192.168.0.0/16 is rejected."""
        assert fetch.is_usable_proxy('192.168.1.1:8080') is False
        assert fetch.is_usable_proxy('192.168.0.1:8080') is False

    def test_reject_loopback(self):
        """Loopback 127.0.0.0/8 is rejected."""
        assert fetch.is_usable_proxy('127.0.0.1:8080') is False
        assert fetch.is_usable_proxy('127.255.255.255:8080') is False

    def test_reject_link_local(self):
        """Link-local 169.254.0.0/16 is rejected."""
        assert fetch.is_usable_proxy('169.254.1.1:8080') is False

    def test_reject_cgnat(self):
        """CGNAT 100.64.0.0/10 is rejected."""
        assert fetch.is_usable_proxy('100.64.0.1:8080') is False
        assert fetch.is_usable_proxy('100.127.255.255:8080') is False
        # 100.63.x.x and 100.128.x.x should be valid
        assert fetch.is_usable_proxy('100.63.0.1:8080') is True
        assert fetch.is_usable_proxy('100.128.0.1:8080') is True

    def test_reject_multicast(self):
        """Multicast 224.0.0.0/4 is rejected."""
        assert fetch.is_usable_proxy('224.0.0.1:8080') is False
        assert fetch.is_usable_proxy('239.255.255.255:8080') is False

    def test_reject_reserved(self):
        """Reserved 240.0.0.0/4 is rejected."""
        assert fetch.is_usable_proxy('240.0.0.1:8080') is False
        assert fetch.is_usable_proxy('255.255.255.255:8080') is False

    def test_reject_zero_first_octet(self):
        """0.0.0.0/8 is rejected."""
        assert fetch.is_usable_proxy('0.0.0.0:8080') is False
        assert fetch.is_usable_proxy('0.1.2.3:8080') is False

    def test_reject_invalid_port_zero(self):
        """Port 0 is rejected."""
        assert fetch.is_usable_proxy('1.2.3.4:0') is False

    def test_reject_invalid_port_high(self):
        """Port > 65535 is rejected."""
        assert fetch.is_usable_proxy('1.2.3.4:65536') is False
        assert fetch.is_usable_proxy('1.2.3.4:99999') is False

    def test_reject_malformed_ip(self):
        """Malformed IP addresses are rejected."""
        assert fetch.is_usable_proxy('1.2.3:8080') is False
        assert fetch.is_usable_proxy('1.2.3.4.5:8080') is False
        assert fetch.is_usable_proxy('not-an-ip:8080') is False
        assert fetch.is_usable_proxy('1.2.3.256:8080') is False

    def test_reject_no_colon(self):
        """String without colon is rejected."""
        assert fetch.is_usable_proxy('1.2.3.4') is False

    def test_auth_format_valid(self):
        """Authenticated proxy format is valid."""
        assert fetch.is_usable_proxy('user:pass@1.2.3.4:8080') is True

    def test_auth_private_ip_rejected(self):
        """Auth format with private IP is rejected."""
        # Credentials must not bypass the host-address checks.
        assert fetch.is_usable_proxy('user:pass@192.168.1.1:8080') is False

    def test_ipv6_valid(self):
        """Valid IPv6 proxy is accepted."""
        assert fetch.is_usable_proxy('[2001:db8::1]:8080') is True

    def test_ipv6_loopback_rejected(self):
        """IPv6 loopback is rejected."""
        assert fetch.is_usable_proxy('[::1]:8080') is False

    def test_ipv6_malformed_rejected(self):
        """Malformed IPv6 is rejected."""
        assert fetch.is_usable_proxy('[not-ipv6]:8080') is False
class TestNormalizeProto:
    """Tests for _normalize_proto() function."""

    def test_none_returns_none(self):
        """None input returns None."""
        assert fetch._normalize_proto(None) is None

    def test_empty_returns_none(self):
        """Empty string returns None."""
        assert fetch._normalize_proto('') is None

    def test_socks5_variants(self):
        """All SOCKS5 spellings (incl. 'tor') collapse to 'socks5'."""
        for variant in ('socks5', 'SOCKS5', 's5', 'tor'):
            assert fetch._normalize_proto(variant) == 'socks5'

    def test_socks4_variants(self):
        """All SOCKS4 spellings (incl. socks4a) collapse to 'socks4'."""
        for variant in ('socks4', 'SOCKS4', 'socks4a', 's4'):
            assert fetch._normalize_proto(variant) == 'socks4'

    def test_http_variants(self):
        """HTTP/HTTPS/CONNECT/SSL spellings collapse to 'http'."""
        for variant in ('http', 'HTTP', 'https', 'connect', 'ssl'):
            assert fetch._normalize_proto(variant) == 'http'

    def test_unknown_returns_none(self):
        """Unrecognized protocols return None."""
        for variant in ('ftp', 'unknown'):
            assert fetch._normalize_proto(variant) is None
class TestDetectProtoFromPath:
    """Unit tests covering detect_proto_from_path()."""

    def test_socks5_in_path(self):
        """Paths mentioning socks5 classify as socks5."""
        for path in ('/socks5/', '/proxy/socks5.txt', 'socks5-proxies.txt'):
            assert fetch.detect_proto_from_path(path) == 'socks5'

    def test_socks4_in_path(self):
        """Paths mentioning socks4 or socks4a classify as socks4."""
        for path in ('/socks4/', '/socks4a/', 'socks4.txt'):
            assert fetch.detect_proto_from_path(path) == 'socks4'

    def test_http_in_path(self):
        """Paths mentioning http classify as http."""
        for path in ('/http/', 'http-proxies.txt',
                     'http_list.txt', 'http.txt'):
            assert fetch.detect_proto_from_path(path) == 'http'

    def test_https_ssl_as_http(self):
        """https/ssl/connect hints all mean a CONNECT-style http proxy."""
        for path in ('/https/', '/ssl/', '/connect/'):
            assert fetch.detect_proto_from_path(path) == 'http'

    def test_no_proto_returns_none(self):
        """Paths without any protocol hint classify as None."""
        for path in ('/proxies/', '/data/list.txt', ''):
            assert fetch.detect_proto_from_path(path) is None

    def test_case_insensitive(self):
        """Upper-case hints are recognized the same way."""
        assert fetch.detect_proto_from_path('/SOCKS5/') == 'socks5'
        assert fetch.detect_proto_from_path('/HTTP/') == 'http'
class TestCleanhtml:
    """Unit tests covering cleanhtml()."""

    def test_strips_tags(self):
        """Markup tags are removed while the payload survives."""
        cleaned = fetch.cleanhtml('<b>1.2.3.4</b>:<i>8080</i>')
        for fragment in ('1.2.3.4', '8080'):
            assert fragment in cleaned
        for tag in ('<b>', '</b>'):
            assert tag not in cleaned

    def test_replaces_nbsp(self):
        """Non-breaking-space entities do not survive cleaning."""
        cleaned = fetch.cleanhtml('1.2.3.4&nbsp;8080')
        assert '&nbsp;' not in cleaned

    def test_collapses_whitespace(self):
        """Runs of whitespace are collapsed away (into a colon)."""
        cleaned = fetch.cleanhtml('1.2.3.4 8080')
        assert ' ' not in cleaned
class TestExtractAuthProxies:
    """Unit tests covering extract_auth_proxies()."""

    def test_basic_auth_format(self):
        """A bare user:pass@ip:port token is picked out of free text."""
        found = fetch.extract_auth_proxies(
            'some text user:pass@1.2.3.4:8080 more text')
        assert len(found) == 1
        assert found[0][0] == 'user:pass@1.2.3.4:8080'
        assert found[0][1] is None

    def test_with_protocol_prefix(self):
        """A scheme prefix is captured next to the credentials."""
        found = fetch.extract_auth_proxies('socks5://user:pass@1.2.3.4:8080')
        assert len(found) == 1
        assert found[0][0] == 'user:pass@1.2.3.4:8080'
        assert found[0][1] == 'socks5'

    def test_http_protocol(self):
        """An http:// scheme yields an 'http' protocol tag."""
        found = fetch.extract_auth_proxies('http://alice:secret@5.6.7.8:3128')
        assert len(found) == 1
        assert found[0][1] == 'http'

    def test_multiple_proxies(self):
        """Several credentialed proxies in one blob are all found."""
        blob = '''
        user1:pass1@1.2.3.4:8080
        socks5://user2:pass2@5.6.7.8:1080
        '''
        assert len(fetch.extract_auth_proxies(blob)) == 2

    def test_normalizes_ip(self):
        """Octets written with leading zeros come back normalized."""
        found = fetch.extract_auth_proxies('user:pass@001.002.003.004:8080')
        assert len(found) == 1
        proxy = found[0][0]
        # Normalized form drops the zero padding.
        assert '001' not in proxy
        assert '1.2.3.4' in proxy

    def test_empty_content(self):
        """An empty document yields an empty list."""
        assert fetch.extract_auth_proxies('') == []

    def test_no_match(self):
        """A document with no credentials yields an empty list."""
        assert fetch.extract_auth_proxies('just some text') == []
class TestConfidenceScoring:
    """Sanity checks on the relative ordering of confidence constants."""

    def test_auth_highest_confidence(self):
        """Credentialed proxies outrank every other extraction source."""
        auth = fetch.CONFIDENCE_AUTH
        assert auth > fetch.CONFIDENCE_JSON
        assert auth > fetch.CONFIDENCE_TABLE

    def test_json_above_table(self):
        """Structured JSON outranks HTML tables."""
        assert fetch.CONFIDENCE_TABLE < fetch.CONFIDENCE_JSON

    def test_table_above_hint(self):
        """HTML tables outrank path hints."""
        assert fetch.CONFIDENCE_HINT < fetch.CONFIDENCE_TABLE

    def test_hint_above_regex(self):
        """Path hints outrank bare regex scraping."""
        assert fetch.CONFIDENCE_REGEX < fetch.CONFIDENCE_HINT

248
tests/test_misc.py Normal file
View File

@@ -0,0 +1,248 @@
# -*- coding: utf-8 -*-
"""Tests for misc.py utility functions."""
from __future__ import print_function
import re
import sys
import os
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import misc
class TestTimestamp:
    """Unit tests covering timestamp()."""

    def test_timestamp_format(self):
        """The result looks like HH:MM:SS."""
        assert re.match(r'^\d{2}:\d{2}:\d{2}$', misc.timestamp())

    def test_timestamp_valid_hours(self):
        """The hour field falls within 0..23."""
        hours = int(misc.timestamp().split(':')[0])
        assert 0 <= hours <= 23

    def test_timestamp_valid_minutes(self):
        """The minute field falls within 0..59."""
        minutes = int(misc.timestamp().split(':')[1])
        assert 0 <= minutes <= 59

    def test_timestamp_valid_seconds(self):
        """The second field falls within 0..59."""
        seconds = int(misc.timestamp().split(':')[2])
        assert 0 <= seconds <= 59
class TestTorProxyUrl:
    """Unit tests covering tor_proxy_url()."""

    def test_basic_format(self):
        """The standard Tor endpoint gains a socks5:// scheme."""
        url = misc.tor_proxy_url('127.0.0.1:9050')
        assert url == 'socks5://127.0.0.1:9050'

    def test_custom_host_port(self):
        """Arbitrary host:port pairs are prefixed unchanged."""
        url = misc.tor_proxy_url('10.200.1.1:9150')
        assert url == 'socks5://10.200.1.1:9150'

    def test_ipv6_host(self):
        """Bracketed IPv6 endpoints are prefixed unchanged too."""
        url = misc.tor_proxy_url('[::1]:9050')
        assert url == 'socks5://[::1]:9050'
class TestLogLevel:
    """Tests for set_log_level() and get_log_level() functions.

    These tests mutate the module-global log level in misc, so both
    setup and teardown restore the default: without the teardown, the
    level set by the last test here (e.g. 99 for 'none') would leak
    into every test class that runs afterward.
    """

    def setup_method(self):
        """Reset log level before each test."""
        misc.set_log_level(1)  # Default info level

    def teardown_method(self):
        """Restore the default level after each test (no state leak)."""
        misc.set_log_level(1)

    def test_get_default_level(self):
        """get_log_level() returns default level."""
        assert misc.get_log_level() == 1

    def test_set_integer_level(self):
        """set_log_level() accepts integer."""
        misc.set_log_level(0)
        assert misc.get_log_level() == 0
        misc.set_log_level(2)
        assert misc.get_log_level() == 2

    def test_set_string_debug(self):
        """set_log_level('debug') sets level 0."""
        misc.set_log_level('debug')
        assert misc.get_log_level() == 0

    def test_set_string_info(self):
        """set_log_level('info') sets level 1."""
        misc.set_log_level('info')
        assert misc.get_log_level() == 1

    def test_set_string_warn(self):
        """set_log_level('warn') sets level 2."""
        misc.set_log_level('warn')
        assert misc.get_log_level() == 2

    def test_set_string_error(self):
        """set_log_level('error') sets level 3."""
        misc.set_log_level('error')
        assert misc.get_log_level() == 3

    def test_set_string_none(self):
        """set_log_level('none') suppresses all."""
        misc.set_log_level('none')
        assert misc.get_log_level() == 99

    def test_invalid_string_defaults_to_info(self):
        """set_log_level() with invalid string defaults to 1."""
        misc.set_log_level('invalid')
        assert misc.get_log_level() == 1
class TestIsSSLProtocolError:
    """Unit tests covering is_ssl_protocol_error()."""

    @staticmethod
    def _classify(message):
        # Thin shortcut so each assertion reads as a one-liner.
        return misc.is_ssl_protocol_error(message)

    def test_none_returns_false(self):
        """None classifies as not-a-protocol-error."""
        assert self._classify(None) is False

    def test_empty_returns_false(self):
        """An empty message classifies as not-a-protocol-error."""
        assert self._classify('') is False

    def test_wrong_version_number(self):
        """'wrong version number' is a protocol error."""
        assert self._classify('wrong version number') is True

    def test_unsupported_protocol(self):
        """'unsupported protocol' is a protocol error."""
        assert self._classify('unsupported protocol') is True

    def test_unexpected_eof(self):
        """'unexpected eof' is a protocol error."""
        assert self._classify('unexpected eof') is True

    def test_eof_occurred(self):
        """'eof occurred' is a protocol error."""
        assert self._classify('eof occurred') is True

    def test_alert_handshake_failure(self):
        """'alert handshake failure' is a protocol error."""
        assert self._classify('alert handshake failure') is True

    def test_http_request(self):
        """'http request' is a protocol error (HTTP sent to an HTTPS port)."""
        assert self._classify('http request') is True

    def test_no_ciphers_available(self):
        """'no ciphers available' is a protocol error."""
        assert self._classify('no ciphers available') is True

    def test_case_insensitive(self):
        """Classification ignores letter case."""
        for variant in ('WRONG VERSION NUMBER', 'Wrong Version Number'):
            assert self._classify(variant) is True

    def test_certificate_error_not_protocol(self):
        """Certificate verification failures are not protocol errors."""
        assert self._classify('certificate verify failed') is False

    def test_hostname_mismatch_not_protocol(self):
        """Hostname mismatches are not protocol errors."""
        assert self._classify('hostname mismatch') is False

    def test_expired_cert_not_protocol(self):
        """Expired certificates are not protocol errors."""
        assert self._classify('certificate has expired') is False

    def test_embedded_in_message(self):
        """A known pattern inside a longer message is still detected."""
        message = 'SSL error: wrong version number in record'
        assert self._classify(message) is True
class TestFailureConstants:
    """Unit tests for the failure category constants."""

    # Names of every failure category constant defined in misc.
    _ALL_NAMES = (
        'FAIL_TIMEOUT', 'FAIL_REFUSED', 'FAIL_AUTH', 'FAIL_UNREACHABLE',
        'FAIL_DNS', 'FAIL_SSL', 'FAIL_CLOSED', 'FAIL_PROXY', 'FAIL_OTHER',
    )

    def test_constants_are_strings(self):
        """Every failure constant is a plain string."""
        for name in self._ALL_NAMES:
            assert isinstance(getattr(misc, name), str)

    def test_constants_unique(self):
        """No two failure constants share a value."""
        values = [getattr(misc, name) for name in self._ALL_NAMES]
        assert len(set(values)) == len(values)

    def test_ssl_errors_contains_ssl(self):
        """SSL_ERRORS includes FAIL_SSL."""
        assert misc.FAIL_SSL in misc.SSL_ERRORS

    def test_conn_errors_contents(self):
        """CONN_ERRORS includes connection-level categories only."""
        for const in (misc.FAIL_TIMEOUT, misc.FAIL_REFUSED,
                      misc.FAIL_UNREACHABLE, misc.FAIL_CLOSED,
                      misc.FAIL_DNS):
            assert const in misc.CONN_ERRORS
        # Auth and proxy failures are not connection errors.
        for const in (misc.FAIL_AUTH, misc.FAIL_PROXY):
            assert const not in misc.CONN_ERRORS
class TestLogLevels:
"""Tests for LOG_LEVELS dictionary."""
def test_debug_is_lowest(self):
    """debug maps to 0, the most verbose setting."""
    level = misc.LOG_LEVELS['debug']
    assert level == 0
def test_info_is_one(self):
    """info maps to level 1."""
    level = misc.LOG_LEVELS['info']
    assert level == 1
def test_warn_is_two(self):
    """warn maps to level 2."""
    level = misc.LOG_LEVELS['warn']
    assert level == 2
def test_error_is_three(self):
    """error maps to level 3."""
    level = misc.LOG_LEVELS['error']
    assert level == 3
def test_none_suppresses_all(self):
    """none maps to 99, which suppresses every message."""
    level = misc.LOG_LEVELS['none']
    assert level == 99
def test_aliases_equal_info(self):
    """rate, scraper, stats and diag all alias the info level."""
    info = misc.LOG_LEVELS['info']
    for alias in ('rate', 'scraper', 'stats', 'diag'):
        assert misc.LOG_LEVELS[alias] == info