# agent_server.py
"""
AgentServer class for managing an agent in both client and server modes.
"""
import asyncio
import json
import logging
from typing import Any, Dict, Optional
from src.agent.agent import Agent
from src.capabilities.capability import CapabilityRegistry
from src.llm.base_llm import BaseLLM
from src.llm.groq_llm import GroqLLM
from src.mcp.mcp_connection_manager import MCPConnectionManager
from src.mcp.mcp_server_wrapper import MCPServerWrapper
from src.tools.tool import ToolRegistry
from src.utils.context import cleanup_context, get_context, initialize_context
class AgentServer:
    """
    Manages an agent that can operate both as a client and as a server.

    Typical lifecycle: construct, ``await initialize()``, ``await run()``,
    then ``await cleanup()``. The collaborator attributes (``agent``,
    ``server``, ``connection_manager``, ``tool_registry``,
    ``capability_registry``, ``llm_client``) stay ``None`` until
    ``initialize()`` has populated them.
    """

    def __init__(
        self,
        config: Dict[str, Any],
        server_mode: bool = False,
        server_name: Optional[str] = None,
    ):
        """
        Store configuration; no I/O happens here.

        Args:
            config: Configuration mapping. Keys read by this class are
                "server_name", "agent_name", "servers", "llm_provider" and
                "llm_api_key"; the whole mapping is also handed to the
                context initializer and to both registries.
            server_mode: When true, ``initialize()`` builds a server wrapper
                and ``run()`` serves over stdio instead of starting an
                interactive conversation.
            server_name: Explicit server name. When falsy, falls back to
                ``config["server_name"]`` and finally to a name generated
                from ``id(self)``.
        """
        self.config = config
        self.server_mode = server_mode
        self.server_name = server_name or config.get(
            "server_name", f"agent-server-{id(self)}"
        )
        # Populated by initialize().
        self.agent = None
        self.server = None
        self.connection_manager = None
        self.tool_registry = None
        self.capability_registry = None
        self.llm_client = None
        self.logger = logging.getLogger("agent-server")

    async def initialize(self):
        """
        Initialize the agent and server if in server mode.

        Builds the connection manager, registries, LLM client and agent,
        connects to the configured servers, discovers tools and loads
        capabilities.

        Returns:
            This instance, so the call can be chained.

        Raises:
            ValueError: If the configured LLM provider is not supported.
            Exception: Any error raised while connecting to servers,
                discovering tools or loading capabilities is logged and
                re-raised.
        """
        # Initialize context
        await initialize_context(self.config)
        # The returned context is not used here. The call is retained because
        # it may validate that the context exists -- TODO confirm and drop.
        get_context()

        # Create connection manager
        self.connection_manager = MCPConnectionManager()

        # Create registries
        self.tool_registry = ToolRegistry()
        self.capability_registry = CapabilityRegistry()

        # Create LLM client
        self.llm_client = self._create_llm_client()

        # Create agent
        self.agent = Agent(
            llm_client=self.llm_client,
            connection_manager=self.connection_manager,
            tool_registry=self.tool_registry,
            capability_registry=self.capability_registry,
            name=self.config.get("agent_name", "agent"),
        )

        # Connect to servers and discover tools
        await self._connect_to_servers_and_discover_tools()

        # Load capabilities from config
        await self._load_capabilities()

        # If in server mode, initialize the server wrapper
        if self.server_mode:
            self.server = MCPServerWrapper(self.agent, self.server_name)
            self.logger.info(
                f"Initialized agent in server mode with name: {self.server_name}"
            )
        else:
            self.logger.info("Initialized agent in client-only mode")
        return self

    async def run(self):
        """
        Run the agent in the appropriate mode.

        In server mode this awaits the server wrapper's stdio loop; otherwise
        it starts the agent's interactive conversation.

        Raises:
            RuntimeError: If called before ``initialize()`` has created the
                object needed for the selected mode.
        """
        if self.server_mode:
            if self.server is None:
                # Fail with a clear message instead of an AttributeError
                # on None.
                raise RuntimeError(
                    "AgentServer.run() called before initialize(): "
                    "server is not set up"
                )
            # Run as a server
            self.logger.info(f"Running agent as server with name: {self.server_name}")
            await self.server.run_stdio_async()
        else:
            if self.agent is None:
                raise RuntimeError(
                    "AgentServer.run() called before initialize(): "
                    "agent is not set up"
                )
            # Run as a standalone agent
            self.logger.info("Running agent in interactive conversation mode")
            await self.agent.start_conversation()

    async def cleanup(self):
        """
        Clean up agent resources.

        Best-effort: errors are logged and never propagated. The two teardown
        steps are guarded independently so that a failure while disconnecting
        does not prevent the context from being cleaned up.
        """
        self.logger.info("Cleaning up agent resources")
        succeeded = True

        if self.connection_manager is not None:
            try:
                await self.connection_manager.disconnect_all()
            except Exception as e:
                succeeded = False
                # The error is swallowed, so keep the traceback in the log.
                self.logger.exception(f"Error during cleanup: {e}")

        try:
            await cleanup_context()
        except Exception as e:
            succeeded = False
            self.logger.exception(f"Error during cleanup: {e}")

        if succeeded:
            self.logger.info("Cleanup complete")

    async def _connect_to_servers_and_discover_tools(self):
        """
        Connect to servers and discover their tools.

        Connects to every entry of ``config["servers"]`` in order, then asks
        the tool registry to load tools using the full config and the
        connection manager.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            # First establish connections to all servers. ``or {}`` also
            # covers a config where "servers" is present but null.
            servers = self.config.get("servers") or {}
            for name, server_config in servers.items():
                self.logger.info(f"Connecting to server: {name}")
                await self.connection_manager.connect_server(name, server_config)

            # Then discover tools from all servers
            self.logger.info("Discovering tools from servers")
            await self.tool_registry.load_from_config(
                self.config, self.connection_manager
            )

            # Log the tools that were discovered
            tools = self.tool_registry.list_tools()
            self.logger.info(f"Discovered {len(tools)} tools from MCP servers")
        except Exception as e:
            self.logger.error(f"Error connecting to servers or discovering tools: {e}")
            raise

    async def _load_capabilities(self):
        """
        Load capabilities from the configuration.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            self.logger.info("Loading capabilities from configuration")
            await self.capability_registry.load_from_config(self.config)

            # Log the capabilities that were loaded
            capabilities = self.capability_registry.list_capabilities()
            self.logger.info(
                f"Loaded {len(capabilities)} capabilities from configuration"
            )
        except Exception as e:
            self.logger.error(f"Error loading capabilities: {e}")
            raise

    def _create_llm_client(self) -> "BaseLLM":
        """
        Create an LLM client based on configuration.

        Reads "llm_provider" (default "groq") and "llm_api_key" (default "")
        from the config.

        Returns:
            The LLM client for the configured provider.

        Raises:
            ValueError: If the provider is not supported.
        """
        provider = self.config.get("llm_provider", "groq")
        api_key = self.config.get("llm_api_key", "")
        self.logger.info(f"Creating LLM client with provider: {provider}")
        if provider == "groq":
            return GroqLLM(api_key)
        # TODO: add more providers (Anthropic, OpenAI...)
        raise ValueError(f"Unsupported LLM provider: {provider}")