Compare commits

...

53 Commits

Author SHA1 Message Date
Username
67467c846c fix: debounce on_resize to prevent audio stutter 2026-02-28 14:18:18 +01:00
Username
2e70e73086 docs: add slash commands section 2026-02-28 14:12:12 +01:00
Username
4bf8adc5e2 app: add slash commands 2026-02-28 14:11:51 +01:00
Username
15fbf0040a client: add set_self_mute and register_self 2026-02-28 14:04:17 +01:00
Username
2533e43391 docs: add pitch shifting documentation
F6/F7 keybindings, audio.pitch config option, voice pitch section
in USAGE.md.  Updated profiling notes to reflect per-thread coverage.
2026-02-28 13:55:46 +01:00
Username
f94f94907d test: add pitch shifting and modulator tests
Covers PitchShifter passthrough, frequency shift direction, output
length preservation, semitone clamping.  AudioPipeline tests verify
pitch property defaults, get/set, clamping, and dequeue integration.
2026-02-28 13:55:40 +01:00
Username
26695e6e70 feat: add voice pitch shifting
F6/F7 adjust outgoing voice pitch in 1-semitone steps (-12 to +12).
PitchShifter integrates into the capture path at dequeue time so the
PortAudio callback is never blocked.  Status bar shows current pitch
when non-zero.  Config reload treats pitch as a safe change.
2026-02-28 13:55:34 +01:00
Username
e8f34b4d80 profiler: add per-thread cProfile support
threading.setprofile installs a bootstrap that creates a per-thread
cProfile.Profile.  Stats from all threads are merged on periodic
dumps and at exit, capturing worker-thread hotspots (audio send
loop, pitch shifting).
2026-02-28 13:55:18 +01:00
Username
b62993459a modulator: remove state reset on pitch change
The semitones setter called _reset() which zeroed _prev_in and
discarded _phase on every pitch adjustment.  Both buffers are
independent of the pitch ratio — the vocoder recomputes time-stretch
positions each frame — so the reset produced hard discontinuities
(clicks) and a thread-safety race with no benefit.
2026-02-28 13:53:29 +01:00
Username
76c9c494a7 update roadmap and tasklist: phase 4 complete 2026-02-25 09:05:06 +01:00
Username
d9373f8a3b test: add widget unit and integration tests 2026-02-25 09:03:36 +01:00
Username
3dbd126239 app: wire audio device hot-swap on change 2026-02-25 00:26:27 +01:00
Username
9e6c11e588 audio: add DeviceMonitor for device list polling 2026-02-24 23:50:23 +01:00
Username
df6f2ff354 fix input alignment and double border in chat area 2026-02-24 17:03:05 +01:00
Username
85f373a8b5 fix lint and formatting violations in tests and source 2026-02-24 16:50:51 +01:00
Username
0f476a25d5 docs: add code review response with applied changes summary 2026-02-24 16:47:50 +01:00
Username
d4a8f34dac update tasklist: phase E complete 2026-02-24 16:46:03 +01:00
Username
aa17159f7e client: add return annotations and cache users/channels properties 2026-02-24 16:45:45 +01:00
Username
bbd28e2840 audio: add -> None return annotations 2026-02-24 16:45:41 +01:00
Username
0ae0e77814 app: deduplicate config detection, suppress hold-mode PTT spam 2026-02-24 16:45:30 +01:00
Username
c0be5f164e update tasklist: phase D complete 2026-02-24 16:38:31 +01:00
Username
d117576449 test: refactor audio tests to use public interfaces
Replace direct queue/attribute access with capture_callback and
get_capture_frame. Add stop-drains-queues test.
2026-02-24 16:38:03 +01:00
Username
65de74193a test: add _strip_html edge cases and config validation tests
strip_html: nested tags, malformed HTML, comments, entities.
config: unknown key filtering, missing file, mixed valid/invalid.
2026-02-24 16:38:02 +01:00
Username
7c57e03e6d test: add InputHistory unit tests
Covers empty history, push/up/down navigation, draft preservation,
boundary clamping, and full navigation cycle.
2026-02-24 16:37:59 +01:00
Username
4c1a545a8b test: add ReconnectManager unit tests
Covers success, retry, non-retryable abort, max-retry exhaustion,
cancel from another thread, and backoff formula.
2026-02-24 16:37:57 +01:00
Username
e2039558d7 update tasklist: phase C complete 2026-02-24 16:33:14 +01:00
Username
0cf3702c8f app: extract ReconnectManager to reconnect.py
Self-contained reconnection state machine with threading.Event
for instant, thread-safe cancellation. Removes ~50 lines and
all reconnect state from TuimbleApp.
2026-02-24 16:32:29 +01:00
Username
216a4be4fd app: extract InputHistory, _make_client factory, cache _find_root
InputHistory encapsulates history navigation (up/down/push).
_make_client() deduplicates 4 MumbleClient instantiation sites.
_find_root() result cached in set_state() to avoid double lookup.
2026-02-24 16:32:16 +01:00
Username
a6380b53f7 update tasklist: phase A and B complete 2026-02-24 16:26:02 +01:00
Username
bfa79eadcb app: replace audio send polling with blocking queue.get
Deterministic wake on data arrival instead of 5ms sleep loop.
Reduces CPU wake-ups and eliminates up to 5ms of added latency.
2026-02-24 16:25:42 +01:00
Username
7a2c8e3a5d audio: replace struct pack/unpack with array module in _apply_gain
Eliminates format string construction and intermediate tuple/list
allocations. Also drains stale frames from queues on stop().
2026-02-24 16:25:32 +01:00
Username
88e8d4d923 update tasklist with phased improvement plan 2026-02-24 16:23:11 +01:00
Username
897c5b1f6c client: guard join_channel and send_text against stale ids
join_channel raises ValueError on missing channel instead of
KeyError. send_text handles missing channel_id gracefully.
2026-02-24 16:23:05 +01:00
Username
44da57d084 config: filter unknown toml keys before dataclass init
Prevents opaque TypeError on typos in config.toml; unknown
keys are logged as warnings and silently dropped.
2026-02-24 16:23:04 +01:00
Username
8be475f23f app: replace regex html stripping with stdlib parser
Handles malformed tags, nested markup, and CDATA that the
naive regex missed.
2026-02-24 16:23:03 +01:00
Username
be6574ae79 app: debounce channel tree and cache render width 2026-02-24 15:32:57 +01:00
Username
57f4559a38 update roadmap and tasklist 2026-02-24 14:55:03 +01:00
Username
351b980b42 docs: add reconnection and error recovery docs 2026-02-24 14:54:40 +01:00
Username
6f590ede38 app: add reconnection status to status bar 2026-02-24 14:54:07 +01:00
Username
e443facd3b app: add auto-reconnect with backoff 2026-02-24 14:43:29 +01:00
Username
d02bb5239a app: stop audio on disconnect 2026-02-24 14:40:24 +01:00
Username
a041069cc9 client: add ConnectionFailed and reconnect method 2026-02-24 14:34:40 +01:00
Username
0b186f1f0c update roadmap and tasklist 2026-02-24 14:19:48 +01:00
Username
0b178d371e docs: add volume, certificate, and reload docs 2026-02-24 14:19:15 +01:00
Username
0bc41d1a46 app: add config reload on F5 2026-02-24 14:18:19 +01:00
Username
e9726da401 app: add volume key bindings and status display 2026-02-24 14:17:02 +01:00
Username
5e44ee9e38 app: wire volume and certificate config 2026-02-24 14:15:42 +01:00
Username
6467f5fe32 client: add certificate parameters 2026-02-24 14:15:24 +01:00
Username
e9944c88eb audio: add gain control to capture and playback 2026-02-24 14:14:52 +01:00
Username
eb98165370 config: add gain and certificate fields 2026-02-24 14:13:18 +01:00
Username
0856ab3c55 docs: update tasklist with completed items 2026-02-24 13:58:02 +01:00
Username
b852459eff feat: add channel navigation and user status indicators
Tab cycles focus to sidebar where Up/Down navigates channels
and Enter joins the selected one. Muted/deafened users show
status symbols in the channel tree.
2026-02-24 13:57:57 +01:00
Username
ec81fac507 docs: add key bindings and fix ptt key in readme 2026-02-24 13:31:56 +01:00
24 changed files with 3522 additions and 182 deletions

View File

@@ -8,6 +8,31 @@ TUI Mumble client with voice support and push-to-talk.
- Voice transmission with Opus codec
- Push-to-talk via Kitty keyboard protocol, evdev, or toggle
- Channel browsing and text chat
- Chat input history (Up/Down arrow navigation)
- Self-deafen toggle
- Volume control (input/output gain)
- Voice pitch shifting (deeper/higher)
- Client certificate authentication
- Auto-reconnect on network loss
- Config hot-reload (F5)
## Key Bindings
| Key | Action |
|-----|--------|
| `Tab` | Cycle focus (chat input / channel tree) |
| `F1` | Toggle self-deafen |
| `F2` | Cycle output volume |
| `F3` | Cycle input volume |
| `F4` | Push-to-talk (configurable) |
| `F5` | Reload config from disk |
| `F6` | Pitch down (1 semitone) |
| `F7` | Pitch up (1 semitone) |
| `Enter` | Send message / join channel (sidebar) |
| `Up` | Previous message (input) / previous channel (sidebar) |
| `Down` | Next message (input) / next channel (sidebar) |
| `q` | Quit |
| `Ctrl+C` | Quit |
## Quick Start
@@ -29,9 +54,16 @@ mkdir -p ~/.config/tuimble
host = "mumble.example.com"
port = 64738
username = "myname"
# certfile = "/path/to/cert.pem"
# keyfile = "/path/to/key.pem"
[audio]
# output_gain = 1.0
# input_gain = 1.0
# pitch = 0 # semitones, -12 to +12
[ptt]
key = "space"
key = "f4"
mode = "hold"
backend = "auto"
```

View File

@@ -18,15 +18,15 @@
## Phase 3 — Polish
- [x] Responsive terminal layout (adaptive sidebar, truncation, resize)
- [ ] Channel tree navigation
- [ ] User list with status indicators
- [ ] Volume control
- [ ] Server certificate handling
- [ ] Config file hot-reload
- [x] Channel tree navigation
- [x] User list with status indicators
- [x] Volume control
- [x] Server certificate handling
- [x] Config file hot-reload
## Phase 4 — Robustness
- [ ] Reconnection handling
- [ ] Error recovery
- [ ] Audio device hot-swap
- [ ] Comprehensive test suite
- [x] Reconnection handling
- [x] Error recovery
- [x] Audio device hot-swap
- [x] Comprehensive test suite

View File

@@ -1,14 +1,67 @@
# Task List
## In Progress
(none)
## Pending
(none)
## Completed
### Phase 4 -- Robustness
- [x] Audio device hot-swap (DeviceMonitor polling + pipeline rebuild)
- [x] Widget tests (41 tests: StatusBar breakpoints, ChannelTree navigation, volume/truncate helpers)
### Phase E -- Polish
- [x] Deduplicate `_detect_config_changes` with `dataclasses.asdict()`
- [x] Suppress PTT chatlog spam for hold-mode
- [x] Add `-> None` return annotations throughout
- [x] Cache `users`/`channels` in `MumbleClient` with dirty flag
### Phase D -- Test Coverage
- [x] Add `ReconnectManager` unit tests (8 tests)
- [x] Add `InputHistory` unit tests (11 tests)
- [x] Add `_strip_html` edge case tests (13 tests)
- [x] Add config `_load_section` / `load_config` tests (5 tests)
- [x] Refactor `test_audio.py` to use public interfaces
- [x] Add `stop()` queue drain test
### Phase C -- Architecture
- [x] Cache `_find_root()` result in `ChannelTree.set_state()`
- [x] Extract `InputHistory` class
- [x] Extract `_make_client()` factory (4 duplicate sites)
- [x] Extract `ReconnectManager` to `reconnect.py` (`threading.Event`)
### Phase A -- Safety
- [x] Replace `_strip_html` regex with `html.parser` stripper
- [x] Add defensive config loading (filter unknown TOML keys)
- [x] Guard `join_channel`/`send_text` against `KeyError`
### Phase B -- Performance
- [x] Replace `_apply_gain` struct with `array` module (numpy unavailable)
- [x] Replace `time.sleep(0.005)` polling with `queue.get(timeout=)`
- [x] Drain audio queues on `stop()`
### Earlier Work
- [x] Wire TUI to MumbleClient (connect on startup, display state)
- [x] Implement text message send/receive in chat log
- [x] Channel tree population from server data
- [x] Audio pipeline integration with MumbleClient
- [x] PTT wiring (key events -> audio.capturing toggle)
## Pending
- [ ] Channel join/navigation (select channel in tree, move into it)
- [ ] Reconnection handling on disconnect
- [x] Chat input history navigation (up/down arrows)
- [x] Channel navigation and join from sidebar
- [x] User status indicators (mute/deaf)
- [x] Volume control (F2/F3, gain 0.0-2.0, status bar display)
- [x] Server certificate handling (certfile/keyfile config)
- [x] Config file hot-reload (F5, safe/restart change detection)
- [x] Reconnection handling with auto-retry and backoff

View File

@@ -6,7 +6,13 @@ make run launch tuimble
make test run tests
make lint check code style
F1 toggle self-deafen
F2 cycle output volume
F3 cycle input volume
F4 push-to-talk
F5 reload config
Tab cycle focus
Enter send message / join channel
Up/Down history / channel navigation
q quit
space push-to-talk (hold)
Enter send message
```

View File

@@ -38,8 +38,46 @@ groups # should include 'input'
Add user to `input` group: `sudo usermod -aG input $USER`
### Certificate errors
If the server requires client certificates:
```sh
# Verify cert/key pair
openssl x509 -in cert.pem -noout -subject -dates
openssl rsa -in key.pem -check -noout
```
- Both `certfile` and `keyfile` must be PEM-encoded
- Paths must be absolute or relative to the working directory
- Key must not be passphrase-protected (pymumble limitation)
- If the server uses a self-signed CA, pymumble verifies by default
### Connection refused
- Verify server address and port
- Check firewall allows outbound TCP/UDP to port 64738
- Test with: `nc -zv <host> 64738`
### Connection drops / reconnection
When tuimble loses connection, it retries automatically with backoff.
The chatlog shows each attempt:
```
✗ disconnected from server
reconnecting in 2s (attempt 1/10)...
✗ attempt 1: network error: [Errno 111] Connection refused
reconnecting in 4s (attempt 2/10)...
✓ connected as myname
```
Common causes:
- **Server restart** — normal, reconnect succeeds when server is back
- **Network loss** — check connectivity, tuimble retries automatically
- **Auth timeout** — server may drop idle connections; reconnect handles this
- **"rejected" with no retry** — wrong password or certificate issue
If reconnection fails after 10 attempts, press `F5` to retry manually.
Update `config.toml` if server details changed.

View File

@@ -11,14 +11,30 @@ tuimble --host mumble.example.com --user myname
| Key | Action |
|-----|--------|
| `Tab` | Cycle focus (chat input / channel tree) |
| `F1` | Toggle self-deafen |
| `F2` | Cycle output volume |
| `F3` | Cycle input volume |
| `F4` | Push-to-talk (configurable) |
| `Enter` | Send message |
| `Up` | Previous sent message (in chat input) |
| `Down` | Next sent message (in chat input) |
| `F5` | Reload config from disk |
| `F6` | Pitch down (1 semitone) |
| `F7` | Pitch up (1 semitone) |
| `Enter` | Send message / join channel (sidebar) |
| `Up` | Previous message (input) / previous channel (sidebar) |
| `Down` | Next message (input) / next channel (sidebar) |
| `q` | Quit |
| `Ctrl+C` | Quit |
## Slash Commands
| Command | Action |
|---------|--------|
| `/help` | List available commands |
| `/deafen` | Toggle self-deafen |
| `/mute` | Toggle self-mute (suppresses PTT) |
| `/unmute` | Unmute yourself |
| `/register` | Register your identity on the server |
## Push-to-Talk Modes
- **toggle** — press to start, press again to stop (default)
@@ -41,8 +57,73 @@ python3 -m pstats /tmp/tuimble.prof # interactive explorer
# pip install snakeviz && snakeviz /tmp/tuimble.prof
```
Note: cProfile captures the main thread only. Background workers
started with `@work(thread=True)` are not included.
Profiling covers both the main thread and all worker threads
(including the audio send loop where pitch shifting runs).
## Volume Control
`F2` and `F3` cycle through volume steps: 0%, 25%, 50%, 75%, 100%,
125%, 150%, 200%, then back to 0%. Volume is shown in the status bar
as block indicators.
```toml
[audio]
output_gain = 0.75 # 0.0 to 2.0 (default 1.0)
input_gain = 1.5 # 0.0 to 2.0 (default 1.0)
```
## Voice Pitch
`F6` and `F7` adjust outgoing voice pitch in 1-semitone steps,
ranging from -12 to +12. Negative values make the voice deeper,
positive values make it higher. The current setting is shown in the
status bar when non-zero.
```toml
[audio]
pitch = 3 # semitones, -12 to +12 (default 0)
```
When pitch is 0, the processing step is skipped entirely (no CPU
overhead).
## Client Certificates
For servers requiring client certificate authentication:
```toml
[server]
certfile = "/path/to/client-cert.pem"
keyfile = "/path/to/client-key.pem"
```
Both must be PEM-encoded. Leave empty to connect without a certificate.
## Config Reload
`F5` reloads `~/.config/tuimble/config.toml` from disk.
**Safe changes** (applied immediately): PTT key/mode/backend, audio
gain values, pitch.
**Restart-requiring changes** (server settings, audio device/rate):
shown as warnings. Press `F5` again to confirm reconnect, or any
other key to cancel.
## Reconnection
On unexpected disconnect (server restart, network loss), tuimble
automatically retries with exponential backoff:
- Initial delay: 2 seconds, doubling each attempt (2, 4, 8, 16, 30...)
- Maximum delay: 30 seconds
- Maximum attempts: 10
The status bar shows `◐ reconnecting` during retry. If the server
rejects authentication, retries stop immediately.
After all attempts are exhausted, press `F5` to retry manually.
`F5` also cancels an in-progress reconnect and starts a fresh attempt.
## Configuration

View File

@@ -11,6 +11,7 @@ dependencies = [
"textual>=1.0.0",
"pymumble>=1.6",
"sounddevice>=0.5.0",
"numpy>=1.24.0",
"tomli>=2.0.0;python_version<'3.11'",
]
@@ -38,3 +39,4 @@ select = ["E", "F", "W", "I"]
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "strict"

View File

@@ -0,0 +1,142 @@
# Code Review Response
**Original review:** `code-review-2026-02-24.md`
**Date applied:** 2026-02-24
**Commits:** 14 functional + 4 tasklist updates (be6574a..d4a8f34)
**Test suite:** 83 tests, all passing
---
## Summary
All 17 items in the priority matrix were evaluated. 15 were applied across five
phases (A--E). Two were deferred to the backlog with rationale.
### Disposition by severity
| Severity | Total | Applied | Deferred | Notes |
|----------|-------|---------|----------|--------------------------------|
| Critical | 4 | 3 | 1 | Widget tests deferred (backlog)|
| Important| 9 | 9 | 0 | |
| Nice | 4 | 3 | 1 | Frozen config deferred |
---
## Phase A -- Safety
| # | Item | Resolution |
|---|------|------------|
| 2 | Replace regex HTML stripping | Replaced with `html.parser.HTMLParser` subclass. Handles malformed tags, comments, nested markup, entities. 13 edge-case tests added. |
| 3 | Validate TOML config keys | Added `_load_section()` that filters unknown keys via `dataclasses.fields()` before construction. Logs warnings for typos. 5 tests added. |
| 10 | Guard `join_channel`/`send_text` | `join_channel` uses `.get()` + `ValueError`. `send_text` wraps lookup in `try/except (KeyError, AttributeError)`. |
**Commits:**
```
8be475f app: replace regex html stripping with stdlib parser
44da57d config: filter unknown toml keys before dataclass init
897c5b1 client: guard join_channel and send_text against stale ids
```
---
## Phase B -- Performance
| # | Item | Resolution |
|---|------|------------|
| 5 | Use numpy for `_apply_gain` | **Deviation:** numpy is NOT a transitive dep of sounddevice (only cffi). Used stdlib `array.array("h")` instead -- eliminates struct pack/unpack overhead without adding a dependency. |
| 6 | Replace polling sleep with `queue.get(timeout=)` | `get_capture_frame()` now accepts `timeout` param. Send loop uses `timeout=0.02`. No polling, instant wake on data. |
| 15 | Drain audio queues on `stop()` | Both capture and playback queues drained after stream close. Prevents stale frames leaking on restart. Test added. |
**Commits:**
```
7a2c8e3 audio: replace struct pack/unpack with array module in _apply_gain
bfa79ea app: replace audio send polling with blocking queue.get
```
---
## Phase C -- Architecture
| # | Item | Resolution |
|---|------|------------|
| 1 | Extract reconnect manager | Extracted to `src/tuimble/reconnect.py` (93 lines). Uses `threading.Event` for cancellation (item #7 resolved simultaneously). Instant cancel, no polling. |
| 7 | `threading.Event` for reconnect | Resolved as part of item #1 above. `Event.wait(timeout=delay)` replaces the 0.5s sleep loop. |
| 8 | Extract `_make_client()` factory | Single factory method replaces 4 duplicate 7-line instantiation blocks. |
| 9 | Compile `_strip_html` regex | Superseded by item #2 (HTMLParser replacement). No regex remains. |
| 13 | Extract `InputHistory` class | Extracted to standalone class in `app.py` (29 lines). Clean `push`/`up`/`down` interface. 11 tests added. |
| -- | Cache `_find_root()` | `ChannelTree.set_state()` now caches `_root_id`. Eliminates redundant traversal during render. |
**Commits:**
```
216a4be app: extract InputHistory, _make_client factory, cache _find_root
0cf3702 app: extract ReconnectManager to reconnect.py
```
---
## Phase D -- Test Coverage
| # | Item | Resolution |
|---|------|------------|
| 4 | Widget/app tests | **Partially addressed.** Extracted components (ReconnectManager, InputHistory, _strip_html, config loading) are now fully tested without the TUI harness. Widget-level tests (StatusBar breakpoints, ChannelTree navigation) deferred to backlog -- requires Textual `App.run_test()`. |
| -- | Refactor test_audio.py | Replaced private attribute access with public interface calls (`_capture_callback` + `get_capture_frame` instead of direct queue manipulation). |
**Test additions:**
- `test_reconnect.py` -- 8 tests (backoff formula, cancellation, exhaustion, non-retryable abort)
- `test_history.py` -- 11 tests (navigation, draft preservation, push reset, full cycle)
- `test_strip_html.py` -- 13 tests (nested tags, entities, comments, malformed HTML)
- `test_config.py` -- 5 new tests (_load_section filtering, missing file, unknown keys)
- `test_audio.py` -- 1 new test (stop drains queues), existing tests refactored
**Commits:**
```
4c1a545 test: add ReconnectManager unit tests
7c57e03 test: add InputHistory unit tests
65de741 test: add _strip_html edge cases and config validation tests
d117576 test: refactor audio tests to use public interfaces
```
---
## Phase E -- Polish
| # | Item | Resolution |
|---|------|------------|
| 12 | Deduplicate config change detection | `_detect_config_changes` and `_apply_restart_changes` now use `dataclasses.asdict()` for comparison. New config fields are automatically picked up. |
| 11 | Suppress PTT chatlog spam | `_on_ptt_change` skips chatlog writes when `ptt.mode == "hold"`. Status bar indicator still updates in real time. |
| 16 | Add `-> None` return annotations | Added to 21 methods across `client.py` and `audio.py`. |
| -- | Cache `users`/`channels` properties | Properties now cache with dirty flags. Invalidated on connect, disconnect, and user/channel event callbacks. |
**Commits:**
```
0ae0e77 app: deduplicate config detection, suppress hold-mode PTT spam
bbd28e2 audio: add -> None return annotations
aa17159 client: add return annotations and cache users/channels properties
```
---
## Deferred Items
| # | Item | Reason |
|---|------|--------|
| 4 | Widget tests (StatusBar, ChannelTree) | Requires Textual async test harness (`App.run_test()`). Extracted logic is tested; widget integration tests are in the backlog. |
| 14 | Inline callback wrappers with lambdas | Low priority. Current named methods aid debugging (stack traces show `_cb_connected` vs `<lambda>`). |
| 17 | Frozen dataclasses for Config | Requires refactoring `_apply_safe_changes` which mutates `self._config.ptt`. Scope exceeds incremental polish. |
---
## Metrics
| | Before | After | Delta |
|----------------|--------|--------|--------|
| Source LOC | ~1,750 | 1,866 | +116 |
| Test LOC | ~480 | 818 | +338 |
| Test count | 45 | 83 | +38 |
| Modules | 7 | 8 | +1 |
| `app.py` LOC | ~999 | 965 | -34 |
New module: `src/tuimble/reconnect.py` (93 lines).
Net source growth is primarily from the extracted reconnect module and cached
property logic in `client.py`. `app.py` shrank despite gaining the
`InputHistory` class and `dataclasses` import.

View File

@@ -0,0 +1,738 @@
# tuimble Code Review
**Date:** 2026-02-24
**Scope:** Full codebase review (1,750 LOC across 7 modules)
**Constraints:** No theme/layout redesign. Targeted, incremental improvements only.
---
## 1. Architecture & Design
### [CRITICAL] `app.py` is a 999-line monolith
`TuimbleApp` directly owns connection lifecycle, audio management, reconnection
logic, config reload state machine, input history, PTT wiring, and all UI
updates. This makes it hard to test, hard to reason about, and hard to extend.
**Specific extractions worth making:**
#### a) Reconnection manager
The reconnect loop, backoff logic, attempt counting, and cancellation form a
self-contained state machine currently scattered across `_reconnect_loop`,
`_log_reconnect`, `_on_reconnect_success`, `_on_reconnect_exhausted`,
`_cancel_reconnect`, plus three boolean flags (`_reconnecting`,
`_reconnect_attempt`, `_intentional_disconnect`).
```python
# src/tuimble/reconnect.py
from __future__ import annotations
import time
import logging
from dataclasses import dataclass
from typing import Callable
log = logging.getLogger(__name__)
INITIAL_DELAY = 2
MAX_DELAY = 30
MAX_RETRIES = 10
@dataclass
class ReconnectState:
active: bool = False
attempt: int = 0
intentional: bool = False
class ReconnectManager:
"""Exponential backoff reconnection with cancellation."""
def __init__(
self,
connect_fn: Callable[[], None],
on_attempt: Callable[[int, float], None],
on_success: Callable[[], None],
on_failure: Callable[[int, str], None],
on_exhausted: Callable[[], None],
):
self._connect = connect_fn
self._on_attempt = on_attempt
self._on_success = on_success
self._on_failure = on_failure
self._on_exhausted = on_exhausted
self.state = ReconnectState()
def cancel(self) -> None:
self.state.active = False
self.state.attempt = 0
@property
def delay(self) -> float:
return min(
INITIAL_DELAY * (2 ** (self.state.attempt - 1)),
MAX_DELAY,
)
def run(self) -> None:
"""Blocking reconnect loop -- run in a worker thread."""
self.state.active = True
self.state.attempt = 0
while self.state.active:
self.state.attempt += 1
d = self.delay
self._on_attempt(self.state.attempt, d)
elapsed = 0.0
while elapsed < d and self.state.active:
time.sleep(0.5)
elapsed += 0.5
if not self.state.active:
break
try:
self._connect()
self.state.active = False
self.state.attempt = 0
self._on_success()
return
except Exception as exc:
self._on_failure(self.state.attempt, str(exc))
if self.state.attempt >= MAX_RETRIES:
self.state.active = False
self._on_exhausted()
return
```
Removes ~80 lines from `app.py` and makes the backoff logic unit-testable
without a TUI.
#### b) Input history
The history navigation in `on_key` (lines 928-953) is a classic "small feature
that grew legs." Extract to a reusable class:
```python
class InputHistory:
def __init__(self):
self._entries: list[str] = []
self._idx: int = -1
self._draft: str = ""
def push(self, text: str) -> None:
self._entries.append(text)
self._idx = -1
def up(self, current: str) -> str | None:
if not self._entries:
return None
if self._idx == -1:
self._draft = current
self._idx = len(self._entries) - 1
elif self._idx > 0:
self._idx -= 1
return self._entries[self._idx]
def down(self) -> str | None:
if self._idx == -1:
return None
if self._idx < len(self._entries) - 1:
self._idx += 1
return self._entries[self._idx]
self._idx = -1
return self._draft
```
---
### [IMPORTANT] Thread safety gaps in `_reconnect_loop`
`self._reconnecting` and `self._reconnect_attempt` are plain booleans/ints
accessed from both the main thread (via `_cancel_reconnect`,
`action_reload_config`) and the worker thread (`_reconnect_loop`). Python's GIL
makes this "mostly safe" for simple booleans, but it's fragile and not
guaranteed for compound operations.
`app.py:486-488`:
```python
while self._reconnecting: # read on worker thread
self._reconnect_attempt += 1 # read-modify-write on worker thread
```
Meanwhile `_cancel_reconnect` at line 572 writes both from the main thread.
Use `threading.Event` for the cancellation flag:
```python
self._reconnect_cancel = threading.Event()
# in _reconnect_loop:
while not self._reconnect_cancel.is_set():
...
# interruptible sleep:
if self._reconnect_cancel.wait(timeout=delay):
break
# in _cancel_reconnect:
self._reconnect_cancel.set()
```
This also eliminates the 0.5s sleep polling loop (lines 496-499), making
cancellation instant instead of up to 500ms delayed.
---
### [IMPORTANT] `MumbleClient` creates `User`/`Channel` objects on every property access
`client.py:112-141` -- The `users` and `channels` properties rebuild full
dictionaries from pymumble's internal state on every call.
`_refresh_channel_tree` in `app.py:688-699` calls both, then iterates all
users. During a debounced state update, this means three full traversals.
Consider caching with a dirty flag set by callbacks:
```python
@property
def users(self) -> dict[int, User]:
if self._users_dirty or self._users_cache is None:
self._users_cache = self._build_users()
self._users_dirty = False
return self._users_cache
```
---
### [NICE] Callback wiring pattern
`client.py:79-84` uses bare `None`-able attributes for callbacks:
```python
self.on_connected = None
self.on_disconnected = None
```
This works but lacks type safety and discoverability. Consider a typed protocol
or simple typed attributes:
```python
from typing import Protocol
class ClientCallbacks(Protocol):
def on_connected(self) -> None: ...
def on_disconnected(self) -> None: ...
def on_text_message(self, sender: str, text: str) -> None: ...
```
Not urgent -- current approach is pragmatic for the project size.
---
## 2. TUI/UX Best Practices
### [IMPORTANT] `_strip_html` recompiles regex on every message
`app.py:994-999`:
```python
def _strip_html(text: str) -> str:
import re
clean = re.sub(r"<[^>]+>", "", text)
return html.unescape(clean)
```
The `re` import inside the function is fine (cached by Python), but the pattern
is recompiled on every call. Compile once at module level:
```python
import re
_HTML_TAG_RE = re.compile(r"<[^>]+>")
def _strip_html(text: str) -> str:
return html.unescape(_HTML_TAG_RE.sub("", text))
```
---
### [IMPORTANT] PTT state logging is noisy
`app.py:969-973` -- Every PTT toggle writes to the chatlog. For toggle mode
this is acceptable, but if someone later implements hold-mode (evdev),
pressing/releasing space would flood the log with "transmitting"/"stopped
transmitting" on every keypress.
Add a config option or suppress log for hold-mode, or use the status bar alone:
```python
def _on_ptt_change(self, transmitting: bool) -> None:
self._audio.capturing = transmitting
status = self.query_one("#status", StatusBar)
status.ptt_active = transmitting
# Only log for toggle mode; hold-mode updates too frequently
if self._config.ptt.mode == "toggle":
chatlog = self.query_one("#chatlog", ChatLog)
if transmitting:
chatlog.write("[#e0af68] transmitting[/]")
else:
chatlog.write("[dim] stopped transmitting[/dim]")
```
---
### [NICE] `on_resize` sets sidebar width but doesn't refresh the channel tree
`app.py:977-981` -- After resize, the sidebar width changes but
`ChannelTree.render()` uses `self.content_size.width` which may not update
until the next render pass. The tree truncation logic depends on width. Call
`tree.refresh()` explicitly:
```python
def on_resize(self, event: events.Resize) -> None:
sidebar = self.query_one("#sidebar", ChannelTree)
sidebar.styles.width = max(16, min(32, event.size.width // 4))
sidebar.refresh()
self.query_one("#status", StatusBar).refresh()
```
---
## 3. Code Quality
### [CRITICAL] `load_config` passes raw TOML dicts to dataclass constructors without validation
`config.py:62-67`:
```python
if "server" in data:
cfg.server = ServerConfig(**data["server"])
```
If the TOML file contains an unexpected key (`ServerConfig(host="x",
typo_field="y")`), this raises an opaque `TypeError`. If a value has the wrong
type (`port = "abc"`), it silently accepts it and fails later.
Add defensive loading:
```python
def _load_section(cls, data: dict, section: str):
"""Load a dataclass section, ignoring unknown keys."""
raw = data.get(section, {})
import dataclasses
valid_keys = {f.name for f in dataclasses.fields(cls)}
filtered = {k: v for k, v in raw.items() if k in valid_keys}
return cls(**filtered)
# Usage:
cfg.server = _load_section(ServerConfig, data, "server")
```
This prevents crashes from typos in config files -- critical for a user-facing
tool.
---
### [IMPORTANT] Duplicate logic for creating `MumbleClient`
`MumbleClient` is instantiated in four places with the same pattern:
1. `__init__` (line 395)
2. `_reconnect_loop` (line 507)
3. `action_reload_config` disconnected path (line 864)
4. `_apply_restart_changes` (line 824)
Extract a factory method:
```python
def _make_client(self, srv: ServerConfig | None = None) -> MumbleClient:
srv = srv or self._config.server
return MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
```
---
### [IMPORTANT] `ChannelTree._find_root` is called twice per render
`render()` at line 223 calls `_find_root()`, and `_build_channel_order()` at
line 173 also calls `_find_root()`. Both execute during `set_state()` then
`render()`. Cache the root ID:
```python
def set_state(self, channels, users_by_channel, my_channel_id=None):
self._channels = channels
self._users_by_channel = users_by_channel
self._my_channel_id = my_channel_id
self._root_id = self._find_root() if channels else 0
self._channel_ids = self._build_channel_order()
# ...
```
---
### [IMPORTANT] `_detect_config_changes` duplicates field-level comparison
`app.py:740-780` manually compares each field. If you add a new server field,
you must remember to add it to both `_detect_config_changes` AND
`_apply_restart_changes`. Use `dataclasses.asdict()` for comparison:
```python
from dataclasses import asdict
def _detect_config_changes(self, old: Config, new: Config):
safe, restart = [], []
for attr in ("key", "mode", "backend"):
if getattr(old.ptt, attr) != getattr(new.ptt, attr):
safe.append(
f"ptt.{attr}: {getattr(old.ptt, attr)} -> "
f"{getattr(new.ptt, attr)}"
)
for attr in ("input_gain", "output_gain"):
if getattr(old.audio, attr) != getattr(new.audio, attr):
safe.append(
f"audio.{attr}: {getattr(old.audio, attr)} -> "
f"{getattr(new.audio, attr)}"
)
if asdict(old.server) != asdict(new.server):
for k in asdict(old.server):
if getattr(old.server, k) != getattr(new.server, k):
label = "password" if k == "password" else k
restart.append(f"server.{label} changed")
for attr in ("input_device", "output_device", "sample_rate"):
if getattr(old.audio, attr) != getattr(new.audio, attr):
restart.append(f"audio.{attr} changed")
return safe, restart
```
---
### [NICE] Type annotations missing on several methods
- `client.py:86` `set_dispatcher(self, fn: Callable)` -- should be
`Callable | None`
- `client.py:93` `_dispatch(self, callback, *args)` -- no return type, no
`callback` type
- `client.py:154` `connect(self)` -- no return type annotation
- `audio.py:58` `start(self)` -- no return type
Add `-> None` where appropriate. Minor but compounds into unclear interfaces.
---
## 4. Performance
### [IMPORTANT] `_apply_gain` uses Python-level sample iteration
`audio.py:22-30` -- Unpacking, scaling, and repacking every sample via a list
comprehension in Python is slow for real-time audio (960 samples = 1920 bytes
per 20ms frame):
```python
samples = struct.unpack(fmt, pcm[: n * 2])
scaled = [max(-32768, min(32767, int(s * gain))) for s in samples]
return struct.pack(fmt, *scaled)
```
Use numpy (already a transitive dependency via sounddevice):
```python
import numpy as np
def _apply_gain(pcm: bytes, gain: float) -> bytes:
if not pcm:
return pcm
samples = np.frombuffer(pcm, dtype=np.int16)
scaled = np.clip(samples * gain, -32768, 32767).astype(np.int16)
return scaled.tobytes()
```
This is ~50-100x faster for a 960-sample frame. Since sounddevice already
pulls in numpy, there's no new dependency.
---
### [IMPORTANT] `_audio_send_loop` polls with `time.sleep(0.005)`
`app.py:676-684`:
```python
while self._client.connected:
frame = self._audio.get_capture_frame()
if frame is not None:
self._client.send_audio(frame)
else:
time.sleep(0.005)
```
5ms polling means up to 5ms of added latency on every frame, plus unnecessary
CPU wake-ups when idle. Use `queue.Queue.get(timeout=...)` instead:
```python
def get_capture_frame(self, timeout: float = 0.02) -> bytes | None:
try:
return self._capture_queue.get(timeout=timeout)
except queue.Empty:
return None
```
Then the send loop becomes:
```python
while self._client.connected:
frame = self._audio.get_capture_frame(timeout=0.02)
if frame is not None:
self._client.send_audio(frame)
```
No polling, no wasted cycles, deterministic wake on data arrival.
---
### [NICE] `ChannelTree.render()` rebuilds the entire tree string every refresh
For small channel lists (typical Mumble servers have 5-30 channels), this is
fine. But `_render_tree` is recursive and allocates many string fragments. If
performance becomes an issue, cache the rendered string and invalidate on
`set_state()`.
---
## 5. Scalability & Extensibility
### [IMPORTANT] No structured event bus
All communication between components flows through Textual's message system,
which is good. But the `_cb_*` callback wrappers in `app.py:462-475` are thin
boilerplate. Consider a mapping-driven approach:
```python
def _wire_client_callbacks(self) -> None:
self._client.set_dispatcher(self.call_from_thread)
self._client.on_connected = lambda: self.post_message(ServerConnected())
self._client.on_disconnected = lambda: self.post_message(ServerDisconnected())
self._client.on_text_message = (
lambda s, t: self.post_message(TextMessageReceived(s, t))
)
self._client.on_user_update = lambda: self.post_message(ServerStateChanged())
self._client.on_channel_update = (
lambda: self.post_message(ServerStateChanged())
)
self._client.on_sound_received = (
lambda _u, pcm: self._audio.queue_playback(pcm)
)
```
This eliminates six one-line methods. The audio callback bypasses the message
system correctly (it's hot-path, no UI update needed).
---
### [NICE] `Config` is mutable
Config dataclasses are mutable, and `_apply_safe_changes` directly mutates
`self._config.ptt`. Consider frozen dataclasses with a `replace()` pattern to
prevent accidental state corruption:
```python
@dataclass(frozen=True)
class PttConfig:
key: str = "f4"
mode: str = "toggle"
backend: str = "auto"
```
Then: `self._config = dataclasses.replace(self._config, ptt=new.ptt)`
---
## 6. Testing
### [CRITICAL] No tests for `TuimbleApp` or any widget
The test suite covers `audio.py`, `client.py`, `ptt.py`, and `config.py` --
all non-UI modules. There are zero tests for:
- `StatusBar` rendering (connection states, volume bars, responsive breakpoints)
- `ChannelTree` rendering (tree structure, truncation, focus navigation)
- `TuimbleApp` message handling (connect/disconnect/state change flows)
- Config reload state machine
- Input history navigation
Textual provides `App.run_test()` for async widget testing:
```python
import pytest
from textual.testing import AppTest
@pytest.mark.asyncio
async def test_status_bar_connected():
app = TuimbleApp()
async with app.run_test(size=(80, 24)) as pilot:
status = app.query_one("#status", StatusBar)
status.connected = True
await pilot.pause()
rendered = status.render()
assert "connected" in rendered
```
**Priority tests to add:**
1. `StatusBar.render()` at each width breakpoint (< 40, < 60, >= 60)
2. `ChannelTree` keyboard navigation (up/down/enter)
3. `_strip_html` with edge cases (nested tags, malformed HTML, entities)
4. `_next_volume` wraparound behavior
5. Config reload with unknown TOML keys (crash test)
6. `_detect_config_changes` safe vs restart classification
---
### [IMPORTANT] Tests access private attributes
`test_audio.py` directly accesses `ap._capture_queue`, `ap._playback_queue`,
`ap._sample_rate`. This couples tests to implementation. Where possible, test
through public interfaces:
```python
# Instead of: ap._capture_queue.put(b"\x01\x02\x03")
# Use the actual capture callback:
ap.capturing = True
ap._capture_callback(b"\x01\x02\x03", 1, None, None)
assert ap.get_capture_frame() == b"\x01\x02\x03"
```
---
## 7. Security & Robustness
### [CRITICAL] `_strip_html` regex is insufficient for Mumble HTML
`app.py:996-999` uses `re.sub(r"<[^>]+>", "", text)` which fails on:
- Malformed tags: `<img src="x" onerror="alert(1)"` (no closing `>`)
- Nested markup: `<a href="<script>">`
- CDATA/comments: `<!-- <script> -->`
While Textual's Rich markup won't execute scripts, malformed input could break
Rich's parser or produce garbled output. Consider using `html.parser` from the
stdlib:
```python
from html.parser import HTMLParser
class _HTMLStripper(HTMLParser):
def __init__(self):
super().__init__()
self._parts: list[str] = []
def handle_data(self, data: str):
self._parts.append(data)
def get_text(self) -> str:
return "".join(self._parts)
def _strip_html(text: str) -> str:
stripper = _HTMLStripper()
stripper.feed(text)
return html.unescape(stripper.get_text())
```
---
### [IMPORTANT] `join_channel` doesn't handle missing channel IDs
`client.py:240-243`:
```python
def join_channel(self, channel_id: int):
if self._mumble and self._connected:
self._mumble.channels[channel_id].move_in()
```
If `channel_id` is stale (channel was removed between tree render and user
pressing Enter), this raises `KeyError`. Add a guard:
```python
def join_channel(self, channel_id: int):
if self._mumble and self._connected:
ch = self._mumble.channels.get(channel_id)
if ch is None:
raise ValueError(f"channel {channel_id} not found")
ch.move_in()
```
---
### [IMPORTANT] `send_text` silently fails on missing channel
`client.py:229-233`:
```python
def send_text(self, message: str):
if self._mumble and self._connected:
ch = self._mumble.channels[self._mumble.users.myself["channel_id"]]
ch.send_text_message(message)
```
If `myself` has no `channel_id` key (edge case during connection setup), this
crashes with `KeyError`. Guard with try/except or a `my_channel_id` check
first.
---
### [NICE] `audio.py` `stop()` doesn't drain queues
After `stop()`, the capture and playback queues still hold stale frames. If
`start()` is called again (audio device hot-swap), old frames leak into the new
session:
```python
def stop(self):
for stream in (self._input_stream, self._output_stream):
if stream is not None:
stream.stop()
stream.close()
self._input_stream = None
self._output_stream = None
# Drain stale frames
while not self._capture_queue.empty():
try:
self._capture_queue.get_nowait()
except queue.Empty:
break
while not self._playback_queue.empty():
try:
self._playback_queue.get_nowait()
except queue.Empty:
break
```
---
## Priority Matrix
| # | Area | Severity | Item |
|----|---------------|-----------|------------------------------------------------------|
| 1 | Architecture | Critical | Extract reconnect manager from app.py |
| 2 | Security | Critical | Replace regex HTML stripping with proper parser |
| 3 | Robustness | Critical | Validate TOML config keys before dataclass init |
| 4 | Testing | Critical | Add widget/app tests using Textual's test harness |
| 5 | Performance | Important | Use numpy for `_apply_gain` (transitive dep exists) |
| 6 | Performance | Important | Replace polling sleep with `queue.get(timeout=)` |
| 7 | Thread safety | Important | Use `threading.Event` for reconnect cancellation |
| 8 | Code quality | Important | Extract `_make_client()` factory (4 dup sites) |
| 9 | Code quality | Important | Compile `_strip_html` regex at module level |
| 10 | Robustness | Important | Guard `join_channel`/`send_text` against `KeyError` |
| 11 | UX | Important | Suppress PTT chatlog spam for hold-mode |
| 12 | Code quality | Important | Deduplicate config change detection logic |
| 13 | Architecture | Nice | Extract `InputHistory` class |
| 14 | Architecture | Nice | Inline callback wrappers with lambdas |
| 15 | Robustness | Nice | Drain audio queues on `stop()` |
| 16 | Code quality | Nice | Add return type annotations throughout |
| 17 | Scalability | Nice | Consider frozen dataclasses for Config |

View File

@@ -31,10 +31,17 @@ def main():
def _run_profiled(app, dest):
"""Run the app under cProfile with periodic 30s dumps."""
"""Run the app under cProfile with periodic 30s dumps.
Profiles both the main thread and all worker threads (including
the audio send loop where PitchShifter.process runs). Each
thread gets its own cProfile.Profile; stats are merged on dump.
"""
import cProfile
import pstats
import threading
from pathlib import Path
from threading import Event, Thread
from threading import Event, Lock, Thread
if dest is None:
from tuimble.config import CONFIG_DIR
@@ -47,10 +54,34 @@ def _run_profiled(app, dest):
prof = cProfile.Profile()
stop = Event()
thread_profiles: list[cProfile.Profile] = []
lock = Lock()
def _thread_bootstrap(frame, event, arg):
"""Installed via threading.setprofile; creates a per-thread profiler.
Called once per new thread as the profile function. Immediately
creates and enables a cProfile.Profile whose C-level hook
replaces this Python-level one for the remainder of the thread.
"""
tp = cProfile.Profile()
with lock:
thread_profiles.append(tp)
tp.enable()
threading.setprofile(_thread_bootstrap)
def _dump_all():
"""Merge main + thread profiles and write to disk."""
stats = pstats.Stats(prof)
with lock:
for tp in thread_profiles:
stats.add(tp)
stats.dump_stats(str(dest))
def _periodic_dump():
while not stop.wait(30):
prof.dump_stats(str(dest))
_dump_all()
dumper = Thread(target=_periodic_dump, daemon=True)
dumper.start()
@@ -60,8 +91,12 @@ def _run_profiled(app, dest):
app.run()
finally:
prof.disable()
threading.setprofile(None)
with lock:
for tp in thread_profiles:
tp.disable()
stop.set()
prof.dump_stats(str(dest))
_dump_all()
if __name__ == "__main__":

File diff suppressed because it is too large Load Diff

View File

@@ -7,8 +7,13 @@ Playback path: pymumble (decodes) -> raw PCM -> queue -> speakers.
from __future__ import annotations
import array
import logging
import queue
import threading
from typing import Callable
from tuimble.modulator import PitchShifter
log = logging.getLogger(__name__)
@@ -18,6 +23,71 @@ FRAME_SIZE = 960 # 20ms at 48kHz
DTYPE = "int16"
def _apply_gain(pcm: bytes, gain: float) -> bytes:
"""Scale int16 PCM samples by gain factor with clipping."""
if len(pcm) < 2:
return pcm
samples = array.array("h")
samples.frombytes(pcm[: len(pcm) & ~1])
for i in range(len(samples)):
samples[i] = max(-32768, min(32767, int(samples[i] * gain)))
return samples.tobytes()
class DeviceMonitor:
"""Poll sounddevice for device list changes.
Runs a daemon thread that compares a string fingerprint of
``sounddevice.query_devices()`` against a cached snapshot every
*interval* seconds. Fires *callback* when the list changes.
Uses ``threading.Event`` for cancellation (same pattern as
``ReconnectManager``).
"""
def __init__(self, callback: Callable[[], None], interval: float = 2.0):
self._callback = callback
self._interval = interval
self._cancel = threading.Event()
self._snapshot = self._device_fingerprint()
self._thread: threading.Thread | None = None
@staticmethod
def _device_fingerprint() -> str:
"""Return a string snapshot of the current device list."""
try:
import sounddevice as sd
return str(sd.query_devices())
except Exception:
return ""
def start(self) -> None:
"""Start polling in a daemon thread."""
if self._thread is not None and self._thread.is_alive():
return
self._cancel.clear()
self._snapshot = self._device_fingerprint()
self._thread = threading.Thread(target=self._poll, daemon=True)
self._thread.start()
def stop(self) -> None:
"""Signal the poll loop to exit."""
self._cancel.set()
self._thread = None
def _poll(self) -> None:
while not self._cancel.wait(timeout=self._interval):
current = self._device_fingerprint()
if current != self._snapshot:
self._snapshot = current
log.info("audio device list changed")
try:
self._callback()
except Exception:
log.exception("device change callback failed")
class AudioPipeline:
"""Manages audio input/output streams and Opus codec."""
@@ -40,8 +110,11 @@ class AudioPipeline:
self._output_stream = None
self._capturing = False
self._deafened = False
self._input_gain = 1.0
self._output_gain = 1.0
self._pitch_shifter = PitchShifter(sample_rate)
def start(self):
def start(self) -> None:
"""Open audio streams."""
import sounddevice as sd
@@ -67,14 +140,20 @@ class AudioPipeline:
log.info("audio pipeline started (rate=%d)", self._sample_rate)
def stop(self):
"""Close audio streams."""
def stop(self) -> None:
"""Close audio streams and drain stale frames."""
for stream in (self._input_stream, self._output_stream):
if stream is not None:
stream.stop()
stream.close()
self._input_stream = None
self._output_stream = None
for q in (self._capture_queue, self._playback_queue):
while not q.empty():
try:
q.get_nowait()
except queue.Empty:
break
log.info("audio pipeline stopped")
@property
@@ -93,17 +172,44 @@ class AudioPipeline:
def deafened(self, value: bool):
self._deafened = value
def _capture_callback(self, indata, frames, time_info, status):
@property
def input_gain(self) -> float:
return self._input_gain
@input_gain.setter
def input_gain(self, value: float):
self._input_gain = max(0.0, min(2.0, value))
@property
def output_gain(self) -> float:
return self._output_gain
@output_gain.setter
def output_gain(self, value: float):
self._output_gain = max(0.0, min(2.0, value))
@property
def pitch(self) -> float:
return self._pitch_shifter.semitones
@pitch.setter
def pitch(self, value: float):
self._pitch_shifter.semitones = value
def _capture_callback(self, indata, frames, time_info, status) -> None:
"""Called by sounddevice when input data is available."""
if status:
log.warning("capture status: %s", status)
if self._capturing:
try:
self._capture_queue.put_nowait(bytes(indata))
pcm = bytes(indata)
if self._input_gain != 1.0:
pcm = _apply_gain(pcm, self._input_gain)
self._capture_queue.put_nowait(pcm)
except queue.Full:
pass
def _playback_callback(self, outdata, frames, time_info, status):
def _playback_callback(self, outdata, frames, time_info, status) -> None:
"""Called by sounddevice when output buffer needs data."""
if status:
log.warning("playback status: %s", status)
@@ -112,6 +218,8 @@ class AudioPipeline:
return
try:
pcm = self._playback_queue.get_nowait()
if self._output_gain != 1.0:
pcm = _apply_gain(pcm, self._output_gain)
n = min(len(pcm), len(outdata))
outdata[:n] = pcm[:n]
if n < len(outdata):
@@ -119,14 +227,25 @@ class AudioPipeline:
except queue.Empty:
outdata[:] = b"\x00" * len(outdata)
def get_capture_frame(self) -> bytes | None:
"""Retrieve next captured PCM frame for transmission."""
def get_capture_frame(self, timeout: float = 0.0) -> bytes | None:
"""Retrieve next captured PCM frame for transmission.
Pitch shifting runs here (worker thread) rather than in the
PortAudio callback, which must return within the frame period.
Args:
timeout: Seconds to wait for a frame. 0 returns immediately.
"""
try:
return self._capture_queue.get_nowait()
if timeout > 0:
pcm = self._capture_queue.get(timeout=timeout)
else:
pcm = self._capture_queue.get_nowait()
except queue.Empty:
return None
return self._pitch_shifter.process(pcm)
def queue_playback(self, pcm_data: bytes):
def queue_playback(self, pcm_data: bytes) -> None:
"""Queue raw PCM data for playback (16-bit, mono, 48kHz)."""
if self._deafened:
return

View File

@@ -9,12 +9,26 @@ event loop (e.g. Textual's call_from_thread).
from __future__ import annotations
import logging
import socket
from dataclasses import dataclass
from typing import Callable
log = logging.getLogger(__name__)
class ConnectionFailed(Exception):
"""Connection attempt failed.
Attributes:
retryable: Whether the caller should attempt reconnection.
False for authentication rejections, True for network errors.
"""
def __init__(self, message: str, *, retryable: bool = True):
super().__init__(message)
self.retryable = retryable
@dataclass
class User:
session_id: int
@@ -48,14 +62,22 @@ class MumbleClient:
port: int = 64738,
username: str = "tuimble-user",
password: str = "",
certfile: str = "",
keyfile: str = "",
):
self._host = host
self._port = port
self._username = username
self._password = password
self._certfile = certfile
self._keyfile = keyfile
self._mumble = None
self._connected = False
self._dispatcher: Callable | None = None
self._users_cache: dict[int, User] = {}
self._channels_cache: dict[int, Channel] = {}
self._users_dirty: bool = True
self._channels_dirty: bool = True
# Application callbacks (fired via dispatcher)
self.on_connected = None
@@ -65,14 +87,14 @@ class MumbleClient:
self.on_channel_update = None # ()
self.on_sound_received = None # (user, pcm_data)
def set_dispatcher(self, fn: Callable):
def set_dispatcher(self, fn: Callable) -> None:
"""Set a function to marshal callbacks into the host event loop.
Typically Textual's ``call_from_thread``.
"""
self._dispatcher = fn
def _dispatch(self, callback, *args):
def _dispatch(self, callback, *args) -> None:
"""Call *callback* via the dispatcher, or directly if none is set."""
if callback is None:
return
@@ -95,6 +117,8 @@ class MumbleClient:
def users(self) -> dict[int, User]:
if not self._mumble:
return {}
if not self._users_dirty:
return self._users_cache
result = {}
for sid, u in self._mumble.users.items():
result[sid] = User(
@@ -106,12 +130,16 @@ class MumbleClient:
self_mute=u.get("self_mute", False),
self_deaf=u.get("self_deaf", False),
)
self._users_cache = result
self._users_dirty = False
return result
@property
def channels(self) -> dict[int, Channel]:
if not self._mumble:
return {}
if not self._channels_dirty:
return self._channels_cache
result = {}
for cid, ch in self._mumble.channels.items():
result[cid] = Channel(
@@ -120,6 +148,8 @@ class MumbleClient:
parent_id=ch.get("parent", 0),
description=ch.get("description", ""),
)
self._channels_cache = result
self._channels_dirty = False
return result
@property
@@ -133,27 +163,64 @@ class MumbleClient:
# -- connection ----------------------------------------------------------
def connect(self):
"""Connect to the Mumble server (blocking)."""
def connect(self) -> None:
"""Connect to the Mumble server (blocking).
Raises:
ConnectionFailed: On any connection failure. Check the
``retryable`` attribute to decide whether to retry.
"""
import pymumble_py3 as pymumble
import pymumble_py3.constants as const
self._mumble = pymumble.Mumble(
self._host,
self._username,
port=self._port,
password=self._password,
reconnect=False,
)
self._mumble.set_codec_profile("audio")
self._mumble.set_receive_sound(True)
self._register_callbacks()
kwargs = {
"port": self._port,
"password": self._password,
"reconnect": False,
}
if self._certfile:
kwargs["certfile"] = self._certfile
if self._keyfile:
kwargs["keyfile"] = self._keyfile
try:
self._mumble = pymumble.Mumble(
self._host,
self._username,
**kwargs,
)
self._mumble.set_codec_profile("audio")
self._mumble.set_receive_sound(True)
self._register_callbacks()
self._mumble.start()
self._mumble.is_ready() # blocks until handshake completes
except (socket.error, OSError) as exc:
self._connected = False
raise ConnectionFailed(
f"network error: {exc}",
retryable=True,
) from exc
except Exception as exc:
self._connected = False
raise ConnectionFailed(str(exc), retryable=True) from exc
if self._mumble.connected != const.PYMUMBLE_CONN_STATE_CONNECTED:
self._connected = False
raise ConnectionFailed(
"server rejected connection",
retryable=False,
)
self._mumble.start()
self._mumble.is_ready() # blocks until handshake completes
self._connected = True
log.info("connected to %s:%d as %s", self._host, self._port, self._username)
log.info(
"connected to %s:%d as %s",
self._host,
self._port,
self._username,
)
def disconnect(self):
def disconnect(self) -> None:
"""Disconnect from the server."""
if self._mumble:
try:
@@ -161,27 +228,51 @@ class MumbleClient:
except Exception:
pass
self._connected = False
self._users_dirty = True
self._channels_dirty = True
log.info("disconnected")
def reconnect(self) -> None:
"""Disconnect and reconnect to the same server.
Raises:
ConnectionFailed: On connection failure (see ``connect``).
"""
self.disconnect()
self._mumble = None
self.connect()
# -- actions -------------------------------------------------------------
def send_text(self, message: str):
def send_text(self, message: str) -> None:
"""Send a text message to the current channel."""
if self._mumble and self._connected:
ch = self._mumble.channels[self._mumble.users.myself["channel_id"]]
try:
cid = self._mumble.users.myself["channel_id"]
ch = self._mumble.channels[cid]
except (KeyError, AttributeError):
log.warning("send_text: channel unavailable")
return
ch.send_text_message(message)
def send_audio(self, pcm_data: bytes):
def send_audio(self, pcm_data: bytes) -> None:
"""Send PCM audio to the server (pymumble encodes to Opus)."""
if self._mumble and self._connected:
self._mumble.sound_output.add_sound(pcm_data)
def join_channel(self, channel_id: int):
"""Move to a different channel."""
if self._mumble and self._connected:
self._mumble.channels[channel_id].move_in()
def join_channel(self, channel_id: int) -> None:
"""Move to a different channel.
def set_self_deaf(self, deaf: bool):
Raises:
ValueError: If the channel no longer exists on the server.
"""
if self._mumble and self._connected:
ch = self._mumble.channels.get(channel_id)
if ch is None:
raise ValueError(f"channel {channel_id} not found")
ch.move_in()
def set_self_deaf(self, deaf: bool) -> None:
"""Toggle self-deafen on the server."""
if self._mumble and self._connected:
if deaf:
@@ -189,9 +280,22 @@ class MumbleClient:
else:
self._mumble.users.myself.undeafen()
def set_self_mute(self, mute: bool) -> None:
"""Toggle self-mute on the server."""
if self._mumble and self._connected:
if mute:
self._mumble.users.myself.mute()
else:
self._mumble.users.myself.unmute()
def register_self(self) -> None:
"""Register the current user on the server."""
if self._mumble and self._connected:
self._mumble.users.myself.register()
# -- pymumble callbacks (run on pymumble thread) -------------------------
def _register_callbacks(self):
def _register_callbacks(self) -> None:
import pymumble_py3.constants as const
cb = self._mumble.callbacks
@@ -206,25 +310,31 @@ class MumbleClient:
cb.set_callback(const.PYMUMBLE_CLBK_CHANNELUPDATED, self._on_channel_event)
cb.set_callback(const.PYMUMBLE_CLBK_CHANNELREMOVED, self._on_channel_event)
def _on_connected(self):
def _on_connected(self) -> None:
self._connected = True
self._users_dirty = True
self._channels_dirty = True
self._dispatch(self.on_connected)
def _on_disconnected(self):
def _on_disconnected(self) -> None:
self._connected = False
self._users_dirty = True
self._channels_dirty = True
self._dispatch(self.on_disconnected)
def _on_text_message(self, message):
def _on_text_message(self, message) -> None:
users = self._mumble.users
actor = message.actor
name = users[actor]["name"] if actor in users else "?"
self._dispatch(self.on_text_message, name, message.message)
def _on_sound_received(self, user, sound_chunk):
def _on_sound_received(self, user, sound_chunk) -> None:
self._dispatch(self.on_sound_received, user, sound_chunk.pcm)
def _on_user_event(self, *_args):
def _on_user_event(self, *_args) -> None:
self._users_dirty = True
self._dispatch(self.on_user_update)
def _on_channel_event(self, *_args):
def _on_channel_event(self, *_args) -> None:
self._channels_dirty = True
self._dispatch(self.on_channel_update)

View File

@@ -1,8 +1,12 @@
"""Configuration management."""
import dataclasses
import logging
from dataclasses import dataclass, field
from pathlib import Path
log = logging.getLogger(__name__)
CONFIG_DIR = Path.home() / ".config" / "tuimble"
CONFIG_FILE = CONFIG_DIR / "config.toml"
@@ -16,6 +20,8 @@ class ServerConfig:
username: str = "tuimble-user"
password: str = ""
channel: str = ""
certfile: str = ""
keyfile: str = ""
@dataclass
@@ -24,6 +30,9 @@ class AudioConfig:
output_device: int | None = None
sample_rate: int = 48000
frame_size: int = 960 # 20ms at 48kHz
input_gain: float = 1.0
output_gain: float = 1.0
pitch: float = 0.0
@dataclass
@@ -56,9 +65,18 @@ def load_config(path: Path | None = None) -> Config:
cfg = Config()
if "server" in data:
cfg.server = ServerConfig(**data["server"])
cfg.server = _load_section(ServerConfig, data["server"])
if "audio" in data:
cfg.audio = AudioConfig(**data["audio"])
cfg.audio = _load_section(AudioConfig, data["audio"])
if "ptt" in data:
cfg.ptt = PttConfig(**data["ptt"])
cfg.ptt = _load_section(PttConfig, data["ptt"])
return cfg
def _load_section(cls, raw: dict):
"""Instantiate a dataclass, silently dropping unknown keys."""
valid = {f.name for f in dataclasses.fields(cls)}
unknown = set(raw) - valid
if unknown:
log.warning("ignoring unknown config keys: %s", ", ".join(sorted(unknown)))
return cls(**{k: v for k, v in raw.items() if k in valid})

134
src/tuimble/modulator.py Normal file
View File

@@ -0,0 +1,134 @@
"""Real-time pitch shifting for the capture path.
Uses a numpy-only phase vocoder + linear resample. All heavy
operations (rfft, irfft, array arithmetic) release the GIL so
PortAudio callbacks are never starved.
Stateful across frames: carries phase and input context across both
frame boundaries and parameter changes for glitch-free transitions.
"""
from __future__ import annotations
import numpy as np
_N_FFT = 512
_HOP = 128
_N_FREQ = _N_FFT // 2 + 1
_WINDOW = np.hanning(_N_FFT).astype(np.float32)
_PHASE_ADV = 2.0 * np.pi * _HOP * np.arange(_N_FREQ) / _N_FFT
class PitchShifter:
"""Shift pitch of int16 PCM frames via phase vocoder + resample.
Maintains inter-frame state (input overlap and synthesis phase)
so consecutive 20 ms frames produce a continuous output signal.
"""
def __init__(self, sample_rate: int = 48000):
self._sample_rate = sample_rate
self._semitones = 0.0
self._prev_in = np.zeros(_N_FFT, dtype=np.float32)
self._phase: np.ndarray | None = None
@property
def semitones(self) -> float:
return self._semitones
@semitones.setter
def semitones(self, value: float) -> None:
self._semitones = max(-12.0, min(12.0, float(value)))
def process(self, pcm: bytes) -> bytes:
"""Pitch-shift a single int16 PCM frame.
Returns *pcm* unchanged when semitones == 0 or the frame is
too short to process.
"""
if self._semitones == 0.0 or len(pcm) < 2:
return pcm
samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0
n = len(samples)
if n < _N_FFT:
return pcm
ratio = 2.0 ** (self._semitones / 12.0)
# Build continuous signal: previous context + current frame
# Previous context provides real samples instead of reflect
# padding, eliminating edge discontinuities.
y = np.concatenate([self._prev_in, samples])
y_pad = np.pad(y, (0, _N_FFT // 2), mode="reflect")
self._prev_in = samples[-_N_FFT:].copy()
# -- Vectorised STFT (one rfft call, GIL released) --
n_frames = 1 + (len(y_pad) - _N_FFT) // _HOP
if n_frames < 2:
return pcm
offsets = _HOP * np.arange(n_frames)
stft = np.fft.rfft(
y_pad[offsets[:, None] + np.arange(_N_FFT)] * _WINDOW, axis=1
)
# -- Time-stretch interpolation --
n_out = max(1, int(np.ceil(n_frames * ratio)))
src = np.minimum(np.arange(n_out) / ratio, n_frames - 1)
i0 = src.astype(int)
i1 = np.minimum(i0 + 1, n_frames - 1)
frac = (src - i0)[:, None]
mag = (1 - frac) * np.abs(stft[i0]) + frac * np.abs(stft[i1])
# -- Phase propagation with inter-frame continuity --
dphi = np.angle(stft[i1]) - np.angle(stft[i0]) - _PHASE_ADV
dphi -= 2.0 * np.pi * np.round(dphi / (2.0 * np.pi))
increments = _PHASE_ADV + dphi
phase = np.empty((n_out, _N_FREQ))
if self._phase is not None:
# Continue from previous frame's final phase
phase[0] = self._phase + increments[0]
else:
phase[0] = np.angle(stft[0])
if n_out > 1:
phase[1:] = phase[0] + np.cumsum(increments[1:], axis=0)
# Carry phase (wrap to [-pi, pi] to avoid precision drift)
self._phase = (phase[-1] + np.pi) % (2.0 * np.pi) - np.pi
# -- Vectorised ISTFT + overlap-add --
frames = (
np.fft.irfft(mag * np.exp(1j * phase), n=_N_FFT, axis=1).astype(
np.float32
)
* _WINDOW
)
out_len = (n_out - 1) * _HOP + _N_FFT
output = np.zeros(out_len, dtype=np.float32)
for i in range(n_out):
output[i * _HOP : i * _HOP + _N_FFT] += frames[i]
# Extract portion for current frame only.
# Left context (_N_FFT samples) maps to ~_N_FFT*ratio in
# the time-stretched output; skip that overlap region.
skip = int(round(_N_FFT * ratio))
target = int(round(n * ratio))
end = min(skip + target, len(output))
stretched = output[skip:end]
# Resample to original frame size (numpy, GIL-free)
if len(stretched) < 2:
return pcm
if len(stretched) != n:
x_old = np.linspace(0, 1, len(stretched), endpoint=False)
x_new = np.linspace(0, 1, n, endpoint=False)
stretched = np.interp(x_new, x_old, stretched)
return (
np.clip(stretched * 32768.0, -32768, 32767)
.astype(np.int16)
.tobytes()
)

93
src/tuimble/reconnect.py Normal file
View File

@@ -0,0 +1,93 @@
"""Reconnection manager with exponential backoff."""
from __future__ import annotations
import logging
import threading
from typing import Callable
log = logging.getLogger(__name__)
INITIAL_DELAY = 2
MAX_DELAY = 30
MAX_RETRIES = 10
class ReconnectManager:
"""Thread-safe reconnection with exponential backoff.
The manager runs a blocking loop in a worker thread. Cancellation
is signalled via ``threading.Event``, making it both thread-safe and
instantly responsive (no polling sleep).
Args:
connect_fn: Called to attempt a reconnection. Should raise on
failure; exceptions with a ``retryable`` attribute set to
``False`` cause immediate abort.
on_attempt: ``(attempt, delay)`` -- called before each wait.
on_success: Called after a successful reconnection.
on_failure: ``(attempt, error_msg)`` -- called after each failed
attempt.
on_exhausted: Called when all retries are spent.
"""
def __init__(
self,
connect_fn: Callable[[], None],
on_attempt: Callable[[int, float], None],
on_success: Callable[[], None],
on_failure: Callable[[int, str], None],
on_exhausted: Callable[[], None],
):
self._connect = connect_fn
self._on_attempt = on_attempt
self._on_success = on_success
self._on_failure = on_failure
self._on_exhausted = on_exhausted
self._cancel = threading.Event()
self._attempt = 0
@property
def active(self) -> bool:
return not self._cancel.is_set() and self._attempt > 0
@property
def attempt(self) -> int:
return self._attempt
def cancel(self) -> None:
"""Signal the loop to stop. Safe to call from any thread."""
self._cancel.set()
def run(self) -> None:
"""Blocking reconnect loop -- run in a worker thread."""
self._cancel.clear()
self._attempt = 0
while not self._cancel.is_set():
self._attempt += 1
delay = min(INITIAL_DELAY * (2 ** (self._attempt - 1)), MAX_DELAY)
self._on_attempt(self._attempt, delay)
if self._cancel.wait(timeout=delay):
break
try:
self._connect()
self._attempt = 0
self._on_success()
return
except Exception as exc:
retryable = getattr(exc, "retryable", True)
self._on_failure(self._attempt, str(exc))
if not retryable:
self._attempt = 0
self._on_exhausted()
return
if self._attempt >= MAX_RETRIES:
self._attempt = 0
self._on_exhausted()
return
self._attempt = 0

View File

@@ -1,25 +1,26 @@
"""Tests for AudioPipeline."""
from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline
import struct
import numpy as np
from tuimble.audio import FRAME_SIZE, AudioPipeline, _apply_gain
def test_default_construction():
ap = AudioPipeline()
assert ap._sample_rate == SAMPLE_RATE
assert ap._frame_size == FRAME_SIZE
assert ap._input_device is None
assert ap._output_device is None
assert ap.capturing is False
assert ap.deafened is False
assert ap.input_gain == 1.0
assert ap.output_gain == 1.0
def test_custom_construction():
ap = AudioPipeline(sample_rate=24000, frame_size=480,
input_device=1, output_device=2)
assert ap._sample_rate == 24000
assert ap._frame_size == 480
assert ap._input_device == 1
assert ap._output_device == 2
ap = AudioPipeline(
sample_rate=24000, frame_size=480, input_device=1, output_device=2
)
# Verify via public behavior: get_capture_frame returns None
assert ap.get_capture_frame() is None
def test_capturing_toggle():
@@ -36,10 +37,13 @@ def test_get_capture_frame_empty():
assert ap.get_capture_frame() is None
def test_get_capture_frame_returns_queued():
def test_capture_and_retrieve():
"""Capture callback queues frames; get_capture_frame retrieves them."""
ap = AudioPipeline()
ap._capture_queue.put(b"\x01\x02\x03")
assert ap.get_capture_frame() == b"\x01\x02\x03"
ap.capturing = True
pcm = b"\x01\x02\x03\x04"
ap._capture_callback(pcm, 2, None, None)
assert ap.get_capture_frame() == pcm
assert ap.get_capture_frame() is None
@@ -82,12 +86,20 @@ def test_playback_callback_short_pcm_pads_silence():
def test_queue_playback_overflow_drops():
"""Full queue drops new data silently."""
ap = AudioPipeline(frame_size=FRAME_SIZE)
# Fill the queue
for i in range(ap._playback_queue.maxsize):
ap.queue_playback(b"\x00")
# This should not raise
ap.queue_playback(b"\xff")
assert ap._playback_queue.qsize() == ap._playback_queue.maxsize
# Use non-zero PCM so we can distinguish from silence
frame = b"\x42\x42"
for _ in range(50): # maxsize=50
ap.queue_playback(frame)
# This should not raise (dropped because queue is full)
ap.queue_playback(b"\xff\xff")
# Drain and count -- frames with our marker byte
count = 0
for _ in range(60): # more than queue size
outdata = bytearray(2)
ap._playback_callback(outdata, 1, None, None)
if outdata != bytearray(2):
count += 1
assert count == 50
def test_deafened_toggle():
@@ -104,14 +116,16 @@ def test_queue_playback_discards_when_deafened():
ap = AudioPipeline()
ap.deafened = True
ap.queue_playback(b"\x42" * 100)
assert ap._playback_queue.qsize() == 0
# Nothing to play back
outdata = bytearray(200)
ap._playback_callback(outdata, 100, None, None)
assert outdata == bytearray(200) # silence
def test_playback_callback_silence_when_deafened():
"""Playback callback writes silence when deafened, even with queued data."""
ap = AudioPipeline()
frame_bytes = FRAME_SIZE * 2
# Queue data before deafening
pcm = b"\x42" * frame_bytes
ap.queue_playback(pcm)
ap.deafened = True
@@ -125,3 +139,133 @@ def test_stop_without_start():
"""Stop on unstarted pipeline should not raise."""
ap = AudioPipeline()
ap.stop()
def test_stop_drains_queues():
"""Queues are empty after stop()."""
ap = AudioPipeline()
ap.capturing = True
ap._capture_callback(b"\x00\x00", 1, None, None)
ap.queue_playback(b"\x00\x00")
ap.stop()
assert ap.get_capture_frame() is None
# Playback queue also drained -- callback produces silence
outdata = bytearray(2)
ap._playback_callback(outdata, 1, None, None)
assert outdata == bytearray(2)
# -- _apply_gain tests -------------------------------------------------------
def test_apply_gain_unity():
"""Gain 1.0 returns identical samples."""
pcm = struct.pack("<4h", 100, -200, 32767, -32768)
assert _apply_gain(pcm, 1.0) == pcm
def test_apply_gain_double():
"""Gain 2.0 doubles sample values."""
pcm = struct.pack("<2h", 100, -100)
result = struct.unpack("<2h", _apply_gain(pcm, 2.0))
assert result == (200, -200)
def test_apply_gain_clips():
"""Values exceeding int16 range are clipped."""
pcm = struct.pack("<2h", 20000, -20000)
result = struct.unpack("<2h", _apply_gain(pcm, 2.0))
assert result == (32767, -32768)
def test_apply_gain_zero():
"""Gain 0.0 produces silence."""
pcm = struct.pack("<2h", 1000, -1000)
result = struct.unpack("<2h", _apply_gain(pcm, 0.0))
assert result == (0, 0)
def test_apply_gain_empty():
"""Empty buffer returns empty."""
assert _apply_gain(b"", 2.0) == b""
# -- gain property tests ------------------------------------------------------
def test_gain_defaults():
ap = AudioPipeline()
assert ap.input_gain == 1.0
assert ap.output_gain == 1.0
def test_gain_clamping():
ap = AudioPipeline()
ap.input_gain = 3.0
assert ap.input_gain == 2.0
ap.output_gain = -1.0
assert ap.output_gain == 0.0
def test_capture_callback_applies_input_gain():
"""Input gain is applied to captured PCM."""
ap = AudioPipeline()
ap.capturing = True
ap.input_gain = 0.5
pcm = struct.pack("<2h", 1000, -1000)
ap._capture_callback(pcm, 2, None, None)
frame = ap.get_capture_frame()
result = struct.unpack("<2h", frame)
assert result == (500, -500)
def test_playback_callback_applies_output_gain():
"""Output gain is applied during playback."""
ap = AudioPipeline()
ap.output_gain = 0.5
pcm = struct.pack("<2h", 1000, -1000)
ap.queue_playback(pcm)
outdata = bytearray(4)
ap._playback_callback(outdata, 2, None, None)
result = struct.unpack("<2h", bytes(outdata))
assert result == (500, -500)
# -- pitch property tests ----------------------------------------------------
def test_pitch_default():
ap = AudioPipeline()
assert ap.pitch == 0.0
def test_pitch_set_and_get():
ap = AudioPipeline()
ap.pitch = 5.0
assert ap.pitch == 5.0
ap.pitch = -3.0
assert ap.pitch == -3.0
def test_pitch_clamping():
ap = AudioPipeline()
ap.pitch = 20.0
assert ap.pitch == 12.0
ap.pitch = -20.0
assert ap.pitch == -12.0
def test_pitch_applied_on_dequeue():
"""Pitch shifting runs in get_capture_frame, not the callback."""
ap = AudioPipeline()
ap.capturing = True
ap.pitch = 4.0
t = np.arange(FRAME_SIZE) / 48000.0
pcm = (np.sin(2 * np.pi * 440.0 * t) * 16000).astype(np.int16).tobytes()
ap._capture_callback(pcm, FRAME_SIZE, None, None)
frame = ap.get_capture_frame()
assert frame is not None
assert len(frame) == len(pcm)
assert frame != pcm

View File

@@ -1,8 +1,8 @@
"""Tests for MumbleClient dispatcher and callback wiring."""
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
from tuimble.client import MumbleClient
from tuimble.client import ConnectionFailed, MumbleClient
def test_default_state():
@@ -67,3 +67,100 @@ def test_set_self_deaf_noop_when_disconnected():
client = MumbleClient(host="localhost")
# Should not raise when not connected
client.set_self_deaf(True)
def test_cert_defaults():
client = MumbleClient(host="localhost")
assert client._certfile == ""
assert client._keyfile == ""
def test_cert_custom():
client = MumbleClient(
host="localhost",
certfile="/path/cert.pem",
keyfile="/path/key.pem",
)
assert client._certfile == "/path/cert.pem"
assert client._keyfile == "/path/key.pem"
def test_connection_failed_retryable():
exc = ConnectionFailed("network error", retryable=True)
assert str(exc) == "network error"
assert exc.retryable is True
def test_connection_failed_not_retryable():
exc = ConnectionFailed("auth rejected", retryable=False)
assert str(exc) == "auth rejected"
assert exc.retryable is False
def test_connection_failed_default_retryable():
exc = ConnectionFailed("something broke")
assert exc.retryable is True
def test_reconnect_resets_state():
client = MumbleClient(host="localhost")
client._connected = True
client._mumble = MagicMock()
with patch.object(client, "connect"):
client.reconnect()
# disconnect should have cleared _mumble before connect
assert client._connected is False
def test_disconnect_clears_connected():
client = MumbleClient(host="localhost")
client._connected = True
client._mumble = MagicMock()
client.disconnect()
assert client.connected is False
def test_set_self_mute_calls_pymumble():
client = MumbleClient(host="localhost")
client._connected = True
myself = MagicMock()
users = MagicMock()
users.myself = myself
mumble = MagicMock()
mumble.users = users
client._mumble = mumble
client.set_self_mute(True)
myself.mute.assert_called_once()
myself.unmute.assert_not_called()
myself.reset_mock()
client.set_self_mute(False)
myself.unmute.assert_called_once()
myself.mute.assert_not_called()
def test_set_self_mute_noop_when_disconnected():
client = MumbleClient(host="localhost")
client.set_self_mute(True)
def test_register_self_calls_pymumble():
client = MumbleClient(host="localhost")
client._connected = True
myself = MagicMock()
users = MagicMock()
users.myself = myself
mumble = MagicMock()
mumble.users = users
client._mumble = mumble
client.register_self()
myself.register.assert_called_once()
def test_register_self_noop_when_disconnected():
client = MumbleClient(host="localhost")
client.register_self()

View File

@@ -1,6 +1,13 @@
"""Tests for configuration module."""
from tuimble.config import Config, PttConfig, ServerConfig
from tuimble.config import (
AudioConfig,
Config,
PttConfig,
ServerConfig,
_load_section,
load_config,
)
def test_default_config():
@@ -21,3 +28,73 @@ def test_ptt_config_defaults():
ptt = PttConfig()
assert ptt.key == "f4"
assert ptt.backend == "auto"
def test_audio_gain_defaults():
acfg = AudioConfig()
assert acfg.input_gain == 1.0
assert acfg.output_gain == 1.0
def test_audio_gain_custom():
acfg = AudioConfig(input_gain=0.5, output_gain=1.5)
assert acfg.input_gain == 0.5
assert acfg.output_gain == 1.5
def test_server_cert_defaults():
srv = ServerConfig()
assert srv.certfile == ""
assert srv.keyfile == ""
def test_server_cert_custom():
srv = ServerConfig(certfile="/path/cert.pem", keyfile="/path/key.pem")
assert srv.certfile == "/path/cert.pem"
assert srv.keyfile == "/path/key.pem"
# -- _load_section tests -----------------------------------------------------
def test_load_section_filters_unknown_keys():
"""Unknown keys are silently dropped, valid keys are kept."""
result = _load_section(
ServerConfig,
{
"host": "example.com",
"typo_field": "oops",
"another_bad": 42,
},
)
assert result.host == "example.com"
assert result.port == 64738 # default preserved
def test_load_section_empty_dict():
result = _load_section(PttConfig, {})
assert result.key == "f4"
assert result.mode == "toggle"
def test_load_section_all_valid():
result = _load_section(PttConfig, {"key": "space", "mode": "hold"})
assert result.key == "space"
assert result.mode == "hold"
def test_load_config_missing_file(tmp_path):
"""Missing config file returns defaults."""
cfg = load_config(tmp_path / "nonexistent.toml")
assert cfg.server.host == "localhost"
def test_load_config_with_unknown_keys(tmp_path):
"""Config file with unknown keys loads without error."""
toml = tmp_path / "config.toml"
toml.write_text(
'[server]\nhost = "example.com"\nbogus = true\n[ptt]\nfuture_option = "x"\n'
)
cfg = load_config(toml)
assert cfg.server.host == "example.com"
assert cfg.ptt.key == "f4" # default, not overwritten

94
tests/test_history.py Normal file
View File

@@ -0,0 +1,94 @@
"""Tests for InputHistory."""
from tuimble.app import InputHistory
def test_empty_history_up_returns_none():
h = InputHistory()
assert h.up("current") is None
def test_empty_history_down_returns_none():
h = InputHistory()
assert h.down() is None
def test_push_then_up_returns_last():
h = InputHistory()
h.push("hello")
assert h.up("draft") == "hello"
def test_up_twice_with_single_entry_stays():
h = InputHistory()
h.push("one")
assert h.up("draft") == "one"
assert h.up("draft") == "one" # no older entry, stays put
def test_up_navigates_backwards():
h = InputHistory()
h.push("first")
h.push("second")
h.push("third")
assert h.up("draft") == "third"
assert h.up("draft") == "second"
assert h.up("draft") == "first"
assert h.up("draft") == "first" # clamped at oldest
def test_down_navigates_forward():
h = InputHistory()
h.push("first")
h.push("second")
h.push("third")
h.up("draft") # -> third
h.up("draft") # -> second
assert h.down() == "third"
def test_down_past_end_restores_draft():
h = InputHistory()
h.push("one")
h.up("my draft") # -> one
result = h.down() # -> back to draft
assert result == "my draft"
def test_down_without_prior_up_returns_none():
h = InputHistory()
h.push("one")
assert h.down() is None
def test_push_resets_navigation():
h = InputHistory()
h.push("first")
h.up("draft") # -> first
h.push("second")
# After push, navigation resets -- up should go to newest
assert h.up("new draft") == "second"
def test_up_preserves_current_as_draft():
"""First up() saves current input; down past end restores it."""
h = InputHistory()
h.push("old")
h.up("typing in progress")
assert h.down() == "typing in progress"
def test_full_cycle():
"""Push several, navigate to oldest, then back to draft."""
h = InputHistory()
h.push("a")
h.push("b")
h.push("c")
assert h.up("draft") == "c"
assert h.up("draft") == "b"
assert h.up("draft") == "a"
assert h.down() == "b"
assert h.down() == "c"
assert h.down() == "draft"
assert h.down() is None # already at draft

78
tests/test_modulator.py Normal file
View File

@@ -0,0 +1,78 @@
"""Tests for PitchShifter."""
import numpy as np
from tuimble.modulator import PitchShifter
SAMPLE_RATE = 48000
FRAME_SIZE = 960 # 20ms at 48kHz
def _sine_pcm(freq: float, n_samples: int = FRAME_SIZE) -> bytes:
"""Generate a single-frequency int16 PCM frame."""
t = np.arange(n_samples) / SAMPLE_RATE
samples = (np.sin(2 * np.pi * freq * t) * 16000).astype(np.int16)
return samples.tobytes()
def _dominant_freq(pcm: bytes) -> float:
"""Return the dominant frequency in an int16 PCM buffer."""
samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32)
fft = np.abs(np.fft.rfft(samples))
freqs = np.fft.rfftfreq(len(samples), 1.0 / SAMPLE_RATE)
return freqs[np.argmax(fft)]
def test_zero_semitones_passthrough():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
assert ps.process(pcm) == pcm
def test_pitch_shift_changes_output():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = 3.0
result = ps.process(pcm)
assert result != pcm
def test_output_length_preserved():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = 5.0
result = ps.process(pcm)
assert len(result) == len(pcm)
def test_semitones_clamping():
ps = PitchShifter()
ps.semitones = 20.0
assert ps.semitones == 12.0
ps.semitones = -20.0
assert ps.semitones == -12.0
ps.semitones = 5.0
assert ps.semitones == 5.0
def test_empty_input():
ps = PitchShifter()
ps.semitones = 3.0
assert ps.process(b"") == b""
assert ps.process(b"\x00") == b"\x00"
def test_pitch_up_frequency_increases():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = 4.0
result = ps.process(pcm)
assert _dominant_freq(result) > _dominant_freq(pcm)
def test_pitch_down_frequency_decreases():
ps = PitchShifter(SAMPLE_RATE)
pcm = _sine_pcm(440.0)
ps.semitones = -4.0
result = ps.process(pcm)
assert _dominant_freq(result) < _dominant_freq(pcm)

192
tests/test_reconnect.py Normal file
View File

@@ -0,0 +1,192 @@
"""Tests for ReconnectManager."""
import threading
from tuimble.reconnect import INITIAL_DELAY, ReconnectManager
def _make_manager(connect_fn=None, **overrides):
"""Build a ReconnectManager with recording callbacks."""
log = {"attempts": [], "failures": [], "success": 0, "exhausted": 0}
def on_attempt(n, delay):
log["attempts"].append((n, delay))
def on_success():
log["success"] += 1
def on_failure(n, msg):
log["failures"].append((n, msg))
def on_exhausted():
log["exhausted"] += 1
if connect_fn is None:
def connect_fn():
return None
mgr = ReconnectManager(
connect_fn=connect_fn,
on_attempt=overrides.get("on_attempt", on_attempt),
on_success=overrides.get("on_success", on_success),
on_failure=overrides.get("on_failure", on_failure),
on_exhausted=overrides.get("on_exhausted", on_exhausted),
)
return mgr, log
# -- basic lifecycle ----------------------------------------------------------
def test_initial_state():
mgr, _ = _make_manager()
assert mgr.active is False
assert mgr.attempt == 0
def test_success_on_first_attempt():
"""Connect succeeds immediately -- one attempt, no failures."""
mgr, log = _make_manager(connect_fn=lambda: None)
mgr.run()
assert log["success"] == 1
assert log["exhausted"] == 0
assert len(log["attempts"]) == 1
assert len(log["failures"]) == 0
assert mgr.active is False
def test_success_after_failures():
"""Connect fails twice, then succeeds on third attempt."""
call_count = 0
def flaky_connect():
nonlocal call_count
call_count += 1
if call_count < 3:
raise ConnectionError("down")
mgr, log = _make_manager(connect_fn=flaky_connect)
# Patch delay to zero for test speed
import tuimble.reconnect as mod
orig = mod.INITIAL_DELAY
mod.INITIAL_DELAY = 0
try:
mgr.run()
finally:
mod.INITIAL_DELAY = orig
assert log["success"] == 1
assert len(log["failures"]) == 2
assert len(log["attempts"]) == 3
# -- non-retryable -----------------------------------------------------------
def test_non_retryable_aborts_immediately():
"""Exception with retryable=False stops the loop."""
class Rejected(Exception):
retryable = False
def _raise():
raise Rejected("banned")
mgr, log = _make_manager(connect_fn=_raise)
mgr.run()
assert log["exhausted"] == 1
assert log["success"] == 0
assert len(log["attempts"]) == 1
assert len(log["failures"]) == 1
# -- max retries --------------------------------------------------------------
def test_exhaustion_after_max_retries():
"""Loop stops after MAX_RETRIES failed attempts."""
import tuimble.reconnect as mod
orig_delay = mod.INITIAL_DELAY
orig_retries = mod.MAX_RETRIES
mod.INITIAL_DELAY = 0
mod.MAX_RETRIES = 3
try:
mgr, log = _make_manager(
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("nope")),
)
mgr.run()
finally:
mod.INITIAL_DELAY = orig_delay
mod.MAX_RETRIES = orig_retries
assert log["exhausted"] == 1
assert log["success"] == 0
assert len(log["attempts"]) == 3
assert len(log["failures"]) == 3
# -- cancellation -------------------------------------------------------------
def test_cancel_stops_loop():
"""cancel() interrupts the wait and exits the loop."""
barrier = threading.Event()
def slow_attempt(n, delay):
barrier.set()
mgr, log = _make_manager(
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("fail")),
on_attempt=slow_attempt,
)
t = threading.Thread(target=mgr.run)
t.start()
barrier.wait(timeout=2)
mgr.cancel()
t.join(timeout=2)
assert not t.is_alive()
assert mgr.active is False
# -- backoff ------------------------------------------------------------------
def test_backoff_delays():
"""Verify exponential backoff sequence up to MAX_DELAY."""
mgr, log = _make_manager()
# We only need the attempt callback to record delays; cancel after
# a few attempts to avoid waiting.
import tuimble.reconnect as mod
orig_delay = mod.INITIAL_DELAY
orig_retries = mod.MAX_RETRIES
mod.INITIAL_DELAY = 0 # zero delay for speed
mod.MAX_RETRIES = 5
try:
mgr, log = _make_manager(
connect_fn=lambda: (_ for _ in ()).throw(ConnectionError("x")),
)
mgr.run()
finally:
mod.INITIAL_DELAY = orig_delay
mod.MAX_RETRIES = orig_retries
delays = [d for _, d in log["attempts"]]
# With INITIAL_DELAY=0, all delays are 0 (min(0 * 2^n, MAX_DELAY))
# Test the formula with real values instead
assert len(delays) == 5
def test_backoff_formula():
"""Delay = min(INITIAL_DELAY * 2^(attempt-1), MAX_DELAY)."""
from tuimble.reconnect import MAX_DELAY
expected = []
for i in range(1, 7):
expected.append(min(INITIAL_DELAY * (2 ** (i - 1)), MAX_DELAY))
assert expected == [2, 4, 8, 16, 30, 30]

59
tests/test_strip_html.py Normal file
View File

@@ -0,0 +1,59 @@
"""Tests for _strip_html edge cases."""
from tuimble.app import _strip_html
def test_plain_text_unchanged():
assert _strip_html("hello world") == "hello world"
def test_simple_tags_stripped():
assert _strip_html("<b>bold</b>") == "bold"
def test_nested_tags():
assert _strip_html("<div><p>text</p></div>") == "text"
def test_self_closing_tags():
assert _strip_html("line<br/>break") == "linebreak"
def test_entities_unescaped():
assert _strip_html("&amp; &lt; &gt;") == "& < >"
def test_html_entities_in_tags():
assert _strip_html("<b>&amp;</b>") == "&"
def test_mumble_style_message():
"""Typical Mumble chat message with anchor tag."""
msg = '<a href="https://example.com">link text</a> and more'
assert _strip_html(msg) == "link text and more"
def test_img_tag_with_attributes():
assert _strip_html('before<img src="x.png" alt="pic"/>after') == "beforeafter"
def test_comment_stripped():
assert _strip_html("before<!-- comment -->after") == "beforeafter"
def test_empty_string():
assert _strip_html("") == ""
def test_only_tags():
assert _strip_html("<br><hr><img/>") == ""
def test_unclosed_tag():
"""Malformed HTML should not crash."""
result = _strip_html("<b>unclosed")
assert "unclosed" in result
def test_multiple_entities():
assert _strip_html("&quot;quoted&quot;") == '"quoted"'

406
tests/test_widgets.py Normal file
View File

@@ -0,0 +1,406 @@
"""Tests for StatusBar, ChannelTree, and related widget helpers."""
import pytest
from tuimble.app import (
VOLUME_STEPS,
ChannelSelected,
ChannelTree,
StatusBar,
_next_volume,
)
from tuimble.client import Channel, User
# -- test helpers ------------------------------------------------------------
def _sample_channels():
return {
0: Channel(channel_id=0, name="Root", parent_id=-1),
1: Channel(channel_id=1, name="Alpha", parent_id=0),
2: Channel(channel_id=2, name="Beta", parent_id=0),
}
def _sample_users():
return {
1: [User(session_id=10, name="Alice", channel_id=1)],
}
# -- A. Pure unit tests (sync) ----------------------------------------------
class TestVolBar:
def test_zero(self):
assert StatusBar._vol_bar(0) == "\u2591\u2591\u2591\u2591"
def test_25(self):
assert StatusBar._vol_bar(25) == "\u2588\u2591\u2591\u2591"
def test_50(self):
assert StatusBar._vol_bar(50) == "\u2588\u2588\u2591\u2591"
def test_100(self):
assert StatusBar._vol_bar(100) == "\u2588\u2588\u2588\u2588"
def test_200_overgain(self):
result = StatusBar._vol_bar(200)
# 200/25 = 8, clamped to 4 filled + max(0, 4-8)=0 light
assert "\u2588" in result
class TestTruncate:
def test_short(self):
assert ChannelTree._truncate("hi", 10) == "hi"
def test_exact(self):
assert ChannelTree._truncate("abcde", 5) == "abcde"
def test_long(self):
assert ChannelTree._truncate("abcdefghij", 7) == "abcd..."
def test_tiny_max(self):
assert ChannelTree._truncate("abcdef", 3) == "abc"
def test_max_two(self):
assert ChannelTree._truncate("abcdef", 2) == "ab"
def test_zero_max(self):
assert ChannelTree._truncate("abc", 0) == ""
class TestUserStatus:
def test_normal(self):
u = User(session_id=1, name="X", channel_id=0)
assert ChannelTree._user_status(u) == ""
def test_self_deaf(self):
u = User(session_id=1, name="X", channel_id=0, self_deaf=True)
assert "\u2298" in ChannelTree._user_status(u)
def test_self_mute(self):
u = User(session_id=1, name="X", channel_id=0, self_mute=True)
assert "\u2715" in ChannelTree._user_status(u)
def test_server_deaf(self):
u = User(session_id=1, name="X", channel_id=0, deaf=True)
assert "\u2298" in ChannelTree._user_status(u)
def test_server_mute(self):
u = User(session_id=1, name="X", channel_id=0, mute=True)
assert "\u2715" in ChannelTree._user_status(u)
def test_deaf_takes_priority(self):
u = User(session_id=1, name="X", channel_id=0, self_deaf=True, self_mute=True)
assert "\u2298" in ChannelTree._user_status(u)
class TestNextVolume:
def test_full_cycle(self):
vol = 0.0
seen = [vol]
for _ in range(len(VOLUME_STEPS)):
vol = _next_volume(vol)
seen.append(vol)
# Must cycle through all steps and wrap
assert seen[-1] == VOLUME_STEPS[0]
def test_wraparound_from_max(self):
assert _next_volume(VOLUME_STEPS[-1]) == VOLUME_STEPS[0]
def test_mid_value(self):
assert _next_volume(0.5) == 0.75
# -- B. ChannelTree state tests (sync, direct instantiation) ----------------
class TestChannelTreeState:
def test_build_order_matches_dfs(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), {})
# Root, then sorted children: Alpha, Beta
assert tree._channel_ids == [0, 1, 2]
def test_empty_channels(self):
tree = ChannelTree()
tree.set_state({}, {})
assert tree._channel_ids == []
def test_clear_state(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), _sample_users(), my_channel_id=1)
tree.clear_state()
assert tree._channels == {}
assert tree._users_by_channel == {}
assert tree._channel_ids == []
assert tree._focused_idx == 0
assert tree._my_channel_id is None
def test_focus_clamped_when_shrinks(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), {})
tree._focused_idx = 2 # last channel
# Shrink to only root
tree.set_state({0: Channel(channel_id=0, name="Root", parent_id=-1)}, {})
assert tree._focused_idx == 0
def test_focus_preserved_when_in_range(self):
tree = ChannelTree()
tree.set_state(_sample_channels(), {})
tree._focused_idx = 1
# Re-set same state
tree.set_state(_sample_channels(), {})
assert tree._focused_idx == 1
def test_nested_channels_order(self):
channels = {
0: Channel(channel_id=0, name="Root", parent_id=-1),
1: Channel(channel_id=1, name="Zulu", parent_id=0),
2: Channel(channel_id=2, name="Alpha", parent_id=0),
3: Channel(channel_id=3, name="Sub", parent_id=2),
}
tree = ChannelTree()
tree.set_state(channels, {})
# Root -> Alpha (sorted) -> Sub -> Zulu
assert tree._channel_ids == [0, 2, 3, 1]
# -- C. StatusBar integration tests (async) ---------------------------------
from textual.app import App, ComposeResult # noqa: E402
class StatusBarApp(App):
CSS = """
#status { dock: bottom; height: 1; }
"""
def compose(self) -> ComposeResult:
yield StatusBar(id="status")
@pytest.mark.asyncio
async def test_statusbar_default_disconnected():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
assert bar.connected is False
rendered = bar.render()
assert "\u25cb" in rendered # disconnected circle
@pytest.mark.asyncio
async def test_statusbar_connected():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.connected = True
rendered = bar.render()
assert "\u25cf" in rendered # filled circle
assert "connected" in rendered
@pytest.mark.asyncio
async def test_statusbar_compact_width():
app = StatusBarApp()
async with app.run_test(size=(30, 5)) as pilot:
bar = app.query_one("#status", StatusBar)
await pilot.resize_terminal(30, 5)
await pilot.pause()
rendered = bar.render()
# Compact mode: symbols only, no labels
assert "connected" not in rendered
@pytest.mark.asyncio
async def test_statusbar_medium_width():
app = StatusBarApp()
async with app.run_test(size=(50, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
rendered = bar.render()
assert "disconnected" in rendered
@pytest.mark.asyncio
async def test_statusbar_full_width():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.connected = True
bar.output_vol = 100
bar.input_vol = 50
rendered = bar.render()
assert "out" in rendered
assert "in" in rendered
@pytest.mark.asyncio
async def test_statusbar_ptt_active():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.ptt_active = True
rendered = bar.render()
assert "TX" in rendered
@pytest.mark.asyncio
async def test_statusbar_deaf():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.self_deaf = True
rendered = bar.render()
assert "\u2298" in rendered
@pytest.mark.asyncio
async def test_statusbar_muted():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.self_mute = True
rendered = bar.render()
assert "\u2715" in rendered
@pytest.mark.asyncio
async def test_statusbar_muted_compact():
app = StatusBarApp()
async with app.run_test(size=(30, 5)) as pilot:
bar = app.query_one("#status", StatusBar)
bar.self_mute = True
await pilot.resize_terminal(30, 5)
await pilot.pause()
rendered = bar.render()
assert "\u2715" in rendered
@pytest.mark.asyncio
async def test_statusbar_muted_medium():
app = StatusBarApp()
async with app.run_test(size=(50, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.self_mute = True
rendered = bar.render()
assert "\u2715" in rendered
assert "mute" in rendered
@pytest.mark.asyncio
async def test_statusbar_reconnecting():
app = StatusBarApp()
async with app.run_test(size=(80, 5)) as _pilot:
bar = app.query_one("#status", StatusBar)
bar.reconnecting = True
rendered = bar.render()
assert "reconnecting" in rendered
# -- D. ChannelTree integration tests (async) --------------------------------
class ChannelTreeApp(App):
CSS = """
#sidebar { width: 24; height: 1fr; }
"""
BINDINGS = [("q", "quit", "Quit")]
def __init__(self):
super().__init__()
self.selected_messages: list[int] = []
def compose(self) -> ComposeResult:
yield ChannelTree(id="sidebar")
def on_channel_selected(self, msg: ChannelSelected) -> None:
self.selected_messages.append(msg.channel_id)
@pytest.mark.asyncio
async def test_tree_empty_render():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as _pilot:
tree = app.query_one("#sidebar", ChannelTree)
rendered = tree.render()
assert "(not connected)" in rendered
@pytest.mark.asyncio
async def test_tree_populated():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as _pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), _sample_users())
rendered = tree.render()
assert "Root" in rendered
assert "Alpha" in rendered
assert "Beta" in rendered
assert "Alice" in rendered
@pytest.mark.asyncio
async def test_tree_down_arrow():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree.focus()
await pilot.pause()
assert tree._focused_idx == 0
await pilot.press("down")
assert tree._focused_idx == 1
@pytest.mark.asyncio
async def test_tree_up_arrow():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree._focused_idx = 2
tree.focus()
await pilot.pause()
await pilot.press("up")
assert tree._focused_idx == 1
@pytest.mark.asyncio
async def test_tree_bounds_top():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree.focus()
await pilot.pause()
await pilot.press("up")
assert tree._focused_idx == 0 # stays at 0
@pytest.mark.asyncio
async def test_tree_bounds_bottom():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree._focused_idx = 2
tree.focus()
await pilot.pause()
await pilot.press("down")
assert tree._focused_idx == 2 # stays at max
@pytest.mark.asyncio
async def test_tree_enter_posts_message():
app = ChannelTreeApp()
async with app.run_test(size=(40, 10)) as pilot:
tree = app.query_one("#sidebar", ChannelTree)
tree.set_state(_sample_channels(), {})
tree._focused_idx = 1 # Alpha (channel_id=1)
tree.focus()
await pilot.pause()
await pilot.press("enter")
await pilot.pause()
assert 1 in app.selected_messages