Skip to main content
Glama

Sumanshu Arora

client.py • 34.5 kB
"""
MCP Client - Programmatic Python API for MCP Template system.

This module provides a high-level Python API for programmatic access to MCP
servers, using the refactored core modules for consistent functionality.

Example usage:
    ```python
    import asyncio
    from mcp_template.client import MCPClient

    async def main():
        client = MCPClient()

        # List available templates
        templates = client.list_templates()
        print(f"Available templates: {list(templates.keys())}")

        # Start a server
        server = client.start_server("demo", {"greeting": "Hello from API!"})

        # List tools
        tools = client.list_tools("demo")
        print(f"Available tools: {[t['name'] for t in tools]}")

        # Call a tool
        result = client.call_tool("demo", "echo", {"message": "Hello World"})
        print(f"Tool result: {result}")

        # List running servers
        servers = client.list_servers()
        print(f"Running servers: {len(servers)}")

    asyncio.run(main())
    ```
"""

import asyncio
import functools
import json
import logging
from collections import defaultdict
from typing import Any, Dict, List, Optional, Union

from mcp_template.core import (
    DeploymentManager,
    MCPConnection,
    TemplateManager,
    ToolCaller,
    ToolManager,
)
from mcp_template.core.deployment_manager import DeploymentOptions
from mcp_template.core.multi_backend_manager import MultiBackendManager
from mcp_template.template.utils.discovery import TemplateDiscovery

logger = logging.getLogger(__name__)


class MCPClient:
    """
    Unified MCP Client for programmatic access to MCP servers.

    This client provides a simplified interface for common MCP operations:
    - Connecting to MCP servers
    - Listing and calling tools
    - Managing server instances
    - Template discovery

    Consolidates functionality from both MCPClient and CoreMCPClient for
    simplicity.
    """

    def __init__(self, backend_type: str = "docker", timeout: int = 30):
        """
        Initialize MCP Client.

        Args:
            backend_type: Deployment backend (docker, kubernetes, mock)
            timeout: Default timeout for operations in seconds
        """
        self.backend_type = backend_type
        self.timeout = timeout

        # Initialize core managers
        self.template_manager = TemplateManager(backend_type)
        self.deployment_manager = DeploymentManager(backend_type)
        self.tool_manager = ToolManager(backend_type)

        # Connection management for direct MCP connections
        self._active_connections = {}
        # Strong references to fire-and-forget tasks so they are not
        # garbage-collected before they finish.
        self._background_tasks = set()

        # Initialize other components
        self.template_discovery = TemplateDiscovery()
        self.tool_caller = ToolCaller(backend_type)
        self.multi_manager = MultiBackendManager(self.backend_type)

        # This is a temp MultiBackendManager which is set by methods that
        # also accept backend as input and set the value scoped for that method
        self._multi_manager = None

    def _select_multi_manager(self, all_backends: bool) -> MultiBackendManager:
        """Set and return the multi-backend manager scoped to the current call.

        Args:
            all_backends: When True a fresh manager is created with
                ``enabled_backends=None``; otherwise the client's default
                ``multi_manager`` is reused.

        Returns:
            The manager that was stored in ``self._multi_manager``.
        """
        if all_backends:
            self._multi_manager = MultiBackendManager(enabled_backends=None)
        else:
            self._multi_manager = self.multi_manager
        return self._multi_manager

    # Template Management

    def list_templates(
        self,
        include_deployed_status: bool = False,
        all_backends: bool = False,
    ) -> Dict[str, Dict[str, Any]]:
        """
        List all available MCP server templates.

        Args:
            include_deployed_status: Whether to attach a ``"deployments"`` entry
                to every template, holding per-backend counts and ids of its
                running deployments
            all_backends: Whether to query all backends instead of only the
                client's configured backend

        Returns:
            Dictionary mapping template_id to template information
        """
        manager = self._select_multi_manager(all_backends)
        available_backends = manager.get_available_backends()

        # Template listing is treated as backend-agnostic: any available
        # backend will do. Fall back to the configured backend rather than
        # failing with IndexError when no backend is reported as available.
        listing_backend = (
            available_backends[0] if available_backends else self.backend_type
        )
        template_manager = TemplateManager(listing_backend)
        # Deployment status is computed below across backends, so it is not
        # requested from the template manager itself.
        templates = template_manager.list_templates(include_deployed_status=False)

        if include_deployed_status:
            all_deployments = manager.get_all_deployments()

            # Count running instances per template per backend
            deployment_info = {}
            for deployment in all_deployments:
                if deployment.get("status") != "running":
                    continue

                template_name = deployment.get("template", "unknown")
                backend_type = deployment.get("backend_type", "unknown")
                deployment_id = deployment.get(
                    "id", deployment.get("deployment_id", None)
                )

                backend_stats = deployment_info.setdefault(
                    template_name, {}
                ).setdefault(backend_type, {"count": 0, "deployment_ids": []})
                backend_stats["count"] += 1
                if deployment_id:
                    backend_stats["deployment_ids"].append(deployment_id)

            for template_name, template_info in templates.items():
                template_info["deployments"] = deployment_info.get(template_name, {})

        return templates

    def get_template_info(self, template_id: str) -> Optional[Dict[str, Any]]:
        """
        Get detailed information about a specific template.

        Args:
            template_id: ID of the template

        Returns:
            Template information or None if not found (or if the lookup failed;
            failures are logged)
        """
        try:
            return self.template_manager.get_template_info(template_id)
        except Exception as e:
            logger.error(f"Failed to get template info for {template_id}: {e}")
            return None

    def validate_template(self, template_id: str) -> bool:
        """
        Validate that a template exists and is properly structured.

        Args:
            template_id: The template identifier

        Returns:
            True if template is valid, False otherwise (including when the
            validation itself raised; the error is logged)
        """
        try:
            return self.template_manager.validate_template(template_id)
        except Exception as e:
            logger.error(f"Failed to validate template {template_id}: {e}")
            return False

    def search_templates(self, query: str) -> Dict[str, Dict[str, Any]]:
        """
        Search templates by name, description, or tags.

        Args:
            query: Search query string

        Returns:
            Dictionary of matching templates (empty if the search failed)
        """
        try:
            return self.template_manager.search_templates(query)
        except Exception as e:
            logger.error(f"Failed to search templates: {e}")
            return {}

    # Server Management

    def list_servers(
        self,
        template_name: Optional[str] = None,
        all_backends: bool = False,
        status: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """
        List MCP server deployments.

        Args:
            template_name: Optional filter by template name
            all_backends: Whether to query all backends instead of only the
                client's configured backend
            status: Optional status filter forwarded to the backend manager

        Returns:
            List of server information (empty if the query failed)
        """
        manager = self._select_multi_manager(all_backends)

        try:
            return manager.get_all_deployments(
                template_name=template_name, status=status
            )
        except Exception as e:
            logger.error(f"Failed to list servers: {e}")
            return []

    def list_servers_by_template(self, template: str) -> List[Dict[str, Any]]:
        """
        List MCP server deployments for a specific template.

        Args:
            template: Template name to filter servers by

        Returns:
            List of server information for the specified template
        """
        return self.list_servers(template_name=template)

    def start_server(
        self,
        template_id: str,
        configuration: Optional[Dict[str, Any]] = None,
        config_file: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
        overrides: Optional[Dict[str, str]] = None,
        volumes: Optional[Union[Dict[str, str], List[str]]] = None,
        pull_image: bool = True,
        transport: Optional[str] = "http",
        host: Optional[str] = "0.0.0.0",
        port: Optional[int] = None,
        name: Optional[str] = None,
        timeout: int = 300,
        backend_config: Optional[Dict[str, Any]] = None,
        backend_config_file: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Start a new MCP server instance.

        Args:
            template_id: Template to deploy
            configuration: Configuration key-value pairs (not modified)
            config_file: Path to configuration file
            env_vars: Environment variables (highest precedence)
            overrides: Template overrides values
            volumes: Volume mounts as {host_path: container_path}, or a list of
                paths that are mounted at the same path inside the container
            pull_image: Whether to pull the latest image
            transport: Optional transport type (e.g., "http", "stdio")
            host: Optional host
            port: Optional port for HTTP transport
            name: Custom deployment name
            timeout: Deployment timeout
            backend_config: Backend specific config (not supported yet)
            backend_config_file: Backend config file (not supported yet)

        Returns:
            Server deployment information or None if failed

        Raises:
            ValueError: If backend_config or backend_config_file is given
            TypeError: If volumes is neither a dict nor a list
        """
        if backend_config or backend_config_file:
            raise ValueError("Backend config support to be added in future")

        # Validate volume format early
        if volumes and not isinstance(volumes, (dict, list)):
            raise TypeError(
                f"Invalid volume type: {type(volumes).__name__}. "
                "Expected dict or list"
            )

        try:
            # Work on a copy so the caller's dict is never mutated by the
            # MCP_* / VOLUMES entries injected below.
            configuration = dict(configuration) if configuration else {}

            if host:
                configuration["MCP_HOST"] = host
            if transport:
                configuration["MCP_TRANSPORT"] = transport
            if port:
                configuration["MCP_PORT"] = str(port)

            if volumes:
                # A list of paths means "mount each path at the same location".
                if isinstance(volumes, list):
                    processed_volumes = {path: path for path in volumes}
                else:
                    processed_volumes = volumes
                configuration["VOLUMES"] = processed_volumes

            # Structure config sources for deployment manager
            config_sources = {
                "config_file": config_file or None,
                "env_vars": env_vars if env_vars else None,
                "config_values": configuration,
                "override_values": overrides if overrides else None,
                "volume_config": volumes if volumes else None,
                "backend_config": backend_config if backend_config else None,
                "backend_config_file": backend_config_file,
            }

            deployment_options = DeploymentOptions(
                name=name,
                transport=transport,
                port=port or 7071,
                pull_image=pull_image,
                timeout=timeout,
            )

            result = self.deployment_manager.deploy_template(
                template_id, config_sources, deployment_options
            )

            return result.to_dict() if result.success else None

        except Exception as e:
            logger.error(f"Failed to start server for {template_id}: {e}")
            return None

    def deploy_template(
        self,
        template_id: str,
        config_file: Optional[str] = None,
        config: Optional[Dict[str, str]] = None,
        env_vars: Optional[Dict[str, str]] = None,
        overrides: Optional[Dict[str, str]] = None,
        volumes: Optional[Union[Dict[str, str], List[str], str]] = None,
        transport: Optional[str] = None,
        pull_image: bool = True,
        name: Optional[str] = None,
        timeout: int = 300,
        host: str = "0.0.0.0",
        port: Optional[int] = None,
        backend_config: Optional[Dict[str, Any]] = None,
        backend_config_file: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Deploy a template with CLI-like interface supporting config precedence
        and volumes.

        Args:
            template_id: Template to deploy
            config_file: Path to configuration file
            config: Configuration key=value pairs
            env_vars: Environment variables (highest precedence)
            overrides: Template overrides values
            volumes: Volume mounts - dict {host_path: container_path}, list of
                paths, or JSON string encoding either of those
            transport: Transport protocol (http, stdio)
            pull_image: Whether to pull the latest image
            name: Custom deployment name
            timeout: Deployment timeout
            host: Host
            port: Port
            backend_config: Backend config
            backend_config_file: Backend config file

        Returns:
            Server deployment information or None if failed. Every error,
            including invalid volumes and unsupported backend config, is logged
            and reported as None.
        """
        try:
            # Process volumes
            processed_volumes = None
            if volumes:
                if isinstance(volumes, dict):
                    processed_volumes = volumes
                elif isinstance(volumes, list):
                    processed_volumes = {path: path for path in volumes}
                elif isinstance(volumes, str):
                    # Try to parse as JSON
                    try:
                        parsed = json.loads(volumes)
                    except json.JSONDecodeError as err:
                        raise ValueError("Invalid JSON format for volumes") from err

                    if isinstance(parsed, dict):
                        processed_volumes = parsed
                    elif isinstance(parsed, list):
                        processed_volumes = {path: path for path in parsed}
                    else:
                        raise ValueError(
                            "Invalid volume format. Expected dict or list"
                        )
                else:
                    raise ValueError("Invalid volume format. Expected dict or list")

            return self.start_server(
                template_id=template_id,
                configuration=config,
                config_file=config_file,
                env_vars=env_vars,
                overrides=overrides,
                volumes=processed_volumes,
                pull_image=pull_image,
                transport=transport,
                host=host,
                # start_server expects an int and stringifies it itself for
                # MCP_PORT; pre-converting would hand DeploymentOptions a str.
                port=port if port else None,
                name=name,
                timeout=timeout,
                backend_config=backend_config,
                backend_config_file=backend_config_file,
            )

        except Exception as e:
            logger.error(f"Failed to deploy template {template_id}: {e}")
            return None

    def stop_server(self, deployment_id: str, timeout: int = 30) -> Dict[str, Any]:
        """Stop a running server.

        Args:
            deployment_id: Unique identifier for the deployment
            timeout: Timeout for graceful shutdown

        Returns:
            Result of the stop operation; on error a dict with
            ``"success": False`` and an ``"error"`` message
        """
        # The deployment may live on any backend, so search all of them.
        manager = self._select_multi_manager(all_backends=True)

        try:
            # Disconnect any active connections first
            connection = self._active_connections.pop(deployment_id, None)
            if connection is not None:
                try:
                    # Raises RuntimeError when no event loop is running.
                    asyncio.get_running_loop()
                except RuntimeError:
                    # No event loop running, just remove the connection
                    pass
                else:
                    task = asyncio.create_task(connection.disconnect())
                    # Store the task in a background set to prevent garbage
                    # collection until it completes.
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)

            return manager.stop_deployment(deployment_id, timeout)

        except Exception as e:
            logger.error(f"Failed to stop server {deployment_id}: {e}")
            return {"success": False, "error": str(e)}

    def stop_all_servers(
        self,
        template: Optional[str] = None,
        all_backends: bool = False,
        timeout: int = 30,
        force: bool = False,
    ) -> Dict[Optional[str], Dict[str, Any]]:
        """
        Stop all running servers, optionally restricted to one template.

        Args:
            template: Template name to stop all servers. If None, stops all
                servers.
            all_backends: Whether deployments across all backends should be
                stopped
            timeout: Timeout passed to each stop operation
            force: Force flag passed to each stop operation

        Returns:
            Dictionary mapping each deployment id to the result of its stop
            operation. ``{None: {"success": True}}`` is returned when there was
            nothing to stop. If an unexpected error interrupts the run, the
            results collected so far are returned.
        """
        self._select_multi_manager(all_backends)

        # Defined before the try block so the final return is always valid,
        # even when listing the servers fails.
        results = {}
        try:
            targets = self.list_servers(
                template_name=template, all_backends=all_backends, status="running"
            )

            if not targets:
                return {None: {"success": True}}

            for deployment in targets:
                backend = deployment.get("backend_type", "unknown")
                deployment_id = deployment.get(
                    "id", deployment.get("deployment_id", deployment.get("name", None))
                )
                deployment_manager = self._multi_manager.deployment_managers.get(
                    backend
                )

                result = {"success": False}
                if backend and deployment_id and deployment_manager:
                    try:
                        result = deployment_manager.stop_deployment(
                            deployment_id, timeout=timeout, force=force
                        )
                    except Exception as e:
                        logger.error(f"Failed to stop server {deployment_id}: {e}")

                results[deployment_id] = result

        except Exception as e:
            logger.error(f"Failed to stop all servers for template {template}: {e}")

        return results

    def get_server_info(self, deployment_id: str) -> Optional[Dict[str, Any]]:
        """
        Get information about a specific server deployment.

        Args:
            deployment_id: ID of the deployment

        Returns:
            Server information or None if not found
        """
        try:
            deployments = self.deployment_manager.find_deployments_by_criteria(
                deployment_id=deployment_id
            )
            return deployments[0] if deployments else None
        except Exception as e:
            logger.error(f"Failed to get server info for {deployment_id}: {e}")
            return None

    def get_server_logs(
        self, deployment_id: str, lines: int = 100, follow: bool = False
    ) -> Optional[str]:
        """
        Get logs from a running server.

        Args:
            deployment_id: ID of the deployment
            lines: Number of log lines to retrieve
            follow: Whether to stream logs in real-time

        Returns:
            Log content or None if failed
        """
        # The deployment may live on any backend, so search all of them.
        manager = self._select_multi_manager(all_backends=True)

        try:
            result = manager.get_deployment_logs(
                deployment_id, lines=lines, follow=follow
            )
            return result.get("logs") if result.get("success") else None
        except Exception as e:
            logger.error(f"Failed to get server logs for {deployment_id}: {e}")
            return None

    def get_template_logs(
        self,
        template: str,
        lines: int = 100,
        all_backends: bool = False,
    ) -> Dict[str, List[Dict[str, str]]]:
        """
        Get logs for all running deployments of a template, grouped by
        backend_type.

        Args:
            template: Template name to filter deployments
            lines: Number of log lines to retrieve per deployment
            all_backends: Whether to include all backends

        Returns:
            Dict of backend_type -> list of {deployment_id: logs}
        """
        self._select_multi_manager(all_backends)

        result = defaultdict(list)
        try:
            deployments = self.list_servers(
                template_name=template, all_backends=all_backends, status="running"
            )
            for dep in deployments:
                backend = dep.get("backend_type", "unknown")
                deployment_id = dep.get(
                    "id", dep.get("deployment_id", dep.get("name", None))
                )
                logs = self.get_server_logs(deployment_id, lines=lines)
                result[backend].append({deployment_id: logs})
        except Exception as e:
            logger.error(f"Failed to get logs for template {template}: {e}")

        return dict(result)

    # Tool Discovery and Management

    def list_tools(
        self,
        template_name: str,
        force_refresh: bool = False,
        static: bool = True,
        dynamic: bool = True,
        include_metadata: bool = False,
    ) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
        """
        List available tools from a template.

        Args:
            template_name: Specific template to get tools from
            force_refresh: Force refresh of tool cache
            static: Allow static discovery
            dynamic: Allow dynamic discovery
            include_metadata: Whether to return metadata about discovery method

        Returns:
            If include_metadata=True: Dict with tools and metadata
            If include_metadata=False: List of tools (backward compatible)
            An empty list is returned when discovery fails.
        """
        try:
            if force_refresh:
                self.tool_manager.clear_cache(template_name=template_name)

            result = self.tool_manager.list_tools(
                template_name,
                static=static,
                dynamic=dynamic,
                force_refresh=force_refresh,
            )

            if include_metadata:
                return result

            # Backward compatible - return just the tools list
            return result.get("tools", [])

        except Exception as e:
            logger.error(f"Failed to list tools for {template_name}: {e}")
            return []

    def call_tool(
        self,
        template_id: str,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None,
        deployment_id: Optional[str] = None,
        server_config: Optional[Dict[str, Any]] = None,
        timeout: int = 30,
        pull_image: bool = True,
        force_stdio: bool = False,
    ) -> Optional[Dict[str, Any]]:
        """
        Call a tool on an MCP server.

        The call is delegated to the client's multi-backend manager.

        Args:
            template_id: Template that provides the tool
            tool_name: Name of the tool to call
            arguments: Arguments to pass to the tool
            deployment_id: Existing deployment to use (optional).
                NOTE(review): accepted for API compatibility but currently not
                forwarded to the backend manager.
            server_config: Configuration for server if starting new instance
            timeout: Timeout for the call
            pull_image: Whether to pull images for stdio calls
            force_stdio: Force stdio transport even if HTTP is available

        Returns:
            Tool response or None if failed
        """
        try:
            # Use multi-backend manager for tool calling to support
            # auto-detection and priority-based discovery across all backends
            return self.multi_manager.call_tool(
                template_name=template_id,
                tool_name=tool_name,
                arguments=arguments or {},
                config_values=server_config,
                timeout=timeout,
                pull_image=pull_image,
                force_stdio=force_stdio,
            )

        except Exception as e:
            logger.error(f"Failed to call tool {tool_name}: {e}")
            return None

    def call_tool_with_config(
        self,
        template_id: str,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None,
        config_file: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
        config_values: Optional[Dict[str, Any]] = None,
        all_backends: bool = True,
        timeout: int = 30,
        pull_image: bool = True,
        force_stdio: bool = False,
    ) -> Optional[Dict[str, Any]]:
        """
        Call a tool with flexible configuration support for interactive CLI.

        Args:
            template_id: Template that provides the tool
            tool_name: Name of the tool to call
            arguments: Arguments to pass to the tool
            config_file: Path to configuration file
            env_vars: Environment variables
            config_values: Direct configuration values
            all_backends: Try all backends
            timeout: Timeout for the call
            pull_image: Whether to pull images for stdio calls
            force_stdio: Force stdio transport

        Returns:
            Tool response or None if failed (including unknown template)
        """
        manager = self._select_multi_manager(all_backends)

        try:
            from mcp_template.core.config_processor import ConfigProcessor

            # Get template info for configuration processing
            template_info = self.get_template_info(template_id)
            if not template_info:
                logger.error(f"Template {template_id} not found")
                return None

            # Merge configuration from all sources
            config_processor = ConfigProcessor()
            final_config = config_processor.prepare_configuration(
                template=template_info,
                session_config=config_values or {},
                config_file=config_file,
                config_values={},  # inline configs handled via config_values param
                env_vars=env_vars or {},
            )

            # Use multi-backend manager for better discovery and backend
            # selection
            return manager.call_tool(
                template_name=template_id,
                tool_name=tool_name,
                arguments=arguments or {},
                config_values=final_config,
                timeout=timeout,
                pull_image=pull_image,
                force_stdio=force_stdio,
            )

        except Exception as e:
            logger.error(f"Failed to call tool {tool_name} with config: {e}")
            return None

    # Direct Connection Methods

    async def connect_stdio(
        self,
        command: List[str],
        working_dir: Optional[str] = None,
        env_vars: Optional[Dict[str, str]] = None,
        connection_id: Optional[str] = None,
    ) -> Optional[str]:
        """
        Create a direct stdio connection to an MCP server.

        Args:
            command: Command to execute MCP server
            working_dir: Working directory for the process
            env_vars: Environment variables for the process
            connection_id: Optional ID for the connection (auto-generated if
                None)

        Returns:
            Connection ID if successful, None if failed
        """
        if connection_id is None:
            # Start from the current connection count, but skip ids that are
            # still in use: after a disconnect the count alone can repeat an
            # existing id and silently replace a live connection.
            index = len(self._active_connections)
            connection_id = f"stdio_{index}"
            while connection_id in self._active_connections:
                index += 1
                connection_id = f"stdio_{index}"

        connection = MCPConnection(timeout=self.timeout)
        success = await connection.connect_stdio(
            command=command, working_dir=working_dir, env_vars=env_vars
        )

        if success:
            self._active_connections[connection_id] = connection
            return connection_id

        await connection.disconnect()
        return None

    async def list_tools_from_connection(
        self, connection_id: str
    ) -> Optional[List[Dict[str, Any]]]:
        """
        List tools from an active connection.

        Args:
            connection_id: ID of the connection

        Returns:
            List of tool definitions or None if failed
        """
        if connection_id not in self._active_connections:
            logger.error("Connection %s not found", connection_id)
            return None

        connection = self._active_connections[connection_id]
        return await connection.list_tools()

    async def call_tool_from_connection(
        self, connection_id: str, tool_name: str, arguments: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Call a tool using an active connection.

        Args:
            connection_id: ID of the connection
            tool_name: Name of the tool to call
            arguments: Arguments to pass to the tool

        Returns:
            Tool response or None if failed
        """
        if connection_id not in self._active_connections:
            logger.error("Connection %s not found", connection_id)
            return None

        connection = self._active_connections[connection_id]
        return await connection.call_tool(tool_name, arguments)

    async def disconnect(self, connection_id: str) -> bool:
        """
        Disconnect from an active connection.

        Args:
            connection_id: ID of the connection to disconnect

        Returns:
            True if disconnected successfully, False if connection not found
        """
        if connection_id not in self._active_connections:
            return False

        connection = self._active_connections[connection_id]
        try:
            await connection.disconnect()
        finally:
            # Drop the registry entry even when the disconnect itself fails,
            # so a broken connection is not kept around and retried forever.
            self._active_connections.pop(connection_id, None)
        return True

    # Async versions of main methods

    async def start_server_async(
        self,
        template_id: str,
        configuration: Optional[Dict[str, Any]] = None,
        pull_image: bool = True,
        transport: Optional[str] = None,
        port: Optional[int] = None,
        name: Optional[str] = None,
        timeout: int = 300,
    ) -> Optional[Dict[str, Any]]:
        """Async version of start_server.

        Runs ``start_server`` in the default executor. See ``start_server`` for
        the meaning of the arguments and the return value.
        """
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional arguments, and the
        # parameter order here differs from start_server's, so bind by keyword.
        call = functools.partial(
            self.start_server,
            template_id,
            configuration=configuration,
            pull_image=pull_image,
            transport=transport,
            port=port,
            name=name,
            timeout=timeout,
        )
        return await loop.run_in_executor(None, call)

    async def list_tools_async(
        self,
        template_name: Optional[str] = None,
        force_refresh: bool = False,
        discovery_method: str = "auto",
    ) -> List[Dict[str, Any]]:
        """Async version of list_tools.

        Args:
            template_name: Template to get tools from
            force_refresh: Force refresh of tool cache
            discovery_method: ``"static"`` restricts discovery to static,
                ``"dynamic"`` to dynamic; any other value (including the
                default ``"auto"``) allows both

        Returns:
            List of tools (empty if discovery failed)
        """
        loop = asyncio.get_running_loop()
        # list_tools takes separate static/dynamic switches rather than a
        # discovery method name, so translate before delegating.
        call = functools.partial(
            self.list_tools,
            template_name,
            force_refresh=force_refresh,
            static=discovery_method != "dynamic",
            dynamic=discovery_method != "static",
        )
        return await loop.run_in_executor(None, call)

    async def call_tool_async(
        self,
        template_id: str,
        tool_name: str,
        arguments: Optional[Dict[str, Any]] = None,
        timeout: int = 30,
    ) -> Optional[Dict[str, Any]]:
        """Async version of call_tool.

        Runs ``call_tool`` in the default executor. See ``call_tool`` for the
        meaning of the arguments and the return value.
        """
        loop = asyncio.get_running_loop()
        call = functools.partial(
            self.call_tool,
            template_id,
            tool_name,
            arguments=arguments,
            timeout=timeout,
        )
        return await loop.run_in_executor(None, call)

    # Utility methods

    def clear_caches(self) -> None:
        """Clear all internal caches. Failures are logged, not raised."""
        try:
            self.template_manager.refresh_cache()
            self.tool_manager.clear_cache()
        except Exception as e:
            logger.error(f"Failed to clear caches: {e}")

    def get_backend_type(self) -> str:
        """Get the backend type being used."""
        return self.backend_type

    def set_backend_type(self, backend_type: str) -> None:
        """
        Change the backend type (reinitializes all managers).

        The new managers are built first and only installed once all of them
        were created, so a failure leaves the client on its previous backend.

        Args:
            backend_type: New backend type (docker, kubernetes, mock)

        Raises:
            Exception: Whatever a manager constructor raised, after logging it
        """
        try:
            template_manager = TemplateManager(backend_type)
            deployment_manager = DeploymentManager(backend_type)
            tool_manager = ToolManager(backend_type)
            tool_caller = ToolCaller(backend_type)
            multi_manager = MultiBackendManager(backend_type)
        except Exception as e:
            logger.error(f"Failed to set backend type to {backend_type}: {e}")
            raise

        self.backend_type = backend_type
        self.template_manager = template_manager
        self.deployment_manager = deployment_manager
        self.tool_manager = tool_manager
        self.tool_caller = tool_caller
        # Without this, call_tool and the non-all-backends paths would keep
        # talking to the previous backend.
        self.multi_manager = multi_manager

    # Cleanup methods

    async def cleanup(self) -> None:
        """Clean up all active connections and resources."""
        # Create a copy of keys to avoid modifying dict during iteration
        connection_ids = list(self._active_connections.keys())
        for connection_id in connection_ids:
            try:
                await self.disconnect(connection_id)
            except Exception as e:
                # One failing connection must not prevent the remaining ones
                # from being closed.
                logger.error(f"Failed to disconnect {connection_id}: {e}")

    async def __aenter__(self):
        """Async context manager entry."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.cleanup()


# Legacy compatibility - some methods for backward compatibility
class CoreMCPClient(MCPClient):
    """Legacy alias for backward compatibility."""

    def __init__(self, backend_type: str = "docker", timeout: int = 30):
        """
        Initialize the legacy client.

        Args:
            backend_type: Deployment backend (docker, kubernetes, mock)
            timeout: Default timeout for operations in seconds
        """
        super().__init__(backend_type=backend_type, timeout=timeout)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Data-Everything/mcp-server-templates'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.