# get_database_overview
Analyze PostgreSQL database structure, performance metrics, and security configurations to identify optimization opportunities and potential issues.
## Instructions
Get comprehensive database overview with performance and security analysis
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| max_tables | No | Maximum number of tables to analyze per schema | 500 |
| sampling_mode | No | Use statistical sampling for large datasets | true |
| timeout | No | Maximum execution time in seconds | 300 |
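
For orientation, here is a minimal sketch of invoking the tool from the MCP Python client over stdio. The server launch command is a placeholder, and the argument values shown are simply the defaults (all three arguments may be omitted):

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Launch command is a placeholder; use however you normally start this server.
    server = StdioServerParameters(command="postgres-mcp-pro-plus", args=[])
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "get_database_overview",
                arguments={"max_tables": 500, "sampling_mode": True, "timeout": 300},
            )
            # The tool returns its overview as compact text content.
            print(result.content[0].text)


asyncio.run(main())
```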
## Implementation Reference
- `src/postgres_mcp_pro_plus/server.py:669-684` (registration): MCP tool registration and thin wrapper handler for get_database_overview that instantiates DatabaseOverviewTool and delegates to it.
```python
@mcp.tool(description="Get comprehensive database overview with performance and security analysis")
async def get_database_overview(
    max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500),
    sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True),
    timeout: int = Field(description="Maximum execution time in seconds", default=300),
) -> ResponseType:
    """Get comprehensive database overview including schemas, tables, relationships, performance metrics, and security analysis."""
    try:
        sql_driver = await get_sql_driver()
        overview_tool = DatabaseOverviewTool(sql_driver)
        result = await overview_tool.get_database_overview(max_tables, sampling_mode, timeout)
        return format_text_response(result)
    except Exception as e:
        logger.error(f"Error getting database overview: {e}")
        return format_error_response(str(e))
```
- Main handler method in the DatabaseOverviewTool class that handles timeout, error handling, and result formatting for get_database_overview.

```python
async def get_database_overview(self, max_tables: int = 500, sampling_mode: bool = True, timeout: int = 300):
    """Get comprehensive database overview with performance and security analysis.

    Args:
        max_tables: Maximum number of tables to analyze per schema (default: 500)
        sampling_mode: Use statistical sampling for large datasets (default: True)
        timeout: Maximum execution time in seconds (default: 300)
    """
    start_time = time.time()
    try:
        # Add timeout wrapper
        result = await asyncio.wait_for(
            self._get_database_overview_internal(max_tables, sampling_mode, start_time),
            timeout=timeout,
        )
        return self._format_as_text(result)
    except asyncio.TimeoutError:
        logger.warning(f"Database overview timed out after {timeout} seconds")
        error_result = {
            "error": f"Operation timed out after {timeout} seconds",
            "execution_metadata": {
                "max_tables": max_tables,
                "sampling_mode": sampling_mode,
                "timeout": timeout,
                "execution_time": time.time() - start_time,
            },
        }
        return self._format_as_text(error_result)
    except Exception as e:
        logger.error(f"Error generating database overview: {e!s}")
        error_result = {
            "error": str(e),
            "execution_metadata": {
                "max_tables": max_tables,
                "sampling_mode": sampling_mode,
                "timeout": timeout,
                "execution_time": time.time() - start_time,
            },
        }
        return self._format_as_text(error_result)
```
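To make the timeout behavior concrete, here is a minimal, self-contained sketch of the same `asyncio.wait_for` pattern; `slow_analysis` is a hypothetical stand-in for `_get_database_overview_internal`:

```python
import asyncio
import time


async def slow_analysis() -> dict:
    # Hypothetical stand-in for _get_database_overview_internal().
    await asyncio.sleep(5)  # simulate a long-running scan
    return {"status": "ok"}


async def run_with_timeout(timeout: int = 1) -> dict:
    start_time = time.time()
    try:
        # Same pattern as above: bound the coroutine with asyncio.wait_for
        # and fall back to a structured error payload on timeout.
        return await asyncio.wait_for(slow_analysis(), timeout=timeout)
    except asyncio.TimeoutError:
        return {
            "error": f"Operation timed out after {timeout} seconds",
            "execution_metadata": {"execution_time": round(time.time() - start_time, 2)},
        }


print(asyncio.run(run_with_timeout()))
```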
- Core internal handler that orchestrates gathering of the database summary, performance metrics, security overview, relationships, and hotspots.

```python
async def _get_database_overview_internal(self, max_tables: int, sampling_mode: bool, start_time: float) -> dict[str, Any]:
    """Internal implementation of database overview."""
    try:
        db_info = {
            "schemas": {},
            "database_summary": {
                "total_schemas": 0,
                "total_tables": 0,
                "total_size_bytes": 0,
                "total_rows": 0,
            },
            "performance_overview": {},
            "security_overview": {},
            "relationships": {"foreign_keys": [], "relationship_summary": {}},
            "execution_metadata": {
                "max_tables": max_tables,
                "sampling_mode": sampling_mode,
                "timeout": self.timeout_seconds,
                "tables_analyzed": 0,
                "tables_skipped": 0,
            },
        }

        # Get database-wide performance metrics
        await self._get_performance_metrics(db_info)

        # Get schema information
        user_schemas = await self._get_user_schemas()
        db_info["database_summary"]["total_schemas"] = len(user_schemas)

        # Track relationships and table stats
        all_relationships = []
        table_connections = {}
        all_tables_with_stats = []

        # Process each schema with limits
        for schema in user_schemas:
            logger.info(f"Processing schema: {schema}")
            schema_info = await self._process_schema(
                schema, all_relationships, table_connections, all_tables_with_stats, max_tables, sampling_mode
            )
            db_info["schemas"][schema] = schema_info

            # Update database totals
            db_info["database_summary"]["total_tables"] += schema_info["table_count"]
            db_info["database_summary"]["total_size_bytes"] += schema_info["total_size_bytes"]
            db_info["database_summary"]["total_rows"] += schema_info["total_rows"]

            # Update metadata
            db_info["execution_metadata"]["tables_analyzed"] += schema_info.get("tables_analyzed", 0)
            db_info["execution_metadata"]["tables_skipped"] += schema_info.get("tables_skipped", 0)

        # Add human-readable database size
        total_size_gb = db_info["database_summary"]["total_size_bytes"] / (1024**3)
        db_info["database_summary"]["total_size_readable"] = f"{total_size_gb:.2f} GB"

        # Add top tables summary
        if all_tables_with_stats:
            await self._add_top_tables_summary(db_info, all_tables_with_stats)

        # Add security overview
        await self._get_security_overview(db_info)

        # Build relationship summary
        await self._build_relationship_summary(db_info, all_relationships, table_connections, user_schemas)

        # Add schema relationship mapping
        await self._add_schema_relationship_mapping(db_info, user_schemas)

        # Add performance hotspot identification
        await self._identify_performance_hotspots(db_info, all_tables_with_stats)

        # Add execution timing
        execution_time = time.time() - start_time
        db_info["execution_metadata"]["execution_time"] = round(execution_time, 2)

        logger.info(
            f"Database overview complete: {db_info['database_summary']['total_tables']} tables "
            f"across {len(user_schemas)} schemas, {len(all_relationships)} relationships "
            f"in {execution_time:.2f}s"
        )

        return db_info
    except Exception as e:
        logger.error(f"Error generating database overview: {e!s}")
        return {"error": str(e)}
```
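`_process_schema` itself is not reproduced in this reference. As an illustration only, the following hypothetical sketch shows one way `max_tables` and `sampling_mode` could bound per-schema work: rank tables by size, analyze the top `max_tables`, and count the remainder as skipped.

```python
# Hypothetical sketch -- not the project's actual _process_schema logic.
def plan_tables(tables: list[dict], max_tables: int, sampling_mode: bool) -> tuple[list[dict], int]:
    """Pick which tables to analyze; return (analyzed, skipped_count)."""
    if sampling_mode and len(tables) > max_tables:
        # Analyze the largest tables first; count the remainder as skipped.
        ranked = sorted(tables, key=lambda t: t.get("size_bytes", 0), reverse=True)
        return ranked[:max_tables], len(tables) - max_tables
    return tables, 0


tables = [{"name": f"t{i}", "size_bytes": i * 1024} for i in range(1000)]
analyzed, skipped = plan_tables(tables, max_tables=500, sampling_mode=True)
assert len(analyzed) == 500 and skipped == 500
```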
- Input schema definition via Pydantic Field for the tool parameters.

```python
async def get_database_overview(
    max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500),
    sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True),
    timeout: int = Field(description="Maximum execution time in seconds", default=300),
) -> ResponseType:
```

- Helper method that formats the database overview result as compact text output.
```python
def _format_as_text(self, result: dict[str, Any]) -> str:
    """Format database overview result as compact text (no emojis, minimal headers)."""
    if "error" in result:
        return f"Error: {result['error']}\nMeta: {self._format_execution_metadata(result.get('execution_metadata', {}))}"

    out: list[str] = []

    # Database summary (single line)
    db_summary = result.get("database_summary", {})
    out.append(
        "DB: "
        f"schemas={db_summary.get('total_schemas', 0)} "
        f"tables={db_summary.get('total_tables', 0)} "
        f"size={db_summary.get('total_size_readable', 'N/A')} "
        f"rows={db_summary.get('total_rows', 0)}"
    )

    # Performance Overview
    perf_overview = result.get("performance_overview", {})
    if perf_overview:
        out.append(
            "Perf: "
            f"active={perf_overview.get('active_connections', 0)} "
            f"total={perf_overview.get('total_connections', 0)} "
            f"max={perf_overview.get('max_connections', 0)} "
            f"usage={perf_overview.get('connection_usage_percent', 0)}%"
        )
        top_tables = perf_overview.get("top_tables", {})
        if top_tables.get("largest"):
            largest = [f"{t['schema']}.{t['table']} {t['size_readable']}" for t in top_tables["largest"][:3]]
            out.append("Largest: " + "; ".join(largest))
        if top_tables.get("most_active"):
            active = [f"{t['schema']}.{t['table']} scans={t['total_scans']}" for t in top_tables["most_active"][:3]]
            out.append("MostActive: " + "; ".join(active))

    # Security Overview
    security_overview = result.get("security_overview", {})
    if security_overview:
        out.append(
            "Security: "
            f"score={security_overview.get('security_score', 0)}/100 "
            f"users={security_overview.get('total_users', 0)} "
            f"su={security_overview.get('superusers', 0)} "
            f"unlim_conn={security_overview.get('unlimited_connections', 0)}"
        )
        security_issues = security_overview.get("security_issues", [])
        if security_issues:
            out.append("SecIssues: " + ", ".join(security_issues))
        recommendations = security_overview.get("recommendations", [])
        if recommendations:
            out.append("SecRecs: " + ", ".join(recommendations))

    # Performance Hotspots
    hotspots = result.get("performance_hotspots", {})
    if hotspots and "error" not in hotspots:
        summary = hotspots.get("summary", {})
        out.append(
            f"Hotspots: total={summary.get('total_hotspots', 0)} crit={summary.get('critical_issues', 0)} warn={summary.get('warning_issues', 0)}"
        )
        if hotspots.get("high_scan_ratio_tables"):
            items = [
                f"{t['qualified_name']} r={t['seq_scan_ratio']}% sc={t['total_scans']} sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                for t in hotspots["high_scan_ratio_tables"][:5]
            ]
            out.append("HighSeqScan: " + "; ".join(items))
        if hotspots.get("high_dead_tuple_tables"):
            items = [
                f"{t['qualified_name']} dead={t['dead_tuple_ratio']}% sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                for t in hotspots["high_dead_tuple_tables"][:5]
            ]
            out.append("HighDeadTuples: " + "; ".join(items))
        if hotspots.get("large_tables_with_issues"):
            items = [
                f"{t['qualified_name']} sz={t['size_mb']}MB issues=[{', '.join(t.get('issues', []))}] sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                for t in hotspots["large_tables_with_issues"][:5]
            ]
            out.append("LargeWithIssues: " + "; ".join(items))
        if hotspots.get("high_modification_tables"):
            items = [f"{t['qualified_name']} mods={t['total_modifications']}" for t in hotspots["high_modification_tables"][:5]]
            out.append("HighMod: " + "; ".join(items))
        if hotspots.get("tables_needing_maintenance"):
            items = [
                f"{t['qualified_name']} rec=[{', '.join(t.get('recommendations', []))}] prio={t.get('priority', 'MEDIUM')}"
                for t in hotspots["tables_needing_maintenance"][:5]
            ]
            out.append("Maintenance: " + "; ".join(items))

    # Relationships Summary
    relationships = result.get("relationships", {})
    if relationships:
        rel_summary = relationships.get("relationship_summary", {})
        out.append(
            "Rel: "
            f"total={rel_summary.get('total_relationships', 0)} "
            f"connected={rel_summary.get('connected_tables', 0)} "
            f"isolated={rel_summary.get('isolated_tables', 0)}"
        )
        most_connected = rel_summary.get("most_connected_tables", [])
        if most_connected:
            out.append("MostConnected: " + "; ".join([f"{t['table']}({t['connections']})" for t in most_connected[:5]]))
        hub_tables = rel_summary.get("hub_tables", [])
        if hub_tables:
            out.append("Hubs: " + "; ".join([f"{t['table']}({t['referenced_by']})" for t in hub_tables[:5]]))
        insights = rel_summary.get("relationship_insights", [])
        if insights:
            out.append("RelInsights: " + "; ".join(insights))

    # Schema Details
    schemas = result.get("schemas", {})
    if schemas:
        for schema_name, schema_info in schemas.items():
            line = (
                f"Schema {schema_name}: "
                f"tables={schema_info.get('table_count', 0)} "
                f"size={self._format_bytes(schema_info.get('total_size_bytes', 0))} "
                f"rows={schema_info.get('total_rows', 0)}"
            )
            if schema_info.get("is_sampled"):
                line += f" sampled={schema_info.get('tables_analyzed', 0)}/{schema_info.get('table_count', 0)}"
            out.append(line)
            tables = schema_info.get("tables", {})
            if tables:
                top_schema_tables = sorted(
                    [(name, info) for name, info in tables.items() if "size_bytes" in info],
                    key=lambda x: x[1]["size_bytes"],
                    reverse=True,
                )[:3]
                if top_schema_tables:
                    tops = [f"{name} {info.get('size_readable', 'N/A')}" for name, info in top_schema_tables]
                    out.append(" Top: " + "; ".join(tops))

    # Schema Relationship Mapping
    schema_mapping = result.get("schema_relationship_mapping", {})
    if schema_mapping:
        if "error" in schema_mapping:
            out.append(f"SchemaMapError: {schema_mapping['error']}")
        elif "analysis_text" in schema_mapping:
            out.append("SchemaMap:")
            out.append(schema_mapping["analysis_text"])

    # Execution Metadata
    metadata = result.get("execution_metadata", {})
    if metadata:
        out.append("Meta: " + self._format_execution_metadata(metadata))

    return "\n".join(out)
```