"""
Vector store using ChromaDB for local storage.
Supports efficient similarity search and filtering.
"""
import chromadb
from chromadb.config import Settings as ChromaSettings
from typing import List, Dict, Any, Optional
import logging
from pathlib import Path
from app.config import settings
from app.rag.embeddings import get_embedding_service
# Module-import side effect: request root logging configuration at INFO level.
# NOTE(review): configuring logging inside a library module may be better left
# to the application entry point - confirm before changing.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(__name__)
class VectorStore:
    """
    Vector store using ChromaDB for persistent local storage.

    Wraps a single ChromaDB collection configured for cosine distance and
    exposes add, search, delete, statistics and reset operations on it.
    """

    # Index configuration used both when the collection is first created and
    # when it is re-created by clear_collection().
    _COLLECTION_METADATA = {"hnsw:space": "cosine"}

    def __init__(
        self,
        persist_directory: Path = settings.VECTORDB_DIR,
        collection_name: str = settings.COLLECTION_NAME
    ):
        """
        Initialize the vector store.

        Args:
            persist_directory: Directory to persist the database
            collection_name: Name of the collection to use
        """
        self.persist_directory = persist_directory
        self.collection_name = collection_name

        # Initialize ChromaDB client with persistence
        self.client = chromadb.PersistentClient(
            path=str(persist_directory),
            settings=ChromaSettings(
                anonymized_telemetry=False,
                allow_reset=True
            )
        )

        # Get or create collection (cosine distance space)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata=dict(self._COLLECTION_METADATA)
        )
        logger.info(
            "Vector store initialized. Collection: %s, Items: %s",
            collection_name,
            self.collection.count()
        )

    @staticmethod
    def _build_where_filter(
        filter_dict: Optional[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """
        Translate a flat ``{field: value}`` mapping into a ChromaDB ``where``.

        Args:
            filter_dict: Flat equality conditions, or None/empty for no filter

        Returns:
            None when there are no conditions, the single condition as-is
            when there is exactly one, otherwise the conditions combined
            under the ``$and`` operator.
        """
        if not filter_dict:
            return None
        if len(filter_dict) == 1:
            # Single condition - use directly
            return dict(filter_dict)
        # Multiple conditions - ChromaDB requires an explicit $and operator
        return {"$and": [{k: v} for k, v in filter_dict.items()]}

    def add_documents(
        self,
        documents: List[str],
        embeddings: List[List[float]],
        metadatas: List[Dict[str, Any]],
        ids: List[str]
    ) -> None:
        """
        Add documents to the vector store.

        Does nothing (beyond logging a warning) when ``documents`` is empty.

        Args:
            documents: List of document texts
            embeddings: List of embedding vectors
            metadatas: List of metadata dictionaries
            ids: List of unique document IDs
        """
        if not documents:
            logger.warning("No documents to add")
            return

        # ChromaDB doesn't accept None values in metadata, so drop those keys
        clean_metadatas = [
            {k: v for k, v in meta.items() if v is not None}
            for meta in metadatas
        ]

        self.collection.add(
            documents=documents,
            embeddings=embeddings,
            metadatas=clean_metadatas,
            ids=ids
        )
        logger.info("Added %s documents to vector store", len(documents))

    def search(
        self,
        query_embedding: List[float],
        top_k: int = settings.TOP_K,
        filter_dict: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents.

        Args:
            query_embedding: Query embedding vector
            top_k: Number of results to return
            filter_dict: Optional filter criteria (e.g., {"kb_id": "123"})

        Returns:
            List of dicts with keys ``id``, ``content``, ``metadata`` and
            ``similarity_score`` (1 - distance, clamped to the range 0-1).
            A result whose stored metadata is None is reported with ``{}``.
        """
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k,
            where=self._build_where_filter(filter_dict),
            include=["documents", "metadatas", "distances"]
        )

        if not results or not results['ids'] or not results['ids'][0]:
            return []

        # Only one query embedding is sent, so only the first row is relevant
        distances = results['distances'][0] if results['distances'] else None
        documents = results['documents'][0] if results['documents'] else None
        metadatas = results['metadatas'][0] if results['metadatas'] else None

        formatted_results = []
        for i, doc_id in enumerate(results['ids'][0]):
            # ChromaDB returns distances, convert to similarity
            # For cosine distance: similarity = 1 - distance
            distance = distances[i] if distances is not None else 0
            similarity = 1 - distance
            formatted_results.append({
                'id': doc_id,
                'content': documents[i] if documents is not None else "",
                # Normalize a missing per-document metadata entry to {}
                'metadata': (metadatas[i] or {}) if metadatas is not None else {},
                'similarity_score': max(0, min(1, similarity))  # Clamp to 0-1
            })
        return formatted_results

    def delete_by_filter(self, filter_dict: Dict[str, Any]) -> int:
        """
        Delete documents matching a filter.

        Args:
            filter_dict: Filter criteria; must contain at least one condition

        Returns:
            Number of documents deleted

        Raises:
            ValueError: If ``filter_dict`` is empty. An empty filter is
                rejected rather than treated as "match everything".
        """
        where_filter = self._build_where_filter(filter_dict)
        if where_filter is None:
            raise ValueError(
                "delete_by_filter requires at least one filter condition"
            )

        # First, find matching documents; only their ids are needed
        results = self.collection.get(
            where=where_filter,
            include=[]
        )
        matched_ids = results['ids'] if results else None
        if not matched_ids:
            return 0

        self.collection.delete(ids=matched_ids)
        logger.info("Deleted %s documents matching filter", len(matched_ids))
        return len(matched_ids)

    def delete_by_ids(self, ids: List[str]) -> None:
        """Delete documents by their IDs; an empty list is a no-op."""
        if ids:
            self.collection.delete(ids=ids)
            logger.info("Deleted %s documents by ID", len(ids))

    def get_stats(
        self,
        tenant_id: Optional[str] = None,  # CRITICAL: Multi-tenant isolation
        kb_id: Optional[str] = None,
        user_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get statistics about the vector store.

        Args:
            tenant_id: Tenant ID for multi-tenant isolation (REQUIRED if filtering)
            kb_id: Optional knowledge base ID to filter
            user_id: Optional user ID to filter

        Returns:
            With at least one filter given: ``total_chunks``, ``file_names``
            (distinct ``file_name`` metadata values, in no particular order)
            and the echoed ``tenant_id``, ``kb_id`` and ``user_id``.
            Without filters: ``total_chunks`` for the whole collection and
            ``collection_name``.
        """
        filter_dict = {}
        if tenant_id:
            filter_dict["tenant_id"] = tenant_id  # CRITICAL: Multi-tenant isolation
        if kb_id:
            filter_dict["kb_id"] = kb_id
        if user_id:
            filter_dict["user_id"] = user_id

        if not filter_dict:
            return {
                "total_chunks": self.collection.count(),
                "collection_name": self.collection_name
            }

        results = self.collection.get(
            where=self._build_where_filter(filter_dict),
            include=["metadatas"]
        )
        count = len(results['ids']) if results and results['ids'] else 0

        # Get unique file names, skipping entries with no metadata
        file_names = set()
        if results and results['metadatas']:
            for meta in results['metadatas']:
                if meta and 'file_name' in meta:
                    file_names.add(meta['file_name'])

        return {
            "total_chunks": count,
            "file_names": list(file_names),
            "tenant_id": tenant_id,
            "kb_id": kb_id,
            "user_id": user_id
        }

    def clear_collection(self) -> None:
        """Clear all documents by deleting and re-creating the collection."""
        self.client.delete_collection(self.collection_name)
        self.collection = self.client.create_collection(
            name=self.collection_name,
            metadata=dict(self._COLLECTION_METADATA)
        )
        logger.info("Cleared collection: %s", self.collection_name)
# Module-wide VectorStore singleton; stays None until first requested.
_vector_store: Optional[VectorStore] = None


def get_vector_store() -> VectorStore:
    """Return the shared VectorStore, constructing it on the first call."""
    global _vector_store
    if _vector_store is not None:
        return _vector_store
    _vector_store = VectorStore()
    return _vector_store