diff --git a/mcp_time_server.py b/mcp_time_server.py index 98ac183..0a03865 100644 --- a/mcp_time_server.py +++ b/mcp_time_server.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -MCP Time Tools Server (HTTP transport) +MCP Time Tools Server (Streamable HTTP transport) Tools: - get_current_time @@ -25,7 +25,7 @@ import sys from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from http.server import HTTPServer, BaseHTTPRequestHandler -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional API_KEY = os.environ.get("API_KEY", "").strip() @@ -166,10 +166,51 @@ TOOL_SCHEMA: Dict[str, Any] = { ] } +OPENAPI_SPEC = { + "openapi": "3.0.3", + "info": { + "title": "MCP Time Tools", + "version": "1.0.0", + "description": "Time manipulation, timezone conversion, natural language parsing, and formatting via MCP over HTTP." + }, + "paths": { + "/mcp": { + "post": { + "summary": "MCP JSON-RPC endpoint", + "description": "Use this endpoint to call tools via MCP JSON-RPC (tools/call, tools/list, initialize).", + "requestBody": { + "required": True, + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "jsonrpc": {"type": "string", "example": "2.0"}, + "method": {"type": "string", "example": "tools/call"}, + "params": { + "type": "object", + "example": { + "name": "get_current_time", + "arguments": {"timezone": "UTC"} + } + }, + "id": {"type": "integer", "example": 1} + } + } + } + } + }, + "responses": { + "200": {"description": "JSON-RPC response"} + } + } + } + } +} + def parse_iso_datetime(value: str) -> datetime: value = value.strip() - # If no timezone info, treat as UTC if not any(ch in value for ch in ("+", "-", "Z")) or value.endswith("Z"): value = value.replace("Z", "+00:00") if "T" in value and ("+" not in value.split("T", 1)[1] and "-" not in value.split("T", 1)[1]): @@ -179,13 +220,10 @@ def parse_iso_datetime(value: str) -> datetime: def parse_duration_to_timedelta(dur: str) -> timedelta: s = dur.strip() - - # ISO 8601 duration (starts with P) if s.upper().startswith("P"): from dateutil import parser as dateutil_parser return dateutil_parser.parse_duration(s) - # Natural tokens: "2 hours", "3 days", etc. tokens = s.lower().split() td = timedelta() for token in tokens: @@ -196,7 +234,7 @@ def parse_duration_to_timedelta(dur: str) -> timedelta: if not num_part: continue num = float(num_part) - unit = unit_part.rstrip("s") # normalize singular + unit = unit_part.rstrip("s") if unit in ("second", "sec"): td += timedelta(seconds=num) elif unit in ("minute", "min"): @@ -208,10 +246,8 @@ def parse_duration_to_timedelta(dur: str) -> timedelta: elif unit in ("week",): td += timedelta(weeks=num) elif unit in ("month",): - # approximate: 30 days td += timedelta(days=30 * num) elif unit in ("year",): - # approximate: 365 days td += timedelta(days=365 * num) else: raise ValueError(f"Unknown duration unit: {unit_part}") @@ -225,25 +261,17 @@ def safe_get_tz(tz_str: str) -> ZoneInfo: raise ValueError(f"Unknown timezone: {tz_str}") -def parse_natural_language_time( - expression: str, - ref: datetime, - tz: timezone -) -> datetime: +def parse_natural_language_time(expression: str, ref: datetime, tz: timezone) -> datetime: expr = expression.strip().lower() now = ref.astimezone(tz) - # Direct words if expr in ("now", "current time", "today"): return now - - if expr in ("yesterday",): + if expr == "yesterday": return now - timedelta(days=1) - - if expr in ("tomorrow",): + if expr == "tomorrow": return now + timedelta(days=1) - # "in X days/hours/minutes/weeks" import re m = re.match( r"^(?:in\s+)?(\d+(?:\.\d+)?)\s*(day|days|hour|hours|minute|minutes|week|weeks|month|months|year|years)\s*$", @@ -261,22 +289,16 @@ def parse_natural_language_time( if unit in ("week",): return now + timedelta(weeks=val) if unit in ("month",): - # approximate return now + timedelta(days=30 * val) if unit in ("year",): - # approximate return now + timedelta(days=365 * val) - # "last week" - if expr in ("last week",): + if expr == "last week": return now - timedelta(weeks=1) - - # "next week" - if expr in ("next week",): + if expr == "next week": return now + timedelta(weeks=1) - # "next month" - if expr in ("next month",): + if expr == "next month": month = now.month + 1 year = now.year if month > 12: @@ -285,11 +307,9 @@ def parse_natural_language_time( try: return now.replace(year=year, month=month) except ValueError: - # fallback: 30 days return now + timedelta(days=30) - # "last month" - if expr in ("last month",): + if expr == "last month": month = now.month - 1 year = now.year if month < 1: @@ -300,15 +320,12 @@ def parse_natural_language_time( except ValueError: return now - timedelta(days=30) - # "end of day" if expr in ("end of day", "end of today"): return now.replace(hour=23, minute=59, second=59, microsecond=999999) - # "start of day" if expr in ("start of day", "start of today"): return now.replace(hour=0, minute=0, second=0, microsecond=0) - # Fallback: try to parse as ISO try: dt = parse_iso_datetime(expression) return dt.astimezone(tz) @@ -318,7 +335,6 @@ def parse_natural_language_time( def format_datetime(dt: datetime, fmt: str) -> str: fmt = fmt.strip().lower() - if fmt == "iso": return dt.isoformat() if fmt == "iso-compact": @@ -327,48 +343,30 @@ def format_datetime(dt: datetime, fmt: str) -> str: return dt.strftime("%a, %d %b %Y %H:%M:%S %z") if fmt == "readable": return dt.strftime("%Y-%m-%d %H:%M:%S %Z") - - # treat as strftime pattern return dt.strftime(fmt) def get_api_key_from_request(headers: Any) -> Optional[str]: - """ - Extract API key from: - - Authorization: Bearer - - X-API-Key: - """ auth = (headers.get("Authorization") or "").strip() if auth.startswith("Bearer "): return auth[len("Bearer "):].strip() - x_key = (headers.get("X-API-Key") or "").strip() if x_key: return x_key - return None def require_auth(headers: Any) -> bool: - """ - If API_KEY is set, the request must provide a matching key. - Returns True if allowed, raises HTTPError if not. - """ if not API_KEY: return True - key = get_api_key_from_request(headers) if not key or key != API_KEY: raise PermissionError("Missing or invalid API key") - return True -# ---------- MCP HTTP handler ---------- - class MCPTimeToolsHandler(BaseHTTPRequestHandler): def log_message(self, format, *args): - # Quiet logs by default pass def _send_json(self, status: int, payload: Any): @@ -387,7 +385,7 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): return False def do_GET(self): - # Root: simple info (no auth required) + # Root info if self.path == "/": self._send_json(200, { "service": "mcp-time-tools", @@ -397,7 +395,12 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): }) return - # MCP discover tools via GET /mcp (auth-gated if API_KEY is set) + # OpenAPI spec for OpenWebUI discovery + if self.path == "/mcp/openapi.json": + self._send_json(200, OPENAPI_SPEC) + return + + # MCP tools list via GET if self.path == "/mcp": if not self._auth_or_401(): return @@ -410,7 +413,8 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): self.send_error(404, "Not Found") def do_POST(self): - if self.path != "/mcp": + # Support both / and /mcp as MCP endpoints + if self.path not in ("/", "/mcp"): self.send_error(404, "Not Found") return @@ -433,7 +437,34 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): params = req.get("params") or {} req_id = req.get("id") - # MCP tool calling pattern: tools/call with name + arguments + # MCP initialize handshake + if method == "initialize": + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "protocolVersion": "2025-11-25", + "capabilities": { + "tools": {} + }, + "serverInfo": { + "name": "mcp-time-tools", + "version": "1.0.0" + } + } + }) + return + + # tools/list + if method == "tools/list": + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": TOOL_SCHEMA + }) + return + + # tools/call if method == "tools/call": tool_name = params.get("name") tool_args = params.get("arguments") or {} @@ -459,16 +490,6 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): }) return - # If client asks for tools list via JSON-RPC - if method == "tools/list": - self._send_json(200, { - "jsonrpc": "2.0", - "id": req_id, - "result": TOOL_SCHEMA - }) - return - - # Fallback self._send_json(400, {"error": "Unknown method: " + str(method)}) def _call_tool(self, name: str, args: Dict[str, Any]) -> Any: @@ -485,7 +506,6 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): if name == "convert_timezone": dt = parse_iso_datetime(args["datetime"]) to_tz = safe_get_tz(args["to_timezone"]) - # If from_timezone provided, interpret dt in that tz if "from_timezone" in args: from_tz = safe_get_tz(args["from_timezone"]) if dt.tzinfo is None: @@ -493,10 +513,8 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): else: dt = dt.astimezone(from_tz) else: - # If no tz, assume UTC if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) - converted = dt.astimezone(to_tz) return { "original": dt.isoformat(), @@ -580,7 +598,6 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): def main(): - # Allow optional port override via environment or argument port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080")) server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler) mode = "auth enabled" if API_KEY else "no auth (API_KEY not set)"