Skip to main content
Glama
flow_run.py8.18 kB
from typing import Any, Dict, List, Optional, Union from uuid import UUID import mcp.types as types from prefect import get_client from prefect.states import Cancelled, Completed, Failed, Pending, Running, Scheduled from .envs import PREFECT_API_URL from .server import mcp def get_flow_run_url(flow_run_id: str) -> str: base_url = PREFECT_API_URL.replace("/api", "") return f"{base_url}/flow-runs/{flow_run_id}" @mcp.tool async def get_flow_runs( limit: Optional[int] = None, offset: Optional[int] = None, flow_name: Optional[str] = None, state_type: Optional[str] = None, state_name: Optional[str] = None, deployment_id: Optional[str] = None, tags: Optional[List[str]] = None, start_time_before: Optional[str] = None, start_time_after: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Get a list of flow runs with optional filtering. Args: limit: Maximum number of flow runs to return offset: Number of flow runs to skip flow_name: Filter by flow name state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED") state_name: Filter by state name deployment_id: Filter by deployment ID tags: Filter by tags start_time_before: ISO formatted datetime string start_time_after: ISO formatted datetime string Returns: A list of flow runs with their details """ async with get_client() as client: # Build filter parameters filters = {} if flow_name: filters["flow_name"] = {"like_": f"%{flow_name}%"} if state_type: filters["state"] = {"type": {"any_": [state_type.upper()]}} if state_name: filters["state"] = {"name": {"any_": [state_name]}} if deployment_id: filters["deployment_id"] = {"eq_": UUID(deployment_id)} if tags: filters["tags"] = {"all_": tags} if start_time_after: filters["start_time"] = {"ge_": start_time_after} if start_time_before: if "start_time" in filters: filters["start_time"]["le_"] = start_time_before else: filters["start_time"] = {"le_": start_time_before} flow_runs = await client.read_flow_runs( limit=limit, offset=offset, **filters ) # Add UI links to each flow run flow_runs_result = { "flow_runs": [ { **flow_run.dict(), "ui_url": get_flow_run_url(str(flow_run.id)) } for flow_run in flow_runs ] } return [types.TextContent(type="text", text=str(flow_runs_result))] @mcp.tool async def get_flow_run( flow_run_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Get details of a specific flow run by ID. Args: flow_run_id: The flow run UUID Returns: Flow run details """ async with get_client() as client: flow_run = await client.read_flow_run(UUID(flow_run_id)) # Add UI link flow_run_dict = flow_run.dict() flow_run_dict["ui_url"] = get_flow_run_url(flow_run_id) return [types.TextContent(type="text", text=str(flow_run_dict))] @mcp.tool async def get_flow_runs_by_flow( flow_id: str, limit: Optional[int] = None, offset: Optional[int] = None, state_type: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Get flow runs for a specific flow. Args: flow_id: The flow UUID limit: Maximum number of flow runs to return offset: Number of flow runs to skip state_type: Filter by state type (e.g., "RUNNING", "COMPLETED", "FAILED") Returns: A list of flow runs for the specified flow """ async with get_client() as client: # Build filter parameters filters = {"flow_id": {"eq_": UUID(flow_id)}} if state_type: filters["state"] = {"type": {"any_": [state_type.upper()]}} flow_runs = await client.read_flow_runs( limit=limit, offset=offset, **filters ) # Add UI links to each flow run flow_runs_result = { "flow_runs": [ { **flow_run.dict(), "ui_url": get_flow_run_url(str(flow_run.id)) } for flow_run in flow_runs ] } return [types.TextContent(type="text", text=str(flow_runs_result))] @mcp.tool async def restart_flow_run( flow_run_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Restart a flow run. Args: flow_run_id: The flow run UUID Returns: Details of the new flow run """ async with get_client() as client: flow_run_id_uuid = UUID(flow_run_id) new_flow_run = await client.create_flow_run_from_flow_run(flow_run_id_uuid) new_flow_run_dict = new_flow_run.dict() new_flow_run_dict["ui_url"] = get_flow_run_url(str(new_flow_run.id)) return [types.TextContent(type="text", text=str(new_flow_run_dict))] @mcp.tool async def cancel_flow_run( flow_run_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Cancel a flow run. Args: flow_run_id: The flow run UUID Returns: Confirmation message """ async with get_client() as client: await client.set_flow_run_state( flow_run_id=UUID(flow_run_id), state=Cancelled(message="Cancelled via MCP") ) return [types.TextContent(type="text", text=f"Flow run '{flow_run_id}' cancelled successfully.")] @mcp.tool async def delete_flow_run( flow_run_id: str, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Delete a flow run. Args: flow_run_id: The flow run UUID Returns: Confirmation message """ async with get_client() as client: await client.delete_flow_run(UUID(flow_run_id)) return [types.TextContent(type="text", text=f"Flow run '{flow_run_id}' deleted successfully.")] @mcp.tool async def set_flow_run_state( flow_run_id: str, state: str, message: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: """ Set a flow run's state. Args: flow_run_id: The flow run UUID state: The new state to set (e.g., "SCHEDULED", "RUNNING", "COMPLETED", "FAILED") message: An optional message explaining the state change Returns: Result of the state change operation """ async with get_client() as client: state_obj = None if state.upper() == "SCHEDULED": state_obj = Scheduled(message=message) elif state.upper() == "RUNNING": state_obj = Running(message=message) elif state.upper() == "COMPLETED": state_obj = Completed(message=message) elif state.upper() == "FAILED": state_obj = Failed(message=message) elif state.upper() == "PENDING": state_obj = Pending(message=message) elif state.upper() == "CANCELLED": state_obj = Cancelled(message=message) else: return [types.TextContent( type="text", text=f"Invalid state '{state}'. Must be one of: SCHEDULED, RUNNING, COMPLETED, FAILED, PENDING, CANCELLED" )] result = await client.set_flow_run_state( flow_run_id=UUID(flow_run_id), state=state_obj ) return [types.TextContent(type="text", text=str(result.dict()))]

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/allen-munsch/mcp-prefect'

If you have feedback or need assistance with the MCP directory API, please join our Discord server