handle_diagnose_locks
Diagnoses lock contention in Redshift clusters by identifying active locks, with optional filters for process ID, table name, and minimum wait time, and returns detailed rows for troubleshooting.
Instructions
Identifies active lock contention in the cluster.
Fetches all current lock information and then filters it based on the
optional target PID, target table name, and minimum wait time.
Formats the results into a list of contention details and a summary.
Args:
ctx: The MCP context object.
target_pid: Optional: Filter results to show locks held by or waited
for by this specific process ID (PID).
target_table_name: Optional: Filter results for locks specifically on
this table name (schema qualification recommended
if ambiguous).
min_wait_seconds: Minimum seconds a lock must be in a waiting state
to be included. Defaults to 5.
Returns:
A list of dictionaries, where each dictionary represents a row
from the lock contention query result.
Raises:
DataApiError: If fetching the initial lock information fails.
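The filtering described above can be sketched roughly as follows. This is a minimal illustration, not the tool's actual code: the function name `filter_locks` and the row keys `blocking_pid`, `waiting_pid`, `table_name`, and `wait_seconds` are assumptions, since the columns returned by the lock query are not shown here.

```python
from typing import Any, Dict, List, Optional


def filter_locks(
    rows: List[Dict[str, Any]],
    target_pid: Optional[int] = None,
    target_table_name: Optional[str] = None,
    min_wait_seconds: int = 5,
) -> List[Dict[str, Any]]:
    """Keep only the rows that satisfy every supplied filter."""
    kept: List[Dict[str, Any]] = []
    for row in rows:
        # Hypothetical column names; the real query's columns may differ.
        pids = {row.get("blocking_pid"), row.get("waiting_pid")}
        if target_pid is not None and target_pid not in pids:
            continue
        if target_table_name is not None and row.get("table_name") != target_table_name:
            continue
        # Treat a missing or NULL wait time as zero seconds.
        if (row.get("wait_seconds") or 0) < min_wait_seconds:
            continue
        kept.append(row)
    return kept


# Made-up rows, used only to exercise the sketch.
sample_rows = [
    {"blocking_pid": 101, "waiting_pid": 202, "table_name": "public.orders", "wait_seconds": 42},
    {"blocking_pid": 101, "waiting_pid": 303, "table_name": "public.users", "wait_seconds": 2},
    {"blocking_pid": 404, "waiting_pid": 505, "table_name": "public.orders", "wait_seconds": 9},
]

orders_only = filter_locks(sample_rows, target_table_name="public.orders")
```

With the default `min_wait_seconds` of 5, the second sample row is dropped regardless of the other filters because it has only waited 2 seconds.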
Input Schema
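| Name | Required | Description | Default |
|---|---|---|---|
| min_wait_seconds | No | Minimum seconds a lock must be in a waiting state to be included. | 5 |
| target_pid | No | Filter results to show locks held by or waited for by this specific process ID (PID). | None |
| target_table_name | No | Filter results for locks on this table name (schema qualification recommended if ambiguous). | None |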
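Because all three inputs are optional, a caller can send any subset of them. The sketch below builds an MCP `tools/call` JSON-RPC request for this tool and leaves out arguments that were not supplied; the helper name `build_call_request` is hypothetical, and the transport and client library are deliberately out of scope.

```python
import json
from typing import Any, Dict, Optional


def build_call_request(
    request_id: int,
    target_pid: Optional[int] = None,
    target_table_name: Optional[str] = None,
    min_wait_seconds: Optional[int] = None,
) -> Dict[str, Any]:
    """Build a tools/call request, omitting arguments that were not supplied."""
    candidates = {
        "target_pid": target_pid,
        "target_table_name": target_table_name,
        "min_wait_seconds": min_wait_seconds,
    }
    # Omitted arguments fall back to the server-side defaults.
    arguments = {name: value for name, value in candidates.items() if value is not None}
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "handle_diagnose_locks", "arguments": arguments},
    }


request = build_call_request(1, target_table_name="public.orders", min_wait_seconds=30)
payload = json.dumps(request)
```

Leaving `min_wait_seconds` out of the arguments lets the handler apply its own default of 5 rather than duplicating that value on the client.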
Implementation Reference
- The core handler function for the 'handle_diagnose_locks' tool. It loads and executes a SQL script to diagnose locks and returns the resulting rows. In the code shown, the optional filter arguments are only logged; they are not applied to the rows before returning.

  ```python
  @mcp.tool()
  async def handle_diagnose_locks(
      ctx: Context,
      target_pid: Optional[int] = None,
      target_table_name: Optional[str] = None,
      min_wait_seconds: int = 5,
  ) -> List[Dict[str, Any]]:
      """Identifies active lock contention in the cluster.

      Fetches all current lock information and then filters it based on the
      optional target PID, target table name, and minimum wait time.
      Formats the results into a list of contention details and a summary.

      Args:
          ctx: The MCP context object.
          target_pid: Optional: Filter results to show locks held by or waited
                      for by this specific process ID (PID).
          target_table_name: Optional: Filter results for locks specifically on
                             this table name (schema qualification recommended
                             if ambiguous).
          min_wait_seconds: Minimum seconds a lock must be in a waiting state
                            to be included. Defaults to 5.

      Returns:
          A list of dictionaries, where each dictionary represents a row
          from the lock contention query result.

      Raises:
          DataApiError: If fetching the initial lock information fails.
      """
      ctx.info("Starting lock diagnosis...")
      ctx.debug(
          f"Lock diagnosis filters - PID: {target_pid}, Table: {target_table_name}, MinWait: {min_wait_seconds}s"
      )

      lock_script: str = "locks/blocking_pids.sql"
      all_locks: List[Dict[str, Any]] = []

      try:
          sql: str = load_sql(lock_script)
          config: DataApiConfig = get_data_api_config()
          all_locks: List[Dict[str, Any]] = await execute_sql(config=config, sql=sql)
          ctx.debug(f"Retrieved {len(all_locks)} raw lock entries.")
      except (
          SqlScriptNotFoundError,
          DataApiError,
          SqlExecutionError,
          ClientError,
          Exception,
      ) as e:
          ctx.error(f"Failed to retrieve lock information: {e}", exc_info=True)
          raise DataApiError(f"Failed to retrieve lock information: {e}")

      ctx.info("Lock diagnosis completed.")
      return all_locks
  ```
- src/redshift_utils_mcp/server.py:64-91 (registration): Imports the 'diagnose_locks' handler module (and others) into the FastMCP server context, triggering automatic tool registration via the @mcp.tool() decorator.

  ```python
  from .tools.handlers import (  # noqa: E402
      check_cluster_health,
      diagnose_locks,
      diagnose_query_performance,
      execute_ad_hoc_query,
      get_table_definition,
      inspect_table,
      monitor_workload,
  )
  from .resources import handlers as resource_handlers  # noqa: E402
  from .prompts import handlers as prompt_handlers  # noqa: E402

  _ = (
      check_cluster_health,
      diagnose_locks,
      diagnose_query_performance,
      execute_ad_hoc_query,
      get_table_definition,
      inspect_table,
      monitor_workload,
      resource_handlers,
      prompt_handlers,
  )
  ```
- src/redshift_utils_mcp/tools/handlers/__init__.py:1-19 (registration): Exposes the 'handle_diagnose_locks' function via __init__.py for easy import in the server registration.

  ```python
  # src/redshift_utils_mcp/tools/handlers/__init__.py
  """Exports tool handlers."""

  from .check_cluster_health import handle_check_cluster_health
  from .diagnose_locks import handle_diagnose_locks
  from .diagnose_query_performance import handle_diagnose_query_performance
  from .execute_ad_hoc_query import handle_execute_ad_hoc_query
  from .get_table_definition import handle_get_table_definition
  from .inspect_table import handle_inspect_table
  from .monitor_workload import handle_monitor_workload

  __all__ = [
      "handle_check_cluster_health",
      "handle_diagnose_locks",
      "handle_diagnose_query_performance",
      "handle_execute_ad_hoc_query",
      "handle_get_table_definition",
      "handle_inspect_table",
      "handle_monitor_workload",
  ]
  ```
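The registration entries above rely on a decorator running as a side effect of importing the handler module. The stand-alone sketch below shows that general pattern with a plain dictionary; `TOOL_REGISTRY` and this `tool()` decorator are hypothetical stand-ins, not FastMCP's implementation.

```python
from typing import Any, Callable, Dict

# Hypothetical registry standing in for the server's internal tool table.
TOOL_REGISTRY: Dict[str, Callable[..., Any]] = {}


def tool() -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Return a decorator that records the function under its own name."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        TOOL_REGISTRY[func.__name__] = func
        return func  # the function itself is left unchanged

    return decorator


# In a real package this definition would live in its own module, so that
# merely importing the module is enough to register the tool.
@tool()
def handle_diagnose_locks(min_wait_seconds: int = 5) -> list:
    """Placeholder body; the real handler queries the cluster."""
    return []


registered_names = sorted(TOOL_REGISTRY)
```

This is also why the server module assigns the imported handlers to `_`: the names are referenced only to keep the imports, and their side effects, from being removed as unused.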