Skip to main content
Glama

Keboola Explorer MCP Server

test_tools.py (34.3 kB)
import json
import logging
from typing import Any, AsyncGenerator

import pytest
import pytest_asyncio
from fastmcp import Client, Context, FastMCP
from pydantic import ValidationError

from integtests.conftest import ConfigDef, ProjectDef
from keboola_mcp_server.clients.client import (
    CONDITIONAL_FLOW_COMPONENT_ID,
    ORCHESTRATOR_COMPONENT_ID,
    FlowType,
    KeboolaClient,
    get_metadata_property,
)
from keboola_mcp_server.config import Config, MetadataField, ServerRuntimeInfo
from keboola_mcp_server.links import Link, ProjectLinksManager
from keboola_mcp_server.server import create_server
from keboola_mcp_server.tools.flow.model import Flow
from keboola_mcp_server.tools.flow.tools import (
    FlowToolOutput,
    ListFlowsOutput,
    create_conditional_flow,
    create_flow,
    get_flow,
    get_flow_schema,
    list_flows,
)
from keboola_mcp_server.tools.project import get_project_info

LOG = logging.getLogger(__name__)

# Markdown code fence that the get_flow_schema output is asserted to be wrapped in.
_JSON_FENCE_OPEN = '```json\n'
_JSON_FENCE_CLOSE = '\n```'


def _parse_json_code_block(markdown: str) -> Any:
    """
    Assert that the text is a fenced markdown JSON code block and return the parsed JSON payload.

    :param markdown: The text expected to start with '```json\\n' and end with '\\n```'.
    :return: The value produced by json.loads() on the text between the fences.
    """
    assert isinstance(markdown, str)
    assert markdown.startswith(_JSON_FENCE_OPEN)
    assert markdown.endswith(_JSON_FENCE_CLOSE)
    return json.loads(markdown.removeprefix(_JSON_FENCE_OPEN).removesuffix(_JSON_FENCE_CLOSE))


@pytest.mark.asyncio
async def test_create_and_retrieve_flow(mcp_context: Context, configs: list[ConfigDef]) -> None:
    """
    Create a flow and retrieve it using list_flows.

    The created flow is deleted in the ``finally`` clause regardless of the test outcome.

    :param mcp_context: The test context fixture.
    :param configs: List of real configuration definitions.
    """
    assert configs
    assert configs[0].configuration_id is not None
    flow_type = ORCHESTRATOR_COMPONENT_ID
    phases = [
        {'name': 'Extract', 'dependsOn': [], 'description': 'Extract data'},
        {'name': 'Transform', 'dependsOn': [1], 'description': 'Transform data'},
    ]
    tasks = [
        {
            'name': 'Extract Task',
            'phase': 1,
            'task': {
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
            },
        },
        {
            'name': 'Transform Task',
            'phase': 2,
            'task': {
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
            },
        },
    ]
    flow_name = 'Integration Test Flow'
    flow_description = 'Flow created by integration test.'

    created = await create_flow(
        ctx=mcp_context,
        name=flow_name,
        description=flow_description,
        phases=phases,
        tasks=tasks,
    )
    flow_id = created.configuration_id
    client = KeboolaClient.from_state(mcp_context.session.state)

    # Everything that can fail after the flow exists runs inside the try block,
    # so that the flow is always removed from the project.
    try:
        links_manager = await ProjectLinksManager.from_client(client)
        expected_links = [
            links_manager.get_flow_detail_link(flow_id=flow_id, flow_name=flow_name, flow_type=flow_type),
            links_manager.get_flows_dashboard_link(flow_type=flow_type),
            links_manager.get_flows_docs_link(),
        ]

        assert isinstance(created, FlowToolOutput)
        assert created.component_id == ORCHESTRATOR_COMPONENT_ID
        assert created.description == flow_description
        # Verify the links of created flow
        assert created.success is True
        assert set(created.links) == set(expected_links)
        assert created.version is not None

        # Verify the flow is listed in the list_flows tool
        result = await list_flows(mcp_context)
        assert any(f.name == flow_name for f in result.flows)
        found = next((f for f in result.flows if f.configuration_id == flow_id), None)
        assert found is not None, f'Flow {flow_id} not found in the list_flows output'

        flow = await get_flow(mcp_context, configuration_id=found.configuration_id)
        assert isinstance(flow, Flow)
        assert flow.component_id == ORCHESTRATOR_COMPONENT_ID
        assert flow.configuration_id == found.configuration_id
        assert flow.configuration.phases[0].name == 'Extract'
        assert flow.configuration.phases[1].name == 'Transform'
        assert flow.configuration.tasks[0].task['componentId'] == configs[0].component_id
        assert set(flow.links) == set(expected_links)

        # Verify the metadata - check that KBC.MCP.createdBy is set to 'true'
        metadata = await client.storage_client.configuration_metadata_get(
            component_id=ORCHESTRATOR_COMPONENT_ID, configuration_id=flow_id
        )
        # Convert metadata list to dictionary for easier checking
        # metadata is a list of dicts with 'key' and 'value' keys
        assert isinstance(metadata, list)
        metadata_dict = {item['key']: item['value'] for item in metadata if isinstance(item, dict)}
        assert MetadataField.CREATED_BY_MCP in metadata_dict
        assert metadata_dict[MetadataField.CREATED_BY_MCP] == 'true'
    finally:
        await client.storage_client.configuration_delete(
            component_id=ORCHESTRATOR_COMPONENT_ID,
            configuration_id=flow_id,
            skip_trash=True,
        )


@pytest.mark.asyncio
async def test_create_and_retrieve_conditional_flow(mcp_context: Context, configs: list[ConfigDef]) -> None:
    """
    Create a conditional flow and retrieve it using list_flows.

    The created flow is deleted in the ``finally`` clause regardless of the test outcome.

    :param mcp_context: The test context fixture.
    :param configs: List of real configuration definitions.
    """
    assert configs
    assert configs[0].configuration_id is not None
    flow_type = CONDITIONAL_FLOW_COMPONENT_ID
    phases = [
        {
            'id': 'extract_phase',
            'name': 'Extract',
            'description': 'Extract data',
            'next': [{'id': 'extract_to_transform', 'name': 'Extract to Transform', 'goto': 'transform_phase'}],
        },
        {
            'id': 'transform_phase',
            'name': 'Transform',
            'description': 'Transform data',
            'next': [{'id': 'transform_end', 'name': 'End Flow', 'goto': None}],
        },
    ]
    tasks = [
        {
            'id': 'extract_task',
            'name': 'Extract Task',
            'phase': 'extract_phase',
            'task': {
                'type': 'job',
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
                'mode': 'run',
            },
        },
        {
            'id': 'transform_task',
            'name': 'Transform Task',
            'phase': 'transform_phase',
            'task': {
                'type': 'job',
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
                'mode': 'run',
            },
        },
    ]
    flow_name = 'Integration Test Conditional Flow'
    flow_description = 'Conditional flow created by integration test.'

    created = await create_conditional_flow(
        ctx=mcp_context,
        name=flow_name,
        description=flow_description,
        phases=phases,
        tasks=tasks,
    )
    flow_id = created.configuration_id
    client = KeboolaClient.from_state(mcp_context.session.state)

    # Everything that can fail after the flow exists runs inside the try block,
    # so that the flow is always removed from the project.
    try:
        links_manager = await ProjectLinksManager.from_client(client)
        expected_links = [
            links_manager.get_flow_detail_link(flow_id=flow_id, flow_name=flow_name, flow_type=flow_type),
            links_manager.get_flows_dashboard_link(flow_type=flow_type),
            links_manager.get_flows_docs_link(),
        ]

        assert isinstance(created, FlowToolOutput)
        assert created.component_id == CONDITIONAL_FLOW_COMPONENT_ID
        assert created.description == flow_description
        assert created.success is True
        assert set(created.links) == set(expected_links)
        assert created.version is not None

        # Verify the flow is listed in the list_flows tool
        result = await list_flows(mcp_context)
        assert any(f.name == flow_name for f in result.flows)
        found = next((f for f in result.flows if f.configuration_id == flow_id), None)
        assert found is not None, f'Flow {flow_id} not found in the list_flows output'

        flow = await get_flow(mcp_context, configuration_id=found.configuration_id)
        assert isinstance(flow, Flow)
        assert flow.component_id == CONDITIONAL_FLOW_COMPONENT_ID
        assert flow.configuration_id == found.configuration_id
        assert flow.configuration.phases[0].name == 'Extract'
        assert flow.configuration.phases[1].name == 'Transform'
        assert flow.configuration.tasks[0].task.component_id == configs[0].component_id
        assert set(flow.links) == set(expected_links)

        # Verify the metadata - check that KBC.MCP.createdBy is set to 'true'
        metadata = await client.storage_client.configuration_metadata_get(
            component_id=CONDITIONAL_FLOW_COMPONENT_ID, configuration_id=flow_id
        )
        # Convert metadata list to dictionary for easier checking
        # metadata is a list of dicts with 'key' and 'value' keys
        assert isinstance(metadata, list)
        metadata_dict = {item['key']: item['value'] for item in metadata if isinstance(item, dict)}
        assert MetadataField.CREATED_BY_MCP in metadata_dict
        assert metadata_dict[MetadataField.CREATED_BY_MCP] == 'true'
    finally:
        await client.storage_client.configuration_delete(
            component_id=CONDITIONAL_FLOW_COMPONENT_ID,
            configuration_id=flow_id,
            skip_trash=True,
        )


@pytest.fixture
def mcp_server(storage_api_url: str, storage_api_token: str, workspace_schema: str) -> FastMCP:
    """Build the MCP server from the test project's credentials, using the 'stdio' transport runtime info."""
    config = Config(storage_api_url=storage_api_url, storage_token=storage_api_token, workspace_schema=workspace_schema)
    return create_server(config, runtime_info=ServerRuntimeInfo(transport='stdio'))


@pytest_asyncio.fixture
async def mcp_client(mcp_server: FastMCP) -> AsyncGenerator[Client, None]:
    """Yield a fastmcp Client opened against the ``mcp_server`` fixture; it is closed after the test."""
    async with Client(mcp_server) as client:
        yield client


@pytest_asyncio.fixture
async def initial_lf(
    mcp_client: Client, configs: list[ConfigDef], keboola_client: KeboolaClient
) -> AsyncGenerator[FlowToolOutput, None]:
    """
    Create a single-phase, single-task flow through the 'create_flow' tool and yield the tool's output.

    The configuration is deleted (skipping the trash) once the test that uses the fixture finishes.
    """
    # Create the initial component configuration test data
    tool_result = await mcp_client.call_tool(
        name='create_flow',
        arguments={
            'name': 'Initial Test Flow',
            'description': 'Initial test flow created by automated test',
            'phases': [{'name': 'Phase1', 'dependsOn': [], 'description': 'First phase'}],
            'tasks': [
                {
                    'id': 20001,
                    'name': 'Task1',
                    'phase': 1,
                    'continueOnFailure': False,
                    'enabled': False,
                    'task': {
                        'componentId': configs[0].component_id,
                        'configId': configs[0].configuration_id,
                        'mode': 'run',
                    },
                }
            ],
        },
    )
    try:
        yield FlowToolOutput.model_validate(tool_result.structured_content)
    finally:
        # Clean up: Delete the configuration
        await keboola_client.storage_client.configuration_delete(
            component_id=ORCHESTRATOR_COMPONENT_ID,
            configuration_id=tool_result.structured_content['configuration_id'],
            skip_trash=True,
        )


@pytest_asyncio.fixture
async def initial_cf(
    mcp_client: Client, configs: list[ConfigDef], keboola_client: KeboolaClient
) -> AsyncGenerator[FlowToolOutput, None]:
    """
    Create a single-phase, single-task flow through the 'create_conditional_flow' tool and yield the tool's output.

    The configuration is deleted (skipping the trash) once the test that uses the fixture finishes.
    """
    # Create the initial component configuration test data
    tool_result = await mcp_client.call_tool(
        name='create_conditional_flow',
        arguments={
            'name': 'Initial Test Flow',
            'description': 'Initial test flow created by automated test',
            'phases': [
                {
                    'id': 'phase1',
                    'name': 'Phase1',
                    'description': 'First phase',
                    'next': [{'id': 'phase1_end', 'name': 'End Flow', 'goto': None}],
                },
            ],
            'tasks': [
                {
                    'id': 'task1',
                    'name': 'Task1',
                    'phase': 'phase1',
                    'task': {
                        'type': 'job',
                        'componentId': configs[0].component_id,
                        'configId': configs[0].configuration_id,
                        'mode': 'run',
                    },
                },
            ],
        },
    )
    try:
        yield FlowToolOutput.model_validate(tool_result.structured_content)
    finally:
        # Clean up: Delete the configuration
        await keboola_client.storage_client.configuration_delete(
            component_id=CONDITIONAL_FLOW_COMPONENT_ID,
            configuration_id=tool_result.structured_content['configuration_id'],
            skip_trash=True,
        )


@pytest.mark.asyncio
@pytest.mark.parametrize(
    ('flow_type', 'updates'),
    [
        (
            ORCHESTRATOR_COMPONENT_ID,
            {
                'phases': [
                    {'id': 1, 'name': 'Phase1', 'dependsOn': [], 'description': 'First phase updated'},
                    {'id': 2, 'name': 'Phase2', 'dependsOn': [], 'description': 'Second phase added'},
                ],
                'tasks': [
                    {
                        'id': 20001,
                        'name': 'Task1 - Updated',
                        'phase': 1,
                        'continueOnFailure': False,
                        'enabled': False,
                        'task': {'componentId': 'ex-generic-v2', 'configId': 'test_config_001', 'mode': 'run'},
                    },
                    {
                        'id': 20002,
                        'name': 'Task2 - Added',
                        'phase': 2,
                        'continueOnFailure': False,
                        'enabled': False,
                        'task': {'componentId': 'ex-generic-v2', 'configId': 'test_config_002', 'mode': 'run'},
                    },
                ],
                'name': 'Updated Test Flow',
                'description': 'The test flow updated by an automated test.',
            },
        ),
        (
            ORCHESTRATOR_COMPONENT_ID,
            {
                'phases': [
                    {'id': 1, 'name': 'Phase1', 'dependsOn': [], 'description': 'First phase updated'},
                    {'id': 2, 'name': 'Phase2', 'dependsOn': [], 'description': 'Second phase added'},
                ]
            },
        ),
        (
            ORCHESTRATOR_COMPONENT_ID,
            {
                'tasks': [
                    {
                        'id': 20001,
                        'name': 'Task1 - Updated',
                        'phase': 1,
                        'continueOnFailure': False,
                        'enabled': False,
                        'task': {'componentId': 'ex-generic-v2', 'configId': 'test_config_001', 'mode': 'run'},
                    },
                    {
                        'id': 20002,
                        'name': 'Task2 - Added',
                        'phase': 1,
                        'continueOnFailure': False,
                        'enabled': False,
                        'task': {'componentId': 'ex-generic-v2', 'configId': 'test_config_002', 'mode': 'run'},
                    },
                ]
            },
        ),
        (ORCHESTRATOR_COMPONENT_ID, {'name': 'Updated just name'}),
        (ORCHESTRATOR_COMPONENT_ID, {'description': 'Updated just description'}),
        (
            CONDITIONAL_FLOW_COMPONENT_ID,
            {
                'phases': [
                    {
                        'id': 'phase1',
                        'name': 'Phase1',
                        'description': 'First phase updated',
                        'next': [{'id': 'phase1_end', 'name': 'End Flow', 'goto': None}],
                    },
                    {
                        'id': 'phase2',
                        'name': 'Phase2',
                        'description': 'Second phase added',
                        'next': [{'id': 'phase1_end', 'name': 'End Flow', 'goto': None}],
                    },
                ],
                'tasks': [
                    {
                        'id': 'task1',
                        'name': 'Task1 - Updated',
                        'phase': 'phase1',
                        'task': {
                            'type': 'job',
                            'componentId': 'ex-generic-v2',
                            'configId': 'test_config_001',
                            'mode': 'run',
                        },
                    },
                    {
                        'id': 'task2',
                        'name': 'Task2 - Added',
                        'phase': 'phase2',
                        'task': {
                            'type': 'job',
                            'componentId': 'ex-generic-v2',
                            'configId': 'test_config_002',
                            'mode': 'run',
                        },
                    },
                ],
            },
        ),
        (
            CONDITIONAL_FLOW_COMPONENT_ID,
            {
                'phases': [
                    {
                        'id': 'phase1',
                        'name': 'Phase1',
                        'description': 'First phase updated',
                        'next': [{'id': 'phase1_end', 'name': 'End Flow', 'goto': None}],
                    },
                    {
                        'id': 'phase2',
                        'name': 'Phase2',
                        'description': 'Second phase added',
                        'next': [{'id': 'phase1_end', 'name': 'End Flow', 'goto': None}],
                    },
                ]
            },
        ),
        (
            CONDITIONAL_FLOW_COMPONENT_ID,
            {
                'tasks': [
                    {
                        'id': 'task1',
                        'name': 'Task1 - Updated',
                        'phase': 'phase1',
                        'task': {
                            'type': 'job',
                            'componentId': 'ex-generic-v2',
                            'configId': 'test_config_001',
                            'mode': 'run',
                        },
                    },
                    {
                        'id': 'task2',
                        'name': 'Task2 - Added',
                        'phase': 'phase1',
                        'task': {
                            'type': 'job',
                            'componentId': 'ex-generic-v2',
                            'configId': 'test_config_002',
                            'mode': 'run',
                        },
                    },
                ]
            },
        ),
        (CONDITIONAL_FLOW_COMPONENT_ID, {'name': 'Updated just name'}),
        (CONDITIONAL_FLOW_COMPONENT_ID, {'description': 'Updated just description'}),
    ],
)
async def test_update_flow(
    flow_type: FlowType,
    updates: dict[str, Any],
    initial_lf: FlowToolOutput,
    initial_cf: FlowToolOutput,
    mcp_client: Client,
    keboola_project: ProjectDef,
    keboola_client: KeboolaClient,
) -> None:
    """
    Tests that 'update_flow' tool works as expected.

    :param flow_type: The component ID of the flow kind being updated.
    :param updates: The subset of 'update_flow' arguments (phases, tasks, name, description) to apply.
    :param initial_lf: The pre-created flow used when flow_type is ORCHESTRATOR_COMPONENT_ID.
    :param initial_cf: The pre-created flow used for any other flow_type.
    :param mcp_client: The MCP client used to call the tool.
    :param keboola_project: The test project definition; its project_id is used in the expected links.
    :param keboola_client: The client used to read back the stored configuration and its metadata.
    """
    initial_flow = initial_lf if flow_type == ORCHESTRATOR_COMPONENT_ID else initial_cf
    project_id = keboola_project.project_id
    flow_id = initial_flow.configuration_id

    tool_result = await mcp_client.call_tool(
        name='update_flow',
        arguments={
            'configuration_id': flow_id,
            'flow_type': flow_type,
            'change_description': 'Integration test update',
            **updates,
        },
    )

    # Check the tool's output
    updated_flow = FlowToolOutput.model_validate(tool_result.structured_content)
    assert updated_flow.configuration_id == flow_id
    assert updated_flow.component_id == flow_type
    assert updated_flow.success is True
    assert updated_flow.timestamp is not None
    assert updated_flow.version is not None

    # Fields that are not part of the update are expected to keep the values set by the fixtures.
    expected_name = updates.get('name') or 'Initial Test Flow'
    expected_description = updates.get('description') or initial_flow.description
    assert updated_flow.description == expected_description

    if flow_type == ORCHESTRATOR_COMPONENT_ID:
        flow_path = 'flows'
        flow_label = 'Flows'
    else:
        flow_path = 'flows-v2'
        flow_label = 'Conditional Flows'

    assert frozenset(updated_flow.links) == frozenset(
        [
            Link(
                type='ui-detail',
                title=f'Flow: {expected_name}',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/{flow_path}/{flow_id}',
            ),
            Link(
                type='ui-dashboard',
                title=f'{flow_label} in the project',
                url=f'https://connection.keboola.com/admin/projects/{project_id}/{flow_path}',
            ),
            Link(type='docs', title='Documentation for Keboola Flows', url='https://help.keboola.com/flows/'),
        ]
    )

    # Verify the configuration was updated
    flow_detail = await keboola_client.storage_client.configuration_detail(
        component_id=flow_type, configuration_id=updated_flow.configuration_id
    )
    assert flow_detail['name'] == expected_name
    assert flow_detail['description'] == expected_description

    flow_data = flow_detail.get('configuration')
    assert isinstance(flow_data, dict), f'Expecting dict, got: {type(flow_data)}'
    if (expected_phases := updates.get('phases')) is not None:
        assert flow_data['phases'] == expected_phases
    if (expected_tasks := updates.get('tasks')) is not None:
        assert flow_data['tasks'] == expected_tasks

    # The fixture creates the configuration and this test updates it once, hence version 2 is expected.
    current_version = flow_detail['version']
    assert isinstance(current_version, int), f'Expecting int, got: {type(current_version)}'
    assert current_version == 2

    # Check that KBC.MCP.updatedBy.version.{version} is set to 'true'
    metadata = await keboola_client.storage_client.configuration_metadata_get(
        component_id=flow_type, configuration_id=updated_flow.configuration_id
    )
    assert isinstance(metadata, list), f'Expecting list, got: {type(metadata)}'
    meta_key = f'{MetadataField.UPDATED_BY_MCP_PREFIX}{current_version}'
    meta_value = get_metadata_property(metadata, meta_key)
    assert meta_value == 'true'

    # Check that the original creation metadata is still there
    assert get_metadata_property(metadata, MetadataField.CREATED_BY_MCP) == 'true'


@pytest.mark.asyncio
async def test_list_flows_empty(mcp_context: Context) -> None:
    """
    Retrieve flows when none exist (should not error, may return empty list).

    :param mcp_context: The test context fixture.
    """
    flows = await list_flows(mcp_context)
    assert isinstance(flows, ListFlowsOutput)


@pytest.mark.asyncio
async def test_get_flow_schema(mcp_context: Context) -> None:
    """
    Test that get_flow_schema returns the flow configuration JSON schema.

    Tests the conditional behavior where the tool might return a different schema
    than requested based on project settings.

    :param mcp_context: The test context fixture.
    """
    project_info = await get_project_info(mcp_context)

    # Test 1: Request orchestrator schema (should always work)
    legacy_flow_schema = await get_flow_schema(mcp_context, ORCHESTRATOR_COMPONENT_ID)
    # Extract and parse the JSON content to verify it's valid
    parsed_legacy_schema = _parse_json_code_block(legacy_flow_schema)
    assert 'dependsOn' in legacy_flow_schema

    # Verify basic schema structure for legacy flow
    assert isinstance(parsed_legacy_schema, dict)
    assert '$schema' in parsed_legacy_schema
    assert 'properties' in parsed_legacy_schema
    assert 'phases' in parsed_legacy_schema['properties']
    assert 'tasks' in parsed_legacy_schema['properties']

    # Test 2: Request conditional flow schema (behavior depends on project settings)
    conditional_schema = await get_flow_schema(mcp_context, CONDITIONAL_FLOW_COMPONENT_ID)
    # Extract and parse the JSON content
    parsed_conditional_schema = _parse_json_code_block(conditional_schema)

    # Test 3: Verify the conditional behavior
    if not project_info.conditional_flows:
        # If the project does not support conditional flows, both requests should return the same schema
        assert legacy_flow_schema == conditional_schema
        LOG.info('Project has conditional flows disabled - both schemas are identical')
    else:
        # If conditional flows are enabled, the schemas should be different
        assert legacy_flow_schema != conditional_schema
        LOG.info('Project has conditional flows enabled - schemas are different')

        # Verify that the conditional schema has conditional-specific properties
        conditional_phases = parsed_conditional_schema['properties']['phases']['items']['properties']
        assert 'next' in conditional_phases  # Conditional flows use 'next' instead of 'dependsOn'

        conditional_tasks = parsed_conditional_schema['properties']['tasks']['items']['properties']['task']
        assert 'oneOf' in conditional_tasks  # Conditional flows have structured task types


@pytest.mark.asyncio
async def test_create_legacy_flow_invalid_structure(mcp_context: Context, configs: list[ConfigDef]) -> None:
    """
    Create a legacy flow with invalid structure (should raise ValueError).

    :param mcp_context: The test context fixture.
    :param configs: List of real configuration definitions.
    """
    assert configs
    assert configs[0].configuration_id is not None

    phases = [
        {'name': 'Phase1', 'dependsOn': [99], 'description': 'Depends on non-existent phase'},
    ]
    tasks = [
        {
            'name': 'Task1',
            'phase': 1,
            'task': {
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
            },
        },
    ]

    with pytest.raises(ValueError, match='depends on non-existent phase'):
        await create_flow(
            ctx=mcp_context,
            name='Invalid Legacy Flow',
            description='Should fail',
            phases=phases,
            tasks=tasks,
        )


@pytest.mark.asyncio
async def test_create_conditional_flow_invalid_structure(mcp_context: Context, configs: list[ConfigDef]) -> None:
    """
    Create a conditional flow with invalid structure (should raise pydantic's ValidationError).

    :param mcp_context: The test context fixture.
    :param configs: List of real configuration definitions.
    """
    assert configs
    assert configs[0].configuration_id is not None

    # Test invalid conditional flow structure - missing required fields and invalid types
    phases = [
        {
            'id': 123,  # Invalid: should be string, not integer
            'name': '',  # Invalid: empty string not allowed
            'next': [{'id': 'transition-1', 'goto': 'phase-2'}],
        }
    ]
    tasks = [
        {
            'id': 'task-1',
            'name': 'Task1',
            'phase': 'phase-1',
            'enabled': True,
            'task': {
                'type': 'invalid_type',  # Invalid: not one of job, notification, variable
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
                'mode': 'invalid_mode',  # Invalid: should be 'run'
            },
        }
    ]

    with pytest.raises(ValidationError):
        await create_conditional_flow(
            ctx=mcp_context,
            name='Invalid Conditional Flow',
            description='Should fail',
            phases=phases,
            tasks=tasks,
        )


@pytest.mark.asyncio
async def test_flow_lifecycle_integration(mcp_context: Context, configs: list[ConfigDef]) -> None:
    """
    Test complete flow lifecycle for both legacy and conditional flows.

    Creates flows, retrieves them individually, and lists all flows.
    Tests project-aware behavior based on conditional flows setting.
    Every flow created by the test is deleted in the ``finally`` clause, even if an assertion fails.

    :param mcp_context: The test context fixture.
    :param configs: List of real configuration definitions.
    """
    assert configs
    assert configs[0].configuration_id is not None

    project_info = await get_project_info(mcp_context)

    # Test data for legacy flow
    legacy_phases = [
        {'id': 1, 'name': 'Extract', 'description': 'Extract data from source', 'dependsOn': []},
        {'id': 2, 'name': 'Load', 'description': 'Load data to destination', 'dependsOn': [1]},
    ]
    legacy_tasks = [
        {
            'id': 20001,
            'name': 'Extract from API',
            'phase': 1,
            'enabled': True,
            'continueOnFailure': False,
            'task': {'componentId': configs[0].component_id, 'configId': configs[0].configuration_id, 'mode': 'run'},
        },
        {
            'id': 20002,
            'name': 'Load to Warehouse',
            'phase': 2,
            'enabled': True,
            'continueOnFailure': False,
            'task': {'componentId': configs[0].component_id, 'configId': configs[0].configuration_id, 'mode': 'run'},
        },
    ]

    # Test data for conditional flow
    conditional_phases = [
        {
            'id': 'phase-1',
            'name': 'Extract',
            'description': 'Extract data from source',
            'next': [{'id': 'transition-1', 'goto': 'phase-2'}],
        },
        {'id': 'phase-2', 'name': 'Load', 'description': 'Load data to destination', 'next': []},
    ]
    conditional_tasks = [
        {
            'id': 'task-1',
            'name': 'Extract from API',
            'phase': 'phase-1',
            'enabled': True,
            'task': {
                'type': 'job',
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
                'mode': 'run',
            },
        },
        {
            'id': 'task-2',
            'name': 'Load to Warehouse',
            'phase': 'phase-2',
            'enabled': True,
            'task': {
                'type': 'job',
                'componentId': configs[0].component_id,
                'configId': configs[0].configuration_id,
                'mode': 'run',
            },
        },
    ]

    orchestrator_flow_name = 'Integration Test Orchestrator Flow'
    orchestrator_flow_description = 'Orchestrator flow created by integration test'
    conditional_flow_name = 'Integration Test Conditional Flow'
    conditional_flow_description = 'Conditional flow created by integration test'

    # (flow_type, flow_id) pairs of everything this test created; consumed by the cleanup below.
    created_flows = []
    client = KeboolaClient.from_state(mcp_context.session.state)

    try:
        # Step 1: Create orchestrator flow (should always work)
        orchestrator_result = await create_flow(
            ctx=mcp_context,
            name=orchestrator_flow_name,
            description=orchestrator_flow_description,
            phases=legacy_phases,
            tasks=legacy_tasks,
        )
        # Register the flow for cleanup before asserting anything about it.
        created_flows.append((ORCHESTRATOR_COMPONENT_ID, orchestrator_result.configuration_id))

        assert isinstance(orchestrator_result, FlowToolOutput)
        assert orchestrator_result.success is True
        assert orchestrator_result.component_id == ORCHESTRATOR_COMPONENT_ID
        assert orchestrator_result.description == orchestrator_flow_description
        assert orchestrator_result.version is not None

        # Step 2: Try to create conditional flow (only if project allows it)
        if project_info.conditional_flows:
            conditional_result = await create_conditional_flow(
                ctx=mcp_context,
                name=conditional_flow_name,
                description=conditional_flow_description,
                phases=conditional_phases,
                tasks=conditional_tasks,
            )
            # Register the flow for cleanup before asserting anything about it.
            created_flows.append((CONDITIONAL_FLOW_COMPONENT_ID, conditional_result.configuration_id))

            assert isinstance(conditional_result, FlowToolOutput)
            assert conditional_result.success is True
            assert conditional_result.component_id == CONDITIONAL_FLOW_COMPONENT_ID
            assert conditional_result.description == conditional_flow_description
            assert conditional_result.version is not None
        else:
            LOG.info('Conditional flows are disabled in this project, skipping conditional flow creation')

        # Step 3: Get individual flows
        for flow_type, flow_id in created_flows:
            flow = await get_flow(mcp_context, configuration_id=flow_id)

            assert isinstance(flow, Flow)
            assert flow.configuration_id == flow_id

            if flow_type == ORCHESTRATOR_COMPONENT_ID:
                expected_name = orchestrator_flow_name
            else:
                expected_name = conditional_flow_name

            # Both flow kinds were created with two phases ('Extract', 'Load') and two tasks.
            assert flow.name == expected_name
            assert flow.component_id == flow_type
            assert len(flow.configuration.phases) == 2
            assert len(flow.configuration.tasks) == 2
            assert flow.configuration.phases[0].name == 'Extract'
            assert flow.configuration.phases[1].name == 'Load'

        # Step 4: List all flows and verify our created flows are there
        flows_list = await list_flows(mcp_context)
        assert isinstance(flows_list, ListFlowsOutput)
        assert len(flows_list.flows) >= len(created_flows)

        # Verify our created flows are in the list
        flow_ids = [flow.configuration_id for flow in flows_list.flows]
        for flow_type, flow_id in created_flows:
            assert flow_id in flow_ids, f'Created {flow_type} flow {flow_id} not found in flows list'
    finally:
        # Step 5: Clean up - delete all created flows
        for flow_type, flow_id in created_flows:
            # Best-effort deletion: a failure to delete one flow must not prevent deleting the others.
            try:
                await client.storage_client.configuration_delete(
                    component_id=flow_type,
                    configuration_id=flow_id,
                    skip_trash=True,
                )
                LOG.info(f'Successfully deleted {flow_type} flow {flow_id}')
            except Exception as e:
                LOG.warning(f'Failed to delete {flow_type} flow {flow_id}: {e}')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.