Keboola Explorer MCP Server

data_apps.py (29.1 kB)
import asyncio
import logging
import re
from typing import Annotated, Any, Literal, Optional, Sequence, Union, cast

import httpx
from fastmcp import Context, FastMCP
from fastmcp.tools import FunctionTool
from mcp.types import ToolAnnotations
from pydantic import BaseModel, Field

from keboola_mcp_server.clients.client import DATA_APP_COMPONENT_ID, KeboolaClient
from keboola_mcp_server.clients.data_science import DataAppConfig, DataAppResponse
from keboola_mcp_server.clients.storage import ConfigurationAPIResponse
from keboola_mcp_server.errors import tool_errors
from keboola_mcp_server.links import Link, ProjectLinksManager
from keboola_mcp_server.tools.components.utils import set_cfg_creation_metadata, set_cfg_update_metadata
from keboola_mcp_server.workspace import WorkspaceManager

LOG = logging.getLogger(__name__)

DATA_APP_TOOLS_TAG = 'data-apps'


def add_data_app_tools(mcp: FastMCP) -> None:
    """Add tools to the MCP server."""
    mcp.add_tool(
        FunctionTool.from_function(
            modify_data_app,
            tags={DATA_APP_TOOLS_TAG},
            annotations=ToolAnnotations(destructiveHint=True),
        )
    )
    mcp.add_tool(
        FunctionTool.from_function(
            get_data_apps,
            tags={DATA_APP_TOOLS_TAG},
            annotations=ToolAnnotations(readOnlyHint=True),
        )
    )
    mcp.add_tool(
        FunctionTool.from_function(
            deploy_data_app,
            tags={DATA_APP_TOOLS_TAG},
            annotations=ToolAnnotations(destructiveHint=False),
        )
    )
    LOG.info('Data app tools initialized.')


# State of the data app
State = Literal['created', 'running', 'stopped', 'starting', 'stopping', 'restarting']
# Accepts known states or any string, preventing validation errors when receiving unknown states from the API.
# The LLM agent can still understand the state of the data app even if it differs from the known states.
SafeState = Union[State, str]

# Type of the data app
Type = Literal['streamlit']
# Accepts known types or any string, preventing validation errors when receiving unknown types from the API.
# The LLM agent can still understand the type of the data app even if it differs from the known types.
SafeType = Union[Type, str]


class DataAppSummary(BaseModel):
    """A summary of a data app used for sync operations."""

    component_id: str = Field(description='The ID of the data app component.')
    configuration_id: str = Field(description='The ID of the data app config.')
    data_app_id: str = Field(description='The ID of the data app.')
    project_id: str = Field(description='The ID of the project.')
    branch_id: str = Field(description='The ID of the branch.')
    config_version: str = Field(description='The version of the data app config.')
    state: SafeState = Field(description='The state of the data app.')
    type: SafeType = Field(
        description=(
            'The type of the data app. Currently, only "streamlit" is supported in the MCP. However, Keboola DSAPI '
            'supports additional types, which can be retrieved from the API.'
        )
    )
    deployment_url: Optional[str] = Field(description='The URL of the running data app.', default=None)
    auto_suspend_after_seconds: int = Field(
        description='The number of seconds after which the running data app is automatically suspended.'
    )

    @classmethod
    def from_api_response(cls, api_response: DataAppResponse) -> 'DataAppSummary':
        return cls(
            component_id=api_response.component_id,
            configuration_id=api_response.config_id,
            data_app_id=api_response.id,
            project_id=api_response.project_id,
            branch_id=api_response.branch_id or '',
            config_version=api_response.config_version,
            state=api_response.state,
            type=api_response.type,
            deployment_url=api_response.url,
            auto_suspend_after_seconds=api_response.auto_suspend_after_seconds,
        )


class DeploymentInfo(BaseModel):
    """Deployment information of a data app."""

    version: str = Field(description='The version of the data app deployment.')
    state: str = Field(description='The state of the data app deployment.')
    url: Optional[str] = Field(description='The URL of the running data app deployment.', default=None)
    last_request_timestamp: Optional[str] = Field(
        description='The last request timestamp of the data app deployment.', default=None
    )
    last_start_timestamp: Optional[str] = Field(
        description='The last start timestamp of the data app deployment.', default=None
    )
    logs: list[str] = Field(description='The latest 100 logs of the data app deployment.', default_factory=list)


class DataApp(BaseModel):
    """A data app used for detail views."""

    name: str = Field(description='The name of the data app.')
    description: Optional[str] = Field(description='The description of the data app.', default=None)
    component_id: str = Field(description='The ID of the data app component.')
    configuration_id: str = Field(description='The ID of the data app configuration.')
    data_app_id: str = Field(description='The ID of the data app.')
    project_id: str = Field(description='The ID of the project.')
    branch_id: str = Field(description='The ID of the branch.')
    config_version: str = Field(description='The version of the data app config.')
    state: SafeState = Field(description='The state of the data app.')
    type: SafeType = Field(
        description=(
            'The type of the data app. Currently, only "streamlit" is supported in the MCP. However, Keboola DSAPI '
            'supports additional types, which can be retrieved from the API.'
        )
    )
    deployment_url: Optional[str] = Field(description='The URL of the running data app.', default=None)
    auto_suspend_after_seconds: int = Field(
        description='The number of seconds after which the running data app is automatically suspended.'
    )
    is_authorized: bool = Field(description='Whether the data app is authorized using simple password or not.')
    parameters: dict[str, Any] = Field(description='The parameters of the data app.')
    authorization: dict[str, Any] = Field(description='The authorization of the data app.')
    storage: dict[str, Any] = Field(
        description='The storage input/output mapping of the data app.', default_factory=dict
    )
    deployment_info: Optional[DeploymentInfo] = Field(description='The deployment info of the data app.', default=None)
    links: list[Link] = Field(description='Navigation links for the web interface.', default_factory=list)

    @classmethod
    def from_api_responses(
        cls,
        api_response: DataAppResponse,
        api_configuration: ConfigurationAPIResponse,
    ) -> 'DataApp':
        parameters = api_configuration.configuration.get('parameters', {}) or {}
        authorization = api_configuration.configuration.get('authorization', {}) or {}
        storage = api_configuration.configuration.get('storage', {}) or {}
        return cls(
            component_id=api_configuration.component_id,
            configuration_id=api_configuration.configuration_id,
            data_app_id=api_response.id,
            project_id=api_response.project_id,
            branch_id=api_response.branch_id or '',
            config_version=str(api_configuration.version),
            state=api_response.state,
            type=api_response.type,
            deployment_url=api_response.url,
            auto_suspend_after_seconds=api_response.auto_suspend_after_seconds,
            name=api_configuration.name,
            description=api_configuration.description,
            parameters=parameters,
            authorization=authorization,
            storage=storage,
            is_authorized=_is_authorized(authorization),
            deployment_info=None,
            links=[],
        )

    def with_links(self, links: list[Link]) -> 'DataApp':
        self.links = links
        return self

    def with_deployment_info(self, logs: list[str]) -> 'DataApp':
        """Adds deployment info to the data app.

        :param logs: The logs of the data app deployment.
        :return: The data app with the deployment info.
        """
        self.deployment_info = DeploymentInfo(
            version=self.config_version,
            state=self.state,
            url=self.deployment_url or 'deployment link not available yet',
            logs=logs,
        )
        return self


class ModifiedDataAppOutput(BaseModel):
    """Modified data app output containing the response of the action performed, the data app, and links to the
    web interface."""

    response: str = Field(description='The response of the action performed with potential additional information.')
    data_app: DataAppSummary = Field(description='The data app.')
    links: list[Link] = Field(description='Navigation links for the web interface.')


class DeploymentDataAppOutput(BaseModel):
    """Deployment data app output containing the action performed, links and the deployment info of the data app."""

    state: SafeState = Field(description='The state of the data app deployment.')
    deployment_info: DeploymentInfo | None = Field(description='The deployment info of the data app.')
    links: list[Link] = Field(description='Navigation links for the web interface.')


class GetDataAppsOutput(BaseModel):
    """Output of the get_data_apps tool.

    Serves for both DataAppSummary and DataApp outputs.
    """

    data_apps: Sequence[DataAppSummary | DataApp] = Field(description='The data apps in the project.')
    links: list[Link] = Field(description='Navigation links for the web interface.', default_factory=list)


@tool_errors()
async def modify_data_app(
    ctx: Context,
    name: Annotated[str, Field(description='Name of the data app.')],
    description: Annotated[str, Field(description='Description of the data app.')],
    source_code: Annotated[str, Field(description='Complete Python/Streamlit source code for the data app.')],
    packages: Annotated[
        list[str],
        Field(
            description='Python packages used in the source code that will be installed by `pip install` '
            'into the environment before the code runs. For example: ["pandas", "requests~=2.32"].'
        ),
    ],
    authorization_required: Annotated[
        bool, Field(description='Whether the data app is authorized using simple password or not.')
    ] = True,
    configuration_id: Annotated[
        str, Field(description='The ID of existing data app configuration when updating, otherwise empty string.')
    ] = '',
    change_description: Annotated[
        str,
        Field(description='The description of the change when updating (e.g. "Update Code"), otherwise empty string.'),
    ] = '',
) -> ModifiedDataAppOutput:
    """Creates or updates a Streamlit data app.

    Considerations:
    - The `source_code` parameter must be a complete and runnable Streamlit app. It must include a placeholder
      `{QUERY_DATA_FUNCTION}` where a `query_data` function will be injected. This function accepts an SQL query
      string in the current SQL dialect and returns a pandas DataFrame with the results from the workspace.
    - Write SQL queries so they are compatible with the current workspace backend; you can ensure this by using the
      `query_data` tool to inspect the data in the workspace before using it in the data app.
    - If you're updating an existing data app, provide the `configuration_id` parameter and the `change_description`
      parameter.
    - If the data app is updated while running, it must be redeployed for the changes to take effect.
    - The Data App requires basic authorization by default for security reasons, unless explicitly specified
      otherwise by the user.
    """
    client = KeboolaClient.from_state(ctx.session.state)
    workspace_manager = WorkspaceManager.from_state(ctx.session.state)
    links_manager = await ProjectLinksManager.from_client(client)
    project_id = await client.storage_client.project_id()
    source_code = _inject_query_to_source_code(source_code)
    secrets = _get_secrets(client, str(await workspace_manager.get_workspace_id()))

    if configuration_id:
        # Update existing data app
        data_app = await _fetch_data_app(client, configuration_id=configuration_id, data_app_id=None)
        existing_config = {
            'parameters': data_app.parameters,
            'authorization': data_app.authorization,
            'storage': data_app.storage,
        }
        updated_config = _update_existing_data_app_config(
            existing_config, name, source_code, packages, authorization_required, secrets
        )
        updated_config = cast(
            dict[str, Any],
            await client.encryption_client.encrypt(
                updated_config, component_id=DATA_APP_COMPONENT_ID, project_id=project_id
            ),
        )
        await client.storage_client.configuration_update(
            component_id=DATA_APP_COMPONENT_ID,
            configuration_id=configuration_id,
            configuration=updated_config,
            change_description=change_description or 'Change Data App',
            updated_name=name,
            updated_description=description,
        )
        data_app = await _fetch_data_app(client, configuration_id=configuration_id, data_app_id=None)
        await set_cfg_update_metadata(
            client=client,
            component_id=DATA_APP_COMPONENT_ID,
            configuration_id=configuration_id,
            configuration_version=int(data_app.config_version),
        )
        links = links_manager.get_data_app_links(
            configuration_id=data_app.configuration_id,
            configuration_name=name,
            deployment_link=data_app.deployment_url,
            is_authorized=data_app.is_authorized,
        )
        response = (
            'updated (redeploy required to apply changes in the running app)'
            if data_app.state in ('running', 'starting')
            else 'updated'
        )
        return ModifiedDataAppOutput(
            response=response, data_app=DataAppSummary.model_validate(data_app.model_dump()), links=links
        )
    else:
        # Create new data app
        config = _build_data_app_config(name, source_code, packages, authorization_required, secrets)
        config = await client.encryption_client.encrypt(
            config, component_id=DATA_APP_COMPONENT_ID, project_id=project_id
        )
        validated_config = DataAppConfig.model_validate(config)
        data_app_resp = await client.data_science_client.create_data_app(
            name, description, configuration=validated_config
        )
        await set_cfg_creation_metadata(
            client=client,
            component_id=DATA_APP_COMPONENT_ID,
            configuration_id=data_app_resp.config_id,
        )
        links = links_manager.get_data_app_links(
            configuration_id=data_app_resp.config_id,
            configuration_name=name,
            deployment_link=data_app_resp.url,
            is_authorized=authorization_required,
        )
        return ModifiedDataAppOutput(
            response='created', data_app=DataAppSummary.from_api_response(data_app_resp), links=links
        )


@tool_errors()
async def get_data_apps(
    ctx: Context,
    configuration_ids: Annotated[
        Sequence[str], Field(description='The IDs of the data app configurations.')
    ] = tuple(),
    limit: Annotated[int, Field(description='The limit of the data apps to fetch.')] = 100,
    offset: Annotated[int, Field(description='The offset of the data apps to fetch.')] = 0,
) -> GetDataAppsOutput:
    """Lists summaries of data apps in the project given the limit and offset, or gets details of data apps by
    providing their configuration IDs.

    Considerations:
    - If configuration_ids are provided, the tool will return details of the data apps by their configuration IDs.
    - If no configuration_ids are provided, the tool will list all data apps in the project given the limit and
      offset.
    - Data App details contain configurations, deployment info along with logs, and links to the data app dashboard.
    """
    client = KeboolaClient.from_state(ctx.session.state)
    links_manager = await ProjectLinksManager.from_client(client)

    if configuration_ids:
        # Get details of the data apps by their configuration IDs using 10 parallel requests at a time to not
        # overload the API
        data_app_details: list[DataApp | str] = []
        batch_size = 10  # fetching 10 data app details at a time to not overload the API
        for current_batch in range(0, len(configuration_ids), batch_size):
            batch_ids = configuration_ids[current_batch : current_batch + batch_size]
            data_app_details.extend(
                await asyncio.gather(
                    *(
                        _fetch_data_app_details_task(client, links_manager, configuration_id)
                        for configuration_id in batch_ids
                    )
                )
            )
        found_data_apps: list[DataApp] = [dap for dap in data_app_details if isinstance(dap, DataApp)]
        not_found_ids: list[str] = [dap for dap in data_app_details if isinstance(dap, str)]
        if not_found_ids:
            await ctx.log(f'Could not find Data Apps Configurations for IDs: {not_found_ids}', 'error')
            logging.error(f'Could not find Data Apps Configurations for IDs: {not_found_ids}')
        return GetDataAppsOutput(data_apps=found_data_apps)
    else:
        # List all data apps in the project
        data_apps: list[DataAppResponse] = await client.data_science_client.list_data_apps(limit=limit, offset=offset)
        links = [links_manager.get_data_app_dashboard_link()]
        return GetDataAppsOutput(
            data_apps=[DataAppSummary.from_api_response(data_app) for data_app in data_apps],
            links=links,
        )


@tool_errors()
async def deploy_data_app(
    ctx: Context,
    action: Annotated[Literal['deploy', 'stop'], Field(description='The action to perform.')],
    configuration_id: Annotated[str, Field(description='The ID of the data app configuration.')],
) -> DeploymentDataAppOutput:
    """Deploys/redeploys a data app or stops a running data app in the Keboola environment given the action and
    configuration ID.

    Considerations:
    - Redeploying a data app takes some time, and the app may temporarily have the status "stopped" during this
      process because it needs to restart.
    """
    client = KeboolaClient.from_state(ctx.session.state)
    links_manager = await ProjectLinksManager.from_client(client)

    if action == 'deploy':
        data_app = await _fetch_data_app(client, configuration_id=configuration_id, data_app_id=None)
        if data_app.state == 'stopping':
            raise ValueError('Data app is currently "stopping", could not be started at the moment.')
        config_version = await client.storage_client.configuration_version_latest(
            DATA_APP_COMPONENT_ID, data_app.configuration_id
        )
        _ = await client.data_science_client.deploy_data_app(data_app.data_app_id, str(config_version))
        data_app = await _fetch_data_app(client, configuration_id=configuration_id, data_app_id=None)
        data_app = data_app.with_deployment_info(await _fetch_logs(client, data_app.data_app_id))
        links = links_manager.get_data_app_links(
            configuration_id=data_app.configuration_id,
            configuration_name=data_app.name,
            deployment_link=data_app.deployment_url,
            is_authorized=data_app.is_authorized,
        )
        return DeploymentDataAppOutput(state=data_app.state, links=links, deployment_info=data_app.deployment_info)
    elif action == 'stop':
        data_app = await _fetch_data_app(client, configuration_id=configuration_id, data_app_id=None)
        if data_app.state in ('starting', 'restarting'):
            raise ValueError('Data app is currently "starting", could not be stopped at the moment.')
        _ = await client.data_science_client.suspend_data_app(data_app.data_app_id)
        data_app = await _fetch_data_app(client, configuration_id=configuration_id, data_app_id=None)
        links = links_manager.get_data_app_links(
            configuration_id=data_app.configuration_id,
            configuration_name=data_app.name,
            deployment_link=None,
            is_authorized=data_app.is_authorized,
        )
        return DeploymentDataAppOutput(state=data_app.state, links=links, deployment_info=None)
    else:
        raise ValueError(f'Invalid action: {action}')


_DEFAULT_STREAMLIT_THEME = (
    '[theme]\nfont = "sans serif"\ntextColor = "#222529"\nbackgroundColor = "#FFFFFF"\nsecondaryBackgroundColor = '
    '"#E6F2FF"\nprimaryColor = "#1F8FFF"'
)
_DEFAULT_PACKAGES = ['pandas', 'httpx']


def _build_data_app_config(
    name: str,
    source_code: str,
    packages: list[str],
    authorize_with_password: bool,
    secrets: dict[str, Any],
) -> dict[str, Any]:
    packages = sorted(list(set(packages + _DEFAULT_PACKAGES)))
    slug = _get_data_app_slug(name)
    parameters = {
        'size': 'tiny',
        'autoSuspendAfterSeconds': 900,
        'dataApp': {
            'slug': slug,
            'streamlit': {
                'config.toml': _DEFAULT_STREAMLIT_THEME,
            },
            'secrets': secrets,
        },
        'script': [source_code],
        'packages': packages,
    }
    authorization = _get_authorization(authorize_with_password)
    return {'parameters': parameters, 'authorization': authorization}


def _update_existing_data_app_config(
    existing_config: dict[str, Any],
    name: str,
    source_code: str,
    packages: list[str],
    authorize_with_password: bool,
    secrets: dict[str, Any],
) -> dict[str, Any]:
    new_config = existing_config.copy()
    new_config['parameters']['dataApp']['slug'] = (
        _get_data_app_slug(name) or existing_config['parameters']['dataApp']['slug']
    )
    new_config['parameters']['script'] = [source_code] if source_code else existing_config['parameters']['script']
    new_config['parameters']['packages'] = sorted(list(set(packages + _DEFAULT_PACKAGES)))
    new_config['parameters']['dataApp']['secrets'] = existing_config['parameters']['dataApp']['secrets'] | secrets
    new_config['authorization'] = _get_authorization(authorize_with_password)
    return new_config


async def _fetch_data_app(
    client: KeboolaClient,
    *,
    data_app_id: Optional[str],
    configuration_id: Optional[str],
) -> DataApp:
    """
    Fetches the data app from both the data-science API and the storage API based on the provided data_app_id or
    configuration_id.

    :param client: The Keboola client
    :param data_app_id: The ID of the data app
    :param configuration_id: The ID of the configuration
    :return: The data app
    """
    if data_app_id:
        # Fetch data app from science API to get the configuration ID
        data_app_science = await client.data_science_client.get_data_app(data_app_id)
        raw_data_app_config = await client.storage_client.configuration_detail(
            component_id=DATA_APP_COMPONENT_ID, configuration_id=data_app_science.config_id
        )
        api_config = ConfigurationAPIResponse.model_validate(
            raw_data_app_config | {'component_id': DATA_APP_COMPONENT_ID}
        )
        return DataApp.from_api_responses(data_app_science, api_config)
    elif configuration_id:
        raw_configuration = await client.storage_client.configuration_detail(
            component_id=DATA_APP_COMPONENT_ID, configuration_id=configuration_id
        )
        api_config = ConfigurationAPIResponse.model_validate(
            raw_configuration | {'component_id': DATA_APP_COMPONENT_ID}
        )
        data_app_id = cast(str, api_config.configuration['parameters']['id'])
        data_app_science = await client.data_science_client.get_data_app(data_app_id)
        return DataApp.from_api_responses(data_app_science, api_config)
    else:
        raise ValueError('Either data_app_id or configuration_id must be provided.')


async def _fetch_data_app_details_task(
    client: KeboolaClient, links_manager: ProjectLinksManager, configuration_id: str
) -> DataApp | str:
    """Task fetching data app details with logs and links by configuration ID.

    :param client: The Keboola client
    :param configuration_id: The ID of the data app configuration
    :return: The data app details or the configuration ID if the data app is not found
    """
    try:
        data_app = await _fetch_data_app(client, configuration_id=configuration_id, data_app_id=None)
        links = links_manager.get_data_app_links(
            configuration_id=data_app.configuration_id,
            configuration_name=data_app.name,
            deployment_link=data_app.deployment_url,
            is_authorized=data_app.is_authorized,
        )
        logs = await _fetch_logs(client, data_app.data_app_id)
        return data_app.with_links(links).with_deployment_info(logs)
    except Exception:
        return configuration_id


async def _fetch_logs(client: KeboolaClient, data_app_id: str) -> list[str]:
    """Fetches the logs of a data app if it is running, otherwise returns an empty list."""
    try:
        str_logs = await client.data_science_client.tail_app_logs(data_app_id, since=None, lines=20)
        logs = str_logs.split('\n')
        return logs
    except httpx.HTTPStatusError:
        # The data app is not running, return empty list
        return []


def _get_authorization(auth_required: bool) -> dict[str, Any]:
    if auth_required:
        return {
            'app_proxy': {
                'auth_providers': [{'id': 'simpleAuth', 'type': 'password'}],
                'auth_rules': [{'type': 'pathPrefix', 'value': '/', 'auth_required': True, 'auth': ['simpleAuth']}],
            },
        }
    else:
        return {
            'app_proxy': {
                'auth_providers': [],
                'auth_rules': [{'type': 'pathPrefix', 'value': '/', 'auth_required': False}],
            }
        }


def _get_data_app_slug(name: str) -> str:
    return re.sub(r'[^a-z0-9\-]', '', name.lower().replace(' ', '-'))


def _is_authorized(authorization: dict[str, Any]) -> bool:
    try:
        return any(auth_rule['auth_required'] for auth_rule in authorization['app_proxy']['auth_rules'])
    except Exception:
        return False


_QUERY_DATA_FUNCTION_CODE = """
#### INJECTED_CODE ####
#### QUERY DATA FUNCTION ####
import httpx
import os
import pandas as pd


def query_data(query: str) -> pd.DataFrame:
    bid = os.environ.get('BRANCH_ID')
    wid = os.environ.get('WORKSPACE_ID')
    kbc_url = os.environ.get('KBC_URL')
    kbc_token = os.environ.get('KBC_TOKEN')

    timeout = httpx.Timeout(connect=10.0, read=60.0, write=10.0, pool=None)
    limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
    with httpx.Client(timeout=timeout, limits=limits) as client:
        response = client.post(
            f'{kbc_url}/v2/storage/branch/{bid}/workspaces/{wid}/query',
            json={'query': query},
            headers={'X-StorageAPI-Token': kbc_token},
        )
        response.raise_for_status()
        response_json = response.json()
        if response_json.get('status') == 'error':
            raise ValueError(f'Error when executing query "{query}": {response_json.get("message")}.')
        return pd.DataFrame(response_json['data']['rows'])
#### END_OF_INJECTED_CODE ####
"""


def _inject_query_to_source_code(source_code: str) -> str:
    """
    Injects the query_data function into the source code if it is not already present.
    """
    if _QUERY_DATA_FUNCTION_CODE in source_code:
        return source_code
    if '### INJECTED_CODE ###' in source_code and '### END_OF_INJECTED_CODE ###' in source_code:
        # get the first and the last parts before and after the generated code and inject the query_data function
        imports = source_code.split('### INJECTED_CODE ###')[0]
        source_code = source_code.split('### INJECTED_CODE ###')[1].split('### END_OF_INJECTED_CODE ###')[1]
        return imports + '\n\n' + _QUERY_DATA_FUNCTION_CODE + '\n\n' + source_code
    elif '{QUERY_DATA_FUNCTION}' in source_code:
        return source_code.replace('{QUERY_DATA_FUNCTION}', _QUERY_DATA_FUNCTION_CODE)
    else:
        return _QUERY_DATA_FUNCTION_CODE + '\n\n' + source_code


def _get_secrets(client: KeboolaClient, workspace_id: str) -> dict[str, Any]:
    """
    Generates secrets for the data app for querying the tables in the given workspace using the query_data endpoint.

    :param client: The Keboola client
    :param workspace_id: The ID of the workspace
    :return: The secrets
    """
    return {
        'WORKSPACE_ID': workspace_id,
        'BRANCH_ID': client.branch_id or 'default',
    }
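
For illustration, here is a minimal sketch of a source_code value that could be passed to modify_data_app. The "sales" table and its columns are hypothetical; the {QUERY_DATA_FUNCTION} placeholder is what _inject_query_to_source_code replaces with the injected query_data helper shown above, so after injection the script queries the workspace and renders the result with Streamlit.

# Hypothetical Streamlit app passed as the source_code argument of modify_data_app.
# The "sales" table and column names are illustrative only.
import streamlit as st

{QUERY_DATA_FUNCTION}

st.title('Sales by region')

# query_data() is provided by the injected helper; it runs the SQL in the
# workspace and returns the rows as a pandas DataFrame.
df = query_data('SELECT "region", SUM("amount") AS "total" FROM "sales" GROUP BY "region"')

st.dataframe(df)
st.bar_chart(df.set_index('region')['total'])

With a sketch like this, the packages argument could be as small as ['pandas'], since _DEFAULT_PACKAGES already adds pandas and httpx; whether streamlit itself needs to be listed depends on the Keboola data app runtime.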
