Skip to main content
Glama
search_analytics.py (14 kB)
""" Search analytics module for tracking and analyzing search patterns """ import sqlite3 from datetime import datetime, timedelta from collections import Counter, defaultdict from typing import Dict, List, Tuple import json class SearchAnalytics: def __init__(self, db_path: str = "unified_docs_analytics.db"): self.conn = sqlite3.connect(db_path) self.init_database() def init_database(self): """Initialize analytics database tables""" # Search queries log self.conn.execute(""" CREATE TABLE IF NOT EXISTS search_queries ( id INTEGER PRIMARY KEY AUTOINCREMENT, query TEXT NOT NULL, results_count INTEGER DEFAULT 0, clicked_results TEXT, -- JSON array of clicked result IDs search_time REAL, -- Time taken for search in seconds timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Popular searches cache self.conn.execute(""" CREATE TABLE IF NOT EXISTS popular_searches ( id INTEGER PRIMARY KEY AUTOINCREMENT, query TEXT UNIQUE NOT NULL, search_count INTEGER DEFAULT 1, last_searched TIMESTAMP DEFAULT CURRENT_TIMESTAMP, avg_results_count REAL DEFAULT 0, avg_search_time REAL DEFAULT 0 ) """) # Search categories tracking self.conn.execute(""" CREATE TABLE IF NOT EXISTS search_categories ( id INTEGER PRIMARY KEY AUTOINCREMENT, category TEXT UNIQUE NOT NULL, search_count INTEGER DEFAULT 0, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Missing documentation tracking self.conn.execute(""" CREATE TABLE IF NOT EXISTS missing_docs ( id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, query TEXT NOT NULL, request_count INTEGER DEFAULT 1, first_requested TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_requested TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) self.conn.commit() def log_search(self, query: str, results_count: int, search_time: float, clicked_results: List[str] = None): """Log a search query and its results""" # Log the individual search self.conn.execute(""" INSERT INTO search_queries (query, results_count, clicked_results, search_time) VALUES (?, ?, ?, ?) 
""", (query, results_count, json.dumps(clicked_results or []), search_time)) # Update popular searches cursor = self.conn.execute(""" SELECT search_count, avg_results_count, avg_search_time FROM popular_searches WHERE query = ? """, (query,)) row = cursor.fetchone() if row: # Update existing entry count, avg_results, avg_time = row new_count = count + 1 new_avg_results = (avg_results * count + results_count) / new_count new_avg_time = (avg_time * count + search_time) / new_count self.conn.execute(""" UPDATE popular_searches SET search_count = ?, avg_results_count = ?, avg_search_time = ?, last_searched = CURRENT_TIMESTAMP WHERE query = ? """, (new_count, new_avg_results, new_avg_time, query)) else: # Insert new entry self.conn.execute(""" INSERT INTO popular_searches (query, search_count, avg_results_count, avg_search_time) VALUES (?, 1, ?, ?) """, (query, results_count, search_time)) # Track categories from query self._track_categories(query) # Track if no results found if results_count == 0: self._track_missing_docs(query) self.conn.commit() def _track_categories(self, query: str): """Extract and track categories from search query""" # Category keywords mapping category_keywords = { 'AI/ML': ['ai', 'ml', 'machine learning', 'deep learning', 'neural', 'tensorflow', 'pytorch'], 'Web Development': ['react', 'vue', 'angular', 'frontend', 'backend', 'web', 'javascript', 'css'], 'Mobile Development': ['ios', 'android', 'mobile', 'react native', 'flutter', 'swift'], 'Databases': ['sql', 'database', 'mongodb', 'postgresql', 'redis', 'mysql'], 'Cloud/DevOps': ['docker', 'kubernetes', 'aws', 'azure', 'cloud', 'devops', 'ci/cd'], 'Security': ['security', 'auth', 'encryption', 'owasp', 'vulnerability'], 'Testing': ['test', 'testing', 'jest', 'pytest', 'unit test', 'e2e'], 'API Development': ['api', 'rest', 'graphql', 'grpc', 'endpoint'] } query_lower = query.lower() detected_categories = [] for category, keywords in category_keywords.items(): if any(keyword in query_lower 
for keyword in keywords): detected_categories.append(category) # Update category counts for category in detected_categories: cursor = self.conn.execute(""" SELECT search_count FROM search_categories WHERE category = ? """, (category,)) row = cursor.fetchone() if row: self.conn.execute(""" UPDATE search_categories SET search_count = search_count + 1, last_updated = CURRENT_TIMESTAMP WHERE category = ? """, (category,)) else: self.conn.execute(""" INSERT INTO search_categories (category, search_count) VALUES (?, 1) """, (category,)) def _track_missing_docs(self, query: str): """Track queries with no results as missing documentation""" # Try to extract topic from query topic = self._extract_topic(query) cursor = self.conn.execute(""" SELECT request_count FROM missing_docs WHERE topic = ? AND query = ? """, (topic, query)) row = cursor.fetchone() if row: self.conn.execute(""" UPDATE missing_docs SET request_count = request_count + 1, last_requested = CURRENT_TIMESTAMP WHERE topic = ? AND query = ? """, (topic, query)) else: self.conn.execute(""" INSERT INTO missing_docs (topic, query) VALUES (?, ?) """, (topic, query)) def _extract_topic(self, query: str) -> str: """Extract main topic from query""" # Simple topic extraction - take first significant word stop_words = {'how', 'to', 'what', 'where', 'when', 'why', 'is', 'are', 'the', 'a', 'an'} words = query.lower().split() for word in words: if word not in stop_words and len(word) > 2: return word return query.split()[0] if query.split() else 'unknown' def get_popular_searches(self, limit: int = 20, days: int = 30) -> List[Dict]: """Get most popular searches in the last N days""" since_date = (datetime.now() - timedelta(days=days)).isoformat() cursor = self.conn.execute(""" SELECT query, search_count, avg_results_count, avg_search_time FROM popular_searches WHERE last_searched >= ? ORDER BY search_count DESC LIMIT ? 
""", (since_date, limit)) return [ { 'query': row[0], 'count': row[1], 'avg_results': round(row[2], 1), 'avg_time': round(row[3], 3) } for row in cursor ] def get_trending_categories(self, limit: int = 10) -> List[Dict]: """Get trending search categories""" cursor = self.conn.execute(""" SELECT category, search_count FROM search_categories ORDER BY search_count DESC LIMIT ? """, (limit,)) return [ {'category': row[0], 'count': row[1]} for row in cursor ] def get_missing_docs_report(self, min_requests: int = 3) -> List[Dict]: """Get report of frequently searched but missing documentation""" cursor = self.conn.execute(""" SELECT topic, query, request_count, first_requested, last_requested FROM missing_docs WHERE request_count >= ? ORDER BY request_count DESC """, (min_requests,)) return [ { 'topic': row[0], 'query': row[1], 'requests': row[2], 'first_requested': row[3], 'last_requested': row[4] } for row in cursor ] def get_search_performance_stats(self) -> Dict: """Get overall search performance statistics""" # Total searches total_searches = self.conn.execute( "SELECT COUNT(*) FROM search_queries" ).fetchone()[0] # Average search time avg_time = self.conn.execute( "SELECT AVG(search_time) FROM search_queries" ).fetchone()[0] or 0 # Searches with results with_results = self.conn.execute( "SELECT COUNT(*) FROM search_queries WHERE results_count > 0" ).fetchone()[0] # Average results per search avg_results = self.conn.execute( "SELECT AVG(results_count) FROM search_queries" ).fetchone()[0] or 0 return { 'total_searches': total_searches, 'avg_search_time': round(avg_time, 3), 'success_rate': round(with_results / total_searches * 100, 1) if total_searches > 0 else 0, 'avg_results_per_search': round(avg_results, 1) } def generate_expansion_recommendations(self) -> List[Dict]: """Generate recommendations for documentation expansion based on analytics""" recommendations = [] # Get missing docs missing = self.get_missing_docs_report(min_requests=5) if missing: 
recommendations.append({ 'type': 'missing_documentation', 'priority': 'high', 'description': 'Frequently searched topics with no results', 'items': [m['topic'] for m in missing[:5]] }) # Get trending categories trending = self.get_trending_categories(limit=5) recommendations.append({ 'type': 'trending_categories', 'priority': 'medium', 'description': 'Most searched categories - consider expanding', 'items': trending }) # Get low-result searches cursor = self.conn.execute(""" SELECT query, avg_results_count FROM popular_searches WHERE avg_results_count < 5 AND search_count > 5 ORDER BY search_count DESC LIMIT 10 """) low_results = [{'query': row[0], 'avg_results': row[1]} for row in cursor] if low_results: recommendations.append({ 'type': 'low_result_queries', 'priority': 'medium', 'description': 'Popular searches with few results', 'items': low_results }) return recommendations # CLI interface if __name__ == "__main__": import sys analytics = SearchAnalytics() if len(sys.argv) < 2: print("Search Analytics Dashboard") print("=" * 60) # Performance stats stats = analytics.get_search_performance_stats() print(f"\nPerformance Stats:") print(f" Total Searches: {stats['total_searches']}") print(f" Success Rate: {stats['success_rate']}%") print(f" Avg Search Time: {stats['avg_search_time']}s") print(f" Avg Results: {stats['avg_results_per_search']}") # Popular searches print(f"\nTop 10 Popular Searches:") for i, search in enumerate(analytics.get_popular_searches(10), 1): print(f" {i}. 
'{search['query']}' ({search['count']} times)") # Trending categories print(f"\nTrending Categories:") for cat in analytics.get_trending_categories(5): print(f" • {cat['category']}: {cat['count']} searches") # Recommendations print(f"\nExpansion Recommendations:") for rec in analytics.generate_expansion_recommendations(): print(f"\n [{rec['priority'].upper()}] {rec['description']}:") if rec['type'] == 'trending_categories': for item in rec['items']: print(f" - {item['category']} ({item['count']} searches)") else: for item in rec['items'][:5]: if isinstance(item, dict): print(f" - {item}") else: print(f" - {item}") else: # Log a search from command line query = ' '.join(sys.argv[1:]) analytics.log_search(query, 10, 0.123) # Mock data print(f"Logged search: '{query}'")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/boodrow/MCP-Server-unified-docs-hub'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.