MCP Server for Apache Airflow

from typing import Any, Callable, Dict, List, Optional, Union import mcp.types as types from airflow_client.client.api.config_api import ConfigApi from src.airflow.airflow_client import api_client config_api = ConfigApi(api_client) def get_all_functions() -> list[tuple[Callable, str, str]]: return [ (get_config, "get_config", "Get current configuration"), (get_value, "get_value", "Get a specific option from configuration"), ] async def get_config( section: Optional[str] = None, ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: # Build parameters dictionary kwargs: Dict[str, Any] = {} if section is not None: kwargs["section"] = section response = config_api.get_config(**kwargs) return [types.TextContent(type="text", text=str(response.to_dict()))] async def get_value( section: str, option: str ) -> List[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]: response = config_api.get_value(section=section, option=option) return [types.TextContent(type="text", text=str(response.to_dict()))]