provide_user_input
Resume a waiting interactive agent by submitting required user input, facilitating communication between ACP agents and MCP-compatible tools via the ACP-MCP Server.
Instructions
Provide user input to resume a waiting interactive agent
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
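| run_id | Yes | ID of the waiting run to resume (string) | |
| user_input | Yes | Text passed to the agent as the user's reply (string) | |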
Input Schema (JSON Schema)
{
  "properties": {
    "run_id": {
      "title": "Run Id",
      "type": "string"
    },
    "user_input": {
      "title": "User Input",
      "type": "string"
    }
  },
  "required": [
    "run_id",
    "user_input"
  ],
  "type": "object"
}
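As a rough illustration of the schema above, the following sketch checks an arguments object against the two required string fields and wraps it in an MCP `tools/call` request. The helper `validate_arguments` and the sample values (`run-123`, `yes`, request id `1`) are hypothetical and not part of the server; a real client would normally rely on its MCP SDK or a JSON Schema library instead.

```python
import json
from typing import Any, Dict, List

# The input schema as published for provide_user_input.
SCHEMA: Dict[str, Any] = {
    "properties": {
        "run_id": {"title": "Run Id", "type": "string"},
        "user_input": {"title": "User Input", "type": "string"},
    },
    "required": ["run_id", "user_input"],
    "type": "object",
}


def validate_arguments(arguments: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return a list of problems, empty if the arguments fit."""
    errors: List[str] = []
    for name in SCHEMA["required"]:
        if name not in arguments:
            errors.append(f"missing required field: {name}")
    for name, value in arguments.items():
        spec = SCHEMA["properties"].get(name)
        if spec is not None and spec["type"] == "string" and not isinstance(value, str):
            errors.append(f"{name} must be a string")
    return errors


# Sample values are made up for illustration.
arguments = {"run_id": "run-123", "user_input": "yes"}
assert validate_arguments(arguments) == []

# Shape of an MCP tools/call request carrying these arguments.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {"name": "provide_user_input", "arguments": arguments},
}
print(json.dumps(request, indent=2))
```

Both fields are plain strings, so any structured reply (for example a JSON object) has to be serialized into `user_input` by the caller.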
Implementation Reference
- The core handler function for the 'provide_user_input' MCP tool. Decorated with @mcp.tool() for automatic registration. It resumes the interactive agent by calling InteractiveManager.resume_interactive_agent and returns a JSON-serialized result.

      @mcp.tool()
      async def provide_user_input(
          run_id: str,
          user_input: str
      ) -> str:
          """Provide user input to resume a waiting interactive agent"""
          try:
              result = await manager.resume_interactive_agent(run_id, user_input)
              return json.dumps(result, indent=2)
          except Exception as e:
              return f"Error: {e}"
- acp_mcp_server/server.py:89-89 (registration) The call to register_interactive_tools in the server's _register_all_tools method, which executes the tool definitions and registrations, including 'provide_user_input'.

      register_interactive_tools(self.mcp, self.interactive_manager)
- The InteractiveManager.resume_interactive_agent method, which contains the core logic invoked by the 'provide_user_input' tool handler for resuming agent execution with user input.

      async def resume_interactive_agent(
          self,
          run_id: str,
          user_input: str
      ) -> Dict[str, Any]:
          """Resume an agent that was waiting for user input"""

          if run_id not in self.pending_interactions:
              return {
                  "status": "error",
                  "error": f"No pending interaction found for run_id: {run_id}"
              }

          pending = self.pending_interactions[run_id]

          # Check timeout
          current_time = asyncio.get_event_loop().time()
          if current_time - pending.timestamp > pending.timeout_seconds:
              del self.pending_interactions[run_id]
              return {
                  "status": "timeout",
                  "error": "Interaction timed out"
              }

          try:
              # Resume the agent with user input
              # Note: This is a simplified version - actual ACP resume implementation may vary
              resume_payload = {
                  "run_id": run_id,
                  "resume_input": user_input
              }

              # For now, we'll simulate resume by starting a new session
              # In a real implementation, this would use ACP's resume endpoint
              run = await self.orchestrator.execute_agent_sync(
                  agent_name=pending.agent_name,
                  input_text=user_input,
                  session_id=pending.session_id
              )

              # Clean up pending interaction
              del self.pending_interactions[run_id]

              # Check if agent needs more input
              if hasattr(run, 'await_request') and run.await_request:
                  # Agent needs more input
                  new_pending = PendingInteraction(
                      run_id=run.run_id,
                      agent_name=pending.agent_name,
                      session_id=pending.session_id,
                      await_message=run.await_request.get('message', 'Agent is waiting for more input'),
                      timestamp=current_time,
                      timeout_seconds=pending.timeout_seconds
                  )
                  self.pending_interactions[run.run_id] = new_pending

                  return {
                      "status": "awaiting_more_input",
                      "run_id": run.run_id,
                      "message": new_pending.await_message
                  }
              else:
                  # Agent completed
                  output = ""
                  if run.output:
                      # Handle ACP output format - run.output is already a list of messages
                      output_text = ""
                      for message in run.output:
                          if isinstance(message, dict) and "parts" in message:
                              for part in message["parts"]:
                                  if isinstance(part, dict) and "content" in part:
                                      output_text += part["content"] + "\n"
                      output = output_text.strip() if output_text else "No text content"

                  # Store final result
                  self.interaction_results[run_id] = {
                      "output": output,
                      "error": run.error,
                      "completed_at": current_time
                  }

                  return {
                      "status": "completed",
                      "run_id": run.run_id,
                      "output": output,
                      "error": run.error
                  }

          except Exception as e:
              # Clean up on error
              if run_id in self.pending_interactions:
                  del self.pending_interactions[run_id]

              return {
                  "status": "error",
                  "error": str(e)
              }
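Two pieces of the method above can be exercised in isolation: the lookup-and-timeout check on a pending interaction, and the flattening of ACP-style output messages into text. The sketch below is a minimal standalone version under stated assumptions. The `PendingInteraction` dataclass here is a trimmed, hypothetical stand-in, `check_pending` and `extract_text` are illustrative names that do not exist in the server, and the `"ok"` status is invented for the example.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class PendingInteraction:
    # Hypothetical stand-in with only the fields this sketch needs.
    run_id: str
    agent_name: str
    timestamp: float
    timeout_seconds: float = 300.0


def check_pending(
    pending: Dict[str, PendingInteraction], run_id: str, now: float
) -> Dict[str, Any]:
    """Mirror the lookup and timeout check; "ok" is an invented status."""
    if run_id not in pending:
        return {
            "status": "error",
            "error": f"No pending interaction found for run_id: {run_id}",
        }
    entry = pending[run_id]
    if now - entry.timestamp > entry.timeout_seconds:
        # Expired entries are removed, so a later call reports "error".
        del pending[run_id]
        return {"status": "timeout", "error": "Interaction timed out"}
    return {"status": "ok"}


def extract_text(messages: List[Any]) -> str:
    """Join the "content" of every part found in a list of message dicts."""
    chunks: List[str] = []
    for message in messages:
        if isinstance(message, dict) and "parts" in message:
            for part in message["parts"]:
                if isinstance(part, dict) and "content" in part:
                    chunks.append(part["content"])
    return "\n".join(chunks).strip() if chunks else "No text content"


# Usage with made-up values: a run registered at t=0 with a 10-second timeout.
pending = {"r1": PendingInteraction("r1", "demo-agent", timestamp=0.0, timeout_seconds=10.0)}
print(check_pending(pending, "r1", now=5.0))
print(check_pending(pending, "r1", now=11.0))
print(extract_text([{"parts": [{"content": "Hello"}, {"content": "world"}]}]))
```

Because a timed-out entry is deleted on first detection, a caller sees `timeout` once and `error` on any retry with the same `run_id`. Note also that when the agent asks for more input, the result carries a `run_id` taken from the new run, so callers should use the returned value rather than reuse the old one.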