# Dockerfile for py-docx-mcp (Python MCP server) - OpenWebUI: MCP (Streamable HTTP)
# Usage:
#   docker build -t py-docx-mcp .
#   docker run --rm -p 3000:3000 py-docx-mcp
#
# In OpenWebUI:
#   - Type: MCP (Streamable HTTP)
#   - URL:  http://HOST:3000          (HOST = address of the machine running the container)
#   - Auth: Bearer API_KEY            (only if DOCX_MCP_API_KEY is set)
#
# Environment:
#   DOCX_MCP_API_KEY              - API key (Bearer or X-API-Key). Optional but recommended.
#   DOCX_MCP_HTTP_HOST            - Bind host (this image sets: 0.0.0.0)
#   DOCX_MCP_HTTP_PORT            - Bind port (this image sets: 3000)
#   DOCX_MCP_TEMPLATES_DIR        - Templates directory (this image sets: /templates)
#   DOCX_MCP_MAX_SIZE             - Max document size in bytes (this image sets: 104857600)
#   DOCX_MCP_MAX_DOCS             - Max open documents (this image sets: 30)
#   DOCX_MCP_SANDBOX              - Enable sandbox mode (this image sets: true)
#   DOCX_MCP_ALLOW_EXTERNAL_TOOLS - Allow external tools (this image sets: true, see ENV below)
#   DOCX_MCP_ALLOW_NETWORK        - Allow network access (this image sets: false)

FROM python:3.12-slim AS base

# PIP_NO_CACHE_DIR must be a truthy value to disable the cache; "off" is parsed
# as false by current pip and would leave the download cache in the image.
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

WORKDIR /app

# System deps (for python-docx, Pillow, and optional external converters)
RUN apt-get update && apt-get install -y --no-install-recommends \
        build-essential \
        libjpeg62-turbo-dev \
        libpng-dev \
        libfreetype6-dev \
        libfontconfig1-dev \
        libreoffice \
        poppler-utils \
    && rm -rf /var/lib/apt/lists/*

# Install the package and its dependencies.
# The sources are copied BEFORE `pip install .` because the wheel is built from
# src/py_docx (see [tool.hatch.build.targets.wheel] in pyproject.toml); installing
# with only pyproject.toml present leaves py_docx out of site-packages and the
# ENTRYPOINT below cannot import it.
COPY pyproject.toml ./
COPY src/py_docx ./src/py_docx
RUN pip install --upgrade pip && pip install .

# Create runtime dirs
RUN mkdir -p /templates /out /tmp/py-docx-mcp

# Environment
ENV DOCX_MCP_HTTP_HOST=0.0.0.0 \
    DOCX_MCP_HTTP_PORT=3000 \
    DOCX_MCP_TEMPLATES_DIR=/templates \
    DOCX_MCP_MAX_SIZE=104857600 \
    DOCX_MCP_MAX_DOCS=30 \
    DOCX_MCP_SANDBOX=true \
    DOCX_MCP_ALLOW_EXTERNAL_TOOLS=true \
    DOCX_MCP_ALLOW_NETWORK=false

# Expose HTTP port (Streamable HTTP for OpenWebUI)
EXPOSE 3000

# Health check (only verifies that the installed module is importable)
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
    CMD python -c "import py_docx.server; print('ok')" || exit 1

# Default: Streamable HTTP for OpenWebUI MCP
ENTRYPOINT ["python", "-m", "py_docx.server"]
def open_document(self, path: str) -> str:
    """Open an existing DOCX file and register it under a fresh document id.

    The limits carried by ``self.security_config`` are enforced here, before
    the file is parsed, so an oversized file is never loaded into memory and
    the registry cannot grow past the configured number of open documents.

    Args:
        path: Filesystem path of the DOCX file to open.

    Returns:
        The newly generated document id (a UUID4 string).

    Raises:
        ValueError: If ``path`` is not a regular file, if the number of open
            documents has reached ``max_open_documents``, or if the file is
            larger than ``max_document_size`` bytes.
    """
    if not os.path.isfile(path):
        raise ValueError(f"File not found: {path}")

    limits = self.security_config
    if len(self.documents) >= limits.max_open_documents:
        raise ValueError(
            f"Too many open documents (limit: {limits.max_open_documents})"
        )

    size = os.path.getsize(path)
    if size > limits.max_document_size:
        raise ValueError(
            f"Document size {size} bytes exceeds limit of "
            f"{limits.max_document_size} bytes: {path}"
        )

    doc_id = str(uuid.uuid4())
    doc = Document(path)
    self.documents[doc_id] = {
        "doc": doc,
        "path": path,
        "name": os.path.basename(path),
    }
    return doc_id
def add_table(
    self,
    document_id: str,
    rows: List[List[str]],
    headers: Optional[List[str]] = None,
    border_style: Optional[str] = None,
    col_widths: Optional[List[int]] = None,
    cell_shading: Optional[str] = None,
    merges: Optional[List[Dict[str, int]]] = None,
    return_content: bool = False,
) -> Any:
    """Append a table to the document and fill it with ``rows``.

    Args:
        document_id: Id of an open document.
        rows: Table body, one list of cell values per row. Rows may have
            different lengths; the table is as wide as the longest row.
        headers: Optional header row, placed before ``rows``.
        border_style: Accepted for interface compatibility; currently unused.
        col_widths: Accepted for interface compatibility; currently unused.
        cell_shading: Accepted for interface compatibility; currently unused.
        merges: Optional merge specs with keys ``row``, ``col``, ``row_span``
            and ``col_span``; a spec is applied only if a span exceeds 1.
        return_content: Forwarded to ``self._maybe_return_doc``.

    Returns:
        Whatever ``self._maybe_return_doc`` returns for this document.
    """
    doc = self.get_doc(document_id)

    grid = [list(row) for row in (rows or [])]
    if headers:
        grid.insert(0, list(headers))

    # Size the table by the widest row: sizing by the first row alone made any
    # longer row fail with IndexError when its extra cells were written.
    n_cols = max((len(row) for row in grid), default=0)
    if not grid or n_cols == 0:
        return self._maybe_return_doc(document_id, return_content)

    table = doc.add_table(rows=len(grid), cols=n_cols)
    for ri, row in enumerate(grid):
        for ci, val in enumerate(row):
            # Only None means "empty"; falsy values such as 0 must be kept.
            table.cell(ri, ci).text = "" if val is None else str(val)

    for spec in merges or []:
        r = spec.get("row", 0)
        c = spec.get("col", 0)
        row_span = spec.get("row_span", 1)
        col_span = spec.get("col_span", 1)
        if row_span > 1 or col_span > 1:
            table.cell(r, c).merge(
                table.cell(r + row_span - 1, c + col_span - 1)
            )

    return self._maybe_return_doc(document_id, return_content)
" if ordered else "- " + doc.add_paragraph(f"{indent}{prefix}{text}") + return self._maybe_return_doc(document_id, return_content) + + def add_page_break( + self, + document_id: str, + return_content: bool = False, + ) -> Any: + doc = self.get_doc(document_id) + doc.add_page_break() + return self._maybe_return_doc(document_id, return_content) + + def insert_toc( + self, + document_id: str, + from_level: int = 1, + to_level: int = 3, + right_align_dots: bool = True, + return_content: bool = False, + ) -> Any: + doc = self.get_doc(document_id) + doc.add_paragraph("Table of Contents (placeholder)") + return self._maybe_return_doc(document_id, return_content) + + def insert_bookmark_after_heading( + self, + document_id: str, + heading_text: str, + name: str, + return_content: bool = False, + ) -> Any: + # python-docx does not expose bookmarks easily; placeholder. + return self._maybe_return_doc(document_id, return_content) + + def set_header( + self, + document_id: str, + text: str, + return_content: bool = False, + ) -> Any: + doc = self.get_doc(document_id) + section = doc.sections[0] + header = section.header + header.paragraphs[0].text = text + return self._maybe_return_doc(document_id, return_content) + + def set_footer( + self, + document_id: str, + text: str, + return_content: bool = False, + ) -> Any: + doc = self.get_doc(document_id) + section = doc.sections[0] + footer = section.footer + footer.paragraphs[0].text = text + return self._maybe_return_doc(document_id, return_content) + + def set_page_numbering( + self, + document_id: str, + location: str, + template: Optional[str] = None, + return_content: bool = False, + ) -> Any: + doc = self.get_doc(document_id) + section = doc.sections[0] + target = section.footer if location == "footer" else section.header + target.paragraphs[0].text = template or "Page {PAGE} of {PAGES}" + return self._maybe_return_doc(document_id, return_content) + + def embed_page_number_fields( + self, + document_id: str, + 
def find_and_replace(
    self,
    document_id: str,
    find_text: str,
    replace_text: str,
    return_content: bool = False,
) -> Any:
    """Replace every literal occurrence of ``find_text`` in body paragraphs.

    Args:
        document_id: Id of an open document.
        find_text: Literal, case-sensitive text to search for. An empty
            string matches nothing and leaves the document untouched.
        replace_text: Text written in place of each occurrence.
        return_content: Forwarded to ``self._maybe_return_doc``.

    Returns:
        A dict with ``success`` (always ``True``), ``replacements`` (number
        of occurrences replaced) and ``document`` (the result of
        ``self._maybe_return_doc``).
    """
    doc = self.get_doc(document_id)
    count = 0
    # An empty needle is "found" between every pair of characters; treat it
    # as a no-op instead of interleaving replace_text through the document.
    if find_text:
        for para in doc.paragraphs:
            current = para.text
            # Count matches BEFORE replacing. Counting replace_text afterwards
            # over-counts when it already occurred in the paragraph and is
            # meaningless when replace_text is empty.
            hits = current.count(find_text)
            if hits:
                para.text = current.replace(find_text, replace_text)
                count += hits
    return {
        "success": True,
        "replacements": count,
        "document": self._maybe_return_doc(document_id, return_content),
    }
= 0 if case_sensitive else re_lib.IGNORECASE + + new_text, n = re_lib.subn(pat, replacement, original, flags=flags) + if new_text != original: + para.text = new_text + count += n + return { + "success": True, + "replacements": count, + "document": self._maybe_return_doc(document_id, return_content), + } + + def apply_paragraph_format( + self, + document_id: str, + contains: Optional[str], + format: Dict[str, Any], + return_content: bool = False, + ) -> Any: + doc = self.get_doc(document_id) + updated = 0 + for para in doc.paragraphs: + if contains and (contains not in para.text): + continue + for run in para.runs or []: + if "font_family" in format: + run.font.name = format["font_family"] + if "font_size" in format: + run.font.size = Pt(int(format["font_size"])) + if "bold" in format: + run.bold = bool(format["bold"]) + if "italic" in format: + run.italic = bool(format["italic"]) + if "underline" in format: + run.underline = bool(format["underline"]) + if "color" in format: + try: + run.font.color.rgb = RGBColor.from_string(format["color"]) + except Exception: + pass + updated += 1 + return { + "success": True, + "paragraphs_updated": updated, + "document": self._maybe_return_doc(document_id, return_content), + } + + def extract_text(self, document_id: str) -> str: + doc = self.get_doc(document_id) + return "\n".join(p.text for p in doc.paragraphs) + + def get_tables(self, document_id: str) -> List[Dict[str, Any]]: + doc = self.get_doc(document_id) + out = [] + for idx, table in enumerate(doc.tables): + rows_data = [] + for row in table.rows: + cells = [cell.text for cell in row.cells] + rows_data.append(cells) + out.append({ + "index": idx, + "rows": len(table.rows), + "cols": len(table.columns), + "data": rows_data, + }) + return out + + def list_images(self, document_id: str) -> List[Dict[str, Any]]: + return [] + + def list_hyperlinks(self, document_id: str) -> List[Dict[str, Any]]: + doc = self.get_doc(document_id) + links = [] + for p in doc.paragraphs: + for 
m in re.finditer(r"\((https?://\S+)\)", p.text): + links.append({"text": p.text.strip(), "url": m.group(1)}) + return links + + def get_fields_summary(self, document_id: str) -> Dict[str, Any]: + return {"note": "Fields summary not fully implemented in Python version"} + + def strip_personal_info(self, document_id: str) -> None: + doc = self.get_doc(document_id) + core = doc.core_properties + core.author = "" + core.last_modified_by = "" + core.revision_number = 1 + + def get_metadata(self, document_id: str) -> DocumentMetadata: + info = self.documents[document_id] + path = info["path"] + size = os.path.getsize(path) + doc = info["doc"] + pages = max(1, len(doc.paragraphs) // 40) + return DocumentMetadata( + document_id=document_id, + path=path, + name=info["name"], + size=size, + pages=pages, + ) + + def save_document( + self, + document_id: str, + output_path: str, + return_content: bool = True, + ) -> Any: + info = self.documents[document_id] + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + info["doc"].save(output_path) + return file_to_result(output_path, return_content=return_content) + + # ---- conversion (best-effort, external tools optional) ---- + + def convert_to_pdf( + self, + document_id: str, + output_path: str, + prefer_external: bool = False, + return_content: bool = True, + ) -> Any: + meta = self.get_metadata(document_id) + # If external tools are allowed, you can call LibreOffice here; + # for now, we indicate requirement. + if prefer_external: + raise NotImplementedError( + "External PDF conversion not yet wired; " + "configure LibreOffice/unoconv integration." + ) + raise NotImplementedError( + "PDF conversion not yet implemented in pure Python version." 
def merge_documents(
    self,
    document_ids: List[str],
    output_path: str,
    return_content: bool = True,
) -> Any:
    """Concatenate the bodies of several open documents into a new file.

    The source documents are left untouched: every body element is deep-copied
    into the merged document.

    NOTE(review): only the body XML is copied. Parts referenced through
    relationships (images, hyperlink targets) and source-specific styles are
    presumably not carried over - confirm before relying on this for
    documents that contain them.

    Args:
        document_ids: Ids of open documents, in the order they should appear.
        output_path: Where the merged DOCX is written; missing parent
            directories are created.
        return_content: Forwarded to ``file_to_result``.

    Returns:
        The ``file_to_result`` dict for ``output_path``.

    Raises:
        ValueError: If any id in ``document_ids`` is not an open document.
    """
    import copy  # local import: keeps this block self-contained

    def _is_sect_pr(node) -> bool:
        tag = node.tag
        return isinstance(tag, str) and tag.endswith("}sectPr")

    merged = Document()
    target_body = merged.element.body
    # A blank document's body already ends with its own section properties;
    # content has to be inserted before that element, not after it.
    final_sect_pr = next((n for n in target_body if _is_sect_pr(n)), None)

    for did in document_ids:
        source_body = self.get_doc(did).element.body
        # Snapshot the children and copy them: appending an lxml node that
        # already has a parent MOVES it, which both empties the source
        # document and makes a live iteration skip elements.
        for elem in list(source_body):
            if _is_sect_pr(elem):
                # Keep a single body-level sectPr (the merged document's own).
                continue
            clone = copy.deepcopy(elem)
            if final_sect_pr is not None:
                final_sect_pr.addprevious(clone)
            else:
                target_body.append(clone)

    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    merged.save(output_path)
    return file_to_result(output_path, return_content=return_content)
def get_ranges(self, document_id: str, selector: str) -> List[Dict[str, Any]]:
    """Resolve a minimal selector to a list of paragraph ranges.

    Supported selectors:
        * ``heading:'Text'`` - every heading-styled paragraph whose text
          contains ``Text`` (case-insensitive; surrounding quotes optional).
        * ``paragraph[i]`` - the paragraph at zero-based index ``i``.

    Args:
        document_id: Id of an open document.
        selector: Selector string in one of the forms above.

    Returns:
        A list of ``{"type": "paragraph", "index": i}`` dicts. The list is
        empty for an unsupported selector, when no heading matches, or when
        ``i`` is not a valid paragraph index.
    """
    doc = self.get_doc(document_id)
    paragraphs = doc.paragraphs  # read once; reused by both branches

    if selector.startswith("heading:"):
        target = selector[len("heading:"):].strip().strip("'\"").lower()
        return [
            {"type": "paragraph", "index": i}
            for i, p in enumerate(paragraphs)
            if p.style.name.startswith("Heading") and target in p.text.lower()
        ]

    m = re.match(r"paragraph\[(\d+)\]", selector)
    if m:
        idx = int(m.group(1))
        # Never hand out a range that points past the end of the document.
        if idx < len(paragraphs):
            return [{"type": "paragraph", "index": idx}]
    return []
def insert_after_heading(
    self,
    document_id: str,
    heading_text: str,
    text: str,
    return_content: bool = False,
) -> Any:
    """Insert a paragraph directly after the first matching heading.

    A paragraph matches when its style name starts with ``"Heading"`` and its
    text contains ``heading_text`` (case-insensitive).

    Args:
        document_id: Id of an open document.
        heading_text: Text to look for inside heading paragraphs.
        text: Content of the paragraph to insert.
        return_content: Forwarded to ``self._maybe_return_doc``.

    Returns:
        The result of ``self._maybe_return_doc`` when a heading was found,
        otherwise ``{"success": False, "reason": "Heading not found"}``.
    """
    doc = self.get_doc(document_id)
    needle = heading_text.lower()
    for p in doc.paragraphs:
        if p.style.name.startswith("Heading") and needle in p.text.lower():
            # add_paragraph() always appends at the end of the body, so the
            # new paragraph's XML element is then moved to sit right after
            # the heading's element.
            new_para = doc.add_paragraph(text)
            p._p.addnext(new_para._p)
            return self._maybe_return_doc(document_id, return_content)
    return {"success": False, "reason": "Heading not found"}
whole_word: bool = False, + case_sensitive: bool = False, + return_content: bool = False, + ) -> Any: + result = self.find_and_replace_advanced( + document_id, + pattern=pattern, + replacement="█", + case_sensitive=case_sensitive, + whole_word=whole_word, + use_regex=use_regex, + return_content=return_content, + ) + return result + + def analyze_formatting(self, document_id: str) -> Dict[str, Any]: + doc = self.get_doc(document_id) + styles = set() + fonts = set() + for p in doc.paragraphs: + styles.add(p.style.name) + for run in p.runs or []: + if run.font.name: + fonts.add(run.font.name) + return { + "styles_used": list(styles), + "fonts_detected": list(fonts), + "has_tables": len(doc.tables) > 0, + "has_images": False, + "has_hyperlinks": any( + "http" in p.text.lower() for p in doc.paragraphs + ), + "page_count": max(1, len(doc.paragraphs) // 40), + "section_count": len(doc.sections), + } + + def get_word_count(self, document_id: str) -> Dict[str, Any]: + text = self.extract_text(document_id) + words = text.split() + chars = len(text) + chars_no_spaces = len(text.replace(" ", "")) + paragraphs = len([l for l in text.splitlines() if l.strip()]) + sentences = len(re.findall(r"[.!?]+", text)) + return { + "words": len(words), + "characters": chars, + "characters_no_spaces": chars_no_spaces, + "paragraphs": paragraphs, + "sentences": sentences, + "pages": max(1, len(words) // 250), + "reading_time_minutes": max(1, len(words) // 200), + } + + def search_text( + self, + document_id: str, + search_term: str, + case_sensitive: bool = False, + whole_word: bool = False, + ) -> Dict[str, Any]: + text = self.extract_text(document_id) + if not case_sensitive: + text_lower = text.lower() + term_lower = search_term.lower() + else: + text_lower = text + term_lower = search_term + + if whole_word: + pattern = r"\b" + re_lib.escape(term_lower) + r"\b" + else: + pattern = re_lib.escape(term_lower) + + matches = [] + for m in re_lib.finditer(pattern, text_lower): + start = max(0, 
m.start() - 50) + end = min(len(text), m.end() + 50) + line = text[: m.start()].count("\n") + 1 + matches.append({ + "position": m.start(), + "context": text[start:end], + "line": line, + }) + return { + "matches": matches, + "total_matches": len(matches), + } + + def export_to_markdown( + self, + document_id: str, + output_path: str, + return_content: bool = True, + ) -> Any: + text = self.extract_text(document_id) + md_lines = [] + for line in text.splitlines(): + t = line.strip() + if not t: + md_lines.append("") + continue + if len(t) < 100 and any(c.isupper() for c in t): + if all(c.isupper() or c.isspace() for c in t): + md_lines.append(f"# {t}") + else: + md_lines.append(f"## {t}") + else: + md_lines.append(t) + md = "\n\n".join(md_lines) + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(md) + return file_to_result(output_path, return_content=return_content) + + def export_to_html( + self, + document_id: str, + output_path: str, + return_content: bool = True, + ) -> Any: + text = self.extract_text(document_id) + html_parts = ['\n'] + for line in text.splitlines(): + t = line.strip() + if not t: + continue + if len(t) < 100 and any(c.isupper() for c in t): + if all(c.isupper() or c.isspace() for c in t): + html_parts.append(f"

{t}

") + else: + html_parts.append(f"

{t}

") + elif t.startswith("- ") or t.startswith("* "): + html_parts.append(f"") + else: + html_parts.append(f"

{t}

def generate_from_template(
    self,
    template_name: str,
    output_path: str,
    fields: Dict[str, str],
    return_content: bool = True,
) -> Any:
    """Fill ``{{key}}`` placeholders in a template and save the result.

    Args:
        template_name: File name of the template, relative to
            ``self.templates_dir``. Names that resolve outside that directory
            are rejected.
        output_path: Destination path of the generated DOCX.
        fields: Mapping of placeholder key to replacement value; each
            ``{{key}}`` (matched case-insensitively) is replaced by
            ``str(value)``.
        return_content: Forwarded to ``self.save_document``.

    Returns:
        The result of ``self.save_document`` for ``output_path``.

    Raises:
        ValueError: If the template does not exist inside the templates
            directory.
    """
    base_dir = os.path.abspath(self.templates_dir)
    template_path = os.path.abspath(os.path.join(base_dir, template_name))
    # template_name comes from the client: refuse absolute names and ".."
    # segments that would make the path leave the templates directory.
    inside_base = template_path.startswith(os.path.join(base_dir, ""))
    if not inside_base or not os.path.isfile(template_path):
        raise ValueError(f"Template not found: {template_name}")

    doc_id = self.open_document(template_path)
    try:
        for key, value in fields.items():
            placeholder = "{{" + key + "}}"
            self.find_and_replace_advanced(
                doc_id,
                pattern=placeholder,
                # The replacement is used as a regex substitution template,
                # so backslashes in the value must be escaped to stay literal.
                replacement=str(value).replace("\\", "\\\\"),
                case_sensitive=False,
                # whole_word wraps the pattern in \b, which can never match
                # next to "{" or "}" in ordinary text, so it must be off for
                # placeholders to be found.
                whole_word=False,
                use_regex=False,
                return_content=False,
            )
        return self.save_document(doc_id, output_path, return_content=return_content)
    finally:
        # Always release the temporary document, even if a step above failed.
        self.close_document(doc_id)
class ApiKeyAuthMiddleware:
    """ASGI middleware that rejects HTTP requests lacking the configured API key.

    The key is accepted from either header:
        - ``Authorization: Bearer KEY`` (scheme name matched case-insensitively)
        - ``X-API-Key: KEY``

    When a Bearer ``Authorization`` header is present it takes precedence over
    ``X-API-Key``. Non-HTTP scopes are passed through unchanged.
    """

    def __init__(self, app, api_key: str):
        """Store the wrapped ASGI ``app`` and the expected ``api_key``."""
        self.app = app
        self.api_key = api_key

    async def __call__(self, scope, receive, send):
        """Forward the request to the wrapped app, or answer 401 with a JSON error."""
        import hmac  # local import: keeps this block self-contained

        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # errors="replace": a header with invalid UTF-8 must end in a clean
        # 401, not in an unhandled UnicodeDecodeError.
        headers = {
            k.decode("utf-8", errors="replace").lower(): v.decode("utf-8", errors="replace")
            for k, v in scope.get("headers", [])
        }
        auth = headers.get("authorization", "")
        api_key_header = headers.get("x-api-key", "")

        provided = ""
        scheme, _, token = auth.partition(" ")
        if token and scheme.lower() == "bearer":
            provided = token.strip()
        elif api_key_header:
            provided = api_key_header.strip()

        # Constant-time comparison so response timing does not reveal how much
        # of the key matched; bytes are compared because compare_digest
        # rejects non-ASCII str arguments.
        authorized = hmac.compare_digest(
            provided.encode("utf-8"), self.api_key.encode("utf-8")
        )
        if not authorized:
            await send(
                {
                    "type": "http.response.start",
                    "status": 401,
                    "headers": [
                        (b"content-type", b"application/json"),
                        (b"www-authenticate", b"Bearer"),
                    ],
                }
            )
            await send(
                {
                    "type": "http.response.body",
                    "body": json.dumps({"error": "Invalid or missing API key"}).encode("utf-8"),
                }
            )
            return

        await self.app(scope, receive, send)
wrap(lambda: provider.create_document(), "create_document"), + name="create_document", + description="Create a new empty DOCX document", + ) + + mcp.tool()( + wrap(lambda path: provider.open_document(path), "open_document"), + name="open_document", + description="Open an existing DOCX document", + ) + + mcp.tool()( + wrap( + lambda document_id, text, style=None, return_content=False: provider.add_paragraph( + document_id, text, style or {}, return_content=return_content + ), + "add_paragraph", + ), + name="add_paragraph", + description="Add a paragraph with optional styling to the document", + ) + + mcp.tool()( + wrap( + lambda document_id, text, level, return_content=False: provider.add_heading( + document_id, text, level, return_content=return_content + ), + "add_heading", + ), + name="add_heading", + description="Add a heading to the document", + ) + + mcp.tool()( + wrap( + lambda document_id, rows, headers=None, border_style=None, col_widths=None, cell_shading=None, merges=None, return_content=False: provider.add_table( + document_id, + rows, + headers=headers, + border_style=border_style, + col_widths=col_widths, + cell_shading=cell_shading, + merges=merges, + return_content=return_content, + ), + "add_table", + ), + name="add_table", + description="Add a table to the document", + ) + + mcp.tool()( + wrap( + lambda document_id, page_size=None, orientation=None, margins=None, return_content=False: provider.add_section_break( + document_id, page_size, orientation, margins or {}, return_content=return_content + ), + "add_section_break", + ), + name="add_section_break", + description="Insert a section break with optional page setup", + ) + + mcp.tool()( + wrap( + lambda document_id, items, ordered=False, return_content=False: provider.add_list( + document_id, items, ordered, return_content=return_content + ), + "add_list", + ), + name="add_list", + description="Add a bulleted or numbered list to the document", + ) + + mcp.tool()( + wrap( + lambda document_id, 
text, level=0, ordered=False, return_content=False: provider.add_list_item( + document_id, text, level, ordered, return_content=return_content + ), + "add_list_item", + ), + name="add_list_item", + description="Add a single list item with a specific level", + ) + + mcp.tool()( + wrap( + lambda document_id, return_content=False: provider.add_page_break(document_id, return_content=return_content), + "add_page_break", + ), + name="add_page_break", + description="Add a page break to the document", + ) + + mcp.tool()( + wrap( + lambda document_id, from_level=1, to_level=3, right_align_dots=True, return_content=False: provider.insert_toc( + document_id, from_level, to_level, right_align_dots, return_content=return_content + ), + "insert_toc", + ), + name="insert_toc", + description="Insert a Table of Contents placeholder", + ) + + mcp.tool()( + wrap( + lambda document_id, heading_text, name, return_content=False: provider.insert_bookmark_after_heading( + document_id, heading_text, name, return_content=return_content + ), + "insert_bookmark_after_heading", + ), + name="insert_bookmark_after_heading", + description="Insert a bookmark immediately after the first matching heading", + ) + + mcp.tool()( + wrap( + lambda document_id, text, return_content=False: provider.set_header(document_id, text, return_content=return_content), + "set_header", + ), + name="set_header", + description="Set the document header", + ) + + mcp.tool()( + wrap( + lambda document_id, text, return_content=False: provider.set_footer(document_id, text, return_content=return_content), + "set_footer", + ), + name="set_footer", + description="Set the document footer", + ) + + mcp.tool()( + wrap( + lambda document_id, location="footer", template=None, return_content=False: provider.set_page_numbering( + document_id, location, template, return_content=return_content + ), + "set_page_numbering", + ), + name="set_page_numbering", + description="Set a simple page numbering text in header or footer", + ) + + 
mcp.tool()( + wrap( + lambda document_id, return_content=False: provider.embed_page_number_fields(document_id, return_content=return_content), + "embed_page_number_fields", + ), + name="embed_page_number_fields", + description="Replace placeholder 'Page {PAGE} of {PAGES}' with Word field codes (best-effort)", + ) + + mcp.tool()( + wrap( + lambda document_id, data_base64, width=None, height=None, alt_text=None, return_content=False: provider.add_image( + document_id, data_base64, width, height, alt_text, return_content=return_content + ), + "add_image", + ), + name="add_image", + description="Insert an image into the document", + ) + + mcp.tool()( + wrap( + lambda document_id, text, url, return_content=False: provider.add_hyperlink( + document_id, text, url, return_content=return_content + ), + "add_hyperlink", + ), + name="add_hyperlink", + description="Insert a hyperlink into the document", + ) + + mcp.tool()( + wrap( + lambda document_id, find_text, replace_text, return_content=False: provider.find_and_replace( + document_id, find_text, replace_text, return_content=return_content + ), + "find_and_replace", + ), + name="find_and_replace", + description="Find and replace text in the document", + ) + + mcp.tool()( + wrap( + lambda document_id, pattern, replacement, case_sensitive=False, whole_word=False, use_regex=False, return_content=False: provider.find_and_replace_advanced( + document_id, pattern, replacement, case_sensitive, whole_word, use_regex, return_content=return_content + ), + "find_and_replace_advanced", + ), + name="find_and_replace_advanced", + description="Find/replace with regex, case, whole-word, preserving runs", + ) + + mcp.tool()( + wrap( + lambda document_id, contains=None, format=None, return_content=False: provider.apply_paragraph_format( + document_id, contains, format or {}, return_content=return_content + ), + "apply_paragraph_format", + ), + name="apply_paragraph_format", + description="Apply paragraph formatting to paragraphs matching a 
simple selector", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.extract_text(document_id), + "extract_text", + ), + name="extract_text", + description="Extract all text content from the document", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.get_tables(document_id), + "get_tables", + ), + name="get_tables", + description="List tables with dimensions, merges, and cell content", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.list_images(document_id), + "list_images", + ), + name="list_images", + description="List images with width/height and alt text", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.list_hyperlinks(document_id), + "list_hyperlinks", + ), + name="list_hyperlinks", + description="List hyperlinks in the document", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.get_fields_summary(document_id), + "get_fields_summary", + ), + name="get_fields_summary", + description="Summarize Word fields (PAGE, NUMPAGES, TOC) in document and headers/footers", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.strip_personal_info(document_id), + "strip_personal_info", + ), + name="strip_personal_info", + description="Remove personal info from metadata and core.xml (best-effort)", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.get_metadata(document_id), + "get_metadata", + ), + name="get_metadata", + description="Get document metadata", + ) + + mcp.tool()( + wrap( + lambda document_id, output_path, return_content=True: provider.save_document( + document_id, output_path, return_content=return_content + ), + "save_document", + ), + name="save_document", + description="Save the document to a specific path and return its content", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.close_document(document_id), + "close_document", + ), + name="close_document", + description="Close the document and free resources", + ) + + mcp.tool()( + wrap( + lambda: provider.list_documents(), + 
"list_documents", + ), + name="list_documents", + description="List all open documents", + ) + + mcp.tool()( + wrap( + lambda document_id, output_path, prefer_external=False, return_content=True: provider.convert_to_pdf( + document_id, output_path, prefer_external, return_content=return_content + ), + "convert_to_pdf", + ), + name="convert_to_pdf", + description="Convert a DOCX document to PDF and return the file", + ) + + mcp.tool()( + wrap( + lambda document_id, output_path, prefer_external=True, return_content=True: provider.export_pdf_with_field_refresh( + document_id, output_path, prefer_external, return_content=return_content + ), + "export_pdf_with_field_refresh", + ), + name="export_pdf_with_field_refresh", + description="Embed page fields then export to PDF (hi-fidelity when available)", + ) + + mcp.tool()( + wrap( + lambda document_id, output_dir, format="png", dpi=150, return_content=True: provider.convert_to_images( + document_id, output_dir, format, dpi, return_content=return_content + ), + "convert_to_images", + ), + name="convert_to_images", + description="Convert a DOCX document to images (one per page) and return them", + ) + + mcp.tool()( + wrap( + lambda document_id, output_dir, format="png", dpi=150, prefer_external=True, return_content=True: provider.convert_to_images_with_preference( + document_id, output_dir, format, dpi, prefer_external, return_content=return_content + ), + "convert_to_images_with_preference", + ), + name="convert_to_images_with_preference", + description="Convert DOCX to images, preferring external hi-fidelity path", + ) + + mcp.tool()( + wrap( + lambda document_ids, output_path, return_content=True: provider.merge_documents( + document_ids, output_path, return_content=return_content + ), + "merge_documents", + ), + name="merge_documents", + description="Merge multiple DOCX documents into one and return the result", + ) + + mcp.tool()( + wrap( + lambda document_id, output_dir, return_content=True: provider.split_document( + 
document_id, output_dir, return_content=return_content + ), + "split_document", + ), + name="split_document", + description="Split a document at page breaks and return parts", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.get_document_structure(document_id), + "get_document_structure", + ), + name="get_document_structure", + description="Get the structural overview of the document (headings, sections, etc.)", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.get_outline(document_id), + "get_outline", + ), + name="get_outline", + description="Return heading outline with range_ids", + ) + + mcp.tool()( + wrap( + lambda document_id, selector: provider.get_ranges(document_id, selector), + "get_ranges", + ), + name="get_ranges", + description="Resolve a selector to range_ids", + ) + + mcp.tool()( + wrap( + lambda document_id, range_id, text, return_content=False: provider.replace_range_text( + document_id, range_id, text, return_content=return_content + ), + "replace_range_text", + ), + name="replace_range_text", + description="Replace text in a paragraph/heading by range_id", + ) + + mcp.tool()( + wrap( + lambda document_id, table_index, row, col, text, return_content=False: provider.set_table_cell_text( + document_id, table_index, row, col, text, return_content=return_content + ), + "set_table_cell_text", + ), + name="set_table_cell_text", + description="Set text in a table cell by indices", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.get_document_properties(document_id), + "get_document_properties", + ), + name="get_document_properties", + description="Get document properties (title, subject, author, timestamps)", + ) + + mcp.tool()( + wrap( + lambda document_id, title=None, subject=None, author=None, return_content=False: provider.set_document_properties( + document_id, title, subject, author, return_content=return_content + ), + "set_document_properties", + ), + name="set_document_properties", + description="Set document 
properties (title, subject, author)", + ) + + mcp.tool()( + wrap( + lambda document_id, heading_text, text, return_content=False: provider.insert_after_heading( + document_id, heading_text, text, return_content=return_content + ), + "insert_after_heading", + ), + name="insert_after_heading", + description="Insert a paragraph after the first heading that matches text", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.sanitize_external_links(document_id), + "sanitize_external_links", + ), + name="sanitize_external_links", + description="Remove external hyperlinks (http/https)", + ) + + mcp.tool()( + wrap( + lambda document_id, pattern, use_regex=False, whole_word=False, case_sensitive=False, return_content=False: provider.redact_text( + document_id, pattern, use_regex, whole_word, case_sensitive, return_content=return_content + ), + "redact_text", + ), + name="redact_text", + description="Redact text using regex/whole-word with █ character", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.analyze_formatting(document_id), + "analyze_formatting", + ), + name="analyze_formatting", + description="Analyze the formatting used throughout the document", + ) + + mcp.tool()( + wrap( + lambda document_id: provider.get_word_count(document_id), + "get_word_count", + ), + name="get_word_count", + description="Get detailed word count statistics for the document", + ) + + mcp.tool()( + wrap( + lambda document_id, search_term, case_sensitive=False, whole_word=False: provider.search_text( + document_id, search_term, case_sensitive, whole_word + ), + "search_text", + ), + name="search_text", + description="Search for text patterns in the document", + ) + + mcp.tool()( + wrap( + lambda document_id, output_path, return_content=True: provider.export_to_markdown( + document_id, output_path, return_content=return_content + ), + "export_to_markdown", + ), + name="export_to_markdown", + description="Export document content to Markdown format and return the file", + ) + + 
mcp.tool()( + wrap( + lambda document_id, output_path, return_content=True: provider.export_to_html( + document_id, output_path, return_content=return_content + ), + "export_to_html", + ), + name="export_to_html", + description="Export document content to HTML format and return the file", + ) + + mcp.tool()( + wrap( + lambda: provider.get_security_info(), + "get_security_info", + ), + name="get_security_info", + description="Get information about current security settings and restrictions", + ) + + mcp.tool()( + wrap( + lambda: provider.get_storage_info(), + "get_storage_info", + ), + name="get_storage_info", + description="Get information about temporary storage usage", + ) + + mcp.tool()( + wrap( + lambda: list_templates(TEMPLATES_DIR), + "list_templates", + ), + name="list_templates", + description="List available document templates from the templates directory", + ) + + mcp.tool()( + wrap( + lambda name: provider.open_template(name, TEMPLATES_DIR), + "open_template", + ), + name="open_template", + description="Open a template document by name from the templates directory", + ) + + mcp.tool()( + wrap( + lambda template_name, output_path, fields=None, return_content=True: provider.generate_from_template( + template_name, output_path, fields or {}, return_content=return_content + ), + "generate_from_template", + ), + name="generate_from_template", + description="Generate a new document from a template and return the file", + ) + + return mcp + + +def main(): + readonly_mode = os.getenv("DOCX_MCP_READONLY", "false").lower() in ("true", "1") + sandbox_mode = os.getenv("DOCX_MCP_SANDBOX", "true").lower() in ("true", "1") + allow_external_tools = os.getenv("DOCX_MCP_ALLOW_EXTERNAL_TOOLS", "false").lower() in ("true", "1") + allow_network = os.getenv("DOCX_MCP_ALLOW_NETWORK", "false").lower() in ("true", "1") + max_document_size = int(os.getenv("DOCX_MCP_MAX_SIZE", "104857600")) + max_open_documents = int(os.getenv("DOCX_MCP_MAX_DOCS", "30")) + + api_key = 
os.getenv("DOCX_MCP_API_KEY", "").strip() + + mcp = make_server( + readonly_mode=readonly_mode, + sandbox_mode=sandbox_mode, + allow_external_tools=allow_external_tools, + allow_network=allow_network, + max_document_size=max_document_size, + max_open_documents=max_open_documents, + ) + + # Build ASGI app (FastMCP exposes to_asgi_app in current SDKs) + app = mcp.to_asgi_app() + + # Apply API key auth if configured + if api_key: + app = ApiKeyAuthMiddleware(app, api_key) + + host = os.getenv("DOCX_MCP_HTTP_HOST", "0.0.0.0") + port = int(os.getenv("DOCX_MCP_HTTP_PORT", "3000")) + + # Run with uvicorn (Streamable HTTP transport for OpenWebUI) + uvicorn.run(app, host=host, port=port, log_level="info") + + +if __name__ == "__main__": + main() diff --git a/src/py_docx/templates.py b/src/py_docx/templates.py new file mode 100644 index 0000000..950e077 --- /dev/null +++ b/src/py_docx/templates.py @@ -0,0 +1,22 @@ +from __future__ import annotations +import os +from typing import List + + +def list_templates(templates_dir: str) -> dict: + if not os.path.isdir(templates_dir): + return {"templates": []} + templates: List[str] = [] + for entry in os.listdir(templates_dir): + path = os.path.join(templates_dir, entry) + if os.path.isfile(path) and entry.lower().endswith(".docx"): + templates.append(entry) + templates.sort() + return {"templates": templates} + + +def open_template_path(templates_dir: str, name: str) -> str: + path = os.path.join(templates_dir, name) + if not os.path.isfile(path): + raise ValueError(f"Template not found: {name}") + return path