# MCP Unreal Server
# by radial-hks
# Verified
# - MCP-Unreal-Server
# - src
# - mcp_server_unreal
import asyncio
import logging
import time
from typing import Dict, List, Optional, AsyncIterator, Any
from contextlib import asynccontextmanager
from collections import deque
from mcp.server.models import InitializationOptions
# Configure root logging; records from other libraries are emitted through
# the handler installed here.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
_logger = logging.getLogger("UnrealMCPServer")
# Let DEBUG records through the logger itself; each handler below applies its
# own threshold (file: DEBUG, console: INFO). Without this the logger inherits
# the root level (INFO) and the DEBUG file handler never sees DEBUG records.
_logger.setLevel(logging.DEBUG)
# This logger has its own console handler, so do not also propagate records to
# the root handler installed by basicConfig(); that would print every message
# twice on the console.
_logger.propagate = False
# Lower the log level of mcp.server.lowlevel.server.
# It was initially set to ERROR, which led to warning messages showing up in
# Cline, so the level was lowered.
logging.getLogger("mcp.server.lowlevel.server").setLevel(logging.WARNING)
# Configure file handler with more concise format for frequent operations.
# The encoding is pinned to UTF-8 because the log messages contain non-ASCII
# text and the platform default encoding may not be able to represent it.
file_handler = logging.FileHandler('mcp_unreal.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s.%(msecs)03d - %(levelname).1s - %(message)s', '%H:%M:%S'))
_logger.addHandler(file_handler)
# Configure console handler with detailed format
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
_logger.addHandler(console_handler)
import mcp.types as types
from mcp.server import NotificationOptions, Server
from pydantic import AnyUrl
import mcp.server.stdio
from .remote_execution import RemoteExecution, RemoteExecutionConfig,MODE_EXEC_FILE,MODE_EXEC_STATEMENT,MODE_EVAL_STATEMENT
# Global connection state
# Persistent Unreal connection shared at module level; created on demand by
# get_unreal_connection() and cleared by server_lifespan() on shutdown.
_unreal_connection: Optional[RemoteExecution] = None
# Handle for a module-level node-monitor task. In the visible code nothing
# assigns it; server_lifespan() only cancels and clears it on shutdown.
_node_monitor_task: Optional[asyncio.Task] = None
def get_unreal_connection(host: str = "239.0.0.1", port: int = 6766) -> RemoteExecution:
    """Return the shared Unreal connection, creating it on first use.

    The connection is cached in the module-level ``_unreal_connection``. A
    cached connection is probed by reading its ``remote_nodes`` attribute; if
    that raises, the connection is stopped (best effort), discarded, and a
    new one is created.

    Args:
        host: Multicast group address used when a new connection is created.
        port: Multicast group port used when a new connection is created.

    Returns:
        The started ``RemoteExecution`` instance stored in
        ``_unreal_connection``.

    Raises:
        Exception: Whatever ``RemoteExecution(...)`` or ``start()`` raises
            while creating a new connection. In that case the cached global
            is left as ``None`` rather than pointing at an unstarted object.

    Note:
        ``host`` and ``port`` are ignored while a cached connection passes
        the probe; they only apply when a new connection is created.
    """
    global _unreal_connection

    if _unreal_connection is not None:
        try:
            # Probe only: reading the attribute is the health check.
            _ = _unreal_connection.remote_nodes
        except Exception as e:
            _logger.warning(f"现有连接已失效: {str(e)}")
            try:
                _unreal_connection.stop()
            except Exception as stop_error:
                # Best effort: the connection is being dropped regardless,
                # so a failing stop() must not prevent reconnecting.
                _logger.debug(f"停止失效连接时出错(已忽略): {stop_error}")
            _unreal_connection = None
        else:
            return _unreal_connection

    # Create a new connection. Publish it to the global only after start()
    # succeeded, so a failed start is retried on the next call.
    config = RemoteExecutionConfig()
    config.multicast_group_endpoint = (host, port)
    connection = RemoteExecution(config)
    connection.start()
    _unreal_connection = connection
    _logger.info("创建新的持久化Unreal连接")
    return _unreal_connection
class McpUnrealServer:
    """MCP server wrapper that exposes Unreal remote Python execution.

    Two connections are involved in the visible code:

    * ``self.remote_execution`` is created by the ``connect-unreal`` tool and
      backs resource listing and node monitoring.
    * The module-level ``_unreal_connection`` is used by the
      ``execute-python`` tool.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, server_name: str, lifespan=None):
        """Create the underlying MCP ``Server`` and register its handlers.

        Args:
            server_name: Name passed to the MCP ``Server``.
            lifespan: Optional lifespan context manager forwarded to
                ``Server``.
        """
        self.server = Server(server_name, lifespan=lifespan)
        # Connection owned by this instance; set by the connect-unreal tool.
        self.remote_execution = None
        # Last known nodes, keyed by node_id.
        self.connected_nodes: Dict[str, dict] = {}
        # Background task that watches for node list changes.
        self._node_monitor_task = None
        self._setup_handlers()

    def _setup_handlers(self):
        """Register resource, tool and template handlers on ``self.server``."""

        @self.server.list_resources()
        async def handle_list_resources() -> list[types.Resource]:
            """List the discovered Unreal nodes as resources."""
            if not self.remote_execution:
                return []
            return [
                types.Resource(
                    uri=AnyUrl(f"unreal://{node['node_id']}"),
                    name=f"Unreal Instance: {node['node_id']}",
                    description="Unreal Engine实例",
                    mimeType="application/x-unreal",
                )
                for node in self.remote_execution.remote_nodes
            ]

        @self.server.list_tools()
        async def handle_list_tools() -> list[types.Tool]:
            """List the available tools."""
            # NOTE(review): handle_call_tool also dispatches "connect-unreal",
            # but that tool is not advertised here - confirm whether that is
            # intentional before exposing it.
            return [
                types.Tool(
                    name="execute-python",
                    description="在Unreal中执行Python代码",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "code": {"type": "string"},
                            "unattended": {"type": "boolean", "default": True},
                        },
                        "required": ["code"],
                    },
                ),
            ]

        # Resource template handler
        @self.server.list_resource_templates()
        async def handle_list_resource_templates() -> list[types.ResourceTemplate]:
            """List the available resource templates (none are defined)."""
            return []

        @self.server.call_tool()
        async def handle_call_tool(
            name: str, arguments: dict | None
        ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
            """Dispatch a tool call to the matching handler.

            Raises:
                ValueError: If ``name`` is not a known tool.
            """
            if name == "connect-unreal":
                return await self._handle_connect_unreal(arguments or {})
            elif name == "execute-python":
                return await self._handle_execute_python(arguments or {})
            raise ValueError(f"未知的工具: {name}")

    async def _handle_connect_unreal(self, arguments: dict) -> list[types.TextContent]:
        """Handle a ``connect-unreal`` request.

        Replaces ``self.remote_execution`` with a new connection to the
        multicast endpoint given by ``host``/``port`` in ``arguments``
        (defaults ``239.0.0.1``/``6766``), waits two seconds for discovery,
        and starts the node monitor when at least one node was found.

        Returns:
            A single text item describing success or failure; exceptions are
            reported in the text rather than raised.
        """
        try:
            host = arguments.get("host", "239.0.0.1")
            port = arguments.get("port", 6766)
            _logger.info(f"尝试连接Unreal: host={host}, port={port}")

            if self.remote_execution:
                self.remote_execution.stop()

            config = RemoteExecutionConfig()
            config.multicast_group_endpoint = (host, port)
            self.remote_execution = RemoteExecution(config)
            self.remote_execution.start()

            # Wait for node discovery
            await asyncio.sleep(2)
            nodes = self.remote_execution.remote_nodes
            if not nodes:
                _logger.warning("未发现任何Unreal节点")
                return [types.TextContent(type="text", text="未发现任何Unreal节点")]

            # Update the connected-node table and notify the client
            self.connected_nodes = {node["node_id"]: node for node in nodes}
            await self.server.request_context.session.send_resource_list_changed()

            # (Re)start the node monitor task
            if self._node_monitor_task:
                self._node_monitor_task.cancel()
            self._node_monitor_task = asyncio.create_task(self._monitor_nodes())

            _logger.info(f"成功连接到Unreal,发现{len(nodes)}个节点")
            _logger.info(f"当前节点列表为: {self.connected_nodes.keys()}")
            return [types.TextContent(
                type="text",
                text=f"成功连接到Unreal,发现{len(nodes)}个节点"
            )]
        except Exception as e:
            _logger.error(f"连接Unreal失败: {str(e)}")
            return [types.TextContent(
                type="text",
                text=f"连接Unreal失败: {str(e)}"
            )]

    async def _handle_execute_python(self, arguments: dict) -> list[types.TextContent]:
        """Handle an ``execute-python`` request.

        Runs ``arguments["code"]`` on the first discovered node through the
        module-level ``_unreal_connection`` using ``MODE_EXEC_STATEMENT``.
        ``arguments["unattended"]`` (default ``True``) is forwarded to
        ``run_command``.

        Returns:
            A single text item with the execution result or an error
            description; exceptions are reported in the text rather than
            raised.
        """
        global _unreal_connection

        # Validate the request first so a malformed call does not trigger a
        # connection attempt and the discovery wait below.
        code = arguments.get("code")
        if not code:
            return [types.TextContent(type="text", text="未提供Python代码")]
        unattended = arguments.get("unattended", True)
        exec_mode = MODE_EXEC_STATEMENT

        # Make sure a connection exists and has discovered at least one node
        try:
            if not _unreal_connection or not _unreal_connection.remote_nodes:
                _unreal_connection = get_unreal_connection()
                # Give discovery a moment to find nodes
                await asyncio.sleep(1)
                if not _unreal_connection or not _unreal_connection.remote_nodes:
                    return [types.TextContent(type="text", text="无法连接到Unreal实例,请确保Unreal正在运行并启用了远程执行")]
        except Exception as e:
            return [types.TextContent(type="text", text=f"连接Unreal失败: {str(e)}")]

        try:
            # Use the first available node
            nodes = _unreal_connection.remote_nodes
            if not nodes:
                return [types.TextContent(type="text", text="未发现任何Unreal节点")]
            node_id = nodes[0]["node_id"]

            _unreal_connection.open_command_connection(node_id)
            result = _unreal_connection.run_command(
                code, unattended=unattended, exec_mode=exec_mode
            )
            _unreal_connection.close_command_connection()

            if not result.get("success", False):
                return [types.TextContent(
                    type="text",
                    text=f"执行失败: {result.get('result', '未知错误')}"
                )]
            return [types.TextContent(
                type="text",
                text=f"执行结果:\n{result.get('result', '')}"
            )]
        except Exception as e:
            _logger.error(f"执行Python代码失败: {str(e)}")
            if _unreal_connection:
                try:
                    _unreal_connection.close_command_connection()
                except Exception as close_error:
                    # Best effort: the original failure is what gets reported.
                    _logger.debug(f"关闭命令连接时出错(已忽略): {close_error}")
            return [types.TextContent(
                type="text",
                text=f"执行失败: {str(e)}"
            )]

    async def _monitor_nodes(self):
        """Poll the node list once per second and notify on changes.

        Stops when cancelled or when ``self.remote_execution`` is unset.
        Other errors are logged and polling continues.
        """
        while True:
            try:
                await asyncio.sleep(1)  # check once per second
                if not self.remote_execution:
                    break
                current_nodes = {
                    node["node_id"]: node
                    for node in self.remote_execution.remote_nodes
                }
                # Notify the client only when the node set changed
                if current_nodes != self.connected_nodes:
                    self.connected_nodes = current_nodes
                    await self.server.request_context.session.send_resource_list_changed()
            except asyncio.CancelledError:
                break
            except Exception as e:
                _logger.error(f"节点监控错误: {str(e)}")

    async def close(self):
        """Cancel the node monitor and stop this instance's connection.

        References are cleared as they are released, so calling ``close()``
        more than once does not stop the same connection twice.
        """
        task, self._node_monitor_task = self._node_monitor_task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        connection, self.remote_execution = self.remote_execution, None
        if connection:
            connection.stop()
@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[Dict[str, Any]]:
    """Manage server startup and shutdown.

    On entry, tries to initialise the module-level Unreal connection via
    ``get_unreal_connection()``; a failure is logged and startup continues.
    On exit, cancels the module-level node-monitor task (if any) and stops
    and clears the module-level connection.

    Args:
        server: The MCP server this lifespan belongs to (not used here).

    Yields:
        An empty dict; state is kept in module-level globals instead.
    """
    global _unreal_connection, _node_monitor_task
    try:
        _logger.info("UnrealMCP服务器正在启动")
        # Try to connect at startup; this initialises the global connection.
        try:
            get_unreal_connection()
            _logger.info("成功连接到Unreal")
        except Exception as e:
            _logger.warning(f"无法在启动时连接到Unreal: {str(e)}")
            _logger.warning("请确保Unreal实例正在运行并启用了远程执行")
        # Empty context - the global connection is used instead.
        yield {}
    finally:
        # Clean up the global state on shutdown
        if _node_monitor_task:
            _node_monitor_task.cancel()
            try:
                await _node_monitor_task
            except asyncio.CancelledError:
                pass
            _node_monitor_task = None
        if _unreal_connection:
            _logger.info("正在断开与Unreal的连接")
            try:
                _unreal_connection.stop()
            except Exception as e:
                # Shutdown is best effort: log instead of raising so an
                # exception from the server body is not masked.
                _logger.warning(f"断开Unreal连接时出错: {str(e)}")
            finally:
                _unreal_connection = None
        _logger.info("UnrealMCP服务器已关闭")
async def main():
    """Run the MCP server over the stdio transport.

    Builds a ``McpUnrealServer`` with ``server_lifespan``, runs its
    underlying ``Server`` on the stdio streams, and closes the wrapper when
    the run ends or fails.
    """
    unreal_server = McpUnrealServer("mcp-server-unreal", lifespan=server_lifespan)
    try:
        # Use the server object held by the instance so the registered
        # handlers are the ones being served.
        async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
            await unreal_server.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="mcp-server-unreal",
                    server_version="0.1.0",
                    capabilities=unreal_server.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
    finally:
        # close() is a coroutine function: without ``await`` the coroutine
        # object is created but never run, so no cleanup would happen.
        await unreal_server.close()