Files
scribe/scraibe/localai_client.py
T
admin 6640bc050d
Mirror and run GitLab CI / build (push) Waiting to run
Ruff / ruff (push) Waiting to run
feat: add chunked ASR for long audio with env-configurable chunk duration
- Integrate chunking into LocalAI client to avoid GPU OOM on long audio.
- Split long files into overlapping chunks; transcribe each chunk; merge segments with corrected timestamps.
- Auto-enable chunking when audio duration > LOCALAI_MAX_SINGLE_REQUEST_DURATION (default 300s).
- Add env variables:
    LOCALAI_CHUNK_DURATION (default 180)
    LOCALAI_CHUNK_OVERLAP (default 2)
    LOCALAI_MAX_SINGLE_REQUEST_DURATION (default 300)
- Add unit and integration tests for chunking logic.
- Confirmed working end-to-end with vibevoice-cpp-asr on 88-minute file.
2026-06-18 17:46:29 +00:00

567 lines
20 KiB
Python

"""
LocalAI Client Module
---------------------
This module provides a client for communicating with a LocalAI server
running vibevoice.cpp for transcription and speaker diarization.
It replaces the previous local Whisper + Pyannote pipeline by sending
audio files to the /v1/audio/diarization endpoint and mapping the
response into the same Transcript format used by the UI.
For long audio files, it can chunk the input to avoid GPU OOM errors.
Environment Variables:
LOCALAI_API_URL: (required) Base URL of the LocalAI server
(e.g., http://localhost:8080)
LOCALAI_API_KEY: (optional) API key, if configured
LOCALAI_MODEL: (optional) Model name to use (default: vibevoice-diarize)
Chunking / long audio (all optional):
LOCALAI_CHUNK_DURATION: Max duration of each chunk in seconds
(default: 180.0)
LOCALAI_CHUNK_OVERLAP: Overlap between consecutive chunks in seconds
(default: 2.0)
LOCALAI_MAX_SINGLE_REQUEST_DURATION: If audio duration exceeds this, chunking
is enabled automatically (default: 300.0)
"""
import io
import json
import logging
import math
import os
from typing import Dict, List, Any, Optional

import httpx

from .audio import get_audio_duration, split_audio_into_chunks
# Module-level logger shared by everything in this file; the name is a fixed
# string rather than being derived from __name__.
logger = logging.getLogger("scraibe.localai_client")
class LocalAIError(Exception):
    """Error signalling a failed or malformed exchange with the LocalAI API.

    Raised when the LocalAI API returns an error or unexpected response.
    """
class LocalAIClient:
    """
    Thin HTTP client for LocalAI /v1/audio/diarization with vibevoice.cpp.

    Responsibilities:
        - Read configuration from environment.
        - Upload audio file as multipart/form-data.
        - Parse diarization + transcription response (verbose_json).
        - Map response into the same structure expected by Scraibe's Transcript.
        - For long audio: chunk, transcribe each chunk, merge results.

    The instance owns an ``httpx.Client``. Call :meth:`close` when finished,
    or use the instance as a context manager (``with LocalAIClient() as c:``).
    """

    # Default thresholds for chunking long audio to avoid GPU OOM.
    # These can be overridden via environment or at call time.
    DEFAULT_CHUNK_DURATION = 180.0  # seconds
    DEFAULT_CHUNK_OVERLAP = 2.0  # seconds
    DEFAULT_MAX_SINGLE_REQUEST_DURATION = 300.0  # seconds

    # Label assigned to a segment that carries no speaker information.
    _FALLBACK_SPEAKER = "SPEAKER_00"

    # Path (relative to the base URL) of the diarization endpoint.
    _DIARIZATION_ENDPOINT = "/v1/audio/diarization"

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        timeout: float = 3600.0,
    ):
        """
        Args:
            api_url: LocalAI server URL (e.g., http://localhost:8080).
                Falls back to LOCALAI_API_URL env var.
            api_key: API key, if required. Falls back to LOCALAI_API_KEY.
            model: Model name (e.g., vibevoice-diarize).
                Falls back to LOCALAI_MODEL or default.
            timeout: Request timeout in seconds.

        Raises:
            LocalAIError: If no server URL is given and LOCALAI_API_URL is
                unset or blank.
        """
        # The trailing `or ""` matters: without it a missing URL would crash
        # with AttributeError on None.strip() instead of the LocalAIError below.
        resolved_url = api_url or os.getenv("LOCALAI_API_URL") or ""
        self.api_url = resolved_url.strip().rstrip("/")
        self.api_key = api_key or os.getenv("LOCALAI_API_KEY") or None
        self.model = model or os.getenv("LOCALAI_MODEL") or "vibevoice-diarize"
        self.timeout = timeout

        if not self.api_url:
            raise LocalAIError(
                "LOCALAI_API_URL is not set. "
                "Provide the LocalAI server URL via environment or constructor."
            )

        logger.info(
            "Initializing LocalAIClient: url=%s model=%s",
            self.api_url,
            self.model,
        )
        self._client = httpx.Client(
            base_url=self.api_url,
            timeout=self.timeout,
            follow_redirects=True,
        )

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _env_float(var: str, default: float) -> float:
        """
        Read a float from environment with a fallback default.

        Args:
            var: Name of the environment variable.
            default: Value returned when the variable is unset, blank,
                not parseable as a float, or not finite (nan / inf).

        Returns:
            The parsed finite float, or ``default``.
        """
        raw = (os.getenv(var) or "").strip()
        if not raw:
            return default
        try:
            value = float(raw)
            # float() happily parses "nan" and "inf"; neither is a usable
            # duration, so treat them like any other invalid value.
            if not math.isfinite(value):
                raise ValueError("non-finite value")
        except ValueError:
            logger.warning(
                "Invalid value for %s: %s; using default %s", var, raw, default
            )
            return default
        return value

    def _effective_chunk_duration(self, provided: Optional[float]) -> float:
        """
        Resolve chunk_duration using this precedence:
            1) provided argument
            2) LOCALAI_CHUNK_DURATION env
            3) class default
        """
        if provided is not None:
            return provided
        return self._env_float("LOCALAI_CHUNK_DURATION", self.DEFAULT_CHUNK_DURATION)

    def _effective_chunk_overlap(self, provided: Optional[float]) -> float:
        """
        Resolve chunk_overlap:
            1) provided argument
            2) LOCALAI_CHUNK_OVERLAP env
            3) class default
        """
        if provided is not None:
            return provided
        return self._env_float("LOCALAI_CHUNK_OVERLAP", self.DEFAULT_CHUNK_OVERLAP)

    def _effective_max_single_request_duration(self, provided: Optional[float]) -> float:
        """
        Resolve max_single_request_duration:
            1) provided argument
            2) LOCALAI_MAX_SINGLE_REQUEST_DURATION env
            3) class default (300.0)
        """
        if provided is not None:
            return provided
        return self._env_float(
            "LOCALAI_MAX_SINGLE_REQUEST_DURATION",
            self.DEFAULT_MAX_SINGLE_REQUEST_DURATION,
        )

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def close(self):
        """Close the underlying HTTP client."""
        self._client.close()

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the HTTP client on leaving the ``with`` block."""
        self.close()

    def __del__(self):
        # Best-effort cleanup only: __init__ may have raised before _client
        # existed, and errors raised while the interpreter is shutting down
        # must not propagate out of a finalizer.
        client = getattr(self, "_client", None)
        if client is None:
            return
        try:
            client.close()
        except Exception:
            pass

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def diarize_and_transcribe(
        self,
        audio_path: str,
        *,
        language: Optional[str] = None,
        num_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
        max_speakers: Optional[int] = None,
        clustering_threshold: Optional[float] = None,
        min_duration_on: Optional[float] = None,
        min_duration_off: Optional[float] = None,
        response_format: Optional[str] = None,
        include_text: Optional[bool] = None,
        verbose: bool = False,
        return_raw: bool = False,
        use_chunking: Optional[bool] = None,
        chunk_duration: Optional[float] = None,
        chunk_overlap: Optional[float] = None,
        max_single_request_duration: Optional[float] = None,
        **_ignored: Any,
    ) -> Dict[str, Any]:
        """
        Send audio to LocalAI /v1/audio/diarization and return:
            - A normalized dict with segments, speakers, transcripts.
            - Optionally, the raw verbose_json response (for JSON export).

        For long audio, it can automatically chunk the file to avoid GPU OOM.

        Args:
            audio_path: Path to the audio file.
            language: Language hint, forwarded if set.
            num_speakers: Optional exact speaker count.
            min_speakers: Optional hint.
            max_speakers: Optional hint.
            clustering_threshold: Optional clustering threshold.
            min_duration_on: Optional min segment duration.
            min_duration_off: Optional min gap duration.
            response_format: "json", "verbose_json", or "rttm".
                Defaults to "verbose_json".
            include_text: Whether to request per-segment text.
                Defaults to True.
            verbose: If True, prints progress messages.
            return_raw: If True, also return the raw API response in 'raw_result'.
            use_chunking: Whether to enable chunking for long audio.
                If None, enabled automatically based on duration.
            chunk_duration: Max duration per chunk in seconds.
                Falls back to LOCALAI_CHUNK_DURATION env, then 180.0.
            chunk_overlap: Overlap between chunks in seconds.
                Falls back to LOCALAI_CHUNK_OVERLAP env, then 2.0.
            max_single_request_duration: If audio duration exceeds this, chunking
                is enabled (unless explicitly disabled).
                Falls back to LOCALAI_MAX_SINGLE_REQUEST_DURATION
                env, then 300.0.
            **_ignored: Any other keyword arguments are accepted and dropped.

        Returns:
            Dict with keys "segments" (list of [start, end]), "speakers" and
            "transcripts" (parallel lists), plus "raw_result" when requested.

        Raises:
            LocalAIError: If the audio file is missing, the server answers
                with a status >= 400, or the response body is not valid JSON.
        """
        if verbose:
            print("Starting diarization and transcription via LocalAI.")
        logger.info("diarize_and_transcribe requested for: %s", audio_path)
        if _ignored:
            logger.debug("Ignoring unsupported keyword arguments: %s", sorted(_ignored))

        # Resolve chunking parameters with environment support
        chunk_duration = self._effective_chunk_duration(chunk_duration)
        chunk_overlap = self._effective_chunk_overlap(chunk_overlap)
        max_single = self._effective_max_single_request_duration(
            max_single_request_duration
        )

        if use_chunking is None:
            try:
                duration = get_audio_duration(audio_path)
            except RuntimeError as err:
                # Without a duration we cannot decide; fall back to one request.
                logger.warning(
                    "Could not determine duration of %s: %s", audio_path, err
                )
                duration = None
            use_chunking = duration is not None and duration > max_single
            logger.info(
                "Auto-chunking decision: duration=%s, threshold=%s, use_chunking=%s",
                duration,
                max_single,
                use_chunking,
            )

        # Options forwarded unchanged to whichever path handles the request.
        request_kwargs = {
            "language": language,
            "num_speakers": num_speakers,
            "min_speakers": min_speakers,
            "max_speakers": max_speakers,
            "clustering_threshold": clustering_threshold,
            "min_duration_on": min_duration_on,
            "min_duration_off": min_duration_off,
            "response_format": response_format,
            "include_text": include_text,
            "verbose": verbose,
            "return_raw": return_raw,
        }

        if use_chunking:
            return self._diarize_and_transcribe_chunked(
                audio_path=audio_path,
                chunk_duration=chunk_duration,
                chunk_overlap=chunk_overlap,
                **request_kwargs,
            )

        # Single-request path (existing behavior)
        return self._diarize_and_transcribe_single(
            audio_path=audio_path,
            **request_kwargs,
        )

    # ------------------------------------------------------------------
    # Single-request path
    # ------------------------------------------------------------------

    def _build_form_data(
        self,
        *,
        response_format: Optional[str],
        include_text: Optional[bool],
        language: Optional[str],
        num_speakers: Optional[int],
        min_speakers: Optional[int],
        max_speakers: Optional[int],
        clustering_threshold: Optional[float],
        min_duration_on: Optional[float],
        min_duration_off: Optional[float],
    ) -> Dict[str, Any]:
        """Build the multipart form fields; options left as None are omitted."""
        # Always default to verbose_json for diarization + speaker info.
        data: Dict[str, Any] = {
            "model": self.model,
            "response_format": (
                "verbose_json" if response_format is None else response_format
            ),
            "include_text": str(True if include_text is None else include_text).lower(),
        }
        if language is not None:
            data["language"] = language

        numeric_options = {
            "num_speakers": num_speakers,
            "min_speakers": min_speakers,
            "max_speakers": max_speakers,
            "clustering_threshold": clustering_threshold,
            "min_duration_on": min_duration_on,
            "min_duration_off": min_duration_off,
        }
        for field, value in numeric_options.items():
            if value is not None:
                data[field] = str(value)
        return data

    def _diarize_and_transcribe_single(
        self,
        audio_path: str,
        *,
        language: Optional[str] = None,
        num_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
        max_speakers: Optional[int] = None,
        clustering_threshold: Optional[float] = None,
        min_duration_on: Optional[float] = None,
        min_duration_off: Optional[float] = None,
        response_format: Optional[str] = None,
        include_text: Optional[bool] = None,
        verbose: bool = False,
        return_raw: bool = False,
    ) -> Dict[str, Any]:
        """
        Internal: single-request diarization and transcription.

        Uploads ``audio_path`` in one POST and returns the parsed result
        (see ``_parse_diarization_response``), with the decoded JSON added
        under "raw_result" when ``return_raw`` is true.

        Raises:
            LocalAIError: If the file does not exist, the response status is
                >= 400, or the response body cannot be decoded as JSON.
        """
        # The "starting" banner/log is emitted once by the public entry point;
        # repeating it here produced duplicate output for every request.
        logger.debug("Single-request diarization for: %s", audio_path)

        data = self._build_form_data(
            response_format=response_format,
            include_text=include_text,
            language=language,
            num_speakers=num_speakers,
            min_speakers=min_speakers,
            max_speakers=max_speakers,
            clustering_threshold=clustering_threshold,
            min_duration_on=min_duration_on,
            min_duration_off=min_duration_off,
        )
        logger.debug("LocalAI request params: %s", data)

        if not os.path.exists(audio_path):
            raise LocalAIError(f"Audio file not found: {audio_path}")

        headers = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # The file handle must stay open until the upload has completed, so
        # the POST is issued inside the `with` block.
        with open(audio_path, "rb") as audio_file:
            files = {
                "file": (
                    os.path.basename(audio_path),
                    audio_file,
                    "application/octet-stream",
                )
            }
            logger.info("Sending request to LocalAI: %s", self._DIARIZATION_ENDPOINT)
            resp = self._client.post(
                self._DIARIZATION_ENDPOINT,
                data=data,
                files=files,
                headers=headers,
            )

        logger.info("LocalAI response status: %d", resp.status_code)
        if resp.status_code >= 400:
            body = resp.text
            logger.error("LocalAI error response: %s", body)
            raise LocalAIError(
                f"LocalAI request failed with status {resp.status_code}: {body}"
            )

        try:
            raw_result = resp.json()
        except ValueError as err:  # json.JSONDecodeError is a ValueError
            logger.error("Failed to parse LocalAI response as JSON.")
            raise LocalAIError("Failed to parse LocalAI response as JSON.") from err

        if verbose:
            print("Diarization and transcription finished. Starting post-processing.")

        parsed = self._parse_diarization_response(raw_result)
        if return_raw:
            parsed["raw_result"] = raw_result
        return parsed

    # ------------------------------------------------------------------
    # Chunked path
    # ------------------------------------------------------------------

    @staticmethod
    def _same_file(first: str, second: str) -> bool:
        """Return True when both paths refer to the same file on disk."""
        try:
            return os.path.samefile(first, second)
        except OSError:
            # samefile() needs both paths to exist; compare resolved paths
            # when one of them is already gone.
            return os.path.realpath(first) == os.path.realpath(second)

    @classmethod
    def _remove_chunk_files(cls, paths: List[Any], keep: str) -> None:
        """
        Delete every existing file in ``paths`` except the one that is ``keep``.

        ``keep`` is compared by file identity rather than by string, so the
        caller's original audio survives even if it is listed under a
        differently spelled path (relative vs. absolute, "./", symlink).
        Failures to delete are logged and otherwise ignored.
        """
        for path in paths:
            if not path or not os.path.exists(path):
                continue
            if cls._same_file(path, keep):
                continue
            try:
                os.remove(path)
            except OSError as err:
                logger.warning("Failed to remove chunk file %s: %s", path, err)

    @staticmethod
    def _shift_chunk_result(chunk_result: Dict[str, Any], offset: float) -> List[Any]:
        """
        Move one chunk's segments onto the global timeline.

        Args:
            chunk_result: Parsed result for a single chunk ("segments",
                "speakers", "transcripts" as parallel lists).
            offset: Start time of the chunk within the full recording.

        Returns:
            List of ``(start, end, speaker, text)`` tuples with ``offset``
            added to both timestamps.
        """
        rows = zip(
            chunk_result.get("segments", []),
            chunk_result.get("speakers", []),
            chunk_result.get("transcripts", []),
        )
        return [
            (float(seg[0]) + offset, float(seg[1]) + offset, speaker, text)
            for seg, speaker, text in rows
        ]

    def _diarize_and_transcribe_chunked(
        self,
        audio_path: str,
        *,
        language: Optional[str] = None,
        num_speakers: Optional[int] = None,
        min_speakers: Optional[int] = None,
        max_speakers: Optional[int] = None,
        clustering_threshold: Optional[float] = None,
        min_duration_on: Optional[float] = None,
        min_duration_off: Optional[float] = None,
        response_format: Optional[str] = None,
        include_text: Optional[bool] = None,
        verbose: bool = False,
        return_raw: bool = False,
        chunk_duration: float = DEFAULT_CHUNK_DURATION,
        chunk_overlap: float = DEFAULT_CHUNK_OVERLAP,
    ) -> Dict[str, Any]:
        """
        Internal: chunked diarization and transcription for long audio.
            - Splits audio into overlapping chunks.
            - Transcribes each chunk via /v1/audio/diarization.
            - Merges segments with adjusted timestamps.

        Returns:
            Same shape as the single-request path. With ``return_raw`` the
            "raw_result" entry is ``{"chunked": True, "chunks": [...]}``
            holding one raw response per chunk.

        Chunk files produced by the splitter are removed afterwards, whether
        or not transcription succeeded; the original ``audio_path`` is never
        removed.
        """
        if verbose:
            print("Audio is long; splitting into chunks to avoid GPU memory issues.")
        logger.info(
            "Chunked transcription: chunk_duration=%s, overlap=%s",
            chunk_duration,
            chunk_overlap,
        )

        chunks = split_audio_into_chunks(
            input_path=audio_path,
            max_duration=chunk_duration,
            overlap=chunk_overlap,
        )
        chunk_paths = [chunk["path"] for chunk in chunks]
        total = len(chunks)

        request_kwargs = {
            "language": language,
            "num_speakers": num_speakers,
            "min_speakers": min_speakers,
            "max_speakers": max_speakers,
            "clustering_threshold": clustering_threshold,
            "min_duration_on": min_duration_on,
            "min_duration_off": min_duration_off,
            "response_format": response_format,
            "include_text": include_text,
            "return_raw": return_raw,
        }

        merged: List[Any] = []
        raw_results: List[Dict[str, Any]] = []

        # Everything that touches chunk files runs inside try/finally so the
        # files are cleaned up on every exit, including the single-chunk
        # shortcut and failures part-way through.
        try:
            if total == 1:
                # No actual split needed; fall back to single-request path
                return self._diarize_and_transcribe_single(
                    audio_path=chunk_paths[0],
                    verbose=verbose,
                    **request_kwargs,
                )

            # NOTE(review): speaker labels are used exactly as each chunk
            # returns them; nothing here reconciles labels across chunks, and
            # no de-duplication is done for segments that may appear in two
            # neighbouring chunks if the splitter really overlaps them.
            for index, chunk_info in enumerate(chunks, start=1):
                chunk_start = float(chunk_info["start"])
                if verbose:
                    print(
                        f"Transcribing chunk {index}/{total} "
                        f"(start={chunk_start:.1f}s)"
                    )
                logger.info(
                    "Transcribing chunk %d/%d, start=%.1f", index, total, chunk_start
                )

                # Use single-request logic for each chunk
                chunk_result = self._diarize_and_transcribe_single(
                    audio_path=chunk_info["path"],
                    verbose=False,
                    **request_kwargs,
                )

                # Adjust timestamps to global timeline
                merged.extend(self._shift_chunk_result(chunk_result, chunk_start))

                raw = chunk_result.get("raw_result")
                if return_raw and raw is not None:
                    raw_results.append(raw)
        finally:
            # Clean up temporary chunk files
            self._remove_chunk_files(chunk_paths, keep=audio_path)

        # Sort segments by start time (stable, so ties keep chunk order)
        merged.sort(key=lambda row: row[0])

        if verbose:
            print(f"Chunked transcription complete. Total segments: {len(merged)}")

        result: Dict[str, Any] = {
            "segments": [[start, end] for start, end, _, _ in merged],
            "speakers": [speaker for _, _, speaker, _ in merged],
            "transcripts": [text for _, _, _, text in merged],
        }
        if return_raw and raw_results:
            result["raw_result"] = {
                "chunked": True,
                "chunks": raw_results,
            }
        return result

    # ------------------------------------------------------------------
    # Response parsing
    # ------------------------------------------------------------------

    def _parse_diarization_response(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert LocalAI verbose_json response into the internal format used by Scraibe:
            {
                "segments": [ [start, end], ... ],
                "speakers": [ "SPEAKER_00", ... ],
                "transcripts": [ "text for segment", ... ]
            }

        Missing or null fields are tolerated: ``start``/``end`` default to
        0.0, ``speaker`` to "SPEAKER_00" and ``text`` to "". Entries in
        "segments" that are not JSON objects are skipped with a warning.

        Raises:
            LocalAIError: If ``result`` is not a JSON object (dict).
        """
        if not isinstance(result, dict):
            raise LocalAIError(
                f"Unexpected LocalAI response type: {type(result).__name__}"
            )

        # `or []` also covers an explicit JSON null for "segments".
        segments = result.get("segments") or []
        if not segments:
            logger.warning("LocalAI returned no segments.")
            return {
                "segments": [],
                "speakers": [],
                "transcripts": [],
            }

        out_segments = []
        out_speakers = []
        out_transcripts = []
        for seg in segments:
            if not isinstance(seg, dict):
                logger.warning("Skipping malformed segment: %r", seg)
                continue

            # dict.get's default only applies to *missing* keys; a JSON null
            # arrives as None and must be handled explicitly.
            start = float(seg.get("start") or 0.0)
            end = float(seg.get("end") or 0.0)

            speaker = seg.get("speaker")
            if speaker is None or speaker == "":
                speaker = self._FALLBACK_SPEAKER

            text = seg.get("text")
            text = "" if text is None else str(text).strip()

            out_segments.append([start, end])
            out_speakers.append(speaker)
            out_transcripts.append(text)

        logger.info(
            "Parsed %d segments from LocalAI.",
            len(out_segments),
        )
        return {
            "segments": out_segments,
            "speakers": out_speakers,
            "transcripts": out_transcripts,
        }