Skip to main content
Glama
AIDC-AI

pixelle-mcp-Image-generation

by AIDC-AI
starters.py19.2 kB
# Copyright (C) 2025 AIDC-AI # This project is licensed under the MIT License (SPDX-License-identifier: MIT). from typing import Awaitable, Optional, Callable, Any, List, Dict, Tuple import chainlit as cl from pydantic import BaseModel import json from pathlib import Path import re import uuid import asyncio import random from pixelle.utils.os_util import get_data_path, get_src_path ReplyHandler = Callable[[cl.Message], Awaitable[None]] class StarterModel(BaseModel): label: str icon: Optional[str] = None image: Optional[str] = None reply_handler: Optional[ReplyHandler] = None messages: Optional[List[Dict[str, Any]]] = None enabled: bool = True order: int = 999 class Config: arbitrary_types_allowed = True @property def message(self) -> str: """Get the content of the first user message from messages""" if not self.messages: return "" # Find the first user message for item in self.messages: if (item.get("role") == "user" and item.get("type") == "message"): return item.get("content", "") return "" def to_cl_starter(self) -> cl.Starter: """Convert to Chainlit Starter object""" return cl.Starter( label=self.label, message=self.message, icon=self.icon, ) # Typing effect disabled - direct output mode # File operation related functions SYSTEM_STARTERS_DIR = Path(get_src_path("starters")) CUSTOM_STARTERS_DIR = Path(get_data_path("custom_starters")) def ensure_starters_dirs(): """Ensure that the system and user starters directories exist""" SYSTEM_STARTERS_DIR.mkdir(parents=True, exist_ok=True) CUSTOM_STARTERS_DIR.mkdir(parents=True, exist_ok=True) def get_system_starters_dir() -> Path: """Get the system preset starter directory""" return SYSTEM_STARTERS_DIR def get_custom_starters_dir() -> Path: """Get the user custom starter directory""" return CUSTOM_STARTERS_DIR def parse_filename(filename: str) -> Tuple[bool, int, str]: """ Parse the file name format: [_]xxx_label.json Returns: (enabled, order, label) """ # Remove the .json suffix name = filename.replace('.json', '') # Check if it starts with _ (disabled mark) enabled = True if name.startswith('_'): enabled = False name = name[1:] # Remove the _ in front # Match the format: xxx_label match = re.match(r'^(\d+)_(.+)$', name) if match: order = int(match.group(1)) label = match.group(2) return enabled, order, label else: # If it does not match the format, return the default value return enabled, 999, name def get_next_order_number() -> int: """Get the next order number in the user directory""" ensure_starters_dirs() max_order = 0 # Only consider the order number in the user custom directory for starter_file in CUSTOM_STARTERS_DIR.glob("*.json"): _, order, _ = parse_filename(starter_file.name) if order != 999: # Ignore the default value max_order = max(max_order, order) return max_order + 1 def load_custom_starter(starter_file: Path) -> Optional[StarterModel]: """Load a single custom starter from the file""" try: # Parse the file name enabled, order, expected_label = parse_filename(starter_file.name) with open(starter_file, 'r', encoding='utf-8') as f: data = json.load(f) return StarterModel( label=expected_label, icon=data.get("icon", "/public/tool.svg"), image=data.get("image", None), messages=data.get("messages", []), enabled=enabled, order=order, ) except Exception as e: print(f"Error loading starter from {starter_file}: {e}") return None def load_system_starters() -> List[StarterModel]: """Load the system preset starters""" ensure_starters_dirs() system_starters = [] for starter_file in SYSTEM_STARTERS_DIR.glob("*.json"): starter = load_custom_starter(starter_file) if starter and starter.enabled: # Only load the enabled starters system_starters.append(starter) # Sort by the order field system_starters.sort(key=lambda x: x.order) return system_starters def load_custom_starters() -> List[StarterModel]: """Load the user custom starters""" ensure_starters_dirs() custom_starters = [] for starter_file in CUSTOM_STARTERS_DIR.glob("*.json"): starter = load_custom_starter(starter_file) if starter and starter.enabled: # Only load the enabled starters custom_starters.append(starter) # Sort by the order field custom_starters.sort(key=lambda x: x.order) return custom_starters def get_all_starters() -> List[StarterModel]: """Get all enabled starters, system preset first, user custom later""" system_starters = load_system_starters() custom_starters = load_custom_starters() # System preset first, user custom later return system_starters + custom_starters def convert_message_to_dict(message: cl.Message) -> Dict[str, Any]: """Convert cl.Message to dictionary format""" message_dict = { "created_at": message.created_at, "role": "user" if message.type == "user_message" else "ai", "type": "message", "content": message.content, } # Process elements if message.elements: elements = [] for element in message.elements: if isinstance(element, cl.Image): elements.append({ "type": "image", "url": element.url, "size": getattr(element, 'size', 'small') }) elif isinstance(element, cl.Video): elements.append({ "type": "video", "url": element.url, "size": getattr(element, 'size', 'small') }) elif isinstance(element, cl.Audio): elements.append({ "type": "audio", "url": element.url, "size": getattr(element, 'size', 'small') }) if elements: message_dict["elements"] = elements return message_dict def convert_step_to_dict(step: cl.Step) -> Dict[str, Any]: """Convert cl.Step to dictionary format""" return { "created_at": step.created_at, "role": "ai", "type": "step", "name": step.name, "input": step.input if step.input else {}, "output": step.output if step.output else "" } async def save_conversation_as_starter(label: str, user_message: str) -> bool: """Save the current conversation as starter""" try: ensure_starters_dirs() # Get the conversation history chat_context = cl.chat_context.get() # Build the message array pure_messages = [] for item in chat_context: if not item.content and not item.elements: continue if item.type == "system_message": continue else: pure_messages.append(convert_message_to_dict(item)) # Get steps from user_session current_steps = cl.user_session.get("current_steps", []) step_messages = [] for step in current_steps: step_messages.append(convert_step_to_dict(step)) all_messages = pure_messages + step_messages all_messages.sort(key=lambda x: x.get("created_at", "")) # Add \u200B prefix to the first user message to identify starter if all_messages: first_user_msg = None for msg in all_messages: if msg.get("role") == "user" and msg.get("type") == "message": first_user_msg = msg break if first_user_msg and first_user_msg.get("content"): # Add zero width space prefix to identify this is a starter message starter_prefix = "\u200B" msg_content = first_user_msg["content"] if not msg_content.startswith(starter_prefix): first_user_msg["content"] = starter_prefix + msg_content # Create starter data starter_data = { "icon": "/public/tool.svg", "messages": all_messages } # Generate file name order = get_next_order_number() filename = f"{order:03d}_{label}.json" filepath = CUSTOM_STARTERS_DIR / filename # Save file with open(filepath, 'w', encoding='utf-8') as f: json.dump(starter_data, f, ensure_ascii=False, indent=2) return True except Exception as e: print(f"Error saving starter: {e}") return False def build_save_action(): return cl.Action( name="save_as_starter", payload={"value": "save_starter"}, icon="save" ) async def show_prompt_dialog(title: str, message: str, placeholder: str = ""): """Show custom prompt dialog, return (dialog_id, cancel_callback)""" dialog_id = str(uuid.uuid4()) # Create custom element prompt_element = cl.CustomElement( name="PromptDialog", props={ "open": True, "title": title, "message": message, "placeholder": placeholder, "dialogId": dialog_id } ) # Send dialog prompt_msg = cl.Message(content="", elements=[prompt_element]) await prompt_msg.send() # Store the dialog information in the user session cl.user_session.set(f"prompt_dialog_{dialog_id}", { "message": prompt_msg, "resolved": False, "result": None }) # Define cancel callback function async def cancel_callback(): """Clean up prompt dialog related resources""" try: await prompt_msg.remove() # cl.chat_context.remove(prompt_msg) cl.user_session.set(f"prompt_dialog_{dialog_id}", None) except Exception: pass # Ignore when cleanup fails return dialog_id, cancel_callback async def show_alert(alert_type: str, title: str, message: str): """Show alert dialog""" alert_element = cl.CustomElement( name="AlertDialog", props={ "open": True, "type": alert_type, "title": title, "message": message } ) # Send alert alert_msg = cl.Message(content="", elements=[alert_element]) await alert_msg.send() @cl.action_callback("prompt_confirmed") async def on_prompt_confirmed(action): """Handle user confirmation input""" dialog_id = action.payload.get("dialogId") value = action.payload.get("value", "").strip() if dialog_id: dialog_info = cl.user_session.get(f"prompt_dialog_{dialog_id}") if dialog_info: dialog_info["resolved"] = True dialog_info["result"] = value cl.user_session.set(f"prompt_dialog_{dialog_id}", dialog_info) @cl.action_callback("save_as_starter") async def on_save_starter(action): """Handle the action of saving as Starter""" try: # Get the conversation history chat_context = cl.chat_context.get() if not chat_context or len(chat_context) < 2: await show_alert("error", "Cannot save", "Need conversation content to save as Starter") return # Get the content of the first user message as display use first_user_message = None for msg in chat_context: if isinstance(msg, cl.Message) and msg.type == "user_message": first_user_message = msg.content break if not first_user_message: await show_alert("error", "Cannot save", "Cannot find user message") return # Show custom prompt dialog dialog_id, cancel_callback = await show_prompt_dialog( title="Save as Starter", message="Please input the label name of Starter (for display on the homepage)", placeholder="e.g. Image editing tutorial" ) # Wait for user input (wait indefinitely, no timeout) import asyncio while True: await asyncio.sleep(0.1) dialog_info = cl.user_session.get(f"prompt_dialog_{dialog_id}") if dialog_info and dialog_info.get("resolved"): label = dialog_info.get("result") break if not label: # User canceled, clean up prompt message await cancel_callback() return # Verify the label name import re if not re.match(r'^[\w\u4e00-\u9fff\-_\s]+$', label): # Invalid input, clean up prompt message await cancel_callback() await show_alert("warning", "Invalid input", "Label name can only contain English, Chinese, numbers, underscores, hyphens and spaces") return # Clean up prompt message after saving await cancel_callback() # Save starter (pass in label only, user_message parameter is no longer used) success = await save_conversation_as_starter(label, first_user_message) if success: await show_alert("success", "Save successfully", f"Successfully saved as Starter: {label}") # Remove action button await action.remove() else: await show_alert("error", "Save failed", "Please check file permissions or contact the administrator") except Exception as e: # Also clean up the state when an exception occurs if 'cancel_callback' in locals(): await cancel_callback() await show_alert("error", "Operation failed", str(e)) async def handle_messages(messages: List[Dict[str, Any]]) -> None: """Handle the preset answer array""" if not messages: return for item in messages: role = item.get("role", "ai") item_type = item.get("type", "message") if item_type == "step": await handle_step_item(item) elif item_type == "message": await handle_message_item(item, role) async def handle_step_item(item: Dict[str, Any]) -> None: """Handle the step item""" step_name = item.get("name", "Processing...") step_input = item.get("input", {}) step_output = item.get("output", "") async with cl.Step(name=step_name) as step: step.input = step_input step.output = step_output async def send_message_directly(content: str, message_type: str = "assistant_message") -> cl.Message: """Send message content directly without typing effect Args: content: The content to output message_type: The message type """ message = cl.Message(content=content, type=message_type) await message.send() return message async def handle_message_item(item: Dict[str, Any], role: str) -> None: """Handle the message item""" content = item.get("content", "") elements_data = item.get("elements", []) # Build the element list elements = [] for elem_data in elements_data: elem_type = elem_data.get("type", "") elem_url = elem_data.get("url", "") elem_size = elem_data.get("size", "small") if elem_type == "image" and elem_url: elements.append(cl.Image(url=elem_url, size=elem_size)) elif elem_type == "video" and elem_url: elements.append(cl.Video(url=elem_url, size=elem_size)) elif elem_type == "audio" and elem_url: elements.append(cl.Audio(url=elem_url, size=elem_size)) # Send messages directly without typing effect if role == "user": # User message - direct output message = await send_message_directly(content, message_type="user_message") else: # AI assistant message - direct output message = await send_message_directly(content, message_type="assistant_message") # After the content output is completed, add elements at once if elements: message.elements = elements await message.update() @cl.set_starters async def set_starters(): return [starter.to_cl_starter() for starter in get_all_starters()] async def hook_by_starters(message: cl.Message): """Hook function: handle the starter message content""" # Check if it is the first user message cl_messages = cl.chat_context.get() user_messages = [msg for msg in cl_messages if msg.type == "user_message"] is_first_message = len(user_messages) == 1 if not is_first_message: return False # Strictly match the message content for starter in get_all_starters(): # Get the first user message from messages for matching if not starter.messages: continue first_user_item = None for item in starter.messages: if (item.get("role") == "user" and item.get("type") == "message"): first_user_item = item break if not first_user_item: continue starter_message = first_user_item.get("content", "") if message.content != starter_message: continue # Check and handle the image elements in the starter starter_elements = first_user_item.get("elements", []) if starter_elements and not message.elements: message.elements = [] for elem_data in starter_elements: elem_type = elem_data.get("type", "") elem_url = elem_data.get("url", "") elem_size = elem_data.get("size", "small") if elem_type == "image" and elem_url: message.elements.append(cl.Image(url=elem_url, size=elem_size)) elif elem_type == "video" and elem_url: message.elements.append(cl.Video(url=elem_url, size=elem_size)) elif elem_type == "audio" and elem_url: message.elements.append(cl.Audio(url=elem_url, size=elem_size)) await message.update() # Handle the message, skip the first user message (already sent) if starter.messages and len(starter.messages) > 1: await handle_messages(starter.messages[1:]) return True # Then use the traditional reply_handler (backward compatible) elif starter.reply_handler: await starter.reply_handler(starter) return True return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/AIDC-AI/Pixelle-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server