Skip to main content
Glama

Keboola Explorer MCP Server

test_tools.py (40.1 kB)
"""Integration tests for the tools in `keboola_mcp_server.tools.components`."""

import logging
from typing import Any, AsyncGenerator, cast

import pytest
import pytest_asyncio
from fastmcp import Client, Context, FastMCP
from pydantic import TypeAdapter

from integtests.conftest import ConfigDef, ProjectDef
from keboola_mcp_server.clients.client import KeboolaClient, get_metadata_property
from keboola_mcp_server.config import Config, MetadataField, ServerRuntimeInfo
from keboola_mcp_server.links import Link
from keboola_mcp_server.server import create_server
from keboola_mcp_server.tools.components import (
    add_config_row,
    create_config,
    create_sql_transformation,
    get_component,
    get_config,
    get_config_examples,
    list_configs,
)
from keboola_mcp_server.tools.components.model import (
    Component,
    ComponentType,
    ComponentWithConfigurations,
    ConfigParamUpdate,
    ConfigToolOutput,
    Configuration,
    ListConfigsOutput,
)
from keboola_mcp_server.tools.components.utils import (
    TransformationConfiguration,
    expand_component_types,
    get_sql_transformation_id_from_sql_dialect,
    update_params,
)
from keboola_mcp_server.workspace import WorkspaceManager

LOG = logging.getLogger(__name__)

_UI_PROJECTS_URL = 'https://connection.keboola.com/admin/projects'

# Payloads shared by several parametrized cases of `test_update_sql_transformation`.
# They are only read by the tests, never mutated, so sharing the objects is safe.
_UPDATED_SQL_PARAMETERS = {
    'blocks': [
        {
            'name': 'Updated block',
            'codes': [
                {
                    'name': 'Updated transformation',
                    'sql_statements': [
                        'SELECT 1 as updated_column',
                        'SELECT 2 as additional_column',
                        'SELECT 3 as third_column',
                    ],
                }
            ],
        }
    ]
}
_UPDATED_SQL_STORAGE = {
    'input': {'tables': [{'source': 'in.c-bucket.input_table', 'destination': 'input.csv'}]},
    'output': {
        'tables': [
            {'source': 'updated_output_table', 'destination': 'out.c-bucket.updated_output_table'},
            {'source': 'second_output_table', 'destination': 'out.c-bucket.second_output_table'},
        ]
    },
}


def _expected_links(project_id: Any, component_id: str, configuration_id: str, name: str) -> frozenset[Link]:
    """
    Builds the set of UI links that the configuration tools are expected to return.

    :param project_id: ID of the Keboola project used in the URLs.
    :param component_id: ID of the component owning the configuration.
    :param configuration_id: ID of the (root) configuration the detail link points to.
    :param name: Configuration name shown in the detail link title.
    :return: Frozen set with the `ui-detail` and `ui-dashboard` links.
    """
    dashboard_url = f'{_UI_PROJECTS_URL}/{project_id}/components/{component_id}'
    return frozenset(
        [
            Link(type='ui-detail', title=f'Configuration: {name}', url=f'{dashboard_url}/{configuration_id}'),
            Link(type='ui-dashboard', title=f'{component_id} Configurations Dashboard', url=dashboard_url),
        ]
    )


def _assert_storage_contains(actual_storage: dict[str, Any], expected_storage: dict[str, Any]) -> None:
    """Asserts that each top-level key of `expected_storage` is in `actual_storage` with an equal value."""
    # Storage API might return more keys than what we set, so we check subset
    for key, value in expected_storage.items():
        assert key in actual_storage
        assert actual_storage[key] == value


def _metadata_to_dict(metadata: Any) -> dict[str, Any]:
    """Converts a metadata list of dicts with 'key' and 'value' keys to a plain key -> value dictionary."""
    assert isinstance(metadata, list), f'Expecting list, got: {type(metadata)}'
    return {item['key']: item['value'] for item in metadata if isinstance(item, dict)}


async def _assert_created_by_mcp(client: KeboolaClient, component_id: str, configuration_id: str) -> None:
    """Asserts that the configuration's metadata has KBC.MCP.createdBy set to 'true'."""
    metadata = await client.storage_client.configuration_metadata_get(
        component_id=component_id, configuration_id=configuration_id
    )
    metadata_dict = _metadata_to_dict(metadata)
    assert MetadataField.CREATED_BY_MCP in metadata_dict
    assert metadata_dict[MetadataField.CREATED_BY_MCP] == 'true'


async def _assert_updated_by_mcp(
    client: KeboolaClient, component_id: str, configuration_id: str, version: int
) -> None:
    """
    Asserts that KBC.MCP.updatedBy.version.{version} is set to 'true' and that the original
    creation metadata is still present on the configuration.
    """
    metadata = await client.storage_client.configuration_metadata_get(
        component_id=component_id, configuration_id=configuration_id
    )
    assert isinstance(metadata, list), f'Expecting list, got: {type(metadata)}'
    meta_key = f'{MetadataField.UPDATED_BY_MCP_PREFIX}{version}'
    assert get_metadata_property(metadata, meta_key) == 'true'
    # Check that the original creation metadata is still there
    assert get_metadata_property(metadata, MetadataField.CREATED_BY_MCP) == 'true'


@pytest.mark.asyncio
async def test_get_config(mcp_context: Context, configs: list[ConfigDef]):
    """Tests that `get_config` returns a `Configuration` instance."""
    for config in configs:
        assert config.configuration_id is not None

        configuration = await get_config(
            component_id=config.component_id, configuration_id=config.configuration_id, ctx=mcp_context
        )

        assert isinstance(configuration, Configuration)
        assert configuration.component is not None
        assert configuration.component.component_id == config.component_id
        assert configuration.component.component_type is not None
        assert configuration.component.component_name is not None
        assert configuration.configuration_root is not None
        assert configuration.configuration_root.configuration_id == config.configuration_id
        assert configuration.configuration_root.component_id == config.component_id

        # Check links field
        assert configuration.links, 'Links list should not be empty.'
        for link in configuration.links:
            assert isinstance(link, Link)


@pytest.mark.asyncio
async def test_list_configs_by_ids(mcp_context: Context, configs: list[ConfigDef]):
    """Tests that `list_configs` returns components filtered by component IDs."""
    # Get unique component IDs from test configs
    component_ids = list({config.component_id for config in configs})
    assert len(component_ids) > 0

    result = await list_configs(ctx=mcp_context, component_ids=component_ids)

    # Verify result structure and content
    assert isinstance(result, ListConfigsOutput)
    assert len(result.components_with_configurations) == len(component_ids)

    for item in result.components_with_configurations:
        assert isinstance(item, ComponentWithConfigurations)
        assert item.component.component_id in component_ids

        # Check that configurations belong to this component
        for config in item.configurations:
            assert config.configuration_root.component_id == item.component.component_id


@pytest.mark.parametrize(
    ('component_types', 'expected_count'),
    [
        (['extractor'], 1),
        (['transformation'], 1),
        (['application', 'extractor', 'transformation'], 2),
        ([], 2),
    ],
)
@pytest.mark.asyncio
async def test_list_configs_by_types(
    mcp_context: Context, configs: list[ConfigDef], component_types: list[ComponentType], expected_count: int
):
    """Tests that `list_configs` returns components filtered by component types."""
    result = await list_configs(ctx=mcp_context, component_types=component_types)

    assert isinstance(result, ListConfigsOutput)
    assert sum(len(cmp.configurations) for cmp in result.components_with_configurations) == expected_count

    allowed_types = expand_component_types(component_types)
    for item in result.components_with_configurations:
        assert isinstance(item, ComponentWithConfigurations)
        assert item.component.component_type in allowed_types


@pytest.mark.asyncio
async def test_create_config(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef):
    """Tests that `create_config` creates a configuration with correct metadata."""
    # Use the first component from configs for testing
    component_id = configs[0].component_id

    # Define test configuration data
    test_name = 'Test Configuration'
    test_description = 'Test configuration created by automated test'
    test_parameters = {}
    test_storage = {}

    client = KeboolaClient.from_state(mcp_context.session.state)
    project_id = keboola_project.project_id

    # Create the configuration
    created_config = await create_config(
        ctx=mcp_context,
        name=test_name,
        description=test_description,
        component_id=component_id,
        parameters=test_parameters,
        storage=test_storage,
    )

    try:
        # Verify the response structure
        assert isinstance(created_config, ConfigToolOutput)
        assert created_config.component_id == component_id
        assert created_config.configuration_id is not None
        assert created_config.description == test_description
        assert created_config.success is True
        assert created_config.timestamp is not None
        assert created_config.version is not None
        assert frozenset(created_config.links) == _expected_links(
            project_id, component_id, created_config.configuration_id, test_name
        )

        # Verify the configuration exists in the backend by fetching it
        config_detail = await client.storage_client.configuration_detail(
            component_id=component_id, configuration_id=created_config.configuration_id
        )
        assert config_detail['name'] == test_name
        assert config_detail['description'] == test_description
        assert 'configuration' in config_detail

        # Verify the parameters and storage were set correctly
        configuration_data = cast(dict, config_detail['configuration'])
        assert configuration_data['parameters'] == test_parameters
        assert configuration_data['storage'] == test_storage

        # Verify the metadata - check that KBC.MCP.createdBy is set to 'true'
        await _assert_created_by_mcp(client, component_id, created_config.configuration_id)

    finally:
        # Clean up: Delete the configuration
        await client.storage_client.configuration_delete(
            component_id=component_id,
            configuration_id=created_config.configuration_id,
            skip_trash=True,
        )


@pytest.fixture
def mcp_server(storage_api_url: str, storage_api_token: str, workspace_schema: str) -> FastMCP:
    """Creates the MCP server (stdio transport) configured for the test project."""
    config = Config(storage_api_url=storage_api_url, storage_token=storage_api_token, workspace_schema=workspace_schema)
    server = create_server(config, runtime_info=ServerRuntimeInfo(transport='stdio'))
    assert isinstance(server, FastMCP)
    return server


@pytest_asyncio.fixture
async def mcp_client(mcp_server: FastMCP) -> AsyncGenerator[Client, None]:
    """Yields a `fastmcp.Client` connected to the `mcp_server` fixture."""
    async with Client(mcp_server) as client:
        yield client


@pytest_asyncio.fixture
async def initial_cmpconf(
    mcp_client: Client, configs: list[ConfigDef], keboola_client: KeboolaClient
) -> AsyncGenerator[ConfigToolOutput, None]:
    """Creates a component configuration via the `create_config` tool and deletes it after the test."""
    # Create the initial component configuration test data
    tool_result = await mcp_client.call_tool(
        name='create_config',
        arguments={
            'name': 'Initial Test Configuration',
            'description': 'Initial test configuration created by automated test',
            'component_id': configs[0].component_id,
            'parameters': {'initial_param': 'initial_value'},
            'storage': {'input': {'tables': [{'source': 'in.c-bucket.table', 'destination': 'input.csv'}]}},
        },
    )
    try:
        yield ConfigToolOutput.model_validate(tool_result.structured_content)
    finally:
        # Clean up: Delete the configuration
        await keboola_client.storage_client.configuration_delete(
            component_id=configs[0].component_id,
            configuration_id=tool_result.structured_content['configuration_id'],
            skip_trash=True,
        )


@pytest.mark.asyncio
@pytest.mark.parametrize(
    'updates',
    [
        {
            'name': 'Updated Test Configuration',
            'description': 'Updated test configuration by automated test',
            'parameter_updates': [{'op': 'set', 'path': 'updated_param', 'new_val': 'updated_value'}],
            'storage': {'output': {'tables': [{'source': 'output.csv', 'destination': 'out.c-bucket.table'}]}},
        },
        {'name': 'Updated just name'},
        {'description': 'Updated just description'},
        {'parameter_updates': [{'op': 'set', 'path': 'updated_param', 'new_val': 'Updated just parameters'}]},
        {'storage': {'output': {'tables': [{'source': 'output.csv', 'destination': 'out.c-bucket.table'}]}}},
    ],
)
async def test_update_config(
    updates: dict[str, Any],
    initial_cmpconf: ConfigToolOutput,
    mcp_client: Client,
    keboola_project: ProjectDef,
    keboola_client: KeboolaClient,
):
    """Tests that `update_config` updates a configuration with correct metadata."""
    project_id = keboola_project.project_id
    component_id = initial_cmpconf.component_id
    configuration_id = initial_cmpconf.configuration_id

    param_update_dicts = updates.get('parameter_updates')
    orig_parameters = {}
    param_updates = []
    if param_update_dicts is not None:
        # Get the original configuration so we can compare the parameters
        orig_config = await keboola_client.storage_client.configuration_detail(
            component_id=component_id, configuration_id=configuration_id
        )
        orig_parameters = cast(dict, orig_config.get('configuration', {}).get('parameters', {}))

        # Convert the parameter update dicts to ConfigParamUpdate objects
        adapter = TypeAdapter(ConfigParamUpdate)
        param_updates = [adapter.validate_python(update_dict) for update_dict in param_update_dicts]

    tool_result = await mcp_client.call_tool(
        name='update_config',
        arguments={
            'change_description': 'Integration test update',
            'component_id': component_id,
            'configuration_id': configuration_id,
            **updates,
        },
    )

    # Check the tool's output
    update_result = ConfigToolOutput.model_validate(tool_result.structured_content)
    assert update_result.component_id == component_id
    assert update_result.configuration_id == configuration_id
    assert update_result.success is True
    assert update_result.timestamp is not None
    assert update_result.version is not None

    expected_name = updates.get('name') or 'Initial Test Configuration'
    expected_description = updates.get('description') or initial_cmpconf.description
    assert update_result.description == expected_description
    assert frozenset(update_result.links) == _expected_links(
        project_id, component_id, configuration_id, expected_name
    )

    # Verify the configuration was updated
    updated_config = await keboola_client.storage_client.configuration_detail(
        component_id=update_result.component_id, configuration_id=update_result.configuration_id
    )
    assert updated_config['name'] == expected_name
    assert updated_config['description'] == expected_description

    updated_config_data = updated_config.get('configuration')
    assert isinstance(updated_config_data, dict), f'Expecting dict, got: {type(updated_config_data)}'

    if param_update_dicts is not None:
        expected_parameters = update_params(orig_parameters, param_updates)
        assert updated_config_data['parameters'] == expected_parameters

    if (expected_storage := updates.get('storage')) is not None:
        _assert_storage_contains(updated_config_data['storage'], expected_storage)

    current_version = updated_config['version']
    assert isinstance(current_version, int), f'Expecting int, got: {type(current_version)}'
    assert current_version == 2

    # Check that KBC.MCP.updatedBy.version.{version} is set to 'true'
    await _assert_updated_by_mcp(
        keboola_client, update_result.component_id, update_result.configuration_id, current_version
    )


@pytest.mark.asyncio
async def test_add_config_row(mcp_context: Context, configs: list[ConfigDef], keboola_project: ProjectDef):
    """Tests that `add_config_row` creates a row configuration with correct metadata."""
    # Use the first component from configs for testing
    component_id = configs[0].component_id

    # Define root configuration test data
    root_config_name = 'Root Configuration for Row Test'
    root_config_description = 'Root configuration created for row configuration test'
    root_config_parameters = {}
    root_config_storage = {}

    # Define row configuration test data
    row_name = 'Test Row Configuration'
    row_description = 'Test row configuration created by automated test'
    row_parameters = {'row_param': 'row_value'}
    row_storage = {}

    client = KeboolaClient.from_state(mcp_context.session.state)
    project_id = keboola_project.project_id

    # First create a root configuration to add row to
    root_config = await create_config(
        ctx=mcp_context,
        name=root_config_name,
        description=root_config_description,
        component_id=component_id,
        parameters=root_config_parameters,
        storage=root_config_storage,
    )

    try:
        # Create the row configuration
        created_row_config = await add_config_row(
            ctx=mcp_context,
            name=row_name,
            description=row_description,
            component_id=component_id,
            configuration_id=root_config.configuration_id,
            parameters=row_parameters,
            storage=row_storage,
        )

        assert isinstance(created_row_config, ConfigToolOutput)
        assert created_row_config.success is True
        assert created_row_config.timestamp is not None
        assert created_row_config.description == row_description
        assert created_row_config.component_id == component_id
        assert created_row_config.configuration_id == root_config.configuration_id
        assert created_row_config.version is not None
        assert frozenset(created_row_config.links) == _expected_links(
            project_id, component_id, root_config.configuration_id, row_name
        )

        # Verify the row configuration exists by fetching the root configuration and checking its rows
        config_detail = await client.storage_client.configuration_detail(
            component_id=component_id, configuration_id=root_config.configuration_id
        )
        assert 'rows' in config_detail
        rows = cast(list, config_detail['rows'])
        assert len(rows) == 1

        # Find the row we just created
        created_row = next(
            (row for row in rows if isinstance(row, dict) and row.get('name') == row_name),
            None,
        )
        assert created_row is not None
        assert created_row['description'] == row_description
        assert 'configuration' in created_row

        # Verify the parameters and storage were set correctly
        row_configuration_data = cast(dict, created_row['configuration'])
        assert row_configuration_data['parameters'] == row_parameters
        assert row_configuration_data['storage'] == row_storage

        # Verify metadata was set for the parent configuration
        metadata = await client.storage_client.configuration_metadata_get(
            component_id=component_id, configuration_id=root_config.configuration_id
        )
        metadata_dict = _metadata_to_dict(metadata)

        # The updated metadata should be present since we added a row to the configuration
        updated_by_md_keys = [
            key
            for key in metadata_dict
            if isinstance(key, str) and key.startswith(MetadataField.UPDATED_BY_MCP_PREFIX)
        ]
        assert len(updated_by_md_keys) > 0

    finally:
        # Delete the configuration (this will also delete the rows)
        await client.storage_client.configuration_delete(
            component_id=component_id,
            configuration_id=root_config.configuration_id,
            skip_trash=True,
        )


@pytest_asyncio.fixture
async def initial_cmpconf_row(
    initial_cmpconf: ConfigToolOutput, mcp_client: Client, keboola_client: KeboolaClient
) -> ConfigToolOutput:
    """
    Adds a row to the configuration created by the `initial_cmpconf` fixture via the `add_config_row` tool.

    No cleanup is done here; the parent configuration is deleted by `initial_cmpconf`.
    """
    # Create initial row configuration test data
    tool_result = await mcp_client.call_tool(
        name='add_config_row',
        arguments={
            'name': 'Initial Test Row Configuration',
            'description': 'Initial row configuration for update test',
            'component_id': initial_cmpconf.component_id,
            'configuration_id': initial_cmpconf.configuration_id,
            'parameters': {'initial_row_param': 'initial_row_value'},
            'storage': {},
        },
    )
    return ConfigToolOutput.model_validate(tool_result.structured_content)


@pytest.mark.asyncio
@pytest.mark.parametrize(
    'updates',
    [
        {
            'name': 'Updated Row Configuration',
            'description': 'Updated row configuration by automated test',
            'parameter_updates': [{'op': 'set', 'path': '$', 'new_val': {'updated_row_param': 'updated_row_value'}}],
            'storage': {},
        },
        {'name': 'Updated just name'},
        {'description': 'Updated just description'},
        {
            'parameter_updates': [
                {'op': 'set', 'path': '$', 'new_val': {'updated_row_param': 'Updated just parameters'}}
            ]
        },
        {'storage': {'output': {'tables': [{'source': 'output.csv', 'destination': 'out.c-bucket.table'}]}}},
    ],
)
async def test_update_config_row(
    updates: dict[str, Any],
    initial_cmpconf_row: ConfigToolOutput,
    mcp_client: Client,
    keboola_project: ProjectDef,
    keboola_client: KeboolaClient,
):
    """Tests that `update_config_row` updates a row configuration with correct metadata."""
    project_id = keboola_project.project_id
    component_id = initial_cmpconf_row.component_id
    configuration_id = initial_cmpconf_row.configuration_id

    # Get the row ID from the configuration detail
    config_detail = await keboola_client.storage_client.configuration_detail(
        component_id=component_id, configuration_id=configuration_id
    )
    rows = config_detail['rows']
    assert isinstance(rows, list)
    assert len(rows) == 1
    row_id = rows[0]['id']

    tool_result = await mcp_client.call_tool(
        name='update_config_row',
        arguments={
            'change_description': 'Integration test update',
            'component_id': component_id,
            'configuration_id': configuration_id,
            'configuration_row_id': row_id,
            **updates,
        },
    )

    # Check the tool's output
    updated_row_config = ConfigToolOutput.model_validate(tool_result.structured_content)
    assert updated_row_config.component_id == component_id
    assert updated_row_config.configuration_id == configuration_id
    assert updated_row_config.success is True
    assert updated_row_config.timestamp is not None
    assert updated_row_config.version is not None

    expected_row_name = updates.get('name') or 'Initial Test Row Configuration'
    expected_row_description = updates.get('description') or initial_cmpconf_row.description
    assert updated_row_config.description == expected_row_description
    assert frozenset(updated_row_config.links) == _expected_links(
        project_id, component_id, configuration_id, expected_row_name
    )

    # Verify the row configuration was updated
    row_config_detail = await keboola_client.storage_client.configuration_detail(
        component_id=updated_row_config.component_id, configuration_id=updated_row_config.configuration_id
    )
    updated_rows = row_config_detail['rows']
    assert isinstance(updated_rows, list), f'Expecting list, got: {type(updated_rows)}'

    # Find the updated row
    updated_row = next((row for row in updated_rows if row.get('id') == row_id), None)
    assert updated_row, f'No row for row_id: {row_id}'
    assert isinstance(updated_row, dict), f'Expecting dict, got: {type(updated_row)}'
    assert updated_row['name'] == expected_row_name
    assert updated_row['description'] == expected_row_description

    row_config_data = updated_row['configuration']
    assert isinstance(row_config_data, dict), f'Expecting dict, got: {type(row_config_data)}'

    if (parameter_updates := updates.get('parameter_updates')) is not None:
        # Using the assumption that parameter_updates is a list with one element with 'set' operation on root path
        assert row_config_data['parameters'] == parameter_updates[0]['new_val']

    if (expected_storage := updates.get('storage')) is not None:
        _assert_storage_contains(row_config_data['storage'], expected_storage)

    # The version before the update comes from the fixtures (create config + add row).
    previous_version = config_detail['version']
    assert isinstance(previous_version, int), f'Expecting int, got: {type(previous_version)}'
    assert previous_version == 2

    # The version to verify must be read from the detail fetched AFTER the update. Reading it from the
    # pre-update detail would only re-check the metadata written when the row was added.
    # NOTE(review): assumes a row update bumps the parent configuration version - confirm against Storage API.
    current_version = row_config_detail['version']
    assert isinstance(current_version, int), f'Expecting int, got: {type(current_version)}'
    assert current_version > previous_version

    # Check that KBC.MCP.updatedBy.version.{version} is set to 'true'
    await _assert_updated_by_mcp(
        keboola_client, updated_row_config.component_id, updated_row_config.configuration_id, current_version
    )


@pytest.mark.asyncio
async def test_create_sql_transformation(mcp_context: Context, keboola_project: ProjectDef):
    """Tests that `create_sql_transformation` creates a SQL transformation with correct configuration."""
    test_name = 'Test SQL Transformation'
    test_description = 'Test SQL transformation created by automated test'

    # Define test SQL code blocks
    test_sql_code_blocks = [
        TransformationConfiguration.Parameters.Block.Code(
            name='Main transformation', sql_statements=['SELECT 1 as test_column', 'SELECT 2 as another_column']
        )
    ]
    test_created_table_names = ['test_output_table']

    client = KeboolaClient.from_state(mcp_context.session.state)
    project_id = keboola_project.project_id

    # Create the SQL transformation
    created_transformation = await create_sql_transformation(
        ctx=mcp_context,
        name=test_name,
        description=test_description,
        sql_code_blocks=test_sql_code_blocks,
        created_table_names=test_created_table_names,
    )

    try:
        # Looked up inside the `try` so that the transformation is deleted even if the lookup fails.
        sql_dialect = await WorkspaceManager.from_state(mcp_context.session.state).get_sql_dialect()
        expected_component_id = get_sql_transformation_id_from_sql_dialect(sql_dialect)

        # Verify the response structure
        assert isinstance(created_transformation, ConfigToolOutput)
        assert created_transformation.success is True
        assert created_transformation.timestamp is not None
        assert created_transformation.description == test_description
        assert created_transformation.component_id == expected_component_id
        assert created_transformation.configuration_id is not None
        assert created_transformation.version is not None
        assert frozenset(created_transformation.links) == _expected_links(
            project_id, expected_component_id, created_transformation.configuration_id, test_name
        )

        # Verify the configuration exists in the backend by fetching it
        config_detail = await client.storage_client.configuration_detail(
            component_id=created_transformation.component_id,
            configuration_id=created_transformation.configuration_id,
        )
        assert config_detail['name'] == test_name
        assert config_detail['description'] == test_description
        assert 'configuration' in config_detail

        # Verify the configuration structure
        configuration_data = cast(dict, config_detail['configuration'])
        assert 'parameters' in configuration_data
        assert 'storage' in configuration_data

        # Verify the parameters structure
        parameters = configuration_data['parameters']
        assert 'blocks' in parameters
        assert len(parameters['blocks']) == 1

        block = parameters['blocks'][0]
        assert 'codes' in block
        assert len(block['codes']) == len(test_sql_code_blocks)

        code = block['codes'][0]
        assert code['name'] == test_sql_code_blocks[0].name
        assert 'script' in code  # API uses 'script' instead of 'sql_statements'
        assert len(code['script']) == len(test_sql_code_blocks[0].sql_statements)
        assert code['script'][0] == test_sql_code_blocks[0].sql_statements[0]
        assert code['script'][1] == test_sql_code_blocks[0].sql_statements[1]

        # Verify the storage structure contains output tables
        storage = configuration_data['storage']
        assert 'output' in storage
        assert 'tables' in storage['output']
        assert len(storage['output']['tables']) == len(test_created_table_names)

        output_table = storage['output']['tables'][0]
        assert output_table['source'] == test_created_table_names[0]

        # Verify the metadata - check that KBC.MCP.createdBy is set to 'true'
        await _assert_created_by_mcp(
            client, created_transformation.component_id, created_transformation.configuration_id
        )

    finally:
        # Clean up: Delete the configuration
        await client.storage_client.configuration_delete(
            component_id=created_transformation.component_id,
            configuration_id=created_transformation.configuration_id,
            skip_trash=True,
        )


@pytest_asyncio.fixture
async def initial_sqltrfm(
    mcp_client: Client, configs: list[ConfigDef], keboola_client: KeboolaClient
) -> AsyncGenerator[ConfigToolOutput, None]:
    """Creates a SQL transformation via the `create_sql_transformation` tool and deletes it after the test."""
    # Create the initial SQL transformation test data
    tool_result = await mcp_client.call_tool(
        name='create_sql_transformation',
        arguments={
            'name': 'Initial Test SQL Transformation',
            'description': 'Initial SQL transformation for update test',
            'sql_code_blocks': [{'name': 'Initial transformation', 'sql_statements': ['SELECT 1 as initial_column']}],
            'created_table_names': ['initial_output_table'],
        },
    )
    try:
        yield ConfigToolOutput.model_validate(tool_result.structured_content)
    finally:
        # Clean up: Delete the configuration
        await keboola_client.storage_client.configuration_delete(
            component_id=tool_result.structured_content['component_id'],
            configuration_id=tool_result.structured_content['configuration_id'],
            skip_trash=True,
        )


@pytest.mark.asyncio
@pytest.mark.parametrize(
    'updates',
    [
        {
            'updated_description': 'Updated SQL transformation description',
            'parameters': _UPDATED_SQL_PARAMETERS,
            'storage': _UPDATED_SQL_STORAGE,
            'is_disabled': True,
        },
        {'updated_description': 'Updated SQL transformation description'},
        {'parameters': _UPDATED_SQL_PARAMETERS},
        {'storage': _UPDATED_SQL_STORAGE},
        {'is_disabled': True},
    ],
)
async def test_update_sql_transformation(
    updates: dict[str, Any],
    initial_sqltrfm: ConfigToolOutput,
    mcp_client: Client,
    keboola_project: ProjectDef,
    keboola_client: KeboolaClient,
):
    """Tests that `update_sql_transformation` updates an existing SQL transformation correctly."""
    project_id = keboola_project.project_id
    component_id = initial_sqltrfm.component_id
    configuration_id = initial_sqltrfm.configuration_id

    tool_result = await mcp_client.call_tool(
        name='update_sql_transformation',
        arguments={
            'change_description': 'Integration test update',
            'configuration_id': configuration_id,
            **updates,
        },
    )

    # Check the tool's output
    updated_trfm = ConfigToolOutput.model_validate(tool_result.structured_content)
    assert updated_trfm.component_id == component_id
    assert updated_trfm.configuration_id == configuration_id
    assert updated_trfm.success is True
    assert updated_trfm.timestamp is not None
    assert updated_trfm.version is not None

    expected_name = updates.get('name') or 'Initial Test SQL Transformation'
    expected_description = updates.get('updated_description') or initial_sqltrfm.description
    assert updated_trfm.description == expected_description
    assert frozenset(updated_trfm.links) == _expected_links(
        project_id, component_id, configuration_id, expected_name
    )

    # Verify the transformation was updated
    trfm_detail = await keboola_client.storage_client.configuration_detail(
        component_id=updated_trfm.component_id, configuration_id=updated_trfm.configuration_id
    )
    assert trfm_detail['name'] == expected_name
    assert trfm_detail['description'] == expected_description

    trfm_data = trfm_detail.get('configuration')
    assert isinstance(trfm_data, dict), f'Expecting dict, got: {type(trfm_data)}'

    actual_parameters = trfm_data.get('parameters')
    assert isinstance(actual_parameters, dict), f'Expecting dict, got: {type(actual_parameters)}'

    # The tool argument is named 'parameters'; looking up any other key would silently skip this check.
    if (expected_parameters := updates.get('parameters')) is not None:
        expected_blocks = expected_parameters['blocks']
        actual_blocks = actual_parameters['blocks']
        assert len(actual_blocks) == len(expected_blocks)
        for actual_block, expected_block in zip(actual_blocks, expected_blocks):
            expected_codes = expected_block['codes']
            actual_codes = actual_block['codes']
            assert len(actual_codes) == len(expected_codes)
            for actual_code, expected_code in zip(actual_codes, expected_codes):
                assert actual_code['name'] == expected_code['name']
                # API uses 'script' instead of 'sql_statements'
                assert actual_code['script'] == expected_code['sql_statements']

    actual_storage = trfm_data.get('storage')
    assert isinstance(actual_storage, dict), f'Expecting dict, got: {type(actual_storage)}'

    if (expected_storage := updates.get('storage')) is not None:
        _assert_storage_contains(actual_storage, expected_storage)

    if (expected_is_disabled := updates.get('is_disabled')) is not None:
        assert trfm_detail['isDisabled'] == expected_is_disabled

    current_version = trfm_detail['version']
    assert isinstance(current_version, int), f'Expecting int, got: {type(current_version)}'
    assert current_version == 2

    # Check that KBC.MCP.updatedBy.version.{version} is set to 'true'
    await _assert_updated_by_mcp(
        keboola_client, updated_trfm.component_id, updated_trfm.configuration_id, current_version
    )


@pytest.mark.asyncio
async def test_get_component(mcp_context: Context, configs: list[ConfigDef]):
    """Tests that `get_component` returns component details."""
    test_config = configs[0]
    component_id = test_config.component_id

    result = await get_component(component_id=component_id, ctx=mcp_context)

    assert isinstance(result, Component)
    assert result.component_id == test_config.component_id


@pytest.mark.asyncio
async def test_get_config_examples(mcp_context: Context, configs: list[ConfigDef]):
    """Tests that `get_config_examples` returns configuration examples in markdown format."""
    test_config = configs[0]
    component_id = test_config.component_id

    result = await get_config_examples(component_id=component_id, ctx=mcp_context)

    # Verify the result is a markdown formatted string
    assert isinstance(result, str)
    assert f'# Configuration Examples for `{component_id}`' in result
    assert f'{component_id}`' in result
    assert 'parameters' in result


@pytest.mark.asyncio
async def test_get_config_examples_with_invalid_component(mcp_context: Context):
    """Tests that `get_config_examples` handles non-existent components properly."""
    result = await get_config_examples(ctx=mcp_context, component_id='completely-non-existent-component-12345')

    assert result == ''

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.