# mcp_server.py (5.69 kB)
"""main mcp server integrating all components."""
from __future__ import annotations

import asyncio
import importlib
import os
import sys
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional

from .mcp_protocol import MCPServer, MCPRequest, MCPResponse
from .reflection import ReflectionEngine, ToolMetadata
from .schema_generator import SchemaGenerator
from .executor import Executor
from .safety import SafetyConfig
from .config import get_settings
@dataclass
class SDKConfig:
    """describes one python sdk to import and expose as tools.

    attributes:
        name: short label for the sdk, used in log messages and passed to
            tool discovery together with the imported module
        module: dotted import path handed to ``importlib.import_module``
        allow_patterns: optional patterns forwarded to the reflection engine
        deny_patterns: optional patterns forwarded to the reflection engine
        allow_mutating: flag forwarded to the reflection engine; off by default
    """

    # required identification of the sdk
    name: str
    module: str

    # optional filters; None means "nothing supplied by this config"
    allow_patterns: Optional[List[str]] = None
    deny_patterns: Optional[List[str]] = None

    # mutating operations are opt-in
    allow_mutating: bool = False
class GeneralizedMCPServer:
    """generalized mcp server that works with any python sdk.

    on construction every configured sdk module is imported and reflected
    into ``self.tools``, a registry keyed by each tool's ``fq_name``. the
    registry backs the ``tools/list`` and ``tools/call`` handlers.
    """

    def __init__(
        self,
        sdks: List[SDKConfig],
        use_llm_schemas: bool = True,
        safety_config: Optional[SafetyConfig] = None,
    ):
        """
        initialize generalized mcp server.

        args:
            sdks: list of sdk configurations to load
            use_llm_schemas: whether to use llm for schema enhancement
            safety_config: safety configuration
        """
        self.sdks = sdks
        self.schema_generator = SchemaGenerator(use_llm=use_llm_schemas)
        self.executor = Executor(safety_config=safety_config)
        self.tools: Dict[str, ToolMetadata] = {}
        self._load_tools()

    def _load_tools(self):
        """load and discover tools from all configured sdks.

        a failure in one sdk is reported and skipped so the remaining sdks
        still load. diagnostics are written to stderr, never stdout: when the
        server is served over stdio, stdout carries protocol messages and any
        stray text there would corrupt the stream.
        """
        for sdk_config in self.sdks:
            try:
                module = importlib.import_module(sdk_config.module)
                # create reflection engine with sdk-specific config
                reflector = ReflectionEngine(
                    allow_patterns=sdk_config.allow_patterns,
                    deny_patterns=sdk_config.deny_patterns,
                    allow_mutating=sdk_config.allow_mutating,
                )
                # discover tools; a later tool with the same fq_name replaces
                # an earlier one
                discovered = reflector.discover_tools(module, sdk_config.name)
                for tool in discovered:
                    self.tools[tool.fq_name] = tool
                print(
                    f"loaded {len(discovered)} tools from {sdk_config.name}",
                    file=sys.stderr,
                )
            except ModuleNotFoundError:
                print(
                    f"warning: sdk '{sdk_config.module}' not installed, skipping",
                    file=sys.stderr,
                )
            except Exception as e:
                # best-effort loading: one broken sdk must not take down the rest
                print(
                    f"error loading sdk '{sdk_config.name}': {e}",
                    file=sys.stderr,
                )

    def handle_tools_list(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """handle tools/list request.

        args:
            params: request parameters (unused)

        returns:
            ``{"tools": [...]}`` with one generated mcp tool description per
            registered tool, in registry order
        """
        return {
            "tools": [
                self.schema_generator.generate_mcp_tool(tool)
                for tool in self.tools.values()
            ]
        }

    async def handle_tools_call(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """handle tools/call request.

        args:
            params: request parameters; ``name`` selects the tool and
                ``arguments`` (optional, may be null) holds its arguments

        returns:
            a ``content`` list holding the stringified tool result as text

        raises:
            ValueError: if ``name`` is missing or not a registered tool
            RuntimeError: if the executor reports an unsuccessful result
        """
        tool_name = params.get("name")
        arguments = params.get("arguments")
        if arguments is None:
            # a missing key and an explicit json null both mean "no arguments"
            arguments = {}

        # the isinstance guard keeps an unhashable name (list/dict) from
        # raising TypeError out of the dict lookup
        if not isinstance(tool_name, str) or tool_name not in self.tools:
            raise ValueError(f"tool not found: {tool_name}")

        tool = self.tools[tool_name]
        result = await self.executor.execute(tool, arguments)
        if not result.success:
            try:
                message = result.error["message"]
            except (TypeError, KeyError):
                # error payload absent or without a message: still surface
                # the failure as the documented RuntimeError
                message = f"tool execution failed: {tool_name}"
            raise RuntimeError(message)

        return {
            "content": [
                {
                    "type": "text",
                    "text": str(result.result),
                }
            ]
        }

    def create_mcp_server(self) -> MCPServer:
        """create mcp protocol server with the tools handlers registered."""
        server = MCPServer()
        # register handlers
        server.register("tools/list", self.handle_tools_list)
        # handle_tools_call is a coroutine function, so each call is driven to
        # completion with asyncio.run.
        # NOTE(review): asyncio.run raises RuntimeError when an event loop is
        # already running in this thread; this assumes MCPServer invokes
        # handlers synchronously. confirm against mcp_protocol.
        server.register("tools/call", lambda p: asyncio.run(self.handle_tools_call(p)))
        return server
def create_server_from_config() -> GeneralizedMCPServer:
    """build a server from application settings and environment variables.

    environment:
        EXTRA_SDKS: comma-separated module paths appended after the built-in
            sdks; blank entries are ignored
        DRY_RUN: dry-run is enabled only when this equals "true" (any case)

    returns:
        a server with llm schema enhancement enabled only when the settings
        carry a truthy ``openai_api_key``, and with mutating calls disabled
    """
    settings = get_settings()

    # built-in sdks, none of which allow mutating operations
    sdk_configs = [
        SDKConfig(
            name="kubernetes",
            module="kubernetes.client",
            allow_patterns=["*.list_*", "*.read_*", "*.get_*"],
            allow_mutating=False,
        ),
        SDKConfig(
            name="github",
            module="github",
            allow_patterns=["*.get_*", "*.list_*"],
            allow_mutating=False,
        ),
        SDKConfig(
            name="azure",
            module="azure.identity",
            allow_patterns=["*"],
            allow_mutating=False,
        ),
    ]

    # extras from the environment: the sdk name is the top-level package
    for raw_entry in os.getenv("EXTRA_SDKS", "").split(","):
        module_path = raw_entry.strip()
        if not module_path:
            continue
        top_level_package = module_path.split(".")[0]
        sdk_configs.append(
            SDKConfig(
                name=top_level_package,
                module=module_path,
                allow_mutating=False,
            )
        )

    llm_schemas_enabled = bool(settings.openai_api_key)
    dry_run_enabled = os.getenv("DRY_RUN", "").lower() == "true"
    safety = SafetyConfig(
        allow_mutating=False,
        dry_run=dry_run_enabled,
        redact_secrets=True,
    )
    return GeneralizedMCPServer(
        sdks=sdk_configs,
        use_llm_schemas=llm_schemas_enabled,
        safety_config=safety,
    )
def main():
    """main entrypoint: build the server from config and serve it over stdio.

    startup messages are written to stderr rather than stdout, because once
    the server is served over stdio, stdout is the protocol channel and any
    non-protocol text on it would corrupt the stream for the client.
    """
    server = create_server_from_config()
    mcp_server = server.create_mcp_server()
    print("generalized mcp server starting...", file=sys.stderr)
    print(
        f"loaded {len(server.tools)} tools from {len(server.sdks)} sdks",
        file=sys.stderr,
    )
    print("listening on stdio...", file=sys.stderr)
    mcp_server.run_stdio()
if __name__ == "__main__":
    # start the server only when executed as a script, not when imported
    main()