ae_controller.pyā¢23.8 kB
# ae_controller.py
# v3: FINAL Failsafe Version.
# Tries the fast TCP socket method first. If it fails, it automatically
# falls back to the reliable (but slower) file-based method.
import socket
import json
import os
import time
import asyncio
import logging
import sys
from pathlib import Path
# Set up logging to show in terminal
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
AE_HOST = "127.0.0.1"
AE_PORT = 8250
TCP_TIMEOUT = 5
TEMP_DIR = "C:/ae_temp"
COMMAND_FILE = os.path.join(TEMP_DIR, "command.json")
RESULT_FILE = os.path.join(TEMP_DIR, "result.json")
FILE_TIMEOUT = 30
# Create temp directory if it doesn't exist
Path(TEMP_DIR).mkdir(parents=True, exist_ok=True)
def ensure_temp_dir():
"""Ensure the temporary directory exists"""
if not os.path.exists(TEMP_DIR):
try:
os.makedirs(TEMP_DIR)
logger.info(f"Created temporary directory: {TEMP_DIR}")
except Exception as e:
logger.error(f"Failed to create temporary directory: {e}")
raise
async def send_command_to_ae(command_data: dict):
"""
Tries to send a command via TCP socket first for speed. If that fails,
it automatically falls back to the reliable file-based method.
"""
ensure_temp_dir()
logger.info("=" * 50)
logger.info(f"Processing command: {json.dumps(command_data, indent=2)}")
logger.info("=" * 50)
try:
# Try TCP connection first
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(TCP_TIMEOUT)
logger.info(f"Attempting fast connection to AE at {AE_HOST}:{AE_PORT}...")
s.connect((AE_HOST, AE_PORT))
logger.info("Connection successful! Sending command via TCP...")
s.sendall(json.dumps(command_data).encode('utf-8'))
response_data = s.recv(4096)
response_json = json.loads(response_data.decode('utf-8'))
logger.info(f"Received response via TCP: {json.dumps(response_json, indent=2)}")
return response_json
except (ConnectionRefusedError, socket.timeout) as e:
logger.warning(f"TCP connection failed: {e}")
logger.info("Falling back to reliable file-based method...")
try:
# Clean up any existing result file
if os.path.exists(RESULT_FILE):
os.remove(RESULT_FILE)
# Write command to file
with open(COMMAND_FILE, 'w') as f:
json.dump(command_data, f, indent=2)
logger.info(f"Command file written to {COMMAND_FILE}")
logger.info("Waiting for After Effects to process the command...")
logger.info("Please click the 'Check for Command' button in After Effects")
# Wait for After Effects to process the command
start_time = time.time()
while not os.path.exists(RESULT_FILE):
await asyncio.sleep(0.5)
if time.time() - start_time > FILE_TIMEOUT:
error_msg = "Timeout: After Effects did not respond. Please make sure the After Effects script is running and check the 'Check for Command' button."
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"details": "The After Effects script might not be running or the 'Check for Command' button needs to be clicked."
}
# Read the result
with open(RESULT_FILE, 'r') as f:
result_data = json.load(f)
logger.info(f"Received response via file: {json.dumps(result_data, indent=2)}")
if result_data.get('status') == 'success':
return result_data
else:
logger.error(f"After Effects returned error or did not process the command: {result_data}")
return {
'status': 'error',
'message': result_data.get('message', 'After Effects did not process the command.'),
'details': result_data
}
except Exception as file_e:
error_msg = f"An error occurred during file-based communication: {file_e}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"details": "Please check if After Effects is running and the script is properly installed."
}
finally:
# Clean up
try:
if os.path.exists(RESULT_FILE):
os.remove(RESULT_FILE)
if os.path.exists(COMMAND_FILE):
os.remove(COMMAND_FILE)
logger.info("Temporary files cleaned up")
except Exception as cleanup_e:
logger.warning(f"Error during cleanup: {cleanup_e}")
except Exception as e:
error_msg = f"Unexpected error while communicating with After Effects: {e}"
logger.error(error_msg)
return {
"status": "error",
"message": error_msg,
"details": "Please check if After Effects is running and properly configured."
}
async def create_text_layer(params):
"""Create a new text layer in After Effects"""
command = {
"action": "create_text_layer",
"params": params,
"confirmation_message": "Creating text layer..."
}
return await send_command_to_ae(command)
def modify_layer(params):
"""Modify an existing layer in After Effects"""
command = {
"action": "modify_layer",
"params": params,
"confirmation_message": "Modifying layer..."
}
return send_command_to_ae(command)
def apply_effect(params):
"""Apply an effect to a layer in After Effects"""
command = {
"action": "apply_effect",
"params": params,
"confirmation_message": "Applying effect..."
}
return send_command_to_ae(command)
def create_animation(params):
"""Create an animation in After Effects"""
command = {
"action": "create_animation",
"params": params,
"confirmation_message": "Creating animation..."
}
return send_command_to_ae(command)
async def inject_script(script_path):
"""Inject a script into After Effects."""
command = {
'action': 'inject_script',
'params': {'script_path': script_path},
'confirmation_message': 'Injecting script into After Effects'
}
try:
return await send_command_to_ae(command)
except Exception as e:
logging.error(f"Error injecting script: {str(e)}")
return {'status': 'error', 'message': str(e)}
async def import_media(params):
"""Import media files into After Effects."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
file_path = params.get("filePath")
media_type = params.get("type")
options = params.get("options", {})
if not file_path:
return {"status": "error", "message": "No file path specified"}
if not media_type:
return {"status": "error", "message": "No media type specified"}
# Validate file exists
if not os.path.exists(file_path):
return {"status": "error", "message": f"File not found: {file_path}"}
# Create command
command = {
"action": "import_media",
"params": {
"filePath": file_path,
"type": media_type,
"options": options
},
"confirmation_message": f"Importing {media_type} from {file_path}"
}
# Send command to After Effects
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error importing media: {str(e)}")
return {"status": "error", "message": str(e)}
async def run_custom_code(params):
"""Run custom code in After Effects."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
# Handle direct code execution
if "code" in params:
custom_code = params.get("code")
if not custom_code:
return {"status": "error", "message": "Empty code provided"}
# Create command for direct code execution
command = {
"action": "run_custom_code",
"params": {
"code": custom_code
},
"confirmation_message": "Running custom code..."
}
# Send to After Effects
return await send_command_to_ae(command)
# Handle action-based requests
action = params.get("action")
if not action:
return {"status": "error", "message": "No action or code specified"}
# Handle different actions
if action == "import_media":
return await import_media(params.get("params", {}))
elif action == "create_text_layer":
return await create_text_layer(params.get("params", {}))
elif action == "create_solid_layer":
return await create_solid_layer(params.get("params", {}))
elif action == "create_shape_layer":
return await create_shape_layer(params.get("params", {}))
elif action == "add_text_animation":
return await add_text_animation(params.get("params", {}))
elif action == "add_layer_animation":
return await add_layer_animation(params.get("params", {}))
elif action == "add_color_animation":
return await add_color_animation(params.get("params", {}))
elif action == "add_effect":
return await add_effect(params.get("params", {}))
elif action == "create_composition":
return await create_composition(params.get("params", {}))
else:
return {"status": "error", "message": f"Unknown action: {action}"}
except Exception as e:
logging.error(f"Error in run_custom_code: {str(e)}")
return {"status": "error", "message": str(e)}
async def handle_gemini_code(params):
"""Handle Gemini AI-based code generation from a natural language prompt with self-healing."""
from code_generator import AECodeGenerator
import os
import time
try:
# Extract the prompt from params
prompt = params.get('prompt', '')
if not prompt:
return {
'status': 'error',
'message': 'No prompt provided for Gemini code generation'
}
# Get the API key
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
return {
'status': 'error',
'message': 'GEMINI_API_KEY not found in environment variables'
}
# Initialize the code generator
generator = AECodeGenerator(api_key)
# Get error context if this is a retry
error_context = params.get('error_context', None)
previous_attempts = params.get('previous_attempts', [])
# Generate the code with advanced capabilities
logging.info(f"Generating After Effects code for prompt: {prompt}")
result = generator.generate_custom_code_from_prompt(
prompt=prompt,
error_context=error_context,
previous_attempts=previous_attempts
)
# Check if generation was successful
if result['status'] == 'success':
code = result.get('code', '')
# Create the command with the correct action
command = {
'action': 'run_custom_code',
'params': {
'code': code
},
'confirmation_message': f"Running Gemini-generated code for: {prompt}"
}
# Send the command to After Effects
logging.info(f"Executing Gemini-generated code for: {prompt}")
ae_result = await send_command_to_ae(command)
# Check if execution was successful
if ae_result.get('status') == 'success':
# Add the generated code to the response
ae_result['generated_code'] = code
ae_result['prompt'] = prompt
return ae_result
else:
# If execution failed, try to fix the code
error_message = ae_result.get('message', 'Unknown error')
logging.warning(f"Execution failed: {error_message}. Attempting to fix...")
# Try to fix the code
fixed_result = generator.fix_code_from_error(
code=code,
error_message=error_message,
task_description=prompt
)
if fixed_result['status'] == 'success':
fixed_code = fixed_result.get('code', '')
# Try executing the fixed code
fixed_command = {
'action': 'run_custom_code',
'params': {
'code': fixed_code
},
'confirmation_message': f"Running fixed code for: {prompt}"
}
logging.info("Executing fixed code...")
fixed_ae_result = await send_command_to_ae(fixed_command)
# Add the generated and fixed code to the response
fixed_ae_result['generated_code'] = fixed_code
fixed_ae_result['original_code'] = code
fixed_ae_result['prompt'] = prompt
fixed_ae_result['was_fixed'] = True
return fixed_ae_result
else:
# If fixing failed, return the original error
ae_result['generated_code'] = code
ae_result['fix_attempted'] = True
return ae_result
else:
return {
'status': 'error',
'message': f"Failed to generate code: {result.get('message', 'Unknown error')}",
'details': result
}
except Exception as e:
logging.error(f"Error in handle_gemini_code: {str(e)}")
return {
'status': 'error',
'message': f"Error generating or executing code: {str(e)}"
}
def create_image_layer(params):
"""Create a new image layer in After Effects"""
command = {
"action": "create_image_layer",
"params": {
"file_path": params.get('file_path'),
"position": params.get('position', [0, 0]),
"scale": params.get('scale', [100, 100]),
"duration": params.get('duration', 10)
},
"confirmation_message": "Creating image layer..."
}
return send_command_to_ae(command)
async def create_solid_layer(params):
"""Create a solid layer in After Effects."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
name = params.get("name", "Solid Layer")
width = params.get("width", 1920)
height = params.get("height", 1080)
color = params.get("color", [1, 1, 1])
position = params.get("position", [960, 540])
command = {
"action": "create_solid_layer",
"params": {
"name": name,
"width": width,
"height": height,
"color": color,
"position": position
},
"confirmation_message": f"Creating solid layer '{name}'"
}
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error creating solid layer: {str(e)}")
return {"status": "error", "message": str(e)}
async def create_shape_layer(params):
"""Create a shape layer in After Effects."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
shape_type = params.get("shapeType", "rectangle")
position = params.get("position", [960, 540])
properties = params.get("properties", {})
command = {
"action": "create_shape_layer",
"params": {
"shapeType": shape_type,
"position": position,
"properties": properties
},
"confirmation_message": f"Creating {shape_type} shape layer"
}
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error creating shape layer: {str(e)}")
return {"status": "error", "message": str(e)}
async def create_composition(params):
"""Create a new composition in After Effects."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
name = params.get("name", "New Composition")
width = params.get("width", 1920)
height = params.get("height", 1080)
duration = params.get("duration", 10)
frame_rate = params.get("frameRate", 30)
command = {
"action": "create_composition",
"params": {
"name": name,
"width": width,
"height": height,
"duration": duration,
"frameRate": frame_rate
},
"confirmation_message": f"Creating composition '{name}'"
}
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error creating composition: {str(e)}")
return {"status": "error", "message": str(e)}
async def add_text_animation(params):
"""Add animation to text layer."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
layer_name = params.get("layerName")
animation_type = params.get("animationType")
duration = params.get("duration", 2)
easing = params.get("easing", "ease")
if not layer_name:
return {"status": "error", "message": "No layer name specified"}
if not animation_type:
return {"status": "error", "message": "No animation type specified"}
command = {
"action": "add_text_animation",
"params": {
"layerName": layer_name,
"animationType": animation_type,
"duration": duration,
"easing": easing
},
"confirmation_message": f"Adding {animation_type} animation to text layer '{layer_name}'"
}
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error adding text animation: {str(e)}")
return {"status": "error", "message": str(e)}
async def add_layer_animation(params):
"""Add animation to any layer."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
layer_name = params.get("layerName")
property_name = params.get("property")
keyframes = params.get("keyframes", [])
easing = params.get("easing", "ease")
if not layer_name:
return {"status": "error", "message": "No layer name specified"}
if not property_name:
return {"status": "error", "message": "No property specified"}
if not keyframes:
return {"status": "error", "message": "No keyframes specified"}
command = {
"action": "add_layer_animation",
"params": {
"layerName": layer_name,
"property": property_name,
"keyframes": keyframes,
"easing": easing
},
"confirmation_message": f"Adding animation to layer '{layer_name}'"
}
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error adding layer animation: {str(e)}")
return {"status": "error", "message": str(e)}
async def add_color_animation(params):
"""Add color animation to a layer."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
layer_name = params.get("layerName")
property_name = params.get("property")
keyframes = params.get("keyframes", [])
easing = params.get("easing", "ease")
if not layer_name:
return {"status": "error", "message": "No layer name specified"}
if not property_name:
return {"status": "error", "message": "No property specified"}
if not keyframes:
return {"status": "error", "message": "No keyframes specified"}
command = {
"action": "add_color_animation",
"params": {
"layerName": layer_name,
"property": property_name,
"keyframes": keyframes,
"easing": easing
},
"confirmation_message": f"Adding color animation to layer '{layer_name}'"
}
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error adding color animation: {str(e)}")
return {"status": "error", "message": str(e)}
async def add_effect(params):
"""Add effect to a layer."""
try:
if not params or not isinstance(params, dict):
return {"status": "error", "message": "Invalid parameters"}
layer_name = params.get("layerName")
effect_name = params.get("effectName")
properties = params.get("properties", {})
if not layer_name:
return {"status": "error", "message": "No layer name specified"}
if not effect_name:
return {"status": "error", "message": "No effect name specified"}
command = {
"action": "add_effect",
"params": {
"layerName": layer_name,
"effectName": effect_name,
"properties": properties
},
"confirmation_message": f"Adding {effect_name} effect to layer '{layer_name}'"
}
result = await send_command_to_ae(command)
return result
except Exception as e:
logging.error(f"Error adding effect: {str(e)}")
return {"status": "error", "message": str(e)}