Skip to main content
Glama
Unstructured-IO

Unstructured API MCP Server

Official

list_workflows_with_finished_jobs

Retrieve workflows that have completed jobs from the Unstructured API, with optional filtering by source or destination connector types.

Instructions

List workflows with finished jobs via the Unstructured API.
Args:
    source_type: Optional source connector type to filter by
    destination_type: Optional destination connector type to filter by
Returns:
    String containing the list of workflows with finished jobs and source and destination
    details

Input Schema

Input schema (Table / JSON Schema view):

| Name             | Required | Description                                      | Default |
|------------------|----------|--------------------------------------------------|---------|
| source_type      | No       | Source connector type to filter by               | —       |
| destination_type | No       | Destination connector type to filter by          | —       |

Implementation Reference

  • Primary handler function for the tool 'list_workflows_with_finished_jobs'. Decorated with @mcp.tool() for automatic registration in the MCP server. Implements logic to filter and list workflows with completed jobs, optionally by source and destination types.
    @mcp.tool()
    async def list_workflows_with_finished_jobs(
        ctx: Context,
        source_type: Optional[SourceConnectorType | str] = None,
        destination_type: Optional[DestinationConnectorType | str] = None,
    ) -> str:
        """
        List workflows with finished jobs via the Unstructured API.
        Args:
            source_type: Optional source connector type to filter by
            destination_type: Optional destination connector type to filter by
        Returns:
            String containing the list of workflows with finished jobs and source and destination
            details
        """
        if source_type:
            try:
                source_type = (
                    SourceConnectorType(source_type) if isinstance(source_type, str) else source_type
                )
            # Enum lookup by *value* (Enum(x)) raises ValueError; KeyError is only
            # raised by name lookup (Enum[x]). Catch both so an invalid string is
            # reported to the caller instead of escaping as an unhandled exception.
            except (KeyError, ValueError):
                return f"Invalid source type: {source_type}"
        if destination_type:
            try:
                destination_type = (
                    DestinationConnectorType(destination_type)
                    if isinstance(destination_type, str)
                    else destination_type
                )
            # Same as above: Enum-by-value raises ValueError, not KeyError.
            except (KeyError, ValueError):
                return f"Invalid destination type: {destination_type}"
    
        client = ctx.request_context.lifespan_context.client
        try:
            workflows_details = await gather_workflows_details(client=client)
        except Exception as e:
            return f"Error retrieving workflows: {str(e)}"
    
        filtered_workflows_details = []
    
        for workflow_details in workflows_details:
            # Work on a copy so the gathered details are not mutated in place.
            updated_workflow_details = deepcopy(workflow_details)
    
            if source_type:
                updated_workflow_details.sources = [
                    source for source in workflow_details.sources if source.type == source_type
                ]
    
            if destination_type:
                updated_workflow_details.destinations = [
                    destination
                    for destination in workflow_details.destinations
                    if destination.type == destination_type
                ]
    
            # Only jobs that actually finished count toward inclusion.
            updated_workflow_details.jobs = [
                job for job in workflow_details.jobs if job.status == JobStatus.COMPLETED
            ]
    
            # Keep the workflow only if it still has at least one matching source,
            # destination, and completed job after filtering.
            if (
                updated_workflow_details.sources
                and updated_workflow_details.destinations
                and updated_workflow_details.jobs
            ):
                filtered_workflows_details.append(updated_workflow_details)
    
        if not filtered_workflows_details:
            return "No workflows found with finished jobs"
    
        result = ["Workflows:"]
        for workflow_details in filtered_workflows_details:
            result.append(f"- Name: {workflow_details.workflow.name}")
            result.append(f"  ID: {workflow_details.workflow.id}")
            result.append("  Sources:")
            for source in workflow_details.sources:
                result.append(f"    - {source.name} (ID: {source.id})")
                # NOTE(review): iterating `source.config` directly assumes it yields
                # (key, value) pairs (e.g. a pydantic model); a plain dict would need
                # `.items()` — confirm the config type.
                for key, value in source.config:
                    result.append(f"      {key}: {value}")
    
            result.append("  Destinations:")
            for destination in workflow_details.destinations:
                result.append(f"    - {destination.name} (ID: {destination.id})")
                for key, value in destination.config:
                    result.append(f"      {key}: {value}")
    
        return "\n".join(result)
  • Key helper function that aggregates workflow details by fetching lists of workflows, jobs, sources, and destinations from the Unstructured API client and associating them appropriately.
    async def gather_workflows_details(client: UnstructuredClient) -> list[WorkflowDetails]:
        """Fetch workflows, jobs, sources, and destinations concurrently and join
        them into per-workflow `WorkflowDetails` records, newest workflow first.
    
        Args:
            client: Authenticated Unstructured API client.
        Returns:
            List of WorkflowDetails sorted by workflow `updated_at` descending.
        """
        # All four list endpoints are independent; fetch them in parallel.
        workflows, jobs, sources, destinations = await asyncio.gather(
            client.workflows.list_workflows_async(request=ListWorkflowsRequest()),
            client.jobs.list_jobs_async(request=ListJobsRequest()),
            client.sources.list_sources_async(request=ListSourcesRequest()),
            client.destinations.list_destinations_async(request=ListDestinationsRequest()),
        )
        workflows: list[WorkflowInformation] = workflows.response_list_workflows
        jobs: list[JobInformation] = jobs.response_list_jobs
        sources: list[SourceConnectorInformation] = sources.response_list_sources
        destinations: list[DestinationConnectorInformation] = destinations.response_list_destinations
    
        # Group jobs by workflow id. itertools.groupby is wrong here unless the
        # input is pre-sorted by the same key: with an unsorted API response it
        # emits several groups for one workflow and the dict comprehension keeps
        # only the last, silently dropping jobs. setdefault grouping has no such
        # precondition and preserves job order.
        workflow_id_to_jobs: dict = {}
        for job in jobs:
            workflow_id_to_jobs.setdefault(job.workflow_id, []).append(job)
    
        source_id_to_source_info = {source.id: source for source in sources}
        destination_id_to_destination_info = {
            destination.id: destination for destination in destinations
        }
    
        # Most recently updated workflows first.
        sorted_workflows = sorted(workflows, key=lambda x: x.updated_at, reverse=True)
    
        workflows_details = []
    
        for workflow in sorted_workflows:
            workflow_details = WorkflowDetails(
                workflow=workflow,
                jobs=list(workflow_id_to_jobs.get(workflow.id, [])),
                # Connector ids with no matching record are skipped rather than
                # raising KeyError.
                sources=[
                    source_id_to_source_info[source_id]
                    for source_id in workflow.sources
                    if source_id in source_id_to_source_info
                ],
                destinations=[
                    destination_id_to_destination_info[destination_id]
                    for destination_id in workflow.destinations
                    if destination_id in destination_id_to_destination_info
                ],
            )
            workflows_details.append(workflow_details)
    
        return workflows_details
  • Pydantic model defining the structure for workflow details used in the tool implementation.
    class WorkflowDetails(BaseModel):
        """Aggregated view of one workflow joined with its related records.
    
        Built by `gather_workflows_details`, which associates each workflow with
        its jobs and the source/destination connector records it references.
        """
    
        # The workflow record as returned by the Unstructured API.
        workflow: WorkflowInformation
        # Jobs whose workflow_id matches this workflow (possibly empty).
        jobs: list[JobInformation]
        # Source connector records referenced by the workflow.
        sources: list[SourceConnectorInformation]
        # Destination connector records referenced by the workflow.
        destinations: list[DestinationConnectorInformation]
  • Entry point where the MCP server is run, automatically registering all @mcp.tool() decorated functions including the target tool.
    def main():
        """Entry point: run the MCP server over stdio, or as an HTTP SSE service
        when any command-line argument is present."""
        load_environment_variables()
        if len(sys.argv) >= 2:
            # Extra CLI args mean we run as an HTTP SSE server
            # reference: https://github.com/sidharthrajaram/mcp-sse
            import argparse

            mcp_server = mcp._mcp_server  # noqa: WPS437

            cli = argparse.ArgumentParser(description="Run MCP SSE-based server")
            cli.add_argument("--host", default="127.0.0.1", help="Host to bind to")
            cli.add_argument("--port", type=int, default=8080, help="Port to listen on")
            opts = cli.parse_args()

            # Wire SSE request handling into the MCP server and serve it.
            app = create_starlette_app(mcp_server, debug=True)
            uvicorn.run(app, host=opts.host, port=opts.port)
        else:
            # No args: a client is invoking the server directly over stdio.
            mcp.run(transport="stdio")


    if __name__ == "__main__":
        main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Unstructured-IO/UNS-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server