526 lines
15 KiB
Python
526 lines
15 KiB
Python
"""
|
|
Email sender module for ScrAIbe.
|
|
|
|
Sends transcription outputs (TXT, JSON, etc.) via SMTP.
|
|
All credentials are configured via environment variables.
|
|
Supports both plain text and HTML email bodies.
|
|
Template placeholders are primarily filled via environment variables.
|
|
"""
|
|
|
|
import base64
|
|
import os
|
|
import re
|
|
import smtplib
|
|
import logging
|
|
from email import encoders
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
from docx import Document
|
|
from docx.shared import Inches, Pt
|
|
from docx.oxml.ns import qn
|
|
from docx.oxml import OxmlElement
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
|
|
# Module-level logger, registered under the fixed name "scraibe.email_sender".
logger = logging.getLogger("scraibe.email_sender")
|
|
|
|
|
|
class EmailError(Exception):
    """Error type raised by this module's email helpers."""
|
|
|
|
|
|
def get_email_config():
    """
    Read email configuration from environment variables.

    Environment variables:
      - EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER,
        EMAIL_SMTP_PASSWORD, EMAIL_FROM_ADDRESS: required.
      - EMAIL_SMTP_USE_TLS: optional. TLS stays enabled unless the value is
        one of "false", "0", "no" or "off" (case-insensitive).

    Returns:
        Dict with the keys smtp_host, smtp_port (int), smtp_user,
        smtp_password, from_address and use_tls (bool).

    Raises:
        EmailError: if a required field is missing or EMAIL_SMTP_PORT is
        not an integer.
    """
    smtp_host = os.getenv("EMAIL_SMTP_HOST")
    smtp_port = os.getenv("EMAIL_SMTP_PORT")
    smtp_user = os.getenv("EMAIL_SMTP_USER")
    smtp_password = os.getenv("EMAIL_SMTP_PASSWORD")
    from_address = os.getenv("EMAIL_FROM_ADDRESS")
    use_tls_str = os.getenv("EMAIL_SMTP_USE_TLS", "true").strip().lower()
    use_tls = use_tls_str not in ("false", "0", "no", "off")

    if not all([smtp_host, smtp_port, smtp_user, smtp_password, from_address]):
        raise EmailError(
            "Email configuration incomplete. "
            "Ensure EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER, "
            "EMAIL_SMTP_PASSWORD, and EMAIL_FROM_ADDRESS are set."
        )

    # A malformed port is a configuration problem, so report it with the
    # module's own error type instead of leaking a bare ValueError.
    try:
        port = int(smtp_port)
    except ValueError as exc:
        raise EmailError(
            f"EMAIL_SMTP_PORT must be an integer, got {smtp_port!r}."
        ) from exc

    return {
        "smtp_host": smtp_host,
        "smtp_port": port,
        "smtp_user": smtp_user,
        "smtp_password": smtp_password,
        "from_address": from_address,
        "use_tls": use_tls,
    }
|
|
|
|
|
|
def _load_css(path: str) -> str:
|
|
"""
|
|
Load CSS file content if it exists.
|
|
"""
|
|
if not path or not os.path.exists(path):
|
|
return ""
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return f.read()
|
|
|
|
|
|
def _email_logo_html() -> str:
|
|
"""
|
|
Return a subtle watermark-style logo for emails.
|
|
|
|
- Priority:
|
|
1) EMAIL_LOGO_URL (direct URL)
|
|
2) EMAIL_LOGO_PATH (local file as base64)
|
|
- Style: small, faint, bottom-right, non-intrusive.
|
|
"""
|
|
logo_url = os.getenv("EMAIL_LOGO_URL")
|
|
src = logo_url
|
|
|
|
if not logo_url:
|
|
logo_path = os.getenv("EMAIL_LOGO_PATH", "/app/src/misc/logo1.png")
|
|
if os.path.exists(logo_path):
|
|
try:
|
|
with open(logo_path, "rb") as f:
|
|
b64 = base64.b64encode(f.read()).decode("utf-8")
|
|
src = f"data:image/png;base64,{b64}"
|
|
except Exception:
|
|
src = None
|
|
|
|
if not src:
|
|
return ""
|
|
|
|
# Watermark: bottom-right, low opacity, compact
|
|
return (
|
|
f'<div style="text-align: right; margin-top: 24px; opacity: 0.15;">'
|
|
f'<img src="{src}" alt="Logo" style="max-width: 90px; height: auto; display: inline-block;" />'
|
|
f'</div>'
|
|
)
|
|
|
|
|
|
def _accent_color() -> str:
|
|
"""
|
|
Accent color for UI and emails.
|
|
Default: #7C6DA0
|
|
"""
|
|
return os.getenv("EMAIL_ACCENT_COLOR", "#7C6DA0")
|
|
|
|
|
|
def build_template_context(**runtime_kwargs: Any) -> Dict[str, Any]:
    """
    Assemble the placeholder context used when rendering email templates.

    Base values come from the environment; any keyword argument passed at
    call time overrides the base value stored under the same key.

    Environment variables:
      - EMAIL_CONTACT_ADDRESS: value for {contact_email}
        (default support@example.com)
      - EMAIL_CSS_PATH: path to mail_style.css (optional; its text is inlined)
      - EMAIL_LOGO_URL: URL for email logo (preferred)
      - EMAIL_LOGO_PATH: fallback local path for email logo
      - EMAIL_ACCENT_COLOR: accent color (default #7C6DA0)
    """
    # Inline the stylesheet text so templates can embed it directly.
    stylesheet_path = os.getenv("EMAIL_CSS_PATH", "/app/src/misc/mail_style.css")
    stylesheet = _load_css(stylesheet_path)

    # Logo markup (URL or local fallback) and accent color.
    logo_markup = _email_logo_html()
    accent_value = _accent_color()

    env_defaults: Dict[str, Any] = {
        "contact_email": os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com"),
        "email_css": stylesheet,
        "email_logo": logo_markup,
        "accent_color": accent_value,
    }

    # Later entries win, so runtime values take precedence over env values.
    return {**env_defaults, **runtime_kwargs}
|
|
|
|
|
|
def load_template(template_name: str, **runtime_kwargs: Any) -> str:
    """
    Load an HTML email template and render its {placeholder} variables.

    The template directory is taken from SCRAIBE_TEMPLATES_DIR and defaults
    to /app/src/misc. Expects files like:
      /app/src/misc/upload_notification_template.html
      /app/src/misc/success_template.html
      /app/src/misc/error_notification_template.html

    Args:
        template_name: File name of the template inside the template dir.
        **runtime_kwargs: Placeholder values that override the env-based ones.

    Returns:
        The rendered template text.

    Raises:
        EmailError: if the template cannot be found or read, references a
        variable that is not in the context, or contains placeholder syntax
        that str.format cannot process.
    """
    base = os.getenv("SCRAIBE_TEMPLATES_DIR", "/app/src/misc")
    path = os.path.join(base, template_name)

    # Open directly rather than checking existence first, so a file that
    # disappears between the check and the read is still reported cleanly.
    try:
        with open(path, "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError as e:
        raise EmailError(f"Email template not found: {path}") from e
    except OSError as e:
        raise EmailError(f"Could not read email template {path}: {e}") from e

    # Build context from env + runtime
    ctx = build_template_context(**runtime_kwargs)

    # Replace {placeholder} style variables
    try:
        return template.format(**ctx)
    except KeyError as e:
        raise EmailError(f"Missing template variable: {e}") from e
    except (IndexError, ValueError) as e:
        # Raised for positional fields such as "{0}" or unbalanced braces;
        # literal braces in a template must be written as "{{" and "}}".
        raise EmailError(f"Invalid placeholder syntax in template {path}: {e}") from e
|
|
|
|
|
|
def _build_attachment_part(file_path: str) -> MIMEBase:
    """Read ``file_path`` and wrap it as a base64 application/octet-stream part."""
    with open(file_path, "rb") as f:
        payload = f.read()
    part = MIMEBase("application", "octet-stream")
    part.set_payload(payload)
    encoders.encode_base64(part)
    part.add_header(
        "Content-Disposition",
        "attachment",
        filename=os.path.basename(file_path),
    )
    return part


def _build_email_message(
    from_address: str,
    to_list: List[str],
    cc_list: List[str],
    subject: str,
    body: str,
    html: Optional[str],
    attachments: Optional[List[str]],
) -> MIMEMultipart:
    """Assemble the MIME message, with address headers on the outermost part."""
    # Plain text first, HTML second, inside a multipart/alternative container.
    alternative = MIMEMultipart("alternative")
    alternative.attach(MIMEText(body, "plain"))
    if html:
        alternative.attach(MIMEText(html, "html"))

    if attachments:
        # Attachments live next to the body inside a multipart/mixed wrapper.
        msg = MIMEMultipart("mixed")
        msg.attach(alternative)
        for file_path in attachments:
            if not os.path.isfile(file_path):
                logger.warning("Attachment file not found, skipping: %s", file_path)
                continue
            try:
                msg.attach(_build_attachment_part(file_path))
            except OSError as e:
                # Best effort: an unreadable attachment must not block the mail.
                logger.warning("Failed to attach file %s: %s", file_path, e)
    else:
        msg = alternative

    # Headers are set on the top-level message. Setting them on the inner
    # alternative part and then wrapping it would leave the outer message
    # without From/To/Subject headers.
    msg["From"] = from_address
    msg["To"] = ", ".join(to_list)
    if cc_list:
        msg["Cc"] = ", ".join(cc_list)
    msg["Subject"] = subject
    return msg


def send_email(
    to: str,
    subject: str,
    body: str,
    html: Optional[str],
    attachments: List[str],
    cc: Optional[str] = None,
) -> bool:
    """
    Send an email with optional HTML body and file attachments.

    Args:
        to: Comma-separated list of recipient email addresses.
        subject: Email subject.
        body: Email body (plain text).
        html: Email body (HTML), or None.
        attachments: List of file paths to attach. Paths that are not files
            or cannot be read are skipped with a warning.
        cc: Comma-separated list of CC email addresses (optional).

    Returns:
        True if sent successfully.

    Raises:
        EmailError: if the configuration is incomplete, no valid recipient
        is given, or sending fails.
    """
    try:
        cfg = get_email_config()
    except EmailError as e:
        logger.error("Email configuration error: %s", e)
        raise

    # Parse recipients; a None/empty value is treated as "no recipients".
    to_list = [addr.strip() for addr in (to or "").split(",") if addr.strip()]
    cc_list = [addr.strip() for addr in cc.split(",") if addr.strip()] if cc else []

    if not to_list:
        raise EmailError("No valid 'To' email addresses provided.")

    msg = _build_email_message(
        cfg["from_address"], to_list, cc_list, subject, body, html, attachments
    )

    # Connect and send. The context manager closes the connection on both
    # success and failure, so a failed login/send does not leak the socket.
    try:
        with smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30) as server:
            server.ehlo()
            if cfg["use_tls"]:
                # NOTE(review): starttls() is called without an explicit SSL
                # context; consider passing ssl.create_default_context() if
                # certificate verification is required - confirm against the
                # deployment's SMTP server.
                server.starttls()
                server.ehlo()

            server.login(cfg["smtp_user"], cfg["smtp_password"])
            server.sendmail(
                cfg["from_address"],
                to_list + cc_list,
                msg.as_string(),
            )
    except Exception as e:
        logger.error("Failed to send email: %s", e)
        raise EmailError(f"Failed to send email: {e}") from e

    logger.info(
        "Email sent to %s (CC: %s)",
        to_list,
        cc_list or "None",
    )
    return True
|
|
|
|
|
|
def _append_docx_field(run, instruction):
    """Append a field (begin / instruction / end) inside ``run``'s w:r element."""
    begin = OxmlElement("w:fldChar")
    begin.set(qn("w:fldCharType"), "begin")

    instr = OxmlElement("w:instrText")
    instr.set(qn("xml:space"), "preserve")
    instr.text = instruction

    end = OxmlElement("w:fldChar")
    end.set(qn("w:fldCharType"), "end")

    # Field characters and instruction text are run content, so they are
    # appended as children of the run rather than inserted as its siblings.
    for element in (begin, instr, end):
        run._r.append(element)


def _setup_docx_style(doc):
    """
    Apply the shared page layout to ``doc``.

    Sets the margins of the first section (1.5" left, 1" elsewhere), turns on
    continuous line numbering counting every line, makes the "Normal" style
    12pt Courier, and writes a right-aligned "PAGE of NUMPAGES" footer in
    10pt Courier.
    """
    section = doc.sections[0]
    section.left_margin = Inches(1.5)
    section.right_margin = Inches(1.0)
    section.top_margin = Inches(1.0)
    section.bottom_margin = Inches(1.0)

    sectPr = section._sectPr
    lnNumType = sectPr.find(qn("w:lnNumType"))
    if lnNumType is None:
        lnNumType = OxmlElement("w:lnNumType")
        # sectPr children follow a fixed schema order; lnNumType belongs
        # right after the last of these elements that is present.
        predecessors = {
            qn(tag)
            for tag in (
                "w:headerReference",
                "w:footerReference",
                "w:footnotePr",
                "w:endnotePr",
                "w:type",
                "w:pgSz",
                "w:pgMar",
                "w:paperSrc",
                "w:pgBorders",
            )
        }
        anchor = None
        for child in sectPr:
            if child.tag in predecessors:
                anchor = child
        if anchor is None:
            sectPr.insert(0, lnNumType)
        else:
            anchor.addnext(lnNumType)
    # "continuous" is a value of w:restart; w:start is the numeric starting
    # line number and does not accept it.
    lnNumType.set(qn("w:restart"), "continuous")
    lnNumType.set(qn("w:countBy"), "1")

    style = doc.styles["Normal"]
    font = style.font
    font.name = "Courier"
    font.size = Pt(12)

    # Add "Page X of Y" footer, right-aligned
    footer = section.footer
    footer.is_linked_to_previous = False
    p = footer.paragraphs[0]
    p.alignment = WD_ALIGN_PARAGRAPH.RIGHT

    # Field: PAGE
    run = p.add_run()
    run.font.name = "Courier"
    run.font.size = Pt(10)
    _append_docx_field(run, " PAGE ")

    # Static text: " of "
    run_of = p.add_run(" of ")
    run_of.font.name = "Courier"
    run_of.font.size = Pt(10)

    # Field: NUMPAGES
    run2 = p.add_run()
    run2.font.name = "Courier"
    run2.font.size = Pt(10)
    _append_docx_field(run2, " NUMPAGES ")
|
|
|
|
|
|
def _add_cover_page(doc, doc_type, date, description):
    """
    Append a cover page to ``doc`` and finish it with a page break.

    Layout: ``doc_type`` in bold, ``date`` on the next paragraph, three
    empty paragraphs, then ``description``. All text is 12pt Courier.
    """

    def courier_paragraph(content, bold=False):
        # One paragraph holding a single 12pt Courier run.
        piece = doc.add_paragraph().add_run(content)
        if bold:
            piece.bold = True
        piece.font.name = "Courier"
        piece.font.size = Pt(12)

    courier_paragraph(doc_type, bold=True)
    courier_paragraph(date)

    # Vertical gap between the date and the description.
    spacer_count = 3
    while spacer_count:
        doc.add_paragraph()
        spacer_count -= 1

    courier_paragraph(description)
    doc.add_page_break()
|
|
|
|
|
|
def _add_transcript_content(doc, text):
    """
    Append transcript lines to ``doc``, one paragraph per non-empty line.

    Lines shaped like "[MM:SS] Speaker: words" (optionally "[HH:MM:SS]") are
    rendered indented, with an underlined "[timestamp] SPEAKER:" label
    followed by the spoken text. Every other line is added as-is. All runs
    are 12pt Courier.
    """
    utterance = re.compile(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)")

    def styled_run(paragraph, run_text, underlined):
        # Non-bold 12pt Courier run with explicit underline on/off.
        piece = paragraph.add_run(run_text)
        piece.bold = False
        piece.underline = underlined
        piece.font.name = "Courier"
        piece.font.size = Pt(12)

    for raw in text.strip().split("\n"):
        entry = raw.strip()
        if not entry:
            continue

        parsed = utterance.match(entry)
        if parsed is None:
            # Free-form line: keep the text, only fix the font.
            plain = doc.add_paragraph().add_run(entry)
            plain.font.name = "Courier"
            plain.font.size = Pt(12)
            continue

        stamp, who, said = parsed.groups()
        para = doc.add_paragraph()
        para.paragraph_format.left_indent = Inches(0.25)

        styled_run(para, f"[{stamp}] {who.upper()}:", True)
        styled_run(para, " ", False)
        styled_run(para, said.strip(), False)
|
|
|
|
|
|
def _add_summary_content(doc, text):
    """
    Append summary text to ``doc``, one paragraph per non-empty line.

    Lines starting with one to four "#" characters followed by whitespace
    are treated as headings and rendered in 12pt Courier, styled by heading
    level (the number of "#" characters):
      - level 1: bold
      - level 2: italic
      - level 3: underlined
      - level 4: italic and underlined
    All other lines are added as plain paragraphs. Every paragraph gets 4pt
    of spacing after it.
    """
    heading_pattern = re.compile(r"^(#{1,4})\s+(.*)")

    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue

        m = heading_pattern.match(stripped)
        if not m:
            p = doc.add_paragraph(stripped)
            p.paragraph_format.space_after = Pt(4)
            continue

        # Style by the heading's own level, not by how many headings came
        # before it, so headings of the same level always look the same.
        level = len(m.group(1))
        content = m.group(2).strip()

        p = doc.add_paragraph()
        p.paragraph_format.space_after = Pt(4)

        run = p.add_run(content)
        run.font.name = "Courier"
        run.font.size = Pt(12)

        if level == 1:
            run.bold = True
        elif level == 2:
            run.italic = True
        elif level == 3:
            run.underline = True
        else:
            run.italic = True
            run.underline = True
|
|
|
|
|
|
def create_transcript_docx(
    text: str,
    filename: str,
    include_cover: bool = False,
    cover_date: str = "",
    cover_desc: str = "",
):
    """
    Write ``text`` as a transcript .docx file at ``filename``.

    The document uses the shared page style and the transcript line
    formatting. A "TRANSCRIPT" cover page is added only when
    ``include_cover`` is set and both ``cover_date`` and ``cover_desc`` are
    non-empty.
    """
    document = Document()
    _setup_docx_style(document)

    wants_cover = include_cover and cover_date and cover_desc
    if wants_cover:
        _add_cover_page(document, "TRANSCRIPT", cover_date, cover_desc)

    _add_transcript_content(document, text)
    document.save(filename)
|
|
|
|
|
|
def create_summary_docx(
    text: str,
    filename: str,
    include_cover: bool = False,
    cover_date: str = "",
    cover_desc: str = "",
):
    """
    Write ``text`` as a summary .docx file at ``filename``.

    The document uses the shared page style and the summary heading
    formatting. A "SUMMARY" cover page is added only when ``include_cover``
    is set and both ``cover_date`` and ``cover_desc`` are non-empty.
    """
    document = Document()
    _setup_docx_style(document)

    wants_cover = include_cover and cover_date and cover_desc
    if wants_cover:
        _add_cover_page(document, "SUMMARY", cover_date, cover_desc)

    _add_summary_content(document, text)
    document.save(filename)
|
|
|
|
|
|
def create_combined_docx(
    transcript_text: str,
    summary_text: str,
    filename: str,
    transcript_cover_date: str,
    transcript_cover_desc: str,
    summary_cover_date: str,
    summary_cover_desc: str,
):
    """
    Write a single .docx at ``filename`` containing, in order:
      1) "TRANSCRIPT" cover page (which ends with its own page break)
      2) Summary content
      3) Page break
      4) Transcript content

    ``summary_cover_date`` and ``summary_cover_desc`` are accepted but are
    not used by this function.
    """
    combined = Document()
    _setup_docx_style(combined)

    # Cover page; the helper appends the trailing page break itself.
    _add_cover_page(
        combined,
        "TRANSCRIPT",
        transcript_cover_date,
        transcript_cover_desc,
    )

    # Summary first, then start the transcript on a fresh page.
    _add_summary_content(combined, summary_text)
    combined.add_page_break()
    _add_transcript_content(combined, transcript_text)

    combined.save(filename)
|