Skip to main content
Glama
AIDC-AI

pixelle-mcp-Image-generation

by AIDC-AI
workflow_parser.py — 10.1 kB
# Copyright (C) 2025 AIDC-AI
# This project is licensed under the MIT License (SPDX-License-identifier: MIT).
import json
import re
from pathlib import Path
from typing import Dict, Any, Optional, List

from pydantic import BaseModel, Field

from pixelle.logger import logger


class WorkflowParam(BaseModel):
    """A single user-facing workflow parameter extracted from a node's DSL title."""
    name: str
    type: str = Field(default="str")  # inferred from the node's default value
    description: Optional[str] = None
    required: bool = True
    default: Optional[Any] = None
    handler_type: Optional[str] = None  # e.g. "upload_rel" for URL-upload fields


class WorkflowParamMapping(BaseModel):
    """Parameter mapping information"""
    param_name: str
    node_id: str
    input_field: str
    node_class_type: str
    handler_type: Optional[str] = None  # New: handler type


class WorkflowOutputMapping(BaseModel):
    """Output mapping information"""
    node_id: str
    output_var: str


class WorkflowMappingInfo(BaseModel):
    """Workflow mapping information"""
    param_mappings: List[WorkflowParamMapping]
    output_mappings: List[WorkflowOutputMapping]


class WorkflowMetadata(BaseModel):
    """Parsed metadata for a whole workflow file."""
    title: str
    description: Optional[str] = None
    params: Dict[str, WorkflowParam]
    mapping_info: WorkflowMappingInfo
    workflow_id: Optional[str] = None  # RunningHub workflow ID
    is_runninghub: bool = False  # Whether this is a RunningHub workflow


class WorkflowParser:
    """Workflow parser

    Scans a ComfyUI-style workflow (a dict of node-id -> node data) and
    extracts:
      * user-facing parameters, declared via a DSL in each node's
        ``_meta.title`` (``$<name>.[~]<field>[!][:<description>]``),
      * output nodes, declared either via a ``$output.<name>`` title marker
        or by belonging to a known save-node class type.
    """

    # Compiled once: $<name>.[~]<field>[!][:<description>]
    _DSL_PATTERN = re.compile(r'^\$(\w+)\.(?:(~)?(\w+))(!)?(?::(.+))?$')

    # Node class types treated as outputs even without an explicit
    # `$output.` marker in their title.
    _KNOWN_OUTPUT_TYPES = frozenset({
        'SaveImage', 'SaveVideo', 'SaveAudio', 'VHS_SaveVideo', 'VHS_SaveAudio',
    })

    def __init__(self):
        pass

    def parse_dsl_title(self, title: str) -> Optional[Dict[str, Any]]:
        """Parse DSL title

        Syntax: $<name>.[~]<field>[!][:<description>]
        where ~ means need URL upload processing, return relative path

        Returns a dict with keys ``name``, ``field``, ``required``,
        ``description`` and ``handler_type``, or None if the title does
        not match the DSL.
        """
        match = self._DSL_PATTERN.match(title.strip())
        if not match:
            return None

        name, handler_mark, field, required_mark, description = match.groups()

        # `~` before the field name marks it for URL-upload handling
        # (the uploaded file's relative path is substituted at runtime).
        handler_type = "upload_rel" if handler_mark else None

        return {
            'name': name,
            'field': field,
            'required': bool(required_mark),
            'description': description.strip() if description else None,
            'handler_type': handler_type,
        }

    def infer_type_from_value(self, value: Any) -> str:
        """Infer a parameter type name from a concrete value.

        bool is checked before int because bool is a subclass of int.
        Anything that is not bool/int/float falls back to "str".
        """
        if isinstance(value, bool):
            return "bool"
        elif isinstance(value, int):
            return "int"
        elif isinstance(value, float):
            return "float"
        else:
            return "str"

    def extract_field_value(self, node_data: Dict[str, Any], field_name: str) -> Any:
        """Extract value from the specified field of the node

        Returns None when the field is absent or is a node connection
        (connections are encoded as lists in ComfyUI workflows).
        """
        inputs = node_data.get("inputs", {})
        # Check if the specified field exists and is not a node connection
        if field_name in inputs and not isinstance(inputs[field_name], list):
            return inputs[field_name]
        return None

    def parse_output_marker(self, title: str) -> Optional[str]:
        """Parse output marker

        Format: $output.name

        Returns the output variable name, or None when the title is not
        an output marker (or the name part is empty).
        """
        if not title.startswith('$output.'):
            return None
        output_var = title.removeprefix('$output.')
        return output_var or None

    def is_known_output_node(self, class_type: str) -> bool:
        """Check if it is a known output node type"""
        return class_type in self._KNOWN_OUTPUT_TYPES

    def parse_node(self, node_id: str, node_data: Dict[str, Any]) -> tuple[Optional[WorkflowParam], Optional[WorkflowParamMapping], Optional[WorkflowOutputMapping]]:
        """Parse parameters or outputs from the node

        Returns: (param, param_mapping, output_mapping) — at most one of
        the param pair or the output mapping is non-None.

        Raises:
            ValueError: when an optional parameter has no default value.
        """
        # Check if the node is valid
        if not isinstance(node_data, dict) or "_meta" not in node_data:
            return None, None, None

        title = node_data["_meta"].get("title", "")
        class_type = node_data.get("class_type", "")

        # 1. Check if it is an output marker
        output_var = self.parse_output_marker(title)
        if output_var:
            output_mapping = WorkflowOutputMapping(
                node_id=node_id,
                output_var=output_var
            )
            return None, None, output_mapping

        # 2. Check if it is a known output node
        if self.is_known_output_node(class_type):
            # Use node_id as default output variable name
            output_mapping = WorkflowOutputMapping(
                node_id=node_id,
                output_var=node_id
            )
            return None, None, output_mapping

        # 3. Parse DSL parameter title
        dsl_info = self.parse_dsl_title(title)
        if not dsl_info:
            return None, None, None

        # 4. Get field name from DSL, extract default value and infer type
        input_field = dsl_info['field']
        default_value = self.extract_field_value(node_data, input_field)
        param_type = self.infer_type_from_value(default_value) if default_value is not None else "str"

        # 5. Verify required logic: an optional parameter must carry a
        # default value, otherwise the tool could not be invoked without it.
        is_required = dsl_info['required']
        if not is_required and default_value is None:
            message = f"Parameter `{dsl_info['name']}` has no default value but not marked as required"
            logger.error(message)
            raise ValueError(message)

        # 6. Create parameter object (required params expose no default)
        param = WorkflowParam(
            name=dsl_info['name'],
            type=param_type,
            description=dsl_info['description'],
            required=is_required,
            default=None if is_required else default_value,
            handler_type=dsl_info['handler_type']
        )

        # 7. Create parameter mapping
        param_mapping = WorkflowParamMapping(
            param_name=dsl_info['name'],
            node_id=node_id,
            input_field=input_field,
            node_class_type=class_type,
            handler_type=dsl_info['handler_type']
        )

        return param, param_mapping, None

    def find_mcp_node(self, workflow_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Find MCP node (optional)

        An MCP node is any node whose ``_meta.title`` is exactly "MCP".
        Returns None when absent, and also None (with an error log) when
        more than one is present.
        """
        mcp_nodes = [
            node_data for node_data in workflow_data.values()
            if isinstance(node_data, dict)
            and node_data.get("_meta", {}).get("title") == "MCP"
        ]
        if not mcp_nodes:
            # MCP node is optional, no error if not found
            return None
        if len(mcp_nodes) > 1:
            logger.error(f"Workflow found multiple MCP nodes ({len(mcp_nodes)} nodes), only one MCP node is allowed")
            return None
        return mcp_nodes[0]

    def parse_mcp_node_config(self, mcp_node: Dict[str, Any]) -> Optional[str]:
        """Parse description information in MCP node

        The description text is taken from the first of the node's
        "value"/"text"/"string" inputs (case-insensitive field match).
        """
        try:
            inputs = mcp_node.get("inputs", {})

            # Find configuration fields
            possible_fields = ["value", "text", "string"]
            inputs_lower = {k.lower(): v for k, v in inputs.items()}

            description_content = None
            for field in possible_fields:
                if field in inputs_lower:
                    description_content = inputs_lower[field]
                    break

            if description_content is None:
                logger.error(f"MCP node did not find valid configuration fields, tried fields: {possible_fields}")
                return None

            # Directly return text content as description
            return description_content.strip() if isinstance(description_content, str) else str(description_content).strip()
        except Exception as e:
            logger.error(f"Parse MCP node configuration failed: {e}", exc_info=True)
            return None

    def parse_workflow(self, workflow_data: Dict[str, Any], title: str) -> Optional[WorkflowMetadata]:
        """Parse complete workflow

        Walks every node once, collecting parameters, parameter mappings
        and output mappings, then assembles a WorkflowMetadata.
        """
        # 1. Find and parse MCP node (optional)
        mcp_node = self.find_mcp_node(workflow_data)
        description = None
        if mcp_node:
            description = self.parse_mcp_node_config(mcp_node)
        # If there is no MCP node, description remains None, continue parsing workflow

        # 2. Scan all nodes, collect parameter and mapping information
        params = {}
        param_mappings = []
        output_mappings = []

        for node_id, node_data in workflow_data.items():
            param, param_mapping, output_mapping = self.parse_node(node_id, node_data)
            if param and param_mapping:
                params[param.name] = param
                param_mappings.append(param_mapping)
            if output_mapping:
                output_mappings.append(output_mapping)

        # 3. Build mapping_info
        mapping_info = WorkflowMappingInfo(
            param_mappings=param_mappings,
            output_mappings=output_mappings
        )

        # 4. Build metadata
        metadata = WorkflowMetadata(
            title=title,
            description=description,
            params=params,
            mapping_info=mapping_info
        )

        return metadata

    def parse_workflow_file(self, file_path: str, tool_name: Optional[str] = None) -> Optional[WorkflowMetadata]:
        """Parse workflow file

        Loads the JSON workflow at *file_path* and parses it; the tool
        title defaults to the file's stem when *tool_name* is not given.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            workflow_data = json.load(f)

        # Extract title from file name (remove suffix)
        title = tool_name or Path(file_path).stem

        return self.parse_workflow(workflow_data, title)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AIDC-AI/Pixelle-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.