Skip to main content
Glama
by fortunto2
server.py15.5 kB
#!/usr/bin/env python """ Prefect MCP Server (using FastMCP) -------------------------------- MCP server integrating with the Prefect API for managing workflows, using FastMCP from the 'mcp' package and official prefect-client. """ import os import sys from typing import Dict, Any, Optional, List, Union from contextlib import asynccontextmanager from collections.abc import AsyncIterator from uuid import UUID from prefect.client.orchestration import get_client from prefect.client.schemas.filters import FlowFilter, FlowRunFilter, DeploymentFilter, FlowFilterName from mcp.server.fastmcp import FastMCP, Context # Prefect API Settings PREFECT_API_URL = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") PREFECT_API_KEY = os.environ.get("PREFECT_API_KEY", "") # --- API Client Lifespan Management --- @asynccontextmanager async def prefect_api_lifespan( server: FastMCP, ) -> AsyncIterator[Dict[str, Any]]: """Async context manager to initialize and clean up the Prefect API client.""" print("Initializing Prefect API Client for MCP server...", file=sys.stderr) # Set environment variables for the client if provided if PREFECT_API_URL: os.environ["PREFECT_API_URL"] = PREFECT_API_URL if PREFECT_API_KEY: os.environ["PREFECT_API_KEY"] = PREFECT_API_KEY # No need to pre-initialize the client, we will create it in each tool yield {} # --- MCP Server Definition with FastMCP --- mcp = FastMCP( name="prefect", # Server name lifespan=prefect_api_lifespan, # Specify the context manager ) # ------------------------------------------------------------------------ # Flow CRUD Operations # ------------------------------------------------------------------------ @mcp.tool() async def get_flow_by_id(ctx: Context, flow_id: str) -> Dict[str, Any]: """Get a flow by its ID. Args: flow_id: ID of the flow to retrieve. """ if not flow_id: return {"error": "Missing required argument: flow_id"} async with get_client() as client: try: flow = await client.read_flow(UUID(flow_id)) return {"flow": flow.model_dump()} except Exception as e: return {"error": f"Failed to get flow: {str(e)}"} @mcp.tool() async def get_flow_by_name(ctx: Context, name: str) -> Dict[str, Any]: """Get a flow by its name. Args: name: Name of the flow to retrieve. """ if not name: return {"error": "Missing required argument: name"} async with get_client() as client: try: # Use correct flow_filter parameter flow_filter = FlowFilter(name=FlowFilterName(any_=[name])) flows = await client.read_flows(flow_filter=flow_filter) if not flows: return {"error": f"No flow found with name: {name}"} # Return the first matching flow return {"flow": flows[0].model_dump()} except Exception as e: return {"error": f"Failed to get flow: {str(e)}"} @mcp.tool() async def list_flows(ctx: Context, limit: int = 20, offset: int = 0) -> Dict[str, Any]: """Get a list of flows from the Prefect API. Args: limit: Maximum number of flows to return (default 20). offset: Number of flows to skip (default 0). """ async with get_client() as client: flows = await client.read_flows(limit=limit, offset=offset) return {"flows": [flow.model_dump() for flow in flows], "count": len(flows)} @mcp.tool() async def search_flows( ctx: Context, name: Optional[str] = None, tags: Optional[List[str]] = None, limit: int = 20, ) -> Dict[str, Any]: """Search for flows by name and/or tags. Args: name: Optional name to search for (case-insensitive contains match). tags: Optional list of tags to filter by. limit: Maximum number of flows to return (default 20). """ filter_dict = {} if name: filter_dict["name"] = {"contains": name} if tags: filter_dict["tags"] = {"all_": tags} async with get_client() as client: flow_filter = FlowFilter(**filter_dict) flows = await client.read_flows(flow_filter=flow_filter, limit=limit) return {"flows": [flow.model_dump() for flow in flows], "count": len(flows)} # ------------------------------------------------------------------------ # Flow Run CRUD Operations # ------------------------------------------------------------------------ @mcp.tool() async def get_flow_run_by_id(ctx: Context, flow_run_id: str) -> Dict[str, Any]: """Get a flow run by its ID. Args: flow_run_id: ID of the flow run to retrieve. """ if not flow_run_id: return {"error": "Missing required argument: flow_run_id"} async with get_client() as client: try: flow_run = await client.read_flow_run(UUID(flow_run_id)) return {"flow_run": flow_run.model_dump()} except Exception as e: return {"error": f"Failed to get flow run: {str(e)}"} @mcp.tool() async def list_flow_runs( ctx: Context, limit: int = 20, offset: int = 0, flow_id: Optional[str] = None ) -> Dict[str, Any]: """Get a list of flow runs from the Prefect API. Args: limit: Maximum number of flow runs to return (default 20). offset: Number of flow runs to skip (default 0). flow_id: Optional ID of the flow to filter runs by. """ filter_dict = {} if flow_id: filter_dict["flow_id"] = {"equals": flow_id} async with get_client() as client: flow_run_filter = FlowRunFilter(**filter_dict) if filter_dict else None flow_runs = await client.read_flow_runs( flow_run_filter=flow_run_filter, limit=limit, offset=offset ) return { "flow_runs": [run.model_dump() for run in flow_runs], "count": len(flow_runs), } @mcp.tool() async def search_flow_runs_by_state( ctx: Context, state_type: Optional[str] = None, state_name: Optional[str] = None, limit: int = 20, ) -> Dict[str, Any]: """Search for flow runs by state. Args: state_type: Optional state type (e.g., "COMPLETED", "FAILED", "CRASHED"). state_name: Optional state name (e.g., "Completed", "Failed"). limit: Maximum number of flow runs to return (default 20). """ filter_dict = {} if state_type: filter_dict["state"] = {"type": {"equals": state_type}} if state_name: filter_dict["state"] = {"name": {"equals": state_name}} async with get_client() as client: flow_run_filter = FlowRunFilter(**filter_dict) if filter_dict else None flow_runs = await client.read_flow_runs( flow_run_filter=flow_run_filter, limit=limit ) return { "flow_runs": [run.model_dump() for run in flow_runs], "count": len(flow_runs), } @mcp.tool() async def cancel_flow_run(ctx: Context, flow_run_id: str) -> Dict[str, Any]: """Cancel a flow run. Args: flow_run_id: ID of the flow run to cancel. """ if not flow_run_id: return {"error": "Missing required argument: flow_run_id"} async with get_client() as client: try: result = await client.cancel_flow_run(UUID(flow_run_id)) return {"success": True, "result": str(result)} except Exception as e: return {"error": f"Failed to cancel flow run: {str(e)}"} # ------------------------------------------------------------------------ # Deployment CRUD Operations # ------------------------------------------------------------------------ @mcp.tool() async def get_deployment_by_id(ctx: Context, deployment_id: str) -> Dict[str, Any]: """Get a deployment by its ID. Args: deployment_id: ID of the deployment to retrieve. """ if not deployment_id: return {"error": "Missing required argument: deployment_id"} async with get_client() as client: try: deployment = await client.read_deployment(UUID(deployment_id)) return {"deployment": deployment.model_dump()} except Exception as e: return {"error": f"Failed to get deployment: {str(e)}"} @mcp.tool() async def get_deployment_by_name(ctx: Context, name: str) -> Dict[str, Any]: """Get a deployment by its name. Args: name: Name of the deployment to retrieve, in format "flow_name/deployment_name". """ if not name: return {"error": "Missing required argument: name"} if "/" not in name: return {"error": "Name should be in format 'flow_name/deployment_name'"} async with get_client() as client: try: flow_name, deployment_name = name.split("/", 1) # Use filters to find deployment by name deployment_filter = DeploymentFilter( name={"equals": deployment_name}, flow_name={"equals": flow_name} ) deployments = await client.read_deployments( deployment_filter=deployment_filter ) if not deployments: return {"error": f"No deployment found with name: {name}"} # Return the first matching deployment return {"deployment": deployments[0].model_dump()} except Exception as e: return {"error": f"Failed to get deployment: {str(e)}"} @mcp.tool() async def list_deployments( ctx: Context, limit: int = 20, offset: int = 0, flow_id: Optional[str] = None ) -> Dict[str, Any]: """Get a list of deployments from the Prefect API. Args: limit: Maximum number of deployments to return (default 20). offset: Number of deployments to skip (default 0). flow_id: Optional ID of the flow to filter deployments by. """ filter_dict = {} if flow_id: filter_dict["flow_id"] = {"equals": flow_id} async with get_client() as client: deployment_filter = DeploymentFilter(**filter_dict) if filter_dict else None deployments = await client.read_deployments( deployment_filter=deployment_filter, limit=limit, offset=offset ) return { "deployments": [depl.model_dump() for depl in deployments], "count": len(deployments), } @mcp.tool() async def search_deployments_by_status( ctx: Context, is_schedule_active: Optional[bool] = None, limit: int = 20 ) -> Dict[str, Any]: """Search for deployments by schedule status. Args: is_schedule_active: Filter deployments by whether their schedule is active. limit: Maximum number of deployments to return (default 20). """ filter_dict = {} if is_schedule_active is not None: filter_dict["is_schedule_active"] = {"equals": is_schedule_active} async with get_client() as client: deployment_filter = DeploymentFilter(**filter_dict) if filter_dict else None deployments = await client.read_deployments( deployment_filter=deployment_filter, limit=limit ) return { "deployments": [depl.model_dump() for depl in deployments], "count": len(deployments), } @mcp.tool() async def create_flow_run_from_deployment( ctx: Context, deployment_id: str, parameters: Optional[Dict[str, Any]] = None, name: Optional[str] = None, timeout: int = 0, ) -> Dict[str, Any]: """Create a new flow run for the specified deployment. Args: deployment_id: ID of the deployment or name in format 'flow_name/deployment_name'. parameters: Dictionary with parameters for the flow run (optional). name: Optional name for the flow run. timeout: Timeout in seconds, 0 means no waiting for completion (default 0). """ if not deployment_id: return {"error": "Missing required argument: deployment_id"} from prefect.deployments import run_deployment try: # Создаем flow run с помощью функции run_deployment result = await run_deployment( name=deployment_id, # В документации это "name", а не "deployment_id" parameters=parameters or {}, timeout=timeout, flow_run_name=name, ) return {"flow_run_id": str(result)} except Exception as e: return {"error": f"Failed to create flow run: {str(e)}"} # ------------------------------------------------------------------------ # Legacy Support / Backwards Compatibility # ------------------------------------------------------------------------ @mcp.tool() async def filter_flows(ctx: Context, filter_criteria: Dict[str, Any]) -> Dict[str, Any]: """Filter flows based on specified criteria. Args: filter_criteria: Dictionary with filter criteria according to Prefect API. Example: {"flows": {"tags": {"all_": ["production"]}}} """ async with get_client() as client: flow_filter = FlowFilter(**filter_criteria) flows = await client.read_flows(flow_filter=flow_filter) return {"flows": [flow.model_dump() for flow in flows]} @mcp.tool() async def filter_flow_runs( ctx: Context, filter_criteria: Dict[str, Any] ) -> Dict[str, Any]: """Filter flow runs based on specified criteria. Args: filter_criteria: Dictionary with filter criteria according to Prefect API. Example: {"flow_runs": {"state": {"type": {"any_": ["FAILED", "CRASHED"]}}}} """ async with get_client() as client: flow_run_filter = FlowRunFilter(**filter_criteria) flow_runs = await client.read_flow_runs(flow_run_filter=flow_run_filter) return {"flow_runs": [run.model_dump() for run in flow_runs]} @mcp.tool() async def filter_deployments( ctx: Context, filter_criteria: Dict[str, Any] ) -> Dict[str, Any]: """Filter deployments based on specified criteria. Args: filter_criteria: Dictionary with filter criteria according to Prefect API. Example1: {"deployments": {"is_schedule_active": {"eq_": true}}} Example2: {"deployments": {"tags": {"all_": ["production"]}}} """ async with get_client() as client: deployment_filter = DeploymentFilter(**filter_criteria) deployments = await client.read_deployments(deployment_filter=deployment_filter) return {"deployments": [deployment.model_dump() for deployment in deployments]} @mcp.tool() async def create_flow_run( ctx: Context, deployment_id: str, parameters: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Create a new flow run for the specified deployment (Legacy). Args: deployment_id: ID of the deployment to create a run for. parameters: Dictionary with parameters for the flow run (optional). """ return await create_flow_run_from_deployment(ctx, deployment_id, parameters) def main_run(): print("Starting Prefect MCP Server using FastMCP...", file=sys.stderr) print(f"Prefect API URL: {PREFECT_API_URL}", file=sys.stderr) if PREFECT_API_KEY: print("Using Prefect API Key: YES", file=sys.stderr) else: print("Using Prefect API Key: NO", file=sys.stderr) # mcp.run() starts the server and handles the stdio transport mcp.run() # --- Main entry point for running the server --- if __name__ == "__main__": main_run()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fortunto2/prefect-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server