#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Export working proxies to various formats.

Usage:
    python export.py                      # Export all working proxies (txt)
    python export.py --format json        # Export as JSON
    python export.py --format csv         # Export as CSV
    python export.py --proto socks5       # Filter by protocol
    python export.py --country US,DE      # Filter by country codes
    python export.py --limit 100          # Limit output count
    python export.py --sort latency       # Sort by latency (fastest first)
    python export.py --anonymity elite    # Filter by anonymity level
    python export.py --max-latency 1000   # Max latency in ms
    python export.py --include-failed     # Include failed proxies too
"""

from __future__ import print_function

import argparse
import json
import sys
import os

# Put this script's own directory on the import path so the sibling
# mysqlite module resolves regardless of the current working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mysqlite


def get_proxies(db_path, filters=None):
    """Query proxies from database with optional filters.

    Args:
        db_path: Path to SQLite database
        filters: Dict with optional keys:
            - proto: Protocol filter (socks4, socks5, http)
            - country: Country code(s) to include, comma-separated;
              blank entries (e.g. a trailing comma) are ignored
            - anonymity: Anonymity level (transparent, anonymous, elite)
            - max_latency: Maximum latency in milliseconds
            - include_failed: Include proxies with failed > 0
            - sort: Sort order (latency, added, tested, success);
              unknown or missing values fall back to 'added'
            - limit: Maximum number of results

    Returns:
        List of proxy dicts with keys: proto, ip, port, address, country,
        latency_ms, anonymity, success_count, failed, added, tested, asn.
    """
    if filters is None:
        filters = {}

    # NOTE(review): the handle is never closed explicitly; confirm whether
    # mysqlite exposes a close() that should be called here.
    db = mysqlite.mysqlite(db_path)

    # Build query
    conditions = []
    params = []

    # By default, only working proxies
    if not filters.get('include_failed'):
        conditions.append('failed = 0')

    # Protocol filter
    if filters.get('proto'):
        conditions.append('proto = ?')
        params.append(filters['proto'])

    # Country filter (can be comma-separated)
    if filters.get('country'):
        countries = [c.strip().upper() for c in filters['country'].split(',')]
        # Drop blank tokens so "US," or "US,,DE" does not add '' to the
        # IN list; skip the clause entirely when nothing usable remains.
        countries = [c for c in countries if c]
        if countries:
            placeholders = ','.join('?' * len(countries))
            conditions.append('country IN (%s)' % placeholders)
            params.extend(countries)

    # Anonymity filter
    if filters.get('anonymity'):
        conditions.append('anonymity = ?')
        params.append(filters['anonymity'])

    # Max latency filter
    if filters.get('max_latency'):
        conditions.append('avg_latency > 0 AND avg_latency <= ?')
        params.append(filters['max_latency'])

    # Build WHERE clause
    where = ' AND '.join(conditions) if conditions else '1=1'

    # Sort order. ORDER BY cannot be bound as a parameter, so the value is
    # only ever taken from this fixed whitelist, never from user input.
    sort_map = {
        'latency': 'avg_latency ASC',
        'added': 'added DESC',
        'tested': 'tested DESC',
        'success': 'success_count DESC',
    }
    order = sort_map.get(filters.get('sort'), 'added DESC')

    # Limit (coerced through int() before being interpolated)
    limit = ''
    if filters.get('limit'):
        limit = ' LIMIT %d' % int(filters['limit'])

    query = '''
        SELECT proto, ip, port, country, avg_latency, anonymity,
               success_count, failed, added, tested, asn
        FROM proxylist
        WHERE %s
        ORDER BY %s%s
    ''' % (where, order, limit)

    rows = db.execute(query, tuple(params) if params else None).fetchall()

    proxies = []
    for row in rows:
        (proto, ip, port, country, latency, anonymity,
         success, failed, added, tested, asn) = row
        proxies.append({
            'proto': proto or 'http',
            'ip': ip,
            'port': port,
            # None marks a row that cannot be exported (missing ip or port)
            'address': '%s:%d' % (ip, port) if ip and port else None,
            'country': country,
            'latency_ms': round(latency, 1) if latency else None,
            'anonymity': anonymity,
            'success_count': success or 0,
            'failed': failed or 0,
            'added': added,
            'tested': tested,
            'asn': asn,
        })

    return proxies


def format_txt(proxies, include_proto=True):
    """Format proxies as plain text, one per line.

    Entries without an address are skipped. With include_proto the line is
    "proto://ip:port", otherwise just "ip:port".
    """
    lines = []
    for p in proxies:
        if not p['address']:
            continue
        if include_proto:
            lines.append('%s://%s' % (p['proto'], p['address']))
        else:
            lines.append(p['address'])
    return '\n'.join(lines)


def format_json(proxies, pretty=False):
    """Format proxies as JSON array.

    Only proto, address, country, latency_ms and anonymity are emitted;
    entries without an address are skipped. With pretty the output is
    indented and keys are sorted.
    """
    # Clean up for JSON output
    output = []
    for p in proxies:
        if not p['address']:
            continue
        output.append({
            'proto': p['proto'],
            'address': p['address'],
            'country': p['country'],
            'latency_ms': p['latency_ms'],
            'anonymity': p['anonymity'],
        })

    if pretty:
        return json.dumps(output, indent=2, sort_keys=True)
    return json.dumps(output)


def format_csv(proxies):
    """Format proxies as CSV.

    The first line is the header. Entries without an address (missing ip
    or port) are skipped, matching the other formatters; missing country,
    latency or anonymity values become empty fields.
    """
    lines = ['proto,ip,port,country,latency_ms,anonymity']
    for p in proxies:
        # Guard on address rather than ip alone: a row with an ip but no
        # port would otherwise crash the %d conversion below.
        if not p['address']:
            continue
        lines.append('%s,%s,%d,%s,%s,%s' % (
            p['proto'],
            p['ip'],
            p['port'],
            p['country'] or '',
            p['latency_ms'] if p['latency_ms'] else '',
            p['anonymity'] or '',
        ))
    return '\n'.join(lines)


def format_len_prefixed(proxies):
    """Format as length-prefixed text (for streaming parsers).

    Each line is "<len>:<proto>://<ip>:<port>", where <len> is the length
    of the part after the colon. Entries without an address are skipped.
    """
    lines = []
    for p in proxies:
        if not p['address']:
            continue
        entry = '%s://%s' % (p['proto'], p['address'])
        lines.append('%d:%s' % (len(entry), entry))
    return '\n'.join(lines)


def main():
    """Parse command-line arguments, query the database and write the export.

    Status and error messages go to stderr so stdout carries only the
    exported data. Exits with status 1 when the database file is missing.
    """
    parser = argparse.ArgumentParser(
        description='Export working proxies from PPF database',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  %(prog)s                              Export all working proxies
  %(prog)s -f json --pretty             Export as formatted JSON
  %(prog)s --proto socks5 --limit 50    Top 50 SOCKS5 proxies
  %(prog)s --country US,GB,DE           Proxies from specific countries
  %(prog)s --sort latency --limit 100   100 fastest proxies
  %(prog)s --anonymity elite            Only elite/high-anon proxies
'''
    )

    parser.add_argument('-d', '--database', default='data/proxies.sqlite',
                        help='Path to proxies database (default: data/proxies.sqlite)')
    parser.add_argument('-f', '--format', choices=['txt', 'json', 'csv', 'len'],
                        default='txt', help='Output format (default: txt)')
    parser.add_argument('--pretty', action='store_true',
                        help='Pretty-print JSON output')
    parser.add_argument('--no-proto', action='store_true',
                        help='Omit protocol prefix in txt format (output ip:port only)')
    parser.add_argument('-p', '--proto', choices=['http', 'socks4', 'socks5'],
                        help='Filter by protocol')
    parser.add_argument('-c', '--country',
                        help='Filter by country code(s), comma-separated (e.g., US,DE,GB)')
    parser.add_argument('-a', '--anonymity', choices=['transparent', 'anonymous', 'elite'],
                        help='Filter by anonymity level')
    parser.add_argument('--max-latency', type=float,
                        help='Maximum latency in milliseconds')
    parser.add_argument('-n', '--limit', type=int,
                        help='Maximum number of proxies to export')
    parser.add_argument('-s', '--sort', choices=['latency', 'added', 'tested', 'success'],
                        help='Sort order (default: added)')
    parser.add_argument('--include-failed', action='store_true',
                        help='Include proxies that have failed tests')
    parser.add_argument('-o', '--output',
                        help='Output file (default: stdout)')
    parser.add_argument('-q', '--quiet', action='store_true',
                        help='Suppress status messages')

    args = parser.parse_args()

    # Check database exists
    if not os.path.exists(args.database):
        sys.stderr.write('error: database not found: %s\n' % args.database)
        sys.exit(1)

    # Build filters (only options the user actually supplied)
    filters = {}
    if args.proto:
        filters['proto'] = args.proto
    if args.country:
        filters['country'] = args.country
    if args.anonymity:
        filters['anonymity'] = args.anonymity
    if args.max_latency:
        filters['max_latency'] = args.max_latency
    if args.limit:
        filters['limit'] = args.limit
    if args.sort:
        filters['sort'] = args.sort
    if args.include_failed:
        filters['include_failed'] = True

    # Query proxies
    proxies = get_proxies(args.database, filters)

    if not args.quiet:
        sys.stderr.write('info: found %d proxies\n' % len(proxies))

    # Format output. argparse restricts --format to exactly these keys, so
    # the lookup cannot miss and the result is always bound.
    formatters = {
        'txt': lambda items: format_txt(items, include_proto=not args.no_proto),
        'json': lambda items: format_json(items, pretty=args.pretty),
        'csv': format_csv,
        'len': format_len_prefixed,
    }
    output = formatters[args.format](proxies)

    # Write output
    if args.output:
        with open(args.output, 'w') as f:
            f.write(output)
            f.write('\n')
        if not args.quiet:
            sys.stderr.write('info: wrote %s\n' % args.output)
    else:
        print(output)


if __name__ == '__main__':
    main()