Skip to main content
Glama
conversation_manager.py13.2 kB
#!/usr/bin/env python3 """ Conversation History Manager for OpenRouter MCP Server Handles conversation continuation with UUID tags to maintain chat history across multiple tool calls. """ import json import os import uuid from typing import Dict, List, Optional, Any from datetime import datetime import logging logger = logging.getLogger("conversation-manager") class ConversationManager: """Manages conversation history with UUID-based continuation""" def __init__(self, storage_dir: str = "/tmp/openrouter_conversations"): """Initialize conversation manager Args: storage_dir: Directory to store conversation files """ self.storage_dir = storage_dir self.ensure_storage_dir() # In-memory cache for active conversations (following best practices) self._conversation_cache = {} def ensure_storage_dir(self): """Ensure storage directory exists""" try: os.makedirs(self.storage_dir, exist_ok=True) logger.info(f"STORAGE: Conversation storage directory: {self.storage_dir}") # Test write access test_file = os.path.join(self.storage_dir, "test_write.tmp") with open(test_file, "w") as f: f.write("test") os.remove(test_file) logger.info(f"STORAGE: Directory is writable: {self.storage_dir}") except Exception as e: logger.error( f"STORAGE: Failed to create or write to storage directory {self.storage_dir}: {e}" ) raise def get_conversation_file(self, continuation_id: str) -> str: """Get file path for conversation Args: continuation_id: UUID of the conversation Returns: File path for the conversation """ return os.path.join(self.storage_dir, f"conversation_{continuation_id}.json") def create_conversation(self) -> str: """Create a new conversation Returns: New conversation UUID """ continuation_id = str(uuid.uuid4()) conversation_data = { "id": continuation_id, "created_at": datetime.utcnow().isoformat(), "messages": [], } file_path = self.get_conversation_file(continuation_id) logger.debug(f"STORAGE: Creating conversation file: {file_path}") try: with open(file_path, "w", encoding="utf-8") as f: json.dump(conversation_data, f, indent=2) # Add to cache self._conversation_cache[continuation_id] = conversation_data logger.info(f"STORAGE: Created new conversation: {continuation_id}") logger.debug( f"STORAGE: Conversation file created successfully at: {file_path}" ) return continuation_id except Exception as e: logger.error(f"STORAGE: Error creating conversation {continuation_id}: {e}") raise def load_conversation(self, continuation_id: str) -> Optional[Dict[str, Any]]: """Load conversation by ID with in-memory caching Args: continuation_id: UUID of the conversation Returns: Conversation data or None if not found """ # Check in-memory cache first (best practice) if continuation_id in self._conversation_cache: logger.debug(f"STORAGE: Loading conversation {continuation_id} from cache") cached_data = self._conversation_cache[continuation_id] logger.debug( f"STORAGE: Cached conversation has {len(cached_data.get('messages', []))} messages" ) return cached_data file_path = self.get_conversation_file(continuation_id) logger.debug(f"STORAGE: Attempting to load conversation from file: {file_path}") if not os.path.exists(file_path): logger.warning(f"STORAGE: Conversation file not found: {file_path}") return None try: with open(file_path, "r", encoding="utf-8") as f: conversation_data = json.load(f) # Cache the loaded conversation self._conversation_cache[continuation_id] = conversation_data logger.debug( f"STORAGE: Loaded conversation {continuation_id} with {len(conversation_data.get('messages', []))} messages from file" ) return conversation_data except Exception as e: logger.error(f"STORAGE: Error loading conversation {continuation_id}: {e}") return None def save_conversation(self, conversation_data: Dict[str, Any]): """Save conversation data with cache update Args: conversation_data: Conversation data to save """ continuation_id = conversation_data.get("id") if not continuation_id: logger.error("Cannot save conversation without ID") return file_path = self.get_conversation_file(continuation_id) conversation_data["updated_at"] = datetime.utcnow().isoformat() try: # Update cache first (best practice for performance) self._conversation_cache[continuation_id] = conversation_data # Then persist to disk with open(file_path, "w", encoding="utf-8") as f: json.dump(conversation_data, f, indent=2) logger.debug(f"Saved conversation {continuation_id}") except Exception as e: logger.error(f"Error saving conversation {continuation_id}: {e}") # Remove from cache if save failed if continuation_id in self._conversation_cache: del self._conversation_cache[continuation_id] def add_message( self, continuation_id: str, role: str, content: str, metadata: Optional[Dict[str, Any]] = None, ) -> bool: """Add a message to the conversation Args: continuation_id: UUID of the conversation role: Message role (user, assistant) content: Message content metadata: Optional metadata (model, files, etc.) Returns: True if successful, False otherwise """ logger.debug( f"STORAGE: Adding {role} message to conversation {continuation_id}" ) conversation_data = self.load_conversation(continuation_id) if not conversation_data: logger.error( f"STORAGE: Cannot add message to non-existent conversation: {continuation_id}" ) return False message = { "role": role, "content": content, "timestamp": datetime.utcnow().isoformat(), } if metadata: message["metadata"] = metadata conversation_data["messages"].append(message) logger.debug( f"STORAGE: Message added to conversation data. Total messages: {len(conversation_data['messages'])}" ) # Save conversation self.save_conversation(conversation_data) # Verify the message was actually saved verification_data = self.load_conversation(continuation_id) if verification_data and len(verification_data["messages"]) == len( conversation_data["messages"] ): logger.info( f"STORAGE: Successfully added {role} message to conversation {continuation_id}. Total messages: {len(verification_data['messages'])}" ) return True else: logger.error( f"STORAGE: Failed to verify message save for conversation {continuation_id}" ) return False def get_conversation_history( self, continuation_id: str, max_tokens: Optional[int] = None ) -> List[Dict[str, str]]: """Get conversation history in OpenAI format with optional token optimization Args: continuation_id: UUID of the conversation max_tokens: Maximum tokens to include (follows Zen MCP pattern) Returns: List of messages in OpenAI format """ conversation_data = self.load_conversation(continuation_id) if not conversation_data: return [] messages = conversation_data.get("messages", []) # Convert to OpenAI format openai_messages = [] for msg in messages: openai_messages.append({"role": msg["role"], "content": msg["content"]}) # Token optimization (following Zen MCP pattern) if max_tokens and len(openai_messages) > 0: # Rough token estimation (4 chars = 1 token) total_chars = sum(len(msg["content"]) for msg in openai_messages) estimated_tokens = total_chars // 4 if estimated_tokens > max_tokens: # Keep most recent messages within token limit target_chars = max_tokens * 4 optimized_messages = [] current_chars = 0 # Start from most recent and work backwards for msg in reversed(openai_messages): msg_chars = len(msg["content"]) if current_chars + msg_chars <= target_chars: optimized_messages.insert(0, msg) current_chars += msg_chars else: break logger.debug( f"Optimized conversation {continuation_id}: {len(openai_messages)} -> {len(optimized_messages)} messages" ) openai_messages = optimized_messages logger.debug( f"Retrieved {len(openai_messages)} messages for conversation {continuation_id}" ) return openai_messages def get_conversation_summary(self, continuation_id: str) -> Dict[str, Any]: """Get conversation summary Args: continuation_id: UUID of the conversation Returns: Conversation summary """ conversation_data = self.load_conversation(continuation_id) if not conversation_data: return {} messages = conversation_data.get("messages", []) return { "id": continuation_id, "created_at": conversation_data.get("created_at"), "updated_at": conversation_data.get("updated_at"), "message_count": len(messages), "last_message": messages[-1] if messages else None, } def list_conversations(self) -> List[Dict[str, Any]]: """List all conversations Returns: List of conversation summaries """ conversations = [] try: for filename in os.listdir(self.storage_dir): if filename.startswith("conversation_") and filename.endswith(".json"): continuation_id = filename[ 13:-5 ] # Remove "conversation_" and ".json" summary = self.get_conversation_summary(continuation_id) if summary: conversations.append(summary) except Exception as e: logger.error(f"Error listing conversations: {e}") # Sort by updated_at (most recent first) conversations.sort(key=lambda x: x.get("updated_at", ""), reverse=True) return conversations def delete_conversation(self, continuation_id: str) -> bool: """Delete a conversation Args: continuation_id: UUID of the conversation to delete Returns: True if successful, False otherwise """ file_path = self.get_conversation_file(continuation_id) try: if os.path.exists(file_path): os.remove(file_path) logger.info(f"Deleted conversation: {continuation_id}") return True else: logger.warning( f"Conversation not found for deletion: {continuation_id}" ) return False except Exception as e: logger.error(f"Error deleting conversation {continuation_id}: {e}") return False def cleanup_old_conversations(self, max_age_days: int = 30): """Clean up old conversations Args: max_age_days: Maximum age in days to keep conversations """ from datetime import timedelta cutoff_date = datetime.utcnow() - timedelta(days=max_age_days) cutoff_iso = cutoff_date.isoformat() deleted_count = 0 conversations = self.list_conversations() for conv in conversations: updated_at = conv.get("updated_at", "") if updated_at < cutoff_iso: if self.delete_conversation(conv["id"]): deleted_count += 1 logger.info( f"Cleaned up {deleted_count} old conversations (older than {max_age_days} days)" ) return deleted_count

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/slyfox1186/claude-code-openrouter'

If you have feedback or need assistance with the MCP directory API, please join our Discord server