Add MCP-style API server (OpenAPI) alongside WebUI
Mirror and run GitLab CI / build (push) Waiting to run
Ruff / ruff (push) Waiting to run

- New mcp_server.py: FastAPI app for LLMs to upload audio and get transcript JSON.
- Added process_mcp_transcribe_task Celery task.
- Updated __main__.py: WebUI always runs; MCP server runs in parallel when MCP_SERVER_ENABLED=true.
This commit is contained in:
admin
2026-06-19 17:04:44 +00:00
parent 111d1ea18b
commit 54414def26
3 changed files with 301 additions and 1 deletions
+31 -1
View File
@@ -3,10 +3,40 @@ Entrypoint for running ScrAIbe as a module:
python -m scraibe python -m scraibe
Always launches the Web GUI (Gradio), never the CLI. Always launches the Web GUI (Gradio).
Optionally launches an MCP-style API server in parallel.
""" """
import os
import threading
from .webui import create_app from .webui import create_app
def _run_mcp_server():
"""
Run MCP server in a separate thread.
"""
import uvicorn
from . import mcp_server
host = os.getenv("MCP_SERVER_HOST", "0.0.0.0")
port = int(os.getenv("MCP_SERVER_PORT", "8000"))
uvicorn.run(
mcp_server.app,
host=host,
port=port,
log_level="info",
)
if __name__ == "__main__": if __name__ == "__main__":
# Optionally start MCP server in background
mcp_enabled = os.getenv("MCP_SERVER_ENABLED", "false").strip().lower() in ("true", "1", "yes")
if mcp_enabled:
t = threading.Thread(target=_run_mcp_server, daemon=True)
t.start()
# Always start WebUI (Gradio)
create_app() create_app()
+205
View File
@@ -0,0 +1,205 @@
"""
MCP-style HTTP server for ScrAIbe.
- Exposes an OpenAPI-compliant endpoint for external LLMs to:
- Upload audio
- Receive transcript JSON (no summary)
- WebUI remains always enabled; this is additive.
Configuration (env):
- MCP_SERVER_ENABLED: "true"/"false" (default: false)
- MCP_SERVER_HOST: bind address (default: 0.0.0.0)
- MCP_SERVER_PORT: port (default: 8000)
- MCP_USE_CELERY: "true"/"false" (default: true)
- If true, uses Celery tasks; if false, runs synchronously.
"""
import os
import time
import uuid
import json
import logging
from typing import Optional
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from .autotranscript import Scraibe
logger = logging.getLogger("scraibe.mcp_server")
app = FastAPI(
title="ScrAIbe MCP Transcription API",
version="0.1.0",
description=(
"MCP-style HTTP API for ScrAIbe. "
"Allows external LLMs to upload audio and receive transcript JSON."
),
)
# In-memory job store for MCP (simple; can be replaced with Redis later)
_mcp_jobs: dict = {}
def _job_id() -> str:
return str(uuid.uuid4())
@app.get("/health")
async def health():
return {"status": "ok"}
@app.post("/transcribe")
async def transcribe(
file: UploadFile = File(...),
language: Optional[str] = Form(None),
num_speakers: Optional[int] = Form(None),
):
"""
Upload audio and start transcription.
Returns:
{
"job_id": "<id>",
"status": "queued" | "processing" | "completed" | "error",
"message": "..."
}
Use GET /transcribe/{job_id}/status and /json to retrieve results.
"""
use_celery = os.getenv("MCP_USE_CELERY", "true").strip().lower() in ("true", "1", "yes")
# Save uploaded file temporarily
try:
import tempfile
from pathlib import Path
upload_dir = Path(os.getenv("SCRAIBE_UPLOAD_DIR", "/tmp/scraibe_uploads"))
upload_dir.mkdir(parents=True, exist_ok=True)
ext = Path(file.filename or "file").suffix or ".wav"
ts = time.strftime("%Y%m%d%H%M%S")
tmp_name = f"mcp_upload_{ts}_{uuid.uuid4().hex[:8]}{ext}"
file_path = upload_dir / tmp_name
content = await file.read()
file_path.write_bytes(content)
except Exception as e:
logger.error("Error saving MCP upload: %s", e)
raise HTTPException(status_code=500, detail=f"Error saving file: {e}")
job_id = _job_id()
if use_celery:
try:
from .tasks import process_mcp_transcribe_task
except ImportError:
# Fallback: run synchronously
use_celery = False
if use_celery:
try:
process_mcp_transcribe_task.delay(
audio_path=str(file_path),
job_id=job_id,
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
)
except Exception as e:
logger.error("Error enqueuing MCP job: %s", e)
_mcp_jobs[job_id] = {
"status": "error",
"message": f"Error enqueuing job: {e}",
"file_path": str(file_path),
}
return {
"job_id": job_id,
"status": "error",
"message": _mcp_jobs[job_id]["message"],
}
_mcp_jobs[job_id] = {
"status": "queued",
"message": "Job queued for processing.",
"file_path": str(file_path),
}
return {
"job_id": job_id,
"status": "queued",
"message": _mcp_jobs[job_id]["message"],
}
# Synchronous path
_mcp_jobs[job_id] = {
"status": "processing",
"message": "Transcription started (synchronous).",
"file_path": str(file_path),
}
def _run_sync():
try:
scraibe = Scraibe(verbose=False)
result = scraibe.transcribe(
audio_file=str(file_path),
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
verbose=False,
for_export=True,
)
transcript_text = result.get("transcript", "")
segments = result.get("segments", [])
_mcp_jobs[job_id]["status"] = "completed"
_mcp_jobs[job_id]["transcript"] = transcript_text
_mcp_jobs[job_id]["segments"] = segments
_mcp_jobs[job_id]["message"] = "Transcription completed."
except Exception as e:
logger.error("MCP sync transcription error: %s", e)
_mcp_jobs[job_id]["status"] = "error"
_mcp_jobs[job_id]["message"] = f"Transcription error: {e}"
import threading
t = threading.Thread(target=_run_sync, daemon=True)
t.start()
return {
"job_id": job_id,
"status": "processing",
"message": _mcp_jobs[job_id]["message"],
}
@app.get("/transcribe/{job_id}/status")
async def get_status(job_id: str):
job = _mcp_jobs.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return {
"job_id": job_id,
"status": job["status"],
"message": job.get("message", ""),
}
@app.get("/transcribe/{job_id}/json")
async def get_json(job_id: str):
job = _mcp_jobs.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
if job["status"] != "completed":
raise HTTPException(
status_code=400,
detail=f"Job not completed. Current status: {job['status']}",
)
transcript_text = job.get("transcript", "")
segments = job.get("segments", [])
return JSONResponse(
content={
"job_id": job_id,
"transcript": transcript_text,
"segments": segments,
}
)
+65
View File
@@ -504,3 +504,68 @@ def process_transcription_task(
if audio_path: if audio_path:
_remove_file(audio_path) _remove_file(audio_path)
logger.info("Cleanup completed for job %s.", task_id) logger.info("Cleanup completed for job %s.", task_id)
@celery_app.task(
name="scraibe.tasks.process_mcp_transcribe_task",
bind=True,
max_retries=1,
task_time_limit=14400,
task_soft_time_limit=13500,
)
def process_mcp_transcribe_task(
self,
audio_path: str,
job_id: str,
language: str,
num_speakers: int,
):
"""
Async task used by MCP-style API:
- Transcribe audio
- Store transcript + segments in shared MCP job store
- Clean up temporary file
"""
from .mcp_server import _mcp_jobs
log_level = os.getenv("LOG_LEVEL", "INFO")
setup_logging(level=log_level)
# Initialize status
_mcp_jobs.setdefault(
job_id,
{
"status": "processing",
"message": "Transcription started (async).",
"file_path": audio_path,
},
)
try:
scraibe = Scraibe(verbose=True)
result = scraibe.transcribe(
audio_file=audio_path,
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
verbose=True,
for_export=True,
)
transcript_text = result.get("transcript", "")
segments = result.get("segments", [])
_mcp_jobs[job_id]["status"] = "completed"
_mcp_jobs[job_id]["transcript"] = transcript_text
_mcp_jobs[job_id]["segments"] = segments
_mcp_jobs[job_id]["message"] = "Transcription completed."
logger.info("MCP job %s completed.", job_id)
except Exception as e:
logger.error("MCP job %s failed: %s", job_id, e, exc_info=True)
_mcp_jobs[job_id]["status"] = "error"
_mcp_jobs[job_id]["message"] = f"Transcription error: {e}"
finally:
_remove_file(audio_path)
logger.info("MCP job %s cleanup completed.", job_id)