Skip to main content
Glama

Keboola Explorer MCP Server

test_tools.py37.4 kB
"""Unit tests for Flow management tools.""" from typing import Any, Dict, List import httpx import pytest from mcp.server.fastmcp import Context from pytest_mock import MockerFixture from keboola_mcp_server.clients.client import CONDITIONAL_FLOW_COMPONENT_ID, ORCHESTRATOR_COMPONENT_ID, KeboolaClient from keboola_mcp_server.tools.flow.model import ( ConditionalFlowPhase, ConditionalFlowTask, Flow, FlowPhase, FlowSummary, FlowTask, ListFlowsOutput, ) from keboola_mcp_server.tools.flow.tools import ( FlowToolOutput, create_conditional_flow, create_flow, get_flow, get_flow_examples, get_flow_schema, list_flows, update_flow, ) # ============================================================================= # FLOW DATA FIXTURES # ============================================================================= @pytest.fixture def legacy_flow_phases() -> List[Dict[str, Any]]: """Sample legacy flow phases.""" return [ {'id': 1, 'name': 'Data Extraction', 'description': 'Extract data from various sources', 'dependsOn': []}, {'id': 2, 'name': 'Data Transformation', 'description': 'Transform and process data', 'dependsOn': [1]}, {'id': 3, 'name': 'Data Loading', 'description': 'Load data to destination', 'dependsOn': [2]}, ] @pytest.fixture def legacy_flow_tasks() -> List[Dict[str, Any]]: """Sample legacy flow tasks.""" return [ { 'id': 20001, 'name': 'Extract from S3', 'phase': 1, 'enabled': True, 'continueOnFailure': False, 'task': {'componentId': 'keboola.ex-aws-s3', 'configId': '123456', 'mode': 'run'}, }, { 'id': 20002, 'name': 'Transform Data', 'phase': 2, 'enabled': True, 'continueOnFailure': False, 'task': {'componentId': 'keboola.snowflake-transformation', 'configId': '789012', 'mode': 'run'}, }, { 'id': 20003, 'name': 'Load to Warehouse', 'phase': 3, 'enabled': True, 'continueOnFailure': False, 'task': {'componentId': 'keboola.wr-snowflake', 'configId': '345678', 'mode': 'run'}, }, ] @pytest.fixture def mock_conditional_flow_phases() -> List[Dict[str, Any]]: """Sample conditional flow phases with simple configuration.""" return [ { 'id': 'phase1', 'name': 'Simple Phase', 'description': 'A simple conditional flow phase', 'next': [{'id': 'transition1', 'name': 'Simple Transition', 'goto': None}], } ] @pytest.fixture def mock_conditional_flow_tasks() -> List[Dict[str, Any]]: """Sample conditional flow tasks with simple configuration.""" return [ { 'id': 'task1', 'name': 'Simple Task', 'phase': 'phase1', 'enabled': True, 'task': { 'type': 'notification', 'recipients': [{'channel': 'email', 'address': 'admin@company.com'}], 'title': 'Simple Notification', 'message': 'This is a simple notification task', }, } ] @pytest.fixture def mock_conditional_flow( mock_conditional_flow_phases: List[Dict[str, Any]], mock_conditional_flow_tasks: List[Dict[str, Any]] ) -> Dict[str, Any]: """Mock conditional flow configuration response for get_flow endpoint.""" return { 'component_id': CONDITIONAL_FLOW_COMPONENT_ID, 'configuration_id': 'conditional_flow_456', 'name': 'Advanced Data Pipeline', 'description': 'Advanced pipeline with conditional logic and error handling', 'created': '2025-01-15T11:00:00Z', 'updated': '2025-01-15T11:00:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': mock_conditional_flow_phases, 'tasks': mock_conditional_flow_tasks}, 'rows': [], 'metadata': [], } @pytest.fixture def mock_conditional_flow_create_update( mock_conditional_flow_phases: List[Dict[str, Any]], mock_conditional_flow_tasks: List[Dict[str, Any]] ) -> Dict[str, Any]: """Mock conditional flow configuration response for create/update endpoints.""" return { 'id': 'conditional_flow_456', 'name': 'Advanced Data Pipeline', 'description': 'Advanced pipeline with conditional logic and error handling', 'created': '2025-01-15T11:00:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': mock_conditional_flow_phases, 'tasks': mock_conditional_flow_tasks}, 'state': {}, 'currentVersion': {'version': 1}, } @pytest.fixture def mock_legacy_flow_create_update( legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]] ) -> dict[str, Any]: """Mock legacy flow configuration response for create/update endpoints.""" return { 'id': 'legacy_flow_123', 'name': 'Legacy ETL Pipeline', 'description': 'Traditional ETL pipeline using legacy flows', 'created': '2025-01-15T10:30:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': legacy_flow_phases, 'tasks': legacy_flow_tasks}, 'state': {}, 'currentVersion': {'version': 1}, } @pytest.fixture def mock_legacy_flow( legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]] ) -> dict[str, Any]: """Mock legacy flow configuration response for get_flow endpoint.""" return { 'component_id': ORCHESTRATOR_COMPONENT_ID, 'configuration_id': 'legacy_flow_123', 'name': 'Legacy ETL Pipeline', 'description': 'Traditional ETL pipeline using legacy flows', 'created': '2025-01-15T10:30:00Z', 'updated': '2025-01-15T10:30:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': legacy_flow_phases, 'tasks': legacy_flow_tasks}, 'rows': [], 'metadata': [], } # ============================================================================= # CREATE_FLOW TOOL TESTS # ============================================================================= class TestCreateFlowTool: """Tests for the create_flow tool.""" @pytest.mark.asyncio async def test_create_legacy_flow( self, mocker: MockerFixture, mcp_context_client: Context, legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]], mock_legacy_flow_create_update: dict[str, Any], ): """Should create a new legacy (orchestrator) flow with valid phases/tasks.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) mocker.patch.object( keboola_client.storage_client, 'configuration_create', return_value=mock_legacy_flow_create_update, ) result = await create_flow( ctx=mcp_context_client, name='Legacy ETL Pipeline', description='Traditional ETL pipeline using legacy flows', phases=legacy_flow_phases, tasks=legacy_flow_tasks, ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == mock_legacy_flow_create_update['id'] assert result.component_id == 'keboola.orchestrator' assert result.description == mock_legacy_flow_create_update['description'] assert result.timestamp is not None assert len(result.links) == 3 assert result.version == mock_legacy_flow_create_update['version'] keboola_client.storage_client.configuration_create.assert_called_once() @pytest.mark.asyncio async def test_create_conditional_flow( self, mocker: MockerFixture, mcp_context_client: Context, mock_conditional_flow_create_update: Dict[str, Any], ): """Test conditional flow creation.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_create = mocker.AsyncMock( return_value=mock_conditional_flow_create_update ) result = await create_conditional_flow( ctx=mcp_context_client, name='Advanced Data Pipeline', description='Advanced pipeline with conditional logic and error handling', phases=mock_conditional_flow_create_update['configuration']['phases'], tasks=mock_conditional_flow_create_update['configuration']['tasks'], ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == mock_conditional_flow_create_update['id'] assert result.component_id == 'keboola.flow' assert result.description == mock_conditional_flow_create_update['description'] assert result.timestamp is not None assert len(result.links) == 3 assert result.version == mock_conditional_flow_create_update['version'] keboola_client.storage_client.configuration_create.assert_called_once() # ============================================================================= # UPDATE_FLOW TOOL TESTS # ============================================================================= class TestUpdateFlowTool: """Tests for the update_flow tool.""" # TODO: The test_update_*() tests need to cover different variations of the tool's parameters # and properly check that the original flow was correctly updated. @pytest.mark.asyncio async def test_update_legacy_flow( self, mocker: MockerFixture, mcp_context_client: Context, legacy_flow_phases: List[Dict[str, Any]], legacy_flow_tasks: List[Dict[str, Any]], mock_legacy_flow_create_update: Dict[str, Any], ): """Test legacy flow update with new phases and tasks.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) updated_config = mock_legacy_flow_create_update.copy() updated_config['version'] = 2 updated_config['description'] = 'Updated legacy ETL pipeline' keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_detail = mocker.AsyncMock(return_value={}) keboola_client.storage_client.configuration_update = mocker.AsyncMock(return_value=updated_config) result = await update_flow( ctx=mcp_context_client, configuration_id='legacy_flow_123', flow_type=ORCHESTRATOR_COMPONENT_ID, name='Updated Legacy ETL Pipeline', description='Updated legacy ETL pipeline', phases=legacy_flow_phases, tasks=legacy_flow_tasks, change_description='Added data validation phase and enhanced error handling', ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == 'legacy_flow_123' assert result.component_id == 'keboola.orchestrator' assert result.description == 'Updated legacy ETL pipeline' assert result.timestamp is not None assert len(result.links) == 3 assert result.version == updated_config['version'] keboola_client.storage_client.configuration_update.assert_called_once() @pytest.mark.asyncio async def test_update_conditional_flow( self, mocker: MockerFixture, mcp_context_client: Context, mock_conditional_flow_create_update: Dict[str, Any], ): """Test conditional flow update with enhanced conditions.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) updated_config = mock_conditional_flow_create_update.copy() updated_config['version'] = 2 updated_config['name'] = 'Enhanced Advanced Data Pipeline' updated_config['description'] = 'Enhanced pipeline with improved conditional logic' keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_detail = mocker.AsyncMock(return_value={}) keboola_client.storage_client.configuration_update = mocker.AsyncMock(return_value=updated_config) result = await update_flow( ctx=mcp_context_client, configuration_id='conditional_flow_456', flow_type=CONDITIONAL_FLOW_COMPONENT_ID, name='Enhanced Advanced Data Pipeline', description='Enhanced pipeline with improved conditional logic', phases=mock_conditional_flow_create_update['configuration']['phases'], tasks=mock_conditional_flow_create_update['configuration']['tasks'], change_description='Enhanced error handling and added notification phase', ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == updated_config['id'] assert result.component_id == 'keboola.flow' assert result.description == updated_config['description'] assert result.timestamp is not None assert len(result.links) == 3 assert result.version == updated_config['version'] keboola_client.storage_client.configuration_update.assert_called_once() @pytest.mark.asyncio async def test_update_conditional_flow_fails_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test that updating conditional flow fails when conditional flows are disabled.""" # Mock project info with conditional flows disabled mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mock_project_info.project_name = 'Test Project' async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Should raise ValueError with proper error message with pytest.raises(ValueError, match='Conditional flows are not supported.') as exc_info: await update_flow( ctx=mcp_context_client, configuration_id='test-config-id', flow_type=CONDITIONAL_FLOW_COMPONENT_ID, name='Updated Conditional Flow', description='Updated description for conditional flow', phases=[], tasks=[], change_description='Test update', ) error_message = str(exc_info.value) assert 'Conditional flows are not supported in this project' in error_message assert 'Test Project' in error_message assert 'conditional_flows=false' in error_message assert 'enable them in your project settings' in error_message # ============================================================================= # GET_FLOW TOOL TESTS # ============================================================================= class TestGetFlowTool: """Tests for the get_flow tool.""" @pytest.mark.asyncio async def test_get_legacy_flow( self, mocker: MockerFixture, mcp_context_client: Context, mock_legacy_flow: dict[str, Any], ): """Should fall back to legacy flow when conditional flow is missing (404).""" async def mock_configuration_detail(component_id: str, configuration_id: str) -> dict[str, Any]: if component_id == CONDITIONAL_FLOW_COMPONENT_ID: response = mocker.Mock(status_code=404) raise httpx.HTTPStatusError('404 Not Found', request=None, response=response) if component_id == ORCHESTRATOR_COMPONENT_ID: return mock_legacy_flow raise ValueError(f'Unexpected component_id: {component_id}') keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) mocker.patch.object( keboola_client.storage_client, 'configuration_detail', side_effect=mock_configuration_detail ) result = await get_flow( ctx=mcp_context_client, configuration_id='legacy_flow_123', ) assert isinstance(result, Flow) assert result.configuration_id == mock_legacy_flow['configuration_id'] assert result.name == mock_legacy_flow['name'] assert result.description == mock_legacy_flow['description'] assert result.created == mock_legacy_flow['created'] assert result.updated == mock_legacy_flow['updated'] assert result.version == mock_legacy_flow['version'] assert result.is_disabled == mock_legacy_flow['isDisabled'] assert result.is_deleted == mock_legacy_flow['isDeleted'] assert result.configuration.phases == [ FlowPhase.model_validate(phase) for phase in mock_legacy_flow['configuration']['phases'] ] assert result.configuration.tasks == [ FlowTask.model_validate(task) for task in mock_legacy_flow['configuration']['tasks'] ] assert len(result.links) == 3 @pytest.mark.asyncio async def test_get_conditional_flow( self, mocker: MockerFixture, mcp_context_client: Context, mock_conditional_flow: Dict[str, Any], ): """Test retrieving conditional flow details.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_detail = mocker.AsyncMock(return_value=mock_conditional_flow) result = await get_flow(ctx=mcp_context_client, configuration_id='conditional_flow_456') assert isinstance(result, Flow) assert result.component_id == mock_conditional_flow['component_id'] assert result.configuration_id == mock_conditional_flow['configuration_id'] assert result.name == mock_conditional_flow['name'] assert result.description == mock_conditional_flow['description'] assert result.created == mock_conditional_flow['created'] assert result.updated == mock_conditional_flow['updated'] assert result.version == mock_conditional_flow['version'] assert result.is_disabled == mock_conditional_flow['isDisabled'] assert result.is_deleted == mock_conditional_flow['isDeleted'] assert result.configuration.phases == [ ConditionalFlowPhase.model_validate(phase) for phase in mock_conditional_flow['configuration']['phases'] ] assert result.configuration.tasks == [ ConditionalFlowTask.model_validate(task) for task in mock_conditional_flow['configuration']['tasks'] ] assert len(result.links) == 3 # ============================================================================= # LIST_FLOWS TOOL TESTS # ============================================================================= class TestListFlowsTool: """Tests for the list_flows tool.""" @pytest.mark.asyncio async def test_list_flows_no_params( self, mocker: MockerFixture, mcp_context_client: Context, mock_legacy_flow: Dict[str, Any], mock_conditional_flow: Dict[str, Any], ): """Test listing flows of both types with comprehensive validation.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) # Mock different responses for different flow types def mock_configuration_list(component_id): if component_id == ORCHESTRATOR_COMPONENT_ID: return [mock_legacy_flow] elif component_id == CONDITIONAL_FLOW_COMPONENT_ID: return [mock_conditional_flow] return [] keboola_client.storage_client.configuration_list = mocker.AsyncMock(side_effect=mock_configuration_list) result = await list_flows(ctx=mcp_context_client) assert isinstance(result, ListFlowsOutput) assert len(result.flows) == 2 # Verify both flow types are present flow_types = {flow.component_id for flow in result.flows} assert ORCHESTRATOR_COMPONENT_ID in flow_types assert CONDITIONAL_FLOW_COMPONENT_ID in flow_types # Extract and validate individual flows legacy_flow = next(f for f in result.flows if f.component_id == ORCHESTRATOR_COMPONENT_ID) conditional_flow = next(f for f in result.flows if f.component_id == CONDITIONAL_FLOW_COMPONENT_ID) # Validate legacy flow summary structure assert isinstance(legacy_flow, FlowSummary) assert legacy_flow.component_id == ORCHESTRATOR_COMPONENT_ID assert legacy_flow.configuration_id == mock_legacy_flow['configuration_id'] assert legacy_flow.name == mock_legacy_flow['name'] assert legacy_flow.description == mock_legacy_flow['description'] assert legacy_flow.created == mock_legacy_flow['created'] assert legacy_flow.updated == mock_legacy_flow['updated'] assert legacy_flow.version == mock_legacy_flow['version'] assert legacy_flow.is_disabled == mock_legacy_flow['isDisabled'] assert legacy_flow.is_deleted == mock_legacy_flow['isDeleted'] # Validate conditional flow summary structure assert isinstance(conditional_flow, FlowSummary) assert conditional_flow.component_id == CONDITIONAL_FLOW_COMPONENT_ID assert conditional_flow.configuration_id == mock_conditional_flow['configuration_id'] assert conditional_flow.name == mock_conditional_flow['name'] assert conditional_flow.description == mock_conditional_flow['description'] assert conditional_flow.created == mock_conditional_flow['created'] assert conditional_flow.updated == mock_conditional_flow['updated'] assert conditional_flow.version == mock_conditional_flow['version'] assert conditional_flow.is_disabled == mock_conditional_flow['isDisabled'] assert conditional_flow.is_deleted == mock_conditional_flow['isDeleted'] assert keboola_client.storage_client.configuration_list.call_count == 2 @pytest.mark.asyncio async def test_list_flows_specific_ids_mixed_types( self, mocker: MockerFixture, mcp_context_client: Context, mock_legacy_flow: Dict[str, Any], mock_conditional_flow: Dict[str, Any], ): """Test listing specific flows by ID when they're different types with comprehensive validation.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) def mock_configuration_detail(component_id, configuration_id): if configuration_id == 'legacy_flow_123' and component_id == ORCHESTRATOR_COMPONENT_ID: return mock_legacy_flow elif configuration_id == 'conditional_flow_456' and component_id == CONDITIONAL_FLOW_COMPONENT_ID: return mock_conditional_flow raise Exception(f'Configuration {configuration_id} not found') keboola_client.storage_client.configuration_detail = mocker.AsyncMock(side_effect=mock_configuration_detail) result = await list_flows(ctx=mcp_context_client, flow_ids=['legacy_flow_123', 'conditional_flow_456']) # Validate the overall response structure assert isinstance(result, ListFlowsOutput) assert len(result.flows) == 2 # Verify both flow types are present flow_types = {flow.component_id for flow in result.flows} assert ORCHESTRATOR_COMPONENT_ID in flow_types assert CONDITIONAL_FLOW_COMPONENT_ID in flow_types # Extract and validate individual flows legacy_flow = next(f for f in result.flows if f.component_id == ORCHESTRATOR_COMPONENT_ID) conditional_flow = next(f for f in result.flows if f.component_id == CONDITIONAL_FLOW_COMPONENT_ID) # Validate legacy flow summary structure assert isinstance(legacy_flow, FlowSummary) assert legacy_flow.component_id == ORCHESTRATOR_COMPONENT_ID assert legacy_flow.configuration_id == mock_legacy_flow['configuration_id'] assert legacy_flow.name == mock_legacy_flow['name'] assert legacy_flow.description == mock_legacy_flow['description'] assert legacy_flow.created == mock_legacy_flow['created'] assert legacy_flow.updated == mock_legacy_flow['updated'] assert legacy_flow.version == mock_legacy_flow['version'] assert legacy_flow.is_disabled == mock_legacy_flow['isDisabled'] assert legacy_flow.is_deleted == mock_legacy_flow['isDeleted'] # Validate conditional flow summary structure assert isinstance(conditional_flow, FlowSummary) assert conditional_flow.component_id == CONDITIONAL_FLOW_COMPONENT_ID assert conditional_flow.configuration_id == mock_conditional_flow['configuration_id'] assert conditional_flow.name == mock_conditional_flow['name'] assert conditional_flow.description == mock_conditional_flow['description'] assert conditional_flow.created == mock_conditional_flow['created'] assert conditional_flow.updated == mock_conditional_flow['updated'] assert conditional_flow.version == mock_conditional_flow['version'] assert conditional_flow.is_disabled == mock_conditional_flow['isDisabled'] assert conditional_flow.is_deleted == mock_conditional_flow['isDeleted'] # Since we look up for both types (conditional flows first) we expect the calls to be 2 and 1, respectfully assert keboola_client.storage_client.configuration_detail.call_count == 3 # ============================================================================= # GET_FLOW_SCHEMA TOOL TESTS # ============================================================================= class TestGetFlowSchemaTool: """Tests for the get_flow_schema tool.""" @pytest.mark.asyncio async def test_get_legacy_flow_schema_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting schema for legacy flow type when conditional flows are disabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', return_value=mock_project_info) result = await get_flow_schema(ctx=mcp_context_client, flow_type=ORCHESTRATOR_COMPONENT_ID) assert isinstance(result, str) assert '```json' in result assert 'dependsOn' in result @pytest.mark.asyncio async def test_get_legacy_flow_schema_when_conditional_flows_enabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting schema for legacy flow type when conditional flows are enabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', return_value=mock_project_info) result = await get_flow_schema(ctx=mcp_context_client, flow_type=ORCHESTRATOR_COMPONENT_ID) assert isinstance(result, str) assert '```json' in result assert 'dependsOn' in result @pytest.mark.asyncio async def test_get_conditional_flow_schema_when_conditional_flows_enabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting schema for conditional flow type when conditional flows are enabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', return_value=mock_project_info) result = await get_flow_schema(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) assert isinstance(result, str) assert '```json' in result assert 'keboola.flow' in result or 'conditional' in result.lower() assert 'next' in result @pytest.mark.asyncio async def test_get_conditional_flow_schema_fails_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test that requesting conditional flow schema fails when conditional flows are disabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mock_project_info.project_name = 'Test Project' async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Should raise ValueError with proper error message with pytest.raises(ValueError, match='Conditional flows are not supported.') as exc_info: await get_flow_schema(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) error_message = str(exc_info.value) assert 'Conditional flows are not supported in this project' in error_message assert 'Test Project' in error_message assert 'conditional_flows=false' in error_message assert 'enable them in your project settings' in error_message # ============================================================================= # GET_FLOW_EXAMPLES TOOL TESTS # ============================================================================= class TestGetFlowExamplesTool: """Tests for the get_flow_examples tool.""" @pytest.mark.asyncio async def test_get_legacy_flow_examples( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting examples for legacy flow type.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Mock the file path and content properly - using actual structure from the real file mock_file_content = [ ( '{"tasks":[{"id":1,"name":"keboola.wr-google-bigquery-v2-28356142",' '"task":{"mode":"run","configId":"28356142","componentId":"keboola.wr-google-bigquery-v2"},' '"phase":1,"continueOnFailure":false,"enabled":true}],' '"phases":[{"id":1,"name":"Scheduledconfiguration","dependsOn":[]}]}' ), ( '{"phases":[{"id":59812,"name":"Extraction","dependsOn":[],' '"description":"ExtractdatafromWhenIworkandPaychex"}],' '"tasks":[{"id":36614,"name":"ex-generic-v2-34446855","phase":59812,' '"task":{"componentId":"ex-generic-v2","configId":"34446855","mode":"run"},' '"continueOnFailure":false,"enabled":false}]}' ), ] # Mock the file path resolution mock_path = mocker.Mock() mock_path.__truediv__ = mocker.Mock(return_value=mock_path) mock_path.open = mocker.mock_open(read_data='\n'.join(mock_file_content)) # Mock the importlib.resources.files function mocker.patch('importlib.resources.files', return_value=mock_path) result = await get_flow_examples(ctx=mcp_context_client, flow_type=ORCHESTRATOR_COMPONENT_ID) assert isinstance(result, str) assert 'Flow Configuration Examples for `keboola.orchestrator`' in result assert 'keboola.wr-google-bigquery-v2-28356142' in result assert 'ex-generic-v2-34446855' in result assert 'Scheduledconfiguration' in result assert 'Extraction' in result @pytest.mark.asyncio async def test_get_conditional_flow_examples( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting examples for conditional flow type.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Mock the file path and content properly - using actual structure from the real file mock_file_content = [ ( '{"tasks":[{"id":"40fef978-7092-4d79-a5b4-ea3fb2e38d03",' '"name":"keboola.wr-azure-event-hub-92021091",' '"phase":"6afbf55b-782c-47d7-bf70-f0ef1be6505b",' '"task":{"type":"job","mode":"run","componentId":"keboola.wr-azure-event-hub","configId":"92021091"},' '"enabled":true}],' '"phases":[{"id":"7dd992b0-9ac5-495b-b277-d8bc0b7e15d5",' '"name":"Phase1",' '"next":[{"id":"a25a4e4a-3042-49a2-81d8-fb1103957ebe",' '"goto":"6afbf55b-782c-47d7-bf70-f0ef1be6505b"}]}]}' ), ( '{"tasks":[{"id":"6bcc72d8-d9a5-4708-b0bd-53c4f6e839f7",' '"name":"keboola.python-transformation-v2-16550",' '"phase":"78c07164-0d1c-41d6-ba48-b821e781d830",' '"task":{"type":"job","mode":"run",' '"componentId":"keboola.python-transformation-v2","configId":"16550"},' '"enabled":true}],' '"phases":[{"id":"78c07164-0d1c-41d6-ba48-b821e781d830",' '"name":"Phase1",' '"next":[{"id":"e5dc7c43-d311-4e90-a2ca-6cac8d2eb5f5",' '"goto":"92649482-45d6-475d-aace-33466f37e381"}]}]}' ), ] # Mock the file path resolution mock_path = mocker.Mock() mock_path.__truediv__ = mocker.Mock(return_value=mock_path) mock_path.open = mocker.mock_open(read_data='\n'.join(mock_file_content)) # Mock the importlib.resources.files function mocker.patch('importlib.resources.files', return_value=mock_path) result = await get_flow_examples(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) assert isinstance(result, str) assert 'Flow Configuration Examples for `keboola.flow`' in result assert 'keboola.wr-azure-event-hub-92021091' in result assert 'keboola.python-transformation-v2-16550' in result assert 'Phase1' in result @pytest.mark.asyncio async def test_get_conditional_flow_examples_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test that requesting conditional flow examples fails when conditional flows are disabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mock_project_info.project_name = 'Test Project' async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Mock the file path resolution (should not be called due to early failure) mock_path = mocker.Mock() mock_path.__truediv__ = mocker.Mock(return_value=mock_path) mock_path.open = mocker.mock_open() mocker.patch('importlib.resources.files', return_value=mock_path) # Should raise ValueError with proper error message with pytest.raises(ValueError, match='Conditional flows are not supported.') as exc_info: await get_flow_examples(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) error_message = str(exc_info.value) assert 'Conditional flows are not supported in this project' in error_message assert 'Test Project' in error_message assert 'conditional_flows=false' in error_message assert 'enable them in your project settings' in error_message

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server