qdrant_store.py
"""Qdrant vector storage for semantic search.""" from typing import List, Optional, Dict, Any from qdrant_client import QdrantClient from qdrant_client.models import ( Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue ) class QdrantStore: """Qdrant vector store for semantic search.""" def __init__( self, url: str, collection_name: str = "personal_context", vector_size: int = 768 # nomic-embed-text dimension ): self.client = QdrantClient(url=url) self.collection_name = collection_name self.vector_size = vector_size async def initialize(self): """Initialize Qdrant collection.""" # Check if collection exists collections = self.client.get_collections().collections collection_exists = any( c.name == self.collection_name for c in collections ) if not collection_exists: self.client.create_collection( collection_name=self.collection_name, vectors_config=VectorParams( size=self.vector_size, distance=Distance.COSINE ) ) async def upsert_vector( self, point_id: str, vector: List[float], payload: Dict[str, Any] ): """Insert or update a vector with payload.""" point = PointStruct( id=point_id, vector=vector, payload=payload ) self.client.upsert( collection_name=self.collection_name, points=[point] ) async def upsert_vectors( self, points: List[Dict[str, Any]] ): """Batch insert/update vectors. Args: points: List of dicts with 'id', 'vector', and 'payload' keys """ qdrant_points = [ PointStruct( id=p["id"], vector=p["vector"], payload=p["payload"] ) for p in points ] self.client.upsert( collection_name=self.collection_name, points=qdrant_points ) async def search( self, query_vector: List[float], limit: int = 5, namespace: Optional[str] = None, content_type: Optional[str] = None, score_threshold: Optional[float] = None ) -> List[Dict[str, Any]]: """Search for similar vectors. Returns: List of dicts with 'id', 'score', and 'payload' keys """ # Build filter filter_conditions = [] if namespace: filter_conditions.append( FieldCondition( key="namespace", match=MatchValue(value=namespace) ) ) if content_type: filter_conditions.append( FieldCondition( key="content_type", match=MatchValue(value=content_type) ) ) search_filter = Filter(must=filter_conditions) if filter_conditions else None # Perform search results = self.client.search( collection_name=self.collection_name, query_vector=query_vector, limit=limit, query_filter=search_filter, score_threshold=score_threshold ) # Format results return [ { "id": str(result.id), "score": result.score, "payload": result.payload } for result in results ] async def delete_vector(self, point_id: str): """Delete a vector by ID.""" self.client.delete( collection_name=self.collection_name, points_selector=[point_id] ) async def delete_vectors_by_document(self, document_id: str): """Delete all vectors associated with a document.""" self.client.delete( collection_name=self.collection_name, points_selector=Filter( must=[ FieldCondition( key="document_id", match=MatchValue(value=document_id) ) ] ) ) async def get_collection_info(self) -> Dict[str, Any]: """Get collection statistics.""" info = self.client.get_collection(self.collection_name) return { "vectors_count": info.vectors_count, "points_count": info.points_count, "status": info.status } async def delete_collection(self): """Delete the entire collection.""" self.client.delete_collection(self.collection_name)


MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timerickson/personal-rag-mcp'
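
The same endpoint can be queried from Python. This is a minimal sketch that assumes the requests package is installed and simply prints the raw JSON response, without assuming anything about its fields:

import requests

# Fetch the directory entry for this MCP server (same endpoint as the curl example).
response = requests.get("https://glama.ai/api/mcp/v1/servers/timerickson/personal-rag-mcp")
response.raise_for_status()
print(response.text)  # raw JSON returned by the directory API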

If you have feedback or need assistance with the MCP directory API, please join our Discord server.