"""
Audio Processor Module
=======================

Simplified audio processor for ScrAIbe.
Previously this used torch and pyannote-style processing.
In the LocalAI-backed version, we primarily pass files to the API,
but we keep a lightweight helper for backward compatibility.

Now also includes utilities for chunking long audio into smaller segments
to avoid GPU memory limits when using vibevoice-cpp on LocalAI.
"""

import json
import os
import tempfile
from subprocess import CalledProcessError, run

import numpy as np

SAMPLE_RATE = 16000
NORMALIZATION_FACTOR = 32768.0
DEFAULT_CHUNK_DURATION = 180.0  # seconds
DEFAULT_CHUNK_OVERLAP = 2.0  # seconds


class AudioProcessor:
    """
    Lightweight audio processor for loading and cutting audio.

    Attributes:
        waveform (np.ndarray): The audio waveform as float32.
        sr (int): The sample rate of the audio.
    """

    def __init__(self, waveform: np.ndarray, sr: int = SAMPLE_RATE):
        """
        Store a waveform together with its sample rate.

        Args:
            waveform (np.ndarray): The audio samples.
            sr (int, optional): The sample rate of ``waveform``.

        Raises:
            ValueError: If ``sr`` is not an ``int``.
        """
        if not isinstance(sr, int):
            # Report the offending value with repr(): calling len() on it
            # would itself fail with TypeError for floats, None, etc.
            raise ValueError(
                "Sample rate should be a single value of type int, "
                f"not {sr!r} of type {type(sr)}"
            )
        self.waveform = waveform
        self.sr = sr

    @classmethod
    def from_file(cls, file: str, *args, **kwargs):
        """
        Create an AudioProcessor instance from an audio file.

        Args:
            file (str): The audio file path.
            *args, **kwargs: Forwarded to :meth:`load_audio`.

        Returns:
            AudioProcessor: Instance with loaded audio.
        """
        audio, sr = cls.load_audio(file, *args, **kwargs)
        return cls(audio, sr)

    def cut(self, start: float, end: float) -> np.ndarray:
        """
        Cut a segment from the audio waveform.

        Args:
            start (float): Start time in seconds.
            end (float): End time in seconds.

        Returns:
            np.ndarray: The cut waveform segment. Times before 0 are
            clamped to the beginning of the waveform; times past the end
            are truncated by the slice.
        """
        # Clamp to 0 so a negative time cannot turn into a negative index,
        # which would silently wrap around to the end of the waveform.
        start_idx = max(0, int(start * self.sr))
        end_idx = max(0, int(np.ceil(end * self.sr)))
        return self.waveform[start_idx:end_idx]

    @staticmethod
    def load_audio(file: str, sr: int = SAMPLE_RATE):
        """
        Load an audio file as a mono waveform, resampling if necessary.

        Requires ffmpeg in PATH.

        Args:
            file (str): The audio file to open.
            sr (int, optional): The desired sample rate.

        Returns:
            tuple: (waveform as np.ndarray[float32], sample rate)

        Raises:
            RuntimeError: If failed to load audio.
        """
        # Decode to raw signed 16-bit little-endian mono PCM on stdout.
        cmd = [
            "ffmpeg",
            "-nostdin",
            "-threads", "0",
            "-i", file,
            "-f", "s16le",
            "-ac", "1",
            "-acodec", "pcm_s16le",
            "-ar", str(sr),
            "-",
        ]

        try:
            out = run(cmd, capture_output=True, check=True).stdout
        except CalledProcessError as e:
            # errors="replace": ffmpeg may echo non-UTF-8 bytes (file names,
            # metadata); a decode failure must not hide the real error.
            raise RuntimeError(
                f"Failed to load audio: {e.stderr.decode(errors='replace')}"
            ) from e

        # Scale int16 samples to float32 by dividing by 32768.
        waveform = (
            np.frombuffer(out, np.int16).astype(np.float32)
            / NORMALIZATION_FACTOR
        )
        return waveform, sr

    def __repr__(self) -> str:
        return f"AudioProcessor(waveform_len={len(self.waveform)}, sr={self.sr})"


def get_audio_duration(file_path: str) -> float:
    """
    Get the duration of an audio file in seconds using ffprobe.

    Args:
        file_path: Path to the audio file.

    Returns:
        Duration in seconds as a float.

    Raises:
        RuntimeError: If ffprobe fails or its output does not contain a
            parseable duration.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "json",
        file_path,
    ]

    try:
        result = run(cmd, capture_output=True, text=True, check=True)
    except CalledProcessError as e:
        # Prefer ffprobe's own diagnostics over the generic exit-status text.
        detail = (e.stderr or "").strip() or e
        raise RuntimeError(
            f"Failed to get audio duration for {file_path}: {detail}"
        ) from e

    try:
        return float(json.loads(result.stdout)["format"]["duration"])
    except (KeyError, TypeError, ValueError) as e:
        # ValueError covers json.JSONDecodeError as well as non-numeric
        # duration values; TypeError covers a null/ill-shaped payload.
        raise RuntimeError(
            f"Failed to get audio duration for {file_path}: {e}"
        ) from e


def _chunk_windows(duration: float, max_duration: float, overlap: float) -> list:
    """Return the (start, end) windows covering ``[0, duration]``."""
    step = max_duration - overlap
    windows = []
    start = 0.0
    while start < duration:
        end = min(start + max_duration, duration)
        windows.append((start, end))
        if end >= duration:
            # This window already reaches the end of the file; another one
            # would lie entirely inside the overlap region.
            break
        start += step
    return windows


def _remove_files(paths) -> None:
    """Best-effort removal of the given files; missing files are ignored."""
    for path in paths:
        try:
            os.remove(path)
        except OSError:
            pass


def split_audio_into_chunks(
    input_path: str,
    max_duration: float = DEFAULT_CHUNK_DURATION,
    overlap: float = DEFAULT_CHUNK_OVERLAP,
    output_format: str = "wav",
    sample_rate: int = 24000,
) -> list:
    """
    Split a long audio file into overlapping chunks using ffmpeg.

    Args:
        input_path: Path to the input audio file.
        max_duration: Maximum duration of each chunk in seconds.
        overlap: Overlap duration in seconds between consecutive chunks.
        output_format: Output format (e.g., 'wav'). Used as the temp-file
            suffix; the audio is always encoded as mono pcm_s16le.
        sample_rate: Sample rate for output chunks.

    Returns:
        List of dicts: [{"path": "chunk.wav", "start": 0.0, "end": 180.0}, ...]
        Files must be cleaned up by the caller. If the input is not longer
        than ``max_duration``, a single entry whose "path" is ``input_path``
        itself is returned and no temporary file is created.

    Raises:
        ValueError: If splitting is required and ``max_duration`` is not
            positive or ``overlap`` is not in ``[0, max_duration)``.
        RuntimeError: If ffprobe or ffmpeg fails. Chunk files created before
            the failure are removed.
    """
    duration = get_audio_duration(input_path)

    # If file is shorter than max_duration, no need to split
    if duration <= max_duration:
        return [{"path": input_path, "start": 0.0, "end": duration}]

    # Without these guards the window start would never advance and the
    # loop below would not terminate.
    if max_duration <= 0:
        raise ValueError(f"max_duration must be positive, got {max_duration}")
    if not 0 <= overlap < max_duration:
        raise ValueError(
            f"overlap must be in [0, max_duration), got overlap={overlap} "
            f"with max_duration={max_duration}"
        )

    chunks = []
    windows = _chunk_windows(duration, max_duration, overlap)
    for chunk_id, (start, chunk_end) in enumerate(windows):
        # Reserve a unique path, then close the handle so ffmpeg can
        # overwrite the file (-y).
        tmp = tempfile.NamedTemporaryFile(
            delete=False,
            suffix=f".{output_format}",
            prefix="scraibe_chunk_",
        )
        chunk_path = tmp.name
        tmp.close()

        cmd = [
            "ffmpeg",
            "-y",
            "-nostdin",
            "-ss", str(start),
            "-i", input_path,
            "-t", str(chunk_end - start),
            "-ar", str(sample_rate),
            "-ac", "1",
            "-c:a", "pcm_s16le",
            chunk_path,
        ]

        try:
            run(cmd, capture_output=True, check=True)
        except CalledProcessError as e:
            # The caller never receives the partial list, so remove the
            # failed chunk and every chunk written before it.
            _remove_files([chunk_path, *(c["path"] for c in chunks)])
            raise RuntimeError(
                f"Failed to create audio chunk {chunk_id} for {input_path}: "
                f"{e.stderr.decode(errors='replace')}"
            ) from e

        chunks.append({
            "path": chunk_path,
            "start": start,
            "end": chunk_end,
        })

    return chunks