Skip to main content
Glama

Ultralytics MCP Server

server.js•28.4 kB
#!/usr/bin/env node /** * Ultralytics MCP Server with SSE Support - Docker Version */ const express = require('express'); const { randomUUID } = require('node:crypto'); const { exec } = require('child_process'); const util = require('util'); // Use Docker environment paths for MCP SDK const { McpServer } = require('/app/node_modules/@modelcontextprotocol/sdk/dist/cjs/server/mcp.js'); const { SSEServerTransport } = require('/app/node_modules/@modelcontextprotocol/sdk/dist/cjs/server/sse.js'); const { z } = require('zod'); const execAsync = util.promisify(exec); /** * Execute Python code in Ultralytics container via Docker exec */ async function executeInUltralyticsContainer(pythonCode) { try { console.log('šŸ Executing Python code in Ultralytics container...'); // Use base64 encoding to safely pass Python code to avoid shell escaping issues const encodedCode = Buffer.from(pythonCode).toString('base64'); const dockerCommand = `docker exec ultralytics-container python3 -c "import base64; exec(base64.b64decode('${encodedCode}').decode('utf-8'))"`; const { stdout, stderr } = await execAsync(dockerCommand); if (stderr && !stderr.includes('WARNING')) { console.error('🚨 Python execution stderr:', stderr); } const result = stdout || stderr || 'Code executed successfully (no output)'; console.log('āœ… Python execution completed'); return result; } catch (error) { console.error('āŒ Python execution failed:', error); const errorMsg = error.stderr || error.message || 'Unknown execution error'; return `āŒ Python execution failed: ${errorMsg}`; } } /** * Create the MCP Server with Ultralytics tools */ const createUltralyticsServer = () => { const server = new McpServer({ name: 'ultralytics_mcp', version: '1.0.0', }, { capabilities: { tools: {}, logging: {} } }); // Tool 1: Python Code Execution server.tool( 'execute_python_code', 'Execute Python code with Ultralytics libraries', { code: z.string().describe('Python code to execute') }, async ({ code }) => { try { console.log('šŸ Executing Python code:', code.substring(0, 50) + '...'); const result = await executeInUltralyticsContainer(code); return { content: [ { type: 'text', text: `āœ… Python code executed!\nResult:\n${result}` } ] }; } catch (error) { return { content: [ { type: 'text', text: `āŒ Python execution failed: ${error.message}` } ], isError: true }; } } ); // Tool 2: YOLO Operations server.tool( 'yolo_operation', 'Perform YOLO operations', { operation: z.enum(['detect', 'train', 'predict']).describe('YOLO operation'), model: z.string().default('yolo11n.pt').describe('YOLO model'), source: z.string().optional().describe('Input source (image/video path)') }, async ({ operation, model, source }) => { try { console.log(`šŸŽÆ YOLO ${operation} with ${model}`); // Build YOLO command let yoloCommand = `from ultralytics import YOLO; model = YOLO('${model}'); `; switch (operation) { case 'detect': case 'predict': if (source) { yoloCommand += `results = model.predict('${source}', save=True); print(f"Detection completed. Found {len(results)} result(s). Results saved to: {results[0].save_dir if hasattr(results[0], 'save_dir') else 'runs/detect/predict'}")`; } else { yoloCommand += `print(f"Model {model} loaded and ready for detection")`; } break; case 'train': if (source) { yoloCommand += ` # Training without tensorboard parameter (not supported in this version) results = model.train( data='${source}', epochs=3, imgsz=640, save=True, plots=True, project='/ultralytics/runs', name='detect/train_tb' ); print(f"Training completed! Results saved to: {results.save_dir if hasattr(results, 'save_dir') else 'runs/detect/train'}")`; } else { yoloCommand += ` # Training without tensorboard parameter (not supported in this version) results = model.train( data='coco8.yaml', epochs=3, imgsz=640, save=True, plots=True, project='/ultralytics/runs', name='detect/train_tb' ); print(f"Training completed with coco8 dataset! Results saved to: {results.save_dir if hasattr(results, 'save_dir') else 'runs/detect/train'}")`; } break; } const result = await executeInUltralyticsContainer(yoloCommand); return { content: [ { type: 'text', text: `āœ… YOLO ${operation} completed with ${model}!\nResult:\n${result}` } ] }; } catch (error) { return { content: [ { type: 'text', text: `āŒ YOLO operation failed: ${error.message}` } ], isError: true }; } } ); // Tool 3: List Training Results server.tool( 'list_training_results', 'List all available training and detection results', {}, async () => { try { console.log('šŸ“ Listing training results...'); const listCommand = ` import os import json from pathlib import Path def scan_results(): results = {"training": [], "detection": []} runs_path = Path("/ultralytics/runs") if runs_path.exists(): # Scan training results if (runs_path / "detect").exists(): for folder in (runs_path / "detect").iterdir(): if folder.is_dir() and folder.name.startswith("train"): info = {"name": folder.name, "path": str(folder)} # Check for results.csv if (folder / "results.csv").exists(): info["has_metrics"] = True # Read last line for final metrics with open(folder / "results.csv", "r") as f: lines = f.readlines() if len(lines) > 1: info["final_metrics"] = lines[-1].strip() # Check for weights weights_dir = folder / "weights" if weights_dir.exists(): info["weights"] = [w.name for w in weights_dir.glob("*.pt")] results["training"].append(info) elif folder.name.startswith("predict") or folder.name.startswith("detect"): results["detection"].append({ "name": folder.name, "path": str(folder), "images": len(list(folder.glob("*.jpg"))) + len(list(folder.glob("*.png"))) }) return results results = scan_results() print(json.dumps(results, indent=2)) `; const result = await executeInUltralyticsContainer(listCommand); return { content: [ { type: 'text', text: `šŸ“ Available Training & Detection Results:\n\n${result}` } ] }; } catch (error) { return { content: [ { type: 'text', text: `āŒ Failed to list results: ${error.message}` } ], isError: true }; } } ); // Tool 4: Analyze Training Results server.tool( 'analyze_training_results', 'Analyze specific training results with detailed metrics', { training_name: z.string().describe('Training folder name (e.g., train, train2)') }, async ({ training_name }) => { try { console.log(`šŸ“Š Analyzing training results: ${training_name}`); const analyzeCommand = ` import pandas as pd import json from pathlib import Path def analyze_training(train_name): base_path = Path(f"/ultralytics/runs/detect/{train_name}") analysis = {"training_name": train_name, "found": False} if not base_path.exists(): return analysis analysis["found"] = True analysis["path"] = str(base_path) # Read results.csv for metrics results_file = base_path / "results.csv" if results_file.exists(): df = pd.read_csv(results_file) analysis["epochs"] = len(df) if len(df) > 0: final_row = df.iloc[-1] analysis["final_metrics"] = { "epoch": int(final_row.get("epoch", 0)), "train_box_loss": float(final_row.get("train/box_loss", 0)), "train_cls_loss": float(final_row.get("train/cls_loss", 0)), "train_dfl_loss": float(final_row.get("train/dfl_loss", 0)), "val_box_loss": float(final_row.get("val/box_loss", 0)), "val_cls_loss": float(final_row.get("val/cls_loss", 0)), "val_dfl_loss": float(final_row.get("val/dfl_loss", 0)), "precision": float(final_row.get("metrics/precision(B)", 0)), "recall": float(final_row.get("metrics/recall(B)", 0)), "mAP50": float(final_row.get("metrics/mAP50(B)", 0)), "mAP50_95": float(final_row.get("metrics/mAP50-95(B)", 0)) } # Calculate improvement if len(df) > 1: first_row = df.iloc[0] analysis["improvement"] = { "mAP50_change": float(final_row.get("metrics/mAP50(B)", 0)) - float(first_row.get("metrics/mAP50(B)", 0)), "loss_reduction": float(first_row.get("train/box_loss", 0)) - float(final_row.get("train/box_loss", 0)) } # Check available files analysis["files"] = { "weights": list(str(p.name) for p in (base_path / "weights").glob("*.pt")) if (base_path / "weights").exists() else [], "plots": list(str(p.name) for p in base_path.glob("*.png")), "has_confusion_matrix": (base_path / "confusion_matrix.png").exists(), "has_results_plot": (base_path / "results.png").exists() } return analysis result = analyze_training("${training_name}") print(json.dumps(result, indent=2)) `; const result = await executeInUltralyticsContainer(analyzeCommand); return { content: [ { type: 'text', text: `šŸ“Š Training Analysis for "${training_name}":\n\n${result}` } ] }; } catch (error) { return { content: [ { type: 'text', text: `āŒ Analysis failed: ${error.message}` } ], isError: true }; } } ); // Tool 5: View TensorBoard server.tool( 'view_tensorboard', 'Access native TensorBoard visualization (TensorBoard is automatically enabled)', { training_name: z.string().optional().describe('Specific training to focus on (optional)') }, async ({ training_name }) => { try { console.log('šŸ“ˆ Accessing TensorBoard...'); let info = `šŸ“Š **Native TensorBoard is automatically enabled!**\n\n`; info += `🌐 **Access URL**: http://localhost:6006\n`; info += `šŸ“ **Log Directory**: /ultralytics/runs\n`; info += `āœ… **Status**: TensorBoard logs are automatically created during training\n\n`; if (training_name) { info += `šŸŽÆ **Focused on**: ${training_name}\n`; info += `šŸ“‚ **Specific logs**: /ultralytics/runs/detect/${training_name}\n\n`; } info += `šŸ’” **How it works**:\n`; info += `• TensorBoard is enabled by default (tensorboard=True)\n`; info += `• Logs are automatically created during training\n`; info += `• Real-time metrics: loss, accuracy, mAP, learning rate\n`; info += `• No manual setup required!\n\n`; // Check if TensorBoard process is running const checkCommand = ` import subprocess import psutil import os from pathlib import Path def check_tensorboard(): # Check if TensorBoard process is running tb_running = False for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: if 'tensorboard' in proc.info['name'].lower(): tb_running = True break except: continue # Check for event files runs_path = Path("/ultralytics/runs") event_files = [] if runs_path.exists(): event_files = list(runs_path.rglob("events.out.tfevents.*")) print(f"TensorBoard Process: {'āœ… Running' if tb_running else 'āŒ Not running'}") print(f"Event Files Found: {len(event_files)}") if event_files: print("Recent training sessions with TensorBoard logs:") for f in event_files[-5:]: # Show last 5 print(f" šŸ“„ {f.parent.name}: {f.name[:20]}...") if not tb_running and event_files: print("\\nšŸš€ Starting TensorBoard server...") cmd = ["tensorboard", "--logdir", "/ultralytics/runs", "--host", "0.0.0.0", "--port", "6006", "--bind_all"] subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) print("āœ… TensorBoard started at http://localhost:6006") check_tensorboard() `; const result = await executeInUltralyticsContainer(checkCommand); return { content: [ { type: 'text', text: `${info}šŸ“‹ **Current Status**:\n${result}` } ] }; } catch (error) { return { content: [ { type: 'text', text: `āŒ TensorBoard check failed: ${error.message}` } ], isError: true }; } } ); // Tool 6: Launch Streamlit Interface server.tool( 'launch_streamlit_interface', 'Launch Streamlit web interface for interactive YOLO inference', { model: z.string().optional().describe('YOLO model to use (default: yolo11n.pt)') }, async ({ model = 'yolo11n.pt' }) => { try { console.log('🌐 Launching Streamlit Interface...'); const launchCommand = ` import subprocess import os import sys import time from pathlib import Path def launch_streamlit(): # Kill any existing Streamlit processes try: subprocess.run(["pkill", "-f", "streamlit"], check=False) time.sleep(2) except: pass # Create Streamlit app script streamlit_script = ''' import sys sys.path.append("/ultralytics") sys.path.append("/workspace") from enhanced_streamlit_inference import EnhancedInference if __name__ == "__main__": model = "${model}" app = EnhancedInference(model=model) app.inference() ''' # Write the script to a file with open("/workspace/streamlit_app.py", "w") as f: f.write(streamlit_script) # Launch Enhanced Streamlit try: cmd = [ sys.executable, "-m", "streamlit", "run", "/workspace/enhanced_streamlit_inference.py", "--server.address", "0.0.0.0", "--server.port", "8501", "--server.headless", "true", "--server.enableCORS", "false", "--server.enableXsrfProtection", "false", "--server.maxUploadSize", "200" ] process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd="/ultralytics" ) time.sleep(5) # Wait for startup print("āœ… Enhanced Streamlit Interface launched successfully!") print(f"🌐 Access URL: http://localhost:8501") print(f"šŸ“± Enhanced Features available:") print(f" • šŸ·ļø Pre-trained YOLO models") print(f" • šŸŽÆ Custom trained models from your runs") print(f" • šŸ“¹ Webcam, video & image inference") print(f" • šŸ”„ Real-time object tracking") print(f" • šŸŽ›ļø Advanced threshold controls") print(f" • šŸŽØ Enhanced UI with model details") print(f" • šŸ“Š Detection statistics") print(f"šŸ”„ Process ID: {process.pid}") print(f"šŸ“¦ Default Model: {model}") print(f"šŸŽÆ Custom Models: Auto-scanned from /ultralytics/runs") return {"success": True, "port": 8501, "pid": process.pid} except Exception as e: print(f"āŒ Failed to launch Streamlit: {e}") return {"error": str(e)} result = launch_streamlit() `; const result = await executeInUltralyticsContainer(launchCommand); return { content: [ { type: 'text', text: `🌐 **Enhanced Streamlit Interface Status:**\n\n${result}\n\nšŸŽÆ **Enhanced Features:**\n• šŸ·ļø **Pre-trained Models**: Official YOLO models (COCO dataset)\n• šŸŽÆ **Custom Models**: Auto-scanned from your training runs\n• šŸ“¹ **Multi-Source**: Webcam, video files, and images\n• šŸŽ›ļø **Advanced Controls**: Confidence, IoU, class selection\n• šŸ”„ **Object Tracking**: Multi-object tracking support\n• šŸ“Š **Statistics**: Real-time detection metrics\n• šŸŽØ **Enhanced UI**: Modern interface with model details\n\nšŸš€ **Quick Start:**\n1. Open http://localhost:8501\n2. Choose Model Category (Pre-trained or Custom)\n3. Select your model\n4. Choose source and upload files\n5. Adjust settings and click "Start Inference"\n\nšŸ’” **Custom Models**: Automatically scans /ultralytics/runs for your trained models!` } ] }; } catch (error) { return { content: [ { type: 'text', text: `āŒ Failed to launch Streamlit interface: ${error.message}` } ], isError: true }; } } ); // Tool 7: System Information & Interface URLs server.tool( 'get_system_info', 'Get comprehensive system information including all available interfaces and their URLs', {}, async () => { try { console.log('šŸ“Š Getting system information...'); const systemInfoCommand = ` import json import subprocess import psutil import torch from pathlib import Path import sys def get_system_info(): info = { "system": { "python_version": sys.version, "torch_version": torch.__version__, "cuda_available": torch.cuda.is_available(), "gpu_count": torch.cuda.device_count() if torch.cuda.is_available() else 0 }, "interfaces": { "streamlit": { "url": "http://localhost:8501", "status": "active", "description": "Interactive AI inference interface with webcam, video, and image upload support", "features": [ "Real-time object detection", "Webcam inference", "File upload (images/videos)", "Confidence & IoU threshold adjustment", "Object tracking", "Side-by-side comparison", "Multiple model support" ] }, "tensorboard": { "url": "http://localhost:6006", "status": "active", "description": "Training metrics visualization and monitoring dashboard", "features": [ "Training loss curves", "Validation metrics", "Model architecture visualization", "Real-time training progress", "Hyperparameter tracking", "Scalar metrics plotting" ] }, "jupyter": { "url": "http://localhost:8888", "status": "active", "description": "Development environment for AI experimentation", "features": [ "Interactive notebooks", "Code development", "Data exploration", "Model prototyping", "Visualization tools" ] }, "mcp_server": { "url": "http://localhost:8092", "status": "active", "description": "Model Context Protocol server for N8N integration", "features": [ "7 AI tools available", "Training automation", "Prediction workflows", "Results management", "Interface launching", "Metrics monitoring" ] } }, "tools_available": [ "yolo_train - Train custom YOLO models", "yolo_predict - Run inference on images/videos", "list_model_results - Browse training outputs", "get_training_metrics - Extract performance data", "check_tensorboard_status - Monitor training progress", "launch_streamlit_interface - Start web UI", "get_system_info - This tool for system overview" ], "storage": { "models_path": "/ultralytics/runs", "results_path": "/ultralytics/runs/detect", "training_path": "/ultralytics/runs/detect/train*", "workspace": "/workspace" }, "quick_access": { "streamlit_ui": "http://localhost:8501 - Visual AI Interface", "tensorboard": "http://localhost:6006 - Training Metrics", "jupyter": "http://localhost:8888 - Development", "n8n_workflows": "http://localhost:5678 - Automation" } } # Check actual process status try: # Check Streamlit streamlit_proc = subprocess.run(["pgrep", "-f", "streamlit"], capture_output=True) info["interfaces"]["streamlit"]["process_active"] = streamlit_proc.returncode == 0 # Check TensorBoard tensorboard_proc = subprocess.run(["pgrep", "-f", "tensorboard"], capture_output=True) info["interfaces"]["tensorboard"]["process_active"] = tensorboard_proc.returncode == 0 except Exception as e: info["process_check_error"] = str(e) return info result = get_system_info() print(json.dumps(result, indent=2)) `; const result = await executeInUltralyticsContainer(systemInfoCommand); const systemData = JSON.parse(result); const formattedInfo = `šŸ–„ļø **ULTRALYTICS AI SYSTEM OVERVIEW** ============================================= šŸš€ **ACTIVE INTERFACES:** • 🌐 **Streamlit UI**: http://localhost:8501 āžœ Interactive AI inference with webcam & file upload • šŸ“Š **TensorBoard**: http://localhost:6006 āžœ Training metrics visualization & monitoring • šŸ““ **Jupyter Lab**: http://localhost:8888 āžœ Development environment & experimentation • šŸ”— **MCP Server**: http://localhost:8092 āžœ N8N integration with 7 AI tools šŸ› ļø **AVAILABLE MCP TOOLS:** 1ļøāƒ£ yolo_train - Train custom YOLO models 2ļøāƒ£ yolo_predict - Run inference on images/videos 3ļøāƒ£ list_model_results - Browse training outputs 4ļøāƒ£ get_training_metrics - Extract performance data 5ļøāƒ£ check_tensorboard_status - Monitor training progress 6ļøāƒ£ launch_streamlit_interface - Start web UI 7ļøāƒ£ get_system_info - System overview (this tool) šŸ’¾ **STORAGE PATHS:** • Models: /ultralytics/runs • Results: /ultralytics/runs/detect • Workspace: /workspace šŸŽÆ **QUICK ACCESS URLS:** • Visual AI Interface: http://localhost:8501 • Training Metrics: http://localhost:6006 • Development: http://localhost:8888 • N8N Automation: http://localhost:5678 ⚔ **SYSTEM STATUS:** • Python: ${systemData.system?.python_version?.split(' ')[0] || 'Active'} • PyTorch: ${systemData.system?.torch_version || 'Active'} • CUDA: ${systemData.system?.cuda_available ? 'āœ… Available' : 'āŒ Not Available'} • GPU Count: ${systemData.system?.gpu_count || 0} šŸ”„ **HYBRID WORKFLOW:** N8N (Automation) ↔ MCP Tools ↔ Streamlit (Visual Interface)`; return { content: [ { type: 'text', text: formattedInfo } ] }; } catch (error) { return { content: [ { type: 'text', text: `āŒ Failed to get system information: ${error.message}` } ], isError: true }; } } ); return server; }; // Create Express application const app = express(); app.use(express.json()); // Store transports by session ID const transports = {}; console.log('šŸš€ Starting Ultralytics MCP Server...'); //============================================================================= // SSE TRANSPORT - N8N COMPATIBLE //============================================================================= app.get('/sse', async (req, res) => { console.log('šŸ“” GET /sse - Establishing SSE connection'); try { const transport = new SSEServerTransport('/messages', res); transports[transport.sessionId] = transport; res.on("close", () => { console.log(`āŒ SSE connection closed for session ${transport.sessionId}`); delete transports[transport.sessionId]; }); const server = createUltralyticsServer(); await server.connect(transport); console.log(`āœ… SSE transport connected: ${transport.sessionId}`); } catch (error) { console.error('āŒ SSE connection error:', error); if (!res.headersSent) { res.status(500).send('Internal server error'); } } }); app.post("/messages", async (req, res) => { const sessionId = req.query.sessionId; console.log(`šŸ“” POST /messages: ${sessionId}`); try { const transport = transports[sessionId]; if (transport instanceof SSEServerTransport) { await transport.handlePostMessage(req, res, req.body); } else { res.status(400).json({ jsonrpc: '2.0', error: { code: -32000, message: 'No transport found for sessionId', }, id: null, }); } } catch (error) { console.error('āŒ POST message error:', error); if (!res.headersSent) { res.status(500).json({ jsonrpc: '2.0', error: { code: -32603, message: 'Internal server error', }, id: null, }); } } }); // Health check app.get('/health', (req, res) => { res.json({ status: 'healthy', server: 'ultralytics_mcp', version: '1.0.0', activeSessions: Object.keys(transports).length, timestamp: new Date().toISOString() }); }); // Start server const PORT = process.env.PORT || 8092; app.listen(PORT, '0.0.0.0', () => { console.log(` šŸŽÆ Ultralytics MCP Server Running on Port ${PORT} ================================================= šŸ“” SSE Endpoint: http://localhost:${PORT}/sse šŸ“Ø Messages: http://localhost:${PORT}/messages ā¤ļø Health: http://localhost:${PORT}/health āœ… N8N MCP Client Compatible ================================================= `); }); // Graceful shutdown process.on('SIGINT', async () => { console.log('\nšŸ›‘ Shutting down...'); for (const sessionId in transports) { try { await transports[sessionId].close(); delete transports[sessionId]; } catch (error) { console.error(`Error closing session ${sessionId}:`, error); } } console.log('āœ… Shutdown complete'); process.exit(0); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MetehanYasar11/ultralytics_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server