MCP Memory Service

mcp_server.py
#!/usr/bin/env python3
"""
FastAPI MCP Server for Memory Service

This module implements a native MCP server using the FastAPI MCP framework,
replacing the Node.js HTTP-to-MCP bridge to resolve SSL connectivity issues
and provide direct MCP protocol support.

Features:
- Native MCP protocol implementation using FastMCP
- Direct integration with existing memory storage backends
- Streamable HTTP transport for remote access
- All 22 core memory operations (excluding dashboard tools)
- SSL/HTTPS support with proper certificate handling
"""

import asyncio
import logging
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Union, TypedDict

try:
    from typing import NotRequired  # Python 3.11+
except ImportError:
    from typing_extensions import NotRequired  # Python 3.10

import os
import sys
import socket
from pathlib import Path

# Add src to path for imports
current_dir = Path(__file__).parent
src_dir = current_dir.parent.parent
sys.path.insert(0, str(src_dir))

from mcp.server.fastmcp import FastMCP, Context
from mcp.types import TextContent

# Import existing memory service components
from .config import (
    STORAGE_BACKEND, CONSOLIDATION_ENABLED, EMBEDDING_MODEL_NAME,
    INCLUDE_HOSTNAME, SQLITE_VEC_PATH,
    CLOUDFLARE_API_TOKEN, CLOUDFLARE_ACCOUNT_ID, CLOUDFLARE_VECTORIZE_INDEX,
    CLOUDFLARE_D1_DATABASE_ID, CLOUDFLARE_R2_BUCKET, CLOUDFLARE_EMBEDDING_MODEL,
    CLOUDFLARE_LARGE_CONTENT_THRESHOLD, CLOUDFLARE_MAX_RETRIES, CLOUDFLARE_BASE_DELAY,
    HYBRID_SYNC_INTERVAL, HYBRID_BATCH_SIZE, HYBRID_MAX_QUEUE_SIZE,
    HYBRID_SYNC_ON_STARTUP, HYBRID_FALLBACK_TO_PRIMARY,
    CONTENT_PRESERVE_BOUNDARIES, CONTENT_SPLIT_OVERLAP, ENABLE_AUTO_SPLIT
)
from .storage.base import MemoryStorage
from .services.memory_service import MemoryService

# Configure logging
logging.basicConfig(level=logging.INFO)  # Default to INFO level
logger = logging.getLogger(__name__)


@dataclass
class MCPServerContext:
    """Application context for the MCP server with all required components."""
    storage: MemoryStorage
    memory_service: MemoryService


@asynccontextmanager
async def mcp_server_lifespan(server: FastMCP) -> AsyncIterator[MCPServerContext]:
    """Manage MCP server lifecycle with proper resource initialization and cleanup."""
    logger.info("Initializing MCP Memory Service components...")

    # Initialize storage backend using shared factory
    from .storage.factory import create_storage_instance
    storage = await create_storage_instance(SQLITE_VEC_PATH)

    # Initialize memory service with shared business logic
    memory_service = MemoryService(storage)

    try:
        yield MCPServerContext(
            storage=storage,
            memory_service=memory_service
        )
    finally:
        # Cleanup on shutdown
        logger.info("Shutting down MCP Memory Service components...")
        if hasattr(storage, 'close'):
            await storage.close()


# Create FastMCP server instance
mcp = FastMCP(
    name="MCP Memory Service",
    host="0.0.0.0",  # Listen on all interfaces for remote access
    port=8000,  # Default port
    lifespan=mcp_server_lifespan,
    stateless_http=True  # Enable stateless HTTP for Claude Code compatibility
)


# =============================================================================
# TYPE DEFINITIONS
# =============================================================================

class StoreMemorySuccess(TypedDict):
    """Return type for successful single memory storage."""
    success: bool
    message: str
    content_hash: str


class StoreMemorySplitSuccess(TypedDict):
    """Return type for successful chunked memory storage."""
    success: bool
    message: str
    chunks_created: int
    chunk_hashes: List[str]


class StoreMemoryFailure(TypedDict):
    """Return type for failed memory storage."""
    success: bool
    message: str
    chunks_created: NotRequired[int]
    chunk_hashes: NotRequired[List[str]]


# =============================================================================
# CORE MEMORY OPERATIONS
# =============================================================================

@mcp.tool()
async def store_memory(
    content: str,
    ctx: Context,
    tags: Union[str, List[str], None] = None,
    memory_type: str = "note",
    metadata: Optional[Dict[str, Any]] = None,
    client_hostname: Optional[str] = None
) -> Union[StoreMemorySuccess, StoreMemorySplitSuccess, StoreMemoryFailure]:
    """
    Store a new memory with content and optional metadata.

    **IMPORTANT - Content Length Limits:**
    - Cloudflare backend: 800 characters max (BGE model 512 token limit)
    - SQLite-vec backend: No limit (local storage)
    - Hybrid backend: 800 characters max (constrained by Cloudflare sync)

    If content exceeds the backend's limit, it will be automatically split
    into multiple linked memory chunks with preserved context (50-char overlap).
    The splitting respects natural boundaries: paragraphs → sentences → words.

    Args:
        content: The content to store as memory
        tags: Optional tags to categorize the memory (accepts array or comma-separated string)
        memory_type: Type of memory (note, decision, task, reference)
        metadata: Additional metadata for the memory
        client_hostname: Client machine hostname for source tracking

    **Tag Formats - All Formats Supported:**
    Both the tags parameter AND metadata.tags accept ALL formats:
    - ✅ Array format: tags=["tag1", "tag2", "tag3"]
    - ✅ Comma-separated string: tags="tag1,tag2,tag3"
    - ✅ Single string: tags="single-tag"
    - ✅ In metadata: metadata={"tags": "tag1,tag2", "type": "note"}
    - ✅ In metadata (array): metadata={"tags": ["tag1", "tag2"], "type": "note"}

    All formats are automatically normalized internally. If tags are provided
    in both the tags parameter and metadata.tags, they will be merged
    (duplicates removed).

    Returns:
        Dictionary with:
        - success: Boolean indicating if storage succeeded
        - message: Status message
        - content_hash: Hash of original content (for single memory)
        - chunks_created: Number of chunks (if content was split)
        - chunk_hashes: List of content hashes (if content was split)
    """
    # Delegate to shared MemoryService business logic
    memory_service = ctx.request_context.lifespan_context.memory_service

    result = await memory_service.store_memory(
        content=content,
        tags=tags,
        memory_type=memory_type,
        metadata=metadata,
        client_hostname=client_hostname
    )

    # Transform MemoryService response to MCP tool format
    if not result.get("success"):
        return StoreMemoryFailure(
            success=False,
            message=result.get("error", "Failed to store memory")
        )

    # Handle chunked response (multiple memories)
    if "memories" in result:
        chunk_hashes = [mem["content_hash"] for mem in result["memories"]]
        return StoreMemorySplitSuccess(
            success=True,
            message=f"Successfully stored {len(result['memories'])} memory chunks",
            chunks_created=result["total_chunks"],
            chunk_hashes=chunk_hashes
        )

    # Handle single memory response
    memory_data = result["memory"]
    return StoreMemorySuccess(
        success=True,
        message="Memory stored successfully",
        content_hash=memory_data["content_hash"]
    )


@mcp.tool()
async def retrieve_memory(
    query: str,
    ctx: Context,
    n_results: int = 5
) -> Dict[str, Any]:
    """
    Retrieve memories based on semantic similarity to a query.

    Args:
        query: Search query for semantic similarity
        n_results: Maximum number of results to return

    Returns:
        Dictionary with retrieved memories and metadata
    """
    # Delegate to shared MemoryService business logic
    memory_service = ctx.request_context.lifespan_context.memory_service
    return await memory_service.retrieve_memories(
        query=query,
        n_results=n_results
    )


@mcp.tool()
async def search_by_tag(
    tags: Union[str, List[str]],
    ctx: Context,
    match_all: bool = False
) -> Dict[str, Any]:
    """
    Search memories by tags.

    Args:
        tags: Tag or list of tags to search for
        match_all: If True, memory must have ALL tags; if False, ANY tag

    Returns:
        Dictionary with matching memories
    """
    # Delegate to shared MemoryService business logic
    memory_service = ctx.request_context.lifespan_context.memory_service
    return await memory_service.search_by_tag(
        tags=tags,
        match_all=match_all
    )


@mcp.tool()
async def delete_memory(
    content_hash: str,
    ctx: Context
) -> Dict[str, Union[bool, str]]:
    """
    Delete a specific memory by its content hash.

    Args:
        content_hash: Hash of the memory content to delete

    Returns:
        Dictionary with success status and message
    """
    # Delegate to shared MemoryService business logic
    memory_service = ctx.request_context.lifespan_context.memory_service
    return await memory_service.delete_memory(content_hash)


@mcp.tool()
async def check_database_health(ctx: Context) -> Dict[str, Any]:
    """
    Check the health and status of the memory database.

    Returns:
        Dictionary with health status and statistics
    """
    # Delegate to shared MemoryService business logic
    memory_service = ctx.request_context.lifespan_context.memory_service
    return await memory_service.check_database_health()


@mcp.tool()
async def list_memories(
    ctx: Context,
    page: int = 1,
    page_size: int = 10,
    tag: Optional[str] = None,
    memory_type: Optional[str] = None
) -> Dict[str, Any]:
    """
    List memories with pagination and optional filtering.

    Args:
        page: Page number (1-based)
        page_size: Number of memories per page
        tag: Filter by specific tag
        memory_type: Filter by memory type

    Returns:
        Dictionary with memories and pagination info
    """
    # Delegate to shared MemoryService business logic
    memory_service = ctx.request_context.lifespan_context.memory_service
    return await memory_service.list_memories(
        page=page,
        page_size=page_size,
        tag=tag,
        memory_type=memory_type
    )


# =============================================================================
# MAIN ENTRY POINT
# =============================================================================

def main():
    """Main entry point for the FastAPI MCP server."""
    # Configure for Claude Code integration
    port = int(os.getenv("MCP_SERVER_PORT", "8000"))
    host = os.getenv("MCP_SERVER_HOST", "0.0.0.0")

    logger.info(f"Starting MCP Memory Service FastAPI server on {host}:{port}")
    logger.info(f"Storage backend: {STORAGE_BACKEND}")

    # Run server with streamable HTTP transport
    mcp.run("streamable-http")


if __name__ == "__main__":
    main()
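
The store_memory docstring above describes automatic chunking: content that exceeds the backend limit is split at natural boundaries (paragraphs, then sentences, then words) with a 50-character overlap between chunks. The actual implementation lives in MemoryService, which is not shown on this page, so the following is only a minimal sketch of that strategy; the function name and exact boundary handling are illustrative assumptions.

# Minimal sketch of boundary-aware splitting, as described in the
# store_memory docstring. The real logic lives in MemoryService (not shown
# here); this function and its exact behavior are illustrative assumptions.

def split_content(content: str, max_length: int = 800, overlap: int = 50) -> list[str]:
    """Split content at natural boundaries, carrying a small overlap forward."""
    if len(content) <= max_length:
        return [content]

    # Prefer paragraph breaks, then sentence ends, then word breaks.
    boundaries = ["\n\n", ". ", " "]
    chunks = []
    start = 0
    while start < len(content):
        end = min(start + max_length, len(content))
        if end < len(content):
            # Back up to the last natural boundary inside the window.
            for sep in boundaries:
                cut = content.rfind(sep, start, end)
                if cut > start:
                    end = cut + len(sep)
                    break
        chunks.append(content[start:end].strip())
        # Start the next chunk `overlap` characters before this one ended,
        # so each chunk carries a little trailing context from the previous.
        start = max(end - overlap, start + 1) if end < len(content) else end
    return [c for c in chunks if c]

For a 2,000-character input with no natural boundaries, this yields three chunks of at most 800 characters whose edges repeat 50 characters of context.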
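
Because main() starts the server with the streamable HTTP transport, any MCP client can reach it over plain HTTP. Below is a minimal client sketch using the MCP Python SDK; it assumes the server above is running locally on its default port and that FastMCP mounts the transport at its default /mcp path, so adjust the URL for your deployment. Note that the ctx parameter of each tool is injected by FastMCP and is never passed by the caller.

# Minimal client sketch, assuming the server above is running locally and
# the MCP Python SDK is installed. The URL path (/mcp) is FastMCP's default
# for streamable HTTP; adjust it if your deployment differs.

import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def main() -> None:
    async with streamablehttp_client("http://127.0.0.1:8000/mcp") as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Store a memory, then retrieve it by semantic similarity.
            stored = await session.call_tool(
                "store_memory",
                {
                    "content": "Decided to use sqlite-vec as the default backend",
                    "tags": "architecture,decision",  # comma-separated form
                    "memory_type": "decision",
                },
            )
            print(stored.content)

            results = await session.call_tool(
                "retrieve_memory",
                {"query": "default storage backend decision", "n_results": 3},
            )
            print(results.content)


if __name__ == "__main__":
    asyncio.run(main())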
