YetAnotherUnityMcp
by Azreal42
- YetAnotherUnityMcp
- server
"""Dynamic tool registration from Unity schema"""
import logging
import inspect
import json
import re
from typing import Any, Dict, List, Optional, Callable, Awaitable, Sequence, Set, Union
from mcp.server.fastmcp.resources import FunctionResource
from mcp.server.fastmcp.tools import Tool
from mcp.server.fastmcp import FastMCP, Context
from pydantic import AnyUrl
from server.connection_manager import UnityConnectionManager
from server.unity_client_util import UnityClientUtil
logger = logging.getLogger("dynamic_tools")
class DynamicToolManager:
"""
Manager for dynamically registering tools and resources based on Unity schema.
"""
def __init__(self, mcp: FastMCP, connection_manager: UnityConnectionManager):
self.mcp = mcp
self.connection_manager = connection_manager
self.registered_tools: Dict[str, str] = {}
self.registered_resources: Dict[str, str] = {}
async def register_from_schema(self) -> bool:
"""
Register all tools and resources from the Unity schema.
Returns:
True if successful, False otherwise
"""
logger.info("Fetching Unity schema for dynamic tool registration...")
try:
# Get schema from Unity
schema_result = await self.connection_manager.client.get_schema()
# Debug the schema structure
logger.debug(f"Schema result type: {type(schema_result)}")
if isinstance(schema_result, dict):
logger.debug(f"Schema result keys: {schema_result.keys()}")
# Process the schema - handle various formats
processed_schema = await self._process_schema(schema_result)
if not processed_schema or not isinstance(processed_schema, dict):
logger.error(f"Failed to process schema: {processed_schema}")
return False
if 'tools' not in processed_schema or 'resources' not in processed_schema:
logger.error(f"Processed schema is missing tools or resources")
return False
# Process tools
tools = processed_schema.get('tools', [])
for tool in tools:
try:
await self._register_tool(tool)
except Exception as e:
logger.error(f"Error registering tool: {str(e)}")
# Continue with other tools
# Process resources
resources = processed_schema.get('resources', [])
for resource in resources:
try:
await self._register_resource(resource)
except Exception as e:
logger.error(f"Error registering resource: {str(e)}")
# Continue with other resources
logger.info(f"Dynamic registration complete: {len(self.registered_tools)} tools, {len(self.registered_resources)} resources")
return True
except Exception as e:
logger.error(f"Error registering from schema: {str(e)}")
return False
async def _process_schema(self, schema_result: Any) -> Dict[str, Any]:
"""Process the schema result and extract the actual schema structure"""
schema = schema_result
# If schema is a string, try to parse it as JSON
if isinstance(schema, str):
try:
schema = json.loads(schema)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse schema JSON: {e}")
return {}
# Handle different schema structures
if isinstance(schema, dict):
# Case 1: Schema already has tools and resources at top level
if 'tools' in schema or 'resources' in schema:
logger.info("Schema has tools and resources directly at the top level")
return schema
# Case 2: Schema in a result wrapper
if 'result' in schema:
result = schema.get('result', {})
# Case 2.1: Result directly contains tools and resources
if isinstance(result, dict) and 'tools' in result and 'resources' in result:
logger.info("Schema in result field with tools and resources")
return result
# Case 2.2: Result contains content array
if isinstance(result, dict) and 'content' in result:
content = result.get('content', [])
# Look for text content that contains tools and resources
for item in content:
if item.get('type') == 'text':
text = item.get('text', '')
try:
parsed = json.loads(text)
if isinstance(parsed, dict) and 'tools' in parsed:
logger.info("Found schema in content text")
return parsed
except json.JSONDecodeError:
pass
# Case 3: Schema directly in content array
if 'content' in schema:
content = schema.get('content', [])
for item in content:
if item.get('type') == 'text':
text = item.get('text', '')
try:
parsed = json.loads(text)
if isinstance(parsed, dict) and 'tools' in parsed:
logger.info("Found schema in top-level content text")
return parsed
except json.JSONDecodeError:
pass
logger.error("Could not find valid schema structure")
return {}
@staticmethod
def func_metadata(dynamic_func: Callable[..., Any], input_schema: Dict[str, Any] = None) -> Any:
"""Given a function and an input schema, return metadata including a pydantic model representing its signature.
This creates a pydantic model based on the input schema that can validate parameters
before passing them to the dynamic tool function.
Args:
dynamic_func: The function to create metadata for
input_schema: The JSON schema describing the function's parameters (optional)
Returns:
A FuncMetadata object with an arg_model that can validate parameters
"""
# If no input_schema is provided, create a very basic one
if input_schema is None:
logger.warning("No input schema provided for func_metadata, creating a basic schema")
input_schema = {"properties": {}, "required": []}
# Import the necessary modules from the FastMCP library
from pydantic import create_model, Field
from mcp.server.fastmcp.utilities.func_metadata import FuncMetadata, ArgModelBase
# Get the properties from the input schema
properties = input_schema.get('properties', {})
required_params = input_schema.get('required', [])
# Create a mapping of parameter names to their types and field info
dynamic_pydantic_model_params = {}
for param_name, param_schema in properties.items():
# Skip parameters that start with underscore (following the same pattern as in inspect-based implementation)
if param_name.startswith('_'):
logger.warning(f"Parameter {param_name} of {dynamic_func.__name__} starts with '_' and will be skipped")
continue
# Determine if parameter is required
is_required = param_name in required_params
# Get parameter type based on JSON schema
param_type = param_schema.get('type', 'string')
# Map JSON schema types to Python types
type_mapping = {
'string': str,
'integer': int,
'number': float,
'boolean': bool,
'array': list,
'object': dict,
'null': type(None)
}
python_type = type_mapping.get(param_type, Any)
description = param_schema.get('description', f'Parameter {param_name}')
# Create field info with appropriate settings
field_info = Field(
description=description,
title=param_name,
default=... if is_required else None, # Use ellipsis for required params
)
# Add to model parameters dictionary
dynamic_pydantic_model_params[param_name] = (python_type, field_info)
# Create the Pydantic model for function arguments
arguments_model = create_model(
f"{dynamic_func.__name__}Arguments",
**dynamic_pydantic_model_params,
__base__=ArgModelBase
)
# Create and return the FuncMetadata
return FuncMetadata(arg_model=arguments_model)
async def _register_tool(self, tool_schema: Dict[str, Any]) -> None:
"""
Register a tool from schema.
Args:
tool_schema: Tool schema from Unity
"""
tool_name = tool_schema.get('name')
if not tool_name:
logger.warning("Tool without name found in schema, skipping")
return
# Skip if already registered
if tool_name in self.registered_tools:
logger.debug(f"Tool {tool_name} already registered, skipping")
return
description = tool_schema.get('description', f"Unity tool: {tool_name}")
# Create the dynamic tool function
async def dynamic_tool(ctx: Context, *args, **kwargs) -> Any:
"""Dynamic tool execution function"""
# Extract parameters based on the input schema
params = {}
# Map positional args to parameters based on input schema
input_schema = tool_schema.get('inputSchema', {})
properties = input_schema.get('properties', {})
param_names = list(properties.keys())
# Get required parameters from schema (new MCP format uses required array)
required_params = input_schema.get('required', [])
logger.debug(f"Tool {tool_name} required parameters: {required_params}")
# Map the positional args to named parameters
for i, param_name in enumerate(param_names):
if i < len(args):
params[param_name] = args[i]
# Add any keyword args
for k, v in kwargs.items():
if k != 'ctx':
params[k] = v
try:
await ctx.info(f"Executing dynamic tool {tool_name} with params: {json.dumps(params)}")
result = await UnityClientUtil.execute_unity_operation(
self.connection_manager,
f"dynamic tool {tool_name}",
lambda: self.connection_manager.client.send_command(tool_name, params),
ctx,
f"Error executing {tool_name}"
)
return result
except Exception as e:
await ctx.error(f"Error in dynamic tool {tool_name}: {str(e)}")
raise
# Get input schema for logging
input_schema = tool_schema.get('inputSchema', {})
# Log registration details
logger.info(f"Registering dynamic tool {tool_name} with required parameters: {input_schema.get('required', [])} and optional parameters: {input_schema.get('properties', {}).keys() - input_schema.get('required', [])}")
func_arg_metadata = self.func_metadata(
dynamic_tool,
input_schema
)
# Register the tool with FastMCP
# since we are using a custom tool function, we need to manually create the Tool object
mcp_tool = Tool(
fn=dynamic_tool,
name=tool_name,
description=description,
parameters=input_schema,
fn_metadata=func_arg_metadata,
is_async=True,
context_kwarg="ctx",
)
self.mcp._tool_manager._tools[tool_name] = mcp_tool
# Store reference to the registered tool
self.registered_tools[tool_name] = description
logger.info(f"Successfully registered dynamic tool: {tool_name}")
async def _register_resource(self, resource_schema: Dict[str, Any]) -> None:
"""
Register a resource from schema.
Args:
resource_schema: Resource schema from Unity
"""
resource_name = resource_schema.get('name')
if not resource_name:
logger.warning("Resource without name found in schema, skipping")
return
# Skip if already registered
if resource_name in self.registered_resources:
logger.debug(f"Resource {resource_name} already registered, skipping")
return
# Check for both uri
uri = resource_schema.get('uri')
if not uri:
logger.warning(f"Resource {resource_name} has no URI, skipping")
return
description = resource_schema.get('description', f"Unity resource: {resource_name}")
mime_type = resource_schema.get('mimeType', 'application/json')
# Extract parameter information from URI
# This is useful for debugging and parameter mapping
parameters = []
parts = uri.split('/')
for part in parts:
if part.startswith('{') and part.endswith('}'):
# Extract the parameter name without the braces
param_name = part[1:-1]
parameters.append(param_name)
# Log the detected parameters
if parameters:
logger.info(f"Resource {resource_name} requires parameters: {parameters}")
# Check for camelCase parameters that might cause issues
camel_case_params = [p for p in parameters if any(c.isupper() for c in p)]
if camel_case_params:
# Log information about parameter name conversion
logger.info(f"Resource {resource_name} uses camelCase parameters: {camel_case_params}")
snake_case_examples = [self._camel_to_snake(p) for p in camel_case_params]
logger.info(f"When accessing this resource, use snake_case in Python: {snake_case_examples}")
logger.info(f"Parameters will be automatically converted back to camelCase when sent to Unity")
# Store the resource in our registry with all relevant info
self.registered_resources[resource_name] = {
"uri": uri,
"description": description,
"uri_params": parameters
}
# Create a dynamic resource handler function
async def dynamic_resource_handler(ctx: Context, *args, **kwargs):
"""Dynamic resource handler function"""
# Convert positional args to named parameters
param_dict = {}
# Map positional args to parameters from URI
for i, param_name in enumerate(parameters):
if i < len(args):
param_dict[param_name] = args[i]
# Add any keyword args
param_dict.update(kwargs)
# Check if all required parameters are provided
if len(parameters) > 0 and len(param_dict) < len(parameters):
missing_params = set(parameters) - set(param_dict.keys())
raise TypeError(f"Missing required parameters for resource {resource_name}: {', '.join(missing_params)}")
logger.info(f"Accessing dynamic resource {resource_name} with params: {param_dict}")
try:
# Execute the resource access
result = await UnityClientUtil.execute_unity_operation(
self.connection_manager,
f"dynamic resource {resource_name}",
lambda: self.connection_manager.client.send_command("access_resource", {
"resource_name": resource_name,
"parameters": param_dict
}),
ctx,
f"Error accessing {resource_name}"
)
return result
except Exception as e:
logger.error(f"Error in dynamic resource {resource_name}: {str(e)}")
raise
# Create the FunctionResource with the required fn parameter
try:
resource = FunctionResource(
uri=AnyUrl(uri), # FastMCP uses uri even for parameterized URIs
name=resource_name,
description=description,
mime_type=mime_type or "text/plain",
fn=dynamic_resource_handler, # Add the required fn parameter
)
self.mcp._resource_manager.add_resource(resource)
# Add the function to our registry
self.registered_resources[resource_name]["func"] = dynamic_resource_handler
logger.info(f"Successfully registered dynamic resource: {resource_name}")
except Exception as e:
logger.error(f"Error registering resource: {e}")
return
@staticmethod
def _camel_to_snake(name):
"""Convert a camelCase string to snake_case"""
# Replace common patterns first (like 'Id' to '_id')
name = name.replace("Id", "_id").replace("Name", "_name")
# Handle the general case
return ''.join(['_' + c.lower() if c.isupper() else c.lower() for c in name]).lstrip('_')