Airbyte Status Checker

import os import json import requests import time from typing import Optional, Dict, Any, List from dotenv import load_dotenv from fastmcp import FastMCP # Create a FastMCP instance mcp = FastMCP("Airbyte Connection Checker") # Load environment variables load_dotenv() # Airbyte API configuration API_BASE_URL = "https://api.airbyte.com/v1" API_KEY = os.getenv("AIRBYTE_API_KEY") WORKSPACE_ID = os.getenv("AIRBYTE_WORKSPACE_ID") CLIENT_ID = os.getenv("AIRBYTE_CLIENT_ID") CLIENT_SECRET = os.getenv("AIRBYTE_CLIENT_SECRET") if not WORKSPACE_ID or not API_KEY: raise ValueError("Missing required Airbyte credentials in .env file") # Headers for API requests HEADERS = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } # Token refresh function def refresh_airbyte_token(): """Refresh the Airbyte API token""" try: refresh_token = os.getenv('AIRBYTE_API_KEY') if not refresh_token: raise ValueError("AIRBYTE_API_KEY not found in environment variables") base_url = API_BASE_URL.rstrip('/') response = requests.post( f'{base_url}/applications/token', data={ 'grant_type': 'refresh_token', 'client_id': CLIENT_ID, 'client_secret': CLIENT_SECRET, 'refresh_token': refresh_token } ) response.raise_for_status() new_token = response.json().get('access_token') if not new_token: raise ValueError("No access token in response") # Update global API_KEY and HEADERS global API_KEY, HEADERS API_KEY = new_token HEADERS = { "Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json" } return new_token except Exception as e: print(f"Failed to refresh token: {str(e)}") raise def get_connections(): """Get all connections in the workspace""" url = f"{API_BASE_URL}/connections" params = {"workspaceIds": WORKSPACE_ID} response = requests.get(url, headers=HEADERS, params=params) response.raise_for_status() return response.json().get("data", []) def check_connection_status(connection_id): """Check the status of a connection""" url = f"{API_BASE_URL}/connections/get" payload = { "connectionId": connection_id } response = requests.post(url, headers=HEADERS, json=payload) response.raise_for_status() return response.json() def get_connection_streams(connection_id): """Get the streams for a connection""" url = f"{API_BASE_URL}/connections/get" payload = { "connectionId": connection_id } response = requests.post(url, headers=HEADERS, json=payload) response.raise_for_status() connection_data = response.json() # Extract stream information from the connection data # This might need adjustment based on the actual API response structure streams = connection_data.get("syncCatalog", {}).get("streams", []) return [stream.get("stream", {}).get("name") for stream in streams if stream.get("config", {}).get("selected")] @mcp.tool() async def check_airbyte_connection(connection_name: Optional[str] = None) -> Dict[str, Any]: """ Check the status of an Airbyte connection or list all connections. Args: connection_name: Name of the connection to check. If not provided, lists all connections. Returns: A dictionary with status information. """ try: # Try to refresh the token if needed try: if CLIENT_ID and CLIENT_SECRET: refresh_airbyte_token() except Exception as e: print(f"Token refresh failed, continuing with existing token: {str(e)}") # Get all connections in the workspace connections = get_connections() if not connection_name: # If no connection name provided, return list of all connections connection_list = [ { "name": conn.get("name"), "id": conn.get("connectionId"), "status": "🟢 Active" if conn.get("status", "").lower() == "active" else "🔴 Inactive" } for conn in connections ] return { "status": "success", "message": "📋 Here's a list of all connections", "connections": connection_list } else: # Find the connection by name connection = None for conn in connections: if conn.get("name", "").lower() == connection_name.lower(): connection = conn break if not connection: return { "status": "error", "message": f"❌ Connection '{connection_name}' not found" } # Get connection details connection_id = connection.get("connectionId") connection_details = check_connection_status(connection_id) # Get streams for this connection streams = get_connection_streams(connection_id) status = connection.get("status", "") if status.lower() == "active": emoji = "✅" message = f"Connection '{connection_name}' is active" else: emoji = "❌" message = f"Connection '{connection_name}' is inactive" return { "status": status, "message": f"{emoji} {message}", "connection_name": connection_name, "connection_id": connection_id, "streams": streams, "details": connection_details } except Exception as e: return { "status": "error", "message": f"❌ Error: {str(e)}" } if __name__ == "__main__": # Initialize and run the server mcp.run(transport='stdio')