diff --git a/get-started/csi_recv_router/main/app_main.c b/get-started/csi_recv_router/main/app_main.c index 2aed6c3..633bf08 100644 --- a/get-started/csi_recv_router/main/app_main.c +++ b/get-started/csi_recv_router/main/app_main.c @@ -2553,6 +2553,68 @@ static int cmd_handle(const char *cmd, char *reply, size_t reply_size, bool auth return strlen(reply); } +/* ── Serial console (UART0) — AUTH management with physical access ─── */ + +static void serial_task(void *arg) +{ + char line[128]; + ESP_LOGI(TAG, "Serial console ready (type HELP for commands)"); + + while (1) { + if (fgets(line, sizeof(line), stdin) == NULL) { + vTaskDelay(pdMS_TO_TICKS(100)); + continue; + } + + /* Trim trailing whitespace */ + size_t len = strlen(line); + while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r' || line[len - 1] == ' ')) + line[--len] = '\0'; + if (len == 0) continue; + + if (strcasecmp(line, "AUTH") == 0) { + if (s_auth_secret[0]) + printf("OK AUTH on secret=%s\n", s_auth_secret); + else + printf("OK AUTH off\n"); + } else if (strncasecmp(line, "AUTH ", 5) == 0) { + const char *arg = line + 5; + if (strcasecmp(arg, "OFF") == 0) { + s_auth_secret[0] = '\0'; + config_erase_key("auth_secret"); + printf("OK AUTH off (cleared)\n"); + } else { + size_t alen = strlen(arg); + if (alen < 8 || alen > 64) { + printf("ERR secret length 8-64 chars\n"); + } else { + strncpy(s_auth_secret, arg, sizeof(s_auth_secret) - 1); + s_auth_secret[sizeof(s_auth_secret) - 1] = '\0'; + config_save_str("auth_secret", s_auth_secret); + printf("OK AUTH on secret=%s\n", s_auth_secret); + } + } + } else if (strcasecmp(line, "STATUS") == 0) { + const esp_app_desc_t *desc = esp_app_get_description(); + printf("OK hostname=%s uptime_s=%lld heap=%lu auth=%s version=%s\n", + s_hostname, + (long long)(esp_timer_get_time() / 1000000LL), + (unsigned long)esp_get_free_heap_size(), + s_auth_secret[0] ? "on" : "off", + desc->version); + } else if (strcasecmp(line, "HELP") == 0) { + printf("Serial commands:\n" + " AUTH Show auth secret\n" + " AUTH Set auth secret (8-64 chars)\n" + " AUTH OFF Clear auth secret\n" + " STATUS Show basic status\n" + " HELP This help\n"); + } else { + printf("ERR unknown serial command (type HELP)\n"); + } + } +} + static void cmd_task(void *arg) { int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); @@ -2782,6 +2844,7 @@ void app_main() xTaskCreate(cmd_task, "cmd_task", 6144, NULL, 5, NULL); xTaskCreate(adaptive_task, "adaptive", 3072, NULL, 3, NULL); + xTaskCreate(serial_task, "serial", 3072, NULL, 2, NULL); /* OTA rollback: mark firmware valid if we got this far */ const esp_partition_t *running = esp_ota_get_running_partition(); diff --git a/tools/esp-ota b/tools/esp-ota index 0fd6379..8d6943f 100755 --- a/tools/esp-ota +++ b/tools/esp-ota @@ -32,8 +32,20 @@ def resolve(host: str) -> str: sys.exit(1) +_uptime_cache = {"ip": None, "value": 0, "time": 0} + + def get_uptime(ip: str, timeout: float = TIMEOUT) -> int: - """Query device uptime_s for HMAC timestamp (unauthenticated).""" + """Query device uptime_s for HMAC timestamp (unauthenticated). + + Caches result for 3s to avoid hitting the firmware rate limiter + when multiple udp_cmd calls happen in quick succession. + """ + now = time.monotonic() + if _uptime_cache["ip"] == ip and (now - _uptime_cache["time"]) < 3: + elapsed = int(now - _uptime_cache["time"]) + return _uptime_cache["value"] + elapsed + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) try: @@ -41,7 +53,9 @@ def get_uptime(ip: str, timeout: float = TIMEOUT) -> int: data, _ = sock.recvfrom(1500) for part in data.decode().split(): if part.startswith("uptime_s="): - return int(part.split("=", 1)[1]) + val = int(part.split("=", 1)[1]) + _uptime_cache.update(ip=ip, value=val, time=now) + return val except (socket.timeout, OSError, ValueError): pass finally: diff --git a/tools/esp-provision b/tools/esp-provision new file mode 100755 index 0000000..0176307 --- /dev/null +++ b/tools/esp-provision @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +"""Provision auth secret to ESP32 CSI device NVS via USB serial.""" + +import argparse +import os +import secrets +import subprocess +import sys +import tempfile + +NVS_GEN = os.path.expanduser( + "~/esp/esp-idf/components/nvs_flash/nvs_partition_generator/nvs_partition_gen.py" +) +NVS_NAMESPACE = "csi_config" +NVS_OFFSET = "0x9000" +NVS_SIZE = "0x4000" +DEFAULT_PORT = "/dev/ttyUSB0" +DEFAULT_BAUD = "460800" + + +def generate_secret() -> str: + """Generate a 32-char hex secret.""" + return secrets.token_hex(16) + + +def create_nvs_csv(secret: str, path: str) -> None: + """Write NVS CSV with auth_secret entry.""" + with open(path, "w") as f: + f.write("key,type,encoding,value\n") + f.write(f"{NVS_NAMESPACE},namespace,,\n") + f.write(f"auth_secret,data,string,{secret}\n") + + +def main(): + parser = argparse.ArgumentParser( + description="Provision auth secret to ESP32 NVS partition" + ) + parser.add_argument( + "secret", nargs="?", default=None, + help="Auth secret (8-64 chars). Auto-generated if omitted." + ) + parser.add_argument( + "-p", "--port", default=DEFAULT_PORT, + help=f"USB serial port (default: {DEFAULT_PORT})" + ) + parser.add_argument( + "-b", "--baud", default=DEFAULT_BAUD, + help=f"Flash baud rate (default: {DEFAULT_BAUD})" + ) + parser.add_argument( + "--serial", action="store_true", + help="Set secret via serial console instead of NVS flash" + ) + parser.add_argument( + "--generate-only", action="store_true", + help="Generate and print a secret without flashing" + ) + args = parser.parse_args() + + # Generate or validate secret + secret = args.secret or generate_secret() + if len(secret) < 8 or len(secret) > 64: + print("ERR: secret must be 8-64 characters", file=sys.stderr) + sys.exit(1) + + if args.generate_only: + print(secret) + sys.exit(0) + + if args.serial: + # Set secret via serial console (device must be running) + try: + import serial + except ImportError: + print("ERR: pyserial required (pip install pyserial)", file=sys.stderr) + sys.exit(1) + + import time + s = serial.Serial(args.port, 921600, timeout=2) + s.reset_input_buffer() + s.write(f"AUTH {secret}\n".encode()) + time.sleep(0.5) + out = s.read(s.in_waiting or 256).decode("utf-8", errors="replace") + s.close() + + ok = False + for line in out.splitlines(): + if "OK AUTH on" in line: + ok = True + break + + if ok: + print(f"Secret set: {secret}") + else: + print(f"ERR: unexpected response: {out.strip()}", file=sys.stderr) + sys.exit(1) + sys.exit(0) + + # NVS flash method + if not os.path.isfile(NVS_GEN): + print(f"ERR: NVS generator not found: {NVS_GEN}", file=sys.stderr) + print(" Ensure ESP-IDF is installed at ~/esp/esp-idf/", file=sys.stderr) + sys.exit(1) + + with tempfile.TemporaryDirectory() as tmpdir: + csv_path = os.path.join(tmpdir, "nvs.csv") + bin_path = os.path.join(tmpdir, "nvs.bin") + + # Generate NVS partition binary + create_nvs_csv(secret, csv_path) + result = subprocess.run( + [sys.executable, NVS_GEN, "generate", csv_path, bin_path, NVS_SIZE], + capture_output=True, text=True, + ) + if result.returncode != 0: + print(f"ERR: NVS generation failed:\n{result.stderr}", file=sys.stderr) + sys.exit(1) + + # Flash NVS partition + print(f"Flashing auth secret to NVS at {NVS_OFFSET}...") + result = subprocess.run( + [ + sys.executable, "-m", "esptool", + "--port", args.port, "--baud", args.baud, + "write_flash", NVS_OFFSET, bin_path, + ], + capture_output=True, text=True, + ) + if result.returncode != 0: + print(f"ERR: flash failed:\n{result.stderr}", file=sys.stderr) + sys.exit(1) + + print(f"Secret provisioned: {secret}") + print(f" Add to environment: export ESP_CMD_SECRET=\"{secret}\"") + + +if __name__ == "__main__": + main()