Skip to main content
Glama
AIDC-AI

pixelle-mcp-Image-generation

by AIDC-AI
workflow_source_util.py (4.82 kB)
# Copyright (C) 2025 AIDC-AI
# This project is licensed under the MIT License (SPDX-License-identifier: MIT).

"""
Workflow source utility functions - centralized logic for workflow source type detection and routing
"""

import json
import os
from pathlib import Path
from typing import Optional, Dict, Any

from pixelle.logger import logger


def _load_workflow_json(workflow_file: str | Path) -> Optional[Dict[str, Any]]:
    """Load a workflow file and return its top-level JSON object

    Args:
        workflow_file: Path to the workflow file

    Returns:
        Optional[Dict]: The parsed JSON object. None if the file does not
        exist or if the top-level JSON value is not an object.

    Raises:
        Exception: Any I/O or JSON decoding error is propagated; the public
        callers decide whether to swallow or log it.
    """
    if not os.path.exists(workflow_file):
        return None

    with open(workflow_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # A list or string at the top level cannot carry a "_source" field, and
    # `"_source" in data` would do a substring/element test on such values.
    if not isinstance(data, dict):
        return None
    return data


def get_workflow_source(workflow_file: str | Path) -> Optional[str]:
    """Get workflow source type

    Args:
        workflow_file: Path to the workflow file

    Returns:
        Optional[str]: Source type like 'runninghub', 'local', etc.
        None if cannot be identified or is a standard ComfyUI workflow
    """
    try:
        data = _load_workflow_json(workflow_file)
    except Exception:
        # Deliberately best-effort: an unreadable or malformed file is
        # reported as "no source" rather than raised.
        return None

    if data is None:
        return None
    return data.get("_source")


def is_external_workflow(workflow_file: str | Path) -> bool:
    """Check if the workflow is an external workflow (not local ComfyUI workflow)

    Args:
        workflow_file: Path to the workflow file

    Returns:
        bool: True if it's an external workflow that needs special handling
        (i.e. get_workflow_source() returned a value other than None)
    """
    return get_workflow_source(workflow_file) is not None


def has_workflow_source(workflow_file: str | Path) -> bool:
    """Check if the workflow file contains _source field

    Args:
        workflow_file: Path to the workflow file

    Returns:
        bool: True if the top-level JSON object contains a _source key
        (regardless of its value). False if the file is missing, unreadable,
        not valid JSON, or not a JSON object.
    """
    try:
        data = _load_workflow_json(workflow_file)
    except Exception:
        # Deliberately best-effort: treat any failure as "no _source field".
        return False

    return data is not None and "_source" in data


def get_workflow_source_data(workflow_file: str | Path) -> Optional[Dict[str, Any]]:
    """Get complete workflow source data

    Args:
        workflow_file: Path to the workflow file

    Returns:
        Optional[Dict]: Dictionary containing _source and other related data,
        None if the file is missing, has no _source field, or parsing failed
        (parsing failures are logged)
    """
    try:
        data = _load_workflow_json(workflow_file)
    except Exception as e:
        logger.error(f"Failed to parse workflow source data from {workflow_file}: {e}")
        return None

    # Only return data if it has _source field
    if data is None or "_source" not in data:
        return None
    return data


def validate_workflow_source_format(workflow_file: str | Path) -> bool:
    """Validate if the workflow source format is correct

    Args:
        workflow_file: Path to the workflow file

    Returns:
        bool: True if the file parses to a JSON object whose _source field
        is a non-blank string
    """
    # get_workflow_source_data() handles (and logs) every failure itself and
    # returns None, so no additional exception handling is needed here.
    data = get_workflow_source_data(workflow_file)
    if not data:
        return False

    # Must have a _source field that is a string with non-whitespace content
    source = data.get("_source")
    return isinstance(source, str) and bool(source.strip())


def create_workflow_source_file(source: str, source_data: Dict[str, Any], output_path: str) -> str:
    """Create a workflow file with source information

    Args:
        source: Source type, e.g. 'runninghub'
        source_data: Source-related data, e.g. {'workflow_id': '123456'}.
            Its keys are merged after "_source", so a "_source" key in
            source_data overrides the source argument.
        output_path: Output file path

    Returns:
        str: Path to the created file
    """
    # Create the workflow data with source information
    workflow_data = {
        "_source": source,
        **source_data
    }

    # Ensure output directory exists. A bare filename has an empty dirname,
    # and os.makedirs("") raises FileNotFoundError, so skip it in that case.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Save the workflow file
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(workflow_data, f, ensure_ascii=False, indent=2)

    logger.info(f"Created workflow source file: {output_path} (source: {source})")
    return output_path


def get_supported_sources() -> list[str]:
    """Get list of supported workflow source types

    Returns:
        list[str]: List of supported source types
    """
    return ["runninghub"]  # Will be extended as we add more sources


def is_supported_source(source: str) -> bool:
    """Check if the source type is supported

    Args:
        source: Source type to check

    Returns:
        bool: True if supported
    """
    return source in get_supported_sources()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AIDC-AI/Pixelle-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.