get_table_stats
Analyze PostgreSQL table statistics to identify maintenance needs and performance issues. Provides table sizes, row counts, scan ratios, and vacuum timing data for user-created tables.
Instructions
Get detailed statistics for user/client database tables.
Note: This tool analyzes only user-created tables and excludes PostgreSQL system tables (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom tables.
Returns information about:
- Table size (data, indexes, total)
- Row counts and dead tuple ratio
- Last vacuum and analyze times
- Sequential vs index scan ratios
- Cache hit ratios
This helps identify tables that may need maintenance (VACUUM, ANALYZE) or have performance issues.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| schema_name | No | Schema to analyze (default: public) | public |
| table_name | No | Specific table to analyze (optional, analyzes all tables if not provided) | |
| include_indexes | No | Include index statistics | true |
| order_by | No | Order results by this metric (size, rows, dead_tuples, seq_scans, last_vacuum) | size |
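
For instance, a client that wants per-index statistics for one table, sorted by dead tuples, could send arguments like the following. The table name `orders` is illustrative, and every field is optional; an empty object `{}` is also valid and analyzes all tables in `public` ordered by size.

```python
# Hypothetical arguments for one get_table_stats call; "orders" is a
# made-up table name. All fields are optional.
arguments = {
    "schema_name": "public",    # defaults to "public" when omitted
    "table_name": "orders",     # matched with ILIKE against relname
    "include_indexes": True,    # index stats are returned only when table_name is given
    "order_by": "dead_tuples",  # size | rows | dead_tuples | seq_scans | last_vacuum
}
```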
Implementation Reference
- TableStatsToolHandler class: implements the 'get_table_stats' execute logic in its run_tool method, querying PostgreSQL statistics views for table sizes, row counts, vacuum history, and scan activity.

```python
class TableStatsToolHandler(ToolHandler):
    """Tool handler for retrieving table statistics and health metrics."""

    name = "get_table_stats"
    title = "Table Statistics Analyzer"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False

    description = """Get detailed statistics for user/client database tables.

Note: This tool analyzes only user-created tables and excludes PostgreSQL system tables (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom tables.

Returns information about:
- Table size (data, indexes, total)
- Row counts and dead tuple ratio
- Last vacuum and analyze times
- Sequential vs index scan ratios
- Cache hit ratios

This helps identify tables that may need maintenance (VACUUM, ANALYZE) or have performance issues."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "schema_name": {
                        "type": "string",
                        "description": "Schema to analyze (default: public)",
                        "default": "public"
                    },
                    "table_name": {
                        "type": "string",
                        "description": "Specific table to analyze (optional, analyzes all tables if not provided)"
                    },
                    "include_indexes": {
                        "type": "boolean",
                        "description": "Include index statistics",
                        "default": True
                    },
                    "order_by": {
                        "type": "string",
                        "description": "Order results by this metric",
                        "enum": ["size", "rows", "dead_tuples", "seq_scans", "last_vacuum"],
                        "default": "size"
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        try:
            schema_name = arguments.get("schema_name", "public")
            table_name = arguments.get("table_name")
            include_indexes = arguments.get("include_indexes", True)
            order_by = arguments.get("order_by", "size")

            # Whitelisted ORDER BY clauses; "size" sorts on the raw byte
            # count rather than the pg_size_pretty text so ordering is numeric.
            order_map = {
                "size": "total_size_bytes DESC",
                "rows": "n_live_tup DESC",
                "dead_tuples": "n_dead_tup DESC",
                "seq_scans": "seq_scan DESC",
                "last_vacuum": "last_vacuum DESC NULLS LAST"
            }
            order_clause = order_map.get(order_by, "total_size_bytes DESC")

            table_filter = ""
            params = [schema_name]
            if table_name:
                table_filter = "AND c.relname ILIKE %s"
                params.append(table_name)

            # Query only user tables, explicitly excluding system schemas
            query = f"""
                SELECT
                    c.relname as table_name,
                    n.nspname as schema_name,
                    pg_size_pretty(pg_table_size(c.oid)) as table_size,
                    pg_size_pretty(pg_indexes_size(c.oid)) as indexes_size,
                    pg_size_pretty(pg_total_relation_size(c.oid)) as total_size,
                    pg_total_relation_size(c.oid) as total_size_bytes,
                    s.n_live_tup,
                    s.n_dead_tup,
                    CASE WHEN s.n_live_tup > 0
                        THEN ROUND(100.0 * s.n_dead_tup / s.n_live_tup, 2)
                        ELSE 0
                    END as dead_tuple_ratio,
                    s.seq_scan,
                    s.seq_tup_read,
                    s.idx_scan,
                    s.idx_tup_fetch,
                    CASE WHEN s.seq_scan + COALESCE(s.idx_scan, 0) > 0
                        THEN ROUND(100.0 * COALESCE(s.idx_scan, 0) / (s.seq_scan + COALESCE(s.idx_scan, 0)), 2)
                        ELSE 0
                    END as index_scan_ratio,
                    s.last_vacuum,
                    s.last_autovacuum,
                    s.last_analyze,
                    s.last_autoanalyze,
                    s.vacuum_count,
                    s.autovacuum_count,
                    s.analyze_count,
                    s.autoanalyze_count
                FROM pg_class c
                JOIN pg_namespace n ON n.oid = c.relnamespace
                LEFT JOIN pg_stat_user_tables s ON s.relid = c.oid
                WHERE c.relkind = 'r'
                    AND n.nspname = %s
                    AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                    {table_filter}
                ORDER BY {order_clause}
            """

            results = await self.sql_driver.execute_query(query, params)

            if not results:
                return self.format_result(f"No tables found in schema '{schema_name}'")

            output = {
                "schema": schema_name,
                "table_count": len(results),
                "tables": results
            }

            # Add index statistics if requested
            if include_indexes and table_name:
                index_query = """
                    SELECT
                        i.indexrelname as index_name,
                        i.idx_scan as scans,
                        i.idx_tup_read as tuples_read,
                        i.idx_tup_fetch as tuples_fetched,
                        pg_size_pretty(pg_relation_size(i.indexrelid)) as size,
                        pg_relation_size(i.indexrelid) as size_bytes,
                        pg_get_indexdef(i.indexrelid) as definition
                    FROM pg_stat_user_indexes i
                    JOIN pg_class c ON c.oid = i.relid
                    JOIN pg_namespace n ON n.oid = c.relnamespace
                    WHERE n.nspname = %s AND c.relname = %s
                    ORDER BY i.idx_scan DESC
                """
                index_results = await self.sql_driver.execute_query(
                    index_query, [schema_name, table_name]
                )
                output["indexes"] = index_results

            # Add analysis and recommendations
            output["analysis"] = self._analyze_stats(results)

            return self.format_json_result(output)
        except Exception as e:
            return self.format_error(e)

    def _analyze_stats(self, tables: list[dict]) -> dict[str, Any]:
        """Analyze table stats and generate recommendations."""
        analysis = {
            "needs_vacuum": [],
            "needs_analyze": [],
            "low_index_usage": [],
            "recommendations": []
        }

        for table in tables:
            table_name = table.get("table_name", "unknown")

            # Check dead tuple ratio
            dead_ratio = table.get("dead_tuple_ratio", 0) or 0
            if dead_ratio > 10:
                analysis["needs_vacuum"].append({
                    "table": table_name,
                    "dead_tuple_ratio": dead_ratio,
                    "dead_tuples": table.get("n_dead_tup", 0)
                })

            # Check if analyze is needed
            last_analyze = table.get("last_analyze") or table.get("last_autoanalyze")
            n_live = table.get("n_live_tup", 0) or 0
            if n_live > 1000 and not last_analyze:
                analysis["needs_analyze"].append(table_name)

            # Check index usage
            idx_ratio = table.get("index_scan_ratio", 0) or 0
            seq_scans = table.get("seq_scan", 0) or 0
            if seq_scans > 100 and idx_ratio < 50 and n_live > 10000:
                analysis["low_index_usage"].append({
                    "table": table_name,
                    "index_scan_ratio": idx_ratio,
                    "seq_scans": seq_scans,
                    "rows": n_live
                })

        # Generate recommendations
        if analysis["needs_vacuum"]:
            tables_list = ", ".join(t["table"] for t in analysis["needs_vacuum"][:5])
            analysis["recommendations"].append(
                f"Run VACUUM on tables with high dead tuple ratios: {tables_list}"
            )
        if analysis["needs_analyze"]:
            tables_list = ", ".join(analysis["needs_analyze"][:5])
            analysis["recommendations"].append(
                f"Run ANALYZE on tables that haven't been analyzed: {tables_list}"
            )
        if analysis["low_index_usage"]:
            for item in analysis["low_index_usage"][:3]:
                analysis["recommendations"].append(
                    f"Table '{item['table']}' has low index usage ({item['index_scan_ratio']}% index scans). "
                    "Consider adding indexes for frequently filtered columns."
                )

        return analysis
```
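
As a rough sketch of how this handler is driven, the following wires it to a stub driver and runs one call. StubSqlDriver and the canned row are invented for illustration, and the sketch assumes the project's ToolHandler base class (which supplies format_json_result) is importable:

```python
import asyncio

class StubSqlDriver:
    """Hypothetical stand-in for SqlDriver: returns one canned row shaped
    like the stats query's aliases instead of querying PostgreSQL."""

    async def execute_query(self, query, params):
        return [{
            "table_name": "orders", "schema_name": "public",
            "total_size": "128 MB", "total_size_bytes": 134217728,
            "n_live_tup": 50000, "n_dead_tup": 9000, "dead_tuple_ratio": 18.0,
            "seq_scan": 400, "idx_scan": 100, "index_scan_ratio": 20.0,
            "last_analyze": None, "last_autoanalyze": None,
        }]

async def main():
    handler = TableStatsToolHandler(StubSqlDriver())
    # 18% dead tuples, no ANALYZE timestamps, and a 20% index-scan ratio
    # trip all three checks in _analyze_stats (needs_vacuum, needs_analyze,
    # low_index_usage), so the response carries three recommendations.
    result = await handler.run_tool({"schema_name": "public", "order_by": "dead_tuples"})
    print(result)

asyncio.run(main())
```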
- get_tool_definition method: defines the input schema for the get_table_stats parameters (schema_name, table_name, include_indexes, order_by); shown in full in the class listing above.
- src/pgtuner_mcp/server.py:136-170 (registration): registration of the TableStatsToolHandler instance in the register_all_tools function, called during server initialization.

```python
def register_all_tools() -> None:
    """
    Register all available tool handlers.

    This function serves as the central registry for all tools.
    New tool handlers should be added here for automatic registration.
    """
    driver = get_sql_driver()
    hypopg_service = HypoPGService(driver)
    index_advisor = IndexAdvisor(driver)

    # Performance analysis tools
    add_tool_handler(GetSlowQueriesToolHandler(driver))
    add_tool_handler(AnalyzeQueryToolHandler(driver))
    add_tool_handler(TableStatsToolHandler(driver))

    # Index tuning tools
    add_tool_handler(IndexAdvisorToolHandler(index_advisor))
    add_tool_handler(ExplainQueryToolHandler(driver, hypopg_service))
    add_tool_handler(HypoPGToolHandler(hypopg_service))
    add_tool_handler(UnusedIndexesToolHandler(driver))

    # Database health tools
    add_tool_handler(DatabaseHealthToolHandler(driver))
    add_tool_handler(ActiveQueriesToolHandler(driver))
    add_tool_handler(WaitEventsToolHandler(driver))
    add_tool_handler(DatabaseSettingsToolHandler(driver))

    # Bloat detection tools (using pgstattuple extension)
    add_tool_handler(TableBloatToolHandler(driver))
    add_tool_handler(IndexBloatToolHandler(driver))
    add_tool_handler(DatabaseBloatSummaryToolHandler(driver))

    logger.info(f"Registered {len(tool_handlers)} tool handlers")
```
- _analyze_stats helper method: processes the table stats results and generates maintenance recommendations (needs_vacuum, needs_analyze, low_index_usage); shown in full in the class listing above.
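
For a sense of the shape this helper returns, a single neglected table might produce an analysis block like the following; the numbers are invented, the keys mirror the implementation:

```python
# Invented _analyze_stats output for one problematic table; the keys
# match the implementation above, the values are illustrative only.
example_analysis = {
    "needs_vacuum": [
        {"table": "orders", "dead_tuple_ratio": 18.0, "dead_tuples": 9000}
    ],
    "needs_analyze": ["orders"],
    "low_index_usage": [
        {"table": "orders", "index_scan_ratio": 20.0, "seq_scans": 400, "rows": 50000}
    ],
    "recommendations": [
        "Run VACUUM on tables with high dead tuple ratios: orders",
        "Run ANALYZE on tables that haven't been analyzed: orders",
        "Table 'orders' has low index usage (20.0% index scans). "
        "Consider adding indexes for frequently filtered columns.",
    ],
}
```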
- src/pgtuner_mcp/server.py:844-924 (helper): resource handler _get_table_stats_resource that exposes similar table statistics via MCP resources; covered by test_resources.py.

```python
async def _get_table_stats_resource(schema: str, table_name: str) -> str:
    """Get table statistics as JSON."""
    driver = get_sql_driver()

    query = """
        SELECT
            schemaname,
            relname as table_name,
            n_live_tup as live_rows,
            n_dead_tup as dead_rows,
            n_mod_since_analyze as modifications_since_analyze,
            last_vacuum,
            last_autovacuum,
            last_analyze,
            last_autoanalyze,
            vacuum_count,
            autovacuum_count,
            analyze_count,
            autoanalyze_count,
            seq_scan,
            seq_tup_read,
            idx_scan,
            idx_tup_fetch,
            n_tup_ins as inserts,
            n_tup_upd as updates,
            n_tup_del as deletes,
            n_tup_hot_upd as hot_updates
        FROM pg_stat_user_tables
        WHERE schemaname = %s AND relname = %s
    """
    result = await driver.execute_query(query, (schema, table_name))

    if not result:
        return json.dumps({"error": f"Table {schema}.{table_name} not found"}, indent=2)

    # Get table size
    size_query = """
        SELECT
            pg_size_pretty(pg_total_relation_size(quote_ident(%s) || '.' || quote_ident(%s))) as total_size,
            pg_size_pretty(pg_table_size(quote_ident(%s) || '.' || quote_ident(%s))) as table_size,
            pg_size_pretty(pg_indexes_size(quote_ident(%s) || '.' || quote_ident(%s))) as indexes_size
    """
    size_result = await driver.execute_query(
        size_query, (schema, table_name, schema, table_name, schema, table_name)
    )

    row = result[0]
    stats = {
        "schema": row["schemaname"],
        "table_name": row["table_name"],
        "row_counts": {
            "live_rows": row["live_rows"],
            "dead_rows": row["dead_rows"],
            "dead_row_ratio": round(row["dead_rows"] / max(row["live_rows"], 1) * 100, 2)
        },
        "size": size_result[0] if size_result else {},
        "maintenance": {
            "last_vacuum": str(row["last_vacuum"]) if row["last_vacuum"] else None,
            "last_autovacuum": str(row["last_autovacuum"]) if row["last_autovacuum"] else None,
            "last_analyze": str(row["last_analyze"]) if row["last_analyze"] else None,
            "last_autoanalyze": str(row["last_autoanalyze"]) if row["last_autoanalyze"] else None,
            "modifications_since_analyze": row["modifications_since_analyze"]
        },
        "access_patterns": {
            "sequential_scans": row["seq_scan"],
            "sequential_rows_read": row["seq_tup_read"],
            "index_scans": row["idx_scan"],
            "index_rows_fetched": row["idx_tup_fetch"]
        },
        "modifications": {
            "inserts": row["inserts"],
            "updates": row["updates"],
            "deletes": row["deletes"],
            "hot_updates": row["hot_updates"]
        }
    }
    return json.dumps(stats, indent=2, default=str)
```
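
A minimal sketch of exercising the resource helper directly, assuming get_sql_driver is already configured to reach a database; public.orders is an illustrative table name:

```python
import asyncio
import json

async def main():
    # "public" / "orders" are illustrative; any existing user table works.
    payload = await _get_table_stats_resource("public", "orders")
    stats = json.loads(payload)
    # Top-level sections produced by the helper above:
    print(sorted(stats))
    # ['access_patterns', 'maintenance', 'modifications', 'row_counts',
    #  'schema', 'size', 'table_name']

asyncio.run(main())
```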