Skip to main content
Glama
vacuum_analysis.py28.9 kB
"""Vacuum analysis tool for comprehensive database maintenance recommendations. Extended from the original postgres-mcp project: https://github.com/crystaldba/postgres-mcp """ from __future__ import annotations import logging from typing import Any logger = logging.getLogger(__name__) class VacuumAnalysisTool: """Tool for comprehensive vacuum analysis and maintenance recommendations.""" def __init__(self, sql_driver): self.sql_driver = sql_driver async def analyze_vacuum_requirements(self) -> str: """Perform comprehensive vacuum analysis and return structured results.""" try: logger.info("Starting comprehensive vacuum analysis...") analysis_results = { "summary": {}, "bloat_analysis": {}, "autovacuum_analysis": {}, "vacuum_performance": {}, "maintenance_recommendations": [], "critical_issues": [], "configuration_recommendations": [], } # Get basic statistics await self._get_vacuum_summary(analysis_results) # Analyze table bloat await self._analyze_table_bloat(analysis_results) # Analyze autovacuum configuration await self._analyze_autovacuum_config(analysis_results) # Analyze vacuum performance await self._analyze_vacuum_performance(analysis_results) # Generate maintenance recommendations await self._generate_maintenance_recommendations(analysis_results) # Check for critical issues await self._identify_critical_issues(analysis_results) # Generate configuration recommendations await self._generate_configuration_recommendations(analysis_results) logger.info("Vacuum analysis completed successfully") return self._format_as_text(analysis_results) except Exception as e: logger.error(f"Error in vacuum analysis: {e}") error_result = { "error": f"Vacuum analysis failed: {e!s}", "summary": {}, "bloat_analysis": {}, "autovacuum_analysis": {}, "vacuum_performance": {}, "maintenance_recommendations": [], "critical_issues": [], "configuration_recommendations": [], } return self._format_as_text(error_result) async def _get_vacuum_summary(self, analysis_results: dict[str, Any]) -> None: """Get high-level vacuum summary statistics.""" query = """ SELECT COUNT(*) as total_tables, COUNT(CASE WHEN last_vacuum IS NOT NULL OR last_autovacuum IS NOT NULL THEN 1 END) as tables_with_vacuum_history, COUNT(CASE WHEN last_vacuum IS NULL AND last_autovacuum IS NULL THEN 1 END) as tables_never_vacuumed, COUNT(CASE WHEN n_dead_tup > 0 THEN 1 END) as tables_with_dead_tuples, SUM(n_dead_tup) as total_dead_tuples, SUM(n_live_tup) as total_live_tuples, AVG(CASE WHEN n_live_tup + n_dead_tup > 0 THEN n_dead_tup::float / (n_live_tup + n_dead_tup) * 100 ELSE 0 END) as avg_dead_tuple_percentage, COUNT(CASE WHEN EXTRACT(EPOCH FROM (NOW() - COALESCE(last_autovacuum, last_vacuum))) > 86400 THEN 1 END) as tables_not_vacuumed_24h FROM pg_stat_user_tables """ rows = await self.sql_driver.execute_query(query) if rows and rows[0]: data = rows[0].cells analysis_results["summary"] = { "total_tables": data["total_tables"], "tables_with_vacuum_history": data["tables_with_vacuum_history"], "tables_never_vacuumed": data["tables_never_vacuumed"], "tables_with_dead_tuples": data["tables_with_dead_tuples"], "total_dead_tuples": data["total_dead_tuples"], "total_live_tuples": data["total_live_tuples"], "avg_dead_tuple_percentage": round(data["avg_dead_tuple_percentage"] or 0.0, 2), "tables_not_vacuumed_24h": data["tables_not_vacuumed_24h"], } async def _analyze_table_bloat(self, analysis_results: dict[str, Any]) -> None: """Analyze table bloat and identify problematic tables.""" query = """ WITH bloat_analysis AS ( SELECT schemaname, relname, n_live_tup, n_dead_tup, CASE WHEN n_live_tup + n_dead_tup > 0 THEN (n_dead_tup::float / (n_live_tup + n_dead_tup)) * 100 ELSE 0 END as dead_percentage, pg_total_relation_size(schemaname||'.'||relname) as total_size_bytes, CASE WHEN n_live_tup > 0 THEN pg_total_relation_size(schemaname||'.'||relname) / n_live_tup ELSE 0 END as bytes_per_tuple, last_vacuum, last_autovacuum, vacuum_count, autovacuum_count FROM pg_stat_user_tables WHERE n_live_tup + n_dead_tup > 0 ) SELECT *, CASE WHEN dead_percentage > 40 THEN 'CRITICAL' WHEN dead_percentage > 20 THEN 'HIGH' WHEN dead_percentage > 10 THEN 'MEDIUM' WHEN dead_percentage > 5 THEN 'LOW' ELSE 'HEALTHY' END as bloat_severity, CASE WHEN dead_percentage > 40 AND total_size_bytes > 100*1024*1024 THEN 'VACUUM FULL required' WHEN dead_percentage > 20 THEN 'VACUUM required' WHEN dead_percentage > 10 THEN 'VACUUM recommended' ELSE 'No immediate action needed' END as recommendation FROM bloat_analysis ORDER BY dead_percentage DESC, total_size_bytes DESC LIMIT 50 """ rows = await self.sql_driver.execute_query(query) bloat_tables = [] severity_counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "HEALTHY": 0} if rows: for row in rows: data = row.cells bloat_tables.append( { "qualified_name": f"{data['schemaname']}.{data['relname']}", "dead_percentage": round(data["dead_percentage"] or 0.0, 2), "dead_tuples": data["n_dead_tup"], "live_tuples": data["n_live_tup"], "total_size_mb": round(data["total_size_bytes"] / (1024 * 1024), 2), "bytes_per_tuple": data["bytes_per_tuple"], "last_vacuum": data["last_vacuum"], "last_autovacuum": data["last_autovacuum"], "vacuum_count": data["vacuum_count"], "autovacuum_count": data["autovacuum_count"], "bloat_severity": data["bloat_severity"], "recommendation": data["recommendation"], } ) severity_counts[data["bloat_severity"]] += 1 analysis_results["bloat_analysis"] = { "tables": bloat_tables, "severity_distribution": severity_counts, "high_priority_tables": [t for t in bloat_tables if t["bloat_severity"] in ["CRITICAL", "HIGH"]], } async def _analyze_autovacuum_config(self, analysis_results: dict[str, Any]) -> None: """Analyze autovacuum configuration and effectiveness.""" query = """ WITH autovacuum_settings AS ( SELECT schemaname, relname, n_dead_tup, n_live_tup, COALESCE( (SELECT setting FROM pg_settings WHERE name = 'autovacuum_vacuum_threshold'), '50' )::int as global_vacuum_threshold, COALESCE( (SELECT setting FROM pg_settings WHERE name = 'autovacuum_vacuum_scale_factor'), '0.2' )::float as global_vacuum_scale_factor, last_autovacuum, autovacuum_count, n_tup_ins + n_tup_upd + n_tup_del as total_modifications, EXTRACT(EPOCH FROM (NOW() - COALESCE(last_autovacuum, '1970-01-01'::timestamp))) / 3600 as hours_since_last_autovacuum FROM pg_stat_user_tables WHERE n_live_tup + n_dead_tup > 0 ) SELECT *, global_vacuum_threshold + (global_vacuum_scale_factor * n_live_tup) as calculated_threshold, CASE WHEN n_dead_tup > (global_vacuum_threshold + (global_vacuum_scale_factor * n_live_tup)) THEN 'OVERDUE' WHEN n_dead_tup > (global_vacuum_threshold + (global_vacuum_scale_factor * n_live_tup)) * 0.8 THEN 'APPROACHING' ELSE 'HEALTHY' END as autovacuum_status, CASE WHEN autovacuum_count > 0 AND total_modifications > 0 THEN total_modifications / autovacuum_count ELSE 0 END as avg_modifications_per_autovacuum FROM autovacuum_settings ORDER BY n_dead_tup DESC, hours_since_last_autovacuum DESC LIMIT 30 """ rows = await self.sql_driver.execute_query(query) autovacuum_tables = [] status_counts = {"OVERDUE": 0, "APPROACHING": 0, "HEALTHY": 0} if rows: for row in rows: data = row.cells autovacuum_tables.append( { "qualified_name": f"{data['schemaname']}.{data['relname']}", "dead_tuples": data["n_dead_tup"], "live_tuples": data["n_live_tup"], "calculated_threshold": round(data["calculated_threshold"] or 0.0, 0), "autovacuum_status": data["autovacuum_status"], "hours_since_last_autovacuum": round(data["hours_since_last_autovacuum"] or 0.0, 1), "autovacuum_count": data["autovacuum_count"], "avg_modifications_per_autovacuum": round(data["avg_modifications_per_autovacuum"] or 0.0, 0), } ) status_counts[data["autovacuum_status"]] += 1 analysis_results["autovacuum_analysis"] = { "tables": autovacuum_tables, "status_distribution": status_counts, "problematic_tables": [t for t in autovacuum_tables if t["autovacuum_status"] in ["OVERDUE", "APPROACHING"]], } async def _analyze_vacuum_performance(self, analysis_results: dict[str, Any]) -> None: """Analyze vacuum performance metrics.""" query = """ SELECT schemaname, relname, vacuum_count, autovacuum_count, last_vacuum, last_autovacuum, n_tup_ins + n_tup_upd + n_tup_del as total_modifications, CASE WHEN vacuum_count + autovacuum_count > 0 THEN (n_tup_ins + n_tup_upd + n_tup_del) / (vacuum_count + autovacuum_count) ELSE 0 END as avg_modifications_per_vacuum, EXTRACT(EPOCH FROM (NOW() - COALESCE(last_autovacuum, last_vacuum))) / 3600 as hours_since_last_vacuum, pg_total_relation_size(schemaname||'.'||relname) as table_size_bytes, CASE WHEN vacuum_count + autovacuum_count > 0 THEN pg_total_relation_size(schemaname||'.'||relname) / (vacuum_count + autovacuum_count) ELSE 0 END as avg_size_per_vacuum FROM pg_stat_user_tables WHERE n_tup_ins + n_tup_upd + n_tup_del > 0 ORDER BY hours_since_last_vacuum DESC, total_modifications DESC LIMIT 25 """ rows = await self.sql_driver.execute_query(query) performance_tables = [] if rows: for row in rows: data = row.cells performance_tables.append( { "qualified_name": f"{data['schemaname']}.{data['relname']}", "vacuum_count": data["vacuum_count"], "autovacuum_count": data["autovacuum_count"], "total_modifications": data["total_modifications"], "avg_modifications_per_vacuum": round(data["avg_modifications_per_vacuum"] or 0.0, 0), "hours_since_last_vacuum": round(data["hours_since_last_vacuum"] or 0.0, 1), "table_size_mb": round(data["table_size_bytes"] / (1024 * 1024), 2), "avg_size_per_vacuum_mb": round((data["avg_size_per_vacuum"] or 0) / (1024 * 1024), 2), } ) analysis_results["vacuum_performance"] = { "tables": performance_tables, "stale_tables": [t for t in performance_tables if t["hours_since_last_vacuum"] > 72], # >3 days "high_modification_tables": [t for t in performance_tables if t["total_modifications"] > 1000000], } async def _generate_maintenance_recommendations(self, analysis_results: dict[str, Any]) -> None: """Generate specific maintenance recommendations.""" recommendations = [] # Recommendations based on bloat analysis bloat_data = analysis_results.get("bloat_analysis", {}) high_priority_bloat = bloat_data.get("high_priority_tables", []) for table in high_priority_bloat[:5]: # Top 5 priority tables if table["bloat_severity"] == "CRITICAL": recommendations.append( { "type": "VACUUM_FULL", "priority": "CRITICAL", "table": table["qualified_name"], "description": f"Table has {table['dead_percentage']}% dead tuples requiring VACUUM FULL", "command": f"VACUUM FULL {table['qualified_name']};", "estimated_impact": "High I/O, exclusive lock required", } ) elif table["bloat_severity"] == "HIGH": recommendations.append( { "type": "VACUUM", "priority": "HIGH", "table": table["qualified_name"], "description": f"Table has {table['dead_percentage']}% dead tuples requiring VACUUM", "command": f"VACUUM {table['qualified_name']};", "estimated_impact": "Moderate I/O, shared lock", } ) # Recommendations based on autovacuum analysis autovacuum_data = analysis_results.get("autovacuum_analysis", {}) problematic_autovacuum = autovacuum_data.get("problematic_tables", []) for table in problematic_autovacuum[:3]: # Top 3 autovacuum issues if table["autovacuum_status"] == "OVERDUE": recommendations.append( { "type": "IMMEDIATE_VACUUM", "priority": "HIGH", "table": table["qualified_name"], "description": f"Table is overdue for autovacuum ({table['dead_tuples']} dead tuples)", "command": f"VACUUM {table['qualified_name']};", "estimated_impact": "Should reduce dead tuple count", } ) # Recommendations based on performance analysis performance_data = analysis_results.get("vacuum_performance", {}) stale_tables = performance_data.get("stale_tables", []) for table in stale_tables[:3]: # Top 3 stale tables if table["hours_since_last_vacuum"] > 168: # >1 week recommendations.append( { "type": "ANALYZE", "priority": "MEDIUM", "table": table["qualified_name"], "description": f"Table hasn't been vacuumed in {table['hours_since_last_vacuum']:.1f} hours", "command": f"ANALYZE {table['qualified_name']};", "estimated_impact": "Update table statistics", } ) analysis_results["maintenance_recommendations"] = recommendations async def _identify_critical_issues(self, analysis_results: dict[str, Any]) -> None: """Identify critical issues requiring immediate attention.""" critical_issues = [] # Check transaction ID wraparound wraparound_query = """ SELECT s.schemaname, s.relname, age(c.relfrozenxid) as xid_age, 2000000000 - age(c.relfrozenxid) as xids_until_wraparound FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid JOIN pg_stat_user_tables s ON c.relname = s.relname AND n.nspname = s.schemaname WHERE c.relkind = 'r' AND age(c.relfrozenxid) > 1500000000 ORDER BY age(c.relfrozenxid) DESC LIMIT 10 """ rows = await self.sql_driver.execute_query(wraparound_query) if rows: for row in rows: data = row.cells critical_issues.append( { "type": "TRANSACTION_ID_WRAPAROUND", "severity": "CRITICAL", "table": f"{data['schemaname']}.{data['relname']}", "description": f"Table is approaching transaction ID wraparound (age: {data['xid_age']:,})", "action_required": "Immediate VACUUM required", "time_sensitive": True, } ) # Check for tables with extreme bloat bloat_data = analysis_results.get("bloat_analysis", {}) for table in bloat_data.get("tables", []): if table["bloat_severity"] == "CRITICAL" and table["total_size_mb"] > 1000: # >1GB critical_issues.append( { "type": "EXTREME_BLOAT", "severity": "CRITICAL", "table": table["qualified_name"], "description": f"Large table ({table['total_size_mb']} MB) with {table['dead_percentage']}% dead tuples", "action_required": "VACUUM FULL during maintenance window", "time_sensitive": False, } ) analysis_results["critical_issues"] = critical_issues async def _generate_configuration_recommendations(self, analysis_results: dict[str, Any]) -> None: """Generate autovacuum configuration recommendations.""" config_recommendations = [] # Check current autovacuum settings settings_query = """ SELECT name, setting, unit, short_desc FROM pg_settings WHERE name IN ( 'autovacuum', 'autovacuum_max_workers', 'autovacuum_naptime', 'autovacuum_vacuum_threshold', 'autovacuum_vacuum_scale_factor', 'autovacuum_analyze_threshold', 'autovacuum_analyze_scale_factor', 'autovacuum_vacuum_cost_delay', 'autovacuum_vacuum_cost_limit' ) ORDER BY name """ rows = await self.sql_driver.execute_query(settings_query) current_settings = {} if rows: for row in rows: data = row.cells current_settings[data["name"]] = { "value": data["setting"], "unit": data["unit"], "description": data["short_desc"], } # Analyze if autovacuum is effective autovacuum_data = analysis_results.get("autovacuum_analysis", {}) status_dist = autovacuum_data.get("status_distribution", {}) if status_dist.get("OVERDUE", 0) > 5: config_recommendations.append( { "parameter": "autovacuum_vacuum_scale_factor", "current_value": current_settings.get("autovacuum_vacuum_scale_factor", {}).get("value", "unknown"), "recommended_value": "0.1", "reason": f"{status_dist['OVERDUE']} tables are overdue for autovacuum", "impact": "More frequent autovacuum runs", } ) if status_dist.get("OVERDUE", 0) > 10: config_recommendations.append( { "parameter": "autovacuum_max_workers", "current_value": current_settings.get("autovacuum_max_workers", {}).get("value", "unknown"), "recommended_value": "6", "reason": "High number of tables requiring vacuum", "impact": "More parallel vacuum operations", } ) # Check for large tables that might need custom settings bloat_data = analysis_results.get("bloat_analysis", {}) large_bloated_tables = [t for t in bloat_data.get("tables", []) if t["total_size_mb"] > 10000 and t["bloat_severity"] in ["HIGH", "CRITICAL"]] if large_bloated_tables: config_recommendations.append( { "parameter": "table_specific_settings", "current_value": "default", "recommended_value": "custom per table", "reason": f"{len(large_bloated_tables)} large tables with high bloat", "impact": "Tailored autovacuum behavior for large tables", "example_tables": [t["qualified_name"] for t in large_bloated_tables[:3]], } ) analysis_results["configuration_recommendations"] = config_recommendations def _format_as_text(self, result: dict[str, Any]) -> str: """Format vacuum analysis result as compact text (no emojis).""" if "error" in result: return f"Error: {result['error']}" out: list[str] = [] # Summary summary = result.get("summary", {}) if summary: out.append( "Summary: " f"tables={summary.get('total_tables', 0)} " f"hist={summary.get('tables_with_vacuum_history', 0)} " f"never={summary.get('tables_never_vacuumed', 0)} " f"dead_tbls={summary.get('tables_with_dead_tuples', 0)} " f"dead={summary.get('total_dead_tuples', 0)} " f"live={summary.get('total_live_tuples', 0)} " f"dead_pct={summary.get('avg_dead_tuple_percentage', 0):.2f}% " f"not_24h={summary.get('tables_not_vacuumed_24h', 0)}" ) # Critical Issues critical_issues = result.get("critical_issues", []) if critical_issues: for i, issue in enumerate(critical_issues, 1): line = ( f"Critical{i}: type={issue.get('type')} sev={issue.get('severity')} table={issue.get('table')} " f"desc={issue.get('description')} action={issue.get('action_required')}" ) if issue.get("time_sensitive"): line += " time_sensitive=true" out.append(line) # Bloat Analysis bloat_analysis = result.get("bloat_analysis", {}) if bloat_analysis: severity_dist = bloat_analysis.get("severity_distribution", {}) parts = [f"{k.lower()}={v}" for k, v in severity_dist.items() if v] if parts: out.append("BloatSeverity: " + " ".join(parts)) high_priority = bloat_analysis.get("high_priority_tables", []) if high_priority: for table in high_priority[:10]: out.append( "Bloat: " f"{table.get('qualified_name')} dead%={table.get('dead_percentage'):.1f} " f"dead={table.get('dead_tuples')} live={table.get('live_tuples')} size={table.get('total_size_mb'):.1f}MB " f"sev={table.get('bloat_severity')} rec={table.get('recommendation')} " f"last_vac={table.get('last_vacuum')} last_auto={table.get('last_autovacuum')}" ) # Autovacuum Analysis autovacuum_analysis = result.get("autovacuum_analysis", {}) if autovacuum_analysis: status_dist = autovacuum_analysis.get("status_distribution", {}) parts = [f"{k.lower()}={v}" for k, v in status_dist.items() if v] if parts: out.append("AutovacuumStatus: " + " ".join(parts)) problematic = autovacuum_analysis.get("problematic_tables", []) if problematic: for table in problematic[:10]: out.append( "Auto: " f"{table.get('qualified_name')} status={table.get('autovacuum_status')} dead={table.get('dead_tuples')} " f"thr={table.get('calculated_threshold'):.0f} hrs={table.get('hours_since_last_autovacuum'):.1f} " f"auto_count={table.get('autovacuum_count')}" ) # Vacuum Performance vacuum_performance = result.get("vacuum_performance", {}) if vacuum_performance: stale_tables = vacuum_performance.get("stale_tables", []) if stale_tables: for table in stale_tables[:5]: out.append( "Stale: " f"{table.get('qualified_name')} hrs={table.get('hours_since_last_vacuum'):.1f} " f"size={table.get('table_size_mb'):.1f}MB mods={table.get('total_modifications')}" ) high_mod_tables = vacuum_performance.get("high_modification_tables", []) if high_mod_tables: for table in high_mod_tables[:5]: out.append( "HighMods: " f"{table.get('qualified_name')} mods={table.get('total_modifications')} vac={table.get('vacuum_count')} auto={table.get('autovacuum_count')}" ) # Maintenance Recommendations maintenance_recs = result.get("maintenance_recommendations", []) if maintenance_recs: for rec in maintenance_recs: out.append( f"Maint: prio={rec.get('priority')} type={rec.get('type')} table={rec.get('table')} desc={rec.get('description')} cmd={rec.get('command')} impact={rec.get('estimated_impact')}" ) # Configuration Recommendations config_recs = result.get("configuration_recommendations", []) if config_recs: for i, rec in enumerate(config_recs, 1): line = ( f"Cfg{i}: param={rec.get('parameter')} curr={rec.get('current_value')} rec={rec.get('recommended_value')} " f"reason={rec.get('reason')} impact={rec.get('impact')}" ) if rec.get("example_tables"): line += " examples=" + ", ".join(rec.get("example_tables", [])) out.append(line) # Summary recommendations if not critical_issues and not maintenance_recs: out.append("Overall: no critical vacuum issues; autovacuum normal; continue monitoring") out.append("General: monitor dead tuple %; review settings; manual vacuum during low-activity; keep stats up to date") return "\n".join(out)

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cloud-Thinker-AI/postgres-mcp-pro-plus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server