get_pools
Retrieve a list of Airflow connection pools from the MCP Server Airflow Token, allowing users to view available pools with optional filtering and sorting parameters.
Instructions
List pools
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | ||
| offset | No | ||
| order_by | No |
Input Schema (JSON Schema)
{
"properties": {
"limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Limit"
},
"offset": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"title": "Offset"
},
"order_by": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"title": "Order By"
}
},
"type": "object"
}
Implementation Reference
- src/airflow/pool.py:23-50 (handler)The main execution logic for the 'get_pools' tool. It accepts optional parameters for pagination and sorting, calls the Airflow PoolApi, and returns the list of pools as a text content block.async def get_pools( limit: Optional[int] = None, offset: Optional[int] = None, order_by: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ List pools. Args: limit: The numbers of items to return. offset: The number of items to skip before starting to collect the result set. order_by: The name of the field to order the results by. Prefix a field name with `-` to reverse the sort order. Returns: A list of pools. """ # Build parameters dictionary kwargs: Dict[str, Any] = {} if limit is not None: kwargs["limit"] = limit if offset is not None: kwargs["offset"] = offset if order_by is not None: kwargs["order_by"] = order_by response = pool_api.get_pools(**kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))]
- src/airflow/pool.py:12-20 (registration)The get_all_functions() in this module returns the registration tuple for 'get_pools' (function, name, description, read_only=True), which is later used in src/main.py to add the tool via app.add_tool.def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_pools, "get_pools", "List pools", True), (get_pool, "get_pool", "Get a pool by name", True), (delete_pool, "delete_pool", "Delete a pool", False), (post_pool, "post_pool", "Create a pool", False), (patch_pool, "patch_pool", "Update a pool", False), ]