Skip to main content
Glama
test_workflow.py • 6.41 kB
"""Tests for workflow engine.""" import pytest from pathlib import Path from delegation_mcp.workflow import ( WorkflowDefinition, WorkflowStep, WorkflowContext, WorkflowEngine, ) from delegation_mcp.config import OrchestratorConfig from delegation_mcp.orchestrator import OrchestratorRegistry @pytest.fixture def simple_workflow(): """Create a simple test workflow.""" return WorkflowDefinition( name="Test Workflow", description="Simple test workflow", steps=[ WorkflowStep( id="step1", agent="claude", task="Analyze {{ code_path }}", output="analysis", description="First step", ), WorkflowStep( id="step2", agent="gemini", task="Review: {{ analysis }}", output="review", condition="{{ analysis | length > 0 }}", description="Second step", ), ], ) @pytest.fixture def test_registry(): """Create test registry with mock commands.""" reg = OrchestratorRegistry() reg.register( OrchestratorConfig( name="claude", command="echo", args=["[CLAUDE]"], enabled=True, ) ) reg.register( OrchestratorConfig( name="gemini", command="echo", args=["[GEMINI]"], enabled=True, ) ) return reg @pytest.fixture def workflow_engine(test_registry): """Create workflow engine.""" return WorkflowEngine(test_registry) def test_workflow_context(): """Test workflow context variable management.""" context = WorkflowContext() # Set and get variables context.set("foo", "bar") assert context.get("foo") == "bar" assert context.get("missing") is None assert context.get("missing", "default") == "default" def test_context_interpolation(): """Test variable interpolation in templates.""" context = WorkflowContext() context.set("name", "World") context.set("count", 42) result = context.interpolate("Hello {{ name }}! Count: {{ count }}") assert result == "Hello World! 
Count: 42" def test_context_condition_evaluation(): """Test condition evaluation.""" context = WorkflowContext() # Test truthy/falsy context.set("exists", "value") assert context.evaluate_condition("{{ exists }}") is True context.set("empty", "") assert context.evaluate_condition("{{ empty }}") is False # Test length conditions context.set("items", [1, 2, 3]) assert context.evaluate_condition("{{ items | length > 0 }}") is True assert context.evaluate_condition("{{ items | length > 5 }}") is False context.set("text", "hello") assert context.evaluate_condition("{{ text | length > 0 }}") is True def test_workflow_serialization(simple_workflow, tmp_path): """Test workflow save/load.""" workflow_file = tmp_path / "test.yaml" # Save simple_workflow.to_yaml(workflow_file) assert workflow_file.exists() # Load loaded = WorkflowDefinition.from_yaml(workflow_file) assert loaded.name == simple_workflow.name assert len(loaded.steps) == len(simple_workflow.steps) assert loaded.steps[0].id == "step1" @pytest.mark.asyncio async def test_workflow_execution(workflow_engine, simple_workflow): """Test basic workflow execution.""" result = await workflow_engine.execute( simple_workflow, initial_context={"code_path": "test.py"} ) assert result is not None assert result.workflow_name == "Test Workflow" assert result.steps_completed == 2 assert result.total_steps == 2 assert result.success is True assert "analysis" in result.outputs assert "review" in result.outputs @pytest.mark.asyncio async def test_workflow_conditional_skip(workflow_engine): """Test conditional step skipping.""" workflow = WorkflowDefinition( name="Conditional Test", steps=[ WorkflowStep( id="always", agent="claude", task="Always run", output="result1", ), WorkflowStep( id="never", agent="gemini", task="Never run", output="result2", condition="{{ missing_var }}", ), ], ) result = await workflow_engine.execute(workflow) # Only first step should execute assert result.steps_completed == 1 assert "result1" in 
result.outputs assert "result2" not in result.outputs @pytest.mark.asyncio async def test_workflow_error_handling(test_registry): """Test workflow error handling.""" # Register a failing command test_registry.register( OrchestratorConfig( name="failing", command="false", # Command that always fails args=[], enabled=True, ) ) engine = WorkflowEngine(test_registry) workflow = WorkflowDefinition( name="Error Test", steps=[ WorkflowStep( id="fail", agent="failing", task="This will fail", output="result", ), WorkflowStep( id="never_reached", agent="claude", task="Should not execute", output="result2", ), ], ) result = await engine.execute(workflow) # Should stop on first error assert result.success is False assert result.steps_completed == 0 # Failed step doesn't count as completed assert len(result.errors) > 0 @pytest.mark.asyncio async def test_load_actual_workflows(workflow_engine): """Test loading actual workflow files.""" workflows_dir = Path("workflows") if not workflows_dir.exists(): pytest.skip("Workflows directory not found") workflows = workflow_engine.list_workflows(workflows_dir) # Should load at least some workflows assert len(workflows) > 0 # Check first workflow is valid workflow = workflows[0] assert workflow.name assert len(workflow.steps) > 0 assert all(step.agent for step in workflow.steps) assert all(step.task for step in workflow.steps)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/carlosduplar/multi-agent-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.