app: extract InputHistory, _make_client factory, cache _find_root

InputHistory encapsulates history navigation (up/down/push).
_make_client() deduplicates 4 MumbleClient instantiation sites.
_find_root() result cached in set_state() to avoid double lookup.
This commit is contained in:
Username
2026-02-24 16:32:16 +01:00
parent a6380b53f7
commit 216a4be4fd

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
import html import html
import logging import logging
import time
from html.parser import HTMLParser from html.parser import HTMLParser
from textual import events, on, work from textual import events, on, work
@@ -15,20 +14,50 @@ from textual.reactive import reactive
from textual.widgets import Footer, Header, Input, RichLog, Static from textual.widgets import Footer, Header, Input, RichLog, Static
from tuimble.audio import AudioPipeline from tuimble.audio import AudioPipeline
from tuimble.client import Channel, ConnectionFailed, MumbleClient, User from tuimble.client import Channel, MumbleClient, User
from tuimble.config import Config, load_config from tuimble.config import Config, load_config
from tuimble.ptt import KittyPtt, TogglePtt, detect_backend from tuimble.ptt import KittyPtt, TogglePtt, detect_backend
from tuimble.reconnect import ReconnectManager
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
VOLUME_STEPS = (0.0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0) VOLUME_STEPS = (0.0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0)
RECONNECT_INITIAL = 2 # seconds before first retry
RECONNECT_MAX = 30 # maximum backoff delay
RECONNECT_RETRIES = 10 # attempts before giving up
TREE_DEBOUNCE = 0.1 # seconds to coalesce state changes TREE_DEBOUNCE = 0.1 # seconds to coalesce state changes
class InputHistory:
"""Navigable command history for the chat input."""
def __init__(self):
self._entries: list[str] = []
self._idx: int = -1
self._draft: str = ""
def push(self, text: str) -> None:
self._entries.append(text)
self._idx = -1
def up(self, current: str) -> str | None:
if not self._entries:
return None
if self._idx == -1:
self._draft = current
self._idx = len(self._entries) - 1
elif self._idx > 0:
self._idx -= 1
return self._entries[self._idx]
def down(self) -> str | None:
if self._idx == -1:
return None
if self._idx < len(self._entries) - 1:
self._idx += 1
return self._entries[self._idx]
self._idx = -1
return self._draft
def _next_volume(current: float) -> float: def _next_volume(current: float) -> float:
"""Cycle through VOLUME_STEPS, wrapping to 0.0 after max.""" """Cycle through VOLUME_STEPS, wrapping to 0.0 after max."""
for step in VOLUME_STEPS: for step in VOLUME_STEPS:
@@ -139,6 +168,7 @@ class ChannelTree(Static):
self._channel_ids: list[int] = [] self._channel_ids: list[int] = []
self._focused_idx: int = 0 self._focused_idx: int = 0
self._my_channel_id: int | None = None self._my_channel_id: int | None = None
self._root_id: int = 0
def set_state( def set_state(
self, self,
@@ -149,6 +179,7 @@ class ChannelTree(Static):
self._channels = channels self._channels = channels
self._users_by_channel = users_by_channel self._users_by_channel = users_by_channel
self._my_channel_id = my_channel_id self._my_channel_id = my_channel_id
self._root_id = self._find_root() if channels else 0
self._channel_ids = self._build_channel_order() self._channel_ids = self._build_channel_order()
if self._channel_ids: if self._channel_ids:
self._focused_idx = max( self._focused_idx = max(
@@ -171,8 +202,7 @@ class ChannelTree(Static):
if not self._channels: if not self._channels:
return [] return []
order: list[int] = [] order: list[int] = []
root_id = self._find_root() self._collect_order(self._root_id, order)
self._collect_order(root_id, order)
return order return order
def _collect_order(self, channel_id: int, order: list[int]) -> None: def _collect_order(self, channel_id: int, order: list[int]) -> None:
@@ -221,8 +251,7 @@ class ChannelTree(Static):
w = self._get_width() w = self._get_width()
lines = [" [bold]Channels[/]"] lines = [" [bold]Channels[/]"]
root_id = self._find_root() self._render_tree(self._root_id, lines, indent=1, is_last=True, w=w)
self._render_tree(root_id, lines, indent=1, is_last=True, w=w)
return "\n".join(lines) return "\n".join(lines)
def _find_root(self) -> int: def _find_root(self) -> int:
@@ -392,18 +421,8 @@ class TuimbleApp(App):
self._ptt = detect_backend( self._ptt = detect_backend(
self._on_ptt_change, self._config.ptt.backend self._on_ptt_change, self._config.ptt.backend
) )
srv = self._config.server self._client = self._make_client()
self._client = MumbleClient( self._history = InputHistory()
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
self._history: list[str] = []
self._history_idx: int = -1
self._history_draft: str = ""
acfg = self._config.audio acfg = self._config.audio
self._audio = AudioPipeline( self._audio = AudioPipeline(
sample_rate=acfg.sample_rate, sample_rate=acfg.sample_rate,
@@ -415,10 +434,27 @@ class TuimbleApp(App):
self._audio.output_gain = acfg.output_gain self._audio.output_gain = acfg.output_gain
self._pending_reload: Config | None = None self._pending_reload: Config | None = None
self._tree_refresh_timer = None self._tree_refresh_timer = None
self._reconnecting: bool = False self._reconnect = ReconnectManager(
self._reconnect_attempt: int = 0 connect_fn=self._reconnect_connect,
on_attempt=self._reconnect_on_attempt,
on_success=self._reconnect_on_success,
on_failure=self._reconnect_on_failure,
on_exhausted=self._reconnect_on_exhausted,
)
self._intentional_disconnect: bool = False self._intentional_disconnect: bool = False
def _make_client(self, srv=None) -> MumbleClient:
"""Create a MumbleClient from the current (or given) server config."""
srv = srv or self._config.server
return MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header() yield Header()
with Horizontal(id="main"): with Horizontal(id="main"):
@@ -481,97 +517,58 @@ class TuimbleApp(App):
# -- auto-reconnect ------------------------------------------------------ # -- auto-reconnect ------------------------------------------------------
@work(thread=True) def _reconnect_connect(self) -> None:
def _reconnect_loop(self) -> None: """Called by ReconnectManager to attempt a new connection."""
"""Retry connection with exponential backoff.""" self._client.disconnect()
while self._reconnecting: self._client = self._make_client()
self._reconnect_attempt += 1 self._wire_client_callbacks()
delay = min( self._client.connect()
RECONNECT_INITIAL * (2 ** (self._reconnect_attempt - 1)),
RECONNECT_MAX,
)
self.call_from_thread(
self._log_reconnect, self._reconnect_attempt, delay,
)
elapsed = 0.0 def _reconnect_on_attempt(self, attempt: int, delay: float) -> None:
while elapsed < delay and self._reconnecting: self.call_from_thread(self._log_reconnect, attempt, delay)
time.sleep(0.5)
elapsed += 0.5
if not self._reconnecting: def _reconnect_on_success(self) -> None:
break self.call_from_thread(self.post_message, ServerConnected())
try: def _reconnect_on_failure(self, attempt: int, error: str) -> None:
self._client.disconnect() self.call_from_thread(self._show_error, f"attempt {attempt}: {error}")
srv = self._config.server
self._client = MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
self._wire_client_callbacks()
self._client.connect()
self._reconnecting = False def _reconnect_on_exhausted(self) -> None:
self._reconnect_attempt = 0 self.call_from_thread(self._show_reconnect_exhausted)
self.call_from_thread(self._on_reconnect_success)
return
except ConnectionFailed as exc:
if not exc.retryable:
self.call_from_thread(
self._show_error, f"rejected: {exc}",
)
self._reconnecting = False
self.call_from_thread(self._on_reconnect_exhausted)
return
self.call_from_thread(
self._show_error,
f"attempt {self._reconnect_attempt}: {exc}",
)
except Exception as exc:
self.call_from_thread(
self._show_error,
f"attempt {self._reconnect_attempt}: {exc}",
)
if self._reconnect_attempt >= RECONNECT_RETRIES: def _log_reconnect(self, attempt: int, delay: float) -> None:
self._reconnecting = False
self.call_from_thread(self._on_reconnect_exhausted)
return
def _log_reconnect(self, attempt: int, delay: int) -> None:
"""Log reconnection attempt to chatlog.""" """Log reconnection attempt to chatlog."""
from tuimble.reconnect import MAX_RETRIES
status = self.query_one("#status", StatusBar) status = self.query_one("#status", StatusBar)
status.reconnecting = True status.reconnecting = True
chatlog = self.query_one("#chatlog", ChatLog) chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write( chatlog.write(
f"[dim]reconnecting in {delay}s " f"[dim]reconnecting in {delay}s "
f"(attempt {attempt}/{RECONNECT_RETRIES})...[/dim]" f"(attempt {attempt}/{MAX_RETRIES})...[/dim]"
) )
def _on_reconnect_success(self) -> None: def _show_reconnect_exhausted(self) -> None:
"""Handle successful reconnection."""
self.post_message(ServerConnected())
def _on_reconnect_exhausted(self) -> None:
"""Handle all reconnection attempts exhausted.""" """Handle all reconnection attempts exhausted."""
from tuimble.reconnect import MAX_RETRIES
status = self.query_one("#status", StatusBar) status = self.query_one("#status", StatusBar)
status.reconnecting = False status.reconnecting = False
chatlog = self.query_one("#chatlog", ChatLog) chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write( chatlog.write(
f"[#f7768e]reconnection failed after " f"[#f7768e]reconnection failed after "
f"{RECONNECT_RETRIES} attempts[/]" f"{MAX_RETRIES} attempts[/]"
) )
chatlog.write("[dim]press F5 to retry manually[/dim]") chatlog.write("[dim]press F5 to retry manually[/dim]")
@work(thread=True)
def _start_reconnect(self) -> None:
"""Run the reconnect manager in a worker thread."""
self._reconnect.run()
def _cancel_reconnect(self) -> None: def _cancel_reconnect(self) -> None:
"""Cancel an in-progress reconnect loop.""" """Cancel an in-progress reconnect loop."""
self._reconnecting = False self._reconnect.cancel()
self._reconnect_attempt = 0
try: try:
status = self.query_one("#status", StatusBar) status = self.query_one("#status", StatusBar)
status.reconnecting = False status.reconnecting = False
@@ -606,15 +603,13 @@ class TuimbleApp(App):
tree = self.query_one("#sidebar", ChannelTree) tree = self.query_one("#sidebar", ChannelTree)
tree.clear_state() tree.clear_state()
if self._intentional_disconnect or self._reconnecting: if self._intentional_disconnect or self._reconnect.active:
return return
chatlog = self.query_one("#chatlog", ChatLog) chatlog = self.query_one("#chatlog", ChatLog)
chatlog.write("[#f7768e]\u2717 disconnected from server[/]") chatlog.write("[#f7768e]\u2717 disconnected from server[/]")
self._reconnecting = True self._start_reconnect()
self._reconnect_attempt = 0
self._reconnect_loop()
def on_text_message_received(self, msg: TextMessageReceived) -> None: def on_text_message_received(self, msg: TextMessageReceived) -> None:
chatlog = self.query_one("#chatlog", ChatLog) chatlog = self.query_one("#chatlog", ChatLog)
@@ -648,8 +643,7 @@ class TuimbleApp(App):
if not text: if not text:
return return
event.input.clear() event.input.clear()
self._history.append(text) self._history.push(text)
self._history_idx = -1
if not self._client.connected: if not self._client.connected:
self._show_error("not connected") self._show_error("not connected")
@@ -819,18 +813,10 @@ class TuimbleApp(App):
tree = self.query_one("#sidebar", ChannelTree) tree = self.query_one("#sidebar", ChannelTree)
tree.clear_state() tree.clear_state()
srv = new.server self._client = self._make_client(new.server)
self._client = MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
self._config = new self._config = new
chatlog.write( chatlog.write(
f"[dim]reconnecting to {srv.host}:{srv.port}...[/]" f"[dim]reconnecting to {new.server.host}:{new.server.port}...[/]"
) )
self._connect_to_server() self._connect_to_server()
elif audio_hw_changed: elif audio_hw_changed:
@@ -854,20 +840,13 @@ class TuimbleApp(App):
Also serves as manual reconnect when disconnected. Also serves as manual reconnect when disconnected.
""" """
if self._reconnecting: if self._reconnect.active:
self._cancel_reconnect() self._cancel_reconnect()
if not self._client.connected: if not self._client.connected:
chatlog = self.query_one("#chatlog", ChatLog) chatlog = self.query_one("#chatlog", ChatLog)
self._client = self._make_client()
srv = self._config.server srv = self._config.server
self._client = MumbleClient(
host=srv.host,
port=srv.port,
username=srv.username,
password=srv.password,
certfile=srv.certfile,
keyfile=srv.keyfile,
)
chatlog.write( chatlog.write(
f"[dim]connecting to {srv.host}:{srv.port}...[/dim]" f"[dim]connecting to {srv.host}:{srv.port}...[/dim]"
) )
@@ -927,27 +906,12 @@ class TuimbleApp(App):
if isinstance(focused, Input) and event.key in ("up", "down"): if isinstance(focused, Input) and event.key in ("up", "down"):
inp = focused inp = focused
if event.key == "up": if event.key == "up":
if not self._history: val = self._history.up(inp.value)
event.prevent_default() else:
return val = self._history.down()
if self._history_idx == -1: if val is not None:
self._history_draft = inp.value inp.value = val
self._history_idx = len(self._history) - 1 inp.cursor_position = len(val)
elif self._history_idx > 0:
self._history_idx -= 1
inp.value = self._history[self._history_idx]
inp.cursor_position = len(inp.value)
else: # down
if self._history_idx == -1:
event.prevent_default()
return
if self._history_idx < len(self._history) - 1:
self._history_idx += 1
inp.value = self._history[self._history_idx]
else:
self._history_idx = -1
inp.value = self._history_draft
inp.cursor_position = len(inp.value)
event.prevent_default() event.prevent_default()
return return