diff --git a/mcp_time_server.py b/mcp_time_server.py index 8423038..98ac183 100644 --- a/mcp_time_server.py +++ b/mcp_time_server.py @@ -10,16 +10,24 @@ Tools: - parse_natural_language - compare_times - format_time + +API Key Auth: +- Set API_KEY environment variable to enable auth. +- Send as: + - Authorization: Bearer + - or X-API-Key: +- If API_KEY is not set, the server runs without auth (for dev). """ import json +import os import sys from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from http.server import HTTPServer, BaseHTTPRequestHandler from typing import Any, Dict, List, Optional -# ---------- MCP helpers ---------- +API_KEY = os.environ.get("API_KEY", "").strip() TOOL_SCHEMA: Dict[str, Any] = { "tools": [ @@ -160,9 +168,6 @@ TOOL_SCHEMA: Dict[str, Any] = { def parse_iso_datetime(value: str) -> datetime: - """ - Parse ISO 8601-ish datetime strings. - """ value = value.strip() # If no timezone info, treat as UTC if not any(ch in value for ch in ("+", "-", "Z")) or value.endswith("Z"): @@ -173,11 +178,6 @@ def parse_iso_datetime(value: str) -> datetime: def parse_duration_to_timedelta(dur: str) -> timedelta: - """ - Parse duration in: - - ISO 8601: P1DT2H30M, PT1H, P3D, etc. - - Simple tokens: "2 hours", "3 days", "1 week", "5 minutes" - """ s = dur.strip() # ISO 8601 duration (starts with P) @@ -230,9 +230,6 @@ def parse_natural_language_time( ref: datetime, tz: timezone ) -> datetime: - """ - Interpret common natural language time expressions. - """ expr = expression.strip().lower() now = ref.astimezone(tz) @@ -335,6 +332,38 @@ def format_datetime(dt: datetime, fmt: str) -> str: return dt.strftime(fmt) +def get_api_key_from_request(headers: Any) -> Optional[str]: + """ + Extract API key from: + - Authorization: Bearer + - X-API-Key: + """ + auth = (headers.get("Authorization") or "").strip() + if auth.startswith("Bearer "): + return auth[len("Bearer "):].strip() + + x_key = (headers.get("X-API-Key") or "").strip() + if x_key: + return x_key + + return None + + +def require_auth(headers: Any) -> bool: + """ + If API_KEY is set, the request must provide a matching key. + Returns True if allowed, raises HTTPError if not. + """ + if not API_KEY: + return True + + key = get_api_key_from_request(headers) + if not key or key != API_KEY: + raise PermissionError("Missing or invalid API key") + + return True + + # ---------- MCP HTTP handler ---------- class MCPTimeToolsHandler(BaseHTTPRequestHandler): @@ -350,18 +379,28 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): self.end_headers() self.wfile.write(body) + def _auth_or_401(self): + try: + return require_auth(self.headers) + except PermissionError: + self._send_json(401, {"error": "Missing or invalid API key"}) + return False + def do_GET(self): - # Root: simple info + # Root: simple info (no auth required) if self.path == "/": self._send_json(200, { "service": "mcp-time-tools", "transport": "http", + "auth": "API key required if API_KEY is set (Authorization: Bearer or X-API-Key: )", "docs": "Use POST /mcp with MCP JSON-RPC requests." }) return - # MCP discover tools via GET /mcp + # MCP discover tools via GET /mcp (auth-gated if API_KEY is set) if self.path == "/mcp": + if not self._auth_or_401(): + return self._send_json(200, { "jsonrpc": "2.0", "tools": TOOL_SCHEMA["tools"] @@ -375,6 +414,9 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): self.send_error(404, "Not Found") return + if not self._auth_or_401(): + return + length = int(self.headers.get("Content-Length", 0)) if length == 0: self._send_json(400, {"error": "Empty body"}) @@ -539,9 +581,10 @@ class MCPTimeToolsHandler(BaseHTTPRequestHandler): def main(): # Allow optional port override via environment or argument - port = int(sys.argv[1]) if len(sys.argv) > 1 else int(__import__("os").environ.get("PORT", "8080")) + port = int(sys.argv[1]) if len(sys.argv) > 1 else int(os.environ.get("PORT", "8080")) server = HTTPServer(("0.0.0.0", port), MCPTimeToolsHandler) - print(f"MCP Time Tools HTTP server listening on 0.0.0.0:{port}") + mode = "auth enabled" if API_KEY else "no auth (API_KEY not set)" + print(f"MCP Time Tools HTTP server listening on 0.0.0.0:{port} [{mode}]") try: server.serve_forever() except KeyboardInterrupt: