Skip to main content
Glama
Cloud-Thinker-AI

Postgres MCP Pro Plus

get_database_overview

Analyze PostgreSQL database structure, performance metrics, and security configurations to identify optimization opportunities and potential issues.

Instructions

Get comprehensive database overview with performance and security analysis

Input Schema

Table | JSON Schema
Name | Required | Description | Default
max_tables | No | Maximum number of tables to analyze per schema | 500
sampling_mode | No | Use statistical sampling for large datasets | True
timeout | No | Maximum execution time in seconds | 300

Implementation Reference

  • MCP tool registration and thin wrapper handler for get_database_overview that instantiates DatabaseOverviewTool and delegates to it.
    @mcp.tool(description="Get comprehensive database overview with performance and security analysis")
    async def get_database_overview(
        max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500),
        sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True),
        timeout: int = Field(description="Maximum execution time in seconds", default=300),
    ) -> ResponseType:
        """Return a database overview (schemas, tables, relationships, performance, security) as a tool response.

        Thin wrapper: acquires a SQL driver, delegates the actual work to
        ``DatabaseOverviewTool.get_database_overview`` and wraps the returned
        text with ``format_text_response``. Any exception raised along the way
        is logged and converted into an error response rather than propagated.
        """
        try:
            driver = await get_sql_driver()
            report = await DatabaseOverviewTool(driver).get_database_overview(max_tables, sampling_mode, timeout)
            # Formatting stays inside the try so a formatting failure is also
            # reported as an error response.
            response = format_text_response(report)
        except Exception as exc:
            logger.error(f"Error getting database overview: {exc}")
            response = format_error_response(str(exc))
        return response
  • Main handler method in the DatabaseOverviewTool class that enforces the timeout, handles errors, and formats the result for get_database_overview.
    async def get_database_overview(self, max_tables: int = 500, sampling_mode: bool = True, timeout: int = 300):
        """Build the database overview and return it rendered as text.

        The internal implementation is run under ``asyncio.wait_for``; a timeout
        or any other exception is turned into an error payload (message plus the
        call parameters and elapsed time) and rendered through the same text
        formatter as a successful result.

        Args:
            max_tables: Maximum number of tables to analyze per schema (default: 500)
            sampling_mode: Use statistical sampling for large datasets (default: True)
            timeout: Maximum execution time in seconds (default: 300)

        Returns:
            Whatever ``self._format_as_text`` produces for the overview or for
            the error payload.
        """
        started_at = time.time()

        def failure_payload(message: str) -> dict:
            # Elapsed time is measured when the payload is built, i.e. after
            # the failure has been logged.
            return {
                "error": message,
                "execution_metadata": {
                    "max_tables": max_tables,
                    "sampling_mode": sampling_mode,
                    "timeout": timeout,
                    "execution_time": time.time() - started_at,
                },
            }

        try:
            # Bound the whole analysis by the caller-supplied timeout.
            overview = await asyncio.wait_for(
                self._get_database_overview_internal(max_tables, sampling_mode, started_at),
                timeout=timeout,
            )
            return self._format_as_text(overview)
        except asyncio.TimeoutError:
            logger.warning(f"Database overview timed out after {timeout} seconds")
            return self._format_as_text(failure_payload(f"Operation timed out after {timeout} seconds"))
        except Exception as exc:
            logger.error(f"Error generating database overview: {exc!s}")
            return self._format_as_text(failure_payload(str(exc)))
  • Core internal handler that orchestrates gathering of database summary, performance, security, relationships, and hotspots.
    async def _get_database_overview_internal(self, max_tables: int, sampling_mode: bool, start_time: float) -> dict[str, Any]:
        """Internal implementation of database overview.

        Assembles a single ``db_info`` dict by running, in order: database-wide
        performance metrics, per-schema processing (accumulating table, size and
        row totals), the top-tables summary, the security overview, the
        relationship summary, the schema relationship mapping and the
        performance hotspot identification. The helper coroutines are handed
        ``db_info`` and their return values are ignored, so they presumably
        populate it in place (their bodies are not visible here).

        Args:
            max_tables: Per-schema table limit, forwarded to ``_process_schema``.
            sampling_mode: Sampling flag, forwarded to ``_process_schema``.
            start_time: ``time.time()`` value captured by the caller; used to
                compute the reported execution time.

        Returns:
            The populated ``db_info`` dict on success. On any exception, a dict
            with an ``"error"`` message and an ``"execution_metadata"`` entry
            (same keys the public entry point uses for its error payloads), so
            the text formatter can still print a meaningful ``Meta:`` line.

        NOTE(review): the reported ``timeout`` is ``self.timeout_seconds``, not
        the timeout the caller passes to ``asyncio.wait_for`` — confirm the two
        are kept in sync elsewhere in the class.
        """
        try:
            db_info = {
                "schemas": {},
                "database_summary": {
                    "total_schemas": 0,
                    "total_tables": 0,
                    "total_size_bytes": 0,
                    "total_rows": 0,
                },
                "performance_overview": {},
                "security_overview": {},
                "relationships": {"foreign_keys": [], "relationship_summary": {}},
                "execution_metadata": {
                    "max_tables": max_tables,
                    "sampling_mode": sampling_mode,
                    "timeout": self.timeout_seconds,
                    "tables_analyzed": 0,
                    "tables_skipped": 0,
                },
            }

            # Get database-wide performance metrics
            await self._get_performance_metrics(db_info)

            # Get schema information
            user_schemas = await self._get_user_schemas()
            db_info["database_summary"]["total_schemas"] = len(user_schemas)

            # Accumulators shared across schemas; passed to _process_schema for
            # every schema and consumed by the summary steps below.
            all_relationships = []
            table_connections = {}
            all_tables_with_stats = []

            summary = db_info["database_summary"]
            metadata = db_info["execution_metadata"]

            # Process each schema with limits
            for schema in user_schemas:
                logger.info(f"Processing schema: {schema}")
                schema_info = await self._process_schema(
                    schema, all_relationships, table_connections, all_tables_with_stats, max_tables, sampling_mode
                )
                db_info["schemas"][schema] = schema_info

                # Update database totals
                summary["total_tables"] += schema_info["table_count"]
                summary["total_size_bytes"] += schema_info["total_size_bytes"]
                summary["total_rows"] += schema_info["total_rows"]

                # Update metadata (these keys are optional in schema_info)
                metadata["tables_analyzed"] += schema_info.get("tables_analyzed", 0)
                metadata["tables_skipped"] += schema_info.get("tables_skipped", 0)

            # Add human-readable database size
            total_size_gb = summary["total_size_bytes"] / (1024**3)
            summary["total_size_readable"] = f"{total_size_gb:.2f} GB"

            # Add top tables summary (skipped when no table stats were collected)
            if all_tables_with_stats:
                await self._add_top_tables_summary(db_info, all_tables_with_stats)

            # Add security overview
            await self._get_security_overview(db_info)

            # Build relationship summary
            await self._build_relationship_summary(db_info, all_relationships, table_connections, user_schemas)

            # Add schema relationship mapping
            await self._add_schema_relationship_mapping(db_info, user_schemas)

            # Add performance hotspot identification
            await self._identify_performance_hotspots(db_info, all_tables_with_stats)

            # Add execution timing
            execution_time = time.time() - start_time
            metadata["execution_time"] = round(execution_time, 2)
            logger.info(
                f"Database overview complete: {db_info['database_summary']['total_tables']} tables "
                f"across {len(user_schemas)} schemas, {len(all_relationships)} relationships "
                f"in {execution_time:.2f}s"
            )
            return db_info

        except Exception as e:
            logger.error(f"Error generating database overview: {e!s}")
            # Carry the call parameters and elapsed time with the error so the
            # formatted output is not left with an empty metadata line.
            return {
                "error": str(e),
                "execution_metadata": {
                    "max_tables": max_tables,
                    "sampling_mode": sampling_mode,
                    "timeout": self.timeout_seconds,
                    "execution_time": round(time.time() - start_time, 2),
                },
            }
  • Input schema definition via Pydantic Field for the tool parameters.
    async def get_database_overview(
        max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500),
        sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True),
        timeout: int = Field(description="Maximum execution time in seconds", default=300),
    ) -> ResponseType:
  • Helper method to format the database overview result as compact text output.
    def _format_as_text(self, result: dict[str, Any]) -> str:
        """Format database overview result as compact text (no emojis, minimal headers)."""
        if "error" in result:
            return f"Error: {result['error']}\nMeta: {self._format_execution_metadata(result.get('execution_metadata', {}))}"
    
        out: list[str] = []
    
        # Database summary (single line)
        db_summary = result.get("database_summary", {})
        out.append(
            "DB: "
            f"schemas={db_summary.get('total_schemas', 0)} "
            f"tables={db_summary.get('total_tables', 0)} "
            f"size={db_summary.get('total_size_readable', 'N/A')} "
            f"rows={db_summary.get('total_rows', 0)}"
        )
    
        # Performance Overview
        perf_overview = result.get("performance_overview", {})
        if perf_overview:
            out.append(
                "Perf: "
                f"active={perf_overview.get('active_connections', 0)} "
                f"total={perf_overview.get('total_connections', 0)} "
                f"max={perf_overview.get('max_connections', 0)} "
                f"usage={perf_overview.get('connection_usage_percent', 0)}%"
            )
            top_tables = perf_overview.get("top_tables", {})
            if top_tables.get("largest"):
                largest = [f"{t['schema']}.{t['table']} {t['size_readable']}" for t in top_tables["largest"][:3]]
                out.append("Largest: " + "; ".join(largest))
            if top_tables.get("most_active"):
                active = [f"{t['schema']}.{t['table']} scans={t['total_scans']}" for t in top_tables["most_active"][:3]]
                out.append("MostActive: " + "; ".join(active))
    
        # Security Overview
        security_overview = result.get("security_overview", {})
        if security_overview:
            out.append(
                "Security: "
                f"score={security_overview.get('security_score', 0)}/100 "
                f"users={security_overview.get('total_users', 0)} "
                f"su={security_overview.get('superusers', 0)} "
                f"unlim_conn={security_overview.get('unlimited_connections', 0)}"
            )
            security_issues = security_overview.get("security_issues", [])
            if security_issues:
                out.append("SecIssues: " + ", ".join(security_issues))
            recommendations = security_overview.get("recommendations", [])
            if recommendations:
                out.append("SecRecs: " + ", ".join(recommendations))
    
        # Performance Hotspots
        hotspots = result.get("performance_hotspots", {})
        if hotspots and "error" not in hotspots:
            summary = hotspots.get("summary", {})
            out.append(
                f"Hotspots: total={summary.get('total_hotspots', 0)} crit={summary.get('critical_issues', 0)} warn={summary.get('warning_issues', 0)}"
            )
            if hotspots.get("high_scan_ratio_tables"):
                items = [
                    f"{t['qualified_name']} r={t['seq_scan_ratio']}% sc={t['total_scans']} sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                    for t in hotspots["high_scan_ratio_tables"][:5]
                ]
                out.append("HighSeqScan: " + "; ".join(items))
            if hotspots.get("high_dead_tuple_tables"):
                items = [
                    f"{t['qualified_name']} dead={t['dead_tuple_ratio']}% sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                    for t in hotspots["high_dead_tuple_tables"][:5]
                ]
                out.append("HighDeadTuples: " + "; ".join(items))
            if hotspots.get("large_tables_with_issues"):
                items = [
                    f"{t['qualified_name']} sz={t['size_mb']}MB issues=[{', '.join(t.get('issues', []))}] sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                    for t in hotspots["large_tables_with_issues"][:5]
                ]
                out.append("LargeWithIssues: " + "; ".join(items))
            if hotspots.get("high_modification_tables"):
                items = [f"{t['qualified_name']} mods={t['total_modifications']}" for t in hotspots["high_modification_tables"][:5]]
                out.append("HighMod: " + "; ".join(items))
            if hotspots.get("tables_needing_maintenance"):
                items = [
                    f"{t['qualified_name']} rec=[{', '.join(t.get('recommendations', []))}] prio={t.get('priority', 'MEDIUM')}"
                    for t in hotspots["tables_needing_maintenance"][:5]
                ]
                out.append("Maintenance: " + "; ".join(items))
    
        # Relationships Summary
        relationships = result.get("relationships", {})
        if relationships:
            rel_summary = relationships.get("relationship_summary", {})
            out.append(
                "Rel: "
                f"total={rel_summary.get('total_relationships', 0)} "
                f"connected={rel_summary.get('connected_tables', 0)} "
                f"isolated={rel_summary.get('isolated_tables', 0)}"
            )
            most_connected = rel_summary.get("most_connected_tables", [])
            if most_connected:
                out.append("MostConnected: " + "; ".join([f"{t['table']}({t['connections']})" for t in most_connected[:5]]))
            hub_tables = rel_summary.get("hub_tables", [])
            if hub_tables:
                out.append("Hubs: " + "; ".join([f"{t['table']}({t['referenced_by']})" for t in hub_tables[:5]]))
            insights = rel_summary.get("relationship_insights", [])
            if insights:
                out.append("RelInsights: " + "; ".join(insights))
    
        # Schema Details
        schemas = result.get("schemas", {})
        if schemas:
            for schema_name, schema_info in schemas.items():
                line = (
                    f"Schema {schema_name}: "
                    f"tables={schema_info.get('table_count', 0)} "
                    f"size={self._format_bytes(schema_info.get('total_size_bytes', 0))} "
                    f"rows={schema_info.get('total_rows', 0)}"
                )
                if schema_info.get("is_sampled"):
                    line += f" sampled={schema_info.get('tables_analyzed', 0)}/{schema_info.get('table_count', 0)}"
                out.append(line)
    
                tables = schema_info.get("tables", {})
                if tables:
                    top_schema_tables = sorted(
                        [(name, info) for name, info in tables.items() if "size_bytes" in info],
                        key=lambda x: x[1]["size_bytes"],
                        reverse=True,
                    )[:3]
                    if top_schema_tables:
                        tops = [f"{name} {info.get('size_readable', 'N/A')}" for name, info in top_schema_tables]
                        out.append("  Top: " + "; ".join(tops))
    
        # Schema Relationship Mapping
        schema_mapping = result.get("schema_relationship_mapping", {})
        if schema_mapping:
            if "error" in schema_mapping:
                out.append(f"SchemaMapError: {schema_mapping['error']}")
            elif "analysis_text" in schema_mapping:
                out.append("SchemaMap:")
                out.append(schema_mapping["analysis_text"])
    
        # Execution Metadata
        metadata = result.get("execution_metadata", {})
        if metadata:
            out.append("Meta: " + self._format_execution_metadata(metadata))
    
        return "\n".join(out)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cloud-Thinker-AI/postgres-mcp-pro-plus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.