Files
ppf/mitm.py
Username 0d7d2dce70
All checks were successful
CI / syntax-check (push) Successful in 3s
CI / memory-leak-check (push) Successful in 11s
refactor: extract modules from proxywatchd.py
Extract focused modules to reduce proxywatchd.py complexity:
- stats.py: JudgeStats, Stats, regexes, ssl_targets (557 lines)
- mitm.py: MITMCertStats, cert extraction functions (239 lines)
- dns.py: socks4_resolve with TTL caching (86 lines)
- job.py: PriorityJobQueue, calculate_priority (103 lines)

proxywatchd.py reduced from 2488 to 1591 lines (-36%).
2025-12-28 15:45:24 +01:00

240 lines
8.6 KiB
Python

#!/usr/bin/env python2
"""MITM certificate detection and tracking for PPF."""
from __future__ import division
import threading
import time
from misc import _log, tor_proxy_url
import rocksock
class MITMCertStats(object):
"""Track MITM certificate statistics."""
def __init__(self):
self.lock = threading.Lock()
self.certs = {} # fingerprint -> cert_info dict
self.by_org = {} # organization -> count
self.by_issuer = {} # issuer CN -> count
self.by_proxy = {} # proxy IP -> list of fingerprints
self.total_count = 0
self.recent_certs = [] # last N certificates seen
def add_cert(self, proxy_ip, cert_info):
"""Add a MITM certificate to statistics."""
if not cert_info:
return
fp = cert_info.get('fingerprint', '')
if not fp:
return
with self.lock:
self.total_count += 1
# Store unique certs by fingerprint
if fp not in self.certs:
self.certs[fp] = cert_info
self.certs[fp]['first_seen'] = time.time()
self.certs[fp]['count'] = 1
self.certs[fp]['proxies'] = [proxy_ip]
else:
self.certs[fp]['count'] += 1
self.certs[fp]['last_seen'] = time.time()
if proxy_ip not in self.certs[fp]['proxies']:
self.certs[fp]['proxies'].append(proxy_ip)
# Track by organization
org = cert_info.get('subject_o', 'Unknown')
self.by_org[org] = self.by_org.get(org, 0) + 1
# Track by issuer
issuer = cert_info.get('issuer_cn', 'Unknown')
self.by_issuer[issuer] = self.by_issuer.get(issuer, 0) + 1
# Track proxies using this cert
if proxy_ip not in self.by_proxy:
self.by_proxy[proxy_ip] = []
if fp not in self.by_proxy[proxy_ip]:
self.by_proxy[proxy_ip].append(fp)
# Keep recent certs (last 50)
self.recent_certs.append({
'fingerprint': fp,
'proxy': proxy_ip,
'subject_cn': cert_info.get('subject_cn', ''),
'issuer_cn': cert_info.get('issuer_cn', ''),
'timestamp': time.time()
})
if len(self.recent_certs) > 50:
self.recent_certs.pop(0)
def get_stats(self):
"""Get MITM certificate statistics for API."""
with self.lock:
# Top organizations
top_orgs = sorted(self.by_org.items(), key=lambda x: x[1], reverse=True)[:10]
# Top issuers
top_issuers = sorted(self.by_issuer.items(), key=lambda x: x[1], reverse=True)[:10]
# Unique certs sorted by count
unique_certs = []
for fp, info in self.certs.items():
cert_entry = {'fingerprint': fp}
cert_entry.update(info)
unique_certs.append(cert_entry)
unique_certs = sorted(unique_certs, key=lambda x: x.get('count', 0), reverse=True)[:20]
return {
'total_detections': self.total_count,
'unique_certs': len(self.certs),
'unique_proxies': len(self.by_proxy),
'top_organizations': [{'name': o, 'count': c} for o, c in top_orgs],
'top_issuers': [{'name': i, 'count': c} for i, c in top_issuers],
'certificates': unique_certs,
'recent': list(self.recent_certs[-20:])
}
def save_state(self, filepath):
"""Save MITM stats to JSON file for persistence."""
import json
with self.lock:
state = {
'certs': self.certs,
'by_org': self.by_org,
'by_issuer': self.by_issuer,
'by_proxy': self.by_proxy,
'total_count': self.total_count,
'recent_certs': self.recent_certs[-50:],
}
try:
with open(filepath, 'w') as f:
json.dump(state, f)
except Exception as e:
_log('failed to save MITM state: %s' % str(e), 'warn')
def load_state(self, filepath):
"""Load MITM stats from JSON file."""
import json
try:
with open(filepath, 'r') as f:
state = json.load(f)
with self.lock:
self.certs = state.get('certs', {})
self.by_org = state.get('by_org', {})
self.by_issuer = state.get('by_issuer', {})
self.by_proxy = state.get('by_proxy', {})
self.total_count = state.get('total_count', 0)
self.recent_certs = state.get('recent_certs', [])
_log('restored MITM state: %d certs, %d detections' % (
len(self.certs), self.total_count), 'info')
except IOError:
pass # File doesn't exist yet, start fresh
except Exception as e:
_log('failed to load MITM state: %s' % str(e), 'warn')
def extract_cert_info(cert_der):
"""Extract certificate information from DER-encoded certificate.
Args:
cert_der: DER-encoded certificate bytes
Returns:
dict with certificate details or None on failure
"""
import hashlib
try:
# Decode DER to get certificate details
# Python 2/3 compatible approach using ssl module
from OpenSSL import crypto
x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der)
subject = x509.get_subject()
issuer = x509.get_issuer()
# Parse dates (format: YYYYMMDDhhmmssZ)
not_before = x509.get_notBefore()
not_after = x509.get_notAfter()
if isinstance(not_before, bytes):
not_before = not_before.decode('ascii')
if isinstance(not_after, bytes):
not_after = not_after.decode('ascii')
# Calculate fingerprint
fp = hashlib.sha256(cert_der).hexdigest()
return {
'fingerprint': fp[:16], # Short fingerprint for display
'fingerprint_full': fp,
'subject_cn': subject.CN or '',
'subject_o': subject.O or '',
'subject_ou': subject.OU or '',
'subject_c': subject.C or '',
'issuer_cn': issuer.CN or '',
'issuer_o': issuer.O or '',
'serial': str(x509.get_serial_number()),
'not_before': not_before,
'not_after': not_after,
'version': x509.get_version(),
'sig_algo': x509.get_signature_algorithm().decode('ascii') if hasattr(x509.get_signature_algorithm(), 'decode') else str(x509.get_signature_algorithm()),
}
except ImportError:
# Fallback if pyOpenSSL not available - basic info from hashlib
import hashlib
fp = hashlib.sha256(cert_der).hexdigest()
return {
'fingerprint': fp[:16],
'fingerprint_full': fp,
'subject_cn': '(pyOpenSSL not installed)',
'subject_o': '',
'issuer_cn': '',
'issuer_o': '',
'serial': '',
'not_before': '',
'not_after': '',
}
except Exception:
return None
def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout, auth=None):
"""Connect to target through proxy without cert verification to get MITM cert.
Args:
proxy_ip: Proxy IP address
proxy_port: Proxy port
proto: Proxy protocol (http, socks4, socks5)
torhost: Tor SOCKS5 address
target_host: Target host for SSL connection
target_port: Target port (usually 443)
timeout: Connection timeout
auth: Optional auth credentials (user:pass)
Returns:
dict with certificate info or None on failure
"""
try:
if auth:
proxy_url = '%s://%s@%s:%s' % (proto, auth, proxy_ip, proxy_port)
else:
proxy_url = '%s://%s:%s' % (proto, proxy_ip, proxy_port)
proxies = [
rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)),
rocksock.RocksockProxyFromURL(proxy_url),
]
# Connect without certificate verification
sock = rocksock.Rocksock(host=target_host, port=target_port, ssl=True,
proxies=proxies, timeout=timeout, verifycert=False)
sock.connect()
# Get peer certificate
cert_der = sock.sock.getpeercert(binary_form=True)
sock.disconnect()
if cert_der:
return extract_cert_info(cert_der)
return None
except Exception:
return None