test_workspace.py•3.73 kB
import logging
from typing import Any, Generator, Mapping
import pytest
import requests
from kbcstorage.client import Client as SyncStorageClient
from keboola_mcp_server.clients.client import KeboolaClient
from keboola_mcp_server.workspace import WorkspaceManager
LOG = logging.getLogger(__name__)
@pytest.fixture
def dynamic_manager(
    keboola_client: KeboolaClient, sync_storage_client: SyncStorageClient, workspace_schema: str
) -> Generator[WorkspaceManager, Any, None]:
    storage_client = sync_storage_client
    token_info = storage_client.tokens.verify()
    project_id: str = token_info['owner']['id']
    def _get_workspace_meta() -> list[Mapping[str, Any]]:
        metadata: list[Mapping[str, Any]] = []
        for m in storage_client.branches.metadata('default'):
            if m.get('key') == WorkspaceManager.MCP_META_KEY:
                metadata.append(m)
        return metadata
    metas = _get_workspace_meta()
    if metas:
        pytest.fail(f'Expecting empty Keboola project {project_id}, but found {metas} in the default branch')
    workspaces = storage_client.workspaces.list()
    # ignore the static workspaces
    workspaces = [
        w
        for w in workspaces
        if all(
            [
                w['connection']['schema'] != workspace_schema,
                w.get('creatorToken', {}).get('description') != 'Background Indexing Token',
            ]
        )
    ]
    if workspaces:
        pytest.fail(
            f'Expecting empty Keboola project {project_id}, but found {len(workspaces)} extra workspaces: '
            f'{[{"id": w["id"], "name": w["name"]} for w in workspaces]}'
        )
    yield WorkspaceManager(keboola_client)
    LOG.info(f'Cleaning up workspaces in Keboola project with ID={project_id}')
    metas = _get_workspace_meta()
    if len(metas) > 1:
        LOG.info(f'Multiple metadata entries found: {metas}')
    for meta in metas:
        try:
            storage_client.workspaces.delete(meta['value'])
            LOG.info(f'Deleted workspaces: {meta["value"]}')
        except requests.HTTPError:
            LOG.exception(f'Failed to delete workspace {meta["value"]}')
        try:
            url = storage_client.branches.base_url.rstrip('/')
            storage_client.branches._delete(f'{url}/branch/default/metadata/{meta["id"]}')
            LOG.info(f'Deleted workspaces metadata: {meta["id"]}')
        except requests.HTTPError as e:
            LOG.exception(f'Failed to delete workspace metadata {meta["id"]}: {e}')
class TestWorkspaceManager:
    @pytest.mark.asyncio
    async def test_static_workspace(self, workspace_manager: WorkspaceManager, workspace_schema: str):
        assert workspace_manager._workspace_schema == workspace_schema
        info = await workspace_manager._find_ws_by_schema(workspace_schema)
        assert info is not None
        assert info.schema == workspace_schema
        assert info.backend in ['snowflake', 'bigquery']
        workspace = await workspace_manager._get_workspace()
        assert workspace is not None
        assert workspace.id == info.id
    @pytest.mark.asyncio
    async def test_dynamic_workspace(self, dynamic_manager: WorkspaceManager):
        assert dynamic_manager._workspace_schema is None
        # check that there is no workspace in the branch
        info = await dynamic_manager._find_ws_in_branch()
        assert info is None
        # create workspace
        workspace = await dynamic_manager._get_workspace()
        assert workspace is not None
        # check that the new workspace is recorded in the branch
        info = await dynamic_manager._find_ws_in_branch()
        assert info is not None
        assert workspace.id == info.id