Skip to main content
Glama
robotics_system.py11.6 kB
"""Robotics system portmanteau tool - System management operations. Consolidates help, status, and robot listing into a single portmanteau tool. """ from typing import Any, Dict, Literal, Optional import structlog from ..utils.error_handler import format_error_response, format_success_response, handle_tool_error logger = structlog.get_logger(__name__) class RoboticsSystemTool: """Portmanteau tool for system management operations.""" def __init__(self, mcp: Any, state_manager: Any, config: Any, config_loader: Any, mounted_servers: Dict[str, Any]): """Initialize robotics system tool. Args: mcp: FastMCP server instance. state_manager: Robot state manager instance. config: Server configuration. config_loader: Configuration loader instance. mounted_servers: Dictionary of mounted MCP servers. """ self.mcp = mcp self.state_manager = state_manager self.config = config self.config_loader = config_loader self.mounted_servers = mounted_servers def register(self): """Register robotics system tool with MCP server.""" @self.mcp.tool() async def robotics_system( operation: Literal["help", "status", "list_robots"], robot_type: Optional[str] = None, is_virtual: Optional[bool] = None, ) -> Dict[str, Any]: """System management portmanteau for Robotics MCP. PORTMANTEAU PATTERN RATIONALE: Instead of creating 3 separate tools (help, status, list_robots), this tool consolidates related system operations into a single interface. This design: - Prevents tool explosion (3 tools → 1 tool) while maintaining full functionality - Improves discoverability by grouping related operations together - Reduces cognitive load when working with system management tasks - Enables consistent system interface across all operations - Follows FastMCP 2.13+ best practices for feature-rich MCP servers SUPPORTED OPERATIONS: - help: Get comprehensive help information about the server and its tools - status: Get server status with connectivity tests and robot counts - list_robots: List all registered robots with optional filtering Args: operation: The system operation to perform. MUST be one of: - "help": Get help information (no additional parameters) - "status": Get server status (no additional parameters) - "list_robots": List robots (optional: robot_type, is_virtual filters) robot_type: Optional filter for list_robots operation. Valid values: "scout", "go2", "g1", or any custom robot type. If None, returns all robot types. is_virtual: Optional filter for list_robots operation. - True: Only virtual robots (vbots) - False: Only physical robots (bots) - None: Both virtual and physical robots Returns: Dictionary containing operation-specific results: - help: Server info, tool list, features, mounted servers - status: Server health, robot counts, connectivity tests, HTTP status - list_robots: Robot list with filtering applied Examples: Get help information: result = await robotics_system(operation="help") Get server status: result = await robotics_system(operation="status") List all robots: result = await robotics_system(operation="list_robots") List only Scout robots: result = await robotics_system(operation="list_robots", robot_type="scout") List only virtual robots: result = await robotics_system(operation="list_robots", is_virtual=True) """ try: if operation == "help": return await self._handle_help() elif operation == "status": return await self._handle_status() elif operation == "list_robots": return await self._handle_list_robots(robot_type, is_virtual) else: return format_error_response( f"Unknown operation: {operation}", error_type="validation_error", operation=operation, ) except Exception as e: return handle_tool_error("robotics_system", e, operation=operation) async def _handle_help(self) -> Dict[str, Any]: """Handle help operation.""" try: # Get all registered tools tools_info = [] # FastMCP stores tools in _tools dict - get description from docstring for tool_name, tool_func in getattr(self.mcp, '_tools', {}).items(): description = "" if hasattr(tool_func, '__doc__') and tool_func.__doc__: # Get first line of docstring as description description = tool_func.__doc__.split('\n')[0].strip() tools_info.append( { "name": tool_name, "description": description, } ) return format_success_response( "Help information retrieved", data={ "server_name": "Robotics-MCP", "version": "0.1.0", "description": ( "Unified robotics control via MCP - Physical and virtual robots (bot + vbot). " "Provides comprehensive control for Moorebot Scout, Unitree robots, and virtual " "robots in Unity/VRChat. Integrates with osc-mcp, unity3d-mcp, vrchat-mcp, and " "avatar-mcp for seamless virtual robotics testing." ), "features": [ "Physical robot control (ROS 1.4 via rosbridge)", "Virtual robot control (Unity3D/VRChat/Resonite)", "YDLIDAR SuperLight (95g) LiDAR integration", "World Labs Marble/Chisel environment generation", "Multi-robot coordination", "Dual transport (stdio + HTTP)", ], "tools": tools_info, "mounted_servers": list(self.mounted_servers.keys()), "configuration": { "http_enabled": self.config.enable_http, "http_port": self.config.http_port if self.config.enable_http else None, "config_path": str(self.config_loader.config_path), }, }, ) except Exception as e: logger.error("Failed to generate help", error=str(e), exc_info=True) return format_error_response("Failed to generate help information", details={"error": str(e)}) async def _handle_status(self) -> Dict[str, Any]: """Handle status operation.""" try: robots = self.state_manager.list_robots() # Test mounted server connectivity mounted_servers_status: Dict[str, Any] = {} for server_name, server_instance in self.mounted_servers.items(): try: if hasattr(server_instance, "list_tools"): tools = server_instance.list_tools() mounted_servers_status[server_name] = { "available": True, "tools_count": len(tools) if isinstance(tools, dict) else 0, } else: mounted_servers_status[server_name] = { "available": True, "tools_count": "unknown", } except Exception as e: logger.warning("Mounted server connectivity test failed", server=server_name, error=str(e)) mounted_servers_status[server_name] = { "available": False, "error": str(e), } # Test HTTP server if enabled http_status = None if self.config.enable_http: try: import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(1) result = sock.connect_ex((self.config.http_host, self.config.http_port)) sock.close() http_status = { "enabled": True, "host": self.config.http_host, "port": self.config.http_port, "reachable": result == 0, } except Exception as e: logger.warning("HTTP server status check failed", error=str(e)) http_status = { "enabled": True, "host": self.config.http_host, "port": self.config.http_port, "reachable": False, "error": str(e), } return format_success_response( "Server status retrieved successfully", data={ "version": "0.1.0", "status": "healthy", "robots": [r.to_dict() for r in robots], "robots_count": len(robots), "mounted_servers": mounted_servers_status, "http": http_status, "config": { "http_enabled": self.config.enable_http, "log_level": self.config.log_level, }, }, ) except Exception as e: return handle_tool_error("robotics_system", e, operation="status") async def _handle_list_robots(self, robot_type: Optional[str], is_virtual: Optional[bool]) -> Dict[str, Any]: """Handle list_robots operation.""" try: robots = self.state_manager.list_robots() # Apply filters filtered_robots = robots if robot_type: filtered_robots = [r for r in filtered_robots if r.robot_type == robot_type] if is_virtual is not None: filtered_robots = [r for r in filtered_robots if r.is_virtual == is_virtual] return format_success_response( f"Found {len(filtered_robots)} robot(s)", data={ "count": len(filtered_robots), "robots": [r.to_dict() for r in filtered_robots], "filters": { "robot_type": robot_type, "is_virtual": is_virtual, }, }, ) except Exception as e: return handle_tool_error("robotics_system", e, operation="list_robots")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sandraschi/robotics-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server