diff --git a/dbs.py b/dbs.py
index b4878d4..bb72b9e 100644
--- a/dbs.py
+++ b/dbs.py
@@ -56,6 +56,16 @@ def _migrate_geolocation_columns(sqlite):
sqlite.commit()
+def _migrate_confidence_column(sqlite):
+ """Add confidence column for extraction quality scoring."""
+ try:
+ sqlite.execute('SELECT confidence FROM proxylist LIMIT 1')
+ except Exception:
+ # confidence: 0-100 score indicating extraction reliability
+ sqlite.execute('ALTER TABLE proxylist ADD COLUMN confidence INT DEFAULT 30')
+ sqlite.commit()
+
+
def compute_proxy_list_hash(proxies):
"""Compute MD5 hash of sorted proxy list for change detection.
@@ -279,12 +289,14 @@ def create_table_if_not_exists(sqlite, dbname):
exit_ip TEXT,
asn INT,
latitude REAL,
- longitude REAL)""")
+ longitude REAL,
+ confidence INT DEFAULT 30)""")
# Migration: add columns to existing databases (must run before creating indexes)
_migrate_latency_columns(sqlite)
_migrate_anonymity_columns(sqlite)
_migrate_asn_column(sqlite)
_migrate_geolocation_columns(sqlite)
+ _migrate_confidence_column(sqlite)
# Indexes for common query patterns
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)')
sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)')
@@ -359,7 +371,10 @@ def insert_proxies(proxydb, proxies, url):
Args:
proxydb: Database connection
- proxies: List of (address, proto) tuples or plain address strings
+        proxies: List of tuples or plain address strings, in one of these forms (see example below):
+ - (address, proto) - 2-tuple, default confidence
+ - (address, proto, confidence) - 3-tuple with score
+ - address string - default proto and confidence
url: Source URL for logging
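+
+    Example (illustrative; db is an open sqlite3 connection, addresses are hypothetical):
+        insert_proxies(db, ['1.2.3.4:8080',
+                            ('5.6.7.8:1080', 'socks5'),
+                            ('9.9.9.9:3128', 'http', 80)],
+                       'http://example.com/list.txt')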
"""
if not proxies:
@@ -367,17 +382,36 @@ def insert_proxies(proxydb, proxies, url):
timestamp = int(time.time())
rows = []
for p in proxies:
- # Handle both tuple (address, proto) and plain string formats
+ # Handle tuple (address, proto[, confidence]) and plain string formats
+ confidence = 30 # Default confidence (CONFIDENCE_REGEX)
if isinstance(p, tuple):
- addr, proto = p
+ if len(p) >= 3:
+ addr, proto, confidence = p[0], p[1], p[2]
+ else:
+ addr, proto = p[0], p[1]
else:
addr, proto = p, None
- ip, port = addr.split(':')
- rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0))
+
+ # Parse address into ip and port
+ # Formats: ip:port, [ipv6]:port, user:pass@ip:port, user:pass@[ipv6]:port
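+        # e.g. 'user:pw@[2001:db8::1]:8080' -> ip='[2001:db8::1]', port='8080'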
+ addr_part = addr.split('@')[-1] # Strip auth if present
+
+ if addr_part.startswith('['):
+ # IPv6: [ipv6]:port
+ bracket_end = addr_part.find(']')
+ if bracket_end < 0:
+ continue
+ ip = addr_part[:bracket_end + 1] # Include brackets
+ port = addr_part[bracket_end + 2:] # Skip ]:
+ else:
+ # IPv4: ip:port
+ ip, port = addr_part.rsplit(':', 1)
+
+ rows.append((timestamp, addr, ip, port, proto, 1, 0, 0, 0, 0, 0, confidence))
proxydb.executemany(
'INSERT OR IGNORE INTO proxylist '
- '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success) '
- 'VALUES (?,?,?,?,?,?,?,?,?,?,?)',
+ '(added,proxy,ip,port,proto,failed,tested,success_count,total_duration,mitm,consecutive_success,confidence) '
+ 'VALUES (?,?,?,?,?,?,?,?,?,?,?,?)',
rows
)
proxydb.commit()
diff --git a/fetch.py b/fetch.py
index 891481f..de5ba34 100644
--- a/fetch.py
+++ b/fetch.py
@@ -1,4 +1,5 @@
import re, random, time, string
+import json
import threading
import rocksock
import network_stats
@@ -134,6 +135,339 @@ cleanhtml_re = [
# Proxy extraction pattern: IP:PORT followed by non-digit or end
# Pattern: 1-3 digits, dot, repeated 3 times, colon, 2-5 digit port
PROXY_PATTERN = re.compile(r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})[\D$]')
+
+# IPv6 proxy pattern: [ipv6]:port
+# An IPv6 address consists of hex digits and colons and is enclosed in brackets in URL form
+IPV6_PROXY_PATTERN = re.compile(
+ r'\[([0-9a-fA-F:]+)\]:([0-9]{2,5})'
+)
+
+# Auth proxy pattern: user:pass@IP:PORT or proto://user:pass@IP:PORT
+# Captures: (proto, user, pass, ip, port)
+AUTH_PROXY_PATTERN = re.compile(
+ r'(?:(socks5|socks4a?|https?|http|ssl|tor)://)?' # optional protocol
+ r'([a-zA-Z0-9._-]+):([a-zA-Z0-9._-]+)@' # user:pass@
+ r'([0-9]+(?:\.[0-9]+){3}):([0-9]{2,5})', # ip:port
+ re.IGNORECASE
+)
+
+# IPv6 auth pattern: user:pass@[ipv6]:port
+AUTH_IPV6_PATTERN = re.compile(
+ r'(?:(socks5|socks4a?|https?|http|ssl|tor)://)?' # optional protocol
+ r'([a-zA-Z0-9._-]+):([a-zA-Z0-9._-]+)@' # user:pass@
+ r'\[([0-9a-fA-F:]+)\]:([0-9]{2,5})', # [ipv6]:port
+ re.IGNORECASE
+)
+
+# Protocol hint patterns - look for protocol keywords near IP:PORT
+PROTO_HINT_PATTERN = re.compile(
+ r'(socks5|socks4a?|https?|connect|ssl|tor)\s*[:\-_\s]*'
+ r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})',
+ re.IGNORECASE
+)
+PROTO_HINT_REVERSE = re.compile(
+ r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})\s*[:\-_\|,\s]*'
+ r'(socks5|socks4a?|https?|http|connect|ssl|tor)',
+ re.IGNORECASE
+)
+
+# JSON field names commonly used for proxy data
+JSON_IP_FIELDS = ('ip', 'host', 'address', 'addr', 'server', 'proxy_address')
+JSON_PORT_FIELDS = ('port', 'proxy_port')
+JSON_PROTO_FIELDS = ('type', 'protocol', 'proto', 'scheme', 'proxy_type')
+JSON_USER_FIELDS = ('user', 'username', 'login', 'usr')
+JSON_PASS_FIELDS = ('pass', 'password', 'pwd', 'passwd')
+
+# Confidence scoring for extraction methods
+# Higher scores indicate more reliable extraction
+CONFIDENCE_AUTH = 90 # Authenticated proxy (usually paid sources)
+CONFIDENCE_JSON = 80 # JSON API with structured fields
+CONFIDENCE_TABLE = 70 # HTML table with columns
+CONFIDENCE_HINT = 60 # Protocol hint in surrounding text
+CONFIDENCE_URL_PROTO = 50 # Protocol inferred from URL path
+CONFIDENCE_REGEX = 30 # Raw regex extraction
+
+# Bonus for protocol detection
+CONFIDENCE_PROTO_EXPLICIT = 15 # Protocol explicitly stated
+CONFIDENCE_PROTO_INFERRED = 5 # Protocol from URL path
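+
+# Worked example: a proxy taken from an HTML table whose "Type" column says SOCKS5
+# scores CONFIDENCE_TABLE + CONFIDENCE_PROTO_EXPLICIT = 70 + 15 = 85, while a bare
+# IP:PORT regex hit whose protocol is only inferred from the source URL path scores
+# CONFIDENCE_REGEX + CONFIDENCE_PROTO_INFERRED = 30 + 5 = 35 (see extract_proxies below).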
+
+
+def _normalize_proto(proto_str):
+    """Normalize a protocol string to 'socks5', 'socks4', or 'http'; returns None if unrecognized."""
+ if not proto_str:
+ return None
+ p = proto_str.lower().strip()
+ if p in ('socks5', 's5', 'tor'):
+ return 'socks5'
+ if p in ('socks4', 'socks4a', 's4'):
+ return 'socks4'
+ if p in ('http', 'https', 'connect', 'ssl'):
+ return 'http'
+ return None
+
+
+def extract_auth_proxies(content):
+ """Extract authenticated proxies from content.
+
+ Matches patterns like:
+ - user:pass@1.2.3.4:8080
+ - socks5://user:pass@1.2.3.4:8080
+ - http://user:pass@1.2.3.4:8080
+ - user:pass@[2001:db8::1]:8080 (IPv6)
+
+ Returns:
+ List of (address, proto) tuples where address is user:pass@ip:port
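+
+    Example (hypothetical credentials):
+        extract_auth_proxies('socks5://joe:hunter2@1.2.3.4:1080')
+        -> [('joe:hunter2@1.2.3.4:1080', 'socks5')]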
+ """
+ proxies = []
+
+ # IPv4 auth proxies
+ for match in AUTH_PROXY_PATTERN.finditer(content):
+ proto_str, user, passwd, ip, port = match.groups()
+ proto = _normalize_proto(proto_str)
+
+ # Normalize IP (remove leading zeros)
+ ip = '.'.join(str(int(o)) for o in ip.split('.'))
+ port = int(port)
+
+ # Build address with auth
+ addr = '%s:%s@%s:%d' % (user, passwd, ip, port)
+ proxies.append((addr, proto))
+
+ # IPv6 auth proxies
+ for match in AUTH_IPV6_PATTERN.finditer(content):
+ proto_str, user, passwd, ipv6, port = match.groups()
+ proto = _normalize_proto(proto_str)
+ port = int(port)
+
+ if not is_valid_ipv6(ipv6):
+ continue
+
+ # Build address with auth and bracketed IPv6
+ addr = '%s:%s@[%s]:%d' % (user, passwd, ipv6, port)
+ proxies.append((addr, proto))
+
+ return proxies
+
+
+# Table column header patterns for identifying proxy data columns
+TABLE_IP_HEADERS = ('ip', 'address', 'host', 'server', 'proxy')
+TABLE_PORT_HEADERS = ('port',)
+TABLE_PROTO_HEADERS = ('type', 'protocol', 'proto', 'scheme')
+
+
+def extract_proxies_from_table(content):
+ """Extract proxies from HTML tables with IP/Port/Protocol columns.
+
+ Handles tables like:
+ | IP Address | Port | Type |
+ |------------|------|---------|
+ | 1.2.3.4 | 8080 | SOCKS5 |
+
+ Returns:
+ List of (address, proto) tuples
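+
+        e.g. the HTML equivalent of the table above yields [('1.2.3.4:8080', 'socks5')]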
+ """
+ proxies = []
+
+ # Simple regex-based table parsing (works without BeautifulSoup)
+ # Find all tables
+    table_pattern = re.compile(r'<table[^>]*>(.*?)</table>', re.IGNORECASE | re.DOTALL)
+    row_pattern = re.compile(r'<tr[^>]*>(.*?)</tr>', re.IGNORECASE | re.DOTALL)
+    cell_pattern = re.compile(r'<t[dh][^>]*>(.*?)</t[dh]>', re.IGNORECASE | re.DOTALL)
+ tag_strip = re.compile(r'<[^>]+>')
+
+ for table_match in table_pattern.finditer(content):
+ table_html = table_match.group(1)
+ rows = row_pattern.findall(table_html)
+ if not rows:
+ continue
+
+ # Parse header row to find column indices
+ ip_col = port_col = proto_col = -1
+ header_row = rows[0]
+ headers = cell_pattern.findall(header_row)
+
+ for i, cell in enumerate(headers):
+ cell_text = tag_strip.sub('', cell).strip().lower()
+ if ip_col < 0 and any(h in cell_text for h in TABLE_IP_HEADERS):
+ ip_col = i
+ elif port_col < 0 and any(h in cell_text for h in TABLE_PORT_HEADERS):
+ port_col = i
+ elif proto_col < 0 and any(h in cell_text for h in TABLE_PROTO_HEADERS):
+ proto_col = i
+
+ # Need at least IP column (port might be in same cell)
+ if ip_col < 0:
+ continue
+
+ # Parse data rows
+ for row in rows[1:]:
+ cells = cell_pattern.findall(row)
+ if len(cells) <= ip_col:
+ continue
+
+ ip_cell = tag_strip.sub('', cells[ip_col]).strip()
+
+ # Check if IP cell contains port (ip:port format)
+ if ':' in ip_cell and port_col < 0:
+ match = re.match(r'([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):([0-9]+)', ip_cell)
+ if match:
+ ip, port = match.groups()
+ proto = None
+ if proto_col >= 0 and len(cells) > proto_col:
+ proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
+ addr = '%s:%s' % (ip, port)
+ if is_usable_proxy(addr):
+ proxies.append((addr, proto))
+ continue
+
+ # Separate IP and Port columns
+ if port_col >= 0 and len(cells) > port_col:
+ port_cell = tag_strip.sub('', cells[port_col]).strip()
+ try:
+ port = int(port_cell)
+ except ValueError:
+ continue
+
+ # Validate IP format
+ if not re.match(r'^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$', ip_cell):
+ continue
+
+ proto = None
+ if proto_col >= 0 and len(cells) > proto_col:
+ proto = _normalize_proto(tag_strip.sub('', cells[proto_col]).strip())
+
+ addr = '%s:%d' % (ip_cell, port)
+ if is_usable_proxy(addr):
+ proxies.append((addr, proto))
+
+ return proxies
+
+
+def extract_proxies_from_json(content):
+ """Extract proxies from JSON content.
+
+ Handles common JSON formats:
+ - Array of objects: [{"ip": "1.2.3.4", "port": 8080}, ...]
+ - Array of strings: ["1.2.3.4:8080", ...]
+ - Object with data array: {"data": [...], "proxies": [...]}
+ - Nested structures with ip/host/port/protocol fields
+
+ Returns:
+ List of (address, proto) tuples
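+
+    Example (hypothetical API response):
+        extract_proxies_from_json('[{"ip": "1.2.3.4", "port": 8080, "type": "socks5"}]')
+        yields ('1.2.3.4:8080', 'socks5'), possibly more than once because the array
+        and object scans overlap; extract_proxies() deduplicates downstream.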
+ """
+ proxies = []
+
+ # Try to find JSON in content (may be embedded in HTML)
+ json_matches = []
+
+ # Look for JSON arrays
+ for match in re.finditer(r'\[[\s\S]*?\]', content):
+ json_matches.append(match.group())
+
+ # Look for JSON objects
+ for match in re.finditer(r'\{[\s\S]*?\}', content):
+ json_matches.append(match.group())
+
+ for json_str in json_matches:
+ try:
+ data = json.loads(json_str)
+ proxies.extend(_extract_from_json_data(data))
+ except (ValueError, TypeError):
+ continue
+
+ return proxies
+
+
+def _extract_from_json_data(data, parent_proto=None):
+ """Recursively extract proxies from parsed JSON data.
+
+ Returns list of (address, proto) tuples where address may include
+ auth credentials as user:pass@ip:port.
+ """
+ proxies = []
+
+ if isinstance(data, list):
+ for item in data:
+ if isinstance(item, dict):
+ proxies.extend(_extract_from_json_data(item, parent_proto))
+ elif isinstance(item, basestring):
+ # Try to parse as IP:PORT or user:pass@IP:PORT string
+ item = item.strip()
+ if re.match(r'^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$', item):
+ proxies.append((item, parent_proto))
+ elif re.match(r'^[^:]+:[^@]+@[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+$', item):
+ proxies.append((item, parent_proto))
+
+ elif isinstance(data, dict):
+ # Look for ip/port/user/pass fields
+ ip = None
+ port = None
+ proto = parent_proto
+ user = None
+ passwd = None
+
+ for key, value in data.items():
+ key_lower = key.lower()
+ if key_lower in JSON_IP_FIELDS and isinstance(value, basestring):
+ ip = value.strip()
+ elif key_lower in JSON_PORT_FIELDS:
+ try:
+ port = int(value)
+ except (ValueError, TypeError):
+ pass
+ elif key_lower in JSON_PROTO_FIELDS and isinstance(value, basestring):
+ proto = _normalize_proto(value)
+ elif key_lower in JSON_USER_FIELDS and isinstance(value, basestring):
+ user = value.strip()
+ elif key_lower in JSON_PASS_FIELDS and isinstance(value, basestring):
+ passwd = value.strip()
+
+ if ip and port:
+ if user and passwd:
+ addr = '%s:%s@%s:%d' % (user, passwd, ip, port)
+ else:
+ addr = '%s:%d' % (ip, port)
+ proxies.append((addr, proto))
+
+        # Recurse into any nested lists/dicts (e.g. "data", "proxies", "list", "items" wrappers)
+ for key, value in data.items():
+ if isinstance(value, (list, dict)):
+ proxies.extend(_extract_from_json_data(value, proto))
+
+ return proxies
+
+
+def extract_proxies_with_hints(content):
+ """Extract proxies with protocol hints from surrounding context.
+
+ Looks for patterns like:
+ - "socks5 1.2.3.4:8080"
+ - "1.2.3.4:8080 (http)"
+ - "SOCKS5: 1.2.3.4:8080"
+    - Plain-text table rows with the protocol in an adjacent column (e.g. "1.2.3.4:8080 | socks5")
+
+ Returns:
+        Dict mapping address -> normalized proto, for addresses with a recognized protocol hint
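+
+        e.g. extract_proxies_with_hints('SOCKS5: 1.2.3.4:8080') -> {'1.2.3.4:8080': 'socks5'}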
+ """
+ hints = {}
+
+ # Pattern: protocol before IP:PORT
+ for match in PROTO_HINT_PATTERN.finditer(content):
+ proto = _normalize_proto(match.group(1))
+ addr = match.group(2)
+ if proto:
+ hints[addr] = proto
+
+ # Pattern: IP:PORT before protocol
+ for match in PROTO_HINT_REVERSE.finditer(content):
+ addr = match.group(1)
+ proto = _normalize_proto(match.group(2))
+ if proto and addr not in hints:
+ hints[addr] = proto
+
+ return hints
+
+
def cleanhtml(raw_html):
html = raw_html.replace(' ', ' ')
html = re.sub(cleanhtml_re[0], ':', html)
@@ -219,25 +553,90 @@ def valid_port(port):
return port >= 1 and port <= 65535
+def is_valid_ipv6(addr):
+ """Validate IPv6 address format.
+
+ Rejects:
+ - Malformed addresses
+ - Loopback (::1)
+ - Link-local (fe80::/10)
+ - Unique local (fc00::/7)
+ - Multicast (ff00::/8)
+ - Unspecified (::)
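+
+    e.g. accepts '2001:db8::1'; rejects '::1', 'fe80::1', and 'fc00::5'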
+ """
+ # Basic format check - must contain colons, only hex digits and colons
+ if not re.match(r'^[0-9a-fA-F:]+$', addr):
+ return False
+
+    # Segment sanity check: at most one '::' abbreviation; without '::' there must be exactly 8 groups (7 colons)
+ if '::' in addr:
+ if addr.count('::') > 1:
+ return False
+ else:
+ if addr.count(':') != 7:
+ return False
+
+ # Reject special addresses
+ addr_lower = addr.lower()
+
+ # Loopback ::1
+ if addr_lower in ('::1', '0:0:0:0:0:0:0:1'):
+ return False
+
+ # Unspecified ::
+ if addr_lower in ('::', '0:0:0:0:0:0:0:0'):
+ return False
+
+ # Link-local fe80::/10
+ if addr_lower.startswith('fe8') or addr_lower.startswith('fe9') or \
+ addr_lower.startswith('fea') or addr_lower.startswith('feb'):
+ return False
+
+ # Unique local fc00::/7 (fc00:: - fdff::)
+ if addr_lower.startswith('fc') or addr_lower.startswith('fd'):
+ return False
+
+ # Multicast ff00::/8
+ if addr_lower.startswith('ff'):
+ return False
+
+ return True
+
+
def is_usable_proxy(proxy):
"""Validate proxy string format and reject unusable addresses.
+ Accepts formats:
+ - ip:port (IPv4)
+ - [ipv6]:port (IPv6)
+ - user:pass@ip:port
+ - user:pass@[ipv6]:port
+
Rejects:
- - Malformed strings (not ip:port format)
+ - Malformed strings
- Invalid port (0, >65535)
- - Invalid IP octets (>255)
- - Private ranges: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
- - Loopback: 127.0.0.0/8
- - Link-local: 169.254.0.0/16
- - CGNAT: 100.64.0.0/10
- - Multicast: 224.0.0.0/4
- - Reserved: 240.0.0.0/4
- - Unspecified: 0.0.0.0
+    - Private, loopback, link-local, CGNAT, multicast, reserved, and unspecified IPv4 ranges
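+
+    e.g. accepts '1.2.3.4:8080' and 'user:pw@[2001:db8::1]:3128';
+    rejects '192.168.1.1:8080' and '1.2.3.4:0'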
"""
try:
if ':' not in proxy:
return False
+ # Strip auth credentials if present (user:pass@ip:port -> ip:port)
+ if '@' in proxy:
+ proxy = proxy.split('@', 1)[1]
+
+ # Check for IPv6 format: [ipv6]:port
+ if proxy.startswith('['):
+ match = re.match(r'^\[([^\]]+)\]:(\d+)$', proxy)
+ if not match:
+ return False
+ ipv6_addr, port_str = match.groups()
+ port = int(port_str)
+ if not valid_port(port):
+ return False
+ return is_valid_ipv6(ipv6_addr)
+
+ # IPv4 format: ip:port
ip, port_str = proxy.rsplit(':', 1)
port = int(port_str)
@@ -348,33 +747,160 @@ def detect_proto_from_path(url):
return None
+def _normalize_proxy_addr(addr):
+ """Normalize proxy address, handling auth and IPv6 formats.
+
+ Formats:
+ - ip:port
+ - user:pass@ip:port
+ - [ipv6]:port
+ - user:pass@[ipv6]:port
+
+ Returns normalized address or None if invalid.
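+
+    e.g. '001.002.003.004:08080' -> '1.2.3.4:8080',
+         'user:pw@[2001:db8::1]:03128' -> 'user:pw@[2001:db8::1]:3128'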
+ """
+ auth_prefix = ''
+ if '@' in addr:
+ auth_prefix, addr = addr.rsplit('@', 1)
+ auth_prefix += '@'
+
+ if ':' not in addr:
+ return None
+
+ # IPv6 format: [ipv6]:port
+ if addr.startswith('['):
+ match = re.match(r'^\[([^\]]+)\]:(\d+)$', addr)
+ if not match:
+ return None
+ ipv6, port = match.groups()
+ try:
+ port = int(port.lstrip('0') or '0')
+ except ValueError:
+ return None
+ return '%s[%s]:%d' % (auth_prefix, ipv6, port)
+
+ # IPv4 format: ip:port
+ ip, port = addr.rsplit(':', 1)
+ try:
+ ip = '.'.join(str(int(o)) for o in ip.split('.'))
+ port = int(port.lstrip('0') or '0')
+ except (ValueError, AttributeError):
+ return None
+
+ return '%s%s:%d' % (auth_prefix, ip, port)
+
+
def extract_proxies(content, proxydb=None, filter_known=True, proto=None):
"""Extract and normalize proxy addresses from content.
+ Uses multiple extraction methods (in priority order):
+ 1. Authenticated proxy patterns (user:pass@ip:port)
+ 2. JSON parsing for API responses
+ 3. HTML table parsing with IP/Port/Protocol columns
+ 4. Protocol hints from surrounding text
+ 5. Regex extraction for raw IP:PORT patterns
+ 6. IPv6 regex extraction
+
Args:
content: HTML/text content to parse
proxydb: Database connection for known proxy lookup (optional)
filter_known: If True, filter out known proxies and return new only
- proto: Protocol to assign to all extracted proxies (from source URL)
+ proto: Protocol from source URL (fallback if not detected)
Returns:
If filter_known: (unique_count, new_proxies) tuple
- new_proxies is list of (address, proto) tuples
- If not filter_known: list of (address, proto) tuples
+ new_proxies is list of (address, proto, confidence) tuples
+ If not filter_known: list of (address, proto, confidence) tuples
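+
+    Example (illustrative sketch; plain-text content, no proxydb):
+        extract_proxies('socks5 1.2.3.4:8080 free proxy', filter_known=False)
+        -> [('1.2.3.4:8080', 'socks5', 75)]  # CONFIDENCE_HINT + CONFIDENCE_PROTO_EXPLICIT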
"""
- matches = PROXY_PATTERN.findall(cleanhtml(content))
+ # Dict: address -> (protocol, confidence)
+ # Higher confidence wins; explicit proto upgrades confidence
+ found = {}
- uniques_dict = {}
+ # 1. Extract authenticated proxies first (highest confidence)
+ auth_proxies = extract_auth_proxies(content)
+ for addr, detected_proto in auth_proxies:
+ if is_usable_proxy(addr):
+ addr = _normalize_proxy_addr(addr)
+ if addr:
+ conf = CONFIDENCE_AUTH
+ if detected_proto:
+ conf += CONFIDENCE_PROTO_EXPLICIT
+ if addr not in found or conf > found[addr][1]:
+ found[addr] = (detected_proto, conf)
+
+ # 2. Try JSON extraction (reliable for protocol info)
+ json_proxies = extract_proxies_from_json(content)
+ for addr, detected_proto in json_proxies:
+ if is_usable_proxy(addr):
+ addr = _normalize_proxy_addr(addr)
+ if addr:
+ conf = CONFIDENCE_JSON
+ if detected_proto:
+ conf += CONFIDENCE_PROTO_EXPLICIT
+ if addr not in found or conf > found[addr][1]:
+ found[addr] = (detected_proto, conf)
+
+ # 3. Try HTML table extraction (structured data with protocol columns)
+ table_proxies = extract_proxies_from_table(content)
+ for addr, detected_proto in table_proxies:
+ if is_usable_proxy(addr):
+ addr = _normalize_proxy_addr(addr)
+ if addr:
+ conf = CONFIDENCE_TABLE
+ if detected_proto:
+ conf += CONFIDENCE_PROTO_EXPLICIT
+ if addr not in found or conf > found[addr][1]:
+ found[addr] = (detected_proto, conf)
+
+ # 4. Get protocol hints from content
+ hints = extract_proxies_with_hints(content)
+
+ # 5. Regex extraction for remaining IPv4 proxies (no auth)
+ matches = PROXY_PATTERN.findall(cleanhtml(content))
for p in matches:
ip, port = p.split(':')
# Normalize IP (remove leading zeros from octets)
ip = '.'.join(str(int(octet)) for octet in ip.split('.'))
# Normalize port (remove leading zeros, handle empty case)
port = int(port.lstrip('0') or '0')
- p = '%s:%s' % (ip, port)
- uniques_dict[p] = True
+ addr = '%s:%d' % (ip, port)
- uniques = [(p, proto) for p in uniques_dict.keys() if is_usable_proxy(p)]
+ if not is_usable_proxy(addr):
+ continue
+
+ if addr not in found:
+            # Check for protocol hint, keyed on the raw match first, then the normalized address
+ detected_proto = hints.get(p) or hints.get(addr)
+ if detected_proto:
+ conf = CONFIDENCE_HINT + CONFIDENCE_PROTO_EXPLICIT
+ else:
+ conf = CONFIDENCE_REGEX
+ found[addr] = (detected_proto, conf)
+
+ # 6. Regex extraction for IPv6 proxies [ipv6]:port
+ for match in IPV6_PROXY_PATTERN.finditer(content):
+ ipv6, port = match.groups()
+ port = int(port)
+
+ if not is_valid_ipv6(ipv6):
+ continue
+
+ if not valid_port(port):
+ continue
+
+ addr = '[%s]:%d' % (ipv6, port)
+ if addr not in found:
+ found[addr] = (None, CONFIDENCE_REGEX)
+
+ # Build result list with protocol and confidence
+ # Protocol priority: detected > URL-based > None
+ uniques = []
+ for addr in found:
+ detected_proto, conf = found[addr]
+ final_proto = detected_proto if detected_proto else proto
+ # Add URL proto bonus if proto was inferred from path
+ if not detected_proto and proto:
+ conf += CONFIDENCE_PROTO_INFERRED
+ uniques.append((addr, final_proto, conf))
if not filter_known:
return uniques
@@ -384,9 +910,9 @@ def extract_proxies(content, proxydb=None, filter_known=True, proto=None):
init_known_proxies(proxydb)
new = []
- for p, pr in uniques:
+ for p, pr, conf in uniques:
if not is_known_proxy(p):
- new.append((p, pr))
+ new.append((p, pr, conf))
add_known_proxies([p])
return len(uniques), new