# qdrant_store.py
"""Qdrant vector storage for semantic search."""
import asyncio
import functools
from typing import Any, Callable, Dict, List, Optional

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchValue,
    PointStruct,
    VectorParams,
)
class QdrantStore:
    """Qdrant vector store for semantic search.

    The underlying ``QdrantClient`` is synchronous. Every client call made by
    the ``async`` methods below is dispatched to the event loop's default
    executor, so a slow Qdrant round-trip does not stall other coroutines.
    """

    def __init__(
        self,
        url: str,
        collection_name: str = "personal_context",
        vector_size: int = 768,  # nomic-embed-text dimension
    ):
        """Create a store bound to a single collection.

        Args:
            url: Qdrant server URL, passed to ``QdrantClient(url=...)``.
            collection_name: Collection targeted by every operation.
            vector_size: Vector dimensionality used by ``initialize`` when it
                creates the collection.
        """
        self.client = QdrantClient(url=url)
        self.collection_name = collection_name
        self.vector_size = vector_size

    @staticmethod
    async def _run(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """Await a blocking call by running it in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(func, *args, **kwargs)
        )

    async def initialize(self):
        """Create the collection if it does not already exist.

        A new collection uses cosine distance and ``self.vector_size``
        dimensions. An existing collection is left untouched; its vector
        configuration is not checked against ``self.vector_size``.
        """
        response = await self._run(self.client.get_collections)
        if any(c.name == self.collection_name for c in response.collections):
            return
        # NOTE(review): check-then-create is not atomic; a concurrent creator
        # could make create_collection fail here. Confirm whether that matters.
        await self._run(
            self.client.create_collection,
            collection_name=self.collection_name,
            vectors_config=VectorParams(
                size=self.vector_size,
                distance=Distance.COSINE,
            ),
        )

    async def upsert_vector(
        self,
        point_id: str,
        vector: List[float],
        payload: Dict[str, Any],
    ):
        """Insert or update a single vector with its payload.

        Args:
            point_id: Point ID. NOTE(review): Qdrant only accepts unsigned
                integers or UUIDs as point IDs; confirm callers pass UUID
                strings.
            vector: Embedding to store.
            payload: Arbitrary metadata stored alongside the vector.
        """
        point = PointStruct(id=point_id, vector=vector, payload=payload)
        await self._run(
            self.client.upsert,
            collection_name=self.collection_name,
            points=[point],
        )

    async def upsert_vectors(self, points: List[Dict[str, Any]]):
        """Batch insert/update vectors.

        Args:
            points: List of dicts with 'id', 'vector', and 'payload' keys.
                An empty list is a no-op and makes no request.

        Raises:
            KeyError: If a dict is missing one of the required keys.
        """
        if not points:
            return
        qdrant_points = [
            PointStruct(id=p["id"], vector=p["vector"], payload=p["payload"])
            for p in points
        ]
        await self._run(
            self.client.upsert,
            collection_name=self.collection_name,
            points=qdrant_points,
        )

    async def search(
        self,
        query_vector: List[float],
        limit: int = 5,
        namespace: Optional[str] = None,
        content_type: Optional[str] = None,
        score_threshold: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Search for vectors similar to ``query_vector``.

        Args:
            query_vector: Query embedding.
            limit: Maximum number of hits to return.
            namespace: If truthy, only match points whose payload
                ``namespace`` equals this value.
            content_type: If truthy, only match points whose payload
                ``content_type`` equals this value.
            score_threshold: Passed through to Qdrant's ``score_threshold``.

        Returns:
            List of dicts with 'id' (as str), 'score', and 'payload' keys.
        """
        # All active filters must hold simultaneously (Filter.must).
        filter_conditions = [
            FieldCondition(key=key, match=MatchValue(value=value))
            for key, value in (("namespace", namespace), ("content_type", content_type))
            if value
        ]
        search_filter = Filter(must=filter_conditions) if filter_conditions else None

        # NOTE(review): ``search`` is deprecated in newer qdrant-client releases
        # in favour of ``query_points``; confirm against the pinned version.
        results = await self._run(
            self.client.search,
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
            query_filter=search_filter,
            score_threshold=score_threshold,
        )
        return [
            {"id": str(result.id), "score": result.score, "payload": result.payload}
            for result in results
        ]

    async def delete_vector(self, point_id: str):
        """Delete a single vector by its point ID."""
        await self._run(
            self.client.delete,
            collection_name=self.collection_name,
            points_selector=[point_id],
        )

    async def delete_vectors_by_document(self, document_id: str):
        """Delete every point whose payload ``document_id`` equals ``document_id``."""
        await self._run(
            self.client.delete,
            collection_name=self.collection_name,
            points_selector=Filter(
                must=[
                    FieldCondition(
                        key="document_id",
                        match=MatchValue(value=document_id),
                    )
                ]
            ),
        )

    async def get_collection_info(self) -> Dict[str, Any]:
        """Return basic statistics for the collection.

        Returns:
            Dict with 'vectors_count', 'points_count' and 'status' keys.
            'vectors_count' is None when the client's collection info object
            does not expose that attribute.
        """
        info = await self._run(self.client.get_collection, self.collection_name)
        return {
            # ``vectors_count`` is deprecated in recent qdrant-client versions
            # and may be absent; don't let its absence raise AttributeError.
            "vectors_count": getattr(info, "vectors_count", None),
            "points_count": info.points_count,
            "status": info.status,
        }

    async def delete_collection(self):
        """Delete the entire collection, including all of its points."""
        await self._run(self.client.delete_collection, self.collection_name)