Keboola Explorer MCP Server

conftest.py (11.8 kB)
import dataclasses
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Generator

import pytest
from dotenv import load_dotenv
from fastmcp import Context
from kbcstorage.client import Client as SyncStorageClient
from mcp.server.session import ServerSession
from mcp.shared.context import RequestContext

from keboola_mcp_server.clients.client import KeboolaClient
from keboola_mcp_server.config import Config, ServerRuntimeInfo
from keboola_mcp_server.mcp import ServerState
from keboola_mcp_server.workspace import WorkspaceManager

LOG = logging.getLogger(__name__)

STORAGE_API_URL_ENV_VAR = 'INTEGTEST_STORAGE_API_URL'
STORAGE_API_TOKEN_ENV_VAR = 'INTEGTEST_STORAGE_TOKEN'
WORKSPACE_SCHEMA_ENV_VAR = 'INTEGTEST_WORKSPACE_SCHEMA'
# The second pair of token/schema for testing simultaneous access to two different projects.
STORAGE_API_TOKEN_ENV_VAR_2 = 'INTEGTEST_STORAGE_TOKEN_PRJ2'
WORKSPACE_SCHEMA_ENV_VAR_2 = 'INTEGTEST_WORKSPACE_SCHEMA_PRJ2'

# We reset dev environment variables to integtest values to ensure tests run locally using .env settings.
DEV_STORAGE_API_URL_ENV_VAR = 'STORAGE_API_URL'
DEV_STORAGE_TOKEN_ENV_VAR = 'KBC_STORAGE_TOKEN'
DEV_WORKSPACE_SCHEMA_ENV_VAR = 'KBC_WORKSPACE_SCHEMA'


@dataclass(frozen=True)
class BucketDef:
    bucket_id: str
    display_name: str


@dataclass(frozen=True)
class TableDef:
    bucket_id: str
    table_name: str
    table_id: str

    @property
    def file_path(self) -> Path:
        """Path to the CSV file containing the table data."""
        return _data_dir() / 'proj' / 'buckets' / self.bucket_id / f'{self.table_name}.csv'


@dataclass(frozen=True)
class ConfigDef:
    component_id: str
    configuration_id: str | None  # Will be generated by Storage API
    internal_id: str

    @property
    def file_path(self) -> Path:
        """Path to the JSON file containing the configuration."""
        return _data_dir() / 'proj' / 'configs' / self.component_id / f'{self.internal_id}.json'


@dataclass(frozen=True)
class ProjectDef:
    project_id: str
    buckets: list[BucketDef]
    tables: list[TableDef]
    configs: list[ConfigDef]


@pytest.fixture(scope='session')
def env_file_loaded() -> bool:
    return load_dotenv()


@pytest.fixture(scope='session')
def env_init(env_file_loaded: bool, storage_api_token: str, storage_api_url: str, workspace_schema: str) -> bool:
    # We reset the development environment variables to the values of the integtest environment variables.
    os.environ[DEV_STORAGE_API_URL_ENV_VAR] = storage_api_url
    os.environ[DEV_STORAGE_TOKEN_ENV_VAR] = storage_api_token
    os.environ[DEV_WORKSPACE_SCHEMA_ENV_VAR] = workspace_schema
    return env_file_loaded


def _data_dir() -> Path:
    return Path(__file__).parent / 'data'


@pytest.fixture(scope='session')
def storage_api_url(env_file_loaded: bool) -> str:
    storage_api_url = os.getenv(STORAGE_API_URL_ENV_VAR)
    assert storage_api_url, f'{STORAGE_API_URL_ENV_VAR} must be set'
    return storage_api_url


@pytest.fixture(scope='session')
def storage_api_token(env_file_loaded: bool) -> str:
    storage_api_token = os.getenv(STORAGE_API_TOKEN_ENV_VAR)
    assert storage_api_token, f'{STORAGE_API_TOKEN_ENV_VAR} must be set'
    return storage_api_token


@pytest.fixture(scope='session')
def mcp_config(storage_api_token: str, storage_api_url: str) -> Config:
    return Config(storage_api_url=storage_api_url, storage_token=storage_api_token)


@pytest.fixture(scope='session')
def workspace_schema(env_file_loaded: bool) -> str:
    workspace_schema = os.getenv(WORKSPACE_SCHEMA_ENV_VAR)
    assert workspace_schema, f'{WORKSPACE_SCHEMA_ENV_VAR} must be set'
    return workspace_schema


@pytest.fixture(scope='session')
def storage_api_token_2(env_file_loaded: bool) -> str | None:
    return os.getenv(STORAGE_API_TOKEN_ENV_VAR_2)


@pytest.fixture(scope='session')
def workspace_schema_2(env_file_loaded: bool) -> str | None:
    return os.getenv(WORKSPACE_SCHEMA_ENV_VAR_2)


@pytest.fixture(scope='session')
def shared_datadir_ro() -> Path:
    """
    Session-scoped access to the shared data directory for integration tests.
    Do not modify the data in this directory.

    For function-scoped access to the data, use the `shared_datadir` fixture provided by `pytest-datadir`,
    which creates a temporary copy of the data that can therefore be modified.
    """
    return _data_dir()


_BUCKET_DEFS_IN = [
    BucketDef(bucket_id='in.c-test_bucket_01', display_name='test_bucket_01'),
    BucketDef(bucket_id='in.c-test_bucket_02', display_name='test_bucket_02'),
]


def _create_buckets(storage_client: SyncStorageClient) -> list[BucketDef]:
    for bucket in _BUCKET_DEFS_IN:
        LOG.info(f'Creating bucket with display name={bucket.display_name}')
        created_bucket = storage_client.buckets.create(bucket.display_name)
        assert created_bucket['id'] == bucket.bucket_id
        assert created_bucket['displayName'] == bucket.display_name
    return _BUCKET_DEFS_IN


_TABLE_DEFS_IN = [
    TableDef(
        bucket_id='in.c-test_bucket_01',
        table_name='test_table_01',
        table_id='in.c-test_bucket_01.test_table_01',
    ),
]


def _create_tables(storage_client: SyncStorageClient) -> list[TableDef]:
    for table in _TABLE_DEFS_IN:
        LOG.info(f'Creating table with name={table.table_name}')
        created_table_id = storage_client.tables.create(
            bucket_id=table.bucket_id,
            name=table.table_name,
            file_path=str(table.file_path),
        )
        assert created_table_id == table.table_id
    return _TABLE_DEFS_IN


_CONFIG_DEFS_IN = [
    ConfigDef(
        component_id='ex-generic-v2',
        configuration_id=None,
        internal_id='test_config1',
    ),
    ConfigDef(
        component_id='keboola.snowflake-transformation',
        configuration_id=None,
        internal_id='test_config2',
    ),
]


def _create_configs(storage_client: SyncStorageClient) -> list[ConfigDef]:
    configs = []
    for config in _CONFIG_DEFS_IN:
        LOG.info(f'Creating config with internal ID={config.internal_id}')
        with config.file_path.open('r', encoding='utf-8') as cfg_file:
            created_config = storage_client.configurations.create(
                component_id=config.component_id,
                name=config.internal_id,
                configuration_id=None,
                configuration=json.load(cfg_file),
            )
        config = dataclasses.replace(config, configuration_id=created_config['id'])
        configs.append(config)
        LOG.info(f'Created config with component ID={config.component_id} and config ID={config.configuration_id}')
    return configs


def _sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient:
    client = SyncStorageClient(storage_api_url, storage_api_token)
    token_info = client.tokens.verify()
    LOG.info(
        f'Authorized as "{token_info["description"]}" ({token_info["id"]}) '
        f'to project "{token_info["owner"]["name"]}" ({token_info["owner"]["id"]}) '
        f'at "{client.root_url}" stack.'
    )
    return client


@pytest.fixture(scope='session')
def keboola_project(env_init: bool, storage_api_token: str, storage_api_url: str) -> Generator[ProjectDef, Any, None]:
    """
    Sets up a Keboola project with the items needed for integration tests, such as buckets, tables and configurations.
    After the tests, the project is cleaned up.
    """
    # Cannot use the keboola_client fixture because it is function-scoped.
    storage_client = _sync_storage_client(storage_api_token, storage_api_url)
    token_info = storage_client.tokens.verify()
    project_id: str = token_info['owner']['id']

    current_buckets = storage_client.buckets.list()
    if current_buckets:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_buckets)} buckets')
    buckets = _create_buckets(storage_client)

    current_tables = storage_client.tables.list()
    if current_tables:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_tables)} tables')
    tables = _create_tables(storage_client)

    current_configs = storage_client.configurations.list(component_id='ex-generic-v2')
    if current_configs:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_configs)} configs')
    configs = _create_configs(storage_client)

    if 'global-search' in token_info['owner'].get('features', []):
        # Give the global search time to catch up on the changes done in the testing project.
        # See https://help.keboola.com/management/global-search/#limitations for more info.
        time.sleep(10)

    LOG.info(f'Test setup for project {project_id} complete')

    yield ProjectDef(project_id=project_id, buckets=buckets, tables=tables, configs=configs)

    LOG.info(f'Cleaning up Keboola project with ID={project_id}')

    current_buckets = storage_client.buckets.list()
    for bucket in current_buckets:
        bucket_id = bucket['id']
        LOG.info(f'Deleting bucket with ID={bucket_id}')
        storage_client.buckets.delete(bucket_id, force=True)

    for config in configs:
        LOG.info(f'Deleting config with component ID={config.component_id} and config ID={config.configuration_id}')
        storage_client.configurations.delete(config.component_id, config.configuration_id)
        # Double delete because the first delete only moves the configuration to the trash.
        storage_client.configurations.delete(config.component_id, config.configuration_id)


@pytest.fixture(scope='session')
def buckets(keboola_project: ProjectDef) -> list[BucketDef]:
    return keboola_project.buckets


@pytest.fixture(scope='session')
def tables(keboola_project: ProjectDef) -> list[TableDef]:
    return keboola_project.tables


@pytest.fixture(scope='session')
def configs(keboola_project: ProjectDef) -> list[ConfigDef]:
    return keboola_project.configs


@pytest.fixture
def sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient:
    """Gets the ordinary (synchronous) client from the official Keboola SDK (i.e. the `kbcstorage` package)."""
    return _sync_storage_client(storage_api_token, storage_api_url)


@pytest.fixture
def keboola_client(sync_storage_client: SyncStorageClient) -> KeboolaClient:
    return KeboolaClient(storage_api_token=sync_storage_client.token, storage_api_url=sync_storage_client.root_url)


@pytest.fixture
def unique_id() -> str:
    """Generates a unique ID string for test resources."""
    return str(uuid.uuid4())[:8]


@pytest.fixture
def workspace_manager(keboola_client: KeboolaClient, workspace_schema: str) -> WorkspaceManager:
    return WorkspaceManager(keboola_client, workspace_schema)


@pytest.fixture
def mcp_context(
    mocker,
    keboola_client: KeboolaClient,
    workspace_manager: WorkspaceManager,
    keboola_project: ProjectDef,
    mcp_config: Config,
) -> Context:
    """MCP context containing the Keboola client and workspace manager."""
    client_context = mocker.MagicMock(Context)
    client_context.session = mocker.MagicMock(ServerSession)
    # We set the user session state as it is done in the @with_session_state decorator.
    client_context.session.state = {
        KeboolaClient.STATE_KEY: keboola_client,
        WorkspaceManager.STATE_KEY: workspace_manager,
    }
    client_context.session.client_params = None
    client_context.client_id = None
    client_context.session_id = None
    client_context.request_context = mocker.MagicMock(RequestContext)
    client_context.request_context.lifespan_context = ServerState(mcp_config, ServerRuntimeInfo(transport='stdio'))
    return client_context
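
The fixtures above assume that INTEGTEST_STORAGE_API_URL, INTEGTEST_STORAGE_TOKEN and INTEGTEST_WORKSPACE_SCHEMA are available in the environment, typically from a .env file picked up by load_dotenv(). As a minimal sketch of how an integration test might consume them, the test below is hypothetical (it is not part of this file) and only uses fixtures defined in this conftest.py:

def test_setup_buckets_are_listable(sync_storage_client, buckets):
    # Hypothetical example: the session-scoped `keboola_project` fixture (pulled in
    # transitively through `buckets`) has already created the test buckets, so they
    # should show up in the Storage API listing.
    listed_ids = {bucket['id'] for bucket in sync_storage_client.buckets.list()}
    for bucket_def in buckets:
        assert bucket_def.bucket_id in listed_ids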
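
Similarly, a sketch (again hypothetical) of how a test could confirm that the mocked mcp_context carries the per-session objects that the @with_session_state decorator would normally install:

def test_mcp_context_carries_session_state(mcp_context):
    from keboola_mcp_server.clients.client import KeboolaClient
    from keboola_mcp_server.workspace import WorkspaceManager

    # Hypothetical check: the session state dict set up in the fixture maps the
    # STATE_KEY constants to the real client and workspace manager instances.
    assert isinstance(mcp_context.session.state[KeboolaClient.STATE_KEY], KeboolaClient)
    assert isinstance(mcp_context.session.state[WorkspaceManager.STATE_KEY], WorkspaceManager)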
