YetAnotherUnityMcp
by Azreal42
- YetAnotherUnityMcp
- server
"""
DEPRECATED: Connection manager for WebSocket/TCP connections.
This module contains two connection managers:
1. ConnectionManager - A server-side FastAPI WebSocket connection manager (deprecated)
2. UnityConnectionManager - A client-side connection manager for Unity TCP connections
The server-side ConnectionManager class is no longer used and will be removed in a future version.
Only the UnityConnectionManager is actively maintained.
"""
import warnings
# Show deprecation warning for ConnectionManager
warnings.warn(
"ConnectionManager class is deprecated and will be removed in a future version. "
"Only UnityConnectionManager should be used.",
DeprecationWarning,
stacklevel=2
)
import asyncio
import logging
import time
from typing import List, Optional, Callable, Dict, Any
from fastapi import WebSocket
from server.unity_socket_client import get_client
logger = logging.getLogger("mcp_server")
# Server-side connection manager for FastAPI WebSockets (DEPRECATED)
class ConnectionManager:
"""
DEPRECATED: Server-side connection manager for FastAPI WebSockets.
This class is no longer used in the current architecture and will be removed in a future version.
It's maintained here for backward compatibility only.
"""
def __init__(self) -> None:
warnings.warn(
"ConnectionManager is deprecated and unused in the current architecture.",
DeprecationWarning,
stacklevel=2
)
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket) -> None:
"""DEPRECATED: Accept a WebSocket connection."""
try:
await websocket.accept()
self.active_connections.append(websocket)
logger.info(f"WebSocket connected. Active connections: {len(self.active_connections)}")
# Log the client details
client = websocket.client
logger.info(f"Client connected from: {client.host}:{client.port}")
except Exception as e:
logger.error(f"Error accepting WebSocket connection: {str(e)}")
def disconnect(self, websocket: WebSocket) -> None:
"""DEPRECATED: Disconnect a WebSocket."""
try:
if websocket in self.active_connections:
self.active_connections.remove(websocket)
# Try to get client info before disconnection
client_info = "Unknown"
try:
if hasattr(websocket, 'client') and websocket.client:
client_info = f"{websocket.client.host}:{websocket.client.port}"
except:
pass
logger.info(f"WebSocket disconnected from {client_info}. Active connections: {len(self.active_connections)}")
else:
logger.warning("Attempted to disconnect a WebSocket that was not in active_connections")
except Exception as e:
logger.error(f"Error during WebSocket disconnection: {str(e)}")
async def send_message(self, websocket: WebSocket, message: str) -> None:
"""DEPRECATED: Send a message to a WebSocket client."""
try:
# Add timing information
start_time = time.time()
# Log message size
logger.debug(f"Sending message of length {len(message)} to client")
# Send the message
await websocket.send_text(message)
# Log the time it took to send
elapsed = time.time() - start_time
if elapsed > 0.5: # Log if it took more than 500ms
logger.warning(f"Slow message send: {elapsed:.2f}s for {len(message)} bytes")
except Exception as e:
logger.error(f"Error sending message: {str(e)}")
async def broadcast(self, message: str) -> None:
"""DEPRECATED: Broadcast a message to all connected WebSocket clients."""
if not self.active_connections:
logger.warning("Attempted to broadcast message but no active connections exist")
return
logger.info(f"Broadcasting message of length {len(message)} to {len(self.active_connections)} clients")
failed_connections = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception as e:
logger.error(f"Error broadcasting to client: {str(e)}")
failed_connections.append(connection)
# Remove any connections that failed
for failed in failed_connections:
if failed in self.active_connections:
self.active_connections.remove(failed)
logger.warning("Removed failed connection from active_connections")
# Client-side connection manager for Unity TCP client
class UnityConnectionManager:
"""
Connection manager that handles automatic reconnection to Unity
"""
def __init__(self,
reconnect_attempts: int = 5,
reconnect_delay: float = 2.0,
auto_reconnect: bool = True):
"""
Initialize the connection manager.
Args:
reconnect_attempts: Maximum number of reconnect attempts
reconnect_delay: Delay between reconnection attempts in seconds
auto_reconnect: Whether to automatically reconnect on disconnect
"""
self.client = get_client()
self.reconnect_attempts = reconnect_attempts
self.reconnect_delay = reconnect_delay
self.auto_reconnect = auto_reconnect
self.reconnect_task: Optional[asyncio.Task] = None
self.is_reconnecting = False
self.connection_listeners: List[Callable] = []
self.disconnection_listeners: List[Callable] = []
# Register event handlers
self.client.on("disconnected", self._handle_disconnect)
async def connect(self, url: str = "tcp://localhost:8080/") -> bool:
"""
Connect to Unity TCP server with automatic reconnection.
Args:
url: TCP server URL (tcp://host:port/)
Returns:
True if connected successfully, False otherwise
"""
if self.client.connected:
logger.info("Already connected to Unity")
return True
logger.info(f"Connecting to Unity at {url}...")
try:
result = await self.client.connect()
if result:
logger.info("Connected to Unity successfully")
# Notify connection listeners
await self._notify_connection_listeners()
return True
else:
logger.error("Failed to connect to Unity")
return False
except Exception as e:
logger.error(f"Error connecting to Unity: {str(e)}")
return False
async def disconnect(self) -> None:
"""
Disconnect from Unity TCP server.
"""
if not self.client.connected:
logger.info("Not connected to Unity")
return
logger.info("Disconnecting from Unity...")
# Cancel any reconnection task
if self.reconnect_task and not self.reconnect_task.done():
self.reconnect_task.cancel()
self.reconnect_task = None
# Disconnect the client
await self.client.disconnect()
# Notify disconnection listeners
await self._notify_disconnection_listeners()
async def execute_with_reconnect(self, operation: Callable, *args, **kwargs) -> Any:
"""
Execute an operation with automatic reconnection if disconnected.
Args:
operation: Async function to execute
*args: Arguments to pass to the operation
**kwargs: Keyword arguments to pass to the operation
Returns:
Result of the operation
"""
if not self.client.connected:
logger.info("Not connected to Unity, attempting to reconnect...")
connected = await self.reconnect()
if not connected:
raise Exception("Not connected to Unity and reconnection failed")
try:
return await operation(*args, **kwargs)
except Exception as e:
# Check if this is a connection-related error
if "Not connected" in str(e):
logger.warning(f"Connection error during operation: {str(e)}")
connected = await self.reconnect()
if connected:
# Retry the operation
logger.info("Retrying operation after successful reconnection")
return await operation(*args, **kwargs)
else:
raise Exception(f"Operation failed and reconnection failed: {str(e)}")
else:
# Not a connection error, re-raise
raise
async def reconnect(self) -> bool:
"""
Attempt to reconnect to Unity with multiple attempts.
Returns:
True if reconnected successfully, False otherwise
"""
if self.client.connected:
logger.info("Already connected to Unity")
return True
if self.is_reconnecting:
logger.info("Reconnection already in progress")
# Wait for the existing reconnection to complete
if self.reconnect_task and not self.reconnect_task.done():
try:
return await self.reconnect_task
except Exception:
return False
return False
self.is_reconnecting = True
try:
logger.info(f"Attempting to reconnect to Unity (max {self.reconnect_attempts} attempts)")
for attempt in range(1, self.reconnect_attempts + 1):
logger.info(f"Reconnection attempt {attempt}/{self.reconnect_attempts}")
try:
result = await self.client.connect()
if result:
logger.info("Reconnected to Unity successfully")
# Notify connection listeners
await self._notify_connection_listeners()
return True
except Exception as e:
logger.error(f"Error during reconnection attempt {attempt}: {str(e)}")
if attempt < self.reconnect_attempts:
# Wait before next attempt with exponential backoff
delay = self.reconnect_delay * (2 ** (attempt - 1))
logger.info(f"Waiting {delay:.1f} seconds before next reconnection attempt")
await asyncio.sleep(delay)
logger.error(f"Failed to reconnect to Unity after {self.reconnect_attempts} attempts")
return False
finally:
self.is_reconnecting = False
async def _handle_disconnect(self) -> None:
"""
Handle disconnection event from Unity.
"""
logger.info("Disconnected from Unity")
# Notify disconnection listeners
await self._notify_disconnection_listeners()
# Start automatic reconnection if enabled
if self.auto_reconnect and not self.is_reconnecting:
logger.info("Starting automatic reconnection task")
self.reconnect_task = asyncio.create_task(self._auto_reconnect())
async def _auto_reconnect(self) -> bool:
"""
Automatic reconnection task.
Returns:
True if reconnected successfully, False otherwise
"""
# Add a small delay before reconnecting to avoid rapid reconnection attempts
await asyncio.sleep(1.0)
return await self.reconnect()
def add_connection_listener(self, listener: Callable) -> None:
"""
Add a listener that will be called when connection is established.
Args:
listener: Async function to call on connection
"""
if listener not in self.connection_listeners:
self.connection_listeners.append(listener)
def remove_connection_listener(self, listener: Callable) -> None:
"""
Remove a connection listener.
Args:
listener: Listener to remove
"""
if listener in self.connection_listeners:
self.connection_listeners.remove(listener)
def add_disconnection_listener(self, listener: Callable) -> None:
"""
Add a listener that will be called when disconnected.
Args:
listener: Async function to call on disconnection
"""
if listener not in self.disconnection_listeners:
self.disconnection_listeners.append(listener)
def remove_disconnection_listener(self, listener: Callable) -> None:
"""
Remove a disconnection listener.
Args:
listener: Listener to remove
"""
if listener in self.disconnection_listeners:
self.disconnection_listeners.remove(listener)
async def _notify_connection_listeners(self) -> None:
"""
Notify all connection listeners.
"""
for listener in self.connection_listeners:
try:
await listener()
except Exception as e:
logger.error(f"Error in connection listener: {str(e)}")
async def _notify_disconnection_listeners(self) -> None:
"""
Notify all disconnection listeners.
"""
for listener in self.disconnection_listeners:
try:
await listener()
except Exception as e:
logger.error(f"Error in disconnection listener: {str(e)}")
# Singleton instance for easy access
_unity_connection_manager: Optional[UnityConnectionManager] = None
def get_unity_connection_manager() -> UnityConnectionManager:
"""
Get the Unity connection manager instance.
Returns:
Unity connection manager instance
"""
global _unity_connection_manager
if _unity_connection_manager is None:
_unity_connection_manager = UnityConnectionManager()
return _unity_connection_manager