Add mcp_time_server.py

This commit is contained in:
2026-06-12 03:13:27 +00:00
parent 392d06023f
commit ebf31e67d3
+553
View File
@@ -0,0 +1,553 @@
#!/usr/bin/env python3
"""
MCP Time Tools Server (HTTP transport)
Tools:
- get_current_time
- convert_timezone
- add_time
- subtract_time
- parse_natural_language
- compare_times
- format_time
"""
import json
import sys
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, List, Optional
# ---------- MCP helpers ----------
TOOL_SCHEMA: Dict[str, Any] = {
"tools": [
{
"name": "get_current_time",
"description": "Get the current date and time, optionally in a specific timezone.",
"inputSchema": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "Timezone (e.g. 'UTC', 'America/New_York'). Defaults to UTC."
}
},
"required": []
}
},
{
"name": "convert_timezone",
"description": "Convert a datetime from one timezone to another.",
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime (e.g. '2025-01-01T12:00:00+00:00')."
},
"from_timezone": {
"type": "string",
"description": "Source timezone (e.g. 'UTC', 'America/New_York'). Optional."
},
"to_timezone": {
"type": "string",
"description": "Target timezone (e.g. 'Asia/Tokyo')."
}
},
"required": ["datetime", "to_timezone"]
}
},
{
"name": "add_time",
"description": "Add a duration to a datetime. Duration in ISO 8601 (e.g. 'P1DT2H30M') or simple tokens like '2 hours', '3 days'."
,
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime."
},
"duration": {
"type": "string",
"description": "Duration to add, e.g. 'P1DT2H30M', '2 hours', '3 days'."
}
},
"required": ["datetime", "duration"]
}
},
{
"name": "subtract_time",
"description": "Subtract a duration from a datetime.",
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime."
},
"duration": {
"type": "string",
"description": "Duration to subtract, e.g. 'P1DT2H30M', '2 hours', '3 days'."
}
},
"required": ["datetime", "duration"]
}
},
{
"name": "parse_natural_language",
"description": "Parse natural language time expressions like 'yesterday', 'next month', 'in 3 days' into ISO 8601."
,
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Natural language time expression, e.g. 'yesterday', 'next month', 'in 2 days'."
},
"reference_time": {
"type": "string",
"description": "Optional ISO 8601 reference time. Defaults to now."
},
"timezone": {
"type": "string",
"description": "Timezone for the result (e.g. 'UTC', 'America/New_York'). Defaults to UTC."
}
},
"required": ["expression"]
}
},
{
"name": "compare_times",
"description": "Compare two ISO 8601 datetimes and return their difference.",
"inputSchema": {
"type": "object",
"properties": {
"time1": {
"type": "string",
"description": "First ISO 8601 datetime."
},
"time2": {
"type": "string",
"description": "Second ISO 8601 datetime."
}
},
"required": ["time1", "time2"]
}
},
{
"name": "format_time",
"description": "Format a datetime using a predefined or custom format pattern.",
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime."
},
"format": {
"type": "string",
"description": "Format: 'iso', 'iso-compact', 'rfc822', 'readable', or a Python strftime pattern."
}
},
"required": ["datetime", "format"]
}
}
]
}
def parse_iso_datetime(value: str) -> datetime:
"""
Parse ISO 8601-ish datetime strings.
"""
value = value.strip()
# If no timezone info, treat as UTC
if not any(ch in value for ch in ("+", "-", "Z")) or value.endswith("Z"):
value = value.replace("Z", "+00:00")
if "T" in value and ("+" not in value.split("T", 1)[1] and "-" not in value.split("T", 1)[1]):
value = value + "+00:00"
return datetime.fromisoformat(value)
def parse_duration_to_timedelta(dur: str) -> timedelta:
"""
Parse duration in:
- ISO 8601: P1DT2H30M, PT1H, P3D, etc.
- Simple tokens: "2 hours", "3 days", "1 week", "5 minutes"
"""
s = dur.strip()
# ISO 8601 duration (starts with P)
if s.upper().startswith("P"):
from dateutil import parser as dateutil_parser
return dateutil_parser.parse_duration(s)
# Natural tokens: "2 hours", "3 days", etc.
tokens = s.lower().split()
td = timedelta()
for token in tokens:
if not token:
continue
num_part = "".join(ch for ch in token if ch.isdigit() or ch == ".")
unit_part = token.replace(num_part, "").strip()
if not num_part:
continue
num = float(num_part)
unit = unit_part.rstrip("s") # normalize singular
if unit in ("second", "sec"):
td += timedelta(seconds=num)
elif unit in ("minute", "min"):
td += timedelta(minutes=num)
elif unit in ("hour", "hr"):
td += timedelta(hours=num)
elif unit in ("day",):
td += timedelta(days=num)
elif unit in ("week",):
td += timedelta(weeks=num)
elif unit in ("month",):
# approximate: 30 days
td += timedelta(days=30 * num)
elif unit in ("year",):
# approximate: 365 days
td += timedelta(days=365 * num)
else:
raise ValueError(f"Unknown duration unit: {unit_part}")
return td
def safe_get_tz(tz_str: str) -> ZoneInfo:
try:
return ZoneInfo(tz_str)
except (ZoneInfoNotFoundError, KeyError, Exception):
raise ValueError(f"Unknown timezone: {tz_str}")
def parse_natural_language_time(
expression: str,
ref: datetime,
tz: timezone
) -> datetime:
"""
Interpret common natural language time expressions.
"""
expr = expression.strip().lower()
now = ref.astimezone(tz)
# Direct words
if expr in ("now", "current time", "today"):
return now
if expr in ("yesterday",):
return now - timedelta(days=1)
if expr in ("tomorrow",):
return now + timedelta(days=1)
# "in X days/hours/minutes/weeks"
import re
m = re.match(
r"^(?:in\s+)?(\d+(?:\.\d+)?)\s*(day|days|hour|hours|minute|minutes|week|weeks|month|months|year|years)\s*$",
expr,
)
if m:
val = float(m.group(1))
unit = m.group(2).rstrip("s")
if unit in ("day",):
return now + timedelta(days=val)
if unit in ("hour",):
return now + timedelta(hours=val)
if unit in ("minute",):
return now + timedelta(minutes=val)
if unit in ("week",):
return now + timedelta(weeks=val)
if unit in ("month",):
# approximate
return now + timedelta(days=30 * val)
if unit in ("year",):
# approximate
return now + timedelta(days=365 * val)
# "last week"
if expr in ("last week",):
return now - timedelta(weeks=1)
# "next week"
if expr in ("next week",):
return now + timedelta(weeks=1)
# "next month"
if expr in ("next month",):
month = now.month + 1
year = now.year
if month > 12:
month = 1
year += 1
try:
return now.replace(year=year, month=month)
except ValueError:
# fallback: 30 days
return now + timedelta(days=30)
# "last month"
if expr in ("last month",):
month = now.month - 1
year = now.year
if month < 1:
month = 12
year -= 1
try:
return now.replace(year=year, month=month)
except ValueError:
return now - timedelta(days=30)
# "end of day"
if expr in ("end of day", "end of today"):
return now.replace(hour=23, minute=59, second=59, microsecond=999999)
# "start of day"
if expr in ("start of day", "start of today"):
return now.replace(hour=0, minute=0, second=0, microsecond=0)
# Fallback: try to parse as ISO
try:
dt = parse_iso_datetime(expression)
return dt.astimezone(tz)
except Exception:
raise ValueError(f"Could not parse natural language time expression: {expression}")
def format_datetime(dt: datetime, fmt: str) -> str:
fmt = fmt.strip().lower()
if fmt == "iso":
return dt.isoformat()
if fmt == "iso-compact":
return dt.isoformat(timespec="seconds").replace(":", "")
if fmt == "rfc822":
return dt.strftime("%a, %d %b %Y %H:%M:%S %z")
if fmt == "readable":
return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
# treat as strftime pattern
return dt.strftime(fmt)
# ---------- MCP HTTP handler ----------
class MCPTimeToolsHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
# Quiet logs by default
pass
def _send_json(self, status: int, payload: Any):
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self):
# Root: simple info
if self.path == "/":
self._send_json(200, {
"service": "mcp-time-tools",
"transport": "http",
"docs": "Use POST /mcp with MCP JSON-RPC requests."
})
return
# MCP discover tools via GET /mcp
if self.path == "/mcp":
self._send_json(200, {
"jsonrpc": "2.0",
"tools": TOOL_SCHEMA["tools"]
})
return
self.send_error(404, "Not Found")
def do_POST(self):
if self.path != "/mcp":
self.send_error(404, "Not Found")
return
length = int(self.headers.get("Content-Length", 0))
if length == 0:
self._send_json(400, {"error": "Empty body"})
return
raw = self.rfile.read(length)
try:
req = json.loads(raw)
except json.JSONDecodeError:
self._send_json(400, {"error": "Invalid JSON"})
return
method = req.get("method")
params = req.get("params") or {}
req_id = req.get("id")
# MCP tool calling pattern: tools/call with name + arguments
if method == "tools/call":
tool_name = params.get("name")
tool_args = params.get("arguments") or {}
try:
result = self._call_tool(tool_name, tool_args)
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{"type": "text", "text": json.dumps(result, ensure_ascii=False)}
]
}
})
except Exception as e:
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32000,
"message": str(e)
}
})
return
# If client asks for tools list via JSON-RPC
if method == "tools/list":
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": TOOL_SCHEMA
})
return
# Fallback
self._send_json(400, {"error": "Unknown method: " + str(method)})
def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
if name == "get_current_time":
tz_str = args.get("timezone") or "UTC"
tz = safe_get_tz(tz_str)
now = datetime.now(tz)
return {
"timezone": tz_str,
"iso": now.isoformat(),
"readable": now.strftime("%Y-%m-%d %H:%M:%S %Z")
}
if name == "convert_timezone":
dt = parse_iso_datetime(args["datetime"])
to_tz = safe_get_tz(args["to_timezone"])
# If from_timezone provided, interpret dt in that tz
if "from_timezone" in args:
from_tz = safe_get_tz(args["from_timezone"])
if dt.tzinfo is None:
dt = dt.replace(tzinfo=from_tz)
else:
dt = dt.astimezone(from_tz)
else:
# If no tz, assume UTC
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
converted = dt.astimezone(to_tz)
return {
"original": dt.isoformat(),
"converted": converted.isoformat(),
"to_timezone": args["to_timezone"]
}
if name == "add_time":
dt = parse_iso_datetime(args["datetime"])
td = parse_duration_to_timedelta(args["duration"])
result = dt + td
return {
"original": dt.isoformat(),
"duration": args["duration"],
"result": result.isoformat()
}
if name == "subtract_time":
dt = parse_iso_datetime(args["datetime"])
td = parse_duration_to_timedelta(args["duration"])
result = dt - td
return {
"original": dt.isoformat(),
"duration": args["duration"],
"result": result.isoformat()
}
if name == "parse_natural_language":
ref_str = args.get("reference_time")
tz_str = args.get("timezone") or "UTC"
tz = safe_get_tz(tz_str)
ref = parse_iso_datetime(ref_str) if ref_str else datetime.now(tz)
parsed = parse_natural_language_time(args["expression"], ref, tz)
return {
"expression": args["expression"],
"reference_time": ref.isoformat(),
"parsed": parsed.isoformat(),
"timezone": tz_str
}
if name == "compare_times":
t1 = parse_iso_datetime(args["time1"])
t2 = parse_iso_datetime(args["time2"])
diff = t2 - t1
total_seconds = diff.total_seconds()
days, rem = divmod(abs(int(total_seconds)), 86400)
hours, rem = divmod(rem, 3600)
minutes, seconds = divmod(rem, 60)
if total_seconds > 0:
relation = "time2 is after time1"
elif total_seconds < 0:
relation = "time2 is before time1"
else:
relation = "times are equal"
return {
"time1": t1.isoformat(),
"time2": t2.isoformat(),
"relation": relation,
"difference": {
"total_seconds": total_seconds,
"days": days,
"hours": hours,
"minutes": minutes,
"seconds": seconds
}
}
if name == "format_time":
dt = parse_iso_datetime(args["datetime"])
fmt = args["format"]
formatted = format_datetime(dt, fmt)
return {
"datetime": dt.isoformat(),
"format": fmt,
"formatted": formatted
}
raise ValueError(f"Unknown tool: {name}")
def main():
# Allow optional port override via environment or argument
port = int(sys.argv[1]) if len(sys.argv) > 1 else int(__import__("os").environ.get("PORT", "8080"))
server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler)
print(f"MCP Time Tools HTTP server listening on 0.0.0.0:{port}")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nShutting down...")
server.server_close()
if __name__ == "__main__":
main()