from fastmcp import FastMCP
from mem0 import Memory
from typing import Dict, List, Optional, Any
import os
import logging
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
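
# Example .env for this server (illustrative values — every variable below is
# read via os.getenv() somewhere in this file, and each has a fallback default):
#
#   POSTGRES_HOST=localhost
#   POSTGRES_PORT=5432
#   POSTGRES_DB=postgres
#   POSTGRES_USER=postgres
#   POSTGRES_PASSWORD=postgres
#   POSTGRES_COLLECTION_NAME=memories
#   LLM_PROVIDER=basic                 # "openai", "ollama", or "basic"
#   OLLAMA_HOST=http://localhost:11434
#   OLLAMA_MODEL=phi3:mini
#   LLM_MODEL=gpt-3.5-turbo            # OpenAI only
#   EMBEDDING_MODEL=text-embedding-ada-002
#   OPENAI_API_KEY=sk-...              # required when LLM_PROVIDER=openai
#   HOST=0.0.0.0                       # --http mode only
#   PORT=3000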
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastMCP server
mcp = FastMCP(
    "Mem0 Server 🧠",
    instructions="Self-hosted memory management for AI assistants",
)
# Initialize Mem0
def init_memory():
"""Initialize Mem0 with configuration"""
config = {
"vector_store": {
"provider": "pgvector",
"config": {
"host": os.getenv("POSTGRES_HOST", "localhost"),
"port": int(os.getenv("POSTGRES_PORT", "5432")),
"dbname": os.getenv("POSTGRES_DB", "postgres"),
"user": os.getenv("POSTGRES_USER", "postgres"),
"password": os.getenv("POSTGRES_PASSWORD", "postgres"),
"collection_name": os.getenv("POSTGRES_COLLECTION_NAME", "memories"),
"embedding_model_dims": 768, # Critical for nomic-embed-text
}
}
}
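
    # Note: the pgvector extension must be enabled in the target database
    # before first use, e.g. run once as a superuser:
    #
    #   CREATE EXTENSION IF NOT EXISTS vector;
    #
    # The 768-dim setting above matches nomic-embed-text; other embedding
    # models need a matching value (see the OpenAI branch below).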
# Configure LLM based on provider
llm_provider = os.getenv("LLM_PROVIDER", "basic")
if llm_provider == "openai" and os.getenv("OPENAI_API_KEY"):
config["llm"] = {
"provider": "openai",
"config": {
"model": os.getenv("LLM_MODEL", "gpt-3.5-turbo"),
"api_key": os.getenv("OPENAI_API_KEY")
}
}
        config["embedder"] = {
            "provider": "openai",
            "config": {
                "model": os.getenv("EMBEDDING_MODEL", "text-embedding-ada-002"),
                "api_key": os.getenv("OPENAI_API_KEY")
            }
        }
        # text-embedding-ada-002 produces 1536-dim vectors, so the 768-dim
        # default set above for nomic-embed-text must be overridden here
        # (adjust to match whatever EMBEDDING_MODEL is set to)
        config["vector_store"]["config"]["embedding_model_dims"] = 1536
    elif llm_provider in ("ollama", "basic"):
        # Both modes run fully locally via Ollama; "basic" simply defaults to
        # a smaller model than the generic "ollama" mode
        default_model = "phi3:mini" if llm_provider == "basic" else "llama2"
        ollama_base_url = os.getenv("OLLAMA_HOST", "http://localhost:11434")
        config["llm"] = {
            "provider": "ollama",
            "config": {
                "model": os.getenv("OLLAMA_MODEL", default_model),
                "ollama_base_url": ollama_base_url
            }
        }
        config["embedder"] = {
            "provider": "ollama",
            "config": {
                "model": "nomic-embed-text",
                "ollama_base_url": ollama_base_url
            }
        }
        logger.info("Using local Ollama for both LLM and embeddings")
else:
# Default to Ollama embeddings
config["embedder"] = {
"provider": "ollama",
"config": {
"model": "nomic-embed-text",
"ollama_base_url": os.getenv("OLLAMA_HOST", "http://localhost:11434")
}
}
        logger.warning(f"LLM provider '{llm_provider}' is not usable (unknown provider, or OPENAI_API_KEY missing); falling back to Ollama embeddings only")
return Memory.from_config(config)
# Initialize memory store (lazy initialization)
memory = None
def get_memory():
"""Get or initialize memory store"""
global memory
if memory is None:
memory = init_memory()
return memory
@mcp.tool
def add_memory(content: str, user_id: str = "default", metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Add a memory to the store for a specific user.
Args:
content: The memory content to store
user_id: The user ID to associate with this memory (default: "default")
metadata: Optional metadata to attach to the memory
Returns:
Dictionary with memory ID and status
"""
try:
        result = get_memory().add(content, user_id=user_id, metadata=metadata)
        logger.info(f"Added memory for user {user_id}: {result}")
        # mem0's add() returns a result payload (typically a dict wrapping a
        # "results" list), not a bare ID, so return it whole
        return {"success": True, "result": result, "message": "Memory added successfully"}
except Exception as e:
logger.error(f"Error adding memory: {str(e)}")
return {"success": False, "error": str(e)}
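
# A typical successful add_memory payload with a recent mem0 release looks
# roughly like the following (field names vary across mem0 versions, so treat
# this as illustrative, not a contract):
#
#   {"success": True,
#    "result": {"results": [{"id": "...", "memory": "...", "event": "ADD"}]},
#    "message": "Memory added successfully"}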
@mcp.tool
def search_memories(query: str, user_id: str = "default", limit: int = 5) -> List[Dict[str, Any]]:
"""Search memories for a specific user based on a query.
Args:
query: The search query
user_id: The user ID to search memories for (default: "default")
limit: Maximum number of results to return (default: 5)
Returns:
List of relevant memories
"""
try:
results = get_memory().search(query, user_id=user_id, limit=limit)
logger.info(f"Raw search results: {results}")
# Handle different response formats from mem0
if isinstance(results, dict) and 'results' in results:
memories = results['results']
elif isinstance(results, list):
memories = results
else:
logger.warning(f"Unexpected results format: {type(results)}")
memories = []
logger.info(f"Found {len(memories)} memories for query '{query}' and user {user_id}")
return memories
except Exception as e:
logger.error(f"Error searching memories: {str(e)}")
return []
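
# In-process smoke-test sketch using the FastMCP client (assumes the
# fastmcp.Client API from FastMCP 2.x and a reachable vector store/LLM;
# "alice" and the sample strings are placeholders):
#
#   import asyncio
#   from fastmcp import Client
#
#   async def _demo():
#       async with Client(mcp) as client:
#           await client.call_tool(
#               "add_memory", {"content": "Prefers dark mode", "user_id": "alice"})
#           print(await client.call_tool(
#               "search_memories", {"query": "UI preferences", "user_id": "alice"}))
#
#   asyncio.run(_demo())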
@mcp.tool
def get_all_memories(user_id: str = "default") -> List[Dict[str, Any]]:
"""Get all memories for a specific user.
Args:
user_id: The user ID to get memories for (default: "default")
Returns:
List of all memories for the user
"""
try:
results = get_memory().get_all(user_id=user_id)
logger.info(f"Raw get_all results: {results}")
# Handle different response formats from mem0
if isinstance(results, dict) and 'results' in results:
memories = results['results']
elif isinstance(results, list):
memories = results
else:
logger.warning(f"Unexpected results format: {type(results)}")
memories = []
logger.info(f"Retrieved {len(memories)} memories for user {user_id}")
return memories
except Exception as e:
logger.error(f"Error getting all memories: {str(e)}")
return []
@mcp.tool
def update_memory(memory_id: str, content: str) -> Dict[str, Any]:
"""Update a specific memory by ID.
Args:
memory_id: The ID of the memory to update
content: The new content for the memory
Returns:
Dictionary with update status
"""
try:
        get_memory().update(memory_id, data=content)
logger.info(f"Updated memory {memory_id}")
return {"success": True, "memory_id": memory_id, "message": "Memory updated successfully"}
except Exception as e:
logger.error(f"Error updating memory: {str(e)}")
return {"success": False, "error": str(e)}
@mcp.tool
def delete_memory(memory_id: str) -> Dict[str, Any]:
"""Delete a specific memory by ID.
Args:
memory_id: The ID of the memory to delete
Returns:
Dictionary with deletion status
"""
try:
get_memory().delete(memory_id)
logger.info(f"Deleted memory {memory_id}")
return {"success": True, "message": f"Memory {memory_id} deleted successfully"}
except Exception as e:
logger.error(f"Error deleting memory: {str(e)}")
return {"success": False, "error": str(e)}
@mcp.tool
def delete_all_memories(user_id: str = "default") -> Dict[str, Any]:
"""Delete all memories for a specific user.
Args:
user_id: The user ID to delete all memories for (default: "default")
Returns:
Dictionary with deletion status
"""
try:
get_memory().delete_all(user_id=user_id)
logger.info(f"Deleted all memories for user {user_id}")
return {"success": True, "message": f"All memories for user {user_id} deleted successfully"}
except Exception as e:
logger.error(f"Error deleting all memories: {str(e)}")
return {"success": False, "error": str(e)}
@mcp.tool
def get_memory_stats(user_id: Optional[str] = None) -> Dict[str, Any]:
"""Get statistics about stored memories.
Args:
user_id: Optional user ID to get stats for. If None, gets overall stats.
Returns:
Dictionary with memory statistics
"""
try:
        if user_id:
            results = get_memory().get_all(user_id=user_id)
            # Normalize like get_all_memories: newer mem0 wraps results in a dict
            if isinstance(results, dict) and 'results' in results:
                memories = results['results']
            elif isinstance(results, list):
                memories = results
            else:
                memories = []
            return {
                "user_id": user_id,
                "total_memories": len(memories),
                "status": "success"
            }
else:
            # Overall stats would require querying the storage backend
            # directly; see the commented sketch after this function
return {
"message": "Overall stats not implemented yet",
"status": "partial"
}
except Exception as e:
logger.error(f"Error getting memory stats: {str(e)}")
return {"success": False, "error": str(e)}
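
# A possible implementation of the missing overall stats, assuming direct
# access to the pgvector table (a sketch only: the table name follows
# POSTGRES_COLLECTION_NAME, psycopg2 must be installed, and mem0's internal
# schema is not a public API, so verify against your installed version):
#
#   import psycopg2
#
#   def count_all_memories() -> int:
#       conn = psycopg2.connect(
#           host=os.getenv("POSTGRES_HOST", "localhost"),
#           port=int(os.getenv("POSTGRES_PORT", "5432")),
#           dbname=os.getenv("POSTGRES_DB", "postgres"),
#           user=os.getenv("POSTGRES_USER", "postgres"),
#           password=os.getenv("POSTGRES_PASSWORD", "postgres"),
#       )
#       try:
#           with conn.cursor() as cur:
#               cur.execute("SELECT count(*) FROM memories")
#               return cur.fetchone()[0]
#       finally:
#           conn.close()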
if __name__ == "__main__":
# Run the server
import sys
    # With --http (e.g. inside Docker), serve over the network; note the
    # transport is actually SSE (HTTP-based), not streamable HTTP
    if len(sys.argv) > 1 and sys.argv[1] == "--http":
        host = os.getenv("HOST", "0.0.0.0")
        port = int(os.getenv("PORT", "3000"))
        print(f"Starting MCP server (SSE transport) on {host}:{port}")
        mcp.run(transport="sse", host=host, port=port)
else:
# Default STDIO transport for Claude Desktop
mcp.run()
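
# Usage examples (illustrative — adjust paths, names, and ports to your setup):
#
# 1) Claude Desktop (default STDIO transport), in claude_desktop_config.json:
#
#   {
#     "mcpServers": {
#       "mem0": {
#         "command": "python",
#         "args": ["/path/to/this/file.py"]
#       }
#     }
#   }
#
# 2) Docker or other networked clients (SSE transport):
#
#   python this_file.py --http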