#!/usr/bin/env python3
"""
MCP Time Tools Server (HTTP transport)

Tools:
- get_current_time
- convert_timezone
- add_time
- subtract_time
- parse_natural_language
- compare_times
- format_time
"""

import calendar
import json
import os
import re
import sys
from datetime import datetime, timedelta, timezone, tzinfo
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Callable, Dict, List, Optional


# ---------- MCP helpers ----------

TOOL_SCHEMA: Dict[str, Any] = {
    "tools": [
        {
            "name": "get_current_time",
            "description": "Get the current date and time, optionally in a specific timezone.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "timezone": {
                        "type": "string",
                        "description": "Timezone (e.g. 'UTC', 'America/New_York'). Defaults to UTC.",
                    }
                },
                "required": [],
            },
        },
        {
            "name": "convert_timezone",
            "description": "Convert a datetime from one timezone to another.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime (e.g. '2025-01-01T12:00:00+00:00').",
                    },
                    "from_timezone": {
                        "type": "string",
                        "description": "Source timezone (e.g. 'UTC', 'America/New_York'). Optional.",
                    },
                    "to_timezone": {
                        "type": "string",
                        "description": "Target timezone (e.g. 'Asia/Tokyo').",
                    },
                },
                "required": ["datetime", "to_timezone"],
            },
        },
        {
            "name": "add_time",
            "description": "Add a duration to a datetime. Duration in ISO 8601 (e.g. 'P1DT2H30M') or simple tokens like '2 hours', '3 days'.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime.",
                    },
                    "duration": {
                        "type": "string",
                        "description": "Duration to add, e.g. 'P1DT2H30M', '2 hours', '3 days'.",
                    },
                },
                "required": ["datetime", "duration"],
            },
        },
        {
            "name": "subtract_time",
            "description": "Subtract a duration from a datetime.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime.",
                    },
                    "duration": {
                        "type": "string",
                        "description": "Duration to subtract, e.g. 'P1DT2H30M', '2 hours', '3 days'.",
                    },
                },
                "required": ["datetime", "duration"],
            },
        },
        {
            "name": "parse_natural_language",
            "description": "Parse natural language time expressions like 'yesterday', 'next month', 'in 3 days' into ISO 8601.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "expression": {
                        "type": "string",
                        "description": "Natural language time expression, e.g. 'yesterday', 'next month', 'in 2 days'.",
                    },
                    "reference_time": {
                        "type": "string",
                        "description": "Optional ISO 8601 reference time. Defaults to now.",
                    },
                    "timezone": {
                        "type": "string",
                        "description": "Timezone for the result (e.g. 'UTC', 'America/New_York'). Defaults to UTC.",
                    },
                },
                "required": ["expression"],
            },
        },
        {
            "name": "compare_times",
            "description": "Compare two ISO 8601 datetimes and return their difference.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "time1": {
                        "type": "string",
                        "description": "First ISO 8601 datetime.",
                    },
                    "time2": {
                        "type": "string",
                        "description": "Second ISO 8601 datetime.",
                    },
                },
                "required": ["time1", "time2"],
            },
        },
        {
            "name": "format_time",
            "description": "Format a datetime using a predefined or custom format pattern.",
            "inputSchema": {
                "type": "object",
                "properties": {
                    "datetime": {
                        "type": "string",
                        "description": "ISO 8601 datetime.",
                    },
                    "format": {
                        "type": "string",
                        "description": "Format: 'iso', 'iso-compact', 'rfc822', 'readable', or a Python strftime pattern.",
                    },
                },
                "required": ["datetime", "format"],
            },
        },
    ]
}

# Upper bound on accepted POST bodies; the server listens on all interfaces,
# so an attacker-chosen Content-Length must not drive an unbounded read.
MAX_BODY_BYTES = 1024 * 1024

# Length of one unit for every spelling accepted by the duration and
# natural-language parsers (after stripping a plural "s").
# Months and years are fixed-length approximations (30 and 365 days).
_UNIT_DELTAS: Dict[str, timedelta] = {
    "second": timedelta(seconds=1),
    "sec": timedelta(seconds=1),
    "minute": timedelta(minutes=1),
    "min": timedelta(minutes=1),
    "hour": timedelta(hours=1),
    "hr": timedelta(hours=1),
    "day": timedelta(days=1),
    "week": timedelta(weeks=1),
    "month": timedelta(days=30),
    "year": timedelta(days=365),
}

_NUMBER = r"\d+(?:\.\d+)?"

# ISO 8601 duration: P[nY][nM][nW][nD][T[nH][nM][nS]]
_ISO_DURATION_RE = re.compile(
    r"^P"
    rf"(?:(?P<years>{_NUMBER})Y)?"
    rf"(?:(?P<months>{_NUMBER})M)?"
    rf"(?:(?P<weeks>{_NUMBER})W)?"
    rf"(?:(?P<days>{_NUMBER})D)?"
    r"(?:T"
    rf"(?:(?P<hours>{_NUMBER})H)?"
    rf"(?:(?P<minutes>{_NUMBER})M)?"
    rf"(?:(?P<seconds>{_NUMBER})S)?"
    r")?$",
    re.IGNORECASE,
)

# One "<number> <unit>" pair, with or without whitespace ("2 hours", "2hours").
_DURATION_TOKEN_RE = re.compile(rf"({_NUMBER})\s*([a-z]+)")

# "in 3 days", "3 days", "3 days ago" (input is already lower-cased).
_RELATIVE_RE = re.compile(
    rf"^(?:(in)\s+)?({_NUMBER})\s*"
    r"(day|days|hour|hours|minute|minutes|week|weeks|month|months|year|years)"
    r"(?:\s+(ago))?\s*$"
)


def parse_iso_datetime(value: str, default_tz: Optional[tzinfo] = timezone.utc) -> datetime:
    """
    Parse an ISO 8601-ish datetime string.

    A trailing 'Z' is accepted as UTC. When the input carries no UTC offset
    (including date-only input such as '2025-01-01'), ``default_tz`` is
    attached so callers always get an aware datetime; pass ``default_tz=None``
    to keep such values naive.

    Raises:
        ValueError: if ``value`` is not a string or is not a valid datetime.
    """
    if not isinstance(value, str):
        raise ValueError(f"Invalid ISO 8601 datetime: {value!r}")
    text = value.strip()
    # datetime.fromisoformat only understands a trailing "Z" on Python 3.11+.
    if text[-1:] in ("Z", "z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError as err:
        raise ValueError(f"Invalid ISO 8601 datetime: {value!r}") from err
    if parsed.tzinfo is None and default_tz is not None:
        parsed = parsed.replace(tzinfo=default_tz)
    return parsed


def _parse_iso8601_duration(text: str) -> timedelta:
    """Convert an ISO 8601 duration such as 'P1DT2H30M' to a timedelta."""
    match = _ISO_DURATION_RE.match(text)
    parts = {k: float(v) for k, v in match.groupdict().items() if v is not None} if match else {}
    if not parts:
        # Also rejects a bare "P" / "PT", which carry no component at all.
        raise ValueError(f"Invalid ISO 8601 duration: {text}")
    return timedelta(
        # Same fixed-length approximations as the plain-token form.
        days=365 * parts.get("years", 0.0)
        + 30 * parts.get("months", 0.0)
        + parts.get("days", 0.0),
        weeks=parts.get("weeks", 0.0),
        hours=parts.get("hours", 0.0),
        minutes=parts.get("minutes", 0.0),
        seconds=parts.get("seconds", 0.0),
    )


def parse_duration_to_timedelta(dur: str) -> timedelta:
    """
    Parse a duration given as either:
    - ISO 8601: P1DT2H30M, PT1H, P3D, P2W, etc.
    - Simple tokens: "2 hours", "3 days", "1 week", "1 hour 30 minutes"

    Months and years are approximated as 30 and 365 days.

    Raises:
        ValueError: if the text is empty, malformed, or uses an unknown unit.
    """
    if not isinstance(dur, str):
        raise ValueError(f"Invalid duration: {dur!r}")
    text = dur.strip()

    # ISO 8601 duration (starts with P); no plain unit name starts with "p".
    if text.upper().startswith("P"):
        return _parse_iso8601_duration(text)

    lowered = text.lower()
    total = timedelta()
    cursor = 0
    for match in _DURATION_TOKEN_RE.finditer(lowered):
        # Only separators may sit between two "<number> <unit>" pairs.
        filler = lowered[cursor:match.start()].strip(" \t,")
        if filler not in ("", "and"):
            raise ValueError(f"Invalid duration: {dur}")
        unit_text = match.group(2)
        unit_delta = _UNIT_DELTAS.get(unit_text.rstrip("s"))  # normalize plural
        if unit_delta is None:
            raise ValueError(f"Unknown duration unit: {unit_text}")
        total += unit_delta * float(match.group(1))
        cursor = match.end()

    # cursor == 0 means nothing matched at all (empty or non-duration text).
    if cursor == 0 or lowered[cursor:].strip(" \t,"):
        raise ValueError(f"Invalid duration: {dur}")
    return total


def safe_get_tz(tz_str: str) -> ZoneInfo:
    """
    Look up an IANA timezone by name.

    Raises:
        ValueError: if the name is not a string or no such zone can be loaded.
    """
    if not isinstance(tz_str, str):
        raise ValueError(f"Unknown timezone: {tz_str}")
    try:
        return ZoneInfo(tz_str)
    except (ZoneInfoNotFoundError, ValueError, OSError) as err:
        # ValueError / OSError cover malformed keys and keys that resolve to
        # something other than a zone file (e.g. a directory).
        raise ValueError(f"Unknown timezone: {tz_str}") from err


def _shift_months(moment: datetime, months: int) -> datetime:
    """Move ``moment`` by whole calendar months, clamping the day to the month's length."""
    year, month_index = divmod(moment.year * 12 + (moment.month - 1) + months, 12)
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    return moment.replace(year=year, month=month, day=min(moment.day, last_day))


def parse_natural_language_time(
    expression: str,
    ref: datetime,
    tz: tzinfo
) -> datetime:
    """
    Interpret common natural language time expressions relative to ``ref``.

    Supported: 'now', 'current time', 'today', 'yesterday', 'tomorrow',
    'in N <unit>', 'N <unit>', 'N <unit> ago', 'last/next week',
    'last/next month', 'start/end of day', and, as a fallback, any ISO 8601
    datetime. The result is expressed in ``tz``. A naive ``ref`` is treated
    as UTC.

    Raises:
        ValueError: if the expression cannot be interpreted.
    """
    expr = expression.strip().lower()
    if ref.tzinfo is None:
        # astimezone() on a naive value would silently assume the host's
        # local zone; the rest of this module treats naive input as UTC.
        ref = ref.replace(tzinfo=timezone.utc)
    now = ref.astimezone(tz)

    # Direct words ("today" intentionally keeps the current time of day).
    if expr in ("now", "current time", "today"):
        return now
    if expr == "yesterday":
        return now - timedelta(days=1)
    if expr == "tomorrow":
        return now + timedelta(days=1)

    # "in X days/hours/minutes/weeks", "X days ago"
    relative = _RELATIVE_RE.match(expr)
    # "in 3 days ago" is contradictory; let it fall through to the error.
    if relative and not (relative.group(1) and relative.group(4)):
        offset = _UNIT_DELTAS[relative.group(3).rstrip("s")] * float(relative.group(2))
        return now - offset if relative.group(4) else now + offset

    if expr == "last week":
        return now - timedelta(weeks=1)
    if expr == "next week":
        return now + timedelta(weeks=1)
    if expr == "next month":
        return _shift_months(now, 1)
    if expr == "last month":
        return _shift_months(now, -1)

    if expr in ("end of day", "end of today"):
        return now.replace(hour=23, minute=59, second=59, microsecond=999999)
    if expr in ("start of day", "start of today"):
        return now.replace(hour=0, minute=0, second=0, microsecond=0)

    # Fallback: try to parse as ISO
    try:
        return parse_iso_datetime(expression).astimezone(tz)
    except ValueError as err:
        raise ValueError(
            f"Could not parse natural language time expression: {expression}"
        ) from err


def format_datetime(dt: datetime, fmt: str) -> str:
    """
    Render ``dt`` using a named preset or a strftime pattern.

    Preset names ('iso', 'iso-compact', 'rfc822', 'readable') are matched
    case-insensitively; anything else is passed to ``strftime`` untouched.
    """
    # Only the preset lookup is case-folded: strftime directives are
    # case-sensitive (%Y vs %y, %M vs %m), so the pattern itself must not be.
    preset = fmt.strip().lower()
    if preset == "iso":
        return dt.isoformat()
    if preset == "iso-compact":
        return dt.isoformat(timespec="seconds").replace(":", "")
    if preset == "rfc822":
        return dt.strftime("%a, %d %b %Y %H:%M:%S %z")
    if preset == "readable":
        return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    # treat as strftime pattern
    return dt.strftime(fmt)


# ---------- Tool implementations ----------

def _require_str(args: Dict[str, Any], key: str) -> str:
    """Return the string argument ``key`` or raise a descriptive ValueError."""
    value = args.get(key)
    if value is None:
        raise ValueError(f"Missing required argument: {key}")
    if not isinstance(value, str):
        raise ValueError(f"Argument '{key}' must be a string")
    return value


def _tool_get_current_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Report the current time in the requested (default UTC) timezone."""
    tz_str = args.get("timezone") or "UTC"
    now = datetime.now(safe_get_tz(tz_str))
    return {
        "timezone": tz_str,
        "iso": now.isoformat(),
        "readable": now.strftime("%Y-%m-%d %H:%M:%S %Z"),
    }


def _tool_convert_timezone(args: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a datetime to ``to_timezone``, reading offset-less input in ``from_timezone``."""
    to_name = _require_str(args, "to_timezone")
    to_tz = safe_get_tz(to_name)
    from_name = args.get("from_timezone")
    from_tz = safe_get_tz(from_name) if from_name else None
    # Input without an offset is wall-clock time in from_timezone (UTC if absent).
    dt = parse_iso_datetime(
        _require_str(args, "datetime"),
        default_tz=from_tz if from_tz is not None else timezone.utc,
    )
    if from_tz is not None:
        # Input that carried its own offset is re-expressed in from_timezone.
        dt = dt.astimezone(from_tz)
    return {
        "original": dt.isoformat(),
        "converted": dt.astimezone(to_tz).isoformat(),
        "to_timezone": to_name,
    }


def _shift_tool(args: Dict[str, Any], sign: int) -> Dict[str, Any]:
    """Shared body of add_time (sign=+1) and subtract_time (sign=-1)."""
    dt = parse_iso_datetime(_require_str(args, "datetime"))
    duration = _require_str(args, "duration")
    result = dt + sign * parse_duration_to_timedelta(duration)
    return {
        "original": dt.isoformat(),
        "duration": duration,
        "result": result.isoformat(),
    }


def _tool_add_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Add a duration to a datetime."""
    return _shift_tool(args, 1)


def _tool_subtract_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Subtract a duration from a datetime."""
    return _shift_tool(args, -1)


def _tool_parse_natural_language(args: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve a natural-language expression against a reference time."""
    expression = _require_str(args, "expression")
    ref_str = args.get("reference_time")
    tz_str = args.get("timezone") or "UTC"
    tz = safe_get_tz(tz_str)
    ref = parse_iso_datetime(ref_str) if ref_str else datetime.now(tz)
    parsed = parse_natural_language_time(expression, ref, tz)
    return {
        "expression": expression,
        "reference_time": ref.isoformat(),
        "parsed": parsed.isoformat(),
        "timezone": tz_str,
    }


def _tool_compare_times(args: Dict[str, Any]) -> Dict[str, Any]:
    """Describe the ordering of and gap between two datetimes."""
    t1 = parse_iso_datetime(_require_str(args, "time1"))
    t2 = parse_iso_datetime(_require_str(args, "time2"))
    total_seconds = (t2 - t1).total_seconds()
    # The breakdown is of the absolute gap; the sign lives in "relation".
    days, rem = divmod(abs(int(total_seconds)), 86400)
    hours, rem = divmod(rem, 3600)
    minutes, seconds = divmod(rem, 60)
    if total_seconds > 0:
        relation = "time2 is after time1"
    elif total_seconds < 0:
        relation = "time2 is before time1"
    else:
        relation = "times are equal"
    return {
        "time1": t1.isoformat(),
        "time2": t2.isoformat(),
        "relation": relation,
        "difference": {
            "total_seconds": total_seconds,
            "days": days,
            "hours": hours,
            "minutes": minutes,
            "seconds": seconds,
        },
    }


def _tool_format_time(args: Dict[str, Any]) -> Dict[str, Any]:
    """Format a datetime with a preset name or strftime pattern."""
    dt = parse_iso_datetime(_require_str(args, "datetime"))
    fmt = _require_str(args, "format")
    return {
        "datetime": dt.isoformat(),
        "format": fmt,
        "formatted": format_datetime(dt, fmt),
    }


# Tool name (as advertised in TOOL_SCHEMA) -> implementation.
_TOOL_HANDLERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "get_current_time": _tool_get_current_time,
    "convert_timezone": _tool_convert_timezone,
    "add_time": _tool_add_time,
    "subtract_time": _tool_subtract_time,
    "parse_natural_language": _tool_parse_natural_language,
    "compare_times": _tool_compare_times,
    "format_time": _tool_format_time,
}


# ---------- MCP HTTP handler ----------

class MCPTimeToolsHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the time tools via GET / , GET /mcp and POST /mcp."""

    def log_message(self, format, *args):
        # Quiet logs by default
        pass

    def _send_json(self, status: int, payload: Any):
        """Serialize ``payload`` as UTF-8 JSON and send it with ``status``."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        # Root: simple info
        if self.path == "/":
            self._send_json(200, {
                "service": "mcp-time-tools",
                "transport": "http",
                "docs": "Use POST /mcp with MCP JSON-RPC requests."
            })
            return

        # MCP discover tools via GET /mcp
        if self.path == "/mcp":
            self._send_json(200, {
                "jsonrpc": "2.0",
                "tools": TOOL_SCHEMA["tools"]
            })
            return

        self.send_error(404, "Not Found")

    def _read_json_object(self) -> Optional[Dict[str, Any]]:
        """
        Read and decode the request body as a JSON object.

        Returns the decoded dict, or None after an error response has
        already been sent to the client.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._send_json(400, {"error": "Invalid Content-Length"})
            return None
        # A negative length would make rfile.read() block until EOF.
        if length <= 0:
            self._send_json(400, {"error": "Empty body"})
            return None
        if length > MAX_BODY_BYTES:
            self._send_json(413, {"error": "Request body too large"})
            return None

        raw = self.rfile.read(length)
        try:
            req = json.loads(raw)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError (bad bytes).
            self._send_json(400, {"error": "Invalid JSON"})
            return None
        if not isinstance(req, dict):
            self._send_json(400, {"error": "Invalid JSON"})
            return None
        return req

    def do_POST(self):
        if self.path != "/mcp":
            self.send_error(404, "Not Found")
            return

        req = self._read_json_object()
        if req is None:
            return

        method = req.get("method")
        params = req.get("params") or {}
        req_id = req.get("id")

        # MCP tool calling pattern: tools/call with name + arguments
        if method == "tools/call":
            try:
                if not isinstance(params, dict):
                    raise ValueError("params must be an object")
                tool_args = params.get("arguments") or {}
                if not isinstance(tool_args, dict):
                    raise ValueError("arguments must be an object")
                result = self._call_tool(params.get("name"), tool_args)
                response = {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "result": {
                        "content": [
                            {"type": "text", "text": json.dumps(result, ensure_ascii=False)}
                        ]
                    }
                }
            except Exception as e:
                # RPC boundary: any tool failure becomes a JSON-RPC error
                # object instead of tearing down the connection.
                response = {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {
                        "code": -32000,
                        "message": str(e)
                    }
                }
            self._send_json(200, response)
            return

        # If client asks for tools list via JSON-RPC
        if method == "tools/list":
            self._send_json(200, {
                "jsonrpc": "2.0",
                "id": req_id,
                "result": TOOL_SCHEMA
            })
            return

        # Fallback
        self._send_json(400, {"error": "Unknown method: " + str(method)})

    def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
        """
        Run the tool called ``name`` with ``args`` and return its result dict.

        Raises:
            ValueError: for an unknown tool or invalid/missing arguments.
        """
        handler = _TOOL_HANDLERS.get(name) if isinstance(name, str) else None
        if handler is None:
            raise ValueError(f"Unknown tool: {name}")
        return handler(args)


def main():
    """Start the HTTP server; port comes from argv[1], else $PORT, else 8080."""
    port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080"))
    server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler)
    print(f"MCP Time Tools HTTP server listening on 0.0.0.0:{port}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        # Release the listening socket on every exit path.
        server.server_close()


if __name__ == "__main__":
    main()