#!/usr/bin/env python3
"""
MCP Summary Server (Streamable HTTP transport)

Designed to work with OpenWebUI's MCP (Streamable HTTP) integration.

Summarizes documents by:
1. Checking text length
2. If short, summarizing directly with LLM
3. If long, chunking text, summarizing each chunk, then synthesizing

All processing happens server-side, keeping full text out of the chat context window.

Tools:
- summarize_document: Summarize a document (handles chunking automatically)

Auth:
- If API_KEY is set:
    - Requires: Authorization: Bearer <API_KEY>
- If API_KEY is not set:
    - No auth required (for local/internal use).
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import re
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from typing import Any, Dict, List, Optional
|
|
import requests
|
|
|
|
# --- MCP server settings ---------------------------------------------------
# Bearer token clients must present; an empty value disables authentication.
API_KEY = os.getenv("API_KEY", "").strip()
# TCP port the HTTP server listens on.
PORT = int(os.getenv("PORT", "8080"))

# --- Upstream LLM settings -------------------------------------------------
# Base URL of the chat-completions API ("/chat/completions" is appended to it).
# NOTE(review): the default points at localhost:8080, which is also the default
# PORT of this server -- presumably one of the two is overridden; confirm.
OPENAPI_URL = os.getenv("OPENAPI_URL", "http://localhost:8080/v1")
# Bearer token sent to the upstream LLM API.
OPENAPI_API_KEY = os.getenv("OPENAPI_API_KEY", "")
# Model identifier placed in every completion request.
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o")

# --- Summarization tuning --------------------------------------------------
# Characters per chunk.
CHUNK_SIZE = int(os.getenv("CHUNK_SIZE", "4000"))
# Characters of overlap between consecutive chunks.
OVERLAP = int(os.getenv("OVERLAP", "200"))
# Target size, in words, of each per-chunk (intermediate) summary.
TARGET_INTERMEDIATE_SUMMARY_LENGTH = int(
    os.getenv("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150")
)
# Default size, in words, of the final summary.
MAX_DIRECT_SUMMARY_LENGTH = int(os.getenv("MAX_DIRECT_SUMMARY_LENGTH", "100"))
# Texts longer than this many characters are chunked before summarizing.
MAX_DIRECT_TEXT_LENGTH = int(os.getenv("MAX_DIRECT_TEXT_LENGTH", "8000"))
|
|
|
|
# Tool definitions
# JSON Schema for the arguments of the ``summarize_document`` tool.
_SUMMARIZE_DOCUMENT_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "text": {
            "type": "string",
            "description": "The document text to summarize",
        },
        "max_length": {
            "type": "integer",
            "description": f"Maximum length of summary in words (default: {MAX_DIRECT_SUMMARY_LENGTH})",
        },
    },
    "required": ["text"],
}

# Descriptor of the single tool this server exposes.
_SUMMARIZE_DOCUMENT_TOOL: Dict[str, Any] = {
    "name": "summarize_document",
    "description": "Summarize a document. Automatically handles chunking for long text. Returns a concise summary without exposing the full text.",
    "inputSchema": _SUMMARIZE_DOCUMENT_SCHEMA,
}

# Payload returned verbatim as the ``result`` of an MCP ``tools/list`` request.
TOOLS_LIST: Dict[str, Any] = {"tools": [_SUMMARIZE_DOCUMENT_TOOL]}
|
|
|
|
|
|
def call_llm(messages: List[Dict], temperature: float = 0.3) -> str:
    """Send a chat-completions request to the configured LLM endpoint.

    The request is POSTed to ``{OPENAPI_URL}/chat/completions`` with the
    configured ``MODEL_NAME`` and a 60 second timeout.

    Args:
        messages: Chat messages (dicts with ``role`` and ``content``) to send.
        temperature: Sampling temperature forwarded in the request payload.

    Returns:
        The ``content`` string of the first choice in the response.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            status (via ``raise_for_status``).
        ValueError: If the response body is not JSON or does not contain a
            text completion at ``choices[0].message.content``.
    """
    # Tolerate a trailing slash in the configured base URL.
    url = f"{OPENAPI_URL.rstrip('/')}/chat/completions"

    headers = {"Content-Type": "application/json"}
    # Only authenticate when a key is configured; an empty "Bearer " value is
    # a malformed header that some servers reject outright.
    if OPENAPI_API_KEY:
        headers["Authorization"] = f"Bearer {OPENAPI_API_KEY}"

    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": 2000,
        "top_p": 0.9,
    }

    response = requests.post(url, headers=headers, json=payload, timeout=60)
    response.raise_for_status()

    data = response.json()
    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise ValueError("LLM response did not contain a chat completion") from exc

    # The annotated return type is ``str``; surface a null/non-text completion
    # as an error instead of silently returning it to the prompt builders.
    if not isinstance(content, str):
        raise ValueError("LLM response did not contain text content")
    return content
|
|
|
|
|
|
def chunk_text(
    text: str,
    chunk_size: Optional[int] = None,
    overlap: Optional[int] = None,
) -> List[str]:
    """Split text into overlapping chunks, preferring natural boundaries.

    Each chunk is at most ``chunk_size`` characters. Within the second half
    of every window the split point is chosen, in order of preference, just
    after the last paragraph break, the last line break, or the last sentence
    terminator (``.``, ``!``, ``?``); if none is found the window is cut at
    ``chunk_size``. Whitespace-only chunks are dropped.

    Args:
        text: The text to split.
        chunk_size: Maximum characters per chunk; defaults to ``CHUNK_SIZE``.
        overlap: Characters repeated at the start of the next chunk; defaults
            to ``OVERLAP``. It is clamped to ``[0, chunk_size // 2]`` so every
            iteration is guaranteed to advance.

    Returns:
        The list of chunks; ``[text]`` when the text fits in a single chunk.

    Raises:
        ValueError: If the effective chunk size is not positive.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    lap = OVERLAP if overlap is None else overlap
    if size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    # A split is never placed before the window midpoint, so an overlap of at
    # most half a window cannot stall or move the start index backwards.
    lap = max(0, min(lap, size // 2))

    total = len(text)
    if total <= size:
        return [text]

    chunks: List[str] = []
    start = 0
    while start < total:
        # The remainder fits in one chunk: emit it whole rather than
        # splitting it again (each extra chunk costs an extra LLM call).
        if total - start <= size:
            tail = text[start:]
            if tail.strip():
                chunks.append(tail)
            break

        end = start + size
        floor = start + size // 2  # never split in the first half of a window
        break_point = -1

        # Prefer a paragraph break, then a single line break.
        for marker in ("\n\n", "\n"):
            pos = text.rfind(marker, floor, end)
            if pos != -1:
                break_point = pos + len(marker)
                break

        # Otherwise split after the last sentence terminator of any kind, so
        # the punctuation stays attached to the sentence it ends.
        if break_point == -1:
            pos = max(text.rfind(mark, floor, end) for mark in ".!?")
            if pos != -1:
                break_point = pos + 1

        # No natural boundary found: hard cut at the window end.
        if break_point == -1:
            break_point = end

        piece = text[start:break_point]
        if piece.strip():
            chunks.append(piece)

        # Step back by the overlap, but always move forward by at least one.
        start = max(break_point - lap, start + 1)

    return chunks
|
|
|
|
|
|
def summarize_chunk(chunk: str, chunk_num: int, total_chunks: int) -> str:
    """Summarize a single chunk of a larger document.

    Args:
        chunk: The text of this chunk.
        chunk_num: 1-based position of the chunk, used in the prompt.
        total_chunks: Total number of chunks, used in the prompt.

    Returns:
        The summary text returned by ``call_llm``.

    Raises:
        Whatever ``call_llm`` raises for a failed request or response.
    """
    system_prompt = f"""You are a precise legal assistant specializing in creating concise, accurate summaries.

You are processing chunk {chunk_num} of {total_chunks} from a larger document.

Your task: Create a focused summary of this chunk that:
- Captures the key points and important details
- Is approximately {TARGET_INTERMEDIATE_SUMMARY_LENGTH} words
- Can be combined with summaries of other chunks to form a complete picture
- Uses clear, professional language
- Preserves important names, dates, and specific facts

Format your response as plain text without bullet points or special formatting."""

    # The chunk body is interpolated from the ``chunk`` parameter; the
    # previous ``{text}`` placeholder referenced a name that does not exist
    # in this scope and raised NameError on every chunked summarization.
    user_prompt = f"""Summarize the following text (chunk {chunk_num} of {total_chunks}):

{chunk}

Summary:"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    return call_llm(messages)
|
|
|
|
|
|
def synthesize_summaries(
    chunk_summaries: List[str],
    max_length: int = MAX_DIRECT_SUMMARY_LENGTH,
) -> str:
    """Synthesize multiple chunk summaries into a single final summary.

    Args:
        chunk_summaries: Per-chunk summaries, in document order.
        max_length: Approximate target length of the final summary in words.

    Returns:
        The synthesized summary text returned by ``call_llm``.

    Raises:
        Whatever ``call_llm`` raises for a failed request or response.
    """
    combined = "\n\n".join(chunk_summaries)

    # The target length comes from the caller instead of a hard-coded
    # "100 words", so chunked and direct summaries honor the same limit.
    system_prompt = f"""You are a precise legal assistant creating executive-level summaries.

Your task: Synthesize the provided partial summaries into a single, cohesive summary that:
- Is approximately {max_length} words
- Captures the complete picture of the document
- Is clear and professional
- Removes redundancy
- Maintains logical flow
- Preserves all critical information

Format your response as a single paragraph of plain text."""

    user_prompt = f"""Synthesize the following partial summaries into one cohesive summary:

{combined}

Final summary:"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    return call_llm(messages)


def summarize_document(text: str, max_length: int = MAX_DIRECT_SUMMARY_LENGTH) -> Dict[str, Any]:
    """Summarize a document, chunking it first when it is long.

    - If the stripped text has at most ``MAX_DIRECT_TEXT_LENGTH`` characters,
      it is summarized with a single LLM call.
    - Otherwise it is split with ``chunk_text``, each chunk is summarized,
      and the chunk summaries are synthesized into one final summary.

    Args:
        text: The document text.
        max_length: Approximate target length of the final summary in words;
            ``None`` falls back to ``MAX_DIRECT_SUMMARY_LENGTH``.

    Returns:
        A dict with keys ``summary``, ``original_length`` (character count of
        ``text`` before stripping), ``method`` (``"direct"`` or ``"chunked"``)
        and ``chunks`` (number of chunks processed).

    Raises:
        ValueError: If ``text`` is empty or whitespace-only.
    """
    original_length = len(text)

    # Strip whitespace and validate
    text = text.strip()
    if not text:
        raise ValueError("Empty text provided")

    if max_length is None:
        max_length = MAX_DIRECT_SUMMARY_LENGTH

    # Direct summarization for shorter texts
    if len(text) <= MAX_DIRECT_TEXT_LENGTH:
        system_prompt = f"""You are a precise legal assistant creating concise, accurate summaries.

Your task: Create a summary that:
- Is approximately {max_length} words
- Captures the key points and important details
- Uses clear, professional language
- Preserves important names, dates, and specific facts
- Is suitable for a legal professional

Format your response as plain text without bullet points or special formatting."""

        user_prompt = f"""Summarize the following document:

{text}

Summary:"""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        return {
            "summary": call_llm(messages),
            "original_length": original_length,
            "method": "direct",
            "chunks": 1,
        }

    # Chunked summarization for longer texts
    chunks = chunk_text(text)
    chunk_summaries = [
        summarize_chunk(chunk, index, len(chunks))
        for index, chunk in enumerate(chunks, 1)
    ]

    # Synthesize into final summary, honoring the requested length.
    final_summary = synthesize_summaries(chunk_summaries, max_length)

    return {
        "summary": final_summary,
        "original_length": original_length,
        "method": "chunked",
        "chunks": len(chunks),
    }
|
|
|
|
|
|
class MCPSummaryHandler(BaseHTTPRequestHandler):
    """HTTP handler for MCP summary server.

    GET ``/`` returns a small service description. POST ``/`` or ``/mcp``
    accepts a single MCP JSON-RPC message (``initialize``, ``tools/list``,
    ``tools/call``) and answers with a JSON body.
    """

    def log_message(self, format, *args):
        # Quiet logs by default
        pass

    def _send_json(self, status: int, payload: Any):
        """Serialize ``payload`` as UTF-8 JSON and send it with ``status``."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _auth_or_401(self) -> bool:
        """Check the bearer token when ``API_KEY`` is configured.

        Returns:
            True when the request may proceed. On failure a 401 response has
            already been sent and False is returned.
        """
        import hmac

        if not API_KEY:
            return True

        auth_header = self.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            self._send_json(401, {"error": "Missing or invalid API key"})
            return False

        token = auth_header[len("Bearer "):].strip()
        # Constant-time comparison so response timing does not reveal how
        # much of the key matched; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if not hmac.compare_digest(token.encode("utf-8"), API_KEY.encode("utf-8")):
            self._send_json(401, {"error": "Invalid API key"})
            return False

        return True

    def do_GET(self):
        """Handle GET requests (health check)."""
        if self.path == "/":
            self._send_json(200, {
                "service": "mcp-summary",
                "transport": "streamable-http",
                "model": MODEL_NAME,
                "docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call)."
            })
            return

        self.send_error(404, "Not Found")

    def do_POST(self):
        """Handle MCP JSON-RPC requests."""
        if self.path not in ("/", "/mcp"):
            self.send_error(404, "Not Found")
            return

        if not self._auth_or_401():
            return

        # Parse request
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            self._send_json(400, {"error": "Invalid Content-Length"})
            return
        # A negative length would make rfile.read() block until EOF.
        if length <= 0:
            self._send_json(400, {"error": "Empty body"})
            return

        raw = self.rfile.read(length)
        try:
            req = json.loads(raw)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError (bad UTF-8).
            self._send_json(400, {"error": "Invalid JSON"})
            return

        if not isinstance(req, dict):
            self._send_json(400, {"error": "Request must be a JSON object"})
            return

        method = req.get("method")
        params = req.get("params") or {}
        req_id = req.get("id")

        # MCP notifications (e.g. "notifications/initialized") expect no
        # JSON-RPC response; acknowledge them with 202 Accepted and no body.
        if isinstance(method, str) and method.startswith("notifications/"):
            self.send_response(202)
            self.send_header("Content-Length", "0")
            self.end_headers()
            return

        # MCP: initialize
        if method == "initialize":
            self._send_json(200, {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {
                    "protocolVersion": "2025-11-25",
                    "capabilities": {
                        "tools": {}
                    },
                    "serverInfo": {
                        "name": "mcp-summary",
                        "version": "1.0.0"
                    }
                }
            })
            return

        # MCP: tools/list
        if method == "tools/list":
            self._send_json(200, {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": TOOLS_LIST
            })
            return

        # MCP: tools/call
        if method == "tools/call":
            try:
                if not isinstance(params, dict):
                    raise ValueError("params must be an object")
                tool_args = params.get("arguments") or {}
                if not isinstance(tool_args, dict):
                    raise ValueError("arguments must be an object")
                result = self._call_tool(params.get("name"), tool_args)
            except Exception as e:
                # Request boundary: report any tool failure to the client as
                # a JSON-RPC error instead of dropping the connection.
                self._send_json(200, {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {
                        "code": -32000,
                        "message": str(e)
                    }
                })
            else:
                self._send_json(200, {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": {
                        "content": [
                            {"type": "text", "text": json.dumps(result, ensure_ascii=False)}
                        ]
                    }
                })
            return

        # Unknown method
        self._send_json(400, {"error": "Unknown method: " + str(method)})

    def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
        """Validate the arguments of a tool call and execute it.

        Args:
            name: Tool name from the ``tools/call`` request.
            args: The tool's ``arguments`` object.

        Returns:
            The result dict of ``summarize_document``.

        Raises:
            ValueError: For an unknown tool, missing or non-string ``text``,
                or a ``max_length`` that is not a positive integer.
        """
        if name != "summarize_document":
            raise ValueError(f"Unknown tool: {name}")

        text = args.get("text")
        if not text:
            raise ValueError("Text parameter is required")
        if not isinstance(text, str):
            raise ValueError("Text parameter must be a string")

        max_length = args.get("max_length")
        if max_length is None:
            # Absent or explicit null: fall back to the configured default.
            max_length = MAX_DIRECT_SUMMARY_LENGTH
        elif isinstance(max_length, bool) or not isinstance(max_length, int) or max_length <= 0:
            raise ValueError("max_length must be a positive integer")

        return summarize_document(text, max_length)
|
|
|
|
|
|
def main():
    """Start the MCP summary server and serve until interrupted.

    Binds an ``HTTPServer`` to ``0.0.0.0:PORT``, prints the effective
    configuration, and blocks in ``serve_forever``. Ctrl-C stops the server.
    """
    server = HTTPServer(("0.0.0.0", PORT), MCPSummaryHandler)
    mode = "auth enabled (Bearer)" if API_KEY else "no auth (API_KEY not set)"
    print(f"MCP Summary Server listening on 0.0.0.0:{PORT} [{mode}]")
    print(f" - Model: {MODEL_NAME}")
    print(f" - Chunk size: {CHUNK_SIZE} characters")
    print(f" - Max direct text: {MAX_DIRECT_TEXT_LENGTH} characters")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Release the listening socket however serve_forever() exits, not
        # only on Ctrl-C.
        server.server_close()
|
|
|
|
|
|
# Script entry point: start the server only when executed directly, not when
# the module is imported.
if __name__ == "__main__":
    main()
|