fix lint and formatting violations in tests and source
This commit is contained in:
@@ -24,7 +24,7 @@ log = logging.getLogger(__name__)
|
||||
|
||||
VOLUME_STEPS = (0.0, 0.25, 0.5, 0.75, 1.0, 1.25, 1.5, 2.0)
|
||||
|
||||
TREE_DEBOUNCE = 0.1 # seconds to coalesce state changes
|
||||
TREE_DEBOUNCE = 0.1 # seconds to coalesce state changes
|
||||
|
||||
|
||||
class InputHistory:
|
||||
@@ -212,8 +212,11 @@ class ChannelTree(Static):
|
||||
return
|
||||
order.append(channel_id)
|
||||
children = sorted(
|
||||
(c for c in self._channels.values()
|
||||
if c.parent_id == channel_id and c.channel_id != channel_id),
|
||||
(
|
||||
c
|
||||
for c in self._channels.values()
|
||||
if c.parent_id == channel_id and c.channel_id != channel_id
|
||||
),
|
||||
key=lambda c: c.name,
|
||||
)
|
||||
for child in children:
|
||||
@@ -289,13 +292,9 @@ class ChannelTree(Static):
|
||||
name = self._truncate(ch.name, name_max)
|
||||
|
||||
if is_focused:
|
||||
lines.append(
|
||||
f"{prefix}{branch} {marker}[reverse bold]{name}[/]"
|
||||
)
|
||||
lines.append(f"{prefix}{branch} {marker}[reverse bold]{name}[/]")
|
||||
elif is_current:
|
||||
lines.append(
|
||||
f"{prefix}{branch} {marker}[bold #9ece6a]{name}[/]"
|
||||
)
|
||||
lines.append(f"{prefix}{branch} {marker}[bold #9ece6a]{name}[/]")
|
||||
else:
|
||||
lines.append(f"{prefix}{branch} {marker}[bold]{name}[/]")
|
||||
|
||||
@@ -315,9 +314,7 @@ class ChannelTree(Static):
|
||||
bullet = "\u2514\u2500" if is_last_item else "\u251c\u2500"
|
||||
uname = self._truncate(user.name, user_max)
|
||||
status = self._user_status(user)
|
||||
lines.append(
|
||||
f"{sub_prefix}{bullet} [#7aa2f7]{uname}[/]{status}"
|
||||
)
|
||||
lines.append(f"{sub_prefix}{bullet} [#7aa2f7]{uname}[/]{status}")
|
||||
|
||||
for i, child in enumerate(children):
|
||||
self._render_tree(
|
||||
@@ -337,9 +334,7 @@ class ChannelTree(Static):
|
||||
event.prevent_default()
|
||||
event.stop()
|
||||
elif event.key == "down":
|
||||
self._focused_idx = min(
|
||||
len(self._channel_ids) - 1, self._focused_idx + 1
|
||||
)
|
||||
self._focused_idx = min(len(self._channel_ids) - 1, self._focused_idx + 1)
|
||||
self.refresh()
|
||||
event.prevent_default()
|
||||
event.stop()
|
||||
@@ -419,9 +414,7 @@ class TuimbleApp(App):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._config: Config = load_config()
|
||||
self._ptt = detect_backend(
|
||||
self._on_ptt_change, self._config.ptt.backend
|
||||
)
|
||||
self._ptt = detect_backend(self._on_ptt_change, self._config.ptt.backend)
|
||||
self._client = self._make_client()
|
||||
self._history = InputHistory()
|
||||
acfg = self._config.audio
|
||||
@@ -545,8 +538,7 @@ class TuimbleApp(App):
|
||||
status.reconnecting = True
|
||||
chatlog = self.query_one("#chatlog", ChatLog)
|
||||
chatlog.write(
|
||||
f"[dim]reconnecting in {delay}s "
|
||||
f"(attempt {attempt}/{MAX_RETRIES})...[/dim]"
|
||||
f"[dim]reconnecting in {delay}s (attempt {attempt}/{MAX_RETRIES})...[/dim]"
|
||||
)
|
||||
|
||||
def _show_reconnect_exhausted(self) -> None:
|
||||
@@ -556,10 +548,7 @@ class TuimbleApp(App):
|
||||
status = self.query_one("#status", StatusBar)
|
||||
status.reconnecting = False
|
||||
chatlog = self.query_one("#chatlog", ChatLog)
|
||||
chatlog.write(
|
||||
f"[#f7768e]reconnection failed after "
|
||||
f"{MAX_RETRIES} attempts[/]"
|
||||
)
|
||||
chatlog.write(f"[#f7768e]reconnection failed after {MAX_RETRIES} attempts[/]")
|
||||
chatlog.write("[dim]press F5 to retry manually[/dim]")
|
||||
|
||||
@work(thread=True)
|
||||
@@ -588,9 +577,7 @@ class TuimbleApp(App):
|
||||
status.server_info = f"{srv.host}:{srv.port}"
|
||||
|
||||
chatlog = self.query_one("#chatlog", ChatLog)
|
||||
chatlog.write(
|
||||
f"[#9ece6a]\u2713 connected as {self._config.server.username}[/]"
|
||||
)
|
||||
chatlog.write(f"[#9ece6a]\u2713 connected as {self._config.server.username}[/]")
|
||||
self._refresh_channel_tree()
|
||||
self._start_audio()
|
||||
|
||||
@@ -622,7 +609,8 @@ class TuimbleApp(App):
|
||||
if self._tree_refresh_timer is not None:
|
||||
self._tree_refresh_timer.stop()
|
||||
self._tree_refresh_timer = self.set_timer(
|
||||
TREE_DEBOUNCE, self._refresh_channel_tree,
|
||||
TREE_DEBOUNCE,
|
||||
self._refresh_channel_tree,
|
||||
)
|
||||
|
||||
def on_channel_selected(self, msg: ChannelSelected) -> None:
|
||||
@@ -652,9 +640,7 @@ class TuimbleApp(App):
|
||||
|
||||
self._client.send_text(text)
|
||||
chatlog = self.query_one("#chatlog", ChatLog)
|
||||
chatlog.write(
|
||||
f"[#e0af68]{self._config.server.username}[/] {text}"
|
||||
)
|
||||
chatlog.write(f"[#e0af68]{self._config.server.username}[/] {text}")
|
||||
|
||||
# -- audio ---------------------------------------------------------------
|
||||
|
||||
@@ -732,7 +718,9 @@ class TuimbleApp(App):
|
||||
# -- config reload --------------------------------------------------------
|
||||
|
||||
def _detect_config_changes(
|
||||
self, old: Config, new: Config,
|
||||
self,
|
||||
old: Config,
|
||||
new: Config,
|
||||
) -> tuple[list[str], list[str]]:
|
||||
"""Compare configs, return (safe_changes, restart_changes)."""
|
||||
safe: list[str] = []
|
||||
@@ -769,9 +757,7 @@ class TuimbleApp(App):
|
||||
def _apply_safe_changes(self, new: Config) -> None:
|
||||
"""Apply hot-reload-safe config changes immediately."""
|
||||
self._config.ptt = new.ptt
|
||||
self._ptt = detect_backend(
|
||||
self._on_ptt_change, new.ptt.backend
|
||||
)
|
||||
self._ptt = detect_backend(self._on_ptt_change, new.ptt.backend)
|
||||
|
||||
self._audio.input_gain = new.audio.input_gain
|
||||
self._audio.output_gain = new.audio.output_gain
|
||||
@@ -784,8 +770,8 @@ class TuimbleApp(App):
|
||||
chatlog = self.query_one("#chatlog", ChatLog)
|
||||
old = self._config
|
||||
|
||||
server_changed = (
|
||||
dataclasses.asdict(old.server) != dataclasses.asdict(new.server)
|
||||
server_changed = dataclasses.asdict(old.server) != dataclasses.asdict(
|
||||
new.server
|
||||
)
|
||||
audio_hw_changed = any(
|
||||
getattr(old.audio, a) != getattr(new.audio, a)
|
||||
@@ -835,9 +821,7 @@ class TuimbleApp(App):
|
||||
chatlog = self.query_one("#chatlog", ChatLog)
|
||||
self._client = self._make_client()
|
||||
srv = self._config.server
|
||||
chatlog.write(
|
||||
f"[dim]connecting to {srv.host}:{srv.port}...[/dim]"
|
||||
)
|
||||
chatlog.write(f"[dim]connecting to {srv.host}:{srv.port}...[/dim]")
|
||||
self._intentional_disconnect = False
|
||||
self._connect_to_server()
|
||||
return
|
||||
@@ -873,10 +857,7 @@ class TuimbleApp(App):
|
||||
if restart:
|
||||
for change in restart:
|
||||
chatlog.write(f"[#e0af68]\u26a0 {change}[/]")
|
||||
chatlog.write(
|
||||
"[dim]press F5 again to apply, "
|
||||
"or any key to cancel[/dim]"
|
||||
)
|
||||
chatlog.write("[dim]press F5 again to apply, or any key to cancel[/dim]")
|
||||
self._pending_reload = new
|
||||
else:
|
||||
chatlog.write("[dim]\u2713 config reloaded[/dim]")
|
||||
|
||||
@@ -198,7 +198,8 @@ class MumbleClient:
|
||||
except (socket.error, OSError) as exc:
|
||||
self._connected = False
|
||||
raise ConnectionFailed(
|
||||
f"network error: {exc}", retryable=True,
|
||||
f"network error: {exc}",
|
||||
retryable=True,
|
||||
) from exc
|
||||
except Exception as exc:
|
||||
self._connected = False
|
||||
@@ -207,13 +208,16 @@ class MumbleClient:
|
||||
if self._mumble.connected != const.PYMUMBLE_CONN_STATE_CONNECTED:
|
||||
self._connected = False
|
||||
raise ConnectionFailed(
|
||||
"server rejected connection", retryable=False,
|
||||
"server rejected connection",
|
||||
retryable=False,
|
||||
)
|
||||
|
||||
self._connected = True
|
||||
log.info(
|
||||
"connected to %s:%d as %s",
|
||||
self._host, self._port, self._username,
|
||||
self._host,
|
||||
self._port,
|
||||
self._username,
|
||||
)
|
||||
|
||||
def disconnect(self) -> None:
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import struct
|
||||
|
||||
from tuimble.audio import FRAME_SIZE, SAMPLE_RATE, AudioPipeline, _apply_gain
|
||||
from tuimble.audio import FRAME_SIZE, AudioPipeline, _apply_gain
|
||||
|
||||
|
||||
def test_default_construction():
|
||||
@@ -14,8 +14,9 @@ def test_default_construction():
|
||||
|
||||
|
||||
def test_custom_construction():
|
||||
ap = AudioPipeline(sample_rate=24000, frame_size=480,
|
||||
input_device=1, output_device=2)
|
||||
ap = AudioPipeline(
|
||||
sample_rate=24000, frame_size=480, input_device=1, output_device=2
|
||||
)
|
||||
# Verify via public behavior: get_capture_frame returns None
|
||||
assert ap.get_capture_frame() is None
|
||||
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
"""Tests for configuration module."""
|
||||
|
||||
from tuimble.config import (
|
||||
AudioConfig, Config, PttConfig, ServerConfig,
|
||||
_load_section, load_config,
|
||||
AudioConfig,
|
||||
Config,
|
||||
PttConfig,
|
||||
ServerConfig,
|
||||
_load_section,
|
||||
load_config,
|
||||
)
|
||||
|
||||
|
||||
@@ -55,11 +59,14 @@ def test_server_cert_custom():
|
||||
|
||||
def test_load_section_filters_unknown_keys():
|
||||
"""Unknown keys are silently dropped, valid keys are kept."""
|
||||
result = _load_section(ServerConfig, {
|
||||
"host": "example.com",
|
||||
"typo_field": "oops",
|
||||
"another_bad": 42,
|
||||
})
|
||||
result = _load_section(
|
||||
ServerConfig,
|
||||
{
|
||||
"host": "example.com",
|
||||
"typo_field": "oops",
|
||||
"another_bad": 42,
|
||||
},
|
||||
)
|
||||
assert result.host == "example.com"
|
||||
assert result.port == 64738 # default preserved
|
||||
|
||||
@@ -86,8 +93,7 @@ def test_load_config_with_unknown_keys(tmp_path):
|
||||
"""Config file with unknown keys loads without error."""
|
||||
toml = tmp_path / "config.toml"
|
||||
toml.write_text(
|
||||
'[server]\nhost = "example.com"\nbogus = true\n'
|
||||
'[ptt]\nfuture_option = "x"\n'
|
||||
'[server]\nhost = "example.com"\nbogus = true\n[ptt]\nfuture_option = "x"\n'
|
||||
)
|
||||
cfg = load_config(toml)
|
||||
assert cfg.server.host == "example.com"
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import threading
|
||||
|
||||
from tuimble.reconnect import INITIAL_DELAY, MAX_RETRIES, ReconnectManager
|
||||
from tuimble.reconnect import INITIAL_DELAY, ReconnectManager
|
||||
|
||||
|
||||
def _make_manager(connect_fn=None, **overrides):
|
||||
@@ -22,7 +22,9 @@ def _make_manager(connect_fn=None, **overrides):
|
||||
log["exhausted"] += 1
|
||||
|
||||
if connect_fn is None:
|
||||
connect_fn = lambda: None
|
||||
|
||||
def connect_fn():
|
||||
return None
|
||||
|
||||
mgr = ReconnectManager(
|
||||
connect_fn=connect_fn,
|
||||
@@ -67,6 +69,7 @@ def test_success_after_failures():
|
||||
mgr, log = _make_manager(connect_fn=flaky_connect)
|
||||
# Patch delay to zero for test speed
|
||||
import tuimble.reconnect as mod
|
||||
|
||||
orig = mod.INITIAL_DELAY
|
||||
mod.INITIAL_DELAY = 0
|
||||
try:
|
||||
@@ -88,7 +91,10 @@ def test_non_retryable_aborts_immediately():
|
||||
class Rejected(Exception):
|
||||
retryable = False
|
||||
|
||||
mgr, log = _make_manager(connect_fn=lambda: (_ for _ in ()).throw(Rejected("banned")))
|
||||
def _raise():
|
||||
raise Rejected("banned")
|
||||
|
||||
mgr, log = _make_manager(connect_fn=_raise)
|
||||
mgr.run()
|
||||
assert log["exhausted"] == 1
|
||||
assert log["success"] == 0
|
||||
@@ -102,6 +108,7 @@ def test_non_retryable_aborts_immediately():
|
||||
def test_exhaustion_after_max_retries():
|
||||
"""Loop stops after MAX_RETRIES failed attempts."""
|
||||
import tuimble.reconnect as mod
|
||||
|
||||
orig_delay = mod.INITIAL_DELAY
|
||||
orig_retries = mod.MAX_RETRIES
|
||||
mod.INITIAL_DELAY = 0
|
||||
@@ -155,6 +162,7 @@ def test_backoff_delays():
|
||||
# We only need the attempt callback to record delays; cancel after
|
||||
# a few attempts to avoid waiting.
|
||||
import tuimble.reconnect as mod
|
||||
|
||||
orig_delay = mod.INITIAL_DELAY
|
||||
orig_retries = mod.MAX_RETRIES
|
||||
mod.INITIAL_DELAY = 0 # zero delay for speed
|
||||
|
||||
Reference in New Issue
Block a user