qdrant.py•11.3 kB
import logging
import shutil
from pathlib import Path
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance,
FieldCondition,
Filter,
MatchValue,
PointIdsList,
PointStruct,
Range,
VectorParams,
)
from selfmemory.vector_stores.base import VectorStoreBase
logger = logging.getLogger(__name__)
class Qdrant(VectorStoreBase):
def __init__(
self,
collection_name: str,
embedding_model_dims: int,
client: QdrantClient = None,
host: str = None,
port: int = None,
path: str = None,
url: str = None,
api_key: str = None,
on_disk: bool = False,
**kwargs,
):
"""
Initialize the Qdrant vector store.
Args:
collection_name (str): Name of the collection.
embedding_model_dims (int): Dimensions of the embedding model.
client (QdrantClient, optional): Existing Qdrant client instance. Defaults to None.
host (str, optional): Host address for Qdrant server. Defaults to None.
port (int, optional): Port for Qdrant server. Defaults to None.
path (str, optional): Path for local Qdrant database. Defaults to None.
url (str, optional): Full URL for Qdrant server. Defaults to None.
api_key (str, optional): API key for Qdrant server. Defaults to None.
on_disk (bool, optional): Enables persistent storage. Defaults to False.
"""
if client:
self.client = client
self.is_local = False
else:
params = {}
if api_key:
params["api_key"] = api_key
if url:
params["url"] = url
if host and port:
params["host"] = host
params["port"] = port
if not params:
params["path"] = path
self.is_local = True
if not on_disk and path:
p = Path(path)
if p.exists() and p.is_dir():
shutil.rmtree(p)
else:
self.is_local = False
self.client = QdrantClient(**params)
self.collection_name = collection_name
self.embedding_model_dims = embedding_model_dims
self.on_disk = on_disk
self.create_col(embedding_model_dims, on_disk)
def create_col(
self, vector_size: int, on_disk: bool, distance: Distance = Distance.COSINE
):
"""
Create a new collection.
Args:
vector_size (int): Size of the vectors to be stored.
on_disk (bool): Enables persistent storage.
distance (Distance, optional): Distance metric for vector similarity. Defaults to Distance.COSINE.
"""
# Skip creating collection if already exists
response = self.list_cols()
for collection in response.collections:
if collection.name == self.collection_name:
logger.debug(
f"Collection {self.collection_name} already exists. Skipping creation."
)
self._create_filter_indexes()
return
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=vector_size, distance=distance, on_disk=on_disk
),
)
self._create_filter_indexes()
def _create_filter_indexes(self):
"""Create indexes for commonly used filter fields to enable filtering."""
# Only create payload indexes for remote Qdrant servers
if self.is_local:
logger.debug(
"Skipping payload index creation for local Qdrant (not supported)"
)
return
common_fields = [
"user_id",
"project_id", # Critical for multi-tenant project isolation
"organization_id", # Critical for multi-tenant organization isolation
"agent_id",
"run_id",
"actor_id",
"tags",
"people_mentioned",
"topic_category",
]
for field in common_fields:
try:
self.client.create_payload_index(
collection_name=self.collection_name,
field_name=field,
field_schema="keyword",
)
logger.info(
f"Created index for {field} in collection {self.collection_name}"
)
except Exception as e:
logger.debug(f"Index for {field} might already exist: {e}")
def insert(self, vectors: list, payloads: list = None, ids: list = None):
"""
Insert vectors into a collection.
Args:
vectors (list): List of vectors to insert.
payloads (list, optional): List of payloads corresponding to vectors. Defaults to None.
ids (list, optional): List of IDs corresponding to vectors. Defaults to None.
"""
logger.info(
f"Inserting {len(vectors)} vectors into collection {self.collection_name}"
)
points = [
PointStruct(
id=idx if ids is None else ids[idx],
vector=vector,
payload=payloads[idx] if payloads else {},
)
for idx, vector in enumerate(vectors)
]
self.client.upsert(collection_name=self.collection_name, points=points)
def _create_filter(self, filters: dict) -> Filter:
"""
Create a Filter object from the provided filters.
Args:
filters (dict): Filters to apply.
Returns:
Filter: The created Filter object.
"""
if not filters:
logger.info("🔍 Qdrant: No filters provided, returning None")
return None
logger.info(f"🔍 Qdrant: Creating filters from: {filters}")
conditions = []
for key, value in filters.items():
if isinstance(value, dict) and "gte" in value and "lte" in value:
condition = FieldCondition(
key=key, range=Range(gte=value["gte"], lte=value["lte"])
)
logger.info(
f"🔍 Qdrant: Added range condition: {key} >= {value['gte']} <= {value['lte']}"
)
else:
condition = FieldCondition(key=key, match=MatchValue(value=value))
logger.info(f"🔍 Qdrant: Added match condition: {key} = {value}")
conditions.append(condition)
filter_obj = Filter(must=conditions) if conditions else None
logger.info(
f"🔍 Qdrant: Created filter with {len(conditions)} conditions: {filter_obj}"
)
return filter_obj
def search(
self, query: str, vectors: list, limit: int = 5, filters: dict = None
) -> list:
"""
Search for similar vectors.
Args:
query (str): Query.
vectors (list): Query vector.
limit (int, optional): Number of results to return. Defaults to 5.
filters (dict, optional): Filters to apply to the search. Defaults to None.
Returns:
list: Search results.
"""
query_filter = self._create_filter(filters) if filters else None
hits = self.client.query_points(
collection_name=self.collection_name,
query=vectors,
query_filter=query_filter,
limit=limit,
)
return hits.points
def delete(self, vector_id: int):
"""
Delete a vector by ID.
Args:
vector_id (int): ID of the vector to delete.
"""
self.client.delete(
collection_name=self.collection_name,
points_selector=PointIdsList(
points=[vector_id],
),
)
def update(self, vector_id: int, vector: list = None, payload: dict = None):
"""
Update a vector and its payload.
Args:
vector_id (int): ID of the vector to update.
vector (list, optional): Updated vector. Defaults to None.
payload (dict, optional): Updated payload. Defaults to None.
"""
point = PointStruct(id=vector_id, vector=vector, payload=payload)
self.client.upsert(collection_name=self.collection_name, points=[point])
def get(self, vector_id: int) -> dict:
"""
Retrieve a vector by ID.
Args:
vector_id (int): ID of the vector to retrieve.
Returns:
dict: Retrieved vector.
"""
result = self.client.retrieve(
collection_name=self.collection_name, ids=[vector_id], with_payload=True
)
return result[0] if result else None
def list_cols(self) -> list:
"""
List all collections.
Returns:
list: List of collection names.
"""
return self.client.get_collections()
def delete_col(self):
"""Delete a collection."""
self.client.delete_collection(collection_name=self.collection_name)
def col_info(self) -> dict:
"""
Get information about a collection.
Returns:
dict: Collection information.
"""
return self.client.get_collection(collection_name=self.collection_name)
def list(self, filters: dict = None, limit: int = 100) -> list:
"""
List all vectors in a collection.
Args:
filters (dict, optional): Filters to apply to the list. Defaults to None.
limit (int, optional): Number of vectors to return. Defaults to 100.
Returns:
list: List of vectors.
"""
query_filter = self._create_filter(filters) if filters else None
result = self.client.scroll(
collection_name=self.collection_name,
scroll_filter=query_filter,
limit=limit,
with_payload=True,
with_vectors=False,
)
return result
def count(self):
"""
Get count of points in the collection.
Returns:
int: Number of points in collection
"""
try:
collection_info = self.client.get_collection(self.collection_name)
return collection_info.points_count
except Exception as e:
logger.error(f"Error getting count: {e}")
return 0
def delete_all(self):
"""
Delete all points from the collection.
Returns:
bool: True if successful
"""
try:
# Reset the collection (delete and recreate)
self.reset()
return True
except Exception as e:
logger.error(f"Error in delete_all: {e}")
return False
def reset(self):
"""Reset the index by deleting and recreating it."""
logger.warning(f"Resetting index {self.collection_name}...")
self.delete_col()
self.create_col(self.embedding_model_dims, self.on_disk)