list_tables
Retrieve all database tables with options to filter by schema, include views, and show system tables for database exploration.
Instructions
List all tables in the database.
Args:
schema: Filter by schema name (default: all user schemas).
include_views: Include views in results.
include_system: Include system tables.
Returns:
List of tables with schema, name, and type.
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
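| schema | No | Filter by schema name (default: all user schemas). | null |
| include_views | No | Include views in results. | true |
| include_system | No | Include system tables. | false |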
Implementation Reference
# src/cockroachdb_mcp/server.py:151-170 (handler)
# MCP tool handler and registration for 'list_tables'. This is the entry point
# registered with the MCP server that wraps and calls the core implementation.
@mcp.tool()
async def list_tables(
    schema: str | None = None,
    include_views: bool = True,
    include_system: bool = False,
) -> dict[str, Any]:
    """List all tables in the database.

    Args:
        schema: Filter by schema name (default: all user schemas).
        include_views: Include views in results.
        include_system: Include system tables.

    Returns:
        List of tables with schema, name, and type.
    """
    # NOTE(review): the docstring text is kept unchanged on purpose; it matches
    # the tool's published instructions, so it is presumably surfaced to MCP
    # clients by the framework. Confirm before rewording it.
    try:
        # Arguments are forwarded positionally, in signature order.
        payload = await tables.list_tables(schema, include_views, include_system)
    except Exception as exc:
        # Tool boundary: report the failure as a payload instead of raising.
        return {"status": "error", "error": str(exc)}
    return payload
# Core helper function implementing the list_tables tool logic by querying
# CockroachDB's information_schema.tables with appropriate filters for schemas,
# views, and system tables.
async def list_tables(
    schema: str | None = None,
    include_views: bool = True,
    include_system: bool = False,
) -> dict[str, Any]:
    """List tables (and optionally views) from information_schema.tables.

    Args:
        schema: Restrict results to this schema. When given, it takes
            precedence over ``include_system`` (no system-schema exclusion
            is applied).
        include_views: Also return rows whose table_type is 'VIEW'.
        include_system: When no ``schema`` is given, keep rows from the
            crdb_internal, information_schema, pg_catalog and pg_extension
            schemas instead of excluding them.

    Returns:
        On success, a dict with keys ``tables`` (list of dicts with
        ``schema``, ``name``, ``type`` and ``full_name``), ``count``,
        ``database`` and ``schema_filter``. If querying or post-processing
        raises, a dict ``{"status": "error", "error": <message>}``.
    """
    conn = await connection_manager.ensure_connected()

    try:
        # Type filter is built only from fixed literals, so inlining is safe.
        table_types = ["'BASE TABLE'"]
        if include_views:
            table_types.append("'VIEW'")
        type_filter = f"table_type IN ({','.join(table_types)})"

        # Schema filter: the caller-supplied schema name is bound as a query
        # parameter rather than interpolated, so it cannot inject SQL.
        # NOTE(review): assumes the driver uses DB-API "%s" placeholders
        # (psycopg-style), consistent with the async cursor API used below.
        schema_filter = ""
        params: list[str] = []
        if schema:
            schema_filter = "AND table_schema = %s"
            params.append(schema)
        elif not include_system:
            schema_filter = """
                AND table_schema NOT IN (
                    'crdb_internal', 'information_schema', 'pg_catalog', 'pg_extension'
                )
            """

        query = f"""
            SELECT table_schema, table_name, table_type
            FROM information_schema.tables
            WHERE {type_filter} {schema_filter}
            ORDER BY table_schema, table_name
        """

        async with conn.cursor() as cur:
            # Pass None when there is nothing to bind so the query text is
            # sent exactly as written.
            await cur.execute(query, params or None)
            rows = await cur.fetchall()

        tables = []
        for row in rows:
            schema_name = row.get("table_schema", "")

            # Skip schemas rejected by the module's allow-list check.
            if not _is_allowed_schema(schema_name):
                continue

            table_name = row.get("table_name", "")
            tables.append(
                {
                    "schema": schema_name,
                    "name": table_name,
                    "type": "VIEW" if row.get("table_type") == "VIEW" else "TABLE",
                    "full_name": f"{schema_name}.{table_name}",
                }
            )

        return {
            "tables": tables,
            "count": len(tables),
            "database": connection_manager.current_database,
            "schema_filter": schema,
        }
    except Exception as e:
        # Best-effort contract: failures are reported as a payload, not raised.
        return {"status": "error", "error": str(e)}