"""Real-time pitch shifting for the capture path.

Uses a numpy-only phase vocoder + linear resample.  All heavy
operations (rfft, irfft, array arithmetic) release the GIL so
PortAudio callbacks are never starved.

Stateful across frames: carries phase and input context across both
frame boundaries and parameter changes for glitch-free transitions.
"""

from __future__ import annotations

import numpy as np

# STFT geometry: 512-sample frames advanced by 128 samples (75 % overlap).
_N_FFT = 512
_HOP = 128
_N_FREQ = _N_FFT // 2 + 1
_WINDOW = np.hanning(_N_FFT).astype(np.float32)
# Phase each rfft bin's centre frequency advances over one hop.
_PHASE_ADV = 2.0 * np.pi * _HOP * np.arange(_N_FREQ) / _N_FFT


class PitchShifter:
    """Shift pitch of int16 PCM frames via phase vocoder + resample.

    Maintains inter-frame state (input overlap and synthesis phase)
    so consecutive 20 ms frames produce a continuous output signal.
    Frames that are passed through unmodified still update the input
    overlap, so shifting always resumes from the audio that actually
    preceded the current frame.
    """

    def __init__(self, sample_rate: int = 48000):
        self._sample_rate = sample_rate
        self._semitones = 0.0
        # Last _N_FFT input samples (float32, scaled by 1/32768).
        self._prev_in = np.zeros(_N_FFT, dtype=np.float32)
        # Synthesis phase of the last vocoder frame; None means
        # "re-seed from the analysis phase on the next shifted frame".
        self._phase: np.ndarray | None = None

    @property
    def semitones(self) -> float:
        """Current shift in semitones (0.0 means bypass)."""
        return self._semitones

    @semitones.setter
    def semitones(self, value: float) -> None:
        # Clamp to one octave either way.
        self._semitones = max(-12.0, min(12.0, float(value)))

    def _absorb_passthrough(self, samples: np.ndarray) -> None:
        """Record samples that were returned without being shifted.

        Rolls *samples* into the input context and drops the carried
        synthesis phase: the phase belongs to vocoder output emitted
        before this frame and cannot be continued across it.
        """
        if len(samples) >= _N_FFT:
            self._prev_in = samples[-_N_FFT:].copy()
        elif len(samples):
            self._prev_in = np.concatenate([self._prev_in, samples])[-_N_FFT:]
        self._phase = None

    def process(self, pcm: bytes) -> bytes:
        """Pitch-shift a single int16 PCM frame.

        Args:
            pcm: Raw native-endian int16 samples.

        Returns:
            A buffer with the same number of samples as *pcm*, pitch
            shifted by ``semitones``.  *pcm* itself is returned
            unchanged when semitones == 0, when the frame holds fewer
            than ``_N_FFT`` samples, or when its byte length is odd
            (not a whole number of int16 samples).
        """
        n_bytes = len(pcm)
        if n_bytes < 2:
            return pcm

        # np.frombuffer rejects buffers that are not a multiple of the
        # item size, so decode only the whole samples of a malformed
        # (odd-length) frame and pass the frame through untouched.
        odd = n_bytes % 2
        raw = pcm[: n_bytes - 1] if odd else pcm
        samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0
        n = len(samples)

        if odd or self._semitones == 0.0 or n < _N_FFT:
            # Keep the context current so that the next shifted frame
            # is not analysed against audio from before this one.
            self._absorb_passthrough(samples)
            return pcm

        ratio = 2.0 ** (self._semitones / 12.0)

        # Build continuous signal: previous context + current frame.
        # The left side uses real samples from the preceding frame;
        # only the right side is reflect-padded.
        y = np.concatenate([self._prev_in, samples])
        y_pad = np.pad(y, (0, _N_FFT // 2), mode="reflect")
        self._prev_in = samples[-_N_FFT:].copy()

        # -- Vectorised STFT (one rfft call) --
        n_frames = 1 + (len(y_pad) - _N_FFT) // _HOP
        if n_frames < 2:
            # Defensive only: n >= _N_FFT already yields several frames.
            return pcm

        offsets = _HOP * np.arange(n_frames)
        stft = np.fft.rfft(
            y_pad[offsets[:, None] + np.arange(_N_FFT)] * _WINDOW, axis=1
        )

        # -- Time-stretch interpolation --
        # Output frame k reads from fractional analysis frame k / ratio.
        n_out = max(1, int(np.ceil(n_frames * ratio)))
        src = np.minimum(np.arange(n_out) / ratio, n_frames - 1)
        i0 = src.astype(int)
        i1 = np.minimum(i0 + 1, n_frames - 1)
        frac = (src - i0)[:, None]

        mag = (1 - frac) * np.abs(stft[i0]) + frac * np.abs(stft[i1])

        # -- Phase propagation with inter-frame continuity --
        # Deviation from the nominal per-hop advance, wrapped to
        # [-pi, pi], then added back to the nominal advance.
        dphi = np.angle(stft[i1]) - np.angle(stft[i0]) - _PHASE_ADV
        dphi -= 2.0 * np.pi * np.round(dphi / (2.0 * np.pi))
        increments = _PHASE_ADV + dphi

        phase = np.empty((n_out, _N_FREQ))
        if self._phase is not None:
            # Continue from previous frame's final phase
            phase[0] = self._phase + increments[0]
        else:
            phase[0] = np.angle(stft[0])
        if n_out > 1:
            phase[1:] = phase[0] + np.cumsum(increments[1:], axis=0)

        # Carry phase (wrap to [-pi, pi] to avoid precision drift)
        self._phase = (phase[-1] + np.pi) % (2.0 * np.pi) - np.pi

        # -- Vectorised ISTFT + overlap-add --
        frames = (
            np.fft.irfft(mag * np.exp(1j * phase), n=_N_FFT, axis=1).astype(
                np.float32
            )
            * _WINDOW
        )
        out_len = (n_out - 1) * _HOP + _N_FFT
        output = np.zeros(out_len, dtype=np.float32)
        for i, frame in enumerate(frames):
            start = i * _HOP
            output[start : start + _N_FFT] += frame

        # Extract portion for current frame only.
        # Left context (_N_FFT samples) maps to ~_N_FFT*ratio in
        # the time-stretched output; skip that overlap region.
        skip = int(round(_N_FFT * ratio))
        target = int(round(n * ratio))
        end = min(skip + target, len(output))
        stretched = output[skip:end]

        # Resample to original frame size (linear interpolation)
        if len(stretched) < 2:
            return pcm
        if len(stretched) != n:
            x_old = np.linspace(0, 1, len(stretched), endpoint=False)
            x_new = np.linspace(0, 1, n, endpoint=False)
            stretched = np.interp(x_new, x_old, stretched)

        return (
            np.clip(stretched * 32768.0, -32768, 32767)
            .astype(np.int16)
            .tobytes()
        )