""" Email sender module for ScrAIbe. Sends transcription outputs (TXT, JSON, etc.) via SMTP. All credentials are configured via environment variables. Supports both plain text and HTML email bodies. Template placeholders are primarily filled via environment variables. """ import base64 import json import logging import os import re import smtplib from email import encoders from email.mime.base import MIMEBase from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import Any, Dict, List, Optional from docx import Document from docx.oxml import OxmlElement from docx.oxml.ns import qn from docx.shared import Inches, Pt from docx.enum.text import WD_ALIGN_PARAGRAPH logger = logging.getLogger("scraibe.email_sender") class EmailError(Exception): pass def get_email_config(): """ Read email configuration from environment variables. Raises EmailError if required fields are missing. """ smtp_host = os.getenv("EMAIL_SMTP_HOST") smtp_port = os.getenv("EMAIL_SMTP_PORT") smtp_user = os.getenv("EMAIL_SMTP_USER") smtp_password = os.getenv("EMAIL_SMTP_PASSWORD") from_address = os.getenv("EMAIL_FROM_ADDRESS") use_tls_str = os.getenv("EMAIL_SMTP_USE_TLS", "true").strip().lower() use_tls = use_tls_str not in ("false", "0", "no") if not all([smtp_host, smtp_port, smtp_user, smtp_password, from_address]): raise EmailError( "Email configuration incomplete. " "Ensure EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER, " "EMAIL_SMTP_PASSWORD, and EMAIL_FROM_ADDRESS are set." ) return { "smtp_host": smtp_host, "smtp_port": int(smtp_port), "smtp_user": smtp_user, "smtp_password": smtp_password, "from_address": from_address, "use_tls": use_tls, } def _load_css(path: str) -> str: """ Load CSS file content if it exists. """ if not path or not os.path.exists(path): return "" with open(path, "r", encoding="utf-8") as f: return f.read() def _email_logo_html() -> str: """ Return a subtle watermark-style logo for emails. - Priority: 1) EMAIL_LOGO_URL (direct URL) 2) EMAIL_LOGO_PATH (local file as base64) - Style: small, faint, bottom-right, non-intrusive. """ logo_url = os.getenv("EMAIL_LOGO_URL") src = logo_url if not logo_url: logo_path = os.getenv("EMAIL_LOGO_PATH", "/app/src/misc/logo1.png") if os.path.exists(logo_path): try: with open(logo_path, "rb") as f: b64 = base64.b64encode(f.read()).decode("utf-8") src = f"data:image/png;base64,{b64}" except Exception: src = None if not src: return "" # Watermark: bottom-right, low opacity, compact return ( f'
' f'Logo' f'
' ) def _accent_color() -> str: """ Accent color for UI and emails. Default: #7C6DA0 """ return os.getenv("EMAIL_ACCENT_COLOR", "#7C6DA0") def build_template_context(**runtime_kwargs: Any) -> Dict[str, Any]: """ Build a context dict for templates from: - environment variables (base, customizable) - runtime-provided values (override env if present) Environment variables: - EMAIL_CONTACT_ADDRESS: value for {contact_email} - EMAIL_CSS_PATH: path to mail_style.css (optional; we inline it) - EMAIL_LOGO_URL: URL for email logo (preferred) - EMAIL_LOGO_PATH: fallback local path for email logo - EMAIL_ACCENT_COLOR: accent color (default #7C6DA0) """ # Load and inline mail_style.css for consistent email styling css_path = os.getenv("EMAIL_CSS_PATH", "/app/src/misc/mail_style.css") css_text = _load_css(css_path) # Build logo HTML (URL or local fallback) logo_html = _email_logo_html() # Accent color accent = _accent_color() ctx: Dict[str, Any] = { "contact_email": os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com"), "email_css": css_text, "email_logo": logo_html, "accent_color": accent, } # Runtime values override env if provided if runtime_kwargs: ctx.update(runtime_kwargs) return ctx def load_template(template_name: str, **runtime_kwargs: Any) -> str: """ Load an HTML email template from misc/ and render placeholders. Expects files like: /app/src/misc/upload_notification_template.html /app/src/misc/success_template.html /app/src/misc/error_notification_template.html """ base = os.getenv("SCRAIBE_TEMPLATES_DIR", "/app/src/misc") path = os.path.join(base, template_name) if not os.path.exists(path): raise EmailError(f"Email template not found: {path}") with open(path, "r", encoding="utf-8") as f: template = f.read() # Build context from env + runtime ctx = build_template_context(**runtime_kwargs) # Replace {placeholder} style variables safely try: return template.format(**ctx) except KeyError as e: raise EmailError(f"Missing template variable: {e}") def send_email( to: str, subject: str, body: str, html: Optional[str], attachments: List[str], cc: Optional[str] = None, ) -> bool: """ Send an email with optional HTML body and file attachments. Args: to: Comma-separated list of recipient email addresses. subject: Email subject. body: Email body (plain text). html: Email body (HTML), or None. attachments: List of file paths to attach. cc: Comma-separated list of CC email addresses (optional). Returns: True if sent successfully. Raises: EmailError if sending fails. """ try: cfg = get_email_config() except EmailError as e: logger.error("Email configuration error: %s", e) raise # Parse recipients to_list = [addr.strip() for addr in to.split(",") if addr.strip()] cc_list = [addr.strip() for addr in cc.split(",") if addr.strip()] if cc else [] if not to_list: raise EmailError("No valid 'To' email addresses provided.") # Ensure subject is never blank if not subject or not subject.strip(): logger.warning("Subject was blank or missing; using default subject.") subject = "ScrAIbe: Your transcript is ready" subject = subject.strip() has_attachments = bool(attachments) # Build the text/HTML part (alternative) alt = MIMEMultipart("alternative") alt.attach(MIMEText(body, "plain")) if html: alt.attach(MIMEText(html, "html")) if has_attachments: # Outer message: multipart/mixed with headers msg = MIMEMultipart("mixed") msg["From"] = cfg["from_address"] msg["To"] = ", ".join(to_list) if cc_list: msg["Cc"] = ", ".join(cc_list) msg["Subject"] = subject # Attach the alternative (text/HTML) part msg.attach(alt) # Attach files for file_path in attachments: if not os.path.isfile(file_path): logger.warning("Attachment file not found, skipping: %s", file_path) continue try: with open(file_path, "rb") as f: part = MIMEBase("application", "octet-stream") part.set_payload(f.read()) encoders.encode_base64(part) part.add_header( "Content-Disposition", "attachment", filename=os.path.basename(file_path), ) msg.attach(part) except Exception as e: logger.warning("Failed to attach file %s: %s", file_path, e) else: # No attachments: use the alternative part as the root message msg = alt msg["From"] = cfg["from_address"] msg["To"] = ", ".join(to_list) if cc_list: msg["Cc"] = ", ".join(cc_list) msg["Subject"] = subject # Connect and send try: if cfg["use_tls"]: server = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30) server.ehlo() server.starttls() server.ehlo() else: server = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30) server.ehlo() server.login(cfg["smtp_user"], cfg["smtp_password"]) server.sendmail( cfg["from_address"], to_list + cc_list, msg.as_string(), ) server.quit() logger.info( "Email sent to %s (CC: %s) with subject: %s", to_list, cc_list or "None", subject, ) return True except Exception as e: logger.error("Failed to send email: %s", e) raise EmailError(f"Failed to send email: {e}") # ------------ DOCX helpers ------------ # Namespaces W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main" def _set_element_attr(elem, attr, value): elem.set(f"{{{W_NS}}}{attr}", str(value)) def _create_transcript_section_properties(section): """ Configure the section properties for transcript DOCX: - Margins: 1 inch all sides - Single column layout - No built-in line numbering (we embed line numbers as text for portability) - Remove document grid to avoid off-by-one line numbering """ sectPr = section._sectPr # Margins: 1 inch = 1440 twips pgMar = sectPr.find(f"{{{W_NS}}}pgMar") if pgMar is None: pgMar = OxmlElement("w:pgMar") sectPr.append(pgMar) _set_element_attr(pgMar, "top", "1440") _set_element_attr(pgMar, "right", "1440") _set_element_attr(pgMar, "bottom", "1440") _set_element_attr(pgMar, "left", "1440") _set_element_attr(pgMar, "header", "720") _set_element_attr(pgMar, "footer", "720") _set_element_attr(pgMar, "gutter", "0") # Ensure single column (no multi-column layout) cols = sectPr.find(f"{{{W_NS}}}cols") if cols is not None: _set_element_attr(cols, "num", "1") _set_element_attr(cols, "space", "720") # Remove document grid entirely for docGrid in sectPr.findall(f"{{{W_NS}}}docGrid"): sectPr.remove(docGrid) # Remove any built-in line numbering; we will use text-based line numbers for lnNumType in sectPr.findall(f"{{{W_NS}}}lnNumType"): sectPr.remove(lnNumType) def _add_transcript_paragraph(doc, line_text, line_number): """ Add a single transcript line as a paragraph with an embedded line number. Uses a left tab stop so the line number appears in the left margin area, independent of built-in line numbering, ensuring consistent behavior across Word, LibreOffice, Google Docs, etc. """ line_text = line_text.strip() if not line_text: return p = doc.add_paragraph() # Set up paragraph formatting: # - No left indent; we control spacing via tab stop # - Single line spacing, no extra before/after pPr = p._p.get_or_add_pPr() # Remove any default indent pPr.find(f"{{{W_NS}}}ind") and pPr.remove(pPr.find(f"{{{W_NS}}}ind")) # Define a left tab stop for line numbers (e.g. 360 twips ≈ 0.25") tabs = OxmlElement("w:tabs") tab = OxmlElement("w:tab") tab.set("{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val", "left") tab.set("{http://schemas.openxmlformats.org/wordprocessingml/2006/main}pos", "360") tabs.append(tab) pPr.append(tabs) spacing = OxmlElement("w:spacing") _set_element_attr(spacing, "before", "0") _set_element_attr(spacing, "after", "0") _set_element_attr(spacing, "line", "360") # 1.5 line spacing (12pt * 1.5 = 18pt → 360 twips) _set_element_attr(spacing, "lineRule", "auto") pPr.append(spacing) # Try to match: [00:00] SPEAKER 1: content m = re.match(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)", line_text) # Line number run (no underline) run_ln = p.add_run(str(line_number)) run_ln.font.name = "Courier" run_ln.font.size = Pt(12) run_ln.underline = False # Tab + two spaces between line number and content for extra white space run_tab = p.add_run("\t ") run_tab.font.name = "Courier" run_tab.font.size = Pt(12) run_tab.underline = False if m: ts, speaker, content = m.groups() label_text = f"[{ts}] {speaker.upper()}:" # Label run (underline) run_label = p.add_run(label_text) run_label.underline = True run_label.font.name = "Courier" run_label.font.size = Pt(12) # Space run (no underline) run_space = p.add_run(" ") run_space.underline = False run_space.font.name = "Courier" run_space.font.size = Pt(12) # Content run (no underline) run_txt = p.add_run(content.strip()) run_txt.underline = False run_txt.font.name = "Courier" run_txt.font.size = Pt(12) else: # Non-standard line: plain text run = p.add_run(line_text) run.underline = False run.font.name = "Courier" run.font.size = Pt(12) # ------------ Public DOCX functions ------------ def create_transcript_docx(text: str, filename: str): """ Create a transcript DOCX with: - 1" margins on all sides - 12pt Courier font - Embedded line numbers starting at 1 on the first page (portable across Word, LibreOffice, Google Docs) - Line numbers reflect visual lines on the page, not speaker turns. - Proper formatting for timestamps and speaker labels """ doc = Document() # Set base font (Normal style) style = doc.styles["Normal"] style.font.name = "Courier" style.font.size = Pt(12) # Remove any default paragraphs (ensure no phantom first line) body = doc.element.body for p in list(body.findall(f"{{{W_NS}}}p")): body.remove(p) # Configure section properties (margins, no built-in line numbering) _create_transcript_section_properties(doc.sections[0]) # Max characters per visual line (for 12pt Courier, 1" margins) max_chars = 60 # Lines per page before restarting numbering lines_per_page = 29 # Current line counter for visual lines line_number = 0 # Split transcript into logical lines logical_lines = text.strip().splitlines() def ensure_new_page_if_needed(): nonlocal line_number if line_number >= lines_per_page: # Insert a page break paragraph (no line number, no text) p_break = doc.add_paragraph() pPr = p_break._p.get_or_add_pPr() # Clear any inherited formatting for child in list(pPr): tag = child.tag.split("}")[-1] if "}" in child.tag else child.tag if tag in ("tabs", "spacing", "ind"): pPr.remove(child) # Standard page break via paragraph property page_break = OxmlElement("w:pageBreak") page_break.set("{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val", "1") pPr.append(page_break) # Reset line counter for new page line_number = 0 for line in logical_lines: line = line.strip() if not line: continue # Try to match: [00:00] SPEAKER 1: content m = re.match(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)", line) if m: ts, speaker, content = m.groups() label_text = f"[{ts}] {speaker.upper()}:" content = content.strip() else: label_text = "" content = line.strip() # Split content into visual lines at word boundaries content_lines = [] words = content.split() current = "" for w in words: if len(current) == 0: current = w elif len(current) + 1 + len(w) <= max_chars: current += " " + w else: content_lines.append(current) current = w if current: content_lines.append(current) # First visual line: include label if present, ensuring total <= max_chars if content_lines: ensure_new_page_if_needed() first_content = content_lines.pop(0) if label_text: prefix = label_text + " " # If too long, trim first_content at word boundary if len(prefix) + len(first_content) > max_chars: allowed = max_chars - len(prefix) if allowed < 1: allowed = 1 # Truncate at word boundary candidate = first_content[:allowed] last_space = candidate.rfind(" ") if last_space > 0: candidate = candidate[:last_space] first_content = candidate first_line_text = prefix + first_content else: first_line_text = first_content line_number += 1 _add_transcript_paragraph(doc, first_line_text, line_number=line_number) # Subsequent visual lines: no label, just content for cl in content_lines: ensure_new_page_if_needed() line_number += 1 _add_transcript_paragraph(doc, cl, line_number=line_number) # Add page numbers to footer: "X of Y" (bottom left) section = doc.sections[0] footer = section.footer footer.is_linked_to_previous = False footer_para = footer.paragraphs[0] if footer.paragraphs else footer.add_paragraph() footer_para.alignment = WD_ALIGN_PARAGRAPH.LEFT # Clear any existing content for r in footer_para.runs: r.text = "" def add_field(run, code): fldChar = OxmlElement("w:fldChar") fldChar.set(qn("w:fldCharType"), "begin") run._r.append(fldChar) instrText = OxmlElement("w:instrText") instrText.set(qn("xml:space"), "preserve") instrText.text = code run._r.append(instrText) fldCharEnd = OxmlElement("w:fldChar") fldCharEnd.set(qn("w:fldCharType"), "end") run._r.append(fldCharEnd) run_page = footer_para.add_run() add_field(run_page, " PAGE ") run_of = footer_para.add_run(" of ") run_total = footer_para.add_run() add_field(run_total, " NUMPAGES ") # Save doc.save(filename) def create_summary_docx(text: str, filename: str): """ Create a summary DOCX with: - 1" margins on all sides - 12pt Courier font - No line numbering """ doc = Document() # Base font style = doc.styles["Normal"] style.font.name = "Courier" style.font.size = Pt(12) # Margins: 1 inch all sides for section in doc.sections: section.left_margin = Inches(1.0) section.right_margin = Inches(1.0) section.top_margin = Inches(1.0) section.bottom_margin = Inches(1.0) # Remove default paragraph body = doc.element.body for p in list(body.findall(f"{{{W_NS}}}p")): body.remove(p) # Add summary content lines = text.strip().splitlines() for line in lines: line = line.strip() if not line: continue p = doc.add_paragraph(line) p.paragraph_format.space_after = Pt(4) doc.save(filename)