Compare commits

...

35 Commits

Author SHA1 Message Date
Username
67467c846c fix: debounce on_resize to prevent audio stutter 2026-02-28 14:18:18 +01:00
Username
2e70e73086 docs: add slash commands section 2026-02-28 14:12:12 +01:00
Username
4bf8adc5e2 app: add slash commands 2026-02-28 14:11:51 +01:00
Username
15fbf0040a client: add set_self_mute and register_self 2026-02-28 14:04:17 +01:00
Username
2533e43391 docs: add pitch shifting documentation
F6/F7 keybindings, audio.pitch config option, voice pitch section
in USAGE.md.  Updated profiling notes to reflect per-thread coverage.
2026-02-28 13:55:46 +01:00
Username
f94f94907d test: add pitch shifting and modulator tests
Covers PitchShifter passthrough, frequency shift direction, output
length preservation, semitone clamping.  AudioPipeline tests verify
pitch property defaults, get/set, clamping, and dequeue integration.
2026-02-28 13:55:40 +01:00
Username
26695e6e70 feat: add voice pitch shifting
F6/F7 adjust outgoing voice pitch in 1-semitone steps (-12 to +12).
PitchShifter integrates into the capture path at dequeue time so the
PortAudio callback is never blocked.  Status bar shows current pitch
when non-zero.  Config reload treats pitch as a safe change.
2026-02-28 13:55:34 +01:00
Username
e8f34b4d80 profiler: add per-thread cProfile support
threading.setprofile installs a bootstrap that creates a per-thread
cProfile.Profile.  Stats from all threads are merged on periodic
dumps and at exit, capturing worker-thread hotspots (audio send
loop, pitch shifting).
2026-02-28 13:55:18 +01:00
Username
b62993459a modulator: remove state reset on pitch change
The semitones setter called _reset() which zeroed _prev_in and
discarded _phase on every pitch adjustment.  Both buffers are
independent of the pitch ratio — the vocoder recomputes time-stretch
positions each frame — so the reset produced hard discontinuities
(clicks) and a thread-safety race with no benefit.
2026-02-28 13:53:29 +01:00
Username
76c9c494a7 update roadmap and tasklist: phase 4 complete 2026-02-25 09:05:06 +01:00
Username
d9373f8a3b test: add widget unit and integration tests 2026-02-25 09:03:36 +01:00
Username
3dbd126239 app: wire audio device hot-swap on change 2026-02-25 00:26:27 +01:00
Username
9e6c11e588 audio: add DeviceMonitor for device list polling 2026-02-24 23:50:23 +01:00
Username
df6f2ff354 fix input alignment and double border in chat area 2026-02-24 17:03:05 +01:00
Username
85f373a8b5 fix lint and formatting violations in tests and source 2026-02-24 16:50:51 +01:00
Username
0f476a25d5 docs: add code review response with applied changes summary 2026-02-24 16:47:50 +01:00
Username
d4a8f34dac update tasklist: phase E complete 2026-02-24 16:46:03 +01:00
Username
aa17159f7e client: add return annotations and cache users/channels properties 2026-02-24 16:45:45 +01:00
Username
bbd28e2840 audio: add -> None return annotations 2026-02-24 16:45:41 +01:00
Username
0ae0e77814 app: deduplicate config detection, suppress hold-mode PTT spam 2026-02-24 16:45:30 +01:00
Username
c0be5f164e update tasklist: phase D complete 2026-02-24 16:38:31 +01:00
Username
d117576449 test: refactor audio tests to use public interfaces
Replace direct queue/attribute access with capture_callback and
get_capture_frame. Add stop-drains-queues test.
2026-02-24 16:38:03 +01:00
Username
65de74193a test: add _strip_html edge cases and config validation tests
strip_html: nested tags, malformed HTML, comments, entities.
config: unknown key filtering, missing file, mixed valid/invalid.
2026-02-24 16:38:02 +01:00
Username
7c57e03e6d test: add InputHistory unit tests
Covers empty history, push/up/down navigation, draft preservation,
boundary clamping, and full navigation cycle.
2026-02-24 16:37:59 +01:00
Username
4c1a545a8b test: add ReconnectManager unit tests
Covers success, retry, non-retryable abort, max-retry exhaustion,
cancel from another thread, and backoff formula.
2026-02-24 16:37:57 +01:00
Username
e2039558d7 update tasklist: phase C complete 2026-02-24 16:33:14 +01:00
Username
0cf3702c8f app: extract ReconnectManager to reconnect.py
Self-contained reconnection state machine with threading.Event
for instant, thread-safe cancellation. Removes ~50 lines and
all reconnect state from TuimbleApp.
2026-02-24 16:32:29 +01:00
Username
216a4be4fd app: extract InputHistory, _make_client factory, cache _find_root
InputHistory encapsulates history navigation (up/down/push).
_make_client() deduplicates 4 MumbleClient instantiation sites.
_find_root() result cached in set_state() to avoid double lookup.
2026-02-24 16:32:16 +01:00
Username
a6380b53f7 update tasklist: phase A and B complete 2026-02-24 16:26:02 +01:00
Username
bfa79eadcb app: replace audio send polling with blocking queue.get
Deterministic wake on data arrival instead of 5ms sleep loop.
Reduces CPU wake-ups and eliminates up to 5ms of added latency.
2026-02-24 16:25:42 +01:00
Username
7a2c8e3a5d audio: replace struct pack/unpack with array module in _apply_gain
Eliminates format string construction and intermediate tuple/list
allocations. Also drains stale frames from queues on stop().
2026-02-24 16:25:32 +01:00
Username
88e8d4d923 update tasklist with phased improvement plan 2026-02-24 16:23:11 +01:00
Username
897c5b1f6c client: guard join_channel and send_text against stale ids
join_channel raises ValueError on missing channel instead of
KeyError. send_text handles missing channel_id gracefully.
2026-02-24 16:23:05 +01:00
Username
44da57d084 config: filter unknown toml keys before dataclass init
Prevents opaque TypeError on typos in config.toml; unknown
keys are logged as warnings and silently dropped.
2026-02-24 16:23:04 +01:00
Username
8be475f23f app: replace regex html stripping with stdlib parser
Handles malformed tags, nested markup, and CDATA that the
naive regex missed.
2026-02-24 16:23:03 +01:00
22 changed files with 2774 additions and 312 deletions

View File

@@ -11,6 +11,7 @@ TUI Mumble client with voice support and push-to-talk.
- Chat input history (Up/Down arrow navigation)
- Self-deafen toggle
- Volume control (input/output gain)
- Voice pitch shifting (deeper/higher)
- Client certificate authentication
- Auto-reconnect on network loss
- Config hot-reload (F5)
@@ -25,6 +26,8 @@ TUI Mumble client with voice support and push-to-talk.
| `F3` | Cycle input volume |
| `F4` | Push-to-talk (configurable) |
| `F5` | Reload config from disk |
| `F6` | Pitch down (1 semitone) |
| `F7` | Pitch up (1 semitone) |
| `Enter` | Send message / join channel (sidebar) |
| `Up` | Previous message (input) / previous channel (sidebar) |
| `Down` | Next message (input) / next channel (sidebar) |
@@ -57,6 +60,7 @@ username = "myname"
[audio]
# output_gain = 1.0
# input_gain = 1.0
# pitch = 0 # semitones, -12 to +12
[ptt]
key = "f4"

View File

@@ -28,5 +28,5 @@
- [x] Reconnection handling
- [x] Error recovery
- [ ] Audio device hot-swap
- [ ] Comprehensive test suite
- [x] Audio device hot-swap
- [x] Comprehensive test suite

View File

@@ -1,7 +1,58 @@
# Task List
## In Progress
(none)
## Pending
(none)
## Completed
### Phase 4 -- Robustness
- [x] Audio device hot-swap (DeviceMonitor polling + pipeline rebuild)
- [x] Widget tests (41 tests: StatusBar breakpoints, ChannelTree navigation, volume/truncate helpers)
### Phase E -- Polish
- [x] Deduplicate `_detect_config_changes` with `dataclasses.asdict()`
- [x] Suppress PTT chatlog spam for hold-mode
- [x] Add `-> None` return annotations throughout
- [x] Cache `users`/`channels` in `MumbleClient` with dirty flag
### Phase D -- Test Coverage
- [x] Add `ReconnectManager` unit tests (8 tests)
- [x] Add `InputHistory` unit tests (11 tests)
- [x] Add `_strip_html` edge case tests (13 tests)
- [x] Add config `_load_section` / `load_config` tests (5 tests)
- [x] Refactor `test_audio.py` to use public interfaces
- [x] Add `stop()` queue drain test
### Phase C -- Architecture
- [x] Cache `_find_root()` result in `ChannelTree.set_state()`
- [x] Extract `InputHistory` class
- [x] Extract `_make_client()` factory (4 duplicate sites)
- [x] Extract `ReconnectManager` to `reconnect.py` (`threading.Event`)
### Phase A -- Safety
- [x] Replace `_strip_html` regex with `html.parser` stripper
- [x] Add defensive config loading (filter unknown TOML keys)
- [x] Guard `join_channel`/`send_text` against `KeyError`
### Phase B -- Performance
- [x] Replace `_apply_gain` struct with `array` module (numpy unavailable)
- [x] Replace `time.sleep(0.005)` polling with `queue.get(timeout=)`
- [x] Drain audio queues on `stop()`
### Earlier Work
- [x] Wire TUI to MumbleClient (connect on startup, display state)
- [x] Implement text message send/receive in chat log
- [x] Channel tree population from server data
@@ -14,7 +65,3 @@
- [x] Server certificate handling (certfile/keyfile config)
- [x] Config file hot-reload (F5, safe/restart change detection)
- [x] Reconnection handling with auto-retry and backoff
## Pending
- [ ] Audio device hot-swap

View File

@@ -17,12 +17,24 @@ tuimble --host mumble.example.com --user myname
| `F3` | Cycle input volume |
| `F4` | Push-to-talk (configurable) |
| `F5` | Reload config from disk |
| `F6` | Pitch down (1 semitone) |
| `F7` | Pitch up (1 semitone) |
| `Enter` | Send message / join channel (sidebar) |
| `Up` | Previous message (input) / previous channel (sidebar) |
| `Down` | Next message (input) / next channel (sidebar) |
| `q` | Quit |
| `Ctrl+C` | Quit |
## Slash Commands
| Command | Action |
|---------|--------|
| `/help` | List available commands |
| `/deafen` | Toggle self-deafen |
| `/mute` | Toggle self-mute (suppresses PTT) |
| `/unmute` | Unmute yourself |
| `/register` | Register your identity on the server |
## Push-to-Talk Modes
- **toggle** — press to start, press again to stop (default)
@@ -45,8 +57,8 @@ python3 -m pstats /tmp/tuimble.prof # interactive explorer
# pip install snakeviz && snakeviz /tmp/tuimble.prof
```
Note: cProfile captures the main thread only. Background workers
started with `@work(thread=True)` are not included.
Profiling covers both the main thread and all worker threads
(including the audio send loop where pitch shifting runs).
## Volume Control
@@ -60,6 +72,21 @@ output_gain = 0.75 # 0.0 to 2.0 (default 1.0)
input_gain = 1.5 # 0.0 to 2.0 (default 1.0)
```
## Voice Pitch
`F6` and `F7` adjust outgoing voice pitch in 1-semitone steps,
ranging from -12 to +12. Negative values make the voice deeper,
positive values make it higher. The current setting is shown in the
status bar when non-zero.
```toml
[audio]
pitch = 3 # semitones, -12 to +12 (default 0)
```
When pitch is 0, the processing step is skipped entirely (no CPU
overhead).
## Client Certificates
For servers requiring client certificate authentication:
@@ -77,7 +104,7 @@ Both must be PEM-encoded. Leave empty to connect without a certificate.
`F5` reloads `~/.config/tuimble/config.toml` from disk.
**Safe changes** (applied immediately): PTT key/mode/backend, audio
gain values.
gain values, pitch.
**Restart-requiring changes** (server settings, audio device/rate):
shown as warnings. Press `F5` again to confirm reconnect, or any

View File

@@ -11,6 +11,7 @@ dependencies = [
"textual>=1.0.0",
"pymumble>=1.6",
"sounddevice>=0.5.0",
"numpy>=1.24.0",
"tomli>=2.0.0;python_version<'3.11'",
]
@@ -38,3 +39,4 @@ select = ["E", "F", "W", "I"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "strict"

View File

@@ -0,0 +1,142 @@
# Code Review Response
**Original review:** `code-review-2026-02-24.md`
**Date applied:** 2026-02-24
**Commits:** 14 functional + 4 tasklist updates (be6574a..d4a8f34)
**Test suite:** 83 tests, all passing
---
## Summary
All 17 items in the priority matrix were evaluated. 15 were applied across five
phases (A--E). Two were deferred to the backlog with rationale.
### Disposition by severity
| Severity | Total | Applied | Deferred | Notes |
|----------|-------|---------|----------|--------------------------------|
| Critical | 4 | 3 | 1 | Widget tests deferred (backlog)|
| Important| 9 | 9 | 0 | |
| Nice | 4 | 3 | 1 | Frozen config deferred |
---
## Phase A -- Safety
| # | Item | Resolution |
|---|------|------------|
| 2 | Replace regex HTML stripping | Replaced with `html.parser.HTMLParser` subclass. Handles malformed tags, comments, nested markup, entities. 13 edge-case tests added. |
| 3 | Validate TOML config keys | Added `_load_section()` that filters unknown keys via `dataclasses.fields()` before construction. Logs warnings for typos. 5 tests added. |
| 10 | Guard `join_channel`/`send_text` | `join_channel` uses `.get()` + `ValueError`. `send_text` wraps lookup in `try/except (KeyError, AttributeError)`. |
**Commits:**
```
8be475f app: replace regex html stripping with stdlib parser
44da57d config: filter unknown toml keys before dataclass init
897c5b1 client: guard join_channel and send_text against stale ids
```
---
## Phase B -- Performance
| # | Item | Resolution |
|---|------|------------|
| 5 | Use numpy for `_apply_gain` | **Deviation:** numpy is NOT a transitive dep of sounddevice (only cffi). Used stdlib `array.array("h")` instead -- eliminates struct pack/unpack overhead without adding a dependency. |
| 6 | Replace polling sleep with `queue.get(timeout=)` | `get_capture_frame()` now accepts `timeout` param. Send loop uses `timeout=0.02`. No polling, instant wake on data. |
| 15 | Drain audio queues on `stop()` | Both capture and playback queues drained after stream close. Prevents stale frames leaking on restart. Test added. |
**Commits:**
```
7a2c8e3 audio: replace struct pack/unpack with array module in _apply_gain
bfa79ea app: replace audio send polling with blocking queue.get
```
---
## Phase C -- Architecture
| # | Item | Resolution |
|---|------|------------|
| 1 | Extract reconnect manager | Extracted to `src/tuimble/reconnect.py` (93 lines). Uses `threading.Event` for cancellation (item #7 resolved simultaneously). Instant cancel, no polling. |
| 7 | `threading.Event` for reconnect | Resolved as part of item #1 above. `Event.wait(timeout=delay)` replaces the 0.5s sleep loop. |
| 8 | Extract `_make_client()` factory | Single factory method replaces 4 duplicate 7-line instantiation blocks. |
| 9 | Compile `_strip_html` regex | Superseded by item #2 (HTMLParser replacement). No regex remains. |
| 13 | Extract `InputHistory` class | Extracted to standalone class in `app.py` (29 lines). Clean `push`/`up`/`down` interface. 11 tests added. |
| -- | Cache `_find_root()` | `ChannelTree.set_state()` now caches `_root_id`. Eliminates redundant traversal during render. |
**Commits:**
```
216a4be app: extract InputHistory, _make_client factory, cache _find_root
0cf3702 app: extract ReconnectManager to reconnect.py
```
---
## Phase D -- Test Coverage
| # | Item | Resolution |
|---|------|------------|
| 4 | Widget/app tests | **Partially addressed.** Extracted components (ReconnectManager, InputHistory, _strip_html, config loading) are now fully tested without the TUI harness. Widget-level tests (StatusBar breakpoints, ChannelTree navigation) deferred to backlog -- requires Textual `App.run_test()`. |
| -- | Refactor test_audio.py | Replaced private attribute access with public interface calls (`_capture_callback` + `get_capture_frame` instead of direct queue manipulation). |
**Test additions:**
- `test_reconnect.py` -- 8 tests (backoff formula, cancellation, exhaustion, non-retryable abort)
- `test_history.py` -- 11 tests (navigation, draft preservation, push reset, full cycle)
- `test_strip_html.py` -- 13 tests (nested tags, entities, comments, malformed HTML)
- `test_config.py` -- 5 new tests (_load_section filtering, missing file, unknown keys)
- `test_audio.py` -- 1 new test (stop drains queues), existing tests refactored
**Commits:**
```
4c1a545 test: add ReconnectManager unit tests
7c57e03 test: add InputHistory unit tests
65de741 test: add _strip_html edge cases and config validation tests
d117576 test: refactor audio tests to use public interfaces
```
---
## Phase E -- Polish
| # | Item | Resolution |
|---|------|------------|
| 12 | Deduplicate config change detection | `_detect_config_changes` and `_apply_restart_changes` now use `dataclasses.asdict()` for comparison. New config fields are automatically picked up. |
| 11 | Suppress PTT chatlog spam | `_on_ptt_change` skips chatlog writes when `ptt.mode == "hold"`. Status bar indicator still updates in real time. |
| 16 | Add `-> None` return annotations | Added to 21 methods across `client.py` and `audio.py`. |
| -- | Cache `users`/`channels` properties | Properties now cache with dirty flags. Invalidated on connect, disconnect, and user/channel event callbacks. |
**Commits:**
```
0ae0e77 app: deduplicate config detection, suppress hold-mode PTT spam
bbd28e2 audio: add -> None return annotations
aa17159 client: add return annotations and cache users/channels properties
```
---
## Deferred Items
| # | Item | Reason |
|---|------|--------|
| 4 | Widget tests (StatusBar, ChannelTree) | Requires Textual async test harness (`App.run_test()`). Extracted logic is tested; widget integration tests are in the backlog. |
| 14 | Inline callback wrappers with lambdas | Low priority. Current named methods aid debugging (stack traces show `_cb_connected` vs `<lambda>`). |
| 17 | Frozen dataclasses for Config | Requires refactoring `_apply_safe_changes` which mutates `self._config.ptt`. Scope exceeds incremental polish. |
---
## Metrics
| | Before | After | Delta |
|----------------|--------|--------|--------|
| Source LOC | ~1,750 | 1,866 | +116 |
| Test LOC | ~480 | 818 | +338 |
| Test count | 45 | 83 | +38 |
| Modules | 7 | 8 | +1 |
| `app.py` LOC | ~999 | 965 | -34 |
New module: `src/tuimble/reconnect.py` (93 lines).
Net source growth is primarily from the extracted reconnect module and cached
property logic in `client.py`. `app.py` shrank despite gaining the
`InputHistory` class and `dataclasses` import.

View File

@@ -0,0 +1,738 @@
# tuimble Code Review
**Date:** 2026-02-24
**Scope:** Full codebase review (1,750 LOC across 7 modules)
**Constraints:** No theme/layout redesign. Targeted, incremental improvements only.
---
## 1. Architecture & Design
### [CRITICAL] `app.py` is a 999-line monolith
`TuimbleApp` directly owns connection lifecycle, audio management, reconnection
logic, config reload state machine, input history, PTT wiring, and all UI
updates. This makes it hard to test, hard to reason about, and hard to extend.
**Specific extractions worth making:**
#### a) Reconnection manager
The reconnect loop, backoff logic, attempt counting, and cancellation form a
self-contained state machine currently scattered across `_reconnect_loop`,
`_log_reconnect`, `_on_reconnect_success`, `_on_reconnect_exhausted`,
`_cancel_reconnect`, plus three boolean flags (`_reconnecting`,
`_reconnect_attempt`, `_intentional_disconnect`).
```python
# src/tuimble/reconnect.py
from __future__ import annotations
import time
import logging
from dataclasses import dataclass
from typing import Callable
log = logging.getLogger(__name__)
INITIAL_DELAY = 2
MAX_DELAY = 30
MAX_RETRIES = 10
@dataclass
class ReconnectState:
active: bool = False
attempt: int = 0
intentional: bool = False
class ReconnectManager:
"""Exponential backoff reconnection with cancellation."""
def __init__(
self,
connect_fn: Callable[[], None],
on_attempt: Callable[[int, float], None],
on_success: Callable[[], None],
on_failure: Callable[[int, str], None],
on_exhausted: Callable[[], None],
):
self._connect = connect_fn
self._on_attempt = on_attempt
self._on_success = on_success
self._on_failure = on_failure
self._on_exhausted = on_exhausted
self.state = ReconnectState()
def cancel(self) -> None:
self.state.active = False
self.state.attempt = 0
@property
def delay(self) -> float:
return min(
INITIAL_DELAY * (2 ** (self.state.attempt - 1)),
MAX_DELAY,
)
def run(self) -> None:
"""Blocking reconnect loop -- run in a worker thread."""
self.state.active = True
self.state.attempt = 0
while self.state.active:
self.state.attempt += 1
d = self.delay
self._on_attempt(self.state.attempt, d)
elapsed = 0.0
while elapsed < d and self.state.active:
time.sleep(0.5)
elapsed += 0.5
if not self.state.active:
break
try:
self._connect()
self.state.active = False
self.state.attempt = 0
self._on_success()
return
except Exception as exc:
self._on_failure(self.state.attempt, str(exc))
if self.state.attempt >= MAX_RETRIES:
self.state.active = False
self._on_exhausted()
return
```
Removes ~80 lines from `app.py` and makes the backoff logic unit-testable
without a TUI.
#### b) Input history
The history navigation in `on_key` (lines 928-953) is a classic "small feature
that grew legs." Extract to a reusable class:
```python
class InputHistory:
def __init__(self):
self._entries: list[str] = []
self._idx: int = -1
self._draft: str = ""
def push(self, text: str) -> None:
self._entries.append(text)
self._idx = -1
def up(self, current: str) -> str | None:
if not self._entries:
return None
if self._idx == -1:
self._draft = current
self._idx = len(self._entries) - 1
elif self._idx > 0:
self._idx -= 1
return self._entries[self._idx]
def down(self) -> str | None:
if self._idx == -1:
return None
if self._idx < len(self._entries) - 1:
self._idx += 1
return self._entries[self._idx]
self._idx = -1
return self._draft
```
---
### [IMPORTANT] Thread safety gaps in `_reconnect_loop`
`self._reconnecting` and `self._reconnect_attempt` are plain booleans/ints
accessed from both the main thread (via `_cancel_reconnect`,
`action_reload_config`) and the worker thread (`_reconnect_loop`). Python's GIL
makes this "mostly safe" for simple booleans, but it's fragile and not
guaranteed for compound operations.
`app.py:486-488`:
```python
while self._reconnecting: # read on worker thread
self._reconnect_attempt += 1 # read-modify-write on worker thread
```
Meanwhile `_cancel_reconnect` at line 572 writes both from the main thread.
Use `threading.Event` for the cancellation flag:
```python
self._reconnect_cancel = threading.Event()
# in _reconnect_loop:
while not self._reconnect_cancel.is_set():
...
# interruptible sleep:
if self._reconnect_cancel.wait(timeout=delay):
break
# in _cancel_reconnect:
self._reconnect_cancel.set()
```
This also eliminates the 0.5s sleep polling loop (lines 496-499), making
cancellation instant instead of up to 500ms delayed.
---
### [IMPORTANT] `MumbleClient` creates `User`/`Channel` objects on every property access
`client.py:112-141` -- The `users` and `channels` properties rebuild full
dictionaries from pymumble's internal state on every call.
`_refresh_channel_tree` in `app.py:688-699` calls both, then iterates all
users. During a debounced state update, this means three full traversals.
Consider caching with a dirty flag set by callbacks:
```python
@property
def users(self) -> dict[int, User]:
if self._users_dirty or self._users_cache is None:
self._users_cache = self._build_users()
self._users_dirty = False
return self._users_cache
```
---
### [NICE] Callback wiring pattern
`client.py:79-84` uses bare `None`-able attributes for callbacks:
```python
self.on_connected = None
self.on_disconnected = None
```
This works but lacks type safety and discoverability. Consider a typed protocol
or simple typed attributes:
```python
from typing import Protocol
class ClientCallbacks(Protocol):
def on_connected(self) -> None: ...
def on_disconnected(self) -> None: ...
def on_text_message(self, sender: str, text: str) -> None: ...
```
Not urgent -- current approach is pragmatic for the project size.
---
## 2. TUI/UX Best Practices
### [IMPORTANT] `_strip_html` recompiles regex on every message
`app.py:994-999`:
```python
def _strip_html(text: str) -> str:
import re
clean = re.sub(r"<[^>]+>", "", text)
return html.unescape(clean)
```
The `re` import inside the function is fine (cached by Python), but the pattern
is recompiled on every call. Compile once at module level:
```python
import re
_HTML_TAG_RE = re.compile(r"<[^>]+>")
def _strip_html(text: str) -> str:
return html.unescape(_HTML_TAG_RE.sub("", text))
```
---
### [IMPORTANT] PTT state logging is noisy
`app.py:969-973` -- Every PTT toggle writes to the chatlog. For toggle mode
this is acceptable, but if someone later implements hold-mode (evdev),
pressing/releasing space would flood the log with "transmitting"/"stopped
transmitting" on every keypress.
Add a config option or suppress log for hold-mode, or use the status bar alone:
```python
def _on_ptt_change(self, transmitting: bool) -> None:
self._audio.capturing = transmitting
status = self.query_one("#status", StatusBar)
status.ptt_active = transmitting
# Only log for toggle mode; hold-mode updates too frequently
if self._config.ptt.mode == "toggle":
chatlog = self.query_one("#chatlog", ChatLog)
if transmitting:
chatlog.write("[#e0af68] transmitting[/]")
else:
chatlog.write("[dim] stopped transmitting[/dim]")
```
---
### [NICE] `on_resize` sets sidebar width but doesn't refresh the channel tree
`app.py:977-981` -- After resize, the sidebar width changes but
`ChannelTree.render()` uses `self.content_size.width` which may not update
until the next render pass. The tree truncation logic depends on width. Call
`tree.refresh()` explicitly:
```python
def on_resize(self, event: events.Resize) -> None:
sidebar = self.query_one("#sidebar", ChannelTree)
sidebar.styles.width = max(16, min(32, event.size.width // 4))
sidebar.refresh()
self.query_one("#status", StatusBar).refresh()
```
---
## 3. Code Quality
### [CRITICAL] `load_config` passes raw TOML dicts to dataclass constructors without validation
`config.py:62-67`:
```python
if "server" in data:
cfg.server = ServerConfig(**data["server"])
```
If the TOML file contains an unexpected key (`ServerConfig(host="x",
typo_field="y")`), this raises an opaque `TypeError`. If a value has the wrong
type (`port = "abc"`), it silently accepts it and fails later.
Add defensive loading:
```python
def _load_section(cls, data: dict, section: str):
"""Load a dataclass section, ignoring unknown keys."""
raw = data.get(section, {})
import dataclasses
valid_keys = {f.name for f in dataclasses.fields(cls)}
filtered = {k: v for k, v in raw.items() if k in valid_keys}
return cls(**filtered)
# Usage:
cfg.server = _load_section(ServerConfig, data, "server")
```
This prevents crashes from typos in config files -- critical for a user-facing
tool.
---
### [IMPORTANT] Duplicate logic for creating `MumbleClient`
`MumbleClient` is instantiated in four places with the same pattern:
1. `__init__` (line 395)
2. `_reconnect_loop` (line 507)
3. `action_reload_config` disconnected path (line 864)
4. `_apply_restart_changes` (line 824)
Extract a factory method:
```python
def _make_client(self, srv: ServerConfig | None = None) -> MumbleClient:
srv = srv or self._config.server
return MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
```
---
### [IMPORTANT] `ChannelTree._find_root` is called twice per render
`render()` at line 223 calls `_find_root()`, and `_build_channel_order()` at
line 173 also calls `_find_root()`. Both execute during `set_state()` then
`render()`. Cache the root ID:
```python
def set_state(self, channels, users_by_channel, my_channel_id=None):
self._channels = channels
self._users_by_channel = users_by_channel
self._my_channel_id = my_channel_id
self._root_id = self._find_root() if channels else 0
self._channel_ids = self._build_channel_order()
# ...
```
---
### [IMPORTANT] `_detect_config_changes` duplicates field-level comparison
`app.py:740-780` manually compares each field. If you add a new server field,
you must remember to add it to both `_detect_config_changes` AND
`_apply_restart_changes`. Use `dataclasses.asdict()` for comparison:
```python
from dataclasses import asdict
def _detect_config_changes(self, old: Config, new: Config):
safe, restart = [], []
for attr in ("key", "mode", "backend"):
if getattr(old.ptt, attr) != getattr(new.ptt, attr):
safe.append(
f"ptt.{attr}: {getattr(old.ptt, attr)} -> "
f"{getattr(new.ptt, attr)}"
)
for attr in ("input_gain", "output_gain"):
if getattr(old.audio, attr) != getattr(new.audio, attr):
safe.append(
f"audio.{attr}: {getattr(old.audio, attr)} -> "
f"{getattr(new.audio, attr)}"
)
if asdict(old.server) != asdict(new.server):
for k in asdict(old.server):
if getattr(old.server, k) != getattr(new.server, k):
label = "password" if k == "password" else k
restart.append(f"server.{label} changed")
for attr in ("input_device", "output_device", "sample_rate"):
if getattr(old.audio, attr) != getattr(new.audio, attr):
restart.append(f"audio.{attr} changed")
return safe, restart
```
---
### [NICE] Type annotations missing on several methods
- `client.py:86` `set_dispatcher(self, fn: Callable)` -- should be
`Callable | None`
- `client.py:93` `_dispatch(self, callback, *args)` -- no return type, no
`callback` type
- `client.py:154` `connect(self)` -- no return type annotation
- `audio.py:58` `start(self)` -- no return type
Add `-> None` where appropriate. Minor but compounds into unclear interfaces.
---
## 4. Performance
### [IMPORTANT] `_apply_gain` uses Python-level sample iteration
`audio.py:22-30` -- Unpacking, scaling, and repacking every sample via a list
comprehension in Python is slow for real-time audio (960 samples = 1920 bytes
per 20ms frame):
```python
samples = struct.unpack(fmt, pcm[: n * 2])
scaled = [max(-32768, min(32767, int(s * gain))) for s in samples]
return struct.pack(fmt, *scaled)
```
Use numpy (already a transitive dependency via sounddevice):
```python
import numpy as np
def _apply_gain(pcm: bytes, gain: float) -> bytes:
if not pcm:
return pcm
samples = np.frombuffer(pcm, dtype=np.int16)
scaled = np.clip(samples * gain, -32768, 32767).astype(np.int16)
return scaled.tobytes()
```
This is ~50-100x faster for a 960-sample frame. Since sounddevice already
pulls in numpy, there's no new dependency.
---
### [IMPORTANT] `_audio_send_loop` polls with `time.sleep(0.005)`
`app.py:676-684`:
```python
while self._client.connected:
frame = self._audio.get_capture_frame()
if frame is not None:
self._client.send_audio(frame)
else:
time.sleep(0.005)
```
5ms polling means up to 5ms of added latency on every frame, plus unnecessary
CPU wake-ups when idle. Use `queue.Queue.get(timeout=...)` instead:
```python
def get_capture_frame(self, timeout: float = 0.02) -> bytes | None:
try:
return self._capture_queue.get(timeout=timeout)
except queue.Empty:
return None
```
Then the send loop becomes:
```python
while self._client.connected:
frame = self._audio.get_capture_frame(timeout=0.02)
if frame is not None:
self._client.send_audio(frame)
```
No polling, no wasted cycles, deterministic wake on data arrival.
---
### [NICE] `ChannelTree.render()` rebuilds the entire tree string every refresh
For small channel lists (typical Mumble servers have 5-30 channels), this is
fine. But `_render_tree` is recursive and allocates many string fragments. If
performance becomes an issue, cache the rendered string and invalidate on
`set_state()`.
---
## 5. Scalability & Extensibility
### [IMPORTANT] No structured event bus
All communication between components flows through Textual's message system,
which is good. But the `_cb_*` callback wrappers in `app.py:462-475` are thin
boilerplate. Consider a mapping-driven approach:
```python
def _wire_client_callbacks(self) -> None:
self._client.set_dispatcher(self.call_from_thread)
self._client.on_connected = lambda: self.post_message(ServerConnected())
self._client.on_disconnected = lambda: self.post_message(ServerDisconnected())
self._client.on_text_message = (
lambda s, t: self.post_message(TextMessageReceived(s, t))
)
self._client.on_user_update = lambda: self.post_message(ServerStateChanged())
self._client.on_channel_update = (
lambda: self.post_message(ServerStateChanged())
)
self._client.on_sound_received = (
lambda _u, pcm: self._audio.queue_playback(pcm)
)
```
This eliminates six one-line methods. The audio callback bypasses the message
system correctly (it's hot-path, no UI update needed).
---
### [NICE] `Config` is mutable
Config dataclasses are mutable, and `_apply_safe_changes` directly mutates
`self._config.ptt`. Consider frozen dataclasses with a `replace()` pattern to
prevent accidental state corruption:
```python
@dataclass(frozen=True)
class PttConfig:
key: str = "f4"
mode: str = "toggle"
backend: str = "auto"
```
Then: `self._config = dataclasses.replace(self._config, ptt=new.ptt)`
---
## 6. Testing
### [CRITICAL] No tests for `TuimbleApp` or any widget
The test suite covers `audio.py`, `client.py`, `ptt.py`, and `config.py` --
all non-UI modules. There are zero tests for:
- `StatusBar` rendering (connection states, volume bars, responsive breakpoints)
- `ChannelTree` rendering (tree structure, truncation, focus navigation)
- `TuimbleApp` message handling (connect/disconnect/state change flows)
- Config reload state machine
- Input history navigation
Textual provides `App.run_test()` for async widget testing:
```python
import pytest
from textual.testing import AppTest
@pytest.mark.asyncio
async def test_status_bar_connected():
app = TuimbleApp()
async with app.run_test(size=(80, 24)) as pilot:
status = app.query_one("#status", StatusBar)
status.connected = True
await pilot.pause()
rendered = status.render()
assert "connected" in rendered
```
**Priority tests to add:**
1. `StatusBar.render()` at each width breakpoint (< 40, < 60, >= 60)
2. `ChannelTree` keyboard navigation (up/down/enter)
3. `_strip_html` with edge cases (nested tags, malformed HTML, entities)
4. `_next_volume` wraparound behavior
5. Config reload with unknown TOML keys (crash test)
6. `_detect_config_changes` safe vs restart classification
---
### [IMPORTANT] Tests access private attributes
`test_audio.py` directly accesses `ap._capture_queue`, `ap._playback_queue`,
`ap._sample_rate`. This couples tests to implementation. Where possible, test
through public interfaces:
```python
# Instead of: ap._capture_queue.put(b"\x01\x02\x03")
# Use the actual capture callback:
ap.capturing = True
ap._capture_callback(b"\x01\x02\x03", 1, None, None)
assert ap.get_capture_frame() == b"\x01\x02\x03"
```
---
## 7. Security & Robustness
### [CRITICAL] `_strip_html` regex is insufficient for Mumble HTML
`app.py:996-999` uses `re.sub(r"<[^>]+>", "", text)` which fails on:
- Malformed tags: `<img src="x" onerror="alert(1)"` (no closing `>`)
- Nested markup: `<a href="<script>">`
- CDATA/comments: `<!-- <script> -->`
While Textual's Rich markup won't execute scripts, malformed input could break
Rich's parser or produce garbled output. Consider using `html.parser` from the
stdlib:
```python
from html.parser import HTMLParser
class _HTMLStripper(HTMLParser):
def __init__(self):
super().__init__()
self._parts: list[str] = []
def handle_data(self, data: str):
self._parts.append(data)
def get_text(self) -> str:
return "".join(self._parts)
def _strip_html(text: str) -> str:
stripper = _HTMLStripper()
stripper.feed(text)
return html.unescape(stripper.get_text())
```
---
### [IMPORTANT] `join_channel` doesn't handle missing channel IDs
`client.py:240-243`:
```python
def join_channel(self, channel_id: int):
if self._mumble and self._connected:
self._mumble.channels[channel_id].move_in()
```
If `channel_id` is stale (channel was removed between tree render and user
pressing Enter), this raises `KeyError`. Add a guard:
```python
def join_channel(self, channel_id: int):
if self._mumble and self._connected:
ch = self._mumble.channels.get(channel_id)
if ch is None:
raise ValueError(f"channel {channel_id} not found")
ch.move_in()
```
---
### [IMPORTANT] `send_text` silently fails on missing channel
`client.py:229-233`:
```python
def send_text(self, message: str):
if self._mumble and self._connected:
ch = self._mumble.channels[self._mumble.users.myself["channel_id"]]
ch.send_text_message(message)
```
If `myself` has no `channel_id` key (edge case during connection setup), this
crashes with `KeyError`. Guard with try/except or a `my_channel_id` check
first.
---
### [NICE] `audio.py` `stop()` doesn't drain queues
After `stop()`, the capture and playback queues still hold stale frames. If
`start()` is called again (audio device hot-swap), old frames leak into the new
session:
```python
def stop(self):
for stream in (self._input_stream, self._output_stream):
if stream is not None:
stream.stop()
stream.close()
self._input_stream = None
self._output_stream = None
# Drain stale frames
while not self._capture_queue.empty():
try:
self._capture_queue.get_nowait()
except queue.Empty:
break
while not self._playback_queue.empty():
try:
self._playback_queue.get_nowait()
except queue.Empty:
break
```
---
## Priority Matrix
| # | Area | Severity | Item |
|----|---------------|-----------|------------------------------------------------------|
| 1 | Architecture | Critical | Extract reconnect manager from app.py |
| 2 | Security | Critical | Replace regex HTML stripping with proper parser |
| 3 | Robustness | Critical | Validate TOML config keys before dataclass init |
| 4 | Testing | Critical | Add widget/app tests using Textual's test harness |
| 5 | Performance | Important | Use numpy for `_apply_gain` (transitive dep exists) |
| 6 | Performance | Important | Replace polling sleep with `queue.get(timeout=)` |
| 7 | Thread safety | Important | Use `threading.Event` for reconnect cancellation |
| 8 | Code quality | Important | Extract `_make_client()` factory (4 dup sites) |
| 9 | Code quality | Important | Compile `_strip_html` regex at module level |
| 10 | Robustness | Important | Guard `join_channel`/`send_text` against `KeyError` |
| 11 | UX | Important | Suppress PTT chatlog spam for hold-mode |
| 12 | Code quality | Important | Deduplicate config change detection logic |
| 13 | Architecture | Nice | Extract `InputHistory` class |
| 14 | Architecture | Nice | Inline callback wrappers with lambdas |
| 15 | Robustness | Nice | Drain audio queues on `stop()` |
| 16 | Code quality | Nice | Add return type annotations throughout |
| 17 | Scalability | Nice | Consider frozen dataclasses for Config |

View File

@@ -31,10 +31,17 @@ def main():
def _run_profiled(app, dest):
"""Run the app under cProfile with periodic 30s dumps."""
"""Run the app under cProfile with periodic 30s dumps.
Profiles both the main thread and all worker threads (including
the audio send loop where PitchShifter.process runs). Each
thread gets its own cProfile.Profile; stats are merged on dump.
"""
import cProfile
import pstats
import threading
from pathlib import Path
from threading import Event, Thread
from threading import Event, Lock, Thread
if dest is None:
from tuimble.config import CONFIG_DIR
@@ -47,10 +54,34 @@ def _run_profiled(app, dest):
prof = cProfile.Profile()
stop = Event()
thread_profiles: list[cProfile.Profile] = []
lock = Lock()
def _thread_bootstrap(frame, event, arg):
"""Installed via threading.setprofile; creates a per-thread profiler.
Called once per new thread as the profile function. Immediately
creates and enables a cProfile.Profile whose C-level hook
replaces this Python-level one for the remainder of the thread.
"""
tp = cProfile.Profile()
with lock:
thread_profiles.append(tp)
tp.enable()
threading.setprofile(_thread_bootstrap)
def _dump_all():
"""Merge main + thread profiles and write to disk."""
stats = pstats.Stats(prof)
with lock:
for tp in thread_profiles:
stats.add(tp)
stats.dump_stats(str(dest))
def _periodic_dump():
while not stop.wait(30):
prof.dump_stats(str(dest))
_dump_all()
dumper = Thread(target=_periodic_dump, daemon=True)
dumper.start()
@@ -60,8 +91,12 @@ def _run_profiled(app, dest):
app.run()
finally:
prof.disable()
threading.setprofile(None)
with lock:
for tp in thread_profiles:
tp.disable()
stop.set()
prof.dump_stats(str(dest))
_dump_all()
if __name__ == "__main__":

View File

@@ -2,9 +2,10 @@
from __future__ import annotations
import dataclasses
import html
import logging
import time
from html.parser import HTMLParser
from textual import events, on, work
from textual.app import App, ComposeResult
@@ -13,19 +14,49 @@ from textual.message import Message
from textual.reactive import reactive
from textual.widgets import Footer, Header, Input, RichLog, Static
from tuimble.audio import AudioPipeline
from tuimble.client import Channel, ConnectionFailed, MumbleClient, User
from tuimble.audio import AudioPipeline, DeviceMonitor
from tuimble.client import Channel, MumbleClient, User
from tuimble.config import Config, load_config
from tuimble.ptt import KittyPtt, TogglePtt, detect_backend
from tuimble.reconnect import ReconnectManager
log = logging.getLogger(__name__)
VOLUME_STEPS = (0.0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0)
RECONNECT_INITIAL = 2 # seconds before first retry
RECONNECT_MAX = 30 # maximum backoff delay
RECONNECT_RETRIES = 10 # attempts before giving up
TREE_DEBOUNCE = 0.1 # seconds to coalesce state changes
TREE_DEBOUNCE = 0.1 # seconds to coalesce state changes
class InputHistory:
"""Navigable command history for the chat input."""
def __init__(self):
self._entries: list[str] = []
self._idx: int = -1
self._draft: str = ""
def push(self, text: str) -> None:
self._entries.append(text)
self._idx = -1
def up(self, current: str) -> str | None:
if not self._entries:
return None
if self._idx == -1:
self._draft = current
self._idx = len(self._entries) - 1
elif self._idx > 0:
self._idx -= 1
return self._entries[self._idx]
def down(self) -> str | None:
if self._idx == -1:
return None
if self._idx < len(self._entries) - 1:
self._idx += 1
return self._entries[self._idx]
self._idx = -1
return self._draft
def _next_volume(current: float) -> float:
@@ -60,6 +91,12 @@ class ServerStateChanged(Message):
pass
class AudioDeviceChanged(Message):
"""Audio device list changed (hot-swap detected)."""
pass
class ChannelSelected(Message):
"""User selected a channel to join."""
@@ -78,9 +115,11 @@ class StatusBar(Static):
connected = reactive(False)
reconnecting = reactive(False)
self_deaf = reactive(False)
self_mute = reactive(False)
server_info = reactive("")
output_vol = reactive(100)
input_vol = reactive(100)
pitch = reactive(0)
@staticmethod
def _vol_bar(pct: int) -> str:
@@ -110,19 +149,27 @@ class StatusBar(Static):
deaf_sym = "[#f7768e]\u2298[/]" if self.self_deaf else ""
deaf_full = "[#f7768e]\u2298[/] deaf" if self.self_deaf else ""
mute_sym = "[#e0af68]\u2715[/]" if self.self_mute else ""
mute_full = "[#e0af68]\u2715[/] mute" if self.self_mute else ""
if w < 40:
return f" {conn_sym} {deaf_sym}{ptt_sym}"
return f" {conn_sym} {deaf_sym}{mute_sym}{ptt_sym}"
if w < 60:
return f" {conn_full} {deaf_full}{' ' if deaf_full else ''}{ptt_full}"
flags = f"{deaf_full}{' ' if deaf_full else ''}{mute_full}{' ' if mute_full else ''}"
return f" {conn_full} {flags}{ptt_full}"
vol = (
f" [dim]out[/]{self._vol_bar(self.output_vol)}"
f" [dim]in[/]{self._vol_bar(self.input_vol)}"
)
if self.pitch != 0:
sign = "+" if self.pitch > 0 else ""
pitch_str = f" [dim]pitch[/]{sign}{self.pitch}"
else:
pitch_str = ""
info = f" [dim]{self.server_info}[/]" if self.server_info else ""
deaf = f"{deaf_full} " if deaf_full else ""
return f" {conn_full} {deaf}{ptt_full}{vol}{info}"
flags = f"{deaf_full}{' ' if deaf_full else ''}{mute_full}{' ' if mute_full else ''}"
return f" {conn_full} {flags}{ptt_full}{vol}{pitch_str}{info}"
class ChannelTree(Static):
@@ -138,6 +185,7 @@ class ChannelTree(Static):
self._channel_ids: list[int] = []
self._focused_idx: int = 0
self._my_channel_id: int | None = None
self._root_id: int = 0
def set_state(
self,
@@ -148,6 +196,7 @@ class ChannelTree(Static):
self._channels = channels
self._users_by_channel = users_by_channel
self._my_channel_id = my_channel_id
self._root_id = self._find_root() if channels else 0
self._channel_ids = self._build_channel_order()
if self._channel_ids:
self._focused_idx = max(
@@ -170,8 +219,7 @@ class ChannelTree(Static):
if not self._channels:
return []
order: list[int] = []
root_id = self._find_root()
self._collect_order(root_id, order)
self._collect_order(self._root_id, order)
return order
def _collect_order(self, channel_id: int, order: list[int]) -> None:
@@ -180,8 +228,11 @@ class ChannelTree(Static):
return
order.append(channel_id)
children = sorted(
(c for c in self._channels.values()
if c.parent_id == channel_id and c.channel_id != channel_id),
(
c
for c in self._channels.values()
if c.parent_id == channel_id and c.channel_id != channel_id
),
key=lambda c: c.name,
)
for child in children:
@@ -220,8 +271,7 @@ class ChannelTree(Static):
w = self._get_width()
lines = [" [bold]Channels[/]"]
root_id = self._find_root()
self._render_tree(root_id, lines, indent=1, is_last=True, w=w)
self._render_tree(self._root_id, lines, indent=1, is_last=True, w=w)
return "\n".join(lines)
def _find_root(self) -> int:
@@ -258,13 +308,9 @@ class ChannelTree(Static):
name = self._truncate(ch.name, name_max)
if is_focused:
lines.append(
f"{prefix}{branch} {marker}[reverse bold]{name}[/]"
)
lines.append(f"{prefix}{branch} {marker}[reverse bold]{name}[/]")
elif is_current:
lines.append(
f"{prefix}{branch} {marker}[bold #9ece6a]{name}[/]"
)
lines.append(f"{prefix}{branch} {marker}[bold #9ece6a]{name}[/]")
else:
lines.append(f"{prefix}{branch} {marker}[bold]{name}[/]")
@@ -284,9 +330,7 @@ class ChannelTree(Static):
bullet = "\u2514\u2500" if is_last_item else "\u251c\u2500"
uname = self._truncate(user.name, user_max)
status = self._user_status(user)
lines.append(
f"{sub_prefix}{bullet} [#7aa2f7]{uname}[/]{status}"
)
lines.append(f"{sub_prefix}{bullet} [#7aa2f7]{uname}[/]{status}")
for i, child in enumerate(children):
self._render_tree(
@@ -306,9 +350,7 @@ class ChannelTree(Static):
event.prevent_default()
event.stop()
elif event.key == "down":
self._focused_idx = min(
len(self._channel_ids) - 1, self._focused_idx + 1
)
self._focused_idx = min(len(self._channel_ids) - 1, self._focused_idx + 1)
self.refresh()
event.prevent_default()
event.stop()
@@ -359,9 +401,8 @@ class TuimbleApp(App):
scrollbar-size: 1 1;
}
#input {
dock: bottom;
height: 3;
border-top: solid #292e42;
border: none;
}
#status {
dock: bottom;
@@ -381,6 +422,8 @@ class TuimbleApp(App):
("f2", "cycle_output_volume", "Vol Out"),
("f3", "cycle_input_volume", "Vol In"),
("f5", "reload_config", "Reload"),
("f6", "pitch_down", "Pitch-"),
("f7", "pitch_up", "Pitch+"),
("q", "quit", "Quit"),
("ctrl+c", "quit", "Quit"),
]
@@ -388,21 +431,9 @@ class TuimbleApp(App):
def __init__(self):
super().__init__()
self._config: Config = load_config()
self._ptt = detect_backend(
self._on_ptt_change, self._config.ptt.backend
)
srv = self._config.server
self._client = MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
self._history: list[str] = []
self._history_idx: int = -1
self._history_draft: str = ""
self._ptt = detect_backend(self._on_ptt_change, self._config.ptt.backend)
self._client = self._make_client()
self._history = InputHistory()
acfg = self._config.audio
self._audio = AudioPipeline(
sample_rate=acfg.sample_rate,
@@ -412,12 +443,34 @@ class TuimbleApp(App):
)
self._audio.input_gain = acfg.input_gain
self._audio.output_gain = acfg.output_gain
self._audio.pitch = acfg.pitch
self._device_monitor = DeviceMonitor(self._on_device_change)
self._pending_reload: Config | None = None
self._tree_refresh_timer = None
self._reconnecting: bool = False
self._reconnect_attempt: int = 0
self._resize_timer = None
self._pending_width: int = 80
self._reconnect = ReconnectManager(
connect_fn=self._reconnect_connect,
on_attempt=self._reconnect_on_attempt,
on_success=self._reconnect_on_success,
on_failure=self._reconnect_on_failure,
on_exhausted=self._reconnect_on_exhausted,
)
self._muted: bool = False
self._intentional_disconnect: bool = False
def _make_client(self, srv=None) -> MumbleClient:
"""Create a MumbleClient from the current (or given) server config."""
srv = srv or self._config.server
return MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
def compose(self) -> ComposeResult:
yield Header()
with Horizontal(id="main"):
@@ -432,6 +485,7 @@ class TuimbleApp(App):
status = self.query_one("#status", StatusBar)
status.output_vol = int(self._audio.output_gain * 100)
status.input_vol = int(self._audio.input_gain * 100)
status.pitch = int(self._audio.pitch)
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write("[dim]tuimble v0.1.0[/dim]")
@@ -480,97 +534,54 @@ class TuimbleApp(App):
# -- auto-reconnect ------------------------------------------------------
@work(thread=True)
def _reconnect_loop(self) -> None:
"""Retry connection with exponential backoff."""
while self._reconnecting:
self._reconnect_attempt += 1
delay = min(
RECONNECT_INITIAL * (2 ** (self._reconnect_attempt - 1)),
RECONNECT_MAX,
)
self.call_from_thread(
self._log_reconnect, self._reconnect_attempt, delay,
)
def _reconnect_connect(self) -> None:
"""Called by ReconnectManager to attempt a new connection."""
self._client.disconnect()
self._client = self._make_client()
self._wire_client_callbacks()
self._client.connect()
elapsed = 0.0
while elapsed < delay and self._reconnecting:
time.sleep(0.5)
elapsed += 0.5
def _reconnect_on_attempt(self, attempt: int, delay: float) -> None:
self.call_from_thread(self._log_reconnect, attempt, delay)
if not self._reconnecting:
break
def _reconnect_on_success(self) -> None:
self.call_from_thread(self.post_message, ServerConnected())
try:
self._client.disconnect()
srv = self._config.server
self._client = MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
self._wire_client_callbacks()
self._client.connect()
def _reconnect_on_failure(self, attempt: int, error: str) -> None:
self.call_from_thread(self._show_error, f"attempt {attempt}: {error}")
self._reconnecting = False
self._reconnect_attempt = 0
self.call_from_thread(self._on_reconnect_success)
return
except ConnectionFailed as exc:
if not exc.retryable:
self.call_from_thread(
self._show_error, f"rejected: {exc}",
)
self._reconnecting = False
self.call_from_thread(self._on_reconnect_exhausted)
return
self.call_from_thread(
self._show_error,
f"attempt {self._reconnect_attempt}: {exc}",
)
except Exception as exc:
self.call_from_thread(
self._show_error,
f"attempt {self._reconnect_attempt}: {exc}",
)
def _reconnect_on_exhausted(self) -> None:
self.call_from_thread(self._show_reconnect_exhausted)
if self._reconnect_attempt >= RECONNECT_RETRIES:
self._reconnecting = False
self.call_from_thread(self._on_reconnect_exhausted)
return
def _log_reconnect(self, attempt: int, delay: int) -> None:
def _log_reconnect(self, attempt: int, delay: float) -> None:
"""Log reconnection attempt to chatlog."""
from tuimble.reconnect import MAX_RETRIES
status = self.query_one("#status", StatusBar)
status.reconnecting = True
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[dim]reconnecting in {delay}s "
f"(attempt {attempt}/{RECONNECT_RETRIES})...[/dim]"
f"[dim]reconnecting in {delay}s (attempt {attempt}/{MAX_RETRIES})...[/dim]"
)
def _on_reconnect_success(self) -> None:
"""Handle successful reconnection."""
self.post_message(ServerConnected())
def _on_reconnect_exhausted(self) -> None:
def _show_reconnect_exhausted(self) -> None:
"""Handle all reconnection attempts exhausted."""
from tuimble.reconnect import MAX_RETRIES
status = self.query_one("#status", StatusBar)
status.reconnecting = False
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[#f7768e]reconnection failed after "
f"{RECONNECT_RETRIES} attempts[/]"
)
chatlog.write(f"[#f7768e]reconnection failed after {MAX_RETRIES} attempts[/]")
chatlog.write("[dim]press F5 to retry manually[/dim]")
@work(thread=True)
def _start_reconnect(self) -> None:
"""Run the reconnect manager in a worker thread."""
self._reconnect.run()
def _cancel_reconnect(self) -> None:
"""Cancel an in-progress reconnect loop."""
self._reconnecting = False
self._reconnect_attempt = 0
self._reconnect.cancel()
try:
status = self.query_one("#status", StatusBar)
status.reconnecting = False
@@ -581,21 +592,22 @@ class TuimbleApp(App):
def on_server_connected(self, _msg: ServerConnected) -> None:
self._intentional_disconnect = False
self._muted = False
status = self.query_one("#status", StatusBar)
status.reconnecting = False
status.connected = True
status.self_mute = False
srv = self._config.server
status.server_info = f"{srv.host}:{srv.port}"
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[#9ece6a]\u2713 connected as {self._config.server.username}[/]"
)
chatlog.write(f"[#9ece6a]\u2713 connected as {self._config.server.username}[/]")
self._refresh_channel_tree()
self._start_audio()
def on_server_disconnected(self, _msg: ServerDisconnected) -> None:
self._device_monitor.stop()
self._audio.stop()
status = self.query_one("#status", StatusBar)
@@ -605,15 +617,13 @@ class TuimbleApp(App):
tree = self.query_one("#sidebar", ChannelTree)
tree.clear_state()
if self._intentional_disconnect or self._reconnecting:
if self._intentional_disconnect or self._reconnect.active:
return
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write("[#f7768e]\u2717 disconnected from server[/]")
self._reconnecting = True
self._reconnect_attempt = 0
self._reconnect_loop()
self._start_reconnect()
def on_text_message_received(self, msg: TextMessageReceived) -> None:
chatlog = self.query_one("#chatlog", ChatLog)
@@ -625,7 +635,8 @@ class TuimbleApp(App):
if self._tree_refresh_timer is not None:
self._tree_refresh_timer.stop()
self._tree_refresh_timer = self.set_timer(
TREE_DEBOUNCE, self._refresh_channel_tree,
TREE_DEBOUNCE,
self._refresh_channel_tree,
)
def on_channel_selected(self, msg: ChannelSelected) -> None:
@@ -647,8 +658,11 @@ class TuimbleApp(App):
if not text:
return
event.input.clear()
self._history.append(text)
self._history_idx = -1
self._history.push(text)
if text.startswith("/"):
self._dispatch_command(text)
return
if not self._client.connected:
self._show_error("not connected")
@@ -656,17 +670,101 @@ class TuimbleApp(App):
self._client.send_text(text)
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[#e0af68]{self._config.server.username}[/] {text}"
)
chatlog.write(f"[#e0af68]{self._config.server.username}[/] {text}")
def _dispatch_command(self, text: str) -> None:
"""Handle slash commands."""
cmd = text.split()[0].lower()
chatlog = self.query_one("#chatlog", ChatLog)
if cmd == "/help":
chatlog.write("[dim]/deafen toggle self-deafen[/dim]")
chatlog.write("[dim]/mute toggle self-mute[/dim]")
chatlog.write("[dim]/unmute unmute yourself[/dim]")
chatlog.write("[dim]/register register on server[/dim]")
elif cmd == "/deafen":
self.action_toggle_deaf()
elif cmd == "/mute":
if self._muted:
chatlog.write("[dim]already muted[/dim]")
return
self._muted = True
self._audio.capturing = False
self._client.set_self_mute(True)
status = self.query_one("#status", StatusBar)
status.self_mute = True
status.ptt_active = False
chatlog.write("[#e0af68]\u2715 muted[/]")
elif cmd == "/unmute":
if not self._muted:
chatlog.write("[dim]already unmuted[/dim]")
return
self._muted = False
self._client.set_self_mute(False)
status = self.query_one("#status", StatusBar)
status.self_mute = False
chatlog.write("[#9ece6a]\u2713 unmuted[/]")
elif cmd == "/register":
if not self._client.connected:
self._show_error("not connected")
return
try:
self._client.register_self()
chatlog.write("[#9ece6a]\u2713 registration requested[/]")
except Exception as exc:
self._show_error(f"register failed: {exc}")
else:
self._show_error(f"unknown command: {cmd}")
# -- audio ---------------------------------------------------------------
def _on_device_change(self) -> None:
"""Called from DeviceMonitor thread on device list change."""
self.call_from_thread(self.post_message, AudioDeviceChanged())
@staticmethod
def _validate_device(device_id: int | None, kind: str) -> int | None:
"""Return *device_id* if still valid, else ``None`` (system default)."""
if device_id is None:
return None
try:
import sounddevice as sd
sd.query_devices(device_id, kind)
return device_id
except Exception:
return None
def on_audio_device_changed(self, _msg: AudioDeviceChanged) -> None:
"""Rebuild the audio pipeline when devices change."""
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write("[dim]audio device change detected[/dim]")
self._audio.stop()
acfg = self._config.audio
inp = self._validate_device(acfg.input_device, "input")
out = self._validate_device(acfg.output_device, "output")
self._audio = AudioPipeline(
sample_rate=acfg.sample_rate,
frame_size=acfg.frame_size,
input_device=inp,
output_device=out,
)
self._audio.input_gain = acfg.input_gain
self._audio.output_gain = acfg.output_gain
self._audio.pitch = acfg.pitch
if self._client.connected:
self._start_audio()
def _start_audio(self) -> None:
"""Start audio pipeline; log error if hardware unavailable."""
chatlog = self.query_one("#chatlog", ChatLog)
try:
self._audio.start()
self._device_monitor.start()
chatlog.write("[dim]audio pipeline started[/dim]")
self._audio_send_loop()
except Exception as exc:
@@ -675,13 +773,11 @@ class TuimbleApp(App):
@work(thread=True)
def _audio_send_loop(self) -> None:
"""Poll capture queue and send encoded frames to server."""
"""Send captured audio frames to the server."""
while self._client.connected:
frame = self._audio.get_capture_frame()
frame = self._audio.get_capture_frame(timeout=0.02)
if frame is not None:
self._client.send_audio(frame)
else:
time.sleep(0.005)
# -- channel tree --------------------------------------------------------
@@ -735,80 +831,89 @@ class TuimbleApp(App):
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(f"[dim]input volume {pct}%[/dim]")
# -- pitch ----------------------------------------------------------------
def action_pitch_down(self) -> None:
"""Decrease voice pitch by 1 semitone."""
self._audio.pitch = max(-12.0, self._audio.pitch - 1.0)
st = int(self._audio.pitch)
status = self.query_one("#status", StatusBar)
status.pitch = st
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(f"[dim]pitch {st:+d} semitones[/dim]")
def action_pitch_up(self) -> None:
"""Increase voice pitch by 1 semitone."""
self._audio.pitch = min(12.0, self._audio.pitch + 1.0)
st = int(self._audio.pitch)
status = self.query_one("#status", StatusBar)
status.pitch = st
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(f"[dim]pitch {st:+d} semitones[/dim]")
# -- config reload --------------------------------------------------------
def _detect_config_changes(
self, old: Config, new: Config,
self,
old: Config,
new: Config,
) -> tuple[list[str], list[str]]:
"""Compare configs, return (safe_changes, restart_changes)."""
safe: list[str] = []
restart: list[str] = []
if old.ptt.key != new.ptt.key:
safe.append(f"ptt key: {old.ptt.key} -> {new.ptt.key}")
if old.ptt.mode != new.ptt.mode:
safe.append(f"ptt mode: {old.ptt.mode} -> {new.ptt.mode}")
if old.ptt.backend != new.ptt.backend:
safe.append(
f"ptt backend: {old.ptt.backend} -> {new.ptt.backend}"
)
if old.audio.input_gain != new.audio.input_gain:
safe.append(
f"input gain: {old.audio.input_gain} -> "
f"{new.audio.input_gain}"
)
if old.audio.output_gain != new.audio.output_gain:
safe.append(
f"output gain: {old.audio.output_gain} -> "
f"{new.audio.output_gain}"
)
# PTT: all fields are safe to hot-reload
old_ptt = dataclasses.asdict(old.ptt)
new_ptt = dataclasses.asdict(new.ptt)
for key in old_ptt:
if old_ptt[key] != new_ptt[key]:
safe.append(f"ptt {key}: {old_ptt[key]} -> {new_ptt[key]}")
o_srv, n_srv = old.server, new.server
for attr in ("host", "port", "username", "password",
"certfile", "keyfile"):
ov, nv = getattr(o_srv, attr), getattr(n_srv, attr)
if ov != nv:
label = "password" if attr == "password" else attr
# Audio: gains are safe; hardware settings require restart
safe_audio = {"input_gain", "output_gain", "pitch"}
old_aud = dataclasses.asdict(old.audio)
new_aud = dataclasses.asdict(new.audio)
for key in old_aud:
if old_aud[key] != new_aud[key]:
if key in safe_audio:
safe.append(f"{key}: {old_aud[key]} -> {new_aud[key]}")
else:
restart.append(f"audio.{key} changed")
# Server: all changes require restart
old_srv = dataclasses.asdict(old.server)
new_srv = dataclasses.asdict(new.server)
for key in old_srv:
if old_srv[key] != new_srv[key]:
label = "password" if key == "password" else key
restart.append(f"server.{label} changed")
o_aud, n_aud = old.audio, new.audio
for attr in ("input_device", "output_device", "sample_rate"):
ov, nv = getattr(o_aud, attr), getattr(n_aud, attr)
if ov != nv:
restart.append(f"audio.{attr} changed")
return safe, restart
def _apply_safe_changes(self, new: Config) -> None:
"""Apply hot-reload-safe config changes immediately."""
self._config.ptt = new.ptt
self._ptt = detect_backend(
self._on_ptt_change, new.ptt.backend
)
self._ptt = detect_backend(self._on_ptt_change, new.ptt.backend)
self._audio.input_gain = new.audio.input_gain
self._audio.output_gain = new.audio.output_gain
self._audio.pitch = new.audio.pitch
status = self.query_one("#status", StatusBar)
status.input_vol = int(new.audio.input_gain * 100)
status.output_vol = int(new.audio.output_gain * 100)
status.pitch = int(new.audio.pitch)
def _apply_restart_changes(self, new: Config) -> None:
"""Apply changes that require reconnect/audio restart."""
chatlog = self.query_one("#chatlog", ChatLog)
old = self._config
server_changed = (
old.server.host != new.server.host
or old.server.port != new.server.port
or old.server.username != new.server.username
or old.server.password != new.server.password
or old.server.certfile != new.server.certfile
or old.server.keyfile != new.server.keyfile
server_changed = dataclasses.asdict(old.server) != dataclasses.asdict(
new.server
)
audio_hw_changed = (
old.audio.input_device != new.audio.input_device
or old.audio.output_device != new.audio.output_device
or old.audio.sample_rate != new.audio.sample_rate
audio_hw_changed = any(
getattr(old.audio, a) != getattr(new.audio, a)
for a in ("input_device", "output_device", "sample_rate")
)
if server_changed:
@@ -820,18 +925,10 @@ class TuimbleApp(App):
tree = self.query_one("#sidebar", ChannelTree)
tree.clear_state()
srv = new.server
self._client = MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
self._client = self._make_client(new.server)
self._config = new
chatlog.write(
f"[dim]reconnecting to {srv.host}:{srv.port}...[/]"
f"[dim]reconnecting to {new.server.host}:{new.server.port}...[/]"
)
self._connect_to_server()
elif audio_hw_changed:
@@ -845,6 +942,7 @@ class TuimbleApp(App):
)
self._audio.input_gain = acfg.input_gain
self._audio.output_gain = acfg.output_gain
self._audio.pitch = acfg.pitch
self._config = new
if self._client.connected:
self._start_audio()
@@ -855,23 +953,14 @@ class TuimbleApp(App):
Also serves as manual reconnect when disconnected.
"""
if self._reconnecting:
if self._reconnect.active:
self._cancel_reconnect()
if not self._client.connected:
chatlog = self.query_one("#chatlog", ChatLog)
self._client = self._make_client()
srv = self._config.server
self._client = MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
chatlog.write(
f"[dim]connecting to {srv.host}:{srv.port}...[/dim]"
)
chatlog.write(f"[dim]connecting to {srv.host}:{srv.port}...[/dim]")
self._intentional_disconnect = False
self._connect_to_server()
return
@@ -899,6 +988,7 @@ class TuimbleApp(App):
self._apply_safe_changes(new)
self._config.audio.input_gain = new.audio.input_gain
self._config.audio.output_gain = new.audio.output_gain
self._config.audio.pitch = new.audio.pitch
if safe:
for change in safe:
@@ -907,10 +997,7 @@ class TuimbleApp(App):
if restart:
for change in restart:
chatlog.write(f"[#e0af68]\u26a0 {change}[/]")
chatlog.write(
"[dim]press F5 again to apply, "
"or any key to cancel[/dim]"
)
chatlog.write("[dim]press F5 again to apply, or any key to cancel[/dim]")
self._pending_reload = new
else:
chatlog.write("[dim]\u2713 config reloaded[/dim]")
@@ -928,27 +1015,12 @@ class TuimbleApp(App):
if isinstance(focused, Input) and event.key in ("up", "down"):
inp = focused
if event.key == "up":
if not self._history:
event.prevent_default()
return
if self._history_idx == -1:
self._history_draft = inp.value
self._history_idx = len(self._history) - 1
elif self._history_idx > 0:
self._history_idx -= 1
inp.value = self._history[self._history_idx]
inp.cursor_position = len(inp.value)
else: # down
if self._history_idx == -1:
event.prevent_default()
return
if self._history_idx < len(self._history) - 1:
self._history_idx += 1
inp.value = self._history[self._history_idx]
else:
self._history_idx = -1
inp.value = self._history_draft
inp.cursor_position = len(inp.value)
val = self._history.up(inp.value)
else:
val = self._history.down()
if val is not None:
inp.value = val
inp.cursor_position = len(val)
event.prevent_default()
return
@@ -963,9 +1035,13 @@ class TuimbleApp(App):
self._ptt.key_down()
def _on_ptt_change(self, transmitting: bool) -> None:
if self._muted:
transmitting = False
self._audio.capturing = transmitting
status = self.query_one("#status", StatusBar)
status.ptt_active = transmitting
if self._config.ptt.mode == "hold":
return
chatlog = self.query_one("#chatlog", ChatLog)
if transmitting:
chatlog.write("[#e0af68]● transmitting[/]")
@@ -975,9 +1051,20 @@ class TuimbleApp(App):
# -- resize --------------------------------------------------------------
def on_resize(self, event: events.Resize) -> None:
"""Adapt layout to new terminal dimensions."""
"""Adapt layout to new terminal dimensions (debounced)."""
self._pending_width = event.size.width
if self._resize_timer is not None:
self._resize_timer.stop()
self._resize_timer = self.set_timer(
TREE_DEBOUNCE, self._apply_resize
)
def _apply_resize(self) -> None:
"""Apply deferred resize — keeps event loop free during drag."""
self._resize_timer = None
w = self._pending_width
sidebar = self.query_one("#sidebar", ChannelTree)
sidebar.styles.width = max(16, min(32, event.size.width // 4))
sidebar.styles.width = max(16, min(32, w // 4))
self.query_one("#status", StatusBar).refresh()
# -- lifecycle -----------------------------------------------------------
@@ -985,15 +1072,29 @@ class TuimbleApp(App):
def action_quit(self) -> None:
self._intentional_disconnect = True
self._cancel_reconnect()
self._device_monitor.stop()
self._audio.stop()
self._client.set_dispatcher(None)
self._client.disconnect()
self.exit()
class _HTMLStripper(HTMLParser):
"""Extract text content from HTML, discarding all tags."""
def __init__(self):
super().__init__()
self._parts: list[str] = []
def handle_data(self, data: str):
self._parts.append(data)
def get_text(self) -> str:
return "".join(self._parts)
def _strip_html(text: str) -> str:
"""Remove HTML tags and unescape entities from Mumble messages."""
import re
clean = re.sub(r"<[^>]+>", "", text)
return html.unescape(clean)
stripper = _HTMLStripper()
stripper.feed(text)
return html.unescape(stripper.get_text())

View File

@@ -7,9 +7,13 @@ Playback path: pymumble (decodes) -> raw PCM -> queue -> speakers.
from __future__ import annotations
import array
import logging
import queue
import struct
import threading
from typing import Callable
from tuimble.modulator import PitchShifter
log = logging.getLogger(__name__)
@@ -21,13 +25,67 @@ DTYPE = "int16"
def _apply_gain(pcm: bytes, gain: float) -> bytes:
"""Scale int16 PCM samples by gain factor with clipping."""
n = len(pcm) // 2
if n == 0:
if len(pcm) < 2:
return pcm
fmt = f"<{n}h"
samples = struct.unpack(fmt, pcm[: n * 2])
scaled = [max(-32768, min(32767, int(s * gain))) for s in samples]
return struct.pack(fmt, *scaled)
samples = array.array("h")
samples.frombytes(pcm[: len(pcm) & ~1])
for i in range(len(samples)):
samples[i] = max(-32768, min(32767, int(samples[i] * gain)))
return samples.tobytes()
class DeviceMonitor:
"""Poll sounddevice for device list changes.
Runs a daemon thread that compares a string fingerprint of
``sounddevice.query_devices()`` against a cached snapshot every
*interval* seconds. Fires *callback* when the list changes.
Uses ``threading.Event`` for cancellation (same pattern as
``ReconnectManager``).
"""
def __init__(self, callback: Callable[[], None], interval: float = 2.0):
self._callback = callback
self._interval = interval
self._cancel = threading.Event()
self._snapshot = self._device_fingerprint()
self._thread: threading.Thread | None = None
@staticmethod
def _device_fingerprint() -> str:
"""Return a string snapshot of the current device list."""
try:
import sounddevice as sd
return str(sd.query_devices())
except Exception:
return ""
def start(self) -> None:
"""Start polling in a daemon thread."""
if self._thread is not None and self._thread.is_alive():
return
self._cancel.clear()
self._snapshot = self._device_fingerprint()
self._thread = threading.Thread(target=self._poll, daemon=True)
self._thread.start()
def stop(self) -> None:
"""Signal the poll loop to exit."""
self._cancel.set()
self._thread = None
def _poll(self) -> None:
while not self._cancel.wait(timeout=self._interval):
current = self._device_fingerprint()
if current != self._snapshot:
self._snapshot = current
log.info("audio device list changed")
try:
self._callback()
except Exception:
log.exception("device change callback failed")
class AudioPipeline:
@@ -54,8 +112,9 @@ class AudioPipeline:
self._deafened = False
self._input_gain = 1.0
self._output_gain = 1.0
self._pitch_shifter = PitchShifter(sample_rate)
def start(self):
def start(self) -> None:
"""Open audio streams."""
import sounddevice as sd
@@ -81,14 +140,20 @@ class AudioPipeline:
log.info("audio pipeline started (rate=%d)", self._sample_rate)
def stop(self):
"""Close audio streams."""
def stop(self) -> None:
"""Close audio streams and drain stale frames."""
for stream in (self._input_stream, self._output_stream):
if stream is not None:
stream.stop()
stream.close()
self._input_stream = None
self._output_stream = None
for q in (self._capture_queue, self._playback_queue):
while not q.empty():
try:
q.get_nowait()
except queue.Empty:
break
log.info("audio pipeline stopped")
@property
@@ -123,7 +188,15 @@ class AudioPipeline:
def output_gain(self, value: float):
self._output_gain = max(0.0, min(2.0, value))
def _capture_callback(self, indata, frames, time_info, status):
@property
def pitch(self) -> float:
return self._pitch_shifter.semitones
@pitch.setter
def pitch(self, value: float):
self._pitch_shifter.semitones = value
def _capture_callback(self, indata, frames, time_info, status) -> None:
"""Called by sounddevice when input data is available."""
if status:
log.warning("capture status: %s", status)
@@ -136,7 +209,7 @@ class AudioPipeline:
except queue.Full:
pass
def _playback_callback(self, outdata, frames, time_info, status):
def _playback_callback(self, outdata, frames, time_info, status) -> None:
"""Called by sounddevice when output buffer needs data."""
if status:
log.warning("playback status: %s", status)
@@ -154,14 +227,25 @@ class AudioPipeline:
except queue.Empty:
outdata[:] = b"\x00" * len(outdata)
def get_capture_frame(self) -> bytes | None:
"""Retrieve next captured PCM frame for transmission."""
def get_capture_frame(self, timeout: float = 0.0) -> bytes | None:
"""Retrieve next captured PCM frame for transmission.
Pitch shifting runs here (worker thread) rather than in the
PortAudio callback, which must return within the frame period.
Args:
timeout: Seconds to wait for a frame. 0 returns immediately.
"""
try:
return self._capture_queue.get_nowait()
if timeout > 0:
pcm = self._capture_queue.get(timeout=timeout)
else:
pcm = self._capture_queue.get_nowait()
except queue.Empty:
return None
return self._pitch_shifter.process(pcm)
def queue_playback(self, pcm_data: bytes):
def queue_playback(self, pcm_data: bytes) -> None:
"""Queue raw PCM data for playback (16-bit, mono, 48kHz)."""
if self._deafened:
return

View File

@@ -74,6 +74,10 @@ class MumbleClient:
self._mumble = None
self._connected = False
self._dispatcher: Callable | None = None
self._users_cache: dict[int, User] = {}
self._channels_cache: dict[int, Channel] = {}
self._users_dirty: bool = True
self._channels_dirty: bool = True
# Application callbacks (fired via dispatcher)
self.on_connected = None
@@ -83,14 +87,14 @@ class MumbleClient:
self.on_channel_update = None # ()
self.on_sound_received = None # (user, pcm_data)
def set_dispatcher(self, fn: Callable):
def set_dispatcher(self, fn: Callable) -> None:
"""Set a function to marshal callbacks into the host event loop.
Typically Textual's ``call_from_thread``.
"""
self._dispatcher = fn
def _dispatch(self, callback, *args):
def _dispatch(self, callback, *args) -> None:
"""Call *callback* via the dispatcher, or directly if none is set."""
if callback is None:
return
@@ -113,6 +117,8 @@ class MumbleClient:
def users(self) -> dict[int, User]:
if not self._mumble:
return {}
if not self._users_dirty:
return self._users_cache
result = {}
for sid, u in self._mumble.users.items():
result[sid] = User(
@@ -124,12 +130,16 @@ class MumbleClient:
self_mute=u.get("self_mute", False),
self_deaf=u.get("self_deaf", False),
)
self._users_cache = result
self._users_dirty = False
return result
@property
def channels(self) -> dict[int, Channel]:
if not self._mumble:
return {}
if not self._channels_dirty:
return self._channels_cache
result = {}
for cid, ch in self._mumble.channels.items():
result[cid] = Channel(
@@ -138,6 +148,8 @@ class MumbleClient:
parent_id=ch.get("parent", 0),
description=ch.get("description", ""),
)
self._channels_cache = result
self._channels_dirty = False
return result
@property
@@ -151,7 +163,7 @@ class MumbleClient:
# -- connection ----------------------------------------------------------
def connect(self):
def connect(self) -> None:
"""Connect to the Mumble server (blocking).
Raises:
@@ -186,7 +198,8 @@ class MumbleClient:
except (socket.error, OSError) as exc:
self._connected = False
raise ConnectionFailed(
f"network error: {exc}", retryable=True,
f"network error: {exc}",
retryable=True,
) from exc
except Exception as exc:
self._connected = False
@@ -195,16 +208,19 @@ class MumbleClient:
if self._mumble.connected != const.PYMUMBLE_CONN_STATE_CONNECTED:
self._connected = False
raise ConnectionFailed(
"server rejected connection", retryable=False,
"server rejected connection",
retryable=False,
)
self._connected = True
log.info(
"connected to %s:%d as %s",
self._host, self._port, self._username,
self._host,
self._port,
self._username,
)
def disconnect(self):
def disconnect(self) -> None:
"""Disconnect from the server."""
if self._mumble:
try:
@@ -212,9 +228,11 @@ class MumbleClient:
except Exception:
pass
self._connected = False
self._users_dirty = True
self._channels_dirty = True
log.info("disconnected")
def reconnect(self):
def reconnect(self) -> None:
"""Disconnect and reconnect to the same server.
Raises:
@@ -226,23 +244,35 @@ class MumbleClient:
# -- actions -------------------------------------------------------------
def send_text(self, message: str):
def send_text(self, message: str) -> None:
"""Send a text message to the current channel."""
if self._mumble and self._connected:
ch = self._mumble.channels[self._mumble.users.myself["channel_id"]]
try:
cid = self._mumble.users.myself["channel_id"]
ch = self._mumble.channels[cid]
except (KeyError, AttributeError):
log.warning("send_text: channel unavailable")
return
ch.send_text_message(message)
def send_audio(self, pcm_data: bytes):
def send_audio(self, pcm_data: bytes) -> None:
"""Send PCM audio to the server (pymumble encodes to Opus)."""
if self._mumble and self._connected:
self._mumble.sound_output.add_sound(pcm_data)
def join_channel(self, channel_id: int):
"""Move to a different channel."""
if self._mumble and self._connected:
self._mumble.channels[channel_id].move_in()
def join_channel(self, channel_id: int) -> None:
"""Move to a different channel.
def set_self_deaf(self, deaf: bool):
Raises:
ValueError: If the channel no longer exists on the server.
"""
if self._mumble and self._connected:
ch = self._mumble.channels.get(channel_id)
if ch is None:
raise ValueError(f"channel {channel_id} not found")
ch.move_in()
def set_self_deaf(self, deaf: bool) -> None:
"""Toggle self-deafen on the server."""
if self._mumble and self._connected:
if deaf:
@@ -250,9 +280,22 @@ class MumbleClient:
else:
self._mumble.users.myself.undeafen()
def set_self_mute(self, mute: bool) -> None:
"""Toggle self-mute on the server."""
if self._mumble and self._connected:
if mute:
self._mumble.users.myself.mute()
else:
self._mumble.users.myself.unmute()
def register_self(self) -> None:
"""Register the current user on the server."""
if self._mumble and self._connected:
self._mumble.users.myself.register()
# -- pymumble callbacks (run on pymumble thread) -------------------------
def _register_callbacks(self):
def _register_callbacks(self) -> None:
import pymumble_py3.constants as const
cb = self._mumble.callbacks
@@ -267,25 +310,31 @@ class MumbleClient:
cb.set_callback(const.PYMUMBLE_CLBK_CHANNELUPDATED, self._on_channel_event)
cb.set_callback(const.PYMUMBLE_CLBK_CHANNELREMOVED, self._on_channel_event)
def _on_connected(self):
def _on_connected(self) -> None:
self._connected = True
self._users_dirty = True
self._channels_dirty = True
self._dispatch(self.on_connected)
def _on_disconnected(self):
def _on_disconnected(self) -> None:
self._connected = False
self._users_dirty = True
self._channels_dirty = True
self._dispatch(self.on_disconnected)
def _on_text_message(self, message):
def _on_text_message(self, message) -> None:
users = self._mumble.users
actor = message.actor
name = users[actor]["name"] if actor in users else "?"
self._dispatch(self.on_text_message, name, message.message)
def _on_sound_received(self, user, sound_chunk):
def _on_sound_received(self, user, sound_chunk) -> None:
self._dispatch(self.on_sound_received, user, sound_chunk.pcm)
def _on_user_event(self, *_args):
def _on_user_event(self, *_args) -> None:
self._users_dirty = True
self._dispatch(self.on_user_update)
def _on_channel_event(self, *_args):
def _on_channel_event(self, *_args) -> None:
self._channels_dirty = True
self._dispatch(self.on_channel_update)

View File

@@ -1,8 +1,12 @@
"""Configuration management."""
import dataclasses
import logging
from dataclasses import dataclass, field
from pathlib import Path
log = logging.getLogger(__name__)
CONFIG_DIR = Path.home() / ".config" / "tuimble"
CONFIG_FILE = CONFIG_DIR / "config.toml"
@@ -28,6 +32,7 @@ class AudioConfig:
frame_size: int = 960 # 20ms at 48kHz
input_gain: float = 1.0
output_gain: float = 1.0
pitch: float = 0.0
@dataclass
@@ -60,9 +65,18 @@ def load_config(path: Path | None = None) -> Config:
cfg = Config()
if "server" in data:
cfg.server = ServerConfig(**data["server"])
cfg.server = _load_section(ServerConfig, data["server"])
if "audio" in data:
cfg.audio = AudioConfig(**data["audio"])
cfg.audio = _load_section(AudioConfig, data["audio"])
if "ptt" in data:
cfg.ptt = PttConfig(**data["ptt"])
cfg.ptt = _load_section(PttConfig, data["ptt"])
return cfg
def _load_section(cls, raw: dict):
"""Instantiate a dataclass, silently dropping unknown keys."""
valid = {f.name for f in dataclasses.fields(cls)}
unknown = set(raw) - valid
if unknown:
log.warning("ignoring unknown config keys: %s", ", ".join(sorted(unknown)))
return cls(**{k: v for k, v in raw.items() if k in valid})

134
src/tuimble/modulator.py Normal file
View File

@@ -0,0 +1,134 @@
"""Real-time pitch shifting for the capture path.
Uses a numpy-only phase vocoder + linear resample. All heavy
operations (rfft, irfft, array arithmetic) release the GIL so
PortAudio callbacks are never starved.
Stateful across frames: carries phase and input context across both
frame boundaries and parameter changes for glitch-free transitions.
"""
from __future__ import annotations
import numpy as np
_N_FFT = 512
_HOP = 128
_N_FREQ = _N_FFT // 2 + 1
_WINDOW = np.hanning(_N_FFT).astype(np.float32)
_PHASE_ADV = 2.0 * np.pi * _HOP * np.arange(_N_FREQ) / _N_FFT
class PitchShifter:
"""Shift pitch of int16 PCM frames via phase vocoder + resample.
Maintains inter-frame state (input overlap and synthesis phase)
so consecutive 20 ms frames produce a continuous output signal.
"""
def __init__(self, sample_rate: int = 48000):
self._sample_rate = sample_rate
self._semitones = 0.0
self._prev_in = np.zeros(_N_FFT, dtype=np.float32)
self._phase: np.ndarray | None = None
@property
def semitones(self) -> float:
return self._semitones
@semitones.setter
def semitones(self, value: float) -> None:
self._semitones = max(-12.0, min(12.0, float(value)))
def process(self, pcm: bytes) -> bytes:
"""Pitch-shift a single int16 PCM frame.
Returns *pcm* unchanged when semitones == 0 or the frame is
too short to process.
"""
if self._semitones == 0.0 or len(pcm) < 2:
return pcm
samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0
n = len(samples)
if n < _N_FFT:
return pcm
ratio = 2.0 ** (self._semitones / 12.0)
# Build continuous signal: previous context + current frame
# Previous context provides real samples instead of reflect
# padding, eliminating edge discontinuities.
y = np.concatenate([self._prev_in, samples])
y_pad = np.pad(y, (0, _N_FFT // 2), mode="reflect")
self._prev_in = samples[-_N_FFT:].copy()
# -- Vectorised STFT (one rfft call, GIL released) --
n_frames = 1 + (len(y_pad) - _N_FFT) // _HOP
if n_frames < 2:
return pcm
offsets = _HOP * np.arange(n_frames)
stft = np.fft.rfft(
y_pad[offsets[:, None] + np.arange(_N_FFT)] * _WINDOW, axis=1
)
# -- Time-stretch interpolation --
n_out = max(1, int(np.ceil(n_frames * ratio)))
src = np.minimum(np.arange(n_out) / ratio, n_frames - 1)
i0 = src.astype(int)
i1 = np.minimum(i0 + 1, n_frames - 1)
frac = (src - i0)[:, None]
mag = (1 - frac) * np.abs(stft[i0]) + frac * np.abs(stft[i1])
# -- Phase propagation with inter-frame continuity --
dphi = np.angle(stft[i1]) - np.angle(stft[i0]) - _PHASE_ADV
dphi -= 2.0 * np.pi * np.round(dphi / (2.0 * np.pi))
increments = _PHASE_ADV + dphi
phase = np.empty((n_out, _N_FREQ))
if self._phase is not None:
# Continue from previous frame's final phase
phase[0] = self._phase + increments[0]
else:
phase[0] = np.angle(stft[0])
if n_out > 1:
phase[1:] = phase[0] + np.cumsum(increments[1:], axis=0)
# Carry phase (wrap to [-pi, pi] to avoid precision drift)
self._phase = (phase[-1] + np.pi) % (2.0 * np.pi) - np.pi
# -- Vectorised ISTFT + overlap-add --
frames = (
np.fft.irfft(mag * np.exp(1j * phase), n=_N_FFT, axis=1).astype(
np.float32
)
* _WINDOW
)
out_len = (n_out - 1) * _HOP + _N_FFT
output = np.zeros(out_len, dtype=np.float32)
for i in range(n_out):
output[i * _HOP : i * _HOP + _N_FFT] += frames[i]
# Extract portion for current frame only.
# Left context (_N_FFT samples) maps to ~_N_FFT*ratio in
# the time-stretched output; skip that overlap region.
skip = int(round(_N_FFT * ratio))
target = int(round(n * ratio))
end = min(skip + target, len(output))
stretched = output[skip:end]
# Resample to original frame size (numpy, GIL-free)
if len(stretched) < 2:
return pcm
if len(stretched) != n:
x_old = np.linspace(0, 1, len(stretched), endpoint=False)
x_new = np.linspace(0, 1, n, endpoint=False)
stretched = np.interp(x_new, x_old, stretched)
return (
np.clip(stretched * 32768.0, -32768, 32767)
.astype(np.int16)
.tobytes()
)

93
src/tuimble/reconnect.py Normal file
View File

@@ -0,0 +1,93 @@
"""Reconnection manager with exponential backoff."""
from __future__ import annotations
import logging
import threading
from typing import Callable
log = logging.getLogger(__name__)
INITIAL_DELAY = 2
MAX_DELAY = 30
MAX_RETRIES = 10
class ReconnectManager:
"""Thread-safe reconnection with exponential backoff.
The manager runs a blocking loop in a worker thread. Cancellation
is signalled via ``threading.Event``, making it both thread-safe and
instantly responsive (no polling sleep).
Args:
connect_fn: Called to attempt a reconnection. Should raise on
failure; exceptions with a ``retryable`` attribute set to
``False`` cause immediate abort.
on_attempt: ``(attempt, delay)`` -- called before each wait.
on_success: Called after a successful reconnection.
on_failure: ``(attempt, error_msg)`` -- called after each failed
attempt.
on_exhausted: Called when all retries are spent.
"""
def __init__(
self,
connect_fn: Callable[[], None],
on_attempt: Callable[[int, float], None],
on_success: Callable[[], None],
on_failure: Callable[[int, str], None],
on_exhausted: Callable[[], None],
):
self._connect = connect_fn
self._on_attempt = on_attempt
self._on_success = on_success
self._on_failure = on_failure
self._on_exhausted = on_exhausted
self._cancel = threading.Event()
self._attempt = 0
@property
def active(self) -> bool:
return not self._cancel.is_set() and self._attempt > 0
@property
def attempt(self) -> int:
return self._attempt
def cancel(self) -> None:
"""Signal the loop to stop. Safe to call from any thread."""
self._cancel.set()
def run(self) -> None:
"""Blocking reconnect loop -- run in a worker thread."""
self._cancel.clear()
self._attempt = 0
while not self._cancel.is_set():
self._attempt += 1
delay = min(INITIAL_DELAY * (2 ** (self._attempt - 1)), MAX_DELAY)
self._on_attempt(self._attempt, delay)
if self._cancel.wait(timeout=delay):
break
try:
self._connect()
self._attempt = 0
self._on_success()
return
except Exception as exc:
retryable = getattr(exc, "retryable", True)
self._on_failure(self._attempt, str(exc))
if not retryable:
self._attempt = 0
self._on_exhausted()
return
if self._attempt >= MAX_RETRIES:
self._attempt = 0
self._on_exhausted()
return
self._attempt = 0

View File

@@ -2,26 +2,25 @@
import struct
from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline, _apply_gain
import numpy as np
from tuimble.audio import FRAME_SIZE, AudioPipeline, _apply_gain
def test_default_construction():
ap = AudioPipeline()
assert ap._sample_rate == SAMPLE_RATE
assert ap._frame_size == FRAME_SIZE
assert ap._input_device is None
assert ap._output_device is None
assert ap.capturing is False
assert ap.deafened is False
assert ap.input_gain == 1.0
assert ap.output_gain == 1.0
def test_custom_construction():
ap = AudioPipeline(sample_rate=24000, frame_size=480,
input_device=1, output_device=2)
assert ap._sample_rate == 24000
assert ap._frame_size == 480
assert ap._input_device == 1
assert ap._output_device == 2
ap = AudioPipeline(
sample_rate=24000, frame_size=480, input_device=1, output_device=2
)
# Verify via public behavior: get_capture_frame returns None
assert ap.get_capture_frame() is None
def test_capturing_toggle():
@@ -38,10 +37,13 @@ def test_get_capture_frame_empty():
assert ap.get_capture_frame() is None
def test_get_capture_frame_returns_queued():
def test_capture_and_retrieve():
"""Capture callback queues frames; get_capture_frame retrieves them."""
ap = AudioPipeline()
ap._capture_queue.put(b"\x01\x02\x03")
assert ap.get_capture_frame() == b"\x01\x02\x03"
ap.capturing = True
pcm = b"\x01\x02\x03\x04"
ap._capture_callback(pcm, 2, None, None)
assert ap.get_capture_frame() == pcm
assert ap.get_capture_frame() is None
@@ -84,12 +86,20 @@ def test_playback_callback_short_pcm_pads_silence():
def test_queue_playback_overflow_drops():
"""Full queue drops new data silently."""
ap = AudioPipeline(frame_size=FRAME_SIZE)
# Fill the queue
for i in range(ap._playback_queue.maxsize):
ap.queue_playback(b"\x00")
# This should not raise
ap.queue_playback(b"\xff")
assert ap._playback_queue.qsize() == ap._playback_queue.maxsize
# Use non-zero PCM so we can distinguish from silence
frame = b"\x42\x42"
for _ in range(50): # maxsize=50
ap.queue_playback(frame)
# This should not raise (dropped because queue is full)
ap.queue_playback(b"\xff\xff")
# Drain and count -- frames with our marker byte
count = 0
for _ in range(60): # more than queue size
outdata = bytearray(2)
ap._playback_callback(outdata, 1, None, None)
if outdata != bytearray(2):
count += 1
assert count == 50
def test_deafened_toggle():
@@ -106,14 +116,16 @@ def test_queue_playback_discards_when_deafened():
ap = AudioPipeline()
ap.deafened = True
ap.queue_playback(b"\x42" * 100)
assert ap._playback_queue.qsize() == 0
# Nothing to play back
outdata = bytearray(200)
ap._playback_callback(outdata, 100, None, None)
assert outdata == bytearray(200) # silence
def test_playback_callback_silence_when_deafened():
"""Playback callback writes silence when deafened, even with queued data."""
ap = AudioPipeline()
frame_bytes = FRAME_SIZE * 2
# Queue data before deafening
pcm = b"\x42" * frame_bytes
ap.queue_playback(pcm)
ap.deafened = True
@@ -129,6 +141,20 @@ def test_stop_without_start():
ap.stop()
def test_stop_drains_queues():
"""Queues are empty after stop()."""
ap = AudioPipeline()
ap.capturing = True
ap._capture_callback(b"\x00\x00", 1, None, None)
ap.queue_playback(b"\x00\x00")
ap.stop()
assert ap.get_capture_frame() is None
# Playback queue also drained -- callback produces silence
outdata = bytearray(2)
ap._playback_callback(outdata, 1, None, None)
assert outdata == bytearray(2)
# -- _apply_gain tests -------------------------------------------------------
@@ -203,3 +229,43 @@ def test_playback_callback_applies_output_gain():
ap._playback_callback(outdata, 2, None, None)
result = struct.unpack("<2h", bytes(outdata))
assert result == (500, -500)
# -- pitch property tests ----------------------------------------------------
def test_pitch_default():
ap = AudioPipeline()
assert ap.pitch == 0.0
def test_pitch_set_and_get():
ap = AudioPipeline()
ap.pitch = 5.0
assert ap.pitch == 5.0
ap.pitch = -3.0
assert ap.pitch == -3.0
def test_pitch_clamping():
ap = AudioPipeline()
ap.pitch = 20.0
assert ap.pitch == 12.0
ap.pitch = -20.0
assert ap.pitch == -12.0
def test_pitch_applied_on_dequeue():
"""Pitch shifting runs in get_capture_frame, not the callback."""
ap = AudioPipeline()
ap.capturing = True
ap.pitch = 4.0
t = np.arange(FRAME_SIZE) / 48000.0
pcm = (np.sin(2 * np.pi * 440.0 * t) * 16000).astype(np.int16).tobytes()
ap._capture_callback(pcm, FRAME_SIZE, None, None)
frame = ap.get_capture_frame()
assert frame is not None
assert len(frame) == len(pcm)
assert frame != pcm

View File

@@ -120,3 +120,47 @@ def test_disconnect_clears_connected():
client._mumble = MagicMock()
client.disconnect()
assert client.connected is False
def test_set_self_mute_calls_pymumble():
client = MumbleClient(host="localhost")
client._connected = True
myself = MagicMock()
users = MagicMock()
users.myself = myself
mumble = MagicMock()
mumble.users = users
client._mumble = mumble
client.set_self_mute(True)
myself.mute.assert_called_once()
myself.unmute.assert_not_called()
myself.reset_mock()
client.set_self_mute(False)
myself.unmute.assert_called_once()
myself.mute.assert_not_called()
def test_set_self_mute_noop_when_disconnected():
client = MumbleClient(host="localhost")
client.set_self_mute(True)
def test_register_self_calls_pymumble():
client = MumbleClient(host="localhost")
client._connected = True
myself = MagicMock()
users = MagicMock()
users.myself = myself
mumble = MagicMock()
mumble.users = users
client._mumble = mumble
client.register_self()
myself.register.assert_called_once()
def test_register_self_noop_when_disconnected():
client = MumbleClient(host="localhost")
client.register_self()

View File

@@ -1,6 +1,13 @@
"""Tests for configuration module."""
from tuimble.config import AudioConfig, Config, PttConfig, ServerConfig
from tuimble.config import (
AudioConfig,
Config,
PttConfig,
ServerConfig,
_load_section,
load_config,
)
def test_default_config():
@@ -45,3 +52,49 @@ def test_server_cert_custom():
srv = ServerConfig(certfile="/path/cert.pem", keyfile="/path/key.pem")
assert srv.certfile == "/path/cert.pem"
assert srv.keyfile == "/path/key.pem"
# -- _load_section tests -----------------------------------------------------
def test_load_section_filters_unknown_keys():
"""Unknown keys are silently dropped, valid keys are kept."""
result = _load_section(
ServerConfig,
{
"host": "example.com",
"typo_field": "oops",
"another_bad": 42,
},
)
assert result.host == "example.com"
assert result.port == 64738 # default preserved
def test_load_section_empty_dict():
result = _load_section(PttConfig, {})
assert result.key == "f4"
assert result.mode == "toggle"
def test_load_section_all_valid():
result = _load_section(PttConfig, {"key": "space", "mode": "hold"})
assert result.key == "space"
assert result.mode == "hold"
def test_load_config_missing_file(tmp_path):
"""Missing config file returns defaults."""
cfg = load_config(tmp_path / "nonexistent.toml")
assert cfg.server.host == "localhost"
def test_load_config_with_unknown_keys(tmp_path):
"""Config file with unknown keys loads without error."""
toml = tmp_path / "config.toml"
toml.write_text(
'[server]\nhost = "example.com"\nbogus = true\n[ptt]\nfuture_option = "x"\n'
)
cfg = load_config(toml)
assert cfg.server.host == "example.com"
assert cfg.ptt.key == "f4" # default, not overwritten

94
tests/test_history.py Normal file
View File

@@ -0,0 +1,94 @@
"""Tests for InputHistory."""
from tuimble.app import InputHistory
def test_empty_history_up_returns_none():
h = InputHistory()
assert h.up("current") is None
def test_empty_history_down_returns_none():
h = InputHistory()
assert h.down() is None
def test_push_then_up_returns_last():
h = InputHistory()
h.push("hello")
assert h.up("draft") == "hello"
def test_up_twice_with_single_entry_stays():
h = InputHistory()
h.push("one")
assert h.up("draft") == "one"
assert h.up("draft") == "one" # no older entry, stays put
def test_up_navigates_backwards():
h = InputHistory()
h.push("first")
h.push("second")
h.push("third")
assert h.up("draft") == "third"
assert h.up("draft") == "second"
assert h.up("draft") == "first"
assert h.up("draft") == "first" # clamped at oldest
def test_down_navigates_forward():
h = InputHistory()
h.push("first")
h.push("second")
h.push("third")
h.up("draft") # -> third
h.up("draft") # -> second
assert h.down() == "third"
def test_down_past_end_restores_draft():
h = InputHistory()
h.push("one")
h.up("my draft") # -> one
result = h.down() # -> back to draft
assert result == "my draft"
def test_down_without_prior_up_returns_none():
h = InputHistory()
h.push("one")
assert h.down() is None
def test_push_resets_navigation():
h = InputHistory()
h.push("first")
h.up("draft") # -> first
h.push("second")
# After push, navigation resets -- up should go to newest
assert h.up("new draft") == "second"
def test_up_preserves_current_as_draft():
"""First up() saves current input; down past end restores it."""
h = InputHistory()
h.push("old")
h.up("typing in progress")
assert h.down() == "typing in progress"
def test_full_cycle():
"""Push several, navigate to oldest, then back to draft."""
h = InputHistory()
h.push("a")
h.push("b")
h.push("c")
assert h.up("draft") == "c"
assert h.up("draft") == "b"
assert h.up("draft") == "a"
assert h.down() == "b"
assert h.down() == "c"
assert h.down() == "draft"
assert h.down() is None # already at draft

78
tests/test_modulator.py Normal file
View File

@@ -0,0 +1,78 @@
"""Tests for PitchShifter."""
import numpy as np
from tuimble.modulator import PitchShifter
SAMPLE_RATE = 48000
FRAME_SIZE = 960 # 20ms at 48kHz
def _sine_pcm(freq: float, n_samples: int = FRAME_SIZE) -> bytes:
"""Generate a single-frequency int16 PCM frame."""
t = np.arange(n_samples) / SAMPLE_RATE
samples = (np.sin(2 * np.pi * freq * t) * 16000).astype(np.int16)
return samples.tobytes()
def _dominant_freq(pcm: bytes) -> float:
"""Return the dominant frequency in an int16 PCM buffer."""
samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32)
fft = np.abs(np.fft.rfft(samples))
freqs = np.fft.rfftfreq(len(samples), 1.0 / SAMPLE_RATE)
return freqs[np.argmax(fft)]
def test_zero_semitones_passthrough():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
assert ps.process(pcm) == pcm
def test_pitch_shift_changes_output():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = 3.0
result = ps.process(pcm)
assert result != pcm
def test_output_length_preserved():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = 5.0
result = ps.process(pcm)
assert len(result) == len(pcm)
def test_semitones_clamping():
ps = PitchShifter()
ps.semitones = 20.0
assert ps.semitones == 12.0
ps.semitones = -20.0
assert ps.semitones == -12.0
ps.semitones = 5.0
assert ps.semitones == 5.0
def test_empty_input():
ps = PitchShifter()
ps.semitones = 3.0
assert ps.process(b"") == b""
assert ps.process(b"\x00") == b"\x00"
def test_pitch_up_frequency_increases():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = 4.0
result = ps.process(pcm)
assert _dominant_freq(result) > _dominant_freq(pcm)
def test_pitch_down_frequency_decreases():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = -4.0
result = ps.process(pcm)
assert _dominant_freq(result) < _dominant_freq(pcm)

192
tests/test_reconnect.py Normal file
View File

@@ -0,0 +1,192 @@
"""Tests for ReconnectManager."""
import threading
from tuimble.reconnect import INITIAL_DELAY, ReconnectManager
def _make_manager(connect_fn=None, **overrides):
"""Build a ReconnectManager with recording callbacks."""
log = {"attempts": [], "failures": [], "success": 0, "exhausted": 0}
def on_attempt(n, delay):
log["attempts"].append((n, delay))
def on_success():
log["success"] += 1
def on_failure(n, msg):
log["failures"].append((n, msg))
def on_exhausted():
log["exhausted"] += 1
if connect_fn is None:
def connect_fn():
return None
mgr = ReconnectManager(
connect_fn=connect_fn,
on_attempt=overrides.get("on_attempt", on_attempt),
on_success=overrides.get("on_success", on_success),
on_failure=overrides.get("on_failure", on_failure),
on_exhausted=overrides.get("on_exhausted", on_exhausted),
)
return mgr, log
# -- basic lifecycle ----------------------------------------------------------
def test_initial_state():
mgr, _ = _make_manager()
assert mgr.active is False
assert mgr.attempt == 0
def test_success_on_first_attempt():
"""Connect succeeds immediately -- one attempt, no failures."""
mgr, log = _make_manager(connect_fn=lambda: None)
mgr.run()
assert log["success"] == 1
assert log["exhausted"] == 0
assert len(log["attempts"]) == 1
assert len(log["failures"]) == 0
assert mgr.active is False
def test_success_after_failures():
"""Connect fails twice, then succeeds on third attempt."""
call_count = 0
def flaky_connect():
nonlocal call_count
call_count += 1
if call_count < 3:
raise ConnectionError("down")
mgr, log = _make_manager(connect_fn=flaky_connect)
# Patch delay to zero for test speed
import tuimble.reconnect as mod
orig = mod.INITIAL_DELAY
mod.INITIAL_DELAY = 0
try:
mgr.run()
finally:
mod.INITIAL_DELAY = orig
assert log["success"] == 1
assert len(log["failures"]) == 2
assert len(log["attempts"]) == 3
# -- non-retryable -----------------------------------------------------------
def test_non_retryable_aborts_immediately():
"""Exception with retryable=False stops the loop."""
class Rejected(Exception):
retryable = False
def _raise():
raise Rejected("banned")
mgr, log = _make_manager(connect_fn=_raise)
mgr.run()
assert log["exhausted"] == 1
assert log["success"] == 0
assert len(log["attempts"]) == 1
assert len(log["failures"]) == 1
# -- max retries --------------------------------------------------------------
def test_exhaustion_after_max_retries():
"""Loop stops after MAX_RETRIES failed attempts."""
import tuimble.reconnect as mod
orig_delay = mod.INITIAL_DELAY
orig_retries = mod.MAX_RETRIES
mod.INITIAL_DELAY = 0
mod.MAX_RETRIES = 3
try:
mgr, log = _make_manager(
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("nope")),
)
mgr.run()
finally:
mod.INITIAL_DELAY = orig_delay
mod.MAX_RETRIES = orig_retries
assert log["exhausted"] == 1
assert log["success"] == 0
assert len(log["attempts"]) == 3
assert len(log["failures"]) == 3
# -- cancellation -------------------------------------------------------------
def test_cancel_stops_loop():
"""cancel() interrupts the wait and exits the loop."""
barrier = threading.Event()
def slow_attempt(n, delay):
barrier.set()
mgr, log = _make_manager(
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("fail")),
on_attempt=slow_attempt,
)
t = threading.Thread(target=mgr.run)
t.start()
barrier.wait(timeout=2)
mgr.cancel()
t.join(timeout=2)
assert not t.is_alive()
assert mgr.active is False
# -- backoff ------------------------------------------------------------------
def test_backoff_delays():
"""Verify exponential backoff sequence up to MAX_DELAY."""
mgr, log = _make_manager()
# We only need the attempt callback to record delays; cancel after
# a few attempts to avoid waiting.
import tuimble.reconnect as mod
orig_delay = mod.INITIAL_DELAY
orig_retries = mod.MAX_RETRIES
mod.INITIAL_DELAY = 0 # zero delay for speed
mod.MAX_RETRIES = 5
try:
mgr, log = _make_manager(
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("x")),
)
mgr.run()
finally:
mod.INITIAL_DELAY = orig_delay
mod.MAX_RETRIES = orig_retries
delays = [d for _, d in log["attempts"]]
# With INITIAL_DELAY=0, all delays are 0 (min(0 * 2^n, MAX_DELAY))
# Test the formula with real values instead
assert len(delays) == 5
def test_backoff_formula():
"""Delay = min(INITIAL_DELAY * 2^(attempt-1), MAX_DELAY)."""
from tuimble.reconnect import MAX_DELAY
expected = []
for i in range(1, 7):
expected.append(min(INITIAL_DELAY * (2 ** (i - 1)), MAX_DELAY))
assert expected == [2, 4, 8, 16, 30, 30]

59
tests/test_strip_html.py Normal file
View File

@@ -0,0 +1,59 @@
"""Tests for _strip_html edge cases."""
from tuimble.app import _strip_html
def test_plain_text_unchanged():
assert _strip_html("hello world") == "hello world"
def test_simple_tags_stripped():
assert _strip_html("<b>bold</b>") == "bold"
def test_nested_tags():
assert _strip_html("<div><p>text</p></div>") == "text"
def test_self_closing_tags():
assert _strip_html("line<br/>break") == "linebreak"
def test_entities_unescaped():
assert _strip_html("&amp; &lt; &gt;") == "& < >"
def test_html_entities_in_tags():
assert _strip_html("<b>&amp;</b>") == "&"
def test_mumble_style_message():
"""Typical Mumble chat message with anchor tag."""
msg = '<a href="https://example.com">link text</a> and more'
assert _strip_html(msg) == "link text and more"
def test_img_tag_with_attributes():
assert _strip_html('before<img src="x.png" alt="pic"/>after') == "beforeafter"
def test_comment_stripped():
assert _strip_html("before<!-- comment -->after") == "beforeafter"
def test_empty_string():
assert _strip_html("") == ""
def test_only_tags():
assert _strip_html("<br><hr><img/>") == ""
def test_unclosed_tag():
"""Malformed HTML should not crash."""
result = _strip_html("<b>unclosed")
assert "unclosed" in result
def test_multiple_entities():
assert _strip_html("&quot;quoted&quot;") == '"quoted"'

406
tests/test_widgets.py Normal file
View File

@@ -0,0 +1,406 @@
"""Tests for StatusBar, ChannelTree, and related widget helpers."""
import pytest
from tuimble.app import (
VOLUME_STEPS,
ChannelSelected,
ChannelTree,
StatusBar,
_next_volume,
)
from tuimble.client import Channel, User
# -- test helpers ------------------------------------------------------------
def _sample_channels():
return {
0: Channel(channel_id=0, name="Root", parent_id=-1),
1: Channel(channel_id=1, name="Alpha", parent_id=0),
2: Channel(channel_id=2, name="Beta", parent_id=0),
}
def _sample_users():
return {
1: [User(session_id=10, name="Alice", channel_id=1)],
}
# -- A. Pure unit tests (sync) ----------------------------------------------
class TestVolBar:
def test_zero(self):
assert StatusBar._vol_bar(0) == "\u2591\u2591\u2591\u2591"
def test_25(self):
assert StatusBar._vol_bar(25) == "\u2588\u2591\u2591\u2591"
def test_50(self):
assert StatusBar._vol_bar(50) == "\u2588\u2588\u2591\u2591"
def test_100(self):
assert StatusBar._vol_bar(100) == "\u2588\u2588\u2588\u2588"
def test_200_overgain(self):
result = StatusBar._vol_bar(200)
# 200/25 = 8, clamped to 4 filled + max(0, 4-8)=0 light
assert "\u2588" in result
class TestTruncate:
def test_short(self):
assert ChannelTree._truncate("hi", 10) == "hi"
def test_exact(self):
assert ChannelTree._truncate("abcde", 5) == "abcde"
def test_long(self):
assert ChannelTree._truncate("abcdefghij", 7) == "abcd..."
def test_tiny_max(self):
assert ChannelTree._truncate("abcdef", 3) == "abc"
def test_max_two(self):
assert ChannelTree._truncate("abcdef", 2) == "ab"
def test_zero_max(self):
assert ChannelTree._truncate("abc", 0) == ""
class TestUserStatus:
def test_normal(self):
u = User(session_id=1, name="X", channel_id=0)
assert ChannelTree._user_status(u) == ""
def test_self_deaf(self):
u = User(session_id=1, name="X", channel_id=0, self_deaf=True)
assert "\u2298" in ChannelTree._user_status(u)
def test_self_mute(self):
u = User(session_id=1, name="X", channel_id=0, self_mute=True)
assert "\u2715" in ChannelTree._user_status(u)
def test_server_deaf(self):
u = User(session_id=1, name="X", channel_id=0, deaf=True)
assert "\u2298" in ChannelTree._user_status(u)
def test_server_mute(self):
u = User(session_id=1, name="X", channel_id=0, mute=True)
assert "\u2715" in ChannelTree._user_status(u)
def test_deaf_takes_priority(self):
u = User(session_id=1, name="X", channel_id=0, self_deaf=True, self_mute=True)
assert "\u2298" in ChannelTree._user_status(u)
class TestNextVolume:
def test_full_cycle(self):
vol = 0.0
seen = [vol]
for _ in range(len(VOLUME_STEPS)):
vol = _next_volume(vol)
seen.append(vol)
# Must cycle through all steps and wrap
assert seen[-1] == VOLUME_STEPS[0]
def test_wraparound_from_max(self):
assert _next_volume(VOLUME_STEPS[-1]) == VOLUME_STEPS[0]
def test_mid_value(self):
assert _next_volume(0.5) == 0.75
# -- B. ChannelTree state tests (sync, direct instantiation) ----------------
class TestChannelTreeState:
def test_build_order_matches_dfs(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), {})
# Root, then sorted children: Alpha, Beta
assert tree._channel_ids == [0, 1, 2]
def test_empty_channels(self):
tree = ChannelTree()
tree.set_state({}, {})
assert tree._channel_ids == []
def test_clear_state(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), _sample_users(), my_channel_id=1)
tree.clear_state()
assert tree._channels == {}
assert tree._users_by_channel == {}
assert tree._channel_ids == []
assert tree._focused_idx == 0
assert tree._my_channel_id is None
def test_focus_clamped_when_shrinks(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), {})
tree._focused_idx = 2 # last channel
# Shrink to only root
tree.set_state({0: Channel(channel_id=0, name="Root", parent_id=-1)}, {})
assert tree._focused_idx == 0
def test_focus_preserved_when_in_range(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), {})
tree._focused_idx = 1
# Re-set same state
tree.set_state(_sample_channels(), {})
assert tree._focused_idx == 1
def test_nested_channels_order(self):
channels = {
0: Channel(channel_id=0, name="Root", parent_id=-1),
1: Channel(channel_id=1, name="Zulu", parent_id=0),
2: Channel(channel_id=2, name="Alpha", parent_id=0),
3: Channel(channel_id=3, name="Sub", parent_id=2),
}
tree = ChannelTree()
tree.set_state(channels, {})
# Root -> Alpha (sorted) -> Sub -> Zulu
assert tree._channel_ids == [0, 2, 3, 1]
# -- C. StatusBar integration tests (async) ---------------------------------
from textual.app import App, ComposeResult # noqa: E402
class StatusBarApp(App):
CSS = """
#status { dock: bottom; height: 1; }
"""
def compose(self) -> ComposeResult:
yield StatusBar(id="status")
@pytest.mark.asyncio
async def test_statusbar_default_disconnected():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
assert bar.connected is False
rendered = bar.render()
assert "\u25cb" in rendered # disconnected circle
@pytest.mark.asyncio
async def test_statusbar_connected():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.connected = True
rendered = bar.render()
assert "\u25cf" in rendered # filled circle
assert "connected" in rendered
@pytest.mark.asyncio
async def test_statusbar_compact_width():
app = StatusBarApp()
async with app.run_test(size=(30, 5)) as pilot:
bar = app.query_one("#status", StatusBar)
await pilot.resize_terminal(30, 5)
await pilot.pause()
rendered = bar.render()
# Compact mode: symbols only, no labels
assert "connected" not in rendered
@pytest.mark.asyncio
async def test_statusbar_medium_width():
app = StatusBarApp()
async with app.run_test(size=(50, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
rendered = bar.render()
assert "disconnected" in rendered
@pytest.mark.asyncio
async def test_statusbar_full_width():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.connected = True
bar.output_vol = 100
bar.input_vol = 50
rendered = bar.render()
assert "out" in rendered
assert "in" in rendered
@pytest.mark.asyncio
async def test_statusbar_ptt_active():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.ptt_active = True
rendered = bar.render()
assert "TX" in rendered
@pytest.mark.asyncio
async def test_statusbar_deaf():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.self_deaf = True
rendered = bar.render()
assert "\u2298" in rendered
@pytest.mark.asyncio
async def test_statusbar_muted():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.self_mute = True
rendered = bar.render()
assert "\u2715" in rendered
@pytest.mark.asyncio
async def test_statusbar_muted_compact():
app = StatusBarApp()
async with app.run_test(size=(30, 5)) as pilot:
bar = app.query_one("#status", StatusBar)
bar.self_mute = True
await pilot.resize_terminal(30, 5)
await pilot.pause()
rendered = bar.render()
assert "\u2715" in rendered
@pytest.mark.asyncio
async def test_statusbar_muted_medium():
app = StatusBarApp()
async with app.run_test(size=(50, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.self_mute = True
rendered = bar.render()
assert "\u2715" in rendered
assert "mute" in rendered
@pytest.mark.asyncio
async def test_statusbar_reconnecting():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.reconnecting = True
rendered = bar.render()
assert "reconnecting" in rendered
# -- D. ChannelTree integration tests (async) --------------------------------
class ChannelTreeApp(App):
CSS = """
#sidebar { width: 24; height: 1fr; }
"""
BINDINGS = [("q", "quit", "Quit")]
def __init__(self):
super().__init__()
self.selected_messages: list[int] = []
def compose(self) -> ComposeResult:
yield ChannelTree(id="sidebar")
def on_channel_selected(self, msg: ChannelSelected) -> None:
self.selected_messages.append(msg.channel_id)
@pytest.mark.asyncio
async def test_tree_empty_render():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as _pilot:
tree = app.query_one("#sidebar", ChannelTree)
rendered = tree.render()
assert "(not connected)" in rendered
@pytest.mark.asyncio
async def test_tree_populated():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as _pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), _sample_users())
rendered = tree.render()
assert "Root" in rendered
assert "Alpha" in rendered
assert "Beta" in rendered
assert "Alice" in rendered
@pytest.mark.asyncio
async def test_tree_down_arrow():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree.focus()
await pilot.pause()
assert tree._focused_idx == 0
await pilot.press("down")
assert tree._focused_idx == 1
@pytest.mark.asyncio
async def test_tree_up_arrow():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree._focused_idx = 2
tree.focus()
await pilot.pause()
await pilot.press("up")
assert tree._focused_idx == 1
@pytest.mark.asyncio
async def test_tree_bounds_top():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree.focus()
await pilot.pause()
await pilot.press("up")
assert tree._focused_idx == 0 # stays at 0
@pytest.mark.asyncio
async def test_tree_bounds_bottom():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree._focused_idx = 2
tree.focus()
await pilot.pause()
await pilot.press("down")
assert tree._focused_idx == 2 # stays at max
@pytest.mark.asyncio
async def test_tree_enter_posts_message():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree._focused_idx = 1 # Alpha (channel_id=1)
tree.focus()
await pilot.pause()
await pilot.press("enter")
await pilot.pause()
assert 1 in app.selected_messages