#!/usr/bin/env python3
"""
MCP Summary Server (Streamable HTTP transport)

Designed to work with OpenWebUI's MCP (Streamable HTTP) integration.

Summarizes documents by:
1. Checking text length
2. If short, summarizing directly with LLM
3. If long, chunking text, summarizing each chunk, then synthesizing

All processing happens server-side, keeping full text out of the chat
context window.

Tools:
- summarize_document: Summarize a document (handles chunking automatically)

Auth:
- If API_KEY is set:
    - Requires: Authorization: Bearer <API_KEY>
- If API_KEY is not set:
    - No auth required (for local/internal use).
"""

import hmac
import json
import logging
import os
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Dict, List, Optional

import requests
from requests.exceptions import RequestException

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("mcp-summary")

# MCP Server Configuration
API_KEY = os.environ.get("API_KEY", "").strip()
PORT = int(os.environ.get("PORT", "8080"))

# LLM Configuration
# NOTE: the default LLM URL points at port 8080, which is also this server's
# default PORT; set one of them explicitly when both run on the same host.
OPENAPI_URL = os.environ.get("OPENAPI_URL", "http://localhost:8080/v1")
OPENAPI_API_KEY = os.environ.get("OPENAPI_API_KEY", "")
MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4o")

# Summarization Configuration (text sizes are in characters, summary sizes in words)
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "4000"))
OVERLAP = int(os.environ.get("OVERLAP", "200"))
TARGET_INTERMEDIATE_SUMMARY_LENGTH = int(os.environ.get("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150"))
MAX_DIRECT_SUMMARY_LENGTH = int(os.environ.get("MAX_DIRECT_SUMMARY_LENGTH", "100"))
MAX_DIRECT_TEXT_LENGTH = int(os.environ.get("MAX_DIRECT_TEXT_LENGTH", "8000"))
LLM_TIMEOUT = int(os.environ.get("LLM_TIMEOUT", "120"))

# Preferred places to end a chunk, from strongest boundary to weakest.
_BREAK_MARKERS = ("\n\n", "\n", ". ", "! ", "? ")

# Tool definitions
TOOLS_LIST: Dict[str, Any] = {
    "tools": [
        {
            "name": "summarize_document",
            "description": (
                "Summarize a document. Automatically handles chunking for long text. "
                "Returns a concise summary without exposing the full text."
            ),
            "inputSchema": {
                "type": "object",
                "properties": {
                    "text": {
                        "type": "string",
                        "description": "The document text to summarize",
                    },
                    "max_length": {
                        "type": "integer",
                        "description": "Maximum length of summary in words (default: 100)",
                    },
                },
                "required": ["text"],
            },
        }
    ]
}


def get_bearer_token(headers: Any) -> Optional[str]:
    """Extract the bearer token from an Authorization header.

    Args:
        headers: Mapping-like object exposing ``get("Authorization")``.

    Returns:
        The token with surrounding whitespace removed, or ``None`` when the
        header is missing, empty, or uses a scheme other than Bearer.
    """
    auth = (headers.get("Authorization") or "").strip()
    scheme, _, credentials = auth.partition(" ")
    # HTTP auth scheme names are case-insensitive, so accept "bearer" too.
    if scheme.lower() != "bearer":
        return None
    return credentials.strip() or None


def require_auth(headers: Any) -> bool:
    """Check authentication against the configured API key.

    Args:
        headers: Mapping-like object exposing ``get("Authorization")``.

    Returns:
        ``True`` when API_KEY is unset or the presented token matches it.

    Raises:
        PermissionError: If API_KEY is set and the token is missing or wrong.
    """
    if not API_KEY:
        return True

    token = get_bearer_token(headers)
    # Compare as bytes in constant time: compare_digest rejects non-ASCII str
    # and a plain != would leak how many leading characters matched.
    if not token or not hmac.compare_digest(token.encode("utf-8"), API_KEY.encode("utf-8")):
        raise PermissionError("Missing or invalid API key")
    return True


def call_llm(messages: List[Dict], temperature: float = 0.3) -> str:
    """Make an OpenAI-compatible chat-completions call.

    Args:
        messages: Chat messages (``role``/``content`` dicts) to send.
        temperature: Sampling temperature forwarded to the model.

    Returns:
        The text content of the first choice in the response.

    Raises:
        RuntimeError: If the request fails, the server returns an error
            status, or the response does not contain text content.
    """
    # Tolerate a trailing slash in the configured base URL.
    url = f"{OPENAPI_URL.rstrip('/')}/chat/completions"
    headers = {"Content-Type": "application/json"}
    if OPENAPI_API_KEY:
        # Only send credentials when configured; "Bearer " with an empty
        # token is a malformed header.
        headers["Authorization"] = f"Bearer {OPENAPI_API_KEY}"
    payload = {
        "model": MODEL_NAME,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": 2000,
        "top_p": 0.9,
    }

    logger.info("Calling LLM at %s with model %s", OPENAPI_URL, MODEL_NAME)
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=LLM_TIMEOUT)
        response.raise_for_status()
        data = response.json()
    except RequestException as e:
        logger.error("LLM request failed: %s", e)
        raise RuntimeError(f"Failed to connect to LLM at {OPENAPI_URL}: {e}") from e
    except Exception as e:
        logger.error("LLM call failed: %s", e)
        raise RuntimeError(f"LLM call failed: {e}") from e

    try:
        content = data["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as e:
        logger.error("LLM call failed: %s", e)
        raise RuntimeError(f"LLM call failed: {e}") from e

    if not isinstance(content, str):
        logger.error("LLM call failed: response contained no text content")
        raise RuntimeError("LLM call failed: response contained no text content")
    return content


def chunk_text(text: str) -> List[str]:
    """Split text into overlapping chunks of at most CHUNK_SIZE characters.

    Each chunk except the last ends at the latest paragraph, line, or
    sentence boundary found in the second half of its window (falling back to
    a hard cut at CHUNK_SIZE). The next chunk starts OVERLAP characters before
    the previous one ended so context carries across the boundary.

    Args:
        text: The text to split.

    Returns:
        The chunks in document order; ``[text]`` when it fits in one chunk.
        Whitespace-only chunks are dropped.

    Raises:
        ValueError: If CHUNK_SIZE is not positive.
    """
    if CHUNK_SIZE <= 0:
        raise ValueError("CHUNK_SIZE must be a positive number of characters")

    text_len = len(text)
    if text_len <= CHUNK_SIZE:
        return [text]

    overlap = max(OVERLAP, 0)  # a negative overlap would silently skip text
    chunks: List[str] = []
    start = 0
    while start < text_len:
        end = min(start + CHUNK_SIZE, text_len)
        break_point = end

        # The final window takes the rest of the text as-is; splitting it at a
        # marker would only create an extra, mostly-overlapping chunk.
        if end < text_len:
            search_from = start + CHUNK_SIZE // 2
            for marker in _BREAK_MARKERS:
                pos = text.rfind(marker, search_from, end)
                if pos != -1:
                    # Keep the marker with the text it terminates.
                    break_point = pos + len(marker)
                    break

        chunk = text[start:break_point]
        if chunk.strip():
            chunks.append(chunk)

        if break_point >= text_len:
            break

        next_start = break_point - overlap
        # Guarantee forward progress even when OVERLAP is as large as the
        # chunk just produced; otherwise the loop would never terminate.
        start = next_start if next_start > start else break_point

    logger.info("Split text into %d chunks", len(chunks))
    return chunks


def summarize_chunk(chunk_text: str, chunk_num: int, total_chunks: int) -> str:
    """Summarize a single chunk of a larger document.

    Args:
        chunk_text: The chunk's text. (The name shadows the module-level
            ``chunk_text`` function inside this body; it is kept because it is
            part of the public signature.)
        chunk_num: 1-based position of this chunk.
        total_chunks: Total number of chunks in the document.

    Returns:
        The LLM's summary of the chunk.

    Raises:
        RuntimeError: If the LLM call fails.
    """
    system_prompt = (
        "You are a precise legal assistant creating concise, accurate summaries.\n"
        f"You are processing chunk {chunk_num} of {total_chunks} from a larger document.\n"
        "\n"
        "Create a focused summary that:\n"
        "- Captures key points and important details\n"
        f"- Is approximately {TARGET_INTERMEDIATE_SUMMARY_LENGTH} words\n"
        "- Can be combined with other chunk summaries\n"
        "- Uses clear, professional language\n"
        "- Preserves names, dates, and specific facts\n"
        "\n"
        "Respond as plain text without bullet points."
    )
    user_prompt = (
        f"Summarize this text (chunk {chunk_num} of {total_chunks}):\n"
        "\n"
        f"{chunk_text}\n"
        "\n"
        "Summary:"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    logger.info("Summarizing chunk %s/%s", chunk_num, total_chunks)
    return call_llm(messages)


def synthesize_summaries(
    chunk_summaries: List[str],
    max_length: int = MAX_DIRECT_SUMMARY_LENGTH,
) -> str:
    """Synthesize multiple chunk summaries into a single final summary.

    Args:
        chunk_summaries: Per-chunk summaries in document order.
        max_length: Approximate target length of the final summary in words.

    Returns:
        The LLM's combined summary.

    Raises:
        RuntimeError: If the LLM call fails.
    """
    combined = "\n\n".join(chunk_summaries)
    system_prompt = (
        "You are a precise legal assistant creating executive-level summaries.\n"
        "\n"
        "Synthesize the provided partial summaries into a single, cohesive summary that:\n"
        f"- Is approximately {max_length} words\n"
        "- Captures the complete document picture\n"
        "- Is clear and professional\n"
        "- Removes redundancy\n"
        "- Maintains logical flow\n"
        "- Preserves all critical information\n"
        "\n"
        "Format as a single paragraph of plain text."
    )
    user_prompt = (
        "Synthesize these partial summaries into one cohesive summary:\n"
        "\n"
        f"{combined}\n"
        "\n"
        "Final summary:"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    logger.info("Synthesizing %d chunk summaries", len(chunk_summaries))
    return call_llm(messages)


def summarize_document(text: str, max_length: int = MAX_DIRECT_SUMMARY_LENGTH) -> Dict[str, Any]:
    """Summarize a document, chunking it first when it is too long.

    - Text of at most MAX_DIRECT_TEXT_LENGTH characters (after stripping) is
      summarized with a single LLM call.
    - Longer text is split with ``chunk_text``, each chunk is summarized, and
      the partial summaries are synthesized into one.

    Args:
        text: The document text.
        max_length: Approximate target length of the summary in words.

    Returns:
        A dict with keys ``summary``, ``original_length`` (characters before
        stripping), ``method`` (``"direct"`` or ``"chunked"``) and ``chunks``.

    Raises:
        ValueError: If the text is empty or whitespace-only.
        RuntimeError: If an LLM call fails.
    """
    original_length = len(text)
    text = text.strip()
    if not text:
        raise ValueError("Empty text provided")

    logger.info("Summarizing text of %d characters", original_length)

    # Direct summarization for shorter texts
    if len(text) <= MAX_DIRECT_TEXT_LENGTH:
        system_prompt = (
            "You are a precise legal assistant creating concise, accurate summaries.\n"
            "\n"
            "Create a summary that:\n"
            f"- Is approximately {max_length} words\n"
            "- Captures key points and important details\n"
            "- Uses clear, professional language\n"
            "- Preserves names, dates, and specific facts\n"
            "\n"
            "Format as plain text without bullet points."
        )
        user_prompt = (
            "Summarize the following document:\n"
            "\n"
            f"{text}\n"
            "\n"
            "Summary:"
        )
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        return {
            "summary": call_llm(messages),
            "original_length": original_length,
            "method": "direct",
            "chunks": 1,
        }

    # Chunked summarization for longer texts
    chunks = chunk_text(text)
    total = len(chunks)
    chunk_summaries = [
        summarize_chunk(chunk, index, total)
        for index, chunk in enumerate(chunks, 1)
    ]
    # Pass the requested length through so it also applies on this path.
    final_summary = synthesize_summaries(chunk_summaries, max_length)

    return {
        "summary": final_summary,
        "original_length": original_length,
        "method": "chunked",
        "chunks": total,
    }


class MCPSummaryHandler(BaseHTTPRequestHandler):
    """HTTP handler for the MCP summary server.

    GET / returns a small status document. POST / and POST /mcp accept one
    MCP JSON-RPC message per request (initialize, ping, tools/list,
    tools/call, and notifications).
    """

    def log_message(self, format, *args):
        """Route the base handler's access log through the module logger."""
        logger.info("%s", format % args)

    def _send_json(self, status: int, payload: Any):
        """Send ``payload`` as a UTF-8 JSON response with the given status."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_empty(self, status: int):
        """Send a response with the given status and no body."""
        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _send_result(self, req_id: Any, result: Any):
        """Send a successful JSON-RPC response for request ``req_id``."""
        self._send_json(200, {"jsonrpc": "2.0", "id": req_id, "result": result})

    def _send_rpc_error(self, req_id: Any, code: int, message: str):
        """Send a JSON-RPC error response (HTTP 200) for request ``req_id``."""
        self._send_json(200, {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message},
        })

    def _auth_or_401(self):
        """Check authentication; on failure send a 401 and return False."""
        try:
            return require_auth(self.headers)
        except PermissionError:
            self._send_json(401, {"error": "Missing or invalid API key"})
            return False

    def do_GET(self):
        """Handle GET requests (unauthenticated health check on ``/``)."""
        if self.path == "/":
            self._send_json(200, {
                "service": "mcp-summary",
                "transport": "streamable-http",
                "model": MODEL_NAME,
                "status": "running",
                "docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call).",
            })
            return
        self.send_error(404, "Not Found")

    def do_POST(self):
        """Handle MCP JSON-RPC requests on ``/`` and ``/mcp``.

        Malformed HTTP or JSON input is answered with HTTP 400; tool failures
        are answered with a JSON-RPC error object (HTTP 200); notifications
        are acknowledged with HTTP 202 and no body.
        """
        # Streamable HTTP MCP endpoint
        if self.path not in ("/", "/mcp"):
            self.send_error(404, "Not Found")
            return

        if not self._auth_or_401():
            return

        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._send_json(400, {"error": "Invalid Content-Length"})
            return
        if length <= 0:
            self._send_json(400, {"error": "Empty body"})
            return

        raw = self.rfile.read(length)
        try:
            req = json.loads(raw)
        except ValueError:
            # Covers both JSONDecodeError and UnicodeDecodeError (bad bytes).
            self._send_json(400, {"error": "Invalid JSON"})
            return
        if not isinstance(req, dict):
            # Batches (arrays) and bare scalars are not supported.
            self._send_json(400, {"error": "Invalid JSON-RPC request"})
            return

        method = req.get("method")
        params = req.get("params") or {}
        req_id = req.get("id")

        logger.info("MCP request: method=%s, id=%s", method, req_id)

        # MCP notifications (e.g. notifications/initialized) expect no
        # JSON-RPC reply; Streamable HTTP acknowledges them with 202.
        if "id" not in req and isinstance(method, str) and method.startswith("notifications/"):
            self._send_empty(202)
            return

        # MCP: initialize
        if method == "initialize":
            self._send_result(req_id, {
                "protocolVersion": "2025-11-25",
                "capabilities": {"tools": {}},
                "serverInfo": {"name": "mcp-summary", "version": "1.0.0"},
            })
            return

        # MCP: ping
        if method == "ping":
            self._send_result(req_id, {})
            return

        # MCP: tools/list
        if method == "tools/list":
            self._send_result(req_id, TOOLS_LIST)
            return

        # MCP: tools/call
        if method == "tools/call":
            try:
                if not isinstance(params, dict):
                    raise ValueError("params must be an object")
                result = self._call_tool(params.get("name"), params.get("arguments") or {})
                text = json.dumps(result, ensure_ascii=False)
            except Exception as e:
                # Boundary handler: every tool failure becomes a JSON-RPC
                # error so the client always gets a well-formed reply.
                logger.error("Tool call failed: %s", e, exc_info=True)
                self._send_rpc_error(req_id, -32000, str(e))
                return
            self._send_result(req_id, {"content": [{"type": "text", "text": text}]})
            return

        # Unknown method
        self._send_json(400, {"error": "Unknown method: " + str(method)})

    def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
        """Validate arguments and execute the named tool.

        Args:
            name: Tool name; only ``summarize_document`` is supported.
            args: Tool arguments (``text`` required, ``max_length`` optional).

        Returns:
            The result dict produced by ``summarize_document``.

        Raises:
            ValueError: If the tool is unknown or an argument is missing or
                has the wrong type.
            RuntimeError: If an LLM call fails.
        """
        if name != "summarize_document":
            raise ValueError(f"Unknown tool: {name}")
        if not isinstance(args, dict):
            raise ValueError("Tool arguments must be an object")

        text = args.get("text")
        if not text:
            raise ValueError("Text parameter is required")
        if not isinstance(text, str):
            raise ValueError("Text parameter must be a string")

        max_length = args.get("max_length")
        if max_length is None:
            # Covers both an absent key and an explicit JSON null.
            max_length = MAX_DIRECT_SUMMARY_LENGTH
        else:
            if isinstance(max_length, bool):
                raise ValueError("max_length must be a positive integer")
            try:
                # Lenient on purpose: clients may send 100.0 or "100".
                max_length = int(max_length)
            except (TypeError, ValueError, OverflowError):
                raise ValueError("max_length must be a positive integer") from None
            if max_length <= 0:
                raise ValueError("max_length must be a positive integer")

        return summarize_document(text, max_length)


def main():
    """Start the MCP summary server and serve until interrupted.

    The port is taken from the first command-line argument when present,
    otherwise from the PORT environment variable (default 8080).
    """
    port = int(sys.argv[1]) if len(sys.argv) > 1 else PORT
    # NOTE: HTTPServer handles one request at a time, so a long summarization
    # blocks other requests (including the health check) until it finishes.
    server = HTTPServer(("0.0.0.0", port), MCPSummaryHandler)

    mode = "auth enabled (Bearer)" if API_KEY else "no auth (API_KEY not set)"
    print(f"MCP Summary Server listening on 0.0.0.0:{port} [{mode}]")
    print(f"  - Model: {MODEL_NAME}")
    print(f"  - LLM URL: {OPENAPI_URL}")
    print(f"  - Chunk size: {CHUNK_SIZE} characters")
    print(f"  - Max direct text: {MAX_DIRECT_TEXT_LENGTH} characters")
    print(f"  - LLM timeout: {LLM_TIMEOUT} seconds")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Release the listening socket on every exit path, not only Ctrl-C.
        server.server_close()


if __name__ == "__main__":
    main()