server.py (27.9 kB)
#!/usr/bin/env python3
import os
import json
import sys
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from dotenv import load_dotenv

import autogen
from autogen import (
    Agent,
    AssistantAgent,
    UserProxyAgent,
    GroupChat,
    GroupChatManager,
    ConversableAgent
)

try:
    from autogen import TeachableAgent, RetrieveUserProxyAgent
except ImportError:
    # Fallback for older versions
    TeachableAgent = None
    RetrieveUserProxyAgent = None

import mcp.types as types
from mcp import server

from .agents import AgentManager
from .config import ServerConfig, AgentConfig
from .workflows import WorkflowManager

load_dotenv()


class EnhancedAutoGenServer:
    def __init__(self):
        """Initialize the enhanced AutoGen MCP server."""
        self.config_path = os.getenv("AUTOGEN_MCP_CONFIG")
        if not self.config_path:
            # Create a default config if not specified
            self.config = {
                "llm_config": {
                    "config_list": [
                        {
                            "model": "gpt-4o",
                            "api_key": os.getenv("OPENAI_API_KEY")
                        }
                    ],
                    "temperature": 0.7
                },
                "code_execution_config": {
                    "work_dir": "coding",
                    "use_docker": False
                }
            }
        else:
            with open(self.config_path) as f:
                self.config = json.load(f)

        self.server_config = ServerConfig(
            default_llm_config=self.config.get("llm_config"),
            default_code_execution_config=self.config.get("code_execution_config")
        )
        self.agent_manager = AgentManager()
        self.workflow_manager = WorkflowManager()
        self.chat_history = []
        self.resource_cache = {}

        # Enhanced capabilities
        self.capabilities = {
            "tools": True,
            "prompts": True,
            "resources": True,
            "sampling": False  # Can be enabled if needed
        }

    async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle tool calls with enhanced AutoGen features."""
        try:
            if tool_name == "create_agent":
                return await self._create_agent(arguments)
            elif tool_name == "create_workflow":
                return await self._create_workflow(arguments)
            elif tool_name == "execute_chat":
                return await self._execute_chat(arguments)
            elif tool_name == "execute_group_chat":
                return await self._execute_group_chat(arguments)
            elif tool_name == "execute_nested_chat":
                return await self._execute_nested_chat(arguments)
            elif tool_name == "execute_swarm":
                return await self._execute_swarm(arguments)
            elif tool_name == "execute_workflow":
                return await self._execute_workflow(arguments)
            elif tool_name == "manage_agent_memory":
                return await self._manage_agent_memory(arguments)
            elif tool_name == "configure_teachability":
                return await self._configure_teachability(arguments)
            elif tool_name == "get_agent_status":
                return await self._get_agent_status(arguments)
            elif tool_name == "get_resource":
                return await self._get_resource(arguments)
            else:
                return {"error": f"Unknown tool: {tool_name}"}
        except Exception as e:
            return {"error": str(e)}

    async def _create_agent(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Create an enhanced AutoGen agent."""
        name = args["name"]
        agent_type = args["type"]
        system_message = args.get("system_message", "You are a helpful AI assistant.")
        llm_config = args.get("llm_config", self.server_config.default_llm_config)
        code_execution_config = args.get("code_execution_config", self.server_config.default_code_execution_config)
        human_input_mode = args.get("human_input_mode", "NEVER")
        tools = args.get("tools", [])
        teachability_config = args.get("teachability", {})

        try:
            if agent_type == "assistant":
                agent = AssistantAgent(
                    name=name,
                    system_message=system_message,
                    llm_config=llm_config,
                    human_input_mode=human_input_mode,
                )
            elif agent_type == "user_proxy":
                agent = UserProxyAgent(
                    name=name,
                    system_message=system_message,
                    code_execution_config=code_execution_config,
                    human_input_mode=human_input_mode,
                )
            elif agent_type == "conversable":
                agent = ConversableAgent(
                    name=name,
                    system_message=system_message,
                    llm_config=llm_config,
                    human_input_mode=human_input_mode,
                )
            elif agent_type == "teachable" and TeachableAgent:
                agent = TeachableAgent(
                    name=name,
                    system_message=system_message,
                    llm_config=llm_config,
                    teach_config=teachability_config,
                )
            elif agent_type == "retrievable" and RetrieveUserProxyAgent:
                agent = RetrieveUserProxyAgent(
                    name=name,
                    system_message=system_message,
                    code_execution_config=code_execution_config,
                    retrieve_config=args.get("retrieve_config", {}),
                )
            else:
                return {"error": f"Unknown or unsupported agent type: {agent_type}"}

            # Register tools if provided
            if tools:
                for tool in tools:
                    if hasattr(agent, 'register_for_execution'):
                        agent.register_for_execution(name=tool["name"])(tool["function"])

            self.agent_manager.add_agent(name, agent)

            return {
                "success": True,
                "message": f"Agent '{name}' created successfully",
                "agent_info": {
                    "name": name,
                    "type": agent_type,
                    "capabilities": {
                        "llm_enabled": hasattr(agent, "llm_config") and agent.llm_config is not None,
                        "code_execution": hasattr(agent, "code_execution_config") and agent.code_execution_config is not False,
                        "teachable": agent_type == "teachable",
                        "tools_count": len(tools)
                    }
                }
            }
        except Exception as e:
            return {"error": f"Failed to create agent: {str(e)}"}

    async def _create_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Create a complete multi-agent workflow."""
        workflow_name = args["workflow_name"]
        workflow_type = args["workflow_type"]
        agents_config = args["agents"]
        task_description = args["task_description"]
        max_rounds = args.get("max_rounds", 10)
        termination_conditions = args.get("termination_conditions", ["TERMINATE"])

        try:
            # Create agents for the workflow
            created_agents = []
            for agent_config in agents_config:
                agent_result = await self._create_agent(agent_config)
                if agent_result.get("success"):
                    created_agents.append(agent_config["name"])

            # Store workflow configuration
            workflow_config = {
                "name": workflow_name,
                "type": workflow_type,
                "agents": created_agents,
                "task": task_description,
                "max_rounds": max_rounds,
                "termination_conditions": termination_conditions,
                "created_at": datetime.now().isoformat()
            }

            self.workflow_manager.add_workflow(workflow_name, workflow_config)

            return {
                "success": True,
                "message": f"Workflow '{workflow_name}' created with {len(created_agents)} agents",
                "workflow_info": workflow_config
            }
        except Exception as e:
            return {"error": f"Failed to create workflow: {str(e)}"}

    async def _execute_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Execute an enhanced chat between two agents."""
        initiator_name = args["initiator"]
        responder_name = args["responder"]
        message = args["message"]
        max_turns = args.get("max_turns", 10)
        clear_history = args.get("clear_history", False)
        summary_method = args.get("summary_method", "last_msg")

        try:
            initiator = self.agent_manager.get_agent(initiator_name)
            responder = self.agent_manager.get_agent(responder_name)

            if not initiator or not responder:
                return {"error": "One or both agents not found"}

            if clear_history:
                if hasattr(initiator, 'clear_history'):
                    initiator.clear_history()
                if hasattr(responder, 'clear_history'):
                    responder.clear_history()

            # Execute the chat
            chat_result = initiator.initiate_chat(
                responder,
                message=message,
                max_turns=max_turns,
                summary_method=summary_method
            )

            # Store chat history
            chat_record = {
                "timestamp": datetime.now().isoformat(),
                "initiator": initiator_name,
                "responder": responder_name,
                "initial_message": message,
                "result": str(chat_result),
                "turns": max_turns
            }
            self.chat_history.append(chat_record)

            return {
                "success": True,
                "chat_result": str(chat_result),
                "summary": getattr(chat_result, 'summary', "Chat completed"),
                "cost": getattr(chat_result, 'cost', None)
            }
        except Exception as e:
            return {"error": f"Chat execution failed: {str(e)}"}

    async def _execute_group_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Execute an enhanced group chat."""
        agent_names = args["agent_names"]
        initiator_name = args["initiator"]
        message = args["message"]
        max_round = args.get("max_round", 10)
        speaker_selection_method = args.get("speaker_selection_method", "auto")
        allow_repeat_speaker = args.get("allow_repeat_speaker", True)
        admin_name = args.get("admin_name", "Admin")

        try:
            agents = []
            for name in agent_names:
                agent = self.agent_manager.get_agent(name)
                if agent:
                    agents.append(agent)
                else:
                    return {"error": f"Agent '{name}' not found"}

            initiator = self.agent_manager.get_agent(initiator_name)
            if not initiator:
                return {"error": f"Initiator agent '{initiator_name}' not found"}

            # Create group chat with enhanced features
            group_chat = GroupChat(
                agents=agents,
                messages=[],
                max_round=max_round,
                speaker_selection_method=speaker_selection_method,
                allow_repeat_speaker=allow_repeat_speaker,
                admin_name=admin_name
            )

            manager = GroupChatManager(
                groupchat=group_chat,
                llm_config=self.server_config.default_llm_config
            )

            # Execute group chat
            chat_result = initiator.initiate_chat(manager, message=message)

            # Store chat history
            chat_record = {
                "timestamp": datetime.now().isoformat(),
                "type": "group_chat",
                "agents": agent_names,
                "initiator": initiator_name,
                "initial_message": message,
                "result": str(chat_result),
                "rounds": max_round
            }
            self.chat_history.append(chat_record)

            return {
                "success": True,
                "chat_result": str(chat_result),
                "participants": agent_names,
                "total_messages": len(group_chat.messages)
            }
        except Exception as e:
            return {"error": f"Group chat execution failed: {str(e)}"}

    async def _execute_nested_chat(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Execute nested conversations with hierarchical structures."""
        primary_agent_name = args["primary_agent"]
        secondary_agent_names = args["secondary_agents"]
        task = args["task"]
        max_turns = args.get("max_turns", 5)
        nesting_depth = args.get("nesting_depth", 2)

        try:
            primary_agent = self.agent_manager.get_agent(primary_agent_name)
            if not primary_agent:
                return {"error": f"Primary agent '{primary_agent_name}' not found"}

            secondary_agents = []
            for name in secondary_agent_names:
                agent = self.agent_manager.get_agent(name)
                if agent:
                    secondary_agents.append(agent)
                else:
                    return {"error": f"Secondary agent '{name}' not found"}

            # Implementation for nested chat
            nested_results = []
            for depth in range(nesting_depth):
                for secondary_agent in secondary_agents:
                    nested_task = f"Depth {depth + 1}: {task}"
                    result = primary_agent.initiate_chat(
                        secondary_agent,
                        message=nested_task,
                        max_turns=max_turns
                    )
                    nested_results.append({
                        "depth": depth + 1,
                        "secondary_agent": secondary_agent.name,
                        "result": str(result)
                    })

            return {
                "success": True,
                "nested_results": nested_results,
                "total_conversations": len(nested_results)
            }
        except Exception as e:
            return {"error": f"Nested chat execution failed: {str(e)}"}

    async def _execute_swarm(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a swarm-based multi-agent conversation."""
        # This would be implemented based on specific swarm requirements
        # For now, return a placeholder implementation
        return {
            "success": True,
            "message": "Swarm execution not yet fully implemented",
            "placeholder": True
        }

    async def _execute_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a predefined workflow."""
        workflow_name = args["workflow_name"]
        input_data = args["input_data"]
        output_format = args.get("output_format", "json")
        quality_checks = args.get("quality_checks", False)

        try:
            result = await self.workflow_manager.execute_workflow(
                workflow_name,
                input_data,
                output_format,
                quality_checks
            )

            return {
                "success": True,
                "workflow": workflow_name,
                "result": result,
                "format": output_format
            }
        except Exception as e:
            return {"error": f"Workflow execution failed: {str(e)}"}

    async def _manage_agent_memory(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Manage agent memory and knowledge persistence."""
        agent_name = args["agent_name"]
        action = args["action"]
        memory_type = args.get("memory_type", "conversation")

        try:
            agent = self.agent_manager.get_agent(agent_name)
            if not agent:
                return {"error": f"Agent '{agent_name}' not found"}

            if action == "save":
                # Save memory data
                data = args.get("data", {})
                # Implementation depends on agent type and memory system
                return {"success": True, "message": f"Memory saved for {agent_name}"}
            elif action == "load":
                # Load memory data
                return {"success": True, "memory_data": {}, "message": f"Memory loaded for {agent_name}"}
            elif action == "clear":
                # Clear memory
                if hasattr(agent, 'clear_history'):
                    agent.clear_history()
                return {"success": True, "message": f"Memory cleared for {agent_name}"}
            elif action == "query":
                # Query memory
                query = args.get("query", "")
                return {"success": True, "query_result": [], "message": f"Memory queried for {agent_name}"}
            else:
                return {"error": f"Unknown memory action: {action}"}
        except Exception as e:
            return {"error": f"Memory management failed: {str(e)}"}

    async def _configure_teachability(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Configure teachability features for agents."""
        agent_name = args["agent_name"]
        enable_teachability = args["enable_teachability"]

        try:
            agent = self.agent_manager.get_agent(agent_name)
            if not agent:
                return {"error": f"Agent '{agent_name}' not found"}

            # Implementation would depend on the specific teachability requirements
            return {
                "success": True,
                "message": f"Teachability {'enabled' if enable_teachability else 'disabled'} for {agent_name}",
                "teachability_enabled": enable_teachability
            }
        except Exception as e:
            return {"error": f"Teachability configuration failed: {str(e)}"}

    async def _get_agent_status(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Get detailed status and metrics for agents."""
        agent_name = args.get("agent_name")
        include_metrics = args.get("include_metrics", False)
        include_memory = args.get("include_memory", False)

        try:
            if agent_name:
                agent = self.agent_manager.get_agent(agent_name)
                if not agent:
                    return {"error": f"Agent '{agent_name}' not found"}
                agents = {agent_name: agent}
            else:
                agents = self.agent_manager.get_all_agents()

            status_data = {}
            for name, agent in agents.items():
                status = {
                    "name": name,
                    "type": type(agent).__name__,
                    "active": True,
                    "capabilities": {
                        "llm_enabled": hasattr(agent, "llm_config") and agent.llm_config is not None,
                        "code_execution": hasattr(agent, "code_execution_config") and agent.code_execution_config is not False,
                    }
                }

                if include_metrics:
                    status["metrics"] = {
                        "total_messages": len(getattr(agent, "chat_messages", {})),
                        "conversations": 0  # Would need to track this
                    }

                if include_memory:
                    status["memory"] = {
                        "history_length": len(getattr(agent, "chat_messages", {})),
                        "memory_size": 0  # Would need to calculate this
                    }

                status_data[name] = status

            return {
                "success": True,
                "agents": status_data,
                "total_agents": len(status_data)
            }
        except Exception as e:
            return {"error": f"Status retrieval failed: {str(e)}"}

    async def _get_resource(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Get resource data for MCP resources."""
        uri = args["uri"]

        try:
            if uri == "autogen://agents/list":
                agents = self.agent_manager.get_all_agents()
                agent_list = []
                for name, agent in agents.items():
                    agent_list.append({
                        "name": name,
                        "type": type(agent).__name__,
                        "status": "active"
                    })
                return {"agents": agent_list}

            elif uri == "autogen://workflows/templates":
                templates = {
                    "sequential": {
                        "description": "Sequential agent workflow",
                        "agents": ["initiator", "processor", "reviewer"],
                        "flow": "linear",
                    },
                    "group_chat": {
                        "description": "Group chat with multiple agents",
                        "agents": ["facilitator", "expert1", "expert2", "critic"],
                        "flow": "collaborative",
                    },
                    "hierarchical": {
                        "description": "Hierarchical workflow with manager",
                        "agents": ["manager", "worker1", "worker2", "validator"],
                        "flow": "top-down",
                    },
                    "available_workflows": list(self.workflow_manager._workflow_templates.keys())
                }
                return templates

            elif uri == "autogen://chat/history":
                # Return recent chat history
                recent_history = self.chat_history[-10:]  # Last 10 chats
                history_text = ""
                for chat in recent_history:
                    history_text += f"[{chat['timestamp']}] {chat['initiator']} -> {chat.get('responder', 'Group')}: {chat['initial_message'][:100]}...\n"
                return {"content": history_text}

            elif uri == "autogen://config/current":
                return {
                    "llm_config": self.server_config.default_llm_config,
                    "code_execution_config": self.server_config.default_code_execution_config,
                    "capabilities": self.capabilities
                }

            else:
                return {"error": f"Unknown resource URI: {uri}"}
        except Exception as e:
            return {"error": f"Resource retrieval failed: {str(e)}"}

    # MCP-style tool handlers for compatibility
    async def handle_create_agent(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle create_agent tool call."""
        return await self._create_agent(arguments)

    async def handle_delete_agent(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle delete_agent tool call."""
        agent_name = arguments.get("name")
        if not agent_name:
            return {"content": [{"type": "text", "text": "Agent name is required"}]}

        try:
            self.agent_manager.agents.pop(agent_name, None)
            return {"content": [{"type": "text", "text": f"Agent {agent_name} deleted successfully"}]}
        except Exception as e:
            return {"content": [{"type": "text", "text": f"Error deleting agent: {str(e)}"}]}

    async def handle_list_agents(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle list_agents tool call."""
        agents = self.agent_manager.list_agents()
        return {"content": [{"type": "text", "text": f"Available agents: {', '.join(agents)}"}]}

    async def handle_start_chat(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle start_chat tool call."""
        return await self._execute_chat(arguments)

    async def handle_send_message(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle send_message tool call."""
        agent_name = arguments.get("agent_name")
        message = arguments.get("message")

        if not agent_name or not message:
            return {"content": [{"type": "text", "text": "Agent name and message are required"}]}

        try:
            agent = self.agent_manager.get_agent(agent_name)
            if not agent:
                return {"content": [{"type": "text", "text": f"Agent {agent_name} not found"}]}

            # Simulate message sending
            response = f"Message sent to {agent_name}: {message}"
            return {"content": [{"type": "text", "text": response}]}
        except Exception as e:
            return {"content": [{"type": "text", "text": f"Error sending message: {str(e)}"}]}

    async def handle_get_chat_history(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle get_chat_history tool call."""
        history = self.chat_history[-10:]  # Last 10 chats
        history_text = "\n".join([f"[{chat['timestamp']}] {chat['initial_message'][:100]}..." for chat in history])
        return {"content": [{"type": "text", "text": history_text or "No chat history available"}]}

    async def handle_create_group_chat(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle create_group_chat tool call."""
        return await self._execute_group_chat(arguments)

    async def handle_execute_workflow(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle execute_workflow tool call."""
        return await self._execute_workflow(arguments)

    async def handle_teach_agent(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle teach_agent tool call."""
        return await self._configure_teachability(arguments)

    async def handle_save_conversation(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle save_conversation tool call."""
        conversation_id = arguments.get("conversation_id", "default")
        conversation_data = arguments.get("conversation_data", {})

        try:
            # Save conversation logic
            self.resource_cache[f"conversation_{conversation_id}"] = {
                "id": conversation_id,
                "data": conversation_data,
                "timestamp": datetime.now().isoformat()
            }
            return {"content": [{"type": "text", "text": f"Conversation {conversation_id} saved successfully"}]}
        except Exception as e:
            return {"content": [{"type": "text", "text": f"Error saving conversation: {str(e)}"}]}


def main():
    """Main function for command line execution."""
    if len(sys.argv) != 3:
        print(json.dumps({"error": "Usage: python server.py <tool_name> <arguments_json>"}))
        sys.exit(1)

    tool_name = sys.argv[1]
    try:
        arguments = json.loads(sys.argv[2])
    except json.JSONDecodeError:
        print(json.dumps({"error": "Invalid JSON arguments"}))
        sys.exit(1)

    # Create server instance
    server = EnhancedAutoGenServer()

    # Run the tool call
    try:
        result = asyncio.run(server.handle_tool_call(tool_name, arguments))
        print(json.dumps(result))
    except Exception as e:
        print(json.dumps({"error": str(e)}))
        sys.exit(1)


if __name__ == "__main__":
    main()
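
The sketch below shows one way to drive the server's tool entry point directly from Python. It is not part of the repository: the import path is an assumption (server.py uses relative imports, so it normally runs inside its package), it requires OPENAI_API_KEY since the default llm_config targets gpt-4o, and the agent names and messages are illustrative.

# Hypothetical driver script, not shipped with the project.
import asyncio

from server import EnhancedAutoGenServer  # assumed import path; adjust to the real package layout


async def demo():
    srv = EnhancedAutoGenServer()

    # Register two agents through the same entry point the MCP tools use.
    print(await srv.handle_tool_call("create_agent", {
        "name": "coder",
        "type": "assistant",
        "system_message": "You write concise Python code.",
    }))
    print(await srv.handle_tool_call("create_agent", {
        "name": "runner",
        "type": "user_proxy",
        "human_input_mode": "NEVER",
    }))

    # Run a short two-agent chat; this issues real LLM calls.
    print(await srv.handle_tool_call("execute_chat", {
        "initiator": "runner",
        "responder": "coder",
        "message": "Write a function that reverses a string.",
        "max_turns": 2,
    }))


if __name__ == "__main__":
    asyncio.run(demo())

The same calls can be made from the command line through main(), e.g. python server.py create_agent '{"name": "coder", "type": "assistant"}', which prints the JSON result to stdout.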

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DynamicEndpoints/Autogen_MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.