Skip to main content
Glama

DolphinScheduler MCP Server

by ocean-zhc
manage_resources.py4.35 kB
#!/usr/bin/env python3 """Example script to manage DolphinScheduler resources using the MCP client.""" import argparse import asyncio import json import os import sys from typing import Dict, Any # For local development, add the parent directory to the Python path sys.path.append(".") # Import the MCPClient from simple_client.py from examples.simple_client import MCPClient async def list_resources(client: MCPClient, args: argparse.Namespace) -> None: """List resources in DolphinScheduler.""" parameters: Dict[str, Any] = {} if args.pid is not None: parameters["pid"] = args.pid if args.resource_type is not None: parameters["resource_type"] = args.resource_type if args.search_val is not None: parameters["search_val"] = args.search_val response = await client.invoke_tool("get-resource-list", parameters) if "result" in response and "data" in response["result"]: resources = response["result"]["data"] print(f"Found {len(resources)} resources:") for resource in resources: print(f"- {resource.get('name', 'Unknown')} (ID: {resource.get('id', 'Unknown')})") print(f" Type: {resource.get('type', 'Unknown')}") if "description" in resource and resource["description"]: print(f" Description: {resource['description']}") print() else: print("Error or no resources found:") print(json.dumps(response, indent=2)) async def upload_resource(client: MCPClient, args: argparse.Namespace) -> None: """Upload a resource to DolphinScheduler.""" with open(args.file_path, "rb") as file: file_content = file.read() parameters = { "pid": args.pid, "name": os.path.basename(args.file_path), "description": args.description or "", "content": file_content.decode("utf-8") if args.is_text else "", "is_text": args.is_text, } response = await client.invoke_tool("create-resource", parameters) print(json.dumps(response, indent=2)) async def delete_resource(client: MCPClient, args: argparse.Namespace) -> None: """Delete a resource from DolphinScheduler.""" parameters = { "resource_id": args.resource_id } response = await client.invoke_tool("delete-resource", parameters) print(json.dumps(response, indent=2)) async def main(): """Manage DolphinScheduler resources using MCP.""" parser = argparse.ArgumentParser(description="Manage DolphinScheduler resources") subparsers = parser.add_subparsers(dest="command", help="Command to execute") # List resources command list_parser = subparsers.add_parser("list", help="List resources") list_parser.add_argument("--pid", type=int, help="Parent directory ID (0 for root)") list_parser.add_argument("--resource-type", type=str, choices=["FILE", "UDF"], help="Resource type") list_parser.add_argument("--search-val", type=str, help="Search value") # Upload resource command upload_parser = subparsers.add_parser("upload", help="Upload a resource") upload_parser.add_argument("--pid", type=int, required=True, help="Parent directory ID") upload_parser.add_argument("--file-path", type=str, required=True, help="Path to the file to upload") upload_parser.add_argument("--description", type=str, help="Resource description") upload_parser.add_argument("--is-text", action="store_true", help="Whether the file is a text file") # Delete resource command delete_parser = subparsers.add_parser("delete", help="Delete a resource") delete_parser.add_argument("--resource-id", type=int, required=True, help="Resource ID to delete") args = parser.parse_args() # Set up the MCP server URL mcp_url = os.environ.get("MCP_URL", "http://localhost:8089/mcp") print(f"Connecting to MCP server at {mcp_url}") async with MCPClient(mcp_url) as client: if args.command == "list": await list_resources(client, args) elif args.command == "upload": await upload_resource(client, args) elif args.command == "delete": await delete_resource(client, args) else: parser.print_help() if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ocean-zhc/dolphinscheduler-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server