# Architecture Overview
## Robotics MCP WebApp
This document describes the technical architecture, design decisions, and system components of the Robotics MCP WebApp.
---
## Table of Contents
- [System Overview](#system-overview)
- [Technology Stack](#technology-stack)
- [Frontend Architecture](#frontend-architecture)
- [Backend Architecture](#backend-architecture)
- [Real-time Communication](#real-time-communication)
- [Data Architecture](#data-architecture)
- [Security Architecture](#security-architecture)
- [Deployment Architecture](#deployment-architecture)
- [Performance Considerations](#performance-considerations)
- [Scalability Design](#scalability-design)
---
## System Overview
The Robotics MCP WebApp is a distributed, real-time robotics control platform designed for both virtual and physical robot operation. The system follows a microservices-inspired architecture with clear separation of concerns.
### Core Principles
- **Real-time Performance**: <50ms end-to-end latency for control operations
- **Scalability**: Support for 10,000+ concurrent users and robots
- **Reliability**: 99.9% uptime with automatic failover
- **Security**: End-to-end encryption and comprehensive access control
- **Modularity**: Clean separation of frontend, backend, and integration layers
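
As a rough sanity check, the targets above can be turned into concrete budgets. The sketch below assumes a 365-day year and borrows the 50 Hz physics tick used by the background simulation loop later in this document; the function names are illustrative and not part of the codebase.

```python
HOURS_PER_YEAR = 365 * 24  # assumption: non-leap year


def allowed_downtime_hours(availability: float, period_hours: float = HOURS_PER_YEAR) -> float:
    """Downtime permitted over a period by an availability target (0.999 means 99.9%)."""
    return (1.0 - availability) * period_hours


def remaining_latency_ms(budget_ms: float, tick_hz: float) -> float:
    """Latency budget left after waiting up to one full simulation tick for a state update."""
    return budget_ms - 1000.0 / tick_hz


print(round(allowed_downtime_hours(0.999), 2))  # 8.76 hours per year
print(remaining_latency_ms(50.0, 50.0))         # 30.0 ms left for network and processing
```

Under these assumptions, a 99.9% target leaves under nine hours of downtime per year, and a state change that waits for the next 20 ms tick has about 30 ms left for everything else.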
### System Components
```
┌─────────────────────────────────────────────────────────────┐
│                    User Interface Layer                     │
│  ┌───────────────────────────────────────────────────────┐  │
│  │             Next.js Frontend (Port 3000)              │  │
│  │       ┌─────────┬─────────┬─────────┬─────────┐       │  │
│  │       │Dashboard│VBot Ctrl│Sensors  │Docs     │       │  │
│  │       └─────────┴─────────┴─────────┴─────────┘       │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                               │
                               │ WebSocket/HTTP
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                      Application Layer                      │
│  ┌───────────────────────────────────────────────────────┐  │
│  │              FastAPI Backend (Port 8354)              │  │
│  │       ┌─────────┬─────────┬─────────┬─────────┐       │  │
│  │       │WebSocket│REST API │Auth     │Physics  │       │  │
│  │       │Server   │         │Service  │Engine   │       │  │
│  │       └─────────┴─────────┴─────────┴─────────┘       │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
                               │
                               │ ROS Bridge/OSC
                               ▼
┌─────────────────────────────────────────────────────────────┐
│                 Robotics Integration Layer                  │
│     ┌─────────┬─────────┬─────────┬─────────┬─────────┐     │
│     │Virtual  │Physical │World    │VRChat   │Resonite │     │
│     │Robots   │Robots   │Labs     │OSC      │ProtoFlux│     │
│     └─────────┴─────────┴─────────┴─────────┴─────────┘     │
└─────────────────────────────────────────────────────────────┘
```
---
## Technology Stack
### Frontend Technologies
| Component | Technology | Version | Purpose |
|-----------|------------|---------|---------|
| **Framework** | Next.js | 16.0.10 | React framework with App Router |
| **Language** | TypeScript | 5.3+ | Type-safe JavaScript |
| **Styling** | Tailwind CSS | 4.0+ | Utility-first CSS framework |
| **UI Components** | shadcn/ui | Latest | Professional component library |
| **Charts** | Recharts | Latest | Data visualization |
| **3D Graphics** | Three.js + React Three Fiber | Latest | 3D rendering and robotics visualization |
| **Real-time** | WebSocket API | Native | Bidirectional communication |
| **State Management** | React Hooks + Context | Built-in | Component state management |
### Backend Technologies
| Component | Technology | Version | Purpose |
|-----------|------------|---------|---------|
| **Framework** | FastAPI | 0.124.4 | High-performance async web framework |
| **Language** | Python | 3.10+ | Backend programming language |
| **ASGI Server** | Uvicorn | 0.38.0 | Production ASGI server |
| **WebSocket** | WebSocket Protocol | RFC 6455 | Real-time bidirectional communication |
| **Real-time** | Socket.IO | 5.15.1 | Enhanced real-time features |
| **Data Validation** | Pydantic | 2.12.5 | Data validation and serialization |
| **Authentication** | JWT | Latest | Token-based authentication |
### Robotics Integration
| Component | Technology | Purpose |
|-----------|------------|---------|
| **ROS Bridge** | rosbridge_suite | ROS-Web communication |
| **OSC Protocol** | python-osc | VR platform communication |
| **World Labs** | Marble API | AI environment generation |
| **Unity Integration** | Unity3D Engine | Virtual robot physics |
| **VRChat SDK** | Udon Scripting | VR robot control |
| **Hardware** | Serial/USB protocols | Physical robot communication |
### DevOps & Deployment
| Component | Technology | Purpose |
|-----------|------------|---------|
| **Containerization** | Docker | Application containerization |
| **Orchestration** | Kubernetes | Container orchestration |
| **CI/CD** | GitHub Actions | Automated testing and deployment |
| **Monitoring** | Prometheus + Grafana | System monitoring |
| **Logging** | ELK Stack | Centralized logging |
| **Load Balancing** | NGINX | HTTP load balancing |
---
## Frontend Architecture
### Directory Structure
```
src/
├── app/                    # Next.js App Router
│   ├── (dashboard)/        # Route groups for organization
│   ├── api/                # API routes (server-side)
│   ├── globals.css         # Global styles
│   └── layout.tsx          # Root layout
├── components/             # React components
│   ├── ui/                 # Reusable UI components (shadcn)
│   ├── layout/             # Layout-specific components
│   ├── features/           # Feature-specific components
│   ├── providers/          # React context providers
│   └── hooks/              # Custom React hooks
├── lib/                    # Utilities and configurations
│   ├── utils.ts            # General utilities
│   ├── api.ts              # API client functions
│   ├── websocket.ts        # WebSocket client
│   ├── validations.ts      # Form validation schemas
│   └── constants.ts        # Application constants
└── types/                  # TypeScript definitions
    ├── api.ts              # API response types
    ├── components.ts       # Component prop types
    ├── robotics.ts         # Robotics domain types
    └── websocket.ts        # WebSocket message types
```
### Component Architecture
#### Atomic Design Pattern
```
Atoms (UI Primitives)
└── Button, Input, Badge, Avatar, etc.
Molecules (Composite Components)
└── ControlPanel, SensorCard, ChartContainer, etc.
Organisms (Complex Components)
└── VBotControl, SensorDashboard, Navigation, etc.
Templates (Page Layouts)
└── DashboardLayout, ControlLayout, DocsLayout, etc.
Pages (Complete Views)
└── Dashboard, VBotControl, Sensors, Documentation, etc.
```
#### State Management Strategy
```typescript
import { createContext, useEffect, useState } from 'react';
// Assumed module paths, based on the directory structure above
import type { ConnectionStatus, Robot, RobotData, User } from '@/types/robotics';
import { RoboticsWebSocket } from '@/lib/websocket';

// Context-based state management for global state
interface AppState {
  user: User | null;
  robots: Robot[];
  connections: ConnectionStatus;
  theme: 'light' | 'dark' | 'system';
}

const AppContext = createContext<AppState | undefined>(undefined);

// Custom hooks for domain-specific state
function useRobotState(robotId: string) {
  const [robotData, setRobotData] = useState<RobotData | null>(null);
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    // Subscribe to updates and keep only those for this robot
    const ws = new RoboticsWebSocket((data) => {
      if (data.robot_id === robotId) {
        setRobotData(data.data);
      }
    }, setIsConnected);

    // Close the socket on unmount or when robotId changes
    return () => ws.disconnect();
  }, [robotId]);

  return { robotData, isConnected };
}

// Component-level state with useReducer for complex state
interface ControlState {
  linearVelocity: number;
  angularVelocity: number;
  isEmergencyStop: boolean;
  lastCommand: string | null;
}

// Actions accepted by controlReducer
type ControlAction =
  | { type: 'SET_VELOCITY'; payload: Partial<Pick<ControlState, 'linearVelocity' | 'angularVelocity'>> }
  | { type: 'EMERGENCY_STOP' }
  | { type: 'COMMAND_EXECUTED'; command: string };

function controlReducer(state: ControlState, action: ControlAction): ControlState {
  switch (action.type) {
    case 'SET_VELOCITY':
      return { ...state, ...action.payload };
    case 'EMERGENCY_STOP':
      // Zero both velocities and latch the stop flag
      return { ...state, isEmergencyStop: true, linearVelocity: 0, angularVelocity: 0 };
    case 'COMMAND_EXECUTED':
      return { ...state, lastCommand: action.command };
    default:
      return state;
  }
}
```
### Performance Optimizations
#### Code Splitting
```typescript
import dynamic from 'next/dynamic';
import { Skeleton } from '@/components/ui/skeleton';

// Automatic code splitting with Next.js App Router:
// each route is code-split without extra configuration

// Manual code splitting for heavy components
// (`ssr: false` is only allowed inside a Client Component)
const SensorDashboard = dynamic(() => import('@/components/SensorDashboard'), {
  loading: () => <Skeleton className="h-96 w-full" />,
  ssr: false // Disable SSR for WebSocket-dependent components
});
```
#### Memoization Strategy
```typescript
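import { memo, useCallback, useMemo } from 'react';
import { LineChart } from 'recharts';

// The two hooks below must be called inside a component body

// Memoize expensive calculations
const sensorData = useMemo(() => {
  return processSensorData(rawData);
}, [rawData]);

// Memoize event handlers
const handleVelocityChange = useCallback((value: number[]) => {
  const [linear, angular] = value;
  sendCommand('move', { linear, angular });
}, [sendCommand]);

// Memoize components so they re-render only when their props change
const SensorChart = memo(function SensorChart(
  { data, type }: { data: Record<string, number>[]; type: string }
) {
  return <LineChart data={data} />;
});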
```
---
## Backend Architecture
### Directory Structure
```
backend/
├── app/                          # FastAPI application
│   ├── main.py                   # FastAPI app instance
│   ├── config.py                 # Configuration management
│   ├── database.py               # Database connection
│   └── models/                   # Pydantic models
├── routes/                       # API route handlers
│   ├── robots.py                 # Robot control endpoints
│   ├── sensors.py                # Sensor data endpoints
│   ├── environments.py           # Environment management
│   └── auth.py                   # Authentication endpoints
├── services/                     # Business logic services
│   ├── robot_service.py          # Robot control logic
│   ├── sensor_service.py         # Sensor data processing
│   ├── environment_service.py    # Environment management
│   └── websocket_service.py      # WebSocket management
├── core/                         # Core functionality
│   ├── security.py               # Authentication & authorization
│   ├── websocket.py              # WebSocket connection handling
│   ├── physics.py                # Physics simulation engine
│   └── integrations/             # External service integrations
│       ├── ros_bridge.py         # ROS communication
│       ├── osc_bridge.py         # OSC protocol handling
│       └── world_labs.py         # World Labs API client
├── tests/                        # Test suite
│   ├── unit/                     # Unit tests
│   ├── integration/              # Integration tests
│   └── fixtures/                 # Test data
└── scripts/                      # Utility scripts
    ├── init_db.py                # Database initialization
    └── seed_data.py              # Test data seeding
```
### API Design Principles
#### RESTful Endpoints
```text
# Resource-based URL structure
GET    /api/robots                 # List all robots
POST   /api/robots                 # Create new robot
GET    /api/robots/{id}            # Get robot details
PUT    /api/robots/{id}            # Update robot
DELETE /api/robots/{id}            # Delete robot

# Sub-resource endpoints
GET    /api/robots/{id}/sensors    # Get robot sensors
POST   /api/robots/{id}/command    # Execute command
GET    /api/robots/{id}/logs       # Get robot logs
```
#### WebSocket Message Protocol
```python
# Client → Server messages
{
    "type": "command",
    "robot_id": "vbot_scout_mini",
    "command": "move",
    "parameters": {
        "linear": 0.5,
        "angular": 0.0
    },
    "timestamp": "2025-12-17T15:30:25Z"
}

# Server → Client messages
{
    "type": "robot_update",
    "robot_id": "vbot_scout_mini",
    "data": {
        "position": {"x": 1.24, "y": 0.0, "z": -2.15},
        "velocity": {"linear": 0.0, "angular": 0.0},
        "battery": 87,
        "status": "active"
    },
    "timestamp": "2025-12-17T15:30:26Z"
}
```
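
The server should reject malformed messages before they reach a command handler. The following is a minimal sketch of that check using only the standard library; `CommandMessage` and `parse_command_message` are hypothetical names, and a real implementation would more likely use the Pydantic models listed in the technology stack.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict


@dataclass(frozen=True)
class CommandMessage:
    """Hypothetical typed view of a client-to-server command message."""
    robot_id: str
    command: str
    parameters: Dict[str, Any]
    timestamp: datetime


def parse_command_message(raw: str) -> CommandMessage:
    """Parse and validate a command message; raises ValueError on malformed input."""
    payload = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(payload, dict) or payload.get("type") != "command":
        raise ValueError("expected a JSON object with type 'command'")
    for key in ("robot_id", "command", "timestamp"):
        if not isinstance(payload.get(key), str):
            raise ValueError(f"missing or non-string field: {key}")
    parameters = payload.get("parameters", {})
    if not isinstance(parameters, dict):
        raise ValueError("parameters must be a JSON object")
    # fromisoformat() accepts a trailing "Z" only from Python 3.11, so normalise it
    timestamp = datetime.fromisoformat(payload["timestamp"].replace("Z", "+00:00"))
    return CommandMessage(payload["robot_id"], payload["command"], parameters, timestamp)


example = (
    '{"type": "command", "robot_id": "vbot_scout_mini", "command": "move", '
    '"parameters": {"linear": 0.5, "angular": 0.0}, "timestamp": "2025-12-17T15:30:25Z"}'
)
message = parse_command_message(example)
print(message.command, message.parameters["linear"])
```

Parsing into a frozen dataclass keeps handlers from mutating the message and gives them a timezone-aware timestamp to compare against server time.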
### Service Layer Architecture
```python
# Dependency injection with service layers
class RobotService:
    def __init__(self, robot_repository: RobotRepository, websocket_service: WebSocketService):
        self.repository = robot_repository
        self.websocket = websocket_service

    async def execute_command(self, robot_id: str, command: RobotCommand) -> CommandResult:
        # Validate command
        await self._validate_command(robot_id, command)
        # Execute command
        result = await self._execute_command(robot_id, command)
        # Update state
        await self.repository.update_robot_state(robot_id, result.new_state)
        # Broadcast update
        await self.websocket.broadcast_robot_update(robot_id, result.new_state)
        # Log command
        await self._log_command(robot_id, command, result)
        return result

    async def _validate_command(self, robot_id: str, command: RobotCommand) -> None:
        # Command validation logic (placeholder)
        raise NotImplementedError

    async def _execute_command(self, robot_id: str, command: RobotCommand) -> CommandResult:
        # Command execution logic (placeholder)
        raise NotImplementedError

    async def _log_command(self, robot_id: str, command: RobotCommand, result: CommandResult) -> None:
        # Command logging logic (placeholder)
        raise NotImplementedError
```
### Background Task Management
```python
import asyncio
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

logger = logging.getLogger(__name__)

# Physics simulation as background task
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    physics_task = asyncio.create_task(run_physics_simulation())
    websocket_cleanup_task = asyncio.create_task(cleanup_inactive_connections())
    yield
    # Shutdown: cancel both tasks, then wait for each of them to finish.
    # gather() collects both CancelledErrors, so neither await is skipped.
    physics_task.cancel()
    websocket_cleanup_task.cancel()
    await asyncio.gather(physics_task, websocket_cleanup_task, return_exceptions=True)

app = FastAPI(lifespan=lifespan)

async def run_physics_simulation():
    """Run physics simulation at up to 50Hz"""
    while True:
        try:
            # Update all robot physics
            robots = await robot_service.get_active_robots()
            for robot in robots:
                new_state = await physics_engine.update_robot(robot)
                await robot_service.update_robot_state(robot.id, new_state)
                await websocket_service.broadcast_update(robot.id, new_state)
            # 20 ms pause; the time spent in the loop body lowers the effective rate below 50Hz
            await asyncio.sleep(0.02)
        except Exception as e:
            logger.error(f"Physics simulation error: {e}")
            await asyncio.sleep(1)  # Back off on errors
```
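
A fixed `asyncio.sleep(0.02)` after each step makes the loop period equal to 20 ms plus the step's own duration. One way to hold the rate steady is to sleep until the next absolute deadline instead. The sketch below illustrates that idea; `next_tick_delay` and `run_fixed_rate` are hypothetical helpers, not functions from the codebase.

```python
import asyncio
import time
from typing import Awaitable, Callable


def next_tick_delay(start: float, ticks_done: int, period: float, now: float) -> float:
    """Seconds to sleep so the next tick begins at start + ticks_done * period."""
    deadline = start + ticks_done * period
    # If the step overran its slot, do not sleep at all
    return max(0.0, deadline - now)


async def run_fixed_rate(step: Callable[[], Awaitable[None]], hz: float, ticks: int) -> None:
    """Call `step` `ticks` times, scheduling against absolute deadlines to avoid drift."""
    period = 1.0 / hz
    start = time.monotonic()
    for done in range(1, ticks + 1):
        await step()
        await asyncio.sleep(next_tick_delay(start, done, period, time.monotonic()))


async def demo_step() -> None:
    await asyncio.sleep(0.005)  # stand-in for a 5 ms physics update


asyncio.run(run_fixed_rate(demo_step, hz=50.0, ticks=5))
```

Because each deadline is computed from the loop's start time, a slow step shortens the following sleep rather than pushing every later tick back.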
---
## Real-time Communication
### WebSocket Architecture
#### Connection Management
```python
import asyncio
import logging
from datetime import datetime, timezone
from typing import Callable, Dict, Optional

from fastapi import WebSocket

logger = logging.getLogger(__name__)

class WebSocketManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.connection_metadata: Dict[str, ConnectionMetadata] = {}
        self.message_handlers: Dict[str, Callable] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        await websocket.accept()
        now = datetime.now(timezone.utc)
        self.active_connections[client_id] = websocket
        self.connection_metadata[client_id] = ConnectionMetadata(
            connected_at=now,
            last_activity=now,
            subscriptions=set()
        )
        # Send welcome message
        await self.send_to_client(client_id, {
            "type": "connection_established",
            "client_id": client_id,
            "server_time": now.isoformat()
        })

    async def disconnect(self, client_id: str):
        # pop() tolerates a client that was already removed
        self.active_connections.pop(client_id, None)
        self.connection_metadata.pop(client_id, None)

    async def broadcast(self, message: dict, exclude_client: Optional[str] = None):
        """Broadcast message to all connected clients"""
        # Snapshot the keys: a failed send disconnects the client and mutates the dict
        tasks = [
            self.send_to_client(client_id, message)
            for client_id in list(self.active_connections)
            if client_id != exclude_client
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def send_to_client(self, client_id: str, message: dict):
        """Send message to specific client"""
        if client_id in self.active_connections:
            try:
                await self.active_connections[client_id].send_json(message)
                self.connection_metadata[client_id].last_activity = datetime.now(timezone.utc)
            except Exception as e:
                logger.error(f"Failed to send message to {client_id}: {e}")
                await self.disconnect(client_id)
```
#### Message Routing
```python
class MessageRouter:
    def __init__(self, websocket_manager: WebSocketManager):
        self.ws_manager = websocket_manager
        # Keys match the "type" field of the WebSocket message protocol
        self.command_handlers = {
            'command': self.handle_robot_command,
            'subscribe': self.handle_subscription,
            'unsubscribe': self.handle_unsubscription,
        }

    async def route_message(self, client_id: str, message: dict):
        """Route incoming message to appropriate handler"""
        message_type = message.get('type')
        if message_type in self.command_handlers:
            handler = self.command_handlers[message_type]
            try:
                await handler(client_id, message)
            except Exception as e:
                logger.error(f"Message handler error: {e}")
                await self.ws_manager.send_to_client(client_id, {
                    "type": "error",
                    "message": f"Failed to process {message_type} message"
                })
        else:
            await self.ws_manager.send_to_client(client_id, {
                "type": "error",
                "message": f"Unknown message type: {message_type}"
            })

    async def handle_robot_command(self, client_id: str, message: dict):
        """Handle robot command messages"""
        robot_id = message.get('robot_id')
        command = message.get('command')
        parameters = message.get('parameters', {})
        # Validate permissions
        if not await self.check_client_permissions(client_id, robot_id):
            await self.ws_manager.send_to_client(client_id, {
                "type": "error",
                "message": "Insufficient permissions"
            })
            return
        # Execute command (assumes RobotCommand accepts these two fields)
        result = await robot_service.execute_command(
            robot_id, RobotCommand(command=command, parameters=parameters)
        )
        # Send acknowledgment
        await self.ws_manager.send_to_client(client_id, {
            "type": "command_ack",
            "command_id": result.command_id,
            "status": "executed",
            "result": result.data
        })

    async def handle_subscription(self, client_id: str, message: dict):
        """Subscribe the client to robot updates (omitted here)"""
        raise NotImplementedError

    async def handle_unsubscription(self, client_id: str, message: dict):
        """Remove a client subscription (omitted here)"""
        raise NotImplementedError

    async def check_client_permissions(self, client_id: str, robot_id: str) -> bool:
        """Check whether the client may control the robot (omitted here)"""
        raise NotImplementedError
```
### Performance Optimizations
#### Message Batching
```python
import asyncio
from datetime import datetime, timezone
from typing import Dict, List

class MessageBatcher:
    def __init__(self, batch_size: int = 10, batch_timeout: float = 0.1):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout
        self.pending_messages: Dict[str, List[dict]] = {}
        self.batch_timers: Dict[str, asyncio.Task] = {}

    async def add_message(self, client_id: str, message: dict):
        """Add message to batch queue"""
        self.pending_messages.setdefault(client_id, []).append(message)

        # Start batch timer if not already running
        if client_id not in self.batch_timers:
            self.batch_timers[client_id] = asyncio.create_task(
                self.flush_batch(client_id)
            )

        # Flush immediately if batch is full
        if len(self.pending_messages[client_id]) >= self.batch_size:
            await self.flush_batch_immediately(client_id)

    async def flush_batch(self, client_id: str):
        """Flush batch after timeout"""
        await asyncio.sleep(self.batch_timeout)
        await self.flush_batch_immediately(client_id)

    async def flush_batch_immediately(self, client_id: str):
        """Flush all pending messages for client"""
        # Clean up the timer before sending, so a message queued during the
        # send starts a fresh timer. Skip cancel() when this coroutine is
        # itself running inside the timer task.
        timer = self.batch_timers.pop(client_id, None)
        if timer is not None and timer is not asyncio.current_task():
            timer.cancel()

        messages = self.pending_messages.pop(client_id, [])
        if messages:
            # Send batched message
            await websocket_manager.send_to_client(client_id, {
                "type": "batch",
                "messages": messages,
                "timestamp": datetime.now(timezone.utc).isoformat()
            })
```
---
## Data Architecture
### Data Models
#### Robot State Model
```python
from datetime import datetime
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class Position(BaseModel):
    x: float = Field(..., description="X coordinate in meters")
    y: float = Field(..., description="Y coordinate in meters")
    z: float = Field(..., description="Z coordinate in meters")


class Orientation(BaseModel):
    roll: float = Field(..., description="Roll angle in radians")
    pitch: float = Field(..., description="Pitch angle in radians")
    yaw: float = Field(..., description="Yaw angle in radians")


class Velocity(BaseModel):
    linear: float = Field(..., description="Linear velocity in m/s")
    angular: float = Field(..., description="Angular velocity in rad/s")


class SensorData(BaseModel):
    imu: Dict[str, Any] = Field(default_factory=dict, description="IMU sensor data")
    odometry: Dict[str, Any] = Field(default_factory=dict, description="Odometry data")
    camera: Dict[str, Any] = Field(default_factory=dict, description="Camera sensor data")
    lidar: Optional[Dict[str, Any]] = Field(None, description="LiDAR sensor data")


class Robot(BaseModel):
    id: str = Field(..., description="Unique robot identifier")
    name: str = Field(..., description="Human-readable robot name")
    type: str = Field(..., description="Robot type (virtual/physical)")
    status: str = Field(..., description="Current robot status")
    position: Position
    orientation: Orientation
    velocity: Velocity
    battery: int = Field(..., ge=0, le=100, description="Battery percentage")
    sensors: SensorData = Field(default_factory=SensorData)
    last_update: datetime = Field(default_factory=datetime.now)
    uptime: float = Field(default=0.0, description="Uptime in seconds")
    metadata: Dict[str, Any] = Field(default_factory=dict)
```
### Database Schema
#### Robots Table
```sql
CREATE TABLE robots (
    id                VARCHAR(255) PRIMARY KEY,
    name              VARCHAR(255) NOT NULL,
    type              VARCHAR(50) NOT NULL,
    status            VARCHAR(50) NOT NULL DEFAULT 'offline',
    position_x        DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    position_y        DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    position_z        DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    orientation_roll  DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    orientation_pitch DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    orientation_yaw   DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    velocity_linear   DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    velocity_angular  DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    battery           INTEGER NOT NULL DEFAULT 100,
    sensor_data       JSONB DEFAULT '{}',
    last_update       TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    uptime            DOUBLE PRECISION NOT NULL DEFAULT 0.0,
    metadata          JSONB DEFAULT '{}',
    created_at        TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at        TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Indexes for performance
CREATE INDEX idx_robots_status ON robots(status);
CREATE INDEX idx_robots_type ON robots(type);
CREATE INDEX idx_robots_last_update ON robots(last_update);
CREATE INDEX idx_robots_position ON robots USING gist (point(position_x, position_y));
```
#### Commands History Table
```sql
CREATE TABLE command_history (
    id             SERIAL PRIMARY KEY,
    robot_id       VARCHAR(255) NOT NULL REFERENCES robots(id),
    command        VARCHAR(255) NOT NULL,
    parameters     JSONB DEFAULT '{}',
    result         JSONB DEFAULT '{}',
    executed_by    VARCHAR(255),
    executed_at    TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    execution_time DOUBLE PRECISION,
    success        BOOLEAN NOT NULL DEFAULT TRUE,
    error_message  TEXT
);

-- Indexes for analytics
CREATE INDEX idx_commands_robot_id ON command_history(robot_id);
CREATE INDEX idx_commands_executed_at ON command_history(executed_at);
CREATE INDEX idx_commands_success ON command_history(success);
```
### Data Flow Architecture
```
User Input
    ↓
Input Validation
    ↓
Command Processing
    ↓
State Update
    ↓
Database Persistence
    ↓
WebSocket Broadcasting
    ↓
UI Updates
    ↓
User Feedback
```
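
To make the ordering concrete, the stages can be sketched as a single function with in-memory stand-ins for the database and the WebSocket layer. `CommandPipeline`, `handle_move`, and the `MAX_LINEAR` limit are all hypothetical and exist only for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

MAX_LINEAR = 1.0  # hypothetical velocity limit in m/s, for illustration only


@dataclass
class CommandPipeline:
    state: Dict[str, dict] = field(default_factory=dict)    # current robot state
    history: List[dict] = field(default_factory=list)       # stand-in for the database
    listeners: List[Callable[[dict], None]] = field(default_factory=list)  # stand-in for WebSocket clients

    def handle_move(self, robot_id: str, linear: float, angular: float) -> dict:
        # 1. Input validation: reject before anything is changed
        if abs(linear) > MAX_LINEAR:
            raise ValueError(f"linear velocity {linear} exceeds {MAX_LINEAR} m/s")
        # 2-3. Command processing and state update
        new_state = {"velocity": {"linear": linear, "angular": angular}}
        self.state[robot_id] = new_state
        # 4. Database persistence
        self.history.append({"robot_id": robot_id, "command": "move", "success": True})
        # 5. WebSocket broadcasting
        update = {"type": "robot_update", "robot_id": robot_id, "data": new_state}
        for notify in self.listeners:
            notify(update)
        # 6. The returned update is what the UI renders as feedback
        return update


received: List[dict] = []
pipeline = CommandPipeline(listeners=[received.append])
pipeline.handle_move("vbot_scout_mini", 0.5, 0.0)
print(received[0]["data"]["velocity"])
```

Validating first means a rejected command leaves the state, the history, and every subscriber untouched.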
---
## Security Architecture
### Authentication & Authorization
#### JWT Token Flow
```python
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from passlib.context import CryptContext


class AuthService:
    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    def create_access_token(self, data: dict, expires_delta: Optional[timedelta] = None) -> str:
        to_encode = data.copy()
        # Default lifetime is 15 minutes. Use an aware UTC timestamp:
        # datetime.utcnow() is deprecated since Python 3.12.
        expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Optional[str]:
        """Return the token subject, or None if the token is invalid or expired"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except JWTError:
            return None
        return payload.get("sub")

    def hash_password(self, password: str) -> str:
        return self.pwd_context.hash(password)

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        return self.pwd_context.verify(plain_password, hashed_password)
```
#### Role-Based Access Control
```python
from enum import Enum
from typing import Dict, List


class UserRole(str, Enum):
    ADMIN = "admin"
    OPERATOR = "operator"
    VIEWER = "viewer"
    GUEST = "guest"


class Permission(Enum):
    ROBOT_CONTROL = "robot:control"
    ROBOT_VIEW = "robot:view"
    SENSOR_READ = "sensor:read"
    ENVIRONMENT_MODIFY = "environment:modify"
    SYSTEM_ADMIN = "system:admin"


ROLE_PERMISSIONS: Dict[UserRole, List[Permission]] = {
    UserRole.ADMIN: [
        Permission.ROBOT_CONTROL,
        Permission.ROBOT_VIEW,
        Permission.SENSOR_READ,
        Permission.ENVIRONMENT_MODIFY,
        Permission.SYSTEM_ADMIN
    ],
    UserRole.OPERATOR: [
        Permission.ROBOT_CONTROL,
        Permission.ROBOT_VIEW,
        Permission.SENSOR_READ,
        Permission.ENVIRONMENT_MODIFY
    ],
    UserRole.VIEWER: [
        Permission.ROBOT_VIEW,
        Permission.SENSOR_READ
    ],
    UserRole.GUEST: [
        Permission.ROBOT_VIEW
    ]
}


def check_permission(user_role: UserRole, required_permission: Permission) -> bool:
    """Check if user role has required permission"""
    return required_permission in ROLE_PERMISSIONS.get(user_role, [])
```
### Security Measures
#### WebSocket Security
```python
import time
from collections import defaultdict
from urllib.parse import urlparse

# Origin validation
ALLOWED_ORIGINS = [
    "http://localhost:3000",
    "https://robotics-webapp.com",
    "https://*.robotics-webapp.com"
]
WILDCARD_PREFIX = "https://*."


def validate_origin(origin: str) -> bool:
    """Validate WebSocket connection origin"""
    try:
        parsed = urlparse(origin)
    except ValueError:
        return False
    hostname = parsed.hostname
    for allowed in ALLOWED_ORIGINS:
        if allowed.startswith(WILDCARD_PREFIX):
            # Wildcard domain matching: keep the leading dot so that a
            # look-alike such as "evilrobotics-webapp.com" does not match
            suffix = allowed[len(WILDCARD_PREFIX) - 1:]  # ".robotics-webapp.com"
            if parsed.scheme == "https" and hostname and hostname.endswith(suffix):
                return True
        elif allowed == origin:
            return True
    return False


# Rate limiting
class RateLimiter:
    def __init__(self, max_requests: int = 100, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = defaultdict(list)

    def is_allowed(self, client_id: str) -> bool:
        now = time.time()
        client_requests = self.requests[client_id]
        # Remove old requests outside the window
        client_requests[:] = [req for req in client_requests if now - req < self.window_seconds]
        if len(client_requests) >= self.max_requests:
            return False
        client_requests.append(now)
        return True
```
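
The `RateLimiter` above is a sliding-window log: it stores one timestamp per request, so memory grows with `max_requests` for every client. A token bucket is a different technique that keeps two numbers per client and also permits short bursts. The following is a hedged sketch of that alternative; `TokenBucket` and its parameters are hypothetical and not taken from the codebase.

```python
import time
from typing import Callable


class TokenBucket:
    """Token-bucket limiter: `rate` tokens are added per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock            # injectable so the limiter can be tested deterministically
        self.tokens = capacity        # start full, which allows an initial burst
        self.updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# Usage with a fake clock: a burst of two, then one request per second
fake_now = [0.0]
bucket = TokenBucket(rate=1.0, capacity=2.0, clock=lambda: fake_now[0])
burst = [bucket.allow(), bucket.allow(), bucket.allow()]
fake_now[0] = 1.0
after_refill = bucket.allow()
print(burst, after_refill)
```

One bucket per client (for example in a dict keyed by `client_id`) would mirror the per-client behaviour of `RateLimiter.is_allowed`.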
---
## Deployment Architecture
### Containerization Strategy
#### Dockerfile (Frontend)
```dockerfile
# Multi-stage build for Next.js
# (Next.js 16 requires Node.js 20.9 or newer)
FROM node:20-alpine AS base

# Install dependencies, including devDependencies needed by the build stage
FROM base AS deps
WORKDIR /app
COPY package.json package-lock.json ./
RUN npm ci

# Build application
FROM base AS builder
WORKDIR /app
COPY --from=deps /app/node_modules ./node_modules
COPY . .
RUN npm run build

# Production image (expects `output: 'standalone'` in next.config.js)
FROM base AS runner
WORKDIR /app
ENV NODE_ENV=production

# Create non-root user
RUN addgroup --system --gid 1001 nodejs
RUN adduser --system --uid 1001 nextjs

COPY --from=builder /app/public ./public
COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./
COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static

USER nextjs
EXPOSE 3000
ENV PORT=3000
CMD ["node", "server.js"]
```
#### Dockerfile (Backend)
```dockerfile
FROM python:3.11-slim

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app

# Install system dependencies
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        gcc \
        libc6-dev \
    && rm -rf /var/lib/apt/lists/*

# Create app user
RUN groupadd -r appuser && useradd -r -g appuser appuser

# Create app directory
WORKDIR /app

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Health check (standard library only; urlopen raises on HTTP error statuses)
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8354/health')"

EXPOSE 8354
# Module path follows the backend layout above (app/main.py)
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8354"]
```
### Kubernetes Deployment
#### Frontend Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: robotics-webapp-frontend
spec:
  replicas: 3
  selector:
    matchLabels:
      app: robotics-webapp-frontend
  template:
    metadata:
      labels:
        app: robotics-webapp-frontend
    spec:
      containers:
        - name: frontend
          image: robotics-webapp/frontend:latest
          ports:
            - containerPort: 3000
          env:
            # Note: NEXT_PUBLIC_* values are inlined into the browser bundle at
            # build time, and browsers cannot resolve cluster-internal names
            - name: NEXT_PUBLIC_API_URL
              value: "http://robotics-webapp-backend:8354"
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /api/health
              port: 3000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /api/health
              port: 3000
            initialDelaySeconds: 5
            periodSeconds: 5
```
#### Backend Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: robotics-webapp-backend
spec:
  replicas: 5
  selector:
    matchLabels:
      app: robotics-webapp-backend
  template:
    metadata:
      labels:
        app: robotics-webapp-backend
    spec:
      containers:
        - name: backend
          image: robotics-webapp/backend:latest
          ports:
            - containerPort: 8354
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: robotics-db-secret
                  key: url
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: robotics-secret
                  key: secret-key
          resources:
            requests:
              memory: "256Mi"
              cpu: "200m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8354
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8354
            initialDelaySeconds: 5
            periodSeconds: 5
```
### Service Mesh Configuration
#### Ingress Configuration
```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: robotics-webapp-ingress
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
spec:
  ingressClassName: nginx
  tls:
    - hosts:
        - robotics-webapp.com
        - api.robotics-webapp.com
      secretName: robotics-tls
  rules:
    - host: robotics-webapp.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: robotics-webapp-frontend
                port:
                  number: 3000
    - host: api.robotics-webapp.com
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: robotics-webapp-backend
                port:
                  number: 8354
```
---
## Performance Considerations
### Frontend Performance
#### Bundle Optimization
```javascript
// next.config.js
module.exports = {
  experimental: {
    optimizePackageImports: ['lucide-react', '@radix-ui/react-icons']
  },
  compiler: {
    removeConsole: process.env.NODE_ENV === 'production'
  },
  images: {
    formats: ['image/webp', 'image/avif']
  }
}
```
#### Caching Strategy
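```javascript
// Service Worker for offline functionality
// public/sw.js
const CACHE_NAME = 'robotics-webapp-v1';

self.addEventListener('install', (event) => {
  event.waitUntil(
    caches.open(CACHE_NAME).then((cache) => {
      // Placeholder asset list: addAll() rejects if any URL fails to load,
      // and Next.js serves its hashed bundles under /_next/static/
      return cache.addAll([
        '/',
        '/static/js/bundle.js',
        '/static/css/main.css',
        '/manifest.json'
      ]);
    })
  );
});

self.addEventListener('fetch', (event) => {
  // Cache-first for GET requests only; other methods go straight to the network
  if (event.request.method !== 'GET') return;
  event.respondWith(
    caches.match(event.request).then((response) => {
      return response || fetch(event.request);
    })
  );
});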
```
### Backend Performance
#### Connection Pooling
```python
# Database connection pooling
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/robotics"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # Connections kept open in the pool
    max_overflow=30,     # Extra connections allowed under load
    pool_timeout=30,     # Seconds to wait for a free connection
    pool_recycle=1800,   # Recycle connections after 30 minutes
    echo=False
)

async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
```
#### API Response Caching
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis.asyncio import Redis


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize Redis cache
    redis = Redis(host='localhost', port=6379, db=0)
    FastAPICache.init(RedisBackend(redis), prefix="robotics-cache")
    yield
    # Cleanup
    await redis.close()


app = FastAPI(lifespan=lifespan)


# Cached endpoint
@app.get("/api/robots")
@cache(expire=30)  # Cache for 30 seconds
async def get_robots():
    return await robot_service.get_all_robots()
```
---
## Scalability Design
### Horizontal Scaling Strategy
#### Load Balancing
```nginx
# Backend load balancing with multiple instances
upstream robotics_backend {
    least_conn;
    server backend-1:8354;
    server backend-2:8354;
    server backend-3:8354;
    server backend-4:8354;
}

server {
    listen 80;
    server_name api.robotics-webapp.com;

    location / {
        proxy_pass http://robotics_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket support
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}
```
#### Database Sharding
```python
import hashlib

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


class DatabaseSharding:
    def __init__(self, shard_count: int = 4):
        self.shard_count = shard_count
        self.shards = {}
        for i in range(shard_count):
            self.shards[i] = create_async_engine(
                f"postgresql+asyncpg://user:password@shard-{i}/robotics"
            )

    def get_shard(self, robot_id: str) -> int:
        """Determine shard for robot ID by hashing modulo the shard count"""
        # Note: this is hash-modulo sharding, not consistent hashing;
        # changing shard_count remaps most robot IDs
        hash_value = int(hashlib.md5(robot_id.encode()).hexdigest(), 16)
        return hash_value % self.shard_count

    async def get_robot(self, robot_id: str):
        shard_id = self.get_shard(robot_id)
        async with self.shards[shard_id].connect() as conn:
            # SQLAlchemy needs text() and named bind parameters for raw SQL
            result = await conn.execute(
                text("SELECT * FROM robots WHERE id = :robot_id"),
                {"robot_id": robot_id}
            )
            return result.fetchone()
```
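
The `get_shard` method above maps IDs with a hash modulo the shard count, so changing `shard_count` moves most robots to a different shard. If resharding matters, a hash ring is one alternative: each shard owns many points on a ring, and adding a shard only takes over the keys that fall just before its points. The sketch below is an illustration under that assumption; `HashRing` and the `vnodes` parameter are hypothetical and not part of the codebase.

```python
import bisect
import hashlib
from typing import Dict, List


def _ring_hash(key: str) -> int:
    """Map a string to a point on the ring (MD5 used for spread, not for security)."""
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


class HashRing:
    def __init__(self, nodes: List[str], vnodes: int = 100):
        self.vnodes = vnodes              # virtual points per node, smooths the distribution
        self._owner: Dict[int, str] = {}  # ring point -> node name
        self._points: List[int] = []      # sorted ring points
        for node in nodes:
            self.add_node(node)

    def add_node(self, node: str) -> None:
        for i in range(self.vnodes):
            point = _ring_hash(f"{node}#{i}")
            self._owner[point] = node
            bisect.insort(self._points, point)

    def get_node(self, key: str) -> str:
        if not self._points:
            raise LookupError("hash ring is empty")
        # First ring point clockwise from the key's hash, wrapping at the end
        index = bisect.bisect(self._points, _ring_hash(key)) % len(self._points)
        return self._owner[self._points[index]]


ring = HashRing([f"shard-{i}" for i in range(4)])
robot_ids = [f"robot-{i}" for i in range(1000)]
before = {robot_id: ring.get_node(robot_id) for robot_id in robot_ids}
ring.add_node("shard-4")
moved = [r for r in robot_ids if ring.get_node(r) != before[r]]
print(f"{len(moved)} of {len(robot_ids)} robots moved after adding a shard")
```

Every robot that moves lands on the new shard; robots on the existing shards never swap between them, which is the property hash-modulo sharding lacks.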
### Microservices Architecture
#### Service Decomposition
```
robotics-platform/
├── user-service/           # User management and authentication
├── robot-service/          # Robot state management and control
├── sensor-service/         # Sensor data processing and analytics
├── environment-service/    # World Labs integration and environment management
├── vr-service/             # VR platform integrations (VRChat, Resonite)
├── api-gateway/            # Unified API entry point
├── websocket-service/      # Real-time communication service
└── monitoring-service/     # System monitoring and alerting
```
#### Service Communication
```python
import json
from datetime import datetime, timezone

import grpc
from aio_pika import Message, connect_robust

# Modules generated by protoc from the service definition
import robot_service_pb2
import robot_service_pb2_grpc


# Inter-service communication with gRPC
class RobotServiceClient:
    def __init__(self, host: str = "robot-service:50051"):
        self.channel = grpc.aio.insecure_channel(host)
        self.stub = robot_service_pb2_grpc.RobotServiceStub(self.channel)

    async def get_robot_status(self, robot_id: str) -> dict:
        request = robot_service_pb2.GetRobotStatusRequest(robot_id=robot_id)
        response = await self.stub.GetRobotStatus(request)
        return {
            "robot_id": response.robot_id,
            "status": response.status,
            "position": {
                "x": response.position.x,
                "y": response.position.y,
                "z": response.position.z
            },
            "battery": response.battery
        }


# Message queue for async communication
class MessageQueue:
    def __init__(self, amqp_url: str):
        self.amqp_url = amqp_url
        self.connection = None
        self.channel = None

    async def connect(self):
        self.connection = await connect_robust(self.amqp_url)
        self.channel = await self.connection.channel()

    async def publish_command(self, robot_id: str, command: dict):
        """Publish robot command to message queue"""
        message_body = json.dumps({
            "robot_id": robot_id,
            "command": command,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }).encode()
        message = Message(body=message_body)
        await self.channel.default_exchange.publish(
            message,
            routing_key=f"robot.{robot_id}.command"
        )
```
### Monitoring and Observability
#### Metrics Collection
```python
import time

from prometheus_client import Counter, Gauge, Histogram

# Define metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency', ['method', 'endpoint'])
ACTIVE_CONNECTIONS = Gauge('websocket_active_connections', 'Number of active WebSocket connections')
ROBOT_COUNT = Gauge('robots_total', 'Total number of robots', ['status'])
COMMAND_EXECUTION_TIME = Histogram('robot_command_execution_seconds', 'Robot command execution time', ['command_type'])


class MetricsMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # Pass WebSocket and lifespan events through untouched
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        start_time = time.perf_counter()

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                # Note: raw paths that contain IDs produce one label value per robot;
                # a route template keeps label cardinality bounded
                REQUEST_COUNT.labels(
                    method=scope["method"],
                    endpoint=scope["path"],
                    status=message["status"]
                ).inc()
            await send(message)

        try:
            await self.app(scope, receive, send_wrapper)
        finally:
            # Record latency even when the application raises
            REQUEST_LATENCY.labels(
                method=scope["method"],
                endpoint=scope["path"]
            ).observe(time.perf_counter() - start_time)
```
This architecture document provides a comprehensive overview of the Robotics MCP WebApp's technical design, from high-level system organization to low-level implementation details. The modular, scalable architecture ensures the platform can grow with increasing user demands while maintaining performance and reliability.