Skip to main content
Glama

MCP Unified Server

by getfounded
sdk.py12.1 kB
""" MCP Tool Kit SDK - Easy integration for Python applications """ import asyncio import logging import os import time from typing import Dict, Any, Optional, List, Callable, Union from concurrent.futures import ThreadPoolExecutor import json from functools import wraps from app.toolkit_client import MCPClient class ToolResult: """Wrapper for tool execution results with status and metadata.""" def __init__(self, success: bool, data: Any, error: Optional[str] = None, metadata: Optional[Dict] = None): self.success = success self.data = data self.error = error self.metadata = metadata or {} def __repr__(self): return f"ToolResult(success={self.success}, data={self.data[:100] if isinstance(self.data, str) else self.data}...)" def to_dict(self): return { "success": self.success, "data": self.data, "error": self.error, "metadata": self.metadata } class MCPToolKitSDK: """ Enhanced SDK for integrating MCP tools into Python applications. Features: - Simplified tool discovery and usage - Async/sync support - Context managers for resource management - Batch operations - Event hooks and middleware - Built-in caching and retry logic """ def __init__( self, server_url: str = "http://localhost:8000", async_mode: bool = False, retry_count: int = 3, timeout: int = 30, cache_ttl: int = 300 ): """ Initialize the SDK with enhanced configuration options. Args: server_url: MCP server URL async_mode: Enable async operations retry_count: Number of retries for failed operations timeout: Request timeout in seconds cache_ttl: Cache time-to-live in seconds """ self.client = MCPClient(server_url) self.async_mode = async_mode self.retry_count = retry_count self.timeout = timeout self.cache_ttl = cache_ttl self._cache = {} self._middleware = [] self._event_handlers = {} self._executor = ThreadPoolExecutor(max_workers=10) if async_mode else None self.logger = logging.getLogger("MCPToolKitSDK") def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit with cleanup.""" if self._executor: self._executor.shutdown(wait=True) async def __aenter__(self): """Async context manager entry.""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" if self._executor: self._executor.shutdown(wait=True) # Core tool execution def call_tool(self, tool_name: str, params: Dict[str, Any], **kwargs) -> ToolResult: """ Call a tool with enhanced error handling and result wrapping. Args: tool_name: Name of the tool params: Tool parameters **kwargs: Additional options (cache, retry, etc.) Returns: ToolResult object with execution results """ # Check cache cache_key = f"{tool_name}:{json.dumps(params, sort_keys=True)}" if kwargs.get('cache', True) and cache_key in self._cache: cached_data, timestamp = self._cache[cache_key] if timestamp + self.cache_ttl > time.time(): return ToolResult(True, cached_data, metadata={"cached": True}) # Apply middleware for middleware in self._middleware: params = middleware(tool_name, params) # Execute with retry logic retry_count = kwargs.get('retry', self.retry_count) last_error = None for attempt in range(retry_count): try: # Emit before event self._emit_event('before_call', tool_name, params) # Execute tool result = self.client.call_tool(tool_name, params) # Parse result try: data = json.loads(result) if isinstance(result, str) else result if isinstance(data, dict) and 'error' in data: raise Exception(data['error']) except json.JSONDecodeError: data = result # Cache successful result if kwargs.get('cache', True): self._cache[cache_key] = (data, time.time()) # Emit after event self._emit_event('after_call', tool_name, params, data) return ToolResult(True, data) except Exception as e: last_error = str(e) if attempt < retry_count - 1: self.logger.warning(f"Retry {attempt + 1}/{retry_count} for {tool_name}: {e}") continue # All retries failed self._emit_event('error', tool_name, params, last_error) return ToolResult(False, None, error=last_error) async def call_tool_async(self, tool_name: str, params: Dict[str, Any], **kwargs) -> ToolResult: """Async version of call_tool.""" loop = asyncio.get_event_loop() return await loop.run_in_executor( self._executor, self.call_tool, tool_name, params, **kwargs ) # Batch operations def batch_call(self, operations: List[Dict[str, Any]]) -> List[ToolResult]: """ Execute multiple tool calls in batch. Args: operations: List of dicts with 'tool', 'params', and optional 'options' Returns: List of ToolResult objects """ results = [] for op in operations: result = self.call_tool( op['tool'], op['params'], **op.get('options', {}) ) results.append(result) return results async def batch_call_async(self, operations: List[Dict[str, Any]]) -> List[ToolResult]: """Async batch execution with concurrency.""" tasks = [ self.call_tool_async( op['tool'], op['params'], **op.get('options', {}) ) for op in operations ] return await asyncio.gather(*tasks) # Tool discovery def list_tools(self) -> List[Dict[str, Any]]: """Get list of available tools with metadata.""" result = self.call_tool("list_tools", {}) return result.data if result.success else [] def get_tool_info(self, tool_name: str) -> Optional[Dict[str, Any]]: """Get detailed information about a specific tool.""" tools = self.list_tools() for tool in tools: if tool.get('name') == tool_name: return tool return None # Middleware and hooks def add_middleware(self, middleware: Callable): """Add middleware to process parameters before tool calls.""" self._middleware.append(middleware) def on(self, event: str, handler: Callable): """Register event handler.""" if event not in self._event_handlers: self._event_handlers[event] = [] self._event_handlers[event].append(handler) def _emit_event(self, event: str, *args, **kwargs): """Emit event to registered handlers.""" if event in self._event_handlers: for handler in self._event_handlers[event]: try: handler(*args, **kwargs) except Exception as e: self.logger.error(f"Event handler error: {e}") # Convenience methods for common tools def file(self, path: str) -> 'FileOperations': """Get file operations helper for a specific path.""" return FileOperations(self, path) def git(self, repo_path: str = ".") -> 'GitOperations': """Get git operations helper.""" return GitOperations(self, repo_path) def web(self) -> 'WebOperations': """Get web operations helper.""" return WebOperations(self) # Dynamic tool method generation def __getattr__(self, name: str): """Dynamically generate tool methods.""" def tool_method(**params): return self.call_tool(name, params) return tool_method class FileOperations: """Simplified file operations using MCP tools.""" def __init__(self, sdk: MCPToolKitSDK, base_path: str = "."): self.sdk = sdk self.base_path = base_path def read(self, path: Optional[str] = None) -> str: """Read file contents.""" full_path = path or self.base_path result = self.sdk.call_tool("read_file", {"path": full_path}) return result.data if result.success else "" def write(self, content: str, path: Optional[str] = None) -> bool: """Write content to file.""" full_path = path or self.base_path result = self.sdk.call_tool("write_file", {"path": full_path, "content": content}) return result.success def append(self, content: str, path: Optional[str] = None) -> bool: """Append content to file.""" full_path = path or self.base_path current = self.read(full_path) return self.write(current + content, full_path) def exists(self, path: Optional[str] = None) -> bool: """Check if file exists.""" full_path = path or self.base_path result = self.sdk.call_tool("list_directory", {"path": os.path.dirname(full_path)}) if result.success: filename = os.path.basename(full_path) return filename in result.data return False class GitOperations: """Simplified git operations.""" def __init__(self, sdk: MCPToolKitSDK, repo_path: str = "."): self.sdk = sdk self.repo_path = repo_path def status(self) -> str: """Get git status.""" result = self.sdk.call_tool("git_status", {"repo_path": self.repo_path}) return result.data if result.success else "" def commit(self, message: str, files: Optional[List[str]] = None) -> bool: """Create a git commit.""" params = {"repo_path": self.repo_path, "message": message} if files: params["files"] = files result = self.sdk.call_tool("git_commit", params) return result.success def push(self, branch: Optional[str] = None) -> bool: """Push to remote.""" params = {"repo_path": self.repo_path} if branch: params["branch"] = branch result = self.sdk.call_tool("git_push", params) return result.success class WebOperations: """Simplified web operations.""" def __init__(self, sdk: MCPToolKitSDK): self.sdk = sdk def get(self, url: str, headers: Optional[Dict] = None) -> Union[str, Dict]: """Make GET request.""" params = {"url": url, "method": "GET"} if headers: params["headers"] = headers result = self.sdk.call_tool("fetch", params) return result.data if result.success else None def post(self, url: str, data: Any, headers: Optional[Dict] = None) -> Union[str, Dict]: """Make POST request.""" params = {"url": url, "method": "POST", "body": data} if headers: params["headers"] = headers result = self.sdk.call_tool("fetch", params) return result.data if result.success else None # Export main classes __all__ = ['MCPToolKitSDK', 'ToolResult', 'FileOperations', 'GitOperations', 'WebOperations']

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/getfounded/mcp-tool-kit'

If you have feedback or need assistance with the MCP directory API, please join our Discord server