Files
tuimble/src/tuimble/app.py

966 lines
32 KiB
Python

"""TUI application built on Textual."""
from __future__ import annotations
import dataclasses
import html
import logging
from html.parser import HTMLParser
from textual import events, on, work
from textual.app import App, ComposeResult
from textual.containers import Horizontal, Vertical
from textual.message import Message
from textual.reactive import reactive
from textual.widgets import Footer, Header, Input, RichLog, Static
from tuimble.audio import AudioPipeline
from tuimble.client import Channel, MumbleClient, User
from tuimble.config import Config, load_config
from tuimble.ptt import KittyPtt, TogglePtt, detect_backend
from tuimble.reconnect import ReconnectManager
log = logging.getLogger(__name__)
VOLUME_STEPS = (0.0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0)
TREE_DEBOUNCE = 0.1 # seconds to coalesce state changes
class InputHistory:
"""Navigable command history for the chat input."""
def __init__(self):
self._entries: list[str] = []
self._idx: int = -1
self._draft: str = ""
def push(self, text: str) -> None:
self._entries.append(text)
self._idx = -1
def up(self, current: str) -> str | None:
if not self._entries:
return None
if self._idx == -1:
self._draft = current
self._idx = len(self._entries) - 1
elif self._idx > 0:
self._idx -= 1
return self._entries[self._idx]
def down(self) -> str | None:
if self._idx == -1:
return None
if self._idx < len(self._entries) - 1:
self._idx += 1
return self._entries[self._idx]
self._idx = -1
return self._draft
def _next_volume(current: float) -> float:
"""Cycle through VOLUME_STEPS, wrapping to 0.0 after max."""
for step in VOLUME_STEPS:
if step > current + 0.01:
return step
return VOLUME_STEPS[0]
# -- custom messages (pymumble thread -> Textual) ----------------------------
class ServerConnected(Message):
pass
class ServerDisconnected(Message):
pass
class TextMessageReceived(Message):
def __init__(self, sender: str, text: str) -> None:
super().__init__()
self.sender = sender
self.text = text
class ServerStateChanged(Message):
"""Channel or user list changed on the server."""
pass
class ChannelSelected(Message):
"""User selected a channel to join."""
def __init__(self, channel_id: int) -> None:
super().__init__()
self.channel_id = channel_id
# -- widgets -----------------------------------------------------------------
class StatusBar(Static):
"""Connection and PTT status indicator."""
ptt_active = reactive(False)
connected = reactive(False)
reconnecting = reactive(False)
self_deaf = reactive(False)
server_info = reactive("")
output_vol = reactive(100)
input_vol = reactive(100)
@staticmethod
def _vol_bar(pct: int) -> str:
"""Compact 4-char volume indicator using block chars."""
filled = round(pct / 25)
return "\u2588" * filled + "\u2591" * (4 - filled)
def render(self) -> str:
w = self.content_size.width if self.content_size.width > 0 else 80
if self.connected:
conn_sym = "[#9ece6a]\u25cf[/]"
conn_full = f"{conn_sym} connected"
elif self.reconnecting:
conn_sym = "[#e0af68]\u25d0[/]"
conn_full = f"{conn_sym} reconnecting"
else:
conn_sym = "[#f7768e]\u25cb[/]"
conn_full = f"{conn_sym} disconnected"
if self.ptt_active:
ptt_sym = "[#e0af68]\u25cf[/]"
ptt_full = f"{ptt_sym} [bold]TX[/bold]"
else:
ptt_sym = "[#565f89]\u25cb[/]"
ptt_full = f"{ptt_sym} idle"
deaf_sym = "[#f7768e]\u2298[/]" if self.self_deaf else ""
deaf_full = "[#f7768e]\u2298[/] deaf" if self.self_deaf else ""
if w < 40:
return f" {conn_sym} {deaf_sym}{ptt_sym}"
if w < 60:
return f" {conn_full} {deaf_full}{' ' if deaf_full else ''}{ptt_full}"
vol = (
f" [dim]out[/]{self._vol_bar(self.output_vol)}"
f" [dim]in[/]{self._vol_bar(self.input_vol)}"
)
info = f" [dim]{self.server_info}[/]" if self.server_info else ""
deaf = f"{deaf_full} " if deaf_full else ""
return f" {conn_full} {deaf}{ptt_full}{vol}{info}"
class ChannelTree(Static):
"""Channel and user list with keyboard navigation."""
DEFAULT_WIDTH = 24
can_focus = True
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self._channels: dict[int, Channel] = {}
self._users_by_channel: dict[int, list[User]] = {}
self._channel_ids: list[int] = []
self._focused_idx: int = 0
self._my_channel_id: int | None = None
self._root_id: int = 0
def set_state(
self,
channels: dict[int, Channel],
users_by_channel: dict[int, list[User]],
my_channel_id: int | None = None,
) -> None:
self._channels = channels
self._users_by_channel = users_by_channel
self._my_channel_id = my_channel_id
self._root_id = self._find_root() if channels else 0
self._channel_ids = self._build_channel_order()
if self._channel_ids:
self._focused_idx = max(
0, min(self._focused_idx, len(self._channel_ids) - 1)
)
else:
self._focused_idx = 0
self.refresh()
def clear_state(self) -> None:
self._channels = {}
self._users_by_channel = {}
self._channel_ids = []
self._focused_idx = 0
self._my_channel_id = None
self.refresh()
def _build_channel_order(self) -> list[int]:
"""Build flat list of channel IDs in tree order."""
if not self._channels:
return []
order: list[int] = []
self._collect_order(self._root_id, order)
return order
def _collect_order(self, channel_id: int, order: list[int]) -> None:
ch = self._channels.get(channel_id)
if ch is None:
return
order.append(channel_id)
children = sorted(
(c for c in self._channels.values()
if c.parent_id == channel_id and c.channel_id != channel_id),
key=lambda c: c.name,
)
for child in children:
self._collect_order(child.channel_id, order)
def _get_width(self) -> int:
"""Usable character width (content area, excludes padding/border)."""
w = self.content_size.width
if w <= 0:
w = self.DEFAULT_WIDTH
return max(w, 12)
@staticmethod
def _truncate(text: str, max_width: int) -> str:
"""Clip text with ellipsis if it exceeds max_width."""
if max_width < 1:
return ""
if len(text) <= max_width:
return text
if max_width <= 3:
return text[:max_width]
return text[: max_width - 3] + "..."
@staticmethod
def _user_status(user: User) -> str:
"""Return status indicator for a user."""
if user.self_deaf or user.deaf:
return " [#f7768e]\u2298[/]"
if user.self_mute or user.mute:
return " [#e0af68]\u2715[/]"
return ""
def render(self) -> str:
if not self._channels:
return " Channels\n [dim]\u2514\u2500 (not connected)[/]"
w = self._get_width()
lines = [" [bold]Channels[/]"]
self._render_tree(self._root_id, lines, indent=1, is_last=True, w=w)
return "\n".join(lines)
def _find_root(self) -> int:
for cid, ch in self._channels.items():
if ch.parent_id == -1 or cid == 0:
return cid
return min(self._channels) if self._channels else 0
def _render_tree(
self,
channel_id: int,
lines: list[str],
indent: int,
is_last: bool,
w: int,
) -> None:
ch = self._channels.get(channel_id)
if ch is None:
return
prefix = " " * indent
branch = "\u2514\u2500" if is_last else "\u251c\u2500"
is_current = channel_id == self._my_channel_id
is_focused = (
self.has_focus
and self._channel_ids
and self._focused_idx < len(self._channel_ids)
and self._channel_ids[self._focused_idx] == channel_id
)
marker = "\u25cf " if is_current else " "
# indent + 2 branch + 1 space + 2 marker
name_max = w - indent - 5
name = self._truncate(ch.name, name_max)
if is_focused:
lines.append(
f"{prefix}{branch} {marker}[reverse bold]{name}[/]"
)
elif is_current:
lines.append(
f"{prefix}{branch} {marker}[bold #9ece6a]{name}[/]"
)
else:
lines.append(f"{prefix}{branch} {marker}[bold]{name}[/]")
users = self._users_by_channel.get(channel_id, [])
children = [
c
for c in sorted(self._channels.values(), key=lambda c: c.name)
if c.parent_id == channel_id and c.channel_id != channel_id
]
sub_indent = indent + 2
sub_prefix = " " * sub_indent
# sub_indent + 2 bullet chars + 1 space
user_max = w - sub_indent - 5
for i, user in enumerate(users):
is_last_item = i == len(users) - 1 and not children
bullet = "\u2514\u2500" if is_last_item else "\u251c\u2500"
uname = self._truncate(user.name, user_max)
status = self._user_status(user)
lines.append(
f"{sub_prefix}{bullet} [#7aa2f7]{uname}[/]{status}"
)
for i, child in enumerate(children):
self._render_tree(
child.channel_id,
lines,
indent + 2,
is_last=i == len(children) - 1,
w=w,
)
def on_key(self, event: events.Key) -> None:
if not self._channel_ids:
return
if event.key == "up":
self._focused_idx = max(0, self._focused_idx - 1)
self.refresh()
event.prevent_default()
event.stop()
elif event.key == "down":
self._focused_idx = min(
len(self._channel_ids) - 1, self._focused_idx + 1
)
self.refresh()
event.prevent_default()
event.stop()
elif event.key == "enter":
cid = self._channel_ids[self._focused_idx]
self.post_message(ChannelSelected(cid))
event.prevent_default()
event.stop()
class ChatLog(RichLog):
"""Message log."""
def __init__(self, **kwargs) -> None:
super().__init__(markup=True, wrap=True, **kwargs)
# -- main app ----------------------------------------------------------------
class TuimbleApp(App):
"""Main application."""
TITLE = "tuimble"
CSS = """
Screen {
background: #1a1a2e;
color: #a9b1d6;
}
#main {
height: 1fr;
}
#sidebar {
width: 24;
border-right: solid #292e42;
padding: 0 1;
overflow-x: hidden;
}
#sidebar:focus {
border-right: solid #7aa2f7;
}
#chat-area {
width: 1fr;
}
#chatlog {
height: 1fr;
border-bottom: solid #292e42;
scrollbar-size: 1 1;
}
#input {
dock: bottom;
height: 3;
border-top: solid #292e42;
}
#status {
dock: bottom;
height: 1;
background: #16161e;
}
Footer {
background: #16161e;
}
Header {
background: #16161e;
}
"""
BINDINGS = [
("f1", "toggle_deaf", "Deafen"),
("f2", "cycle_output_volume", "Vol Out"),
("f3", "cycle_input_volume", "Vol In"),
("f5", "reload_config", "Reload"),
("q", "quit", "Quit"),
("ctrl+c", "quit", "Quit"),
]
def __init__(self):
super().__init__()
self._config: Config = load_config()
self._ptt = detect_backend(
self._on_ptt_change, self._config.ptt.backend
)
self._client = self._make_client()
self._history = InputHistory()
acfg = self._config.audio
self._audio = AudioPipeline(
sample_rate=acfg.sample_rate,
frame_size=acfg.frame_size,
input_device=acfg.input_device,
output_device=acfg.output_device,
)
self._audio.input_gain = acfg.input_gain
self._audio.output_gain = acfg.output_gain
self._pending_reload: Config | None = None
self._tree_refresh_timer = None
self._reconnect = ReconnectManager(
connect_fn=self._reconnect_connect,
on_attempt=self._reconnect_on_attempt,
on_success=self._reconnect_on_success,
on_failure=self._reconnect_on_failure,
on_exhausted=self._reconnect_on_exhausted,
)
self._intentional_disconnect: bool = False
def _make_client(self, srv=None) -> MumbleClient:
"""Create a MumbleClient from the current (or given) server config."""
srv = srv or self._config.server
return MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
def compose(self) -> ComposeResult:
yield Header()
with Horizontal(id="main"):
yield ChannelTree(id="sidebar")
with Vertical(id="chat-area"):
yield ChatLog(id="chatlog")
yield Input(placeholder="message...", id="input")
yield StatusBar(id="status")
yield Footer()
def on_mount(self) -> None:
status = self.query_one("#status", StatusBar)
status.output_vol = int(self._audio.output_gain * 100)
status.input_vol = int(self._audio.input_gain * 100)
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write("[dim]tuimble v0.1.0[/dim]")
srv = self._config.server
chatlog.write(f"[dim]connecting to {srv.host}:{srv.port}...[/dim]")
self._connect_to_server()
# -- server connection (worker thread) -----------------------------------
def _wire_client_callbacks(self) -> None:
"""Wire pymumble callbacks to message dispatchers."""
self._client.set_dispatcher(self.call_from_thread)
self._client.on_connected = self._cb_connected
self._client.on_disconnected = self._cb_disconnected
self._client.on_text_message = self._cb_text_message
self._client.on_user_update = self._cb_state_changed
self._client.on_channel_update = self._cb_state_changed
self._client.on_sound_received = self._cb_sound_received
@work(thread=True)
def _connect_to_server(self) -> None:
self._wire_client_callbacks()
try:
self._client.connect()
except Exception as exc:
self.call_from_thread(self._show_error, str(exc))
def _cb_connected(self) -> None:
self.post_message(ServerConnected())
def _cb_disconnected(self) -> None:
self.post_message(ServerDisconnected())
def _cb_text_message(self, sender: str, text: str) -> None:
self.post_message(TextMessageReceived(sender, text))
def _cb_state_changed(self) -> None:
self.post_message(ServerStateChanged())
def _cb_sound_received(self, _user, pcm_data: bytes) -> None:
self._audio.queue_playback(pcm_data)
def _show_error(self, text: str) -> None:
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(f"[#f7768e]\u2717 {text}[/]")
# -- auto-reconnect ------------------------------------------------------
def _reconnect_connect(self) -> None:
"""Called by ReconnectManager to attempt a new connection."""
self._client.disconnect()
self._client = self._make_client()
self._wire_client_callbacks()
self._client.connect()
def _reconnect_on_attempt(self, attempt: int, delay: float) -> None:
self.call_from_thread(self._log_reconnect, attempt, delay)
def _reconnect_on_success(self) -> None:
self.call_from_thread(self.post_message, ServerConnected())
def _reconnect_on_failure(self, attempt: int, error: str) -> None:
self.call_from_thread(self._show_error, f"attempt {attempt}: {error}")
def _reconnect_on_exhausted(self) -> None:
self.call_from_thread(self._show_reconnect_exhausted)
def _log_reconnect(self, attempt: int, delay: float) -> None:
"""Log reconnection attempt to chatlog."""
from tuimble.reconnect import MAX_RETRIES
status = self.query_one("#status", StatusBar)
status.reconnecting = True
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[dim]reconnecting in {delay}s "
f"(attempt {attempt}/{MAX_RETRIES})...[/dim]"
)
def _show_reconnect_exhausted(self) -> None:
"""Handle all reconnection attempts exhausted."""
from tuimble.reconnect import MAX_RETRIES
status = self.query_one("#status", StatusBar)
status.reconnecting = False
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[#f7768e]reconnection failed after "
f"{MAX_RETRIES} attempts[/]"
)
chatlog.write("[dim]press F5 to retry manually[/dim]")
@work(thread=True)
def _start_reconnect(self) -> None:
"""Run the reconnect manager in a worker thread."""
self._reconnect.run()
def _cancel_reconnect(self) -> None:
"""Cancel an in-progress reconnect loop."""
self._reconnect.cancel()
try:
status = self.query_one("#status", StatusBar)
status.reconnecting = False
except Exception:
pass
# -- message handlers ----------------------------------------------------
def on_server_connected(self, _msg: ServerConnected) -> None:
self._intentional_disconnect = False
status = self.query_one("#status", StatusBar)
status.reconnecting = False
status.connected = True
srv = self._config.server
status.server_info = f"{srv.host}:{srv.port}"
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[#9ece6a]\u2713 connected as {self._config.server.username}[/]"
)
self._refresh_channel_tree()
self._start_audio()
def on_server_disconnected(self, _msg: ServerDisconnected) -> None:
self._audio.stop()
status = self.query_one("#status", StatusBar)
status.connected = False
status.server_info = ""
tree = self.query_one("#sidebar", ChannelTree)
tree.clear_state()
if self._intentional_disconnect or self._reconnect.active:
return
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write("[#f7768e]\u2717 disconnected from server[/]")
self._start_reconnect()
def on_text_message_received(self, msg: TextMessageReceived) -> None:
chatlog = self.query_one("#chatlog", ChatLog)
# Strip HTML tags from Mumble messages
clean = _strip_html(msg.text)
chatlog.write(f"[#7aa2f7]{msg.sender}[/] {clean}")
def on_server_state_changed(self, _msg: ServerStateChanged) -> None:
if self._tree_refresh_timer is not None:
self._tree_refresh_timer.stop()
self._tree_refresh_timer = self.set_timer(
TREE_DEBOUNCE, self._refresh_channel_tree,
)
def on_channel_selected(self, msg: ChannelSelected) -> None:
if not self._client.connected:
return
ch = self._client.channels.get(msg.channel_id)
name = ch.name if ch else str(msg.channel_id)
try:
self._client.join_channel(msg.channel_id)
except Exception as exc:
self._show_error(f"join failed: {exc}")
return
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(f"[dim]\u2192 joined {name}[/dim]")
@on(Input.Submitted, "#input")
def on_input_submitted(self, event: Input.Submitted) -> None:
text = event.value.strip()
if not text:
return
event.input.clear()
self._history.push(text)
if not self._client.connected:
self._show_error("not connected")
return
self._client.send_text(text)
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(
f"[#e0af68]{self._config.server.username}[/] {text}"
)
# -- audio ---------------------------------------------------------------
def _start_audio(self) -> None:
"""Start audio pipeline; log error if hardware unavailable."""
chatlog = self.query_one("#chatlog", ChatLog)
try:
self._audio.start()
chatlog.write("[dim]audio pipeline started[/dim]")
self._audio_send_loop()
except Exception as exc:
chatlog.write(f"[#f7768e]\u2717 audio: {exc}[/]")
log.warning("audio start failed: %s", exc)
@work(thread=True)
def _audio_send_loop(self) -> None:
"""Send captured audio frames to the server."""
while self._client.connected:
frame = self._audio.get_capture_frame(timeout=0.02)
if frame is not None:
self._client.send_audio(frame)
# -- channel tree --------------------------------------------------------
def _refresh_channel_tree(self) -> None:
if not self._client.connected:
return
channels = self._client.channels
users = self._client.users
users_by_channel: dict[int, list[User]] = {}
for u in users.values():
users_by_channel.setdefault(u.channel_id, []).append(u)
for lst in users_by_channel.values():
lst.sort(key=lambda u: u.name)
tree = self.query_one("#sidebar", ChannelTree)
tree.set_state(channels, users_by_channel, self._client.my_channel_id)
# -- deafen --------------------------------------------------------------
def action_toggle_deaf(self) -> None:
"""Toggle self-deafen."""
deafened = not self._audio.deafened
self._audio.deafened = deafened
self._client.set_self_deaf(deafened)
status = self.query_one("#status", StatusBar)
status.self_deaf = deafened
chatlog = self.query_one("#chatlog", ChatLog)
if deafened:
chatlog.write("[#f7768e]\u2298 deafened[/]")
else:
chatlog.write("[#9ece6a]\u2713 undeafened[/]")
# -- volume ---------------------------------------------------------------
def action_cycle_output_volume(self) -> None:
"""Cycle output volume through preset steps."""
vol = _next_volume(self._audio.output_gain)
self._audio.output_gain = vol
pct = int(vol * 100)
status = self.query_one("#status", StatusBar)
status.output_vol = pct
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(f"[dim]output volume {pct}%[/dim]")
def action_cycle_input_volume(self) -> None:
"""Cycle input volume through preset steps."""
vol = _next_volume(self._audio.input_gain)
self._audio.input_gain = vol
pct = int(vol * 100)
status = self.query_one("#status", StatusBar)
status.input_vol = pct
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write(f"[dim]input volume {pct}%[/dim]")
# -- config reload --------------------------------------------------------
def _detect_config_changes(
self, old: Config, new: Config,
) -> tuple[list[str], list[str]]:
"""Compare configs, return (safe_changes, restart_changes)."""
safe: list[str] = []
restart: list[str] = []
# PTT: all fields are safe to hot-reload
old_ptt = dataclasses.asdict(old.ptt)
new_ptt = dataclasses.asdict(new.ptt)
for key in old_ptt:
if old_ptt[key] != new_ptt[key]:
safe.append(f"ptt {key}: {old_ptt[key]} -> {new_ptt[key]}")
# Audio: gains are safe; hardware settings require restart
safe_audio = {"input_gain", "output_gain"}
old_aud = dataclasses.asdict(old.audio)
new_aud = dataclasses.asdict(new.audio)
for key in old_aud:
if old_aud[key] != new_aud[key]:
if key in safe_audio:
safe.append(f"{key}: {old_aud[key]} -> {new_aud[key]}")
else:
restart.append(f"audio.{key} changed")
# Server: all changes require restart
old_srv = dataclasses.asdict(old.server)
new_srv = dataclasses.asdict(new.server)
for key in old_srv:
if old_srv[key] != new_srv[key]:
label = "password" if key == "password" else key
restart.append(f"server.{label} changed")
return safe, restart
def _apply_safe_changes(self, new: Config) -> None:
"""Apply hot-reload-safe config changes immediately."""
self._config.ptt = new.ptt
self._ptt = detect_backend(
self._on_ptt_change, new.ptt.backend
)
self._audio.input_gain = new.audio.input_gain
self._audio.output_gain = new.audio.output_gain
status = self.query_one("#status", StatusBar)
status.input_vol = int(new.audio.input_gain * 100)
status.output_vol = int(new.audio.output_gain * 100)
def _apply_restart_changes(self, new: Config) -> None:
"""Apply changes that require reconnect/audio restart."""
chatlog = self.query_one("#chatlog", ChatLog)
old = self._config
server_changed = (
dataclasses.asdict(old.server) != dataclasses.asdict(new.server)
)
audio_hw_changed = any(
getattr(old.audio, a) != getattr(new.audio, a)
for a in ("input_device", "output_device", "sample_rate")
)
if server_changed:
self._intentional_disconnect = True
self._cancel_reconnect()
self._audio.stop()
self._client.set_dispatcher(None)
self._client.disconnect()
tree = self.query_one("#sidebar", ChannelTree)
tree.clear_state()
self._client = self._make_client(new.server)
self._config = new
chatlog.write(
f"[dim]reconnecting to {new.server.host}:{new.server.port}...[/]"
)
self._connect_to_server()
elif audio_hw_changed:
self._audio.stop()
acfg = new.audio
self._audio = AudioPipeline(
sample_rate=acfg.sample_rate,
frame_size=acfg.frame_size,
input_device=acfg.input_device,
output_device=acfg.output_device,
)
self._audio.input_gain = acfg.input_gain
self._audio.output_gain = acfg.output_gain
self._config = new
if self._client.connected:
self._start_audio()
chatlog.write("[dim]audio pipeline restarted[/dim]")
def action_reload_config(self) -> None:
"""Reload config from disk (F5).
Also serves as manual reconnect when disconnected.
"""
if self._reconnect.active:
self._cancel_reconnect()
if not self._client.connected:
chatlog = self.query_one("#chatlog", ChatLog)
self._client = self._make_client()
srv = self._config.server
chatlog.write(
f"[dim]connecting to {srv.host}:{srv.port}...[/dim]"
)
self._intentional_disconnect = False
self._connect_to_server()
return
chatlog = self.query_one("#chatlog", ChatLog)
if self._pending_reload is not None:
new = self._pending_reload
self._pending_reload = None
self._apply_restart_changes(new)
return
try:
new = load_config()
except Exception as exc:
self._show_error(f"config reload: {exc}")
return
safe, restart = self._detect_config_changes(self._config, new)
if not safe and not restart:
chatlog.write("[dim]config unchanged[/dim]")
return
self._apply_safe_changes(new)
self._config.audio.input_gain = new.audio.input_gain
self._config.audio.output_gain = new.audio.output_gain
if safe:
for change in safe:
chatlog.write(f"[dim]\u2713 {change}[/dim]")
if restart:
for change in restart:
chatlog.write(f"[#e0af68]\u26a0 {change}[/]")
chatlog.write(
"[dim]press F5 again to apply, "
"or any key to cancel[/dim]"
)
self._pending_reload = new
else:
chatlog.write("[dim]\u2713 config reloaded[/dim]")
# -- PTT -----------------------------------------------------------------
def on_key(self, event: events.Key) -> None:
"""Handle input history navigation and PTT key events."""
if self._pending_reload is not None and event.key != "f5":
self._pending_reload = None
chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write("[dim]reload cancelled[/dim]")
focused = self.focused
if isinstance(focused, Input) and event.key in ("up", "down"):
inp = focused
if event.key == "up":
val = self._history.up(inp.value)
else:
val = self._history.down()
if val is not None:
inp.value = val
inp.cursor_position = len(val)
event.prevent_default()
return
if event.key != self._config.ptt.key:
return
if isinstance(self._ptt, TogglePtt):
self._ptt.toggle()
elif isinstance(self._ptt, KittyPtt):
# Kitty hold-mode requires key-release events that Textual
# does not expose; kept for explicit backend="kitty" only.
self._ptt.key_down()
def _on_ptt_change(self, transmitting: bool) -> None:
self._audio.capturing = transmitting
status = self.query_one("#status", StatusBar)
status.ptt_active = transmitting
if self._config.ptt.mode == "hold":
return
chatlog = self.query_one("#chatlog", ChatLog)
if transmitting:
chatlog.write("[#e0af68]● transmitting[/]")
else:
chatlog.write("[dim]○ stopped transmitting[/dim]")
# -- resize --------------------------------------------------------------
def on_resize(self, event: events.Resize) -> None:
"""Adapt layout to new terminal dimensions."""
sidebar = self.query_one("#sidebar", ChannelTree)
sidebar.styles.width = max(16, min(32, event.size.width // 4))
self.query_one("#status", StatusBar).refresh()
# -- lifecycle -----------------------------------------------------------
def action_quit(self) -> None:
self._intentional_disconnect = True
self._cancel_reconnect()
self._audio.stop()
self._client.set_dispatcher(None)
self._client.disconnect()
self.exit()
class _HTMLStripper(HTMLParser):
"""Extract text content from HTML, discarding all tags."""
def __init__(self):
super().__init__()
self._parts: list[str] = []
def handle_data(self, data: str):
self._parts.append(data)
def get_text(self) -> str:
return "".join(self._parts)
def _strip_html(text: str) -> str:
"""Remove HTML tags and unescape entities from Mumble messages."""
stripper = _HTMLStripper()
stripper.feed(text)
return html.unescape(stripper.get_text())