From a8f48b9e5840cab2d0060989aadb6238d43cc7cf Mon Sep 17 00:00:00 2001 From: admin Date: Sun, 14 Jun 2026 16:20:10 +0000 Subject: [PATCH] Use structured filenames and formal DOCX transcript styling --- scraibe/email_sender.py | 84 ++++++++++++++++++++++++++++++++++------- scraibe/tasks.py | 64 ++++++++++++++++++++++++------- 2 files changed, 121 insertions(+), 27 deletions(-) diff --git a/scraibe/email_sender.py b/scraibe/email_sender.py index 0b3edc4..162b8cd 100644 --- a/scraibe/email_sender.py +++ b/scraibe/email_sender.py @@ -9,6 +9,7 @@ Template placeholders are primarily filled via environment variables. import base64 import os +import re import smtplib import logging from email import encoders @@ -17,6 +18,11 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from typing import List, Optional, Dict, Any +from docx import Document +from docx.shared import Inches, Pt +from docx.oxml.ns import qn +from docx.oxml import OxmlElement + logger = logging.getLogger("scraibe.email_sender") @@ -280,30 +286,82 @@ def send_email( def create_transcript_docx(text: str, filename: str): """ - Create a .docx file from plain/markdown transcript text. + Create a .docx transcript with: + - 1.5" left margin, 1" right margin + - 12pt Courier + - Continuous line numbering on the left + - Speaker names capitalized and indented; spoken text further indented """ - from docx import Document - from docx.shared import Pt - doc = Document() - doc.add_heading("Transcript", level=1) + section = doc.sections[0] - for line in text.splitlines(): - p = doc.add_paragraph(line) - p.paragraph_format.space_after = Pt(4) + # Margins + section.left_margin = Inches(1.5) + section.right_margin = Inches(1.0) + section.top_margin = Inches(1.0) + section.bottom_margin = Inches(1.0) + + # Line numbering (continuous, left side) + section_type = section.element.find(qn("w:sectionPr")) + if section_type is None: + section_type = OxmlElement("w:sectionPr") + section.element.insert(0, section_type) + + line_num = OxmlElement("w:lineNumbering") + line_num.set(qn("w:start"), "continuous") + line_num.set(qn("w:countBy"), "1") + section_type.append(line_num) + + # Default font + style = doc.styles["Normal"] + font = style.font + font.name = "Courier" + font.size = Pt(12) + + # Parse lines + lines = text.strip().split("\n") + for line in lines: + line = line.strip() + if not line: + continue + + # Try to parse: [00:00] SPEAKER: text + m = re.match(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)", line) + if m: + ts, speaker, content = m.groups() + # Speaker line + p_spk = doc.add_paragraph() + p_spk.paragraph_format.left_indent = Inches(0.25) + run_spk = p_spk.add_run(f"[{ts}] {speaker.upper()}") + run_spk.bold = True + run_spk.font.name = "Courier" + run_spk.font.size = Pt(12) + + # Spoken text line + p_txt = doc.add_paragraph() + p_txt.paragraph_format.left_indent = Inches(0.5) + run_txt = p_txt.add_run(content.strip()) + run_txt.font.name = "Courier" + run_txt.font.size = Pt(12) + else: + # Fallback for non-standard lines + p = doc.add_paragraph() + run = p.add_run(line) + run.font.name = "Courier" + run.font.size = Pt(12) doc.save(filename) def create_summary_docx(text: str, filename: str): """ - Create a .docx file from summary text. + Create a .docx summary with consistent font. """ - from docx import Document - from docx.shared import Pt - doc = Document() - doc.add_heading("Summary", level=1) + style = doc.styles["Normal"] + font = style.font + font.name = "Courier" + font.size = Pt(12) for line in text.splitlines(): p = doc.add_paragraph(line) diff --git a/scraibe/tasks.py b/scraibe/tasks.py index 08efdfe..af7dc9f 100644 --- a/scraibe/tasks.py +++ b/scraibe/tasks.py @@ -17,6 +17,32 @@ from .email_sender import create_transcript_docx, create_summary_docx logger = logging.getLogger("scraibe.tasks") +def _local_part(email: str) -> str: + """ + Extract the part before '@' from an email, sanitized for filenames. + """ + local = (email or "").split("@")[0].strip() + local = "".join(ch if ch.isalnum() or ch in ("-", "_", ".") else "_" for ch in local) + return local or "user" + + +def _date_tag() -> str: + """ + Date tag in DD-MON-YYYY format (e.g. 01-JAN-2025). + """ + return datetime.utcnow().strftime("%d-%b-%Y").upper() + + +def _safe_filename(base: str, local: str, date_tag: str, ext: str) -> str: + """ + Create a temp file with the requested logical name. + Uses mktemp for uniqueness but keeps the desired name pattern. + """ + name = f"{base}-{local}-{date_tag}{ext}" + # Ensure uniqueness while preserving the logical name pattern + return tempfile.mktemp(prefix=name.replace(".", ""), suffix=ext) + + def _remove_file(path: str): """ Remove a file if it exists. Best-effort; logs but never raises. @@ -224,6 +250,10 @@ def process_transcription_task( # Track all temporary files to clean up later temp_files = [] + # Derive naming components + local = _local_part(email_to) + date_tag = _date_tag() + try: # 1) Determine queue position and send initial email queue_pos = get_queue_position(task_id) @@ -266,24 +296,21 @@ def process_transcription_task( segments = result.get("segments", []) raw_result = result.get("raw_result") - # 4) Prepare files for email - attachments = [] + # 4) Prepare files for email with required naming # Transcript as .md - md_transcript_path = tempfile.mktemp(suffix=".md") + md_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".md") with open(md_transcript_path, "w", encoding="utf-8") as f: f.write("# Transcript\n\n") f.write(transcript_text) - attachments.append(md_transcript_path) temp_files.append(md_transcript_path) - # Transcript as .docx - docx_transcript_path = tempfile.mktemp(suffix=".docx") + # Transcript as .docx (with required style) + docx_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".docx") create_transcript_docx(transcript_text, docx_transcript_path) - attachments.append(docx_transcript_path) temp_files.append(docx_transcript_path) - # JSON with diarization + # JSON with diarization as SOURCE json_data = { "task": task_type, "transcript": transcript_text, @@ -300,27 +327,36 @@ def process_transcription_task( if raw_result is not None: json_data["raw_result"] = raw_result - json_path = tempfile.mktemp(suffix=".json") + json_path = _safe_filename("SOURCE", local, date_tag, ".json") with open(json_path, "w", encoding="utf-8") as f: json.dump(json_data, f, indent=2, ensure_ascii=False) - attachments.append(json_path) temp_files.append(json_path) # Summary as .md (only when summary is available) if summary_text: - md_summary_path = tempfile.mktemp(suffix=".md") + md_summary_path = _safe_filename("SUMMARY", local, date_tag, ".md") with open(md_summary_path, "w", encoding="utf-8") as f: f.write("# Summary\n\n") f.write(summary_text) - attachments.append(md_summary_path) temp_files.append(md_summary_path) # Summary as .docx - docx_summary_path = tempfile.mktemp(suffix=".docx") + docx_summary_path = _safe_filename("SUMMARY", local, date_tag, ".docx") create_summary_docx(summary_text, docx_summary_path) - attachments.append(docx_summary_path) temp_files.append(docx_summary_path) + # All attachments + attachments = [ + md_transcript_path, + docx_transcript_path, + json_path, + ] + if summary_text: + attachments += [ + md_summary_path, + docx_summary_path, + ] + # 5) Send success email send_success_email( to=email_to,