diff --git a/ppf.py b/ppf.py index 92b86e2..f38a26f 100644 --- a/ppf.py +++ b/ppf.py @@ -801,6 +801,8 @@ def worker_main(config): timeout_start = time.time() timeout_seconds = max(config.watchd.timeout * 2 + 30, len(all_jobs) * 0.5) working_results = [] + last_heartbeat = time.time() + last_report = time.time() while completed < len(all_jobs): try: @@ -834,6 +836,40 @@ def worker_main(config): if time.time() - timeout_start > timeout_seconds: _log('test timeout, %d/%d completed' % (completed, len(all_jobs)), 'warn') break + + # Periodic heartbeat to prevent stale detection + now = time.time() + if now - last_heartbeat >= 60: + try: + worker_send_heartbeat(server_url, wstate['worker_key'], + True, current_tor_ip, worker_profiling, num_threads) + except NeedReregister: + do_register() + last_heartbeat = now + + # Periodic proxy report (flush working results every 5 minutes) + if working_results and now - last_report >= 300: + reported = False + try: + processed = worker_report_proxies(server_url, wstate['worker_key'], + working_results) + if processed > 0: + _log('interim report: %d proxies (%d submitted)' % ( + len(working_results), processed), 'info') + reported = True + except NeedReregister: + do_register() + try: + processed = worker_report_proxies(server_url, wstate['worker_key'], + working_results) + if processed > 0: + reported = True + except NeedReregister: + pass + if reported: + working_results = [] + last_report = now + continue # Populate proxy test cache from results