Skip to main content
Glama

TimeLooker MCP Server

monitor_query_test.pyโ€ข11.9 kB
""" Monitor Query Test - Track how search results change over time """ import json import time import os from datetime import datetime import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.core.search_engine_claude import SearchEngine class QueryMonitor: """ Monitor a specific query over multiple iterations to track changes. """ def __init__(self, results_file="query_monitor_results.json"): self.search_engine = SearchEngine() self.results_file = results_file self.load_previous_results() def load_previous_results(self): """Load previous monitoring results from file.""" if os.path.exists(self.results_file): try: with open(self.results_file, 'r') as f: self.history = json.load(f) print(f"๐Ÿ“š Loaded {len(self.history)} previous monitoring sessions") except: self.history = [] print("๐Ÿ“š Starting fresh monitoring session") else: self.history = [] print("๐Ÿ“š No previous results found, starting fresh") def save_results(self): """Save monitoring results to file.""" with open(self.results_file, 'w') as f: json.dump(self.history, f, indent=2) print(f"๐Ÿ’พ Results saved to {self.results_file}") def monitor_query(self, query, iterations=5, delay_minutes=2): """ Monitor a query over multiple iterations. Args: query: Search query to monitor iterations: Number of monitoring iterations delay_minutes: Minutes to wait between iterations """ print(f"๐Ÿ” QUERY MONITORING TEST") print("=" * 80) print(f"Query: {query}") print(f"Iterations: {iterations}") print(f"Delay: {delay_minutes} minutes between checks") print("=" * 80) session = { 'query': query, 'started_at': datetime.now().isoformat(), 'iterations': [], 'analysis': {} } all_items_seen = [] # Track all unique items across iterations for iteration in range(1, iterations + 1): print(f"\n๐Ÿ”„ ITERATION {iteration}/{iterations}") print(f"Time: {datetime.now().strftime('%H:%M:%S')}") print("-" * 50) # Execute search start_time = time.time() raw_response = self.search_engine.execute_search_task(query) current_items = self.search_engine.parse_search_results(raw_response) execution_time = time.time() - start_time # Find new items compared to all previous iterations new_items = self.search_engine.find_new_items(current_items, all_items_seen) # Update running list of all items seen all_items_seen.extend(new_items) # Record iteration data iteration_data = { 'iteration': iteration, 'timestamp': datetime.now().isoformat(), 'execution_time': execution_time, 'total_items': len(current_items), 'new_items_count': len(new_items), 'items': current_items, 'new_items': new_items } session['iterations'].append(iteration_data) # Display results print(f"โœ“ Found {len(current_items)} total items") print(f"โœ“ {len(new_items)} genuinely NEW items") print(f"โœ“ Execution time: {execution_time:.2f}s") print(f"โœ“ Total unique items seen so far: {len(all_items_seen)}") # Show new items if any if new_items: print(f"\n๐Ÿ†• New items this iteration:") for i, item in enumerate(new_items[:3], 1): name = item.get('name', 'Unknown')[:50] source = item.get('source', 'Unknown')[:20] print(f" {i}. {name} (from {source})") if len(new_items) > 3: print(f" ... and {len(new_items) - 3} more") else: print("๐Ÿ“‹ No new items found (all were previously seen)") # Wait before next iteration (except for last one) if iteration < iterations: wait_seconds = delay_minutes * 60 print(f"\nโณ Waiting {delay_minutes} minutes until next check...") # Show countdown for first few seconds for remaining in range(min(10, wait_seconds), 0, -1): print(f"\r Next check in {remaining} seconds...", end="", flush=True) time.sleep(1) # Sleep for remaining time if wait_seconds > 10: time.sleep(wait_seconds - 10) print("\r Starting next iteration... ") # Analyze the session session['analysis'] = self._analyze_session(session) session['completed_at'] = datetime.now().isoformat() # Add to history and save self.history.append(session) self.save_results() # Display analysis self._display_session_analysis(session) return session def _analyze_session(self, session): """Analyze a completed monitoring session.""" iterations = session['iterations'] if not iterations: return {} # Basic stats total_items_per_iteration = [it['total_items'] for it in iterations] new_items_per_iteration = [it['new_items_count'] for it in iterations] execution_times = [it['execution_time'] for it in iterations] # Calculate discovery rate (how many new items found over time) cumulative_new_items = [] running_total = 0 for count in new_items_per_iteration: running_total += count cumulative_new_items.append(running_total) analysis = { 'total_iterations': len(iterations), 'total_unique_items_discovered': cumulative_new_items[-1] if cumulative_new_items else 0, 'avg_items_per_iteration': sum(total_items_per_iteration) / len(total_items_per_iteration), 'avg_new_items_per_iteration': sum(new_items_per_iteration) / len(new_items_per_iteration), 'avg_execution_time': sum(execution_times) / len(execution_times), 'discovery_pattern': new_items_per_iteration, 'discovery_trend': 'decreasing' if len(new_items_per_iteration) > 1 and new_items_per_iteration[-1] < new_items_per_iteration[0] else 'stable', 'performance_consistency': max(execution_times) - min(execution_times), 'items_per_iteration': total_items_per_iteration } return analysis def _display_session_analysis(self, session): """Display analysis of the monitoring session.""" analysis = session['analysis'] print(f"\n๐Ÿ“Š SESSION ANALYSIS") print("=" * 80) print(f"๐ŸŽฏ Discovery Summary:") print(f" โ€ข Total unique items discovered: {analysis['total_unique_items_discovered']}") print(f" โ€ข Average items per iteration: {analysis['avg_items_per_iteration']:.1f}") print(f" โ€ข Average new items per iteration: {analysis['avg_new_items_per_iteration']:.1f}") print(f" โ€ข Average execution time: {analysis['avg_execution_time']:.2f} seconds") print(f"\n๐Ÿ“ˆ Discovery Pattern:") for i, count in enumerate(analysis['discovery_pattern'], 1): bar = "โ–ˆ" * min(count, 20) # Visual bar chart print(f" Iteration {i}: {count:2d} new items {bar}") print(f"\n๐Ÿ”„ Trends:") trend = analysis['discovery_trend'] if trend == 'decreasing': print(" โœ“ Discovery rate decreasing (good - finding fewer duplicates over time)") else: print(" โš  Discovery rate stable/increasing (may indicate dynamic content)") performance_var = analysis['performance_consistency'] if performance_var < 5: print(" โœ“ Performance consistent (variation < 5 seconds)") else: print(f" โš  Performance variable (variation: {performance_var:.1f} seconds)") # Quality assessment print(f"\n๐Ÿ† Quality Assessment:") score = 0 max_score = 4 if analysis['total_unique_items_discovered'] >= 10: score += 1 print(" โœ… Discovery volume: Good (10+ unique items)") else: print(" โš ๏ธ Discovery volume: Low (<10 unique items)") if analysis['avg_new_items_per_iteration'] <= 3 and len(analysis['discovery_pattern']) > 1: score += 1 print(" โœ… Duplicate detection: Working well (low new item rate after first iteration)") else: print(" โš ๏ธ Duplicate detection: May need improvement") if analysis['avg_execution_time'] <= 15: score += 1 print(" โœ… Performance: Good (<15 seconds average)") else: print(" โš ๏ธ Performance: Slow (>15 seconds average)") if performance_var < 10: score += 1 print(" โœ… Consistency: Good (low variation)") else: print(" โš ๏ธ Consistency: Variable performance") print(f"\n๐ŸŽฏ MONITORING SCORE: {score}/{max_score}") def compare_sessions(self, query=None): """Compare multiple monitoring sessions for the same query.""" if not self.history: print("No previous sessions to compare") return if query: sessions = [s for s in self.history if s['query'] == query] if not sessions: print(f"No sessions found for query: {query}") return else: sessions = self.history print(f"\n๐Ÿ“Š SESSION COMPARISON") print("=" * 80) print(f"Comparing {len(sessions)} sessions") for i, session in enumerate(sessions, 1): analysis = session.get('analysis', {}) start_time = session.get('started_at', 'Unknown')[:19] # Remove microseconds print(f"\nSession {i} ({start_time}):") print(f" Query: {session['query'][:50]}...") print(f" Unique items: {analysis.get('total_unique_items_discovered', 0)}") print(f" Avg time: {analysis.get('avg_execution_time', 0):.1f}s") print(f" Iterations: {analysis.get('total_iterations', 0)}") def main(): """Main function for query monitoring.""" import argparse parser = argparse.ArgumentParser(description="Monitor search query over time") parser.add_argument("--query", type=str, help="Search query to monitor") parser.add_argument("--iterations", type=int, default=3, help="Number of iterations") parser.add_argument("--delay", type=int, default=1, help="Minutes between iterations") parser.add_argument("--compare", action="store_true", help="Compare previous sessions") args = parser.parse_args() monitor = QueryMonitor() if args.compare: monitor.compare_sessions(args.query) else: query = args.query or "AI Ethics and Safety openings fit for a PhD in Computer Science" monitor.monitor_query(query, iterations=args.iterations, delay_minutes=args.delay) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortnightly-devs/mcp-x402-task-scheduler'

If you have feedback or need assistance with the MCP directory API, please join our Discord server