Skip to main content
Glama

After Effects Motion Control Panel

ae_controller.py•23.8 kB
# ae_controller.py
# v3: FINAL Failsafe Version.
# Tries the fast TCP socket method first. If it fails, it automatically
# falls back to the reliable (but slower) file-based method.
import asyncio
import functools
import json
import logging
import os
import socket
import sys
import time
from pathlib import Path

# Set up logging to show in terminal
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

AE_HOST = "127.0.0.1"
AE_PORT = 8250
TCP_TIMEOUT = 5  # seconds allowed for each TCP step (connect / send / read)

TEMP_DIR = "C:/ae_temp"
COMMAND_FILE = os.path.join(TEMP_DIR, "command.json")
RESULT_FILE = os.path.join(TEMP_DIR, "result.json")
FILE_TIMEOUT = 30  # seconds to wait for the result file in file-based mode

# Create temp directory if it doesn't exist
Path(TEMP_DIR).mkdir(parents=True, exist_ok=True)


def _error(message):
    """Build the standard ``{"status": "error", "message": ...}`` response."""
    return {"status": "error", "message": message}


def _invalid_params(params):
    """Return True when *params* is empty or is not a dict."""
    return not params or not isinstance(params, dict)


def _report_errors(log_prefix):
    """Decorate an async action so unexpected exceptions become error responses.

    Any ``Exception`` raised by the wrapped coroutine is logged as
    ``"<log_prefix>: <exception>"`` and converted into an error response dict
    instead of propagating to the caller.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{log_prefix}: {str(e)}")
                return _error(str(e))
        return wrapper
    return decorator


def ensure_temp_dir():
    """Ensure the temporary directory exists.

    Raises:
        Exception: re-raises whatever ``os.makedirs`` raised, after logging it.
    """
    if os.path.exists(TEMP_DIR):
        return
    try:
        # exist_ok avoids a failure if the directory appears between the
        # existence check above and this call.
        os.makedirs(TEMP_DIR, exist_ok=True)
        logger.info(f"Created temporary directory: {TEMP_DIR}")
    except Exception as e:
        logger.error(f"Failed to create temporary directory: {e}")
        raise


async def _send_via_tcp(command_data):
    """Send *command_data* over TCP and return the decoded JSON response.

    Uses asyncio streams so the event loop is not blocked while waiting.
    The response is accumulated chunk by chunk until it parses as JSON, so
    replies larger than one read (or split across segments) are handled.

    Raises:
        OSError, asyncio.TimeoutError: the connection could not be made or a
            step exceeded ``TCP_TIMEOUT``.
        ValueError: the peer closed the connection before sending valid JSON.
    """
    logger.info(f"Attempting fast connection to AE at {AE_HOST}:{AE_PORT}...")
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection(AE_HOST, AE_PORT), timeout=TCP_TIMEOUT
    )
    try:
        logger.info("Connection successful! Sending command via TCP...")
        writer.write(json.dumps(command_data).encode('utf-8'))
        await asyncio.wait_for(writer.drain(), timeout=TCP_TIMEOUT)

        received = bytearray()
        while True:
            chunk = await asyncio.wait_for(reader.read(4096), timeout=TCP_TIMEOUT)
            if not chunk:
                # Peer closed the connection: parse what we have (raises
                # ValueError if it is empty or incomplete).
                response_json = json.loads(received.decode('utf-8'))
                break
            received += chunk
            try:
                response_json = json.loads(received.decode('utf-8'))
                break
            except ValueError:
                # Incomplete JSON (or a split multi-byte character): keep reading.
                continue

        logger.info(f"Received response via TCP: {json.dumps(response_json, indent=2)}")
        return response_json
    finally:
        writer.close()


async def _send_via_file(command_data):
    """Exchange a command/result pair with After Effects through temp files.

    Writes ``COMMAND_FILE``, then polls for ``RESULT_FILE`` for up to
    ``FILE_TIMEOUT`` seconds. Always returns a response dict; errors are
    reported in the dict rather than raised. Both files are removed afterwards.
    """
    try:
        # Clean up any existing result file
        if os.path.exists(RESULT_FILE):
            os.remove(RESULT_FILE)

        # Write command to file
        with open(COMMAND_FILE, 'w') as f:
            json.dump(command_data, f, indent=2)

        logger.info(f"Command file written to {COMMAND_FILE}")
        logger.info("Waiting for After Effects to process the command...")
        logger.info("Please click the 'Check for Command' button in After Effects")

        # Wait for After Effects to process the command
        start_time = time.monotonic()
        read_error = None
        while True:
            if os.path.exists(RESULT_FILE):
                try:
                    with open(RESULT_FILE, 'r') as f:
                        result_data = json.load(f)
                    break
                except (ValueError, OSError) as e:
                    # The file may still be mid-write; retry until the deadline.
                    read_error = e

            if time.monotonic() - start_time > FILE_TIMEOUT:
                if read_error is not None:
                    # A result file showed up but never became readable.
                    raise read_error
                error_msg = "Timeout: After Effects did not respond. Please make sure the After Effects script is running and check the 'Check for Command' button."
                logger.error(error_msg)
                return {
                    "status": "error",
                    "message": error_msg,
                    "details": "The After Effects script might not be running or the 'Check for Command' button needs to be clicked."
                }
            await asyncio.sleep(0.5)

        logger.info(f"Received response via file: {json.dumps(result_data, indent=2)}")
        if result_data.get('status') == 'success':
            return result_data

        logger.error(f"After Effects returned error or did not process the command: {result_data}")
        return {
            'status': 'error',
            'message': result_data.get('message', 'After Effects did not process the command.'),
            'details': result_data
        }
    except Exception as file_e:
        error_msg = f"An error occurred during file-based communication: {file_e}"
        logger.error(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "details": "Please check if After Effects is running and the script is properly installed."
        }
    finally:
        # Clean up
        try:
            if os.path.exists(RESULT_FILE):
                os.remove(RESULT_FILE)
            if os.path.exists(COMMAND_FILE):
                os.remove(COMMAND_FILE)
            logger.info("Temporary files cleaned up")
        except Exception as cleanup_e:
            logger.warning(f"Error during cleanup: {cleanup_e}")


async def send_command_to_ae(command_data: dict):
    """
    Tries to send a command via TCP socket first for speed.
    If that fails, it automatically falls back to the reliable file-based method.

    Args:
        command_data: JSON-serializable command dict.

    Returns:
        The response dict from After Effects, or a dict with
        ``"status": "error"`` describing what went wrong.

    Raises:
        Exception: if the temporary directory cannot be created, or
            *command_data* cannot be serialized for logging.
    """
    ensure_temp_dir()

    logger.info("=" * 50)
    logger.info(f"Processing command: {json.dumps(command_data, indent=2)}")
    logger.info("=" * 50)

    try:
        # Try TCP connection first
        try:
            return await _send_via_tcp(command_data)
        except (OSError, asyncio.TimeoutError) as e:
            logger.warning(f"TCP connection failed: {e}")
            logger.info("Falling back to reliable file-based method...")
        return await _send_via_file(command_data)
    except Exception as e:
        error_msg = f"Unexpected error while communicating with After Effects: {e}"
        logger.error(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "details": "Please check if After Effects is running and properly configured."
        }


async def create_text_layer(params):
    """Create a new text layer in After Effects"""
    command = {
        "action": "create_text_layer",
        "params": params,
        "confirmation_message": "Creating text layer..."
    }
    return await send_command_to_ae(command)


async def modify_layer(params):
    """Modify an existing layer in After Effects"""
    command = {
        "action": "modify_layer",
        "params": params,
        "confirmation_message": "Modifying layer..."
    }
    return await send_command_to_ae(command)


async def apply_effect(params):
    """Apply an effect to a layer in After Effects"""
    command = {
        "action": "apply_effect",
        "params": params,
        "confirmation_message": "Applying effect..."
    }
    return await send_command_to_ae(command)


async def create_animation(params):
    """Create an animation in After Effects"""
    command = {
        "action": "create_animation",
        "params": params,
        "confirmation_message": "Creating animation..."
    }
    return await send_command_to_ae(command)


@_report_errors("Error injecting script")
async def inject_script(script_path):
    """Inject a script into After Effects."""
    command = {
        'action': 'inject_script',
        'params': {'script_path': script_path},
        'confirmation_message': 'Injecting script into After Effects'
    }
    return await send_command_to_ae(command)


@_report_errors("Error importing media")
async def import_media(params):
    """Import media files into After Effects.

    Expects ``filePath`` and ``type`` keys (plus optional ``options``) and
    checks that the file exists before sending the command.
    """
    if _invalid_params(params):
        return _error("Invalid parameters")

    file_path = params.get("filePath")
    media_type = params.get("type")
    options = params.get("options", {})

    if not file_path:
        return _error("No file path specified")
    if not media_type:
        return _error("No media type specified")

    # Validate file exists
    if not os.path.exists(file_path):
        return _error(f"File not found: {file_path}")

    command = {
        "action": "import_media",
        "params": {
            "filePath": file_path,
            "type": media_type,
            "options": options
        },
        "confirmation_message": f"Importing {media_type} from {file_path}"
    }
    return await send_command_to_ae(command)


@_report_errors("Error in run_custom_code")
async def run_custom_code(params):
    """Run custom code in After Effects.

    If *params* has a ``code`` key the code is sent for direct execution;
    otherwise ``params["action"]`` selects one of the action helpers, which is
    called with ``params.get("params", {})``.
    """
    if _invalid_params(params):
        return _error("Invalid parameters")

    # Handle direct code execution
    if "code" in params:
        custom_code = params.get("code")
        if not custom_code:
            return _error("Empty code provided")
        command = {
            "action": "run_custom_code",
            "params": {
                "code": custom_code
            },
            "confirmation_message": "Running custom code..."
        }
        return await send_command_to_ae(command)

    # Handle action-based requests
    action = params.get("action")
    if not action:
        return _error("No action or code specified")

    # Built at call time because several handlers are defined further down.
    handlers = {
        "import_media": import_media,
        "create_text_layer": create_text_layer,
        "create_solid_layer": create_solid_layer,
        "create_shape_layer": create_shape_layer,
        "add_text_animation": add_text_animation,
        "add_layer_animation": add_layer_animation,
        "add_color_animation": add_color_animation,
        "add_effect": add_effect,
        "create_composition": create_composition,
    }
    handler = handlers.get(action) if isinstance(action, str) else None
    if handler is None:
        return _error(f"Unknown action: {action}")
    return await handler(params.get("params", {}))


async def handle_gemini_code(params):
    """Handle Gemini AI-based code generation from a natural language prompt with self-healing.

    Generates code for ``params["prompt"]``, runs it in After Effects and, if
    execution fails, asks the generator for one fixed version and runs that.
    Optional ``error_context`` / ``previous_attempts`` keys are forwarded to
    the generator. Requires the ``GEMINI_API_KEY`` environment variable.
    """
    # Project-local module, imported lazily as in the original design.
    from code_generator import AECodeGenerator

    try:
        # Extract the prompt from params
        prompt = params.get('prompt', '')
        if not prompt:
            return {
                'status': 'error',
                'message': 'No prompt provided for Gemini code generation'
            }

        # Get the API key
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            return {
                'status': 'error',
                'message': 'GEMINI_API_KEY not found in environment variables'
            }

        # Initialize the code generator
        generator = AECodeGenerator(api_key)

        # Get error context if this is a retry
        error_context = params.get('error_context', None)
        previous_attempts = params.get('previous_attempts', [])

        # Generate the code with advanced capabilities
        logger.info(f"Generating After Effects code for prompt: {prompt}")
        result = generator.generate_custom_code_from_prompt(
            prompt=prompt,
            error_context=error_context,
            previous_attempts=previous_attempts
        )

        # Check if generation was successful
        if result['status'] != 'success':
            return {
                'status': 'error',
                'message': f"Failed to generate code: {result.get('message', 'Unknown error')}",
                'details': result
            }

        code = result.get('code', '')

        # Create the command with the correct action
        command = {
            'action': 'run_custom_code',
            'params': {
                'code': code
            },
            'confirmation_message': f"Running Gemini-generated code for: {prompt}"
        }

        # Send the command to After Effects
        logger.info(f"Executing Gemini-generated code for: {prompt}")
        ae_result = await send_command_to_ae(command)

        # Check if execution was successful
        if ae_result.get('status') == 'success':
            # Add the generated code to the response
            ae_result['generated_code'] = code
            ae_result['prompt'] = prompt
            return ae_result

        # If execution failed, try to fix the code
        error_message = ae_result.get('message', 'Unknown error')
        logger.warning(f"Execution failed: {error_message}. Attempting to fix...")

        fixed_result = generator.fix_code_from_error(
            code=code,
            error_message=error_message,
            task_description=prompt
        )

        if fixed_result['status'] != 'success':
            # If fixing failed, return the original error
            ae_result['generated_code'] = code
            ae_result['fix_attempted'] = True
            return ae_result

        fixed_code = fixed_result.get('code', '')

        # Try executing the fixed code
        fixed_command = {
            'action': 'run_custom_code',
            'params': {
                'code': fixed_code
            },
            'confirmation_message': f"Running fixed code for: {prompt}"
        }

        logger.info("Executing fixed code...")
        fixed_ae_result = await send_command_to_ae(fixed_command)

        # Add the generated and fixed code to the response
        fixed_ae_result['generated_code'] = fixed_code
        fixed_ae_result['original_code'] = code
        fixed_ae_result['prompt'] = prompt
        fixed_ae_result['was_fixed'] = True
        return fixed_ae_result
    except Exception as e:
        logger.error(f"Error in handle_gemini_code: {str(e)}")
        return {
            'status': 'error',
            'message': f"Error generating or executing code: {str(e)}"
        }


async def create_image_layer(params):
    """Create a new image layer in After Effects"""
    command = {
        "action": "create_image_layer",
        "params": {
            "file_path": params.get('file_path'),
            "position": params.get('position', [0, 0]),
            "scale": params.get('scale', [100, 100]),
            "duration": params.get('duration', 10)
        },
        "confirmation_message": "Creating image layer..."
    }
    return await send_command_to_ae(command)


@_report_errors("Error creating solid layer")
async def create_solid_layer(params):
    """Create a solid layer in After Effects."""
    if _invalid_params(params):
        return _error("Invalid parameters")

    name = params.get("name", "Solid Layer")
    command = {
        "action": "create_solid_layer",
        "params": {
            "name": name,
            "width": params.get("width", 1920),
            "height": params.get("height", 1080),
            "color": params.get("color", [1, 1, 1]),
            "position": params.get("position", [960, 540])
        },
        "confirmation_message": f"Creating solid layer '{name}'"
    }
    return await send_command_to_ae(command)


@_report_errors("Error creating shape layer")
async def create_shape_layer(params):
    """Create a shape layer in After Effects."""
    if _invalid_params(params):
        return _error("Invalid parameters")

    shape_type = params.get("shapeType", "rectangle")
    command = {
        "action": "create_shape_layer",
        "params": {
            "shapeType": shape_type,
            "position": params.get("position", [960, 540]),
            "properties": params.get("properties", {})
        },
        "confirmation_message": f"Creating {shape_type} shape layer"
    }
    return await send_command_to_ae(command)


@_report_errors("Error creating composition")
async def create_composition(params):
    """Create a new composition in After Effects."""
    if _invalid_params(params):
        return _error("Invalid parameters")

    name = params.get("name", "New Composition")
    command = {
        "action": "create_composition",
        "params": {
            "name": name,
            "width": params.get("width", 1920),
            "height": params.get("height", 1080),
            "duration": params.get("duration", 10),
            "frameRate": params.get("frameRate", 30)
        },
        "confirmation_message": f"Creating composition '{name}'"
    }
    return await send_command_to_ae(command)


@_report_errors("Error adding text animation")
async def add_text_animation(params):
    """Add animation to text layer."""
    if _invalid_params(params):
        return _error("Invalid parameters")

    layer_name = params.get("layerName")
    animation_type = params.get("animationType")
    duration = params.get("duration", 2)
    easing = params.get("easing", "ease")

    if not layer_name:
        return _error("No layer name specified")
    if not animation_type:
        return _error("No animation type specified")

    command = {
        "action": "add_text_animation",
        "params": {
            "layerName": layer_name,
            "animationType": animation_type,
            "duration": duration,
            "easing": easing
        },
        "confirmation_message": f"Adding {animation_type} animation to text layer '{layer_name}'"
    }
    return await send_command_to_ae(command)


@_report_errors("Error adding layer animation")
async def add_layer_animation(params):
    """Add animation to any layer."""
    if _invalid_params(params):
        return _error("Invalid parameters")

    layer_name = params.get("layerName")
    property_name = params.get("property")
    keyframes = params.get("keyframes", [])
    easing = params.get("easing", "ease")

    if not layer_name:
        return _error("No layer name specified")
    if not property_name:
        return _error("No property specified")
    if not keyframes:
        return _error("No keyframes specified")

    command = {
        "action": "add_layer_animation",
        "params": {
            "layerName": layer_name,
            "property": property_name,
            "keyframes": keyframes,
            "easing": easing
        },
        "confirmation_message": f"Adding animation to layer '{layer_name}'"
    }
    return await send_command_to_ae(command)


@_report_errors("Error adding color animation")
async def add_color_animation(params):
    """Add color animation to a layer."""
    if _invalid_params(params):
        return _error("Invalid parameters")

    layer_name = params.get("layerName")
    property_name = params.get("property")
    keyframes = params.get("keyframes", [])
    easing = params.get("easing", "ease")

    if not layer_name:
        return _error("No layer name specified")
    if not property_name:
        return _error("No property specified")
    if not keyframes:
        return _error("No keyframes specified")

    command = {
        "action": "add_color_animation",
        "params": {
            "layerName": layer_name,
            "property": property_name,
            "keyframes": keyframes,
            "easing": easing
        },
        "confirmation_message": f"Adding color animation to layer '{layer_name}'"
    }
    return await send_command_to_ae(command)


@_report_errors("Error adding effect")
async def add_effect(params):
    """Add effect to a layer."""
    if _invalid_params(params):
        return _error("Invalid parameters")

    layer_name = params.get("layerName")
    effect_name = params.get("effectName")
    properties = params.get("properties", {})

    if not layer_name:
        return _error("No layer name specified")
    if not effect_name:
        return _error("No effect name specified")

    command = {
        "action": "add_effect",
        "params": {
            "layerName": layer_name,
            "effectName": effect_name,
            "properties": properties
        },
        "confirmation_message": f"Adding {effect_name} effect to layer '{layer_name}'"
    }
    return await send_command_to_ae(command)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PankajBagariya/After-Efffect-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.