"""Tool implementations for DolphinScheduler MCP Server."""
import json
import logging
from typing import Any, Dict, List, Optional, Union
from modelcontextprotocol.errors import ToolError
from modelcontextprotocol.tool import Tool, schema
from .client import DolphinSchedulerClient
from .config import Config
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Connection Management Tools
class GetConnectionSettings(Tool):
    """Report the connection settings currently held by ``Config``."""

    name = "get-connection-settings"
    description = "Get current connection settings for DolphinScheduler API."
    input_schema = schema({})

    async def run(self, **kwargs) -> Dict[str, Any]:
        """Read the API URL and API-key presence from a fresh ``Config``.

        Keyword arguments are accepted but not used.

        Returns:
            A dict with ``"url"`` (``Config.api_url``) and ``"has_api_key"``
            (the result of ``Config.has_api_key()``). The key itself is
            never included in the result.
        """
        settings = Config()
        api_url = settings.api_url
        key_present = settings.has_api_key()
        return {"url": api_url, "has_api_key": key_present}
class UpdateConnectionSettings(Tool):
    """Change the API URL and/or API key stored on ``Config``."""

    name = "update-connection-settings"
    description = "Update connection settings for DolphinScheduler API."
    input_schema = schema(
        {
            "url": {
                "type": "string",
                "description": "API URL for DolphinScheduler",
                "optional": True,
            },
            "api_key": {
                "type": "string",
                "description": "API key for authentication",
                "optional": True,
            },
        }
    )

    async def run(self, url: Optional[str] = None, api_key: Optional[str] = None) -> Dict[str, Any]:
        """Apply whichever settings were supplied and report the result.

        Args:
            url: New API URL; left untouched when ``None``.
            api_key: New API key; left untouched when ``None``.

        Returns:
            A dict with ``"success"`` (always ``True``), ``"url"`` and
            ``"has_api_key"`` as read back from the ``Config`` instance
            after the assignments.

        NOTE(review): whether values assigned here outlive this ``Config``
        instance depends on how ``Config`` is implemented -- confirm.
        """
        settings = Config()
        # Only overwrite the attributes for which a value was actually given.
        for attribute, new_value in (("api_url", url), ("api_key", api_key)):
            if new_value is not None:
                setattr(settings, attribute, new_value)
        return dict(
            success=True,
            url=settings.api_url,
            has_api_key=settings.has_api_key(),
        )
# Project Management Tools
class GetProjects(Tool):
    """List the projects available through the DolphinScheduler client."""

    name = "get-projects"
    description = "Get list of all projects in DolphinScheduler."
    input_schema = schema({})

    async def run(self, **kwargs) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_projects()``.

        Keyword arguments are accepted but not used. A new client is
        created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_projects()
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
class GetProject(Tool):
    """Look up a single DolphinScheduler project by its ID."""

    name = "get-project"
    description = "Get details of a specific project in DolphinScheduler."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project to retrieve",
            },
        }
    )

    async def run(self, project_id: int) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_project()``.

        Args:
            project_id: ID of the project to retrieve.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_project(project_id)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
# Workflow Management Tools
class GetWorkflows(Tool):
    """List the workflows that belong to one project."""

    name = "get-workflows"
    description = "Get list of workflows for a specific project."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project",
            },
        }
    )

    async def run(self, project_id: int) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_workflows()``.

        Args:
            project_id: ID of the project whose workflows are requested.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_workflows(project_id)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
class GetWorkflow(Tool):
    """Look up a single workflow inside a project."""

    name = "get-workflow"
    description = "Get details of a specific workflow."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project",
            },
            "workflow_id": {
                "type": "integer",
                "description": "ID of the workflow",
            },
        }
    )

    async def run(self, project_id: int, workflow_id: int) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_workflow()``.

        Args:
            project_id: ID of the project.
            workflow_id: ID of the workflow.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_workflow(project_id, workflow_id)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
# Workflow Instance Management Tools
class GetWorkflowInstances(Tool):
    """List the workflow instances that belong to one project."""

    name = "get-workflow-instances"
    description = "Get list of workflow instances for a specific project."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project",
            },
        }
    )

    async def run(self, project_id: int) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_workflow_instances()``.

        Args:
            project_id: ID of the project whose instances are requested.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_workflow_instances(project_id)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
class GetWorkflowInstance(Tool):
    """Look up a single workflow instance inside a project."""

    name = "get-workflow-instance"
    description = "Get details of a specific workflow instance."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project",
            },
            "instance_id": {
                "type": "integer",
                "description": "ID of the workflow instance",
            },
        }
    )

    async def run(self, project_id: int, instance_id: int) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_workflow_instance()``.

        Args:
            project_id: ID of the project.
            instance_id: ID of the workflow instance.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_workflow_instance(project_id, instance_id)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
class StartWorkflow(Tool):
    """Ask DolphinScheduler to start executing a workflow."""

    name = "start-workflow"
    description = "Start a workflow execution."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project",
            },
            "workflow_id": {
                "type": "integer",
                "description": "ID of the workflow to execute",
            },
            "params": {
                "type": "object",
                "description": "Workflow parameters",
                "optional": True,
            },
        }
    )

    async def run(
        self,
        project_id: int,
        workflow_id: int,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.start_workflow()``.

        Args:
            project_id: ID of the project.
            workflow_id: ID of the workflow to execute.
            params: Optional workflow parameters; forwarded unchanged,
                including when it is ``None``.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.start_workflow(project_id, workflow_id, params)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
class StopWorkflowInstance(Tool):
    """Ask DolphinScheduler to stop a workflow instance."""

    name = "stop-workflow-instance"
    description = "Stop a running workflow instance."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project",
            },
            "instance_id": {
                "type": "integer",
                "description": "ID of the workflow instance to stop",
            },
        }
    )

    async def run(self, project_id: int, instance_id: int) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.stop_workflow_instance()``.

        Args:
            project_id: ID of the project.
            instance_id: ID of the workflow instance to stop.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.stop_workflow_instance(project_id, instance_id)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
# Task Instance Management Tools
class GetTaskInstances(Tool):
    """List the task instances of one workflow instance."""

    name = "get-task-instances"
    description = "Get task instances for a workflow instance."
    input_schema = schema(
        {
            "project_id": {
                "type": "integer",
                "description": "ID of the project",
            },
            "instance_id": {
                "type": "integer",
                "description": "ID of the workflow instance",
            },
        }
    )

    async def run(self, project_id: int, instance_id: int) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_task_instances()``.

        Args:
            project_id: ID of the project.
            instance_id: ID of the workflow instance.

        A new client is created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_task_instances(project_id, instance_id)
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
# System Status Tools
class GetSystemStatus(Tool):
    """Query the DolphinScheduler client for the system status."""

    name = "get-system-status"
    description = "Get overall system status including master and worker nodes."
    input_schema = schema({})

    async def run(self, **kwargs) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_system_status()``.

        Keyword arguments are accepted but not used. A new client is
        created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_system_status()
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
# Resource Management Tools
class GetResources(Tool):
    """Query the DolphinScheduler client for the resource list."""

    name = "get-resources"
    description = "Get list of resources (files and directories)."
    input_schema = schema({})

    async def run(self, **kwargs) -> Dict[str, Any]:
        """Return the result of ``DolphinSchedulerClient.get_resources()``.

        Keyword arguments are accepted but not used. A new client is
        created for the call and closed afterwards.
        """
        api = DolphinSchedulerClient()
        try:
            return await api.get_resources()
        finally:
            # Runs on success and on error, so the client is always closed.
            await api.close()
# List of all available tools
# One instance of every tool class, in the order the classes are listed below.
TOOLS = [
    tool_class()
    for tool_class in (
        # Connection Management
        GetConnectionSettings,
        UpdateConnectionSettings,
        # Project Management
        GetProjects,
        GetProject,
        # Workflow Management
        GetWorkflows,
        GetWorkflow,
        # Workflow Instance Management
        GetWorkflowInstances,
        GetWorkflowInstance,
        StartWorkflow,
        StopWorkflowInstance,
        # Task Instance Management
        GetTaskInstances,
        # System Status
        GetSystemStatus,
        # Resource Management
        GetResources,
    )
]