""" LocalAI Client Module --------------------- This module provides a client for communicating with a LocalAI server running vibevoice.cpp for transcription and speaker diarization. It replaces the previous local Whisper + Pyannote pipeline by sending audio files to the /v1/audio/diarization endpoint and mapping the response into the same Transcript format used by the UI. For long audio files, it can chunk the input to avoid GPU OOM errors. Environment Variables: LOCALAI_API_URL: (required) Base URL of the LocalAI server (e.g., http://localhost:8080) LOCALAI_API_KEY: (optional) API key, if configured LOCALAI_MODEL: (optional) Model name to use (default: vibevoice-diarize) Chunking / long audio (all optional): LOCALAI_CHUNK_DURATION: Max duration of each chunk in seconds (default: 180.0) LOCALAI_CHUNK_OVERLAP: Overlap between consecutive chunks in seconds (default: 2.0) LOCALAI_MAX_SINGLE_REQUEST_DURATION: If audio duration exceeds this, chunking is enabled automatically (default: 300.0) """ import os import io import json import logging from typing import Dict, List, Any, Optional import httpx from .audio import get_audio_duration, split_audio_into_chunks logger = logging.getLogger("scraibe.localai_client") class LocalAIError(Exception): """Raised when the LocalAI API returns an error or unexpected response.""" pass class LocalAIClient: """ Thin HTTP client for LocalAI /v1/audio/diarization with vibevoice.cpp. Responsibilities: - Read configuration from environment. - Upload audio file as multipart/form-data. - Parse diarization + transcription response (verbose_json). - Map response into the same structure expected by Scraibe's Transcript. - For long audio: chunk, transcribe each chunk, merge results. """ # Default thresholds for chunking long audio to avoid GPU OOM. # These can be overridden via environment or at call time. DEFAULT_CHUNK_DURATION = 180.0 # seconds DEFAULT_CHUNK_OVERLAP = 2.0 # seconds def __init__( self, api_url: Optional[str] = None, api_key: Optional[str] = None, model: Optional[str] = None, timeout: float = 3600.0, ): """ Args: api_url: LocalAI server URL (e.g., http://localhost:8080). Falls back to LOCALAI_API_URL env var. api_key: API key, if required. Falls back to LOCALAI_API_KEY. model: Model name (e.g., vibevoice-diarize). Falls back to LOCALAI_MODEL or default. timeout: Request timeout in seconds. """ self.api_url = (api_url or os.getenv("LOCALAI_API_URL")).strip().rstrip("/") self.api_key = api_key or os.getenv("LOCALAI_API_KEY") or None self.model = model or os.getenv("LOCALAI_MODEL") or "vibevoice-diarize" self.timeout = timeout if not self.api_url: raise LocalAIError( "LOCALAI_API_URL is not set. " "Provide the LocalAI server URL via environment or constructor." ) logger.info( "Initializing LocalAIClient: url=%s model=%s", self.api_url, self.model, ) self._client = httpx.Client( base_url=self.api_url, timeout=self.timeout, follow_redirects=True, ) @staticmethod def _env_float(var: str, default: float) -> float: """ Read a float from environment with a fallback default. """ val = (os.getenv(var) or "").strip() if val == "": return default try: return float(val) except ValueError: logger.warning( "Invalid value for %s: %s; using default %s", var, val, default ) return default def _effective_chunk_duration(self, provided: Optional[float]) -> float: """ Resolve chunk_duration using this precedence: 1) provided argument 2) LOCALAI_CHUNK_DURATION env 3) class default """ if provided is not None: return provided return self._env_float("LOCALAI_CHUNK_DURATION", self.DEFAULT_CHUNK_DURATION) def _effective_chunk_overlap(self, provided: Optional[float]) -> float: """ Resolve chunk_overlap: 1) provided argument 2) LOCALAI_CHUNK_OVERLAP env 3) class default """ if provided is not None: return provided return self._env_float("LOCALAI_CHUNK_OVERLAP", self.DEFAULT_CHUNK_OVERLAP) def _effective_max_single_request_duration(self, provided: Optional[float]) -> float: """ Resolve max_single_request_duration: 1) provided argument 2) LOCALAI_MAX_SINGLE_REQUEST_DURATION env 3) default 300.0 """ if provided is not None: return provided return self._env_float("LOCALAI_MAX_SINGLE_REQUEST_DURATION", 300.0) def close(self): """Close the underlying HTTP client.""" self._client.close() def __del__(self): try: self._client.close() except Exception: pass def diarize_and_transcribe( self, audio_path: str, *, language: Optional[str] = None, num_speakers: Optional[int] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None, clustering_threshold: Optional[float] = None, min_duration_on: Optional[float] = None, min_duration_off: Optional[float] = None, response_format: Optional[str] = None, include_text: Optional[bool] = None, verbose: bool = False, return_raw: bool = False, use_chunking: Optional[bool] = None, chunk_duration: Optional[float] = None, chunk_overlap: Optional[float] = None, max_single_request_duration: Optional[float] = None, **_ignored, ) -> Dict[str, Any]: """ Send audio to LocalAI /v1/audio/diarization and return: - A normalized dict with segments, speakers, transcripts. - Optionally, the raw verbose_json response (for JSON export). For long audio, it can automatically chunk the file to avoid GPU OOM. Args: audio_path: Path to the audio file. language: Language hint, forwarded if set. num_speakers: Optional exact speaker count. min_speakers: Optional hint. max_speakers: Optional hint. clustering_threshold: Optional clustering threshold. min_duration_on: Optional min segment duration. min_duration_off: Optional min gap duration. response_format: "json", "verbose_json", or "rttm". Defaults to "verbose_json". include_text: Whether to request per-segment text. Defaults to True. verbose: If True, prints progress messages. return_raw: If True, also return the raw API response in 'raw_result'. use_chunking: Whether to enable chunking for long audio. If None, enabled automatically based on duration. chunk_duration: Max duration per chunk in seconds. Falls back to LOCALAI_CHUNK_DURATION env, then 180.0. chunk_overlap: Overlap between chunks in seconds. Falls back to LOCALAI_CHUNK_OVERLAP env, then 2.0. max_single_request_duration: If audio duration exceeds this, chunking is enabled (unless explicitly disabled). Falls back to LOCALAI_MAX_SINGLE_REQUEST_DURATION env, then 300.0. """ if verbose: print("Starting diarization and transcription via LocalAI.") logger.info("diarize_and_transcribe requested for: %s", audio_path) # Resolve chunking parameters with environment support chunk_duration = self._effective_chunk_duration(chunk_duration) chunk_overlap = self._effective_chunk_overlap(chunk_overlap) max_single = self._effective_max_single_request_duration(max_single_request_duration) if use_chunking is None: try: duration = get_audio_duration(audio_path) except RuntimeError: duration = None use_chunking = (duration is not None and duration > max_single) logger.info( "Auto-chunking decision: duration=%s, threshold=%s, use_chunking=%s", duration, max_single, use_chunking, ) if use_chunking: return self._diarize_and_transcribe_chunked( audio_path=audio_path, language=language, num_speakers=num_speakers, min_speakers=min_speakers, max_speakers=max_speakers, clustering_threshold=clustering_threshold, min_duration_on=min_duration_on, min_duration_off=min_duration_off, response_format=response_format, include_text=include_text, verbose=verbose, return_raw=return_raw, chunk_duration=chunk_duration, chunk_overlap=chunk_overlap, ) # Single-request path (existing behavior) return self._diarize_and_transcribe_single( audio_path=audio_path, language=language, num_speakers=num_speakers, min_speakers=min_speakers, max_speakers=max_speakers, clustering_threshold=clustering_threshold, min_duration_on=min_duration_on, min_duration_off=min_duration_off, response_format=response_format, include_text=include_text, verbose=verbose, return_raw=return_raw, ) def _diarize_and_transcribe_single( self, audio_path: str, *, language: Optional[str] = None, num_speakers: Optional[int] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None, clustering_threshold: Optional[float] = None, min_duration_on: Optional[float] = None, min_duration_off: Optional[float] = None, response_format: Optional[str] = None, include_text: Optional[bool] = None, verbose: bool = False, return_raw: bool = False, ) -> Dict[str, Any]: """ Internal: single-request diarization and transcription. """ if verbose: print("Starting diarization and transcription via LocalAI.") logger.info("diarize_and_transcribe requested for: %s", audio_path) # Always use verbose_json for diarization + speaker info if response_format is None: response_format = "verbose_json" if include_text is None: include_text = True # Prepare form data data = { "model": self.model, "response_format": response_format, "include_text": str(include_text).lower(), } if language is not None: data["language"] = language if num_speakers is not None: data["num_speakers"] = str(num_speakers) if min_speakers is not None: data["min_speakers"] = str(min_speakers) if max_speakers is not None: data["max_speakers"] = str(max_speakers) if clustering_threshold is not None: data["clustering_threshold"] = str(clustering_threshold) if min_duration_on is not None: data["min_duration_on"] = str(min_duration_on) if min_duration_off is not None: data["min_duration_off"] = str(min_duration_off) logger.debug("LocalAI request params: %s", data) # Open file if not os.path.exists(audio_path): raise LocalAIError(f"Audio file not found: {audio_path}") with open(audio_path, "rb") as f: files = { "file": (os.path.basename(audio_path), f, "application/octet-stream") } headers = {} if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" # POST /v1/audio/diarization logger.info("Sending request to LocalAI: /v1/audio/diarization") resp = self._client.post( "/v1/audio/diarization", data=data, files=files, headers=headers, ) logger.info("LocalAI response status: %d", resp.status_code) if resp.status_code >= 400: body = resp.text logger.error("LocalAI error response: %s", body) raise LocalAIError( f"LocalAI request failed with status {resp.status_code}: {body}" ) try: raw_result = resp.json() except json.JSONDecodeError: logger.error("Failed to parse LocalAI response as JSON.") raise LocalAIError( "Failed to parse LocalAI response as JSON." ) if verbose: print("Diarization and transcription finished. Starting post-processing.") parsed = self._parse_diarization_response(raw_result) if return_raw: parsed["raw_result"] = raw_result return parsed def _diarize_and_transcribe_chunked( self, audio_path: str, *, language: Optional[str] = None, num_speakers: Optional[int] = None, min_speakers: Optional[int] = None, max_speakers: Optional[int] = None, clustering_threshold: Optional[float] = None, min_duration_on: Optional[float] = None, min_duration_off: Optional[float] = None, response_format: Optional[str] = None, include_text: Optional[bool] = None, verbose: bool = False, return_raw: bool = False, chunk_duration: float = DEFAULT_CHUNK_DURATION, chunk_overlap: float = DEFAULT_CHUNK_OVERLAP, ) -> Dict[str, Any]: """ Internal: chunked diarization and transcription for long audio. - Splits audio into overlapping chunks. - Transcribes each chunk via /v1/audio/diarization. - Merges segments with adjusted timestamps. """ if verbose: print("Audio is long; splitting into chunks to avoid GPU memory issues.") logger.info( "Chunked transcription: chunk_duration=%s, overlap=%s", chunk_duration, chunk_overlap, ) chunks = split_audio_into_chunks( input_path=audio_path, max_duration=chunk_duration, overlap=chunk_overlap, ) if len(chunks) == 1: # No actual split needed; fall back to single-request path return self._diarize_and_transcribe_single( audio_path=chunks[0]["path"], language=language, num_speakers=num_speakers, min_speakers=min_speakers, max_speakers=max_speakers, clustering_threshold=clustering_threshold, min_duration_on=min_duration_on, min_duration_off=min_duration_off, response_format=response_format, include_text=include_text, verbose=verbose, return_raw=return_raw, ) all_segments: List[List[float]] = [] all_speakers: List[str] = [] all_transcripts: List[str] = [] raw_results: List[Dict[str, Any]] = [] temp_files = [c["path"] for c in chunks] try: for i, chunk_info in enumerate(chunks): chunk_path = chunk_info["path"] chunk_start = chunk_info["start"] if verbose: print( f"Transcribing chunk {i+1}/{len(chunks)} " f"(start={chunk_start:.1f}s)" ) logger.info( "Transcribing chunk %d/%d, start=%.1f", i + 1, len(chunks), chunk_start ) # Use single-request logic for each chunk chunk_result = self._diarize_and_transcribe_single( audio_path=chunk_path, language=language, num_speakers=num_speakers, min_speakers=min_speakers, max_speakers=max_speakers, clustering_threshold=clustering_threshold, min_duration_on=min_duration_on, min_duration_off=min_duration_off, response_format=response_format, include_text=include_text, verbose=False, return_raw=return_raw, ) segs = chunk_result.get("segments", []) spks = chunk_result.get("speakers", []) txts = chunk_result.get("transcripts", []) raw = chunk_result.get("raw_result") # Adjust timestamps to global timeline adjusted_segs = [] for seg, sp, txt in zip(segs, spks, txts): start = float(seg[0]) + chunk_start end = float(seg[1]) + chunk_start adjusted_segs.append([start, end]) all_speakers.append(sp) all_transcripts.append(txt) all_segments.extend(adjusted_segs) if return_raw and raw is not None: raw_results.append(raw) finally: # Clean up temporary chunk files for path in temp_files: if path and os.path.exists(path) and path != audio_path: try: os.remove(path) except Exception as e: logger.warning("Failed to remove chunk file %s: %s", path, e) # Sort segments by start time combined = list(zip(all_segments, all_speakers, all_transcripts)) combined.sort(key=lambda x: x[0][0]) all_segments = [x[0] for x in combined] all_speakers = [x[1] for x in combined] all_transcripts = [x[2] for x in combined] if verbose: print( f"Chunked transcription complete. Total segments: {len(all_segments)}" ) result = { "segments": all_segments, "speakers": all_speakers, "transcripts": all_transcripts, } if return_raw and raw_results: result["raw_result"] = { "chunked": True, "chunks": raw_results, } return result def _parse_diarization_response(self, result: Dict[str, Any]) -> Dict[str, Any]: """ Convert LocalAI verbose_json response into the internal format used by Scraibe: { "segments": [ [start, end], ... ], "speakers": [ "SPEAKER_00", ... ], "transcripts": [ "text for segment", ... ] } """ segments = result.get("segments", []) if not segments: logger.warning("LocalAI returned no segments.") return { "segments": [], "speakers": [], "transcripts": [], } out_segments = [] out_speakers = [] out_transcripts = [] for seg in segments: start = float(seg.get("start", 0.0)) end = float(seg.get("end", 0.0)) speaker = seg.get("speaker", "SPEAKER_00") text = seg.get("text", "").strip() out_segments.append([start, end]) out_speakers.append(speaker) out_transcripts.append(text) logger.info( "Parsed %d segments from LocalAI.", len(out_segments), ) return { "segments": out_segments, "speakers": out_speakers, "transcripts": out_transcripts, }