Update mcp_time_server.py

This commit is contained in:
2026-06-12 04:02:57 +00:00
parent a80132998e
commit 8c34a17a9a
+89 -72
View File
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
MCP Time Tools Server (HTTP transport)
MCP Time Tools Server (Streamable HTTP transport)
Tools:
- get_current_time
@@ -25,7 +25,7 @@ import sys
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
API_KEY = os.environ.get("API_KEY", "").strip()
@@ -166,10 +166,51 @@ TOOL_SCHEMA: Dict[str, Any] = {
]
}
OPENAPI_SPEC = {
"openapi": "3.0.3",
"info": {
"title": "MCP Time Tools",
"version": "1.0.0",
"description": "Time manipulation, timezone conversion, natural language parsing, and formatting via MCP over HTTP."
},
"paths": {
"/mcp": {
"post": {
"summary": "MCP JSON-RPC endpoint",
"description": "Use this endpoint to call tools via MCP JSON-RPC (tools/call, tools/list, initialize).",
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"jsonrpc": {"type": "string", "example": "2.0"},
"method": {"type": "string", "example": "tools/call"},
"params": {
"type": "object",
"example": {
"name": "get_current_time",
"arguments": {"timezone": "UTC"}
}
},
"id": {"type": "integer", "example": 1}
}
}
}
}
},
"responses": {
"200": {"description": "JSON-RPC response"}
}
}
}
}
}
def parse_iso_datetime(value: str) -> datetime:
value = value.strip()
# If no timezone info, treat as UTC
if not any(ch in value for ch in ("+", "-", "Z")) or value.endswith("Z"):
value = value.replace("Z", "+00:00")
if "T" in value and ("+" not in value.split("T", 1)[1] and "-" not in value.split("T", 1)[1]):
@@ -179,13 +220,10 @@ def parse_iso_datetime(value: str) -> datetime:
def parse_duration_to_timedelta(dur: str) -> timedelta:
s = dur.strip()
# ISO 8601 duration (starts with P)
if s.upper().startswith("P"):
from dateutil import parser as dateutil_parser
return dateutil_parser.parse_duration(s)
# Natural tokens: "2 hours", "3 days", etc.
tokens = s.lower().split()
td = timedelta()
for token in tokens:
@@ -196,7 +234,7 @@ def parse_duration_to_timedelta(dur: str) -> timedelta:
if not num_part:
continue
num = float(num_part)
unit = unit_part.rstrip("s") # normalize singular
unit = unit_part.rstrip("s")
if unit in ("second", "sec"):
td += timedelta(seconds=num)
elif unit in ("minute", "min"):
@@ -208,10 +246,8 @@ def parse_duration_to_timedelta(dur: str) -> timedelta:
elif unit in ("week",):
td += timedelta(weeks=num)
elif unit in ("month",):
# approximate: 30 days
td += timedelta(days=30 * num)
elif unit in ("year",):
# approximate: 365 days
td += timedelta(days=365 * num)
else:
raise ValueError(f"Unknown duration unit: {unit_part}")
@@ -225,25 +261,17 @@ def safe_get_tz(tz_str: str) -> ZoneInfo:
raise ValueError(f"Unknown timezone: {tz_str}")
def parse_natural_language_time(
expression: str,
ref: datetime,
tz: timezone
) -> datetime:
def parse_natural_language_time(expression: str, ref: datetime, tz: timezone) -> datetime:
expr = expression.strip().lower()
now = ref.astimezone(tz)
# Direct words
if expr in ("now", "current time", "today"):
return now
if expr in ("yesterday",):
if expr == "yesterday":
return now - timedelta(days=1)
if expr in ("tomorrow",):
if expr == "tomorrow":
return now + timedelta(days=1)
# "in X days/hours/minutes/weeks"
import re
m = re.match(
r"^(?:in\s+)?(\d+(?:\.\d+)?)\s*(day|days|hour|hours|minute|minutes|week|weeks|month|months|year|years)\s*$",
@@ -261,22 +289,16 @@ def parse_natural_language_time(
if unit in ("week",):
return now + timedelta(weeks=val)
if unit in ("month",):
# approximate
return now + timedelta(days=30 * val)
if unit in ("year",):
# approximate
return now + timedelta(days=365 * val)
# "last week"
if expr in ("last week",):
if expr == "last week":
return now - timedelta(weeks=1)
# "next week"
if expr in ("next week",):
if expr == "next week":
return now + timedelta(weeks=1)
# "next month"
if expr in ("next month",):
if expr == "next month":
month = now.month + 1
year = now.year
if month > 12:
@@ -285,11 +307,9 @@ def parse_natural_language_time(
try:
return now.replace(year=year, month=month)
except ValueError:
# fallback: 30 days
return now + timedelta(days=30)
# "last month"
if expr in ("last month",):
if expr == "last month":
month = now.month - 1
year = now.year
if month < 1:
@@ -300,15 +320,12 @@ def parse_natural_language_time(
except ValueError:
return now - timedelta(days=30)
# "end of day"
if expr in ("end of day", "end of today"):
return now.replace(hour=23, minute=59, second=59, microsecond=999999)
# "start of day"
if expr in ("start of day", "start of today"):
return now.replace(hour=0, minute=0, second=0, microsecond=0)
# Fallback: try to parse as ISO
try:
dt = parse_iso_datetime(expression)
return dt.astimezone(tz)
@@ -318,7 +335,6 @@ def parse_natural_language_time(
def format_datetime(dt: datetime, fmt: str) -> str:
fmt = fmt.strip().lower()
if fmt == "iso":
return dt.isoformat()
if fmt == "iso-compact":
@@ -327,48 +343,30 @@ def format_datetime(dt: datetime, fmt: str) -> str:
return dt.strftime("%a, %d %b %Y %H:%M:%S %z")
if fmt == "readable":
return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
# treat as strftime pattern
return dt.strftime(fmt)
def get_api_key_from_request(headers: Any) -> Optional[str]:
"""
Extract API key from:
- Authorization: Bearer <key>
- X-API-Key: <key>
"""
auth = (headers.get("Authorization") or "").strip()
if auth.startswith("Bearer "):
return auth[len("Bearer "):].strip()
x_key = (headers.get("X-API-Key") or "").strip()
if x_key:
return x_key
return None
def require_auth(headers: Any) -> bool:
"""
If API_KEY is set, the request must provide a matching key.
Returns True if allowed, raises HTTPError if not.
"""
if not API_KEY:
return True
key = get_api_key_from_request(headers)
if not key or key != API_KEY:
raise PermissionError("Missing or invalid API key")
return True
# ---------- MCP HTTP handler ----------
class MCPTimeToolsHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
# Quiet logs by default
pass
def _send_json(self, status: int, payload: Any):
@@ -387,7 +385,7 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
return False
def do_GET(self):
# Root: simple info (no auth required)
# Root info
if self.path == "/":
self._send_json(200, {
"service": "mcp-time-tools",
@@ -397,7 +395,12 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
})
return
# MCP discover tools via GET /mcp (auth-gated if API_KEY is set)
# OpenAPI spec for OpenWebUI discovery
if self.path == "/mcp/openapi.json":
self._send_json(200, OPENAPI_SPEC)
return
# MCP tools list via GET
if self.path == "/mcp":
if not self._auth_or_401():
return
@@ -410,7 +413,8 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
self.send_error(404, "Not Found")
def do_POST(self):
if self.path != "/mcp":
# Support both / and /mcp as MCP endpoints
if self.path not in ("/", "/mcp"):
self.send_error(404, "Not Found")
return
@@ -433,7 +437,34 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
params = req.get("params") or {}
req_id = req.get("id")
# MCP tool calling pattern: tools/call with name + arguments
# MCP initialize handshake
if method == "initialize":
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": "2025-11-25",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "mcp-time-tools",
"version": "1.0.0"
}
}
})
return
# tools/list
if method == "tools/list":
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": TOOL_SCHEMA
})
return
# tools/call
if method == "tools/call":
tool_name = params.get("name")
tool_args = params.get("arguments") or {}
@@ -459,16 +490,6 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
})
return
# If client asks for tools list via JSON-RPC
if method == "tools/list":
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": TOOL_SCHEMA
})
return
# Fallback
self._send_json(400, {"error": "Unknown method: " + str(method)})
def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
@@ -485,7 +506,6 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
if name == "convert_timezone":
dt = parse_iso_datetime(args["datetime"])
to_tz = safe_get_tz(args["to_timezone"])
# If from_timezone provided, interpret dt in that tz
if "from_timezone" in args:
from_tz = safe_get_tz(args["from_timezone"])
if dt.tzinfo is None:
@@ -493,10 +513,8 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
else:
dt = dt.astimezone(from_tz)
else:
# If no tz, assume UTC
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
converted = dt.astimezone(to_tz)
return {
"original": dt.isoformat(),
@@ -580,7 +598,6 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler):
def main():
# Allow optional port override via environment or argument
port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080"))
server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler)
mode = "auth enabled" if API_KEY else "no auth (API_KEY not set)"