Files
mcp-summary/mcp_summary_server.py

713 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
MCP Summary Server (Streamable HTTP transport)
Designed to work with OpenWebUI's MCP (Streamable HTTP) integration.
Features:
- Multiple specialized summarization, comparison, and extraction tools.
- Automatic chunking and synthesis for long documents.
- Temporary in-memory storage of document chunks/summaries for continued use.
- Configurable cache limits via environment variables.
Tools:
- summarize_document
- summarize_executive_brief
- summarize_bullet_points
- summarize_for_court
- compare_documents
- extract_key_points
- extract_action_items
- extract_entities
- summarize_very_long_document
- retrieve_document_data
- query_stored_document
- clear_document_cache
Auth:
- If API_KEY is set, every POST must send "Authorization: Bearer <API_KEY>".
- If API_KEY is not set, no auth is required (for local/internal use).
"""
import hmac
import json
import logging
import os
import sys
import time
import uuid
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, List, Optional, Tuple

import requests
# Logging: INFO and above go to stdout as "<timestamp> [<LEVEL>] <message>".
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
logging.basicConfig(stream=sys.stdout, format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("mcp-summary")
def _env_int(name: str, default: str) -> int:
    """Return environment variable *name* parsed as an int, or *default* if unset."""
    return int(os.getenv(name, default))


# --- MCP server settings ---
API_KEY = os.getenv("API_KEY", "").strip()
PORT = _env_int("PORT", "8080")

# --- Upstream LLM (OpenAI-compatible chat completions endpoint) ---
OPENAPI_URL = os.getenv("OPENAPI_URL", "http://localhost:8080/v1")
OPENAPI_API_KEY = os.getenv("OPENAPI_API_KEY", "")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4o")
LLM_TIMEOUT = _env_int("LLM_TIMEOUT", "120")

# --- Chunking (sizes are measured in characters) ---
CHUNK_SIZE = _env_int("CHUNK_SIZE", "4000")
OVERLAP = _env_int("OVERLAP", "200")
MAX_DIRECT_TEXT_LENGTH = _env_int("MAX_DIRECT_TEXT_LENGTH", "8000")
TARGET_INTERMEDIATE_SUMMARY_LENGTH = _env_int("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150")

# --- Cache limits ---
MAX_STORED_DOCS = _env_int("MAX_STORED_DOCS", "500")
CACHE_TTL_SECONDS = _env_int("CACHE_TTL_SECONDS", "86400")  # 24h default

# In-memory document cache keyed by doc_id; contents are lost on restart.
DOCUMENT_STORE: Dict[str, Dict[str, Any]] = {}
def generate_doc_id() -> str:
    """Return a new random (version 4) UUID in its string form."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
def evict_oldest_if_needed():
    """Drop the oldest entries from DOCUMENT_STORE until it holds at most MAX_STORED_DOCS.

    Age is determined by each entry's ``created_at`` timestamp. Does nothing
    when the store is already within the limit.
    """
    excess = len(DOCUMENT_STORE) - MAX_STORED_DOCS
    if excess <= 0:
        return
    # Oldest first, so the leading `excess` ids are the ones to discard.
    ids_by_age = sorted(
        DOCUMENT_STORE,
        key=lambda doc_id: DOCUMENT_STORE[doc_id]["created_at"],
    )
    for stale_id in ids_by_age[:excess]:
        DOCUMENT_STORE.pop(stale_id, None)
def store_document(doc_id: str, text_length: int, chunks: List[str],
                   intermediate_summaries: List[str], final_output: str,
                   tool_used: str):
    """Cache the artifacts of one processed document under *doc_id*.

    Args:
        doc_id: Key under which the entry is stored (an existing entry with
            the same id is overwritten).
        text_length: Length of the original input text, stored as metadata.
        chunks: Text chunks that were sent to the LLM.
        intermediate_summaries: Per-chunk LLM outputs.
        final_output: Final LLM output returned to the caller.
        tool_used: Name of the tool that produced the output.

    The entry is timestamped with ``time.time()`` so that TTL checks and
    oldest-first eviction can use ``created_at``.
    """
    DOCUMENT_STORE[doc_id] = {
        "text_length": text_length,
        "chunks_count": len(chunks),
        "chunks": chunks,
        "intermediate_summaries": intermediate_summaries,
        "final_output": final_output,
        "tool_used": tool_used,
        "created_at": time.time(),
    }
    # Evict AFTER inserting. Evicting first only trims the store to the limit
    # and then adds one more entry, leaving MAX_STORED_DOCS + 1 documents.
    evict_oldest_if_needed()
def get_document(doc_id: str) -> Optional[Dict[str, Any]]:
    """Look up a cached document, honouring the cache TTL.

    Returns the stored entry, or ``None`` when *doc_id* is unknown or the
    entry is older than CACHE_TTL_SECONDS. An expired entry is removed from
    DOCUMENT_STORE as a side effect of the lookup.
    """
    entry = DOCUMENT_STORE.get(doc_id)
    if entry:
        age_seconds = time.time() - entry["created_at"]
        if age_seconds <= CACHE_TTL_SECONDS:
            return entry
        # Expired: purge it so it no longer occupies a cache slot.
        DOCUMENT_STORE.pop(doc_id, None)
    return None
def call_llm(system_prompt: str, user_prompt: str, max_tokens: int = 2000) -> str:
    """Send one system+user exchange to the configured chat-completions endpoint.

    Args:
        system_prompt: Content of the ``system`` message.
        user_prompt: Content of the ``user`` message.
        max_tokens: Value passed as ``max_tokens`` in the request payload.

    Returns:
        The text content of the first choice in the response.

    Raises:
        requests.RequestException: On transport errors, timeouts (LLM_TIMEOUT
            seconds) or a non-2xx HTTP status.
        ValueError: If the response body does not contain a text message at
            ``choices[0].message.content``.
    """
    # Tolerate a trailing slash in OPENAPI_URL so we never request "//chat/completions".
    url = f"{OPENAPI_URL.rstrip('/')}/chat/completions"
    headers = {"Content-Type": "application/json"}
    # Only send credentials when a key is configured; an empty key would
    # otherwise produce a malformed "Authorization: Bearer " header.
    if OPENAPI_API_KEY:
        headers["Authorization"] = f"Bearer {OPENAPI_API_KEY}"
    payload = {
        "model": MODEL_NAME,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.3,
        "max_tokens": max_tokens,
        "top_p": 0.9,
    }
    logger.info("Calling LLM: %s model=%s", OPENAPI_URL, MODEL_NAME)
    response = requests.post(url, headers=headers, json=payload, timeout=LLM_TIMEOUT)
    response.raise_for_status()
    data = response.json()
    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        # Surface a readable message instead of a bare KeyError such as "'choices'".
        raise ValueError("LLM response did not contain a chat completion message") from exc
    if not isinstance(content, str):
        # Callers join and return this value as text, so fail here rather than later.
        raise ValueError("LLM response message has no text content")
    return content
def chunk_text(
    text: str,
    chunk_size: Optional[int] = None,
    overlap: Optional[int] = None,
) -> List[str]:
    """Split *text* into overlapping chunks, preferring natural break points.

    Each chunk is at most ``chunk_size`` characters. Within the second half of
    a window the function looks for the last paragraph break, line break or
    sentence end (in that order of preference) and cuts there; otherwise it
    cuts at the window edge. Consecutive chunks share ``overlap`` characters.

    Args:
        text: Text to split.
        chunk_size: Maximum chunk length in characters. Defaults to CHUNK_SIZE.
        overlap: Characters repeated between consecutive chunks. Defaults to
            OVERLAP. Values outside ``0 .. chunk_size - 1`` are clamped.

    Returns:
        ``[text]`` when the text fits in one chunk, otherwise the list of
        non-blank chunks in document order.

    Raises:
        ValueError: If the effective chunk size is not positive.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    lap = OVERLAP if overlap is None else overlap
    if size <= 0:
        raise ValueError("chunk_size must be a positive number of characters")
    # An overlap as large as the chunk itself would leave no room to advance.
    lap = max(0, min(lap, size - 1))

    text_len = len(text)
    if text_len <= size:
        return [text]

    chunks: List[str] = []
    start = 0
    while start < text_len:
        end = min(start + size, text_len)
        break_point = end
        # The last window already holds the rest of the text, so splitting it
        # again would only create an extra chunk (and an extra LLM call).
        if end < text_len:
            # Only accept break points beyond start + lap: stepping back by
            # `lap` afterwards must still land strictly after `start`, which
            # guarantees the loop terminates for any configuration.
            search_from = max(start + size // 2, start + lap + 1)
            for marker in ("\n\n", "\n", ". ", "! ", "? "):
                pos = text.rfind(marker, search_from, end)
                if pos > start:
                    break_point = pos
                    break
        chunk = text[start:break_point]
        if chunk.strip():
            chunks.append(chunk)
        if break_point >= text_len:
            break
        start = break_point - lap
    return chunks
# System-prompt preamble shared by every tool.
_BASE_SYSTEM_PROMPT = "You are a precise legal assistant creating concise, accurate outputs."

# tool name -> (system-prompt suffix, per-chunk user template, synthesis user template)
_TOOL_PROMPTS: Dict[str, Tuple[str, str, str]] = {
    "summarize_document": (
        "\nCreate a clear, professional summary.\n"
        "- Approximately {max_length} words.\n"
        "- Capture key points, important details, names, dates, facts.\n"
        "- Format as plain text without bullet points.\n",
        "Summarize this text (chunk {i} of {total}):\n\n{text}\n\nSummary:",
        "Synthesize these partial summaries into one cohesive summary:\n\n{summaries}\n\nFinal summary:",
    ),
    "summarize_executive_brief": (
        "\nCreate an executive brief:\n"
        # Was "12 paragraphs": the range dash had been lost, contradicting "short brief".
        "- 1-2 paragraphs.\n"
        "- High-level overview of issues, key findings, and outcomes.\n"
        "- Professional tone, suitable for senior decision-makers.\n"
        "- No bullet points.\n",
        "Provide a concise executive-style summary of this chunk (chunk {i} of {total}):\n\n{text}\n\nExecutive summary:",
        "Combine these executive-style summaries into a single, clear executive brief:\n\n{summaries}\n\nFinal executive brief:",
    ),
    "summarize_bullet_points": (
        "\nCreate a concise bullet-point summary:\n"
        "- Use short bullets.\n"
        "- Focus on key points, actions, dates, and outcomes.\n"
        "- No long paragraphs.\n",
        "Summarize this chunk as concise bullet points (chunk {i} of {total}):\n\n{text}\n\nBullet points:",
        "Merge these bullet-point summaries into one clean, non-redundant bullet list:\n\n{summaries}\n\nFinal bullet summary:",
    ),
    "summarize_for_court": (
        "\nCreate a summary suitable for a judge or legal professional:\n"
        "- Clearly state: parties, issues, key evidence, legal reasoning, outcome.\n"
        "- Use formal, precise language.\n"
        "- Keep it concise and structured.\n",
        "Provide a court-style summary of this chunk (chunk {i} of {total}):\n\n{text}\n\nCourt summary:",
        "Combine these summaries into a single, structured summary suitable for a court:\n\n{summaries}\n\nFinal court-style summary:",
    ),
    # For compare, both documents arrive as one combined text; chunking applies
    # if the combined text is long.
    "compare_documents": (
        "\nCompare two documents and highlight:\n"
        "- Key differences and conflicts.\n"
        "- Changes in facts, reasoning, or outcomes.\n"
        "- Any new or removed conditions/requirements.\n"
        "Be precise and concise.\n",
        "Compare these excerpts and note key differences/conflicts (chunk {i} of {total}):\n\n{text}\n\nComparison:",
        "Synthesize these partial comparisons into a single, clear comparison summary:\n\n{summaries}\n\nFinal comparison:",
    ),
    "extract_key_points": (
        "\nExtract the key points from the text:\n"
        "- Issues, holdings, obligations, dates, parties, statutes.\n"
        "- Use concise bullet points.\n"
        "- Do not add commentary.\n",
        "Extract the key points from this chunk (chunk {i} of {total}):\n\n{text}\n\nKey points:",
        "Combine these extracted key points into one clean, non-redundant list:\n\n{summaries}\n\nFinal key points:",
    ),
    "extract_action_items": (
        "\nExtract all action items, deadlines, and obligations:\n"
        "- Who must do what, by when.\n"
        "- Use concise bullets.\n"
        "- No extra commentary.\n",
        "Extract action items from this chunk (chunk {i} of {total}):\n\n{text}\n\nAction items:",
        "Combine these action items into one clear, non-redundant list:\n\n{summaries}\n\nFinal action items:",
    ),
    "extract_entities": (
        "\nExtract important entities:\n"
        "- People, organizations, locations, dates, legal references, case names.\n"
        "- Use concise bullets, grouped by type.\n"
        "- No extra commentary.\n",
        "Extract entities from this chunk (chunk {i} of {total}):\n\n{text}\n\nEntities:",
        "Merge these entity lists into one clean, grouped list:\n\n{summaries}\n\nFinal entities:",
    ),
    "summarize_very_long_document": (
        "\nCreate a concise, structured summary optimized for very long documents:\n"
        "- Preserve core issues, reasoning, outcomes, and critical details.\n"
        "- Use clear paragraphs; avoid fluff.\n",
        "Summarize this chunk from a very long document (chunk {i} of {total}):\n\n{text}\n\nSummary:",
        "Synthesize these summaries into one concise, structured summary of the full document:\n\n{summaries}\n\nFinal summary:",
    ),
}

# Used for any tool name without a dedicated entry above.
_FALLBACK_PROMPTS: Tuple[str, str, str] = (
    "",
    "Process this chunk (chunk {i} of {total}):\n\n{text}",
    "Combine these results:\n\n{summaries}",
)


def build_tool_prompts(tool_name: str) -> Tuple[str, str, str]:
    """Return the prompt set for *tool_name*.

    Returns:
        ``(system_prompt, chunk_user_template, synthesis_user_template)``.
        The chunk template uses the ``{i}``, ``{total}`` and ``{text}``
        placeholders, the synthesis template uses ``{summaries}``, and the
        ``summarize_document`` system prompt carries a ``{max_length}``
        placeholder that is returned unsubstituted. Unknown tool names get a
        generic fallback whose system prompt is just the shared preamble.
    """
    suffix, chunk_user, synth_user = _TOOL_PROMPTS.get(tool_name, _FALLBACK_PROMPTS)
    return _BASE_SYSTEM_PROMPT + suffix, chunk_user, synth_user
def process_with_chunking(
    text: str,
    tool_name: str,
    max_length: int = 100
) -> Tuple[str, List[str], List[str]]:
    """Run *tool_name* over *text*, chunking and synthesizing when it is long.

    Text of at most MAX_DIRECT_TEXT_LENGTH characters (after stripping) is
    sent to the LLM in a single call. Longer text is split with
    ``chunk_text``, each chunk is processed separately, and the per-chunk
    outputs are merged by one final synthesis call.

    Args:
        text: Input document text.
        tool_name: Tool whose prompts should be used (see ``build_tool_prompts``).
        max_length: Target length in words, substituted into system prompts
            that contain a ``{max_length}`` placeholder.

    Returns:
        ``(final_output, chunks, intermediate_summaries)``.

    Raises:
        ValueError: If *text* is empty or only whitespace.
    """
    text = text.strip()
    if not text:
        raise ValueError("Empty text provided")

    sys_prompt, chunk_user_tpl, synth_user_tpl = build_tool_prompts(tool_name)
    # The system prompt used to be sent with the literal "{max_length}" in it,
    # so the requested length never reached the model. Use replace() rather
    # than format() so any other braces in a prompt are left untouched.
    sys_prompt = sys_prompt.replace("{max_length}", str(max_length))

    # Short input: a single direct call, reported as one chunk.
    if len(text) <= MAX_DIRECT_TEXT_LENGTH:
        user_prompt = chunk_user_tpl.format(i=1, total=1, text=text)
        final_output = call_llm(sys_prompt, user_prompt)
        return final_output, [text], [final_output]

    # Long input: process each chunk independently.
    chunks = chunk_text(text)
    total = len(chunks)
    intermediate_summaries = []
    for i, chunk in enumerate(chunks, 1):
        user_prompt = chunk_user_tpl.format(i=i, total=total, text=chunk)
        intermediate_summaries.append(call_llm(sys_prompt, user_prompt))

    # Merge the per-chunk outputs into the final result.
    combined = "\n\n".join(intermediate_summaries)
    synth_prompt = synth_user_tpl.format(summaries=combined)
    final_output = call_llm(sys_prompt, synth_prompt)
    return final_output, chunks, intermediate_summaries
def compare_texts_with_chunking(text1: str, text2: str) -> Tuple[str, List[str], List[str]]:
    """Compare two documents by processing them as one labelled, combined text.

    Both texts are placed under "DOCUMENT 1" / "DOCUMENT 2" headers and handed
    to ``process_with_chunking`` with the ``compare_documents`` tool; its
    ``(final_output, chunks, intermediate_summaries)`` tuple is returned as is.
    """
    layout = "=== DOCUMENT 1 ===\n\n{first}\n\n=== DOCUMENT 2 ===\n\n{second}"
    merged = layout.format(first=text1, second=text2)
    return process_with_chunking(merged, "compare_documents")
def query_chunks(chunks: List[str], question: str) -> str:
    """Answer *question* from the given chunks with a single LLM call.

    All chunks are joined with blank lines and sent together with the
    question; no selection or sampling of chunks is performed. The system
    prompt instructs the model to answer only from the supplied text.
    """
    instructions = (
        "You are a precise legal assistant. Answer the question strictly based on the provided text. "
        "If the information is not present, say so clearly."
    )
    header = "Question:\n{}\n\nText:\n".format(question)
    document_text = "\n\n".join(chunks)
    return call_llm(instructions, header + document_text, max_tokens=1500)
# Tool definitions
def _string_prop(description: str) -> Dict[str, Any]:
    """Return a JSON-schema string property with the given description."""
    return {"type": "string", "description": description}


def _tool(name: str, description: str, properties: Dict[str, Any],
          required: List[str]) -> Dict[str, Any]:
    """Return one tool entry in the shape used by the ``tools/list`` result."""
    return {
        "name": name,
        "description": description,
        "inputSchema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


def _text_tool(name: str, description: str,
               text_description: str = "Full document text.") -> Dict[str, Any]:
    """Return a tool entry whose only (required) argument is ``text``."""
    return _tool(name, description, {"text": _string_prop(text_description)}, ["text"])


# Payload returned verbatim as the result of the MCP ``tools/list`` method.
TOOLS_LIST: Dict[str, Any] = {
    "tools": [
        _tool(
            "summarize_document",
            "General-purpose document summarization. Prefer this for long or complex documents to avoid context limits.",
            {
                "text": _string_prop("Full document text to summarize."),
                "max_length": {"type": "integer", "description": "Max summary length in words (default: 100)."},
            },
            ["text"],
        ),
        _text_tool(
            "summarize_executive_brief",
            # Was "(12 paragraphs)": the range dash had been lost, contradicting "short".
            "Create a short executive brief (1-2 paragraphs) for senior decision-makers.",
        ),
        _text_tool(
            "summarize_bullet_points",
            "Create a concise bullet-point summary of key points.",
        ),
        _text_tool(
            "summarize_for_court",
            "Create a formal summary suitable for a judge or legal professional.",
        ),
        _tool(
            "compare_documents",
            "Compare two documents and highlight key differences, conflicts, and changes.",
            {
                "text1": _string_prop("First document text."),
                "text2": _string_prop("Second document text."),
            },
            ["text1", "text2"],
        ),
        _text_tool(
            "extract_key_points",
            "Extract key points: issues, holdings, obligations, dates, parties, statutes.",
        ),
        _text_tool(
            "extract_action_items",
            "Extract all action items, deadlines, and obligations.",
        ),
        _text_tool(
            "extract_entities",
            "Extract important entities: people, organizations, locations, dates, legal references.",
        ),
        _text_tool(
            "summarize_very_long_document",
            "Optimized for very long documents with deeper chunking and hierarchical summarization.",
            "Very long document text.",
        ),
        _tool(
            "retrieve_document_data",
            "Retrieve stored data for a previously processed document by doc_id (final output, intermediate summaries, metadata).",
            {"doc_id": _string_prop("Document ID returned when the document was first processed.")},
            ["doc_id"],
        ),
        _tool(
            "query_stored_document",
            "Ask a question about a previously processed document using its stored chunks.",
            {
                "doc_id": _string_prop("Document ID."),
                "question": _string_prop("Your question about the document."),
            },
            ["doc_id", "question"],
        ),
        _tool(
            "clear_document_cache",
            "Clear all temporarily stored document data from this server.",
            {},
            [],
        ),
    ]
}
class MCPSummaryHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
logger.info(format % args)
def _send_json(self, status: int, payload: Any):
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _auth_or_401(self) -> bool:
auth = (self.headers.get("Authorization") or "").strip()
if not API_KEY:
return True
if auth.startswith("Bearer "):
token = auth[len("Bearer "):].strip()
if token == API_KEY:
return True
self._send_json(401, {"error": "Missing or invalid API key"})
return False
def do_GET(self):
try:
if self.path == "/":
self._send_json(200, {
"service": "mcp-summary",
"transport": "streamable-http",
"docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call)."
})
return
self.send_error(404, "Not Found")
except Exception as e:
logger.error(f"GET error: {e}", exc_info=True)
try:
self.send_error(500, "Internal Server Error")
except Exception:
pass
def do_POST(self):
try:
if self.path not in ("/", "/mcp"):
self.send_error(404, "Not Found")
return
if not self._auth_or_401():
return
length = int(self.headers.get("Content-Length", 0))
if length == 0:
self._send_json(400, {"error": "Empty body"})
return
raw = self.rfile.read(length)
try:
req = json.loads(raw)
except json.JSONDecodeError:
self._send_json(400, {"error": "Invalid JSON"})
return
method = req.get("method")
params = req.get("params") or {}
req_id = req.get("id")
logger.info(f"MCP request: method={method}, id={req_id}")
# Notifications
if isinstance(method, str) and method.startswith("notifications/"):
if req_id is not None:
self._send_json(200, {"jsonrpc": "2.0", "id": req_id, "result": {}})
else:
self.send_response(200)
self.send_header("Content-Length", "0")
self.end_headers()
return
# initialize
if method == "initialize":
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": "2025-11-25",
"capabilities": {"tools": {}},
"serverInfo": {"name": "mcp-summary", "version": "1.0.0"}
}
})
return
# tools/list
if method == "tools/list":
self._send_json(200, {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_LIST})
return
# tools/call
if method == "tools/call":
tool_name = params.get("name")
tool_args = params.get("arguments") or {}
try:
result = self._call_tool(tool_name, tool_args)
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{"type": "text", "text": json.dumps(result, ensure_ascii=False)}
]
}
})
except Exception as e:
logger.error(f"Tool call error: {e}", exc_info=True)
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32000, "message": str(e)}
})
return
self._send_json(400, {"error": "Unknown method: " + str(method)})
except Exception as e:
logger.error(f"POST error: {e}", exc_info=True)
try:
self.send_error(500, "Internal Server Error")
except Exception:
pass
def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
# General single-text tools
if name in (
"summarize_document",
"summarize_executive_brief",
"summarize_bullet_points",
"summarize_for_court",
"extract_key_points",
"extract_action_items",
"extract_entities",
"summarize_very_long_document"
):
text = args.get("text")
if not text:
raise ValueError("Text parameter is required")
max_length = args.get("max_length", 100)
final_output, chunks, intermediate_summaries = process_with_chunking(
text, name, max_length
)
doc_id = generate_doc_id()
store_document(doc_id, len(text), chunks, intermediate_summaries, final_output, name)
return {
"doc_id": doc_id,
"tool": name,
"result": final_output,
"metadata": {
"original_length": len(text),
"chunks": len(chunks)
}
}
# compare_documents
if name == "compare_documents":
text1 = args.get("text1")
text2 = args.get("text2")
if not text1 or not text2:
raise ValueError("text1 and text2 are required")
final_output, chunks, intermediate_summaries = compare_texts_with_chunking(text1, text2)
doc_id = generate_doc_id()
store_document(doc_id, len(text1) + len(text2), chunks, intermediate_summaries, final_output, name)
return {
"doc_id": doc_id,
"tool": name,
"result": final_output,
"metadata": {
"original_length_1": len(text1),
"original_length_2": len(text2),
"chunks": len(chunks)
}
}
# retrieve_document_data
if name == "retrieve_document_data":
doc_id = args.get("doc_id")
if not doc_id:
raise ValueError("doc_id is required")
doc = get_document(doc_id)
if not doc:
raise ValueError("Document not found or expired")
# Return metadata + final_output + intermediate_summaries (chunks on demand if needed)
return {
"doc_id": doc_id,
"tool_used": doc["tool_used"],
"final_output": doc["final_output"],
"intermediate_summaries": doc["intermediate_summaries"],
"metadata": {
"text_length": doc["text_length"],
"chunks_count": doc["chunks_count"],
"created_at": doc["created_at"]
}
}
# query_stored_document
if name == "query_stored_document":
doc_id = args.get("doc_id")
question = args.get("question")
if not doc_id or not question:
raise ValueError("doc_id and question are required")
doc = get_document(doc_id)
if not doc:
raise ValueError("Document not found or expired")
answer = query_chunks(doc["chunks"], question)
return {
"doc_id": doc_id,
"question": question,
"answer": answer
}
# clear_document_cache
if name == "clear_document_cache":
DOCUMENT_STORE.clear()
return {"status": "ok", "message": "Document cache cleared."}
raise ValueError(f"Unknown tool: {name}")
def main():
    """Start the HTTP server and serve until interrupted.

    The port comes from the first command-line argument when given, otherwise
    from the PORT setting. The server binds to all interfaces (0.0.0.0).
    """
    port = int(sys.argv[1]) if len(sys.argv) > 1 else PORT
    logger.info(f"Starting MCP Summary Server on 0.0.0.0:{port}")
    logger.info(f"Auth mode: {'Bearer (API_KEY set)' if API_KEY else 'none (API_KEY not set)'}")
    logger.info(f"LLM URL: {OPENAPI_URL}")
    logger.info(f"Model: {MODEL_NAME}")
    logger.info(f"Cache: max_docs={MAX_STORED_DOCS}, ttl={CACHE_TTL_SECONDS}s")
    # Leaving the `with` block calls server_close(), so the listening socket
    # is released on every exit path, not only after Ctrl-C.
    with HTTPServer(("0.0.0.0", port), MCPSummaryHandler) as server:
        try:
            logger.info(f"MCP Summary Server listening on 0.0.0.0:{port}")
            server.serve_forever()
        except KeyboardInterrupt:
            logger.info("Shutting down...")


if __name__ == "__main__":
    main()