list_tables
List all tables and views in a database or schema using a connection ID.
Instructions
List tables and views for a resolved schema or database.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| connection_id | Yes | ||
| schema | No | ||
| database | No |
Implementation Reference
- sql_query_mcp/app.py:48-56 (registration)MCP tool registration for 'list_tables' — decorates the function with @mcp.tool(), defines input params (connection_id, schema, database), and delegates to metadata.list_tables() via _run_tool.
@mcp.tool() def list_tables( connection_id: str, schema: Optional[str] = None, database: Optional[str] = None, ) -> dict: """List tables and views for a resolved schema or database.""" return _run_tool(lambda: metadata.list_tables(connection_id, schema, database)) - sql_query_mcp/introspection.py:105-147 (handler)MetadataService.list_tables — core handler that resolves namespace, opens connection via registry, applies statement timeout, calls adapter.list_tables(), audits success/failure, and returns the result dict.
def list_tables( self, connection_id: str, schema: Optional[str] = None, database: Optional[str] = None, ) -> Dict[str, object]: started = time.perf_counter() config = None try: config = self._registry.get_connection_config(connection_id) namespace = resolve_namespace(config, schema=schema, database=database) with self._registry.connection_from_config(config) as (conn, adapter): _apply_statement_timeout( adapter, conn, self._settings.statement_timeout_ms ) tables = adapter.list_tables(conn, namespace.value) duration_ms = _elapsed_ms(started) self._audit.log( tool="list_tables", connection_id=connection_id, success=True, duration_ms=duration_ms, row_count=len(tables), extra={"engine": config.engine, namespace.field_name: namespace.value}, ) return { "connection_id": connection_id, "engine": config.engine, namespace.field_name: namespace.value, "tables": tables, } except Exception as exc: duration_ms = _elapsed_ms(started) sanitized = sanitize_error_message(str(exc)) self._audit.log( tool="list_tables", connection_id=connection_id, success=False, duration_ms=duration_ms, error=sanitized, extra=_build_audit_extra(config, schema=schema, database=database), ) raise QueryExecutionError(sanitized) from exc - sql_query_mcp/namespace.py:18-51 (helper)resolve_namespace — helper that determines whether to use 'schema' (postgres) or 'database' (mysql/hive) based on engine config, with validation.
def resolve_namespace( config: ConnectionConfig, *, schema: Optional[str] = None, database: Optional[str] = None, ) -> NamespaceSelection: if schema and database: raise SecurityError("schema 和 database 不能同时传入。") if config.engine == "postgres": if database: raise SecurityError("PostgreSQL 连接不接受 database 参数。") resolved = schema or config.default_schema if not resolved: raise SecurityError("PostgreSQL 连接必须显式传 schema,或在配置中设置 default_schema。") return NamespaceSelection(field_name="schema", value=resolved) if config.engine == "mysql": if schema: raise SecurityError("MySQL 连接不接受 schema 参数。") resolved = database or config.default_database if not resolved: raise SecurityError("MySQL 连接必须显式传 database,或在配置中设置 default_database。") return NamespaceSelection(field_name="database", value=resolved) if config.engine == "hive": if schema: raise SecurityError("Hive 连接不接受 schema 参数。") resolved = database or config.default_database if not resolved: raise SecurityError("Hive 连接必须显式传 database,或在配置中设置 default_database。") return NamespaceSelection(field_name="database", value=resolved) raise SecurityError(f"未知 engine: {config.engine}") - sql_query_mcp/adapters/postgres.py:53-64 (handler)PostgresAdapter.list_tables — executes SQL query on information_schema.tables filtered by schema, returns rows with schema, table_name, table_type.
def list_tables(self, conn: object, schema: str): with conn.cursor() as cur: cur.execute( """ SELECT table_schema AS schema, table_name, table_type FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name """, (schema,), ) return cur.fetchall() - sql_query_mcp/adapters/mysql.py:57-68 (handler)MySQLAdapter.list_tables — executes SQL query on information_schema.tables filtered by database, returns rows with database_name, table_name, table_type.
def list_tables(self, conn: object, database: str): with conn.cursor() as cur: cur.execute( """ SELECT table_schema AS database_name, table_name, table_type FROM information_schema.tables WHERE table_schema = %s ORDER BY table_name """, (database,), ) return cur.fetchall()