list_datasets
Retrieve and filter datasets (topics and data sources) from Lenses MCP Server environments with pagination, search, and sorting options.
Instructions
Retrieves a paginated list of datasets (topics and other data sources).
Args:
- environment: The environment name.
- page: Page number (default: 1).
- page_size: Items per page (default: 25).
- search: Search keyword for dataset, fields and description.
- connections: List of connection names to filter by.
- tags: List of tag names to filter by.
- sort_field: Field to sort results by.
- sort_order: Sorting order, "asc" or "desc" (default: "asc").
- include_system: Include system entities (default: False).
- search_fields: Search field names/documentation (default: True).
- schema_format: Schema format filter for SchemaRegistrySubject.
- has_records: Filter based on whether dataset has records.
- is_compacted: Filter based on compacted status (Kafka only).
Returns: Paginated list of datasets with source types.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| environment | Yes | The environment name. | |
| page | No | Page number. | 1 |
| page_size | No | Items per page. | 25 |
| search | No | Search keyword for dataset, fields and description. | |
| connections | No | List of connection names to filter by. | |
| tags | No | List of tag names to filter by. | |
| sort_field | No | Field to sort results by. | |
| sort_order | No | Sorting order, "asc" or "desc". | asc |
| include_system | No | Include system entities. | False |
| search_fields | No | Search field names/documentation. | True |
| schema_format | No | Schema format filter for SchemaRegistrySubject. | |
| has_records | No | Filter based on whether dataset has records. | |
| is_compacted | No | Filter based on compacted status (Kafka only). | |
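As a hedged illustration of how these parameters reach the backing API: the handler renames them to camelCase query keys (`page_size` becomes `pageSize`, and so on) and repeats list-valued filters once per item. The parameter values below are invented for the example; the key names and joining logic mirror the handler shown under Implementation Reference.

```python
# Hypothetical sketch: how a set of list_datasets arguments maps to the
# proxied query string. List values are repeated; names are camelCased.
params = {
    "page": 2,
    "pageSize": 50,
    "sortOrder": "desc",
    "tags": ["pii", "finance"],
}

query_parts = []
for key, value in params.items():
    if isinstance(value, list):
        # Each list item becomes its own key=value pair.
        query_parts.extend(f"{key}={item}" for item in value)
    else:
        query_parts.append(f"{key}={value}")

query = "&".join(query_parts)
print(query)
# page=2&pageSize=50&sortOrder=desc&tags=pii&tags=finance
```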
Implementation Reference
- `src/lenses_mcp/tools/topics.py:310-382` (handler)

  The core handler function for the `list_datasets` tool. It is decorated with `@mcp.tool()`, which registers it with the MCP server when `register_topics(mcp)` is called. The function builds a query string from the parameters and makes a GET request to the API endpoint to retrieve paginated datasets.

  ```python
  @mcp.tool()
  async def list_datasets(
      environment: str,
      page: int = 1,
      page_size: int = 25,
      search: Optional[str] = None,
      connections: Optional[List[str]] = None,
      tags: Optional[List[str]] = None,
      sort_field: Optional[str] = None,
      sort_order: str = "asc",
      include_system: bool = False,
      search_fields: bool = True,
      schema_format: Optional[str] = None,
      has_records: Optional[bool] = None,
      is_compacted: Optional[bool] = None
  ) -> Dict[str, Any]:
      """
      Retrieves a paginated list of datasets (topics and other data sources).

      Args:
          environment: The environment name.
          page: Page number (default: 1).
          page_size: Items per page (default: 25).
          search: Search keyword for dataset, fields and description.
          connections: List of connection names to filter by.
          tags: List of tag names to filter by.
          sort_field: Field to sort results by.
          sort_order: Sorting order - "asc" or "desc" (default: "asc").
          include_system: Include system entities (default: False).
          search_fields: Search field names/documentation (default: True).
          schema_format: Schema format filter for SchemaRegistrySubject.
          has_records: Filter based on whether dataset has records.
          is_compacted: Filter based on compacted status (Kafka only).

      Returns:
          Paginated list of datasets with source types.
      """
      params = {
          "page": page,
          "pageSize": page_size,
          "sortOrder": sort_order,
          "includeSystemEntities": include_system,
          "searchFields": search_fields
      }
      if search:
          params["search"] = search
      if connections:
          params["connections"] = connections
      if tags:
          params["tags"] = tags
      if sort_field:
          params["sortField"] = sort_field
      if schema_format:
          params["schemaFormat"] = schema_format
      if has_records is not None:
          params["hasRecords"] = has_records
      if is_compacted is not None:
          params["isCompacted"] = is_compacted

      # Build query string
      query_params = []
      for key, value in params.items():
          if isinstance(value, list):
              for item in value:
                  query_params.append(f"{key}={item}")
          else:
              query_params.append(f"{key}={value}")
      query_string = "&".join(query_params)

      endpoint = f"/api/v1/environments/{environment}/proxy/api/v1/datasets?{query_string}"
      return await api_client._make_request("GET", endpoint)
  ```