Skip to main content
Glama

get_database_overview

Analyze PostgreSQL database structure, performance metrics, and security configurations to identify optimization opportunities and potential issues.

Instructions

Get comprehensive database overview with performance and security analysis

Input Schema

Table | JSON Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| max_tables | No | Maximum number of tables to analyze per schema | 500 |
| sampling_mode | No | Use statistical sampling for large datasets | True |
| timeout | No | Maximum execution time in seconds | 300 |

Implementation Reference

  • MCP tool registration and thin wrapper handler for get_database_overview that instantiates DatabaseOverviewTool and delegates to it.
    @mcp.tool(description="Get comprehensive database overview with performance and security analysis")
    async def get_database_overview(
        max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500),
        sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True),
        timeout: int = Field(description="Maximum execution time in seconds", default=300),
    ) -> ResponseType:
        """MCP entry point for the database overview tool.

        Obtains the SQL driver, hands the three arguments unchanged to
        ``DatabaseOverviewTool.get_database_overview`` and wraps the returned
        text with ``format_text_response``.

        Any exception raised along the way is logged and converted into
        ``format_error_response(str(exc))`` rather than propagated.
        """
        try:
            # Everything, including response formatting, stays inside the try
            # so that any failure is reported as an error response.
            overview_tool = DatabaseOverviewTool(await get_sql_driver())
            overview_text = await overview_tool.get_database_overview(max_tables, sampling_mode, timeout)
            return format_text_response(overview_text)
        except Exception as exc:
            logger.error(f"Error getting database overview: {exc}")
            return format_error_response(str(exc))
  • Main handler method in DatabaseOverviewTool class that handles timeout, error handling, and formatting for get_database_overview.
    async def get_database_overview(self, max_tables: int = 500, sampling_mode: bool = True, timeout: int = 300):
        """Get comprehensive database overview with performance and security analysis.

        Runs ``self._get_database_overview_internal`` under ``asyncio.wait_for``
        and renders whatever comes back (or a failure description) through
        ``self._format_as_text``. This method does not raise for timeouts or
        ordinary exceptions; they are logged and reported in the returned text.

        Args:
            max_tables: Maximum number of tables to analyze per schema (default: 500)
            sampling_mode: Use statistical sampling for large datasets (default: True)
            timeout: Maximum execution time in seconds (default: 300)

        Returns:
            The value produced by ``self._format_as_text`` for either the
            collected overview or an error payload carrying ``error`` and
            ``execution_metadata`` keys.
        """
        start_time = time.time()

        def failure_metadata() -> dict:
            # Single source for the metadata attached to every failure report.
            return {
                "max_tables": max_tables,
                "sampling_mode": sampling_mode,
                "timeout": timeout,
                "execution_time": time.time() - start_time,
            }

        try:
            result = await asyncio.wait_for(
                self._get_database_overview_internal(max_tables, sampling_mode, start_time),
                timeout=timeout,
            )
            if isinstance(result, dict):
                metadata = result.get("execution_metadata")
                if isinstance(metadata, dict):
                    # The internal collector records self.timeout_seconds; report
                    # the limit that was actually enforced for this call instead.
                    metadata["timeout"] = timeout
                elif "error" in result:
                    # The internal collector swallows its own exceptions and
                    # returns a bare {"error": ...}; give it the same metadata
                    # as the other failure paths so "Meta:" is not empty.
                    result["execution_metadata"] = failure_metadata()
            # Formatting stays inside the try so a formatting failure is still
            # reported as an error payload rather than raised.
            return self._format_as_text(result)
        except asyncio.TimeoutError:
            logger.warning("Database overview timed out after %s seconds", timeout)
            return self._format_as_text(
                {
                    "error": f"Operation timed out after {timeout} seconds",
                    "execution_metadata": failure_metadata(),
                }
            )
        except Exception as e:
            logger.error("Error generating database overview: %s", e)
            return self._format_as_text({"error": str(e), "execution_metadata": failure_metadata()})
  • Core internal handler that orchestrates gathering of database summary, performance, security, relationships, and hotspots.
    async def _get_database_overview_internal(self, max_tables: int, sampling_mode: bool, start_time: float) -> dict[str, Any]:
        """Collect the raw overview dictionary.

        Steps, in order: database-wide performance metrics, the list of user
        schemas, per-schema processing (accumulating table/size/row totals and
        analyzed/skipped counters), the human-readable total size, the
        top-tables summary (only when table stats were collected), the
        security overview, the relationship summary, the schema relationship
        mapping, performance hotspots, and finally the execution time
        measured from ``start_time``.

        Returns:
            The populated overview dict, or ``{"error": <message>}`` if any
            step raised an ``Exception`` (the error is logged, not re-raised).
        """
        try:
            db_info = {
                "schemas": {},
                "database_summary": {
                    "total_schemas": 0,
                    "total_tables": 0,
                    "total_size_bytes": 0,
                    "total_rows": 0,
                },
                "performance_overview": {},
                "security_overview": {},
                "relationships": {"foreign_keys": [], "relationship_summary": {}},
                "execution_metadata": {
                    "max_tables": max_tables,
                    "sampling_mode": sampling_mode,
                    "timeout": self.timeout_seconds,
                    "tables_analyzed": 0,
                    "tables_skipped": 0,
                },
            }

            # Database-wide metrics first, then the schemas to walk.
            await self._get_performance_metrics(db_info)
            user_schemas = await self._get_user_schemas()
            db_info["database_summary"]["total_schemas"] = len(user_schemas)

            # Accumulators filled in by _process_schema across all schemas.
            all_relationships = []
            table_connections = {}
            all_tables_with_stats = []

            # Short-lived aliases: nothing between here and the size summary
            # below is handed db_info, so these stay in sync with it.
            totals = db_info["database_summary"]
            run_meta = db_info["execution_metadata"]

            for schema in user_schemas:
                logger.info(f"Processing schema: {schema}")
                schema_info = await self._process_schema(
                    schema, all_relationships, table_connections, all_tables_with_stats, max_tables, sampling_mode
                )
                db_info["schemas"][schema] = schema_info

                # Required per-schema figures roll up into the database totals.
                for total_key, schema_key in (
                    ("total_tables", "table_count"),
                    ("total_size_bytes", "total_size_bytes"),
                    ("total_rows", "total_rows"),
                ):
                    totals[total_key] += schema_info[schema_key]

                # Optional counters default to zero when a schema omits them.
                for counter in ("tables_analyzed", "tables_skipped"):
                    run_meta[counter] += schema_info.get(counter, 0)

            # Human-readable database size, always expressed in GB.
            totals["total_size_readable"] = f"{totals['total_size_bytes'] / (1024**3):.2f} GB"

            if all_tables_with_stats:
                await self._add_top_tables_summary(db_info, all_tables_with_stats)

            await self._get_security_overview(db_info)
            await self._build_relationship_summary(db_info, all_relationships, table_connections, user_schemas)
            await self._add_schema_relationship_mapping(db_info, user_schemas)
            await self._identify_performance_hotspots(db_info, all_tables_with_stats)

            elapsed = time.time() - start_time
            db_info["execution_metadata"]["execution_time"] = round(elapsed, 2)

            logger.info(
                f"Database overview complete: {db_info['database_summary']['total_tables']} tables "
                f"across {len(user_schemas)} schemas, {len(all_relationships)} relationships "
                f"in {elapsed:.2f}s"
            )
            return db_info
        except Exception as exc:
            logger.error(f"Error generating database overview: {exc!s}")
            return {"error": str(exc)}
  • Input schema definition via Pydantic Field for the tool parameters.
    async def get_database_overview( max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500), sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True), timeout: int = Field(description="Maximum execution time in seconds", default=300), ) -> ResponseType:
  • Helper method to format the database overview result as compact text output.
    def _format_as_text(self, result: dict[str, Any]) -> str:
        """Render an overview result dict as compact, newline-separated text.

        An input containing an ``"error"`` key yields only an ``Error:`` line
        followed by a ``Meta:`` line. Otherwise the output always starts with
        a ``DB:`` line; the performance, security, hotspot, relationship,
        per-schema, schema-map and metadata sections are each emitted only
        when the corresponding entry in ``result`` is non-empty.

        Args:
            result: Overview (or error) dictionary to render.

        Returns:
            The formatted text, lines joined with ``"\\n"``.
        """
        if "error" in result:
            error_line = f"Error: {result['error']}"
            meta_text = self._format_execution_metadata(result.get("execution_metadata", {}))
            return f"{error_line}\nMeta: {meta_text}"

        lines: list[str] = []

        def severity_flag(entry):
            # Anything other than HIGH is shown as M.
            return "H" if entry["severity"] == "HIGH" else "M"

        # --- Database summary (always present) ---
        summary = result.get("database_summary", {})
        lines.append(
            f"DB: schemas={summary.get('total_schemas', 0)} tables={summary.get('total_tables', 0)} "
            f"size={summary.get('total_size_readable', 'N/A')} rows={summary.get('total_rows', 0)}"
        )

        # --- Performance overview ---
        perf = result.get("performance_overview", {})
        if perf:
            lines.append(
                f"Perf: active={perf.get('active_connections', 0)} total={perf.get('total_connections', 0)} "
                f"max={perf.get('max_connections', 0)} usage={perf.get('connection_usage_percent', 0)}%"
            )
            top = perf.get("top_tables", {})
            if top.get("largest"):
                biggest = [f"{t['schema']}.{t['table']} {t['size_readable']}" for t in top["largest"][:3]]
                lines.append("Largest: " + "; ".join(biggest))
            if top.get("most_active"):
                busiest = [f"{t['schema']}.{t['table']} scans={t['total_scans']}" for t in top["most_active"][:3]]
                lines.append("MostActive: " + "; ".join(busiest))

        # --- Security overview ---
        security = result.get("security_overview", {})
        if security:
            lines.append(
                f"Security: score={security.get('security_score', 0)}/100 users={security.get('total_users', 0)} "
                f"su={security.get('superusers', 0)} unlim_conn={security.get('unlimited_connections', 0)}"
            )
            issues = security.get("security_issues", [])
            if issues:
                lines.append("SecIssues: " + ", ".join(issues))
            advice = security.get("recommendations", [])
            if advice:
                lines.append("SecRecs: " + ", ".join(advice))

        # --- Performance hotspots (skipped entirely if the analysis errored) ---
        hotspots = result.get("performance_hotspots", {})
        if hotspots and "error" not in hotspots:
            counts = hotspots.get("summary", {})
            lines.append(
                f"Hotspots: total={counts.get('total_hotspots', 0)} crit={counts.get('critical_issues', 0)} warn={counts.get('warning_issues', 0)}"
            )
            # (result key, output label, per-table formatter), in output order.
            hotspot_sections = (
                (
                    "high_scan_ratio_tables",
                    "HighSeqScan: ",
                    lambda t: f"{t['qualified_name']} r={t['seq_scan_ratio']}% sc={t['total_scans']} sz={t['size_mb']}MB sev={severity_flag(t)}",
                ),
                (
                    "high_dead_tuple_tables",
                    "HighDeadTuples: ",
                    lambda t: f"{t['qualified_name']} dead={t['dead_tuple_ratio']}% sz={t['size_mb']}MB sev={severity_flag(t)}",
                ),
                (
                    "large_tables_with_issues",
                    "LargeWithIssues: ",
                    lambda t: f"{t['qualified_name']} sz={t['size_mb']}MB issues=[{', '.join(t.get('issues', []))}] sev={severity_flag(t)}",
                ),
                (
                    "high_modification_tables",
                    "HighMod: ",
                    lambda t: f"{t['qualified_name']} mods={t['total_modifications']}",
                ),
                (
                    "tables_needing_maintenance",
                    "Maintenance: ",
                    lambda t: f"{t['qualified_name']} rec=[{', '.join(t.get('recommendations', []))}] prio={t.get('priority', 'MEDIUM')}",
                ),
            )
            for key, label, describe in hotspot_sections:
                if hotspots.get(key):
                    # At most five tables are listed per section.
                    lines.append(label + "; ".join([describe(t) for t in hotspots[key][:5]]))

        # --- Relationships summary ---
        relationships = result.get("relationships", {})
        if relationships:
            rel = relationships.get("relationship_summary", {})
            lines.append(
                f"Rel: total={rel.get('total_relationships', 0)} connected={rel.get('connected_tables', 0)} "
                f"isolated={rel.get('isolated_tables', 0)}"
            )
            connected = rel.get("most_connected_tables", [])
            if connected:
                lines.append("MostConnected: " + "; ".join([f"{t['table']}({t['connections']})" for t in connected[:5]]))
            hubs = rel.get("hub_tables", [])
            if hubs:
                lines.append("Hubs: " + "; ".join([f"{t['table']}({t['referenced_by']})" for t in hubs[:5]]))
            notes = rel.get("relationship_insights", [])
            if notes:
                lines.append("RelInsights: " + "; ".join(notes))

        # --- Per-schema details ---
        schemas = result.get("schemas", {})
        if schemas:
            for schema_name, schema_info in schemas.items():
                schema_line = (
                    f"Schema {schema_name}: tables={schema_info.get('table_count', 0)} "
                    f"size={self._format_bytes(schema_info.get('total_size_bytes', 0))} "
                    f"rows={schema_info.get('total_rows', 0)}"
                )
                if schema_info.get("is_sampled"):
                    schema_line += f" sampled={schema_info.get('tables_analyzed', 0)}/{schema_info.get('table_count', 0)}"
                lines.append(schema_line)

                tables = schema_info.get("tables", {})
                if not tables:
                    continue
                # Only tables that report a size can be ranked; show the top three.
                sized = [(name, info) for name, info in tables.items() if "size_bytes" in info]
                ranked = sorted(sized, key=lambda pair: pair[1]["size_bytes"], reverse=True)[:3]
                if ranked:
                    lines.append(" Top: " + "; ".join([f"{name} {info.get('size_readable', 'N/A')}" for name, info in ranked]))

        # --- Schema relationship mapping ---
        mapping = result.get("schema_relationship_mapping", {})
        if mapping:
            if "error" in mapping:
                lines.append(f"SchemaMapError: {mapping['error']}")
            elif "analysis_text" in mapping:
                lines.extend(["SchemaMap:", mapping["analysis_text"]])

        # --- Execution metadata ---
        metadata = result.get("execution_metadata", {})
        if metadata:
            lines.append("Meta: " + self._format_execution_metadata(metadata))

        return "\n".join(lines)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cloud-Thinker-AI/postgres-mcp-pro-plus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.