diff --git a/mcp_time_server.py b/mcp_time_server.py new file mode 100644 index 0000000..8423038 --- /dev/null +++ b/mcp_time_server.py @@ -0,0 +1,553 @@ +#!/usr/bin/env python3 +""" +MCP Time Tools Server (HTTP transport) + +Tools: +- get_current_time +- convert_timezone +- add_time +- subtract_time +- parse_natural_language +- compare_times +- format_time +""" + +import json +import sys +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo, ZoneInfoNotFoundError +from http.server import HTTPServer, BaseHTTPRequestHandler +from typing import Any, Dict, List, Optional + +# ---------- MCP helpers ---------- + +TOOL_SCHEMA: Dict[str, Any] = { + "tools": [ + { + "name": "get_current_time", + "description": "Get the current date and time, optionally in a specific timezone.", + "inputSchema": { + "type": "object", + "properties": { + "timezone": { + "type": "string", + "description": "Timezone (e.g. 'UTC', 'America/New_York'). Defaults to UTC." + } + }, + "required": [] + } + }, + { + "name": "convert_timezone", + "description": "Convert a datetime from one timezone to another.", + "inputSchema": { + "type": "object", + "properties": { + "datetime": { + "type": "string", + "description": "ISO 8601 datetime (e.g. '2025-01-01T12:00:00+00:00')." + }, + "from_timezone": { + "type": "string", + "description": "Source timezone (e.g. 'UTC', 'America/New_York'). Optional." + }, + "to_timezone": { + "type": "string", + "description": "Target timezone (e.g. 'Asia/Tokyo')." + } + }, + "required": ["datetime", "to_timezone"] + } + }, + { + "name": "add_time", + "description": "Add a duration to a datetime. Duration in ISO 8601 (e.g. 'P1DT2H30M') or simple tokens like '2 hours', '3 days'." + , + "inputSchema": { + "type": "object", + "properties": { + "datetime": { + "type": "string", + "description": "ISO 8601 datetime." + }, + "duration": { + "type": "string", + "description": "Duration to add, e.g. 'P1DT2H30M', '2 hours', '3 days'." + } + }, + "required": ["datetime", "duration"] + } + }, + { + "name": "subtract_time", + "description": "Subtract a duration from a datetime.", + "inputSchema": { + "type": "object", + "properties": { + "datetime": { + "type": "string", + "description": "ISO 8601 datetime." + }, + "duration": { + "type": "string", + "description": "Duration to subtract, e.g. 'P1DT2H30M', '2 hours', '3 days'." + } + }, + "required": ["datetime", "duration"] + } + }, + { + "name": "parse_natural_language", + "description": "Parse natural language time expressions like 'yesterday', 'next month', 'in 3 days' into ISO 8601." + , + "inputSchema": { + "type": "object", + "properties": { + "expression": { + "type": "string", + "description": "Natural language time expression, e.g. 'yesterday', 'next month', 'in 2 days'." + }, + "reference_time": { + "type": "string", + "description": "Optional ISO 8601 reference time. Defaults to now." + }, + "timezone": { + "type": "string", + "description": "Timezone for the result (e.g. 'UTC', 'America/New_York'). Defaults to UTC." + } + }, + "required": ["expression"] + } + }, + { + "name": "compare_times", + "description": "Compare two ISO 8601 datetimes and return their difference.", + "inputSchema": { + "type": "object", + "properties": { + "time1": { + "type": "string", + "description": "First ISO 8601 datetime." + }, + "time2": { + "type": "string", + "description": "Second ISO 8601 datetime." + } + }, + "required": ["time1", "time2"] + } + }, + { + "name": "format_time", + "description": "Format a datetime using a predefined or custom format pattern.", + "inputSchema": { + "type": "object", + "properties": { + "datetime": { + "type": "string", + "description": "ISO 8601 datetime." + }, + "format": { + "type": "string", + "description": "Format: 'iso', 'iso-compact', 'rfc822', 'readable', or a Python strftime pattern." + } + }, + "required": ["datetime", "format"] + } + } + ] +} + + +def parse_iso_datetime(value: str) -> datetime: + """ + Parse ISO 8601-ish datetime strings. + """ + value = value.strip() + # If no timezone info, treat as UTC + if not any(ch in value for ch in ("+", "-", "Z")) or value.endswith("Z"): + value = value.replace("Z", "+00:00") + if "T" in value and ("+" not in value.split("T", 1)[1] and "-" not in value.split("T", 1)[1]): + value = value + "+00:00" + return datetime.fromisoformat(value) + + +def parse_duration_to_timedelta(dur: str) -> timedelta: + """ + Parse duration in: + - ISO 8601: P1DT2H30M, PT1H, P3D, etc. + - Simple tokens: "2 hours", "3 days", "1 week", "5 minutes" + """ + s = dur.strip() + + # ISO 8601 duration (starts with P) + if s.upper().startswith("P"): + from dateutil import parser as dateutil_parser + return dateutil_parser.parse_duration(s) + + # Natural tokens: "2 hours", "3 days", etc. + tokens = s.lower().split() + td = timedelta() + for token in tokens: + if not token: + continue + num_part = "".join(ch for ch in token if ch.isdigit() or ch == ".") + unit_part = token.replace(num_part, "").strip() + if not num_part: + continue + num = float(num_part) + unit = unit_part.rstrip("s") # normalize singular + if unit in ("second", "sec"): + td += timedelta(seconds=num) + elif unit in ("minute", "min"): + td += timedelta(minutes=num) + elif unit in ("hour", "hr"): + td += timedelta(hours=num) + elif unit in ("day",): + td += timedelta(days=num) + elif unit in ("week",): + td += timedelta(weeks=num) + elif unit in ("month",): + # approximate: 30 days + td += timedelta(days=30 * num) + elif unit in ("year",): + # approximate: 365 days + td += timedelta(days=365 * num) + else: + raise ValueError(f"Unknown duration unit: {unit_part}") + return td + + +def safe_get_tz(tz_str: str) -> ZoneInfo: + try: + return ZoneInfo(tz_str) + except (ZoneInfoNotFoundError, KeyError, Exception): + raise ValueError(f"Unknown timezone: {tz_str}") + + +def parse_natural_language_time( + expression: str, + ref: datetime, + tz: timezone +) -> datetime: + """ + Interpret common natural language time expressions. + """ + expr = expression.strip().lower() + now = ref.astimezone(tz) + + # Direct words + if expr in ("now", "current time", "today"): + return now + + if expr in ("yesterday",): + return now - timedelta(days=1) + + if expr in ("tomorrow",): + return now + timedelta(days=1) + + # "in X days/hours/minutes/weeks" + import re + m = re.match( + r"^(?:in\s+)?(\d+(?:\.\d+)?)\s*(day|days|hour|hours|minute|minutes|week|weeks|month|months|year|years)\s*$", + expr, + ) + if m: + val = float(m.group(1)) + unit = m.group(2).rstrip("s") + if unit in ("day",): + return now + timedelta(days=val) + if unit in ("hour",): + return now + timedelta(hours=val) + if unit in ("minute",): + return now + timedelta(minutes=val) + if unit in ("week",): + return now + timedelta(weeks=val) + if unit in ("month",): + # approximate + return now + timedelta(days=30 * val) + if unit in ("year",): + # approximate + return now + timedelta(days=365 * val) + + # "last week" + if expr in ("last week",): + return now - timedelta(weeks=1) + + # "next week" + if expr in ("next week",): + return now + timedelta(weeks=1) + + # "next month" + if expr in ("next month",): + month = now.month + 1 + year = now.year + if month > 12: + month = 1 + year += 1 + try: + return now.replace(year=year, month=month) + except ValueError: + # fallback: 30 days + return now + timedelta(days=30) + + # "last month" + if expr in ("last month",): + month = now.month - 1 + year = now.year + if month < 1: + month = 12 + year -= 1 + try: + return now.replace(year=year, month=month) + except ValueError: + return now - timedelta(days=30) + + # "end of day" + if expr in ("end of day", "end of today"): + return now.replace(hour=23, minute=59, second=59, microsecond=999999) + + # "start of day" + if expr in ("start of day", "start of today"): + return now.replace(hour=0, minute=0, second=0, microsecond=0) + + # Fallback: try to parse as ISO + try: + dt = parse_iso_datetime(expression) + return dt.astimezone(tz) + except Exception: + raise ValueError(f"Could not parse natural language time expression: {expression}") + + +def format_datetime(dt: datetime, fmt: str) -> str: + fmt = fmt.strip().lower() + + if fmt == "iso": + return dt.isoformat() + if fmt == "iso-compact": + return dt.isoformat(timespec="seconds").replace(":", "") + if fmt == "rfc822": + return dt.strftime("%a, %d %b %Y %H:%M:%S %z") + if fmt == "readable": + return dt.strftime("%Y-%m-%d %H:%M:%S %Z") + + # treat as strftime pattern + return dt.strftime(fmt) + + +# ---------- MCP HTTP handler ---------- + +class MCPTimeToolsHandler(BaseHTTPRequestHandler): + def log_message(self, format, *args): + # Quiet logs by default + pass + + def _send_json(self, status: int, payload: Any): + body = json.dumps(payload, ensure_ascii=False).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self): + # Root: simple info + if self.path == "/": + self._send_json(200, { + "service": "mcp-time-tools", + "transport": "http", + "docs": "Use POST /mcp with MCP JSON-RPC requests." + }) + return + + # MCP discover tools via GET /mcp + if self.path == "/mcp": + self._send_json(200, { + "jsonrpc": "2.0", + "tools": TOOL_SCHEMA["tools"] + }) + return + + self.send_error(404, "Not Found") + + def do_POST(self): + if self.path != "/mcp": + self.send_error(404, "Not Found") + return + + length = int(self.headers.get("Content-Length", 0)) + if length == 0: + self._send_json(400, {"error": "Empty body"}) + return + + raw = self.rfile.read(length) + try: + req = json.loads(raw) + except json.JSONDecodeError: + self._send_json(400, {"error": "Invalid JSON"}) + return + + method = req.get("method") + params = req.get("params") or {} + req_id = req.get("id") + + # MCP tool calling pattern: tools/call with name + arguments + if method == "tools/call": + tool_name = params.get("name") + tool_args = params.get("arguments") or {} + try: + result = self._call_tool(tool_name, tool_args) + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": { + "content": [ + {"type": "text", "text": json.dumps(result, ensure_ascii=False)} + ] + } + }) + except Exception as e: + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "error": { + "code": -32000, + "message": str(e) + } + }) + return + + # If client asks for tools list via JSON-RPC + if method == "tools/list": + self._send_json(200, { + "jsonrpc": "2.0", + "id": req_id, + "result": TOOL_SCHEMA + }) + return + + # Fallback + self._send_json(400, {"error": "Unknown method: " + str(method)}) + + def _call_tool(self, name: str, args: Dict[str, Any]) -> Any: + if name == "get_current_time": + tz_str = args.get("timezone") or "UTC" + tz = safe_get_tz(tz_str) + now = datetime.now(tz) + return { + "timezone": tz_str, + "iso": now.isoformat(), + "readable": now.strftime("%Y-%m-%d %H:%M:%S %Z") + } + + if name == "convert_timezone": + dt = parse_iso_datetime(args["datetime"]) + to_tz = safe_get_tz(args["to_timezone"]) + # If from_timezone provided, interpret dt in that tz + if "from_timezone" in args: + from_tz = safe_get_tz(args["from_timezone"]) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=from_tz) + else: + dt = dt.astimezone(from_tz) + else: + # If no tz, assume UTC + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + + converted = dt.astimezone(to_tz) + return { + "original": dt.isoformat(), + "converted": converted.isoformat(), + "to_timezone": args["to_timezone"] + } + + if name == "add_time": + dt = parse_iso_datetime(args["datetime"]) + td = parse_duration_to_timedelta(args["duration"]) + result = dt + td + return { + "original": dt.isoformat(), + "duration": args["duration"], + "result": result.isoformat() + } + + if name == "subtract_time": + dt = parse_iso_datetime(args["datetime"]) + td = parse_duration_to_timedelta(args["duration"]) + result = dt - td + return { + "original": dt.isoformat(), + "duration": args["duration"], + "result": result.isoformat() + } + + if name == "parse_natural_language": + ref_str = args.get("reference_time") + tz_str = args.get("timezone") or "UTC" + tz = safe_get_tz(tz_str) + ref = parse_iso_datetime(ref_str) if ref_str else datetime.now(tz) + parsed = parse_natural_language_time(args["expression"], ref, tz) + return { + "expression": args["expression"], + "reference_time": ref.isoformat(), + "parsed": parsed.isoformat(), + "timezone": tz_str + } + + if name == "compare_times": + t1 = parse_iso_datetime(args["time1"]) + t2 = parse_iso_datetime(args["time2"]) + diff = t2 - t1 + total_seconds = diff.total_seconds() + days, rem = divmod(abs(int(total_seconds)), 86400) + hours, rem = divmod(rem, 3600) + minutes, seconds = divmod(rem, 60) + + if total_seconds > 0: + relation = "time2 is after time1" + elif total_seconds < 0: + relation = "time2 is before time1" + else: + relation = "times are equal" + + return { + "time1": t1.isoformat(), + "time2": t2.isoformat(), + "relation": relation, + "difference": { + "total_seconds": total_seconds, + "days": days, + "hours": hours, + "minutes": minutes, + "seconds": seconds + } + } + + if name == "format_time": + dt = parse_iso_datetime(args["datetime"]) + fmt = args["format"] + formatted = format_datetime(dt, fmt) + return { + "datetime": dt.isoformat(), + "format": fmt, + "formatted": formatted + } + + raise ValueError(f"Unknown tool: {name}") + + +def main(): + # Allow optional port override via environment or argument + port = int(sys.argv[1]) if len(sys.argv) > 1 else int(__import__("os").environ.get("PORT", "8080")) + server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler) + print(f"MCP Time Tools HTTP server listening on 0.0.0.0:{port}") + try: + server.serve_forever() + except KeyboardInterrupt: + print("\nShutting down...") + server.server_close() + + +if __name__ == "__main__": + main()