Skip to main content
Glama

UnrealBlueprintMCP

by BestDev
connection_manager.py7.35 kB
""" Unreal Connection Manager for WebSocket Bridge 언리얼 엔진 플러그인과의 WebSocket 연결을 관리하는 모듈 """ import asyncio import json import uuid import logging from typing import Dict, Optional, Any, Union logger = logging.getLogger(__name__) class UnrealConnectionManager: """ 언리얼 엔진 플러그인과의 WebSocket 연결을 관리하는 싱글톤 클래스 기능: - WebSocket 연결 관리 - JSON-RPC 2.0 메시지 송수신 - 요청-응답 매칭 (asyncio.Future 사용) - 기존 언리얼 플러그인과 100% 호환 """ _instance = None _lock = asyncio.Lock() def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if hasattr(self, '_initialized'): return self.unreal_client: Optional[Any] = None # WebSocket 또는 WebSocketWrapper self.pending_responses: Dict[str, asyncio.Future] = {} self.request_counter = 0 self._initialized = True logger.info("UnrealConnectionManager initialized") def generate_request_id(self) -> str: """ 기존 언리얼 플러그인과 호환되는 요청 ID 생성 형식: req_{counter}_{uuid8} """ self.request_counter += 1 uuid_part = uuid.uuid4().hex[:8] request_id = f"req_{self.request_counter}_{uuid_part}" logger.debug(f"Generated request ID: {request_id}") return request_id async def connect(self, websocket) -> None: """ 새로운 언리얼 클라이언트 연결 등록 Args: websocket: 연결된 WebSocket 인스턴스 (Starlette WebSocket 또는 WebSocketWrapper) """ async with self._lock: if self.unreal_client is not None: logger.warning("Previous Unreal client connection exists, replacing...") await self.disconnect() self.unreal_client = websocket logger.info("Unreal client connected successfully") async def disconnect(self) -> None: """ 언리얼 클라이언트 연결 해제 및 정리 """ async with self._lock: if self.unreal_client is not None: try: await self.unreal_client.close() except Exception as e: logger.warning(f"Error closing WebSocket: {e}") self.unreal_client = None logger.info("Unreal client disconnected") # 모든 대기 중인 요청에 연결 해제 오류 설정 for request_id, future in self.pending_responses.items(): if not future.done(): future.set_exception(ConnectionError("WebSocket connection lost")) logger.debug(f"Request {request_id} cancelled due to disconnection") self.pending_responses.clear() def is_connected(self) -> bool: """ 연결 상태 확인 Returns: bool: 연결되어 있으면 True, 아니면 False """ return self.unreal_client is not None async def send_to_unreal(self, method: str, params: dict, timeout: float = 30.0) -> dict: """ 언리얼 엔진으로 JSON-RPC 2.0 메시지 전송 및 응답 대기 Args: method: 호출할 메서드 이름 params: 메서드 파라미터 timeout: 응답 대기 시간 (초) Returns: dict: 언리얼에서 받은 응답 데이터 Raises: ConnectionError: 연결되지 않은 경우 TimeoutError: 응답 시간 초과 RuntimeError: JSON-RPC 에러 응답 """ if not self.is_connected(): raise ConnectionError("No Unreal Engine client is connected") request_id = self.generate_request_id() # JSON-RPC 2.0 메시지 생성 (기존 플러그인 호환) message = { "jsonrpc": "2.0", "id": request_id, "method": method, "params": params } # 응답 대기를 위한 Future 생성 future = asyncio.get_running_loop().create_future() self.pending_responses[request_id] = future try: # WebSocket으로 메시지 전송 message_str = json.dumps(message) await self.unreal_client.send_text(message_str) logger.info(f"Sent to Unreal: {method} (ID: {request_id})") logger.debug(f"Message content: {message_str}") # 응답 대기 response = await asyncio.wait_for(future, timeout=timeout) logger.info(f"Received response for {method} (ID: {request_id})") logger.debug(f"Response content: {response}") return response except asyncio.TimeoutError: logger.error(f"Timeout waiting for response to {method} (ID: {request_id})") raise TimeoutError(f"No response from Unreal within {timeout} seconds for method '{method}'") except Exception as e: logger.error(f"Error sending to Unreal: {e}") raise finally: # Future 정리 self.pending_responses.pop(request_id, None) async def handle_unreal_response(self, message: dict) -> None: """ 언리얼에서 받은 응답 메시지 처리 Args: message: 언리얼에서 받은 JSON 메시지 """ message_id = message.get("id") if message_id is None: # ID가 없는 경우 알림(notification) 메시지 await self._handle_notification(message) return # 응답 메시지 처리 if message_id in self.pending_responses: future = self.pending_responses[message_id] if not future.done(): if "error" in message: # JSON-RPC 에러 응답 error = message["error"] error_msg = f"Unreal RPC Error {error.get('code', 'N/A')}: {error.get('message', 'Unknown error')}" logger.error(f"Unreal error for request {message_id}: {error_msg}") future.set_exception(RuntimeError(error_msg)) else: # 정상 응답 result = message.get("result", {}) logger.debug(f"Setting result for request {message_id}: {result}") future.set_result(result) else: logger.warning(f"Received response for unknown request ID: {message_id}") async def _handle_notification(self, message: dict) -> None: """ 언리얼에서 받은 알림 메시지 처리 Args: message: 알림 메시지 (ID가 없는 메시지) """ method = message.get("method", "unknown") params = message.get("params", {}) logger.info(f"Received notification from Unreal: {method}") logger.debug(f"Notification params: {params}") # 알림 메시지 처리 로직 확장 가능 # 예: 연결 상태 변경, 에디터 상태 업데이트 등 # 전역 인스턴스 (싱글톤) connection_manager = UnrealConnectionManager()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BestDev/unreal_bp_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server