Add Identify speakers option: AI infers names and replaces Speaker IDs in transcript
Mirror and run GitLab CI / build (push) Has been cancelled
Ruff / ruff (push) Has been cancelled

This commit is contained in:
ScrAIbe Admin
2026-06-14 18:05:37 +00:00
parent eb9b2f9126
commit 49e999f0ee
2 changed files with 95 additions and 0 deletions
+86
View File
@@ -10,6 +10,7 @@ from datetime import datetime
from .celery_app import celery_app from .celery_app import celery_app
from .autotranscript import Scraibe from .autotranscript import Scraibe
from .summarizer import SummarizerClient, SummarizerError
from .misc import setup_logging from .misc import setup_logging
from .email_sender import send_email, EmailError, load_template from .email_sender import send_email, EmailError, load_template
from .email_sender import create_transcript_docx, create_summary_docx from .email_sender import create_transcript_docx, create_summary_docx
@@ -238,6 +239,7 @@ def process_transcription_task(
email_to: str, email_to: str,
email_cc: str, email_cc: str,
include_summary: bool, include_summary: bool,
identify_speakers: bool = False,
): ):
""" """
Async task: transcribe audio, optionally summarize, then email results. Async task: transcribe audio, optionally summarize, then email results.
@@ -294,6 +296,90 @@ def process_transcription_task(
segments = result.get("segments", []) segments = result.get("segments", [])
raw_result = result.get("raw_result") raw_result = result.get("raw_result")
# 3b) Optional speaker identification
speaker_map = {} # e.g. {"SPEAKER 1": "John", "SPEAKER 2": "Maria"}
if identify_speakers:
try:
# Use the same summarizer client as transcript_and_summarize
scraibe._ensure_summarizer()
summarizer = scraibe._summarizer
prompt = (
"Below is a transcript with speaker labels like 'SPEAKER 1', 'SPEAKER 2', etc. "
"Based on how they speak and the context, suggest realistic names for each speaker. "
"Do not add extra commentary. Output ONLY a mapping in this exact format, one per line:
SPEAKER 1: Suggested Name
SPEAKER 2: Suggested Name
SPEAKER 3: Suggested Name
Transcript:
" + transcript_text
)
response = summarizer._chat_completion(
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=300,
)
reply = (response or {}).get("choices", [{}])[0].get("message", {}).get("content", "")
# Parse mapping
import re
for m in re.finditer(
r"SPEAKER\s+(\d+)\s*:\s*(.+)",
reply,
re.IGNORECASE,
):
spk = f"SPEAKER {m.group(1).strip()}"
name = m.group(2).strip().rstrip(".")
if name:
speaker_map[spk] = name
logger.info("Speaker identification mapping: %s", speaker_map)
# Apply mapping to transcript text
if speaker_map:
def replace_speaker(m):
    """Return the mapped name for a matched speaker label, else the label.

    The whole matched text is trimmed, upper-cased, stripped of every
    character other than A-Z, 0-9 and whitespace, and has whitespace runs
    collapsed to single spaces. The result is looked up in the enclosing
    ``speaker_map``; when there is no entry, the trimmed matched text is
    returned as-is.
    """
    matched_text = m.group(0).strip()
    # Drop punctuation first, then squeeze whitespace: "speaker  1:" -> "SPEAKER 1".
    alnum_only = re.sub(r"[^A-Z0-9\s]", "", matched_text.upper())
    lookup_key = re.sub(r"\s+", " ", alnum_only).strip()
    if lookup_key in speaker_map:
        return speaker_map[lookup_key]
    return matched_text
# Replace in lines like "[00:12] SPEAKER 1:" but preserve timestamp and colon
def replace_in_line(line: str) -> str:
    """Replace the speaker label after a leading timestamp with its mapped name.

    Targets text shaped like ``[MM:SS] SPEAKER 1:`` or ``[HH:MM:SS] SPEAKER 1:``.
    The timestamp is kept verbatim and the label is swapped for the entry
    found in the enclosing ``speaker_map``, followed by ``": "``.

    Args:
        line: A single transcript line.

    Returns:
        The line with mapped labels replaced. Lines without a timestamped
        label, or whose label has no mapping, are returned unchanged.
    """

    def _swap(m):
        # Collapse whitespace runs so "SPEAKER  1" still finds the "SPEAKER 1" key.
        label = re.sub(r"\s+", " ", m.group(2)).strip()
        name = speaker_map.get(label)
        if name is None:
            # Unknown label: leave the matched text exactly as it was.
            return m.group(0)
        return f"{m.group(1)}{name}: "

    # The label class must admit digits; without them "SPEAKER 1:" can never
    # match and no replacement is ever performed.
    return re.sub(
        r"(\[\d+:\d+(?::\d+)?\]\s*)([A-Z0-9\s]+?):\s*",
        _swap,
        line,
    )
transcript_lines = transcript_text.splitlines()
transcript_text = "\n".join(
replace_in_line(line) for line in transcript_lines
)
# Also update segments for JSON export
updated_segments = []
for seg in segments:
sp = (seg.get("speaker") or "").strip()
sp_norm = re.sub(r"[^A-Z0-9\s]", "", sp.upper()).strip()
sp_new = speaker_map.get(sp_norm, sp)
seg = dict(seg)
seg["speaker"] = sp_new
updated_segments.append(seg)
segments = updated_segments
except (SummarizerError, Exception) as e:
logger.warning(
"Speaker identification failed; falling back to Speaker IDs: %s", e
)
speaker_map = {}
# 4) Prepare files # 4) Prepare files
# Transcript .md # Transcript .md
+9
View File
@@ -135,6 +135,12 @@ def create_app():
label="Task", label="Task",
) )
identify_speakers = gr.Checkbox(
label="Identify speakers (best effort using AI)",
value=False,
info="If enabled, AI will attempt to infer real names for speakers and replace Speaker 1/2/etc. in the transcript."
)
with gr.Row(): with gr.Row():
language_input = gr.Textbox( language_input = gr.Textbox(
label="Language (optional)", label="Language (optional)",
@@ -188,6 +194,7 @@ def create_app():
num_speakers, num_speakers,
email_to_val, email_to_val,
email_cc_val, email_cc_val,
identify_speakers_val,
): ):
if not audio: if not audio:
return "Please upload or record audio." return "Please upload or record audio."
@@ -225,6 +232,7 @@ def create_app():
email_to=email_to_val, email_to=email_to_val,
email_cc=email_cc_val or None, email_cc=email_cc_val or None,
include_summary=(task == "transcript_and_summarize"), include_summary=(task == "transcript_and_summarize"),
identify_speakers=bool(identify_speakers_val),
) )
except Exception as e: except Exception as e:
logger.error("Error enqueuing job: %s", e) logger.error("Error enqueuing job: %s", e)
@@ -247,6 +255,7 @@ def create_app():
num_speakers_input, num_speakers_input,
email_to, email_to,
email_cc, email_cc,
identify_speakers,
], ],
outputs=[status_text], outputs=[status_text],
) )