#!/usr/bin/env python2 """MITM certificate detection and tracking for PPF.""" from __future__ import division import threading import time from misc import _log, tor_proxy_url import rocksock class MITMCertStats(object): """Track MITM certificate statistics.""" def __init__(self): self.lock = threading.Lock() self.certs = {} # fingerprint -> cert_info dict self.by_org = {} # organization -> count self.by_issuer = {} # issuer CN -> count self.by_proxy = {} # proxy IP -> list of fingerprints self.total_count = 0 self.recent_certs = [] # last N certificates seen def add_cert(self, proxy_ip, cert_info): """Add a MITM certificate to statistics.""" if not cert_info: return fp = cert_info.get('fingerprint', '') if not fp: return with self.lock: self.total_count += 1 # Store unique certs by fingerprint if fp not in self.certs: self.certs[fp] = cert_info self.certs[fp]['first_seen'] = time.time() self.certs[fp]['count'] = 1 self.certs[fp]['proxies'] = [proxy_ip] else: self.certs[fp]['count'] += 1 self.certs[fp]['last_seen'] = time.time() if proxy_ip not in self.certs[fp]['proxies']: self.certs[fp]['proxies'].append(proxy_ip) # Track by organization org = cert_info.get('subject_o', 'Unknown') self.by_org[org] = self.by_org.get(org, 0) + 1 # Track by issuer issuer = cert_info.get('issuer_cn', 'Unknown') self.by_issuer[issuer] = self.by_issuer.get(issuer, 0) + 1 # Track proxies using this cert if proxy_ip not in self.by_proxy: self.by_proxy[proxy_ip] = [] if fp not in self.by_proxy[proxy_ip]: self.by_proxy[proxy_ip].append(fp) # Keep recent certs (last 50) self.recent_certs.append({ 'fingerprint': fp, 'proxy': proxy_ip, 'subject_cn': cert_info.get('subject_cn', ''), 'issuer_cn': cert_info.get('issuer_cn', ''), 'timestamp': time.time() }) if len(self.recent_certs) > 50: self.recent_certs.pop(0) def get_stats(self): """Get MITM certificate statistics for API.""" with self.lock: # Top organizations top_orgs = sorted(self.by_org.items(), key=lambda x: x[1], reverse=True)[:10] # Top issuers top_issuers = sorted(self.by_issuer.items(), key=lambda x: x[1], reverse=True)[:10] # Unique certs sorted by count unique_certs = [] for fp, info in self.certs.items(): cert_entry = {'fingerprint': fp} cert_entry.update(info) unique_certs.append(cert_entry) unique_certs = sorted(unique_certs, key=lambda x: x.get('count', 0), reverse=True)[:20] return { 'total_detections': self.total_count, 'unique_certs': len(self.certs), 'unique_proxies': len(self.by_proxy), 'top_organizations': [{'name': o, 'count': c} for o, c in top_orgs], 'top_issuers': [{'name': i, 'count': c} for i, c in top_issuers], 'certificates': unique_certs, 'recent': list(self.recent_certs[-20:]) } def save_state(self, filepath): """Save MITM stats to JSON file for persistence.""" import json with self.lock: state = { 'certs': self.certs, 'by_org': self.by_org, 'by_issuer': self.by_issuer, 'by_proxy': self.by_proxy, 'total_count': self.total_count, 'recent_certs': self.recent_certs[-50:], } try: with open(filepath, 'w') as f: json.dump(state, f) except Exception as e: _log('failed to save MITM state: %s' % str(e), 'warn') def load_state(self, filepath): """Load MITM stats from JSON file.""" import json try: with open(filepath, 'r') as f: state = json.load(f) with self.lock: self.certs = state.get('certs', {}) self.by_org = state.get('by_org', {}) self.by_issuer = state.get('by_issuer', {}) self.by_proxy = state.get('by_proxy', {}) self.total_count = state.get('total_count', 0) self.recent_certs = state.get('recent_certs', []) _log('restored MITM state: %d certs, %d detections' % ( len(self.certs), self.total_count), 'info') except IOError: pass # File doesn't exist yet, start fresh except Exception as e: _log('failed to load MITM state: %s' % str(e), 'warn') def extract_cert_info(cert_der): """Extract certificate information from DER-encoded certificate. Args: cert_der: DER-encoded certificate bytes Returns: dict with certificate details or None on failure """ import hashlib try: # Decode DER to get certificate details # Python 2/3 compatible approach using ssl module from OpenSSL import crypto x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert_der) subject = x509.get_subject() issuer = x509.get_issuer() # Parse dates (format: YYYYMMDDhhmmssZ) not_before = x509.get_notBefore() not_after = x509.get_notAfter() if isinstance(not_before, bytes): not_before = not_before.decode('ascii') if isinstance(not_after, bytes): not_after = not_after.decode('ascii') # Calculate fingerprint fp = hashlib.sha256(cert_der).hexdigest() return { 'fingerprint': fp[:16], # Short fingerprint for display 'fingerprint_full': fp, 'subject_cn': subject.CN or '', 'subject_o': subject.O or '', 'subject_ou': subject.OU or '', 'subject_c': subject.C or '', 'issuer_cn': issuer.CN or '', 'issuer_o': issuer.O or '', 'serial': str(x509.get_serial_number()), 'not_before': not_before, 'not_after': not_after, 'version': x509.get_version(), 'sig_algo': x509.get_signature_algorithm().decode('ascii') if hasattr(x509.get_signature_algorithm(), 'decode') else str(x509.get_signature_algorithm()), } except ImportError: # Fallback if pyOpenSSL not available - basic info from hashlib import hashlib fp = hashlib.sha256(cert_der).hexdigest() return { 'fingerprint': fp[:16], 'fingerprint_full': fp, 'subject_cn': '(pyOpenSSL not installed)', 'subject_o': '', 'issuer_cn': '', 'issuer_o': '', 'serial': '', 'not_before': '', 'not_after': '', } except Exception: return None def get_mitm_certificate(proxy_ip, proxy_port, proto, torhost, target_host, target_port, timeout, auth=None): """Connect to target through proxy without cert verification to get MITM cert. Args: proxy_ip: Proxy IP address proxy_port: Proxy port proto: Proxy protocol (http, socks4, socks5) torhost: Tor SOCKS5 address target_host: Target host for SSL connection target_port: Target port (usually 443) timeout: Connection timeout auth: Optional auth credentials (user:pass) Returns: dict with certificate info or None on failure """ try: if auth: proxy_url = '%s://%s@%s:%s' % (proto, auth, proxy_ip, proxy_port) else: proxy_url = '%s://%s:%s' % (proto, proxy_ip, proxy_port) proxies = [ rocksock.RocksockProxyFromURL(tor_proxy_url(torhost)), rocksock.RocksockProxyFromURL(proxy_url), ] # Connect without certificate verification sock = rocksock.Rocksock(host=target_host, port=target_port, ssl=True, proxies=proxies, timeout=timeout, verifycert=False) sock.connect() # Get peer certificate cert_der = sock.sock.getpeercert(binary_form=True) sock.disconnect() if cert_der: return extract_cert_info(cert_der) return None except Exception: return None