server.pyβ’15 kB
#!/usr/bin/env python3
"""
Production-ready HTTP/WebSocket server for Fusion 360 MCP Chat UI
Provides real-time communication between web chat interface and Fusion 360 backend
"""
import asyncio
import json
import logging
import os
import sqlite3
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, List
import aiohttp
from aiohttp import web
import aiohttp_cors
# Configure logging
log_path = os.path.expanduser('~/mcp_server.log')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_path),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class Database:
"""SQLite database for conversation persistence"""
def __init__(self, db_path: str = '~/mcp_conversations.db'):
self.db_path = os.path.expanduser(db_path)
self.init_db()
def init_db(self):
"""Initialize database schema"""
with sqlite3.connect(self.db_path) as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
title TEXT,
backend TEXT,
model TEXT
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id TEXT,
role TEXT,
content TEXT,
code TEXT,
execution_result TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (conversation_id) REFERENCES conversations(id)
)
''')
conn.execute('''
CREATE TABLE IF NOT EXISTS executions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER,
code TEXT,
result TEXT,
success BOOLEAN,
executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (message_id) REFERENCES messages(id)
)
''')
conn.commit()
def create_conversation(self, conv_id: str, backend: str, model: str) -> None:
"""Create a new conversation"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
'INSERT INTO conversations (id, backend, model) VALUES (?, ?, ?)',
(conv_id, backend, model)
)
conn.commit()
def save_message(self, conv_id: str, role: str, content: str, code: Optional[str] = None) -> int:
"""Save a message to database"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
'INSERT INTO messages (conversation_id, role, content, code) VALUES (?, ?, ?, ?)',
(conv_id, role, content, code)
)
conn.commit()
return cursor.lastrowid
def save_execution(self, message_id: int, code: str, result: str, success: bool) -> None:
"""Save execution result"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
'INSERT INTO executions (message_id, code, result, success) VALUES (?, ?, ?, ?)',
(message_id, code, result, success)
)
conn.commit()
def get_conversations(self, limit: int = 50) -> List[Dict]:
"""Get recent conversations"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
'SELECT * FROM conversations ORDER BY updated_at DESC LIMIT ?',
(limit,)
)
return [dict(row) for row in cursor.fetchall()]
def get_messages(self, conv_id: str) -> List[Dict]:
"""Get messages for a conversation"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute(
'SELECT * FROM messages WHERE conversation_id = ? ORDER BY created_at ASC',
(conv_id,)
)
return [dict(row) for row in cursor.fetchall()]
class FusionMCPServer:
"""WebSocket server for Fusion 360 MCP"""
def __init__(self):
self.app = web.Application()
self.db = Database()
self.active_connections: Dict[str, web.WebSocketResponse] = {}
self.mcp_instance = None
self.setup_routes()
def setup_routes(self):
"""Setup HTTP and WebSocket routes"""
self.app.router.add_get('/ws', self.websocket_handler)
self.app.router.add_get('/', self.index_handler)
self.app.router.add_get('/chat_ui.js', self.js_handler)
self.app.router.add_get('/chat_ui.css', self.css_handler)
# Setup CORS
cors = aiohttp_cors.setup(self.app, defaults={
"*": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
)
})
for route in list(self.app.router.routes()):
cors.add(route)
async def index_handler(self, request):
"""Serve the main chat UI"""
html_path = Path(__file__).parent / 'chat_ui.html'
if html_path.exists():
return web.FileResponse(html_path)
return web.Response(text='Chat UI not found', status=404)
async def js_handler(self, request):
"""Serve JavaScript file"""
js_path = Path(__file__).parent / 'chat_ui.js'
if js_path.exists():
return web.FileResponse(js_path, headers={'Content-Type': 'application/javascript'})
return web.Response(text='JS not found', status=404)
async def css_handler(self, request):
"""Serve CSS file"""
css_path = Path(__file__).parent / 'chat_ui.css'
if css_path.exists():
return web.FileResponse(css_path, headers={'Content-Type': 'text/css'})
return web.Response(text='CSS not found', status=404)
async def websocket_handler(self, request):
"""Handle WebSocket connections"""
ws = web.WebSocketResponse()
await ws.prepare(request)
client_id = id(ws)
self.active_connections[client_id] = ws
logger.info(f'Client {client_id} connected')
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await self.handle_message(ws, msg.data, client_id)
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error(f'WebSocket error: {ws.exception()}')
finally:
del self.active_connections[client_id]
logger.info(f'Client {client_id} disconnected')
return ws
async def handle_message(self, ws: web.WebSocketResponse, data: str, client_id: int):
"""Handle incoming WebSocket messages"""
try:
message = json.loads(data)
msg_type = message.get('type')
if msg_type == 'message':
await self.handle_user_message(ws, message, client_id)
elif msg_type == 'execute':
await self.handle_code_execution(ws, message)
elif msg_type == 'get_models':
await self.handle_get_models(ws, message)
elif msg_type == 'set_model':
await self.handle_set_model(ws, message)
else:
logger.warning(f'Unknown message type: {msg_type}')
except json.JSONDecodeError as e:
logger.error(f'JSON decode error: {e}')
await self.send_error(ws, 'Invalid JSON format')
except Exception as e:
logger.error(f'Error handling message: {e}\n{traceback.format_exc()}')
await self.send_error(ws, str(e))
async def handle_user_message(self, ws: web.WebSocketResponse, message: Dict, client_id: int):
"""Handle user message and generate AI response"""
try:
content = message.get('content')
conv_id = message.get('conversationId')
if not content:
return
# Import here to avoid circular dependency
from fusion_mcp_core import FusionMCPCore
if not self.mcp_instance:
# Initialize with default settings (will be updated via set_model)
self.mcp_instance = FusionMCPCore(ai_backend='ollama', model='llama3')
# Process the message
logger.info(f'Processing message: {content[:50]}...')
# Get AI response
ai_response, code, result = await asyncio.to_thread(
self.mcp_instance.process_prompt_detailed,
content
)
# Save to database
if conv_id:
self.db.save_message(conv_id, 'user', content)
msg_id = self.db.save_message(conv_id, 'assistant', ai_response, code)
if result:
self.db.save_execution(msg_id, code or '', str(result), 'Success' in str(result))
# Send response to client
await ws.send_json({
'type': 'response',
'content': ai_response,
'code': code,
'executionResult': result
})
except Exception as e:
logger.error(f'Error processing message: {e}\n{traceback.format_exc()}')
await self.send_error(ws, f'Failed to process message: {str(e)}')
async def handle_code_execution(self, ws: web.WebSocketResponse, message: Dict):
"""Handle code execution request"""
try:
code = message.get('code')
message_id = message.get('messageId')
if not code:
return
# Import here to avoid circular dependency
from fusion_mcp_core import CommandExecutor
executor = CommandExecutor()
# Validate and execute
logger.info(f'Executing code for message {message_id}')
success = False
result = ''
try:
if executor.validate_script(code):
result = await asyncio.to_thread(executor.execute_script, code)
success = 'Success' in result or 'successfully' in result.lower()
else:
result = 'Validation failed'
except Exception as e:
result = f'Execution error: {str(e)}'
success = False
# Send result
await ws.send_json({
'type': 'execution_result',
'messageId': message_id,
'result': result,
'success': success
})
except Exception as e:
logger.error(f'Error executing code: {e}\n{traceback.format_exc()}')
await self.send_error(ws, f'Execution failed: {str(e)}')
async def handle_get_models(self, ws: web.WebSocketResponse, message: Dict):
"""Get available models for backend"""
try:
backend = message.get('backend', 'ollama')
api_key = message.get('apiKey')
models = []
if backend == 'ollama':
try:
import ollama
# Run in thread pool to avoid blocking
models_list = await asyncio.to_thread(ollama.list)
models = [model.get('model', model.get('name', '')) for model in models_list.get('models', [])]
models = [m for m in models if m] # Filter empty strings
except Exception as e:
logger.error(f'Error getting Ollama models: {e}')
await self.send_error(ws, f'Ollama error: {str(e)}. Is the server running?')
return
elif backend == 'gemini':
try:
import google.generativeai as genai
if not api_key:
await self.send_error(ws, 'API key required for Gemini')
return
await asyncio.to_thread(genai.configure, api_key=api_key)
model_list = await asyncio.to_thread(genai.list_models)
models = [m.name for m in model_list if 'generateContent' in m.supported_generation_methods]
except Exception as e:
logger.error(f'Error getting Gemini models: {e}')
await self.send_error(ws, f'Gemini error: {str(e)}')
return
elif backend == 'openai':
# OpenAI models are predefined
models = ['gpt-4', 'gpt-4-turbo-preview', 'gpt-3.5-turbo', 'gpt-4o', 'gpt-4o-mini']
logger.info(f'Retrieved {len(models)} models for {backend}')
await ws.send_json({
'type': 'models',
'models': models
})
except Exception as e:
logger.error(f'Error getting models: {e}')
await self.send_error(ws, f'Failed to get models: {str(e)}')
async def handle_set_model(self, ws: web.WebSocketResponse, message: Dict):
"""Set the AI model and backend"""
try:
backend = message.get('backend')
model = message.get('model')
api_key = message.get('apiKey')
logger.info(f'Setting model: {backend}/{model}')
# Import here to avoid circular dependency
from fusion_mcp_core import FusionMCPCore
# Reinitialize MCP with new settings
self.mcp_instance = FusionMCPCore(
ai_backend=backend,
api_key=api_key if api_key else None,
model=model
)
logger.info(f'Model set successfully: {backend}/{model}')
except Exception as e:
logger.error(f'Error setting model: {e}')
await self.send_error(ws, f'Failed to set model: {str(e)}')
async def send_error(self, ws: web.WebSocketResponse, error_msg: str):
"""Send error message to client"""
await ws.send_json({
'type': 'error',
'message': error_msg
})
def run(self, host='0.0.0.0', port=8888):
"""Run the server"""
logger.info(f'Starting Fusion MCP Server on {host}:{port}')
logger.info(f'Chat UI available at http://localhost:{port}')
web.run_app(self.app, host=host, port=port)
if __name__ == '__main__':
server = FusionMCPServer()
server.run()