list_workflows
Retrieve and filter workflows from the Unstructured API by destination ID, source ID, or status to manage and monitor data processing tasks efficiently.
Instructions
List workflows from the Unstructured API.
Args:
destination_id: Optional destination connector ID to filter by
source_id: Optional source connector ID to filter by
status: Optional workflow status to filter by
Returns:
String containing the list of workflows
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| destination_id | No | ||
| source_id | No | ||
| status | No |
Implementation Reference
- uns_mcp/server.py:251-296 (handler)The handler function decorated with @mcp.tool() that implements the list_workflows tool. It fetches workflows using the UnstructuredClient, applies filters, sorts them, and formats the output as a string.@mcp.tool() async def list_workflows( ctx: Context, destination_id: Optional[str] = None, source_id: Optional[str] = None, status: Optional[WorkflowState | str] = None, ) -> str: """ List workflows from the Unstructured API. Args: destination_id: Optional destination connector ID to filter by source_id: Optional source connector ID to filter by status: Optional workflow status to filter by Returns: String containing the list of workflows """ client = ctx.request_context.lifespan_context.client request = ListWorkflowsRequest(destination_id=destination_id, source_id=source_id) if status: try: status = WorkflowState(status) if isinstance(status, str) else status request.status = status except KeyError: return f"Invalid workflow status: {status}" response = await client.workflows.list_workflows_async(request=request) # Sort workflows by name sorted_workflows = sorted( response.response_list_workflows, key=lambda workflow: workflow.name.lower(), ) if not sorted_workflows: return "No workflows found" # Format response result = ["Available workflows:"] for workflow in sorted_workflows: result.append(f"- {workflow.name} (ID: {workflow.id})") return "\n".join(result)