From 2803c81b4402a276546413a3c5a0913cbeef5518 Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 14 Jun 2026 14:38:10 +0000 Subject: [PATCH] Implement async processing with Celery, Redis, and queue-based email notifications --- Dockerfile | 16 +- requirements.txt | 2 + scraibe/celery_app.py | 28 ++++ scraibe/tasks.py | 249 ++++++++++++++++++++++++++++++ scraibe/webui.py | 341 +++++++++--------------------------------- 5 files changed, 362 insertions(+), 274 deletions(-) create mode 100644 scraibe/celery_app.py create mode 100644 scraibe/tasks.py diff --git a/Dockerfile b/Dockerfile index 33525c5..5f3a98f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,9 +9,9 @@ LABEL description="Scraibe: LocalAI-backed transcription and diarization client Sends audio to a LocalAI server running vibevoice.cpp and uses a second LLM for summarization." LABEL url="https://git.optimex.systems/admin/scribe" -# Install system dependencies (ffmpeg required) +# Install system dependencies (ffmpeg, redis) RUN apt update -y && \ - apt install -y --no-install-recommends ffmpeg && \ + apt install -y --no-install-recommends ffmpeg redis-server && \ apt clean && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* @@ -31,6 +31,11 @@ ENV SUMMARIZER_MODEL=qwen3-14b # Gradio / Web GUI ENV GRADIO_SERVER_NAME=0.0.0.0 +# Async processing (Celery + Redis) +ENV CELERY_BROKER_URL=redis://localhost:6379/0 +ENV CELERY_RESULT_BACKEND=redis://localhost:6379/0 +ENV SCRAIBE_UPLOAD_DIR=/tmp/scraibe_uploads + # Copy and install Python dependencies COPY requirements.txt /app/src/requirements.txt RUN pip install --no-cache-dir -r requirements.txt @@ -41,9 +46,8 @@ COPY scraibe /app/src/scraibe # Copy custom Web GUI assets (header, footer, templates, logos, config) COPY misc /app/src/misc -# Expose port (for Web GUI) +# Expose ports EXPOSE 7860 -# Run the Web GUI by default (never the CLI) -# Use python -m scraibe so __main__.py forces web GUI. -CMD ["python3", "-m", "scraibe"] +# Run the Web GUI and Celery worker (with Redis) by default +CMD ["/bin/bash", "-c", "redis-server --daemonize yes && celery -A scraibe.celery_app worker -Q transcription -l info & python3 -m scraibe"] diff --git a/requirements.txt b/requirements.txt index a3493c9..77a9b2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ numpy>=1.26.4 httpx>=0.28.0 gradio>=5.0.0 PyYAML>=6.0 +celery[redis]>=5.3.0 +redis>=5.0.0 diff --git a/scraibe/celery_app.py b/scraibe/celery_app.py new file mode 100644 index 0000000..e2d094d --- /dev/null +++ b/scraibe/celery_app.py @@ -0,0 +1,28 @@ +""" +Celery application for async transcription jobs. +""" + +import os +from celery import Celery + +broker_url = os.getenv("CELERY_BROKER_URL", "redis://redis:6379/0") +result_backend = os.getenv("CELERY_RESULT_BACKEND", "redis://redis:6379/0") + +celery_app = Celery( + "scraibe", + broker=broker_url, + backend=result_backend, +) + +celery_app.conf.update( + task_routes={ + "scraibe.tasks.process_transcription_task": {"queue": "transcription"}, + }, + task_serializer="json", + result_serializer="json", + accept_content=["json"], + timezone="UTC", + enable_utc=True, +) + +celery_app.autodiscover_tasks(["scraibe.tasks"]) diff --git a/scraibe/tasks.py b/scraibe/tasks.py new file mode 100644 index 0000000..14f2f74 --- /dev/null +++ b/scraibe/tasks.py @@ -0,0 +1,249 @@ +""" +Celery tasks for async transcription, diarization, and email notifications. +""" + +import os +import json +import logging +import tempfile +from datetime import datetime + +from .celery_app import celery_app +from .autotranscript import Scraibe +from .misc import setup_logging +from .email_sender import send_email, EmailError + +logger = logging.getLogger("scraibe.tasks") + + +def get_queue_position(task_id: str) -> int: + """ + Estimate the job's position in the queue. + This is a simple count of ready/started tasks. + """ + try: + inspect = celery_app.control.inspect() + ready = inspect.active() or {} + reserved = inspect.reserved() or {} + count = 0 + for _, tasks in list(ready.values()) + list(reserved.values()): + for t in tasks: + if t.get("id") == task_id: + break + count += 1 + return max(count + 1, 1) + except Exception: + return -1 + + +def send_initial_email(to: str, queue_pos: int): + """ + Send initial confirmation email with queue position. + """ + subject = "ScrAIbe: Your transcription request has been received" + body = ( + "Hello,\n\n" + "We have received your audio file for transcription.\n" + ) + + if queue_pos > 0: + body += f"Your request is currently number {queue_pos} in the queue.\n" + else: + body += "Your request has been queued for processing.\n" + + body += ( + "\n" + "You will receive an email with your transcript (and summary, if requested) " + "once processing is complete.\n\n" + "This is an automated message from ScrAIbe.\n" + ) + + try: + send_email(to=to, subject=subject, body=body, attachments=[]) + logger.info("Initial confirmation email sent to %s", to) + except EmailError as e: + logger.error("Failed to send initial email to %s: %s", to, e) + + +def send_success_email( + to: str, + transcript_text: str, + summary_text: str, + attachments: list, + task_id: str, +): + """ + Send final email with transcript and attachments. + """ + subject = "ScrAIbe: Your transcript is ready" + + body = ( + "Hello,\n\n" + "Your transcription is ready.\n\n" + "Please find the transcript and JSON files attached.\n" + ) + + if summary_text: + body += ( + "\n" + "SUMMARY\n" + "-------\n" + f"{summary_text}\n" + ) + + body += ( + "\n" + "Job ID: " + str(task_id) + "\n\n" + "This is an automated message from ScrAIbe.\n" + ) + + try: + send_email( + to=to, + subject=subject, + body=body, + attachments=attachments, + ) + logger.info("Success email sent to %s for job %s", to, task_id) + except EmailError as e: + logger.error("Failed to send success email to %s for job %s: %s", to, task_id, e) + + +def send_error_email(to: str, error_message: str, task_id: str): + """ + Send error notification email. + """ + subject = "ScrAIbe: Error with your transcription request" + + body = ( + "Hello,\n\n" + "We encountered an error while processing your transcription request.\n\n" + f"Details: {error_message}\n\n" + "Job ID: " + str(task_id) + "\n\n" + "Please contact your administrator if the problem persists.\n\n" + "This is an automated message from ScrAIbe.\n" + ) + + try: + send_email(to=to, subject=subject, body=body, attachments=[]) + logger.info("Error email sent to %s for job %s", to, task_id) + except EmailError as e: + logger.error("Failed to send error email to %s for job %s: %s", to, task_id, e) + + +@celery_app.task( + name="scraibe.tasks.process_transcription_task", + bind=True, + max_retries=1, +) +def process_transcription_task( + self, + audio_path: str, + task_type: str, + language: str, + num_speakers: int, + email_to: str, + email_cc: str, + include_summary: bool, +): + """ + Async task: transcribe audio, optionally summarize, then email results. + """ + task_id = self.request.id + + # Ensure logging + log_level = os.getenv("LOG_LEVEL", "INFO") + setup_logging(level=log_level) + + # 1) Determine queue position and send initial email + queue_pos = get_queue_position(task_id) + send_initial_email(to=email_to, queue_pos=queue_pos) + + # 2) Initialize Scraibe + try: + scraibe = Scraibe(verbose=True) + except Exception as e: + send_error_email( + to=email_to, + error_message=f"Failed to initialize transcription service: {e}", + task_id=task_id, + ) + raise + + try: + # 3) Perform transcription + if task_type == "transcript_and_summarize": + result = scraibe.transcript_and_summarize( + audio_file=audio_path, + language=language or None, + num_speakers=int(num_speakers) if num_speakers else None, + verbose=True, + for_export=True, + ) + transcript_text = result.get("transcript", "") + summary_text = result.get("summary", "") + segments = result.get("segments", []) + raw_result = result.get("raw_result") + else: + result = scraibe.transcribe( + audio_file=audio_path, + language=language or None, + num_speakers=int(num_speakers) if num_speakers else None, + verbose=True, + for_export=True, + ) + transcript_text = result.get("transcript", "") + summary_text = "" + segments = result.get("segments", []) + raw_result = result.get("raw_result") + + # 4) Prepare files for email + attachments = [] + + # TXT transcript + txt_path = tempfile.mktemp(suffix=".txt") + with open(txt_path, "w", encoding="utf-8") as f: + f.write(transcript_text) + attachments.append(txt_path) + + # JSON with diarization + json_data = { + "task": task_type, + "transcript": transcript_text, + "segments": segments, + "metadata": { + "timestamp": datetime.utcnow().isoformat(), + "job_id": task_id, + }, + } + + if summary_text: + json_data["summary"] = summary_text + + if raw_result is not None: + json_data["raw_result"] = raw_result + + json_path = tempfile.mktemp(suffix=".json") + with open(json_path, "w", encoding="utf-8") as f: + json.dump(json_data, f, indent=2, ensure_ascii=False) + attachments.append(json_path) + + # 5) Send success email + send_success_email( + to=email_to, + transcript_text=transcript_text, + summary_text=summary_text if include_summary else "", + attachments=attachments, + task_id=task_id, + ) + + logger.info("Job %s completed successfully.", task_id) + + except Exception as e: + logger.error("Error processing job %s: %s", task_id, e, exc_info=True) + send_error_email( + to=email_to, + error_message=str(e), + task_id=task_id, + ) + raise e diff --git a/scraibe/webui.py b/scraibe/webui.py index 8c656a3..3b1cb28 100644 --- a/scraibe/webui.py +++ b/scraibe/webui.py @@ -1,26 +1,28 @@ """ -ScrAIbe Web GUI (Gradio) ------------------------- +ScrAIbe Web GUI (Gradio) - Async Mode +------------------------------------- Runs the Web GUI that: - Accepts audio uploads -- Sends audio to LocalAI for transcription + diarization -- Optionally sends transcript to a second LLM for summarization -- Returns transcript (and summary) in the browser -- Optionally emails transcript files (TXT + JSON) +- Enqueues transcription jobs asynchronously via Celery +- Backend worker: + - Transcribes (with diarization) + - Optionally summarizes + - Emails the user: + - Immediately: confirmation + queue position + - On success: transcript + JSON (+ summary if requested) + - On error: error details This is the default entrypoint when running in Docker. """ import os -import json import logging -import tempfile +import shutil from datetime import datetime import gradio as gr -from .autotranscript import Scraibe from .misc import setup_logging logger = logging.getLogger("scraibe.webui") @@ -45,7 +47,7 @@ def load_config(): def create_app(): """ - Create and launch the Gradio Web GUI. + Create and launch the Gradio Web GUI (async mode). """ # Logging @@ -57,185 +59,11 @@ def create_app(): layout_cfg = config.get("layout", {}) launch_cfg = config.get("launch", {}) - logger.info("Starting ScrAIbe Web GUI.") + logger.info("Starting ScrAIbe Web GUI (async mode).") - # Initialize Scraibe (LocalAI-backed) - # If LocalAI is unreachable at startup, still launch the UI - # and let individual transcription calls fail with a clear message. - scraibe = None - try: - scraibe = Scraibe(verbose=True) - except Exception as e: - logger.warning( - "Failed to initialize Scraibe at startup (LocalAI may be down): %s. " - "Web GUI will start; transcription will fail until LocalAI is reachable.", - e, - ) - - # Helper: run transcription via LocalAI API - def run_transcribe( - audio_path, - task, - language, - num_speakers, - send_email_flag, - email_to, - email_cc, - email_subject, - ): - if not audio_path: - raise ValueError("No audio file provided.") - - email_status = "" - attachments = [] - - # Ensure we use rich export mode (for JSON with diarization) - try: - if task == "transcript_and_summarize": - result = scraibe.transcript_and_summarize( - audio_file=audio_path, - language=language or None, - num_speakers=int(num_speakers) if num_speakers else None, - verbose=True, - for_export=True, - ) - transcript_text = result.get("transcript", "") - summary_text = result.get("summary", "") - segments = result.get("segments", []) - raw_result = result.get("raw_result") - - # Save as .md (transcript + summary) - md_path = tempfile.mktemp(suffix=".md") - with open(md_path, "w", encoding="utf-8") as f: - f.write("# Transcript\n\n") - f.write(transcript_text) - f.write("\n\n# Summary\n\n") - f.write(summary_text) - - # Save as .txt (plain transcript) - txt_path = tempfile.mktemp(suffix=".txt") - with open(txt_path, "w", encoding="utf-8") as f: - f.write(transcript_text) - - # Save as .json (diarization + transcript + summary) - json_data = { - "task": "transcript_and_summarize", - "transcript": transcript_text, - "summary": summary_text, - "segments": segments, - "metadata": { - "timestamp": datetime.utcnow().isoformat() - }, - } - if raw_result is not None: - json_data["raw_result"] = raw_result - - json_path = tempfile.mktemp(suffix=".json") - with open(json_path, "w", encoding="utf-8") as f: - json.dump(json_data, f, indent=2, ensure_ascii=False) - - # Prepare attachments for email - if send_email_flag: - attachments = [txt_path, json_path] - - status_msg = "Transcription and summarization completed." - - else: - # transcribe only (with diarization) - result = scraibe.transcribe( - audio_file=audio_path, - language=language or None, - num_speakers=int(num_speakers) if num_speakers else None, - verbose=True, - for_export=True, - ) - transcript_text = result.get("transcript", "") - segments = result.get("segments", []) - raw_result = result.get("raw_result") - - # Save as .txt (plain transcript) - txt_path = tempfile.mktemp(suffix=".txt") - with open(txt_path, "w", encoding="utf-8") as f: - f.write(transcript_text) - - # Save as .json (diarization + transcript) - json_data = { - "task": "transcribe", - "transcript": transcript_text, - "segments": segments, - "metadata": { - "timestamp": datetime.utcnow().isoformat() - }, - } - if raw_result is not None: - json_data["raw_result"] = raw_result - - json_path = tempfile.mktemp(suffix=".json") - with open(json_path, "w", encoding="utf-8") as f: - json.dump(json_data, f, indent=2, ensure_ascii=False) - - # Prepare attachments for email - if send_email_flag: - attachments = [txt_path, json_path] - - status_msg = "Transcription completed." - - except Exception as e: - logger.error("Error during transcription: %s", e) - return ( - "", - "", - None, - f"Error: {e}", - "", - ) - - # Handle email after successful transcription - if send_email_flag and attachments: - try: - from .email_sender import send_email, EmailError - except ImportError: - email_status = "Email feature unavailable (email_sender not found)." - else: - to = (email_to or "").strip() - cc = (email_cc or "").strip() - subject = (email_subject or "").strip() - - if not to: - email_status = "Email not sent: 'To' address is empty." - else: - if not subject: - subject = f"ScrAIbe Transcript - {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}" - - body = ( - "Please find the transcription files attached.\n" - "This message was generated by ScrAIbe.\n" - ) - - try: - send_email( - to=to, - cc=cc or None, - subject=subject, - body=body, - attachments=attachments, - ) - email_status = "Transcript files sent via email." - except EmailError as e: - email_status = f"Email failed: {e}" - except Exception as e: - email_status = f"Email failed: {e}" - - # Use md_path for file_output in transcript_and_summarize, else txt_path - file_path = md_path if task == "transcript_and_summarize" else txt_path - - return ( - transcript_text, - summary_text if task == "transcript_and_summarize" else "", - file_path, - status_msg, - email_status, - ) + # Ensure upload directory exists + upload_dir = os.getenv("SCRAIBE_UPLOAD_DIR", "/tmp/scraibe_uploads") + os.makedirs(upload_dir, exist_ok=True) # Load header/footer HTML if present header_path = layout_cfg.get("header", "/app/src/misc/header.html") @@ -253,7 +81,6 @@ def create_app(): footer_html = f.read() # Build Gradio interface - # In Gradio 6.0+, css must be passed to launch(), not Blocks() with gr.Blocks( title="A.P.Strom Transcription", ) as app: @@ -289,57 +116,25 @@ def create_app(): precision=0, ) - # Email options - send_email_checkbox = gr.Checkbox( - label="Send transcript files via email" + # Email is required in async mode + email_to = gr.Textbox( + label="Your email address (required)", + placeholder="e.g. your.name@example.com", ) - with gr.Group(visible=False) as email_group: - email_to = gr.Textbox( - label="To (comma-separated)", - placeholder="e.g. name@example.com", - ) - email_cc = gr.Textbox( - label="CC (optional, comma-separated)", - placeholder="e.g. manager@example.com", - ) - email_subject = gr.Textbox( - label="Subject (optional)", - placeholder="Default: ScrAIbe Transcript - ", - ) - - send_email_checkbox.change( - fn=lambda v: gr.update(visible=v), - inputs=[send_email_checkbox], - outputs=[email_group], + email_cc = gr.Textbox( + label="CC (optional, comma-separated)", + placeholder="e.g. manager@example.com", ) - transcribe_btn = gr.Button("Start", variant="primary") + submit_btn = gr.Button("Submit for transcription", variant="primary") with gr.Column(scale=3): - output_text = gr.Textbox( - label="Transcript", - lines=10, - interactive=False, - ) - summary_text = gr.Textbox( - label="Summary", - lines=10, - interactive=False, - visible=False, - ) - file_output = gr.File( - label="Download transcript/summary", - ) status_text = gr.Textbox( label="Status", + lines=6, interactive=False, ) - email_status_text = gr.Textbox( - label="Email status", - interactive=False, - visible=True, - ) # Footer if footer_html: @@ -348,73 +143,83 @@ def create_app(): # Events def on_task_change(value): - show_summary = (value == "transcript_and_summarize") - return gr.update(visible=show_summary) + # No special UI changes needed; both modes handled in backend + return task_choice.change( fn=on_task_change, inputs=[task_choice], - outputs=[summary_text], + outputs=[], ) - def on_transcribe( + def on_submit( audio, task, language, num_speakers, - send_email_flag, email_to_val, email_cc_val, - email_subject_val, ): if not audio: + return "Please upload or record audio." + + email_to_val = (email_to_val or "").strip() + if not email_to_val: + return "Please enter your email address." + + # Copy uploaded file to a stable location + try: + ext = os.path.splitext(audio)[1] or ".wav" + ts = datetime.utcnow().strftime("%Y%m%d%H%M%S%f") + new_name = f"upload_{ts}{ext}" + dest_path = os.path.join(upload_dir, new_name) + shutil.copy2(audio, dest_path) + except Exception as e: + logger.error("Error copying uploaded file: %s", e) + return f"Error saving your file: {e}" + + # Import Celery task + try: + from .tasks import process_transcription_task + except ImportError: return ( - "", - "", - None, - "Please upload or record audio.", - "", + "Error: async processing is not available (Celery not configured)." ) - transcript, summary, file_path, status_msg, email_status = run_transcribe( - audio_path=audio, - task=task, - language=language, - num_speakers=num_speakers, - send_email_flag=bool(send_email_flag), - email_to=email_to_val, - email_cc=email_cc_val, - email_subject=email_subject_val, - ) + # Enqueue transcription job + try: + task_result = process_transcription_task.delay( + audio_path=dest_path, + task_type=task, + language=language or None, + num_speakers=int(num_speakers) if num_speakers else None, + email_to=email_to_val, + email_cc=email_cc_val or None, + include_summary=(task == "transcript_and_summarize"), + ) + except Exception as e: + logger.error("Error enqueuing job: %s", e) + return f"Error submitting your file: {e}" - show_summary = bool(summary) return ( - transcript, - summary, - file_path if file_path else None, - status_msg, - email_status, + "Your audio file has been received and added to the queue.\n" + "We have sent a confirmation email to you.\n" + "You will receive another email with your transcript (and summary, if requested) " + "once processing is complete.\n" + f"Job ID: {task_result.id}" ) - transcribe_btn.click( - fn=on_transcribe, + submit_btn.click( + fn=on_submit, inputs=[ audio_input, task_choice, language_input, num_speakers_input, - send_email_checkbox, email_to, email_cc, - email_subject, - ], - outputs=[ - output_text, - summary_text, - file_output, - status_text, - email_status_text, ], + outputs=[status_text], ) # Launch options