Skip to main content
Glama

DolphinScheduler MCP Server

by ocean-zhc
client.py8.97 kB
"""DolphinScheduler API Client for MCP tools.""" import os import json import logging from typing import Dict, Any, Optional, Union, List, Tuple, BinaryIO import aiohttp from aiohttp import FormData from .config import Config class DolphinSchedulerClient: """Client for interacting with DolphinScheduler REST API.""" def __init__(self, api_url: Optional[str] = None, api_key: Optional[str] = None): """Initialize the client. Args: api_url: API URL for DolphinScheduler api_key: API key for authentication """ self.config = Config() if api_url: self.config.api_url = api_url if api_key: self.config.api_key = api_key self.session: Optional[aiohttp.ClientSession] = None self.logger = logging.getLogger("dolphinscheduler_mcp.client") async def _ensure_session(self) -> aiohttp.ClientSession: """Ensure that a session exists. Returns: An aiohttp ClientSession. """ if self.session is None: self.session = aiohttp.ClientSession() return self.session async def close(self) -> None: """Close the session.""" if self.session: await self.session.close() self.session = None async def request( self, method: str, path: str, params: Optional[Dict[str, Any]] = None, json_data: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Make a request to the DolphinScheduler API. Args: method: HTTP method (GET, POST, PUT, DELETE) path: API path params: Query parameters json_data: JSON data for the request body Returns: Response from the API """ session = await self._ensure_session() # Build the full URL url = f"{self.config.api_url.rstrip('/')}/{path.lstrip('/')}" # Build headers headers = {} if self.config.has_api_key(): headers["Token"] = self.config.api_key # Make the request async with session.request(method, url, params=params, json=json_data, headers=headers) as response: # Raise an exception for 4XX/5XX responses response.raise_for_status() # Parse the response return await response.json() async def request_with_files( self, method: str, endpoint: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, files: Optional[Dict[str, Tuple[str, Union[str, bytes]]]] = None, ) -> Dict[str, Any]: """ Send a multipart request with files to the DolphinScheduler API. Args: method: HTTP method (typically POST) endpoint: API endpoint params: Form parameters headers: Request headers files: Dictionary of files to upload {field_name: (filename, content)} Returns: Response JSON """ await self._ensure_session() url = f"{self.config.api_url.rstrip('/')}/{endpoint.lstrip('/')}" if not headers: headers = {} # Add token if available if self.config.has_api_key(): headers["Token"] = self.config.api_key try: # Create form data form = FormData() # Add files to form data if files: for field_name, (filename, content) in files.items(): form.add_field(field_name, content, filename=filename) # Add other parameters to form data if params: for key, value in params.items(): if isinstance(value, (dict, list)): form.add_field(key, json.dumps(value)) else: form.add_field(key, str(value)) self.logger.debug(f"Sending multipart {method} request to {url}") async with self.session.request( method=method, url=url, headers=headers, data=form ) as response: result = await response.json() if response.status >= 400: self.logger.error(f"API error: {response.status} - {result}") return { "success": False, "error": f"API error: {response.status}", "data": result } return result except Exception as e: self.logger.exception(f"Error in API request to {url}: {str(e)}") return { "success": False, "error": f"Request error: {str(e)}" } async def request_raw( self, method: str, endpoint: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> bytes: """ Send a request to the DolphinScheduler API and return raw bytes response. Useful for downloading files. Args: method: HTTP method (typically GET) endpoint: API endpoint params: URL parameters headers: Request headers Returns: Raw response bytes """ await self._ensure_session() url = f"{self.config.api_url.rstrip('/')}/{endpoint.lstrip('/')}" if not headers: headers = {} # Add token if available if self.config.has_api_key(): headers["Token"] = self.config.api_key try: self.logger.debug(f"Sending {method} request to {url} for raw response") async with self.session.request( method=method, url=url, params=params, headers=headers ) as response: if response.status >= 400: error_text = await response.text() self.logger.error(f"API error: {response.status} - {error_text}") raise Exception(f"API error: {response.status} - {error_text}") return await response.read() except Exception as e: self.logger.exception(f"Error in API request to {url}: {str(e)}") raise # Project management methods async def get_projects(self, search_val: Optional[str] = None) -> Dict[str, Any]: """Get a list of projects. Args: search_val: Optional search value to filter projects by name Returns: Response from the API """ params = {} if search_val: params["searchVal"] = search_val return await self.request("GET", "projects", params=params) async def get_project(self, project_code: int) -> Dict[str, Any]: """Get project details. Args: project_code: Project code Returns: Response from the API """ return await self.request("GET", f"projects/{project_code}") async def create_project(self, name: str, description: str = "") -> Dict[str, Any]: """Create a new project. Args: name: Project name description: Project description Returns: Response from the API """ data = { "projectName": name } if description: data["description"] = description return await self.request("POST", "projects", json_data=data) async def update_project(self, project_code: int, name: str, description: str = "") -> Dict[str, Any]: """Update a project. Args: project_code: Project code name: Project name description: Project description Returns: Response from the API """ params = { "projectName": name } if description: params["description"] = description return await self.request("PUT", f"projects/{project_code}", params=params) async def delete_project(self, project_code: int) -> Dict[str, Any]: """Delete a project. Args: project_code: Project code Returns: Response from the API """ return await self.request("DELETE", f"projects/{project_code}")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ocean-zhc/dolphinscheduler-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server