Skip to main content
Glama

Keboola Explorer MCP Server

test_utils.py19.9 kB
from typing import Any import pytest from keboola_mcp_server.clients.client import CONDITIONAL_FLOW_COMPONENT_ID, ORCHESTRATOR_COMPONENT_ID from keboola_mcp_server.tools.flow.utils import ( _check_legacy_circular_dependencies, _reachable_ids, ensure_legacy_phase_ids, ensure_legacy_task_ids, get_flow_configuration, validate_flow_structure, ) def _notification_task(task_id: str, phase_id: str) -> dict[str, Any]: """Create a minimal notification task for conditional flow tests.""" return { 'id': task_id, 'name': f'Task {task_id}', 'phase': phase_id, 'task': { 'type': 'notification', 'title': 'Notify', 'message': 'Done', 'recipients': [{'channel': 'email', 'address': 'ops@example.com'}], }, } # --- Test Helper Functions --- class TestFlowHelpers: """Test helper functions for flow processing.""" def test_ensure_phase_ids_with_missing_ids(self): """Test phase ID generation when IDs are missing.""" phases = [{'name': 'Phase 1'}, {'name': 'Phase 2', 'dependsOn': [1]}, {'id': 5, 'name': 'Phase 5'}] processed_phases = ensure_legacy_phase_ids(phases) assert len(processed_phases) == 3 assert processed_phases[0].id == 1 assert processed_phases[0].name == 'Phase 1' assert processed_phases[1].id == 2 assert processed_phases[1].name == 'Phase 2' assert processed_phases[2].id == 5 assert processed_phases[2].name == 'Phase 5' def test_ensure_phase_ids_with_existing_ids(self): """Test phase processing when IDs already exist.""" phases = [ {'id': 10, 'name': 'Custom Phase 1'}, {'id': 'string-id', 'name': 'Custom Phase 2', 'dependsOn': [10]}, ] processed_phases = ensure_legacy_phase_ids(phases) assert len(processed_phases) == 2 assert processed_phases[0].id == 10 assert processed_phases[1].id == 'string-id' assert processed_phases[1].depends_on == [10] def test_ensure_task_ids_with_missing_ids(self): """Test task ID generation using 20001+ pattern.""" tasks = [ {'name': 'Task 1', 'phase': 1, 'task': {'componentId': 'comp1'}}, {'name': 'Task 2', 'phase': 2, 'task': {'componentId': 'comp2'}}, {'id': 30000, 'name': 'Task 3', 'phase': 3, 'task': {'componentId': 'comp3'}}, ] processed_tasks = ensure_legacy_task_ids(tasks) assert len(processed_tasks) == 3 assert processed_tasks[0].id == 20001 assert processed_tasks[1].id == 20002 assert processed_tasks[2].id == 30000 def test_ensure_task_ids_adds_default_mode(self): """Test that default mode 'run' is added to tasks.""" tasks = [ {'name': 'Task 1', 'phase': 1, 'task': {'componentId': 'comp1'}}, {'name': 'Task 2', 'phase': 1, 'task': {'componentId': 'comp2', 'mode': 'debug'}}, ] processed_tasks = ensure_legacy_task_ids(tasks) assert processed_tasks[0].task['mode'] == 'run' # Default added assert processed_tasks[1].task['mode'] == 'debug' # Existing preserved def test_ensure_task_ids_validates_required_fields(self): """Test validation of required task fields.""" with pytest.raises(ValueError, match="missing 'task' configuration"): ensure_legacy_task_ids([{'name': 'Bad Task', 'phase': 1}]) with pytest.raises(ValueError, match='missing componentId'): ensure_legacy_task_ids([{'name': 'Bad Task', 'phase': 1, 'task': {}}]) def test_validate_flow_structure_success(self, sample_phases, sample_tasks): """Test successful flow structure validation.""" flow_configuration = get_flow_configuration(sample_phases, sample_tasks, ORCHESTRATOR_COMPONENT_ID) validate_flow_structure(flow_configuration, flow_type=ORCHESTRATOR_COMPONENT_ID) def test_validate_flow_structure_invalid_phase_dependency(self): """Test validation failure for invalid phase dependencies.""" flow_configuration = get_flow_configuration( phases=[{'id': 1, 'name': 'Phase 1', 'dependsOn': [999]}], tasks=[], flow_type=ORCHESTRATOR_COMPONENT_ID ) with pytest.raises(ValueError, match='depends on non-existent phase 999'): validate_flow_structure(flow_configuration, flow_type=ORCHESTRATOR_COMPONENT_ID) def test_validate_flow_structure_invalid_task_phase(self): """Test validation failure for task referencing non-existent phase.""" flow_configuration = get_flow_configuration( phases=[{'id': 1, 'name': 'Phase 1'}], tasks=[{'name': 'Bad Task', 'phase': 999, 'task': {'componentId': 'comp1'}}], flow_type=ORCHESTRATOR_COMPONENT_ID, ) with pytest.raises(ValueError, match='references non-existent phase 999'): validate_flow_structure(flow_configuration, flow_type=ORCHESTRATOR_COMPONENT_ID) # --- Test Circular Dependency Detection --- class TestCircularDependencies: """Test circular dependency detection.""" def test_no_circular_dependencies(self): """Test flow with no circular dependencies.""" phases = ensure_legacy_phase_ids( [ {'id': 1, 'name': 'Phase 1'}, {'id': 2, 'name': 'Phase 2', 'dependsOn': [1]}, {'id': 3, 'name': 'Phase 3', 'dependsOn': [2]}, ] ) _check_legacy_circular_dependencies(phases) def test_direct_circular_dependency(self): """Test detection of direct circular dependency.""" phases = ensure_legacy_phase_ids( [{'id': 1, 'name': 'Phase 1', 'dependsOn': [2]}, {'id': 2, 'name': 'Phase 2', 'dependsOn': [1]}] ) with pytest.raises(ValueError, match='Circular dependency detected'): _check_legacy_circular_dependencies(phases) def test_indirect_circular_dependency(self): """Test detection of indirect circular dependency.""" phases = ensure_legacy_phase_ids( [ {'id': 1, 'name': 'Phase 1', 'dependsOn': [3]}, {'id': 2, 'name': 'Phase 2', 'dependsOn': [1]}, {'id': 3, 'name': 'Phase 3', 'dependsOn': [2]}, ] ) with pytest.raises(ValueError, match='Circular dependency detected'): _check_legacy_circular_dependencies(phases) def test_self_referencing_dependency(self): """Test detection of self-referencing dependency.""" phases = ensure_legacy_phase_ids([{'id': 1, 'name': 'Phase 1', 'dependsOn': [1]}]) with pytest.raises(ValueError, match='Circular dependency detected'): _check_legacy_circular_dependencies(phases) def test_complex_valid_dependencies(self): """Test complex but valid dependency structure.""" phases = ensure_legacy_phase_ids( [ {'id': 1, 'name': 'Phase 1'}, {'id': 2, 'name': 'Phase 2'}, {'id': 3, 'name': 'Phase 3', 'dependsOn': [1, 2]}, {'id': 4, 'name': 'Phase 4', 'dependsOn': [3]}, {'id': 5, 'name': 'Phase 5', 'dependsOn': [1]}, ] ) _check_legacy_circular_dependencies(phases) # --- Test Edge Cases --- class TestFlowEdgeCases: """Test edge cases and error conditions.""" def test_phase_validation_with_missing_name(self): """Test phase validation when required name field is missing.""" invalid_phases = [{'name': 'Valid Phase'}, {}] processed_phases = ensure_legacy_phase_ids(invalid_phases) assert len(processed_phases) == 2 assert processed_phases[1].name == 'Phase 2' def test_task_validation_with_missing_name(self): """Test task validation when required name field is missing.""" invalid_tasks = [{}] with pytest.raises(ValueError, match="missing 'task' configuration"): ensure_legacy_task_ids(invalid_tasks) def test_empty_flow_validation(self): """Test validation of completely empty flow.""" flow_configuration = get_flow_configuration([], [], ORCHESTRATOR_COMPONENT_ID) validate_flow_structure(flow_configuration, flow_type=ORCHESTRATOR_COMPONENT_ID) class TestFlowConfigurationBuilder: """Test flow configuration builder helper.""" def test_get_flow_configuration_legacy_generates_ids_and_aliases(self): """Legacy builder should sanitize IDs and serialize aliases.""" flow_configuration = get_flow_configuration( phases=[{'name': 'Generated Phase', 'depends_on': []}], tasks=[{'name': 'Legacy Task', 'phase': 1, 'task': {'componentId': 'keboola.component'}}], flow_type=ORCHESTRATOR_COMPONENT_ID, ) phase = flow_configuration['phases'][0] task = flow_configuration['tasks'][0] assert phase['id'] == 1 assert 'dependsOn' in phase assert 'depends_on' not in phase assert task['id'] == 20001 assert task['task']['mode'] == 'run' assert 'continueOnFailure' in task def test_get_flow_configuration_conditional_excludes_unset_fields(self): """Conditional builder should drop unset optional fields but goto=null remains.""" flow_configuration = get_flow_configuration( phases=[ { 'id': 'phase1', 'name': 'Start', 'next': [{'id': 'transition1', 'goto': None}], } ], tasks=[_notification_task('task1', 'phase1')], flow_type=CONDITIONAL_FLOW_COMPONENT_ID, ) phase = flow_configuration['phases'][0] task = flow_configuration['tasks'][0] assert 'description' not in phase assert phase['next'][0]['id'] == 'transition1' assert phase['next'][0]['goto'] is None assert 'enabled' not in task class TestConditionalFlowValidation: """Test validation logic for conditional flows.""" def test_validate_conditional_flow_success(self): phases = [ {'id': 'phase1', 'name': 'Start', 'next': [{'id': 't1', 'goto': 'phase2'}]}, {'id': 'phase2', 'name': 'End', 'next': [{'id': 't2', 'goto': None}]}, ] tasks = [_notification_task('task1', 'phase1'), _notification_task('task2', 'phase2')] validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_duplicate_phase_ids(self): phases = [ {'id': 'phase1', 'name': 'Start', 'next': [{'id': 't1', 'goto': 'phase2'}]}, {'id': 'phase1', 'name': 'Duplicate', 'next': [{'id': 't2', 'goto': None}]}, ] tasks = [_notification_task('task1', 'phase1'), _notification_task('task2', 'phase1')] with pytest.raises(ValueError, match='duplicate phase IDs'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_duplicate_task_ids(self): phases = [ {'id': 'phase1', 'name': 'Start', 'next': [{'id': 't1', 'goto': None}]}, ] tasks = [_notification_task('task1', 'phase1'), _notification_task('task1', 'phase1')] with pytest.raises(ValueError, match='duplicate task IDs'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_task_references_missing_phase(self): phases = [ {'id': 'phase1', 'name': 'Start', 'next': [{'id': 't1', 'goto': None}]}, ] tasks = [_notification_task('task1', 'missing-phase')] with pytest.raises(ValueError, match='references non-existent phase'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_transition_references_missing_phase(self): phases = [ {'id': 'phase1', 'name': 'Start', 'next': [{'id': 't1', 'goto': 'ghost-phase'}]}, ] tasks = [_notification_task('task1', 'phase1')] with pytest.raises(ValueError, match='references non-existent phase'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_requires_ending_phase(self): phases = [ {'id': 'phase0', 'name': 'Start', 'next': [{'id': 't0', 'goto': 'phase1'}]}, {'id': 'phase1', 'name': 'Loop', 'next': [{'id': 't1', 'goto': 'phase2'}]}, {'id': 'phase2', 'name': 'Loop Again', 'next': [{'id': 't2', 'goto': 'phase1'}]}, ] tasks = [_notification_task('task1', 'phase1'), _notification_task('task2', 'phase2')] with pytest.raises(ValueError, match='has no ending phases'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_requires_entry_phase(self): phases = [ {'id': 'phase1', 'name': 'One', 'next': [{'id': 't1', 'goto': 'phase2'}]}, {'id': 'phase2', 'name': 'Two', 'next': [{'id': 't2', 'goto': 'phase1'}, {'id': 't3', 'goto': None}]}, ] tasks = [_notification_task('task1', 'phase1'), _notification_task('task2', 'phase2')] with pytest.raises(ValueError, match='has no entry phase'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_single_entry_required(self): phases = [ {'id': 'phase1', 'name': 'Entry A', 'next': [{'id': 't1', 'goto': None}]}, {'id': 'phase2', 'name': 'Entry B', 'next': [{'id': 't2', 'goto': None}]}, ] tasks = [_notification_task('task1', 'phase1'), _notification_task('task2', 'phase2')] with pytest.raises(ValueError, match='multiple entry phases'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) def test_validate_conditional_flow_all_phases_reachable(self): phases = [ {'id': 'phase1', 'name': 'Start', 'next': [{'id': 't1', 'goto': 'phase2'}]}, {'id': 'phase2', 'name': 'End', 'next': [{'id': 't2', 'goto': None}]}, {'id': 'phase3', 'name': 'Isolated', 'next': [{'id': 't3', 'goto': 'phase4'}]}, {'id': 'phase4', 'name': 'Isolated', 'next': [{'id': 't4', 'goto': 'phase3'}]}, ] tasks = [ _notification_task('task1', 'phase1'), _notification_task('task2', 'phase2'), _notification_task('task3', 'phase3'), ] with pytest.raises(ValueError, match='not reachable'): validate_flow_structure({'phases': phases, 'tasks': tasks}, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) class TestReachableIds: """Test _reachable_ids function for finding reachable phases in a graph.""" def test_empty_graph_single_node(self): """Test with a single node and no edges.""" edges: dict[str, set[str]] = {} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A'} assert visited == {'A'} def test_single_node_no_outgoing_edges(self): """Test with a node that has no outgoing edges.""" edges: dict[str, set[str]] = {'A': set()} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A'} assert visited == {'A'} def test_start_node_not_in_edges(self): """Test when start node is not in edges dictionary.""" edges: dict[str, set[str]] = {'B': {'C'}, 'C': set()} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A'} assert visited == {'A'} def test_single_node_self_loop(self): """Test with a node that has a self-loop.""" edges: dict[str, set[str]] = {'A': {'A'}} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A'} assert visited == {'A'} def test_linear_chain(self): """Test a simple linear chain: A -> B -> C.""" edges: dict[str, set[str]] = {'A': {'B'}, 'B': {'C'}, 'C': set()} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A', 'B', 'C'} assert visited == {'A', 'B', 'C'} def test_branching_structure(self): """Test branching: A -> B, A -> C.""" edges: dict[str, set[str]] = {'A': {'B', 'C'}, 'B': set(), 'C': set()} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A', 'B', 'C'} assert visited == {'A', 'B', 'C'} def test_cycle_handling(self): """Test that cycles are handled correctly: A -> B -> C -> A.""" edges: dict[str, set[str]] = {'A': {'B'}, 'B': {'C'}, 'C': {'A'}} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A', 'B', 'C'} assert visited == {'A', 'B', 'C'} def test_disconnected_graph(self): """Test with disconnected components - only reachable nodes are returned.""" edges: dict[str, set[str]] = {'A': {'B'}, 'B': set(), 'C': {'D'}, 'D': set()} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A', 'B'} assert 'C' not in result assert 'D' not in result def test_already_visited_nodes(self): """Test that already visited nodes are not revisited.""" edges: dict[str, set[str]] = {'A': {'B'}, 'B': {'A', 'C'}, 'C': set()} visited: set[str] = {'B'} result = _reachable_ids('A', edges, visited) assert result == {'A', 'B'} assert visited == {'A', 'B'} assert 'C' not in result def test_complex_graph_with_branches_and_merges(self): """Test a complex graph with multiple branches and merges.""" edges: dict[str, set[str]] = { 'A': {'B', 'C'}, 'B': {'D'}, 'C': {'D', 'E'}, 'D': {'F'}, 'E': {'F'}, 'F': set(), } visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A', 'B', 'C', 'D', 'E', 'F'} assert visited == {'A', 'B', 'C', 'D', 'E', 'F'} def test_node_with_multiple_outgoing_edges(self): """Test node with multiple outgoing edges to different targets.""" edges: dict[str, set[str]] = {'A': {'B', 'C', 'D'}, 'B': set(), 'C': set(), 'D': set()} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A', 'B', 'C', 'D'} assert visited == {'A', 'B', 'C', 'D'} def test_nested_cycles(self): """Test graph with nested cycles.""" edges: dict[str, set[str]] = {'A': {'B'}, 'B': {'C'}, 'C': {'B', 'D'}, 'D': {'A'}} visited: set[str] = set() result = _reachable_ids('A', edges, visited) assert result == {'A', 'B', 'C', 'D'} assert visited == {'A', 'B', 'C', 'D'} def test_partial_visited_set(self): """Test with some nodes already in visited set.""" edges: dict[str, set[str]] = {'A': {'B', 'C'}, 'B': {'D'}, 'C': {'D'}, 'D': set()} visited: set[str] = {'C', 'D'} result = _reachable_ids('A', edges, visited) assert result == {'A', 'B', 'C', 'D'} assert visited == {'A', 'B', 'C', 'D'}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server