""" Email sender module for ScrAIbe. Sends transcription outputs (TXT, JSON, etc.) via SMTP. All credentials are configured via environment variables. Supports both plain text and HTML email bodies. Template placeholders are primarily filled via environment variables. """ import base64 import json import logging import os import re import smtplib import subprocess import tempfile from email import encoders from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import Any, Dict, List, Optional from docx import Document from docx.oxml import OxmlElement from docx.oxml.ns import qn from docx.shared import Inches, Pt from docx.enum.text import WD_ALIGN_PARAGRAPH logger = logging.getLogger("scraibe.email_sender") class EmailError(Exception): pass def get_email_config(): """ Read email configuration from environment variables. Raises EmailError if required fields are missing. """ smtp_host = os.getenv("EMAIL_SMTP_HOST") smtp_port = os.getenv("EMAIL_SMTP_PORT") smtp_user = os.getenv("EMAIL_SMTP_USER") smtp_password = os.getenv("EMAIL_SMTP_PASSWORD") from_address = os.getenv("EMAIL_FROM_ADDRESS") use_tls_str = os.getenv("EMAIL_SMTP_USE_TLS", "true").strip().lower() use_tls = use_tls_str not in ("false", "0", "no") if not all([smtp_host, smtp_port, smtp_user, smtp_password, from_address]): raise EmailError( "Email configuration incomplete. " "Ensure EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER, " "EMAIL_SMTP_PASSWORD, and EMAIL_FROM_ADDRESS are set." ) return { "smtp_host": smtp_host, "smtp_port": int(smtp_port), "smtp_user": smtp_user, "smtp_password": smtp_password, "from_address": from_address, "use_tls": use_tls, } def _load_css(path: str) -> str: """ Load CSS file content if it exists. """ if not path or not os.path.exists(path): return "" with open(path, "r", encoding="utf-8") as f: return f.read() def _email_logo_html() -> str: """ Return a subtle watermark-style logo for emails. - Priority: 1) EMAIL_LOGO_URL (direct URL) 2) EMAIL_LOGO_PATH (local file as base64) - Style: small, faint, bottom-right, non-intrusive. """ logo_url = os.getenv("EMAIL_LOGO_URL") src = logo_url if not logo_url: logo_path = os.getenv("EMAIL_LOGO_PATH", "/app/src/misc/logo1.png") if os.path.exists(logo_path): try: with open(logo_path, "rb") as f: b64 = base64.b64encode(f.read()).decode("utf-8") src = f"data:image/png;base64,{b64}" except Exception: src = None if not src: return "" # Watermark: bottom-right, low opacity, compact return ( f'
' f'Logo' f'
' ) def _accent_color() -> str: """ Accent color for UI and emails. Default: #7C6DA0 """ return os.getenv("EMAIL_ACCENT_COLOR", "#7C6DA0") def build_template_context(**runtime_kwargs: Any) -> Dict[str, Any]: """ Build a context dict for templates from: - environment variables (base, customizable) - runtime-provided values (override env if present) Environment variables: - EMAIL_CONTACT_ADDRESS: value for {contact_email} - EMAIL_CSS_PATH: path to mail_style.css (optional; we inline it) - EMAIL_LOGO_URL: URL for email logo (preferred) - EMAIL_LOGO_PATH: fallback local path for email logo - EMAIL_ACCENT_COLOR: accent color (default #7C6DA0) """ # Load and inline mail_style.css for consistent email styling css_path = os.getenv("EMAIL_CSS_PATH", "/app/src/misc/mail_style.css") css_text = _load_css(css_path) # Build logo HTML (URL or local fallback) logo_html = _email_logo_html() # Accent color accent = _accent_color() ctx: Dict[str, Any] = { "contact_email": os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com"), "email_css": css_text, "email_logo": logo_html, "accent_color": accent, } # Runtime values override env if provided if runtime_kwargs: ctx.update(runtime_kwargs) return ctx def load_template(template_name: str, **runtime_kwargs: Any) -> str: """ Load an HTML email template from misc/ and render placeholders. Expects files like: /app/src/misc/upload_notification_template.html /app/src/misc/success_template.html /app/src/misc/error_notification_template.html """ base = os.getenv("SCRAIBE_TEMPLATES_DIR", "/app/src/misc") path = os.path.join(base, template_name) if not os.path.exists(path): raise EmailError(f"Email template not found: {path}") with open(path, "r", encoding="utf-8") as f: template = f.read() # Build context from env + runtime ctx = build_template_context(**runtime_kwargs) # Replace {placeholder} style variables safely try: return template.format(**ctx) except KeyError as e: raise EmailError(f"Missing template variable: {e}") def send_email( to: str, subject: str, body: str, html: Optional[str], attachments: List[str], cc: Optional[str] = None, ) -> bool: """ Send an email with optional HTML body and file attachments. Args: to: Comma-separated list of recipient email addresses. subject: Email subject. body: Email body (plain text). html: Email body (HTML), or None. attachments: List of file paths to attach. cc: Comma-separated list of CC email addresses (optional). Returns: True if sent successfully. Raises: EmailError if sending fails. """ try: cfg = get_email_config() except EmailError as e: logger.error("Email configuration error: %s", e) raise # Parse recipients to_list = [addr.strip() for addr in to.split(",") if addr.strip()] cc_list = [addr.strip() for addr in cc.split(",") if addr.strip()] if cc else [] if not to_list: raise EmailError("No valid 'To' email addresses provided.") # Ensure subject is never blank if not subject or not subject.strip(): logger.warning("Subject was blank or missing; using default subject.") subject = "ScrAIbe: Your transcript is ready" subject = subject.strip() # Build message msg = MIMEMultipart("alternative") msg["From"] = cfg["from_address"] msg["To"] = ", ".join(to_list) if cc_list: msg["Cc"] = ", ".join(cc_list) msg["Subject"] = subject # Attach plain text msg.attach(MIMEText(body, "plain")) # Attach HTML if provided if html: msg.attach(MIMEText(html, "html")) # Attach files in a separate multipart/mixed part if attachments: mixed = MIMEMultipart("mixed") mixed.attach(msg) msg = mixed for file_path in attachments: if not os.path.isfile(file_path): logger.warning("Attachment file not found, skipping: %s", file_path) continue try: with open(file_path, "rb") as f: part = MIMEBase("application", "octet-stream") part.set_payload(f.read()) encoders.encode_base64(part) part.add_header( "Content-Disposition", "attachment", filename=os.path.basename(file_path), ) msg.attach(part) except Exception as e: logger.warning("Failed to attach file %s: %s", file_path, e) # Connect and send try: if cfg["use_tls"]: server = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30) server.ehlo() server.starttls() server.ehlo() else: server = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30) server.ehlo() server.login(cfg["smtp_user"], cfg["smtp_password"]) server.sendmail( cfg["from_address"], to_list + cc_list, msg.as_string(), ) server.quit() logger.info( "Email sent to %s (CC: %s) with subject: %s", to_list, cc_list or "None", subject, ) return True except Exception as e: logger.error("Failed to send email: %s", e) raise EmailError(f"Failed to send email: {e}") # ------------ DOCX helpers ------------ def _configure_base_font(doc): """ Set base document font to 12pt Courier. """ style = doc.styles["Normal"] style.font.name = "Courier" style.font.size = Pt(12) def _configure_section_margins(doc, margin=1.0): """ Set uniform margins on all sides (default 1 inch). """ for section in doc.sections: section.left_margin = Inches(margin) section.right_margin = Inches(margin) section.top_margin = Inches(margin) section.bottom_margin = Inches(margin) def _enable_line_numbering(section, start_at=1, count_by=1, restart=True): """ Enable line numbering for a specific section. """ sectPr = section._sectPr # Create the line numbering element: lnNumType = OxmlElement("w:lnNumType") lnNumType.set(qn("w:start"), str(start_at)) lnNumType.set(qn("w:countBy"), str(count_by)) # 'eachPage' restarts numbering on every page; 'continuous' keeps it going restart_mode = "eachPage" if restart else "continuous" lnNumType.set(qn("w:restart"), restart_mode) sectPr.append(lnNumType) def _add_cover_page(doc, doc_type, date, description): """ Add a cover page: - Centered horizontally and vertically using a full-page table. - Lines: 1) Document type 2) Date (e.g. "June 14, 2026") 3-5) Empty space 6) One-sentence description """ # Create a full-page table to center content vertically and horizontally table = doc.add_table(rows=1, cols=1) table.autofit = False cell = table.cell(0, 0) # Make table span full page width (approx) cell.width = Inches(6.5) # Center content inside the cell for paragraph in cell.paragraphs: paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER # Clear default paragraph cell.paragraphs[0].clear() # Line 1: Document type p_type = cell.add_paragraph() p_type.alignment = WD_ALIGN_PARAGRAPH.CENTER run_type = p_type.add_run(doc_type) run_type.bold = True run_type.font.name = "Courier" run_type.font.size = Pt(12) # Line 2: Date p_date = cell.add_paragraph() p_date.alignment = WD_ALIGN_PARAGRAPH.CENTER run_date = p_date.add_run(date) run_date.font.name = "Courier" run_date.font.size = Pt(12) # Lines 3-5: blank space for _ in range(3): cell.add_paragraph() # Line 6: Description p_desc = cell.add_paragraph() p_desc.alignment = WD_ALIGN_PARAGRAPH.CENTER run_desc = p_desc.add_run(description) run_desc.font.name = "Courier" run_desc.font.size = Pt(12) def _add_transcript_content(doc, text): """ Add transcript lines to the document with formatting. """ lines = text.strip().split("\n") for line in lines: line = line.strip() if not line: continue m = re.match(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)", line) if m: ts, speaker, content = m.groups() p = doc.add_paragraph() p.paragraph_format.left_indent = Inches(0.25) run_label = p.add_run(f"[{ts}] {speaker.upper()}:") run_label.bold = False run_label.underline = True run_label.font.name = "Courier" run_label.font.size = Pt(12) run_space = p.add_run(" ") run_space.bold = False run_space.underline = False run_space.font.name = "Courier" run_space.font.size = Pt(12) run_txt = p.add_run(content.strip()) run_txt.bold = False run_txt.underline = False run_txt.font.name = "Courier" run_txt.font.size = Pt(12) else: p = doc.add_paragraph() run = p.add_run(line) run.font.name = "Courier" run.font.size = Pt(12) def _add_summary_content(doc, text): """ Add summary content with heading styles. """ heading_count = 0 for line in text.splitlines(): stripped = line.strip() if not stripped: continue # Detect markdown-style headings: #, ##, ###, #### at start of line m = re.match(r"^(#{1,4})\s+(.*)", stripped) if m: heading_count += 1 content = m.group(2).strip() p = doc.add_paragraph() p.paragraph_format.space_after = Pt(4) run = p.add_run(content) run.font.name = "Courier" run.font.size = Pt(12) # Apply formatting based on this heading's ordinal position if heading_count == 1: run.bold = True elif heading_count == 2: run.italic = True elif heading_count == 3: run.underline = True elif heading_count >= 4: run.italic = True run.underline = True else: # Normal text line p = doc.add_paragraph(stripped) p.paragraph_format.space_after = Pt(4) # ------------ PDF helpers ------------ def _docx_to_pdf(docx_path: str, output_dir: str) -> str: """ Convert a .docx file to PDF using LibreOffice. Returns the path of the generated PDF, or None on failure. """ try: # Use LibreOffice in headless mode result = subprocess.run( [ "libreoffice", "--headless", "--convert-to", "pdf", "--outdir", output_dir, docx_path, ], capture_output=True, text=True, timeout=120, ) if result.returncode != 0: logger.warning("LibreOffice conversion failed: %s", result.stderr) # LibreOffice creates a PDF with the same base name base = os.path.splitext(os.path.basename(docx_path))[0] pdf_path = os.path.join(output_dir, f"{base}.pdf") if os.path.exists(pdf_path): return pdf_path else: logger.warning("Converted PDF not found at: %s", pdf_path) return None except Exception as e: logger.error("Error converting DOCX to PDF: %s", e) return None def _merge_pdfs(input_pdfs: List[str], output_pdf: str) -> bool: """ Merge multiple PDF files into a single PDF using PyPDF2. Returns True on success, False on failure. """ try: from PyPDF2 import PdfMerger merger = PdfMerger() for pdf in input_pdfs: if os.path.exists(pdf): merger.append(pdf) merger.write(output_pdf) merger.close() return True except Exception as e: logger.error("Error merging PDFs: %s", e) return False def _add_page_numbers_to_pdf(input_pdf: str, output_pdf: str) -> bool: """ Add page numbers to a PDF using reportlab. Page numbers appear at the bottom-right of each page. """ try: from reportlab.pdfgen import canvas from reportlab.lib.pagesizes import letter from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont from PyPDF2 import PdfReader # Try to use a standard font font_name = "Courier" try: pdfmetrics.registerFont( TTFont("Courier", "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf") ) except Exception: pass reader = PdfReader(input_pdf) num_pages = len(reader.pages) # Create a canvas to add page numbers c = canvas.Canvas(output_pdf, pagesize=letter) for page_num in range(num_pages): page = reader.pages[page_num] c = canvas.Canvas(output_pdf, pagesize=letter) # Add page number c.setFont(font_name, 10) page_text = f"Page {page_num + 1} of {num_pages}" text_width = c.stringWidth(page_text, font_name, 10) x = letter[0] - 1 * 72 - text_width # 1 inch from right y = 1 * 72 # 1 inch from bottom c.drawString(x, y, page_text) c.showPage() c.save() return True except Exception as e: logger.error("Error adding page numbers to PDF: %s", e) return False # ------------ Public DOCX/PDF functions ------------ def create_transcript_docx( text: str, filename: str, include_cover: bool = False, cover_date: str = "", cover_desc: str = "", ): """ Create a .docx transcript with: - 1" margins on all sides - 12pt Courier - Continuous line numbering on the left (for transcript content only) - Optional cover page with type, date, and AI-generated description. """ doc = Document() _configure_base_font(doc) _configure_section_margins(doc) # 1) Optional cover page (no line numbering) if include_cover and cover_date and cover_desc: _add_cover_page(doc, "TRANSCRIPT", cover_date, cover_desc) doc.add_page_break() # 2) Enable line numbering for transcript section _enable_line_numbering(doc.sections[0]) # 3) Transcript content (with line numbering) _add_transcript_content(doc, text) doc.save(filename) def create_summary_docx( text: str, filename: str, include_cover: bool = False, cover_date: str = "", cover_desc: str = "", ): """ Create a .docx summary with consistent font and heading styles. Optional cover page with type, date, and AI-generated description. No line numbering. """ doc = Document() _configure_base_font(doc) _configure_section_margins(doc) # 1) Optional cover page if include_cover and cover_date and cover_desc: _add_cover_page(doc, "SUMMARY", cover_date, cover_desc) doc.add_page_break() # 2) Summary content (no line numbering) _add_summary_content(doc, text) doc.save(filename) def create_combined_docx( transcript_text: str, summary_text: str, filename: str, transcript_cover_date: str, transcript_cover_desc: str, summary_cover_date: str, summary_cover_desc: str, ): """ Create a combined .docx with: 1) Transcript cover page (no line numbering) 2) Page break 3) Summary content (no line numbering) 4) Page break 5) Transcript content (line numbering enabled only here) Line numbering is restricted to the transcript section only. """ doc = Document() _configure_base_font(doc) _configure_section_margins(doc) # 1) Transcript cover page (no line numbering) _add_cover_page(doc, "TRANSCRIPT", transcript_cover_date, transcript_cover_desc) doc.add_page_break() # 3) Summary content (no line numbering) _add_summary_content(doc, summary_text) doc.add_page_break() # 4) Enable line numbering for transcript section _enable_line_numbering(doc.sections[0]) # 5) Transcript content (with line numbering) _add_transcript_content(doc, transcript_text) doc.save(filename) def generate_pdf_documents( transcript_text: str, summary_text: str, output_dir: str, transcript_cover_date: str, transcript_cover_desc: str, summary_cover_date: str, summary_cover_desc: str, ) -> Dict[str, str]: """ Generate PDF documents by: - Creating individual .docx components (cover pages, transcript, summary) - Converting each .docx to PDF - Merging PDFs in the correct order - Adding page numbers to the final PDFs Behavior: - Always: - Generate TRANSCRIPT.pdf: - transcript_cover.pdf + transcript_with_line_numbers.pdf - If summary_text is provided: - Generate SUMMARY.pdf: - summary_cover.pdf + summary.pdf - Generate COMBINED.pdf: - transcript_cover.pdf + summary.pdf + transcript_with_line_numbers.pdf Returns a dict with paths: - transcript_pdf (always) - summary_pdf (if summary_text provided) - combined_pdf (if summary_text provided) """ import os os.makedirs(output_dir, exist_ok=True) result = {} # Create temporary directory for intermediate files with tempfile.TemporaryDirectory() as tmpdir: # 1) Generate constituent DOCX files # Transcript cover transcript_cover_docx = os.path.join(tmpdir, "TRANSCRIPT_COVER.docx") doc = Document() _configure_base_font(doc) _configure_section_margins(doc) _add_cover_page(doc, "TRANSCRIPT", transcript_cover_date, transcript_cover_desc) doc.save(transcript_cover_docx) # Summary cover (only if summary is requested) summary_cover_docx = None if summary_text: summary_cover_docx = os.path.join(tmpdir, "SUMMARY_COVER.docx") doc = Document() _configure_base_font(doc) _configure_section_margins(doc) _add_cover_page(doc, "SUMMARY", summary_cover_date, summary_cover_desc) doc.save(summary_cover_docx) # Transcript (with line numbering) transcript_docx = os.path.join(tmpdir, "TRANSCRIPT.docx") doc = Document() _configure_base_font(doc) _configure_section_margins(doc) _enable_line_numbering(doc.sections[0]) _add_transcript_content(doc, transcript_text) doc.save(transcript_docx) # Summary (no line numbering) summary_docx = None if summary_text: summary_docx = os.path.join(tmpdir, "SUMMARY.docx") doc = Document() _configure_base_font(doc) _configure_section_margins(doc) _add_summary_content(doc, summary_text) doc.save(summary_docx) # 2) Convert DOCX to PDF transcript_cover_pdf = _docx_to_pdf(transcript_cover_docx, tmpdir) transcript_pdf = _docx_to_pdf(transcript_docx, tmpdir) summary_cover_pdf = None summary_pdf = None if summary_text: summary_cover_pdf = _docx_to_pdf(summary_cover_docx, tmpdir) summary_pdf = _docx_to_pdf(summary_docx, tmpdir) # 3) Assemble TRANSCRIPT.pdf: transcript_cover + transcript + page numbers transcript_output_pdf = os.path.join(output_dir, "TRANSCRIPT.pdf") merged_transcript = os.path.join(tmpdir, "TRANSCRIPT_MERGED.pdf") if ( transcript_cover_pdf and transcript_pdf and _merge_pdfs([transcript_cover_pdf, transcript_pdf], merged_transcript) and _add_page_numbers_to_pdf(merged_transcript, transcript_output_pdf) ): result["transcript_pdf"] = transcript_output_pdf # 4) If summary is provided: # - SUMMARY.pdf: summary_cover + summary + page numbers # - COMBINED.pdf: transcript_cover + summary + transcript + page numbers if summary_text and summary_pdf and summary_cover_pdf: # SUMMARY.pdf summary_output_pdf = os.path.join(output_dir, "SUMMARY.pdf") merged_summary = os.path.join(tmpdir, "SUMMARY_MERGED.pdf") if ( _merge_pdfs([summary_cover_pdf, summary_pdf], merged_summary) and _add_page_numbers_to_pdf(merged_summary, summary_output_pdf) ): result["summary_pdf"] = summary_output_pdf # COMBINED.pdf combined_output_pdf = os.path.join(output_dir, "COMBINED.pdf") merged_combined = os.path.join(tmpdir, "COMBINED_MERGED.pdf") if ( _merge_pdfs( [transcript_cover_pdf, summary_pdf, transcript_pdf], merged_combined, ) and _add_page_numbers_to_pdf(merged_combined, combined_output_pdf) ): result["combined_pdf"] = combined_output_pdf return result def send_success_email( to: str, transcript_text: str, summary_text: str, attachments: List[str], task_id: str, ): """ Send a success email with attachments. Subject is customizable via EMAIL_SUBJECT_SUCCESS. Falls back to a safe default if the env var is missing or blank. """ # Read subject from environment; never allow blank raw_subject = os.getenv("EMAIL_SUBJECT_SUCCESS") subject = (raw_subject or "").strip() if not subject: subject = "ScrAIbe: Your transcript is ready" logger.info( "EMAIL_SUBJECT_SUCCESS not set or blank; using default subject: %s", subject ) else: logger.info("Using EMAIL_SUBJECT_SUCCESS: %s", subject) # Build email body body = f""" Your transcription is complete. Task ID: {task_id} Please find the attached documents: - Transcript (MD) - Transcript (DOCX) - Source JSON """ if summary_text: body += "- Summary (MD)\n- Summary (DOCX)\n" # Load HTML template try: html = load_template( "success_template.html", task_id=task_id, transcript_text=transcript_text[:500], summary_text=summary_text[:500] if summary_text else "", ) except EmailError: html = None # Send email (send_email has an additional subject guard) send_email( to=to, subject=subject, body=body, html=html, attachments=attachments, ) def send_error_email( to: str, error_message: str, task_id: str, ): """ Send an error email. Subject is customizable via EMAIL_SUBJECT_ERROR. Falls back to a safe default if the env var is missing or blank. """ # Read subject from environment; never allow blank raw_subject = os.getenv("EMAIL_SUBJECT_ERROR") subject = (raw_subject or "").strip() if not subject: subject = "ScrAIbe: Error with your transcription request" logger.info( "EMAIL_SUBJECT_ERROR not set or blank; using default subject: %s", subject ) else: logger.info("Using EMAIL_SUBJECT_ERROR: %s", subject) # Build email body body = f""" There was an error processing your transcription. Task ID: {task_id} Error: {error_message} """ # Load HTML template try: html = load_template( "error_notification_template.html", task_id=task_id, error_message=error_message, ) except EmailError: html = None # Send email (send_email has an additional subject guard) send_email( to=to, subject=subject, body=body, html=html, attachments=[], )