Glama

handle_monitor_workload

Analyze Amazon Redshift cluster workload patterns by executing SQL scripts to gather data on resource usage, WLM performance, and query efficiency. Customize time windows and top queries for detailed insights.

Instructions

Analyzes cluster workload patterns over a specified time window.

Executes various SQL scripts concurrently to gather data on resource usage,
WLM performance, top queries, queuing, COPY performance, and disk-based
queries. Returns a dictionary containing the raw results (or Exceptions)
keyed by the script name.

Args:
    ctx: The MCP context object.
    time_window_days: Lookback period in days for the workload analysis.
                      Defaults to 2.
    top_n_queries: Number of top queries (by total execution time) to
                   consider for the 'top_queries.sql' script. Defaults to 10.

Returns:
    A dictionary where keys are script names (e.g., 'workload/top_queries.sql')
    and values are either a list of result rows (as dictionaries) or the
    Exception object if that script failed.

Raises:
    DataApiError: If a critical error occurs during configuration loading.
                  (Note: Individual script errors are returned in the result dict).
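Because failures are reported per script rather than raised, a caller has to separate usable rows from errors before interpreting the output. The following is a minimal sketch of that step, assuming the result shape described above. `split_results` and the sample data are hypothetical and not part of the server. The helper also treats a dict carrying an `error` key as a failure, since the handler code in the Implementation Reference stores that shape when the Data API reports an error.

```python
from typing import Any, Dict, List, Tuple

Rows = List[Dict[str, Any]]


def split_results(results: Dict[str, Any]) -> Tuple[Dict[str, Rows], Dict[str, str]]:
    """Separate per-script row lists from per-script failures.

    Hypothetical helper for illustration; not part of the server code.
    """
    ok: Dict[str, Rows] = {}
    failed: Dict[str, str] = {}
    for script_name, value in results.items():
        if isinstance(value, Exception):
            # The script raised; keep the exception type and message.
            failed[script_name] = f"{type(value).__name__}: {value}"
        elif isinstance(value, dict) and "error" in value:
            # The error was reported in the result payload rather than raised.
            failed[script_name] = str(value["error"])
        else:
            ok[script_name] = list(value)
    return ok, failed


# Made-up sample data in the documented shape.
sample = {
    "workload/top_queries.sql": [{"query_id": 1, "run_count": 3}],
    "workload/wlm_apex.sql": RuntimeError("timeout"),
    "workload/copy_performance.sql": {"error": "permission denied"},
}
ok, failed = split_results(sample)
```

Keeping the failures in a separate mapping lets a report show partial results alongside the scripts that need attention.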

Input Schema

Name              Required  Description  Default
time_window_days  No        -            2
top_n_queries     No        -            10
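Both parameters are optional integers, so a client only needs to send the values it wants to override. As a rough sketch, an MCP client would invoke the tool with a JSON-RPC 2.0 `tools/call` request like the one built below. `build_tool_call` is a hypothetical helper, and the check for positive values is an assumption, since the schema states no valid range.

```python
import json
from typing import Any, Dict


def build_tool_call(
    request_id: int, time_window_days: int = 2, top_n_queries: int = 10
) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request for this tool (illustrative)."""
    if time_window_days < 1 or top_n_queries < 1:
        # Assumption: non-positive values are not meaningful here.
        raise ValueError("time_window_days and top_n_queries must be >= 1")
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "handle_monitor_workload",
            "arguments": {
                "time_window_days": time_window_days,
                "top_n_queries": top_n_queries,
            },
        },
    }


# Look back one week and ask for the five most expensive queries.
request = build_tool_call(1, time_window_days=7, top_n_queries=5)
print(json.dumps(request, indent=2))
```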

Implementation Reference

  • The primary handler function for the 'handle_monitor_workload' tool. It orchestrates concurrent execution of multiple SQL scripts to gather comprehensive Redshift workload metrics including resource trends, WLM performance, top queries, queuing stats, and potential bottlenecks.
    import asyncio
    from typing import Any, Dict, List, Tuple, Union

    # Project-specific names used below (Context, load_sql, execute_sql,
    # get_data_api_config, DataApiConfig, DataApiError, SqlExecutionError,
    # SqlScriptNotFoundError, ClientError) are imported elsewhere in the module;
    # their import paths are not shown in this excerpt.

    async def handle_monitor_workload(
        ctx: Context, time_window_days: int = 2, top_n_queries: int = 10
    ) -> Dict[str, Union[List[Dict[str, Any]], Exception]]:
        """Analyzes cluster workload patterns over a specified time window.
    
        Executes various SQL scripts concurrently to gather data on resource usage,
        WLM performance, top queries, queuing, COPY performance, and disk-based
        queries. Returns a dictionary containing the raw results (or Exceptions)
        keyed by the script name.
    
        Args:
            ctx: The MCP context object.
            time_window_days: Lookback period in days for the workload analysis.
                              Defaults to 2.
            top_n_queries: Number of top queries (by total execution time) to
                           consider for the 'top_queries.sql' script. Defaults to 10.
    
        Returns:
            A dictionary where keys are script names (e.g., 'workload/top_queries.sql')
            and values are either a list of result rows (as dictionaries) or the
            Exception object if that script failed.
    
        Raises:
            DataApiError: If a critical error occurs during configuration loading.
                          (Note: Individual script errors are returned in the result dict).
        """
        await ctx.info(
            f"Starting workload monitoring data collection (Window: {time_window_days} days, Top N: {top_n_queries})..."
        )
    
        scripts: List[str] = [
            "workload/queue_resources_hourly.sql",
            "workload/wlm_apex.sql",
            "workload/wlm_apex_hourly.sql",
            "workload/wlm_trend_daily.sql",
            "workload/wlm_trend_hourly.sql",
            "workload/top_queries.sql",
            "workload/queuing_summary.sql",
            "workload/copy_performance.sql",
            "workload/disk_based_query_count.sql",
            "workload/query_type_hourly_summary.sql",
        ]
    
        raw_results_dict: Dict[str, Union[List[Dict[str, Any]], Exception]] = {}
        tasks: List[asyncio.Task] = []
    
        async def run_script(script_name: str) -> None:
            """Loads and executes a single SQL script for workload monitoring."""
            nonlocal raw_results_dict
            await ctx.debug(f"Executing workload script: {script_name}")
            try:
                sql: str = load_sql(script_name)
                params: List[Tuple[str, Any]] = []
                if script_name in [
                    "workload/queue_resources_hourly.sql",
                    "workload/wlm_apex.sql",
                    "workload/wlm_apex_hourly.sql",
                    "workload/wlm_trend_daily.sql",
                    "workload/wlm_trend_hourly.sql",
                    "workload/queuing_summary.sql",
                    "workload/copy_performance.sql",
                    "workload/disk_based_query_count.sql",
                ]:
                    params.append(("time_window_days", time_window_days))
    
                if script_name == "workload/top_queries.sql":
                    params.append(("top_n_queries", top_n_queries))
                    params.append(("time_window_days", time_window_days))
    
                config: DataApiConfig = get_data_api_config()
                result: Dict[str, Any] = await execute_sql(
                    config=config, sql=sql, params=params if params else None
                )
    
                if result.get("error"):
                    await ctx.warning(
                        f"Script {script_name} failed with error: {result['error']}"
                    )
                    # Note: this branch stores an error dict, not an Exception,
                    # so callers should check for both shapes.
                    raw_results_dict[script_name] = {"error": result["error"]}
                else:
                    raw_results_dict[script_name] = result.get("rows", [])
            except SqlScriptNotFoundError as e:
                await ctx.error(
                    f"SQL script not found for workload monitor: {script_name}",
                    exc_info=True,
                )
                raw_results_dict[script_name] = e
            except (DataApiError, SqlExecutionError, ClientError, Exception) as e:
                await ctx.error(
                    f"Error executing workload monitor script {script_name}: {e}",
                    exc_info=True,
                )
                raw_results_dict[script_name] = e
    
        await ctx.debug(f"Executing {len(scripts)} workload scripts...")
        tasks = [asyncio.create_task(run_script(script)) for script in scripts]
        await asyncio.gather(*tasks)
    
        total_steps = len(scripts)
        await ctx.report_progress(total_steps, total_steps)
    
        await ctx.info("Workload monitoring data collection completed.")
        return raw_results_dict
  • TypedDict definitions describing a structured workload summary: resource trends, WLM metrics, top queries, workload characteristics, potential bottlenecks, and errors. Note that the handler shown above is annotated as returning the raw per-script results rather than a MonitorWorkloadResult.
    from typing import Dict, List, Optional, TypedDict


    class ResourceTrends(TypedDict):
        cpu_peak_pct: Optional[float]
        spill_peak_mb: Optional[float]
        daily_query_counts: Dict[str, int]
    
    
    class WLMPerformance(TypedDict):
        avg_queue_time_secs: Optional[float]
        concurrency_apex_current: Optional[int]
        concurrency_apex_historical: Optional[int]
    
    
    class TopQuerySummary(TypedDict):
        query_id: int
        query_text_snippet: str
        total_execution_time_secs: float
        avg_execution_time_secs: float
        run_count: int
        total_cpu_secs: float
        total_spill_mb: float
    
    
    class WorkloadCharacteristics(TypedDict):
        avg_queueing_queries: Optional[float]
        pct_disk_based_queries: Optional[float]
        slow_copy_summary: Optional[str]
    
    
    class MonitorWorkloadResult(TypedDict):
        time_window_days: int
        resource_trends: ResourceTrends
        wlm_performance: WLMPerformance
        top_consuming_queries: List[TopQuerySummary]
        workload_characteristics: WorkloadCharacteristics
        potential_bottlenecks: List[str]
        errors: Optional[List[Dict[str, str]]]
  • Server-level registration of tools by importing handler functions from tools/handlers package (including monitor_workload), ensuring modules are loaded and @mcp.tool() decorators register the tools with the FastMCP server instance.
    from .tools.handlers import (  # noqa: E402
        check_cluster_health,
        diagnose_locks,
        diagnose_query_performance,
        execute_ad_hoc_query,
        get_table_definition,
        inspect_table,
        monitor_workload,
    )
    
    
    from .resources import handlers as resource_handlers  # noqa: E402
    
    
    from .prompts import handlers as prompt_handlers  # noqa: E402
    
    
    _ = (
        check_cluster_health,
        diagnose_locks,
        diagnose_query_performance,
        execute_ad_hoc_query,
        get_table_definition,
        inspect_table,
        monitor_workload,
        resource_handlers,
        prompt_handlers,
    )
  • Package-level export and import of all tool handlers, including handle_monitor_workload, facilitating bulk import in server.py for registration.
    # src/redshift_utils_mcp/tools/handlers/__init__.py
    """Exports tool handlers."""
    from .check_cluster_health import handle_check_cluster_health
    from .diagnose_locks import handle_diagnose_locks
    from .diagnose_query_performance import handle_diagnose_query_performance
    from .execute_ad_hoc_query import handle_execute_ad_hoc_query
    from .get_table_definition import handle_get_table_definition
    from .inspect_table import handle_inspect_table
    from .monitor_workload import handle_monitor_workload
    
    __all__ = [
        "handle_check_cluster_health",
        "handle_diagnose_locks",
        "handle_diagnose_query_performance",
        "handle_execute_ad_hoc_query",
        "handle_get_table_definition",
        "handle_inspect_table",
        "handle_monitor_workload",
    ]
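The handler's core pattern, running every script concurrently while recording each failure instead of letting it abort the batch, can be sketched in isolation. This is an illustrative sketch rather than the project's code: `fake_execute`, `run_all`, and the script names are hypothetical stand-ins for loading and executing a SQL script through the Data API.

```python
import asyncio
from typing import Dict, List, Union

Result = Union[List[dict], Exception]


async def fake_execute(script_name: str) -> List[dict]:
    """Stand-in for loading and running one SQL script (hypothetical)."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    if script_name.endswith("broken.sql"):
        raise RuntimeError(f"cannot run {script_name}")
    return [{"script": script_name, "rows": 1}]


async def run_all(scripts: List[str]) -> Dict[str, Result]:
    """Run every script concurrently; record rows or the exception per script."""
    results: Dict[str, Result] = {}

    async def run_one(name: str) -> None:
        try:
            results[name] = await fake_execute(name)
        except Exception as exc:
            # Store the failure so one bad script does not cancel the rest.
            results[name] = exc

    await asyncio.gather(*(run_one(name) for name in scripts))
    return results


results = asyncio.run(run_all(["workload/a.sql", "workload/broken.sql"]))
```

Catching exceptions inside each task keeps the mapping from script name to outcome explicit. Passing `return_exceptions=True` to `asyncio.gather` is an alternative, but it returns a positional list that then has to be matched back to the script names.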
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It effectively describes that the tool executes SQL scripts concurrently, returns a dictionary with raw results or exceptions, and handles individual script failures gracefully by including exceptions in the result dict. It also mentions that critical configuration errors raise DataApiError. However, it doesn't specify performance characteristics, rate limits, or authentication requirements.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 4/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well-structured with clear sections (purpose, execution details, args, returns, raises) and front-loaded with the core functionality. While comprehensive, some sentences could be more concise, such as the detailed explanation of the return dictionary structure which is somewhat verbose.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity of a workload analysis tool with 2 parameters, no annotations, and no output schema, the description provides substantial context about behavior, parameters, return format, and error handling. It explains the concurrent execution of SQL scripts, the dictionary return structure with success/failure results, and different error scenarios. The main gap is lack of information about what specific workload metrics are analyzed beyond the general categories mentioned.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The description provides excellent parameter semantics beyond the basic schema. While schema description coverage is 0%, the description clearly explains that time_window_days is the 'lookback period in days for workload analysis' with a default of 2, and top_n_queries determines 'number of top queries to consider' with a default of 10. This adds meaningful context about what these parameters control in the analysis.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool 'analyzes cluster workload patterns over a specified time window' with specific verbs (analyzes, executes, gathers) and resources (cluster workload, SQL scripts). It distinguishes from siblings like handle_check_cluster_health or handle_diagnose_query_performance by focusing on comprehensive workload analysis rather than specific health checks or query diagnostics.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 3/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description implies usage for analyzing workload patterns over time, but doesn't explicitly state when to use this tool versus alternatives like handle_diagnose_query_performance or handle_execute_ad_hoc_query. There's no guidance on prerequisites, exclusions, or specific scenarios where this tool is preferred over sibling tools.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/vinodismyname/redshift-utils-mcp'
