Compare commits
2 Commits
b300afed6c
...
2ea7eb41b7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ea7eb41b7 | ||
|
|
98b232f3d3 |
41
fetch.py
41
fetch.py
@@ -221,6 +221,10 @@ def extract_auth_proxies(content):
|
|||||||
"""
|
"""
|
||||||
proxies = []
|
proxies = []
|
||||||
|
|
||||||
|
# Short-circuit: auth proxies always contain @
|
||||||
|
if '@' not in content:
|
||||||
|
return proxies
|
||||||
|
|
||||||
# IPv4 auth proxies
|
# IPv4 auth proxies
|
||||||
for match in AUTH_PROXY_PATTERN.finditer(content):
|
for match in AUTH_PROXY_PATTERN.finditer(content):
|
||||||
proto_str, user, passwd, ip, port = match.groups()
|
proto_str, user, passwd, ip, port = match.groups()
|
||||||
@@ -256,6 +260,12 @@ TABLE_PORT_HEADERS = ('port',)
|
|||||||
TABLE_PROTO_HEADERS = ('type', 'protocol', 'proto', 'scheme')
|
TABLE_PROTO_HEADERS = ('type', 'protocol', 'proto', 'scheme')
|
||||||
|
|
||||||
|
|
||||||
|
_TABLE_PATTERN = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
|
||||||
|
_ROW_PATTERN = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
|
||||||
|
_CELL_PATTERN = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
|
||||||
|
_TAG_STRIP = re.compile(r'<[^>]+>')
|
||||||
|
|
||||||
|
|
||||||
def extract_proxies_from_table(content):
|
def extract_proxies_from_table(content):
|
||||||
"""Extract proxies from HTML tables with IP/Port/Protocol columns.
|
"""Extract proxies from HTML tables with IP/Port/Protocol columns.
|
||||||
|
|
||||||
@@ -269,26 +279,23 @@ def extract_proxies_from_table(content):
|
|||||||
"""
|
"""
|
||||||
proxies = []
|
proxies = []
|
||||||
|
|
||||||
# Simple regex-based table parsing (works without BeautifulSoup)
|
# Short-circuit: no HTML tables in plain text content
|
||||||
# Find all tables
|
if '<table' not in content and '<TABLE' not in content:
|
||||||
table_pattern = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
|
return proxies
|
||||||
row_pattern = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
|
|
||||||
cell_pattern = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
|
|
||||||
tag_strip = re.compile(r'<[^>]+>')
|
|
||||||
|
|
||||||
for table_match in table_pattern.finditer(content):
|
for table_match in _TABLE_PATTERN.finditer(content):
|
||||||
table_html = table_match.group(1)
|
table_html = table_match.group(1)
|
||||||
rows = row_pattern.findall(table_html)
|
rows = _ROW_PATTERN.findall(table_html)
|
||||||
if not rows:
|
if not rows:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Parse header row to find column indices
|
# Parse header row to find column indices
|
||||||
ip_col = port_col = proto_col = -1
|
ip_col = port_col = proto_col = -1
|
||||||
header_row = rows[0]
|
header_row = rows[0]
|
||||||
headers = cell_pattern.findall(header_row)
|
headers = _CELL_PATTERN.findall(header_row)
|
||||||
|
|
||||||
for i, cell in enumerate(headers):
|
for i, cell in enumerate(headers):
|
||||||
cell_text = tag_strip.sub('', cell).strip().lower()
|
cell_text = _TAG_STRIP.sub('', cell).strip().lower()
|
||||||
if ip_col < 0 and any(h in cell_text for h in TABLE_IP_HEADERS):
|
if ip_col < 0 and any(h in cell_text for h in TABLE_IP_HEADERS):
|
||||||
ip_col = i
|
ip_col = i
|
||||||
elif port_col < 0 and any(h in cell_text for h in TABLE_PORT_HEADERS):
|
elif port_col < 0 and any(h in cell_text for h in TABLE_PORT_HEADERS):
|
||||||
@@ -302,11 +309,11 @@ def extract_proxies_from_table(content):
|
|||||||
|
|
||||||
# Parse data rows
|
# Parse data rows
|
||||||
for row in rows[1:]:
|
for row in rows[1:]:
|
||||||
cells = cell_pattern.findall(row)
|
cells = _CELL_PATTERN.findall(row)
|
||||||
if len(cells) <= ip_col:
|
if len(cells) <= ip_col:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
ip_cell = tag_strip.sub('', cells[ip_col]).strip()
|
ip_cell = _TAG_STRIP.sub('', cells[ip_col]).strip()
|
||||||
|
|
||||||
# Check if IP cell contains port (ip:port format)
|
# Check if IP cell contains port (ip:port format)
|
||||||
if ':' in ip_cell and port_col < 0:
|
if ':' in ip_cell and port_col < 0:
|
||||||
@@ -315,7 +322,7 @@ def extract_proxies_from_table(content):
|
|||||||
ip, port = match.groups()
|
ip, port = match.groups()
|
||||||
proto = None
|
proto = None
|
||||||
if proto_col >= 0 and len(cells) > proto_col:
|
if proto_col >= 0 and len(cells) > proto_col:
|
||||||
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
|
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
|
||||||
addr = '%s:%s' % (ip, port)
|
addr = '%s:%s' % (ip, port)
|
||||||
if is_usable_proxy(addr):
|
if is_usable_proxy(addr):
|
||||||
proxies.append((addr, proto))
|
proxies.append((addr, proto))
|
||||||
@@ -323,7 +330,7 @@ def extract_proxies_from_table(content):
|
|||||||
|
|
||||||
# Separate IP and Port columns
|
# Separate IP and Port columns
|
||||||
if port_col >= 0 and len(cells) > port_col:
|
if port_col >= 0 and len(cells) > port_col:
|
||||||
port_cell = tag_strip.sub('', cells[port_col]).strip()
|
port_cell = _TAG_STRIP.sub('', cells[port_col]).strip()
|
||||||
try:
|
try:
|
||||||
port = int(port_cell)
|
port = int(port_cell)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
@@ -335,7 +342,7 @@ def extract_proxies_from_table(content):
|
|||||||
|
|
||||||
proto = None
|
proto = None
|
||||||
if proto_col >= 0 and len(cells) > proto_col:
|
if proto_col >= 0 and len(cells) > proto_col:
|
||||||
proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
|
proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
|
||||||
|
|
||||||
addr = '%s:%d' % (ip_cell, port)
|
addr = '%s:%d' % (ip_cell, port)
|
||||||
if is_usable_proxy(addr):
|
if is_usable_proxy(addr):
|
||||||
@@ -358,6 +365,10 @@ def extract_proxies_from_json(content):
|
|||||||
"""
|
"""
|
||||||
proxies = []
|
proxies = []
|
||||||
|
|
||||||
|
# Short-circuit: content must contain JSON delimiters
|
||||||
|
if '{' not in content and '[' not in content:
|
||||||
|
return proxies
|
||||||
|
|
||||||
# Try to find JSON in content (may be embedded in HTML)
|
# Try to find JSON in content (may be embedded in HTML)
|
||||||
json_matches = []
|
json_matches = []
|
||||||
|
|
||||||
|
|||||||
@@ -359,6 +359,198 @@ class TestExtractAuthProxies:
|
|||||||
assert fetch.extract_auth_proxies('just some text') == []
|
assert fetch.extract_auth_proxies('just some text') == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractAuthProxiesShortCircuit:
|
||||||
|
"""Tests for extract_auth_proxies() short-circuit on missing @."""
|
||||||
|
|
||||||
|
def test_no_at_sign_returns_empty(self):
|
||||||
|
"""Content without @ skips regex entirely."""
|
||||||
|
content = '1.2.3.4:8080 socks5://5.6.7.8:1080 plain text'
|
||||||
|
assert fetch.extract_auth_proxies(content) == []
|
||||||
|
|
||||||
|
def test_at_sign_still_extracts(self):
|
||||||
|
"""Content with @ still finds auth proxies."""
|
||||||
|
content = 'user:pass@1.2.3.4:8080'
|
||||||
|
result = fetch.extract_auth_proxies(content)
|
||||||
|
assert len(result) == 1
|
||||||
|
assert result[0][0] == 'user:pass@1.2.3.4:8080'
|
||||||
|
|
||||||
|
def test_at_sign_no_match_returns_empty(self):
|
||||||
|
"""Content with @ but no auth proxy pattern returns empty."""
|
||||||
|
content = 'email@example.com has no proxy'
|
||||||
|
assert fetch.extract_auth_proxies(content) == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractProxiesFromTable:
|
||||||
|
"""Tests for extract_proxies_from_table() with precompiled regexes."""
|
||||||
|
|
||||||
|
def test_no_table_returns_empty(self):
|
||||||
|
"""Plain text without <table> returns empty."""
|
||||||
|
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
|
||||||
|
assert fetch.extract_proxies_from_table(content) == []
|
||||||
|
|
||||||
|
def test_simple_table(self):
|
||||||
|
"""Basic HTML table with IP/Port columns is parsed."""
|
||||||
|
content = '''
|
||||||
|
<table>
|
||||||
|
<tr><th>IP</th><th>Port</th><th>Type</th></tr>
|
||||||
|
<tr><td>1.2.3.4</td><td>8080</td><td>HTTP</td></tr>
|
||||||
|
<tr><td>5.6.7.8</td><td>1080</td><td>SOCKS5</td></tr>
|
||||||
|
</table>
|
||||||
|
'''
|
||||||
|
result = fetch.extract_proxies_from_table(content)
|
||||||
|
assert len(result) == 2
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert '1.2.3.4:8080' in addrs
|
||||||
|
assert '5.6.7.8:1080' in addrs
|
||||||
|
|
||||||
|
def test_uppercase_table_tag(self):
|
||||||
|
"""<TABLE> (uppercase) is also detected."""
|
||||||
|
content = '''
|
||||||
|
<TABLE>
|
||||||
|
<TR><TH>IP</TH><TH>Port</TH></TR>
|
||||||
|
<TR><TD>1.2.3.4</TD><TD>8080</TD></TR>
|
||||||
|
</TABLE>
|
||||||
|
'''
|
||||||
|
result = fetch.extract_proxies_from_table(content)
|
||||||
|
assert len(result) == 1
|
||||||
|
|
||||||
|
def test_empty_table(self):
|
||||||
|
"""Table with headers but no data rows returns empty."""
|
||||||
|
content = '''
|
||||||
|
<table>
|
||||||
|
<tr><th>IP</th><th>Port</th></tr>
|
||||||
|
</table>
|
||||||
|
'''
|
||||||
|
result = fetch.extract_proxies_from_table(content)
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractProxiesFromJson:
|
||||||
|
"""Tests for extract_proxies_from_json() short-circuit."""
|
||||||
|
|
||||||
|
def test_no_braces_returns_empty(self):
|
||||||
|
"""Content without { or [ skips JSON parsing."""
|
||||||
|
content = '1.2.3.4:8080\n5.6.7.8:3128\n'
|
||||||
|
assert fetch.extract_proxies_from_json(content) == []
|
||||||
|
|
||||||
|
def test_json_array_of_objects(self):
|
||||||
|
"""JSON array with ip/port objects is parsed."""
|
||||||
|
content = '[{"ip": "1.2.3.4", "port": 8080}]'
|
||||||
|
result = fetch.extract_proxies_from_json(content)
|
||||||
|
assert len(result) >= 1
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert '1.2.3.4:8080' in addrs
|
||||||
|
|
||||||
|
def test_json_array_of_strings(self):
|
||||||
|
"""JSON array of ip:port strings is parsed."""
|
||||||
|
content = '["1.2.3.4:8080", "5.6.7.8:3128"]'
|
||||||
|
result = fetch.extract_proxies_from_json(content)
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert '1.2.3.4:8080' in addrs
|
||||||
|
assert '5.6.7.8:3128' in addrs
|
||||||
|
|
||||||
|
def test_plain_html_skips_json(self):
|
||||||
|
"""HTML without JSON delimiters returns empty."""
|
||||||
|
content = '<html><body>1.2.3.4:8080</body></html>'
|
||||||
|
# HTML has < and > but this function checks for { and [
|
||||||
|
# The < > chars won't trigger JSON parsing
|
||||||
|
result = fetch.extract_proxies_from_json(content)
|
||||||
|
# No { or [ is present, so the short-circuit returns an empty list
|
||||||
|
# without attempting any JSON parsing
|
||||||
|
assert isinstance(result, list)
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractProxiesWithHints:
|
||||||
|
"""Tests for extract_proxies_with_hints()."""
|
||||||
|
|
||||||
|
def test_proto_before_ip(self):
|
||||||
|
"""Protocol keyword before IP:PORT is detected."""
|
||||||
|
content = 'socks5 1.2.3.4:8080'
|
||||||
|
result = fetch.extract_proxies_with_hints(content)
|
||||||
|
assert '1.2.3.4:8080' in result
|
||||||
|
assert result['1.2.3.4:8080'] == 'socks5'
|
||||||
|
|
||||||
|
def test_proto_after_ip(self):
|
||||||
|
"""Protocol keyword after IP:PORT is detected."""
|
||||||
|
content = '1.2.3.4:8080 socks5'
|
||||||
|
result = fetch.extract_proxies_with_hints(content)
|
||||||
|
assert '1.2.3.4:8080' in result
|
||||||
|
|
||||||
|
def test_no_hints_returns_empty(self):
|
||||||
|
"""Plain IP:PORT without protocol hints returns empty."""
|
||||||
|
content = '1.2.3.4:8080'
|
||||||
|
result = fetch.extract_proxies_with_hints(content)
|
||||||
|
assert result == {}
|
||||||
|
|
||||||
|
|
||||||
|
class TestExtractProxiesIntegration:
|
||||||
|
"""Integration tests for extract_proxies() combining all extractors."""
|
||||||
|
|
||||||
|
def test_plain_text_proxy_list(self):
|
||||||
|
"""Plain text IP:PORT list extracts correctly."""
|
||||||
|
content = '1.2.3.4:8080\n5.6.7.8:3128\n9.10.11.12:1080\n'
|
||||||
|
result = fetch.extract_proxies(content, filter_known=False)
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert '1.2.3.4:8080' in addrs
|
||||||
|
assert '5.6.7.8:3128' in addrs
|
||||||
|
assert '9.10.11.12:1080' in addrs
|
||||||
|
|
||||||
|
def test_auth_proxies_extracted(self):
|
||||||
|
"""Auth proxies found in mixed content."""
|
||||||
|
content = 'user:pass@1.2.3.4:8080\n5.6.7.8:3128\n'
|
||||||
|
result = fetch.extract_proxies(content, filter_known=False)
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert 'user:pass@1.2.3.4:8080' in addrs
|
||||||
|
assert '5.6.7.8:3128' in addrs
|
||||||
|
|
||||||
|
def test_html_table_extraction(self):
|
||||||
|
"""Proxies extracted from HTML table."""
|
||||||
|
content = '''
|
||||||
|
<table>
|
||||||
|
<tr><th>IP</th><th>Port</th></tr>
|
||||||
|
<tr><td>1.2.3.4</td><td>8080</td></tr>
|
||||||
|
</table>
|
||||||
|
'''
|
||||||
|
result = fetch.extract_proxies(content, filter_known=False)
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert '1.2.3.4:8080' in addrs
|
||||||
|
|
||||||
|
def test_json_extraction(self):
|
||||||
|
"""Proxies extracted from JSON content."""
|
||||||
|
content = '[{"ip": "1.2.3.4", "port": 8080}]'
|
||||||
|
result = fetch.extract_proxies(content, filter_known=False)
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert '1.2.3.4:8080' in addrs
|
||||||
|
|
||||||
|
def test_empty_content(self):
|
||||||
|
"""Empty content returns no proxies."""
|
||||||
|
result = fetch.extract_proxies('', filter_known=False)
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
def test_private_ips_filtered(self):
|
||||||
|
"""Private IPs are not returned."""
|
||||||
|
content = '10.0.0.1:8080\n192.168.1.1:3128\n1.2.3.4:8080\n'
|
||||||
|
result = fetch.extract_proxies(content, filter_known=False)
|
||||||
|
addrs = [r[0] for r in result]
|
||||||
|
assert '10.0.0.1:8080' not in addrs
|
||||||
|
assert '192.168.1.1:3128' not in addrs
|
||||||
|
assert '1.2.3.4:8080' in addrs
|
||||||
|
|
||||||
|
def test_proto_from_hints(self):
|
||||||
|
"""Protocol hints are picked up."""
|
||||||
|
content = 'socks5 1.2.3.4:8080\n'
|
||||||
|
result = fetch.extract_proxies(content, filter_known=False)
|
||||||
|
protos = {r[0]: r[1] for r in result}
|
||||||
|
assert protos.get('1.2.3.4:8080') == 'socks5'
|
||||||
|
|
||||||
|
def test_proto_from_arg(self):
|
||||||
|
"""Fallback proto from argument is used."""
|
||||||
|
content = '1.2.3.4:8080\n'
|
||||||
|
result = fetch.extract_proxies(content, filter_known=False, proto='socks4')
|
||||||
|
protos = {r[0]: r[1] for r in result}
|
||||||
|
assert protos.get('1.2.3.4:8080') == 'socks4'
|
||||||
|
|
||||||
|
|
||||||
class TestConfidenceScoring:
|
class TestConfidenceScoring:
|
||||||
"""Tests for confidence score constants."""
|
"""Tests for confidence score constants."""
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user