mem0 Memory System

  • examples
#!/usr/bin/env python3
"""
✨ mem0 MCP Server Demo Application ✨
Made with ❤️ by Pink Pixel

This is a simple command-line chat application that demonstrates how to use
the mem0 MCP server for memory storage and retrieval in a conversational context.

Commands understood at the prompt:
    exit                  quit the application
    remember <something>  explicitly store a memory
    recall <query>        search stored memories
Anything else is treated as conversation: the server is searched for related
memories and, when none are found, the input itself is stored.

The server address defaults to http://0.0.0.0:8000 and can be overridden with
the MEM0_MCP_URL environment variable.
"""

import json
import os
import sys
from datetime import datetime
from typing import Any, Dict, List, Optional

import requests

# Add parent directory to path to import modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))


# Colors for terminal output
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    END = '\033[0m'  # resets all attributes


# Base URL for the mem0 MCP server. An unset or empty MEM0_MCP_URL falls back
# to the original default; a trailing slash is dropped so paths join cleanly.
MEM0_MCP_URL = (os.environ.get("MEM0_MCP_URL") or "http://0.0.0.0:8000").rstrip("/")

# (connect, read) timeout in seconds handed to every HTTP call. Without a
# timeout, requests waits forever on an unresponsive server. The read value is
# deliberately generous; presumably an LLM-backed provider can be slow to
# answer -- tune as needed.
REQUEST_TIMEOUT = (5, 120)

# Errors treated as "the server call failed": transport/HTTP errors from
# requests, plus ValueError for a response body that is not valid JSON.
_REQUEST_ERRORS = (requests.RequestException, ValueError)


class MemoChatApp:
    """A simple chat application that demonstrates mem0 MCP server integration."""

    def __init__(self):
        """Initialize the chat application and configure the memory provider.

        Exits the process with status 1 if the server cannot be configured
        (see ``configure_memory``).
        """
        # Chronological list of {"role": ..., "content": ...} dicts.
        self.conversation_history = []
        self.configure_memory()

    def _post(self, path: str, payload: Dict[str, Any]) -> requests.Response:
        """POST ``payload`` as JSON to ``path`` on the server.

        Args:
            path: URL path starting with ``/``, appended to ``MEM0_MCP_URL``.
            payload: JSON-serializable request body.

        Returns:
            The response object, guaranteed to carry a non-error HTTP status.

        Raises:
            requests.RequestException: on connection failure, timeout, or an
                HTTP error status.
        """
        response = requests.post(
            f"{MEM0_MCP_URL}{path}", json=payload, timeout=REQUEST_TIMEOUT
        )
        response.raise_for_status()
        return response

    @staticmethod
    def _memory_text(memory: Any) -> str:
        """Return the displayable text of one search result entry."""
        if isinstance(memory, dict):
            return str(memory.get('content', ''))
        # Unexpected entry shape: show it verbatim rather than crash.
        return str(memory)

    def configure_memory(self):
        """Configure the memory provider.

        Sends the provider settings to the server's ``/configure`` endpoint.
        On failure an error is printed and the process exits with status 1.
        """
        print(f"{Colors.BLUE}Configuring memory provider...{Colors.END}")

        config_data = {
            "provider": "ollama",
            "embedding_provider": "ollama",
            "model": "llama3",
            "embedding_model": "nomic-embed-text",
            "data_dir": "./memory_data",
        }

        try:
            self._post("/configure", config_data)
        except requests.RequestException as e:
            print(f"{Colors.RED}Error configuring memory provider: {str(e)}{Colors.END}")
            sys.exit(1)
        print(f"{Colors.GREEN}Memory provider configured successfully!{Colors.END}")

    def add_memory(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """Add a memory to the mem0 MCP server.

        Args:
            content: Text to store.
            metadata: Optional extra metadata. It is copied, never mutated; a
                ``timestamp`` entry (local time, ISO 8601) is added to the copy.

        Returns:
            The memory ID reported by the server as a string, or ``""`` if the
            request failed or the reply carried no ``memory_id``.
        """
        # Copy so that adding the timestamp does not modify the caller's dict.
        payload_metadata = dict(metadata) if metadata else {}
        payload_metadata["timestamp"] = datetime.now().isoformat()

        memory_data = {"content": content, "metadata": payload_metadata}

        try:
            data = self._post("/memory/add", memory_data).json()
        except _REQUEST_ERRORS as e:
            print(f"{Colors.RED}Error storing memory: {str(e)}{Colors.END}")
            return ""

        memory_id = data.get("memory_id") if isinstance(data, dict) else None
        if memory_id is None:
            # Keep the "-> str" contract instead of leaking None to callers.
            print(
                f"{Colors.RED}Error storing memory: "
                f"server response did not include a memory_id{Colors.END}"
            )
            return ""

        print(f"{Colors.GREEN}Memory stored with ID: {memory_id}{Colors.END}")
        return str(memory_id)

    def search_memories(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search for memories in the mem0 MCP server.

        Args:
            query: Search text.
            limit: Maximum number of results requested from the server.

        Returns:
            The ``results`` list from the server reply, or ``[]`` if the
            request failed or the reply had no usable ``results``.
        """
        search_data = {"query": query, "limit": limit}

        try:
            data = self._post("/memory/search", search_data).json()
        except _REQUEST_ERRORS as e:
            print(f"{Colors.RED}Error searching memories: {str(e)}{Colors.END}")
            return []

        if not isinstance(data, dict):
            return []
        return data.get("results") or []

    def _handle_remember(self, content: str) -> str:
        """Store ``content`` as an explicit memory and report the outcome."""
        if not content:
            return "Please tell me what to remember, e.g. 'remember my favorite color is blue'."
        # add_memory returns "" on failure; only claim success when it stored.
        if self.add_memory(content, {"type": "explicit"}):
            return f"I'll remember that: {content}"
        return f"Sorry, I wasn't able to store that memory: {content}"

    def _handle_recall(self, query: str) -> str:
        """Search memories for ``query`` and format them as a numbered list."""
        if not query:
            return "Please tell me what to recall, e.g. 'recall favorite color'."

        memories = self.search_memories(query)
        if not memories:
            return f"I don't have any memories related to '{query}'."

        lines = "".join(
            f"{position}. {self._memory_text(memory)}\n"
            for position, memory in enumerate(memories, start=1)
        )
        return f"Here's what I remember about '{query}':\n\n{lines}"

    def _handle_chat(self, text: str) -> str:
        """Answer free-form input from stored memories, storing it if none match."""
        # For a real application, you would call an LLM here
        # For this demo, we'll just echo the input with some context
        relevant_memories = self.search_memories(text)

        if relevant_memories:
            bullets = "".join(
                f"- {self._memory_text(memory)}\n" for memory in relevant_memories
            )
            memory_context = f"I found some relevant information in my memory:\n{bullets}"
            return f"{memory_context}\nBased on this, I would respond to '{text}' with more context."

        # No relevant memories found, store this as a new memory
        if self.add_memory(text, {"type": "conversation"}):
            return (
                "I don't have any relevant memories about that. "
                f"I've stored '{text}' for future reference."
            )
        return (
            "I don't have any relevant memories about that, "
            f"and I wasn't able to store '{text}' for future reference."
        )

    def process_user_input(self, user_input: str) -> str:
        """Process user input and generate a response.

        Args:
            user_input: The raw line typed by the user. Leading and trailing
                whitespace is ignored when interpreting it.

        Returns:
            The literal string ``"exit"`` when the user asked to quit,
            otherwise the assistant's reply. Blank input yields a short hint
            and is not recorded in the conversation history.
        """
        text = user_input.strip()
        if not text:
            # Nothing to search for or store; avoid pointless server calls.
            return "Please type a message, or 'exit' to quit."

        # Add user input to conversation history
        self.conversation_history.append({"role": "user", "content": user_input})

        if text.lower() == "exit":
            return "exit"

        # Split "<command> <argument>"; the command word is case-insensitive.
        command, _, argument = text.partition(" ")
        command = command.lower()
        argument = argument.strip()

        if command == "remember":
            response = self._handle_remember(argument)
        elif command == "recall":
            response = self._handle_recall(argument)
        else:
            response = self._handle_chat(text)

        # Add response to conversation history (for every kind of reply)
        self.conversation_history.append({"role": "assistant", "content": response})

        return response

    def run(self):
        """Run the chat application.

        Prints a banner, then reads lines from standard input until the user
        types ``exit`` or ends the session with Ctrl-D / Ctrl-C.
        """
        print(f"\n{Colors.BOLD}{Colors.BLUE}✨ mem0 Chat Demo ✨{Colors.END}")
        print(f"{Colors.BOLD}Made with ❤️ by Pink Pixel{Colors.END}\n")
        print(f"{Colors.YELLOW}Type 'exit' to quit{Colors.END}")
        print(f"{Colors.YELLOW}Type 'remember [something]' to explicitly store a memory{Colors.END}")
        print(f"{Colors.YELLOW}Type 'recall [query]' to search for memories{Colors.END}\n")

        farewell = f"\n{Colors.BOLD}{Colors.GREEN}Thanks for chatting! Goodbye!{Colors.END}"

        while True:
            try:
                user_input = input(f"{Colors.BOLD}You: {Colors.END}")
            except (EOFError, KeyboardInterrupt):
                # End of input or Ctrl-C at the prompt: leave quietly
                # instead of dumping a traceback.
                print(farewell)
                break

            response = self.process_user_input(user_input)

            if response == "exit":
                print(farewell)
                break

            print(f"{Colors.BOLD}{Colors.GREEN}Assistant: {Colors.END}{response}\n")


if __name__ == "__main__":
    app = MemoChatApp()
    app.run()