Skip to main content
Glama

Keboola Explorer MCP Server

test_workspace.py3.73 kB
import logging from typing import Any, Generator, Mapping import pytest import requests from kbcstorage.client import Client as SyncStorageClient from keboola_mcp_server.clients.client import KeboolaClient from keboola_mcp_server.workspace import WorkspaceManager LOG = logging.getLogger(__name__) @pytest.fixture def dynamic_manager( keboola_client: KeboolaClient, sync_storage_client: SyncStorageClient, workspace_schema: str ) -> Generator[WorkspaceManager, Any, None]: storage_client = sync_storage_client token_info = storage_client.tokens.verify() project_id: str = token_info['owner']['id'] def _get_workspace_meta() -> list[Mapping[str, Any]]: metadata: list[Mapping[str, Any]] = [] for m in storage_client.branches.metadata('default'): if m.get('key') == WorkspaceManager.MCP_META_KEY: metadata.append(m) return metadata metas = _get_workspace_meta() if metas: pytest.fail(f'Expecting empty Keboola project {project_id}, but found {metas} in the default branch') workspaces = storage_client.workspaces.list() # ignore the static workspaces workspaces = [ w for w in workspaces if all( [ w['connection']['schema'] != workspace_schema, w.get('creatorToken', {}).get('description') != 'Background Indexing Token', ] ) ] if workspaces: pytest.fail( f'Expecting empty Keboola project {project_id}, but found {len(workspaces)} extra workspaces: ' f'{[{"id": w["id"], "name": w["name"]} for w in workspaces]}' ) yield WorkspaceManager(keboola_client) LOG.info(f'Cleaning up workspaces in Keboola project with ID={project_id}') metas = _get_workspace_meta() if len(metas) > 1: LOG.info(f'Multiple metadata entries found: {metas}') for meta in metas: try: storage_client.workspaces.delete(meta['value']) LOG.info(f'Deleted workspaces: {meta["value"]}') except requests.HTTPError: LOG.exception(f'Failed to delete workspace {meta["value"]}') try: url = storage_client.branches.base_url.rstrip('/') storage_client.branches._delete(f'{url}/branch/default/metadata/{meta["id"]}') LOG.info(f'Deleted workspaces metadata: {meta["id"]}') except requests.HTTPError as e: LOG.exception(f'Failed to delete workspace metadata {meta["id"]}: {e}') class TestWorkspaceManager: @pytest.mark.asyncio async def test_static_workspace(self, workspace_manager: WorkspaceManager, workspace_schema: str): assert workspace_manager._workspace_schema == workspace_schema info = await workspace_manager._find_ws_by_schema(workspace_schema) assert info is not None assert info.schema == workspace_schema assert info.backend in ['snowflake', 'bigquery'] workspace = await workspace_manager._get_workspace() assert workspace is not None assert workspace.id == info.id @pytest.mark.asyncio async def test_dynamic_workspace(self, dynamic_manager: WorkspaceManager): assert dynamic_manager._workspace_schema is None # check that there is no workspace in the branch info = await dynamic_manager._find_ws_in_branch() assert info is None # create workspace workspace = await dynamic_manager._get_workspace() assert workspace is not None # check that the new workspace is recorded in the branch info = await dynamic_manager._find_ws_in_branch() assert info is not None assert workspace.id == info.id

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server