Skip to main content
Glama
Cloud-Thinker-AI

Postgres MCP Pro Plus

get_object_details

Retrieve detailed metadata about PostgreSQL database objects like tables, views, sequences, or extensions to understand structure and properties.

Instructions

Show detailed information about a database object

Input Schema

Table | JSON Schema
| Name | Required | Description | Default |
| --- | --- | --- | --- |
| schema_name | Yes | Schema name | |
| object_name | Yes | Object name | |
| object_type | No | Object type: 'table', 'view', 'sequence', or 'extension' | table |

Implementation Reference

  • The handler function for the 'get_object_details' tool. It queries the database for detailed information on tables, views, sequences, or extensions based on the provided schema, object name, and type. Collects columns, constraints, indexes, etc., and formats the response.
    async def get_object_details(
        schema_name: str = Field(description="Schema name"),
        object_name: str = Field(description="Object name"),
        object_type: str = Field(description="Object type: 'table', 'view', 'sequence', or 'extension'", default="table"),
    ) -> ResponseType:
        """Get detailed information about a database object.

        Depending on ``object_type`` this queries:

        * ``table`` / ``view``: columns (``information_schema.columns``),
          constraints (``information_schema.table_constraints`` joined with
          ``information_schema.key_column_usage``) and indexes (``pg_indexes``).
        * ``sequence``: ``information_schema.sequences``.
        * ``extension``: ``pg_extension`` (looked up by ``object_name`` only;
          ``schema_name`` is not used for extensions).

        Args:
            schema_name: Schema that contains the object.
            object_name: Name of the table, view, sequence or extension.
            object_type: One of ``'table'``, ``'view'``, ``'sequence'`` or
                ``'extension'``.

        Returns:
            The text produced by ``format_object_details_as_text`` wrapped with
            ``format_text_response``, or ``format_error_response`` when the
            object type is unsupported or any exception is raised.
        """
        try:
            sql_driver = await get_sql_driver()

            if object_type in ("table", "view"):
                # Get columns
                col_rows = await SafeSqlDriver.execute_param_query(
                    sql_driver,
                    """
                    SELECT column_name, data_type, is_nullable, column_default
                    FROM information_schema.columns
                    WHERE table_schema = {} AND table_name = {}
                    ORDER BY ordinal_position
                    """,
                    [schema_name, object_name],
                )
                columns = (
                    [
                        {
                            "column": r.cells["column_name"],
                            "data_type": r.cells["data_type"],
                            "is_nullable": r.cells["is_nullable"],
                            "default": r.cells["column_default"],
                        }
                        for r in col_rows
                    ]
                    if col_rows
                    else []
                )

                # Get constraints.
                # PostgreSQL constraint names are only unique per table, not per
                # schema, so the join must also match on table_name; otherwise
                # columns of a same-named constraint on another table in the
                # schema would be attributed to this table's constraint.
                # ORDER BY keeps multi-column keys in their declared order.
                con_rows = await SafeSqlDriver.execute_param_query(
                    sql_driver,
                    """
                    SELECT tc.constraint_name, tc.constraint_type, kcu.column_name
                    FROM information_schema.table_constraints AS tc
                    LEFT JOIN information_schema.key_column_usage AS kcu
                      ON tc.constraint_name = kcu.constraint_name
                     AND tc.table_schema = kcu.table_schema
                     AND tc.table_name = kcu.table_name
                    WHERE tc.table_schema = {} AND tc.table_name = {}
                    ORDER BY tc.constraint_name, kcu.ordinal_position
                    """,
                    [schema_name, object_name],
                )

                # One row per (constraint, column); fold them into one entry
                # per constraint with the list of its columns.
                constraints = {}
                if con_rows:
                    for row in con_rows:
                        cname = row.cells["constraint_name"]
                        ctype = row.cells["constraint_type"]
                        col = row.cells["column_name"]

                        if cname not in constraints:
                            constraints[cname] = {"type": ctype, "columns": []}
                        # The LEFT JOIN yields a NULL column for constraints
                        # with no key_column_usage row; skip those.
                        if col:
                            constraints[cname]["columns"].append(col)

                constraints_list = [{"name": name, **data} for name, data in constraints.items()]

                # Get indexes
                idx_rows = await SafeSqlDriver.execute_param_query(
                    sql_driver,
                    """
                    SELECT indexname, indexdef
                    FROM pg_indexes
                    WHERE schemaname = {} AND tablename = {}
                    """,
                    [schema_name, object_name],
                )

                indexes = [{"name": r.cells["indexname"], "definition": r.cells["indexdef"]} for r in idx_rows] if idx_rows else []

                result = {
                    "basic": {"schema": schema_name, "name": object_name, "type": object_type},
                    "columns": columns,
                    "constraints": constraints_list,
                    "indexes": indexes,
                }

            elif object_type == "sequence":
                rows = await SafeSqlDriver.execute_param_query(
                    sql_driver,
                    """
                    SELECT sequence_schema, sequence_name, data_type, start_value, increment
                    FROM information_schema.sequences
                    WHERE sequence_schema = {} AND sequence_name = {}
                    """,
                    [schema_name, object_name],
                )

                if rows and rows[0]:
                    row = rows[0]
                    result = {
                        "schema": row.cells["sequence_schema"],
                        "name": row.cells["sequence_name"],
                        "data_type": row.cells["data_type"],
                        "start_value": row.cells["start_value"],
                        "increment": row.cells["increment"],
                    }
                else:
                    # Empty dict makes the formatter report "No details found".
                    result = {}

            elif object_type == "extension":
                rows = await SafeSqlDriver.execute_param_query(
                    sql_driver,
                    """
                    SELECT extname, extversion, extrelocatable
                    FROM pg_extension
                    WHERE extname = {}
                    """,
                    [object_name],
                )

                if rows and rows[0]:
                    row = rows[0]
                    result = {
                        "name": row.cells["extname"],
                        "version": row.cells["extversion"],
                        "relocatable": row.cells["extrelocatable"],
                    }
                else:
                    # Empty dict makes the formatter report "No details found".
                    result = {}

            else:
                return format_error_response(f"Unsupported object type: {object_type}")

            return format_text_response(format_object_details_as_text(result, object_type))
        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error(f"Error getting object details: {e}")
            return format_error_response(str(e))
  • The @mcp.tool decorator registers the get_object_details function as an MCP tool.
    @mcp.tool(description="Show detailed information about a database object")
  • Pydantic-based input schema definition using Field for parameters: schema_name, object_name, object_type.
    async def get_object_details(
        schema_name: str = Field(description="Schema name"),
        object_name: str = Field(description="Object name"),
        object_type: str = Field(description="Object type: 'table', 'view', 'sequence', or 'extension'", default="table"),
    ) -> ResponseType:
  • Helper function to format the retrieved object details into a compact text string for the response.
    def format_object_details_as_text(details: dict, object_type: str) -> str:
        """Render an object-details dict as compact, emoji-free text.

        Args:
            details: Details for the object. Tables/views use the keys
                ``basic``, ``columns``, ``constraints`` and ``indexes``;
                sequences use ``schema``, ``name``, ``data_type``,
                ``start_value`` and ``increment``; extensions use ``name``,
                ``version`` and ``relocatable``.
            object_type: ``'table'``, ``'view'``, ``'sequence'`` or
                ``'extension'``.

        Returns:
            A newline-separated summary, a "No details found" message when
            ``details`` is empty, or an empty string for any other object type.
        """
        if not details:
            return f"No details found for {object_type}."

        if object_type == "sequence":
            return (
                "Sequence: "
                f"{details.get('schema', 'N/A')}.{details.get('name', 'N/A')} "
                f"type={details.get('data_type', 'N/A')} start={details.get('start_value', 'N/A')} inc={details.get('increment', 'N/A')}"
            )

        if object_type == "extension":
            return f"Extension: name={details.get('name', 'N/A')} v={details.get('version', 'N/A')} reloc={details.get('relocatable', 'N/A')}"

        if object_type not in ("table", "view"):
            # Unrecognised types produce no output lines.
            return ""

        def _column_text(entry):
            # "<name> <type> NULL|NOTNULL", plus the default when one is set.
            null_flag = "NULL" if entry.get("is_nullable") == "YES" else "NOTNULL"
            default_expr = entry.get("default")
            text = f"{entry['column']} {entry['data_type']} {null_flag}"
            return f"{text} def={default_expr}" if default_expr else text

        def _constraint_text(entry):
            # "<name>(<type>) on[col1,col2]"
            joined = ",".join(entry.get("columns", []))
            return f"{entry['name']}({entry['type']}) on[{joined}]"

        header = details.get("basic", {})
        lines = [
            f"{object_type.capitalize()}: {header.get('schema', 'N/A')}.{header.get('name', 'N/A')} type={header.get('type', 'N/A')}"
        ]

        # Each section is emitted only when it has at least one entry.
        column_entries = details.get("columns", [])
        if column_entries:
            rendered_columns = "; ".join([_column_text(entry) for entry in column_entries])
            lines.append(f"Columns({len(column_entries)}): " + rendered_columns)

        constraint_entries = details.get("constraints", [])
        if constraint_entries:
            rendered_constraints = "; ".join([_constraint_text(entry) for entry in constraint_entries])
            lines.append(f"Constraints({len(constraint_entries)}): " + rendered_constraints)

        index_entries = details.get("indexes", [])
        if index_entries:
            rendered_indexes = "; ".join([f"{entry['name']} def={entry['definition']}" for entry in index_entries])
            lines.append(f"Indexes({len(index_entries)}): " + rendered_indexes)

        return "\n".join(lines)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Cloud-Thinker-AI/postgres-mcp-pro-plus'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.