Skip to main content
Glama
test_e2e.py — 10.8 kB
#!/usr/bin/env python3
"""End-to-end test for Personal RAG MCP Server.

Tests the full pipeline against live services:
store -> vector search -> retrieve -> ask (RAG) -> namespace filtering.
"""
import asyncio
import hashlib
import os
import sys
import uuid

# Add parent directory to path so the package is importable when this
# file is run directly from the tests directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from personal_rag_mcp.storage.sqlite_store import SQLiteStore
from personal_rag_mcp.storage.qdrant_store import QdrantStore
from personal_rag_mcp.storage.schema import DocumentMetadata
from personal_rag_mcp.utils.embeddings import EmbeddingClient
from personal_rag_mcp.utils.chunking import TextChunker
from personal_rag_mcp.pipeline.retriever import VectorRetriever
from personal_rag_mcp.pipeline.generator import LLMGenerator
from personal_rag_mcp.pipeline.pipeline import RAGPipeline

# Documents seeded into the stores before the search/ask stages run.
# Three technical notes plus one personal note so that namespace
# filtering (stage 6) has something to discriminate between.
TEST_DOCS = [
    {
        "text": """
Docker Compose is a tool for defining and running multi-container Docker applications.
With Compose, you use a YAML file to configure your application's services.
Then, with a single command, you create and start all the services from your configuration.
The docker-compose.yaml file defines services, networks, and volumes.
""",
        "namespace": "notes/technical",
        "title": "Docker Compose Basics",
        "tags": ["docker", "containers", "devops"],
    },
    {
        "text": """
RAG (Retrieval-Augmented Generation) combines semantic search with language models.
First, relevant documents are retrieved using vector similarity search.
Then, these documents are provided as context to the LLM to generate an answer.
This approach reduces hallucinations and grounds responses in factual information.
""",
        "namespace": "notes/technical",
        "title": "RAG Overview",
        "tags": ["rag", "llm", "ai"],
    },
    {
        "text": """
Qdrant is a vector database optimized for neural search.
It supports filtering by metadata and can handle billions of vectors.
Qdrant uses HNSW (Hierarchical Navigable Small World) for fast approximate nearest neighbor search.
It's written in Rust and provides both gRPC and HTTP APIs.
""",
        "namespace": "notes/technical",
        "title": "Qdrant Database",
        "tags": ["qdrant", "vector-db", "search"],
    },
    {
        "text": """
I prefer using bind mounts during development for faster iteration.
Once the code is stable, baking it into the Docker image makes sense.
This way I can edit code and see changes immediately without rebuilding.
""",
        "namespace": "notes/personal",
        "title": "Docker Development Workflow",
        "tags": ["docker", "workflow", "development"],
    },
]


async def _seed_documents(sqlite_store, qdrant_store, embedding_client, chunker):
    """Store every entry of TEST_DOCS: full text in SQLite, chunk
    embeddings in Qdrant, chunk rows in SQLite.

    Returns the list of generated document ids.
    """
    stored_doc_ids = []
    for i, doc in enumerate(TEST_DOCS, 1):
        doc_id = str(uuid.uuid4())

        metadata = DocumentMetadata(
            id=doc_id,
            namespace=doc["namespace"],
            content_type="note",
            category="technical" if "technical" in doc["namespace"] else "personal",
            tags=doc["tags"],
            source="test_e2e.py",
            title=doc.get("title"),
            author="test_user",
        )

        # Store full document in SQLite
        await sqlite_store.store_document(doc_id, doc["text"], metadata)

        # Chunk and embed
        chunks = chunker.chunk_text(doc["text"])
        for chunk_idx, chunk_text in enumerate(chunks):
            chunk_id = f"{doc_id}_chunk_{chunk_idx}"
            # Qdrant point ids are numeric here: derive a stable 64-bit
            # integer from the chunk_id string via SHA-256.
            qdrant_point_id = int(hashlib.sha256(chunk_id.encode()).hexdigest()[:16], 16)

            # Get embedding and store in Qdrant
            embedding = await embedding_client.embed_text(chunk_text)
            await qdrant_store.upsert_vector(
                point_id=qdrant_point_id,
                vector=embedding,
                payload={
                    "document_id": doc_id,
                    "chunk_id": chunk_id,
                    "chunk_index": chunk_idx,
                    "namespace": doc["namespace"],
                    "content_type": "note",
                    "tags": doc["tags"],
                },
            )

            # Store chunk in SQLite (after Qdrant so we have the point ID)
            await sqlite_store.store_chunk(
                chunk_id=chunk_id,
                document_id=doc_id,
                chunk_index=chunk_idx,
                chunk_text=chunk_text,
                qdrant_point_id=str(qdrant_point_id),
            )

        stored_doc_ids.append(doc_id)
        print(f" ✓ Stored document {i}: {doc['title']} ({len(chunks)} chunks)")

    return stored_doc_ids


async def test_e2e():
    """Run the end-to-end pipeline test.

    Returns True when every stage passes; False on the first failing
    stage. On success, test artifacts (SQLite db, Qdrant collection)
    are removed; on failure they are deliberately left in place for
    debugging (the next run deletes them during setup anyway).
    """
    print("=" * 60)
    print("End-to-End RAG Pipeline Test")
    print("=" * 60)

    # Initialize components
    print("\n[1/6] Initializing components...")
    sqlite_path = "/app/data/test_e2e.db"
    if os.path.exists(sqlite_path):
        os.remove(sqlite_path)
        print(f" Removed existing test database: {sqlite_path}")

    sqlite_store = SQLiteStore(sqlite_path)
    await sqlite_store.initialize()
    print(" ✓ SQLite store initialized")

    qdrant_url = os.getenv("QDRANT_URL", "http://qdrant:6333")
    qdrant_store = QdrantStore(qdrant_url, collection_name="test_e2e", vector_size=768)
    # Delete existing collection if it exists (best effort).
    try:
        await qdrant_store.delete_collection()
        print(" Removed existing Qdrant collection: test_e2e")
    except Exception:
        pass  # Collection didn't exist
    await qdrant_store.initialize()
    print(" ✓ Qdrant store initialized")

    ollama_url = os.getenv("OLLAMA_URL", "http://ollama:11434")
    embedding_client = EmbeddingClient(ollama_url, model="nomic-embed-text")
    print(" ✓ Embedding client initialized")

    litellm_url = os.getenv("LITELLM_URL", "http://litellm-proxy:8000")
    generator = LLMGenerator(api_base=litellm_url, model="deepseek-r1-1.5b-qwen-distill-q4_K_M")
    print(" ✓ Generator initialized")

    chunker = TextChunker(chunk_size=512, chunk_overlap=50)
    print(" ✓ Text chunker initialized")

    retriever = VectorRetriever(
        sqlite_store=sqlite_store,
        qdrant_store=qdrant_store,
        embedding_client=embedding_client
    )
    print(" ✓ Vector retriever initialized")

    pipeline = RAGPipeline(retriever=retriever, generator=generator, top_k=2)
    print(" ✓ RAG pipeline initialized")

    # Test 1: Store documents
    print("\n[2/6] Storing test documents...")
    stored_doc_ids = await _seed_documents(sqlite_store, qdrant_store, embedding_client, chunker)
    print(f"\n Total: {len(stored_doc_ids)} documents stored")

    # Test 2: Vector search
    print("\n[3/6] Testing vector search...")
    search_query = "How does Docker Compose work?"
    query_vector = await embedding_client.embed_text(search_query)
    search_results = await qdrant_store.search(
        query_vector=query_vector,
        limit=3,
        namespace="notes/technical"
    )
    print(f" Query: '{search_query}'")
    print(f" Found {len(search_results)} results:")
    for i, result in enumerate(search_results, 1):
        print(f" {i}. Score: {result['score']:.4f}, Chunk: {result['payload'].get('chunk_id', 'N/A')}")
    if len(search_results) > 0:
        print(" ✓ Vector search working")
    else:
        print(" ✗ Vector search failed - no results")
        return False

    # Test 3: Retriever (combines vector search + SQLite)
    print("\n[4/6] Testing retriever...")
    retrieved_docs = await retriever.retrieve(
        query=search_query,
        limit=2,
        namespace="notes/technical"
    )
    print(f" Query: '{search_query}'")
    print(f" Retrieved {len(retrieved_docs)} documents:")
    for i, doc in enumerate(retrieved_docs, 1):
        title = doc.metadata.title or 'Untitled'
        score = doc.score
        preview = doc.text[:100].replace('\n', ' ').strip()
        print(f" {i}. {title} (score: {score:.4f})")
        print(f" Preview: {preview}...")
    if len(retrieved_docs) > 0:
        print(" ✓ Retriever working")
    else:
        print(" ✗ Retriever failed - no documents retrieved")
        return False

    # Test 4: RAG pipeline (search + generate)
    print("\n[5/6] Testing RAG pipeline...")
    rag_query = "What is RAG and how does it reduce hallucinations?"
    result = await pipeline.query(
        question=rag_query,
        namespace="notes/technical"
    )
    print(f" Question: '{rag_query}'")
    print(f"\n Answer:\n {result['answer']}")
    print(f"\n Sources used: {len(result['sources'])}")
    for i, source in enumerate(result['sources'], 1):
        title = source.get('title') or 'Untitled'
        score = source.get('score', 0)
        print(f" {i}. {title} (score: {score:.4f})")
    if result['answer'] and len(result['sources']) > 0:
        print("\n ✓ RAG pipeline working")
    else:
        print("\n ✗ RAG pipeline failed")
        return False

    # Test 5: Namespace filtering
    print("\n[6/6] Testing namespace filtering...")
    personal_query = "What is my Docker development workflow?"
    personal_result = await pipeline.query(
        question=personal_query,
        namespace="notes/personal"
    )
    print(f" Question: '{personal_query}' (namespace: notes/personal)")
    print(f"\n Answer:\n {personal_result['answer']}")
    print(f"\n Sources used: {len(personal_result['sources'])}")
    for i, source in enumerate(personal_result['sources'], 1):
        namespace = source.get('namespace', 'N/A')
        title = source.get('title') or 'Untitled'
        print(f" {i}. [{namespace}] {title}")
    # Verify all sources are from the correct namespace
    all_correct_namespace = all(
        s.get('namespace', '').startswith('notes/personal')
        for s in personal_result['sources']
    )
    if all_correct_namespace and len(personal_result['sources']) > 0:
        print("\n ✓ Namespace filtering working")
    else:
        print("\n ✗ Namespace filtering failed")
        return False

    # Cleanup (only reached when every stage passed)
    print("\n[Cleanup]")
    print(f" Removing test database: {sqlite_path}")
    if os.path.exists(sqlite_path):
        os.remove(sqlite_path)
    print(f" Deleting Qdrant collection: test_e2e")
    await qdrant_store.delete_collection()

    return True


if __name__ == "__main__":
    success = asyncio.run(test_e2e())
    print("\n" + "=" * 60)
    if success:
        print("✓✓✓ All end-to-end tests passed!")
    else:
        print("✗✗✗ Some tests failed!")
    # Print the closing separator BEFORE exiting so failure output is
    # not truncated (the original exited inside the else branch and
    # never reached this line on failure).
    print("=" * 60)
    if not success:
        sys.exit(1)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/timerickson/personal-rag-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.