import logging
import asyncio
from typing import Dict, Any, Optional, Callable, List
from mcp import StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.session import ClientSession
logger = logging.getLogger(__name__)
class OSCBridge:
"""
Unified OSC communication layer for virtual robotics.
Handles address mapping and state synchronization for Unity, VRChat, and Resonite.
"""
# Standard OSC Address Patterns
ADDRESS_PATTERNS = {
"unity": "/unity/robot/{robot_type}/{param}",
"vrchat": "/avatar/parameters/{robot_type}_{param}",
"resonite": "/resonite/avatar/{robot_type}_{param}", # Simplified for Resonite
}
def __init__(self):
self._session: Optional[ClientSession] = None
self._subscriptions: Dict[str, List[Callable]] = {}
self._connected = False
async def connect(self):
"""Connect to the local osc-mcp server"""
# In a real implementation we would connect to the osc-mcp
# For now we'll stub this since we're inside robotics-mcp
logger.info("Connecting to OSC Bridge...")
self._connected = True
async def send_command(self, platform: str, robot_type: str, param: str, value: Any):
"""
Send a command to a specific platform via OSC
Args:
platform: 'unity', 'vrchat', or 'resonite'
robot_type: 'scout', 'go2', 'g1'
param: Parameter name (e.g., 'velocity', 'battery')
value: Value to send
"""
if not self._connected:
await self.connect()
address = self._get_address(platform, robot_type, param)
logger.debug(f"Sending OSC to {platform}: {address} = {value}")
# Here we would call the osc-mcp tool
# await self._session.call_tool("osc-mcp", "send_message", address=address, value=value)
return True
def _get_address(self, platform: str, robot_type: str, param: str) -> str:
"""Construct OSC address based on platform patterns"""
pattern = self.ADDRESS_PATTERNS.get(platform)
if not pattern:
raise ValueError(f"Unsupported platform: {platform}")
return pattern.format(robot_type=robot_type, param=param)
async def send_message(self, address: str, args: List[Any]) -> bool:
"""
Send a raw OSC message to a specific address.
Args:
address: OSC address
args: List of arguments
"""
if not self._connected:
await self.connect()
logger.debug(f"Sending OSC message: {address} = {args}")
# In a real implementation we would call the osc-mcp tool here
return True
async def subscribe(self, robot_id: str, callback: Callable):
"""Subscribe to state changes for a virtual robot"""
if robot_id not in self._subscriptions:
self._subscriptions[robot_id] = []
self._subscriptions[robot_id].append(callback)
# Singleton instance
osc_bridge = OSCBridge()