Files
ppf/fetch.py
Username 98b232f3d3 fetch: add short-circuit guards to extraction functions
Skip expensive regex scans when content lacks required markers:
- extract_auth_proxies: skip if no '@' in content
- extract_proxies_from_table: skip if no '<table' tag
- extract_proxies_from_json: skip if no '{' or '['
- Hoist table regexes to module-level precompiled constants
2026-02-22 13:50:29 +01:00

945 lines
30 KiB
Python

import re, random, time, string
import json
import threading
from collections import OrderedDict
import rocksock
import network_stats
from http2 import RsHttp, _parse_url
from soup_parser import soupify
from misc import _log, tor_proxy_url
# Module configuration object; populated via set_config() before use.
config = None
# LRU cache for is_usable_proxy() - avoids repeated validation of same proxy strings
# Uses OrderedDict to maintain insertion order; oldest entries evicted when full
_proxy_valid_cache = OrderedDict()
# Maximum number of cached validation results before LRU eviction kicks in.
_proxy_valid_cache_max = 10000
# Guards all reads/writes of _proxy_valid_cache across threads.
_proxy_valid_cache_lock = threading.Lock()
class FetchSession(object):
    """Reusable fetch session with persistent Tor circuit.

    Maintains an HTTP connection and Tor credentials across multiple
    requests so consecutive fetches to the same host reuse one circuit.
    Call cycle() to get a new Tor circuit when blocked.
    """

    # Request headers sent with every GET/HEAD.
    _REQUEST_HEADERS = [
        'Accept-Language: en-US,en;q=0.8',
        'Cache-Control: max-age=0',
    ]

    def __init__(self):
        self.http = None
        self.current_host = None
        self.current_port = None
        self.current_ssl = None
        self.tor_url = None
        self._new_circuit()

    def _new_circuit(self):
        """Generate new Tor credentials for a fresh circuit."""
        if config and config.torhosts:
            torhost = random.choice(config.torhosts)
            self.tor_url = tor_proxy_url(torhost)

    def cycle(self):
        """Cycle to a new Tor circuit (call when blocked)."""
        self.close()
        self._new_circuit()

    def close(self):
        """Close current connection (best-effort; errors are ignored)."""
        if self.http:
            try:
                self.http.disconnect()
            except Exception:
                pass
        self.http = None
        self.current_host = None

    def _request(self, uri, head):
        """Issue a HEAD or GET on the current connection.

        Returns the header (head=True) or the response body normalized to
        UTF-8 bytes.  Socket errors propagate to the caller, which decides
        whether to reconnect.  (This was previously duplicated verbatim in
        both branches of fetch().)
        """
        if head:
            return self.http.head(uri, self._REQUEST_HEADERS)
        hdr, res = self.http.get(uri, self._REQUEST_HEADERS)
        return res.encode('utf-8') if isinstance(res, unicode) else res

    def fetch(self, url, head=False):
        """Fetch url, reusing the open connection when host/port/ssl match.

        Returns the response body (or header if head=True), or None when a
        connection could not be established or the request failed.
        """
        network_stats.set_category('scraper')
        if isinstance(url, unicode):
            url = url.encode('utf-8')
        host, port, ssl, uri = _parse_url(url)
        # Fast path: reuse the live connection when the target matches.
        if (self.http and self.current_host == host and
                self.current_port == port and self.current_ssl == ssl):
            try:
                return self._request(uri, head)
            except Exception:
                pass  # connection died; reconnect below
        # Release any stale connection before reconnecting (close() is
        # idempotent, so this also covers the failed-reuse path above).
        self.close()
        if not self.tor_url:
            self._new_circuit()
        proxies = [rocksock.RocksockProxyFromURL(self.tor_url)]
        self.http = RsHttp(
            host, ssl=ssl, port=port, keep_alive=True,
            timeout=config.ppf.timeout, max_tries=config.ppf.http_retries,
            follow_redirects=True, auto_set_cookies=True, proxies=proxies,
            user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0',
            log_errors=False
        )
        if not self.http.connect():
            self.close()
            return None
        self.current_host = host
        self.current_port = port
        self.current_ssl = ssl
        try:
            return self._request(uri, head)
        except Exception:
            self.close()
            return None
# Rate-limit state for connection-failure logging: _fetch_contents logs at
# most one "failed to connect" line per _fail_log_interval seconds.
_last_fail_log = 0
_fail_log_interval = 60
def set_config(cfg):
    """Install the module-wide configuration object (called at startup)."""
    global config
    config = cfg
# Pre-compiled regex patterns (compiled once at module load)
cleanhtml_re = [
    re.compile(r'<.*?>'),   # HTML tags
    re.compile(r'\s+'),     # whitespace runs
    re.compile(r'::+'),     # collapse repeated separators
]
# Proxy extraction pattern: IP:PORT followed by a non-digit or end of input.
# BUGFIX: the previous terminator [\D$] was a character class, so '$' meant a
# literal dollar sign rather than an anchor -- an IP:PORT at the very end of
# the content could never match.  (?:\D|$) restores the intended
# "non-digit or end-of-string" meaning without consuming extra digits.
PROXY_PATTERN = re.compile(r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})(?:\D|$)')
# IPv6 proxy pattern: [ipv6]:port
# IPv6 can contain hex digits and colons, enclosed in brackets for URL format
IPV6_PROXY_PATTERN = re.compile(
    r'\[([0-9a-fA-F:]+)\]:([0-9]{2,5})'
)
# Auth proxy pattern: user:pass@IP:PORT or proto://user:pass@IP:PORT
# Captures: (proto, user, pass, ip, port)
AUTH_PROXY_PATTERN = re.compile(
    r'(?:(socks5|socks4a?|https?|http|ssl|tor)://)?'  # optional protocol
    r'([a-zA-Z0-9._-]+):([a-zA-Z0-9._-]+)@'           # user:pass@
    r'([0-9]+(?:\.[0-9]+){3}):([0-9]{2,5})',          # ip:port
    re.IGNORECASE
)
# IPv6 auth pattern: user:pass@[ipv6]:port
AUTH_IPV6_PATTERN = re.compile(
    r'(?:(socks5|socks4a?|https?|http|ssl|tor)://)?'  # optional protocol
    r'([a-zA-Z0-9._-]+):([a-zA-Z0-9._-]+)@'           # user:pass@
    r'\[([0-9a-fA-F:]+)\]:([0-9]{2,5})',              # [ipv6]:port
    re.IGNORECASE
)
# Protocol hint patterns - look for protocol keywords near IP:PORT
PROTO_HINT_PATTERN = re.compile(
    r'(socks5|socks4a?|https?|connect|ssl|tor)\s*[:\-_\s]*'
    r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})',
    re.IGNORECASE
)
PROTO_HINT_REVERSE = re.compile(
    r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})\s*[:\-_\|,\s]*'
    r'(socks5|socks4a?|https?|http|connect|ssl|tor)',
    re.IGNORECASE
)
# JSON field names commonly used for proxy data
JSON_IP_FIELDS = ('ip', 'host', 'address', 'addr', 'server', 'proxy_address')
JSON_PORT_FIELDS = ('port', 'proxy_port')
JSON_PROTO_FIELDS = ('type', 'protocol', 'proto', 'scheme', 'proxy_type')
JSON_USER_FIELDS = ('user', 'username', 'login', 'usr')
JSON_PASS_FIELDS = ('pass', 'password', 'pwd', 'passwd')
# Confidence scoring for extraction methods
# Higher scores indicate more reliable extraction
CONFIDENCE_AUTH = 90        # Authenticated proxy (usually paid sources)
CONFIDENCE_JSON = 80        # JSON API with structured fields
CONFIDENCE_TABLE = 70       # HTML table with columns
CONFIDENCE_HINT = 60        # Protocol hint in surrounding text
CONFIDENCE_URL_PROTO = 50   # Protocol inferred from URL path
CONFIDENCE_REGEX = 30       # Raw regex extraction
# Bonus for protocol detection
CONFIDENCE_PROTO_EXPLICIT = 15  # Protocol explicitly stated
CONFIDENCE_PROTO_INFERRED = 5   # Protocol from URL path
def _normalize_proto(proto_str):
"""Normalize protocol string to standard form."""
if not proto_str:
return None
p = proto_str.lower().strip()
if p in ('socks5', 's5', 'tor'):
return 'socks5'
if p in ('socks4', 'socks4a', 's4'):
return 'socks4'
if p in ('http', 'https', 'connect', 'ssl'):
return 'http'
return None
def extract_auth_proxies(content):
    """Extract authenticated proxies from content.

    Recognizes forms such as:
    - user:pass@1.2.3.4:8080
    - socks5://user:pass@1.2.3.4:8080
    - http://user:pass@1.2.3.4:8080
    - user:pass@[2001:db8::1]:8080 (IPv6)

    Returns:
        List of (address, proto) tuples where address is user:pass@ip:port
    """
    # Cheap pre-check: every auth proxy contains an '@' separator.
    if '@' not in content:
        return []
    results = []
    # IPv4 credentials
    for m in AUTH_PROXY_PATTERN.finditer(content):
        raw_proto, user, pw, ip, port = m.groups()
        # Canonicalize octets (drops leading zeros).
        canonical_ip = '.'.join(str(int(octet)) for octet in ip.split('.'))
        results.append((
            '%s:%s@%s:%d' % (user, pw, canonical_ip, int(port)),
            _normalize_proto(raw_proto),
        ))
    # IPv6 credentials (bracketed address form)
    for m in AUTH_IPV6_PATTERN.finditer(content):
        raw_proto, user, pw, ip6, port = m.groups()
        if not is_valid_ipv6(ip6):
            continue
        results.append((
            '%s:%s@[%s]:%d' % (user, pw, ip6, int(port)),
            _normalize_proto(raw_proto),
        ))
    return results
# Table column header patterns for identifying proxy data columns
TABLE_IP_HEADERS = ('ip', 'address', 'host', 'server', 'proxy')
TABLE_PORT_HEADERS = ('port',)
TABLE_PROTO_HEADERS = ('type', 'protocol', 'proto', 'scheme')
# Precompiled HTML-table regexes, hoisted to module level so repeated
# extract_proxies_from_table() calls reuse them.
# Whole <table>...</table> element (contents in group 1).
_TABLE_PATTERN = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
# One <tr>...</tr> row.
_ROW_PATTERN = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
# One header or data cell (<th> or <td>).
_CELL_PATTERN = re.compile(r'<t[hd][^>]*>(.*?)</t[hd]>', re.IGNORECASE | re.DOTALL)
# Strips any remaining markup from cell contents.
_TAG_STRIP = re.compile(r'<[^>]+>')
def extract_proxies_from_table(content):
    """Extract proxies from HTML tables with IP/Port/Protocol columns.

    Handles tables like:
        | IP Address | Port | Type   |
        | 1.2.3.4    | 8080 | SOCKS5 |

    Column roles are identified from the header row; the IP column may
    also contain combined "ip:port" values.

    Returns:
        List of (address, proto) tuples
    """
    proxies = []
    # Short-circuit: no HTML tables in plain text content
    if '<table' not in content and '<TABLE' not in content:
        return proxies
    # Hoisted out of the row loop (previously re.match re-ran the pattern
    # cache lookup for every data row).
    ip_port_re = re.compile(r'([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):([0-9]+)')
    ip_only_re = re.compile(r'^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$')
    for table_match in _TABLE_PATTERN.finditer(content):
        rows = _ROW_PATTERN.findall(table_match.group(1))
        if not rows:
            continue
        # Parse header row to find column indices
        ip_col = port_col = proto_col = -1
        for i, cell in enumerate(_CELL_PATTERN.findall(rows[0])):
            cell_text = _TAG_STRIP.sub('', cell).strip().lower()
            if ip_col < 0 and any(h in cell_text for h in TABLE_IP_HEADERS):
                ip_col = i
            elif port_col < 0 and any(h in cell_text for h in TABLE_PORT_HEADERS):
                port_col = i
            elif proto_col < 0 and any(h in cell_text for h in TABLE_PROTO_HEADERS):
                proto_col = i
        # Need at least IP column (port might be in same cell)
        if ip_col < 0:
            continue
        # Parse data rows
        for row in rows[1:]:
            cells = _CELL_PATTERN.findall(row)
            if len(cells) <= ip_col:
                continue
            ip_cell = _TAG_STRIP.sub('', cells[ip_col]).strip()
            # Combined "ip:port" cell with no separate port column
            if ':' in ip_cell and port_col < 0:
                match = ip_port_re.match(ip_cell)
                if match:
                    ip, port = match.groups()
                    proto = None
                    if proto_col >= 0 and len(cells) > proto_col:
                        proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
                    addr = '%s:%s' % (ip, port)
                    if is_usable_proxy(addr):
                        proxies.append((addr, proto))
                continue
            # Separate IP and Port columns
            if port_col >= 0 and len(cells) > port_col:
                port_cell = _TAG_STRIP.sub('', cells[port_col]).strip()
                try:
                    port = int(port_cell)
                except ValueError:
                    continue
                # Validate IP format
                if not ip_only_re.match(ip_cell):
                    continue
                proto = None
                if proto_col >= 0 and len(cells) > proto_col:
                    proto = _normalize_proto(_TAG_STRIP.sub('', cells[proto_col]).strip())
                addr = '%s:%d' % (ip_cell, port)
                if is_usable_proxy(addr):
                    proxies.append((addr, proto))
    return proxies
def extract_proxies_from_json(content):
    """Extract proxies from JSON content.

    Handles common JSON formats:
    - Array of objects: [{"ip": "1.2.3.4", "port": 8080}, ...]
    - Array of strings: ["1.2.3.4:8080", ...]
    - Object with data array: {"data": [...], "proxies": [...]}
    - Nested structures with ip/host/port/protocol fields

    Returns:
        List of (address, proto) tuples
    """
    proxies = []
    # Short-circuit: content must contain JSON delimiters
    if '{' not in content and '[' not in content:
        return proxies
    # Fast path: many sources are pure JSON API responses.  Parsing the
    # whole document first also fixes nested structures that the fragment
    # regexes below mishandle: their non-greedy, non-overlapping matches
    # stop at the first closing brace, so e.g. {"data": [{...}, {...}]}
    # produced an invalid first fragment and lost its first entry.
    try:
        data = json.loads(content.strip())
    except (ValueError, TypeError):
        pass
    else:
        return _extract_from_json_data(data)
    # Fallback: scan for JSON fragments embedded in HTML/text.
    json_matches = []
    # Look for JSON arrays
    for match in re.finditer(r'\[[\s\S]*?\]', content):
        json_matches.append(match.group())
    # Look for JSON objects
    for match in re.finditer(r'\{[\s\S]*?\}', content):
        json_matches.append(match.group())
    for json_str in json_matches:
        try:
            data = json.loads(json_str)
        except (ValueError, TypeError):
            continue
        proxies.extend(_extract_from_json_data(data))
    return proxies
def _extract_from_json_data(data, parent_proto=None):
    """Recursively extract proxies from parsed JSON data.

    Returns list of (address, proto) tuples where address may include
    auth credentials as user:pass@ip:port.
    """
    found = []
    if isinstance(data, list):
        plain_re = re.compile(r'^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$')
        auth_re = re.compile(r'^[^:]+:[^@]+@[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$')
        for entry in data:
            if isinstance(entry, dict):
                found.extend(_extract_from_json_data(entry, parent_proto))
            elif isinstance(entry, basestring):
                # Bare "ip:port" or "user:pass@ip:port" strings
                candidate = entry.strip()
                if plain_re.match(candidate) or auth_re.match(candidate):
                    found.append((candidate, parent_proto))
        return found
    if not isinstance(data, dict):
        return found
    # Collect ip/port/proto/credential fields from this object.
    ip = user = passwd = None
    port = None
    proto = parent_proto
    for key, value in data.items():
        k = key.lower()
        if k in JSON_IP_FIELDS and isinstance(value, basestring):
            ip = value.strip()
        elif k in JSON_PORT_FIELDS:
            try:
                port = int(value)
            except (ValueError, TypeError):
                pass
        elif k in JSON_PROTO_FIELDS and isinstance(value, basestring):
            proto = _normalize_proto(value)
        elif k in JSON_USER_FIELDS and isinstance(value, basestring):
            user = value.strip()
        elif k in JSON_PASS_FIELDS and isinstance(value, basestring):
            passwd = value.strip()
    if ip and port:
        if user and passwd:
            found.append(('%s:%s@%s:%d' % (user, passwd, ip, port), proto))
        else:
            found.append(('%s:%d' % (ip, port), proto))
    # Descend into nested containers ("data", "proxies", ...), carrying any
    # protocol discovered at this level downward.
    for value in data.values():
        if isinstance(value, (list, dict)):
            found.extend(_extract_from_json_data(value, proto))
    return found
def extract_proxies_with_hints(content):
    """Scan content for IP:PORT occurrences annotated with a protocol.

    Handles "socks5 1.2.3.4:8080" (protocol first) and
    "1.2.3.4:8080 socks5" (protocol last); the protocol-first form wins
    when both appear for the same address.

    Returns:
        Dict mapping address -> normalized protocol
    """
    hints = {}
    # Protocol keyword before the IP:PORT
    for m in PROTO_HINT_PATTERN.finditer(content):
        normalized = _normalize_proto(m.group(1))
        if normalized:
            hints[m.group(2)] = normalized
    # IP:PORT before the protocol keyword
    for m in PROTO_HINT_REVERSE.finditer(content):
        normalized = _normalize_proto(m.group(2))
        if normalized and m.group(1) not in hints:
            hints[m.group(1)] = normalized
    return hints
def cleanhtml(raw_html):
    """Flatten HTML to a ':'-delimited string for regex scanning.

    Tags, whitespace runs, and the resulting repeated colons each collapse
    to a single ':' separator so IP:PORT pairs stay intact.
    """
    text = raw_html.replace('&nbsp;', ' ')
    for pattern in cleanhtml_re:
        text = pattern.sub(':', text)
    return text
def fetch_contents(url, head=False, proxy=None):
    """Fetch url, trying each entry of *proxy* in turn when given.

    Returns the response body (or header when head=True), or '' when
    every attempt failed.
    """
    if proxy is not None and len(proxy):
        result = None
        for candidate in proxy:
            result = _fetch_contents(url, head=head, proxy=candidate)
            if result is not None:
                break
    else:
        result = _fetch_contents(url, head=head)
    return '' if result is None else result
# Response snippets that indicate a transient upstream failure; when one is
# found in a body, _fetch_contents returns None so the caller can retry.
retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded')
def _fetch_contents(url, head = False, proxy=None):
    """Fetch a single URL through Tor (optionally chained with *proxy*).

    Returns the header (head=True) or the response body as UTF-8 bytes;
    returns None on connection failure or when the response contains a
    known transient-failure message (see retry_messages).
    """
    network_stats.set_category('scraper')
    # Normalize the URL to a byte string before parsing (Python 2).
    if isinstance(url, unicode):
        url = url.encode('utf-8')
    host, port, ssl, uri = _parse_url(url)
    headers=[
        'Accept-Language: en-US,en;q=0.8',
        'Cache-Control: max-age=0',
    ]
    if config.ppf.debug:
        _log("connecting to %s... (header: %s)" % (url, str(head)), "debug")
    tor_retries = 0
    max_tor_retries = 1
    http = None
    try:
        while True:
            # Pick a random Tor host for this attempt; a failed connect
            # retries once via the loop with a different circuit.
            proxies = [rocksock.RocksockProxyFromURL(tor_proxy_url(random.choice(config.torhosts)))]
            # Chain the caller-supplied proxy after Tor, if any.
            if proxy: proxies.append( rocksock.RocksockProxyFromURL(proxy))
            http = RsHttp(host,ssl=ssl,port=port, keep_alive=True, timeout=config.ppf.timeout, max_tries=config.ppf.http_retries, follow_redirects=True, auto_set_cookies=True, proxies=proxies, user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0', log_errors=False)
            if not http.connect():
                http.disconnect()
                http = None
                tor_retries += 1
                if tor_retries <= max_tor_retries:
                    # Retry once with different circuit
                    time.sleep(1)
                    continue
                # Log failure after retries exhausted
                # (rate-limited: at most one line per _fail_log_interval seconds)
                global _last_fail_log
                now = time.time()
                if (now - _last_fail_log) >= _fail_log_interval:
                    _log("failed to connect to %s"%url, "ppf")
                    _last_fail_log = now
                return None
            break
        ## only request header
        if head:
            hdr = http.head(uri, headers)
            return hdr
        hdr, res = http.get(uri, headers)
        # Normalize the body to UTF-8 bytes so the substring checks below work.
        res = res.encode('utf-8') if isinstance(res, unicode) else res
        # Treat known transient upstream errors as a failed fetch.
        for retry_message in retry_messages:
            if retry_message in res: return None
        return res
    finally:
        # Always release the socket, on success and failure alike.
        if http:
            http.disconnect()
def valid_port(port):
    """Check if port number is valid (1-65535)."""
    return 1 <= port <= 65535
def is_valid_ipv6(addr):
    """Validate IPv6 address format.

    Rejects:
    - Malformed addresses (bad characters, bad segment counts,
      segments longer than 4 hex digits)
    - Loopback (::1)
    - Link-local (fe80::/10)
    - Unique local (fc00::/7)
    - Multicast (ff00::/8)
    - Unspecified (::)
    """
    # Basic format check - must contain colons, only hex digits and colons
    if not re.match(r'^[0-9a-fA-F:]+$', addr):
        return False
    # Each hextet is at most 4 hex digits; empty segments arise from '::'.
    # (Previously '12345::1' slipped through the checks below.)
    for segment in addr.split(':'):
        if len(segment) > 4:
            return False
    # Check for valid segment count ('::' expands to fill)
    if '::' in addr:
        if addr.count('::') > 1:
            return False
    else:
        if addr.count(':') != 7:
            return False
    # Reject special addresses
    addr_lower = addr.lower()
    # Loopback ::1
    if addr_lower in ('::1', '0:0:0:0:0:0:0:1'):
        return False
    # Unspecified ::
    if addr_lower in ('::', '0:0:0:0:0:0:0:0'):
        return False
    # Link-local fe80::/10
    if addr_lower.startswith(('fe8', 'fe9', 'fea', 'feb')):
        return False
    # Unique local fc00::/7 (fc00:: - fdff::)
    if addr_lower.startswith(('fc', 'fd')):
        return False
    # Multicast ff00::/8
    if addr_lower.startswith('ff'):
        return False
    return True
def is_usable_proxy(proxy):
    """Validate proxy string format and reject unusable addresses.

    Accepts ip:port, [ipv6]:port, and either with a user:pass@ prefix.
    Rejects malformed strings, invalid ports (0, >65535) and
    private/reserved ranges.  Verdicts are memoized in a bounded
    LRU cache (Python 2 compatible, so no OrderedDict.move_to_end).
    """
    sentinel = object()
    with _proxy_valid_cache_lock:
        hit = _proxy_valid_cache.pop(proxy, sentinel)
        if hit is not sentinel:
            # Re-insert at the MRU end of the OrderedDict.
            _proxy_valid_cache[proxy] = hit
            return hit
    # Compute outside the lock; worst case two threads validate the
    # same string and store the same verdict.
    verdict = _validate_proxy(proxy)
    with _proxy_valid_cache_lock:
        # Evict from the LRU end until there is room.
        while len(_proxy_valid_cache) >= _proxy_valid_cache_max:
            _proxy_valid_cache.popitem(last=False)
        _proxy_valid_cache[proxy] = verdict
    return verdict
def _validate_proxy(proxy):
    """Internal validation logic for is_usable_proxy."""
    try:
        if ':' not in proxy:
            return False
        # Drop any user:pass@ credential prefix.
        if '@' in proxy:
            proxy = proxy.split('@', 1)[1]
        # Bracketed IPv6: [addr]:port
        if proxy.startswith('['):
            m = re.match(r'^\[([^\]]+)\]:(\d+)$', proxy)
            if m is None:
                return False
            if not valid_port(int(m.group(2))):
                return False
            return is_valid_ipv6(m.group(1))
        # IPv4: ip:port
        host, _, port_text = proxy.rpartition(':')
        if not valid_port(int(port_text)):
            return False
        parts = host.split('.')
        if len(parts) != 4:
            return False
        octets = [int(p) for p in parts]
        if not all(0 <= o <= 255 for o in octets):
            return False
        first, second = octets[0], octets[1]
        # Unspecified 0.0.0.0/8, loopback 127/8, private 10/8
        if first in (0, 127, 10):
            return False
        # Private 172.16.0.0/12
        if first == 172 and 16 <= second <= 31:
            return False
        # Private 192.168.0.0/16
        if first == 192 and second == 168:
            return False
        # Link-local 169.254.0.0/16
        if first == 169 and second == 254:
            return False
        # CGNAT 100.64.0.0/10
        if first == 100 and 64 <= second <= 127:
            return False
        # Multicast 224.0.0.0/4 and reserved 240.0.0.0/4
        if first >= 224:
            return False
        return True
    except (ValueError, AttributeError, IndexError):
        return False
# Cache of proxies already present in the database (addr -> True).
_known_proxies = {}
# Guards _known_proxies against concurrent access.
_known_proxies_lock = threading.Lock()
def init_known_proxies(proxydb):
    """Populate the known-proxy cache from the proxylist table.

    Subsequent calls are no-ops once the cache is non-empty.
    """
    global _known_proxies
    with _known_proxies_lock:
        if _known_proxies:
            return
        rows = proxydb.execute('SELECT proxy FROM proxylist').fetchall()
        for row in rows:
            _known_proxies[row[0]] = True
def add_known_proxies(proxies):
    """Mark every address in *proxies* as already known (thread-safe)."""
    global _known_proxies
    with _known_proxies_lock:
        for addr in proxies:
            _known_proxies[addr] = True
def is_known_proxy(proxy):
    """Check if proxy is in known cache (thread-safe lookup)."""
    with _known_proxies_lock:
        return proxy in _known_proxies
def detect_proto_from_path(url):
    """Detect proxy protocol from URL path.

    Many proxy lists encode the protocol in their path, e.g.
    /socks5.txt, /socks4/, http-proxies.txt, /ssl/ (HTTPS proxies use
    HTTP CONNECT, so they map to 'http').

    Args:
        url: Source URL path or full URL

    Returns:
        'socks5', 'socks4' or 'http', or None when nothing matches.
    """
    lowered = url.lower()
    if 'socks5' in lowered:
        return 'socks5'
    if 'socks4' in lowered:
        return 'socks4'
    http_markers = ('/http', 'http-', 'http_', 'http.', '/https', '/ssl', '/connect')
    if any(marker in lowered for marker in http_markers):
        return 'http'
    return None
def _normalize_proxy_addr(addr):
"""Normalize proxy address, handling auth and IPv6 formats.
Formats:
- ip:port
- user:pass@ip:port
- [ipv6]:port
- user:pass@[ipv6]:port
Returns normalized address or None if invalid.
"""
auth_prefix = ''
if '@' in addr:
auth_prefix, addr = addr.rsplit('@', 1)
auth_prefix += '@'
if ':' not in addr:
return None
# IPv6 format: [ipv6]:port
if addr.startswith('['):
match = re.match(r'^\[([^\]]+)\]:(\d+)$', addr)
if not match:
return None
ipv6, port = match.groups()
try:
port = int(port.lstrip('0') or '0')
except ValueError:
return None
return '%s[%s]:%d' % (auth_prefix, ipv6, port)
# IPv4 format: ip:port
ip, port = addr.rsplit(':', 1)
try:
ip = '.'.join(str(int(o)) for o in ip.split('.'))
port = int(port.lstrip('0') or '0')
except (ValueError, AttributeError):
return None
return '%s%s:%d' % (auth_prefix, ip, port)
def _record_candidate(found, addr, detected_proto, base_conf):
    """Merge one extracted candidate into the *found* dict.

    Validates and normalizes the address, adds the explicit-protocol
    bonus, and keeps whichever record has the highest confidence.
    (This logic was previously triplicated across the auth/JSON/table
    extraction paths in extract_proxies.)
    """
    if not is_usable_proxy(addr):
        return
    addr = _normalize_proxy_addr(addr)
    if not addr:
        return
    conf = base_conf
    if detected_proto:
        conf += CONFIDENCE_PROTO_EXPLICIT
    if addr not in found or conf > found[addr][1]:
        found[addr] = (detected_proto, conf)
def extract_proxies(content, proxydb=None, filter_known=True, proto=None):
    """Extract and normalize proxy addresses from content.

    Uses multiple extraction methods (in priority order):
    1. Authenticated proxy patterns (user:pass@ip:port)
    2. JSON parsing for API responses
    3. HTML table parsing with IP/Port/Protocol columns
    4. Protocol hints from surrounding text
    5. Regex extraction for raw IP:PORT patterns
    6. IPv6 regex extraction

    Args:
        content: HTML/text content to parse
        proxydb: Database connection for known proxy lookup (optional)
        filter_known: If True, filter out known proxies and return new only
        proto: Protocol from source URL (fallback if not detected)

    Returns:
        If filter_known: (unique_count, new_proxies) tuple where
        new_proxies is a list of (address, proto, confidence) tuples.
        If not filter_known: list of (address, proto, confidence) tuples.
    """
    # Dict: address -> (protocol, confidence); higher confidence wins.
    found = {}
    # 1. Authenticated proxies (highest confidence)
    for addr, detected in extract_auth_proxies(content):
        _record_candidate(found, addr, detected, CONFIDENCE_AUTH)
    # 2. JSON API responses (reliable for protocol info)
    for addr, detected in extract_proxies_from_json(content):
        _record_candidate(found, addr, detected, CONFIDENCE_JSON)
    # 3. HTML tables (structured data with protocol columns)
    for addr, detected in extract_proxies_from_table(content):
        _record_candidate(found, addr, detected, CONFIDENCE_TABLE)
    # 4. Protocol hints in surrounding text
    hints = extract_proxies_with_hints(content)
    # 5. Raw regex extraction for remaining IPv4 proxies (no auth)
    for p in PROXY_PATTERN.findall(cleanhtml(content)):
        ip, port = p.split(':')
        # Normalize IP (remove leading zeros from octets)
        ip = '.'.join(str(int(octet)) for octet in ip.split('.'))
        # Normalize port (remove leading zeros, handle empty case)
        port = int(port.lstrip('0') or '0')
        addr = '%s:%d' % (ip, port)
        if not is_usable_proxy(addr):
            continue
        if addr not in found:
            # Check both the raw and normalized spellings for a hint.
            detected = hints.get(p) or hints.get(addr)
            if detected:
                conf = CONFIDENCE_HINT + CONFIDENCE_PROTO_EXPLICIT
            else:
                conf = CONFIDENCE_REGEX
            found[addr] = (detected, conf)
    # 6. Raw regex extraction for IPv6 proxies [ipv6]:port
    for match in IPV6_PROXY_PATTERN.finditer(content):
        ipv6, port = match.groups()
        port = int(port)
        if not is_valid_ipv6(ipv6) or not valid_port(port):
            continue
        addr = '[%s]:%d' % (ipv6, port)
        if addr not in found:
            found[addr] = (None, CONFIDENCE_REGEX)
    # Build result list; detected protocol beats the URL-derived fallback,
    # and the fallback earns only the smaller "inferred" bonus.
    uniques = []
    for addr, (detected, conf) in found.items():
        if detected:
            uniques.append((addr, detected, conf))
        elif proto:
            uniques.append((addr, proto, conf + CONFIDENCE_PROTO_INFERRED))
        else:
            uniques.append((addr, None, conf))
    if not filter_known:
        return uniques
    # Initialize known proxies from DB if needed
    if proxydb is not None:
        init_known_proxies(proxydb)
    new = []
    for addr, pr, conf in uniques:
        if not is_known_proxy(addr):
            new.append((addr, pr, conf))
            add_known_proxies([addr])
    return len(uniques), new