Skip to main content
Glama

Keboola Explorer MCP Server

search.py16.9 kB
import asyncio import logging import re from typing import Annotated, Any, AsyncGenerator, Iterable, Mapping, Sequence from fastmcp import Context, FastMCP from fastmcp.tools import FunctionTool from mcp.types import ToolAnnotations from pydantic import BaseModel, Field, model_validator from keboola_mcp_server.clients.ai_service import SuggestedComponent from keboola_mcp_server.clients.base import JsonDict from keboola_mcp_server.clients.client import KeboolaClient, get_metadata_property from keboola_mcp_server.clients.storage import ItemType from keboola_mcp_server.config import MetadataField from keboola_mcp_server.errors import tool_errors from keboola_mcp_server.tools.components.utils import get_nested LOG = logging.getLogger(__name__) SEARCH_TOOL_NAME = 'search' MAX_GLOBAL_SEARCH_LIMIT = 100 DEFAULT_GLOBAL_SEARCH_LIMIT = 50 SEARCH_TOOLS_TAG = 'search' ITEM_TYPE_TO_COMPONENT_TYPES: Mapping[ItemType, Sequence[str]] = { 'flow': ['other'], 'transformation': ['transformation'], 'configuration': ['extractor', 'writer'], 'configuration-row': ['extractor', 'writer'], 'workspace': ['other'], } def add_search_tools(mcp: FastMCP) -> None: """Add tools to the MCP server.""" LOG.info(f'Adding tool {find_component_id.__name__} to the MCP server.') mcp.add_tool( FunctionTool.from_function( find_component_id, annotations=ToolAnnotations(readOnlyHint=True), tags={SEARCH_TOOLS_TAG}, ) ) LOG.info(f'Adding tool {search.__name__} to the MCP server.') mcp.add_tool( FunctionTool.from_function( search, name=SEARCH_TOOL_NAME, annotations=ToolAnnotations(readOnlyHint=True), tags={SEARCH_TOOLS_TAG}, ) ) LOG.info('Search tools initialized.') class SearchHit(BaseModel): bucket_id: str | None = Field(default=None, description='The ID of the bucket.') table_id: str | None = Field(default=None, description='The ID of the table.') component_id: str | None = Field(default=None, description='The ID of the component.') configuration_id: str | None = Field(default=None, description='The ID of the configuration.') configuration_row_id: str | None = Field(default=None, description='The ID of the configuration row.') item_type: ItemType = Field(description='The type of the item (e.g. table, bucket, configuration, etc.).') updated: str = Field(description='The date and time the item was created in ISO 8601 format.') name: str | None = Field(default=None, description='Name of the item.') display_name: str | None = Field(default=None, description='Display name of the item.') description: str | None = Field(default=None, description='Description of the item.') @model_validator(mode='after') def check_id_fields(self) -> 'SearchHit': id_fields = [ self.bucket_id, self.table_id, self.component_id, self.configuration_id, self.configuration_row_id, ] if not any(field for field in id_fields if field): raise ValueError('At least one ID field must be filled.') if self.configuration_row_id and not all([self.component_id, self.configuration_id]): raise ValueError( 'If configuration_row_id is filled, ' 'both component_id and configuration_id must be filled.' ) if self.configuration_id and not self.component_id: raise ValueError('If configuration_id is filled, component_id must be filled.') return self def _matches_pattern(text: str | None, patterns: list[re.Pattern]) -> bool: """Checks if text matches any of the regex patterns.""" return text and any(pattern.search(text) for pattern in patterns) def _get_field_value(item: JsonDict, fields: Sequence[str]) -> Any | None: for field in fields: if value := get_nested(item, field): return value return None async def _fetch_buckets(client: KeboolaClient, patterns: list[re.Pattern]) -> list[SearchHit]: """Fetches and filters buckets.""" hits = [] for bucket in await client.storage_client.bucket_list(): if not (bucket_id := bucket.get('id')): continue bucket_name = bucket.get('name') bucket_display_name = bucket.get('displayName') bucket_description = get_metadata_property(bucket.get('metadata', []), MetadataField.DESCRIPTION) if ( _matches_pattern(bucket_id, patterns) or _matches_pattern(bucket_name, patterns) or _matches_pattern(bucket_display_name, patterns) or _matches_pattern(bucket_description, patterns) ): hits.append( SearchHit( bucket_id=bucket_id, item_type='bucket', updated=_get_field_value(bucket, ['lastChangeDate', 'updated', 'created']) or '', name=bucket_name, display_name=bucket_display_name, description=bucket_description, ) ) return hits async def _fetch_tables(client: KeboolaClient, patterns: list[re.Pattern]) -> list[SearchHit]: """Fetches and filters tables from all buckets.""" hits = [] for bucket in await client.storage_client.bucket_list(): if not (bucket_id := bucket.get('id')): continue tables = await client.storage_client.bucket_table_list(bucket_id) for table in tables: if not (table_id := table.get('id')): continue table_name = table.get('name') table_display_name = table.get('displayName') table_description = get_metadata_property(table.get('metadata', []), MetadataField.DESCRIPTION) if ( _matches_pattern(table_id, patterns) or _matches_pattern(table_name, patterns) or _matches_pattern(table_display_name, patterns) or _matches_pattern(table_description, patterns) ): hits.append( SearchHit( table_id=table_id, item_type='table', updated=_get_field_value(table, ['lastChangeDate', 'created']) or '', name=table_name, display_name=table_display_name, description=table_description, ) ) return hits async def _fetch_configurations( client: KeboolaClient, patterns: list[re.Pattern], item_types: Iterable[ItemType] | None = None ) -> list[SearchHit]: """Fetches and filters configurations and configuration rows from all component types.""" hits = [] component_types: set[str] = set() for item_type in item_types or []: if ctypes := ITEM_TYPE_TO_COMPONENT_TYPES.get(item_type): component_types.update(ctypes) if component_types: for component_type in component_types: async for hit in _fetch_configs(client, patterns, component_type=component_type): hits.append(hit) else: async for hit in _fetch_configs(client, patterns, component_type=None): hits.append(hit) return hits async def _fetch_configs( client: KeboolaClient, patterns: list[re.Pattern], component_type: str | None = None ) -> AsyncGenerator[SearchHit, None]: components = await client.storage_client.component_list(component_type, include=['configuration', 'rows']) for component in components: if not (component_id := component.get('id')): continue item_type: ItemType if component_id in ['keboola.orchestrator', 'keboola.flow']: item_type = 'flow' elif component_type == 'transformation': item_type = 'transformation' elif component_id == 'keboola.sandboxes': item_type = 'workspace' else: item_type = 'configuration' for config in component.get('configurations', []): if not (config_id := config.get('id')): continue config_name = config.get('name') config_description = config.get('description') config_updated = _get_field_value(config, ['currentVersion.created', 'created']) or '' if ( _matches_pattern(config_id, patterns) or _matches_pattern(config_name, patterns) or _matches_pattern(config_description, patterns) ): yield SearchHit( component_id=component_id, configuration_id=config_id, item_type=item_type, updated=config_updated, name=config_name, description=config_description, ) for row in config.get('rows', []): if not (row_id := row.get('id')): continue row_name = row.get('name') row_description = row.get('description') if ( _matches_pattern(row_id, patterns) or _matches_pattern(row_name, patterns) or _matches_pattern(row_description, patterns) ): yield SearchHit( component_id=component_id, configuration_id=config_id, configuration_row_id=row_id, item_type='configuration-row', updated=config_updated or _get_field_value(row, ['created']), name=row_name, description=row_description, ) @tool_errors() async def search( ctx: Context, patterns: Annotated[ list[str], Field( description='One or more search patterns to match against item ID, name, display name, or description. ' 'Supports regex patterns. Case-insensitive. Examples: ["customer"], ["sales", "revenue"], ' '["test.*table"]. Do not use empty strings or empty lists.' ), ], item_types: Annotated[ Sequence[ItemType], Field( description='Optional filter for specific Keboola item types. Leave empty to search all types. ' 'Common values: "table" (data tables), "bucket" (table containers), "transformation" ' '(SQL/Python transformations), "configuration" (extractor/writer configs), "flow" (orchestration flows). ' "Use when you know what type of item you're looking for." ), ] = tuple(), limit: Annotated[ int, Field( description=f'Maximum number of items to return (default: {DEFAULT_GLOBAL_SEARCH_LIMIT}, max: ' f'{MAX_GLOBAL_SEARCH_LIMIT}).' ), ] = DEFAULT_GLOBAL_SEARCH_LIMIT, offset: Annotated[int, Field(description='Number of matching items to skip for pagination (default: 0).')] = 0, ) -> list[SearchHit]: """ Searches for Keboola items (tables, buckets, configurations, transformations, flows, etc.) in the current project by matching patterns against item ID, name, display name, or description. Returns matching items grouped by type with their IDs and metadata. WHEN TO USE: - User asks to "find", "locate", or "search for" something by name - User mentions a partial name and you need to find the full item (e.g., "find the customer table") - User asks "what tables/configs/flows do I have with X in the name?" - You need to discover items before performing operations on them - User asks to "list all items with [name] in it" - DO NOT use for listing all items of a specific type. Use list_configs, list_tables, list_flows, etc instead. HOW IT WORKS: - Searches by regex pattern matching against id, name, displayName, and description fields - Case-insensitive search - Multiple patterns work as OR condition - matches items containing ANY of the patterns - Returns grouped results by item type (tables, buckets, configurations, flows, etc.) - Each result includes the item's ID, name, creation date, and relevant metadata IMPORTANT: - Always use this tool when the user mentions a name but you don't have the exact ID - The search returns IDs that you can use with other tools (e.g., get_table, get_config, get_flow) - Results are ordered by update time. The most recently updated items are returned first. - For exact ID lookups, use specific tools like get_table, get_config, get_flow instead - Use find_component_id and list_configs tools to find configurations related to a specific component USAGE EXAMPLES: - user_input: "Find all tables with 'customer' in the name" → patterns=["customer"], item_types=["table"] → Returns all tables whose id, name, displayName, or description contains "customer" - user_input: "Search for the sales transformation" → patterns=["sales"], item_types=["transformation"] → Returns transformations with "sales" in any searchable field - user_input: "Find items named 'daily report' or 'weekly summary'" → patterns=["daily.*report", "weekly.*summary"], item_types=[] → Returns all items matching any of these patterns - user_input: "Show me all configurations related to Google Analytics" → patterns=["google.*analytics"], item_types=["configuration"] → Returns configurations with matching patterns """ patterns = list(filter(None, map(str.strip, filter(None, patterns)))) if not patterns: raise ValueError('At least one search pattern must be provided.') offset = max(0, offset) if not 0 < limit <= MAX_GLOBAL_SEARCH_LIMIT: LOG.warning( f'The "limit" parameter is out of range (0, {MAX_GLOBAL_SEARCH_LIMIT}], setting to default value ' f'{DEFAULT_GLOBAL_SEARCH_LIMIT}.' ) limit = DEFAULT_GLOBAL_SEARCH_LIMIT # Compile regex patterns from patterns (case-insensitive) compiled_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in patterns] # Determine which types to fetch types_to_fetch = set(item_types) if item_types else set() # Fetch items concurrently based on requested types tasks = [] all_hits: list[SearchHit] = [] client = KeboolaClient.from_state(ctx.session.state) if not types_to_fetch or 'bucket' in types_to_fetch: tasks.append(_fetch_buckets(client, compiled_patterns)) if not types_to_fetch or 'table' in types_to_fetch: tasks.append(_fetch_tables(client, compiled_patterns)) if not types_to_fetch: tasks.append(_fetch_configurations(client, compiled_patterns)) elif config_types_to_fetch := types_to_fetch & { 'configuration', 'transformation', 'flow', 'configuration-row', 'workspace', }: tasks.append(_fetch_configurations(client, compiled_patterns, item_types=config_types_to_fetch)) # Gather all results results = await asyncio.gather(*tasks, return_exceptions=True) # Process results for result in results: if isinstance(result, Exception): # TODO: report this somehow to the AI assistant LOG.warning(f'Error fetching items: {result}') continue else: all_hits.extend(result) # Filter by item_types if specified if types_to_fetch: all_hits = [item for item in all_hits if item.item_type in types_to_fetch] # TODO: Should we sort by the item type too? all_hits.sort( key=lambda x: ( x.updated, x.bucket_id or x.table_id or x.component_id or x.configuration_id or x.configuration_row_id, ), reverse=True, ) paginated_hits = all_hits[offset : offset + limit] # TODO: Should we report the total number of hits? return paginated_hits @tool_errors() async def find_component_id( ctx: Context, query: Annotated[str, Field(description='Natural language query to find the requested component.')], ) -> list[SuggestedComponent]: """ Returns list of component IDs that match the given query. USAGE: - Use when you want to find the component for a specific purpose. EXAMPLES: - user_input: `I am looking for a salesforce extractor component` - returns a list of component IDs that match the query, ordered by relevance/best match. """ client = KeboolaClient.from_state(ctx.session.state) suggestion_response = await client.ai_service_client.suggest_component(query) return suggestion_response.components

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server