"""
Robotics WebApp Backend - FastAPI Server
Complete robotics control platform with ROS integration, real-time WebSocket communication,
and comprehensive API endpoints for virtual and physical robot control.
"""
import asyncio
import logging
import os
import re
import time
from datetime import datetime
from typing import Any
import aiohttp
import socketio
import uvicorn
from anthropic import AsyncAnthropic
from app_launcher import app_launcher
from camera_integration import camera_manager, cleanup_cameras, initialize_cameras
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from llm_service import llm_service
from mcp_client import mcp_client
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from workflow_service import workflow_service
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI app
app = FastAPI(
    title="Robotics MCP WebApp API",
    description="Real-time robotics control platform with ROS integration",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)
# Add CORS middleware - Allow all tailnet members
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for tailnet access
    allow_credentials=False,  # Must be False when using "*"
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize Socket.IO server for real-time communication
# NOTE(review): Socket.IO only allows localhost:3000 origins while the HTTP
# CORS layer above allows "*" — confirm this asymmetry is intentional.
sio = socketio.AsyncServer(
    async_mode="asgi", cors_allowed_origins=["http://localhost:3000", "http://127.0.0.1:3000"]
)
# Create ASGI app with Socket.IO. `socket_app` wraps `app`, so the server
# process must serve `socket_app`, not `app`, for Socket.IO to work.
socket_app = socketio.ASGIApp(sio, app)
# LLM Provider Integrations
class LLMProviderManager:
    """Unified gateway to multiple LLM backends.

    Holds per-provider configuration (API keys from the environment, base URLs,
    known model names), lazily creates SDK clients, and exposes a single
    `call_llm` coroutine plus a regex-based spatial-command extractor used to
    turn LLM prose into robot actions.
    """

    def __init__(self):
        # API keys come from the environment; local providers need none.
        self.providers = {
            "openai": {
                "client": None,
                "api_key": os.getenv("OPENAI_API_KEY"),
                "base_url": "https://api.openai.com/v1",
                "models": ["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"],
            },
            "anthropic": {
                "client": None,
                "api_key": os.getenv("ANTHROPIC_API_KEY"),
                "base_url": "https://api.anthropic.com",
                "models": [
                    "claude-3-opus-20240229",
                    "claude-3-sonnet-20240229",
                    "claude-3-haiku-20240307",
                ],
            },
            "ollama": {
                "client": None,
                "api_key": None,
                "base_url": os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
                "models": ["mistral:7b", "llama2:13b", "codellama:7b"],
            },
            "lm-studio": {
                "client": None,
                "api_key": None,
                "base_url": os.getenv("LM_STUDIO_BASE_URL", "http://localhost:1234"),
                "models": ["local-model"],
            },
        }
        # Models considered capable of spatial reasoning.
        self.spatial_models = ["gpt-4o", "claude-3-opus-20240229", "mistral:7b"]

    async def initialize_clients(self):
        """Create SDK clients for providers whose API key is configured."""
        if self.providers["openai"]["api_key"]:
            self.providers["openai"]["client"] = AsyncOpenAI(
                api_key=self.providers["openai"]["api_key"],
                base_url=self.providers["openai"]["base_url"],
            )
        if self.providers["anthropic"]["api_key"]:
            self.providers["anthropic"]["client"] = AsyncAnthropic(
                api_key=self.providers["anthropic"]["api_key"]
            )

    async def call_llm(
        self, provider: str, model: str, messages: list[dict], **kwargs
    ) -> dict[str, Any]:
        """Call an LLM and normalize the reply.

        Returns a dict with "content", "usage", and "finish_reason" regardless
        of the backend. Raises ValueError for an unknown provider and
        HTTPException (503 missing key, upstream status, or 500) on failures.
        """
        if provider not in self.providers:
            raise ValueError(f"Unknown provider: {provider}")
        provider_config = self.providers[provider]
        # Lazily build SDK clients on first use.
        if not provider_config["client"] and provider_config["api_key"]:
            await self.initialize_clients()
        try:
            if provider == "openai":
                # Fix: the original dereferenced a None client when no API key
                # was configured, surfacing as an AttributeError/500.
                if provider_config["client"] is None:
                    raise HTTPException(
                        status_code=503, detail="OpenAI client not configured (missing API key)"
                    )
                response = await provider_config["client"].chat.completions.create(
                    model=model, messages=messages, **kwargs
                )
                return {
                    "content": response.choices[0].message.content,
                    "usage": response.usage.model_dump() if response.usage else {},
                    "finish_reason": response.choices[0].finish_reason,
                }
            elif provider == "anthropic":
                if provider_config["client"] is None:
                    raise HTTPException(
                        status_code=503,
                        detail="Anthropic client not configured (missing API key)",
                    )
                # Anthropic takes the system prompt separately from the turns.
                system_message = ""
                anthropic_messages = []
                for msg in messages:
                    if msg["role"] == "system":
                        system_message = msg["content"]
                    else:
                        anthropic_messages.append(msg)
                response = await provider_config["client"].messages.create(
                    model=model,
                    system=system_message,
                    messages=anthropic_messages,
                    max_tokens=kwargs.get("max_tokens", 4096),
                )
                return {
                    "content": response.content[0].text,
                    "usage": {
                        "input_tokens": response.usage.input_tokens,
                        "output_tokens": response.usage.output_tokens,
                    },
                    "finish_reason": response.stop_reason,
                }
            elif provider in ("ollama", "lm-studio"):
                # Fix: LM Studio does not serve Ollama's /api/chat — it exposes
                # an OpenAI-compatible /v1/chat/completions endpoint.
                if provider == "ollama":
                    url = f"{provider_config['base_url']}/api/chat"
                else:
                    url = f"{provider_config['base_url']}/v1/chat/completions"
                payload = {"model": model, "messages": messages, "stream": False, **kwargs}
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        url,
                        json=payload,
                        headers={"Content-Type": "application/json"},
                    ) as resp:
                        if resp.status != 200:
                            raise HTTPException(
                                status_code=resp.status,
                                detail=f"LLM API error: {await resp.text()}",
                            )
                        result = await resp.json()
                        if provider == "ollama":
                            content = result.get("message", {}).get("content", "")
                            usage = {}
                        else:
                            choices = result.get("choices") or [{}]
                            content = choices[0].get("message", {}).get("content", "")
                            usage = result.get("usage", {})
                        return {
                            "content": content,
                            "usage": usage,
                            "finish_reason": "stop",
                        }
            else:
                # Fix: the original silently fell through and returned None here.
                raise ValueError(f"No call path implemented for provider: {provider}")
        except HTTPException:
            # Fix: preserve deliberate HTTP errors (upstream status codes)
            # instead of re-wrapping everything as a 500.
            raise
        except Exception as e:
            logger.error(f"LLM call failed for {provider}/{model}: {e}")
            raise HTTPException(status_code=500, detail=f"LLM service error: {str(e)}") from e

    def extract_spatial_commands(self, text: str) -> dict[str, Any]:
        """Extract navigation/manipulation/safety commands from LLM prose.

        Pure regex heuristics; returns a dict with "navigation",
        "manipulation", "observation", and "safety" action lists.
        """
        commands = {"navigation": [], "manipulation": [], "observation": [], "safety": []}
        # Navigation patterns: named targets first, numeric coordinates last.
        nav_patterns = [
            r"(?:go|move|navigate) to (?:the )?([a-zA-Z\s]+)",
            r"head towards ([a-zA-Z\s]+)",
            r"approach ([a-zA-Z\s]+)",
            r"coordinates?\s*\(?(\d+(?:\.\d+)?)\s*[,\s]\s*(\d+(?:\.\d+)?)(?:\s*[,\s]\s*(\d+(?:\.\d+)?))?\)?",
        ]
        for pattern in nav_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            for match in matches:
                # Tuples come from the multi-group coordinate pattern.
                if isinstance(match, tuple):
                    coords = [float(x) for x in match if x]
                    commands["navigation"].append({"action": "goto", "coordinates": coords})
                else:
                    commands["navigation"].append({"action": "goto", "location": match.strip()})
        # Manipulation patterns
        manip_patterns = [
            r"(?:pick up|grab|get|take) (?:the )?([a-zA-Z\s]+)",
            r"place (?:the )?([a-zA-Z\s]+) (?:on|in|at) ([a-zA-Z\s]+)",
            r"move (?:the )?([a-zA-Z\s]+) to ([a-zA-Z\s]+)",
        ]
        for pattern in manip_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            for match in matches:
                if isinstance(match, tuple):
                    commands["manipulation"].append(
                        {
                            "action": "manipulate",
                            "object": match[0].strip(),
                            "destination": match[1].strip() if len(match) > 1 else None,
                        }
                    )
                else:
                    commands["manipulation"].append(
                        {"action": "manipulate", "object": match.strip()}
                    )
        # Safety: any mention of a risk keyword escalates safety handling.
        safety_keywords = ["avoid", "careful", "danger", "obstacle", "collision", "emergency"]
        if any(keyword in text.lower() for keyword in safety_keywords):
            commands["safety"].append({"action": "enable_safety", "level": "high"})
        return commands
llm_provider_manager = LLMProviderManager()
# Global state management
class RobotState:
    """In-memory registry of robots, live WebSocket clients, and command log.

    Starts with one mock virtual robot; physical robots can be merged in from
    the robotics MCP server via `load_physical_robots`.
    """

    def __init__(self):
        # Initialize with mock data - will be populated from robotics MCP server
        self.robots: dict[str, dict[str, Any]] = {
            "vbot_scout_mini": {
                "id": "vbot_scout_mini",
                "name": "VBot Scout Mini",
                "type": "virtual",
                "status": "online",
                "position": {"x": 0.0, "y": 0.0, "z": 0.0},
                "rotation": {"roll": 0.0, "pitch": 0.0, "yaw": 0.0},
                "velocity": {"linear": 0.0, "angular": 0.0},
                "battery": 87,
                "sensors": {
                    "imu": {"ax": 0.0, "ay": -9.81, "az": 0.0, "gx": 0.0, "gy": 0.0, "gz": 0.0},
                    "odometry": {"distance": 0.0, "speed": 0.0, "heading": 0.0},
                    "camera": {"streaming": True, "fps": 30, "resolution": "1920x1080"},
                },
                "last_update": datetime.now().isoformat(),
                "uptime": 0,
            }
        }
        self.physical_robots_loaded = False
        # Live WebSocket clients that receive robot_update broadcasts.
        self.active_connections: list[WebSocket] = []
        # Rolling log of executed commands (capped at 1000 entries).
        self.command_history: list[dict[str, Any]] = []

    def update_robot_state(self, robot_id: str, updates: dict[str, Any]):
        """Merge `updates` into a robot's state and broadcast to clients.

        Note: dict.update replaces nested sub-dicts wholesale, so callers must
        pass complete sub-objects (e.g. the full "velocity" dict).
        """
        if robot_id in self.robots:
            self.robots[robot_id].update(updates)
            self.robots[robot_id]["last_update"] = datetime.now().isoformat()
            # Fix: only schedule the broadcast when an event loop is running;
            # the original asyncio.create_task raised RuntimeError when this
            # sync method was called outside async context (e.g. tests).
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                return
            asyncio.create_task(self.broadcast_robot_update(robot_id))

    async def broadcast_robot_update(self, robot_id: str):
        """Broadcast robot state update to all WebSocket connections."""
        update_data = {
            "type": "robot_update",
            "robot_id": robot_id,
            "data": self.robots[robot_id],
            "timestamp": datetime.now().isoformat(),
        }
        dead_connections = []
        for connection in self.active_connections:
            try:
                await connection.send_json(update_data)
            except Exception as e:
                logger.error(f"Failed to send update to connection: {e}")
                dead_connections.append(connection)
        # Fix: prune connections that can no longer receive updates; the
        # original kept erroring on them forever.
        for connection in dead_connections:
            if connection in self.active_connections:
                self.active_connections.remove(connection)

    def add_command_to_history(self, robot_id: str, command: str, params: dict[str, Any]):
        """Add command to history for logging and debugging."""
        self.command_history.append(
            {
                "timestamp": datetime.now().isoformat(),
                "robot_id": robot_id,
                "command": command,
                "parameters": params,
            }
        )
        # Keep only last 1000 commands
        if len(self.command_history) > 1000:
            self.command_history = self.command_history[-1000:]

    async def load_physical_robots(self):
        """Load physical robots from the robotics MCP server.

        Best-effort: returns True when robots were merged in, False when the
        MCP server was unreachable or responded with an error.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get("http://localhost:8888/api/robots") as response:
                    if response.status == 200:
                        data = await response.json()
                        robots_data = data.get("robots", [])
                        # Merge without clobbering robots we already track.
                        for robot_data in robots_data:
                            robot_id = robot_data["id"]
                            if robot_id not in self.robots:
                                self.robots[robot_id] = robot_data
                                logger.info(f"Loaded physical robot: {robot_id}")
                        self.physical_robots_loaded = True
                        logger.info(
                            f"Loaded {len(robots_data)} physical robots from robotics MCP server"
                        )
                        return True
                    else:
                        logger.warning("Failed to load robots from robotics MCP server")
                        return False
        except Exception as e:
            logger.warning(f"Could not connect to robotics MCP server: {e}")
            return False
robot_state = RobotState()
# Pydantic models for API
class RobotCommand(BaseModel):
    """Request body for sending one command to a robot."""

    robot_id: str = Field(..., description="Robot identifier")
    command: str = Field(..., description="Command to execute")
    # Pydantic deep-copies field defaults, so the mutable {} default is safe here.
    parameters: dict[str, Any] | None = Field(default={}, description="Command parameters")
class RobotStatus(BaseModel):
    """Snapshot of one robot's telemetry.

    NOTE(review): not referenced by the visible handlers (they return raw
    dicts) — presumably intended as a response model; confirm before removing.
    """

    robot_id: str
    status: str
    position: dict[str, float]  # keys x/y/z (units unspecified in SOURCE)
    rotation: dict[str, float]  # keys roll/pitch/yaw
    velocity: dict[str, float]  # keys linear/angular
    battery: int  # presumably percent — confirm
    sensors: dict[str, Any]
    last_update: str  # ISO-8601 timestamp string
# LLM-related models
class LLMModel(BaseModel):
    """Catalog entry describing an available LLM."""

    id: str
    name: str
    provider: str
    type: str  # local or cloud
    status: str  # available, loaded, downloading, error
    size: str  # human-readable, e.g. "4.1GB"
    parameters: str  # e.g. "7B"
    capabilities: list[str]
    spatial_awareness: bool
    description: str
class LLMProvider(BaseModel):
    """Connection details for an LLM backend."""

    name: str
    type: str  # local or cloud
    endpoint: str
    api_key: str | None = None
    models: list[str]
    status: str = "disconnected"
class LLMCommand(BaseModel):
    """Natural-language command to be interpreted by an LLM for a robot."""

    robot_id: str
    natural_language_command: str
    llm_model: str  # key into LLMState.models
    spatial_context: dict[str, Any] | None = None  # optional scene description for the prompt
    safety_constraints: dict[str, Any] | None = None
class LLMResponse(BaseModel):
    """Result of LLM command interpretation and action planning."""

    command_id: str
    robot_id: str
    interpreted_command: str
    actions: list[dict[str, Any]]  # ordered robot actions to execute
    safety_checks: list[str]
    execution_plan: dict[str, Any]
    confidence_score: float  # 0.0 - 1.0
    spatial_reasoning: str | None = None
    llm_response: str | None = None  # raw LLM text
    usage_stats: dict[str, Any] | None = None  # provider token usage, if reported
# LLM Management State
class LLMState:
    """Registry of LLM providers/models plus LLM-driven command execution.

    NOTE(review): the provider/model tables here are static data serving the
    /api/llm/* management endpoints and partially overlap with
    ``LLMProviderManager`` and the separate ``llm_service`` module.
    """

    def __init__(self):
        # Static provider catalog keyed by provider id.
        self.providers: dict[str, dict[str, Any]] = {
            "ollama": {
                "name": "Ollama",
                "type": "local",
                "endpoint": "http://localhost:11434",
                "status": "disconnected",
                "models": ["mistral:7b", "llama3.1:8b", "codellama:13b"],
            },
            "lm-studio": {
                "name": "LM Studio",
                "type": "local",
                "endpoint": "http://localhost:1234",
                "status": "disconnected",
                "models": ["GPT-2", "GPT-J", "Llama 2"],
            },
            "openai": {
                "name": "OpenAI",
                "type": "cloud",
                "endpoint": "https://api.openai.com",
                "status": "disconnected",
                "models": ["gpt-4o", "gpt-4o-spatial", "gpt-4-turbo"],
            },
            "anthropic": {
                "name": "Anthropic",
                "type": "cloud",
                "endpoint": "https://api.anthropic.com",
                "status": "disconnected",
                "models": ["claude-3.5-sonnet", "claude-3-opus"],
            },
        }
        # Curated model catalog; "model_name" is the provider-side identifier
        # actually passed to the API, "id" is this app's catalog key.
        self.models: dict[str, dict[str, Any]] = {
            "mistral-7b-spatial": {
                "id": "mistral-7b-spatial",
                "name": "Mistral 7B Spatial",
                "provider": "ollama",
                "model_name": "mistral:7b",
                "type": "local",
                "status": "available",
                "size": "4.1GB",
                "parameters": "7B",
                "capabilities": ["spatial-reasoning", "navigation", "object-detection"],
                "spatial_awareness": True,
                "description": "Spatially-aware Mistral model for robotics navigation",
            },
            "worldlabs-spatial-llm": {
                "id": "worldlabs-spatial-llm",
                "name": "WorldLabs Spatial LLM",
                "provider": "ollama",
                "model_name": "worldlabs-spatial-llm",
                "type": "local",
                "status": "available",
                "size": "8.2GB",
                "parameters": "12B",
                "capabilities": ["3d-understanding", "spatial-navigation", "environment-modeling"],
                "spatial_awareness": True,
                "description": "WorldLabs specialized spatial LLM for 3D environment understanding",
            },
            "gpt-4o-spatial": {
                "id": "gpt-4o-spatial",
                "name": "GPT-4o Spatial",
                "provider": "openai",
                "model_name": "gpt-4o",
                "type": "cloud",
                "status": "available",
                "size": "Cloud-hosted",
                "parameters": "Unknown",
                "capabilities": ["multimodal", "spatial-reasoning", "robot-control"],
                "spatial_awareness": True,
                "description": "OpenAI GPT-4o with spatial awareness for advanced robotics",
            },
        }
        # Currently selected model id; None until set_active_model is called.
        self.active_model: str | None = None
        # Chronological log of executed LLM commands and their responses.
        self.command_history: list[dict[str, Any]] = []

    def get_available_models(self) -> list[dict[str, Any]]:
        """Get all available LLM models"""
        return list(self.models.values())

    def get_providers(self) -> list[dict[str, Any]]:
        """Get all LLM providers"""
        return list(self.providers.values())

    def set_active_model(self, model_id: str):
        """Set the active LLM model (unknown ids are silently ignored)."""
        if model_id in self.models:
            self.active_model = model_id

    async def execute_llm_command(self, command: LLMCommand) -> LLMResponse:
        """Execute a natural language command through LLM processing with real API integration.

        Pipeline: build spatially-aware prompts -> call the configured provider
        via llm_provider_manager -> regex-extract navigation/manipulation/safety
        commands from the reply -> translate them into robot actions with an
        execution plan. On any failure a low-confidence fallback response is
        returned instead of raising. The command/response pair is appended to
        command_history either way.
        """
        model_config = self.models.get(command.llm_model)
        if not model_config:
            raise ValueError(f"Unknown LLM model: {command.llm_model}")
        provider = model_config.get("provider", "openai")
        model_name = model_config.get("model_name", command.llm_model)
        # Enhanced prompt with spatial reasoning capabilities
        system_prompt = f"""You are an advanced robotics AI controller with spatial awareness and reasoning capabilities.
You control robots in 3D environments and must provide safe, precise commands.
Your capabilities:
- Navigate in 3D space with obstacle avoidance
- Manipulate objects with precise positioning
- Process visual and sensor data
- Execute complex multi-step tasks
- Maintain safety protocols
Environment context: {command.spatial_context or "Standard indoor environment"}
Respond with:
1. Clear interpretation of the human command
2. Step-by-step execution plan with spatial reasoning
3. Specific robot actions with coordinates when applicable
4. Safety considerations and risk assessment
Format your response as a structured plan."""
        user_prompt = f"""Human command: "{command.natural_language_command}"
Please interpret this command and provide a detailed execution plan with spatial reasoning."""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        # Call real LLM API
        try:
            llm_response = await llm_provider_manager.call_llm(
                provider=provider,
                model=model_name,
                messages=messages,
                max_tokens=2048,
                temperature=0.3,  # Lower temperature for more consistent robot commands
            )
            llm_content = llm_response["content"]
            # Extract spatial commands from LLM response
            spatial_commands = llm_provider_manager.extract_spatial_commands(llm_content)
            # Convert spatial commands to robot actions
            actions = []
            spatial_reasoning = ""
            # Process navigation commands: numeric goals become x/y/z targets,
            # named goals are passed through as location strings.
            for nav_cmd in spatial_commands["navigation"]:
                if "coordinates" in nav_cmd:
                    coords = nav_cmd["coordinates"]
                    actions.append(
                        {
                            "type": "path_planning",
                            "destination": {
                                "x": coords[0],
                                "y": coords[1],
                                "z": coords[2] if len(coords) > 2 else 0.0,
                            },
                            "avoid_obstacles": True,
                        }
                    )
                    spatial_reasoning += f"Planning path to coordinates ({coords[0]}, {coords[1]}, {coords[2] if len(coords) > 2 else 0.0}). "
                else:
                    actions.append(
                        {
                            "type": "path_planning",
                            "destination": nav_cmd["location"],
                            "avoid_obstacles": True,
                        }
                    )
                    spatial_reasoning += f"Planning path to {nav_cmd['location']}. "
            # Process manipulation commands: each becomes a vision step plus a
            # grasp, with an optional placement step.
            for manip_cmd in spatial_commands["manipulation"]:
                if manip_cmd["action"] == "manipulate":
                    actions.extend(
                        [
                            {"type": "vision_processing", "target": manip_cmd["object"]},
                            {
                                "type": "arm_control",
                                "action": "grasp",
                                "target": manip_cmd["object"],
                            },
                        ]
                    )
                    if manip_cmd.get("destination"):
                        actions.append(
                            {"type": "manipulation", "destination": manip_cmd["destination"]}
                        )
                    spatial_reasoning += f"Planning manipulation of {manip_cmd['object']}"
            # Add velocity control for navigation
            if spatial_commands["navigation"]:
                actions.append({"type": "velocity_control", "linear": 0.3, "angular": 0.0})
            # Safety enhancements: baseline checks always apply; LLM-flagged
            # risk keywords add stricter ones.
            safety_checks = ["collision_detection", "velocity_limits", "emergency_stop"]
            if spatial_commands["safety"]:
                safety_checks.extend(
                    ["enhanced_obstacle_detection", "force_limiting", "human_presence_detection"]
                )
            # Calculate confidence based on LLM response quality
            confidence_score = min(
                0.95, 0.7 + (len(actions) * 0.1)
            )  # Base confidence + action complexity
            # Enhanced execution plan
            execution_plan = {
                "sequence": actions,
                "estimated_duration": len(actions) * 2.5,  # More realistic timing
                "risk_assessment": "low" if len(safety_checks) >= 3 else "medium",
                "llm_model_used": command.llm_model,
                "spatial_reasoning_applied": bool(
                    spatial_commands["navigation"] or spatial_commands["manipulation"]
                ),
                "api_call_duration": llm_response.get("usage", {}).get("total_tokens", 0)
                * 0.001,  # Rough estimate
            }
            response = LLMResponse(
                command_id=f"cmd_{int(time.time())}_{command.robot_id}",
                robot_id=command.robot_id,
                interpreted_command=command.natural_language_command,
                actions=actions,
                safety_checks=safety_checks,
                execution_plan=execution_plan,
                confidence_score=round(confidence_score, 2),
                spatial_reasoning=spatial_reasoning
                or "Spatial reasoning applied to command interpretation",
                llm_response=llm_content,
                usage_stats=llm_response.get("usage", {}),
            )
        except Exception as e:
            logger.error(f"LLM command execution failed: {e}")
            # Fallback to basic command interpretation so the caller always
            # gets a well-formed (low-confidence) response.
            response = LLMResponse(
                command_id=f"cmd_{int(time.time())}_{command.robot_id}",
                robot_id=command.robot_id,
                interpreted_command=command.natural_language_command,
                actions=[{"type": "command_interpretation", "action": "error_fallback"}],
                safety_checks=["emergency_stop"],
                execution_plan={
                    "sequence": [],
                    "estimated_duration": 0.0,
                    "risk_assessment": "high",
                },
                confidence_score=0.1,
                spatial_reasoning="LLM service unavailable, using fallback mode",
                llm_response=f"Error: {str(e)}",
                usage_stats={},
            )
        # Add to command history
        self.command_history.append(
            {
                "timestamp": datetime.now().isoformat(),
                "command": command.dict(),
                "response": response.dict(),
                "llm_model": command.llm_model,
                "provider": provider,
            }
        )
        return response
llm_state = LLMState()
# Background tasks
async def simulate_robot_physics():
    """Run a 50 Hz physics/sensor simulation loop over all online robots.

    Integrates velocity into position/heading, injects IMU noise, updates
    odometry/battery/uptime, and broadcasts state to clients at 10 Hz.
    Runs forever; errors are logged and the loop keeps going.
    """
    tick = 0
    while True:
        try:
            for robot_id, robot in robot_state.robots.items():
                if robot["status"] == "online":
                    dt = 0.02  # 50Hz simulation
                    # Update position based on velocity (scaled down for
                    # simulation). NOTE(review): only x is integrated — yaw is
                    # ignored for translation; confirm this is intentional.
                    robot["position"]["x"] += robot["velocity"]["linear"] * dt * 0.5
                    robot["rotation"]["yaw"] += robot["velocity"]["angular"] * dt * 10
                    # Simulate IMU data with some time-derived noise.
                    noise = 0.5 - time.time() % 1
                    robot["sensors"]["imu"]["ax"] = 0.0 + 0.1 * noise
                    robot["sensors"]["imu"]["ay"] = -9.81 + 0.05 * noise
                    robot["sensors"]["imu"]["gx"] = robot["velocity"]["angular"] + 0.01 * noise
                    # Update odometry
                    robot["sensors"]["odometry"]["speed"] = abs(robot["velocity"]["linear"])
                    robot["sensors"]["odometry"]["heading"] = robot["rotation"]["yaw"]
                    # Simulate battery drain (very slow), clamped at 0.
                    robot["battery"] = max(0, robot["battery"] - 0.001)
                    robot["uptime"] += dt
                    # Broadcast at 10Hz (every 5th 50Hz tick). Fix: the
                    # original condition `int(time.time() * 10) % 1 == 0` was
                    # always true, broadcasting at the full 50Hz.
                    if tick % 5 == 0:
                        await robot_state.broadcast_robot_update(robot_id)
        except Exception as e:
            logger.error(f"Physics simulation error: {e}")
        tick += 1
        await asyncio.sleep(0.02)  # 50Hz update rate
# WebSocket endpoint for real-time communication
@app.websocket("/ws/robotics")
async def robotics_websocket(websocket: WebSocket):
    """Real-time robot telemetry/command channel.

    Sends a full robot snapshot on connect, then processes incoming
    "command" messages until the client disconnects.
    """
    await websocket.accept()
    robot_state.active_connections.append(websocket)
    try:
        # Send initial state so the client has a full snapshot immediately.
        for robot_id, robot_data in robot_state.robots.items():
            await websocket.send_json(
                {
                    "type": "robot_update",
                    "robot_id": robot_id,
                    "data": robot_data,
                    "timestamp": datetime.now().isoformat(),
                }
            )
        while True:
            # Receive messages from client
            data = await websocket.receive_json()
            if data.get("type") == "command":
                await handle_robot_command(data, websocket)
            elif data.get("type") == "subscribe":
                # Subscription requests are accepted but not yet implemented.
                pass
    except WebSocketDisconnect:
        logger.info("WebSocket connection closed")
    finally:
        # Fix: always deregister the connection. The original only removed it
        # on WebSocketDisconnect, leaking dead sockets (and erroring on later
        # broadcasts) when any other exception ended the session.
        if websocket in robot_state.active_connections:
            robot_state.active_connections.remove(websocket)
# Socket.IO event handlers
@sio.event
async def connect(sid, environ):
    """Log new Socket.IO clients and confirm the connection to them."""
    logger.info(f"Socket.IO client connected: {sid}")
    await sio.emit("status", {"message": "Connected to Robotics MCP"})
@sio.event
async def disconnect(sid):
    """Log Socket.IO client disconnects."""
    logger.info(f"Socket.IO client disconnected: {sid}")
@sio.event
async def robot_command(sid, data):
    """Handle robot commands via Socket.IO (no direct WebSocket to reply on)."""
    await handle_robot_command(data, None)
async def handle_robot_command(data: dict[str, Any], websocket: WebSocket | None):
    """Validate and execute an incoming robot command, then acknowledge it.

    Supported commands: move, stop, reset, emergency_stop. Errors and acks go
    to the originating WebSocket (when present) and are broadcast via
    Socket.IO. Fix: the original only acknowledged "move" — stop/reset/
    emergency_stop succeeded silently; now every executed command is acked.
    """
    try:
        robot_id = data.get("robot_id")
        command = data.get("command")
        params = data.get("parameters", {})
        if not robot_id or robot_id not in robot_state.robots:
            await _send_command_error(f"Unknown robot: {robot_id}", websocket)
            return
        # Apply the state change for each supported command.
        if command == "move":
            robot_state.update_robot_state(
                robot_id,
                {
                    "velocity": {
                        "linear": params.get("linear", 0.0),
                        "angular": params.get("angular", 0.0),
                    }
                },
            )
        elif command == "stop":
            robot_state.update_robot_state(robot_id, {"velocity": {"linear": 0.0, "angular": 0.0}})
        elif command == "reset":
            robot_state.update_robot_state(
                robot_id,
                {
                    "position": {"x": 0.0, "y": 0.0, "z": 0.0},
                    "rotation": {"roll": 0.0, "pitch": 0.0, "yaw": 0.0},
                    "velocity": {"linear": 0.0, "angular": 0.0},
                },
            )
        elif command == "emergency_stop":
            robot_state.update_robot_state(
                robot_id, {"status": "emergency_stop", "velocity": {"linear": 0.0, "angular": 0.0}}
            )
        else:
            await _send_command_error(f"Unknown command: {command}", websocket)
            return
        # Record and acknowledge every successfully executed command.
        robot_state.add_command_to_history(robot_id, command, params)
        response = {
            "type": "command_ack",
            "robot_id": robot_id,
            "command": command,
            "status": "executed",
            "timestamp": datetime.now().isoformat(),
        }
        if websocket:
            await websocket.send_json(response)
        await sio.emit("command_ack", response)
    except Exception as e:
        logger.error(f"Command handling error: {e}")
        await _send_command_error(f"Command processing failed: {str(e)}", websocket)


async def _send_command_error(message: str, websocket: WebSocket | None):
    """Send an error payload to the originating WebSocket (if any) and broadcast it via Socket.IO."""
    error_msg = {"type": "error", "message": message}
    if websocket:
        await websocket.send_json(error_msg)
    await sio.emit("error", error_msg)
# REST API endpoints
@app.get("/")
async def root():
    """Describe the service and its primary entry points."""
    endpoints = {
        "websocket": "/ws/robotics",
        "socketio": "Socket.IO integration",
        "docs": "/docs",
        "robots": "/api/robots",
    }
    return {
        "message": "Robotics MCP WebApp API",
        "version": "1.0.0",
        "endpoints": endpoints,
    }
@app.get("/api/robots")
async def get_robots():
    """List every robot with aggregate total/online counts."""
    all_robots = list(robot_state.robots.values())
    online = [r for r in all_robots if r["status"] == "online"]
    return {
        "robots": all_robots,
        "total_count": len(all_robots),
        "online_count": len(online),
    }
@app.get("/api/robots/{robot_id}")
async def get_robot(robot_id: str):
    """Fetch the current state of one robot, or 404 when unknown."""
    robot = robot_state.robots.get(robot_id)
    if robot is None:
        raise HTTPException(status_code=404, detail="Robot not found")
    return robot
@app.post("/api/robots/{robot_id}/command")
async def send_robot_command(robot_id: str, command: RobotCommand):
    """Queue a command for a robot after checking the path/body ids match."""
    if robot_id != command.robot_id:
        raise HTTPException(status_code=400, detail="Robot ID mismatch")
    payload = {
        "robot_id": command.robot_id,
        "command": command.command,
        "parameters": command.parameters,
    }
    await handle_robot_command(payload, None)
    return {"status": "command_queued", "robot_id": robot_id, "command": command.command}
@app.get("/api/commands/history")
async def get_command_history(limit: int = 50):
    """Return the most recent `limit` robot commands and the overall count."""
    history = robot_state.command_history
    return {"commands": history[-limit:], "total_count": len(history)}
# LLM Management Endpoints
@app.get("/api/llm/models")
async def get_llm_models():
    """Get all available LLM models.

    NOTE(review): a second handler for GET /api/llm/models (list_llm_models)
    is registered later in this file; FastAPI routes to the first
    registration, so this one wins and the later handler is unreachable —
    confirm which is intended and deduplicate.
    """
    return {
        "models": llm_state.get_available_models(),
        "active_model": llm_state.active_model,
        "total_count": len(llm_state.models),
    }
@app.get("/api/llm/providers")
async def get_llm_providers():
    """List the LLM providers known to the management state."""
    providers = llm_state.get_providers()
    return {"providers": providers, "total_count": len(providers)}
@app.post("/api/llm/command")
async def execute_llm_command(command: LLMCommand):
    """Interpret a natural-language command via the active LLM and run it."""
    if command.robot_id not in robot_state.robots:
        raise HTTPException(status_code=404, detail="Robot not found")
    if not llm_state.active_model:
        raise HTTPException(status_code=400, detail="No active LLM model")
    llm_result = await llm_state.execute_llm_command(command)
    # Dispatch the interpreted actions to the robot.
    for step in llm_result.actions:
        step_type = step["type"]
        if step_type == "velocity_control":
            payload = {
                "robot_id": command.robot_id,
                "command": "move",
                "parameters": {
                    "linear": step.get("linear", 0.0),
                    "angular": step.get("angular", 0.0),
                },
            }
            await handle_robot_command(payload, None)
        elif step_type == "path_planning":
            # Path planning is simulated for now; just log the goal.
            logger.info(f"Planning path to {step.get('destination', 'unknown')}")
    return llm_result
@app.get("/api/llm/history")
async def get_llm_command_history(limit: int = 20):
    """Return the most recent `limit` LLM command executions."""
    history = llm_state.command_history
    return {"commands": history[-limit:], "total_count": len(history)}
@app.post("/api/llm/providers/{provider_name}/connect")
async def connect_llm_provider(provider_name: str, api_key: str | None = None):
    """Mark an LLM provider as connected (simulated handshake).

    NOTE(review): `api_key` arrives as a query parameter here — consider
    moving it into the request body or a header to keep it out of access logs.
    """
    if provider_name not in llm_state.providers:
        raise HTTPException(status_code=404, detail="Provider not found")
    provider = llm_state.providers[provider_name]
    await asyncio.sleep(1.0)  # simulate connection latency
    provider["status"] = "connected"
    if api_key:
        provider["api_key"] = api_key
    return {
        "status": "connected",
        "provider": provider_name,
        "endpoint": provider["endpoint"],
    }
# Camera API Endpoints
@app.get("/api/cameras")
async def get_cameras():
    """List all cameras known to the camera manager."""
    camera_list = camera_manager.get_all_cameras()
    return {"cameras": camera_list, "total": len(camera_list)}
@app.get("/api/cameras/{camera_name}")
async def get_camera_status(camera_name: str):
    """Return status for one camera, or 404 when unknown."""
    camera = camera_manager.get_camera(camera_name)
    if camera:
        return camera.get_status()
    raise HTTPException(status_code=404, detail="Camera not found")
@app.get("/api/cameras/{camera_name}/frame")
async def get_camera_frame(camera_name: str, format: str = "base64", quality: int = 80):
    """Return one camera frame, as base64-in-JSON or raw JPEG bytes.

    `format` keeps its name (it is the public query parameter) even though it
    shadows the builtin.
    """
    frame = camera_manager.get_frame(camera_name, format, quality)
    if frame is None:
        raise HTTPException(status_code=404, detail="Camera not available or no frame captured")
    if format == "jpeg":
        from fastapi.responses import Response

        return Response(content=frame, media_type="image/jpeg")
    if format == "base64":
        return {"frame": frame, "format": "jpeg", "quality": quality, "timestamp": time.time()}
    raise HTTPException(status_code=400, detail="Unsupported format")
@app.post("/api/cameras/{camera_name}/reconnect")
async def reconnect_camera(camera_name: str):
    """Tear down and re-establish a camera connection."""
    camera = camera_manager.get_camera(camera_name)
    if not camera:
        raise HTTPException(status_code=404, detail="Camera not found")
    # Drop the old connection, pause briefly, then reconnect.
    camera.disconnect()
    await asyncio.sleep(0.5)
    if not camera.connect():
        raise HTTPException(status_code=500, detail="Failed to reconnect camera")
    camera.start_capture()
    return {"status": "success", "message": f"Camera {camera_name} reconnected"}
@app.get("/api/hardware/status")
async def get_hardware_status():
    """Summarize hardware health (currently cameras only)."""
    cams = camera_manager.get_all_cameras()
    return {
        "cameras": cams,
        "camera_status": "operational" if cams else "no_cameras",
        "total_cameras": len(cams),
        "timestamp": time.time(),
    }
@app.get("/api/health")
async def health_check():
    """Liveness/summary endpoint for monitoring."""
    online = sum(1 for r in robot_state.robots.values() if r["status"] == "online")
    loaded = sum(1 for m in llm_state.models.values() if m["status"] == "loaded")
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_connections": len(robot_state.active_connections),
        "robots_online": online,
        "llm_models_loaded": loaded,
        "active_llm_model": llm_state.active_model,
    }
# MCP Server Integration Endpoints
@app.get("/api/mcp/servers")
async def get_mcp_servers():
    """Report status of every registered MCP server."""
    servers = mcp_client.get_server_status()
    return {"servers": servers, "timestamp": datetime.now().isoformat()}
@app.get("/api/mcp/servers/{server_name}/health")
async def check_mcp_server_health(server_name: str):
    """Ping one MCP server and report whether it responded."""
    healthy = await mcp_client.check_health(server_name)
    return {
        "server": server_name,
        "healthy": healthy,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/api/mcp/servers/{server_name}/tools")
async def list_mcp_tools(server_name: str):
    """Enumerate the tools exposed by one MCP server."""
    available = await mcp_client.list_tools(server_name)
    return {
        "server": server_name,
        "tools": available,
        "count": len(available),
        "timestamp": datetime.now().isoformat(),
    }
@app.post("/api/mcp/servers/{server_name}/tools/{tool_name}")
async def call_mcp_tool(server_name: str, tool_name: str, arguments: dict[str, Any] | None = None):
    """Invoke a named tool on an MCP server, surfacing failures as HTTP 500."""
    try:
        return await mcp_client.call_tool(server_name, tool_name, arguments)
    except Exception as e:
        logger.error(f"Error calling MCP tool: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
# LLM Management Endpoints
class ChatMessage(BaseModel):
    """One chat turn."""

    role: str  # presumably "system" | "user" | "assistant" — confirm with llm_service
    content: str
class ChatRequest(BaseModel):
    """Request body for POST /api/llm/chat."""

    model: str
    messages: list[ChatMessage]
    temperature: float | None = 0.7
    max_tokens: int | None = None
    personality: str | None = None  # optional personality key (see /api/llm/personalities)
class GenerateRequest(BaseModel):
    """Request body for POST /api/llm/generate."""

    model: str
    prompt: str
    temperature: float | None = 0.7
    max_tokens: int | None = None
@app.get("/api/llm/models")
async def list_llm_models(provider: str | None = None):
    """List all available LLM models.

    NOTE(review): GET /api/llm/models is already registered earlier in this
    file (get_llm_models); FastAPI routes to the first registration, so this
    handler is unreachable as-is — confirm and deduplicate.
    """
    return await llm_service.list_models(provider)
@app.get("/api/llm/models/ollama")
async def list_ollama_models():
    """Enumerate models installed in the local Ollama instance."""
    installed = await llm_service.list_ollama_models()
    return installed
@app.get("/api/llm/models/lmstudio")
async def list_lmstudio_models():
    """Enumerate models available in the local LM Studio instance."""
    available = await llm_service.list_lmstudio_models()
    return available
@app.post("/api/llm/models/{model_id}/load")
async def load_llm_model(model_id: str, provider: str = "ollama"):
    """Load a model into the given provider for inference."""
    result = await llm_service.load_model(model_id, provider)
    return result
@app.post("/api/llm/models/{model_id}/unload")
async def unload_llm_model(model_id: str, provider: str = "ollama"):
    """Unload a model from the given provider."""
    result = await llm_service.unload_model(model_id, provider)
    return result
@app.post("/api/llm/models/{model_id}/pull")
async def pull_llm_model(model_id: str, provider: str = "ollama"):
    """Download (pull) a model via the given provider."""
    result = await llm_service.pull_model(model_id, provider)
    return result
@app.get("/api/llm/models/active")
async def get_active_models():
    """List models currently loaded for inference."""
    return {
        "success": True,
        "timestamp": datetime.now().isoformat(),
        "models": llm_service.get_active_models(),
    }
@app.post("/api/llm/generate")
async def generate_text(request: GenerateRequest):
"""Generate text from a prompt"""
return await llm_service.generate_text(
model=request.model,
prompt=request.prompt,
temperature=request.temperature,
max_tokens=request.max_tokens,
)
@app.post("/api/llm/chat")
async def chat_completion(request: ChatRequest):
"""Generate chat completion"""
messages = [{"role": msg.role, "content": msg.content} for msg in request.messages]
return await llm_service.chat_completion(
model=request.model,
messages=messages,
temperature=request.temperature,
max_tokens=request.max_tokens,
personality=request.personality,
)
@app.get("/api/llm/health")
async def get_llm_health():
"""Get LLM service health"""
return await llm_service.get_health()
@app.get("/api/llm/system-info")
async def get_llm_system_info():
"""Get LLM system information"""
return await llm_service.get_system_info()
@app.get("/api/llm/personalities")
async def get_personalities():
"""Get available chat personalities"""
return {
"success": True,
"personalities": llm_service.get_personalities(),
"timestamp": datetime.now().isoformat(),
}
class PersonalityRequest(BaseModel):
    """Body for POST /api/llm/personalities (register a custom personality)."""

    # Unique lookup key for the personality.
    key: str
    # Human-readable display name.
    name: str
    # System prompt injected for chats using this personality.
    system_prompt: str
    # Default sampling temperature for this personality.
    temperature: float = 0.7
    # Default response length cap for this personality.
    max_tokens: int = 2000
@app.post("/api/llm/personalities")
async def add_personality(request: PersonalityRequest):
"""Add a custom personality"""
llm_service.add_personality(
key=request.key,
name=request.name,
system_prompt=request.system_prompt,
temperature=request.temperature,
max_tokens=request.max_tokens,
)
return {
"success": True,
"message": f"Personality '{request.name}' added successfully",
"timestamp": datetime.now().isoformat(),
}
# Workflow Management Endpoints
class WorkflowCreateRequest(BaseModel):
    """Body for POST /api/workflows and /api/workflows/import."""

    # Full workflow definition as an opaque dict; schema owned by workflow_service.
    workflow_data: dict[str, Any]
class WorkflowUpdateRequest(BaseModel):
    """Body for PUT /api/workflows/{workflow_id}."""

    # Replacement workflow definition; schema owned by workflow_service.
    workflow_data: dict[str, Any]
class WorkflowExecuteRequest(BaseModel):
    """Body for POST /api/workflows/{workflow_id}/execute."""

    # Initial variable bindings for the run; treated as {} when omitted.
    variables: dict[str, Any] | None = None
    # Bug fix: the execute_workflow endpoint reads request.debug_mode, which
    # raised AttributeError because this field was missing. Defaulting to
    # False keeps existing clients fully backward compatible.
    debug_mode: bool = False
@app.post("/api/workflows")
async def create_workflow(request: WorkflowCreateRequest):
"""Create a new workflow"""
try:
workflow = await workflow_service.create_workflow(request.workflow_data)
return {"success": True, "workflow": workflow}
except Exception as e:
logger.error(f"Failed to create workflow: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/workflows/{workflow_id}")
async def get_workflow(workflow_id: str):
"""Get workflow by ID"""
try:
workflow = await workflow_service.get_workflow(workflow_id)
return {"success": True, "workflow": workflow}
except Exception as e:
logger.error(f"Failed to get workflow: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.put("/api/workflows/{workflow_id}")
async def update_workflow(workflow_id: str, request: WorkflowUpdateRequest):
"""Update workflow"""
try:
workflow = await workflow_service.update_workflow(workflow_id, request.workflow_data)
return {"success": True, "workflow": workflow}
except Exception as e:
logger.error(f"Failed to update workflow: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.delete("/api/workflows/{workflow_id}")
async def delete_workflow(workflow_id: str):
"""Delete workflow"""
try:
await workflow_service.delete_workflow(workflow_id)
return {"success": True, "message": "Workflow deleted"}
except Exception as e:
logger.error(f"Failed to delete workflow: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/workflows")
async def list_workflows(
category: str | None = None,
tags: str | None = None,
search: str | None = None,
):
"""List workflows with optional filtering"""
try:
tag_list = tags.split(",") if tags else None
workflows = await workflow_service.list_workflows(
category=category, tags=tag_list, search=search
)
return {"success": True, "workflows": workflows}
except Exception as e:
logger.error(f"Failed to list workflows: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/workflows/{workflow_id}/execute")
async def execute_workflow(workflow_id: str, request: WorkflowExecuteRequest):
"""Execute a workflow"""
try:
result = await workflow_service.execute_workflow(
workflow_id, request.variables or {}, debug_mode=request.debug_mode
)
return {"success": True, **result}
except Exception as e:
logger.error(f"Failed to execute workflow: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/workflows/executions/{execution_id}/pause")
async def pause_execution(execution_id: str):
"""Pause workflow execution"""
try:
await workflow_service.pause_execution(execution_id)
return {"success": True, "message": "Execution paused"}
except Exception as e:
logger.error(f"Failed to pause execution: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/workflows/executions/{execution_id}/resume")
async def resume_execution(execution_id: str):
"""Resume workflow execution"""
try:
await workflow_service.resume_execution(execution_id)
return {"success": True, "message": "Execution resumed"}
except Exception as e:
logger.error(f"Failed to resume execution: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/workflows/executions/{execution_id}/step")
async def step_execution(execution_id: str):
"""Step to next instruction in debug mode"""
try:
await workflow_service.step_execution(execution_id)
return {"success": True, "message": "Stepped to next instruction"}
except Exception as e:
logger.error(f"Failed to step execution: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/workflows/executions/{execution_id}/continue")
async def continue_execution(execution_id: str):
"""Continue execution from breakpoint (run until next breakpoint or end)"""
try:
await workflow_service.continue_execution(execution_id)
return {"success": True, "message": "Execution continued"}
except Exception as e:
logger.error(f"Failed to continue execution: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/workflows/executions/{execution_id}")
async def get_execution_status(execution_id: str):
"""Get workflow execution status"""
try:
execution = await workflow_service.get_execution_status(execution_id)
return {"success": True, "execution": execution}
except Exception as e:
logger.error(f"Failed to get execution status: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/workflows/templates")
async def get_workflow_templates():
"""Get workflow templates"""
try:
templates = await workflow_service.get_templates()
return {"success": True, "templates": templates}
except Exception as e:
logger.error(f"Failed to get templates: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/workflows/import")
async def import_workflow(request: WorkflowCreateRequest):
"""Import workflow from JSON"""
try:
workflow = await workflow_service.import_workflow(request.workflow_data)
return {"success": True, "workflow": workflow}
except Exception as e:
logger.error(f"Failed to import workflow: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/workflows/{workflow_id}/export")
async def export_workflow(workflow_id: str):
"""Export workflow to JSON"""
try:
workflow_data = await workflow_service.export_workflow(workflow_id)
return {"success": True, **workflow_data}
except Exception as e:
logger.error(f"Failed to export workflow: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
# Dreame Robot API Models
class DreameCommand(BaseModel):
    """Body for POST /api/dreame/{robot_id}/command."""

    # The control action to forward to the robotics MCP server.
    action: str = Field(
        ..., description="Action to perform: start_cleaning, stop_cleaning, return_to_dock, etc."
    )
    # Idiom fix: use default_factory instead of a shared mutable default {}.
    parameters: dict[str, Any] | None = Field(
        default_factory=dict, description="Action-specific parameters"
    )
class DreameMapRequest(BaseModel):
    """Options for requesting a Dreame map.

    NOTE(review): no endpoint in this file references this model — the map
    endpoint takes plain query parameters instead; confirm whether this is
    dead code or used elsewhere.
    """

    # Whether to include cleaning history alongside the map.
    include_history: bool = Field(default=False, description="Include cleaning history in map data")
    # Whether to request a high-resolution map.
    high_resolution: bool = Field(default=False, description="Request high-resolution map")
# Dreame Robot API Endpoints
@app.post("/api/dreame/{robot_id}/command")
async def dreame_command(robot_id: str, command: DreameCommand):
"""Execute commands on Dreame vacuum robots via Robotics MCP server REST API."""
try:
robotics_mcp_url = "http://localhost:12230"
payload = {"action": command.action, **(command.parameters or {})}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{robotics_mcp_url}/api/v1/robots/{robot_id}/control", json=payload
) as response:
try:
result = await response.json()
except Exception:
result = {}
is_ok = (
response.status == 200
and result.get("success", True)
and result.get("status") != "error"
)
if is_ok:
return {
"success": True,
"message": f"Dreame command '{command.action}' executed successfully",
"data": result,
"robot_id": robot_id,
"timestamp": datetime.now().isoformat(),
}
else:
error_text = result.get("message", result.get("error", ""))
return {
"success": False,
"message": f"Dreame command failed: {error_text or f'HTTP {response.status}'}",
"robot_id": robot_id,
"error": error_text,
}
except Exception as e:
logger.error(f"Dreame command failed for robot {robot_id}: {e}")
return {
"success": False,
"message": f"Dreame command failed: {str(e)}",
"robot_id": robot_id,
"error": str(e),
}
@app.get("/api/dreame/{robot_id}/status")
async def dreame_status(robot_id: str):
"""Get current status of Dreame vacuum robot via control action."""
try:
robotics_mcp_url = "http://localhost:12230"
async with aiohttp.ClientSession() as session:
async with session.post(
f"{robotics_mcp_url}/api/v1/robots/{robot_id}/control",
json={"action": "get_status"},
) as response:
if response.status == 200:
result = await response.json()
if not result.get("success", True):
return {
"success": False,
"message": result.get("error", "Dreame status failed"),
"robot_id": robot_id,
"error": result.get("error"),
}
data = result.get("data", result)
status = data.get("status", data) if isinstance(data, dict) else data
s = status if isinstance(status, dict) else {}
return {
"success": True,
"message": "Dreame status retrieved successfully",
"data": {
"battery": s.get("battery_level", s.get("battery", 0)),
"position": s.get("position", {"x": 0, "y": 0}),
"status": s.get("device_status", s.get("status", "unknown")),
**s,
},
"robot_id": robot_id,
}
else:
error_text = await response.text()
return {
"success": False,
"message": f"Failed to get Dreame status: HTTP {response.status}",
"robot_id": robot_id,
"error": error_text,
}
except Exception as e:
return {
"success": False,
"message": f"Failed to get Dreame status: {str(e)}",
"robot_id": robot_id,
"error": str(e),
}
@app.get("/api/dreame/{robot_id}/map")
async def dreame_map(robot_id: str, include_history: bool = False, high_resolution: bool = False):
"""Get LIDAR map data from Dreame vacuum robot via control action."""
try:
robotics_mcp_url = "http://localhost:12230"
async with aiohttp.ClientSession() as session:
async with session.post(
f"{robotics_mcp_url}/api/v1/robots/{robot_id}/control", json={"action": "get_map"}
) as response:
if response.status == 200:
result = await response.json()
data = result.get("data", result)
map_data = data.get("map", data) if isinstance(data, dict) else data
return {
"success": True,
"message": "Dreame map retrieved successfully",
"data": map_data if isinstance(map_data, dict) else {},
"robot_id": robot_id,
"map_format": "dreame_lidar_slam",
"scale_factor": 0.001,
"timestamp": datetime.now().isoformat(),
}
else:
error_text = await response.text()
return {
"success": False,
"message": f"Failed to get Dreame map: HTTP {response.status}",
"robot_id": robot_id,
"error": error_text,
}
except Exception as e:
return {
"success": False,
"message": f"Failed to get Dreame map: {str(e)}",
"robot_id": robot_id,
"error": str(e),
}
@app.post("/api/dreame/{robot_id}/settings")
async def dreame_settings(robot_id: str, settings: dict[str, Any]):
"""Update Dreame robot settings (suction, water, humidity, etc.)."""
try:
result = await mcp_client.call_tool(
"robotics",
"robot_control",
{"robot_id": robot_id, "action": "apply_settings", **settings},
)
return {
"success": True,
"message": "Dreame settings updated successfully",
"data": result,
"robot_id": robot_id,
"settings_applied": settings,
}
except Exception as e:
return {
"success": False,
"message": f"Failed to update Dreame settings: {str(e)}",
"robot_id": robot_id,
"error": str(e),
}
@app.post("/api/dreame/{robot_id}/zone")
async def dreame_zone_clean(robot_id: str, zones: list[list[int]]):
"""Clean specific zones with Dreame vacuum."""
try:
result = await mcp_client.call_tool(
"robotics",
"robot_control",
{"robot_id": robot_id, "action": "clean_zone", "zones": zones},
)
return {
"success": True,
"message": f"Dreame zone cleaning started for {len(zones)} zones",
"data": result,
"robot_id": robot_id,
"zones": zones,
}
except Exception as e:
return {
"success": False,
"message": f"Failed to start Dreame zone cleaning: {str(e)}",
"robot_id": robot_id,
"error": str(e),
}
@app.post("/api/dreame/{robot_id}/room/{room_id}")
async def dreame_room_clean(robot_id: str, room_id: int):
"""Clean specific room with Dreame vacuum."""
try:
result = await mcp_client.call_tool(
"robotics",
"robot_control",
{"robot_id": robot_id, "action": "clean_room", "room_id": room_id},
)
return {
"success": True,
"message": f"Dreame room cleaning started for room {room_id}",
"data": result,
"robot_id": robot_id,
"room_id": room_id,
}
except Exception as e:
return {
"success": False,
"message": f"Failed to start Dreame room cleaning: {str(e)}",
"robot_id": robot_id,
"error": str(e),
}
@app.post("/api/dreame/{robot_id}/spot")
async def dreame_spot_clean(robot_id: str, x: int, y: int):
"""Clean specific spot with Dreame vacuum."""
try:
result = await mcp_client.call_tool(
"robotics",
"robot_control",
{"robot_id": robot_id, "action": "clean_spot", "spot_x": x, "spot_y": y},
)
return {
"success": True,
"message": f"Dreame spot cleaning started at ({x}, {y})",
"data": result,
"robot_id": robot_id,
"coordinates": {"x": x, "y": y},
}
except Exception as e:
return {
"success": False,
"message": f"Failed to start Dreame spot cleaning: {str(e)}",
"robot_id": robot_id,
"error": str(e),
}
# Application Launcher Endpoints
class AppLaunchRequest(BaseModel):
    """Body for POST /api/apps/{app_id}/launch."""

    # Virtual desktop to launch on; None lets the launcher decide.
    desktop_number: int | None = None
    # Project directory to open the app with, if applicable.
    project_path: str | None = None
    # Launch the app fullscreen.
    fullscreen: bool = False
    # Target monitor index; None lets the launcher decide.
    monitor: int | None = None
@app.post("/api/apps/{app_id}/launch")
async def launch_app(app_id: str, request: AppLaunchRequest):
"""Launch an application"""
try:
result = await app_launcher.launch_app(
app_id=app_id,
desktop_number=request.desktop_number,
project_path=request.project_path,
fullscreen=request.fullscreen,
monitor=request.monitor,
)
return result
except Exception as e:
logger.error(f"Failed to launch app {app_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/apps/{app_id}/stop")
async def stop_app(app_id: str):
"""Stop a running application"""
try:
result = await app_launcher.stop_app(app_id)
return result
except Exception as e:
logger.error(f"Failed to stop app {app_id}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/api/apps/status")
async def get_apps_status():
"""Get status of all applications"""
try:
apps = app_launcher.get_running_apps()
return {"success": True, "applications": apps}
except Exception as e:
logger.error(f"Failed to get app status: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e)) from e
# Start background tasks when app starts
@app.on_event("startup")
async def startup_event():
"""Initialize background tasks on startup"""
asyncio.create_task(simulate_robot_physics())
# Load physical robots from robotics MCP server
try:
await robot_state.load_physical_robots()
logger.info("Physical robots loaded from robotics MCP server")
except Exception as e:
logger.warning(f"Failed to load physical robots: {e}")
# Add mock Dreame robot for demo purposes if no physical robots found
if not robot_state.robots:
try:
mock_dreame = {
"robot_id": "dreame_01",
"name": "Dreame D20 Pro (Demo)",
"type": "dreame",
"status": "idle",
"connected": False, # Mock robot, not physically connected
"position": {"x": 0.0, "y": 0.0, "z": 0.0},
"rotation": {"yaw": 0.0},
"velocity": {"linear": 0.0, "angular": 0.0},
"battery": 85,
"sensors": {
"imu": {"ax": 0.0, "ay": -9.81, "az": 0.0, "gx": 0.0, "gy": 0.0, "gz": 0.0},
"odometry": {"distance": 0.0, "speed": 0.0, "heading": 0.0},
"camera": {"streaming": False, "fps": 0, "resolution": "0x0"},
"lidar": {"status": "active", "range": 8.0, "scan_rate": 5.0},
"cliff": [True, True, True, True],
"bumper": False,
"wheel_drop": False,
},
"last_update": datetime.now().isoformat(),
"uptime": 0,
"map_available": True,
}
robot_state.robots["dreame_01"] = mock_dreame
logger.info("Mock Dreame robot added for demo purposes")
except Exception as e:
logger.warning(f"Failed to add mock Dreame robot: {e}")
# Initialize LLM provider clients
try:
await llm_provider_manager.initialize_clients()
logger.info("LLM provider clients initialized")
except Exception as e:
logger.warning(f"Failed to initialize LLM providers: {e}")
# Initialize camera system
try:
await initialize_cameras()
logger.info("Camera system initialized")
except Exception as e:
logger.warning(f"Failed to initialize camera system: {e}")
logger.info("Robotics WebApp API server started")
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up resources on shutdown"""
try:
await cleanup_cameras()
logger.info("Camera system cleaned up")
except Exception as e:
logger.error(f"Error during camera cleanup: {e}")
# Cleanup MCP client
try:
await mcp_client.close()
logger.info("MCP client cleaned up")
except Exception as e:
logger.error(f"Error during MCP client cleanup: {e}")
logger.info("Robotics MCP API server shut down")
if __name__ == "__main__":
# Run with uvicorn
uvicorn.run("main:socket_app", host="0.0.0.0", port=8354, reload=True, log_level="info")