Files
scribe/scraibe/email_sender.py
T
admin 2f9299389b
Mirror and run GitLab CI / build (push) Has been cancelled
Ruff / ruff (push) Has been cancelled
Fix line numbering: only transcript pages; ensure page numbering fields are set correctly
2026-06-14 22:25:26 +00:00

611 lines
18 KiB
Python

"""
Email sender module for ScrAIbe.
Sends transcription outputs (TXT, JSON, etc.) via SMTP.
All credentials are configured via environment variables.
Supports both plain text and HTML email bodies.
Template placeholders are primarily filled via environment variables.
"""
import base64
import os
import re
import smtplib
import logging
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List, Optional, Dict, Any
from docx import Document
from docx.shared import Inches, Pt
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
from docx.enum.text import WD_ALIGN_PARAGRAPH
# Module-level logger, registered under the fixed name "scraibe.email_sender".
logger = logging.getLogger("scraibe.email_sender")
class EmailError(Exception):
    """
    Error type raised by this module for configuration, template,
    and delivery failures.
    """
def get_email_config():
    """
    Read email configuration from environment variables.

    Reads EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER,
    EMAIL_SMTP_PASSWORD and EMAIL_FROM_ADDRESS (all required), plus the
    optional EMAIL_SMTP_USE_TLS flag (TLS stays enabled unless the value is
    "false", "0" or "no", case-insensitive).

    Returns:
        Dict with keys smtp_host, smtp_port (int), smtp_user, smtp_password,
        from_address and use_tls (bool).
    Raises:
        EmailError if a required variable is missing/empty or if
        EMAIL_SMTP_PORT is not an integer.
    """
    smtp_host = os.getenv("EMAIL_SMTP_HOST")
    smtp_port = os.getenv("EMAIL_SMTP_PORT")
    smtp_user = os.getenv("EMAIL_SMTP_USER")
    smtp_password = os.getenv("EMAIL_SMTP_PASSWORD")
    from_address = os.getenv("EMAIL_FROM_ADDRESS")

    use_tls_str = os.getenv("EMAIL_SMTP_USE_TLS", "true").strip().lower()
    use_tls = use_tls_str not in ("false", "0", "no")

    if not all([smtp_host, smtp_port, smtp_user, smtp_password, from_address]):
        raise EmailError(
            "Email configuration incomplete. "
            "Ensure EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER, "
            "EMAIL_SMTP_PASSWORD, and EMAIL_FROM_ADDRESS are set."
        )

    # A malformed port must surface as EmailError like every other
    # configuration problem; callers only catch EmailError here.
    try:
        port = int(smtp_port)
    except ValueError as err:
        raise EmailError(
            f"EMAIL_SMTP_PORT must be an integer, got {smtp_port!r}."
        ) from err

    return {
        "smtp_host": smtp_host,
        "smtp_port": port,
        "smtp_user": smtp_user,
        "smtp_password": smtp_password,
        "from_address": from_address,
        "use_tls": use_tls,
    }
def _load_css(path: str) -> str:
    """
    Load CSS file content if it exists.

    Best-effort: an empty path, a missing file, or a path that is not a
    regular file yields an empty string. Any other OS-level read failure is
    logged and also yields an empty string.
    """
    if not path:
        return ""
    # Open directly instead of checking os.path.exists() first: this avoids
    # the check-then-open race and also covers directory paths.
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
        return ""
    except OSError as err:
        logger.warning("Could not read CSS file %s: %s", path, err)
        return ""
def _email_logo_html() -> str:
    """
    Return a subtle watermark-style logo for emails.
    - Priority:
    1) EMAIL_LOGO_URL (direct URL)
    2) EMAIL_LOGO_PATH (local file, embedded as a base64 data URI)
    - Style: small, faint, right-aligned, non-intrusive.

    Returns an empty string when neither source yields an image.
    """
    # Function-scope imports: these names are not imported at file level.
    from html import escape
    import mimetypes

    src = os.getenv("EMAIL_LOGO_URL")
    if not src:
        src = None
        logo_path = os.getenv("EMAIL_LOGO_PATH", "/app/src/misc/logo1.png")
        try:
            with open(logo_path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode("ascii")
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            # No usable logo file: the logo is optional, so stay silent.
            pass
        except OSError as err:
            logger.warning("Could not read email logo %s: %s", logo_path, err)
        else:
            # Derive the MIME type from the file name so non-PNG logos are
            # labelled correctly; fall back to the previous PNG assumption.
            mime, _ = mimetypes.guess_type(logo_path)
            if not mime or not mime.startswith("image/"):
                mime = "image/png"
            src = f"data:{mime};base64,{b64}"
    if not src:
        return ""
    # Escape for the attribute context so "&" or quotes in a configured URL
    # cannot break the markup.
    safe_src = escape(src, quote=True)
    # Watermark: right-aligned, low opacity, compact
    return (
        f'<div style="text-align: right; margin-top: 24px; opacity: 0.15;">'
        f'<img src="{safe_src}" alt="Logo" style="max-width: 90px; height: auto; display: inline-block;" />'
        f'</div>'
    )
def _accent_color() -> str:
    """
    Return the accent color used for UI and emails.
    Taken from EMAIL_ACCENT_COLOR; falls back to #7C6DA0 when unset.
    """
    fallback = "#7C6DA0"
    return os.environ.get("EMAIL_ACCENT_COLOR", fallback)
def build_template_context(**runtime_kwargs: Any) -> Dict[str, Any]:
    """
    Assemble the placeholder values used when rendering a template.

    Two layers are merged, the second winning on key clashes:
    - values derived from environment variables
    - values passed by the caller as keyword arguments

    Environment variables consulted:
    - EMAIL_CONTACT_ADDRESS: value for {contact_email}
    - EMAIL_CSS_PATH: path to mail_style.css (optional; its text is inlined)
    - EMAIL_LOGO_URL: URL for email logo (preferred)
    - EMAIL_LOGO_PATH: fallback local path for email logo
    - EMAIL_ACCENT_COLOR: accent color (default #7C6DA0)
    """
    # Stylesheet text is inlined so the email carries its own styling.
    stylesheet = _load_css(
        os.getenv("EMAIL_CSS_PATH", "/app/src/misc/mail_style.css")
    )
    logo_markup = _email_logo_html()
    accent_value = _accent_color()

    env_layer: Dict[str, Any] = {
        "contact_email": os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com"),
        "email_css": stylesheet,
        "email_logo": logo_markup,
        "accent_color": accent_value,
    }
    # Caller-supplied values take precedence over the environment layer.
    return {**env_layer, **runtime_kwargs}
def load_template(template_name: str, **runtime_kwargs: Any) -> str:
    """
    Load an HTML email template from misc/ and render placeholders.
    Expects files like:
    /app/src/misc/upload_notification_template.html
    /app/src/misc/success_template.html
    /app/src/misc/error_notification_template.html

    The directory is taken from SCRAIBE_TEMPLATES_DIR (default
    /app/src/misc). Placeholders use str.format syntax, so literal braces in
    a template must be doubled ("{{" / "}}").

    Args:
        template_name: File name of the template inside the templates dir.
        **runtime_kwargs: Placeholder values that override the env-derived ones.
    Returns:
        The rendered template text.
    Raises:
        EmailError if the template is missing or unreadable, references an
        unknown placeholder, or contains invalid format syntax.
    """
    base = os.getenv("SCRAIBE_TEMPLATES_DIR", "/app/src/misc")
    path = os.path.join(base, template_name)
    # Open directly rather than testing existence first, so every read
    # failure is reported through the module's own exception type.
    try:
        with open(path, "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError as err:
        raise EmailError(f"Email template not found: {path}") from err
    except OSError as err:
        raise EmailError(f"Email template could not be read: {path} ({err})") from err

    # Build context from env + runtime
    ctx = build_template_context(**runtime_kwargs)

    # Replace {placeholder} style variables safely
    try:
        return template.format(**ctx)
    except KeyError as e:
        raise EmailError(f"Missing template variable: {e}") from e
    except (IndexError, ValueError) as e:
        # Unbalanced braces or positional fields such as "{}" / "{0}".
        raise EmailError(f"Invalid placeholder syntax in template {path}: {e}") from e
def send_email(
    to: str,
    subject: str,
    body: str,
    html: Optional[str],
    attachments: List[str],
    cc: Optional[str] = None,
) -> bool:
    """
    Send an email with optional HTML body and file attachments.

    Message layout:
    - without attachments: multipart/alternative (plain [+ html])
    - with attachments: multipart/mixed wrapping that alternative part,
      followed by one application/octet-stream part per readable file
    The From/To/Cc/Subject headers are always set on the outermost message.

    Args:
        to: Comma-separated list of recipient email addresses.
        subject: Email subject. A blank subject is replaced by a default.
        body: Email body (plain text).
        html: Email body (HTML), or None.
        attachments: List of file paths to attach. Missing or unreadable
            files are logged and skipped.
        cc: Comma-separated list of CC email addresses (optional).
    Returns:
        True if sent successfully.
    Raises:
        EmailError if configuration is incomplete, no recipient is given,
        or sending fails.
    """
    try:
        cfg = get_email_config()
    except EmailError as e:
        logger.error("Email configuration error: %s", e)
        raise

    # Parse recipients
    to_list = [addr.strip() for addr in (to or "").split(",") if addr.strip()]
    cc_list = [addr.strip() for addr in cc.split(",") if addr.strip()] if cc else []
    if not to_list:
        raise EmailError("No valid 'To' email addresses provided.")

    # Ensure subject is never blank
    if not subject or not subject.strip():
        logger.warning("Subject was blank; using default subject.")
        subject = "ScrAIbe: Your transcript is ready"

    # Body: plain text first, HTML (if any) second so clients prefer HTML.
    body_part = MIMEMultipart("alternative")
    body_part.attach(MIMEText(body, "plain"))
    if html:
        body_part.attach(MIMEText(html, "html"))

    # Pick the outermost container BEFORE setting headers. Headers placed on
    # the inner alternative part are not message headers once that part is
    # wrapped in multipart/mixed.
    if attachments:
        msg = MIMEMultipart("mixed")
        msg.attach(body_part)
    else:
        msg = body_part

    msg["From"] = cfg["from_address"]
    msg["To"] = ", ".join(to_list)
    if cc_list:
        msg["Cc"] = ", ".join(cc_list)
    msg["Subject"] = subject.strip()

    # Attach files (best-effort: a bad attachment never blocks the email)
    for file_path in attachments or []:
        if not os.path.isfile(file_path):
            logger.warning("Attachment file not found, skipping: %s", file_path)
            continue
        try:
            with open(file_path, "rb") as f:
                payload = f.read()
        except OSError as e:
            logger.warning("Failed to attach file %s: %s", file_path, e)
            continue
        part = MIMEBase("application", "octet-stream")
        part.set_payload(payload)
        encoders.encode_base64(part)
        part.add_header(
            "Content-Disposition",
            "attachment",
            filename=os.path.basename(file_path),
        )
        msg.attach(part)

    # Connect and send. The context manager issues QUIT and closes the
    # socket even when login or sending raises.
    try:
        with smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30) as server:
            server.ehlo()
            if cfg["use_tls"]:
                server.starttls()
                # Re-identify over the now-encrypted channel.
                server.ehlo()
            server.login(cfg["smtp_user"], cfg["smtp_password"])
            server.sendmail(
                cfg["from_address"],
                to_list + cc_list,
                msg.as_string(),
            )
    except Exception as e:
        logger.error("Failed to send email: %s", e)
        raise EmailError(f"Failed to send email: {e}") from e

    logger.info(
        "Email sent to %s (CC: %s)",
        to_list,
        cc_list or "None",
    )
    return True
def _remove_line_numbering(section):
    """
    Strip the line-numbering element (w:lnNumType) from a section's
    properties, if one is present.
    """
    sect_props = section._sectPr
    existing = sect_props.find(qn("w:lnNumType"))
    if existing is None:
        # Nothing to remove.
        return
    sect_props.remove(existing)
def _enable_line_numbering(section):
    """
    Enable continuous line numbering for a section.

    Creates the section's w:lnNumType element if it does not exist yet and
    sets it to count every line (w:countBy="1") without restarting
    (w:restart="continuous").
    """
    sectPr = section._sectPr
    lnNumType = sectPr.find(qn("w:lnNumType"))
    if lnNumType is None:
        lnNumType = OxmlElement("w:lnNumType")
        # Insert ahead of the elements that follow w:lnNumType in the
        # w:sectPr child sequence instead of appending at the very end.
        sectPr.insert_element_before(
            lnNumType,
            "w:pgNumType",
            "w:cols",
            "w:formProt",
            "w:vAlign",
            "w:noEndnote",
            "w:titlePg",
            "w:textDirection",
            "w:bidi",
            "w:rtlGutter",
            "w:docGrid",
            "w:printerSettings",
            "w:sectPrChange",
        )
    lnNumType.set(qn("w:countBy"), "1")
    # "continuous" is a value of w:restart; w:start only accepts a number
    # (the first line number), so it must not carry this value.
    lnNumType.set(qn("w:restart"), "continuous")
def _append_footer_field(paragraph, instruction):
    """
    Append a 10pt Courier run to *paragraph* that holds a simple field
    (begin marker, instruction text, end marker) and return the run.
    """
    run = paragraph.add_run()
    run.font.name = "Courier"
    run.font.size = Pt(10)

    fld_begin = OxmlElement("w:fldChar")
    fld_begin.set(qn("w:fldCharType"), "begin")

    instr_text = OxmlElement("w:instrText")
    instr_text.set(qn("xml:space"), "preserve")
    instr_text.text = instruction

    fld_end = OxmlElement("w:fldChar")
    fld_end.set(qn("w:fldCharType"), "end")

    # Field markers must be children of the run (w:r). Inserting them next
    # to the run would make them direct children of the paragraph.
    run._r.append(fld_begin)
    run._r.append(instr_text)
    run._r.append(fld_end)
    return run


def _setup_docx_style(doc, enable_line_numbering=False):
    """
    Base document setup (margins, font, footer).
    Line numbering is optional and applied to the first section only.

    - Margins: 1.5" left, 1" right/top/bottom on the first section.
    - "Normal" style font: 12pt Courier.
    - Footer of the first section: right-aligned "<PAGE> of <NUMPAGES>"
      in 10pt Courier.
    """
    section = doc.sections[0]
    section.left_margin = Inches(1.5)
    section.right_margin = Inches(1.0)
    section.top_margin = Inches(1.0)
    section.bottom_margin = Inches(1.0)

    # Line numbering (only for transcript sections)
    if enable_line_numbering:
        _enable_line_numbering(section)
    else:
        _remove_line_numbering(section)

    # Default font
    style = doc.styles["Normal"]
    font = style.font
    font.name = "Courier"
    font.size = Pt(12)

    # Add "X of Y" page footer, right-aligned
    footer = section.footer
    footer.is_linked_to_previous = False
    p = footer.paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.RIGHT

    # PAGE field
    _append_footer_field(p, "PAGE")

    # " of " text
    run_of = p.add_run(" of ")
    run_of.font.name = "Courier"
    run_of.font.size = Pt(10)

    # NUMPAGES field
    _append_footer_field(p, "NUMPAGES")
def _add_cover_page(doc, doc_type, date, description):
    """
    Append a cover page built from a single-cell table, then a page break.

    Cell content, every paragraph horizontally centered, 12pt Courier:
    1) Document type (bold)
    2) Date (e.g. "June 14, 2026")
    3-5) Empty paragraphs as spacing
    6) One-sentence description

    NOTE(review): only the cell width (6.5") and paragraph alignment are set
    here; no row height or vertical alignment is applied, so vertical
    centering is not visibly enforced by this code - confirm intent.
    """
    centered = WD_ALIGN_PARAGRAPH.CENTER

    cover_table = doc.add_table(rows=1, cols=1)
    cover_table.autofit = False
    holder = cover_table.cell(0, 0)
    holder.width = Inches(6.5)

    # Center whatever paragraphs the new cell starts with, then empty the
    # first one so it only acts as a leading blank line.
    for existing in holder.paragraphs:
        existing.alignment = centered
    holder.paragraphs[0].clear()

    def add_centered_line(content, make_bold=False):
        para = holder.add_paragraph()
        para.alignment = centered
        piece = para.add_run(content)
        if make_bold:
            piece.bold = True
        piece.font.name = "Courier"
        piece.font.size = Pt(12)

    add_centered_line(doc_type, make_bold=True)
    add_centered_line(date)

    # Three spacer paragraphs between the date and the description.
    for _ in range(3):
        holder.add_paragraph()

    add_centered_line(description)

    # The cover page ends here; following content starts on a new page.
    doc.add_page_break()
def _add_transcript_content(doc, text):
    """
    Append transcript lines to *doc*, one paragraph per non-blank line.

    Lines shaped like "[mm:ss] Speaker: words" (or "[hh:mm:ss] ...") become
    an indented paragraph with an underlined "[time] SPEAKER:" label, a
    space, and the spoken text. Any other line is added verbatim.
    All runs use 12pt Courier.
    """
    entry_pattern = re.compile(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)")

    def apply_courier(target_run):
        target_run.font.name = "Courier"
        target_run.font.size = Pt(12)

    for raw_line in text.strip().split("\n"):
        entry = raw_line.strip()
        if not entry:
            continue

        parsed = entry_pattern.match(entry)
        if parsed is None:
            # Not a "[time] speaker: text" line: keep it as plain text.
            apply_courier(doc.add_paragraph().add_run(entry))
            continue

        stamp, who, said = parsed.groups()
        para = doc.add_paragraph()
        para.paragraph_format.left_indent = Inches(0.25)

        # (text, underlined) for the label, the separator and the content.
        pieces = (
            (f"[{stamp}] {who.upper()}:", True),
            (" ", False),
            (said.strip(), False),
        )
        for piece_text, underlined in pieces:
            piece = para.add_run(piece_text)
            piece.bold = False
            piece.underline = underlined
            apply_courier(piece)
def _add_summary_content(doc, text):
    """
    Append summary text to *doc*, one paragraph per non-blank line, each
    with 4pt spacing after it.

    Lines starting with one to four "#" followed by whitespace are treated
    as headings: the marker is dropped and the rest is written in 12pt
    Courier. Heading emphasis depends on how many headings have been seen
    so far: 1st bold, 2nd italic, 3rd underlined, 4th and later italic +
    underlined.

    NOTE(review): the number of "#" characters is matched but not used for
    styling - confirm that ordinal-based emphasis is intended.
    """
    heading_pattern = re.compile(r"^(#{1,4})\s+(.*)")
    emphasis_by_ordinal = {
        1: ("bold",),
        2: ("italic",),
        3: ("underline",),
    }
    later_emphasis = ("italic", "underline")

    headings_seen = 0
    for raw_line in text.splitlines():
        entry = raw_line.strip()
        if not entry:
            continue

        found = heading_pattern.match(entry)
        if found is None:
            # Normal text line
            body_para = doc.add_paragraph(entry)
            body_para.paragraph_format.space_after = Pt(4)
            continue

        headings_seen += 1
        title = found.group(2).strip()
        heading_para = doc.add_paragraph()
        heading_para.paragraph_format.space_after = Pt(4)
        heading_run = heading_para.add_run(title)
        heading_run.font.name = "Courier"
        heading_run.font.size = Pt(12)
        for attribute in emphasis_by_ordinal.get(headings_seen, later_emphasis):
            setattr(heading_run, attribute, True)
def create_transcript_docx(
    text: str,
    filename: str,
    include_cover: bool = False,
    cover_date: str = "",
    cover_desc: str = "",
):
    """
    Build a transcript .docx and save it to *filename*.

    The document uses the shared base style with line numbering switched on
    for its first section. A "TRANSCRIPT" cover page is added only when
    include_cover is set and both cover_date and cover_desc are non-empty.
    """
    document = Document()
    _setup_docx_style(document, enable_line_numbering=True)

    # All three values are needed for a meaningful cover page.
    cover_requested = include_cover and cover_date and cover_desc
    if cover_requested:
        _add_cover_page(document, "TRANSCRIPT", cover_date, cover_desc)

    _add_transcript_content(document, text)
    document.save(filename)
def create_summary_docx(
    text: str,
    filename: str,
    include_cover: bool = False,
    cover_date: str = "",
    cover_desc: str = "",
):
    """
    Build a summary .docx and save it to *filename*.

    The document uses the shared base style with line numbering switched
    off. A "SUMMARY" cover page is added only when include_cover is set and
    both cover_date and cover_desc are non-empty.
    """
    document = Document()
    _setup_docx_style(document, enable_line_numbering=False)

    # All three values are needed for a meaningful cover page.
    cover_requested = include_cover and cover_date and cover_desc
    if cover_requested:
        _add_cover_page(document, "SUMMARY", cover_date, cover_desc)

    _add_summary_content(document, text)
    document.save(filename)
def create_combined_docx(
    transcript_text: str,
    summary_text: str,
    filename: str,
    transcript_cover_date: str,
    transcript_cover_desc: str,
    summary_cover_date: str,
    summary_cover_desc: str,
):
    """
    Create a combined .docx with:
    1) Transcript cover page (no line numbering), ending in a page break
    2) Summary content (no line numbering)
    3) A new section, which starts the transcript on a fresh page
    4) Transcript content (line numbering enabled)

    summary_cover_date and summary_cover_desc are accepted but not used by
    this function; no separate summary cover page is produced.
    """
    doc = Document()

    # Start with no line numbering (for cover and summary)
    _setup_docx_style(doc, enable_line_numbering=False)

    # 1) Transcript cover page (includes trailing page break)
    _add_cover_page(doc, "TRANSCRIPT", transcript_cover_date, transcript_cover_desc)

    # 2) Summary content (no line numbering)
    _add_summary_content(doc, summary_text)

    # 3) New section for the transcript so line numbering applies only there.
    # add_section() defaults to a new-page section break, which already
    # moves the transcript to its own page. Adding an explicit page break
    # first would leave a blank page in between.
    section_transcript = doc.add_section()

    # Apply same margins
    section_transcript.left_margin = Inches(1.5)
    section_transcript.right_margin = Inches(1.0)
    section_transcript.top_margin = Inches(1.0)
    section_transcript.bottom_margin = Inches(1.0)

    # Enable line numbering in transcript section
    _enable_line_numbering(section_transcript)

    # 4) Transcript content (with line numbering)
    _add_transcript_content(doc, transcript_text)

    doc.save(filename)