#!/usr/bin/env python3
"""
MCP Time Tools Server (Streamable HTTP transport)

Tools:
- get_current_time
- convert_timezone
- add_time
- subtract_time
- parse_natural_language
- compare_times
- format_time

API Key Auth:
- Set API_KEY environment variable to enable auth.
- Send as:
  - Authorization: Bearer <key>
  - or X-API-Key: <key>
- If API_KEY is not set, the server runs without auth (for dev).
"""

import calendar
import hmac
import json
import os
import re
import sys
from datetime import datetime, timedelta, timezone, tzinfo
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Callable, Dict, Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

API_KEY = os.environ.get("API_KEY", "").strip()

# Upper bound on accepted request bodies; JSON-RPC calls for these tools are tiny.
MAX_BODY_BYTES = 1024 * 1024

TOOL_SCHEMA: Dict[str, Any] = {
    "tools": [
        {
            "name": "get_current_time",
            "description": "Get the current date and time, optionally in a specific timezone.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "timezone": {
                        "type": "string",
                        "description": "Timezone (e.g. 'UTC', 'America/New_York'). Defaults to UTC.",
                    },
                },
                "required": [],
            },
        },
        {
            "name": "convert_timezone",
            "description": "Convert a datetime from one timezone to another.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime (e.g. '2025-01-01T12:00:00+00:00').",
                    },
                    "from_timezone": {
                        "type": "string",
                        "description": "Source timezone (e.g. 'UTC', 'America/New_York'). Optional.",
                    },
                    "to_timezone": {
                        "type": "string",
                        "description": "Target timezone (e.g. 'Asia/Tokyo').",
                    },
                },
                "required": ["datetime", "to_timezone"],
            },
        },
        {
            "name": "add_time",
            "description": "Add a duration to a datetime. Duration in ISO 8601 (e.g. 'P1DT2H30M') or simple tokens like '2 hours', '3 days'.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime.",
                    },
                    "duration": {
                        "type": "string",
                        "description": "Duration to add, e.g. 'P1DT2H30M', '2 hours', '3 days'.",
                    },
                },
                "required": ["datetime", "duration"],
            },
        },
        {
            "name": "subtract_time",
            "description": "Subtract a duration from a datetime.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime.",
                    },
                    "duration": {
                        "type": "string",
                        "description": "Duration to subtract, e.g. 'P1DT2H30M', '2 hours', '3 days'.",
                    },
                },
                "required": ["datetime", "duration"],
            },
        },
        {
            "name": "parse_natural_language",
            "description": "Parse natural language time expressions like 'yesterday', 'next month', 'in 3 days' into ISO 8601.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "Natural language time expression, e.g. 'yesterday', 'next month', 'in 2 days'.",
                    },
                    "reference_time": {
                        "type": "string",
                        "description": "Optional ISO 8601 reference time. Defaults to now.",
                    },
                    "timezone": {
                        "type": "string",
                        "description": "Timezone for the result (e.g. 'UTC', 'America/New_York'). Defaults to UTC.",
                    },
                },
                "required": ["expression"],
            },
        },
        {
            "name": "compare_times",
            "description": "Compare two ISO 8601 datetimes and return their difference.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "time1": {
                        "type": "string",
                        "description": "First ISO 8601 datetime.",
                    },
                    "time2": {
                        "type": "string",
                        "description": "Second ISO 8601 datetime.",
                    },
                },
                "required": ["time1", "time2"],
            },
        },
        {
            "name": "format_time",
            "description": "Format a datetime using a predefined or custom format pattern.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime.",
                    },
                    "format": {
                        "type": "string",
                        "description": "Format: 'iso', 'iso-compact', 'rfc822', 'readable', or a Python strftime pattern.",
                    },
                },
                "required": ["datetime", "format"],
            },
        },
    ]
}

OPENAPI_SPEC = {
    "openapi": "3.0.3",
    "info": {
        "title": "MCP Time Tools",
        "version": "1.0.0",
        "description": "Time manipulation, timezone conversion, natural language parsing, and formatting via MCP over HTTP.",
    },
    "paths": {
        "/mcp": {
            "post": {
                "summary": "MCP JSON-RPC endpoint",
                "description": "Use this endpoint to call tools via MCP JSON-RPC (tools/call, tools/list, initialize).",
                "requestBody": {
                    "required": True,
                    "content": {
                        "application/json": {
                            "schema": {
                                "type": "object",
                                "properties": {
                                    "jsonrpc": {"type": "string", "example": "2.0"},
                                    "method": {"type": "string", "example": "tools/call"},
                                    "params": {
                                        "type": "object",
                                        "example": {
                                            "name": "get_current_time",
                                            "arguments": {"timezone": "UTC"},
                                        },
                                    },
                                    "id": {"type": "integer", "example": 1},
                                },
                            }
                        }
                    },
                },
                "responses": {
                    "200": {"description": "JSON-RPC response"},
                },
            }
        }
    },
}

# Length of one unit of each supported duration word. Months and years are
# fixed-length approximations (30 and 365 days); they are not calendar-aware.
_UNIT_DELTAS: Dict[str, timedelta] = {
    "second": timedelta(seconds=1),
    "sec": timedelta(seconds=1),
    "minute": timedelta(minutes=1),
    "min": timedelta(minutes=1),
    "hour": timedelta(hours=1),
    "hr": timedelta(hours=1),
    "day": timedelta(days=1),
    "week": timedelta(weeks=1),
    "month": timedelta(days=30),
    "year": timedelta(days=365),
}

_NUMBER = r"\d+(?:\.\d+)?"

# ISO 8601 duration such as "P1DT2H30M"; group names match _UNIT_DELTAS keys.
_ISO_DURATION_RE = re.compile(
    r"^P"
    rf"(?:(?P<year>{_NUMBER})Y)?"
    rf"(?:(?P<month>{_NUMBER})M)?"
    rf"(?:(?P<week>{_NUMBER})W)?"
    rf"(?:(?P<day>{_NUMBER})D)?"
    r"(?:T"
    rf"(?:(?P<hour>{_NUMBER})H)?"
    rf"(?:(?P<minute>{_NUMBER})M)?"
    rf"(?:(?P<second>{_NUMBER})S)?"
    r")?$",
    re.IGNORECASE,
)

# One "<number><optional space><unit>" pair, e.g. "2 hours" or "30min".
_SIMPLE_DURATION_RE = re.compile(r"(\d+(?:\.\d*)?|\.\d+)\s*([a-z]+)")

# "in 3 days", "3 days", "3 days ago".
_RELATIVE_RE = re.compile(
    rf"^(?:(in)\s+)?({_NUMBER})\s*"
    r"(second|seconds|minute|minutes|hour|hours|day|days|week|weeks|month|months|year|years)"
    r"(?:\s+(ago))?$"
)


def parse_iso_datetime(value: str, default_tz: Optional[tzinfo] = timezone.utc) -> datetime:
    """Parse an ISO 8601 datetime string.

    Args:
        value: Text accepted by ``datetime.fromisoformat``; a trailing ``Z``
            (or ``z``) is also accepted and read as UTC.
        default_tz: Timezone attached when ``value`` carries no offset.
            Defaults to UTC. Pass ``None`` to get a naive datetime back.

    Returns:
        The parsed datetime, timezone-aware unless ``default_tz`` is ``None``
        and the input had no offset.

    Raises:
        ValueError: If ``value`` is not a string or is not valid ISO 8601.
    """
    if not isinstance(value, str):
        raise ValueError("Datetime must be an ISO 8601 string")
    text = value.strip()
    # fromisoformat() only learned the "Z" suffix in Python 3.11.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None and default_tz is not None:
        parsed = parsed.replace(tzinfo=default_tz)
    return parsed


def _parse_iso_duration(text: str) -> timedelta:
    """Convert an ISO 8601 duration (``PnYnMnWnDTnHnMnS``) to a timedelta."""
    match = _ISO_DURATION_RE.match(text)
    parts = {}
    if match:
        parts = {unit: amount for unit, amount in match.groupdict().items() if amount is not None}
    # "P", "PT" and a dangling "T" ("P1DT") carry no usable component.
    if not parts or text.upper().endswith("T"):
        raise ValueError(f"Invalid ISO 8601 duration: {text}")
    total = timedelta()
    for unit, amount in parts.items():
        total += _UNIT_DELTAS[unit] * float(amount)
    return total


def _parse_simple_duration(text: str) -> timedelta:
    """Convert token durations such as ``2 hours`` or ``1 day 30min`` to a timedelta."""
    lowered = text.lower()
    total = timedelta()
    cursor = 0
    found = False
    for match in _SIMPLE_DURATION_RE.finditer(lowered):
        # Only whitespace or commas may separate number/unit pairs.
        if lowered[cursor:match.start()].strip(" \t,"):
            raise ValueError(f"Could not parse duration: {text}")
        raw_unit = match.group(2)
        unit = raw_unit.rstrip("s")
        if unit not in _UNIT_DELTAS:
            raise ValueError(f"Unknown duration unit: {raw_unit}")
        total += _UNIT_DELTAS[unit] * float(match.group(1))
        cursor = match.end()
        found = True
    if not found or lowered[cursor:].strip(" \t,"):
        raise ValueError(f"Could not parse duration: {text}")
    return total


def parse_duration_to_timedelta(dur: str) -> timedelta:
    """Parse a duration given as ISO 8601 or as simple number/unit tokens.

    Args:
        dur: Either an ISO 8601 duration (``P1DT2H30M``) or one or more
            ``<number> <unit>`` pairs (``2 hours``, ``3 days 30min``).
            Months count as 30 days and years as 365 days.

    Returns:
        The total duration.

    Raises:
        ValueError: If ``dur`` is not a string, is empty, or cannot be parsed.
    """
    if not isinstance(dur, str):
        raise ValueError("Duration must be a string")
    text = dur.strip()
    if text.upper().startswith("P"):
        return _parse_iso_duration(text)
    return _parse_simple_duration(text)


def safe_get_tz(tz_str: str) -> tzinfo:
    """Look up a timezone by IANA name.

    Args:
        tz_str: Zone name such as ``UTC`` or ``America/New_York``.

    Returns:
        A ``ZoneInfo`` for the name. If the lookup fails and the name is
        ``UTC`` (any case), the fixed-offset ``timezone.utc`` is returned so
        the default zone still works when that zone cannot be loaded.

    Raises:
        ValueError: If the timezone is unknown or the name is malformed.
    """
    try:
        return ZoneInfo(tz_str)
    except (ZoneInfoNotFoundError, KeyError, ValueError, OSError, TypeError) as exc:
        if isinstance(tz_str, str) and tz_str.strip().upper() == "UTC":
            return timezone.utc
        raise ValueError(f"Unknown timezone: {tz_str}") from exc


def _shift_months(moment: datetime, months: int) -> datetime:
    """Move ``moment`` by whole calendar months, clamping the day to the month's end."""
    year, month_index = divmod(moment.year * 12 + (moment.month - 1) + months, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return moment.replace(year=year, month=month, day=min(moment.day, last_day))


def parse_natural_language_time(expression: str, ref: datetime, tz: tzinfo) -> datetime:
    """Resolve a natural-language time expression against a reference time.

    Supported forms: ``now``/``current time``/``today``, ``yesterday``,
    ``tomorrow``, ``[in] N <unit>``, ``N <unit> ago``, ``last``/``next``
    ``week``/``month``, ``start``/``end of day``/``today``, and plain ISO 8601
    datetimes. In the ``N <unit>`` forms a month is 30 days and a year is 365
    days; ``next month``/``last month`` move by a calendar month and clamp
    the day (Jan 31 -> Feb 28/29).

    Args:
        expression: The expression to interpret (case-insensitive).
        ref: Reference "now". A naive value is taken to be UTC.
        tz: Timezone the result is expressed in.

    Returns:
        The resolved timezone-aware datetime in ``tz``.

    Raises:
        ValueError: If the expression is not understood.
    """
    if not isinstance(expression, str):
        raise ValueError(f"Could not parse natural language time expression: {expression}")
    expr = expression.strip().lower()
    if ref.tzinfo is None:
        # astimezone() would otherwise read a naive value as machine-local time.
        ref = ref.replace(tzinfo=timezone.utc)
    now = ref.astimezone(tz)

    if expr in ("now", "current time", "today"):
        return now
    if expr == "yesterday":
        return now - timedelta(days=1)
    if expr == "tomorrow":
        return now + timedelta(days=1)

    relative = _RELATIVE_RE.match(expr)
    # "in 3 days ago" is contradictory; let it fall through to the error below.
    if relative and not (relative.group(1) and relative.group(4)):
        offset = _UNIT_DELTAS[relative.group(3).rstrip("s")] * float(relative.group(2))
        return now - offset if relative.group(4) else now + offset

    if expr == "last week":
        return now - timedelta(weeks=1)
    if expr == "next week":
        return now + timedelta(weeks=1)
    if expr == "next month":
        return _shift_months(now, 1)
    if expr == "last month":
        return _shift_months(now, -1)
    if expr in ("end of day", "end of today"):
        return now.replace(hour=23, minute=59, second=59, microsecond=999999)
    if expr in ("start of day", "start of today"):
        return now.replace(hour=0, minute=0, second=0, microsecond=0)

    try:
        return parse_iso_datetime(expression).astimezone(tz)
    except (ValueError, OverflowError) as exc:
        raise ValueError(
            f"Could not parse natural language time expression: {expression}"
        ) from exc


def format_datetime(dt: datetime, fmt: str) -> str:
    """Format a datetime with a named preset or a strftime pattern.

    Args:
        dt: The datetime to format.
        fmt: ``iso``, ``iso-compact``, ``rfc822`` or ``readable`` (matched
            case-insensitively); anything else is used as a strftime pattern.

    Returns:
        The formatted string.

    Raises:
        ValueError: If ``fmt`` is not a string.
    """
    if not isinstance(fmt, str):
        raise ValueError("Format must be a string")
    pattern = fmt.strip()
    # Only the preset name is case-insensitive; strftime directives are
    # case-sensitive (%Y vs %y, %M vs %m), so the pattern keeps its case.
    preset = pattern.lower()
    if preset == "iso":
        return dt.isoformat()
    if preset == "iso-compact":
        return dt.isoformat(timespec="seconds").replace(":", "")
    if preset == "rfc822":
        return dt.strftime("%a, %d %b %Y %H:%M:%S %z")
    if preset == "readable":
        return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    return dt.strftime(pattern)


def get_api_key_from_request(headers: Any) -> Optional[str]:
    """Extract the API key from request headers.

    Args:
        headers: Mapping-like object with a ``get`` method.

    Returns:
        The token from ``Authorization: Bearer <key>`` (scheme matched
        case-insensitively), else the ``X-API-Key`` value, else ``None``.
    """
    auth = (headers.get("Authorization") or "").strip()
    scheme, _, token = auth.partition(" ")
    token = token.strip()
    if scheme.lower() == "bearer" and token:
        return token
    x_key = (headers.get("X-API-Key") or "").strip()
    return x_key or None


def require_auth(headers: Any) -> bool:
    """Check the request's API key against ``API_KEY``.

    Args:
        headers: Mapping-like object with a ``get`` method.

    Returns:
        ``True`` when auth is disabled (``API_KEY`` empty) or the key matches.

    Raises:
        PermissionError: If auth is enabled and the key is missing or wrong.
    """
    if not API_KEY:
        return True
    key = get_api_key_from_request(headers)
    # compare_digest avoids leaking how many leading characters matched.
    if not key or not hmac.compare_digest(key.encode("utf-8"), API_KEY.encode("utf-8")):
        raise PermissionError("Missing or invalid API key")
    return True


def _require_arg(args: Dict[str, Any], key: str) -> Any:
    """Return ``args[key]``, raising a readable error when it is absent or null."""
    value = args.get(key)
    if value is None:
        raise ValueError(f"Missing required argument: {key}")
    return value


def _tool_get_current_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Implement the ``get_current_time`` tool."""
    tz_str = args.get("timezone") or "UTC"
    now = datetime.now(safe_get_tz(tz_str))
    return {
        "timezone": tz_str,
        "iso": now.isoformat(),
        "readable": now.strftime("%Y-%m-%d %H:%M:%S %Z"),
    }


def _tool_convert_timezone(args: Dict[str, Any]) -> Dict[str, Any]:
    """Implement the ``convert_timezone`` tool."""
    # Parse without a default zone so an offset-less input can be read in from_timezone.
    dt = parse_iso_datetime(_require_arg(args, "datetime"), default_tz=None)
    to_name = _require_arg(args, "to_timezone")
    to_tz = safe_get_tz(to_name)
    from_name = args.get("from_timezone")
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=safe_get_tz(from_name) if from_name else timezone.utc)
    elif from_name:
        # Input already carries an offset: only re-express it in the source zone.
        dt = dt.astimezone(safe_get_tz(from_name))
    return {
        "original": dt.isoformat(),
        "converted": dt.astimezone(to_tz).isoformat(),
        "to_timezone": to_name,
    }


def _shift_time(args: Dict[str, Any], sign: int) -> Dict[str, Any]:
    """Shared body of ``add_time`` (sign=+1) and ``subtract_time`` (sign=-1)."""
    dt = parse_iso_datetime(_require_arg(args, "datetime"))
    duration = _require_arg(args, "duration")
    result = dt + sign * parse_duration_to_timedelta(duration)
    return {
        "original": dt.isoformat(),
        "duration": duration,
        "result": result.isoformat(),
    }


def _tool_add_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Implement the ``add_time`` tool."""
    return _shift_time(args, 1)


def _tool_subtract_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Implement the ``subtract_time`` tool."""
    return _shift_time(args, -1)


def _tool_parse_natural_language(args: Dict[str, Any]) -> Dict[str, Any]:
    """Implement the ``parse_natural_language`` tool."""
    expression = _require_arg(args, "expression")
    tz_str = args.get("timezone") or "UTC"
    tz = safe_get_tz(tz_str)
    ref_str = args.get("reference_time")
    ref = parse_iso_datetime(ref_str) if ref_str else datetime.now(tz)
    parsed = parse_natural_language_time(expression, ref, tz)
    return {
        "expression": expression,
        "reference_time": ref.isoformat(),
        "parsed": parsed.isoformat(),
        "timezone": tz_str,
    }


def _tool_compare_times(args: Dict[str, Any]) -> Dict[str, Any]:
    """Implement the ``compare_times`` tool."""
    t1 = parse_iso_datetime(_require_arg(args, "time1"))
    t2 = parse_iso_datetime(_require_arg(args, "time2"))
    total_seconds = (t2 - t1).total_seconds()
    # The breakdown is of the absolute difference; "relation" carries the sign.
    days, rem = divmod(abs(int(total_seconds)), 86400)
    hours, rem = divmod(rem, 3600)
    minutes, seconds = divmod(rem, 60)
    if total_seconds > 0:
        relation = "time2 is after time1"
    elif total_seconds < 0:
        relation = "time2 is before time1"
    else:
        relation = "times are equal"
    return {
        "time1": t1.isoformat(),
        "time2": t2.isoformat(),
        "relation": relation,
        "difference": {
            "total_seconds": total_seconds,
            "days": days,
            "hours": hours,
            "minutes": minutes,
            "seconds": seconds,
        },
    }


def _tool_format_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Implement the ``format_time`` tool."""
    dt = parse_iso_datetime(_require_arg(args, "datetime"))
    fmt = _require_arg(args, "format")
    return {
        "datetime": dt.isoformat(),
        "format": fmt,
        "formatted": format_datetime(dt, fmt),
    }


# Tool name -> implementation; keys mirror the names advertised in TOOL_SCHEMA.
_TOOL_HANDLERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "get_current_time": _tool_get_current_time,
    "convert_timezone": _tool_convert_timezone,
    "add_time": _tool_add_time,
    "subtract_time": _tool_subtract_time,
    "parse_natural_language": _tool_parse_natural_language,
    "compare_times": _tool_compare_times,
    "format_time": _tool_format_time,
}


class MCPTimeToolsHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the time tools over MCP JSON-RPC.

    GET ``/`` returns service info, GET ``/mcp/openapi.json`` the OpenAPI
    document, GET ``/mcp`` the tool list. POST ``/`` or ``/mcp`` accepts
    JSON-RPC ``initialize``, ``tools/list`` and ``tools/call`` requests.
    """

    def log_message(self, format, *args):
        """Silence the default per-request logging to stderr."""
        pass

    def _send_json(self, status: int, payload: Any):
        """Send ``payload`` as a UTF-8 JSON response with the given status."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_empty(self, status: int):
        """Send a response with the given status and no body."""
        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _auth_or_401(self):
        """Return True if the request is authorized; otherwise send 401 and return False."""
        try:
            return require_auth(self.headers)
        except PermissionError:
            self._send_json(401, {"error": "Missing or invalid API key"})
            return False

    def _read_json_object(self) -> Optional[Dict[str, Any]]:
        """Read and decode the request body as a JSON object.

        Returns:
            The decoded object, or ``None`` after an error response has
            already been sent.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except ValueError:
            self._send_json(400, {"error": "Invalid Content-Length"})
            return None
        if length <= 0:
            self._send_json(400, {"error": "Empty body"})
            return None
        if length > MAX_BODY_BYTES:
            self._send_json(413, {"error": "Request body too large"})
            return None
        raw = self.rfile.read(length)
        try:
            req = json.loads(raw)
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError (undecodable bytes).
            self._send_json(400, {"error": "Invalid JSON"})
            return None
        if not isinstance(req, dict):
            self._send_json(400, {"error": "Invalid JSON-RPC request"})
            return None
        return req

    def do_GET(self):
        """Serve the info, OpenAPI and tool-list endpoints."""
        # Root info
        if self.path == "/":
            self._send_json(200, {
                "service": "mcp-time-tools",
                "transport": "http",
                "auth": "API key required if API_KEY is set (Authorization: Bearer <key> or X-API-Key: <key>)",
                "docs": "Use POST /mcp with MCP JSON-RPC requests.",
            })
            return

        # OpenAPI spec for OpenWebUI discovery
        if self.path == "/mcp/openapi.json":
            self._send_json(200, OPENAPI_SPEC)
            return

        # MCP tools list via GET
        if self.path == "/mcp":
            if not self._auth_or_401():
                return
            self._send_json(200, {
                "jsonrpc": "2.0",
                "tools": TOOL_SCHEMA["tools"],
            })
            return

        self.send_error(404, "Not Found")

    def do_POST(self):
        """Handle an MCP JSON-RPC request posted to ``/`` or ``/mcp``."""
        # Support both / and /mcp as MCP endpoints
        if self.path not in ("/", "/mcp"):
            self.send_error(404, "Not Found")
            return
        if not self._auth_or_401():
            return

        req = self._read_json_object()
        if req is None:
            return

        method = req.get("method")
        params = req.get("params") or {}
        req_id = req.get("id")

        # Notifications (no id), e.g. "notifications/initialized" sent after the
        # handshake, expect an acknowledgement rather than a JSON-RPC response.
        if "id" not in req and isinstance(method, str) and method.startswith("notifications/"):
            self._send_empty(202)
            return

        # MCP initialize handshake
        if method == "initialize":
            self._send_json(200, {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": {
                    "protocolVersion": "2025-11-25",
                    "capabilities": {"tools": {}},
                    "serverInfo": {"name": "mcp-time-tools", "version": "1.0.0"},
                },
            })
            return

        # tools/list
        if method == "tools/list":
            self._send_json(200, {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": TOOL_SCHEMA,
            })
            return

        # tools/call
        if method == "tools/call":
            try:
                if not isinstance(params, dict):
                    raise ValueError("params must be an object")
                tool_args = params.get("arguments") or {}
                if not isinstance(tool_args, dict):
                    raise ValueError("arguments must be an object")
                result = self._call_tool(params.get("name"), tool_args)
                self._send_json(200, {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": {
                        "content": [
                            {"type": "text", "text": json.dumps(result, ensure_ascii=False)},
                        ],
                    },
                })
            except Exception as e:
                # Request boundary: any tool failure is reported as a JSON-RPC error.
                self._send_json(200, {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {"code": -32000, "message": str(e)},
                })
            return

        self._send_json(400, {"error": "Unknown method: " + str(method)})

    def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
        """Run the named tool and return its JSON-serializable result.

        Args:
            name: Tool name as listed in ``TOOL_SCHEMA``.
            args: The tool's arguments.

        Raises:
            ValueError: If the tool is unknown, a required argument is
                missing, or an argument cannot be parsed.
        """
        handler = _TOOL_HANDLERS.get(name) if isinstance(name, str) else None
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return handler(args)


def main():
    """Start the HTTP server on the port from argv[1], else $PORT, else 8080."""
    port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080"))
    server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler)
    mode = "auth enabled" if API_KEY else "no auth (API_KEY not set)"
    print(f"MCP Time Tools HTTP server listening on 0.0.0.0:{port} [{mode}]")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Release the listening socket however serve_forever() exits.
        server.server_close()


if __name__ == "__main__":
    main()