Skip to main content
Glama
interactive_manager.py10.7 kB
# interactive_manager.py import json import asyncio from typing import Dict, Optional, Any from pydantic import BaseModel from fastmcp import FastMCP from .run_orchestrator import RunOrchestrator from enum import Enum class InteractionState(str, Enum): WAITING = "waiting" RESUMED = "resumed" COMPLETED = "completed" TIMEOUT = "timeout" ERROR = "error" class PendingInteraction(BaseModel): run_id: str agent_name: str session_id: Optional[str] await_message: str timestamp: float timeout_seconds: int = 300 # 5 minutes default class InteractiveManager: def __init__(self, orchestrator: RunOrchestrator): self.orchestrator = orchestrator self.pending_interactions: Dict[str, PendingInteraction] = {} self.interaction_results: Dict[str, Any] = {} async def start_interactive_agent( self, agent_name: str, initial_input: str, session_id: Optional[str] = None, timeout_seconds: int = 300 ) -> Dict[str, Any]: """Start an interactive agent that may require user input""" try: # Start the agent execution run = await self.orchestrator.execute_agent_sync( agent_name=agent_name, input_text=initial_input, session_id=session_id ) # Check if agent is waiting for input if hasattr(run, 'await_request') and run.await_request: # Agent is waiting for input pending = PendingInteraction( run_id=run.run_id, agent_name=agent_name, session_id=session_id, await_message=run.await_request.get('message', 'Agent is waiting for input'), timestamp=asyncio.get_event_loop().time(), timeout_seconds=timeout_seconds ) self.pending_interactions[run.run_id] = pending return { "status": "awaiting_input", "run_id": run.run_id, "message": pending.await_message, "timeout_seconds": timeout_seconds } else: # Agent completed normally output = "" if run.output: # Handle ACP output format - run.output is already a list of messages output_text = "" for message in run.output: if isinstance(message, dict) and "parts" in message: for part in message["parts"]: if isinstance(part, dict) and "content" in part: output_text += part["content"] + "\n" output = output_text.strip() if output_text else "No text content" return { "status": "completed", "run_id": run.run_id, "output": output, "error": run.error } except Exception as e: return { "status": "error", "error": str(e) } async def resume_interactive_agent( self, run_id: str, user_input: str ) -> Dict[str, Any]: """Resume an agent that was waiting for user input""" if run_id not in self.pending_interactions: return { "status": "error", "error": f"No pending interaction found for run_id: {run_id}" } pending = self.pending_interactions[run_id] # Check timeout current_time = asyncio.get_event_loop().time() if current_time - pending.timestamp > pending.timeout_seconds: del self.pending_interactions[run_id] return { "status": "timeout", "error": "Interaction timed out" } try: # Resume the agent with user input # Note: This is a simplified version - actual ACP resume implementation may vary resume_payload = { "run_id": run_id, "resume_input": user_input } # For now, we'll simulate resume by starting a new session # In a real implementation, this would use ACP's resume endpoint run = await self.orchestrator.execute_agent_sync( agent_name=pending.agent_name, input_text=user_input, session_id=pending.session_id ) # Clean up pending interaction del self.pending_interactions[run_id] # Check if agent needs more input if hasattr(run, 'await_request') and run.await_request: # Agent needs more input new_pending = PendingInteraction( run_id=run.run_id, agent_name=pending.agent_name, session_id=pending.session_id, await_message=run.await_request.get('message', 'Agent is waiting for more input'), timestamp=current_time, timeout_seconds=pending.timeout_seconds ) self.pending_interactions[run.run_id] = new_pending return { "status": "awaiting_more_input", "run_id": run.run_id, "message": new_pending.await_message } else: # Agent completed output = "" if run.output: # Handle ACP output format - run.output is already a list of messages output_text = "" for message in run.output: if isinstance(message, dict) and "parts" in message: for part in message["parts"]: if isinstance(part, dict) and "content" in part: output_text += part["content"] + "\n" output = output_text.strip() if output_text else "No text content" # Store final result self.interaction_results[run_id] = { "output": output, "error": run.error, "completed_at": current_time } return { "status": "completed", "run_id": run.run_id, "output": output, "error": run.error } except Exception as e: # Clean up on error if run_id in self.pending_interactions: del self.pending_interactions[run_id] return { "status": "error", "error": str(e) } async def get_pending_interactions(self) -> Dict[str, Any]: """Get all pending interactions""" current_time = asyncio.get_event_loop().time() active_interactions = {} expired_interactions = [] for run_id, pending in self.pending_interactions.items(): if current_time - pending.timestamp > pending.timeout_seconds: expired_interactions.append(run_id) else: active_interactions[run_id] = { "agent_name": pending.agent_name, "message": pending.await_message, "waiting_seconds": int(current_time - pending.timestamp), "timeout_in_seconds": int(pending.timeout_seconds - (current_time - pending.timestamp)) } # Clean up expired interactions for run_id in expired_interactions: del self.pending_interactions[run_id] return { "active_count": len(active_interactions), "expired_count": len(expired_interactions), "interactions": active_interactions } async def cancel_interaction(self, run_id: str) -> bool: """Cancel a pending interaction""" if run_id in self.pending_interactions: del self.pending_interactions[run_id] return True return False # Integration with FastMCP def register_interactive_tools(mcp: FastMCP, manager: InteractiveManager): @mcp.tool() async def start_interactive_agent( agent_name: str, initial_input: str, session_id: str = None, timeout_minutes: int = 5 ) -> str: """Start an interactive ACP agent that may require user input""" try: result = await manager.start_interactive_agent( agent_name=agent_name, initial_input=initial_input, session_id=session_id, timeout_seconds=timeout_minutes * 60 ) return json.dumps(result, indent=2) except Exception as e: return f"Error: {e}" @mcp.tool() async def provide_user_input( run_id: str, user_input: str ) -> str: """Provide user input to resume a waiting interactive agent""" try: result = await manager.resume_interactive_agent(run_id, user_input) return json.dumps(result, indent=2) except Exception as e: return f"Error: {e}" @mcp.tool() async def list_pending_interactions() -> str: """List all pending interactive agents waiting for input""" try: interactions = await manager.get_pending_interactions() return json.dumps(interactions, indent=2) except Exception as e: return f"Error: {e}" @mcp.tool() async def cancel_interaction(run_id: str) -> str: """Cancel a pending interactive agent""" try: success = await manager.cancel_interaction(run_id) if success: return f"Successfully cancelled interaction: {run_id}" else: return f"No pending interaction found with ID: {run_id}" except Exception as e: return f"Error: {e}"

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/GongRzhe/ACP-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server