execute_workflow

Runs a named workflow on the AutoGen MCP Server with the provided input data and optional streaming, for use in integrations and multi-agent conversations.

Instructions

Execute a workflow with streaming support

Input Schema

Name           Required  Description       Default
input_data     Yes       Input data
streaming      No        Enable streaming
workflow_name  Yes       Workflow name
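
As a minimal sketch of what a conforming call might look like, the helper below checks an arguments dictionary against the three fields listed in the input schema. The helper name `validate_execute_workflow_args`, the sample input, and the assumption that `streaming` is a boolean that defaults to off are illustrative only; they are not taken from the server's code, and the server's own validation may differ.

```python
from typing import Any, Dict

# The two fields the input schema marks as required.
REQUIRED_FIELDS = ("workflow_name", "input_data")


def validate_execute_workflow_args(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: check arguments against the input schema."""
    missing = [name for name in REQUIRED_FIELDS if name not in arguments]
    if missing:
        raise ValueError(f"Missing required fields: {', '.join(missing)}")
    if not isinstance(arguments["workflow_name"], str):
        raise TypeError("workflow_name must be a string")
    # Assumption: input_data is a JSON object, i.e. a dict in Python.
    if not isinstance(arguments["input_data"], dict):
        raise TypeError("input_data must be a dictionary")
    # Assumption: streaming is a boolean and is treated as off when absent.
    streaming = arguments.get("streaming", False)
    if not isinstance(streaming, bool):
        raise TypeError("streaming must be a boolean")
    return {**arguments, "streaming": streaming}


example_args = validate_execute_workflow_args(
    {
        "workflow_name": "code_generation",
        "input_data": {"task": "Write a sorting function"},
    }
)
```

Validating on the client side before sending the call surfaces a missing field as a clear error rather than as a failure inside the server.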

Implementation Reference

  • MCP-style handler entry point for the execute_workflow tool, delegates to internal _execute_workflow method.
    async def handle_execute_workflow(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle execute_workflow tool call."""
        return await self._execute_workflow(arguments)
  • Primary handler logic for execute_workflow: parses arguments (workflow_name, input_data, etc.) and executes via WorkflowManager.
    async def _execute_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a predefined workflow."""
        workflow_name = args["workflow_name"]
        input_data = args["input_data"]
        output_format = args.get("output_format", "json")
        quality_checks = args.get("quality_checks", False)

        try:
            result = await self.workflow_manager.execute_workflow(
                workflow_name, input_data, output_format, quality_checks
            )
            return {
                "success": True,
                "workflow": workflow_name,
                "result": result,
                "format": output_format
            }
        except Exception as e:
            return {"error": f"Workflow execution failed: {str(e)}"}
  • Core WorkflowManager.execute_workflow implementation: dispatches to template-specific workflows (e.g., code_generation) or custom workflows based on name.
    async def execute_workflow(
        self,
        workflow_name: str,
        input_data: Dict[str, Any],
        output_format: str = "json",
        quality_checks: bool = False
    ) -> Dict[str, Any]:
        """Execute a predefined workflow with enhanced features."""
        if workflow_name in self._workflow_templates:
            return await self._workflow_templates[workflow_name](
                input_data, output_format, quality_checks
            )
        elif workflow_name in self._workflows:
            return await self._execute_custom_workflow(
                self._workflows[workflow_name], input_data, output_format, quality_checks
            )
        else:
            raise ValueError(f"Unknown workflow: {workflow_name}")
  • Tool dispatcher (handle_tool_call) that registers/handles execute_workflow among other tools via elif branch.
    async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle tool calls with enhanced AutoGen features."""
        try:
            if tool_name == "create_agent":
                return await self._create_agent(arguments)
            elif tool_name == "create_workflow":
                return await self._create_workflow(arguments)
            elif tool_name == "execute_chat":
                return await self._execute_chat(arguments)
            elif tool_name == "execute_group_chat":
                return await self._execute_group_chat(arguments)
            elif tool_name == "execute_nested_chat":
                return await self._execute_nested_chat(arguments)
            elif tool_name == "execute_swarm":
                return await self._execute_swarm(arguments)
            elif tool_name == "execute_workflow":
                return await self._execute_workflow(arguments)
            elif tool_name == "manage_agent_memory":
                return await self._manage_agent_memory(arguments)
            elif tool_name == "configure_teachability":
                return await self._configure_teachability(arguments)
            elif tool_name == "get_agent_status":
                return await self._get_agent_status(arguments)
            elif tool_name == "get_resource":
                return await self._get_resource(arguments)
            else:
                return {"error": f"Unknown tool: {tool_name}"}
        except Exception as e:
            return {"error": str(e)}
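
The call path described in the list above (dispatcher, then handler, then WorkflowManager) can be traced end to end with a small stand-in. This is a sketch under stated assumptions, not the server's implementation: `SketchServer`, `SketchWorkflowManager`, and the `echo` template are hypothetical names, and only the template lookup branch is modelled. The handler body and the error strings mirror the snippets in the list.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

WorkflowFn = Callable[[Dict[str, Any], str, bool], Awaitable[Dict[str, Any]]]


class SketchWorkflowManager:
    """Hypothetical stand-in for WorkflowManager (template lookup only)."""

    def __init__(self) -> None:
        self._workflow_templates: Dict[str, WorkflowFn] = {"echo": self._echo}

    async def _echo(
        self, input_data: Dict[str, Any], output_format: str, quality_checks: bool
    ) -> Dict[str, Any]:
        # Hypothetical template: returns its input unchanged.
        return {"echo": input_data}

    async def execute_workflow(
        self,
        workflow_name: str,
        input_data: Dict[str, Any],
        output_format: str = "json",
        quality_checks: bool = False,
    ) -> Dict[str, Any]:
        if workflow_name in self._workflow_templates:
            return await self._workflow_templates[workflow_name](
                input_data, output_format, quality_checks
            )
        raise ValueError(f"Unknown workflow: {workflow_name}")


class SketchServer:
    """Hypothetical server exposing only the execute_workflow tool."""

    def __init__(self) -> None:
        self.workflow_manager = SketchWorkflowManager()

    async def _execute_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # Required keys are read outside the try block, as in the handler above,
        # so a missing key is caught by handle_tool_call instead.
        workflow_name = args["workflow_name"]
        input_data = args["input_data"]
        output_format = args.get("output_format", "json")
        quality_checks = args.get("quality_checks", False)
        try:
            result = await self.workflow_manager.execute_workflow(
                workflow_name, input_data, output_format, quality_checks
            )
            return {
                "success": True,
                "workflow": workflow_name,
                "result": result,
                "format": output_format,
            }
        except Exception as e:
            return {"error": f"Workflow execution failed: {str(e)}"}

    async def handle_tool_call(
        self, tool_name: str, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        try:
            if tool_name == "execute_workflow":
                return await self._execute_workflow(arguments)
            return {"error": f"Unknown tool: {tool_name}"}
        except Exception as e:
            return {"error": str(e)}


async def demo() -> Dict[str, Dict[str, Any]]:
    server = SketchServer()
    call = server.handle_tool_call
    return {
        "ok": await call("execute_workflow", {"workflow_name": "echo", "input_data": {"x": 1}}),
        "unknown": await call("execute_workflow", {"workflow_name": "nope", "input_data": {}}),
        "missing": await call("execute_workflow", {"input_data": {}}),
    }


results = asyncio.run(demo())
```

In this sketch an unknown workflow name comes back with the "Workflow execution failed" prefix, while a missing `workflow_name` key surfaces as the bare `KeyError` text from the dispatcher's outer `except`. The handler snippets in the list read `output_format` and `quality_checks` but not the `streaming` field from the input schema, so the sketch does not model streaming either.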

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/DynamicEndpoints/Autogen_MCP'
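
The same request can be issued from Python with the standard library. The sketch below builds the request for the URL in the curl command; the helper names are hypothetical, and the assumption that the endpoint returns a JSON body is not confirmed by this page, so the fetch function is defined but not called here.

```python
import json
import urllib.request
from typing import Any

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def build_server_request(owner: str, name: str) -> urllib.request.Request:
    """Build a GET request for one server entry in the MCP directory API."""
    return urllib.request.Request(f"{API_BASE}/{owner}/{name}", method="GET")


def fetch_server_info(owner: str, name: str, timeout: float = 10.0) -> Any:
    """Send the request and decode the body.

    Assumption: the response body is JSON; the response shape is not
    documented on this page.
    """
    request = build_server_request(owner, name)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


request = build_server_request("DynamicEndpoints", "Autogen_MCP")
```

Calling `fetch_server_info("DynamicEndpoints", "Autogen_MCP")` would perform the network request.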