Skip to main content
Glama
work_pools.py (13.4 kB)
"""MCP tools for managing Prefect work pools and their work queues."""

from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Union
from uuid import UUID

import mcp.types as types
from prefect import get_client
from prefect.client.schemas.actions import WorkPoolCreate, WorkPoolUpdate

# Return type shared by every tool in this module.
_ToolResult = List[
    Union[types.TextContent, types.ImageContent, types.EmbeddedResource]
]


def get_all_functions() -> list[tuple[Callable, str, str]]:
    """Return ``(callable, tool name, description)`` for every tool defined here."""
    return [
        (get_work_pools, "get_work_pools", "Get all work pools"),
        (get_work_pool, "get_work_pool", "Get a work pool by name"),
        (create_work_pool, "create_work_pool", "Create a work pool"),
        (update_work_pool, "update_work_pool", "Update a work pool"),
        (delete_work_pool, "delete_work_pool", "Delete a work pool"),
        (get_scheduled_flow_runs, "get_scheduled_flow_runs", "Get scheduled flow runs for a work pool"),
        (get_work_pool_queues, "get_work_pool_queues", "Get work queues for a work pool"),
        (get_work_pool_queue, "get_work_pool_queue", "Get a specific work queue in a work pool"),
        (create_work_pool_queue, "create_work_pool_queue", "Create a work queue in a work pool"),
        (update_work_pool_queue, "update_work_pool_queue", "Update a work queue in a work pool"),
        (delete_work_pool_queue, "delete_work_pool_queue", "Delete a work queue from a work pool"),
    ]


def _text(message: str) -> _ToolResult:
    """Wrap *message* in the single-item text payload every tool returns."""
    return [types.TextContent(type="text", text=message)]


def _dump(model: Any) -> Dict[str, Any]:
    """Convert a pydantic model returned by the client into a plain dict.

    Prefers pydantic v2's ``model_dump`` and falls back to the v1 ``dict``
    method, so either generation of model is accepted.
    """
    dump = getattr(model, "model_dump", None)
    if callable(dump):
        return dump()
    return model.dict()


def _drop_none(**fields: Any) -> Dict[str, Any]:
    """Return *fields* without the entries whose value is ``None``."""
    return {key: value for key, value in fields.items() if value is not None}


def _parse_timestamp(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 string into a ``datetime``; empty input yields ``None``.

    Raises:
        ValueError: If *value* is not a valid ISO 8601 timestamp.
    """
    if not value:
        return None
    # ``datetime.fromisoformat`` only accepts a trailing "Z" from Python 3.11 on.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


async def get_work_pools(
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> _ToolResult:
    """
    Get all work pools with optional pagination.

    Args:
        limit: Maximum number of work pools to return
        offset: Number of work pools to skip

    Returns:
        A list of work pools, or an error message if the request fails
    """
    try:
        # Only forward ``offset`` when supplied so the client's own default
        # applies instead of an explicit None.
        paging = {"limit": limit, **_drop_none(offset=offset)}
        async with get_client() as client:
            work_pools = await client.read_work_pools(**paging)

        work_pools_result = {
            "work_pools": [_dump(work_pool) for work_pool in work_pools]
        }
        return _text(str(work_pools_result))
    except Exception as e:
        return _text(f"Error fetching work pools: {e}")


async def get_work_pool(
    name: str,
) -> _ToolResult:
    """
    Get a work pool by name.

    Args:
        name: The work pool name

    Returns:
        Work pool details, or an error message if the request fails
    """
    try:
        async with get_client() as client:
            work_pool = await client.read_work_pool(name)
        return _text(str(_dump(work_pool)))
    except Exception as e:
        return _text(f"Error fetching work pool '{name}': {e}")


async def create_work_pool(
    name: str,
    type: str,
    description: Optional[str] = None,
    base_job_template: Optional[Dict] = None,
    is_paused: Optional[bool] = None,
) -> _ToolResult:
    """
    Create a work pool.

    Args:
        name: The name for the work pool
        type: The type of work pool (e.g., 'kubernetes', 'process', etc.)
        description: Optional description
        base_job_template: Optional base job template as JSON
        is_paused: Whether the work pool should be paused

    Returns:
        Details of the created work pool, or an error message on failure
    """
    try:
        # The client expects a WorkPoolCreate object, not loose keyword
        # arguments. Unset optionals are omitted so the model defaults apply.
        pool_spec = WorkPoolCreate(
            name=name,
            type=type,
            base_job_template=base_job_template or {},
            **_drop_none(description=description, is_paused=is_paused),
        )
        async with get_client() as client:
            work_pool = await client.create_work_pool(work_pool=pool_spec)
        return _text(str(_dump(work_pool)))
    except Exception as e:
        return _text(f"Error creating work pool: {e}")


async def update_work_pool(
    name: str,
    description: Optional[str] = None,
    base_job_template: Optional[Dict] = None,
    is_paused: Optional[bool] = None,
) -> _ToolResult:
    """
    Update a work pool.

    Args:
        name: The work pool name
        description: New description
        base_job_template: New base job template as JSON
        is_paused: New paused status

    Returns:
        Details of the updated work pool, or an error message on failure
    """
    try:
        update_data = _drop_none(
            description=description,
            base_job_template=base_job_template,
            is_paused=is_paused,
        )
        async with get_client() as client:
            if update_data:
                await client.update_work_pool(
                    work_pool_name=name,
                    work_pool=WorkPoolUpdate(**update_data),
                )
            # The update call does not return the pool, so read it back to
            # report its current state.
            work_pool = await client.read_work_pool(name)
        return _text(str(_dump(work_pool)))
    except Exception as e:
        return _text(f"Error updating work pool '{name}': {e}")


async def delete_work_pool(
    name: str,
) -> _ToolResult:
    """
    Delete a work pool by name.

    Args:
        name: The work pool name

    Returns:
        Confirmation message, or an error message on failure
    """
    try:
        async with get_client() as client:
            await client.delete_work_pool(name)
        return _text(f"Work pool '{name}' deleted successfully.")
    except Exception as e:
        return _text(f"Error deleting work pool '{name}': {e}")


async def get_scheduled_flow_runs(
    work_pool_name: str,
    limit: Optional[int] = None,
    scheduled_before: Optional[str] = None,
) -> _ToolResult:
    """
    Get scheduled flow runs for a work pool.

    Args:
        work_pool_name: The work pool name
        limit: Maximum number of flow runs to return
        scheduled_before: ISO formatted timestamp

    Returns:
        A list of scheduled flow runs, or an error message on failure
    """
    try:
        cutoff = _parse_timestamp(scheduled_before)
        async with get_client() as client:
            flow_runs = await client.get_scheduled_flow_runs_for_work_pool(
                work_pool_name=work_pool_name,
                scheduled_before=cutoff,
            )
        # Per Prefect's client API this call takes no ``limit`` argument, so
        # the result is truncated here instead.
        if limit is not None:
            flow_runs = flow_runs[:limit]

        flow_runs_result = {
            "scheduled_flow_runs": [_dump(flow_run) for flow_run in flow_runs]
        }
        return _text(str(flow_runs_result))
    except Exception as e:
        return _text(
            f"Error fetching scheduled flow runs for work pool '{work_pool_name}': {e}"
        )


async def get_work_pool_queues(
    work_pool_name: str,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> _ToolResult:
    """
    Get work queues for a work pool.

    Args:
        work_pool_name: The work pool name
        limit: Maximum number of queues to return
        offset: Number of queues to skip

    Returns:
        A list of work queues in the work pool, or an error message on failure
    """
    try:
        async with get_client() as client:
            queues = await client.read_work_queues(
                work_pool_name=work_pool_name,
                limit=limit,
                offset=offset,
            )

        queues_result = {
            "work_pool_queues": [_dump(queue) for queue in queues]
        }
        return _text(str(queues_result))
    except Exception as e:
        return _text(
            f"Error fetching work queues for work pool '{work_pool_name}': {e}"
        )


async def get_work_pool_queue(
    work_pool_name: str,
    queue_name: str,
) -> _ToolResult:
    """
    Get a specific work queue in a work pool.

    Args:
        work_pool_name: The work pool name
        queue_name: The work queue name

    Returns:
        Work queue details, or an error message on failure
    """
    try:
        async with get_client() as client:
            queue = await client.read_work_queue_by_name(
                name=queue_name,
                work_pool_name=work_pool_name,
            )
        return _text(str(_dump(queue)))
    except Exception as e:
        return _text(
            f"Error fetching work queue '{queue_name}' in work pool '{work_pool_name}': {e}"
        )


async def create_work_pool_queue(
    work_pool_name: str,
    name: str,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
) -> _ToolResult:
    """
    Create a work queue in a work pool.

    Args:
        work_pool_name: The work pool name
        name: The name for the work queue
        description: Optional description
        is_paused: Whether the queue is paused
        concurrency_limit: Optional concurrency limit
        priority: Optional priority

    Returns:
        Details of the created work queue, or an error message on failure
    """
    try:
        async with get_client() as client:
            queue = await client.create_work_queue(
                name=name,
                description=description,
                is_paused=is_paused,
                concurrency_limit=concurrency_limit,
                priority=priority,
                work_pool_name=work_pool_name,
            )
        return _text(str(_dump(queue)))
    except Exception as e:
        return _text(
            f"Error creating work queue in work pool '{work_pool_name}': {e}"
        )


async def update_work_pool_queue(
    work_pool_name: str,
    queue_name: str,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
    priority: Optional[int] = None,
) -> _ToolResult:
    """
    Update a work queue in a work pool.

    Args:
        work_pool_name: The work pool name
        queue_name: The work queue name
        description: New description
        is_paused: New paused status
        concurrency_limit: New concurrency limit
        priority: New priority

    Returns:
        Details of the updated work queue, or an error message on failure
    """
    try:
        update_data = _drop_none(
            description=description,
            is_paused=is_paused,
            concurrency_limit=concurrency_limit,
            priority=priority,
        )
        async with get_client() as client:
            queue = await client.read_work_queue_by_name(
                name=queue_name,
                work_pool_name=work_pool_name,
            )
            if update_data:
                # Queues are updated by id, so the name is resolved first and
                # the queue re-read afterwards to report its new state.
                await client.update_work_queue(id=queue.id, **update_data)
                queue = await client.read_work_queue_by_name(
                    name=queue_name,
                    work_pool_name=work_pool_name,
                )
        return _text(str(_dump(queue)))
    except Exception as e:
        return _text(
            f"Error updating work queue '{queue_name}' in work pool '{work_pool_name}': {e}"
        )


async def delete_work_pool_queue(
    work_pool_name: str,
    queue_name: str,
) -> _ToolResult:
    """
    Delete a work queue from a work pool.

    Args:
        work_pool_name: The work pool name
        queue_name: The work queue name

    Returns:
        Confirmation message, or an error message on failure
    """
    try:
        async with get_client() as client:
            # Deletion is by id, so resolve the queue from its name first.
            queue = await client.read_work_queue_by_name(
                name=queue_name,
                work_pool_name=work_pool_name,
            )
            await client.delete_work_queue_by_id(id=queue.id)
        return _text(
            f"Work queue '{queue_name}' deleted from work pool '{work_pool_name}' successfully."
        )
    except Exception as e:
        return _text(
            f"Error deleting work queue '{queue_name}' from work pool '{work_pool_name}': {e}"
        )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/allen-munsch/mcp-prefect'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.