Skip to main content
Glama

Keboola Explorer MCP Server

conftest.py (15.2 kB)
import asyncio import dataclasses import json import logging import os import random import time import uuid from contextlib import _AsyncGeneratorContextManager, asynccontextmanager from dataclasses import dataclass from multiprocessing import Process from pathlib import Path from typing import Any, AsyncGenerator, Callable, Generator, Literal import pytest from dotenv import load_dotenv from fastmcp import Client, Context, FastMCP from fastmcp.client.transports import SSETransport, StreamableHttpTransport from kbcstorage.client import Client as SyncStorageClient from mcp.server.session import ServerSession from mcp.shared.context import RequestContext from keboola_mcp_server.clients.client import KeboolaClient from keboola_mcp_server.config import Config, ServerRuntimeInfo from keboola_mcp_server.mcp import ServerState from keboola_mcp_server.workspace import WorkspaceManager AsyncContextServerRemoteRunner = Callable[ [FastMCP, Literal['sse', 'streamable-http']], _AsyncGeneratorContextManager[str] ] AsyncContextClientRunner = Callable[ [Literal['sse', 'streamable-http'], str, dict[str, str] | None], _AsyncGeneratorContextManager[Client] ] LOG = logging.getLogger(__name__) STORAGE_API_TOKEN_ENV_VAR = 'INTEGTEST_STORAGE_TOKEN' STORAGE_API_URL_ENV_VAR = 'INTEGTEST_STORAGE_API_URL' WORKSPACE_SCHEMA_ENV_VAR = 'INTEGTEST_WORKSPACE_SCHEMA' # We reset dev environment variables to integtest values to ensure tests run locally using .env settings. DEV_STORAGE_API_URL_ENV_VAR = 'STORAGE_API_URL' DEV_STORAGE_TOKEN_ENV_VAR = 'KBC_STORAGE_TOKEN' DEV_WORKSPACE_SCHEMA_ENV_VAR = 'KBC_WORKSPACE_SCHEMA' @dataclass(frozen=True) class BucketDef: bucket_id: str display_name: str @dataclass(frozen=True) class TableDef: bucket_id: str table_name: str table_id: str @property def file_path(self) -> Path: """ Path to the CSV file containing the table data. 
""" return _data_dir() / 'proj' / 'buckets' / self.bucket_id / f'{self.table_name}.csv' @dataclass(frozen=True) class ConfigDef: component_id: str configuration_id: str | None # Will be generated by Storage API internal_id: str @property def file_path(self) -> Path: """ Path to the JSON file containing the configuration. """ return _data_dir() / 'proj' / 'configs' / self.component_id / f'{self.internal_id}.json' @dataclass(frozen=True) class ProjectDef: project_id: str buckets: list[BucketDef] tables: list[TableDef] configs: list[ConfigDef] @pytest.fixture(scope='session') def env_file_loaded() -> bool: return load_dotenv() @pytest.fixture(scope='session') def env_init(env_file_loaded: bool, storage_api_token: str, storage_api_url: str, workspace_schema: str) -> bool: # We reset the development environment variables to the values of the integtest environment variables. os.environ[DEV_STORAGE_API_URL_ENV_VAR] = storage_api_url os.environ[DEV_STORAGE_TOKEN_ENV_VAR] = storage_api_token os.environ[DEV_WORKSPACE_SCHEMA_ENV_VAR] = workspace_schema return env_file_loaded def _data_dir() -> Path: return Path(__file__).parent / 'data' @pytest.fixture(scope='session') def storage_api_url(env_file_loaded: bool) -> str: storage_api_url = os.getenv(STORAGE_API_URL_ENV_VAR) assert storage_api_url, f'{STORAGE_API_URL_ENV_VAR} must be set' return storage_api_url @pytest.fixture(scope='session') def storage_api_token(env_file_loaded: bool) -> str: storage_api_token = os.getenv(STORAGE_API_TOKEN_ENV_VAR) assert storage_api_token, f'{STORAGE_API_TOKEN_ENV_VAR} must be set' return storage_api_token @pytest.fixture(scope='session') def mcp_config(storage_api_token: str, storage_api_url: str) -> Config: return Config(storage_api_url=storage_api_url, storage_token=storage_api_token) @pytest.fixture(scope='session') def workspace_schema(env_file_loaded: bool) -> str: workspace_schema = os.getenv(WORKSPACE_SCHEMA_ENV_VAR) assert workspace_schema, f'{WORKSPACE_SCHEMA_ENV_VAR} must be set' 
return workspace_schema @pytest.fixture(scope='session') def shared_datadir_ro() -> Path: """ Session-scoped access to shared data directory for integration tests. Do not modify the data in this directory. For function-scoped access to the data, use `shared_datadir` fixture provided by `pytest-datadir`, which creates a temporary copy of the data which can therefore be modified. """ return _data_dir() _BUCKET_DEFS_IN = [ BucketDef(bucket_id='in.c-test_bucket_01', display_name='test_bucket_01'), BucketDef(bucket_id='in.c-test_bucket_02', display_name='test_bucket_02'), ] def _create_buckets(storage_client: SyncStorageClient) -> list[BucketDef]: for bucket in _BUCKET_DEFS_IN: LOG.info(f'Creating bucket with display name={bucket.display_name}') created_bucket = storage_client.buckets.create(bucket.display_name) assert created_bucket['id'] == bucket.bucket_id assert created_bucket['displayName'] == bucket.display_name return _BUCKET_DEFS_IN _TABLE_DEFS_IN = [ TableDef( bucket_id='in.c-test_bucket_01', table_name='test_table_01', table_id='in.c-test_bucket_01.test_table_01', ), ] def _create_tables(storage_client: SyncStorageClient) -> list[TableDef]: for table in _TABLE_DEFS_IN: LOG.info(f'Creating table with name={table.table_name}') created_table_id = storage_client.tables.create( bucket_id=table.bucket_id, name=table.table_name, file_path=str(table.file_path), ) assert created_table_id == table.table_id return _TABLE_DEFS_IN _CONFIG_DEFS_IN = [ ConfigDef( component_id='ex-generic-v2', configuration_id=None, internal_id='test_config1', ), ConfigDef( component_id='keboola.snowflake-transformation', configuration_id=None, internal_id='test_config2', ), ] def _create_configs(storage_client: SyncStorageClient) -> list[ConfigDef]: configs = [] for config in _CONFIG_DEFS_IN: LOG.info(f'Creating config with internal ID={config.internal_id}') with config.file_path.open('r', encoding='utf-8') as cfg_file: created_config = storage_client.configurations.create( 
component_id=config.component_id, name=config.internal_id, configuration_id=None, configuration=json.load(cfg_file), ) config = dataclasses.replace(config, configuration_id=created_config['id']) configs.append(config) LOG.info(f'Created config with component ID={config.component_id} and config ID={config.configuration_id}') return configs def _sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient: client = SyncStorageClient(storage_api_url, storage_api_token) token_info = client.tokens.verify() LOG.info( f'Authorized as "{token_info["description"]}" ({token_info["id"]}) ' f'to project "{token_info["owner"]["name"]}" ({token_info["owner"]["id"]}) ' f'at "{client.root_url}" stack.' ) return client @pytest.fixture(scope='session') def keboola_project(env_init: bool, storage_api_token: str, storage_api_url: str) -> Generator[ProjectDef, Any, None]: """ Sets up a Keboola project with items needed for integration tests, such as buckets, tables and configurations. After the tests, the project is cleaned up. 
""" # Cannot use keboola_client fixture because it is function-scoped storage_client = _sync_storage_client(storage_api_token, storage_api_url) token_info = storage_client.tokens.verify() project_id: str = token_info['owner']['id'] current_buckets = storage_client.buckets.list() if current_buckets: pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_buckets)} buckets') buckets = _create_buckets(storage_client) current_tables = storage_client.tables.list() if current_tables: pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_tables)} tables') tables = _create_tables(storage_client) current_configs = storage_client.configurations.list(component_id='ex-generic-v2') if current_configs: pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_configs)} configs') configs = _create_configs(storage_client) if 'global-search' in token_info['owner'].get('fetaures', []): # Give the global search time to catch up on the changes done in the testing project. # See https://help.keboola.com/management/global-search/#limitations for moe info. 
time.sleep(10) LOG.info(f'Test setup for project {project_id} complete') yield ProjectDef(project_id=project_id, buckets=buckets, tables=tables, configs=configs) LOG.info(f'Cleaning up Keboola project with ID={project_id}') current_buckets = storage_client.buckets.list() for bucket in current_buckets: bucket_id = bucket['id'] LOG.info(f'Deleting bucket with ID={bucket_id}') storage_client.buckets.delete(bucket_id, force=True) for config in configs: LOG.info(f'Deleting config with component ID={config.component_id} and config ID={config.configuration_id}') storage_client.configurations.delete(config.component_id, config.configuration_id) # Double delete because the first delete moves the configuration to the trash storage_client.configurations.delete(config.component_id, config.configuration_id) @pytest.fixture(scope='session') def buckets(keboola_project: ProjectDef) -> list[BucketDef]: return keboola_project.buckets @pytest.fixture(scope='session') def tables(keboola_project: ProjectDef) -> list[TableDef]: return keboola_project.tables @pytest.fixture(scope='session') def configs(keboola_project: ProjectDef) -> list[ConfigDef]: return keboola_project.configs @pytest.fixture def sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient: """Gets the ordinary (synchronous) client from the official Keboola SDK (i.e. 
`kbcstorage` package).""" return _sync_storage_client(storage_api_token, storage_api_url) @pytest.fixture def keboola_client(sync_storage_client: SyncStorageClient) -> KeboolaClient: return KeboolaClient(storage_api_token=sync_storage_client.token, storage_api_url=sync_storage_client.root_url) @pytest.fixture def unique_id() -> str: """Generates a unique ID string for test resources.""" return str(uuid.uuid4())[:8] @pytest.fixture def workspace_manager(keboola_client: KeboolaClient, workspace_schema: str) -> WorkspaceManager: return WorkspaceManager(keboola_client, workspace_schema) @pytest.fixture def mcp_context( mocker, keboola_client: KeboolaClient, workspace_manager: WorkspaceManager, keboola_project: ProjectDef, mcp_config: Config, ) -> Context: """ MCP context containing the Keboola client and workspace manager. """ client_context = mocker.MagicMock(Context) client_context.session = mocker.MagicMock(ServerSession) # We set the user session state as it is done in the @with_session_state decorator client_context.session.state = { KeboolaClient.STATE_KEY: keboola_client, WorkspaceManager.STATE_KEY: workspace_manager, } client_context.session.client_params = None client_context.client_id = None client_context.session_id = None client_context.request_context = mocker.MagicMock(RequestContext) client_context.request_context.lifespan_context = ServerState(mcp_config, ServerRuntimeInfo(transport='stdio')) return client_context @pytest.fixture def run_server_remote() -> AsyncContextServerRemoteRunner: """ Fixture providing an async context manager to run the server in a subprocess. """ @asynccontextmanager async def _run_server_remote( server: FastMCP, transport: Literal['sse', 'streamable-http'] ) -> AsyncGenerator[str, None]: """ Run the server in a subprocess with async context manager which ensures that the server is properly closed after the test. :param server: The server to run. :param transport: The transport to use. :return: The url of the remote server. 
""" port = random.randint(8000, 9000) proc = Process(target=lambda: asyncio.run(server.run_async(transport=transport, port=port))) proc.start() if transport == 'sse': url = f'http://127.0.0.1:{port}/sse' else: url = f'http://127.0.0.1:{port}/mcp' LOG.info(f'Running MCP server in subprocess listening on {url} with {transport} transport.') try: await asyncio.sleep(1.0) # wait for the server to start yield url finally: LOG.info('Terminating MCP server subprocess.') proc.terminate() proc.join() return _run_server_remote @pytest.fixture def run_client() -> AsyncContextClientRunner: """Fixture providing an async context manager to use the client connected to the server url.""" @asynccontextmanager async def _run_client( transport: Literal['sse', 'streamable-http'], url: str, headers: dict[str, str] | None = None ) -> AsyncGenerator[Client, None]: """ Run the client in an async context manager which will ensure that the client is properly closed after the test. The client is created with the given transport and connected to the url of the remote server with which it communicates. :param transport: The transport of the server to which the client will be connected. :param url: The url of the remote server to which the client will be connected. :param headers: The headers to use for the client. :return: The Client connected to the remote server. 
""" if transport == 'sse': transport_explicit = SSETransport(url=url, headers=headers) else: transport_explicit = StreamableHttpTransport(url=url, headers=headers) client_explicit = Client(transport_explicit) exception_from_client = None LOG.info(f'Running MCP client connecting to {url} and expecting `{transport}` server transport.') try: async with client_explicit: try: yield client_explicit except Exception as e: LOG.error(f'Error in client TaskGroup: {e}') exception_from_client = e # we need to keep an exception from the client TaskGroup and raise it # in outside of the context manager otherwise it will inform only about task group error finally: del client_explicit if isinstance(exception_from_client, Exception): raise exception_from_client return _run_client

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server