isdaniel / PostgreSQL-Performance-Tuner-Mcp

tools_performance.py (50.4 kB)
"""Performance analysis tool handlers.""" from __future__ import annotations import json from collections.abc import Sequence from typing import Any from mcp.types import TextContent, Tool from ..services import SqlDriver, get_user_filter from .toolhandler import ToolHandler class GetSlowQueriesToolHandler(ToolHandler): """Tool handler for retrieving slow queries from pg_stat_statements.""" name = "get_slow_queries" title = "Slow Query Analyzer" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Retrieve slow queries from PostgreSQL using pg_stat_statements. Returns the top N slowest queries ordered by mean (average) execution time. Requires the pg_stat_statements extension to be enabled. Note: This tool focuses on user/application queries only. System catalog queries (pg_catalog, information_schema, pg_toast) are automatically excluded. The results include: - Query text (normalized) - Number of calls - Mean execution time (average per call) - Min/Max execution time - Rows returned - Shared buffer hits/reads for cache analysis""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "limit": { "type": "integer", "description": "Maximum number of slow queries to return (default: 10)", "default": 10, "minimum": 1, "maximum": 100 }, "min_calls": { "type": "integer", "description": "Minimum number of calls for a query to be included (default: 1)", "default": 1, "minimum": 1 }, "min_mean_time_ms": { "type": "number", "description": "Minimum mean (average) execution time in milliseconds (default: 0)", "default": 0 }, "order_by": { "type": "string", "description": "Column to order results by", "enum": ["mean_time", "calls", "rows"], "default": "mean_time" } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: limit = arguments.get("limit", 10) min_calls = arguments.get("min_calls", 1) min_mean_time_ms = arguments.get("min_mean_time_ms", 0) order_by = arguments.get("order_by", "mean_time") # Map order_by to actual column names (whitelist for SQL injection protection) order_map = { "mean_time": "mean_exec_time", "calls": "calls", "rows": "rows" } # Validate order_by against whitelist to prevent SQL injection if order_by not in order_map: order_by = "mean_time" order_column = order_map[order_by] # Check if pg_stat_statements is available check_query = """ SELECT EXISTS ( SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements' ) as available """ check_result = await self.sql_driver.execute_query(check_query) if not check_result or not check_result[0].get("available"): return self.format_result( "Error: pg_stat_statements extension is not installed.\n" "Install it with: CREATE EXTENSION pg_stat_statements;\n" "Note: You may need to add it to shared_preload_libraries in postgresql.conf" ) # Get user filter for excluding specific user IDs user_filter = get_user_filter() statements_filter = user_filter.get_statements_filter() # Query pg_stat_statements for slow queries # Using pg_stat_statements columns available in PostgreSQL 13+ # Excludes system catalog queries to focus on user/application queries query = f""" SELECT queryid, LEFT(query, 500) as query_text, calls, ROUND(mean_exec_time::numeric, 2) as mean_time_ms, ROUND(min_exec_time::numeric, 2) as min_time_ms, 
ROUND(max_exec_time::numeric, 2) as max_time_ms, ROUND(stddev_exec_time::numeric, 2) as stddev_time_ms, rows, shared_blks_hit, shared_blks_read, CASE WHEN shared_blks_hit + shared_blks_read > 0 THEN ROUND(100.0 * shared_blks_hit / (shared_blks_hit + shared_blks_read), 2) ELSE 100 END as cache_hit_ratio, temp_blks_read, temp_blks_written FROM pg_stat_statements WHERE calls >= %s AND mean_exec_time >= %s AND query NOT LIKE '%%pg_stat_statements%%' AND query NOT LIKE '%%pg_catalog%%' AND query NOT LIKE '%%information_schema%%' AND query NOT LIKE '%%pg_toast%%' {statements_filter} ORDER BY {order_column} DESC LIMIT %s """ results = await self.sql_driver.execute_query( query, [min_calls, min_mean_time_ms, limit] ) if not results: return self.format_result( "No slow queries found matching the criteria.\n" "This could mean:\n" "- pg_stat_statements has been recently reset\n" "- No queries exceed the minimum thresholds\n" "- The database has low query activity" ) # Format results output = { "summary": { "total_queries_returned": len(results), "filters_applied": { "min_calls": min_calls, "min_mean_time_ms": min_mean_time_ms, "order_by": order_by } }, "slow_queries": results } return self.format_json_result(output) except Exception as e: return self.format_error(e) class AnalyzeQueryToolHandler(ToolHandler): """Tool handler for analyzing a query's execution plan and performance.""" name = "analyze_query" title = "Query Execution Analyzer" read_only_hint = False # EXPLAIN ANALYZE actually executes the query destructive_hint = False # Read queries are safe, but DML could be destructive idempotent_hint = True open_world_hint = False description = """Analyze a SQL query's execution plan and performance characteristics. Uses EXPLAIN ANALYZE to execute the query and capture detailed timing information. Provides analysis of: - Execution plan with actual vs estimated rows - Timing breakdown by operation - Buffer usage and I/O statistics - Potential performance issues and recommendations WARNING: This actually executes the query! 
For SELECT queries this is safe, but be careful with INSERT/UPDATE/DELETE - set analyze=false for those."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The SQL query to analyze"
                    },
                    "analyze": {
                        "type": "boolean",
                        "description": "Whether to actually execute the query (EXPLAIN ANALYZE vs EXPLAIN)",
                        "default": True
                    },
                    "buffers": {
                        "type": "boolean",
                        "description": "Include buffer usage statistics",
                        "default": True
                    },
                    "verbose": {
                        "type": "boolean",
                        "description": "Include verbose output with additional details",
                        "default": False
                    },
                    "format": {
                        "type": "string",
                        "description": "Output format for the execution plan",
                        "enum": ["text", "json", "yaml", "xml"],
                        "default": "json"
                    },
                    "settings": {
                        "type": "boolean",
                        "description": "Include information about configuration parameters",
                        "default": False
                    }
                },
                "required": ["query"]
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        try:
            self.validate_required_args(arguments, ["query"])
            query = arguments["query"]
            analyze = arguments.get("analyze", True)
            buffers = arguments.get("buffers", True)
            verbose = arguments.get("verbose", False)
            output_format = arguments.get("format", "json")
            settings = arguments.get("settings", False)

            # Validate output_format against whitelist (defense in depth)
            valid_formats = {"text", "json", "yaml", "xml"}
            if output_format.lower() not in valid_formats:
                output_format = "json"

            # Build EXPLAIN options
            options = []
            if analyze:
                options.append("ANALYZE")
            if buffers:
                options.append("BUFFERS")
            if verbose:
                options.append("VERBOSE")
            if settings:
                options.append("SETTINGS")
            options.append(f"FORMAT {output_format.upper()}")

            options_str = ", ".join(options)
            explain_query = f"EXPLAIN ({options_str}) {query}"

            # Execute EXPLAIN
            results = await self.sql_driver.execute_query(explain_query)

            if not results:
                return self.format_result("No execution plan returned")

            # For JSON format, parse and analyze the plan
            if output_format == "json":
                # The result comes as a list with QUERY PLAN column
                plan_data = results[0].get("QUERY PLAN", results)

                # If it's a string, parse it
                if isinstance(plan_data, str):
                    plan_data = json.loads(plan_data)

                analysis = self._analyze_plan(plan_data, analyze)

                output = {
                    "query": query,
                    "explain_options": {
                        "analyze": analyze,
                        "buffers": buffers,
                        "verbose": verbose,
                        "format": output_format
                    },
                    "execution_plan": plan_data,
                    "analysis": analysis
                }

                return self.format_json_result(output)
            else:
                # For text/yaml/xml, return as-is
                plan_text = "\n".join(
                    str(row.get("QUERY PLAN", row)) for row in results
                )
                return self.format_result(f"Query: {query}\n\nExecution Plan:\n{plan_text}")

        except Exception as e:
            return self.format_error(e)

    def _analyze_plan(self, plan_data: Any, was_analyzed: bool) -> dict[str, Any]:
        """Analyze an execution plan and extract insights."""
        analysis = {
            "warnings": [],
            "recommendations": [],
            "statistics": {}
        }

        if not plan_data:
            return analysis

        # Handle the plan structure (it's usually a list with one element)
        if isinstance(plan_data, list) and len(plan_data) > 0:
            plan = plan_data[0].get("Plan", plan_data[0])
        else:
            plan = plan_data.get("Plan", plan_data)

        # Extract top-level statistics
        if was_analyzed:
            if "Execution Time" in (plan_data[0] if isinstance(plan_data, list) else plan_data):
                exec_time = (plan_data[0] if isinstance(plan_data, list)
                             else plan_data).get("Execution Time", 0)
                analysis["statistics"]["execution_time_ms"] = exec_time
            if "Planning Time" in (plan_data[0] if isinstance(plan_data, list) else plan_data):
                plan_time = (plan_data[0] if isinstance(plan_data, list)
                             else plan_data).get("Planning Time", 0)
                analysis["statistics"]["planning_time_ms"] = plan_time

        # Analyze the plan recursively
        self._analyze_node(plan, analysis)

        return analysis

    def _analyze_node(self, node: dict[str, Any], analysis: dict[str, Any], depth: int = 0) -> None:
        """Recursively analyze plan nodes for issues."""
        if not isinstance(node, dict):
            return

        node_type = node.get("Node Type", "Unknown")

        # Check for sequential scans on large tables
        if node_type == "Seq Scan":
            rows = node.get("Actual Rows", node.get("Plan Rows", 0))
            if rows > 10000:
                table = node.get("Relation Name", "unknown")
                analysis["warnings"].append(
                    f"Sequential scan on '{table}' returned {rows} rows - consider adding an index"
                )
                filter_cond = node.get("Filter")
                if filter_cond:
                    analysis["recommendations"].append(
                        f"Consider creating an index for filter condition: {filter_cond}"
                    )

        # Check for row estimate mismatches
        actual_rows = node.get("Actual Rows")
        plan_rows = node.get("Plan Rows")
        if actual_rows is not None and plan_rows is not None and plan_rows > 0:
            ratio = actual_rows / plan_rows
            if ratio > 10 or ratio < 0.1:
                analysis["warnings"].append(
                    f"{node_type}: Row estimate mismatch - planned {plan_rows}, actual {actual_rows} "
                    f"(ratio: {ratio:.2f}). Consider running ANALYZE on the table."
                )

        # Check for hash operations with high memory usage
        if "Hash" in node_type:
            batches = node.get("Hash Batches", 1)
            if batches > 1:
                analysis["warnings"].append(
                    f"{node_type} spilled to disk ({batches} batches). "
                    "Consider increasing work_mem or optimizing the query."
                )

        # Check for sorts that spill to disk
        if node_type == "Sort":
            sort_method = node.get("Sort Method", "")
            if "external" in sort_method.lower():
                analysis["warnings"].append(
                    f"Sort operation spilled to disk ({sort_method}). "
                    "Consider increasing work_mem."
                )

        # Check for nested loops with many iterations
        if node_type == "Nested Loop":
            actual_loops = node.get("Actual Loops", 1)
            if actual_loops > 1000:
                analysis["warnings"].append(
                    f"Nested Loop executed {actual_loops} times - consider using a different join strategy"
                )

        # Recursively analyze child nodes
        for child in node.get("Plans", []):
            self._analyze_node(child, analysis, depth + 1)


class TableStatsToolHandler(ToolHandler):
    """Tool handler for retrieving table statistics and health metrics."""

    name = "get_table_stats"
    title = "Table Statistics Analyzer"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False

    description = """Get detailed statistics for user/client database tables.

Note: This tool analyzes only user-created tables and excludes PostgreSQL system tables
(pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom tables.

Returns information about:
- Table size (data, indexes, total)
- Row counts and dead tuple ratio
- Last vacuum and analyze times
- Sequential vs index scan ratios
- Cache hit ratios

This helps identify tables that may need maintenance (VACUUM, ANALYZE) or have performance issues."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "schema_name": {
                        "type": "string",
                        "description": "Schema to analyze (default: public)",
                        "default": "public"
                    },
                    "table_name": {
                        "type": "string",
                        "description": "Specific table to analyze (optional, analyzes all tables if not provided)"
                    },
                    "include_indexes": {
                        "type": "boolean",
                        "description": "Include index statistics",
                        "default": True
                    },
                    "order_by": {
                        "type": "string",
                        "description": "Order results by this metric",
                        "enum": ["size", "rows", "dead_tuples", "seq_scans", "last_vacuum"],
                        "default": "size"
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        try:
            schema_name = arguments.get("schema_name", "public")
            table_name = arguments.get("table_name")
            include_indexes = arguments.get("include_indexes", True)
            order_by = arguments.get("order_by", "size")

            # Build the query with whitelist-validated order clause
            order_map = {
                "size": "total_size DESC",
                "rows": "n_live_tup DESC",
                "dead_tuples": "n_dead_tup DESC",
                "seq_scans": "seq_scan DESC",
                "last_vacuum": "last_vacuum DESC NULLS LAST"
            }
            # Validate order_by against whitelist to prevent SQL injection
            if order_by not in order_map:
                order_by = "size"
            order_clause = order_map[order_by]

            table_filter = ""
            params = [schema_name]
            if table_name:
                table_filter = "AND c.relname ILIKE %s"
                params.append(table_name)

            # Query only user tables, explicitly excluding system schemas
            query = f"""
                SELECT
                    c.relname as table_name,
                    n.nspname as schema_name,
                    pg_size_pretty(pg_table_size(c.oid)) as table_size,
                    pg_size_pretty(pg_indexes_size(c.oid)) as indexes_size,
                    pg_size_pretty(pg_total_relation_size(c.oid)) as total_size,
                    pg_total_relation_size(c.oid) as total_size_bytes,
                    s.n_live_tup,
                    s.n_dead_tup,
                    CASE
                        WHEN s.n_live_tup > 0
                        THEN ROUND(100.0 * s.n_dead_tup / s.n_live_tup, 2)
                        ELSE 0
                    END as dead_tuple_ratio,
                    s.seq_scan,
                    s.seq_tup_read,
                    s.idx_scan,
                    s.idx_tup_fetch,
                    CASE
                        WHEN s.seq_scan + COALESCE(s.idx_scan, 0) > 0
                        THEN ROUND(100.0 * COALESCE(s.idx_scan, 0) / (s.seq_scan + COALESCE(s.idx_scan, 0)), 2)
                        ELSE 0
                    END as index_scan_ratio,
                    s.last_vacuum,
                    s.last_autovacuum,
                    s.last_analyze,
                    s.last_autoanalyze,
                    s.vacuum_count,
                    s.autovacuum_count,
                    s.analyze_count,
                    s.autoanalyze_count
                FROM pg_class c
                JOIN pg_namespace n ON n.oid = c.relnamespace
                LEFT JOIN pg_stat_user_tables s ON s.relid = c.oid
                WHERE c.relkind = 'r'
                    AND n.nspname = %s
                    AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                    {table_filter}
                ORDER BY {order_clause}
            """

            results = await self.sql_driver.execute_query(query, params)

            if not results:
                return self.format_result(f"No tables found in schema '{schema_name}'")

            output = {
                "schema": schema_name,
                "table_count": len(results),
                "tables": results
            }

            # Add index statistics if requested
            if include_indexes and table_name:
                index_query = """
                    SELECT
                        i.indexrelname as index_name,
                        i.idx_scan as scans,
                        i.idx_tup_read as tuples_read,
                        i.idx_tup_fetch as tuples_fetched,
                        pg_size_pretty(pg_relation_size(i.indexrelid)) as size,
                        pg_relation_size(i.indexrelid) as size_bytes,
                        pg_get_indexdef(i.indexrelid) as definition
                    FROM pg_stat_user_indexes i
                    JOIN pg_class c ON c.oid = i.relid
                    JOIN pg_namespace n ON n.oid = c.relnamespace
                    WHERE n.nspname = %s AND c.relname = %s
                    ORDER BY i.idx_scan DESC
                """
                index_results = await self.sql_driver.execute_query(
                    index_query, [schema_name, table_name]
                )
                output["indexes"] = index_results

            # Add analysis and recommendations
            output["analysis"] = self._analyze_stats(results)

            return self.format_json_result(output)

        except Exception as e:
            return self.format_error(e)

    def _analyze_stats(self, tables: list[dict]) -> dict[str, Any]:
        """Analyze table stats and generate recommendations."""
        analysis = {
            "needs_vacuum": [],
            "needs_analyze": [],
            "low_index_usage": [],
            "recommendations": []
        }

        for table in tables:
            table_name = table.get("table_name", "unknown")

            # Check dead tuple ratio
            dead_ratio = table.get("dead_tuple_ratio", 0) or 0
            if dead_ratio > 10:
                analysis["needs_vacuum"].append({
                    "table": table_name,
                    "dead_tuple_ratio": dead_ratio,
                    "dead_tuples": table.get("n_dead_tup", 0)
                })

            # Check if analyze is needed
            last_analyze = table.get("last_analyze") or table.get("last_autoanalyze")
            n_live = table.get("n_live_tup", 0) or 0
            if n_live > 1000 and not last_analyze:
                analysis["needs_analyze"].append(table_name)

            # Check index usage
            idx_ratio = table.get("index_scan_ratio", 0) or 0
            seq_scans = table.get("seq_scan", 0) or 0
            if seq_scans > 100 and idx_ratio < 50 and n_live > 10000:
                analysis["low_index_usage"].append({
                    "table": table_name,
                    "index_scan_ratio": idx_ratio,
                    "seq_scans": seq_scans,
                    "rows": n_live
                })

        # Generate recommendations
        if analysis["needs_vacuum"]:
            tables_list = ", ".join(t["table"] for t in analysis["needs_vacuum"][:5])
            analysis["recommendations"].append(
                f"Run VACUUM on tables with high dead tuple ratios: {tables_list}"
            )

        if analysis["needs_analyze"]:
            tables_list = ", ".join(analysis["needs_analyze"][:5])
            analysis["recommendations"].append(
                f"Run ANALYZE on tables that haven't been analyzed: {tables_list}"
            )

        if analysis["low_index_usage"]:
            for item in analysis["low_index_usage"][:3]:
                analysis["recommendations"].append(
                    f"Table '{item['table']}' has low index usage ({item['index_scan_ratio']}% index scans). "
                    "Consider adding indexes for frequently filtered columns."
                )

        return analysis


class DiskIOPatternToolHandler(ToolHandler):
    """Tool handler for analyzing disk I/O patterns and identifying bottlenecks."""

    name = "analyze_disk_io_patterns"
    title = "Disk I/O Pattern Analyzer"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False

    description = """Analyze disk I/O read/write patterns in PostgreSQL.

Note: This tool focuses on user/client tables and excludes PostgreSQL system tables
(pg_catalog, information_schema, pg_toast) from analysis.

This tool provides comprehensive I/O analysis including:
- Buffer pool I/O statistics (hits vs reads)
- Table and index I/O patterns (sequential vs random reads)
- Backend vs background writer I/O distribution
- Temporary file I/O usage
- Checkpointer I/O statistics
- Per-table read/write hotspots

For PostgreSQL 16+, additional pg_stat_io metrics are available.

Use this to identify:
- Tables with high I/O activity (hot tables)
- I/O bottlenecks and cache inefficiencies
- Sequential scan heavy workloads
- Temporary file spills indicating work_mem issues"""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "schema_name": {
                        "type": "string",
                        "description": "Schema to analyze (default: public)",
                        "default": "public"
                    },
                    "include_indexes": {
                        "type": "boolean",
                        "description": "Include index I/O statistics",
                        "default": True
                    },
                    "top_n": {
                        "type": "integer",
                        "description": "Number of top tables to return by I/O activity",
                        "default": 20,
                        "minimum": 1,
                        "maximum": 100
                    },
                    "analysis_type": {
                        "type": "string",
                        "description": "Type of I/O analysis to perform",
                        "enum": ["all", "tables", "indexes", "buffer_pool", "temp_files", "checkpoints"],
                        "default": "all"
                    },
                    "min_size_gb": {
                        "type": "number",
                        "description": "Minimum table/index size in GB to include in analysis (default: 1)",
                        "default": 1,
                        "minimum": 0
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        try:
            schema_name = arguments.get("schema_name", "public")
            include_indexes = arguments.get("include_indexes", True)
            top_n = arguments.get("top_n", 20)
            analysis_type = arguments.get("analysis_type", "all")
            min_size_gb = arguments.get("min_size_gb", 1)

            output: dict[str, Any] = {
                "schema": schema_name,
                "analysis_type": analysis_type,
                "io_patterns": {},
                "analysis": {
                    "issues": [],
                    "recommendations": []
                }
            }

            # Collect different types of I/O statistics based on analysis_type
            if analysis_type in ("all", "buffer_pool"):
                await self._analyze_buffer_pool(output)

            if analysis_type in ("all", "tables"):
                await self._analyze_table_io(output, schema_name, top_n, min_size_gb)

            if analysis_type in ("all", "indexes") and include_indexes:
                await self._analyze_index_io(output, schema_name, top_n, min_size_gb)

            if analysis_type in ("all", "temp_files"):
                await self._analyze_temp_files(output)

            if analysis_type in ("all", "checkpoints"):
                await self._analyze_checkpoint_io(output)

            # Check for pg_stat_io availability (PostgreSQL 16+)
            await self._analyze_pg_stat_io(output)

            # Generate summary and recommendations
            self._generate_io_recommendations(output)

            return self.format_json_result(output)

        except Exception as e:
            return self.format_error(e)

    async def _analyze_buffer_pool(self, output: dict[str, Any]) -> None:
        """Analyze buffer pool I/O statistics."""
        query = """
            SELECT
                sum(heap_blks_read) as heap_blocks_read,
                sum(heap_blks_hit) as heap_blocks_hit,
                sum(idx_blks_read) as index_blocks_read,
                sum(idx_blks_hit) as index_blocks_hit,
                sum(toast_blks_read) as toast_blocks_read,
                sum(toast_blks_hit) as toast_blocks_hit,
                sum(tidx_blks_read) as toast_index_blocks_read,
                sum(tidx_blks_hit) as toast_index_blocks_hit,
                CASE
                    WHEN sum(heap_blks_hit) + sum(heap_blks_read) > 0
                    THEN ROUND(100.0 * sum(heap_blks_hit) / (sum(heap_blks_hit) + sum(heap_blks_read)), 2)
                    ELSE 100
                END as heap_hit_ratio,
                CASE
                    WHEN sum(idx_blks_hit) + sum(idx_blks_read) > 0
                    THEN ROUND(100.0 * sum(idx_blks_hit) / (sum(idx_blks_hit) + sum(idx_blks_read)), 2)
                    ELSE 100
                END as index_hit_ratio
            FROM pg_statio_user_tables
        """
        result = await self.sql_driver.execute_query(query)

        if result:
            row = result[0]
            heap_hit = row.get("heap_hit_ratio") or 100
            idx_hit = row.get("index_hit_ratio") or 100

            output["io_patterns"]["buffer_pool"] = {
                "heap_blocks_read": row.get("heap_blocks_read") or 0,
                "heap_blocks_hit": row.get("heap_blocks_hit") or 0,
                "heap_hit_ratio": heap_hit,
                "index_blocks_read": row.get("index_blocks_read") or 0,
                "index_blocks_hit": row.get("index_blocks_hit") or 0,
                "index_hit_ratio": idx_hit,
                "toast_blocks_read": row.get("toast_blocks_read") or 0,
                "toast_blocks_hit": row.get("toast_blocks_hit") or 0
            }

            # Check for cache issues
            if heap_hit < 90:
                output["analysis"]["issues"].append(
                    f"Low heap buffer cache hit ratio: {heap_hit}%"
                )
                output["analysis"]["recommendations"].append(
                    "Consider increasing shared_buffers to improve cache hit ratio"
                )

            if idx_hit < 95:
                output["analysis"]["issues"].append(
                    f"Low index buffer cache hit ratio: {idx_hit}%"
                )
                output["analysis"]["recommendations"].append(
                    "Ensure frequently accessed indexes fit in buffer cache"
                )

    async def _analyze_table_io(
        self, output: dict[str, Any], schema_name: str, top_n: int, min_size_gb: float = 1
    ) -> None:
        """Analyze table-level I/O patterns."""
        min_size_bytes = int(min_size_gb * 1024 * 1024 * 1024)
        query = """
            SELECT
                s.schemaname,
                s.relname as table_name,
                s.heap_blks_read,
                s.heap_blks_hit,
                CASE
                    WHEN s.heap_blks_hit + s.heap_blks_read > 0
                    THEN ROUND(100.0 * s.heap_blks_hit / (s.heap_blks_hit + s.heap_blks_read), 2)
                    ELSE 100
                END as heap_hit_ratio,
                s.idx_blks_read,
                s.idx_blks_hit,
                CASE
                    WHEN s.idx_blks_hit + s.idx_blks_read > 0
                    THEN ROUND(100.0 * s.idx_blks_hit / (s.idx_blks_hit + s.idx_blks_read), 2)
                    ELSE 100
                END as idx_hit_ratio,
                s.heap_blks_read + COALESCE(s.idx_blks_read, 0) as total_reads,
                s.heap_blks_hit + COALESCE(s.idx_blks_hit, 0) as total_hits,
                pg_total_relation_size(c.oid) as table_size_bytes
            FROM pg_statio_user_tables s
            JOIN pg_class c ON c.oid = s.relid
            JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = s.schemaname
            WHERE s.schemaname = %s
                AND s.schemaname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                AND pg_total_relation_size(c.oid) >= %s
            ORDER BY (s.heap_blks_read + COALESCE(s.idx_blks_read, 0)) DESC
            LIMIT %s
        """
        result = await self.sql_driver.execute_query(query, [schema_name, min_size_bytes, top_n])

        # Also get sequential vs index scan patterns
        scan_query = """
            SELECT
                s.schemaname,
                s.relname as table_name,
                s.seq_scan,
                s.seq_tup_read,
                s.idx_scan,
                s.idx_tup_fetch,
                CASE
                    WHEN s.seq_scan + COALESCE(s.idx_scan, 0) > 0
                    THEN ROUND(100.0 * s.seq_scan / (s.seq_scan + COALESCE(s.idx_scan, 0)), 2)
                    ELSE 0
                END as seq_scan_ratio,
                s.n_live_tup,
                s.n_dead_tup
            FROM pg_stat_user_tables s
            JOIN pg_class c ON c.oid = s.relid
            JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = s.schemaname
            WHERE s.schemaname = %s
                AND s.schemaname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                AND pg_total_relation_size(c.oid) >= %s
            ORDER BY s.seq_tup_read DESC
            LIMIT %s
        """
        scan_result = await self.sql_driver.execute_query(scan_query, [schema_name, min_size_bytes, top_n])

        # Combine results
        tables = []
        scan_data = {r["table_name"]: r for r in scan_result} if scan_result else {}

        if result:
            for row in result:
                table_name = row.get("table_name")
                scan_info = scan_data.get(table_name, {})

                tables.append({
                    "table_name": table_name,
                    "heap_blocks_read": row.get("heap_blks_read") or 0,
                    "heap_blocks_hit": row.get("heap_blks_hit") or 0,
                    "heap_hit_ratio": row.get("heap_hit_ratio") or 100,
                    "index_blocks_read": row.get("idx_blks_read") or 0,
                    "index_blocks_hit": row.get("idx_blks_hit") or 0,
                    "index_hit_ratio": row.get("idx_hit_ratio") or 100,
                    "total_physical_reads": row.get("total_reads") or 0,
                    "seq_scans": scan_info.get("seq_scan") or 0,
                    "seq_tuples_read": scan_info.get("seq_tup_read") or 0,
                    "idx_scans": scan_info.get("idx_scan") or 0,
                    "idx_tuples_fetched": scan_info.get("idx_tup_fetch") or 0,
                    "seq_scan_ratio": scan_info.get("seq_scan_ratio") or 0,
                    "live_tuples": scan_info.get("n_live_tup") or 0
                })

                # Identify hot tables with low cache hit
                heap_hit = row.get("heap_hit_ratio") or 100
                total_reads = row.get("total_reads") or 0
                if total_reads > 1000 and heap_hit < 85:
                    output["analysis"]["issues"].append(
                        f"Table '{table_name}' has high I/O ({total_reads} reads) with low cache hit ({heap_hit}%)"
                    )

                # Identify sequential scan heavy tables
                seq_ratio = scan_info.get("seq_scan_ratio") or 0
                live_tuples = scan_info.get("n_live_tup") or 0
                if seq_ratio > 80 and live_tuples > 10000:
                    output["analysis"]["issues"].append(
                        f"Table '{table_name}' has {seq_ratio}% sequential scans with {live_tuples} rows"
                    )
                    output["analysis"]["recommendations"].append(
                        f"Consider adding indexes to table '{table_name}' to reduce sequential scans"
                    )

        output["io_patterns"]["tables"] = {
            "count": len(tables),
            "top_tables_by_io": tables
        }

    async def _analyze_index_io(
        self, output: dict[str, Any], schema_name: str, top_n: int, min_size_gb: float = 1
    ) -> None:
        """Analyze index-level I/O patterns."""
        min_size_bytes = int(min_size_gb * 1024 * 1024 * 1024)
        query = """
            SELECT
                s.schemaname,
                s.relname as table_name,
                s.indexrelname as index_name,
                s.idx_blks_read,
                s.idx_blks_hit,
                CASE
                    WHEN s.idx_blks_hit + s.idx_blks_read > 0
                    THEN ROUND(100.0 * s.idx_blks_hit / (s.idx_blks_hit + s.idx_blks_read), 2)
                    ELSE 100
                END as hit_ratio,
                pg_relation_size(s.indexrelid) as index_size_bytes
            FROM pg_statio_user_indexes s
            JOIN pg_class i ON i.oid = s.indexrelid
            JOIN pg_namespace n ON n.oid = i.relnamespace AND n.nspname = s.schemaname
            WHERE s.schemaname = %s
                AND s.schemaname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                AND pg_relation_size(s.indexrelid) >= %s
            ORDER BY s.idx_blks_read DESC
            LIMIT %s
        """
        result = await self.sql_driver.execute_query(query, [schema_name, min_size_bytes, top_n])

        if result:
            indexes = []
            for row in result:
                indexes.append({
                    "index_name": row.get("index_name"),
                    "table_name": row.get("table_name"),
                    "blocks_read": row.get("idx_blks_read") or 0,
                    "blocks_hit": row.get("idx_blks_hit") or 0,
                    "hit_ratio": row.get("hit_ratio") or 100
                })

                # Check for indexes with poor cache hit
                hit_ratio = row.get("hit_ratio") or 100
                blocks_read = row.get("idx_blks_read") or 0
                if blocks_read > 1000 and hit_ratio < 90:
                    output["analysis"]["issues"].append(
                        f"Index '{row.get('index_name')}' has high I/O with low cache hit ({hit_ratio}%)"
                    )

            output["io_patterns"]["indexes"] = {
                "count": len(indexes),
                "top_indexes_by_io": indexes
            }

    async def _analyze_temp_files(self, output: dict[str, Any]) -> None:
        """Analyze temporary file I/O usage."""
        query = """
            SELECT
                datname,
                temp_files,
                temp_bytes,
                pg_size_pretty(temp_bytes) as temp_size_pretty
            FROM pg_stat_database
            WHERE datname = current_database()
        """
        result = await self.sql_driver.execute_query(query)

        if result:
            row = result[0]
            temp_files = row.get("temp_files") or 0
            temp_bytes = row.get("temp_bytes") or 0

            output["io_patterns"]["temp_files"] = {
                "temp_files_created": temp_files,
                "temp_bytes_written": temp_bytes,
                "temp_size_pretty": row.get("temp_size_pretty") or "0 bytes"
            }

            # Check for excessive temp file usage
            if temp_files > 1000:
                output["analysis"]["issues"].append(
                    f"High temporary file usage: {temp_files} files created"
                )
                output["analysis"]["recommendations"].append(
                    "Consider increasing work_mem to reduce temporary file spills"
                )

            if temp_bytes > 1024 * 1024 * 1024:  # > 1GB
                output["analysis"]["issues"].append(
                    f"Large temporary file I/O: {row.get('temp_size_pretty')}"
                )
                output["analysis"]["recommendations"].append(
                    "Review queries using sorts, hashes, or CTEs that may spill to disk"
                )

    async def _analyze_checkpoint_io(self, output: dict[str, Any]) -> None:
        """Analyze checkpoint and background writer I/O."""
        query = """
            SELECT
                checkpoints_timed,
                checkpoints_req,
                checkpoint_write_time,
                checkpoint_sync_time,
                buffers_checkpoint,
                buffers_clean,
                buffers_backend,
                buffers_backend_fsync,
                buffers_alloc,
                CASE
                    WHEN buffers_checkpoint + buffers_clean + buffers_backend > 0
                    THEN ROUND(100.0 * buffers_backend / (buffers_checkpoint + buffers_clean + buffers_backend), 2)
                    ELSE 0
                END as backend_write_ratio,
                stats_reset
            FROM pg_stat_bgwriter
        """
        result = await self.sql_driver.execute_query(query)

        if result:
            row = result[0]
            backend_ratio = row.get("backend_write_ratio") or 0
            buffers_backend_fsync = row.get("buffers_backend_fsync") or 0
            checkpoints_req = row.get("checkpoints_req") or 0
            checkpoints_timed = row.get("checkpoints_timed") or 0

            output["io_patterns"]["checkpoints"] = {
                "checkpoints_timed": checkpoints_timed,
                "checkpoints_requested": checkpoints_req,
                "checkpoint_write_time_ms": row.get("checkpoint_write_time") or 0,
                "checkpoint_sync_time_ms": row.get("checkpoint_sync_time") or 0,
                "buffers_written_by_checkpoint": row.get("buffers_checkpoint") or 0,
                "buffers_written_by_bgwriter": row.get("buffers_clean") or 0,
                "buffers_written_by_backend": row.get("buffers_backend") or 0,
                "backend_fsync_count": buffers_backend_fsync,
                "buffers_allocated": row.get("buffers_alloc") or 0,
                "backend_write_ratio": backend_ratio,
                "stats_reset": str(row.get("stats_reset")) if row.get("stats_reset") else None
            }

            # Check for backend doing too much writing
            if backend_ratio > 20:
                output["analysis"]["issues"].append(
                    f"Backend processes writing {backend_ratio}% of buffers (should be near 0)"
                )
                output["analysis"]["recommendations"].append(
                    "Increase shared_buffers and bgwriter_lru_maxpages to reduce backend writes"
                )

            # Check for backend fsyncs (very bad for performance)
            if buffers_backend_fsync > 0:
                output["analysis"]["issues"].append(
                    f"Backend processes performed {buffers_backend_fsync} fsync calls (should be 0)"
                )
                output["analysis"]["recommendations"].append(
                    "This indicates severe I/O performance issues - check storage and increase checkpointing"
                )

            # Check for too many requested checkpoints
            total_checkpoints = checkpoints_timed + checkpoints_req
            if total_checkpoints > 0 and checkpoints_req > checkpoints_timed:
                output["analysis"]["issues"].append(
                    f"More requested checkpoints ({checkpoints_req}) than timed ({checkpoints_timed})"
                )
                output["analysis"]["recommendations"].append(
                    "Increase max_wal_size and checkpoint_timeout to reduce checkpoint frequency"
                )

    async def _analyze_pg_stat_io(self, output: dict[str, Any]) -> None:
        """Analyze pg_stat_io for PostgreSQL 16+ detailed I/O statistics."""
        # Check if pg_stat_io exists (PostgreSQL 16+)
        check_query = """
            SELECT EXISTS (
                SELECT 1
                FROM pg_catalog.pg_class c
                JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                WHERE n.nspname = 'pg_catalog' AND c.relname = 'pg_stat_io'
            ) as available
        """
        check_result = await self.sql_driver.execute_query(check_query)

        if not check_result or not check_result[0].get("available"):
            output["io_patterns"]["pg_stat_io"] = {
                "available": False,
                "message": "pg_stat_io is available in PostgreSQL 16+ for more detailed I/O statistics"
            }
            return

        # Query pg_stat_io for detailed breakdown
        query = """
            SELECT
                backend_type,
                object,
                context,
                reads,
                read_time,
                writes,
                write_time,
                writebacks,
                writeback_time,
                extends,
                extend_time,
                hits,
                evictions,
                reuses,
                fsyncs,
                fsync_time
            FROM pg_stat_io
            WHERE reads > 0 OR writes > 0
            ORDER BY reads + writes DESC
            LIMIT 20
        """
        result = await self.sql_driver.execute_query(query)

        if result:
            output["io_patterns"]["pg_stat_io"] = {
                "available": True,
                "io_by_backend_and_object": result
            }

            # Analyze for issues
            for row in result:
                backend = row.get("backend_type", "unknown")
                reads = row.get("reads") or 0
                writes = row.get("writes") or 0
                read_time = row.get("read_time") or 0
                write_time = row.get("write_time") or 0

                # Check for slow I/O
                if reads > 0 and read_time / reads > 10:  # > 10ms average
                    avg_time = read_time / reads
                    output["analysis"]["issues"].append(
                        f"Slow reads for {backend}: {avg_time:.2f}ms average"
                    )

                if writes > 0 and write_time / writes > 10:  # > 10ms average
                    avg_time = write_time / writes
                    output["analysis"]["issues"].append(
                        f"Slow writes for {backend}: {avg_time:.2f}ms average"
                    )

    def _generate_io_recommendations(self, output: dict[str, Any]) -> None:
        """Generate overall I/O recommendations based on analysis."""
        issues = output["analysis"]["issues"]

        # Remove duplicates while preserving order
        seen_issues = set()
        unique_issues = []
        for issue in issues:
            if issue not in seen_issues:
                seen_issues.add(issue)
                unique_issues.append(issue)
        output["analysis"]["issues"] = unique_issues

        # Remove duplicate recommendations
        seen_recs = set()
        unique_recs = []
        for rec in output["analysis"]["recommendations"]:
            if rec not in seen_recs:
                seen_recs.add(rec)
                unique_recs.append(rec)
        output["analysis"]["recommendations"] = unique_recs

        # Add summary
        io_patterns = output["io_patterns"]
        summary = {
            "total_issues": len(unique_issues),
            "total_recommendations": len(unique_recs)
        }

        if "buffer_pool" in io_patterns:
            bp = io_patterns["buffer_pool"]
            summary["heap_cache_hit_ratio"] = bp.get("heap_hit_ratio")
            summary["index_cache_hit_ratio"] = bp.get("index_hit_ratio")

        if "temp_files" in io_patterns:
            tf = io_patterns["temp_files"]
            summary["temp_files_created"] = tf.get("temp_files_created")

        if "checkpoints" in io_patterns:
            cp = io_patterns["checkpoints"]
            summary["backend_write_ratio"] = cp.get("backend_write_ratio")

        output["summary"] = summary
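
For orientation, here is a minimal sketch of how handlers like these could be registered with the low-level MCP Server API. This is not the project's actual server module: the import paths, the `SqlDriver("postgresql://...")` constructor, and the server name are assumptions, and transport setup (for example stdio) is omitted.

    # Minimal registration/dispatch sketch (assumed wiring, not the project's real entry point).
    from mcp.server import Server
    from mcp.types import TextContent, Tool

    from pgtuner_mcp.services import SqlDriver  # assumed import path
    from pgtuner_mcp.tools.tools_performance import (  # assumed import path
        AnalyzeQueryToolHandler,
        DiskIOPatternToolHandler,
        GetSlowQueriesToolHandler,
        TableStatsToolHandler,
    )

    server = Server("pgtuner-mcp")
    sql_driver = SqlDriver("postgresql://localhost/postgres")  # hypothetical constructor

    # Each handler exposes a unique `name`, so a dict makes dispatch trivial.
    handlers = {
        handler.name: handler
        for handler in (
            GetSlowQueriesToolHandler(sql_driver),
            AnalyzeQueryToolHandler(sql_driver),
            TableStatsToolHandler(sql_driver),
            DiskIOPatternToolHandler(sql_driver),
        )
    }

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        # Each handler builds its own Tool definition, including the JSON input schema.
        return [handler.get_tool_definition() for handler in handlers.values()]

    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[TextContent]:
        # Dispatch to the matching handler; run_tool already wraps exceptions via format_error.
        handler = handlers.get(name)
        if handler is None:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]
        return list(await handler.run_tool(arguments or {}))

With this wiring, a client call such as `call_tool("get_slow_queries", {"limit": 5, "order_by": "calls"})` would flow through the whitelist validation and pg_stat_statements query shown above.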
