
send_message_stream

Send a message to an A2A agent and receive a streamed response for efficient communication, supporting multi-turn conversations via optional session IDs.

Instructions

Send a message to an A2A agent and stream the response.

Args:
    agent_url: URL of the A2A agent
    message: Message to send
    session_id: Optional session ID for multi-turn conversations

Returns: Stream of agent's responses
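Judging by the handler shown in the implementation reference, the tool accumulates the stream and returns one dictionary rather than a live iterator. The sketch below shows how a caller might interpret that dictionary. The helper name `summarize_result` is hypothetical, and the keys it reads (`status`, `message`, `task_id`, `state`, `messages`, `artifacts`) are taken from the handler's return values.

```python
from typing import Any, Dict


def summarize_result(result: Dict[str, Any]) -> str:
    """Turn the tool's result dictionary into a one-line summary (hypothetical helper)."""
    # Error results carry a "status" of "error" and a human-readable "message".
    if result.get("status") == "error":
        return f"error: {result.get('message', 'unknown error')}"

    # Successful results carry the accumulated state, messages, and artifacts.
    messages = result.get("messages", [])
    artifacts = result.get("artifacts", [])
    return (
        f"task {result.get('task_id')} is {result.get('state')}: "
        f"{len(messages)} message(s), {len(artifacts)} artifact(s)"
    )
```

Checking `status` first matters because error results contain no `state`, `messages`, or `artifacts` keys.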

Input Schema

| Name       | Required | Description | Default |
| agent_url  | Yes      |             |         |
| message    | Yes      |             |         |
| session_id | No       |             |         |
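As a rough illustration of the input schema, the sketch below writes it as a JSON Schema dictionary and checks arguments against it by hand. The exact schema the server publishes is not shown on this page, so the property types here are assumptions inferred from the handler's signature (`str`, `str`, `Optional[str]`), and `validate_arguments` is a hypothetical helper.

```python
from typing import Any, Dict, List

# Assumed schema: types are inferred from the handler signature, not copied
# from the server's published JSON Schema.
SEND_MESSAGE_STREAM_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "agent_url": {"type": "string"},
        "message": {"type": "string"},
        "session_id": {"type": ["string", "null"], "default": None},
    },
    "required": ["agent_url", "message"],
}


def validate_arguments(arguments: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    problems: List[str] = []

    # Every required argument must be present.
    for name in SEND_MESSAGE_STREAM_SCHEMA["required"]:
        if name not in arguments:
            problems.append(f"missing required argument: {name}")

    # Every supplied argument must be known and be a string
    # (session_id may also be None).
    for name, value in arguments.items():
        if name not in SEND_MESSAGE_STREAM_SCHEMA["properties"]:
            problems.append(f"unexpected argument: {name}")
        elif name == "session_id" and value is None:
            continue
        elif not isinstance(value, str):
            problems.append(f"argument {name} must be a string")

    return problems
```

For example, `validate_arguments({"message": "hi"})` reports that `agent_url` is missing, while a call with both required arguments returns an empty list.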

Implementation Reference

  • The primary handler function for the 'send_message_stream' MCP tool, registered automatically through the @mcp.tool() decorator. After checking that agent_url is a registered agent, the function creates an A2AClient for it, generates a task_id, and sends a streaming task with client.send_task_streaming. It then processes each TaskStatusUpdateEvent and TaskArtifactUpdateEvent from the stream, accumulates the task state, messages, and artifacts into a single response dictionary, and reports real-time updates via ctx.info.
    @mcp.tool()
    async def send_message_stream(
        agent_url: str,
        message: str,
        session_id: Optional[str] = None,
        ctx: Context = None,
    ) -> Dict[str, Any]:
        """
        Send a message to an A2A agent and stream the response.

        Args:
            agent_url: URL of the A2A agent
            message: Message to send
            session_id: Optional session ID for multi-turn conversations

        Returns:
            Stream of agent's responses
        """
        if agent_url not in registered_agents:
            return {
                "status": "error",
                "message": f"Agent not registered: {agent_url}",
            }

        # Create a client for the agent
        client = A2AClient(url=agent_url)

        try:
            # Generate a task ID
            task_id = str(uuid.uuid4())

            # Store the mapping of task_id to agent_url for later reference
            task_agent_mapping[task_id] = agent_url

            # Save the task mapping to disk
            save_to_json(task_agent_mapping, TASK_AGENT_MAPPING_FILE)

            # Create the message
            a2a_message = Message(
                role="user",
                parts=[TextPart(text=message)],
            )

            if ctx:
                await ctx.info(f"Sending message to agent (streaming): {message}")

            # Start progress indication
            if ctx:
                await ctx.info("Processing...")

            # Dictionary to accumulate streaming responses
            complete_response = {
                "status": "success",
                "task_id": task_id,
                "session_id": session_id,
                "state": "working",
                "messages": [],
                "artifacts": [],
            }

            # Create payload as a single dictionary
            payload = {
                "id": task_id,
                "message": a2a_message,
            }

            if session_id:
                payload["sessionId"] = session_id

            # Send the task and subscribe to updates
            stream = client.send_task_streaming(payload)

            # Process and report stream events
            try:
                all_events = []

                async for event in stream:
                    # Save all events for debugging
                    all_events.append({
                        "type": str(type(event)),
                        "dir": str(dir(event)),
                    })

                    if hasattr(event, "result"):
                        if hasattr(event.result, "status"):
                            # It's a TaskStatusUpdateEvent
                            status_event = event.result

                            # Update the state
                            if hasattr(status_event, "status") and hasattr(status_event.status, "state"):
                                complete_response["state"] = status_event.status.state

                            # Extract any message
                            if hasattr(status_event, "status") and hasattr(status_event.status, "message") and status_event.status.message:
                                message_text = ""
                                for part in status_event.status.message.parts:
                                    if part.type == "text":
                                        message_text += part.text

                                if message_text:
                                    complete_response["messages"].append(message_text)
                                    if ctx:
                                        await ctx.info(f"Agent: {message_text}")

                            # If this is the final event, set session ID
                            if hasattr(status_event, "final") and status_event.final:
                                complete_response["session_id"] = getattr(status_event, "sessionId", session_id)

                        elif hasattr(event.result, "artifact"):
                            # It's a TaskArtifactUpdateEvent
                            artifact_event = event.result

                            # Extract artifact content
                            artifact_data = {
                                "name": artifact_event.artifact.name if hasattr(artifact_event.artifact, "name") else "unnamed",
                                "contents": [],
                            }

                            for part in artifact_event.artifact.parts:
                                if part.type == "text":
                                    artifact_data["contents"].append({
                                        "type": "text",
                                        "text": part.text,
                                    })
                                elif part.type == "data":
                                    artifact_data["contents"].append({
                                        "type": "data",
                                        "data": part.data,
                                    })

                            complete_response["artifacts"].append(artifact_data)

                            if ctx:
                                await ctx.info(f"Received artifact: {artifact_data['name']}")
                    else:
                        # Unknown event type, try to extract what we can
                        complete_response["unknown_events"] = complete_response.get("unknown_events", []) + [
                            {
                                "type": str(type(event)),
                                "dir": str(dir(event)),
                            }
                        ]

                # Include debug info
                complete_response["_debug_info"] = {
                    "all_events": all_events,
                }

                return complete_response

            except Exception as e:
                return {
                    "status": "error",
                    "message": f"Error processing stream events: {str(e)}",
                    "_debug_info": {
                        "all_events": all_events,
                    },
                }

        except Exception as e:
            return {
                "status": "error",
                "message": f"Error sending message (stream): {str(e)}",
            }
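The event-handling loop in the handler can be tried in isolation. The following is a minimal sketch, not the server's code: `fake_stream` is a hypothetical stand-in for `client.send_task_streaming(payload)`, the events are plain `SimpleNamespace` objects rather than real `TaskStatusUpdateEvent` or `TaskArtifactUpdateEvent` instances, and `accumulate` is a simplified version of the loop that handles text parts only.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, AsyncIterator, Dict


def text_part(text: str) -> SimpleNamespace:
    """Build a stand-in for a text part (hypothetical helper)."""
    return SimpleNamespace(type="text", text=text)


async def fake_stream() -> AsyncIterator[SimpleNamespace]:
    """Hypothetical stand-in for client.send_task_streaming(payload)."""
    # A status update that carries an agent message.
    yield SimpleNamespace(result=SimpleNamespace(
        status=SimpleNamespace(
            state="working",
            message=SimpleNamespace(parts=[text_part("Thinking...")]),
        ),
        final=False,
    ))
    # An artifact update with one text part.
    yield SimpleNamespace(result=SimpleNamespace(
        artifact=SimpleNamespace(name="answer", parts=[text_part("42")]),
    ))
    # The final status update, with no message attached.
    yield SimpleNamespace(result=SimpleNamespace(
        status=SimpleNamespace(state="completed", message=None),
        final=True,
    ))


async def accumulate(stream: AsyncIterator[Any]) -> Dict[str, Any]:
    """Fold streamed events into one response dictionary (text parts only)."""
    response: Dict[str, Any] = {"state": "working", "messages": [], "artifacts": []}
    async for event in stream:
        result = getattr(event, "result", None)
        if hasattr(result, "status"):
            # Status event: record the latest state and any message text.
            response["state"] = result.status.state
            message = getattr(result.status, "message", None)
            if message:
                text = "".join(p.text for p in message.parts if p.type == "text")
                if text:
                    response["messages"].append(text)
        elif hasattr(result, "artifact"):
            # Artifact event: collect its text parts under the artifact name.
            response["artifacts"].append({
                "name": getattr(result.artifact, "name", "unnamed"),
                "contents": [
                    {"type": "text", "text": p.text}
                    for p in result.artifact.parts
                    if p.type == "text"
                ],
            })
    return response


result = asyncio.run(accumulate(fake_stream()))
```

As in the handler, the events are distinguished by duck typing: a result with a `status` attribute is treated as a status update, and one with an `artifact` attribute as an artifact update.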

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GongRzhe/A2A-MCP-Server'