# main.py
"""
Main Memory SDK class for SelfMemory.
This module provides the primary interface for local SelfMemory functionality,
with a zero-setup API for direct usage without authentication.
"""
import logging
import uuid
from datetime import datetime
from typing import Any
from selfmemory.configs import SelfMemoryConfig
from selfmemory.memory.base import MemoryBase
from selfmemory.memory.utils import (
audit_memory_access,
build_add_metadata,
build_search_filters,
validate_isolation_context,
)
from selfmemory.utils.factory import EmbeddingFactory, VectorStoreFactory
logger = logging.getLogger(__name__)
class SelfMemory(MemoryBase):
"""
User-scoped Memory class with automatic isolation.
This class provides zero-setup functionality with embedded vector stores
and automatic user isolation. Each Memory instance is scoped to specific
user identifiers, ensuring complete data separation between users.
Key Features:
- Automatic user isolation (users can only access their own memories)
- Zero-setup embedded vector stores and embeddings
- Compatible with multiple vector store providers (Qdrant, ChromaDB, etc.)
- Secure ownership validation for all operations
Examples:
        Basic multi-user isolation (one shared instance, per-call user_id):
            >>> memory = SelfMemory()
            >>> # Users add memories independently into isolated spaces
            >>> memory.add("I love Italian food, especially pizza", user_id="alice")
            >>> memory.add("I prefer Japanese cuisine like sushi", user_id="bob")
            >>> memory.add("Mexican food is my favorite", user_id="charlie")
            >>> # Searches are automatically user-isolated
            >>> alice_results = memory.search("food", user_id="alice")  # Only Alice's memories
            >>> bob_results = memory.search("food", user_id="bob")      # Only Bob's memories

        Advanced usage with metadata and filtering:
            >>> # Add memories with rich metadata
            >>> memory.add(
            ...     "Had a great meeting with the product team",
            ...     user_id="alice",
            ...     tags="work,meeting,product",
            ...     people_mentioned="Sarah,Mike,Jennifer",
            ...     topic_category="work",
            ... )
            >>> # Search with advanced filtering (still user-isolated)
            >>> work_memories = memory.search(
            ...     query="meeting",
            ...     user_id="alice",
            ...     tags=["work", "meeting"],
            ...     people_mentioned=["Sarah"],
            ...     match_all_tags=True,
            ...     limit=20,
            ... )

        User isolation in action:
            >>> memory.get_all(user_id="alice")     # Returns only Alice's memories
            >>> memory.get_all(user_id="bob")       # Returns only Bob's memories
            >>> memory.delete_all(user_id="alice")  # Deletes only Alice's memories

        Custom configuration:
            >>> # Use custom embedding and vector store providers
            >>> config = {
            ...     "embedding": {
            ...         "provider": "ollama",
            ...         "config": {"model": "nomic-embed-text"},
            ...     },
            ...     "vector_store": {
            ...         "provider": "qdrant",
            ...         "config": {"path": "./qdrant_data"},
            ...     },
            ... }
            >>> memory = SelfMemory(config=config)

        Production multi-tenant usage:
            >>> # One shared instance; per-call identifiers isolate tenants
            >>> memory.add("Confidential business data",
            ...            user_id="tenant_1_user_456",
            ...            project_id="proj_1", organization_id="org_1")
            >>> memory.add("Personal notes", user_id="tenant_2_user_789")
            >>> # Users can never see each other's data
"""
def __init__(self, config: SelfMemoryConfig | dict | None = None):
"""
Initialize Memory with configuration (selfmemory style - no user_id required).
Args:
config: Optional SelfMemoryConfig instance or config dictionary
Examples:
Basic memory instance:
>>> memory = Memory()
With custom config:
>>> config = {
... "embedding": {"provider": "ollama", "config": {...}},
... "vector_store": {"provider": "qdrant", "config": {...}}
... }
>>> memory = Memory(config=config)
Multi-user usage (user_id passed to methods):
>>> memory = Memory()
>>> memory.add("I love pizza", user_id="alice")
>>> memory.add("I love sushi", user_id="bob")
>>> alice_results = memory.search("pizza", user_id="alice") # Only Alice's memories
>>> bob_results = memory.search("sushi", user_id="bob") # Only Bob's memories
"""
# Handle different config types for clean API
if config is None:
self.config = SelfMemoryConfig()
elif isinstance(config, dict):
# Convert dict to SelfMemoryConfig for internal use
self.config = SelfMemoryConfig.from_dict(config)
else:
# Already an SelfMemoryConfig object
self.config = config
# Use factories with exact pattern - pass raw config
self.embedding_provider = EmbeddingFactory.create(
self.config.embedding.provider, self.config.embedding.config
)
self.vector_store = VectorStoreFactory.create(
self.config.vector_store.provider, self.config.vector_store.config
)
logger.info(
f"Memory SDK initialized: "
f"{self.config.embedding.provider} + {self.config.vector_store.provider}"
)
def add(
self,
memory_content: str,
*, # Enforce keyword-only arguments
user_id: str,
tags: str | None = None,
people_mentioned: str | None = None,
topic_category: str | None = None,
project_id: str | None = None,
organization_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Add a new memory to storage with multi-tenant isolation (selfmemory style).
Args:
memory_content: The memory text to store
user_id: Required user identifier for memory isolation
tags: Optional comma-separated tags
people_mentioned: Optional comma-separated people names
topic_category: Optional topic category
project_id: Optional project identifier for project-level isolation
organization_id: Optional organization identifier for org-level isolation
metadata: Optional additional metadata
Returns:
Dict: Result information including memory_id and status
Examples:
Basic user isolation (backward compatible):
>>> memory = Memory()
>>> memory.add("I love pizza", user_id="alice", tags="food,personal")
Multi-tenant isolation:
>>> memory.add("Meeting notes from project discussion",
... user_id="alice", project_id="proj_123",
... organization_id="org_456", tags="work,meeting",
... people_mentioned="Sarah,Mike")
"""
try:
# STRICT ISOLATION VALIDATION: Validate isolation context before proceeding
validate_isolation_context(
user_id=user_id,
project_id=project_id,
organization_id=organization_id,
operation="memory_add",
)
# Build memory-specific metadata
memory_metadata = {
"data": memory_content,
"tags": tags or "",
"people_mentioned": people_mentioned or "",
"topic_category": topic_category or "",
}
# Merge custom metadata if provided
if metadata:
memory_metadata.update(metadata)
# Build user-scoped metadata using specialized function for add operations
# Now supports multi-tenant isolation with project/organization context
storage_metadata = build_add_metadata(
user_id=user_id,
input_metadata=memory_metadata,
project_id=project_id,
organization_id=organization_id,
)
# Generate embedding using provider
embedding = self.embedding_provider.embed(memory_content)
# Generate unique ID
memory_id = str(uuid.uuid4())
# Insert using vector store provider with multi-tenant metadata
self.vector_store.insert(
vectors=[embedding], payloads=[storage_metadata], ids=[memory_id]
)
# AUDIT: Log successful memory addition
audit_memory_access(
operation="memory_add",
user_id=user_id,
project_id=project_id,
organization_id=organization_id,
memory_id=memory_id,
success=True,
)
context_info = f"user='{user_id}'"
if project_id and organization_id:
context_info += f", project='{project_id}', org='{organization_id}'"
logger.info(f"Memory added ({context_info}): {memory_content[:50]}...")
return {
"success": True,
"memory_id": memory_id,
"message": "Memory added successfully",
}
except Exception as e:
# AUDIT: Log failed memory addition
audit_memory_access(
operation="memory_add",
user_id=user_id,
project_id=project_id,
organization_id=organization_id,
success=False,
error=str(e),
)
context_info = f"user='{user_id}'"
if project_id and organization_id:
context_info += f", project='{project_id}', org='{organization_id}'"
logger.error(f"Memory.add() failed ({context_info}): {e}")
logger.error(f"Exception type: {type(e)}")
logger.error(f"Exception details: {str(e)}")
return {"success": False, "error": f"Memory addition failed: {str(e)}"}
    def search(
        self,
        query: str = "",
        *,  # Enforce keyword-only arguments
        user_id: str,
        limit: int = 10,
        tags: list[str] | None = None,
        people_mentioned: list[str] | None = None,
        topic_category: str | None = None,
        temporal_filter: str | None = None,
        threshold: float | None = None,
        match_all_tags: bool = False,
        include_metadata: bool = True,
        sort_by: str = "relevance",  # "relevance", "timestamp", "score"
        project_id: str | None = None,
        organization_id: str | None = None,
    ) -> dict[str, list[dict[str, Any]]]:
        """
        Search memories with multi-tenant isolation (selfmemory style).

        All searches are scoped to the specified user's memories, and optionally
        to specific projects and organizations. Users cannot see or access
        memories from other users, projects, or organizations.

        Pipeline: validate isolation context -> build filters -> embed the
        query -> vector search -> format -> threshold filter -> sort -> audit.

        Args:
            query: Search query string (empty string returns all memories).
            user_id: Required user identifier for memory isolation.
            limit: Maximum number of results requested from the vector store.
            tags: Optional list of tags to filter by.
            people_mentioned: Optional list of people to filter by.
            topic_category: Optional topic category filter.
            temporal_filter: Optional temporal filter (e.g., "today", "this_week").
            threshold: Optional minimum similarity score; applied AFTER the
                vector search, so fewer than ``limit`` results may remain.
            match_all_tags: Whether to match all tags (AND) or any tag (OR).
                Only forwarded when ``tags`` is non-empty.
            include_metadata: Whether to include full metadata in results.
            sort_by: Sort results by "relevance" (default), "timestamp", or "score".
            project_id: Optional project identifier for project-level isolation.
            organization_id: Optional organization identifier for org-level isolation.

        Returns:
            Dict: ``{"results": [...]}``; an empty list on any failure
            (this method never raises — it fails closed).

        Examples:
            >>> memory = SelfMemory()
            >>> results = memory.search("pizza", user_id="alice")
            >>> results = memory.search("pizza", user_id="alice",
            ...                         project_id="proj_123",
            ...                         organization_id="org_456")
        """
        try:
            # STRICT ISOLATION VALIDATION: Validate isolation context before proceeding
            validate_isolation_context(
                user_id=user_id,
                project_id=project_id,
                organization_id=organization_id,
                operation="memory_search",
            )
            # Human-readable context string reused by the log lines below.
            context_info = f"user='{user_id}'"
            if project_id and organization_id:
                context_info += f", project='{project_id}', org='{organization_id}'"
            # Log search operation (an empty/whitespace query means "list all")
            if not query or not query.strip():
                logger.info(f"Retrieving all memories ({context_info}) (empty query)")
            else:
                logger.info(
                    f"Searching memories ({context_info}) with query: '{query[:50]}...'"
                )
            # Build additional filters from search parameters
            additional_filters = {}
            if topic_category:
                additional_filters["topic_category"] = topic_category
            if tags:
                additional_filters["tags"] = tags
                # match_all_tags is only meaningful when tags were provided
                additional_filters["match_all_tags"] = match_all_tags
            if people_mentioned:
                additional_filters["people_mentioned"] = people_mentioned
            if temporal_filter:
                additional_filters["temporal_filter"] = temporal_filter
            # Build multi-tenant filters using specialized function for search operations
            # Now supports project/organization context
            user_filters = build_search_filters(
                user_id=user_id,
                input_filters=additional_filters,
                project_id=project_id,
                organization_id=organization_id,
            )
            logger.info(
                f"🔍 Memory.search: Built filters for isolation: {user_filters}"
            )
            # Generate embedding for search (vector stores handle empty queries)
            query_embedding = self.embedding_provider.embed(
                query.strip() if query else ""
            )
            # Execute semantic search with multi-tenant isolation
            logger.info(
                f"🔍 Memory.search: Calling vector_store.search with filters: {user_filters}"
            )
            results = self.vector_store.search(
                query=query,
                vectors=query_embedding,
                limit=limit,
                filters=user_filters,  # Includes automatic user_id + project_id + org_id filtering
            )
            logger.info(
                f"🔍 Memory.search: Received {len(results) if results else 0} raw results from vector store"
            )
            # Use helper method to format results consistently
            formatted_results = self._format_results(
                results, include_metadata, include_score=True
            )
            # Apply threshold filtering if specified (post-search, so the
            # final count may be below `limit`)
            if threshold is not None:
                formatted_results = [
                    result
                    for result in formatted_results
                    if result.get("score", 0) >= threshold
                ]
            # Apply sorting using helper method
            formatted_results = self._apply_sorting(formatted_results, sort_by)
            # AUDIT: Log successful search operation
            audit_memory_access(
                operation="memory_search",
                user_id=user_id,
                project_id=project_id,
                organization_id=organization_id,
                memory_count=len(formatted_results),
                success=True,
            )
            logger.info(
                f"Search completed ({context_info}): {len(formatted_results)} results"
            )
            return {"results": formatted_results}
        except Exception as e:
            # AUDIT: Log failed search operation
            audit_memory_access(
                operation="memory_search",
                user_id=user_id,
                project_id=project_id,
                organization_id=organization_id,
                success=False,
                error=str(e),
            )
            # context_info may not exist if validation raised, so rebuild it.
            context_info = f"user='{user_id}'"
            if project_id and organization_id:
                context_info += f", project='{project_id}', org='{organization_id}'"
            logger.error(f"Search failed ({context_info}): {e}")
            # Fail closed: return an empty result set rather than raising.
            return {"results": []}
def get_all(
self,
*, # Enforce keyword-only arguments
user_id: str,
limit: int = 100,
offset: int = 0,
project_id: str | None = None,
organization_id: str | None = None,
) -> dict[str, list[dict[str, Any]]]:
"""
Get all memories with multi-tenant isolation (selfmemory style).
Only returns memories belonging to the specified user, and optionally
filtered by project and organization. Users cannot see memories from
other users, projects, or organizations.
Args:
user_id: Required user identifier for memory isolation
limit: Maximum number of memories to return
offset: Number of memories to skip
project_id: Optional project identifier for project-level isolation
organization_id: Optional organization identifier for org-level isolation
Returns:
Dict: Memories within context with "results" key
Examples:
Basic user isolation (backward compatible):
>>> memory = Memory()
>>> all_memories = memory.get_all(user_id="alice") # Only Alice's memories
>>> recent_memories = memory.get_all(user_id="alice", limit=10)
Multi-tenant isolation:
>>> project_memories = memory.get_all(user_id="alice",
... project_id="proj_123",
... organization_id="org_456")
"""
try:
context_info = f"user='{user_id}'"
if project_id and organization_id:
context_info += f", project='{project_id}', org='{organization_id}'"
# Build multi-tenant filters using specialized function for search operations
# Now supports project/organization context
user_filters = build_search_filters(
user_id=user_id, project_id=project_id, organization_id=organization_id
)
# Use list() method with multi-tenant isolation filters
results = self.vector_store.list(filters=user_filters, limit=limit + offset)
# Use helper method to format results consistently
formatted_results = self._format_results(
results, include_metadata=True, include_score=False
)
# Apply offset by slicing results
paginated_results = formatted_results[offset : offset + limit]
logger.info(
f"Retrieved {len(paginated_results)} memories ({context_info}) (offset={offset}, limit={limit})"
)
return {"results": paginated_results}
except Exception as e:
context_info = f"user='{user_id}'"
if project_id and organization_id:
context_info += f", project='{project_id}', org='{organization_id}'"
logger.error(f"Failed to get memories ({context_info}): {e}")
return {"results": []}
def delete(self, memory_id: str) -> dict[str, Any]:
"""
Delete a specific memory (selfmemory style - no ownership validation needed).
Deletes the specified memory by ID. In the new selfmemory-style architecture,
ownership validation is handled at the API level, not in the Memory class.
Args:
memory_id: Memory identifier to delete
Returns:
Dict: Deletion result with success status and message
Examples:
>>> memory = Memory()
>>> result = memory.delete("memory_123")
"""
try:
# Simply delete the memory (selfmemory style - no ownership validation)
success = self.vector_store.delete(memory_id)
if success:
logger.info(f"Memory {memory_id} deleted successfully")
return {"success": True, "message": "Memory deleted successfully"}
return {
"success": False,
"error": "Memory deletion failed",
}
except Exception as e:
logger.error(f"Error deleting memory {memory_id}: {e}")
return {"success": False, "error": str(e)}
def delete_all(
self,
*, # Enforce keyword-only arguments
user_id: str,
project_id: str | None = None,
organization_id: str | None = None,
) -> dict[str, Any]:
"""
Delete all memories with multi-tenant isolation (selfmemory style).
Only deletes memories belonging to the specified user, and optionally
filtered by project and organization. Users cannot delete memories from
other users, projects, or organizations.
Args:
user_id: Required user identifier for memory isolation
project_id: Optional project identifier for project-level isolation
organization_id: Optional organization identifier for org-level isolation
Returns:
Dict: Deletion result with count of deleted memories within context
Examples:
Basic user isolation (backward compatible):
>>> memory = Memory()
>>> result = memory.delete_all(user_id="alice") # Only deletes Alice's memories
>>> print(result["deleted_count"]) # Number of Alice's memories deleted
Multi-tenant isolation:
>>> result = memory.delete_all(user_id="alice",
... project_id="proj_123",
... organization_id="org_456")
>>> print(result["deleted_count"]) # Number deleted within project context
"""
try:
context_info = f"user='{user_id}'"
if project_id and organization_id:
context_info += f", project='{project_id}', org='{organization_id}'"
# Build multi-tenant filters using specialized function for search operations
# Now supports project/organization context
user_filters = build_search_filters(
user_id=user_id, project_id=project_id, organization_id=organization_id
)
# Get memories within context only (for counting)
user_memories = self.vector_store.list(filters=user_filters, limit=10000)
# Use helper method to extract points from results
points = self._extract_points_from_results(user_memories)
deleted_count = 0
# Delete only memories within the specified context
for point in points:
memory_id = self._extract_memory_id(point)
if memory_id and self.vector_store.delete(memory_id):
deleted_count += 1
logger.info(f"Deleted {deleted_count} memories ({context_info})")
return {
"success": True,
"deleted_count": deleted_count,
"message": f"Deleted {deleted_count} memories ({context_info})",
}
except Exception as e:
context_info = f"user='{user_id}'"
if project_id and organization_id:
context_info += f", project='{project_id}', org='{organization_id}'"
logger.error(f"Failed to delete all memories ({context_info}): {e}")
return {"success": False, "error": str(e)}
def _format_results(
self, results, include_metadata: bool = True, include_score: bool = True
) -> list[dict[str, Any]]:
"""
Format results consistently across all methods (selfmemory style).
This helper method standardizes result formatting from different vector stores,
ensuring consistent output format regardless of the underlying storage provider.
Args:
results: Raw results from vector store operations
include_metadata: Whether to include full metadata in results
include_score: Whether to include similarity scores
Returns:
List of formatted result dictionaries
"""
formatted_results = []
# Extract points from results using helper method
points = self._extract_points_from_results(results)
for point in points:
# Build base result structure
result = {
"id": self._extract_memory_id(point),
"content": self._extract_content(point),
}
# Add score if requested and available
if include_score:
result["score"] = getattr(point, "score", 1.0)
# Add metadata if requested
if include_metadata:
result["metadata"] = self._extract_metadata(point)
formatted_results.append(result)
return formatted_results
def _extract_points_from_results(self, results) -> list:
"""
Extract points from vector store results (handles different formats).
Different vector stores return results in different formats:
- Some return tuples: (points, metadata)
- Some return lists directly: [point1, point2, ...]
- Some return single objects
Args:
results: Raw results from vector store
Returns:
List of point objects
"""
if isinstance(results, tuple) and len(results) > 0:
# Handle tuple format (e.g., from Qdrant list operations)
return results[0] if isinstance(results[0], list) else [results[0]]
if isinstance(results, list):
# Handle direct list format
return results
if results is not None:
# Handle single result
return [results]
# Handle empty/None results
return []
def _extract_memory_id(self, point) -> str:
"""
Extract memory ID from a point object (handles different formats).
Args:
point: Point object from vector store
Returns:
Memory ID as string
"""
if hasattr(point, "id"):
return str(point.id)
if isinstance(point, dict):
return str(point.get("id", ""))
return ""
def _extract_content(self, point) -> str:
"""
Extract content/data from a point object (handles different formats).
Args:
point: Point object from vector store
Returns:
Memory content as string
"""
if hasattr(point, "payload"):
return point.payload.get("data", "")
if isinstance(point, dict):
return point.get("data", point.get("content", ""))
return ""
def _extract_metadata(self, point) -> dict[str, Any]:
"""
Extract metadata from a point object (handles different formats).
Args:
point: Point object from vector store
Returns:
Metadata dictionary
"""
if hasattr(point, "payload"):
return point.payload
if isinstance(point, dict):
return point
return {}
def _apply_sorting(
self, results: list[dict[str, Any]], sort_by: str
) -> list[dict[str, Any]]:
"""
Apply sorting to formatted results (selfmemory style).
Args:
results: List of formatted result dictionaries
sort_by: Sort method ("relevance", "timestamp", "score")
Returns:
Sorted list of results
"""
if not results:
return results
if sort_by == "timestamp":
return sorted(
results,
key=lambda x: x.get("metadata", {}).get("created_at", ""),
reverse=True,
)
if sort_by == "score":
return sorted(results, key=lambda x: x.get("score", 0), reverse=True)
# "relevance" is default - already sorted by vector store
return results
def get_stats(self) -> dict[str, Any]:
"""
Get statistics for memories.
Returns:
Dict: Statistics including memory count, provider info, etc.
"""
try:
memory_count = (
self.vector_store.count() if hasattr(self.vector_store, "count") else 0
)
# Get embedding model from config
embedding_model = "unknown"
if self.config.embedding.config and hasattr(
self.config.embedding.config, "model"
):
embedding_model = self.config.embedding.config.model
return {
"embedding_provider": self.config.embedding.provider,
"embedding_model": embedding_model,
"vector_store": self.config.vector_store.provider,
"memory_count": memory_count,
"status": "healthy",
}
except Exception as e:
logger.error(f"Failed to get stats: {e}")
return {"error": str(e)}
def health_check(self) -> dict[str, Any]:
"""
Perform health check on all components.
Returns:
Dict: Health check results
"""
# Get embedding model from config
embedding_model = "unknown"
if self.config.embedding.config and hasattr(
self.config.embedding.config, "model"
):
embedding_model = self.config.embedding.config.model
health = {
"status": "healthy",
"storage_type": self.config.vector_store.provider,
"embedding_model": embedding_model,
"embedding_provider": self.config.embedding.provider,
"timestamp": datetime.now().isoformat(),
}
try:
# Test vector store connectivity
if hasattr(self.vector_store, "health_check"):
vector_health = self.vector_store.health_check()
health.update(vector_health)
elif hasattr(self.vector_store, "count"):
count = self.vector_store.count()
health["memory_count"] = count
health["vector_store_status"] = "connected"
else:
health["vector_store_status"] = "available"
# Test embedding provider
if hasattr(self.embedding_provider, "health_check"):
embedding_health = self.embedding_provider.health_check()
health.update(embedding_health)
else:
health["embedding_provider_status"] = "available"
logger.info("Health check passed")
except Exception as e:
health["status"] = "unhealthy"
health["error"] = str(e)
logger.error(f"Health check failed: {e}")
return health
def close(self) -> None:
"""
Close connections and cleanup resources.
Should be called when Memory instance is no longer needed.
"""
try:
# Clean up vector store and embedding providers
if hasattr(self, "vector_store") and hasattr(self.vector_store, "close"):
self.vector_store.close()
if hasattr(self, "embedding_provider") and hasattr(
self.embedding_provider, "close"
):
self.embedding_provider.close()
logger.info("Memory SDK connections closed")
except Exception as e:
logger.error(f"Error closing connections: {e}")
def __repr__(self) -> str:
"""String representation of Memory instance."""
return f"Memory(embedding={self.config.embedding.provider}, db={self.config.vector_store.provider})"