isdaniel/PostgreSQL-Performance-Tuner-Mcp

tools_vacuum.py (19.9 kB)
"""Vacuum progress monitoring tool handlers.""" from __future__ import annotations from collections.abc import Sequence from datetime import datetime, timezone from typing import Any from mcp.types import TextContent, Tool from ..services import SqlDriver, get_user_filter from .toolhandler import ToolHandler class VacuumProgressToolHandler(ToolHandler): """Tool handler for monitoring vacuum operations progress.""" name = "monitor_vacuum_progress" title = "Vacuum Progress Monitor" read_only_hint = True destructive_hint = False idempotent_hint = True open_world_hint = False description = """Track long-running VACUUM operations in PostgreSQL. This tool monitors the progress of: - Manual VACUUM operations - VACUUM FULL operations - Autovacuum processes Shows: - Current phase of the vacuum operation - Progress percentage (heap scanned, indexes vacuumed) - Dead tuples collected - Index vacuum rounds - Estimated time remaining Also provides information about: - Autovacuum configuration - Tables that need vacuuming - Recent vacuum activity Useful for: - Monitoring long-running maintenance operations - Identifying tables with vacuum backlog - Troubleshooting autovacuum issues""" def __init__(self, sql_driver: SqlDriver): self.sql_driver = sql_driver def get_tool_definition(self) -> Tool: return Tool( name=self.name, description=self.description, inputSchema={ "type": "object", "properties": { "action": { "type": "string", "description": "Action to perform", "enum": [ "progress", # Show current vacuum progress "needs_vacuum", # List tables that need vacuuming "autovacuum_status", # Show autovacuum configuration and status "recent_activity" # Show recent vacuum activity ], "default": "needs_vacuum" }, "schema_name": { "type": "string", "description": "Schema to filter by (default: all schemas)", "default": "public" }, "include_toast": { "type": "boolean", "description": "Include TOAST tables in results", "default": False }, "min_dead_tuples": { "type": "integer", "description": "Minimum dead tuples for needs_vacuum action", "default": 1000 } }, "required": [] }, annotations=self.get_annotations() ) async def run_tool(self, arguments: dict[str, Any]) -> Sequence[TextContent]: try: action = arguments.get("action", "progress") if action == "progress": return await self._get_vacuum_progress(arguments) elif action == "needs_vacuum": return await self._get_tables_needing_vacuum(arguments) elif action == "autovacuum_status": return await self._get_autovacuum_status(arguments) elif action == "recent_activity": return await self._get_recent_vacuum_activity(arguments) else: return self.format_result(f"Unknown action: {action}") except Exception as e: return self.format_error(e) async def _get_vacuum_progress(self, arguments: dict[str, Any]) -> Sequence[TextContent]: """Get current vacuum operation progress.""" include_toast = arguments.get("include_toast", False) # Get user filter user_filter = get_user_filter() activity_filter = user_filter.get_activity_filter() toast_filter = "" if include_toast else "AND c.relname NOT LIKE 'pg_toast%%'" # Query pg_stat_progress_vacuum for vacuum progress progress_query = f""" SELECT p.pid, p.datname as database, n.nspname as schema_name, c.relname as table_name, p.phase, p.heap_blks_total, p.heap_blks_scanned, p.heap_blks_vacuumed, p.index_vacuum_count, p.max_dead_tuples, p.num_dead_tuples, CASE WHEN p.heap_blks_total > 0 THEN ROUND(100.0 * p.heap_blks_scanned / p.heap_blks_total, 2) ELSE 0 END as scan_progress_pct, CASE WHEN p.heap_blks_total > 0 THEN ROUND(100.0 * 
p.heap_blks_vacuumed / p.heap_blks_total, 2) ELSE 0 END as vacuum_progress_pct, a.query, a.state, EXTRACT(epoch FROM now() - a.xact_start)::integer as duration_seconds, a.wait_event_type, a.wait_event FROM pg_stat_progress_vacuum p JOIN pg_class c ON c.oid = p.relid JOIN pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_stat_activity a ON a.pid = p.pid WHERE n.nspname NOT IN ('pg_catalog', 'information_schema') {toast_filter} {activity_filter.replace('usesysid', 'a.usesysid') if activity_filter else ''} ORDER BY p.heap_blks_total DESC """ progress = await self.sql_driver.execute_query(progress_query) # Also check for VACUUM FULL operations (uses pg_stat_progress_cluster) cluster_query = f""" SELECT p.pid, p.datname as database, n.nspname as schema_name, c.relname as table_name, p.phase, p.heap_blks_total, p.heap_blks_scanned, p.heap_tuples_scanned, p.heap_tuples_written, CASE WHEN p.heap_blks_total > 0 THEN ROUND(100.0 * p.heap_blks_scanned / p.heap_blks_total, 2) ELSE 0 END as progress_pct, a.query, EXTRACT(epoch FROM now() - a.xact_start)::integer as duration_seconds FROM pg_stat_progress_cluster p JOIN pg_class c ON c.oid = p.relid JOIN pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_stat_activity a ON a.pid = p.pid WHERE p.command = 'VACUUM FULL' AND n.nspname NOT IN ('pg_catalog', 'information_schema') {toast_filter} {activity_filter.replace('usesysid', 'a.usesysid') if activity_filter else ''} """ vacuum_full = await self.sql_driver.execute_query(cluster_query) # Check for active autovacuum workers autovacuum_query = f""" SELECT pid, datname as database, query, state, EXTRACT(epoch FROM now() - xact_start)::integer as duration_seconds, wait_event_type, wait_event FROM pg_stat_activity WHERE backend_type = 'autovacuum worker' {activity_filter} """ autovacuum_workers = await self.sql_driver.execute_query(autovacuum_query) output = { "timestamp": datetime.now(timezone.utc).isoformat(), "vacuum_operations": progress if progress else [], "vacuum_full_operations": vacuum_full if vacuum_full else [], "autovacuum_workers": autovacuum_workers if autovacuum_workers else [], "summary": { "active_vacuum_count": len(progress) if progress else 0, "active_vacuum_full_count": len(vacuum_full) if vacuum_full else 0, "active_autovacuum_workers": len(autovacuum_workers) if autovacuum_workers else 0 } } if not progress and not vacuum_full and not autovacuum_workers: output["message"] = "No vacuum operations currently in progress" return self.format_json_result(output) async def _get_tables_needing_vacuum(self, arguments: dict[str, Any]) -> Sequence[TextContent]: """Get tables that need vacuuming based on dead tuple count.""" schema_name = arguments.get("schema_name") min_dead_tuples = arguments.get("min_dead_tuples", 1000) include_toast = arguments.get("include_toast", False) toast_filter = "" if include_toast else "AND c.relname NOT LIKE 'pg_toast%%'" if schema_name: schema_filter = "AND n.nspname = %s" params: list[Any] = [min_dead_tuples, schema_name] else: schema_filter = "" params = [min_dead_tuples] query = f""" SELECT n.nspname as schema_name, c.relname as table_name, s.n_live_tup, s.n_dead_tup, CASE WHEN s.n_live_tup > 0 THEN ROUND(100.0 * s.n_dead_tup / s.n_live_tup, 2) ELSE 0 END as dead_tuple_ratio, s.last_vacuum, s.last_autovacuum, s.vacuum_count, s.autovacuum_count, pg_size_pretty(pg_table_size(c.oid)) as table_size, pg_table_size(c.oid) as table_size_bytes, -- Calculate if autovacuum threshold is exceeded COALESCE( (SELECT setting::integer FROM pg_settings WHERE name = 
'autovacuum_vacuum_threshold'), 50 ) + COALESCE( (SELECT setting::float FROM pg_settings WHERE name = 'autovacuum_vacuum_scale_factor'), 0.2 ) * s.n_live_tup as autovacuum_threshold, CASE WHEN s.n_dead_tup > ( COALESCE((SELECT setting::integer FROM pg_settings WHERE name = 'autovacuum_vacuum_threshold'), 50) + COALESCE((SELECT setting::float FROM pg_settings WHERE name = 'autovacuum_vacuum_scale_factor'), 0.2) * s.n_live_tup ) THEN true ELSE false END as exceeds_threshold FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_stat_user_tables s ON s.relid = c.oid WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') AND s.n_dead_tup >= %s {schema_filter} {toast_filter} ORDER BY s.n_dead_tup DESC LIMIT 50 """ results = await self.sql_driver.execute_query(query, params) # Get tables with transaction ID wraparound risk wraparound_query = """ SELECT n.nspname as schema_name, c.relname as table_name, age(c.relfrozenxid) as xid_age, pg_size_pretty(pg_table_size(c.oid)) as table_size, ROUND(100.0 * age(c.relfrozenxid) / 2147483647, 2) as pct_towards_wraparound FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') AND age(c.relfrozenxid) > 100000000 -- > 100 million transactions ORDER BY age(c.relfrozenxid) DESC LIMIT 20 """ wraparound_results = await self.sql_driver.execute_query(wraparound_query) output = { "tables_needing_vacuum": results if results else [], "tables_with_wraparound_risk": wraparound_results if wraparound_results else [], "summary": { "tables_with_dead_tuples": len(results) if results else 0, "tables_exceeding_threshold": sum(1 for r in results if r.get("exceeds_threshold")) if results else 0, "tables_with_wraparound_risk": len(wraparound_results) if wraparound_results else 0 }, "recommendations": [] } # Generate recommendations if results: high_dead_ratio = [r for r in results if (r.get("dead_tuple_ratio") or 0) > 20] if high_dead_ratio: output["recommendations"].append({ "severity": "high", "message": f"{len(high_dead_ratio)} tables have > 20% dead tuple ratio", "action": "Consider running VACUUM on these tables" }) if wraparound_results: critical = [r for r in wraparound_results if (r.get("pct_towards_wraparound") or 0) > 50] if critical: output["recommendations"].append({ "severity": "critical", "message": f"{len(critical)} tables are > 50% towards transaction wraparound", "action": "Urgently run VACUUM FREEZE on these tables" }) return self.format_json_result(output) async def _get_autovacuum_status(self, arguments: dict[str, Any]) -> Sequence[TextContent]: """Get autovacuum configuration and status.""" # Get autovacuum settings settings_query = """ SELECT name, setting, unit, short_desc FROM pg_settings WHERE name LIKE 'autovacuum%' ORDER BY name """ settings = await self.sql_driver.execute_query(settings_query) # Get user filter user_filter = get_user_filter() activity_filter = user_filter.get_activity_filter() # Get current autovacuum workers workers_query = f""" SELECT pid, datname as database, SUBSTRING(query FROM 'autovacuum: (.*)') as operation, state, EXTRACT(epoch FROM now() - xact_start)::integer as duration_seconds, wait_event_type, wait_event FROM pg_stat_activity WHERE backend_type = 'autovacuum worker' {activity_filter} """ workers = await self.sql_driver.execute_query(workers_query) # Get autovacuum launcher status launcher_query = f""" SELECT pid, state, EXTRACT(epoch FROM now() - 
backend_start)::integer as uptime_seconds FROM pg_stat_activity WHERE backend_type = 'autovacuum launcher' {activity_filter} """ launcher = await self.sql_driver.execute_query(launcher_query) # Calculate effective settings max_workers = 3 # default naptime = 60 # default (seconds) for s in settings or []: if s["name"] == "autovacuum_max_workers": max_workers = int(s["setting"]) elif s["name"] == "autovacuum_naptime": naptime = int(s["setting"]) output = { "autovacuum_enabled": any(s["setting"] == "on" for s in settings if s["name"] == "autovacuum") if settings else True, "settings": settings if settings else [], "launcher": launcher[0] if launcher else None, "active_workers": workers if workers else [], "summary": { "max_workers": max_workers, "active_worker_count": len(workers) if workers else 0, "naptime_seconds": naptime, "workers_available": max_workers - (len(workers) if workers else 0) } } # Add warnings if output["summary"]["workers_available"] == 0: output["warning"] = "All autovacuum workers are busy - consider increasing autovacuum_max_workers" return self.format_json_result(output) async def _get_recent_vacuum_activity(self, arguments: dict[str, Any]) -> Sequence[TextContent]: """Get recent vacuum activity from statistics.""" schema_name = arguments.get("schema_name") include_toast = arguments.get("include_toast", False) toast_filter = "" if include_toast else "AND c.relname NOT LIKE 'pg_toast%%'" schema_filter = "AND n.nspname = %s" if schema_name else "" query = f""" SELECT n.nspname as schema_name, c.relname as table_name, s.last_vacuum, s.last_autovacuum, s.last_analyze, s.last_autoanalyze, s.vacuum_count, s.autovacuum_count, s.analyze_count, s.autoanalyze_count, s.n_live_tup, s.n_dead_tup, s.n_mod_since_analyze, pg_size_pretty(pg_table_size(c.oid)) as table_size, GREATEST(s.last_vacuum, s.last_autovacuum) as last_vacuumed, CASE WHEN GREATEST(s.last_vacuum, s.last_autovacuum) IS NULL THEN 'never' WHEN GREATEST(s.last_vacuum, s.last_autovacuum) < now() - interval '7 days' THEN 'stale' WHEN GREATEST(s.last_vacuum, s.last_autovacuum) < now() - interval '1 day' THEN 'recent' ELSE 'fresh' END as vacuum_status FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace LEFT JOIN pg_stat_user_tables s ON s.relid = c.oid WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast') {schema_filter} {toast_filter} ORDER BY GREATEST(s.last_vacuum, s.last_autovacuum) DESC NULLS LAST LIMIT 50 """ # Build parameters list params: list[Any] = [] if schema_name: params.append(schema_name) results = await self.sql_driver.execute_query(query, params if params else None) if not results: return self.format_result(f"No tables found in schema: {schema_name or 'all'}") # Categorize tables never_vacuumed = [r for r in results if r.get("vacuum_status") == "never"] stale = [r for r in results if r.get("vacuum_status") == "stale"] recent = [r for r in results if r.get("vacuum_status") == "recent"] fresh = [r for r in results if r.get("vacuum_status") == "fresh"] output = { "recent_activity": results, "summary": { "total_tables": len(results), "never_vacuumed": len(never_vacuumed), "stale_vacuum": len(stale), "recent_vacuum": len(recent), "fresh_vacuum": len(fresh) }, "recommendations": [] } if never_vacuumed: output["recommendations"].append({ "severity": "warning", "message": f"{len(never_vacuumed)} tables have never been vacuumed", "tables": [t["table_name"] for t in never_vacuumed[:5]] }) if stale: output["recommendations"].append({ "severity": "info", "message": 
f"{len(stale)} tables haven't been vacuumed in over 7 days", "tables": [t["table_name"] for t in stale[:5]] }) return self.format_json_result(output)
