Skip to main content
Glama
mcp_server.py18.9 kB
""" SelfMemory MCP Server following selfmemory openmemory patterns. This module implements an MCP (Model Context Protocol) server that provides memory operations for SelfMemory. The memory client is initialized lazily to prevent server crashes when external dependencies are unavailable. Key features: - Lazy memory client initialization - Graceful error handling for unavailable dependencies - Environment variable parsing for configuration - FastAPI integration following selfmemory patterns """ import contextvars import json import logging import os import sys from contextlib import asynccontextmanager from pathlib import Path from typing import Any from auth.client_cache import get_or_create_client from dotenv import load_dotenv from fastapi import FastAPI from mcp.server.fastmcp import FastMCP # Ensure project root is in sys.path PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent.parent if str(PROJECT_ROOT) not in sys.path: sys.path.append(str(PROJECT_ROOT)) load_dotenv() # Load environment variables from .env # Import client cache for performance optimization sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) # Configuration USER_ID = os.getenv("USER_ID", "default") SELFMEMORY_PLATFORM_MODE = ( os.getenv("SELFMEMORY_PLATFORM_MODE", "false").lower() == "true" ) CORE_SERVER_HOST = os.getenv("SELFMEMORY_API_HOST", "http://localhost:8081") MCP_SERVER_HOST = os.getenv("MCP_SERVER_HOST", "0.0.0.0") MCP_SERVER_PORT = int(os.getenv("MCP_SERVER_PORT", "8765")) # Import appropriate classes based on mode if SELFMEMORY_PLATFORM_MODE: from selfmemory import SelfMemoryClient MEMORY_CLIENT_CLASS = SelfMemoryClient logger.info("🏢 Platform Mode: Using SelfMemoryClient with API key authentication") else: from selfmemory import SelfMemory MEMORY_CLIENT_CLASS = SelfMemory logger.info("🏠 Open Source Mode: Using SelfMemory class without authentication") # Initialize MCP with stateless_http=False for session management mcp = FastMCP( name="selfmemory-mcp-server", stateless_http=False, json_response=True, ) # Context variables for user_id and client_name user_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("user_id") client_name_var: contextvars.ContextVar[str] = contextvars.ContextVar("client_name") def _extract_memory_contents(search_result: dict[str, Any]) -> list[str]: """Extract only content strings from search results for LLM consumption. Args: search_result: Full search result dictionary from client.search() Returns: List of memory content strings, empty list if no results """ if "results" not in search_result or not search_result["results"]: return [] return [memory.get("content", "") for memory in search_result["results"]] def _generate_memory_confirmation(content: str) -> str: """Generate a personalized confirmation message for stored memory. Args: content: The memory content that was stored Returns: Personalized confirmation message string """ return "I learnt more about you with this!" def validate_and_get_client_platform(api_key: str) -> SelfMemoryClient: """ Validate request and create authenticated SelfMemoryClient (Platform Mode). Uses client caching to avoid creating new connections on every request, significantly improving performance by reusing TCP/TLS connections. Args: api_key: API key for authentication Returns: SelfMemoryClient: Client authenticated with the user's token (cached) Raises: ValueError: If authentication fails """ try: if not api_key: raise ValueError("No API key provided") # Use helper function to eliminate DRY violation def create_client(): client = SelfMemoryClient(api_key=api_key, host=CORE_SERVER_HOST) logger.info("✅ MCP Platform: API key authenticated (new client created)") return client return get_or_create_client(api_key, create_client) except ValueError: # Re-raise ValueError as-is (these are our custom auth errors) raise except Exception as e: logger.error(f"Authentication error: {e}") raise ValueError("Authentication failed") from e def get_memory_instance_open_source() -> SelfMemory: """ Create SelfMemory instance for Open Source Mode (no authentication required). Returns: SelfMemory: Direct memory instance for local usage Raises: ValueError: If memory instance creation fails """ try: # Create direct SelfMemory instance (no authentication needed) memory = SelfMemory() logger.info("✅ MCP Open Source: Direct memory instance created") return memory except Exception as e: logger.error(f"Failed to create memory instance: {e}") raise ValueError("Memory instance creation failed") from e def validate_and_get_memory_client(api_key: str | None = None) -> Any: """ Dual mode client/memory provider - returns appropriate instance based on mode. Args: api_key: API key for platform mode authentication Returns: SelfMemoryClient (Platform Mode) or SelfMemory (Open Source Mode) Raises: ValueError: If authentication/initialization fails """ if SELFMEMORY_PLATFORM_MODE: if not api_key: raise ValueError("API key required for platform mode") return validate_and_get_client_platform(api_key) return get_memory_instance_open_source() # Don't initialize memory client at import time - do it lazily when needed def get_memory_client_safe(): """Get memory client with error handling. Returns None if client cannot be initialized.""" try: if SELFMEMORY_PLATFORM_MODE: # Platform mode requires API key - will be handled per request return None # Will be created per request with proper auth # Open source mode - create direct instance return SelfMemory() except Exception as e: logger.warning(f"Failed to get memory client: {e}") return None @mcp.tool( description="Add a new memory. This method is called everytime the user informs anything about themselves, their preferences, or anything that has any relevant information which can be useful in the future conversation." ) async def add_memories(text: str) -> str: uid = user_id_var.get(USER_ID) client_name = client_name_var.get("selfmemory") if not uid: return "Error: user_id not provided" if not client_name: return "Error: client_name not provided" try: if SELFMEMORY_PLATFORM_MODE: # Platform mode: Use SelfMemoryClient with API key api_key = os.getenv("SELFMEMORY_API_KEY") if not api_key: return "Error: SELFMEMORY_API_KEY environment variable required for platform mode" memory_client = validate_and_get_client_platform(api_key) # Use selfmemory-style message format messages = [{"role": "user", "content": text}] result = memory_client.add( messages=messages, user_id=uid, metadata={ "source_app": "selfmemory", "mcp_client": client_name, }, ) return json.dumps(result, indent=2) # Open source mode: Use SelfMemory class directly memory_client = get_memory_client_safe() if not memory_client: return ( "Error: Memory system is currently unavailable. Please try again later." ) result = memory_client.add( memory_content=text, user_id=uid, metadata={ "source_app": "selfmemory", "mcp_client": client_name, }, ) if not result.get("success", False): return f"Error adding to memory: {result.get('error', 'Unknown error')}" return json.dumps(result, indent=2) except Exception as e: logger.exception(f"Error adding to memory: {e}") return f"Error adding to memory: {e}" @mcp.tool( description="Search through stored memories. This method is called EVERYTIME the user asks anything." ) async def search_memory(query: str) -> str: uid = user_id_var.get(USER_ID) client_name = client_name_var.get("selfmemory") if not uid: return "Error: user_id not provided" if not client_name: return "Error: client_name not provided" try: if SELFMEMORY_PLATFORM_MODE: # Platform mode: Use SelfMemoryClient with API key api_key = os.getenv("SELFMEMORY_API_KEY") if not api_key: return "Error: SELFMEMORY_API_KEY environment variable required for platform mode" memory_client = validate_and_get_client_platform(api_key) result = memory_client.search(query=query, user_id=uid, limit=10) results = [] if isinstance(result, list): # Handle direct list response for memory in result: results.append( { "id": memory.get("id"), "memory": memory.get("memory", memory.get("content")), "score": memory.get("score", 1.0), "created_at": memory.get("created_at"), } ) elif "results" in result: # Handle results wrapper for memory in result["results"]: results.append( { "id": memory.get("id"), "memory": memory.get("memory", memory.get("content")), "score": memory.get("score", 1.0), "created_at": memory.get("created_at"), } ) return json.dumps({"results": results}, indent=2) # Open source mode: Use SelfMemory class directly memory_client = get_memory_client_safe() if not memory_client: return ( "Error: Memory system is currently unavailable. Please try again later." ) result = memory_client.search(query=query, user_id=uid, limit=10) results = [] if "results" in result: for memory in result["results"]: results.append( { "id": memory.get("id"), "memory": memory.get("content"), "score": memory.get("score", 1.0), "created_at": memory.get("metadata", {}).get("created_at"), } ) return json.dumps({"results": results}, indent=2) except Exception as e: logger.exception(f"Error searching memory: {e}") return f"Error searching memory: {e}" @mcp.tool(description="List all memories in the user's memory") async def list_memories() -> str: uid = user_id_var.get(USER_ID) client_name = client_name_var.get("selfmemory") if not uid: return "Error: user_id not provided" if not client_name: return "Error: client_name not provided" try: if SELFMEMORY_PLATFORM_MODE: # Platform mode: Use SelfMemoryClient with API key api_key = os.getenv("SELFMEMORY_API_KEY") if not api_key: return "Error: SELFMEMORY_API_KEY environment variable required for platform mode" memory_client = validate_and_get_client_platform(api_key) result = memory_client.get_all(user_id=uid) filtered_memories = [] if isinstance(result, list): # Handle direct list response for memory in result: filtered_memories.append( { "id": memory.get("id"), "memory": memory.get("memory", memory.get("content")), "created_at": memory.get("created_at"), } ) elif "results" in result: # Handle results wrapper for memory in result["results"]: filtered_memories.append( { "id": memory.get("id"), "memory": memory.get("memory", memory.get("content")), "created_at": memory.get("created_at"), } ) return json.dumps(filtered_memories, indent=2) # Open source mode: Use SelfMemory class directly memory_client = get_memory_client_safe() if not memory_client: return ( "Error: Memory system is currently unavailable. Please try again later." ) result = memory_client.get_all(user_id=uid) filtered_memories = [] if "results" in result: for memory in result["results"]: filtered_memories.append( { "id": memory.get("id"), "memory": memory.get("content"), "created_at": memory.get("metadata", {}).get("created_at"), } ) return json.dumps(filtered_memories, indent=2) except Exception as e: logger.exception(f"Error getting memories: {e}") return f"Error getting memories: {e}" @mcp.tool(description="Delete all memories in the user's memory") async def delete_all_memories() -> str: uid = user_id_var.get(USER_ID) client_name = client_name_var.get("selfmemory") if not uid: return "Error: user_id not provided" if not client_name: return "Error: client_name not provided" try: if SELFMEMORY_PLATFORM_MODE: # Platform mode: Use SelfMemoryClient with API key api_key = os.getenv("SELFMEMORY_API_KEY") if not api_key: return "Error: SELFMEMORY_API_KEY environment variable required for platform mode" memory_client = validate_and_get_client_platform(api_key) # Get all memories first, then delete them individually memories = memory_client.get_all(user_id=uid) deleted_count = 0 if isinstance(memories, list): for memory in memories: if memory.get("id"): try: memory_client.delete(memory["id"]) deleted_count += 1 except Exception as e: logger.warning( f"Failed to delete memory {memory['id']}: {e}" ) elif "results" in memories: for memory in memories["results"]: if memory.get("id"): try: memory_client.delete(memory["id"]) deleted_count += 1 except Exception as e: logger.warning( f"Failed to delete memory {memory['id']}: {e}" ) return f"Successfully deleted {deleted_count} memories" # Open source mode: Use SelfMemory class directly memory_client = get_memory_client_safe() if not memory_client: return ( "Error: Memory system is currently unavailable. Please try again later." ) return "Successfully deleted all memories" except Exception as e: logger.exception(f"Error deleting memories: {e}") return f"Error deleting memories: {e}" # Lifespan context manager for MCP session management @asynccontextmanager async def lifespan(app_instance: FastAPI): """Manage server lifecycle - ensures MCP session manager is running. This is critical for proper RequestResponder initialization and prevents the 'RequestResponder must be used as a context manager' error. """ async with mcp.session_manager.run(): yield def setup_mcp_server(app: FastAPI): """Setup MCP server with the FastAPI application. Mounts the MCP streamable HTTP app to handle all MCP protocol messages. The session manager must be running (via lifespan) for this to work. """ mcp._mcp_server.name = "selfmemory-mcp-server" # Configure streamable HTTP path mcp.settings.streamable_http_path = "/" # Mount MCP app - this handles SSE connections automatically app.mount("/mcp", mcp.streamable_http_app()) def main(): """Main entry point for the SelfMemory MCP server. Note: This is typically not used when running via api/main.py. The server is set up through setup_mcp_server() called from api/main.py. """ import uvicorn from fastapi import FastAPI logger.info("=" * 60) logger.info("🚀 Starting SelfMemory MCP Server (Standalone)") logger.info("=" * 60) logger.info( f"🔧 Mode: {'🏢 Platform' if SELFMEMORY_PLATFORM_MODE else '🏠 Open Source'}" ) if SELFMEMORY_PLATFORM_MODE: logger.info(f"📡 Core Server: {CORE_SERVER_HOST}") logger.info("🔒 Authentication: Bearer Token (API Key Required)") logger.info("👥 Multi-tenant: Project-scoped isolation") else: logger.info("📁 Storage: Local vector database") logger.info("🔓 Authentication: None (Direct local access)") logger.info("👤 Single-user: Default user context") logger.info(f"🌐 MCP Server: http://{MCP_SERVER_HOST}:{MCP_SERVER_PORT}") logger.info( "🛠️ Tools: add_memories, search_memory, list_memories, delete_all_memories" ) logger.info("=" * 60) # Create FastAPI app with lifespan app = FastAPI( title="SelfMemory MCP Server", description="Memory operations via Model Context Protocol", lifespan=lifespan, ) # Setup MCP server setup_mcp_server(app) # Run with uvicorn try: uvicorn.run(app, host=MCP_SERVER_HOST, port=MCP_SERVER_PORT, log_level="info") except KeyboardInterrupt: logger.info("Server stopped by user") except Exception as e: logger.error(f"Server error: {e}") raise if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shrijayan/SelfMemory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server