isdaniel / PostgreSQL-Performance-Tuner-Mcp

tools_performance.py (25.4 kB)
"""Performance analysis tool handlers.""" from __future__ import annotations import json from collections.abc import Sequence from typing import Any from mcp.types import TextContent, Tool from ..services import SqlDriver, get_user_filter from .toolhandler import ToolHandler class GetSlowQueriesToolHandler(ToolHandler): """Tool handler for retrieving slow queries from pg_stat_statements.""" name = "get_slow_queries" title = "Slow Query Analyzer" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Retrieve slow queries from PostgreSQL using pg_stat_statements. Returns the top N slowest queries ordered by mean (average) execution time. Requires the pg_stat_statements extension to be enabled. Note: This tool focuses on user/application queries only. System catalog queries (pg_catalog, information_schema, pg_toast) are automatically excluded. The results include: - Query text (normalized) - Number of calls - Mean execution time (average per call) - Min/Max execution time - Rows returned - Shared buffer hits/reads for cache analysis""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "limit": { "type": "integer", "description": "Maximum number of slow queries to return (default: 10)", "default": 10, "minimum": 1, "maximum": 100 }, "min_calls": { "type": "integer", "description": "Minimum number of calls for a query to be included (default: 1)", "default": 1, "minimum": 1 }, "min_mean_time_ms": { "type": "number", "description": "Minimum mean (average) execution time in milliseconds (default: 0)", "default": 0 }, "order_by": { "type": "string", "description": "Column to order results by", "enum": ["mean_time", "calls", "rows"], "default": "mean_time" } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: limit = arguments.get("limit", 10) min_calls = arguments.get("min_calls", 1) min_mean_time_ms = arguments.get("min_mean_time_ms", 0) order_by = arguments.get("order_by", "mean_time") # Map order_by to actual column names order_map = { "mean_time": "mean_exec_time", "calls": "calls", "rows": "rows" } order_column = order_map.get(order_by, "mean_exec_time") # Check if pg_stat_statements is available check_query = """ SELECT EXISTS ( SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements' ) as available """ check_result = await self.sql_driver.execute_query(check_query) if not check_result or not check_result[0].get("available"): return self.format_result( "Error: pg_stat_statements extension is not installed.\n" "Install it with: CREATE EXTENSION pg_stat_statements;\n" "Note: You may need to add it to shared_preload_libraries in postgresql.conf" ) # Get user filter for excluding specific user IDs user_filter = get_user_filter() statements_filter = user_filter.get_statements_filter() # Query pg_stat_statements for slow queries # Using pg_stat_statements columns available in PostgreSQL 13+ # Excludes system catalog queries to focus on user/application queries query = f""" SELECT queryid, LEFT(query, 500) as query_text, calls, ROUND(mean_exec_time::numeric, 2) as mean_time_ms, ROUND(min_exec_time::numeric, 2) as min_time_ms, ROUND(max_exec_time::numeric, 2) as max_time_ms, ROUND(stddev_exec_time::numeric, 2) as stddev_time_ms, rows, shared_blks_hit, shared_blks_read, CASE WHEN 
shared_blks_hit + shared_blks_read > 0 THEN ROUND(100.0 * shared_blks_hit / (shared_blks_hit + shared_blks_read), 2) ELSE 100 END as cache_hit_ratio, temp_blks_read, temp_blks_written FROM pg_stat_statements WHERE calls >= %s AND mean_exec_time >= %s AND query NOT LIKE '%%pg_stat_statements%%' AND query NOT LIKE '%%pg_catalog%%' AND query NOT LIKE '%%information_schema%%' AND query NOT LIKE '%%pg_toast%%' {statements_filter} ORDER BY {order_column} DESC LIMIT %s """ results = await self.sql_driver.execute_query( query, [min_calls, min_mean_time_ms, limit] ) if not results: return self.format_result( "No slow queries found matching the criteria.\n" "This could mean:\n" "- pg_stat_statements has been recently reset\n" "- No queries exceed the minimum thresholds\n" "- The database has low query activity" ) # Format results output = { "summary": { "total_queries_returned": len(results), "filters_applied": { "min_calls": min_calls, "min_mean_time_ms": min_mean_time_ms, "order_by": order_by } }, "slow_queries": results } return self.format_json_result(output) except Exception as e: return self.format_error(e) class AnalyzeQueryToolHandler(ToolHandler): """Tool handler for analyzing a query's execution plan and performance.""" name = "analyze_query" title = "Query Execution Analyzer" read_only_hint = False # EXPLAIN ANALYZE actually executes the query destructive_hint = False # Read queries are safe, but DML could be destructive idempotent_hint = True open_world_hint = False description = """Analyze a SQL query's execution plan and performance characteristics. Uses EXPLAIN ANALYZE to execute the query and capture detailed timing information. Provides analysis of: - Execution plan with actual vs estimated rows - Timing breakdown by operation - Buffer usage and I/O statistics - Potential performance issues and recommendations WARNING: This actually executes the query! 
For SELECT queries this is safe, but be careful with INSERT/UPDATE/DELETE - use analyze_only=false for those.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "query": { "type": "string", "description": "The SQL query to analyze" }, "analyze": { "type": "boolean", "description": "Whether to actually execute the query (EXPLAIN ANALYZE vs EXPLAIN)", "default": True }, "buffers": { "type": "boolean", "description": "Include buffer usage statistics", "default": True }, "verbose": { "type": "boolean", "description": "Include verbose output with additional details", "default": False }, "format": { "type": "string", "description": "Output format for the execution plan", "enum": ["text", "json", "yaml", "xml"], "default": "json" }, "settings": { "type": "boolean", "description": "Include information about configuration parameters", "default": False } }, "required": ["query"] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: self.validate_required_args(arguments, ["query"]) query = arguments["query"] analyze = arguments.get("analyze", True) buffers = arguments.get("buffers", True) verbose = arguments.get("verbose", False) output_format = arguments.get("format", "json") settings = arguments.get("settings", False) # Build EXPLAIN options options = [] if analyze: options.append("ANALYZE") if buffers: options.append("BUFFERS") if verbose: options.append("VERBOSE") if settings: options.append("SETTINGS") options.append(f"FORMAT {output_format.upper()}") options_str = ", ".join(options) explain_query = f"EXPLAIN ({options_str}) {query}" # Execute EXPLAIN results = await self.sql_driver.execute_query(explain_query) if not results: return self.format_result("No execution plan returned") # For JSON format, parse and analyze the plan if output_format == "json": # The result comes as a list with QUERY PLAN column plan_data = results[0].get("QUERY PLAN", results) # If it's a string, parse it if isinstance(plan_data, str): plan_data = json.loads(plan_data) analysis = self._analyze_plan(plan_data, analyze) output = { "query": query, "explain_options": { "analyze": analyze, "buffers": buffers, "verbose": verbose, "format": output_format }, "execution_plan": plan_data, "analysis": analysis } return self.format_json_result(output) else: # For text/yaml/xml, return as-is plan_text = "\n".join( str(row.get("QUERY PLAN", row)) for row in results ) return self.format_result(f"Query: {query}\n\nExecution Plan:\n{plan_text}") except Exception as e: return self.format_error(e) def _analyze_plan(self, plan_data: Any, was_analyzed: bool) -> dict[str, Any]: """Analyze an execution plan and extract insights.""" analysis = { "warnings": [], "recommendations": [], "statistics": {} } if not plan_data: return analysis # Handle the plan structure (it's usually a list with one element) if isinstance(plan_data, list) and len(plan_data) > 0: plan = plan_data[0].get("Plan", plan_data[0]) else: plan = plan_data.get("Plan", plan_data) # Extract top-level statistics if was_analyzed: if "Execution Time" in plan_data[0] if isinstance(plan_data, list) else plan_data: exec_time = (plan_data[0] if isinstance(plan_data, list) else plan_data).get("Execution Time", 0) analysis["statistics"]["execution_time_ms"] = exec_time if "Planning Time" in (plan_data[0] if isinstance(plan_data, list) else plan_data): 
plan_time = (plan_data[0] if isinstance(plan_data, list) else plan_data).get("Planning Time", 0) analysis["statistics"]["planning_time_ms"] = plan_time # Analyze the plan recursively self._analyze_node(plan, analysis) return analysis def _analyze_node(self, node: dict[str, Any], analysis: dict[str, Any], depth: int = 0) -> None: """Recursively analyze plan nodes for issues.""" if not isinstance(node, dict): return node_type = node.get("Node Type", "Unknown") # Check for sequential scans on large tables if node_type == "Seq Scan": rows = node.get("Actual Rows", node.get("Plan Rows", 0)) if rows > 10000: table = node.get("Relation Name", "unknown") analysis["warnings"].append( f"Sequential scan on '{table}' returned {rows} rows - consider adding an index" ) filter_cond = node.get("Filter") if filter_cond: analysis["recommendations"].append( f"Consider creating an index for filter condition: {filter_cond}" ) # Check for row estimate mismatches actual_rows = node.get("Actual Rows") plan_rows = node.get("Plan Rows") if actual_rows is not None and plan_rows is not None and plan_rows > 0: ratio = actual_rows / plan_rows if ratio > 10 or ratio < 0.1: analysis["warnings"].append( f"{node_type}: Row estimate mismatch - planned {plan_rows}, actual {actual_rows} " f"(ratio: {ratio:.2f}). Consider running ANALYZE on the table." ) # Check for hash operations with high memory usage if "Hash" in node_type: batches = node.get("Hash Batches", 1) if batches > 1: analysis["warnings"].append( f"{node_type} spilled to disk ({batches} batches). " "Consider increasing work_mem or optimizing the query." ) # Check for sorts that spill to disk if node_type == "Sort": sort_method = node.get("Sort Method", "") if "external" in sort_method.lower(): analysis["warnings"].append( f"Sort operation spilled to disk ({sort_method}). " "Consider increasing work_mem." ) # Check for nested loops with many iterations if node_type == "Nested Loop": actual_loops = node.get("Actual Loops", 1) if actual_loops > 1000: analysis["warnings"].append( f"Nested Loop executed {actual_loops} times - consider using a different join strategy" ) # Recursively analyze child nodes for child in node.get("Plans", []): self._analyze_node(child, analysis, depth + 1) class TableStatsToolHandler(ToolHandler): """Tool handler for retrieving table statistics and health metrics.""" name = "get_table_stats" title = "Table Statistics Analyzer" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Get detailed statistics for user/client database tables. Note: This tool analyzes only user-created tables and excludes PostgreSQL system tables (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom tables. 
Returns information about: - Table size (data, indexes, total) - Row counts and dead tuple ratio - Last vacuum and analyze times - Sequential vs index scan ratios - Cache hit ratios This helps identify tables that may need maintenance (VACUUM, ANALYZE) or have performance issues.""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "schema_name": { "type": "string", "description": "Schema to analyze (default: public)", "default": "public" }, "table_name": { "type": "string", "description": "Specific table to analyze (optional, analyzes all tables if not provided)" }, "include_indexes": { "type": "boolean", "description": "Include index statistics", "default": True }, "order_by": { "type": "string", "description": "Order results by this metric", "enum": ["size", "rows", "dead_tuples", "seq_scans", "last_vacuum"], "default": "size" } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: schema_name = arguments.get("schema_name", "public") table_name = arguments.get("table_name") include_indexes = arguments.get("include_indexes", True) order_by = arguments.get("order_by", "size") # Build the query order_map = { "size": "total_size DESC", "rows": "n_live_tup DESC", "dead_tuples": "n_dead_tup DESC", "seq_scans": "seq_scan DESC", "last_vacuum": "last_vacuum DESC NULLS LAST" } order_clause = order_map.get(order_by, "total_size DESC") table_filter = "" params = [schema_name] if table_name: table_filter = "AND c.relname ILIKE %s" params.append(table_name) # Query only user tables, explicitly excluding system schemas query = f""" SELECT c.relname as table_name, n.nspname as schema_name, pg_size_pretty(pg_table_size(c.oid)) as table_size, pg_size_pretty(pg_indexes_size(c.oid)) as indexes_size, pg_size_pretty(pg_total_relation_size(c.oid)) as total_size, pg_total_relation_size(c.oid) as total_size_bytes, s.n_live_tup, s.n_dead_tup, CASE WHEN s.n_live_tup > 0 THEN ROUND(100.0 * s.n_dead_tup / s.n_live_tup, 2) ELSE 0 END as dead_tuple_ratio, s.seq_scan, s.seq_tup_read, s.idx_scan, s.idx_tup_fetch, CASE WHEN s.seq_scan + COALESCE(s.idx_scan, 0) > 0 THEN ROUND(100.0 * COALESCE(s.idx_scan, 0) / (s.seq_scan + COALESCE(s.idx_scan, 0)), 2) ELSE 0 END as index_scan_ratio, s.last_vacuum, s.last_autovacuum, s.last_analyze, s.last_autoanalyze, s.vacuum_count, s.autovacuum_count, s.analyze_count, s.autoanalyze_count FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_stat_user_tables s ON s.relid = c.oid WHERE c.relkind = 'r' AND n.nspname = %s AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') {table_filter} ORDER BY {order_clause} """ results = await self.sql_driver.execute_query(query, params) if not results: return self.format_result(f"No tables found in schema '{schema_name}'") output = { "schema": schema_name, "table_count": len(results), "tables": results } # Add index statistics if requested if include_indexes and table_name: index_query = """ SELECT i.indexrelname as index_name, i.idx_scan as scans, i.idx_tup_read as tuples_read, i.idx_tup_fetch as tuples_fetched, pg_size_pretty(pg_relation_size(i.indexrelid)) as size, pg_relation_size(i.indexrelid) as size_bytes, pg_get_indexdef(i.indexrelid) as definition FROM pg_stat_user_indexes i JOIN pg_class c ON c.oid = i.relid JOIN pg_namespace n ON n.oid = c.relnamespace 
WHERE n.nspname = %s AND c.relname = %s ORDER BY i.idx_scan DESC """ index_results = await self.sql_driver.execute_query( index_query, [schema_name, table_name] ) output["indexes"] = index_results # Add analysis and recommendations output["analysis"] = self._analyze_stats(results) return self.format_json_result(output) except Exception as e: return self.format_error(e) def _analyze_stats(self, tables: list[dict]) -> dict[str, Any]: """Analyze table stats and generate recommendations.""" analysis = { "needs_vacuum": [], "needs_analyze": [], "low_index_usage": [], "recommendations": [] } for table in tables: table_name = table.get("table_name", "unknown") # Check dead tuple ratio dead_ratio = table.get("dead_tuple_ratio", 0) or 0 if dead_ratio > 10: analysis["needs_vacuum"].append({ "table": table_name, "dead_tuple_ratio": dead_ratio, "dead_tuples": table.get("n_dead_tup", 0) }) # Check if analyze is needed last_analyze = table.get("last_analyze") or table.get("last_autoanalyze") n_live = table.get("n_live_tup", 0) or 0 if n_live > 1000 and not last_analyze: analysis["needs_analyze"].append(table_name) # Check index usage idx_ratio = table.get("index_scan_ratio", 0) or 0 seq_scans = table.get("seq_scan", 0) or 0 if seq_scans > 100 and idx_ratio < 50 and n_live > 10000: analysis["low_index_usage"].append({ "table": table_name, "index_scan_ratio": idx_ratio, "seq_scans": seq_scans, "rows": n_live }) # Generate recommendations if analysis["needs_vacuum"]: tables_list = ", ".join(t["table"] for t in analysis["needs_vacuum"][:5]) analysis["recommendations"].append( f"Run VACUUM on tables with high dead tuple ratios: {tables_list}" ) if analysis["needs_analyze"]: tables_list = ", ".join(analysis["needs_analyze"][:5]) analysis["recommendations"].append( f"Run ANALYZE on tables that haven't been analyzed: {tables_list}" ) if analysis["low_index_usage"]: for item in analysis["low_index_usage"][:3]: analysis["recommendations"].append( f"Table '{item['table']}' has low index usage ({item['index_scan_ratio']}% index scans). " "Consider adding indexes for frequently filtered columns." ) return analysis
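
The _analyze_plan/_analyze_node pair above walks the structure produced by EXPLAIN (FORMAT JSON): a one-element list whose entry carries a "Plan" tree (with "Node Type", "Plan Rows", "Actual Rows", nested "Plans") plus top-level "Planning Time" and "Execution Time". A minimal sketch of exercising that logic offline, with a hand-built plan dictionary (the table name and numbers are purely illustrative, and sql_driver is passed as None only because _analyze_plan never touches it):

# Illustrative only: a hand-built EXPLAIN (FORMAT JSON) result with an oversized
# sequential scan and a large row-estimate error, fed straight to _analyze_plan.
handler = AnalyzeQueryToolHandler(sql_driver=None)  # sql_driver is unused by _analyze_plan

sample_plan = [
    {
        "Plan": {
            "Node Type": "Seq Scan",
            "Relation Name": "orders",   # hypothetical table
            "Plan Rows": 1200,           # planner estimate
            "Actual Rows": 48000,        # actual rows returned
            "Actual Loops": 1,
            "Filter": "(status = 'pending'::text)",
            "Plans": [],
        },
        "Planning Time": 0.21,
        "Execution Time": 153.7,
    }
]

analysis = handler._analyze_plan(sample_plan, was_analyzed=True)
# Expected: a warning about the 48000-row Seq Scan on 'orders', an index
# recommendation for the Filter condition, a planned-vs-actual mismatch warning
# (ratio 40), and planning/execution times in the statistics section.
print(analysis["warnings"])
print(analysis["statistics"])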

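For context, handlers like these are typically registered with an MCP server that advertises each handler's get_tool_definition() schema via list_tools and dispatches call_tool requests to run_tool. The sketch below is hypothetical, not the repository's actual server module: it assumes the low-level Server API of the official mcp Python SDK and an already-constructed SqlDriver (its constructor lives elsewhere in this package), and it omits transport setup.

# Hypothetical wiring sketch - not the repository's server code.
from mcp.server import Server
from mcp.types import TextContent, Tool

sql_driver: SqlDriver = ...  # constructed elsewhere; connection details are repo-specific

server = Server("pgtuner-mcp")

# Index the handlers by tool name so call_tool can route requests.
handlers = {
    h.name: h
    for h in (
        GetSlowQueriesToolHandler(sql_driver),
        AnalyzeQueryToolHandler(sql_driver),
        TableStatsToolHandler(sql_driver),
    )
}

@server.list_tools()
async def list_tools() -> list[Tool]:
    # Advertise each handler's JSON schema to the MCP client.
    return [h.get_tool_definition() for h in handlers.values()]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    # Route a tool call to the matching handler; run_tool formats its own errors.
    handler = handlers.get(name)
    if handler is None:
        return [TextContent(type="text", text=f"Unknown tool: {name}")]
    return list(await handler.run_tool(arguments or {}))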