create_work_queue
Create a Prefect work queue to organize and manage workflow execution, with options to set a description, start the queue paused, and apply a concurrency limit.
Instructions
Create a work queue.
Args:
- name: The name for the work queue
- description: Optional description
- is_paused: Whether the queue should be paused upon creation
- concurrency_limit: Optional concurrency limit
Returns: Details of the created work queue
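
As a rough illustration of how a client might invoke this tool, the sketch below builds an MCP `tools/call` request for `create_work_queue`. The helper name `build_create_work_queue_call`, the `request_id` parameter, and the queue name `high-priority` are hypothetical and chosen only for the example. Arguments left as `None` are omitted from the request.

```python
import json
from typing import Any, Optional


def build_create_work_queue_call(
    name: str,
    description: Optional[str] = None,
    is_paused: Optional[bool] = None,
    concurrency_limit: Optional[int] = None,
    request_id: int = 1,  # hypothetical: any JSON-RPC request id works
) -> dict[str, Any]:
    """Build a JSON-RPC `tools/call` request for the create_work_queue tool."""
    arguments = {
        "name": name,
        "description": description,
        "is_paused": is_paused,
        "concurrency_limit": concurrency_limit,
    }
    # Drop optional arguments the caller did not set.
    arguments = {key: value for key, value in arguments.items() if value is not None}
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "create_work_queue", "arguments": arguments},
    }


# Illustrative values only.
request = build_create_work_queue_call(
    "high-priority", is_paused=False, concurrency_limit=5
)
print(json.dumps(request, indent=2))
```

Only `name` is required, so `build_create_work_queue_call("high-priority")` alone would also produce a valid request.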
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
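| concurrency_limit | No | Optional concurrency limit | |
| description | No | Optional description | |
| is_paused | No | Whether the queue should be paused upon creation | |
| name | Yes | The name for the work queue | |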
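
The table gives names and required flags but no types. A minimal sketch of a client-side check follows, assuming the types from the handler signature in the Implementation Reference (`str`, `bool`, `int`). The names `PARAMETERS` and `check_arguments` are hypothetical, and this is not the validation the server itself performs.

```python
from typing import Any

# Parameter name -> (required, accepted type).
# Assumption: types come from the handler signature, not from the table.
PARAMETERS: dict[str, tuple[bool, type]] = {
    "name": (True, str),
    "description": (False, str),
    "is_paused": (False, bool),
    "concurrency_limit": (False, int),
}


def check_arguments(arguments: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    problems: list[str] = []
    for key, (required, expected) in PARAMETERS.items():
        value = arguments.get(key)
        if value is None:
            if required:
                problems.append(f"missing required parameter: {key}")
            continue
        # bool is a subclass of int, so reject True/False where another type is expected.
        is_stray_bool = isinstance(value, bool) and expected is not bool
        if is_stray_bool or not isinstance(value, expected):
            problems.append(f"{key} must be {expected.__name__}")
    return problems


print(check_arguments({"name": "etl", "concurrency_limit": 3}))
print(check_arguments({"description": "no name given"}))
```

The first call reports no problems; the second reports the missing `name`.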
Implementation Reference
- src/mcp_prefect/work_queue.py:107-138 (handler): The handler function for the 'create_work_queue' tool, decorated with @mcp.tool for registration. It uses the Prefect client to create a work queue with the provided parameters and returns the created work queue details as text content.

      @mcp.tool
      async def create_work_queue(
          name: str,
          description: Optional[str] = None,
          is_paused: Optional[bool] = None,
          concurrency_limit: Optional[int] = None,
          work_pool_name: Optional[str] = None,
      ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
          """
          Create a work queue.

          Args:
              name: The name for the work queue
              description: Optional description
              is_paused: Whether the queue should be paused upon creation
              concurrency_limit: Optional concurrency limit
              work_pool_name: Optional work pool name to create the queue in

          Returns:
              Details of the created work queue
          """
          async with get_client() as client:
              work_queue = await client.create_work_queue(
                  name=name,
                  description=description,
                  is_paused=is_paused,
                  concurrency_limit=concurrency_limit,
                  work_pool_name=work_pool_name,
              )

              return [types.TextContent(type="text", text=str(work_queue.model_dump()))]

- src/mcp_prefect/main.py:62-64 (registration): The import statement in main.py that loads the work_queue module, thereby registering all tools defined in it (including create_work_queue) via their @mcp.tool decorators.

      if APIType.WORK_QUEUE.value in apis:
          info("Loading Work Queue API...")
          from . import work_queue
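
The handler returns `str(work_queue.model_dump())`, which is the Python repr of a dict rather than JSON. The sketch below shows what that can mean for a consumer, assuming the dumped dict holds a non-JSON value such as a `UUID`. The dict contents are illustrative stand-ins, not real Prefect output, and `json.dumps(..., default=str)` is shown only as one possible alternative, not as what the server does.

```python
import json
import uuid

# Stand-in for work_queue.model_dump(); field names and values are illustrative.
dumped = {
    "id": uuid.UUID("12345678-1234-5678-1234-567812345678"),
    "name": "high-priority",
    "is_paused": False,
    "concurrency_limit": 5,
}

# What the handler places in TextContent.text: the dict's Python repr.
text = str(dumped)


def is_valid_json(candidate: str) -> bool:
    """Report whether a string parses as JSON."""
    try:
        json.loads(candidate)
    except json.JSONDecodeError:
        return False
    return True


# Possible alternative: serialize to JSON, converting unsupported values with str().
as_json = json.dumps(dumped, default=str)

print(is_valid_json(text))
print(is_valid_json(as_json))
```

Under these assumptions, a client is better off treating the returned text as display output than parsing it as structured data.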
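
The registration-by-import pattern described above can be sketched in a single file. This is a simplified stand-in, not the MCP library's implementation: `TOOLS`, `tool`, and `load_work_queue_tools` are hypothetical names, and a function body plays the role of the imported module.

```python
from typing import Callable

# Hypothetical registry standing in for the server's tool table.
TOOLS: dict[str, Callable[..., str]] = {}


def tool(fn: Callable[..., str]) -> Callable[..., str]:
    """Register a function under its own name, as a registering decorator would."""
    TOOLS[fn.__name__] = fn
    return fn


def load_work_queue_tools() -> None:
    # Stands in for `from . import work_queue`: executing the module body
    # runs the decorators, and that is what registers the tools.
    @tool
    def create_work_queue(name: str) -> str:
        return f"created {name}"


apis = {"work_queue"}  # illustrative selection of enabled APIs

# Nothing is registered until the conditional load runs.
if "work_queue" in apis:
    load_work_queue_tools()

print(sorted(TOOLS))
```

If `"work_queue"` were absent from `apis`, the load would be skipped and `create_work_queue` would never appear in the registry.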