Unstructured-IO

Unstructured API MCP Server

Official

list_workflows_with_finished_jobs

Retrieve workflows that have completed jobs from the Unstructured API, with optional filtering by source or destination connector types.

Instructions

List workflows with finished jobs via the Unstructured API.

Args:
    source_type: Optional source connector type to filter by
    destination_type: Optional destination connector type to filter by

Returns:
    String containing the list of workflows with finished jobs and source and destination details

Input Schema

| Name             | Required | Description                                      | Default |
| ---------------- | -------- | ------------------------------------------------ | ------- |
| source_type      | No       | Source connector type to filter by               | None    |
| destination_type | No       | Destination connector type to filter by          | None    |
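As a minimal sketch, this is the shape of the JSON-RPC `tools/call` payload an MCP client might send to invoke this tool. The connector values `s3` and `mongodb` are assumed example strings, not a statement of which types this server supports; both arguments may be omitted to list every workflow with completed jobs.

```python
import json

# Hypothetical tools/call request for this tool; argument values are
# illustrative only.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "list_workflows_with_finished_jobs",
        "arguments": {
            "source_type": "s3",            # assumed connector type value
            "destination_type": "mongodb",  # assumed connector type value
        },
    },
}

print(json.dumps(request, indent=2))
```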

Implementation Reference

  • Primary handler function for the tool 'list_workflows_with_finished_jobs'. Decorated with @mcp.tool() for automatic registration in the MCP server. Implements logic to filter and list workflows with completed jobs, optionally by source and destination types.
```python
from copy import deepcopy
from typing import Optional

# Context, mcp, JobStatus, and the connector-type enums are imported
# elsewhere in the server module.


@mcp.tool()
async def list_workflows_with_finished_jobs(
    ctx: Context,
    source_type: Optional[SourceConnectorType | str] = None,
    destination_type: Optional[DestinationConnectorType | str] = None,
) -> str:
    """
    List workflows with finished jobs via the Unstructured API.

    Args:
        source_type: Optional source connector type to filter by
        destination_type: Optional destination connector type to filter by

    Returns:
        String containing the list of workflows with finished jobs
        and source and destination details
    """
    if source_type:
        try:
            source_type = (
                SourceConnectorType(source_type)
                if isinstance(source_type, str)
                else source_type
            )
        except ValueError:  # Enum lookup by value raises ValueError, not KeyError
            return f"Invalid source type: {source_type}"
    if destination_type:
        try:
            destination_type = (
                DestinationConnectorType(destination_type)
                if isinstance(destination_type, str)
                else destination_type
            )
        except ValueError:
            return f"Invalid destination type: {destination_type}"

    client = ctx.request_context.lifespan_context.client
    try:
        workflows_details = await gather_workflows_details(client=client)
    except Exception as e:
        return f"Error retrieving workflows: {str(e)}"

    filtered_workflows_details = []
    for workflow_details in workflows_details:
        updated_workflow_details = deepcopy(workflow_details)
        if source_type:
            updated_workflow_details.sources = [
                source for source in workflow_details.sources if source.type == source_type
            ]
        if destination_type:
            updated_workflow_details.destinations = [
                destination
                for destination in workflow_details.destinations
                if destination.type == destination_type
            ]
        updated_workflow_details.jobs = [
            job for job in workflow_details.jobs if job.status == JobStatus.COMPLETED
        ]
        if (
            updated_workflow_details.sources
            and updated_workflow_details.destinations
            and updated_workflow_details.jobs
        ):
            filtered_workflows_details.append(updated_workflow_details)

    if not filtered_workflows_details:
        return "No workflows found with finished jobs"

    result = ["Workflows:"]
    for workflow_details in filtered_workflows_details:
        result.append(f"- Name: {workflow_details.workflow.name}")
        result.append(f"  ID: {workflow_details.workflow.id}")
        result.append("  Sources:")
        for source in workflow_details.sources:
            result.append(f"    - {source.name} (ID: {source.id})")
            for key, value in source.config:
                result.append(f"      {key}: {value}")
        result.append("  Destinations:")
        for destination in workflow_details.destinations:
            result.append(f"    - {destination.name} (ID: {destination.id})")
            for key, value in destination.config:
                result.append(f"      {key}: {value}")
    return "\n".join(result)
```
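The handler coerces string arguments into connector-type enums before filtering; note that `Enum(value)` lookup raises `ValueError` for unknown values. A minimal, self-contained sketch of that pattern using a stand-in enum (the real `SourceConnectorType` and its members come from the SDK):

```python
from enum import Enum


# Stand-in for SourceConnectorType; the actual members are defined
# in the unstructured-client SDK, these two values are assumptions.
class SourceConnectorType(str, Enum):
    S3 = "s3"
    GOOGLE_DRIVE = "google_drive"


def coerce(source_type):
    """Mirror the handler's string-to-enum coercion step."""
    try:
        return (
            SourceConnectorType(source_type)
            if isinstance(source_type, str)
            else source_type
        )
    except ValueError:  # Enum(value) raises ValueError for unknown values
        return f"Invalid source type: {source_type}"


# Known value is converted, unknown value yields the error string,
# an already-typed member passes through unchanged.
assert coerce("s3") is SourceConnectorType.S3
assert coerce("dropbox") == "Invalid source type: dropbox"
assert coerce(SourceConnectorType.GOOGLE_DRIVE) is SourceConnectorType.GOOGLE_DRIVE
```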
  • Key helper function that aggregates workflow details by fetching lists of workflows, jobs, sources, and destinations from the Unstructured API client and associating them appropriately.
```python
import asyncio
from itertools import groupby

# UnstructuredClient, the request types, and the information models
# are imported from the unstructured-client SDK.


async def gather_workflows_details(client: UnstructuredClient) -> list[WorkflowDetails]:
    workflows, jobs, sources, destinations = await asyncio.gather(
        client.workflows.list_workflows_async(request=ListWorkflowsRequest()),
        client.jobs.list_jobs_async(request=ListJobsRequest()),
        client.sources.list_sources_async(request=ListSourcesRequest()),
        client.destinations.list_destinations_async(request=ListDestinationsRequest()),
    )
    workflows: list[WorkflowInformation] = workflows.response_list_workflows
    jobs: list[JobInformation] = jobs.response_list_jobs
    sources: list[SourceConnectorInformation] = sources.response_list_sources
    destinations: list[DestinationConnectorInformation] = destinations.response_list_destinations

    # groupby only groups consecutive items, so sort by the grouping key first
    jobs = sorted(jobs, key=lambda x: x.workflow_id)
    workflow_id_to_jobs = {
        workflow_id: list(grouped_jobs)
        for workflow_id, grouped_jobs in groupby(jobs, lambda x: x.workflow_id)
    }
    source_id_to_source_info = {source.id: source for source in sources}
    destination_id_to_destination_info = {
        destination.id: destination for destination in destinations
    }

    sorted_workflows = sorted(workflows, key=lambda x: x.updated_at, reverse=True)
    workflows_details = []
    for workflow in sorted_workflows:
        workflow_details = WorkflowDetails(
            workflow=workflow,
            jobs=list(workflow_id_to_jobs.get(workflow.id, [])),
            sources=[
                source_id_to_source_info[source_id]
                for source_id in workflow.sources
                if source_id in source_id_to_source_info
            ],
            destinations=[
                destination_id_to_destination_info[destination_id]
                for destination_id in workflow.destinations
                if destination_id in destination_id_to_destination_info
            ],
        )
        workflows_details.append(workflow_details)
    return workflows_details
```
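The helper buckets jobs by `workflow_id` using `itertools.groupby`, which only groups *consecutive* items. A toy example with made-up job tuples showing why the input must be sorted on the grouping key before building the dict:

```python
from itertools import groupby

# Toy job records as (workflow_id, job_id) pairs; IDs are made up.
jobs = [("wf-1", "job-a"), ("wf-2", "job-b"), ("wf-1", "job-c")]

# Unsorted input: groupby emits "wf-1" twice, and a dict comprehension
# silently keeps only the last group for that key.
unsorted_groups = {
    wid: [j for _, j in grp] for wid, grp in groupby(jobs, key=lambda x: x[0])
}
assert unsorted_groups == {"wf-1": ["job-c"], "wf-2": ["job-b"]}

# Sorting by the grouping key first yields complete buckets.
jobs_sorted = sorted(jobs, key=lambda x: x[0])
groups = {
    wid: [j for _, j in grp] for wid, grp in groupby(jobs_sorted, key=lambda x: x[0])
}
assert groups == {"wf-1": ["job-a", "job-c"], "wf-2": ["job-b"]}
```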
  • Pydantic model defining the structure for workflow details used in the tool implementation.
```python
from pydantic import BaseModel


class WorkflowDetails(BaseModel):
    workflow: WorkflowInformation
    jobs: list[JobInformation]
    sources: list[SourceConnectorInformation]
    destinations: list[DestinationConnectorInformation]
```
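The model simply bundles one workflow with its related jobs and connectors. A stdlib-dataclass sketch of the same shape (the field sets here are simplified stand-ins; the real models come from the unstructured-client SDK and carry many more attributes):

```python
from dataclasses import dataclass, field


@dataclass
class WorkflowInformation:  # simplified stand-in for the SDK model
    id: str
    name: str


@dataclass
class JobInformation:  # simplified stand-in for the SDK model
    id: str
    status: str


@dataclass
class WorkflowDetails:
    workflow: WorkflowInformation
    jobs: list[JobInformation] = field(default_factory=list)
    sources: list = field(default_factory=list)
    destinations: list = field(default_factory=list)


# Construct one aggregate record; IDs and names are made up.
details = WorkflowDetails(
    workflow=WorkflowInformation(id="wf-1", name="ingest-pdfs"),
    jobs=[JobInformation(id="job-a", status="COMPLETED")],
)
assert details.workflow.name == "ingest-pdfs"
assert details.sources == []  # connector lists default to empty
```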
  • Entry point where the MCP server is run, automatically registering all @mcp.tool() decorated functions including the target tool.
```python
import sys

import uvicorn

# load_environment_variables, mcp, and create_starlette_app are defined
# elsewhere in the server package.


def main():
    load_environment_variables()
    if len(sys.argv) < 2:
        # server is directly being invoked from a client
        mcp.run(transport="stdio")
    else:
        # server is running as an HTTP SSE server
        # reference: https://github.com/sidharthrajaram/mcp-sse
        mcp_server = mcp._mcp_server  # noqa: WPS437

        import argparse

        parser = argparse.ArgumentParser(description="Run MCP SSE-based server")
        parser.add_argument("--host", default="127.0.0.1", help="Host to bind to")
        parser.add_argument("--port", type=int, default=8080, help="Port to listen on")
        args = parser.parse_args()

        # Bind SSE request handling to MCP server
        starlette_app = create_starlette_app(mcp_server, debug=True)
        uvicorn.run(starlette_app, host=args.host, port=args.port)


if __name__ == "__main__":
    main()
```
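In SSE mode the entry point parses `--host` and `--port` with argparse. A small sketch mirroring just that parsing step (the `parse_sse_args` helper is hypothetical, factored out here so the behavior can be checked without starting a server):

```python
import argparse


def parse_sse_args(argv):
    """Mirror main()'s SSE-mode argument parsing (hypothetical helper)."""
    parser = argparse.ArgumentParser(description="Run MCP SSE-based server")
    parser.add_argument("--host", default="127.0.0.1", help="Host to bind to")
    parser.add_argument("--port", type=int, default=8080, help="Port to listen on")
    return parser.parse_args(argv)


# Any extra command-line argument flips the server from stdio to SSE mode;
# defaults apply when the flags are omitted.
args = parse_sse_args(["--port", "9000"])
assert (args.host, args.port) == ("127.0.0.1", 9000)
assert parse_sse_args([]).port == 8080
```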

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Unstructured-IO/UNS-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.