query_utils.py•8.02 kB
"""
Query utility functions for compatibility with existing server functionality.
Provides basic query analysis capabilities.
"""
import json
from typing import List, Dict, Any
def explode_usage(rows_dict: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Explode usage data from SQL analyzer results.

    Each input row may carry array-valued fields (``direct_objects``,
    ``base_objects``, ``columns_accessed``); one output record is emitted per
    array element, all sharing the row's base attributes. A row that yields
    no exploded records contributes its base record instead, so no input row
    is ever silently dropped.

    Args:
        rows_dict: List of dictionaries from AAI_SQL_ANALYZER query results.
            Array fields may be native lists/tuples or JSON-encoded strings
            (e.g. VARIANT columns serialized as text — TODO confirm upstream
            format against the query producing these rows).

    Returns:
        List of exploded usage records.
    """
    def _as_list(value: Any) -> List[Any]:
        # Normalize an array field: pass lists/tuples through, parse JSON
        # strings, and treat anything else (None, dicts, numbers) as empty.
        if isinstance(value, (list, tuple)):
            return list(value)
        if isinstance(value, str) and value:
            try:
                parsed = json.loads(value)
            except (ValueError, TypeError):
                return []
            return parsed if isinstance(parsed, list) else []
        return []

    exploded: List[Dict[str, Any]] = []
    for row in rows_dict:
        base_record = {
            'query_id': row.get('query_id'),
            # Some result sets use 'uid', others 'user_id'; prefer 'uid'.
            'user_id': row.get('uid', row.get('user_id')),
            'total_elapsed_time': row.get('total_elapsed_time', 0),
            'rows_produced': row.get('rows_produced', 0),
            'bytes_scanned': row.get('bytes_scanned', 0),
            'warehouse_name': row.get('warehouse_name'),
            'query_text': row.get('query_text', ''),
            'start_time': row.get('start_time')
        }
        produced = 0
        # One record per directly-referenced object.
        for obj in _as_list(row.get('direct_objects', [])):
            record = base_record.copy()
            record['object_name'] = obj
            record['object_type'] = 'direct'
            exploded.append(record)
            produced += 1
        # One record per base (underlying) object.
        for obj in _as_list(row.get('base_objects', [])):
            record = base_record.copy()
            record['object_name'] = obj
            record['object_type'] = 'base'
            exploded.append(record)
            produced += 1
        # One record per accessed column.
        for col in _as_list(row.get('columns_accessed', [])):
            record = base_record.copy()
            record['column_name'] = col
            record['access_type'] = 'column'
            exploded.append(record)
            produced += 1
        # Nothing exploded for this row: keep the base record so the row
        # still appears in downstream aggregation.
        if produced == 0:
            exploded.append(base_record)
    return exploded
def compact_usage(usage_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Compact usage data into summary statistics.

    Computes totals, unique counts, top-10 users/objects, and a latency /
    bytes-scanned performance summary over exploded usage records. NULL
    metric values (keys present but set to None) are treated as 0.

    Args:
        usage_rows: List of exploded usage records (see ``explode_usage``).

    Returns:
        Dictionary with usage summary statistics; all-zero/empty summary
        when ``usage_rows`` is empty.
    """
    if not usage_rows:
        return {
            'total_queries': 0,
            'unique_users': 0,
            'unique_objects': 0,
            'total_elapsed_time': 0,
            'total_bytes_scanned': 0,
            'avg_elapsed_time': 0,
            'top_users': [],
            'top_objects': [],
            'performance_summary': {}
        }
    # Basic statistics. Query counts are over DISTINCT query_ids because
    # exploding produces multiple records per query.
    total_queries = len(set(r.get('query_id') for r in usage_rows if r.get('query_id')))
    unique_users = len(set(r.get('user_id') for r in usage_rows if r.get('user_id')))
    unique_objects = len(set(r.get('object_name') for r in usage_rows if r.get('object_name')))
    # `or 0` guards against keys present with a None value (DB NULLs),
    # which would otherwise make sum() raise TypeError.
    total_elapsed_time = sum(r.get('total_elapsed_time') or 0 for r in usage_rows)
    total_bytes_scanned = sum(r.get('bytes_scanned') or 0 for r in usage_rows)
    avg_elapsed_time = total_elapsed_time / total_queries if total_queries > 0 else 0
    # Top users by query count (record count per user across exploded rows).
    user_counts: Dict[Any, int] = {}
    user_time: Dict[Any, float] = {}
    for row in usage_rows:
        user_id = row.get('user_id')
        if user_id:
            user_counts[user_id] = user_counts.get(user_id, 0) + 1
            user_time[user_id] = user_time.get(user_id, 0) + (row.get('total_elapsed_time') or 0)
    top_users = sorted(
        [
            {
                'user_id': user,
                'query_count': count,
                'total_time': user_time.get(user, 0),
                'avg_time': user_time.get(user, 0) / count if count > 0 else 0
            }
            for user, count in user_counts.items()
        ],
        key=lambda x: x['query_count'],
        reverse=True
    )[:10]
    # Top objects by access count.
    object_counts: Dict[Any, int] = {}
    for row in usage_rows:
        obj_name = row.get('object_name')
        if obj_name:
            object_counts[obj_name] = object_counts.get(obj_name, 0) + 1
    top_objects = sorted(
        [
            {'object_name': obj, 'access_count': count}
            for obj, count in object_counts.items()
        ],
        key=lambda x: x['access_count'],
        reverse=True
    )[:10]
    # Performance summary over rows with truthy (non-zero, non-None) metrics.
    elapsed_times = [r['total_elapsed_time'] for r in usage_rows if r.get('total_elapsed_time')]
    bytes_scanned = [r['bytes_scanned'] for r in usage_rows if r.get('bytes_scanned')]
    sorted_times = sorted(elapsed_times)
    # Clamp the nearest-rank p95 index so it can never run past the end.
    p95_index = min(int(len(sorted_times) * 0.95), len(sorted_times) - 1)
    performance_summary = {
        'min_elapsed_time': min(elapsed_times) if elapsed_times else 0,
        'max_elapsed_time': max(elapsed_times) if elapsed_times else 0,
        'p95_elapsed_time': sorted_times[p95_index] if sorted_times else 0,
        'min_bytes_scanned': min(bytes_scanned) if bytes_scanned else 0,
        'max_bytes_scanned': max(bytes_scanned) if bytes_scanned else 0,
        'avg_bytes_scanned': sum(bytes_scanned) / len(bytes_scanned) if bytes_scanned else 0
    }
    return {
        'total_queries': total_queries,
        'unique_users': unique_users,
        'unique_objects': unique_objects,
        'total_elapsed_time': total_elapsed_time,
        'total_bytes_scanned': total_bytes_scanned,
        'avg_elapsed_time': avg_elapsed_time,
        'top_users': top_users,
        'top_objects': top_objects,
        'performance_summary': performance_summary,
        'analysis_timestamp': None  # Could add current timestamp if needed
    }
def analyze_query_patterns(usage_rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Analyze query patterns for optimization opportunities.

    Classifies each query text into a single bucket by first match
    (select_star > joins > aggregation > sorting > other) and aggregates
    count, total/average elapsed time, and up to 3 example snippets per
    bucket. NULL elapsed times are treated as 0.

    Args:
        usage_rows: List of usage records.

    Returns:
        Dictionary with pattern analysis results and static optimization
        recommendations.
    """
    query_patterns: Dict[str, Dict[str, Any]] = {}
    for row in usage_rows:
        query_text = row.get('query_text', '')
        if not query_text:
            continue
        # Hoist the case-normalization; classification is first-match-wins,
        # so e.g. "SELECT * ... JOIN ..." counts only as select_star.
        upper_text = query_text.upper()
        if 'SELECT *' in upper_text:
            pattern_key = 'select_star'
        elif 'JOIN' in upper_text:
            pattern_key = 'joins'
        elif 'GROUP BY' in upper_text:
            pattern_key = 'aggregation'
        elif 'ORDER BY' in upper_text:
            pattern_key = 'sorting'
        else:
            pattern_key = 'other'
        bucket = query_patterns.setdefault(pattern_key, {
            'count': 0,
            'total_time': 0,
            'avg_time': 0,
            'examples': []
        })
        bucket['count'] += 1
        # `or 0` guards against None elapsed times (DB NULLs).
        bucket['total_time'] += row.get('total_elapsed_time') or 0
        if len(bucket['examples']) < 3:
            # Keep a truncated sample of the original (non-uppercased) text.
            bucket['examples'].append(query_text[:200])
    # Derive averages once all rows are bucketed.
    for pattern in query_patterns.values():
        if pattern['count'] > 0:
            pattern['avg_time'] = pattern['total_time'] / pattern['count']
    return {
        'query_patterns': query_patterns,
        'optimization_opportunities': [
            {'pattern': 'select_star', 'recommendation': 'Replace SELECT * with specific columns'},
            {'pattern': 'joins', 'recommendation': 'Consider query optimization and indexing'},
            {'pattern': 'aggregation', 'recommendation': 'Consider materialized views for frequent aggregations'}
        ]
    }