handle_monitor_workload

Analyze Amazon Redshift cluster workload patterns by executing SQL scripts to gather data on resource usage, WLM performance, and query efficiency. Customize time windows and top queries for detailed insights.

Instructions

Analyzes cluster workload patterns over a specified time window.

Executes various SQL scripts concurrently to gather data on resource usage,
WLM performance, top queries, queuing, COPY performance, and disk-based
queries. Returns a dictionary containing the raw results (or Exceptions)
keyed by the script name.

Args:
    ctx: The MCP context object.
    time_window_days: Lookback period in days for the workload analysis.
                      Defaults to 2.
    top_n_queries: Number of top queries (by total execution time) to
                   consider for the 'top_queries.sql' script. Defaults to 10.

Returns:
    A dictionary where keys are script names (e.g., 'workload/top_queries.sql')
    and values are either a list of result rows (as dictionaries) or the
    Exception object if that script failed.

Raises:
    DataApiError: If a critical error occurs during configuration loading.
                  (Note: Individual script errors are returned in the result dict).
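A minimal sketch of how a caller might consume the returned dictionary, using hypothetical result data: values are either lists of row dicts or the Exception captured for a failed script, so the two cases need to be separated before further processing. (Note that, as the implementation shows, Data API-level errors may also appear as an `{"error": ...}` dict rather than an Exception.)

```python
# Hypothetical results dict mimicking the shape returned by
# handle_monitor_workload: row lists on success, Exception on failure.
results = {
    "workload/top_queries.sql": [{"query_id": 123, "total_exec_secs": 42.5}],
    "workload/wlm_apex.sql": RuntimeError("simulated Data API failure"),
}

rows_by_script = {}
failed_scripts = {}
for script, value in results.items():
    if isinstance(value, Exception):
        # Keep a readable message per failed script instead of re-raising.
        failed_scripts[script] = str(value)
    else:
        rows_by_script[script] = value
```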

Input Schema

| Name             | Required | Description                                                                 | Default |
|------------------|----------|-----------------------------------------------------------------------------|---------|
| time_window_days | No       | Lookback period in days for the workload analysis.                          | 2       |
| top_n_queries    | No       | Number of top queries (by total execution time) for the top_queries script. | 10      |
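Both parameters are optional. A hypothetical arguments payload overriding the defaults might look like:

```python
# Hypothetical arguments for a handle_monitor_workload call;
# omit either key to fall back to the defaults (2 days, top 10).
arguments = {
    "time_window_days": 7,  # look back one week instead of the default 2
    "top_n_queries": 5,     # limit top_queries.sql to 5 queries
}
```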

Implementation Reference

  • The primary handler function for the 'handle_monitor_workload' tool. It orchestrates concurrent execution of multiple SQL scripts to gather comprehensive Redshift workload metrics including resource trends, WLM performance, top queries, queuing stats, and potential bottlenecks.
    async def handle_monitor_workload(
        ctx: Context, time_window_days: int = 2, top_n_queries: int = 10
    ) -> Dict[str, Union[List[Dict[str, Any]], Exception]]:
        """Analyzes cluster workload patterns over a specified time window.
    
        Executes various SQL scripts concurrently to gather data on resource usage,
        WLM performance, top queries, queuing, COPY performance, and disk-based
        queries. Returns a dictionary containing the raw results (or Exceptions)
        keyed by the script name.
    
        Args:
            ctx: The MCP context object.
            time_window_days: Lookback period in days for the workload analysis.
                              Defaults to 2.
            top_n_queries: Number of top queries (by total execution time) to
                           consider for the 'top_queries.sql' script. Defaults to 10.
    
        Returns:
            A dictionary where keys are script names (e.g., 'workload/top_queries.sql')
            and values are either a list of result rows (as dictionaries) or the
            Exception object if that script failed.
    
        Raises:
            DataApiError: If a critical error occurs during configuration loading.
                          (Note: Individual script errors are returned in the result dict).
        """
        await ctx.info(
            f"Starting workload monitoring data collection (Window: {time_window_days} days, Top N: {top_n_queries})..."
        )
    
        scripts: List[str] = [
            "workload/queue_resources_hourly.sql",
            "workload/wlm_apex.sql",
            "workload/wlm_apex_hourly.sql",
            "workload/wlm_trend_daily.sql",
            "workload/wlm_trend_hourly.sql",
            "workload/top_queries.sql",
            "workload/queuing_summary.sql",
            "workload/copy_performance.sql",
            "workload/disk_based_query_count.sql",
            "workload/query_type_hourly_summary.sql",
        ]
    
        raw_results_dict: Dict[str, Union[List[Dict[str, Any]], Exception]] = {}
        tasks: List[asyncio.Task] = []
    
        async def run_script(script_name: str) -> None:
            """Loads and executes a single SQL script for workload monitoring."""
            nonlocal raw_results_dict
            await ctx.debug(f"Executing workload script: {script_name}")
            try:
                sql: str = load_sql(script_name)
                params: List[Tuple[str, Any]] = []
                if script_name in [
                    "workload/queue_resources_hourly.sql",
                    "workload/wlm_apex.sql",
                    "workload/wlm_apex_hourly.sql",
                    "workload/wlm_trend_daily.sql",
                    "workload/wlm_trend_hourly.sql",
                    "workload/queuing_summary.sql",
                    "workload/copy_performance.sql",
                    "workload/disk_based_query_count.sql",
                ]:
                    params.append(("time_window_days", time_window_days))
    
                if script_name == "workload/top_queries.sql":
                    params.append(("top_n_queries", top_n_queries))
                    params.append(("time_window_days", time_window_days))
    
                config: DataApiConfig = get_data_api_config()
                result: Dict[str, Any] = await execute_sql(
                    config=config, sql=sql, params=params if params else None
                )
    
                if result.get("error"):
                    await ctx.warning(
                        f"Script {script_name} failed with error: {result['error']}"
                    )
                    raw_results_dict[script_name] = {"error": result["error"]}
                else:
                    raw_results_dict[script_name] = result.get("rows", [])
            except SqlScriptNotFoundError as e:
                await ctx.error(
                    f"SQL script not found for workload monitor: {script_name}",
                    exc_info=True,
                )
                raw_results_dict[script_name] = e
            except Exception as e:  # covers DataApiError, SqlExecutionError, ClientError
                await ctx.error(
                    f"Error executing workload monitor script {script_name}: {e}",
                    exc_info=True,
                )
                raw_results_dict[script_name] = e
    
        await ctx.debug(f"Executing {len(scripts)} workload scripts...")
        tasks = [asyncio.create_task(run_script(script)) for script in scripts]
        await asyncio.gather(*tasks)
    
        total_steps = len(scripts)
        await ctx.report_progress(total_steps, total_steps)
    
        await ctx.info("Workload monitoring data collection completed.")
        return raw_results_dict
  • TypedDict definitions providing schema for the structured output returned by the handle_monitor_workload tool, detailing resource trends, WLM metrics, top queries, workload characteristics, and results.
    class ResourceTrends(TypedDict):
        cpu_peak_pct: Optional[float]
        spill_peak_mb: Optional[float]
        daily_query_counts: Dict[str, int]
    
    
    class WLMPerformance(TypedDict):
        avg_queue_time_secs: Optional[float]
        concurrency_apex_current: Optional[int]
        concurrency_apex_historical: Optional[int]
    
    
    class TopQuerySummary(TypedDict):
        query_id: int
        query_text_snippet: str
        total_execution_time_secs: float
        avg_execution_time_secs: float
        run_count: int
        total_cpu_secs: float
        total_spill_mb: float
    
    
    class WorkloadCharacteristics(TypedDict):
        avg_queueing_queries: Optional[float]
        pct_disk_based_queries: Optional[float]
        slow_copy_summary: Optional[str]
    
    
    class MonitorWorkloadResult(TypedDict):
        time_window_days: int
        resource_trends: ResourceTrends
        wlm_performance: WLMPerformance
        top_consuming_queries: List[TopQuerySummary]
        workload_characteristics: WorkloadCharacteristics
        potential_bottlenecks: List[str]
        errors: Optional[List[Dict[str, str]]]
  • Server-level registration of tools by importing handler functions from tools/handlers package (including monitor_workload), ensuring modules are loaded and @mcp.tool() decorators register the tools with the FastMCP server instance.
    from .tools.handlers import (  # noqa: E402
        check_cluster_health,
        diagnose_locks,
        diagnose_query_performance,
        execute_ad_hoc_query,
        get_table_definition,
        inspect_table,
        monitor_workload,
    )
    
    
    from .resources import handlers as resource_handlers  # noqa: E402
    
    
    from .prompts import handlers as prompt_handlers  # noqa: E402
    
    
    _ = (
        check_cluster_health,
        diagnose_locks,
        diagnose_query_performance,
        execute_ad_hoc_query,
        get_table_definition,
        inspect_table,
        monitor_workload,
        resource_handlers,
        prompt_handlers,
    )
  • Package-level export and import of all tool handlers, including handle_monitor_workload, facilitating bulk import in server.py for registration.
    # src/redshift_utils_mcp/tools/handlers/__init__.py
    """Exports tool handlers."""
    from .check_cluster_health import handle_check_cluster_health
    from .diagnose_locks import handle_diagnose_locks
    from .diagnose_query_performance import handle_diagnose_query_performance
    from .execute_ad_hoc_query import handle_execute_ad_hoc_query
    from .get_table_definition import handle_get_table_definition
    from .inspect_table import handle_inspect_table
    from .monitor_workload import handle_monitor_workload
    
    __all__ = [
        "handle_check_cluster_health",
        "handle_diagnose_locks",
        "handle_diagnose_query_performance",
        "handle_execute_ad_hoc_query",
        "handle_get_table_definition",
        "handle_inspect_table",
        "handle_monitor_workload",
    ]
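The concurrency pattern in the handler above can be reduced to a self-contained sketch: each per-script coroutine records its rows, or the Exception it raised, into a shared dict, and `asyncio.gather` runs them all without letting one failure abort the batch. The `run_script` body here is a stand-in for the real `load_sql`/`execute_sql` calls.

```python
import asyncio
from typing import Any, Dict, List, Union

async def demo() -> Dict[str, Union[List[Dict[str, Any]], Exception]]:
    results: Dict[str, Union[List[Dict[str, Any]], Exception]] = {}

    async def run_script(name: str) -> None:
        try:
            if "bad" in name:
                raise ValueError(f"simulated failure in {name}")
            await asyncio.sleep(0)  # stand-in for the async Data API call
            results[name] = [{"script": name}]
        except Exception as e:
            # Failures are captured per script; siblings keep running.
            results[name] = e

    scripts = ["workload/good.sql", "workload/bad.sql"]
    await asyncio.gather(*(run_script(s) for s in scripts))
    return results

out = asyncio.run(demo())
```

Writing into a shared dict keyed by script name (rather than relying on `gather`'s return order) keeps results unambiguous even if the script list changes.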
