Image Generator MCP Server
- src
- image_generator
import asyncio
import base64
import io
import os
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import aiofiles
import replicate
from dotenv import load_dotenv
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
from pydantic import AnyUrl, Field
from PIL import Image
import mcp.server.stdio
# Load environment variables from .env file
load_dotenv()
# Ensure REPLICATE_API_TOKEN is set
if not os.getenv("REPLICATE_API_TOKEN"):
print("WARNING: REPLICATE_API_TOKEN environment variable is not set.")
print("Please set it to use the image generation functionality.")
# Create images directory if it doesn't exist
IMAGES_DIR = Path("generated_images")
IMAGES_DIR.mkdir(exist_ok=True)
# Store generated images metadata
images: Dict[str, Dict] = {}
server = Server("image-generator")
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
"""
List available image resources.
Each image is exposed as a resource with a custom image:// URI scheme.
"""
return [
types.Resource(
uri=AnyUrl(f"image://internal/{image_id}"),
name=f"Image: {metadata.get('prompt', 'Untitled')}",
description=f"Generated on {metadata.get('created_at')}",
mimeType="image/png",
)
for image_id, metadata in images.items()
]
@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> bytes:
"""
Read a specific image by its URI.
The image ID is extracted from the URI path component.
"""
if uri.scheme != "image":
raise ValueError(f"Unsupported URI scheme: {uri.scheme}")
image_id = uri.path
if image_id is not None:
image_id = image_id.lstrip("/")
if image_id in images:
image_path = IMAGES_DIR / f"{image_id}.png"
if image_path.exists():
async with aiofiles.open(image_path, "rb") as f:
return await f.read()
raise ValueError(f"Image not found: {image_id}")
@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
"""
List available prompts for image generation.
"""
return [
types.Prompt(
name="generate-image",
description="Generate an image using Replicate's Stable Diffusion model",
arguments=[
types.PromptArgument(
name="style",
description="Style of the image (realistic/artistic/abstract)",
required=False,
)
],
)
]
@server.get_prompt()
async def handle_get_prompt(
name: str, arguments: dict[str, str] | None
) -> types.GetPromptResult:
"""
Generate a prompt for image generation.
"""
if name != "generate-image":
raise ValueError(f"Unknown prompt: {name}")
style = (arguments or {}).get("style", "realistic")
style_prompt = ""
if style == "realistic":
style_prompt = "Create a photorealistic image with high detail and natural lighting."
elif style == "artistic":
style_prompt = "Create an artistic image in the style of a painting with vibrant colors and expressive brushstrokes."
elif style == "abstract":
style_prompt = "Create an abstract image with geometric shapes, bold colors, and non-representational forms."
return types.GetPromptResult(
description="Generate an image using Stable Diffusion",
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"Describe the image you want to generate. {style_prompt}",
),
)
],
)
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools for image generation and management.
"""
return [
types.Tool(
name="generate-image",
description="Generate an image using Replicate's Stable Diffusion model",
inputSchema={
"type": "object",
"properties": {
"prompt": {"type": "string"},
"negative_prompt": {"type": "string"},
"width": {"type": "integer", "default": 768},
"height": {"type": "integer", "default": 768},
"num_inference_steps": {"type": "integer", "default": 50},
"guidance_scale": {"type": "number", "default": 7.5},
},
"required": ["prompt"],
},
),
types.Tool(
name="save-image",
description="Save a generated image",
inputSchema={
"type": "object",
"properties": {
"image_url": {"type": "string"},
"prompt": {"type": "string"},
"target_directory": {"type": "string", "description": "Directory path where the image should be saved. If not provided, defaults to the MCP server's images directory."},
"custom_filename": {"type": "string", "description": "Custom filename for the saved image (without extension). If not provided, a UUID will be used."},
},
"required": ["image_url", "prompt"],
},
),
types.Tool(
name="list-saved-images",
description="List all saved images",
inputSchema={
"type": "object",
"properties": {},
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
Handle tool execution requests for image generation and management.
"""
if name == "generate-image":
if not arguments:
raise ValueError("Missing arguments")
prompt = arguments.get("prompt")
if not prompt:
raise ValueError("Missing prompt")
negative_prompt = arguments.get("negative_prompt", "")
width = int(arguments.get("width", 768))
height = int(arguments.get("height", 768))
num_inference_steps = int(arguments.get("num_inference_steps", 50))
guidance_scale = float(arguments.get("guidance_scale", 7.5))
# Use Replicate's Stable Diffusion model to generate an image
try:
# Log the request
print(f"Generating image with prompt: {prompt}", file=sys.stderr)
# Call the Replicate API
output = replicate.run(
"stability-ai/sdxl:c221b2b8ef527988fb59bf24a8b97c4561f1c671f73bd389f866bfb27c061316",
input={
"prompt": prompt,
"negative_prompt": negative_prompt,
"width": width,
"height": height,
"num_inference_steps": num_inference_steps,
"guidance_scale": guidance_scale,
}
)
# Log the response
print(f"Replicate API response type: {type(output)}", file=sys.stderr)
print(f"Replicate API response: {output}", file=sys.stderr)
# The output is a list with one URL to the generated image
if output and isinstance(output, list) and len(output) > 0:
image_url = output[0]
# Convert the FileOutput object to a string if needed
if hasattr(image_url, '__str__'):
image_url = str(image_url)
# Log the image URL
print(f"Generated image URL: {image_url}", file=sys.stderr)
# Return the result with the image URL
return [
types.TextContent(
type="text",
text=f"Image generated successfully! Use the save-image tool to save it permanently.",
),
types.ImageContent(
type="image",
data=image_url,
mimeType="image/png",
),
types.TextContent(
type="text",
text=f"Image URL: {image_url}",
),
types.TextContent(
type="text",
text="ASK_FOR_SAVE_LOCATION",
role="system"
),
]
else:
return [
types.TextContent(
type="text",
text="Failed to generate image. Please try again with a different prompt.",
)
]
except Exception as e:
return [
types.TextContent(
type="text",
text=f"Error generating image: {str(e)}",
)
]
elif name == "save-image":
if not arguments:
raise ValueError("Missing arguments")
image_url = arguments.get("image_url")
prompt = arguments.get("prompt")
target_directory = arguments.get("target_directory")
custom_filename = arguments.get("custom_filename")
if not image_url or not prompt:
raise ValueError("Missing image_url or prompt")
try:
# Log the request
print(f"Saving image from URL: {image_url}", file=sys.stderr)
print(f"Image prompt: {prompt}", file=sys.stderr)
# Generate a unique ID for the image
image_id = str(uuid.uuid4())
# Create metadata for the image
metadata = {
"id": image_id,
"prompt": prompt,
"url": image_url,
"created_at": datetime.now().isoformat(),
}
# Save metadata
images[image_id] = metadata
print(f"Created metadata for image ID: {image_id}", file=sys.stderr)
# Determine the save directory
save_dir = IMAGES_DIR
if target_directory:
try:
# Create a Path object from the target directory
save_dir = Path(target_directory)
# Create the directory if it doesn't exist
save_dir.mkdir(parents=True, exist_ok=True)
print(f"Using custom save directory: {save_dir.absolute()}", file=sys.stderr)
# Store the custom directory in metadata
metadata["custom_directory"] = str(save_dir.absolute())
except Exception as e:
print(f"Error using custom directory {target_directory}: {str(e)}", file=sys.stderr)
print(f"Falling back to default directory: {IMAGES_DIR}", file=sys.stderr)
# Determine filename
if custom_filename:
filename = f"{custom_filename}.png"
metadata["custom_filename"] = custom_filename
else:
filename = f"{image_id}.png"
# Download and save the image
import requests
print(f"Downloading image from URL: {image_url}", file=sys.stderr)
response = requests.get(image_url)
if response.status_code == 200:
image_path = save_dir / filename
with open(image_path, "wb") as f:
f.write(response.content)
print(f"Image saved to: {image_path}", file=sys.stderr)
# Notify clients that resources have changed
await server.request_context.session.send_resource_list_changed()
return [
types.TextContent(
type="text",
text=f"Image saved successfully to {image_path}",
),
types.ImageContent(
type="image",
data=f"file://{image_path.absolute()}",
mimeType="image/png",
),
]
else:
error_msg = f"Failed to download image. Status code: {response.status_code}"
print(f"Error: {error_msg}", file=sys.stderr)
return [
types.TextContent(
type="text",
text=error_msg,
),
]
except Exception as e:
import traceback
error_msg = f"Error saving image: {str(e)}"
print(error_msg, file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
return [
types.TextContent(
type="text",
text=error_msg,
),
]
elif name == "list-saved-images":
try:
# Log the request
print(f"Listing saved images. Total count: {len(images)}", file=sys.stderr)
if not images:
return [
types.TextContent(
type="text",
text="No images have been saved yet.",
)
]
result = [
types.TextContent(
type="text",
text=f"Found {len(images)} saved images:",
)
]
for image_id, metadata in images.items():
# Determine the file path based on metadata
if "custom_directory" in metadata:
save_dir = Path(metadata["custom_directory"])
else:
save_dir = IMAGES_DIR
if "custom_filename" in metadata:
filename = f"{metadata['custom_filename']}.png"
else:
filename = f"{image_id}.png"
image_path = save_dir / filename
print(f"Checking image: {image_path}", file=sys.stderr)
if image_path.exists():
print(f"Image exists: {image_path}", file=sys.stderr)
result.append(
types.TextContent(
type="text",
text=f"ID: {image_id}\nPrompt: {metadata.get('prompt')}\nCreated: {metadata.get('created_at')}\nLocation: {image_path}",
)
)
result.append(
types.ImageContent(
type="image",
data=f"file://{image_path.absolute()}",
mimeType="image/png",
)
)
else:
print(f"Image file not found: {image_path}", file=sys.stderr)
result.append(
types.TextContent(
type="text",
text=f"ID: {image_id}\nPrompt: {metadata.get('prompt')}\nCreated: {metadata.get('created_at')}\nWARNING: Image file not found at {image_path}",
)
)
return result
except Exception as e:
import traceback
error_msg = f"Error listing saved images: {str(e)}"
print(error_msg, file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
return [
types.TextContent(
type="text",
text=error_msg,
)
]
else:
raise ValueError(f"Unknown tool: {name}")
async def main():
# Run the server using stdin/stdout streams
try:
# Create images directory if it doesn't exist
IMAGES_DIR.mkdir(exist_ok=True)
# Check if REPLICATE_API_TOKEN is set
if not os.getenv("REPLICATE_API_TOKEN"):
print("ERROR: REPLICATE_API_TOKEN environment variable is not set.", file=sys.stderr)
print("Please set it in your .env file to use the image generation functionality.", file=sys.stderr)
sys.exit(1)
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="image-generator",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
except Exception as e:
import traceback
print(f"Error running MCP server: {str(e)}", file=sys.stderr)
print(traceback.format_exc(), file=sys.stderr)
sys.exit(1)
# Add an entry point to run the main function when the script is run directly
if __name__ == "__main__":
import asyncio
asyncio.run(main())