Skip to main content
Glama

DolphinScheduler MCP Server

by ocean-zhc
project_tools.py14.6 kB
"""Tools for project operations in DolphinScheduler.""" from typing import Dict, List, Optional from ..fastmcp_compat import FastMCPTool from ..client import DolphinSchedulerClient class GetProjects(FastMCPTool): """Tool to 通过项目id查询项目信息""" name = "get_projects" description = "通过项目ID查询项目信息" is_async = True schema = { "type": "object", "properties": { "code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" } }, "required": [ "code" ] } async def _run(self, code) -> Dict: """Execute the GET operation on /projects/{code}.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/projects/{code}" ) return {"success": True, "data": response} finally: await client.close() class UpdateProjects(FastMCPTool): """Tool to 更新项目""" name = "update_projects" description = "更新项目" is_async = True schema = { "type": "object", "properties": { "code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" }, "projectName": { "type": "string", "description": "\u9879\u76ee\u540d\u79f0" }, "description": { "type": "string", "description": "\u9879\u76ee\u63cf\u8ff0" } }, "required": [ "code", "projectName" ] } async def _run(self, code, projectName, description) -> Dict: """Execute the PUT operation on /projects/{code}.""" client = DolphinSchedulerClient() try: params = { "projectName": projectName, "description": description, } response = await client.request( "PUT", f"/projects/{code}", params=params ) return {"success": True, "data": response} finally: await client.close() class DeleteProjects(FastMCPTool): """Tool to 通过id删除项目""" name = "delete_projects" description = "通过ID删除项目" is_async = True schema = { "type": "object", "properties": { "code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" } }, "required": [ "code" ] } async def _run(self, code) -> Dict: """Execute the DELETE operation on /projects/{code}.""" client = DolphinSchedulerClient() try: response = await client.request( "DELETE", f"/projects/{code}" ) return {"success": True, "data": response} finally: await client.close() class GetProjects36(FastMCPTool): """Tool to 分页查询项目列表""" name = "get_projects36" description = "分页查询项目列表" is_async = True schema = { "type": "object", "properties": { "searchVal": { "type": "string", "description": "\u641c\u7d22\u503c" }, "pageSize": { "type": "integer", "format": "int32", "description": "\u9875\u5927\u5c0f" }, "pageNo": { "type": "integer", "format": "int32", "description": "\u9875\u7801\u53f7" } }, "required": [ "pageSize", "pageNo" ] } async def _run(self, searchVal, pageSize, pageNo) -> Dict: """Execute the GET operation on /projects.""" client = DolphinSchedulerClient() try: params = { "searchVal": searchVal, "pageSize": pageSize, "pageNo": pageNo, } response = await client.request( "GET", f"/projects", params=params ) return {"success": True, "data": response} finally: await client.close() class CreateProjects(FastMCPTool): """Tool to 创建项目""" name = "create_projects" description = "创建项目" is_async = True schema = { "type": "object", "properties": { "projectName": { "type": "string", "description": "\u9879\u76ee\u540d\u79f0" }, "description": { "type": "string", "description": "\u9879\u76ee\u63cf\u8ff0" } }, "required": [ "projectName" ] } async def _run(self, projectName, description) -> Dict: """Execute the POST operation on /projects.""" client = DolphinSchedulerClient() try: params = { "projectName": projectName, "description": description, } response = await client.request( "POST", f"/projects", params=params ) return {"success": True, "data": response} finally: await client.close() class GetProjectsUnauthProject(FastMCPTool): """Tool to 查询未授权的项目""" name = "get_projects_unauth_project" description = "查询未授权的项目" is_async = True schema = { "type": "object", "properties": { "userId": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "userId" ] } async def _run(self, userId) -> Dict: """Execute the GET operation on /projects/unauth-project.""" client = DolphinSchedulerClient() try: params = { "userId": userId, } response = await client.request( "GET", f"/projects/unauth-project", params=params ) return {"success": True, "data": response} finally: await client.close() class GetProjectsProjectWithAuthorizedLevel(FastMCPTool): """Tool to query_project_authorized_level""" name = "get_projects_project_with_authorized_level" description = "QUERY_PROJECT_AUTHORIZED_LEVEL" is_async = True schema = { "type": "object", "properties": { "user_id": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "user_id" ] } async def _run(self, user_id) -> Dict: """Execute the GET operation on /projects/project-with-authorized-level.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, } response = await client.request( "GET", f"/projects/project-with-authorized-level", params=params ) return {"success": True, "data": response} finally: await client.close() class GetProjectsProjectWithAuthorizedLevelListPaging(FastMCPTool): """Tool to query_project_with_auth_level_list_paging_notes""" name = "get_projects_project_with_authorized_level_list_paging" description = "QUERY_PROJECT_WITH_AUTH_LEVEL_LIST_PAGING_NOTES" is_async = True schema = { "type": "object", "properties": { "user_id": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" }, "search_val": { "type": "string", "description": "\u641c\u7d22\u503c" }, "page_size": { "type": "integer", "format": "int32", "description": "\u9875\u5927\u5c0f" }, "page_no": { "type": "integer", "format": "int32", "description": "\u9875\u7801\u53f7" } }, "required": [ "user_id", "page_size", "page_no" ] } async def _run(self, user_id, search_val, page_size, page_no) -> Dict: """Execute the GET operation on /projects/project-with-authorized-level-list-paging.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, "searchVal": search_val, "pageSize": page_size, "pageNo": page_no, } response = await client.request( "GET", f"/projects/project-with-authorized-level-list-paging", params=params ) return {"success": True, "data": response} finally: await client.close() class ListProjectsList(FastMCPTool): """Tool to 查询所有项目""" name = "list_projects_list" description = "查询所有项目" is_async = True async def _run(self) -> Dict: """Execute the GET operation on /projects/list.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/projects/list" ) return {"success": True, "data": response} finally: await client.close() class GetProjectsListDependent(FastMCPTool): """Tool to 查询dependent节点所有项目""" name = "get_projects_list_dependent" description = "查询Dependent节点所有项目" is_async = True async def _run(self) -> Dict: """Execute the GET operation on /projects/list-dependent.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/projects/list-dependent" ) return {"success": True, "data": response} finally: await client.close() class GetProjectsCreatedAndAuthed(FastMCPTool): """Tool to 查询授权和用户创建的项目""" name = "get_projects_created_and_authed" description = "查询授权和用户创建的项目" is_async = True async def _run(self) -> Dict: """Execute the GET operation on /projects/created-and-authed.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/projects/created-and-authed" ) return {"success": True, "data": response} finally: await client.close() class GetProjectsAuthedUser(FastMCPTool): """Tool to 查询拥有项目授权的用户""" name = "get_projects_authed_user" description = "查询拥有项目授权的用户" is_async = True schema = { "type": "object", "properties": { "project_code": { "type": "integer", "format": "int64", "description": "\u9879\u76eeCode" } }, "required": [ "project_code" ] } async def _run(self, project_code) -> Dict: """Execute the GET operation on /projects/authed-user.""" client = DolphinSchedulerClient() try: params = { "projectCode": project_code, } response = await client.request( "GET", f"/projects/authed-user", params=params ) return {"success": True, "data": response} finally: await client.close() class GetProjectsAuthedProject(FastMCPTool): """Tool to 查询授权项目""" name = "get_projects_authed_project" description = "查询授权项目" is_async = True schema = { "type": "object", "properties": { "user_id": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "user_id" ] } async def _run(self, user_id) -> Dict: """Execute the GET operation on /projects/authed-project.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, } response = await client.request( "GET", f"/projects/authed-project", params=params ) return {"success": True, "data": response} finally: await client.close() def register_project_tools(mcp): """Register project tools with FastMCP. Args: mcp: The FastMCP instance to register tools with. """ from ..fastmcp_compat import register_tool_class register_tool_class(mcp, CreateProjects) register_tool_class(mcp, DeleteProjects) register_tool_class(mcp, GetProjects) register_tool_class(mcp, GetProjects36) register_tool_class(mcp, GetProjectsAuthedProject) register_tool_class(mcp, GetProjectsAuthedUser) register_tool_class(mcp, GetProjectsCreatedAndAuthed) register_tool_class(mcp, GetProjectsListDependent) register_tool_class(mcp, GetProjectsProjectWithAuthorizedLevel) register_tool_class(mcp, GetProjectsProjectWithAuthorizedLevelListPaging) register_tool_class(mcp, GetProjectsUnauthProject) register_tool_class(mcp, ListProjectsList) register_tool_class(mcp, UpdateProjects)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ocean-zhc/dolphinscheduler-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server