Skip to main content
Glama

DolphinScheduler MCP Server

by ocean-zhc
config.py (4.14 kB)
"""Configuration for DolphinScheduler MCP.""" import os import json from pathlib import Path from typing import Optional, Dict, Any def read_mcp_settings() -> Dict[str, Any]: """Read MCP settings from the Cursor MCP settings file. Returns: A dictionary containing the MCP settings """ # Default location for the Cursor MCP settings file settings_path = os.path.expanduser("~/Library/Application Support/Cursor/User/globalStorage/rooveterinaryinc.roo-cline/settings/mcp_settings.json") if os.path.exists(settings_path): try: with open(settings_path, 'r') as f: settings = json.load(f) return settings except Exception as e: print(f"Error reading MCP settings file: {e}") return {} def get_env_from_mcp_settings() -> Dict[str, str]: """Get environment variables from MCP settings. Returns: A dictionary containing environment variables """ settings = read_mcp_settings() env_vars = {} print("Reading MCP settings:", settings.keys() if settings else "No settings found") # Look for the dolphinscheduler server config if 'mcpServers' in settings and 'dolphinscheduler' in settings['mcpServers']: server_config = settings['mcpServers']['dolphinscheduler'] print("Found dolphinscheduler server config:", server_config.keys()) if 'env' in server_config: env_vars = server_config['env'] print("Found environment variables in MCP settings:", env_vars) return env_vars class Config: """Configuration for DolphinScheduler MCP.""" _instance = None def __new__(cls): """Create a new instance of Config or return the existing one.""" if cls._instance is None: cls._instance = super(Config, cls).__new__(cls) # First, try to get env variables from MCP settings mcp_env = get_env_from_mcp_settings() # Get API URL from MCP settings, env var, or use default cls._instance._api_url = mcp_env.get( "DOLPHINSCHEDULER_API_URL", os.environ.get( "DOLPHINSCHEDULER_API_URL", "http://localhost:12345/dolphinscheduler" ) ) # Get API key from MCP settings, env var, or use default cls._instance._api_key = mcp_env.get( 
"DOLPHINSCHEDULER_API_KEY", os.environ.get("DOLPHINSCHEDULER_API_KEY", "") ) # Set the environment variables for other parts of the app if cls._instance._api_url: os.environ["DOLPHINSCHEDULER_API_URL"] = cls._instance._api_url if cls._instance._api_key: os.environ["DOLPHINSCHEDULER_API_KEY"] = cls._instance._api_key return cls._instance @property def api_url(self) -> str: """Get the API URL. Returns: The API URL. """ return self._api_url @api_url.setter def api_url(self, value: str) -> None: """Set the API URL. Args: value: The API URL. """ self._api_url = value # We could also update the environment variable here os.environ["DOLPHINSCHEDULER_API_URL"] = value @property def api_key(self) -> str: """Get the API key. Returns: The API key. """ return self._api_key @api_key.setter def api_key(self, value: str) -> None: """Set the API key. Args: value: The API key. """ self._api_key = value # We could also update the environment variable here os.environ["DOLPHINSCHEDULER_API_KEY"] = value def has_api_key(self) -> bool: """Check if an API key is set. Returns: True if an API key is set, False otherwise. """ return bool(self._api_key)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ocean-zhc/dolphinscheduler-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.