get_bloat_summary
Analyze PostgreSQL database bloat to identify tables and indexes with wasted space, estimate reclaimable storage, and prioritize maintenance tasks for performance optimization.
Instructions
Get a comprehensive summary of database bloat across tables and indexes.
Note: This tool analyzes only user/client tables and indexes, excluding PostgreSQL system objects (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom objects.
Provides a high-level overview of:
- Top bloated tables by wasted space
- Top bloated indexes by estimated bloat
- Total reclaimable space estimates
- Priority maintenance recommendations
Uses pgstattuple_approx for tables (faster) and pgstatindex for B-tree indexes. Requires the pgstattuple extension to be installed.
Best for: Quick assessment of database bloat and maintenance priorities.
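Because the tool refuses to run when pgstattuple is missing, it can help to verify the extension before calling it. The sketch below is illustrative only: it assumes an async SqlDriver like the one used by the handler (see the implementation reference), and `public.orders` is a hypothetical table name used purely for the spot-check.

```python
# Minimal pre-flight sketch (assumes an async SqlDriver exposing execute_query,
# as used by the handler below). "public.orders" is a hypothetical table.
async def check_pgstattuple(sql_driver) -> None:
    rows = await sql_driver.execute_query(
        "SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pgstattuple') AS available"
    )
    if not rows or not rows[0].get("available"):
        # Installing the extension requires appropriate privileges; the tool
        # reports the same instruction when the check fails.
        raise RuntimeError(
            "pgstattuple is not installed; run: CREATE EXTENSION IF NOT EXISTS pgstattuple;"
        )

    # Spot-check one table with the approximate (faster) variant the tool uses.
    stats = await sql_driver.execute_query(
        "SELECT * FROM pgstattuple_approx('public.orders')"
    )
    print(stats[0] if stats else "no stats returned")
```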
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| schema_name | No | Schema to analyze (default: public) | public |
| top_n | No | Number of top bloated objects to show (default: 10) | 10 |
| min_size_gb | No | Minimum object size in GB to include (default: 5) | 5 |
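For orientation, here is one way a caller might exercise these parameters. This is a hedged sketch, not an official client API: it assumes you already have a connected SqlDriver instance (its construction is outside this section) and calls the handler's run_tool coroutine exactly as defined in the implementation reference below.

```python
# Illustrative invocation, assuming a connected SqlDriver instance and the
# DatabaseBloatSummaryToolHandler class shown in the implementation reference.
import asyncio

async def summarize_bloat(sql_driver) -> None:
    handler = DatabaseBloatSummaryToolHandler(sql_driver)
    contents = await handler.run_tool({
        "schema_name": "public",  # schema to analyze
        "top_n": 10,              # top bloated objects per category
        "min_size_gb": 5,         # skip objects smaller than 5 GB
    })
    # run_tool returns Sequence[TextContent]; the text field carries the JSON summary.
    for content in contents:
        print(content.text)

# asyncio.run(summarize_bloat(sql_driver))  # with a real driver in scope
```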
Implementation Reference
- The DatabaseBloatSummaryToolHandler class is the main handler for the 'get_bloat_summary' tool. It defines the tool name, description, and input schema, and implements the run_tool method that checks for the pgstattuple extension, fetches table and index bloat summaries, computes totals, and generates priority maintenance actions. The get_tool_definition method and the _get_table_bloat_summary / _get_index_bloat_summary helpers are shown in the entries that follow.

```python
class DatabaseBloatSummaryToolHandler(ToolHandler):
    """Tool handler for getting a comprehensive database bloat summary."""

    name = "get_bloat_summary"
    title = "Database Bloat Summary"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False

    description = """Get a comprehensive summary of database bloat across tables and indexes.

Note: This tool analyzes only user/client tables and indexes, excluding PostgreSQL system objects (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom objects.

Provides a high-level overview of:
- Top bloated tables by wasted space
- Top bloated indexes by estimated bloat
- Total reclaimable space estimates
- Priority maintenance recommendations

Uses pgstattuple_approx for tables (faster) and pgstatindex for B-tree indexes. Requires the pgstattuple extension to be installed.

Best for: Quick assessment of database bloat and maintenance priorities."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        try:
            schema_name = arguments.get("schema_name", "public")
            top_n = arguments.get("top_n", 10)
            min_size_gb = arguments.get("min_size_gb", 5)

            # Check extension
            ext_query = """
                SELECT EXISTS (
                    SELECT 1 FROM pg_extension WHERE extname = 'pgstattuple'
                ) as available
            """
            ext_result = await self.sql_driver.execute_query(ext_query)

            if not ext_result or not ext_result[0].get("available"):
                return self.format_result(
                    "Error: pgstattuple extension is not installed.\n"
                    "Install it with: CREATE EXTENSION IF NOT EXISTS pgstattuple;"
                )

            # Get table bloat summary
            table_bloat = await self._get_table_bloat_summary(
                schema_name, top_n, min_size_gb
            )

            # Get index bloat summary
            index_bloat = await self._get_index_bloat_summary(
                schema_name, top_n, min_size_gb
            )

            # Calculate totals
            total_table_wasted = sum(
                t.get("wasted_bytes", 0) for t in table_bloat.get("tables", [])
            )
            total_index_wasted = sum(
                i.get("estimated_wasted_bytes", 0) for i in index_bloat.get("indexes", [])
            )

            result = {
                "schema": schema_name,
                "summary": {
                    "tables_analyzed": table_bloat.get("tables_analyzed", 0),
                    "indexes_analyzed": index_bloat.get("indexes_analyzed", 0),
                    "total_table_wasted_space": self._format_bytes(total_table_wasted),
                    "total_index_wasted_space": self._format_bytes(total_index_wasted),
                    "total_reclaimable": self._format_bytes(total_table_wasted + total_index_wasted)
                },
                "top_bloated_tables": table_bloat.get("tables", []),
                "top_bloated_indexes": index_bloat.get("indexes", []),
                "maintenance_priority": self._generate_priority_actions(
                    table_bloat.get("tables", []),
                    index_bloat.get("indexes", [])
                )
            }

            return self.format_json_result(result)

        except Exception as e:
            return self.format_error(e)

    def _generate_priority_actions(
        self, tables: list[dict], indexes: list[dict]
    ) -> list[dict]:
        """
        Generate prioritized maintenance actions based on bloat analysis.

        Uses pgstattuple best practice thresholds:
        - Tables: dead_tuple_percent > 10%, free_percent > 20%, tuple_percent < 70%
        - Indexes: avg_leaf_density < 70%, free_space > 20%
        """
        actions = []

        # High-priority table maintenance based on the new rules
        for t in tables:
            dead_pct = t.get("dead_tuple_percent", 0)
            free_pct = t.get("free_percent", 0)
            tuple_pct = t.get("tuple_percent", 100)
            wasted_pct = t.get("wasted_percent", 0)
            severity = t.get("bloat_severity", "minimal")

            issues = []
            priority = "low"

            # Check dead tuple percent (autovacuum lag indicator)
            if dead_pct > 30:
                issues.append(f"dead tuples {dead_pct:.1f}% (critical)")
                priority = "high"
            elif dead_pct > 10:
                issues.append(f"dead tuples {dead_pct:.1f}% (autovacuum lag)")
                priority = "medium" if priority != "high" else priority

            # Check free space percent (fragmentation indicator)
            if free_pct > 30:
                issues.append(f"free space {free_pct:.1f}% (severe fragmentation)")
                priority = "high"
            elif free_pct > 20:
                issues.append(f"free space {free_pct:.1f}% (fragmentation)")
                priority = "medium" if priority != "high" else priority

            # Check tuple percent (live data density)
            if tuple_pct > 0 and tuple_pct < 50:
                issues.append(f"tuple density {tuple_pct:.1f}% (critical bloat)")
                priority = "high"
            elif tuple_pct > 0 and tuple_pct < 70:
                issues.append(f"tuple density {tuple_pct:.1f}% (heavy bloat)")
                priority = "medium" if priority != "high" else priority

            if issues:
                action = f"VACUUM ANALYZE {t['table_name']}"
                alternative = None
                if priority == "high" or tuple_pct < 70:
                    alternative = f"VACUUM FULL {t['table_name']} (requires exclusive lock) or pg_repack"

                actions.append({
                    "priority": priority,
                    "type": "table",
                    "object": t["table_name"],
                    "issue": "; ".join(issues),
                    "action": action,
                    "alternative": alternative
                })

        # High-priority index maintenance based on the new rules
        for i in indexes:
            avg_density = i.get("avg_leaf_density", 90)
            free_pct = i.get("free_percent", 0)
            bloat_pct = i.get("estimated_bloat_percent", 0)
            severity = i.get("bloat_severity", "low")

            issues = []
            priority = "low"

            # Check leaf density (fragmentation indicator)
            if avg_density < 50:
                issues.append(f"leaf density {avg_density:.1f}% (critical fragmentation)")
                priority = "high"
            elif avg_density < 70:
                issues.append(f"leaf density {avg_density:.1f}% (fragmentation)")
                priority = "medium"

            # Check free percent
            if free_pct > 30:
                issues.append(f"free space {free_pct:.1f}% (many empty pages)")
                priority = "high"
            elif free_pct > 20:
                issues.append(f"free space {free_pct:.1f}% (elevated)")
                priority = "medium" if priority != "high" else priority

            if issues:
                actions.append({
                    "priority": priority,
                    "type": "index",
                    "object": i["index_name"],
                    "table": i["table_name"],
                    "issue": "; ".join(issues),
                    "action": f"REINDEX INDEX CONCURRENTLY {i['index_name']}"
                })

        # Sort by priority
        priority_order = {"high": 0, "medium": 1, "low": 2}
        actions.sort(key=lambda x: priority_order.get(x.get("priority", "low"), 2))

        return actions[:10]  # Top 10 actions

    def _format_bytes(self, size: int | None) -> str:
        """Format bytes to human-readable string."""
        if size is None:
            return "0 B"
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if abs(size) < 1024.0:
                return f"{size:.2f} {unit}"
            size /= 1024.0
        return f"{size:.2f} PB"
```
- The get_tool_definition method provides the input schema and metadata for the 'get_bloat_summary' tool.

```python
def get_tool_definition(self) -> Tool:
    return Tool(
        name=self.name,
        description=self.description,
        inputSchema={
            "type": "object",
            "properties": {
                "schema_name": {
                    "type": "string",
                    "description": "Schema to analyze (default: public)",
                    "default": "public"
                },
                "top_n": {
                    "type": "integer",
                    "description": "Number of top bloated objects to show (default: 10)",
                    "default": 10
                },
                "min_size_gb": {
                    "type": "number",
                    "description": "Minimum object size in GB to include (default: 5)",
                    "default": 5
                }
            },
            "required": []
        },
        annotations=self.get_annotations()
    )
```
- src/pgtuner_mcp/server.py:164-170 (registration): In the register_all_tools function, the DatabaseBloatSummaryToolHandler is instantiated with the SQL driver and registered to the MCP server using add_tool_handler.

```python
# Bloat detection tools (using pgstattuple extension)
add_tool_handler(TableBloatToolHandler(driver))
add_tool_handler(IndexBloatToolHandler(driver))
add_tool_handler(DatabaseBloatSummaryToolHandler(driver))

logger.info(f"Registered {len(tool_handlers)} tool handlers")
```
- Helper method to compute the table bloat summary using pgstattuple_approx on tables in the schema, calculating wasted space and severity.

```python
async def _get_table_bloat_summary(
    self, schema_name: str, top_n: int, min_size_gb: float
) -> dict[str, Any]:
    """
    Get summary of table bloat using pgstattuple_approx.

    Analyzes tables based on the key bloat indicators:
    - dead_tuple_percent > 10%: Autovacuum lag
    - free_percent > 20%: Page fragmentation
    - tuple_percent < 70%: Heavy bloat
    """
    # Convert GB to bytes (use bigint cast to avoid integer overflow)
    min_size_bytes = int(min_size_gb * 1024 * 1024 * 1024)

    # Get user tables to analyze (exclude system schemas)
    tables_query = """
        SELECT
            c.relname as table_name,
            pg_table_size(c.oid) as table_size
        FROM pg_class c
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE c.relkind = 'r'
          AND n.nspname = %s
          AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
          AND pg_table_size(c.oid) >= %s::bigint
        ORDER BY pg_table_size(c.oid) DESC
        LIMIT 100
    """
    tables = await self.sql_driver.execute_query(
        tables_query, (schema_name, min_size_bytes)
    )

    results = []
    for table in tables:
        try:
            stats_query = """
                SELECT * FROM pgstattuple_approx(quote_ident(%s) || '.' || quote_ident(%s))
            """
            stats_result = await self.sql_driver.execute_query(
                stats_query, (schema_name, table["table_name"])
            )

            if stats_result:
                stats = stats_result[0]
                table_len = stats.get("table_len", 0) or 0
                dead_tuple_len = stats.get("dead_tuple_len", 0) or 0
                free_space = stats.get("approx_free_space", 0) or 0
                wasted = dead_tuple_len + free_space
                wasted_pct = round(100.0 * wasted / table_len, 2) if table_len > 0 else 0

                # Get key metrics for bloat analysis
                dead_tuple_percent = stats.get("dead_tuple_percent", 0) or 0
                free_percent = stats.get("approx_free_percent", 0) or 0
                tuple_percent = stats.get("approx_tuple_percent", 0) or 0

                # Determine bloat severity based on rules
                bloat_severity = "minimal"
                if dead_tuple_percent > 30 or free_percent > 30 or (tuple_percent > 0 and tuple_percent < 50):
                    bloat_severity = "critical"
                elif dead_tuple_percent > 10 or free_percent > 20 or (tuple_percent > 0 and tuple_percent < 70):
                    bloat_severity = "high"
                elif dead_tuple_percent > 5 or free_percent > 10:
                    bloat_severity = "moderate"

                results.append({
                    "table_name": table["table_name"],
                    "table_size": self._format_bytes(table_len),
                    "table_size_bytes": table_len,
                    "dead_tuple_percent": dead_tuple_percent,
                    "free_percent": free_percent,
                    "tuple_percent": tuple_percent,
                    "wasted_bytes": wasted,
                    "wasted_space": self._format_bytes(wasted),
                    "wasted_percent": wasted_pct,
                    "bloat_severity": bloat_severity
                })
        except Exception:
            pass

    # Sort by wasted space and take top N
    results.sort(key=lambda x: x.get("wasted_bytes", 0), reverse=True)

    return {
        "tables_analyzed": len(tables) if tables else 0,
        "tables": results[:top_n]
    }
```
- Helper method to compute the index bloat summary using pgstatindex on B-tree indexes, estimating bloat from leaf density (a worked example of these calculations follows this list).

```python
async def _get_index_bloat_summary(
    self, schema_name: str, top_n: int, min_size_gb: float
) -> dict[str, Any]:
    """
    Get summary of index bloat using pgstatindex.

    Analyzes indexes based on the key bloat indicators:
    - avg_leaf_density < 70%: Index page fragmentation
    - free_space > 20%: Too many empty index pages
    """
    # Convert GB to bytes (use bigint cast to avoid integer overflow)
    min_size_bytes = int(min_size_gb * 1024 * 1024 * 1024)

    # Get B-tree user indexes (exclude system schemas)
    indexes_query = """
        SELECT
            i.relname as index_name,
            t.relname as table_name,
            pg_relation_size(i.oid) as index_size
        FROM pg_class i
        JOIN pg_namespace n ON n.oid = i.relnamespace
        JOIN pg_am am ON am.oid = i.relam
        JOIN pg_index idx ON idx.indexrelid = i.oid
        JOIN pg_class t ON t.oid = idx.indrelid
        WHERE n.nspname = %s
          AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
          AND am.amname = 'btree'
          AND pg_relation_size(i.oid) >= %s::bigint
        ORDER BY pg_relation_size(i.oid) DESC
        LIMIT 100
    """
    indexes = await self.sql_driver.execute_query(
        indexes_query, (schema_name, min_size_bytes)
    )

    results = []
    for idx in indexes:
        try:
            stats_query = """
                SELECT * FROM pgstatindex(quote_ident(%s) || '.' || quote_ident(%s))
            """
            stats_result = await self.sql_driver.execute_query(
                stats_query, (schema_name, idx["index_name"])
            )

            if stats_result:
                stats = stats_result[0]
                avg_density = stats.get("avg_leaf_density", 90) or 90
                bloat_pct = max(0, 90 - avg_density)
                idx_size = idx["index_size"]
                wasted = int(idx_size * bloat_pct / 100)

                # Calculate free percent from empty/deleted pages
                leaf_pages = stats.get("leaf_pages", 1) or 1
                empty_pages = stats.get("empty_pages", 0) or 0
                deleted_pages = stats.get("deleted_pages", 0) or 0
                free_percent = round(100.0 * (empty_pages + deleted_pages) / leaf_pages, 2) if leaf_pages > 0 else 0

                # Determine bloat severity based on rules
                bloat_severity = "low"
                if avg_density < 50 or free_percent > 30:
                    bloat_severity = "critical"
                elif avg_density < 70 or free_percent > 20:
                    bloat_severity = "high"
                elif bloat_pct >= 20:
                    bloat_severity = "moderate"

                results.append({
                    "index_name": idx["index_name"],
                    "table_name": idx["table_name"],
                    "index_size": self._format_bytes(idx_size),
                    "index_size_bytes": idx_size,
                    "avg_leaf_density": avg_density,
                    "free_percent": free_percent,
                    "estimated_bloat_percent": round(bloat_pct, 2),
                    "estimated_wasted_bytes": wasted,
                    "estimated_wasted_space": self._format_bytes(wasted),
                    "bloat_severity": bloat_severity
                })
        except Exception:
            pass

    # Sort by bloat percent and take top N
    results.sort(key=lambda x: x.get("estimated_bloat_percent", 0), reverse=True)

    return {
        "indexes_analyzed": len(indexes) if indexes else 0,
        "indexes": results[:top_n]
    }
```
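To make the thresholds and estimates above concrete, the following self-contained sketch replays the same arithmetic on made-up numbers, with no database access. The helper functions here are illustrative stand-ins for the handler's internal logic, not part of the tool.

```python
# Illustration of the estimation rules used by the bloat-summary helpers,
# using hypothetical inputs. Mirrors the handler's logic, not a new API.

def table_severity(dead_tuple_percent: float, free_percent: float, tuple_percent: float) -> str:
    """Classify table bloat the way _get_table_bloat_summary does."""
    if dead_tuple_percent > 30 or free_percent > 30 or 0 < tuple_percent < 50:
        return "critical"
    if dead_tuple_percent > 10 or free_percent > 20 or 0 < tuple_percent < 70:
        return "high"
    if dead_tuple_percent > 5 or free_percent > 10:
        return "moderate"
    return "minimal"

def index_waste_estimate(index_size_bytes: int, avg_leaf_density: float) -> tuple[float, int]:
    """Estimate bloat percent and wasted bytes the way _get_index_bloat_summary does,
    treating roughly 90% leaf density as the healthy baseline."""
    bloat_pct = max(0, 90 - avg_leaf_density)
    wasted = int(index_size_bytes * bloat_pct / 100)
    return bloat_pct, wasted

# A table with 12% dead tuples, 8% free space, and 76% live tuples is rated
# "high" because dead_tuple_percent > 10 signals autovacuum lag.
print(table_severity(12.0, 8.0, 76.0))  # high

# A 2 GiB B-tree index with avg_leaf_density of 65% yields an estimated 25%
# bloat, i.e. roughly 512 MiB of reclaimable space.
pct, wasted = index_waste_estimate(2 * 1024**3, 65.0)
print(pct, wasted)  # 25, 536870912
```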