# monitor_query_test.py
"""
Monitor Query Test - Track how search results change over time
"""
import json
import os
import sys
import time
from datetime import datetime

# Make the project root importable so the `src` package resolves.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.core.search_engine_claude import SearchEngine
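
# Example CLI usage (the flags are defined in main() below; the query text is
# illustrative):
#
#   python monitor_query_test.py --query "remote ML research roles" --iterations 3 --delay 1
#   python monitor_query_test.py --compare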


class QueryMonitor:
    """
    Monitor a specific query over multiple iterations to track changes.
    """

    def __init__(self, results_file="query_monitor_results.json"):
        self.search_engine = SearchEngine()
        self.results_file = results_file
        self.load_previous_results()

    def load_previous_results(self):
        """Load previous monitoring results from file."""
        if os.path.exists(self.results_file):
            try:
                with open(self.results_file, 'r') as f:
                    self.history = json.load(f)
                print(f"📂 Loaded {len(self.history)} previous monitoring sessions")
            except (OSError, json.JSONDecodeError):
                self.history = []
                print("📂 Starting fresh monitoring session")
        else:
            self.history = []
            print("📂 No previous results found, starting fresh")

    def save_results(self):
        """Save monitoring results to file."""
        with open(self.results_file, 'w') as f:
            json.dump(self.history, f, indent=2)
        print(f"💾 Results saved to {self.results_file}")

    def monitor_query(self, query, iterations=5, delay_minutes=2):
        """
        Monitor a query over multiple iterations.

        Args:
            query: Search query to monitor
            iterations: Number of monitoring iterations
            delay_minutes: Minutes to wait between iterations
        """
        print("🔍 QUERY MONITORING TEST")
        print("=" * 80)
        print(f"Query: {query}")
        print(f"Iterations: {iterations}")
        print(f"Delay: {delay_minutes} minutes between checks")
        print("=" * 80)

        session = {
            'query': query,
            'started_at': datetime.now().isoformat(),
            'iterations': [],
            'analysis': {}
        }
        all_items_seen = []  # Track all unique items across iterations

        for iteration in range(1, iterations + 1):
            print(f"\n🔄 ITERATION {iteration}/{iterations}")
            print(f"Time: {datetime.now().strftime('%H:%M:%S')}")
            print("-" * 50)

            # Execute search
            start_time = time.time()
            raw_response = self.search_engine.execute_search_task(query)
            current_items = self.search_engine.parse_search_results(raw_response)
            execution_time = time.time() - start_time

            # Find new items compared to all previous iterations
            new_items = self.search_engine.find_new_items(current_items, all_items_seen)

            # Update running list of all items seen
            all_items_seen.extend(new_items)

            # Record iteration data
            iteration_data = {
                'iteration': iteration,
                'timestamp': datetime.now().isoformat(),
                'execution_time': execution_time,
                'total_items': len(current_items),
                'new_items_count': len(new_items),
                'items': current_items,
                'new_items': new_items
            }
            session['iterations'].append(iteration_data)
            # Display results
            print(f"✓ Found {len(current_items)} total items")
            print(f"✓ {len(new_items)} genuinely NEW items")
            print(f"✓ Execution time: {execution_time:.2f}s")
            print(f"✓ Total unique items seen so far: {len(all_items_seen)}")

            # Show new items if any
            if new_items:
                print("\n🆕 New items this iteration:")
                for i, item in enumerate(new_items[:3], 1):
                    name = item.get('name', 'Unknown')[:50]
                    source = item.get('source', 'Unknown')[:20]
                    print(f"   {i}. {name} (from {source})")
                if len(new_items) > 3:
                    print(f"   ... and {len(new_items) - 3} more")
            else:
                print("🔁 No new items found (all were previously seen)")
            # Wait before next iteration (except for last one)
            if iteration < iterations:
                wait_seconds = delay_minutes * 60
                print(f"\n⏳ Waiting {delay_minutes} minutes until next check...")

                # Show countdown for the first few seconds
                for remaining in range(min(10, wait_seconds), 0, -1):
                    print(f"\r   Next check in {remaining} seconds...", end="", flush=True)
                    time.sleep(1)

                # Sleep for the remaining time
                if wait_seconds > 10:
                    time.sleep(wait_seconds - 10)
                print("\r   Starting next iteration...          ")
        # Analyze the session
        session['analysis'] = self._analyze_session(session)
        session['completed_at'] = datetime.now().isoformat()

        # Add to history and save
        self.history.append(session)
        self.save_results()

        # Display analysis
        self._display_session_analysis(session)

        return session
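
    # Note: the persisted results file is a JSON list of session dicts, each
    # shaped like the `session` built above:
    #   [{"query": ..., "started_at": ..., "iterations": [...],
    #     "analysis": {...}, "completed_at": ...}, ...]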

    def _analyze_session(self, session):
        """Analyze a completed monitoring session."""
        iterations = session['iterations']
        if not iterations:
            return {}

        # Basic stats
        total_items_per_iteration = [it['total_items'] for it in iterations]
        new_items_per_iteration = [it['new_items_count'] for it in iterations]
        execution_times = [it['execution_time'] for it in iterations]

        # Calculate discovery rate (how many new items were found over time)
        cumulative_new_items = []
        running_total = 0
        for count in new_items_per_iteration:
            running_total += count
            cumulative_new_items.append(running_total)

        analysis = {
            'total_iterations': len(iterations),
            'total_unique_items_discovered': cumulative_new_items[-1] if cumulative_new_items else 0,
            'avg_items_per_iteration': sum(total_items_per_iteration) / len(total_items_per_iteration),
            'avg_new_items_per_iteration': sum(new_items_per_iteration) / len(new_items_per_iteration),
            'avg_execution_time': sum(execution_times) / len(execution_times),
            'discovery_pattern': new_items_per_iteration,
            'discovery_trend': (
                'decreasing'
                if len(new_items_per_iteration) > 1 and new_items_per_iteration[-1] < new_items_per_iteration[0]
                else 'stable'
            ),
            'performance_consistency': max(execution_times) - min(execution_times),
            'items_per_iteration': total_items_per_iteration
        }
        return analysis
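
    # Worked example (hypothetical numbers): iterations yielding
    # new_items_per_iteration == [8, 3, 1] give cumulative totals [8, 11, 12],
    # so total_unique_items_discovered == 12 and discovery_trend == 'decreasing'
    # (1 < 8); identical execution times give performance_consistency == 0.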

    def _display_session_analysis(self, session):
        """Display analysis of the monitoring session."""
        analysis = session['analysis']

        print("\n📊 SESSION ANALYSIS")
        print("=" * 80)

        print("🎯 Discovery Summary:")
        print(f"   • Total unique items discovered: {analysis['total_unique_items_discovered']}")
        print(f"   • Average items per iteration: {analysis['avg_items_per_iteration']:.1f}")
        print(f"   • Average new items per iteration: {analysis['avg_new_items_per_iteration']:.1f}")
        print(f"   • Average execution time: {analysis['avg_execution_time']:.2f} seconds")

        print("\n📊 Discovery Pattern:")
        for i, count in enumerate(analysis['discovery_pattern'], 1):
            bar = "█" * min(count, 20)  # Visual bar chart, capped at 20 blocks
            print(f"   Iteration {i}: {count:2d} new items {bar}")

        print("\n📈 Trends:")
        trend = analysis['discovery_trend']
        if trend == 'decreasing':
            print("   ✓ Discovery rate decreasing (good - finding fewer duplicates over time)")
        else:
            print("   ✓ Discovery rate stable/increasing (may indicate dynamic content)")

        performance_var = analysis['performance_consistency']
        if performance_var < 5:
            print("   ✓ Performance consistent (variation < 5 seconds)")
        else:
            print(f"   ✓ Performance variable (variation: {performance_var:.1f} seconds)")

        # Quality assessment
        print("\n🏆 Quality Assessment:")
        score = 0
        max_score = 4

        if analysis['total_unique_items_discovered'] >= 10:
            score += 1
            print("   ✅ Discovery volume: Good (10+ unique items)")
        else:
            print("   ⚠️ Discovery volume: Low (<10 unique items)")

        if analysis['avg_new_items_per_iteration'] <= 3 and len(analysis['discovery_pattern']) > 1:
            score += 1
            print("   ✅ Duplicate detection: Working well (low new item rate after first iteration)")
        else:
            print("   ⚠️ Duplicate detection: May need improvement")

        if analysis['avg_execution_time'] <= 15:
            score += 1
            print("   ✅ Performance: Good (<=15 seconds average)")
        else:
            print("   ⚠️ Performance: Slow (>15 seconds average)")

        if performance_var < 10:
            score += 1
            print("   ✅ Consistency: Good (low variation)")
        else:
            print("   ⚠️ Consistency: Variable performance")

        print(f"\n🎯 MONITORING SCORE: {score}/{max_score}")

    def compare_sessions(self, query=None):
        """Compare multiple monitoring sessions for the same query."""
        if not self.history:
            print("No previous sessions to compare")
            return

        if query:
            sessions = [s for s in self.history if s['query'] == query]
            if not sessions:
                print(f"No sessions found for query: {query}")
                return
        else:
            sessions = self.history

        print("\n📊 SESSION COMPARISON")
        print("=" * 80)
        print(f"Comparing {len(sessions)} sessions")

        for i, session in enumerate(sessions, 1):
            analysis = session.get('analysis', {})
            start_time = session.get('started_at', 'Unknown')[:19]  # Remove microseconds
            print(f"\nSession {i} ({start_time}):")
            print(f"   Query: {session['query'][:50]}...")
            print(f"   Unique items: {analysis.get('total_unique_items_discovered', 0)}")
            print(f"   Avg time: {analysis.get('avg_execution_time', 0):.1f}s")
            print(f"   Iterations: {analysis.get('total_iterations', 0)}")


def main():
    """Main function for query monitoring."""
    import argparse

    parser = argparse.ArgumentParser(description="Monitor a search query over time")
    parser.add_argument("--query", type=str, help="Search query to monitor")
    parser.add_argument("--iterations", type=int, default=3, help="Number of iterations")
    parser.add_argument("--delay", type=int, default=1, help="Minutes between iterations")
    parser.add_argument("--compare", action="store_true", help="Compare previous sessions")
    args = parser.parse_args()

    monitor = QueryMonitor()

    if args.compare:
        monitor.compare_sessions(args.query)
    else:
        query = args.query or "AI Ethics and Safety openings fit for a PhD in Computer Science"
        monitor.monitor_query(query, iterations=args.iterations, delay_minutes=args.delay)


if __name__ == "__main__":
    main()