modulator: remove state reset on pitch change

The semitones setter called _reset() which zeroed _prev_in and
discarded _phase on every pitch adjustment.  Both buffers are
independent of the pitch ratio — the vocoder recomputes time-stretch
positions each frame — so the reset produced hard discontinuities
(clicks) and a thread-safety race with no benefit.
This commit is contained in:
Username
2026-02-28 13:53:29 +01:00
parent 76c9c494a7
commit b62993459a

134
src/tuimble/modulator.py Normal file
View File

@@ -0,0 +1,134 @@
"""Real-time pitch shifting for the capture path.
Uses a numpy-only phase vocoder + linear resample. All heavy
operations (rfft, irfft, array arithmetic) release the GIL so
PortAudio callbacks are never starved.
Stateful across frames: carries phase and input context across both
frame boundaries and parameter changes for glitch-free transitions.
"""
from __future__ import annotations
import numpy as np
_N_FFT = 512
_HOP = 128
_N_FREQ = _N_FFT // 2 + 1
_WINDOW = np.hanning(_N_FFT).astype(np.float32)
_PHASE_ADV = 2.0 * np.pi * _HOP * np.arange(_N_FREQ) / _N_FFT
class PitchShifter:
"""Shift pitch of int16 PCM frames via phase vocoder + resample.
Maintains inter-frame state (input overlap and synthesis phase)
so consecutive 20 ms frames produce a continuous output signal.
"""
def __init__(self, sample_rate: int = 48000):
self._sample_rate = sample_rate
self._semitones = 0.0
self._prev_in = np.zeros(_N_FFT, dtype=np.float32)
self._phase: np.ndarray | None = None
@property
def semitones(self) -> float:
return self._semitones
@semitones.setter
def semitones(self, value: float) -> None:
self._semitones = max(-12.0, min(12.0, float(value)))
def process(self, pcm: bytes) -> bytes:
"""Pitch-shift a single int16 PCM frame.
Returns *pcm* unchanged when semitones == 0 or the frame is
too short to process.
"""
if self._semitones == 0.0 or len(pcm) < 2:
return pcm
samples = np.frombuffer(pcm, dtype=np.int16).astype(np.float32) / 32768.0
n = len(samples)
if n < _N_FFT:
return pcm
ratio = 2.0 ** (self._semitones / 12.0)
# Build continuous signal: previous context + current frame
# Previous context provides real samples instead of reflect
# padding, eliminating edge discontinuities.
y = np.concatenate([self._prev_in, samples])
y_pad = np.pad(y, (0, _N_FFT // 2), mode="reflect")
self._prev_in = samples[-_N_FFT:].copy()
# -- Vectorised STFT (one rfft call, GIL released) --
n_frames = 1 + (len(y_pad) - _N_FFT) // _HOP
if n_frames < 2:
return pcm
offsets = _HOP * np.arange(n_frames)
stft = np.fft.rfft(
y_pad[offsets[:, None] + np.arange(_N_FFT)] * _WINDOW, axis=1
)
# -- Time-stretch interpolation --
n_out = max(1, int(np.ceil(n_frames * ratio)))
src = np.minimum(np.arange(n_out) / ratio, n_frames - 1)
i0 = src.astype(int)
i1 = np.minimum(i0 + 1, n_frames - 1)
frac = (src - i0)[:, None]
mag = (1 - frac) * np.abs(stft[i0]) + frac * np.abs(stft[i1])
# -- Phase propagation with inter-frame continuity --
dphi = np.angle(stft[i1]) - np.angle(stft[i0]) - _PHASE_ADV
dphi -= 2.0 * np.pi * np.round(dphi / (2.0 * np.pi))
increments = _PHASE_ADV + dphi
phase = np.empty((n_out, _N_FREQ))
if self._phase is not None:
# Continue from previous frame's final phase
phase[0] = self._phase + increments[0]
else:
phase[0] = np.angle(stft[0])
if n_out > 1:
phase[1:] = phase[0] + np.cumsum(increments[1:], axis=0)
# Carry phase (wrap to [-pi, pi] to avoid precision drift)
self._phase = (phase[-1] + np.pi) % (2.0 * np.pi) - np.pi
# -- Vectorised ISTFT + overlap-add --
frames = (
np.fft.irfft(mag * np.exp(1j * phase), n=_N_FFT, axis=1).astype(
np.float32
)
* _WINDOW
)
out_len = (n_out - 1) * _HOP + _N_FFT
output = np.zeros(out_len, dtype=np.float32)
for i in range(n_out):
output[i * _HOP : i * _HOP + _N_FFT] += frames[i]
# Extract portion for current frame only.
# Left context (_N_FFT samples) maps to ~_N_FFT*ratio in
# the time-stretched output; skip that overlap region.
skip = int(round(_N_FFT * ratio))
target = int(round(n * ratio))
end = min(skip + target, len(output))
stretched = output[skip:end]
# Resample to original frame size (numpy, GIL-free)
if len(stretched) < 2:
return pcm
if len(stretched) != n:
x_old = np.linspace(0, 1, len(stretched), endpoint=False)
x_new = np.linspace(0, 1, n, endpoint=False)
stretched = np.interp(x_new, x_old, stretched)
return (
np.clip(stretched * 32768.0, -32768, 32767)
.astype(np.int16)
.tobytes()
)