ppf: worker heartbeat includes thread count
This commit is contained in:
37
ppf.py
37
ppf.py
@@ -345,13 +345,14 @@ def worker_submit_results(server_url, worker_key, results):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False):
|
def worker_send_heartbeat(server_url, worker_key, tor_ok, tor_ip=None, profiling=False, threads=0):
|
||||||
"""Send heartbeat with Tor status to master."""
|
"""Send heartbeat with Tor status to master."""
|
||||||
url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key)
|
url = '%s/api/heartbeat?key=%s' % (server_url.rstrip('/'), worker_key)
|
||||||
data = json.dumps({
|
data = json.dumps({
|
||||||
'tor_ok': tor_ok,
|
'tor_ok': tor_ok,
|
||||||
'tor_ip': tor_ip,
|
'tor_ip': tor_ip,
|
||||||
'profiling': profiling,
|
'profiling': profiling,
|
||||||
|
'threads': threads,
|
||||||
})
|
})
|
||||||
|
|
||||||
req = urllib2.Request(url, data)
|
req = urllib2.Request(url, data)
|
||||||
@@ -529,14 +530,14 @@ def worker_main(config):
|
|||||||
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
|
_log('tor recovered: %s (%s)' % (working[0], tor_ip or 'unknown'), 'info')
|
||||||
# Send heartbeat to manager
|
# Send heartbeat to manager
|
||||||
try:
|
try:
|
||||||
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling)
|
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
|
||||||
except NeedReregister:
|
except NeedReregister:
|
||||||
do_register()
|
do_register()
|
||||||
return working, tor_ip
|
return working, tor_ip
|
||||||
_log('tor still down, retrying in %ds' % check_interval, 'warn')
|
_log('tor still down, retrying in %ds' % check_interval, 'warn')
|
||||||
# Send heartbeat with tor_ok=False
|
# Send heartbeat with tor_ok=False
|
||||||
try:
|
try:
|
||||||
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling)
|
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
|
||||||
except NeedReregister:
|
except NeedReregister:
|
||||||
do_register()
|
do_register()
|
||||||
time.sleep(check_interval)
|
time.sleep(check_interval)
|
||||||
@@ -549,7 +550,7 @@ def worker_main(config):
|
|||||||
consecutive_tor_failures += 1
|
consecutive_tor_failures += 1
|
||||||
_log('tor down before claiming work (consecutive: %d)' % consecutive_tor_failures, 'warn')
|
_log('tor down before claiming work (consecutive: %d)' % consecutive_tor_failures, 'warn')
|
||||||
try:
|
try:
|
||||||
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling)
|
worker_send_heartbeat(server_url, wstate['worker_key'], False, None, worker_profiling, num_threads)
|
||||||
except NeedReregister:
|
except NeedReregister:
|
||||||
do_register()
|
do_register()
|
||||||
if consecutive_tor_failures >= 2:
|
if consecutive_tor_failures >= 2:
|
||||||
@@ -567,7 +568,7 @@ def worker_main(config):
|
|||||||
current_tor_ip = tor_ip
|
current_tor_ip = tor_ip
|
||||||
# Send heartbeat to manager
|
# Send heartbeat to manager
|
||||||
try:
|
try:
|
||||||
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling)
|
worker_send_heartbeat(server_url, wstate['worker_key'], True, tor_ip, worker_profiling, num_threads)
|
||||||
except NeedReregister:
|
except NeedReregister:
|
||||||
do_register()
|
do_register()
|
||||||
|
|
||||||
@@ -747,11 +748,37 @@ def main():
|
|||||||
# Start httpd independently when watchd is disabled
|
# Start httpd independently when watchd is disabled
|
||||||
if config.httpd.enabled:
|
if config.httpd.enabled:
|
||||||
from httpd import ProxyAPIServer
|
from httpd import ProxyAPIServer
|
||||||
|
import network_stats
|
||||||
|
|
||||||
|
def httpd_stats_provider():
|
||||||
|
"""Stats provider for httpd-only mode (scraping without testing)."""
|
||||||
|
stats = {
|
||||||
|
'network': network_stats.get_stats(),
|
||||||
|
}
|
||||||
|
# Add scraper stats if available
|
||||||
|
try:
|
||||||
|
import scraper
|
||||||
|
scraper_stats = scraper.get_scraper_stats()
|
||||||
|
if scraper_stats:
|
||||||
|
stats['scraper'] = scraper_stats
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
# Add tor pool stats if available
|
||||||
|
try:
|
||||||
|
import connection_pool
|
||||||
|
pool = connection_pool.get_pool()
|
||||||
|
if pool:
|
||||||
|
stats['tor_pool'] = pool.get_stats()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return stats
|
||||||
|
|
||||||
profiling = config.args.profile or config.common.profiling
|
profiling = config.args.profile or config.common.profiling
|
||||||
httpd_server = ProxyAPIServer(
|
httpd_server = ProxyAPIServer(
|
||||||
config.httpd.listenip,
|
config.httpd.listenip,
|
||||||
config.httpd.port,
|
config.httpd.port,
|
||||||
config.watchd.database,
|
config.watchd.database,
|
||||||
|
stats_provider=httpd_stats_provider,
|
||||||
profiling=profiling,
|
profiling=profiling,
|
||||||
)
|
)
|
||||||
httpd_server.start()
|
httpd_server.start()
|
||||||
|
|||||||
Reference in New Issue
Block a user