ACP-MCP-Server

# Complete Guide: Building an ACP-MCP Server

## Table of Contents

1. [Overview](#overview)
2. [Architecture](#architecture)
3. [Prerequisites](#prerequisites)
4. [Core Components](#core-components)
5. [Implementation Guide](#implementation-guide)
6. [Deployment](#deployment)
7. [Usage Examples](#usage-examples)
8. [Advanced Features](#advanced-features)
9. [Troubleshooting](#troubleshooting)

## Overview

The ACP-MCP Server acts as a bridge between two complementary AI communication protocols:

- **ACP (Agent Communication Protocol)**: IBM's REST-based standard for agent-to-agent communication
- **MCP (Model Context Protocol)**: Anthropic's JSON-RPC standard for model-to-tool connectivity

### Why Bridge These Protocols?

**ACP Strengths:**

- Agent-to-agent communication
- Multi-modal messaging (text, images, JSON)
- Async/sync/streaming execution modes
- Complex workflow orchestration
- Interactive agent patterns (await/resume)

**MCP Strengths:**

- Model-to-tool connectivity
- Wide ecosystem support (Claude Desktop, etc.)
- Standardized tool definitions
- Resource management
- Simple integration patterns

**Combined Value:**

- Enable MCP clients to access sophisticated ACP agent networks
- Allow ACP agents to be used as MCP tools
- Create unified multi-agent + multi-tool workflows

## Architecture

```mermaid
graph TB
    subgraph "MCP Ecosystem"
        MC[MCP Client<br/>Claude Desktop]
        MR[MCP Resources]
        MT[MCP Tools]
    end

    subgraph "ACP-MCP Server"
        BRIDGE[Bridge Server<br/>FastMCP + ACP SDK]
        AD[Agent Discovery]
        MM[Message Bridge]
        RO[Run Orchestrator]
        AR[Agent Router]
        IM[Interactive Manager]
    end

    subgraph "ACP Ecosystem"
        AS[ACP Server]
        A1[Agent 1]
        A2[Agent 2]
        A3[Agent N]
    end

    MC <-->|JSON-RPC| BRIDGE
    BRIDGE <-->|REST| AS
    AS --- A1
    AS --- A2
    AS --- A3

    BRIDGE --- AD
    BRIDGE --- MM
    BRIDGE --- RO
    BRIDGE --- AR
    BRIDGE --- IM
```

### Component Responsibilities

| Component | Purpose | Input | Output |
|-----------|---------|-------|--------|
| Agent Discovery | Find and register ACP agents | ACP `/agents` endpoint | MCP resource definitions |
| Message Bridge | Convert message formats | ACP MessageParts | MCP content types |
| Run Orchestrator | Execute ACP agents | MCP tool calls | Agent run results |
| Agent Router | Route to specialized agents | Request analysis | Target agent selection |
| Interactive Manager | Handle await/resume patterns | User interactions | Resumed agent execution |

## Prerequisites

### Software Requirements

```bash
# Python 3.11+
python --version

# Required packages (asyncio ships with the standard library, so it is not installed from PyPI)
pip install fastmcp acp-sdk uvicorn aiohttp pydantic
```

### ACP Server Setup

```bash
# Clone ACP repository
git clone https://github.com/i-am-bee/acp.git
cd acp

# Install ACP SDK
uv add acp-sdk

# Create a simple echo agent
cat > echo_agent.py << 'EOF'
from acp_sdk.server import Server
from acp_sdk.models import Message

server = Server()

@server.agent()
async def echo(input: list[Message]):
    """Simple echo agent for testing"""
    for message in input:
        yield message

server.run(port=8000)
EOF

# Start ACP server
python echo_agent.py
```

### Verify ACP Server

```bash
# Test agent discovery
curl http://localhost:8000/agents

# Test agent execution
curl -X POST http://localhost:8000/runs \
  -H "Content-Type: application/json" \
  -d '{"agent_name": "echo", "input": [{"parts": [{"content": "Hello ACP!"}]}]}'
```

## Core Components

### 1. Agent Discovery Tool

**Purpose**: Automatically discover ACP agents and register them as MCP resources.

```python
# agent_discovery.py
import asyncio
import aiohttp
from typing import List, Dict, Any
from fastmcp import FastMCP, Context
from pydantic import BaseModel


class ACPAgent(BaseModel):
    name: str
    description: str
    metadata: Dict[str, Any] = {}


class AgentDiscoveryTool:
    def __init__(self, acp_base_url: str = "http://localhost:8000"):
        self.acp_base_url = acp_base_url
        self.discovered_agents: Dict[str, ACPAgent] = {}

    async def discover_agents(self) -> List[ACPAgent]:
        """Discover all available ACP agents"""
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(f"{self.acp_base_url}/agents") as response:
                    if response.status == 200:
                        data = await response.json()
                        agents = [ACPAgent(**agent) for agent in data.get("agents", [])]

                        # Update discovered agents cache
                        for agent in agents:
                            self.discovered_agents[agent.name] = agent

                        return agents
                    else:
                        print(f"Failed to discover agents: {response.status}")
                        return []
            except Exception as e:
                print(f"Error discovering agents: {e}")
                return []

    def get_mcp_resource_uri(self, agent_name: str) -> str:
        """Generate MCP resource URI for an ACP agent"""
        return f"acp://agents/{agent_name}"

    async def get_agent_capabilities(self, agent_name: str) -> Dict[str, Any]:
        """Get detailed capabilities of a specific agent"""
        # This could be extended to call a capabilities endpoint
        # For now, return basic info from discovery
        agent = self.discovered_agents.get(agent_name)
        if agent:
            return {
                "name": agent.name,
                "description": agent.description,
                "metadata": agent.metadata,
                "supports_streaming": True,  # ACP supports streaming
                "supports_multimodal": True,  # ACP supports multi-modal
                "interaction_modes": ["sync", "async", "stream"]
            }
        return {}


# Integration with FastMCP
def register_discovery_tools(mcp: FastMCP, discovery: AgentDiscoveryTool):

    @mcp.tool()
    async def discover_acp_agents(ctx: Context) -> str:
        """Discover all available ACP agents and register them as resources"""
        agents = await discovery.discover_agents()

        result = {
            "discovered_count": len(agents),
            "agents": []
        }

        for agent in agents:
            agent_info = {
                "name": agent.name,
                "description": agent.description,
                "resource_uri": discovery.get_mcp_resource_uri(agent.name),
                "capabilities": await discovery.get_agent_capabilities(agent.name)
            }
            result["agents"].append(agent_info)

        await ctx.info(f"Discovered {len(agents)} ACP agents")
        return str(result)

    @mcp.tool()
    async def get_agent_info(agent_name: str, ctx: Context) -> str:
        """Get detailed information about a specific ACP agent"""
        capabilities = await discovery.get_agent_capabilities(agent_name)

        if capabilities:
            await ctx.info(f"Retrieved info for agent: {agent_name}")
            return str(capabilities)
        else:
            await ctx.warning(f"Agent not found: {agent_name}")
            return f"Agent '{agent_name}' not found"
```

### 2. Multi-Modal Message Bridge

**Purpose**: Convert between ACP's rich message formats and MCP content types.
```python
# message_bridge.py
import base64
import json
from typing import List, Optional

import aiohttp
from fastmcp import FastMCP, Context
from pydantic import BaseModel


class ACPMessagePart(BaseModel):
    content: Optional[str] = None  # may be absent when content_url is used
    content_type: str
    content_encoding: str = "plain"
    content_url: Optional[str] = None
    name: Optional[str] = None


class ACPMessage(BaseModel):
    parts: List[ACPMessagePart]


class MCPContent(BaseModel):
    type: str  # "text", "image", "resource"
    text: Optional[str] = None
    data: Optional[str] = None
    mimeType: Optional[str] = None


class MessageBridge:
    def __init__(self):
        self.supported_content_types = {
            "text/plain": "text",
            "text/markdown": "text",
            "image/jpeg": "image",
            "image/png": "image",
            "image/gif": "image",
            "application/json": "text"
        }

    async def acp_to_mcp(self, acp_messages: List[ACPMessage]) -> List[MCPContent]:
        """Convert ACP messages to MCP content format"""
        mcp_content = []

        for message in acp_messages:
            for part in message.parts:
                content = await self._convert_message_part(part)
                if content:
                    mcp_content.append(content)

        return mcp_content

    async def mcp_to_acp(self, text_content: str) -> List[ACPMessage]:
        """Convert MCP text to ACP message format"""
        return [ACPMessage(parts=[
            ACPMessagePart(
                content=text_content,
                content_type="text/plain",
                content_encoding="plain"
            )
        ])]

    async def _convert_message_part(self, part: ACPMessagePart) -> Optional[MCPContent]:
        """Convert a single ACP message part to MCP content"""
        content_type = part.content_type

        if content_type.startswith("text/"):
            return MCPContent(
                type="text",
                text=part.content,
                mimeType=content_type
            )

        elif content_type.startswith("image/"):
            # Handle image content
            if part.content_url:
                # Download image from URL; returns None if the download fails
                image_data = await self._download_content(part.content_url)
                if image_data:
                    return MCPContent(
                        type="image",
                        data=base64.b64encode(image_data).decode(),
                        mimeType=content_type
                    )
            else:
                # Direct image data
                return MCPContent(
                    type="image",
                    data=part.content,
                    mimeType=content_type
                )

        elif content_type == "application/json":
            return MCPContent(
                type="text",
                text=part.content,
                mimeType=content_type
            )

        else:
            # Fallback to text representation
            return MCPContent(
                type="text",
                text=str(part.content),
                mimeType="text/plain"
            )

    async def _download_content(self, url: str) -> Optional[bytes]:
        """Download content from URL"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.read()
        except Exception as e:
            print(f"Error downloading content from {url}: {e}")
        return None


# Integration with FastMCP
def register_bridge_tools(mcp: FastMCP, bridge: MessageBridge):

    @mcp.tool()
    async def convert_acp_message(
        acp_message_json: str,
        ctx: Context
    ) -> str:
        """Convert ACP message format to MCP-compatible format"""
        try:
            message_data = json.loads(acp_message_json)
            acp_message = ACPMessage(**message_data)

            mcp_content = await bridge.acp_to_mcp([acp_message])

            await ctx.info(f"Converted ACP message with {len(acp_message.parts)} parts")
            return json.dumps([content.dict() for content in mcp_content], indent=2)

        except Exception as e:
            await ctx.error(f"Error converting message: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def analyze_message_content(
        acp_message_json: str,
        ctx: Context
    ) -> str:
        """Analyze the content types and structure of an ACP message"""
        try:
            message_data = json.loads(acp_message_json)
            acp_message = ACPMessage(**message_data)

            analysis = {
                "total_parts": len(acp_message.parts),
                "content_types": {},
                "has_urls": False,
                "encodings": set(),
                "total_size": 0
            }

            for part in acp_message.parts:
                content_type = part.content_type
                analysis["content_types"][content_type] = analysis["content_types"].get(content_type, 0) + 1
                analysis["encodings"].add(part.content_encoding)

                if part.content_url:
                    analysis["has_urls"] = True

                if part.content:
                    analysis["total_size"] += len(part.content)

            # Sets are not JSON-serializable, so convert before dumping
            analysis["encodings"] = list(analysis["encodings"])

            await ctx.info(f"Analyzed message with {analysis['total_parts']} parts")
            return json.dumps(analysis, indent=2)

        except Exception as e:
            await ctx.error(f"Error analyzing message: {e}")
            return f"Error: {e}"
```

### 3. Run Orchestrator

**Purpose**: Execute ACP agents in different modes through the MCP interface.

```python
# run_orchestrator.py
import asyncio
import aiohttp
import json
from typing import Dict, Any, List, Optional, AsyncGenerator
from enum import Enum
from pydantic import BaseModel
from fastmcp import FastMCP, Context
from message_bridge import MessageBridge, ACPMessage


class RunMode(str, Enum):
    SYNC = "sync"
    ASYNC = "async"
    STREAM = "stream"


class RunStatus(str, Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class ACPRun(BaseModel):
    run_id: str
    agent_name: str
    status: RunStatus
    output: List[Dict[str, Any]] = []
    error: Optional[str] = None
    session_id: Optional[str] = None


class RunOrchestrator:
    def __init__(self, acp_base_url: str = "http://localhost:8000"):
        self.acp_base_url = acp_base_url
        self.message_bridge = MessageBridge()
        self.active_runs: Dict[str, ACPRun] = {}

    async def execute_agent_sync(
        self,
        agent_name: str,
        input_text: str,
        session_id: Optional[str] = None
    ) -> ACPRun:
        """Execute an ACP agent synchronously"""

        # Convert input to ACP format
        acp_messages = await self.message_bridge.mcp_to_acp(input_text)

        payload = {
            "agent_name": agent_name,
            "input": [msg.dict() for msg in acp_messages],
            "mode": "sync"
        }

        if session_id:
            payload["session_id"] = session_id

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    f"{self.acp_base_url}/runs",
                    json=payload,
                    headers={"Content-Type": "application/json"}
                ) as response:

                    if response.status == 200:
                        result = await response.json()
                        run = ACPRun(**result)
                        self.active_runs[run.run_id] = run
                        return run
                    else:
                        error_msg = f"ACP request failed: {response.status}"
                        return ACPRun(
                            run_id="error",
                            agent_name=agent_name,
                            status=RunStatus.FAILED,
                            error=error_msg
                        )

            except Exception as e:
                return ACPRun(
                    run_id="error",
                    agent_name=agent_name,
                    status=RunStatus.FAILED,
                    error=str(e)
                )

    async def execute_agent_async(
        self,
        agent_name: str,
        input_text: str,
        session_id: Optional[str] = None
    ) -> str:
        """Start an asynchronous ACP agent execution"""

        acp_messages = await self.message_bridge.mcp_to_acp(input_text)

        payload = {
            "agent_name": agent_name,
            "input": [msg.dict() for msg in acp_messages],
            "mode": "async"
        }

        if session_id:
            payload["session_id"] = session_id

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    f"{self.acp_base_url}/runs",
                    json=payload,
                    headers={"Content-Type": "application/json"}
                ) as response:

                    if response.status == 200:
                        result = await response.json()
                        run_id = result.get("run_id")

                        # Store partial run info
                        run = ACPRun(
                            run_id=run_id,
                            agent_name=agent_name,
                            status=RunStatus.CREATED
                        )
                        self.active_runs[run_id] = run

                        return run_id
                    else:
                        raise Exception(f"Failed to start async run: {response.status}")

            except Exception as e:
                raise Exception(f"Error starting async run: {e}")

    async def get_run_status(self, run_id: str) -> ACPRun:
        """Get the status of an async run"""

        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(f"{self.acp_base_url}/runs/{run_id}") as response:
                    if response.status == 200:
                        result = await response.json()
                        run = ACPRun(**result)
                        self.active_runs[run_id] = run
                        return run
                    else:
                        raise Exception(f"Failed to get run status: {response.status}")

            except Exception as e:
                # Return cached run or error
                if run_id in self.active_runs:
                    return self.active_runs[run_id]
                else:
                    return ACPRun(
                        run_id=run_id,
                        agent_name="unknown",
                        status=RunStatus.FAILED,
                        error=str(e)
                    )

    async def execute_agent_stream(
        self,
        agent_name: str,
        input_text: str,
        session_id: Optional[str] = None
    ) -> AsyncGenerator[str, None]:
        """Execute an ACP agent with streaming"""

        acp_messages = await self.message_bridge.mcp_to_acp(input_text)

        payload = {
            "agent_name": agent_name,
            "input": [msg.dict() for msg in acp_messages],
            "mode": "stream"
        }

        if session_id:
            payload["session_id"] = session_id

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(
                    f"{self.acp_base_url}/runs",
                    json=payload,
                    headers={
                        "Content-Type": "application/json",
                        "Accept": "text/event-stream"
                    }
                ) as response:

                    if response.status == 200:
                        async for line in response.content:
                            line_str = line.decode('utf-8').strip()
                            if line_str.startswith('data: '):
                                data = line_str[6:]  # Remove 'data: ' prefix
                                if data and data != '[DONE]':
                                    yield data
                    else:
                        yield f"Error: Stream failed with status {response.status}"

            except Exception as e:
                yield f"Error: {e}"


# Integration with FastMCP
def register_orchestrator_tools(mcp: FastMCP, orchestrator: RunOrchestrator):

    @mcp.tool()
    async def run_acp_agent(
        agent_name: str,
        input_text: str,
        mode: str = "sync",
        session_id: Optional[str] = None,
        ctx: Context = None
    ) -> str:
        """Execute an ACP agent with specified mode"""

        try:
            if mode == "sync":
                await ctx.info(f"Running agent '{agent_name}' synchronously")
                run = await orchestrator.execute_agent_sync(agent_name, input_text, session_id)

                if run.status == RunStatus.COMPLETED:
                    # Convert output back to readable format
                    if run.output:
                        # run.output is a list of ACP messages, each carrying its own parts
                        mcp_content = await orchestrator.message_bridge.acp_to_mcp(
                            [ACPMessage(**message) for message in run.output]
                        )
                        result = "\n".join([content.text for content in mcp_content if content.text])
                        await ctx.info("Agent completed successfully")
                        return result
                    else:
                        return "Agent completed with no output"
                else:
                    await ctx.error(f"Agent failed: {run.error}")
                    return f"Error: {run.error}"

            elif mode == "async":
                await ctx.info(f"Starting agent '{agent_name}' asynchronously")
                run_id = await orchestrator.execute_agent_async(agent_name, input_text, session_id)
                return f"Started async run with ID: {run_id}"

            else:
                await ctx.error(f"Unsupported mode: {mode}")
                return f"Unsupported mode: {mode}. Use 'sync' or 'async'"

        except Exception as e:
            await ctx.error(f"Error running agent: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def get_async_run_result(run_id: str, ctx: Context) -> str:
        """Get the result of an asynchronous run"""

        try:
            await ctx.info(f"Checking status of run: {run_id}")
            run = await orchestrator.get_run_status(run_id)

            result = {
                "run_id": run.run_id,
                "agent_name": run.agent_name,
                "status": run.status,
                "has_output": len(run.output) > 0,
                "error": run.error
            }

            if run.status == RunStatus.COMPLETED and run.output:
                # Convert output to readable format
                mcp_content = await orchestrator.message_bridge.acp_to_mcp(
                    [ACPMessage(**message) for message in run.output]
                )
                result["output"] = "\n".join([content.text for content in mcp_content if content.text])

            return json.dumps(result, indent=2)

        except Exception as e:
            await ctx.error(f"Error getting run result: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def list_active_runs(ctx: Context) -> str:
        """List all active runs"""

        runs_info = []
        for run_id, run in orchestrator.active_runs.items():
            runs_info.append({
                "run_id": run_id,
                "agent_name": run.agent_name,
                "status": run.status,
                "has_error": run.error is not None
            })

        await ctx.info(f"Found {len(runs_info)} active runs")
        return json.dumps(runs_info, indent=2)
```

### 4. Agent Router

**Purpose**: Route each request to the ACP agent whose keyword rules best match it.
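
The matching step is a keyword score: every keyword found in the input adds the rule's priority, rules for unavailable agents are skipped, and the highest-scoring rule wins. A minimal sketch of that scoring in isolation, assuming rules are plain `(keywords, agent_name, priority)` tuples and a hypothetical helper named `pick_agent` (neither is part of the implementation that follows):

```python
from typing import List, Optional, Set, Tuple

# (keywords, agent_name, priority); a simplified stand-in for a routing rule
Rule = Tuple[List[str], str, int]


def pick_agent(
    text: str,
    rules: List[Rule],
    available: Set[str],
    fallback: str = "echo",
) -> Optional[str]:
    """Return the best-matching available agent, the fallback, or None."""
    text_lower = text.lower()
    best_agent: Optional[str] = None
    best_score = 0

    for keywords, agent_name, priority in rules:
        if agent_name not in available:
            continue  # skip rules whose agent is not currently available
        # Each matching keyword contributes the rule's priority to the score
        score = sum(priority for kw in keywords if kw.lower() in text_lower)
        if score > best_score:
            best_agent, best_score = agent_name, score

    if best_agent is not None:
        return best_agent
    return fallback if fallback in available else None


if __name__ == "__main__":
    demo_rules: List[Rule] = [
        (["translate", "spanish"], "translation", 10),
        (["weather", "forecast"], "weather", 10),
        (["echo", "test"], "echo", 5),
    ]
    print(pick_agent("Translate this to Spanish", demo_rules, {"translation", "echo"}))
```

Because matching is by substring, a short keyword such as `sum` also matches words like "summary"; word-boundary matching would be stricter at the cost of a little more code.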
```python
# agent_router.py
import json
from typing import Dict, List, Optional, Any, Tuple
from pydantic import BaseModel
from fastmcp import FastMCP, Context
from agent_discovery import AgentDiscoveryTool, ACPAgent
from message_bridge import ACPMessage
from run_orchestrator import RunOrchestrator


class RoutingRule(BaseModel):
    keywords: List[str]
    agent_name: str
    priority: int = 1
    description: str = ""


class RouterStrategy(BaseModel):
    name: str
    description: str
    rules: List[RoutingRule]


class AgentRouter:
    def __init__(
        self,
        discovery: AgentDiscoveryTool,
        orchestrator: RunOrchestrator
    ):
        self.discovery = discovery
        self.orchestrator = orchestrator
        self.strategies: Dict[str, RouterStrategy] = {}
        self.default_strategy = self._create_default_strategy()

    def _create_default_strategy(self) -> RouterStrategy:
        """Create a default routing strategy based on common patterns"""
        return RouterStrategy(
            name="default",
            description="Default routing based on common keywords",
            rules=[
                RoutingRule(
                    keywords=["translate", "translation", "spanish", "french", "language"],
                    agent_name="translation",
                    priority=10,
                    description="Route translation requests"
                ),
                RoutingRule(
                    keywords=["weather", "temperature", "forecast", "climate"],
                    agent_name="weather",
                    priority=10,
                    description="Route weather requests"
                ),
                RoutingRule(
                    keywords=["calculate", "math", "compute", "sum", "multiply"],
                    agent_name="calculator",
                    priority=10,
                    description="Route calculation requests"
                ),
                RoutingRule(
                    keywords=["echo", "repeat", "test"],
                    agent_name="echo",
                    priority=5,
                    description="Route test requests to echo"
                )
            ]
        )

    def add_strategy(self, strategy: RouterStrategy):
        """Add a custom routing strategy"""
        self.strategies[strategy.name] = strategy

    async def route_request(
        self,
        input_text: str,
        strategy_name: str = "default",
        fallback_agent: str = "echo"
    ) -> Tuple[str, str]:
        """Route a request to the most appropriate agent.

        Returns a (target_agent, routing_reason) tuple.
        """

        # Get available agents
        available_agents = await self.discovery.discover_agents()
        available_agent_names = {agent.name for agent in available_agents}

        # Select strategy
        strategy = self.strategies.get(strategy_name, self.default_strategy)

        # Find best matching rule
        best_rule = None
        best_score = 0

        input_lower = input_text.lower()

        for rule in strategy.rules:
            # Check if agent is available
            if rule.agent_name not in available_agent_names:
                continue

            # Calculate match score: each matching keyword adds the rule's priority
            score = 0
            for keyword in rule.keywords:
                if keyword.lower() in input_lower:
                    score += rule.priority

            if score > best_score:
                best_score = score
                best_rule = rule

        # Determine target agent
        if best_rule and best_score > 0:
            target_agent = best_rule.agent_name
            routing_reason = f"Matched rule: {best_rule.description} (score: {best_score})"
        else:
            # Fall back to the specified fallback agent, then to the first available agent
            if fallback_agent in available_agent_names:
                target_agent = fallback_agent
                routing_reason = f"Used fallback agent: {fallback_agent}"
            elif available_agents:
                target_agent = available_agents[0].name
                routing_reason = f"Used first available agent: {target_agent}"
            else:
                raise Exception("No agents available for routing")

        return target_agent, routing_reason

    async def execute_routed_request(
        self,
        input_text: str,
        strategy_name: str = "default",
        mode: str = "sync",
        session_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Route and execute a request"""

        try:
            # Route the request
            target_agent, routing_reason = await self.route_request(input_text, strategy_name)

            # Execute the agent
            if mode == "sync":
                run = await self.orchestrator.execute_agent_sync(
                    target_agent, input_text, session_id
                )

                result = {
                    "routed_to": target_agent,
                    "routing_reason": routing_reason,
                    "execution_mode": mode,
                    "status": run.status,
                    "run_id": run.run_id
                }

                if run.output:
                    # Convert output for display; run.output is a list of ACP messages
                    mcp_content = await self.orchestrator.message_bridge.acp_to_mcp(
                        [ACPMessage(**message) for message in run.output]
                    )
                    result["output"] = "\n".join([content.text for content in mcp_content if content.text])

                if run.error:
                    result["error"] = run.error

                return result

            else:
                # Async mode
                run_id = await self.orchestrator.execute_agent_async(
                    target_agent, input_text, session_id
                )

                return {
                    "routed_to": target_agent,
                    "routing_reason": routing_reason,
                    "execution_mode": mode,
                    "run_id": run_id,
                    "status": "async_started"
                }

        except Exception as e:
            return {
                "error": str(e),
                "routed_to": None,
                "routing_reason": "Routing failed"
            }


# Integration with FastMCP
def register_router_tools(mcp: FastMCP, router: AgentRouter):

    @mcp.tool()
    async def smart_route_request(
        input_text: str,
        strategy: str = "default",
        mode: str = "sync",
        session_id: Optional[str] = None,
        ctx: Context = None
    ) -> str:
        """Intelligently route a request to the best ACP agent"""

        try:
            await ctx.info(f"Routing request using strategy: {strategy}")

            result = await router.execute_routed_request(
                input_text=input_text,
                strategy_name=strategy,
                mode=mode,
                session_id=session_id
            )

            if "error" not in result:
                await ctx.info(f"Routed to agent: {result['routed_to']}")
                await ctx.debug(f"Routing reason: {result['routing_reason']}")
            else:
                await ctx.error(f"Routing failed: {result['error']}")

            return json.dumps(result, indent=2)

        except Exception as e:
            await ctx.error(f"Error in smart routing: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def add_routing_rule(
        strategy_name: str,
        keywords: str,  # Comma-separated
        agent_name: str,
        priority: int = 5,
        description: str = "",
        ctx: Context = None
    ) -> str:
        """Add a new routing rule to a strategy"""

        try:
            keyword_list = [k.strip() for k in keywords.split(",")]

            new_rule = RoutingRule(
                keywords=keyword_list,
                agent_name=agent_name,
                priority=priority,
                description=description or f"Route to {agent_name}"
            )

            # Get or create strategy
            if strategy_name in router.strategies:
                strategy = router.strategies[strategy_name]
                strategy.rules.append(new_rule)
            else:
                strategy = RouterStrategy(
                    name=strategy_name,
                    description=f"Custom strategy: {strategy_name}",
                    rules=[new_rule]
                )
                router.strategies[strategy_name] = strategy

            await ctx.info(f"Added routing rule to strategy '{strategy_name}'")
            return f"Successfully added rule: {keyword_list} -> {agent_name}"

        except Exception as e:
            await ctx.error(f"Error adding routing rule: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def list_routing_strategies(ctx: Context) -> str:
        """List all available routing strategies and their rules"""

        try:
            strategies_info = {}

            # Include default strategy
            strategies_info["default"] = {
                "description": router.default_strategy.description,
                "rules": [
                    {
                        "keywords": rule.keywords,
                        "agent": rule.agent_name,
                        "priority": rule.priority,
                        "description": rule.description
                    }
                    for rule in router.default_strategy.rules
                ]
            }

            # Include custom strategies
            for name, strategy in router.strategies.items():
                strategies_info[name] = {
                    "description": strategy.description,
                    "rules": [
                        {
                            "keywords": rule.keywords,
                            "agent": rule.agent_name,
                            "priority": rule.priority,
                            "description": rule.description
                        }
                        for rule in strategy.rules
                    ]
                }

            await ctx.info(f"Found {len(strategies_info)} routing strategies")
            return json.dumps(strategies_info, indent=2)

        except Exception as e:
            await ctx.error(f"Error listing strategies: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def test_routing(
        input_text: str,
        strategy: str = "default",
        ctx: Context = None
    ) -> str:
        """Test routing without executing - shows which agent would be selected"""

        try:
            target_agent, routing_reason = await router.route_request(input_text, strategy)

            result = {
                "input": input_text,
                "strategy": strategy,
                "target_agent": target_agent,
                "routing_reason": routing_reason,
                "would_execute": True
            }

            await ctx.info(f"Test routing: '{input_text}' -> {target_agent}")
            return json.dumps(result, indent=2)

        except Exception as e:
            await ctx.error(f"Error testing routing: {e}")
            return f"Error: {e}"
```

### 5. Interactive Agent Manager

**Purpose**: Handle ACP's await/resume patterns for interactive agents.
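
The bookkeeping behind await/resume reduces to a table of pending runs, each with a deadline. A minimal in-memory sketch of that idea, assuming a hypothetical `PendingTable` class and an injectable clock so that timeouts can be exercised without sleeping (neither appears in the implementation that follows):

```python
import time
from typing import Callable, Dict, Tuple


class PendingTable:
    """Track runs that are waiting for user input (illustrative only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        # run_id -> (prompt shown to the user, absolute deadline)
        self._pending: Dict[str, Tuple[str, float]] = {}

    def wait(self, run_id: str, prompt: str, timeout_seconds: float = 300) -> None:
        """Record that a run is awaiting input until the deadline passes."""
        self._pending[run_id] = (prompt, self._clock() + timeout_seconds)

    def resume(self, run_id: str) -> str:
        """Return 'resumed', 'timeout', or 'unknown', and forget the run."""
        entry = self._pending.pop(run_id, None)
        if entry is None:
            return "unknown"
        _, deadline = entry
        return "resumed" if self._clock() <= deadline else "timeout"


if __name__ == "__main__":
    table = PendingTable()
    table.wait("run-1", "What is your name?")
    print(table.resume("run-1"))
```

Using a monotonic clock rather than wall-clock time keeps the timeout check unaffected by system clock adjustments.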
```python
# interactive_manager.py
import json
import asyncio
from typing import Dict, Optional, Any
from pydantic import BaseModel
from fastmcp import FastMCP, Context
from message_bridge import ACPMessage
from run_orchestrator import RunOrchestrator
from enum import Enum


class InteractionState(str, Enum):
    WAITING = "waiting"
    RESUMED = "resumed"
    COMPLETED = "completed"
    TIMEOUT = "timeout"
    ERROR = "error"


class PendingInteraction(BaseModel):
    run_id: str
    agent_name: str
    session_id: Optional[str]
    await_message: str
    timestamp: float
    timeout_seconds: int = 300  # 5 minutes default


class InteractiveManager:
    def __init__(self, orchestrator: RunOrchestrator):
        self.orchestrator = orchestrator
        self.pending_interactions: Dict[str, PendingInteraction] = {}
        self.interaction_results: Dict[str, Any] = {}

    async def start_interactive_agent(
        self,
        agent_name: str,
        initial_input: str,
        session_id: Optional[str] = None,
        timeout_seconds: int = 300
    ) -> Dict[str, Any]:
        """Start an interactive agent that may require user input"""

        try:
            # Start the agent execution
            run = await self.orchestrator.execute_agent_sync(
                agent_name=agent_name,
                input_text=initial_input,
                session_id=session_id
            )

            # Check if agent is waiting for input.
            # Note: ACPRun in run_orchestrator.py does not declare an await_request
            # field, so this branch is only taken once that model is extended.
            if hasattr(run, 'await_request') and run.await_request:
                # Agent is waiting for input
                pending = PendingInteraction(
                    run_id=run.run_id,
                    agent_name=agent_name,
                    session_id=session_id,
                    await_message=run.await_request.get('message', 'Agent is waiting for input'),
                    timestamp=asyncio.get_event_loop().time(),
                    timeout_seconds=timeout_seconds
                )

                self.pending_interactions[run.run_id] = pending

                return {
                    "status": "awaiting_input",
                    "run_id": run.run_id,
                    "message": pending.await_message,
                    "timeout_seconds": timeout_seconds
                }

            else:
                # Agent completed normally
                output = ""
                if run.output:
                    # run.output is a list of ACP messages, each carrying its own parts
                    mcp_content = await self.orchestrator.message_bridge.acp_to_mcp(
                        [ACPMessage(**message) for message in run.output]
                    )
                    output = "\n".join([content.text for content in mcp_content if content.text])

                return {
                    "status": "completed",
                    "run_id": run.run_id,
                    "output": output,
                    "error": run.error
                }

        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }

    async def resume_interactive_agent(
        self,
        run_id: str,
        user_input: str
    ) -> Dict[str, Any]:
        """Resume an agent that was waiting for user input"""

        if run_id not in self.pending_interactions:
            return {
                "status": "error",
                "error": f"No pending interaction found for run_id: {run_id}"
            }

        pending = self.pending_interactions[run_id]

        # Check timeout
        current_time = asyncio.get_event_loop().time()
        if current_time - pending.timestamp > pending.timeout_seconds:
            del self.pending_interactions[run_id]
            return {
                "status": "timeout",
                "error": "Interaction timed out"
            }

        try:
            # Resume the agent with user input
            # Note: This is a simplified version - actual ACP resume implementation may vary.
            # The payload below is shown for reference only and is not sent anywhere yet.
            resume_payload = {
                "run_id": run_id,
                "resume_input": user_input
            }

            # For now, we'll simulate resume by starting a new run in the same session
            # In a real implementation, this would use ACP's resume endpoint
            run = await self.orchestrator.execute_agent_sync(
                agent_name=pending.agent_name,
                input_text=user_input,
                session_id=pending.session_id
            )

            # Clean up pending interaction
            del self.pending_interactions[run_id]

            # Check if agent needs more input
            if hasattr(run, 'await_request') and run.await_request:
                # Agent needs more input
                new_pending = PendingInteraction(
                    run_id=run.run_id,
                    agent_name=pending.agent_name,
                    session_id=pending.session_id,
                    await_message=run.await_request.get('message', 'Agent is waiting for more input'),
                    timestamp=current_time,
                    timeout_seconds=pending.timeout_seconds
                )

                self.pending_interactions[run.run_id] = new_pending

                return {
                    "status": "awaiting_more_input",
                    "run_id": run.run_id,
                    "message": new_pending.await_message
                }

            else:
                # Agent completed
                output = ""
                if run.output:
                    mcp_content = await self.orchestrator.message_bridge.acp_to_mcp(
                        [ACPMessage(**message) for message in run.output]
                    )
                    output = "\n".join([content.text for content in mcp_content if content.text])

                # Store final result
                self.interaction_results[run_id] = {
                    "output": output,
                    "error": run.error,
                    "completed_at": current_time
                }

                return {
                    "status": "completed",
                    "run_id": run.run_id,
                    "output": output,
                    "error": run.error
                }

        except Exception as e:
            # Clean up on error
            if run_id in self.pending_interactions:
                del self.pending_interactions[run_id]

            return {
                "status": "error",
                "error": str(e)
            }

    async def get_pending_interactions(self) -> Dict[str, Any]:
        """Get all pending interactions"""

        current_time = asyncio.get_event_loop().time()
        active_interactions = {}
        expired_interactions = []

        for run_id, pending in self.pending_interactions.items():
            if current_time - pending.timestamp > pending.timeout_seconds:
                expired_interactions.append(run_id)
            else:
                active_interactions[run_id] = {
                    "agent_name": pending.agent_name,
                    "message": pending.await_message,
                    "waiting_seconds": int(current_time - pending.timestamp),
                    "timeout_in_seconds": int(pending.timeout_seconds - (current_time - pending.timestamp))
                }

        # Clean up expired interactions
        for run_id in expired_interactions:
            del self.pending_interactions[run_id]

        return {
            "active_count": len(active_interactions),
            "expired_count": len(expired_interactions),
            "interactions": active_interactions
        }

    async def cancel_interaction(self, run_id: str) -> bool:
        """Cancel a pending interaction"""
        if run_id in self.pending_interactions:
            del self.pending_interactions[run_id]
            return True
        return False


# Integration with FastMCP
def register_interactive_tools(mcp: FastMCP, manager: InteractiveManager):

    @mcp.tool()
    async def start_interactive_agent(
        agent_name: str,
        initial_input: str,
        session_id: Optional[str] = None,
        timeout_minutes: int = 5,
        ctx: Context = None
    ) -> str:
        """Start an interactive ACP agent that may require user input"""

        try:
            await ctx.info(f"Starting interactive agent: {agent_name}")

            result = await manager.start_interactive_agent(
                agent_name=agent_name,
                initial_input=initial_input,
                session_id=session_id,
                timeout_seconds=timeout_minutes * 60
            )

            if result["status"] == "awaiting_input":
                await ctx.info(f"Agent is waiting for input: {result['message']}")
            elif result["status"] == "completed":
                await ctx.info("Agent completed without requiring input")
            else:
                await ctx.error(f"Agent failed: {result.get('error', 'Unknown error')}")

            return json.dumps(result, indent=2)

        except Exception as e:
            await ctx.error(f"Error starting interactive agent: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def provide_user_input(
        run_id: str,
        user_input: str,
        ctx: Context = None
    ) -> str:
        """Provide user input to resume a waiting interactive agent"""

        try:
            await ctx.info(f"Providing input to run: {run_id}")

            result = await manager.resume_interactive_agent(run_id, user_input)

            if result["status"] == "completed":
                await ctx.info("Interactive agent completed successfully")
            elif result["status"] == "awaiting_more_input":
                await ctx.info("Agent is waiting for additional input")
            else:
                await ctx.error(f"Resume failed: {result.get('error', 'Unknown error')}")

            return json.dumps(result, indent=2)

        except Exception as e:
            await ctx.error(f"Error providing user input: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def list_pending_interactions(ctx: Context) -> str:
        """List all pending interactive agents waiting for input"""

        try:
            interactions = await manager.get_pending_interactions()

            await ctx.info(f"Found {interactions['active_count']} pending interactions")

            if interactions['expired_count'] > 0:
                await ctx.warning(f"Cleaned up {interactions['expired_count']} expired interactions")

            return json.dumps(interactions, indent=2)

        except Exception as e:
            await ctx.error(f"Error listing interactions: {e}")
            return f"Error: {e}"

    @mcp.tool()
    async def cancel_interaction(
        run_id: str,
        ctx: Context = None
    ) -> str:
        """Cancel a pending interactive agent"""

        try:
            success = await manager.cancel_interaction(run_id)

            if success:
                await ctx.info(f"Cancelled interaction: {run_id}")
                return f"Successfully cancelled interaction: {run_id}"
            else:
                await ctx.warning(f"No pending interaction found: {run_id}")
                return f"No pending interaction found with ID: {run_id}"

        except Exception as e:
            await ctx.error(f"Error cancelling interaction: {e}")
            return f"Error: {e}"
```

## Main Server Implementation

```python
# acp_mcp_server.py
import asyncio
from fastmcp import FastMCP

# Import all components
from agent_discovery import AgentDiscoveryTool, register_discovery_tools
from message_bridge import MessageBridge, register_bridge_tools
from run_orchestrator import RunOrchestrator, register_orchestrator_tools
from agent_router import AgentRouter, register_router_tools
from interactive_manager import InteractiveManager, register_interactive_tools


class ACPMCPServer:
    def __init__(self, acp_base_url: str = "http://localhost:8000"):
        # Initialize FastMCP server
        self.mcp = FastMCP("ACP-MCP Bridge Server")

        # Initialize all components
        self.discovery = AgentDiscoveryTool(acp_base_url)
        self.message_bridge = MessageBridge()
        self.orchestrator = RunOrchestrator(acp_base_url)
        self.router = AgentRouter(self.discovery, self.orchestrator)
        self.interactive_manager = InteractiveManager(self.orchestrator)

        # Register all tools
        self._register_all_tools()

    def _register_all_tools(self):
        """Register all component tools with FastMCP"""

        # Core server info tool
        @self.mcp.tool()
        async def get_server_info() -> str:
            """Get information about the ACP-MCP bridge server"""
            info = {
                "name": "ACP-MCP Bridge Server",
                "description": "Bridge between Agent Communication Protocol and Model Context Protocol",
                "components": [
                    "Agent Discovery Tool",
                    "Multi-Modal Message Bridge",
                    "Run Orchestrator",
                    "Agent Router",
                    "Interactive Manager"
                ],
                "acp_endpoint": self.discovery.acp_base_url,
                "capabilities": [
                    "Agent discovery and registration",
                    "Multi-modal message conversion",
                    "Sync/async/streaming execution",
                    "Intelligent agent routing",
                    "Interactive agent sessions"
                ]
            }
            return str(info)

        # Register component tools
        register_discovery_tools(self.mcp, self.discovery)
        register_bridge_tools(self.mcp, self.message_bridge)
        register_orchestrator_tools(self.mcp, self.orchestrator)
        register_router_tools(self.mcp, self.router)
        register_interactive_tools(self.mcp, self.interactive_manager)

    def run(self, port: int = 3000):
        """Start the ACP-MCP bridge server"""
        print(f"Starting ACP-MCP Bridge Server on port {port}")
        print(f"Connecting to ACP server at: {self.discovery.acp_base_url}")
        print("\nAvailable tools:")
        print("- Agent Discovery & Registration")
        print("- Multi-Modal Message Bridge")
        print("- Run Orchestrator (sync/async/stream)")
        print("- Intelligent Agent Router")
        print("- Interactive Agent Manager")
        print(f"\nServer running at http://localhost:{port}")

        # A port only applies to an HTTP transport; the default stdio
        # transport does not accept one (assumes FastMCP 2.x)
        self.mcp.run(transport="http", port=port)

# Example usage and deployment
if __name__ == "__main__":
    import os

    # Read settings from the environment so the ACP_BASE_URL and PORT values
    # set in the deployment configs take effect; defaults suit local use
    server = ACPMCPServer(
        acp_base_url=os.environ.get("ACP_BASE_URL", "http://localhost:8000")
    )
    server.run(port=int(os.environ.get("PORT", "3000")))
```

## Deployment

### Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# curl is needed by the health check and is not part of the slim image
RUN apt-get update \
    && apt-get install -y --no-install-recommends curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .
RUN pip install -r requirements.txt

# Copy source code
COPY *.py ./

# Expose port
EXPOSE 3000

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:3000/health || exit 1

# Start server
CMD ["python", "acp_mcp_server.py"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  acp-server:
    build: ./acp-server
    ports:
      - "8000:8000"
    environment:
      - PORT=8000
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/agents"]
      interval: 30s
      timeout: 10s
      retries: 3

  acp-mcp-bridge:
    build: .
    ports:
      - "3000:3000"
    environment:
      - ACP_BASE_URL=http://acp-server:8000
      - PORT=3000
    depends_on:
      acp-server:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
```

### Claude Desktop Integration

Add the following to `claude_desktop_config.json` (JSON does not allow comments, so the file name is given here rather than inside the file):

```json
{
  "mcpServers": {
    "acp-bridge": {
      "command": "python",
      "args": ["/path/to/acp_mcp_server.py"],
      "env": {
        "ACP_BASE_URL": "http://localhost:8000"
      }
    }
  }
}
```

## Usage Examples

### Basic Agent Discovery

```bash
# Using curl to test MCP tools
curl -X POST http://localhost:3000/tools/call \
  -H "Content-Type: application/json" \
  -d '{
    "name": "discover_acp_agents",
    "arguments": {}
  }'
```

### Smart Routing Example

```bash
# Route a translation request
curl -X POST http://localhost:3000/tools/call \
  -H "Content-Type: application/json" \
  -d '{
    "name": "smart_route_request",
    "arguments": {
      "input_text": "Translate hello world to Spanish",
      "strategy": "default",
      "mode": "sync"
    }
  }'
```

### Interactive Agent Example

```bash
# Start interactive agent
curl -X POST http://localhost:3000/tools/call \
  -H "Content-Type: application/json" \
  -d '{
    "name": "start_interactive_agent",
    "arguments": {
      "agent_name": "config_bot",
      "initial_input": "Help me configure my system"
    }
  }'

# Provide user input (use the run_id returned by the previous call)
curl -X POST http://localhost:3000/tools/call \
  -H "Content-Type: application/json" \
  -d '{
    "name": "provide_user_input",
    "arguments": {
      "run_id": "12345678-abcd-1234-abcd-123456789abc",
      "user_input": "I want to set up authentication"
    }
  }'
```

## Advanced Features

### Custom Routing Strategies

```python
# Add custom routing for domain-specific agents
custom_strategy = RouterStrategy(
    name="enterprise",
    description="Enterprise agent routing",
    rules=[
        RoutingRule(
            keywords=["security", "auth", "login", "permissions"],
            agent_name="security_agent",
            priority=15,
            description="Route security requests"
        ),
        RoutingRule(
            keywords=["report", "analytics", "dashboard", "metrics"],
            agent_name="analytics_agent",
            priority=15,
            description="Route analytics requests"
        )
    ]
)

router.add_strategy(custom_strategy)
```

### Message Processing Pipeline

```python
# Custom message processors for specific content types
class ImageProcessor:
    async def process_image(self, image_url: str) -> str:
        # Download and analyze image
        # Return description or analysis
        pass

class JSONProcessor:
    async def process_json(self, json_data: str) -> str:
        # Parse and validate JSON
        # Return formatted or processed data
        pass
```

### Health Monitoring

```python
# Add a health monitoring tool
@mcp.tool()
async def health_check(ctx: Context) -> str:
    """Check health of all components"""

    health_status = {
        "acp_connection": False,
        "discovered_agents": 0,
        "active_runs": 0,
        "pending_interactions": 0
    }

    try:
        # Test ACP connection
        agents = await discovery.discover_agents()
        health_status["acp_connection"] = True
        health_status["discovered_agents"] = len(agents)

        # Check active runs
        health_status["active_runs"] = len(orchestrator.active_runs)

        # Check pending interactions
        interactions = await interactive_manager.get_pending_interactions()
        health_status["pending_interactions"] = interactions["active_count"]

        overall_health = "healthy" if health_status["acp_connection"] else "unhealthy"
        health_status["overall"] = overall_health

        return json.dumps(health_status, indent=2)

    except Exception as e:
        health_status["error"] = str(e)
        health_status["overall"] = "error"
        return json.dumps(health_status, indent=2)
```

## Troubleshooting

### Common Issues

**1. ACP Connection Failed**

```bash
# Check if ACP server is running
curl http://localhost:8000/agents

# Check Docker logs
docker logs acp-server

# Verify network connectivity
ping localhost
```

**2. Agent Not Found**

```bash
# Refresh agent discovery
curl -X POST http://localhost:3000/tools/call \
  -H "Content-Type: application/json" \
  -d '{"name": "discover_acp_agents", "arguments": {}}'
```

**3. Message Conversion Errors**

```python
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)

# Test message conversion manually (run inside an async function)
test_message = ACPMessage(parts=[
    ACPMessagePart(content="test", content_type="text/plain")
])
result = await bridge.acp_to_mcp([test_message])
```

**4. Interactive Agent Timeouts**

```bash
# Check pending interactions
curl -X POST http://localhost:3000/tools/call \
  -H "Content-Type: application/json" \
  -d '{"name": "list_pending_interactions", "arguments": {}}'

# Cancel timed out interactions
curl -X POST http://localhost:3000/tools/call \
  -H "Content-Type: application/json" \
  -d '{"name": "cancel_interaction", "arguments": {"run_id": "timeout-run-id"}}'
```

### Performance Optimization

**1. Connection Pooling**

```python
# Use connection pooling for ACP requests
import aiohttp

connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
session = aiohttp.ClientSession(connector=connector)
```

**2. Caching**

```python
# Cache agent discovery results for five minutes.
# functools.lru_cache is avoided here: on an async function it caches the
# coroutine object, which cannot be awaited a second time.
import time

CACHE_TTL_SECONDS = 300
_agents_cache = {"agents": None, "expires_at": 0.0}

async def cached_discover_agents():
    now = time.monotonic()
    if _agents_cache["agents"] is None or now >= _agents_cache["expires_at"]:
        _agents_cache["agents"] = await discovery.discover_agents()
        _agents_cache["expires_at"] = now + CACHE_TTL_SECONDS
    return _agents_cache["agents"]

agents = await cached_discover_agents()
```

**3. Async Optimization**

```python
# Use asyncio.gather for parallel operations
import asyncio

agents_task = discovery.discover_agents()
health_task = check_acp_health()

agents, health = await asyncio.gather(agents_task, health_task)
```

This guide covers the components needed to build an ACP-MCP server that bridges the two protocols. Because the design is modular, individual components can be extended or replaced to suit specific use cases and requirements.
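
The parallel pattern from the Async Optimization tip can be tried in isolation. The sketch below is only an illustration: `fake_discover_agents` and `fake_health_check` are hypothetical stand-ins for the real discovery and health calls, and `gather_status` is a name chosen for this example. It passes `return_exceptions=True` so that a failure in one call does not discard the result of the other.

```python
import asyncio


async def fake_discover_agents() -> list[str]:
    """Hypothetical stand-in for discovery.discover_agents()."""
    await asyncio.sleep(0.01)
    return ["echo", "translator"]


async def fake_health_check() -> bool:
    """Hypothetical stand-in for a health probe; it always fails here."""
    await asyncio.sleep(0.01)
    raise ConnectionError("ACP server unreachable")


async def gather_status() -> dict:
    # return_exceptions=True hands exceptions back as ordinary results
    # instead of raising the first one, so the successful value is kept.
    agents, health = await asyncio.gather(
        fake_discover_agents(),
        fake_health_check(),
        return_exceptions=True,
    )
    results = (agents, health)
    return {
        "agents": [] if isinstance(agents, Exception) else agents,
        "healthy": False if isinstance(health, Exception) else health,
        "errors": [str(r) for r in results if isinstance(r, Exception)],
    }


status = asyncio.run(gather_status())
print(status)
```

Whether a partial result is acceptable depends on the caller; a health tool can usually report what it has, while a routing decision may prefer to fail outright.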
