302 lines
9.7 KiB
Python
302 lines
9.7 KiB
Python
"""
Scraibe Class (LocalAI-backed)
------------------------------

Core class for transcription and (optionally) summarization.

- Transcription and diarization are delegated to LocalAI (vibevoice.cpp).
- Summarization is delegated to a separate LLM via /v1/chat/completions.

Public tasks:
- transcribe
- transcript_and_summarize (transcribe + generate a detailed summary)

Settings specific to the previous whisper/pyannote pipeline are still
accepted for backward compatibility, but are ignored when not relevant.
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Union, Optional
|
|
|
|
from .localai_client import LocalAIClient, LocalAIError
|
|
from .summarizer import SummarizerClient, SummarizerError
|
|
from .transcript_exporter import Transcript
|
|
|
|
# Module-level logger used by the Scraibe class below. The logger name is a
# fixed string rather than being derived from __name__.
logger = logging.getLogger("scraibe.autotranscript")
|
|
|
|
|
|
class Scraibe:
    """
    Front end for transcription and (optionally) summarization.

    - Transcription + diarization are delegated to a ``LocalAIClient``.
    - Summarization (when requested) is delegated to a ``SummarizerClient``
      that is created lazily on first use.

    Public methods:
    - transcribe(audio_file, ...)
    - transcript_and_summarize(audio_file, ...)
    """

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
        whisper_model: Union[bool, str, None] = None,
        whisper_type: str = "whisper",
        dia_model: Union[bool, str, None] = None,
        use_auth_token: Optional[str] = None,
        verbose: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize Scraibe with a LocalAI client; the summarizer is deferred.

        Args:
            api_url: LocalAI server URL, passed through to ``LocalAIClient``.
                When None the client is expected to fall back to the
                LOCALAI_API_URL environment variable.
            api_key: API key for LocalAI, passed through to the client
                (expected fallback: LOCALAI_API_KEY).
            model: Model name for LocalAI (e.g., vibevoice-diarize), passed
                through to the client (expected fallback: LOCALAI_MODEL).
            verbose: Default verbosity for later calls; also prints a short
                confirmation message once initialization succeeded.
            whisper_model, whisper_type, dia_model, use_auth_token, **kwargs:
                Accepted for backward compatibility only and ignored.

        The summarizer is expected to read SUMMARIZER_API_URL,
        SUMMARIZER_API_KEY and SUMMARIZER_MODEL unless overrides are given
        to ``transcript_and_summarize``.

        Raises:
            LocalAIError: If the LocalAI client cannot be created.
        """
        # ``verbose`` is a named parameter, so it can never arrive via kwargs.
        self.verbose = bool(verbose)

        logger.info("Initializing Scraibe.")

        try:
            self.client = LocalAIClient(
                api_url=api_url,
                api_key=api_key,
                model=model,
            )
        except LocalAIError as e:
            logger.error("Failed to initialize LocalAI client: %s", e)
            raise LocalAIError(f"Failed to initialize LocalAI client: {e}") from e

        # Summarizer is lazy-initialized if needed. The config tuple records
        # which (api_url, api_key, model) the cached client was built with.
        self._summarizer: Optional[SummarizerClient] = None
        self._summarizer_config: Optional[tuple] = None

        if self.verbose:
            print("Scraibe initialized. Using LocalAI for transcription and diarization.")

    def _ensure_summarizer(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        model: Optional[str] = None,
    ) -> "SummarizerClient":
        """
        Return the summarizer client, creating it on first use.

        The cached client is reused when no override is given, or when the
        overrides equal the ones it was built with. Explicit overrides that
        differ from the cached configuration build a fresh client, so they
        are no longer silently ignored after the first call.

        Raises:
            SummarizerError: If the summarizer client cannot be created. A
                previously cached client is left untouched in that case.
        """
        requested = (api_url, api_key, model)
        has_override = any(value is not None for value in requested)
        cached = self._summarizer
        if cached is not None and (
            not has_override
            or requested == getattr(self, "_summarizer_config", None)
        ):
            return cached

        logger.info("Initializing SummarizerClient (lazy).")
        try:
            client = SummarizerClient(
                api_url=api_url,
                api_key=api_key,
                model=model,
            )
        except SummarizerError as e:
            logger.error("Failed to initialize Summarizer client: %s", e)
            raise SummarizerError(
                f"Failed to initialize Summarizer client: {e}"
            ) from e

        self._summarizer = client
        self._summarizer_config = requested
        return client

    @staticmethod
    def _validate_audio_path(audio_file: str) -> None:
        """Raise TypeError / FileNotFoundError unless audio_file is an existing path string."""
        if not isinstance(audio_file, str):
            raise TypeError(
                "In LocalAI mode, audio_file must be a file path (str)."
            )
        if not os.path.exists(audio_file):
            raise FileNotFoundError(f"Audio file not found: {audio_file}")

    def _diarize(self, audio_file: str, options: dict) -> dict:
        """Run diarization + transcription on the client and return its result."""
        forwarded = dict(options)
        # ``verbose`` is passed explicitly below; leaving it in the forwarded
        # options would raise "got multiple values for keyword argument".
        verbose = forwarded.pop("verbose", self.verbose)

        try:
            return self.client.diarize_and_transcribe(
                audio_path=audio_file,
                include_text=True,
                verbose=verbose,
                **forwarded,
            )
        except LocalAIError as e:
            logger.error("Error during LocalAI transcription: %s", e)
            raise LocalAIError(f"Error during LocalAI transcription: {e}") from e

    # -----------------
    # Primary public API
    # -----------------

    def transcribe(
        self,
        audio_file: str,
        **kwargs,
    ) -> str:
        """
        Transcribe the provided audio file using LocalAI.

        Delegates to ``LocalAIClient.diarize_and_transcribe`` and joins the
        non-empty entries of the returned ``"transcripts"`` list with spaces.

        Args:
            audio_file (str): Path to the audio file.
            **kwargs: ``verbose`` overrides the instance default; all other
                keyword arguments are forwarded to the client.

        Returns:
            str: The concatenated transcribed text.

        Raises:
            TypeError: If audio_file is not a string.
            FileNotFoundError: If audio_file does not exist.
            LocalAIError: If the client reports a transcription error.
        """
        self._validate_audio_path(audio_file)
        logger.info("transcribe called for: %s", audio_file)

        result = self._diarize(audio_file, kwargs)

        # Skip missing (None) entries as well as blank ones.
        pieces = (t.strip() for t in result.get("transcripts", []) if t)
        text = " ".join(piece for piece in pieces if piece)
        logger.info("transcribe completed, length=%d chars", len(text))
        return text

    def transcript_and_summarize(
        self,
        audio_file: str,
        *,
        summarizer_api_url: Optional[str] = None,
        summarizer_api_key: Optional[str] = None,
        summarizer_model: Optional[str] = None,
        **kwargs,
    ) -> dict:
        """
        Transcribe the audio file and generate a detailed summary.

        Steps:
        - Transcribe via LocalAI.
        - Build a plain-text transcript (one "[timestamp] speaker: text"
          paragraph per segment).
        - Summarize the transcript using the configured LLM.

        Args:
            audio_file (str): Path to the audio file.
            summarizer_api_url, summarizer_api_key, summarizer_model:
                Optional overrides handed to the summarizer client.
            **kwargs: ``verbose`` overrides the instance default; all other
                keyword arguments are forwarded to the LocalAI client.

        Returns:
            dict with:
            - transcript: full transcript text (with speaker labels)
            - summary: summary text returned by the summarizer

            When there is nothing to summarize, the transcript is "" and the
            summary is a fixed placeholder message.

        Raises:
            TypeError: If audio_file is not a string.
            FileNotFoundError: If audio_file does not exist.
            LocalAIError: If the client reports a transcription error.
            SummarizerError: If the summarizer cannot be created or fails.
        """
        self._validate_audio_path(audio_file)
        logger.info("transcript_and_summarize called for: %s", audio_file)

        # 1) Get diarized + transcribed result
        result = self._diarize(audio_file, kwargs)

        segments = result.get("segments", [])
        speakers = result.get("speakers", [])
        transcripts = result.get("transcripts", [])

        if not segments:
            logger.warning("No segments returned; returning empty transcript/summary.")
            return {
                "transcript": "",
                "summary": "No transcript content to summarize.",
            }

        if not (len(segments) == len(speakers) == len(transcripts)):
            # zip() below stops at the shortest list; make the loss visible.
            logger.warning(
                "Mismatched result lengths (segments=%d, speakers=%d, "
                "transcripts=%d); surplus entries are dropped.",
                len(segments),
                len(speakers),
                len(transcripts),
            )

        # 2) Build full transcript text with speaker labels
        lines = [
            f"[{self._format_timestamp(start)}] {speaker}: {(text or '').strip()}"
            for (start, _end), speaker, text in zip(segments, speakers, transcripts)
        ]

        if not lines:
            # Nothing could be paired up, so there is nothing to send to the LLM.
            logger.warning("No transcript lines built; returning empty transcript/summary.")
            return {
                "transcript": "",
                "summary": "No transcript content to summarize.",
            }

        full_transcript = "\n\n".join(lines)
        logger.info("Built full transcript, length=%d chars", len(full_transcript))

        # 3) Summarize. _ensure_summarizer already logs and raises
        # SummarizerError on failure, so it is not wrapped a second time.
        summarizer = self._ensure_summarizer(
            api_url=summarizer_api_url,
            api_key=summarizer_api_key,
            model=summarizer_model,
        )

        try:
            summary = summarizer.summarize_transcript(full_transcript)
        except SummarizerError as e:
            logger.error("Error during summarization: %s", e)
            raise SummarizerError(f"Error during summarization: {e}") from e

        logger.info("transcript_and_summarize completed.")

        return {
            "transcript": full_transcript,
            "summary": summary,
        }

    # -----------------
    # Helpers
    # -----------------

    @staticmethod
    def _format_timestamp(seconds: float) -> str:
        """
        Format seconds into MM:SS, or HH:MM:SS once an hour is reached.

        Fractions are truncated and negative values are clamped to zero.
        """
        total = max(0, int(seconds))
        m, s = divmod(total, 60)
        h, m = divmod(m, 60)
        if h > 0:
            return f"{h:02d}:{m:02d}:{s:02d}"
        return f"{m:02d}:{s:02d}"

    @staticmethod
    def remove_audio_file(audio_file: str, shred: bool = False) -> None:
        """
        Remove the original audio file.

        Args:
            audio_file: Path of the file to delete.
            shred: When True, overwrite and delete the file with the external
                ``shred`` command instead of a plain ``os.remove``.

        Raises:
            ValueError: If the path does not exist, or if ``shred`` is
                requested for a directory.
            subprocess.CalledProcessError: If ``shred`` exits with an error.
        """
        if not os.path.exists(audio_file):
            raise ValueError(f"Audiofile {audio_file} does not exist.")

        if shred:
            import subprocess
            import warnings

            if os.path.isdir(audio_file):
                raise ValueError(f"Audiofile {audio_file} is a directory.")

            warnings.warn("Shredding audiofile can take a long time.", RuntimeWarning)

            # Shred exactly the path that was validated above. "--" ends
            # option parsing so a path starting with "-" is not read as a flag.
            print(f"shredding {audio_file} now\n")
            subprocess.run(
                ["shred", "-zvu", "-n", "10", "--", str(audio_file)],
                check=True,
            )
        else:
            os.remove(audio_file)

        print(f"Audiofile {audio_file} removed.")

    def __repr__(self):
        return "Scraibe(LocalAI-backed)"
|