Skip to main content
Glama

MCP Orchestration Server

universal_connector.py (19.6 kB)
#!/usr/bin/env python3
"""
Universal Agent Connector - USB-like interface for external agents
Connects to any external agent without modifying their code
"""

import importlib
import inspect
import json
import logging
from datetime import datetime
from typing import Dict, Any, List, Optional, Callable, Union

try:
    import requests
except ImportError:
    # Only the HTTP connection type needs `requests`; module, function and
    # instance based agents stay usable without it.
    requests = None

logger = logging.getLogger(__name__)


class UniversalAgentConnector:
    """
    Universal connector that can interface with any external agent.
    Works like a USB - plug and play without code modifications.

    State kept per connector:
        connected_agents:   agent id -> interface, config, connection type,
                            status and registration timestamp.
        agent_capabilities: agent id -> capability dict built by
                            ``_extract_capabilities``.
        routing_patterns:   keyword/pattern string -> list of agent ids that
                            declared it (each id appears at most once).
    """

    def __init__(self):
        """Initialize the universal connector."""
        self.connected_agents = {}
        self.agent_capabilities = {}
        self.routing_patterns = {}
        self.connection_methods = {
            'http_api': self._connect_http_api,
            'python_module': self._connect_python_module,
            'function_call': self._connect_function,
            'class_instance': self._connect_class_instance,
            'websocket': self._connect_websocket,
            'grpc': self._connect_grpc
        }

    def register_agent(self, agent_config: Dict[str, Any]) -> bool:
        """
        Register an external agent with the MCP system.

        Args:
            agent_config: Configuration for the external agent. Must contain
                ``id`` and ``connection_type``; the remaining keys depend on
                the connection type.

        Returns:
            True if successfully registered, False otherwise (unsupported
            connection type, failed connection, or malformed config).
        """
        try:
            agent_id = agent_config['id']
            connection_type = agent_config['connection_type']

            # Validate connection type
            if connection_type not in self.connection_methods:
                logger.error("Unsupported connection type: %s", connection_type)
                return False

            # Connect to the agent
            connector_func = self.connection_methods[connection_type]
            agent_interface = connector_func(agent_config)

            # Connectors signal failure with None. A truthiness test would
            # wrongly reject valid agents that happen to be falsy (for
            # example objects defining __len__ or __bool__).
            if agent_interface is None:
                return False

            # Re-registration replaces the agent: drop routing entries left
            # over from the previous registration so they cannot keep routing
            # on outdated keywords or inflate this agent's score.
            self._remove_from_routing(agent_id)

            # Store agent connection
            self.connected_agents[agent_id] = {
                'interface': agent_interface,
                'config': agent_config,
                'connection_type': connection_type,
                'status': 'connected',
                'last_ping': datetime.now()
            }

            # Extract capabilities
            capabilities = self._extract_capabilities(agent_config, agent_interface)
            self.agent_capabilities[agent_id] = capabilities

            # Update routing patterns
            self._update_routing_patterns(agent_id, capabilities)

            logger.info("Successfully registered agent: %s", agent_id)
            return True

        except Exception as e:
            # agent_config may not be a dict at all; never let the error
            # handler itself raise.
            if isinstance(agent_config, dict):
                agent_label = agent_config.get('id', 'unknown')
            else:
                agent_label = 'unknown'
            logger.error("Failed to register agent %s: %s", agent_label, e)
            return False

    def route_request(self, user_input: str,
                      context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Automatically route user request to the most appropriate agent.

        Args:
            user_input: User's natural language request
            context: Additional context for routing

        Returns:
            Response from the selected agent with a ``routing_info`` entry
            added, or an error dict when no agent matches or routing fails.
        """
        try:
            # Analyze request to determine best agent
            selected_agent = self._select_agent(user_input, context)

            if not selected_agent:
                return {
                    'status': 'error',
                    'message': 'No suitable agent found for this request',
                    'user_input': user_input
                }

            # Execute request through selected agent
            response = self._execute_request(selected_agent, user_input, context)

            # Add routing metadata
            response['routing_info'] = {
                'selected_agent': selected_agent,
                'routing_confidence': self._calculate_confidence(user_input, selected_agent),
                'available_agents': list(self.connected_agents.keys()),
                'timestamp': datetime.now().isoformat()
            }

            return response

        except Exception as e:
            logger.error("Error routing request: %s", e)
            return {
                'status': 'error',
                'message': f'Routing error: {str(e)}',
                'user_input': user_input
            }

    def _connect_http_api(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Connect to HTTP API based agent.

        Performs a GET on the health endpoint (``health_check`` from the
        config, or ``<endpoint>/health``) and returns an interface
        description dict only when it answers with status 200.

        Returns:
            The interface dict, or None when the check fails or errors.
        """
        if requests is None:
            logger.error(
                "Failed to connect to HTTP API: the 'requests' package is not installed"
            )
            return None

        try:
            base_url = config['endpoint']
            headers = config.get('headers', {})
            # NOTE(review): `auth` is stored on the interface but is not sent
            # with the health check or with later requests.
            auth = config.get('auth', {})

            # Test connection. Strip a trailing slash so the default health
            # URL does not end up with a double slash.
            default_health = f"{str(base_url).rstrip('/')}/health"
            health_endpoint = config.get('health_check', default_health)
            response = requests.get(health_endpoint, headers=headers, timeout=10)

            if response.status_code == 200:
                return {
                    'type': 'http_api',
                    'base_url': base_url,
                    'headers': headers,
                    'auth': auth,
                    'methods': config.get('methods', ['POST']),
                    'endpoints': config.get('endpoints', {})
                }

            return None

        except Exception as e:
            logger.error("Failed to connect to HTTP API: %s", e)
            return None

    def _connect_python_module(self, config: Dict[str, Any]) -> Optional[Any]:
        """
        Connect to Python module based agent.

        Imports ``module_path``. When ``class_name`` is given, the class is
        instantiated with ``init_params`` and the instance is returned;
        otherwise the module itself is returned.

        Returns:
            The instance or module, or None when import/instantiation fails.
        """
        try:
            module_path = config['module_path']
            class_name = config.get('class_name')

            # Import the module
            module = importlib.import_module(module_path)

            if not class_name:
                # Return module directly
                return module

            # Get class from module and initialize with config parameters
            agent_class = getattr(module, class_name)
            init_params = config.get('init_params', {})
            return agent_class(**init_params)

        except Exception as e:
            logger.error("Failed to connect to Python module: %s", e)
            return None

    def _connect_function(self, config: Dict[str, Any]) -> Optional[Callable]:
        """
        Connect to function based agent.

        Returns:
            The attribute ``function_name`` of the imported ``module_path``,
            or None when the lookup fails.
        """
        try:
            module_path = config['module_path']
            function_name = config['function_name']

            # Import module and get function
            module = importlib.import_module(module_path)
            return getattr(module, function_name)

        except Exception as e:
            logger.error("Failed to connect to function: %s", e)
            return None

    def _connect_class_instance(self, config: Dict[str, Any]) -> Optional[Any]:
        """
        Connect to existing class instance.

        Returns:
            The object supplied under ``instance_reference`` in the config,
            or None when the key is missing.
        """
        try:
            # The instance is handed over directly by the caller. It is
            # returned as-is (without a truthiness test) so that falsy but
            # valid instances are accepted.
            return config.get('instance_reference')

        except Exception as e:
            logger.error("Failed to connect to class instance: %s", e)
            return None

    def _connect_websocket(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Connect to WebSocket based agent.

        Only records the connection parameters; no socket is opened here and
        ``_execute_request`` has no executor for this connection type.
        """
        try:
            # WebSocket connection setup
            ws_url = config['websocket_url']

            return {
                'type': 'websocket',
                'url': ws_url,
                'protocols': config.get('protocols', []),
                'headers': config.get('headers', {})
            }

        except Exception as e:
            logger.error("Failed to connect to WebSocket: %s", e)
            return None

    def _connect_grpc(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Connect to gRPC based agent.

        Only records the connection parameters; no channel is opened here and
        ``_execute_request`` has no executor for this connection type.
        """
        try:
            # gRPC connection setup
            grpc_endpoint = config['grpc_endpoint']

            return {
                'type': 'grpc',
                'endpoint': grpc_endpoint,
                'service': config.get('service'),
                'methods': config.get('methods', {})
            }

        except Exception as e:
            logger.error("Failed to connect to gRPC: %s", e)
            return None

    def _extract_capabilities(self, config: Dict[str, Any], interface: Any) -> Dict[str, Any]:
        """
        Extract capabilities from the connected agent.

        Declared capabilities come from the config; for interfaces that have
        a ``__dict__`` the public callable attribute names are added under
        ``methods``.
        """
        capabilities = {
            'keywords': config.get('keywords', []),
            'description': config.get('description', ''),
            'input_types': config.get('input_types', ['text']),
            'output_types': config.get('output_types', ['json']),
            'methods': [],
            'confidence_patterns': config.get('patterns', [])
        }

        # Auto-detect methods if it's a Python object
        if hasattr(interface, '__dict__'):
            # The getattr default keeps names listed by dir() that cannot
            # actually be fetched from aborting the whole registration.
            capabilities['methods'] = [
                name for name in dir(interface)
                if not name.startswith('_')
                and callable(getattr(interface, name, None))
            ]

        return capabilities

    def _update_routing_patterns(self, agent_id: str, capabilities: Dict[str, Any]) -> None:
        """
        Update routing patterns based on agent capabilities.

        Every keyword and every confidence pattern becomes a key of
        ``routing_patterns``. An agent is listed at most once per key, even
        if the same string is both a keyword and a pattern.
        """
        keywords = capabilities.get('keywords') or []
        patterns = capabilities.get('confidence_patterns') or []

        # Keywords first, then patterns (both are matched as substrings by
        # _select_agent).
        for key in list(keywords) + list(patterns):
            agents = self.routing_patterns.setdefault(key, [])
            if agent_id not in agents:
                agents.append(agent_id)

    def _remove_from_routing(self, agent_id: str) -> None:
        """Remove every routing entry of *agent_id* and drop emptied keys."""
        for pattern in list(self.routing_patterns):
            remaining = [
                other for other in self.routing_patterns[pattern]
                if other != agent_id
            ]
            if remaining:
                self.routing_patterns[pattern] = remaining
            else:
                del self.routing_patterns[pattern]

    def _select_agent(self, user_input: str,
                      context: Optional[Dict[str, Any]] = None) -> Optional[str]:
        """
        Select the most appropriate agent for the request.

        Scoring: +1 for each routing key contained (case-insensitively) in
        the input, +0.5 when any word of the input occurs in the agent's
        description.

        Returns:
            The id with the highest score, or None when nothing scored.
        """
        user_input_lower = user_input.lower()
        agent_scores = {}

        # Score agents based on keyword matching
        for keyword, agent_ids in self.routing_patterns.items():
            if keyword.lower() in user_input_lower:
                for agent_id in agent_ids:
                    agent_scores[agent_id] = agent_scores.get(agent_id, 0) + 1

        # Score based on capabilities
        words = user_input_lower.split()
        for agent_id, capabilities in self.agent_capabilities.items():
            description = capabilities.get('description', '').lower()
            if any(word in description for word in words):
                agent_scores[agent_id] = agent_scores.get(agent_id, 0) + 0.5

        # Return agent with highest score
        if agent_scores:
            return max(agent_scores, key=agent_scores.get)

        return None

    def _calculate_confidence(self, user_input: str, agent_id: str) -> float:
        """
        Calculate confidence score for agent selection.

        Returns:
            Fraction of the agent's keywords found in the input, capped at
            1.0; 0.0 for unknown agents or agents without keywords.
        """
        if agent_id not in self.agent_capabilities:
            return 0.0

        keywords = self.agent_capabilities[agent_id].get('keywords', [])
        user_input_lower = user_input.lower()

        matches = sum(1 for keyword in keywords if keyword.lower() in user_input_lower)
        # Avoid division by zero for agents that declared no keywords.
        total_keywords = len(keywords) if keywords else 1

        return min(matches / total_keywords, 1.0)

    def _execute_request(self, agent_id: str, user_input: str,
                         context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Execute request through the selected agent.

        Dispatches on the agent's connection type. ``websocket`` and ``grpc``
        agents can be registered but have no executor, so they yield an
        'Unsupported connection type' error here.
        """
        if agent_id not in self.connected_agents:
            return {
                'status': 'error',
                'message': f'Agent {agent_id} not connected'
            }

        agent_info = self.connected_agents[agent_id]
        interface = agent_info['interface']
        connection_type = agent_info['connection_type']

        executors = {
            'http_api': self._execute_http_request,
            'python_module': self._execute_python_request,
            'function_call': self._execute_function_request,
            'class_instance': self._execute_instance_request,
        }

        try:
            executor = executors.get(connection_type)
            if executor is None:
                return {
                    'status': 'error',
                    'message': f'Unsupported connection type: {connection_type}'
                }
            return executor(interface, user_input, context)

        except Exception as e:
            logger.error("Error executing request through agent %s: %s", agent_id, e)
            return {
                'status': 'error',
                'message': f'Execution error: {str(e)}',
                'agent_id': agent_id
            }

    def _execute_http_request(self, interface: Dict[str, Any], user_input: str,
                              context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Execute request through HTTP API.

        POSTs ``{'input': ..., 'context': ...}`` as JSON to the configured
        ``process`` endpoint (default ``/process``) below the base URL.
        """
        if requests is None:
            return {
                'status': 'error',
                'message': "HTTP request failed: the 'requests' package is not installed",
                'agent_type': 'http_api'
            }

        try:
            base_url = interface['base_url']
            headers = interface['headers']

            # Prepare request payload
            payload = {
                'input': user_input,
                'context': context or {}
            }

            # Use configured endpoint or default
            endpoint = interface.get('endpoints', {}).get('process', '/process')
            url = f"{base_url}{endpoint}"

            response = requests.post(url, json=payload, headers=headers, timeout=30)

            if response.status_code == 200:
                return {
                    'status': 'success',
                    'result': response.json(),
                    'agent_type': 'http_api'
                }

            return {
                'status': 'error',
                'message': f'HTTP {response.status_code}: {response.text}',
                'agent_type': 'http_api'
            }

        except Exception as e:
            return {
                'status': 'error',
                'message': f'HTTP request failed: {str(e)}',
                'agent_type': 'http_api'
            }

    def _execute_python_request(self, interface: Any, user_input: str,
                                context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Execute request through Python module/class.

        Uses the first callable attribute among ``process``, ``execute``,
        ``run``, ``handle`` and ``plan``. The call is attempted as
        ``method(user_input)``, then ``method({'input': ..., 'context': ...})``,
        then ``method(user_input, context)``; each fallback is triggered by a
        TypeError from the previous attempt.
        """
        try:
            # Try common method names
            for method_name in ('process', 'execute', 'run', 'handle', 'plan'):
                method = getattr(interface, method_name, None)
                if not callable(method):
                    continue

                # Try different parameter combinations.
                # NOTE: a TypeError raised inside the agent's own code also
                # triggers the next attempt, so an agent may be invoked more
                # than once for a single request.
                try:
                    result = method(user_input)
                except TypeError:
                    try:
                        result = method({'input': user_input, 'context': context})
                    except TypeError:
                        result = method(user_input, context)

                return {
                    'status': 'success',
                    'result': result,
                    'agent_type': 'python_module',
                    'method_used': method_name
                }

            return {
                'status': 'error',
                'message': 'No suitable method found in Python module',
                'agent_type': 'python_module'
            }

        except Exception as e:
            return {
                'status': 'error',
                'message': f'Python execution failed: {str(e)}',
                'agent_type': 'python_module'
            }

    def _execute_function_request(self, interface: Callable, user_input: str,
                                  context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Execute request through function call.

        The number of parameters in the function's signature decides the
        call shape: one -> ``f(user_input)``, two -> ``f(user_input, context)``,
        anything else -> ``f(input=user_input, context=context)``.
        """
        try:
            # Get function signature to determine parameters
            param_count = len(inspect.signature(interface).parameters)

            if param_count == 1:
                result = interface(user_input)
            elif param_count == 2:
                result = interface(user_input, context)
            else:
                # Try with keyword arguments
                result = interface(input=user_input, context=context)

            return {
                'status': 'success',
                'result': result,
                'agent_type': 'function_call'
            }

        except Exception as e:
            return {
                'status': 'error',
                'message': f'Function execution failed: {str(e)}',
                'agent_type': 'function_call'
            }

    def _execute_instance_request(self, interface: Any, user_input: str,
                                  context: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Execute request through class instance (same path as Python modules)."""
        return self._execute_python_request(interface, user_input, context)

    def get_connected_agents(self) -> Dict[str, Any]:
        """
        Get information about all connected agents.

        Returns:
            Mapping of agent id to status, connection type, capabilities and
            the ISO-formatted ``last_ping`` timestamp.
        """
        return {
            agent_id: {
                'status': info['status'],
                'connection_type': info['connection_type'],
                'capabilities': self.agent_capabilities.get(agent_id, {}),
                'last_ping': info['last_ping'].isoformat()
            }
            for agent_id, info in self.connected_agents.items()
        }

    def disconnect_agent(self, agent_id: str) -> bool:
        """
        Disconnect an agent from the system.

        Removes the agent's connection, capabilities and all of its routing
        entries; routing keys left without any agent are deleted.

        Returns:
            True if the agent was connected, False otherwise.
        """
        if agent_id not in self.connected_agents:
            return False

        del self.connected_agents[agent_id]
        self.agent_capabilities.pop(agent_id, None)

        # Remove from routing patterns
        self._remove_from_routing(agent_id)

        logger.info("Disconnected agent: %s", agent_id)
        return True


# Global universal connector instance
universal_connector = UniversalAgentConnector()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Nisarg-123-web/MCP2'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.