"""Robot navigation portmanteau tool - Path planning and navigation.
Provides unified navigation control for both physical and virtual robots.
"""
from typing import Any, Literal
import structlog
from fastmcp import Client
from ..utils.error_handler import format_error_response, format_success_response, handle_tool_error
logger = structlog.get_logger(__name__)
class RobotNavigationTool:
"""Portmanteau tool for robot navigation and path planning."""
def __init__(self, mcp: Any, state_manager: Any, mounted_servers: dict[str, Any] | None = None):
"""Initialize robot navigation tool.
Args:
mcp: FastMCP server instance.
state_manager: Robot state manager instance.
mounted_servers: Dictionary of mounted MCP servers.
"""
self.mcp = mcp
self.state_manager = state_manager
self.mounted_servers = mounted_servers or {}
def register(self):
"""Register robot navigation tool with MCP server."""
@self.mcp.tool()
async def robot_navigation(
robot_id: str,
action: Literal[
"plan_path",
"follow_path",
"set_waypoint",
"clear_waypoints",
"get_path_status",
"avoid_obstacle",
"get_current_path",
],
start_position: dict[str, float] | None = None,
goal_position: dict[str, float] | None = None,
waypoint: dict[str, float] | None = None,
obstacle_position: dict[str, float] | None = None,
path_id: str | None = None,
) -> dict[str, Any]:
"""Robot navigation and path planning portmanteau.
PORTMANTEAU PATTERN: Consolidates navigation operations into a single tool.
SUPPORTED OPERATIONS:
- plan_path: Plan path from A to B (A* or RRT)
- follow_path: Execute planned path
- set_waypoint: Set navigation waypoint
- clear_waypoints: Clear waypoint list
- get_path_status: Check path execution status
- avoid_obstacle: Dynamic obstacle avoidance
- get_current_path: Get current path being followed
Args:
robot_id: Robot identifier (e.g., "scout_01", "vbot_scout_01").
action: Navigation operation to perform.
start_position: Start position (x, y, z) for plan_path.
goal_position: Goal position (x, y, z) for plan_path.
waypoint: Waypoint position (x, y, z) for set_waypoint.
obstacle_position: Obstacle position (x, y, z) for avoid_obstacle.
path_id: Path identifier for follow_path or get_path_status.
Returns:
Dictionary containing operation result with path data or status.
Examples:
Plan path:
result = await robot_navigation(
robot_id="scout_01",
action="plan_path",
start_position={"x": 0, "y": 0, "z": 0},
goal_position={"x": 5, "y": 0, "z": 0}
)
Follow path:
result = await robot_navigation(
robot_id="scout_01",
action="follow_path",
path_id="path_123"
)
Set waypoint:
result = await robot_navigation(
robot_id="scout_01",
action="set_waypoint",
waypoint={"x": 2, "y": 0, "z": 0}
)
"""
try:
robot = self.state_manager.get_robot(robot_id)
if not robot:
return format_error_response(
f"Robot {robot_id} not found",
error_type="not_found",
robot_id=robot_id,
action=action,
)
# Route to appropriate handler
if robot.is_virtual:
return await self._handle_virtual_navigation(
robot,
action,
start_position,
goal_position,
waypoint,
obstacle_position,
path_id,
)
else:
return await self._handle_physical_navigation(
robot,
action,
start_position,
goal_position,
waypoint,
obstacle_position,
path_id,
)
except Exception as e:
return handle_tool_error("robot_navigation", e, robot_id=robot_id, action=action)
async def _handle_virtual_navigation(
self,
robot: Any,
action: str,
start_position: dict[str, float] | None,
goal_position: dict[str, float] | None,
waypoint: dict[str, float] | None,
obstacle_position: dict[str, float] | None,
path_id: str | None,
) -> dict[str, Any]:
"""Handle virtual robot navigation."""
try:
if robot.platform == "unity" and "unity" in self.mounted_servers:
async with Client(self.mcp) as client:
if action == "plan_path":
# Call Unity RobotNavigator.PlanPath()
result = await client.call_tool(
"execute_unity_method",
{
"class_name": "RobotNavigator",
"method_name": "PlanPath",
"parameters": {
"robotId": robot.robot_id,
"startPosition": start_position or {},
"goalPosition": goal_position or {},
},
},
)
return format_success_response(
f"Path planned for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data=result,
)
elif action == "follow_path":
# Call Unity RobotNavigator.FollowPath()
result = await client.call_tool(
"execute_unity_method",
{
"class_name": "RobotNavigator",
"method_name": "FollowPath",
"parameters": {
"robotId": robot.robot_id,
"pathId": path_id or "",
},
},
)
return format_success_response(
f"Path following started for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data=result,
)
elif action == "set_waypoint":
# Call Unity RobotNavigator.SetWaypoint()
result = await client.call_tool(
"execute_unity_method",
{
"class_name": "RobotNavigator",
"method_name": "SetWaypoint",
"parameters": {
"robotId": robot.robot_id,
"waypoint": waypoint or {},
},
},
)
return format_success_response(
f"Waypoint set for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data=result,
)
elif action == "clear_waypoints":
# Call Unity RobotNavigator.ClearWaypoints()
result = await client.call_tool(
"execute_unity_method",
{
"class_name": "RobotNavigator",
"method_name": "ClearWaypoints",
"parameters": {
"robotId": robot.robot_id,
},
},
)
return format_success_response(
f"Waypoints cleared for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data=result,
)
elif action == "get_path_status":
# Call Unity RobotNavigator.GetPathStatus()
result = await client.call_tool(
"execute_unity_method",
{
"class_name": "RobotNavigator",
"method_name": "GetPathStatus",
"parameters": {
"robotId": robot.robot_id,
"pathId": path_id or "",
},
},
)
return format_success_response(
f"Path status retrieved for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data=result,
)
elif action == "avoid_obstacle":
# Call Unity RobotNavigator.AvoidObstacle()
result = await client.call_tool(
"execute_unity_method",
{
"class_name": "RobotNavigator",
"method_name": "AvoidObstacle",
"parameters": {
"robotId": robot.robot_id,
"obstaclePosition": obstacle_position or {},
},
},
)
return format_success_response(
f"Obstacle avoidance triggered for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data=result,
)
elif action == "get_current_path":
# Call Unity RobotNavigator.GetCurrentPath()
result = await client.call_tool(
"execute_unity_method",
{
"class_name": "RobotNavigator",
"method_name": "GetCurrentPath",
"parameters": {
"robotId": robot.robot_id,
},
},
)
return format_success_response(
f"Current path retrieved for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data=result,
)
else:
# Mock navigation for testing
logger.info("Mock navigation", robot_id=robot.robot_id, action=action)
mock_path = {
"path_id": path_id or "mock_path_123",
"waypoints": [
start_position or {"x": 0, "y": 0, "z": 0},
goal_position or {"x": 5, "y": 0, "z": 0},
],
"status": "planned",
}
return format_success_response(
f"Mock navigation: {action} for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data={"note": "Mock mode - Unity not available", "path": mock_path},
)
except Exception as e:
return handle_tool_error(
"_handle_virtual_navigation", e, robot_id=robot.robot_id, action=action
)
async def _handle_physical_navigation(
self,
robot: Any,
action: str,
start_position: dict[str, float] | None,
goal_position: dict[str, float] | None,
waypoint: dict[str, float] | None,
obstacle_position: dict[str, float] | None,
path_id: str | None,
) -> dict[str, Any]:
"""Handle physical robot navigation."""
# TODO: Implement ROS-based navigation (move_base, amcl)
logger.info("Physical robot navigation", robot_id=robot.robot_id, action=action)
return format_success_response(
f"Physical robot navigation: {action} for {robot.robot_id}",
robot_id=robot.robot_id,
action=action,
data={
"note": "Physical robot navigation not yet implemented (requires ROS navigation stack)"
},
)