# mem0 Memory System
# - examples
#!/usr/bin/env python3
"""
✨ mem0 MCP Server Demo Application ✨
Made with ❤️ by Pink Pixel
This is a simple command-line chat application that demonstrates how to use
the mem0 MCP server for memory storage and retrieval in a conversational context.
"""
import sys
import json
import requests
import os
import sys
from datetime import datetime
from typing import Dict, Any, List, Optional
# Put the parent of this file's directory at the front of sys.path so that
# modules located there can be imported when this script is run directly.
# NOTE(review): no import visible in this file relies on this entry — confirm
# it is still needed.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Colors for terminal output
class Colors:
    """ANSI escape sequences used to style text printed to the terminal.

    Wrap text as ``f"{Colors.RED}message{Colors.END}"``. ``END`` resets all
    attributes so a style does not leak into subsequent output.
    """

    BLUE = '\033[94m'    # bright blue foreground
    GREEN = '\033[92m'   # bright green foreground
    YELLOW = '\033[93m'  # bright yellow foreground
    RED = '\033[91m'     # bright red foreground
    BOLD = '\033[1m'     # bold / increased intensity
    END = '\033[0m'      # reset every attribute
# Base URL for the mem0 MCP server (no trailing slash).
# Set the MEM0_MCP_URL environment variable to point at a different server.
# The default targets the loopback address: 0.0.0.0 is a wildcard meant for
# binding a listening socket and is not a valid connect target on every
# platform (connecting to it fails on Windows).
MEM0_MCP_URL = os.environ.get("MEM0_MCP_URL", "http://127.0.0.1:8000").rstrip("/")
class MemoChatApp:
    """A simple chat application that demonstrates mem0 MCP server integration.

    The app talks to the server at ``MEM0_MCP_URL`` over HTTP and supports
    three commands at the prompt:

    * ``exit`` - quit the application
    * ``remember <text>`` - explicitly store ``<text>`` as a memory
    * ``recall <query>`` - list stored memories matching ``<query>``

    Any other input is looked up in memory; if nothing relevant is found the
    input itself is stored for future reference.
    """

    # (connect, read) timeouts in seconds passed to every request. Without a
    # timeout `requests` waits forever on an unresponsive server. The read
    # timeout is generous because the server may be running a local model.
    REQUEST_TIMEOUT = (5, 120)

    def __init__(self):
        """Initialize the chat application and configure the memory provider."""
        self.conversation_history = []
        self.configure_memory()

    def _post(self, path: str, payload: Dict[str, Any]):
        """POST *payload* as JSON to *path* on the server and return the response.

        Raises:
            requests.RequestException: on connection failure, timeout, or a
                non-success HTTP status.
        """
        response = requests.post(
            f"{MEM0_MCP_URL}{path}",
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        return response

    def configure_memory(self):
        """Configure the memory provider.

        Sends the provider/model configuration to the server's ``/configure``
        endpoint. The process exits with status 1 if the request fails, since
        the app cannot do anything useful without a configured server.
        """
        print(f"{Colors.BLUE}Configuring memory provider...{Colors.END}")
        config_data = {
            "provider": "ollama",
            "embedding_provider": "ollama",
            "model": "llama3",
            "embedding_model": "nomic-embed-text",
            "data_dir": "./memory_data",
        }
        try:
            self._post("/configure", config_data)
        except requests.RequestException as e:
            print(f"{Colors.RED}Error configuring memory provider: {e}{Colors.END}")
            sys.exit(1)
        print(f"{Colors.GREEN}Memory provider configured successfully!{Colors.END}")

    def add_memory(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
        """Add a memory to the mem0 MCP server.

        Args:
            content: Text of the memory to store.
            metadata: Optional extra fields to store alongside the memory. A
                ``"timestamp"`` entry (local time, ISO 8601) is added to a
                copy; the caller's dict is left untouched.

        Returns:
            The ID of the stored memory, or ``""`` if storing failed.
        """
        # Copy before adding the timestamp so the caller's dict is not mutated.
        metadata = dict(metadata) if metadata else {}
        metadata["timestamp"] = datetime.now().isoformat()
        memory_data = {"content": content, "metadata": metadata}
        try:
            data = self._post("/memory/add", memory_data).json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f"{Colors.RED}Error storing memory: {e}{Colors.END}")
            return ""
        # NOTE(review): assumes a successful response always carries a
        # "memory_id" field - confirm against the server's API.
        memory_id = data.get("memory_id") if isinstance(data, dict) else None
        if not memory_id:
            print(
                f"{Colors.RED}Error storing memory: "
                f"server response did not include a memory ID{Colors.END}"
            )
            return ""
        print(f"{Colors.GREEN}Memory stored with ID: {memory_id}{Colors.END}")
        return str(memory_id)

    def search_memories(self, query: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Search for memories in the mem0 MCP server.

        Args:
            query: Text to search for.
            limit: Maximum number of results requested from the server.

        Returns:
            The list found under ``"results"`` in the server's response, or an
            empty list if the request fails or the response has no such list.
        """
        search_data = {"query": query, "limit": limit}
        try:
            data = self._post("/memory/search", search_data).json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            print(f"{Colors.RED}Error searching memories: {e}{Colors.END}")
            return []
        results = data.get("results") if isinstance(data, dict) else None
        return results if isinstance(results, list) else []

    def process_user_input(self, user_input: str) -> str:
        """Process user input and generate a response.

        Args:
            user_input: The raw line typed by the user.

        Returns:
            The assistant's reply, or the sentinel string ``"exit"`` when the
            user asked to quit. Non-blank input and every reply are appended
            to ``self.conversation_history``.
        """
        text = user_input.strip()
        if not text:
            # Nothing to search for or store; don't create an empty memory.
            return "Please type a message, or 'exit' to quit."

        # Add user input to conversation history
        self.conversation_history.append({"role": "user", "content": user_input})

        # Split off the first word so commands are matched case-insensitively
        # and regardless of surrounding whitespace.
        keyword, _, argument = text.partition(" ")
        keyword = keyword.lower()
        argument = argument.strip()

        if keyword == "exit" and not argument:
            return "exit"
        if keyword == "remember" and argument:
            response = self._handle_remember(argument)
        elif keyword == "recall" and argument:
            response = self._handle_recall(argument)
        else:
            response = self._handle_chat(text)

        # Add response to conversation history
        self.conversation_history.append({"role": "assistant", "content": response})
        return response

    def _handle_remember(self, content: str) -> str:
        """Store *content* as an explicit memory and report the outcome."""
        if not self.add_memory(content, {"type": "explicit"}):
            return f"Sorry, I couldn't store that memory: {content}"
        return f"I'll remember that: {content}"

    def _handle_recall(self, query: str) -> str:
        """Return a numbered list of memories matching *query*."""
        memories = self.search_memories(query)
        if not memories:
            return f"I don't have any memories related to '{query}'."
        numbered = "".join(
            f"{position}. {memory.get('content', '')}\n"
            for position, memory in enumerate(memories, start=1)
        )
        return f"Here's what I remember about '{query}':\n\n{numbered}"

    def _handle_chat(self, user_input: str) -> str:
        """Reply to free-form input, using stored memories as context.

        For a real application, you would call an LLM here. For this demo the
        reply just echoes the input together with any relevant memories; when
        none are found the input is stored as a new memory.
        """
        relevant_memories = self.search_memories(user_input)
        if relevant_memories:
            bullets = "".join(
                f"- {memory.get('content', '')}\n" for memory in relevant_memories
            )
            return (
                "I found some relevant information in my memory:\n"
                f"{bullets}\n"
                f"Based on this, I would respond to '{user_input}' with more context."
            )
        # No relevant memories found, store this as a new memory
        if self.add_memory(user_input, {"type": "conversation"}):
            return (
                "I don't have any relevant memories about that. "
                f"I've stored '{user_input}' for future reference."
            )
        return (
            "I don't have any relevant memories about that, "
            f"and I couldn't store '{user_input}' for future reference."
        )

    def run(self):
        """Run the chat application.

        Prints a banner, then reads lines from standard input until the user
        types ``exit`` or closes/interrupts the input (Ctrl-D / Ctrl-C).
        """
        print(f"\n{Colors.BOLD}{Colors.BLUE}✨ mem0 Chat Demo ✨{Colors.END}")
        print(f"{Colors.BOLD}Made with ❤️ by Pink Pixel{Colors.END}\n")
        print(f"{Colors.YELLOW}Type 'exit' to quit{Colors.END}")
        print(f"{Colors.YELLOW}Type 'remember [something]' to explicitly store a memory{Colors.END}")
        print(f"{Colors.YELLOW}Type 'recall [query]' to search for memories{Colors.END}\n")
        while True:
            try:
                user_input = input(f"{Colors.BOLD}You: {Colors.END}")
            except (EOFError, KeyboardInterrupt):
                # End of input or Ctrl-C: leave cleanly instead of a traceback.
                response = "exit"
            else:
                response = self.process_user_input(user_input)
            if response == "exit":
                print(f"\n{Colors.BOLD}{Colors.GREEN}Thanks for chatting! Goodbye!{Colors.END}")
                break
            print(f"{Colors.BOLD}{Colors.GREEN}Assistant: {Colors.END}{response}\n")
def main() -> None:
    """Create the chat application and run its interactive loop."""
    app = MemoChatApp()
    app.run()


if __name__ == "__main__":
    main()