Use structured filenames and formal DOCX transcript styling
Mirror and run GitLab CI / build (push) Has been cancelled
Ruff / ruff (push) Has been cancelled

This commit is contained in:
admin
2026-06-14 16:20:10 +00:00
parent 2dce9b43c9
commit a8f48b9e58
2 changed files with 121 additions and 27 deletions
+71 -13
View File
@@ -9,6 +9,7 @@ Template placeholders are primarily filled via environment variables.
import base64 import base64
import os import os
import re
import smtplib import smtplib
import logging import logging
from email import encoders from email import encoders
@@ -17,6 +18,11 @@ from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText from email.mime.text import MIMEText
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
from docx import Document
from docx.shared import Inches, Pt
from docx.oxml.ns import qn
from docx.oxml import OxmlElement
logger = logging.getLogger("scraibe.email_sender") logger = logging.getLogger("scraibe.email_sender")
@@ -280,30 +286,82 @@ def send_email(
def _enable_line_numbering(section) -> None:
    """Turn on continuous, every-line numbering for a python-docx section."""
    # python-docx has no public line-numbering API, so the setting is written
    # straight into the section's <w:sectPr> element. The WordprocessingML
    # element is <w:lnNumType>; "continuous" is a value of its w:restart
    # attribute.
    sect_pr = section._sectPr

    # Drop any earlier setting so repeated calls do not stack elements.
    for stale in sect_pr.findall(qn("w:lnNumType")):
        sect_pr.remove(stale)

    ln_num = OxmlElement("w:lnNumType")
    ln_num.set(qn("w:countBy"), "1")
    ln_num.set(qn("w:restart"), "continuous")

    # Children of <w:sectPr> follow a fixed schema order; place the new
    # element before the first sibling that must come after it.
    later_siblings = (
        "w:pgNumType", "w:cols", "w:formProt", "w:vAlign", "w:noEndnote",
        "w:titlePg", "w:textDirection", "w:bidi", "w:rtlGutter", "w:docGrid",
        "w:printerSettings", "w:sectPrChange",
    )
    for tag in later_siblings:
        successor = sect_pr.find(qn(tag))
        if successor is not None:
            successor.addprevious(ln_num)
            return
    sect_pr.append(ln_num)


def create_transcript_docx(text: str, filename: str):
    """
    Create a .docx transcript with:
    - 1.5" left margin, 1" right/top/bottom margins
    - 12pt Courier
    - Continuous line numbering on the left
    - Speaker names upper-cased and indented; spoken text further indented

    Lines shaped like ``[MM:SS] Speaker: text`` (or ``[HH:MM:SS] ...``) become
    a bold ``[ts] SPEAKER`` paragraph followed by an indented text paragraph.
    Any other non-blank line is written as a plain paragraph; blank lines are
    skipped.

    Args:
        text: Transcript text, one utterance per line. ``None`` or an empty
            string produces a document with no paragraphs.
        filename: Destination path handed to ``Document.save``.
    """
    doc = Document()
    section = doc.sections[0]

    # Margins
    section.left_margin = Inches(1.5)
    section.right_margin = Inches(1.0)
    section.top_margin = Inches(1.0)
    section.bottom_margin = Inches(1.0)

    # Line numbering (continuous, left side)
    _enable_line_numbering(section)

    # Default font
    normal_font = doc.styles["Normal"].font
    normal_font.name = "Courier"
    normal_font.size = Pt(12)

    def add_line(content, indent=None, bold=False):
        # One paragraph holding one explicitly-styled Courier 12pt run.
        paragraph = doc.add_paragraph()
        if indent is not None:
            paragraph.paragraph_format.left_indent = indent
        run = paragraph.add_run(content)
        if bold:
            run.bold = True
        run.font.name = "Courier"
        run.font.size = Pt(12)

    # Matches: [00:00] SPEAKER: text   (timestamp may also be HH:MM:SS)
    utterance = re.compile(r"\[(\d+:\d+(?::\d+)?)\]\s*(.+?):\s*(.*)")

    for raw_line in (text or "").splitlines():
        line = raw_line.strip()
        if not line:
            continue

        parsed = utterance.match(line)
        if parsed is None:
            # Fallback for non-standard lines
            add_line(line)
            continue

        ts, speaker, content = parsed.groups()
        add_line(f"[{ts}] {speaker.upper()}", indent=Inches(0.25), bold=True)
        add_line(content.strip(), indent=Inches(0.5))

    doc.save(filename)
def create_summary_docx(text: str, filename: str): def create_summary_docx(text: str, filename: str):
""" """
Create a .docx file from summary text. Create a .docx summary with consistent font.
""" """
from docx import Document
from docx.shared import Pt
doc = Document() doc = Document()
doc.add_heading("Summary", level=1) style = doc.styles["Normal"]
font = style.font
font.name = "Courier"
font.size = Pt(12)
for line in text.splitlines(): for line in text.splitlines():
p = doc.add_paragraph(line) p = doc.add_paragraph(line)
+50 -14
View File
@@ -17,6 +17,32 @@ from .email_sender import create_transcript_docx, create_summary_docx
logger = logging.getLogger("scraibe.tasks") logger = logging.getLogger("scraibe.tasks")
def _local_part(email: str) -> str:
    """
    Return the filename-safe portion of an email address that precedes '@'.

    Surrounding whitespace is trimmed first; then every character that is
    neither alphanumeric nor one of '-', '_', '.' is replaced with '_'.
    Falls back to "user" when nothing is left (empty or missing address).
    """
    allowed_punctuation = ("-", "_", ".")
    head = (email or "").split("@")[0].strip()

    cleaned = []
    for char in head:
        keep = char.isalnum() or char in allowed_punctuation
        cleaned.append(char if keep else "_")

    return "".join(cleaned) or "user"
def _date_tag() -> str:
    """
    Return the current UTC date as DD-MON-YYYY (e.g. 01-JAN-2025).

    The month abbreviation comes from a fixed English table rather than
    ``strftime("%b")``, which follows the process locale and could emit a
    non-English month name. The clock is read with a timezone-aware call
    because ``datetime.utcnow()`` is deprecated.
    """
    from datetime import timezone

    months = (
        "JAN", "FEB", "MAR", "APR", "MAY", "JUN",
        "JUL", "AUG", "SEP", "OCT", "NOV", "DEC",
    )
    now = datetime.now(timezone.utc)
    return f"{now.day:02d}-{months[now.month - 1]}-{now.year:04d}"
def _safe_filename(base: str, local: str, date_tag: str, ext: str) -> str:
    """
    Reserve a unique temp file named ``{base}-{local}-{date_tag}-<token>{ext}``.

    The file is created empty via ``tempfile.mkstemp`` so the name cannot be
    claimed by another process between choosing it and writing it (the race
    ``tempfile.mktemp`` is deprecated for). The logical name is kept intact,
    dots included, with the random token placed just before the extension.

    Args:
        base: Leading label of the name, e.g. "TRANSCRIPT".
        local: Sanitized identifier placed after the label.
        date_tag: Date component of the name.
        ext: File extension including the leading dot, e.g. ".md".

    Returns:
        Path of the newly created empty file. The caller is expected to
        overwrite it and to delete it when finished.
    """
    import os

    fd, path = tempfile.mkstemp(prefix=f"{base}-{local}-{date_tag}-", suffix=ext)
    # Only the reserved path is needed; callers reopen it themselves.
    os.close(fd)
    return path
def _remove_file(path: str): def _remove_file(path: str):
""" """
Remove a file if it exists. Best-effort; logs but never raises. Remove a file if it exists. Best-effort; logs but never raises.
@@ -224,6 +250,10 @@ def process_transcription_task(
# Track all temporary files to clean up later # Track all temporary files to clean up later
temp_files = [] temp_files = []
# Derive naming components
local = _local_part(email_to)
date_tag = _date_tag()
try: try:
# 1) Determine queue position and send initial email # 1) Determine queue position and send initial email
queue_pos = get_queue_position(task_id) queue_pos = get_queue_position(task_id)
@@ -266,24 +296,21 @@ def process_transcription_task(
segments = result.get("segments", []) segments = result.get("segments", [])
raw_result = result.get("raw_result") raw_result = result.get("raw_result")
# 4) Prepare files for email # 4) Prepare files for email with required naming
attachments = []
# Transcript as .md # Transcript as .md
md_transcript_path = tempfile.mktemp(suffix=".md") md_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".md")
with open(md_transcript_path, "w", encoding="utf-8") as f: with open(md_transcript_path, "w", encoding="utf-8") as f:
f.write("# Transcript\n\n") f.write("# Transcript\n\n")
f.write(transcript_text) f.write(transcript_text)
attachments.append(md_transcript_path)
temp_files.append(md_transcript_path) temp_files.append(md_transcript_path)
# Transcript as .docx # Transcript as .docx (with required style)
docx_transcript_path = tempfile.mktemp(suffix=".docx") docx_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".docx")
create_transcript_docx(transcript_text, docx_transcript_path) create_transcript_docx(transcript_text, docx_transcript_path)
attachments.append(docx_transcript_path)
temp_files.append(docx_transcript_path) temp_files.append(docx_transcript_path)
# JSON with diarization # JSON with diarization as SOURCE
json_data = { json_data = {
"task": task_type, "task": task_type,
"transcript": transcript_text, "transcript": transcript_text,
@@ -300,27 +327,36 @@ def process_transcription_task(
if raw_result is not None: if raw_result is not None:
json_data["raw_result"] = raw_result json_data["raw_result"] = raw_result
json_path = tempfile.mktemp(suffix=".json") json_path = _safe_filename("SOURCE", local, date_tag, ".json")
with open(json_path, "w", encoding="utf-8") as f: with open(json_path, "w", encoding="utf-8") as f:
json.dump(json_data, f, indent=2, ensure_ascii=False) json.dump(json_data, f, indent=2, ensure_ascii=False)
attachments.append(json_path)
temp_files.append(json_path) temp_files.append(json_path)
# Summary as .md (only when summary is available) # Summary as .md (only when summary is available)
if summary_text: if summary_text:
md_summary_path = tempfile.mktemp(suffix=".md") md_summary_path = _safe_filename("SUMMARY", local, date_tag, ".md")
with open(md_summary_path, "w", encoding="utf-8") as f: with open(md_summary_path, "w", encoding="utf-8") as f:
f.write("# Summary\n\n") f.write("# Summary\n\n")
f.write(summary_text) f.write(summary_text)
attachments.append(md_summary_path)
temp_files.append(md_summary_path) temp_files.append(md_summary_path)
# Summary as .docx # Summary as .docx
docx_summary_path = tempfile.mktemp(suffix=".docx") docx_summary_path = _safe_filename("SUMMARY", local, date_tag, ".docx")
create_summary_docx(summary_text, docx_summary_path) create_summary_docx(summary_text, docx_summary_path)
attachments.append(docx_summary_path)
temp_files.append(docx_summary_path) temp_files.append(docx_summary_path)
# All attachments
attachments = [
md_transcript_path,
docx_transcript_path,
json_path,
]
if summary_text:
attachments += [
md_summary_path,
docx_summary_path,
]
# 5) Send success email # 5) Send success email
send_success_email( send_success_email(
to=email_to, to=email_to,