# mcp-docx/src/py_docx/server.py — Python, 710 lines, 23 KiB
from __future__ import annotations

import functools
import hmac
import json
import logging
import os
import sys
from typing import Any

from mcp.server.fastmcp import FastMCP
import uvicorn

from py_docx.docx_tools import DocxToolsProvider
from py_docx.security import SecurityConfig, is_command_allowed
from py_docx.templates import list_templates
# Configure process-wide logging at import time: INFO level, with a
# timestamp / level / logger-name prefix on every record.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)

# Module logger, named after the server rather than the module path.
logger = logging.getLogger("py-docx-mcp")

# Templates directory; overridable through DOCX_MCP_TEMPLATES_DIR and
# defaulting to "/templates" when the variable is unset.
TEMPLATES_DIR = os.environ.get("DOCX_MCP_TEMPLATES_DIR", "/templates")
class ApiKeyAuthMiddleware:
    """
    Simple ASGI middleware that enforces an API key on HTTP requests.

    Accepts either of:
    - Authorization: Bearer <key>   (scheme matched case-insensitively)
    - X-API-Key: <key>

    When an Authorization header with the Bearer scheme is present it takes
    precedence and X-API-Key is not consulted. Non-HTTP scopes (for example
    ``lifespan``) are forwarded to the wrapped app without any check.
    """

    def __init__(self, app, api_key: str):
        """Store the wrapped ASGI ``app`` and the expected ``api_key``."""
        self.app = app
        self.api_key = api_key

    async def __call__(self, scope, receive, send):
        """Forward the request when the key matches, otherwise reply 401.

        Args:
            scope: ASGI connection scope; ``scope["type"]`` and the optional
                ``scope["headers"]`` list of ``(name, value)`` byte pairs are read.
            receive: ASGI receive callable, passed through untouched.
            send: ASGI send callable, used directly for the 401 response.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Work on the raw header bytes: decoding every header as UTF-8 would
        # raise UnicodeDecodeError on a single malformed header (even an
        # unrelated one) and turn the request into a server error.
        headers = {name.lower(): value for name, value in scope.get("headers", [])}
        auth = headers.get(b"authorization", b"")
        api_key_header = headers.get(b"x-api-key", b"")

        prefix_len = len(b"Bearer ")
        provided = b""
        # HTTP authentication scheme names are case-insensitive.
        if auth[:prefix_len].lower() == b"bearer ":
            provided = auth[prefix_len:].strip()
        elif api_key_header:
            provided = api_key_header.strip()

        # Constant-time comparison so response timing does not reveal how
        # much of a guessed key was correct.
        expected = self.api_key.encode("utf-8")
        if not hmac.compare_digest(provided, expected):
            await send(
                {
                    "type": "http.response.start",
                    "status": 401,
                    "headers": [
                        (b"content-type", b"application/json"),
                        # A 401 response is expected to carry a challenge.
                        (b"www-authenticate", b"Bearer"),
                    ],
                }
            )
            await send(
                {
                    "type": "http.response.body",
                    "body": json.dumps({"error": "Invalid or missing API key"}).encode("utf-8"),
                }
            )
            return

        await self.app(scope, receive, send)
def make_server(
    readonly_mode: bool = False,
    sandbox_mode: bool = False,
    allow_external_tools: bool = False,
    allow_network: bool = False,
    max_document_size: int = 100 * 1024 * 1024,
    max_open_documents: int = 30,
) -> FastMCP:
    """Build the FastMCP server and register every DOCX tool on it.

    All arguments are forwarded unchanged to ``SecurityConfig``; the
    resulting config is shared by the ``DocxToolsProvider`` and by the
    per-call ``is_command_allowed`` check that guards each tool.

    Args:
        readonly_mode: Forwarded to ``SecurityConfig(readonly_mode=...)``.
        sandbox_mode: Forwarded to ``SecurityConfig(sandbox_mode=...)``.
        allow_external_tools: Forwarded to ``SecurityConfig``.
        allow_network: Forwarded to ``SecurityConfig``.
        max_document_size: Forwarded to ``SecurityConfig``; defaults to 100 MiB.
        max_open_documents: Forwarded to ``SecurityConfig``; defaults to 30.

    Returns:
        The configured ``FastMCP`` instance with all tools registered.
    """
    mcp = FastMCP(
        "py-docx-mcp",
        instructions="DOCX tools for reading and exporting via MCP (Python, Streamable HTTP)",
    )
    security_config = SecurityConfig(
        readonly_mode=readonly_mode,
        sandbox_mode=sandbox_mode,
        allow_external_tools=allow_external_tools,
        allow_network=allow_network,
        max_document_size=max_document_size,
        max_open_documents=max_open_documents,
    )
    provider = DocxToolsProvider(
        security_config=security_config,
        templates_dir=TEMPLATES_DIR,
    )

    def wrap(fn, name: str):
        """Return ``fn`` guarded by the security policy check for ``name``.

        The check runs on every call and raises ``ValueError`` when the
        command is not allowed.
        """

        # functools.wraps sets __wrapped__, which inspect.signature follows,
        # so the tool's real parameters (names and defaults of the lambda)
        # stay visible instead of a single opaque **kwargs.
        @functools.wraps(fn)
        def wrapper(**kwargs):
            if not is_command_allowed(name, security_config):
                raise ValueError(f"Command '{name}' not allowed by security policy")
            return fn(**kwargs)

        return wrapper

    def register(name: str, description: str, fn) -> None:
        """Register ``fn`` as the MCP tool ``name`` behind the policy guard."""
        # name/description are arguments of mcp.tool() itself; the decorator
        # it returns accepts only the function.
        mcp.tool(name=name, description=description)(wrap(fn, name))

    # Core document operations
    register(
        "create_document",
        "Create a new empty DOCX document",
        lambda: provider.create_document(),
    )
    register(
        "open_document",
        "Open an existing DOCX document",
        lambda path: provider.open_document(path),
    )
    register(
        "add_paragraph",
        "Add a paragraph with optional styling to the document",
        lambda document_id, text, style=None, return_content=False: provider.add_paragraph(
            document_id, text, style or {}, return_content=return_content
        ),
    )
    register(
        "add_heading",
        "Add a heading to the document",
        lambda document_id, text, level, return_content=False: provider.add_heading(
            document_id, text, level, return_content=return_content
        ),
    )
    register(
        "add_table",
        "Add a table to the document",
        lambda document_id, rows, headers=None, border_style=None, col_widths=None, cell_shading=None, merges=None, return_content=False: provider.add_table(
            document_id,
            rows,
            headers=headers,
            border_style=border_style,
            col_widths=col_widths,
            cell_shading=cell_shading,
            merges=merges,
            return_content=return_content,
        ),
    )
    register(
        "add_section_break",
        "Insert a section break with optional page setup",
        lambda document_id, page_size=None, orientation=None, margins=None, return_content=False: provider.add_section_break(
            document_id, page_size, orientation, margins or {}, return_content=return_content
        ),
    )
    register(
        "add_list",
        "Add a bulleted or numbered list to the document",
        lambda document_id, items, ordered=False, return_content=False: provider.add_list(
            document_id, items, ordered, return_content=return_content
        ),
    )
    register(
        "add_list_item",
        "Add a single list item with a specific level",
        lambda document_id, text, level=0, ordered=False, return_content=False: provider.add_list_item(
            document_id, text, level, ordered, return_content=return_content
        ),
    )
    register(
        "add_page_break",
        "Add a page break to the document",
        lambda document_id, return_content=False: provider.add_page_break(
            document_id, return_content=return_content
        ),
    )
    register(
        "insert_toc",
        "Insert a Table of Contents placeholder",
        lambda document_id, from_level=1, to_level=3, right_align_dots=True, return_content=False: provider.insert_toc(
            document_id, from_level, to_level, right_align_dots, return_content=return_content
        ),
    )
    register(
        "insert_bookmark_after_heading",
        "Insert a bookmark immediately after the first matching heading",
        lambda document_id, heading_text, name, return_content=False: provider.insert_bookmark_after_heading(
            document_id, heading_text, name, return_content=return_content
        ),
    )
    register(
        "set_header",
        "Set the document header",
        lambda document_id, text, return_content=False: provider.set_header(
            document_id, text, return_content=return_content
        ),
    )
    register(
        "set_footer",
        "Set the document footer",
        lambda document_id, text, return_content=False: provider.set_footer(
            document_id, text, return_content=return_content
        ),
    )
    register(
        "set_page_numbering",
        "Set a simple page numbering text in header or footer",
        lambda document_id, location="footer", template=None, return_content=False: provider.set_page_numbering(
            document_id, location, template, return_content=return_content
        ),
    )
    register(
        "embed_page_number_fields",
        "Replace placeholder 'Page {PAGE} of {PAGES}' with Word field codes (best-effort)",
        lambda document_id, return_content=False: provider.embed_page_number_fields(
            document_id, return_content=return_content
        ),
    )
    register(
        "add_image",
        "Insert an image into the document",
        lambda document_id, data_base64, width=None, height=None, alt_text=None, return_content=False: provider.add_image(
            document_id, data_base64, width, height, alt_text, return_content=return_content
        ),
    )
    register(
        "add_hyperlink",
        "Insert a hyperlink into the document",
        lambda document_id, text, url, return_content=False: provider.add_hyperlink(
            document_id, text, url, return_content=return_content
        ),
    )
    register(
        "find_and_replace",
        "Find and replace text in the document",
        lambda document_id, find_text, replace_text, return_content=False: provider.find_and_replace(
            document_id, find_text, replace_text, return_content=return_content
        ),
    )
    register(
        "find_and_replace_advanced",
        "Find/replace with regex, case, whole-word, preserving runs",
        lambda document_id, pattern, replacement, case_sensitive=False, whole_word=False, use_regex=False, return_content=False: provider.find_and_replace_advanced(
            document_id, pattern, replacement, case_sensitive, whole_word, use_regex, return_content=return_content
        ),
    )
    register(
        "apply_paragraph_format",
        "Apply paragraph formatting to paragraphs matching a simple selector",
        lambda document_id, contains=None, format=None, return_content=False: provider.apply_paragraph_format(
            document_id, contains, format or {}, return_content=return_content
        ),
    )

    # Read-only inspection
    register(
        "extract_text",
        "Extract all text content from the document",
        lambda document_id: provider.extract_text(document_id),
    )
    register(
        "get_tables",
        "List tables with dimensions, merges, and cell content",
        lambda document_id: provider.get_tables(document_id),
    )
    register(
        "list_images",
        "List images with width/height and alt text",
        lambda document_id: provider.list_images(document_id),
    )
    register(
        "list_hyperlinks",
        "List hyperlinks in the document",
        lambda document_id: provider.list_hyperlinks(document_id),
    )
    register(
        "get_fields_summary",
        "Summarize Word fields (PAGE, NUMPAGES, TOC) in document and headers/footers",
        lambda document_id: provider.get_fields_summary(document_id),
    )
    register(
        "strip_personal_info",
        "Remove personal info from metadata and core.xml (best-effort)",
        lambda document_id: provider.strip_personal_info(document_id),
    )
    register(
        "get_metadata",
        "Get document metadata",
        lambda document_id: provider.get_metadata(document_id),
    )

    # Document lifecycle
    register(
        "save_document",
        "Save the document to a specific path and return its content",
        lambda document_id, output_path, return_content=True: provider.save_document(
            document_id, output_path, return_content=return_content
        ),
    )
    register(
        "close_document",
        "Close the document and free resources",
        lambda document_id: provider.close_document(document_id),
    )
    register(
        "list_documents",
        "List all open documents",
        lambda: provider.list_documents(),
    )

    # Conversion, merge and split
    register(
        "convert_to_pdf",
        "Convert a DOCX document to PDF and return the file",
        lambda document_id, output_path, prefer_external=False, return_content=True: provider.convert_to_pdf(
            document_id, output_path, prefer_external, return_content=return_content
        ),
    )
    register(
        "export_pdf_with_field_refresh",
        "Embed page fields then export to PDF (hi-fidelity when available)",
        lambda document_id, output_path, prefer_external=True, return_content=True: provider.export_pdf_with_field_refresh(
            document_id, output_path, prefer_external, return_content=return_content
        ),
    )
    register(
        "convert_to_images",
        "Convert a DOCX document to images (one per page) and return them",
        lambda document_id, output_dir, format="png", dpi=150, return_content=True: provider.convert_to_images(
            document_id, output_dir, format, dpi, return_content=return_content
        ),
    )
    register(
        "convert_to_images_with_preference",
        "Convert DOCX to images, preferring external hi-fidelity path",
        lambda document_id, output_dir, format="png", dpi=150, prefer_external=True, return_content=True: provider.convert_to_images_with_preference(
            document_id, output_dir, format, dpi, prefer_external, return_content=return_content
        ),
    )
    register(
        "merge_documents",
        "Merge multiple DOCX documents into one and return the result",
        lambda document_ids, output_path, return_content=True: provider.merge_documents(
            document_ids, output_path, return_content=return_content
        ),
    )
    register(
        "split_document",
        "Split a document at page breaks and return parts",
        lambda document_id, output_dir, return_content=True: provider.split_document(
            document_id, output_dir, return_content=return_content
        ),
    )

    # Structure, ranges and properties
    register(
        "get_document_structure",
        "Get the structural overview of the document (headings, sections, etc.)",
        lambda document_id: provider.get_document_structure(document_id),
    )
    register(
        "get_outline",
        "Return heading outline with range_ids",
        lambda document_id: provider.get_outline(document_id),
    )
    register(
        "get_ranges",
        "Resolve a selector to range_ids",
        lambda document_id, selector: provider.get_ranges(document_id, selector),
    )
    register(
        "replace_range_text",
        "Replace text in a paragraph/heading by range_id",
        lambda document_id, range_id, text, return_content=False: provider.replace_range_text(
            document_id, range_id, text, return_content=return_content
        ),
    )
    register(
        "set_table_cell_text",
        "Set text in a table cell by indices",
        lambda document_id, table_index, row, col, text, return_content=False: provider.set_table_cell_text(
            document_id, table_index, row, col, text, return_content=return_content
        ),
    )
    register(
        "get_document_properties",
        "Get document properties (title, subject, author, timestamps)",
        lambda document_id: provider.get_document_properties(document_id),
    )
    register(
        "set_document_properties",
        "Set document properties (title, subject, author)",
        lambda document_id, title=None, subject=None, author=None, return_content=False: provider.set_document_properties(
            document_id, title, subject, author, return_content=return_content
        ),
    )
    register(
        "insert_after_heading",
        "Insert a paragraph after the first heading that matches text",
        lambda document_id, heading_text, text, return_content=False: provider.insert_after_heading(
            document_id, heading_text, text, return_content=return_content
        ),
    )
    register(
        "sanitize_external_links",
        "Remove external hyperlinks (http/https)",
        lambda document_id: provider.sanitize_external_links(document_id),
    )
    register(
        "redact_text",
        "Redact text using regex/whole-word with █ character",
        lambda document_id, pattern, use_regex=False, whole_word=False, case_sensitive=False, return_content=False: provider.redact_text(
            document_id, pattern, use_regex, whole_word, case_sensitive, return_content=return_content
        ),
    )

    # Analysis and export
    register(
        "analyze_formatting",
        "Analyze the formatting used throughout the document",
        lambda document_id: provider.analyze_formatting(document_id),
    )
    register(
        "get_word_count",
        "Get detailed word count statistics for the document",
        lambda document_id: provider.get_word_count(document_id),
    )
    register(
        "search_text",
        "Search for text patterns in the document",
        lambda document_id, search_term, case_sensitive=False, whole_word=False: provider.search_text(
            document_id, search_term, case_sensitive, whole_word
        ),
    )
    register(
        "export_to_markdown",
        "Export document content to Markdown format and return the file",
        lambda document_id, output_path, return_content=True: provider.export_to_markdown(
            document_id, output_path, return_content=return_content
        ),
    )
    register(
        "export_to_html",
        "Export document content to HTML format and return the file",
        lambda document_id, output_path, return_content=True: provider.export_to_html(
            document_id, output_path, return_content=return_content
        ),
    )

    # Server introspection
    register(
        "get_security_info",
        "Get information about current security settings and restrictions",
        lambda: provider.get_security_info(),
    )
    register(
        "get_storage_info",
        "Get information about temporary storage usage",
        lambda: provider.get_storage_info(),
    )

    # Templates
    register(
        "list_templates",
        "List available document templates from the templates directory",
        lambda: list_templates(TEMPLATES_DIR),
    )
    register(
        "open_template",
        "Open a template document by name from the templates directory",
        lambda name: provider.open_template(name, TEMPLATES_DIR),
    )
    register(
        "generate_from_template",
        "Generate a new document from a template and return the file",
        lambda template_name, output_path, fields=None, return_content=True: provider.generate_from_template(
            template_name, output_path, fields or {}, return_content=return_content
        ),
    )
    return mcp
def main():
    """Read configuration from the environment and serve the MCP app over HTTP.

    Environment variables (defaults in parentheses):
        DOCX_MCP_READONLY ("false"), DOCX_MCP_SANDBOX ("true"),
        DOCX_MCP_ALLOW_EXTERNAL_TOOLS ("false"), DOCX_MCP_ALLOW_NETWORK ("false"):
            boolean flags, true when the value is "true" or "1" (case-insensitive).
        DOCX_MCP_MAX_SIZE ("104857600"), DOCX_MCP_MAX_DOCS ("30"),
        DOCX_MCP_HTTP_PORT ("3000"): integers.
        DOCX_MCP_HTTP_HOST ("0.0.0.0"): bind address.
        DOCX_MCP_API_KEY (""): when non-empty, requests must present this key.

    Raises:
        ValueError: If an integer variable does not parse as an integer.
        RuntimeError: If the FastMCP instance offers no ASGI app factory.
    """

    def env_flag(name: str, default: str) -> bool:
        # Surrounding whitespace is ignored so "true " still enables a flag.
        return os.getenv(name, default).strip().lower() in ("true", "1")

    def env_int(name: str, default: str) -> int:
        raw = os.getenv(name, default)
        try:
            return int(raw)
        except ValueError as exc:
            # Name the offending variable; a bare int() error does not.
            raise ValueError(
                f"Environment variable {name} must be an integer, got {raw!r}"
            ) from exc

    api_key = os.getenv("DOCX_MCP_API_KEY", "").strip()
    mcp = make_server(
        readonly_mode=env_flag("DOCX_MCP_READONLY", "false"),
        sandbox_mode=env_flag("DOCX_MCP_SANDBOX", "true"),
        allow_external_tools=env_flag("DOCX_MCP_ALLOW_EXTERNAL_TOOLS", "false"),
        allow_network=env_flag("DOCX_MCP_ALLOW_NETWORK", "false"),
        max_document_size=env_int("DOCX_MCP_MAX_SIZE", "104857600"),
        max_open_documents=env_int("DOCX_MCP_MAX_DOCS", "30"),
    )

    # Build the ASGI app. `to_asgi_app` is tried first to keep the original
    # behaviour where it exists; otherwise fall back to `streamable_http_app`,
    # the Streamable HTTP factory of the MCP Python SDK.
    build_app = getattr(mcp, "to_asgi_app", None) or getattr(mcp, "streamable_http_app", None)
    if build_app is None:
        raise RuntimeError(
            "FastMCP instance exposes neither to_asgi_app() nor streamable_http_app()"
        )
    app = build_app()

    host = os.getenv("DOCX_MCP_HTTP_HOST", "0.0.0.0")
    port = env_int("DOCX_MCP_HTTP_PORT", "3000")

    # Apply API key auth if configured
    if api_key:
        app = ApiKeyAuthMiddleware(app, api_key)
    else:
        logger.warning(
            "DOCX_MCP_API_KEY is not set; serving without authentication on %s:%s",
            host,
            port,
        )

    # Run with uvicorn (Streamable HTTP transport for OpenWebUI)
    uvicorn.run(app, host=host, port=port, log_level="info")
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()