Skip to main content
Glama

After Effects Motion Control Panel

complete_mcp_fix.py (36.4 kB)
#!/usr/bin/env python
"""
Complete MCP Fix

This script fixes all issues with the Motion Control Panel system:
1. Command file detection issue
2. Live Gemini processing display
3. Accurate command execution status
4. Proper Gemini command processing

The script exchanges JSON files with After Effects through TEMP_DIR
(command.json is written here, result.json is read here) and talks to the
web UI over a websocket served on WEB_UI_PORT + 1.
"""

import os
import re
import sys
import json
import time
import shutil
import logging
import asyncio
import functools
from collections import deque
from pathlib import Path
from typing import Dict, Any, Optional

import websockets
import requests

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("complete_mcp_fix.log")
    ]
)
logger = logging.getLogger(__name__)

# Configuration
TEMP_DIR = "C:/ae_temp"
COMMAND_FILE = os.path.join(TEMP_DIR, "command.json")
RESULT_FILE = os.path.join(TEMP_DIR, "result.json")
STATUS_FILE = os.path.join(TEMP_DIR, "status.json")
WEB_UI_PORT = 8000
WEBSOCKET_PATH = "/ws"

# Seconds to wait for the Gemini HTTP call before giving up.
GEMINI_TIMEOUT_SECONDS = 60
# Seconds after writing a command before a missing result is reported.
RESULT_TIMEOUT_SECONDS = 10
# Minimum seconds between two "Check for Command" reminder checks.
REMINDER_INTERVAL_SECONDS = 30

# Check for Gemini API key
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    try:
        # Try to load from .env file
        from dotenv import load_dotenv
        load_dotenv()
        GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
    except ImportError:
        pass

# Gemini configuration
GEMINI_ENABLED = bool(GEMINI_API_KEY)
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent"

# System prompt for Gemini
SYSTEM_PROMPT = """You are an AI assistant that helps users control After Effects through natural language commands.
Your task is to convert user requests into structured JSON commands that can be executed by After Effects.

The JSON object you return should have this structure:
{
    "action": "string",  // The action to perform (e.g., 'create_text_layer')
    "params": {  // Parameters for the action
        // Action-specific parameters
    },
    "confirmation_message": "string"  // Message to show user
}

Supported actions and their parameters:

1. import_media:
   - filePath: string (required) - Path to the media file
   - type: string (optional) - Type of media (image, video, audio)
   - position: array (optional) - [x, y] position
   - scale: number (optional) - Scale factor
   - rotation: number (optional) - Rotation in degrees
   - opacity: number (optional) - Opacity (0-100)

2. create_text_layer:
   - text: string (required) - Text content
   - name: string (optional) - Layer name
   - position: array (optional) - [x, y] position
   - fontSize: number (optional) - Font size
   - color: string (optional) - Color in hex format
   - fontFamily: string (optional) - Font family name

3. apply_text_animation:
   - layerName: string (required) - Name of text layer
   - animationType: string (required) - Type of animation
   - duration: number (optional) - Animation duration
   - easing: string (optional) - Easing function

4. create_animation:
   - type: string (required) - Type of animation (fade, scale, rotation, position, color)
   - startValue: any (required) - Starting value
   - endValue: any (required) - Ending value
   - duration: number (optional) - Duration in seconds

5. apply_color_animation:
   - layerName: string (required) - Name of layer
   - colors: array (required) - Array of color hex values
   - duration: number (required) - Animation duration in seconds
   - speed: number (optional) - Animation speed

6. apply_effect:
   - effectName: string (required) - Name of effect
   - effectParams: object (optional) - Effect properties

7. create_composition:
   - name: string (required) - Composition name
   - width: number (optional) - Width in pixels
   - height: number (optional) - Height in pixels
   - duration: number (optional) - Duration in seconds
   - frameRate: number (optional) - Frame rate

8. create_solid_layer:
   - name: string (required) - Layer name
   - width: number (optional) - Width in pixels
   - height: number (optional) - Height in pixels
   - color: array (optional) - [r, g, b] color values
   - position: array (optional) - [x, y] position

9. create_shape_layer:
   - shape: string (required) - Type of shape (rectangle, circle, star, polygon)
   - position: array (optional) - [x, y] position
   - size: array (optional) - [width, height]
   - color: string (optional) - Fill color in hex format

10. run_custom_code:
   - code: string (required) - Custom ExtendScript code to execute

IMPORTANT: Use EXACTLY these action names as After Effects will reject commands with incorrect action names.
Return ONLY the JSON object, no additional text or explanations."""

# Prompt used when asking Gemini for raw ExtendScript instead of a JSON command.
CUSTOM_CODE_PROMPT_TEMPLATE = """Generate ExtendScript code for Adobe After Effects to perform the following task:
{command_text}

The code will be executed in After Effects and should be valid ExtendScript.
Use the following variables that are available in the context:
- app: The After Effects application object
- comp: The active composition
- project: The current project

Return ONLY the ExtendScript code without any explanations or markdown formatting.
Do not include ```javascript or ``` markers.

Example of good response:
var myLayer = comp.layers.addText("Hello World");
myLayer.position.setValue([960, 540]);
"""

# Phrases that route a request to ExtendScript generation.
CUSTOM_COMMAND_KEYWORDS = (
    "custom",
    "create code",
    "write code",
    "generate script",
    "make a script",
    "custom command",
)

# Legacy / camelCase action names mapped to the names used in mcp_bridge.jsx.
ACTION_MAPPING = {
    # Map to the exact command names in mcp_bridge.jsx
    "addTextLayer": "create_text_layer",
    "importMedia": "import_media",
    "addTextAnimation": "apply_text_animation",
    "addLayerAnimation": "create_animation",
    "addColorAnimation": "apply_color_animation",
    "addEffect": "apply_effect",
    "createComposition": "create_composition",
    "addSolidLayer": "create_solid_layer",
    "addShapeLayer": "create_shape_layer",
    "executeScript": "run_custom_code",
    # Add missing mappings to ensure all commands work
    "createTextLayer": "create_text_layer",
    "createAnimation": "create_animation",
    "applyTextAnimation": "apply_text_animation",
    "applyColorAnimation": "apply_color_animation",
    "applyEffect": "apply_effect",
    "modifyLayer": "modify_layer",
    "deleteLayer": "delete_layer",
    "duplicateLayer": "duplicate_layer",
    "precomposeLayers": "precompose_layers",
    "animateEffect": "animate_effect",
    "selectLayerByName": "select_layer_by_name",
    "importImageLayer": "import_image_layer",
    "runCustomCode": "run_custom_code",
}


class MCPFixer:
    """Bridge between the web UI (websocket), Gemini and After Effects (files).

    Commands arrive over the websocket, are optionally translated by Gemini
    into a JSON command, and are written to COMMAND_FILE. A background
    monitor reads RESULT_FILE and reports back to the UI.
    """

    def __init__(self):
        self.ensure_temp_dir()
        # Most recently connected web UI socket; None when no UI is connected.
        self.websocket = None
        # Pending commands: str entries go through Gemini, others are written as-is.
        self.command_queue = deque()
        self.processing_command = False
        # time.time() of the last command file written; 0 means nothing pending.
        self.last_command_time = 0
        # time.time() of the last reminder check; 0 means never checked.
        self.last_check_time = 0

        # Check Gemini status
        if GEMINI_ENABLED:
            logger.info("Gemini API is enabled")
        else:
            logger.warning("Gemini API is not enabled. Set GEMINI_API_KEY environment variable for AI features.")

    def ensure_temp_dir(self) -> bool:
        """Create TEMP_DIR if needed; return True on success, False on failure."""
        try:
            Path(TEMP_DIR).mkdir(parents=True, exist_ok=True)
            logger.info(f"Temp directory ready: {TEMP_DIR}")
            return True
        except Exception as e:
            logger.error(f"Failed to create temp directory: {e}")
            return False

    def clean_temp_files(self) -> bool:
        """Remove leftover command, result and status files; return success."""
        try:
            for path, label in (
                (COMMAND_FILE, "command"),
                (RESULT_FILE, "result"),
                (STATUS_FILE, "status"),
            ):
                if os.path.exists(path):
                    os.remove(path)
                    logger.info(f"Removed existing {label} file")
            return True
        except Exception as e:
            logger.error(f"Error cleaning temp files: {e}")
            return False

    def create_command_file(self, command_data) -> bool:
        """Write ``command_data`` as JSON to COMMAND_FILE for After Effects.

        The data is written to a sibling ``.tmp`` file and then swapped into
        place so a reader never observes a partially written command.

        Returns:
            True when the file was written, False otherwise (the error is
            logged and recorded in the status file).
        """
        try:
            # Ensure directory exists
            self.ensure_temp_dir()

            # Create a temporary file first
            temp_file = f"{COMMAND_FILE}.tmp"
            with open(temp_file, 'w', encoding="utf-8") as f:
                json.dump(command_data, f, indent=2)

            # os.replace overwrites the destination atomically; shutil.move
            # falls back to copy + delete on Windows when the target exists.
            os.replace(temp_file, COMMAND_FILE)

            logger.info(f"Command file created at: {COMMAND_FILE}")
            logger.info(f"Command data: {json.dumps(command_data)[:100]}...")

            # Update status file
            self.update_status("command_created", f"Command file created at {time.time()}")

            # Update last command time
            self.last_command_time = time.time()
            return True
        except Exception as e:
            logger.error(f"Error creating command file: {e}")
            self.update_status("error", f"Error creating command file: {str(e)}")
            return False

    def _require_command_file(self, command_data) -> None:
        """Write the command file or raise so callers report the failure."""
        if not self.create_command_file(command_data):
            raise RuntimeError("Command file could not be written")

    def update_status(self, status, message) -> bool:
        """Write ``status``/``message`` plus a timestamp to STATUS_FILE."""
        try:
            status_data = {
                "status": status,
                "message": message,
                "timestamp": time.time()
            }
            with open(STATUS_FILE, 'w', encoding="utf-8") as f:
                json.dump(status_data, f, indent=2)
            logger.info(f"Status updated: {status} - {message}")
            return True
        except Exception as e:
            logger.error(f"Error updating status: {e}")
            return False

    async def send_websocket_message(self, message_type, message_data) -> bool:
        """Send a typed JSON message to the web UI; return whether it was sent."""
        connection = self.websocket
        if not connection:
            logger.warning("No websocket connection available")
            return False

        try:
            message = {
                "type": message_type,
                "data": message_data,
                "timestamp": time.time()
            }
            await connection.send(json.dumps(message))
            logger.info(f"Websocket message sent: {message_type}")
            return True
        except Exception as e:
            logger.error(f"Error sending websocket message: {e}")
            return False

    @staticmethod
    def _strip_code_fences(text: str) -> str:
        """Return ``text`` without a surrounding markdown code fence, trimmed.

        Only the first fenced section is kept. A language tag on the opening
        fence line (e.g. ``json`` or ``javascript``) is dropped.
        """
        if "```" not in text:
            return text.strip()

        inner = text.split("```", 2)[1]
        first_line, newline, rest = inner.partition("\n")
        if newline and re.fullmatch(r"[A-Za-z0-9_+#.-]*", first_line.strip()):
            # The opening fence line holds only a language tag (or nothing).
            inner = rest
        elif inner.startswith("json"):
            # Tag and payload share a line, e.g. ```json {...}```.
            inner = inner[len("json"):]
        return inner.strip()

    async def process_with_gemini(
        self,
        prompt: str,
        *,
        system_prompt: Optional[str] = SYSTEM_PROMPT,
        expect_json: bool = True,
    ) -> Dict[str, Any]:
        """Send ``prompt`` to the Gemini API and return the outcome.

        Args:
            prompt: User text to send.
            system_prompt: Instructions prepended to the prompt. Pass None to
                send ``prompt`` on its own.
            expect_json: When True the reply must be a JSON object describing
                a command. When False the reply text is returned untouched
                (apart from code-fence removal) in ``raw_response``.

        Returns:
            A dict with ``status`` of "success" or "error". On success it has
            ``data`` (the command dict, or None when ``expect_json`` is
            False) and ``raw_response``; on error it has ``message``.
        """
        if not GEMINI_ENABLED:
            return {
                "status": "error",
                "message": "Gemini API is not enabled. Set GEMINI_API_KEY environment variable."
            }

        try:
            # Update status
            await self.send_websocket_message("ai_status", {
                "status": "Thinking",
                "message": "Gemini is processing your command..."
            })

            # Prepare the request
            headers = {
                "Content-Type": "application/json",
                "x-goog-api-key": GEMINI_API_KEY
            }
            if system_prompt:
                request_text = f"{system_prompt}\n\nUser command: {prompt}"
            else:
                request_text = prompt
            data = {
                "contents": [
                    {
                        "role": "user",
                        "parts": [
                            {
                                "text": request_text
                            }
                        ]
                    }
                ],
                "generationConfig": {
                    "temperature": 0.2,
                    "topP": 0.8,
                    "topK": 40
                }
            }

            # requests is blocking: run it in a worker thread so the websocket
            # server and the file monitor keep running, and bound the wait.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                functools.partial(
                    requests.post,
                    GEMINI_API_URL,
                    headers=headers,
                    json=data,
                    timeout=GEMINI_TIMEOUT_SECONDS,
                ),
            )

            if response.status_code != 200:
                logger.error(f"Gemini API error: {response.status_code} - {response.text}")
                return {
                    "status": "error",
                    "message": f"Gemini API error: {response.status_code}"
                }

            # Parse the response
            response_data = response.json()
            if not response_data.get("candidates"):
                return {
                    "status": "error",
                    "message": "No response from Gemini API"
                }

            # Extract the text and drop markdown code fences if present
            text = response_data["candidates"][0]["content"]["parts"][0]["text"]
            text = self._strip_code_fences(text)

            if not expect_json:
                if not text:
                    return {
                        "status": "error",
                        "message": "Gemini returned an empty response"
                    }
                return {
                    "status": "success",
                    "data": None,
                    "raw_response": text
                }

            # Try to parse as JSON
            try:
                command_json = json.loads(text)
            except json.JSONDecodeError:
                logger.error(f"Failed to parse Gemini response as JSON: {text}")
                return {
                    "status": "error",
                    "message": "Failed to parse Gemini response as JSON",
                    "raw_response": text
                }

            if not isinstance(command_json, dict):
                logger.error(f"Gemini response is not a JSON object: {text}")
                return {
                    "status": "error",
                    "message": "Gemini response is not a JSON object",
                    "raw_response": text
                }

            # Validate the command, filling in anything Gemini left out
            command_json.setdefault("action", "unknown")
            command_json.setdefault("params", {})
            command_json.setdefault(
                "confirmation_message", f"Executing {command_json['action']}"
            )

            return {
                "status": "success",
                "data": command_json,
                "raw_response": text
            }
        except Exception as e:
            logger.error(f"Error processing with Gemini: {str(e)}")
            return {
                "status": "error",
                "message": f"Error processing with Gemini: {str(e)}"
            }

    async def map_command_action(self, command):
        """Rename a known legacy action in ``command`` (in place) and return it.

        Non-dict values and unknown actions are returned unchanged.
        """
        if not isinstance(command, dict):
            return command

        original_action = command.get("action")
        # isinstance guard: an unhashable action (e.g. a list) must not raise.
        if isinstance(original_action, str) and original_action in ACTION_MAPPING:
            command["action"] = ACTION_MAPPING[original_action]
            logger.info(f"Mapped action from '{original_action}' to '{command['action']}'")

        return command

    async def handle_custom_command(self, command_text):
        """Ask Gemini for ExtendScript and queue it as a run_custom_code command.

        When Gemini is disabled or fails, a text layer describing the request
        is written instead.

        Returns:
            A dict with ``status`` ("success" or "error") and ``message``.
        """
        try:
            # Update status
            self.update_status("processing_custom", f"Processing custom command: {command_text}")

            # Send processing status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Processing",
                "message": f"Creating custom command: {command_text}"
            })

            if not GEMINI_ENABLED:
                # Gemini not enabled, use simple text layer
                logger.warning("Gemini not enabled, using simple text layer")

                # Send status
                await self.send_websocket_message("ai_status", {
                    "status": "Limited",
                    "message": "Gemini AI not enabled, using simple command"
                })

                # Create a simple command
                command = {
                    "action": "create_text_layer",
                    "params": {
                        "text": f"Custom Command: {command_text}",
                        "fontSize": 50,
                        "color": "#00AAFF"
                    },
                    "confirmation_message": "Created text layer with command"
                }
                self._require_command_file(command)

                return {
                    "status": "success",
                    "message": "Simple command created"
                }

            # Create a custom prompt for ExtendScript generation
            custom_prompt = CUSTOM_CODE_PROMPT_TEMPLATE.format(command_text=command_text)

            # Send thinking status
            await self.send_websocket_message("ai_status", {
                "status": "Thinking",
                "message": "Creating custom ExtendScript code..."
            })

            # Raw code is wanted here, so skip the JSON-only system prompt
            # and the JSON parsing that would reject ExtendScript.
            result = await self.process_with_gemini(
                custom_prompt, system_prompt=None, expect_json=False
            )

            if result["status"] != "success":
                # Fallback to simple text layer if Gemini fails
                logger.warning(f"Gemini processing failed: {result['message']}")

                # Send error status
                await self.send_websocket_message("ai_status", {
                    "status": "Error",
                    "message": "Error creating custom command, using fallback"
                })

                # Create a fallback command
                command = {
                    "action": "create_text_layer",
                    "params": {
                        "text": f"Custom Command Failed: {command_text}",
                        "fontSize": 50,
                        "color": "#FF5555"
                    },
                    "confirmation_message": "Created fallback text layer"
                }
                self._require_command_file(command)

                return {
                    "status": "error",
                    "message": f"Failed to create custom command: {result['message']}"
                }

            # NOTE(review): the generated code is executed by After Effects
            # without inspection; it is model output, not trusted input.
            command = {
                "action": "run_custom_code",
                "params": {
                    "code": result["raw_response"]
                },
                "confirmation_message": f"Executing custom command: {command_text}"
            }

            # Send processing status
            await self.send_websocket_message("ai_status", {
                "status": "Creating",
                "message": "Custom code generated"
            })

            # Send log message
            await self.send_websocket_message("log", {
                "message": f"Generated ExtendScript code for: {command_text}",
                "level": "info"
            })

            # Create the command file
            self._require_command_file(command)

            # Send confirmation message
            await self.send_websocket_message("log", {
                "message": command.get("confirmation_message"),
                "level": "success"
            })

            return {
                "status": "success",
                "message": "Custom command generated"
            }
        except Exception as e:
            logger.error(f"Error handling custom command: {e}")

            # Update status
            self.update_status("error", f"Error handling custom command: {str(e)}")

            # Send error status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Error",
                "message": f"Error handling custom command: {str(e)}"
            })

            return {
                "status": "error",
                "message": f"Error handling custom command: {str(e)}"
            }

    async def process_gemini_command(self, command_text):
        """Turn a natural-language request into a command file.

        Requests containing a custom-command keyword, and requests Gemini
        cannot translate, are delegated to ``handle_custom_command``.

        Returns:
            A dict with ``status`` ("success" or "error") and ``message``.
        """
        try:
            # Update status
            self.update_status("processing_gemini", f"Processing Gemini command: {command_text}")

            # Send processing status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Processing",
                "message": f"Processing command: {command_text}"
            })

            # Check if this is a custom command request
            lowered = command_text.lower()
            if any(keyword in lowered for keyword in CUSTOM_COMMAND_KEYWORDS):
                # Handle as a custom command
                return await self.handle_custom_command(command_text)

            # Process with Gemini
            if GEMINI_ENABLED:
                # Send thinking status
                await self.send_websocket_message("ai_status", {
                    "status": "Thinking",
                    "message": "Analyzing your command..."
                })

                # Send log message
                await self.send_websocket_message("log", {
                    "message": "Analyzing command with Gemini AI...",
                    "level": "info"
                })

                # Process with Gemini
                result = await self.process_with_gemini(command_text)

                if result["status"] != "success":
                    # Try as custom command if normal processing fails
                    logger.warning("Standard command processing failed, trying as custom command")

                    # Send status
                    await self.send_websocket_message("ai_status", {
                        "status": "Adapting",
                        "message": "Trying as custom command..."
                    })

                    return await self.handle_custom_command(command_text)

                # Map the command action to ensure compatibility
                command = await self.map_command_action(result["data"])

                # Send processing status
                await self.send_websocket_message("ai_status", {
                    "status": "Creating",
                    "message": f"Creating command: {command['action']}"
                })

                # Send log message
                await self.send_websocket_message("log", {
                    "message": f"Gemini interpreted command as: {command['action']}",
                    "level": "info"
                })

                # Create the command file
                self._require_command_file(command)

                # Send confirmation message
                await self.send_websocket_message("log", {
                    "message": command.get("confirmation_message", f"Executing {command['action']}"),
                    "level": "success"
                })
            else:
                # Gemini not enabled, use simple text layer
                logger.warning("Gemini not enabled, using simple text layer")

                # Send status
                await self.send_websocket_message("ai_status", {
                    "status": "Limited",
                    "message": "Gemini AI not enabled, using simple command"
                })

                # Create a simple command
                command = {
                    "action": "create_text_layer",
                    "params": {
                        "text": f"Command: {command_text}",
                        "fontSize": 50,
                        "color": "#00AAFF"
                    },
                    "confirmation_message": "Created text layer with command"
                }

                # Create the command file
                self._require_command_file(command)

            # Update status
            self.update_status("gemini_processed", "Gemini command processed")

            # Send completion status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Ready",
                "message": "Command processed. Click 'Check for Command' in After Effects."
            })

            # Send log message to web UI
            await self.send_websocket_message("log", {
                "message": "Command processed. Click 'Check for Command' in After Effects.",
                "level": "info"
            })

            # Send reminder
            await self.send_websocket_message("reminder", {
                "message": "⚠️ Remember to click 'Check for Command' in After Effects MCP Bridge panel",
                "level": "warning"
            })

            return {
                "status": "success",
                "message": "Gemini command processed. Click 'Check for Command' in After Effects."
            }
        except Exception as e:
            logger.error(f"Error processing Gemini command: {e}")

            # Update status
            self.update_status("error", f"Error processing Gemini command: {str(e)}")

            # Send error status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Error",
                "message": f"Error processing command: {str(e)}"
            })

            return {
                "status": "error",
                "message": f"Error processing Gemini command: {str(e)}"
            }

    async def _consume_result_file(self) -> None:
        """Forward RESULT_FILE to the UI and delete it, if it exists."""
        if not os.path.exists(RESULT_FILE):
            return

        try:
            with open(RESULT_FILE, 'r') as f:
                result_data = json.load(f)

            logger.info(f"Result file found: {json.dumps(result_data)[:100]}...")

            # Update status
            self.update_status("command_processed", f"Command processed: {json.dumps(result_data)}")

            # Send result to web UI
            await self.send_websocket_message("result", result_data)

            # A result that is not a JSON object has no "message" field.
            if isinstance(result_data, dict):
                summary = result_data.get('message', 'No message')
            else:
                summary = 'No message'

            # Send log message to web UI
            await self.send_websocket_message("log", {
                "message": f"After Effects processed command: {summary}",
                "level": "success"
            })

            # Remove the result file
            os.remove(RESULT_FILE)

            # The pending command is answered; without this reset the
            # "no result file" warning would fire for a successful command.
            self.last_command_time = 0
        except Exception as e:
            logger.error(f"Error processing result file: {e}")

    async def _report_missing_result(self) -> None:
        """Warn once when a command file vanished without producing a result."""
        if self.last_command_time <= 0:
            return
        if time.time() - self.last_command_time <= RESULT_TIMEOUT_SECONDS:
            return
        if os.path.exists(COMMAND_FILE):
            return

        # Command file was processed but no result file was created
        logger.info("Command file was processed but no result file was created")

        # Update status
        self.update_status("command_processed_no_result", "Command file was processed but no result file was created")

        # Send message to web UI
        await self.send_websocket_message("log", {
            "message": "Command file was processed but no result file was created",
            "level": "warning"
        })

        # Reset last command time
        self.last_command_time = 0

    async def _remind_if_command_pending(self) -> None:
        """Periodically remind the user while a command file is waiting."""
        current_time = time.time()
        if self.last_check_time != 0 and current_time - self.last_check_time <= REMINDER_INTERVAL_SECONDS:
            return

        if os.path.exists(COMMAND_FILE):
            # Remind the user to check for commands
            logger.info("Reminder: Command file exists, click 'Check for Command' in After Effects")

            # Send reminder to web UI
            await self.send_websocket_message("reminder", {
                "message": "⚠️ Remember to click 'Check for Command' in After Effects MCP Bridge panel",
                "level": "warning"
            })

        # Update last check time
        self.last_check_time = current_time

    async def _process_next_queued_command(self) -> None:
        """Handle one queued command: text via Gemini, anything else directly."""
        if not self.command_queue or self.processing_command:
            return

        self.processing_command = True
        try:
            command = self.command_queue.popleft()
            if isinstance(command, str):
                # Process as Gemini command
                await self.process_gemini_command(command)
            else:
                # Process as direct command
                self.create_command_file(command)
        finally:
            # Always release the flag so one failure cannot stall the queue.
            self.processing_command = False

    async def monitor_result_file(self):
        """Poll once per second: results, timeouts, reminders, queue, file status.

        Runs forever; errors in one iteration are logged and do not stop the
        loop.
        """
        while True:
            try:
                await self._consume_result_file()
                await self._report_missing_result()
                await self._remind_if_command_pending()
                await self._process_next_queued_command()

                # Check file status and send to UI
                await self.check_file_status()
            except Exception as e:
                logger.error(f"Error in result monitor: {e}")

            # Wait before checking again
            await asyncio.sleep(1)

    async def check_file_status(self):
        """Tell the UI whether the command and result files currently exist."""
        try:
            if self.websocket:
                command_file_exists = os.path.exists(COMMAND_FILE)
                result_file_exists = os.path.exists(RESULT_FILE)

                await self.send_websocket_message("file_status", {
                    "command_file_exists": command_file_exists,
                    "result_file_exists": result_file_exists
                })
        except Exception as e:
            logger.error(f"Error checking file status: {e}")

    async def _enqueue_command(self, payload, ack_message) -> None:
        """Queue ``payload`` and acknowledge it; reject empty payloads."""
        if not payload:
            # An empty payload would be written as a useless "null" command.
            logger.warning("Ignoring empty command payload")
            await self.send_websocket_message("log", {
                "message": "Ignored empty command",
                "level": "error"
            })
            return

        # Add to command queue
        self.command_queue.append(payload)

        # Send acknowledgment to web UI
        await self.send_websocket_message("ack", {
            "message": ack_message,
            "queue_position": len(self.command_queue)
        })

    async def handle_websocket(self, websocket, path=None):
        """Serve one web UI connection until it closes.

        Args:
            websocket: The connection object supplied by ``websockets``.
            path: Request path. Optional because newer ``websockets``
                releases call the handler with the connection only.
        """
        if path is None:
            path = getattr(websocket, "path", None)
        logger.info(f"Websocket connection established: {path}")

        # Store the websocket connection
        self.websocket = websocket

        # Send initial status to web UI
        await self.send_websocket_message("status", {
            "status": "connected",
            "message": "Connected to MCP Fixer"
        })

        # Send Gemini status
        await self.send_websocket_message("ai_status", {
            "status": "Ready",
            "message": f"Gemini AI {'enabled' if GEMINI_ENABLED else 'disabled'}"
        })

        # Check file status
        await self.check_file_status()

        try:
            async for message in websocket:
                try:
                    data = json.loads(message)
                    logger.info(f"Received websocket message: {json.dumps(data)[:100]}...")

                    # Handle different message types
                    message_type = data.get("type")
                    if message_type == "command":
                        await self._enqueue_command(
                            data.get("data"),
                            "Command received and queued for processing",
                        )
                    elif message_type == "gemini_command":
                        await self._enqueue_command(
                            data.get("text"),
                            "Gemini command received and queued for processing",
                        )
                    elif message_type == "check_file_status":
                        # Check file status
                        await self.check_file_status()
                except json.JSONDecodeError:
                    logger.error(f"Invalid JSON received: {message}")
                except Exception as e:
                    logger.error(f"Error processing websocket message: {e}")
        except websockets.exceptions.ConnectionClosed:
            logger.info("Websocket connection closed")
        finally:
            # Only clear the slot if a newer connection has not replaced it.
            if self.websocket is websocket:
                self.websocket = None

    async def start_websocket_server(self):
        """Serve the web UI websocket on localhost until the server closes."""
        try:
            server = await websockets.serve(
                self.handle_websocket,
                "localhost",
                WEB_UI_PORT + 1  # Use a different port to avoid conflicts
            )

            logger.info(f"Websocket server started on port {WEB_UI_PORT + 1}")

            # Keep the server running
            await server.wait_closed()
        except Exception as e:
            logger.error(f"Error starting websocket server: {e}")

    async def run(self):
        """Reset temp files, start the monitor and server, write a test command."""
        logger.info("Starting MCP Fixer...")

        # Clean up any existing files
        self.clean_temp_files()

        # Create initial status
        self.update_status("starting", "MCP Fixer starting")

        # Start the result file monitor
        monitor_task = asyncio.create_task(self.monitor_result_file())

        # Start the websocket server
        websocket_task = asyncio.create_task(self.start_websocket_server())

        # Create a test command
        test_command = {
            "action": "create_text_layer",
            "params": {
                "text": "MCP Fixer Test",
                "fontSize": 72,
                "color": "#00FF00"
            }
        }
        self.create_command_file(test_command)

        logger.info("MCP Fixer started")
        logger.info("Test command file created")
        logger.info("Please click 'Check for Command' in After Effects")

        # Wait for tasks to complete (they won't)
        await asyncio.gather(monitor_task, websocket_task)


async def main():
    """Log a banner and run the MCP Fixer until interrupted."""
    logger.info("=" * 60)
    logger.info("Complete MCP Fix")
    logger.info("=" * 60)

    fixer = MCPFixer()
    await fixer.run()


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PankajBagariya/After-Efffect-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.