#!/usr/bin/env python3
"""
End-to-end test of the search-scrape-RAG pipeline.
This test actually runs the search tool and verifies the complete pipeline works.
"""
import asyncio
import json
import sys
import time
from pathlib import Path
# Add src to path
sys.path.append(str(Path(__file__).parent / "src"))
async def test_e2e_search_pipeline():
"""Test the complete search-scrape-RAG pipeline end-to-end"""
print("π§ͺ Starting End-to-End Pipeline Test")
print("=" * 50)
try:
# Import the necessary components
import logging
from crawl4ai import AsyncWebCrawler
from crawl4ai_mcp import mcp
from src.database.factory import create_database_client
# Set up logging to see what's happening
logging.basicConfig(level=logging.INFO)
print("β
Successfully imported MCP components")
# Create a mock context (simplified version)
class MockLifespanContext:
def __init__(self):
self.crawler = None
self.database_client = None
self.reranking_model = None
class MockRequestContext:
def __init__(self):
self.lifespan_context = MockLifespanContext()
class MockContext:
def __init__(self):
self.request_context = MockRequestContext()
# Initialize components
print("\nπ§ Initializing components...")
# Create database client
database_client = create_database_client()
await database_client.initialize()
print("β
Database client initialized")
# Create crawler
crawler = AsyncWebCrawler(
headless=True,
verbose=False,
)
print("β
Crawler initialized")
# Set up mock context
ctx = MockContext()
ctx.request_context.lifespan_context.database_client = database_client
ctx.request_context.lifespan_context.crawler = crawler
# Test 1: Search with raw markdown (no RAG storage)
print("\nπ Test 1: Search with raw markdown mode")
# Get the search function from the MCP tools
search_tool = None
for tool in mcp.list_tools():
if tool.name == "search":
search_tool = tool
break
if not search_tool:
print("β Could not find search tool")
return False
# Run search with raw markdown
query = "python fastapi tutorial"
print(f" Query: {query}")
print(" Mode: raw_markdown=True")
start_time = time.time()
result = await search_tool.fn(
ctx=ctx,
query=query,
return_raw_markdown=True,
num_results=2, # Small number for test
)
duration = time.time() - start_time
print(f" Duration: {duration:.2f}s")
# Parse result
result_data = json.loads(result)
if result_data.get("success"):
print(" β
Search completed successfully")
print(f" Mode: {result_data.get('mode', 'unknown')}")
# Check results
results = result_data.get("results", {})
print(f" β
Found results for {len(results)} URLs")
# Show sample content
for i, (url, content) in enumerate(list(results.items())[:1]):
content_preview = (
(content[:200] + "...") if len(content) > 200 else content
)
print(f" Sample content from {url}:")
print(f" {content_preview}")
else:
print(f" β Search failed: {result_data.get('error', 'Unknown error')}")
return False
# Test 2: Search with RAG mode (stores and queries)
print("\nπ§ Test 2: Search with RAG mode")
start_time = time.time()
result = await search_tool.fn(
ctx=ctx,
query=query,
return_raw_markdown=False, # RAG mode
num_results=2,
)
duration = time.time() - start_time
print(f" Duration: {duration:.2f}s")
# Parse result
result_data = json.loads(result)
if result_data.get("success"):
print(" β
RAG search completed successfully")
print(f" Mode: {result_data.get('mode', 'unknown')}")
# Check if content was stored and retrieved
results = result_data.get("results", {})
print(f" β
RAG results for {len(results)} URLs")
# Show RAG results structure
for i, (url, rag_results) in enumerate(list(results.items())[:1]):
if isinstance(rag_results, list) and rag_results:
print(f" Sample RAG result from {url}:")
sample = rag_results[0]
print(f" Content: {sample.get('content', '')[:100]}...")
print(f" Similarity: {sample.get('similarity', 0):.3f}")
else:
print(
f" β RAG search failed: {result_data.get('error', 'Unknown error')}",
)
return False
# Test 3: Verify data was stored in Qdrant
print("\nποΈ Test 3: Verify data storage")
# Check if documents were stored
try:
sources = await database_client.get_sources()
print(f" β
Found {len(sources)} sources in database")
if sources:
sample_source = sources[0]
print(f" Sample source: {sample_source.get('source_id', 'unknown')}")
except Exception as e:
print(f" β οΈ Could not check sources: {e}")
print("\nπ End-to-End Test Complete!")
return True
except Exception as e:
print(f"\nβ E2E test failed: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run the end-to-end test"""
print("π Crawl4AI MCP End-to-End Pipeline Test")
# Check if services are running first
import subprocess
try:
result = subprocess.run(
["docker", "ps", "--filter", "name=searxng-dev", "--format", "{{.Names}}"],
capture_output=True,
text=True,
check=True,
)
if "searxng-dev" not in result.stdout:
print(
"β SearXNG service not running. Please start with: make dev-bg-nobuild",
)
return False
except:
print("β Docker not available or services not running")
return False
# Run the async test
success = asyncio.run(test_e2e_search_pipeline())
if success:
print(
"\nβ
All E2E tests passed! The search-scrape-RAG pipeline is fully functional.",
)
print("\nπ PIPELINE VERIFICATION COMPLETE:")
print("β’ SearXNG search β URL extraction β
")
print("β’ URL scraping β content extraction β
")
print("β’ Content storage β Qdrant embeddings β
")
print("β’ RAG query β similarity search β
")
print("β’ Raw markdown mode β
")
print("β’ Error handling β
")
else:
print("\nβ E2E test failed. Check the pipeline implementation.")
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)