Skip to main content
Glama

Keboola Explorer MCP Server

test_tools.py40.2 kB
"""Unit tests for Flow management tools.""" from typing import Any, Dict, List import httpx import pytest from mcp.server.fastmcp import Context from pytest_mock import MockerFixture from keboola_mcp_server.clients.client import CONDITIONAL_FLOW_COMPONENT_ID, ORCHESTRATOR_COMPONENT_ID, KeboolaClient from keboola_mcp_server.links import Link from keboola_mcp_server.tools.flow.model import ( ConditionalFlowConfiguration, ConditionalFlowPhase, ConditionalFlowTask, Flow, FlowConfiguration, FlowPhase, FlowSummary, FlowTask, GetFlowsDetailOutput, GetFlowsListOutput, ) from keboola_mcp_server.tools.flow.tools import ( FlowToolOutput, create_conditional_flow, create_flow, get_flow_examples, get_flow_schema, get_flows, update_flow, ) # ============================================================================= # FLOW DATA FIXTURES # ============================================================================= @pytest.fixture def legacy_flow_phases() -> List[Dict[str, Any]]: """Sample legacy flow phases.""" return [ {'id': 1, 'name': 'Data Extraction', 'description': 'Extract data from various sources', 'dependsOn': []}, {'id': 2, 'name': 'Data Transformation', 'description': 'Transform and process data', 'dependsOn': [1]}, {'id': 3, 'name': 'Data Loading', 'description': 'Load data to destination', 'dependsOn': [2]}, ] @pytest.fixture def legacy_flow_tasks() -> List[Dict[str, Any]]: """Sample legacy flow tasks.""" return [ { 'id': 20001, 'name': 'Extract from S3', 'phase': 1, 'enabled': True, 'continueOnFailure': False, 'task': {'componentId': 'keboola.ex-aws-s3', 'configId': '123456', 'mode': 'run'}, }, { 'id': 20002, 'name': 'Transform Data', 'phase': 2, 'enabled': True, 'continueOnFailure': False, 'task': {'componentId': 'keboola.snowflake-transformation', 'configId': '789012', 'mode': 'run'}, }, { 'id': 20003, 'name': 'Load to Warehouse', 'phase': 3, 'enabled': True, 'continueOnFailure': False, 'task': {'componentId': 'keboola.wr-snowflake', 'configId': '345678', 'mode': 'run'}, }, ] @pytest.fixture def mock_conditional_flow_phases() -> List[Dict[str, Any]]: """Sample conditional flow phases with simple configuration.""" return [ { 'id': 'phase1', 'name': 'Simple Phase', 'description': 'A simple conditional flow phase', 'next': [{'id': 'transition1', 'name': 'Simple Transition', 'goto': None}], } ] @pytest.fixture def mock_conditional_flow_tasks() -> List[Dict[str, Any]]: """Sample conditional flow tasks with simple configuration.""" return [ { 'id': 'task1', 'name': 'Simple Task', 'phase': 'phase1', 'enabled': True, 'task': { 'type': 'notification', 'recipients': [{'channel': 'email', 'address': 'admin@company.com'}], 'title': 'Simple Notification', 'message': 'This is a simple notification task', }, } ] @pytest.fixture def mock_conditional_flow( mock_conditional_flow_phases: List[Dict[str, Any]], mock_conditional_flow_tasks: List[Dict[str, Any]] ) -> Dict[str, Any]: """Mock conditional flow configuration response for get_flow endpoint.""" return { 'component_id': CONDITIONAL_FLOW_COMPONENT_ID, 'configuration_id': 'conditional_flow_456', 'name': 'Advanced Data Pipeline', 'description': 'Advanced pipeline with conditional logic and error handling', 'created': '2025-01-15T11:00:00Z', 'updated': '2025-01-15T11:00:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': mock_conditional_flow_phases, 'tasks': mock_conditional_flow_tasks}, 'rows': [], 'metadata': [], } @pytest.fixture def mock_conditional_flow_create_update( mock_conditional_flow_phases: List[Dict[str, Any]], mock_conditional_flow_tasks: List[Dict[str, Any]] ) -> Dict[str, Any]: """Mock conditional flow configuration response for create/update endpoints.""" return { 'id': 'conditional_flow_456', 'name': 'Advanced Data Pipeline', 'description': 'Advanced pipeline with conditional logic and error handling', 'created': '2025-01-15T11:00:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': mock_conditional_flow_phases, 'tasks': mock_conditional_flow_tasks}, 'state': {}, 'currentVersion': {'version': 1}, } @pytest.fixture def mock_legacy_flow_create_update( legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]] ) -> dict[str, Any]: """Mock legacy flow configuration response for create/update endpoints.""" return { 'id': 'legacy_flow_123', 'name': 'Legacy ETL Pipeline', 'description': 'Traditional ETL pipeline using legacy flows', 'created': '2025-01-15T10:30:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': legacy_flow_phases, 'tasks': legacy_flow_tasks}, 'state': {}, 'currentVersion': {'version': 1}, } @pytest.fixture def mock_legacy_flow( legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]] ) -> dict[str, Any]: """Mock legacy flow configuration response for get_flow endpoint.""" return { 'component_id': ORCHESTRATOR_COMPONENT_ID, 'configuration_id': 'legacy_flow_123', 'name': 'Legacy ETL Pipeline', 'description': 'Traditional ETL pipeline using legacy flows', 'created': '2025-01-15T10:30:00Z', 'updated': '2025-01-15T10:30:00Z', 'creatorToken': {'id': 'test_token', 'description': 'Test token'}, 'version': 1, 'changeDescription': 'Initial creation', 'isDisabled': False, 'isDeleted': False, 'configuration': {'phases': legacy_flow_phases, 'tasks': legacy_flow_tasks}, 'rows': [], 'metadata': [], } # ============================================================================= # CREATE_FLOW TOOL TESTS # ============================================================================= class TestCreateFlowTool: """Tests for the create_flow tool.""" @pytest.mark.asyncio async def test_create_legacy_flow( self, mocker: MockerFixture, mcp_context_client: Context, legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]], mock_legacy_flow_create_update: dict[str, Any], ): """Should create a new legacy (orchestrator) flow with valid phases/tasks.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) mocker.patch.object( keboola_client.storage_client, 'configuration_create', return_value=mock_legacy_flow_create_update, ) result = await create_flow( ctx=mcp_context_client, name='Legacy ETL Pipeline', description='Traditional ETL pipeline using legacy flows', phases=legacy_flow_phases, tasks=legacy_flow_tasks, ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == mock_legacy_flow_create_update['id'] assert result.component_id == 'keboola.orchestrator' assert result.description == mock_legacy_flow_create_update['description'] assert result.timestamp is not None assert len(result.links) == 3 assert result.version == mock_legacy_flow_create_update['version'] keboola_client.storage_client.configuration_create.assert_called_once() @pytest.mark.asyncio async def test_create_conditional_flow( self, mocker: MockerFixture, mcp_context_client: Context, mock_conditional_flow_create_update: Dict[str, Any], ): """Test conditional flow creation.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_create = mocker.AsyncMock( return_value=mock_conditional_flow_create_update ) result = await create_conditional_flow( ctx=mcp_context_client, name='Advanced Data Pipeline', description='Advanced pipeline with conditional logic and error handling', phases=mock_conditional_flow_create_update['configuration']['phases'], tasks=mock_conditional_flow_create_update['configuration']['tasks'], ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == mock_conditional_flow_create_update['id'] assert result.component_id == 'keboola.flow' assert result.description == mock_conditional_flow_create_update['description'] assert result.timestamp is not None assert len(result.links) == 3 assert result.version == mock_conditional_flow_create_update['version'] keboola_client.storage_client.configuration_create.assert_called_once() # ============================================================================= # UPDATE_FLOW TOOL TESTS # ============================================================================= class TestUpdateFlowTool: """Tests for the update_flow tool.""" # TODO: The test_update_*() tests need to cover different variations of the tool's parameters # and properly check that the original flow was correctly updated. @pytest.mark.asyncio async def test_update_legacy_flow( self, mocker: MockerFixture, mcp_context_client: Context, legacy_flow_phases: List[Dict[str, Any]], legacy_flow_tasks: List[Dict[str, Any]], mock_legacy_flow_create_update: Dict[str, Any], ): """Test legacy flow update with new phases and tasks.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) updated_config = mock_legacy_flow_create_update.copy() updated_config['version'] = 2 updated_config['description'] = 'Updated legacy ETL pipeline' keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_detail = mocker.AsyncMock(return_value={}) keboola_client.storage_client.configuration_update = mocker.AsyncMock(return_value=updated_config) result = await update_flow( ctx=mcp_context_client, configuration_id='legacy_flow_123', flow_type=ORCHESTRATOR_COMPONENT_ID, name='Updated Legacy ETL Pipeline', description='Updated legacy ETL pipeline', phases=legacy_flow_phases, tasks=legacy_flow_tasks, change_description='Added data validation phase and enhanced error handling', ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == 'legacy_flow_123' assert result.component_id == 'keboola.orchestrator' assert result.description == 'Updated legacy ETL pipeline' assert result.timestamp is not None assert len(result.links) == 3 assert result.version == updated_config['version'] keboola_client.storage_client.configuration_update.assert_called_once() @pytest.mark.asyncio async def test_update_conditional_flow( self, mocker: MockerFixture, mcp_context_client: Context, mock_conditional_flow_create_update: Dict[str, Any], ): """Test conditional flow update with enhanced conditions.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) updated_config = mock_conditional_flow_create_update.copy() updated_config['version'] = 2 updated_config['name'] = 'Enhanced Advanced Data Pipeline' updated_config['description'] = 'Enhanced pipeline with improved conditional logic' keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_detail = mocker.AsyncMock(return_value={}) keboola_client.storage_client.configuration_update = mocker.AsyncMock(return_value=updated_config) result = await update_flow( ctx=mcp_context_client, configuration_id='conditional_flow_456', flow_type=CONDITIONAL_FLOW_COMPONENT_ID, name='Enhanced Advanced Data Pipeline', description='Enhanced pipeline with improved conditional logic', phases=mock_conditional_flow_create_update['configuration']['phases'], tasks=mock_conditional_flow_create_update['configuration']['tasks'], change_description='Enhanced error handling and added notification phase', ) assert isinstance(result, FlowToolOutput) assert result.success is True assert result.configuration_id == updated_config['id'] assert result.component_id == 'keboola.flow' assert result.description == updated_config['description'] assert result.timestamp is not None assert len(result.links) == 3 assert result.version == updated_config['version'] keboola_client.storage_client.configuration_update.assert_called_once() @pytest.mark.asyncio async def test_update_conditional_flow_fails_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test that updating conditional flow fails when conditional flows are disabled.""" # Mock project info with conditional flows disabled mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mock_project_info.project_name = 'Test Project' async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Should raise ValueError with proper error message with pytest.raises(ValueError, match='Conditional flows are not supported.') as exc_info: await update_flow( ctx=mcp_context_client, configuration_id='test-config-id', flow_type=CONDITIONAL_FLOW_COMPONENT_ID, name='Updated Conditional Flow', description='Updated description for conditional flow', phases=[], tasks=[], change_description='Test update', ) error_message = str(exc_info.value) assert 'Conditional flows are not supported in this project' in error_message assert 'Test Project' in error_message assert 'conditional_flows=false' in error_message assert 'enable them in your project settings' in error_message # ============================================================================= # GET_FLOWS TOOL TESTS # ============================================================================= class TestGetFlowsTool: """Tests for the get_flows tool.""" @pytest.mark.asyncio async def test_get_flows_with_legacy_flow_id( self, mocker: MockerFixture, mcp_context_client: Context, mock_legacy_flow: dict[str, Any], legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]], ): """Should fall back to legacy flow when conditional flow is missing (404).""" async def mock_configuration_detail(component_id: str, configuration_id: str) -> dict[str, Any]: if component_id == CONDITIONAL_FLOW_COMPONENT_ID: response = mocker.Mock(status_code=404) raise httpx.HTTPStatusError('404 Not Found', request=None, response=response) if component_id == ORCHESTRATOR_COMPONENT_ID: return mock_legacy_flow raise ValueError(f'Unexpected component_id: {component_id}') keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) mocker.patch.object( keboola_client.storage_client, 'configuration_detail', side_effect=mock_configuration_detail ) result = await get_flows( ctx=mcp_context_client, flow_ids=[mock_legacy_flow['configuration_id']], ) # Get URL components from context storage_api_url = keboola_client.storage_api_url project_id = await keboola_client.storage_client.project_id() base_url = f'{storage_api_url}/admin/projects/{project_id}' expected_flow = Flow( component_id=ORCHESTRATOR_COMPONENT_ID, configuration_id=mock_legacy_flow['configuration_id'], name=mock_legacy_flow['name'], description=mock_legacy_flow['description'], version=mock_legacy_flow['version'], is_disabled=mock_legacy_flow['isDisabled'], is_deleted=mock_legacy_flow['isDeleted'], configuration=FlowConfiguration( phases=[FlowPhase.model_validate(p) for p in legacy_flow_phases], tasks=[FlowTask.model_validate(t) for t in legacy_flow_tasks], ), change_description=mock_legacy_flow['changeDescription'], configuration_metadata=mock_legacy_flow['metadata'], created=mock_legacy_flow['created'], updated=mock_legacy_flow['updated'], links=[ Link( type='ui-detail', title=f"Flow: {mock_legacy_flow['name']}", url=f"{base_url}/flows/{mock_legacy_flow['configuration_id']}", ), Link(type='ui-dashboard', title='Flows in the project', url=f'{base_url}/flows'), Link(type='docs', title='Documentation for Keboola Flows', url='https://help.keboola.com/flows/'), ], ) assert result == GetFlowsDetailOutput(flows=[expected_flow]) @pytest.mark.asyncio async def test_get_flows_with_conditional_flow_id( self, mocker: MockerFixture, mcp_context_client: Context, mock_conditional_flow: Dict[str, Any], mock_conditional_flow_phases: list[dict[str, Any]], mock_conditional_flow_tasks: list[dict[str, Any]], ): """Test retrieving conditional flow details.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) keboola_client.storage_client.configuration_detail = mocker.AsyncMock(return_value=mock_conditional_flow) result = await get_flows(ctx=mcp_context_client, flow_ids=[mock_conditional_flow['configuration_id']]) # Get URL components from context storage_api_url = keboola_client.storage_api_url project_id = await keboola_client.storage_client.project_id() base_url = f'{storage_api_url}/admin/projects/{project_id}' expected_flow = Flow( component_id=CONDITIONAL_FLOW_COMPONENT_ID, configuration_id=mock_conditional_flow['configuration_id'], name=mock_conditional_flow['name'], description=mock_conditional_flow['description'], version=mock_conditional_flow['version'], is_disabled=mock_conditional_flow['isDisabled'], is_deleted=mock_conditional_flow['isDeleted'], configuration=ConditionalFlowConfiguration( phases=[ConditionalFlowPhase.model_validate(p) for p in mock_conditional_flow_phases], tasks=[ConditionalFlowTask.model_validate(t) for t in mock_conditional_flow_tasks], ), change_description=mock_conditional_flow['changeDescription'], configuration_metadata=mock_conditional_flow['metadata'], created=mock_conditional_flow['created'], updated=mock_conditional_flow['updated'], links=[ Link( type='ui-detail', title=f"Flow: {mock_conditional_flow['name']}", url=f"{base_url}/flows-v2/{mock_conditional_flow['configuration_id']}", ), Link(type='ui-dashboard', title='Conditional Flows in the project', url=f'{base_url}/flows-v2'), Link(type='docs', title='Documentation for Keboola Flows', url='https://help.keboola.com/flows/'), ], ) assert result == GetFlowsDetailOutput(flows=[expected_flow]) @pytest.mark.asyncio async def test_get_flows_no_params( self, mocker: MockerFixture, mcp_context_client: Context, mock_legacy_flow: Dict[str, Any], mock_conditional_flow: Dict[str, Any], ): """Test listing flows of both types.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) def mock_configuration_list(component_id): if component_id == ORCHESTRATOR_COMPONENT_ID: return [mock_legacy_flow] elif component_id == CONDITIONAL_FLOW_COMPONENT_ID: return [mock_conditional_flow] return [] keboola_client.storage_client.configuration_list = mocker.AsyncMock(side_effect=mock_configuration_list) result = await get_flows(ctx=mcp_context_client) # Get URL components from context storage_api_url = keboola_client.storage_api_url project_id = await keboola_client.storage_client.project_id() base_url = f'{storage_api_url}/admin/projects/{project_id}' expected_legacy_summary = FlowSummary( component_id=ORCHESTRATOR_COMPONENT_ID, configuration_id=mock_legacy_flow['configuration_id'], name=mock_legacy_flow['name'], description=mock_legacy_flow['description'], version=mock_legacy_flow['version'], is_disabled=mock_legacy_flow['isDisabled'], is_deleted=mock_legacy_flow['isDeleted'], phases_count=len(mock_legacy_flow['configuration']['phases']), tasks_count=len(mock_legacy_flow['configuration']['tasks']), created=mock_legacy_flow['created'], updated=mock_legacy_flow['updated'], ) expected_conditional_summary = FlowSummary( component_id=CONDITIONAL_FLOW_COMPONENT_ID, configuration_id=mock_conditional_flow['configuration_id'], name=mock_conditional_flow['name'], description=mock_conditional_flow['description'], version=mock_conditional_flow['version'], is_disabled=mock_conditional_flow['isDisabled'], is_deleted=mock_conditional_flow['isDeleted'], phases_count=len(mock_conditional_flow['configuration']['phases']), tasks_count=len(mock_conditional_flow['configuration']['tasks']), created=mock_conditional_flow['created'], updated=mock_conditional_flow['updated'], ) expected_links = [ Link(type='ui-dashboard', title='Flows in the project', url=f'{base_url}/flows'), Link(type='ui-dashboard', title='Conditional Flows in the project', url=f'{base_url}/flows-v2'), ] # Note: flows are returned in FLOW_TYPES order (conditional flows first, then legacy) assert result == GetFlowsListOutput( flows=[expected_conditional_summary, expected_legacy_summary], links=expected_links, ) assert keboola_client.storage_client.configuration_list.call_count == 2 @pytest.mark.asyncio async def test_get_flows_specific_ids_mixed_types( self, mocker: MockerFixture, mcp_context_client: Context, mock_legacy_flow: Dict[str, Any], mock_conditional_flow: Dict[str, Any], legacy_flow_phases: list[dict[str, Any]], legacy_flow_tasks: list[dict[str, Any]], mock_conditional_flow_phases: list[dict[str, Any]], mock_conditional_flow_tasks: list[dict[str, Any]], ): """Test retrieving specific flows by ID when they're different types.""" keboola_client = KeboolaClient.from_state(mcp_context_client.session.state) legacy_id = mock_legacy_flow['configuration_id'] conditional_id = mock_conditional_flow['configuration_id'] def mock_configuration_detail(component_id, configuration_id): if configuration_id == legacy_id and component_id == ORCHESTRATOR_COMPONENT_ID: return mock_legacy_flow elif configuration_id == conditional_id and component_id == CONDITIONAL_FLOW_COMPONENT_ID: return mock_conditional_flow raise Exception(f'Configuration {configuration_id} not found') keboola_client.storage_client.configuration_detail = mocker.AsyncMock(side_effect=mock_configuration_detail) result = await get_flows(ctx=mcp_context_client, flow_ids=[legacy_id, conditional_id]) # Get URL components from context storage_api_url = keboola_client.storage_api_url project_id = await keboola_client.storage_client.project_id() base_url = f'{storage_api_url}/admin/projects/{project_id}' expected_legacy_flow = Flow( component_id=ORCHESTRATOR_COMPONENT_ID, configuration_id=mock_legacy_flow['configuration_id'], name=mock_legacy_flow['name'], description=mock_legacy_flow['description'], version=mock_legacy_flow['version'], is_disabled=mock_legacy_flow['isDisabled'], is_deleted=mock_legacy_flow['isDeleted'], configuration=FlowConfiguration( phases=[FlowPhase.model_validate(p) for p in legacy_flow_phases], tasks=[FlowTask.model_validate(t) for t in legacy_flow_tasks], ), change_description=mock_legacy_flow['changeDescription'], configuration_metadata=mock_legacy_flow['metadata'], created=mock_legacy_flow['created'], updated=mock_legacy_flow['updated'], links=[ Link( type='ui-detail', title=f"Flow: {mock_legacy_flow['name']}", url=f"{base_url}/flows/{mock_legacy_flow['configuration_id']}", ), Link(type='ui-dashboard', title='Flows in the project', url=f'{base_url}/flows'), Link(type='docs', title='Documentation for Keboola Flows', url='https://help.keboola.com/flows/'), ], ) expected_conditional_flow = Flow( component_id=CONDITIONAL_FLOW_COMPONENT_ID, configuration_id=mock_conditional_flow['configuration_id'], name=mock_conditional_flow['name'], description=mock_conditional_flow['description'], version=mock_conditional_flow['version'], is_disabled=mock_conditional_flow['isDisabled'], is_deleted=mock_conditional_flow['isDeleted'], configuration=ConditionalFlowConfiguration( phases=[ConditionalFlowPhase.model_validate(p) for p in mock_conditional_flow_phases], tasks=[ConditionalFlowTask.model_validate(t) for t in mock_conditional_flow_tasks], ), change_description=mock_conditional_flow['changeDescription'], configuration_metadata=mock_conditional_flow['metadata'], created=mock_conditional_flow['created'], updated=mock_conditional_flow['updated'], links=[ Link( type='ui-detail', title=f"Flow: {mock_conditional_flow['name']}", url=f"{base_url}/flows-v2/{mock_conditional_flow['configuration_id']}", ), Link(type='ui-dashboard', title='Conditional Flows in the project', url=f'{base_url}/flows-v2'), Link(type='docs', title='Documentation for Keboola Flows', url='https://help.keboola.com/flows/'), ], ) assert result == GetFlowsDetailOutput(flows=[expected_legacy_flow, expected_conditional_flow]) # Since we look up for both types (conditional flows first) we expect the calls to be 2 and 1, respectfully assert keboola_client.storage_client.configuration_detail.call_count == 3 # ============================================================================= # GET_FLOW_SCHEMA TOOL TESTS # ============================================================================= class TestGetFlowSchemaTool: """Tests for the get_flow_schema tool.""" @pytest.mark.asyncio async def test_get_legacy_flow_schema_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting schema for legacy flow type when conditional flows are disabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', return_value=mock_project_info) result = await get_flow_schema(ctx=mcp_context_client, flow_type=ORCHESTRATOR_COMPONENT_ID) assert isinstance(result, str) assert '```json' in result assert 'dependsOn' in result @pytest.mark.asyncio async def test_get_legacy_flow_schema_when_conditional_flows_enabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting schema for legacy flow type when conditional flows are enabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', return_value=mock_project_info) result = await get_flow_schema(ctx=mcp_context_client, flow_type=ORCHESTRATOR_COMPONENT_ID) assert isinstance(result, str) assert '```json' in result assert 'dependsOn' in result @pytest.mark.asyncio async def test_get_conditional_flow_schema_when_conditional_flows_enabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting schema for conditional flow type when conditional flows are enabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', return_value=mock_project_info) result = await get_flow_schema(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) assert isinstance(result, str) assert '```json' in result assert 'keboola.flow' in result or 'conditional' in result.lower() assert 'next' in result @pytest.mark.asyncio async def test_get_conditional_flow_schema_fails_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test that requesting conditional flow schema fails when conditional flows are disabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mock_project_info.project_name = 'Test Project' async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Should raise ValueError with proper error message with pytest.raises(ValueError, match='Conditional flows are not supported.') as exc_info: await get_flow_schema(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) error_message = str(exc_info.value) assert 'Conditional flows are not supported in this project' in error_message assert 'Test Project' in error_message assert 'conditional_flows=false' in error_message assert 'enable them in your project settings' in error_message # ============================================================================= # GET_FLOW_EXAMPLES TOOL TESTS # ============================================================================= class TestGetFlowExamplesTool: """Tests for the get_flow_examples tool.""" @pytest.mark.asyncio async def test_get_legacy_flow_examples( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting examples for legacy flow type.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Mock the file path and content properly - using actual structure from the real file mock_file_content = [ ( '{"tasks":[{"id":1,"name":"keboola.wr-google-bigquery-v2-28356142",' '"task":{"mode":"run","configId":"28356142","componentId":"keboola.wr-google-bigquery-v2"},' '"phase":1,"continueOnFailure":false,"enabled":true}],' '"phases":[{"id":1,"name":"Scheduledconfiguration","dependsOn":[]}]}' ), ( '{"phases":[{"id":59812,"name":"Extraction","dependsOn":[],' '"description":"ExtractdatafromWhenIworkandPaychex"}],' '"tasks":[{"id":36614,"name":"ex-generic-v2-34446855","phase":59812,' '"task":{"componentId":"ex-generic-v2","configId":"34446855","mode":"run"},' '"continueOnFailure":false,"enabled":false}]}' ), ] # Mock the file path resolution mock_path = mocker.Mock() mock_path.__truediv__ = mocker.Mock(return_value=mock_path) mock_path.open = mocker.mock_open(read_data='\n'.join(mock_file_content)) # Mock the importlib.resources.files function mocker.patch('importlib.resources.files', return_value=mock_path) result = await get_flow_examples(ctx=mcp_context_client, flow_type=ORCHESTRATOR_COMPONENT_ID) assert isinstance(result, str) assert 'Flow Configuration Examples for `keboola.orchestrator`' in result assert 'keboola.wr-google-bigquery-v2-28356142' in result assert 'ex-generic-v2-34446855' in result assert 'Scheduledconfiguration' in result assert 'Extraction' in result @pytest.mark.asyncio async def test_get_conditional_flow_examples( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test getting examples for conditional flow type.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = True async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Mock the file path and content properly - using actual structure from the real file mock_file_content = [ ( '{"tasks":[{"id":"40fef978-7092-4d79-a5b4-ea3fb2e38d03",' '"name":"keboola.wr-azure-event-hub-92021091",' '"phase":"6afbf55b-782c-47d7-bf70-f0ef1be6505b",' '"task":{"type":"job","mode":"run","componentId":"keboola.wr-azure-event-hub","configId":"92021091"},' '"enabled":true}],' '"phases":[{"id":"7dd992b0-9ac5-495b-b277-d8bc0b7e15d5",' '"name":"Phase1",' '"next":[{"id":"a25a4e4a-3042-49a2-81d8-fb1103957ebe",' '"goto":"6afbf55b-782c-47d7-bf70-f0ef1be6505b"}]}]}' ), ( '{"tasks":[{"id":"6bcc72d8-d9a5-4708-b0bd-53c4f6e839f7",' '"name":"keboola.python-transformation-v2-16550",' '"phase":"78c07164-0d1c-41d6-ba48-b821e781d830",' '"task":{"type":"job","mode":"run",' '"componentId":"keboola.python-transformation-v2","configId":"16550"},' '"enabled":true}],' '"phases":[{"id":"78c07164-0d1c-41d6-ba48-b821e781d830",' '"name":"Phase1",' '"next":[{"id":"e5dc7c43-d311-4e90-a2ca-6cac8d2eb5f5",' '"goto":"92649482-45d6-475d-aace-33466f37e381"}]}]}' ), ] # Mock the file path resolution mock_path = mocker.Mock() mock_path.__truediv__ = mocker.Mock(return_value=mock_path) mock_path.open = mocker.mock_open(read_data='\n'.join(mock_file_content)) # Mock the importlib.resources.files function mocker.patch('importlib.resources.files', return_value=mock_path) result = await get_flow_examples(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) assert isinstance(result, str) assert 'Flow Configuration Examples for `keboola.flow`' in result assert 'keboola.wr-azure-event-hub-92021091' in result assert 'keboola.python-transformation-v2-16550' in result assert 'Phase1' in result @pytest.mark.asyncio async def test_get_conditional_flow_examples_when_conditional_flows_disabled( self, mocker: MockerFixture, mcp_context_client: Context, ): """Test that requesting conditional flow examples fails when conditional flows are disabled.""" mock_project_info = mocker.Mock() mock_project_info.conditional_flows = False mock_project_info.project_name = 'Test Project' async def mock_get_project_info(ctx): return mock_project_info mocker.patch('keboola_mcp_server.tools.flow.tools.get_project_info', side_effect=mock_get_project_info) # Mock the file path resolution (should not be called due to early failure) mock_path = mocker.Mock() mock_path.__truediv__ = mocker.Mock(return_value=mock_path) mock_path.open = mocker.mock_open() mocker.patch('importlib.resources.files', return_value=mock_path) # Should raise ValueError with proper error message with pytest.raises(ValueError, match='Conditional flows are not supported.') as exc_info: await get_flow_examples(ctx=mcp_context_client, flow_type=CONDITIONAL_FLOW_COMPONENT_ID) error_message = str(exc_info.value) assert 'Conditional flows are not supported in this project' in error_message assert 'Test Project' in error_message assert 'conditional_flows=false' in error_message assert 'enable them in your project settings' in error_message

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server