#!/usr/bin/env python3
"""
MCP Summary Server (Streamable HTTP transport)

Designed to work with OpenWebUI's MCP (Streamable HTTP) integration.

Features:
- Multiple specialized summarization, comparison, and extraction tools.
- Automatic chunking and synthesis for long documents.
- Temporary in-memory storage of document chunks/summaries for continued use.
- Configurable cache limits via environment variables.

Tools:
- summarize_document
- summarize_executive_brief
- summarize_bullet_points
- summarize_for_court
- compare_documents
- extract_key_points
- extract_action_items
- extract_entities
- summarize_very_long_document
- retrieve_document_data
- query_stored_document
- clear_document_cache

Auth:
- If API_KEY is set:
    - Requires: Authorization: Bearer <API_KEY>
- If API_KEY is not set:
    - No auth required (for local/internal use).
"""

import hmac
import json
import logging
import os
import sys
import time
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict, List, Optional, Tuple

try:
    import requests
except ImportError:  # Reported with an actionable message by main()/call_llm().
    requests = None  # type: ignore[assignment]

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger("mcp-summary")

# MCP Server Configuration
API_KEY = os.environ.get("API_KEY", "").strip()
PORT = int(os.environ.get("PORT", "8080"))

# LLM Configuration
OPENAPI_URL = os.environ.get("OPENAPI_URL", "http://localhost:8080/v1")
OPENAPI_API_KEY = os.environ.get("OPENAPI_API_KEY", "")
MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4o")
LLM_TIMEOUT = int(os.environ.get("LLM_TIMEOUT", "120"))

# Chunking Configuration
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "4000"))
OVERLAP = int(os.environ.get("OVERLAP", "200"))
MAX_DIRECT_TEXT_LENGTH = int(os.environ.get("MAX_DIRECT_TEXT_LENGTH", "8000"))
TARGET_INTERMEDIATE_SUMMARY_LENGTH = int(os.environ.get("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150"))

# Cache Configuration
MAX_STORED_DOCS = int(os.environ.get("MAX_STORED_DOCS", "500"))
CACHE_TTL_SECONDS = int(os.environ.get("CACHE_TTL_SECONDS", "86400"))  # 24h default

# Temporary in-memory store: doc_id -> stored record (see store_document).
DOCUMENT_STORE: Dict[str, Dict[str, Any]] = {}


def generate_doc_id() -> str:
    """Return a fresh random document identifier (a UUID4 string)."""
    return str(uuid.uuid4())


def _is_expired(doc: Dict[str, Any], now: float) -> bool:
    """Return True when *doc* is older than CACHE_TTL_SECONDS at time *now*."""
    return now - doc["created_at"] > CACHE_TTL_SECONDS


def evict_oldest_if_needed(incoming: int = 0) -> None:
    """Drop expired entries, then the oldest ones, to respect MAX_STORED_DOCS.

    Args:
        incoming: Number of entries about to be inserted. Room is made for
            them so the store does not exceed MAX_STORED_DOCS *after* the
            insertion. The default (0) only trims an already-oversized store.
    """
    now = time.time()
    # Collect keys first: the dict must not change size while being iterated.
    for key in [k for k, doc in DOCUMENT_STORE.items() if _is_expired(doc, now)]:
        DOCUMENT_STORE.pop(key, None)

    overflow = len(DOCUMENT_STORE) + incoming - MAX_STORED_DOCS
    if overflow <= 0:
        return
    # sorted() is stable, so entries with equal timestamps leave in insertion order.
    oldest_first = sorted(DOCUMENT_STORE, key=lambda k: DOCUMENT_STORE[k]["created_at"])
    for key in oldest_first[:overflow]:
        DOCUMENT_STORE.pop(key, None)


def store_document(doc_id: str, text_length: int, chunks: List[str],
                   intermediate_summaries: List[str], final_output: str,
                   tool_used: str) -> None:
    """Store the artefacts of one processed document under *doc_id*.

    Room is made before inserting, so the store never holds more than
    MAX_STORED_DOCS entries (as long as that limit is at least 1).
    """
    evict_oldest_if_needed(incoming=0 if doc_id in DOCUMENT_STORE else 1)
    DOCUMENT_STORE[doc_id] = {
        "text_length": text_length,
        "chunks_count": len(chunks),
        "chunks": chunks,
        "intermediate_summaries": intermediate_summaries,
        "final_output": final_output,
        "tool_used": tool_used,
        "created_at": time.time(),
    }


def get_document(doc_id: str) -> Optional[Dict[str, Any]]:
    """Return the stored record for *doc_id*, or None if unknown or expired.

    An expired record is removed from the store as a side effect.
    """
    doc = DOCUMENT_STORE.get(doc_id)
    if not doc:
        return None
    if _is_expired(doc, time.time()):
        DOCUMENT_STORE.pop(doc_id, None)
        return None
    return doc


def call_llm(system_prompt: str, user_prompt: str, max_tokens: int = 2000) -> str:
    """Send one chat-completion request and return the assistant's text.

    Raises:
        RuntimeError: If `requests` is not installed or the response does not
            contain a text completion.
        requests.RequestException: On transport errors or non-2xx responses.
    """
    if requests is None:
        raise RuntimeError("The 'requests' package is required to call the LLM")

    # Tolerate a trailing slash in OPENAPI_URL instead of producing '//chat/...'.
    url = f"{OPENAPI_URL.rstrip('/')}/chat/completions"
    headers = {"Content-Type": "application/json"}
    if OPENAPI_API_KEY:
        # An empty bearer token is rejected by some gateways; omit the header instead.
        headers["Authorization"] = f"Bearer {OPENAPI_API_KEY}"
    payload = {
        "model": MODEL_NAME,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "temperature": 0.3,
        "max_tokens": max_tokens,
        "top_p": 0.9,
    }
    logger.info("Calling LLM: %s model=%s", OPENAPI_URL, MODEL_NAME)
    response = requests.post(url, headers=headers, json=payload, timeout=LLM_TIMEOUT)
    response.raise_for_status()
    data = response.json()
    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        raise RuntimeError("Unexpected LLM response format") from exc
    if not isinstance(content, str):
        raise RuntimeError("LLM returned no text content")
    return content


# Preferred split points, from strongest (paragraph) to weakest (sentence end).
_BREAK_MARKERS = ("\n\n", "\n", ". ", "! ", "? ")


def chunk_text(text: str, chunk_size: Optional[int] = None,
               overlap: Optional[int] = None) -> List[str]:
    """Split *text* into overlapping chunks, preferring natural boundaries.

    Args:
        text: Text to split.
        chunk_size: Maximum chunk length in characters (default: CHUNK_SIZE).
        overlap: Characters repeated at the start of the next chunk
            (default: OVERLAP).

    Returns:
        The non-blank chunks in document order; ``[text]`` when the text
        already fits in a single chunk.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    lap = OVERLAP if overlap is None else overlap
    if len(text) <= size:
        return [text]

    text_len = len(text)
    chunks: List[str] = []
    start = 0
    while start < text_len:
        end = min(start + size, text_len)
        break_point = end
        # Only look for a nicer boundary when more text follows; splitting the
        # final window would just create an extra tiny chunk.
        if end < text_len:
            for marker in _BREAK_MARKERS:
                pos = text.rfind(marker, start + size // 2, end)
                if pos > start:
                    break_point = pos
                    break
        chunk = text[start:break_point]
        if chunk.strip():
            chunks.append(chunk)
        if break_point >= text_len:
            break
        # Always advance by at least one character so an overlap larger than
        # the chunk cannot stall the loop.
        start = max(break_point - lap, start + 1)
    return chunks


_BASE_SYSTEM_PROMPT = "You are a precise legal assistant creating concise, accurate outputs."

# tool name -> (system prompt suffix, per-chunk user template, synthesis user template)
_TOOL_PROMPTS: Dict[str, Tuple[str, str, str]] = {
    "summarize_document": (
        """
Create a clear, professional summary.
- Approximately {max_length} words.
- Capture key points, important details, names, dates, facts.
- Format as plain text without bullet points.
""",
        "Summarize this text (chunk {i} of {total}):\n\n{text}\n\nSummary:",
        "Synthesize these partial summaries into one cohesive summary:\n\n{summaries}\n\nFinal summary:",
    ),
    "summarize_executive_brief": (
        """
Create an executive brief:
- 1–2 paragraphs.
- High-level overview of issues, key findings, and outcomes.
- Professional tone, suitable for senior decision-makers.
- No bullet points.
""",
        "Provide a concise executive-style summary of this chunk (chunk {i} of {total}):\n\n{text}\n\nExecutive summary:",
        "Combine these executive-style summaries into a single, clear executive brief:\n\n{summaries}\n\nFinal executive brief:",
    ),
    "summarize_bullet_points": (
        """
Create a concise bullet-point summary:
- Use short bullets.
- Focus on key points, actions, dates, and outcomes.
- No long paragraphs.
""",
        "Summarize this chunk as concise bullet points (chunk {i} of {total}):\n\n{text}\n\nBullet points:",
        "Merge these bullet-point summaries into one clean, non-redundant bullet list:\n\n{summaries}\n\nFinal bullet summary:",
    ),
    "summarize_for_court": (
        """
Create a summary suitable for a judge or legal professional:
- Clearly state: parties, issues, key evidence, legal reasoning, outcome.
- Use formal, precise language.
- Keep it concise and structured.
""",
        "Provide a court-style summary of this chunk (chunk {i} of {total}):\n\n{text}\n\nCourt summary:",
        "Combine these summaries into a single, structured summary suitable for a court:\n\n{summaries}\n\nFinal court-style summary:",
    ),
    "compare_documents": (
        """
Compare two documents and highlight:
- Key differences and conflicts.
- Changes in facts, reasoning, or outcomes.
- Any new or removed conditions/requirements.
Be precise and concise.
""",
        "Compare these excerpts and note key differences/conflicts (chunk {i} of {total}):\n\n{text}\n\nComparison:",
        "Synthesize these partial comparisons into a single, clear comparison summary:\n\n{summaries}\n\nFinal comparison:",
    ),
    "extract_key_points": (
        """
Extract the key points from the text:
- Issues, holdings, obligations, dates, parties, statutes.
- Use concise bullet points.
- Do not add commentary.
""",
        "Extract the key points from this chunk (chunk {i} of {total}):\n\n{text}\n\nKey points:",
        "Combine these extracted key points into one clean, non-redundant list:\n\n{summaries}\n\nFinal key points:",
    ),
    "extract_action_items": (
        """
Extract all action items, deadlines, and obligations:
- Who must do what, by when.
- Use concise bullets.
- No extra commentary.
""",
        "Extract action items from this chunk (chunk {i} of {total}):\n\n{text}\n\nAction items:",
        "Combine these action items into one clear, non-redundant list:\n\n{summaries}\n\nFinal action items:",
    ),
    "extract_entities": (
        """
Extract important entities:
- People, organizations, locations, dates, legal references, case names.
- Use concise bullets, grouped by type.
- No extra commentary.
""",
        "Extract entities from this chunk (chunk {i} of {total}):\n\n{text}\n\nEntities:",
        "Merge these entity lists into one clean, grouped list:\n\n{summaries}\n\nFinal entities:",
    ),
    "summarize_very_long_document": (
        """
Create a concise, structured summary optimized for very long documents:
- Preserve core issues, reasoning, outcomes, and critical details.
- Use clear paragraphs; avoid fluff.
""",
        "Summarize this chunk from a very long document (chunk {i} of {total}):\n\n{text}\n\nSummary:",
        "Synthesize these summaries into one concise, structured summary of the full document:\n\n{summaries}\n\nFinal summary:",
    ),
}

# Used for tool names without a dedicated prompt set.
_FALLBACK_PROMPTS: Tuple[str, str, str] = (
    "",
    "Process this chunk (chunk {i} of {total}):\n\n{text}",
    "Combine these results:\n\n{summaries}",
)


def build_tool_prompts(tool_name: str, max_length: int = 100) -> Tuple[str, str, str]:
    """
    Returns (system_prompt, chunk_user_template, synthesis_user_template)

    The user templates keep their {i}/{total}/{text} and {summaries}
    placeholders for the caller to fill in. The system prompt is returned
    ready to use: its {max_length} placeholder (only present for
    summarize_document) is replaced with *max_length*.
    """
    suffix, chunk_user, synth_user = _TOOL_PROMPTS.get(tool_name, _FALLBACK_PROMPTS)
    # str.replace rather than str.format: only this one placeholder is
    # meaningful here and any other braces must stay untouched.
    sys_prompt = (_BASE_SYSTEM_PROMPT + suffix).replace("{max_length}", str(max_length))
    return sys_prompt, chunk_user, synth_user


def process_with_chunking(
    text: str,
    tool_name: str,
    max_length: int = 100
) -> Tuple[str, List[str], List[str]]:
    """
    Returns (final_output, chunks, intermediate_summaries)

    Text up to MAX_DIRECT_TEXT_LENGTH characters is processed in a single LLM
    call. Longer text is chunked, each chunk is processed on its own, and the
    partial results are merged with one synthesis call.

    Raises:
        ValueError: If *text* is empty or whitespace only.
    """
    text = text.strip()
    if not text:
        raise ValueError("Empty text provided")

    sys_prompt, chunk_user_tpl, synth_user_tpl = build_tool_prompts(tool_name, max_length)

    # If short, direct processing
    if len(text) <= MAX_DIRECT_TEXT_LENGTH:
        final_output = call_llm(sys_prompt, chunk_user_tpl.format(i=1, total=1, text=text))
        return final_output, [text], [final_output]

    # Chunked processing
    chunks = chunk_text(text)
    intermediate_summaries = [
        call_llm(sys_prompt, chunk_user_tpl.format(i=i, total=len(chunks), text=chunk))
        for i, chunk in enumerate(chunks, 1)
    ]

    # Synthesis
    combined = "\n\n".join(intermediate_summaries)
    final_output = call_llm(sys_prompt, synth_user_tpl.format(summaries=combined))
    return final_output, chunks, intermediate_summaries


def compare_texts_with_chunking(text1: str, text2: str) -> Tuple[str, List[str], List[str]]:
    """Compare two documents; returns (final_output, chunks, intermediate_summaries).

    When both documents fit in one direct LLM call they are compared verbatim.
    Otherwise each document is condensed separately first and the condensed
    versions are compared. Chunking the concatenated text instead would yield
    chunks holding only one of the documents, leaving nothing to compare.
    """
    combined = f"=== DOCUMENT 1 ===\n\n{text1}\n\n=== DOCUMENT 2 ===\n\n{text2}"
    if len(combined.strip()) <= MAX_DIRECT_TEXT_LENGTH:
        return process_with_chunking(combined, "compare_documents")

    digest1, chunks1, partials1 = process_with_chunking(text1, "summarize_very_long_document")
    digest2, chunks2, partials2 = process_with_chunking(text2, "summarize_very_long_document")
    condensed = (
        f"=== DOCUMENT 1 (condensed) ===\n\n{digest1}\n\n"
        f"=== DOCUMENT 2 (condensed) ===\n\n{digest2}"
    )
    sys_prompt, chunk_user_tpl, _ = build_tool_prompts("compare_documents")
    final_output = call_llm(sys_prompt, chunk_user_tpl.format(i=1, total=1, text=condensed))
    return final_output, chunks1 + chunks2, partials1 + partials2


def query_chunks(chunks: List[str], question: str) -> str:
    """
    Simple semantic-style query: send question + chunks to LLM to extract relevant answers.

    All chunks are sent in a single prompt.
    NOTE(review): for very large documents this prompt may exceed the model's
    context window; limiting or sampling chunks is not implemented here.
    """
    system_prompt = (
        "You are a precise legal assistant. Answer the question strictly based on the provided text. "
        "If the information is not present, say so clearly."
    )
    user_prompt = (
        "Question:\n"
        f"{question}\n\n"
        "Text:\n" + "\n\n".join(chunks)
    )
    return call_llm(system_prompt, user_prompt, max_tokens=1500)


def _single_text_tool(name: str, description: str,
                      text_description: str = "Full document text.") -> Dict[str, Any]:
    """Build the MCP definition of a tool whose only argument is `text`."""
    return {
        "name": name,
        "description": description,
        "inputSchema": {
            "type": "object",
            "properties": {
                "text": {"type": "string", "description": text_description}
            },
            "required": ["text"]
        }
    }


# Tool definitions
TOOLS_LIST: Dict[str, Any] = {
    "tools": [
        {
            "name": "summarize_document",
            "description": "General-purpose document summarization. Prefer this for long or complex documents to avoid context limits.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "text": {"type": "string", "description": "Full document text to summarize."},
                    "max_length": {"type": "integer", "description": "Max summary length in words (default: 100)."}
                },
                "required": ["text"]
            }
        },
        _single_text_tool(
            "summarize_executive_brief",
            "Create a short executive brief (1–2 paragraphs) for senior decision-makers.",
        ),
        _single_text_tool(
            "summarize_bullet_points",
            "Create a concise bullet-point summary of key points.",
        ),
        _single_text_tool(
            "summarize_for_court",
            "Create a formal summary suitable for a judge or legal professional.",
        ),
        {
            "name": "compare_documents",
            "description": "Compare two documents and highlight key differences, conflicts, and changes.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "text1": {"type": "string", "description": "First document text."},
                    "text2": {"type": "string", "description": "Second document text."}
                },
                "required": ["text1", "text2"]
            }
        },
        _single_text_tool(
            "extract_key_points",
            "Extract key points: issues, holdings, obligations, dates, parties, statutes.",
        ),
        _single_text_tool(
            "extract_action_items",
            "Extract all action items, deadlines, and obligations.",
        ),
        _single_text_tool(
            "extract_entities",
            "Extract important entities: people, organizations, locations, dates, legal references.",
        ),
        _single_text_tool(
            "summarize_very_long_document",
            "Optimized for very long documents with deeper chunking and hierarchical summarization.",
            "Very long document text.",
        ),
        {
            "name": "retrieve_document_data",
            "description": "Retrieve stored data for a previously processed document by doc_id (final output, intermediate summaries, metadata).",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "doc_id": {"type": "string", "description": "Document ID returned when the document was first processed."}
                },
                "required": ["doc_id"]
            }
        },
        {
            "name": "query_stored_document",
            "description": "Ask a question about a previously processed document using its stored chunks.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "doc_id": {"type": "string", "description": "Document ID."},
                    "question": {"type": "string", "description": "Your question about the document."}
                },
                "required": ["doc_id", "question"]
            }
        },
        {
            "name": "clear_document_cache",
            "description": "Clear all temporarily stored document data from this server.",
            "inputSchema": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    ]
}

# Tools that take a single `text` argument and share one processing path.
_SINGLE_TEXT_TOOLS = (
    "summarize_document",
    "summarize_executive_brief",
    "summarize_bullet_points",
    "summarize_for_court",
    "extract_key_points",
    "extract_action_items",
    "extract_entities",
    "summarize_very_long_document",
)


def _rpc_result(req_id: Any, result: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success response."""
    return {"jsonrpc": "2.0", "id": req_id, "result": result}


def _rpc_error(req_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error response."""
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}


class MCPSummaryHandler(BaseHTTPRequestHandler):
    """HTTP handler that speaks MCP JSON-RPC on POST / and POST /mcp."""

    def log_message(self, format, *args):
        # Route http.server's access log through the module logger.
        logger.info(format % args)

    def _send_json(self, status: int, payload: Any):
        """Send *payload* as a UTF-8 JSON body with the given HTTP status."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _auth_or_401(self) -> bool:
        """Return True if the request is authorized; otherwise send 401 and return False.

        Authentication is disabled when API_KEY is not configured.
        """
        if not API_KEY:
            return True
        header = (self.headers.get("Authorization") or "").strip()
        scheme, _, token = header.partition(" ")
        # Constant-time comparison so response timing does not leak the key.
        # Compared as bytes because compare_digest rejects non-ASCII str.
        if scheme.lower() == "bearer" and hmac.compare_digest(
            token.strip().encode("utf-8"), API_KEY.encode("utf-8")
        ):
            return True
        self._send_json(401, {"error": "Missing or invalid API key"})
        return False

    def do_GET(self):
        try:
            if self.path == "/":
                self._send_json(200, {
                    "service": "mcp-summary",
                    "transport": "streamable-http",
                    "docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call)."
                })
                return
            self.send_error(404, "Not Found")
        except Exception as e:
            logger.error(f"GET error: {e}", exc_info=True)
            try:
                self.send_error(500, "Internal Server Error")
            except Exception:
                # Best effort: the client may already have disconnected.
                logger.debug("Could not send 500 response", exc_info=True)

    def do_POST(self):
        try:
            if self.path not in ("/", "/mcp"):
                self.send_error(404, "Not Found")
                return

            if not self._auth_or_401():
                return

            try:
                length = int(self.headers.get("Content-Length") or 0)
            except ValueError:
                self._send_json(400, {"error": "Invalid Content-Length"})
                return
            if length <= 0:
                self._send_json(400, {"error": "Empty body"})
                return

            raw = self.rfile.read(length)
            try:
                req = json.loads(raw)
            except ValueError:
                # Covers JSONDecodeError and UnicodeDecodeError (both ValueError).
                self._send_json(400, {"error": "Invalid JSON"})
                return

            status, payload = self._handle_rpc(req)
            if payload is None:
                self.send_response(status)
                self.send_header("Content-Length", "0")
                self.end_headers()
            else:
                self._send_json(status, payload)
        except Exception as e:
            logger.error(f"POST error: {e}", exc_info=True)
            try:
                self.send_error(500, "Internal Server Error")
            except Exception:
                # Best effort: the client may already have disconnected.
                logger.debug("Could not send 500 response", exc_info=True)

    def _handle_rpc(self, req: Any) -> Tuple[int, Optional[Dict[str, Any]]]:
        """Dispatch one decoded JSON-RPC message.

        Returns:
            (http_status, payload). *payload* is None when no response body is
            to be sent (an accepted notification).
        """
        if not isinstance(req, dict) or not isinstance(req.get("method"), str):
            req_id = req.get("id") if isinstance(req, dict) else None
            return 400, _rpc_error(req_id, -32600, "Invalid Request")

        method = req["method"]
        req_id = req.get("id")
        params = req.get("params") or {}
        logger.info("MCP request: method=%s, id=%s", method, req_id)

        # Notifications
        if method.startswith("notifications/"):
            if req_id is not None:
                return 200, _rpc_result(req_id, {})
            # Accepted, nothing to return.
            return 202, None

        if not isinstance(params, dict):
            return 200, _rpc_error(req_id, -32602, "Invalid params: expected an object")

        # initialize
        if method == "initialize":
            return 200, _rpc_result(req_id, {
                "protocolVersion": "2025-11-25",
                "capabilities": {"tools": {}},
                "serverInfo": {"name": "mcp-summary", "version": "1.0.0"}
            })

        # ping (liveness check; the result is an empty object)
        if method == "ping":
            return 200, _rpc_result(req_id, {})

        # tools/list
        if method == "tools/list":
            return 200, _rpc_result(req_id, TOOLS_LIST)

        # tools/call
        if method == "tools/call":
            tool_name = params.get("name")
            tool_args = params.get("arguments") or {}
            if not isinstance(tool_args, dict):
                return 200, _rpc_error(req_id, -32602, "Invalid params: arguments must be an object")
            try:
                result = self._call_tool(tool_name, tool_args)
            except Exception as e:
                logger.error(f"Tool call error: {e}", exc_info=True)
                return 200, _rpc_error(req_id, -32000, str(e))
            return 200, _rpc_result(req_id, {
                "content": [
                    {"type": "text", "text": json.dumps(result, ensure_ascii=False)}
                ]
            })

        return 200, _rpc_error(req_id, -32601, "Unknown method: " + method)

    def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
        """Run tool *name* with *args* and return its JSON-serializable result.

        Raises:
            ValueError: For an unknown tool, missing/invalid arguments, or an
                unknown/expired doc_id.
        """
        # General single-text tools
        if name in _SINGLE_TEXT_TOOLS:
            text = args.get("text")
            if not isinstance(text, str) or not text:
                raise ValueError("Text parameter is required")

            max_length = args.get("max_length")
            if max_length is None:
                max_length = 100
            try:
                max_length = int(max_length)
            except (TypeError, ValueError):
                raise ValueError("max_length must be a positive integer") from None
            if max_length <= 0:
                raise ValueError("max_length must be a positive integer")

            final_output, chunks, intermediate_summaries = process_with_chunking(
                text, name, max_length
            )
            doc_id = generate_doc_id()
            store_document(doc_id, len(text), chunks, intermediate_summaries, final_output, name)
            return {
                "doc_id": doc_id,
                "tool": name,
                "result": final_output,
                "metadata": {
                    "original_length": len(text),
                    "chunks": len(chunks)
                }
            }

        # compare_documents
        if name == "compare_documents":
            text1 = args.get("text1")
            text2 = args.get("text2")
            if not isinstance(text1, str) or not isinstance(text2, str) or not text1 or not text2:
                raise ValueError("text1 and text2 are required")

            final_output, chunks, intermediate_summaries = compare_texts_with_chunking(text1, text2)
            doc_id = generate_doc_id()
            store_document(doc_id, len(text1) + len(text2), chunks, intermediate_summaries, final_output, name)
            return {
                "doc_id": doc_id,
                "tool": name,
                "result": final_output,
                "metadata": {
                    "original_length_1": len(text1),
                    "original_length_2": len(text2),
                    "chunks": len(chunks)
                }
            }

        # retrieve_document_data
        if name == "retrieve_document_data":
            doc_id = args.get("doc_id")
            if not doc_id:
                raise ValueError("doc_id is required")
            doc = get_document(doc_id)
            if not doc:
                raise ValueError("Document not found or expired")
            # Return metadata + final_output + intermediate_summaries (raw chunks are not included)
            return {
                "doc_id": doc_id,
                "tool_used": doc["tool_used"],
                "final_output": doc["final_output"],
                "intermediate_summaries": doc["intermediate_summaries"],
                "metadata": {
                    "text_length": doc["text_length"],
                    "chunks_count": doc["chunks_count"],
                    "created_at": doc["created_at"]
                }
            }

        # query_stored_document
        if name == "query_stored_document":
            doc_id = args.get("doc_id")
            question = args.get("question")
            if not doc_id or not question:
                raise ValueError("doc_id and question are required")
            doc = get_document(doc_id)
            if not doc:
                raise ValueError("Document not found or expired")
            answer = query_chunks(doc["chunks"], question)
            return {
                "doc_id": doc_id,
                "question": question,
                "answer": answer
            }

        # clear_document_cache
        if name == "clear_document_cache":
            DOCUMENT_STORE.clear()
            return {"status": "ok", "message": "Document cache cleared."}

        raise ValueError(f"Unknown tool: {name}")


def main():
    """Start the HTTP server; the port comes from argv[1] or the PORT env var."""
    if requests is None:
        logger.error("The 'requests' package is required (pip install requests).")
        sys.exit(1)

    port = int(sys.argv[1]) if len(sys.argv) > 1 else PORT
    logger.info(f"Starting MCP Summary Server on 0.0.0.0:{port}")
    logger.info(f"Auth mode: {'Bearer (API_KEY set)' if API_KEY else 'none (API_KEY not set)'}")
    logger.info(f"LLM URL: {OPENAPI_URL}")
    logger.info(f"Model: {MODEL_NAME}")
    logger.info(f"Cache: max_docs={MAX_STORED_DOCS}, ttl={CACHE_TTL_SECONDS}s")

    server = HTTPServer(("0.0.0.0", port), MCPSummaryHandler)
    try:
        logger.info(f"MCP Summary Server listening on 0.0.0.0:{port}")
        server.serve_forever()
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        # Release the listening socket on every exit path, not only Ctrl-C.
        server.server_close()


if __name__ == "__main__":
    main()