""" Celery tasks for async transcription, diarization, and email notifications. """ import os import json import logging import tempfile from datetime import datetime from .celery_app import celery_app from .autotranscript import Scraibe from .summarizer import SummarizerClient, SummarizerError from .misc import setup_logging from .email_sender import send_email, EmailError, load_template from .email_sender import create_transcript_docx, create_summary_docx logger = logging.getLogger("scraibe.tasks") def _local_part(email: str) -> str: """ Extract the part before '@' from an email, sanitized for filenames. """ local = (email or "").split("@")[0].strip() local = "".join(ch if ch.isalnum() or ch in ("-", "_", ".") else "_" for ch in local) return local or "user" def _date_tag() -> str: """ Date tag in DD-MON-YYYY format (e.g. 01-JAN-2025). """ return datetime.utcnow().strftime("%d-%b-%Y").upper() def _safe_filename(base: str, local: str, date_tag: str, ext: str) -> str: """ Create a temp file with the requested logical name. Uses mktemp for uniqueness but keeps the desired name pattern. """ name = f"{base}-{local}-{date_tag}{ext}" return tempfile.mktemp(prefix=name.replace(".", ""), suffix=ext) def _remove_file(path: str): """ Remove a file if it exists. Best-effort; logs but never raises. """ if not path: return try: if os.path.exists(path): os.remove(path) except Exception as e: logger.warning("Failed to remove file %s: %s", path, e) def _get_subject(env_var: str, default: str) -> str: """ Safely read an email subject from an environment variable. Uses default if unset or blank. Logs the final value. """ value = (os.getenv(env_var) or "").strip() subject = value or default logger.info("Email subject [%s] = %s", env_var, subject) return subject def get_queue_position(task_id: str) -> int: """ Estimate the job's position in the queue. Returns: - A positive int if we can estimate (1 = first in line). - 0 if we cannot reliably determine position. """ try: inspect = celery_app.control.inspect() reserved = inspect.reserved() or {} # queued but not yet running active = inspect.active() or {} # currently running # Count tasks ahead of this one in the reserved (waiting) queue ahead = 0 found = False for _, tasks in list(reserved.values()): for t in tasks: tid = t.get("id") if tid == task_id: found = True break ahead += 1 if found: break # If not found in reserved, it may already be active or not yet visible. # In that case, treat it as position 1. if found: return max(ahead + 1, 1) else: return 1 except Exception: # If inspection fails, don't guess; caller should use a safe message. return 0 def send_initial_email(to: str, queue_pos: int): """ Send initial confirmation email with queue position. Subject is customizable via EMAIL_SUBJECT_UPLOAD. """ subject = _get_subject( "EMAIL_SUBJECT_UPLOAD", "ScrAIbe: Your transcription request has been received", ) body = ( "Hello,\n\n" "We have received your audio file for transcription.\n" ) if queue_pos > 0: body += f"Your request is currently number {queue_pos} in the queue.\n" queue_position_display = ( f'{queue_pos}' ) else: body += "Your request has been queued for processing.\n" queue_position_display = "the queue" body += ( "\n" "You will receive an email with your transcript (and summary, if requested) " "once processing is complete.\n\n" "If you have any questions, contact us at " f"{os.getenv('EMAIL_CONTACT_ADDRESS', 'support@example.com')}.\n\n" "This is an automated message from ScrAIbe.\n" ) html = None try: html = load_template( "upload_notification_template.html", queue_position_text=queue_position_display, ) except EmailError as e: logger.warning("Failed to render upload notification template: %s", e) try: send_email(to=to, subject=subject, body=body, html=html, attachments=[]) logger.info("Initial confirmation email sent to %s", to) except EmailError as e: logger.error("Failed to send initial email to %s: %s", to, e) def send_success_email( to: str, transcript_text: str, summary_text: str, attachments: list, task_id: str, ): """ Send final email with transcript and attachments. Subject is customizable via EMAIL_SUBJECT_SUCCESS. Falls back to a safe default if the env var is missing or blank. """ subject = _get_subject( "EMAIL_SUBJECT_SUCCESS", "ScrAIbe: Your transcript is ready", ) body = ( "Hello,\n\n" "Your transcription is ready.\n\n" "Please find the transcript and JSON files attached.\n" ) if summary_text: body += ( "\n" "SUMMARY\n" "-------\n" f"{summary_text}\n" ) body += ( "\n" "Job ID: " + str(task_id) + "\n\n" "If you have any questions, contact us at " f"{os.getenv('EMAIL_CONTACT_ADDRESS', 'support@example.com')}.\n\n" "This is an automated message from ScrAIbe.\n" ) html = None try: html = load_template("success_template.html") except EmailError as e: logger.warning("Failed to render success template: %s", e) try: send_email( to=to, subject=subject, body=body, html=html, attachments=attachments, ) logger.info("Success email sent to %s for job %s with subject: %s", to, task_id, subject) except EmailError as e: logger.error("Failed to send success email to %s for job %s: %s", to, task_id, e) def send_error_email(to: str, error_message: str, task_id: str): """ Send error notification email. Subject is customizable via EMAIL_SUBJECT_ERROR. """ subject = _get_subject( "EMAIL_SUBJECT_ERROR", "ScrAIbe: Error with your transcription request", ) body = ( "Hello,\n\n" "We encountered an error while processing your transcription request.\n\n" f"Details: {error_message}\n\n" "Job ID: " + str(task_id) + "\n\n" "Please contact your administrator if the problem persists.\n\n" "If you have any questions, contact us at " f"{os.getenv('EMAIL_CONTACT_ADDRESS', 'support@example.com')}.\n\n" "This is an automated message from ScrAIbe.\n" ) html = None try: html = load_template( "error_notification_template.html", exception=str(error_message), ) except EmailError as e: logger.warning("Failed to render error template: %s", e) try: send_email(to=to, subject=subject, body=body, html=html, attachments=[]) logger.info("Error email sent to %s for job %s", to, task_id) except EmailError as e: logger.error("Failed to send error email to %s for job %s: %s", to, task_id, e) @celery_app.task( name="scraibe.tasks.process_transcription_task", bind=True, max_retries=1, ) def process_transcription_task( self, audio_path: str, task_type: str, language: str, num_speakers: int, email_to: str, email_cc: str, include_summary: bool, identify_speakers: bool = False, ): """ Async task: transcribe audio, optionally summarize, then email results. Cleans up temporary files after completion. """ task_id = self.request.id log_level = os.getenv("LOG_LEVEL", "INFO") setup_logging(level=log_level) temp_files = [] local = _local_part(email_to) date_tag = _date_tag() try: # 1) Queue position and initial email queue_pos = get_queue_position(task_id) send_initial_email(to=email_to, queue_pos=queue_pos) # 2) Initialize Scraibe try: scraibe = Scraibe(verbose=True) except Exception as e: send_error_email( to=email_to, error_message=f"Failed to initialize transcription service: {e}", task_id=task_id, ) raise # 3) Transcription if task_type == "transcript_and_summarize": result = scraibe.transcript_and_summarize( audio_file=audio_path, language=language or None, num_speakers=int(num_speakers) if num_speakers else None, verbose=True, for_export=True, ) transcript_text = result.get("transcript", "") summary_text = result.get("summary", "") segments = result.get("segments", []) raw_result = result.get("raw_result") else: result = scraibe.transcribe( audio_file=audio_path, language=language or None, num_speakers=int(num_speakers) if num_speakers else None, verbose=True, for_export=True, ) transcript_text = result.get("transcript", "") summary_text = "" segments = result.get("segments", []) raw_result = result.get("raw_result") # 3b) Optional speaker identification speaker_map = {} # e.g. {"SPEAKER 1": "John", "SPEAKER 2": "Maria"} if identify_speakers: try: # Use the same summarizer client as transcript_and_summarize scraibe._ensure_summarizer() summarizer = scraibe._summarizer prompt = ( "Below is a transcript with speaker labels like 'SPEAKER 1', 'SPEAKER 2', etc. " "Based on the context and how each speaker talks, identify each speaker as: " "- Their real name, if it is clearly mentioned or strongly implied, OR " "- A concise role/position (e.g., Judge, Doctor, Manager, Interviewer, Client, Witness), " "if their identity is not clear. " "Do not invent random personal names. " "Do not add extra commentary. Output ONLY a mapping in this exact format, one per line: " "SPEAKER 1: Name or Role " "SPEAKER 2: Name or Role " "SPEAKER 3: Name or Role " "\n" "Transcript: " + transcript_text ) response = summarizer._chat_completion( messages=[{"role": "user", "content": prompt}], temperature=0.3, max_tokens=300, ) reply = (response or {}).get("choices", [{}])[0].get("message", {}).get("content", "") # Parse mapping import re for m in re.finditer( r"SPEAKER\s+(\d+)\s*:\s*(.+)", reply, re.IGNORECASE, ): spk = f"SPEAKER {m.group(1).strip()}" name = m.group(2).strip().rstrip(".").upper() if name: speaker_map[spk] = name logger.info("Speaker identification mapping: %s", speaker_map) # Apply mapping to transcript text if speaker_map: def replace_speaker(m): label = m.group(0).strip() # normalize to "SPEAKER N" normalized = re.sub( r"\s+", " ", re.sub(r"[^A-Z0-9\s]", "", label.upper()), ).strip() return speaker_map.get(normalized, label) # Replace in lines like "[00:12] SPEAKER 1:" but preserve timestamp and colon def replace_in_line(line: str) -> str: # match after timestamp bracket and space: "SPEAKER N:" return re.sub( r"(\[\d+:\d+(?::\d+)?\]\s*)([A-Z\s]+?):\s*", lambda m: m.group(1) + (speaker_map.get(m.group(2).strip(), m.group(2)) + ": "), line, ) transcript_lines = transcript_text.splitlines() transcript_text = "\n".join( replace_in_line(line) for line in transcript_lines ) # Also update segments for JSON export updated_segments = [] for seg in segments: sp = (seg.get("speaker") or "").strip() sp_norm = re.sub(r"[^A-Z0-9\s]", "", sp.upper()).strip() sp_new = speaker_map.get(sp_norm, sp) seg = dict(seg) seg["speaker"] = sp_new updated_segments.append(seg) segments = updated_segments except (SummarizerError, Exception) as e: logger.warning( "Speaker identification failed; falling back to Speaker IDs: %s", e ) speaker_map = {} # 4) Prepare files # Transcript .md md_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".md") with open(md_transcript_path, "w", encoding="utf-8") as f: f.write("# Transcript\n\n") f.write(transcript_text) temp_files.append(md_transcript_path) # Transcript .docx (standalone, no cover page) docx_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".docx") create_transcript_docx( transcript_text, docx_transcript_path, ) temp_files.append(docx_transcript_path) # JSON as SOURCE json_data = { "task": task_type, "transcript": transcript_text, "segments": segments, "metadata": { "timestamp": datetime.utcnow().isoformat(), "job_id": task_id, }, } if summary_text: json_data["summary"] = summary_text if raw_result is not None: json_data["raw_result"] = raw_result json_path = _safe_filename("SOURCE", local, date_tag, ".json") with open(json_path, "w", encoding="utf-8") as f: json.dump(json_data, f, indent=2, ensure_ascii=False) temp_files.append(json_path) # Summary files (if present) md_summary_path = None docx_summary_path = None if summary_text: # Summary .md md_summary_path = _safe_filename("SUMMARY", local, date_tag, ".md") with open(md_summary_path, "w", encoding="utf-8") as f: f.write("# Summary\n\n") f.write(summary_text) temp_files.append(md_summary_path) # Summary .docx (standalone, no cover page) docx_summary_path = _safe_filename("SUMMARY", local, date_tag, ".docx") create_summary_docx( summary_text, docx_summary_path, ) temp_files.append(docx_summary_path) # 5) Build attachments list # Always: JSON, transcript MD, transcript DOCX attachments = [ md_transcript_path, docx_transcript_path, json_path, ] # If summary is present, add summary MD and DOCX if summary_text: attachments += [md_summary_path, docx_summary_path] # 6) Send success email send_success_email( to=email_to, transcript_text=transcript_text, summary_text=summary_text if include_summary else "", attachments=attachments, task_id=task_id, ) logger.info("Job %s completed successfully.", task_id) except Exception as e: logger.error("Error processing job %s: %s", task_id, e, exc_info=True) send_error_email( to=email_to, error_message=str(e), task_id=task_id, ) raise e finally: # 7) Cleanup for path in temp_files: _remove_file(path) if audio_path: _remove_file(audio_path) logger.info("Cleanup completed for job %s.", task_id)