
TimeLooker MCP Server

test_search_quality.py (21.4 kB)
""" Search Quality and Duplicate Detection Testing Suite """ import json import time from datetime import datetime import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.core.search_engine_claude import SearchEngine # TaskManager import removed - not needed for search quality tests from src.core.models import get_db_session class SearchQualityTester: """ Test the quality of search results and duplicate detection accuracy. """ def __init__(self): self.search_engine = SearchEngine() # Note: TaskManager not needed for quality tests as they only test search functionality self.test_results = [] def test_search_consistency(self, task_description: str, num_runs: int = 3, delay_seconds: int = 30): """ Test if the same search query returns consistent, high-quality results. Args: task_description: Search query to test num_runs: Number of search runs to perform delay_seconds: Delay between runs """ print(f"\n{'='*80}") print(f"SEARCH CONSISTENCY TEST") print(f"Query: {task_description}") print(f"Runs: {num_runs}, Delay: {delay_seconds}s between runs") print(f"{'='*80}") all_runs_data = [] for run in range(1, num_runs + 1): print(f"\n--- RUN {run}/{num_runs} ---") start_time = time.time() # Execute search raw_response = self.search_engine.execute_search_task(task_description) current_items = self.search_engine.parse_search_results(raw_response) execution_time = time.time() - start_time # Analyze results run_data = { 'run_number': run, 'execution_time': execution_time, 'total_items': len(current_items), 'items': current_items, 'timestamp': datetime.now().isoformat() } all_runs_data.append(run_data) print(f"Items found: {len(current_items)}") print(f"Execution time: {execution_time:.2f}s") # Show sample items for i, item in enumerate(current_items[:3], 1): print(f" {i}. {item.get('name', 'Unknown')} at {item.get('source', 'Unknown')}") if len(current_items) > 3: print(f" ... and {len(current_items) - 3} more items") # Wait before next run (except for last run) if run < num_runs: print(f"Waiting {delay_seconds} seconds before next run...") time.sleep(delay_seconds) # Analyze consistency across runs self._analyze_consistency(all_runs_data, task_description) return all_runs_data def test_duplicate_detection_accuracy(self, task_description: str, simulate_variations: bool = True): """ Test the accuracy of AI-powered duplicate detection. 
Args: task_description: Search query to test simulate_variations: Whether to create artificial variations for testing """ print(f"\n{'='*80}") print(f"DUPLICATE DETECTION ACCURACY TEST") print(f"Query: {task_description}") print(f"{'='*80}") # First search - baseline print("\n--- BASELINE SEARCH ---") raw_response1 = self.search_engine.execute_search_task(task_description) baseline_items = self.search_engine.parse_search_results(raw_response1) print(f"Baseline items: {len(baseline_items)}") # Second search - should find duplicates print("\n--- SECOND SEARCH (for duplicate detection) ---") raw_response2 = self.search_engine.execute_search_task(task_description) current_items = self.search_engine.parse_search_results(raw_response2) print(f"Current items: {len(current_items)}") # Test duplicate detection new_items = self.search_engine.find_new_items(current_items, baseline_items) print(f"Items detected as NEW: {len(new_items)}") # Analyze detection accuracy self._analyze_duplicate_detection(baseline_items, current_items, new_items, task_description) if simulate_variations: self._test_variation_detection(baseline_items, task_description) return { 'baseline_items': baseline_items, 'current_items': current_items, 'new_items': new_items } def test_search_quality_metrics(self, task_description: str): """ Evaluate the quality of search results based on various metrics. Args: task_description: Search query to test """ print(f"\n{'='*80}") print(f"SEARCH QUALITY METRICS TEST") print(f"Query: {task_description}") print(f"{'='*80}") # Execute search raw_response = self.search_engine.execute_search_task(task_description) items = self.search_engine.parse_search_results(raw_response) # Quality metrics metrics = self._calculate_quality_metrics(items, task_description) print(f"\n--- QUALITY METRICS ---") print(f"Total items found: {metrics['total_items']}") print(f"Items with URLs: {metrics['items_with_urls']} ({metrics['url_percentage']:.1f}%)") print(f"Items with descriptions: {metrics['items_with_descriptions']} ({metrics['description_percentage']:.1f}%)") print(f"Items with locations: {metrics['items_with_locations']} ({metrics['location_percentage']:.1f}%)") print(f"Unique sources: {metrics['unique_sources']}") print(f"Average name length: {metrics['avg_name_length']:.1f} characters") print(f"Average description length: {metrics['avg_description_length']:.1f} characters") print(f"\n--- TOP SOURCES ---") for source, count in metrics['source_distribution'][:5]: print(f" {source}: {count} items") print(f"\n--- SAMPLE ITEMS ---") for i, item in enumerate(items[:5], 1): print(f"{i}. {item.get('name', 'No name')}") print(f" Source: {item.get('source', 'No source')}") print(f" Location: {item.get('location', 'No location')}") print(f" URL: {item.get('url', 'No URL')[:60]}...") print() return metrics def test_real_world_scenarios(self): """ Test real-world scenarios with different types of search queries. 
""" print(f"\n{'='*80}") print(f"REAL-WORLD SCENARIOS TEST") print(f"{'='*80}") test_scenarios = [ { 'name': 'Job Search - AI/ML', 'query': 'Machine Learning Engineer jobs at tech companies', 'expected_sources': ['LinkedIn', 'Indeed', 'AngelList', 'company websites'], 'expected_fields': ['company', 'location', 'job_title'] }, { 'name': 'Product Releases', 'query': 'New iPhone releases and announcements from Apple', 'expected_sources': ['Apple', 'tech news sites', 'MacRumors'], 'expected_fields': ['product_name', 'release_date', 'features'] }, { 'name': 'Academic Opportunities', 'query': 'PhD fellowships in Computer Science for 2025', 'expected_sources': ['university websites', 'academic portals'], 'expected_fields': ['institution', 'deadline', 'requirements'] }, { 'name': 'News Events', 'query': 'Latest developments in AI regulation and policy', 'expected_sources': ['news websites', 'government sites'], 'expected_fields': ['policy_name', 'effective_date', 'scope'] } ] scenario_results = [] for scenario in test_scenarios: print(f"\n--- SCENARIO: {scenario['name']} ---") print(f"Query: {scenario['query']}") # Execute search start_time = time.time() raw_response = self.search_engine.execute_search_task(scenario['query']) items = self.search_engine.parse_search_results(raw_response) execution_time = time.time() - start_time # Analyze results scenario_result = { 'scenario': scenario, 'execution_time': execution_time, 'items_found': len(items), 'items': items } print(f"Items found: {len(items)}") print(f"Execution time: {execution_time:.2f}s") # Check for expected characteristics sources = [item.get('source', '') for item in items] unique_sources = list(set(sources)) print(f"Sources found: {len(unique_sources)}") for source in unique_sources[:5]: count = sources.count(source) print(f" - {source}: {count} items") scenario_results.append(scenario_result) return scenario_results def _analyze_consistency(self, all_runs_data, task_description): """Analyze consistency across multiple search runs.""" print(f"\n--- CONSISTENCY ANALYSIS ---") if len(all_runs_data) < 2: print("Need at least 2 runs to analyze consistency") return # Compare item counts item_counts = [run['total_items'] for run in all_runs_data] avg_items = sum(item_counts) / len(item_counts) min_items = min(item_counts) max_items = max(item_counts) print(f"Item count consistency:") print(f" Average: {avg_items:.1f} items") print(f" Range: {min_items} - {max_items} items") print(f" Variation: {((max_items - min_items) / avg_items * 100):.1f}%") # Compare execution times exec_times = [run['execution_time'] for run in all_runs_data] avg_time = sum(exec_times) / len(exec_times) print(f"Execution time:") print(f" Average: {avg_time:.2f}s") print(f" Range: {min(exec_times):.2f}s - {max(exec_times):.2f}s") # Analyze content overlap between runs if len(all_runs_data) >= 2: run1_items = all_runs_data[0]['items'] run2_items = all_runs_data[1]['items'] # Compare by name and source run1_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in run1_items} run2_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in run2_items} overlap = len(run1_signatures.intersection(run2_signatures)) total_unique = len(run1_signatures.union(run2_signatures)) print(f"Content overlap between first two runs:") print(f" Common items: {overlap}") print(f" Total unique items: {total_unique}") print(f" Overlap percentage: {(overlap / total_unique * 100):.1f}%") def _analyze_duplicate_detection(self, baseline_items, current_items, 
new_items, task_description): """Analyze the accuracy of duplicate detection.""" print(f"\n--- DUPLICATE DETECTION ANALYSIS ---") # Calculate expected vs actual duplicates baseline_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in baseline_items} current_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in current_items} new_signatures = {f"{item.get('name', '')}-{item.get('source', '')}" for item in new_items} # Items that should be duplicates (exist in both baseline and current) expected_duplicates = baseline_signatures.intersection(current_signatures) # Items that were marked as new but exist in baseline (false positives) false_positives = new_signatures.intersection(baseline_signatures) # Items that exist in current but not baseline and not marked as new (false negatives) truly_new_signatures = current_signatures - baseline_signatures false_negatives = truly_new_signatures - new_signatures print(f"Expected duplicates (items in both runs): {len(expected_duplicates)}") print(f"Items marked as NEW: {len(new_signatures)}") print(f"False positives (new items that are actually duplicates): {len(false_positives)}") print(f"False negatives (truly new items not detected): {len(false_negatives)}") if len(expected_duplicates) > 0: accuracy = 1 - (len(false_positives) + len(false_negatives)) / len(expected_duplicates) print(f"Detection accuracy: {accuracy * 100:.1f}%") # Show examples of detection results if false_positives: print(f"\nFalse positives (incorrectly marked as new):") for fp in list(false_positives)[:3]: print(f" - {fp}") if false_negatives: print(f"\nFalse negatives (missed new items):") for fn in list(false_negatives)[:3]: print(f" - {fn}") if new_items and not false_positives: print(f"\nCorrectly identified new items:") for item in new_items[:3]: print(f" - {item.get('name', 'Unknown')} at {item.get('source', 'Unknown')}") def _test_variation_detection(self, baseline_items, task_description): """Test detection of slight variations in item formatting.""" print(f"\n--- VARIATION DETECTION TEST ---") if not baseline_items: print("No baseline items to test variations") return # Create artificial variations of existing items varied_items = [] original_item = baseline_items[0].copy() # Test variations variations = [ # Name variations {**original_item, 'name': original_item.get('name', '') + ' - Updated'}, {**original_item, 'name': original_item.get('name', '').replace(' ', '_')}, # Location variations {**original_item, 'location': original_item.get('location', '') + ', USA'}, {**original_item, 'location': original_item.get('location', '').replace(',', ' -')}, # URL variations {**original_item, 'url': original_item.get('url', '') + '?ref=test'}, {**original_item, 'url': original_item.get('url', '').replace('https://', 'http://')}, ] for i, variation in enumerate(variations, 1): # Update hash for the variation import hashlib essential_content = f"{variation.get('name', '')}-{variation.get('source', '')}-{variation.get('url', '')}" variation['hash'] = hashlib.md5(essential_content.encode()).hexdigest() varied_items.append(variation) # Test AI detection on variations new_items = self.search_engine.find_new_items(varied_items, baseline_items) print(f"Original item: {original_item.get('name', 'Unknown')}") print(f"Created {len(variations)} variations") print(f"AI detected {len(new_items)} as genuinely new") # This should ideally detect most variations as duplicates if len(new_items) < len(variations): print(f"✓ Good: AI correctly identified 
{len(variations) - len(new_items)} variations as duplicates") else: print(f"⚠ Warning: AI failed to detect variations as duplicates") def _calculate_quality_metrics(self, items, task_description): """Calculate various quality metrics for search results.""" if not items: return {'total_items': 0} total_items = len(items) items_with_urls = sum(1 for item in items if item.get('url')) items_with_descriptions = sum(1 for item in items if item.get('description')) items_with_locations = sum(1 for item in items if item.get('location')) sources = [item.get('source', 'Unknown') for item in items] unique_sources = len(set(sources)) # Source distribution from collections import Counter source_counts = Counter(sources) source_distribution = source_counts.most_common() # Text length analysis names = [item.get('name', '') for item in items if item.get('name')] descriptions = [item.get('description', '') for item in items if item.get('description')] avg_name_length = sum(len(name) for name in names) / len(names) if names else 0 avg_description_length = sum(len(desc) for desc in descriptions) / len(descriptions) if descriptions else 0 return { 'total_items': total_items, 'items_with_urls': items_with_urls, 'url_percentage': (items_with_urls / total_items * 100) if total_items > 0 else 0, 'items_with_descriptions': items_with_descriptions, 'description_percentage': (items_with_descriptions / total_items * 100) if total_items > 0 else 0, 'items_with_locations': items_with_locations, 'location_percentage': (items_with_locations / total_items * 100) if total_items > 0 else 0, 'unique_sources': unique_sources, 'source_distribution': source_distribution, 'avg_name_length': avg_name_length, 'avg_description_length': avg_description_length } def run_comprehensive_test_suite(self): """Run all quality tests with predefined scenarios.""" print("="*80) print("COMPREHENSIVE SEARCH QUALITY TEST SUITE") print("="*80) # Test scenarios test_queries = [ "AI Ethics and Safety openings fit for a PhD in Computer Science", "Latest iPhone 15 release information and reviews", "Remote software engineering jobs at startups" ] for i, query in enumerate(test_queries, 1): print(f"\n\n🔍 TEST SCENARIO {i}: {query}") print("="*80) try: # Test 1: Search quality metrics quality_metrics = self.test_search_quality_metrics(query) # Test 2: Duplicate detection accuracy detection_results = self.test_duplicate_detection_accuracy(query, simulate_variations=True) # Test 3: Consistency (only for first query to save time) if i == 1: consistency_results = self.test_search_consistency(query, num_runs=2, delay_seconds=10) print(f"\n✅ Completed test scenario {i}") except Exception as e: print(f"❌ Error in test scenario {i}: {e}") # Real-world scenarios print(f"\n\n🌍 REAL-WORLD SCENARIOS") print("="*80) self.test_real_world_scenarios() print(f"\n\n🎉 COMPREHENSIVE TEST SUITE COMPLETED") def main(): """Main function to run search quality tests.""" import argparse parser = argparse.ArgumentParser(description="Test search quality and duplicate detection") parser.add_argument("--comprehensive", action="store_true", help="Run comprehensive test suite") parser.add_argument("--consistency", action="store_true", help="Test search consistency") parser.add_argument("--duplicates", action="store_true", help="Test duplicate detection") parser.add_argument("--quality", action="store_true", help="Test search quality metrics") parser.add_argument("--query", type=str, help="Custom search query to test") parser.add_argument("--runs", type=int, default=3, help="Number of 
runs for consistency test") args = parser.parse_args() tester = SearchQualityTester() # Default query if none provided query = args.query or "AI Ethics and Safety openings fit for a PhD in Computer Science" if args.comprehensive: tester.run_comprehensive_test_suite() elif args.consistency: tester.test_search_consistency(query, num_runs=args.runs) elif args.duplicates: tester.test_duplicate_detection_accuracy(query) elif args.quality: tester.test_search_quality_metrics(query) else: # Default: run a quick quality check print("Running quick search quality test...") print("Use --comprehensive for full test suite") tester.test_search_quality_metrics(query) tester.test_duplicate_detection_accuracy(query, simulate_variations=False) if __name__ == "__main__": main()
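
Usage note: besides the CLI flags handled in main(), the tester can be driven from another script. The following is a minimal sketch, not part of the repository; the tests.test_search_quality import path is an assumption about where the file lives, and it presumes the project's src/ package is importable from the working directory.

# Hypothetical programmatic usage of SearchQualityTester (import path is an assumption)
from tests.test_search_quality import SearchQualityTester

tester = SearchQualityTester()

# Run a single quality check and inspect the returned metrics dict
metrics = tester.test_search_quality_metrics("Remote software engineering jobs at startups")
print(metrics["total_items"], metrics["unique_sources"])

# Compare two back-to-back searches for duplicate-detection accuracy
results = tester.test_duplicate_detection_accuracy(
    "Remote software engineering jobs at startups",
    simulate_variations=False,
)
print(len(results["new_items"]), "items flagged as new")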

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler'
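
The same request can be made from Python; a minimal sketch, assuming the requests package is installed and that the endpoint returns JSON:

import requests

# Fetch this server's metadata from the Glama MCP directory API
response = requests.get(
    "https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler"
)
response.raise_for_status()
print(response.json())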

If you have feedback or need assistance with the MCP directory API, please join our Discord server.