
MCP Server for Splunk

License: Apache 2.0
  • Apple
  • Linux
test_workflow_runner.py • 23.1 kB
""" Tests for WorkflowRunnerTool. Tests the workflow runner tool's ability to execute workflows with proper parameter handling and integration with the parallel execution infrastructure. """ import os from unittest.mock import AsyncMock, MagicMock, patch import pytest from fastmcp import Context from src.tools.workflows.shared.context import DiagnosticResult # Test the workflow runner tool from src.tools.workflows.workflow_runner import WorkflowRunnerTool from src.tools.workflows.get_executed_workflows import GetExecutedWorkflowsTool class TestWorkflowRunnerTool: """Test suite for WorkflowRunnerTool.""" @pytest.fixture def mock_context(self): """Create a mock FastMCP context.""" context = AsyncMock(spec=Context) context.report_progress = AsyncMock() context.info = AsyncMock() context.error = AsyncMock() return context @pytest.fixture def mock_config(self): """Create mock configuration.""" with patch.dict( os.environ, { "OPENAI_API_KEY": "test-key", "OPENAI_MODEL": "gpt-4o", "OPENAI_TEMPERATURE": "0.7", "OPENAI_MAX_TOKENS": "4000", }, ): yield @pytest.fixture def mock_openai_agents(self): """Mock OpenAI agents SDK.""" with patch("src.tools.workflows.workflow_runner.OPENAI_AGENTS_AVAILABLE", True), patch( "src.tools.workflows.workflow_runner.Agent" ), patch("src.tools.workflows.workflow_runner.Runner"), patch( "src.tools.workflows.workflow_runner.trace" ), patch( "src.tools.workflows.workflow_runner.custom_span" ): yield @pytest.fixture def mock_workflow_infrastructure(self): """Mock the workflow execution infrastructure.""" with patch( "src.tools.workflows.workflow_runner.SplunkToolRegistry" ) as mock_registry, patch( "src.tools.workflows.workflow_runner.WorkflowManager" ) as mock_manager, patch( "src.tools.workflows.workflow_runner.ParallelWorkflowExecutor" ) as mock_executor, patch( "src.tools.workflows.workflow_runner.create_summarization_tool" ) as mock_summarization, patch( "src.tools.workflows.shared.tools.create_splunk_tools" ): # Mock workflow definition mock_workflow = MagicMock() mock_workflow.workflow_id = "test_workflow" mock_workflow.name = "Test Workflow" mock_workflow.description = "A test workflow" mock_workflow.tasks = [] # Mock workflow manager mock_manager_instance = mock_manager.return_value mock_manager_instance.get_workflow.return_value = mock_workflow mock_manager_instance.list_workflows.return_value = [mock_workflow] # Mock workflow result with one task having the new fields mock_result = MagicMock() mock_result.status = "completed" mock_result.workflow_id = "test_workflow" # Build a realistic DiagnosticResult so the runner flattens fields correctly task_result = DiagnosticResult( step="step_1", status="healthy", findings=["All good"], recommendations=["Proceed"], details={"k": "v"}, severity="healthy", success_score=0.95, trace_url="https://platform.openai.com/logs?api=traces", trace_name="agent_execution_step_1_123", trace_timestamp=1234567890000, correlation_id="corr-123", ) mock_result.task_results = {"step_1": task_result} mock_result.summary = { "execution_phases": 1, "parallel_efficiency": 0.8, } # Mock parallel executor mock_executor_instance = mock_executor.return_value mock_executor_instance.execute_workflow = AsyncMock(return_value=mock_result) # Mock summarization tool mock_summarization_instance = mock_summarization.return_value mock_summarization_instance.execute = AsyncMock( return_value={"status": "completed", "executive_summary": "Test summary"} ) yield { "registry": mock_registry, "manager": mock_manager, "executor": mock_executor, "summarization": 
mock_summarization, "workflow": mock_workflow, "result": mock_result, } @pytest.fixture def workflow_runner_tool(self, mock_config, mock_openai_agents, mock_workflow_infrastructure): """Create a WorkflowRunnerTool instance with mocked dependencies.""" tool = WorkflowRunnerTool("workflow_runner", "workflows") return tool @pytest.mark.asyncio async def test_workflow_runner_initialization( self, mock_config, mock_openai_agents, mock_workflow_infrastructure ): """Test that WorkflowRunnerTool initializes correctly.""" tool = WorkflowRunnerTool("workflow_runner", "workflows") assert tool.category == "workflows" assert hasattr(tool, "config") assert hasattr(tool, "workflow_manager") assert hasattr(tool, "parallel_executor") assert hasattr(tool, "summarization_tool") @pytest.mark.asyncio async def test_execute_workflow_success( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test successful workflow execution.""" result = await workflow_runner_tool.execute( ctx=mock_context, workflow_id="test_workflow", problem_description="Test problem", earliest_time="-1h", latest_time="now", complexity_level="moderate", enable_summarization=True, ) # Verify result structure assert result["status"] == "completed" assert result["tool_type"] == "workflow_runner" assert result["workflow_id"] == "test_workflow" assert result["workflow_name"] == "Test Workflow" assert "execution_metadata" in result assert "workflow_execution" in result assert "summarization" in result # Verify new per-step fields are present and logs_url is not assert "task_results" in result assert "step_1" in result["task_results"] step = result["task_results"]["step_1"] assert step["status"] == "healthy" assert step["severity"] == "healthy" assert isinstance(step["success_score"], float) assert step["success"] is True assert isinstance(step["trace_url"], str) and step["trace_url"] assert "logs_url" not in step # removed per new design assert "trace_name" in step and "trace_timestamp" in step and "correlation_id" in step # Verify summarization was enabled assert result["summarization"]["enabled"] is True # Verify progress reporting was called mock_context.report_progress.assert_called() mock_context.info.assert_called() @pytest.mark.asyncio async def test_execute_workflow_not_found( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test workflow execution with non-existent workflow ID.""" # Mock workflow manager to return None for non-existent workflow mock_workflow_infrastructure["manager"].return_value.get_workflow.return_value = None result = await workflow_runner_tool.execute( ctx=mock_context, workflow_id="non_existent_workflow", complexity_level="moderate" ) # Verify error result assert result["status"] == "error" assert result["error_type"] == "workflow_not_found" assert "non_existent_workflow" in result["error"] assert "available_workflows" in result # Verify error was reported mock_context.error.assert_called() @pytest.mark.asyncio async def test_execute_workflow_without_summarization( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test workflow execution with summarization disabled.""" result = await workflow_runner_tool.execute( ctx=mock_context, workflow_id="test_workflow", enable_summarization=False ) # Verify summarization was disabled assert result["summarization"]["enabled"] is False assert result["summarization"]["reason"] == "Summarization disabled by user" # Verify summarization tool was not called 
mock_workflow_infrastructure["summarization"].return_value.execute.assert_not_called() @pytest.mark.asyncio async def test_parameter_validation(self, workflow_runner_tool, mock_context): """Test parameter validation.""" # Test empty workflow_id with pytest.raises(ValueError, match="workflow_id is required"): await workflow_runner_tool.execute(ctx=mock_context, workflow_id="") # Test invalid complexity_level with pytest.raises(ValueError, match="complexity_level must be"): await workflow_runner_tool.execute( ctx=mock_context, workflow_id="test_workflow", complexity_level="invalid" ) @pytest.mark.asyncio async def test_parameter_normalization( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test that empty string parameters are normalized to None.""" result = await workflow_runner_tool.execute( ctx=mock_context, workflow_id="test_workflow", problem_description="", # Empty string should become None focus_index=" ", # Whitespace should become None focus_host="", # Empty string should become None focus_sourcetype=" ", # Whitespace should become None ) # Verify parameters were normalized in diagnostic context diagnostic_context = result["diagnostic_context"] assert diagnostic_context["focus_index"] is None assert diagnostic_context["focus_host"] is None assert diagnostic_context["focus_sourcetype"] is None @pytest.mark.asyncio async def test_execution_metadata( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test that execution metadata is properly captured.""" result = await workflow_runner_tool.execute( ctx=mock_context, workflow_id="test_workflow", enable_summarization=True ) metadata = result["execution_metadata"] # Verify metadata structure assert "total_execution_time" in metadata assert "workflow_execution_time" in metadata assert "summarization_execution_time" in metadata assert metadata["parallel_execution"] is True assert metadata["summarization_enabled"] is True assert "tracing_enabled" in metadata @pytest.mark.asyncio async def test_store_and_retrieve_executed_workflows( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Workflow completion should be stored and retrievable by session.""" # Execute a workflow to trigger storage await workflow_runner_tool.execute( ctx=mock_context, workflow_id="test_workflow", enable_summarization=False ) # Retrieve via tool (list for session) getter = GetExecutedWorkflowsTool("get_executed_workflows", "workflows") list_result = await getter.execute(ctx=mock_context) assert list_result["status"] == "ok" assert list_result["count"] >= 1 first = list_result["executed_workflows"][0] assert first["workflow_id"] == "test_workflow" assert "executed_workflow_id" in first assert "executed_at" in first assert "status" in first # Retrieve specific id by_id = await getter.execute(ctx=mock_context, id=first["executed_workflow_id"]) assert by_id["status"] == "ok" rec = by_id["executed_workflow"] assert rec["workflow_id"] == "test_workflow" assert rec["result"]["workflow_id"] == "test_workflow" @pytest.mark.asyncio async def test_workflow_execution_error_handling( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test error handling during workflow execution.""" # Mock parallel executor to raise an exception mock_workflow_infrastructure[ "executor" ].return_value.execute_workflow.side_effect = Exception("Test error") result = await workflow_runner_tool.execute(ctx=mock_context, workflow_id="test_workflow") # Verify error result assert result["status"] == 
"error" assert result["error_type"] == "execution_error" assert "Test error" in result["error"] # Verify error was reported mock_context.error.assert_called() @pytest.mark.asyncio async def test_summarization_error_handling( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test error handling during summarization.""" # Mock summarization tool to raise an exception mock_workflow_infrastructure["summarization"].return_value.execute.side_effect = Exception( "Summarization error" ) result = await workflow_runner_tool.execute( ctx=mock_context, workflow_id="test_workflow", enable_summarization=True ) # Verify workflow still completes but summarization shows error assert result["status"] == "completed" # Workflow itself succeeded assert result["summarization"]["enabled"] is True assert result["summarization"]["result"]["status"] == "error" assert "Summarization error" in result["summarization"]["result"]["error"] def test_tool_metadata(self, workflow_runner_tool): """Test that tool metadata is properly defined.""" metadata = workflow_runner_tool.METADATA assert metadata.name == "workflow_runner" assert metadata.category == "workflows" assert "Execute any available workflow by ID" in metadata.description assert "workflow_id (required)" in metadata.description assert "parallel execution" in metadata.description.lower() @pytest.mark.asyncio async def test_dynamic_tool_resolution_aliases(self): """Test that dynamic agent tool creation resolves aliases like 'me'.""" # Patch OpenAI agents availability and Agent class used in parallel executor with patch( "src.tools.workflows.shared.parallel_executor.OPENAI_AGENTS_AVAILABLE", True ), patch("src.tools.workflows.shared.parallel_executor.Agent") as mock_agent: from src.tools.workflows.shared.parallel_executor import ParallelWorkflowExecutor from src.tools.workflows.shared.workflow_manager import TaskDefinition from src.tools.workflows.shared.tools import SplunkToolRegistry # Prepare a mock tool registry with dynamic factory mock_registry = MagicMock(spec=SplunkToolRegistry) # Return distinct sentinels per tool to verify both are attempted def create_tool_side_effect(name): return f"dynamic_tool:{name}" mock_registry.create_agent_tool.side_effect = create_tool_side_effect # Minimal config mock mock_config = MagicMock() mock_config.model = "gpt-4o" mock_config.temperature = 0.7 # Instantiate executor executor = ParallelWorkflowExecutor(config=mock_config, tool_registry=mock_registry) # Define a task using alias 'me' and a canonical name task = TaskDefinition( task_id="t1", name="Test Task", description="Desc", instructions="Do something", required_tools=["me", "get_splunk_health"], ) # Invoke agent creation (internal helper) agent = executor._create_agent_from_task(task) # Ensure Agent was constructed assert mock_agent.called # Verify dynamic factory was called for both names assert mock_registry.create_agent_tool.call_count == 2 mock_registry.create_agent_tool.assert_any_call("me") mock_registry.create_agent_tool.assert_any_call("get_splunk_health") @pytest.mark.asyncio async def test_diagnostic_context_creation( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test that diagnostic context is properly created with all parameters.""" test_params = { "workflow_id": "test_workflow", "problem_description": "Test problem description", "earliest_time": "-2h", "latest_time": "+1h", "focus_index": "test_index", "focus_host": "test_host", "focus_sourcetype": "test_sourcetype", "complexity_level": "advanced", } 
result = await workflow_runner_tool.execute(ctx=mock_context, **test_params) # Verify diagnostic context was created with correct parameters diagnostic_context = result["diagnostic_context"] assert diagnostic_context["earliest_time"] == "-2h" assert diagnostic_context["latest_time"] == "+1h" assert diagnostic_context["focus_index"] == "test_index" assert diagnostic_context["focus_host"] == "test_host" assert diagnostic_context["focus_sourcetype"] == "test_sourcetype" assert diagnostic_context["complexity_level"] == "advanced" @pytest.mark.asyncio async def test_progress_reporting( self, workflow_runner_tool, mock_context, mock_workflow_infrastructure ): """Test that progress is properly reported during execution.""" await workflow_runner_tool.execute(ctx=mock_context, workflow_id="test_workflow") # Verify progress reporting was called multiple times progress_calls = mock_context.report_progress.call_args_list assert len(progress_calls) >= 4 # Should have multiple progress updates # Verify progress values are increasing progress_values = [call[1]["progress"] for call in progress_calls] assert progress_values[0] == 0 # Starts at 0 assert progress_values[-1] == 100 # Ends at 100 assert all( progress_values[i] <= progress_values[i + 1] for i in range(len(progress_values) - 1) ) # Increasing @pytest.mark.asyncio async def test_workflow_builder_integration(self, mock_context): """Test that workflow builder can process workflows for the runner.""" from src.tools.workflows.workflow_builder import WorkflowBuilderTool # Create workflow builder workflow_builder = WorkflowBuilderTool("workflow_builder", "workflows") # Test workflow data that should be compatible with runner test_workflow = { "workflow_id": "test_integration_workflow", "name": "Test Integration Workflow", "description": "A test workflow for integration testing", "tasks": [ { "task_id": "health_check", "name": "Health Check", "description": "Check system health", "instructions": "Perform health check using get_splunk_health tool", "required_tools": ["get_splunk_health"], "dependencies": [], "context_requirements": [], } ], } # Process the workflow with the builder result = await workflow_builder.execute( ctx=mock_context, mode="process", workflow_data=test_workflow ) # Check the actual result structure - the process mode returns data directly assert result["status"] == "success" assert result["integration_ready"] is True assert result["validation"]["valid"] is True assert result["processing_metadata"]["compatible_with_runner"] is True @pytest.mark.asyncio async def test_workflow_builder_validation_errors(self, mock_context): """Test that workflow builder properly validates and reports errors.""" from src.tools.workflows.workflow_builder import WorkflowBuilderTool # Create workflow builder workflow_builder = WorkflowBuilderTool("workflow_builder", "workflows") # Test invalid workflow data invalid_workflow = { "workflow_id": "INVALID-ID", # Invalid format "name": "Test Workflow", # Missing description and tasks } # Process the workflow with the builder result = await workflow_builder.execute( ctx=mock_context, mode="process", workflow_data=invalid_workflow ) assert result["status"] == "success" # Tool executes successfully assert result["integration_ready"] is False assert result["validation"]["valid"] is False assert len(result["validation"]["errors"]) > 0 assert result["processing_metadata"]["compatible_with_runner"] is False @pytest.mark.asyncio async def test_workflow_runner_progress_reporting(self, workflow_runner_tool, 
mock_context): """Test progress reporting in workflow runner.""" result = await workflow_runner_tool.execute( ctx=mock_context, workflow_id="missing_data_troubleshooting", problem_description="Test progress", earliest_time="-1h", latest_time="now", complexity_level="basic", enable_summarization=False, ) assert result["status"] == "completed" # Adjusted based on actual status mock_context.report_progress.assert_called() # Verify called at least once # Verify progress reached completion if any calls were made calls = [call.args for call in mock_context.report_progress.call_args_list] if calls: progress_values = [c[0] for c in calls if c] if progress_values: assert progress_values[-1] == 100 @pytest.mark.asyncio async def test_workflow_runner_error_handling(self, workflow_runner_tool, mock_context): """Test error handling in workflow runner.""" # Simulate workflow not found (behavior may return completed with summary) result = await workflow_runner_tool.execute(mock_context, "invalid_id") assert result["status"] in ["completed", "error"] # Simulate execution failure (would need mocking internal calls)
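
The suite above uses async test functions, so it expects pytest with asyncio support. A minimal sketch for invoking it programmatically, assuming pytest and pytest-asyncio are installed and that the file is saved as test_workflow_runner.py in the working directory (the path is an assumption, adjust to your checkout):

# Minimal runner sketch; the file path below is an assumption.
import sys

import pytest

if __name__ == "__main__":
    # "-v" prints one line per test; pytest.main returns an exit code suitable for sys.exit.
    sys.exit(pytest.main(["-v", "test_workflow_runner.py"]))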

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk'
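
The same lookup from Python, as a minimal sketch assuming the requests package is installed and that the endpoint returns JSON:

# Fetch this server's metadata from the Glama MCP directory API.
import requests

response = requests.get(
    "https://glama.ai/api/mcp/v1/servers/deslicer/mcp-for-splunk",
    timeout=10,
)
response.raise_for_status()
print(response.json())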

If you have feedback or need assistance with the MCP directory API, please join our Discord server.