"""
Tests for WorkflowRunnerTool.
Tests the workflow runner tool's ability to execute workflows with proper
parameter handling and integration with the parallel execution infrastructure.
"""
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastmcp import Context
from src.tools.workflows.shared.context import DiagnosticResult
# Test the workflow runner tool
from src.tools.workflows.workflow_runner import WorkflowRunnerTool
from src.tools.workflows.get_executed_workflows import GetExecutedWorkflowsTool
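# The fixtures below patch the OpenAI Agents SDK and the workflow execution
# infrastructure, so these tests exercise the runner's orchestration logic
# without live OpenAI or Splunk calls.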
class TestWorkflowRunnerTool:
"""Test suite for WorkflowRunnerTool."""
@pytest.fixture
def mock_context(self):
"""Create a mock FastMCP context."""
context = AsyncMock(spec=Context)
context.report_progress = AsyncMock()
context.info = AsyncMock()
context.error = AsyncMock()
return context
@pytest.fixture
def mock_config(self):
"""Create mock configuration."""
with patch.dict(
os.environ,
{
"OPENAI_API_KEY": "test-key",
"OPENAI_MODEL": "gpt-4o",
"OPENAI_TEMPERATURE": "0.7",
"OPENAI_MAX_TOKENS": "4000",
},
):
yield
@pytest.fixture
def mock_openai_agents(self):
"""Mock OpenAI agents SDK."""
with patch("src.tools.workflows.workflow_runner.OPENAI_AGENTS_AVAILABLE", True), patch(
"src.tools.workflows.workflow_runner.Agent"
), patch("src.tools.workflows.workflow_runner.Runner"), patch(
"src.tools.workflows.workflow_runner.trace"
), patch(
"src.tools.workflows.workflow_runner.custom_span"
):
yield
@pytest.fixture
def mock_workflow_infrastructure(self):
"""Mock the workflow execution infrastructure."""
with patch(
"src.tools.workflows.workflow_runner.SplunkToolRegistry"
) as mock_registry, patch(
"src.tools.workflows.workflow_runner.WorkflowManager"
) as mock_manager, patch(
"src.tools.workflows.workflow_runner.ParallelWorkflowExecutor"
) as mock_executor, patch(
"src.tools.workflows.workflow_runner.create_summarization_tool"
) as mock_summarization, patch(
"src.tools.workflows.shared.tools.create_splunk_tools"
):
# Mock workflow definition
mock_workflow = MagicMock()
mock_workflow.workflow_id = "test_workflow"
mock_workflow.name = "Test Workflow"
mock_workflow.description = "A test workflow"
mock_workflow.tasks = []
# Mock workflow manager
mock_manager_instance = mock_manager.return_value
mock_manager_instance.get_workflow.return_value = mock_workflow
mock_manager_instance.list_workflows.return_value = [mock_workflow]
            # Mock workflow result with one task carrying the new per-step fields
            # (severity, success_score, trace metadata, correlation_id)
mock_result = MagicMock()
mock_result.status = "completed"
mock_result.workflow_id = "test_workflow"
# Build a realistic DiagnosticResult so the runner flattens fields correctly
task_result = DiagnosticResult(
step="step_1",
status="healthy",
findings=["All good"],
recommendations=["Proceed"],
details={"k": "v"},
severity="healthy",
success_score=0.95,
trace_url="https://platform.openai.com/logs?api=traces",
trace_name="agent_execution_step_1_123",
trace_timestamp=1234567890000,
correlation_id="corr-123",
)
mock_result.task_results = {"step_1": task_result}
mock_result.summary = {
"execution_phases": 1,
"parallel_efficiency": 0.8,
}
# Mock parallel executor
mock_executor_instance = mock_executor.return_value
mock_executor_instance.execute_workflow = AsyncMock(return_value=mock_result)
# Mock summarization tool
mock_summarization_instance = mock_summarization.return_value
mock_summarization_instance.execute = AsyncMock(
return_value={"status": "completed", "executive_summary": "Test summary"}
)
yield {
"registry": mock_registry,
"manager": mock_manager,
"executor": mock_executor,
"summarization": mock_summarization,
"workflow": mock_workflow,
"result": mock_result,
}
@pytest.fixture
def workflow_runner_tool(self, mock_config, mock_openai_agents, mock_workflow_infrastructure):
"""Create a WorkflowRunnerTool instance with mocked dependencies."""
tool = WorkflowRunnerTool("workflow_runner", "workflows")
return tool
@pytest.mark.asyncio
async def test_workflow_runner_initialization(
self, mock_config, mock_openai_agents, mock_workflow_infrastructure
):
"""Test that WorkflowRunnerTool initializes correctly."""
tool = WorkflowRunnerTool("workflow_runner", "workflows")
assert tool.category == "workflows"
assert hasattr(tool, "config")
assert hasattr(tool, "workflow_manager")
assert hasattr(tool, "parallel_executor")
assert hasattr(tool, "summarization_tool")
@pytest.mark.asyncio
async def test_execute_workflow_success(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test successful workflow execution."""
result = await workflow_runner_tool.execute(
ctx=mock_context,
workflow_id="test_workflow",
problem_description="Test problem",
earliest_time="-1h",
latest_time="now",
complexity_level="moderate",
enable_summarization=True,
)
# Verify result structure
assert result["status"] == "completed"
assert result["tool_type"] == "workflow_runner"
assert result["workflow_id"] == "test_workflow"
assert result["workflow_name"] == "Test Workflow"
assert "execution_metadata" in result
assert "workflow_execution" in result
assert "summarization" in result
# Verify new per-step fields are present and logs_url is not
assert "task_results" in result
assert "step_1" in result["task_results"]
step = result["task_results"]["step_1"]
assert step["status"] == "healthy"
assert step["severity"] == "healthy"
assert isinstance(step["success_score"], float)
assert step["success"] is True
assert isinstance(step["trace_url"], str) and step["trace_url"]
assert "logs_url" not in step # removed per new design
assert "trace_name" in step and "trace_timestamp" in step and "correlation_id" in step
# Verify summarization was enabled
assert result["summarization"]["enabled"] is True
# Verify progress reporting was called
mock_context.report_progress.assert_called()
mock_context.info.assert_called()
@pytest.mark.asyncio
async def test_execute_workflow_not_found(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test workflow execution with non-existent workflow ID."""
# Mock workflow manager to return None for non-existent workflow
mock_workflow_infrastructure["manager"].return_value.get_workflow.return_value = None
result = await workflow_runner_tool.execute(
ctx=mock_context, workflow_id="non_existent_workflow", complexity_level="moderate"
)
# Verify error result
assert result["status"] == "error"
assert result["error_type"] == "workflow_not_found"
assert "non_existent_workflow" in result["error"]
assert "available_workflows" in result
# Verify error was reported
mock_context.error.assert_called()
@pytest.mark.asyncio
async def test_execute_workflow_without_summarization(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test workflow execution with summarization disabled."""
result = await workflow_runner_tool.execute(
ctx=mock_context, workflow_id="test_workflow", enable_summarization=False
)
# Verify summarization was disabled
assert result["summarization"]["enabled"] is False
assert result["summarization"]["reason"] == "Summarization disabled by user"
# Verify summarization tool was not called
mock_workflow_infrastructure["summarization"].return_value.execute.assert_not_called()
@pytest.mark.asyncio
async def test_parameter_validation(self, workflow_runner_tool, mock_context):
"""Test parameter validation."""
# Test empty workflow_id
with pytest.raises(ValueError, match="workflow_id is required"):
await workflow_runner_tool.execute(ctx=mock_context, workflow_id="")
# Test invalid complexity_level
with pytest.raises(ValueError, match="complexity_level must be"):
await workflow_runner_tool.execute(
ctx=mock_context, workflow_id="test_workflow", complexity_level="invalid"
)
@pytest.mark.asyncio
async def test_parameter_normalization(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test that empty string parameters are normalized to None."""
result = await workflow_runner_tool.execute(
ctx=mock_context,
workflow_id="test_workflow",
problem_description="", # Empty string should become None
focus_index=" ", # Whitespace should become None
focus_host="", # Empty string should become None
focus_sourcetype=" ", # Whitespace should become None
)
# Verify parameters were normalized in diagnostic context
diagnostic_context = result["diagnostic_context"]
assert diagnostic_context["focus_index"] is None
assert diagnostic_context["focus_host"] is None
assert diagnostic_context["focus_sourcetype"] is None
@pytest.mark.asyncio
async def test_execution_metadata(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test that execution metadata is properly captured."""
result = await workflow_runner_tool.execute(
ctx=mock_context, workflow_id="test_workflow", enable_summarization=True
)
metadata = result["execution_metadata"]
# Verify metadata structure
assert "total_execution_time" in metadata
assert "workflow_execution_time" in metadata
assert "summarization_execution_time" in metadata
assert metadata["parallel_execution"] is True
assert metadata["summarization_enabled"] is True
assert "tracing_enabled" in metadata
@pytest.mark.asyncio
async def test_store_and_retrieve_executed_workflows(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Workflow completion should be stored and retrievable by session."""
# Execute a workflow to trigger storage
await workflow_runner_tool.execute(
ctx=mock_context, workflow_id="test_workflow", enable_summarization=False
)
# Retrieve via tool (list for session)
getter = GetExecutedWorkflowsTool("get_executed_workflows", "workflows")
list_result = await getter.execute(ctx=mock_context)
assert list_result["status"] == "ok"
assert list_result["count"] >= 1
first = list_result["executed_workflows"][0]
assert first["workflow_id"] == "test_workflow"
assert "executed_workflow_id" in first
assert "executed_at" in first
assert "status" in first
# Retrieve specific id
by_id = await getter.execute(ctx=mock_context, id=first["executed_workflow_id"])
assert by_id["status"] == "ok"
rec = by_id["executed_workflow"]
assert rec["workflow_id"] == "test_workflow"
assert rec["result"]["workflow_id"] == "test_workflow"
@pytest.mark.asyncio
async def test_workflow_execution_error_handling(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test error handling during workflow execution."""
# Mock parallel executor to raise an exception
mock_workflow_infrastructure[
"executor"
].return_value.execute_workflow.side_effect = Exception("Test error")
result = await workflow_runner_tool.execute(ctx=mock_context, workflow_id="test_workflow")
# Verify error result
assert result["status"] == "error"
assert result["error_type"] == "execution_error"
assert "Test error" in result["error"]
# Verify error was reported
mock_context.error.assert_called()
@pytest.mark.asyncio
async def test_summarization_error_handling(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test error handling during summarization."""
# Mock summarization tool to raise an exception
mock_workflow_infrastructure["summarization"].return_value.execute.side_effect = Exception(
"Summarization error"
)
result = await workflow_runner_tool.execute(
ctx=mock_context, workflow_id="test_workflow", enable_summarization=True
)
# Verify workflow still completes but summarization shows error
assert result["status"] == "completed" # Workflow itself succeeded
assert result["summarization"]["enabled"] is True
assert result["summarization"]["result"]["status"] == "error"
assert "Summarization error" in result["summarization"]["result"]["error"]
def test_tool_metadata(self, workflow_runner_tool):
"""Test that tool metadata is properly defined."""
metadata = workflow_runner_tool.METADATA
assert metadata.name == "workflow_runner"
assert metadata.category == "workflows"
assert "Execute any available workflow by ID" in metadata.description
assert "workflow_id (required)" in metadata.description
assert "parallel execution" in metadata.description.lower()
@pytest.mark.asyncio
async def test_dynamic_tool_resolution_aliases(self):
"""Test that dynamic agent tool creation resolves aliases like 'me'."""
# Patch OpenAI agents availability and Agent class used in parallel executor
with patch(
"src.tools.workflows.shared.parallel_executor.OPENAI_AGENTS_AVAILABLE", True
), patch("src.tools.workflows.shared.parallel_executor.Agent") as mock_agent:
from src.tools.workflows.shared.parallel_executor import ParallelWorkflowExecutor
from src.tools.workflows.shared.workflow_manager import TaskDefinition
from src.tools.workflows.shared.tools import SplunkToolRegistry
# Prepare a mock tool registry with dynamic factory
mock_registry = MagicMock(spec=SplunkToolRegistry)
# Return distinct sentinels per tool to verify both are attempted
def create_tool_side_effect(name):
return f"dynamic_tool:{name}"
mock_registry.create_agent_tool.side_effect = create_tool_side_effect
# Minimal config mock
mock_config = MagicMock()
mock_config.model = "gpt-4o"
mock_config.temperature = 0.7
# Instantiate executor
executor = ParallelWorkflowExecutor(config=mock_config, tool_registry=mock_registry)
# Define a task using alias 'me' and a canonical name
task = TaskDefinition(
task_id="t1",
name="Test Task",
description="Desc",
instructions="Do something",
required_tools=["me", "get_splunk_health"],
)
# Invoke agent creation (internal helper)
agent = executor._create_agent_from_task(task)
# Ensure Agent was constructed
assert mock_agent.called
# Verify dynamic factory was called for both names
assert mock_registry.create_agent_tool.call_count == 2
mock_registry.create_agent_tool.assert_any_call("me")
mock_registry.create_agent_tool.assert_any_call("get_splunk_health")
@pytest.mark.asyncio
async def test_diagnostic_context_creation(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test that diagnostic context is properly created with all parameters."""
test_params = {
"workflow_id": "test_workflow",
"problem_description": "Test problem description",
"earliest_time": "-2h",
"latest_time": "+1h",
"focus_index": "test_index",
"focus_host": "test_host",
"focus_sourcetype": "test_sourcetype",
"complexity_level": "advanced",
}
result = await workflow_runner_tool.execute(ctx=mock_context, **test_params)
# Verify diagnostic context was created with correct parameters
diagnostic_context = result["diagnostic_context"]
assert diagnostic_context["earliest_time"] == "-2h"
assert diagnostic_context["latest_time"] == "+1h"
assert diagnostic_context["focus_index"] == "test_index"
assert diagnostic_context["focus_host"] == "test_host"
assert diagnostic_context["focus_sourcetype"] == "test_sourcetype"
assert diagnostic_context["complexity_level"] == "advanced"
@pytest.mark.asyncio
async def test_progress_reporting(
self, workflow_runner_tool, mock_context, mock_workflow_infrastructure
):
"""Test that progress is properly reported during execution."""
await workflow_runner_tool.execute(ctx=mock_context, workflow_id="test_workflow")
# Verify progress reporting was called multiple times
progress_calls = mock_context.report_progress.call_args_list
assert len(progress_calls) >= 4 # Should have multiple progress updates
# Verify progress values are increasing
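        # call[1] is each mock call's kwargs; report_progress is expected to be
        # invoked with a `progress` keyword argument here.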
progress_values = [call[1]["progress"] for call in progress_calls]
assert progress_values[0] == 0 # Starts at 0
assert progress_values[-1] == 100 # Ends at 100
assert all(
progress_values[i] <= progress_values[i + 1] for i in range(len(progress_values) - 1)
) # Increasing
@pytest.mark.asyncio
async def test_workflow_builder_integration(self, mock_context):
"""Test that workflow builder can process workflows for the runner."""
from src.tools.workflows.workflow_builder import WorkflowBuilderTool
# Create workflow builder
workflow_builder = WorkflowBuilderTool("workflow_builder", "workflows")
# Test workflow data that should be compatible with runner
test_workflow = {
"workflow_id": "test_integration_workflow",
"name": "Test Integration Workflow",
"description": "A test workflow for integration testing",
"tasks": [
{
"task_id": "health_check",
"name": "Health Check",
"description": "Check system health",
"instructions": "Perform health check using get_splunk_health tool",
"required_tools": ["get_splunk_health"],
"dependencies": [],
"context_requirements": [],
}
],
}
# Process the workflow with the builder
result = await workflow_builder.execute(
ctx=mock_context, mode="process", workflow_data=test_workflow
)
        # The process mode returns validation results and metadata directly in the result
assert result["status"] == "success"
assert result["integration_ready"] is True
assert result["validation"]["valid"] is True
assert result["processing_metadata"]["compatible_with_runner"] is True
@pytest.mark.asyncio
async def test_workflow_builder_validation_errors(self, mock_context):
"""Test that workflow builder properly validates and reports errors."""
from src.tools.workflows.workflow_builder import WorkflowBuilderTool
# Create workflow builder
workflow_builder = WorkflowBuilderTool("workflow_builder", "workflows")
# Test invalid workflow data
invalid_workflow = {
"workflow_id": "INVALID-ID", # Invalid format
"name": "Test Workflow",
# Missing description and tasks
}
# Process the workflow with the builder
result = await workflow_builder.execute(
ctx=mock_context, mode="process", workflow_data=invalid_workflow
)
assert result["status"] == "success" # Tool executes successfully
assert result["integration_ready"] is False
assert result["validation"]["valid"] is False
assert len(result["validation"]["errors"]) > 0
assert result["processing_metadata"]["compatible_with_runner"] is False
@pytest.mark.asyncio
async def test_workflow_runner_progress_reporting(self, workflow_runner_tool, mock_context):
"""Test progress reporting in workflow runner."""
result = await workflow_runner_tool.execute(
ctx=mock_context,
workflow_id="missing_data_troubleshooting",
problem_description="Test progress",
earliest_time="-1h",
latest_time="now",
complexity_level="basic",
enable_summarization=False,
)
assert result["status"] == "completed" # Adjusted based on actual status
mock_context.report_progress.assert_called() # Verify called at least once
# Verify progress reached completion if any calls were made
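        # Unlike test_progress_reporting, this reads positional args (call.args) in
        # case report_progress was invoked positionally rather than with keywords.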
calls = [call.args for call in mock_context.report_progress.call_args_list]
if calls:
progress_values = [c[0] for c in calls if c]
if progress_values:
assert progress_values[-1] == 100
@pytest.mark.asyncio
async def test_workflow_runner_error_handling(self, workflow_runner_tool, mock_context):
"""Test error handling in workflow runner."""
        # With the mocked manager any workflow_id resolves to the mock workflow,
        # so an unknown id may still complete; strict not-found handling is
        # covered by test_execute_workflow_not_found above.
        result = await workflow_runner_tool.execute(mock_context, "invalid_id")
        assert result["status"] in ["completed", "error"]
        # Execution failures are exercised in test_workflow_execution_error_handling above.