fix: Add uptime sync to all tools for 5s HMAC replay window

All three standalone tools (esp-cmd, esp-fleet, esp-ota) now fetch
device uptime before signing commands, matching what esp-ctl already
does. Includes 60ms delay after uptime fetch to avoid firmware rate
limiter (50ms inter-command throttle).
This commit is contained in:
user
2026-02-14 20:29:49 +01:00
parent 8fcc90a6db
commit a4bd2a6315
3 changed files with 76 additions and 7 deletions

View File

@@ -3,8 +3,9 @@
import socket
import sys
import time
from esp_ctl.auth import sign_command
from esp_ctl.auth import get_secret, sign_command
DEFAULT_PORT = 5501
TIMEOUT = 2.0
@@ -37,14 +38,35 @@ def resolve(host):
sys.exit(1)
def get_uptime(ip):
"""Query device uptime_s for HMAC timestamp (unauthenticated)."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(TIMEOUT)
try:
sock.sendto(b"STATUS", (ip, DEFAULT_PORT))
data, _ = sock.recvfrom(1500)
for part in data.decode().split():
if part.startswith("uptime_s="):
return int(part.split("=", 1)[1])
except (socket.timeout, OSError, ValueError):
pass
finally:
sock.close()
return 0
def main():
if len(sys.argv) < 3 or sys.argv[1] in ("-h", "--help"):
print(USAGE)
sys.exit(0 if sys.argv[1:] and sys.argv[1] in ("-h", "--help") else 2)
host = sys.argv[1]
cmd = sign_command(" ".join(sys.argv[2:]).strip())
ip = resolve(host)
secret = get_secret()
uptime = get_uptime(ip) if secret else 0
if secret and uptime:
time.sleep(0.06) # avoid firmware rate limiter (50ms)
cmd = sign_command(" ".join(sys.argv[2:]).strip(), uptime, secret)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(TIMEOUT)

View File

@@ -10,7 +10,7 @@ import sys
import threading
import time
from esp_ctl.auth import sign_command
from esp_ctl.auth import get_secret, sign_command
DEFAULT_PORT = 5501
DEFAULT_HTTP_PORT = 8070
@@ -53,15 +53,37 @@ Examples:
esp-fleet ota --parallel /path/to/firmware.bin"""
def _get_uptime(ip, timeout=TIMEOUT):
"""Query device uptime_s for HMAC timestamp (unauthenticated)."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
try:
sock.sendto(b"STATUS", (ip, DEFAULT_PORT))
data, _ = sock.recvfrom(1500)
for part in data.decode().split():
if part.startswith("uptime_s="):
return int(part.split("=", 1)[1])
except (socket.timeout, OSError, ValueError):
pass
finally:
sock.close()
return 0
def query(name, host, cmd):
"""Send command to one sensor, return (name, reply_or_error)."""
cmd = sign_command(cmd)
try:
info = socket.getaddrinfo(host, DEFAULT_PORT, socket.AF_INET, socket.SOCK_DGRAM)
ip = info[0][4][0]
except socket.gaierror:
return (name, f"ERR: cannot resolve {host}")
secret = get_secret()
uptime = _get_uptime(ip) if secret else 0
if secret and uptime:
time.sleep(0.06) # avoid firmware rate limiter (50ms)
cmd = sign_command(cmd, uptime, secret)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(TIMEOUT)
try:
@@ -87,7 +109,11 @@ def _resolve(host):
def _udp_cmd(ip, cmd, timeout=TIMEOUT):
"""Send signed UDP command and return reply string."""
cmd = sign_command(cmd)
secret = get_secret()
uptime = _get_uptime(ip, timeout) if secret else 0
if secret and uptime:
time.sleep(0.06) # avoid firmware rate limiter (50ms)
cmd = sign_command(cmd, uptime, secret)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
try:

View File

@@ -9,7 +9,7 @@ import sys
import threading
import time
from esp_ctl.auth import sign_command
from esp_ctl.auth import get_secret, sign_command
DEFAULT_CMD_PORT = 5501
DEFAULT_HTTP_PORT = 8070
@@ -32,9 +32,30 @@ def resolve(host: str) -> str:
sys.exit(1)
def get_uptime(ip: str, timeout: float = TIMEOUT) -> int:
"""Query device uptime_s for HMAC timestamp (unauthenticated)."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
try:
sock.sendto(b"STATUS", (ip, DEFAULT_CMD_PORT))
data, _ = sock.recvfrom(1500)
for part in data.decode().split():
if part.startswith("uptime_s="):
return int(part.split("=", 1)[1])
except (socket.timeout, OSError, ValueError):
pass
finally:
sock.close()
return 0
def udp_cmd(ip: str, cmd: str, timeout: float = TIMEOUT) -> str:
"""Send UDP command and return reply."""
cmd = sign_command(cmd)
secret = get_secret()
uptime = get_uptime(ip, timeout) if secret else 0
if secret and uptime:
time.sleep(0.06) # avoid firmware rate limiter (50ms)
cmd = sign_command(cmd, uptime, secret)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
try: