# main.py
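"""
Quart server that bridges a browser frontend and Adobe After Effects.

It exposes a REST endpoint (/api/command) and a WebSocket (/ws) for sending
commands, uses Gemini AI to translate natural language into After Effects
actions and to self-heal failing commands, and talks to After Effects over a
local TCP socket with a file-based fallback.
"""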
import json
import logging
import os
import asyncio
import socket
from pathlib import Path
from quart import Quart, request, jsonify, send_from_directory, websocket
from quart_cors import cors
from gemini_processor import (
    GeminiProcessor,
    gemini_processor,
    process_command_with_gemini,
    fix_command_with_gemini,
)
from ae_controller import (
    send_command_to_ae,
    run_custom_code,
    create_text_layer,
    create_solid_layer,
    create_shape_layer,
    handle_gemini_code,
)
from code_generator import AECodeGenerator
from file_command_handler import ensure_temp_dir, clean_temp_files, send_command

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Ensure required directories exist
BASE_DIR = Path(__file__).parent.parent
FRONTEND_DIR = BASE_DIR / 'frontend'
STATIC_DIR = FRONTEND_DIR / 'static'
STATIC_DIR.mkdir(parents=True, exist_ok=True)
app = Quart(__name__, static_folder=str(FRONTEND_DIR))
app = cors(app)
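# Allow cross-origin requests so the frontend can reach the API from another
# origin (quart_cors defaults to allowing all origins; restrict in production).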

# Initialize Gemini processor and code generator
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
if not GEMINI_API_KEY:
    logging.warning("GEMINI_API_KEY environment variable is not set. AI features will be disabled.")
    HAS_GEMINI = False
    generator = None  # Code generation is unavailable without an API key
else:
    HAS_GEMINI = True
    generator = AECodeGenerator(GEMINI_API_KEY)
# Store connected clients
connected_clients = set()
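# Entries are added/removed by the /ws handler; command handlers broadcast
# "ai_status" and "log" updates to every connection in this set.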

# Fix for TCP server binding issue
def fix_tcp_server_binding():
    """
    Check whether the TCP command port can be bound.
    Returns True if the TCP server is available, or False if the server should
    fall back to file-based communication.
    """
    AE_HOST = "127.0.0.1"
    AE_PORT = 8250
    try:
        # Try to create and bind a TCP socket; the `with` block closes it on exit
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # Reuse the address so a recently closed socket does not block the bind
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((AE_HOST, AE_PORT))
            s.listen(1)
        # If we get here, binding was successful
        logging.info(f"TCP server can be bound to {AE_HOST}:{AE_PORT}")
        return True
    except Exception as e:
        logging.error(f"TCP server binding failed: {e}")
        logging.info("Falling back to file-based communication")
        # Ensure the temp directory exists for file-based communication
        ensure_temp_dir()
        clean_temp_files()
        return False
# Check TCP server at startup
USE_TCP = fix_tcp_server_binding()
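# Two transport paths to After Effects are used below:
#   - TCP: send_command_to_ae() when USE_TCP is True (port 8250 checked above).
#   - File-based fallback: file_command_handler.send_command() otherwise,
#     exchanging command files through the temp directory prepared above.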

async def handle_command_with_retry(data, max_retries=3):
    """Handle a command with automatic retry and self-healing."""
    retries = 0
    last_error = None

    # If data is a string, assume it's a natural language command
    if isinstance(data, str):
        data = {"command": data}

    # Send an AI status update to every connected client
    for client in connected_clients:
        try:
            await client.send_json({
                "type": "ai_status",
                "status": "Processing"
            })
            await client.send_json({
                "type": "log",
                "message": f"Processing command: {json.dumps(data)[:100]}...",
                "level": "info"
            })
        except Exception:
            pass

    while retries <= max_retries:
        try:
            # Handle different command types
            if isinstance(data, dict):
                # Handle Gemini AI command
                if data.get("action") == "handle_gemini_code" or "prompt" in data:
                    for client in connected_clients:
                        try:
                            await client.send_json({
                                "type": "ai_status",
                                "status": "Gemini Processing"
                            })
                        except Exception:
                            pass
                    if HAS_GEMINI:
                        result = await handle_gemini_code(data)
                    else:
                        result = {
                            "status": "error",
                            "message": "Gemini API key not configured. AI features are disabled."
                        }
                # Handle natural language command
                elif "command" in data and isinstance(data["command"], str):
                    for client in connected_clients:
                        try:
                            await client.send_json({
                                "type": "ai_status",
                                "status": "Analyzing Command"
                            })
                        except Exception:
                            pass
                    if HAS_GEMINI:
                        result = await gemini_processor.process_command_with_gemini(data["command"])
                    else:
                        result = {
                            "status": "error",
                            "message": "Gemini API key not configured. AI features are disabled."
                        }
                # Handle direct action command
                elif "action" in data:
                    # Use file-based commands if TCP is not available
                    if not USE_TCP:
                        result = send_command(data)
                    else:
                        result = await send_command_to_ae(data)
                else:
                    result = {
                        "status": "error",
                        "message": "Invalid command format. Must include 'action' or 'command' field."
                    }
            # If data is not a dict, try to process it as natural language
            else:
                if HAS_GEMINI:
                    result = await gemini_processor.process_command_with_gemini(str(data))
                else:
                    result = {
                        "status": "error",
                        "message": "Gemini API key not configured. AI features are disabled."
                    }

            # Log the result
            logging.info(f"Command result: {json.dumps(result)}")

            # Send an AI status update
            for client in connected_clients:
                try:
                    await client.send_json({
                        "type": "ai_status",
                        "status": "Ready"
                    })
                    await client.send_json({
                        "type": "log",
                        "message": f"Command completed with status: {result.get('status', 'unknown')}",
                        "level": "info" if result.get("status") == "success" else "error"
                    })
                except Exception:
                    pass
            return result
        except Exception as e:
            last_error = str(e)
            retries += 1
            logging.error(f"Error processing command (attempt {retries}/{max_retries + 1}): {last_error}")

            # Send an error log to connected clients
            for client in connected_clients:
                try:
                    await client.send_json({
                        "type": "log",
                        "message": f"Error (attempt {retries}/{max_retries + 1}): {last_error}",
                        "level": "error"
                    })
                    await client.send_json({
                        "type": "ai_status",
                        "status": "Retrying" if retries <= max_retries else "Error"
                    })
                except Exception:
                    pass

            if retries <= max_retries:
                # Try to fix the command with Gemini
                try:
                    for client in connected_clients:
                        try:
                            await client.send_json({
                                "type": "log",
                                "message": "Attempting to fix command with Gemini AI...",
                                "level": "info"
                            })
                        except Exception:
                            pass
                    fixed_data = await fix_command_with_gemini(last_error, {"original_command": data})
                    if fixed_data and isinstance(fixed_data, dict):
                        data = fixed_data
                        for client in connected_clients:
                            try:
                                await client.send_json({
                                    "type": "log",
                                    "message": f"Command fixed by Gemini AI: {json.dumps(fixed_data)[:100]}...",
                                    "level": "success"
                                })
                            except Exception:
                                pass
                    # Wait a moment before retrying
                    await asyncio.sleep(1)
                except Exception as fix_error:
                    logging.error(f"Error fixing command: {str(fix_error)}")
                    for client in connected_clients:
                        try:
                            await client.send_json({
                                "type": "log",
                                "message": f"Failed to fix command: {str(fix_error)}",
                                "level": "error"
                            })
                        except Exception:
                            pass

    # If we've exhausted retries, return the last error
    return {
        "status": "error",
        "message": f"Failed after {max_retries + 1} attempts. Last error: {last_error}"
    }
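# Illustrative payloads routed by handle_command_with_retry (field values are
# examples only; the exact schema is defined by ae_controller/gemini_processor):
#   {"prompt": "make the logo bounce"}             -> handle_gemini_code
#   {"command": "create a red solid layer"}        -> Gemini natural-language path
#   {"action": "create_text_layer", "text": "Hi"}  -> direct AE command
#   "create a red solid layer"                     -> treated as {"command": ...}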

@app.websocket('/ws')
async def ws():
    """Handle WebSocket connections."""
    # Resolve the underlying connection object so it can be stored in
    # connected_clients and used for broadcasts outside this request context.
    ws_conn = websocket._get_current_object()
    try:
        await websocket.accept()
        logging.info("WebSocket connection accepted")

        # Add to connected clients
        connected_clients.add(ws_conn)

        # Send initial connection status
        await websocket.send_json({
            "type": "status",
            "message": "Connected to server",
            "status": "connected"
        })
        # Send welcome log
        await websocket.send_json({
            "type": "log",
            "message": "Connected to After Effects MCP AI Server",
            "level": "success"
        })

        while True:
            try:
                # Wait for messages
                data = await websocket.receive()
                # Process the message
                try:
                    message = json.loads(data)
                    logging.info(f"Received WebSocket message: {message}")

                    # Handle different message types
                    if message.get("type") == "command":
                        # Send processing status
                        await websocket.send_json({
                            "type": "ai_status",
                            "status": "Processing"
                        })
                        # Process the command
                        result = await handle_command_with_retry(message.get("data", {}))
                        # Send the result back
                        await websocket.send_json({
                            "type": "result",
                            "status": result.get("status", "error"),
                            "message": result.get("message", "Unknown result"),
                            "generated_code": result.get("generated_code", "")
                        })
                    elif message.get("type") == "gemini_command":
                        # Send processing status
                        await websocket.send_json({
                            "type": "ai_status",
                            "status": "Gemini Processing"
                        })
                        # Log the Gemini request
                        await websocket.send_json({
                            "type": "log",
                            "message": "Processing with Gemini AI...",
                            "level": "info"
                        })
                        # Process with Gemini
                        result = await handle_gemini_code(message.get("data", {}))
                        # Send the result back
                        await websocket.send_json({
                            "type": "result",
                            "status": result.get("status", "error"),
                            "message": result.get("message", "Unknown result"),
                            "generated_code": result.get("generated_code", "")
                        })
                    else:
                        # Unknown message type
                        await websocket.send_json({
                            "type": "error",
                            "message": "Unknown message type"
                        })
                except json.JSONDecodeError:
                    logging.error(f"Invalid JSON received: {data}")
                    await websocket.send_json({
                        "type": "error",
                        "message": "Invalid message format"
                    })
            except Exception as e:
                if "disconnect" in str(e).lower():
                    logging.info("WebSocket disconnected")
                    break
                else:
                    logging.error(f"WebSocket error: {str(e)}")
                    try:
                        await websocket.send_json({
                            "type": "error",
                            "message": f"Server error: {str(e)}"
                        })
                    except Exception:
                        pass
    except Exception as e:
        logging.error(f"WebSocket error: {str(e)}")
        try:
            await websocket.send_json({
                "type": "error",
                "message": f"Server error: {str(e)}"
            })
        except Exception:
            pass
    finally:
        # Remove from connected clients
        connected_clients.discard(ws_conn)
        try:
            await websocket.close(code=1000)  # Normal closure
        except Exception as e:
            logging.error(f"Error closing WebSocket: {str(e)}")

@app.route('/')
async def serve_index():
    return await send_from_directory(str(FRONTEND_DIR), 'index.html')

@app.route('/static/<path:path>')
async def serve_static(path):
    return await send_from_directory(str(STATIC_DIR), path)

@app.route('/api/command', methods=['POST'])
async def handle_command():
    """Handle incoming commands with autonomous learning and self-healing."""
    try:
        # Get the command data
        data = await request.get_json()
        if not data:
            return jsonify({
                "status": "error",
                "message": "No data received"
            }), 400

        # Log the received command
        logging.info(f"Received command: {json.dumps(data)}")

        # Process with retry and self-healing
        result = await handle_command_with_retry(data)

        # Return the result
        return jsonify(result)
    except Exception as e:
        error_msg = f"Error processing command: {str(e)}"
        logging.error(error_msg)
        return jsonify({
            "status": "error",
            "message": error_msg
        }), 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)
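# Example request against the REST endpoint (illustrative payload; the exact
# fields accepted depend on ae_controller and gemini_processor):
#   curl -X POST http://localhost:8000/api/command \
#        -H "Content-Type: application/json" \
#        -d '{"command": "create a text layer that says Hello"}'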