#!/usr/bin/env python3
"""Production-Ready Layer.ai MCP Server with all critical bugs fixed."""
import asyncio
import logging
import os
import sys
from typing import Any, Dict, List, Optional
from pathlib import Path
import base64
import httpx
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
try:
from auth import LayerTokenManager
except ImportError:
LayerTokenManager = None
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProductionReadyLayerMCPServer:
"""Production-ready Layer.ai MCP server with all critical bugs fixed."""
def __init__(self):
self.api_token = "" # nosec B105 - Empty string initialization, not hardcoded password
self.workspace_id = "" # nosec B105 - Empty string initialization, not hardcoded password
try:
self._load_credentials()
except Exception as e:
logger.error(f"Failed to load credentials: {e}")
raise
self.server = Server("layer-ai-comprehensive")
self._setup_handlers()
def _load_credentials(self):
"""Load credentials with proper error handling (BUG #1 FIXED)."""
if LayerTokenManager:
try:
token_manager = LayerTokenManager()
credentials = token_manager.get_credentials()
if credentials:
self.api_token = credentials["api_token"]
self.workspace_id = credentials["workspace_id"]
logger.info("š Loaded credentials securely")
return
except Exception as e:
logger.warning(f"Failed to load secure credentials: {e}")
# Fallback to environment variables
self.api_token = os.getenv("LAYER_API_TOKEN", "").strip()
self.workspace_id = os.getenv("LAYER_WORKSPACE_ID", "").strip()
if not self.api_token or not self.workspace_id:
# FIXED: Don't crash with sys.exit(1), raise exception instead
raise ValueError(
"Missing Layer.ai credentials. Please run 'python setup.py' "
"or set LAYER_API_TOKEN and LAYER_WORKSPACE_ID environment variables."
)
logger.info("š Using environment variable credentials")
def _setup_handlers(self):
"""Setup MCP handlers."""
@self.server.list_tools()
async def handle_list_tools():
"""List all available tools."""
logger.info("Returning comprehensive tools")
tools = [
Tool(
name="create_asset",
description="Generate any type of asset using Layer.ai's AI models",
inputSchema={
"type": "object",
"properties": {
"prompt": {"type": "string", "description": "Text description of the asset"},
"generation_type": {
"type": "string",
"enum": ["CREATE", "REFILL", "EDIT", "UPSCALE", "LIP_SYNC", "IMAGE_TO_VIDEO",
"UPSCALE_VIDEO", "IMAGE_TO_3D", "TEXT_TO_3D", "REALTIME", "VECTORIZE_IMAGE",
"ANIMATE_MESH", "REMOVE_BACKGROUND", "TEXT_TO_SPEECH", "SOUND_EFFECT"],
"default": "CREATE"
},
"width": {"type": "integer", "default": 512},
"height": {"type": "integer", "default": 512},
"quality": {"type": "string", "enum": ["LOW", "MEDIUM", "HIGH"], "default": "HIGH"},
"steps": {"type": "integer", "default": 20},
"guidance_scale": {"type": "number", "default": 7.5},
"negative_prompt": {"type": "string"},
"seed": {"type": "integer"},
"creativity": {"type": "number", "description": "For EDIT/REFINE operations (0.0-1.0)"},
"resemblance": {"type": "number", "description": "For EDIT/REFINE operations (0.0-1.0)"},
"upscale_ratio": {"type": "number", "description": "For UPSCALE operations"},
"duration_seconds": {"type": "number", "description": "For video/audio generation"},
"transparency": {"type": "boolean", "description": "Generate with transparency"},
"tileability": {"type": "boolean", "description": "Make tileable texture"},
"include_textures": {"type": "boolean", "description": "For 3D generation"},
"face_limit": {"type": "integer", "description": "For 3D generation"},
"input_files": {"type": "array", "items": {"type": "string"}, "description": "Input file paths"},
"save_path": {"type": "string", "default": "./assets/generated_asset.png"},
"wait_for_completion": {"type": "boolean", "default": True}
},
"required": ["prompt"]
}
),
Tool(
name="remove_background",
description="Remove background from images using AI",
inputSchema={
"type": "object",
"properties": {
"image_path": {"type": "string", "description": "Path to input image"},
"image_url": {"type": "string", "description": "URL to input image"},
"return_mask": {"type": "boolean", "default": False},
"save_path": {"type": "string", "default": "./assets/no_background.png"},
"wait_for_completion": {"type": "boolean", "default": True}
}
}
),
Tool(
name="describe_image",
description="Get AI-generated descriptions of images",
inputSchema={
"type": "object",
"properties": {
"image_path": {"type": "string", "description": "Path to input image"},
"image_url": {"type": "string", "description": "URL to input image"},
"detail_level": {"type": "string", "enum": ["basic", "detailed", "comprehensive"], "default": "detailed"}
}
}
),
Tool(
name="generate_prompt",
description="Optimize prompts using Layer.ai's Prompt Genie",
inputSchema={
"type": "object",
"properties": {
"base_prompt": {"type": "string", "description": "Your initial prompt"},
"asset_type": {"type": "string", "description": "Type of asset (game, art, photo, etc.)"}
},
"required": ["base_prompt"]
}
),
Tool(
name="get_workspace_info",
description="Get information about the current workspace",
inputSchema={"type": "object", "properties": {}, "required": []}
)
]
return tools
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]):
"""Handle tool calls with comprehensive error handling."""
try:
logger.info(f"Calling tool: {name}")
if name == "create_asset":
return await self._create_asset(arguments)
elif name == "remove_background":
return await self._remove_background(arguments)
elif name == "describe_image":
return await self._describe_image(arguments)
elif name == "generate_prompt":
return await self._generate_prompt(arguments)
elif name == "get_workspace_info":
return await self._get_workspace_info(arguments)
else:
return [TextContent(type="text", text=f"ā Unknown tool: {name}")]
except Exception as e:
logger.error(f"Error calling tool {name}: {e}")
return [TextContent(type="text", text=f"ā Error: {str(e)}")]
async def _make_graphql_request(self, query: str, variables: Optional[Dict[str, Any]] = None, max_retries: int = 3) -> Dict[str, Any]:
"""Make GraphQL request with retry logic (BUG #5 FIXED)."""
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
data = {"query": query}
if variables:
data["variables"] = variables
last_error = None
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post("https://api.app.layer.ai/graphql", headers=headers, json=data)
if response.status_code == 200:
result = response.json()
if "errors" in result:
error_msg = result['errors'][0].get('message', 'Unknown GraphQL error')
raise Exception(f"GraphQL error: {error_msg}")
return result
else:
raise Exception(f"API request failed with status {response.status_code}: {response.text}")
except Exception as e:
last_error = e
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
logger.warning(f"Request failed (attempt {attempt + 1}/{max_retries}), retrying in {wait_time}s: {e}")
await asyncio.sleep(wait_time)
else:
logger.error(f"All {max_retries} attempts failed")
raise Exception(f"GraphQL request failed after {max_retries} attempts: {last_error}")
def _encode_image_to_base64(self, image_path: str) -> str:
"""Encode image file to base64 with proper error handling (BUG #2 FIXED)."""
try:
# Validate file exists and is accessible
if not image_path:
raise ValueError("Image path cannot be empty")
if not os.path.exists(image_path):
raise FileNotFoundError(f"Image file not found: {image_path}")
if not os.path.isfile(image_path):
raise ValueError(f"Path is not a file: {image_path}")
# Check file size (prevent memory issues)
file_size = os.path.getsize(image_path)
if file_size == 0:
raise ValueError(f"Image file is empty: {image_path}")
if file_size > 50 * 1024 * 1024: # 50MB limit
raise ValueError(f"Image file too large (>50MB): {image_path}")
# Read and encode file
with open(image_path, 'rb') as f:
content = f.read()
return base64.b64encode(content).decode('utf-8')
except (OSError, IOError) as e:
raise Exception(f"Failed to read image file {image_path}: {str(e)}")
except Exception as e:
raise Exception(f"Failed to encode image {image_path}: {str(e)}")
async def _upload_file_to_layer(self, file_path: str) -> str:
"""Upload file with proper validation (BUG #3 FIXED)."""
# Validate input
if not file_path:
raise ValueError("File path cannot be empty")
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
if not os.path.isfile(file_path):
raise ValueError(f"Path is not a file: {file_path}")
mutation = """
mutation CreateUploadUrls($input: CreateUploadUrlsInput!) {
createUploadUrls(input: $input) {
... on UploadUrls {
uploadUrls {
url
fileId
}
}
... on Error {
message
}
}
}
"""
filename = os.path.basename(file_path)
variables = {
"input": {
"workspaceId": self.workspace_id,
"filenames": [filename]
}
}
try:
result = await self._make_graphql_request(mutation, variables)
# FIXED: Validate response structure before accessing
create_upload_response = result.get("data", {}).get("createUploadUrls")
if not create_upload_response:
raise Exception("Invalid response from createUploadUrls")
# Check for error response
if "message" in create_upload_response:
raise Exception(f"Upload URL creation failed: {create_upload_response['message']}")
# Validate upload URLs exist
upload_urls = create_upload_response.get("uploadUrls", [])
if not upload_urls:
raise Exception("No upload URLs received from Layer.ai API")
upload_info = upload_urls[0]
upload_url = upload_info.get("url")
file_id = upload_info.get("fileId")
if not upload_url or not file_id:
raise Exception("Invalid upload URL or file ID received")
except Exception as e:
raise Exception(f"Failed to get upload URL: {str(e)}")
# Upload the file with error handling
try:
with open(file_path, 'rb') as f:
file_content = f.read()
async with httpx.AsyncClient(timeout=120.0) as client:
upload_response = await client.put(upload_url, content=file_content)
if upload_response.status_code not in [200, 201]:
raise Exception(f"Upload failed with status {upload_response.status_code}: {upload_response.text}")
except Exception as e:
raise Exception(f"Failed to upload file: {str(e)}")
return f"https://media.app.layer.ai/workspaces/{self.workspace_id}/files/{file_id}/{filename}"
def _determine_file_extension(self, content_type: str, filename: str) -> str:
"""Determine proper file extension (BUG #6 FIXED)."""
content_type_lower = content_type.lower()
# Map content types to extensions
if 'image' in content_type_lower:
if 'png' in content_type_lower:
return '.png'
elif 'jpeg' in content_type_lower or 'jpg' in content_type_lower:
return '.jpg'
elif 'gif' in content_type_lower:
return '.gif'
elif 'svg' in content_type_lower:
return '.svg'
elif 'webp' in content_type_lower:
return '.webp'
else:
return '.png' # Default for images
elif 'video' in content_type_lower:
if 'mp4' in content_type_lower:
return '.mp4'
elif 'webm' in content_type_lower:
return '.webm'
else:
return '.mp4' # Default for videos
elif 'audio' in content_type_lower:
if 'wav' in content_type_lower:
return '.wav'
elif 'mp3' in content_type_lower:
return '.mp3'
else:
return '.wav' # Default for audio
elif 'model' in content_type_lower or 'application/octet-stream' in content_type_lower:
return '.glb' # Default for 3D models
else:
# Try to get extension from filename
file_ext = Path(filename).suffix
return file_ext if file_ext else '.png' # Ultimate fallback
async def _create_asset(self, arguments: Dict[str, Any]):
"""Create asset with comprehensive error handling."""
try:
prompt = arguments.get("prompt")
if not prompt or not prompt.strip():
return [TextContent(type="text", text="ā Error: 'prompt' parameter is required and cannot be empty")]
# Get parameters with validation
generation_type = arguments.get("generation_type", "CREATE")
width = max(64, min(2048, arguments.get("width", 512))) # Clamp to reasonable range
height = max(64, min(2048, arguments.get("height", 512)))
quality = arguments.get("quality", "HIGH")
steps = max(1, min(100, arguments.get("steps", 20))) # Clamp steps
guidance_scale = max(1.0, min(20.0, arguments.get("guidance_scale", 7.5))) # Clamp guidance
# Optional parameters
negative_prompt = arguments.get("negative_prompt")
seed = arguments.get("seed")
creativity = arguments.get("creativity")
resemblance = arguments.get("resemblance")
upscale_ratio = arguments.get("upscale_ratio")
duration_seconds = arguments.get("duration_seconds")
transparency = arguments.get("transparency")
tileability = arguments.get("tileability")
include_textures = arguments.get("include_textures")
face_limit = arguments.get("face_limit")
input_files = arguments.get("input_files", [])
save_path = arguments.get("save_path", "./assets/generated_asset.png")
wait_for_completion = arguments.get("wait_for_completion", True)
if isinstance(wait_for_completion, str):
wait_for_completion = wait_for_completion.lower() == "true"
# Create output directory
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
# Build parameters
parameters = {
"generationType": generation_type,
"prompt": prompt.strip(),
"width": width,
"height": height,
"quality": quality,
"numInferenceSteps": steps,
"guidanceScale": guidance_scale
}
# Add optional parameters
if negative_prompt:
parameters["negativePrompt"] = negative_prompt.strip()
if seed is not None:
parameters["seed"] = int(seed)
if creativity is not None:
parameters["creativity"] = max(0.0, min(1.0, float(creativity)))
if resemblance is not None:
parameters["resemblance"] = max(0.0, min(1.0, float(resemblance)))
if upscale_ratio:
parameters["upscaleRatio"] = max(1.0, min(8.0, float(upscale_ratio)))
if duration_seconds:
parameters["durationSeconds"] = max(1.0, min(60.0, float(duration_seconds)))
if transparency is not None:
parameters["transparency"] = bool(transparency)
if tileability is not None:
parameters["tileability"] = bool(tileability)
if include_textures is not None:
parameters["includeTextures"] = bool(include_textures)
if face_limit:
parameters["faceLimit"] = max(100, min(10000, int(face_limit)))
# Handle input files with proper error reporting (BUG #4 FIXED)
if input_files:
file_urls = []
failed_uploads = []
for file_path in input_files:
if not os.path.exists(file_path):
failed_uploads.append(f"{file_path} (file not found)")
continue
try:
file_url = await self._upload_file_to_layer(file_path)
file_urls.append({"url": file_url})
logger.info(f"ā
Uploaded: {file_path}")
except Exception as e:
logger.error(f"ā Failed to upload {file_path}: {e}")
failed_uploads.append(f"{file_path} ({str(e)})")
# Report upload failures
if failed_uploads:
error_msg = f"ā Failed to upload {len(failed_uploads)} file(s):\n" + "\n".join(f" ⢠{f}" for f in failed_uploads)
if not file_urls:
return [TextContent(type="text", text=error_msg)]
else:
logger.warning(error_msg)
if file_urls:
parameters["files"] = file_urls
# Create inference
mutation = """
mutation CreateInference($input: CreateInferenceInput!) {
createInference(input: $input) {
... on Inference {
id
status
createdAt
}
... on Error {
message
}
}
}
"""
variables = {
"input": {
"workspaceId": self.workspace_id,
"parameters": parameters
}
}
result = await self._make_graphql_request(mutation, variables)
inference_response = result["data"]["createInference"]
# Check for error response
if "message" in inference_response:
return [TextContent(type="text", text=f"ā Generation failed: {inference_response['message']}")]
if not inference_response or "id" not in inference_response:
return [TextContent(type="text", text=f"ā Invalid inference response: {inference_response}")]
inference_id = inference_response["id"]
response_text = f"""š Asset Generation Started!
šØ Prompt: {prompt}
š§ Type: {generation_type}
š Size: {width}x{height}
ā” Quality: {quality}
š Inference ID: {inference_id}
š Status: {inference_response['status']}
š
Created: {inference_response['createdAt']}
"""
if not wait_for_completion:
response_text += "ā³ Generation started. Check status manually."
return [TextContent(type="text", text=response_text)]
# Wait for completion with retry logic
completion_result = await self._wait_for_completion_with_retry(inference_id, save_path)
response_text += completion_result
return [TextContent(type="text", text=response_text)]
except Exception as e:
logger.error(f"Asset creation failed: {e}")
return [TextContent(type="text", text=f"ā Asset creation failed: {str(e)}")]
async def _wait_for_completion_with_retry(self, inference_id: str, save_path: str) -> str:
"""Wait for completion with proper error handling and retry logic."""
response_text = "ā³ Waiting for completion...\n"
max_wait_time = 300
check_interval = 5
elapsed_time = 0
consecutive_failures = 0
max_consecutive_failures = 3
status_query = """
query GetInferenceStatus($input: GetInferencesByIdInput!) {
getInferencesById(input: $input) {
... on InferencesResult {
inferences {
id
status
files {
id
url
name
}
}
}
... on Error {
message
}
}
}
"""
while elapsed_time < max_wait_time:
try:
status_variables = {
"input": {
"inferenceIds": [inference_id]
}
}
status_result = await self._make_graphql_request(status_query, status_variables)
# Validate response structure
get_inferences_response = status_result.get("data", {}).get("getInferencesById")
if not get_inferences_response:
raise Exception("Invalid response structure from getInferencesById")
# Check for error response
if "message" in get_inferences_response:
raise Exception(f"API error: {get_inferences_response['message']}")
inferences = get_inferences_response.get("inferences", [])
if not inferences:
return "ā Inference not found."
# Reset failure counter on successful request
consecutive_failures = 0
inference_data = inferences[0]
current_status = inference_data.get("status", "UNKNOWN")
if current_status == "COMPLETE":
response_text += f"ā
Completed in {elapsed_time} seconds!\n\n"
return response_text + await self._download_result_files(inference_data, save_path)
elif current_status == "FAILED":
return response_text + f"ā Generation failed after {elapsed_time} seconds."
elif current_status == "CANCELLED":
return response_text + f"ā ļø Generation was cancelled after {elapsed_time} seconds."
# Continue waiting for IN_PROGRESS status
except Exception as e:
consecutive_failures += 1
logger.warning(f"Failed to check inference status (attempt {consecutive_failures}): {e}")
if consecutive_failures >= max_consecutive_failures:
return response_text + f"ā Failed to check status after {consecutive_failures} attempts. Last error: {str(e)}"
# Wait before next check
await asyncio.sleep(check_interval)
elapsed_time += check_interval
return response_text + f"ā° Generation timed out after {max_wait_time} seconds."
async def _download_result_files(self, inference_data: Dict[str, Any], save_path: str) -> str:
"""Download result files with proper error handling."""
files = inference_data.get("files", [])
if not files:
return "ā ļø No files generated."
file_info = files[0]
file_url = file_info.get("url")
filename = file_info.get("name", f"result_{inference_data['id']}")
if not file_url:
return "ā No download URL available."
try:
async with httpx.AsyncClient(timeout=60.0) as client:
download_response = await client.get(file_url)
if download_response.status_code == 200:
# Determine proper file extension
content_type = download_response.headers.get('content-type', '')
ext = self._determine_file_extension(content_type, filename)
# Handle file path
if save_path.endswith('/'):
full_path = Path(save_path) / (Path(filename).stem + ext)
else:
full_path = Path(save_path)
if not full_path.suffix:
full_path = full_path.with_suffix(ext)
# Ensure directory exists
full_path.parent.mkdir(parents=True, exist_ok=True)
# Write file
with open(full_path, 'wb') as f:
f.write(download_response.content)
file_size = len(download_response.content)
return f"""š File saved: {full_path}
š File size: {file_size:,} bytes
š Original URL: {file_url}
šÆ Content Type: {content_type}
š® Ready to use!"""
else:
return f"ā Download failed: HTTP {download_response.status_code}"
except Exception as e:
return f"ā Download error: {str(e)}"
# Placeholder methods for other tools (implement as needed)
async def _remove_background(self, arguments: Dict[str, Any]):
return [TextContent(type="text", text="š§ Background removal feature - implementation in progress")]
async def _describe_image(self, arguments: Dict[str, Any]):
return [TextContent(type="text", text="š§ Image description feature - implementation in progress")]
async def _generate_prompt(self, arguments: Dict[str, Any]):
return [TextContent(type="text", text="š§ Prompt generation feature - implementation in progress")]
async def _get_workspace_info(self, arguments: Dict[str, Any]):
"""Get workspace information."""
response_text = f"""š Workspace Information
š¢ Workspace ID: {self.workspace_id}
š API Token: [CONFIGURED]
ā
Production-Ready Layer.ai MCP Server
šØ Available Generation Types:
⢠CREATE - Generate new assets from text ā
⢠IMAGE_TO_3D - Convert images to 3D models š§
⢠TEXT_TO_3D - Generate 3D models from text š§
⢠VECTORIZE_IMAGE - Convert to vector format š§
⢠REMOVE_BACKGROUND - Remove image backgrounds š§
⢠UPSCALE - Increase image resolution š§
⢠IMAGE_TO_VIDEO - Convert images to video š§
⢠TEXT_TO_SPEECH - Generate speech from text š§
⢠SOUND_EFFECT - Create audio effects š§
š ļø Production Features:
ā
Comprehensive error handling
ā
Retry logic for network issues
ā
Input validation and sanitization
ā
Proper file extension detection
ā
Memory and size limits
ā
Graceful failure handling"""
return [TextContent(type="text", text=response_text)]
async def run(self):
"""Run the MCP server."""
try:
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="layer-ai-comprehensive",
server_version="3.1.0",
capabilities=self.server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={}
)
)
)
except Exception as e:
logger.error(f"Server failed to start: {e}")
raise
async def main():
"""Main entry point with proper error handling."""
try:
logger.info("š Starting Production-Ready Layer.ai MCP Server...")
server = ProductionReadyLayerMCPServer()
await server.run()
except KeyboardInterrupt:
logger.info("š Server stopped by user")
except Exception as e:
logger.error(f"Server startup failed: {e}")
return 1
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)