database_overview.py (42.3 kB)
"""Database overview tool for comprehensive database analysis. Extended from the original postgres-mcp project: https://github.com/crystaldba/postgres-mcp """ from __future__ import annotations import asyncio import logging import time from typing import Any from .schema_mapping import SchemaMappingTool logger = logging.getLogger(__name__) class DatabaseOverviewTool: """Tool for generating comprehensive database overview with performance and security analysis.""" def __init__(self, sql_driver): self.sql_driver = sql_driver self.max_tables_per_schema = 100 # Limit tables per schema self.enable_sampling = True # Use sampling for large datasets self.timeout_seconds = 300 # 5 minute timeout self.schema_mapping_tool = SchemaMappingTool(sql_driver) async def get_database_overview(self, max_tables: int = 500, sampling_mode: bool = True, timeout: int = 300): """Get comprehensive database overview with performance and security analysis. Args: max_tables: Maximum number of tables to analyze per schema (default: 500) sampling_mode: Use statistical sampling for large datasets (default: True) timeout: Maximum execution time in seconds (default: 300) """ start_time = time.time() try: # Add timeout wrapper result = await asyncio.wait_for(self._get_database_overview_internal(max_tables, sampling_mode, start_time), timeout=timeout) return self._format_as_text(result) except asyncio.TimeoutError: logger.warning(f"Database overview timed out after {timeout} seconds") error_result = { "error": f"Operation timed out after {timeout} seconds", "execution_metadata": { "max_tables": max_tables, "sampling_mode": sampling_mode, "timeout": timeout, "execution_time": time.time() - start_time, }, } return self._format_as_text(error_result) except Exception as e: logger.error(f"Error generating database overview: {e!s}") error_result = { "error": str(e), "execution_metadata": { "max_tables": max_tables, "sampling_mode": sampling_mode, "timeout": timeout, "execution_time": time.time() - start_time, }, } return self._format_as_text(error_result) async def _get_database_overview_internal(self, max_tables: int, sampling_mode: bool, start_time: float) -> dict[str, Any]: """Internal implementation of database overview.""" try: db_info = { "schemas": {}, "database_summary": { "total_schemas": 0, "total_tables": 0, "total_size_bytes": 0, "total_rows": 0, }, "performance_overview": {}, "security_overview": {}, "relationships": {"foreign_keys": [], "relationship_summary": {}}, "execution_metadata": { "max_tables": max_tables, "sampling_mode": sampling_mode, "timeout": self.timeout_seconds, "tables_analyzed": 0, "tables_skipped": 0, }, } # Get database-wide performance metrics await self._get_performance_metrics(db_info) # Get schema information user_schemas = await self._get_user_schemas() db_info["database_summary"]["total_schemas"] = len(user_schemas) # Track relationships and table stats all_relationships = [] table_connections = {} all_tables_with_stats = [] # Process each schema with limits for schema in user_schemas: logger.info(f"Processing schema: {schema}") schema_info = await self._process_schema( schema, all_relationships, table_connections, all_tables_with_stats, max_tables, sampling_mode ) db_info["schemas"][schema] = schema_info # Update database totals db_info["database_summary"]["total_tables"] += schema_info["table_count"] db_info["database_summary"]["total_size_bytes"] += schema_info["total_size_bytes"] db_info["database_summary"]["total_rows"] += schema_info["total_rows"] # Update metadata 
db_info["execution_metadata"]["tables_analyzed"] += schema_info.get("tables_analyzed", 0) db_info["execution_metadata"]["tables_skipped"] += schema_info.get("tables_skipped", 0) # Add human-readable database size total_size_gb = db_info["database_summary"]["total_size_bytes"] / (1024**3) db_info["database_summary"]["total_size_readable"] = f"{total_size_gb:.2f} GB" # Add top tables summary if all_tables_with_stats: await self._add_top_tables_summary(db_info, all_tables_with_stats) # Add security overview await self._get_security_overview(db_info) # Build relationship summary await self._build_relationship_summary(db_info, all_relationships, table_connections, user_schemas) # Add schema relationship mapping await self._add_schema_relationship_mapping(db_info, user_schemas) # Add performance hotspot identification await self._identify_performance_hotspots(db_info, all_tables_with_stats) # Add execution timing execution_time = time.time() - start_time db_info["execution_metadata"]["execution_time"] = round(execution_time, 2) logger.info( f"Database overview complete: {db_info['database_summary']['total_tables']} tables " f"across {len(user_schemas)} schemas, {len(all_relationships)} relationships " f"in {execution_time:.2f}s" ) return db_info except Exception as e: logger.error(f"Error generating database overview: {e!s}") return {"error": str(e)} async def _get_user_schemas(self) -> list[str]: """Get list of user schemas (excluding system schemas).""" query = """ SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'pg_toast') AND schema_name NOT LIKE 'pg_temp_%' AND schema_name NOT LIKE 'pg_toast_temp_%' ORDER BY schema_name """ rows = await self.sql_driver.execute_query(query) return [row.cells["schema_name"] for row in rows] if rows else [] async def _get_performance_metrics(self, db_info: dict[str, Any]) -> None: """Get database-wide performance metrics.""" db_stats_query = """ SELECT pg_database_size(current_database()) as database_size_bytes, (SELECT count(*) FROM pg_stat_activity WHERE state = 'active') as active_connections, (SELECT count(*) FROM pg_stat_activity) as total_connections, current_setting('max_connections')::int as max_connections """ rows = await self.sql_driver.execute_query(db_stats_query) if rows and rows[0]: row = rows[0].cells db_info["performance_overview"] = { "active_connections": row["active_connections"], "total_connections": row["total_connections"], "max_connections": row["max_connections"], "connection_usage_percent": round((row["total_connections"] / row["max_connections"]) * 100, 2) if row["max_connections"] > 0 else 0, } async def _process_schema( self, schema: str, all_relationships: list[dict[str, Any]], table_connections: dict[str, int], all_tables_with_stats: list[dict[str, Any]], max_tables: int, sampling_mode: bool, ) -> dict[str, Any]: """Process a single schema and return its information.""" # Get tables in schema tables = await self._get_tables_in_schema(schema) # Apply sampling and limits tables_to_process = tables tables_skipped = 0 if len(tables) > max_tables: if sampling_mode: # Sample tables evenly across the list step = len(tables) / max_tables tables_to_process = [tables[int(i * step)] for i in range(max_tables)] tables_skipped = len(tables) - max_tables logger.info(f"Schema {schema}: sampling {max_tables} of {len(tables)} tables") else: # Take first N tables tables_to_process = tables[:max_tables] tables_skipped = len(tables) - max_tables logger.info(f"Schema {schema}: limiting 
to first {max_tables} of {len(tables)} tables") schema_info = { "table_count": len(tables), "total_size_bytes": 0, "total_rows": 0, "tables": {}, "tables_analyzed": len(tables_to_process), "tables_skipped": tables_skipped, "is_sampled": tables_skipped > 0, } # Get bulk table statistics bulk_stats = await self._get_bulk_table_stats(tables_to_process, schema) for table in tables_to_process: # Get table stats from bulk query table_stats = bulk_stats.get(table, {"row_count": 0, "size_bytes": 0}) # Get foreign key relationships (keep individual for now due to complexity) relationships = await self._get_foreign_keys(table, schema) for relationship in relationships: all_relationships.append(relationship) # Track connections from_key = f"{schema}.{table}" to_key = f"{relationship['to_schema']}.{relationship['to_table']}" table_connections[from_key] = table_connections.get(from_key, 0) + 1 table_connections[to_key] = table_connections.get(to_key, 0) + 1 if "error" not in table_stats: essential_info = { "row_count": table_stats.get("row_count", 0), "size_bytes": table_stats.get("size_bytes", 0), "size_readable": self._format_bytes(table_stats.get("size_bytes", 0)), "needs_attention": [], } # Add performance insights if table_stats.get("seq_scans", 0) > table_stats.get("idx_scans", 0): essential_info["needs_attention"].append("frequent_seq_scans") if essential_info["row_count"] == 0: essential_info["needs_attention"].append("empty_table") # Store for analysis all_tables_with_stats.append( { "schema": schema, "table": table, "size_bytes": essential_info["size_bytes"], "total_scans": table_stats.get("seq_scans", 0) + table_stats.get("idx_scans", 0), } ) schema_info["tables"][table] = essential_info schema_info["total_size_bytes"] += essential_info["size_bytes"] schema_info["total_rows"] += essential_info["row_count"] else: schema_info["tables"][table] = {"error": "stats_unavailable"} return schema_info async def _get_tables_in_schema(self, schema: str) -> list[str]: """Get list of tables in a schema.""" query = """ SELECT table_name FROM information_schema.tables WHERE table_schema = %s AND table_type = 'BASE TABLE' ORDER BY table_name """ rows = await self.sql_driver.execute_query(query, (schema,)) return [row.cells["table_name"] for row in rows] if rows else [] async def _get_bulk_table_stats(self, tables: list[str], schema: str) -> dict[str, dict[str, Any]]: """Get table statistics for multiple tables in a single query.""" if not tables: return {} # Create IN clause for tables table_placeholders = ",".join(["%s"] * len(tables)) query = f""" SELECT relname as table_name, COALESCE(n_tup_ins + n_tup_upd + n_tup_del, 0) as total_modifications, COALESCE(n_tup_ins, 0) as inserts, COALESCE(n_tup_upd, 0) as updates, COALESCE(n_tup_del, 0) as deletes, COALESCE(seq_scan, 0) as seq_scans, COALESCE(seq_tup_read, 0) as seq_tup_read, COALESCE(idx_scan, 0) as idx_scans, COALESCE(idx_tup_fetch, 0) as idx_tup_fetch, COALESCE(n_live_tup, 0) as live_tuples, COALESCE(n_dead_tup, 0) as dead_tuples, pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname)) as size_bytes, COALESCE(n_live_tup, 0) as row_count FROM pg_stat_user_tables WHERE schemaname = %s AND relname IN ({table_placeholders}) """ try: params = [schema, *tables] rows = await self.sql_driver.execute_query(query, params) result = {} if rows: for row in rows: table_name = row.cells["table_name"] result[table_name] = dict(row.cells) # Add empty stats for tables not found in pg_stat_user_tables for table in tables: if table not in result: 
result[table] = {"row_count": 0, "size_bytes": 0} return result except Exception as e: logger.warning(f"Could not get bulk stats for schema {schema}: {e}") # Fallback to individual queries result = {} for table in tables: result[table] = await self._get_table_stats(table, schema) return result async def _get_foreign_keys(self, table: str, schema: str) -> list[dict[str, Any]]: """Get foreign key relationships for a table.""" query = """ SELECT tc.constraint_name, tc.table_schema as from_schema, tc.table_name as from_table, string_agg(kcu.column_name, ',' ORDER BY kcu.ordinal_position) as from_columns, ccu.table_schema as to_schema, ccu.table_name as to_table, string_agg(ccu.column_name, ',' ORDER BY kcu.ordinal_position) as to_columns FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name AND tc.table_schema = kcu.table_schema JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name AND ccu.table_schema = tc.table_schema WHERE tc.constraint_type = 'FOREIGN KEY' AND tc.table_schema = %s AND tc.table_name = %s GROUP BY tc.constraint_name, tc.table_schema, tc.table_name, ccu.table_schema, ccu.table_name """ relationships = [] try: rows = await self.sql_driver.execute_query(query, (schema, table)) for row in rows: relationship = { "from_schema": row.cells["from_schema"], "from_table": row.cells["from_table"], "from_columns": row.cells["from_columns"].split(","), "to_schema": row.cells["to_schema"], "to_table": row.cells["to_table"], "to_columns": row.cells["to_columns"].split(","), "constraint_name": row.cells["constraint_name"], } relationships.append(relationship) except Exception as e: logger.warning(f"Could not get foreign keys for {schema}.{table}: {e}") return relationships async def _get_table_stats(self, table: str, schema: str) -> dict[str, Any]: """Get basic table statistics.""" try: stats_query = """ SELECT COALESCE(n_tup_ins + n_tup_upd + n_tup_del, 0) as total_modifications, COALESCE(n_tup_ins, 0) as inserts, COALESCE(n_tup_upd, 0) as updates, COALESCE(n_tup_del, 0) as deletes, COALESCE(seq_scan, 0) as seq_scans, COALESCE(seq_tup_read, 0) as seq_tup_read, COALESCE(idx_scan, 0) as idx_scans, COALESCE(idx_tup_fetch, 0) as idx_tup_fetch, COALESCE(n_live_tup, 0) as live_tuples, COALESCE(n_dead_tup, 0) as dead_tuples, pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(relname)) as size_bytes, COALESCE(n_live_tup, 0) as row_count FROM pg_stat_user_tables WHERE schemaname = %s AND relname = %s """ rows = await self.sql_driver.execute_query(stats_query, (schema, table)) if rows and rows[0]: return dict(rows[0].cells) else: # Fallback for tables without stats return {"row_count": 0, "size_bytes": 0} except Exception as e: logger.warning(f"Could not get stats for {schema}.{table}: {e}") return {"error": str(e)} async def _add_top_tables_summary(self, db_info: dict[str, Any], all_tables_with_stats: list[dict[str, Any]]) -> None: """Add top tables summary for performance insights.""" # Top 5 tables by size top_by_size = sorted(all_tables_with_stats, key=lambda x: x["size_bytes"], reverse=True)[:5] # Top 5 most active tables top_by_activity = sorted(all_tables_with_stats, key=lambda x: x["total_scans"], reverse=True)[:5] db_info["performance_overview"]["top_tables"] = { "largest": [ { "schema": t["schema"], "table": t["table"], "size_bytes": t["size_bytes"], "size_readable": self._format_bytes(t["size_bytes"]), } for t in top_by_size ], "most_active": [ { 
"schema": t["schema"], "table": t["table"], "total_scans": t["total_scans"], } for t in top_by_activity ], } async def _get_security_overview(self, db_info: dict[str, Any]) -> None: """Get security overview and recommendations.""" security_issues = [] security_score = 100 # Check security settings security_settings = {} settings_to_check = ["ssl", "log_connections", "password_encryption"] for setting in settings_to_check: try: result = await self.sql_driver.execute_query(f"SHOW {setting}") security_settings[setting] = result[0].cells[setting] if result and result[0] else "unknown" except Exception: security_settings[setting] = "not available" # Check for pg_stat_statements extension ext_query = "SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'pg_stat_statements')" ext_result = await self.sql_driver.execute_query(ext_query) pg_stat_statements_installed = ext_result[0].cells["exists"] if ext_result and ext_result[0] else False security_settings["pg_stat_statements_installed"] = pg_stat_statements_installed # Security issue detection if security_settings.get("ssl") != "on": security_issues.append("ssl_disabled") security_score -= 20 if not pg_stat_statements_installed: security_issues.append("no_query_monitoring") security_score -= 10 # Get user security summary users_query = """ SELECT COUNT(*) as total_users, COUNT(*) FILTER (WHERE rolsuper = true) as superusers, COUNT(*) FILTER (WHERE rolconnlimit = -1) as unlimited_connections FROM pg_roles WHERE rolcanlogin = true """ user_result = await self.sql_driver.execute_query(users_query) if user_result and user_result[0]: user_stats = user_result[0].cells total_users = user_stats["total_users"] superusers = user_stats["superusers"] unlimited_conn = user_stats["unlimited_connections"] if superusers > 1: security_issues.append("multiple_superusers") security_score -= 15 if unlimited_conn > 0: security_issues.append("unlimited_connections") security_score -= 10 recommendations = [] if "ssl_disabled" in security_issues: recommendations.append("Enable SSL encryption") if "no_query_monitoring" in security_issues: recommendations.append("Install pg_stat_statements for query monitoring") if "multiple_superusers" in security_issues: recommendations.append("Review superuser privileges") if "unlimited_connections" in security_issues: recommendations.append("Set connection limits for users") db_info["security_overview"] = { "security_score": max(0, security_score), "total_users": total_users, "superusers": superusers, "unlimited_connections": unlimited_conn, "security_settings": security_settings, "security_issues": security_issues, "recommendations": recommendations, } async def _build_relationship_summary( self, db_info: dict[str, Any], all_relationships: list[dict[str, Any]], table_connections: dict[str, int], user_schemas: list[str], ) -> None: """Build relationship summary and insights.""" db_info["relationships"]["foreign_keys"] = all_relationships if all_relationships: # Find most connected tables most_connected = sorted(table_connections.items(), key=lambda x: x[1], reverse=True)[:5] # Find isolated tables all_table_keys = set() for schema in user_schemas: tables = await self._get_tables_in_schema(schema) for table in tables: all_table_keys.add(f"{schema}.{table}") connected_tables = set(table_connections.keys()) isolated_tables = all_table_keys - connected_tables # Find hub tables (highly referenced) relationship_patterns = {} for rel in all_relationships: to_table = f"{rel['to_schema']}.{rel['to_table']}" relationship_patterns[to_table] 

            hub_tables = sorted(relationship_patterns.items(), key=lambda x: x[1], reverse=True)[:5]

            insights = []
            if len(isolated_tables) > 0:
                insights.append(f"{len(isolated_tables)} tables have no foreign key relationships")
            if hub_tables:
                top_hub = hub_tables[0]
                insights.append(f"{top_hub[0]} is the most referenced table ({top_hub[1]} references)")

            db_info["relationships"]["relationship_summary"] = {
                "total_relationships": len(all_relationships),
                "connected_tables": len(connected_tables),
                "isolated_tables": len(isolated_tables),
                "most_connected_tables": [{"table": table, "connections": count} for table, count in most_connected],
                "hub_tables": [{"table": table, "referenced_by": count} for table, count in hub_tables],
                "relationship_insights": insights,
            }
        else:
            db_info["relationships"]["relationship_summary"] = {
                "total_relationships": 0,
                "connected_tables": 0,
                "isolated_tables": db_info["database_summary"]["total_tables"],
                "relationship_insights": ["No foreign key relationships found in the database"],
            }

    def _format_bytes(self, bytes_value: int) -> str:
        """Format bytes into compact human-readable string (no spaces)."""
        if bytes_value == 0:
            return "0B"
        value = float(bytes_value)
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if value < 1024.0:
                return f"{value:.1f}{unit}"
            value /= 1024.0
        return f"{value:.1f}PB"

    async def _identify_performance_hotspots(self, db_info: dict[str, Any], all_tables_with_stats: list[dict[str, Any]]) -> None:
        """Identify performance hotspots in the database."""
        try:
            logger.info("Identifying performance hotspots...")

            hotspots = {
                "high_scan_ratio_tables": [],
                "high_dead_tuple_tables": [],
                "large_tables_with_issues": [],
                "high_modification_tables": [],
                "tables_needing_maintenance": [],
                "summary": {"total_hotspots": 0, "critical_issues": 0, "warning_issues": 0},
            }

            for table_info in all_tables_with_stats:
                schema = table_info["schema"]
                table = table_info["table"]
                stats = table_info

                # Calculate derived metrics
                total_scans = stats.get("seq_scans", 0) + stats.get("idx_scans", 0)
                seq_scan_ratio = (stats.get("seq_scans", 0) / total_scans) if total_scans > 0 else 0
                dead_tuple_ratio = (
                    (stats.get("dead_tuples", 0) / (stats.get("live_tuples", 0) + stats.get("dead_tuples", 0)))
                    if (stats.get("live_tuples", 0) + stats.get("dead_tuples", 0)) > 0
                    else 0
                )
                size_mb = stats.get("size_bytes", 0) / (1024 * 1024)

                # Identify high sequential scan ratio tables (>50% seq scans on tables with >1000 scans)
                if seq_scan_ratio > 0.5 and total_scans > 1000:
                    hotspots["high_scan_ratio_tables"].append(
                        {
                            "qualified_name": f"{schema}.{table}",
                            "seq_scan_ratio": round(seq_scan_ratio * 100, 1),
                            "total_scans": total_scans,
                            "seq_scans": stats.get("seq_scans", 0),
                            "size_mb": round(size_mb, 2),
                            "severity": "HIGH" if seq_scan_ratio > 0.8 else "MEDIUM",
                        }
                    )

                # Identify tables with high dead tuple ratio (>20%)
                if dead_tuple_ratio > 0.2:
                    hotspots["high_dead_tuple_tables"].append(
                        {
                            "qualified_name": f"{schema}.{table}",
                            "dead_tuple_ratio": round(dead_tuple_ratio * 100, 1),
                            "dead_tuples": stats.get("dead_tuples", 0),
                            "live_tuples": stats.get("live_tuples", 0),
                            "size_mb": round(size_mb, 2),
                            "severity": "HIGH" if dead_tuple_ratio > 0.4 else "MEDIUM",
                        }
                    )

                # Identify large tables with performance issues (>100MB with issues)
                if size_mb > 100:
                    issues = []
                    if seq_scan_ratio > 0.3:
                        issues.append(f"High seq scan ratio ({seq_scan_ratio * 100:.1f}%)")
                    if dead_tuple_ratio > 0.1:
                        issues.append(f"High dead tuple ratio ({dead_tuple_ratio * 100:.1f}%)")
                    if total_scans > 10000 and stats.get("idx_scans", 0) == 0:
                        issues.append("No index scans despite high activity")

                    if issues:
                        hotspots["large_tables_with_issues"].append(
                            {
                                "qualified_name": f"{schema}.{table}",
                                "size_mb": round(size_mb, 2),
                                "issues": issues,
                                "total_scans": total_scans,
                                "severity": "HIGH" if len(issues) > 1 else "MEDIUM",
                            }
                        )

                # Identify tables with high modification rates
                total_modifications = stats.get("total_modifications", 0)
                if total_modifications > 100000:  # Tables with >100k modifications
                    hotspots["high_modification_tables"].append(
                        {
                            "qualified_name": f"{schema}.{table}",
                            "total_modifications": total_modifications,
                            "inserts": stats.get("inserts", 0),
                            "updates": stats.get("updates", 0),
                            "deletes": stats.get("deletes", 0),
                            "size_mb": round(size_mb, 2),
                            "severity": "HIGH" if total_modifications > 1000000 else "MEDIUM",
                        }
                    )

                # Generate maintenance recommendations
                maintenance_needed = []
                if dead_tuple_ratio > 0.2:
                    maintenance_needed.append("VACUUM recommended")
                if dead_tuple_ratio > 0.4:
                    maintenance_needed.append("VACUUM FULL may be needed")
                if seq_scan_ratio > 0.5 and total_scans > 1000:
                    maintenance_needed.append("Consider adding indexes")
                if stats.get("n_mod_since_analyze", 0) > stats.get("live_tuples", 0) * 0.1:
                    maintenance_needed.append("ANALYZE recommended")

                if maintenance_needed:
                    hotspots["tables_needing_maintenance"].append(
                        {
                            "qualified_name": f"{schema}.{table}",
                            "recommendations": maintenance_needed,
                            "size_mb": round(size_mb, 2),
                            "priority": "HIGH" if len(maintenance_needed) > 1 else "MEDIUM",
                        }
                    )

            # Sort all hotspot lists by severity and size
            for hotspot_type in [
                "high_scan_ratio_tables",
                "high_dead_tuple_tables",
                "large_tables_with_issues",
                "high_modification_tables",
            ]:
                hotspots[hotspot_type] = sorted(hotspots[hotspot_type], key=lambda x: (x["severity"] == "HIGH", x.get("size_mb", 0)), reverse=True)[
                    :10
                ]  # Limit to top 10

            hotspots["tables_needing_maintenance"] = sorted(
                hotspots["tables_needing_maintenance"],
                key=lambda x: (x["priority"] == "HIGH", x.get("size_mb", 0)),
                reverse=True,
            )[:10]

            # Calculate summary statistics
            total_hotspots = sum(len(hotspots[key]) for key in hotspots if key != "summary")
            critical_issues = sum(
                1
                for hotspot_list in hotspots.values()
                if isinstance(hotspot_list, list)
                for item in hotspot_list
                if item.get("severity") == "HIGH" or item.get("priority") == "HIGH"
            )
            warning_issues = total_hotspots - critical_issues

            hotspots["summary"] = {
                "total_hotspots": total_hotspots,
                "critical_issues": critical_issues,
                "warning_issues": warning_issues,
            }

            db_info["performance_hotspots"] = hotspots
            logger.info(f"Performance hotspot analysis complete: {total_hotspots} hotspots identified")

        except Exception as e:
            logger.error(f"Error identifying performance hotspots: {e}")
            db_info["performance_hotspots"] = {"error": f"Failed to identify performance hotspots: {e!s}"}

    async def _add_schema_relationship_mapping(self, db_info: dict[str, Any], user_schemas: list[str]) -> None:
        """Add schema relationship mapping analysis to database overview."""
        try:
            logger.info("Analyzing schema relationships...")

            # Perform schema relationship analysis
            schema_mapping_results = await self.schema_mapping_tool.analyze_schema_relationships(user_schemas)

            # Add to database info (now returns text format)
            db_info["schema_relationship_mapping"] = {"analysis_text": schema_mapping_results}

            # Extract a simple count for logging (since we now get text, use a simpler approach)
            logger.info("Schema relationship mapping complete")

        except Exception as e:
            logger.error(f"Error adding schema relationship mapping: {e}")
db_info["schema_relationship_mapping"] = {"error": f"Failed to analyze schema relationships: {e!s}"} def _format_as_text(self, result: dict[str, Any]) -> str: """Format database overview result as compact text (no emojis, minimal headers).""" if "error" in result: return f"Error: {result['error']}\nMeta: {self._format_execution_metadata(result.get('execution_metadata', {}))}" out: list[str] = [] # Database summary (single line) db_summary = result.get("database_summary", {}) out.append( "DB: " f"schemas={db_summary.get('total_schemas', 0)} " f"tables={db_summary.get('total_tables', 0)} " f"size={db_summary.get('total_size_readable', 'N/A')} " f"rows={db_summary.get('total_rows', 0)}" ) # Performance Overview perf_overview = result.get("performance_overview", {}) if perf_overview: out.append( "Perf: " f"active={perf_overview.get('active_connections', 0)} " f"total={perf_overview.get('total_connections', 0)} " f"max={perf_overview.get('max_connections', 0)} " f"usage={perf_overview.get('connection_usage_percent', 0)}%" ) top_tables = perf_overview.get("top_tables", {}) if top_tables.get("largest"): largest = [f"{t['schema']}.{t['table']} {t['size_readable']}" for t in top_tables["largest"][:3]] out.append("Largest: " + "; ".join(largest)) if top_tables.get("most_active"): active = [f"{t['schema']}.{t['table']} scans={t['total_scans']}" for t in top_tables["most_active"][:3]] out.append("MostActive: " + "; ".join(active)) # Security Overview security_overview = result.get("security_overview", {}) if security_overview: out.append( "Security: " f"score={security_overview.get('security_score', 0)}/100 " f"users={security_overview.get('total_users', 0)} " f"su={security_overview.get('superusers', 0)} " f"unlim_conn={security_overview.get('unlimited_connections', 0)}" ) security_issues = security_overview.get("security_issues", []) if security_issues: out.append("SecIssues: " + ", ".join(security_issues)) recommendations = security_overview.get("recommendations", []) if recommendations: out.append("SecRecs: " + ", ".join(recommendations)) # Performance Hotspots hotspots = result.get("performance_hotspots", {}) if hotspots and "error" not in hotspots: summary = hotspots.get("summary", {}) out.append( f"Hotspots: total={summary.get('total_hotspots', 0)} crit={summary.get('critical_issues', 0)} warn={summary.get('warning_issues', 0)}" ) if hotspots.get("high_scan_ratio_tables"): items = [ f"{t['qualified_name']} r={t['seq_scan_ratio']}% sc={t['total_scans']} sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}" for t in hotspots["high_scan_ratio_tables"][:5] ] out.append("HighSeqScan: " + "; ".join(items)) if hotspots.get("high_dead_tuple_tables"): items = [ f"{t['qualified_name']} dead={t['dead_tuple_ratio']}% sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}" for t in hotspots["high_dead_tuple_tables"][:5] ] out.append("HighDeadTuples: " + "; ".join(items)) if hotspots.get("large_tables_with_issues"): items = [ f"{t['qualified_name']} sz={t['size_mb']}MB issues=[{', '.join(t.get('issues', []))}] sev={'H' if t['severity'] == 'HIGH' else 'M'}" for t in hotspots["large_tables_with_issues"][:5] ] out.append("LargeWithIssues: " + "; ".join(items)) if hotspots.get("high_modification_tables"): items = [f"{t['qualified_name']} mods={t['total_modifications']}" for t in hotspots["high_modification_tables"][:5]] out.append("HighMod: " + "; ".join(items)) if hotspots.get("tables_needing_maintenance"): items = [ f"{t['qualified_name']} rec=[{', '.join(t.get('recommendations', []))}] 
prio={t.get('priority', 'MEDIUM')}" for t in hotspots["tables_needing_maintenance"][:5] ] out.append("Maintenance: " + "; ".join(items)) # Relationships Summary relationships = result.get("relationships", {}) if relationships: rel_summary = relationships.get("relationship_summary", {}) out.append( "Rel: " f"total={rel_summary.get('total_relationships', 0)} " f"connected={rel_summary.get('connected_tables', 0)} " f"isolated={rel_summary.get('isolated_tables', 0)}" ) most_connected = rel_summary.get("most_connected_tables", []) if most_connected: out.append("MostConnected: " + "; ".join([f"{t['table']}({t['connections']})" for t in most_connected[:5]])) hub_tables = rel_summary.get("hub_tables", []) if hub_tables: out.append("Hubs: " + "; ".join([f"{t['table']}({t['referenced_by']})" for t in hub_tables[:5]])) insights = rel_summary.get("relationship_insights", []) if insights: out.append("RelInsights: " + "; ".join(insights)) # Schema Details schemas = result.get("schemas", {}) if schemas: for schema_name, schema_info in schemas.items(): line = ( f"Schema {schema_name}: " f"tables={schema_info.get('table_count', 0)} " f"size={self._format_bytes(schema_info.get('total_size_bytes', 0))} " f"rows={schema_info.get('total_rows', 0)}" ) if schema_info.get("is_sampled"): line += f" sampled={schema_info.get('tables_analyzed', 0)}/{schema_info.get('table_count', 0)}" out.append(line) tables = schema_info.get("tables", {}) if tables: top_schema_tables = sorted( [(name, info) for name, info in tables.items() if "size_bytes" in info], key=lambda x: x[1]["size_bytes"], reverse=True, )[:3] if top_schema_tables: tops = [f"{name} {info.get('size_readable', 'N/A')}" for name, info in top_schema_tables] out.append(" Top: " + "; ".join(tops)) # Schema Relationship Mapping schema_mapping = result.get("schema_relationship_mapping", {}) if schema_mapping: if "error" in schema_mapping: out.append(f"SchemaMapError: {schema_mapping['error']}") elif "analysis_text" in schema_mapping: out.append("SchemaMap:") out.append(schema_mapping["analysis_text"]) # Execution Metadata metadata = result.get("execution_metadata", {}) if metadata: out.append("Meta: " + self._format_execution_metadata(metadata)) return "\n".join(out) def _format_execution_metadata(self, metadata: dict[str, Any]) -> str: """Format execution metadata compactly on one line.""" return ( f"max_tables={metadata.get('max_tables', 'NA')} " f"sampling={metadata.get('sampling_mode', 'NA')} " f"timeout={metadata.get('timeout', 'NA')}s " f"analyzed={metadata.get('tables_analyzed', 0)} " f"skipped={metadata.get('tables_skipped', 0)} " f"time={metadata.get('execution_time', 'NA')}s" )
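
Usage sketch (not part of the file above): a minimal, hypothetical example of driving DatabaseOverviewTool. The tool only assumes a driver with an async execute_query(query, params=None) whose rows expose a .cells mapping, so FakeSqlDriver below is an illustrative stand-in rather than the project's real driver, and the example assumes it runs alongside the module so the relative import of SchemaMappingTool resolves.

# --- Hypothetical usage sketch (illustrative only; FakeSqlDriver is a stand-in) ---
import asyncio


class FakeSqlDriver:
    """Stand-in driver that returns no rows.

    A real driver would run each SQL statement against PostgreSQL and return row
    objects exposing a `.cells` mapping, matching how DatabaseOverviewTool reads results.
    """

    async def execute_query(self, query, params=None):
        return []  # every query yields an empty result set in this sketch


async def demo() -> None:
    tool = DatabaseOverviewTool(FakeSqlDriver())
    # Produces the compact text report assembled by _format_as_text above.
    report = await tool.get_database_overview(max_tables=100, sampling_mode=True, timeout=60)
    print(report)


if __name__ == "__main__":
    asyncio.run(demo())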
