diff --git a/Dockerfile b/Dockerfile index 162ae01..22a54a9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -9,7 +9,7 @@ # - Auth: Bearer (if DOCX_MCP_API_KEY is set) # # Environment: -# DOCX_MCP_API_KEY - API key (Bearer or X-API-Key). Optional but recommended. +# DOCX_MCP_API_KEY - API key (Bearer). Optional but recommended. # DOCX_MCP_HTTP_HOST - Bind host (default: 0.0.0.0) # DOCX_MCP_HTTP_PORT - Bind port (default: 3000) # DOCX_MCP_TEMPLATES_DIR - Templates directory (default: /templates) @@ -65,9 +65,9 @@ ENV DOCX_MCP_HTTP_HOST=0.0.0.0 \ # Expose HTTP port (Streamable HTTP for OpenWebUI) EXPOSE 3000 -# Health check (ensure module is importable) +# Health check (ensure server module is importable) HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ - CMD python -c "from server import main; print('ok')" || exit 1 + CMD python -c "from server import make_app; print('ok')" || exit 1 # Default: Streamable HTTP for OpenWebUI MCP ENTRYPOINT ["python", "-m", "server"] diff --git a/pyproject.toml b/pyproject.toml index c611743..4d2a94e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,14 +8,14 @@ version = "0.1.0" description = "Python MCP server for DOCX document manipulation" requires-python = ">=3.10" dependencies = [ - "mcp>=1.0.0", + "fastapi>=0.115.0", + "uvicorn[standard]>=0.24.0", "python-docx>=1.1.0", "Pillow>=10.0.0", "markdown>=3.5", "html5lib>=1.1", "regex>=2024.0.0", "aiofiles>=24.0.0", - "uvicorn>=0.24.0", ] [project.scripts] diff --git a/src/server.py b/src/server.py index 6dce236..352f21f 100644 --- a/src/server.py +++ b/src/server.py @@ -3,10 +3,10 @@ from __future__ import annotations import json import logging import os -from typing import Any +from typing import Any, Dict, Optional -from mcp.server.fastmcp import FastMCP -import uvicorn +from fastapi import FastAPI, Request, Response +from fastapi.responses import JSONResponse from docx_tools import DocxToolsProvider from security import SecurityConfig, is_command_allowed @@ 
def build_tools_list(provider: DocxToolsProvider, security_config: SecurityConfig) -> Dict[str, Any]:
    """
    Build the payload returned for the MCP ``tools/list`` request.

    Every tool is described by ``name``, ``description`` and a JSON-Schema
    ``inputSchema``. Tools rejected by ``is_command_allowed`` for the given
    ``security_config`` are left out of the listing.

    Args:
        provider: Not referenced by this function; kept so the signature
            stays compatible with existing callers.
        security_config: Policy object handed to ``is_command_allowed`` to
            decide which tools are advertised.

    Returns:
        ``{"tools": [...]}`` with one entry per allowed tool, in the order
        the tools are declared below.
    """
    tools: list[Dict[str, Any]] = []

    def flag(default: bool) -> Dict[str, Any]:
        # Fresh dict per call so no two tool schemas share a mutable object.
        return {"type": "boolean", "default": default}

    def add(name: str, desc: str, required: list[str],
            extra_props: Optional[Dict[str, Any]] = None) -> None:
        """Append one tool descriptor unless the security policy forbids it."""
        if not is_command_allowed(name, security_config):
            return
        props = dict(extra_props or {})
        # Required arguments without an explicit schema default to plain strings.
        for key in required:
            props.setdefault(key, {"type": "string"})
        tools.append({
            "name": name,
            "description": desc,
            "inputSchema": {
                "type": "object",
                "properties": props,
                # Copy so the schema never aliases the caller's list.
                "required": list(required),
            },
        })

    add("create_document",
        "Create a new empty DOCX document",
        [])

    add("open_document",
        "Open an existing DOCX document",
        ["path"],
        {"path": {"type": "string", "description": "Path to the DOCX file"}})

    add("add_paragraph",
        "Add a paragraph with optional styling to the document",
        ["document_id", "text"],
        {
            "document_id": {"type": "string"},
            "text": {"type": "string"},
            "style": {
                "type": "object",
                "description": "Paragraph style options (font_family, font_size, bold, italic, underline, color, alignment)"
            },
            "return_content": {"type": "boolean", "description": "Return document content as base64", "default": False},
        })

    add("add_heading",
        "Add a heading to the document",
        ["document_id", "text", "level"],
        {
            "document_id": {"type": "string"},
            "text": {"type": "string"},
            "level": {"type": "integer", "description": "Heading level (1-6)"},
            "return_content": flag(False),
        })

    add("add_table",
        "Add a table to the document",
        ["document_id", "rows"],
        {
            "document_id": {"type": "string"},
            "rows": {"type": "array", "items": {"type": "array", "items": {"type": "string"}}},
            "headers": {"type": "array", "items": {"type": "string"}},
            "border_style": {"type": "string"},
            "col_widths": {"type": "array", "items": {"type": "integer"}},
            "cell_shading": {"type": "string"},
            "merges": {"type": "array", "items": {"type": "object"}},
            "return_content": flag(False),
        })

    add("add_section_break",
        "Insert a section break with optional page setup",
        ["document_id"],
        {
            "document_id": {"type": "string"},
            "page_size": {"type": "string"},
            "orientation": {"type": "string"},
            "margins": {"type": "object"},
            "return_content": flag(False),
        })

    add("add_list",
        "Add a bulleted or numbered list to the document",
        ["document_id", "items"],
        {
            "document_id": {"type": "string"},
            "items": {"type": "array", "items": {"type": "string"}},
            "ordered": flag(False),
            "return_content": flag(False),
        })

    add("add_list_item",
        "Add a single list item with a specific level",
        ["document_id", "text"],
        {
            "document_id": {"type": "string"},
            "text": {"type": "string"},
            "level": {"type": "integer", "default": 0},
            "ordered": flag(False),
            "return_content": flag(False),
        })

    add("add_page_break",
        "Add a page break to the document",
        ["document_id"],
        {"return_content": flag(False)})

    add("insert_toc",
        "Insert a Table of Contents placeholder",
        ["document_id"],
        {
            "from_level": {"type": "integer", "default": 1},
            "to_level": {"type": "integer", "default": 3},
            "right_align_dots": flag(True),
            "return_content": flag(False),
        })

    add("insert_bookmark_after_heading",
        "Insert a bookmark immediately after the first matching heading",
        ["document_id", "heading_text", "name"],
        {"return_content": flag(False)})

    add("set_header",
        "Set the document header",
        ["document_id", "text"],
        {"return_content": flag(False)})

    add("set_footer",
        "Set the document footer",
        ["document_id", "text"],
        {"return_content": flag(False)})

    add("set_page_numbering",
        "Set a simple page numbering text in header or footer",
        ["document_id"],
        {
            "location": {"type": "string", "default": "footer"},
            "template": {"type": "string"},
            "return_content": flag(False),
        })

    add("embed_page_number_fields",
        "Replace placeholder 'Page {PAGE} of {PAGES}' with Word field codes (best-effort)",
        ["document_id"],
        {"return_content": flag(False)})

    add("add_image",
        "Insert an image into the document",
        ["document_id", "data_base64"],
        {
            "data_base64": {"type": "string"},
            "width": {"type": "integer"},
            "height": {"type": "integer"},
            "alt_text": {"type": "string"},
            "return_content": flag(False),
        })

    add("add_hyperlink",
        "Insert a hyperlink into the document",
        ["document_id", "text", "url"],
        {"return_content": flag(False)})

    add("find_and_replace",
        "Find and replace text in the document",
        ["document_id", "find_text", "replace_text"],
        {"return_content": flag(False)})

    add("find_and_replace_advanced",
        "Find/replace with regex, case, whole-word, preserving runs",
        ["document_id", "pattern", "replacement"],
        {
            "case_sensitive": flag(False),
            "whole_word": flag(False),
            "use_regex": flag(False),
            "return_content": flag(False),
        })

    add("apply_paragraph_format",
        "Apply paragraph formatting to paragraphs matching a simple selector",
        ["document_id"],
        {
            "contains": {"type": "string"},
            "format": {"type": "object"},
            "return_content": flag(False),
        })

    add("extract_text",
        "Extract all text content from the document",
        ["document_id"])

    add("get_tables",
        "List tables with dimensions, merges, and cell content",
        ["document_id"])

    add("list_images",
        "List images with width/height and alt text",
        ["document_id"])

    add("list_hyperlinks",
        "List hyperlinks in the document",
        ["document_id"])

    add("get_fields_summary",
        "Summarize Word fields (PAGE, NUMPAGES, TOC) in document and headers/footers",
        ["document_id"])

    add("strip_personal_info",
        "Remove personal info from metadata and core.xml (best-effort)",
        ["document_id"])

    add("get_metadata",
        "Get document metadata",
        ["document_id"])

    add("save_document",
        "Save the document to a specific path and return its content",
        ["document_id", "output_path"],
        {"return_content": flag(True)})

    add("close_document",
        "Close the document and free resources",
        ["document_id"])

    add("list_documents",
        "List all open documents",
        [])

    add("convert_to_pdf",
        "Convert a DOCX document to PDF and return the file",
        ["document_id", "output_path"],
        {
            "prefer_external": flag(False),
            "return_content": flag(True),
        })

    add("export_pdf_with_field_refresh",
        "Embed page fields then export to PDF (hi-fidelity when available)",
        ["document_id", "output_path"],
        {
            "prefer_external": flag(True),
            "return_content": flag(True),
        })

    add("convert_to_images",
        "Convert a DOCX document to images (one per page) and return them",
        ["document_id", "output_dir"],
        {
            "format": {"type": "string", "default": "png"},
            "dpi": {"type": "integer", "default": 150},
            "return_content": flag(True),
        })

    add("convert_to_images_with_preference",
        "Convert DOCX to images, preferring external hi-fidelity path",
        ["document_id", "output_dir"],
        {
            "format": {"type": "string", "default": "png"},
            "dpi": {"type": "integer", "default": 150},
            "prefer_external": flag(True),
            "return_content": flag(True),
        })

    add("merge_documents",
        "Merge multiple DOCX documents into one and return the result",
        ["document_ids", "output_path"],
        {
            "document_ids": {"type": "array", "items": {"type": "string"}},
            "return_content": flag(True),
        })

    add("split_document",
        "Split a document at page breaks and return parts",
        ["document_id", "output_dir"],
        {"return_content": flag(True)})

    add("get_document_structure",
        "Get the structural overview of the document (headings, sections, etc.)",
        ["document_id"])

    add("get_outline",
        "Return heading outline with range_ids",
        ["document_id"])

    add("get_ranges",
        "Resolve a selector to range_ids",
        ["document_id", "selector"])

    # range_id is a structured value, not a string: without an explicit entry
    # it would fall back to {"type": "string"} and clients would send the
    # wrong shape.
    # NOTE(review): confirm the exact shape against
    # DocxToolsProvider.replace_range_text.
    add("replace_range_text",
        "Replace text in a paragraph/heading by range_id",
        ["document_id", "range_id", "text"],
        {
            "range_id": {"type": "object"},
            "return_content": flag(False),
        })

    add("set_table_cell_text",
        "Set text in a table cell by indices",
        ["document_id", "table_index", "row", "col", "text"],
        {
            "table_index": {"type": "integer"},
            "row": {"type": "integer"},
            "col": {"type": "integer"},
            "return_content": flag(False),
        })

    add("get_document_properties",
        "Get document properties (title, subject, author, timestamps)",
        ["document_id"])

    add("set_document_properties",
        "Set document properties (title, subject, author)",
        ["document_id"],
        {
            "title": {"type": "string"},
            "subject": {"type": "string"},
            "author": {"type": "string"},
            "return_content": flag(False),
        })

    add("insert_after_heading",
        "Insert a paragraph after the first heading that matches text",
        ["document_id", "heading_text", "text"],
        {"return_content": flag(False)})

    add("sanitize_external_links",
        "Remove external hyperlinks (http/https)",
        ["document_id"])

    add("redact_text",
        "Redact text using regex/whole-word with █ character",
        ["document_id", "pattern"],
        {
            "use_regex": flag(False),
            "whole_word": flag(False),
            "case_sensitive": flag(False),
            "return_content": flag(False),
        })

    add("analyze_formatting",
        "Analyze the formatting used throughout the document",
        ["document_id"])

    add("get_word_count",
        "Get detailed word count statistics for the document",
        ["document_id"])

    add("search_text",
        "Search for text patterns in the document",
        ["document_id", "search_term"],
        {
            "case_sensitive": flag(False),
            "whole_word": flag(False),
        })

    add("export_to_markdown",
        "Export document content to Markdown format and return the file",
        ["document_id", "output_path"],
        {"return_content": flag(True)})

    add("export_to_html",
        "Export document content to HTML format and return the file",
        ["document_id", "output_path"],
        {"return_content": flag(True)})

    add("get_security_info",
        "Get information about current security settings and restrictions",
        [])

    add("get_storage_info",
        "Get information about temporary storage usage",
        [])

    add("list_templates",
        "List available document templates from the templates directory",
        [])

    add("open_template",
        "Open a template document by name from the templates directory",
        ["name"])

    add("generate_from_template",
        "Generate a new document from a template and return the file",
        ["template_name", "output_path", "fields"],
        {
            "fields": {"type": "object"},
            "return_content": flag(True),
        })

    return {"tools": tools}
SecurityConfig( readonly_mode=readonly_mode, @@ -94,503 +440,475 @@ def make_server( templates_dir=TEMPLATES_DIR, ) - def require_allowed(tool_name: str): - def decorator(fn): - def wrapper(*args, **kwargs): - if not is_command_allowed(tool_name, security_config): - raise ValueError(f"Command '{tool_name}' not allowed by security policy") - return fn(*args, **kwargs) - wrapper.__name__ = fn.__name__ - wrapper.__doc__ = fn.__doc__ - return wrapper - return decorator + tools_list = build_tools_list(provider, security_config) - @mcp.tool() - @require_allowed("create_document") - def create_document(): - """Create a new empty DOCX document""" + def get_bearer_token(request: Request) -> Optional[str]: + auth = (request.headers.get("Authorization") or "").strip() + if auth.startswith("Bearer "): + return auth[len("Bearer "):].strip() + return None + + def require_auth(request: Request) -> bool: + if not api_key: + return True + token = get_bearer_token(request) + if not token or token != api_key: + return False + return True + + @app.get("/") + async def root(request: Request): + if not require_auth(request): + return JSONResponse(status_code=401, content={"error": "Missing or invalid API key"}) + return { + "service": "py-docx-mcp", + "transport": "streamable-http", + "docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call).", + } + + @app.post("/") + async def mcp_endpoint(request: Request): + if not require_auth(request): + return JSONResponse(status_code=401, content={"error": "Missing or invalid API key"}) + + body = await request.json() + method = body.get("method") + params = body.get("params") or {} + req_id = body.get("id") + + # MCP: initialize + if method == "initialize": + return { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "protocolVersion": "2025-11-25", + "capabilities": {"tools": {}}, + "serverInfo": { + "name": "py-docx-mcp", + "version": "0.1.0", + }, + }, + } + + # MCP: tools/list + if method == "tools/list": + return { + 
"jsonrpc": "2.0", + "id": req_id, + "result": tools_list, + } + + # MCP: tools/call + if method == "tools/call": + tool_name = params.get("name") + tool_args = params.get("arguments") or {} + + # Security check: only allowed commands + if not is_command_allowed(tool_name, security_config): + return { + "jsonrpc": "2.0", + "id": req_id, + "error": { + "code": -32000, + "message": f"Command '{tool_name}' not allowed by security policy", + }, + } + + try: + result = call_tool_impl(tool_name, tool_args, provider) + return { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "content": [ + { + "type": "text", + "text": json.dumps(result, ensure_ascii=False), + } + ] + }, + } + except Exception as e: + return { + "jsonrpc": "2.0", + "id": req_id, + "error": { + "code": -32000, + "message": str(e), + }, + } + + # Unknown method + return JSONResponse( + status_code=400, + content={"error": "Unknown method: " + str(method)}, + ) + + return app + + +def call_tool_impl(name: str, args: Dict[str, Any], provider: DocxToolsProvider) -> Any: + # Delegate to provider methods, matching names. + # This central dispatcher keeps tool signatures in one place. 
+ + if name == "create_document": return provider.create_document() - @mcp.tool() - @require_allowed("open_document") - def open_document(path: str): - """Open an existing DOCX document""" - return provider.open_document(path) + if name == "open_document": + return provider.open_document(args["path"]) - @mcp.tool() - @require_allowed("add_paragraph") - def add_paragraph(document_id: str, text: str, style: dict | None = None, return_content: bool = False): - """Add a paragraph with optional styling to the document""" - return provider.add_paragraph(document_id, text, style or {}, return_content=return_content) + if name == "add_paragraph": + return provider.add_paragraph( + args["document_id"], + args["text"], + args.get("style") or {}, + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("add_heading") - def add_heading(document_id: str, text: str, level: int, return_content: bool = False): - """Add a heading to the document""" - return provider.add_heading(document_id, text, level, return_content=return_content) + if name == "add_heading": + return provider.add_heading( + args["document_id"], + args["text"], + int(args["level"]), + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("add_table") - def add_table( - document_id: str, - rows: list[list[str]], - headers: list[str] | None = None, - border_style: str | None = None, - col_widths: list[int] | None = None, - cell_shading: str | None = None, - merges: list[dict] | None = None, - return_content: bool = False, - ): - """Add a table to the document""" + if name == "add_table": return provider.add_table( - document_id, - rows, - headers=headers, - border_style=border_style, - col_widths=col_widths, - cell_shading=cell_shading, - merges=merges, - return_content=return_content, + args["document_id"], + args["rows"], + headers=args.get("headers"), + border_style=args.get("border_style"), + col_widths=args.get("col_widths"), + 
cell_shading=args.get("cell_shading"), + merges=args.get("merges"), + return_content=args.get("return_content", False), ) - @mcp.tool() - @require_allowed("add_section_break") - def add_section_break( - document_id: str, - page_size: str | None = None, - orientation: str | None = None, - margins: dict | None = None, - return_content: bool = False, - ): - """Insert a section break with optional page setup""" + if name == "add_section_break": return provider.add_section_break( - document_id, page_size, orientation, margins or {}, return_content=return_content + args["document_id"], + page_size=args.get("page_size"), + orientation=args.get("orientation"), + margins=args.get("margins"), + return_content=args.get("return_content", False), ) - @mcp.tool() - @require_allowed("add_list") - def add_list(document_id: str, items: list[str], ordered: bool = False, return_content: bool = False): - """Add a bulleted or numbered list to the document""" - return provider.add_list(document_id, items, ordered, return_content=return_content) + if name == "add_list": + return provider.add_list( + args["document_id"], + args["items"], + ordered=bool(args.get("ordered", False)), + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("add_list_item") - def add_list_item(document_id: str, text: str, level: int = 0, ordered: bool = False, return_content: bool = False): - """Add a single list item with a specific level""" - return provider.add_list_item(document_id, text, level, ordered, return_content=return_content) + if name == "add_list_item": + return provider.add_list_item( + args["document_id"], + args["text"], + level=int(args.get("level", 0)), + ordered=bool(args.get("ordered", False)), + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("add_page_break") - def add_page_break(document_id: str, return_content: bool = False): - """Add a page break to the document""" - return provider.add_page_break(document_id, 
return_content=return_content) + if name == "add_page_break": + return provider.add_page_break( + args["document_id"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("insert_toc") - def insert_toc( - document_id: str, - from_level: int = 1, - to_level: int = 3, - right_align_dots: bool = True, - return_content: bool = False, - ): - """Insert a Table of Contents placeholder""" + if name == "insert_toc": return provider.insert_toc( - document_id, from_level, to_level, right_align_dots, return_content=return_content + args["document_id"], + from_level=int(args.get("from_level", 1)), + to_level=int(args.get("to_level", 3)), + right_align_dots=bool(args.get("right_align_dots", True)), + return_content=args.get("return_content", False), ) - @mcp.tool() - @require_allowed("insert_bookmark_after_heading") - def insert_bookmark_after_heading( - document_id: str, - heading_text: str, - name: str, - return_content: bool = False, - ): - """Insert a bookmark immediately after the first matching heading""" + if name == "insert_bookmark_after_heading": return provider.insert_bookmark_after_heading( - document_id, heading_text, name, return_content=return_content + args["document_id"], + args["heading_text"], + args["name"], + return_content=args.get("return_content", False), ) - @mcp.tool() - @require_allowed("set_header") - def set_header(document_id: str, text: str, return_content: bool = False): - """Set the document header""" - return provider.set_header(document_id, text, return_content=return_content) + if name == "set_header": + return provider.set_header( + args["document_id"], + args["text"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("set_footer") - def set_footer(document_id: str, text: str, return_content: bool = False): - """Set the document footer""" - return provider.set_footer(document_id, text, return_content=return_content) + if name == "set_footer": + return 
provider.set_footer( + args["document_id"], + args["text"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("set_page_numbering") - def set_page_numbering( - document_id: str, - location: str = "footer", - template: str | None = None, - return_content: bool = False, - ): - """Set a simple page numbering text in header or footer""" - return provider.set_page_numbering(document_id, location, template, return_content=return_content) + if name == "set_page_numbering": + return provider.set_page_numbering( + args["document_id"], + location=args.get("location", "footer"), + template=args.get("template"), + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("embed_page_number_fields") - def embed_page_number_fields(document_id: str, return_content: bool = False): - """Replace placeholder 'Page {PAGE} of {PAGES}' with Word field codes (best-effort)""" - return provider.embed_page_number_fields(document_id, return_content=return_content) + if name == "embed_page_number_fields": + return provider.embed_page_number_fields( + args["document_id"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("add_image") - def add_image( - document_id: str, - data_base64: str, - width: int | None = None, - height: int | None = None, - alt_text: str | None = None, - return_content: bool = False, - ): - """Insert an image into the document""" - return provider.add_image(document_id, data_base64, width, height, alt_text, return_content=return_content) + if name == "add_image": + return provider.add_image( + args["document_id"], + args["data_base64"], + width=args.get("width"), + height=args.get("height"), + alt_text=args.get("alt_text"), + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("add_hyperlink") - def add_hyperlink(document_id: str, text: str, url: str, return_content: bool = False): - """Insert a hyperlink into the document""" - 
return provider.add_hyperlink(document_id, text, url, return_content=return_content) + if name == "add_hyperlink": + return provider.add_hyperlink( + args["document_id"], + args["text"], + args["url"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("find_and_replace") - def find_and_replace(document_id: str, find_text: str, replace_text: str, return_content: bool = False): - """Find and replace text in the document""" - return provider.find_and_replace(document_id, find_text, replace_text, return_content=return_content) + if name == "find_and_replace": + return provider.find_and_replace( + args["document_id"], + args["find_text"], + args["replace_text"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("find_and_replace_advanced") - def find_and_replace_advanced( - document_id: str, - pattern: str, - replacement: str, - case_sensitive: bool = False, - whole_word: bool = False, - use_regex: bool = False, - return_content: bool = False, - ): - """Find/replace with regex, case, whole-word, preserving runs""" + if name == "find_and_replace_advanced": return provider.find_and_replace_advanced( - document_id, pattern, replacement, case_sensitive, whole_word, use_regex, return_content=return_content + args["document_id"], + args["pattern"], + args["replacement"], + case_sensitive=bool(args.get("case_sensitive", False)), + whole_word=bool(args.get("whole_word", False)), + use_regex=bool(args.get("use_regex", False)), + return_content=args.get("return_content", False), ) - @mcp.tool() - @require_allowed("apply_paragraph_format") - def apply_paragraph_format( - document_id: str, - contains: str | None = None, - format: dict | None = None, - return_content: bool = False, - ): - """Apply paragraph formatting to paragraphs matching a simple selector""" - return provider.apply_paragraph_format(document_id, contains, format or {}, return_content=return_content) + if name == "apply_paragraph_format": 
+ return provider.apply_paragraph_format( + args["document_id"], + contains=args.get("contains"), + format=args.get("format") or {}, + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("extract_text") - def extract_text(document_id: str): - """Extract all text content from the document""" - return provider.extract_text(document_id) + if name == "extract_text": + return provider.extract_text(args["document_id"]) - @mcp.tool() - @require_allowed("get_tables") - def get_tables(document_id: str): - """List tables with dimensions, merges, and cell content""" - return provider.get_tables(document_id) + if name == "get_tables": + return provider.get_tables(args["document_id"]) - @mcp.tool() - @require_allowed("list_images") - def list_images(document_id: str): - """List images with width/height and alt text""" - return provider.list_images(document_id) + if name == "list_images": + return provider.list_images(args["document_id"]) - @mcp.tool() - @require_allowed("list_hyperlinks") - def list_hyperlinks(document_id: str): - """List hyperlinks in the document""" - return provider.list_hyperlinks(document_id) + if name == "list_hyperlinks": + return provider.list_hyperlinks(args["document_id"]) - @mcp.tool() - @require_allowed("get_fields_summary") - def get_fields_summary(document_id: str): - """Summarize Word fields (PAGE, NUMPAGES, TOC) in document and headers/footers""" - return provider.get_fields_summary(document_id) + if name == "get_fields_summary": + return provider.get_fields_summary(args["document_id"]) - @mcp.tool() - @require_allowed("strip_personal_info") - def strip_personal_info(document_id: str): - """Remove personal info from metadata and core.xml (best-effort)""" - return provider.strip_personal_info(document_id) + if name == "strip_personal_info": + return provider.strip_personal_info(args["document_id"]) - @mcp.tool() - @require_allowed("get_metadata") - def get_metadata(document_id: str): - """Get document 
metadata""" - return provider.get_metadata(document_id) + if name == "get_metadata": + return provider.get_metadata(args["document_id"]) - @mcp.tool() - @require_allowed("save_document") - def save_document(document_id: str, output_path: str, return_content: bool = True): - """Save the document to a specific path and return its content""" - return provider.save_document(document_id, output_path, return_content=return_content) + if name == "save_document": + return provider.save_document( + args["document_id"], + args["output_path"], + return_content=args.get("return_content", True), + ) - @mcp.tool() - @require_allowed("close_document") - def close_document(document_id: str): - """Close the document and free resources""" - return provider.close_document(document_id) + if name == "close_document": + return provider.close_document(args["document_id"]) - @mcp.tool() - @require_allowed("list_documents") - def list_documents(): - """List all open documents""" + if name == "list_documents": return provider.list_documents() - @mcp.tool() - @require_allowed("convert_to_pdf") - def convert_to_pdf(document_id: str, output_path: str, prefer_external: bool = False, return_content: bool = True): - """Convert a DOCX document to PDF and return the file""" - return provider.convert_to_pdf(document_id, output_path, prefer_external, return_content=return_content) + if name == "convert_to_pdf": + return provider.convert_to_pdf( + args["document_id"], + args["output_path"], + prefer_external=bool(args.get("prefer_external", False)), + return_content=args.get("return_content", True), + ) - @mcp.tool() - @require_allowed("export_pdf_with_field_refresh") - def export_pdf_with_field_refresh( - document_id: str, - output_path: str, - prefer_external: bool = True, - return_content: bool = True, - ): - """Embed page fields then export to PDF (hi-fidelity when available)""" + if name == "export_pdf_with_field_refresh": return provider.export_pdf_with_field_refresh( - document_id, output_path, 
prefer_external, return_content=return_content + args["document_id"], + args["output_path"], + prefer_external=bool(args.get("prefer_external", True)), + return_content=args.get("return_content", True), ) - @mcp.tool() - @require_allowed("convert_to_images") - def convert_to_images( - document_id: str, - output_dir: str, - format: str = "png", - dpi: int = 150, - return_content: bool = True, - ): - """Convert a DOCX document to images (one per page) and return them""" - return provider.convert_to_images(document_id, output_dir, format, dpi, return_content=return_content) + if name == "convert_to_images": + return provider.convert_to_images( + args["document_id"], + args["output_dir"], + format=args.get("format", "png"), + dpi=int(args.get("dpi", 150)), + return_content=args.get("return_content", True), + ) - @mcp.tool() - @require_allowed("convert_to_images_with_preference") - def convert_to_images_with_preference( - document_id: str, - output_dir: str, - format: str = "png", - dpi: int = 150, - prefer_external: bool = True, - return_content: bool = True, - ): - """Convert DOCX to images, preferring external hi-fidelity path""" + if name == "convert_to_images_with_preference": return provider.convert_to_images_with_preference( - document_id, output_dir, format, dpi, prefer_external, return_content=return_content + args["document_id"], + args["output_dir"], + format=args.get("format", "png"), + dpi=int(args.get("dpi", 150)), + prefer_external=bool(args.get("prefer_external", True)), + return_content=args.get("return_content", True), ) - @mcp.tool() - @require_allowed("merge_documents") - def merge_documents(document_ids: list[str], output_path: str, return_content: bool = True): - """Merge multiple DOCX documents into one and return the result""" - return provider.merge_documents(document_ids, output_path, return_content=return_content) + if name == "merge_documents": + return provider.merge_documents( + args["document_ids"], + args["output_path"], + 
return_content=args.get("return_content", True), + ) - @mcp.tool() - @require_allowed("split_document") - def split_document(document_id: str, output_dir: str, return_content: bool = True): - """Split a document at page breaks and return parts""" - return provider.split_document(document_id, output_dir, return_content=return_content) + if name == "split_document": + return provider.split_document( + args["document_id"], + args["output_dir"], + return_content=args.get("return_content", True), + ) - @mcp.tool() - @require_allowed("get_document_structure") - def get_document_structure(document_id: str): - """Get the structural overview of the document (headings, sections, etc.)""" - return provider.get_document_structure(document_id) + if name == "get_document_structure": + return provider.get_document_structure(args["document_id"]) - @mcp.tool() - @require_allowed("get_outline") - def get_outline(document_id: str): - """Return heading outline with range_ids""" - return provider.get_outline(document_id) + if name == "get_outline": + return provider.get_outline(args["document_id"]) - @mcp.tool() - @require_allowed("get_ranges") - def get_ranges(document_id: str, selector: str): - """Resolve a selector to range_ids""" - return provider.get_ranges(document_id, selector) + if name == "get_ranges": + return provider.get_ranges(args["document_id"], args["selector"]) - @mcp.tool() - @require_allowed("replace_range_text") - def replace_range_text(document_id: str, range_id: dict, text: str, return_content: bool = False): - """Replace text in a paragraph/heading by range_id""" - return provider.replace_range_text(document_id, range_id, text, return_content=return_content) + if name == "replace_range_text": + return provider.replace_range_text( + args["document_id"], + args["range_id"], + args["text"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("set_table_cell_text") - def set_table_cell_text( - document_id: str, - table_index: int, 
- row: int, - col: int, - text: str, - return_content: bool = False, - ): - """Set text in a table cell by indices""" - return provider.set_table_cell_text(document_id, table_index, row, col, text, return_content=return_content) + if name == "set_table_cell_text": + return provider.set_table_cell_text( + args["document_id"], + int(args["table_index"]), + int(args["row"]), + int(args["col"]), + args["text"], + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("get_document_properties") - def get_document_properties(document_id: str): - """Get document properties (title, subject, author, timestamps)""" - return provider.get_document_properties(document_id) + if name == "get_document_properties": + return provider.get_document_properties(args["document_id"]) - @mcp.tool() - @require_allowed("set_document_properties") - def set_document_properties( - document_id: str, - title: str | None = None, - subject: str | None = None, - author: str | None = None, - return_content: bool = False, - ): - """Set document properties (title, subject, author)""" - return provider.set_document_properties(document_id, title, subject, author, return_content=return_content) + if name == "set_document_properties": + return provider.set_document_properties( + args["document_id"], + title=args.get("title"), + subject=args.get("subject"), + author=args.get("author"), + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("insert_after_heading") - def insert_after_heading(document_id: str, heading_text: str, text: str, return_content: bool = False): - """Insert a paragraph after the first heading that matches text""" - return provider.insert_after_heading(document_id, heading_text, text, return_content=return_content) + if name == "insert_after_heading": + return provider.insert_after_heading( + args["document_id"], + args["heading_text"], + args["text"], + return_content=args.get("return_content", False), + ) - @mcp.tool() 
- @require_allowed("sanitize_external_links") - def sanitize_external_links(document_id: str): - """Remove external hyperlinks (http/https)""" - return provider.sanitize_external_links(document_id) + if name == "sanitize_external_links": + return provider.sanitize_external_links(args["document_id"]) - @mcp.tool() - @require_allowed("redact_text") - def redact_text( - document_id: str, - pattern: str, - use_regex: bool = False, - whole_word: bool = False, - case_sensitive: bool = False, - return_content: bool = False, - ): - """Redact text using regex/whole-word with █ character""" - return provider.redact_text(document_id, pattern, use_regex, whole_word, case_sensitive, return_content=return_content) + if name == "redact_text": + return provider.redact_text( + args["document_id"], + args["pattern"], + use_regex=bool(args.get("use_regex", False)), + whole_word=bool(args.get("whole_word", False)), + case_sensitive=bool(args.get("case_sensitive", False)), + return_content=args.get("return_content", False), + ) - @mcp.tool() - @require_allowed("analyze_formatting") - def analyze_formatting(document_id: str): - """Analyze the formatting used throughout the document""" - return provider.analyze_formatting(document_id) + if name == "analyze_formatting": + return provider.analyze_formatting(args["document_id"]) - @mcp.tool() - @require_allowed("get_word_count") - def get_word_count(document_id: str): - """Get detailed word count statistics for the document""" - return provider.get_word_count(document_id) + if name == "get_word_count": + return provider.get_word_count(args["document_id"]) - @mcp.tool() - @require_allowed("search_text") - def search_text(document_id: str, search_term: str, case_sensitive: bool = False, whole_word: bool = False): - """Search for text patterns in the document""" - return provider.search_text(document_id, search_term, case_sensitive, whole_word) + if name == "search_text": + return provider.search_text( + args["document_id"], + 
args["search_term"], + case_sensitive=bool(args.get("case_sensitive", False)), + whole_word=bool(args.get("whole_word", False)), + ) - @mcp.tool() - @require_allowed("export_to_markdown") - def export_to_markdown(document_id: str, output_path: str, return_content: bool = True): - """Export document content to Markdown format and return the file""" - return provider.export_to_markdown(document_id, output_path, return_content=return_content) + if name == "export_to_markdown": + return provider.export_to_markdown( + args["document_id"], + args["output_path"], + return_content=args.get("return_content", True), + ) - @mcp.tool() - @require_allowed("export_to_html") - def export_to_html(document_id: str, output_path: str, return_content: bool = True): - """Export document content to HTML format and return the file""" - return provider.export_to_html(document_id, output_path, return_content=return_content) + if name == "export_to_html": + return provider.export_to_html( + args["document_id"], + args["output_path"], + return_content=args.get("return_content", True), + ) - @mcp.tool() - @require_allowed("get_security_info") - def get_security_info(): - """Get information about current security settings and restrictions""" + if name == "get_security_info": return provider.get_security_info() - @mcp.tool() - @require_allowed("get_storage_info") - def get_storage_info(): - """Get information about temporary storage usage""" + if name == "get_storage_info": return provider.get_storage_info() - @mcp.tool() - @require_allowed("list_templates") - def list_templates(): - """List available document templates from the templates directory""" + if name == "list_templates": return list_templates(TEMPLATES_DIR) - @mcp.tool() - @require_allowed("open_template") - def open_template(name: str): - """Open a template document by name from the templates directory""" - return provider.open_template(name, TEMPLATES_DIR) + if name == "open_template": + return provider.open_template(args["name"], 
TEMPLATES_DIR) - @mcp.tool() - @require_allowed("generate_from_template") - def generate_from_template( - template_name: str, - output_path: str, - fields: dict | None = None, - return_content: bool = True, - ): - """Generate a new document from a template and return the file""" + if name == "generate_from_template": return provider.generate_from_template( - template_name, output_path, fields or {}, return_content=return_content + args["template_name"], + args["output_path"], + args.get("fields") or {}, + return_content=args.get("return_content", True), ) - return mcp + raise ValueError(f"Unknown tool: {name}") def main(): - readonly_mode = os.getenv("DOCX_MCP_READONLY", "false").lower() in ("true", "1") - sandbox_mode = os.getenv("DOCX_MCP_SANDBOX", "true").lower() in ("true", "1") - allow_external_tools = os.getenv("DOCX_MCP_ALLOW_EXTERNAL_TOOLS", "false").lower() in ("true", "1") - allow_network = os.getenv("DOCX_MCP_ALLOW_NETWORK", "false").lower() in ("true", "1") - max_document_size = int(os.getenv("DOCX_MCP_MAX_SIZE", "104857600")) - max_open_documents = int(os.getenv("DOCX_MCP_MAX_DOCS", "30")) - - api_key = os.getenv("DOCX_MCP_API_KEY", "").strip() - - mcp = make_server( - readonly_mode=readonly_mode, - sandbox_mode=sandbox_mode, - allow_external_tools=allow_external_tools, - allow_network=allow_network, - max_document_size=max_document_size, - max_open_documents=max_open_documents, - ) - - # Build ASGI app (FastMCP exposes to_asgi_app in current SDKs) - app = mcp.streamable_http_app() - - # Apply API key auth if configured - if api_key: - app = ApiKeyAuthMiddleware(app, api_key) - + app = make_app() host = os.getenv("DOCX_MCP_HTTP_HOST", "0.0.0.0") port = int(os.getenv("DOCX_MCP_HTTP_PORT", "3000")) - # Run with uvicorn (Streamable HTTP transport for OpenWebUI) + import uvicorn + uvicorn.run(app, host=host, port=port, log_level="info")