Skip to main content
Glama

DolphinScheduler MCP Server

by ocean-zhc
datasource_tools.py17.1 kB
"""Tools for data source operations in DolphinScheduler.""" from typing import Dict, List, Optional from ..fastmcp_compat import FastMCPTool from ..client import DolphinSchedulerClient class GetDatasources(FastMCPTool): """Tool to 查询数据源通过id""" name = "get_datasources" description = "查询数据源通过ID" is_async = True schema = { "type": "object", "properties": { "id": { "type": "integer", "format": "int32", "description": "\u6570\u636e\u6e90ID" } }, "required": [ "id" ] } async def _run(self, id) -> Dict: """Execute the GET operation on /datasources/{id}.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/datasources/{id}" ) return {"success": True, "data": response} finally: await client.close() class UpdateDatasources(FastMCPTool): """Tool to 更新数据源""" name = "update_datasources" description = "更新数据源" is_async = True schema = { "type": "object", "properties": { "id": { "type": "integer", "format": "int32", "description": "\u6570\u636e\u6e90ID" }, "data_source_param": { "type": "object", "description": "\u6570\u636e\u6e90\u53c2\u6570" } }, "required": [ "id", "data_source_param" ] } async def _run(self, id, data_source_param) -> Dict: """Execute the PUT operation on /datasources/{id}.""" client = DolphinSchedulerClient() try: response = await client.request( "PUT", f"/datasources/{id}" ) return {"success": True, "data": response} finally: await client.close() class DeleteDatasources(FastMCPTool): """Tool to 删除数据源""" name = "delete_datasources" description = "删除数据源" is_async = True schema = { "type": "object", "properties": { "id": { "type": "integer", "format": "int32", "description": "\u6570\u636e\u6e90ID" } }, "required": [ "id" ] } async def _run(self, id) -> Dict: """Execute the DELETE operation on /datasources/{id}.""" client = DolphinSchedulerClient() try: response = await client.request( "DELETE", f"/datasources/{id}" ) return {"success": True, "data": response} finally: await client.close() class GetDatasources2(FastMCPTool): """Tool to 分页查询数据源列表""" name = "get_datasources2" description = "分页查询数据源列表" is_async = True schema = { "type": "object", "properties": { "search_val": { "type": "string", "description": "\u641c\u7d22\u503c" }, "page_no": { "type": "integer", "format": "int32", "description": "\u9875\u7801\u53f7" }, "page_size": { "type": "integer", "format": "int32", "description": "\u9875\u5927\u5c0f" } }, "required": [ "page_no", "page_size" ] } async def _run(self, search_val, page_no, page_size) -> Dict: """Execute the GET operation on /datasources.""" client = DolphinSchedulerClient() try: params = { "searchVal": search_val, "pageNo": page_no, "pageSize": page_size, } response = await client.request( "GET", f"/datasources", params=params ) return {"success": True, "data": response} finally: await client.close() class CreateDatasources(FastMCPTool): """Tool to 创建数据源""" name = "create_datasources" description = "创建数据源" is_async = True async def _run(self) -> Dict: """Execute the POST operation on /datasources.""" client = DolphinSchedulerClient() try: response = await client.request( "POST", f"/datasources" ) return {"success": True, "data": response} finally: await client.close() class CreateDatasourcesConnect(FastMCPTool): """Tool to 连接数据源""" name = "create_datasources_connect" description = "连接数据源" is_async = True async def _run(self) -> Dict: """Execute the POST operation on /datasources/connect.""" client = DolphinSchedulerClient() try: response = await client.request( "POST", f"/datasources/connect" ) return {"success": True, "data": response} finally: await client.close() class GetDatasourcesConnectTest(FastMCPTool): """Tool to 连接数据源测试""" name = "get_datasources_connect_test" description = "连接数据源测试" is_async = True schema = { "type": "object", "properties": { "id": { "type": "integer", "format": "int32", "description": "\u6570\u636e\u6e90ID" } }, "required": [ "id" ] } async def _run(self, id) -> Dict: """Execute the GET operation on /datasources/{id}/connect-test.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/datasources/{id}/connect-test" ) return {"success": True, "data": response} finally: await client.close() class GetDatasourcesVerifyName(FastMCPTool): """Tool to 验证数据源""" name = "get_datasources_verify_name" description = "验证数据源" is_async = True schema = { "type": "object", "properties": { "name": { "type": "string", "description": "\u6570\u636e\u6e90\u540d\u79f0" } }, "required": [ "name" ] } async def _run(self, name) -> Dict: """Execute the GET operation on /datasources/verify-name.""" client = DolphinSchedulerClient() try: params = { "name": name, } response = await client.request( "GET", f"/datasources/verify-name", params=params ) return {"success": True, "data": response} finally: await client.close() class GetDatasourcesUnauthDatasource(FastMCPTool): """Tool to 未授权的数据源""" name = "get_datasources_unauth_datasource" description = "未授权的数据源" is_async = True schema = { "type": "object", "properties": { "user_id": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "user_id" ] } async def _run(self, user_id) -> Dict: """Execute the GET operation on /datasources/unauth-datasource.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, } response = await client.request( "GET", f"/datasources/unauth-datasource", params=params ) return {"success": True, "data": response} finally: await client.close() class ListDatasourcesTables(FastMCPTool): """Tool to 获取数据源表列表""" name = "list_datasources_tables" description = "获取数据源表列表" is_async = True schema = { "type": "object", "properties": { "datasource_id": { "type": "integer", "format": "int32", "description": "\u6570\u636e\u6e90ID" }, "database": { "type": "string", "description": "DATABASE" } }, "required": [ "datasource_id", "database" ] } async def _run(self, datasource_id, database) -> Dict: """Execute the GET operation on /datasources/tables.""" client = DolphinSchedulerClient() try: params = { "datasourceId": datasource_id, "database": database, } response = await client.request( "GET", f"/datasources/tables", params=params ) return {"success": True, "data": response} finally: await client.close() class ListDatasourcesTablecolumns(FastMCPTool): """Tool to 获取数据源表列名""" name = "list_datasources_tablecolumns" description = "获取数据源表列名" is_async = True schema = { "type": "object", "properties": { "datasource_id": { "type": "integer", "format": "int32", "description": "\u6570\u636e\u6e90ID" }, "table_name": { "type": "string", "description": "\u8868\u540d" }, "database": { "type": "string", "description": "DATABASE" } }, "required": [ "datasource_id", "table_name", "database" ] } async def _run(self, datasource_id, table_name, database) -> Dict: """Execute the GET operation on /datasources/tableColumns.""" client = DolphinSchedulerClient() try: params = { "datasourceId": datasource_id, "tableName": table_name, "database": database, } response = await client.request( "GET", f"/datasources/tableColumns", params=params ) return {"success": True, "data": response} finally: await client.close() class ListDatasourcesList(FastMCPTool): """Tool to 通过数据源类型查询数据源列表""" name = "list_datasources_list" description = "通过数据源类型查询数据源列表" is_async = True schema = { "type": "object", "properties": { "type": { "type": "string", "enum": [ "MYSQL", "POSTGRESQL", "HIVE", "SPARK", "CLICKHOUSE", "ORACLE", "SQLSERVER", "DB2", "PRESTO", "H2", "REDSHIFT", "ATHENA", "TRINO", "STARROCKS", "AZURESQL", "DAMENG", "OCEANBASE", "SSH", "KYUUBI", "DATABEND", "SNOWFLAKE", "VERTICA", "HANA", "DORIS", "ZEPPELIN", "SAGEMAKER" ], "description": "\u6570\u636e\u6e90\u7c7b\u578b" } }, "required": [ "type" ] } async def _run(self, type) -> Dict: """Execute the GET operation on /datasources/list.""" client = DolphinSchedulerClient() try: params = { "type": type, } response = await client.request( "GET", f"/datasources/list", params=params ) return {"success": True, "data": response} finally: await client.close() class GetDatasourcesKerberosStartupState(FastMCPTool): """Tool to 获取用户信息""" name = "get_datasources_kerberos_startup_state" description = "获取用户信息" is_async = True async def _run(self) -> Dict: """Execute the GET operation on /datasources/kerberos-startup-state.""" client = DolphinSchedulerClient() try: response = await client.request( "GET", f"/datasources/kerberos-startup-state" ) return {"success": True, "data": response} finally: await client.close() class ListDatasourcesDatabases(FastMCPTool): """Tool to get_datasource_database_notes""" name = "list_datasources_databases" description = "GET_DATASOURCE_DATABASE_NOTES" is_async = True schema = { "type": "object", "properties": { "datasource_id": { "type": "integer", "format": "int32", "description": "\u6570\u636e\u6e90ID" } }, "required": [ "datasource_id" ] } async def _run(self, datasource_id) -> Dict: """Execute the GET operation on /datasources/databases.""" client = DolphinSchedulerClient() try: params = { "datasourceId": datasource_id, } response = await client.request( "GET", f"/datasources/databases", params=params ) return {"success": True, "data": response} finally: await client.close() class GetDatasourcesAuthedDatasource(FastMCPTool): """Tool to 授权的数据源""" name = "get_datasources_authed_datasource" description = "授权的数据源" is_async = True schema = { "type": "object", "properties": { "user_id": { "type": "integer", "format": "int32", "description": "\u7528\u6237ID" } }, "required": [ "user_id" ] } async def _run(self, user_id) -> Dict: """Execute the GET operation on /datasources/authed-datasource.""" client = DolphinSchedulerClient() try: params = { "userId": user_id, } response = await client.request( "GET", f"/datasources/authed-datasource", params=params ) return {"success": True, "data": response} finally: await client.close() def register_datasource_tools(mcp): """Register datasource tools with FastMCP. Args: mcp: The FastMCP instance to register tools with. """ from ..fastmcp_compat import register_tool_class register_tool_class(mcp, CreateDatasources) register_tool_class(mcp, CreateDatasourcesConnect) register_tool_class(mcp, DeleteDatasources) register_tool_class(mcp, GetDatasources) register_tool_class(mcp, GetDatasources2) register_tool_class(mcp, GetDatasourcesAuthedDatasource) register_tool_class(mcp, GetDatasourcesConnectTest) register_tool_class(mcp, GetDatasourcesKerberosStartupState) register_tool_class(mcp, GetDatasourcesUnauthDatasource) register_tool_class(mcp, GetDatasourcesVerifyName) register_tool_class(mcp, ListDatasourcesDatabases) register_tool_class(mcp, ListDatasourcesList) register_tool_class(mcp, ListDatasourcesTablecolumns) register_tool_class(mcp, ListDatasourcesTables) register_tool_class(mcp, UpdateDatasources)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ocean-zhc/dolphinscheduler-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server