# retriever.py • 3.15 kB
"""Retriever strategies for document retrieval."""
from abc import ABC, abstractmethod
from typing import List, Optional
from ..storage.schema import SearchResult, DocumentMetadata
from ..storage.qdrant_store import QdrantStore
from ..storage.sqlite_store import SQLiteStore
from ..utils.embeddings import EmbeddingClient
class Retriever(ABC):
    """Abstract contract shared by every retrieval strategy.

    Subclasses must implement :meth:`retrieve`; the class cannot be
    instantiated until they do.
    """

    @abstractmethod
    async def retrieve(
        self,
        query: str,
        limit: int = 5,
        namespace: Optional[str] = None,
        content_type: Optional[str] = None
    ) -> List[SearchResult]:
        """Retrieve relevant documents for a query.

        Args:
            query: Search query.
            limit: Maximum number of results.
            namespace: Optional namespace filter.
            content_type: Optional content type filter.

        Returns:
            List of SearchResult objects.
        """
class VectorRetriever(Retriever):
    """Simple vector similarity search via Qdrant.

    The query is embedded, matched against Qdrant, and every hit is resolved
    to its text and document metadata through the SQLite store.
    """

    def __init__(
        self,
        qdrant_store: QdrantStore,
        sqlite_store: SQLiteStore,
        embedding_client: EmbeddingClient,
        score_threshold: float = 0.0
    ):
        """Store the collaborators used by :meth:`retrieve`.

        Args:
            qdrant_store: Vector store searched with the query embedding.
            sqlite_store: Store that chunk text and documents are loaded from.
            embedding_client: Client used to embed the query text.
            score_threshold: Forwarded unchanged as ``score_threshold`` to
                every Qdrant search (presumably a minimum similarity cutoff;
                confirm against ``QdrantStore.search``).
        """
        self.qdrant = qdrant_store
        self.sqlite = sqlite_store
        self.embeddings = embedding_client
        self.score_threshold = score_threshold

    async def retrieve(
        self,
        query: str,
        limit: int = 5,
        namespace: Optional[str] = None,
        content_type: Optional[str] = None
    ) -> List[SearchResult]:
        """Retrieve documents using vector similarity search.

        Hits are dropped when their payload has no ``document_id``, when the
        document cannot be loaded from SQLite, or when the resolved text or
        metadata is empty, so fewer than ``limit`` results may be returned.

        Args:
            query: Search query
            limit: Maximum number of results
            namespace: Optional namespace filter
            content_type: Optional content type filter

        Returns:
            List of SearchResult objects, in the order Qdrant returned the
            hits.
        """
        # Generate query embedding
        query_vector = await self.embeddings.embed_text(query)

        # Search Qdrant
        qdrant_results = await self.qdrant.search(
            query_vector=query_vector,
            limit=limit,
            namespace=namespace,
            content_type=content_type,
            score_threshold=self.score_threshold
        )

        # Several chunk hits can point at the same document. Remember every
        # lookup (misses included) so SQLite is queried at most once per
        # document_id within this call. Results for the same document
        # therefore share one metadata object.
        documents = {}
        search_results = []
        for result in qdrant_results:
            # A hit whose payload is None would otherwise abort the whole
            # retrieval with an AttributeError on ``.get``.
            payload = result["payload"] or {}
            document_id = payload.get("document_id")
            chunk_id = payload.get("chunk_id")

            # Without a document id there is no metadata to attach, so the
            # hit can never become a SearchResult; skip before touching SQLite.
            if document_id is None:
                continue

            if document_id not in documents:
                documents[document_id] = await self.sqlite.get_document(
                    document_id
                )
            doc = documents[document_id]
            if not doc:
                # Orphaned hit: the vector exists but its document does not.
                continue

            # Chunk hits carry their own text; whole-document hits fall back
            # to the document's full text.
            if chunk_id:
                text = await self.sqlite.get_chunk_text(chunk_id)
            else:
                text = doc.full_text
            metadata = doc.metadata

            if text and metadata:
                search_results.append(
                    SearchResult(
                        document_id=document_id,
                        chunk_id=chunk_id,
                        score=result["score"],
                        text=text,
                        metadata=metadata
                    )
                )
        return search_results