""" MCP-style HTTP server for ScrAIbe. - Exposes an OpenAPI-compliant endpoint for external LLMs to: - Upload audio - Receive transcript JSON (no summary) - WebUI remains always enabled; this is additive. Configuration (env): - MCP_SERVER_ENABLED: "true"/"false" (default: false) - MCP_SERVER_HOST: bind address (default: 0.0.0.0) - MCP_SERVER_PORT: port (default: 8000) - MCP_USE_CELERY: "true"/"false" (default: true) - If true, uses Celery tasks; if false, runs synchronously. """ import os import time import uuid import json import logging from typing import Optional from fastapi import FastAPI, UploadFile, File, Form, HTTPException from fastapi.responses import JSONResponse from .autotranscript import Scraibe logger = logging.getLogger("scraibe.mcp_server") app = FastAPI( title="ScrAIbe MCP Transcription API", version="0.1.0", description=( "MCP-style HTTP API for ScrAIbe. " "Allows external LLMs to upload audio and receive transcript JSON." ), ) # In-memory job store for MCP (simple; can be replaced with Redis later) _mcp_jobs: dict = {} def _job_id() -> str: return str(uuid.uuid4()) @app.get("/health") async def health(): return {"status": "ok"} @app.post("/transcribe") async def transcribe( file: UploadFile = File(...), language: Optional[str] = Form(None), num_speakers: Optional[int] = Form(None), ): """ Upload audio and start transcription. Returns: { "job_id": "", "status": "queued" | "processing" | "completed" | "error", "message": "..." } Use GET /transcribe/{job_id}/status and /json to retrieve results. """ use_celery = os.getenv("MCP_USE_CELERY", "true").strip().lower() in ("true", "1", "yes") # Save uploaded file temporarily try: import tempfile from pathlib import Path upload_dir = Path(os.getenv("SCRAIBE_UPLOAD_DIR", "/tmp/scraibe_uploads")) upload_dir.mkdir(parents=True, exist_ok=True) ext = Path(file.filename or "file").suffix or ".wav" ts = time.strftime("%Y%m%d%H%M%S") tmp_name = f"mcp_upload_{ts}_{uuid.uuid4().hex[:8]}{ext}" file_path = upload_dir / tmp_name content = await file.read() file_path.write_bytes(content) except Exception as e: logger.error("Error saving MCP upload: %s", e) raise HTTPException(status_code=500, detail=f"Error saving file: {e}") job_id = _job_id() if use_celery: try: from .tasks import process_mcp_transcribe_task except ImportError: # Fallback: run synchronously use_celery = False if use_celery: try: process_mcp_transcribe_task.delay( audio_path=str(file_path), job_id=job_id, language=language or None, num_speakers=int(num_speakers) if num_speakers else None, ) except Exception as e: logger.error("Error enqueuing MCP job: %s", e) _mcp_jobs[job_id] = { "status": "error", "message": f"Error enqueuing job: {e}", "file_path": str(file_path), } return { "job_id": job_id, "status": "error", "message": _mcp_jobs[job_id]["message"], } _mcp_jobs[job_id] = { "status": "queued", "message": "Job queued for processing.", "file_path": str(file_path), } return { "job_id": job_id, "status": "queued", "message": _mcp_jobs[job_id]["message"], } # Synchronous path _mcp_jobs[job_id] = { "status": "processing", "message": "Transcription started (synchronous).", "file_path": str(file_path), } def _run_sync(): try: scraibe = Scraibe(verbose=False) result = scraibe.transcribe( audio_file=str(file_path), language=language or None, num_speakers=int(num_speakers) if num_speakers else None, verbose=False, for_export=True, ) transcript_text = result.get("transcript", "") segments = result.get("segments", []) _mcp_jobs[job_id]["status"] = "completed" _mcp_jobs[job_id]["transcript"] = transcript_text _mcp_jobs[job_id]["segments"] = segments _mcp_jobs[job_id]["message"] = "Transcription completed." except Exception as e: logger.error("MCP sync transcription error: %s", e) _mcp_jobs[job_id]["status"] = "error" _mcp_jobs[job_id]["message"] = f"Transcription error: {e}" import threading t = threading.Thread(target=_run_sync, daemon=True) t.start() return { "job_id": job_id, "status": "processing", "message": _mcp_jobs[job_id]["message"], } @app.get("/transcribe/{job_id}/status") async def get_status(job_id: str): job = _mcp_jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return { "job_id": job_id, "status": job["status"], "message": job.get("message", ""), } @app.get("/transcribe/{job_id}/json") async def get_json(job_id: str): job = _mcp_jobs.get(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") if job["status"] != "completed": raise HTTPException( status_code=400, detail=f"Job not completed. Current status: {job['status']}", ) transcript_text = job.get("transcript", "") segments = job.get("segments", []) return JSONResponse( content={ "job_id": job_id, "transcript": transcript_text, "segments": segments, } )