552 lines
17 KiB
Python
552 lines
17 KiB
Python
"""
|
|
Email sender module for ScrAIbe.
|
|
|
|
Sends transcription outputs (TXT, JSON, etc.) via SMTP.
|
|
All credentials are configured via environment variables.
|
|
Supports both plain text and HTML email bodies.
|
|
Template placeholders are primarily filled via environment variables.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import smtplib
|
|
from email import encoders
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from docx import Document
|
|
from docx.oxml import OxmlElement
|
|
from docx.oxml.ns import qn
|
|
from docx.shared import Inches, Pt
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
|
|
# Module-level logger. The name is a fixed string, so it stays
# "scraibe.email_sender" regardless of how this module is imported.
logger = logging.getLogger("scraibe.email_sender")
|
|
|
|
|
|
class EmailError(Exception):
    """Error type raised by this module's email helpers."""
|
|
|
|
|
|
def get_email_config() -> Dict[str, Any]:
    """
    Read the SMTP configuration from environment variables.

    Variables read:
        - EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER,
          EMAIL_SMTP_PASSWORD, EMAIL_FROM_ADDRESS (all required)
        - EMAIL_SMTP_USE_TLS (optional, defaults to "true"; only the values
          "false", "0" and "no", case-insensitive, disable TLS)

    Returns:
        Dict with the keys smtp_host, smtp_port (int), smtp_user,
        smtp_password, from_address and use_tls (bool).

    Raises:
        EmailError: if a required variable is unset or empty, or if
            EMAIL_SMTP_PORT is not a valid integer.
    """
    smtp_host = os.getenv("EMAIL_SMTP_HOST")
    smtp_port = os.getenv("EMAIL_SMTP_PORT")
    smtp_user = os.getenv("EMAIL_SMTP_USER")
    smtp_password = os.getenv("EMAIL_SMTP_PASSWORD")
    from_address = os.getenv("EMAIL_FROM_ADDRESS")
    use_tls_str = os.getenv("EMAIL_SMTP_USE_TLS", "true").strip().lower()
    # Anything other than an explicit "off" value keeps TLS enabled.
    use_tls = use_tls_str not in ("false", "0", "no")

    if not all([smtp_host, smtp_port, smtp_user, smtp_password, from_address]):
        raise EmailError(
            "Email configuration incomplete. "
            "Ensure EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER, "
            "EMAIL_SMTP_PASSWORD, and EMAIL_FROM_ADDRESS are set."
        )

    # A non-numeric port is a configuration problem; report it as EmailError
    # (the type callers handle) instead of leaking a bare ValueError.
    try:
        port = int(smtp_port)
    except ValueError as err:
        raise EmailError(
            f"EMAIL_SMTP_PORT must be an integer, got {smtp_port!r}."
        ) from err

    return {
        "smtp_host": smtp_host,
        "smtp_port": port,
        "smtp_user": smtp_user,
        "smtp_password": smtp_password,
        "from_address": from_address,
        "use_tls": use_tls,
    }
|
|
|
|
|
|
def _load_css(path: str) -> str:
|
|
"""
|
|
Load CSS file content if it exists.
|
|
"""
|
|
if not path or not os.path.exists(path):
|
|
return ""
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return f.read()
|
|
|
|
|
|
def _email_logo_html() -> str:
|
|
"""
|
|
Return a subtle watermark-style logo for emails.
|
|
|
|
- Priority:
|
|
1) EMAIL_LOGO_URL (direct URL)
|
|
2) EMAIL_LOGO_PATH (local file as base64)
|
|
- Style: small, faint, bottom-right, non-intrusive.
|
|
"""
|
|
logo_url = os.getenv("EMAIL_LOGO_URL")
|
|
src = logo_url
|
|
|
|
if not logo_url:
|
|
logo_path = os.getenv("EMAIL_LOGO_PATH", "/app/src/misc/logo1.png")
|
|
if os.path.exists(logo_path):
|
|
try:
|
|
with open(logo_path, "rb") as f:
|
|
b64 = base64.b64encode(f.read()).decode("utf-8")
|
|
src = f"data:image/png;base64,{b64}"
|
|
except Exception:
|
|
src = None
|
|
|
|
if not src:
|
|
return ""
|
|
|
|
# Watermark: bottom-right, low opacity, compact
|
|
return (
|
|
f'<div style="text-align: right; margin-top: 24px; opacity: 0.15;">'
|
|
f'<img src="{src}" alt="Logo" style="max-width: 90px; height: auto; display: inline-block;" />'
|
|
f'</div>'
|
|
)
|
|
|
|
|
|
def _accent_color() -> str:
|
|
"""
|
|
Accent color for UI and emails.
|
|
Default: #7C6DA0
|
|
"""
|
|
return os.getenv("EMAIL_ACCENT_COLOR", "#7C6DA0")
|
|
|
|
|
|
def build_template_context(**runtime_kwargs: Any) -> Dict[str, Any]:
    """
    Assemble the placeholder values used when rendering an email template.

    The result starts from environment-derived defaults and is then
    overlaid with ``runtime_kwargs``, so a runtime value wins whenever the
    same key appears in both.

    Environment variables consulted:
      - EMAIL_CONTACT_ADDRESS: value for {contact_email}
        (default support@example.com)
      - EMAIL_CSS_PATH: stylesheet whose content is inlined as {email_css}
        (default /app/src/misc/mail_style.css)
      - EMAIL_LOGO_URL / EMAIL_LOGO_PATH: logo source for {email_logo}
      - EMAIL_ACCENT_COLOR: value for {accent_color}
    """
    stylesheet_path = os.getenv("EMAIL_CSS_PATH", "/app/src/misc/mail_style.css")

    env_defaults: Dict[str, Any] = {
        "contact_email": os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com"),
        # Stylesheet text is inlined so the email carries its own styling.
        "email_css": _load_css(stylesheet_path),
        # Ready-made HTML snippet (URL or embedded local file).
        "email_logo": _email_logo_html(),
        "accent_color": _accent_color(),
    }

    # Later mapping wins: runtime values take precedence over the defaults.
    return {**env_defaults, **runtime_kwargs}
|
|
|
|
|
|
def load_template(template_name: str, **runtime_kwargs: Any) -> str:
    """
    Load an HTML email template and render its placeholders.

    The template is looked up in SCRAIBE_TEMPLATES_DIR (default
    /app/src/misc), e.g.:
        /app/src/misc/upload_notification_template.html
        /app/src/misc/success_template.html
        /app/src/misc/error_notification_template.html

    Placeholders use ``str.format`` syntax (``{name}``), so any literal brace
    in the template (for example in inline CSS) must be doubled: ``{{ }}``.

    Args:
        template_name: File name of the template inside the templates dir.
        **runtime_kwargs: Values that override the environment-derived
            context built by ``build_template_context``.

    Returns:
        The rendered template text.

    Raises:
        EmailError: if the template is missing or unreadable, references a
            placeholder that has no value, or contains malformed braces.
    """
    base = os.getenv("SCRAIBE_TEMPLATES_DIR", "/app/src/misc")
    path = os.path.join(base, template_name)

    # isfile (not exists) so a directory is reported as "not found" rather
    # than failing later inside open().
    if not os.path.isfile(path):
        raise EmailError(f"Email template not found: {path}")

    try:
        with open(path, "r", encoding="utf-8") as f:
            template = f.read()
    except (OSError, UnicodeDecodeError) as e:
        raise EmailError(f"Failed to read email template {path}: {e}") from e

    # Build context from env + runtime
    ctx = build_template_context(**runtime_kwargs)

    try:
        return template.format(**ctx)
    except KeyError as e:
        raise EmailError(f"Missing template variable: {e}") from e
    except (IndexError, ValueError) as e:
        # Raised by str.format for "{}" / "{0}" fields and for unbalanced
        # braces; surface them as EmailError like every other template issue.
        raise EmailError(f"Malformed placeholder in template {path}: {e}") from e
|
|
|
|
|
|
def send_email(
    to: str,
    subject: str,
    body: str,
    html: Optional[str] = None,
    attachments: Optional[List[str]] = None,
    cc: Optional[str] = None,
) -> bool:
    """
    Send an email with optional HTML body and file attachments.

    Args:
        to: Comma-separated list of recipient email addresses.
        subject: Email subject. A blank subject is replaced by a default;
            line breaks are replaced by spaces.
        body: Email body (plain text).
        html: Email body (HTML), or None.
        attachments: List of file paths to attach, or None. Paths that are
            missing or unreadable are skipped with a warning.
        cc: Comma-separated list of CC email addresses (optional).

    Returns:
        True if sent successfully.

    Raises:
        EmailError: if the configuration is incomplete, no recipient is
            given, or sending fails.
    """
    try:
        cfg = get_email_config()
    except EmailError as e:
        logger.error("Email configuration error: %s", e)
        raise

    # Parse recipients
    to_list = [addr.strip() for addr in to.split(",") if addr.strip()]
    cc_list = [addr.strip() for addr in cc.split(",") if addr.strip()] if cc else []

    if not to_list:
        raise EmailError("No valid 'To' email addresses provided.")

    # Ensure subject is never blank
    if not subject or not subject.strip():
        logger.warning("Subject was blank or missing; using default subject.")
        subject = "ScrAIbe: Your transcript is ready"

    # Keep the header on one logical line: embedded line breaks are turned
    # into spaces before the value is used as the Subject header.
    subject = " ".join(subject.splitlines()).strip()

    # Build the text/HTML part (alternative)
    alt = MIMEMultipart("alternative")
    alt.attach(MIMEText(body, "plain"))
    if html:
        alt.attach(MIMEText(html, "html"))

    if attachments:
        # Outer message: multipart/mixed wrapping the alternative part
        msg = MIMEMultipart("mixed")
        msg.attach(alt)

        for file_path in attachments:
            if not os.path.isfile(file_path):
                logger.warning("Attachment file not found, skipping: %s", file_path)
                continue

            try:
                with open(file_path, "rb") as f:
                    payload = f.read()
            except OSError as e:
                # Best effort: one unreadable attachment must not block the mail.
                logger.warning("Failed to attach file %s: %s", file_path, e)
                continue

            part = MIMEBase("application", "octet-stream")
            part.set_payload(payload)
            encoders.encode_base64(part)
            part.add_header(
                "Content-Disposition",
                "attachment",
                filename=os.path.basename(file_path),
            )
            msg.attach(part)
    else:
        # No attachments: use the alternative part as the root message
        msg = alt

    # Headers go on whichever part ended up being the root message.
    msg["From"] = cfg["from_address"]
    msg["To"] = ", ".join(to_list)
    if cc_list:
        msg["Cc"] = ", ".join(cc_list)
    msg["Subject"] = subject

    # Connect and send
    try:
        server = smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30)
        try:
            server.ehlo()
            if cfg["use_tls"]:
                server.starttls()
                # Re-identify over the now-encrypted channel.
                server.ehlo()

            server.login(cfg["smtp_user"], cfg["smtp_password"])
            server.sendmail(
                cfg["from_address"],
                to_list + cc_list,
                msg.as_string(),
            )
        finally:
            # Always release the connection, including when login/sendmail
            # raised. A failing QUIT must not mask the original error.
            try:
                server.quit()
            except (smtplib.SMTPException, OSError):
                server.close()
    except Exception as e:
        logger.error("Failed to send email: %s", e)
        raise EmailError(f"Failed to send email: {e}") from e

    logger.info(
        "Email sent to %s (CC: %s) with subject: %s",
        to_list,
        cc_list or "None",
        subject,
    )
    return True
|
|
|
|
|
|
# ------------ DOCX helpers ------------
|
|
|
|
# Namespaces
|
|
# WordprocessingML main namespace URI; combined as "{uri}name" to address
# w:-prefixed elements and attributes in the raw XML lookups below.
W_NS = "http://schemas.openxmlformats.org/wordprocessingml/2006/main"
|
|
|
|
|
|
def _set_element_attr(elem, attr, value):
    """
    Set the W_NS-qualified attribute ``attr`` on ``elem``.

    The value is converted with ``str()`` before being stored.
    """
    qualified_name = "{" + W_NS + "}" + attr
    elem.set(qualified_name, str(value))
|
|
|
|
|
|
def _create_transcript_section_properties(section):
    """
    Configure the section properties for transcript DOCX:
    - Margins: 1 inch all sides, header/footer distance 0.5 inch, no gutter
    - Single column layout
    - No built-in line numbering (we embed line numbers as text for portability)
    - Remove document grid to avoid off-by-one line numbering

    Args:
        section: The python-docx section to modify in place.
    """
    # Margins go through the public Section properties (the same approach
    # create_summary_docx uses). Unlike appending a hand-built w:pgMar to the
    # end of sectPr, this lets python-docx place the element itself when the
    # section has none yet.
    section.top_margin = Inches(1.0)
    section.right_margin = Inches(1.0)
    section.bottom_margin = Inches(1.0)
    section.left_margin = Inches(1.0)
    section.header_distance = Inches(0.5)
    section.footer_distance = Inches(0.5)
    section.gutter = Inches(0)

    # The remaining settings are edited directly on the section's XML.
    sectPr = section._sectPr

    # Ensure single column (no multi-column layout). Nothing is added when
    # the section has no w:cols element.
    cols = sectPr.find(f"{{{W_NS}}}cols")
    if cols is not None:
        _set_element_attr(cols, "num", "1")
        _set_element_attr(cols, "space", "720")

    # Remove the document grid entirely, and any built-in line numbering;
    # line numbers are written as plain text instead.
    for tag in ("docGrid", "lnNumType"):
        for child in sectPr.findall(f"{{{W_NS}}}{tag}"):
            sectPr.remove(child)
|
|
|
|
|
|
def _add_courier_run(paragraph, text, underline=False):
    """Append a 12pt Courier run with the given underline state; return it."""
    run = paragraph.add_run(text)
    run.underline = underline
    run.font.name = "Courier"
    run.font.size = Pt(12)
    return run


def _add_transcript_paragraph(doc, line_text, line_number):
    """
    Add a single transcript line as a paragraph with an embedded line number.

    The paragraph consists of: the line number, a tab (aligned by a left tab
    stop at 360 twips), then the text. The number is ordinary paragraph text,
    independent of built-in line numbering, which is intended to render
    consistently across Word, LibreOffice, Google Docs, etc.

    A line of the form ``[00:00] SPEAKER 1: content`` gets its
    ``[timestamp] SPEAKER:`` label upper-cased and underlined; any other line
    is written as plain text.

    Args:
        doc: The python-docx document to append to.
        line_text: Text of the line; surrounding whitespace is stripped and
            nothing is added if the result is empty.
        line_number: Number written at the start of the paragraph.
    """
    line_text = line_text.strip()
    if not line_text:
        return

    p = doc.add_paragraph()

    # Paragraph formatting:
    # - No left indent; we control spacing via tab stop
    # - Single line spacing, no extra before/after
    pPr = p._p.get_or_add_pPr()

    # Remove any default indent. Compare against None explicitly: an lxml
    # element without children evaluates as false, so a truthiness test
    # would never remove a (childless) <w:ind>.
    ind = pPr.find(f"{{{W_NS}}}ind")
    if ind is not None:
        pPr.remove(ind)

    # Define a left tab stop for line numbers (360 twips = 0.25")
    tabs = OxmlElement("w:tabs")
    tab = OxmlElement("w:tab")
    _set_element_attr(tab, "val", "left")
    _set_element_attr(tab, "pos", "360")
    tabs.append(tab)
    pPr.append(tabs)

    spacing = OxmlElement("w:spacing")
    _set_element_attr(spacing, "before", "0")
    _set_element_attr(spacing, "after", "0")
    # With lineRule "auto" the value is in 240ths of a line: 240 = single.
    _set_element_attr(spacing, "line", "240")
    _set_element_attr(spacing, "lineRule", "auto")
    pPr.append(spacing)

    # Line number, then the tab separating it from the content.
    _add_courier_run(p, str(line_number))
    _add_courier_run(p, "\t")

    # Try to match: [00:00] SPEAKER 1: content
    m = re.match(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)", line_text)
    if m:
        ts, speaker, content = m.groups()
        # Only the label is underlined; the space and content are not.
        _add_courier_run(p, f"[{ts}] {speaker.upper()}:", underline=True)
        _add_courier_run(p, " ")
        _add_courier_run(p, content.strip())
    else:
        # Non-standard line: plain text
        _add_courier_run(p, line_text)
|
|
|
|
|
|
# ------------ Public DOCX functions ------------
|
|
|
|
def _wrap_transcript_words(words, first_width, width):
    """
    Greedily pack ``words`` into lines joined by single spaces.

    The first line holds at most ``first_width`` characters and every later
    line at most ``width``; a single word longer than the limit still gets a
    line of its own (words are never split).
    """
    lines = []
    current = ""
    limit = first_width
    for word in words:
        if not current:
            current = word
        elif len(current) + 1 + len(word) <= limit:
            current += " " + word
        else:
            lines.append(current)
            current = word
            limit = width
    if current:
        lines.append(current)
    return lines


def create_transcript_docx(text: str, filename: str, max_chars: int = 58):
    """
    Create a transcript DOCX with:
    - 1" margins on all sides
    - 12pt Courier font
    - Embedded line numbers starting at 1 on the first page
      (portable across Word, LibreOffice, Google Docs)
    - Line numbers reflect visual lines on the page, not speaker turns.
    - Proper formatting for timestamps and speaker labels

    Each non-blank input line is wrapped at word boundaries into visual
    lines of at most ``max_chars`` characters; every visual line becomes one
    numbered paragraph. For lines of the form ``[00:00] SPEAKER 1: content``
    the ``[timestamp] SPEAKER:`` label is placed on the first visual line and
    its length counts towards that line's budget. A labelled line whose
    content is empty produces no paragraph.

    Args:
        text: Transcript text, one logical line per speaker turn.
        filename: Path the DOCX is saved to.
        max_chars: Maximum characters per visual line (default 58, chosen
            for 12pt Courier with 1" margins).
    """
    doc = Document()

    # Set base font (Normal style)
    style = doc.styles["Normal"]
    style.font.name = "Courier"
    style.font.size = Pt(12)

    # Remove any default paragraphs (ensure no phantom first line)
    body = doc.element.body
    for p in list(body.findall(f"{{{W_NS}}}p")):
        body.remove(p)

    # Configure section properties (margins, no built-in line numbering)
    _create_transcript_section_properties(doc.sections[0])

    # Global line counter for visual lines
    line_number = 0

    for line in text.strip().splitlines():
        line = line.strip()
        if not line:
            continue

        # Try to match: [00:00] SPEAKER 1: content
        m = re.match(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)", line)
        if m:
            ts, speaker, content = m.groups()
            label_text = f"[{ts}] {speaker.upper()}:"
        else:
            label_text = ""
            content = line

        # The label and its trailing space share the first visual line with
        # the content, so they are deducted from that line's budget.
        # Otherwise the first line would exceed max_chars, wrap in the
        # viewer, and throw the embedded line numbers off.
        label_width = len(label_text) + 1 if label_text else 0
        content_lines = _wrap_transcript_words(
            content.split(), max_chars - label_width, max_chars
        )

        if label_text and content_lines:
            content_lines[0] = f"{label_text} {content_lines[0]}"

        for visual_line in content_lines:
            line_number += 1
            _add_transcript_paragraph(doc, visual_line, line_number=line_number)

    # Save
    doc.save(filename)
|
|
|
|
|
|
def create_summary_docx(text: str, filename: str):
    """
    Write ``text`` to ``filename`` as a summary DOCX.

    Layout:
    - 1" margins on all sides
    - 12pt Courier font
    - No line numbering

    Every non-blank line of ``text`` (after stripping surrounding
    whitespace) becomes one paragraph followed by 4pt of spacing.
    """
    document = Document()

    # Base font comes from the Normal style.
    normal = document.styles["Normal"]
    normal.font.name = "Courier"
    normal.font.size = Pt(12)

    # One-inch margins on every section.
    one_inch = Inches(1.0)
    for sec in document.sections:
        sec.left_margin = one_inch
        sec.right_margin = one_inch
        sec.top_margin = one_inch
        sec.bottom_margin = one_inch

    # Drop any paragraph already present in the body.
    root = document.element.body
    for stale in list(root.findall(f"{{{W_NS}}}p")):
        root.remove(stale)

    # One paragraph per non-empty line.
    stripped_lines = (raw.strip() for raw in text.strip().splitlines())
    for entry in filter(None, stripped_lines):
        para = document.add_paragraph(entry)
        para.paragraph_format.space_after = Pt(4)

    document.save(filename)
|