435 lines
14 KiB
Python
435 lines
14 KiB
Python
"""
|
|
ScrAIbe Web GUI (Gradio)
|
|
------------------------
|
|
|
|
Runs the Web GUI that:
|
|
- Accepts audio uploads
|
|
- Sends audio to LocalAI for transcription + diarization
|
|
- Optionally sends transcript to a second LLM for summarization
|
|
- Returns transcript (and summary) in the browser
|
|
- Optionally emails transcript files (TXT + JSON)
|
|
|
|
This is the default entrypoint when running in Docker.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import tempfile
|
|
from datetime import datetime
|
|
|
|
import gradio as gr
|
|
|
|
from .autotranscript import Scraibe
|
|
from .misc import setup_logging
|
|
|
|
# Module-level logger for the Web GUI; registered under the fixed name
# "scraibe.webui" rather than __name__.
logger = logging.getLogger("scraibe.webui")
|
|
|
|
|
|
def load_config():
    """
    Load optional configuration from a YAML file.

    The path is taken from the ``SCRAIBE_CONFIG`` environment variable and
    defaults to ``/app/src/misc/config.yaml``. Primary runtime configuration
    is via environment variables, so this file is optional and loading is
    best-effort: any failure is logged as a warning and an empty config is
    used instead.

    Returns:
        dict: The parsed top-level mapping, or ``{}`` when the file is
        missing, empty, unreadable, not valid YAML, or does not contain a
        mapping at the top level.
    """
    config_path = os.getenv("SCRAIBE_CONFIG", "/app/src/misc/config.yaml")
    if not os.path.exists(config_path):
        return {}

    try:
        # Imported lazily so that a missing PyYAML only disables the config
        # file instead of preventing the module from being imported.
        import yaml

        with open(config_path, "r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
    except Exception as e:
        logger.warning("Failed to load config from %s: %s", config_path, e)
        return {}

    # An empty YAML document parses to None.
    if loaded is None:
        return {}

    # Callers use ``config.get(...)``; a top-level list or scalar would make
    # them crash with AttributeError, so reject it here with a clear warning.
    if not isinstance(loaded, dict):
        logger.warning(
            "Ignoring config from %s: expected a mapping at the top level, got %s",
            config_path,
            type(loaded).__name__,
        )
        return {}

    return loaded
|
|
|
|
|
|
def _write_temp_file(suffix, text):
    """Write *text* to a newly created temporary file and return its path.

    The file is created atomically with a unique name (unlike the deprecated,
    race-prone ``tempfile.mktemp``) and is intentionally not deleted on close,
    because the path is handed to Gradio and to the email sender afterwards.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=suffix, encoding="utf-8", delete=False
    ) as f:
        f.write(text)
    return f.name


def _read_optional_text(path):
    """Return the UTF-8 contents of *path*, or "" if it is unset or not a file."""
    if not path or not os.path.isfile(path):
        return ""
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def create_app():
    """
    Create and launch the Gradio Web GUI.

    Steps:
      1. Configure logging from the ``LOG_LEVEL`` environment variable.
      2. Load the optional YAML config (``layout`` and ``launch`` sections).
      3. Try to initialize ``Scraibe``; if that fails the UI still starts and
         initialization is retried on each transcription request.
      4. Build the Gradio Blocks UI and wire its events.
      5. Call ``app.launch()`` with the configured host, port, favicon and CSS.

    Config keys read: ``layout.header``, ``layout.footer``,
    ``launch.server_name``, ``launch.server_port``, ``launch.favicon_path``.
    ``launch.server_name`` falls back to the ``GRADIO_SERVER_NAME`` environment
    variable and then to ``0.0.0.0``.
    """

    # Logging
    log_level = os.getenv("LOG_LEVEL", "INFO")
    setup_logging(level=log_level)

    # Load config (branding, layout, etc.). ``or {}`` also covers sections
    # that are present in the YAML but left empty (parsed as None).
    config = load_config() or {}
    layout_cfg = config.get("layout") or {}
    launch_cfg = config.get("launch") or {}

    logger.info("Starting ScrAIbe Web GUI.")

    # Initialize Scraibe (LocalAI-backed).
    # If LocalAI is unreachable at startup, still launch the UI
    # and let individual transcription calls fail with a clear message.
    scraibe = None
    try:
        scraibe = Scraibe(verbose=True)
    except Exception as e:
        logger.warning(
            "Failed to initialize Scraibe at startup (LocalAI may be down): %s. "
            "Web GUI will start; transcription will fail until LocalAI is reachable.",
            e,
        )

    # Helper: run transcription via LocalAI API
    def run_transcribe(
        audio_path,
        task,
        language,
        num_speakers,
        send_email_flag,
        email_to,
        email_cc,
        email_subject,
    ):
        """Transcribe (and optionally summarize) one audio file.

        Writes the export files (TXT + JSON, plus MD when summarizing) to
        temporary files and, if requested, emails the TXT and JSON files.

        Returns:
            tuple: ``(transcript, summary, download_path, status, email_status)``.
            On a transcription failure the tuple is
            ``("", "", None, "Error: ...", "")``.

        Raises:
            ValueError: If ``audio_path`` is empty.
        """
        nonlocal scraibe

        if not audio_path:
            raise ValueError("No audio file provided.")

        # Startup initialization may have failed because LocalAI was down;
        # retry now so the app recovers without a restart, and report a
        # readable error instead of an AttributeError on None.
        if scraibe is None:
            try:
                scraibe = Scraibe(verbose=True)
            except Exception as e:
                logger.exception("Scraibe is still unavailable")
                return (
                    "",
                    "",
                    None,
                    f"Error: transcription backend is not available "
                    f"(LocalAI may be down): {e}",
                    "",
                )

        summarize = task == "transcript_and_summarize"
        email_status = ""
        attachments = []
        md_path = None

        # Ensure we use rich export mode (for JSON with diarization)
        try:
            run = scraibe.transcript_and_summarize if summarize else scraibe.transcribe
            result = run(
                audio_file=audio_path,
                language=language or None,
                num_speakers=int(num_speakers) if num_speakers else None,
                verbose=True,
                for_export=True,
            )

            # ``or`` guards against keys that are present but None, which
            # would otherwise break the file writes below.
            transcript_text = result.get("transcript") or ""
            summary = (result.get("summary") or "") if summarize else ""
            segments = result.get("segments") or []
            raw_result = result.get("raw_result")

            # Save as .md (transcript + summary), summarize task only
            if summarize:
                md_path = _write_temp_file(
                    ".md",
                    "# Transcript\n\n"
                    + transcript_text
                    + "\n\n# Summary\n\n"
                    + summary,
                )

            # Save as .txt (plain transcript)
            txt_path = _write_temp_file(".txt", transcript_text)

            # Save as .json (diarization + transcript [+ summary]).
            # Keys are inserted in the same order as the exported layout:
            # task, transcript, [summary], segments, metadata, [raw_result].
            json_data = {
                "task": "transcript_and_summarize" if summarize else "transcribe",
                "transcript": transcript_text,
            }
            if summarize:
                json_data["summary"] = summary
            json_data["segments"] = segments
            json_data["metadata"] = {
                "timestamp": datetime.utcnow().isoformat()
            }
            if raw_result is not None:
                json_data["raw_result"] = raw_result

            # Serialize before creating the file so that a non-serializable
            # result does not leave an empty temp file behind.
            json_path = _write_temp_file(
                ".json",
                json.dumps(json_data, indent=2, ensure_ascii=False),
            )

            # Prepare attachments for email
            if send_email_flag:
                attachments = [txt_path, json_path]

            if summarize:
                status_msg = "Transcription and summarization completed."
            else:
                status_msg = "Transcription completed."

        except Exception as e:
            # UI boundary: report the failure in the status box and keep the
            # traceback in the log.
            logger.exception("Error during transcription: %s", e)
            return (
                "",
                "",
                None,
                f"Error: {e}",
                "",
            )

        # Handle email after successful transcription
        if send_email_flag and attachments:
            try:
                from .email_sender import send_email, EmailError
            except ImportError:
                email_status = "Email feature unavailable (email_sender not found)."
            else:
                to = (email_to or "").strip()
                cc = (email_cc or "").strip()
                subject = (email_subject or "").strip()

                if not to:
                    email_status = "Email not sent: 'To' address is empty."
                else:
                    if not subject:
                        subject = f"ScrAIbe Transcript - {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}"

                    body = (
                        "Please find the transcription files attached.\n"
                        "This message was generated by ScrAIbe.\n"
                    )

                    try:
                        send_email(
                            to=to,
                            cc=cc or None,
                            subject=subject,
                            body=body,
                            attachments=attachments,
                        )
                        email_status = "Transcript files sent via email."
                    except EmailError as e:
                        logger.warning("Email failed: %s", e)
                        email_status = f"Email failed: {e}"
                    except Exception as e:
                        # Unexpected failure: a mail problem must not discard
                        # the finished transcript, so only report it.
                        logger.exception("Unexpected error while sending email")
                        email_status = f"Email failed: {e}"

        # Use md_path for file_output in transcript_and_summarize, else txt_path
        file_path = md_path if summarize else txt_path

        return (
            transcript_text,
            summary,
            file_path,
            status_msg,
            email_status,
        )

    # Load header/footer HTML if present
    header_path = layout_cfg.get("header", "/app/src/misc/header.html")
    footer_path = layout_cfg.get("footer", "/app/src/misc/footer.html")

    header_html = _read_optional_text(header_path)
    footer_html = _read_optional_text(footer_path)

    # Build Gradio interface
    # In Gradio 6.0+, css must be passed to launch(), not Blocks()
    with gr.Blocks(
        title="A.P.Strom Transcription",
    ) as app:

        # Header
        if header_html:
            gr.HTML(header_html)

        with gr.Row():
            with gr.Column(scale=2):
                audio_input = gr.Audio(
                    label="Upload or record audio",
                    type="filepath",
                )

                with gr.Row():
                    task_choice = gr.Radio(
                        choices=[
                            ("Transcribe", "transcribe"),
                            ("Transcript & Summarize", "transcript_and_summarize"),
                        ],
                        value="transcribe",
                        label="Task",
                    )

                with gr.Row():
                    language_input = gr.Textbox(
                        label="Language (optional)",
                        placeholder="e.g., english, german",
                    )
                    num_speakers_input = gr.Number(
                        label="Number of speakers (optional)",
                        precision=0,
                    )

                # Email options
                send_email_checkbox = gr.Checkbox(
                    label="Send transcript files via email"
                )

                with gr.Group(visible=False) as email_group:
                    email_to = gr.Textbox(
                        label="To (comma-separated)",
                        placeholder="e.g. name@example.com",
                    )
                    email_cc = gr.Textbox(
                        label="CC (optional, comma-separated)",
                        placeholder="e.g. manager@example.com",
                    )
                    email_subject = gr.Textbox(
                        label="Subject (optional)",
                        placeholder="Default: ScrAIbe Transcript - <date>",
                    )

                # Show the email fields only while the checkbox is ticked.
                send_email_checkbox.change(
                    fn=lambda v: gr.update(visible=v),
                    inputs=[send_email_checkbox],
                    outputs=[email_group],
                )

                transcribe_btn = gr.Button("Start", variant="primary")

            with gr.Column(scale=3):
                output_text = gr.Textbox(
                    label="Transcript",
                    lines=10,
                    interactive=False,
                )
                summary_text = gr.Textbox(
                    label="Summary",
                    lines=10,
                    interactive=False,
                    visible=False,
                )
                file_output = gr.File(
                    label="Download transcript/summary",
                )
                status_text = gr.Textbox(
                    label="Status",
                    interactive=False,
                )
                email_status_text = gr.Textbox(
                    label="Email status",
                    interactive=False,
                    visible=True,
                )

        # Footer
        if footer_html:
            gr.HTML(footer_html)

        # Events

        def on_task_change(value):
            """Show the summary box only for the summarize task."""
            show_summary = (value == "transcript_and_summarize")
            return gr.update(visible=show_summary)

        task_choice.change(
            fn=on_task_change,
            inputs=[task_choice],
            outputs=[summary_text],
        )

        def on_transcribe(
            audio,
            task,
            language,
            num_speakers,
            send_email_flag,
            email_to_val,
            email_cc_val,
            email_subject_val,
        ):
            """Click handler: validate the input and delegate to run_transcribe.

            Returns the five values bound to the output components
            (transcript, summary, download file, status, email status).
            """
            if not audio:
                return (
                    "",
                    "",
                    None,
                    "Please upload or record audio.",
                    "",
                )

            transcript, summary, file_path, status_msg, email_status = run_transcribe(
                audio_path=audio,
                task=task,
                language=language,
                num_speakers=num_speakers,
                send_email_flag=bool(send_email_flag),
                email_to=email_to_val,
                email_cc=email_cc_val,
                email_subject=email_subject_val,
            )

            return (
                transcript,
                summary,
                file_path if file_path else None,
                status_msg,
                email_status,
            )

        transcribe_btn.click(
            fn=on_transcribe,
            inputs=[
                audio_input,
                task_choice,
                language_input,
                num_speakers_input,
                send_email_checkbox,
                email_to,
                email_cc,
                email_subject,
            ],
            outputs=[
                output_text,
                summary_text,
                file_output,
                status_text,
                email_status_text,
            ],
        )

    # Launch options. ``or`` makes empty/null config values fall back to the
    # defaults instead of crashing in str()/int()/os.path.exists().
    server_name = launch_cfg.get("server_name") or os.getenv("GRADIO_SERVER_NAME", "0.0.0.0")
    server_port = launch_cfg.get("server_port") or 7860
    favicon_path = launch_cfg.get("favicon_path", "/app/src/misc/logo.png")

    app.launch(
        server_name=str(server_name),
        server_port=int(server_port),
        favicon_path=favicon_path if favicon_path and os.path.exists(favicon_path) else None,
        css="body { font-family: Arial, sans-serif; }",
    )
|
|
|
|
|
|
# Script entry point: build and launch the Web GUI only when this module is
# executed directly, never as a side effect of importing it.
if __name__ == "__main__":
    create_app()
|