"""FastMCP server setup for ROS1 Noetic."""
from contextlib import asynccontextmanager
from typing import AsyncIterator, Any
from fastmcp import FastMCP
from .utils.ros_bridge import ROSBridge
@asynccontextmanager
async def ros_lifespan(server: FastMCP) -> AsyncIterator[dict[str, Any]]:
"""
Initialize ROS connection on server startup.
This lifespan context manager ensures the ROS node is properly
initialized before handling any MCP requests and cleanly shut down
when the server stops.
"""
bridge = ROSBridge()
try:
bridge.initialize()
yield {"ros_bridge": bridge}
finally:
bridge.shutdown()
# Create the FastMCP server instance
mcp = FastMCP(
name="ROS1 Noetic MCP Server",
instructions="""
This MCP server provides access to ROS1 Noetic robotic systems.
Available capabilities:
- **Topics**: List, publish, and subscribe to ROS topics
- **Services**: List and call ROS services
- **Parameters**: Get, set, and list ROS parameters
- **Nodes**: List and inspect ROS nodes
- **TF Transforms**: Query coordinate frame transforms
- **Messages**: Inspect message and service definitions
The server connects to a ROS master via ROS_MASTER_URI environment variable.
""",
lifespan=ros_lifespan,
)
# Import and register tools
from .tools import topics, services, params, nodes, messages, transforms
# Import and register resources
from .resources import ros_resources