diff --git a/scraibe/__main__.py b/scraibe/__main__.py index e4b6d16..89e0b4f 100644 --- a/scraibe/__main__.py +++ b/scraibe/__main__.py @@ -3,10 +3,40 @@ Entrypoint for running ScrAIbe as a module: python -m scraibe -Always launches the Web GUI (Gradio), never the CLI. +Always launches the Web GUI (Gradio). +Optionally launches an MCP-style API server in parallel. """ +import os +import threading + from .webui import create_app + +def _run_mcp_server(): + """ + Run MCP server in a separate thread. + """ + import uvicorn + from . import mcp_server + + host = os.getenv("MCP_SERVER_HOST", "0.0.0.0") + port = int(os.getenv("MCP_SERVER_PORT", "8000")) + + uvicorn.run( + mcp_server.app, + host=host, + port=port, + log_level="info", + ) + + if __name__ == "__main__": + # Optionally start MCP server in background + mcp_enabled = os.getenv("MCP_SERVER_ENABLED", "false").strip().lower() in ("true", "1", "yes") + if mcp_enabled: + t = threading.Thread(target=_run_mcp_server, daemon=True) + t.start() + + # Always start WebUI (Gradio) create_app() diff --git a/scraibe/mcp_server.py b/scraibe/mcp_server.py new file mode 100644 index 0000000..bc52ea9 --- /dev/null +++ b/scraibe/mcp_server.py @@ -0,0 +1,205 @@ +""" +MCP-style HTTP server for ScrAIbe. + +- Exposes an OpenAPI-compliant endpoint for external LLMs to: + - Upload audio + - Receive transcript JSON (no summary) +- WebUI remains always enabled; this is additive. + +Configuration (env): +- MCP_SERVER_ENABLED: "true"/"false" (default: false) +- MCP_SERVER_HOST: bind address (default: 0.0.0.0) +- MCP_SERVER_PORT: port (default: 8000) +- MCP_USE_CELERY: "true"/"false" (default: true) + - If true, uses Celery tasks; if false, runs synchronously. +""" + +import os +import time +import uuid +import json +import logging +from typing import Optional + +from fastapi import FastAPI, UploadFile, File, Form, HTTPException +from fastapi.responses import JSONResponse + +from .autotranscript import Scraibe + +logger = logging.getLogger("scraibe.mcp_server") + +app = FastAPI( + title="ScrAIbe MCP Transcription API", + version="0.1.0", + description=( + "MCP-style HTTP API for ScrAIbe. " + "Allows external LLMs to upload audio and receive transcript JSON." + ), +) + +# In-memory job store for MCP (simple; can be replaced with Redis later) +_mcp_jobs: dict = {} + + +def _job_id() -> str: + return str(uuid.uuid4()) + + +@app.get("/health") +async def health(): + return {"status": "ok"} + + +@app.post("/transcribe") +async def transcribe( + file: UploadFile = File(...), + language: Optional[str] = Form(None), + num_speakers: Optional[int] = Form(None), +): + """ + Upload audio and start transcription. + + Returns: + { + "job_id": "", + "status": "queued" | "processing" | "completed" | "error", + "message": "..." + } + + Use GET /transcribe/{job_id}/status and /json to retrieve results. + """ + use_celery = os.getenv("MCP_USE_CELERY", "true").strip().lower() in ("true", "1", "yes") + + # Save uploaded file temporarily + try: + import tempfile + from pathlib import Path + + upload_dir = Path(os.getenv("SCRAIBE_UPLOAD_DIR", "/tmp/scraibe_uploads")) + upload_dir.mkdir(parents=True, exist_ok=True) + + ext = Path(file.filename or "file").suffix or ".wav" + ts = time.strftime("%Y%m%d%H%M%S") + tmp_name = f"mcp_upload_{ts}_{uuid.uuid4().hex[:8]}{ext}" + file_path = upload_dir / tmp_name + + content = await file.read() + file_path.write_bytes(content) + except Exception as e: + logger.error("Error saving MCP upload: %s", e) + raise HTTPException(status_code=500, detail=f"Error saving file: {e}") + + job_id = _job_id() + + if use_celery: + try: + from .tasks import process_mcp_transcribe_task + except ImportError: + # Fallback: run synchronously + use_celery = False + + if use_celery: + try: + process_mcp_transcribe_task.delay( + audio_path=str(file_path), + job_id=job_id, + language=language or None, + num_speakers=int(num_speakers) if num_speakers else None, + ) + except Exception as e: + logger.error("Error enqueuing MCP job: %s", e) + _mcp_jobs[job_id] = { + "status": "error", + "message": f"Error enqueuing job: {e}", + "file_path": str(file_path), + } + return { + "job_id": job_id, + "status": "error", + "message": _mcp_jobs[job_id]["message"], + } + + _mcp_jobs[job_id] = { + "status": "queued", + "message": "Job queued for processing.", + "file_path": str(file_path), + } + return { + "job_id": job_id, + "status": "queued", + "message": _mcp_jobs[job_id]["message"], + } + + # Synchronous path + _mcp_jobs[job_id] = { + "status": "processing", + "message": "Transcription started (synchronous).", + "file_path": str(file_path), + } + + def _run_sync(): + try: + scraibe = Scraibe(verbose=False) + result = scraibe.transcribe( + audio_file=str(file_path), + language=language or None, + num_speakers=int(num_speakers) if num_speakers else None, + verbose=False, + for_export=True, + ) + transcript_text = result.get("transcript", "") + segments = result.get("segments", []) + _mcp_jobs[job_id]["status"] = "completed" + _mcp_jobs[job_id]["transcript"] = transcript_text + _mcp_jobs[job_id]["segments"] = segments + _mcp_jobs[job_id]["message"] = "Transcription completed." + except Exception as e: + logger.error("MCP sync transcription error: %s", e) + _mcp_jobs[job_id]["status"] = "error" + _mcp_jobs[job_id]["message"] = f"Transcription error: {e}" + + import threading + t = threading.Thread(target=_run_sync, daemon=True) + t.start() + + return { + "job_id": job_id, + "status": "processing", + "message": _mcp_jobs[job_id]["message"], + } + + +@app.get("/transcribe/{job_id}/status") +async def get_status(job_id: str): + job = _mcp_jobs.get(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + return { + "job_id": job_id, + "status": job["status"], + "message": job.get("message", ""), + } + + +@app.get("/transcribe/{job_id}/json") +async def get_json(job_id: str): + job = _mcp_jobs.get(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + if job["status"] != "completed": + raise HTTPException( + status_code=400, + detail=f"Job not completed. Current status: {job['status']}", + ) + + transcript_text = job.get("transcript", "") + segments = job.get("segments", []) + + return JSONResponse( + content={ + "job_id": job_id, + "transcript": transcript_text, + "segments": segments, + } + ) diff --git a/scraibe/tasks.py b/scraibe/tasks.py index df605f3..b670ee8 100644 --- a/scraibe/tasks.py +++ b/scraibe/tasks.py @@ -504,3 +504,68 @@ def process_transcription_task( if audio_path: _remove_file(audio_path) logger.info("Cleanup completed for job %s.", task_id) + + +@celery_app.task( + name="scraibe.tasks.process_mcp_transcribe_task", + bind=True, + max_retries=1, + task_time_limit=14400, + task_soft_time_limit=13500, +) +def process_mcp_transcribe_task( + self, + audio_path: str, + job_id: str, + language: str, + num_speakers: int, +): + """ + Async task used by MCP-style API: + - Transcribe audio + - Store transcript + segments in shared MCP job store + - Clean up temporary file + """ + from .mcp_server import _mcp_jobs + + log_level = os.getenv("LOG_LEVEL", "INFO") + setup_logging(level=log_level) + + # Initialize status + _mcp_jobs.setdefault( + job_id, + { + "status": "processing", + "message": "Transcription started (async).", + "file_path": audio_path, + }, + ) + + try: + scraibe = Scraibe(verbose=True) + result = scraibe.transcribe( + audio_file=audio_path, + language=language or None, + num_speakers=int(num_speakers) if num_speakers else None, + verbose=True, + for_export=True, + ) + + transcript_text = result.get("transcript", "") + segments = result.get("segments", []) + + _mcp_jobs[job_id]["status"] = "completed" + _mcp_jobs[job_id]["transcript"] = transcript_text + _mcp_jobs[job_id]["segments"] = segments + _mcp_jobs[job_id]["message"] = "Transcription completed." + + logger.info("MCP job %s completed.", job_id) + + except Exception as e: + logger.error("MCP job %s failed: %s", job_id, e, exc_info=True) + _mcp_jobs[job_id]["status"] = "error" + _mcp_jobs[job_id]["message"] = f"Transcription error: {e}" + + finally: + _remove_file(audio_path) + logger.info("MCP job %s cleanup completed.", job_id)