"""Robotics system portmanteau tool - System management operations.
Consolidates help, status, and robot listing into a single portmanteau tool.
"""
from typing import Any, Dict, Literal, Optional
import structlog
from ..utils.error_handler import format_error_response, format_success_response, handle_tool_error
logger = structlog.get_logger(__name__)
class RoboticsSystemTool:
    """Portmanteau tool for system management operations.

    Exposes a single MCP tool, ``robotics_system``, that dispatches to the
    ``help``, ``status`` and ``list_robots`` handlers defined on this class.
    """

    # Reported by both the help and the status operations.
    _SERVER_VERSION = "0.1.0"

    def __init__(self, mcp: Any, state_manager: Any, config: Any, config_loader: Any, mounted_servers: Dict[str, Any]):
        """Initialize robotics system tool.

        Args:
            mcp: FastMCP server instance.
            state_manager: Robot state manager instance.
            config: Server configuration.
            config_loader: Configuration loader instance.
            mounted_servers: Dictionary of mounted MCP servers.
        """
        self.mcp = mcp
        self.state_manager = state_manager
        self.config = config
        self.config_loader = config_loader
        self.mounted_servers = mounted_servers

    def register(self):
        """Register robotics system tool with MCP server."""

        @self.mcp.tool()
        async def robotics_system(
            operation: Literal["help", "status", "list_robots"],
            robot_type: Optional[str] = None,
            is_virtual: Optional[bool] = None,
        ) -> Dict[str, Any]:
            """System management portmanteau for Robotics MCP.

            PORTMANTEAU PATTERN RATIONALE:
            Instead of creating 3 separate tools (help, status, list_robots), this tool
            consolidates related system operations into a single interface. This design:
            - Prevents tool explosion (3 tools → 1 tool) while maintaining full functionality
            - Improves discoverability by grouping related operations together
            - Reduces cognitive load when working with system management tasks
            - Enables consistent system interface across all operations
            - Follows FastMCP 2.13+ best practices for feature-rich MCP servers

            SUPPORTED OPERATIONS:
            - help: Get comprehensive help information about the server and its tools
            - status: Get server status with connectivity tests and robot counts
            - list_robots: List all registered robots with optional filtering

            Args:
                operation: The system operation to perform. MUST be one of:
                    - "help": Get help information (no additional parameters)
                    - "status": Get server status (no additional parameters)
                    - "list_robots": List robots (optional: robot_type, is_virtual filters)
                robot_type: Optional filter for list_robots operation.
                    Valid values: "scout", "go2", "g1", or any custom robot type.
                    If None, returns all robot types.
                is_virtual: Optional filter for list_robots operation.
                    - True: Only virtual robots (vbots)
                    - False: Only physical robots (bots)
                    - None: Both virtual and physical robots

            Returns:
                Dictionary containing operation-specific results:
                    - help: Server info, tool list, features, mounted servers
                    - status: Server health, robot counts, connectivity tests, HTTP status
                    - list_robots: Robot list with filtering applied

            Examples:
                Get help information:
                    result = await robotics_system(operation="help")

                Get server status:
                    result = await robotics_system(operation="status")

                List all robots:
                    result = await robotics_system(operation="list_robots")

                List only Scout robots:
                    result = await robotics_system(operation="list_robots", robot_type="scout")

                List only virtual robots:
                    result = await robotics_system(operation="list_robots", is_virtual=True)
            """
            try:
                if operation == "help":
                    return await self._handle_help()
                elif operation == "status":
                    return await self._handle_status()
                elif operation == "list_robots":
                    return await self._handle_list_robots(robot_type, is_virtual)
                else:
                    # Defensive: the Literal annotation is not enforced by Python itself.
                    return format_error_response(
                        f"Unknown operation: {operation}",
                        error_type="validation_error",
                        operation=operation,
                    )
            except Exception as e:
                return handle_tool_error("robotics_system", e, operation=operation)

    @staticmethod
    def _first_doc_line(doc: Any) -> str:
        """Return the first non-blank line of *doc*, stripped.

        Returns an empty string when *doc* is not a string or contains only
        whitespace. Skipping leading blank lines keeps the summary intact for
        docstrings that start with a newline after the opening quotes.
        """
        if not isinstance(doc, str):
            return ""
        for line in doc.splitlines():
            stripped = line.strip()
            if stripped:
                return stripped
        return ""

    @staticmethod
    def _count_tools(tools: Any) -> int:
        """Return the number of entries in *tools*, or 0 if it has no usable length.

        Accepts any sized collection (dict, list, tuple, ...); strings and bytes
        are treated as "not a tool collection" and yield 0.
        """
        if isinstance(tools, (str, bytes)):
            return 0
        try:
            return len(tools)
        except TypeError:
            return 0

    async def _handle_help(self) -> Dict[str, Any]:
        """Handle help operation.

        Returns:
            Success response describing the server (name, version, features,
            registered tools, mounted servers, configuration), or an error
            response if any of that information cannot be gathered.
        """
        try:
            # Tool names/descriptions come from the server's private ``_tools``
            # mapping; when that attribute is absent the tool list is empty.
            registered_tools = getattr(self.mcp, "_tools", {})
            tools_info = [
                {
                    "name": tool_name,
                    "description": self._first_doc_line(getattr(tool_func, "__doc__", None)),
                }
                for tool_name, tool_func in registered_tools.items()
            ]
            return format_success_response(
                "Help information retrieved",
                data={
                    "server_name": "Robotics-MCP",
                    "version": self._SERVER_VERSION,
                    "description": (
                        "Unified robotics control via MCP - Physical and virtual robots (bot + vbot). "
                        "Provides comprehensive control for Moorebot Scout, Unitree robots, and virtual "
                        "robots in Unity/VRChat. Integrates with osc-mcp, unity3d-mcp, vrchat-mcp, and "
                        "avatar-mcp for seamless virtual robotics testing."
                    ),
                    "features": [
                        "Physical robot control (ROS 1.4 via rosbridge)",
                        "Virtual robot control (Unity3D/VRChat/Resonite)",
                        "YDLIDAR SuperLight (95g) LiDAR integration",
                        "World Labs Marble/Chisel environment generation",
                        "Multi-robot coordination",
                        "Dual transport (stdio + HTTP)",
                    ],
                    "tools": tools_info,
                    "mounted_servers": list(self.mounted_servers.keys()),
                    "configuration": {
                        "http_enabled": self.config.enable_http,
                        "http_port": self.config.http_port if self.config.enable_http else None,
                        "config_path": str(self.config_loader.config_path),
                    },
                },
            )
        except Exception as e:
            logger.error("Failed to generate help", error=str(e), exc_info=True)
            return format_error_response("Failed to generate help information", details={"error": str(e)})

    async def _handle_status(self) -> Dict[str, Any]:
        """Handle status operation.

        Collects the registered robots, probes each mounted server, and (when
        HTTP is enabled) checks whether the configured host/port accepts a TCP
        connection.

        Returns:
            Success response with robots, per-server availability and HTTP
            reachability, or the result of ``handle_tool_error`` on failure.
        """
        try:
            import inspect
            import socket

            robots = self.state_manager.list_robots()

            # Test mounted server connectivity
            mounted_servers_status: Dict[str, Any] = {}
            for server_name, server_instance in self.mounted_servers.items():
                try:
                    if hasattr(server_instance, "list_tools"):
                        tools = server_instance.list_tools()
                        # list_tools may be a coroutine function; await it so the
                        # result is actually counted and no coroutine is left
                        # un-awaited.
                        if inspect.isawaitable(tools):
                            tools = await tools
                        mounted_servers_status[server_name] = {
                            "available": True,
                            "tools_count": self._count_tools(tools),
                        }
                    else:
                        mounted_servers_status[server_name] = {
                            "available": True,
                            "tools_count": "unknown",
                        }
                except Exception as e:
                    logger.warning("Mounted server connectivity test failed", server=server_name, error=str(e))
                    mounted_servers_status[server_name] = {
                        "available": False,
                        "error": str(e),
                    }

            # Test HTTP server if enabled
            http_status = None
            if self.config.enable_http:
                http_host = self.config.http_host
                http_port = self.config.http_port
                http_status = {
                    "enabled": True,
                    "host": http_host,
                    "port": http_port,
                }
                try:
                    # The context manager closes the socket even if the probe
                    # raises. connect_ex blocks for at most the 1s timeout.
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(1)
                        result = sock.connect_ex((http_host, http_port))
                    http_status["reachable"] = result == 0
                except Exception as e:
                    logger.warning("HTTP server status check failed", error=str(e))
                    http_status["reachable"] = False
                    http_status["error"] = str(e)

            return format_success_response(
                "Server status retrieved successfully",
                data={
                    "version": self._SERVER_VERSION,
                    "status": "healthy",
                    "robots": [r.to_dict() for r in robots],
                    "robots_count": len(robots),
                    "mounted_servers": mounted_servers_status,
                    "http": http_status,
                    "config": {
                        "http_enabled": self.config.enable_http,
                        "log_level": self.config.log_level,
                    },
                },
            )
        except Exception as e:
            return handle_tool_error("robotics_system", e, operation="status")

    async def _handle_list_robots(self, robot_type: Optional[str], is_virtual: Optional[bool]) -> Dict[str, Any]:
        """Handle list_robots operation.

        Args:
            robot_type: Keep only robots whose ``robot_type`` equals this value.
                A falsy value (``None`` or ``""``) disables the filter.
            is_virtual: Keep only robots whose ``is_virtual`` equals this value.
                ``None`` disables the filter.

        Returns:
            Success response with the matching robots, their count and the
            filters that were applied, or the result of ``handle_tool_error``
            on failure.
        """
        try:
            robots = self.state_manager.list_robots()

            # Apply filters
            filtered_robots = robots
            if robot_type:
                filtered_robots = [r for r in filtered_robots if r.robot_type == robot_type]
            if is_virtual is not None:
                filtered_robots = [r for r in filtered_robots if r.is_virtual == is_virtual]

            return format_success_response(
                f"Found {len(filtered_robots)} robot(s)",
                data={
                    "count": len(filtered_robots),
                    "robots": [r.to_dict() for r in filtered_robots],
                    "filters": {
                        "robot_type": robot_type,
                        "is_virtual": is_virtual,
                    },
                },
            )
        except Exception as e:
            return handle_tool_error("robotics_system", e, operation="list_robots")