from datetime import datetime
from enum import Enum
import json
from typing import Sequence
from zoneinfo import ZoneInfo
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from mcp.shared.exceptions import McpError
from pydantic import BaseModel
class TimeTools(str, Enum):
GET_CURRENT_TIME = "get_current_time"
class TimeResult(BaseModel):
timezone: str
datetime: str
is_dst: bool
def get_local_tz(local_tz_override: str | None = None) -> ZoneInfo:
if local_tz_override:
return ZoneInfo(local_tz_override)
# Get local timezone from datetime.now()
tzinfo = datetime.now().astimezone(tz=None).tzinfo
if tzinfo is not None:
return ZoneInfo(str(tzinfo))
raise McpError("Could not determine local timezone - tzinfo is None")
def get_zoneinfo(timezone_name: str) -> ZoneInfo:
try:
return ZoneInfo(timezone_name)
except Exception as e:
raise McpError(f"Invalid timezone: {str(e)}")
class TimeServer:
def get_current_time(self, timezone_name: str) -> TimeResult:
"""Get current time in specified timezone"""
timezone = get_zoneinfo(timezone_name)
current_time = datetime.now(timezone)
return TimeResult(
timezone=timezone_name,
datetime=current_time.isoformat(timespec="seconds"),
is_dst=bool(current_time.dst()),
)
async def serve(local_timezone: str | None = None) -> None:
server = Server("mcp-time")
time_server = TimeServer()
local_tz = str(get_local_tz(local_timezone))
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List available time tools."""
return [
Tool(
name=TimeTools.GET_CURRENT_TIME.value,
description="Get current time in a specific timezones",
inputSchema={
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": f"IANA timezone name (e.g., 'America/New_York', 'Europe/London'). Use '{local_tz}' as local timezone if no timezone provided by the user.",
}
},
"required": ["timezone"],
},
),
]
@server.call_tool()
async def call_tool(
name: str, arguments: dict
) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
"""Handle tool calls for time queries."""
try:
match name:
case TimeTools.GET_CURRENT_TIME.value:
timezone = arguments.get("timezone")
if not timezone:
raise ValueError("Missing required argument: timezone")
result = time_server.get_current_time(timezone)
case _:
raise ValueError(f"Unknown tool: {name}")
return [
TextContent(type="text", text=json.dumps(result.model_dump(), indent=2))
]
except Exception as e:
raise ValueError(f"Error processing mcp-server-time query: {str(e)}")
options = server.create_initialization_options()
async with stdio_server() as (read_stream, write_stream):
await server.run(read_stream, write_stream, options)