diff --git a/network_stats.py b/network_stats.py new file mode 100644 index 0000000..a05a77d --- /dev/null +++ b/network_stats.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +"""Thread-safe network statistics tracking.""" + +import threading +import time + +class NetworkStats(object): + """Track network bytes sent/received with thread-safe counters.""" + + def __init__(self): + self._lock = threading.Lock() + self._bytes_rx = 0 + self._bytes_tx = 0 + self._requests = 0 + self._start_time = time.time() + # Per-category stats + self._scraper_rx = 0 + self._scraper_tx = 0 + self._proxy_rx = 0 + self._proxy_tx = 0 + # Per-tor-node stats: {host:port -> {'rx': n, 'tx': n, 'requests': n}} + self._tor_stats = {} + + def add_rx(self, nbytes, category='other', tor_node=None): + """Add received bytes.""" + with self._lock: + self._bytes_rx += nbytes + if category == 'scraper': + self._scraper_rx += nbytes + elif category == 'proxy': + self._proxy_rx += nbytes + if tor_node: + if tor_node not in self._tor_stats: + self._tor_stats[tor_node] = {'rx': 0, 'tx': 0, 'requests': 0} + self._tor_stats[tor_node]['rx'] += nbytes + + def add_tx(self, nbytes, category='other', tor_node=None): + """Add transmitted bytes.""" + with self._lock: + self._bytes_tx += nbytes + if category == 'scraper': + self._scraper_tx += nbytes + elif category == 'proxy': + self._proxy_tx += nbytes + if tor_node: + if tor_node not in self._tor_stats: + self._tor_stats[tor_node] = {'rx': 0, 'tx': 0, 'requests': 0} + self._tor_stats[tor_node]['tx'] += nbytes + + def add_request(self, tor_node=None): + """Increment request counter.""" + with self._lock: + self._requests += 1 + if tor_node: + if tor_node not in self._tor_stats: + self._tor_stats[tor_node] = {'rx': 0, 'tx': 0, 'requests': 0} + self._tor_stats[tor_node]['requests'] += 1 + + def get_stats(self): + """Return current stats as dict.""" + with self._lock: + elapsed = time.time() - self._start_time + rx_rate = self._bytes_rx / elapsed if elapsed > 0 else 0 + tx_rate = self._bytes_tx / elapsed if elapsed > 0 else 0 + return { + 'bytes_rx': self._bytes_rx, + 'bytes_tx': self._bytes_tx, + 'bytes_total': self._bytes_rx + self._bytes_tx, + 'requests': self._requests, + 'elapsed': elapsed, + 'rx_rate': rx_rate, + 'tx_rate': tx_rate, + 'scraper': { + 'bytes_rx': self._scraper_rx, + 'bytes_tx': self._scraper_tx, + 'bytes_total': self._scraper_rx + self._scraper_tx, + }, + 'proxy': { + 'bytes_rx': self._proxy_rx, + 'bytes_tx': self._proxy_tx, + 'bytes_total': self._proxy_rx + self._proxy_tx, + }, + 'tor_nodes': dict(self._tor_stats), + } + + def reset(self): + """Reset all counters.""" + with self._lock: + self._bytes_rx = 0 + self._bytes_tx = 0 + self._requests = 0 + self._scraper_rx = 0 + self._scraper_tx = 0 + self._proxy_rx = 0 + self._proxy_tx = 0 + self._tor_stats = {} + self._start_time = time.time() + + +# Global singleton +_stats = NetworkStats() + +# Thread-local context for category and tor_node tracking +_context = threading.local() + + +def set_category(category): + """Set the current category for this thread (scraper, proxy, or other).""" + _context.category = category + + +def get_category(): + """Get the current category for this thread.""" + return getattr(_context, 'category', 'other') + + +def set_tor_node(tor_node): + """Set the current Tor node for this thread (host:port).""" + _context.tor_node = tor_node + + +def get_tor_node(): + """Get the current Tor node for this thread.""" + return getattr(_context, 'tor_node', None) + + +def add_rx(nbytes, category=None, tor_node=None): + """Add received bytes. Uses thread-local context if not specified.""" + if category is None: + category = get_category() + if tor_node is None: + tor_node = get_tor_node() + _stats.add_rx(nbytes, category, tor_node) + + +def add_tx(nbytes, category=None, tor_node=None): + """Add transmitted bytes. Uses thread-local context if not specified.""" + if category is None: + category = get_category() + if tor_node is None: + tor_node = get_tor_node() + _stats.add_tx(nbytes, category, tor_node) + + +def add_request(tor_node=None): + """Increment request counter.""" + if tor_node is None: + tor_node = get_tor_node() + _stats.add_request(tor_node) + + +def get_stats(): + return _stats.get_stats() + + +def reset(): + _stats.reset()