Files
mcp-time/mcp_time_server.py
T
2026-06-12 03:49:03 +00:00

597 lines
19 KiB
Python

#!/usr/bin/env python3
"""
MCP Time Tools Server (HTTP transport)
Tools:
- get_current_time
- convert_timezone
- add_time
- subtract_time
- parse_natural_language
- compare_times
- format_time
API Key Auth:
- Set API_KEY environment variable to enable auth.
- Send as:
- Authorization: Bearer <API_KEY>
- or X-API-Key: <API_KEY>
- If API_KEY is not set, the server runs without auth (for dev).
"""
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Any, Dict, List, Optional
API_KEY = os.environ.get("API_KEY", "").strip()
TOOL_SCHEMA: Dict[str, Any] = {
"tools": [
{
"name": "get_current_time",
"description": "Get the current date and time, optionally in a specific timezone.",
"inputSchema": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "Timezone (e.g. 'UTC', 'America/New_York'). Defaults to UTC."
}
},
"required": []
}
},
{
"name": "convert_timezone",
"description": "Convert a datetime from one timezone to another.",
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime (e.g. '2025-01-01T12:00:00+00:00')."
},
"from_timezone": {
"type": "string",
"description": "Source timezone (e.g. 'UTC', 'America/New_York'). Optional."
},
"to_timezone": {
"type": "string",
"description": "Target timezone (e.g. 'Asia/Tokyo')."
}
},
"required": ["datetime", "to_timezone"]
}
},
{
"name": "add_time",
"description": "Add a duration to a datetime. Duration in ISO 8601 (e.g. 'P1DT2H30M') or simple tokens like '2 hours', '3 days'."
,
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime."
},
"duration": {
"type": "string",
"description": "Duration to add, e.g. 'P1DT2H30M', '2 hours', '3 days'."
}
},
"required": ["datetime", "duration"]
}
},
{
"name": "subtract_time",
"description": "Subtract a duration from a datetime.",
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime."
},
"duration": {
"type": "string",
"description": "Duration to subtract, e.g. 'P1DT2H30M', '2 hours', '3 days'."
}
},
"required": ["datetime", "duration"]
}
},
{
"name": "parse_natural_language",
"description": "Parse natural language time expressions like 'yesterday', 'next month', 'in 3 days' into ISO 8601."
,
"inputSchema": {
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "Natural language time expression, e.g. 'yesterday', 'next month', 'in 2 days'."
},
"reference_time": {
"type": "string",
"description": "Optional ISO 8601 reference time. Defaults to now."
},
"timezone": {
"type": "string",
"description": "Timezone for the result (e.g. 'UTC', 'America/New_York'). Defaults to UTC."
}
},
"required": ["expression"]
}
},
{
"name": "compare_times",
"description": "Compare two ISO 8601 datetimes and return their difference.",
"inputSchema": {
"type": "object",
"properties": {
"time1": {
"type": "string",
"description": "First ISO 8601 datetime."
},
"time2": {
"type": "string",
"description": "Second ISO 8601 datetime."
}
},
"required": ["time1", "time2"]
}
},
{
"name": "format_time",
"description": "Format a datetime using a predefined or custom format pattern.",
"inputSchema": {
"type": "object",
"properties": {
"datetime": {
"type": "string",
"description": "ISO 8601 datetime."
},
"format": {
"type": "string",
"description": "Format: 'iso', 'iso-compact', 'rfc822', 'readable', or a Python strftime pattern."
}
},
"required": ["datetime", "format"]
}
}
]
}
def parse_iso_datetime(value: str) -> datetime:
value = value.strip()
# If no timezone info, treat as UTC
if not any(ch in value for ch in ("+", "-", "Z")) or value.endswith("Z"):
value = value.replace("Z", "+00:00")
if "T" in value and ("+" not in value.split("T", 1)[1] and "-" not in value.split("T", 1)[1]):
value = value + "+00:00"
return datetime.fromisoformat(value)
def parse_duration_to_timedelta(dur: str) -> timedelta:
s = dur.strip()
# ISO 8601 duration (starts with P)
if s.upper().startswith("P"):
from dateutil import parser as dateutil_parser
return dateutil_parser.parse_duration(s)
# Natural tokens: "2 hours", "3 days", etc.
tokens = s.lower().split()
td = timedelta()
for token in tokens:
if not token:
continue
num_part = "".join(ch for ch in token if ch.isdigit() or ch == ".")
unit_part = token.replace(num_part, "").strip()
if not num_part:
continue
num = float(num_part)
unit = unit_part.rstrip("s") # normalize singular
if unit in ("second", "sec"):
td += timedelta(seconds=num)
elif unit in ("minute", "min"):
td += timedelta(minutes=num)
elif unit in ("hour", "hr"):
td += timedelta(hours=num)
elif unit in ("day",):
td += timedelta(days=num)
elif unit in ("week",):
td += timedelta(weeks=num)
elif unit in ("month",):
# approximate: 30 days
td += timedelta(days=30 * num)
elif unit in ("year",):
# approximate: 365 days
td += timedelta(days=365 * num)
else:
raise ValueError(f"Unknown duration unit: {unit_part}")
return td
def safe_get_tz(tz_str: str) -> ZoneInfo:
try:
return ZoneInfo(tz_str)
except (ZoneInfoNotFoundError, KeyError, Exception):
raise ValueError(f"Unknown timezone: {tz_str}")
def parse_natural_language_time(
expression: str,
ref: datetime,
tz: timezone
) -> datetime:
expr = expression.strip().lower()
now = ref.astimezone(tz)
# Direct words
if expr in ("now", "current time", "today"):
return now
if expr in ("yesterday",):
return now - timedelta(days=1)
if expr in ("tomorrow",):
return now + timedelta(days=1)
# "in X days/hours/minutes/weeks"
import re
m = re.match(
r"^(?:in\s+)?(\d+(?:\.\d+)?)\s*(day|days|hour|hours|minute|minutes|week|weeks|month|months|year|years)\s*$",
expr,
)
if m:
val = float(m.group(1))
unit = m.group(2).rstrip("s")
if unit in ("day",):
return now + timedelta(days=val)
if unit in ("hour",):
return now + timedelta(hours=val)
if unit in ("minute",):
return now + timedelta(minutes=val)
if unit in ("week",):
return now + timedelta(weeks=val)
if unit in ("month",):
# approximate
return now + timedelta(days=30 * val)
if unit in ("year",):
# approximate
return now + timedelta(days=365 * val)
# "last week"
if expr in ("last week",):
return now - timedelta(weeks=1)
# "next week"
if expr in ("next week",):
return now + timedelta(weeks=1)
# "next month"
if expr in ("next month",):
month = now.month + 1
year = now.year
if month > 12:
month = 1
year += 1
try:
return now.replace(year=year, month=month)
except ValueError:
# fallback: 30 days
return now + timedelta(days=30)
# "last month"
if expr in ("last month",):
month = now.month - 1
year = now.year
if month < 1:
month = 12
year -= 1
try:
return now.replace(year=year, month=month)
except ValueError:
return now - timedelta(days=30)
# "end of day"
if expr in ("end of day", "end of today"):
return now.replace(hour=23, minute=59, second=59, microsecond=999999)
# "start of day"
if expr in ("start of day", "start of today"):
return now.replace(hour=0, minute=0, second=0, microsecond=0)
# Fallback: try to parse as ISO
try:
dt = parse_iso_datetime(expression)
return dt.astimezone(tz)
except Exception:
raise ValueError(f"Could not parse natural language time expression: {expression}")
def format_datetime(dt: datetime, fmt: str) -> str:
fmt = fmt.strip().lower()
if fmt == "iso":
return dt.isoformat()
if fmt == "iso-compact":
return dt.isoformat(timespec="seconds").replace(":", "")
if fmt == "rfc822":
return dt.strftime("%a, %d %b %Y %H:%M:%S %z")
if fmt == "readable":
return dt.strftime("%Y-%m-%d %H:%M:%S %Z")
# treat as strftime pattern
return dt.strftime(fmt)
def get_api_key_from_request(headers: Any) -> Optional[str]:
"""
Extract API key from:
- Authorization: Bearer <key>
- X-API-Key: <key>
"""
auth = (headers.get("Authorization") or "").strip()
if auth.startswith("Bearer "):
return auth[len("Bearer "):].strip()
x_key = (headers.get("X-API-Key") or "").strip()
if x_key:
return x_key
return None
def require_auth(headers: Any) -> bool:
"""
If API_KEY is set, the request must provide a matching key.
Returns True if allowed, raises HTTPError if not.
"""
if not API_KEY:
return True
key = get_api_key_from_request(headers)
if not key or key != API_KEY:
raise PermissionError("Missing or invalid API key")
return True
# ---------- MCP HTTP handler ----------
class MCPTimeToolsHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
# Quiet logs by default
pass
def _send_json(self, status: int, payload: Any):
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _auth_or_401(self):
try:
return require_auth(self.headers)
except PermissionError:
self._send_json(401, {"error": "Missing or invalid API key"})
return False
def do_GET(self):
# Root: simple info (no auth required)
if self.path == "/":
self._send_json(200, {
"service": "mcp-time-tools",
"transport": "http",
"auth": "API key required if API_KEY is set (Authorization: Bearer <key> or X-API-Key: <key>)",
"docs": "Use POST /mcp with MCP JSON-RPC requests."
})
return
# MCP discover tools via GET /mcp (auth-gated if API_KEY is set)
if self.path == "/mcp":
if not self._auth_or_401():
return
self._send_json(200, {
"jsonrpc": "2.0",
"tools": TOOL_SCHEMA["tools"]
})
return
self.send_error(404, "Not Found")
def do_POST(self):
if self.path != "/mcp":
self.send_error(404, "Not Found")
return
if not self._auth_or_401():
return
length = int(self.headers.get("Content-Length", 0))
if length == 0:
self._send_json(400, {"error": "Empty body"})
return
raw = self.rfile.read(length)
try:
req = json.loads(raw)
except json.JSONDecodeError:
self._send_json(400, {"error": "Invalid JSON"})
return
method = req.get("method")
params = req.get("params") or {}
req_id = req.get("id")
# MCP tool calling pattern: tools/call with name + arguments
if method == "tools/call":
tool_name = params.get("name")
tool_args = params.get("arguments") or {}
try:
result = self._call_tool(tool_name, tool_args)
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{"type": "text", "text": json.dumps(result, ensure_ascii=False)}
]
}
})
except Exception as e:
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"error": {
"code": -32000,
"message": str(e)
}
})
return
# If client asks for tools list via JSON-RPC
if method == "tools/list":
self._send_json(200, {
"jsonrpc": "2.0",
"id": req_id,
"result": TOOL_SCHEMA
})
return
# Fallback
self._send_json(400, {"error": "Unknown method: " + str(method)})
def _call_tool(self, name: str, args: Dict[str, Any]) -> Any:
if name == "get_current_time":
tz_str = args.get("timezone") or "UTC"
tz = safe_get_tz(tz_str)
now = datetime.now(tz)
return {
"timezone": tz_str,
"iso": now.isoformat(),
"readable": now.strftime("%Y-%m-%d %H:%M:%S %Z")
}
if name == "convert_timezone":
dt = parse_iso_datetime(args["datetime"])
to_tz = safe_get_tz(args["to_timezone"])
# If from_timezone provided, interpret dt in that tz
if "from_timezone" in args:
from_tz = safe_get_tz(args["from_timezone"])
if dt.tzinfo is None:
dt = dt.replace(tzinfo=from_tz)
else:
dt = dt.astimezone(from_tz)
else:
# If no tz, assume UTC
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
converted = dt.astimezone(to_tz)
return {
"original": dt.isoformat(),
"converted": converted.isoformat(),
"to_timezone": args["to_timezone"]
}
if name == "add_time":
dt = parse_iso_datetime(args["datetime"])
td = parse_duration_to_timedelta(args["duration"])
result = dt + td
return {
"original": dt.isoformat(),
"duration": args["duration"],
"result": result.isoformat()
}
if name == "subtract_time":
dt = parse_iso_datetime(args["datetime"])
td = parse_duration_to_timedelta(args["duration"])
result = dt - td
return {
"original": dt.isoformat(),
"duration": args["duration"],
"result": result.isoformat()
}
if name == "parse_natural_language":
ref_str = args.get("reference_time")
tz_str = args.get("timezone") or "UTC"
tz = safe_get_tz(tz_str)
ref = parse_iso_datetime(ref_str) if ref_str else datetime.now(tz)
parsed = parse_natural_language_time(args["expression"], ref, tz)
return {
"expression": args["expression"],
"reference_time": ref.isoformat(),
"parsed": parsed.isoformat(),
"timezone": tz_str
}
if name == "compare_times":
t1 = parse_iso_datetime(args["time1"])
t2 = parse_iso_datetime(args["time2"])
diff = t2 - t1
total_seconds = diff.total_seconds()
days, rem = divmod(abs(int(total_seconds)), 86400)
hours, rem = divmod(rem, 3600)
minutes, seconds = divmod(rem, 60)
if total_seconds > 0:
relation = "time2 is after time1"
elif total_seconds < 0:
relation = "time2 is before time1"
else:
relation = "times are equal"
return {
"time1": t1.isoformat(),
"time2": t2.isoformat(),
"relation": relation,
"difference": {
"total_seconds": total_seconds,
"days": days,
"hours": hours,
"minutes": minutes,
"seconds": seconds
}
}
if name == "format_time":
dt = parse_iso_datetime(args["datetime"])
fmt = args["format"]
formatted = format_datetime(dt, fmt)
return {
"datetime": dt.isoformat(),
"format": fmt,
"formatted": formatted
}
raise ValueError(f"Unknown tool: {name}")
def main():
# Allow optional port override via environment or argument
port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080"))
server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler)
mode = "auth enabled" if API_KEY else "no auth (API_KEY not set)"
print(f"MCP Time Tools HTTP server listening on 0.0.0.0:{port} [{mode}]")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nShutting down...")
server.server_close()
if __name__ == "__main__":
main()