# diffugen.py
import sys
import os
import logging
import subprocess
import uuid
import re
import argparse
import json
import random
import time
import threading
import atexit
# Simplified logging setup - log only essential info
logging.basicConfig(
filename='diffugen_debug.log',
level=logging.INFO, # Changed from DEBUG to INFO
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Queue management system
class GenerationQueue:
def __init__(self):
self.lock = threading.Lock()
self.is_busy = False
self.lock_file = os.path.join(os.getcwd(), "diffugen.lock")
# Clean up any stale lock on startup
self._remove_lock_file()
# Register cleanup on exit
atexit.register(self._remove_lock_file)
    def acquire(self):
"""Try to acquire the lock for image generation.
Returns True if successful, False if busy."""
with self.lock:
# First check local thread lock
if self.is_busy:
logging.info("Image generation already in progress (local lock)")
return False
# Then check file lock (for inter-process locking)
if os.path.exists(self.lock_file):
try:
# Check if the lock file is stale (older than 30 minutes)
lock_time = os.path.getmtime(self.lock_file)
if time.time() - lock_time > 1800: # 30 minutes
logging.warning("Found stale lock file, removing it")
self._remove_lock_file()
else:
with open(self.lock_file, 'r') as f:
pid = f.read().strip()
logging.info(f"Image generation already in progress by process {pid}")
return False
except Exception as e:
logging.error(f"Error checking lock file: {e}")
return False
# If we got here, no active generation is running
try:
with open(self.lock_file, 'w') as f:
f.write(str(os.getpid()))
self.is_busy = True
logging.info(f"Acquired generation lock (PID: {os.getpid()})")
return True
except Exception as e:
logging.error(f"Error creating lock file: {e}")
return False
def release(self):
"""Release the generation lock."""
with self.lock:
self.is_busy = False
self._remove_lock_file()
logging.info("Released generation lock")
def _remove_lock_file(self):
"""Remove the lock file if it exists."""
try:
if os.path.exists(self.lock_file):
os.remove(self.lock_file)
except Exception as e:
logging.error(f"Error removing lock file: {e}")
# Create global generation queue
generation_queue = GenerationQueue()
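# Illustrative sketch (defined but never called): the acquire/try/finally
# pattern that the MCP tools below use with generation_queue, guaranteeing the
# lock is released even if generation raises.
def _example_queue_usage():
    if not generation_queue.acquire():
        return {"success": False, "error": "busy"}
    try:
        pass  # run the actual generation work here
    finally:
        generation_queue.release()
    return {"success": True}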
# Helper function to print to stderr
def log_to_stderr(message):
print(message, file=sys.stderr, flush=True)
# Try to import real FastMCP with minimal error handling
mcp = None
try:
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("DiffuGen")
except ImportError as e:
log_to_stderr(f"Error importing FastMCP: {e}")
# Simple fallback MCP implementation
class FallbackMCP:
def __init__(self, name):
self.name = name
log_to_stderr(f"Using fallback MCP server: {name}")
    def tool(self, *args, **kwargs):
        def decorator(func):
            return func
        return decorator
    def run(self):
        # Match the FastMCP interface: __main__ calls mcp.run()
        return True
mcp = FallbackMCP("DiffuGen")
if mcp is None:
log_to_stderr("Failed to create MCP server")
sys.exit(1)
# Function to load configuration
def load_config():
config = {
"sd_cpp_path": os.path.join(os.getcwd(), "stable-diffusion.cpp"),
"models_dir": None, # Will be set based on sd_cpp_path if not provided
"output_dir": os.getcwd(),
"default_model": None, # No default model, will be determined by function
"vram_usage": "adaptive",
"gpu_layers": -1,
"default_params": {
"width": 512,
"height": 512,
"steps": {
"flux-schnell": 8,
"flux-dev": 20,
"sdxl": 20,
"sd3": 20,
"sd15": 20
},
"cfg_scale": {
"flux-schnell": 1.0,
"flux-dev": 1.0,
"sdxl": 7.0,
"sd3": 7.0,
"sd15": 7.0
},
"sampling_method": "euler"
}
}
# Try to read from environment variables first (highest priority)
if "SD_CPP_PATH" in os.environ:
config["sd_cpp_path"] = os.path.normpath(os.environ.get("SD_CPP_PATH"))
logging.info(f"Using SD_CPP_PATH from environment: {config['sd_cpp_path']}")
if "DIFFUGEN_OUTPUT_DIR" in os.environ:
config["output_dir"] = os.path.normpath(os.environ.get("DIFFUGEN_OUTPUT_DIR"))
logging.info(f"Using output_dir from environment: {config['output_dir']}")
if "DIFFUGEN_DEFAULT_MODEL" in os.environ:
config["default_model"] = os.environ.get("DIFFUGEN_DEFAULT_MODEL")
logging.info(f"Using default_model from environment: {config['default_model']}")
if "DIFFUGEN_VRAM_USAGE" in os.environ:
config["vram_usage"] = os.environ.get("DIFFUGEN_VRAM_USAGE")
logging.info(f"Using vram_usage from environment: {config['vram_usage']}")
# Try to read from diffugen.json configuration (second priority)
try:
diffugen_json_path = os.path.join(os.getcwd(), "diffugen.json")
if os.path.exists(diffugen_json_path):
logging.info(f"Loading configuration from {diffugen_json_path}")
with open(diffugen_json_path, 'r') as f:
diffugen_config = json.load(f)
# Extract values from the mcpServers.diffugen structure
if 'mcpServers' in diffugen_config and 'diffugen' in diffugen_config.get('mcpServers', {}):
server_config = diffugen_config['mcpServers']['diffugen']
# Extract environment variables
if 'env' in server_config:
env_vars = server_config['env']
if 'SD_CPP_PATH' in env_vars and 'SD_CPP_PATH' not in os.environ:
config['sd_cpp_path'] = os.path.normpath(env_vars['SD_CPP_PATH'])
logging.info(f"Using sd_cpp_path from diffugen.json: {config['sd_cpp_path']}")
if 'default_model' in env_vars and 'DIFFUGEN_DEFAULT_MODEL' not in os.environ:
config['default_model'] = env_vars['default_model']
logging.info(f"Using default_model from diffugen.json: {config['default_model']}")
# Extract resources
if 'resources' in server_config:
resources = server_config['resources']
if 'output_dir' in resources and 'DIFFUGEN_OUTPUT_DIR' not in os.environ:
config['output_dir'] = os.path.normpath(resources['output_dir'])
logging.info(f"Using output_dir from diffugen.json: {config['output_dir']}")
if 'models_dir' in resources:
config['models_dir'] = os.path.normpath(resources['models_dir'])
logging.info(f"Using models_dir from diffugen.json: {config['models_dir']}")
if 'vram_usage' in resources and 'DIFFUGEN_VRAM_USAGE' not in os.environ:
config['vram_usage'] = resources['vram_usage']
logging.info(f"Using vram_usage from diffugen.json: {config['vram_usage']}")
if 'gpu_layers' in resources:
config['gpu_layers'] = resources['gpu_layers']
logging.info(f"Using gpu_layers from diffugen.json: {config['gpu_layers']}")
# Extract default_params
if 'default_params' in server_config:
config['default_params'] = server_config['default_params']
logging.info("Loaded default_params from diffugen.json")
except Exception as e:
logging.warning(f"Error loading diffugen.json configuration: {e}")
# If models_dir wasn't set, use sd_cpp_path/models
if not config["models_dir"]:
config["models_dir"] = os.path.join(config["sd_cpp_path"], "models")
return config
# Load the configuration
config = load_config()
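# For reference, a minimal diffugen.json matching the structure load_config()
# expects (paths below are placeholders, not shipped defaults):
#
# {
#   "mcpServers": {
#     "diffugen": {
#       "env": {
#         "SD_CPP_PATH": "/path/to/stable-diffusion.cpp",
#         "default_model": "flux-schnell"
#       },
#       "resources": {
#         "output_dir": "/path/to/outputs",
#         "models_dir": "/path/to/models",
#         "vram_usage": "adaptive",
#         "gpu_layers": -1
#       },
#       "default_params": {
#         "width": 512,
#         "height": 512,
#         "steps": {"flux-schnell": 8},
#         "cfg_scale": {"flux-schnell": 1.0},
#         "sampling_method": "euler"
#       }
#     }
#   }
# }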
# Core path initialization (using the config we loaded)
sd_cpp_path = os.path.normpath(config["sd_cpp_path"])
default_output_dir = os.path.normpath(config["output_dir"])
# Create output directory
os.makedirs(default_output_dir, exist_ok=True)
# Helper functions to get model-specific parameters from config
def get_default_steps(model):
"""Get default steps for a model"""
model = model.lower()
# Try to get from model-specific defaults, fall back to general default
try:
if isinstance(config["default_params"]["steps"], dict):
return config["default_params"]["steps"].get(model, 20)
else:
return config["default_params"].get("steps", 20)
except (KeyError, TypeError):
logging.warning(f"Could not find default steps for model {model}, using fallback value of 20")
return 20
def get_default_cfg_scale(model):
"""Get default CFG scale for a model"""
model = model.lower()
# Try to get from model-specific defaults, fall back to general default
try:
if isinstance(config["default_params"]["cfg_scale"], dict):
# For Flux models, default to 1.0, for others default to 7.0
default_value = 1.0 if model.startswith("flux-") else 7.0
return config["default_params"]["cfg_scale"].get(model, default_value)
else:
return config["default_params"].get("cfg_scale", 7.0)
except (KeyError, TypeError):
# For Flux models, default to 1.0, for others default to 7.0
default_value = 1.0 if model.startswith("flux-") else 7.0
logging.warning(f"Could not find default cfg_scale for model {model}, using fallback value of {default_value}")
return default_value
def get_default_sampling_method(model=None):
"""Get default sampling method, optionally model-specific"""
try:
# First try to get model-specific sampling method if provided
if model and isinstance(config["default_params"]["sampling_method"], dict):
model = model.lower()
return config["default_params"]["sampling_method"].get(model, "euler")
# Otherwise get general default
elif isinstance(config["default_params"]["sampling_method"], dict):
return config["default_params"]["sampling_method"].get("default", "euler")
else:
return config["default_params"].get("sampling_method", "euler")
except (KeyError, TypeError):
logging.warning("Could not find default sampling method, using fallback value of 'euler'")
return "euler"
# Lazy-loaded model paths - only resolved when needed
_model_paths = {}
_supporting_files = {}
def get_model_path(model_name):
"""Lazy-load model paths only when needed"""
if not _model_paths:
# Initialize paths only on first access
models_dir = config["models_dir"]
_model_paths.update({
"flux-schnell": os.path.join(models_dir, "flux", "flux1-schnell-q8_0.gguf"),
"flux-dev": os.path.join(models_dir, "flux", "flux1-dev-q8_0.gguf"),
"sdxl": os.path.join(models_dir, "sdxl-1.0-base.safetensors"),
"sd3": os.path.join(models_dir, "sd3-medium.safetensors"),
"sd15": os.path.join(models_dir, "sd15.safetensors"),
})
return _model_paths.get(model_name)
def get_supporting_file(file_name):
"""Lazy-load supporting file paths only when needed"""
if not _supporting_files:
# Initialize paths only on first access
models_dir = config["models_dir"]
_supporting_files.update({
"vae": os.path.join(models_dir, "ae.sft"),
"clip_l": os.path.join(models_dir, "clip_l.safetensors"),
"t5xxl": os.path.join(models_dir, "t5xxl_fp16.safetensors"),
"sdxl_vae": os.path.join(models_dir, "sdxl_vae-fp16-fix.safetensors")
})
return _supporting_files.get(file_name)
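# These lookups assume the following layout under models_dir (paths are joined
# lazily and never verified to exist at startup):
#   models_dir/
#     flux/flux1-schnell-q8_0.gguf
#     flux/flux1-dev-q8_0.gguf
#     sdxl-1.0-base.safetensors
#     sd3-medium.safetensors
#     sd15.safetensors
#     ae.sft  clip_l.safetensors  t5xxl_fp16.safetensors  sdxl_vae-fp16-fix.safetensors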
# Minimal ready message
log_to_stderr("DiffuGen ready")
@mcp.tool()
def generate_stable_diffusion_image(prompt: str, model: str = None, output_dir: str = None,
width: int = None, height: int = None, steps: int = None,
cfg_scale: float = None, seed: int = -1,
sampling_method: str = None, negative_prompt: str = "") -> dict:
"""Generate an image using standard Stable Diffusion models (SDXL, SD3 or SD1.5)
Args:
prompt: The image description to generate
model: Model to use (sdxl, sd3, sd15 only - NOT flux models)
output_dir: Directory to save the image (defaults to current directory)
width: Image width in pixels
height: Image height in pixels
steps: Number of diffusion steps
cfg_scale: CFG scale parameter
seed: Seed for reproducibility (-1 for random)
sampling_method: Sampling method (euler, euler_a, heun, dpm2, dpm++2s_a, dpm++2m, dpm++2mv2, lcm)
negative_prompt: Negative prompt (for SD models ONLY)
Returns:
A dictionary containing the path to the generated image and the command used
"""
logging.info(f"Generate stable diffusion image request: prompt={prompt}, model={model}")
# Use the generation queue to prevent concurrent generation
if not generation_queue.acquire():
return {
"success": False,
"error": "Another image generation is already in progress. Please try again when the current generation completes."
}
try:
# Sanitize prompt and negative prompt
sanitized_prompt = re.sub(r'[^\w\s.,;:!?\'"-]+', '', prompt).strip()
sanitized_negative_prompt = negative_prompt
if negative_prompt:
sanitized_negative_prompt = re.sub(r'[^\w\s.,;:!?\'"-]+', '', negative_prompt).strip()
# Select appropriate model
if not model:
model = "sd15" # Default to SD1.5
model = model.lower()
# Only allow SD models in this function
if model.startswith("flux-"):
error_msg = f"Please use generate_flux_image for Flux models (received {model})"
logging.error(error_msg)
return {"success": False, "error": error_msg}
# Normalize model name
if model in ["sdxl", "sdxl-1.0", "sdxl1.0"]:
model = "sdxl"
elif model in ["sd3", "sd3-medium"]:
model = "sd3"
elif model in ["sd15", "sd1.5", "sd-1.5"]:
model = "sd15"
# Use default parameters if not specified
if width is None:
width = config["default_params"]["width"]
if height is None:
height = config["default_params"]["height"]
if steps is None:
steps = get_default_steps(model)
if cfg_scale is None:
cfg_scale = get_default_cfg_scale(model)
if sampling_method is None:
sampling_method = get_default_sampling_method(model)
if output_dir is None:
output_dir = default_output_dir
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Get model path
model_path = get_model_path(model)
if not model_path:
error_msg = f"Model not found: {model}"
logging.error(error_msg)
return {"success": False, "error": error_msg}
# Generate a random seed if not provided
if seed == -1:
seed = random.randint(1, 1000000000)
# Create output filename
sanitized_prompt_for_filename = re.sub(r'[^\w\s]+', '', sanitized_prompt).strip()
sanitized_prompt_for_filename = re.sub(r'\s+', '_', sanitized_prompt_for_filename)
truncated_prompt = sanitized_prompt_for_filename[:20].lower() # Limit to 20 chars
unique_id = uuid.uuid4().hex[:8]
output_filename = f"{model}_{truncated_prompt}_{unique_id}.png"
output_path = os.path.join(output_dir, output_filename)
# Prepare command for sd.cpp
bin_path = os.path.join(sd_cpp_path, "build", "bin", "sd")
base_command = [
bin_path,
"-p", sanitized_prompt
]
# Add negative prompt if provided
if sanitized_negative_prompt:
base_command.extend(["--negative-prompt", sanitized_negative_prompt])
# Add remaining parameters
base_command.extend([
"--cfg-scale", str(cfg_scale),
"--sampling-method", sampling_method,
"--steps", str(steps),
"-H", str(height),
"-W", str(width),
"-o", output_path,
"--seed", str(seed)
])
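        # At this point the invocation looks roughly like (illustrative values;
        # model and encoder flags are appended below):
        #   sd -p "<prompt>" [--negative-prompt "<neg>"] --cfg-scale 7.0 \
        #      --sampling-method euler --steps 20 -H 512 -W 512 -o <out>.png --seed <n>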
# Add model-specific paths
base_command.extend(["--diffusion-model", model_path])
        # Supporting files: SDXL uses its dedicated fp16-fix VAE, other SD
        # models fall back to the generic VAE; clip_l is the only separate
        # text encoder available for the SD models.
        vae_path = get_supporting_file("sdxl_vae") if model == "sdxl" else get_supporting_file("vae")
        clip_l_path = get_supporting_file("clip_l")
        if vae_path:
            base_command.extend(["--vae", vae_path])
        if model in ("sdxl", "sd15") and clip_l_path:
            base_command.extend(["--clip_l", clip_l_path])
# Add GPU and memory usage settings
if config["vram_usage"] != "adaptive":
base_command.append(f"--{config['vram_usage']}")
if config["gpu_layers"] != -1:
base_command.extend(["--gpu-layer", str(config["gpu_layers"])])
try:
# Run the command
logging.info(f"Running command: {' '.join(base_command)}")
result = subprocess.run(
base_command,
check=True,
capture_output=True,
text=True
)
logging.info(f"Successfully generated image at: {output_path} (size: {os.path.getsize(output_path)} bytes)")
# Format the response to match OpenAPI style
image_description = "Image of " + sanitized_prompt[:50] + ("..." if len(sanitized_prompt) > 50 else "")
markdown_response = f"Here's the image you requested:\n\n{image_description}\n\n**Generation Details:**\n\nModel: {model}\nResolution: {width}x{height} pixels\nSteps: {steps}\nCFG Scale: {cfg_scale}\nSampling Method: {sampling_method}\nSeed: {seed}\nThis image was generated based on your prompt {sanitized_prompt}. Let me know if you'd like adjustments!"
return {
"success": True,
"image_path": output_path,
"prompt": sanitized_prompt,
"model": model,
"width": width,
"height": height,
"steps": steps,
"cfg_scale": cfg_scale,
"seed": seed,
"sampling_method": sampling_method,
"command": " ".join(base_command),
"output": result.stdout,
"markdown_response": markdown_response
}
except subprocess.CalledProcessError as e:
error_msg = f"Process error (exit code {e.returncode}): {str(e)}"
logging.error(f"Image generation failed: {error_msg}")
logging.error(f"Command: {' '.join(base_command)}")
if e.stderr:
logging.error(f"Process stderr: {e.stderr}")
return {
"success": False,
"error": error_msg,
"stderr": e.stderr,
"command": " ".join(base_command),
"exit_code": e.returncode
}
        except FileNotFoundError:
error_msg = f"Binary not found at {base_command[0]}"
logging.error(f"Image generation failed: {error_msg}")
return {
"success": False,
"error": error_msg,
"command": " ".join(base_command),
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
logging.error(f"Image generation failed with unexpected error: {error_msg}")
logging.error(f"Command: {' '.join(base_command)}")
return {
"success": False,
"error": error_msg,
"command": " ".join(base_command),
}
finally:
# Always release the lock when done
generation_queue.release()
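# Illustrative sketch (defined but never called): invoking the SD tool directly
# from Python; the prompt and parameter values here are placeholders. The CLI
# block at the bottom of this file calls the tool the same way.
def _example_sd_call():
    return generate_stable_diffusion_image(
        prompt="a lighthouse at dusk, oil painting",
        model="sd15",
        width=512,
        height=512,
        negative_prompt="blurry, low quality",
    )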
@mcp.tool()
def generate_flux_image(prompt: str, output_dir: str = None, cfg_scale: float = None,
sampling_method: str = None, steps: int = None,
model: str = None, width: int = None,
height: int = None, seed: int = -1) -> dict:
"""
Generate an image using Flux stable diffusion models ONLY.
Use this tool for any request involving flux-schnell or flux-dev models.
Args:
prompt: The image description to generate
model: Flux model to use (only "flux-schnell" or "flux-dev" are supported)
output_dir: Directory to save the image
cfg_scale: CFG scale parameter (default: 1.0 for all flux models)
sampling_method: Sampling method to use (default: euler)
steps: Number of diffusion steps (default: 8 for flux-schnell, 20 for flux-dev)
width: Image width in pixels (default: 512)
height: Image height in pixels (default: 512)
seed: Seed for reproducibility (-1 for random)
Returns:
A dictionary containing the path to the generated image and the command used
"""
logging.info(f"Generate flux image request: prompt={prompt}, model={model}")
# Use the generation queue to prevent concurrent generation
if not generation_queue.acquire():
return {
"success": False,
"error": "Another image generation is already in progress. Please try again when the current generation completes."
}
try:
# Sanitize prompt
sanitized_prompt = re.sub(r'[^\w\s.,;:!?\'"-]+', '', prompt).strip()
# Select appropriate model
if not model:
model = "flux-schnell" # Default to flux-schnell
model = model.lower()
# Only allow Flux models in this function
if not model.startswith("flux-"):
# If the user specified an SD model, suggest using the other function
if model in ["sdxl", "sd3", "sd15", "sd1.5"]:
error_msg = f"Please use generate_stable_diffusion_image for standard SD models (received {model})"
else:
error_msg = f"Invalid model: {model}. For Flux image generation, use 'flux-schnell' or 'flux-dev'"
logging.error(error_msg)
return {"success": False, "error": error_msg}
# Normalize model name
if model in ["flux-schnell", "flux_schnell", "fluxschnell", "flux1-schnell"]:
model = "flux-schnell"
elif model in ["flux-dev", "flux_dev", "fluxdev", "flux1-dev"]:
model = "flux-dev"
# Use default parameters if not specified
if width is None:
width = config["default_params"]["width"]
if height is None:
height = config["default_params"]["height"]
if steps is None:
steps = get_default_steps(model)
if cfg_scale is None:
cfg_scale = get_default_cfg_scale(model)
if sampling_method is None:
sampling_method = get_default_sampling_method(model)
if output_dir is None:
output_dir = default_output_dir
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
# Get model path
model_path = get_model_path(model)
if not model_path:
error_msg = f"Model not found: {model}"
logging.error(error_msg)
return {"success": False, "error": error_msg}
# Generate a random seed if not provided
if seed == -1:
seed = random.randint(1, 1000000000)
# Create output filename
sanitized_prompt_for_filename = re.sub(r'[^\w\s]+', '', sanitized_prompt).strip()
sanitized_prompt_for_filename = re.sub(r'\s+', '_', sanitized_prompt_for_filename)
truncated_prompt = sanitized_prompt_for_filename[:20].lower() # Limit to 20 chars
unique_id = uuid.uuid4().hex[:8]
output_filename = f"{model}_{truncated_prompt}_{unique_id}.png"
output_path = os.path.join(output_dir, output_filename)
# Prepare command for sd.cpp
bin_path = os.path.join(sd_cpp_path, "build", "bin", "sd")
base_command = [
bin_path,
"-p", sanitized_prompt,
"--cfg-scale", str(cfg_scale),
"--sampling-method", sampling_method,
"--steps", str(steps),
"-H", str(height),
"-W", str(width),
"-o", output_path,
"--diffusion-fa", # Add Flux-specific flag
"--seed", str(seed)
]
# Add model-specific paths
base_command.extend(["--diffusion-model", model_path])
# Get supporting files for Flux
vae_path = get_supporting_file("vae")
clip_l_path = get_supporting_file("clip_l")
t5xxl_path = get_supporting_file("t5xxl")
if vae_path:
base_command.extend(["--vae", vae_path])
if clip_l_path:
base_command.extend(["--clip_l", clip_l_path])
if t5xxl_path:
base_command.extend(["--t5xxl", t5xxl_path])
# Add GPU and memory usage settings
if config["vram_usage"] != "adaptive":
base_command.append(f"--{config['vram_usage']}")
if config["gpu_layers"] != -1:
base_command.extend(["--gpu-layer", str(config["gpu_layers"])])
try:
# Run the command
logging.info(f"Running command: {' '.join(base_command)}")
result = subprocess.run(
base_command,
check=True,
capture_output=True,
text=True
)
logging.info(f"Successfully generated image at: {output_path} (size: {os.path.getsize(output_path)} bytes)")
# Format the response to match OpenAPI style
image_description = "Image of " + sanitized_prompt[:50] + ("..." if len(sanitized_prompt) > 50 else "")
markdown_response = f"Here's the image you requested:\n\n{image_description}\n\n**Generation Details:**\n\nModel: {model}\nResolution: {width}x{height} pixels\nSteps: {steps}\nCFG Scale: {cfg_scale}\nSampling Method: {sampling_method}\nSeed: {seed}\nThis image was generated based on your prompt {sanitized_prompt}. Let me know if you'd like adjustments!"
return {
"success": True,
"image_path": output_path,
"prompt": sanitized_prompt,
"model": model,
"width": width,
"height": height,
"steps": steps,
"cfg_scale": cfg_scale,
"seed": seed,
"sampling_method": sampling_method,
"command": " ".join(base_command),
"output": result.stdout,
"markdown_response": markdown_response
}
except subprocess.CalledProcessError as e:
error_msg = f"Process error (exit code {e.returncode}): {str(e)}"
logging.error(f"Image generation failed: {error_msg}")
logging.error(f"Command: {' '.join(base_command)}")
if e.stderr:
logging.error(f"Process stderr: {e.stderr}")
return {
"success": False,
"error": error_msg,
"stderr": e.stderr,
"command": " ".join(base_command),
"exit_code": e.returncode
}
        except FileNotFoundError:
error_msg = f"Binary not found at {base_command[0]}"
logging.error(f"Image generation failed: {error_msg}")
return {
"success": False,
"error": error_msg,
"command": " ".join(base_command),
}
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
logging.error(f"Image generation failed with unexpected error: {error_msg}")
logging.error(f"Command: {' '.join(base_command)}")
return {
"success": False,
"error": error_msg,
"command": " ".join(base_command),
}
finally:
# Always release the lock when done
generation_queue.release()
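# Example invocations handled by the CLI block below (illustrative prompts;
# see get_model_path for the recognized model names):
#   python diffugen.py "a cat in a hat" --model flux-schnell
#   python diffugen.py "a cat in a hat" --model sdxl --width 768 --height 768 \
#       --steps 30 --cfg-scale 7.0 --negative-prompt "blurry" --output-dir ./out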
if __name__ == "__main__":
try:
# Check if command line arguments are provided for direct image generation
if len(sys.argv) > 1:
# Parse command line arguments
            parser = argparse.ArgumentParser(description="Generate SD/Flux images")
parser.add_argument("prompt", type=str, help="The image description to generate")
parser.add_argument("--model", type=str,
help="Model to use (flux-schnell, flux-dev, sdxl, sd3, sd15)")
parser.add_argument("--width", type=int, default=config["default_params"]["width"],
help="Image width in pixels")
parser.add_argument("--height", type=int, default=config["default_params"]["height"],
help="Image height in pixels")
# For steps and cfg_scale, we'll determine the default based on the model after parsing
parser.add_argument("--steps", type=int, default=None,
help="Number of diffusion steps")
parser.add_argument("--cfg-scale", type=float, dest="cfg_scale", default=None,
help="CFG scale parameter")
parser.add_argument("--seed", type=int, default=-1,
help="Seed for reproducibility (-1 for random)")
parser.add_argument("--sampling-method", type=str, dest="sampling_method",
default=None,
help="Sampling method")
parser.add_argument("--negative-prompt", type=str, dest="negative_prompt", default="",
help="Negative prompt")
parser.add_argument("--output-dir", type=str, dest="output_dir", default=None,
help="Directory to save the image")
# Parse arguments
args, unknown = parser.parse_known_args()
# Determine model - use from args, config, or choose appropriate default
if args.model is None:
args.model = config["default_model"]
# If still None, prompt the user to specify a model
if args.model is None:
log_to_stderr("Model not specified. Please specify a model using --model. Available options:")
log_to_stderr(" Flux models: flux-schnell, flux-dev")
log_to_stderr(" SD models: sdxl, sd3, sd15")
sys.exit(1)
# Set model-specific defaults if not provided
if args.steps is None:
args.steps = get_default_steps(args.model)
if args.cfg_scale is None:
args.cfg_scale = get_default_cfg_scale(args.model)
if args.sampling_method is None:
args.sampling_method = get_default_sampling_method(args.model)
# Determine which generation function to use based on model
if args.model.lower().startswith("flux-"):
log_to_stderr(f"Generating Flux image with model: {args.model}")
result = generate_flux_image(
prompt=args.prompt,
model=args.model,
width=args.width,
height=args.height,
steps=args.steps,
cfg_scale=args.cfg_scale,
seed=args.seed,
sampling_method=args.sampling_method,
output_dir=args.output_dir
)
else:
log_to_stderr(f"Generating SD image with model: {args.model}")
result = generate_stable_diffusion_image(
prompt=args.prompt,
model=args.model,
width=args.width,
height=args.height,
steps=args.steps,
cfg_scale=args.cfg_scale,
seed=args.seed,
sampling_method=args.sampling_method,
negative_prompt=args.negative_prompt,
output_dir=args.output_dir
)
# Print the result path
if result.get("success", False):
print(f"Image generated successfully: {result['image_path']}")
sys.exit(0)
else:
log_to_stderr(f"Image generation failed: {result.get('error', 'Unknown error')}")
sys.exit(1)
else:
# No arguments provided, start the MCP server
mcp.run()
except Exception as e:
logging.error(f"Error running DiffuGen: {e}")
sys.exit(1)