46fbcf80af
- Use EMAIL_SUBJECT_SUCCESS env var for success emails - Use EMAIL_SUBJECT_ERROR env var for error emails - Provide safe defaults if env vars are missing or blank - Add final guard in send_email() to prevent blank subjects
913 lines
27 KiB
Python
913 lines
27 KiB
Python
"""
|
|
Email sender module for ScrAIbe.
|
|
|
|
Sends transcription outputs (TXT, JSON, etc.) via SMTP.
|
|
All credentials are configured via environment variables.
|
|
Supports both plain text and HTML email bodies.
|
|
Template placeholders are primarily filled via environment variables.
|
|
"""
|
|
|
|
import base64
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import smtplib
|
|
import subprocess
|
|
import tempfile
|
|
from email import encoders
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from docx import Document
|
|
from docx.oxml import OxmlElement
|
|
from docx.oxml.ns import qn
|
|
from docx.shared import Inches, Pt
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH
|
|
|
|
# Module-level logger shared by the functions in this module.
logger = logging.getLogger("scraibe.email_sender")
|
|
|
|
|
|
class EmailError(Exception):
    """Raised when email configuration, templating, or delivery fails."""
|
|
|
|
|
|
def get_email_config() -> Dict[str, Any]:
    """
    Read the SMTP settings from environment variables.

    Variables read:
    - EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER,
      EMAIL_SMTP_PASSWORD, EMAIL_FROM_ADDRESS (all required)
    - EMAIL_SMTP_USE_TLS (optional; TLS stays enabled unless the value is
      "false", "0" or "no", case-insensitive)

    Returns:
        Dict with keys smtp_host, smtp_port (int), smtp_user,
        smtp_password, from_address and use_tls (bool).

    Raises:
        EmailError: if a required variable is missing or empty, or if
            EMAIL_SMTP_PORT is not an integer.
    """
    smtp_host = os.getenv("EMAIL_SMTP_HOST")
    smtp_port = os.getenv("EMAIL_SMTP_PORT")
    smtp_user = os.getenv("EMAIL_SMTP_USER")
    smtp_password = os.getenv("EMAIL_SMTP_PASSWORD")
    from_address = os.getenv("EMAIL_FROM_ADDRESS")
    use_tls_str = os.getenv("EMAIL_SMTP_USE_TLS", "true").strip().lower()
    use_tls = use_tls_str not in ("false", "0", "no")

    if not all([smtp_host, smtp_port, smtp_user, smtp_password, from_address]):
        raise EmailError(
            "Email configuration incomplete. "
            "Ensure EMAIL_SMTP_HOST, EMAIL_SMTP_PORT, EMAIL_SMTP_USER, "
            "EMAIL_SMTP_PASSWORD, and EMAIL_FROM_ADDRESS are set."
        )

    # Callers only catch EmailError, so a malformed port must not surface
    # as a bare ValueError.
    try:
        port = int(smtp_port)
    except ValueError as exc:
        raise EmailError(
            f"EMAIL_SMTP_PORT must be an integer, got {smtp_port!r}."
        ) from exc

    return {
        "smtp_host": smtp_host,
        "smtp_port": port,
        "smtp_user": smtp_user,
        "smtp_password": smtp_password,
        "from_address": from_address,
        "use_tls": use_tls,
    }
|
|
|
|
|
|
def _load_css(path: str) -> str:
|
|
"""
|
|
Load CSS file content if it exists.
|
|
"""
|
|
if not path or not os.path.exists(path):
|
|
return ""
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return f.read()
|
|
|
|
|
|
def _email_logo_html() -> str:
|
|
"""
|
|
Return a subtle watermark-style logo for emails.
|
|
|
|
- Priority:
|
|
1) EMAIL_LOGO_URL (direct URL)
|
|
2) EMAIL_LOGO_PATH (local file as base64)
|
|
- Style: small, faint, bottom-right, non-intrusive.
|
|
"""
|
|
logo_url = os.getenv("EMAIL_LOGO_URL")
|
|
src = logo_url
|
|
|
|
if not logo_url:
|
|
logo_path = os.getenv("EMAIL_LOGO_PATH", "/app/src/misc/logo1.png")
|
|
if os.path.exists(logo_path):
|
|
try:
|
|
with open(logo_path, "rb") as f:
|
|
b64 = base64.b64encode(f.read()).decode("utf-8")
|
|
src = f"data:image/png;base64,{b64}"
|
|
except Exception:
|
|
src = None
|
|
|
|
if not src:
|
|
return ""
|
|
|
|
# Watermark: bottom-right, low opacity, compact
|
|
return (
|
|
f'<div style="text-align: right; margin-top: 24px; opacity: 0.15;">'
|
|
f'<img src="{src}" alt="Logo" style="max-width: 90px; height: auto; display: inline-block;" />'
|
|
f'</div>'
|
|
)
|
|
|
|
|
|
def _accent_color() -> str:
|
|
"""
|
|
Accent color for UI and emails.
|
|
Default: #7C6DA0
|
|
"""
|
|
return os.getenv("EMAIL_ACCENT_COLOR", "#7C6DA0")
|
|
|
|
|
|
def build_template_context(**runtime_kwargs: Any) -> Dict[str, Any]:
    """
    Assemble the placeholder values used when rendering an email template.

    Environment-derived defaults are computed first; any keyword argument
    passed by the caller then replaces the default stored under the same key.

    Environment variables consulted:
      - EMAIL_CONTACT_ADDRESS: value for {contact_email}
        (default support@example.com)
      - EMAIL_CSS_PATH: stylesheet to inline as {email_css}
        (default /app/src/misc/mail_style.css)
      - EMAIL_LOGO_URL / EMAIL_LOGO_PATH: logo source for {email_logo}
      - EMAIL_ACCENT_COLOR: value for {accent_color}
    """
    stylesheet_path = os.getenv("EMAIL_CSS_PATH", "/app/src/misc/mail_style.css")

    context: Dict[str, Any] = dict(
        contact_email=os.getenv("EMAIL_CONTACT_ADDRESS", "support@example.com"),
        email_css=_load_css(stylesheet_path),
        email_logo=_email_logo_html(),
        accent_color=_accent_color(),
    )

    # Caller-supplied values take precedence over the environment defaults.
    context.update(runtime_kwargs)
    return context
|
|
|
|
|
|
def load_template(template_name: str, **runtime_kwargs: Any) -> str:
    """
    Load an HTML email template and fill in its {placeholder} fields.

    The template is read from the directory named by SCRAIBE_TEMPLATES_DIR
    (default /app/src/misc), e.g.:
      /app/src/misc/upload_notification_template.html
      /app/src/misc/success_template.html
      /app/src/misc/error_notification_template.html

    Args:
        template_name: File name of the template inside the templates dir.
        **runtime_kwargs: Placeholder values that override the
            environment-derived defaults.

    Returns:
        The rendered HTML.

    Raises:
        EmailError: if the template is missing or unreadable, references a
            placeholder that has no value, or contains braces that
            str.format cannot parse (for example unescaped CSS braces).
    """
    base = os.getenv("SCRAIBE_TEMPLATES_DIR", "/app/src/misc")
    path = os.path.join(base, template_name)

    try:
        with open(path, "r", encoding="utf-8") as f:
            template = f.read()
    except FileNotFoundError as e:
        raise EmailError(f"Email template not found: {path}") from e
    except (OSError, UnicodeDecodeError) as e:
        raise EmailError(f"Could not read email template {path}: {e}") from e

    # Build context from env + runtime
    ctx = build_template_context(**runtime_kwargs)

    # str.format raises KeyError for unknown names, but IndexError/ValueError
    # for "{}" or stray braces. Callers fall back to plain text only on
    # EmailError, so every rendering failure is reported as one.
    try:
        return template.format(**ctx)
    except KeyError as e:
        raise EmailError(f"Missing template variable: {e}") from e
    except (IndexError, ValueError) as e:
        raise EmailError(f"Malformed placeholder in template {path}: {e}") from e
|
|
|
|
|
|
def _build_email_message(
    from_address: str,
    to_list: List[str],
    cc_list: List[str],
    subject: str,
    body: str,
    html: Optional[str],
    attachments: List[str],
) -> MIMEMultipart:
    """
    Assemble the MIME message for send_email().

    The plain-text (and optional HTML) bodies form a multipart/alternative
    part. When attachments are requested, that part is nested inside a
    multipart/mixed root. Addressing headers are always set on the root:
    headers placed on a nested part are not the message's own headers, so
    the email would otherwise go out without Subject/From/To.
    """
    alternative = MIMEMultipart("alternative")
    alternative.attach(MIMEText(body, "plain"))
    if html:
        alternative.attach(MIMEText(html, "html"))

    if attachments:
        root = MIMEMultipart("mixed")
        root.attach(alternative)
    else:
        root = alternative

    root["From"] = from_address
    root["To"] = ", ".join(to_list)
    if cc_list:
        root["Cc"] = ", ".join(cc_list)
    root["Subject"] = subject

    for file_path in attachments:
        if not os.path.isfile(file_path):
            logger.warning("Attachment file not found, skipping: %s", file_path)
            continue

        # Attachments are best-effort: an unreadable file is skipped.
        try:
            with open(file_path, "rb") as f:
                payload = f.read()
        except OSError as e:
            logger.warning("Failed to attach file %s: %s", file_path, e)
            continue

        part = MIMEBase("application", "octet-stream")
        part.set_payload(payload)
        encoders.encode_base64(part)
        part.add_header(
            "Content-Disposition",
            "attachment",
            filename=os.path.basename(file_path),
        )
        root.attach(part)

    return root


def send_email(
    to: str,
    subject: str,
    body: str,
    html: Optional[str],
    attachments: List[str],
    cc: Optional[str] = None,
) -> bool:
    """
    Send an email with optional HTML body and file attachments.

    Args:
        to: Comma-separated list of recipient email addresses.
        subject: Email subject; a blank value is replaced by a default.
        body: Email body (plain text).
        html: Email body (HTML), or None.
        attachments: List of file paths to attach; missing or unreadable
            files are skipped with a warning.
        cc: Comma-separated list of CC email addresses (optional).

    Returns:
        True if sent successfully.

    Raises:
        EmailError: if the configuration is incomplete, no recipient is
            given, or sending fails.
    """
    try:
        cfg = get_email_config()
    except EmailError as e:
        logger.error("Email configuration error: %s", e)
        raise

    # Parse recipients
    to_list = [addr.strip() for addr in to.split(",") if addr.strip()]
    cc_list = [addr.strip() for addr in cc.split(",") if addr.strip()] if cc else []

    if not to_list:
        raise EmailError("No valid 'To' email addresses provided.")

    # Ensure subject is never blank
    if not subject or not subject.strip():
        logger.warning("Subject was blank or missing; using default subject.")
        subject = "ScrAIbe: Your transcript is ready"

    # Collapse line breaks so a multi-line value cannot add extra headers.
    subject = re.sub(r"[\r\n]+", " ", subject).strip()

    msg = _build_email_message(
        cfg["from_address"],
        to_list,
        cc_list,
        subject,
        body,
        html,
        attachments or [],
    )

    # Connect and send. The context manager closes the connection even when
    # login or delivery fails.
    try:
        with smtplib.SMTP(cfg["smtp_host"], cfg["smtp_port"], timeout=30) as server:
            server.ehlo()
            if cfg["use_tls"]:
                server.starttls()
                server.ehlo()

            server.login(cfg["smtp_user"], cfg["smtp_password"])
            server.sendmail(
                cfg["from_address"],
                to_list + cc_list,
                msg.as_string(),
            )
    except Exception as e:
        # Boundary handler: callers rely on EmailError for any send failure.
        logger.error("Failed to send email: %s", e)
        raise EmailError(f"Failed to send email: {e}") from e

    logger.info(
        "Email sent to %s (CC: %s) with subject: %s",
        to_list,
        cc_list or "None",
        subject,
    )
    return True
|
|
|
|
|
|
# ------------ DOCX helpers ------------
|
|
|
|
def _configure_base_font(doc):
    """
    Make 12pt Courier the font of the document's "Normal" style.
    """
    base_font = doc.styles["Normal"].font
    base_font.name = "Courier"
    base_font.size = Pt(12)
|
|
|
|
|
|
def _configure_section_margins(doc, margin=1.0):
    """
    Give every section of *doc* the same margin, in inches, on all four
    sides (1 inch unless *margin* says otherwise).
    """
    for section in doc.sections:
        for side in ("left", "right", "top", "bottom"):
            setattr(section, f"{side}_margin", Inches(margin))
|
|
|
|
|
|
def _enable_line_numbering(section, start_at=1, count_by=1, restart=True):
    """
    Enable line numbering for a specific section.

    Args:
        section: The docx section whose <w:sectPr> receives the setting.
        start_at: Value written to the w:start attribute.
        count_by: Value written to the w:countBy attribute.
        restart: True restarts numbering on every page ("newPage");
            False keeps it running ("continuous").

    Calling this again on the same section replaces the earlier setting.
    """
    sectPr = section._sectPr

    # Drop any earlier setting so repeated calls do not stack duplicates.
    for existing in sectPr.findall(qn("w:lnNumType")):
        sectPr.remove(existing)

    # Create the line numbering element: <w:lnNumType>
    lnNumType = OxmlElement("w:lnNumType")
    lnNumType.set(qn("w:start"), str(start_at))
    lnNumType.set(qn("w:countBy"), str(count_by))

    # Valid w:restart values are newPage, newSection and continuous.
    restart_mode = "newPage" if restart else "continuous"
    lnNumType.set(qn("w:restart"), restart_mode)

    # The children of <w:sectPr> form an ordered sequence; <w:lnNumType>
    # has to come before every element listed here.
    later_tags = {
        qn(tag)
        for tag in (
            "w:pgNumType",
            "w:cols",
            "w:formProt",
            "w:vAlign",
            "w:noEndnote",
            "w:titlePg",
            "w:textDirection",
            "w:bidi",
            "w:rtlGutter",
            "w:docGrid",
            "w:printerSettings",
            "w:sectPrChange",
        )
    }
    for child in sectPr:
        if child.tag in later_tags:
            child.addprevious(lnNumType)
            break
    else:
        sectPr.append(lnNumType)
|
|
|
|
|
|
def _add_cover_page(doc, doc_type, date, description):
    """
    Append cover content to *doc* inside a one-cell table.

    The cell holds, in order:
      - the cell's initial paragraph (centered and emptied)
      - the document type, bold and centered
      - the date (e.g. "June 14, 2026"), centered
      - three empty paragraphs as spacing
      - the one-sentence description, centered

    All text runs use 12pt Courier. Only horizontal centering is applied;
    no vertical alignment is set here.
    """
    table = doc.add_table(rows=1, cols=1)
    table.autofit = False
    cell = table.cell(0, 0)

    # Approximate full text width of the page
    cell.width = Inches(6.5)

    def add_centered_line(line_text, bold=False):
        """Add one centered 12pt Courier paragraph to the cover cell."""
        para = cell.add_paragraph()
        para.alignment = WD_ALIGN_PARAGRAPH.CENTER
        run = para.add_run(line_text)
        if bold:
            run.bold = True
        run.font.name = "Courier"
        run.font.size = Pt(12)

    # Center whatever the cell starts with, then empty its first paragraph
    for existing in cell.paragraphs:
        existing.alignment = WD_ALIGN_PARAGRAPH.CENTER
    cell.paragraphs[0].clear()

    add_centered_line(doc_type, bold=True)
    add_centered_line(date)

    # Spacer paragraphs between the date and the description
    for _ in range(3):
        cell.add_paragraph()

    add_centered_line(description)
|
|
|
|
|
|
def _add_transcript_content(doc, text):
    """
    Write the transcript into *doc*, one paragraph per non-blank line.

    A line shaped like "[mm:ss] Speaker: words" (or "[hh:mm:ss] ...") is
    indented a quarter inch and rendered as an underlined
    "[timestamp] SPEAKER:" label, a space, then the spoken text. Any other
    line is written unchanged. Every run is 12pt Courier.
    """
    entry_pattern = re.compile(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)")

    for raw_line in text.strip().split("\n"):
        entry = raw_line.strip()
        if not entry:
            continue

        paragraph = doc.add_paragraph()
        parsed = entry_pattern.match(entry)

        if parsed is None:
            # Not a "[time] speaker: text" line: keep it verbatim
            plain = paragraph.add_run(entry)
            plain.font.name = "Courier"
            plain.font.size = Pt(12)
            continue

        timestamp, speaker, spoken = parsed.groups()
        paragraph.paragraph_format.left_indent = Inches(0.25)

        # (text, underlined) for the label, the separator and the speech
        segments = (
            (f"[{timestamp}] {speaker.upper()}:", True),
            (" ", False),
            (spoken.strip(), False),
        )
        for segment_text, underlined in segments:
            run = paragraph.add_run(segment_text)
            run.bold = False
            run.underline = underlined
            run.font.name = "Courier"
            run.font.size = Pt(12)
|
|
|
|
|
|
def _add_summary_content(doc, text):
    """
    Add summary content to *doc*, styling markdown-style headings.

    Blank lines are skipped. A line starting with one to four "#"
    characters followed by whitespace is a heading; its marker is removed
    and the text is styled by heading level:
      - "#"    -> bold
      - "##"   -> italic
      - "###"  -> underline
      - "####" -> italic + underline
    Heading text is 12pt Courier. Every other line becomes an ordinary
    paragraph. All paragraphs get 4pt of space after them.
    """
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            continue

        # Detect markdown-style headings: #, ##, ###, #### at start of line
        m = re.match(r"^(#{1,4})\s+(.*)", stripped)
        if not m:
            # Normal text line
            p = doc.add_paragraph(stripped)
            p.paragraph_format.space_after = Pt(4)
            continue

        # The style follows the number of "#" characters, not how many
        # headings came before, so equal levels always look the same.
        level = len(m.group(1))
        content = m.group(2).strip()

        p = doc.add_paragraph()
        p.paragraph_format.space_after = Pt(4)

        run = p.add_run(content)
        run.font.name = "Courier"
        run.font.size = Pt(12)

        if level == 1:
            run.bold = True
        elif level == 2:
            run.italic = True
        elif level == 3:
            run.underline = True
        else:
            run.italic = True
            run.underline = True
|
|
|
|
|
|
# ------------ PDF helpers ------------
|
|
|
|
def _docx_to_pdf(docx_path: str, output_dir: str) -> Optional[str]:
    """
    Convert a .docx file to PDF using LibreOffice in headless mode.

    Args:
        docx_path: Path of the .docx file to convert.
        output_dir: Directory LibreOffice writes the PDF into.

    Returns:
        The path of the generated PDF (same base name as the .docx), or
        None if LibreOffice could not be run, timed out after 60 seconds,
        or the expected PDF is not present afterwards.
    """
    # LibreOffice creates a PDF with the same base name
    base = os.path.splitext(os.path.basename(docx_path))[0]
    pdf_path = os.path.join(output_dir, f"{base}.pdf")

    command = [
        "libreoffice",
        "--headless",
        "--convert-to", "pdf",
        "--outdir", output_dir,
        docx_path,
    ]

    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=60,
        )
    except (OSError, subprocess.SubprocessError) as e:
        # Covers a missing binary (OSError) and the timeout (SubprocessError).
        logger.error("Error converting DOCX to PDF: %s", e)
        return None

    if result.returncode != 0:
        # Only logged: the presence of the PDF below decides the outcome.
        logger.warning("LibreOffice conversion failed: %s", result.stderr)

    if os.path.exists(pdf_path):
        return pdf_path

    logger.warning("Converted PDF not found at: %s", pdf_path)
    return None
|
|
|
|
|
|
def _merge_pdfs(input_pdfs: List[Optional[str]], output_pdf: str) -> bool:
    """
    Merge PDF files, in order, into a single PDF using PyPDF2.

    Entries that are None, empty, or do not exist on disk are skipped.

    Returns:
        True if at least one input was merged and written to *output_pdf*;
        False if no input was usable or merging failed.
    """
    # Filter before touching os.path.exists with None, which raises TypeError.
    available = [pdf for pdf in input_pdfs if pdf and os.path.exists(pdf)]
    if not available:
        logger.warning("No input PDFs available to merge into: %s", output_pdf)
        return False

    try:
        from PyPDF2 import PdfMerger

        merger = PdfMerger()
        try:
            for pdf in available:
                merger.append(pdf)
            merger.write(output_pdf)
        finally:
            # Release the merger's file handles even when append/write fails.
            merger.close()
        return True

    except Exception as e:
        logger.error("Error merging PDFs: %s", e)
        return False
|
|
|
|
|
|
def _add_page_numbers_to_pdf(input_pdf: str, output_pdf: str) -> bool:
    """
    Write a copy of *input_pdf* to *output_pdf* with "Page X of Y" stamped
    on every page.

    The label is drawn with reportlab onto an overlay page of the same size
    as the source page (1 inch from the right and bottom edges), and the
    overlay is merged onto the source page with PyPDF2, so the original
    content is preserved.

    Returns:
        True on success; False if the input has no pages or any step fails.
    """
    try:
        import io

        from PyPDF2 import PdfReader, PdfWriter
        from reportlab.pdfbase import pdfmetrics
        from reportlab.pdfbase.ttfonts import TTFont
        from reportlab.pdfgen import canvas

        # Prefer DejaVu Sans Mono registered under the name "Courier"; if
        # the font file is unavailable, reportlab's built-in Courier is used.
        font_name = "Courier"
        try:
            pdfmetrics.registerFont(
                TTFont("Courier", "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf")
            )
        except Exception as font_error:
            logger.debug("Using built-in Courier for page numbers: %s", font_error)

        reader = PdfReader(input_pdf)
        num_pages = len(reader.pages)
        if num_pages == 0:
            logger.warning("PDF has no pages to number: %s", input_pdf)
            return False

        # Draw one overlay page per source page into an in-memory PDF.
        overlay_buffer = io.BytesIO()
        c = canvas.Canvas(overlay_buffer)
        for page_num, page in enumerate(reader.pages):
            page_width = float(page.mediabox.width)
            page_height = float(page.mediabox.height)
            c.setPageSize((page_width, page_height))

            c.setFont(font_name, 10)
            page_text = f"Page {page_num + 1} of {num_pages}"
            text_width = c.stringWidth(page_text, font_name, 10)
            x = page_width - 1 * 72 - text_width  # 1 inch from right
            y = 1 * 72  # 1 inch from bottom
            c.drawString(x, y, page_text)
            c.showPage()
        c.save()

        # Stamp each overlay onto its source page and write the result.
        overlay_buffer.seek(0)
        overlay = PdfReader(overlay_buffer)
        writer = PdfWriter()
        for page, stamp in zip(reader.pages, overlay.pages):
            page.merge_page(stamp)
            writer.add_page(page)

        with open(output_pdf, "wb") as f:
            writer.write(f)

        return True

    except Exception as e:
        logger.error("Error adding page numbers to PDF: %s", e)
        return False
|
|
|
|
|
|
# ------------ Public DOCX/PDF functions ------------
|
|
|
|
def create_transcript_docx(
    text: str,
    filename: str,
    include_cover: bool = False,
    cover_date: str = "",
    cover_desc: str = "",
):
    """
    Create a .docx transcript and save it to *filename*.

    The document has:
    - 1" margins on all sides
    - 12pt Courier
    - Line numbering (with _enable_line_numbering's defaults) on the
      transcript content only
    - An optional cover page with type, date, and description; it is added
      only when include_cover is true and both cover_date and cover_desc
      are non-empty.
    """
    doc = Document()
    _configure_base_font(doc)
    _configure_section_margins(doc)

    # 1) Optional cover page (no line numbering)
    if include_cover and cover_date and cover_desc:
        _add_cover_page(doc, "TRANSCRIPT", cover_date, cover_desc)
        # Line numbering is a per-section setting, so the cover must end
        # with a section break (new page by default) rather than a plain
        # page break; otherwise it would share the numbered section.
        doc.add_section()

    # 2) Enable line numbering for the section that holds the transcript
    _enable_line_numbering(doc.sections[-1])

    # 3) Transcript content (with line numbering)
    _add_transcript_content(doc, text)

    doc.save(filename)
|
|
|
|
|
|
def create_summary_docx(
    text: str,
    filename: str,
    include_cover: bool = False,
    cover_date: str = "",
    cover_desc: str = "",
):
    """
    Build the summary .docx and save it to *filename*.

    The document uses the shared base font and margins and has no line
    numbering. A "SUMMARY" cover page followed by a page break is added
    only when include_cover is true and both cover_date and cover_desc are
    non-empty.
    """
    document = Document()
    for configure in (_configure_base_font, _configure_section_margins):
        configure(document)

    # Cover page first, when requested and fully specified
    wants_cover = include_cover and cover_date and cover_desc
    if wants_cover:
        _add_cover_page(document, "SUMMARY", cover_date, cover_desc)
        document.add_page_break()

    # Summary body (no line numbering)
    _add_summary_content(document, text)

    document.save(filename)
|
|
|
|
|
|
def create_combined_docx(
    transcript_text: str,
    summary_text: str,
    filename: str,
    transcript_cover_date: str,
    transcript_cover_desc: str,
    summary_cover_date: str,
    summary_cover_desc: str,
):
    """
    Create a combined .docx and save it to *filename*, containing:
      1) Transcript cover page (no line numbering)
      2) Page break
      3) Summary content (no line numbering)
      4) Section break starting a new page
      5) Transcript content (line numbering enabled only here)

    summary_cover_date and summary_cover_desc are accepted for signature
    compatibility but are not used by this function.
    """
    doc = Document()
    _configure_base_font(doc)
    _configure_section_margins(doc)

    # 1) Transcript cover page (no line numbering)
    _add_cover_page(doc, "TRANSCRIPT", transcript_cover_date, transcript_cover_desc)
    doc.add_page_break()

    # 3) Summary content (no line numbering)
    _add_summary_content(doc, summary_text)

    # 4) Line numbering is a per-section setting. A section break (new page
    # by default) gives the transcript its own section; a plain page break
    # would leave the cover and summary in the numbered section.
    doc.add_section()
    _enable_line_numbering(doc.sections[-1])

    # 5) Transcript content (with line numbering)
    _add_transcript_content(doc, transcript_text)

    doc.save(filename)
|
|
|
|
|
|
def generate_pdf_documents(
    transcript_text: str,
    summary_text: str,
    output_dir: str,
    transcript_cover_date: str,
    transcript_cover_desc: str,
    summary_cover_date: str,
    summary_cover_desc: str,
) -> Dict[str, str]:
    """
    Generate PDF documents for transcript, summary, and combined.

    Each output is built from intermediate .docx files converted with
    LibreOffice, merged, and stamped with page numbers. If stamping fails,
    the merged PDF is delivered without page numbers.

    Returns a dict holding only the outputs that were actually produced:
      - transcript_pdf
      - summary_pdf (if summary_text provided)
      - combined_pdf (if summary_text provided)
    """
    import shutil

    os.makedirs(output_dir, exist_ok=True)

    result: Dict[str, str] = {}

    def new_document():
        """Return an empty Document with the shared font and margins."""
        doc = Document()
        _configure_base_font(doc)
        _configure_section_margins(doc)
        return doc

    # Create temporary directory for intermediate files
    with tempfile.TemporaryDirectory() as tmpdir:

        def convert(doc, name):
            """Save *doc* as <name>.docx in tmpdir; return its PDF path or None."""
            docx_path = os.path.join(tmpdir, f"{name}.docx")
            doc.save(docx_path)
            return _docx_to_pdf(docx_path, tmpdir)

        def finalize(key, parts, name):
            """Merge *parts*, add page numbers, and record the output under *key*."""
            if not all(parts):
                logger.warning("Skipping %s.pdf: a source PDF could not be created.", name)
                return
            merged_path = os.path.join(tmpdir, f"{name}_MERGED.pdf")
            output_path = os.path.join(output_dir, f"{name}.pdf")
            if not _merge_pdfs(parts, merged_path):
                return
            if not _add_page_numbers_to_pdf(merged_path, output_path):
                # Deliver the un-numbered PDF rather than a missing file.
                logger.warning("Page numbering failed; writing %s.pdf without page numbers.", name)
                try:
                    shutil.copyfile(merged_path, output_path)
                except OSError as e:
                    logger.error("Could not write %s: %s", output_path, e)
                    return
            result[key] = output_path

        # Transcript cover
        doc = new_document()
        _add_cover_page(doc, "TRANSCRIPT", transcript_cover_date, transcript_cover_desc)
        transcript_cover_pdf = convert(doc, "TRANSCRIPT_COVER")

        # Transcript (with line numbering)
        doc = new_document()
        _enable_line_numbering(doc.sections[0])
        _add_transcript_content(doc, transcript_text)
        transcript_pdf = convert(doc, "TRANSCRIPT")

        # Transcript PDF: cover + transcript + page numbers
        finalize("transcript_pdf", [transcript_cover_pdf, transcript_pdf], "TRANSCRIPT")

        # Summary-based documents are only built when there is a summary;
        # this also avoids passing None into the summary renderer.
        if summary_text:
            # Summary cover
            doc = new_document()
            _add_cover_page(doc, "SUMMARY", summary_cover_date, summary_cover_desc)
            summary_cover_pdf = convert(doc, "SUMMARY_COVER")

            # Summary (no line numbering)
            doc = new_document()
            _add_summary_content(doc, summary_text)
            summary_pdf = convert(doc, "SUMMARY")

            # Summary PDF: cover + summary + page numbers
            finalize("summary_pdf", [summary_cover_pdf, summary_pdf], "SUMMARY")

            # "TRANSCRIPT" header page followed by the numbered transcript
            doc = new_document()
            p = doc.add_paragraph()
            p.alignment = WD_ALIGN_PARAGRAPH.CENTER
            run = p.add_run("TRANSCRIPT")
            run.bold = True
            run.font.name = "Courier"
            run.font.size = Pt(12)
            doc.add_page_break()
            _enable_line_numbering(doc.sections[0])
            _add_transcript_content(doc, transcript_text)
            transcript_header_pdf = convert(doc, "TRANSCRIPT_HEADER")

            # Combined PDF: transcript cover + summary + header/transcript
            finalize(
                "combined_pdf",
                [transcript_cover_pdf, summary_pdf, transcript_header_pdf],
                "COMBINED",
            )

    return result
|
|
|
|
|
|
def send_success_email(
    to: str,
    transcript_text: str,
    summary_text: str,
    attachments: List[str],
    task_id: str,
):
    """
    Send a success email with attachments.

    Subject is customizable via EMAIL_SUBJECT_SUCCESS.
    Falls back to a safe default if the env var is missing or blank.

    The HTML body is rendered from success_template.html with task_id and
    the first 500 characters of the transcript and summary, HTML-escaped.
    If the template cannot be rendered, the email is sent as plain text.

    Raises:
        EmailError: propagated from send_email() when delivery fails.
    """
    from html import escape

    # Read subject from environment; never allow blank
    raw_subject = os.getenv("EMAIL_SUBJECT_SUCCESS")
    subject = (raw_subject or "").strip()

    if not subject:
        subject = "ScrAIbe: Your transcript is ready"
        logger.info(
            "EMAIL_SUBJECT_SUCCESS not set or blank; using default subject: %s", subject
        )
    else:
        logger.info("Using EMAIL_SUBJECT_SUCCESS: %s", subject)

    # Build email body
    body = f"""
Your transcription is complete.

Task ID: {task_id}

Please find the attached documents:
- Transcript (MD)
- Transcript (DOCX)
- Source JSON
"""
    if summary_text:
        body += "- Summary (MD)\n- Summary (DOCX)\n"

    # Load HTML template. The values are plain text, so they are escaped
    # before insertion; truncation happens first so no entity is cut in half.
    try:
        html = load_template(
            "success_template.html",
            task_id=escape(str(task_id)),
            transcript_text=escape((transcript_text or "")[:500]),
            summary_text=escape((summary_text or "")[:500]),
        )
    except EmailError as e:
        logger.warning(
            "Could not render success_template.html; sending plain text only: %s", e
        )
        html = None

    # Send email (send_email has an additional subject guard)
    send_email(
        to=to,
        subject=subject,
        body=body,
        html=html,
        attachments=attachments,
    )
|
|
|
|
|
|
def send_error_email(
    to: str,
    error_message: str,
    task_id: str,
):
    """
    Send an error email.

    Subject is customizable via EMAIL_SUBJECT_ERROR.
    Falls back to a safe default if the env var is missing or blank.

    The HTML body is rendered from error_notification_template.html with
    the HTML-escaped task_id and error_message. If the template cannot be
    rendered, the email is sent as plain text.

    Raises:
        EmailError: propagated from send_email() when delivery fails.
    """
    from html import escape

    # Read subject from environment; never allow blank
    raw_subject = os.getenv("EMAIL_SUBJECT_ERROR")
    subject = (raw_subject or "").strip()

    if not subject:
        subject = "ScrAIbe: Error with your transcription request"
        logger.info(
            "EMAIL_SUBJECT_ERROR not set or blank; using default subject: %s", subject
        )
    else:
        logger.info("Using EMAIL_SUBJECT_ERROR: %s", subject)

    # Build email body
    body = f"""
There was an error processing your transcription.

Task ID: {task_id}
Error: {error_message}
"""

    # Load HTML template. Error text often contains characters such as
    # < and >, so it is escaped before being placed into the HTML.
    try:
        html = load_template(
            "error_notification_template.html",
            task_id=escape(str(task_id)),
            error_message=escape(str(error_message)),
        )
    except EmailError as e:
        logger.warning(
            "Could not render error_notification_template.html; "
            "sending plain text only: %s",
            e,
        )
        html = None

    # Send email (send_email has an additional subject guard)
    send_email(
        to=to,
        subject=subject,
        body=body,
        html=html,
        attachments=[],
    )
|