Skip to main content
Glama

After Effects Motion Control Panel

main.py•16.3 kB
# main.py import json import logging import os import asyncio import socket from pathlib import Path from quart import Quart, request, jsonify, send_from_directory, websocket from quart_cors import cors from gemini_processor import process_command_with_gemini, fix_command_with_gemini, GeminiProcessor from ae_controller import ( send_command_to_ae, run_custom_code, create_text_layer, create_solid_layer, create_shape_layer, handle_gemini_code ) from code_generator import AECodeGenerator from gemini_processor import gemini_processor from file_command_handler import ensure_temp_dir, clean_temp_files, send_command # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # Ensure required directories exist BASE_DIR = Path(__file__).parent.parent FRONTEND_DIR = BASE_DIR / 'frontend' STATIC_DIR = FRONTEND_DIR / 'static' STATIC_DIR.mkdir(parents=True, exist_ok=True) app = Quart(__name__, static_folder=str(FRONTEND_DIR)) app = cors(app) # Initialize Gemini processor and code generator GEMINI_API_KEY = os.getenv('GEMINI_API_KEY') if not GEMINI_API_KEY: logging.warning("GEMINI_API_KEY environment variable is not set. AI features will be disabled.") HAS_GEMINI = False else: HAS_GEMINI = True generator = AECodeGenerator(GEMINI_API_KEY) # Store connected clients connected_clients = set() # Fix for TCP server binding issue def fix_tcp_server_binding(): """ Check if the TCP server can be started and fix any issues. Returns True if TCP server is available, False if we should use file-based communication. """ try: # Try to create and bind a TCP socket AE_HOST = "127.0.0.1" AE_PORT = 8250 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: # Set socket options to reuse address s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Try binding to the port s.bind((AE_HOST, AE_PORT)) s.listen(1) # If we get here, binding was successful logging.info(f"TCP server can be bound to {AE_HOST}:{AE_PORT}") # Close the socket properly s.close() return True except Exception as e: logging.error(f"TCP server binding failed: {e}") logging.info("Falling back to file-based communication") # Ensure temp directory exists for file-based communication ensure_temp_dir() clean_temp_files() return False # Check TCP server at startup USE_TCP = fix_tcp_server_binding() async def handle_command_with_retry(data, max_retries=3): """Handle command with automatic retry and self-healing.""" retries = 0 last_error = None # If data is a string, assume it's a natural language command if isinstance(data, str): data = {"command": data} # Send AI status update for client in connected_clients: try: await client.send_json({ "type": "ai_status", "status": "Processing" }) await client.send_json({ "type": "log", "message": f"Processing command: {json.dumps(data)[:100]}...", "level": "info" }) except Exception: pass while retries <= max_retries: try: # Handle different command types if isinstance(data, dict): # Handle Gemini AI command if data.get("action") == "handle_gemini_code" or "prompt" in data: for client in connected_clients: try: await client.send_json({ "type": "ai_status", "status": "Gemini Processing" }) except Exception: pass if HAS_GEMINI: result = await handle_gemini_code(data) else: result = { "status": "error", "message": "Gemini API key not configured. AI features are disabled." } # Handle natural language command elif "command" in data and isinstance(data["command"], str): for client in connected_clients: try: await client.send_json({ "type": "ai_status", "status": "Analyzing Command" }) except Exception: pass if HAS_GEMINI: result = await gemini_processor.process_command_with_gemini(data["command"]) else: result = { "status": "error", "message": "Gemini API key not configured. AI features are disabled." } # Handle direct action command elif "action" in data: # Use file-based command if TCP is not available if not USE_TCP: result = send_command(data) else: result = await send_command_to_ae(data) else: result = { "status": "error", "message": "Invalid command format. Must include 'action' or 'command' field." } # If data is not a dict, try to process as natural language else: if HAS_GEMINI: result = await gemini_processor.process_command_with_gemini(str(data)) else: result = { "status": "error", "message": "Gemini API key not configured. AI features are disabled." } # Log the result logging.info(f"Command result: {json.dumps(result)}") # Send AI status update for client in connected_clients: try: await client.send_json({ "type": "ai_status", "status": "Ready" }) await client.send_json({ "type": "log", "message": f"Command completed with status: {result.get('status', 'unknown')}", "level": "info" if result.get("status") == "success" else "error" }) except Exception: pass return result except Exception as e: last_error = str(e) retries += 1 logging.error(f"Error processing command (attempt {retries}/{max_retries+1}): {last_error}") # Send error log for client in connected_clients: try: await client.send_json({ "type": "log", "message": f"Error (attempt {retries}/{max_retries+1}): {last_error}", "level": "error" }) await client.send_json({ "type": "ai_status", "status": "Retrying" if retries <= max_retries else "Error" }) except Exception: pass if retries <= max_retries: # Try to fix the command with Gemini try: for client in connected_clients: try: await client.send_json({ "type": "log", "message": "Attempting to fix command with Gemini AI...", "level": "info" }) except Exception: pass fixed_data = await fix_command_with_gemini(last_error, {"original_command": data}) if fixed_data and isinstance(fixed_data, dict): data = fixed_data for client in connected_clients: try: await client.send_json({ "type": "log", "message": f"Command fixed by Gemini AI: {json.dumps(fixed_data)[:100]}...", "level": "success" }) except Exception: pass # Wait a moment before retrying await asyncio.sleep(1) except Exception as fix_error: logging.error(f"Error fixing command: {str(fix_error)}") for client in connected_clients: try: await client.send_json({ "type": "log", "message": f"Failed to fix command: {str(fix_error)}", "level": "error" }) except Exception: pass # If we've exhausted retries, return error return { "status": "error", "message": f"Failed after {max_retries+1} attempts. Last error: {last_error}" } @app.websocket('/ws') async def ws(): """Handle WebSocket connections.""" try: await websocket.accept() logging.info("WebSocket connection accepted") # Add to connected clients connected_clients.add(websocket) # Send initial connection status await websocket.send_json({ "type": "status", "message": "Connected to server", "status": "connected" }) # Send welcome log await websocket.send_json({ "type": "log", "message": "Connected to After Effects MCP AI Server", "level": "success" }) while True: try: # Wait for messages data = await websocket.receive() # Process the message try: message = json.loads(data) logging.info(f"Received WebSocket message: {message}") # Handle different message types if message.get("type") == "command": # Send processing status await websocket.send_json({ "type": "ai_status", "status": "Processing" }) # Process the command result = await handle_command_with_retry(message.get("data", {})) # Send the result back await websocket.send_json({ "type": "result", "status": result.get("status", "error"), "message": result.get("message", "Unknown result"), "generated_code": result.get("generated_code", "") }) elif message.get("type") == "gemini_command": # Send processing status await websocket.send_json({ "type": "ai_status", "status": "Gemini Processing" }) # Log the Gemini request await websocket.send_json({ "type": "log", "message": "Processing with Gemini AI...", "level": "info" }) # Process with Gemini result = await handle_gemini_code(message.get("data", {})) # Send the result back await websocket.send_json({ "type": "result", "status": result.get("status", "error"), "message": result.get("message", "Unknown result"), "generated_code": result.get("generated_code", "") }) else: # Unknown message type await websocket.send_json({ "type": "error", "message": "Unknown message type" }) except json.JSONDecodeError: logging.error(f"Invalid JSON received: {data}") await websocket.send_json({ "type": "error", "message": "Invalid message format" }) except Exception as e: if "disconnect" in str(e).lower(): logging.info("WebSocket disconnected") break else: logging.error(f"WebSocket error: {str(e)}") try: await websocket.send_json({ "type": "error", "message": f"Server error: {str(e)}" }) except: pass except Exception as e: logging.error(f"WebSocket error: {str(e)}") try: await websocket.send_json({ "type": "error", "message": f"Server error: {str(e)}" }) except: pass finally: # Remove from connected clients if websocket in connected_clients: connected_clients.remove(websocket) try: await websocket.close(code=1000) # Normal closure except Exception as e: logging.error(f"Error closing WebSocket: {str(e)}") @app.route('/') async def serve_index(): return await send_from_directory(str(FRONTEND_DIR), 'index.html') @app.route('/static/<path:path>') async def serve_static(path): return await send_from_directory(str(STATIC_DIR), path) @app.route('/api/command', methods=['POST']) async def handle_command(): """Handle incoming commands with autonomous learning and self-healing.""" try: # Get the command data data = await request.get_json() if not data: return jsonify({ "status": "error", "message": "No data received" }), 400 # Log the received command logging.info(f"Received command: {json.dumps(data)}") # Process with retry and self-healing result = await handle_command_with_retry(data) # Return the result return jsonify(result) except Exception as e: error_msg = f"Error processing command: {str(e)}" logging.error(error_msg) return jsonify({ "status": "error", "message": error_msg }), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=8000)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/PankajBagariya/After-Efffect-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server