Implement async processing with Celery, Redis, and queue-based email notifications
Mirror and run GitLab CI / build (push) Has been cancelled
Ruff / ruff (push) Has been cancelled

This commit is contained in:
admin
2026-06-14 14:38:10 +00:00
parent b9d25a39dd
commit 2803c81b44
5 changed files with 362 additions and 274 deletions
+249
View File
@@ -0,0 +1,249 @@
"""
Celery tasks for async transcription, diarization, and email notifications.
"""
import os
import json
import logging
import tempfile
from datetime import datetime
from .celery_app import celery_app
from .autotranscript import Scraibe
from .misc import setup_logging
from .email_sender import send_email, EmailError
logger = logging.getLogger("scraibe.tasks")
def get_queue_position(task_id: str) -> int:
"""
Estimate the job's position in the queue.
This is a simple count of ready/started tasks.
"""
try:
inspect = celery_app.control.inspect()
ready = inspect.active() or {}
reserved = inspect.reserved() or {}
count = 0
for _, tasks in list(ready.values()) + list(reserved.values()):
for t in tasks:
if t.get("id") == task_id:
break
count += 1
return max(count + 1, 1)
except Exception:
return -1
def send_initial_email(to: str, queue_pos: int):
"""
Send initial confirmation email with queue position.
"""
subject = "ScrAIbe: Your transcription request has been received"
body = (
"Hello,\n\n"
"We have received your audio file for transcription.\n"
)
if queue_pos > 0:
body += f"Your request is currently number {queue_pos} in the queue.\n"
else:
body += "Your request has been queued for processing.\n"
body += (
"\n"
"You will receive an email with your transcript (and summary, if requested) "
"once processing is complete.\n\n"
"This is an automated message from ScrAIbe.\n"
)
try:
send_email(to=to, subject=subject, body=body, attachments=[])
logger.info("Initial confirmation email sent to %s", to)
except EmailError as e:
logger.error("Failed to send initial email to %s: %s", to, e)
def send_success_email(
to: str,
transcript_text: str,
summary_text: str,
attachments: list,
task_id: str,
):
"""
Send final email with transcript and attachments.
"""
subject = "ScrAIbe: Your transcript is ready"
body = (
"Hello,\n\n"
"Your transcription is ready.\n\n"
"Please find the transcript and JSON files attached.\n"
)
if summary_text:
body += (
"\n"
"SUMMARY\n"
"-------\n"
f"{summary_text}\n"
)
body += (
"\n"
"Job ID: " + str(task_id) + "\n\n"
"This is an automated message from ScrAIbe.\n"
)
try:
send_email(
to=to,
subject=subject,
body=body,
attachments=attachments,
)
logger.info("Success email sent to %s for job %s", to, task_id)
except EmailError as e:
logger.error("Failed to send success email to %s for job %s: %s", to, task_id, e)
def send_error_email(to: str, error_message: str, task_id: str):
"""
Send error notification email.
"""
subject = "ScrAIbe: Error with your transcription request"
body = (
"Hello,\n\n"
"We encountered an error while processing your transcription request.\n\n"
f"Details: {error_message}\n\n"
"Job ID: " + str(task_id) + "\n\n"
"Please contact your administrator if the problem persists.\n\n"
"This is an automated message from ScrAIbe.\n"
)
try:
send_email(to=to, subject=subject, body=body, attachments=[])
logger.info("Error email sent to %s for job %s", to, task_id)
except EmailError as e:
logger.error("Failed to send error email to %s for job %s: %s", to, task_id, e)
@celery_app.task(
name="scraibe.tasks.process_transcription_task",
bind=True,
max_retries=1,
)
def process_transcription_task(
self,
audio_path: str,
task_type: str,
language: str,
num_speakers: int,
email_to: str,
email_cc: str,
include_summary: bool,
):
"""
Async task: transcribe audio, optionally summarize, then email results.
"""
task_id = self.request.id
# Ensure logging
log_level = os.getenv("LOG_LEVEL", "INFO")
setup_logging(level=log_level)
# 1) Determine queue position and send initial email
queue_pos = get_queue_position(task_id)
send_initial_email(to=email_to, queue_pos=queue_pos)
# 2) Initialize Scraibe
try:
scraibe = Scraibe(verbose=True)
except Exception as e:
send_error_email(
to=email_to,
error_message=f"Failed to initialize transcription service: {e}",
task_id=task_id,
)
raise
try:
# 3) Perform transcription
if task_type == "transcript_and_summarize":
result = scraibe.transcript_and_summarize(
audio_file=audio_path,
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
verbose=True,
for_export=True,
)
transcript_text = result.get("transcript", "")
summary_text = result.get("summary", "")
segments = result.get("segments", [])
raw_result = result.get("raw_result")
else:
result = scraibe.transcribe(
audio_file=audio_path,
language=language or None,
num_speakers=int(num_speakers) if num_speakers else None,
verbose=True,
for_export=True,
)
transcript_text = result.get("transcript", "")
summary_text = ""
segments = result.get("segments", [])
raw_result = result.get("raw_result")
# 4) Prepare files for email
attachments = []
# TXT transcript
txt_path = tempfile.mktemp(suffix=".txt")
with open(txt_path, "w", encoding="utf-8") as f:
f.write(transcript_text)
attachments.append(txt_path)
# JSON with diarization
json_data = {
"task": task_type,
"transcript": transcript_text,
"segments": segments,
"metadata": {
"timestamp": datetime.utcnow().isoformat(),
"job_id": task_id,
},
}
if summary_text:
json_data["summary"] = summary_text
if raw_result is not None:
json_data["raw_result"] = raw_result
json_path = tempfile.mktemp(suffix=".json")
with open(json_path, "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=2, ensure_ascii=False)
attachments.append(json_path)
# 5) Send success email
send_success_email(
to=email_to,
transcript_text=transcript_text,
summary_text=summary_text if include_summary else "",
attachments=attachments,
task_id=task_id,
)
logger.info("Job %s completed successfully.", task_id)
except Exception as e:
logger.error("Error processing job %s: %s", task_id, e, exc_info=True)
send_error_email(
to=email_to,
error_message=str(e),
task_id=task_id,
)
raise e