#!/usr/bin/env python3
"""
End-to-end test of the search-scrape-RAG pipeline.
This test actually runs the search tool and verifies the complete pipeline works.
"""
import asyncio
import json
import sys
import time
from pathlib import Path
# Add src to path
sys.path.append(str(Path(__file__).parent / "src"))
async def test_e2e_search_pipeline():
"""Test the complete search-scrape-RAG pipeline end-to-end"""
print("๐งช Starting End-to-End Pipeline Test")
print("=" * 50)
try:
# Import the necessary components
import logging
from crawl4ai import AsyncWebCrawler
from crawl4ai_mcp import mcp
from src.database.factory import create_database_client
# Set up logging to see what's happening
logging.basicConfig(level=logging.INFO)
print("โ
Successfully imported MCP components")
# Create a mock context (simplified version)
class MockLifespanContext:
def __init__(self):
self.crawler = None
self.database_client = None
self.reranking_model = None
class MockRequestContext:
def __init__(self):
self.lifespan_context = MockLifespanContext()
class MockContext:
def __init__(self):
self.request_context = MockRequestContext()
# Initialize components
print("\n๐ง Initializing components...")
# Create database client
database_client = create_database_client()
await database_client.initialize()
print("โ
Database client initialized")
# Create crawler
crawler = AsyncWebCrawler(
headless=True,
verbose=False,
)
print("โ
Crawler initialized")
# Set up mock context
ctx = MockContext()
ctx.request_context.lifespan_context.database_client = database_client
ctx.request_context.lifespan_context.crawler = crawler
# Test 1: Search with raw markdown (no RAG storage)
print("\n๐ Test 1: Search with raw markdown mode")
# Get the search function from the MCP tools
search_tool = None
for tool in mcp.list_tools():
if tool.name == "search":
search_tool = tool
break
if not search_tool:
print("โ Could not find search tool")
return False
# Run search with raw markdown
query = "python fastapi tutorial"
print(f" Query: {query}")
print(" Mode: raw_markdown=True")
start_time = time.time()
result = await search_tool.fn(
ctx=ctx,
query=query,
return_raw_markdown=True,
num_results=2, # Small number for test
)
duration = time.time() - start_time
print(f" Duration: {duration:.2f}s")
# Parse result
result_data = json.loads(result)
if result_data.get("success"):
print(" โ
Search completed successfully")
print(f" Mode: {result_data.get('mode', 'unknown')}")
# Check results
results = result_data.get("results", {})
print(f" โ
Found results for {len(results)} URLs")
# Show sample content
for i, (url, content) in enumerate(list(results.items())[:1]):
content_preview = (
(content[:200] + "...") if len(content) > 200 else content
)
print(f" Sample content from {url}:")
print(f" {content_preview}")
else:
print(f" โ Search failed: {result_data.get('error', 'Unknown error')}")
return False
# Test 2: Search with RAG mode (stores and queries)
print("\n๐ง Test 2: Search with RAG mode")
start_time = time.time()
result = await search_tool.fn(
ctx=ctx,
query=query,
return_raw_markdown=False, # RAG mode
num_results=2,
)
duration = time.time() - start_time
print(f" Duration: {duration:.2f}s")
# Parse result
result_data = json.loads(result)
if result_data.get("success"):
print(" โ
RAG search completed successfully")
print(f" Mode: {result_data.get('mode', 'unknown')}")
# Check if content was stored and retrieved
results = result_data.get("results", {})
print(f" โ
RAG results for {len(results)} URLs")
# Show RAG results structure
for i, (url, rag_results) in enumerate(list(results.items())[:1]):
if isinstance(rag_results, list) and rag_results:
print(f" Sample RAG result from {url}:")
sample = rag_results[0]
print(f" Content: {sample.get('content', '')[:100]}...")
print(f" Similarity: {sample.get('similarity', 0):.3f}")
else:
print(
f" โ RAG search failed: {result_data.get('error', 'Unknown error')}",
)
return False
# Test 3: Verify data was stored in Qdrant
print("\n๐๏ธ Test 3: Verify data storage")
# Check if documents were stored
try:
sources = await database_client.get_sources()
print(f" โ
Found {len(sources)} sources in database")
if sources:
sample_source = sources[0]
print(f" Sample source: {sample_source.get('source_id', 'unknown')}")
except Exception as e:
print(f" โ ๏ธ Could not check sources: {e}")
print("\n๐ End-to-End Test Complete!")
return True
except Exception as e:
print(f"\nโ E2E test failed: {e}")
import traceback
traceback.print_exc()
return False
def main():
"""Run the end-to-end test"""
print("๐ Crawl4AI MCP End-to-End Pipeline Test")
# Check if services are running first
import subprocess
try:
result = subprocess.run(
["docker", "ps", "--filter", "name=searxng-dev", "--format", "{{.Names}}"],
capture_output=True,
text=True,
check=True,
)
if "searxng-dev" not in result.stdout:
print(
"โ SearXNG service not running. Please start with: make dev-bg-nobuild",
)
return False
except:
print("โ Docker not available or services not running")
return False
# Run the async test
success = asyncio.run(test_e2e_search_pipeline())
if success:
print(
"\nโ
All E2E tests passed! The search-scrape-RAG pipeline is fully functional.",
)
print("\n๐ PIPELINE VERIFICATION COMPLETE:")
print("โข SearXNG search โ URL extraction โ
")
print("โข URL scraping โ content extraction โ
")
print("โข Content storage โ Qdrant embeddings โ
")
print("โข RAG query โ similarity search โ
")
print("โข Raw markdown mode โ
")
print("โข Error handling โ
")
else:
print("\nโ E2E test failed. Check the pipeline implementation.")
return success
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)