Keboola Explorer MCP Server

"""Tests for server functionality.""" from typing import Any, Dict, List, Optional import pytest from keboola_mcp_server.config import Config from keboola_mcp_server.server import BucketInfo, TableColumnInfo, TableDetail, create_server @pytest.fixture def test_config() -> Config: return Config( storage_token="test-token", storage_api_url="https://connection.test.keboola.com", log_level="INFO", snowflake_account="test-account", snowflake_user="test-user", snowflake_password="test-password", snowflake_warehouse="test-warehouse", snowflake_database="test-database", snowflake_role="test-role", ) @pytest.fixture def mock_table_detail() -> Dict[str, Any]: """Create a mock table detail.""" return { "id": "in.c-test.test-table", "name": "test-table", "primary_key": ["id"], "created": "2024-01-01T00:00:00Z", "row_count": 100, "data_size_bytes": 1000, "columns": ["id", "name", "value"], "column_identifiers": [ {"name": "id", "db_identifier": '"id"'}, {"name": "name", "db_identifier": '"name"'}, {"name": "value", "db_identifier": '"value"'}, ], "db_identifier": '"KEBOOLA_test"."in.c-test"."test-table"', } @pytest.fixture def mock_buckets() -> List[Dict[str, Any]]: """Fixture for mock bucket data.""" return [ { "id": "bucket1", "name": "Test Bucket 1", "description": "A test bucket", "stage": "production", "created": "2024-01-01T00:00:00Z", "table_count": 5, "data_size_bytes": 1024, }, { "id": "bucket2", "name": "Test Bucket 2", "description": "Another test bucket", "created": "2025-01-01T00:00:00Z", "table_count": 3, "data_size_bytes": 2048, }, ] @pytest.mark.asyncio async def test_query_table_data_tool(test_config: Config) -> None: # TODO -- make this test to actually test something; in its current shape it only calls the mock function # Testing the MCP tools is tricky. Ideally, we would need a test client similar to # `starlette.testclient.TestClient`, but for the statefull MCP. mock_table_info = { "db_identifier": 'SAPI_10025."in.c-test"."test_table"', "column_identifiers": [ {"name": "id", "db_identifier": '"id"'}, {"name": "name", "db_identifier": '"name"'}, ], } # Create server first server = create_server(test_config) # Store original functions original_get_table_detail = server.__dict__.get("get_table_detail") original_query_table = server.__dict__.get("query_table") original_query_table_data = server.__dict__.get("query_table_data") # Create our mock async functions async def mock_get_table_detail(*args, **kwargs): return mock_table_info async def mock_query_table(*args, **kwargs): return "id,name\nid1,name1\nid2,name2" async def query_table_data( table_id: str, columns: Optional[List[str]] = None, where: Optional[str] = None, limit: Optional[int] = None, ) -> str: table_info = await mock_get_table_detail(table_id) if columns: column_map = { col["name"]: col["db_identifier"] for col in table_info["column_identifiers"] } select_clause = ", ".join(column_map[col] for col in columns) else: select_clause = "*" query = f"SELECT {select_clause} FROM {table_info['db_identifier']}" if where: query += f" WHERE {where}" if limit: query += f" LIMIT {limit}" return await mock_query_table(query) # Set them directly on server.__dict__ server.__dict__["get_table_detail"] = mock_get_table_detail server.__dict__["query_table"] = mock_query_table server.__dict__["query_table_data"] = query_table_data try: # Test basic query result = await server.query_table_data("in.c-test.test_table") assert "id,name" in result assert "id1,name1" in result assert "id2,name2" in result # Test with parameters result = await server.query_table_data( "in.c-test.test_table", columns=["id"], where="name = 'test'", limit=10 ) assert "id" in result finally: # Restore original functions if they existed if original_get_table_detail: server.__dict__["get_table_detail"] = original_get_table_detail if original_query_table: server.__dict__["query_table"] = original_query_table if original_query_table_data: server.__dict__["query_table_data"] = original_query_table_data @pytest.mark.asyncio async def test_list_all_buckets(test_config: Config, mock_buckets: List[Dict[str, Any]]) -> None: """Test the list_all_buckets tool.""" server = create_server(test_config) original_list_buckets = server.__dict__.get("list_all_buckets") async def mock_list_all_buckets(ctx): return [BucketInfo(**bucket) for bucket in mock_buckets] server.__dict__["list_all_buckets"] = mock_list_all_buckets try: result = await server.list_all_buckets(None) assert isinstance(result, list) assert all(isinstance(bucket, BucketInfo) for bucket in result) assert result[0].id == "bucket1" assert result[0].name == "Test Bucket 1" assert result[0].description == "A test bucket" assert result[0].stage == "production" assert result[0].created == "2024-01-01T00:00:00Z" assert result[0].table_count == 5 assert result[0].data_size_bytes == 1024 finally: # Restore original function if it existed if original_list_buckets: server.__dict__["list_all_buckets"] = original_list_buckets @pytest.mark.asyncio @pytest.mark.parametrize("bucket_id", ["bucket1", "bucket2"]) async def test_get_bucket_metadata( test_config: Config, mock_buckets: List[Dict[str, Any]], bucket_id: str ) -> None: """Test the get_bucket_metadata tool.""" server = create_server(test_config) original_get_bucket_metadata = server.__dict__.get("get_bucket_metadata") async def mock_get_bucket_metadata(bid: str, ctx): bucket = next((b for b in mock_buckets if b["id"] == bid), None) if not bucket: raise ValueError(f"Bucket {bid} not found") return BucketInfo(**bucket) server.__dict__["get_bucket_metadata"] = mock_get_bucket_metadata try: expected_bucket = next(b for b in mock_buckets if b["id"] == bucket_id) result = await server.get_bucket_metadata(bucket_id, None) assert isinstance(result, BucketInfo) for field, value in expected_bucket.items(): assert getattr(result, field) == value with pytest.raises(ValueError, match="Bucket nonexistent-bucket not found"): await server.get_bucket_metadata("nonexistent-bucket", None) finally: if original_get_bucket_metadata: server.__dict__["get_bucket_metadata"] = original_get_bucket_metadata