execute_workflow
Runs a named workflow on the AutoGen MCP Server with the provided input data, optionally with streaming, so that multi-agent conversations can be driven from an MCP client.
Instructions
Execute a workflow with streaming support
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| input_data | Yes | Input data | |
| streaming | No | Enable streaming | |
| workflow_name | Yes | Workflow name | |
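
As a rough illustration of the schema above, the following sketch assembles an arguments dictionary and checks the two required fields. The helper names `build_execute_workflow_args` and `missing_required` are hypothetical and not part of the server, and the keys inside `input_data` are made up for the example; the real shape of `input_data` depends on the workflow being run.

```python
from typing import Any, Dict, List, Optional

# Fields marked "Yes" in the Required column of the schema table.
REQUIRED_FIELDS = ("workflow_name", "input_data")


def build_execute_workflow_args(
    workflow_name: str,
    input_data: Dict[str, Any],
    streaming: Optional[bool] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: assemble arguments matching the schema table."""
    args: Dict[str, Any] = {
        "workflow_name": workflow_name,
        "input_data": input_data,
    }
    # streaming is optional, so it is only included when the caller sets it.
    if streaming is not None:
        args["streaming"] = streaming
    return args


def missing_required(args: Dict[str, Any]) -> List[str]:
    """Return the required field names that are absent from args."""
    return [name for name in REQUIRED_FIELDS if name not in args]


if __name__ == "__main__":
    # "code_generation" is the template name mentioned in the reference below;
    # the "task" key is an assumption made for this example.
    example = build_execute_workflow_args(
        "code_generation", {"task": "write a function"}, streaming=True
    )
    print(example)
    print(missing_required({"input_data": {}}))
```

Checking required fields on the client side is a design choice for this sketch only; the server excerpts below show how missing keys are actually surfaced.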
Implementation Reference
- src/autogen_mcp/server.py:618-620 (handler): MCP-style handler entry point for the execute_workflow tool; delegates to the internal _execute_workflow method.

      async def handle_execute_workflow(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
          """Handle execute_workflow tool call."""
          return await self._execute_workflow(arguments)
- src/autogen_mcp/server.py:384-402 (handler): Primary handler logic for execute_workflow; parses the arguments and executes the workflow via WorkflowManager. The excerpt reads workflow_name and input_data plus two optional keys, output_format and quality_checks, that are not listed in the input schema; it does not read streaming.

      async def _execute_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
          """Execute a predefined workflow."""
          workflow_name = args["workflow_name"]
          input_data = args["input_data"]
          output_format = args.get("output_format", "json")
          quality_checks = args.get("quality_checks", False)
          try:
              result = await self.workflow_manager.execute_workflow(
                  workflow_name, input_data, output_format, quality_checks
              )
              return {
                  "success": True,
                  "workflow": workflow_name,
                  "result": result,
                  "format": output_format
              }
          except Exception as e:
              return {"error": f"Workflow execution failed: {str(e)}"}
- src/autogen_mcp/workflows.py:40-57 (handler): Core WorkflowManager.execute_workflow implementation; dispatches by name to template-specific workflows (e.g., code_generation) or to custom workflows.

      async def execute_workflow(
          self,
          workflow_name: str,
          input_data: Dict[str, Any],
          output_format: str = "json",
          quality_checks: bool = False
      ) -> Dict[str, Any]:
          """Execute a predefined workflow with enhanced features."""
          if workflow_name in self._workflow_templates:
              return await self._workflow_templates[workflow_name](
                  input_data, output_format, quality_checks
              )
          elif workflow_name in self._workflows:
              return await self._execute_custom_workflow(
                  self._workflows[workflow_name], input_data, output_format, quality_checks
              )
          else:
              raise ValueError(f"Unknown workflow: {workflow_name}")
- src/autogen_mcp/server.py:73-101 (registration): Tool dispatcher (handle_tool_call) that routes execute_workflow, among other tools, to its handler via an elif branch.

      async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
          """Handle tool calls with enhanced AutoGen features."""
          try:
              if tool_name == "create_agent":
                  return await self._create_agent(arguments)
              elif tool_name == "create_workflow":
                  return await self._create_workflow(arguments)
              elif tool_name == "execute_chat":
                  return await self._execute_chat(arguments)
              elif tool_name == "execute_group_chat":
                  return await self._execute_group_chat(arguments)
              elif tool_name == "execute_nested_chat":
                  return await self._execute_nested_chat(arguments)
              elif tool_name == "execute_swarm":
                  return await self._execute_swarm(arguments)
              elif tool_name == "execute_workflow":
                  return await self._execute_workflow(arguments)
              elif tool_name == "manage_agent_memory":
                  return await self._manage_agent_memory(arguments)
              elif tool_name == "configure_teachability":
                  return await self._configure_teachability(arguments)
              elif tool_name == "get_agent_status":
                  return await self._get_agent_status(arguments)
              elif tool_name == "get_resource":
                  return await self._get_resource(arguments)
              else:
                  return {"error": f"Unknown tool: {tool_name}"}
          except Exception as e:
              return {"error": str(e)}
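
To make the call chain in the excerpts above easier to follow, here is a minimal, self-contained sketch that mimics it with stand-in classes. `StubWorkflowManager`, `StubServer`, and the `echo` template are assumptions made for illustration and do not exist in the project; only the control flow (dispatcher, handler, manager) and the response envelopes are modelled on the excerpts.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Signature of a template workflow: (input_data, output_format, quality_checks).
Template = Callable[[Dict[str, Any], str, bool], Awaitable[Dict[str, Any]]]


class StubWorkflowManager:
    """Stand-in for WorkflowManager; only template dispatch is modelled."""

    def __init__(self) -> None:
        # "echo" is a made-up template used only in this sketch.
        self._workflow_templates: Dict[str, Template] = {"echo": self._echo}

    async def _echo(
        self, input_data: Dict[str, Any], output_format: str, quality_checks: bool
    ) -> Dict[str, Any]:
        return {"echoed": input_data}

    async def execute_workflow(
        self,
        workflow_name: str,
        input_data: Dict[str, Any],
        output_format: str = "json",
        quality_checks: bool = False,
    ) -> Dict[str, Any]:
        if workflow_name in self._workflow_templates:
            return await self._workflow_templates[workflow_name](
                input_data, output_format, quality_checks
            )
        raise ValueError(f"Unknown workflow: {workflow_name}")


class StubServer:
    """Stand-in for the server; mirrors the two server.py excerpts."""

    def __init__(self) -> None:
        self.workflow_manager = StubWorkflowManager()

    async def _execute_workflow(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # As in the excerpt, required keys are read before the try block.
        workflow_name = args["workflow_name"]
        input_data = args["input_data"]
        output_format = args.get("output_format", "json")
        quality_checks = args.get("quality_checks", False)
        try:
            result = await self.workflow_manager.execute_workflow(
                workflow_name, input_data, output_format, quality_checks
            )
            return {
                "success": True,
                "workflow": workflow_name,
                "result": result,
                "format": output_format,
            }
        except Exception as e:
            return {"error": f"Workflow execution failed: {e}"}

    async def handle_tool_call(
        self, tool_name: str, arguments: Dict[str, Any]
    ) -> Dict[str, Any]:
        try:
            if tool_name == "execute_workflow":
                return await self._execute_workflow(arguments)
            return {"error": f"Unknown tool: {tool_name}"}
        except Exception as e:
            return {"error": str(e)}


def call(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Run one execute_workflow call against a fresh stub server."""
    return asyncio.run(StubServer().handle_tool_call("execute_workflow", arguments))


if __name__ == "__main__":
    print(call({"workflow_name": "echo", "input_data": {"task": "hi"}}))
    print(call({"workflow_name": "nope", "input_data": {}}))
    print(call({"input_data": {}}))
```

In this sketch, an unknown workflow name comes back wrapped as "Workflow execution failed: ...", whereas a missing `workflow_name` key raises `KeyError` before the handler's `try` block and is only turned into an error dictionary by the dispatcher's outer `except`. Judging from the excerpts alone, the same would seem to apply to the real handlers, and a call that goes through `handle_execute_workflow` directly would not have that outer safety net.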