"""
Dynamic MCP server that loads API version-specific tools based on AIRFLOW_API_VERSION.

- Airflow API v1 documentation: https://airflow.apache.org/docs/apache-airflow/2.0.0/stable-rest-api-ref.html
- Airflow API v2 documentation: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
"""
import argparse
import logging
import os
from typing import List, Optional

from fastmcp import FastMCP

# Try to import StaticTokenVerifier; fall back if not available
try:
    from fastmcp.server.auth import StaticTokenVerifier
    HAS_AUTH_SUPPORT = True
except ImportError:
    StaticTokenVerifier = None
    HAS_AUTH_SUPPORT = False

from mcp_airflow_api.functions import get_api_version

# Setup logging
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# =============================================================================
# Authentication Setup
# =============================================================================
def create_mcp_instance(auth_enable: bool = False, secret_key: str = "") -> FastMCP:
    """Create a FastMCP instance with optional Bearer token authentication."""
    if auth_enable and secret_key:
        if not HAS_AUTH_SUPPORT:
            logger.warning("Bearer token authentication requested but StaticTokenVerifier not available")
            logger.warning("Creating MCP instance without authentication")
            return FastMCP("mcp-airflow-api")

        # Simple token-based authentication using StaticTokenVerifier.
        # This is much simpler than JWT with RSA keys.
        logger.info("Creating MCP instance with Bearer token authentication")

        # Token configuration: the key is the token itself; the value holds
        # metadata about that token.
        tokens = {
            secret_key: {
                "client_id": "airflow-api-client",
                "user": "admin",
                "scopes": ["read", "write"],
                "description": "Airflow API access token"
            }
        }
        try:
            auth = StaticTokenVerifier(tokens=tokens)
            return FastMCP("mcp-airflow-api", auth=auth)
        except Exception as e:
            logger.warning(f"Failed to create StaticTokenVerifier: {e}")
            logger.warning("Creating MCP instance without authentication")
            return FastMCP("mcp-airflow-api")
    else:
        logger.info("Creating MCP instance without authentication")
        return FastMCP("mcp-airflow-api")
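# With StaticTokenVerifier, clients authenticate by presenting the configured
# token verbatim in the Authorization header. Illustrative request (the /mcp
# path is fastmcp's usual default for streamable-http, but may vary by version):
#   curl -H "Authorization: Bearer $REMOTE_SECRET_KEY" http://127.0.0.1:8000/mcp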
def register_prompts(mcp, api_version: str):
    """Register prompt templates for the MCP server."""

    @mcp.prompt()
    async def airflow_cluster_monitoring(dag_name: Optional[str] = None, time_range: Optional[str] = "today") -> str:
        """Comprehensive Airflow cluster monitoring assistant.

        Args:
            dag_name: Specific DAG to focus on (optional)
            time_range: Time range for analysis (default: "today")
        """
        # Read the prompt template shipped alongside this module
        template_path = os.path.join(os.path.dirname(__file__), "prompt_template.md")
        try:
            with open(template_path, 'r', encoding='utf-8') as f:
                template_content = f.read()
        except FileNotFoundError:
            template_content = "# MCP Airflow API Prompt Template\n\nTemplate file not found."

        context = f"""
{template_content}

## Current Session Context

**API Version**: {api_version}
**Target DAG**: {dag_name or "All DAGs"}
**Time Range**: {time_range}
**Available Tools**: {"45 tools (43 common + 2 asset tools)" if api_version == "v2" else "43 tools"}

## Session Instructions

You are an expert Airflow cluster monitoring assistant. Use the available MCP tools to:

1. **Monitor cluster health and performance**
2. **Analyze DAG execution patterns**
3. **Investigate failures and bottlenecks**
4. **Provide actionable insights**

Always start by checking cluster health with `get_health` and version with `get_version`.
For DAG analysis, use `list_dags`, `running_dags`, and `failed_dags` to get an overview.
"""
        if api_version == "v2":
            context += """
## V2 Enhanced Features Available

- **Asset Management**: Use `list_assets` and `list_asset_events` for data-aware scheduling
- **Enhanced DAG Analysis**: All tools support improved filtering and metadata
- **JWT Authentication**: Automatic token management for Airflow 3.0+
"""
        return context
    @mcp.prompt()
    async def airflow_troubleshooting(issue_type: Optional[str] = "general", severity: Optional[str] = "medium") -> str:
        """Specialized Airflow troubleshooting assistant.

        Args:
            issue_type: Type of issue (failed_tasks, slow_dags, resource_issues, general)
            severity: Issue severity (low, medium, high, critical)
        """
        context = f"""
# Airflow Troubleshooting Assistant

**API Version**: {api_version}
**Issue Type**: {issue_type}
**Severity**: {severity}

## Troubleshooting Workflow

### 1. Initial Assessment
- Check cluster health: `get_health`
- Review system status: `get_version`
- Identify failed DAGs: `failed_dags`

### 2. Issue-Specific Analysis
**Failed Tasks**: Use `list_task_instances_all` with state="failed"
**Slow DAGs**: Use `dag_run_duration` and `dag_task_duration`
**Resource Issues**: Check `list_pools` and pool utilization
**Import Errors**: Use `list_import_errors` and `all_dag_import_summary`

### 3. Deep Dive Investigation
- Examine task logs: `get_task_instance_logs`
- Review task details: `get_task_instance_details`
- Check XCom data: `list_xcom_entries`
- Analyze event logs: `list_event_logs`

### 4. Monitoring and Prevention
- Set up regular health checks
- Monitor resource utilization trends
- Review configuration settings
"""
        if issue_type == "failed_tasks":
            context += """
## Failed Tasks Investigation
1. `failed_dags` - Get overview of failed DAG runs
2. `list_task_instances_all(state="failed")` - List all failed tasks
3. `get_task_instance_logs(dag_id, dag_run_id, task_id)` - Check error logs
4. `get_task_instance_details(dag_id, dag_run_id, task_id)` - Get task metadata
"""
        elif issue_type == "slow_dags":
            context += """
## Performance Analysis
1. `dag_run_duration(dag_id)` - Get runtime statistics
2. `dag_task_duration(dag_id, run_id)` - Identify slow tasks
3. `list_task_instances_all(duration_gte=300)` - Find long-running tasks
4. `list_pools` - Check resource allocation
"""
        return context
    @mcp.prompt()
    async def airflow_dag_analysis(analysis_type: Optional[str] = "overview", dag_pattern: Optional[str] = None) -> str:
        """DAG analysis and optimization assistant.

        Args:
            analysis_type: Type of analysis (overview, performance, dependencies, configuration)
            dag_pattern: Pattern to filter DAGs (optional)
        """
        context = f"""
# DAG Analysis Assistant

**API Version**: {api_version}
**Analysis Type**: {analysis_type}
**DAG Pattern**: {dag_pattern or "All DAGs"}

## Analysis Workflows

### Overview Analysis
1. `list_dags` - Get DAG inventory
2. `get_dags_detailed_batch(fetch_all=True)` - Comprehensive DAG details
3. `running_dags` and `failed_dags` - Current status

### Performance Analysis
1. `dag_run_duration(dag_id)` - Runtime trends
2. `dag_task_duration(dag_id, run_id)` - Task-level performance
3. `list_task_instances_all` - Task execution patterns

### Dependencies Analysis
1. `dag_graph(dag_id)` - Task dependency visualization
2. `list_tasks(dag_id)` - Task configuration details
3. `dag_code(dag_id)` - Source code review
"""
        if analysis_type == "performance":
            context += """
## Performance Optimization Focus
- Identify bottleneck tasks with high duration
- Check resource utilization patterns
- Analyze failure rates and retry patterns
- Review scheduling efficiency
"""
        if api_version == "v2" and analysis_type == "dependencies":
            context += """
## V2 Enhanced Dependencies (Data-Aware Scheduling)
- `list_assets` - Show data assets and dependencies
- `list_asset_events` - Track data lineage and updates
- Enhanced DAG metadata with asset relationships
"""
        return context
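# The prompts registered above are exposed through the standard MCP prompt
# discovery flow (prompts/list and prompts/get); exact invocation details
# depend on the MCP client in use.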
def create_mcp_server() -> FastMCP:
    """Create and configure the MCP server with tools based on API version."""
    api_version = get_api_version()

    # Note: initial server creation without auth - it will be recreated in
    # main() if authentication is needed.
    mcp_instance = FastMCP("mcp-airflow-api")
    logger.info(f"Initializing MCP server for Airflow API {api_version}")

    # Register prompt templates
    register_prompts(mcp_instance, api_version)

    if api_version == "v1":
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        from mcp_airflow_api.tools import v1_tools
        v1_tools.register_tools(mcp_instance)
    elif api_version == "v2":
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        from mcp_airflow_api.tools import v2_tools
        v2_tools.register_tools(mcp_instance)
    else:
        raise ValueError(f"Unsupported API version: {api_version}. Use 'v1' or 'v2'")

    logger.info(f"MCP server initialized with API version {api_version}")
    return mcp_instance
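# Building the server at import time keeps stdio launchers working: they can
# import this module and use `mcp` directly without calling main().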
# Create the MCP server instance
mcp = create_mcp_server()
def main(argv: Optional[List[str]] = None):
    """Entrypoint for MCP Airflow API server.

    Supports optional CLI arguments (e.g. --log-level DEBUG) while remaining
    backward-compatible with stdio launcher expectations.
    """
    parser = argparse.ArgumentParser(prog="mcp-airflow-api", description="MCP Airflow API Server")
    parser.add_argument(
        "--log-level",
        dest="log_level",
        help="Logging level override (DEBUG, INFO, WARNING, ERROR, CRITICAL). Overrides MCP_LOG_LEVEL env if provided.",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
    )
    parser.add_argument(
        "--type",
        dest="transport_type",
        help="Transport type (stdio or streamable-http). Default: stdio",
        choices=["stdio", "streamable-http"],
    )
    parser.add_argument(
        "--host",
        dest="host",
        help="Host address for streamable-http transport. Default: 127.0.0.1",
    )
    parser.add_argument(
        "--port",
        dest="port",
        type=int,
        help="Port number for streamable-http transport. Default: 8000",
    )
    parser.add_argument(
        "--auth-enable",
        dest="auth_enable",
        action="store_true",
        help="Enable Bearer token authentication for streamable-http mode. Default: False",
    )
    parser.add_argument(
        "--secret-key",
        dest="secret_key",
        help="Secret key for Bearer token authentication. Required when auth is enabled.",
    )
    # Parse arguments (argv=None falls back to sys.argv)
    args = parser.parse_args(argv)
    # Determine log level: CLI arg > environment variable > default
    log_level = args.log_level or os.getenv("MCP_LOG_LEVEL", "INFO")

    # Set logging level
    logging.getLogger().setLevel(log_level)
    logger.setLevel(log_level)
    logging.getLogger("aiohttp.client").setLevel("WARNING")  # reduce noise at DEBUG

    if args.log_level:
        logger.info("Log level set via CLI to %s", args.log_level)
    elif os.getenv("MCP_LOG_LEVEL"):
        logger.info("Log level set via environment variable to %s", log_level)
    else:
        logger.info("Using default log level: %s", log_level)
    # Resolution priority: CLI option > environment variable > default

    # Determine transport type
    transport_type = args.transport_type or os.getenv("FASTMCP_TYPE", "stdio")

    # Determine host
    host = args.host or os.getenv("FASTMCP_HOST", "127.0.0.1")

    # Determine port
    port = args.port or int(os.getenv("FASTMCP_PORT", "8000"))

    # Determine authentication settings
    # REMOTE_AUTH_ENABLE defaults to "false" when undefined, empty, or null
    # Supported values: true/false, 1/0, yes/no, on/off (case insensitive)
    auth_enable = args.auth_enable or os.getenv("REMOTE_AUTH_ENABLE", "false").lower() in ("true", "1", "yes", "on")
    secret_key = args.secret_key or os.getenv("REMOTE_SECRET_KEY", "")
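    # Example invocations (illustrative; flags mirror the argparse definitions above):
    #   mcp-airflow-api                                            # stdio transport, defaults
    #   mcp-airflow-api --type streamable-http --host 0.0.0.0 --port 8000
    #   mcp-airflow-api --type streamable-http --auth-enable --secret-key <token>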
    # Validation for streamable-http mode with authentication
    if transport_type == "streamable-http":
        if auth_enable:
            if not HAS_AUTH_SUPPORT:
                logger.error("ERROR: Bearer token authentication requested but not supported by current fastmcp version")
                logger.error("Please upgrade fastmcp to a version that supports StaticTokenVerifier")
                return
            if not secret_key:
                logger.error("ERROR: Authentication is enabled but no secret key provided.")
                logger.error("Please set REMOTE_SECRET_KEY environment variable or use --secret-key argument.")
                return
            logger.info("Authentication enabled for streamable-http transport")
        else:
            logger.warning("WARNING: streamable-http mode without authentication enabled!")
            logger.warning("This server will accept requests without Bearer token verification.")
            logger.warning("Set REMOTE_AUTH_ENABLE=true and REMOTE_SECRET_KEY to enable authentication.")
    elif auth_enable:
        logger.warning("WARNING: Authentication is only supported in streamable-http mode; ignoring auth settings")
    # Create MCP instance with or without authentication
    global mcp
    mcp = create_mcp_instance(auth_enable=auth_enable, secret_key=secret_key)

    # Load tools into the (possibly authenticated) instance
    api_version = get_api_version()
    register_prompts(mcp, api_version)
    if api_version == "v1":
        logger.info("Loading Airflow API v1 tools (Airflow 2.x)")
        from mcp_airflow_api.tools import v1_tools
        v1_tools.register_tools(mcp)
    elif api_version == "v2":
        logger.info("Loading Airflow API v2 tools (Airflow 3.0+)")
        from mcp_airflow_api.tools import v2_tools
        v2_tools.register_tools(mcp)
    else:
        # Mirror create_mcp_server(): fail loudly instead of running with no tools
        raise ValueError(f"Unsupported API version: {api_version}. Use 'v1' or 'v2'")
    # Run with the selected transport
    if transport_type == "streamable-http":
        logger.info(f"Starting streamable-http server on {host}:{port}")
        mcp.run(transport="streamable-http", host=host, port=port)
    else:
        logger.info("Starting stdio transport for local usage")
        mcp.run(transport='stdio')
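

# Allow direct execution (python mcp_main.py) in addition to any console-script
# entry point the package may define.
if __name__ == "__main__":
    main()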