# test_search_quality.py
"""
Search Quality and Duplicate Detection Testing Suite
"""
import json
import time
from datetime import datetime
import sys
import os
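# Make the project root importable so the "src.*" modules below resolve when this file is run directly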
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.core.search_engine_claude import SearchEngine
# TaskManager import removed - not needed for search quality tests
from src.core.models import get_db_session
class SearchQualityTester:
"""
Test the quality of search results and duplicate detection accuracy.
"""
def __init__(self):
self.search_engine = SearchEngine()
# Note: TaskManager not needed for quality tests as they only test search functionality
self.test_results = []
def test_search_consistency(self, task_description: str, num_runs: int = 3, delay_seconds: int = 30):
"""
Test if the same search query returns consistent, high-quality results.
Args:
task_description: Search query to test
num_runs: Number of search runs to perform
delay_seconds: Delay between runs
"""
print(f"\n{'='*80}")
print(f"SEARCH CONSISTENCY TEST")
print(f"Query: {task_description}")
print(f"Runs: {num_runs}, Delay: {delay_seconds}s between runs")
print(f"{'='*80}")
all_runs_data = []
for run in range(1, num_runs + 1):
print(f"\n--- RUN {run}/{num_runs} ---")
start_time = time.time()
# Execute search
raw_response = self.search_engine.execute_search_task(task_description)
current_items = self.search_engine.parse_search_results(raw_response)
execution_time = time.time() - start_time
# Analyze results
run_data = {
'run_number': run,
'execution_time': execution_time,
'total_items': len(current_items),
'items': current_items,
'timestamp': datetime.now().isoformat()
}
all_runs_data.append(run_data)
print(f"Items found: {len(current_items)}")
print(f"Execution time: {execution_time:.2f}s")
# Show sample items
for i, item in enumerate(current_items[:3], 1):
print(f" {i}. {item.get('name', 'Unknown')} at {item.get('source', 'Unknown')}")
if len(current_items) > 3:
print(f" ... and {len(current_items) - 3} more items")
# Wait before next run (except for last run)
if run < num_runs:
print(f"Waiting {delay_seconds} seconds before next run...")
time.sleep(delay_seconds)
# Analyze consistency across runs
self._analyze_consistency(all_runs_data, task_description)
return all_runs_data
def test_duplicate_detection_accuracy(self, task_description: str, simulate_variations: bool = True):
"""
Test the accuracy of AI-powered duplicate detection.
Args:
task_description: Search query to test
simulate_variations: Whether to create artificial variations for testing
"""
print(f"\n{'='*80}")
print(f"DUPLICATE DETECTION ACCURACY TEST")
print(f"Query: {task_description}")
print(f"{'='*80}")
# First search - baseline
print("\n--- BASELINE SEARCH ---")
raw_response1 = self.search_engine.execute_search_task(task_description)
baseline_items = self.search_engine.parse_search_results(raw_response1)
print(f"Baseline items: {len(baseline_items)}")
# Second search - should find duplicates
print("\n--- SECOND SEARCH (for duplicate detection) ---")
raw_response2 = self.search_engine.execute_search_task(task_description)
current_items = self.search_engine.parse_search_results(raw_response2)
print(f"Current items: {len(current_items)}")
# Test duplicate detection
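# find_new_items is expected to return the subset of current_items that the engine
# judges to be genuinely new relative to baseline_items (i.e., duplicates filtered out).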
new_items = self.search_engine.find_new_items(current_items, baseline_items)
print(f"Items detected as NEW: {len(new_items)}")
# Analyze detection accuracy
self._analyze_duplicate_detection(baseline_items, current_items, new_items, task_description)
if simulate_variations:
self._test_variation_detection(baseline_items, task_description)
return {
'baseline_items': baseline_items,
'current_items': current_items,
'new_items': new_items
}
def test_search_quality_metrics(self, task_description: str):
"""
Evaluate the quality of search results based on various metrics.
Args:
task_description: Search query to test
"""
print(f"\n{'='*80}")
print(f"SEARCH QUALITY METRICS TEST")
print(f"Query: {task_description}")
print(f"{'='*80}")
# Execute search
raw_response = self.search_engine.execute_search_task(task_description)
items = self.search_engine.parse_search_results(raw_response)
# Quality metrics
metrics = self._calculate_quality_metrics(items, task_description)
print(f"\n--- QUALITY METRICS ---")
print(f"Total items found: {metrics['total_items']}")
print(f"Items with URLs: {metrics['items_with_urls']} ({metrics['url_percentage']:.1f}%)")
print(f"Items with descriptions: {metrics['items_with_descriptions']} ({metrics['description_percentage']:.1f}%)")
print(f"Items with locations: {metrics['items_with_locations']} ({metrics['location_percentage']:.1f}%)")
print(f"Unique sources: {metrics['unique_sources']}")
print(f"Average name length: {metrics['avg_name_length']:.1f} characters")
print(f"Average description length: {metrics['avg_description_length']:.1f} characters")
print(f"\n--- TOP SOURCES ---")
for source, count in metrics['source_distribution'][:5]:
print(f" {source}: {count} items")
print(f"\n--- SAMPLE ITEMS ---")
for i, item in enumerate(items[:5], 1):
print(f"{i}. {item.get('name', 'No name')}")
print(f" Source: {item.get('source', 'No source')}")
print(f" Location: {item.get('location', 'No location')}")
print(f" URL: {item.get('url', 'No URL')[:60]}...")
print()
return metrics
def test_real_world_scenarios(self):
"""
Test real-world scenarios with different types of search queries.
"""
print(f"\n{'='*80}")
print(f"REAL-WORLD SCENARIOS TEST")
print(f"{'='*80}")
test_scenarios = [
{
'name': 'Job Search - AI/ML',
'query': 'Machine Learning Engineer jobs at tech companies',
'expected_sources': ['LinkedIn', 'Indeed', 'AngelList', 'company websites'],
'expected_fields': ['company', 'location', 'job_title']
},
{
'name': 'Product Releases',
'query': 'New iPhone releases and announcements from Apple',
'expected_sources': ['Apple', 'tech news sites', 'MacRumors'],
'expected_fields': ['product_name', 'release_date', 'features']
},
{
'name': 'Academic Opportunities',
'query': 'PhD fellowships in Computer Science for 2025',
'expected_sources': ['university websites', 'academic portals'],
'expected_fields': ['institution', 'deadline', 'requirements']
},
{
'name': 'News Events',
'query': 'Latest developments in AI regulation and policy',
'expected_sources': ['news websites', 'government sites'],
'expected_fields': ['policy_name', 'effective_date', 'scope']
}
]
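# Note: 'expected_sources' / 'expected_fields' are currently informational only;
# the checks below just summarize the sources that were actually returned.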
scenario_results = []
for scenario in test_scenarios:
print(f"\n--- SCENARIO: {scenario['name']} ---")
print(f"Query: {scenario['query']}")
# Execute search
start_time = time.time()
raw_response = self.search_engine.execute_search_task(scenario['query'])
items = self.search_engine.parse_search_results(raw_response)
execution_time = time.time() - start_time
# Analyze results
scenario_result = {
'scenario': scenario,
'execution_time': execution_time,
'items_found': len(items),
'items': items
}
print(f"Items found: {len(items)}")
print(f"Execution time: {execution_time:.2f}s")
# Check for expected characteristics
sources = [item.get('source', '') for item in items]
unique_sources = list(set(sources))
print(f"Sources found: {len(unique_sources)}")
for source in unique_sources[:5]:
count = sources.count(source)
print(f" - {source}: {count} items")
scenario_results.append(scenario_result)
return scenario_results
def _analyze_consistency(self, all_runs_data, task_description):
"""Analyze consistency across multiple search runs."""
print(f"\n--- CONSISTENCY ANALYSIS ---")
if len(all_runs_data) < 2:
print("Need at least 2 runs to analyze consistency")
return
# Compare item counts
item_counts = [run['total_items'] for run in all_runs_data]
avg_items = sum(item_counts) / len(item_counts)
min_items = min(item_counts)
max_items = max(item_counts)
print(f"Item count consistency:")
print(f" Average: {avg_items:.1f} items")
print(f" Range: {min_items} - {max_items} items")
print(f" Variation: {((max_items - min_items) / avg_items * 100):.1f}%")
# Compare execution times
exec_times = [run['execution_time'] for run in all_runs_data]
avg_time = sum(exec_times) / len(exec_times)
print(f"Execution time:")
print(f" Average: {avg_time:.2f}s")
print(f" Range: {min(exec_times):.2f}s - {max(exec_times):.2f}s")
# Analyze content overlap between runs
if len(all_runs_data) >= 2:
run1_items = all_runs_data[0]['items']
run2_items = all_runs_data[1]['items']
# Compare by name and source
run1_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in run1_items}
run2_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in run2_items}
overlap = len(run1_signatures.intersection(run2_signatures))
total_unique = len(run1_signatures.union(run2_signatures))
print(f"Content overlap between first two runs:")
print(f" Common items: {overlap}")
print(f" Total unique items: {total_unique}")
print(f" Overlap percentage: {(overlap / total_unique * 100):.1f}%")
def _analyze_duplicate_detection(self, baseline_items, current_items, new_items, task_description):
"""Analyze the accuracy of duplicate detection."""
print(f"\n--- DUPLICATE DETECTION ANALYSIS ---")
# Calculate expected vs actual duplicates
baseline_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in baseline_items}
current_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in current_items}
new_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in new_items}
# Items that should be duplicates (exist in both baseline and current)
expected_duplicates = baseline_signatures.intersection(current_signatures)
# Items that were marked as new but exist in baseline (false positives)
false_positives = new_signatures.intersection(baseline_signatures)
# Items that exist in current but not baseline and not marked as new (false negatives)
truly_new_signatures = current_signatures - baseline_signatures
false_negatives = truly_new_signatures - new_signatures
print(f"Expected duplicates (items in both runs): {len(expected_duplicates)}")
print(f"Items marked as NEW: {len(new_signatures)}")
print(f"False positives (new items that are actually duplicates): {len(false_positives)}")
print(f"False negatives (truly new items not detected): {len(false_negatives)}")
if current_signatures:
# Accuracy = fraction of current items whose new/duplicate status was classified correctly
accuracy = (len(current_signatures) - len(false_positives) - len(false_negatives)) / len(current_signatures)
print(f"Detection accuracy: {accuracy * 100:.1f}%")
# Show examples of detection results
if false_positives:
print(f"\nFalse positives (incorrectly marked as new):")
for fp in list(false_positives)[:3]:
print(f" - {fp}")
if false_negatives:
print(f"\nFalse negatives (missed new items):")
for fn in list(false_negatives)[:3]:
print(f" - {fn}")
if new_items and not false_positives:
print(f"\nCorrectly identified new items:")
for item in new_items[:3]:
print(f" - {item.get('name', 'Unknown')} at {item.get('source', 'Unknown')}")
def _test_variation_detection(self, baseline_items, task_description):
"""Test detection of slight variations in item formatting."""
print(f"\n--- VARIATION DETECTION TEST ---")
if not baseline_items:
print("No baseline items to test variations")
return
# Create artificial variations of existing items
varied_items = []
original_item = baseline_items[0].copy()
# Test variations
variations = [
# Name variations
{**original_item, 'name': original_item.get('name', '') + ' - Updated'},
{**original_item, 'name': original_item.get('name', '').replace(' ', '_')},
# Location variations
{**original_item, 'location': original_item.get('location', '') + ', USA'},
{**original_item, 'location': original_item.get('location', '').replace(',', ' -')},
# URL variations
{**original_item, 'url': original_item.get('url', '') + '?ref=test'},
{**original_item, 'url': original_item.get('url', '').replace('https://', 'http://')},
]
import hashlib
for variation in variations:
# Update hash for the variation
essential_content = f"{variation.get('name', '')}-{variation.get('source', '')}-{variation.get('url', '')}"
variation['hash'] = hashlib.md5(essential_content.encode()).hexdigest()
varied_items.append(variation)
# Test AI detection on variations
new_items = self.search_engine.find_new_items(varied_items, baseline_items)
print(f"Original item: {original_item.get('name', 'Unknown')}")
print(f"Created {len(variations)} variations")
print(f"AI detected {len(new_items)} as genuinely new")
# This should ideally detect most variations as duplicates
if len(new_items) < len(variations):
print(f"✓ Good: AI correctly identified {len(variations) - len(new_items)} variations as duplicates")
else:
print(f"⚠ Warning: AI failed to detect variations as duplicates")
def _calculate_quality_metrics(self, items, task_description):
"""Calculate various quality metrics for search results."""
if not items:
# Return zeroed metrics so callers can print every field without KeyErrors
return {'total_items': 0, 'items_with_urls': 0, 'url_percentage': 0.0,
'items_with_descriptions': 0, 'description_percentage': 0.0,
'items_with_locations': 0, 'location_percentage': 0.0,
'unique_sources': 0, 'source_distribution': [],
'avg_name_length': 0.0, 'avg_description_length': 0.0}
total_items = len(items)
items_with_urls = sum(1 for item in items if item.get('url'))
items_with_descriptions = sum(1 for item in items if item.get('description'))
items_with_locations = sum(1 for item in items if item.get('location'))
sources = [item.get('source', 'Unknown') for item in items]
unique_sources = len(set(sources))
# Source distribution
from collections import Counter
source_counts = Counter(sources)
source_distribution = source_counts.most_common()
# Text length analysis
names = [item.get('name', '') for item in items if item.get('name')]
descriptions = [item.get('description', '') for item in items if item.get('description')]
avg_name_length = sum(len(name) for name in names) / len(names) if names else 0
avg_description_length = sum(len(desc) for desc in descriptions) / len(descriptions) if descriptions else 0
return {
'total_items': total_items,
'items_with_urls': items_with_urls,
'url_percentage': (items_with_urls / total_items * 100) if total_items > 0 else 0,
'items_with_descriptions': items_with_descriptions,
'description_percentage': (items_with_descriptions / total_items * 100) if total_items > 0 else 0,
'items_with_locations': items_with_locations,
'location_percentage': (items_with_locations / total_items * 100) if total_items > 0 else 0,
'unique_sources': unique_sources,
'source_distribution': source_distribution,
'avg_name_length': avg_name_length,
'avg_description_length': avg_description_length
}
def run_comprehensive_test_suite(self):
"""Run all quality tests with predefined scenarios."""
print("="*80)
print("COMPREHENSIVE SEARCH QUALITY TEST SUITE")
print("="*80)
# Test scenarios
test_queries = [
"AI Ethics and Safety openings fit for a PhD in Computer Science",
"Latest iPhone 15 release information and reviews",
"Remote software engineering jobs at startups"
]
for i, query in enumerate(test_queries, 1):
print(f"\n\n🔍 TEST SCENARIO {i}: {query}")
print("="*80)
try:
# Test 1: Search quality metrics
quality_metrics = self.test_search_quality_metrics(query)
# Test 2: Duplicate detection accuracy
detection_results = self.test_duplicate_detection_accuracy(query, simulate_variations=True)
# Test 3: Consistency (only for first query to save time)
if i == 1:
consistency_results = self.test_search_consistency(query, num_runs=2, delay_seconds=10)
print(f"\n✅ Completed test scenario {i}")
except Exception as e:
print(f"❌ Error in test scenario {i}: {e}")
# Real-world scenarios
print(f"\n\n🌍 REAL-WORLD SCENARIOS")
print("="*80)
self.test_real_world_scenarios()
print(f"\n\n🎉 COMPREHENSIVE TEST SUITE COMPLETED")
def main():
"""Main function to run search quality tests."""
import argparse
parser = argparse.ArgumentParser(description="Test search quality and duplicate detection")
parser.add_argument("--comprehensive", action="store_true", help="Run comprehensive test suite")
parser.add_argument("--consistency", action="store_true", help="Test search consistency")
parser.add_argument("--duplicates", action="store_true", help="Test duplicate detection")
parser.add_argument("--quality", action="store_true", help="Test search quality metrics")
parser.add_argument("--query", type=str, help="Custom search query to test")
parser.add_argument("--runs", type=int, default=3, help="Number of runs for consistency test")
args = parser.parse_args()
tester = SearchQualityTester()
# Default query if none provided
query = args.query or "AI Ethics and Safety openings fit for a PhD in Computer Science"
if args.comprehensive:
tester.run_comprehensive_test_suite()
elif args.consistency:
tester.test_search_consistency(query, num_runs=args.runs)
elif args.duplicates:
tester.test_duplicate_detection_accuracy(query)
elif args.quality:
tester.test_search_quality_metrics(query)
else:
# Default: run a quick quality check
print("Running quick search quality test...")
print("Use --comprehensive for full test suite")
tester.test_search_quality_metrics(query)
tester.test_duplicate_detection_accuracy(query, simulate_variations=False)
if __name__ == "__main__":
main()
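# Example invocations (a sketch; the query strings are hypothetical, and this assumes the
# file sits one directory below the project root, matching the sys.path adjustment above,
# with the SearchEngine backend and any required API credentials already configured):
#
#   python test_search_quality.py --quality --query "Remote data science jobs"
#   python test_search_quality.py --consistency --runs 2
#   python test_search_quality.py --comprehensive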