# complete_mcp_fix.py (36.4 kB)
#!/usr/bin/env python
"""
Complete MCP Fix
This script fixes all issues with the Motion Control Panel system:
1. Command file detection issue
2. Live Gemini processing display
3. Accurate command execution status
4. Proper Gemini command processing
"""
import os
import sys
import json
import time
import shutil
import logging
import asyncio
import websockets
import requests
from pathlib import Path
from typing import Dict, Any, Optional
# Logging setup: INFO and above go both to stdout and to the file
# "complete_mcp_fix.log" (relative to the current working directory).
logging.basicConfig(
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("complete_mcp_fix.log"),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by everything in this script.
logger = logging.getLogger(__name__)
# Configuration
# Command, result and status files all live in TEMP_DIR.
TEMP_DIR = "C:/ae_temp"
COMMAND_FILE = os.path.join(TEMP_DIR, "command.json")
RESULT_FILE = os.path.join(TEMP_DIR, "result.json")
STATUS_FILE = os.path.join(TEMP_DIR, "status.json")
WEB_UI_PORT = 8000
WEBSOCKET_PATH = "/ws"

# Gemini API key: prefer the process environment; if it is not set there,
# try to populate the environment from a .env file via python-dotenv.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
if not GEMINI_API_KEY:
    try:
        from dotenv import load_dotenv
    except ImportError:
        # python-dotenv is optional; without it only the real environment
        # is consulted and the key simply stays empty.
        pass
    else:
        load_dotenv()
        GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")

# Gemini configuration
# AI features are enabled only when a non-empty API key was found.
GEMINI_ENABLED = bool(GEMINI_API_KEY)
GEMINI_API_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent"
# System prompt for Gemini.
# This text is placed in front of the user's command when a request is sent
# to the Gemini API. It asks the model to answer with a single JSON object
# holding "action", "params" and "confirmation_message", and lists the
# action names and parameters the model may use.
# NOTE: this is a runtime string; changing its wording changes what the
# model is asked to produce.
SYSTEM_PROMPT = """You are an AI assistant that helps users control After Effects through natural language commands. Your task is to convert user requests into structured JSON commands that can be executed by After Effects.
The JSON object you return should have this structure:
{
"action": "string", // The action to perform (e.g., 'create_text_layer')
"params": { // Parameters for the action
// Action-specific parameters
},
"confirmation_message": "string" // Message to show user
}
Supported actions and their parameters:
1. import_media:
- filePath: string (required) - Path to the media file
- type: string (optional) - Type of media (image, video, audio)
- position: array (optional) - [x, y] position
- scale: number (optional) - Scale factor
- rotation: number (optional) - Rotation in degrees
- opacity: number (optional) - Opacity (0-100)
2. create_text_layer:
- text: string (required) - Text content
- name: string (optional) - Layer name
- position: array (optional) - [x, y] position
- fontSize: number (optional) - Font size
- color: string (optional) - Color in hex format
- fontFamily: string (optional) - Font family name
3. apply_text_animation:
- layerName: string (required) - Name of text layer
- animationType: string (required) - Type of animation
- duration: number (optional) - Animation duration
- easing: string (optional) - Easing function
4. create_animation:
- type: string (required) - Type of animation (fade, scale, rotation, position, color)
- startValue: any (required) - Starting value
- endValue: any (required) - Ending value
- duration: number (optional) - Duration in seconds
5. apply_color_animation:
- layerName: string (required) - Name of layer
- colors: array (required) - Array of color hex values
- duration: number (required) - Animation duration in seconds
- speed: number (optional) - Animation speed
6. apply_effect:
- effectName: string (required) - Name of effect
- effectParams: object (optional) - Effect properties
7. create_composition:
- name: string (required) - Composition name
- width: number (optional) - Width in pixels
- height: number (optional) - Height in pixels
- duration: number (optional) - Duration in seconds
- frameRate: number (optional) - Frame rate
8. create_solid_layer:
- name: string (required) - Layer name
- width: number (optional) - Width in pixels
- height: number (optional) - Height in pixels
- color: array (optional) - [r, g, b] color values
- position: array (optional) - [x, y] position
9. create_shape_layer:
- shape: string (required) - Type of shape (rectangle, circle, star, polygon)
- position: array (optional) - [x, y] position
- size: array (optional) - [width, height]
- color: string (optional) - Fill color in hex format
10. run_custom_code:
- code: string (required) - Custom ExtendScript code to execute
IMPORTANT: Use EXACTLY these action names as After Effects will reject commands with incorrect action names.
Return ONLY the JSON object, no additional text or explanations."""
class MCPFixer:
    """Bridge between the web UI, the Gemini API and After Effects.

    Commands arrive from the web UI over a websocket, are optionally turned
    into structured JSON commands by Gemini, and are handed to After Effects
    through COMMAND_FILE. RESULT_FILE is polled for the outcome, which is
    relayed back to the web UI. STATUS_FILE records the latest state.
    """

    # Upper bound, in seconds, for one HTTP request to the Gemini API.
    GEMINI_TIMEOUT_SECONDS = 60

    # Aliases (camelCase names) mapped to the snake_case action names that
    # are written into the command file.
    _ACTION_ALIASES = {
        # Map to the exact command names in mcp_bridge.jsx
        "addTextLayer": "create_text_layer",
        "importMedia": "import_media",
        "addTextAnimation": "apply_text_animation",
        "addLayerAnimation": "create_animation",
        "addColorAnimation": "apply_color_animation",
        "addEffect": "apply_effect",
        "createComposition": "create_composition",
        "addSolidLayer": "create_solid_layer",
        "addShapeLayer": "create_shape_layer",
        "executeScript": "run_custom_code",
        # Add missing mappings to ensure all commands work
        "createTextLayer": "create_text_layer",
        "createAnimation": "create_animation",
        "applyTextAnimation": "apply_text_animation",
        "applyColorAnimation": "apply_color_animation",
        "applyEffect": "apply_effect",
        "modifyLayer": "modify_layer",
        "deleteLayer": "delete_layer",
        "duplicateLayer": "duplicate_layer",
        "precomposeLayers": "precompose_layers",
        "animateEffect": "animate_effect",
        "selectLayerByName": "select_layer_by_name",
        "importImageLayer": "import_image_layer",
        "runCustomCode": "run_custom_code",
    }

    def __init__(self):
        """Create the temp directory and initialise connection/queue state."""
        self.ensure_temp_dir()
        self.websocket = None            # current web UI connection, if any
        self.command_queue = []          # pending commands (str or dict)
        self.processing_command = False  # True while a queued command runs
        self.last_command_time = 0       # time of last written command; 0 = none pending
        self.last_check_time = 0         # time of last reminder check
        # Check Gemini status
        if GEMINI_ENABLED:
            logger.info("Gemini API is enabled")
        else:
            logger.warning("Gemini API is not enabled. Set GEMINI_API_KEY environment variable for AI features.")

    def ensure_temp_dir(self):
        """Ensure TEMP_DIR exists. Return True on success, False on failure."""
        try:
            Path(TEMP_DIR).mkdir(parents=True, exist_ok=True)
            logger.info(f"Temp directory ready: {TEMP_DIR}")
            return True
        except Exception as e:
            logger.error(f"Failed to create temp directory: {e}")
            return False

    def clean_temp_files(self):
        """Remove leftover command, result and status files.

        Returns True when cleanup finished, False if any removal failed.
        """
        leftovers = (
            (COMMAND_FILE, "command"),
            (RESULT_FILE, "result"),
            (STATUS_FILE, "status"),
        )
        try:
            for file_path, label in leftovers:
                if os.path.exists(file_path):
                    os.remove(file_path)
                    logger.info(f"Removed existing {label} file")
            return True
        except Exception as e:
            logger.error(f"Error cleaning temp files: {e}")
            return False

    def create_command_file(self, command_data):
        """Write command_data as JSON to COMMAND_FILE.

        The JSON is written to a temporary sibling file first and then
        renamed over COMMAND_FILE, so a reader never sees a half-written
        command. Returns True on success, False on failure (the failure is
        also recorded in the status file).
        """
        try:
            # Ensure directory exists
            self.ensure_temp_dir()
            # Create a temporary file first
            temp_file = f"{COMMAND_FILE}.tmp"
            with open(temp_file, 'w') as f:
                json.dump(command_data, f, indent=2)
            # os.replace overwrites an existing destination in one step on
            # every platform; shutil.move degrades to copy + delete on
            # Windows when the destination already exists.
            os.replace(temp_file, COMMAND_FILE)
            logger.info(f"Command file created at: {COMMAND_FILE}")
            logger.info(f"Command data: {json.dumps(command_data)[:100]}...")
            # Update status file
            self.update_status("command_created", f"Command file created at {time.time()}")
            # Remember when the command was written so the monitor can tell
            # whether it was picked up without producing a result.
            self.last_command_time = time.time()
            return True
        except Exception as e:
            logger.error(f"Error creating command file: {e}")
            self.update_status("error", f"Error creating command file: {str(e)}")
            return False

    def _require_command_file(self, command_data):
        """Write the command file or raise RuntimeError if that fails."""
        if not self.create_command_file(command_data):
            raise RuntimeError(f"Could not write command file: {COMMAND_FILE}")

    def update_status(self, status, message):
        """Write status, message and a timestamp to STATUS_FILE.

        Returns True on success, False on failure.
        """
        try:
            status_data = {
                "status": status,
                "message": message,
                "timestamp": time.time()
            }
            with open(STATUS_FILE, 'w') as f:
                json.dump(status_data, f, indent=2)
            logger.info(f"Status updated: {status} - {message}")
            return True
        except Exception as e:
            logger.error(f"Error updating status: {e}")
            return False

    async def send_websocket_message(self, message_type, message_data):
        """Send a typed JSON message to the connected web UI.

        Returns False when no connection is available or sending fails,
        True otherwise.
        """
        if not self.websocket:
            logger.warning("No websocket connection available")
            return False
        try:
            message = {
                "type": message_type,
                "data": message_data,
                "timestamp": time.time()
            }
            await self.websocket.send(json.dumps(message))
            logger.info(f"Websocket message sent: {message_type}")
            return True
        except Exception as e:
            logger.error(f"Error sending websocket message: {e}")
            return False

    @staticmethod
    def _strip_code_fences(text):
        """Return the contents of the first markdown code fence in text.

        Text without a fence is returned unchanged. A language tag on the
        opening fence line (e.g. ```javascript) is dropped.
        """
        if "```json" in text:
            return text.split("```json")[1].split("```")[0].strip()
        if "```" in text:
            fenced = text.split("```")[1]
            tag, newline, rest = fenced.partition("\n")
            # A lone alphanumeric word directly after the fence is a
            # language tag, not part of the payload.
            if newline and tag.strip().isalnum():
                fenced = rest
            return fenced.strip()
        return text

    async def process_with_gemini(self, prompt: str, expect_json: bool = True) -> Dict[str, Any]:
        """Send prompt to the Gemini API and return the interpreted reply.

        Args:
            prompt: The user command, or a complete prompt when
                expect_json is False.
            expect_json: When True (default) SYSTEM_PROMPT is prepended and
                the reply must be a JSON command object. When False the
                prompt is sent as-is and the reply text is returned
                unparsed (used for ExtendScript generation).

        Returns:
            A dict with "status" ("success" or "error"). On success it
            carries "data" (the command dict, or the reply text when
            expect_json is False) and "raw_response". On error it carries
            "message" and, when available, "raw_response".
        """
        if not GEMINI_ENABLED:
            return {
                "status": "error",
                "message": "Gemini API is not enabled. Set GEMINI_API_KEY environment variable."
            }
        try:
            # Update status
            await self.send_websocket_message("ai_status", {
                "status": "Thinking",
                "message": "Gemini is processing your command..."
            })
            # Prepare the request
            headers = {
                "Content-Type": "application/json",
                "x-goog-api-key": GEMINI_API_KEY
            }
            if expect_json:
                request_text = f"{SYSTEM_PROMPT}\n\nUser command: {prompt}"
            else:
                request_text = prompt
            data = {
                "contents": [
                    {
                        "role": "user",
                        "parts": [
                            {
                                "text": request_text
                            }
                        ]
                    }
                ],
                "generationConfig": {
                    "temperature": 0.2,
                    "topP": 0.8,
                    "topK": 40
                }
            }
            # requests is blocking: run it in the default executor so the
            # websocket server and the result monitor keep running, and
            # bound it with a timeout so a stalled request cannot hang.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    GEMINI_API_URL,
                    headers=headers,
                    json=data,
                    timeout=self.GEMINI_TIMEOUT_SECONDS,
                ),
            )
            if response.status_code != 200:
                logger.error(f"Gemini API error: {response.status_code} - {response.text}")
                return {
                    "status": "error",
                    "message": f"Gemini API error: {response.status_code}"
                }
            # Parse the response
            response_data = response.json()
            if "candidates" not in response_data or not response_data["candidates"]:
                return {
                    "status": "error",
                    "message": "No response from Gemini API"
                }
            # Extract the text; a candidate may come back without content.
            try:
                text = response_data["candidates"][0]["content"]["parts"][0]["text"]
            except (KeyError, IndexError, TypeError):
                logger.error(f"Gemini response has no text: {json.dumps(response_data)[:200]}")
                return {
                    "status": "error",
                    "message": "Gemini response did not contain any text"
                }
            # Clean up the text - remove markdown code blocks if present
            text = self._strip_code_fences(text)
            if not expect_json:
                code = text.strip()
                if not code:
                    return {
                        "status": "error",
                        "message": "Gemini returned an empty response",
                        "raw_response": text
                    }
                return {
                    "status": "success",
                    "data": code,
                    "raw_response": code
                }
            # Try to parse as JSON
            try:
                command_json = json.loads(text)
            except json.JSONDecodeError:
                logger.error(f"Failed to parse Gemini response as JSON: {text}")
                return {
                    "status": "error",
                    "message": "Failed to parse Gemini response as JSON",
                    "raw_response": text
                }
            if not isinstance(command_json, dict):
                logger.error(f"Gemini response is not a JSON object: {text}")
                return {
                    "status": "error",
                    "message": "Gemini response is not a JSON object",
                    "raw_response": text
                }
            # Fill in any field the model left out.
            command_json.setdefault("action", "unknown")
            command_json.setdefault("params", {})
            command_json.setdefault("confirmation_message", f"Executing {command_json['action']}")
            return {
                "status": "success",
                "data": command_json,
                "raw_response": text
            }
        except Exception as e:
            logger.error(f"Error processing with Gemini: {str(e)}")
            return {
                "status": "error",
                "message": f"Error processing with Gemini: {str(e)}"
            }

    async def map_command_action(self, command):
        """Rewrite a known alias in command["action"] to its canonical name.

        Non-dict input and unknown actions are returned unchanged. The
        command dict is modified in place and returned.
        """
        if not isinstance(command, dict):
            return command
        original_action = command.get("action")
        # Map the action if needed
        if "action" in command and original_action in MCPFixer._ACTION_ALIASES:
            command["action"] = MCPFixer._ACTION_ALIASES[original_action]
            logger.info(f"Mapped action from '{original_action}' to '{command['action']}'")
        return command

    async def handle_custom_command(self, command_text):
        """Turn a free-form request into a run_custom_code command.

        Gemini is asked for ExtendScript code, which is written to the
        command file. When Gemini fails or is disabled, a text layer
        command describing the request is written instead.

        Returns a dict with "status" and "message".
        """
        try:
            # Update status
            self.update_status("processing_custom", f"Processing custom command: {command_text}")
            # Send processing status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Processing",
                "message": f"Creating custom command: {command_text}"
            })
            if not GEMINI_ENABLED:
                # Gemini not enabled, use simple text layer
                logger.warning("Gemini not enabled, using simple text layer")
                await self.send_websocket_message("ai_status", {
                    "status": "Limited",
                    "message": "Gemini AI not enabled, using simple command"
                })
                command = {
                    "action": "create_text_layer",
                    "params": {
                        "text": f"Custom Command: {command_text}",
                        "fontSize": 50,
                        "color": "#00AAFF"
                    },
                    "confirmation_message": "Created text layer with command"
                }
                self._require_command_file(command)
                return {
                    "status": "success",
                    "message": "Simple command created"
                }

            # Create a custom prompt for ExtendScript generation
            custom_prompt = (
                f"Generate ExtendScript code for Adobe After Effects to perform the following task: {command_text}\n"
                "The code will be executed in After Effects and should be valid ExtendScript. Use the following variables that are available in the context:\n"
                "- app: The After Effects application object\n"
                "- comp: The active composition\n"
                "- project: The current project\n"
                "Return ONLY the ExtendScript code without any explanations or markdown formatting. Do not include ```javascript or ``` markers.\n"
                "Example of good response:\n"
                'var myLayer = comp.layers.addText("Hello World");\n'
                "myLayer.position.setValue([960, 540]);\n"
            )
            # Send thinking status
            await self.send_websocket_message("ai_status", {
                "status": "Thinking",
                "message": "Creating custom ExtendScript code..."
            })
            # The reply is code, not a JSON command: skip the JSON system
            # prompt and JSON parsing, otherwise plain code is always
            # rejected as "not JSON".
            result = await self.process_with_gemini(custom_prompt, expect_json=False)
            if result["status"] != "success":
                # Fallback to simple text layer if Gemini fails
                logger.warning(f"Gemini processing failed: {result['message']}")
                await self.send_websocket_message("ai_status", {
                    "status": "Error",
                    "message": "Error creating custom command, using fallback"
                })
                command = {
                    "action": "create_text_layer",
                    "params": {
                        "text": f"Custom Command Failed: {command_text}",
                        "fontSize": 50,
                        "color": "#FF5555"
                    },
                    "confirmation_message": "Created fallback text layer"
                }
                self._require_command_file(command)
                return {
                    "status": "error",
                    "message": f"Failed to create custom command: {result['message']}"
                }

            # NOTE(review): the generated code is model output that After
            # Effects will execute as-is; it is not validated here.
            code = result["raw_response"]
            command = {
                "action": "run_custom_code",
                "params": {
                    "code": code
                },
                "confirmation_message": f"Executing custom command: {command_text}"
            }
            # Send processing status
            await self.send_websocket_message("ai_status", {
                "status": "Creating",
                "message": "Custom code generated"
            })
            # Send log message
            await self.send_websocket_message("log", {
                "message": f"Generated ExtendScript code for: {command_text}",
                "level": "info"
            })
            # Create the command file
            self._require_command_file(command)
            # Send confirmation message
            await self.send_websocket_message("log", {
                "message": command.get("confirmation_message"),
                "level": "success"
            })
            return {
                "status": "success",
                "message": "Custom command generated"
            }
        except Exception as e:
            logger.error(f"Error handling custom command: {e}")
            # Update status
            self.update_status("error", f"Error handling custom command: {str(e)}")
            # Send error status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Error",
                "message": f"Error handling custom command: {str(e)}"
            })
            return {
                "status": "error",
                "message": f"Error handling custom command: {str(e)}"
            }

    async def process_gemini_command(self, command_text):
        """Interpret a natural-language command and write the command file.

        Requests containing a custom-code keyword, and requests Gemini could
        not turn into a JSON command, are delegated to
        handle_custom_command. Without Gemini a plain text layer command is
        written.

        Returns a dict with "status" and "message".
        """
        try:
            # Update status
            self.update_status("processing_gemini", f"Processing Gemini command: {command_text}")
            # Send processing status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Processing",
                "message": f"Processing command: {command_text}"
            })
            # Check if this is a custom command request
            custom_command_keywords = ["custom", "create code", "write code", "generate script", "make a script", "custom command"]
            lowered = command_text.lower()
            if any(keyword in lowered for keyword in custom_command_keywords):
                # Handle as a custom command
                return await self.handle_custom_command(command_text)
            if GEMINI_ENABLED:
                # Send thinking status
                await self.send_websocket_message("ai_status", {
                    "status": "Thinking",
                    "message": "Analyzing your command..."
                })
                # Send log message
                await self.send_websocket_message("log", {
                    "message": "Analyzing command with Gemini AI...",
                    "level": "info"
                })
                # Process with Gemini
                result = await self.process_with_gemini(command_text)
                if result["status"] != "success":
                    # Try as custom command if normal processing fails
                    logger.warning(f"Standard command processing failed, trying as custom command")
                    await self.send_websocket_message("ai_status", {
                        "status": "Adapting",
                        "message": "Trying as custom command..."
                    })
                    return await self.handle_custom_command(command_text)
                # Map the command action to ensure compatibility
                command = await self.map_command_action(result["data"])
                # Send processing status
                await self.send_websocket_message("ai_status", {
                    "status": "Creating",
                    "message": f"Creating command: {command['action']}"
                })
                # Send log message
                await self.send_websocket_message("log", {
                    "message": f"Gemini interpreted command as: {command['action']}",
                    "level": "info"
                })
                # Create the command file; a failed write is reported as an
                # error instead of a success.
                self._require_command_file(command)
                # Send confirmation message
                await self.send_websocket_message("log", {
                    "message": command.get("confirmation_message", f"Executing {command['action']}"),
                    "level": "success"
                })
            else:
                # Gemini not enabled, use simple text layer
                logger.warning("Gemini not enabled, using simple text layer")
                await self.send_websocket_message("ai_status", {
                    "status": "Limited",
                    "message": "Gemini AI not enabled, using simple command"
                })
                command = {
                    "action": "create_text_layer",
                    "params": {
                        "text": f"Command: {command_text}",
                        "fontSize": 50,
                        "color": "#00AAFF"
                    },
                    "confirmation_message": "Created text layer with command"
                }
                self._require_command_file(command)
            # Update status
            self.update_status("gemini_processed", "Gemini command processed")
            # Send completion status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Ready",
                "message": "Command processed. Click 'Check for Command' in After Effects."
            })
            # Send log message to web UI
            await self.send_websocket_message("log", {
                "message": "Command processed. Click 'Check for Command' in After Effects.",
                "level": "info"
            })
            # Send reminder
            await self.send_websocket_message("reminder", {
                "message": "⚠️ Remember to click 'Check for Command' in After Effects MCP Bridge panel",
                "level": "warning"
            })
            return {
                "status": "success",
                "message": "Gemini command processed. Click 'Check for Command' in After Effects."
            }
        except Exception as e:
            logger.error(f"Error processing Gemini command: {e}")
            # Update status
            self.update_status("error", f"Error processing Gemini command: {str(e)}")
            # Send error status to web UI
            await self.send_websocket_message("ai_status", {
                "status": "Error",
                "message": f"Error processing command: {str(e)}"
            })
            return {
                "status": "error",
                "message": f"Error processing Gemini command: {str(e)}"
            }

    async def _relay_result_file(self):
        """Forward RESULT_FILE to the web UI and delete it, if it exists."""
        if not os.path.exists(RESULT_FILE):
            return
        try:
            with open(RESULT_FILE, 'r') as f:
                result_data = json.load(f)
            logger.info(f"Result file found: {json.dumps(result_data)[:100]}...")
            # A result arrived, so the pending command is accounted for;
            # without this reset a "no result file" warning would follow
            # every successful command.
            self.last_command_time = 0
            # Update status
            self.update_status("command_processed", f"Command processed: {json.dumps(result_data)}")
            # Send result to web UI
            await self.send_websocket_message("result", result_data)
            if isinstance(result_data, dict):
                result_message = result_data.get('message', 'No message')
            else:
                result_message = 'No message'
            # Send log message to web UI
            await self.send_websocket_message("log", {
                "message": f"After Effects processed command: {result_message}",
                "level": "success"
            })
            # Remove the result file
            os.remove(RESULT_FILE)
        except Exception as e:
            # The file is left in place and retried on the next pass.
            logger.error(f"Error processing result file: {e}")

    async def _report_missing_result(self):
        """Warn once when a command file vanished without a result."""
        if self.last_command_time <= 0:
            return
        if time.time() - self.last_command_time <= 10:
            return
        if os.path.exists(COMMAND_FILE):
            return
        # Command file was processed but no result file was created
        logger.info("Command file was processed but no result file was created")
        self.update_status("command_processed_no_result", "Command file was processed but no result file was created")
        await self.send_websocket_message("log", {
            "message": "Command file was processed but no result file was created",
            "level": "warning"
        })
        # Reset last command time
        self.last_command_time = 0

    async def _remind_pending_command(self):
        """Every 30 seconds, remind the user if a command file is waiting."""
        current_time = time.time()
        if self.last_check_time != 0 and current_time - self.last_check_time <= 30:
            return
        if os.path.exists(COMMAND_FILE):
            logger.info("Reminder: Command file exists, click 'Check for Command' in After Effects")
            await self.send_websocket_message("reminder", {
                "message": "⚠️ Remember to click 'Check for Command' in After Effects MCP Bridge panel",
                "level": "warning"
            })
        # Update last check time
        self.last_check_time = current_time

    async def _process_next_queued_command(self):
        """Process the oldest queued command, if any and none is running."""
        if not self.command_queue or self.processing_command:
            return
        self.processing_command = True
        try:
            command = self.command_queue.pop(0)
            if isinstance(command, str):
                # Process as Gemini command
                await self.process_gemini_command(command)
            else:
                # Process as direct command
                self.create_command_file(command)
        finally:
            # Always clear the flag, otherwise one failure would block the
            # queue for good.
            self.processing_command = False

    async def monitor_result_file(self):
        """Poll once per second for results, reminders and queued commands.

        Runs forever. Errors in one pass are logged and do not stop the
        loop.
        """
        while True:
            try:
                await self._relay_result_file()
                await self._report_missing_result()
                await self._remind_pending_command()
                await self._process_next_queued_command()
                # Check file status and send to UI
                await self.check_file_status()
            except Exception as e:
                logger.error(f"Error in result monitor: {e}")
            # Wait before checking again
            await asyncio.sleep(1)

    async def check_file_status(self):
        """Tell the web UI whether the command and result files exist."""
        try:
            if self.websocket:
                command_file_exists = os.path.exists(COMMAND_FILE)
                result_file_exists = os.path.exists(RESULT_FILE)
                await self.send_websocket_message("file_status", {
                    "command_file_exists": command_file_exists,
                    "result_file_exists": result_file_exists
                })
        except Exception as e:
            logger.error(f"Error checking file status: {e}")

    async def _enqueue_command(self, payload, ack_message):
        """Queue a non-empty payload and acknowledge it to the web UI."""
        if not payload:
            # An empty payload would end up as a "null" command file.
            logger.warning("Ignoring websocket command with empty payload")
            await self.send_websocket_message("log", {
                "message": "Ignored a command with no content",
                "level": "error"
            })
            return
        self.command_queue.append(payload)
        await self.send_websocket_message("ack", {
            "message": ack_message,
            "queue_position": len(self.command_queue)
        })

    async def handle_websocket(self, websocket, path=None):
        """Serve one web UI connection until it closes.

        Args:
            websocket: The connection object supplied by the websockets
                server.
            path: Request path. Optional because newer websockets releases
                call the handler with the connection only.
        """
        if path is None:
            request = getattr(websocket, "request", None)
            path = getattr(request, "path", None) or getattr(websocket, "path", None)
        logger.info(f"Websocket connection established: {path}")
        # Store the websocket connection
        self.websocket = websocket
        # Send initial status to web UI
        await self.send_websocket_message("status", {
            "status": "connected",
            "message": "Connected to MCP Fixer"
        })
        # Send Gemini status
        await self.send_websocket_message("ai_status", {
            "status": "Ready",
            "message": f"Gemini AI {'enabled' if GEMINI_ENABLED else 'disabled'}"
        })
        # Check file status
        await self.check_file_status()
        try:
            async for message in websocket:
                try:
                    data = json.loads(message)
                    logger.info(f"Received websocket message: {json.dumps(data)[:100]}...")
                    # Handle different message types
                    message_type = data.get("type")
                    if message_type == "command":
                        await self._enqueue_command(
                            data.get("data"),
                            "Command received and queued for processing",
                        )
                    elif message_type == "gemini_command":
                        await self._enqueue_command(
                            data.get("text"),
                            "Gemini command received and queued for processing",
                        )
                    elif message_type == "check_file_status":
                        # Check file status
                        await self.check_file_status()
                except json.JSONDecodeError:
                    logger.error(f"Invalid JSON received: {message}")
                except Exception as e:
                    logger.error(f"Error processing websocket message: {e}")
        except websockets.exceptions.ConnectionClosed:
            logger.info("Websocket connection closed")
        finally:
            # Only forget the connection if a newer one has not replaced it.
            if self.websocket is websocket:
                self.websocket = None

    async def start_websocket_server(self):
        """Serve websocket connections on localhost, port WEB_UI_PORT + 1."""
        try:
            server = await websockets.serve(
                self.handle_websocket,
                "localhost",
                WEB_UI_PORT + 1  # Use a different port to avoid conflicts
            )
            logger.info(f"Websocket server started on port {WEB_UI_PORT + 1}")
            # Keep the server running
            await server.wait_closed()
        except Exception as e:
            logger.error(f"Error starting websocket server: {e}")

    async def run(self):
        """Start the monitor and websocket server and write a test command.

        Does not return under normal operation: both background tasks run
        until the process is stopped.
        """
        logger.info("Starting MCP Fixer...")
        # Clean up any existing files
        self.clean_temp_files()
        # Create initial status
        self.update_status("starting", "MCP Fixer starting")
        # Start the result file monitor
        monitor_task = asyncio.create_task(self.monitor_result_file())
        # Start the websocket server
        websocket_task = asyncio.create_task(self.start_websocket_server())
        # Create a test command
        test_command = {
            "action": "create_text_layer",
            "params": {
                "text": "MCP Fixer Test",
                "fontSize": 72,
                "color": "#00FF00"
            }
        }
        self.create_command_file(test_command)
        logger.info("MCP Fixer started")
        logger.info("Test command file created")
        logger.info("Please click 'Check for Command' in After Effects")
        # Wait for tasks to complete (they won't)
        await asyncio.gather(monitor_task, websocket_task)
async def main():
    """Log the start-up banner, then create an MCPFixer and run it."""
    banner = "=" * 60
    for line in (banner, "Complete MCP Fix", banner):
        logger.info(line)
    await MCPFixer().run()
# Script entry point: run the async main() until the process is stopped.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The monitor and websocket tasks never finish on their own, so
        # Ctrl+C is the normal way to stop; exit without a traceback.
        logger.info("MCP Fixer stopped by user")