get_xcom_entries
Retrieve XCom entries from Apache Airflow by specifying DAG ID, run ID, task ID, and optional parameters like key, limit, and offset for targeted data extraction.
Instructions
Get all XCom entries
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| dag_id | Yes | ||
| dag_run_id | Yes | ||
| limit | No | ||
| map_index | No | ||
| offset | No | ||
| task_id | Yes | ||
| xcom_key | No |
Implementation Reference
- src/airflow/xcom.py:19-40 (handler)The async handler function that implements the core logic of the 'get_xcom_entries' tool by querying the Airflow XCom API and returning the results as text content.async def get_xcom_entries( dag_id: str, dag_run_id: str, task_id: str, map_index: Optional[int] = None, xcom_key: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if map_index is not None: kwargs["map_index"] = map_index if xcom_key is not None: kwargs["xcom_key"] = xcom_key if limit is not None: kwargs["limit"] = limit if offset is not None: kwargs["offset"] = offset response = xcom_api.get_xcom_entries(dag_id=dag_id, dag_run_id=dag_run_id, task_id=task_id, **kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))]
- src/airflow/xcom.py:11-16 (registration)The get_all_functions() defines the registration details (function reference, name 'get_xcom_entries', description, read-only flag) used by main.py to register the tool.def get_all_functions() -> list[tuple[Callable, str, str, bool]]: """Return list of (function, name, description, is_read_only) tuples for registration.""" return [ (get_xcom_entries, "get_xcom_entries", "Get all XCom entries", True), (get_xcom_entry, "get_xcom_entry", "Get an XCom entry", True), ]
- src/main.py:95-96 (registration)The tool registration loop in main.py where functions from xcom.get_all_functions() (including get_xcom_entries) are registered with the MCP app using Tool.from_function.for func, name, description, *_ in functions: app.add_tool(Tool.from_function(func, name=name, description=description))
- src/main.py:20-20 (registration)Import of xcom.py's get_all_functions into main.py, aliased as get_xcom_functions, which is mapped to APIType.XCOM.from src.airflow.xcom import get_all_functions as get_xcom_functions