Files
mcp-summary/mcp_summary_server.py
T

449 lines
14 KiB
Python

#!/usr/bin/env python3
"""
MCP Summary Server (Streamable HTTP transport)
Designed to work with OpenWebUI's MCP (Streamable HTTP) integration.
Summarizes documents by:
1. Checking text length
2. If short, summarizing directly with LLM
3. If long, chunking text, summarizing each chunk, then synthesizing
All processing happens server-side, keeping full text out of the chat context window.
Tools:
- summarize_document: Summarize a document (handles chunking automatically)
Auth:
  - If API_KEY is set, POST requests must send: Authorization: Bearer <API_KEY>
  - If API_KEY is not set, no auth is required (for local/internal use).
"""
import json
import os
import sys
import logging
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, List, Optional
import requests
from requests.exceptions import RequestException
# Logging: INFO-level records with a timestamp, emitted through one named logger.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("mcp-summary")


def _env_int(name: str, fallback: str) -> int:
    """Return environment variable *name* as an int, using *fallback* when it is unset."""
    return int(os.getenv(name, fallback))


# MCP server settings
API_KEY = os.getenv("API_KEY", "").strip()  # empty string means "auth disabled"
PORT = _env_int("PORT", "8080")

# Upstream LLM (OpenAI-compatible chat completions endpoint)
OPENAPI_URL = os.getenv("OPENAPI_URL", "http://localhost:8080/v1")
OPENAPI_API_KEY = os.getenv("OPENAPI_API_KEY", "")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o")

# Summarization tuning: sizes are in characters, summary lengths in words,
# the timeout in seconds.
CHUNK_SIZE = _env_int("CHUNK_SIZE", "4000")
OVERLAP = _env_int("OVERLAP", "200")
TARGET_INTERMEDIATE_SUMMARY_LENGTH = _env_int("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150")
MAX_DIRECT_SUMMARY_LENGTH = _env_int("MAX_DIRECT_SUMMARY_LENGTH", "100")
MAX_DIRECT_TEXT_LENGTH = _env_int("MAX_DIRECT_TEXT_LENGTH", "8000")
LLM_TIMEOUT = _env_int("LLM_TIMEOUT", "120")
# Tool definitions, assembled bottom-up: input schema -> tool entry -> catalogue.
_SUMMARIZE_INPUT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "text": {
            "type": "string",
            "description": "The document text to summarize",
        },
        "max_length": {
            "type": "integer",
            "description": "Maximum length of summary in words (default: 100)",
        },
    },
    "required": ["text"],
}

_SUMMARIZE_TOOL: Dict[str, Any] = {
    "name": "summarize_document",
    "description": "Summarize a document. Automatically handles chunking for long text. Returns a concise summary without exposing the full text.",
    "inputSchema": _SUMMARIZE_INPUT_SCHEMA,
}

TOOLS_LIST: Dict[str, Any] = {"tools": [_SUMMARIZE_TOOL]}
def get_bearer_token(headers: Any) -> Optional[str]:
    """Extract the bearer token from an ``Authorization`` header.

    Args:
        headers: Any mapping-like object with a ``get`` method (e.g. the
            request's header object or a plain dict).

    Returns:
        The token with surrounding whitespace removed, or ``None`` when the
        header is missing, uses a different scheme, or carries no token.
    """
    auth = (headers.get("Authorization") or "").strip()
    scheme, _, credentials = auth.partition(" ")
    # HTTP auth scheme names are case-insensitive, so "bearer" and "BEARER"
    # must be accepted as well as "Bearer".
    if scheme.lower() != "bearer":
        return None
    token = credentials.strip()
    return token or None
def require_auth(headers: Any) -> bool:
    """Check the request's bearer token against ``API_KEY``.

    Args:
        headers: Request headers, passed through to ``get_bearer_token``.

    Returns:
        ``True`` when ``API_KEY`` is empty (auth disabled) or the presented
        token matches it.

    Raises:
        PermissionError: If ``API_KEY`` is set and the token is missing or
            does not match.
    """
    import hmac  # function-scope import keeps this block self-contained

    if not API_KEY:
        return True
    token = get_bearer_token(headers)
    if token:
        # Compare as bytes with a constant-time primitive so response timing
        # does not reveal how much of the key matched. "surrogatepass" lets
        # any str value (including odd environment bytes) encode without error.
        presented = token.encode("utf-8", "surrogatepass")
        expected = API_KEY.encode("utf-8", "surrogatepass")
        if hmac.compare_digest(presented, expected):
            return True
    raise PermissionError("Missing or invalid API key")
def call_llm(messages: List[Dict], temperature: float = 0.3) -> str:
    """Send a chat-completions request to the configured LLM and return its text.

    Args:
        messages: Chat messages (dicts with ``role`` and ``content``) sent as
            the ``messages`` field of the request.
        temperature: Sampling temperature forwarded to the endpoint.

    Returns:
        The ``content`` string of the first choice in the response.

    Raises:
        RuntimeError: If the HTTP request fails, the response cannot be
            parsed, or it contains no text content. The original exception is
            chained as ``__cause__``.
    """
    # Tolerate a trailing slash in OPENAPI_URL so the path never contains "//".
    url = f"{OPENAPI_URL.rstrip('/')}/chat/completions"
    headers = {"Content-Type": "application/json"}
    if OPENAPI_API_KEY:
        # Only send credentials when a key is configured; an empty
        # "Bearer " header is meaningless.
        headers["Authorization"] = f"Bearer {OPENAPI_API_KEY}"
    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": 2000,
        "top_p": 0.9,
    }
    try:
        logger.info(f"Calling LLM at {OPENAPI_URL} with model {MODEL_NAME}")
        response = requests.post(url, headers=headers, json=payload, timeout=LLM_TIMEOUT)
        response.raise_for_status()
        data = response.json()
        content = data["choices"][0]["message"]["content"]
    except RequestException as e:
        logger.error(f"LLM request failed: {e}")
        raise RuntimeError(f"Failed to connect to LLM at {OPENAPI_URL}: {str(e)}") from e
    except Exception as e:
        # Malformed response shape (missing keys, empty choices, ...).
        logger.error(f"LLM call failed: {e}")
        raise RuntimeError(f"LLM call failed: {str(e)}") from e
    if not isinstance(content, str):
        # The endpoint may return null content; callers join and embed the
        # result as text, so fail here with a clear message instead.
        logger.error("LLM call failed: response contained no text content")
        raise RuntimeError("LLM call failed: response contained no text content")
    return content
def chunk_text(
    text: str,
    chunk_size: Optional[int] = None,
    overlap: Optional[int] = None,
) -> List[str]:
    """Split text into overlapping chunks, preferring natural break points.

    Each chunk is at most ``chunk_size`` characters. When more text follows,
    the chunk is cut just after the last paragraph break, line break or
    sentence end found in its second half (tried in that order); if none is
    found it is cut at the size limit. The next chunk starts ``overlap``
    characters before the cut so context carries over.

    Args:
        text: The text to split.
        chunk_size: Maximum chunk length in characters; defaults to the
            module-level ``CHUNK_SIZE``.
        overlap: Characters shared between consecutive chunks; defaults to
            the module-level ``OVERLAP``. Negative values are treated as 0.

    Returns:
        The chunks in document order. Whitespace-only chunks are omitted.
        Text no longer than ``chunk_size`` is returned as a single chunk.

    Raises:
        ValueError: If the effective chunk size is smaller than 1.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    lap = OVERLAP if overlap is None else overlap
    if size < 1:
        raise ValueError("chunk size must be at least 1")
    lap = max(lap, 0)

    total = len(text)
    if total <= size:
        return [text]

    markers = ("\n\n", "\n", ". ", "! ", "? ")
    chunks: List[str] = []
    start = 0
    while start < total:
        end = min(start + size, total)
        cut = end
        # Only look for a break point when more text follows: the final
        # piece already fits, and splitting it would add a tiny extra chunk.
        if end < total:
            search_from = start + size // 2
            for marker in markers:
                pos = text.rfind(marker, search_from, end)
                if pos > start:
                    # Cut after the marker so the chunk keeps its closing
                    # punctuation / line break.
                    cut = pos + len(marker)
                    break
        piece = text[start:cut]
        if piece.strip():
            chunks.append(piece)
        if cut >= total:
            break
        # Step back by the overlap, but never to or before the current
        # start: a large overlap must not stall or reverse the scan.
        next_start = cut - lap
        start = next_start if next_start > start else cut

    # Same logger the module configures, looked up by name.
    logging.getLogger("mcp-summary").info(f"Split text into {len(chunks)} chunks")
    return chunks
def summarize_chunk(chunk_text: str, chunk_num: int, total_chunks: int) -> str:
    """Ask the LLM for an intermediate summary of one chunk.

    Args:
        chunk_text: The chunk's text.
        chunk_num: Position of this chunk, as reported to the model.
        total_chunks: Total number of chunks, as reported to the model.

    Returns:
        Whatever ``call_llm`` returns for the constructed prompt.
    """
    instructions = "\n".join((
        "You are a precise legal assistant creating concise, accurate summaries.",
        f"You are processing chunk {chunk_num} of {total_chunks} from a larger document.",
        "Create a focused summary that:",
        "- Captures key points and important details",
        f"- Is approximately {TARGET_INTERMEDIATE_SUMMARY_LENGTH} words",
        "- Can be combined with other chunk summaries",
        "- Uses clear, professional language",
        "- Preserves names, dates, and specific facts",
        "Respond as plain text without bullet points.",
    ))
    request = "\n".join((
        f"Summarize this text (chunk {chunk_num} of {total_chunks}):",
        f"{chunk_text}",
        "Summary:",
    ))
    logger.info(f"Summarizing chunk {chunk_num}/{total_chunks}")
    return call_llm([
        {"role": "system", "content": instructions},
        {"role": "user", "content": request},
    ])
def synthesize_summaries(chunk_summaries: List[str], max_length: int = 100) -> str:
    """Merge per-chunk summaries into one final summary via the LLM.

    Args:
        chunk_summaries: Intermediate summaries, in document order.
        max_length: Approximate target length of the final summary in words.
            Defaults to 100, the value previously fixed in the prompt.

    Returns:
        Whatever ``call_llm`` returns for the synthesis prompt.
    """
    combined = "\n\n".join(chunk_summaries)
    system_prompt = "\n".join((
        "You are a precise legal assistant creating executive-level summaries.",
        "Synthesize the provided partial summaries into a single, cohesive summary that:",
        f"- Is approximately {max_length} words",
        "- Captures the complete document picture",
        "- Is clear and professional",
        "- Removes redundancy",
        "- Maintains logical flow",
        "- Preserves all critical information",
        "Format as a single paragraph of plain text.",
    ))
    user_prompt = "\n".join((
        "Synthesize these partial summaries into one cohesive summary:",
        combined,
        "Final summary:",
    ))
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    logger.info(f"Synthesizing {len(chunk_summaries)} chunk summaries")
    return call_llm(messages)


def summarize_document(text: str, max_length: int = MAX_DIRECT_SUMMARY_LENGTH) -> Dict[str, Any]:
    """Summarize a document, chunking it first when it is long.

    Text of at most ``MAX_DIRECT_TEXT_LENGTH`` characters (after stripping) is
    summarized with a single LLM call. Longer text is split with
    ``chunk_text``, each chunk is summarized, and the partial summaries are
    synthesized into one. ``max_length`` is honoured on both paths.

    Args:
        text: The document text.
        max_length: Approximate target length of the summary in words.
            ``None`` selects ``MAX_DIRECT_SUMMARY_LENGTH``.

    Returns:
        A dict with keys ``summary``, ``original_length`` (character count of
        the input before stripping), ``method`` (``"direct"`` or
        ``"chunked"``) and ``chunks`` (number of chunks processed).

    Raises:
        ValueError: If ``text`` is not a string or is empty after stripping,
            or if ``max_length`` is not a positive integer.
        RuntimeError: Propagated from ``call_llm`` when an LLM call fails.
    """
    if not isinstance(text, str):
        raise ValueError("Text must be a string")
    if max_length is None:
        max_length = MAX_DIRECT_SUMMARY_LENGTH
    # bool is a subclass of int; reject it so True/False never become "1 words".
    if isinstance(max_length, bool) or not isinstance(max_length, int) or max_length < 1:
        raise ValueError("max_length must be a positive integer")

    original_length = len(text)  # reported length is measured before stripping
    text = text.strip()
    if not text:
        raise ValueError("Empty text provided")
    logger.info(f"Summarizing text of {original_length} characters")

    # Direct summarization for shorter texts
    if len(text) <= MAX_DIRECT_TEXT_LENGTH:
        system_prompt = "\n".join((
            "You are a precise legal assistant creating concise, accurate summaries.",
            "Create a summary that:",
            f"- Is approximately {max_length} words",
            "- Captures key points and important details",
            "- Uses clear, professional language",
            "- Preserves names, dates, and specific facts",
            "Format as plain text without bullet points.",
        ))
        user_prompt = "\n".join((
            "Summarize the following document:",
            text,
            "Summary:",
        ))
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        return {
            "summary": call_llm(messages),
            "original_length": original_length,
            "method": "direct",
            "chunks": 1,
        }

    # Chunked summarization for longer texts
    chunks = chunk_text(text)
    chunk_summaries = [
        summarize_chunk(chunk, index, len(chunks))
        for index, chunk in enumerate(chunks, 1)
    ]
    # Pass the caller's target length through; it used to be dropped here,
    # so long documents always came back at roughly 100 words.
    final_summary = synthesize_summaries(chunk_summaries, max_length)
    return {
        "summary": final_summary,
        "original_length": original_length,
        "method": "chunked",
        "chunks": len(chunks),
    }
class MCPSummaryHandler(BaseHTTPRequestHandler):
    """HTTP handler for the MCP summary server.

    ``GET /`` returns a small status document. ``POST /`` and ``POST /mcp``
    accept a single JSON-RPC message and handle ``initialize``, ``ping``,
    ``tools/list`` and ``tools/call``. Messages whose method starts with
    ``notifications/`` are acknowledged with 202 and no body.
    """

    def log_message(self, format, *args):
        """Route the base class's per-request log lines through the module logger."""
        logger.info(format % args)

    def _send_json(self, status: int, payload: Any) -> None:
        """Serialize *payload* as UTF-8 JSON and send it with the given HTTP status."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_empty(self, status: int) -> None:
        """Send a response that has a status line and headers but no body."""
        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()

    @staticmethod
    def _rpc_result(req_id: Any, result: Any) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 success envelope."""
        return {"jsonrpc": "2.0", "id": req_id, "result": result}

    @staticmethod
    def _rpc_error(req_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error envelope."""
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message},
        }

    @staticmethod
    def _parse_content_length(raw: Any) -> Optional[int]:
        """Return *raw* as a non-negative int, or ``None`` if it is not one."""
        try:
            length = int(raw)
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    def _auth_or_401(self) -> bool:
        """Check authentication; on failure send a 401 and return ``False``."""
        try:
            return require_auth(self.headers)
        except PermissionError:
            self._send_json(401, {"error": "Missing or invalid API key"})
            return False

    def _read_request_object(self) -> Optional[Dict[str, Any]]:
        """Read and decode the request body as a JSON object.

        Returns the decoded dict, or ``None`` after a 400 response has
        already been sent for a bad length, empty body, invalid JSON or a
        JSON value that is not an object.
        """
        length = self._parse_content_length(self.headers.get("Content-Length", 0))
        if length is None:
            # A negative length would make rfile.read() wait for EOF, and a
            # non-numeric one would raise; reject both up front.
            self._send_json(400, {"error": "Invalid Content-Length"})
            return None
        if length == 0:
            self._send_json(400, {"error": "Empty body"})
            return None
        raw = self.rfile.read(length)
        try:
            req = json.loads(raw)
        except ValueError:
            # ValueError covers both JSONDecodeError and the
            # UnicodeDecodeError raised for bodies that are not valid UTF-8.
            self._send_json(400, {"error": "Invalid JSON"})
            return None
        if not isinstance(req, dict):
            self._send_json(
                400,
                self._rpc_error(None, -32600, "Invalid Request: expected a JSON object"),
            )
            return None
        return req

    def do_GET(self):
        """Handle GET requests (health check on ``/``; everything else is 404)."""
        if self.path == "/":
            self._send_json(200, {
                "service": "mcp-summary",
                "transport": "streamable-http",
                "model": MODEL_NAME,
                "status": "running",
                "docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call)."
            })
            return
        self.send_error(404, "Not Found")

    def do_POST(self):
        """Handle one MCP JSON-RPC message."""
        if self.path not in ("/", "/mcp"):
            self.send_error(404, "Not Found")
            return
        if not self._auth_or_401():
            return
        req = self._read_request_object()
        if req is None:
            return

        method = req.get("method")
        params = req.get("params") or {}
        req_id = req.get("id")
        logger.info(f"MCP request: method={method}, id={req_id}")

        # Notifications (e.g. "notifications/initialized", sent by clients
        # right after initialize) expect no JSON-RPC response; acknowledge
        # them instead of rejecting them as unknown methods.
        if isinstance(method, str) and method.startswith("notifications/"):
            self._send_empty(202)
            return

        # MCP: initialize
        if method == "initialize":
            self._send_json(200, self._rpc_result(req_id, {
                "protocolVersion": "2025-11-25",
                "capabilities": {
                    "tools": {}
                },
                "serverInfo": {
                    "name": "mcp-summary",
                    "version": "1.0.0"
                }
            }))
            return

        # MCP: ping
        if method == "ping":
            self._send_json(200, self._rpc_result(req_id, {}))
            return

        # MCP: tools/list
        if method == "tools/list":
            self._send_json(200, self._rpc_result(req_id, TOOLS_LIST))
            return

        # MCP: tools/call
        if method == "tools/call":
            try:
                if not isinstance(params, dict):
                    raise ValueError("params must be a JSON object")
                result = self._call_tool(params.get("name"), params.get("arguments") or {})
                response = self._rpc_result(req_id, {
                    "content": [
                        {"type": "text", "text": json.dumps(result, ensure_ascii=False)}
                    ]
                })
            except Exception as e:
                # Top-level boundary: any tool failure becomes a JSON-RPC
                # error so the client always receives a well-formed reply.
                logger.error(f"Tool call failed: {e}", exc_info=True)
                response = self._rpc_error(req_id, -32000, str(e))
            # Send outside the try so a write failure is not mistaken for a
            # tool failure and answered a second time.
            self._send_json(200, response)
            return

        # Unknown method: JSON-RPC "Method not found", sent with HTTP 200 like
        # the tool-call errors above.
        self._send_json(
            200,
            self._rpc_error(req_id, -32601, "Unknown method: " + str(method)),
        )

    def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
        """Validate arguments and execute the named tool.

        Args:
            name: Tool name from the request; only ``summarize_document`` exists.
            args: The tool's ``arguments`` object from the request.

        Returns:
            The dict produced by ``summarize_document``.

        Raises:
            ValueError: For an unknown tool, non-object arguments, a missing
                or non-string ``text``, or a ``max_length`` that is not a
                positive integer.
        """
        if name != "summarize_document":
            raise ValueError(f"Unknown tool: {name}")
        if not isinstance(args, dict):
            raise ValueError("Tool arguments must be a JSON object")

        text = args.get("text")
        if not text:
            raise ValueError("Text parameter is required")
        if not isinstance(text, str):
            raise ValueError("Text parameter must be a string")

        max_length = args.get("max_length")
        if max_length is None:
            # Absent or explicit null both mean "use the configured default".
            max_length = MAX_DIRECT_SUMMARY_LENGTH
        else:
            if isinstance(max_length, float) and max_length.is_integer():
                max_length = int(max_length)  # JSON 100.0 is an acceptable 100
            if (
                isinstance(max_length, bool)
                or not isinstance(max_length, int)
                or max_length < 1
            ):
                raise ValueError("max_length must be a positive integer")
        return summarize_document(text, max_length)
def main():
    """Start the MCP summary server and serve until interrupted.

    The port comes from the first command-line argument when given, otherwise
    from the ``PORT`` environment variable (default 8080). The server listens
    on all interfaces.
    """
    # Function-scope import keeps this block self-contained.
    from http.server import ThreadingHTTPServer

    port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080"))
    # One thread per request: a summarization can take minutes of LLM calls,
    # and a single-threaded server would leave health checks and pings
    # unanswered for that whole time.
    server = ThreadingHTTPServer(("0.0.0.0", port), MCPSummaryHandler)
    mode = "auth enabled (Bearer)" if API_KEY else "no auth (API_KEY not set)"
    print(f"MCP Summary Server listening on 0.0.0.0:{port} [{mode}]")
    print(f" - Model: {MODEL_NAME}")
    print(f" - LLM URL: {OPENAPI_URL}")
    print(f" - Chunk size: {CHUNK_SIZE} characters")
    print(f" - Max direct text: {MAX_DIRECT_TEXT_LENGTH} characters")
    print(f" - LLM timeout: {LLM_TIMEOUT} seconds")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Release the listening socket on every exit path, not only Ctrl-C.
        server.server_close()


if __name__ == "__main__":
    main()