connection_manager.py•7.35 kB
"""
Unreal Connection Manager for WebSocket Bridge
언리얼 엔진 플러그인과의 WebSocket 연결을 관리하는 모듈
"""
import asyncio
import json
import uuid
import logging
from typing import Dict, Optional, Any, Union
logger = logging.getLogger(__name__)
class UnrealConnectionManager:
"""
언리얼 엔진 플러그인과의 WebSocket 연결을 관리하는 싱글톤 클래스
기능:
- WebSocket 연결 관리
- JSON-RPC 2.0 메시지 송수신
- 요청-응답 매칭 (asyncio.Future 사용)
- 기존 언리얼 플러그인과 100% 호환
"""
_instance = None
_lock = asyncio.Lock()
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self.unreal_client: Optional[Any] = None # WebSocket 또는 WebSocketWrapper
self.pending_responses: Dict[str, asyncio.Future] = {}
self.request_counter = 0
self._initialized = True
logger.info("UnrealConnectionManager initialized")
def generate_request_id(self) -> str:
"""
기존 언리얼 플러그인과 호환되는 요청 ID 생성
형식: req_{counter}_{uuid8}
"""
self.request_counter += 1
uuid_part = uuid.uuid4().hex[:8]
request_id = f"req_{self.request_counter}_{uuid_part}"
logger.debug(f"Generated request ID: {request_id}")
return request_id
async def connect(self, websocket) -> None:
"""
새로운 언리얼 클라이언트 연결 등록
Args:
websocket: 연결된 WebSocket 인스턴스 (Starlette WebSocket 또는 WebSocketWrapper)
"""
async with self._lock:
if self.unreal_client is not None:
logger.warning("Previous Unreal client connection exists, replacing...")
await self.disconnect()
self.unreal_client = websocket
logger.info("Unreal client connected successfully")
async def disconnect(self) -> None:
"""
언리얼 클라이언트 연결 해제 및 정리
"""
async with self._lock:
if self.unreal_client is not None:
try:
await self.unreal_client.close()
except Exception as e:
logger.warning(f"Error closing WebSocket: {e}")
self.unreal_client = None
logger.info("Unreal client disconnected")
# 모든 대기 중인 요청에 연결 해제 오류 설정
for request_id, future in self.pending_responses.items():
if not future.done():
future.set_exception(ConnectionError("WebSocket connection lost"))
logger.debug(f"Request {request_id} cancelled due to disconnection")
self.pending_responses.clear()
def is_connected(self) -> bool:
"""
연결 상태 확인
Returns:
bool: 연결되어 있으면 True, 아니면 False
"""
return self.unreal_client is not None
async def send_to_unreal(self, method: str, params: dict, timeout: float = 30.0) -> dict:
"""
언리얼 엔진으로 JSON-RPC 2.0 메시지 전송 및 응답 대기
Args:
method: 호출할 메서드 이름
params: 메서드 파라미터
timeout: 응답 대기 시간 (초)
Returns:
dict: 언리얼에서 받은 응답 데이터
Raises:
ConnectionError: 연결되지 않은 경우
TimeoutError: 응답 시간 초과
RuntimeError: JSON-RPC 에러 응답
"""
if not self.is_connected():
raise ConnectionError("No Unreal Engine client is connected")
request_id = self.generate_request_id()
# JSON-RPC 2.0 메시지 생성 (기존 플러그인 호환)
message = {
"jsonrpc": "2.0",
"id": request_id,
"method": method,
"params": params
}
# 응답 대기를 위한 Future 생성
future = asyncio.get_running_loop().create_future()
self.pending_responses[request_id] = future
try:
# WebSocket으로 메시지 전송
message_str = json.dumps(message)
await self.unreal_client.send_text(message_str)
logger.info(f"Sent to Unreal: {method} (ID: {request_id})")
logger.debug(f"Message content: {message_str}")
# 응답 대기
response = await asyncio.wait_for(future, timeout=timeout)
logger.info(f"Received response for {method} (ID: {request_id})")
logger.debug(f"Response content: {response}")
return response
except asyncio.TimeoutError:
logger.error(f"Timeout waiting for response to {method} (ID: {request_id})")
raise TimeoutError(f"No response from Unreal within {timeout} seconds for method '{method}'")
except Exception as e:
logger.error(f"Error sending to Unreal: {e}")
raise
finally:
# Future 정리
self.pending_responses.pop(request_id, None)
async def handle_unreal_response(self, message: dict) -> None:
"""
언리얼에서 받은 응답 메시지 처리
Args:
message: 언리얼에서 받은 JSON 메시지
"""
message_id = message.get("id")
if message_id is None:
# ID가 없는 경우 알림(notification) 메시지
await self._handle_notification(message)
return
# 응답 메시지 처리
if message_id in self.pending_responses:
future = self.pending_responses[message_id]
if not future.done():
if "error" in message:
# JSON-RPC 에러 응답
error = message["error"]
error_msg = f"Unreal RPC Error {error.get('code', 'N/A')}: {error.get('message', 'Unknown error')}"
logger.error(f"Unreal error for request {message_id}: {error_msg}")
future.set_exception(RuntimeError(error_msg))
else:
# 정상 응답
result = message.get("result", {})
logger.debug(f"Setting result for request {message_id}: {result}")
future.set_result(result)
else:
logger.warning(f"Received response for unknown request ID: {message_id}")
async def _handle_notification(self, message: dict) -> None:
"""
언리얼에서 받은 알림 메시지 처리
Args:
message: 알림 메시지 (ID가 없는 메시지)
"""
method = message.get("method", "unknown")
params = message.get("params", {})
logger.info(f"Received notification from Unreal: {method}")
logger.debug(f"Notification params: {params}")
# 알림 메시지 처리 로직 확장 가능
# 예: 연결 상태 변경, 에디터 상태 업데이트 등
# 전역 인스턴스 (싱글톤)
connection_manager = UnrealConnectionManager()