Skip to main content
Glama
call518

MCP PostgreSQL Operations

version_compat.py•21.4 kB
""" PostgreSQL Version Compatibility Utilities Provides version detection and compatibility handling for MCP PostgreSQL tools. """ import re import logging from typing import Tuple, Optional from .functions import execute_single_query logger = logging.getLogger(__name__) class PostgreSQLVersion: """PostgreSQL version information and compatibility utilities.""" def __init__(self, major: int, minor: int = 0, patch: int = 0): self.major = major self.minor = minor self.patch = patch def __str__(self): return f"{self.major}.{self.minor}.{self.patch}" def __ge__(self, other): if isinstance(other, (int, float)): return self.major >= other if isinstance(other, PostgreSQLVersion): return (self.major, self.minor, self.patch) >= (other.major, other.minor, other.patch) return False def __lt__(self, other): if isinstance(other, (int, float)): return self.major < other if isinstance(other, PostgreSQLVersion): return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch) return False @property def is_modern(self) -> bool: """Check if this is a modern PostgreSQL version (12+).""" return self.major >= 12 @property def has_checkpointer_split(self) -> bool: """Check if checkpointer stats are in separate view (only PG15).""" return self.major == 15 @property def has_pg_stat_io(self) -> bool: """Check if pg_stat_io view is available (16+).""" return self.major >= 16 @property def has_enhanced_wal_receiver(self) -> bool: """Check if pg_stat_wal_receiver has written_lsn/flushed_lsn (16+).""" return self.major >= 16 @property def has_replication_slot_stats(self) -> bool: """Check if pg_stat_replication_slots is available (14+).""" return self.major >= 14 @property def has_parallel_leader_tracking(self) -> bool: """Check if pg_stat_activity has leader_pid column (14+).""" return self.major >= 14 @property def has_replication_slot_wal_status(self) -> bool: """Check if pg_replication_slots has wal_status and safe_wal_size columns (13+).""" return self.major >= 13 @property def has_table_stats_ins_since_vacuum(self) -> bool: """Check if pg_stat_*_tables has n_ins_since_vacuum column (13+).""" return self.major >= 13 @property def has_pg_stat_statements_exec_time(self) -> bool: """Check if pg_stat_statements uses total_exec_time and mean_exec_time columns (13+).""" return self.major >= 13 # Global version cache _cached_version: Optional[PostgreSQLVersion] = None async def get_postgresql_version(database: str = None, force_refresh: bool = False) -> PostgreSQLVersion: """ Get PostgreSQL server version with caching. Args: database: Database to connect to force_refresh: Force refresh cached version Returns: PostgreSQLVersion object """ global _cached_version if _cached_version is not None and not force_refresh: return _cached_version try: result = await execute_single_query("SELECT version()", database=database) version_string = result.get('version', '') # Parse version string like "PostgreSQL 16.1 on x86_64-pc-linux-gnu..." version_match = re.search(r'PostgreSQL\s+(\d+)\.?(\d*)\.?(\d*)', version_string) if version_match: major = int(version_match.group(1)) minor = int(version_match.group(2) or 0) patch = int(version_match.group(3) or 0) _cached_version = PostgreSQLVersion(major, minor, patch) logger.info(f"Detected PostgreSQL version: {_cached_version}") return _cached_version else: logger.warning(f"Could not parse version string: {version_string}") # Default to PostgreSQL 17 if parsing fails _cached_version = PostgreSQLVersion(17, 0, 0) return _cached_version except Exception as e: logger.error(f"Failed to get PostgreSQL version: {e}") # Default to PostgreSQL 17 if version detection fails _cached_version = PostgreSQLVersion(17, 0, 0) return _cached_version async def check_feature_availability(feature: str, database: str = None) -> bool: """ Check if a specific PostgreSQL feature is available. Args: feature: Feature name to check database: Database to connect to Returns: True if feature is available """ version = await get_postgresql_version(database) feature_requirements = { 'pg_stat_io': version.has_pg_stat_io, 'checkpointer_split': version.has_checkpointer_split, 'enhanced_wal_receiver': version.has_enhanced_wal_receiver, 'replication_slot_stats': version.has_replication_slot_stats, 'parallel_leader_tracking': version.has_parallel_leader_tracking, } return feature_requirements.get(feature, False) async def get_compatible_column_list(table_name: str, all_columns: list, version_specific_columns: dict, database: str = None) -> str: """ Generate version-compatible column list for SELECT queries. Args: table_name: PostgreSQL table/view name all_columns: List of all possible columns version_specific_columns: Dict mapping version requirements to columns database: Database to connect to Returns: Comma-separated column list for SQL SELECT """ version = await get_postgresql_version(database) available_columns = [] for col in all_columns: # Check if column has version requirements required_version = version_specific_columns.get(col) if required_version is None: # No version requirement - always available available_columns.append(col) elif version >= required_version: # Version requirement met available_columns.append(col) else: # Version requirement not met - provide NULL placeholder available_columns.append(f"NULL::text AS {col}") return ", ".join(available_columns) def get_version_appropriate_query(base_query: str, version_variants: dict, version: PostgreSQLVersion) -> str: """ Select version-appropriate query variant. Args: base_query: Default/fallback query version_variants: Dict mapping version requirements to query variants version: PostgreSQL version Returns: Most appropriate query for the version """ # Sort version variants by version (highest first) sorted_variants = sorted(version_variants.items(), key=lambda x: (x[0].major, x[0].minor), reverse=True) for required_version, query in sorted_variants: if version >= required_version: return query return base_query # Version-specific query builders class VersionAwareQueries: """Collection of version-aware query builders.""" @staticmethod async def get_bgwriter_checkpointer_stats(database: str = None) -> str: """Get background writer/checkpointer stats with version compatibility.""" version = await get_postgresql_version(database) if version.has_checkpointer_split: # PostgreSQL 15+: Use separate checkpointer view return """ SELECT 'checkpointer' as component, num_timed, num_requested, restartpoints_timed, restartpoints_req, restartpoints_done, write_time, sync_time, buffers_written, stats_reset FROM pg_stat_checkpointer UNION ALL SELECT 'bgwriter' as component, NULL::bigint as num_timed, NULL::bigint as num_requested, NULL::bigint as restartpoints_timed, NULL::bigint as restartpoints_req, NULL::bigint as restartpoints_done, NULL::double precision as write_time, NULL::double precision as sync_time, buffers_clean as buffers_written, stats_reset FROM pg_stat_bgwriter """ else: # PostgreSQL 10-14: All stats in bgwriter view return """ SELECT 'bgwriter_legacy' as component, buffers_clean, maxwritten_clean, buffers_alloc, stats_reset, NULL::bigint as num_timed, NULL::bigint as num_requested FROM pg_stat_bgwriter """ @staticmethod async def get_io_statistics(database: str = None) -> str: """Get I/O statistics with version compatibility.""" version = await get_postgresql_version(database) if version.has_pg_stat_io: # PostgreSQL 16+: Use comprehensive pg_stat_io return """ SELECT backend_type, object, context, reads, read_time, writes, write_time, extends, extend_time, hits, evictions, reuses, fsyncs, fsync_time FROM pg_stat_io WHERE reads > 0 OR writes > 0 OR hits > 0 """ else: # PostgreSQL 10-15: Fall back to pg_statio_* views return """ SELECT 'client backend' as backend_type, 'relation' as object, 'normal' as context, heap_blks_read as reads, 0::double precision as read_time, 0::bigint as writes, 0::double precision as write_time, 0::bigint as extends, 0::double precision as extend_time, heap_blks_hit as hits, 0::bigint as evictions, 0::bigint as reuses, 0::bigint as fsyncs, 0::double precision as fsync_time FROM pg_statio_all_tables WHERE heap_blks_read > 0 OR heap_blks_hit > 0 """ @staticmethod async def get_activity_with_leader_info(database: str = None) -> str: """Get activity info with parallel leader tracking if available.""" version = await get_postgresql_version(database) base_columns = [ "pid", "datname", "usename", "application_name", "client_addr", "state", "query_start", "query" ] version_columns = { "leader_pid": PostgreSQLVersion(14), "query_id": PostgreSQLVersion(13) } columns = await get_compatible_column_list( "pg_stat_activity", base_columns + list(version_columns.keys()), version_columns, database ) return f"SELECT {columns} FROM pg_stat_activity WHERE state = 'active'" @staticmethod async def get_replication_slots_query(database: str = None) -> str: """Get replication slots info with version compatibility.""" version = await get_postgresql_version(database) base_columns = [ "slot_name", "plugin", "slot_type", "datoid", "temporary", "active", "active_pid", "restart_lsn", "confirmed_flush_lsn" ] # wal_status and safe_wal_size are available from PostgreSQL 13+ if version.has_replication_slot_wal_status: return """ SELECT slot_name, plugin, slot_type, datoid, temporary, active, active_pid, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size / 1024 / 1024 as safe_wal_size_mb FROM pg_replication_slots ORDER BY slot_name """ else: # PostgreSQL 12 and older - without wal_status and safe_wal_size return """ SELECT slot_name, plugin, slot_type, datoid, temporary, active, active_pid, restart_lsn, confirmed_flush_lsn, NULL::text as wal_status, NULL::numeric as safe_wal_size_mb FROM pg_replication_slots ORDER BY slot_name """ @staticmethod async def get_wal_receiver_query(database: str = None) -> str: """Get WAL receiver status with version compatibility.""" version = await get_postgresql_version(database) if version.has_enhanced_wal_receiver: # PostgreSQL 16+: has written_lsn/flushed_lsn columns return """ SELECT pid, status, receive_start_lsn, receive_start_tli, written_lsn, flushed_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, sender_host, sender_port, conninfo FROM pg_stat_wal_receiver """ else: # PostgreSQL 10-15: no written_lsn/flushed_lsn columns return """ SELECT pid, status, receive_start_lsn, receive_start_tli, NULL::text as written_lsn, NULL::text as flushed_lsn, received_tli, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time, slot_name, sender_host, sender_port, conninfo FROM pg_stat_wal_receiver """ @staticmethod async def get_all_tables_stats_query(include_system: bool = False, database: str = None) -> str: """Get all tables statistics query with version compatibility.""" version = await get_postgresql_version(database) view_name = "pg_stat_all_tables" if include_system else "pg_stat_user_tables" # n_ins_since_vacuum is available from PostgreSQL 13+ if version.has_table_stats_ins_since_vacuum: return f""" SELECT schemaname as schema_name, relname as table_name, seq_scan as sequential_scans, seq_tup_read as seq_tuples_read, idx_scan as index_scans, idx_tup_fetch as idx_tuples_fetched, n_tup_ins as tuples_inserted, n_tup_upd as tuples_updated, n_tup_del as tuples_deleted, n_tup_hot_upd as hot_updates, n_live_tup as estimated_live_tuples, n_dead_tup as estimated_dead_tuples, CASE WHEN n_live_tup > 0 THEN ROUND((n_dead_tup::numeric / n_live_tup) * 100, 2) ELSE 0 END as dead_tuple_ratio_percent, n_mod_since_analyze as modified_since_analyze, n_ins_since_vacuum as inserted_since_vacuum, last_vacuum, last_autovacuum, last_analyze, last_autoanalyze, vacuum_count, autovacuum_count, analyze_count, autoanalyze_count FROM {view_name} ORDER BY seq_scan + COALESCE(idx_scan, 0) DESC, schemaname, relname """ else: # PostgreSQL 12 - without n_ins_since_vacuum return f""" SELECT schemaname as schema_name, relname as table_name, seq_scan as sequential_scans, seq_tup_read as seq_tuples_read, idx_scan as index_scans, idx_tup_fetch as idx_tuples_fetched, n_tup_ins as tuples_inserted, n_tup_upd as tuples_updated, n_tup_del as tuples_deleted, n_tup_hot_upd as hot_updates, n_live_tup as estimated_live_tuples, n_dead_tup as estimated_dead_tuples, CASE WHEN n_live_tup > 0 THEN ROUND((n_dead_tup::numeric / n_live_tup) * 100, 2) ELSE 0 END as dead_tuple_ratio_percent, n_mod_since_analyze as modified_since_analyze, NULL::bigint as inserted_since_vacuum, last_vacuum, last_autovacuum, last_analyze, last_autoanalyze, vacuum_count, autovacuum_count, analyze_count, autoanalyze_count FROM {view_name} ORDER BY seq_scan + COALESCE(idx_scan, 0) DESC, schemaname, relname """ # Utility functions for tool implementations async def execute_version_aware_query(queries_by_version: dict, fallback_query: str, database: str = None): """ Execute the most appropriate query based on PostgreSQL version. Args: queries_by_version: Dict mapping PostgreSQLVersion to query strings fallback_query: Default query if no version matches database: Database to connect to Returns: Query results """ from .functions import execute_query version = await get_postgresql_version(database) query = get_version_appropriate_query(fallback_query, queries_by_version, version) return await execute_query(query, database=database) # Version-aware pg_stat_statements queries async def get_pg_stat_statements_query(database: str = None) -> str: """ Get version-compatible pg_stat_statements query. Args: database: Database to connect to for version detection Returns: SQL query string compatible with the database version """ version = await get_postgresql_version(database) # Common base columns available in all versions base_columns = [ "queryid", "query", "calls", "rows" ] # Add version-specific timing columns if version.has_pg_stat_statements_exec_time: # PostgreSQL 13+: uses total_exec_time, mean_exec_time base_columns.extend([ "total_exec_time", "mean_exec_time", "min_exec_time", "max_exec_time", "stddev_exec_time" ]) else: # PostgreSQL 12: uses total_time, mean_time base_columns.extend([ "total_time as total_exec_time", "mean_time as mean_exec_time", "min_time as min_exec_time", "max_time as max_exec_time", "stddev_time as stddev_exec_time" ]) # Add remaining common columns base_columns.extend([ "shared_blks_hit", "shared_blks_read", "shared_blks_dirtied", "shared_blks_written", "local_blks_hit", "local_blks_read", "local_blks_dirtied", "local_blks_written", "temp_blks_read", "temp_blks_written" ]) columns_str = ",\n ".join(base_columns) return f""" SELECT {columns_str} FROM pg_stat_statements ORDER BY total_exec_time DESC """ # Version-aware pg_stat_monitor queries async def get_pg_stat_monitor_query(database: str = None) -> str: """ Get version-compatible pg_stat_monitor query. Args: database: Database to connect to for version detection Returns: SQL query string compatible with the database version """ version = await get_postgresql_version(database) # Common base columns available in all versions base_columns = [ "query", "calls", "rows" ] # Add version-specific timing columns if version.has_pg_stat_statements_exec_time: # PostgreSQL 13+: uses total_exec_time, mean_exec_time base_columns.extend([ "total_exec_time", "mean_exec_time" ]) else: # PostgreSQL 12: uses total_time, mean_time base_columns.extend([ "total_time as total_exec_time", "mean_time as mean_exec_time" ]) # Add remaining common columns base_columns.extend([ "shared_blks_hit", "shared_blks_read", "client_ip", "bucket_start_time" ]) columns_str = ",\n ".join(base_columns) return f""" SELECT {columns_str} FROM pg_stat_monitor ORDER BY total_exec_time DESC """

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-PostgreSQL-Ops'

If you have feedback or need assistance with the MCP directory API, please join our Discord server