Skip to main content
Glama

Keboola Explorer MCP Server

model.py18 kB
""" Flow models for Keboola MCP server. """ from datetime import datetime from typing import Any, Literal, Optional, Union from pydantic import AliasChoices, BaseModel, Field from keboola_mcp_server.clients.client import ORCHESTRATOR_COMPONENT_ID, FlowType from keboola_mcp_server.clients.storage import APIFlowResponse from keboola_mcp_server.links import Link # ============================================================================= # RESPONSE MODELS # ============================================================================= class ListFlowsOutput(BaseModel): """Output of list_flows tool.""" flows: list['FlowSummary'] = Field(description='The retrieved flow configurations.') links: list[Link] = Field(description='The list of links relevant to the flows.') class FlowToolOutput(BaseModel): """ Standard response model for flow tool operations. :param configuration_id: The configuration ID of the flow. :param component_id: The component ID of the flow. :param description: The description of the Flow. :param timestamp: The timestamp of the operation. :param success: Indicates if the operation succeeded. :param links: The links relevant to the flow. :param version: The version number of the flow configuration. """ configuration_id: str = Field(description='The configuration ID of the flow.') component_id: str = Field(description='The ID of the component.') description: str = Field(description='The description of the Flow.') version: int = Field(description='The version number of the flow configuration.') timestamp: datetime = Field(description='The timestamp of the operation.') success: bool = Field(default=True, description='Indicates if the operation succeeded.') links: list[Link] = Field(description='The links relevant to the flow.') # ============================================================================= # LEGACY ORCHESTRATOR FLOW MODELS # ============================================================================= class FlowPhase(BaseModel): """Represents a phase in a legacy flow configuration.""" id: int | str = Field(description='Unique identifier of the phase') name: str = Field(description='Name of the phase', min_length=1) description: str = Field(default_factory=str, description='Description of the phase') depends_on: list[int | str] = Field( default_factory=list, description='List of phase IDs this phase depends on', validation_alias=AliasChoices('depends_on', 'dependsOn', 'depends-on'), serialization_alias='dependsOn', ) class FlowTask(BaseModel): """Represents a task in a legacy flow configuration.""" id: int | str = Field(description='Unique identifier of the task') name: str = Field(description='Name of the task') phase: int | str = Field(description='ID of the phase this task belongs to') enabled: Optional[bool] = Field(default=True, description='Whether the task is enabled') continue_on_failure: Optional[bool] = Field( default=False, description='Whether to continue if task fails', validation_alias=AliasChoices('continue_on_failure', 'continueOnFailure', 'continue-on-failure'), serialization_alias='continueOnFailure', ) task: dict[str, Any] = Field(description='Task configuration containing componentId, configId, etc.') class FlowConfiguration(BaseModel): """Represents a complete legacy flow configuration.""" phases: list[FlowPhase] = Field(description='List of phases in the flow') tasks: list[FlowTask] = Field(description='List of tasks in the flow') # ============================================================================= # CONDITIONAL FLOW MODELS - RETRY CONFIGURATION # NOTE: These will be removed in future iterations once we fetch the shcema from AI service # ============================================================================= class RetryStrategyParams(BaseModel): """Retry strategy parameters configuration.""" max_retries: int = Field(default=3, description='Maximum number of retry attempts', alias='maxRetries') delay: int = Field(default=10, description='Delay in seconds between retry attempts') class RetryOnCondition(BaseModel): """Retry condition configuration.""" type: Literal['errorMessageContains', 'errorMessageExact'] = Field(description='Type of retry condition') value: str = Field(description='Value to match for retry condition') class RetryConfiguration(BaseModel): """Retry configuration for tasks and phases.""" strategy: Literal['linear'] = Field(default='linear', description='Retry strategy') strategy_params: RetryStrategyParams = Field( default_factory=RetryStrategyParams, description='Strategy parameters', alias='strategyParams' ) retry_on: Optional[list[RetryOnCondition]] = Field( default=None, description='Conditions that trigger retry', alias='retryOn' ) # ============================================================================= # CONDITIONAL FLOW MODELS - CONDITIONS # ============================================================================= class TaskCondition(BaseModel): """Task-based condition for flow transitions.""" type: Literal['task'] = Field(description='Condition type') task: str = Field(description='ID of the task to evaluate, or "*" when used with phase operators') value: Literal[ 'taskId', 'phaseId', 'status', 'job.id', 'job.componentId', 'job.configId', 'job.status', 'job.result', 'job.startTime', 'job.endTime', 'job.duration', 'job.result.output.tables', 'job.result.message', ] = Field(description='Property path to retrieve from the task context') class PhaseCondition(BaseModel): """Phase-based condition for flow transitions.""" type: Literal['phase'] = Field(description='Condition type') phase: str = Field(description='ID of the phase to evaluate') value: Literal['phaseId', 'status'] = Field(description='Property to retrieve from the phase') class ConstantCondition(BaseModel): """Constant value condition.""" type: Literal['const', 'constant'] = Field(description='Condition type') value: Union[str, int, bool, list] = Field(description='Constant value') class VariableCondition(BaseModel): """Variable-based condition.""" type: Literal['variable'] = Field(description='Condition type') value: str = Field(description='The name of the variable to evaluate') class OperatorCondition(BaseModel): """Operator-based condition with operands.""" type: Literal['operator'] = Field(description='Condition type') operator: Literal['AND', 'OR', 'EQUALS', 'NOT_EQUALS', 'GREATER_THAN', 'LESS_THAN', 'INCLUDES', 'CONTAINS'] = Field( description='Operator type' ) operands: list['ConditionObject'] = Field(description='List of operand conditions') class PhaseOperatorCondition(BaseModel): """Phase-specific operator condition.""" type: Literal['operator'] = Field(description='Condition type') operator: Literal['ALL_TASKS_IN_PHASE', 'ANY_TASKS_IN_PHASE'] = Field(description='Phase operator type') phase: str = Field(description='ID of the phase to check') operands: list['OperatorCondition'] = Field(description='List of operand conditions') class FunctionCondition(BaseModel): """Function-based condition.""" type: Literal['function'] = Field(description='Condition type') function: Literal['COUNT', 'DATE'] = Field(description='Function type') operands: list['VariableSourceObject'] = Field(description='List of operand conditions') class ArrayCondition(BaseModel): """Array-based condition.""" type: Literal['array'] = Field(description='Condition type') operands: list['VariableSourceObject'] = Field(description='List of operand conditions') # Union type for all condition types ConditionObject = Union[ TaskCondition, PhaseCondition, ConstantCondition, VariableCondition, OperatorCondition, PhaseOperatorCondition, FunctionCondition, ArrayCondition, ] # ============================================================================= # CONDITIONAL FLOW MODELS - TASK CONFIGURATIONS # ============================================================================= class JobTaskConfiguration(BaseModel): """Job task configuration.""" type: Literal['job'] = Field(description='Task type') component_id: str = Field(description='Component ID', alias='componentId') config_id: Optional[str] = Field(default=None, description='Configuration ID', alias='configId') config_data: Optional[dict[str, Any]] = Field(default=None, description='Configuration data', alias='configData') mode: Literal['run'] = Field(description='Execution mode') delay: Optional[Union[str, int]] = Field(default=None, description='Initial delay in seconds') retry: Optional[RetryConfiguration] = Field(default=None, description='Retry configuration') class NotificationRecipient(BaseModel): """Notification recipient configuration.""" channel: Literal['email', 'webhook'] = Field(description='Channel type') address: str = Field(description='Recipient address (email or webhook URL)') class NotificationTaskConfiguration(BaseModel): """Notification task configuration.""" type: Literal['notification'] = Field(description='Task type') recipients: list[NotificationRecipient] = Field(description='List of notification recipients', min_length=1) title: str = Field(description='Notification title') message: Optional[str] = Field(default=None, description='Notification message') # Variable source object (limited subset of conditions) VariableSourceObject = Union[ ConstantCondition, PhaseCondition, TaskCondition, VariableCondition, FunctionCondition, ArrayCondition ] class VariableTaskConfiguration(BaseModel): """Variable task configuration.""" type: Literal['variable'] = Field(description='Task type') name: str = Field(description='Variable name') value: Optional[str] = Field(default=None, description='Variable value') source: Optional[VariableSourceObject] = Field(default=None, description='Variable source') TaskConfiguration = Union[JobTaskConfiguration, NotificationTaskConfiguration, VariableTaskConfiguration] # ============================================================================= # CONDITIONAL FLOW MODELS - CORE STRUCTURES # ============================================================================= class ConditionalFlowTransition(BaseModel): """Transition model with structured conditions.""" id: str = Field(description='Unique identifier of the transition') name: Optional[str] = Field(default=None, description='Optional descriptive name for the transition') condition: Optional[ConditionObject] = Field(default=None, description='Structured condition for this transition') goto: str | None = Field(description='Target phase ID to transition to, or null to end the flow') class ConditionalFlowTask(BaseModel): """Task model with structured configuration.""" id: str = Field(description='Unique identifier of the task (must be string)') name: str = Field(description='Name of the task') phase: str = Field(description='ID of the phase this task belongs to (must be string)') enabled: Optional[bool] = Field(default=True, description='Whether the task is enabled') task: TaskConfiguration = Field(description='Structured task configuration') class ConditionalFlowPhase(BaseModel): """Phase model with structured retry configuration.""" id: str = Field(description='Unique identifier of the phase (must be string)') name: str = Field(description='Name of the phase', min_length=1) description: Optional[str] = Field(default=None, description='Description of the phase') retry: Optional[RetryConfiguration] = Field( default=None, description='Retry configuration for all tasks in this phase' ) next: Optional[list[ConditionalFlowTransition]] = Field( default_factory=list, description='Array of transitions to other phases' ) class ConditionalFlowConfiguration(BaseModel): """Represents a complete legacy flow configuration.""" phases: list[ConditionalFlowPhase] = Field(description='List of phases in the flow') tasks: list[ConditionalFlowTask] = Field(description='List of tasks in the flow') # ============================================================================= # DOMAIN MODELS # ============================================================================= class Flow(BaseModel): """Complete flow configuration with all data.""" component_id: FlowType = Field(description='The ID of the component (keboola.orchestrator/keboola.flow)') configuration_id: str = Field(description='The ID of this flow configuration') name: str = Field(description='The name of the flow configuration') description: Optional[str] = Field(default=None, description='The description of the flow configuration') version: int = Field(description='The version of the flow configuration') is_disabled: bool = Field(default=False, description='Whether the flow configuration is disabled') is_deleted: bool = Field(default=False, description='Whether the flow configuration is deleted') configuration: FlowConfiguration | ConditionalFlowConfiguration = Field( description='The flow configuration containing phases and tasks' ) change_description: Optional[str] = Field(default=None, description='The description of the latest changes') configuration_metadata: list[dict[str, Any]] = Field( default_factory=list, description='Flow configuration metadata including MCP tracking' ) created: Optional[str] = Field(None, description='Creation timestamp') updated: Optional[str] = Field(None, description='Last update timestamp') links: list[Link] = Field(default_factory=list, description='MCP-specific links for UI navigation') @classmethod def from_api_response( cls, api_config: APIFlowResponse, flow_component_id: FlowType, links: Optional[list[Link]] = None ) -> 'Flow': """ Create a Flow domain model from an APIFlowResponse. :param api_config: The APIFlowResponse instance. :param flow_component_id: The component ID of the flow. :param links: Optional list of navigation links. :return: Flow domain model. """ is_legacy = flow_component_id == ORCHESTRATOR_COMPONENT_ID if is_legacy: phases = [FlowPhase.model_validate(p) for p in api_config.configuration.get('phases', [])] tasks = [FlowTask.model_validate(t) for t in api_config.configuration.get('tasks', [])] config = FlowConfiguration(phases=phases, tasks=tasks) else: phases = [ConditionalFlowPhase.model_validate(p) for p in api_config.configuration.get('phases', [])] tasks = [ConditionalFlowTask.model_validate(p) for p in api_config.configuration.get('tasks', [])] config = ConditionalFlowConfiguration(phases=phases, tasks=tasks) return cls.model_construct( component_id=flow_component_id, configuration_id=api_config.configuration_id, name=api_config.name, description=api_config.description, version=api_config.version, is_disabled=api_config.is_disabled, is_deleted=api_config.is_deleted, configuration=config, change_description=api_config.change_description, configuration_metadata=api_config.metadata, created=api_config.created, updated=api_config.updated, links=links or [], ) class FlowSummary(BaseModel): """Lightweight flow configuration for list operations.""" component_id: FlowType = Field(description='The ID of the component (keboola.orchestrator/keboola.flow)') configuration_id: str = Field(description='The ID of this flow configuration') name: str = Field(description='The name of the flow configuration') description: Optional[str] = Field(default=None, description='The description of the flow configuration') version: int = Field(description='The version of the flow configuration') is_disabled: bool = Field(default=False, description='Whether the flow configuration is disabled') is_deleted: bool = Field(default=False, description='Whether the flow configuration is deleted') phases_count: int = Field(description='Number of phases in the flow') tasks_count: int = Field(description='Number of tasks in the flow') created: Optional[str] = Field(None, description='Creation timestamp') updated: Optional[str] = Field(None, description='Last update timestamp') @classmethod def from_api_response(cls, api_config: APIFlowResponse, flow_component_id: FlowType) -> 'FlowSummary': """ Create a FlowSummary domain model from an APIFlowResponse. :param api_config: The APIFlowResponse instance. :param flow_component_id: The component ID of the flow. :return: FlowSummary domain model. """ config = getattr(api_config, 'configuration', {}) or {} return cls.model_construct( component_id=flow_component_id, configuration_id=api_config.configuration_id, name=api_config.name, description=api_config.description, version=api_config.version, is_disabled=api_config.is_disabled, is_deleted=api_config.is_deleted, phases_count=len(config.get('phases', [])), tasks_count=len(config.get('tasks', [])), created=api_config.created, updated=api_config.updated, )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/keboola/keboola-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server