add export.py for proxy list export

CLI tool for exporting working proxies:
- multiple formats: txt, json, csv, length-prefixed
- filters: protocol, country, anonymity, max latency
- sorting: latency, added time, success count
- configurable output limit

Also update .gitignore to exclude data/ directory
This commit is contained in:
Username
2025-12-23 17:34:51 +01:00
parent eb1bba0e13
commit 20fc1b01fd
2 changed files with 310 additions and 2 deletions

3
.gitignore vendored
View File

@@ -6,5 +6,4 @@ __pycache__/
*.sqlite-shm
*.sqlite-wal
.claude/
data/*.BIN
data/*.dat
data/

309
export.py Normal file
View File

@@ -0,0 +1,309 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""Export working proxies to various formats.
Usage:
python export.py # Export all working proxies (txt)
python export.py --format json # Export as JSON
python export.py --format csv # Export as CSV
python export.py --proto socks5 # Filter by protocol
python export.py --country US,DE # Filter by country codes
python export.py --limit 100 # Limit output count
python export.py --sort latency # Sort by latency (fastest first)
python export.py --anonymity elite # Filter by anonymity level
python export.py --max-latency 1000 # Max latency in ms
python export.py --include-failed # Include failed proxies too
"""
from __future__ import print_function
import argparse
import json
import sys
import os
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import mysqlite
def get_proxies(db_path, filters=None):
    """Fetch proxy rows from the SQLite database, applying optional filters.

    Args:
        db_path: Path to the SQLite database file.
        filters: Optional dict; recognized keys:
            proto          -- protocol filter (socks4, socks5, http)
            country        -- comma-separated country codes to include
            anonymity      -- anonymity level (transparent, anonymous, elite)
            max_latency    -- maximum average latency in milliseconds
            include_failed -- also return proxies with failed > 0
            sort           -- sort order (latency, added, tested, success)
            limit          -- maximum number of rows

    Returns:
        List of dicts, one per proxy row.
    """
    opts = filters if filters is not None else {}
    db = mysqlite.mysqlite(db_path)

    conditions = []
    params = []

    # Working proxies only, unless the caller explicitly opts in to failures.
    if not opts.get('include_failed'):
        conditions.append('failed = 0')

    proto = opts.get('proto')
    if proto:
        conditions.append('proto = ?')
        params.append(proto)

    # Country may be a comma-separated list; normalize to upper-case codes.
    country = opts.get('country')
    if country:
        wanted = [code.strip().upper() for code in country.split(',')]
        conditions.append('country IN (%s)' % ','.join('?' * len(wanted)))
        params.extend(wanted)

    anonymity = opts.get('anonymity')
    if anonymity:
        conditions.append('anonymity = ?')
        params.append(anonymity)

    # avg_latency of 0 means "never measured", so exclude those rows too.
    max_latency = opts.get('max_latency')
    if max_latency:
        conditions.append('avg_latency > 0 AND avg_latency <= ?')
        params.append(max_latency)

    where = ' AND '.join(conditions) or '1=1'
    order = {
        'latency': 'avg_latency ASC',
        'added': 'added DESC',
        'tested': 'tested DESC',
        'success': 'success_count DESC',
    }.get(opts.get('sort'), 'added DESC')
    limit = ' LIMIT %d' % int(opts['limit']) if opts.get('limit') else ''

    query = '''
    SELECT proto, ip, port, country, avg_latency, anonymity,
           success_count, failed, added, tested, asn
    FROM proxylist
    WHERE %s
    ORDER BY %s%s
    ''' % (where, order, limit)

    rows = db.execute(query, tuple(params) if params else None).fetchall()

    result = []
    for (proto, ip, port, country, latency, anonymity,
         success, failed, added, tested, asn) in rows:
        result.append({
            'proto': proto or 'http',
            'ip': ip,
            'port': port,
            'address': '%s:%d' % (ip, port) if ip and port else None,
            'country': country,
            'latency_ms': round(latency, 1) if latency else None,
            'anonymity': anonymity,
            'success_count': success or 0,
            'failed': failed or 0,
            'added': added,
            'tested': tested,
            'asn': asn,
        })
    return result
def format_txt(proxies, include_proto=True):
    """Render proxies as plain text, one `proto://ip:port` (or `ip:port`) per line.

    Entries without a usable address are skipped.
    """
    out = []
    for entry in proxies:
        addr = entry['address']
        if not addr:
            continue
        out.append('%s://%s' % (entry['proto'], addr) if include_proto else addr)
    return '\n'.join(out)
def format_json(proxies, pretty=False):
    """Render proxies as a JSON array of trimmed records.

    Only the public fields (proto, address, country, latency_ms, anonymity)
    are emitted; entries without an address are skipped. When `pretty` is
    true the output is indented with sorted keys.
    """
    records = [
        {
            'proto': p['proto'],
            'address': p['address'],
            'country': p['country'],
            'latency_ms': p['latency_ms'],
            'anonymity': p['anonymity'],
        }
        for p in proxies if p['address']
    ]
    if pretty:
        return json.dumps(records, indent=2, sort_keys=True)
    return json.dumps(records)
def format_csv(proxies):
    """Render proxies as CSV with a fixed header row.

    Rows missing an ip OR a port are skipped: the original guard only
    checked `ip`, so a row with a NULL port crashed on `'%d' % None`.
    A latency of exactly 0.0 is now emitted instead of being blanked
    (`is not None` instead of a truthiness test).
    """
    lines = ['proto,ip,port,country,latency_ms,anonymity']
    for p in proxies:
        # '%d' below requires an integer port; skip incomplete rows.
        if not p['ip'] or p['port'] is None:
            continue
        latency = p['latency_ms']
        lines.append('%s,%s,%d,%s,%s,%s' % (
            p['proto'],
            p['ip'],
            p['port'],
            p['country'] or '',
            latency if latency is not None else '',
            p['anonymity'] or '',
        ))
    return '\n'.join(lines)
def format_len_prefixed(proxies):
    """Render proxies as length-prefixed lines (`<len>:<proto>://<addr>`).

    The numeric prefix lets streaming parsers read each record without
    scanning for delimiters; entries without an address are skipped.
    """
    urls = ('%s://%s' % (p['proto'], p['address'])
            for p in proxies if p['address'])
    return '\n'.join('%d:%s' % (len(url), url) for url in urls)
def main():
    """CLI entry point: parse arguments, query the DB, emit formatted output."""
    parser = argparse.ArgumentParser(
        description='Export working proxies from PPF database',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  %(prog)s                              Export all working proxies
  %(prog)s -f json --pretty             Export as formatted JSON
  %(prog)s --proto socks5 --limit 50    Top 50 SOCKS5 proxies
  %(prog)s --country US,GB,DE           Proxies from specific countries
  %(prog)s --sort latency --limit 100   100 fastest proxies
  %(prog)s --anonymity elite            Only elite/high-anon proxies
''')
    parser.add_argument('-d', '--database', default='data/proxies.sqlite',
                        help='Path to proxies database (default: data/proxies.sqlite)')
    parser.add_argument('-f', '--format', choices=['txt', 'json', 'csv', 'len'],
                        default='txt', help='Output format (default: txt)')
    parser.add_argument('--pretty', action='store_true',
                        help='Pretty-print JSON output')
    parser.add_argument('--no-proto', action='store_true',
                        help='Omit protocol prefix in txt format (output ip:port only)')
    parser.add_argument('-p', '--proto', choices=['http', 'socks4', 'socks5'],
                        help='Filter by protocol')
    parser.add_argument('-c', '--country',
                        help='Filter by country code(s), comma-separated (e.g., US,DE,GB)')
    parser.add_argument('-a', '--anonymity',
                        choices=['transparent', 'anonymous', 'elite'],
                        help='Filter by anonymity level')
    parser.add_argument('--max-latency', type=float,
                        help='Maximum latency in milliseconds')
    parser.add_argument('-n', '--limit', type=int,
                        help='Maximum number of proxies to export')
    parser.add_argument('-s', '--sort',
                        choices=['latency', 'added', 'tested', 'success'],
                        help='Sort order (default: added)')
    parser.add_argument('--include-failed', action='store_true',
                        help='Include proxies that have failed tests')
    parser.add_argument('-o', '--output',
                        help='Output file (default: stdout)')
    parser.add_argument('-q', '--quiet', action='store_true',
                        help='Suppress status messages')
    args = parser.parse_args()

    # Fail early if the database file is missing.
    if not os.path.exists(args.database):
        sys.stderr.write('error: database not found: %s\n' % args.database)
        sys.exit(1)

    # Only truthy argument values become filters (mirrors the truthiness
    # checks get_proxies applies to its filter dict).
    filters = {}
    for key in ('proto', 'country', 'anonymity', 'max_latency',
                'limit', 'sort', 'include_failed'):
        value = getattr(args, key)
        if value:
            filters[key] = value

    proxies = get_proxies(args.database, filters)
    if not args.quiet:
        sys.stderr.write('info: found %d proxies\n' % len(proxies))

    # Dispatch table keyed by the --format choice (argparse guarantees
    # the key exists).
    renderers = {
        'txt': lambda: format_txt(proxies, include_proto=not args.no_proto),
        'json': lambda: format_json(proxies, pretty=args.pretty),
        'csv': lambda: format_csv(proxies),
        'len': lambda: format_len_prefixed(proxies),
    }
    output = renderers[args.format]()

    if args.output:
        with open(args.output, 'w') as fh:
            fh.write(output)
            fh.write('\n')
        if not args.quiet:
            sys.stderr.write('info: wrote %s\n' % args.output)
    else:
        print(output)


if __name__ == '__main__':
    main()