Skip to main content
Glama
formatting.py (26 kB)
from typing import List, Dict, Any, Optional, Set, Tuple, Union, cast

try:
    from typing_extensions import TypedDict
except ImportError:
    # typing.TypedDict offers everything this module uses, so the
    # backport is optional rather than a hard requirement.
    from typing import TypedDict


class HealthCheckItem(TypedDict, total=False):
    """One evaluated health check entry; every key is optional."""

    check_name: str
    status: str
    value: Any
    unit: Optional[str]
    threshold: Optional[Union[int, float]]
    details: str


class ParsedHealthCheckResult(TypedDict):
    """Return shape of ``parse_health_check_results``."""

    overall_status: str
    checks: List[HealthCheckItem]
    recommendations: List[str]


class QueryPerformanceMetrics(TypedDict, total=False):
    """Overall metrics of a single query; every key is optional."""

    duration_sec: Optional[float]
    queue_time_sec: Optional[float]
    cpu_time_sec: Optional[float]
    compile_time_sec: Optional[float]
    cpu_skew: Optional[float]
    io_skew: Optional[float]
    scan_row_count: Optional[int]
    join_row_count: Optional[int]
    return_row_count: Optional[int]
    scan_blocks_read: Optional[int]
    spill_size_mb: Optional[int]
    result_cache_hit: Optional[bool]
    query_priority: Optional[str]
    query_type: Optional[str]
    wlm_service_class_id: Optional[int]
    wlm_service_class_name: Optional[str]


class ParsedQueryPerformanceResult(TypedDict, total=False):
    """Structured query-performance report; every key is optional."""

    query_id: int
    query_text: str
    overall_metrics: QueryPerformanceMetrics
    segment_metrics: List[Dict[str, Any]]
    slice_skew: List[Dict[str, Any]]
    explain_plan: List[Dict[str, Any]]
    historical_runs: List[Dict[str, Any]]
    potential_issues: List[Any]
    recommendations: List[str]


class TableBasicInfo(TypedDict, total=False):
    """Basic size and distribution facts about a table."""

    size_mb: Optional[float]
    rows: Optional[int]
    dist_style: Optional[str]
    dist_key: Optional[str]
    sort_keys: Optional[Union[str, List[str]]]


class TablePerformanceStats(TypedDict, total=False):
    """Performance-related counters for a table."""

    scan_frequency: Optional[int]
    missing_stats_count: int
    stale_stats_count: int
    needs_vacuum: bool


class ParsedInspectTableResult(TypedDict, total=False):
    """Structured table-inspection report; every key is optional."""

    table_identifier: str
    basic_info: TableBasicInfo
    column_details: List[Dict[str, Any]]
    ddl: Optional[str]
    performance_stats: TablePerformanceStats
    potential_issues: List[Dict[str, Any]]
    vacuum_history: List[Dict[str, Any]]
    recommendations: List[str]


class WorkloadSummary(TypedDict, total=False):
    """Headline workload counters built by ``parse_monitor_workload_results``."""

    active_sessions: int
    queries_executing: int
    queries_queued: int
    avg_commit_wait_ms: Optional[float]
    disk_based_queries_recent: int


class WorkloadTrends(TypedDict, total=False):
    """Raw WLM trend rows, passed through unchanged."""

    apex: List[Dict[str, Any]]
    hourly_trend: List[Dict[str, Any]]


class ParsedMonitorWorkloadResult(TypedDict, total=False):
    """Return shape of ``parse_monitor_workload_results``."""

    summary: WorkloadSummary
    wlm_state: List[Dict[str, Any]]
    queue_performance: List[Dict[str, Any]]
    top_active_queries: List[Dict[str, Any]]
    recent_copy_performance: List[Dict[str, Any]]
    workload_trends: WorkloadTrends
    potential_issues: List[str]
    recommendations: List[str]


# Detail strings that carry no actionable information.
_BENIGN_DETAILS: Tuple[Any, ...] = ("Normal", "N/A", None, "")

# Checks recognised by a metric column when a row has no ``check_name``.
# Order matters: the first key present in the row wins.
# Each entry is (row key, check name, unit, threshold, details template).
# A ``None`` template marks the commit-wait check, whose details string
# is built separately because it also mentions the time window.
_DERIVED_CHECKS: Tuple[Tuple[str, str, str, int, Optional[str]], ...] = (
    (
        "num_blocking_locks",
        "Blocking Locks",
        "locks",
        0,
        "{value} blocking lock(s) detected.",
    ),
    (
        "missing_stats_count",
        "Missing Table Statistics",
        "tables",
        0,
        "{value} table(s) missing statistics.",
    ),
    (
        "dist_skew_count",
        "Distribution Skew",
        "tables",
        0,
        "{value} table(s) have significant distribution skew (skew_rows > 4).",
    ),
    (
        "underrepped_blocks_count",
        "Under-replicated Blocks",
        "blocks",
        0,
        "{value} block(s) found with < 2 replicas.",
    ),
    ("max_commit_wait_seconds", "Max Commit Wait Time", "seconds", 60, None),
    (
        "stale_stats_count",
        "Stale Statistics Count",
        "tables",
        0,
        "{value} table(s) have stale statistics.",
    ),
    (
        "needs_vacuum_count",
        "Tables Needing Vacuum",
        "tables",
        0,
        "{value} table(s) may benefit from VACUUM (based on unsorted rows or deleted rows).",
    ),
)


def _get_first_value(
    data_list: Optional[List[Dict[str, Any]]], key: str, default: Any = None
) -> Any:
    """Safely get the value of a key from the first dictionary in a list.

    Args:
        data_list: A list potentially containing dictionaries, or None.
        key: The dictionary key to retrieve the value for.
        default: The value to return if the list is empty, None, not a list,
            the first item is not a dict, or the key is not found.

    Returns:
        The value associated with the key in the first dictionary, or the
        default value.
    """
    if isinstance(data_list, list) and data_list and isinstance(data_list[0], dict):
        return data_list[0].get(key, default)
    return default


def _get_list_value(
    data_dict: Optional[Dict[str, List[Dict[str, Any]]]],
    key: str,
    default: Optional[List[Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    """Safely get a list value from a dictionary.

    Only the container type is checked; the list's elements are not inspected.

    Args:
        data_dict: A dictionary potentially containing lists, or None.
        key: The dictionary key whose value is expected to be a list.
        default: The list to return if the key is not found, the value is not
            a list, or the input dict is invalid. Defaults to a new empty list.

    Returns:
        The list associated with the key, or the default list.
    """
    fallback: List[Dict[str, Any]] = [] if default is None else default
    if not isinstance(data_dict, dict):
        return fallback
    value = data_dict.get(key)
    return value if isinstance(value, list) else fallback


def _error_check(check_name: str, error: Any) -> HealthCheckItem:
    """Build the 'Error' check item used when a script's data is unavailable."""
    return HealthCheckItem(
        check_name=check_name,
        status="Error",
        details=f"Could not retrieve data: {error}",
    )


def _derive_check(
    check: Dict[str, Any],
) -> Optional[Tuple[str, Any, str, Union[int, float], str]]:
    """Identify an unnamed check row by the first known metric column it has.

    Returns:
        ``(name, value, unit, threshold, details)``, or None when the row
        contains none of the known metric columns.
    """
    for key, name, unit, threshold, template in _DERIVED_CHECKS:
        if key not in check:
            continue
        value = check[key]
        if template is None:
            # Commit wait: details are purely descriptive.
            details = (
                f"Maximum commit queue wait time in the last {check.get('time_window_days', 'N/A')} days: {value}s."
                if isinstance(value, (int, float))
                else "N/A"
            )
        elif isinstance(value, int) and value > 0:
            details = template.format(value=value)
        else:
            details = "Normal"
        return name, value, unit, threshold, details
    return None


def parse_health_check_results(
    raw_results_dict: Dict[str, Union[List[Dict[str, Any]], Exception]],
) -> ParsedHealthCheckResult:
    """Parse raw health check results from multiple scripts into the MCP tool output format.

    Accepts a dictionary where keys are script paths (e.g.,
    'health/cpu_monitor.sql') and values are either a list of result rows
    (dictionaries) or an Exception if the script execution failed.

    The CPU, disk usage and WLM queue scripts have dedicated handling. Every
    other script is treated as a generic check and is only evaluated when it
    returned exactly one row. A generic row is either self-describing
    ('check_name', 'metric_value', 'threshold', 'unit', 'recommendation') or
    is recognised by one of the known metric columns.

    A check is a 'Warning' when its numeric value exceeds its threshold. A
    self-describing row is also a 'Warning' when it carries a recommendation
    other than 'Normal', 'N/A' or empty. Details generated for rows recognised
    by a metric column are descriptive only and never raise the status on
    their own. Any 'Warning' or 'Error' makes the overall status 'Warning'.

    Args:
        raw_results_dict: A dictionary mapping script paths to their results
            (list of dicts) or an Exception object.

    Returns:
        A dictionary containing the overall status ('OK' or 'Warning'), a list
        of parsed checks with their individual status and details, and a list
        of generated recommendations.
    """
    parsed_checks: List[HealthCheckItem] = []
    recommendations: List[str] = []
    errors_encountered: List[str] = []
    overall_warning: bool = False

    # --- Peak CPU utilization -------------------------------------------
    cpu_script_key = "health/cpu_monitor.sql"
    cpu_results_or_error = raw_results_dict.get(cpu_script_key)
    if isinstance(cpu_results_or_error, list):
        max_peak_cpu: float = 0.0
        peak_node: Optional[int] = None
        for node_data in cpu_results_or_error:
            if not isinstance(node_data, dict):
                continue
            peak = node_data.get("peak_cpu_utilization_last_hour")
            if isinstance(peak, (int, float)) and peak > max_peak_cpu:
                max_peak_cpu = float(peak)
                peak_node = node_data.get("node")

        cpu_threshold = 90.0
        cpu_status = "OK"
        cpu_details = (
            f"Highest peak CPU usage across nodes in the last hour: {max_peak_cpu:.2f}%"
        )
        if peak_node is not None:
            cpu_details += f" (on node {peak_node})"
        if max_peak_cpu > cpu_threshold:
            cpu_status = "Warning"
            overall_warning = True
            # Leave the node out instead of printing "(node None)".
            node_note = f" (node {peak_node})" if peak_node is not None else ""
            recommendations.append(
                f"Peak CPU Utilization reached {max_peak_cpu:.2f}%{node_note}. Check running queries or consider scaling."
            )
        parsed_checks.append(
            HealthCheckItem(
                check_name="Peak CPU Utilization (Last Hour)",
                status=cpu_status,
                value=f"{max_peak_cpu:.2f}",
                unit="%",
                threshold=cpu_threshold,
                details=cpu_details,
            )
        )
    elif isinstance(cpu_results_or_error, Exception):
        errors_encountered.append(
            f"Failed to retrieve CPU utilization: {cpu_results_or_error}"
        )
        parsed_checks.append(
            _error_check("Peak CPU Utilization (Last Hour)", cpu_results_or_error)
        )
        overall_warning = True

    # --- Total disk usage -----------------------------------------------
    disk_script_key = "health/disk_usage.sql"
    disk_results_or_error = raw_results_dict.get(disk_script_key)
    disk_threshold = 85.0
    if isinstance(disk_results_or_error, list):
        disk_usage_raw = _get_first_value(
            disk_results_or_error, "total_disk_usage_pct"
        )
        disk_usage_pct: Optional[float] = None
        if disk_usage_raw is not None:
            try:
                disk_usage_pct = float(disk_usage_raw)
            except (ValueError, TypeError):
                disk_usage_pct = None

        if disk_usage_pct is not None:
            disk_status = "OK"
            disk_details = f"Total cluster disk usage: {disk_usage_pct:.2f}%"
            if disk_usage_pct > disk_threshold:
                disk_status = "Warning"
                overall_warning = True
                recommendations.append(
                    f"Total Disk Usage is high ({disk_usage_pct:.2f}%). Consider vacuuming tables, archiving old data, or resizing the cluster."
                )
            parsed_checks.append(
                HealthCheckItem(
                    check_name="Total Disk Usage",
                    status=disk_status,
                    value=f"{disk_usage_pct:.2f}",
                    unit="%",
                    threshold=disk_threshold,
                    details=disk_details,
                )
            )
        else:
            errors_encountered.append(
                f"Unexpected data format for {disk_script_key}: {disk_results_or_error}"
            )
            parsed_checks.append(
                HealthCheckItem(
                    check_name="Total Disk Usage",
                    status="Error",
                    details="Could not parse disk usage percentage.",
                )
            )
            overall_warning = True
    elif isinstance(disk_results_or_error, Exception):
        errors_encountered.append(
            f"Failed to retrieve disk usage: {disk_results_or_error}"
        )
        parsed_checks.append(_error_check("Total Disk Usage", disk_results_or_error))
        overall_warning = True

    # --- WLM queue state (two checks per service class) -----------------
    wlm_script_key = "workload/wlm_queue_state.sql"
    wlm_results_or_error = raw_results_dict.get(wlm_script_key)
    queue_count_threshold = 10
    queue_wait_threshold_sec = 300
    if isinstance(wlm_results_or_error, list):
        for queue_data in wlm_results_or_error:
            if not isinstance(queue_data, dict):
                continue
            sc = queue_data.get("service_class")
            if sc is None:
                continue
            queued = queue_data.get("queued_count")
            executing = queue_data.get("executing_count")
            max_wait_sec = queue_data.get("max_queue_wait_seconds")
            has_wait = isinstance(max_wait_sec, (int, float))

            q_status = "OK"
            q_details = f"SC {sc}: Queued={queued}, Executing={executing}"
            if isinstance(queued, int) and queued > queue_count_threshold:
                q_status = "Warning"
                overall_warning = True
                recommendations.append(
                    f"High queue count ({queued}) in Service Class {sc}. Check WLM config or long-running queries."
                )
                q_details += f" (Threshold: {queue_count_threshold})"
            parsed_checks.append(
                HealthCheckItem(
                    check_name=f"WLM Queue Count (SC {sc})",
                    status=q_status,
                    value=queued,
                    unit="queries",
                    threshold=queue_count_threshold,
                    details=q_details,
                )
            )

            w_status = "OK"
            w_details = (
                f"SC {sc}: Max Wait={max_wait_sec:.2f}s"
                if has_wait
                else f"SC {sc}: Max Wait=N/A"
            )
            if has_wait and max_wait_sec > queue_wait_threshold_sec:
                w_status = "Warning"
                overall_warning = True
                recommendations.append(
                    f"Long queue wait time ({max_wait_sec:.2f}s) in Service Class {sc}. Check WLM config or resource contention."
                )
                w_details += f" (Threshold: {queue_wait_threshold_sec}s)"
            parsed_checks.append(
                HealthCheckItem(
                    check_name=f"WLM Max Queue Wait (SC {sc})",
                    status=w_status,
                    value=f"{max_wait_sec:.2f}" if has_wait else "N/A",
                    unit="seconds",
                    threshold=queue_wait_threshold_sec,
                    details=w_details,
                )
            )
    elif isinstance(wlm_results_or_error, Exception):
        errors_encountered.append(
            f"Failed to retrieve WLM queue state: {wlm_results_or_error}"
        )
        parsed_checks.append(_error_check("WLM Queue State", wlm_results_or_error))
        overall_warning = True

    # --- Generic single-row checks --------------------------------------
    dedicated_keys = (cpu_script_key, disk_script_key, wlm_script_key)
    other_check_results: List[Dict[str, Any]] = []
    for script_key, result_or_error in raw_results_dict.items():
        if script_key in dedicated_keys:
            continue
        if isinstance(result_or_error, list):
            # Only single-row results are evaluated; anything else is skipped.
            if len(result_or_error) == 1 and isinstance(result_or_error[0], dict):
                other_check_results.append(result_or_error[0])
        elif isinstance(result_or_error, Exception):
            errors_encountered.append(
                f"Error executing {script_key}: {result_or_error}"
            )
            check_name_from_key = (
                script_key.replace(".sql", "")
                .replace("health/", "")
                .replace("workload/", "")
                .replace("locks/", "")
                .replace("table_inspect/", "")
                .replace("_", " ")
                .title()
            )
            parsed_checks.append(
                _error_check(f"{check_name_from_key} Check", result_or_error)
            )
            overall_warning = True

    check: Dict[str, Any]
    for check in other_check_results:
        if not isinstance(check, dict):
            continue
        # Best effort per row: one malformed row must not abort the report.
        try:
            name: Optional[str] = check.get("check_name")
            value: Any = check.get("metric_value")
            threshold: Optional[Union[int, float]] = check.get("threshold")
            unit: Optional[str] = check.get("unit")
            details: str = check.get("recommendation", "N/A")
            # A script-supplied recommendation is itself a signal.
            details_signal_issue: bool = details not in _BENIGN_DETAILS

            if name is None:
                derived = _derive_check(check)
                if derived is None:
                    errors_encountered.append(
                        f"Could not determine check name or metric value for result: {check}"
                    )
                    continue
                name, value, unit, threshold, details = derived
                # Generated details only describe the value (the commit-wait
                # text exists even for healthy values), so for derived checks
                # only the threshold decides the status.
                details_signal_issue = False

            if name is None or value is None:
                errors_encountered.append(f"Missing name or value for check: {check}")
                continue

            exceeds_threshold = (
                threshold is not None
                and isinstance(value, (int, float))
                and isinstance(threshold, (int, float))
                and value > threshold
            )
            status: str = (
                "Warning" if exceeds_threshold or details_signal_issue else "OK"
            )

            parsed_checks.append(
                HealthCheckItem(
                    check_name=name,
                    status=status,
                    value=value,
                    unit=unit,
                    threshold=threshold,
                    details=details,
                )
            )

            if status == "Warning":
                # Every warning is reflected in the overall status.
                overall_warning = True
                if details not in _BENIGN_DETAILS:
                    recommendations.append(f"Check '{name}': {details}")
        except Exception as e:
            errors_encountered.append(f"Error processing check result {check}: {e}")
            overall_warning = True
            continue

    if errors_encountered:
        parsed_checks.append(
            HealthCheckItem(
                check_name="Data Retrieval/Parsing Errors",
                status="Error",
                details="; ".join(errors_encountered),
            )
        )
        overall_warning = True

    final_result: ParsedHealthCheckResult = {
        "overall_status": "Warning" if overall_warning else "OK",
        "checks": parsed_checks,
        "recommendations": recommendations,
    }
    return final_result


def parse_monitor_workload_results(
    results: Dict[str, List[Dict[str, Any]]],
) -> ParsedMonitorWorkloadResult:
    """Parse raw workload monitoring results into the MCP tool output format.

    Expects a dictionary where keys are identifiers for the source query
    (e.g., 'wlm_queue_state', 'top_queries', 'queuing_summary', etc.) and
    values are lists of dictionaries representing the raw rows returned by
    those SQL queries.

    Args:
        results: A dictionary mapping source query identifiers to lists of raw
            result rows (as dictionaries). Expected keys include:
            'wlm_queue_state', 'top_queries', 'queuing_summary',
            'disk_based_query_count', 'copy_performance', 'commit_wait',
            'current_sessions', 'wlm_apex', 'wlm_trend_hourly'.

    Returns:
        A dictionary containing structured workload details, including
        summaries, WLM state, top queries, trends, potential issues, and
        recommendations. Returns an empty dictionary when no expected key
        holds a non-empty list and no other key holds a truthy value.
    """
    if not results or not isinstance(results, dict):
        return cast(ParsedMonitorWorkloadResult, {})

    known_keys: Set[str] = {
        "wlm_queue_state",
        "top_queries",
        "queuing_summary",
        "disk_based_query_count",
        "copy_performance",
        "commit_wait",
        "current_sessions",
        "wlm_apex",
        "wlm_trend_hourly",
    }

    wlm_state: List[Dict[str, Any]] = _get_list_value(results, "wlm_queue_state")
    top_queries: List[Dict[str, Any]] = _get_list_value(results, "top_queries")
    queuing_summary: List[Dict[str, Any]] = _get_list_value(results, "queuing_summary")
    disk_based_list: List[Dict[str, Any]] = _get_list_value(
        results, "disk_based_query_count"
    )
    copy_perf: List[Dict[str, Any]] = _get_list_value(results, "copy_performance")
    commit_wait_list: List[Dict[str, Any]] = _get_list_value(results, "commit_wait")
    sessions_list: List[Dict[str, Any]] = _get_list_value(results, "current_sessions")
    wlm_apex_list: List[Dict[str, Any]] = _get_list_value(results, "wlm_apex")
    wlm_trend_hourly_list: List[Dict[str, Any]] = _get_list_value(
        results, "wlm_trend_hourly"
    )

    has_known_rows: bool = any(
        [
            wlm_state,
            top_queries,
            queuing_summary,
            disk_based_list,
            copy_perf,
            commit_wait_list,
            sessions_list,
            wlm_apex_list,
            wlm_trend_hourly_list,
        ]
    )
    # Input counts as empty only if unrecognised keys carry nothing either.
    if not has_known_rows and not any(
        v for k, v in results.items() if k not in known_keys
    ):
        return cast(ParsedMonitorWorkloadResult, {})

    active_sessions_count: int = len(sessions_list)
    # "or 0" maps a present-but-None counter (SQL NULL) to zero so that
    # sum() does not raise TypeError.
    queries_executing: int = sum(
        (q.get("num_executing") or 0) for q in wlm_state if isinstance(q, dict)
    )
    queries_queued: int = sum(
        (q.get("num_queued") or 0) for q in wlm_state if isinstance(q, dict)
    )

    avg_commit_wait_any: Any = _get_first_value(commit_wait_list, "avg_commit_wait_ms")
    disk_based_count_any: Any = _get_first_value(disk_based_list, "disk_based_count", 0)
    avg_commit_wait: Optional[float] = (
        cast(Optional[float], avg_commit_wait_any)
        if isinstance(avg_commit_wait_any, (int, float))
        else None
    )
    disk_based_count: int = (
        cast(int, disk_based_count_any) if isinstance(disk_based_count_any, int) else 0
    )

    summary: WorkloadSummary = {
        "active_sessions": active_sessions_count,
        "queries_executing": queries_executing,
        "queries_queued": queries_queued,
        "avg_commit_wait_ms": avg_commit_wait,
        "disk_based_queries_recent": disk_based_count,
    }
    workload_trends: WorkloadTrends = {
        "apex": wlm_apex_list,
        "hourly_trend": wlm_trend_hourly_list,
    }

    potential_issues: List[str] = []
    recommendations: List[str] = []

    if disk_based_count > 0:
        potential_issues.append(
            f"{disk_based_count} queries recently executed disk-based operations. Investigate memory allocation or query complexity."
        )
        recommendations.append(
            f"Consider investigating the {disk_based_count} queries that recently executed disk-based operations."
        )

    if queries_queued > 5:
        recommendations.append(
            f"High number of queued queries ({queries_queued}). Check WLM configuration and long-running queries."
        )
        potential_issues.append(f"High WLM queuing ({queries_queued} queries).")

    final_result: ParsedMonitorWorkloadResult = {
        "summary": summary,
        "wlm_state": wlm_state,
        "queue_performance": queuing_summary,
        "top_active_queries": top_queries,
        "recent_copy_performance": copy_perf,
        "workload_trends": workload_trends,
        "potential_issues": potential_issues,
        "recommendations": recommendations,
    }
    return final_result

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vinodismyname/redshift-utils-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.