Time Server
by clssck
- src
import asyncio
import json
import logging
from collections.abc import Sequence
from datetime import datetime, timedelta
from enum import Enum
from zoneinfo import ZoneInfo
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.shared.exceptions import McpError
from mcp.types import EmbeddedResource, ImageContent, TextContent, Tool
from mcp.types import ErrorData as McpErrorData
from pydantic import BaseModel
# Module-local name for the MCP error payload: adds no fields or behaviour of
# its own, it only lets the rest of this file say ``ErrorData`` directly.
class ErrorData(McpErrorData):
    """Local ErrorData class that inherits from mcp.types.ErrorData"""
class TimeTools(str, Enum):
    """Names of the tools this server exposes.

    Members are also ``str`` instances, so each one compares equal to its
    plain string value.
    """

    # Reports the current time in one timezone.
    GET_CURRENT_TIME = "get_current_time"
    # Converts a wall-clock time from one timezone to another.
    CONVERT_TIME = "convert_time"
# A single point in time as reported for one timezone.
class TimeResult(BaseModel):
    # Timezone name exactly as the caller supplied it.
    timezone: str
    # Timestamp string; this file fills it with isoformat(timespec="seconds").
    # The field name shadows the imported ``datetime`` class inside this class
    # body only.
    datetime: str
    # True when the zone reports a non-zero DST offset at that moment.
    is_dst: bool
# Outcome of converting one wall-clock time between two timezones.
class TimeConversionResult(BaseModel):
    # The input time, expressed in the source timezone.
    source: TimeResult
    # The same instant, expressed in the target timezone.
    target: TimeResult
    # Signed UTC-offset gap (target minus source) in hours, e.g. "+1.0h" or
    # "+5.75h".
    time_difference: str
# Input model for a one-to-many conversion request.
# NOTE(review): nothing in the visible part of this file constructs or reads
# this model - confirm whether it is still needed.
class TimeConversionInput(BaseModel):
    # Timezone name the given time is expressed in.
    source_tz: str
    # Time to convert (presumably "HH:MM" like the convert_time tool - verify).
    time: str
    # Timezone names to convert into.
    target_tz_list: list[str]
def get_local_tz(local_tz_override: str | None = None) -> ZoneInfo:
if local_tz_override:
return ZoneInfo(local_tz_override)
# Get local timezone from datetime.now()
tzinfo = datetime.now().astimezone(tz=None).tzinfo
if tzinfo is not None:
return ZoneInfo(str(tzinfo))
raise McpError(
error=ErrorData(
code=1001,
message="Could not determine local timezone - tzinfo is None",
),
)
def get_zoneinfo(timezone_name: str) -> ZoneInfo:
    """Look up ``timezone_name`` and return the matching ``ZoneInfo``.

    Raises:
        McpError: (code 1002) for any failure while constructing the zone;
            the original exception is attached as the cause.
    """
    try:
        zone = ZoneInfo(timezone_name)
    except Exception as exc:
        # Report every lookup failure uniformly as an MCP error.
        failure = ErrorData(code=1002, message=f"Invalid timezone: {exc!s}")
        raise McpError(error=failure) from exc
    return zone
class TimeServer:
    """Stateless time operations behind the MCP time tools."""

    @staticmethod
    def get_current_time(timezone_name: str) -> TimeResult:
        """Report the current time as observed in ``timezone_name``.

        The result echoes ``timezone_name`` unchanged, carries the timestamp
        in ISO format at second precision, and sets ``is_dst`` when the zone
        reports a non-zero DST offset right now.

        Raises:
            McpError: propagated from ``get_zoneinfo`` for an invalid name.
        """
        zone = get_zoneinfo(timezone_name)
        moment = datetime.now(zone)
        stamp = moment.isoformat(timespec="seconds")
        return TimeResult(
            timezone=timezone_name,
            datetime=stamp,
            is_dst=bool(moment.dst()),
        )

    @staticmethod
    def convert_time(
        source_tz: str,
        time_str: str,
        target_tz: str,
    ) -> TimeConversionResult:
        """Convert today's ``time_str`` from ``source_tz`` into ``target_tz``.

        Args:
            source_tz: Timezone name the given time is expressed in.
            time_str: Wall-clock time in 24-hour ``HH:MM`` form.
            target_tz: Timezone name to convert into.

        Returns:
            Both views of the instant plus the signed UTC-offset difference
            (target minus source) in hours, e.g. ``"+1.0h"`` or ``"+5.75h"``.

        Raises:
            McpError: propagated from ``get_zoneinfo`` for an invalid name.
            ValueError: when ``time_str`` does not parse as ``HH:MM``.
        """
        source_zone = get_zoneinfo(source_tz)
        target_zone = get_zoneinfo(target_tz)

        try:
            clock = datetime.strptime(time_str + " +0000", "%H:%M %z").time()
        except ValueError as exc:
            raise ValueError(
                "Invalid time format. Expected HH:MM [24-hour format]",
            ) from exc

        # Anchor the parsed clock time on today's date in the source zone.
        today = datetime.now(source_zone)
        source_moment = datetime(
            year=today.year,
            month=today.month,
            day=today.day,
            hour=clock.hour,
            minute=clock.minute,
            tzinfo=source_zone,
        )
        target_moment = source_moment.astimezone(target_zone)

        # A missing utcoffset() is treated as a zero offset.
        offset_gap = (target_moment.utcoffset() or timedelta()) - (
            source_moment.utcoffset() or timedelta()
        )
        gap_hours = offset_gap.total_seconds() / 3600

        if not gap_hours.is_integer():
            # Fractional gaps (e.g. Nepal's UTC+5:45): drop trailing zeros.
            gap_label = f"{gap_hours:+.2f}".rstrip("0").rstrip(".") + "h"
        else:
            gap_label = f"{gap_hours:+.1f}h"

        def as_result(zone_name: str, moment: datetime) -> TimeResult:
            return TimeResult(
                timezone=zone_name,
                datetime=moment.isoformat(timespec="seconds"),
                is_dst=bool(moment.dst()),
            )

        return TimeConversionResult(
            source=as_result(source_tz, source_moment),
            target=as_result(target_tz, target_moment),
            time_difference=gap_label,
        )
# Logger used by everything in this module.
logger = logging.getLogger(__name__)

# Keep the MCP low-level server's INFO-level request-type messages out of the
# output; only WARNING and above pass through.
_MCP_LOWLEVEL_LOGGER_NAME = "mcp.server.lowlevel.server"
logging.getLogger(_MCP_LOWLEVEL_LOGGER_NAME).setLevel(logging.WARNING)
def create_tools(local_tz: str) -> list[Tool]:
    """Build the tool definitions this server advertises.

    Args:
        local_tz: Timezone name inserted into the parameter descriptions as
            the suggested default when the user names no timezone.

    Returns:
        Two ``Tool`` objects, in order: get-current-time, then convert-time.
    """
    tz_hint = (
        "IANA timezone name (e.g., 'America/New_York', 'Europe/London'). "
        f"Use '{local_tz}' as local timezone if no timezone provided."
    )

    def string_field(description: str) -> dict[str, str]:
        # JSON-schema fragment for one string-typed property.
        return {"type": "string", "description": description}

    current_time_tool = Tool(
        name=TimeTools.GET_CURRENT_TIME.value,
        description="Get current time in a specific timezones",
        inputSchema={
            "type": "object",
            "properties": {
                "timezone": string_field(tz_hint),
            },
            "required": ["timezone"],
        },
    )

    convert_time_tool = Tool(
        name=TimeTools.CONVERT_TIME.value,
        description="Convert time between timezones",
        inputSchema={
            "type": "object",
            "properties": {
                "source_timezone": string_field(tz_hint),
                "time": string_field(
                    "Time to convert in 24-hour format (HH:MM)",
                ),
                "target_timezone": string_field(tz_hint),
            },
            "required": ["source_timezone", "time", "target_timezone"],
        },
    )

    return [current_time_tool, convert_time_tool]
async def handle_tool_call(
name: str,
arguments: dict[str, str],
time_server: TimeServer,
) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
"""Handle tool calls for time queries."""
def raise_error(error_msg: str) -> None:
raise ValueError(error_msg)
def raise_missing_arg(arg_name: str) -> None:
raise_error(f"Missing required argument: {arg_name}")
def raise_unknown_tool(tool_name: str) -> None:
raise_error(f"Unknown tool: {tool_name}")
try:
match name:
case TimeTools.GET_CURRENT_TIME.value:
timezone = arguments.get("timezone")
if not timezone:
raise_missing_arg("timezone")
assert isinstance(timezone, str) # Type narrowing for mypy
result = time_server.get_current_time(timezone)
case TimeTools.CONVERT_TIME.value:
required_args = ["source_timezone", "time", "target_timezone"]
if not all(k in arguments for k in required_args):
raise_missing_arg(", ".join(required_args))
result = time_server.convert_time(
arguments["source_timezone"],
arguments["time"],
arguments["target_timezone"],
)
case _:
raise_unknown_tool(name)
return [
TextContent(
type="text",
text=json.dumps(result.model_dump(), indent=2),
),
]
except Exception as e:
error_msg = f"Error processing mcp-server-time query: {e!s}"
raise ValueError(error_msg) from e
async def run_server(server: Server) -> None:
    """Run ``server`` over the stdio transport until ``server.run`` returns.

    Args:
        server: The MCP server whose handlers are already registered.
    """
    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        logger.info("Starting MCP Time Server")
        await server.run(read_stream, write_stream, options)
        # Reached only after server.run() has returned, so report that the
        # server has finished rather than claiming it is running.
        logger.info("MCP Time Server stopped")
async def serve(local_timezone: str | None = None) -> None:
    """Set up the MCP time server, register its handlers and run it.

    Args:
        local_timezone: Optional timezone name passed to ``get_local_tz`` in
            place of system detection.

    Raises:
        Exception: any error during setup or while running is logged with
            its traceback and then re-raised unchanged.
    """
    try:
        mcp_server = Server("mcp-time")
        clock = TimeServer()
        # String form of the resolved zone, used in the tool descriptions.
        local_tz_name = str(get_local_tz(local_timezone))

        @mcp_server.list_tools()
        async def list_tools() -> list[Tool]:
            # Hand control back to the event loop once before answering.
            await asyncio.sleep(0)
            return create_tools(local_tz_name)

        @mcp_server.call_tool()
        async def call_tool(
            name: str,
            arguments: dict[str, str],
        ) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
            # Hand control back to the event loop once before dispatching.
            await asyncio.sleep(0)
            return await handle_tool_call(name, arguments, clock)

        await run_server(mcp_server)
    except Exception:
        logger.exception("Error in MCP Time Server")
        raise
if __name__ == "__main__":
    # Script entry point: turn on INFO logging, then run until interrupted.
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(serve())
    except KeyboardInterrupt:
        # An interrupt is logged as a shutdown instead of surfacing a traceback.
        logger.info("Shutting down MCP Time Server")