
Snowflake MCP Agent System

query_utils.py
""" Query utility functions for compatibility with existing server functionality. Provides basic query analysis capabilities. """ import json from typing import List, Dict, Any def explode_usage(rows_dict: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Explode usage data from SQL analyzer results. Args: rows_dict: List of dictionaries from AAI_SQL_ANALYZER query results Returns: List of exploded usage records """ exploded = [] for row in rows_dict: # Extract basic information base_record = { 'query_id': row.get('query_id'), 'user_id': row.get('uid', row.get('user_id')), 'total_elapsed_time': row.get('total_elapsed_time', 0), 'rows_produced': row.get('rows_produced', 0), 'bytes_scanned': row.get('bytes_scanned', 0), 'warehouse_name': row.get('warehouse_name'), 'query_text': row.get('query_text', ''), 'start_time': row.get('start_time') } # Handle direct_objects array direct_objects = row.get('direct_objects', []) if direct_objects and isinstance(direct_objects, (list, tuple)): for obj in direct_objects: record = base_record.copy() record['object_name'] = obj record['object_type'] = 'direct' exploded.append(record) # Handle base_objects array base_objects = row.get('base_objects', []) if base_objects and isinstance(base_objects, (list, tuple)): for obj in base_objects: record = base_record.copy() record['object_name'] = obj record['object_type'] = 'base' exploded.append(record) # Handle columns_accessed array columns_accessed = row.get('columns_accessed', []) if columns_accessed and isinstance(columns_accessed, (list, tuple)): for col in columns_accessed: record = base_record.copy() record['column_name'] = col record['access_type'] = 'column' exploded.append(record) # If no arrays to explode, add the base record if not any([direct_objects, base_objects, columns_accessed]): exploded.append(base_record) return exploded def compact_usage(usage_rows: List[Dict[str, Any]]) -> Dict[str, Any]: """ Compact usage data into summary statistics. 

def compact_usage(usage_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Compact usage data into summary statistics.

    Args:
        usage_rows: List of exploded usage records

    Returns:
        Dictionary with usage summary statistics
    """
    if not usage_rows:
        return {
            'total_queries': 0,
            'unique_users': 0,
            'unique_objects': 0,
            'total_elapsed_time': 0,
            'total_bytes_scanned': 0,
            'avg_elapsed_time': 0,
            'top_users': [],
            'top_objects': [],
            'performance_summary': {}
        }

    # Calculate basic statistics
    total_queries = len(set(r.get('query_id') for r in usage_rows if r.get('query_id')))
    unique_users = len(set(r.get('user_id') for r in usage_rows if r.get('user_id')))
    unique_objects = len(set(r.get('object_name') for r in usage_rows if r.get('object_name')))
    total_elapsed_time = sum(r.get('total_elapsed_time', 0) for r in usage_rows)
    total_bytes_scanned = sum(r.get('bytes_scanned', 0) for r in usage_rows)
    avg_elapsed_time = total_elapsed_time / total_queries if total_queries > 0 else 0

    # Top users by query count
    user_counts = {}
    user_time = {}
    for row in usage_rows:
        user_id = row.get('user_id')
        if user_id:
            user_counts[user_id] = user_counts.get(user_id, 0) + 1
            user_time[user_id] = user_time.get(user_id, 0) + row.get('total_elapsed_time', 0)

    top_users = sorted(
        [
            {
                'user_id': user,
                'query_count': count,
                'total_time': user_time.get(user, 0),
                'avg_time': user_time.get(user, 0) / count if count > 0 else 0
            }
            for user, count in user_counts.items()
        ],
        key=lambda x: x['query_count'],
        reverse=True
    )[:10]

    # Top objects by access count
    object_counts = {}
    for row in usage_rows:
        obj_name = row.get('object_name')
        if obj_name:
            object_counts[obj_name] = object_counts.get(obj_name, 0) + 1

    top_objects = sorted(
        [
            {'object_name': obj, 'access_count': count}
            for obj, count in object_counts.items()
        ],
        key=lambda x: x['access_count'],
        reverse=True
    )[:10]

    # Performance summary
    elapsed_times = [r.get('total_elapsed_time', 0) for r in usage_rows if r.get('total_elapsed_time')]
    bytes_scanned = [r.get('bytes_scanned', 0) for r in usage_rows if r.get('bytes_scanned')]

    performance_summary = {
        'min_elapsed_time': min(elapsed_times) if elapsed_times else 0,
        'max_elapsed_time': max(elapsed_times) if elapsed_times else 0,
        'p95_elapsed_time': sorted(elapsed_times)[int(len(elapsed_times) * 0.95)] if elapsed_times else 0,
        'min_bytes_scanned': min(bytes_scanned) if bytes_scanned else 0,
        'max_bytes_scanned': max(bytes_scanned) if bytes_scanned else 0,
        'avg_bytes_scanned': sum(bytes_scanned) / len(bytes_scanned) if bytes_scanned else 0
    }

    return {
        'total_queries': total_queries,
        'unique_users': unique_users,
        'unique_objects': unique_objects,
        'total_elapsed_time': total_elapsed_time,
        'total_bytes_scanned': total_bytes_scanned,
        'avg_elapsed_time': avg_elapsed_time,
        'top_users': top_users,
        'top_objects': top_objects,
        'performance_summary': performance_summary,
        'analysis_timestamp': None  # Could add current timestamp if needed
    }
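
# A minimal sketch of compact_usage fed through explode_usage (hypothetical
# values): exploded records sharing a query_id collapse to a single query,
# while total_elapsed_time sums over every exploded record.
if __name__ == "__main__":
    _exploded = explode_usage([{
        'query_id': 'q1',
        'uid': 'alice',
        'total_elapsed_time': 1200,
        'direct_objects': ['DB.PUBLIC.ORDERS', 'DB.PUBLIC.CUSTOMERS'],
    }])
    _summary = compact_usage(_exploded)
    assert _summary['total_queries'] == 1
    assert _summary['unique_objects'] == 2
    assert _summary['total_elapsed_time'] == 2400  # 1200 counted per record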

def analyze_query_patterns(usage_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Analyze query patterns for optimization opportunities.

    Args:
        usage_rows: List of usage records

    Returns:
        Dictionary with pattern analysis results
    """
    # Group queries by coarse text patterns (simplified)
    query_patterns = {}
    for row in usage_rows:
        query_text = row.get('query_text', '')
        if query_text:
            # Simple first-match pattern detection (could be more sophisticated)
            if 'SELECT *' in query_text.upper():
                pattern_key = 'select_star'
            elif 'JOIN' in query_text.upper():
                pattern_key = 'joins'
            elif 'GROUP BY' in query_text.upper():
                pattern_key = 'aggregation'
            elif 'ORDER BY' in query_text.upper():
                pattern_key = 'sorting'
            else:
                pattern_key = 'other'

            if pattern_key not in query_patterns:
                query_patterns[pattern_key] = {
                    'count': 0,
                    'total_time': 0,
                    'avg_time': 0,
                    'examples': []
                }

            query_patterns[pattern_key]['count'] += 1
            query_patterns[pattern_key]['total_time'] += row.get('total_elapsed_time', 0)
            if len(query_patterns[pattern_key]['examples']) < 3:
                query_patterns[pattern_key]['examples'].append(query_text[:200])

    # Calculate averages
    for pattern in query_patterns.values():
        if pattern['count'] > 0:
            pattern['avg_time'] = pattern['total_time'] / pattern['count']

    return {
        'query_patterns': query_patterns,
        'optimization_opportunities': [
            {'pattern': 'select_star', 'recommendation': 'Replace SELECT * with specific columns'},
            {'pattern': 'joins', 'recommendation': 'Consider query optimization and indexing'},
            {'pattern': 'aggregation', 'recommendation': 'Consider materialized views for frequent aggregations'}
        ]
    }
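
# A minimal sketch of analyze_query_patterns (hypothetical query texts):
# classification is first-match down the if/elif chain, so a query with both
# SELECT * and JOIN lands in 'select_star'; avg_time is filled in afterwards.
if __name__ == "__main__":
    _usage = [
        {'query_text': 'SELECT * FROM orders o JOIN customers c ON o.cid = c.id',
         'total_elapsed_time': 100},
        {'query_text': 'SELECT name FROM t1 JOIN t2 ON t1.id = t2.id',
         'total_elapsed_time': 300},
    ]
    _analysis = analyze_query_patterns(_usage)
    assert _analysis['query_patterns']['select_star']['count'] == 1
    assert _analysis['query_patterns']['joins']['avg_time'] == 300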
