diff --git a/scraibe/__main__.py b/scraibe/__main__.py
index 89e0b4f..f4a2345 100644
--- a/scraibe/__main__.py
+++ b/scraibe/__main__.py
@@ -4,7 +4,9 @@ Entrypoint for running ScrAIbe as a module:
     python -m scraibe
 
 Always launches the Web GUI (Gradio).
-Optionally launches an MCP-style API server in parallel.
+Optionally launches:
+- MCP-style API server
+- Watch-folder mode
 """
 
 import os
@@ -38,5 +40,9 @@ if __name__ == "__main__":
         t = threading.Thread(target=_run_mcp_server, daemon=True)
         t.start()
 
+    # Optionally start watch-folder mode
+    from .watcher import start_watcher
+    start_watcher()
+
     # Always start WebUI (Gradio)
     create_app()
diff --git a/scraibe/tasks.py b/scraibe/tasks.py
index b670ee8..091cde3 100644
--- a/scraibe/tasks.py
+++ b/scraibe/tasks.py
@@ -569,3 +569,165 @@ def process_mcp_transcribe_task(
     finally:
         _remove_file(audio_path)
         logger.info("MCP job %s cleanup completed.", job_id)
+
+
+@celery_app.task(
+    name="scraibe.tasks.process_watch_file_task",
+    bind=True,
+    max_retries=1,
+    # Celery reads per-task limits from ``time_limit``/``soft_time_limit``;
+    # the ``task_``-prefixed names are app-level settings and are ignored here.
+    time_limit=14400,
+    soft_time_limit=13500,
+)
+def process_watch_file_task(
+    self,
+    file_path: str,
+):
+    """
+    Async task for watch-folder mode:
+    - Transcribe + summarize
+    - Email results (transcript/summary as .md and .docx, plus a JSON dump)
+    - Optionally delete source file (WATCH_DELETE_ON_SUCCESS, default true)
+
+    Args:
+        file_path: Path of the audio file found in the watch folder.
+
+    Raises:
+        RuntimeError: If neither WATCH_EMAIL_TO nor EMAIL_DEFAULT_TO is set.
+        Exception: Processing errors are re-raised after an error email
+            has been attempted.
+    """
+    task_id = self.request.id
+
+    log_level = os.getenv("LOG_LEVEL", "INFO")
+    setup_logging(level=log_level)
+
+    email_to = os.getenv("WATCH_EMAIL_TO") or os.getenv("EMAIL_DEFAULT_TO")
+    if not email_to:
+        logger.error("No email address configured for watch-folder mode.")
+        raise RuntimeError("WATCH_EMAIL_TO or EMAIL_DEFAULT_TO not set.")
+
+    # The file may already have been handled (and deleted) by an earlier job;
+    # skip it instead of failing and sending a misleading error email.
+    if not os.path.isfile(file_path):
+        logger.warning("Watch file no longer exists, skipping: %s", file_path)
+        return
+
+    delete_on_success = os.getenv("WATCH_DELETE_ON_SUCCESS", "true").strip().lower() in ("true", "1", "yes")
+
+    temp_files = []
+    local = "watch"
+    date_tag = _date_tag()
+
+    try:
+        scraibe = Scraibe(verbose=True)
+
+        result = scraibe.transcript_and_summarize(
+            audio_file=file_path,
+            language=None,
+            num_speakers=None,
+            verbose=True,
+            for_export=True,
+        )
+
+        transcript_text = result.get("transcript", "")
+        summary_text = result.get("summary", "")
+        segments = result.get("segments", [])
+        raw_result = result.get("raw_result")
+
+        # Transcript .md
+        md_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".md")
+        with open(md_transcript_path, "w", encoding="utf-8") as f:
+            f.write("# Transcript\n\n")
+            f.write(transcript_text)
+        temp_files.append(md_transcript_path)
+
+        # Transcript .docx
+        docx_transcript_path = _safe_filename("TRANSCRIPT", local, date_tag, ".docx")
+        create_transcript_docx(
+            transcript_text,
+            docx_transcript_path,
+        )
+        temp_files.append(docx_transcript_path)
+
+        # Summary .md
+        md_summary_path = _safe_filename("SUMMARY", local, date_tag, ".md")
+        with open(md_summary_path, "w", encoding="utf-8") as f:
+            f.write("# Summary\n\n")
+            f.write(summary_text)
+        temp_files.append(md_summary_path)
+
+        # Summary .docx
+        docx_summary_path = _safe_filename("SUMMARY", local, date_tag, ".docx")
+        create_summary_docx(
+            summary_text,
+            docx_summary_path,
+        )
+        temp_files.append(docx_summary_path)
+
+        # JSON as SOURCE
+        json_data = {
+            "task": "watch_transcript_and_summarize",
+            "transcript": transcript_text,
+            "summary": summary_text,
+            "segments": segments,
+            "metadata": {
+                "timestamp": datetime.utcnow().isoformat(),
+                "job_id": task_id,
+                "source_file": file_path,
+            },
+        }
+        if raw_result is not None:
+            json_data["raw_result"] = raw_result
+
+        json_path = _safe_filename("SOURCE", local, date_tag, ".json")
+        with open(json_path, "w", encoding="utf-8") as f:
+            json.dump(json_data, f, indent=2, ensure_ascii=False)
+        temp_files.append(json_path)
+
+        # Attachments
+        attachments = [
+            md_transcript_path,
+            docx_transcript_path,
+            md_summary_path,
+            docx_summary_path,
+            json_path,
+        ]
+
+        # Send email
+        send_success_email(
+            to=email_to,
+            transcript_text=transcript_text,
+            summary_text=summary_text,
+            attachments=attachments,
+            task_id=task_id,
+        )
+
+        logger.info("Watch-folder job %s completed for %s.", task_id, file_path)
+
+        # Delete source file if configured
+        if delete_on_success and os.path.exists(file_path):
+            try:
+                os.remove(file_path)
+                logger.info("Deleted source file: %s", file_path)
+            except Exception as e:
+                logger.warning("Failed to delete source file %s: %s", file_path, e)
+
+    except Exception as e:
+        logger.error("Error processing watch file %s: %s", file_path, e, exc_info=True)
+        try:
+            send_error_email(
+                to=email_to,
+                error_message=str(e),
+                task_id=task_id,
+            )
+        except Exception:
+            # A mail failure must not hide the original processing error.
+            logger.exception("Failed to send error email for watch job %s.", task_id)
+        raise
+    finally:
+        # Cleanup temp files
+        for path in temp_files:
+            _remove_file(path)
+        logger.info("Watch-folder job %s cleanup completed.", task_id)
diff --git a/scraibe/watcher.py b/scraibe/watcher.py
new file mode 100644
index 0000000..f394ab2
--- /dev/null
+++ b/scraibe/watcher.py
@@ -0,0 +1,172 @@
+"""
+Watch-folder mode for ScrAIbe.
+
+Monitors a folder for audio files. For each file:
+- Transcribes + summarizes
+- Emails results
+- Deletes source file
+
+A file is enqueued once per version (size + mtime), and only after it has
+stopped changing between two consecutive scans.
+
+Configuration (env):
+- WATCH_ENABLED: "true"/"false" (default: false)
+- WATCH_DIR: directory to watch (required if enabled)
+- WATCH_EMAIL_TO: destination email (falls back to EMAIL_DEFAULT_TO;
+  one of the two is required if enabled)
+- WATCH_POLL_INTERVAL: seconds between scans (default: 10)
+- WATCH_DELETE_ON_SUCCESS: "true"/"false" (default: true)
+"""
+
+import os
+import time
+import logging
+import threading
+from pathlib import Path
+
+logger = logging.getLogger("scraibe.watcher")
+
+AUDIO_EXTENSIONS = {
+    ".wav",
+    ".mp3",
+    ".flac",
+    ".m4a",
+    ".ogg",
+    ".webm",
+    ".mp4",
+}
+
+DEFAULT_POLL_INTERVAL = 10.0
+
+# path -> ((size, mtime_ns), enqueued) for audio files seen in the last scan.
+_file_state = {}
+
+
+def _is_audio(path: Path) -> bool:
+    """Return True if ``path`` is a regular file with a known audio suffix."""
+    return path.is_file() and path.suffix.lower() in AUDIO_EXTENSIONS
+
+
+def _enqueue_file(file_path: Path) -> bool:
+    """
+    Enqueue a file for transcription + summarization via Celery.
+
+    Returns True if the task was submitted, False if submission failed.
+    """
+    from .tasks import process_watch_file_task
+
+    try:
+        process_watch_file_task.delay(str(file_path))
+    except Exception as e:
+        logger.error("Failed to enqueue watch file %s: %s", file_path, e)
+        return False
+    return True
+
+
+def _scan_directory(watch_dir: Path):
+    """
+    Scan directory and enqueue audio files that are ready.
+
+    A file is ready once its size and mtime are unchanged since the previous
+    scan, so half-copied uploads are not processed. A file version that was
+    already enqueued is skipped; otherwise every poll would start another
+    job while the first is still running, or after it failed.
+    """
+    if not watch_dir.is_dir():
+        logger.warning("WATCH_DIR does not exist or is not a directory: %s", watch_dir)
+        return
+
+    present = set()
+    for p in watch_dir.iterdir():
+        if not _is_audio(p):
+            continue
+        try:
+            st = p.stat()
+        except OSError:
+            # Removed between listing and stat (e.g. by a finished job).
+            continue
+
+        key = str(p)
+        present.add(key)
+        signature = (st.st_size, st.st_mtime_ns)
+        previous = _file_state.get(key)
+
+        if previous is None or previous[0] != signature:
+            # New or still being written: re-check on the next scan.
+            _file_state[key] = (signature, False)
+            continue
+        if previous[1]:
+            continue
+
+        logger.info("Found audio file in WATCH_DIR: %s", p)
+        if _enqueue_file(p):
+            _file_state[key] = (signature, True)
+
+    # Forget files that are gone so a later file with the same name is new.
+    for key in set(_file_state) - present:
+        del _file_state[key]
+
+
+def _poll_interval() -> float:
+    """Read WATCH_POLL_INTERVAL; fall back to the default if unset or invalid."""
+    raw = os.getenv("WATCH_POLL_INTERVAL", "").strip()
+    if not raw:
+        return DEFAULT_POLL_INTERVAL
+    try:
+        value = float(raw)
+    except ValueError:
+        value = float("nan")
+    # Rejects nan, inf, zero and negatives, which time.sleep cannot use sanely.
+    if not (value > 0 and value != float("inf")):
+        logger.warning("Invalid WATCH_POLL_INTERVAL %r, using %s seconds.", raw, DEFAULT_POLL_INTERVAL)
+        return DEFAULT_POLL_INTERVAL
+    return value
+
+
+def start_watcher():
+    """
+    Start watch-folder loop in a background thread.
+
+    Does nothing unless WATCH_ENABLED is truthy and both WATCH_DIR and a
+    destination email are configured. Configuration problems are logged and
+    disable the watcher instead of raising, so the Web GUI still starts.
+    """
+    enabled = os.getenv("WATCH_ENABLED", "false").strip().lower() in ("true", "1", "yes")
+    if not enabled:
+        return
+
+    watch_dir = os.getenv("WATCH_DIR")
+    if not watch_dir:
+        logger.warning("WATCH_ENABLED is true but WATCH_DIR is not set. Watcher disabled.")
+        return
+
+    # Same fallback as process_watch_file_task, which sends the emails.
+    email_to = os.getenv("WATCH_EMAIL_TO") or os.getenv("EMAIL_DEFAULT_TO")
+    if not email_to:
+        logger.warning(
+            "WATCH_ENABLED is true but neither WATCH_EMAIL_TO nor EMAIL_DEFAULT_TO is set. "
+            "Watcher disabled."
+        )
+        return
+
+    interval = _poll_interval()
+
+    try:
+        watch_path = Path(watch_dir).expanduser().resolve()
+        watch_path.mkdir(parents=True, exist_ok=True)
+    except (OSError, RuntimeError) as e:
+        logger.warning("Cannot prepare WATCH_DIR %s: %s. Watcher disabled.", watch_dir, e)
+        return
+
+    logger.info("Starting watch-folder: dir=%s, email=%s, interval=%s", watch_dir, email_to, interval)
+
+    def _loop():
+        while True:
+            try:
+                _scan_directory(watch_path)
+            except Exception as e:
+                logger.error("Error scanning WATCH_DIR: %s", e, exc_info=True)
+            time.sleep(interval)
+
+    t = threading.Thread(target=_loop, daemon=True)
+    t.start()
+