From e24f68500ce65e3903d218c155e716507a9f2986 Mon Sep 17 00:00:00 2001 From: Username Date: Sat, 20 Dec 2025 23:18:45 +0100 Subject: [PATCH] style: normalize indentation and improve code style - convert tabs to 4-space indentation - add docstrings to modules and classes - remove unused import (copy) - use explicit object inheritance - use 'while True' over 'while 1' - use 'while args' over 'while len(args)' - use '{}' over 'dict()' - consistent string formatting - Python 2/3 compatible Queue import --- comboparse.py | 164 +++--- config.py | 278 +++++----- dbs.py | 119 +++-- fetch.py | 248 ++++----- mysqlite.py | 92 ++-- ppf.py | 372 ++++++------- proxywatchd.py | 1377 ++++++++++++++++++++++++------------------------ soup_parser.py | 126 +++-- 8 files changed, 1434 insertions(+), 1342 deletions(-) mode change 100755 => 100644 ppf.py diff --git a/comboparse.py b/comboparse.py index 4ae5620..550ddbf 100644 --- a/comboparse.py +++ b/comboparse.py @@ -1,74 +1,110 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +"""Combined config file and argument parser.""" + from ConfigParser import SafeConfigParser, NoOptionError from argparse import ArgumentParser import sys -class _Dummy(): - pass + +class _Dummy(object): + """Placeholder for config sections.""" + pass + class ComboParser(object): - def __init__(self, ini): - self.items = [] - self.cparser = SafeConfigParser() - self.aparser = ArgumentParser() - self.ini = ini - self.items = [] - self.loaded = False + """Parse configuration from INI file and command-line arguments. - def add_item(self, section, name, type, default, desc, required): - def str2bool(val): - return val in ['True', 'true', '1', 'yes'] - self.items.append({ - 'section':section, - 'name':name, - 'type':type, - 'default':default, - 'required':required, - }) - self.aparser.add_argument( - '--%s.%s'%(section, name), - help='%s, default: (%s)'%(desc, str(default)), - type=type if type is not bool else str2bool, - default=None, - required=False - ) - def load(self): - if self.loaded: return - self.loaded = True + Command-line arguments override INI file values. + """ - try: - self.cparser.read(self.ini) - except Exception: - pass # config file missing or unreadable, use defaults - args = self.aparser.parse_args() - for item in self.items: - try: - obj = getattr(self, item['section']) - except AttributeError: - setattr(self, item['section'], _Dummy()) - obj = getattr(self, item['section']) + def __init__(self, ini): + self.items = [] + self.cparser = SafeConfigParser() + self.aparser = ArgumentParser() + self.ini = ini + self.loaded = False + self.args = None - setattr(obj, item['name'], item['default']) - inner = getattr(obj, item['name']) + def add_item(self, section, name, type, default, desc, required): + """Add a configuration item.""" + def str2bool(val): + return val.lower() in ('true', '1', 'yes') - item['found'] = True - try: - if item['type'] is bool : inner = self.cparser.getboolean(item['section'], item['name']) - elif item['type'] is float: inner = self.cparser.getfloat(item['section'], item['name']) - elif item['type'] is int : inner = self.cparser.getint(item['section'], item['name']) - elif item['type'] is str : inner = self.cparser.get(item['section'], item['name']) - except NoOptionError: - item['found'] = False - try: - arg = getattr(args, '%s.%s'%(item['section'], item['name'])) - if arg is not None: - inner = arg - item['found'] = True - except AttributeError: - pass # arg not provided on command line - if not item['found']: - if item['required']: - sys.stderr.write('error: required config item "%s" not found in section "%s" of "%s"!\n'%(item['name'], item['section'], self.ini)) - sys.exit(1) - else: - sys.stderr.write('warning: assigned default value of "%s" to "%s.%s"\n'%(str(item['default']), item['section'], item['name'])) - setattr(obj, item['name'], inner) + self.items.append({ + 'section': section, + 'name': name, + 'type': type, + 'default': default, + 'required': required, + }) + self.aparser.add_argument( + '--%s.%s' % (section, name), + help='%s (default: %s)' % (desc, default), + type=type if type is not bool else str2bool, + default=None, + required=False + ) + + def load(self): + """Load configuration from file and command-line.""" + if self.loaded: + return + self.loaded = True + + try: + self.cparser.read(self.ini) + except Exception: + pass # Config file missing or unreadable, use defaults + + self.args = self.aparser.parse_args() + + for item in self.items: + section = item['section'] + name = item['name'] + + # Ensure section object exists + if not hasattr(self, section): + setattr(self, section, _Dummy()) + obj = getattr(self, section) + + # Start with default value + value = item['default'] + found = False + + # Try to read from config file + try: + if item['type'] is bool: + value = self.cparser.getboolean(section, name) + elif item['type'] is float: + value = self.cparser.getfloat(section, name) + elif item['type'] is int: + value = self.cparser.getint(section, name) + elif item['type'] is str: + value = self.cparser.get(section, name) + found = True + except NoOptionError: + pass + + # Command-line overrides config file + arg_name = '%s.%s' % (section, name) + arg_value = getattr(self.args, arg_name, None) + if arg_value is not None: + value = arg_value + found = True + + # Handle missing required items + if not found: + if item['required']: + sys.stderr.write( + 'error: required config item "%s" not found in section "%s" of "%s"\n' + % (name, section, self.ini) + ) + sys.exit(1) + else: + sys.stderr.write( + 'warning: assigned default value of "%s" to "%s.%s"\n' + % (item['default'], section, name) + ) + + setattr(obj, name, value) diff --git a/config.py b/config.py index 2f44fe4..ebe345d 100644 --- a/config.py +++ b/config.py @@ -3,159 +3,159 @@ from misc import set_log_level, _log import os class Config(ComboParser): - def load(self): - super(Config, self).load() - self.torhosts = [ str(i).strip() for i in self.common.tor_hosts.split(',') ] - #with open('servers.txt', 'r') as handle: - with open(self.watchd.source_file, 'r') as handle: - self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0] - # Apply log level from CLI flags - if self.args.quiet: - set_log_level('warn') - elif self.args.verbose: - set_log_level('debug') + def load(self): + super(Config, self).load() + self.torhosts = [ str(i).strip() for i in self.common.tor_hosts.split(',') ] + #with open('servers.txt', 'r') as handle: + with open(self.watchd.source_file, 'r') as handle: + self.servers = [x.strip() for x in handle.readlines() if len(x.strip()) > 0] + # Apply log level from CLI flags + if self.args.quiet: + set_log_level('warn') + elif self.args.verbose: + set_log_level('debug') - def validate(self): - """Validate configuration values. Returns list of errors.""" - errors = [] - warnings = [] + def validate(self): + """Validate configuration values. Returns list of errors.""" + errors = [] + warnings = [] - # Validate port numbers - if not 1 <= self.httpd.port <= 65535: - errors.append('httpd.port must be 1-65535, got %d' % self.httpd.port) + # Validate port numbers + if not 1 <= self.httpd.port <= 65535: + errors.append('httpd.port must be 1-65535, got %d' % self.httpd.port) - # Validate timeouts (must be positive) - if self.common.timeout_connect <= 0: - errors.append('common.timeout_connect must be > 0') - if self.common.timeout_read <= 0: - errors.append('common.timeout_read must be > 0') - if self.watchd.timeout <= 0: - errors.append('watchd.timeout must be > 0') - if self.ppf.timeout <= 0: - errors.append('ppf.timeout must be > 0') + # Validate timeouts (must be positive) + if self.common.timeout_connect <= 0: + errors.append('common.timeout_connect must be > 0') + if self.common.timeout_read <= 0: + errors.append('common.timeout_read must be > 0') + if self.watchd.timeout <= 0: + errors.append('watchd.timeout must be > 0') + if self.ppf.timeout <= 0: + errors.append('ppf.timeout must be > 0') - # Validate thread counts - if self.watchd.threads < 1: - errors.append('watchd.threads must be >= 1') - if self.ppf.threads < 1: - errors.append('ppf.threads must be >= 1') + # Validate thread counts + if self.watchd.threads < 1: + errors.append('watchd.threads must be >= 1') + if self.ppf.threads < 1: + errors.append('ppf.threads must be >= 1') - # Validate max_fail - if self.watchd.max_fail < 1: - errors.append('watchd.max_fail must be >= 1') - if self.ppf.max_fail < 1: - errors.append('ppf.max_fail must be >= 1') + # Validate max_fail + if self.watchd.max_fail < 1: + errors.append('watchd.max_fail must be >= 1') + if self.ppf.max_fail < 1: + errors.append('ppf.max_fail must be >= 1') - # Validate engine names - valid_engines = {'duckduckgo', 'startpage', 'brave', 'ecosia', - 'mojeek', 'qwant', 'yandex', 'github', 'gitlab', - 'codeberg', 'gitea', 'searx'} - configured = [e.strip().lower() for e in self.scraper.engines.split(',')] - for eng in configured: - if eng and eng not in valid_engines: - warnings.append('unknown engine: %s' % eng) + # Validate engine names + valid_engines = {'duckduckgo', 'startpage', 'brave', 'ecosia', + 'mojeek', 'qwant', 'yandex', 'github', 'gitlab', + 'codeberg', 'gitea', 'searx'} + configured = [e.strip().lower() for e in self.scraper.engines.split(',')] + for eng in configured: + if eng and eng not in valid_engines: + warnings.append('unknown engine: %s' % eng) - # Validate source_file exists - if not os.path.exists(self.watchd.source_file): - warnings.append('source_file not found: %s' % self.watchd.source_file) + # Validate source_file exists + if not os.path.exists(self.watchd.source_file): + warnings.append('source_file not found: %s' % self.watchd.source_file) - # Validate database directories are writable - for db in (self.watchd.database, self.ppf.database): - db_dir = os.path.dirname(db) or '.' - if not os.access(db_dir, os.W_OK): - errors.append('database directory not writable: %s' % db_dir) + # Validate database directories are writable + for db in (self.watchd.database, self.ppf.database): + db_dir = os.path.dirname(db) or '.' + if not os.access(db_dir, os.W_OK): + errors.append('database directory not writable: %s' % db_dir) - # Log warnings - for w in warnings: - _log(w, 'warn') + # Log warnings + for w in warnings: + _log(w, 'warn') - return errors - def __init__(self): - super(Config, self).__init__('config.ini') - section = 'common' - self.add_item(section, 'tor_hosts', str, '127.0.0.1:9050', 'comma-separated list of tor proxy address(es)', True) - self.add_item(section, 'timeout_connect', int, 10, 'connection timeout in seconds (default: 10)', False) - self.add_item(section, 'timeout_read', int, 15, 'read timeout in seconds (default: 15)', False) + return errors + def __init__(self): + super(Config, self).__init__('config.ini') + section = 'common' + self.add_item(section, 'tor_hosts', str, '127.0.0.1:9050', 'comma-separated list of tor proxy address(es)', True) + self.add_item(section, 'timeout_connect', int, 10, 'connection timeout in seconds (default: 10)', False) + self.add_item(section, 'timeout_read', int, 15, 'read timeout in seconds (default: 15)', False) - section = 'watchd' - self.add_item(section, 'outage_threshold', float, 4.0, 'mininum success percentage required to not drop check results', False) - self.add_item(section, 'max_fail', int, 5, 'number of fails after which a proxy is considered dead', False) - self.add_item(section, 'threads', int, 10, 'number of threads watchd uses to check proxies', True) - self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False) - self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False) - self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False) - self.add_item(section, 'use_ssl', int, 0, 'whether to use SSL and port 6697 to connect to targets (slower)', False) - self.add_item(section, 'checktime', int, 1800, 'base checking interval for proxies in db in seconds', False) - self.add_item(section, 'perfail_checktime', int, 3600, 'additional checking interval for proxies in db in seconds per experienced failure', False) - self.add_item(section, 'database', str, 'websites.sqlite', 'filename of database', True) - self.add_item(section, 'oldies', bool, False, 're-test old proxies as well ? (default: False)', False) - self.add_item(section, 'oldies_checktime', int, 43200, 'base checking interval for *old* proxies in seconds (default: 43200)', False) - self.add_item(section, 'oldies_multi', int, 10, 'fetch threads*multi rows when testing oldies (default: 10)', False) - self.add_item(section, 'source_file', str, 'servers.txt', 'server/url list to read from (default: servers.txt)', False) - self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False) - self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False) - self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False) - self.add_item(section, 'checktype', str, 'http', 'check type (irc or http)', False) + section = 'watchd' + self.add_item(section, 'outage_threshold', float, 4.0, 'mininum success percentage required to not drop check results', False) + self.add_item(section, 'max_fail', int, 5, 'number of fails after which a proxy is considered dead', False) + self.add_item(section, 'threads', int, 10, 'number of threads watchd uses to check proxies', True) + self.add_item(section, 'timeout', int, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False) + self.add_item(section, 'submit_after', int, 200, 'min. number of tested proxies for DB write', False) + self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False) + self.add_item(section, 'use_ssl', int, 0, 'whether to use SSL and port 6697 to connect to targets (slower)', False) + self.add_item(section, 'checktime', int, 1800, 'base checking interval for proxies in db in seconds', False) + self.add_item(section, 'perfail_checktime', int, 3600, 'additional checking interval for proxies in db in seconds per experienced failure', False) + self.add_item(section, 'database', str, 'websites.sqlite', 'filename of database', True) + self.add_item(section, 'oldies', bool, False, 're-test old proxies as well ? (default: False)', False) + self.add_item(section, 'oldies_checktime', int, 43200, 'base checking interval for *old* proxies in seconds (default: 43200)', False) + self.add_item(section, 'oldies_multi', int, 10, 'fetch threads*multi rows when testing oldies (default: 10)', False) + self.add_item(section, 'source_file', str, 'servers.txt', 'server/url list to read from (default: servers.txt)', False) + self.add_item(section, 'stale_days', int, 30, 'days after which dead proxies are removed (default: 30)', False) + self.add_item(section, 'stats_interval', int, 300, 'seconds between status reports (default: 300)', False) + self.add_item(section, 'tor_safeguard', bool, True, 'enable tor safeguard (default: True)', False) + self.add_item(section, 'checktype', str, 'http', 'check type (irc or http)', False) - section = 'httpd' - self.add_item(section, 'listenip', str, '127.0.0.1', 'address for the httpd to listen to (default: 127.0.0.1)', True) - self.add_item(section, 'port', int, 8081, 'port for the httpd to listen to (default: 8081)', True) - self.add_item(section, 'enabled', bool, False, 'start httpd (default: False)', True) + section = 'httpd' + self.add_item(section, 'listenip', str, '127.0.0.1', 'address for the httpd to listen to (default: 127.0.0.1)', True) + self.add_item(section, 'port', int, 8081, 'port for the httpd to listen to (default: 8081)', True) + self.add_item(section, 'enabled', bool, False, 'start httpd (default: False)', True) - section = 'ppf' - self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False) - self.add_item(section, 'search', bool, True, 'whether to use searx search engine to find new proxy lists', False) - self.add_item(section, 'timeout', float, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False) - self.add_item(section, 'http_retries', int, 1, 'number of retries for http connects', False) - self.add_item(section, 'threads', int, 1, 'number of threads to run (default: 1)', False) - self.add_item(section, 'checktime', int, 3600, 'base checking interval for urls in db in seconds', False) - self.add_item(section, 'perfail_checktime', int, 3600, 'additional checking interval for urls in db in seconds per resultless check', False) - self.add_item(section, 'max_fail', int, 5, 'number of fails after which an url is considered dead', False) - self.add_item(section, 'database', str, 'proxies.sqlite', 'filename of database', True) - self.add_item(section, 'extract_samedomain', bool, False, 'extract only url from same domains? (default: False)', False) + section = 'ppf' + self.add_item(section, 'debug', bool, False, 'whether to print additional debug info', False) + self.add_item(section, 'search', bool, True, 'whether to use searx search engine to find new proxy lists', False) + self.add_item(section, 'timeout', float, 15, 'timeout for blocking operations (connect/recv/...) for proxy checks in seconds', False) + self.add_item(section, 'http_retries', int, 1, 'number of retries for http connects', False) + self.add_item(section, 'threads', int, 1, 'number of threads to run (default: 1)', False) + self.add_item(section, 'checktime', int, 3600, 'base checking interval for urls in db in seconds', False) + self.add_item(section, 'perfail_checktime', int, 3600, 'additional checking interval for urls in db in seconds per resultless check', False) + self.add_item(section, 'max_fail', int, 5, 'number of fails after which an url is considered dead', False) + self.add_item(section, 'database', str, 'proxies.sqlite', 'filename of database', True) + self.add_item(section, 'extract_samedomain', bool, False, 'extract only url from same domains? (default: False)', False) - section = 'scraper' - self.add_item(section, 'debug', bool, False, 'scraper: whether to print additional debug info', False) - self.add_item(section, 'query', str, 'psw', 'build query using Proxies, Search, Websites', False) - self.add_item(section, 'backoff_base', int, 30, 'base backoff delay in seconds (default: 30)', False) - self.add_item(section, 'backoff_max', int, 3600, 'max backoff delay in seconds (default: 3600)', False) - self.add_item(section, 'fail_threshold', int, 2, 'consecutive failures before backoff (default: 2)', False) - self.add_item(section, 'engines', str, 'searx,duckduckgo,github', 'comma-separated search engines (default: searx,duckduckgo,github)', False) - self.add_item(section, 'max_pages', int, 5, 'max pages to fetch per engine query (default: 5)', False) - self.add_item(section, 'libretranslate_url', str, 'https://lt.mymx.me/translate', 'LibreTranslate API URL (default: https://lt.mymx.me/translate)', False) - self.add_item(section, 'libretranslate_enabled', bool, True, 'enable LibreTranslate for dynamic translations (default: True)', False) + section = 'scraper' + self.add_item(section, 'debug', bool, False, 'scraper: whether to print additional debug info', False) + self.add_item(section, 'query', str, 'psw', 'build query using Proxies, Search, Websites', False) + self.add_item(section, 'backoff_base', int, 30, 'base backoff delay in seconds (default: 30)', False) + self.add_item(section, 'backoff_max', int, 3600, 'max backoff delay in seconds (default: 3600)', False) + self.add_item(section, 'fail_threshold', int, 2, 'consecutive failures before backoff (default: 2)', False) + self.add_item(section, 'engines', str, 'searx,duckduckgo,github', 'comma-separated search engines (default: searx,duckduckgo,github)', False) + self.add_item(section, 'max_pages', int, 5, 'max pages to fetch per engine query (default: 5)', False) + self.add_item(section, 'libretranslate_url', str, 'https://lt.mymx.me/translate', 'LibreTranslate API URL (default: https://lt.mymx.me/translate)', False) + self.add_item(section, 'libretranslate_enabled', bool, True, 'enable LibreTranslate for dynamic translations (default: True)', False) - self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False) - self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False) - self.aparser.add_argument("-q", "--quiet", help="suppress info messages, show warnings and errors only", action='store_true', default=False) - self.aparser.add_argument("-v", "--verbose", help="show debug messages", action='store_true', default=False) + self.aparser.add_argument("--file", help="import a single file containing proxy addrs", type=str, default='', required=False) + self.aparser.add_argument("--nobs", help="disable BeautifulSoup, use stdlib HTMLParser", action='store_true', default=False) + self.aparser.add_argument("-q", "--quiet", help="suppress info messages, show warnings and errors only", action='store_true', default=False) + self.aparser.add_argument("-v", "--verbose", help="show debug messages", action='store_true', default=False) - section = 'flood' - self.add_item(section, 'server', str, None, 'irc server address', False) - self.add_item(section, 'target', str, None, 'target to flood', False) - self.add_item(section, 'nickserv', str, 'nickserv', "nickserv's nickname", False) - self.add_item(section, 'message', str, None, 'message', False) - self.add_item(section, 'threads', int, 1, '# of threads', False) - self.add_item(section, 'register', int, 0, 'register nickname when required', False) + section = 'flood' + self.add_item(section, 'server', str, None, 'irc server address', False) + self.add_item(section, 'target', str, None, 'target to flood', False) + self.add_item(section, 'nickserv', str, 'nickserv', "nickserv's nickname", False) + self.add_item(section, 'message', str, None, 'message', False) + self.add_item(section, 'threads', int, 1, '# of threads', False) + self.add_item(section, 'register', int, 0, 'register nickname when required', False) - self.add_item(section, 'wait', int, 0, 'wait prior sending messages', False) - self.add_item(section, 'once', int, 0, 'quit as soon as possible', False) - self.add_item(section, 'hilight', int, 0, 'try to hilight all nicks?', False) - self.add_item(section, 'waitonsuccess', int, 0, 'wait for a while on success', False) - self.add_item(section, 'debug', int, 0, 'use debug', False) - self.add_item(section, 'duration', int, 180, 'maximum time to run', False) - self.add_item(section, 'delay', str, 14400, 'if waitonsuccess, wait for $delay before sending other bots', False) - self.add_item(section, 'nick', str, None, 'specify nickname to use', False) - self.add_item(section, 'use_ssl', int, 2, 'Use ssl? (0: false, 1: true, 2: random)', False) - self.add_item(section, 'cycle', int, 0, 'cycle flood', False) - self.add_item(section, 'change_nick', int, 0, 'Change nick between messages (useful when flooding privates)', False) - self.add_item(section, 'use_timeout', int, 0, 'make connexions quit through timeout', False) - self.add_item(section, 'clones', int, 1, 'Number of connexion repeat to run', False) - self.add_item(section, 'query', bool, False, 'also flood in query', False) - self.add_item(section, 'noquerybefore', int, 10, 'do not send query before x secs being connected', False) - self.add_item(section, 'oper', bool, False, 'piss of opers', False) - self.add_item(section, 'whois', bool, False, 'piss of opers with /whois', False) - self.add_item(section, 'modex', bool, False, 'make +/- x mode', False) - self.add_item(section, 'os', bool, False, 'piss off opers with /os', False) - self.add_item(section, 'file', str, None, 'read flood content from file', False) - self.add_item(section, 'failid', str, None, 'generate nickserv warn. about IDENTIFY attempts', False) + self.add_item(section, 'wait', int, 0, 'wait prior sending messages', False) + self.add_item(section, 'once', int, 0, 'quit as soon as possible', False) + self.add_item(section, 'hilight', int, 0, 'try to hilight all nicks?', False) + self.add_item(section, 'waitonsuccess', int, 0, 'wait for a while on success', False) + self.add_item(section, 'debug', int, 0, 'use debug', False) + self.add_item(section, 'duration', int, 180, 'maximum time to run', False) + self.add_item(section, 'delay', str, 14400, 'if waitonsuccess, wait for $delay before sending other bots', False) + self.add_item(section, 'nick', str, None, 'specify nickname to use', False) + self.add_item(section, 'use_ssl', int, 2, 'Use ssl? (0: false, 1: true, 2: random)', False) + self.add_item(section, 'cycle', int, 0, 'cycle flood', False) + self.add_item(section, 'change_nick', int, 0, 'Change nick between messages (useful when flooding privates)', False) + self.add_item(section, 'use_timeout', int, 0, 'make connexions quit through timeout', False) + self.add_item(section, 'clones', int, 1, 'Number of connexion repeat to run', False) + self.add_item(section, 'query', bool, False, 'also flood in query', False) + self.add_item(section, 'noquerybefore', int, 10, 'do not send query before x secs being connected', False) + self.add_item(section, 'oper', bool, False, 'piss of opers', False) + self.add_item(section, 'whois', bool, False, 'piss of opers with /whois', False) + self.add_item(section, 'modex', bool, False, 'make +/- x mode', False) + self.add_item(section, 'os', bool, False, 'piss off opers with /os', False) + self.add_item(section, 'file', str, None, 'read flood content from file', False) + self.add_item(section, 'failid', str, None, 'generate nickserv warn. about IDENTIFY attempts', False) diff --git a/dbs.py b/dbs.py index 499ba87..4e5f489 100644 --- a/dbs.py +++ b/dbs.py @@ -1,61 +1,80 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +"""Database table creation and insertion utilities.""" + import time from misc import _log + def create_table_if_not_exists(sqlite, dbname): - if dbname == 'proxylist': - sqlite.execute("""CREATE TABLE IF NOT EXISTS proxylist ( - proxy BLOB UNIQUE, - country BLOB, - added INT, - failed INT, - tested INT, - dronebl INT, - proto TEXT, - mitm INT, - success_count INT, - ip TEXT, - port INT, - consecutive_success INT, - total_duration INT)""") - # indexes for common query patterns - sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)') - sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)') - sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_proto ON proxylist(proto)') + """Create database table with indexes if it doesn't exist.""" + if dbname == 'proxylist': + sqlite.execute("""CREATE TABLE IF NOT EXISTS proxylist ( + proxy BLOB UNIQUE, + country BLOB, + added INT, + failed INT, + tested INT, + dronebl INT, + proto TEXT, + mitm INT, + success_count INT, + ip TEXT, + port INT, + consecutive_success INT, + total_duration INT)""") + # Indexes for common query patterns + sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_failed ON proxylist(failed)') + sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_tested ON proxylist(tested)') + sqlite.execute('CREATE INDEX IF NOT EXISTS idx_proxylist_proto ON proxylist(proto)') - elif dbname == 'uris': - sqlite.execute("""CREATE TABLE IF NOT EXISTS uris ( - url TEXT UNIQUE, - content_type TEXT, - check_time INT, - error INT, - stale_count INT, - retrievals INT, - proxies_added INT, - added INT - )""") - # indexes for common query patterns - sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)') - sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)') + elif dbname == 'uris': + sqlite.execute("""CREATE TABLE IF NOT EXISTS uris ( + url TEXT UNIQUE, + content_type TEXT, + check_time INT, + error INT, + stale_count INT, + retrievals INT, + proxies_added INT, + added INT)""") + # Indexes for common query patterns + sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_error ON uris(error)') + sqlite.execute('CREATE INDEX IF NOT EXISTS idx_uris_checktime ON uris(check_time)') + + sqlite.commit() - sqlite.commit() def insert_proxies(proxydb, proxies, url): - if not proxies: return - timestamp = int(time.time()) - rows = [] - for p in proxies: - ip, port = p.split(':') - rows.append((timestamp,p,ip,port,3,0,0,0,0,0)) - proxydb.executemany('INSERT OR IGNORE INTO proxylist (added,proxy,ip,port,failed,tested,success_count,total_duration,mitm,consecutive_success) VALUES (?,?,?,?,?,?,?,?,?,?)', rows) - proxydb.commit() - _log('+%d proxy/ies from %s' % (len(proxies), url), 'added') + """Insert new proxies into database.""" + if not proxies: + return + timestamp = int(time.time()) + rows = [] + for p in proxies: + ip, port = p.split(':') + rows.append((timestamp, p, ip, port, 3, 0, 0, 0, 0, 0)) + proxydb.executemany( + 'INSERT OR IGNORE INTO proxylist ' + '(added,proxy,ip,port,failed,tested,success_count,total_duration,mitm,consecutive_success) ' + 'VALUES (?,?,?,?,?,?,?,?,?,?)', + rows + ) + proxydb.commit() + _log('+%d proxy/ies from %s' % (len(proxies), url), 'added') def insert_urls(urls, search, sqlite): - if not urls: return - time_now = int(time.time()) - rows = [ (time_now,u,0,1,0,0,0) for u in urls ] - sqlite.executemany('INSERT OR IGNORE INTO uris (added,url,check_time,error,stale_count,retrievals,proxies_added) values(?,?,?,?,?,?,?)', rows) - sqlite.commit() - _log('+%d url(s) from %s' % (len(urls), search), 'added') - + """Insert new URLs into database.""" + if not urls: + return + timestamp = int(time.time()) + rows = [(timestamp, u, 0, 1, 0, 0, 0) for u in urls] + sqlite.executemany( + 'INSERT OR IGNORE INTO uris ' + '(added,url,check_time,error,stale_count,retrievals,proxies_added) ' + 'VALUES (?,?,?,?,?,?,?)', + rows + ) + sqlite.commit() + _log('+%d url(s) from %s' % (len(urls), search), 'added') diff --git a/fetch.py b/fetch.py index fdb0558..8d663e1 100644 --- a/fetch.py +++ b/fetch.py @@ -6,169 +6,169 @@ from misc import _log config = None def set_config(cfg): - global config - config = cfg + global config + config = cfg cleanhtml_re = [ - re.compile('<.*?>'), - re.compile('\s+'), - re.compile('::+'), + re.compile('<.*?>'), + re.compile('\s+'), + re.compile('::+'), ] def cleanhtml(raw_html): - html = raw_html.replace(' ', ' ') - html = re.sub(cleanhtml_re[0], ':', html) - html = re.sub(cleanhtml_re[1], ':', html) - html = re.sub(cleanhtml_re[2], ':', html) - return html + html = raw_html.replace(' ', ' ') + html = re.sub(cleanhtml_re[0], ':', html) + html = re.sub(cleanhtml_re[1], ':', html) + html = re.sub(cleanhtml_re[2], ':', html) + return html def fetch_contents(url, head=False, proxy=None): - content = None - if proxy is not None and len(proxy): - for p in proxy: - content = _fetch_contents(url, head=head, proxy=p) - if content is not None: break + content = None + if proxy is not None and len(proxy): + for p in proxy: + content = _fetch_contents(url, head=head, proxy=p) + if content is not None: break - else: - content = _fetch_contents(url, head=head) + else: + content = _fetch_contents(url, head=head) - return content if content is not None else '' + return content if content is not None else '' retry_messages = ('Engines cannot retrieve results', 'Rate limit exceeded') def _fetch_contents(url, head = False, proxy=None): - host, port, ssl, uri = _parse_url(url) - headers=[ - 'Accept-Language: en-US,en;q=0.8', - 'Cache-Control: max-age=0', - ] - if config.ppf.debug: - _log("connecting to %s... (header: %s)" % (url, str(head)), "debug") - while True: - proxies = [rocksock.RocksockProxyFromURL('socks4://%s' % random.choice( config.torhosts ))] - if proxy: proxies.append( rocksock.RocksockProxyFromURL(proxy)) + host, port, ssl, uri = _parse_url(url) + headers=[ + 'Accept-Language: en-US,en;q=0.8', + 'Cache-Control: max-age=0', + ] + if config.ppf.debug: + _log("connecting to %s... (header: %s)" % (url, str(head)), "debug") + while True: + proxies = [rocksock.RocksockProxyFromURL('socks4://%s' % random.choice( config.torhosts ))] + if proxy: proxies.append( rocksock.RocksockProxyFromURL(proxy)) - http = RsHttp(host,ssl=ssl,port=port, keep_alive=True, timeout=config.ppf.timeout, max_tries=config.ppf.http_retries, follow_redirects=True, auto_set_cookies=True, proxies=proxies, user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0') - if not http.connect(): - _log("failed to connect to %s"%url, "ppf") - e = http.get_last_rocksock_exception() - if not e: - return None - et = e.get_errortype() - ee = e.get_error() - ef = e.get_failedproxy() - if et == rocksock.RS_ET_OWN and \ - ee == rocksock.RS_E_TARGET_CONN_REFUSED \ - and ef == 0: - _log("could not connect to proxy 0 - check your connection", "error") - time.sleep(5) - continue - return None - break + http = RsHttp(host,ssl=ssl,port=port, keep_alive=True, timeout=config.ppf.timeout, max_tries=config.ppf.http_retries, follow_redirects=True, auto_set_cookies=True, proxies=proxies, user_agent='Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0') + if not http.connect(): + _log("failed to connect to %s"%url, "ppf") + e = http.get_last_rocksock_exception() + if not e: + return None + et = e.get_errortype() + ee = e.get_error() + ef = e.get_failedproxy() + if et == rocksock.RS_ET_OWN and \ + ee == rocksock.RS_E_TARGET_CONN_REFUSED \ + and ef == 0: + _log("could not connect to proxy 0 - check your connection", "error") + time.sleep(5) + continue + return None + break - ## only request header - if head: - hdr = http.head(uri, headers) - return hdr + ## only request header + if head: + hdr = http.head(uri, headers) + return hdr - hdr, res = http.get(uri, headers) - res = res.encode('utf-8') if isinstance(res, unicode) else res - for retry_message in retry_messages: - if retry_message in res: return None + hdr, res = http.get(uri, headers) + res = res.encode('utf-8') if isinstance(res, unicode) else res + for retry_message in retry_messages: + if retry_message in res: return None - return res + return res def valid_port(port): - return port > 0 and port < 65535 + return port > 0 and port < 65535 def is_usable_proxy(proxy): - ip, port = proxy.split(':') - if not valid_port(int(port)): return False + ip, port = proxy.split(':') + if not valid_port(int(port)): return False - octets = ip.split('.') - A = int(octets[0]) - B = int(octets[1]) - C = int(octets[2]) - D = int(octets[3]) + octets = ip.split('.') + A = int(octets[0]) + B = int(octets[1]) + C = int(octets[2]) + D = int(octets[3]) - if (A < 1 or A > 254 or \ - B > 255 or C > 255 or D > 255) or \ - (A == 10 or A == 127) or \ - (A == 192 and B == 168) or \ - (A == 172 and B >= 16 and B <= 31): return False - return True + if (A < 1 or A > 254 or \ + B > 255 or C > 255 or D > 255) or \ + (A == 10 or A == 127) or \ + (A == 192 and B == 168) or \ + (A == 172 and B >= 16 and B <= 31): return False + return True _known_proxies = {} def init_known_proxies(proxydb): - """Initialize known proxies cache from database.""" - global _known_proxies - if _known_proxies: - return - known = proxydb.execute('SELECT proxy FROM proxylist').fetchall() - for k in known: - _known_proxies[k[0]] = True + """Initialize known proxies cache from database.""" + global _known_proxies + if _known_proxies: + return + known = proxydb.execute('SELECT proxy FROM proxylist').fetchall() + for k in known: + _known_proxies[k[0]] = True def add_known_proxies(proxies): - """Add proxies to known cache.""" - global _known_proxies - for p in proxies: - _known_proxies[p] = True + """Add proxies to known cache.""" + global _known_proxies + for p in proxies: + _known_proxies[p] = True def is_known_proxy(proxy): - """Check if proxy is in known cache.""" - return proxy in _known_proxies + """Check if proxy is in known cache.""" + return proxy in _known_proxies def extract_proxies(content, proxydb=None, filter_known=True): - """Extract and normalize proxy addresses from content. + """Extract and normalize proxy addresses from content. - Args: - content: HTML/text content to parse - proxydb: Database connection for known proxy lookup (optional) - filter_known: If True, filter out known proxies and return new only + Args: + content: HTML/text content to parse + proxydb: Database connection for known proxy lookup (optional) + filter_known: If True, filter out known proxies and return new only - Returns: - If filter_known: (unique_count, new_proxies) tuple - If not filter_known: list of all unique valid proxies - """ - matches = re.findall(r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})[\D$]', cleanhtml(content)) + Returns: + If filter_known: (unique_count, new_proxies) tuple + If not filter_known: list of all unique valid proxies + """ + matches = re.findall(r'([0-9]+(?:\.[0-9]+){3}:[0-9]{2,5})[\D$]', cleanhtml(content)) - uniques_dict = {} - for p in matches: - ip, port = p.split(':') - # Normalize IP (remove leading zeros from octets) - ip = '.'.join(str(int(octet)) for octet in ip.split('.')) - # Normalize port (remove leading zeros, handle empty case) - port = int(port.lstrip('0') or '0') - p = '%s:%s' % (ip, port) - uniques_dict[p] = True + uniques_dict = {} + for p in matches: + ip, port = p.split(':') + # Normalize IP (remove leading zeros from octets) + ip = '.'.join(str(int(octet)) for octet in ip.split('.')) + # Normalize port (remove leading zeros, handle empty case) + port = int(port.lstrip('0') or '0') + p = '%s:%s' % (ip, port) + uniques_dict[p] = True - uniques = [p for p in uniques_dict.keys() if is_usable_proxy(p)] + uniques = [p for p in uniques_dict.keys() if is_usable_proxy(p)] - if not filter_known: - return uniques + if not filter_known: + return uniques - # Initialize known proxies from DB if needed - if proxydb is not None: - init_known_proxies(proxydb) + # Initialize known proxies from DB if needed + if proxydb is not None: + init_known_proxies(proxydb) - new = [] - for p in uniques: - if not is_known_proxy(p): - new.append(p) - add_known_proxies([p]) + new = [] + for p in uniques: + if not is_known_proxy(p): + new.append(p) + add_known_proxies([p]) - return len(uniques), new + return len(uniques), new def extract_urls(content, urls = None, urignore=None): - urls = [] if not urls else urls - soup = soupify(content) - for a in soup.body.find_all('a'): - if not 'rel' in a.attrs or not 'noreferrer' in a.attrs['rel'] or a.attrs['href'] in urls: continue - bad = False - href = a.attrs['href'] - for i in urignore: - if re.findall(i, href): - bad = True - break - if not bad: urls.append(href) - return urls + urls = [] if not urls else urls + soup = soupify(content) + for a in soup.body.find_all('a'): + if not 'rel' in a.attrs or not 'noreferrer' in a.attrs['rel'] or a.attrs['href'] in urls: continue + bad = False + href = a.attrs['href'] + for i in urignore: + if re.findall(i, href): + bad = True + break + if not bad: urls.append(href) + return urls diff --git a/mysqlite.py b/mysqlite.py index 08ba9f1..a8ac077 100644 --- a/mysqlite.py +++ b/mysqlite.py @@ -1,44 +1,62 @@ -import time, random, sys +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +"""SQLite wrapper with retry logic and WAL mode.""" + +import time +import random +import sys import sqlite3 -class mysqlite: - def _try_op(self, op, query, args=None, rmin=1.5, rmax=7.0): - while 1: - try: - if query is None: - return op() - elif args is None: - return op(query) - else: - return op(query, args) - except sqlite3.OperationalError as e: - if e.message == 'database is locked': - print "zzZzzZZ: db is locked (%s)"%self.dbname - time.sleep(random.uniform(rmin, rmax)) - continue - else: - print '%s\nquery: %s\nargs: %s' % (str(sys.exc_info()), str(query), str(args)) - raise e - def execute(self, query, args = None, rmin=1.5, rmax=7.0): - return self._try_op(self.cursor.execute, query, args, rmin, rmax) +class mysqlite(object): + """SQLite connection wrapper with automatic retry on lock.""" - def executemany(self, query, args, rmin=1.5, rmax=7.0): - while len(args): - self._try_op(self.cursor.executemany, query, args[:500], rmin, rmax) - args = args[500:] + def __init__(self, database, factory=None): + self.handle = sqlite3.connect(database) + if factory is not None: + self.handle.text_factory = factory + self.cursor = self.handle.cursor() + self.dbname = database + # Enable WAL mode for better concurrency + self.cursor.execute('PRAGMA journal_mode=WAL') + self.cursor.execute('PRAGMA synchronous=NORMAL') - def commit(self, rmin=1.5, rmax=7.0): - return self._try_op(self.handle.commit, None, None, rmin, rmax) + def _try_op(self, op, query, args=None, rmin=1.5, rmax=7.0): + """Execute operation with retry on database lock.""" + while True: + try: + if query is None: + return op() + elif args is None: + return op(query) + else: + return op(query, args) + except sqlite3.OperationalError as e: + err_msg = str(e) + if 'database is locked' in err_msg: + sys.stderr.write('zzZzzZZ: db is locked (%s)\n' % self.dbname) + time.sleep(random.uniform(rmin, rmax)) + continue + else: + sys.stderr.write('%s\nquery: %s\nargs: %s\n' % ( + str(sys.exc_info()), str(query), str(args))) + raise - def close(self): - self.handle.close() + def execute(self, query, args=None, rmin=1.5, rmax=7.0): + """Execute a single query with retry.""" + return self._try_op(self.cursor.execute, query, args, rmin, rmax) - def __init__(self, database, factory = None): - self.handle = sqlite3.connect(database) - if factory: self.handle.text_factory = factory - self.cursor = self.handle.cursor() - self.dbname = database - # enable WAL mode for better concurrency - self.cursor.execute('PRAGMA journal_mode=WAL') - self.cursor.execute('PRAGMA synchronous=NORMAL') + def executemany(self, query, args, rmin=1.5, rmax=7.0): + """Execute query for multiple argument sets, batched.""" + while args: + batch = args[:500] + self._try_op(self.cursor.executemany, query, batch, rmin, rmax) + args = args[500:] + + def commit(self, rmin=1.5, rmax=7.0): + """Commit transaction with retry.""" + return self._try_op(self.handle.commit, None, None, rmin, rmax) + + def close(self): + """Close database connection.""" + self.handle.close() diff --git a/ppf.py b/ppf.py old mode 100755 new mode 100644 index 7f114fb..adf69d8 --- a/ppf.py +++ b/ppf.py @@ -16,231 +16,231 @@ import random config = Config() def import_from_file(fn, sqlite): - with open(fn, 'r') as f: - urls = [ url for url in f.read().split('\n') if url ] - cinc = 0 - while True: - chunk = urls[cinc:cinc+200] - if chunk: dbs.insert_urls(chunk, 'import.txt', urldb) - else: break - cinc = cinc + 200 + with open(fn, 'r') as f: + urls = [ url for url in f.read().split('\n') if url ] + cinc = 0 + while True: + chunk = urls[cinc:cinc+200] + if chunk: dbs.insert_urls(chunk, 'import.txt', urldb) + else: break + cinc = cinc + 200 def get_content_type(url, proxy): - hdr = fetch.fetch_contents(url, head=True, proxy=proxy) + hdr = fetch.fetch_contents(url, head=True, proxy=proxy) - for h in hdr.split('\n'): - if h.lower().startswith('content-type: '): return h.lower().split(':')[1].strip() + for h in hdr.split('\n'): + if h.lower().startswith('content-type: '): return h.lower().split(':')[1].strip() - return '' + return '' def is_good_content_type(string): - allowed_ct = [ 'text/html', 'text/plain', 'atom+xml' ] - for ct in allowed_ct: - if ct.lower() in string.lower(): return True - return False + allowed_ct = [ 'text/html', 'text/plain', 'atom+xml' ] + for ct in allowed_ct: + if ct.lower() in string.lower(): return True + return False def is_bad_url(uri, domain=None, samedomain=False): - # if uri needs to be from same domain and domains missmatch - if samedomain and str(uri.split('/')[2]).lower() != str(domain).lower(): - return True - for u in urignore: - if re.findall(u, uri): return True - return False + # if uri needs to be from same domain and domains missmatch + if samedomain and str(uri.split('/')[2]).lower() != str(domain).lower(): + return True + for u in urignore: + if re.findall(u, uri): return True + return False def extract_urls(html, url): - mytime = int(time.time()) - proto = url.split(':')[0] - domain = url.split('/')[2] - urls = [] + mytime = int(time.time()) + proto = url.split(':')[0] + domain = url.split('/')[2] + urls = [] - soup = soupify(html, nohtml=True) + soup = soupify(html, nohtml=True) - for a in soup.find_all('a', href=True): - item = a['href'].encode('utf-8') if isinstance(a['href'], unicode) else a['href'] - item = item.strip() + for a in soup.find_all('a', href=True): + item = a['href'].encode('utf-8') if isinstance(a['href'], unicode) else a['href'] + item = item.strip() - if item.startswith('www.'): - item = 'http://%s' % item - elif not item.startswith('http'): - if not item.startswith('/'): item = '/%s' % item - item = '%s://%s%s' % (proto,domain,item) + if item.startswith('www.'): + item = 'http://%s' % item + elif not item.startswith('http'): + if not item.startswith('/'): item = '/%s' % item + item = '%s://%s%s' % (proto,domain,item) - elif is_bad_url(item, domain=domain, samedomain=config.ppf.extract_samedomain): - continue - if not item in urls: urls.append(item) + elif is_bad_url(item, domain=domain, samedomain=config.ppf.extract_samedomain): + continue + if not item in urls: urls.append(item) - if urls: dbs.insert_urls(urls, url, urldb) #insert_if_not_exists(urls) + if urls: dbs.insert_urls(urls, url, urldb) #insert_if_not_exists(urls) def import_proxies_from_file(proxydb, fn): - content = open(fn, 'r').read() - unique_count, new = fetch.extract_proxies(content, proxydb) - if new: - dbs.insert_proxies(proxydb, new, fn) - return 0 - return 1 + content = open(fn, 'r').read() + unique_count, new = fetch.extract_proxies(content, proxydb) + if new: + dbs.insert_proxies(proxydb, new, fn) + return 0 + return 1 class Leechered(threading.Thread): - def __init__(self, url, stale_count, error, retrievals, proxies_added, content_type, proxy): - self.status = 'nok' - self.proxylist = [] - self.running = True - self.url = url - self.stale_count = stale_count - self.error = error - self.retrievals = retrievals - self.proxies_added = proxies_added - self.content_type = content_type - self.proxy = proxy - self.execute = '' - threading.Thread.__init__(self) + def __init__(self, url, stale_count, error, retrievals, proxies_added, content_type, proxy): + self.status = 'nok' + self.proxylist = [] + self.running = True + self.url = url + self.stale_count = stale_count + self.error = error + self.retrievals = retrievals + self.proxies_added = proxies_added + self.content_type = content_type + self.proxy = proxy + self.execute = '' + threading.Thread.__init__(self) - def retrieve(self): - return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.proxies_added, self.execute - def status(self): - return self.status + def retrieve(self): + return self.url, self.proxylist, self.stale_count, self.error, self.retrievals, self.content_type, self.proxies_added, self.execute + def status(self): + return self.status - def run(self): - self.status = 'nok' + def run(self): + self.status = 'nok' - if not self.content_type: self.content_type = get_content_type(self.url, self.proxy) + if not self.content_type: self.content_type = get_content_type(self.url, self.proxy) - if is_good_content_type(self.content_type): - try: - content = fetch.fetch_contents(self.url, proxy=self.proxy) - except KeyboardInterrupt as e: - raise e - except Exception as e: - _log('%s: fetch error: %s' % (self.url.split('/')[2], str(e)), 'error') - content = '' - else: - content = '' + if is_good_content_type(self.content_type): + try: + content = fetch.fetch_contents(self.url, proxy=self.proxy) + except KeyboardInterrupt as e: + raise e + except Exception as e: + _log('%s: fetch error: %s' % (self.url.split('/')[2], str(e)), 'error') + content = '' + else: + content = '' - unique = fetch.extract_proxies(content, filter_known=False) - self.proxylist = [ proxy for proxy in unique if not fetch.is_known_proxy(proxy) ] - proxy_count = len(self.proxylist) + unique = fetch.extract_proxies(content, filter_known=False) + self.proxylist = [ proxy for proxy in unique if not fetch.is_known_proxy(proxy) ] + proxy_count = len(self.proxylist) - if self.retrievals == 0: # new site - if content and not self.proxylist: # site works but has zero proxy addresses - self.error += 1 - self.stale_count += 1 - elif proxy_count: - self.error = 0 - self.stale_count = 0 - else: - self.error += 2 - self.stale_count += 2 - else: # not a new site - # proxylist is empty - if not proxy_count: - self.stale_count += 1 - # proxylist is not empty: site is working - else: - self.stale_count = 0 - self.error = 0 - # site has no content - if not content: - self.error += 1 - self.stale_count += 1 - #else: - # self.retrievals += 1 - # self.error = 0 - # self.stale_count = 0 - # site has proxies - if proxy_count: - self.error = 0 - self.stale_count = 0 - extract_urls(content, self.url) + if self.retrievals == 0: # new site + if content and not self.proxylist: # site works but has zero proxy addresses + self.error += 1 + self.stale_count += 1 + elif proxy_count: + self.error = 0 + self.stale_count = 0 + else: + self.error += 2 + self.stale_count += 2 + else: # not a new site + # proxylist is empty + if not proxy_count: + self.stale_count += 1 + # proxylist is not empty: site is working + else: + self.stale_count = 0 + self.error = 0 + # site has no content + if not content: + self.error += 1 + self.stale_count += 1 + #else: + # self.retrievals += 1 + # self.error = 0 + # self.stale_count = 0 + # site has proxies + if proxy_count: + self.error = 0 + self.stale_count = 0 + extract_urls(content, self.url) - self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url) - self.status = 'ok' + self.execute = (self.error, self.stale_count, int(time.time()), self.retrievals, self.proxies_added+len(self.proxylist), self.content_type, self.url) + self.status = 'ok' if __name__ == '__main__': - config.load() - errors = config.validate() - if errors: - for e in errors: - _log(e, 'error') - sys.exit(1) - fetch.set_config(config) + config.load() + errors = config.validate() + if errors: + for e in errors: + _log(e, 'error') + sys.exit(1) + fetch.set_config(config) - # handle --nobs flag - args = config.aparser.parse_args() - if args.nobs: - set_nobs(True) + # handle --nobs flag + args = config.aparser.parse_args() + if args.nobs: + set_nobs(True) - proxydb = mysqlite.mysqlite(config.watchd.database, str) - dbs.create_table_if_not_exists(proxydb, 'proxylist') - fetch.init_known_proxies(proxydb) + proxydb = mysqlite.mysqlite(config.watchd.database, str) + dbs.create_table_if_not_exists(proxydb, 'proxylist') + fetch.init_known_proxies(proxydb) - with open('urignore.txt', 'r') as f: - urignore = [ i.strip() for i in f.read().split('\n') if i.strip() ] + with open('urignore.txt', 'r') as f: + urignore = [ i.strip() for i in f.read().split('\n') if i.strip() ] - urldb = mysqlite.mysqlite(config.ppf.database, str) - dbs.create_table_if_not_exists(urldb, 'uris') - import_from_file('import.txt', urldb) - if len(sys.argv) == 3 and sys.argv[1] == "--file": - sys.exit(import_proxies_from_file(proxydb, sys.argv[2])) + urldb = mysqlite.mysqlite(config.ppf.database, str) + dbs.create_table_if_not_exists(urldb, 'uris') + import_from_file('import.txt', urldb) + if len(sys.argv) == 3 and sys.argv[1] == "--file": + sys.exit(import_proxies_from_file(proxydb, sys.argv[2])) - # start proxy watcher - if config.watchd.threads > 0: - watcherd = proxywatchd.Proxywatchd() - watcherd.start() - else: - watcherd = None + # start proxy watcher + if config.watchd.threads > 0: + watcherd = proxywatchd.Proxywatchd() + watcherd.start() + else: + watcherd = None - qurl = 'SELECT url,stale_count,error,retrievals,proxies_added,content_type FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?) 180: - _log('running %d thread(s) over %d' % (len(threads), config.ppf.threads), 'ppf') - statusmsg = time.time() - if not rows: - if (time.time() - reqtime) > 3: - rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, int(time.time()))).fetchall() - reqtime = time.time() - if len(rows) < config.ppf.threads: - time.sleep(60) - rows = [] - else: - _log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf') + qurl = 'SELECT url,stale_count,error,retrievals,proxies_added,content_type FROM uris WHERE error < ? and (check_time+?+((error+stale_count)*?) 180: + _log('running %d thread(s) over %d' % (len(threads), config.ppf.threads), 'ppf') + statusmsg = time.time() + if not rows: + if (time.time() - reqtime) > 3: + rows = urldb.execute(qurl, (config.ppf.max_fail, config.ppf.checktime, config.ppf.perfail_checktime, int(time.time()))).fetchall() + reqtime = time.time() + if len(rows) < config.ppf.threads: + time.sleep(60) + rows = [] + else: + _log('handing %d job(s) to %d thread(s)' % ( len(rows), config.ppf.threads ), 'ppf') - _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute('SELECT proto,proxy from proxylist where failed=0').fetchall() ] - if not _proxylist: _proxylist = None + _proxylist = [ '%s://%s' % (p[0], p[1]) for p in proxydb.execute('SELECT proto,proxy from proxylist where failed=0').fetchall() ] + if not _proxylist: _proxylist = None - for thread in threads: - if thread.status == 'ok': - url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve() - new = [ p for p in proxylist if not fetch.is_known_proxy(p) ] - if new: - fetch.add_known_proxies(new) - execute = (error, stale_count, int(time.time()), retrievals, proxies_added+len(new), content_type, url) - urldb.execute('UPDATE uris SET error=?,stale_count=?,check_time=?,retrievals=?,proxies_added=?,content_type=? where url=?', execute) - urldb.commit() - if new: dbs.insert_proxies(proxydb, new, url) + for thread in threads: + if thread.status == 'ok': + url, proxylist, stale_count, error, retrievals, content_type, proxies_added, execute = thread.retrieve() + new = [ p for p in proxylist if not fetch.is_known_proxy(p) ] + if new: + fetch.add_known_proxies(new) + execute = (error, stale_count, int(time.time()), retrievals, proxies_added+len(new), content_type, url) + urldb.execute('UPDATE uris SET error=?,stale_count=?,check_time=?,retrievals=?,proxies_added=?,content_type=? where url=?', execute) + urldb.commit() + if new: dbs.insert_proxies(proxydb, new, url) - threads = [ thread for thread in threads if thread.is_alive() ] - if len(threads) < config.ppf.threads and rows: - p = random.sample(_proxylist, 5) if _proxylist is not None else None - row = random.choice(rows) - urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0])) - urldb.commit() - rows.remove(row) - t = Leechered(row[0], row[1], row[2], row[3], row[4], row[5], p) - threads.append(t) - t.start() + threads = [ thread for thread in threads if thread.is_alive() ] + if len(threads) < config.ppf.threads and rows: + p = random.sample(_proxylist, 5) if _proxylist is not None else None + row = random.choice(rows) + urldb.execute('UPDATE uris SET check_time=? where url=?', (time.time(), row[0])) + urldb.commit() + rows.remove(row) + t = Leechered(row[0], row[1], row[2], row[3], row[4], row[5], p) + threads.append(t) + t.start() - except KeyboardInterrupt: - if watcherd: - watcherd.stop() - watcherd.finish() - break + except KeyboardInterrupt: + if watcherd: + watcherd.stop() + watcherd.finish() + break - _log('ppf stopped', 'info') + _log('ppf stopped', 'info') diff --git a/proxywatchd.py b/proxywatchd.py index f661590..b83f0a7 100644 --- a/proxywatchd.py +++ b/proxywatchd.py @@ -1,16 +1,23 @@ #!/usr/bin/env python2 import threading -import time, random, string, re, copy -import Queue +import time +import random +import string +import re import heapq + try: - import IP2Location - import os - geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN")) - geolite = True + import Queue +except ImportError: + import queue as Queue +try: + import IP2Location + import os + geodb = IP2Location.IP2Location(os.path.join("data", "IP2LOCATION-LITE-DB1.BIN")) + geolite = True except (ImportError, IOError): - geolite = False + geolite = False from config import Config @@ -22,805 +29,805 @@ import connection_pool config = Config() _run_standalone = False -cached_dns = dict() +cached_dns = {} regexes = { - 'www.facebook.com': 'X-FB-Debug', - 'www.fbcdn.net': 'X-FB-Debug', - 'www.reddit.com': 'x-clacks-overhead', - 'www.twitter.com': 'x-connection-hash', - 't.co': 'x-connection-hash', - 'www.msn.com': 'x-aspnetmvc-version', - 'www.bing.com': 'p3p', - 'www.ask.com': 'x-served-by', - 'www.hotmail.com': 'x-msedge-ref', - 'www.bbc.co.uk': 'x-bbc-edge-cache-status', - 'www.skype.com': 'X-XSS-Protection', - 'www.alibaba.com': 'object-status', - 'www.mozilla.org': 'cf-ray', - 'www.cloudflare.com': 'cf-ray', - 'www.wikimedia.org': 'x-client-ip', - 'www.vk.com': 'x-frontend', - 'www.tinypic.com': 'x-amz-cf-pop', - 'www.netflix.com': 'X-Netflix.proxy.execution-time', - 'www.amazon.de': 'x-amz-cf-id', - 'www.reuters.com': 'x-amz-cf-id', - 'www.ikea.com': 'x-frame-options', - 'www.twitpic.com': 'timing-allow-origin', - 'www.digg.com': 'cf-request-id', - 'www.wikia.com': 'x-served-by', - 'www.wp.com': 'x-ac', - 'www.last.fm': 'x-timer', - 'www.usps.com': 'x-ruleset-version', - 'www.linkedin.com': 'x-li-uuid', - 'www.vimeo.com': 'x-timer', - 'www.yelp.com': 'x-timer', - 'www.ebay.com': 'x-envoy-upstream-service-time', - 'www.wikihow.com': 'x-c', - 'www.archive.org': 'referrer-policy', - 'www.pandora.tv': 'X-UA-Compatible', - 'www.w3.org': 'x-backend', - 'www.time.com': 'x-amz-cf-pop' + 'www.facebook.com': 'X-FB-Debug', + 'www.fbcdn.net': 'X-FB-Debug', + 'www.reddit.com': 'x-clacks-overhead', + 'www.twitter.com': 'x-connection-hash', + 't.co': 'x-connection-hash', + 'www.msn.com': 'x-aspnetmvc-version', + 'www.bing.com': 'p3p', + 'www.ask.com': 'x-served-by', + 'www.hotmail.com': 'x-msedge-ref', + 'www.bbc.co.uk': 'x-bbc-edge-cache-status', + 'www.skype.com': 'X-XSS-Protection', + 'www.alibaba.com': 'object-status', + 'www.mozilla.org': 'cf-ray', + 'www.cloudflare.com': 'cf-ray', + 'www.wikimedia.org': 'x-client-ip', + 'www.vk.com': 'x-frontend', + 'www.tinypic.com': 'x-amz-cf-pop', + 'www.netflix.com': 'X-Netflix.proxy.execution-time', + 'www.amazon.de': 'x-amz-cf-id', + 'www.reuters.com': 'x-amz-cf-id', + 'www.ikea.com': 'x-frame-options', + 'www.twitpic.com': 'timing-allow-origin', + 'www.digg.com': 'cf-request-id', + 'www.wikia.com': 'x-served-by', + 'www.wp.com': 'x-ac', + 'www.last.fm': 'x-timer', + 'www.usps.com': 'x-ruleset-version', + 'www.linkedin.com': 'x-li-uuid', + 'www.vimeo.com': 'x-timer', + 'www.yelp.com': 'x-timer', + 'www.ebay.com': 'x-envoy-upstream-service-time', + 'www.wikihow.com': 'x-c', + 'www.archive.org': 'referrer-policy', + 'www.pandora.tv': 'X-UA-Compatible', + 'www.w3.org': 'x-backend', + 'www.time.com': 'x-amz-cf-pop' } class Stats(): - """Track and report runtime statistics.""" + """Track and report runtime statistics.""" - def __init__(self): - self.lock = threading.Lock() - self.tested = 0 - self.passed = 0 - self.failed = 0 - self.start_time = time.time() - self.last_report = time.time() - # Failure category tracking - self.fail_categories = {} + def __init__(self): + self.lock = threading.Lock() + self.tested = 0 + self.passed = 0 + self.failed = 0 + self.start_time = time.time() + self.last_report = time.time() + # Failure category tracking + self.fail_categories = {} - def record(self, success, category=None): - with self.lock: - self.tested += 1 - if success: - self.passed += 1 - else: - self.failed += 1 - if category: - self.fail_categories[category] = self.fail_categories.get(category, 0) + 1 + def record(self, success, category=None): + with self.lock: + self.tested += 1 + if success: + self.passed += 1 + else: + self.failed += 1 + if category: + self.fail_categories[category] = self.fail_categories.get(category, 0) + 1 - def should_report(self, interval): - return (time.time() - self.last_report) >= interval + def should_report(self, interval): + return (time.time() - self.last_report) >= interval - def report(self): - with self.lock: - self.last_report = time.time() - elapsed = time.time() - self.start_time - rate = try_div(self.tested, elapsed) - pct = try_div(self.passed * 100.0, self.tested) - base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % ( - self.tested, self.passed, pct, rate, int(elapsed / 60)) - # Add failure breakdown if there are failures - if self.fail_categories: - cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items())) - return '%s [%s]' % (base, cats) - return base + def report(self): + with self.lock: + self.last_report = time.time() + elapsed = time.time() - self.start_time + rate = try_div(self.tested, elapsed) + pct = try_div(self.passed * 100.0, self.tested) + base = 'tested=%d passed=%d (%.1f%%) rate=%.2f/s uptime=%dm' % ( + self.tested, self.passed, pct, rate, int(elapsed / 60)) + # Add failure breakdown if there are failures + if self.fail_categories: + cats = ' '.join('%s=%d' % (k, v) for k, v in sorted(self.fail_categories.items())) + return '%s [%s]' % (base, cats) + return base def try_div(a, b): - if b != 0: return a/float(b) - return 0 + if b != 0: return a/float(b) + return 0 class PriorityJobQueue(object): - """Priority queue for proxy test jobs. + """Priority queue for proxy test jobs. - Lower priority number = higher priority. - Priority 0: New proxies (never tested) - Priority 1: Recently working (no failures, has successes) - Priority 2: Low fail count (< 3 failures) - Priority 3: Medium fail count - Priority 4: High fail count - """ + Lower priority number = higher priority. + Priority 0: New proxies (never tested) + Priority 1: Recently working (no failures, has successes) + Priority 2: Low fail count (< 3 failures) + Priority 3: Medium fail count + Priority 4: High fail count + """ - def __init__(self): - self.heap = [] - self.lock = threading.Lock() - self.not_empty = threading.Condition(self.lock) - self.counter = 0 # tie-breaker for equal priorities + def __init__(self): + self.heap = [] + self.lock = threading.Lock() + self.not_empty = threading.Condition(self.lock) + self.counter = 0 # tie-breaker for equal priorities - def put(self, job, priority=3): - """Add job with priority (lower = higher priority).""" - with self.lock: - heapq.heappush(self.heap, (priority, self.counter, job)) - self.counter += 1 - self.not_empty.notify() + def put(self, job, priority=3): + """Add job with priority (lower = higher priority).""" + with self.lock: + heapq.heappush(self.heap, (priority, self.counter, job)) + self.counter += 1 + self.not_empty.notify() - def get(self, timeout=None): - """Get highest priority job. Raises Queue.Empty on timeout.""" - with self.not_empty: - if timeout is None: - while not self.heap: - self.not_empty.wait() - else: - end_time = time.time() + timeout - while not self.heap: - remaining = end_time - time.time() - if remaining <= 0: - raise Queue.Empty() - self.not_empty.wait(remaining) - _, _, job = heapq.heappop(self.heap) - return job + def get(self, timeout=None): + """Get highest priority job. Raises Queue.Empty on timeout.""" + with self.not_empty: + if timeout is None: + while not self.heap: + self.not_empty.wait() + else: + end_time = time.time() + timeout + while not self.heap: + remaining = end_time - time.time() + if remaining <= 0: + raise Queue.Empty() + self.not_empty.wait(remaining) + _, _, job = heapq.heappop(self.heap) + return job - def get_nowait(self): - """Get job without waiting. Raises Queue.Empty if empty.""" - with self.lock: - if not self.heap: - raise Queue.Empty() - _, _, job = heapq.heappop(self.heap) - return job + def get_nowait(self): + """Get job without waiting. Raises Queue.Empty if empty.""" + with self.lock: + if not self.heap: + raise Queue.Empty() + _, _, job = heapq.heappop(self.heap) + return job - def empty(self): - """Check if queue is empty.""" - with self.lock: - return len(self.heap) == 0 + def empty(self): + """Check if queue is empty.""" + with self.lock: + return len(self.heap) == 0 - def qsize(self): - """Return queue size.""" - with self.lock: - return len(self.heap) + def qsize(self): + """Return queue size.""" + with self.lock: + return len(self.heap) - def task_done(self): - """Compatibility method (no-op for heap queue).""" - pass + def task_done(self): + """Compatibility method (no-op for heap queue).""" + pass def calculate_priority(failcount, success_count, max_fail): - """Calculate job priority based on proxy state. + """Calculate job priority based on proxy state. - Returns: - int: Priority 0-4 (lower = higher priority) - """ - # New proxy (never successfully tested) - if success_count == 0 and failcount == 0: - return 0 - # Recently working (no current failures) - if failcount == 0: - return 1 - # Low fail count - if failcount < 3: - return 2 - # Medium fail count - if failcount < max_fail // 2: - return 3 - # High fail count - return 4 + Returns: + int: Priority 0-4 (lower = higher priority) + """ + # New proxy (never successfully tested) + if success_count == 0 and failcount == 0: + return 0 + # Recently working (no current failures) + if failcount == 0: + return 1 + # Low fail count + if failcount < 3: + return 2 + # Medium fail count + if failcount < max_fail // 2: + return 3 + # High fail count + return 4 def socks4_resolve(srvname, server_port): - srv = srvname - if srv in cached_dns: - srv = cached_dns[srvname] - if config.watchd.debug: - _log("using cached ip (%s) for %s"%(srv, srvname), "debug") - else: - dns_fail = False - try: - af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True) - if sa is not None: - cached_dns[srvname] = sa[0] - srv = sa[0] - else: dns_fail = True - except rocksock.RocksockException as e: - assert(e.get_errortype() == rocksock.RS_ET_GAI) - dns_fail = True - if dns_fail: - fail_inc = 0 - _log("could not resolve connection target %s"%srvname, "ERROR") - return False - return srv + srv = srvname + if srv in cached_dns: + srv = cached_dns[srvname] + if config.watchd.debug: + _log("using cached ip (%s) for %s"%(srv, srvname), "debug") + else: + dns_fail = False + try: + af, sa = rocksock.resolve(rocksock.RocksockHostinfo(srvname, server_port), want_v4=True) + if sa is not None: + cached_dns[srvname] = sa[0] + srv = sa[0] + else: dns_fail = True + except rocksock.RocksockException as e: + assert(e.get_errortype() == rocksock.RS_ET_GAI) + dns_fail = True + if dns_fail: + fail_inc = 0 + _log("could not resolve connection target %s"%srvname, "ERROR") + return False + return srv class ProxyTestState(): - """Thread-safe state for a proxy being tested against multiple targets. + """Thread-safe state for a proxy being tested against multiple targets. - Results from TargetTestJob instances are aggregated here. - When all tests complete, evaluate() determines final pass/fail. - """ - def __init__(self, ip, port, proto, failcount, success_count, total_duration, - country, mitm, consecutive_success, num_targets=3, oldies=False): - self.ip = ip - self.port = int(port) - self.proxy = '%s:%s' % (ip, port) - self.proto = proto - self.failcount = failcount - self.checktime = None - self.success_count = success_count - self.total_duration = total_duration - self.country = country - self.mitm = mitm - self.consecutive_success = consecutive_success - self.isoldies = oldies - self.num_targets = num_targets + Results from TargetTestJob instances are aggregated here. + When all tests complete, evaluate() determines final pass/fail. + """ + def __init__(self, ip, port, proto, failcount, success_count, total_duration, + country, mitm, consecutive_success, num_targets=3, oldies=False): + self.ip = ip + self.port = int(port) + self.proxy = '%s:%s' % (ip, port) + self.proto = proto + self.failcount = failcount + self.checktime = None + self.success_count = success_count + self.total_duration = total_duration + self.country = country + self.mitm = mitm + self.consecutive_success = consecutive_success + self.isoldies = oldies + self.num_targets = num_targets - # thread-safe result accumulation - self.lock = threading.Lock() - self.results = [] # list of (success, proto, duration, srv, tor, ssl) - self.completed = False + # thread-safe result accumulation + self.lock = threading.Lock() + self.results = [] # list of (success, proto, duration, srv, tor, ssl) + self.completed = False - def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None): - """Record a single target test result. Thread-safe.""" - with self.lock: - self.results.append({ - 'success': success, - 'proto': proto, - 'duration': duration, - 'srv': srv, - 'tor': tor, - 'ssl': ssl, - 'category': category - }) + def record_result(self, success, proto=None, duration=0, srv=None, tor=None, ssl=None, category=None): + """Record a single target test result. Thread-safe.""" + with self.lock: + self.results.append({ + 'success': success, + 'proto': proto, + 'duration': duration, + 'srv': srv, + 'tor': tor, + 'ssl': ssl, + 'category': category + }) - def is_complete(self): - """Check if all target tests have finished.""" - with self.lock: - return len(self.results) >= self.num_targets + def is_complete(self): + """Check if all target tests have finished.""" + with self.lock: + return len(self.results) >= self.num_targets - def rwip(self, ip): - n = [] - for b in ip.split('.'): - while b[0] == 0 and len(b) > 1: b = b[:1] - n.append(b) - return '.'.join(n) + def rwip(self, ip): + n = [] + for b in ip.split('.'): + while b[0] == 0 and len(b) > 1: b = b[:1] + n.append(b) + return '.'.join(n) - def evaluate(self): - """Evaluate results after all tests complete. + def evaluate(self): + """Evaluate results after all tests complete. - Returns: - (success, category) tuple where success is bool and category is - the dominant failure type (or None on success) - """ - with self.lock: - if self.completed: - return (self.failcount == 0, None) - self.completed = True - self.checktime = int(time.time()) + Returns: + (success, category) tuple where success is bool and category is + the dominant failure type (or None on success) + """ + with self.lock: + if self.completed: + return (self.failcount == 0, None) + self.completed = True + self.checktime = int(time.time()) - successes = [r for r in self.results if r['success']] - failures = [r for r in self.results if not r['success']] - num_success = len(successes) + successes = [r for r in self.results if r['success']] + failures = [r for r in self.results if not r['success']] + num_success = len(successes) - # Determine dominant failure category - fail_category = None - if failures: - cats = {} - for f in failures: - cat = f.get('category') or 'other' - cats[cat] = cats.get(cat, 0) + 1 - if cats: - fail_category = max(cats.keys(), key=lambda k: cats[k]) + # Determine dominant failure category + fail_category = None + if failures: + cats = {} + for f in failures: + cat = f.get('category') or 'other' + cats[cat] = cats.get(cat, 0) + 1 + if cats: + fail_category = max(cats.keys(), key=lambda k: cats[k]) - # require majority success (2/3) - if num_success >= 2: - # use last successful result for metrics - last_good = successes[-1] + # require majority success (2/3) + if num_success >= 2: + # use last successful result for metrics + last_good = successes[-1] - if geolite and self.country is None: - self.ip = self.rwip(self.ip) - rec = geodb.get_all(self.ip) - if rec is not None and rec.country_short: - self.country = rec.country_short + if geolite and self.country is None: + self.ip = self.rwip(self.ip) + rec = geodb.get_all(self.ip) + if rec is not None and rec.country_short: + self.country = rec.country_short - self.proto = last_good['proto'] - self.failcount = 0 - if (self.consecutive_success % 3) == 0: - self.mitm = 0 - self.consecutive_success += 1 - self.success_count += 1 - self.total_duration += int(last_good['duration'] * 1000) + self.proto = last_good['proto'] + self.failcount = 0 + if (self.consecutive_success % 3) == 0: + self.mitm = 0 + self.consecutive_success += 1 + self.success_count += 1 + self.total_duration += int(last_good['duration'] * 1000) - torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor'] - _log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % ( - last_good['proto'], self.ip, self.port, self.country, - last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']), - num_success, self.num_targets), 'xxxxx') - return (True, None) + torstats = "" if len(config.torhosts) == 1 else ' tor: %s;' % last_good['tor'] + _log('%s://%s:%d (%s) d: %.2f sec(s);%s srv: %s; ssl: %s; %d/%d targets' % ( + last_good['proto'], self.ip, self.port, self.country, + last_good['duration'], torstats, last_good['srv'], str(last_good['ssl']), + num_success, self.num_targets), 'xxxxx') + return (True, None) - elif num_success == 1: - # partial success - don't increment fail, but reset consecutive - self.consecutive_success = 0 - _log('%s:%d partial success %d/%d targets' % ( - self.ip, self.port, num_success, self.num_targets), 'debug') - return (False, fail_category) + elif num_success == 1: + # partial success - don't increment fail, but reset consecutive + self.consecutive_success = 0 + _log('%s:%d partial success %d/%d targets' % ( + self.ip, self.port, num_success, self.num_targets), 'debug') + return (False, fail_category) - else: - self.failcount += 1 - self.consecutive_success = 0 - return (False, fail_category) + else: + self.failcount += 1 + self.consecutive_success = 0 + return (False, fail_category) class TargetTestJob(): - """Job to test a single proxy against a single target. + """Job to test a single proxy against a single target. - Multiple TargetTestJob instances share the same ProxyTestState, - allowing tests to be interleaved with other proxies in the queue. - """ - def __init__(self, proxy_state, target_srv, checktype, worker_id=None): - self.proxy_state = proxy_state - self.target_srv = target_srv - self.checktype = checktype - self.worker_id = worker_id + Multiple TargetTestJob instances share the same ProxyTestState, + allowing tests to be interleaved with other proxies in the queue. + """ + def __init__(self, proxy_state, target_srv, checktype, worker_id=None): + self.proxy_state = proxy_state + self.target_srv = target_srv + self.checktype = checktype + self.worker_id = worker_id - def run(self): - """Test the proxy against this job's target server.""" - sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() + def run(self): + """Test the proxy against this job's target server.""" + sock, proto, duration, tor, srv, failinc, is_ssl, err_cat = self._connect_and_test() - if not sock: - self.proxy_state.record_result(False, category=err_cat) - return + if not sock: + self.proxy_state.record_result(False, category=err_cat) + return - try: - recv = sock.recv(-1) - regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv] + try: + recv = sock.recv(-1) + regex = '^(:|NOTICE|ERROR)' if self.checktype == 'irc' else regexes[srv] - if re.search(regex, recv, re.IGNORECASE): - elapsed = time.time() - duration - self.proxy_state.record_result( - True, proto=proto, duration=elapsed, - srv=srv, tor=tor, ssl=is_ssl - ) - else: - self.proxy_state.record_result(False, category='other') + if re.search(regex, recv, re.IGNORECASE): + elapsed = time.time() - duration + self.proxy_state.record_result( + True, proto=proto, duration=elapsed, + srv=srv, tor=tor, ssl=is_ssl + ) + else: + self.proxy_state.record_result(False, category='other') - except KeyboardInterrupt as e: - sock.disconnect() - raise e - except rocksock.RocksockException as e: - self.proxy_state.record_result(False, category=categorize_error(e)) - finally: - sock.disconnect() + except KeyboardInterrupt as e: + sock.disconnect() + raise e + except rocksock.RocksockException as e: + self.proxy_state.record_result(False, category=categorize_error(e)) + finally: + sock.disconnect() - def _connect_and_test(self): - """Connect to target through the proxy and send test packet.""" - ps = self.proxy_state - srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv + def _connect_and_test(self): + """Connect to target through the proxy and send test packet.""" + ps = self.proxy_state + srvname = self.target_srv.strip() if self.checktype == 'irc' else self.target_srv - use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl - if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0: - use_ssl = 1 + use_ssl = random.choice([0, 1]) if config.watchd.use_ssl == 2 else config.watchd.use_ssl + if ps.consecutive_success > 0 and (ps.consecutive_success % 3) == 0: + use_ssl = 1 - if self.checktype == 'irc': - server_port = 6697 if use_ssl else 6667 - else: - server_port = 443 if use_ssl else 80 + if self.checktype == 'irc': + server_port = 6697 if use_ssl else 6667 + else: + server_port = 443 if use_ssl else 80 - verifycert = True if use_ssl else False - protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto] - last_error_category = None + verifycert = True if use_ssl else False + protos = ['http', 'socks5', 'socks4'] if ps.proto is None else [ps.proto] + last_error_category = None - # Get Tor host from pool (with worker affinity) - pool = connection_pool.get_pool() + # Get Tor host from pool (with worker affinity) + pool = connection_pool.get_pool() - for proto in protos: - if pool: - torhost = pool.get_tor_host(self.worker_id) - else: - torhost = random.choice(config.torhosts) - if proto == 'socks4': - srv = socks4_resolve(srvname, server_port) - else: - srv = srvname - if not srv: - continue + for proto in protos: + if pool: + torhost = pool.get_tor_host(self.worker_id) + else: + torhost = random.choice(config.torhosts) + if proto == 'socks4': + srv = socks4_resolve(srvname, server_port) + else: + srv = srvname + if not srv: + continue - duration = time.time() - proxies = [ - rocksock.RocksockProxyFromURL('socks5://%s' % torhost), - rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)), - ] + duration = time.time() + proxies = [ + rocksock.RocksockProxyFromURL('socks5://%s' % torhost), + rocksock.RocksockProxyFromURL('%s://%s:%s' % (proto, ps.ip, ps.port)), + ] - try: - sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, - proxies=proxies, timeout=config.watchd.timeout, - verifycert=verifycert) - sock.connect() - if self.checktype == 'irc': - sock.send('NICK\n') - else: - sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) - # Record success in pool - if pool: - pool.record_success(torhost, time.time() - duration) - return sock, proto, duration, torhost, srvname, 0, use_ssl, None + try: + sock = rocksock.Rocksock(host=srv, port=server_port, ssl=use_ssl, + proxies=proxies, timeout=config.watchd.timeout, + verifycert=verifycert) + sock.connect() + if self.checktype == 'irc': + sock.send('NICK\n') + else: + sock.send('HEAD / HTTP/1.0\r\nHost: %s\r\n\r\n' % srvname) + # Record success in pool + if pool: + pool.record_success(torhost, time.time() - duration) + return sock, proto, duration, torhost, srvname, 0, use_ssl, None - except rocksock.RocksockException as e: - last_error_category = categorize_error(e) - if config.watchd.debug: - _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port, - e.get_errormessage(), last_error_category), 'debug') + except rocksock.RocksockException as e: + last_error_category = categorize_error(e) + if config.watchd.debug: + _log("proxy failed: %s://%s:%d: %s [%s]" % (proto, ps.ip, ps.port, + e.get_errormessage(), last_error_category), 'debug') - et = e.get_errortype() - err = e.get_error() - fp = e.get_failedproxy() + et = e.get_errortype() + err = e.get_error() + fp = e.get_failedproxy() - sock.disconnect() + sock.disconnect() - if et == rocksock.RS_ET_OWN: - if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or - err == rocksock.RS_E_HIT_TIMEOUT): - break - elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: - # Tor connection failed - record in pool - if pool: - pool.record_failure(torhost) - if random.randint(0, (config.watchd.threads - 1) / 2) == 0: - _log("could not connect to tor, sleep 5s", "ERROR") - time.sleep(5) - elif et == rocksock.RS_ET_GAI: - _log("could not resolve connection target %s" % srvname, "ERROR") - break - elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR: - ps.mitm = 1 + if et == rocksock.RS_ET_OWN: + if fp == 1 and (err == rocksock.RS_E_REMOTE_DISCONNECTED or + err == rocksock.RS_E_HIT_TIMEOUT): + break + elif fp == 0 and err == rocksock.RS_E_TARGET_CONN_REFUSED: + # Tor connection failed - record in pool + if pool: + pool.record_failure(torhost) + if random.randint(0, (config.watchd.threads - 1) / 2) == 0: + _log("could not connect to tor, sleep 5s", "ERROR") + time.sleep(5) + elif et == rocksock.RS_ET_GAI: + _log("could not resolve connection target %s" % srvname, "ERROR") + break + elif err == rocksock.RS_E_SSL_CERTIFICATE_ERROR: + ps.mitm = 1 - except KeyboardInterrupt as e: - raise e + except KeyboardInterrupt as e: + raise e - return None, None, None, None, None, 1, use_ssl, last_error_category + return None, None, None, None, None, 1, use_ssl, last_error_category class WorkerThread(): - def __init__(self, id, job_queue, result_queue): - self.id = id - self.done = threading.Event() - self.thread = None - self.job_queue = job_queue # shared input queue - self.result_queue = result_queue # shared output queue - def stop(self): - self.done.set() - def term(self): - if self.thread: self.thread.join() - def start_thread(self): - self.thread = threading.Thread(target=self.workloop) - self.thread.start() - def workloop(self): - job_count = 0 - duration_total = 0 - while not self.done.is_set(): - try: - job = self.job_queue.get(timeout=0.5) - except Queue.Empty: - continue - nao = time.time() - # Assign worker ID for connection pool affinity - job.worker_id = self.id - job.run() - spent = time.time() - nao - job_count += 1 - duration_total += spent - self.result_queue.put(job) - self.job_queue.task_done() - if self.thread and job_count > 0: - avg_t = try_div(duration_total, job_count) - _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id) + def __init__(self, id, job_queue, result_queue): + self.id = id + self.done = threading.Event() + self.thread = None + self.job_queue = job_queue # shared input queue + self.result_queue = result_queue # shared output queue + def stop(self): + self.done.set() + def term(self): + if self.thread: self.thread.join() + def start_thread(self): + self.thread = threading.Thread(target=self.workloop) + self.thread.start() + def workloop(self): + job_count = 0 + duration_total = 0 + while not self.done.is_set(): + try: + job = self.job_queue.get(timeout=0.5) + except Queue.Empty: + continue + nao = time.time() + # Assign worker ID for connection pool affinity + job.worker_id = self.id + job.run() + spent = time.time() - nao + job_count += 1 + duration_total += spent + self.result_queue.put(job) + self.job_queue.task_done() + if self.thread and job_count > 0: + avg_t = try_div(duration_total, job_count) + _log("terminated, %d jobs, avg.time %.2f" % (job_count, avg_t), self.id) class Proxywatchd(): - def stop(self): - _log('halting... (%d thread(s))' % len(self.threads), 'watchd') - self.stopping.set() + def stop(self): + _log('halting... (%d thread(s))' % len(self.threads), 'watchd') + self.stopping.set() - def _cleanup(self): - for wt in self.threads: - wt.stop() - for wt in self.threads: - wt.term() - self.collect_work() - self.submit_collected() - if self.httpd_server: - self.httpd_server.stop() - self.stopped.set() + def _cleanup(self): + for wt in self.threads: + wt.stop() + for wt in self.threads: + wt.term() + self.collect_work() + self.submit_collected() + if self.httpd_server: + self.httpd_server.stop() + self.stopped.set() - def finish(self): - if not self.in_background: self._cleanup() - while not self.stopped.is_set(): time.sleep(0.1) - success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100 - _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd") + def finish(self): + if not self.in_background: self._cleanup() + while not self.stopped.is_set(): time.sleep(0.1) + success_rate = try_div(self.totals['success'], self.totals['submitted']) * 100 + _log("total results: %d/%d (%.2f%%)"%(self.totals['success'], self.totals['submitted'], success_rate), "watchd") - def _prep_db(self): - self.mysqlite = mysqlite.mysqlite(config.watchd.database, str) - def _close_db(self): - if self.mysqlite: - self.mysqlite.close() - self.mysqlite = None - def __init__(self): - config.load() - self.in_background = False - self.threads = [] - self.stopping = threading.Event() - self.stopped = threading.Event() + def _prep_db(self): + self.mysqlite = mysqlite.mysqlite(config.watchd.database, str) + def _close_db(self): + if self.mysqlite: + self.mysqlite.close() + self.mysqlite = None + def __init__(self): + config.load() + self.in_background = False + self.threads = [] + self.stopping = threading.Event() + self.stopped = threading.Event() - # shared work-stealing queues - self.job_queue = PriorityJobQueue() - self.result_queue = Queue.Queue() + # shared work-stealing queues + self.job_queue = PriorityJobQueue() + self.result_queue = Queue.Queue() - # track pending proxy states (for multi-target aggregation) - self.pending_states = [] # list of ProxyTestState awaiting completion - self.pending_lock = threading.Lock() + # track pending proxy states (for multi-target aggregation) + self.pending_states = [] # list of ProxyTestState awaiting completion + self.pending_lock = threading.Lock() - # create table if needed - self._prep_db() - self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, success_count INT, total_duration INT, ip TEXT, port INT)') - self.mysqlite.commit() - self._close_db() + # create table if needed + self._prep_db() + self.mysqlite.execute('CREATE TABLE IF NOT EXISTS proxylist (proxy BLOB, country BLOB, added INT, failed INT, tested INT, source BLOB, dronebl INT, proto TEXT, success_count INT, total_duration INT, ip TEXT, port INT)') + self.mysqlite.commit() + self._close_db() - self.submit_after = config.watchd.submit_after # number of collected jobs before writing db - self.collected = [] # completed ProxyTestState objects ready for DB - self.totals = { - 'submitted':0, - 'success':0, - } - self.stats = Stats() - self.last_cleanup = time.time() - self.httpd_server = None + self.submit_after = config.watchd.submit_after # number of collected jobs before writing db + self.collected = [] # completed ProxyTestState objects ready for DB + self.totals = { + 'submitted':0, + 'success':0, + } + self.stats = Stats() + self.last_cleanup = time.time() + self.httpd_server = None - def fetch_rows(self): - self.isoldies = False - q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' - rows = self.mysqlite.execute(q, (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, time.time())).fetchall() - # check oldies ? - if len(rows) < config.watchd.threads: - rows = [] - if config.watchd.oldies: - self.isoldies = True - ## disable tor safeguard for old proxies - if self.tor_safeguard: self.tor_safeguard = False - rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall() - return rows + def fetch_rows(self): + self.isoldies = False + q = 'SELECT ip,port,proto,failed,success_count,total_duration,country,mitm,consecutive_success FROM proxylist WHERE failed >= ? and failed < ? and (tested + ? + (failed * ?)) < ? ORDER BY RANDOM()' + rows = self.mysqlite.execute(q, (0, config.watchd.max_fail, config.watchd.checktime, config.watchd.perfail_checktime, time.time())).fetchall() + # check oldies ? + if len(rows) < config.watchd.threads: + rows = [] + if config.watchd.oldies: + self.isoldies = True + ## disable tor safeguard for old proxies + if self.tor_safeguard: self.tor_safeguard = False + rows = self.mysqlite.execute(q, (config.watchd.max_fail, config.watchd.max_fail + round( config.watchd.max_fail/2), config.watchd.checktime, config.watchd.oldies_checktime, time.time())).fetchall() + return rows - def prepare_jobs(self): - self._prep_db() - ## enable tor safeguard by default - self.tor_safeguard = config.watchd.tor_safeguard - rows = self.fetch_rows() - checktype = config.watchd.checktype - num_targets = 3 + def prepare_jobs(self): + self._prep_db() + ## enable tor safeguard by default + self.tor_safeguard = config.watchd.tor_safeguard + rows = self.fetch_rows() + checktype = config.watchd.checktype + num_targets = 3 - # select target pool based on checktype - if checktype == 'irc': - target_pool = config.servers - else: - target_pool = list(regexes.keys()) + # select target pool based on checktype + if checktype == 'irc': + target_pool = config.servers + else: + target_pool = list(regexes.keys()) - # create all jobs first, then shuffle for interleaving - all_jobs = [] - new_states = [] + # create all jobs first, then shuffle for interleaving + all_jobs = [] + new_states = [] - for row in rows: - # create shared state for this proxy - state = ProxyTestState( - row[0], row[1], row[2], row[3], row[4], row[5], - row[6], row[7], row[8], num_targets=num_targets, - oldies=self.isoldies - ) - new_states.append(state) + for row in rows: + # create shared state for this proxy + state = ProxyTestState( + row[0], row[1], row[2], row[3], row[4], row[5], + row[6], row[7], row[8], num_targets=num_targets, + oldies=self.isoldies + ) + new_states.append(state) - # select random targets for this proxy - targets = random.sample(target_pool, min(num_targets, len(target_pool))) + # select random targets for this proxy + targets = random.sample(target_pool, min(num_targets, len(target_pool))) - # create one job per target - for target in targets: - job = TargetTestJob(state, target, checktype) - all_jobs.append(job) + # create one job per target + for target in targets: + job = TargetTestJob(state, target, checktype) + all_jobs.append(job) - # shuffle to interleave tests across different proxies - random.shuffle(all_jobs) + # shuffle to interleave tests across different proxies + random.shuffle(all_jobs) - # track pending states - with self.pending_lock: - self.pending_states.extend(new_states) + # track pending states + with self.pending_lock: + self.pending_states.extend(new_states) - # queue all jobs with priority - for job in all_jobs: - priority = calculate_priority( - job.proxy_state.failcount, - job.proxy_state.success_count, - config.watchd.max_fail - ) - self.job_queue.put(job, priority) + # queue all jobs with priority + for job in all_jobs: + priority = calculate_priority( + job.proxy_state.failcount, + job.proxy_state.success_count, + config.watchd.max_fail + ) + self.job_queue.put(job, priority) - self._close_db() - proxy_count = len(new_states) - job_count = len(all_jobs) - if proxy_count > 0: - _log("created %d jobs for %d proxies (%d targets each)" % ( - job_count, proxy_count, num_targets), 'watchd') - return job_count + self._close_db() + proxy_count = len(new_states) + job_count = len(all_jobs) + if proxy_count > 0: + _log("created %d jobs for %d proxies (%d targets each)" % ( + job_count, proxy_count, num_targets), 'watchd') + return job_count - def collect_work(self): - # drain results from shared result queue (TargetTestJob objects) - # results are already recorded in their ProxyTestState - while True: - try: - self.result_queue.get_nowait() - except Queue.Empty: - break + def collect_work(self): + # drain results from shared result queue (TargetTestJob objects) + # results are already recorded in their ProxyTestState + while True: + try: + self.result_queue.get_nowait() + except Queue.Empty: + break - # check for completed proxy states and evaluate them - with self.pending_lock: - still_pending = [] - for state in self.pending_states: - if state.is_complete(): - success, category = state.evaluate() - self.stats.record(success, category) - self.collected.append(state) - else: - still_pending.append(state) - self.pending_states = still_pending + # check for completed proxy states and evaluate them + with self.pending_lock: + still_pending = [] + for state in self.pending_states: + if state.is_complete(): + success, category = state.evaluate() + self.stats.record(success, category) + self.collected.append(state) + else: + still_pending.append(state) + self.pending_states = still_pending - def collect_unfinished(self): - # drain any remaining jobs from job queue - unfinished_count = 0 - while True: - try: - self.job_queue.get_nowait() - unfinished_count += 1 - except Queue.Empty: - break - if unfinished_count > 0: - _log("discarded %d unfinished jobs" % unfinished_count, "watchd") - # note: corresponding ProxyTestStates will be incomplete - # they'll be re-tested in the next cycle + def collect_unfinished(self): + # drain any remaining jobs from job queue + unfinished_count = 0 + while True: + try: + self.job_queue.get_nowait() + unfinished_count += 1 + except Queue.Empty: + break + if unfinished_count > 0: + _log("discarded %d unfinished jobs" % unfinished_count, "watchd") + # note: corresponding ProxyTestStates will be incomplete + # they'll be re-tested in the next cycle - def submit_collected(self): - if len(self.collected) == 0: return True - sc = 0 - args = [] - for job in self.collected: - if job.failcount == 0: sc += 1 - args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) ) + def submit_collected(self): + if len(self.collected) == 0: return True + sc = 0 + args = [] + for job in self.collected: + if job.failcount == 0: sc += 1 + args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) ) - success_rate = (float(sc) / len(self.collected)) * 100 - ret = True - if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard: - _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails"%success_rate, "ERROR") - if sc == 0: return False - args = [] - for job in self.collected: - if job.failcount == 0: - args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) ) - ret = False + success_rate = (float(sc) / len(self.collected)) * 100 + ret = True + if len(self.collected) >= 100 and success_rate <= config.watchd.outage_threshold and self.tor_safeguard: + _log("WATCHD %.2f%% SUCCESS RATE - tor circuit blocked? won't submit fails"%success_rate, "ERROR") + if sc == 0: return False + args = [] + for job in self.collected: + if job.failcount == 0: + args.append( (job.failcount, job.checktime, 1, job.country, job.proto, job.success_count, job.total_duration, job.mitm, job.consecutive_success, job.proxy) ) + ret = False - _log("updating %d DB entries (success rate: %.2f%%)"%(len(self.collected), success_rate), 'watchd') - self._prep_db() - query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=? WHERE proxy=?' - self.mysqlite.executemany(query, args) - self.mysqlite.commit() - self._close_db() - self.collected = [] - self.totals['submitted'] += len(args) - self.totals['success'] += sc - return ret + _log("updating %d DB entries (success rate: %.2f%%)"%(len(self.collected), success_rate), 'watchd') + self._prep_db() + query = 'UPDATE proxylist SET failed=?,tested=?,dronebl=?,country=?,proto=?,success_count=?,total_duration=?,mitm=?,consecutive_success=? WHERE proxy=?' + self.mysqlite.executemany(query, args) + self.mysqlite.commit() + self._close_db() + self.collected = [] + self.totals['submitted'] += len(args) + self.totals['success'] += sc + return ret - def cleanup_stale(self): - """Remove proxies that have been dead for too long.""" - stale_seconds = config.watchd.stale_days * 86400 - cutoff = int(time.time()) - stale_seconds - self._prep_db() - # delete proxies that: failed >= max_fail AND last tested before cutoff - result = self.mysqlite.execute( - 'DELETE FROM proxylist WHERE failed >= ? AND tested < ?', - (config.watchd.max_fail, cutoff) - ) - count = result.rowcount if hasattr(result, 'rowcount') else 0 - self.mysqlite.commit() - self._close_db() - if count > 0: - _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd') - self.last_cleanup = time.time() + def cleanup_stale(self): + """Remove proxies that have been dead for too long.""" + stale_seconds = config.watchd.stale_days * 86400 + cutoff = int(time.time()) - stale_seconds + self._prep_db() + # delete proxies that: failed >= max_fail AND last tested before cutoff + result = self.mysqlite.execute( + 'DELETE FROM proxylist WHERE failed >= ? AND tested < ?', + (config.watchd.max_fail, cutoff) + ) + count = result.rowcount if hasattr(result, 'rowcount') else 0 + self.mysqlite.commit() + self._close_db() + if count > 0: + _log('removed %d stale proxies (not seen in %d days)' % (count, config.watchd.stale_days), 'watchd') + self.last_cleanup = time.time() - def start(self): - if config.watchd.threads == 1 and _run_standalone: - return self._run() - else: - return self._run_background() + def start(self): + if config.watchd.threads == 1 and _run_standalone: + return self._run() + else: + return self._run_background() - def run(self): - if self.in_background: - while 1: time.sleep(1) + def run(self): + if self.in_background: + while 1: time.sleep(1) - def _run_background(self): - self.in_background = True - t = threading.Thread(target=self._run) - t.start() + def _run_background(self): + self.in_background = True + t = threading.Thread(target=self._run) + t.start() - def _run(self): - _log('starting...', 'watchd') + def _run(self): + _log('starting...', 'watchd') - # Initialize Tor connection pool - connection_pool.init_pool(config.torhosts, warmup=True) + # Initialize Tor connection pool + connection_pool.init_pool(config.torhosts, warmup=True) - # Start HTTP API server if enabled - if config.httpd.enabled: - from httpd import ProxyAPIServer - self.httpd_server = ProxyAPIServer( - config.httpd.listenip, - config.httpd.port, - config.watchd.database - ) - self.httpd_server.start() + # Start HTTP API server if enabled + if config.httpd.enabled: + from httpd import ProxyAPIServer + self.httpd_server = ProxyAPIServer( + config.httpd.listenip, + config.httpd.port, + config.watchd.database + ) + self.httpd_server.start() - # create worker threads with shared queues - for i in range(config.watchd.threads): - threadid = ''.join([random.choice(string.letters) for x in range(5)]) - wt = WorkerThread(threadid, self.job_queue, self.result_queue) - if self.in_background: - wt.start_thread() - self.threads.append(wt) - time.sleep(random.random() / 10) + # create worker threads with shared queues + for i in range(config.watchd.threads): + threadid = ''.join([random.choice(string.letters) for x in range(5)]) + wt = WorkerThread(threadid, self.job_queue, self.result_queue) + if self.in_background: + wt.start_thread() + self.threads.append(wt) + time.sleep(random.random() / 10) - sleeptime = 0 - while True: + sleeptime = 0 + while True: - if self.stopping.is_set(): - if self.in_background: self._cleanup() - break + if self.stopping.is_set(): + if self.in_background: self._cleanup() + break - if sleeptime > 0: - time.sleep(1) - sleeptime -= 1 - continue + if sleeptime > 0: + time.sleep(1) + sleeptime -= 1 + continue - # check if job queue is empty (work-stealing: threads pull as needed) - if self.job_queue.empty(): - self.collect_work() - if not self.submit_collected() and self.tor_safeguard: - _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") - sleeptime = 60 - else: - job_count = self.prepare_jobs() - if job_count == 0: - # no jobs available, wait before checking again - sleeptime = 10 + # check if job queue is empty (work-stealing: threads pull as needed) + if self.job_queue.empty(): + self.collect_work() + if not self.submit_collected() and self.tor_safeguard: + _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") + sleeptime = 60 + else: + job_count = self.prepare_jobs() + if job_count == 0: + # no jobs available, wait before checking again + sleeptime = 10 - if not self.in_background: # single_thread scenario - self.threads[0].workloop() + if not self.in_background: # single_thread scenario + self.threads[0].workloop() - self.collect_work() + self.collect_work() - if len(self.collected) > self.submit_after: - if not self.submit_collected() and self.tor_safeguard: - _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") - sleeptime = 60 + if len(self.collected) > self.submit_after: + if not self.submit_collected() and self.tor_safeguard: + _log("zzZzZzzZ sleeping 1 minute(s) due to tor issues", "watchd") + sleeptime = 60 - # periodic stats report - if self.stats.should_report(config.watchd.stats_interval): - _log(self.stats.report(), 'stats') - # Also report pool stats - pool = connection_pool.get_pool() - if pool: - _log(pool.status_line(), 'stats') + # periodic stats report + if self.stats.should_report(config.watchd.stats_interval): + _log(self.stats.report(), 'stats') + # Also report pool stats + pool = connection_pool.get_pool() + if pool: + _log(pool.status_line(), 'stats') - # periodic stale proxy cleanup (daily) - if (time.time() - self.last_cleanup) >= 86400: - self.cleanup_stale() + # periodic stale proxy cleanup (daily) + if (time.time() - self.last_cleanup) >= 86400: + self.cleanup_stale() - time.sleep(1) + time.sleep(1) if __name__ == '__main__': - _run_standalone = True + _run_standalone = True - config.load() - errors = config.validate() - if errors: - for e in errors: - _log(e, 'error') - import sys - sys.exit(1) + config.load() + errors = config.validate() + if errors: + for e in errors: + _log(e, 'error') + import sys + sys.exit(1) - w = Proxywatchd() - try: - w.start() - w.run() - except KeyboardInterrupt as e: - pass - finally: - w.stop() - w.finish() + w = Proxywatchd() + try: + w.start() + w.run() + except KeyboardInterrupt as e: + pass + finally: + w.stop() + w.finish() diff --git a/soup_parser.py b/soup_parser.py index ebb5d3a..224e426 100644 --- a/soup_parser.py +++ b/soup_parser.py @@ -1,5 +1,6 @@ #!/usr/bin/env python2 -# HTML parsing with optional BeautifulSoup or stdlib fallback +# -*- coding: utf-8 -*- +"""HTML parsing with optional BeautifulSoup or stdlib fallback.""" from HTMLParser import HTMLParser import sys @@ -8,87 +9,98 @@ _bs4_available = False _use_bs4 = True try: - from bs4 import BeautifulSoup, FeatureNotFound - _bs4_available = True + from bs4 import BeautifulSoup, FeatureNotFound + _bs4_available = True except ImportError: - _bs4_available = False + _bs4_available = False -class Tag(): - def __init__(self, name, attrs): - self.name = name - self.attrs = dict(attrs) +class Tag(object): + """Simple tag representation for stdlib parser.""" - def __getitem__(self, key): - return self.attrs.get(key) + def __init__(self, name, attrs): + self.name = name + self.attrs = dict(attrs) - def get(self, key, default=None): - return self.attrs.get(key, default) + def __getitem__(self, key): + return self.attrs.get(key) + + def get(self, key, default=None): + return self.attrs.get(key, default) -class SoupResult(): - def __init__(self, tags): - self._tags = tags - self.body = self +class SoupResult(object): + """BeautifulSoup-like result wrapper for stdlib parser.""" - def find_all(self, tag_name, **kwargs): - results = [] - for tag in self._tags: - if tag.name != tag_name: - continue - if 'href' in kwargs: - if kwargs['href'] is True and 'href' not in tag.attrs: - continue - elif kwargs['href'] is not True and tag.attrs.get('href') != kwargs['href']: - continue - results.append(tag) - return results + def __init__(self, tags): + self._tags = tags + self.body = self + + def find_all(self, tag_name, **kwargs): + """Find all tags matching criteria.""" + results = [] + for tag in self._tags: + if tag.name != tag_name: + continue + if 'href' in kwargs: + if kwargs['href'] is True and 'href' not in tag.attrs: + continue + elif kwargs['href'] is not True and tag.attrs.get('href') != kwargs['href']: + continue + results.append(tag) + return results class LinkExtractor(HTMLParser): - def __init__(self): - HTMLParser.__init__(self) - self.tags = [] + """Extract tags from HTML using stdlib.""" - def handle_starttag(self, tag, attrs): - self.tags.append(Tag(tag, attrs)) + def __init__(self): + HTMLParser.__init__(self) + self.tags = [] - def handle_startendtag(self, tag, attrs): - self.tags.append(Tag(tag, attrs)) + def handle_starttag(self, tag, attrs): + self.tags.append(Tag(tag, attrs)) + + def handle_startendtag(self, tag, attrs): + self.tags.append(Tag(tag, attrs)) def _parse_stdlib(html): - parser = LinkExtractor() - try: - parser.feed(html) - except Exception: - pass # malformed HTML, return partial results - return SoupResult(parser.tags) + """Parse HTML using stdlib HTMLParser.""" + parser = LinkExtractor() + try: + parser.feed(html) + except Exception: + pass # Malformed HTML, return partial results + return SoupResult(parser.tags) def _parse_bs4(html): - try: - return BeautifulSoup(html, 'lxml') - except (FeatureNotFound, Exception): - return BeautifulSoup(html, 'html.parser') + """Parse HTML using BeautifulSoup.""" + try: + return BeautifulSoup(html, 'lxml') + except (FeatureNotFound, Exception): + return BeautifulSoup(html, 'html.parser') def set_nobs(enabled): - global _use_bs4 - _use_bs4 = not enabled - if enabled and _bs4_available: - sys.stderr.write('info: --nobs: using stdlib HTMLParser\n') - elif not _bs4_available: - sys.stderr.write('info: bs4 not available, using stdlib HTMLParser\n') + """Disable BeautifulSoup and use stdlib instead.""" + global _use_bs4 + _use_bs4 = not enabled + if enabled and _bs4_available: + sys.stderr.write('info: --nobs: using stdlib HTMLParser\n') + elif not _bs4_available: + sys.stderr.write('info: bs4 not available, using stdlib HTMLParser\n') def soupify(html, nohtml=False): - htm = html if nohtml else '%s' % (html) - if _use_bs4 and _bs4_available: - return _parse_bs4(htm) - else: - return _parse_stdlib(htm) + """Parse HTML content, returning BeautifulSoup-like object.""" + htm = html if nohtml else '%s' % html + if _use_bs4 and _bs4_available: + return _parse_bs4(htm) + return _parse_stdlib(htm) def is_available(): - return _bs4_available + """Check if BeautifulSoup is available.""" + return _bs4_available