# -*- coding: utf-8 -*- """Tests for dbs.py database operations.""" from __future__ import print_function import sys import os import pytest sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import dbs class TestIsCdnIp: """Tests for is_cdn_ip() function.""" def test_cloudflare_ips(self): """Cloudflare IPs are detected as CDN.""" assert dbs.is_cdn_ip('141.101.1.1') is True assert dbs.is_cdn_ip('141.101.255.255') is True assert dbs.is_cdn_ip('104.16.1.1') is True assert dbs.is_cdn_ip('172.64.1.1') is True def test_fastly_ips(self): """Fastly IPs are detected as CDN.""" assert dbs.is_cdn_ip('151.101.1.1') is True assert dbs.is_cdn_ip('151.101.128.1') is True def test_akamai_ips(self): """Akamai IPs are detected as CDN.""" assert dbs.is_cdn_ip('23.32.1.1') is True assert dbs.is_cdn_ip('23.64.1.1') is True def test_cloudfront_ips(self): """Amazon CloudFront IPs are detected as CDN.""" assert dbs.is_cdn_ip('13.32.1.1') is True assert dbs.is_cdn_ip('13.224.1.1') is True def test_google_ips(self): """Google IPs are detected as CDN.""" assert dbs.is_cdn_ip('34.64.1.1') is True assert dbs.is_cdn_ip('34.71.1.1') is True def test_regular_ips_not_cdn(self): """Regular public IPs are not CDN.""" assert dbs.is_cdn_ip('1.2.3.4') is False assert dbs.is_cdn_ip('8.8.8.8') is False assert dbs.is_cdn_ip('203.0.113.50') is False def test_edge_case_prefix_mismatch(self): """Similar but non-CDN prefixes are not detected.""" assert dbs.is_cdn_ip('141.100.1.1') is False # Not 141.101. assert dbs.is_cdn_ip('104.15.1.1') is False # Not 104.16. 
class TestComputeProxyListHash:
    """Tests for compute_proxy_list_hash() function."""

    def test_empty_list_returns_none(self):
        """Empty list returns None."""
        assert dbs.compute_proxy_list_hash([]) is None
        assert dbs.compute_proxy_list_hash(None) is None

    def test_single_proxy_hash(self):
        """Single proxy produces consistent hash."""
        digest_a = dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
        digest_b = dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
        assert digest_a == digest_b
        assert len(digest_a) == 32  # MD5 hex digest length

    def test_order_independent(self):
        """Hash is order-independent (sorted internally)."""
        digest_a = dbs.compute_proxy_list_hash(['1.2.3.4:8080', '5.6.7.8:3128'])
        digest_b = dbs.compute_proxy_list_hash(['5.6.7.8:3128', '1.2.3.4:8080'])
        assert digest_a == digest_b

    def test_different_lists_different_hash(self):
        """Different proxy lists produce different hashes."""
        digest_a = dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
        digest_b = dbs.compute_proxy_list_hash(['5.6.7.8:3128'])
        assert digest_a != digest_b

    def test_tuple_format(self):
        """Handles tuple format (address, proto)."""
        digest_a = dbs.compute_proxy_list_hash([('1.2.3.4:8080', 'socks5')])
        digest_b = dbs.compute_proxy_list_hash(['1.2.3.4:8080'])
        # The address portion of the tuple should be what gets hashed.
        assert digest_a == digest_b


class TestCreateTableIfNotExists:
    """Tests for create_table_if_not_exists() function."""

    def test_create_proxylist_table(self, temp_db):
        """Creates proxylist table with correct schema."""
        conn, _ = temp_db
        dbs.create_table_if_not_exists(conn, 'proxylist')
        # Querying the fresh table proves it exists and is empty.
        record = conn.execute('SELECT COUNT(*) FROM proxylist').fetchone()
        assert record[0] == 0

    def test_create_uris_table(self, temp_db):
        """Creates uris table with correct schema."""
        conn, _ = temp_db
        dbs.create_table_if_not_exists(conn, 'uris')
        record = conn.execute('SELECT COUNT(*) FROM uris').fetchone()
        assert record[0] == 0

    def test_idempotent_creation(self, temp_db):
        """Calling twice doesn't cause error."""
        conn, _ = temp_db
        dbs.create_table_if_not_exists(conn, 'proxylist')
        dbs.create_table_if_not_exists(conn, 'proxylist')
        # Reaching this point without an exception is the assertion.

    def test_proxylist_has_required_columns(self, proxy_db):
        """Proxylist table has all required columns."""
        conn, _ = proxy_db
        # Insert a row so the column SELECT below has something to return.
        conn.execute(
            'INSERT INTO proxylist (added, proxy, ip, port, failed) VALUES (?, ?, ?, ?, ?)',
            (1234567890, '1.2.3.4:8080', '1.2.3.4', '8080', 0)
        )
        conn.commit()
        record = conn.execute(
            'SELECT proxy, ip, port, proto, failed, tested, avg_latency, anonymity '
            'FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert record is not None
        assert record[0] == '1.2.3.4:8080'


class TestInsertProxies:
    """Tests for insert_proxies() function."""

    def test_insert_plain_strings(self, proxy_db):
        """Insert plain proxy strings."""
        conn, _ = proxy_db
        candidates = ['1.2.3.4:8080', '5.6.7.8:3128']
        dbs.insert_proxies(conn, candidates, 'http://test.com')
        total = conn.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert total == 2

    def test_insert_tuples_with_proto(self, proxy_db):
        """Insert tuples with protocol."""
        conn, _ = proxy_db
        candidates = [('1.2.3.4:8080', 'socks5'), ('5.6.7.8:3128', 'http')]
        dbs.insert_proxies(conn, candidates, 'http://test.com')
        record = conn.execute(
            'SELECT proto FROM proxylist WHERE proxy = ?', ('1.2.3.4:8080',)
        ).fetchone()
        assert record[0] == 'socks5'

    def test_insert_tuples_with_confidence(self, proxy_db):
        """Insert tuples with confidence score."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, [('1.2.3.4:8080', 'socks5', 85)], 'http://test.com')
        record = conn.execute(
            'SELECT confidence FROM proxylist WHERE proxy = ?', ('1.2.3.4:8080',)
        ).fetchone()
        assert record[0] == 85

    def test_filters_cdn_ips(self, proxy_db):
        """CDN IPs are filtered out."""
        conn, _ = proxy_db
        candidates = [
            '1.2.3.4:8080',     # Regular - should be inserted
            '141.101.1.1:8080', # Cloudflare CDN - should be filtered
        ]
        dbs.insert_proxies(conn, candidates, 'http://test.com')
        total = conn.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert total == 1  # Only the non-CDN proxy lands in the table

    def test_empty_list_no_error(self, proxy_db):
        """Empty list doesn't cause error."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, [], 'http://test.com')
        # Reaching this point without an exception is the assertion.

    def test_duplicate_ignored(self, proxy_db):
        """Duplicate proxies are ignored (INSERT OR IGNORE)."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test1.com')
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test2.com')
        total = conn.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert total == 1


class TestInsertUrls:
    """Tests for insert_urls() function."""

    def test_insert_new_urls(self, uri_db):
        """Insert new URLs returns count of inserted."""
        conn, _ = uri_db
        links = ['http://example.com/1', 'http://example.com/2']
        assert dbs.insert_urls(links, 'test query', conn) == 2

    def test_duplicate_urls_not_counted(self, uri_db):
        """Duplicate URLs not counted in return value."""
        conn, _ = uri_db
        links = ['http://example.com/1']
        first_pass = dbs.insert_urls(links, 'test query', conn)
        second_pass = dbs.insert_urls(links, 'test query', conn)
        assert first_pass == 1
        assert second_pass == 0

    def test_mixed_new_and_duplicate(self, uri_db):
        """Mixed new and duplicate URLs counted correctly."""
        conn, _ = uri_db
        dbs.insert_urls(['http://example.com/1'], 'test', conn)
        inserted = dbs.insert_urls(
            ['http://example.com/1', 'http://example.com/2', 'http://example.com/3'],
            'test', conn
        )
        assert inserted == 2  # Only the 2 previously-unseen URLs count

    def test_empty_list_returns_zero(self, uri_db):
        """Empty list returns 0."""
        conn, _ = uri_db
        assert dbs.insert_urls([], 'test', conn) == 0


class TestUpdateProxyLatency:
    """Tests for update_proxy_latency() function."""

    def test_first_latency_sample(self, proxy_db):
        """First latency sample sets avg_latency directly."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_latency(conn, '1.2.3.4:8080', 100.0)
        conn.commit()
        record = conn.execute(
            'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert record[0] == 100.0
        assert record[1] == 1

    def test_ema_calculation(self, proxy_db):
        """Exponential moving average is calculated correctly."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test.com')
        # First sample: 100ms
        dbs.update_proxy_latency(conn, '1.2.3.4:8080', 100.0)
        conn.commit()
        # Second sample: 50ms
        # EMA: alpha = 2/(2+1) = 0.667, new_avg = 0.667*50 + 0.333*100 = 66.67
        dbs.update_proxy_latency(conn, '1.2.3.4:8080', 50.0)
        conn.commit()
        record = conn.execute(
            'SELECT avg_latency, latency_samples FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert record[1] == 2
        # Check EMA is roughly correct (allow for floating point)
        assert 65 < record[0] < 68

    def test_nonexistent_proxy_no_error(self, proxy_db):
        """Updating nonexistent proxy doesn't cause error."""
        conn, _ = proxy_db
        dbs.update_proxy_latency(conn, 'nonexistent:8080', 100.0)
        # Reaching this point without an exception is the assertion.


class TestBatchUpdateProxyLatency:
    """Tests for batch_update_proxy_latency() function."""

    def test_batch_update_multiple(self, proxy_db):
        """Batch update updates multiple proxies."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080', '5.6.7.8:3128'], 'http://test.com')
        pairs = [('1.2.3.4:8080', 100.0), ('5.6.7.8:3128', 200.0)]
        dbs.batch_update_proxy_latency(conn, pairs)
        conn.commit()
        first = conn.execute(
            'SELECT avg_latency FROM proxylist WHERE proxy = ?', ('1.2.3.4:8080',)
        ).fetchone()
        second = conn.execute(
            'SELECT avg_latency FROM proxylist WHERE proxy = ?', ('5.6.7.8:3128',)
        ).fetchone()
        assert first[0] == 100.0
        assert second[0] == 200.0

    def test_empty_list_no_error(self, proxy_db):
        """Empty update list doesn't cause error."""
        conn, _ = proxy_db
        dbs.batch_update_proxy_latency(conn, [])
        # Reaching this point without an exception is the assertion.


class TestUpdateProxyAnonymity:
    """Tests for update_proxy_anonymity() function."""

    def test_transparent_proxy(self, proxy_db):
        """Transparent proxy detected when exit_ip equals proxy_ip."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_anonymity(conn, '1.2.3.4:8080', '1.2.3.4', '1.2.3.4')
        conn.commit()
        record = conn.execute(
            'SELECT anonymity, exit_ip FROM proxylist WHERE proxy = ?',
            ('1.2.3.4:8080',)
        ).fetchone()
        assert record[0] == 'transparent'
        assert record[1] == '1.2.3.4'

    def test_elite_proxy(self, proxy_db):
        """Elite proxy detected when exit_ip differs and no revealing headers."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_anonymity(conn, '1.2.3.4:8080', '5.6.7.8', '1.2.3.4',
                                   reveals_headers=False)
        conn.commit()
        record = conn.execute(
            'SELECT anonymity FROM proxylist WHERE proxy = ?', ('1.2.3.4:8080',)
        ).fetchone()
        assert record[0] == 'elite'

    def test_anonymous_proxy(self, proxy_db):
        """Anonymous proxy detected when exit_ip differs but reveals headers."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test.com')
        dbs.update_proxy_anonymity(conn, '1.2.3.4:8080', '5.6.7.8', '1.2.3.4',
                                   reveals_headers=True)
        conn.commit()
        record = conn.execute(
            'SELECT anonymity FROM proxylist WHERE proxy = ?', ('1.2.3.4:8080',)
        ).fetchone()
        assert record[0] == 'anonymous'

    def test_normalizes_leading_zeros(self, proxy_db):
        """IP addresses with leading zeros are normalized."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test.com')
        # Same IP written with leading zeros must still compare equal,
        # so the proxy is classified as transparent.
        dbs.update_proxy_anonymity(conn, '1.2.3.4:8080', '001.002.003.004', '1.2.3.4')
        conn.commit()
        record = conn.execute(
            'SELECT anonymity FROM proxylist WHERE proxy = ?', ('1.2.3.4:8080',)
        ).fetchone()
        assert record[0] == 'transparent'


class TestGetDatabaseStats:
    """Tests for get_database_stats() function."""

    def test_empty_database_stats(self, full_db):
        """Empty database returns zero counts."""
        conn, _ = full_db
        summary = dbs.get_database_stats(conn)
        assert summary['proxy_count'] == 0
        assert summary['working_count'] == 0
        assert 'page_count' in summary
        assert 'total_size' in summary

    def test_stats_after_inserts(self, full_db):
        """Stats reflect inserted proxies."""
        conn, _ = full_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080', '5.6.7.8:3128'], 'http://test.com')
        summary = dbs.get_database_stats(conn)
        assert summary['proxy_count'] == 2


class TestAnalyzeVacuum:
    """Tests for analyze_database() and vacuum_database() functions."""

    def test_analyze_no_error(self, proxy_db):
        """analyze_database() runs without error."""
        conn, _ = proxy_db
        dbs.analyze_database(conn)
        # Reaching this point without an exception is the assertion.

    def test_vacuum_no_error(self, proxy_db):
        """vacuum_database() runs without error."""
        conn, _ = proxy_db
        dbs.vacuum_database(conn)
        # Reaching this point without an exception is the assertion.

    def test_analyze_vacuum_sequence(self, proxy_db):
        """Running analyze then vacuum works."""
        conn, _ = proxy_db
        dbs.insert_proxies(conn, ['1.2.3.4:8080'], 'http://test.com')
        dbs.analyze_database(conn)
        dbs.vacuum_database(conn)
        # Database is still queryable afterwards
        total = conn.execute('SELECT COUNT(*) FROM proxylist').fetchone()[0]
        assert total == 1