Skip to main content
Glama

get_top_queries

Identify and analyze the slowest SQL queries by execution time using data from the 'pg_stat_statements' extension. Specify the number of queries and sort by total or mean execution time to optimize database performance.

Instructions

Reports the slowest SQL queries based on execution time, using data from the 'pg_stat_statements' extension.

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| limit | No | Number of slow queries to return | |
| sort_by | No | Sort criteria: 'total' for total execution time or 'mean' for mean execution time per call | mean |

Implementation Reference

  • The primary handler function for the 'get_top_queries' MCP tool. It is registered via the @mcp.tool decorator, defines its input schema via Pydantic Fields, and orchestrates the logic by instantiating TopQueriesCalc and calling the appropriate method based on the parameters.
    # MCP tool entry point: picks the report type from `sort_by` and delegates the
    # actual work to TopQueriesCalc. Any failure is logged and turned into an error
    # response instead of propagating to the MCP runtime.
    @mcp.tool(
        name="get_top_queries",
        description=f"Reports the slowest or most resource-intensive queries using data from the '{PG_STAT_STATEMENTS}' extension.",
    )
    async def get_top_queries(
        sort_by: str = Field(
            description=(
                "Ranking criteria: 'total_time' for total execution time or 'mean_time' "
                "for mean execution time per call, or 'resources' for resource-intensive queries"
            ),
            default="resources",
        ),
        limit: int = Field(
            description="Number of queries to return when ranking based on mean_time or total_time",
            default=10,
        ),
    ) -> ResponseType:
        try:
            driver = await get_sql_driver()
            calc = TopQueriesCalc(sql_driver=driver)

            if sort_by == "resources":
                # Resource ranking takes no limit; the calculator applies its own threshold.
                report = await calc.get_top_resource_queries()
            elif sort_by in ("mean_time", "total_time"):
                # The calculator uses the short names "mean" / "total".
                time_key = "mean" if sort_by == "mean_time" else "total"
                report = await calc.get_top_queries_by_time(limit=limit, sort_by=time_key)
            else:
                return format_error_response("Invalid sort criteria. Please use 'resources' or 'mean_time' or 'total_time'.")

            return format_text_response(report)
        except Exception as e:
            logger.error(f"Error getting slow queries: {e}")
            return format_error_response(str(e))
  • Helper method in TopQueriesCalc that executes the SQL query against pg_stat_statements to fetch top queries sorted by total or mean execution time, handling PostgreSQL version differences in column names.
    async def get_top_queries_by_time(self, limit: int = 10, sort_by: Literal["total", "mean"] = "mean") -> str:
        """List the slowest statements recorded by pg_stat_statements.

        Args:
            limit: Maximum number of statements to include in the report.
            sort_by: "total" ranks by cumulative execution time; any other value
                ranks by mean execution time per call.

        Returns:
            A text report of the matching rows, the installation instructions when
            the extension is not installed, or an error message if anything fails.
        """
        try:
            logger.debug(f"Getting top queries by time. limit={limit}, sort_by={sort_by}")

            ext = await check_extension(self.sql_driver, PG_STAT_STATEMENTS, include_messages=False)
            if not ext.is_installed:
                # Without the extension there is nothing to query; tell the user how to add it.
                logger.warning(f"Extension {PG_STAT_STATEMENTS} is not installed")
                return install_pg_stat_statements_message

            server_version = await get_postgres_version(self.sql_driver)
            logger.debug(f"PostgreSQL version: {server_version}")

            # The timing columns were renamed in PostgreSQL 13.
            total_col, mean_col = (
                ("total_exec_time", "mean_exec_time")
                if server_version >= 13
                else ("total_time", "mean_time")
            )
            logger.debug(f"Using time columns: total={total_col}, mean={mean_col}")

            ranking_col = total_col if sort_by == "total" else mean_col

            # `{{}}` survives the f-string as `{}`, the placeholder that
            # execute_param_query fills from the parameter list.
            sql = f"""
                SELECT
                    query,
                    calls,
                    {total_col},
                    {mean_col},
                    rows
                FROM pg_stat_statements
                ORDER BY {ranking_col} DESC
                LIMIT {{}};
            """
            logger.debug(f"Executing query: {sql}")

            fetched = await SafeSqlDriver.execute_param_query(self.sql_driver, sql, [limit])
            records = [row.cells for row in (fetched or [])]
            logger.info(f"Found {len(records)} slow queries")

            ranked_by = "total execution time" if sort_by == "total" else "mean execution time per call"
            return f"Top {len(records)} slowest queries by {ranked_by}:\n" + str(records)
        except Exception as e:
            logger.error(f"Error getting slow queries: {e}", exc_info=True)
            return f"Error getting slow queries: {e}"
  • Helper method in TopQueriesCalc that executes a complex SQL query to identify resource-intensive queries based on fractions of total execution time, buffer access, and WAL usage.
    async def get_top_resource_queries(self, frac_threshold: float = 0.05) -> str:
        """Report the statements that consume a notable share of server resources.

        A statement is included when its share of any one resource (execution
        time, shared blocks accessed, shared blocks read, shared blocks dirtied,
        WAL bytes) across all of pg_stat_statements exceeds ``frac_threshold``.

        Args:
            frac_threshold: Fraction threshold for filtering queries (default: 0.05).

        Returns:
            A string with the resource-heavy rows, the installation instructions
            when the extension is not installed, or an error message if anything
            fails.
        """
        try:
            logger.debug(f"Getting top resource queries with threshold {frac_threshold}")

            extension_status = await check_extension(
                self.sql_driver,
                PG_STAT_STATEMENTS,
                include_messages=False,
            )
            if not extension_status.is_installed:
                logger.warning(f"Extension {PG_STAT_STATEMENTS} is not installed")
                # Return installation instructions if the extension is not installed
                return install_pg_stat_statements_message

            # Check PostgreSQL version to determine column names
            pg_version = await get_postgres_version(self.sql_driver)
            logger.debug(f"PostgreSQL version: {pg_version}")

            if pg_version >= 13:
                # PostgreSQL 13 and newer
                total_time_col = "total_exec_time"
                mean_time_col = "mean_exec_time"
                stddev_time_col = "stddev_exec_time"
                wal_bytes_expr = "wal_bytes"
            else:
                # PostgreSQL 12 and older: the timing columns have the older names
                # and WAL statistics are not collected, so report WAL usage as zero.
                total_time_col = "total_time"
                mean_time_col = "mean_time"
                stddev_time_col = "stddev_time"
                wal_bytes_expr = "CAST(0 AS numeric)"

            # The threshold is interpolated into the SQL text, so force it to a
            # plain number first; a non-numeric value fails here, not in the query.
            threshold = float(frac_threshold)

            # Notes on the query:
            #  * The fraction expressions use the underlying columns, not the output
            #    aliases: an alias is not visible inside the same SELECT list.
            #  * NULLIF(..., 0) turns a zero denominator (e.g. no WAL written, no
            #    dirtied blocks) into NULL instead of a division-by-zero error. A
            #    NULL fraction never satisfies the threshold comparison.
            query = cast(
                LiteralString,
                f"""
                WITH resource_fractions AS (
                    SELECT
                        query,
                        calls,
                        rows,
                        {total_time_col} AS total_exec_time,
                        {mean_time_col} AS mean_exec_time,
                        {stddev_time_col} AS stddev_exec_time,
                        shared_blks_hit,
                        shared_blks_read,
                        shared_blks_dirtied,
                        {wal_bytes_expr} AS wal_bytes,
                        {total_time_col} / NULLIF(SUM({total_time_col}) OVER (), 0) AS total_exec_time_frac,
                        (shared_blks_hit + shared_blks_read)
                            / NULLIF(SUM(shared_blks_hit + shared_blks_read) OVER (), 0) AS shared_blks_accessed_frac,
                        shared_blks_read / NULLIF(SUM(shared_blks_read) OVER (), 0) AS shared_blks_read_frac,
                        shared_blks_dirtied / NULLIF(SUM(shared_blks_dirtied) OVER (), 0) AS shared_blks_dirtied_frac,
                        {wal_bytes_expr} / NULLIF(SUM({wal_bytes_expr}) OVER (), 0) AS total_wal_bytes_frac
                    FROM pg_stat_statements
                )
                SELECT
                    query,
                    calls,
                    rows,
                    total_exec_time,
                    mean_exec_time,
                    stddev_exec_time,
                    total_exec_time_frac,
                    shared_blks_accessed_frac,
                    shared_blks_read_frac,
                    shared_blks_dirtied_frac,
                    total_wal_bytes_frac,
                    shared_blks_hit,
                    shared_blks_read,
                    shared_blks_dirtied,
                    wal_bytes
                FROM resource_fractions
                WHERE
                    total_exec_time_frac > {threshold}
                    OR shared_blks_accessed_frac > {threshold}
                    OR shared_blks_read_frac > {threshold}
                    OR shared_blks_dirtied_frac > {threshold}
                    OR total_wal_bytes_frac > {threshold}
                ORDER BY total_exec_time DESC
                """,
            )

            logger.debug(f"Executing query: {query}")
            slow_query_rows = await SafeSqlDriver.execute_param_query(
                self.sql_driver,
                query,
            )
            resource_queries = [row.cells for row in slow_query_rows] if slow_query_rows else []
            logger.info(f"Found {len(resource_queries)} resource-intensive queries")

            return str(resource_queries)
        except Exception as e:
            logger.error(f"Error getting resource-intensive queries: {e}", exc_info=True)
            return f"Error getting resource-intensive queries: {e}"

Other Tools

Related Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/crystaldba/postgres-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server