
store_memory

Store new information with optional tags for semantic memory. The MCP Memory Service provides persistent storage and retrieval backed by ChromaDB and sentence-transformer embeddings.

Instructions

Store new information with optional tags

Input Schema

Name      Required  Description  Default
content   Yes
metadata  No

Input Schema (JSON Schema)

```json
{
  "properties": {
    "content": { "type": "string" },
    "metadata": {
      "properties": {
        "tags": { "items": { "type": "string" }, "type": "array" },
        "type": { "type": "string" }
      },
      "type": "object"
    }
  },
  "required": ["content"],
  "type": "object"
}
```
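A payload can be checked against this schema before sending. The tiny validator below is an illustrative sketch (the function name `validate_payload` and the example values are invented here), not the service's actual validation logic:

```python
def validate_payload(payload: dict) -> bool:
    """Sketch: check a store_memory payload against the input schema above."""
    # "content" is the only required property, and it must be a string
    if not isinstance(payload.get("content"), str):
        return False
    metadata = payload.get("metadata")
    if metadata is not None:
        if not isinstance(metadata, dict):
            return False
        # metadata.tags, if present, must be an array of strings
        tags = metadata.get("tags")
        if tags is not None and not (
            isinstance(tags, list) and all(isinstance(t, str) for t in tags)
        ):
            return False
        # metadata.type, if present, must be a string
        mtype = metadata.get("type")
        if mtype is not None and not isinstance(mtype, str):
            return False
    return True

# A valid payload: required "content" plus optional metadata
valid = validate_payload({
    "content": "Deployed v2.1 to production",
    "metadata": {"tags": ["deploy", "v2.1"], "type": "note"},
})
```

For a production client, a JSON Schema library would replace this hand-rolled check.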

Implementation Reference

  • Core handler implementing the store_memory tool logic: normalizes tags, handles metadata merging, generates hashes, auto-splits long content, creates Memory objects, and delegates to storage.
```python
async def store_memory(
    self,
    content: str,
    tags: Union[str, List[str], None] = None,
    memory_type: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    client_hostname: Optional[str] = None
) -> Union[StoreMemorySingleSuccess, StoreMemoryChunkedSuccess, StoreMemoryFailure]:
    """
    Store a new memory with validation and content processing.

    Accepts tags in multiple formats for maximum flexibility:
    - None → []
    - "tag1,tag2,tag3" → ["tag1", "tag2", "tag3"]
    - "single-tag" → ["single-tag"]
    - ["tag1", "tag2"] → ["tag1", "tag2"]

    Args:
        content: The memory content
        tags: Optional tags for the memory (string, comma-separated string, or list)
        memory_type: Optional memory type classification
        metadata: Optional additional metadata (can also contain tags)
        client_hostname: Optional client hostname for source tagging

    Returns:
        Dictionary with operation result
    """
    try:
        # Normalize tags from parameter (handles all formats)
        final_tags = normalize_tags(tags)

        # Extract and normalize metadata.tags if present
        final_metadata = metadata or {}
        if metadata and "tags" in metadata:
            metadata_tags = normalize_tags(metadata.get("tags"))
            # Merge with parameter tags and remove duplicates
            final_tags = list(set(final_tags + metadata_tags))

        # Apply hostname tagging if provided (for consistent source tracking)
        if client_hostname:
            source_tag = f"source:{client_hostname}"
            if source_tag not in final_tags:
                final_tags.append(source_tag)
            final_metadata["hostname"] = client_hostname

        # Generate content hash for deduplication
        content_hash = generate_content_hash(content)

        # Process content if auto-splitting is enabled and content exceeds max length
        max_length = self.storage.max_content_length
        if ENABLE_AUTO_SPLIT and max_length and len(content) > max_length:
            # Split content into chunks
            chunks = split_content(
                content,
                max_length=max_length,
                preserve_boundaries=CONTENT_PRESERVE_BOUNDARIES,
                overlap=CONTENT_SPLIT_OVERLAP
            )

            stored_memories = []
            for i, chunk in enumerate(chunks):
                chunk_hash = generate_content_hash(chunk)
                chunk_metadata = final_metadata.copy()
                chunk_metadata["chunk_index"] = i
                chunk_metadata["total_chunks"] = len(chunks)
                chunk_metadata["original_hash"] = content_hash

                memory = Memory(
                    content=chunk,
                    content_hash=chunk_hash,
                    tags=final_tags,
                    memory_type=memory_type,
                    metadata=chunk_metadata
                )
                success, message = await self.storage.store(memory)
                if success:
                    stored_memories.append(self._format_memory_response(memory))

            return {
                "success": True,
                "memories": stored_memories,
                "total_chunks": len(chunks),
                "original_hash": content_hash
            }
        else:
            # Store as single memory
            memory = Memory(
                content=content,
                content_hash=content_hash,
                tags=final_tags,
                memory_type=memory_type,
                metadata=final_metadata
            )
            success, message = await self.storage.store(memory)
            if success:
                return {
                    "success": True,
                    "memory": self._format_memory_response(memory)
                }
            else:
                return {
                    "success": False,
                    "error": message
                }

    except ValueError as e:
        # Handle validation errors specifically
        logger.warning(f"Validation error storing memory: {e}")
        return {"success": False, "error": f"Invalid memory data: {str(e)}"}
    except ConnectionError as e:
        # Handle storage connectivity issues
        logger.error(f"Storage connection error: {e}")
        return {"success": False, "error": f"Storage connection failed: {str(e)}"}
    except Exception as e:
        # Handle unexpected errors
        logger.exception(f"Unexpected error storing memory: {e}")
        return {"success": False, "error": f"Failed to store memory: {str(e)}"}
```
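The tag-normalization behavior documented in the handler above can be sketched as follows. This is an illustrative reimplementation of what the docstring describes, not the service's actual `normalize_tags` helper:

```python
from typing import List, Union

def normalize_tags(tags: Union[str, List[str], None]) -> List[str]:
    """Sketch: normalize None, comma-separated string, or list into a tag list."""
    if tags is None:
        return []  # None -> []
    if isinstance(tags, str):
        # "tag1,tag2,tag3" -> ["tag1", "tag2", "tag3"]; "single-tag" -> ["single-tag"]
        return [t.strip() for t in tags.split(",") if t.strip()]
    # Already a list: keep trimmed string entries, dropping empties
    return [t.strip() for t in tags if isinstance(t, str) and t.strip()]
```

Merging parameter tags with `metadata.tags` then reduces to `list(set(a + b))`, exactly as the handler does.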
  • MCP tools/list endpoint registration that lists all available tools including store_memory.
```python
elif request.method == "tools/list":
    response = MCPResponse(
        id=request.id,
        result={"tools": [tool.model_dump() for tool in MCP_TOOLS]}
    )
    return JSONResponse(content=response.model_dump(exclude_none=True))
```
  • Input schema definition for the store_memory MCP tool.
```python
MCPTool(
    name="store_memory",
    description="Store a new memory with optional tags, metadata, and client information",
    inputSchema={
        "type": "object",
        "properties": {
            "content": {"type": "string", "description": "The memory content to store"},
            "tags": {"type": "array", "items": {"type": "string"}, "description": "Optional tags for the memory"},
            "memory_type": {"type": "string", "description": "Optional memory type (e.g., 'note', 'reminder', 'fact')"},
            "metadata": {"type": "object", "description": "Additional metadata for the memory"},
            "client_hostname": {"type": "string", "description": "Client machine hostname for source tracking"}
        },
        "required": ["content"]
    }
),
```
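A `tools/call` request targeting this registration might look like the following. The JSON-RPC envelope follows the MCP specification; the argument values are invented for illustration:

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "store_memory",
    "arguments": {
      "content": "Team decided to adopt trunk-based development",
      "tags": ["decision", "process"],
      "memory_type": "note",
      "client_hostname": "dev-laptop"
    }
  }
}
```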
  • Handler for store_memory in the HTTP MCP endpoint's tool call dispatcher.
```python
if tool_name == "store_memory":
    from mcp_memory_service.models.memory import Memory

    content = arguments.get("content")
    tags = arguments.get("tags", [])
    memory_type = arguments.get("memory_type")
    metadata = arguments.get("metadata", {})
    client_hostname = arguments.get("client_hostname")

    # Ensure metadata is a dict
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            metadata = {}
    elif not isinstance(metadata, dict):
        metadata = {}

    # Add client_hostname to metadata if provided
    if client_hostname:
        metadata["client_hostname"] = client_hostname

    content_hash = generate_content_hash(content, metadata)
    memory = Memory(
        content=content,
        content_hash=content_hash,
        tags=tags,
        memory_type=memory_type,
        metadata=metadata
    )
    success, message = await storage.store(memory)
    return {
        "success": success,
        "message": message,
        "content_hash": memory.content_hash if success else None
    }
```
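Both handlers rely on `generate_content_hash` for deduplication. A plausible sketch of such a helper is shown below; the choice of SHA-256 and the sorted-keys JSON canonicalization are assumptions for illustration, not confirmed details of the service's implementation:

```python
import hashlib
import json
from typing import Any, Dict, Optional

def generate_content_hash(content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
    """Sketch: stable hash over content (and optionally metadata) for deduplication.

    SHA-256 and sort_keys canonicalization are illustrative assumptions.
    """
    hasher = hashlib.sha256(content.encode("utf-8"))
    if metadata:
        # Canonical JSON so logically equal metadata hashes identically
        hasher.update(json.dumps(metadata, sort_keys=True).encode("utf-8"))
    return hasher.hexdigest()
```

A content-addressed hash like this lets the storage layer detect duplicate memories without comparing full payloads.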
  • MCP tool handler in native FastMCP server that delegates to MemoryService.store_memory and formats response for MCP protocol.
```python
@mcp.tool()
async def store_memory(
    content: str,
    ctx: Context,
    tags: Union[str, List[str], None] = None,
    memory_type: str = "note",
    metadata: Optional[Dict[str, Any]] = None,
    client_hostname: Optional[str] = None
) -> Union[StoreMemorySuccess, StoreMemorySplitSuccess, StoreMemoryFailure]:
    """
    Store a new memory with content and optional metadata.

    **IMPORTANT - Content Length Limits:**
    - Cloudflare backend: 800 characters max (BGE model 512 token limit)
    - SQLite-vec backend: No limit (local storage)
    - Hybrid backend: 800 characters max (constrained by Cloudflare sync)

    If content exceeds the backend's limit, it will be automatically split
    into multiple linked memory chunks with preserved context (50-char overlap).
    The splitting respects natural boundaries: paragraphs → sentences → words.

    Args:
        content: The content to store as memory
        tags: Optional tags to categorize the memory (accepts array or comma-separated string)
        memory_type: Type of memory (note, decision, task, reference)
        metadata: Additional metadata for the memory
        client_hostname: Client machine hostname for source tracking

    **Tag Formats - All Formats Supported:**
    Both the tags parameter AND metadata.tags accept ALL formats:
    - ✅ Array format: tags=["tag1", "tag2", "tag3"]
    - ✅ Comma-separated string: tags="tag1,tag2,tag3"
    - ✅ Single string: tags="single-tag"
    - ✅ In metadata: metadata={"tags": "tag1,tag2", "type": "note"}
    - ✅ In metadata (array): metadata={"tags": ["tag1", "tag2"], "type": "note"}

    All formats are automatically normalized internally. If tags are provided
    in both the tags parameter and metadata.tags, they will be merged
    (duplicates removed).

    Returns:
        Dictionary with:
        - success: Boolean indicating if storage succeeded
        - message: Status message
        - content_hash: Hash of original content (for single memory)
        - chunks_created: Number of chunks (if content was split)
        - chunk_hashes: List of content hashes (if content was split)
    """
    # Delegate to shared MemoryService business logic
    memory_service = ctx.request_context.lifespan_context.memory_service
    result = await memory_service.store_memory(
        content=content,
        tags=tags,
        memory_type=memory_type,
        metadata=metadata,
        client_hostname=client_hostname
    )

    # Transform MemoryService response to MCP tool format
    if not result.get("success"):
        return StoreMemoryFailure(
            success=False,
            message=result.get("error", "Failed to store memory")
        )

    # Handle chunked response (multiple memories)
    if "memories" in result:
        chunk_hashes = [mem["content_hash"] for mem in result["memories"]]
        return StoreMemorySplitSuccess(
            success=True,
            message=f"Successfully stored {len(result['memories'])} memory chunks",
            chunks_created=result["total_chunks"],
            chunk_hashes=chunk_hashes
        )

    # Handle single memory response
    memory_data = result["memory"]
    return StoreMemorySuccess(
        success=True,
        message="Memory stored successfully",
        content_hash=memory_data["content_hash"]
    )
```
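The auto-splitting described in the docstring (respect natural boundaries, carry a small overlap between chunks) can be sketched roughly as below. The service's real `split_content` is more sophisticated (paragraphs → sentences → words, plus the `preserve_boundaries` option); this simplified version only breaks at whitespace:

```python
from typing import List

def split_content(content: str, max_length: int, overlap: int = 50) -> List[str]:
    """Sketch: split content into chunks of at most max_length characters.

    Breaks at the last whitespace before the limit where possible, and
    repeats `overlap` characters between consecutive chunks for context.
    """
    chunks = []
    start = 0
    while start < len(content):
        end = min(start + max_length, len(content))
        if end < len(content):
            # Prefer to break at the last space before the limit
            cut = content.rfind(" ", start, end)
            if cut > start:
                end = cut
        chunks.append(content[start:end].strip())
        if end >= len(content):
            break
        # Step back by `overlap` chars so chunks share context; always advance
        start = max(end - overlap, start + 1)
    return chunks
```

Each chunk is then stored as its own memory, carrying `chunk_index`, `total_chunks`, and the original content's hash in its metadata, as the core handler above shows.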


MCP directory API

We provide all the information about MCP servers via our MCP API.

```shell
curl -X GET 'https://glama.ai/api/mcp/v1/servers/doobidoo/mcp-memory-service'
```

If you have feedback or need assistance with the MCP directory API, please join our Discord server.