add network statistics tracking module
This commit is contained in:
157
network_stats.py
Normal file
157
network_stats.py
Normal file
@@ -0,0 +1,157 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Thread-safe network statistics tracking."""
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
class NetworkStats(object):
|
||||
"""Track network bytes sent/received with thread-safe counters."""
|
||||
|
||||
def __init__(self):
|
||||
self._lock = threading.Lock()
|
||||
self._bytes_rx = 0
|
||||
self._bytes_tx = 0
|
||||
self._requests = 0
|
||||
self._start_time = time.time()
|
||||
# Per-category stats
|
||||
self._scraper_rx = 0
|
||||
self._scraper_tx = 0
|
||||
self._proxy_rx = 0
|
||||
self._proxy_tx = 0
|
||||
# Per-tor-node stats: {host:port -> {'rx': n, 'tx': n, 'requests': n}}
|
||||
self._tor_stats = {}
|
||||
|
||||
def add_rx(self, nbytes, category='other', tor_node=None):
|
||||
"""Add received bytes."""
|
||||
with self._lock:
|
||||
self._bytes_rx += nbytes
|
||||
if category == 'scraper':
|
||||
self._scraper_rx += nbytes
|
||||
elif category == 'proxy':
|
||||
self._proxy_rx += nbytes
|
||||
if tor_node:
|
||||
if tor_node not in self._tor_stats:
|
||||
self._tor_stats[tor_node] = {'rx': 0, 'tx': 0, 'requests': 0}
|
||||
self._tor_stats[tor_node]['rx'] += nbytes
|
||||
|
||||
def add_tx(self, nbytes, category='other', tor_node=None):
|
||||
"""Add transmitted bytes."""
|
||||
with self._lock:
|
||||
self._bytes_tx += nbytes
|
||||
if category == 'scraper':
|
||||
self._scraper_tx += nbytes
|
||||
elif category == 'proxy':
|
||||
self._proxy_tx += nbytes
|
||||
if tor_node:
|
||||
if tor_node not in self._tor_stats:
|
||||
self._tor_stats[tor_node] = {'rx': 0, 'tx': 0, 'requests': 0}
|
||||
self._tor_stats[tor_node]['tx'] += nbytes
|
||||
|
||||
def add_request(self, tor_node=None):
|
||||
"""Increment request counter."""
|
||||
with self._lock:
|
||||
self._requests += 1
|
||||
if tor_node:
|
||||
if tor_node not in self._tor_stats:
|
||||
self._tor_stats[tor_node] = {'rx': 0, 'tx': 0, 'requests': 0}
|
||||
self._tor_stats[tor_node]['requests'] += 1
|
||||
|
||||
def get_stats(self):
|
||||
"""Return current stats as dict."""
|
||||
with self._lock:
|
||||
elapsed = time.time() - self._start_time
|
||||
rx_rate = self._bytes_rx / elapsed if elapsed > 0 else 0
|
||||
tx_rate = self._bytes_tx / elapsed if elapsed > 0 else 0
|
||||
return {
|
||||
'bytes_rx': self._bytes_rx,
|
||||
'bytes_tx': self._bytes_tx,
|
||||
'bytes_total': self._bytes_rx + self._bytes_tx,
|
||||
'requests': self._requests,
|
||||
'elapsed': elapsed,
|
||||
'rx_rate': rx_rate,
|
||||
'tx_rate': tx_rate,
|
||||
'scraper': {
|
||||
'bytes_rx': self._scraper_rx,
|
||||
'bytes_tx': self._scraper_tx,
|
||||
'bytes_total': self._scraper_rx + self._scraper_tx,
|
||||
},
|
||||
'proxy': {
|
||||
'bytes_rx': self._proxy_rx,
|
||||
'bytes_tx': self._proxy_tx,
|
||||
'bytes_total': self._proxy_rx + self._proxy_tx,
|
||||
},
|
||||
'tor_nodes': dict(self._tor_stats),
|
||||
}
|
||||
|
||||
def reset(self):
|
||||
"""Reset all counters."""
|
||||
with self._lock:
|
||||
self._bytes_rx = 0
|
||||
self._bytes_tx = 0
|
||||
self._requests = 0
|
||||
self._scraper_rx = 0
|
||||
self._scraper_tx = 0
|
||||
self._proxy_rx = 0
|
||||
self._proxy_tx = 0
|
||||
self._tor_stats = {}
|
||||
self._start_time = time.time()
|
||||
|
||||
|
||||
# Global singleton
|
||||
_stats = NetworkStats()
|
||||
|
||||
# Thread-local context for category and tor_node tracking
|
||||
_context = threading.local()
|
||||
|
||||
|
||||
def set_category(category):
|
||||
"""Set the current category for this thread (scraper, proxy, or other)."""
|
||||
_context.category = category
|
||||
|
||||
|
||||
def get_category():
|
||||
"""Get the current category for this thread."""
|
||||
return getattr(_context, 'category', 'other')
|
||||
|
||||
|
||||
def set_tor_node(tor_node):
|
||||
"""Set the current Tor node for this thread (host:port)."""
|
||||
_context.tor_node = tor_node
|
||||
|
||||
|
||||
def get_tor_node():
|
||||
"""Get the current Tor node for this thread."""
|
||||
return getattr(_context, 'tor_node', None)
|
||||
|
||||
|
||||
def add_rx(nbytes, category=None, tor_node=None):
|
||||
"""Add received bytes. Uses thread-local context if not specified."""
|
||||
if category is None:
|
||||
category = get_category()
|
||||
if tor_node is None:
|
||||
tor_node = get_tor_node()
|
||||
_stats.add_rx(nbytes, category, tor_node)
|
||||
|
||||
|
||||
def add_tx(nbytes, category=None, tor_node=None):
|
||||
"""Add transmitted bytes. Uses thread-local context if not specified."""
|
||||
if category is None:
|
||||
category = get_category()
|
||||
if tor_node is None:
|
||||
tor_node = get_tor_node()
|
||||
_stats.add_tx(nbytes, category, tor_node)
|
||||
|
||||
|
||||
def add_request(tor_node=None):
|
||||
"""Increment request counter."""
|
||||
if tor_node is None:
|
||||
tor_node = get_tor_node()
|
||||
_stats.add_request(tor_node)
|
||||
|
||||
|
||||
def get_stats():
|
||||
return _stats.get_stats()
|
||||
|
||||
|
||||
def reset():
|
||||
_stats.reset()
|
||||
Reference in New Issue
Block a user