from mcp.server import Server
from mcp.server.stdio import stdio_server
import mcp.types as types
from robodk_mcp.service import RoboDKService
import asyncio
from typing import Any
# Initialize Service
rdk_service = RoboDKService()
# Initialize Server
server = Server("robodk-mcp")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
return [
types.Tool(
name="robodk.start",
description="Launch RoboDK or connect to existing instance.",
inputSchema={
"type": "object",
"properties": {},
},
),
types.Tool(
name="robodk.open_station",
description="Create a new empty station or open an existing one.",
inputSchema={
"type": "object",
"properties": {
"empty": {"type": "boolean", "description": "Create a new empty station."},
"file_path": {"type": "string", "description": "Path to a RoboDK station file (.rdk)."},
},
},
),
types.Tool(
name="station.add_robot",
description="Add a robot to the current station.",
inputSchema={
"type": "object",
"properties": {
"robot_model": {"type": "string", "description": "Name of the robot model to add (e.g. 'UR10')."},
},
"required": ["robot_model"],
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
if name == "robodk.start":
session_id = rdk_service.connect()
return [types.TextContent(type="text", text=f"RoboDK started. Session: {session_id}")]
elif name == "robodk.open_station":
arguments = arguments or {}
empty = arguments.get("empty", False)
file_path = arguments.get("file_path")
rdk_service.open_station(file_path=file_path, empty=empty)
msg = "Opened empty station." if empty else f"Opened station from {file_path}"
return [types.TextContent(type="text", text=msg)]
elif name == "station.add_robot":
if not arguments or "robot_model" not in arguments:
raise ValueError("Missing 'robot_model' argument")
robot_model = arguments["robot_model"]
robot_info = rdk_service.add_robot(robot_model)
return [types.TextContent(type="text", text=f"Added robot: {robot_info['name']} (ID: {robot_info['id']})")]
raise ValueError(f"Unknown tool: {name}")
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
return [
types.Resource(
uri=types.AnyUrl("station://current/tree"),
name="Current Station Tree",
description="The current hierarchy of the RoboDK station.",
mimeType="application/json",
)
]
@server.read_resource()
async def handle_read_resource(uri: types.AnyUrl) -> str | bytes:
if str(uri) == "station://current/tree":
tree = rdk_service.get_station_tree()
import json
return json.dumps(tree, indent=2)
raise ValueError(f"Unknown resource: {uri}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())