# fusion_mcp_core.py (19.6 kB)
"""
Enhanced Fusion 360 MCP Core - Production Ready
Improved accuracy, error handling, and validation for AI-assisted design
"""
import json
import os
import traceback
import ast
import time
import logging
from typing import Optional, Tuple, Dict, List
# Try to import Fusion modules (only available when running inside Fusion 360)
try:
    import adsk.core
    import adsk.fusion
    FUSION_AVAILABLE = True  # real Fusion API present: running as an add-in
except ImportError:
    FUSION_AVAILABLE = False
    # Create dummy modules for server-only mode so later references to
    # `adsk.core` / `adsk.fusion` do not raise NameError/AttributeError
    # at import time.
    class DummyModule:
        pass
    adsk = type('adsk', (), {'core': DummyModule(), 'fusion': DummyModule()})()
# Setup logging: DEBUG and above go both to ~/mcp_core.log and to the console.
log_path = os.path.expanduser('~/mcp_core.log')
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_path),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
def log(message, level='INFO'):
    """Legacy logging wrapper: route *message* to the module logger.

    ``level`` may be 'ERROR', 'WARNING' or 'DEBUG'; any other value
    (including the default 'INFO') logs at INFO.
    """
    dispatch = {
        'ERROR': logger.error,
        'WARNING': logger.warning,
        'DEBUG': logger.debug,
    }
    dispatch.get(level, logger.info)(message)
class ContextManager:
    """Enhanced context management with better memory handling.

    Keeps a bounded in-memory message window (``short_term``), a JSON-file
    backed list of conversation summaries (``long_term``), and simple usage
    counters in ``conversation_metadata``.
    """

    def __init__(self, long_term_file='mcp_memory.json', max_short_term=10):
        """
        Args:
            long_term_file: Path (``~`` allowed) of the JSON file used to
                persist long-term memory.
            max_short_term: Number of short-term messages kept before a
                summarization pass is triggered.
        """
        self.short_term = []
        # Expand the path once and reuse it for both load and save
        # (the original expanded it separately in two places).
        self.long_term_file = os.path.expanduser(long_term_file)
        self.long_term = self._load_long_term(self.long_term_file)
        self.max_short_term = max_short_term
        self.conversation_metadata = {
            'created_at': time.time(),
            'total_prompts': 0,
            'successful_executions': 0,
            'failed_executions': 0
        }

    def _load_long_term(self, file_path):
        """Load persisted long-term memory; return [] when missing/corrupt."""
        file_path = os.path.expanduser(file_path)
        if os.path.exists(file_path):
            try:
                with open(file_path, 'r') as f:
                    return json.load(f)
            except Exception as e:
                # Corrupt/unreadable file: log and start with empty memory.
                log(f"Error loading long-term memory: {str(e)}", 'ERROR')
                return []
        return []

    def _save_long_term(self):
        """Persist long-term memory to disk; failures are logged, not raised."""
        try:
            with open(self.long_term_file, 'w') as f:
                json.dump(self.long_term, f, indent=2)
        except Exception as e:
            log(f"Error saving long-term memory: {str(e)}", 'ERROR')

    def add_message(self, role, content):
        """Append a timestamped message; count user messages as prompts."""
        self.short_term.append({
            'role': role,
            'content': content,
            'timestamp': time.time()
        })
        if role == 'user':
            self.conversation_metadata['total_prompts'] += 1

    def summarize_if_needed(self, ai_interface):
        """Collapse short-term history into a long-term summary when it grows
        past ``max_short_term``, keeping only the two most recent messages.

        Errors (e.g. from the AI backend) are logged and swallowed so a
        failed summarization never breaks prompt processing.
        """
        if len(self.short_term) > self.max_short_term:
            try:
                summary_prompt = (
                    "Summarize the following design conversation, preserving:\n"
                    "1. All critical measurements and dimensions\n"
                    "2. Design intent and constraints\n"
                    "3. Fusion 360 operations performed\n"
                    "4. Any errors or corrections made\n\n"
                    + '\n'.join([f"{m['role']}: {m['content']}" for m in self.short_term])
                )
                summary = ai_interface.generate_response(summary_prompt)
                self.long_term.append({
                    'role': 'system',
                    'content': f"Context Summary: {summary}",
                    'timestamp': time.time()
                })
                self._save_long_term()
                self.short_term = self.short_term[-2:]
                log("Context summarized and saved to long-term memory")
            except Exception as e:
                log(f"Error during summarization: {str(e)}", 'ERROR')

    def get_full_context(self):
        """Get enhanced context with improved system prompt.

        Returns a NEW list ``[system_prompt] + short_term``; the system
        prompt embeds all long-term summaries.
        (Fix: repaired the mis-encoded arrow in the units rule.)
        """
        long_term_content = '\n'.join([m['content'] for m in self.long_term])
        system_prompt = {
            'role': 'system',
            'content': f"""You are an expert AI assistant for Fusion 360 CAD automation. Your role is to generate PRECISE, EXECUTABLE Python scripts.
CRITICAL RULES:
1. UNITS: Fusion 360 API uses CENTIMETERS internally. Convert all measurements:
- Input in mm? Divide by 10 (10mm → 1cm)
- Input in inches? Multiply by 2.54
- Always use cm in API calls
2. OUTPUT FORMAT: Return ONLY code in this exact format:
```python
import adsk.core, adsk.fusion, adsk.cam
app = adsk.core.Application.get()
design = adsk.fusion.Design.cast(app.activeProduct)
# Your implementation here
```
3. ERROR PREVENTION:
- Always check if design/component exists
- Handle null returns gracefully
- Use try-except for critical operations
- Validate inputs before use
4. BEST PRACTICES:
- Create sketches on appropriate planes
- Name features for clarity
- Use parameters for key dimensions
- Clean up temporary objects
5. COMMON OPERATIONS:
- Cube (10mm): Sketch 1x1cm rectangle, extrude 1cm
- Cylinder (D=20mm, H=50mm): Sketch circle radius 1cm, extrude 5cm
- Patterns: Use rectangular/circular pattern features
- Fillets: Get edges via BRepEdges, create fillet
EXAMPLE - 10mm cube at origin:
```python
import adsk.core, adsk.fusion, adsk.cam
app = adsk.core.Application.get()
design = adsk.fusion.Design.cast(app.activeProduct)
root = design.rootComponent
# Create sketch
sketch = root.sketches.add(root.xYConstructionPlane)
lines = sketch.sketchCurves.sketchLines
rect = lines.addTwoPointRectangle(
adsk.core.Point3D.create(0, 0, 0),
adsk.core.Point3D.create(1, 1, 0) # 10mm = 1cm
)
# Extrude
profile = sketch.profiles.item(0)
extrudes = root.features.extrudeFeatures
extInput = extrudes.createInput(profile, adsk.fusion.FeatureOperations.NewBodyFeatureOperation)
extInput.setDistanceExtent(False, adsk.core.ValueInput.createByReal(1)) # 10mm = 1cm
extrude = extrudes.add(extInput)
```
CONTEXT FROM PREVIOUS WORK:
{long_term_content if long_term_content else "No previous context"}
Remember: Accuracy is CRITICAL. Double-check units, null checks, and API usage."""
        }
        return [system_prompt] + self.short_term
class AIInterface:
    """Enhanced AI interface with better error handling and retry logic.

    Wraps the ``openai``, ``gemini`` and ``ollama`` backends behind a single
    ``generate_response`` call with exponential-backoff retries.
    """

    def __init__(self, backend, api_key=None, model=None, max_retries=3):
        """
        Args:
            backend: 'openai', 'gemini' or 'ollama' (case-insensitive).
            api_key: Optional API key for hosted backends.
            model: Model name; defaults per backend via ``_default_model``.
            max_retries: Attempts before a request is abandoned.
        """
        self.backend = backend.lower()
        self.api_key = api_key
        self.model = model or self._default_model()
        self.max_retries = max_retries
        self.request_count = 0
        self.error_count = 0
        self._setup()

    def _default_model(self):
        """Return the default model for the configured backend
        ('llama3' for unknown backends)."""
        defaults = {
            'openai': 'gpt-4',
            'gemini': 'gemini-1.5-pro-latest',
            'ollama': 'llama3'
        }
        return defaults.get(self.backend, 'llama3')

    def _setup(self):
        """Setup backend with validation.

        Raises:
            ImportError: if the backend's client library is not installed.
            ValueError: if the backend name is unsupported.
        """
        try:
            if self.backend == 'openai':
                import openai
                if self.api_key:
                    openai.api_key = self.api_key
                log(f"OpenAI backend initialized with model: {self.model}")
            elif self.backend == 'gemini':
                import google.generativeai as genai
                if self.api_key:
                    genai.configure(api_key=self.api_key)
                log(f"Gemini backend initialized with model: {self.model}")
            elif self.backend == 'ollama':
                import ollama
                # Test connection; non-fatal, the server may come up later.
                try:
                    ollama.list()
                    log(f"Ollama backend initialized with model: {self.model}")
                except Exception as e:
                    log(f"Ollama server check failed: {e}", 'WARNING')
            else:
                raise ValueError(f"Unsupported backend: {self.backend}")
        except ImportError as e:
            log(f"Failed to import backend library: {e}", 'ERROR')
            raise

    def generate_response(self, prompt, context=None):
        """Generate response with enhanced retry logic.

        Args:
            prompt: User prompt to send.
            context: Optional list of prior chat messages. It is copied, not
                mutated (fix: the original appended to the caller's list).

        Returns:
            The model's text response.

        Raises:
            Exception: when all ``max_retries`` attempts fail.
        """
        # Copy so the caller's context list is never mutated.
        messages = list(context) if context else []
        messages.append({'role': 'user', 'content': prompt})
        for attempt in range(self.max_retries):
            try:
                self.request_count += 1
                log(f"AI request #{self.request_count} (attempt {attempt + 1})")
                if self.backend == 'openai':
                    import openai
                    resp = openai.ChatCompletion.create(
                        model=self.model,
                        messages=messages,
                        temperature=0.3,  # Lower for more consistent code generation
                        max_tokens=2000
                    )
                    return resp.choices[0].message.content
                elif self.backend == 'gemini':
                    import google.generativeai as genai
                    model = genai.GenerativeModel(self.model)
                    # Gemini takes a single string; flatten the chat transcript.
                    content = '\n'.join([f"{m['role']}: {m['content']}" for m in messages])
                    resp = model.generate_content(
                        content,
                        generation_config=genai.types.GenerationConfig(
                            temperature=0.3,
                            max_output_tokens=2000
                        )
                    )
                    return resp.text
                elif self.backend == 'ollama':
                    import ollama
                    resp = ollama.chat(
                        model=self.model,
                        messages=messages,
                        options={'temperature': 0.3}
                    )
                    return resp['message']['content']
            except Exception as e:
                self.error_count += 1
                log(f"AI API error (attempt {attempt + 1}/{self.max_retries}): {str(e)}", 'ERROR')
                if attempt == self.max_retries - 1:
                    raise Exception(f"AI request failed after {self.max_retries} attempts: {str(e)}")
                # Exponential backoff
                wait_time = 2 ** attempt
                log(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
        # Reached only for an unsupported backend that slipped past _setup.
        raise Exception("Failed to generate response")
class CommandExecutor:
    """Enhanced executor with better validation and safety.

    Validates generated scripts against a deny-list plus a syntax check,
    then executes them either directly (inside Fusion 360) or via a
    file-based bridge queue (server mode).
    """

    def __init__(self):
        if FUSION_AVAILABLE:
            # Running as an add-in: talk to the live Fusion application.
            self.app = adsk.core.Application.get()
            self.ui = self.app.userInterface
        else:
            self.app = None
            self.ui = None
        self.execution_count = 0
        self.validation_failures = 0

    def validate_script(self, code):
        """Enhanced validation with more safety checks.

        Returns:
            True when the script passes all checks.

        Raises:
            ValueError: on a forbidden pattern, a syntax error, or a
                missing ``import adsk``.
        """
        try:
            # Extended forbidden keywords - use regex word boundaries
            import re
            forbidden_patterns = [
                (r'\bdelete\b', 'delete'),
                (r'\bremove\b', 'remove'),
                (r'\bdestroy\b', 'destroy'),
                (r'\bclear\b', 'clear'),
                (r'\berase\b', 'erase'),
                (r'\bos\.', 'os.'),
                (r'\bsys\.', 'sys.'),
                (r'\bsubprocess\.', 'subprocess.'),
                (r'\beval\s*\(', 'eval('),
                (r'\bexec\s*\(', 'exec('),
                (r'\b__import__\b', '__import__'),
                (r'\bopen\s*\(', 'open('),
                (r'\bfile\s*\(', 'file('),
                (r'\binput\s*\(', 'input('),  # Only match standalone input()
                (r'\bshutil\.', 'shutil.'),
                (r'\brmtree\b', 'rmtree'),
                (r'\bunlink\b', 'unlink')
            ]
            for pattern, name in forbidden_patterns:
                if re.search(pattern, code, re.IGNORECASE):
                    self.validation_failures += 1
                    raise ValueError(f"Security: Script contains forbidden pattern: {name}")
            # Syntax validation
            try:
                ast.parse(code)
            except SyntaxError as e:
                self.validation_failures += 1
                raise ValueError(f"Syntax error at line {e.lineno}: {e.msg}")
            # Check for required imports
            if 'import adsk' not in code:
                self.validation_failures += 1
                raise ValueError("Script must import adsk modules")
            # Check for basic structure (warning only, not fatal)
            if 'adsk.core.Application.get()' not in code:
                log("Warning: Script may not access Fusion API properly", 'WARNING')
            log("Script validation passed")
            return True
        except ValueError:
            raise
        except Exception as e:
            self.validation_failures += 1
            raise ValueError(f"Validation error: {str(e)}")

    def execute_script(self, code):
        """Execute with enhanced error handling and logging.

        Returns:
            A human-readable outcome string beginning with "Success:" or
            "Error".
        """
        self.execution_count += 1
        log(f"Executing script #{self.execution_count}")
        log(f"Code preview:\n{code[:200]}...")
        # If running inside Fusion 360, execute directly
        if FUSION_AVAILABLE:
            try:
                exec_globals = {
                    'adsk': adsk,
                    '__name__': '__main__',
                    '__file__': '<fusion_mcp>'
                }
                exec(code, exec_globals)
                log("Script executed successfully")
                return "Success: Design operation completed"
            except Exception as e:
                error_msg = traceback.format_exc()
                log(f"Execution error:\n{error_msg}", 'ERROR')
                error_type = type(e).__name__
                error_line = self._extract_error_line(error_msg)
                return f"Error ({error_type}): {str(e)}\nLine: {error_line}"
        # If running from web server, use bridge file
        else:
            try:
                # NOTE: `time` is imported at module level; the original
                # re-imported it here redundantly.
                queue_path = os.path.expanduser('~/fusion_mcp_queue.json')
                result_path = os.path.expanduser('~/fusion_mcp_result.json')
                # Remove old result if exists
                if os.path.exists(result_path):
                    os.remove(result_path)
                # Write execution request
                with open(queue_path, 'w') as f:
                    json.dump({'code': code, 'timestamp': time.time()}, f)
                log("Execution request sent to Fusion 360 bridge")
                # Wait for result (timeout 30 seconds)
                start_time = time.time()
                while time.time() - start_time < 30:
                    if os.path.exists(result_path):
                        with open(result_path, 'r') as f:
                            result = json.load(f)
                        os.remove(result_path)
                        # Fix: tolerate a malformed bridge result instead of
                        # raising KeyError here.
                        message = result.get('message', '')
                        if result.get('success'):
                            log("Script executed successfully via bridge")
                            return "Success: " + message
                        else:
                            log(f"Execution error via bridge: {message}", 'ERROR')
                            return "Error: " + message
                    time.sleep(0.5)
                # Timeout
                return "Error: Execution timeout. Make sure Fusion 360 Bridge is running (run fusion_bridge.py in Fusion 360)"
            except Exception as e:
                error_msg = traceback.format_exc()
                log(f"Bridge communication error:\n{error_msg}", 'ERROR')
                return f"Error: Bridge communication failed: {str(e)}"

    def _extract_error_line(self, traceback_str):
        """Extract relevant error line from traceback.

        Returns the line following the first one containing 'line'
        (case-insensitive), or "Unknown" when no such marker exists.
        """
        lines = traceback_str.split('\n')
        for i, line in enumerate(lines):
            if 'line' in line.lower() and i + 1 < len(lines):
                return lines[i + 1].strip()
        return "Unknown"
class FusionMCPCore:
    """Enhanced core with better integration and accuracy.

    Wires together the context manager, the AI backend and the script
    executor behind two entry points: ``process_prompt`` (generate then
    execute) and ``process_prompt_detailed`` (generate then validate only;
    the UI triggers execution).
    """

    def __init__(self, ai_backend, api_key=None, model=None, long_term_file='~/mcp_memory.json'):
        self.context_mgr = ContextManager(long_term_file)
        self.ai_interface = AIInterface(ai_backend, api_key, model)
        self.executor = CommandExecutor()
        log(f"FusionMCPCore initialized: {ai_backend}/{model}")

    @staticmethod
    def _extract_code(ai_response):
        """Return the first ```python fenced block (stripped), or None.

        Fix: the original paired the opening fence with ``rfind('```')``,
        which merged multiple code blocks (and the text between them) into
        one invalid script, and produced an empty slice when the closing
        fence was missing. This takes the first complete block; with no
        closing fence it takes everything after the opening fence.
        """
        marker = '```python'
        start = ai_response.find(marker)
        if start == -1:
            return None
        code_start = start + len(marker)
        code_end = ai_response.find('```', code_start)
        if code_end == -1:
            code_end = len(ai_response)
        return ai_response[code_start:code_end].strip()

    def process_prompt(self, user_prompt):
        """Process prompt and return (ai_response, result).

        On any failure returns ("Error", <message>).
        """
        try:
            context = self.context_mgr.get_full_context()
            ai_response = self.ai_interface.generate_response(user_prompt, context)
            self.context_mgr.add_message('user', user_prompt)
            self.context_mgr.add_message('assistant', ai_response)
            result = self._parse_and_execute(ai_response)
            # Track execution results
            if 'Success' in result:
                self.context_mgr.conversation_metadata['successful_executions'] += 1
            else:
                self.context_mgr.conversation_metadata['failed_executions'] += 1
            self.context_mgr.add_message('system', f"Result: {result}")
            self.context_mgr.summarize_if_needed(self.ai_interface)
            return ai_response, result
        except Exception as e:
            log(f"Error processing prompt: {str(e)}", 'ERROR')
            return "Error", str(e)

    def process_prompt_detailed(self, user_prompt):
        """Process and return (ai_response, code, result) separately.

        ``code`` is the extracted script (or None); ``result`` is a dict
        with 'success'/'message' keys describing validation only — the
        script is NOT executed here.
        """
        try:
            context = self.context_mgr.get_full_context()
            ai_response = self.ai_interface.generate_response(user_prompt, context)
            self.context_mgr.add_message('user', user_prompt)
            self.context_mgr.add_message('assistant', ai_response)
            result = None
            code = self._extract_code(ai_response)
            if code is not None:
                # Validate but don't execute yet (UI will trigger execution)
                try:
                    self.executor.validate_script(code)
                    result = {'success': True, 'message': 'Code validated, ready to execute'}
                except Exception as e:
                    result = {'success': False, 'message': f'Validation failed: {str(e)}'}
            self.context_mgr.add_message('system', f"Result: {result}")
            self.context_mgr.summarize_if_needed(self.ai_interface)
            return ai_response, code, result
        except Exception as e:
            log(f"Error processing prompt: {str(e)}", 'ERROR')
            return "Error", None, {'success': False, 'message': str(e)}

    def _parse_and_execute(self, ai_response):
        """Parse the AI response and execute any python block it carries.

        Returns an outcome string; validation errors propagate as
        ValueError (caught by process_prompt).
        """
        code = self._extract_code(ai_response)
        if code is not None:
            if self.executor.validate_script(code):
                return self.executor.execute_script(code)
            else:
                return "Validation failed"
        elif '```json' in ai_response:
            return "JSON plugin execution not yet implemented in core"
        else:
            return f"No executable content. Response: {ai_response[:200]}"