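"""
Pytest fixtures for the Keboola MCP server integration tests.

The session-scoped `keboola_project` fixture populates an empty Keboola project with test
buckets, tables and component configurations, and tears everything down after the test session.
Function-scoped fixtures (`keboola_client`, `workspace_manager`, `mcp_context`) provide the
clients and the mocked MCP context that individual tests request.

Illustrative usage in a test module (a sketch only; `test_buckets_exist` is a hypothetical test):

    def test_buckets_exist(sync_storage_client, buckets):
        listed_ids = {b['id'] for b in sync_storage_client.buckets.list()}
        assert {b.bucket_id for b in buckets} <= listed_ids
"""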
import dataclasses
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Generator
import pytest
from dotenv import load_dotenv
from fastmcp import Context
from kbcstorage.client import Client as SyncStorageClient
from mcp.server.session import ServerSession
from mcp.shared.context import RequestContext
from keboola_mcp_server.clients.client import KeboolaClient
from keboola_mcp_server.config import Config, ServerRuntimeInfo
from keboola_mcp_server.mcp import ServerState
from keboola_mcp_server.workspace import WorkspaceManager
LOG = logging.getLogger(__name__)
STORAGE_API_URL_ENV_VAR = 'INTEGTEST_STORAGE_API_URL'
STORAGE_API_TOKEN_ENV_VAR = 'INTEGTEST_STORAGE_TOKEN'
WORKSPACE_SCHEMA_ENV_VAR = 'INTEGTEST_WORKSPACE_SCHEMA'
# The second pair of token/schema for testing simultaneous access to two different projects.
STORAGE_API_TOKEN_ENV_VAR_2 = 'INTEGTEST_STORAGE_TOKEN_PRJ2'
WORKSPACE_SCHEMA_ENV_VAR_2 = 'INTEGTEST_WORKSPACE_SCHEMA_PRJ2'
# The dev environment variables are overridden with the integtest values so that tests running locally pick up the .env settings.
DEV_STORAGE_API_URL_ENV_VAR = 'STORAGE_API_URL'
DEV_STORAGE_TOKEN_ENV_VAR = 'KBC_STORAGE_TOKEN'
DEV_WORKSPACE_SCHEMA_ENV_VAR = 'KBC_WORKSPACE_SCHEMA'
@dataclass(frozen=True)
class BucketDef:
    bucket_id: str
    display_name: str
@dataclass(frozen=True)
class TableDef:
    bucket_id: str
    table_name: str
    table_id: str
    @property
    def file_path(self) -> Path:
        """
        Path to the CSV file containing the table data.
        """
        return _data_dir() / 'proj' / 'buckets' / self.bucket_id / f'{self.table_name}.csv'
@dataclass(frozen=True)
class ConfigDef:
    component_id: str
    configuration_id: str | None  # None until created; the Storage API assigns the actual ID.
    internal_id: str
    @property
    def file_path(self) -> Path:
        """
        Path to the JSON file containing the configuration.
        """
        return _data_dir() / 'proj' / 'configs' / self.component_id / f'{self.internal_id}.json'
@dataclass(frozen=True)
class ProjectDef:
    project_id: str
    buckets: list[BucketDef]
    tables: list[TableDef]
    configs: list[ConfigDef]
@pytest.fixture(scope='session')
def env_file_loaded() -> bool:
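    """Loads variables from a .env file, if present; returns True when one was found and loaded."""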
    return load_dotenv()
@pytest.fixture(scope='session')
def env_init(env_file_loaded: bool, storage_api_token: str, storage_api_url: str, workspace_schema: str) -> bool:
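    """Resets the development environment variables to the integtest values; returns whether a .env file was loaded."""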
    # We reset the development environment variables to the values of the integtest environment variables.
    os.environ[DEV_STORAGE_API_URL_ENV_VAR] = storage_api_url
    os.environ[DEV_STORAGE_TOKEN_ENV_VAR] = storage_api_token
    os.environ[DEV_WORKSPACE_SCHEMA_ENV_VAR] = workspace_schema
    return env_file_loaded
def _data_dir() -> Path:
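    """Returns the directory with the static test data (CSV tables and JSON configurations)."""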
    return Path(__file__).parent / 'data'
@pytest.fixture(scope='session')
def storage_api_url(env_file_loaded: bool) -> str:
    storage_api_url = os.getenv(STORAGE_API_URL_ENV_VAR)
    assert storage_api_url, f'{STORAGE_API_URL_ENV_VAR} must be set'
    return storage_api_url
@pytest.fixture(scope='session')
def storage_api_token(env_file_loaded: bool) -> str:
    storage_api_token = os.getenv(STORAGE_API_TOKEN_ENV_VAR)
    assert storage_api_token, f'{STORAGE_API_TOKEN_ENV_VAR} must be set'
    return storage_api_token
@pytest.fixture(scope='session')
def mcp_config(storage_api_token: str, storage_api_url: str) -> Config:
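    """Server configuration built from the integration-test Storage API URL and token."""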
    return Config(storage_api_url=storage_api_url, storage_token=storage_api_token)
@pytest.fixture(scope='session')
def workspace_schema(env_file_loaded: bool) -> str:
    workspace_schema = os.getenv(WORKSPACE_SCHEMA_ENV_VAR)
    assert workspace_schema, f'{WORKSPACE_SCHEMA_ENV_VAR} must be set'
    return workspace_schema
@pytest.fixture(scope='session')
def storage_api_token_2(env_file_loaded: bool) -> str | None:
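    """Storage API token for the second test project, or None if not configured."""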
    return os.getenv(STORAGE_API_TOKEN_ENV_VAR_2)
@pytest.fixture(scope='session')
def workspace_schema_2(env_file_loaded: bool) -> str | None:
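    """Workspace schema for the second test project, or None if not configured."""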
    return os.getenv(WORKSPACE_SCHEMA_ENV_VAR_2)
@pytest.fixture(scope='session')
def shared_datadir_ro() -> Path:
    """
    Session-scoped access to the shared data directory for integration tests.
    Do not modify the data in this directory.
    For function-scoped access, use the `shared_datadir` fixture provided by `pytest-datadir`,
    which creates a temporary copy of the data that can safely be modified.
    """
    return _data_dir()
_BUCKET_DEFS_IN = [
    BucketDef(bucket_id='in.c-test_bucket_01', display_name='test_bucket_01'),
    BucketDef(bucket_id='in.c-test_bucket_02', display_name='test_bucket_02'),
]
def _create_buckets(storage_client: SyncStorageClient) -> list[BucketDef]:
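    """Creates the predefined test buckets in the project and returns their definitions."""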
    for bucket in _BUCKET_DEFS_IN:
        LOG.info(f'Creating bucket with display name={bucket.display_name}')
        created_bucket = storage_client.buckets.create(bucket.display_name)
        assert created_bucket['id'] == bucket.bucket_id
        assert created_bucket['displayName'] == bucket.display_name
    return _BUCKET_DEFS_IN
_TABLE_DEFS_IN = [
    TableDef(
        bucket_id='in.c-test_bucket_01',
        table_name='test_table_01',
        table_id='in.c-test_bucket_01.test_table_01',
    ),
]
def _create_tables(storage_client: SyncStorageClient) -> list[TableDef]:
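    """Creates the predefined test tables from the bundled CSV files and returns their definitions."""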
    for table in _TABLE_DEFS_IN:
        LOG.info(f'Creating table with name={table.table_name}')
        created_table_id = storage_client.tables.create(
            bucket_id=table.bucket_id,
            name=table.table_name,
            file_path=str(table.file_path),
        )
        assert created_table_id == table.table_id
    return _TABLE_DEFS_IN
_CONFIG_DEFS_IN = [
    ConfigDef(
        component_id='ex-generic-v2',
        configuration_id=None,
        internal_id='test_config1',
    ),
    ConfigDef(
        component_id='keboola.snowflake-transformation',
        configuration_id=None,
        internal_id='test_config2',
    ),
]
def _create_configs(storage_client: SyncStorageClient) -> list[ConfigDef]:
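    """Creates the predefined configurations from the bundled JSON files and returns their definitions
    with the configuration IDs assigned by the Storage API filled in."""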
    configs = []
    for config in _CONFIG_DEFS_IN:
        LOG.info(f'Creating config with internal ID={config.internal_id}')
        with config.file_path.open('r', encoding='utf-8') as cfg_file:
            created_config = storage_client.configurations.create(
                component_id=config.component_id,
                name=config.internal_id,
                configuration_id=None,
                configuration=json.load(cfg_file),
            )
        config = dataclasses.replace(config, configuration_id=created_config['id'])
        configs.append(config)
        LOG.info(f'Created config with component ID={config.component_id} and config ID={config.configuration_id}')
    return configs
def _sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient:
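    """Creates a synchronous Storage API client and logs the token and project it is authorized for."""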
    client = SyncStorageClient(storage_api_url, storage_api_token)
    token_info = client.tokens.verify()
    LOG.info(
        f'Authorized as "{token_info["description"]}" ({token_info["id"]}) '
        f'to project "{token_info["owner"]["name"]}" ({token_info["owner"]["id"]}) '
        f'at "{client.root_url}" stack.'
    )
    return client
@pytest.fixture(scope='session')
def keboola_project(env_init: bool, storage_api_token: str, storage_api_url: str) -> Generator[ProjectDef, Any, None]:
    """
    Sets up a Keboola project with items needed for integration tests,
    such as buckets, tables and configurations.
    After the tests, the project is cleaned up.
    """
    # Cannot use keboola_client fixture because it is function-scoped
    storage_client = _sync_storage_client(storage_api_token, storage_api_url)
    token_info = storage_client.tokens.verify()
    project_id: str = token_info['owner']['id']
    current_buckets = storage_client.buckets.list()
    if current_buckets:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_buckets)} buckets')
    buckets = _create_buckets(storage_client)
    current_tables = storage_client.tables.list()
    if current_tables:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_tables)} tables')
    tables = _create_tables(storage_client)
    current_configs = storage_client.configurations.list(component_id='ex-generic-v2')
    if current_configs:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_configs)} configs')
    configs = _create_configs(storage_client)
    if 'global-search' in token_info['owner'].get('features', []):
        # Give the global search time to catch up on the changes made in the testing project.
        # See https://help.keboola.com/management/global-search/#limitations for more info.
        time.sleep(10)
    LOG.info(f'Test setup for project {project_id} complete')
    yield ProjectDef(project_id=project_id, buckets=buckets, tables=tables, configs=configs)
    LOG.info(f'Cleaning up Keboola project with ID={project_id}')
    current_buckets = storage_client.buckets.list()
    for bucket in current_buckets:
        bucket_id = bucket['id']
        LOG.info(f'Deleting bucket with ID={bucket_id}')
        storage_client.buckets.delete(bucket_id, force=True)
    for config in configs:
        LOG.info(f'Deleting config with component ID={config.component_id} and config ID={config.configuration_id}')
        storage_client.configurations.delete(config.component_id, config.configuration_id)
        # Delete twice: the first call only moves the configuration to the trash; the second removes it permanently.
        storage_client.configurations.delete(config.component_id, config.configuration_id)
@pytest.fixture(scope='session')
def buckets(keboola_project: ProjectDef) -> list[BucketDef]:
    return keboola_project.buckets
@pytest.fixture(scope='session')
def tables(keboola_project: ProjectDef) -> list[TableDef]:
    return keboola_project.tables
@pytest.fixture(scope='session')
def configs(keboola_project: ProjectDef) -> list[ConfigDef]:
    return keboola_project.configs
@pytest.fixture
def sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient:
    """Gets the ordinary (synchronous) client from the official Keboola SDK (i.e. `kbcstorage` package)."""
    return _sync_storage_client(storage_api_token, storage_api_url)
@pytest.fixture
def keboola_client(sync_storage_client: SyncStorageClient) -> KeboolaClient:
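    """KeboolaClient configured with the same token and URL as the synchronous storage client."""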
    return KeboolaClient(storage_api_token=sync_storage_client.token, storage_api_url=sync_storage_client.root_url)
@pytest.fixture
def unique_id() -> str:
    """Generates a unique ID string for test resources."""
    return str(uuid.uuid4())[:8]
@pytest.fixture
def workspace_manager(keboola_client: KeboolaClient, workspace_schema: str) -> WorkspaceManager:
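    """WorkspaceManager bound to the integration-test workspace schema."""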
    return WorkspaceManager(keboola_client, workspace_schema)
@pytest.fixture
def mcp_context(
    mocker,
    keboola_client: KeboolaClient,
    workspace_manager: WorkspaceManager,
    keboola_project: ProjectDef,
    mcp_config: Config,
) -> Context:
    """
    MCP context containing the Keboola client and workspace manager.
    """
    client_context = mocker.MagicMock(Context)
    client_context.session = mocker.MagicMock(ServerSession)
    # Set the user session state the same way the @with_session_state decorator does.
    client_context.session.state = {
        KeboolaClient.STATE_KEY: keboola_client,
        WorkspaceManager.STATE_KEY: workspace_manager,
    }
    client_context.session.client_params = None
    client_context.client_id = None
    client_context.session_id = None
    client_context.request_context = mocker.MagicMock(RequestContext)
    client_context.request_context.lifespan_context = ServerState(mcp_config, ServerRuntimeInfo(transport='stdio'))
    return client_context