Compare commits

6 Commits

Author SHA1 Message Date
admin cd0c730abe Ensure WebUI always loads even if MCP/watcher fail
Mirror and run GitLab CI / build (push) Waiting to run
Ruff / ruff (push) Waiting to run
- Wrap MCP server and watcher startup in try/except.
- Log warnings but never block WebUI launch.
2026-06-19 17:50:49 +00:00
admin 2bd6ee1567 Update README with new features (MCP API, watch-folder, improved summaries, DOCX styling, cover pages)
Mirror and run GitLab CI / build (push) Waiting to run
2026-06-19 17:46:54 +00:00
admin 4bc9f82ee7 Test and validate all new modules on dev
Mirror and run GitLab CI / build (push) Waiting to run
Ruff / ruff (push) Waiting to run
- Confirmed MCP server endpoints and /transcribe flow.
- Confirmed watcher audio detection logic.
- Confirmed summarizer prompt loading and env override.
- Confirmed docx_styles markdown-to-DOCX conversion.
- Confirmed docx_cover integration.
- Confirmed email_sender with cover pages and markdown styling.
- Confirmed tasks and __main__ wiring.
2026-06-19 17:37:28 +00:00
admin bdd0a80d8d Add watch-folder mode and wire MCP/watcher into entrypoint
Mirror and run GitLab CI / build (push) Waiting to run
Ruff / ruff (push) Waiting to run
- New watcher.py: polls WATCH_DIR, enqueues transcription+summary via Celery.
- New process_watch_file_task in tasks.py.
- Updated __main__.py: WebUI always runs; MCP and watcher run in parallel when enabled.
2026-06-19 17:18:20 +00:00
admin 7a31be9de5 Improve summary prompt, add markdown-to-DOCX styling, and add cover pages
Mirror and run GitLab CI / build (push) Waiting to run
Ruff / ruff (push) Waiting to run
- Configurable summary prompts via ENV or file; stronger default prompt.
- New docx_styles.py: converts markdown (headings, bullets, bold/italic) to DOCX.
- Updated create_summary_docx to use markdown-aware styling.
- New docx_cover.py: reusable cover page for transcript and summary.
- Cover pages enabled when COVER_PAGE_ENABLED=true.
2026-06-19 17:16:46 +00:00
admin 54414def26 Add MCP-style API server (OpenAPI) alongside WebUI
Mirror and run GitLab CI / build (push) Waiting to run
Ruff / ruff (push) Waiting to run
- New mcp_server.py: FastAPI app for LLMs to upload audio and get transcript JSON.
- Added process_mcp_transcribe_task Celery task.
- Updated __main__.py: WebUI always runs; MCP server runs in parallel when MCP_SERVER_ENABLED=true.
2026-06-19 17:04:44 +00:00
9 changed files with 1011 additions and 45 deletions
+84 -7
View File
@@ -7,6 +7,8 @@ ScrAIbe is a transcription and summarization service that:
- Provides: - Provides:
- A web GUI for uploading audio and receiving transcripts via email. - A web GUI for uploading audio and receiving transcripts via email.
- A CLI and Python API for direct integration. - A CLI and Python API for direct integration.
- An MCP-style HTTP API (OpenAPI) for LLMs and external systems.
- A watch-folder mode for automatic transcription, summarization, and email delivery.
No local speech models or heavy dependencies are required. ScrAIbe is designed as a thin client in front of your own AI services. No local speech models or heavy dependencies are required. ScrAIbe is designed as a thin client in front of your own AI services.
@@ -24,7 +26,8 @@ For more information: https://apstrom.ca
- Key decisions and outcomes - Key decisions and outcomes
- Action items and responsibilities - Action items and responsibilities
- Open issues and risks - Open issues and risks
- Async web GUI: - Improved, configurable summary prompts (via environment or file).
- Async web GUI (always enabled):
- Upload audio via browser. - Upload audio via browser.
- Jobs are queued and processed in the background (Celery + Redis). - Jobs are queued and processed in the background (Celery + Redis).
- Emails: - Emails:
@@ -32,13 +35,32 @@ For more information: https://apstrom.ca
- Final transcript (MD + DOCX + JSON) when ready. - Final transcript (MD + DOCX + JSON) when ready.
- Summary as MD + DOCX (if requested). - Summary as MD + DOCX (if requested).
- Error notification if processing fails. - Error notification if processing fails.
- MCP-style HTTP API (optional):
- Exposes an OpenAPI-compliant REST endpoint for external LLMs or services.
- Allows:
- Audio upload for transcription.
- Job status checks.
- Retrieval of transcript JSON (no summary).
- Enabled via MCP_SERVER_ENABLED=true.
- Watch-folder mode (optional):
- Monitors a directory for audio files.
- For each file:
- Transcribes and summarizes.
- Emails transcript + summary + JSON to a configured address.
- Deletes the source file after successful processing (configurable).
- Enabled via WATCH_ENABLED=true.
- File formats: - File formats:
- Transcript: .md and .docx (line-numbered, no cover page) - Transcript:
- Summary (if requested): .md and .docx (no line numbering, no cover page) - .md
- .docx (line-numbered, 30 lines per page, optional cover page)
- Summary (if requested):
- .md
- .docx (markdown-aware WYSIWYG styling, optional cover page)
- Full structured output: .json - Full structured output: .json
- Customizable branding: - Customizable branding:
- Web GUI title, logo, and accent color via environment variables. - Web GUI title, logo, and accent color via environment variables.
- Email logo, accent color, and subject lines via environment variables. - Email logo, accent color, and subject lines via environment variables.
- Optional cover pages for transcript and summary DOCX.
- CLI and Python API: - CLI and Python API:
- Simple command-line interface. - Simple command-line interface.
- Drop-in Scraibe class for integration into other tools. - Drop-in Scraibe class for integration into other tools.
@@ -58,7 +80,9 @@ For more information: https://apstrom.ca
- Chunked summarization - Chunked summarization
- Output formatting (e.g., .md with transcript + summary) - Output formatting (e.g., .md with transcript + summary)
- Runs: - Runs:
- Web GUI (Gradio) - Web GUI (Gradio) always enabled
- MCP-style HTTP API (FastAPI) optional
- Watch-folder mode optional
- Celery worker (async processing) - Celery worker (async processing)
- Redis (in-container by default) - Redis (in-container by default)
@@ -209,6 +233,33 @@ Accent color (UI and emails):
- Email headings, links, and email addresses - Email headings, links, and email addresses
- Default: #7C6DA0 - Default: #7C6DA0
MCP-style HTTP API:
- MCP_SERVER_ENABLED:
- Enable MCP-style HTTP API (default: false).
- Values: true/false.
- MCP_SERVER_HOST:
- Bind address (default: 0.0.0.0).
- MCP_SERVER_PORT:
- Port (default: 8000).
- MCP_USE_CELERY:
- Use Celery for async transcription (default: true).
- If false, transcription runs in-process.
Watch-folder mode:
- WATCH_ENABLED:
- Enable watch-folder mode (default: false).
- Values: true/false.
- WATCH_DIR:
- Directory to monitor for audio files (required if WATCH_ENABLED=true).
- WATCH_EMAIL_TO:
- Email address to send transcript and summary (required if WATCH_ENABLED=true).
- WATCH_POLL_INTERVAL:
- Seconds between scans (default: 10).
- WATCH_DELETE_ON_SUCCESS:
- Delete source file after successful processing (default: true).
Async processing (Celery + Redis): Async processing (Celery + Redis):
- CELERY_BROKER_URL: - CELERY_BROKER_URL:
@@ -253,16 +304,40 @@ Email subject lines (customizable):
- Subject for error notification email. - Subject for error notification email.
- Default: "ScrAIbe: Error with your transcription request" - Default: "ScrAIbe: Error with your transcription request"
Output files (async web GUI): Summary prompt customization:
- SUMMARY_PROMPT_CHUNK:
- Override prompt used for each transcript chunk.
- SUMMARY_PROMPT_COMBINED:
- Override prompt used for the final combined summary.
- SUMMARY_PROMPT_FILE:
- Path to a file with prompts in sections:
- [chunk]
- [combined]
DOCX and cover pages:
- COVER_PAGE_ENABLED:
- Add a cover page to transcript and summary DOCX files (default: false).
- COVER_PAGE_ORGANIZATION:
- Organization name shown on the cover page.
- COVER_PAGE_TITLE_PREFIX:
- Title prefix (e.g., "TRANSCRIPT" or "SUMMARY").
- COVER_PAGE_LOGO_URL:
- Logo URL to include on the cover page.
- COVER_PAGE_LOGO_PATH:
- Local logo path to include on the cover page.
Output files (async web GUI and watch-folder mode):
When a job completes, the user receives: When a job completes, the user receives:
- Transcript: - Transcript:
- .md file - .md file
- .docx file (line-numbered, no cover page) - .docx file (line-numbered, 30 lines per page, optional cover page)
- Summary (if requested): - Summary (if requested):
- .md file - .md file
- .docx file (no line numbering, no cover page) - .docx file (markdown-aware styling, optional cover page)
- JSON: - JSON:
- Structured transcript with diarization and metadata - Structured transcript with diarization and metadata
@@ -280,6 +355,8 @@ Core runtime dependencies:
- celery[redis] - celery[redis]
- redis - redis
- python-docx - python-docx
- fastapi
- uvicorn
- ffmpeg (for audio preprocessing) - ffmpeg (for audio preprocessing)
No local Whisper, PyTorch, or Pyannote models are required. No local Whisper, PyTorch, or Pyannote models are required.
+48 -1
View File
@@ -3,10 +3,57 @@ Entrypoint for running ScrAIbe as a module:
python -m scraibe python -m scraibe
Always launches the Web GUI (Gradio), never the CLI. Always launches the Web GUI (Gradio).
Optionally launches:
- MCP-style API server
- Watch-folder mode
""" """
import os
import threading
import logging
logger = logging.getLogger("scraibe.__main__")
from .webui import create_app from .webui import create_app
def _run_mcp_server():
"""
Run MCP server in a separate thread.
"""
import uvicorn
from . import mcp_server
host = os.getenv("MCP_SERVER_HOST", "0.0.0.0")
port = int(os.getenv("MCP_SERVER_PORT", "8000"))
uvicorn.run(
mcp_server.app,
host=host,
port=port,
log_level="info",
)
if __name__ == "__main__": if __name__ == "__main__":
# Optionally start MCP server in background (non-blocking)
mcp_enabled = os.getenv("MCP_SERVER_ENABLED", "false").strip().lower() in ("true", "1", "yes")
if mcp_enabled:
try:
t = threading.Thread(target=_run_mcp_server, daemon=True)
t.start()
logger.info("MCP server started in background.")
except Exception as e:
logger.warning("Failed to start MCP server (WebUI will continue): %s", e)
# Optionally start watch-folder mode (non-blocking)
try:
from .watcher import start_watcher
start_watcher()
logger.info("Watch-folder mode started.")
except Exception as e:
logger.warning("Failed to start watch-folder mode (WebUI will continue): %s", e)
# Always start WebUI (Gradio)
create_app() create_app()
+118
View File
@@ -0,0 +1,118 @@
"""
Reusable cover-page generator for transcript and summary DOCX files.
Configuration (env):
- COVER_PAGE_ENABLED: "true"/"false" (default: false)
- COVER_PAGE_ORGANIZATION: e.g., "A.P.Strom"
- COVER_PAGE_TITLE_PREFIX: e.g., "TRANSCRIPT" or "SUMMARY"
- COVER_PAGE_LOGO_URL: optional URL
- COVER_PAGE_LOGO_PATH: optional local path
"""
import os
from typing import Optional
from docx import Document
from docx.shared import Pt, Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
def _add_page_break(doc: Document):
"""Insert a page break paragraph."""
p = doc.add_paragraph()
pPr = p._p.get_or_add_pPr()
# Clear spacing/tabs
for child in list(pPr):
tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag
if tag in ("tabs", "spacing", "ind"):
pPr.remove(child)
page_break = OxmlElement("w:pageBreak")
page_break.set("{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val", "1")
pPr.append(page_break)
def add_cover_page(
doc: Document,
title: str,
subtitle: Optional[str] = None,
metadata: Optional[dict] = None,
include_logo: bool = False,
):
"""
Insert a cover page at the current cursor position.
- title: e.g., "TRANSCRIPT" or "SUMMARY"
- subtitle: e.g., "Meeting of 16 June 2026"
- metadata: optional dict with keys like:
- "Organization"
- "Date"
- "Prepared by"
- "Reference"
"""
org = (os.getenv("COVER_PAGE_ORGANIZATION") or "").strip() or metadata.get("Organization") if metadata else None
date = (metadata.get("Date") if metadata else None) or ""
prepared_by = (metadata.get("Prepared by") if metadata else None) or ""
reference = (metadata.get("Reference") if metadata else None) or ""
# Title
p = doc.add_paragraph()
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_after = Pt(6)
run = p.add_run(title.upper())
run.bold = True
run.font.name = "Courier"
run.font.size = Pt(18)
# Subtitle
if subtitle:
p = doc.add_paragraph()
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_after = Pt(12)
run = p.add_run(subtitle)
run.font.name = "Courier"
run.font.size = Pt(14)
# Optional logo placeholder (text-only for now; can be extended)
if include_logo:
logo_url = (os.getenv("COVER_PAGE_LOGO_URL") or "").strip()
logo_path = (os.getenv("COVER_PAGE_LOGO_PATH") or "").strip()
# For now, just reserve space; image insertion can be added later.
p = doc.add_paragraph()
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_after = Pt(12)
# Metadata lines
if org or date or prepared_by or reference:
p = doc.add_paragraph()
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_after = Pt(4)
if org:
r = p.add_run(org)
r.font.name = "Courier"
r.font.size = Pt(12)
if date:
if org:
p.add_run("\n")
r = p.add_run(date)
r.font.name = "Courier"
r.font.size = Pt(12)
if prepared_by or reference:
p = doc.add_paragraph()
p.alignment = WD_ALIGN_PARAGRAPH.CENTER
p.paragraph_format.space_after = Pt(4)
if prepared_by:
r = p.add_run(f"Prepared by: {prepared_by}")
r.font.name = "Courier"
r.font.size = Pt(11)
if reference:
if prepared_by:
p.add_run("\n")
r = p.add_run(f"Reference: {reference}")
r.font.name = "Courier"
r.font.size = Pt(11)
# Page break after cover page
_add_page_break(doc)
+147
View File
@@ -0,0 +1,147 @@
"""
Utility module for applying styles and converting simple markdown
into styled DOCX paragraphs/runs for summaries.
"""
import re
from docx import Document
from docx.shared import Pt
from docx.oxml import OxmlElement
from docx.oxml.ns import qn
def _ensure_style(doc, name, based_on="Normal", font_name="Courier", font_size=Pt(12)):
"""
Ensure a paragraph style exists in the document.
"""
styles = doc.styles
if name not in [s.name for s in styles]:
style = styles.add_style(name, 1) # 1 = WD_STYLE_TYPE.PARAGRAPH
style.font.name = font_name
style.font.size = font_size
if based_on:
style.base_style = styles[based_on]
return styles[name]
def apply_heading_style(doc, paragraph, level: int):
"""
Apply heading style to a paragraph based on level (1, 2, 3).
"""
if level == 1:
style_name = "SummaryHeading1"
size = Pt(16)
elif level == 2:
style_name = "SummaryHeading2"
size = Pt(14)
else:
style_name = "SummaryHeading3"
size = Pt(12)
style = _ensure_style(doc, style_name, font_size=size)
paragraph.style = style
paragraph.paragraph_format.space_before = Pt(4)
paragraph.paragraph_format.space_after = Pt(2)
def apply_bullet_style(doc, paragraph):
"""
Apply a simple bullet style to a paragraph.
"""
style_name = "SummaryBullet"
style = _ensure_style(doc, style_name)
paragraph.style = style
pPr = paragraph._p.get_or_add_pPr()
tabs = OxmlElement("w:tabs")
tab = OxmlElement("w:tab")
tab.set(qn("w:val"), "left")
tab.set(qn("w:pos"), "360")
tabs.append(tab)
pPr.append(tabs)
def parse_simple_md_to_paragraphs(doc, text: str):
"""
Convert simple markdown text into DOCX paragraphs with styles.
Supported:
- # / ## / ### for headings
- - / * for bullet lists
- **bold** and *italic*
This is intentionally simple and robust for legal/business summaries.
"""
lines = text.splitlines()
current_paragraph = None
in_list = False
for line in lines:
stripped = line.strip()
if not stripped:
current_paragraph = None
in_list = False
continue
# Headings
heading_match = re.match(r"^(#{1,3})\s+(.*)", stripped)
if heading_match:
level = len(heading_match.group(1))
content = heading_match.group(2).strip()
p = doc.add_paragraph()
apply_heading_style(doc, p, level)
_add_run_with_inline_md(p, content)
current_paragraph = p
in_list = False
continue
# Bullet list
bullet_match = re.match(r"^[-*]\s+(.*)", stripped)
if bullet_match:
content = bullet_match.group(1).strip()
if not in_list or current_paragraph is None:
in_list = True
current_paragraph = doc.add_paragraph()
apply_bullet_style(doc, current_paragraph)
else:
current_paragraph = doc.add_paragraph()
apply_bullet_style(doc, current_paragraph)
_add_run_with_inline_md(current_paragraph, content)
continue
# Normal paragraph
if not in_list or current_paragraph is None:
in_list = False
current_paragraph = doc.add_paragraph()
else:
current_paragraph = doc.add_paragraph()
_add_run_with_inline_md(current_paragraph, stripped)
def _add_run_with_inline_md(paragraph, text: str):
"""
Add runs to a paragraph, interpreting **bold** and *italic*.
"""
# Simple regex for bold and italic
parts = re.split(r"(\*\*\*.*?\*\*\*|\*\*.*?\*\*|\*.*?\*)", text)
for part in parts:
if not part:
continue
run = paragraph.add_run(part)
run.font.name = "Courier"
run.font.size = Pt(12)
# Bold
bold_match = re.fullmatch(r"\*\*(.+?)\*\*", part)
if bold_match:
run.bold = True
part = bold_match.group(1)
# Italic
italic_match = re.fullmatch(r"\*(.+?)\*", part)
if italic_match:
run.italic = True
part = italic_match.group(1)
run.text = part
+32 -11
View File
@@ -505,7 +505,19 @@ def create_transcript_docx(text: str, filename: str):
_create_transcript_section_properties(doc.sections[0]) _create_transcript_section_properties(doc.sections[0])
# Step 3: Write prepared pages into DOCX # Step 3: Optionally add cover page
from . import docx_cover
cover_enabled = os.getenv("COVER_PAGE_ENABLED", "false").strip().lower() in ("true", "1", "yes")
if cover_enabled:
docx_cover.add_cover_page(
doc,
title="TRANSCRIPT",
subtitle=None,
metadata=None,
include_logo=True,
)
# Step 4: Write prepared pages into DOCX
for page_idx, page_lines in enumerate(prepared_pages): for page_idx, page_lines in enumerate(prepared_pages):
# Insert page break between pages # Insert page break between pages
if page_idx > 0: if page_idx > 0:
@@ -523,7 +535,7 @@ def create_transcript_docx(text: str, filename: str):
for line_num, line_text in enumerate(page_lines, start=1): for line_num, line_text in enumerate(page_lines, start=1):
_add_transcript_paragraph(doc, line_text, line_number=line_num) _add_transcript_paragraph(doc, line_text, line_number=line_num)
# Step 4: Add footer: "X of Y" centered # Step 5: Add footer: "X of Y" centered
section = doc.sections[0] section = doc.sections[0]
footer = section.footer footer = section.footer
footer.is_linked_to_previous = False footer.is_linked_to_previous = False
@@ -563,8 +575,10 @@ def create_summary_docx(text: str, filename: str):
Create a summary DOCX with: Create a summary DOCX with:
- 1" margins on all sides - 1" margins on all sides
- 12pt Courier font - 12pt Courier font
- No line numbering - Markdown-aware WYSIWYG styling (headings, bullets, bold/italic)
""" """
from . import docx_styles
doc = Document() doc = Document()
# Base font # Base font
@@ -584,13 +598,20 @@ def create_summary_docx(text: str, filename: str):
for p in list(body.findall(f"{{{W_NS}}}p")): for p in list(body.findall(f"{{{W_NS}}}p")):
body.remove(p) body.remove(p)
# Add summary content # Optionally add cover page
lines = text.strip().splitlines() from . import docx_cover
for line in lines: cover_enabled = os.getenv("COVER_PAGE_ENABLED", "false").strip().lower() in ("true", "1", "yes")
line = line.strip() if cover_enabled:
if not line: docx_cover.add_cover_page(
continue doc,
p = doc.add_paragraph(line) title="SUMMARY",
p.paragraph_format.space_after = Pt(4) subtitle=None,
metadata=None,
include_logo=True,
)
# Add summary content using markdown-aware styling
if text.strip():
docx_styles.parse_simple_md_to_paragraphs(doc, text.strip())
doc.save(filename) doc.save(filename)
+205
View File
@@ -0,0 +1,205 @@
"""
MCP-style HTTP server for ScrAIbe.
- Exposes an OpenAPI-compliant endpoint for external LLMs to:
- Upload audio
- Receive transcript JSON (no summary)
- WebUI remains always enabled; this is additive.
Configuration (env):
- MCP_SERVER_ENABLED: "true"/"false" (default: false)
- MCP_SERVER_HOST: bind address (default: 0.0.0.0)
- MCP_SERVER_PORT: port (default: 8000)
- MCP_USE_CELERY: "true"/"false" (default: true)
- If true, uses Celery tasks; if false, runs synchronously.
"""
import os
import time
import uuid
import json
import logging
from typing import Optional
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse
from .autotranscript import Scraibe
logger = logging.getLogger("scraibe.mcp_server")
app = FastAPI(
title="ScrAIbe MCP Transcription API",
version="0.1.0",
description=(
"MCP-style HTTP API for ScrAIbe. "
"Allows external LLMs to upload audio and receive transcript JSON."
),
)
# In-memory job store for MCP (simple; can be replaced with Redis later)
_mcp_jobs: dict = {}
def _job_id() -> str:
return str(uuid.uuid4())
@app.get("/health")
async def health():
return {"status": "ok"}
@app.post("/transcribe")
async def transcribe(
file: UploadFile = File(...),
language: Optional[str] = Form(None),
num_speakers: Optional[int] = Form(None),
):
"""
Upload audio and start transcription.
Returns:
{
"job_id": "<id>",
"status": "queued" | "processing" | "completed" | "error",
"message": "..."
}
Use GET /transcribe/{job_id}/status and /json to retrieve results.
"""
use_celery = os.getenv("MCP_USE_CELERY", "true").strip().lower() in ("true", "1", "yes")
# Save uploaded file temporarily
try:
import tempfile
from pathlib import Path
upload_dir = Path(os.getenv("SCRAIBE_UPLOAD_DIR", "/tmp/scraibe_uploads"))
upload_dir.mkdir(parents=True, exist_ok=True)
ext = Path(file.filename or "file").suffix or ".wav"
ts = time.strftime("%Y%m%d%H%M%S")
tmp_name = f"mcp_upload_{ts}_{uuid.uuid4().hex[:8]}{ext}"
file_path = upload_dir / tmp_name
content = await file.read()
file_path.write_bytes(content)
except Exception as e:
logger.error("Error saving MCP upload: %s", e)
raise HTTPException(status_code=500, detail=f"Error saving file: {e}")
job_id = _job_id()
if use_celery:
try:
from .tasks import process_mcp_transcribe_task
except ImportError:
# Fallback: run synchronously
use_celery = False
if use_celery:
try:
process_mcp_transcribe_task.delay(
audio_path=str(file_path),
job_id=job_id,
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
)
except Exception as e:
logger.error("Error enqueuing MCP job: %s", e)
_mcp_jobs[job_id] = {
"status": "error",
"message": f"Error enqueuing job: {e}",
"file_path": str(file_path),
}
return {
"job_id": job_id,
"status": "error",
"message": _mcp_jobs[job_id]["message"],
}
_mcp_jobs[job_id] = {
"status": "queued",
"message": "Job queued for processing.",
"file_path": str(file_path),
}
return {
"job_id": job_id,
"status": "queued",
"message": _mcp_jobs[job_id]["message"],
}
# Synchronous path
_mcp_jobs[job_id] = {
"status": "processing",
"message": "Transcription started (synchronous).",
"file_path": str(file_path),
}
def _run_sync():
try:
scraibe = Scraibe(verbose=False)
result = scraibe.transcribe(
audio_file=str(file_path),
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
verbose=False,
for_export=True,
)
transcript_text = result.get("transcript", "")
segments = result.get("segments", [])
_mcp_jobs[job_id]["status"] = "completed"
_mcp_jobs[job_id]["transcript"] = transcript_text
_mcp_jobs[job_id]["segments"] = segments
_mcp_jobs[job_id]["message"] = "Transcription completed."
except Exception as e:
logger.error("MCP sync transcription error: %s", e)
_mcp_jobs[job_id]["status"] = "error"
_mcp_jobs[job_id]["message"] = f"Transcription error: {e}"
import threading
t = threading.Thread(target=_run_sync, daemon=True)
t.start()
return {
"job_id": job_id,
"status": "processing",
"message": _mcp_jobs[job_id]["message"],
}
@app.get("/transcribe/{job_id}/status")
async def get_status(job_id: str):
job = _mcp_jobs.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return {
"job_id": job_id,
"status": job["status"],
"message": job.get("message", ""),
}
@app.get("/transcribe/{job_id}/json")
async def get_json(job_id: str):
job = _mcp_jobs.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
if job["status"] != "completed":
raise HTTPException(
status_code=400,
detail=f"Job not completed. Current status: {job['status']}",
)
transcript_text = job.get("transcript", "")
segments = job.get("segments", [])
return JSONResponse(
content={
"job_id": job_id,
"transcript": transcript_text,
"segments": segments,
}
)
+70 -26
View File
@@ -148,19 +148,76 @@ class SummarizerClient:
start = break_pos start = break_pos
return chunks return chunks
def _load_summary_prompt(self, role: str) -> str:
"""
Load summary prompt for the given role: 'chunk' or 'combined'.
Priority:
1) SUMMARY_PROMPT_{ROLE} (env)
2) SUMMARY_PROMPT_FILE (env) with [chunk] / [combined] sections
3) Built-in default prompt
"""
role_upper = role.upper()
# 1) Direct env var: SUMMARY_PROMPT_CHUNK / SUMMARY_PROMPT_COMBINED
env_key = f"SUMMARY_PROMPT_{role_upper}"
env_prompt = (os.getenv(env_key) or "").strip()
if env_prompt:
return env_prompt
# 2) File-based prompt with sections
prompt_file = (os.getenv("SUMMARY_PROMPT_FILE") or "").strip()
if prompt_file and os.path.exists(prompt_file):
try:
with open(prompt_file, "r", encoding="utf-8") as f:
content = f.read()
# Simple section parser: [chunk], [combined]
import re
pattern = re.compile(
r"\[" + role + r"\]\s*\n(.*?)(?=\n\[|$)",
re.DOTALL,
)
m = pattern.search(content)
if m:
text = m.group(1).strip()
if text:
return text
except Exception as e:
logger.warning("Failed to load SUMMARY_PROMPT_FILE for %s: %s", role, e)
# 3) Default prompts
if role == "chunk":
return (
"You are an expert legal and business meeting summarizer. "
"You will receive a segment of a longer transcript. "
"Provide a detailed, structured summary of this segment, focusing on: "
"- Topics discussed\n"
"- Key points and arguments\n"
"- Decisions and agreements\n"
"- Action items and responsibilities\n"
"- Any risks, conflicts, or open issues\n\n"
"Be concise but complete. Use bullet points where helpful. "
"Do not add information that is not present in the transcript."
)
else:
return (
"You are an expert legal and business meeting summarizer. "
"You will receive several intermediate summaries of a longer conversation. "
"Produce a single, comprehensive summary that makes it clear: "
"- The overall purpose and context of the discussion\n"
"- The main issues and topics addressed\n"
"- Key arguments and positions (briefly)\n"
"- Decisions and outcomes\n"
"- Action items, responsibilities, and next steps\n"
"- Any unresolved issues or risks\n\n"
"The summary should be detailed enough that a reader who was not present "
"can understand what happened and what is expected going forward. "
"Use clear, concise language and bullet points where appropriate. "
"Use markdown formatting (headings, lists, bold) to structure the summary."
)
def _summarize_chunk(self, chunk: str, index: int, total: int) -> str: def _summarize_chunk(self, chunk: str, index: int, total: int) -> str:
system_prompt = ( system_prompt = self._load_summary_prompt("chunk")
"You are an expert legal and business meeting summarizer. "
"You will receive a segment of a longer transcript. "
"Provide a detailed, structured summary of this segment, focusing on: "
"- Topics discussed\n"
"- Key points and arguments\n"
"- Decisions and agreements\n"
"- Action items and responsibilities\n"
"- Any risks, conflicts, or open issues\n\n"
"Be concise but complete. Use bullet points when helpful. "
"Do not add information that is not present in the transcript."
)
user_prompt = ( user_prompt = (
f"This is segment {index + 1} of {total} from a longer conversation.\n\n" f"This is segment {index + 1} of {total} from a longer conversation.\n\n"
@@ -170,20 +227,7 @@ class SummarizerClient:
return self._chat_completion(system_prompt, user_prompt) return self._chat_completion(system_prompt, user_prompt)
def _summarize_combined(self, combined_summaries: str) -> str: def _summarize_combined(self, combined_summaries: str) -> str:
system_prompt = ( system_prompt = self._load_summary_prompt("combined")
"You are an expert legal and business meeting summarizer. "
"You will receive several intermediate summaries of a longer conversation. "
"Produce a single, comprehensive summary that makes it clear: "
"- The overall purpose and context of the discussion\n"
"- The main issues and topics addressed\n"
"- Key arguments and positions (briefly)\n"
"- Decisions and outcomes\n"
"- Action items, responsibilities, and next steps\n"
"- Any unresolved issues or risks\n\n"
"The summary should be detailed enough that a reader who was not present "
"can understand what happened and what is expected going forward. "
"Use clear, concise language and bullet points where appropriate."
)
user_prompt = ( user_prompt = (
"Here are the intermediate summaries from different parts of the same conversation:\n\n" "Here are the intermediate summaries from different parts of the same conversation:\n\n"
+207
View File
@@ -504,3 +504,210 @@ def process_transcription_task(
if audio_path: if audio_path:
_remove_file(audio_path) _remove_file(audio_path)
logger.info("Cleanup completed for job %s.", task_id) logger.info("Cleanup completed for job %s.", task_id)
@celery_app.task(
name="scraibe.tasks.process_mcp_transcribe_task",
bind=True,
max_retries=1,
task_time_limit=14400,
task_soft_time_limit=13500,
)
def process_mcp_transcribe_task(
self,
audio_path: str,
job_id: str,
language: str,
num_speakers: int,
):
"""
Async task used by MCP-style API:
- Transcribe audio
- Store transcript + segments in shared MCP job store
- Clean up temporary file
"""
from .mcp_server import _mcp_jobs
log_level = os.getenv("LOG_LEVEL", "INFO")
setup_logging(level=log_level)
# Initialize status
_mcp_jobs.setdefault(
job_id,
{
"status": "processing",
"message": "Transcription started (async).",
"file_path": audio_path,
},
)
try:
scraibe = Scraibe(verbose=True)
result = scraibe.transcribe(
audio_file=audio_path,
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
verbose=True,
for_export=True,
)
transcript_text = result.get("transcript", "")
segments = result.get("segments", [])
_mcp_jobs[job_id]["status"] = "completed"
_mcp_jobs[job_id]["transcript"] = transcript_text
_mcp_jobs[job_id]["segments"] = segments
_mcp_jobs[job_id]["message"] = "Transcription completed."
logger.info("MCP job %s completed.", job_id)
except Exception as e:
logger.error("MCP job %s failed: %s", job_id, e, exc_info=True)
_mcp_jobs[job_id]["status"] = "error"
_mcp_jobs[job_id]["message"] = f"Transcription error: {e}"
finally:
_remove_file(audio_path)
logger.info("MCP job %s cleanup completed.", job_id)
@celery_app.task(
name="scraibe.tasks.process_watch_file_task",
bind=True,
max_retries=1,
task_time_limit=14400,
task_soft_time_limit=13500,
)
def process_watch_file_task(
self,
file_path: str,
):
"""
Async task for watch-folder mode:
- Transcribe + summarize
- Email results
- Optionally delete source file
"""
task_id = self.request.id
log_level = os.getenv("LOG_LEVEL", "INFO")
setup_logging(level=log_level)
email_to = os.getenv("WATCH_EMAIL_TO") or os.getenv("EMAIL_DEFAULT_TO")
if not email_to:
logger.error("No email address configured for watch-folder mode.")
raise RuntimeError("WATCH_EMAIL_TO or EMAIL_DEFAULT_TO not set.")
delete_on_success = os.getenv("WATCH_DELETE_ON_SUCCESS", "true").strip().lower() in ("true", "1", "yes")
temp_files = []
local = "watch"
date_tag = _date_tag()
try:
scraibe = Scraibe(verbose=True)
result = scraibe.transcript_and_summarize(
audio_file=file_path,
language=None,
num_speakers=None,
verbose=True,
for_export=True,
)
transcript_text = result.get("transcript", "")
summary_text = result.get("summary", "")
segments = result.get("segments", [])
raw_result = result.get("raw_result")
# Transcript .md
md_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".md")
with open(md_transcript_path, "w", encoding="utf-8") as f:
f.write("# Transcript\n\n")
f.write(transcript_text)
temp_files.append(md_transcript_path)
# Transcript .docx
docx_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".docx")
create_transcript_docx(
transcript_text,
docx_transcript_path,
)
temp_files.append(docx_transcript_path)
# Summary .md
md_summary_path = _safe_filename("SUMMARY", local, date_tag, ".md")
with open(md_summary_path, "w", encoding="utf-8") as f:
f.write("# Summary\n\n")
f.write(summary_text)
temp_files.append(md_summary_path)
# Summary .docx
docx_summary_path = _safe_filename("SUMMARY", local, date_tag, ".docx")
create_summary_docx(
summary_text,
docx_summary_path,
)
temp_files.append(docx_summary_path)
# JSON as SOURCE
json_data = {
"task": "watch_transcript_and_summarize",
"transcript": transcript_text,
"summary": summary_text,
"segments": segments,
"metadata": {
"timestamp": datetime.utcnow().isoformat(),
"job_id": task_id,
"source_file": file_path,
},
}
if raw_result is not None:
json_data["raw_result"] = raw_result
json_path = _safe_filename("SOURCE", local, date_tag, ".json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=2, ensure_ascii=False)
temp_files.append(json_path)
# Attachments
attachments = [
md_transcript_path,
docx_transcript_path,
md_summary_path,
docx_summary_path,
json_path,
]
# Send email
send_success_email(
to=email_to,
transcript_text=transcript_text,
summary_text=summary_text,
attachments=attachments,
task_id=task_id,
)
logger.info("Watch-folder job %s completed for %s.", task_id, file_path)
# Delete source file if configured
if delete_on_success and os.path.exists(file_path):
try:
os.remove(file_path)
logger.info("Deleted source file: %s", file_path)
except Exception as e:
logger.warning("Failed to delete source file %s: %s", file_path, e)
except Exception as e:
logger.error("Error processing watch file %s: %s", file_path, e, exc_info=True)
send_error_email(
to=email_to,
error_message=str(e),
task_id=task_id,
)
raise e
finally:
# Cleanup temp files
for path in temp_files:
_remove_file(path)
logger.info("Watch-folder job %s cleanup completed.", task_id)
+100
View File
@@ -0,0 +1,100 @@
"""
Watch-folder mode for ScrAIbe.
Monitors a folder for audio files. For each file:
- Transcribes + summarizes
- Emails results
- Deletes source file
Configuration (env):
- WATCH_ENABLED: "true"/"false" (default: false)
- WATCH_DIR: directory to watch (required if enabled)
- WATCH_EMAIL_TO: destination email (required if enabled)
- WATCH_POLL_INTERVAL: seconds between scans (default: 10)
- WATCH_DELETE_ON_SUCCESS: "true"/"false" (default: true)
"""
import os
import time
import logging
import threading
from pathlib import Path
logger = logging.getLogger("scraibe.watcher")
AUDIO_EXTENSIONS = {
".wav",
".mp3",
".flac",
".m4a",
".ogg",
".webm",
".mp4",
}
def _is_audio(path: Path) -> bool:
return path.is_file() and path.suffix.lower() in AUDIO_EXTENSIONS
def _enqueue_file(file_path: Path):
"""
Enqueue a file for transcription + summarization via Celery.
"""
from .tasks import process_watch_file_task
try:
process_watch_file_task.delay(str(file_path))
except Exception as e:
logger.error("Failed to enqueue watch file %s: %s", file_path, e)
def _scan_directory(watch_dir: Path):
"""
Scan directory and enqueue all audio files.
"""
if not watch_dir.is_dir():
logger.warning("WATCH_DIR does not exist or is not a directory: %s", watch_dir)
return
for p in watch_dir.iterdir():
if _is_audio(p):
logger.info("Found audio file in WATCH_DIR: %s", p)
_enqueue_file(p)
def start_watcher():
"""
Start watch-folder loop in a background thread.
"""
enabled = os.getenv("WATCH_ENABLED", "false").strip().lower() in ("true", "1", "yes")
if not enabled:
return
watch_dir = os.getenv("WATCH_DIR")
if not watch_dir:
logger.warning("WATCH_ENABLED is true but WATCH_DIR is not set. Watcher disabled.")
return
email_to = os.getenv("WATCH_EMAIL_TO")
if not email_to:
logger.warning("WATCH_ENABLED is true but WATCH_EMAIL_TO is not set. Watcher disabled.")
return
interval = float(os.getenv("WATCH_POLL_INTERVAL", "10"))
watch_path = Path(watch_dir).expanduser().resolve()
watch_path.mkdir(parents=True, exist_ok=True)
logger.info("Starting watch-folder: dir=%s, email=%s, interval=%s", watch_dir, email_to, interval)
def _loop():
while True:
try:
_scan_directory(watch_path)
except Exception as e:
logger.error("Error scanning WATCH_DIR: %s", e)
time.sleep(interval)
t = threading.Thread(target=_loop, daemon=True)
t.start()