Compare commits

...

2 Commits

Author SHA1 Message Date
Username
2ea7eb41b7 tests: add extraction short-circuit and integration tests
All checks were successful
CI / validate (push) Successful in 19s
Cover short-circuit guards, table/JSON/hint extraction,
and full extract_proxies() integration (82 tests, all passing).
2026-02-22 13:50:34 +01:00
Username
98b232f3d3 fetch: add short-circuit guards to extraction functions
Skip expensive regex scans when content lacks required markers:
- extract_auth_proxies: skip if no '@' in content
- extract_proxies_from_table: skip if no '<table' tag
- extract_proxies_from_json: skip if no '{' or '['
- Hoist table regexes to module-level precompiled constants
2026-02-22 13:50:29 +01:00
2 changed files with 218 additions and 15 deletions

View File

@@ -221,6 +221,10 @@ def extract_auth_proxies(content):
"""
proxies = []
# Short-circuit: auth proxies always contain @
if '@' not in content:
return proxies
# IPv4 auth proxies
for match in AUTH_PROXY_PATTERN.finditer(content):
proto_str, user, passwd, ip, port = match.groups()
@@ -256,6 +260,12 @@ TABLE_PORT_HEADERS = ('port',)
TABLE_PROTO_HEADERS = ('type', 'protocol', 'proto', 'scheme')
_TABLE_PATTERN = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
_ROW_PATTERN = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
_CELL_PATTERN = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
_TAG_STRIP = re.compile(r'<[^>]+>')
def extract_proxies_from_table(content):
"""Extract proxies from HTML tables with IP/Port/Protocol columns.
@@ -269,26 +279,23 @@ def extract_proxies_from_table(content):
"""
proxies = []
# Simple regex-based table parsing (works without BeautifulSoup)
# Find all tables
table_pattern = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
row_pattern = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
cell_pattern = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
tag_strip = re.compile(r'<[^>]+>')
# Short-circuit: no HTML tables in plain text content
if '<table' not in content and '<TABLE' not in content:
return proxies
for table_match in table_pattern.finditer(content):
for table_match in _TABLE_PATTERN.finditer(content):
table_html = table_match.group(1)
rows = row_pattern.findall(table_html)
rows = _ROW_PATTERN.findall(table_html)
if not rows:
continue
# Parse header row to find column indices
ip_col = port_col = proto_col = -1
header_row = rows[0]
headers = cell_pattern.findall(header_row)
headers = _CELL_PATTERN.findall(header_row)
for i, cell in enumerate(headers):
cell_text = tag_strip.sub('', cell).strip().lower()
cell_text = _TAG_STRIP.sub('', cell).strip().lower()
if ip_col < 0 and any(h in cell_text for h in TABLE_IP_HEADERS):
ip_col = i
elif port_col < 0 and any(h in cell_text for h in TABLE_PORT_HEADERS):
@@ -302,11 +309,11 @@ def extract_proxies_from_table(content):
# Parse data rows
for row in rows[1:]:
cells = cell_pattern.findall(row)
cells = _CELL_PATTERN.findall(row)
if len(cells) <= ip_col:
continue
ip_cell = tag_strip.sub('', cells[ip_col]).strip()
ip_cell = _TAG_STRIP.sub('', cells[ip_col]).strip()
# Check if IP cell contains port (ip:port format)
if ':' in ip_cell and port_col < 0:
@@ -315,7 +322,7 @@ def extract_proxies_from_table(content):
ip, port = match.groups()
proto = None
if proto_col >= 0 and len(cells) > proto_col:
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
addr = '%s:%s' % (ip, port)
if is_usable_proxy(addr):
proxies.append((addr, proto))
@@ -323,7 +330,7 @@ def extract_proxies_from_table(content):
# Separate IP and Port columns
if port_col >= 0 and len(cells) > port_col:
port_cell = tag_strip.sub('', cells[port_col]).strip()
port_cell = _TAG_STRIP.sub('', cells[port_col]).strip()
try:
port = int(port_cell)
except ValueError:
@@ -335,7 +342,7 @@ def extract_proxies_from_table(content):
proto = None
if proto_col >= 0 and len(cells) > proto_col:
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
addr = '%s:%d' % (ip_cell, port)
if is_usable_proxy(addr):
@@ -358,6 +365,10 @@ def extract_proxies_from_json(content):
"""
proxies = []
# Short-circuit: content must contain JSON delimiters
if '{' not in content and '[' not in content:
return proxies
# Try to find JSON in content (may be embedded in HTML)
json_matches = []

View File

@@ -359,6 +359,198 @@ class TestExtractAuthProxies:
assert fetch.extract_auth_proxies('just some text') == []
class TestExtractAuthProxiesShortCircuit:
"""Tests for extract_auth_proxies() short-circuit on missing @."""
def test_no_at_sign_returns_empty(self):
"""Content without @ skips regex entirely."""
content = '1.2.3.4:8080 socks5://5.6.7.8:1080 plain text'
assert fetch.extract_auth_proxies(content) == []
def test_at_sign_still_extracts(self):
"""Content with @ still finds auth proxies."""
content = 'user:pass@1.2.3.4:8080'
result = fetch.extract_auth_proxies(content)
assert len(result) == 1
assert result[0][0] == 'user:pass@1.2.3.4:8080'
def test_at_sign_no_match_returns_empty(self):
"""Content with @ but no auth proxy pattern returns empty."""
content = 'email@example.com has no proxy'
assert fetch.extract_auth_proxies(content) == []
class TestExtractProxiesFromTable:
"""Tests for extract_proxies_from_table() with precompiled regexes."""
def test_no_table_returns_empty(self):
"""Plain text without <table> returns empty."""
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
assert fetch.extract_proxies_from_table(content) == []
def test_simple_table(self):
"""Basic HTML table with IP/Port columns is parsed."""
content = '''
<table>
<tr><th>IP</th><th>Port</th><th>Type</th></tr>
<tr><td>1.2.3.4</td><td>8080</td><td>HTTP</td></tr>
<tr><td>5.6.7.8</td><td>1080</td><td>SOCKS5</td></tr>
</table>
'''
result = fetch.extract_proxies_from_table(content)
assert len(result) == 2
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
assert '5.6.7.8:1080' in addrs
def test_uppercase_table_tag(self):
"""<TABLE> (uppercase) is also detected."""
content = '''
<TABLE>
<TR><TH>IP</TH><TH>Port</TH></TR>
<TR><TD>1.2.3.4</TD><TD>8080</TD></TR>
</TABLE>
'''
result = fetch.extract_proxies_from_table(content)
assert len(result) == 1
def test_empty_table(self):
"""Table with headers but no data rows returns empty."""
content = '''
<table>
<tr><th>IP</th><th>Port</th></tr>
</table>
'''
result = fetch.extract_proxies_from_table(content)
assert result == []
class TestExtractProxiesFromJson:
"""Tests for extract_proxies_from_json() short-circuit."""
def test_no_braces_returns_empty(self):
"""Content without { or [ skips JSON parsing."""
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
assert fetch.extract_proxies_from_json(content) == []
def test_json_array_of_objects(self):
"""JSON array with ip/port objects is parsed."""
content = '[{"ip": "1.2.3.4", "port": 8080}]'
result = fetch.extract_proxies_from_json(content)
assert len(result) >= 1
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
def test_json_array_of_strings(self):
"""JSON array of ip:port strings is parsed."""
content = '["1.2.3.4:8080", "5.6.7.8:3128"]'
result = fetch.extract_proxies_from_json(content)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
assert '5.6.7.8:3128' in addrs
def test_plain_html_skips_json(self):
"""HTML without JSON delimiters returns empty."""
content = '<html><body>1.2.3.4:8080</body></html>'
# HTML has < and > but this function checks for { and [
# The < > chars won't trigger JSON parsing
result = fetch.extract_proxies_from_json(content)
# May or may not find anything depending on HTML structure
# but should not crash
assert isinstance(result, list)
class TestExtractProxiesWithHints:
"""Tests for extract_proxies_with_hints()."""
def test_proto_before_ip(self):
"""Protocol keyword before IP:PORT is detected."""
content = 'socks5 1.2.3.4:8080'
result = fetch.extract_proxies_with_hints(content)
assert '1.2.3.4:8080' in result
assert result['1.2.3.4:8080'] == 'socks5'
def test_proto_after_ip(self):
"""Protocol keyword after IP:PORT is detected."""
content = '1.2.3.4:8080 socks5'
result = fetch.extract_proxies_with_hints(content)
assert '1.2.3.4:8080' in result
def test_no_hints_returns_empty(self):
"""Plain IP:PORT without protocol hints returns empty."""
content = '1.2.3.4:8080'
result = fetch.extract_proxies_with_hints(content)
assert result == {}
class TestExtractProxiesIntegration:
"""Integration tests for extract_proxies() combining all extractors."""
def test_plain_text_proxy_list(self):
"""Plain text IP:PORT list extracts correctly."""
content = '1.2.3.4:8080\n5.6.7.8:3128\n9.10.11.12:1080\n'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
assert '5.6.7.8:3128' in addrs
assert '9.10.11.12:1080' in addrs
def test_auth_proxies_extracted(self):
"""Auth proxies found in mixed content."""
content = 'user:pass@1.2.3.4:8080\n5.6.7.8:3128\n'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert 'user:pass@1.2.3.4:8080' in addrs
assert '5.6.7.8:3128' in addrs
def test_html_table_extraction(self):
"""Proxies extracted from HTML table."""
content = '''
<table>
<tr><th>IP</th><th>Port</th></tr>
<tr><td>1.2.3.4</td><td>8080</td></tr>
</table>
'''
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
def test_json_extraction(self):
"""Proxies extracted from JSON content."""
content = '[{"ip": "1.2.3.4", "port": 8080}]'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '1.2.3.4:8080' in addrs
def test_empty_content(self):
"""Empty content returns no proxies."""
result = fetch.extract_proxies('', filter_known=False)
assert result == []
def test_private_ips_filtered(self):
"""Private IPs are not returned."""
content = '10.0.0.1:8080\n192.168.1.1:3128\n1.2.3.4:8080\n'
result = fetch.extract_proxies(content, filter_known=False)
addrs = [r[0] for r in result]
assert '10.0.0.1:8080' not in addrs
assert '192.168.1.1:3128' not in addrs
assert '1.2.3.4:8080' in addrs
def test_proto_from_hints(self):
"""Protocol hints are picked up."""
content = 'socks5 1.2.3.4:8080\n'
result = fetch.extract_proxies(content, filter_known=False)
protos = {r[0]: r[1] for r in result}
assert protos.get('1.2.3.4:8080') == 'socks5'
def test_proto_from_arg(self):
"""Fallback proto from argument is used."""
content = '1.2.3.4:8080\n'
result = fetch.extract_proxies(content, filter_known=False, proto='socks4')
protos = {r[0]: r[1] for r in result}
assert protos.get('1.2.3.4:8080') == 'socks4'
class TestConfidenceScoring:
"""Tests for confidence score constants."""