get_blocking_queries
Identify and analyze blocking queries in PostgreSQL databases to resolve performance issues, providing lock details, hierarchy, and actionable recommendations.
Instructions
Get comprehensive blocking queries analysis with lock information, hierarchy, and recommendations
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | — | — | — |
Implementation Reference
# MCP tool handler and registration for 'get_blocking_queries'. Registered via the
# @mcp.tool decorator; delegates the actual analysis to BlockingQueriesAnalyzer and
# converts the outcome (or any failure) into an MCP text response.
@mcp.tool(description="Get comprehensive blocking queries analysis with lock information, hierarchy, and recommendations")
async def get_blocking_queries() -> ResponseType:
    """Get comprehensive information about blocking queries and locks in the database with analysis and recommendations."""
    try:
        driver = await get_sql_driver()
        # Instantiate the analyzer and run the full analysis in one step.
        report = await BlockingQueriesAnalyzer(driver).get_blocking_queries()
        return format_text_response(report)
    except Exception as e:
        # Any failure (driver acquisition, analysis, formatting) is reported
        # back to the MCP client as an error response rather than raised.
        logger.error(f"Error getting blocking queries: {e}")
        return format_error_response(str(e))
# Core helper method in the BlockingQueriesAnalyzer class that implements the detailed
# logic for analyzing blocking queries: data collection via SQL, lock-wait graph
# generation, recommendations, and final text formatting.
async def get_blocking_queries(self) -> str:
    """
    Get comprehensive blocking queries analysis using modern PostgreSQL features.

    Enhanced with lock wait graph visualization, deadlock detection, session
    termination recommendations, lock timeout suggestions, and historical analysis.

    Returns:
        String containing blocking queries data, summary, and recommendations
    """
    try:
        # Get comprehensive blocking analysis (rows from pg_stat_activity/pg_locks,
        # already structured into blocked/blocking/lock_info dicts).
        blocking_data = await self._get_blocking_data()

        if not blocking_data:
            # Healthy path: no backend is currently waiting on another.
            # Still surface deadlock history and contention hotspots so the
            # report is useful even when nothing is blocked right now.
            deadlock_info = await self._get_deadlock_analysis()
            lock_contention_hotspots = await self._get_lock_contention_hotspots()

            result = {
                "status": "healthy",
                "message": "No blocking queries found - all queries are running without locks.",
                "blocking_queries": [],
                # NOTE(review): key is "max_wait_time" here but
                # "max_wait_time_seconds" in the blocking-detected summary below —
                # consumers of the formatted text may rely on either; confirm
                # before unifying.
                "summary": {"total_blocked": 0, "total_blocking": 0, "max_wait_time": 0, "affected_relations": []},
                "deadlock_analysis": deadlock_info,
                "lock_contention_hotspots": lock_contention_hotspots,
                "recommendations": await self._generate_healthy_recommendations(),
            }
            return self._format_as_text(result)

        # Enhanced analysis for blocking queries: each helper derives one
        # section of the final report from the raw blocking data.
        lock_wait_graph = await self._generate_lock_wait_graph(blocking_data)
        deadlock_info = await self._get_deadlock_analysis()
        session_termination_recs = await self._generate_session_termination_recommendations(blocking_data)
        lock_timeout_suggestions = await self._generate_lock_timeout_suggestions(blocking_data)
        historical_analysis = await self._get_historical_blocking_analysis()
        lock_escalation_info = await self._detect_lock_escalation(blocking_data)
        query_pattern_analysis = await self._analyze_query_patterns(blocking_data)
        batch_operation_impact = await self._analyze_batch_operation_impact(blocking_data)
        lock_contention_hotspots = await self._get_lock_contention_hotspots()

        # Process blocking queries data with improved structure: collect the
        # distinct blocker/blocked PIDs, affected relations, and the longest wait.
        blocking_pids = set()
        blocked_pids = set()
        relations = set()
        max_wait_time = 0

        for block in blocking_data:
            duration = block["blocked_process"]["duration_seconds"]
            max_wait_time = max(max_wait_time, duration)

            # Blocking pid can be NULL (LEFT JOIN in the collection query),
            # so only record it when present.
            if block["blocking_process"]["pid"]:
                blocking_pids.add(block["blocking_process"]["pid"])
            blocked_pids.add(block["blocked_process"]["pid"])

            # affected_relations is a ", "-joined string built by string_agg in SQL.
            if block["lock_info"]["affected_relations"]:
                relations.update(block["lock_info"]["affected_relations"].split(", "))

        # Generate enhanced summary
        summary = {
            "total_blocked": len(blocked_pids),
            "total_blocking": len(blocking_pids),
            "max_wait_time_seconds": max_wait_time,
            "affected_relations": list(relations),
            "analysis_timestamp": datetime.now().isoformat(),
            "lock_wait_threshold_alerts": self._check_lock_wait_thresholds(blocking_data),
        }

        recommendations = await self._generate_enhanced_recommendations(
            blocking_data,
            summary,
            session_termination_recs,
            lock_timeout_suggestions,
            historical_analysis,
            lock_escalation_info,
            query_pattern_analysis,
        )

        # Assemble the full report; _format_as_text renders it for the client.
        result = {
            "status": "blocking_detected",
            "blocking_queries": blocking_data,
            "summary": summary,
            "lock_wait_graph": lock_wait_graph,
            "deadlock_analysis": deadlock_info,
            "session_termination_recommendations": session_termination_recs,
            "lock_timeout_suggestions": lock_timeout_suggestions,
            "historical_analysis": historical_analysis,
            "lock_escalation_detection": lock_escalation_info,
            "query_pattern_analysis": query_pattern_analysis,
            "batch_operation_impact": batch_operation_impact,
            "lock_contention_hotspots": lock_contention_hotspots,
            "recommendations": recommendations,
        }

        return self._format_as_text(result)

    except Exception as e:
        # Log and re-raise: the MCP tool wrapper is responsible for turning
        # this into an error response.
        logger.error(f"Error analyzing blocking queries: {e}")
        raise
# Key helper method that executes the main SQL query to fetch blocking query data from
# pg_stat_activity and pg_locks, structuring it into a list of blocking events.
async def _get_blocking_data(self) -> List[Dict[str, Any]]:
    """Get comprehensive blocking queries data using modern PostgreSQL features."""
    # Uses pg_blocking_pids() to find, per backend, the exact set of PIDs blocking it,
    # then LATERAL-unnests that array to join each blocked session to its blockers.
    # lock_details aggregates the *ungranted* locks (granted = false) per waiting pid.
    blocking_query = """
        WITH blocking_tree AS (
            SELECT
                activity.pid,
                activity.usename,
                activity.application_name,
                activity.client_addr,
                activity.state,
                activity.query,
                activity.query_start,
                activity.state_change,
                activity.wait_event,
                activity.wait_event_type,
                pg_blocking_pids(activity.pid) AS blocking_pids,
                EXTRACT(EPOCH FROM (now() - activity.query_start)) AS duration_seconds,
                EXTRACT(EPOCH FROM (now() - activity.state_change)) AS state_duration_seconds,
                CASE
                    WHEN activity.query_start IS NOT NULL THEN
                        EXTRACT(EPOCH FROM (now() - activity.query_start))
                    ELSE NULL
                END AS wait_duration_seconds,
                activity.backend_start,
                activity.xact_start
            FROM pg_stat_activity activity
            WHERE activity.pid <> pg_backend_pid()
                AND activity.state IS NOT NULL
        ),
        lock_details AS (
            SELECT
                pid,
                string_agg(DISTINCT locktype, ', ') AS lock_types,
                string_agg(DISTINCT mode, ', ') AS lock_modes,
                COUNT(*) AS lock_count,
                string_agg(DISTINCT
                    CASE WHEN relation IS NOT NULL THEN
                        (SELECT schemaname||'.'||relname FROM pg_stat_user_tables WHERE relid = relation)
                    ELSE NULL END, ', ') AS affected_relations,
                -- Enhanced lock information
                string_agg(DISTINCT
                    CASE
                        WHEN locktype = 'relation' AND mode LIKE '%ExclusiveLock' THEN 'TABLE_LOCK'
                        WHEN locktype = 'tuple' THEN 'ROW_LOCK'
                        WHEN locktype = 'transactionid' THEN 'TXN_LOCK'
                        ELSE locktype
                    END, ', ') AS lock_categories
            FROM pg_locks
            WHERE granted = false
            GROUP BY pid
        )
        SELECT
            bt.pid AS blocked_pid,
            bt.usename AS blocked_user,
            bt.application_name AS blocked_application,
            bt.client_addr AS blocked_client_addr,
            bt.state AS blocked_state,
            bt.query AS blocked_query,
            bt.query_start AS blocked_query_start,
            bt.state_change AS blocked_state_change,
            bt.wait_event AS blocked_wait_event,
            bt.wait_event_type AS blocked_wait_event_type,
            bt.duration_seconds AS blocked_duration_seconds,
            bt.state_duration_seconds AS blocked_state_duration_seconds,
            bt.wait_duration_seconds AS blocked_wait_duration_seconds,
            bt.backend_start AS blocked_backend_start,
            bt.xact_start AS blocked_xact_start,
            bt.blocking_pids,
            blocker.pid AS blocking_pid,
            blocker.usename AS blocking_user,
            blocker.application_name AS blocking_application,
            blocker.client_addr AS blocking_client_addr,
            blocker.state AS blocking_state,
            blocker.query AS blocking_query,
            blocker.query_start AS blocking_query_start,
            blocker.duration_seconds AS blocking_duration_seconds,
            blocker.backend_start AS blocking_backend_start,
            blocker.xact_start AS blocking_xact_start,
            ld.lock_types,
            ld.lock_modes,
            ld.lock_count,
            ld.affected_relations,
            ld.lock_categories
        FROM blocking_tree bt
        LEFT JOIN LATERAL unnest(bt.blocking_pids) AS blocking_pid_unnest(pid) ON true
        LEFT JOIN blocking_tree blocker ON blocker.pid = blocking_pid_unnest.pid
        LEFT JOIN lock_details ld ON ld.pid = bt.pid
        WHERE cardinality(bt.blocking_pids) > 0
        ORDER BY bt.duration_seconds DESC;
    """

    rows = await self.sql_driver.execute_query(blocking_query)
    if not rows:
        return []

    blocking_data = []
    for row in rows:
        # EPOCH extractions come back NULL-able; coerce to float, defaulting to 0.
        duration = float(row.cells["blocked_duration_seconds"]) if row.cells["blocked_duration_seconds"] else 0
        blocking_data.append(
            {
                # The session that is waiting on a lock.
                "blocked_process": {
                    "pid": row.cells["blocked_pid"],
                    "user": row.cells["blocked_user"],
                    "application": row.cells["blocked_application"],
                    "client_addr": row.cells["blocked_client_addr"],
                    "state": row.cells["blocked_state"],
                    "query_start": row.cells["blocked_query_start"],
                    "state_change": row.cells["blocked_state_change"],
                    "wait_event": row.cells["blocked_wait_event"],
                    "wait_event_type": row.cells["blocked_wait_event_type"],
                    "duration_seconds": duration,
                    "state_duration_seconds": (
                        float(row.cells["blocked_state_duration_seconds"])
                        if row.cells["blocked_state_duration_seconds"]
                        else 0
                    ),
                    "wait_duration_seconds": (
                        float(row.cells["blocked_wait_duration_seconds"])
                        if row.cells["blocked_wait_duration_seconds"]
                        else 0
                    ),
                    "query": row.cells["blocked_query"],
                    "backend_start": row.cells["blocked_backend_start"],
                    "xact_start": row.cells["blocked_xact_start"],
                },
                # The session holding the lock; fields may be None due to the
                # LEFT JOIN when a blocker pid is not found in blocking_tree.
                "blocking_process": {
                    "pid": row.cells["blocking_pid"],
                    "user": row.cells["blocking_user"],
                    "application": row.cells["blocking_application"],
                    "client_addr": row.cells["blocking_client_addr"],
                    "state": row.cells["blocking_state"],
                    "query_start": row.cells["blocking_query_start"],
                    "duration_seconds": float(row.cells["blocking_duration_seconds"])
                    if row.cells["blocking_duration_seconds"]
                    else 0,
                    "query": row.cells["blocking_query"],
                    "backend_start": row.cells["blocking_backend_start"],
                    "xact_start": row.cells["blocking_xact_start"],
                },
                # Aggregated info about the blocked session's ungranted locks
                # (", "-joined strings produced by string_agg in lock_details).
                "lock_info": {
                    "types": row.cells["lock_types"],
                    "modes": row.cells["lock_modes"],
                    "count": row.cells["lock_count"],
                    "affected_relations": row.cells["affected_relations"],
                    "categories": row.cells["lock_categories"],
                },
                # Full blocker set vs. the single blocker this row was unnested for.
                "blocking_hierarchy": {
                    "all_blocking_pids": row.cells["blocking_pids"],
                    "immediate_blocker": row.cells["blocking_pid"],
                },
            }
        )

    return blocking_data