IACR MCP Server
import base64
import inspect
import subprocess
import traceback
import asyncio
from typing import List, Dict, Any, Optional, Union, Callable
from pydantic import BaseModel
from fastapi import HTTPException
from pydantic import BaseModel
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.stdio import get_default_environment
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def managed_session(command: str, args: list, env: dict | None = None):
if env is None:
env = get_default_environment()
else:
default_env = get_default_environment()
default_env.update(env)
env = default_env
server_params = StdioServerParameters(
command=command,
args=args,
env=env,
)
client = None
session = None
try:
client = stdio_client(server_params)
read, write = await client.__aenter__()
session = ClientSession(read, write)
await session.__aenter__()
await session.initialize()
yield session
finally:
if session:
try:
await session.__aexit__(None, None, None)
except Exception:
pass
if client:
try:
await client.__aexit__(None, None, None)
except Exception:
pass
def install_library_(library):
try:
result = subprocess.run(
["uv", "pip", "install", library],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return result.returncode == 0
except subprocess.CalledProcessError:
return False
def uninstall_library_(library):
try:
result = subprocess.run(
["uv", "pip", "uninstall", "-y", library],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return result.returncode == 0
except subprocess.CalledProcessError:
return False
def add_tool_(function, description: str = "", properties: Dict[str, Any] = None, required: List[str] = None):
"""
Add a tool to the registered functions.
Args:
function: The function to be registered as a tool
"""
from ..server.function_tools import tool
# Apply the tool decorator with empty description
decorated_function = tool(description=description, custom_properties=properties, custom_required=required)(function)
return decorated_function
import cloudpickle
cloudpickle.DEFAULT_PROTOCOL = 2
from fastapi import HTTPException
from pydantic import BaseModel
from mcp import ClientSession, StdioServerParameters
import asyncio
from contextlib import asynccontextmanager
# Create server parameters for stdio connection
from .api import app, timeout
prefix = "/tools"
class InstallLibraryRequest(BaseModel):
library: str
@app.post(f"{prefix}/install_library")
@timeout(30.0)
async def install_library(request: InstallLibraryRequest):
"""
Endpoint to install a library.
Args:
library: The library to install
Returns:
A success message
"""
install_library_(request.library)
return {"message": "Library installed successfully"}
@app.post(f"{prefix}/uninstall_library")
@timeout(30.0)
async def uninstall_library(request: InstallLibraryRequest):
"""
Endpoint to uninstall a library.
"""
uninstall_library_(request.library)
return {"message": "Library uninstalled successfully"}
class AddToolRequest(BaseModel):
function: str
@app.post(f"{prefix}/add_tool")
@timeout(30.0)
async def add_tool(request: AddToolRequest):
"""
Endpoint to add a tool.
"""
# Cloudpickle the function
decoded_function = base64.b64decode(request.function)
deserialized_function = cloudpickle.loads(decoded_function)
add_tool_(deserialized_function)
return {"message": "Tool added successfully"}
class AddMCPToolRequest(BaseModel):
name: str
command: str
args: List[str]
env: Dict[str, str]
async def add_mcp_tool_(name: str, command: str, args: List[str], env: Dict[str, str]):
"""
Add a tool.
"""
def get_python_type(schema_type: str, format: Optional[str] = None) -> type:
"""Convert JSON schema type to Python type."""
type_mapping = {
"string": str,
"integer": int,
"boolean": bool,
"number": float,
"array": list,
"object": dict,
}
return type_mapping.get(schema_type, Any)
async with managed_session(command=command, args=args, env=env) as session:
tools = await session.list_tools()
tools = tools.tools
for tool in tools:
tool_name: str = tool.name
tool_desc: str = tool.description
input_schema: Dict[str, Any] = tool.inputSchema
properties: Dict[str, Dict[str, Any]] = input_schema.get("properties", {})
required: List[str] = input_schema.get("required", [])
def create_tool_function(
tool_name: str,
properties: Dict[str, Dict[str, Any]],
required: List[str],
) -> Callable[..., Dict[str, Any]]:
# Create function parameters type annotations
annotations = {}
defaults = {}
# First add required parameters
for param_name in required:
param_info = properties[param_name]
param_type = get_python_type(param_info.get("type", "any"))
annotations[param_name] = param_type
# Then add optional parameters
for param_name, param_info in properties.items():
if param_name not in required:
param_type = get_python_type(param_info.get("type", "any"))
annotations[param_name] = param_type
defaults[param_name] = param_info.get("default", None)
# Create the signature parameters
from inspect import Parameter, Signature
parameters = []
# Add required parameters first
for param_name in required:
param_type = annotations[param_name]
parameters.append(
Parameter(
name=param_name,
kind=Parameter.POSITIONAL_OR_KEYWORD,
annotation=param_type
)
)
# Add optional parameters
for param_name, param_type in annotations.items():
if param_name not in required:
parameters.append(
Parameter(
name=param_name,
kind=Parameter.POSITIONAL_OR_KEYWORD,
annotation=param_type,
default=defaults[param_name]
)
)
async def tool_function(*args: Any, **kwargs: Any) -> Dict[str, Any]:
# Convert positional args to kwargs
if len(args) > len(required):
raise TypeError(
f"{tool_name}() takes {len(required)} positional arguments but {len(args)} were given"
)
# Combine positional args with kwargs
all_kwargs = kwargs.copy()
for i, arg in enumerate(args):
if i < len(required):
all_kwargs[required[i]] = arg
# Validate required parameters
for req in required:
if req not in all_kwargs:
raise ValueError(f"Missing required parameter: {req}")
# Add defaults for optional parameters
for param, default in defaults.items():
if param not in all_kwargs:
all_kwargs[param] = default
async with managed_session(command=tool_function.command, args=tool_function.args, env=tool_function.env) as new_session:
# Remove None kwargs
all_kwargs = {k: v for k, v in all_kwargs.items() if v is not None}
result = await new_session.call_tool(name=tool_name, arguments=all_kwargs)
return {"result": result}
# Set function name and annotations
tool_function.__name__ = tool_name
tool_function.__annotations__ = {
**annotations,
"return": Dict[str, Any],
}
tool_function.__doc__ = f"{tool_desc}\n\nReturns:\n Tool execution results"
# Create and set the signature
tool_function.__signature__ = Signature(
parameters=parameters,
return_annotation=Dict[str, Any]
)
# Store session parameters as attributes of the function
tool_function.command = command
tool_function.args = args
tool_function.env = env
return tool_function
# Create function with proper annotations
func = create_tool_function(tool_name, properties, required)
#name should be name__function_name
full_name = f"{name}__{tool_name}"
func.__name__ = full_name
add_tool_(func, description=tool_desc, properties=properties, required=required)
@app.post(f"{prefix}/add_mcp_tool")
@timeout(60.0)
async def add_mcp_tool(request: AddMCPToolRequest):
"""
Endpoint to add a tool.
"""
await add_mcp_tool_(request.name, request.command, request.args, request.env)
return {"message": "Tool added successfully"}