Compare commits
4 Commits
63617550a1
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| dce352b02a | |||
| 73fb7000b5 | |||
| 741714a4bc | |||
| 208e4195b0 |
+624
-298
@@ -4,15 +4,25 @@ MCP Summary Server (Streamable HTTP transport)
|
||||
|
||||
Designed to work with OpenWebUI's MCP (Streamable HTTP) integration.
|
||||
|
||||
Summarizes documents by:
|
||||
1. Checking text length
|
||||
2. If short, summarizing directly with LLM
|
||||
3. If long, chunking text, summarizing each chunk, then synthesizing
|
||||
|
||||
All processing happens server-side, keeping full text out of the chat context window.
|
||||
Features:
|
||||
- Multiple specialized summarization, comparison, and extraction tools.
|
||||
- Automatic chunking and synthesis for long documents.
|
||||
- Temporary in-memory storage of document chunks/summaries for continued use.
|
||||
- Configurable cache limits via environment variables.
|
||||
|
||||
Tools:
|
||||
- summarize_document: Summarize a document (handles chunking automatically)
|
||||
- summarize_document
|
||||
- summarize_executive_brief
|
||||
- summarize_bullet_points
|
||||
- summarize_for_court
|
||||
- compare_documents
|
||||
- extract_key_points
|
||||
- extract_action_items
|
||||
- extract_entities
|
||||
- summarize_very_long_document
|
||||
- retrieve_document_data
|
||||
- query_stored_document
|
||||
- clear_document_cache
|
||||
|
||||
Auth:
|
||||
- If API_KEY is set:
|
||||
@@ -24,231 +34,446 @@ Auth:
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
import logging
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
import requests
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
stream=sys.stdout,
|
||||
)
|
||||
logger = logging.getLogger("mcp-summary")
|
||||
|
||||
# MCP Server Configuration
|
||||
API_KEY = os.environ.get("API_KEY", "").strip()
|
||||
PORT = int(os.environ.get("PORT", "8080"))
|
||||
|
||||
# LLM Configuration
|
||||
OPENAPI_URL = os.environ.get("OPENAPI_URL", "http://localhost:8080/v1")
|
||||
OPENAPI_API_KEY = os.environ.get("OPENAPI_API_KEY", "")
|
||||
MODEL_NAME = os.environ.get("MODEL_NAME", "gpt-4o")
|
||||
LLM_TIMEOUT = int(os.environ.get("LLM_TIMEOUT", "120"))
|
||||
|
||||
# Chunking Configuration
|
||||
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", "4000"))
|
||||
OVERLAP = int(os.environ.get("OVERLAP", "200"))
|
||||
MAX_DIRECT_TEXT_LENGTH = int(os.environ.get("MAX_DIRECT_TEXT_LENGTH", "8000"))
|
||||
TARGET_INTERMEDIATE_SUMMARY_LENGTH = int(os.environ.get("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150"))
|
||||
|
||||
# Cache Configuration
|
||||
MAX_STORED_DOCS = int(os.environ.get("MAX_STORED_DOCS", "500"))
|
||||
CACHE_TTL_SECONDS = int(os.environ.get("CACHE_TTL_SECONDS", "86400")) # 24h default
|
||||
|
||||
# Temporary in-memory store
|
||||
DOCUMENT_STORE: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
|
||||
def generate_doc_id() -> str:
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def evict_oldest_if_needed():
|
||||
if len(DOCUMENT_STORE) <= MAX_STORED_DOCS:
|
||||
return
|
||||
# Remove oldest N entries to stay within limit
|
||||
sorted_keys = sorted(DOCUMENT_STORE.keys(), key=lambda k: DOCUMENT_STORE[k]["created_at"])
|
||||
to_remove = len(DOCUMENT_STORE) - MAX_STORED_DOCS
|
||||
for k in sorted_keys[:to_remove]:
|
||||
DOCUMENT_STORE.pop(k, None)
|
||||
|
||||
|
||||
def store_document(doc_id: str, text_length: int, chunks: List[str],
|
||||
intermediate_summaries: List[str], final_output: str,
|
||||
tool_used: str):
|
||||
evict_oldest_if_needed()
|
||||
DOCUMENT_STORE[doc_id] = {
|
||||
"text_length": text_length,
|
||||
"chunks_count": len(chunks),
|
||||
"chunks": chunks,
|
||||
"intermediate_summaries": intermediate_summaries,
|
||||
"final_output": final_output,
|
||||
"tool_used": tool_used,
|
||||
"created_at": time.time()
|
||||
}
|
||||
|
||||
|
||||
def get_document(doc_id: str) -> Optional[Dict[str, Any]]:
|
||||
doc = DOCUMENT_STORE.get(doc_id)
|
||||
if not doc:
|
||||
return None
|
||||
# TTL check
|
||||
if time.time() - doc["created_at"] > CACHE_TTL_SECONDS:
|
||||
DOCUMENT_STORE.pop(doc_id, None)
|
||||
return None
|
||||
return doc
|
||||
|
||||
|
||||
def call_llm(system_prompt: str, user_prompt: str, max_tokens: int = 2000) -> str:
|
||||
url = f"{OPENAPI_URL}/chat/completions"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {OPENAPI_API_KEY}"
|
||||
}
|
||||
payload = {
|
||||
"model": MODEL_NAME,
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": user_prompt}
|
||||
],
|
||||
"temperature": 0.3,
|
||||
"max_tokens": max_tokens,
|
||||
"top_p": 0.9
|
||||
}
|
||||
logger.info(f"Calling LLM: {OPENAPI_URL} model={MODEL_NAME}")
|
||||
response = requests.post(url, headers=headers, json=payload, timeout=LLM_TIMEOUT)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
return data["choices"][0]["message"]["content"]
|
||||
|
||||
|
||||
def chunk_text(text: str) -> List[str]:
|
||||
if len(text) <= CHUNK_SIZE:
|
||||
return [text]
|
||||
chunks = []
|
||||
start = 0
|
||||
while start < len(text):
|
||||
end = min(start + CHUNK_SIZE, len(text))
|
||||
break_point = end
|
||||
for marker in ["\n\n", "\n", ". ", "! ", "? "]:
|
||||
pos = text.rfind(marker, start + CHUNK_SIZE // 2, end)
|
||||
if pos > start:
|
||||
break_point = pos
|
||||
break
|
||||
chunk = text[start:break_point]
|
||||
if chunk.strip():
|
||||
chunks.append(chunk)
|
||||
start = break_point - OVERLAP if break_point < len(text) else len(text)
|
||||
if start >= len(text):
|
||||
break
|
||||
return chunks
|
||||
|
||||
|
||||
def build_tool_prompts(tool_name: str) -> Tuple[str, str, str]:
|
||||
"""
|
||||
Returns (system_prompt, chunk_user_template, synthesis_user_template)
|
||||
Templates use {text} or {summaries} placeholders.
|
||||
"""
|
||||
base_system = "You are a precise legal assistant creating concise, accurate outputs."
|
||||
|
||||
if tool_name == "summarize_document":
|
||||
sys_prompt = base_system + """
|
||||
Create a clear, professional summary.
|
||||
- Approximately {max_length} words.
|
||||
- Capture key points, important details, names, dates, facts.
|
||||
- Format as plain text without bullet points.
|
||||
"""
|
||||
chunk_user = "Summarize this text (chunk {i} of {total}):\n\n{text}\n\nSummary:"
|
||||
synth_user = "Synthesize these partial summaries into one cohesive summary:\n\n{summaries}\n\nFinal summary:"
|
||||
|
||||
elif tool_name == "summarize_executive_brief":
|
||||
sys_prompt = base_system + """
|
||||
Create an executive brief:
|
||||
- 1–2 paragraphs.
|
||||
- High-level overview of issues, key findings, and outcomes.
|
||||
- Professional tone, suitable for senior decision-makers.
|
||||
- No bullet points.
|
||||
"""
|
||||
chunk_user = "Provide a concise executive-style summary of this chunk (chunk {i} of {total}):\n\n{text}\n\nExecutive summary:"
|
||||
synth_user = "Combine these executive-style summaries into a single, clear executive brief:\n\n{summaries}\n\nFinal executive brief:"
|
||||
|
||||
elif tool_name == "summarize_bullet_points":
|
||||
sys_prompt = base_system + """
|
||||
Create a concise bullet-point summary:
|
||||
- Use short bullets.
|
||||
- Focus on key points, actions, dates, and outcomes.
|
||||
- No long paragraphs.
|
||||
"""
|
||||
chunk_user = "Summarize this chunk as concise bullet points (chunk {i} of {total}):\n\n{text}\n\nBullet points:"
|
||||
synth_user = "Merge these bullet-point summaries into one clean, non-redundant bullet list:\n\n{summaries}\n\nFinal bullet summary:"
|
||||
|
||||
elif tool_name == "summarize_for_court":
|
||||
sys_prompt = base_system + """
|
||||
Create a summary suitable for a judge or legal professional:
|
||||
- Clearly state: parties, issues, key evidence, legal reasoning, outcome.
|
||||
- Use formal, precise language.
|
||||
- Keep it concise and structured.
|
||||
"""
|
||||
chunk_user = "Provide a court-style summary of this chunk (chunk {i} of {total}):\n\n{text}\n\nCourt summary:"
|
||||
synth_user = "Combine these summaries into a single, structured summary suitable for a court:\n\n{summaries}\n\nFinal court-style summary:"
|
||||
|
||||
elif tool_name == "compare_documents":
|
||||
sys_prompt = base_system + """
|
||||
Compare two documents and highlight:
|
||||
- Key differences and conflicts.
|
||||
- Changes in facts, reasoning, or outcomes.
|
||||
- Any new or removed conditions/requirements.
|
||||
Be precise and concise.
|
||||
"""
|
||||
# For compare, we process both texts together; chunking applies if combined is long.
|
||||
chunk_user = "Compare these excerpts and note key differences/conflicts (chunk {i} of {total}):\n\n{text}\n\nComparison:"
|
||||
synth_user = "Synthesize these partial comparisons into a single, clear comparison summary:\n\n{summaries}\n\nFinal comparison:"
|
||||
|
||||
elif tool_name == "extract_key_points":
|
||||
sys_prompt = base_system + """
|
||||
Extract the key points from the text:
|
||||
- Issues, holdings, obligations, dates, parties, statutes.
|
||||
- Use concise bullet points.
|
||||
- Do not add commentary.
|
||||
"""
|
||||
chunk_user = "Extract the key points from this chunk (chunk {i} of {total}):\n\n{text}\n\nKey points:"
|
||||
synth_user = "Combine these extracted key points into one clean, non-redundant list:\n\n{summaries}\n\nFinal key points:"
|
||||
|
||||
elif tool_name == "extract_action_items":
|
||||
sys_prompt = base_system + """
|
||||
Extract all action items, deadlines, and obligations:
|
||||
- Who must do what, by when.
|
||||
- Use concise bullets.
|
||||
- No extra commentary.
|
||||
"""
|
||||
chunk_user = "Extract action items from this chunk (chunk {i} of {total}):\n\n{text}\n\nAction items:"
|
||||
synth_user = "Combine these action items into one clear, non-redundant list:\n\n{summaries}\n\nFinal action items:"
|
||||
|
||||
elif tool_name == "extract_entities":
|
||||
sys_prompt = base_system + """
|
||||
Extract important entities:
|
||||
- People, organizations, locations, dates, legal references, case names.
|
||||
- Use concise bullets, grouped by type.
|
||||
- No extra commentary.
|
||||
"""
|
||||
chunk_user = "Extract entities from this chunk (chunk {i} of {total}):\n\n{text}\n\nEntities:"
|
||||
synth_user = "Merge these entity lists into one clean, grouped list:\n\n{summaries}\n\nFinal entities:"
|
||||
|
||||
elif tool_name == "summarize_very_long_document":
|
||||
sys_prompt = base_system + """
|
||||
Create a concise, structured summary optimized for very long documents:
|
||||
- Preserve core issues, reasoning, outcomes, and critical details.
|
||||
- Use clear paragraphs; avoid fluff.
|
||||
"""
|
||||
chunk_user = "Summarize this chunk from a very long document (chunk {i} of {total}):\n\n{text}\n\nSummary:"
|
||||
synth_user = "Synthesize these summaries into one concise, structured summary of the full document:\n\n{summaries}\n\nFinal summary:"
|
||||
|
||||
else:
|
||||
# Fallback
|
||||
sys_prompt = base_system
|
||||
chunk_user = "Process this chunk (chunk {i} of {total}):\n\n{text}"
|
||||
synth_user = "Combine these results:\n\n{summaries}"
|
||||
|
||||
return sys_prompt, chunk_user, synth_user
|
||||
|
||||
|
||||
def process_with_chunking(
|
||||
text: str,
|
||||
tool_name: str,
|
||||
max_length: int = 100
|
||||
) -> Tuple[str, List[str], List[str]]:
|
||||
"""
|
||||
Returns (final_output, chunks, intermediate_summaries)
|
||||
"""
|
||||
original_length = len(text)
|
||||
text = text.strip()
|
||||
if not text:
|
||||
raise ValueError("Empty text provided")
|
||||
|
||||
sys_prompt, chunk_user_tpl, synth_user_tpl = build_tool_prompts(tool_name)
|
||||
|
||||
# If short, direct processing
|
||||
if len(text) <= MAX_DIRECT_TEXT_LENGTH:
|
||||
user_prompt = chunk_user_tpl.format(
|
||||
i=1, total=1, text=text, max_length=max_length
|
||||
)
|
||||
final_output = call_llm(sys_prompt, user_prompt)
|
||||
return final_output, [text], [final_output]
|
||||
|
||||
# Chunked processing
|
||||
chunks = chunk_text(text)
|
||||
intermediate_summaries = []
|
||||
|
||||
for i, chunk in enumerate(chunks, 1):
|
||||
user_prompt = chunk_user_tpl.format(i=i, total=len(chunks), text=chunk)
|
||||
summary = call_llm(sys_prompt, user_prompt)
|
||||
intermediate_summaries.append(summary)
|
||||
|
||||
# Synthesis
|
||||
combined = "\n\n".join(intermediate_summaries)
|
||||
synth_prompt = synth_user_tpl.format(summaries=combined)
|
||||
final_output = call_llm(sys_prompt, synth_prompt)
|
||||
|
||||
return final_output, chunks, intermediate_summaries
|
||||
|
||||
|
||||
def compare_texts_with_chunking(text1: str, text2: str) -> Tuple[str, List[str], List[str]]:
|
||||
combined = f"=== DOCUMENT 1 ===\n\n{text1}\n\n=== DOCUMENT 2 ===\n\n{text2}"
|
||||
return process_with_chunking(combined, "compare_documents")
|
||||
|
||||
|
||||
def query_chunks(chunks: List[str], question: str) -> str:
|
||||
"""
|
||||
Simple semantic-style query: send question + chunks to LLM to extract relevant answers.
|
||||
For very large chunk lists, we can limit or sample; here we send all but keep prompt tight.
|
||||
"""
|
||||
system_prompt = (
|
||||
"You are a precise legal assistant. Answer the question strictly based on the provided text. "
|
||||
"If the information is not present, say so clearly."
|
||||
)
|
||||
user_prompt = (
|
||||
"Question:\n"
|
||||
f"{question}\n\n"
|
||||
"Text:\n"
|
||||
+ "\n\n".join(chunks)
|
||||
)
|
||||
return call_llm(system_prompt, user_prompt, max_tokens=1500)
|
||||
|
||||
|
||||
# Tool definitions
|
||||
TOOLS_LIST: Dict[str, Any] = {
|
||||
"tools": [
|
||||
{
|
||||
"name": "summarize_document",
|
||||
"description": "Summarize a document. Automatically handles chunking for long text. Returns a concise summary without exposing the full text.",
|
||||
"description": "General-purpose document summarization. Prefer this for long or complex documents to avoid context limits.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {
|
||||
"type": "string",
|
||||
"description": "The document text to summarize"
|
||||
},
|
||||
"max_length": {
|
||||
"type": "integer",
|
||||
"description": "Maximum length of summary in words (default: 100)"
|
||||
}
|
||||
"text": {"type": "string", "description": "Full document text to summarize."},
|
||||
"max_length": {"type": "integer", "description": "Max summary length in words (default: 100)."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "summarize_executive_brief",
|
||||
"description": "Create a short executive brief (1–2 paragraphs) for senior decision-makers.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {"type": "string", "description": "Full document text."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "summarize_bullet_points",
|
||||
"description": "Create a concise bullet-point summary of key points.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {"type": "string", "description": "Full document text."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "summarize_for_court",
|
||||
"description": "Create a formal summary suitable for a judge or legal professional.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {"type": "string", "description": "Full document text."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "compare_documents",
|
||||
"description": "Compare two documents and highlight key differences, conflicts, and changes.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text1": {"type": "string", "description": "First document text."},
|
||||
"text2": {"type": "string", "description": "Second document text."}
|
||||
},
|
||||
"required": ["text1", "text2"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "extract_key_points",
|
||||
"description": "Extract key points: issues, holdings, obligations, dates, parties, statutes.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {"type": "string", "description": "Full document text."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "extract_action_items",
|
||||
"description": "Extract all action items, deadlines, and obligations.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {"type": "string", "description": "Full document text."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "extract_entities",
|
||||
"description": "Extract important entities: people, organizations, locations, dates, legal references.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {"type": "string", "description": "Full document text."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "summarize_very_long_document",
|
||||
"description": "Optimized for very long documents with deeper chunking and hierarchical summarization.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {"type": "string", "description": "Very long document text."}
|
||||
},
|
||||
"required": ["text"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "retrieve_document_data",
|
||||
"description": "Retrieve stored data for a previously processed document by doc_id (final output, intermediate summaries, metadata).",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"doc_id": {"type": "string", "description": "Document ID returned when the document was first processed."}
|
||||
},
|
||||
"required": ["doc_id"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "query_stored_document",
|
||||
"description": "Ask a question about a previously processed document using its stored chunks.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"doc_id": {"type": "string", "description": "Document ID."},
|
||||
"question": {"type": "string", "description": "Your question about the document."}
|
||||
},
|
||||
"required": ["doc_id", "question"]
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "clear_document_cache",
|
||||
"description": "Clear all temporarily stored document data from this server.",
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"required": []
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def get_bearer_token(headers: Any) -> Optional[str]:
|
||||
"""Extract bearer token from Authorization header."""
|
||||
auth = (headers.get("Authorization") or "").strip()
|
||||
if auth.startswith("Bearer "):
|
||||
return auth[len("Bearer "):].strip()
|
||||
return None
|
||||
|
||||
|
||||
def require_auth(headers: Any) -> bool:
|
||||
"""Check authentication if API key is configured."""
|
||||
# If API_KEY is not set, allow unauthenticated access
|
||||
if not API_KEY:
|
||||
return True
|
||||
|
||||
token = get_bearer_token(headers)
|
||||
if not token or token != API_KEY:
|
||||
raise PermissionError("Missing or invalid API key")
|
||||
return True
|
||||
|
||||
|
||||
def call_llm(text: str, system_prompt: str, max_tokens: int = 2000) -> str:
|
||||
"""Make an OpenAPI-compatible LLM call."""
|
||||
openapi_url = os.environ.get("OPENAPI_URL", "http://localhost:8080/v1")
|
||||
openapi_api_key = os.environ.get("OPENAPI_API_KEY", "")
|
||||
model_name = os.environ.get("MODEL_NAME", "gpt-4o")
|
||||
timeout = int(os.environ.get("LLM_TIMEOUT", "120"))
|
||||
|
||||
url = f"{openapi_url}/chat/completions"
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {openapi_api_key}"
|
||||
}
|
||||
|
||||
payload = {
|
||||
"model": model_name,
|
||||
"messages": [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": text}
|
||||
],
|
||||
"temperature": 0.3,
|
||||
"max_tokens": max_tokens,
|
||||
"top_p": 0.9
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=payload, timeout=timeout)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
return data["choices"][0]["message"]["content"]
|
||||
|
||||
|
||||
def chunk_text(text: str) -> list:
|
||||
"""Split text into chunks with overlap for summarization."""
|
||||
chunk_size = int(os.environ.get("CHUNK_SIZE", "4000"))
|
||||
overlap = int(os.environ.get("OVERLAP", "200"))
|
||||
|
||||
if len(text) <= chunk_size:
|
||||
return [text]
|
||||
|
||||
chunks = []
|
||||
start = 0
|
||||
|
||||
while start < len(text):
|
||||
end = min(start + chunk_size, len(text))
|
||||
|
||||
# Try to break at sentence/paragraph boundary
|
||||
break_point = end
|
||||
for marker in ["\n\n", "\n", ". ", "! ", "? "]:
|
||||
pos = text.rfind(marker, start + chunk_size // 2, end)
|
||||
if pos > start:
|
||||
break_point = pos
|
||||
break
|
||||
|
||||
chunk = text[start:break_point]
|
||||
if chunk.strip():
|
||||
chunks.append(chunk)
|
||||
|
||||
start = break_point - overlap if break_point < len(text) else len(text)
|
||||
if start >= len(text):
|
||||
break
|
||||
|
||||
return chunks
|
||||
|
||||
|
||||
def summarize_document(text: str, max_length: int = 100) -> dict:
|
||||
"""
|
||||
Main summarization function.
|
||||
|
||||
- If text is short, summarize directly
|
||||
- If text is long, chunk and summarize each chunk, then synthesize
|
||||
"""
|
||||
original_length = len(text)
|
||||
|
||||
text = text.strip()
|
||||
if not text:
|
||||
raise ValueError("Empty text provided")
|
||||
|
||||
max_direct_length = int(os.environ.get("MAX_DIRECT_TEXT_LENGTH", "8000"))
|
||||
intermediate_length = int(os.environ.get("TARGET_INTERMEDIATE_SUMMARY_LENGTH", "150"))
|
||||
|
||||
# Direct summarization for shorter texts
|
||||
if len(text) <= max_direct_length:
|
||||
system_prompt = f"""You are a precise legal assistant creating concise, accurate summaries.
|
||||
|
||||
Create a summary that:
|
||||
- Is approximately {max_length} words
|
||||
- Captures key points and important details
|
||||
- Uses clear, professional language
|
||||
- Preserves names, dates, and specific facts
|
||||
|
||||
Format as plain text without bullet points."""
|
||||
|
||||
user_prompt = f"""Summarize the following document:
|
||||
|
||||
{text}
|
||||
|
||||
Summary:"""
|
||||
|
||||
summary = call_llm(user_prompt, system_prompt)
|
||||
|
||||
return {
|
||||
"summary": summary,
|
||||
"original_length": original_length,
|
||||
"method": "direct",
|
||||
"chunks": 1
|
||||
}
|
||||
|
||||
# Chunked summarization for longer texts
|
||||
chunks = chunk_text(text)
|
||||
|
||||
chunk_summaries = []
|
||||
for i, chunk in enumerate(chunks, 1):
|
||||
system_prompt = f"""You are a precise legal assistant creating concise, accurate summaries.
|
||||
|
||||
You are processing chunk {i} of {len(chunks)} from a larger document.
|
||||
|
||||
Create a focused summary that:
|
||||
- Captures key points and important details
|
||||
- Is approximately {intermediate_length} words
|
||||
- Can be combined with other chunk summaries
|
||||
- Uses clear, professional language
|
||||
- Preserves names, dates, and specific facts
|
||||
|
||||
Respond as plain text without bullet points."""
|
||||
|
||||
user_prompt = f"""Summarize this text (chunk {i} of {len(chunks)}):
|
||||
|
||||
{chunk}
|
||||
|
||||
Summary:"""
|
||||
|
||||
chunk_summary = call_llm(user_prompt, system_prompt)
|
||||
chunk_summaries.append(chunk_summary)
|
||||
|
||||
# Synthesize into final summary
|
||||
combined = "\n\n".join(chunk_summaries)
|
||||
|
||||
system_prompt = """You are a precise legal assistant creating executive-level summaries.
|
||||
|
||||
Synthesize the provided partial summaries into a single, cohesive summary that:
|
||||
- Is approximately 100 words
|
||||
- Captures the complete document picture
|
||||
- Is clear and professional
|
||||
- Removes redundancy
|
||||
- Maintains logical flow
|
||||
- Preserves all critical information
|
||||
|
||||
Format as a single paragraph of plain text."""
|
||||
|
||||
user_prompt = f"""Synthesize these partial summaries into one cohesive summary:
|
||||
|
||||
{combined}
|
||||
|
||||
Final summary:"""
|
||||
|
||||
final_summary = call_llm(user_prompt, system_prompt)
|
||||
|
||||
return {
|
||||
"summary": final_summary,
|
||||
"original_length": original_length,
|
||||
"method": "chunked",
|
||||
"chunks": len(chunks)
|
||||
}
|
||||
|
||||
|
||||
class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
"""HTTP handler for MCP summary server."""
|
||||
|
||||
def log_message(self, format, *args):
|
||||
# Quiet logs by default
|
||||
pass
|
||||
logger.info(format % args)
|
||||
|
||||
def _send_json(self, status: int, payload: Any):
|
||||
"""Send JSON response."""
|
||||
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
self.send_response(status)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
@@ -257,129 +482,230 @@ class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
self.wfile.write(body)
|
||||
|
||||
def _auth_or_401(self) -> bool:
|
||||
"""Check authentication if API key is configured."""
|
||||
try:
|
||||
return require_auth(self.headers)
|
||||
except PermissionError:
|
||||
self._send_json(401, {"error": "Missing or invalid API key"})
|
||||
return False
|
||||
auth = (self.headers.get("Authorization") or "").strip()
|
||||
if not API_KEY:
|
||||
return True
|
||||
if auth.startswith("Bearer "):
|
||||
token = auth[len("Bearer "):].strip()
|
||||
if token == API_KEY:
|
||||
return True
|
||||
self._send_json(401, {"error": "Missing or invalid API key"})
|
||||
return False
|
||||
|
||||
def do_GET(self):
|
||||
"""Handle GET requests (health check)."""
|
||||
if self.path == "/":
|
||||
self._send_json(200, {
|
||||
"service": "mcp-summary",
|
||||
"transport": "streamable-http",
|
||||
"docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call)."
|
||||
})
|
||||
return
|
||||
|
||||
self.send_error(404, "Not Found")
|
||||
try:
|
||||
if self.path == "/":
|
||||
self._send_json(200, {
|
||||
"service": "mcp-summary",
|
||||
"transport": "streamable-http",
|
||||
"docs": "Use POST / with MCP JSON-RPC (initialize, tools/list, tools/call)."
|
||||
})
|
||||
return
|
||||
self.send_error(404, "Not Found")
|
||||
except Exception as e:
|
||||
logger.error(f"GET error: {e}", exc_info=True)
|
||||
try:
|
||||
self.send_error(500, "Internal Server Error")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def do_POST(self):
|
||||
"""Handle MCP JSON-RPC requests."""
|
||||
if self.path not in ("/", "/mcp"):
|
||||
self.send_error(404, "Not Found")
|
||||
return
|
||||
|
||||
if not self._auth_or_401():
|
||||
return
|
||||
|
||||
length = int(self.headers.get("Content-Length", 0))
|
||||
if length == 0:
|
||||
self._send_json(400, {"error": "Empty body"})
|
||||
return
|
||||
|
||||
raw = self.rfile.read(length)
|
||||
try:
|
||||
req = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
self._send_json(400, {"error": "Invalid JSON"})
|
||||
return
|
||||
if self.path not in ("/", "/mcp"):
|
||||
self.send_error(404, "Not Found")
|
||||
return
|
||||
|
||||
method = req.get("method")
|
||||
params = req.get("params") or {}
|
||||
req_id = req.get("id")
|
||||
if not self._auth_or_401():
|
||||
return
|
||||
|
||||
# MCP: initialize
|
||||
if method == "initialize":
|
||||
self._send_json(200, {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": {
|
||||
"protocolVersion": "2025-11-25",
|
||||
"capabilities": {
|
||||
"tools": {}
|
||||
},
|
||||
"serverInfo": {
|
||||
"name": "mcp-summary",
|
||||
"version": "1.0.0"
|
||||
}
|
||||
}
|
||||
})
|
||||
return
|
||||
length = int(self.headers.get("Content-Length", 0))
|
||||
if length == 0:
|
||||
self._send_json(400, {"error": "Empty body"})
|
||||
return
|
||||
|
||||
# MCP: tools/list
|
||||
if method == "tools/list":
|
||||
self._send_json(200, {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": TOOLS_LIST
|
||||
})
|
||||
return
|
||||
|
||||
# MCP: tools/call
|
||||
if method == "tools/call":
|
||||
tool_name = params.get("name")
|
||||
tool_args = params.get("arguments") or {}
|
||||
raw = self.rfile.read(length)
|
||||
try:
|
||||
result = self._call_tool(tool_name, tool_args)
|
||||
req = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
self._send_json(400, {"error": "Invalid JSON"})
|
||||
return
|
||||
|
||||
method = req.get("method")
|
||||
params = req.get("params") or {}
|
||||
req_id = req.get("id")
|
||||
|
||||
logger.info(f"MCP request: method={method}, id={req_id}")
|
||||
|
||||
# Notifications
|
||||
if isinstance(method, str) and method.startswith("notifications/"):
|
||||
if req_id is not None:
|
||||
self._send_json(200, {"jsonrpc": "2.0", "id": req_id, "result": {}})
|
||||
else:
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Length", "0")
|
||||
self.end_headers()
|
||||
return
|
||||
|
||||
# initialize
|
||||
if method == "initialize":
|
||||
self._send_json(200, {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": {
|
||||
"content": [
|
||||
{"type": "text", "text": json.dumps(result, ensure_ascii=False)}
|
||||
]
|
||||
"protocolVersion": "2025-11-25",
|
||||
"capabilities": {"tools": {}},
|
||||
"serverInfo": {"name": "mcp-summary", "version": "1.0.0"}
|
||||
}
|
||||
})
|
||||
except Exception as e:
|
||||
self._send_json(200, {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"error": {
|
||||
"code": -32000,
|
||||
"message": str(e)
|
||||
}
|
||||
})
|
||||
return
|
||||
return
|
||||
|
||||
# Unknown method
|
||||
self._send_json(400, {"error": "Unknown method: " + str(method)})
|
||||
# tools/list
|
||||
if method == "tools/list":
|
||||
self._send_json(200, {"jsonrpc": "2.0", "id": req_id, "result": TOOLS_LIST})
|
||||
return
|
||||
|
||||
# tools/call
|
||||
if method == "tools/call":
|
||||
tool_name = params.get("name")
|
||||
tool_args = params.get("arguments") or {}
|
||||
try:
|
||||
result = self._call_tool(tool_name, tool_args)
|
||||
self._send_json(200, {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"result": {
|
||||
"content": [
|
||||
{"type": "text", "text": json.dumps(result, ensure_ascii=False)}
|
||||
]
|
||||
}
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Tool call error: {e}", exc_info=True)
|
||||
self._send_json(200, {
|
||||
"jsonrpc": "2.0",
|
||||
"id": req_id,
|
||||
"error": {"code": -32000, "message": str(e)}
|
||||
})
|
||||
return
|
||||
|
||||
self._send_json(400, {"error": "Unknown method: " + str(method)})
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"POST error: {e}", exc_info=True)
|
||||
try:
|
||||
self.send_error(500, "Internal Server Error")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
|
||||
"""Execute a tool call."""
|
||||
if name == "summarize_document":
|
||||
# General single-text tools
|
||||
if name in (
|
||||
"summarize_document",
|
||||
"summarize_executive_brief",
|
||||
"summarize_bullet_points",
|
||||
"summarize_for_court",
|
||||
"extract_key_points",
|
||||
"extract_action_items",
|
||||
"extract_entities",
|
||||
"summarize_very_long_document"
|
||||
):
|
||||
text = args.get("text")
|
||||
if not text:
|
||||
raise ValueError("Text parameter is required")
|
||||
|
||||
max_length = args.get("max_length", 100)
|
||||
return summarize_document(text, max_length)
|
||||
final_output, chunks, intermediate_summaries = process_with_chunking(
|
||||
text, name, max_length
|
||||
)
|
||||
doc_id = generate_doc_id()
|
||||
store_document(doc_id, len(text), chunks, intermediate_summaries, final_output, name)
|
||||
return {
|
||||
"doc_id": doc_id,
|
||||
"tool": name,
|
||||
"result": final_output,
|
||||
"metadata": {
|
||||
"original_length": len(text),
|
||||
"chunks": len(chunks)
|
||||
}
|
||||
}
|
||||
|
||||
# compare_documents
|
||||
if name == "compare_documents":
|
||||
text1 = args.get("text1")
|
||||
text2 = args.get("text2")
|
||||
if not text1 or not text2:
|
||||
raise ValueError("text1 and text2 are required")
|
||||
final_output, chunks, intermediate_summaries = compare_texts_with_chunking(text1, text2)
|
||||
doc_id = generate_doc_id()
|
||||
store_document(doc_id, len(text1) + len(text2), chunks, intermediate_summaries, final_output, name)
|
||||
return {
|
||||
"doc_id": doc_id,
|
||||
"tool": name,
|
||||
"result": final_output,
|
||||
"metadata": {
|
||||
"original_length_1": len(text1),
|
||||
"original_length_2": len(text2),
|
||||
"chunks": len(chunks)
|
||||
}
|
||||
}
|
||||
|
||||
# retrieve_document_data
|
||||
if name == "retrieve_document_data":
|
||||
doc_id = args.get("doc_id")
|
||||
if not doc_id:
|
||||
raise ValueError("doc_id is required")
|
||||
doc = get_document(doc_id)
|
||||
if not doc:
|
||||
raise ValueError("Document not found or expired")
|
||||
# Return metadata + final_output + intermediate_summaries (chunks on demand if needed)
|
||||
return {
|
||||
"doc_id": doc_id,
|
||||
"tool_used": doc["tool_used"],
|
||||
"final_output": doc["final_output"],
|
||||
"intermediate_summaries": doc["intermediate_summaries"],
|
||||
"metadata": {
|
||||
"text_length": doc["text_length"],
|
||||
"chunks_count": doc["chunks_count"],
|
||||
"created_at": doc["created_at"]
|
||||
}
|
||||
}
|
||||
|
||||
# query_stored_document
|
||||
if name == "query_stored_document":
|
||||
doc_id = args.get("doc_id")
|
||||
question = args.get("question")
|
||||
if not doc_id or not question:
|
||||
raise ValueError("doc_id and question are required")
|
||||
doc = get_document(doc_id)
|
||||
if not doc:
|
||||
raise ValueError("Document not found or expired")
|
||||
answer = query_chunks(doc["chunks"], question)
|
||||
return {
|
||||
"doc_id": doc_id,
|
||||
"question": question,
|
||||
"answer": answer
|
||||
}
|
||||
|
||||
# clear_document_cache
|
||||
if name == "clear_document_cache":
|
||||
DOCUMENT_STORE.clear()
|
||||
return {"status": "ok", "message": "Document cache cleared."}
|
||||
|
||||
raise ValueError(f"Unknown tool: {name}")
|
||||
|
||||
|
||||
def main():
|
||||
"""Start the MCP summary server."""
|
||||
port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080"))
|
||||
logger.info(f"Starting MCP Summary Server on 0.0.0.0:{port}")
|
||||
logger.info(f"Auth mode: {'Bearer (API_KEY set)' if API_KEY else 'none (API_KEY not set)'}")
|
||||
logger.info(f"LLM URL: {OPENAPI_URL}")
|
||||
logger.info(f"Model: {MODEL_NAME}")
|
||||
logger.info(f"Cache: max_docs={MAX_STORED_DOCS}, ttl={CACHE_TTL_SECONDS}s")
|
||||
server = HTTPServer(("0.0.0.0", port), MCPSummaryHandler)
|
||||
mode = "auth enabled (Bearer)" if API_KEY else "no auth (API_KEY not set)"
|
||||
print(f"MCP Summary Server listening on 0.0.0.0:{port} [{mode}]")
|
||||
try:
|
||||
logger.info(f"MCP Summary Server listening on 0.0.0.0:{port}")
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutting down...")
|
||||
logger.info("Shutting down...")
|
||||
server.server_close()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user