MCP Flux Studio

#!/usr/bin/env python3 import os import json import argparse from typing import Optional import base64 from PIL import Image, ImageDraw import requests import io import time from io import BytesIO import datetime class FluxAPI: def __init__(self, api_key: Optional[str] = None): self.api_key = api_key or os.getenv("BFL_API_KEY") if not self.api_key: raise ValueError("API key must be provided or set in BFL_API_KEY environment variable") self.base_url = "https://api.bfl.ml" self.headers = {"X-Key": self.api_key} def encode_image(self, image_path: str) -> str: """Convert an image file to base64 string.""" with open(image_path, 'rb') as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def save_image_from_url(self, url: str, filename: str, target_width: int = None, target_height: int = None) -> bool: """Download and save image from URL.""" try: response = requests.get(url) response.raise_for_status() # Save the original image with open(filename, 'wb') as f: f.write(response.content) # If target dimensions are specified, resize the image if target_width and target_height: with Image.open(filename) as img: # Resize image maintaining aspect ratio img = img.resize((target_width, target_height), Image.Resampling.LANCZOS) # Save resized image img.save(filename, quality=95) print(f"✨ Saved as {filename}") return True except Exception as e: print(f"Failed to save image: {str(e)}") return False def get_task_result(self, task_id: str, silent: bool = False) -> Optional[dict]: """Poll for task result.""" max_attempts = 30 attempt = 0 print("Processing image...") while attempt < max_attempts: if not silent: print(f"Processing image... (attempt {attempt + 1}/{max_attempts})") response = requests.get(f"{self.base_url}/v1/get_result", params={'id': task_id}) result = response.json() if result['status'] == 'Ready': return result elif result['status'] == 'failed': print(f"Task failed: {result.get('error', 'Unknown error')}") return None attempt += 1 time.sleep(2) print("Timeout waiting for result") return None def generate_image(self, prompt: str, model: str = "flux.1.1-pro", width: int = None, height: int = None, aspect_ratio: str = None) -> Optional[str]: """Generate an image using any FLUX model.""" endpoint = { "flux.1.1-pro": "/v1/flux-pro-1.1", "flux.1-pro": "/v1/flux-pro", "flux.1-dev": "/v1/flux-dev", "flux.1.1-ultra": "/v1/flux-pro-1.1-ultra", }.get(model) if not endpoint: raise ValueError(f"Unknown model: {model}") # Set default dimensions based on aspect ratio if provided if aspect_ratio: if aspect_ratio == '1:1': width, height = 1024, 1024 elif aspect_ratio == '4:3': width, height = 1024, 768 elif aspect_ratio == '3:4': width, height = 768, 1024 elif aspect_ratio == '16:9': width, height = 1024, 576 elif aspect_ratio == '9:16': width, height = 576, 1024 else: # Use defaults if neither aspect ratio nor dimensions are provided width = width or 1024 height = height or 768 payload = { "prompt": prompt, "width": width, "height": height, "aspect_ratio": aspect_ratio if aspect_ratio else None } response = requests.post( f"{self.base_url}{endpoint}", json=payload, headers=self.headers ) task_id = response.json().get('id') if not task_id: print("Failed to start generation task") return None result = self.get_task_result(task_id) if result and result.get('result', {}).get('sample'): return result['result']['sample'] return None def create_mask(self, size: tuple, shape: str = 'rectangle', position: str = 'center') -> Image: """Create a mask for inpainting.""" mask = Image.new('L', size, 0) draw = ImageDraw.Draw(mask) width, height = size if position == 'ground': horizon_y = height * 0.65 y_start = horizon_y - (height * 0.05) points = [ (0, y_start), (0, height), (width, height), (width, y_start) ] draw.polygon(points, fill=255) else: x1 = width * 0.25 y1 = height * 0.25 x2 = width * 0.75 y2 = height * 0.75 if shape == 'rectangle': draw.rectangle([x1, y1, x2, y2], fill=255) else: # circle center = (width // 2, height // 2) radius = min(width, height) // 4 draw.ellipse([center[0] - radius, center[1] - radius, center[0] + radius, center[1] + radius], fill=255) return mask def inpaint(self, image_path: str, prompt: str, mask_shape: str = 'circle', position: str = 'center') -> Optional[str]: """Inpaint an image using a mask.""" base_image = Image.open(image_path) mask = self.create_mask(base_image.size, shape=mask_shape, position=position) mask_path = 'temp_mask.jpg' mask.save(mask_path) payload = { "image": self.encode_image(image_path), "mask": self.encode_image(mask_path), "prompt": prompt, "steps": 50, "guidance": 60, "output_format": "jpeg", "safety_tolerance": 2 } response = requests.post( f"{self.base_url}/v1/flux-pro-1.0-fill", json=payload, headers=self.headers ) os.remove(mask_path) task_id = response.json().get('id') if not task_id: return None result = self.get_task_result(task_id) if result and result.get('result', {}).get('sample'): return result['result']['sample'] return None def control_generate(self, control_type: str, control_image: str, prompt: str, **kwargs) -> Optional[str]: """Generate an image using any supported control type.""" endpoints = { 'canny': '/v1/flux-pro-1.0-canny', 'depth': '/v1/flux-pro-1.0-depth', 'pose': '/v1/flux-pro-1.0-pose' } default_params = { 'canny': {'guidance': 30}, 'depth': {'guidance': 15}, 'pose': {'guidance': 25} } if control_type not in endpoints: raise ValueError(f"Unsupported control type: {control_type}") payload = { "prompt": prompt, "control_image": self.encode_image(control_image), "steps": kwargs.get('steps', 50), "output_format": kwargs.get('output_format', 'jpeg'), "safety_tolerance": kwargs.get('safety_tolerance', 2) } payload.update(default_params.get(control_type, {})) payload.update(kwargs) response = requests.post( f"{self.base_url}{endpoints[control_type]}", json=payload, headers=self.headers ) task_id = response.json().get('id') if not task_id: return None result = self.get_task_result(task_id) if result and result.get('result', {}).get('sample'): return result['result']['sample'] return None def img2img(self, image_path: str, prompt: str, model: str = "flux.1.1-pro", strength: float = 0.75, width: int = None, height: int = None) -> Optional[str]: """Generate an image using another image as reference""" endpoint = { "flux.1.1-pro": "/v1/flux-pro-1.1", "flux.1-pro": "/v1/flux-pro", "flux.1-dev": "/v1/flux-dev", "flux.1.1-ultra": "/v1/flux-pro-1.1-ultra", }.get(model) if not endpoint: raise ValueError(f"Unknown model: {model}") with Image.open(image_path) as img: orig_width, orig_height = img.size if width is None or height is None: width, height = orig_width, orig_height aspect_ratio = orig_height / orig_width total_pixels = width * height if total_pixels > 1048576: max_area = 1048576 width = int((max_area / aspect_ratio) ** 0.5) height = int(width * aspect_ratio) buffered = BytesIO() img.save(buffered, format="JPEG", quality=95) image_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8') payload = { "prompt": prompt, "image": image_base64, "strength": strength, "width": width, "height": height, "guidance_scale": 7.5, "num_inference_steps": 50, "scheduler": "euler_ancestral", "preserve_init_image_color_profile": True } response = requests.post( f"{self.base_url}{endpoint}", headers=self.headers, json=payload ) task_id = response.json().get('id') if not task_id: print("Failed to start image-to-image task") return None result = self.get_task_result(task_id) if result and result.get('result', {}).get('sample'): return result['result']['sample'] return None def main(): parser = argparse.ArgumentParser(description="FLUX CLI - Image Generation Tool") subparsers = parser.add_subparsers(dest='command', help='Commands') # Generate command generate_parser = subparsers.add_parser('generate', help='Generate an image from a text prompt') generate_parser.add_argument('--prompt', '-p', required=True, help='Text prompt for image generation') generate_parser.add_argument('--model', '-m', choices=['flux.1.1-pro', 'flux.1-pro', 'flux.1-dev', 'flux.1.1-ultra'], default='flux.1.1-pro', help='Model to use for generation') generate_parser.add_argument('--aspect-ratio', '-ar', choices=['1:1', '4:3', '3:4', '16:9', '9:16'], help='Aspect ratio of the output image') generate_parser.add_argument('--width', '-w', type=int, help='Image width (ignored if aspect-ratio is set)') generate_parser.add_argument('--height', type=int, help='Image height (ignored if aspect-ratio is set)') generate_parser.add_argument('--output', '-o', default='generated.jpg', help='Output filename') # Inpaint command inpaint_parser = subparsers.add_parser('inpaint', help='Inpaint an image using a mask') inpaint_parser.add_argument('--image', '-i', required=True, help='Input image for inpainting') inpaint_parser.add_argument('--prompt', '-p', required=True, help='Text prompt for inpainting') inpaint_parser.add_argument('--mask-shape', '-m', choices=['circle', 'rectangle'], default='circle', help='Shape of the mask') inpaint_parser.add_argument('--position', '-pos', choices=['center', 'ground'], default='center', help='Position of the mask') inpaint_parser.add_argument('--output', '-o', default='inpainted.jpg', help='Output filename') # Control command control_parser = subparsers.add_parser('control', help='Generate an image using control') control_parser.add_argument('--type', '-t', required=True, choices=['canny', 'depth', 'pose'], help='Type of control to use') control_parser.add_argument('--image', '-i', required=True, help='Input control image') control_parser.add_argument('--prompt', '-p', required=True, help='Text prompt for generation') control_parser.add_argument('--steps', type=int, default=50, help='Number of inference steps') control_parser.add_argument('--guidance', type=float, help='Guidance scale') control_parser.add_argument('--output', '-o', help='Output filename') # Img2img command img2img_parser = subparsers.add_parser('img2img', help='Generate an image using another image as reference') img2img_parser.add_argument('--image', '-i', required=True, help='Input image') img2img_parser.add_argument('--prompt', '-p', required=True, help='Text prompt for generation') img2img_parser.add_argument('--model', '-m', choices=['flux.1.1-pro', 'flux.1-pro', 'flux.1-dev', 'flux.1.1-ultra'], default='flux.1.1-pro', help='Model to use') img2img_parser.add_argument('--strength', '-s', type=float, default=0.85, help='Generation strength') img2img_parser.add_argument('--width', '-w', type=int, help='Output width') img2img_parser.add_argument('--height', type=int, help='Output height') img2img_parser.add_argument('--output', '-o', default='outputs/generated.jpg', help='Output filename') img2img_parser.add_argument('--name', '-n', required=True, help='Name for the generation') args = parser.parse_args() try: api = FluxAPI() if args.command == 'generate': print(f"Generating image with {args.model}...") print(f"Prompt: {args.prompt}") image_url = api.generate_image( prompt=args.prompt, model=args.model, width=args.width, height=args.height, aspect_ratio=args.aspect_ratio ) if image_url and api.save_image_from_url(image_url, args.output): print("✨ Generation complete!") else: print("Generation failed") elif args.command == 'inpaint': print(f"Inpainting image...") print(f"Prompt: {args.prompt}") image_url = api.inpaint(args.image, args.prompt, args.mask_shape, args.position) if image_url and api.save_image_from_url(image_url, args.output): print("✨ Inpainting complete!") else: print("Inpainting failed") elif args.command == 'control': output = args.output or f"{args.type}_result.jpg" kwargs = {'steps': args.steps} if args.guidance is not None: kwargs['guidance'] = args.guidance result_url = api.control_generate(args.type, args.image, args.prompt, **kwargs) if result_url and api.save_image_from_url(result_url, output): print("✨ Control generation complete!") else: print("Control generation failed") elif args.command == 'img2img': os.makedirs(os.path.dirname(args.output), exist_ok=True) print(f"Generating image-to-image with {args.model}...") print(f"Input image: {args.image}") print(f"Prompt: {args.prompt}") result = api.img2img( image_path=args.image, prompt=args.prompt, model=args.model, strength=args.strength, width=args.width, height=args.height ) if result and api.save_image_from_url(result, args.output): print("✨ Generation complete!") else: print("Generation failed") except Exception as e: print(f"Error: {str(e)}") return 1 return 0 if __name__ == '__main__': exit(main())