From 491745733f03cc187de6848739e9764e63059fbe Mon Sep 17 00:00:00 2001 From: Admin Date: Sun, 14 Jun 2026 05:57:46 +0000 Subject: [PATCH] Initial commit: MCP Summary Server --- mcp_summary_server.py | 448 ++++++++++++++++++++++++++++++++++++++++++ test_mcp_server.py | 155 +++++++++++++++ 2 files changed, 603 insertions(+) create mode 100644 mcp_summary_server.py create mode 100644 test_mcp_server.py diff --git a/mcp_summary_server.py b/mcp_summary_server.py new file mode 100644 index 0000000..4b46153 --- /dev/null +++ b/mcp_summary_server.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python3 +""" +MCP Summary Server (Streamable HTTP transport) + +Designed to work with OpenWebUI's MCP (Streamable HTTP) integration. + +Summarizes documents by: +1. Checking text length +2. If short, summarizing directly with LLM +3. If long, chunking text, summarizing each chunk, then synthesizing + +All processing happens server-side, keeping full text out of the chat context window. + +Tools: +- summarize_document: Summarize a document (handles chunking automatically) + +Auth: +- If API_KEY is set: + - Requires: Authorization: Bearer +- If API_KEY is not set: + - No auth required (for local/internal use). +""" + +import json +import os +import sys +import logging +from http.server import HTTPServer, BaseHTTPRequestHandler +from typing import Any, Dict, List, Optional +import requests +from requests.exceptions import RequestException + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger("mcp-summary") + +# MCP Server Configuration +API_KEY = os.environ.get("API_KEY", "").strip() +PORT = int(os.environ.get("PORT", "8080")) + +# LLM Configuration +OPENAPI_URL = os.environ.get("OPENAPI_URL", "http://localhost:8080/v1") +OPENAPI_API_KEY = os.environ.get("OPENAPI_API_KEY", "") +MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4o") + +# Summarization Configuration +CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "4000")) +OVERLAP = int(os.environ.get("OVERLAP", "200")) +TARGET_INTERMEDIATE_SUMMARY_LENGTH = int(os.environ.get("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150")) +MAX_DIRECT_SUMMARY_LENGTH = int(os.environ.get("MAX_DIRECT_SUMMARY_LENGTH", "100")) +MAX_DIRECT_TEXT_LENGTH = int(os.environ.get("MAX_DIRECT_TEXT_LENGTH", "8000")) +LLM_TIMEOUT = int(os.environ.get("LLM_TIMEOUT", "120")) + +# Tool definitions +TOOLS_LIST: Dict[str, Any] = { + "tools": [ + { + "name": "summarize_document", + "description": "Summarize a document. Automatically handles chunking for long text. Returns a concise summary without exposing the full text.", + "inputSchema": { + "type": "object", + "properties": { + "text": { + "type": "string", + "description": "The document text to summarize" + }, + "max_length": { + "type": "integer", + "description": "Maximum length of summary in words (default: 100)" + } + }, + "required": ["text"] + } + } + ] +} + + +def get_bearer_token(headers: Any) -> Optional[str]: + """Extract bearer token from Authorization header.""" + auth = (headers.get("Authorization") or "").strip() + if auth.startswith("Bearer "): + return auth[len("Bearer "):].strip() + return None + + +def require_auth(headers: Any) -> bool: + """Check authentication. Returns True if auth passes or is not required.""" + if not API_KEY: + return True + + token = get_bearer_token(headers) + if not token or token != API_KEY: + raise PermissionError("Missing or invalid API key") + return True + + +def call_llm(messages: List[Dict], temperature: float = 0.3) -> str: + """Make an OpenAPI-compatible LLM call with error handling.""" + url = f"{OPENAPI_URL}/chat/completions" + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {OPENAPI_API_KEY}" + } + + payload = { + "model": MODEL_NAME, + "messages": messages, + "temperature": temperature, + "max_tokens": 2000, + "top_p": 0.9 + } + + try: + logger.info(f"Calling LLM at {OPENAPI_URL} with model {MODEL_NAME}") + response = requests.post(url, headers=headers, json=payload, timeout=LLM_TIMEOUT) + response.raise_for_status() + + data = response.json() + return data["choices"][0]["message"]["content"] + + except RequestException as e: + logger.error(f"LLM request failed: {e}") + raise RuntimeError(f"Failed to connect to LLM at {OPENAPI_URL}: {str(e)}") + except Exception as e: + logger.error(f"LLM call failed: {e}") + raise RuntimeError(f"LLM call failed: {str(e)}") + + +def chunk_text(text: str) -> List[str]: + """Split text into chunks with overlap for summarization.""" + if len(text) <= CHUNK_SIZE: + return [text] + + chunks = [] + start = 0 + + while start < len(text): + end = min(start + CHUNK_SIZE, len(text)) + + break_point = end + for marker in ["\n\n", "\n", ". ", "! ", "? "]: + pos = text.rfind(marker, start + CHUNK_SIZE // 2, end) + if pos > start: + break_point = pos + break + + chunk = text[start:break_point] + if chunk.strip(): + chunks.append(chunk) + + start = break_point - OVERLAP if break_point < len(text) else len(text) + if start >= len(text): + break + + logger.info(f"Split text into {len(chunks)} chunks") + return chunks + + +def summarize_chunk(chunk_text: str, chunk_num: int, total_chunks: int) -> str: + """Summarize a single chunk of text.""" + system_prompt = f"""You are a precise legal assistant creating concise, accurate summaries. + +You are processing chunk {chunk_num} of {total_chunks} from a larger document. + +Create a focused summary that: +- Captures key points and important details +- Is approximately {TARGET_INTERMEDIATE_SUMMARY_LENGTH} words +- Can be combined with other chunk summaries +- Uses clear, professional language +- Preserves names, dates, and specific facts + +Respond as plain text without bullet points.""" + + user_prompt = f"""Summarize this text (chunk {chunk_num} of {total_chunks}): + +{chunk_text} + +Summary:""" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + + logger.info(f"Summarizing chunk {chunk_num}/{total_chunks}") + return call_llm(messages) + + +def synthesize_summaries(chunk_summaries: List[str]) -> str: + """Synthesize multiple chunk summaries into a single final summary.""" + combined = "\n\n".join(chunk_summaries) + + system_prompt = """You are a precise legal assistant creating executive-level summaries. + +Synthesize the provided partial summaries into a single, cohesive summary that: +- Is approximately 100 words +- Captures the complete document picture +- Is clear and professional +- Removes redundancy +- Maintains logical flow +- Preserves all critical information + +Format as a single paragraph of plain text.""" + + user_prompt = f"""Synthesize these partial summaries into one cohesive summary: + +{combined} + +Final summary:""" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + + logger.info(f"Synthesizing {len(chunk_summaries)} chunk summaries") + return call_llm(messages) + + +def summarize_document(text: str, max_length: int = MAX_DIRECT_SUMMARY_LENGTH) -> Dict[str, Any]: + """ + Main summarization function. + + - If text is short, summarize directly + - If text is long, chunk and summarize each chunk, then synthesize + """ + original_length = len(text) + + text = text.strip() + if not text: + raise ValueError("Empty text provided") + + logger.info(f"Summarizing text of {original_length} characters") + + # Direct summarization for shorter texts + if len(text) <= MAX_DIRECT_TEXT_LENGTH: + system_prompt = f"""You are a precise legal assistant creating concise, accurate summaries. + +Create a summary that: +- Is approximately {max_length} words +- Captures key points and important details +- Uses clear, professional language +- Preserves names, dates, and specific facts + +Format as plain text without bullet points.""" + + user_prompt = f"""Summarize the following document: + +{text} + +Summary:""" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + + summary = call_llm(messages) + + return { + "summary": summary, + "original_length": original_length, + "method": "direct", + "chunks": 1 + } + + # Chunked summarization for longer texts + chunks = chunk_text(text) + + chunk_summaries = [] + for i, chunk in enumerate(chunks, 1): + chunk_summary = summarize_chunk(chunk, i, len(chunks)) + chunk_summaries.append(chunk_summary) + + final_summary = synthesize_summaries(chunk_summaries) + + return { + "summary": final_summary, + "original_length": original_length, + "method": "chunked", + "chunks": len(chunks) + } + + +class MCPSummaryHandler(BaseHTTPRequestHandler): + """HTTP handler for MCP summary server.""" + + def log_message(self, format, *args): + logger.info(format % args) + + def _send_json(self, status: int, payload: Any): + """Send JSON response.""" + body = json.dumps(payload, ensure_ascii=False).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _auth_or_401(self): + """Check authentication. Returns False if auth fails.""" + try: + return require_auth(self.headers) + except PermissionError: + self._send_json(401, {"error": "Missing or invalid API key"}) + return False + + def do_GET(self): + """Handle GET requests (health check).""" + if self.path == "/": + self._send_json(200, { + "service": "mcp-summary", + "transport": "streamable-http", + "model": MODEL_NAME, + "status": "running", + "docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call)." + }) + return + + self.send_error(404, "Not Found") + + def do_POST(self): + """Handle MCP JSON-RPC requests.""" + if self.path not in ("/", "/mcp"): + self.send_error(404, "Not Found") + return + + if not self._auth_or_401(): + return + + length = int(self.headers.get("Content-Length", 0)) + if length == 0: + self._send_json(400, {"error": "Empty body"}) + return + + raw = self.rfile.read(length) + try: + req = json.loads(raw) + except json.JSONDecodeError: + self._send_json(400, {"error": "Invalid JSON"}) + return + + method = req.get("method") + params = req.get("params") or {} + req_id = req.get("id") + + logger.info(f"MCP request: method={method}, id={req_id}") + + # MCP: initialize + if method == "initialize": + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "protocolVersion": "2025-11-25", + "capabilities": { + "tools": {} + }, + "serverInfo": { + "name": "mcp-summary", + "version": "1.0.0" + } + } + }) + return + + # MCP: ping + if method == "ping": + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": {} + }) + return + + # MCP: tools/list + if method == "tools/list": + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": TOOLS_LIST + }) + return + + # MCP: tools/call + if method == "tools/call": + tool_name = params.get("name") + tool_args = params.get("arguments") or {} + try: + result = self._call_tool(tool_name, tool_args) + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "content": [ + {"type": "text", "text": json.dumps(result, ensure_ascii=False)} + ] + } + }) + except Exception as e: + logger.error(f"Tool call failed: {e}", exc_info=True) + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "error": { + "code": -32000, + "message": str(e) + } + }) + return + + # Unknown method + self._send_json(400, {"error": "Unknown method: " + str(method)}) + + def _call_tool(self, name: str, args: Dict[str, Any]) -> Any: + """Execute a tool call.""" + if name == "summarize_document": + text = args.get("text") + if not text: + raise ValueError("Text parameter is required") + + max_length = args.get("max_length", MAX_DIRECT_SUMMARY_LENGTH) + return summarize_document(text, max_length) + + raise ValueError(f"Unknown tool: {name}") + + +def main(): + """Start the MCP summary server.""" + port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080")) + server = HTTPServer(("0.0.0.0", port), MCPSummaryHandler) + mode = "auth enabled (Bearer)" if API_KEY else "no auth (API_KEY not set)" + print(f"MCP Summary Server listening on 0.0.0.0:{port} [{mode}]") + print(f" - Model: {MODEL_NAME}") + print(f" - LLM URL: {OPENAPI_URL}") + print(f" - Chunk size: {CHUNK_SIZE} characters") + print(f" - Max direct text: {MAX_DIRECT_TEXT_LENGTH} characters") + print(f" - LLM timeout: {LLM_TIMEOUT} seconds") + try: + server.serve_forever() + except KeyboardInterrupt: + print("\nShutting down...") + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/test_mcp_server.py b/test_mcp_server.py new file mode 100644 index 0000000..592b3e8 --- /dev/null +++ b/test_mcp_server.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +Test script to verify MCP summary server connectivity and functionality. +""" + +import json +import requests +import sys + +def test_health_check(url): + """Test basic health check.""" + try: + response = requests.get(url, timeout=5) + print(f"✓ Health check: {response.status_code}") + print(f" Response: {response.json()}") + return True + except Exception as e: + print(f"✗ Health check failed: {e}") + return False + +def test_mcp_initialize(url, api_key=None): + """Test MCP initialization.""" + headers = { + "Content-Type": "application/json" + } + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-11-25", + "capabilities": {}, + "clientInfo": { + "name": "test-client", + "version": "1.0.0" + } + } + } + + try: + response = requests.post(url, headers=headers, json=payload, timeout=10) + print(f"✓ MCP Initialize: {response.status_code}") + print(f" Response: {response.json()}") + return response.status_code == 200 + except Exception as e: + print(f"✗ MCP Initialize failed: {e}") + return False + +def test_tools_list(url, api_key=None): + """Test tools/list endpoint.""" + headers = { + "Content-Type": "application/json" + } + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + payload = { + "jsonrpc": "2.0", + "id": 2, + "method": "tools/list", + "params": {} + } + + try: + response = requests.post(url, headers=headers, json=payload, timeout=10) + print(f"✓ Tools List: {response.status_code}") + print(f" Response: {response.json()}") + return response.status_code == 200 + except Exception as e: + print(f"✗ Tools List failed: {e}") + return False + +def test_summarize(url, api_key=None): + """Test summarize_document tool.""" + headers = { + "Content-Type": "application/json" + } + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + # Simple test text + test_text = """ + The Supreme Court of Canada decided in R v. Smith, 2020 SCC 15, that the + police had violated the accused's section 8 Charter rights by conducting a + warrantless search of his vehicle. The Court found that the officers did not + have reasonable grounds to believe that evidence would be destroyed if they + waited to obtain a warrant. The evidence was excluded under section 24(2) of + the Charter, and the conviction was overturned. + """ + + payload = { + "jsonrpc": "2.0", + "id": 3, + "method": "tools/call", + "params": { + "name": "summarize_document", + "arguments": { + "text": test_text, + "max_length": 50 + } + } + } + + try: + response = requests.post(url, headers=headers, json=payload, timeout=120) + print(f"✓ Summarize Test: {response.status_code}") + print(f" Response: {response.json()}") + return response.status_code == 200 + except Exception as e: + print(f"✗ Summarize Test failed: {e}") + return False + +def main(): + url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:8080" + api_key = sys.argv[2] if len(sys.argv) > 2 else None + + print("=" * 60) + print("MCP Summary Server Test") + print("=" * 60) + print(f"URL: {url}") + print(f"API Key: {'[SET]' if api_key else '[NOT SET]'}") + print("=" * 60) + + print("\n1. Testing health check...") + health_ok = test_health_check(url) + + print("\n2. Testing MCP initialization...") + init_ok = test_mcp_initialize(url, api_key) + + print("\n3. Testing tools list...") + tools_ok = test_tools_list(url, api_key) + + print("\n4. Testing summarize tool...") + summarize_ok = test_summarize(url, api_key) + + print("\n" + "=" * 60) + print("Summary:") + print(f" Health Check: {'✓' if health_ok else '✗'}") + print(f" MCP Initialize: {'✓' if init_ok else '✗'}") + print(f" Tools List: {'✓' if tools_ok else '✗'}") + print(f" Summarize: {'✓' if summarize_ok else '✗'}") + print("=" * 60) + + if all([health_ok, init_ok, tools_ok, summarize_ok]): + print("\n✓ All tests passed!") + return 0 + else: + print("\n✗ Some tests failed. Check the errors above.") + return 1 + +if __name__ == "__main__": + sys.exit(main())