# test_data_apps.py • 10.1 kB
import logging
import uuid
from typing import Any, AsyncGenerator, Mapping, cast
import pytest
import pytest_asyncio
from fastmcp import Client, FastMCP
from keboola_mcp_server.clients.client import DATA_APP_COMPONENT_ID, KeboolaClient, get_metadata_property
from keboola_mcp_server.config import Config, MetadataField, ServerRuntimeInfo
from keboola_mcp_server.server import create_server
from keboola_mcp_server.tools.data_apps import (
_DEFAULT_PACKAGES,
_QUERY_DATA_FUNCTION_CODE,
DataApp,
DataAppSummary,
GetDataAppsOutput,
ModifiedDataAppOutput,
)
# Module-level logger; the `initial_data_app` fixture teardown reports cleanup problems through it.
LOG = logging.getLogger(__name__)
@pytest.fixture
def streamlit_app_imports() -> str:
    """Provide the import header that opens the sample Streamlit app source."""
    import_header = 'import streamlit as st\n\n'
    return import_header
@pytest.fixture
def streamlit_app_entrypoint() -> str:
    """Provide the `main()` definition and the script guard of the sample Streamlit app source."""
    # Each chunk carries its own newline(s); the chunks are joined without any separator.
    entrypoint_chunks = [
        'def main():\n',
        " st.title('Integration Test Data App')\n",
        " st.write('Hello from integration test')\n",
        ' # Optionally query data (kept commented to avoid side-effects during tests)\n',
        " # df = query_data('select 1 as col')\n",
        ' # st.dataframe(df)\n\n',
        'if __name__ == "__main__":\n',
        ' main()\n',
    ]
    return ''.join(entrypoint_chunks)
@pytest.fixture
def sample_streamlit_app(streamlit_app_imports: str, streamlit_app_entrypoint: str) -> str:
    """Assemble a minimal Streamlit app template that supports query injection."""
    # Plain (non-f) string on purpose: the braces stay in the template as a literal placeholder.
    query_placeholder = '{QUERY_DATA_FUNCTION}\n\n'
    return streamlit_app_imports + query_placeholder + streamlit_app_entrypoint
@pytest.fixture
def mcp_server(storage_api_url: str, storage_api_token: str, workspace_schema: str) -> FastMCP:
    """Build the MCP server under test from the storage API and workspace fixtures (stdio transport)."""
    server_config = Config(
        storage_api_url=storage_api_url,
        storage_token=storage_api_token,
        workspace_schema=workspace_schema,
    )
    server = create_server(server_config, runtime_info=ServerRuntimeInfo(transport='stdio'))
    # Fail early if the factory hands back anything other than a FastMCP instance.
    assert isinstance(server, FastMCP)
    return server
@pytest_asyncio.fixture
async def mcp_client(mcp_server: FastMCP) -> AsyncGenerator[Client, None]:
    """Yield a `Client` opened on `mcp_server`; its async context is exited once the test is done."""
    async with Client(mcp_server) as opened_client:
        yield opened_client
@pytest.fixture
def app_name() -> str:
    """Generate a data app name that ends with 8 hex characters taken from a fresh UUID4."""
    return f'Integration Test Data App {uuid.uuid4().hex[:8]}'
@pytest.fixture
def app_description() -> str:
    """Provide the description given to the data app when it is created."""
    description = 'Data app created by integration test'
    return description
@pytest_asyncio.fixture
async def initial_data_app(
    mcp_client: Client,
    keboola_client: KeboolaClient,
    app_name: str,
    app_description: str,
    sample_streamlit_app: str,
) -> AsyncGenerator[ModifiedDataAppOutput, None]:
    """
    Create a data app through the `modify_data_app` tool and yield the validated tool output.

    On teardown the created data app is deleted through the data science client. The deletion is best-effort:
    a failure is logged together with its traceback and never fails the test itself.
    """
    sync_output: ModifiedDataAppOutput | None = None
    try:
        # Create
        created_result = await mcp_client.call_tool(
            name='modify_data_app',
            arguments={
                'name': app_name,
                'description': app_description,
                'source_code': sample_streamlit_app,
                'packages': ['numpy', 'streamlit'],
                'authorization_required': False,
            },
        )
        assert created_result.structured_content is not None
        sync_output = ModifiedDataAppOutput.model_validate(created_result.structured_content)
        yield sync_output
    finally:
        # Explicit None check: whether anything was created must not depend on the model's truthiness.
        if sync_output is not None:
            try:
                # Delete the data app from the data science API and the configuration from the storage API as well.
                await keboola_client.data_science_client.delete_data_app(sync_output.data_app.data_app_id)
            except Exception as e:
                # Keep the cleanup best-effort, but record the traceback so leftover apps can be diagnosed.
                LOG.exception('Error deleting data app: %s', e)
        else:
            LOG.error('No data app to delete')
@pytest.mark.asyncio
async def test_get_data_apps_listing(mcp_client: Client, initial_data_app: ModifiedDataAppOutput) -> None:
    """Check that `get_data_apps` called without arguments yields a well-formed list of summaries."""
    listing_result = await mcp_client.call_tool(name='get_data_apps', arguments={})
    assert listing_result.structured_content is not None
    listing = GetDataAppsOutput.model_validate(listing_result.structured_content)
    assert isinstance(listing.data_apps, list)
    # Without `configuration_ids` every returned entry is expected to be a summary.
    for listed_app in listing.data_apps:
        assert isinstance(listed_app, DataAppSummary)
@pytest.mark.asyncio
async def test_data_app_lifecycle(
    mcp_client: Client,
    keboola_client: KeboolaClient,
    app_name: str,
    app_description: str,
    initial_data_app: ModifiedDataAppOutput,
    streamlit_app_imports: str,
    streamlit_app_entrypoint: str,
) -> None:
    """
    End-to-end lifecycle for data apps, starting from the app created by the `initial_data_app` fixture:
    - check the metadata, details and listing of the created app
    - update the app
    - check the metadata and details of the updated app
    The data app is deleted in the teardown of the `initial_data_app` fixture.
    """
    # --- Created app: basic details ---
    assert initial_data_app.response == 'created'
    created_app = initial_data_app.data_app
    data_app_id = created_app.data_app_id
    configuration_id = created_app.configuration_id
    assert data_app_id
    assert configuration_id

    # --- Created app: metadata (KBC.MCP.createdBy must be 'true') ---
    created_metadata = await keboola_client.storage_client.configuration_metadata_get(
        component_id=DATA_APP_COMPONENT_ID, configuration_id=configuration_id
    )
    assert isinstance(created_metadata, list)
    metadata_by_key = {entry['key']: entry['value'] for entry in created_metadata if isinstance(entry, dict)}
    assert MetadataField.CREATED_BY_MCP in metadata_by_key
    assert metadata_by_key[MetadataField.CREATED_BY_MCP] == 'true'

    # --- Created app: details fetched by configuration_id ---
    details_result = await mcp_client.call_tool(
        name='get_data_apps', arguments={'configuration_ids': [configuration_id]}
    )
    assert details_result.structured_content is not None
    created_listing = GetDataAppsOutput.model_validate(details_result.structured_content)
    assert len(created_listing.data_apps) == 1
    created_details = created_listing.data_apps[0]
    assert isinstance(created_details, DataApp)
    assert created_details.configuration_id == configuration_id
    assert created_details.data_app_id == data_app_id
    assert created_details.name == app_name
    assert created_details.description == app_description

    # The stored script must contain both submitted parts plus the injected query function code.
    created_script = created_details.parameters['script'][0]
    assert streamlit_app_imports in created_script
    assert streamlit_app_entrypoint in created_script
    assert _QUERY_DATA_FUNCTION_CODE in created_script

    # The requested packages are expected alongside the default ones.
    assert set(created_details.parameters['packages']) == {'numpy', 'streamlit', *_DEFAULT_PACKAGES}

    # --- Listing contains our app ---
    # TODO(REMOVE): Set the limit back to the default value once DSAPI is fixed. The limit is temporarily increased to
    # 500 to prevent listing only the leftover data apps from previous tests (100). These apps cannot be deleted
    # because their configurations were removed in SAPI first, causing the DSAPI delete endpoint to return a 500 error
    # afterward.
    listed_result = await mcp_client.call_tool(name='get_data_apps', arguments={'limit': 500})
    assert listed_result.structured_content is not None
    listed_apps = GetDataAppsOutput.model_validate(listed_result.structured_content).data_apps
    assert len(listed_apps) > 0
    assert all(isinstance(listed_app, DataAppSummary) for listed_app in listed_apps)
    assert any(listed_app.configuration_id == configuration_id for listed_app in listed_apps)
    # TODO(REMOVE): Remove this assertion once DSAPI is fixed. This only checks that we do not leave any data apps
    # in the CI project after test executions except those which are already there and cannot be deleted.
    assert len(listed_apps) < 110

    # --- Update the app ---
    updated_name = f'{app_name} - Updated'
    updated_description = 'Data app updated by integration test'
    updated_source_code = 'import numpy as np\n\n'
    update_arguments = {
        'name': updated_name,
        'description': updated_description,
        'source_code': updated_source_code,
        'packages': ['streamlit'],
        'authorization_required': False,
        'configuration_id': configuration_id,
        'change_description': 'Update Code',
    }
    updated_result = await mcp_client.call_tool(name='modify_data_app', arguments=update_arguments)

    # --- Updated app: basic details (same identifiers as the created app) ---
    assert updated_result.structured_content is not None
    updated = ModifiedDataAppOutput.model_validate(updated_result.structured_content)
    assert updated.response == 'updated'
    assert updated.data_app.data_app_id == data_app_id
    assert updated.data_app.configuration_id == configuration_id

    # --- Updated app: metadata (KBC.MCP.updatedBy.version.{version} must be 'true') ---
    raw_updated_metadata = await keboola_client.storage_client.configuration_metadata_get(
        component_id=DATA_APP_COMPONENT_ID, configuration_id=configuration_id
    )
    updated_metadata = cast(list[Mapping[str, Any]], raw_updated_metadata)
    version_key = f'{MetadataField.UPDATED_BY_MCP_PREFIX}{updated.data_app.config_version}'
    assert get_metadata_property(updated_metadata, version_key) == 'true'
    # The original creation metadata must still be there.
    assert get_metadata_property(updated_metadata, MetadataField.CREATED_BY_MCP) == 'true'

    # --- Updated app: details fetched by configuration_id ---
    fetched_result = await mcp_client.call_tool(
        name='get_data_apps', arguments={'configuration_ids': [configuration_id]}
    )
    assert fetched_result.structured_content is not None
    fetched_listing = GetDataAppsOutput.model_validate(fetched_result.structured_content)
    assert len(fetched_listing.data_apps) == 1
    fetched_details = fetched_listing.data_apps[0]
    assert isinstance(fetched_details, DataApp)
    assert fetched_details.name == updated_name
    assert fetched_details.description == updated_description

    # The script must now hold the new source (still with the injected code) and none of the old parts.
    updated_script = fetched_details.parameters['script'][0]
    assert _QUERY_DATA_FUNCTION_CODE in updated_script
    assert updated_source_code in updated_script
    assert streamlit_app_imports not in updated_script
    assert streamlit_app_entrypoint not in updated_script

    # The package list must reflect the update as well.
    assert set(fetched_details.parameters['packages']) == {'streamlit', *_DEFAULT_PACKAGES}