Skip to main content
Glama

MCP Image Server

mcp_image.py20.2 kB
#!/usr/bin/env python3 import os import sys import asyncio import httpx import logging from io import BytesIO from datetime import datetime from PIL import Image as PILImage from urllib.parse import urlparse from mcp.server.fastmcp import FastMCP, Image, Context from typing import List, Dict, Any, Union, Optional MAX_IMAGE_SIZE = 1024 # Maximum dimension size in pixels TEMP_DIR = "./Temp" DATA_DIR = "./data" # Ensure directories exist os.makedirs(DATA_DIR, exist_ok=True) os.makedirs(TEMP_DIR, exist_ok=True) # Configure logging: first disable other loggers logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("asyncio").setLevel(logging.WARNING) logging.getLogger("mcp").setLevel(logging.WARNING) # Configure our logger log_filename = os.path.join(DATA_DIR, datetime.now().strftime("%d-%m-%y.log")) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # Create handlers file_handler = logging.FileHandler(log_filename) file_handler.setFormatter(formatter) console_handler = logging.StreamHandler(sys.stderr) console_handler.setFormatter(formatter) # Set up our logger logger = logging.getLogger("image-mcp") logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) logger.addHandler(console_handler) # Prevent double logging logger.propagate = False # Create a FastMCP server instance mcp = FastMCP("image-service") async def process_image_data(data: bytes, content_type: str, image_source: str, ctx: Context) -> Image | None: """Process image data and return an MCP Image object.""" try: # If image is not large, try to log dimensions without processing if len(data) <= 1048576: try: with PILImage.open(BytesIO(data)) as img: width, height = img.size logger.debug(f"Original image dimensions from {image_source}: {width}x{height}") logger.debug(f"Image format from PIL: {img.format}, mode: {img.mode}") except Exception as e: logger.debug(f"Could not determine dimensions for {image_source}: {e}") # Ensure content_type is valid and doesn't include 'image/' if content_type.startswith('image/'): content_type = content_type.split('/')[-1] logger.debug(f"Creating Image object with format: {content_type}") return Image(data=data, format=content_type) # For large images, save to temp file and process temp_path = os.path.join(TEMP_DIR, f"temp_image_{hash(image_source)}." + content_type.split('/')[-1]) with open(temp_path, "wb") as f: f.write(data) try: # First pass: get dimensions and basic info with PILImage.open(temp_path) as img: orig_width, orig_height = img.size orig_format = img.format orig_mode = img.mode logger.debug(f"Original image dimensions from {image_source}: {orig_width}x{orig_height}") logger.debug(f"Large image format from PIL: {orig_format}, mode: {orig_mode}") # Calculate optimal resize factor if image is very large max_dimension = max(orig_width, orig_height) initial_scale = 1.0 if max_dimension > 3000: initial_scale = 3000 / max_dimension logger.debug(f"Very large image detected ({max_dimension}px), will start with scale factor: {initial_scale}") # Second pass: process the image with PILImage.open(temp_path) as img: if img.mode in ('RGBA', 'P'): img = img.convert('RGB') # Apply initial scale if needed if initial_scale < 1.0: width = int(orig_width * initial_scale) height = int(orig_height * initial_scale) img = img.resize((width, height), PILImage.LANCZOS) else: width, height = img.size quality = 85 scale_factor = 1.0 while True: img_byte_arr = BytesIO() # Create a copy for this iteration to avoid accumulating transforms if scale_factor < 1.0: current_width = int(width * scale_factor) current_height = int(height * scale_factor) current_img = img.resize((current_width, current_height), PILImage.LANCZOS) else: current_img = img current_width, current_height = width, height current_img.save(img_byte_arr, format='JPEG', quality=quality, optimize=True) processed_data = img_byte_arr.getvalue() # Clean up the temporary image if we created one if scale_factor < 1.0 and hasattr(current_img, 'close'): current_img.close() # Target 800KB to leave buffer for any MCP overhead if len(processed_data) <= 819200: # 800KB logger.debug(f"Processed image dimensions from {image_source}: {current_width}x{current_height} (quality={quality})") logger.debug(f"Returning processed image with format: jpeg, size: {len(processed_data)} bytes") return Image(data=processed_data, format='jpeg') # Try reducing quality first if quality > 20: quality -= 10 logger.debug(f"Reducing quality to {quality} for {image_source}, current size: {len(processed_data)} bytes") else: # Then try scaling down scale_factor *= 0.8 if current_width * scale_factor < 200 or current_height * scale_factor < 200: ctx.error("Unable to compress image to acceptable size while maintaining quality") logger.error(f"Failed processing image from {image_source}: dimensions too small") return None logger.debug(f"Applying scale factor {scale_factor} to image from {image_source}") quality = 85 # Reset quality when changing size except MemoryError as e: ctx.error(f"Out of memory processing large image: {str(e)}") logger.error(f"MemoryError processing image from {image_source}: {str(e)}") return None except Exception as e: ctx.error(f"Image processing error: {str(e)}") logger.exception(f"Exception processing image from {image_source}") return None finally: if os.path.exists(temp_path): os.remove(temp_path) except Exception as e: ctx.error(f"Error processing image: {str(e)}") logger.exception(f"Unexpected error processing {image_source}") return None async def process_local_image(file_path: str, ctx: Context) -> Dict[str, Any]: """Processes a local image file and returns a dictionary with the result.""" try: if not os.path.exists(file_path): error_msg = f"File not found: {file_path}" ctx.error(error_msg) logger.error(error_msg) return {"path": file_path, "error": error_msg} # Determine content type based on file extension _, ext = os.path.splitext(file_path) ext = ext[1:].lower() if ext else "jpeg" # Default to jpeg if no extension # Map extension to proper MIME type mime_type_map = { "jpg": "jpeg", "jpeg": "jpeg", "png": "png", "gif": "gif", "bmp": "bmp", "webp": "webp", "tiff": "tiff", "tif": "tiff" } content_type = mime_type_map.get(ext, "jpeg") # Default to jpeg if unknown extension logger.debug(f"Local image {file_path} has extension '{ext}', mapped to content type '{content_type}'") # For large files, read and process directly without loading entire file into memory file_size = os.path.getsize(file_path) if file_size > 1048576: logger.debug(f"Large local image detected: {file_path} ({file_size} bytes)") # Process the image directly using the same logic as for URL images return await process_large_local_image(file_path, content_type, ctx) # For smaller files, read the entire content with open(file_path, "rb") as f: file_data = f.read() logger.debug(f"Read local image from {file_path} with {len(file_data)} bytes") processed_image = await process_image_data(file_data, content_type, file_path, ctx) if processed_image is None: return {"path": file_path, "error": "Failed to process image"} return {"path": file_path, "image": processed_image} except Exception as e: error_msg = f"Error processing local image {file_path}: {str(e)}" ctx.error(error_msg) logger.exception(error_msg) return {"path": file_path, "error": error_msg} async def process_large_local_image(file_path: str, content_type: str, ctx: Context) -> Dict[str, Any]: """Process a large local image file directly without loading it entirely into memory.""" temp_path = None try: # Create a temporary file path for processing temp_path = os.path.join(TEMP_DIR, f"temp_local_{os.path.basename(file_path)}") # First pass: get dimensions and basic info with PILImage.open(file_path) as img: orig_width, orig_height = img.size orig_format = img.format orig_mode = img.mode logger.debug(f"Original large local image dimensions from {file_path}: {orig_width}x{orig_height}") logger.debug(f"Original image format: {orig_format}, mode: {orig_mode}") # Calculate optimal resize factor if image is very large max_dimension = max(orig_width, orig_height) initial_scale = 1.0 if max_dimension > 4000: initial_scale = 4000 / max_dimension logger.debug(f"Very large image detected, will start with scale factor: {initial_scale}") # Second pass: process the image with PILImage.open(file_path) as img: if img.mode in ('RGBA', 'P'): img = img.convert('RGB') # Apply initial scale if needed if initial_scale < 1.0: width = int(orig_width * initial_scale) height = int(orig_height * initial_scale) img = img.resize((width, height), PILImage.LANCZOS) else: width, height = img.size quality = 75 # Start with lower quality for large images scale_factor = 1.0 while True: # Save the processed image to a temporary BytesIO img_byte_arr = BytesIO() # Create a copy for this iteration to avoid accumulating transforms if scale_factor < 1.0: current_width = int(width * scale_factor) current_height = int(height * scale_factor) current_img = img.resize((current_width, current_height), PILImage.LANCZOS) else: current_img = img current_width, current_height = width, height current_img.save(img_byte_arr, format='JPEG', quality=quality, optimize=True) processed_data = img_byte_arr.getvalue() # Clean up the temporary image if we created one if scale_factor < 1.0 and hasattr(current_img, 'close'): current_img.close() # Target 800KB to leave buffer for any MCP overhead if len(processed_data) <= 819200: # 800KB logger.debug(f"Successfully compressed large local image {file_path} to {len(processed_data)} bytes (quality={quality}, dimensions={current_width}x{current_height})") return {"path": file_path, "image": Image(data=processed_data, format='jpeg')} # Try reducing quality first if quality > 30: quality -= 10 logger.debug(f"Reducing quality to {quality} for {file_path}") else: # Then try scaling down scale_factor *= 0.8 if current_width * scale_factor < 200 or current_height * scale_factor < 200: error_msg = f"Unable to compress large local image {file_path} to acceptable size while maintaining quality" ctx.error(error_msg) logger.error(error_msg) return {"path": file_path, "error": error_msg} logger.debug(f"Applying scale factor {scale_factor} to image {file_path}") quality = 85 # Reset quality when changing size except MemoryError as e: error_msg = f"Out of memory processing large local image {file_path}: {str(e)}" ctx.error(error_msg) logger.error(error_msg) return {"path": file_path, "error": error_msg} except Exception as e: error_msg = f"Error processing large local image {file_path}: {str(e)}" ctx.error(error_msg) logger.exception(error_msg) return {"path": file_path, "error": error_msg} finally: # Clean up temporary file if it exists if temp_path and os.path.exists(temp_path): os.remove(temp_path) async def fetch_single_image(url: str, client: httpx.AsyncClient, ctx: Context) -> Dict[str, Any]: """Fetches and processes a single image asynchronously.""" try: parsed = urlparse(url) if not all([parsed.scheme in ['http', 'https'], parsed.netloc]): error_msg = f"Invalid URL: {url}" ctx.error(error_msg) return {"url": url, "error": error_msg} response = await client.get(url) response.raise_for_status() content_type = response.headers.get('content-type', '') if not content_type.startswith('image/'): error_msg = f"Not an image (got {content_type})" ctx.error(error_msg) return {"url": url, "error": error_msg} logger.debug(f"Fetched image from {url} with {len(response.content)} bytes") logger.debug(f"Content-Type from server: {content_type}") # Extract the format from content-type format_type = content_type.split('/')[-1] logger.debug(f"Extracted format type: {format_type}") processed_image = await process_image_data(response.content, format_type, url, ctx) if processed_image is None: return {"url": url, "error": "Failed to process image"} return {"url": url, "image": processed_image} except httpx.HTTPError as e: error_msg = f"HTTP error: {str(e)}" ctx.error(error_msg) return {"url": url, "error": error_msg} except Exception as e: error_msg = f"Unexpected error: {str(e)}" ctx.error(error_msg) return {"url": url, "error": error_msg} def is_url(path_or_url: str) -> bool: """Determine if the given string is a URL or a local file path.""" parsed = urlparse(path_or_url) return bool(parsed.scheme and parsed.netloc) async def process_images_async(image_sources: List[str], ctx: Context) -> List[Dict[str, Any]]: """Process multiple images (URLs or local files) concurrently.""" if not image_sources: raise ValueError("No image sources provided") # Separate URLs from local file paths urls = [src for src in image_sources if is_url(src)] local_paths = [src for src in image_sources if not is_url(src)] results = [] # Process URLs if any if urls: logger.debug(f"Processing {len(urls)} URLs") async with httpx.AsyncClient() as client: url_tasks = [fetch_single_image(url, client, ctx) for url in urls] url_results = await asyncio.gather(*url_tasks) results.extend(url_results) # Process local files if any if local_paths: logger.debug(f"Processing {len(local_paths)} local files") local_tasks = [process_local_image(path, ctx) for path in local_paths] local_results = await asyncio.gather(*local_tasks) results.extend(local_results) # Ensure results are in the same order as input sources ordered_results = [] for src in image_sources: for result in results: if (src == result.get("url", None)) or (src == result.get("path", None)): ordered_results.append(result) break return ordered_results @mcp.tool() async def fetch_images(image_sources: List[str], ctx: Context) -> List[Image | None]: """ Fetch and process images from URLs or local file paths, returning them in a format suitable for LLMs. This tool accepts a list of image sources which can be either: 1. URLs pointing to web-hosted images (http:// or https://) 2. Local file paths pointing to images stored on the local filesystem (e.g., "C:/images/photo1.jpg") For a single image, provide a one-element list. The function will process images in parallel when multiple sources are provided. Images that exceed the size limit (1MB) will be automatically compressed while maintaining aspect ratio and reasonable quality. Args: image_sources: A list of image URLs or local file paths. For a single image, provide a one-element list. Returns: A list of Image objects or None values (if processing failed) in the same order as the input sources. """ try: start_time = asyncio.get_event_loop().time() # Validate input if not image_sources: ctx.error("No image sources provided") logger.error("fetch_images called with empty source list") return [] # Log the types of sources we're processing url_count = sum(1 for src in image_sources if is_url(src)) local_count = len(image_sources) - url_count logger.debug(f"Processing {len(image_sources)} image sources: {url_count} URLs and {local_count} local files") # Process all images results = await process_images_async(image_sources, ctx) # Extract just the Image objects or None values image_results = [] for result in results: if "image" in result: image_results.append(result["image"]) else: image_results.append(None) elapsed = asyncio.get_event_loop().time() - start_time success_count = sum(1 for r in image_results if r is not None) logger.debug( f"Processed {len(image_sources)} images in {elapsed:.2f} seconds. " f"Success: {success_count}, Failed: {len(image_sources) - success_count}" ) return image_results except Exception as e: logger.exception("Error in fetch_images") ctx.error(f"Failed to process images: {str(e)}") return [None] * len(image_sources) if __name__ == "__main__": mcp.run(transport='stdio')

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/IA-Programming/mcp-images'

If you have feedback or need assistance with the MCP directory API, please join our Discord server