add audio-diag and voice-smoke test tools

This commit is contained in:
Username
2026-02-24 12:23:50 +01:00
parent 54a70c108f
commit 08ac934b17
3 changed files with 528 additions and 0 deletions

0
tests/tools/__init__.py Normal file
View File

322
tests/tools/audio-diag Executable file
View File

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
"""Audio diagnostics — verify devices, codec, and loopback.
Usage:
audio-diag Run all checks
audio-diag --devices List audio devices only
audio-diag --codec Test Opus encode/decode roundtrip
audio-diag --loopback [SEC] Capture mic -> encode -> decode -> playback
audio-diag --help Show this help
"""
from __future__ import annotations
import argparse
import struct
import sys
import time
RST = "\033[0m"
DIM = "\033[2m"
GRN = "\033[38;5;108m"
RED = "\033[38;5;131m"
YEL = "\033[38;5;179m"
CYN = "\033[38;5;109m"
RATE = 48000
CHANNELS = 1
FRAME = 960
DTYPE = "int16"
def ok(msg: str) -> None:
print(f" {GRN}\u2713{RST} {msg}")
def fail(msg: str) -> None:
print(f" {RED}\u2717{RST} {msg}")
def warn(msg: str) -> None:
print(f" {YEL}\u26a0{RST} {msg}")
def info(msg: str) -> None:
print(f" {DIM}{msg}{RST}")
def heading(msg: str) -> None:
print(f"\n{CYN}{msg}{RST}")
# -- checks ------------------------------------------------------------------
def check_portaudio() -> bool:
heading("PortAudio")
try:
import sounddevice as sd # noqa: F401
ok("library loaded")
return True
except OSError as e:
fail(f"not found: {e}")
info("install: apt install libportaudio2")
return False
def check_devices() -> bool:
import sounddevice as sd
heading("Audio Devices")
devices = sd.query_devices()
if not devices:
fail("no devices found")
return False
default_in = sd.default.device[0]
default_out = sd.default.device[1]
has_input = False
has_output = False
for i, d in enumerate(devices):
markers = []
if i == default_in:
markers.append("default-in")
if i == default_out:
markers.append("default-out")
tag = f" {YEL}({', '.join(markers)}){RST}" if markers else ""
ch_in = d["max_input_channels"]
ch_out = d["max_output_channels"]
print(f" {DIM}{i:>2}{RST} {d['name']}"
f" {DIM}in={ch_in} out={ch_out}{RST}{tag}")
if ch_in > 0:
has_input = True
if ch_out > 0:
has_output = True
if not has_input:
warn("no input devices")
if not has_output:
warn("no output devices")
if has_input and has_output:
ok(f"{len(devices)} devices, input+output available")
return has_input and has_output
def check_opus() -> bool:
heading("Opus Codec")
try:
import opuslib
except ImportError:
fail("opuslib not installed")
info("install: pip install opuslib")
return False
ok("opuslib loaded")
# roundtrip: sine wave -> encode -> decode -> verify
encoder = opuslib.Encoder(RATE, CHANNELS, opuslib.APPLICATION_VOIP)
decoder = opuslib.Decoder(RATE, CHANNELS)
# generate 20ms of 440Hz sine (int16)
import math
pcm = b""
for i in range(FRAME):
sample = int(16000 * math.sin(2 * math.pi * 440 * i / RATE))
pcm += struct.pack("<h", sample)
encoded = encoder.encode(pcm, FRAME)
decoded = decoder.decode(encoded, FRAME)
ok(f"encode: {len(pcm)} bytes -> {len(encoded)} bytes")
ok(f"decode: {len(encoded)} bytes -> {len(decoded)} bytes")
# verify decoded is close to original (not silent)
samples_in = struct.unpack(f"<{FRAME}h", pcm)
samples_out = struct.unpack(f"<{FRAME}h", decoded)
rms_in = (sum(s * s for s in samples_in) / FRAME) ** 0.5
rms_out = (sum(s * s for s in samples_out) / FRAME) ** 0.5
if rms_out < rms_in * 0.5:
warn(f"decoded RMS ({rms_out:.0f}) much lower than input ({rms_in:.0f})")
else:
ok(f"roundtrip RMS: {rms_in:.0f} -> {rms_out:.0f}")
return True
def check_streams() -> bool:
import sounddevice as sd
heading("Stream Open/Close")
errors = []
try:
s = sd.RawInputStream(
samplerate=RATE, channels=CHANNELS,
dtype=DTYPE, blocksize=FRAME,
)
s.start()
time.sleep(0.05)
s.stop()
s.close()
ok("input stream")
except Exception as e:
fail(f"input stream: {e}")
errors.append("input")
try:
s = sd.RawOutputStream(
samplerate=RATE, channels=CHANNELS,
dtype=DTYPE, blocksize=FRAME,
)
s.start()
time.sleep(0.05)
s.stop()
s.close()
ok("output stream")
except Exception as e:
fail(f"output stream: {e}")
errors.append("output")
return len(errors) == 0
def run_loopback(seconds: float) -> None:
"""Capture -> Opus encode -> decode -> playback in real time."""
import opuslib
import sounddevice as sd
heading(f"Loopback Test ({seconds:.1f}s)")
info("mic -> opus encode -> opus decode -> speakers")
encoder = opuslib.Encoder(RATE, CHANNELS, opuslib.APPLICATION_VOIP)
decoder = opuslib.Decoder(RATE, CHANNELS)
import queue
buf: queue.Queue[bytes] = queue.Queue(maxsize=100)
stats = {"captured": 0, "played": 0, "dropped": 0}
def capture_cb(indata, frames, time_info, status):
try:
encoded = encoder.encode(bytes(indata), FRAME)
pcm = decoder.decode(encoded, FRAME)
buf.put_nowait(pcm)
stats["captured"] += 1
except queue.Full:
stats["dropped"] += 1
def playback_cb(outdata, frames, time_info, status):
try:
pcm = buf.get_nowait()
n = min(len(pcm), len(outdata))
outdata[:n] = pcm[:n]
if n < len(outdata):
outdata[n:] = b"\x00" * (len(outdata) - n)
stats["played"] += 1
except queue.Empty:
outdata[:] = b"\x00" * len(outdata)
inp = sd.RawInputStream(
samplerate=RATE, channels=CHANNELS,
dtype=DTYPE, blocksize=FRAME, callback=capture_cb,
)
out = sd.RawOutputStream(
samplerate=RATE, channels=CHANNELS,
dtype=DTYPE, blocksize=FRAME, callback=playback_cb,
)
out.start()
inp.start()
info("speak into mic now...")
try:
time.sleep(seconds)
except KeyboardInterrupt:
pass
inp.stop()
out.stop()
inp.close()
out.close()
expected = int(seconds * RATE / FRAME)
ok(f"captured={stats['captured']} played={stats['played']} "
f"dropped={stats['dropped']} (expected ~{expected})")
# -- main --------------------------------------------------------------------
def main() -> int:
parser = argparse.ArgumentParser(
description="Audio pipeline diagnostics",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"--devices", action="store_true", help="list audio devices",
)
parser.add_argument(
"--codec", action="store_true", help="test opus roundtrip",
)
parser.add_argument(
"--loopback", nargs="?", const=3.0, type=float, metavar="SEC",
help="mic->encode->decode->speakers (default: 3s)",
)
parser.add_argument(
"--version", action="version", version="audio-diag 0.1.0",
)
args = parser.parse_args()
# specific mode
if args.devices:
if not check_portaudio():
return 1
check_devices()
return 0
if args.codec:
return 0 if check_opus() else 1
if args.loopback is not None:
if not check_portaudio():
return 1
run_loopback(args.loopback)
return 0
# full diagnostic
print(f"{CYN}tuimble audio diagnostics{RST}")
passed = 0
total = 0
total += 1
if check_portaudio():
passed += 1
else:
print(f"\n{RED}cannot continue without PortAudio{RST}")
return 1
total += 1
if check_devices():
passed += 1
total += 1
if check_opus():
passed += 1
total += 1
if check_streams():
passed += 1
heading("Summary")
color = GRN if passed == total else YEL
print(f" {color}{passed}/{total} checks passed{RST}")
return 0 if passed == total else 1
if __name__ == "__main__":
sys.exit(main())

206
tests/tools/voice-smoke Executable file
View File

@@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""Headless voice smoke test against a live Mumble server.
Connects to a server, starts the audio pipeline, captures mic for a few
seconds, sends encoded frames, listens for incoming audio, then disconnects.
No TUI needed — pure terminal output.
Usage:
voice-smoke HOST [--port PORT] [--user NAME] [--seconds SEC]
voice-smoke --help
"""
from __future__ import annotations
import argparse
import sys
import time
RST = "\033[0m"
DIM = "\033[2m"
GRN = "\033[38;5;108m"
RED = "\033[38;5;131m"
YEL = "\033[38;5;179m"
CYN = "\033[38;5;109m"
BLD = "\033[1m"
def ok(msg: str) -> None:
print(f" {GRN}\u2713{RST} {msg}")
def fail(msg: str) -> None:
print(f" {RED}\u2717{RST} {msg}")
def warn(msg: str) -> None:
print(f" {YEL}\u26a0{RST} {msg}")
def info(msg: str) -> None:
print(f" {DIM}{msg}{RST}")
def heading(msg: str) -> None:
print(f"\n{CYN}{msg}{RST}")
def main() -> int:
parser = argparse.ArgumentParser(
description="Headless voice smoke test",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("host", help="mumble server hostname")
parser.add_argument("--port", type=int, default=64738, help="server port")
parser.add_argument("--user", default="tuimble-test", help="username")
parser.add_argument(
"--seconds", type=float, default=5.0,
help="capture+send duration (default: 5)",
)
parser.add_argument(
"--version", action="version", version="voice-smoke 0.1.0",
)
args = parser.parse_args()
print(f"{CYN}voice smoke test{RST} {DIM}{args.host}:{args.port}{RST}")
# -- dependencies --------------------------------------------------------
heading("Prerequisites")
try:
import sounddevice as sd # noqa: F401
ok("sounddevice + PortAudio")
except OSError as e:
fail(f"PortAudio: {e}")
return 1
try:
import opuslib # noqa: F401
ok("opuslib")
except ImportError:
fail("opuslib not installed")
return 1
from tuimble.audio import AudioPipeline
from tuimble.client import MumbleClient
ok("tuimble imports")
# -- connect -------------------------------------------------------------
heading("Connection")
info(f"connecting to {args.host}:{args.port} as {args.user}...")
client = MumbleClient(
host=args.host, port=args.port, username=args.user,
)
rx_stats = {"frames": 0, "bytes": 0}
def on_sound(_user, pcm_data: bytes) -> None:
rx_stats["frames"] += 1
rx_stats["bytes"] += len(pcm_data)
client.on_sound_received = on_sound
try:
client.connect()
except Exception as e:
fail(f"connect: {e}")
return 1
ok(f"connected (channels={len(client.channels)} users={len(client.users)})")
for cid, ch in client.channels.items():
n_users = sum(
1 for u in client.users.values() if u.channel_id == cid
)
info(f" {ch.name} ({n_users} users)")
# -- audio pipeline ------------------------------------------------------
heading("Audio Pipeline")
audio = AudioPipeline()
try:
audio.start()
ok("streams opened (input + output)")
except Exception as e:
fail(f"audio start: {e}")
client.disconnect()
return 1
# -- capture + send ------------------------------------------------------
heading(f"Transmit ({args.seconds:.1f}s)")
info("capturing mic + encoding + sending to server...")
audio.capturing = True
tx_frames = 0
deadline = time.monotonic() + args.seconds
try:
while time.monotonic() < deadline:
frame = audio.get_capture_frame()
if frame is not None:
client.send_audio(frame)
tx_frames += 1
else:
time.sleep(0.005)
except KeyboardInterrupt:
pass
audio.capturing = False
expected = int(args.seconds * 48000 / 960)
pct = (tx_frames / expected * 100) if expected else 0
if tx_frames == 0:
warn(f"sent 0 frames (expected ~{expected})")
info("mic may be muted or no input device")
elif pct < 80:
warn(f"sent {tx_frames} frames ({pct:.0f}% of expected ~{expected})")
else:
ok(f"sent {tx_frames} frames ({pct:.0f}% of ~{expected})")
# -- listen for incoming -------------------------------------------------
heading("Receive (2s)")
info("listening for incoming voice...")
time.sleep(2.0)
if rx_stats["frames"] > 0:
ok(f"received {rx_stats['frames']} frames "
f"({rx_stats['bytes']} bytes)")
else:
info("no incoming audio (normal if alone on server)")
# -- cleanup -------------------------------------------------------------
heading("Cleanup")
audio.stop()
ok("audio stopped")
client.disconnect()
ok("disconnected")
# -- summary -------------------------------------------------------------
heading("Summary")
results = []
if tx_frames > 0:
results.append(f"tx={tx_frames}")
if rx_stats["frames"] > 0:
results.append(f"rx={rx_stats['frames']}")
if results:
ok(f"voice pipeline working ({', '.join(results)})")
else:
warn("no audio transmitted or received")
info("check: mic not muted, other users in channel")
return 0
if __name__ == "__main__":
sys.exit(main())