# conftest.py (15.2 kB)
import asyncio
import dataclasses
import json
import logging
import os
import random
import time
import uuid
from contextlib import _AsyncGeneratorContextManager, asynccontextmanager
from dataclasses import dataclass
from multiprocessing import Process
from pathlib import Path
from typing import Any, AsyncGenerator, Callable, Generator, Literal
import pytest
from dotenv import load_dotenv
from fastmcp import Client, Context, FastMCP
from fastmcp.client.transports import SSETransport, StreamableHttpTransport
from kbcstorage.client import Client as SyncStorageClient
from mcp.server.session import ServerSession
from mcp.shared.context import RequestContext
from keboola_mcp_server.clients.client import KeboolaClient
from keboola_mcp_server.config import Config, ServerRuntimeInfo
from keboola_mcp_server.mcp import ServerState
from keboola_mcp_server.workspace import WorkspaceManager
# Type of the callable returned by the `run_server_remote` fixture: given a server and a transport name,
# it produces an async context manager that yields the URL the server listens on.
AsyncContextServerRemoteRunner = Callable[
    [FastMCP, Literal['sse', 'streamable-http']],
    _AsyncGeneratorContextManager[str],
]
# Type of the callable returned by the `run_client` fixture: given a transport name, a server URL and optional
# HTTP headers, it produces an async context manager that yields a connected `Client`.
AsyncContextClientRunner = Callable[
    [Literal['sse', 'streamable-http'], str, dict[str, str] | None],
    _AsyncGeneratorContextManager[Client],
]
LOG = logging.getLogger(__name__)

# Environment variables that configure the integration-test project.
STORAGE_API_URL_ENV_VAR = 'INTEGTEST_STORAGE_API_URL'
STORAGE_API_TOKEN_ENV_VAR = 'INTEGTEST_STORAGE_TOKEN'
WORKSPACE_SCHEMA_ENV_VAR = 'INTEGTEST_WORKSPACE_SCHEMA'

# Development environment variables. The `env_init` fixture overwrites them with the integtest values,
# so that tests run locally use the settings from `.env`.
DEV_STORAGE_API_URL_ENV_VAR = 'STORAGE_API_URL'
DEV_STORAGE_TOKEN_ENV_VAR = 'KBC_STORAGE_TOKEN'
DEV_WORKSPACE_SCHEMA_ENV_VAR = 'KBC_WORKSPACE_SCHEMA'
@dataclasses.dataclass(frozen=True)
class BucketDef:
    """Immutable description of a Storage bucket that the integration tests create."""

    # Bucket ID the Storage API is expected to assign, e.g. 'in.c-test_bucket_01'.
    bucket_id: str
    # Name passed to the bucket-creation call, e.g. 'test_bucket_01'.
    display_name: str
@dataclasses.dataclass(frozen=True)
class TableDef:
    """Immutable description of a Storage table that the integration tests create."""

    # ID of the bucket that holds the table.
    bucket_id: str
    # Table name. It is also the stem of the CSV file with the table data.
    table_name: str
    # Table ID the Storage API is expected to return, e.g. 'in.c-test_bucket_01.test_table_01'.
    table_id: str

    @property
    def file_path(self) -> Path:
        """
        Path to the CSV file containing the table data: `data/proj/buckets/<bucket_id>/<table_name>.csv`.
        """
        return _data_dir().joinpath('proj', 'buckets', self.bucket_id, f'{self.table_name}.csv')
@dataclass(frozen=True)
class ConfigDef:
component_id: str
configuration_id: str | None # Will be generated by Storage API
internal_id: str
@property
def file_path(self) -> Path:
"""
Path to the JSON file containing the configuration.
"""
return _data_dir() / 'proj' / 'configs' / self.component_id / f'{self.internal_id}.json'
@dataclasses.dataclass(frozen=True)
class ProjectDef:
    """Snapshot of the test project after `keboola_project` has populated it."""

    # Project ID taken from the token's owner.
    project_id: str
    # Buckets, tables and configurations created in the project for the tests.
    buckets: list[BucketDef]
    tables: list[TableDef]
    configs: list[ConfigDef]
@pytest.fixture(scope='session')
def env_file_loaded() -> bool:
    """
    Load a `.env` file into the process environment once per test session.

    :return: The boolean reported by `dotenv.load_dotenv()`.
    """
    loaded = load_dotenv()
    return loaded
@pytest.fixture(scope='session')
def env_init(env_file_loaded: bool, storage_api_token: str, storage_api_url: str, workspace_schema: str) -> bool:
    """
    Overwrite the development environment variables with the integration-test values.

    :return: The value of `env_file_loaded`, passed through unchanged.
    """
    os.environ.update(
        {
            DEV_STORAGE_API_URL_ENV_VAR: storage_api_url,
            DEV_STORAGE_TOKEN_ENV_VAR: storage_api_token,
            DEV_WORKSPACE_SCHEMA_ENV_VAR: workspace_schema,
        }
    )
    return env_file_loaded
def _data_dir() -> Path:
return Path(__file__).parent / 'data'
@pytest.fixture(scope='session')
def storage_api_url(env_file_loaded: bool) -> str:
    """
    Storage API URL of the integration-test stack, read from `INTEGTEST_STORAGE_API_URL`.

    `env_file_loaded` is requested only so that `.env` has been loaded before the variable is read.
    """
    url = os.environ.get(STORAGE_API_URL_ENV_VAR)
    assert url, f'{STORAGE_API_URL_ENV_VAR} must be set'
    return url
@pytest.fixture(scope='session')
def storage_api_token(env_file_loaded: bool) -> str:
    """
    Storage API token of the integration-test project, read from `INTEGTEST_STORAGE_TOKEN`.

    `env_file_loaded` is requested only so that `.env` has been loaded before the variable is read.
    """
    token = os.environ.get(STORAGE_API_TOKEN_ENV_VAR)
    assert token, f'{STORAGE_API_TOKEN_ENV_VAR} must be set'
    return token
@pytest.fixture(scope='session')
def mcp_config(storage_api_token: str, storage_api_url: str) -> Config:
    """Server `Config` pointing at the integration-test project."""
    config = Config(storage_api_url=storage_api_url, storage_token=storage_api_token)
    return config
@pytest.fixture(scope='session')
def workspace_schema(env_file_loaded: bool) -> str:
    """
    Workspace schema of the integration-test project, read from `INTEGTEST_WORKSPACE_SCHEMA`.

    `env_file_loaded` is requested only so that `.env` has been loaded before the variable is read.
    """
    schema = os.environ.get(WORKSPACE_SCHEMA_ENV_VAR)
    assert schema, f'{WORKSPACE_SCHEMA_ENV_VAR} must be set'
    return schema
@pytest.fixture(scope='session')
def shared_datadir_ro() -> Path:
    """
    Session-scoped path to the shared integration-test data directory.

    All tests share this directory, so its contents must not be modified. Tests that need to change the data
    should use the function-scoped `shared_datadir` fixture from `pytest-datadir`, which hands out a temporary,
    modifiable copy.
    """
    data_dir = _data_dir()
    return data_dir
# Buckets that `keboola_project` creates. `_create_buckets` checks the expected IDs against the Storage API response.
_BUCKET_DEFS_IN = [
    BucketDef(bucket_id=f'in.c-{name}', display_name=name)
    for name in ('test_bucket_01', 'test_bucket_02')
]
def _create_buckets(storage_client: SyncStorageClient) -> list[BucketDef]:
    """
    Create every bucket from `_BUCKET_DEFS_IN`.

    :param storage_client: Authorized synchronous Storage API client.
    :return: `_BUCKET_DEFS_IN` itself, after asserting that each created bucket got the expected ID and display name.
    """
    for bucket_def in _BUCKET_DEFS_IN:
        LOG.info(f'Creating bucket with display name={bucket_def.display_name}')
        response = storage_client.buckets.create(bucket_def.display_name)
        assert response['id'] == bucket_def.bucket_id
        assert response['displayName'] == bucket_def.display_name
    return _BUCKET_DEFS_IN
# Tables that `keboola_project` creates. Each table's data is loaded from `TableDef.file_path`.
_TABLE_DEFS_IN = [
    TableDef(bucket_id=bucket_id, table_name=table_name, table_id=f'{bucket_id}.{table_name}')
    for bucket_id, table_name in [('in.c-test_bucket_01', 'test_table_01')]
]
def _create_tables(storage_client: SyncStorageClient) -> list[TableDef]:
    """
    Create every table from `_TABLE_DEFS_IN`, uploading the data from the table's CSV file.

    :param storage_client: Authorized synchronous Storage API client.
    :return: `_TABLE_DEFS_IN` itself, after asserting that each returned table ID matches the expected one.
    """
    for table_def in _TABLE_DEFS_IN:
        LOG.info(f'Creating table with name={table_def.table_name}')
        csv_path = str(table_def.file_path)
        new_table_id = storage_client.tables.create(
            bucket_id=table_def.bucket_id,
            name=table_def.table_name,
            file_path=csv_path,
        )
        assert new_table_id == table_def.table_id
    return _TABLE_DEFS_IN
# Configurations that `keboola_project` creates. Their IDs are unknown until the Storage API assigns them,
# so `configuration_id` starts as None.
_CONFIG_DEFS_IN = [
    ConfigDef(component_id=component_id, configuration_id=None, internal_id=internal_id)
    for component_id, internal_id in (
        ('ex-generic-v2', 'test_config1'),
        ('keboola.snowflake-transformation', 'test_config2'),
    )
]
def _create_configs(storage_client: SyncStorageClient) -> list[ConfigDef]:
    """
    Create every configuration from `_CONFIG_DEFS_IN`, reading its body from the definition's JSON file.

    :param storage_client: Authorized synchronous Storage API client.
    :return: New `ConfigDef` copies whose `configuration_id` is the ID returned by the Storage API.
    """
    created: list[ConfigDef] = []
    for config_def in _CONFIG_DEFS_IN:
        LOG.info(f'Creating config with internal ID={config_def.internal_id}')
        with config_def.file_path.open('r', encoding='utf-8') as cfg_file:
            payload = json.load(cfg_file)
        response = storage_client.configurations.create(
            component_id=config_def.component_id,
            name=config_def.internal_id,
            configuration_id=None,
            configuration=payload,
        )
        stored = dataclasses.replace(config_def, configuration_id=response['id'])
        created.append(stored)
        LOG.info(f'Created config with component ID={stored.component_id} and config ID={stored.configuration_id}')
    return created
def _sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient:
    """
    Build a `kbcstorage` client, verify its token and log which project and stack it is authorized for.

    :param storage_api_token: Storage API token.
    :param storage_api_url: Storage API URL of the stack.
    :return: The client.
    """
    client = SyncStorageClient(storage_api_url, storage_api_token)
    info = client.tokens.verify()
    owner = info['owner']
    LOG.info(
        f'Authorized as "{info["description"]}" ({info["id"]}) '
        f'to project "{owner["name"]}" ({owner["id"]}) '
        f'at "{client.root_url}" stack.'
    )
    return client
@pytest.fixture(scope='session')
def keboola_project(env_init: bool, storage_api_token: str, storage_api_url: str) -> Generator[ProjectDef, Any, None]:
    """
    Sets up a Keboola project with items needed for integration tests,
    such as buckets, tables and configurations.
    After the tests, the project is cleaned up.

    The project must be empty. All emptiness checks run before anything is created. A check that failed after
    some items already existed would leak them, because pytest runs the code after `yield` only once the fixture
    has yielded.

    :param env_init: Requested so that the dev environment variables are reset before the project is set up.
    :param storage_api_token: Storage API token of the integration-test project.
    :param storage_api_url: Storage API URL of the integration-test stack.
    :return: Generator yielding a `ProjectDef` with the created buckets, tables and configurations.
    """
    # Cannot use keboola_client fixture because it is function-scoped
    storage_client = _sync_storage_client(storage_api_token, storage_api_url)
    token_info = storage_client.tokens.verify()
    project_id: str = token_info['owner']['id']

    # Preconditions: verify the project is empty before creating anything.
    current_buckets = storage_client.buckets.list()
    if current_buckets:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_buckets)} buckets')
    current_tables = storage_client.tables.list()
    if current_tables:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_tables)} tables')
    # NOTE(review): only ex-generic-v2 configurations are checked, although configurations of other components
    # are created below as well. Confirm whether this is intentional.
    current_configs = storage_client.configurations.list(component_id='ex-generic-v2')
    if current_configs:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {len(current_configs)} configs')

    buckets = _create_buckets(storage_client)
    tables = _create_tables(storage_client)
    configs = _create_configs(storage_client)

    if 'global-search' in token_info['owner'].get('features', []):
        # Give the global search time to catch up on the changes done in the testing project.
        # See https://help.keboola.com/management/global-search/#limitations for more info.
        time.sleep(10)

    LOG.info(f'Test setup for project {project_id} complete')
    yield ProjectDef(project_id=project_id, buckets=buckets, tables=tables, configs=configs)

    LOG.info(f'Cleaning up Keboola project with ID={project_id}')
    current_buckets = storage_client.buckets.list()
    for bucket in current_buckets:
        bucket_id = bucket['id']
        LOG.info(f'Deleting bucket with ID={bucket_id}')
        storage_client.buckets.delete(bucket_id, force=True)
    for config in configs:
        LOG.info(f'Deleting config with component ID={config.component_id} and config ID={config.configuration_id}')
        storage_client.configurations.delete(config.component_id, config.configuration_id)
        # Double delete because the first delete moves the configuration to the trash
        storage_client.configurations.delete(config.component_id, config.configuration_id)
@pytest.fixture(scope='session')
def buckets(keboola_project: ProjectDef) -> list[BucketDef]:
    """Buckets that `keboola_project` created in the test project."""
    project_buckets = keboola_project.buckets
    return project_buckets
@pytest.fixture(scope='session')
def tables(keboola_project: ProjectDef) -> list[TableDef]:
    """Tables that `keboola_project` created in the test project."""
    project_tables = keboola_project.tables
    return project_tables
@pytest.fixture(scope='session')
def configs(keboola_project: ProjectDef) -> list[ConfigDef]:
    """Configurations that `keboola_project` created, with their Storage-assigned IDs filled in."""
    project_configs = keboola_project.configs
    return project_configs
@pytest.fixture
def sync_storage_client(storage_api_token: str, storage_api_url: str) -> SyncStorageClient:
    """
    Function-scoped synchronous client from the official Keboola SDK (the `kbcstorage` package),
    authorized against the integration-test project.
    """
    client = _sync_storage_client(storage_api_token, storage_api_url)
    return client
@pytest.fixture
def keboola_client(sync_storage_client: SyncStorageClient) -> KeboolaClient:
    """Project `KeboolaClient` using the same token and Storage API URL as `sync_storage_client`."""
    token = sync_storage_client.token
    api_url = sync_storage_client.root_url
    return KeboolaClient(storage_api_token=token, storage_api_url=api_url)
@pytest.fixture
def unique_id() -> str:
    """Return a short random ID (the first 8 hex digits of a UUID4) for naming test resources."""
    # The first 8 characters of str(uuid4()) are exactly the first 8 hex digits, which is what `.hex[:8]` gives.
    return uuid.uuid4().hex[:8]
@pytest.fixture
def workspace_manager(keboola_client: KeboolaClient, workspace_schema: str) -> WorkspaceManager:
    """`WorkspaceManager` bound to the test project's client and workspace schema."""
    manager = WorkspaceManager(keboola_client, workspace_schema)
    return manager
@pytest.fixture
def mcp_context(
    mocker,
    keboola_client: KeboolaClient,
    workspace_manager: WorkspaceManager,
    keboola_project: ProjectDef,
    mcp_config: Config,
) -> Context:
    """
    Mocked MCP `Context` carrying the Keboola client, the workspace manager, and a `ServerState` built from
    `mcp_config` as the request's lifespan context.

    `keboola_project` is requested only so that the test project is set up before the context is used.
    """
    session = mocker.MagicMock(ServerSession)
    # Same per-session state as the one the @with_session_state decorator stores.
    session.state = {
        KeboolaClient.STATE_KEY: keboola_client,
        WorkspaceManager.STATE_KEY: workspace_manager,
    }
    session.client_params = None

    request_context = mocker.MagicMock(RequestContext)
    request_context.lifespan_context = ServerState(mcp_config, ServerRuntimeInfo(transport='stdio'))

    ctx = mocker.MagicMock(Context)
    ctx.session = session
    ctx.client_id = None
    ctx.session_id = None
    ctx.request_context = request_context
    return ctx
@pytest.fixture
def run_server_remote() -> AsyncContextServerRemoteRunner:
    """
    Fixture providing an async context manager to run the server in a subprocess.
    """
    import multiprocessing
    import socket

    startup_timeout = 10.0  # seconds to wait for the server to start accepting TCP connections

    def _free_port() -> int:
        """Return a TCP port on 127.0.0.1 that the OS currently reports as unused."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.bind(('127.0.0.1', 0))
            return sock.getsockname()[1]

    async def _wait_until_listening(proc: Any, port: int) -> None:
        """
        Poll `port` until it accepts a TCP connection.

        :raises RuntimeError: If the server process exits before it starts listening.
        :raises TimeoutError: If the port is not accepting connections within `startup_timeout` seconds.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + startup_timeout
        while True:
            try:
                _, writer = await asyncio.open_connection('127.0.0.1', port)
            except OSError:
                if not proc.is_alive():
                    raise RuntimeError(
                        f'MCP server subprocess exited with code {proc.exitcode} before listening on port {port}.'
                    ) from None
                if loop.time() >= deadline:
                    raise TimeoutError(
                        f'MCP server did not start listening on port {port} within {startup_timeout} seconds.'
                    ) from None
                await asyncio.sleep(0.1)
            else:
                writer.close()
                await writer.wait_closed()
                return

    @asynccontextmanager
    async def _run_server_remote(
        server: FastMCP, transport: Literal['sse', 'streamable-http']
    ) -> AsyncGenerator[str, None]:
        """
        Run the server in a subprocess with async context manager which ensures that the server is properly closed
        after the test.
        :param server: The server to run.
        :param transport: The transport to use.
        :return: The url of the remote server, yielded once the server accepts connections.
        """
        # Ask the OS for a free port instead of guessing one, which could collide with a port already in use.
        port = _free_port()
        # The target is a closure and cannot be pickled, so the child must be forked. The 'spawn' (macOS, Windows)
        # and 'forkserver' (Linux default since Python 3.14) start methods would both need to pickle it.
        proc = multiprocessing.get_context('fork').Process(
            target=lambda: asyncio.run(server.run_async(transport=transport, port=port))
        )
        proc.start()
        if transport == 'sse':
            url = f'http://127.0.0.1:{port}/sse'
        else:
            url = f'http://127.0.0.1:{port}/mcp'
        LOG.info(f'Running MCP server in subprocess listening on {url} with {transport} transport.')
        try:
            # Wait until the server actually listens, rather than sleeping for a fixed time.
            await _wait_until_listening(proc, port)
            yield url
        finally:
            LOG.info('Terminating MCP server subprocess.')
            proc.terminate()
            proc.join(timeout=10)
            if proc.is_alive():
                # The server did not exit on SIGTERM; kill it so the test session does not hang.
                proc.kill()
                proc.join()

    return _run_server_remote
@pytest.fixture
def run_client() -> AsyncContextClientRunner:
    """Fixture providing an async context manager that yields a client connected to a server URL."""

    @asynccontextmanager
    async def _run_client(
        transport: Literal['sse', 'streamable-http'], url: str, headers: dict[str, str] | None = None
    ) -> AsyncGenerator[Client, None]:
        """
        Connect a client to a remote server for the duration of the `async with` block, and close it afterwards.

        :param transport: The transport of the server that the client connects to.
        :param url: The URL of the remote server.
        :param headers: Optional HTTP headers for the client.
        :return: The Client connected to the remote server.
        """
        transport_cls = SSETransport if transport == 'sse' else StreamableHttpTransport
        client = Client(transport_cls(url=url, headers=headers))
        deferred_error: Exception | None = None
        LOG.info(f'Running MCP client connecting to {url} and expecting `{transport}` server transport.')
        try:
            async with client:
                try:
                    yield client
                except Exception as exc:
                    LOG.error(f'Error in client TaskGroup: {exc}')
                    # Raising inside the client context would surface only the client's TaskGroup error.
                    # Keep the original exception and raise it after the context has exited.
                    deferred_error = exc
        finally:
            del client
        if deferred_error is not None:
            raise deferred_error

    return _run_client