Skip to main content
Glama
agents.py (3.47 kB)
"""Agent management for AutoGen MCP."""

from typing import Dict, Optional, Any, List, cast

import autogen
from autogen import ConversableAgent, Agent

from .config import AgentConfig, ServerConfig


class AgentManager:
    """Keep a name-indexed registry of AutoGen agents and build new ones.

    Agents are stored in a private dict keyed by name. A ``ServerConfig`` is
    instantiated once per manager and supplies fallback LLM and
    code-execution settings when an ``AgentConfig`` does not provide them.
    """

    def __init__(self):
        """Initialize an empty registry and a fresh ``ServerConfig``."""
        self._agents: Dict[str, ConversableAgent] = {}
        self._server_config = ServerConfig()

    def create_agent(self, config: AgentConfig) -> ConversableAgent:
        """Build an agent from ``config``, register it, and return it.

        Args:
            config: Agent description. ``config.name`` is the registry key
                and ``config.type`` selects the agent class: ``"assistant"``
                builds an ``autogen.AssistantAgent`` and ``"user"`` builds an
                ``autogen.UserProxyAgent``.

        Returns:
            The newly created (and now registered) agent.

        Raises:
            ValueError: If an agent with the same name is already registered,
                or if ``config.type`` is neither ``"assistant"`` nor
                ``"user"``.
        """
        if config.name in self._agents:
            raise ValueError(f"Agent {config.name} already exists")

        # Start from the settings the config itself provides.
        agent_config = config.to_autogen_config()

        # Fill in server-wide defaults. The truthiness test means *any* falsy
        # value (missing key, None, empty dict, False) is replaced.
        # NOTE(review): if AgentConfig can carry an explicit ``False`` meaning
        # "disabled", it is overridden here -- confirm against AgentConfig.
        if not agent_config.get("llm_config"):
            agent_config["llm_config"] = (
                self._server_config.get_default_llm_config()
            )
        if (
            not agent_config.get("code_execution_config")
            and config.type == "assistant"
        ):
            agent_config["code_execution_config"] = (
                self._server_config.get_default_code_execution_config()
            )

        # Both agent kinds never prompt for human input and stop after ten
        # consecutive automatic replies.
        if config.type == "assistant":
            agent = autogen.AssistantAgent(
                name=config.name,
                system_message=agent_config.get("system_message", ""),
                llm_config=agent_config.get("llm_config"),
                code_execution_config=agent_config.get("code_execution_config"),
                human_input_mode="NEVER",
                max_consecutive_auto_reply=10,
            )
        elif config.type == "user":
            # The user proxy is created without an llm_config argument and
            # with code execution switched off.
            agent = autogen.UserProxyAgent(
                name=config.name,
                human_input_mode="NEVER",
                max_consecutive_auto_reply=10,
                system_message=agent_config.get("system_message", ""),
                code_execution_config=False,
            )
        else:
            raise ValueError(f"Unknown agent type: {config.type}")

        self._agents[config.name] = agent
        return agent

    def add_agent(self, name: str, agent: ConversableAgent) -> None:
        """Register an existing agent under ``name``.

        Unlike ``create_agent``, no duplicate check is made: an agent already
        stored under ``name`` is replaced.
        """
        self._agents[name] = agent

    def get_agent(self, name: str) -> Optional[ConversableAgent]:
        """Return the agent registered as ``name``, or ``None`` if absent."""
        return self._agents.get(name)

    def get_all_agents(self) -> Dict[str, ConversableAgent]:
        """Return a shallow copy of the name-to-agent mapping.

        Mutating the returned dict does not affect the registry; the agent
        objects themselves are shared.
        """
        return self._agents.copy()

    def clear_all_agents(self) -> None:
        """Remove every agent from the registry."""
        self._agents.clear()

    def get_agent_count(self) -> int:
        """Return the number of registered agents."""
        return len(self._agents)

    def agent_exists(self, name: str) -> bool:
        """Return ``True`` if an agent is registered under ``name``."""
        return name in self._agents

    def list_agents(self) -> List[str]:
        """Return the names of all registered agents as a new list."""
        return list(self._agents)

    def remove_agent(self, name: str) -> None:
        """Remove the agent registered as ``name``; do nothing if absent."""
        self._agents.pop(name, None)

    def create_group_chat(
        self,
        agents: List[ConversableAgent],
        messages: Optional[List[Dict[str, Any]]] = None,
        max_round: int = 10,
    ) -> autogen.GroupChat:
        """Build an ``autogen.GroupChat`` over the given agents.

        The agents are used as passed; they are not looked up in, or added
        to, this manager's registry.

        Args:
            agents: Participants of the chat.
            messages: Initial message history. When ``None`` a new empty
                list is used. A list supplied by the caller -- including an
                empty one -- is passed through as-is rather than replaced.
            max_round: Forwarded to ``GroupChat`` as ``max_round``.

        Returns:
            The constructed ``autogen.GroupChat``.
        """
        # Test against None rather than truthiness so that a caller's empty
        # list is not silently swapped for a different list object.
        history = messages if messages is not None else []
        return autogen.GroupChat(
            agents=cast(List[Agent], agents),
            messages=history,
            max_round=max_round,
        )

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DynamicEndpoints/Autogen_MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.