Skip to main content
Glama
main.py — 34 kB
""" Main Memory SDK class for SelfMemory. This module provides the primary interface for local SelfMemory functionality, with a zero-setup API for direct usage without authentication. """ import logging import uuid from datetime import datetime from typing import Any from selfmemory.configs import SelfMemoryConfig from selfmemory.memory.base import MemoryBase from selfmemory.memory.utils import ( audit_memory_access, build_add_metadata, build_search_filters, validate_isolation_context, ) from selfmemory.utils.factory import EmbeddingFactory, VectorStoreFactory logger = logging.getLogger(__name__) class SelfMemory(MemoryBase): """ User-scoped Memory class with automatic isolation. This class provides zero-setup functionality with embedded vector stores and automatic user isolation. Each Memory instance is scoped to specific user identifiers, ensuring complete data separation between users. Key Features: - Automatic user isolation (users can only access their own memories) - Zero-setup embedded vector stores and embeddings - Compatible with multiple vector store providers (Qdrant, ChromaDB, etc.) - Secure ownership validation for all operations Examples: Basic multi-user isolation: >>> # Each user gets their own isolated memory space >>> alice_memory = Memory(user_id="alice") >>> bob_memory = Memory(user_id="bob") >>> charlie_memory = Memory(user_id="charlie") >>> # Users can add memories independently >>> alice_memory.add("I love Italian food, especially pizza") >>> bob_memory.add("I prefer Japanese cuisine like sushi") >>> charlie_memory.add("Mexican food is my favorite") >>> # Searches are automatically user-isolated >>> alice_results = alice_memory.search("food") # Only gets Alice's memories >>> bob_results = bob_memory.search("food") # Only gets Bob's memories >>> charlie_results = charlie_memory.search("food") # Only gets Charlie's memories Advanced usage with metadata and filtering: >>> # Add memories with rich metadata >>> alice_memory.add( ... 
"Had a great meeting with the product team", ... tags="work,meeting,product", ... people_mentioned="Sarah,Mike,Jennifer", ... topic_category="work" ... ) >>> # Search with advanced filtering (still user-isolated) >>> work_memories = alice_memory.search( ... query="meeting", ... tags=["work", "meeting"], ... people_mentioned=["Sarah"], ... match_all_tags=True, ... limit=20 ... ) User isolation in action: >>> # Users cannot access each other's memories >>> alice_memory.get_all() # Returns only Alice's memories >>> bob_memory.get_all() # Returns only Bob's memories >>> # Users can only delete their own memories >>> alice_memory.delete_all() # Deletes only Alice's memories >>> bob_memory.delete_all() # Deletes only Bob's memories Custom configuration: >>> # Use custom embedding and vector store providers >>> config = { ... "embedding": { ... "provider": "ollama", ... "config": {"model": "nomic-embed-text"} ... }, ... "vector_store": { ... "provider": "qdrant", ... "config": {"path": "./qdrant_data"} ... } ... } >>> memory = Memory(user_id="user_123", config=config) Production multi-tenant usage: >>> # Different users in a multi-tenant application >>> def get_user_memory(user_id: str) -> Memory: ... return Memory(user_id=user_id) >>> # Each user gets isolated memory >>> user_1_memory = get_user_memory("tenant_1_user_456") >>> user_2_memory = get_user_memory("tenant_2_user_789") >>> # Complete isolation - no cross-user data leakage >>> user_1_memory.add("Confidential business data") >>> user_2_memory.add("Personal notes") >>> # Users can never see each other's data """ def __init__(self, config: SelfMemoryConfig | dict | None = None): """ Initialize Memory with configuration (selfmemory style - no user_id required). Args: config: Optional SelfMemoryConfig instance or config dictionary Examples: Basic memory instance: >>> memory = Memory() With custom config: >>> config = { ... "embedding": {"provider": "ollama", "config": {...}}, ... 
"vector_store": {"provider": "qdrant", "config": {...}} ... } >>> memory = Memory(config=config) Multi-user usage (user_id passed to methods): >>> memory = Memory() >>> memory.add("I love pizza", user_id="alice") >>> memory.add("I love sushi", user_id="bob") >>> alice_results = memory.search("pizza", user_id="alice") # Only Alice's memories >>> bob_results = memory.search("sushi", user_id="bob") # Only Bob's memories """ # Handle different config types for clean API if config is None: self.config = SelfMemoryConfig() elif isinstance(config, dict): # Convert dict to SelfMemoryConfig for internal use self.config = SelfMemoryConfig.from_dict(config) else: # Already an SelfMemoryConfig object self.config = config # Use factories with exact pattern - pass raw config self.embedding_provider = EmbeddingFactory.create( self.config.embedding.provider, self.config.embedding.config ) self.vector_store = VectorStoreFactory.create( self.config.vector_store.provider, self.config.vector_store.config ) logger.info( f"Memory SDK initialized: " f"{self.config.embedding.provider} + {self.config.vector_store.provider}" ) def add( self, memory_content: str, *, # Enforce keyword-only arguments user_id: str, tags: str | None = None, people_mentioned: str | None = None, topic_category: str | None = None, project_id: str | None = None, organization_id: str | None = None, metadata: dict[str, Any] | None = None, ) -> dict[str, Any]: """ Add a new memory to storage with multi-tenant isolation (selfmemory style). 
Args: memory_content: The memory text to store user_id: Required user identifier for memory isolation tags: Optional comma-separated tags people_mentioned: Optional comma-separated people names topic_category: Optional topic category project_id: Optional project identifier for project-level isolation organization_id: Optional organization identifier for org-level isolation metadata: Optional additional metadata Returns: Dict: Result information including memory_id and status Examples: Basic user isolation (backward compatible): >>> memory = Memory() >>> memory.add("I love pizza", user_id="alice", tags="food,personal") Multi-tenant isolation: >>> memory.add("Meeting notes from project discussion", ... user_id="alice", project_id="proj_123", ... organization_id="org_456", tags="work,meeting", ... people_mentioned="Sarah,Mike") """ try: # STRICT ISOLATION VALIDATION: Validate isolation context before proceeding validate_isolation_context( user_id=user_id, project_id=project_id, organization_id=organization_id, operation="memory_add", ) # Build memory-specific metadata memory_metadata = { "data": memory_content, "tags": tags or "", "people_mentioned": people_mentioned or "", "topic_category": topic_category or "", } # Merge custom metadata if provided if metadata: memory_metadata.update(metadata) # Build user-scoped metadata using specialized function for add operations # Now supports multi-tenant isolation with project/organization context storage_metadata = build_add_metadata( user_id=user_id, input_metadata=memory_metadata, project_id=project_id, organization_id=organization_id, ) # Generate embedding using provider embedding = self.embedding_provider.embed(memory_content) # Generate unique ID memory_id = str(uuid.uuid4()) # Insert using vector store provider with multi-tenant metadata self.vector_store.insert( vectors=[embedding], payloads=[storage_metadata], ids=[memory_id] ) # AUDIT: Log successful memory addition audit_memory_access( operation="memory_add", 
user_id=user_id, project_id=project_id, organization_id=organization_id, memory_id=memory_id, success=True, ) context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" logger.info(f"Memory added ({context_info}): {memory_content[:50]}...") return { "success": True, "memory_id": memory_id, "message": "Memory added successfully", } except Exception as e: # AUDIT: Log failed memory addition audit_memory_access( operation="memory_add", user_id=user_id, project_id=project_id, organization_id=organization_id, success=False, error=str(e), ) context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" logger.error(f"Memory.add() failed ({context_info}): {e}") logger.error(f"Exception type: {type(e)}") logger.error(f"Exception details: {str(e)}") return {"success": False, "error": f"Memory addition failed: {str(e)}"} def search( self, query: str = "", *, # Enforce keyword-only arguments user_id: str, limit: int = 10, tags: list[str] | None = None, people_mentioned: list[str] | None = None, topic_category: str | None = None, temporal_filter: str | None = None, threshold: float | None = None, match_all_tags: bool = False, include_metadata: bool = True, sort_by: str = "relevance", # "relevance", "timestamp", "score" project_id: str | None = None, organization_id: str | None = None, ) -> dict[str, list[dict[str, Any]]]: """ Search memories with multi-tenant isolation (selfmemory style). All searches are scoped to the specified user's memories, and optionally to specific projects and organizations. Users cannot see or access memories from other users, projects, or organizations. 
Args: query: Search query string (empty string returns all memories) user_id: Required user identifier for memory isolation limit: Maximum number of results tags: Optional list of tags to filter by people_mentioned: Optional list of people to filter by topic_category: Optional topic category filter temporal_filter: Optional temporal filter (e.g., "today", "this_week", "yesterday") threshold: Optional minimum similarity score match_all_tags: Whether to match all tags (AND) or any tag (OR) include_metadata: Whether to include full metadata in results sort_by: Sort results by "relevance" (default), "timestamp", or "score" project_id: Optional project identifier for project-level isolation organization_id: Optional organization identifier for org-level isolation Returns: Dict: Search results with "results" key containing list of memories within context Examples: Basic search (user-isolated, backward compatible): >>> memory = Memory() >>> results = memory.search("pizza", user_id="alice") # Only Alice's memories Multi-tenant search: >>> results = memory.search("pizza", user_id="alice", ... project_id="proj_123", organization_id="org_456") Advanced filtering with multi-tenant context: >>> results = memory.search( ... query="meetings", ... user_id="alice", ... project_id="proj_123", ... organization_id="org_456", ... tags=["work", "important"], ... people_mentioned=["John", "Sarah"], ... temporal_filter="this_week", ... match_all_tags=True, ... limit=20 ... 
) """ try: # STRICT ISOLATION VALIDATION: Validate isolation context before proceeding validate_isolation_context( user_id=user_id, project_id=project_id, organization_id=organization_id, operation="memory_search", ) context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" # Log search operation if not query or not query.strip(): logger.info(f"Retrieving all memories ({context_info}) (empty query)") else: logger.info( f"Searching memories ({context_info}) with query: '{query[:50]}...'" ) # Build additional filters from search parameters additional_filters = {} if topic_category: additional_filters["topic_category"] = topic_category if tags: additional_filters["tags"] = tags additional_filters["match_all_tags"] = match_all_tags if people_mentioned: additional_filters["people_mentioned"] = people_mentioned if temporal_filter: additional_filters["temporal_filter"] = temporal_filter # Build multi-tenant filters using specialized function for search operations # Now supports project/organization context user_filters = build_search_filters( user_id=user_id, input_filters=additional_filters, project_id=project_id, organization_id=organization_id, ) logger.info( f"🔍 Memory.search: Built filters for isolation: {user_filters}" ) # Generate embedding for search (vector stores handle empty queries) query_embedding = self.embedding_provider.embed( query.strip() if query else "" ) # Execute semantic search with multi-tenant isolation logger.info( f"🔍 Memory.search: Calling vector_store.search with filters: {user_filters}" ) results = self.vector_store.search( query=query, vectors=query_embedding, limit=limit, filters=user_filters, # Includes automatic user_id + project_id + org_id filtering ) logger.info( f"🔍 Memory.search: Received {len(results) if results else 0} raw results from vector store" ) # Use helper method to format results consistently formatted_results = self._format_results( results, 
include_metadata, include_score=True ) # Apply threshold filtering if specified if threshold is not None: formatted_results = [ result for result in formatted_results if result.get("score", 0) >= threshold ] # Apply sorting using helper method formatted_results = self._apply_sorting(formatted_results, sort_by) # AUDIT: Log successful search operation audit_memory_access( operation="memory_search", user_id=user_id, project_id=project_id, organization_id=organization_id, memory_count=len(formatted_results), success=True, ) logger.info( f"Search completed ({context_info}): {len(formatted_results)} results" ) return {"results": formatted_results} except Exception as e: # AUDIT: Log failed search operation audit_memory_access( operation="memory_search", user_id=user_id, project_id=project_id, organization_id=organization_id, success=False, error=str(e), ) context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" logger.error(f"Search failed ({context_info}): {e}") return {"results": []} def get_all( self, *, # Enforce keyword-only arguments user_id: str, limit: int = 100, offset: int = 0, project_id: str | None = None, organization_id: str | None = None, ) -> dict[str, list[dict[str, Any]]]: """ Get all memories with multi-tenant isolation (selfmemory style). Only returns memories belonging to the specified user, and optionally filtered by project and organization. Users cannot see memories from other users, projects, or organizations. 
Args: user_id: Required user identifier for memory isolation limit: Maximum number of memories to return offset: Number of memories to skip project_id: Optional project identifier for project-level isolation organization_id: Optional organization identifier for org-level isolation Returns: Dict: Memories within context with "results" key Examples: Basic user isolation (backward compatible): >>> memory = Memory() >>> all_memories = memory.get_all(user_id="alice") # Only Alice's memories >>> recent_memories = memory.get_all(user_id="alice", limit=10) Multi-tenant isolation: >>> project_memories = memory.get_all(user_id="alice", ... project_id="proj_123", ... organization_id="org_456") """ try: context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" # Build multi-tenant filters using specialized function for search operations # Now supports project/organization context user_filters = build_search_filters( user_id=user_id, project_id=project_id, organization_id=organization_id ) # Use list() method with multi-tenant isolation filters results = self.vector_store.list(filters=user_filters, limit=limit + offset) # Use helper method to format results consistently formatted_results = self._format_results( results, include_metadata=True, include_score=False ) # Apply offset by slicing results paginated_results = formatted_results[offset : offset + limit] logger.info( f"Retrieved {len(paginated_results)} memories ({context_info}) (offset={offset}, limit={limit})" ) return {"results": paginated_results} except Exception as e: context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" logger.error(f"Failed to get memories ({context_info}): {e}") return {"results": []} def delete(self, memory_id: str) -> dict[str, Any]: """ Delete a specific memory (selfmemory style - no ownership validation needed). 
Deletes the specified memory by ID. In the new selfmemory-style architecture, ownership validation is handled at the API level, not in the Memory class. Args: memory_id: Memory identifier to delete Returns: Dict: Deletion result with success status and message Examples: >>> memory = Memory() >>> result = memory.delete("memory_123") """ try: # Simply delete the memory (selfmemory style - no ownership validation) success = self.vector_store.delete(memory_id) if success: logger.info(f"Memory {memory_id} deleted successfully") return {"success": True, "message": "Memory deleted successfully"} return { "success": False, "error": "Memory deletion failed", } except Exception as e: logger.error(f"Error deleting memory {memory_id}: {e}") return {"success": False, "error": str(e)} def delete_all( self, *, # Enforce keyword-only arguments user_id: str, project_id: str | None = None, organization_id: str | None = None, ) -> dict[str, Any]: """ Delete all memories with multi-tenant isolation (selfmemory style). Only deletes memories belonging to the specified user, and optionally filtered by project and organization. Users cannot delete memories from other users, projects, or organizations. Args: user_id: Required user identifier for memory isolation project_id: Optional project identifier for project-level isolation organization_id: Optional organization identifier for org-level isolation Returns: Dict: Deletion result with count of deleted memories within context Examples: Basic user isolation (backward compatible): >>> memory = Memory() >>> result = memory.delete_all(user_id="alice") # Only deletes Alice's memories >>> print(result["deleted_count"]) # Number of Alice's memories deleted Multi-tenant isolation: >>> result = memory.delete_all(user_id="alice", ... project_id="proj_123", ... 
organization_id="org_456") >>> print(result["deleted_count"]) # Number deleted within project context """ try: context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" # Build multi-tenant filters using specialized function for search operations # Now supports project/organization context user_filters = build_search_filters( user_id=user_id, project_id=project_id, organization_id=organization_id ) # Get memories within context only (for counting) user_memories = self.vector_store.list(filters=user_filters, limit=10000) # Use helper method to extract points from results points = self._extract_points_from_results(user_memories) deleted_count = 0 # Delete only memories within the specified context for point in points: memory_id = self._extract_memory_id(point) if memory_id and self.vector_store.delete(memory_id): deleted_count += 1 logger.info(f"Deleted {deleted_count} memories ({context_info})") return { "success": True, "deleted_count": deleted_count, "message": f"Deleted {deleted_count} memories ({context_info})", } except Exception as e: context_info = f"user='{user_id}'" if project_id and organization_id: context_info += f", project='{project_id}', org='{organization_id}'" logger.error(f"Failed to delete all memories ({context_info}): {e}") return {"success": False, "error": str(e)} def _format_results( self, results, include_metadata: bool = True, include_score: bool = True ) -> list[dict[str, Any]]: """ Format results consistently across all methods (selfmemory style). This helper method standardizes result formatting from different vector stores, ensuring consistent output format regardless of the underlying storage provider. 
Args: results: Raw results from vector store operations include_metadata: Whether to include full metadata in results include_score: Whether to include similarity scores Returns: List of formatted result dictionaries """ formatted_results = [] # Extract points from results using helper method points = self._extract_points_from_results(results) for point in points: # Build base result structure result = { "id": self._extract_memory_id(point), "content": self._extract_content(point), } # Add score if requested and available if include_score: result["score"] = getattr(point, "score", 1.0) # Add metadata if requested if include_metadata: result["metadata"] = self._extract_metadata(point) formatted_results.append(result) return formatted_results def _extract_points_from_results(self, results) -> list: """ Extract points from vector store results (handles different formats). Different vector stores return results in different formats: - Some return tuples: (points, metadata) - Some return lists directly: [point1, point2, ...] - Some return single objects Args: results: Raw results from vector store Returns: List of point objects """ if isinstance(results, tuple) and len(results) > 0: # Handle tuple format (e.g., from Qdrant list operations) return results[0] if isinstance(results[0], list) else [results[0]] if isinstance(results, list): # Handle direct list format return results if results is not None: # Handle single result return [results] # Handle empty/None results return [] def _extract_memory_id(self, point) -> str: """ Extract memory ID from a point object (handles different formats). Args: point: Point object from vector store Returns: Memory ID as string """ if hasattr(point, "id"): return str(point.id) if isinstance(point, dict): return str(point.get("id", "")) return "" def _extract_content(self, point) -> str: """ Extract content/data from a point object (handles different formats). 
Args: point: Point object from vector store Returns: Memory content as string """ if hasattr(point, "payload"): return point.payload.get("data", "") if isinstance(point, dict): return point.get("data", point.get("content", "")) return "" def _extract_metadata(self, point) -> dict[str, Any]: """ Extract metadata from a point object (handles different formats). Args: point: Point object from vector store Returns: Metadata dictionary """ if hasattr(point, "payload"): return point.payload if isinstance(point, dict): return point return {} def _apply_sorting( self, results: list[dict[str, Any]], sort_by: str ) -> list[dict[str, Any]]: """ Apply sorting to formatted results (selfmemory style). Args: results: List of formatted result dictionaries sort_by: Sort method ("relevance", "timestamp", "score") Returns: Sorted list of results """ if not results: return results if sort_by == "timestamp": return sorted( results, key=lambda x: x.get("metadata", {}).get("created_at", ""), reverse=True, ) if sort_by == "score": return sorted(results, key=lambda x: x.get("score", 0), reverse=True) # "relevance" is default - already sorted by vector store return results def get_stats(self) -> dict[str, Any]: """ Get statistics for memories. Returns: Dict: Statistics including memory count, provider info, etc. """ try: memory_count = ( self.vector_store.count() if hasattr(self.vector_store, "count") else 0 ) # Get embedding model from config embedding_model = "unknown" if self.config.embedding.config and hasattr( self.config.embedding.config, "model" ): embedding_model = self.config.embedding.config.model return { "embedding_provider": self.config.embedding.provider, "embedding_model": embedding_model, "vector_store": self.config.vector_store.provider, "memory_count": memory_count, "status": "healthy", } except Exception as e: logger.error(f"Failed to get stats: {e}") return {"error": str(e)} def health_check(self) -> dict[str, Any]: """ Perform health check on all components. 
Returns: Dict: Health check results """ # Get embedding model from config embedding_model = "unknown" if self.config.embedding.config and hasattr( self.config.embedding.config, "model" ): embedding_model = self.config.embedding.config.model health = { "status": "healthy", "storage_type": self.config.vector_store.provider, "embedding_model": embedding_model, "embedding_provider": self.config.embedding.provider, "timestamp": datetime.now().isoformat(), } try: # Test vector store connectivity if hasattr(self.vector_store, "health_check"): vector_health = self.vector_store.health_check() health.update(vector_health) elif hasattr(self.vector_store, "count"): count = self.vector_store.count() health["memory_count"] = count health["vector_store_status"] = "connected" else: health["vector_store_status"] = "available" # Test embedding provider if hasattr(self.embedding_provider, "health_check"): embedding_health = self.embedding_provider.health_check() health.update(embedding_health) else: health["embedding_provider_status"] = "available" logger.info("Health check passed") except Exception as e: health["status"] = "unhealthy" health["error"] = str(e) logger.error(f"Health check failed: {e}") return health def close(self) -> None: """ Close connections and cleanup resources. Should be called when Memory instance is no longer needed. """ try: # Clean up vector store and embedding providers if hasattr(self, "vector_store") and hasattr(self.vector_store, "close"): self.vector_store.close() if hasattr(self, "embedding_provider") and hasattr( self.embedding_provider, "close" ): self.embedding_provider.close() logger.info("Memory SDK connections closed") except Exception as e: logger.error(f"Error closing connections: {e}") def __repr__(self) -> str: """String representation of Memory instance.""" return f"Memory(embedding={self.config.embedding.provider}, db={self.config.vector_store.provider})"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/shrijayan/SelfMemory'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.