# conversation_service.py • 32.1 kB
"""
Conversation Service for AutoGen UI
Handles message flow between conversation widgets and session management
"""
import asyncio
import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
from PySide6.QtCore import QObject, Signal, QThread, QTimer
from .session_service import SessionService
from .mcp_conversation_client import MCPConversationClient
logger = logging.getLogger(__name__)
class MessageType(Enum):
    """Kinds of message that can appear in a conversation transcript.

    The string values are what ``ConversationMessage.message_type`` stores.
    """

    # Text entered by the user.
    USER = "user"
    # An agent replying directly to the user.
    AGENT_RESPONSE = "agent_response"
    # One agent addressing another agent.
    AGENT_TO_AGENT = "agent_to_agent"
    # An agent's reasoning / internal process.
    AGENT_THINKING = "agent_thinking"
    # Agents coordinating tasks between themselves.
    AGENT_COORDINATION = "agent_coordination"
    # System status notices.
    SYSTEM = "system"
@dataclass
class ConversationMessage:
    """A single transcript entry, optionally tied to specific agents.

    Instances are normally built through the ``create_*`` factory class
    methods, which fill in ``id``, ``role``, ``timestamp`` and
    ``message_type`` for each kind of message.
    """

    id: str
    session_id: str
    # 'user', 'assistant' or 'system' (kept for backward compatibility)
    role: str
    content: str
    # The factories store a "%H:%M:%S" formatted string here.
    timestamp: str
    # One of the MessageType values.
    message_type: str = MessageType.USER.value
    agent_name: Optional[str] = None
    # Agents the message is addressed to.
    target_agents: Optional[List[str]] = None
    # Originating agent (set for agent-to-agent messages).
    source_agent: Optional[str] = None
    # Details of an agent's reasoning.
    reasoning_context: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

    @staticmethod
    def _clock_label() -> str:
        """Return the current time formatted as ``HH:MM:SS``."""
        return datetime.now().strftime("%H:%M:%S")

    def to_dict(self) -> Dict[str, Any]:
        """Return the message as a plain dictionary (via ``asdict``)."""
        return asdict(self)

    @classmethod
    def create_user_message(
        cls, session_id: str, content: str, target_agents: Optional[List[str]] = None
    ) -> "ConversationMessage":
        """Build a ``user`` message, optionally addressed to ``target_agents``."""
        message_id = f"user_{datetime.now().timestamp()}"
        return cls(
            id=message_id,
            session_id=session_id,
            role="user",
            content=content,
            timestamp=cls._clock_label(),
            message_type=MessageType.USER.value,
            target_agents=target_agents,
        )

    @classmethod
    def create_agent_response(
        cls, session_id: str, agent_name: str, content: str
    ) -> "ConversationMessage":
        """Build an ``assistant`` message holding ``agent_name``'s reply."""
        message_id = f"agent_{agent_name}_{datetime.now().timestamp()}"
        return cls(
            id=message_id,
            session_id=session_id,
            role="assistant",
            content=content,
            timestamp=cls._clock_label(),
            message_type=MessageType.AGENT_RESPONSE.value,
            agent_name=agent_name,
        )

    @classmethod
    def create_agent_to_agent_message(
        cls, session_id: str, source_agent: str, target_agent: str, content: str
    ) -> "ConversationMessage":
        """Build a message sent from ``source_agent`` to ``target_agent``."""
        epoch = datetime.now().timestamp()
        return cls(
            id=f"a2a_{source_agent}_{target_agent}_{epoch}",
            session_id=session_id,
            role="assistant",
            content=content,
            timestamp=cls._clock_label(),
            message_type=MessageType.AGENT_TO_AGENT.value,
            source_agent=source_agent,
            # Stored as a one-element list to match the field's list type.
            target_agents=[target_agent],
        )

    @classmethod
    def create_agent_thinking_message(
        cls, session_id: str, agent_name: str, reasoning: str
    ) -> "ConversationMessage":
        """Build a reasoning message; ``reasoning`` fills both content and context."""
        epoch = datetime.now().timestamp()
        return cls(
            id=f"thinking_{agent_name}_{epoch}",
            session_id=session_id,
            role="assistant",
            content=reasoning,
            timestamp=cls._clock_label(),
            message_type=MessageType.AGENT_THINKING.value,
            agent_name=agent_name,
            reasoning_context=reasoning,
        )

    @classmethod
    def create_agent_coordination_message(
        cls, session_id: str, content: str
    ) -> "ConversationMessage":
        """Build an ``assistant`` message of the agent-coordination type."""
        epoch = datetime.now().timestamp()
        return cls(
            id=f"coord_{epoch}",
            session_id=session_id,
            role="assistant",
            content=content,
            timestamp=cls._clock_label(),
            message_type=MessageType.AGENT_COORDINATION.value,
        )

    @classmethod
    def create_system_message(
        cls, session_id: str, content: str
    ) -> "ConversationMessage":
        """Build a ``system`` status message."""
        message_id = f"system_{datetime.now().timestamp()}"
        return cls(
            id=message_id,
            session_id=session_id,
            role="system",
            content=content,
            timestamp=cls._clock_label(),
            message_type=MessageType.SYSTEM.value,
        )
class ConversationWorker(QThread):
    """Worker thread that processes queued user messages.

    Messages are added with :meth:`queue_message`. :meth:`run` drives an
    asyncio loop that pops them one at a time, queries the MCP client (or a
    mock fallback when no client was supplied) and re-emits every agent reply
    through ``message_received``.
    """

    # Declared for consumers; no code in this class emits it.
    message_sent = Signal(str, str)  # session_id, message_id
    message_received = Signal(ConversationMessage)
    agent_typing = Signal(str, str)  # session_id, agent_name
    agent_stopped_typing = Signal(str, str)  # session_id, agent_name
    error_occurred = Signal(str, str)  # session_id, error_message

    def __init__(self, session_service: SessionService, mcp_client=None):
        """Store collaborators and start with an empty queue.

        Args:
            session_service: Kept on the instance; not used by this class's
                own methods.
            mcp_client: Object exposing ``query_agents_sync``; when falsy the
                worker answers with mock responses instead.
        """
        super().__init__()
        self.session_service = session_service
        self.mcp_client = mcp_client
        self.active_sessions: Dict[str, bool] = {}
        self.message_queue: List[Dict[str, Any]] = []
        self._running = False

    def run(self):
        """Thread entry point: run the processing loop on a private event loop."""
        self._running = True
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._message_processing_loop())
        except Exception as e:
            logger.error(f"Conversation worker error: {e}")
        finally:
            loop.close()

    async def _message_processing_loop(self):
        """Pop and process queued messages until :meth:`stop_worker` is called.

        A failure while handling one message is reported through
        ``error_occurred`` and does not stop the loop.
        """
        while self._running:
            if self.message_queue:
                message_data = self.message_queue.pop(0)
                try:
                    await self._send_message_to_session(message_data)
                except Exception as e:
                    self.error_occurred.emit(
                        message_data.get("session_id", ""),
                        f"Failed to send message: {str(e)}",
                    )
            await asyncio.sleep(0.1)  # Small delay to prevent busy waiting

    async def _send_message_to_session(self, message_data: dict):
        """Ask the agents to answer one queued message and emit their replies.

        Args:
            message_data: Dict with ``session_id``, ``message`` and ``agents``
                keys, as built by :meth:`queue_message`.

        Raises:
            Exception: Anything raised outside the MCP query itself is logged
                and re-raised. MCP query failures are not raised; they are
                emitted as a system ``ConversationMessage``.
        """
        try:
            session_id = message_data.get("session_id", "")
            message = message_data.get("message", "")
            target_agents = message_data.get("agents", [])
            logger.info(
                f"Querying agents via MCP for session {session_id}",
                extra={
                    "extra": {
                        "session_id": session_id,
                        "target_agents": target_agents,
                        "message_length": len(message),
                    }
                },
            )

            if not self.mcp_client:
                # Fallback to mock responses if MCP client not available
                logger.warning(
                    "MCP client not available, using fallback mock responses"
                )
                await self._send_mock_responses(session_id, message)
                return

            try:
                # An empty agent list is passed as None.
                result = self.mcp_client.query_agents_sync(
                    session_id=session_id,
                    message=message,
                    target_agents=target_agents if target_agents else None,
                )
                agent_responses = result.get("responses", {})
                if agent_responses:
                    logger.info(
                        f"Received {len(agent_responses)} agent responses",
                        extra={
                            "extra": {
                                "session_id": session_id,
                                "response_count": len(agent_responses),
                            }
                        },
                    )
                    for agent_name, response_content in agent_responses.items():
                        response_message = ConversationMessage(
                            id=f"{agent_name.lower()}_{datetime.now().timestamp()}",
                            session_id=session_id,
                            role="assistant",
                            content=str(response_content),
                            timestamp=datetime.now().strftime("%H:%M:%S"),
                            message_type=MessageType.AGENT_RESPONSE.value,
                            agent_name=agent_name,
                        )
                        self.message_received.emit(response_message)
                        # Small delay between responses for natural flow
                        await asyncio.sleep(0.2)
                else:
                    logger.warning(
                        f"No agent responses received for session {session_id}"
                    )
            except Exception as mcp_error:
                logger.error(
                    f"MCP agent query failed: {str(mcp_error)}",
                    extra={
                        "extra": {"session_id": session_id, "error": str(mcp_error)}
                    },
                )
                # Surface the failure in the transcript as a system message.
                error_message = ConversationMessage(
                    id=f"error_{datetime.now().timestamp()}",
                    session_id=session_id,
                    role="system",
                    content=f"Failed to get agent responses: {str(mcp_error)}",
                    timestamp=datetime.now().strftime("%H:%M:%S"),
                    message_type=MessageType.SYSTEM.value,
                )
                self.message_received.emit(error_message)
        except Exception as e:
            logger.error(f"Error in _send_message_to_session: {e}")
            raise

    async def _send_mock_responses(self, session_id: str, message: str):
        """Emit two canned agent replies, used when no MCP client is set.

        Args:
            session_id: Session the mock replies belong to.
            message: The user's text; its first 100 characters are quoted in
                the first reply.
        """
        agent_responses = [
            {
                "agent": "ScrumMaster",
                "response": f"Thank you for the input! I'll facilitate our discussion on: '{message[:100]}...' Let's ensure all team members can contribute.",
                "delay": 1.0,
            },
            {
                "agent": "ProductOwner",
                "response": "From a product perspective, I need to prioritize the features you mentioned. Here's my initial assessment of the requirements.",
                "delay": 2.0,
            },
        ]
        for agent_data in agent_responses:
            await asyncio.sleep(agent_data["delay"])
            response_message = ConversationMessage(
                id=f"{agent_data['agent'].lower()}_{datetime.now().timestamp()}",
                session_id=session_id,
                role="assistant",
                content=agent_data["response"],
                timestamp=datetime.now().strftime("%H:%M:%S"),
                message_type=MessageType.AGENT_RESPONSE.value,
                agent_name=agent_data["agent"],
            )
            self.message_received.emit(response_message)

    def send_message_to_session(self, message_data: dict):
        """Schedule a mock typing indicator and a mock reply for a session.

        Real session-service communication is not implemented yet; this only
        simulates it with ``QTimer.singleShot`` callbacks.

        Args:
            message_data: Dict that must contain ``session_id``.

        Raises:
            KeyError: If ``message_data`` has no ``session_id`` key.
        """
        session_id = message_data["session_id"]
        agent_name = "Agent 1"

        # Use the signals this class actually declares; both carry
        # (session_id, agent_name).
        QTimer.singleShot(
            500, lambda: self.agent_typing.emit(session_id, agent_name)
        )
        QTimer.singleShot(
            2000, lambda: self.agent_stopped_typing.emit(session_id, agent_name)
        )

        # TODO: Implement real session service communication
        # message_received is typed Signal(ConversationMessage), so the mock
        # reply must be a fully populated message object, not a dict.
        mock_response = ConversationMessage.create_agent_response(
            session_id,
            "Assistant",
            "This is a mock response. Real session integration coming soon.",
        )
        # Simulate processing delay
        QTimer.singleShot(1500, lambda: self.message_received.emit(mock_response))

    def queue_message(
        self, session_id: str, message: str, agents: Optional[List[str]] = None
    ):
        """Append a message to the processing queue.

        Args:
            session_id: Session the message belongs to.
            message: User text to send to the agents.
            agents: Agents to address; ``None`` is stored as an empty list.
        """
        self.message_queue.append(
            {"session_id": session_id, "message": message, "agents": agents or []}
        )

    def stop_worker(self):
        """Ask the processing loop to exit on its next iteration."""
        self._running = False
class ConversationService(QObject):
    """Keeps per-session conversation history and routes messages to agents.

    User messages are recorded locally, announced through Qt signals and
    handed to a ``ConversationWorker`` thread; replies coming back from the
    worker are appended to the same history. Additional helpers export a
    conversation and exchange history with the MCP client.
    """

    # Signals
    message_added = Signal(ConversationMessage)
    message_sent = Signal(str, str)  # session_id, message_id
    agent_typing_started = Signal(str, str)  # session_id, agent_name
    agent_typing_stopped = Signal(str, str)  # session_id, agent_name
    conversation_cleared = Signal(str)  # session_id
    conversation_exported = Signal(str, str)  # session_id, file_path
    error_occurred = Signal(str, str)  # session_id, error_message

    def __init__(self, session_service: SessionService):
        """Create the MCP client and start the worker thread.

        Args:
            session_service: Stored on the instance and passed to the worker.
        """
        super().__init__()
        self.session_service = session_service
        self.conversations: Dict[str, List[ConversationMessage]] = {}
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
        # Initialize MCP client for backend integration
        self.mcp_client = MCPConversationClient()
        # Initialize worker thread
        self.worker = ConversationWorker(session_service, self.mcp_client)
        self._connect_worker_signals()
        self.worker.start()

    def _connect_worker_signals(self):
        """Forward the worker's signals to this service's signals/handlers."""
        self.worker.message_sent.connect(self.message_sent.emit)
        self.worker.message_received.connect(self._on_message_received)
        self.worker.agent_typing.connect(self.agent_typing_started.emit)
        self.worker.agent_stopped_typing.connect(self.agent_typing_stopped.emit)
        self.worker.error_occurred.connect(self.error_occurred.emit)

    def _on_message_received(self, message: ConversationMessage):
        """Record a message delivered by the worker and announce it."""
        self.add_message_to_conversation(message)
        self.message_added.emit(message)

    def start_conversation(self, session_id: str, session_config: Dict[str, Any]):
        """Register a session, reset its history and post a welcome message.

        Args:
            session_id: Identifier of the session.
            session_config: Session settings; ``name`` and ``agents`` are read
                for the welcome text.
        """
        self.active_sessions[session_id] = session_config
        self.conversations[session_id] = []
        # Add welcome message
        session_name = session_config.get("name", "Unknown")
        agents_list = ", ".join(session_config.get("agents", []))
        content = f"Session '{session_name}' started with agents: {agents_list}"
        welcome_message = ConversationMessage(
            id=f"welcome_{session_id}",
            session_id=session_id,
            role="system",
            content=content,
            timestamp=datetime.now().strftime("%H:%M:%S"),
            # Without this the dataclass default ("user") would apply and
            # type-filtered history would misclassify the message.
            message_type=MessageType.SYSTEM.value,
        )
        self.add_message_to_conversation(welcome_message)
        self.message_added.emit(welcome_message)
        logger.info(f"Started conversation for session {session_id}")

    def send_message(self, session_id: str, message: str) -> str:
        """Record a user message and queue it for all of the session's agents.

        Args:
            session_id: Must be an active session.
            message: The user's text.

        Returns:
            The id of the created user message.

        Raises:
            ValueError: If ``session_id`` is not an active session.
        """
        if session_id not in self.active_sessions:
            raise ValueError(f"No active session {session_id}")
        # Same factory send_targeted_message uses; yields a "user_<ts>" id.
        user_message = ConversationMessage.create_user_message(
            session_id=session_id, content=message
        )
        message_id = user_message.id
        # Add to conversation immediately
        self.add_message_to_conversation(user_message)
        self.message_added.emit(user_message)
        # Queue for processing by agents
        session_config = self.active_sessions[session_id]
        agents = session_config.get("agents", [])
        self.worker.queue_message(session_id, message, agents)
        self.message_sent.emit(session_id, message_id)
        logger.info(f"Sent message to session {session_id}: {message[:50]}...")
        return message_id

    def send_targeted_message(
        self, session_id: str, message: str, target_agents: List[str]
    ) -> str:
        """Record a user message and queue it for specific agents only.

        Args:
            session_id: Must be an active session.
            message: The user's text.
            target_agents: Agents to address; each must be listed under the
                session config's ``agents``.

        Returns:
            The id of the created user message.

        Raises:
            ValueError: If the session is not active or any target agent is
                not part of the session.
        """
        if session_id not in self.active_sessions:
            raise ValueError(f"No active session {session_id}")
        # Validate target agents exist in session
        session_config = self.active_sessions[session_id]
        available_agents = session_config.get("agents", [])
        invalid_agents = [
            agent for agent in target_agents if agent not in available_agents
        ]
        if invalid_agents:
            raise ValueError(f"Invalid agents: {', '.join(invalid_agents)}")
        # Create targeted user message
        user_message = ConversationMessage.create_user_message(
            session_id=session_id, content=message, target_agents=target_agents
        )
        # Add to conversation immediately
        self.add_message_to_conversation(user_message)
        self.message_added.emit(user_message)
        # Queue for processing by specific agents
        self.worker.queue_message(session_id, message, target_agents)
        self.message_sent.emit(session_id, user_message.id)
        agent_list = ", ".join(target_agents)
        logger.info(
            f"Sent targeted message to {agent_list} in session {session_id}: "
            f"{message[:50]}..."
        )
        return user_message.id

    def add_agent_to_agent_message(
        self, session_id: str, source_agent: str, target_agent: str, content: str
    ):
        """Record and announce a message from one agent to another."""
        message = ConversationMessage.create_agent_to_agent_message(
            session_id=session_id,
            source_agent=source_agent,
            target_agent=target_agent,
            content=content,
        )
        self.add_message_to_conversation(message)
        self.message_added.emit(message)

    # Factory methods for creating messages (they do not record the message)
    def create_user_message(
        self, session_id: str, content: str, target_agents: Optional[List[str]] = None
    ) -> ConversationMessage:
        """Create (but do not record) a user message."""
        return ConversationMessage.create_user_message(
            session_id, content, target_agents
        )

    def create_targeted_user_message(
        self, session_id: str, content: str, target_agents: List[str]
    ) -> ConversationMessage:
        """Create (but do not record) a user message with required targets."""
        return ConversationMessage.create_user_message(
            session_id, content, target_agents
        )

    def create_agent_response(
        self, session_id: str, content: str, agent_name: str
    ) -> ConversationMessage:
        """Create (but do not record) an agent response message.

        Note the argument order here is ``content`` then ``agent_name``; the
        underlying factory takes them the other way round.
        """
        return ConversationMessage.create_agent_response(
            session_id, agent_name, content
        )

    def create_agent_thinking(
        self,
        session_id: str,
        content: str,
        agent_name: str,
        reasoning_context: Optional[str] = None,
    ) -> ConversationMessage:
        """Create (but do not record) an agent thinking message.

        ``reasoning_context`` defaults to ``content`` (set by the factory) and
        is only overridden when a truthy value is given.
        """
        message = ConversationMessage.create_agent_thinking_message(
            session_id, agent_name, content
        )
        if reasoning_context:
            message.reasoning_context = reasoning_context
        return message

    def create_agent_coordination(
        self,
        session_id: str,
        content: str,
        participating_agents: Optional[List[str]] = None,
    ) -> ConversationMessage:
        """Create (but do not record) an agent coordination message.

        When ``participating_agents`` is non-empty it is stored under the
        ``participating_agents`` key of the message metadata.
        """
        message = ConversationMessage.create_agent_coordination_message(
            session_id, content
        )
        if participating_agents:
            message.metadata = {"participating_agents": participating_agents}
        return message

    def get_conversation_history(self, session_id: str) -> List[ConversationMessage]:
        """Alias of :meth:`get_conversation`."""
        return self.get_conversation(session_id)

    def add_message(self, message: ConversationMessage):
        """Record a message without emitting ``message_added``."""
        self.add_message_to_conversation(message)

    def add_agent_thinking_message(
        self, session_id: str, agent_name: str, reasoning: str
    ):
        """Record and announce an agent thinking/reasoning message."""
        message = ConversationMessage.create_agent_thinking_message(
            session_id=session_id, agent_name=agent_name, reasoning=reasoning
        )
        self.add_message_to_conversation(message)
        self.message_added.emit(message)
        logger.info(f"Agent thinking: {agent_name}")

    def add_agent_coordination_message(
        self, session_id: str, content: str, involved_agents: List[str]
    ):
        """Record and announce a coordination message for ``involved_agents``.

        The message is stored with role ``system`` and the involved agents as
        its ``target_agents``.
        """
        message = ConversationMessage(
            id=f"coord_{datetime.now().timestamp()}",
            session_id=session_id,
            role="system",
            content=content,
            timestamp=datetime.now().strftime("%H:%M:%S"),
            message_type=MessageType.AGENT_COORDINATION.value,
            target_agents=involved_agents,
        )
        self.add_message_to_conversation(message)
        self.message_added.emit(message)
        logger.info("Agent coordination message added")

    def get_conversation_by_message_types(
        self, session_id: str, message_types: List[str]
    ) -> List[ConversationMessage]:
        """Return the session's messages whose ``message_type`` is listed."""
        conversation = self.get_conversation(session_id)
        return [msg for msg in conversation if msg.message_type in message_types]

    def add_message_to_conversation(self, message: ConversationMessage):
        """Append a message to its session's history, creating it if needed."""
        if message.session_id not in self.conversations:
            self.conversations[message.session_id] = []
        self.conversations[message.session_id].append(message)

    def get_conversation(self, session_id: str) -> List[ConversationMessage]:
        """Return a shallow copy of the session's history (empty if unknown)."""
        return self.conversations.get(session_id, []).copy()

    def clear_conversation(self, session_id: str):
        """Empty a known session's history and emit ``conversation_cleared``.

        Unknown session ids are ignored.
        """
        if session_id in self.conversations:
            self.conversations[session_id] = []
            self.conversation_cleared.emit(session_id)
            logger.info(f"Cleared conversation for session {session_id}")

    def end_conversation(self, session_id: str):
        """Post a goodbye message and deactivate the session.

        The recorded history is kept; only the ``active_sessions`` entry is
        removed. Inactive session ids are ignored.
        """
        if session_id in self.active_sessions:
            # Add goodbye message
            session_name = self.active_sessions[session_id].get("name", "Unknown")
            content = f"Session '{session_name}' ended."
            goodbye_message = ConversationMessage(
                id=f"goodbye_{session_id}",
                session_id=session_id,
                role="system",
                content=content,
                timestamp=datetime.now().strftime("%H:%M:%S"),
                # Explicit so the dataclass default ("user") does not apply.
                message_type=MessageType.SYSTEM.value,
            )
            self.add_message_to_conversation(goodbye_message)
            self.message_added.emit(goodbye_message)
            # Clean up session
            del self.active_sessions[session_id]
            logger.info(f"Ended conversation for session {session_id}")

    def export_conversation(
        self, session_id: str, file_path: str, format: str = "json"
    ) -> bool:
        """Write a session's history to ``file_path``.

        Args:
            session_id: Session to export.
            file_path: Destination file.
            format: ``"json"`` or ``"markdown"`` (case-insensitive).

        Returns:
            True on success. False on any failure (including an empty
            conversation or unsupported format), in which case
            ``error_occurred`` is emitted instead of raising.
        """
        try:
            conversation = self.get_conversation(session_id)
            if not conversation:
                raise ValueError("No conversation to export")
            if format.lower() == "json":
                self._export_to_json(conversation, file_path)
            elif format.lower() == "markdown":
                self._export_to_markdown(conversation, file_path)
            else:
                raise ValueError(f"Unsupported export format: {format}")
            self.conversation_exported.emit(session_id, file_path)
            logger.info(f"Exported conversation {session_id} to {file_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to export conversation {session_id}: {e}")
            self.error_occurred.emit(session_id, f"Export failed: {str(e)}")
            return False

    def _export_to_json(self, conversation: List[ConversationMessage], file_path: str):
        """Write the messages plus export time and count as UTF-8 JSON."""
        export_data = {
            "exported_at": datetime.now().isoformat(),
            "message_count": len(conversation),
            "messages": [msg.to_dict() for msg in conversation],
        }
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(export_data, f, indent=2, ensure_ascii=False)

    def _export_to_markdown(
        self, conversation: List[ConversationMessage], file_path: str
    ):
        """Write the messages as a Markdown document, one section each."""
        with open(file_path, "w", encoding="utf-8") as f:
            f.write("# Conversation Export\n\n")
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            f.write(f"**Exported:** {timestamp}\n")
            f.write(f"**Messages:** {len(conversation)}\n\n")
            f.write("---\n\n")
            for msg in conversation:
                role_display = msg.role.title()
                if msg.agent_name:
                    role_display = f"{msg.agent_name} ({role_display})"
                f.write(f"## {role_display} - {msg.timestamp}\n\n")
                f.write(f"{msg.content}\n\n")
                f.write("---\n\n")

    def get_active_sessions(self) -> Dict[str, Dict[str, Any]]:
        """Return a shallow copy of the active-session mapping."""
        return self.active_sessions.copy()

    def is_session_active(self, session_id: str) -> bool:
        """Return True when ``session_id`` has an active conversation."""
        return session_id in self.active_sessions

    def cleanup(self):
        """Stop the worker thread and close the MCP client.

        Failures while closing the client are logged as warnings, not raised.
        """
        if hasattr(self, "worker") and self.worker.isRunning():
            self.worker.stop_worker()
            self.worker.wait(3000)  # Wait up to 3 seconds for thread to finish
        # Clean up MCP client
        try:
            # get_event_loop() raises RuntimeError on newer Python versions
            # when the thread has no current loop; fall back to a private
            # loop so the client still gets closed.
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = None
            owns_loop = loop is None or loop.is_closed()
            if owns_loop:
                loop = asyncio.new_event_loop()
            try:
                loop.run_until_complete(self.mcp_client.close())
            finally:
                # Only close a loop created here; a pre-existing loop may
                # still be in use elsewhere.
                if owns_loop:
                    loop.close()
        except Exception as e:
            logger.warning(f"Failed to close MCP client: {e}")
        logger.info("ConversationService cleaned up")

    # MCP Integration Methods
    def send_message_via_mcp(
        self,
        session_id: str,
        content: str,
        target_agents: Optional[List[str]] = None,
        message_type: str = "user",
    ) -> bool:
        """Send one message through the MCP client.

        Returns:
            True when the client call returned; False when it raised, in
            which case ``error_occurred`` is emitted.
        """
        try:
            result = self.mcp_client.send_message_sync(
                session_id, content, target_agents, message_type
            )
            logger.info(f"Message sent via MCP: {result}")
            return True
        except Exception as e:
            logger.error(f"Failed to send message via MCP: {e}")
            self.error_occurred.emit(session_id, f"MCP communication failed: {str(e)}")
            return False

    def load_conversation_from_mcp(self, session_id: str) -> bool:
        """Replace the local history with the one held by the MCP client.

        The new history is built completely before the old one is discarded,
        so a failed fetch or malformed payload leaves local history intact.
        ``message_added`` is emitted once per loaded message.

        Returns:
            True on success; False on failure (``error_occurred`` is emitted).
        """
        try:
            messages = self.mcp_client.get_conversation_history_sync(session_id)
            # Convert MCP messages to ConversationMessage objects
            loaded: List[ConversationMessage] = []
            for msg_data in messages:
                timestamp_default = datetime.now().strftime("%H:%M:%S")
                raw_metadata = msg_data.get("metadata")
                # These messages already exist on the server; flag them so
                # sync_with_mcp_server does not upload them again.
                metadata = dict(raw_metadata) if isinstance(raw_metadata, dict) else {}
                metadata["synced_to_mcp"] = True
                loaded.append(
                    ConversationMessage(
                        id=msg_data.get("id", f"mcp_{datetime.now().timestamp()}"),
                        session_id=session_id,
                        role=msg_data.get("role", "user"),
                        content=msg_data.get("content", ""),
                        timestamp=msg_data.get("timestamp", timestamp_default),
                        message_type=msg_data.get("message_type", "user"),
                        agent_name=msg_data.get("agent_name"),
                        target_agents=msg_data.get("target_agents"),
                        source_agent=msg_data.get("source_agent"),
                        reasoning_context=msg_data.get("reasoning_context"),
                        metadata=metadata,
                    )
                )
            # Swap in the new history only after every message converted.
            self.conversations[session_id] = loaded
            for message in loaded:
                self.message_added.emit(message)
            logger.info(
                f"Loaded {len(messages)} messages from MCP for session {session_id}"
            )
            return True
        except Exception as e:
            logger.error(f"Failed to load conversation from MCP: {e}")
            self.error_occurred.emit(
                session_id, f"Failed to load conversation: {str(e)}"
            )
            return False

    def get_available_agents_from_mcp(self, session_id: str) -> List[str]:
        """Return the session's agents from the MCP client, or [] on error."""
        try:
            agents = self.mcp_client.get_session_agents_sync(session_id)
            logger.info(f"Available agents for {session_id}: {agents}")
            return agents
        except Exception as e:
            logger.error(f"Failed to get agents from MCP: {e}")
            return []

    def sync_with_mcp_server(self, session_id: str):
        """Push unsynced local messages to MCP, then reload the history.

        A message is flagged ``synced_to_mcp`` only when its send succeeded.
        If any send failed, the reload is skipped: reloading replaces the
        local history and would drop the messages that never reached the
        server.
        """
        try:
            all_sent = True
            # Iterate a snapshot: failed sends emit error_occurred, and slots
            # may touch the history.
            for message in list(self.conversations.get(session_id, [])):
                if message.metadata and message.metadata.get("synced_to_mcp"):
                    continue
                sent = self.send_message_via_mcp(
                    session_id,
                    message.content,
                    message.target_agents,
                    message.message_type,
                )
                if not sent:
                    # Leave the message unflagged so a later sync retries it.
                    all_sent = False
                    continue
                if not message.metadata:
                    message.metadata = {}
                message.metadata["synced_to_mcp"] = True
            if not all_sent:
                logger.warning(
                    f"Skipped MCP reload for session {session_id}: "
                    "some messages could not be sent"
                )
                return
            # Load latest from MCP
            self.load_conversation_from_mcp(session_id)
        except Exception as e:
            logger.error(f"Failed to sync with MCP server: {e}")
            self.error_occurred.emit(session_id, f"Sync failed: {str(e)}")