Skip to main content
Glama
isdaniel

PostgreSQL-Performance-Tuner-Mcp

get_table_stats

Analyze PostgreSQL table statistics to identify maintenance needs and performance issues. Provides table sizes, row counts, scan ratios, and vacuum timing data for user-created tables.

Instructions

Get detailed statistics for user/client database tables.

Note: This tool analyzes only user-created tables and excludes PostgreSQL system tables (pg_catalog, information_schema, pg_toast). This focuses the analysis on your application's custom tables.

Returns information about:

  • Table size (data, indexes, total)

  • Row counts and dead tuple ratio

  • Last vacuum and analyze times

  • Sequential vs index scan ratios

  • Cache hit ratios

This helps identify tables that may need maintenance (VACUUM, ANALYZE) or have performance issues.

Input Schema

Table (JSON Schema)

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| schema_name | No | Schema to analyze (default: public) | public |
| table_name | No | Specific table to analyze (optional, analyzes all tables if not provided) | — |
| include_indexes | No | Include index statistics | true |
| order_by | No | Order results by this metric | size |

Implementation Reference

  • TableStatsToolHandler class implementing the execute logic for 'get_table_stats' tool via run_tool method, querying PostgreSQL stats views for table sizes, rows, vacuum info, etc.
    class TableStatsToolHandler(ToolHandler):
        """Tool handler for retrieving table statistics and health metrics.

        Queries the PostgreSQL catalog (pg_class/pg_namespace) joined with the
        pg_stat_user_tables statistics view to report per-table sizes, live and
        dead tuple counts, scan ratios, and vacuum/analyze history, then derives
        heuristic maintenance recommendations from the results.
        """

        # MCP tool identity and behavior hints: the tool only reads statistics
        # views, so it is read-only, non-destructive, and safe to repeat.
        name = "get_table_stats"
        title = "Table Statistics Analyzer"
        read_only_hint = True
        destructive_hint = False
        idempotent_hint = True
        open_world_hint = False
        description = """Get detailed statistics for user/client database tables.
    
    Note: This tool analyzes only user-created tables and excludes PostgreSQL
    system tables (pg_catalog, information_schema, pg_toast). This focuses
    the analysis on your application's custom tables.
    
    Returns information about:
    - Table size (data, indexes, total)
    - Row counts and dead tuple ratio
    - Last vacuum and analyze times
    - Sequential vs index scan ratios
    - Cache hit ratios
    
    This helps identify tables that may need maintenance (VACUUM, ANALYZE)
    or have performance issues."""

        def __init__(self, sql_driver: SqlDriver) -> None:
            """Store the SQL driver used to run all statistics queries."""
            self.sql_driver = sql_driver

        def get_tool_definition(self) -> Tool:
            """Build the MCP Tool definition, including its JSON input schema."""
            return Tool(
                name=self.name,
                description=self.description,
                inputSchema={
                    "type": "object",
                    "properties": {
                        "schema_name": {
                            "type": "string",
                            "description": "Schema to analyze (default: public)",
                            "default": "public"
                        },
                        "table_name": {
                            "type": "string",
                            "description": "Specific table to analyze (optional, analyzes all tables if not provided)"
                        },
                        "include_indexes": {
                            "type": "boolean",
                            "description": "Include index statistics",
                            "default": True
                        },
                        "order_by": {
                            "type": "string",
                            # Enum values are the keys of order_map in run_tool.
                            "enum": ["size", "rows", "dead_tuples", "seq_scans", "last_vacuum"],
                            "description": "Order results by this metric",
                            "default": "size"
                        }
                    },
                    "required": []
                },
                annotations=self.get_annotations()
            )

        async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]:
            """Run the table-statistics query and return a JSON result.

            Arguments (all optional): ``schema_name`` (default "public"),
            ``table_name`` (restricts the result to one table, matched with
            ILIKE), ``include_indexes`` (default True; per-index stats are only
            fetched when ``table_name`` is also given), and ``order_by`` (one of
            the order_map keys below; unknown values fall back to size order).

            Returns a single-element sequence of TextContent holding either the
            JSON payload or a formatted error message (all exceptions are caught
            and routed through format_error).
            """
            try:
                schema_name = arguments.get("schema_name", "public")
                table_name = arguments.get("table_name")
                include_indexes = arguments.get("include_indexes", True)
                order_by = arguments.get("order_by", "size")

                # Build the query.
                # order_by is mapped through this whitelist, so only these fixed
                # ORDER BY expressions can ever reach the f-string below — no
                # injection is possible through the order_by argument.
                order_map = {
                    "size": "total_size DESC",
                    "rows": "n_live_tup DESC",
                    "dead_tuples": "n_dead_tup DESC",
                    "seq_scans": "seq_scan DESC",
                    "last_vacuum": "last_vacuum DESC NULLS LAST"
                }
                order_clause = order_map.get(order_by, "total_size DESC")

                # table_name is passed as a bound parameter (ILIKE pattern),
                # never interpolated into the SQL text.
                table_filter = ""
                params = [schema_name]
                if table_name:
                    table_filter = "AND c.relname ILIKE %s"
                    params.append(table_name)

                # Query only user tables, explicitly excluding system schemas.
                # LEFT JOIN on pg_stat_user_tables keeps tables that have no
                # statistics row yet; their s.* columns come back NULL, hence
                # the COALESCE/CASE guards in the ratio expressions.
                query = f"""
                    SELECT
                        c.relname as table_name,
                        n.nspname as schema_name,
                        pg_size_pretty(pg_table_size(c.oid)) as table_size,
                        pg_size_pretty(pg_indexes_size(c.oid)) as indexes_size,
                        pg_size_pretty(pg_total_relation_size(c.oid)) as total_size,
                        pg_total_relation_size(c.oid) as total_size_bytes,
                        s.n_live_tup,
                        s.n_dead_tup,
                        CASE
                            WHEN s.n_live_tup > 0
                            THEN ROUND(100.0 * s.n_dead_tup / s.n_live_tup, 2)
                            ELSE 0
                        END as dead_tuple_ratio,
                        s.seq_scan,
                        s.seq_tup_read,
                        s.idx_scan,
                        s.idx_tup_fetch,
                        CASE
                            WHEN s.seq_scan + COALESCE(s.idx_scan, 0) > 0
                            THEN ROUND(100.0 * COALESCE(s.idx_scan, 0) / (s.seq_scan + COALESCE(s.idx_scan, 0)), 2)
                            ELSE 0
                        END as index_scan_ratio,
                        s.last_vacuum,
                        s.last_autovacuum,
                        s.last_analyze,
                        s.last_autoanalyze,
                        s.vacuum_count,
                        s.autovacuum_count,
                        s.analyze_count,
                        s.autoanalyze_count
                    FROM pg_class c
                    JOIN pg_namespace n ON n.oid = c.relnamespace
                    LEFT JOIN pg_stat_user_tables s ON s.relid = c.oid
                    WHERE c.relkind = 'r'
                      AND n.nspname = %s
                      AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast')
                      {table_filter}
                    ORDER BY {order_clause}
                """

                results = await self.sql_driver.execute_query(query, params)

                if not results:
                    return self.format_result(f"No tables found in schema '{schema_name}'")

                output = {
                    "schema": schema_name,
                    "table_count": len(results),
                    "tables": results
                }

                # Add index statistics if requested — only when a single table
                # was named, so the per-index detail stays bounded.
                if include_indexes and table_name:
                    index_query = """
                        SELECT
                            i.indexrelname as index_name,
                            i.idx_scan as scans,
                            i.idx_tup_read as tuples_read,
                            i.idx_tup_fetch as tuples_fetched,
                            pg_size_pretty(pg_relation_size(i.indexrelid)) as size,
                            pg_relation_size(i.indexrelid) as size_bytes,
                            pg_get_indexdef(i.indexrelid) as definition
                        FROM pg_stat_user_indexes i
                        JOIN pg_class c ON c.oid = i.relid
                        JOIN pg_namespace n ON n.oid = c.relnamespace
                        WHERE n.nspname = %s AND c.relname = %s
                        ORDER BY i.idx_scan DESC
                    """
                    index_results = await self.sql_driver.execute_query(
                        index_query,
                        [schema_name, table_name]
                    )
                    output["indexes"] = index_results

                # Add analysis and recommendations derived from the stats rows.
                output["analysis"] = self._analyze_stats(results)

                return self.format_json_result(output)

            except Exception as e:
                # All failures (driver errors, bad SQL, etc.) are reported to
                # the client as a formatted error rather than raised.
                return self.format_error(e)

        def _analyze_stats(self, tables: list[dict]) -> dict[str, Any]:
            """Analyze table stats and generate recommendations.

            Scans the query result rows and collects three finding lists plus a
            human-readable ``recommendations`` list. Thresholds are heuristics:
            >10% dead tuples -> VACUUM candidate; >1000 live rows with no
            (auto)analyze timestamp -> ANALYZE candidate; >100 seq scans with
            <50% index-scan ratio on a >10000-row table -> low index usage.
            """
            analysis = {
                "needs_vacuum": [],
                "needs_analyze": [],
                "low_index_usage": [],
                "recommendations": []
            }

            for table in tables:
                table_name = table.get("table_name", "unknown")

                # Check dead tuple ratio (``or 0`` normalizes NULL/None rows).
                dead_ratio = table.get("dead_tuple_ratio", 0) or 0
                if dead_ratio > 10:
                    analysis["needs_vacuum"].append({
                        "table": table_name,
                        "dead_tuple_ratio": dead_ratio,
                        "dead_tuples": table.get("n_dead_tup", 0)
                    })

                # Check if analyze is needed: sizeable table never analyzed,
                # neither manually nor by autovacuum.
                last_analyze = table.get("last_analyze") or table.get("last_autoanalyze")
                n_live = table.get("n_live_tup", 0) or 0
                if n_live > 1000 and not last_analyze:
                    analysis["needs_analyze"].append(table_name)

                # Check index usage on frequently seq-scanned large tables.
                idx_ratio = table.get("index_scan_ratio", 0) or 0
                seq_scans = table.get("seq_scan", 0) or 0
                if seq_scans > 100 and idx_ratio < 50 and n_live > 10000:
                    analysis["low_index_usage"].append({
                        "table": table_name,
                        "index_scan_ratio": idx_ratio,
                        "seq_scans": seq_scans,
                        "rows": n_live
                    })

            # Generate recommendations (capped at 5/5/3 entries to keep the
            # output short for wide schemas).
            if analysis["needs_vacuum"]:
                tables_list = ", ".join(t["table"] for t in analysis["needs_vacuum"][:5])
                analysis["recommendations"].append(
                    f"Run VACUUM on tables with high dead tuple ratios: {tables_list}"
                )

            if analysis["needs_analyze"]:
                tables_list = ", ".join(analysis["needs_analyze"][:5])
                analysis["recommendations"].append(
                    f"Run ANALYZE on tables that haven't been analyzed: {tables_list}"
                )

            if analysis["low_index_usage"]:
                for item in analysis["low_index_usage"][:3]:
                    analysis["recommendations"].append(
                        f"Table '{item['table']}' has low index usage ({item['index_scan_ratio']}% index scans). "
                        "Consider adding indexes for frequently filtered columns."
                    )

            return analysis
  • Input schema definition for get_table_stats tool parameters: schema_name, table_name, include_indexes, order_by.
    def get_tool_definition(self) -> Tool:
        """Describe this tool for MCP clients, including its input schema."""
        # All four parameters are optional; defaults mirror run_tool's fallbacks.
        properties = {
            "schema_name": {
                "type": "string",
                "description": "Schema to analyze (default: public)",
                "default": "public",
            },
            "table_name": {
                "type": "string",
                "description": "Specific table to analyze (optional, analyzes all tables if not provided)",
            },
            "include_indexes": {
                "type": "boolean",
                "description": "Include index statistics",
                "default": True,
            },
            "order_by": {
                "type": "string",
                "description": "Order results by this metric",
                "enum": ["size", "rows", "dead_tuples", "seq_scans", "last_vacuum"],
                "default": "size",
            },
        }
        input_schema = {
            "type": "object",
            "properties": properties,
            "required": [],
        }
        return Tool(
            name=self.name,
            description=self.description,
            inputSchema=input_schema,
            annotations=self.get_annotations(),
        )
  • Registration of TableStatsToolHandler instance in the register_all_tools function, called during server initialization.
    def register_all_tools() -> None:
        """
        Register all available tool handlers.

        Central registry for every tool exposed by the server; add new tool
        handlers to the list below and they are registered automatically.
        """
        driver = get_sql_driver()
        hypopg_service = HypoPGService(driver)
        index_advisor = IndexAdvisor(driver)

        handlers = [
            # Performance analysis tools
            GetSlowQueriesToolHandler(driver),
            AnalyzeQueryToolHandler(driver),
            TableStatsToolHandler(driver),
            # Index tuning tools
            IndexAdvisorToolHandler(index_advisor),
            ExplainQueryToolHandler(driver, hypopg_service),
            HypoPGToolHandler(hypopg_service),
            UnusedIndexesToolHandler(driver),
            # Database health tools
            DatabaseHealthToolHandler(driver),
            ActiveQueriesToolHandler(driver),
            WaitEventsToolHandler(driver),
            DatabaseSettingsToolHandler(driver),
            # Bloat detection tools (using pgstattuple extension)
            TableBloatToolHandler(driver),
            IndexBloatToolHandler(driver),
            DatabaseBloatSummaryToolHandler(driver),
        ]
        for handler in handlers:
            add_tool_handler(handler)

        logger.info(f"Registered {len(tool_handlers)} tool handlers")
  • Helper method _analyze_stats that processes table stats results and generates maintenance recommendations like needs_vacuum, needs_analyze.
    def _analyze_stats(self, tables: list[dict]) -> dict[str, Any]:
        """Analyze table stats and generate recommendations."""
        analysis = {
            "needs_vacuum": [],
            "needs_analyze": [],
            "low_index_usage": [],
            "recommendations": []
        }
    
        for table in tables:
            table_name = table.get("table_name", "unknown")
    
            # Check dead tuple ratio
            dead_ratio = table.get("dead_tuple_ratio", 0) or 0
            if dead_ratio > 10:
                analysis["needs_vacuum"].append({
                    "table": table_name,
                    "dead_tuple_ratio": dead_ratio,
                    "dead_tuples": table.get("n_dead_tup", 0)
                })
    
            # Check if analyze is needed
            last_analyze = table.get("last_analyze") or table.get("last_autoanalyze")
            n_live = table.get("n_live_tup", 0) or 0
            if n_live > 1000 and not last_analyze:
                analysis["needs_analyze"].append(table_name)
    
            # Check index usage
            idx_ratio = table.get("index_scan_ratio", 0) or 0
            seq_scans = table.get("seq_scan", 0) or 0
            if seq_scans > 100 and idx_ratio < 50 and n_live > 10000:
                analysis["low_index_usage"].append({
                    "table": table_name,
                    "index_scan_ratio": idx_ratio,
                    "seq_scans": seq_scans,
                    "rows": n_live
                })
    
        # Generate recommendations
        if analysis["needs_vacuum"]:
            tables_list = ", ".join(t["table"] for t in analysis["needs_vacuum"][:5])
            analysis["recommendations"].append(
                f"Run VACUUM on tables with high dead tuple ratios: {tables_list}"
            )
    
        if analysis["needs_analyze"]:
            tables_list = ", ".join(analysis["needs_analyze"][:5])
            analysis["recommendations"].append(
                f"Run ANALYZE on tables that haven't been analyzed: {tables_list}"
            )
    
        if analysis["low_index_usage"]:
            for item in analysis["low_index_usage"][:3]:
                analysis["recommendations"].append(
                    f"Table '{item['table']}' has low index usage ({item['index_scan_ratio']}% index scans). "
                    "Consider adding indexes for frequently filtered columns."
                )
    
        return analysis
  • Helper resource handler _get_table_stats_resource providing similar table stats data via MCP resources, tested in test_resources.py.
    async def _get_table_stats_resource(schema: str, table_name: str) -> str:
        """Get table statistics as JSON.

        Resource-oriented counterpart of the get_table_stats tool: fetches the
        pg_stat_user_tables row for one specific table plus its sizes, and
        returns a structured JSON string (or a JSON error object when the table
        has no statistics row).
        """

        driver = get_sql_driver()

        # Single-table lookup; both identifiers are bound parameters.
        query = """
            SELECT
                schemaname,
                relname as table_name,
                n_live_tup as live_rows,
                n_dead_tup as dead_rows,
                n_mod_since_analyze as modifications_since_analyze,
                last_vacuum,
                last_autovacuum,
                last_analyze,
                last_autoanalyze,
                vacuum_count,
                autovacuum_count,
                analyze_count,
                autoanalyze_count,
                seq_scan,
                seq_tup_read,
                idx_scan,
                idx_tup_fetch,
                n_tup_ins as inserts,
                n_tup_upd as updates,
                n_tup_del as deletes,
                n_tup_hot_upd as hot_updates
            FROM pg_stat_user_tables
            WHERE schemaname = %s AND relname = %s
        """

        result = await driver.execute_query(query, (schema, table_name))

        if not result:
            # No row in pg_stat_user_tables for this schema/table combination.
            return json.dumps({"error": f"Table {schema}.{table_name} not found"}, indent=2)

        # Get table size. The size functions take a relation name, so the
        # qualified name is assembled server-side with quote_ident to keep
        # special characters in identifiers safe.
        size_query = """
            SELECT
                pg_size_pretty(pg_total_relation_size(quote_ident(%s) || '.' || quote_ident(%s))) as total_size,
                pg_size_pretty(pg_table_size(quote_ident(%s) || '.' || quote_ident(%s))) as table_size,
                pg_size_pretty(pg_indexes_size(quote_ident(%s) || '.' || quote_ident(%s))) as indexes_size
        """
        # (schema, table) pair repeated once per size function above.
        size_result = await driver.execute_query(
            size_query, (schema, table_name, schema, table_name, schema, table_name)
        )

        row = result[0]
        stats = {
            "schema": row["schemaname"],
            "table_name": row["table_name"],
            "row_counts": {
                "live_rows": row["live_rows"],
                "dead_rows": row["dead_rows"],
                # max(..., 1) guards against division by zero on empty tables.
                "dead_row_ratio": round(row["dead_rows"] / max(row["live_rows"], 1) * 100, 2)
            },
            "size": size_result[0] if size_result else {},
            "maintenance": {
                # Timestamps are stringified explicitly; NULLs stay None/null.
                "last_vacuum": str(row["last_vacuum"]) if row["last_vacuum"] else None,
                "last_autovacuum": str(row["last_autovacuum"]) if row["last_autovacuum"] else None,
                "last_analyze": str(row["last_analyze"]) if row["last_analyze"] else None,
                "last_autoanalyze": str(row["last_autoanalyze"]) if row["last_autoanalyze"] else None,
                "modifications_since_analyze": row["modifications_since_analyze"]
            },
            "access_patterns": {
                "sequential_scans": row["seq_scan"],
                "sequential_rows_read": row["seq_tup_read"],
                "index_scans": row["idx_scan"],
                "index_rows_fetched": row["idx_tup_fetch"]
            },
            "modifications": {
                "inserts": row["inserts"],
                "updates": row["updates"],
                "deletes": row["deletes"],
                "hot_updates": row["hot_updates"]
            }
        }

        # default=str covers any remaining non-JSON-native values (e.g. Decimal).
        return json.dumps(stats, indent=2, default=str)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/isdaniel/pgtuner-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server