Compare commits
2 Commits
b300afed6c
...
2ea7eb41b7
| Author | SHA1 | Date | |
|---|---|---|---|
| | 2ea7eb41b7 | | |
| | 98b232f3d3 | | |
41
fetch.py
41
fetch.py
@@ -221,6 +221,10 @@ def extract_auth_proxies(content):
|
||||
"""
|
||||
proxies = []
|
||||
|
||||
# Short-circuit: auth proxies always contain @
|
||||
if '@' not in content:
|
||||
return proxies
|
||||
|
||||
# IPv4 auth proxies
|
||||
for match in AUTH_PROXY_PATTERN.finditer(content):
|
||||
proto_str, user, passwd, ip, port = match.groups()
|
||||
@@ -256,6 +260,12 @@ TABLE_PORT_HEADERS = ('port',)
|
||||
TABLE_PROTO_HEADERS = ('type', 'protocol', 'proto', 'scheme')
|
||||
|
||||
|
||||
_TABLE_PATTERN = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
|
||||
_ROW_PATTERN = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
|
||||
_CELL_PATTERN = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
|
||||
_TAG_STRIP = re.compile(r'<[^>]+>')
|
||||
|
||||
|
||||
def extract_proxies_from_table(content):
|
||||
"""Extract proxies from HTML tables with IP/Port/Protocol columns.
|
||||
|
||||
@@ -269,26 +279,23 @@ def extract_proxies_from_table(content):
|
||||
"""
|
||||
proxies = []
|
||||
|
||||
# Simple regex-based table parsing (works without BeautifulSoup)
|
||||
# Find all tables
|
||||
table_pattern = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
|
||||
row_pattern = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
|
||||
cell_pattern = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
|
||||
tag_strip = re.compile(r'<[^>]+>')
|
||||
# Short-circuit: no HTML tables in plain text content
|
||||
if '<table' not in content and '<TABLE' not in content:
|
||||
return proxies
|
||||
|
||||
for table_match in table_pattern.finditer(content):
|
||||
for table_match in _TABLE_PATTERN.finditer(content):
|
||||
table_html = table_match.group(1)
|
||||
rows = row_pattern.findall(table_html)
|
||||
rows = _ROW_PATTERN.findall(table_html)
|
||||
if not rows:
|
||||
continue
|
||||
|
||||
# Parse header row to find column indices
|
||||
ip_col = port_col = proto_col = -1
|
||||
header_row = rows[0]
|
||||
headers = cell_pattern.findall(header_row)
|
||||
headers = _CELL_PATTERN.findall(header_row)
|
||||
|
||||
for i, cell in enumerate(headers):
|
||||
cell_text = tag_strip.sub('', cell).strip().lower()
|
||||
cell_text = _TAG_STRIP.sub('', cell).strip().lower()
|
||||
if ip_col < 0 and any(h in cell_text for h in TABLE_IP_HEADERS):
|
||||
ip_col = i
|
||||
elif port_col < 0 and any(h in cell_text for h in TABLE_PORT_HEADERS):
|
||||
@@ -302,11 +309,11 @@ def extract_proxies_from_table(content):
|
||||
|
||||
# Parse data rows
|
||||
for row in rows[1:]:
|
||||
cells = cell_pattern.findall(row)
|
||||
cells = _CELL_PATTERN.findall(row)
|
||||
if len(cells) <= ip_col:
|
||||
continue
|
||||
|
||||
ip_cell = tag_strip.sub('', cells[ip_col]).strip()
|
||||
ip_cell = _TAG_STRIP.sub('', cells[ip_col]).strip()
|
||||
|
||||
# Check if IP cell contains port (ip:port format)
|
||||
if ':' in ip_cell and port_col < 0:
|
||||
@@ -315,7 +322,7 @@ def extract_proxies_from_table(content):
|
||||
ip, port = match.groups()
|
||||
proto = None
|
||||
if proto_col >= 0 and len(cells) > proto_col:
|
||||
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
|
||||
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
|
||||
addr = '%s:%s' % (ip, port)
|
||||
if is_usable_proxy(addr):
|
||||
proxies.append((addr, proto))
|
||||
@@ -323,7 +330,7 @@ def extract_proxies_from_table(content):
|
||||
|
||||
# Separate IP and Port columns
|
||||
if port_col >= 0 and len(cells) > port_col:
|
||||
port_cell = tag_strip.sub('', cells[port_col]).strip()
|
||||
port_cell = _TAG_STRIP.sub('', cells[port_col]).strip()
|
||||
try:
|
||||
port = int(port_cell)
|
||||
except ValueError:
|
||||
@@ -335,7 +342,7 @@ def extract_proxies_from_table(content):
|
||||
|
||||
proto = None
|
||||
if proto_col >= 0 and len(cells) > proto_col:
|
||||
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
|
||||
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
|
||||
|
||||
addr = '%s:%d' % (ip_cell, port)
|
||||
if is_usable_proxy(addr):
|
||||
@@ -358,6 +365,10 @@ def extract_proxies_from_json(content):
|
||||
"""
|
||||
proxies = []
|
||||
|
||||
# Short-circuit: content must contain JSON delimiters
|
||||
if '{' not in content and '[' not in content:
|
||||
return proxies
|
||||
|
||||
# Try to find JSON in content (may be embedded in HTML)
|
||||
json_matches = []
|
||||
|
||||
|
||||
@@ -359,6 +359,198 @@ class TestExtractAuthProxies:
|
||||
assert fetch.extract_auth_proxies('just some text') == []
|
||||
|
||||
|
||||
class TestExtractAuthProxiesShortCircuit:
|
||||
"""Tests for extract_auth_proxies() short-circuit on missing @."""
|
||||
|
||||
def test_no_at_sign_returns_empty(self):
|
||||
"""Content without @ skips regex entirely."""
|
||||
content = '1.2.3.4:8080 socks5://5.6.7.8:1080 plain text'
|
||||
assert fetch.extract_auth_proxies(content) == []
|
||||
|
||||
def test_at_sign_still_extracts(self):
|
||||
"""Content with @ still finds auth proxies."""
|
||||
content = 'user:pass@1.2.3.4:8080'
|
||||
result = fetch.extract_auth_proxies(content)
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == 'user:pass@1.2.3.4:8080'
|
||||
|
||||
def test_at_sign_no_match_returns_empty(self):
|
||||
"""Content with @ but no auth proxy pattern returns empty."""
|
||||
content = 'email@example.com has no proxy'
|
||||
assert fetch.extract_auth_proxies(content) == []
|
||||
|
||||
|
||||
class TestExtractProxiesFromTable:
|
||||
"""Tests for extract_proxies_from_table() with precompiled regexes."""
|
||||
|
||||
def test_no_table_returns_empty(self):
|
||||
"""Plain text without <table> returns empty."""
|
||||
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
|
||||
assert fetch.extract_proxies_from_table(content) == []
|
||||
|
||||
def test_simple_table(self):
|
||||
"""Basic HTML table with IP/Port columns is parsed."""
|
||||
content = '''
|
||||
<table>
|
||||
<tr><th>IP</th><th>Port</th><th>Type</th></tr>
|
||||
<tr><td>1.2.3.4</td><td>8080</td><td>HTTP</td></tr>
|
||||
<tr><td>5.6.7.8</td><td>1080</td><td>SOCKS5</td></tr>
|
||||
</table>
|
||||
'''
|
||||
result = fetch.extract_proxies_from_table(content)
|
||||
assert len(result) == 2
|
||||
addrs = [r[0] for r in result]
|
||||
assert '1.2.3.4:8080' in addrs
|
||||
assert '5.6.7.8:1080' in addrs
|
||||
|
||||
def test_uppercase_table_tag(self):
|
||||
"""<TABLE> (uppercase) is also detected."""
|
||||
content = '''
|
||||
<TABLE>
|
||||
<TR><TH>IP</TH><TH>Port</TH></TR>
|
||||
<TR><TD>1.2.3.4</TD><TD>8080</TD></TR>
|
||||
</TABLE>
|
||||
'''
|
||||
result = fetch.extract_proxies_from_table(content)
|
||||
assert len(result) == 1
|
||||
|
||||
def test_empty_table(self):
|
||||
"""Table with headers but no data rows returns empty."""
|
||||
content = '''
|
||||
<table>
|
||||
<tr><th>IP</th><th>Port</th></tr>
|
||||
</table>
|
||||
'''
|
||||
result = fetch.extract_proxies_from_table(content)
|
||||
assert result == []
|
||||
|
||||
|
||||
class TestExtractProxiesFromJson:
|
||||
"""Tests for extract_proxies_from_json() short-circuit."""
|
||||
|
||||
def test_no_braces_returns_empty(self):
|
||||
"""Content without { or [ skips JSON parsing."""
|
||||
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
|
||||
assert fetch.extract_proxies_from_json(content) == []
|
||||
|
||||
def test_json_array_of_objects(self):
|
||||
"""JSON array with ip/port objects is parsed."""
|
||||
content = '[{"ip": "1.2.3.4", "port": 8080}]'
|
||||
result = fetch.extract_proxies_from_json(content)
|
||||
assert len(result) >= 1
|
||||
addrs = [r[0] for r in result]
|
||||
assert '1.2.3.4:8080' in addrs
|
||||
|
||||
def test_json_array_of_strings(self):
|
||||
"""JSON array of ip:port strings is parsed."""
|
||||
content = '["1.2.3.4:8080", "5.6.7.8:3128"]'
|
||||
result = fetch.extract_proxies_from_json(content)
|
||||
addrs = [r[0] for r in result]
|
||||
assert '1.2.3.4:8080' in addrs
|
||||
assert '5.6.7.8:3128' in addrs
|
||||
|
||||
def test_plain_html_skips_json(self):
|
||||
"""HTML without JSON delimiters returns empty."""
|
||||
content = '<html><body>1.2.3.4:8080</body></html>'
|
||||
# HTML has < and > but this function checks for { and [
|
||||
# The < > chars won't trigger JSON parsing
|
||||
result = fetch.extract_proxies_from_json(content)
|
||||
# Content contains neither { nor [, so the short-circuit returns an empty list;
|
||||
# at minimum the call must not crash
|
||||
assert isinstance(result, list)
|
||||
|
||||
|
||||
class TestExtractProxiesWithHints:
|
||||
"""Tests for extract_proxies_with_hints()."""
|
||||
|
||||
def test_proto_before_ip(self):
|
||||
"""Protocol keyword before IP:PORT is detected."""
|
||||
content = 'socks5 1.2.3.4:8080'
|
||||
result = fetch.extract_proxies_with_hints(content)
|
||||
assert '1.2.3.4:8080' in result
|
||||
assert result['1.2.3.4:8080'] == 'socks5'
|
||||
|
||||
def test_proto_after_ip(self):
|
||||
"""Protocol keyword after IP:PORT is detected."""
|
||||
content = '1.2.3.4:8080 socks5'
|
||||
result = fetch.extract_proxies_with_hints(content)
|
||||
assert '1.2.3.4:8080' in result
|
||||
|
||||
def test_no_hints_returns_empty(self):
|
||||
"""Plain IP:PORT without protocol hints returns empty."""
|
||||
content = '1.2.3.4:8080'
|
||||
result = fetch.extract_proxies_with_hints(content)
|
||||
assert result == {}
|
||||
|
||||
|
||||
class TestExtractProxiesIntegration:
|
||||
"""Integration tests for extract_proxies() combining all extractors."""
|
||||
|
||||
def test_plain_text_proxy_list(self):
|
||||
"""Plain text IP:PORT list extracts correctly."""
|
||||
content = '1.2.3.4:8080\n5.6.7.8:3128\n9.10.11.12:1080\n'
|
||||
result = fetch.extract_proxies(content, filter_known=False)
|
||||
addrs = [r[0] for r in result]
|
||||
assert '1.2.3.4:8080' in addrs
|
||||
assert '5.6.7.8:3128' in addrs
|
||||
assert '9.10.11.12:1080' in addrs
|
||||
|
||||
def test_auth_proxies_extracted(self):
|
||||
"""Auth proxies found in mixed content."""
|
||||
content = 'user:pass@1.2.3.4:8080\n5.6.7.8:3128\n'
|
||||
result = fetch.extract_proxies(content, filter_known=False)
|
||||
addrs = [r[0] for r in result]
|
||||
assert 'user:pass@1.2.3.4:8080' in addrs
|
||||
assert '5.6.7.8:3128' in addrs
|
||||
|
||||
def test_html_table_extraction(self):
|
||||
"""Proxies extracted from HTML table."""
|
||||
content = '''
|
||||
<table>
|
||||
<tr><th>IP</th><th>Port</th></tr>
|
||||
<tr><td>1.2.3.4</td><td>8080</td></tr>
|
||||
</table>
|
||||
'''
|
||||
result = fetch.extract_proxies(content, filter_known=False)
|
||||
addrs = [r[0] for r in result]
|
||||
assert '1.2.3.4:8080' in addrs
|
||||
|
||||
def test_json_extraction(self):
|
||||
"""Proxies extracted from JSON content."""
|
||||
content = '[{"ip": "1.2.3.4", "port": 8080}]'
|
||||
result = fetch.extract_proxies(content, filter_known=False)
|
||||
addrs = [r[0] for r in result]
|
||||
assert '1.2.3.4:8080' in addrs
|
||||
|
||||
def test_empty_content(self):
|
||||
"""Empty content returns no proxies."""
|
||||
result = fetch.extract_proxies('', filter_known=False)
|
||||
assert result == []
|
||||
|
||||
def test_private_ips_filtered(self):
|
||||
"""Private IPs are not returned."""
|
||||
content = '10.0.0.1:8080\n192.168.1.1:3128\n1.2.3.4:8080\n'
|
||||
result = fetch.extract_proxies(content, filter_known=False)
|
||||
addrs = [r[0] for r in result]
|
||||
assert '10.0.0.1:8080' not in addrs
|
||||
assert '192.168.1.1:3128' not in addrs
|
||||
assert '1.2.3.4:8080' in addrs
|
||||
|
||||
def test_proto_from_hints(self):
|
||||
"""Protocol hints are picked up."""
|
||||
content = 'socks5 1.2.3.4:8080\n'
|
||||
result = fetch.extract_proxies(content, filter_known=False)
|
||||
protos = {r[0]: r[1] for r in result}
|
||||
assert protos.get('1.2.3.4:8080') == 'socks5'
|
||||
|
||||
def test_proto_from_arg(self):
|
||||
"""Fallback proto from argument is used."""
|
||||
content = '1.2.3.4:8080\n'
|
||||
result = fetch.extract_proxies(content, filter_known=False, proto='socks4')
|
||||
protos = {r[0]: r[1] for r in result}
|
||||
assert protos.get('1.2.3.4:8080') == 'socks4'
|
||||
|
||||
|
||||
class TestConfidenceScoring:
|
||||
"""Tests for confidence score constants."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user