Skip to main content
Glama
cloudthinker-ai

Postgres MCP Pro Plus

get_database_overview

Analyze PostgreSQL database structure, performance metrics, and security configurations to identify optimization opportunities and potential issues.

Instructions

Get comprehensive database overview with performance and security analysis

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
max_tablesNoMaximum number of tables to analyze per schema
sampling_modeNoUse statistical sampling for large datasets
timeoutNoMaximum execution time in seconds

Implementation Reference

  • MCP tool registration and thin wrapper handler for get_database_overview that instantiates DatabaseOverviewTool and delegates to it.
    @mcp.tool(description="Get comprehensive database overview with performance and security analysis")
    async def get_database_overview(
        max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500),
        sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True),
        timeout: int = Field(description="Maximum execution time in seconds", default=300),
    ) -> ResponseType:
        """Get comprehensive database overview including schemas, tables, relationships, performance metrics, and security analysis."""
        try:
            sql_driver = await get_sql_driver()
            overview_tool = DatabaseOverviewTool(sql_driver)
            result = await overview_tool.get_database_overview(max_tables, sampling_mode, timeout)
            return format_text_response(result)
        except Exception as e:
            logger.error(f"Error getting database overview: {e}")
            return format_error_response(str(e))
  • Main handler method in DatabaseOverviewTool class that handles timeout, error handling, and formatting for get_database_overview.
    async def get_database_overview(self, max_tables: int = 500, sampling_mode: bool = True, timeout: int = 300):
        """Get comprehensive database overview with performance and security analysis.
    
        Args:
            max_tables: Maximum number of tables to analyze per schema (default: 500)
            sampling_mode: Use statistical sampling for large datasets (default: True)
            timeout: Maximum execution time in seconds (default: 300)
        """
        start_time = time.time()
        try:
            # Add timeout wrapper
            result = await asyncio.wait_for(self._get_database_overview_internal(max_tables, sampling_mode, start_time), timeout=timeout)
            return self._format_as_text(result)
        except asyncio.TimeoutError:
            logger.warning(f"Database overview timed out after {timeout} seconds")
            error_result = {
                "error": f"Operation timed out after {timeout} seconds",
                "execution_metadata": {
                    "max_tables": max_tables,
                    "sampling_mode": sampling_mode,
                    "timeout": timeout,
                    "execution_time": time.time() - start_time,
                },
            }
            return self._format_as_text(error_result)
        except Exception as e:
            logger.error(f"Error generating database overview: {e!s}")
            error_result = {
                "error": str(e),
                "execution_metadata": {
                    "max_tables": max_tables,
                    "sampling_mode": sampling_mode,
                    "timeout": timeout,
                    "execution_time": time.time() - start_time,
                },
            }
            return self._format_as_text(error_result)
  • Core internal handler that orchestrates gathering of database summary, performance, security, relationships, and hotspots.
    async def _get_database_overview_internal(self, max_tables: int, sampling_mode: bool, start_time: float) -> dict[str, Any]:
        """Internal implementation of database overview."""
        try:
            db_info = {
                "schemas": {},
                "database_summary": {
                    "total_schemas": 0,
                    "total_tables": 0,
                    "total_size_bytes": 0,
                    "total_rows": 0,
                },
                "performance_overview": {},
                "security_overview": {},
                "relationships": {"foreign_keys": [], "relationship_summary": {}},
                "execution_metadata": {
                    "max_tables": max_tables,
                    "sampling_mode": sampling_mode,
                    "timeout": self.timeout_seconds,
                    "tables_analyzed": 0,
                    "tables_skipped": 0,
                },
            }
    
            # Get database-wide performance metrics
            await self._get_performance_metrics(db_info)
    
            # Get schema information
            user_schemas = await self._get_user_schemas()
            db_info["database_summary"]["total_schemas"] = len(user_schemas)
    
            # Track relationships and table stats
            all_relationships = []
            table_connections = {}
            all_tables_with_stats = []
    
            # Process each schema with limits
            for schema in user_schemas:
                logger.info(f"Processing schema: {schema}")
                schema_info = await self._process_schema(
                    schema, all_relationships, table_connections, all_tables_with_stats, max_tables, sampling_mode
                )
                db_info["schemas"][schema] = schema_info
    
                # Update database totals
                db_info["database_summary"]["total_tables"] += schema_info["table_count"]
                db_info["database_summary"]["total_size_bytes"] += schema_info["total_size_bytes"]
                db_info["database_summary"]["total_rows"] += schema_info["total_rows"]
    
                # Update metadata
                db_info["execution_metadata"]["tables_analyzed"] += schema_info.get("tables_analyzed", 0)
                db_info["execution_metadata"]["tables_skipped"] += schema_info.get("tables_skipped", 0)
    
            # Add human-readable database size
            total_size_gb = db_info["database_summary"]["total_size_bytes"] / (1024**3)
            db_info["database_summary"]["total_size_readable"] = f"{total_size_gb:.2f} GB"
    
            # Add top tables summary
            if all_tables_with_stats:
                await self._add_top_tables_summary(db_info, all_tables_with_stats)
    
            # Add security overview
            await self._get_security_overview(db_info)
    
            # Build relationship summary
            await self._build_relationship_summary(db_info, all_relationships, table_connections, user_schemas)
    
            # Add schema relationship mapping
            await self._add_schema_relationship_mapping(db_info, user_schemas)
    
            # Add performance hotspot identification
            await self._identify_performance_hotspots(db_info, all_tables_with_stats)
    
            # Add execution timing
            execution_time = time.time() - start_time
            db_info["execution_metadata"]["execution_time"] = round(execution_time, 2)
            logger.info(
                f"Database overview complete: {db_info['database_summary']['total_tables']} tables "
                f"across {len(user_schemas)} schemas, {len(all_relationships)} relationships "
                f"in {execution_time:.2f}s"
            )
            return db_info
    
        except Exception as e:
            logger.error(f"Error generating database overview: {e!s}")
            return {"error": str(e)}
  • Input schema definition via Pydantic Field for the tool parameters.
    async def get_database_overview(
        max_tables: int = Field(description="Maximum number of tables to analyze per schema", default=500),
        sampling_mode: bool = Field(description="Use statistical sampling for large datasets", default=True),
        timeout: int = Field(description="Maximum execution time in seconds", default=300),
    ) -> ResponseType:
  • Helper method to format the database overview result as compact text output.
    def _format_as_text(self, result: dict[str, Any]) -> str:
        """Format database overview result as compact text (no emojis, minimal headers)."""
        if "error" in result:
            return f"Error: {result['error']}\nMeta: {self._format_execution_metadata(result.get('execution_metadata', {}))}"
    
        out: list[str] = []
    
        # Database summary (single line)
        db_summary = result.get("database_summary", {})
        out.append(
            "DB: "
            f"schemas={db_summary.get('total_schemas', 0)} "
            f"tables={db_summary.get('total_tables', 0)} "
            f"size={db_summary.get('total_size_readable', 'N/A')} "
            f"rows={db_summary.get('total_rows', 0)}"
        )
    
        # Performance Overview
        perf_overview = result.get("performance_overview", {})
        if perf_overview:
            out.append(
                "Perf: "
                f"active={perf_overview.get('active_connections', 0)} "
                f"total={perf_overview.get('total_connections', 0)} "
                f"max={perf_overview.get('max_connections', 0)} "
                f"usage={perf_overview.get('connection_usage_percent', 0)}%"
            )
            top_tables = perf_overview.get("top_tables", {})
            if top_tables.get("largest"):
                largest = [f"{t['schema']}.{t['table']} {t['size_readable']}" for t in top_tables["largest"][:3]]
                out.append("Largest: " + "; ".join(largest))
            if top_tables.get("most_active"):
                active = [f"{t['schema']}.{t['table']} scans={t['total_scans']}" for t in top_tables["most_active"][:3]]
                out.append("MostActive: " + "; ".join(active))
    
        # Security Overview
        security_overview = result.get("security_overview", {})
        if security_overview:
            out.append(
                "Security: "
                f"score={security_overview.get('security_score', 0)}/100 "
                f"users={security_overview.get('total_users', 0)} "
                f"su={security_overview.get('superusers', 0)} "
                f"unlim_conn={security_overview.get('unlimited_connections', 0)}"
            )
            security_issues = security_overview.get("security_issues", [])
            if security_issues:
                out.append("SecIssues: " + ", ".join(security_issues))
            recommendations = security_overview.get("recommendations", [])
            if recommendations:
                out.append("SecRecs: " + ", ".join(recommendations))
    
        # Performance Hotspots
        hotspots = result.get("performance_hotspots", {})
        if hotspots and "error" not in hotspots:
            summary = hotspots.get("summary", {})
            out.append(
                f"Hotspots: total={summary.get('total_hotspots', 0)} crit={summary.get('critical_issues', 0)} warn={summary.get('warning_issues', 0)}"
            )
            if hotspots.get("high_scan_ratio_tables"):
                items = [
                    f"{t['qualified_name']} r={t['seq_scan_ratio']}% sc={t['total_scans']} sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                    for t in hotspots["high_scan_ratio_tables"][:5]
                ]
                out.append("HighSeqScan: " + "; ".join(items))
            if hotspots.get("high_dead_tuple_tables"):
                items = [
                    f"{t['qualified_name']} dead={t['dead_tuple_ratio']}% sz={t['size_mb']}MB sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                    for t in hotspots["high_dead_tuple_tables"][:5]
                ]
                out.append("HighDeadTuples: " + "; ".join(items))
            if hotspots.get("large_tables_with_issues"):
                items = [
                    f"{t['qualified_name']} sz={t['size_mb']}MB issues=[{', '.join(t.get('issues', []))}] sev={'H' if t['severity'] == 'HIGH' else 'M'}"
                    for t in hotspots["large_tables_with_issues"][:5]
                ]
                out.append("LargeWithIssues: " + "; ".join(items))
            if hotspots.get("high_modification_tables"):
                items = [f"{t['qualified_name']} mods={t['total_modifications']}" for t in hotspots["high_modification_tables"][:5]]
                out.append("HighMod: " + "; ".join(items))
            if hotspots.get("tables_needing_maintenance"):
                items = [
                    f"{t['qualified_name']} rec=[{', '.join(t.get('recommendations', []))}] prio={t.get('priority', 'MEDIUM')}"
                    for t in hotspots["tables_needing_maintenance"][:5]
                ]
                out.append("Maintenance: " + "; ".join(items))
    
        # Relationships Summary
        relationships = result.get("relationships", {})
        if relationships:
            rel_summary = relationships.get("relationship_summary", {})
            out.append(
                "Rel: "
                f"total={rel_summary.get('total_relationships', 0)} "
                f"connected={rel_summary.get('connected_tables', 0)} "
                f"isolated={rel_summary.get('isolated_tables', 0)}"
            )
            most_connected = rel_summary.get("most_connected_tables", [])
            if most_connected:
                out.append("MostConnected: " + "; ".join([f"{t['table']}({t['connections']})" for t in most_connected[:5]]))
            hub_tables = rel_summary.get("hub_tables", [])
            if hub_tables:
                out.append("Hubs: " + "; ".join([f"{t['table']}({t['referenced_by']})" for t in hub_tables[:5]]))
            insights = rel_summary.get("relationship_insights", [])
            if insights:
                out.append("RelInsights: " + "; ".join(insights))
    
        # Schema Details
        schemas = result.get("schemas", {})
        if schemas:
            for schema_name, schema_info in schemas.items():
                line = (
                    f"Schema {schema_name}: "
                    f"tables={schema_info.get('table_count', 0)} "
                    f"size={self._format_bytes(schema_info.get('total_size_bytes', 0))} "
                    f"rows={schema_info.get('total_rows', 0)}"
                )
                if schema_info.get("is_sampled"):
                    line += f" sampled={schema_info.get('tables_analyzed', 0)}/{schema_info.get('table_count', 0)}"
                out.append(line)
    
                tables = schema_info.get("tables", {})
                if tables:
                    top_schema_tables = sorted(
                        [(name, info) for name, info in tables.items() if "size_bytes" in info],
                        key=lambda x: x[1]["size_bytes"],
                        reverse=True,
                    )[:3]
                    if top_schema_tables:
                        tops = [f"{name} {info.get('size_readable', 'N/A')}" for name, info in top_schema_tables]
                        out.append("  Top: " + "; ".join(tops))
    
        # Schema Relationship Mapping
        schema_mapping = result.get("schema_relationship_mapping", {})
        if schema_mapping:
            if "error" in schema_mapping:
                out.append(f"SchemaMapError: {schema_mapping['error']}")
            elif "analysis_text" in schema_mapping:
                out.append("SchemaMap:")
                out.append(schema_mapping["analysis_text"])
    
        # Execution Metadata
        metadata = result.get("execution_metadata", {})
        if metadata:
            out.append("Meta: " + self._format_execution_metadata(metadata))
    
        return "\n".join(out)
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

No annotations are provided, so the description carries the full burden of behavioral disclosure. While 'Get' implies a read operation, the description doesn't address critical behavioral aspects like whether this is a heavy operation (given performance analysis), whether it requires specific permissions, potential impact on database performance, or what the output format looks like. The mention of 'comprehensive' analysis hints at scope but lacks operational details.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that clearly communicates the core purpose. Every word earns its place - 'comprehensive' sets scope, 'database overview' specifies the resource, and 'performance and security analysis' clarifies the analysis dimensions. There's no wasted verbiage or redundancy.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a tool that performs 'comprehensive database overview' with performance and security analysis, the description is insufficient. There's no output schema, and with no annotations, the description doesn't address what information is returned, how extensive the analysis is, whether this is a resource-intensive operation, or how it differs from similar analysis tools. The agent lacks critical context for proper tool invocation and result interpretation.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The input schema has 100% description coverage, providing clear documentation for all three parameters (max_tables, sampling_mode, timeout). The description doesn't add any meaningful parameter semantics beyond what's already in the schema - it doesn't explain how these parameters affect the 'comprehensive overview' or their practical implications. This meets the baseline for high schema coverage.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose: 'Get comprehensive database overview with performance and security analysis'. It specifies the verb ('Get') and resource ('database overview') with additional scope ('performance and security analysis'). However, it doesn't explicitly differentiate from sibling tools like 'analyze_db_health' or 'get_object_details', which prevents a perfect score.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives. With multiple sibling tools focused on database analysis (e.g., analyze_db_health, analyze_query_indexes), there's no indication of what makes this tool distinct or when it should be preferred over others. This leaves the agent without context for tool selection.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cloudthinker-ai/postgres-mcp-pro-plus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server