PostgreSQL-Performance-Tuner-Mcp

get_table_stats

Analyze PostgreSQL table statistics to identify maintenance needs and performance issues. Provides table sizes, row counts, scan ratios, and vacuum timing data for user-created tables.

Instructions

Get detailed statistics for user/client database tables.

Note: This tool analyzes only user-created tables and excludes PostgreSQL system tables (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom tables.

Returns information about:

  • Table size (data, indexes, total)

  • Row counts and dead tuple ratio

  • Last vacuum and analyze times

  • Sequential vs index scan ratios

  • Cache hit ratios

This helps identify tables that may need maintenance (VACUUM, ANALYZE) or have performance issues.
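
For instance, the dead tuple ratio reported here is simply dead tuples as a percentage of live tuples, and values above 10% flag a table for VACUUM in the analysis step. A minimal sketch of that arithmetic (the sample numbers are invented; the formula and threshold come from the handler code further down):

```python
# Mirrors the handler's SQL: ROUND(100.0 * n_dead_tup / n_live_tup, 2)
n_live_tup, n_dead_tup = 80_000, 12_000  # invented sample values
dead_tuple_ratio = round(100.0 * n_dead_tup / n_live_tup, 2) if n_live_tup > 0 else 0
print(dead_tuple_ratio)  # 15.0 -> above the 10% threshold, so VACUUM is recommended
```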

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| schema_name | No | Schema to analyze | public |
| table_name | No | Specific table to analyze; analyzes all tables in the schema if not provided | (none) |
| include_indexes | No | Include per-index statistics (returned only when a specific table_name is given) | true |
| order_by | No | Metric to order results by: size, rows, dead_tuples, seq_scans, or last_vacuum | size |
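
To make the schema concrete, here is a hedged sketch of an arguments payload a client might pass to the handler's run_tool method; the argument names, defaults, and enum values come from the schema above, while the specific table name is invented:

```python
# Illustrative arguments only; "orders" is a made-up table name.
arguments = {
    "schema_name": "public",    # optional, defaults to "public"
    "table_name": "orders",     # optional; omit to analyze every table in the schema
    "include_indexes": True,    # per-index stats appear only when table_name is set
    "order_by": "dead_tuples",  # one of: size, rows, dead_tuples, seq_scans, last_vacuum
}
# This dict would then be passed to TableStatsToolHandler.run_tool(arguments).
```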

Implementation Reference

  • TableStatsToolHandler, the handler class behind the 'get_table_stats' tool. Its run_tool method queries PostgreSQL statistics views for table sizes, row counts, vacuum history, and scan ratios:
```python
class TableStatsToolHandler(ToolHandler):
    """Tool handler for retrieving table statistics and health metrics."""

    name = "get_table_stats"
    title = "Table Statistics Analyzer"
    read_only_hint = True
    destructive_hint = False
    idempotent_hint = True
    open_world_hint = False
    description = """Get detailed statistics for user/client database tables.

Note: This tool analyzes only user-created tables and excludes PostgreSQL system tables (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom tables.

Returns information about:
- Table size (data, indexes, total)
- Row counts and dead tuple ratio
- Last vacuum and analyze times
- Sequential vs index scan ratios
- Cache hit ratios

This helps identify tables that may need maintenance (VACUUM, ANALYZE) or have performance issues."""

    def __init__(self, sql_driver: SqlDriver):
        self.sql_driver = sql_driver

    def get_tool_definition(self) -> Tool:
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema={
                "type": "object",
                "properties": {
                    "schema_name": {
                        "type": "string",
                        "description": "Schema to analyze (default: public)",
                        "default": "public"
                    },
                    "table_name": {
                        "type": "string",
                        "description": "Specific table to analyze (optional, analyzes all tables if not provided)"
                    },
                    "include_indexes": {
                        "type": "boolean",
                        "description": "Include index statistics",
                        "default": True
                    },
                    "order_by": {
                        "type": "string",
                        "description": "Order results by this metric",
                        "enum": ["size", "rows", "dead_tuples", "seq_scans", "last_vacuum"],
                        "default": "size"
                    }
                },
                "required": []
            },
            annotations=self.get_annotations()
        )

    async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
        try:
            schema_name = arguments.get("schema_name", "public")
            table_name = arguments.get("table_name")
            include_indexes = arguments.get("include_indexes", True)
            order_by = arguments.get("order_by", "size")

            # Build the query
            order_map = {
                "size": "total_size DESC",
                "rows": "n_live_tup DESC",
                "dead_tuples": "n_dead_tup DESC",
                "seq_scans": "seq_scan DESC",
                "last_vacuum": "last_vacuum DESC NULLS LAST"
            }
            order_clause = order_map.get(order_by, "total_size DESC")

            table_filter = ""
            params = [schema_name]
            if table_name:
                table_filter = "AND c.relname ILIKE %s"
                params.append(table_name)

            # Query only user tables, explicitly excluding system schemas
            query = f"""
                SELECT
                    c.relname as table_name,
                    n.nspname as schema_name,
                    pg_size_pretty(pg_table_size(c.oid)) as table_size,
                    pg_size_pretty(pg_indexes_size(c.oid)) as indexes_size,
                    pg_size_pretty(pg_total_relation_size(c.oid)) as total_size,
                    pg_total_relation_size(c.oid) as total_size_bytes,
                    s.n_live_tup,
                    s.n_dead_tup,
                    CASE WHEN s.n_live_tup > 0
                         THEN ROUND(100.0 * s.n_dead_tup / s.n_live_tup, 2)
                         ELSE 0
                    END as dead_tuple_ratio,
                    s.seq_scan,
                    s.seq_tup_read,
                    s.idx_scan,
                    s.idx_tup_fetch,
                    CASE WHEN s.seq_scan + COALESCE(s.idx_scan, 0) > 0
                         THEN ROUND(100.0 * COALESCE(s.idx_scan, 0) /
                                    (s.seq_scan + COALESCE(s.idx_scan, 0)), 2)
                         ELSE 0
                    END as index_scan_ratio,
                    s.last_vacuum,
                    s.last_autovacuum,
                    s.last_analyze,
                    s.last_autoanalyze,
                    s.vacuum_count,
                    s.autovacuum_count,
                    s.analyze_count,
                    s.autoanalyze_count
                FROM pg_class c
                JOIN pg_namespace n ON n.oid = c.relnamespace
                LEFT JOIN pg_stat_user_tables s ON s.relid = c.oid
                WHERE c.relkind = 'r'
                  AND n.nspname = %s
                  AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                  {table_filter}
                ORDER BY {order_clause}
            """

            results = await self.sql_driver.execute_query(query, params)

            if not results:
                return self.format_result(f"No tables found in schema '{schema_name}'")

            output = {
                "schema": schema_name,
                "table_count": len(results),
                "tables": results
            }

            # Add index statistics if requested
            if include_indexes and table_name:
                index_query = """
                    SELECT
                        i.indexrelname as index_name,
                        i.idx_scan as scans,
                        i.idx_tup_read as tuples_read,
                        i.idx_tup_fetch as tuples_fetched,
                        pg_size_pretty(pg_relation_size(i.indexrelid)) as size,
                        pg_relation_size(i.indexrelid) as size_bytes,
                        pg_get_indexdef(i.indexrelid) as definition
                    FROM pg_stat_user_indexes i
                    JOIN pg_class c ON c.oid = i.relid
                    JOIN pg_namespace n ON n.oid = c.relnamespace
                    WHERE n.nspname = %s AND c.relname = %s
                    ORDER BY i.idx_scan DESC
                """
                index_results = await self.sql_driver.execute_query(
                    index_query, [schema_name, table_name]
                )
                output["indexes"] = index_results

            # Add analysis and recommendations
            output["analysis"] = self._analyze_stats(results)

            return self.format_json_result(output)
        except Exception as e:
            return self.format_error(e)

    def _analyze_stats(self, tables: list[dict]) -> dict[str, Any]:
        """Analyze table stats and generate recommendations."""
        analysis = {
            "needs_vacuum": [],
            "needs_analyze": [],
            "low_index_usage": [],
            "recommendations": []
        }

        for table in tables:
            table_name = table.get("table_name", "unknown")

            # Check dead tuple ratio
            dead_ratio = table.get("dead_tuple_ratio", 0) or 0
            if dead_ratio > 10:
                analysis["needs_vacuum"].append({
                    "table": table_name,
                    "dead_tuple_ratio": dead_ratio,
                    "dead_tuples": table.get("n_dead_tup", 0)
                })

            # Check if analyze is needed
            last_analyze = table.get("last_analyze") or table.get("last_autoanalyze")
            n_live = table.get("n_live_tup", 0) or 0
            if n_live > 1000 and not last_analyze:
                analysis["needs_analyze"].append(table_name)

            # Check index usage
            idx_ratio = table.get("index_scan_ratio", 0) or 0
            seq_scans = table.get("seq_scan", 0) or 0
            if seq_scans > 100 and idx_ratio < 50 and n_live > 10000:
                analysis["low_index_usage"].append({
                    "table": table_name,
                    "index_scan_ratio": idx_ratio,
                    "seq_scans": seq_scans,
                    "rows": n_live
                })

        # Generate recommendations
        if analysis["needs_vacuum"]:
            tables_list = ", ".join(t["table"] for t in analysis["needs_vacuum"][:5])
            analysis["recommendations"].append(
                f"Run VACUUM on tables with high dead tuple ratios: {tables_list}"
            )
        if analysis["needs_analyze"]:
            tables_list = ", ".join(analysis["needs_analyze"][:5])
            analysis["recommendations"].append(
                f"Run ANALYZE on tables that haven't been analyzed: {tables_list}"
            )
        if analysis["low_index_usage"]:
            for item in analysis["low_index_usage"][:3]:
                analysis["recommendations"].append(
                    f"Table '{item['table']}' has low index usage ({item['index_scan_ratio']}% index scans). "
                    "Consider adding indexes for frequently filtered columns."
                )

        return analysis
```
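
To show what the recommendation output looks like, here is an illustrative sketch of the "analysis" section of the result; the keys and thresholds mirror _analyze_stats above, but every table name and figure is invented:

```python
# Illustrative output only -- table names and numbers are made up;
# the keys mirror the dict built by _analyze_stats above.
example_analysis = {
    "needs_vacuum": [
        # dead_tuple_ratio > 10 puts a table in this bucket
        {"table": "orders", "dead_tuple_ratio": 23.5, "dead_tuples": 48210},
    ],
    "needs_analyze": ["audit_log"],  # > 1000 live rows and never analyzed
    "low_index_usage": [
        # > 100 seq scans, < 50% index scans, > 10000 rows
        {"table": "events", "index_scan_ratio": 12.0, "seq_scans": 950, "rows": 120000},
    ],
    "recommendations": [
        "Run VACUUM on tables with high dead tuple ratios: orders",
    ],
}
```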
  • Input schema definition for the get_table_stats parameters (schema_name, table_name, include_indexes, order_by); see get_tool_definition in the handler class above.
  • Registration of the TableStatsToolHandler instance in the register_all_tools function, which is called during server initialization:
```python
def register_all_tools() -> None:
    """
    Register all available tool handlers.

    This function serves as the central registry for all tools.
    New tool handlers should be added here for automatic registration.
    """
    driver = get_sql_driver()
    hypopg_service = HypoPGService(driver)
    index_advisor = IndexAdvisor(driver)

    # Performance analysis tools
    add_tool_handler(GetSlowQueriesToolHandler(driver))
    add_tool_handler(AnalyzeQueryToolHandler(driver))
    add_tool_handler(TableStatsToolHandler(driver))

    # Index tuning tools
    add_tool_handler(IndexAdvisorToolHandler(index_advisor))
    add_tool_handler(ExplainQueryToolHandler(driver, hypopg_service))
    add_tool_handler(HypoPGToolHandler(hypopg_service))
    add_tool_handler(UnusedIndexesToolHandler(driver))

    # Database health tools
    add_tool_handler(DatabaseHealthToolHandler(driver))
    add_tool_handler(ActiveQueriesToolHandler(driver))
    add_tool_handler(WaitEventsToolHandler(driver))
    add_tool_handler(DatabaseSettingsToolHandler(driver))

    # Bloat detection tools (using pgstattuple extension)
    add_tool_handler(TableBloatToolHandler(driver))
    add_tool_handler(IndexBloatToolHandler(driver))
    add_tool_handler(DatabaseBloatSummaryToolHandler(driver))

    logger.info(f"Registered {len(tool_handlers)} tool handlers")
```
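
As a usage note on the registry pattern, adding another tool is a single add_tool_handler call inside register_all_tools; MyCustomToolHandler below is a hypothetical name used only for illustration:

```python
# Hypothetical sketch: MyCustomToolHandler is not part of the project.
# Any ToolHandler subclass (like TableStatsToolHandler above) registers
# the same way inside register_all_tools():
add_tool_handler(MyCustomToolHandler(driver))
```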
  • Helper method _analyze_stats, which processes the table statistics and generates maintenance recommendations (needs_vacuum, needs_analyze, low_index_usage); included in full in the handler class above.
  • Resource handler _get_table_stats_resource, which exposes similar table statistics through MCP resources (covered by test_resources.py):
```python
async def _get_table_stats_resource(schema: str, table_name: str) -> str:
    """Get table statistics as JSON."""
    driver = get_sql_driver()

    query = """
        SELECT
            schemaname,
            relname as table_name,
            n_live_tup as live_rows,
            n_dead_tup as dead_rows,
            n_mod_since_analyze as modifications_since_analyze,
            last_vacuum,
            last_autovacuum,
            last_analyze,
            last_autoanalyze,
            vacuum_count,
            autovacuum_count,
            analyze_count,
            autoanalyze_count,
            seq_scan,
            seq_tup_read,
            idx_scan,
            idx_tup_fetch,
            n_tup_ins as inserts,
            n_tup_upd as updates,
            n_tup_del as deletes,
            n_tup_hot_upd as hot_updates
        FROM pg_stat_user_tables
        WHERE schemaname = %s AND relname = %s
    """
    result = await driver.execute_query(query, (schema, table_name))

    if not result:
        return json.dumps({"error": f"Table {schema}.{table_name} not found"}, indent=2)

    # Get table size
    size_query = """
        SELECT
            pg_size_pretty(pg_total_relation_size(quote_ident(%s) || '.' || quote_ident(%s))) as total_size,
            pg_size_pretty(pg_table_size(quote_ident(%s) || '.' || quote_ident(%s))) as table_size,
            pg_size_pretty(pg_indexes_size(quote_ident(%s) || '.' || quote_ident(%s))) as indexes_size
    """
    size_result = await driver.execute_query(
        size_query, (schema, table_name, schema, table_name, schema, table_name)
    )

    row = result[0]
    stats = {
        "schema": row["schemaname"],
        "table_name": row["table_name"],
        "row_counts": {
            "live_rows": row["live_rows"],
            "dead_rows": row["dead_rows"],
            "dead_row_ratio": round(row["dead_rows"] / max(row["live_rows"], 1) * 100, 2)
        },
        "size": size_result[0] if size_result else {},
        "maintenance": {
            "last_vacuum": str(row["last_vacuum"]) if row["last_vacuum"] else None,
            "last_autovacuum": str(row["last_autovacuum"]) if row["last_autovacuum"] else None,
            "last_analyze": str(row["last_analyze"]) if row["last_analyze"] else None,
            "last_autoanalyze": str(row["last_autoanalyze"]) if row["last_autoanalyze"] else None,
            "modifications_since_analyze": row["modifications_since_analyze"]
        },
        "access_patterns": {
            "sequential_scans": row["seq_scan"],
            "sequential_rows_read": row["seq_tup_read"],
            "index_scans": row["idx_scan"],
            "index_rows_fetched": row["idx_tup_fetch"]
        },
        "modifications": {
            "inserts": row["inserts"],
            "updates": row["updates"],
            "deletes": row["deletes"],
            "hot_updates": row["hot_updates"]
        }
    }

    return json.dumps(stats, indent=2, default=str)
```
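
As a usage sketch, the resource can be exercised directly in an async context; the wrapper below is an assumption (it presumes get_sql_driver is already configured to reach your database, and "public"/"orders" are example values), while the function signature matches the handler above:

```python
import asyncio
import json

async def main() -> None:
    # Signature comes from the handler above; arguments are example values.
    raw = await _get_table_stats_resource("public", "orders")
    stats = json.loads(raw)
    print(stats["row_counts"])   # live_rows, dead_rows, dead_row_ratio
    print(stats["maintenance"])  # last (auto)vacuum / (auto)analyze timestamps

asyncio.run(main())
```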
