We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-PostgreSQL-Ops'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""
PostgreSQL Version Compatibility Utilities
Provides version detection and compatibility handling for MCP PostgreSQL tools.
"""
import re
import logging
from typing import Tuple, Optional
from .functions import execute_single_query
logger = logging.getLogger(__name__)
class PostgreSQLVersion:
"""PostgreSQL version information and compatibility utilities."""
def __init__(self, major: int, minor: int = 0, patch: int = 0):
self.major = major
self.minor = minor
self.patch = patch
def __str__(self):
return f"{self.major}.{self.minor}.{self.patch}"
def __ge__(self, other):
if isinstance(other, (int, float)):
return self.major >= other
if isinstance(other, PostgreSQLVersion):
return (self.major, self.minor, self.patch) >= (other.major, other.minor, other.patch)
return False
def __lt__(self, other):
if isinstance(other, (int, float)):
return self.major < other
if isinstance(other, PostgreSQLVersion):
return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch)
return False
@property
def is_modern(self) -> bool:
"""Check if this is a modern PostgreSQL version (12+)."""
return self.major >= 12
@property
def has_checkpointer_split(self) -> bool:
"""Check if checkpointer stats are in separate view (only PG15)."""
return self.major == 15
@property
def has_pg_stat_io(self) -> bool:
"""Check if pg_stat_io view is available (16+)."""
return self.major >= 16
@property
def has_enhanced_wal_receiver(self) -> bool:
"""Check if pg_stat_wal_receiver has written_lsn/flushed_lsn (16+)."""
return self.major >= 16
@property
def has_replication_slot_stats(self) -> bool:
"""Check if pg_stat_replication_slots is available (14+)."""
return self.major >= 14
@property
def has_parallel_leader_tracking(self) -> bool:
"""Check if pg_stat_activity has leader_pid column (14+)."""
return self.major >= 14
@property
def has_replication_slot_wal_status(self) -> bool:
"""Check if pg_replication_slots has wal_status and safe_wal_size columns (13+)."""
return self.major >= 13
@property
def has_table_stats_ins_since_vacuum(self) -> bool:
"""Check if pg_stat_*_tables has n_ins_since_vacuum column (13+)."""
return self.major >= 13
@property
def has_pg_stat_statements_exec_time(self) -> bool:
"""Check if pg_stat_statements uses total_exec_time and mean_exec_time columns (13+)."""
return self.major >= 13
# Global version cache
_cached_version: Optional[PostgreSQLVersion] = None
async def get_postgresql_version(database: str = None, force_refresh: bool = False) -> PostgreSQLVersion:
"""
Get PostgreSQL server version with caching.
Args:
database: Database to connect to
force_refresh: Force refresh cached version
Returns:
PostgreSQLVersion object
"""
global _cached_version
if _cached_version is not None and not force_refresh:
return _cached_version
try:
result = await execute_single_query("SELECT version()", database=database)
version_string = result.get('version', '')
# Parse version string like "PostgreSQL 16.1 on x86_64-pc-linux-gnu..."
version_match = re.search(r'PostgreSQL\s+(\d+)\.?(\d*)\.?(\d*)', version_string)
if version_match:
major = int(version_match.group(1))
minor = int(version_match.group(2) or 0)
patch = int(version_match.group(3) or 0)
_cached_version = PostgreSQLVersion(major, minor, patch)
logger.info(f"Detected PostgreSQL version: {_cached_version}")
return _cached_version
else:
logger.warning(f"Could not parse version string: {version_string}")
# Default to PostgreSQL 17 if parsing fails
_cached_version = PostgreSQLVersion(17, 0, 0)
return _cached_version
except Exception as e:
logger.error(f"Failed to get PostgreSQL version: {e}")
# Default to PostgreSQL 17 if version detection fails
_cached_version = PostgreSQLVersion(17, 0, 0)
return _cached_version
async def check_feature_availability(feature: str, database: str = None) -> bool:
"""
Check if a specific PostgreSQL feature is available.
Args:
feature: Feature name to check
database: Database to connect to
Returns:
True if feature is available
"""
version = await get_postgresql_version(database)
feature_requirements = {
'pg_stat_io': version.has_pg_stat_io,
'checkpointer_split': version.has_checkpointer_split,
'enhanced_wal_receiver': version.has_enhanced_wal_receiver,
'replication_slot_stats': version.has_replication_slot_stats,
'parallel_leader_tracking': version.has_parallel_leader_tracking,
}
return feature_requirements.get(feature, False)
async def get_compatible_column_list(table_name: str,
all_columns: list,
version_specific_columns: dict,
database: str = None) -> str:
"""
Generate version-compatible column list for SELECT queries.
Args:
table_name: PostgreSQL table/view name
all_columns: List of all possible columns
version_specific_columns: Dict mapping version requirements to columns
database: Database to connect to
Returns:
Comma-separated column list for SQL SELECT
"""
version = await get_postgresql_version(database)
available_columns = []
for col in all_columns:
# Check if column has version requirements
required_version = version_specific_columns.get(col)
if required_version is None:
# No version requirement - always available
available_columns.append(col)
elif version >= required_version:
# Version requirement met
available_columns.append(col)
else:
# Version requirement not met - provide NULL placeholder
available_columns.append(f"NULL::text AS {col}")
return ", ".join(available_columns)
def get_version_appropriate_query(base_query: str,
version_variants: dict,
version: PostgreSQLVersion) -> str:
"""
Select version-appropriate query variant.
Args:
base_query: Default/fallback query
version_variants: Dict mapping version requirements to query variants
version: PostgreSQL version
Returns:
Most appropriate query for the version
"""
# Sort version variants by version (highest first)
sorted_variants = sorted(version_variants.items(),
key=lambda x: (x[0].major, x[0].minor),
reverse=True)
for required_version, query in sorted_variants:
if version >= required_version:
return query
return base_query
# Version-specific query builders
class VersionAwareQueries:
"""Collection of version-aware query builders."""
@staticmethod
async def get_bgwriter_checkpointer_stats(database: str = None) -> str:
"""Get background writer/checkpointer stats with version compatibility."""
version = await get_postgresql_version(database)
if version.has_checkpointer_split:
# PostgreSQL 15+: Use separate checkpointer view
return """
SELECT 'checkpointer' as component,
num_timed, num_requested, restartpoints_timed, restartpoints_req, restartpoints_done,
write_time, sync_time, buffers_written, stats_reset
FROM pg_stat_checkpointer
UNION ALL
SELECT 'bgwriter' as component,
NULL::bigint as num_timed, NULL::bigint as num_requested,
NULL::bigint as restartpoints_timed, NULL::bigint as restartpoints_req,
NULL::bigint as restartpoints_done, NULL::double precision as write_time,
NULL::double precision as sync_time,
buffers_clean as buffers_written, stats_reset
FROM pg_stat_bgwriter
"""
else:
# PostgreSQL 10-14: All stats in bgwriter view
return """
SELECT 'bgwriter_legacy' as component,
buffers_clean, maxwritten_clean, buffers_alloc, stats_reset,
NULL::bigint as num_timed, NULL::bigint as num_requested
FROM pg_stat_bgwriter
"""
@staticmethod
async def get_io_statistics(database: str = None) -> str:
"""Get I/O statistics with version compatibility."""
version = await get_postgresql_version(database)
if version.has_pg_stat_io:
# PostgreSQL 16+: Use comprehensive pg_stat_io
return """
SELECT backend_type, object, context,
reads, read_time, writes, write_time,
extends, extend_time, hits, evictions,
reuses, fsyncs, fsync_time
FROM pg_stat_io
WHERE reads > 0 OR writes > 0 OR hits > 0
"""
else:
# PostgreSQL 10-15: Fall back to pg_statio_* views
return """
SELECT 'client backend' as backend_type,
'relation' as object,
'normal' as context,
heap_blks_read as reads,
0::double precision as read_time,
0::bigint as writes,
0::double precision as write_time,
0::bigint as extends,
0::double precision as extend_time,
heap_blks_hit as hits,
0::bigint as evictions,
0::bigint as reuses,
0::bigint as fsyncs,
0::double precision as fsync_time
FROM pg_statio_all_tables
WHERE heap_blks_read > 0 OR heap_blks_hit > 0
"""
@staticmethod
async def get_activity_with_leader_info(database: str = None) -> str:
"""Get activity info with parallel leader tracking if available."""
version = await get_postgresql_version(database)
base_columns = [
"pid", "datname", "usename", "application_name",
"client_addr", "state", "query_start", "query"
]
version_columns = {
"leader_pid": PostgreSQLVersion(14),
"query_id": PostgreSQLVersion(13)
}
columns = await get_compatible_column_list(
"pg_stat_activity",
base_columns + list(version_columns.keys()),
version_columns,
database
)
return f"SELECT {columns} FROM pg_stat_activity WHERE state = 'active'"
@staticmethod
async def get_replication_slots_query(database: str = None) -> str:
"""Get replication slots info with version compatibility."""
version = await get_postgresql_version(database)
base_columns = [
"slot_name", "plugin", "slot_type", "datoid", "temporary",
"active", "active_pid", "restart_lsn", "confirmed_flush_lsn"
]
# wal_status and safe_wal_size are available from PostgreSQL 13+
if version.has_replication_slot_wal_status:
return """
SELECT
slot_name,
plugin,
slot_type,
datoid,
temporary,
active,
active_pid,
restart_lsn,
confirmed_flush_lsn,
wal_status,
safe_wal_size / 1024 / 1024 as safe_wal_size_mb
FROM pg_replication_slots
ORDER BY slot_name
"""
else:
# PostgreSQL 12 and older - without wal_status and safe_wal_size
return """
SELECT
slot_name,
plugin,
slot_type,
datoid,
temporary,
active,
active_pid,
restart_lsn,
confirmed_flush_lsn,
NULL::text as wal_status,
NULL::numeric as safe_wal_size_mb
FROM pg_replication_slots
ORDER BY slot_name
"""
@staticmethod
async def get_wal_receiver_query(database: str = None) -> str:
"""Get WAL receiver status with version compatibility."""
version = await get_postgresql_version(database)
if version.has_enhanced_wal_receiver:
# PostgreSQL 16+: has written_lsn/flushed_lsn columns
return """
SELECT
pid,
status,
receive_start_lsn,
receive_start_tli,
written_lsn,
flushed_lsn,
received_tli,
last_msg_send_time,
last_msg_receipt_time,
latest_end_lsn,
latest_end_time,
slot_name,
sender_host,
sender_port,
conninfo
FROM pg_stat_wal_receiver
"""
else:
# PostgreSQL 10-15: no written_lsn/flushed_lsn columns
return """
SELECT
pid,
status,
receive_start_lsn,
receive_start_tli,
NULL::text as written_lsn,
NULL::text as flushed_lsn,
received_tli,
last_msg_send_time,
last_msg_receipt_time,
latest_end_lsn,
latest_end_time,
slot_name,
sender_host,
sender_port,
conninfo
FROM pg_stat_wal_receiver
"""
@staticmethod
async def get_all_tables_stats_query(include_system: bool = False, database: str = None) -> str:
"""Get all tables statistics query with version compatibility."""
version = await get_postgresql_version(database)
view_name = "pg_stat_all_tables" if include_system else "pg_stat_user_tables"
# n_ins_since_vacuum is available from PostgreSQL 13+
if version.has_table_stats_ins_since_vacuum:
return f"""
SELECT
schemaname as schema_name,
relname as table_name,
seq_scan as sequential_scans,
seq_tup_read as seq_tuples_read,
idx_scan as index_scans,
idx_tup_fetch as idx_tuples_fetched,
n_tup_ins as tuples_inserted,
n_tup_upd as tuples_updated,
n_tup_del as tuples_deleted,
n_tup_hot_upd as hot_updates,
n_live_tup as estimated_live_tuples,
n_dead_tup as estimated_dead_tuples,
CASE
WHEN n_live_tup > 0 THEN
ROUND((n_dead_tup::numeric / n_live_tup) * 100, 2)
ELSE 0
END as dead_tuple_ratio_percent,
n_mod_since_analyze as modified_since_analyze,
n_ins_since_vacuum as inserted_since_vacuum,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze,
vacuum_count,
autovacuum_count,
analyze_count,
autoanalyze_count
FROM {view_name}
ORDER BY seq_scan + COALESCE(idx_scan, 0) DESC, schemaname, relname
"""
else:
# PostgreSQL 12 - without n_ins_since_vacuum
return f"""
SELECT
schemaname as schema_name,
relname as table_name,
seq_scan as sequential_scans,
seq_tup_read as seq_tuples_read,
idx_scan as index_scans,
idx_tup_fetch as idx_tuples_fetched,
n_tup_ins as tuples_inserted,
n_tup_upd as tuples_updated,
n_tup_del as tuples_deleted,
n_tup_hot_upd as hot_updates,
n_live_tup as estimated_live_tuples,
n_dead_tup as estimated_dead_tuples,
CASE
WHEN n_live_tup > 0 THEN
ROUND((n_dead_tup::numeric / n_live_tup) * 100, 2)
ELSE 0
END as dead_tuple_ratio_percent,
n_mod_since_analyze as modified_since_analyze,
NULL::bigint as inserted_since_vacuum,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze,
vacuum_count,
autovacuum_count,
analyze_count,
autoanalyze_count
FROM {view_name}
ORDER BY seq_scan + COALESCE(idx_scan, 0) DESC, schemaname, relname
"""
# Utility functions for tool implementations
async def execute_version_aware_query(queries_by_version: dict,
fallback_query: str,
database: str = None):
"""
Execute the most appropriate query based on PostgreSQL version.
Args:
queries_by_version: Dict mapping PostgreSQLVersion to query strings
fallback_query: Default query if no version matches
database: Database to connect to
Returns:
Query results
"""
from .functions import execute_query
version = await get_postgresql_version(database)
query = get_version_appropriate_query(fallback_query, queries_by_version, version)
return await execute_query(query, database=database)
# Version-aware pg_stat_statements queries
async def get_pg_stat_statements_query(database: str = None) -> str:
"""
Get version-compatible pg_stat_statements query.
Args:
database: Database to connect to for version detection
Returns:
SQL query string compatible with the database version
"""
version = await get_postgresql_version(database)
# Common base columns available in all versions
base_columns = [
"queryid", "query", "calls", "rows"
]
# Add version-specific timing columns
if version.has_pg_stat_statements_exec_time:
# PostgreSQL 13+: uses total_exec_time, mean_exec_time
base_columns.extend([
"total_exec_time", "mean_exec_time", "min_exec_time", "max_exec_time", "stddev_exec_time"
])
else:
# PostgreSQL 12: uses total_time, mean_time
base_columns.extend([
"total_time as total_exec_time", "mean_time as mean_exec_time",
"min_time as min_exec_time", "max_time as max_exec_time",
"stddev_time as stddev_exec_time"
])
# Add remaining common columns
base_columns.extend([
"shared_blks_hit", "shared_blks_read", "shared_blks_dirtied",
"shared_blks_written", "local_blks_hit", "local_blks_read",
"local_blks_dirtied", "local_blks_written", "temp_blks_read", "temp_blks_written"
])
columns_str = ",\n ".join(base_columns)
return f"""
SELECT
{columns_str}
FROM pg_stat_statements
ORDER BY total_exec_time DESC
"""
# Version-aware pg_stat_monitor queries
async def get_pg_stat_monitor_query(database: str = None) -> str:
"""
Get version-compatible pg_stat_monitor query.
Args:
database: Database to connect to for version detection
Returns:
SQL query string compatible with the database version
"""
version = await get_postgresql_version(database)
# Common base columns available in all versions
base_columns = [
"query", "calls", "rows"
]
# Add version-specific timing columns
if version.has_pg_stat_statements_exec_time:
# PostgreSQL 13+: uses total_exec_time, mean_exec_time
base_columns.extend([
"total_exec_time", "mean_exec_time"
])
else:
# PostgreSQL 12: uses total_time, mean_time
base_columns.extend([
"total_time as total_exec_time", "mean_time as mean_exec_time"
])
# Add remaining common columns
base_columns.extend([
"shared_blks_hit", "shared_blks_read", "client_ip", "bucket_start_time"
])
columns_str = ",\n ".join(base_columns)
return f"""
SELECT
{columns_str}
FROM pg_stat_monitor
ORDER BY total_exec_time DESC
"""