Skip to main content
Glama

send_message_stream

Send messages to A2A agents and receive streaming responses for real-time interaction and multi-turn conversations.

Instructions

Send a message to an A2A agent and stream the response.

Args: agent_url — URL of the A2A agent; message — message to send; session_id — optional session ID for multi-turn conversations.

Returns: Stream of agent's responses

Input Schema

Table | JSON Schema
Name | Required | Description | Default
agent_url | Yes | |
message | Yes | |
session_id | No | |

Implementation Reference

  • The core handler function for the 'send_message_stream' tool, decorated with @mcp.tool(). It sends a streaming message to a registered A2A agent using A2AClient.send_task_streaming(), processes the event stream (TaskStatusUpdateEvent and TaskArtifactUpdateEvent), accumulates responses, messages, and artifacts, and returns a complete response dictionary.
    @mcp.tool()
    async def send_message_stream(
        agent_url: str,
        message: str,
        session_id: Optional[str] = None,
        ctx: Context = None,
    ) -> Dict[str, Any]:
        """
        Send a message to an A2A agent and stream the response.

        The agent's event stream is consumed to completion and folded into a
        single dictionary; progress is reported through ``ctx`` (when given)
        as events arrive.

        Args:
            agent_url: URL of the A2A agent; must be present in
                ``registered_agents``.
            message: Message to send
            session_id: Optional session ID for multi-turn conversations
            ctx: Optional context used to emit informational progress messages.

        Returns:
            A dictionary. On success it contains ``status`` ("success"),
            ``task_id``, ``session_id``, ``state`` (the last state reported,
            "working" until an event says otherwise), ``messages`` (text of
            status messages), ``artifacts``, ``_debug_info`` and, when some
            events could not be interpreted, ``unknown_events``. On failure it
            contains ``status`` ("error") and ``message``; failures while
            processing the stream also include ``task_id`` and ``_debug_info``.
        """
        if agent_url not in registered_agents:
            return {
                "status": "error",
                "message": f"Agent not registered: {agent_url}",
            }

        # Create a client for the agent
        client = A2AClient(url=agent_url)

        try:
            # Record which agent owns this task, both in memory and on disk,
            # so the task ID can be mapped back to its agent later.
            task_id = str(uuid.uuid4())
            task_agent_mapping[task_id] = agent_url
            save_to_json(task_agent_mapping, TASK_AGENT_MAPPING_FILE)

            a2a_message = Message(
                role="user",
                parts=[TextPart(text=message)],
            )

            if ctx:
                await ctx.info(f"Sending message to agent (streaming): {message}")
                await ctx.info("Processing...")

            # Accumulator for everything received over the stream
            complete_response = {
                "status": "success",
                "task_id": task_id,
                "session_id": session_id,
                "state": "working",
                "messages": [],
                "artifacts": [],
            }

            payload = {
                "id": task_id,
                "message": a2a_message,
            }
            if session_id:
                payload["sessionId"] = session_id

            # Send the task and subscribe to updates
            stream = client.send_task_streaming(payload)

            # Kept outside the inner try so the error path can always report it
            all_events = []

            try:
                async for event in stream:
                    # Shape description of every event, kept for debugging
                    event_info = {
                        "type": str(type(event)),
                        "dir": str(dir(event)),
                    }
                    all_events.append(event_info)

                    result = getattr(event, "result", None)

                    if hasattr(result, "status"):
                        # Treated as a TaskStatusUpdateEvent
                        status = result.status

                        if hasattr(status, "state"):
                            complete_response["state"] = status.state

                        status_message = getattr(status, "message", None)
                        if status_message:
                            message_text = "".join(
                                part.text
                                for part in status_message.parts
                                if part.type == "text"
                            )
                            if message_text:
                                complete_response["messages"].append(message_text)
                                if ctx:
                                    await ctx.info(f"Agent: {message_text}")

                        # The final event may carry the session ID to report back
                        if getattr(result, "final", False):
                            complete_response["session_id"] = getattr(
                                result, "sessionId", session_id
                            )

                    elif hasattr(result, "artifact"):
                        # Treated as a TaskArtifactUpdateEvent
                        artifact = result.artifact

                        artifact_data = {
                            # Fall back when the name is missing, None or empty
                            "name": getattr(artifact, "name", None) or "unnamed",
                            "contents": [],
                        }

                        # Only text and data parts are collected; other part
                        # types are skipped.
                        for part in artifact.parts:
                            if part.type == "text":
                                artifact_data["contents"].append({
                                    "type": "text",
                                    "text": part.text,
                                })
                            elif part.type == "data":
                                artifact_data["contents"].append({
                                    "type": "data",
                                    "data": part.data,
                                })

                        complete_response["artifacts"].append(artifact_data)

                        if ctx:
                            await ctx.info(f"Received artifact: {artifact_data['name']}")

                    else:
                        # No result, or a result of an unrecognised shape:
                        # record it instead of dropping it silently.
                        complete_response.setdefault("unknown_events", []).append(
                            dict(event_info)
                        )

                # Include debug info
                complete_response["_debug_info"] = {
                    "all_events": all_events
                }

                return complete_response
            except Exception as e:
                return {
                    "status": "error",
                    # The mapping for this task was already persisted, so hand
                    # the ID back to let the caller follow up on the task.
                    "task_id": task_id,
                    "message": f"Error processing stream events: {str(e)}",
                    "_debug_info": {
                        "all_events": all_events
                    }
                }

        except Exception as e:
            return {
                "status": "error",
                "message": f"Error sending message (stream): {str(e)}",
            }
  • The @mcp.tool() decorator registers the send_message_stream function as an MCP tool.
    @mcp.tool()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GongRzhe/A2A-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.