Skip to main content
Glama

Fusion360MCP

by jaskirat1616
server.py • 15 kB
#!/usr/bin/env python3
"""
Production-ready HTTP/WebSocket server for Fusion 360 MCP Chat UI.

Provides real-time communication between the web chat interface and the
Fusion 360 backend: serves the static chat UI, relays user prompts to the
AI backend, executes generated scripts, and persists conversations in SQLite.
"""
import asyncio
import json
import logging
import os
import sqlite3
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, List

import aiohttp
from aiohttp import web
import aiohttp_cors

# Configure logging: mirror everything to a log file in the user's home
# directory and to the console.
log_path = os.path.expanduser('~/mcp_server.log')
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_path),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class Database:
    """SQLite database for conversation persistence.

    Stores conversations, their messages, and code-execution results in a
    single SQLite file under the user's home directory. Each method opens a
    short-lived connection, so instances are cheap and hold no open handles.
    """

    def __init__(self, db_path: str = '~/mcp_conversations.db'):
        # Expand '~' so the database file lands in the user's home directory.
        self.db_path = os.path.expanduser(db_path)
        self.init_db()

    def init_db(self) -> None:
        """Initialize database schema (idempotent via CREATE TABLE IF NOT EXISTS)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS conversations (
                    id TEXT PRIMARY KEY,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    title TEXT,
                    backend TEXT,
                    model TEXT
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    conversation_id TEXT,
                    role TEXT,
                    content TEXT,
                    code TEXT,
                    execution_result TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (conversation_id) REFERENCES conversations(id)
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS executions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    message_id INTEGER,
                    code TEXT,
                    result TEXT,
                    success BOOLEAN,
                    executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (message_id) REFERENCES messages(id)
                )
            ''')
            conn.commit()

    def create_conversation(self, conv_id: str, backend: str, model: str) -> None:
        """Create a new conversation row keyed by *conv_id*."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                'INSERT INTO conversations (id, backend, model) VALUES (?, ?, ?)',
                (conv_id, backend, model)
            )
            conn.commit()

    def save_message(self, conv_id: str, role: str, content: str,
                     code: Optional[str] = None) -> int:
        """Save a message to the database and return its new row id."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                'INSERT INTO messages (conversation_id, role, content, code) VALUES (?, ?, ?, ?)',
                (conv_id, role, content, code)
            )
            conn.commit()
            return cursor.lastrowid

    def save_execution(self, message_id: int, code: str, result: str,
                       success: bool) -> None:
        """Save a code-execution result linked to *message_id*."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                'INSERT INTO executions (message_id, code, result, success) VALUES (?, ?, ?, ?)',
                (message_id, code, result, success)
            )
            conn.commit()

    def get_conversations(self, limit: int = 50) -> List[Dict]:
        """Return up to *limit* most-recently-updated conversations as dicts."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                'SELECT * FROM conversations ORDER BY updated_at DESC LIMIT ?',
                (limit,)
            )
            return [dict(row) for row in cursor.fetchall()]

    def get_messages(self, conv_id: str) -> List[Dict]:
        """Return all messages for a conversation, oldest first."""
        with sqlite3.connect(self.db_path) as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.execute(
                'SELECT * FROM messages WHERE conversation_id = ? ORDER BY created_at ASC',
                (conv_id,)
            )
            return [dict(row) for row in cursor.fetchall()]


class FusionMCPServer:
    """WebSocket server for Fusion 360 MCP.

    Serves the chat UI over HTTP and handles a small JSON message protocol
    over WebSocket ('message', 'execute', 'get_models', 'set_model').
    """

    def __init__(self):
        self.app = web.Application()
        self.db = Database()
        # FIX: keys are id(ws) (see websocket_handler), which is an int,
        # not a str as the previous annotation claimed.
        self.active_connections: Dict[int, web.WebSocketResponse] = {}
        # Lazily created FusionMCPCore; replaced on 'set_model' requests.
        self.mcp_instance = None
        self.setup_routes()

    def setup_routes(self):
        """Setup HTTP and WebSocket routes, then enable permissive CORS."""
        self.app.router.add_get('/ws', self.websocket_handler)
        self.app.router.add_get('/', self.index_handler)
        self.app.router.add_get('/chat_ui.js', self.js_handler)
        self.app.router.add_get('/chat_ui.css', self.css_handler)

        # Setup CORS — allow any origin so the UI can be opened from
        # file:// or another host during development.
        cors = aiohttp_cors.setup(self.app, defaults={
            "*": aiohttp_cors.ResourceOptions(
                allow_credentials=True,
                expose_headers="*",
                allow_headers="*",
            )
        })
        for route in list(self.app.router.routes()):
            cors.add(route)

    async def index_handler(self, request):
        """Serve the main chat UI HTML page."""
        html_path = Path(__file__).parent / 'chat_ui.html'
        if html_path.exists():
            return web.FileResponse(html_path)
        return web.Response(text='Chat UI not found', status=404)

    async def js_handler(self, request):
        """Serve the chat UI JavaScript file."""
        js_path = Path(__file__).parent / 'chat_ui.js'
        if js_path.exists():
            return web.FileResponse(js_path, headers={'Content-Type': 'application/javascript'})
        return web.Response(text='JS not found', status=404)

    async def css_handler(self, request):
        """Serve the chat UI CSS file."""
        css_path = Path(__file__).parent / 'chat_ui.css'
        if css_path.exists():
            return web.FileResponse(css_path, headers={'Content-Type': 'text/css'})
        return web.Response(text='CSS not found', status=404)

    async def websocket_handler(self, request):
        """Handle a WebSocket connection for its full lifetime."""
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        client_id = id(ws)
        self.active_connections[client_id] = ws
        logger.info(f'Client {client_id} connected')

        try:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    await self.handle_message(ws, msg.data, client_id)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f'WebSocket error: {ws.exception()}')
        finally:
            # FIX: pop() with default instead of del — safe even if cleanup
            # runs for a connection that was already removed.
            self.active_connections.pop(client_id, None)
            logger.info(f'Client {client_id} disconnected')

        return ws

    async def handle_message(self, ws: web.WebSocketResponse, data: str, client_id: int):
        """Dispatch an incoming WebSocket message by its 'type' field."""
        try:
            message = json.loads(data)
            msg_type = message.get('type')

            if msg_type == 'message':
                await self.handle_user_message(ws, message, client_id)
            elif msg_type == 'execute':
                await self.handle_code_execution(ws, message)
            elif msg_type == 'get_models':
                await self.handle_get_models(ws, message)
            elif msg_type == 'set_model':
                await self.handle_set_model(ws, message)
            else:
                logger.warning(f'Unknown message type: {msg_type}')
        except json.JSONDecodeError as e:
            logger.error(f'JSON decode error: {e}')
            await self.send_error(ws, 'Invalid JSON format')
        except Exception as e:
            # Top-level boundary: report to the client rather than dropping
            # the connection.
            logger.error(f'Error handling message: {e}\n{traceback.format_exc()}')
            await self.send_error(ws, str(e))

    async def handle_user_message(self, ws: web.WebSocketResponse, message: Dict, client_id: int):
        """Handle a user message: run it through the AI backend and reply."""
        try:
            content = message.get('content')
            conv_id = message.get('conversationId')

            if not content:
                return

            # Import here to avoid circular dependency
            from fusion_mcp_core import FusionMCPCore

            if not self.mcp_instance:
                # Initialize with default settings (will be updated via set_model)
                self.mcp_instance = FusionMCPCore(ai_backend='ollama', model='llama3')

            # Process the message
            logger.info(f'Processing message: {content[:50]}...')

            # Get AI response off the event loop; process_prompt_detailed is
            # blocking, so run it in a worker thread.
            ai_response, code, result = await asyncio.to_thread(
                self.mcp_instance.process_prompt_detailed, content
            )

            # Save to database
            if conv_id:
                self.db.save_message(conv_id, 'user', content)
                msg_id = self.db.save_message(conv_id, 'assistant', ai_response, code)
                if result:
                    self.db.save_execution(msg_id, code or '', str(result), 'Success' in str(result))

            # Send response to client
            await ws.send_json({
                'type': 'response',
                'content': ai_response,
                'code': code,
                'executionResult': result
            })
        except Exception as e:
            logger.error(f'Error processing message: {e}\n{traceback.format_exc()}')
            await self.send_error(ws, f'Failed to process message: {str(e)}')

    async def handle_code_execution(self, ws: web.WebSocketResponse, message: Dict):
        """Validate and execute client-supplied code, then report the result."""
        try:
            code = message.get('code')
            message_id = message.get('messageId')

            if not code:
                return

            # Import here to avoid circular dependency
            from fusion_mcp_core import CommandExecutor
            executor = CommandExecutor()

            # Validate and execute
            logger.info(f'Executing code for message {message_id}')

            success = False
            result = ''
            try:
                if executor.validate_script(code):
                    # execute_script is blocking; run it in a worker thread.
                    result = await asyncio.to_thread(executor.execute_script, code)
                    success = 'Success' in result or 'successfully' in result.lower()
                else:
                    result = 'Validation failed'
            except Exception as e:
                result = f'Execution error: {str(e)}'
                success = False

            # Send result
            await ws.send_json({
                'type': 'execution_result',
                'messageId': message_id,
                'result': result,
                'success': success
            })
        except Exception as e:
            logger.error(f'Error executing code: {e}\n{traceback.format_exc()}')
            await self.send_error(ws, f'Execution failed: {str(e)}')

    async def handle_get_models(self, ws: web.WebSocketResponse, message: Dict):
        """Get available models for the requested backend and send them back."""
        try:
            backend = message.get('backend', 'ollama')
            api_key = message.get('apiKey')
            models = []

            if backend == 'ollama':
                try:
                    import ollama
                    # Run in thread pool to avoid blocking
                    models_list = await asyncio.to_thread(ollama.list)
                    # Newer ollama versions use 'model', older use 'name'.
                    models = [model.get('model', model.get('name', ''))
                              for model in models_list.get('models', [])]
                    models = [m for m in models if m]  # Filter empty strings
                except Exception as e:
                    logger.error(f'Error getting Ollama models: {e}')
                    await self.send_error(ws, f'Ollama error: {str(e)}. Is the server running?')
                    return
            elif backend == 'gemini':
                try:
                    import google.generativeai as genai
                    if not api_key:
                        await self.send_error(ws, 'API key required for Gemini')
                        return
                    await asyncio.to_thread(genai.configure, api_key=api_key)
                    model_list = await asyncio.to_thread(genai.list_models)
                    models = [m.name for m in model_list
                              if 'generateContent' in m.supported_generation_methods]
                except Exception as e:
                    logger.error(f'Error getting Gemini models: {e}')
                    await self.send_error(ws, f'Gemini error: {str(e)}')
                    return
            elif backend == 'openai':
                # OpenAI models are predefined
                models = ['gpt-4', 'gpt-4-turbo-preview', 'gpt-3.5-turbo', 'gpt-4o', 'gpt-4o-mini']

            logger.info(f'Retrieved {len(models)} models for {backend}')
            await ws.send_json({
                'type': 'models',
                'models': models
            })
        except Exception as e:
            logger.error(f'Error getting models: {e}')
            await self.send_error(ws, f'Failed to get models: {str(e)}')

    async def handle_set_model(self, ws: web.WebSocketResponse, message: Dict):
        """Set the AI model and backend by rebuilding the MCP core instance."""
        try:
            backend = message.get('backend')
            model = message.get('model')
            api_key = message.get('apiKey')

            logger.info(f'Setting model: {backend}/{model}')

            # Import here to avoid circular dependency
            from fusion_mcp_core import FusionMCPCore

            # Reinitialize MCP with new settings
            self.mcp_instance = FusionMCPCore(
                ai_backend=backend,
                api_key=api_key if api_key else None,
                model=model
            )

            logger.info(f'Model set successfully: {backend}/{model}')
        except Exception as e:
            logger.error(f'Error setting model: {e}')
            await self.send_error(ws, f'Failed to set model: {str(e)}')

    async def send_error(self, ws: web.WebSocketResponse, error_msg: str):
        """Send an error message to the client over the WebSocket."""
        await ws.send_json({
            'type': 'error',
            'message': error_msg
        })

    def run(self, host='0.0.0.0', port=8888):
        """Run the server (blocking). Binds all interfaces by default."""
        logger.info(f'Starting Fusion MCP Server on {host}:{port}')
        logger.info(f'Chat UI available at http://localhost:{port}')
        web.run_app(self.app, host=host, port=port)


if __name__ == '__main__':
    server = FusionMCPServer()
    server.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jaskirat1616/Fusion360MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server