Files
scribe/scraibe/tasks.py
T
admin 154cac6c7b
Mirror and run GitLab CI / build (push) Has been cancelled
Ruff / ruff (push) Has been cancelled
Ensure success email subject is wired to EMAIL_SUBJECT_SUCCESS and never blank
2026-06-14 21:09:25 +00:00

492 lines
16 KiB
Python

"""
Celery tasks for async transcription, diarization, and email notifications.
"""
import json
import logging
import os
import re
import tempfile
from datetime import datetime, timezone

from .celery_app import celery_app
from .autotranscript import Scraibe
from .summarizer import SummarizerClient, SummarizerError
from .misc import setup_logging
from .email_sender import send_email, EmailError, load_template
from .email_sender import create_transcript_docx, create_summary_docx
# Module-level logger; all messages from this module are emitted under the
# "scraibe.tasks" logger name.
logger = logging.getLogger("scraibe.tasks")
def _local_part(email: str) -> str:
    """
    Return the mailbox name (text before the first '@') of *email*, made safe
    for use in file names.

    Every character that is not alphanumeric, '-', '_' or '.' becomes '_'.
    Returns "user" when nothing usable is left (empty/None input, or an
    address with an empty mailbox).
    """
    mailbox, _sep, _domain = (email or "").partition("@")
    allowed_punct = {"-", "_", "."}
    cleaned = "".join(
        [c if (c.isalnum() or c in allowed_punct) else "_" for c in mailbox.strip()]
    )
    if not cleaned:
        return "user"
    return cleaned
def _date_tag(now: "datetime | None" = None) -> str:
    """
    Return a date tag in DD-MON-YYYY format (e.g. 01-JAN-2025).

    The month abbreviation is always English and upper-case. ``strftime("%b")``
    is deliberately avoided because it follows the process's LC_TIME locale,
    which would break the documented format under a non-English locale.

    Args:
        now: Moment to format. Mainly for tests; defaults to the current UTC
            time.
    """
    months = (
        "JAN", "FEB", "MAR", "APR", "MAY", "JUN",
        "JUL", "AUG", "SEP", "OCT", "NOV", "DEC",
    )
    if now is None:
        # Timezone-aware replacement for the deprecated datetime.utcnow().
        now = datetime.now(timezone.utc)
    return f"{now.day:02d}-{months[now.month - 1]}-{now.year:04d}"
def _safe_filename(base: str, local: str, date_tag: str, ext: str) -> str:
    """
    Create an empty, uniquely named temp file and return its path.

    The file name is ``{base}-{local}-{date_tag}-<random>{ext}``, so the
    logical name stays readable (e.g. as an email attachment name).
    ``tempfile.mkstemp`` creates the file atomically, readable and writable
    only by the current user. This avoids the race condition of the
    deprecated ``tempfile.mktemp``, which merely returned an unused name.
    Callers overwrite the file by path and are responsible for deleting it.
    """
    fd, path = tempfile.mkstemp(prefix=f"{base}-{local}-{date_tag}-", suffix=ext)
    # Only the path is needed; callers reopen the file by name.
    os.close(fd)
    return path
def _remove_file(path: str):
    """
    Delete *path* if it exists. Best-effort: never raises.

    An empty/None path or an already-missing file is silently ignored. Any
    other failure (e.g. permissions, or *path* being a directory) is logged
    as a warning.
    """
    if not path:
        return
    try:
        # EAFP: removing directly avoids the race between an
        # os.path.exists() check and the subsequent os.remove().
        os.remove(path)
    except FileNotFoundError:
        pass
    except Exception as e:
        logger.warning("Failed to remove file %s: %s", path, e)
def _get_subject(env_var: str, default: str) -> str:
    """
    Resolve an email subject from the environment variable *env_var*.

    Surrounding whitespace is stripped. When the variable is missing or ends
    up empty, *default* is returned instead. The chosen subject is logged at
    INFO level.
    """
    raw = os.getenv(env_var)
    cleaned = raw.strip() if raw else ""
    chosen = cleaned if cleaned else default
    logger.info("Email subject [%s] = %s", env_var, chosen)
    return chosen
def get_queue_position(task_id: str) -> int:
    """
    Estimate the job's position in the queue.

    Walks the tasks that workers report as reserved (queued but not yet
    running), in the order ``inspect().reserved()`` returns them, and counts
    how many come before *task_id*.

    Returns:
        - A positive int if we can estimate (1 = first in line). A task that
          is not among the reserved tasks (e.g. already running, or not yet
          visible to any worker) is reported as 1.
        - 0 if inspection fails, so the caller can use a generic message.
    """
    try:
        # reserved() returns {worker_name: [task_info, ...]}, or None when no
        # worker replies. Each value is a *list* of tasks; the previous
        # `for _, tasks in reserved.values()` tried to unpack that list into
        # two names and failed.
        reserved = celery_app.control.inspect().reserved() or {}
        ahead = 0
        for worker_tasks in reserved.values():
            for task_info in worker_tasks or ():
                if task_info.get("id") == task_id:
                    return ahead + 1
                ahead += 1
    except Exception as e:
        # Don't guess when the broker/workers can't be inspected.
        logger.debug("Could not inspect queue for task %s: %s", task_id, e)
        return 0
    return 1
def send_initial_email(to: str, queue_pos: int):
    """
    Send the "request received" confirmation email.

    Args:
        to: Recipient address.
        queue_pos: Position as reported by ``get_queue_position``. A value
            <= 0 means "unknown", and a generic "queued" sentence is used
            instead.

    The subject is customizable via EMAIL_SUBJECT_UPLOAD. If the HTML
    template cannot be rendered, the email is sent as plain text only. Send
    failures (EmailError) are logged, not raised.
    """
    subject = _get_subject(
        "EMAIL_SUBJECT_UPLOAD",
        "ScrAIbe: Your transcription request has been received",
    )
    body = (
        "Hello,\n\n"
        "We have received your audio file for transcription.\n"
    )
    if queue_pos > 0:
        body += f"Your request is currently number {queue_pos} in the queue.\n"
        # This span used to be coloured via _accent_color(), which is neither
        # defined nor imported in this module and raised NameError for every
        # known queue position, failing the whole job.
        # NOTE(review): re-add an accent colour once a real helper exists.
        queue_position_display = (
            f'<span style="font-weight:bold;">{queue_pos}</span>'
        )
    else:
        body += "Your request has been queued for processing.\n"
        queue_position_display = "the queue"
    contact = os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com")
    body += (
        "\n"
        "You will receive an email with your transcript (and summary, if requested) "
        "once processing is complete.\n\n"
        f"If you have any questions, contact us at {contact}.\n\n"
        "This is an automated message from ScrAIbe.\n"
    )
    html = None
    try:
        html = load_template(
            "upload_notification_template.html",
            queue_position_text=queue_position_display,
        )
    except EmailError as e:
        logger.warning("Failed to render upload notification template: %s", e)
    try:
        send_email(to=to, subject=subject, body=body, html=html, attachments=[])
        logger.info("Initial confirmation email sent to %s", to)
    except EmailError as e:
        logger.error("Failed to send initial email to %s: %s", to, e)
def send_success_email(
    to: str,
    transcript_text: str,
    summary_text: str,
    attachments: list,
    task_id: str,
):
    """
    Send the final "transcript ready" email with the result files attached.

    Args:
        to: Recipient address.
        transcript_text: Full transcript. Not included in the email body
            (it is delivered through the attachments); kept for interface
            compatibility.
        summary_text: Summary inlined in the plain-text body; omitted when
            empty.
        attachments: File paths to attach.
        task_id: Job id, quoted in the body for support requests.

    The subject is customizable via EMAIL_SUBJECT_SUCCESS. ``_get_subject``
    strips the value and falls back to the non-blank default below when the
    variable is unset or blank, so the subject is never empty and no second
    check is needed. Send failures (EmailError) are logged, not raised.
    """
    subject = _get_subject(
        "EMAIL_SUBJECT_SUCCESS",
        "ScrAIbe: Your transcript is ready",
    )
    body = (
        "Hello,\n\n"
        "Your transcription is ready.\n\n"
        "Please find the transcript and JSON files attached.\n"
    )
    if summary_text:
        body += (
            "\n"
            "SUMMARY\n"
            "-------\n"
            f"{summary_text}\n"
        )
    contact = os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com")
    body += (
        "\n"
        f"Job ID: {task_id}\n\n"
        f"If you have any questions, contact us at {contact}.\n\n"
        "This is an automated message from ScrAIbe.\n"
    )
    html = None
    try:
        html = load_template("success_template.html")
    except EmailError as e:
        logger.warning("Failed to render success template: %s", e)
    try:
        send_email(
            to=to,
            subject=subject,
            body=body,
            html=html,
            attachments=attachments,
        )
        logger.info("Success email sent to %s for job %s with subject: %s", to, task_id, subject)
    except EmailError as e:
        logger.error("Failed to send success email to %s for job %s: %s", to, task_id, e)
def send_error_email(to: str, error_message: str, task_id: str):
    """
    Email *to* that job *task_id* failed, including *error_message*.

    The subject comes from EMAIL_SUBJECT_ERROR (a default is used when the
    variable is unset or blank). If the HTML template cannot be rendered the
    message goes out as plain text only. Delivery failures (EmailError) are
    logged instead of raised.
    """
    subject = _get_subject(
        "EMAIL_SUBJECT_ERROR",
        "ScrAIbe: Error with your transcription request",
    )
    contact = os.getenv('EMAIL_CONTACT_ADDRESS', 'support@example.com')
    body = "".join(
        [
            "Hello,\n\n",
            "We encountered an error while processing your transcription request.\n\n",
            f"Details: {error_message}\n\n",
            "Job ID: " + str(task_id) + "\n\n",
            "Please contact your administrator if the problem persists.\n\n",
            f"If you have any questions, contact us at {contact}.\n\n",
            "This is an automated message from ScrAIbe.\n",
        ]
    )
    # NOTE(review): error_message is passed to the HTML template as-is;
    # confirm that load_template HTML-escapes its keyword arguments.
    try:
        html = load_template(
            "error_notification_template.html",
            exception=str(error_message),
        )
    except EmailError as exc:
        logger.warning("Failed to render error template: %s", exc)
        html = None
    try:
        send_email(to=to, subject=subject, body=body, html=html, attachments=[])
    except EmailError as exc:
        logger.error("Failed to send error email to %s for job %s: %s", to, task_id, exc)
    else:
        logger.info("Error email sent to %s for job %s", to, task_id)
def _normalize_speaker_label(label) -> str:
    """
    Return a canonical lookup key for a speaker label.

    "SPEAKER 1", "speaker_01" and "SPEAKER-1" all map to "SPEAKER 1"
    (case, separator and leading zeros are ignored). This lets names
    suggested by the LLM be matched against labels in the transcript and
    segments. Any other label is returned stripped and upper-cased.
    """
    text = (label or "").strip()
    match = re.fullmatch(r"SPEAKER[\s_-]*(\d+)", text, re.IGNORECASE)
    if match:
        return f"SPEAKER {int(match.group(1))}"
    return text.upper()


def _parse_speaker_map(reply: str) -> dict:
    """
    Parse "SPEAKER N: Name" lines from an LLM reply.

    Returns a dict mapping canonical speaker keys (see
    ``_normalize_speaker_label``) to the suggested names. Trailing periods
    are dropped and empty names are ignored. Only spaces/tabs are allowed
    around the colon, so an empty entry can never swallow the next line as
    its name.
    """
    speaker_map = {}
    pattern = r"SPEAKER[ \t_-]*(\d+)[ \t]*:[ \t]*(.+)"
    for m in re.finditer(pattern, reply or "", re.IGNORECASE):
        name = m.group(2).strip().rstrip(".")
        if name:
            speaker_map[_normalize_speaker_label(f"SPEAKER {m.group(1)}")] = name
    return speaker_map


def _rename_speakers_in_text(text: str, speaker_map: dict) -> str:
    """
    Replace the speaker label at the start of each transcript line with the
    mapped name.

    The label may be preceded by a bracketed timestamp, e.g.
    "[00:12] SPEAKER 1: hello" becomes "[00:12] John: hello". Everything
    after the label is preserved. Lines are handled one at a time and
    re-joined with newlines, so a match never spans a line break.
    """
    pattern = re.compile(
        r"^(\s*(?:\[\d+:\d+(?::\d+)?\]\s*)?)(SPEAKER[ \t_-]*\d+)\b",
        re.IGNORECASE,
    )

    def _swap(m):
        label = m.group(2)
        return m.group(1) + speaker_map.get(_normalize_speaker_label(label), label)

    return "\n".join(pattern.sub(_swap, line) for line in text.splitlines())


def _rename_speakers_in_segments(segments: list, speaker_map: dict) -> list:
    """
    Return copies of *segments* with each "speaker" value replaced by its
    mapped name.

    Unmapped speakers keep their whitespace-stripped label. The input dicts
    are not modified.
    """
    renamed = []
    for seg in segments:
        label = (seg.get("speaker") or "").strip()
        renamed.append(
            {**seg, "speaker": speaker_map.get(_normalize_speaker_label(label), label)}
        )
    return renamed


def _identify_speakers(scraibe, transcript_text: str, segments: list):
    """
    Best-effort: ask the summarizer LLM to name each speaker, then apply the
    names to the transcript text and the segments.

    Returns ``(transcript_text, segments)``. If the request fails, or no
    names can be parsed, the inputs are returned unchanged, so the job falls
    back to the original speaker labels. Both outputs are renamed
    consistently or not at all.
    """
    try:
        # Reuse the same summarizer client as transcript_and_summarize.
        scraibe._ensure_summarizer()
        summarizer = scraibe._summarizer
        prompt = (
            "Below is a transcript with speaker labels like 'SPEAKER 1', 'SPEAKER 2', etc. "
            "Based on how they speak and the context, suggest realistic names for each speaker. "
            "Do not add extra commentary. Output ONLY a mapping in this exact format, one per line:\n"
            "SPEAKER 1: Suggested Name\n"
            "SPEAKER 2: Suggested Name\n"
            "SPEAKER 3: Suggested Name\n"
            "\n"
            "Transcript:\n"
            + transcript_text
        )
        response = summarizer._chat_completion(
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=300,
        )
        choices = (response or {}).get("choices") or [{}]
        reply = (choices[0].get("message") or {}).get("content") or ""
        speaker_map = _parse_speaker_map(reply)
        logger.info("Speaker identification mapping: %s", speaker_map)
        if not speaker_map:
            return transcript_text, segments
        return (
            _rename_speakers_in_text(transcript_text, speaker_map),
            _rename_speakers_in_segments(segments, speaker_map),
        )
    except Exception as e:
        # Deliberately broad (also covers SummarizerError): speaker naming is
        # optional and must never fail the job.
        logger.warning(
            "Speaker identification failed; falling back to Speaker IDs: %s", e
        )
        return transcript_text, segments


def _write_result_files(
    *,
    task_type: str,
    task_id: str,
    transcript_text: str,
    summary_text: str,
    segments: list,
    raw_result,
    local: str,
    date_tag: str,
    temp_files: list,
) -> list:
    """
    Write a job's result files and return their paths, in attachment order.

    Files written:
      - the transcript as .md and .docx;
      - a SOURCE .json with the transcript, segments, metadata, and the
        summary/raw_result when present;
      - when *summary_text* is non-empty, the summary as .md and .docx.

    Each path is appended to *temp_files* before anything is written to it,
    so the caller's cleanup also removes files whose write failed midway.
    """
    attachments = []

    def _new_path(base: str, ext: str) -> str:
        # Reserve a temp path and register it for cleanup and attachment.
        path = _safe_filename(base, local, date_tag, ext)
        temp_files.append(path)
        attachments.append(path)
        return path

    with open(_new_path("TRANSCRIPT", ".md"), "w", encoding="utf-8") as f:
        f.write("# Transcript\n\n")
        f.write(transcript_text)
    create_transcript_docx(transcript_text, _new_path("TRANSCRIPT", ".docx"))

    json_data = {
        "task": task_type,
        "transcript": transcript_text,
        "segments": segments,
        "metadata": {
            # Timezone-aware replacement for the deprecated utcnow().
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "job_id": task_id,
        },
    }
    if summary_text:
        json_data["summary"] = summary_text
    if raw_result is not None:
        json_data["raw_result"] = raw_result
    with open(_new_path("SOURCE", ".json"), "w", encoding="utf-8") as f:
        json.dump(json_data, f, indent=2, ensure_ascii=False)

    if summary_text:
        with open(_new_path("SUMMARY", ".md"), "w", encoding="utf-8") as f:
            f.write("# Summary\n\n")
            f.write(summary_text)
        create_summary_docx(summary_text, _new_path("SUMMARY", ".docx"))

    return attachments


@celery_app.task(
    name="scraibe.tasks.process_transcription_task",
    bind=True,
    max_retries=1,
)
def process_transcription_task(
    self,
    audio_path: str,
    task_type: str,
    language: str,
    num_speakers: int,
    email_to: str,
    email_cc: str,
    include_summary: bool,
    identify_speakers: bool = False,
):
    """
    Async task: transcribe audio, optionally summarize and name speakers,
    then email the results.

    Args:
        audio_path: Uploaded audio file. Deleted when the task finishes,
            whether it succeeds or fails.
        task_type: "transcript_and_summarize" also produces a summary; any
            other value only transcribes.
        language: Language hint; falsy values are passed to Scraibe as None.
        num_speakers: Expected speaker count; falsy values are passed as None.
        email_to: Recipient of every notification email.
        email_cc: Accepted but not used. NOTE(review): CC recipients are
            never emailed; confirm whether send_email supports CC.
        include_summary: Whether the summary is inlined in the success email
            body. Summary files are attached whenever a summary exists.
        identify_speakers: Ask the LLM to replace speaker labels with names
            (best-effort).

    Raises:
        Any processing error is logged, reported to the user by email
        (exactly once), and re-raised so Celery marks the task as failed.
        Temporary files are cleaned up in all cases.
    """
    task_id = self.request.id
    setup_logging(level=os.getenv("LOG_LEVEL", "INFO"))
    temp_files = []
    local = _local_part(email_to)
    date_tag = _date_tag()
    # Set once the user has already received a specific error email, so the
    # generic handler below does not send a second one.
    error_notified = False
    try:
        # 1) Queue position and initial email
        send_initial_email(to=email_to, queue_pos=get_queue_position(task_id))

        # 2) Initialize Scraibe
        try:
            scraibe = Scraibe(verbose=True)
        except Exception as e:
            send_error_email(
                to=email_to,
                error_message=f"Failed to initialize transcription service: {e}",
                task_id=task_id,
            )
            error_notified = True
            raise

        # 3) Transcription (plus summary for "transcript_and_summarize")
        transcribe_kwargs = {
            "audio_file": audio_path,
            "language": language or None,
            "num_speakers": int(num_speakers) if num_speakers else None,
            "verbose": True,
            "for_export": True,
        }
        if task_type == "transcript_and_summarize":
            result = scraibe.transcript_and_summarize(**transcribe_kwargs)
            summary_text = result.get("summary") or ""
        else:
            result = scraibe.transcribe(**transcribe_kwargs)
            summary_text = ""
        transcript_text = result.get("transcript") or ""
        segments = result.get("segments") or []
        raw_result = result.get("raw_result")

        # 3b) Optional speaker identification (never fails the job)
        if identify_speakers:
            transcript_text, segments = _identify_speakers(
                scraibe, transcript_text, segments
            )

        # 4) Prepare files
        attachments = _write_result_files(
            task_type=task_type,
            task_id=task_id,
            transcript_text=transcript_text,
            summary_text=summary_text,
            segments=segments,
            raw_result=raw_result,
            local=local,
            date_tag=date_tag,
            temp_files=temp_files,
        )

        # 5) Send success email
        send_success_email(
            to=email_to,
            transcript_text=transcript_text,
            summary_text=summary_text if include_summary else "",
            attachments=attachments,
            task_id=task_id,
        )
        logger.info("Job %s completed successfully.", task_id)
    except Exception as e:
        logger.exception("Error processing job %s: %s", task_id, e)
        if not error_notified:
            send_error_email(
                to=email_to,
                error_message=str(e),
                task_id=task_id,
            )
        raise
    finally:
        # 6) Cleanup (_remove_file ignores empty paths and missing files)
        for path in temp_files:
            _remove_file(path)
        _remove_file(audio_path)
        logger.info("Cleanup completed for job %s.", task_id)