list_schemas
List all visible schemas in a PostgreSQL database by providing a connection ID.
Instructions
List visible schemas for a PostgreSQL connection.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| connection_id | Yes |
Implementation Reference
- sql_query_mcp/app.py:36-40 (registration)The 'list_schemas' tool is registered as an MCP tool via the @mcp.tool() decorator in create_app(). It accepts a connection_id string and delegates to metadata.list_schemas(connection_id).
@mcp.tool() def list_schemas(connection_id: str) -> dict: """List visible schemas for a PostgreSQL connection.""" return _run_tool(lambda: metadata.list_schemas(connection_id)) - sql_query_mcp/introspection.py:28-64 (handler)The MetadataService.list_schemas method is the core handler. It retrieves the connection config, enforces that the engine is 'postgres', opens a connection, calls adapter.list_schemas(conn), and returns the results with audit logging.
def list_schemas(self, connection_id: str) -> Dict[str, object]: started = time.perf_counter() config = None try: config = self._registry.get_connection_config(connection_id) require_engine(config, "postgres", "list_schemas") with self._registry.connection_from_config(config) as (conn, adapter): _apply_statement_timeout( adapter, conn, self._settings.statement_timeout_ms ) schemas = adapter.list_schemas(conn) duration_ms = _elapsed_ms(started) self._audit.log( tool="list_schemas", connection_id=connection_id, success=True, duration_ms=duration_ms, row_count=len(schemas), extra={"engine": config.engine}, ) return { "connection_id": connection_id, "engine": "postgres", "schemas": schemas, } except Exception as exc: duration_ms = _elapsed_ms(started) sanitized = sanitize_error_message(str(exc)) self._audit.log( tool="list_schemas", connection_id=connection_id, success=False, duration_ms=duration_ms, error=sanitized, extra=_build_audit_extra(config), ) raise QueryExecutionError(sanitized) from exc - The PostgresAdapter.list_schemas method executes the actual SQL query against PostgreSQL's information_schema.schemata, filtering out 'information_schema' and pg_* schemas, and returns the list of schema names.
def list_schemas(self, conn: object) -> List[str]: with conn.cursor() as cur: cur.execute( """ SELECT schema_name FROM information_schema.schemata WHERE schema_name NOT IN ('information_schema') AND schema_name NOT LIKE 'pg_%' ORDER BY schema_name """ ) return [row["schema_name"] for row in cur.fetchall()] - sql_query_mcp/namespace.py:54-56 (helper)The require_engine function validates that the connection engine is 'postgres' for list_schemas, raising a SecurityError if not.
def require_engine(config: ConnectionConfig, engine: str, tool_name: str) -> None: if config.engine != engine: raise SecurityError(f"{tool_name} 仅适用于 {engine} 连接,当前连接 engine={config.engine}")