isdaniel / PostgreSQL-Performance-Tuner-Mcp

analyze_index_bloat

Analyze PostgreSQL index bloat by examining leaf page density and fragmentation percentages, identifying fragmented indexes that need REINDEX to improve query performance and reduce storage waste.

Instructions

Analyze index bloat using pgstatindex from the pgstattuple extension.

Note: This tool analyzes only user/client indexes and excludes PostgreSQL system indexes (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom indexes.
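For reference, the exclusion works at the catalog level, along these lines (a sketch, not the tool's exact query):

    -- List user indexes only, skipping PostgreSQL system schemas
    SELECT n.nspname AS schema_name, i.relname AS index_name
    FROM pg_class i
    JOIN pg_namespace n ON n.oid = i.relnamespace
    WHERE i.relkind = 'i'
      AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast');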

Uses pgstatindex to get B-tree index statistics including:

  • Leaf page density (avg_leaf_density) - lower values indicate more bloat

  • Fragmentation percentage

  • Empty and deleted pages

Helps identify indexes that:

  • Need REINDEX to improve performance

  • Have high fragmentation

  • Are wasting storage space

Requires the pgstattuple extension: CREATE EXTENSION IF NOT EXISTS pgstattuple;

Note: Also supports GIN indexes (pgstatginindex) and Hash indexes (pgstathashindex).
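To see the raw numbers this tool works from, you can query pgstatindex directly in psql (the index name below is a placeholder):

    CREATE EXTENSION IF NOT EXISTS pgstattuple;

    -- avg_leaf_density around 90% is healthy; values below 70% suggest fragmentation
    SELECT avg_leaf_density,
           leaf_fragmentation,
           empty_pages,
           deleted_pages
    FROM pgstatindex('public.orders_pkey');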

Input Schema

Name               Required  Description                                          Default
index_name         No        Name of a specific index to analyze
table_name         No        Analyze all indexes on this table
schema_name        No        Schema name                                          public
min_index_size_gb  No        Minimum index size in GB to include                  5
min_bloat_percent  No        Only show indexes with bloat above this percentage   20
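With the defaults, only indexes of at least 5 GB in the public schema are considered. A rough SQL equivalent of that candidate filter (a simplified sketch of the catalog query the handler runs):

    SELECT i.relname AS index_name,
           pg_size_pretty(pg_relation_size(i.oid)) AS index_size
    FROM pg_class i
    JOIN pg_namespace n ON n.oid = i.relnamespace
    JOIN pg_index idx ON idx.indexrelid = i.oid
    WHERE n.nspname = 'public'
      AND pg_relation_size(i.oid) >= 5::bigint * 1024 * 1024 * 1024
    ORDER BY pg_relation_size(i.oid) DESC;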

Implementation Reference

  • IndexBloatToolHandler class: implements the core execution logic for the analyze_index_bloat tool, using pgstatindex for B-tree index bloat analysis and supporting single-index, table-wide, and schema-wide scans.
    # Excerpt from the tool's source; ToolHandler and SqlDriver are project-local classes.
    from collections.abc import Sequence
    from typing import Any

    from mcp.types import TextContent, Tool

    class IndexBloatToolHandler(ToolHandler):
        """Tool handler for analyzing index bloat using pgstatindex."""

        name = "analyze_index_bloat"
        title = "Index Bloat Analyzer"
        read_only_hint = True
        destructive_hint = False
        idempotent_hint = True
        open_world_hint = False
        description = """Analyze index bloat using pgstatindex from pgstattuple extension.

    Note: This tool analyzes only user/client indexes and excludes PostgreSQL system indexes
    (pg_catalog, information_schema, pg_toast). This focuses the analysis on your
    application's custom indexes.

    Uses pgstatindex to get B-tree index statistics including:
    - Leaf page density (avg_leaf_density) - lower values indicate more bloat
    - Fragmentation percentage
    - Empty and deleted pages

    Helps identify indexes that:
    - Need REINDEX to improve performance
    - Have high fragmentation
    - Are wasting storage space

    Requires the pgstattuple extension: CREATE EXTENSION IF NOT EXISTS pgstattuple;

    Note: Also supports GIN indexes (pgstatginindex) and Hash indexes (pgstathashindex)."""

        def __init__(self, sql_driver: SqlDriver):
            self.sql_driver = sql_driver

        def get_tool_definition(self) -> Tool:
            return Tool(
                name=self.name,
                description=self.description,
                inputSchema={
                    "type": "object",
                    "properties": {
                        "index_name": {
                            "type": "string",
                            "description": "Name of a specific index to analyze"
                        },
                        "table_name": {
                            "type": "string",
                            "description": "Analyze all indexes on this table"
                        },
                        "schema_name": {
                            "type": "string",
                            "description": "Schema name (default: public)",
                            "default": "public"
                        },
                        "min_index_size_gb": {
                            "type": "number",
                            "description": "Minimum index size in GB to include (default: 5)",
                            "default": 5
                        },
                        "min_bloat_percent": {
                            "type": "number",
                            "description": "Only show indexes with bloat above this percentage (default: 20)",
                            "default": 20
                        }
                    },
                    "required": []
                },
                annotations=self.get_annotations()
            )

        async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
            try:
                index_name = arguments.get("index_name")
                table_name = arguments.get("table_name")
                schema_name = arguments.get("schema_name", "public")
                min_size_gb = arguments.get("min_index_size_gb", 5)
                min_bloat_percent = arguments.get("min_bloat_percent", 20)

                # Check if pgstattuple extension is available
                ext_check = await self._check_extension()
                if not ext_check:
                    return self.format_result(
                        "Error: pgstattuple extension is not installed.\n"
                        "Install it with: CREATE EXTENSION IF NOT EXISTS pgstattuple;"
                    )

                if index_name:
                    # Analyze specific index
                    result = await self._analyze_single_index(schema_name, index_name)
                elif table_name:
                    # Analyze all indexes on a table
                    result = await self._analyze_table_indexes(
                        schema_name, table_name, min_size_gb, min_bloat_percent
                    )
                else:
                    # Analyze all indexes in schema
                    result = await self._analyze_schema_indexes(
                        schema_name, min_size_gb, min_bloat_percent
                    )

                return self.format_json_result(result)
            except Exception as e:
                return self.format_error(e)

        async def _check_extension(self) -> bool:
            """Check if pgstattuple extension is available."""
            query = """
                SELECT EXISTS (
                    SELECT 1 FROM pg_extension WHERE extname = 'pgstattuple'
                ) as available
            """
            result = await self.sql_driver.execute_query(query)
            return result[0].get("available", False) if result else False

        async def _analyze_single_index(
            self, schema_name: str, index_name: str
        ) -> dict[str, Any]:
            """Analyze a single index."""
            # Get index info including type
            info_query = """
                SELECT
                    i.relname as index_name,
                    t.relname as table_name,
                    am.amname as index_type,
                    pg_relation_size(i.oid) as index_size,
                    idx.indisunique as is_unique,
                    idx.indisprimary as is_primary,
                    pg_get_indexdef(i.oid) as definition
                FROM pg_class i
                JOIN pg_namespace n ON n.oid = i.relnamespace
                JOIN pg_am am ON am.oid = i.relam
                JOIN pg_index idx ON idx.indexrelid = i.oid
                JOIN pg_class t ON t.oid = idx.indrelid
                WHERE i.relname = %s AND n.nspname = %s
            """
            info_result = await self.sql_driver.execute_query(
                info_query, (index_name, schema_name)
            )

            if not info_result:
                return {"error": f"Index {schema_name}.{index_name} not found"}

            info = info_result[0]
            index_type = info["index_type"]

            # Call appropriate function based on index type
            if index_type == "btree":
                stats = await self._get_btree_stats(schema_name, index_name)
            elif index_type == "gin":
                stats = await self._get_gin_stats(schema_name, index_name)
            elif index_type == "hash":
                stats = await self._get_hash_stats(schema_name, index_name)
            else:
                stats = {"note": f"pgstattuple does not support {index_type} indexes directly"}

            return {
                "schema": schema_name,
                "index_name": index_name,
                "table_name": info["table_name"],
                "index_type": index_type,
                "is_unique": info["is_unique"],
                "is_primary": info["is_primary"],
                "size": {
                    "bytes": info["index_size"],
                    "pretty": self._format_bytes(info["index_size"])
                },
                "definition": info["definition"],
                "statistics": stats,
                "recommendations": self._generate_index_recommendations(stats, index_type, info)
            }

        async def _get_btree_stats(
            self, schema_name: str, index_name: str
        ) -> dict[str, Any]:
            """Get B-tree index statistics using pgstatindex."""
            query = """
                SELECT * FROM pgstatindex(quote_ident(%s) || '.' || quote_ident(%s))
            """
            result = await self.sql_driver.execute_query(
                query, (schema_name, index_name)
            )

            if not result:
                return {"error": "Could not get index statistics"}

            stats = result[0]

            # Get key metrics for bloat analysis
            avg_density = stats.get("avg_leaf_density", 90) or 90

            # Calculate free_percent based on empty/deleted pages vs total
            leaf_pages = stats.get("leaf_pages", 1) or 1
            empty_pages = stats.get("empty_pages", 0) or 0
            deleted_pages = stats.get("deleted_pages", 0) or 0
            free_percent = round(100.0 * (empty_pages + deleted_pages) / leaf_pages, 2) if leaf_pages > 0 else 0

            # Get comprehensive bloat analysis
            bloat_analysis = self._get_index_bloat_severity(avg_density, free_percent)

            return {
                "version": stats.get("version"),
                "tree_level": stats.get("tree_level"),
                "index_size": stats.get("index_size"),
                "root_block_no": stats.get("root_block_no"),
                "internal_pages": stats.get("internal_pages"),
                "leaf_pages": leaf_pages,
                "empty_pages": empty_pages,
                "deleted_pages": deleted_pages,
                "avg_leaf_density": avg_density,
                "leaf_fragmentation": stats.get("leaf_fragmentation"),
                "free_percent": free_percent,
                "estimated_bloat_percent": bloat_analysis["estimated_bloat_percent"],
                "bloat_severity": bloat_analysis["overall_severity"],
                "density_status": bloat_analysis["density_status"],
                "issues": bloat_analysis["issues"]
            }

        async def _get_gin_stats(
            self, schema_name: str, index_name: str
        ) -> dict[str, Any]:
            """Get GIN index statistics using pgstatginindex."""
            query = """
                SELECT * FROM pgstatginindex(quote_ident(%s) || '.' || quote_ident(%s))
            """
            result = await self.sql_driver.execute_query(
                query, (schema_name, index_name)
            )

            if not result:
                return {"error": "Could not get GIN index statistics"}

            stats = result[0]
            return {
                "version": stats.get("version"),
                "pending_pages": stats.get("pending_pages"),
                "pending_tuples": stats.get("pending_tuples"),
                "note": "GIN indexes with many pending tuples may need VACUUM to merge pending entries"
            }

        async def _get_hash_stats(
            self, schema_name: str, index_name: str
        ) -> dict[str, Any]:
            """Get Hash index statistics using pgstathashindex."""
            query = """
                SELECT * FROM pgstathashindex(quote_ident(%s) || '.' || quote_ident(%s))
            """
            result = await self.sql_driver.execute_query(
                query, (schema_name, index_name)
            )

            if not result:
                return {"error": "Could not get Hash index statistics"}

            stats = result[0]
            return {
                "version": stats.get("version"),
                "bucket_pages": stats.get("bucket_pages"),
                "overflow_pages": stats.get("overflow_pages"),
                "bitmap_pages": stats.get("bitmap_pages"),
                "unused_pages": stats.get("unused_pages"),
                "live_items": stats.get("live_items"),
                "dead_items": stats.get("dead_items"),
                "free_percent": stats.get("free_percent")
            }

        async def _analyze_table_indexes(
            self, schema_name: str, table_name: str,
            min_size_gb: float, min_bloat_percent: float
        ) -> dict[str, Any]:
            """Analyze all indexes on a specific table."""
            # Convert GB to bytes (use bigint cast to avoid integer overflow)
            min_size_bytes = int(min_size_gb * 1024 * 1024 * 1024)

            # Get all indexes on the table
            indexes_query = """
                SELECT
                    i.relname as index_name,
                    am.amname as index_type,
                    pg_relation_size(i.oid) as index_size
                FROM pg_class i
                JOIN pg_namespace n ON n.oid = i.relnamespace
                JOIN pg_am am ON am.oid = i.relam
                JOIN pg_index idx ON idx.indexrelid = i.oid
                JOIN pg_class t ON t.oid = idx.indrelid
                WHERE t.relname = %s AND n.nspname = %s
                  AND pg_relation_size(i.oid) >= %s::bigint
                ORDER BY pg_relation_size(i.oid) DESC
            """
            indexes = await self.sql_driver.execute_query(
                indexes_query, (table_name, schema_name, min_size_bytes)
            )

            if not indexes:
                return {
                    "schema": schema_name,
                    "table_name": table_name,
                    "message": f"No indexes found with size >= {min_size_gb}GB",
                    "indexes": []
                }

            results = []
            for idx in indexes:
                try:
                    idx_result = await self._analyze_single_index(
                        schema_name, idx["index_name"]
                    )
                    if "error" not in idx_result:
                        stats = idx_result.get("statistics", {})
                        bloat_pct = stats.get("estimated_bloat_percent", 0)
                        if bloat_pct >= min_bloat_percent or idx["index_type"] != "btree":
                            results.append(idx_result)
                except Exception as e:
                    results.append({
                        "index_name": idx["index_name"],
                        "error": str(e)
                    })

            return {
                "schema": schema_name,
                "table_name": table_name,
                "indexes_analyzed": len(indexes),
                "indexes_with_bloat": len(results),
                "min_bloat_threshold": min_bloat_percent,
                "indexes": results
            }

        async def _analyze_schema_indexes(
            self, schema_name: str, min_size_gb: float, min_bloat_percent: float
        ) -> dict[str, Any]:
            """Analyze all indexes in a schema."""
            # Convert GB to bytes (use bigint cast to avoid integer overflow)
            min_size_bytes = int(min_size_gb * 1024 * 1024 * 1024)

            # Get all B-tree user indexes in schema (only B-tree for bloat analysis,
            # exclude system schemas)
            indexes_query = """
                SELECT
                    i.relname as index_name,
                    t.relname as table_name,
                    am.amname as index_type,
                    pg_relation_size(i.oid) as index_size
                FROM pg_class i
                JOIN pg_namespace n ON n.oid = i.relnamespace
                JOIN pg_am am ON am.oid = i.relam
                JOIN pg_index idx ON idx.indexrelid = i.oid
                JOIN pg_class t ON t.oid = idx.indrelid
                WHERE n.nspname = %s
                  AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                  AND am.amname = 'btree'
                  AND pg_relation_size(i.oid) >= %s::bigint
                ORDER BY pg_relation_size(i.oid) DESC
                LIMIT 50
            """
            indexes = await self.sql_driver.execute_query(
                indexes_query, (schema_name, min_size_bytes)
            )

            if not indexes:
                return {
                    "schema": schema_name,
                    "message": f"No B-tree indexes found with size >= {min_size_gb}GB",
                    "indexes": []
                }

            results = []
            total_size = 0
            total_bloated_size = 0

            for idx in indexes:
                try:
                    stats = await self._get_btree_stats(schema_name, idx["index_name"])
                    if "error" not in stats:
                        bloat_pct = stats.get("estimated_bloat_percent", 0)
                        idx_size = idx["index_size"]
                        total_size += idx_size

                        if bloat_pct >= min_bloat_percent:
                            bloated_size = int(idx_size * bloat_pct / 100)
                            total_bloated_size += bloated_size
                            results.append({
                                "index_name": idx["index_name"],
                                "table_name": idx["table_name"],
                                "index_size": idx_size,
                                "index_size_pretty": self._format_bytes(idx_size),
                                "avg_leaf_density": stats.get("avg_leaf_density"),
                                "leaf_fragmentation": stats.get("leaf_fragmentation"),
                                "estimated_bloat_percent": bloat_pct,
                                "bloat_severity": stats.get("bloat_severity"),
                                "estimated_wasted_space": self._format_bytes(bloated_size)
                            })
                except Exception:
                    pass  # Skip indexes that fail

            # Sort by bloat percent
            results.sort(key=lambda x: x.get("estimated_bloat_percent", 0), reverse=True)

            return {
                "schema": schema_name,
                "indexes_analyzed": len(indexes),
                "indexes_with_bloat": len(results),
                "min_bloat_threshold": min_bloat_percent,
                "summary": {
                    "total_index_size": self._format_bytes(total_size),
                    "estimated_bloated_space": self._format_bytes(total_bloated_size)
                },
                "indexes": results,
                "recommendations": self._generate_schema_index_recommendations(results)
            }

        def _get_index_bloat_severity(self, avg_leaf_density: float, free_percent: float = 0) -> dict[str, Any]:
            """
            Determine index bloat severity based on pgstatindex metrics.

            Rules based on pgstattuple best practices for indexes:
            - avg_leaf_density < 70%: Index page fragmentation (needs REINDEX)
            - free_space > 20%: Too many empty index pages (needs REINDEX)
            - leaf_pages grows over time: Index bloat accumulating
            """
            severity_result = {
                "overall_severity": "low",
                "density_status": "normal",
                "issues": []
            }
            severity_score = 0

            # Calculate estimated bloat from density (ideal is ~90%)
            estimated_bloat = max(0, 90 - avg_leaf_density)

            # Rule: avg_leaf_density < 70% = Index page fragmentation
            if avg_leaf_density < 50:
                severity_result["density_status"] = "critical"
                severity_result["issues"].append(
                    f"Leaf density ({avg_leaf_density:.1f}%) is critically low (<50%). "
                    "Index is heavily fragmented. REINDEX required."
                )
                severity_score += 3
            elif avg_leaf_density < 70:
                severity_result["density_status"] = "warning"
                severity_result["issues"].append(
                    f"Leaf density ({avg_leaf_density:.1f}%) indicates fragmentation (<70%). "
                    "Consider REINDEX to improve performance."
                )
                severity_score += 2

            # Rule: free_space > 20% = Too many empty index pages
            if free_percent > 30:
                severity_result["issues"].append(
                    f"Free space ({free_percent:.1f}%) is very high (>30%). "
                    "Many empty index pages. REINDEX recommended."
                )
                severity_score += 2
            elif free_percent > 20:
                severity_result["issues"].append(
                    f"Free space ({free_percent:.1f}%) is elevated (>20%). "
                    "Index may benefit from REINDEX."
                )
                severity_score += 1

            # Determine overall severity
            if severity_score >= 4 or estimated_bloat >= 40:
                severity_result["overall_severity"] = "critical"
            elif severity_score >= 3 or estimated_bloat >= 30:
                severity_result["overall_severity"] = "high"
            elif severity_score >= 2 or estimated_bloat >= 20:
                severity_result["overall_severity"] = "moderate"
            else:
                severity_result["overall_severity"] = "low"

            severity_result["estimated_bloat_percent"] = round(estimated_bloat, 2)
            return severity_result

        def _generate_index_recommendations(
            self, stats: dict, index_type: str, info: dict
        ) -> list[str]:
            """
            Generate recommendations for a single index.

            Based on pgstatindex best practices:
            - avg_leaf_density < 70%: Index page fragmentation → REINDEX
            - free_space > 20%: Too many empty index pages → REINDEX
            - leaf_pages grows over time: Index bloat accumulating
            """
            recommendations = []

            if index_type == "btree":
                avg_density = stats.get("avg_leaf_density", 90)
                free_percent = stats.get("free_percent", 0)
                bloat_pct = stats.get("estimated_bloat_percent", 0)
                severity = stats.get("bloat_severity", "low")
                index_name = info.get("index_name", "")
                # note: info comes from the catalog row, which has no "schema" key,
                # so this falls back to "public"
                schema = info.get("schema", "public") if "schema" in info else "public"
                full_name = f"{schema}.{index_name}" if schema else index_name

                # Rule: avg_leaf_density < 70% = fragmentation
                if avg_density < 50:
                    recommendations.append(
                        f"CRITICAL: Leaf density ({avg_density:.1f}%) is very low (<50%). "
                        f"Index is heavily fragmented. Run:\n"
                        f"  REINDEX INDEX CONCURRENTLY {full_name};"
                    )
                elif avg_density < 70:
                    recommendations.append(
                        f"WARNING: Leaf density ({avg_density:.1f}%) indicates fragmentation (<70%). "
                        f"Consider:\n"
                        f"  REINDEX INDEX CONCURRENTLY {full_name};"
                    )

                # Rule: free_space > 20% = too many empty pages
                if free_percent > 30:
                    recommendations.append(
                        f"CRITICAL: Free space ({free_percent:.1f}%) is very high (>30%). "
                        "Many empty index pages. REINDEX strongly recommended."
                    )
                elif free_percent > 20:
                    recommendations.append(
                        f"WARNING: Free space ({free_percent:.1f}%) is elevated (>20%). "
                        "Index may benefit from REINDEX."
                    )

                frag = stats.get("leaf_fragmentation", 0)
                if frag and frag > 30:
                    recommendations.append(
                        f"Index has {frag:.1f}% leaf fragmentation. "
                        "This can slow sequential index scans. Consider REINDEX."
                    )

                deleted_pages = stats.get("deleted_pages", 0)
                if deleted_pages and deleted_pages > 10:
                    recommendations.append(
                        f"Index has {deleted_pages} deleted pages. "
                        "These will be reclaimed by future index operations or REINDEX."
                    )

                # If no issues, provide positive feedback
                if not recommendations:
                    if avg_density >= 70 and free_percent <= 20:
                        recommendations.append(
                            f"Index {full_name} is healthy. Leaf density ({avg_density:.1f}%) is good."
                        )

            elif index_type == "gin":
                pending = stats.get("pending_tuples", 0)
                if pending and pending > 1000:
                    recommendations.append(
                        f"GIN index has {pending} pending tuples. "
                        "Run VACUUM to merge pending entries into main index."
                    )
                elif pending and pending > 100:
                    recommendations.append(
                        f"GIN index has {pending} pending tuples. "
                        "Consider running VACUUM if this continues to grow."
                    )

            elif index_type == "hash":
                dead_items = stats.get("dead_items", 0)
                if dead_items and dead_items > 100:
                    recommendations.append(
                        f"Hash index has {dead_items} dead items. "
                        "Run VACUUM to clean up dead entries."
                    )

            return recommendations

        def _generate_schema_index_recommendations(self, indexes: list[dict]) -> list[str]:
            """
            Generate schema-wide index recommendations.

            Based on pgstatindex best practices:
            - avg_leaf_density < 70%: Index page fragmentation
            - free_space > 20%: Too many empty index pages
            """
            recommendations = []

            critical = [i for i in indexes if i.get("bloat_severity") == "critical"]
            high = [i for i in indexes if i.get("bloat_severity") == "high"]

            if critical:
                idx_list = ", ".join(i["index_name"] for i in critical[:5])
                recommendations.append(
                    f"CRITICAL: {len(critical)} indexes have critical bloat. "
                    f"Priority indexes: {idx_list}"
                )
                recommendations.append(
                    "Run REINDEX INDEX CONCURRENTLY for these indexes to reclaim space."
                )

            if high:
                idx_list = ", ".join(i["index_name"] for i in high[:5])
                recommendations.append(
                    f"HIGH: {len(high)} indexes have high bloat levels. "
                    f"Indexes: {idx_list}"
                )

            # Check for low density indexes (< 70%)
            low_density_indexes = [i for i in indexes if i.get("avg_leaf_density", 100) < 70]
            if low_density_indexes:
                idx_list = ", ".join(i["index_name"] for i in low_density_indexes[:5])
                recommendations.append(
                    f"{len(low_density_indexes)} indexes have leaf density <70% (fragmented). "
                    f"Indexes: {idx_list}. Consider REINDEX."
                )

            return recommendations

        def _format_bytes(self, size: int | None) -> str:
            """Format bytes to human-readable string."""
            if size is None:
                return "0 B"
            for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
                if abs(size) < 1024.0:
                    return f"{size:.2f} {unit}"
                size /= 1024.0
            return f"{size:.2f} PB"
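The recommendations the handler emits map onto standard maintenance commands; for example (index and table names are placeholders):

    -- Rebuild a bloated B-tree index without blocking concurrent writes (PostgreSQL 12+)
    REINDEX INDEX CONCURRENTLY public.orders_pkey;

    -- For a GIN index with a large pending list, VACUUM merges pending entries
    VACUUM orders;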
  • Input schema definition for the analyze_index_bloat tool, defining parameters like index_name, table_name, schema_name, min_index_size_gb, and min_bloat_percent.
    def get_tool_definition(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "index_name": {
                        "type": "string",
                        "description": "Name of a specific index to analyze"
                    },
                    "table_name": {
                        "type": "string",
                        "description": "Analyze all indexes on this table"
                    },
                    "schema_name": {
                        "type": "string",
                        "description": "Schema name (default: public)",
                        "default": "public"
                    },
                    "min_index_size_gb": {
                        "type": "number",
                        "description": "Minimum index size in GB to include (default: 5)",
                        "default": 5
                    },
                    "min_bloat_percent": {
                        "type": "number",
                        "description": "Only show indexes with bloat above this percentage (default: 20)",
                        "default": 20
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )
  • Registration of the IndexBloatToolHandler instance in the register_all_tools() function, adding it to the global tool_handlers dictionary.
    # Bloat detection tools (using pgstattuple extension)
    add_tool_handler(TableBloatToolHandler(driver))
    add_tool_handler(IndexBloatToolHandler(driver))
    add_tool_handler(DatabaseBloatSummaryToolHandler(driver))
  • _get_btree_stats helper method: executes the pgstatindex query and computes bloat metrics such as avg_leaf_density, free_percent, and a severity analysis.
    async def _get_btree_stats(
        self, schema_name: str, index_name: str
    ) -> dict[str, Any]:
        """Get B-tree index statistics using pgstatindex."""
        query = """
            SELECT * FROM pgstatindex(quote_ident(%s) || '.' || quote_ident(%s))
        """
        result = await self.sql_driver.execute_query(
            query, (schema_name, index_name)
        )

        if not result:
            return {"error": "Could not get index statistics"}

        stats = result[0]

        # Get key metrics for bloat analysis
        avg_density = stats.get("avg_leaf_density", 90) or 90

        # Calculate free_percent based on empty/deleted pages vs total
        leaf_pages = stats.get("leaf_pages", 1) or 1
        empty_pages = stats.get("empty_pages", 0) or 0
        deleted_pages = stats.get("deleted_pages", 0) or 0
        free_percent = round(100.0 * (empty_pages + deleted_pages) / leaf_pages, 2) if leaf_pages > 0 else 0

        # Get comprehensive bloat analysis
        bloat_analysis = self._get_index_bloat_severity(avg_density, free_percent)

        return {
            "version": stats.get("version"),
            "tree_level": stats.get("tree_level"),
            "index_size": stats.get("index_size"),
            "root_block_no": stats.get("root_block_no"),
            "internal_pages": stats.get("internal_pages"),
            "leaf_pages": leaf_pages,
            "empty_pages": empty_pages,
            "deleted_pages": deleted_pages,
            "avg_leaf_density": avg_density,
            "leaf_fragmentation": stats.get("leaf_fragmentation"),
            "free_percent": free_percent,
            "estimated_bloat_percent": bloat_analysis["estimated_bloat_percent"],
            "bloat_severity": bloat_analysis["overall_severity"],
            "density_status": bloat_analysis["density_status"],
            "issues": bloat_analysis["issues"]
        }
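The derived metrics can be reproduced directly in SQL; a sketch, assuming a placeholder index name and the ~90% ideal density the handler uses:

    SELECT greatest(0, 90 - avg_leaf_density) AS estimated_bloat_percent,
           round(100.0 * (empty_pages + deleted_pages)
                 / nullif(leaf_pages, 0), 2)  AS free_percent
    FROM pgstatindex('public.orders_pkey');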
