OpenAI MCP Server

#!/usr/bin/env python3 # claude_code/commands/multi_agent_client.py """Multi-agent MCP client implementation with synchronization capabilities.""" import asyncio import sys import os import json import logging import uuid import argparse import time from typing import Optional, Dict, Any, List, Set, Tuple from contextlib import AsyncExitStack from dataclasses import dataclass, field, asdict from rich.console import Console from rich.prompt import Prompt from rich.panel import Panel from rich.markdown import Markdown from rich.table import Table from rich.live import Live from rich import print as rprint from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from anthropic import Anthropic from dotenv import load_dotenv # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() # Console for rich output console = Console() @dataclass class Agent: """Agent representation for multi-agent scenarios.""" id: str name: str role: str model: str system_prompt: str conversation: List[Dict[str, Any]] = field(default_factory=list) connected_agents: Set[str] = field(default_factory=set) message_queue: asyncio.Queue = field(default_factory=asyncio.Queue) def __post_init__(self): """Initialize the conversation with system prompt.""" self.conversation = [{ "role": "system", "content": self.system_prompt }] @dataclass class Message: """Message for agent communication.""" id: str = field(default_factory=lambda: str(uuid.uuid4())) sender_id: str = "" sender_name: str = "" recipient_id: Optional[str] = None # None means broadcast to all recipient_name: Optional[str] = None content: str = "" timestamp: float = field(default_factory=time.time) @classmethod def create(cls, sender_id: str, sender_name: str, content: str, recipient_id: Optional[str] = None, recipient_name: Optional[str] = None) -> 'Message': """Create a new message.""" return cls( sender_id=sender_id, sender_name=sender_name, recipient_id=recipient_id, recipient_name=recipient_name, content=content ) class AgentCoordinator: """Coordinates communication between multiple agents.""" def __init__(self): """Initialize the agent coordinator.""" self.agents: Dict[str, Agent] = {} self.message_history: List[Message] = [] self.broadcast_queue: asyncio.Queue = asyncio.Queue() def add_agent(self, agent: Agent) -> None: """Add a new agent to the coordinator. Args: agent: The agent to add """ self.agents[agent.id] = agent def remove_agent(self, agent_id: str) -> None: """Remove an agent from the coordinator. Args: agent_id: ID of the agent to remove """ if agent_id in self.agents: del self.agents[agent_id] async def broadcast_message(self, message: Message) -> None: """Broadcast a message to all agents. Args: message: The message to broadcast """ self.message_history.append(message) for agent_id, agent in self.agents.items(): # Don't send message back to sender if agent_id != message.sender_id: await agent.message_queue.put(message) logger.debug(f"Queued message from {message.sender_name} to {agent.name}") async def send_direct_message(self, message: Message) -> None: """Send a message to a specific agent. Args: message: The message to send """ self.message_history.append(message) if message.recipient_id in self.agents: recipient = self.agents[message.recipient_id] await recipient.message_queue.put(message) logger.debug(f"Queued direct message from {message.sender_name} to {recipient.name}") async def process_message(self, message: Message) -> None: """Process an incoming message and route appropriately. Args: message: The message to process """ if message.recipient_id is None: # Broadcast message await self.broadcast_message(message) else: # Direct message await self.send_direct_message(message) def get_message_history_for_agent(self, agent_id: str) -> List[Dict[str, Any]]: """Get conversation messages formatted for a specific agent. Args: agent_id: ID of the agent Returns: List of messages in the format expected by Claude """ agent = self.agents.get(agent_id) if not agent: return [] messages = [] # Start with the agent's conversation history messages.extend(agent.conversation) # Add relevant messages from the message history for msg in self.message_history: # Include messages sent by this agent or addressed to this agent # or broadcast messages from other agents if (msg.sender_id == agent_id or msg.recipient_id == agent_id or (msg.recipient_id is None and msg.sender_id != agent_id)): if msg.sender_id == agent_id: # This agent's own messages messages.append({ "role": "assistant", "content": msg.content }) else: # Messages from other agents sender = self.agents.get(msg.sender_id) sender_name = sender.name if sender else msg.sender_name if msg.recipient_id is None: # Broadcast message messages.append({ "role": "user", "content": f"{sender_name}: {msg.content}" }) else: # Direct message messages.append({ "role": "user", "content": f"{sender_name} (direct): {msg.content}" }) return messages class MultiAgentMCPClient: """Multi-agent Model Context Protocol client with synchronization capabilities.""" def __init__(self, config_path: str = None): """Initialize the multi-agent MCP client. Args: config_path: Path to the agent configuration file """ self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() self.anthropic = Anthropic() self.coordinator = AgentCoordinator() self.available_tools = [] # Configuration self.config_path = config_path self.agents_config = self._load_agents_config() def _load_agents_config(self) -> List[Dict[str, Any]]: """Load agent configurations from file. Returns: List of agent configurations """ default_config = [{ "name": "Assistant", "role": "general assistant", "model": "claude-3-5-sonnet-20241022", "system_prompt": "You are a helpful AI assistant participating in a multi-agent conversation. You can communicate with other agents and humans to solve complex problems." }] if not self.config_path: return default_config try: with open(self.config_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load agent configuration: {e}") return default_config def setup_agents(self) -> None: """Set up agents based on configuration.""" for idx, agent_config in enumerate(self.agents_config): agent_id = str(uuid.uuid4()) agent = Agent( id=agent_id, name=agent_config.get("name", f"Agent-{idx+1}"), role=agent_config.get("role", "assistant"), model=agent_config.get("model", "claude-3-5-sonnet-20241022"), system_prompt=agent_config.get("system_prompt", "You are a helpful AI assistant.") ) self.coordinator.add_agent(agent) logger.info(f"Created agent: {agent.name} ({agent.role})") async def connect_to_server(self, server_script_path: str): """Connect to an MCP server. Args: server_script_path: Path to the server script (.py or .js) """ is_python = server_script_path.endswith('.py') is_js = server_script_path.endswith('.js') if not (is_python or is_js): raise ValueError("Server script must be a .py or .js file") command = "python" if is_python else "node" server_params = StdioServerParameters( command=command, args=[server_script_path], env=None ) stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write)) await self.session.initialize() # List available tools response = await self.session.list_tools() tools = response.tools self.available_tools = [{ "name": tool.name, "description": tool.description, "input_schema": tool.inputSchema } for tool in response.tools] tool_names = [tool.name for tool in tools] logger.info(f"Connected to server with tools: {tool_names}") console.print(Panel.fit( f"[bold green]Connected to MCP server[/bold green]\n" f"Available tools: {', '.join(tool_names)}", title="Connection Status", border_style="green" )) async def process_agent_query(self, agent_id: str, query: str, is_direct_message: bool = False) -> str: """Process a query using Claude and available tools for a specific agent. Args: agent_id: The ID of the agent processing the query query: The query to process is_direct_message: Whether this is a direct message from user Returns: The response text """ agent = self.coordinator.agents.get(agent_id) if not agent: return "Error: Agent not found" # Get the conversation history for this agent messages = self.coordinator.get_message_history_for_agent(agent_id) # Add the current query if it's a direct message if is_direct_message: messages.append({ "role": "user", "content": query }) # Initial Claude API call response = self.anthropic.messages.create( model=agent.model, max_tokens=1000, messages=messages, tools=self.available_tools ) # Process response and handle tool calls tool_results = [] final_text = "" assistant_message_content = [] for content in response.content: if content.type == 'text': final_text = content.text assistant_message_content.append(content) elif content.type == 'tool_use': tool_name = content.name tool_args = content.input # Execute tool call result = await self.session.call_tool(tool_name, tool_args) tool_results.append({"call": tool_name, "result": result}) console.print(f"[bold cyan]Agent {agent.name} calling tool {tool_name}[/bold cyan]") assistant_message_content.append(content) messages.append({ "role": "assistant", "content": assistant_message_content }) messages.append({ "role": "user", "content": [ { "type": "tool_result", "tool_use_id": content.id, "content": result.content } ] }) # Get next response from Claude response = self.anthropic.messages.create( model=agent.model, max_tokens=1000, messages=messages, tools=self.available_tools ) final_text = response.content[0].text # Create a message from the agent's response message = Message.create( sender_id=agent_id, sender_name=agent.name, content=final_text, recipient_id=None # Broadcast to all ) # Process the message await self.coordinator.process_message(message) return final_text async def process_user_query(self, query: str, target_agent_id: Optional[str] = None) -> None: """Process a query from the user and route it to agents. Args: query: The user query target_agent_id: Optional ID of a specific agent to target """ # Handle special commands if query.startswith("/"): await self._handle_special_command(query) return if target_agent_id: # Direct message to a specific agent agent = self.coordinator.agents.get(target_agent_id) if not agent: console.print("[bold red]Error: Agent not found[/bold red]") return console.print(f"[bold blue]User → {agent.name}:[/bold blue] {query}") response = await self.process_agent_query(target_agent_id, query, is_direct_message=True) console.print(f"[bold green]{agent.name}:[/bold green] {response}") else: # Broadcast to all agents console.print(f"[bold blue]User (broadcast):[/bold blue] {query}") # Create a message from the user message = Message.create( sender_id="user", sender_name="User", content=query, recipient_id=None # Broadcast ) # Process the message await self.coordinator.process_message(message) # Process in parallel for all agents tasks = [] for agent_id in self.coordinator.agents: tasks.append(asyncio.create_task(self.process_agent_query(agent_id, query))) # Wait for all agents to respond await asyncio.gather(*tasks) async def run_agent_thought_loops(self) -> None: """Run continuous thought loops for each agent in the background.""" while True: for agent_id, agent in self.coordinator.agents.items(): try: # Check if there are new messages for this agent if not agent.message_queue.empty(): message = await agent.message_queue.get() # Log the message if message.recipient_id is None: console.print(f"[bold cyan]{message.sender_name} (broadcast):[/bold cyan] {message.content}") else: console.print(f"[bold cyan]{message.sender_name} → {agent.name}:[/bold cyan] {message.content}") # Give the agent a chance to respond await self.process_agent_query(agent_id, message.content) # Mark the message as processed agent.message_queue.task_done() except Exception as e: logger.exception(f"Error in agent thought loop for {agent.name}: {e}") # Small delay to prevent CPU hogging await asyncio.sleep(0.1) async def _handle_special_command(self, command: str) -> None: """Handle special commands. Args: command: The command string starting with / """ parts = command.strip().split() cmd = parts[0].lower() args = parts[1:] if cmd == "/help": self._show_help() elif cmd == "/agents": self._show_agents() elif cmd == "/talk": if len(args) < 2: console.print("[bold red]Error: /talk requires agent name and message[/bold red]") return agent_name = args[0] message = " ".join(args[1:]) # Find agent by name target_agent = None for agent_id, agent in self.coordinator.agents.items(): if agent.name.lower() == agent_name.lower(): target_agent = agent break if target_agent: await self.process_user_query(message, target_agent.id) else: console.print(f"[bold red]Error: Agent '{agent_name}' not found[/bold red]") elif cmd == "/history": self._show_message_history() elif cmd == "/quit" or cmd == "/exit": console.print("[bold yellow]Exiting multi-agent session...[/bold yellow]") sys.exit(0) else: console.print(f"[bold red]Unknown command: {cmd}[/bold red]") self._show_help() def _show_help(self) -> None: """Show help information.""" help_text = """ # Multi-Agent MCP Client Commands - **/help**: Show this help message - **/agents**: List all active agents - **/talk <agent> <message>**: Send a direct message to a specific agent - **/history**: Show message history - **/quit**, **/exit**: Exit the application To broadcast a message to all agents, simply type your message without any command. """ console.print(Markdown(help_text)) def _show_agents(self) -> None: """Show information about all active agents.""" table = Table(title="Active Agents") table.add_column("Name", style="cyan") table.add_column("Role", style="green") table.add_column("Model", style="blue") for agent_id, agent in self.coordinator.agents.items(): table.add_row(agent.name, agent.role, agent.model) console.print(table) def _show_message_history(self) -> None: """Show the message history.""" if not self.coordinator.message_history: console.print("[yellow]No messages in history yet.[/yellow]") return table = Table(title="Message History") table.add_column("Time", style="cyan") table.add_column("From", style="green") table.add_column("To", style="blue") table.add_column("Message", style="white") for msg in self.coordinator.message_history: timestamp = time.strftime("%H:%M:%S", time.localtime(msg.timestamp)) recipient = msg.recipient_name if msg.recipient_name else "All" table.add_row(timestamp, msg.sender_name, recipient, msg.content[:50] + ("..." if len(msg.content) > 50 else "")) console.print(table) async def chat_loop(self) -> None: """Run the interactive chat loop.""" console.print(Panel.fit( "[bold green]Multi-Agent MCP Client Started![/bold green]\n" "Type your messages to broadcast to all agents or use /help for commands.", title="Welcome", border_style="green" )) # Start the agent thought loop in the background thought_loop_task = asyncio.create_task(self.run_agent_thought_loops()) try: # First, show active agents self._show_agents() # Main chat loop while True: try: query = Prompt.ask("\n[bold blue]>[/bold blue]").strip() if not query: continue if query.lower() == "quit" or query.lower() == "exit": break await self.process_user_query(query) except KeyboardInterrupt: console.print("\n[bold yellow]Operation cancelled.[/bold yellow]") continue except Exception as e: console.print(f"\n[bold red]Error: {str(e)}[/bold red]") logger.exception("Error processing query") finally: # Cancel the thought loop task thought_loop_task.cancel() try: await thought_loop_task except asyncio.CancelledError: pass async def cleanup(self) -> None: """Clean up resources.""" await self.exit_stack.aclose() def add_arguments(parser: argparse.ArgumentParser) -> None: """Add command-specific arguments to the parser. Args: parser: Argument parser """ parser.add_argument( "server_script", type=str, help="Path to the server script (.py or .js)" ) parser.add_argument( "--config", type=str, help="Path to agent configuration JSON file" ) def execute(args: argparse.Namespace) -> int: """Execute the multi-agent client command. Args: args: Command arguments Returns: Exit code """ try: client = MultiAgentMCPClient(config_path=args.config) client.setup_agents() async def run_client(): try: await client.connect_to_server(args.server_script) await client.chat_loop() finally: await client.cleanup() asyncio.run(run_client()) return 0 except Exception as e: logger.exception(f"Error running multi-agent MCP client: {e}") console.print(f"[bold red]Error: {str(e)}[/bold red]") return 1 def main() -> int: """Run the multi-agent client command as a standalone script.""" parser = argparse.ArgumentParser(description="Run the Claude Code Multi-Agent MCP client") add_arguments(parser) args = parser.parse_args() return execute(args) if __name__ == "__main__": sys.exit(main())