Add watch-folder mode and wire MCP/watcher into entrypoint
- New watcher.py: polls WATCH_DIR and enqueues transcription + summary jobs via Celery.
- New process_watch_file_task in tasks.py.
- Updated __main__.py: the WebUI always runs; the MCP server and the watcher run in parallel when enabled.
This commit is contained in:
+7
-1
@@ -4,7 +4,9 @@ Entrypoint for running ScrAIbe as a module:
|
||||
python -m scraibe
|
||||
|
||||
Always launches the Web GUI (Gradio).
|
||||
Optionally launches an MCP-style API server in parallel.
|
||||
Optionally launches:
|
||||
- MCP-style API server
|
||||
- Watch-folder mode
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -38,5 +40,9 @@ if __name__ == "__main__":
|
||||
t = threading.Thread(target=_run_mcp_server, daemon=True)
|
||||
t.start()
|
||||
|
||||
# Optionally start watch-folder mode
|
||||
from .watcher import start_watcher
|
||||
start_watcher()
|
||||
|
||||
# Always start WebUI (Gradio)
|
||||
create_app()
|
||||
|
||||
@@ -569,3 +569,145 @@ def process_mcp_transcribe_task(
|
||||
finally:
|
||||
_remove_file(audio_path)
|
||||
logger.info("MCP job %s cleanup completed.", job_id)
|
||||
|
||||
|
||||
@celery_app.task(
    name="scraibe.tasks.process_watch_file_task",
    bind=True,
    max_retries=1,
    # Per-task limits are spelled ``time_limit`` / ``soft_time_limit``. The
    # ``task_``-prefixed names are global configuration keys and have no
    # effect when passed as task options.
    time_limit=14400,
    soft_time_limit=13500,
)
def process_watch_file_task(
    self,
    file_path: str,
):
    """
    Async task for watch-folder mode:
    - Transcribe + summarize
    - Email results
    - Optionally delete source file

    Args:
        file_path: Path of the audio file to process.

    Environment:
        LOG_LEVEL: logging level passed to ``setup_logging`` (default "INFO").
        WATCH_EMAIL_TO / EMAIL_DEFAULT_TO: recipient; the first one set wins.
        WATCH_DELETE_ON_SUCCESS: "true"/"1"/"yes" (default "true") removes
            the source file after the success email has been sent.

    Raises:
        RuntimeError: if neither recipient variable is set.
        Exception: any processing error is logged, reported by email
            (best effort) and re-raised so Celery records the failure.
    """
    task_id = self.request.id

    log_level = os.getenv("LOG_LEVEL", "INFO")
    setup_logging(level=log_level)

    email_to = os.getenv("WATCH_EMAIL_TO") or os.getenv("EMAIL_DEFAULT_TO")
    if not email_to:
        logger.error("No email address configured for watch-folder mode.")
        raise RuntimeError("WATCH_EMAIL_TO or EMAIL_DEFAULT_TO not set.")

    delete_on_success = os.getenv("WATCH_DELETE_ON_SUCCESS", "true").strip().lower() in ("true", "1", "yes")

    # Every generated file is registered here so the ``finally`` block can
    # remove it whether the job succeeds or fails.
    temp_files = []
    # NOTE(review): all watch jobs share the "watch" label, so output names
    # differ only by the date tag - confirm _safe_filename/_date_tag keep
    # concurrent jobs from colliding.
    local = "watch"
    date_tag = _date_tag()

    try:
        scraibe = Scraibe(verbose=True)

        result = scraibe.transcript_and_summarize(
            audio_file=file_path,
            language=None,
            num_speakers=None,
            verbose=True,
            for_export=True,
        )

        transcript_text = result.get("transcript", "")
        summary_text = result.get("summary", "")
        segments = result.get("segments", [])
        raw_result = result.get("raw_result")

        # Transcript .md
        md_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".md")
        with open(md_transcript_path, "w", encoding="utf-8") as f:
            f.write("# Transcript\n\n")
            f.write(transcript_text)
        temp_files.append(md_transcript_path)

        # Transcript .docx
        docx_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".docx")
        create_transcript_docx(
            transcript_text,
            docx_transcript_path,
        )
        temp_files.append(docx_transcript_path)

        # Summary .md
        md_summary_path = _safe_filename("SUMMARY", local, date_tag, ".md")
        with open(md_summary_path, "w", encoding="utf-8") as f:
            f.write("# Summary\n\n")
            f.write(summary_text)
        temp_files.append(md_summary_path)

        # Summary .docx
        docx_summary_path = _safe_filename("SUMMARY", local, date_tag, ".docx")
        create_summary_docx(
            summary_text,
            docx_summary_path,
        )
        temp_files.append(docx_summary_path)

        # JSON as SOURCE
        json_data = {
            "task": "watch_transcript_and_summarize",
            "transcript": transcript_text,
            "summary": summary_text,
            "segments": segments,
            "metadata": {
                "timestamp": datetime.utcnow().isoformat(),
                "job_id": task_id,
                "source_file": file_path,
            },
        }
        # The raw result is only included when the pipeline returned one.
        if raw_result is not None:
            json_data["raw_result"] = raw_result

        json_path = _safe_filename("SOURCE", local, date_tag, ".json")
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(json_data, f, indent=2, ensure_ascii=False)
        temp_files.append(json_path)

        # Attachments
        attachments = [
            md_transcript_path,
            docx_transcript_path,
            md_summary_path,
            docx_summary_path,
            json_path,
        ]

        # Send email
        send_success_email(
            to=email_to,
            transcript_text=transcript_text,
            summary_text=summary_text,
            attachments=attachments,
            task_id=task_id,
        )

        logger.info("Watch-folder job %s completed for %s.", task_id, file_path)

        # Delete source file if configured. Deletion is best effort: the
        # results have already been mailed, so a failure here only warns.
        if delete_on_success:
            try:
                os.remove(file_path)
                logger.info("Deleted source file: %s", file_path)
            except FileNotFoundError:
                # Already gone (e.g. removed by someone else) - nothing to do.
                pass
            except OSError as e:
                logger.warning("Failed to delete source file %s: %s", file_path, e)

    except Exception as e:
        logger.error("Error processing watch file %s: %s", file_path, e, exc_info=True)
        try:
            send_error_email(
                to=email_to,
                error_message=str(e),
                task_id=task_id,
            )
        except Exception:
            # A broken mail path must not replace the real failure that
            # Celery should record for this job.
            logger.exception("Failed to send error email for watch-folder job %s.", task_id)
        raise
    finally:
        # Cleanup temp files
        for path in temp_files:
            _remove_file(path)
        logger.info("Watch-folder job %s cleanup completed.", task_id)
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
"""
|
||||
Watch-folder mode for ScrAIbe.
|
||||
|
||||
Monitors a folder for audio files. For each file:
|
||||
- Transcribes + summarizes
|
||||
- Emails results
|
||||
- Deletes source file on success (unless WATCH_DELETE_ON_SUCCESS is false)
|
||||
|
||||
Configuration (env):
|
||||
- WATCH_ENABLED: "true"/"false" (default: false)
|
||||
- WATCH_DIR: directory to watch (required if enabled)
|
||||
- WATCH_EMAIL_TO: destination email (required if enabled)
|
||||
- WATCH_POLL_INTERVAL: seconds between scans (default: 10)
|
||||
- WATCH_DELETE_ON_SUCCESS: "true"/"false" (default: true)
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger("scraibe.watcher")
|
||||
|
||||
# Lower-case file suffixes that the watcher treats as audio input.
AUDIO_EXTENSIONS = {".wav", ".mp3", ".flac", ".m4a", ".ogg", ".webm", ".mp4"}


def _is_audio(path: Path) -> bool:
    """
    Tell whether *path* is an existing regular file with a known audio suffix.

    The suffix comparison is case-insensitive.
    """
    if not path.is_file():
        return False
    extension = path.suffix.lower()
    return extension in AUDIO_EXTENSIONS
|
||||
|
||||
|
||||
def _enqueue_file(file_path: Path) -> bool:
    """
    Enqueue a file for transcription + summarization via Celery.

    Args:
        file_path: Audio file to process; passed to the task as a string.

    Returns:
        True when the task was handed to Celery, False when the hand-off
        raised (the error is logged with its traceback, never propagated).
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to keep the watcher importable without
    # loading the task module up front - confirm.
    from .tasks import process_watch_file_task

    try:
        process_watch_file_task.delay(str(file_path))
    except Exception as e:
        logger.error("Failed to enqueue watch file %s: %s", file_path, e, exc_info=True)
        return False
    return True
|
||||
|
||||
|
||||
# Per-path bookkeeping for the watcher: maps str(path) to a pair
# ((st_mtime_ns, st_size), already_enqueued). Only the scanning code below
# reads or writes it.
_WATCH_STATE = {}


def _scan_directory(watch_dir: Path):
    """
    Scan directory and enqueue each audio file once.

    A file is enqueued only after its modification time and size were
    identical on two consecutive scans, so a file that is still being
    copied into the folder is not picked up half-written. Once enqueued, a
    file is not enqueued again while it stays unchanged in the directory;
    without this, every poll would submit a duplicate job for every file
    that is still being processed or was kept after processing. Replacing
    or modifying the file makes it eligible again, and entries for files
    that disappeared are forgotten.

    Args:
        watch_dir: Directory to scan (non-recursive).
    """
    if not watch_dir.is_dir():
        logger.warning("WATCH_DIR does not exist or is not a directory: %s", watch_dir)
        return

    present = set()
    for p in watch_dir.iterdir():
        if not _is_audio(p):
            continue
        try:
            info = p.stat()
        except OSError:
            # The file vanished between listing and stat; skip it this round.
            continue

        key = str(p)
        present.add(key)
        signature = (info.st_mtime_ns, info.st_size)
        previous = _WATCH_STATE.get(key)

        if previous is None or previous[0] != signature:
            # New or still changing: remember it and wait for the next scan.
            _WATCH_STATE[key] = (signature, False)
            continue
        if previous[1]:
            # Unchanged and already handed to Celery.
            continue

        logger.info("Found audio file in WATCH_DIR: %s", p)
        # Only an explicit False is treated as a failed hand-off; the file
        # then stays unmarked so the next scan tries again.
        if _enqueue_file(p) is not False:
            _WATCH_STATE[key] = (signature, True)

    # Drop state for files that are no longer in the directory.
    for key in set(_WATCH_STATE) - present:
        del _WATCH_STATE[key]
|
||||
|
||||
|
||||
def start_watcher():
    """
    Start watch-folder loop in a background thread.

    Does nothing unless WATCH_ENABLED is "true", "1" or "yes". When enabled,
    WATCH_DIR and WATCH_EMAIL_TO must be set; otherwise a warning is logged
    and the watcher stays off. WATCH_DIR is created if missing. A
    WATCH_POLL_INTERVAL that is not a positive, finite number falls back to
    10 seconds, and a directory that cannot be created disables the watcher.
    Configuration problems never raise, so the caller can go on to start the
    rest of the application.

    Returns:
        None. The scanning loop runs in a daemon thread.
    """
    enabled = os.getenv("WATCH_ENABLED", "false").strip().lower() in ("true", "1", "yes")
    if not enabled:
        return

    watch_dir = os.getenv("WATCH_DIR")
    if not watch_dir:
        logger.warning("WATCH_ENABLED is true but WATCH_DIR is not set. Watcher disabled.")
        return

    email_to = os.getenv("WATCH_EMAIL_TO")
    if not email_to:
        logger.warning("WATCH_ENABLED is true but WATCH_EMAIL_TO is not set. Watcher disabled.")
        return

    default_interval = 10.0
    raw_interval = os.getenv("WATCH_POLL_INTERVAL", "10")
    try:
        interval = float(raw_interval)
    except ValueError:
        interval = float("nan")
    # Rejects zero, negatives, NaN and infinity in one comparison: time.sleep
    # would raise on those (or spin without pause on zero).
    if not (0 < interval < float("inf")):
        logger.warning(
            "Invalid WATCH_POLL_INTERVAL %r; falling back to %s seconds.",
            raw_interval,
            default_interval,
        )
        interval = default_interval

    watch_path = Path(watch_dir).expanduser().resolve()
    try:
        watch_path.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.warning("Cannot create WATCH_DIR %s: %s. Watcher disabled.", watch_path, e)
        return

    logger.info("Starting watch-folder: dir=%s, email=%s, interval=%s", watch_dir, email_to, interval)

    def _loop():
        # Runs until the process exits; a failing scan is logged and the
        # loop carries on with the next interval.
        while True:
            try:
                _scan_directory(watch_path)
            except Exception as e:
                logger.error("Error scanning WATCH_DIR: %s", e, exc_info=True)
            time.sleep(interval)

    t = threading.Thread(target=_loop, daemon=True)
    t.start()
|
||||
Reference in New Issue
Block a user