feat: Serial console AUTH + NVS provisioning tool
- Add serial_task: UART console for AUTH management with physical access AUTH shows full secret, AUTH <secret> sets, AUTH OFF clears - Add esp-provision tool: provision auth secret via serial or NVS flash Supports auto-generate, custom secrets, --serial and --generate-only - Fix esp-ota uptime cache: avoid firmware rate limiter on consecutive udp_cmd calls by caching uptime_s for 3s
This commit is contained in:
@@ -2553,6 +2553,68 @@ static int cmd_handle(const char *cmd, char *reply, size_t reply_size, bool auth
|
|||||||
return strlen(reply);
|
return strlen(reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ── Serial console (UART0) — AUTH management with physical access ─── */
|
||||||
|
|
||||||
|
static void serial_task(void *arg)
|
||||||
|
{
|
||||||
|
char line[128];
|
||||||
|
ESP_LOGI(TAG, "Serial console ready (type HELP for commands)");
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (fgets(line, sizeof(line), stdin) == NULL) {
|
||||||
|
vTaskDelay(pdMS_TO_TICKS(100));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Trim trailing whitespace */
|
||||||
|
size_t len = strlen(line);
|
||||||
|
while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r' || line[len - 1] == ' '))
|
||||||
|
line[--len] = '\0';
|
||||||
|
if (len == 0) continue;
|
||||||
|
|
||||||
|
if (strcasecmp(line, "AUTH") == 0) {
|
||||||
|
if (s_auth_secret[0])
|
||||||
|
printf("OK AUTH on secret=%s\n", s_auth_secret);
|
||||||
|
else
|
||||||
|
printf("OK AUTH off\n");
|
||||||
|
} else if (strncasecmp(line, "AUTH ", 5) == 0) {
|
||||||
|
const char *arg = line + 5;
|
||||||
|
if (strcasecmp(arg, "OFF") == 0) {
|
||||||
|
s_auth_secret[0] = '\0';
|
||||||
|
config_erase_key("auth_secret");
|
||||||
|
printf("OK AUTH off (cleared)\n");
|
||||||
|
} else {
|
||||||
|
size_t alen = strlen(arg);
|
||||||
|
if (alen < 8 || alen > 64) {
|
||||||
|
printf("ERR secret length 8-64 chars\n");
|
||||||
|
} else {
|
||||||
|
strncpy(s_auth_secret, arg, sizeof(s_auth_secret) - 1);
|
||||||
|
s_auth_secret[sizeof(s_auth_secret) - 1] = '\0';
|
||||||
|
config_save_str("auth_secret", s_auth_secret);
|
||||||
|
printf("OK AUTH on secret=%s\n", s_auth_secret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (strcasecmp(line, "STATUS") == 0) {
|
||||||
|
const esp_app_desc_t *desc = esp_app_get_description();
|
||||||
|
printf("OK hostname=%s uptime_s=%lld heap=%lu auth=%s version=%s\n",
|
||||||
|
s_hostname,
|
||||||
|
(long long)(esp_timer_get_time() / 1000000LL),
|
||||||
|
(unsigned long)esp_get_free_heap_size(),
|
||||||
|
s_auth_secret[0] ? "on" : "off",
|
||||||
|
desc->version);
|
||||||
|
} else if (strcasecmp(line, "HELP") == 0) {
|
||||||
|
printf("Serial commands:\n"
|
||||||
|
" AUTH Show auth secret\n"
|
||||||
|
" AUTH <secret> Set auth secret (8-64 chars)\n"
|
||||||
|
" AUTH OFF Clear auth secret\n"
|
||||||
|
" STATUS Show basic status\n"
|
||||||
|
" HELP This help\n");
|
||||||
|
} else {
|
||||||
|
printf("ERR unknown serial command (type HELP)\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void cmd_task(void *arg)
|
static void cmd_task(void *arg)
|
||||||
{
|
{
|
||||||
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
int sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
||||||
@@ -2782,6 +2844,7 @@ void app_main()
|
|||||||
|
|
||||||
xTaskCreate(cmd_task, "cmd_task", 6144, NULL, 5, NULL);
|
xTaskCreate(cmd_task, "cmd_task", 6144, NULL, 5, NULL);
|
||||||
xTaskCreate(adaptive_task, "adaptive", 3072, NULL, 3, NULL);
|
xTaskCreate(adaptive_task, "adaptive", 3072, NULL, 3, NULL);
|
||||||
|
xTaskCreate(serial_task, "serial", 3072, NULL, 2, NULL);
|
||||||
|
|
||||||
/* OTA rollback: mark firmware valid if we got this far */
|
/* OTA rollback: mark firmware valid if we got this far */
|
||||||
const esp_partition_t *running = esp_ota_get_running_partition();
|
const esp_partition_t *running = esp_ota_get_running_partition();
|
||||||
|
|||||||
@@ -32,8 +32,20 @@ def resolve(host: str) -> str:
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
_uptime_cache = {"ip": None, "value": 0, "time": 0}
|
||||||
|
|
||||||
|
|
||||||
def get_uptime(ip: str, timeout: float = TIMEOUT) -> int:
|
def get_uptime(ip: str, timeout: float = TIMEOUT) -> int:
|
||||||
"""Query device uptime_s for HMAC timestamp (unauthenticated)."""
|
"""Query device uptime_s for HMAC timestamp (unauthenticated).
|
||||||
|
|
||||||
|
Caches result for 3s to avoid hitting the firmware rate limiter
|
||||||
|
when multiple udp_cmd calls happen in quick succession.
|
||||||
|
"""
|
||||||
|
now = time.monotonic()
|
||||||
|
if _uptime_cache["ip"] == ip and (now - _uptime_cache["time"]) < 3:
|
||||||
|
elapsed = int(now - _uptime_cache["time"])
|
||||||
|
return _uptime_cache["value"] + elapsed
|
||||||
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
sock.settimeout(timeout)
|
sock.settimeout(timeout)
|
||||||
try:
|
try:
|
||||||
@@ -41,7 +53,9 @@ def get_uptime(ip: str, timeout: float = TIMEOUT) -> int:
|
|||||||
data, _ = sock.recvfrom(1500)
|
data, _ = sock.recvfrom(1500)
|
||||||
for part in data.decode().split():
|
for part in data.decode().split():
|
||||||
if part.startswith("uptime_s="):
|
if part.startswith("uptime_s="):
|
||||||
return int(part.split("=", 1)[1])
|
val = int(part.split("=", 1)[1])
|
||||||
|
_uptime_cache.update(ip=ip, value=val, time=now)
|
||||||
|
return val
|
||||||
except (socket.timeout, OSError, ValueError):
|
except (socket.timeout, OSError, ValueError):
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
138
tools/esp-provision
Executable file
138
tools/esp-provision
Executable file
@@ -0,0 +1,138 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Provision auth secret to ESP32 CSI device NVS via USB serial."""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import secrets
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
NVS_GEN = os.path.expanduser(
|
||||||
|
"~/esp/esp-idf/components/nvs_flash/nvs_partition_generator/nvs_partition_gen.py"
|
||||||
|
)
|
||||||
|
NVS_NAMESPACE = "csi_config"
|
||||||
|
NVS_OFFSET = "0x9000"
|
||||||
|
NVS_SIZE = "0x4000"
|
||||||
|
DEFAULT_PORT = "/dev/ttyUSB0"
|
||||||
|
DEFAULT_BAUD = "460800"
|
||||||
|
|
||||||
|
|
||||||
|
def generate_secret() -> str:
|
||||||
|
"""Generate a 32-char hex secret."""
|
||||||
|
return secrets.token_hex(16)
|
||||||
|
|
||||||
|
|
||||||
|
def create_nvs_csv(secret: str, path: str) -> None:
|
||||||
|
"""Write NVS CSV with auth_secret entry."""
|
||||||
|
with open(path, "w") as f:
|
||||||
|
f.write("key,type,encoding,value\n")
|
||||||
|
f.write(f"{NVS_NAMESPACE},namespace,,\n")
|
||||||
|
f.write(f"auth_secret,data,string,{secret}\n")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Provision auth secret to ESP32 NVS partition"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"secret", nargs="?", default=None,
|
||||||
|
help="Auth secret (8-64 chars). Auto-generated if omitted."
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-p", "--port", default=DEFAULT_PORT,
|
||||||
|
help=f"USB serial port (default: {DEFAULT_PORT})"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-b", "--baud", default=DEFAULT_BAUD,
|
||||||
|
help=f"Flash baud rate (default: {DEFAULT_BAUD})"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--serial", action="store_true",
|
||||||
|
help="Set secret via serial console instead of NVS flash"
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--generate-only", action="store_true",
|
||||||
|
help="Generate and print a secret without flashing"
|
||||||
|
)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
# Generate or validate secret
|
||||||
|
secret = args.secret or generate_secret()
|
||||||
|
if len(secret) < 8 or len(secret) > 64:
|
||||||
|
print("ERR: secret must be 8-64 characters", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if args.generate_only:
|
||||||
|
print(secret)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
if args.serial:
|
||||||
|
# Set secret via serial console (device must be running)
|
||||||
|
try:
|
||||||
|
import serial
|
||||||
|
except ImportError:
|
||||||
|
print("ERR: pyserial required (pip install pyserial)", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
import time
|
||||||
|
s = serial.Serial(args.port, 921600, timeout=2)
|
||||||
|
s.reset_input_buffer()
|
||||||
|
s.write(f"AUTH {secret}\n".encode())
|
||||||
|
time.sleep(0.5)
|
||||||
|
out = s.read(s.in_waiting or 256).decode("utf-8", errors="replace")
|
||||||
|
s.close()
|
||||||
|
|
||||||
|
ok = False
|
||||||
|
for line in out.splitlines():
|
||||||
|
if "OK AUTH on" in line:
|
||||||
|
ok = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if ok:
|
||||||
|
print(f"Secret set: {secret}")
|
||||||
|
else:
|
||||||
|
print(f"ERR: unexpected response: {out.strip()}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# NVS flash method
|
||||||
|
if not os.path.isfile(NVS_GEN):
|
||||||
|
print(f"ERR: NVS generator not found: {NVS_GEN}", file=sys.stderr)
|
||||||
|
print(" Ensure ESP-IDF is installed at ~/esp/esp-idf/", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
csv_path = os.path.join(tmpdir, "nvs.csv")
|
||||||
|
bin_path = os.path.join(tmpdir, "nvs.bin")
|
||||||
|
|
||||||
|
# Generate NVS partition binary
|
||||||
|
create_nvs_csv(secret, csv_path)
|
||||||
|
result = subprocess.run(
|
||||||
|
[sys.executable, NVS_GEN, "generate", csv_path, bin_path, NVS_SIZE],
|
||||||
|
capture_output=True, text=True,
|
||||||
|
)
|
||||||
|
if result.returncode != 0:
|
||||||
|
print(f"ERR: NVS generation failed:\n{result.stderr}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# Flash NVS partition
|
||||||
|
print(f"Flashing auth secret to NVS at {NVS_OFFSET}...")
|
||||||
|
result = subprocess.run(
|
||||||
|
[
|
||||||
|
sys.executable, "-m", "esptool",
|
||||||
|
"--port", args.port, "--baud", args.baud,
|
||||||
|
"write_flash", NVS_OFFSET, bin_path,
|
||||||
|
],
|
||||||
|
capture_output=True, text=True,
|
||||||
|
)
|
||||||
|
if result.returncode != 0:
|
||||||
|
print(f"ERR: flash failed:\n{result.stderr}", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
print(f"Secret provisioned: {secret}")
|
||||||
|
print(f" Add to environment: export ESP_CMD_SECRET=\"{secret}\"")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user