get_task_runs
Retrieve task runs from Prefect workflows with filtering options for task name, state, tags, and time ranges to monitor and analyze workflow execution.
Instructions
Get a list of task runs with optional filtering.
Args: limit: Maximum number of task runs to return offset: Number of task runs to skip task_name: Filter by task name state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED") state_name: Filter by state name tags: Filter by tags start_time_before: ISO formatted datetime string start_time_after: ISO formatted datetime string
Returns: A list of task runs with their details
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| limit | No | ||
| offset | No | ||
| start_time_after | No | ||
| start_time_before | No | ||
| state_name | No | ||
| state_type | No | ||
| tags | No | ||
| task_name | No |
Implementation Reference
- src/mcp_prefect/task_run.py:30-129 (handler)The get_task_runs tool handler decorated with @mcp.tool, implementing the logic to fetch and filter Prefect task runs using the Prefect client, adding UI links, and returning as TextContent.@mcp.tool async def get_task_runs( limit: Optional[int] = None, offset: Optional[int] = None, task_name: Optional[str] = None, state_type: Optional[str] = None, state_name: Optional[str] = None, tags: Optional[List[str]] = None, start_time_before: Optional[str] = None, start_time_after: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Get a list of task runs with optional filtering. Args: limit: Maximum number of task runs to return offset: Number of task runs to skip task_name: Filter by task name state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED") state_name: Filter by state name tags: Filter by tags start_time_before: ISO formatted datetime string start_time_after: ISO formatted datetime string Returns: A list of task runs with their details """ async with get_client() as client: # Build filter objects task_run_filter = None filter_components = [] if task_name: filter_components.append( TaskRunFilterName(like_=f"%{task_name}%") ) if state_type: filter_components.append( TaskRunFilterState( type=TaskRunFilterStateType(any_=[state_type.upper()]) ) ) if state_name: filter_components.append( TaskRunFilterState( name=TaskRunFilterStateName(any_=[state_name]) ) ) if tags: filter_components.append( TaskRunFilterTags(all_=tags) ) if start_time_after or start_time_before: start_time_filter_args = {} if start_time_after: start_time_filter_args["after_"] = datetime.fromisoformat(start_time_after) if start_time_before: start_time_filter_args["before_"] = datetime.fromisoformat(start_time_before) filter_components.append( TaskRunFilterStartTime(**start_time_filter_args) ) # Combine filters if any exist if filter_components: # Create TaskRunFilter with the components # Note: You may need to adjust this based on how TaskRunFilter combines filters task_run_filter = TaskRunFilter() for component in filter_components: if isinstance(component, TaskRunFilterName): task_run_filter.name = component elif isinstance(component, TaskRunFilterState): task_run_filter.state = component elif isinstance(component, TaskRunFilterTags): task_run_filter.tags = component elif isinstance(component, TaskRunFilterStartTime): task_run_filter.start_time = component task_runs = await client.read_task_runs( task_run_filter=task_run_filter, limit=limit, offset=offset or 0 ) # Add UI links to each task run task_runs_result = { "task_runs": [ { **task_run.model_dump(), "ui_url": get_task_run_url(str(task_run.id)) } for task_run in task_runs ] } return [types.TextContent(type="text", text=str(task_runs_result))]
- src/mcp_prefect/task_run.py:25-27 (helper)Helper function used by get_task_runs to generate UI URLs for task runs.def get_task_run_url(task_run_id: str) -> str: base_url = PREFECT_API_URL.replace("/api", "") return f"{base_url}/task-runs/{task_run_id}"