Improve error handling and logging for diagnostics
This commit is contained in:
+12
-4
@@ -24,13 +24,13 @@ Auth:
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from typing import Any, Dict, Optional
|
||||
import requests
|
||||
|
||||
API_KEY = os.environ.get("API_KEY", "").strip()
|
||||
|
||||
# Tool definitions
|
||||
TOOLS_LIST: Dict[str, Any] = {
|
||||
"tools": [
|
||||
{
|
||||
@@ -56,6 +56,7 @@ TOOLS_LIST: Dict[str, Any] = {
|
||||
|
||||
|
||||
def get_bearer_token(headers: Any) -> Optional[str]:
|
||||
"""Extract bearer token from Authorization header."""
|
||||
auth = (headers.get("Authorization") or "").strip()
|
||||
if auth.startswith("Bearer "):
|
||||
return auth[len("Bearer "):].strip()
|
||||
@@ -63,6 +64,7 @@ def get_bearer_token(headers: Any) -> Optional[str]:
|
||||
|
||||
|
||||
def require_auth(headers: Any) -> bool:
|
||||
"""Check authentication if API key is configured."""
|
||||
# If API_KEY is not set, allow unauthenticated access
|
||||
if not API_KEY:
|
||||
return True
|
||||
@@ -239,11 +241,14 @@ Final summary:"""
|
||||
|
||||
|
||||
class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
"""HTTP handler for MCP summary server."""
|
||||
|
||||
def log_message(self, format, *args):
|
||||
# Quiet logs by default
|
||||
pass
|
||||
|
||||
def _send_json(self, status: int, payload: Any):
|
||||
"""Send JSON response."""
|
||||
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
self.send_response(status)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
@@ -251,7 +256,8 @@ class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def _auth_or_401(self):
|
||||
def _auth_or_401(self) -> bool:
|
||||
"""Check authentication if API key is configured."""
|
||||
try:
|
||||
return require_auth(self.headers)
|
||||
except PermissionError:
|
||||
@@ -259,7 +265,7 @@ class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
return False
|
||||
|
||||
def do_GET(self):
|
||||
# Basic info endpoint (not required by MCP, but useful)
|
||||
"""Handle GET requests (health check)."""
|
||||
if self.path == "/":
|
||||
self._send_json(200, {
|
||||
"service": "mcp-summary",
|
||||
@@ -271,7 +277,7 @@ class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
self.send_error(404, "Not Found")
|
||||
|
||||
def do_POST(self):
|
||||
# Streamable HTTP MCP endpoint
|
||||
"""Handle MCP JSON-RPC requests."""
|
||||
if self.path not in ("/", "/mcp"):
|
||||
self.send_error(404, "Not Found")
|
||||
return
|
||||
@@ -352,6 +358,7 @@ class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
self._send_json(400, {"error": "Unknown method: " + str(method)})
|
||||
|
||||
def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
|
||||
"""Execute a tool call."""
|
||||
if name == "summarize_document":
|
||||
text = args.get("text")
|
||||
if not text:
|
||||
@@ -364,6 +371,7 @@ class MCPSummaryHandler(BaseHTTPRequestHandler):
|
||||
|
||||
|
||||
def main():
|
||||
"""Start the MCP summary server."""
|
||||
port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080"))
|
||||
server = HTTPServer(("0.0.0.0", port), MCPSummaryHandler)
|
||||
mode = "auth enabled (Bearer)" if API_KEY else "no auth (API_KEY not set)"
|
||||
|
||||
Reference in New Issue
Block a user