# test_workflow.py • 6.41 kB
"""Tests for workflow engine."""
import pytest
from pathlib import Path
from delegation_mcp.workflow import (
WorkflowDefinition,
WorkflowStep,
WorkflowContext,
WorkflowEngine,
)
from delegation_mcp.config import OrchestratorConfig
from delegation_mcp.orchestrator import OrchestratorRegistry
@pytest.fixture
def simple_workflow():
    """Build the two-step workflow definition shared by several tests.

    The first step targets the ``claude`` agent and stores its output as
    ``analysis``. The second targets ``gemini``, consumes ``analysis`` in its
    task template, is guarded by a length condition on ``analysis``, and
    stores its output as ``review``.
    """
    analyze_step = WorkflowStep(
        id="step1",
        agent="claude",
        task="Analyze {{ code_path }}",
        output="analysis",
        description="First step",
    )
    review_step = WorkflowStep(
        id="step2",
        agent="gemini",
        task="Review: {{ analysis }}",
        output="review",
        condition="{{ analysis | length > 0 }}",
        description="Second step",
    )
    return WorkflowDefinition(
        name="Test Workflow",
        description="Simple test workflow",
        steps=[analyze_step, review_step],
    )
@pytest.fixture
def test_registry():
    """Return a registry holding mock ``claude`` and ``gemini`` agents.

    Both agents are configured with the ``echo`` command and a single
    tag argument identifying the agent.
    """
    registry = OrchestratorRegistry()
    # Registration order matches the agent order used by the workflows.
    for agent_name, tag in (("claude", "[CLAUDE]"), ("gemini", "[GEMINI]")):
        registry.register(
            OrchestratorConfig(
                name=agent_name,
                command="echo",
                args=[tag],
                enabled=True,
            )
        )
    return registry
@pytest.fixture
def workflow_engine(test_registry):
    """Provide a ``WorkflowEngine`` built on the ``test_registry`` fixture."""
    engine = WorkflowEngine(test_registry)
    return engine
def test_workflow_context():
    """Check that ``WorkflowContext`` stores values and honours defaults."""
    ctx = WorkflowContext()
    ctx.set("foo", "bar")

    # A stored key returns the value that was set.
    stored = ctx.get("foo")
    assert stored == "bar"

    # An unknown key yields None, or the caller's fallback when one is given.
    assert ctx.get("missing") is None
    assert ctx.get("missing", "default") == "default"
def test_context_interpolation():
    """Check that ``{{ var }}`` placeholders are replaced by context values."""
    ctx = WorkflowContext()
    for key, value in (("name", "World"), ("count", 42)):
        ctx.set(key, value)

    template = "Hello {{ name }}! Count: {{ count }}"
    rendered = ctx.interpolate(template)

    assert rendered == "Hello World! Count: 42"
def test_context_condition_evaluation():
    """Check truthiness and ``| length`` comparisons in conditions."""
    ctx = WorkflowContext()

    # Each row: variable name, value to set, then the (condition, expected)
    # pairs to evaluate right after that variable has been set.
    scenarios = (
        ("exists", "value", (("{{ exists }}", True),)),
        ("empty", "", (("{{ empty }}", False),)),
        (
            "items",
            [1, 2, 3],
            (
                ("{{ items | length > 0 }}", True),
                ("{{ items | length > 5 }}", False),
            ),
        ),
        ("text", "hello", (("{{ text | length > 0 }}", True),)),
    )

    for var_name, var_value, checks in scenarios:
        ctx.set(var_name, var_value)
        for condition, expected in checks:
            assert ctx.evaluate_condition(condition) is expected
def test_workflow_serialization(simple_workflow, tmp_path):
    """Round-trip a workflow through YAML and compare key fields."""
    target = tmp_path / "test.yaml"

    # Writing must produce the file on disk.
    simple_workflow.to_yaml(target)
    assert target.exists()

    # Reading it back must preserve the name, step count and first step id.
    restored = WorkflowDefinition.from_yaml(target)
    assert restored.name == simple_workflow.name
    assert len(restored.steps) == len(simple_workflow.steps)
    first_step = restored.steps[0]
    assert first_step.id == "step1"
@pytest.mark.asyncio
async def test_workflow_execution(workflow_engine, simple_workflow):
    """Run the simple workflow end to end and check the reported result."""
    seed_context = {"code_path": "test.py"}
    outcome = await workflow_engine.execute(
        simple_workflow,
        initial_context=seed_context,
    )

    assert outcome is not None
    assert outcome.workflow_name == "Test Workflow"
    assert outcome.steps_completed == 2
    assert outcome.total_steps == 2
    assert outcome.success is True
    # Both steps must have published their named outputs.
    for output_name in ("analysis", "review"):
        assert output_name in outcome.outputs
@pytest.mark.asyncio
async def test_workflow_conditional_skip(workflow_engine):
    """Check that a step whose condition is falsy is skipped."""
    unconditional = WorkflowStep(
        id="always",
        agent="claude",
        task="Always run",
        output="result1",
    )
    # The condition refers to a variable that is never set in the context.
    guarded = WorkflowStep(
        id="never",
        agent="gemini",
        task="Never run",
        output="result2",
        condition="{{ missing_var }}",
    )
    definition = WorkflowDefinition(
        name="Conditional Test",
        steps=[unconditional, guarded],
    )

    outcome = await workflow_engine.execute(definition)

    # Only the unconditional step should have executed.
    assert outcome.steps_completed == 1
    produced = outcome.outputs
    assert "result1" in produced
    assert "result2" not in produced
@pytest.mark.asyncio
async def test_workflow_error_handling(test_registry):
    """Check that a failing step stops the workflow and is reported.

    A ``failing`` agent backed by the ``false`` command is added to the
    registry. The workflow runs that agent first and a ``claude`` step
    second; the second step must never produce its output.
    """
    # Register a failing command
    test_registry.register(
        OrchestratorConfig(
            name="failing",
            command="false",  # Command that always fails
            args=[],
            enabled=True,
        )
    )
    engine = WorkflowEngine(test_registry)
    workflow = WorkflowDefinition(
        name="Error Test",
        steps=[
            WorkflowStep(
                id="fail",
                agent="failing",
                task="This will fail",
                output="result",
            ),
            WorkflowStep(
                id="never_reached",
                agent="claude",
                task="Should not execute",
                output="result2",
            ),
        ],
    )

    result = await engine.execute(workflow)

    # Should stop on first error
    assert result.success is False
    assert result.steps_completed == 0  # Failed step doesn't count as completed
    assert len(result.errors) > 0
    # The step after the failure must not have run, so its output is absent.
    assert "result2" not in result.outputs
@pytest.mark.asyncio
async def test_load_actual_workflows(workflow_engine):
    """Load workflows from the ``workflows`` directory and sanity-check one.

    The test is skipped when the directory (resolved relative to the current
    working directory) does not exist.
    """
    directory = Path("workflows")
    if not directory.exists():
        pytest.skip("Workflows directory not found")

    loaded = workflow_engine.list_workflows(directory)

    # At least one workflow must be discovered.
    assert len(loaded) > 0

    # Only the first workflow is inspected in detail.
    first = loaded[0]
    assert first.name
    assert len(first.steps) > 0
    agents = [step.agent for step in first.steps]
    assert all(agents)
    tasks = [step.task for step in first.steps]
    assert all(tasks)