Skip to main content
Glama

DolphinScheduler MCP Server

by ocean-zhc
tools.py.bak (10.6 kB)
"""Tool implementations for DolphinScheduler MCP Server.""" import json import logging from typing import Any, Dict, List, Optional, Union from modelcontextprotocol.errors import ToolError from modelcontextprotocol.tool import Tool, schema from .client import DolphinSchedulerClient from .config import Config logger = logging.getLogger(__name__) # Connection Management Tools class GetConnectionSettings(Tool): """Tool to get current connection settings for DolphinScheduler API.""" name = "get-connection-settings" description = "Get current connection settings for DolphinScheduler API." input_schema = schema({}) async def run(self, **kwargs) -> Dict[str, Any]: """Return the current connection settings.""" config = Config() return { "url": config.api_url, "has_api_key": config.has_api_key(), } class UpdateConnectionSettings(Tool): """Tool to update connection settings for DolphinScheduler API.""" name = "update-connection-settings" description = "Update connection settings for DolphinScheduler API." input_schema = schema( { "url": { "type": "string", "description": "API URL for DolphinScheduler", "optional": True, }, "api_key": { "type": "string", "description": "API key for authentication", "optional": True, }, } ) async def run(self, url: Optional[str] = None, api_key: Optional[str] = None) -> Dict[str, Any]: """Update connection settings.""" config = Config() if url is not None: config.api_url = url if api_key is not None: config.api_key = api_key return { "success": True, "url": config.api_url, "has_api_key": config.has_api_key(), } # Project Management Tools class GetProjects(Tool): """Tool to get all projects in DolphinScheduler.""" name = "get-projects" description = "Get list of all projects in DolphinScheduler." 
input_schema = schema({}) async def run(self, **kwargs) -> Dict[str, Any]: """Get all projects.""" client = DolphinSchedulerClient() try: result = await client.get_projects() return result finally: await client.close() class GetProject(Tool): """Tool to get details of a specific project in DolphinScheduler.""" name = "get-project" description = "Get details of a specific project in DolphinScheduler." input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project to retrieve", } } ) async def run(self, project_id: int) -> Dict[str, Any]: """Get project details by ID.""" client = DolphinSchedulerClient() try: result = await client.get_project(project_id) return result finally: await client.close() # Workflow Management Tools class GetWorkflows(Tool): """Tool to get workflows for a specific project.""" name = "get-workflows" description = "Get list of workflows for a specific project." input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project", } } ) async def run(self, project_id: int) -> Dict[str, Any]: """Get workflows for a project.""" client = DolphinSchedulerClient() try: result = await client.get_workflows(project_id) return result finally: await client.close() class GetWorkflow(Tool): """Tool to get details of a specific workflow.""" name = "get-workflow" description = "Get details of a specific workflow." 
input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project", }, "workflow_id": { "type": "integer", "description": "ID of the workflow", }, } ) async def run(self, project_id: int, workflow_id: int) -> Dict[str, Any]: """Get workflow details by ID.""" client = DolphinSchedulerClient() try: result = await client.get_workflow(project_id, workflow_id) return result finally: await client.close() # Workflow Instance Management Tools class GetWorkflowInstances(Tool): """Tool to get workflow instances for a project.""" name = "get-workflow-instances" description = "Get list of workflow instances for a specific project." input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project", } } ) async def run(self, project_id: int) -> Dict[str, Any]: """Get workflow instances for a project.""" client = DolphinSchedulerClient() try: result = await client.get_workflow_instances(project_id) return result finally: await client.close() class GetWorkflowInstance(Tool): """Tool to get details of a specific workflow instance.""" name = "get-workflow-instance" description = "Get details of a specific workflow instance." input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project", }, "instance_id": { "type": "integer", "description": "ID of the workflow instance", }, } ) async def run(self, project_id: int, instance_id: int) -> Dict[str, Any]: """Get workflow instance details by ID.""" client = DolphinSchedulerClient() try: result = await client.get_workflow_instance(project_id, instance_id) return result finally: await client.close() class StartWorkflow(Tool): """Tool to start a workflow execution.""" name = "start-workflow" description = "Start a workflow execution." 
input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project", }, "workflow_id": { "type": "integer", "description": "ID of the workflow to execute", }, "params": { "type": "object", "description": "Workflow parameters", "optional": True, }, } ) async def run( self, project_id: int, workflow_id: int, params: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Start a workflow execution.""" client = DolphinSchedulerClient() try: result = await client.start_workflow(project_id, workflow_id, params) return result finally: await client.close() class StopWorkflowInstance(Tool): """Tool to stop a running workflow instance.""" name = "stop-workflow-instance" description = "Stop a running workflow instance." input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project", }, "instance_id": { "type": "integer", "description": "ID of the workflow instance to stop", }, } ) async def run(self, project_id: int, instance_id: int) -> Dict[str, Any]: """Stop a workflow instance.""" client = DolphinSchedulerClient() try: result = await client.stop_workflow_instance(project_id, instance_id) return result finally: await client.close() # Task Instance Management Tools class GetTaskInstances(Tool): """Tool to get task instances for a workflow instance.""" name = "get-task-instances" description = "Get task instances for a workflow instance." 
input_schema = schema( { "project_id": { "type": "integer", "description": "ID of the project", }, "instance_id": { "type": "integer", "description": "ID of the workflow instance", }, } ) async def run(self, project_id: int, instance_id: int) -> Dict[str, Any]: """Get task instances for a workflow instance.""" client = DolphinSchedulerClient() try: result = await client.get_task_instances(project_id, instance_id) return result finally: await client.close() # System Status Tools class GetSystemStatus(Tool): """Tool to get overall system status.""" name = "get-system-status" description = "Get overall system status including master and worker nodes." input_schema = schema({}) async def run(self, **kwargs) -> Dict[str, Any]: """Get system status.""" client = DolphinSchedulerClient() try: result = await client.get_system_status() return result finally: await client.close() # Resource Management Tools class GetResources(Tool): """Tool to get resource list.""" name = "get-resources" description = "Get list of resources (files and directories)." input_schema = schema({}) async def run(self, **kwargs) -> Dict[str, Any]: """Get resources.""" client = DolphinSchedulerClient() try: result = await client.get_resources() return result finally: await client.close() # List of all available tools TOOLS = [ # Connection Management GetConnectionSettings(), UpdateConnectionSettings(), # Project Management GetProjects(), GetProject(), # Workflow Management GetWorkflows(), GetWorkflow(), # Workflow Instance Management GetWorkflowInstances(), GetWorkflowInstance(), StartWorkflow(), StopWorkflowInstance(), # Task Instance Management GetTaskInstances(), # System Status GetSystemStatus(), # Resource Management GetResources(), ]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ocean-zhc/dolphinscheduler-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.