Skip to main content
Glama
conversation_service.py•32.1 kB
""" Conversation Service for AutoGen UI Handles message flow between conversation widgets and session management """ import asyncio import json import logging from typing import Dict, Any, List, Optional from datetime import datetime from dataclasses import dataclass, asdict from enum import Enum from PySide6.QtCore import QObject, Signal, QThread, QTimer from .session_service import SessionService from .mcp_conversation_client import MCPConversationClient logger = logging.getLogger(__name__) class MessageType(Enum): """Enumeration of different message types in conversations""" USER = "user" # User input messages AGENT_RESPONSE = "agent_response" # Direct agent responses to user AGENT_TO_AGENT = "agent_to_agent" # Agent talking to another agent AGENT_THINKING = "agent_thinking" # Agent reasoning/internal process AGENT_COORDINATION = "agent_coordination" # Agents coordinating tasks SYSTEM = "system" # System status messages @dataclass class ConversationMessage: """Enhanced data class for conversation messages with agent targeting""" id: str session_id: str role: str # 'user', 'assistant', 'system' (for backward compatibility) content: str timestamp: str message_type: str = MessageType.USER.value # Type of message agent_name: Optional[str] = None target_agents: Optional[List[str]] = None # Targeted agents source_agent: Optional[str] = None # Source agent (for agent-to-agent) reasoning_context: Optional[str] = None # Agent reasoning details metadata: Optional[Dict[str, Any]] = None def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization""" return asdict(self) @classmethod def create_user_message( cls, session_id: str, content: str, target_agents: Optional[List[str]] = None ) -> "ConversationMessage": """Factory method for creating user messages""" return cls( id=f"user_{datetime.now().timestamp()}", session_id=session_id, role="user", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.USER.value, target_agents=target_agents, ) @classmethod def create_agent_response( cls, session_id: str, agent_name: str, content: str ) -> "ConversationMessage": """Factory method for creating agent response messages""" return cls( id=f"agent_{agent_name}_{datetime.now().timestamp()}", session_id=session_id, role="assistant", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.AGENT_RESPONSE.value, agent_name=agent_name, ) @classmethod def create_agent_to_agent_message( cls, session_id: str, source_agent: str, target_agent: str, content: str ) -> "ConversationMessage": """Factory method for creating agent-to-agent messages""" timestamp = datetime.now().timestamp() return cls( id=f"a2a_{source_agent}_{target_agent}_{timestamp}", session_id=session_id, role="assistant", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.AGENT_TO_AGENT.value, source_agent=source_agent, target_agents=[target_agent], ) @classmethod def create_agent_thinking_message( cls, session_id: str, agent_name: str, reasoning: str ) -> "ConversationMessage": """Factory method for creating agent thinking/reasoning messages""" timestamp = datetime.now().timestamp() return cls( id=f"thinking_{agent_name}_{timestamp}", session_id=session_id, role="assistant", content=reasoning, timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.AGENT_THINKING.value, agent_name=agent_name, reasoning_context=reasoning, ) @classmethod def create_agent_coordination_message( cls, session_id: str, content: str ) -> "ConversationMessage": """Factory method for creating agent coordination messages""" timestamp = datetime.now().timestamp() return cls( id=f"coord_{timestamp}", session_id=session_id, role="assistant", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.AGENT_COORDINATION.value, ) @classmethod def create_system_message( cls, session_id: str, content: str ) -> "ConversationMessage": """Factory method for creating system messages""" return cls( id=f"system_{datetime.now().timestamp()}", session_id=session_id, role="system", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.SYSTEM.value, ) class ConversationWorker(QThread): """Worker thread for async conversation operations""" message_sent = Signal(str, str) # session_id, message_id message_received = Signal(ConversationMessage) agent_typing = Signal(str, str) # session_id, agent_name agent_stopped_typing = Signal(str, str) # session_id, agent_name error_occurred = Signal(str, str) # session_id, error_message def __init__(self, session_service: SessionService, mcp_client=None): super().__init__() self.session_service = session_service self.mcp_client = mcp_client self.active_sessions: Dict[str, bool] = {} self.message_queue: List[Dict[str, Any]] = [] self._running = False def run(self): """Main worker thread loop""" self._running = True loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(self._message_processing_loop()) except Exception as e: logger.error(f"Conversation worker error: {e}") finally: loop.close() async def _message_processing_loop(self): """Process queued messages""" while self._running: if self.message_queue: message_data = self.message_queue.pop(0) try: await self._send_message_to_session(message_data) except Exception as e: self.error_occurred.emit( message_data.get("session_id", ""), f"Failed to send message: {str(e)}", ) await asyncio.sleep(0.1) # Small delay to prevent busy waiting async def _send_message_to_session(self, message_data: dict): """Send message to session service using real MCP communication""" try: session_id = message_data.get("session_id", "") message = message_data.get("message", "") target_agents = message_data.get("agents", []) logger.info( f"Querying agents via MCP for session {session_id}", extra={ "extra": { "session_id": session_id, "target_agents": target_agents, "message_length": len(message), } }, ) # Use MCP client to query agents if self.mcp_client: try: # Query agents via MCP server result = self.mcp_client.query_agents_sync( session_id=session_id, message=message, target_agents=target_agents if target_agents else None, ) # Process agent responses from MCP result agent_responses = result.get("responses", {}) if agent_responses: logger.info( f"Received {len(agent_responses)} agent responses", extra={ "extra": { "session_id": session_id, "response_count": len(agent_responses), } }, ) # Emit each agent response for agent_name, response_content in agent_responses.items(): response_message = ConversationMessage( id=f"{agent_name.lower()}_{datetime.now().timestamp()}", session_id=session_id, role="assistant", content=str(response_content), timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.AGENT_RESPONSE.value, agent_name=agent_name, ) # Emit the response self.message_received.emit(response_message) # Small delay between responses for natural flow await asyncio.sleep(0.2) else: logger.warning( f"No agent responses received for session {session_id}" ) except Exception as mcp_error: logger.error( f"MCP agent query failed: {str(mcp_error)}", extra={ "extra": {"session_id": session_id, "error": str(mcp_error)} }, ) # Emit error as a system message error_message = ConversationMessage( id=f"error_{datetime.now().timestamp()}", session_id=session_id, role="system", content=f"Failed to get agent responses: {str(mcp_error)}", timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.SYSTEM.value, ) self.message_received.emit(error_message) else: # Fallback to mock responses if MCP client not available logger.warning( "MCP client not available, using fallback mock responses" ) await self._send_mock_responses(session_id, message) except Exception as e: logger.error(f"Error in _send_message_to_session: {e}") raise async def _send_mock_responses(self, session_id: str, message: str): """Fallback method for mock responses when MCP is unavailable""" # Create mock agent responses agent_responses = [ { "agent": "ScrumMaster", "response": f"Thank you for the input! I'll facilitate our discussion on: '{message[:100]}...' Let's ensure all team members can contribute.", "delay": 1.0, }, { "agent": "ProductOwner", "response": "From a product perspective, I need to prioritize the features you mentioned. Here's my initial assessment of the requirements.", "delay": 2.0, }, ] # Send mock responses with delays for agent_data in agent_responses: await asyncio.sleep(agent_data["delay"]) response_message = ConversationMessage( id=f"{agent_data['agent'].lower()}_{datetime.now().timestamp()}", session_id=session_id, role="assistant", content=agent_data["response"], timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.AGENT_RESPONSE.value, agent_name=agent_data["agent"], ) # Emit the response self.message_received.emit(response_message) def send_message_to_session(self, message_data: dict): """Send message to session service""" # Extract session_id for future use session_id = message_data["session_id"] # noqa: F841 # Emit typing indicators for agents (simulate agent processing) QTimer.singleShot( 500, lambda: self.typing_indicator_updated.emit("Agent 1", True) ) QTimer.singleShot( 2000, lambda: self.typing_indicator_updated.emit("Agent 1", False) ) # TODO: Implement real session service communication # For now, emit a mock response mock_response = ConversationMessage( sender="Assistant", content="This is a mock response. Real session integration coming soon.", # noqa: E501 timestamp=datetime.now(), message_type="assistant", ) # Simulate processing delay QTimer.singleShot( 1500, lambda: self.message_received.emit(mock_response.to_dict()) ) def queue_message(self, session_id: str, message: str, agents: List[str] = None): """Queue a message for processing""" self.message_queue.append( {"session_id": session_id, "message": message, "agents": agents or []} ) def stop_worker(self): """Stop the worker thread""" self._running = False class ConversationService(QObject): """Service for managing conversation flow and integration""" # Signals message_added = Signal(ConversationMessage) message_sent = Signal(str, str) # session_id, message_id agent_typing_started = Signal(str, str) # session_id, agent_name agent_typing_stopped = Signal(str, str) # session_id, agent_name conversation_cleared = Signal(str) # session_id conversation_exported = Signal(str, str) # session_id, file_path error_occurred = Signal(str, str) # session_id, error_message def __init__(self, session_service: SessionService): super().__init__() self.session_service = session_service self.conversations: Dict[str, List[ConversationMessage]] = {} self.active_sessions: Dict[str, Dict[str, Any]] = {} # Initialize MCP client for backend integration self.mcp_client = MCPConversationClient() # Initialize worker thread self.worker = ConversationWorker(session_service, self.mcp_client) self._connect_worker_signals() self.worker.start() def _connect_worker_signals(self): """Connect worker thread signals""" self.worker.message_sent.connect(self.message_sent.emit) self.worker.message_received.connect(self._on_message_received) self.worker.agent_typing.connect(self.agent_typing_started.emit) self.worker.agent_stopped_typing.connect(self.agent_typing_stopped.emit) self.worker.error_occurred.connect(self.error_occurred.emit) def _on_message_received(self, message: ConversationMessage): """Handle received message from worker""" self.add_message_to_conversation(message) self.message_added.emit(message) def start_conversation(self, session_id: str, session_config: Dict[str, Any]): """Start a new conversation for a session""" self.active_sessions[session_id] = session_config self.conversations[session_id] = [] # Add welcome message session_name = session_config.get("name", "Unknown") agents_list = ", ".join(session_config.get("agents", [])) content = f"Session '{session_name}' started with agents: {agents_list}" welcome_message = ConversationMessage( id=f"welcome_{session_id}", session_id=session_id, role="system", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), ) self.add_message_to_conversation(welcome_message) self.message_added.emit(welcome_message) logger.info(f"Started conversation for session {session_id}") def send_message(self, session_id: str, message: str) -> str: """Send a user message to the conversation""" if session_id not in self.active_sessions: raise ValueError(f"No active session {session_id}") # Create user message message_id = f"user_{datetime.now().timestamp()}" user_message = ConversationMessage( id=message_id, session_id=session_id, role="user", content=message, timestamp=datetime.now().strftime("%H:%M:%S"), ) # Add to conversation immediately self.add_message_to_conversation(user_message) self.message_added.emit(user_message) # Queue for processing by agents session_config = self.active_sessions[session_id] agents = session_config.get("agents", []) self.worker.queue_message(session_id, message, agents) self.message_sent.emit(session_id, message_id) logger.info(f"Sent message to session {session_id}: {message[:50]}...") return message_id def send_targeted_message( self, session_id: str, message: str, target_agents: List[str] ) -> str: """Send a user message to specific agents in the conversation""" if session_id not in self.active_sessions: raise ValueError(f"No active session {session_id}") # Validate target agents exist in session session_config = self.active_sessions[session_id] available_agents = session_config.get("agents", []) invalid_agents = [ agent for agent in target_agents if agent not in available_agents ] if invalid_agents: raise ValueError(f"Invalid agents: {', '.join(invalid_agents)}") # Create targeted user message user_message = ConversationMessage.create_user_message( session_id=session_id, content=message, target_agents=target_agents ) # Add to conversation immediately self.add_message_to_conversation(user_message) self.message_added.emit(user_message) # Queue for processing by specific agents self.worker.queue_message(session_id, message, target_agents) self.message_sent.emit(session_id, user_message.id) agent_list = ", ".join(target_agents) logger.info( f"Sent targeted message to {agent_list} in session {session_id}: " f"{message[:50]}..." ) return user_message.id def add_agent_to_agent_message( self, session_id: str, source_agent: str, target_agent: str, content: str ): """Add an agent-to-agent communication message""" message = ConversationMessage.create_agent_to_agent_message( session_id=session_id, source_agent=source_agent, target_agent=target_agent, content=content, ) self.add_message_to_conversation(message) self.message_added.emit(message) # Factory methods for creating messages def create_user_message( self, session_id: str, content: str, target_agents: Optional[List[str]] = None ) -> ConversationMessage: """Create a user message""" return ConversationMessage.create_user_message( session_id, content, target_agents ) def create_targeted_user_message( self, session_id: str, content: str, target_agents: List[str] ) -> ConversationMessage: """Create a targeted user message""" return ConversationMessage.create_user_message( session_id, content, target_agents ) def create_agent_response( self, session_id: str, content: str, agent_name: str ) -> ConversationMessage: """Create an agent response message""" return ConversationMessage.create_agent_response( session_id, agent_name, content ) def create_agent_thinking( self, session_id: str, content: str, agent_name: str, reasoning_context: Optional[str] = None, ) -> ConversationMessage: """Create an agent thinking message""" message = ConversationMessage.create_agent_thinking_message( session_id, agent_name, content ) if reasoning_context: message.reasoning_context = reasoning_context return message def create_agent_coordination( self, session_id: str, content: str, participating_agents: Optional[List[str]] = None, ) -> ConversationMessage: """Create an agent coordination message""" message = ConversationMessage.create_agent_coordination_message( session_id, content ) if participating_agents: message.metadata = {"participating_agents": participating_agents} return message def get_conversation_history(self, session_id: str) -> List[ConversationMessage]: """Get conversation history for a session""" return self.get_conversation(session_id) def add_message(self, message: ConversationMessage): """Add a message to the conversation""" self.add_message_to_conversation(message) def add_agent_thinking_message( self, session_id: str, agent_name: str, reasoning: str ): """Add an agent thinking/reasoning message""" message = ConversationMessage.create_agent_thinking_message( session_id=session_id, agent_name=agent_name, reasoning=reasoning ) self.add_message_to_conversation(message) self.message_added.emit(message) logger.info(f"Agent thinking: {agent_name}") def add_agent_coordination_message( self, session_id: str, content: str, involved_agents: List[str] ): """Add an agent coordination message""" message = ConversationMessage( id=f"coord_{datetime.now().timestamp()}", session_id=session_id, role="system", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), message_type=MessageType.AGENT_COORDINATION.value, target_agents=involved_agents, ) self.add_message_to_conversation(message) self.message_added.emit(message) logger.info("Agent coordination message added") def get_conversation_by_message_types( self, session_id: str, message_types: List[str] ) -> List[ConversationMessage]: """Get conversation history filtered by message types""" conversation = self.get_conversation(session_id) return [msg for msg in conversation if msg.message_type in message_types] def add_message_to_conversation(self, message: ConversationMessage): """Add a message to the conversation history""" if message.session_id not in self.conversations: self.conversations[message.session_id] = [] self.conversations[message.session_id].append(message) def get_conversation(self, session_id: str) -> List[ConversationMessage]: """Get conversation history for a session""" return self.conversations.get(session_id, []).copy() def clear_conversation(self, session_id: str): """Clear conversation history for a session""" if session_id in self.conversations: self.conversations[session_id] = [] self.conversation_cleared.emit(session_id) logger.info(f"Cleared conversation for session {session_id}") def end_conversation(self, session_id: str): """End conversation for a session""" if session_id in self.active_sessions: # Add goodbye message session_name = self.active_sessions[session_id].get("name", "Unknown") content = f"Session '{session_name}' ended." goodbye_message = ConversationMessage( id=f"goodbye_{session_id}", session_id=session_id, role="system", content=content, timestamp=datetime.now().strftime("%H:%M:%S"), ) self.add_message_to_conversation(goodbye_message) self.message_added.emit(goodbye_message) # Clean up session del self.active_sessions[session_id] logger.info(f"Ended conversation for session {session_id}") def export_conversation( self, session_id: str, file_path: str, format: str = "json" ) -> bool: """Export conversation to file""" try: conversation = self.get_conversation(session_id) if not conversation: raise ValueError("No conversation to export") if format.lower() == "json": self._export_to_json(conversation, file_path) elif format.lower() == "markdown": self._export_to_markdown(conversation, file_path) else: raise ValueError(f"Unsupported export format: {format}") self.conversation_exported.emit(session_id, file_path) logger.info(f"Exported conversation {session_id} to {file_path}") return True except Exception as e: logger.error(f"Failed to export conversation {session_id}: {e}") self.error_occurred.emit(session_id, f"Export failed: {str(e)}") return False def _export_to_json(self, conversation: List[ConversationMessage], file_path: str): """Export conversation to JSON format""" export_data = { "exported_at": datetime.now().isoformat(), "message_count": len(conversation), "messages": [msg.to_dict() for msg in conversation], } with open(file_path, "w", encoding="utf-8") as f: json.dump(export_data, f, indent=2, ensure_ascii=False) def _export_to_markdown( self, conversation: List[ConversationMessage], file_path: str ): """Export conversation to Markdown format""" with open(file_path, "w", encoding="utf-8") as f: f.write("# Conversation Export\n\n") timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") f.write(f"**Exported:** {timestamp}\n") f.write(f"**Messages:** {len(conversation)}\n\n") f.write("---\n\n") for msg in conversation: role_display = msg.role.title() if msg.agent_name: role_display = f"{msg.agent_name} ({role_display})" f.write(f"## {role_display} - {msg.timestamp}\n\n") f.write(f"{msg.content}\n\n") f.write("---\n\n") def get_active_sessions(self) -> Dict[str, Dict[str, Any]]: """Get all active conversation sessions""" return self.active_sessions.copy() def is_session_active(self, session_id: str) -> bool: """Check if a session has an active conversation""" return session_id in self.active_sessions def cleanup(self): """Clean up the service""" if hasattr(self, "worker") and self.worker.isRunning(): self.worker.stop_worker() self.worker.wait(3000) # Wait up to 3 seconds for thread to finish # Clean up MCP client try: import asyncio loop = asyncio.get_event_loop() loop.run_until_complete(self.mcp_client.close()) except Exception as e: logger.warning(f"Failed to close MCP client: {e}") logger.info("ConversationService cleaned up") # MCP Integration Methods def send_message_via_mcp( self, session_id: str, content: str, target_agents: Optional[List[str]] = None, message_type: str = "user", ) -> bool: """Send message via MCP server""" try: result = self.mcp_client.send_message_sync( session_id, content, target_agents, message_type ) logger.info(f"Message sent via MCP: {result}") return True except Exception as e: logger.error(f"Failed to send message via MCP: {e}") self.error_occurred.emit(session_id, f"MCP communication failed: {str(e)}") return False def load_conversation_from_mcp(self, session_id: str) -> bool: """Load conversation history from MCP server""" try: messages = self.mcp_client.get_conversation_history_sync(session_id) # Clear current conversation self.conversations[session_id] = [] # Convert MCP messages to ConversationMessage objects for msg_data in messages: timestamp_default = datetime.now().strftime("%H:%M:%S") message = ConversationMessage( id=msg_data.get("id", f"mcp_{datetime.now().timestamp()}"), session_id=session_id, role=msg_data.get("role", "user"), content=msg_data.get("content", ""), timestamp=msg_data.get("timestamp", timestamp_default), message_type=msg_data.get("message_type", "user"), agent_name=msg_data.get("agent_name"), target_agents=msg_data.get("target_agents"), source_agent=msg_data.get("source_agent"), reasoning_context=msg_data.get("reasoning_context"), metadata=msg_data.get("metadata"), ) self.conversations[session_id].append(message) self.message_added.emit(message) logger.info( f"Loaded {len(messages)} messages from MCP for session {session_id}" ) return True except Exception as e: logger.error(f"Failed to load conversation from MCP: {e}") self.error_occurred.emit( session_id, f"Failed to load conversation: {str(e)}" ) return False def get_available_agents_from_mcp(self, session_id: str) -> List[str]: """Get available agents for session from MCP server""" try: agents = self.mcp_client.get_session_agents_sync(session_id) logger.info(f"Available agents for {session_id}: {agents}") return agents except Exception as e: logger.error(f"Failed to get agents from MCP: {e}") return [] def sync_with_mcp_server(self, session_id: str): """Synchronize conversation state with MCP server""" try: # Send any unsent messages to MCP if session_id in self.conversations: for message in self.conversations[session_id]: # Check if message needs to be synced if not message.metadata or not message.metadata.get( "synced_to_mcp" ): self.send_message_via_mcp( session_id, message.content, message.target_agents, message.message_type, ) # Mark as synced if not message.metadata: message.metadata = {} message.metadata["synced_to_mcp"] = True # Load latest from MCP self.load_conversation_from_mcp(session_id) except Exception as e: logger.error(f"Failed to sync with MCP server: {e}") self.error_occurred.emit(session_id, f"Sync failed: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hannesnortje/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server