# Copyright (C) 2025 AIDC-AI
# This project is licensed under the MIT License (SPDX-License-identifier: MIT).
import json
import keyword
import re
import os
import tempfile
from urllib.parse import urlparse
from pydantic import Field
from pixelle.logger import logger
from pixelle.mcp_core import mcp
from pixelle.manager.workflow_manager import workflow_manager, CUSTOM_WORKFLOW_DIR
from pixelle.utils.file_util import download_files
from pixelle.utils.file_uploader import upload
from pixelle.utils.runninghub_util import handle_runninghub_workflow_save
@mcp.tool(name="save_workflow_tool")
async def save_workflow_tool(
workflow_source: str = Field(description="The workflow to save. Can be a URL for downloading a workflow file, or a RunningHub workflow_id (starts with numbers) for fetching from RunningHub"),
tool_name: str = Field(default="", description="The name for the MCP tool. Priority: 1) User-specified name (if provided), 2) For URL: use uploaded filename, 3) For RunningHub: must ask user to provide a specific tool name. Must be in English, without file extension, and follow pattern: start with letter/underscore, contain only letters/digits/underscores."),
):
"""
Add or update a workflow in MCP tools.
This tool should be invoked whenever the user wants to:
- add a new workflow / tool
- save a workflow / tool
- update or overwrite an existing workflow / tool
- store or register a workflow
- keep or preserve a workflow for later use
The workflow_source parameter supports two formats:
1. HTTP/HTTPS URL: Downloads the workflow file from the URL
2. RunningHub workflow_id: Fetches the workflow from RunningHub using the API
Common phrasings include:
"add this tool", "add this workflow", "save this workflow",
"update this tool", "overwrite the workflow", "store this workflow".
"""
def error(msg: str):
return json.dumps({ "success": False, "error": msg })
try:
# Valid format: starts with a letter or underscore, followed by letters, digits, or underscores
pattern = r'^[A-Za-z_][A-Za-z0-9_]*$'
if not re.match(pattern, tool_name):
return error(
"The tool_name format is invalid: only letters, digits, and underscores are allowed, and it must start with a letter or underscore."
)
if keyword.iskeyword(tool_name):
return error(
f"The tool_name cannot be a Python keyword: '{tool_name}'."
)
# Determine if workflow_source is URL or RunningHub workflow_id
if _is_url(workflow_source):
# Handle URL - use existing download logic
logger.info(f"Processing workflow from URL: {workflow_source}")
async with download_files(workflow_source) as temp_workflow_path:
return workflow_manager.load_workflow(temp_workflow_path, tool_name=tool_name)
else:
# Handle RunningHub workflow_id
logger.info(f"Processing workflow from RunningHub workflow_id: {workflow_source}")
result = await handle_runninghub_workflow_save(workflow_source, tool_name)
if not result["success"]:
return error(result["error"])
# Load the workflow using the created file
return workflow_manager.load_workflow(result["workflow_file_path"], tool_name=tool_name)
except Exception as e:
logger.error(f"Failed to save workflow: {e}", exc_info=True)
return error(f"Failed to save workflow: {str(e)}")
def _is_url(source: str) -> bool:
"""Check if the source is a valid URL"""
try:
parsed = urlparse(source)
return parsed.scheme in ['http', 'https'] and parsed.netloc
except Exception:
return False
@mcp.tool(name="reload_workflows_tool")
async def reload_workflows_tool():
"""
Reload all MCP tools that were generated by workflows.
"""
return workflow_manager.reload_all_workflows()
@mcp.tool(name="list_workflows_tool")
async def list_workflows_tool():
"""
List all MCP tools that were generated by workflows.
Call this tool when the user wants to list all MCP tools, such as:
- Query all tools
- List all MCP tools
- How many tools are there
"""
workflow_names = list(workflow_manager.loaded_workflows.keys())
workflow_names.sort()
return workflow_names
@mcp.tool(name="get_workflow_tool_detail")
async def get_workflow_tool_detail(
workflow_name: str = Field(description="The name of the workflow to get details for"),
):
"""
Get detailed information about a specific workflow tool, including the workflow file URL.
This tool will:
- Return the workflow metadata and configuration
- Upload the workflow file and return its URL
- Provide comprehensive information about the workflow tool
"""
def error(msg: str):
return json.dumps({"success": False, "error": msg})
try:
# Check if workflow exists
if workflow_name not in workflow_manager.loaded_workflows:
return error(f"Workflow '{workflow_name}' not found or not loaded")
# Get workflow info
workflow_info = workflow_manager.loaded_workflows[workflow_name]
# Get workflow file path
workflow_file_path = os.path.join(CUSTOM_WORKFLOW_DIR, f"{workflow_name}.json")
# Check if workflow file exists
if not os.path.exists(workflow_file_path):
return error(f"Workflow file not found: {workflow_file_path}")
# Upload workflow file and get URL
try:
workflow_file_url = upload(workflow_file_path, f"{workflow_name}.json")
except Exception as e:
logger.error(f"Failed to upload workflow file: {e}")
return error(f"Failed to upload workflow file: {str(e)}")
# Prepare detailed response
result = {
"success": True,
"workflow_name": workflow_name,
"workflow_file_url": workflow_file_url,
"metadata": workflow_info["metadata"],
"loaded_at": workflow_info["loaded_at"].strftime("%Y-%m-%d %H:%M:%S") if hasattr(workflow_info["loaded_at"], 'strftime') else str(workflow_info["loaded_at"]),
}
logger.info(f"Successfully retrieved workflow details for: {workflow_name}")
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Failed to get workflow details for {workflow_name}: {e}")
return error(f"Failed to get workflow details: {str(e)}")
@mcp.tool(name="remove_workflow_tool")
async def remove_workflow_tool(
workflow_name: str = Field(description="The name of the workflow to remove"),
):
"""
Remove an MCP tool that was generated by a workflow.
"""
return workflow_manager.unload_workflow(workflow_name)