#!/usr/bin/env python3
"""
End-to-end test for Personal RAG MCP Server
Tests the full pipeline: store -> search -> ask
"""
import asyncio
import sys
import os
import hashlib
# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from personal_rag_mcp.storage.sqlite_store import SQLiteStore
from personal_rag_mcp.storage.qdrant_store import QdrantStore
from personal_rag_mcp.utils.embeddings import EmbeddingClient
from personal_rag_mcp.utils.chunking import TextChunker
from personal_rag_mcp.pipeline.retriever import VectorRetriever
from personal_rag_mcp.pipeline.generator import LLMGenerator
from personal_rag_mcp.pipeline.pipeline import RAGPipeline
from personal_rag_mcp.storage.schema import DocumentMetadata
import uuid
async def test_e2e():
    """Run the full RAG pipeline end to end: store -> search -> retrieve -> ask.

    Exercises every layer against live services: SQLite metadata store,
    Qdrant vector store, Ollama embeddings, LiteLLM generation, the
    retriever, and the composed RAG pipeline. Service URLs are read from
    the QDRANT_URL / OLLAMA_URL / LITELLM_URL environment variables, with
    docker-compose-style hostnames as defaults.

    Returns:
        bool: True when every stage succeeded. On the first failing stage,
        False is returned immediately and the test database / Qdrant
        collection are deliberately left in place to aid debugging;
        cleanup only runs on full success.
    """
    print("=" * 60)
    print("End-to-End RAG Pipeline Test")
    print("=" * 60)

    # --- [1/6] Initialize components -------------------------------------
    print("\n[1/6] Initializing components...")

    # Fresh SQLite database for each run.
    sqlite_path = "/app/data/test_e2e.db"
    if os.path.exists(sqlite_path):
        os.remove(sqlite_path)
        print(f" Removed existing test database: {sqlite_path}")

    sqlite_store = SQLiteStore(sqlite_path)
    await sqlite_store.initialize()
    print(" ✓ SQLite store initialized")

    qdrant_url = os.getenv("QDRANT_URL", "http://qdrant:6333")
    # vector_size=768 matches the nomic-embed-text embedding dimension.
    qdrant_store = QdrantStore(qdrant_url, collection_name="test_e2e", vector_size=768)

    # Drop any collection left over from a previous (failed) run.
    # NOTE: narrowed from a bare `except:` so CancelledError/KeyboardInterrupt
    # still propagate; a missing collection is the only expected failure here.
    try:
        await qdrant_store.delete_collection()
        print(" Removed existing Qdrant collection: test_e2e")
    except Exception:
        pass  # Collection didn't exist
    await qdrant_store.initialize()
    print(" ✓ Qdrant store initialized")

    ollama_url = os.getenv("OLLAMA_URL", "http://ollama:11434")
    embedding_client = EmbeddingClient(ollama_url, model="nomic-embed-text")
    print(" ✓ Embedding client initialized")

    litellm_url = os.getenv("LITELLM_URL", "http://litellm-proxy:8000")
    generator = LLMGenerator(api_base=litellm_url, model="deepseek-r1-1.5b-qwen-distill-q4_K_M")
    print(" ✓ Generator initialized")

    chunker = TextChunker(chunk_size=512, chunk_overlap=50)
    print(" ✓ Text chunker initialized")

    retriever = VectorRetriever(
        sqlite_store=sqlite_store,
        qdrant_store=qdrant_store,
        embedding_client=embedding_client
    )
    print(" ✓ Vector retriever initialized")

    pipeline = RAGPipeline(retriever=retriever, generator=generator, top_k=2)
    print(" ✓ RAG pipeline initialized")

    # --- [2/6] Store documents --------------------------------------------
    print("\n[2/6] Storing test documents...")
    # Three technical docs plus one personal doc so the namespace-filtering
    # test (step 6) has something to exclude.
    test_docs = [
        {
            "text": """
    Docker Compose is a tool for defining and running multi-container Docker applications.
    With Compose, you use a YAML file to configure your application's services.
    Then, with a single command, you create and start all the services from your configuration.
    The docker-compose.yaml file defines services, networks, and volumes.
    """,
            "namespace": "notes/technical",
            "title": "Docker Compose Basics",
            "tags": ["docker", "containers", "devops"]
        },
        {
            "text": """
    RAG (Retrieval-Augmented Generation) combines semantic search with language models.
    First, relevant documents are retrieved using vector similarity search.
    Then, these documents are provided as context to the LLM to generate an answer.
    This approach reduces hallucinations and grounds responses in factual information.
    """,
            "namespace": "notes/technical",
            "title": "RAG Overview",
            "tags": ["rag", "llm", "ai"]
        },
        {
            "text": """
    Qdrant is a vector database optimized for neural search.
    It supports filtering by metadata and can handle billions of vectors.
    Qdrant uses HNSW (Hierarchical Navigable Small World) for fast approximate nearest neighbor search.
    It's written in Rust and provides both gRPC and HTTP APIs.
    """,
            "namespace": "notes/technical",
            "title": "Qdrant Database",
            "tags": ["qdrant", "vector-db", "search"]
        },
        {
            "text": """
    I prefer using bind mounts during development for faster iteration.
    Once the code is stable, baking it into the Docker image makes sense.
    This way I can edit code and see changes immediately without rebuilding.
    """,
            "namespace": "notes/personal",
            "title": "Docker Development Workflow",
            "tags": ["docker", "workflow", "development"]
        }
    ]

    stored_doc_ids = []
    for i, doc in enumerate(test_docs, 1):
        doc_id = str(uuid.uuid4())

        metadata = DocumentMetadata(
            id=doc_id,
            namespace=doc["namespace"],
            content_type="note",
            category="technical" if "technical" in doc["namespace"] else "personal",
            tags=doc["tags"],
            source="test_e2e.py",
            title=doc.get("title"),
            author="test_user"
        )

        # Full document text + metadata lives in SQLite.
        await sqlite_store.store_document(doc_id, doc["text"], metadata)

        # Chunks are embedded and indexed in Qdrant; each chunk row in SQLite
        # records the Qdrant point ID so the two stores can be joined later.
        chunks = chunker.chunk_text(doc["text"])
        for chunk_idx, chunk_text in enumerate(chunks):
            chunk_id = f"{doc_id}_chunk_{chunk_idx}"
            # Qdrant point IDs must be numeric (or UUIDs); derive a stable
            # 64-bit integer from the string chunk_id via SHA-256.
            qdrant_point_id = int(hashlib.sha256(chunk_id.encode()).hexdigest()[:16], 16)

            embedding = await embedding_client.embed_text(chunk_text)
            await qdrant_store.upsert_vector(
                point_id=qdrant_point_id,
                vector=embedding,
                payload={
                    "document_id": doc_id,
                    "chunk_id": chunk_id,
                    "chunk_index": chunk_idx,
                    "namespace": doc["namespace"],
                    "content_type": "note",
                    "tags": doc["tags"]
                }
            )

            # Store chunk in SQLite (after Qdrant so we have the point ID).
            await sqlite_store.store_chunk(
                chunk_id=chunk_id,
                document_id=doc_id,
                chunk_index=chunk_idx,
                chunk_text=chunk_text,
                qdrant_point_id=str(qdrant_point_id)
            )

        stored_doc_ids.append(doc_id)
        print(f" ✓ Stored document {i}: {doc['title']} ({len(chunks)} chunks)")

    print(f"\n Total: {len(stored_doc_ids)} documents stored")

    # --- [3/6] Raw vector search ------------------------------------------
    print("\n[3/6] Testing vector search...")
    search_query = "How does Docker Compose work?"
    query_vector = await embedding_client.embed_text(search_query)
    search_results = await qdrant_store.search(
        query_vector=query_vector,
        limit=3,
        namespace="notes/technical"
    )

    print(f" Query: '{search_query}'")
    print(f" Found {len(search_results)} results:")
    for i, result in enumerate(search_results, 1):
        print(f" {i}. Score: {result['score']:.4f}, Chunk: {result['payload'].get('chunk_id', 'N/A')}")

    if search_results:
        print(" ✓ Vector search working")
    else:
        print(" ✗ Vector search failed - no results")
        return False

    # --- [4/6] Retriever (vector search joined with SQLite) ---------------
    print("\n[4/6] Testing retriever...")
    retrieved_docs = await retriever.retrieve(
        query=search_query,
        limit=2,
        namespace="notes/technical"
    )

    print(f" Query: '{search_query}'")
    print(f" Retrieved {len(retrieved_docs)} documents:")
    for i, doc in enumerate(retrieved_docs, 1):
        title = doc.metadata.title or 'Untitled'
        score = doc.score
        preview = doc.text[:100].replace('\n', ' ').strip()
        print(f" {i}. {title} (score: {score:.4f})")
        print(f" Preview: {preview}...")

    if retrieved_docs:
        print(" ✓ Retriever working")
    else:
        print(" ✗ Retriever failed - no documents retrieved")
        return False

    # --- [5/6] Full RAG pipeline (retrieve + generate) ---------------------
    print("\n[5/6] Testing RAG pipeline...")
    rag_query = "What is RAG and how does it reduce hallucinations?"
    result = await pipeline.query(
        question=rag_query,
        namespace="notes/technical"
    )

    print(f" Question: '{rag_query}'")
    print(f"\n Answer:\n {result['answer']}")
    print(f"\n Sources used: {len(result['sources'])}")
    for i, source in enumerate(result['sources'], 1):
        title = source.get('title') or 'Untitled'
        score = source.get('score', 0)
        print(f" {i}. {title} (score: {score:.4f})")

    if result['answer'] and result['sources']:
        print("\n ✓ RAG pipeline working")
    else:
        print("\n ✗ RAG pipeline failed")
        return False

    # --- [6/6] Namespace filtering -----------------------------------------
    print("\n[6/6] Testing namespace filtering...")
    personal_query = "What is my Docker development workflow?"
    personal_result = await pipeline.query(
        question=personal_query,
        namespace="notes/personal"
    )

    print(f" Question: '{personal_query}' (namespace: notes/personal)")
    print(f"\n Answer:\n {personal_result['answer']}")
    print(f"\n Sources used: {len(personal_result['sources'])}")
    for i, source in enumerate(personal_result['sources'], 1):
        namespace = source.get('namespace', 'N/A')
        title = source.get('title') or 'Untitled'
        print(f" {i}. [{namespace}] {title}")

    # Every source must come from the requested namespace subtree.
    all_correct_namespace = all(
        s.get('namespace', '').startswith('notes/personal')
        for s in personal_result['sources']
    )

    if all_correct_namespace and personal_result['sources']:
        print("\n ✓ Namespace filtering working")
    else:
        print("\n ✗ Namespace filtering failed")
        return False

    # --- Cleanup (only reached when everything passed) ---------------------
    print("\n[Cleanup]")
    print(f" Removing test database: {sqlite_path}")
    if os.path.exists(sqlite_path):
        os.remove(sqlite_path)

    print(" Deleting Qdrant collection: test_e2e")
    await qdrant_store.delete_collection()

    return True
if __name__ == "__main__":
    success = asyncio.run(test_e2e())

    # Summary banner. Fix: the closing separator is now printed on both
    # outcomes (previously sys.exit(1) skipped it on failure), and the exit
    # code is derived uniformly from the result.
    print("\n" + "=" * 60)
    if success:
        print("✓✓✓ All end-to-end tests passed!")
    else:
        print("✗✗✗ Some tests failed!")
    print("=" * 60)
    sys.exit(0 if success else 1)