send_message
Send a message to an A2A agent, specifying the agent URL and message content. Optionally include a session ID for multi-turn conversations. Receive the agent's response with a task ID for tracking.
Instructions
Send a message to an A2A agent.
Args: agent_url: URL of the A2A agent message: Message to send session_id: Optional session ID for multi-turn conversations
Returns: Agent's response with task_id for future reference
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| agent_url | Yes | ||
| message | Yes | ||
| session_id | No |
Implementation Reference
- a2a_mcp_server.py:416-544 (handler)The primary handler for the 'send_message' MCP tool. It registers the tool via @mcp.tool() decorator, validates the agent_url, creates an A2AClient, sends the message payload to the agent, processes the result extracting state, message, and artifacts, and returns a structured response.@mcp.tool() async def send_message( agent_url: str, message: str, session_id: Optional[str] = None, ctx: Context = None, ) -> Dict[str, Any]: """ Send a message to an A2A agent. Args: agent_url: URL of the A2A agent message: Message to send session_id: Optional session ID for multi-turn conversations Returns: Agent's response with task_id for future reference """ if agent_url not in registered_agents: return { "status": "error", "message": f"Agent not registered: {agent_url}", } # Create a client for the agent client = A2AClient(url=agent_url) try: # Generate a task ID task_id = str(uuid.uuid4()) # Store the mapping of task_id to agent_url for later reference task_agent_mapping[task_id] = agent_url # Create the message a2a_message = Message( role="user", parts=[TextPart(text=message)], ) if ctx: await ctx.info(f"Sending message to agent: {message}") # Create payload as a single dictionary payload = { "id": task_id, "message": a2a_message, } if session_id: payload["sessionId"] = session_id # Send the task with the payload result = await client.send_task(payload) # Save task mapping to disk save_to_json(task_agent_mapping, TASK_AGENT_MAPPING_FILE) # Debug: Print the raw response for analysis if ctx: await ctx.info(f"Raw response: {result}") # Create a response dictionary with as much info as we can extract response = { "status": "success", "task_id": task_id, } # Add any available fields from the result if hasattr(result, "sessionId"): response["session_id"] = result.sessionId else: response["session_id"] = None # Try to get the state try: if hasattr(result, "status") and hasattr(result.status, "state"): response["state"] = result.status.state else: response["state"] = "unknown" except Exception as e: response["state"] = f"error_getting_state: {str(e)}" # Try to extract response message try: if hasattr(result, "status") and hasattr(result.status, "message") and result.status.message: response_text = "" for part in result.status.message.parts: if part.type == "text": response_text += part.text if response_text: response["message"] = response_text except Exception as e: response["message_error"] = f"Error extracting message: {str(e)}" # Try to get artifacts try: if hasattr(result, "artifacts") and result.artifacts: artifacts_data = [] for artifact in result.artifacts: artifact_data = { "name": artifact.name if hasattr(artifact, "name") else "unnamed_artifact", "contents": [], } for part in artifact.parts: if part.type == "text": artifact_data["contents"].append({ "type": "text", "text": part.text, }) elif part.type == "data": artifact_data["contents"].append({ "type": "data", "data": part.data, }) artifacts_data.append(artifact_data) response["artifacts"] = artifacts_data except Exception as e: response["artifacts_error"] = f"Error extracting artifacts: {str(e)}" return response except Exception as e: return { "status": "error", "message": f"Error sending message: {str(e)}", }
- a2a_mcp_server.py:416-416 (registration)The @mcp.tool() decorator registers the send_message function as an MCP tool.@mcp.tool()