#!/usr/bin/env node
// server.js • 28.4 kB
/**
* Ultralytics MCP Server with SSE Support - Docker Version
*/
const express = require('express');
const { randomUUID } = require('node:crypto');
const { exec } = require('child_process');
const util = require('util');
// Use Docker environment paths for MCP SDK
const { McpServer } = require('/app/node_modules/@modelcontextprotocol/sdk/dist/cjs/server/mcp.js');
const { SSEServerTransport } = require('/app/node_modules/@modelcontextprotocol/sdk/dist/cjs/server/sse.js');
const { z } = require('zod');
const execAsync = util.promisify(exec);
/**
 * Execute Python code in the Ultralytics container via `docker exec`.
 *
 * The source is base64-encoded before it is placed on the command line so
 * that quotes and newlines in the Python code never reach the shell.
 *
 * @param {string} pythonCode - Python source to run inside `ultralytics-container`.
 * @returns {Promise<string>} stdout of the run; stderr when stdout is empty;
 *   a fixed placeholder when both are empty.
 * @throws {TypeError} If `pythonCode` is not a string.
 * @throws {Error} If the docker/python process fails. The message carries the
 *   process stderr (or the exec error message) so the tool handlers' `catch`
 *   blocks can flag the call with `isError` instead of reporting success.
 */
async function executeInUltralyticsContainer(pythonCode) {
  if (typeof pythonCode !== 'string') {
    throw new TypeError('pythonCode must be a string');
  }

  console.log('🐍 Executing Python code in Ultralytics container...');

  // Use base64 encoding to safely pass Python code and avoid shell escaping issues.
  const encodedCode = Buffer.from(pythonCode, 'utf8').toString('base64');
  const dockerCommand = `docker exec ultralytics-container python3 -c "import base64; exec(base64.b64decode('${encodedCode}').decode('utf-8'))"`;

  let output;
  try {
    // Training runs print far more than exec's default 1 MiB buffer allows.
    output = await execAsync(dockerCommand, { maxBuffer: 64 * 1024 * 1024 });
  } catch (error) {
    console.error('❌ Python execution failed:', error);
    const detail =
      [error.stderr, error.message].find(
        (part) => typeof part === 'string' && part.trim() !== '',
      ) ?? 'Unknown execution error';
    throw new Error(detail.trim(), { cause: error });
  }

  const { stdout, stderr } = output;
  // Library warnings on stderr are expected noise; anything else is worth logging.
  if (stderr && !stderr.includes('WARNING')) {
    console.error('🚨 Python execution stderr:', stderr);
  }
  console.log('✅ Python execution completed');
  return stdout || stderr || 'Code executed successfully (no output)';
}
/** Wrap plain text in the MCP tool-result envelope for a successful call. */
const textResult = (text) => ({ content: [{ type: 'text', text }] });

/** Wrap plain text in the MCP tool-result envelope for a failed call. */
const errorResult = (text) => ({ content: [{ type: 'text', text }], isError: true });

/**
 * Render a value as a Python string literal.
 *
 * JSON string syntax (double quotes with \" \\ \n \uXXXX escapes) is also a
 * valid Python string literal, so client-supplied values can be embedded in
 * generated Python without being able to terminate the string.
 */
const toPythonString = (value) => JSON.stringify(String(value));

/**
 * Parse the JSON object printed by a container script.
 * Anything printed before or after the outermost braces is ignored.
 *
 * @throws {Error} If the output holds no parseable JSON object.
 */
const parseContainerJson = (output) => {
  const text = String(output);
  const start = text.indexOf('{');
  const end = text.lastIndexOf('}');
  if (start === -1 || end <= start) {
    throw new Error(`Unexpected output from container: ${text.trim().slice(0, 200)}`);
  }
  try {
    return JSON.parse(text.slice(start, end + 1));
  } catch (cause) {
    throw new Error(`Could not parse container output as JSON: ${cause.message}`, { cause });
  }
};

/**
 * Build the Python script for the `yolo_operation` tool.
 *
 * @param {'detect'|'train'|'predict'} operation - Requested operation.
 * @param {string} model - Model name or weights path.
 * @param {string} [source] - Image/video path, or dataset YAML when training.
 * @returns {string} Python source.
 */
const buildYoloScript = (operation, model, source) => {
  const script = [
    'from ultralytics import YOLO',
    `model_name = ${toPythonString(model)}`,
    'model = YOLO(model_name)',
  ];

  if (operation === 'train') {
    const headline = source
      ? 'Training completed!'
      : 'Training completed with coco8 dataset!';
    script.push(
      '# Training without tensorboard parameter (not supported in this version)',
      'results = model.train(',
      `    data=${toPythonString(source || 'coco8.yaml')},`,
      '    epochs=3,',
      '    imgsz=640,',
      '    save=True,',
      '    plots=True,',
      "    project='/ultralytics/runs',",
      "    name='detect/train_tb'",
      ')',
      `print(f"${headline} Results saved to: {results.save_dir if hasattr(results, 'save_dir') else 'runs/detect/train'}")`,
    );
  } else if (source) {
    script.push(
      `results = model.predict(${toPythonString(source)}, save=True)`,
      // `results and` guards against an empty result list before indexing it.
      `print(f"Detection completed. Found {len(results)} result(s). Results saved to: {results[0].save_dir if results and hasattr(results[0], 'save_dir') else 'runs/detect/predict'}")`,
    );
  } else {
    // Print the model name, not the repr of the loaded YOLO object.
    script.push('print(f"Model {model_name} loaded and ready for detection")');
  }
  return script.join('\n');
};

/** Python script that lists training and detection runs under /ultralytics/runs. */
const LIST_RESULTS_SCRIPT = `
import json
from pathlib import Path


def scan_results():
    results = {"training": [], "detection": []}
    runs_path = Path("/ultralytics/runs")
    if runs_path.exists():
        # Scan training results
        if (runs_path / "detect").exists():
            for folder in (runs_path / "detect").iterdir():
                if folder.is_dir() and folder.name.startswith("train"):
                    info = {"name": folder.name, "path": str(folder)}
                    # Check for results.csv
                    if (folder / "results.csv").exists():
                        info["has_metrics"] = True
                        # Read last line for final metrics
                        with open(folder / "results.csv", "r") as f:
                            lines = f.readlines()
                        if len(lines) > 1:
                            info["final_metrics"] = lines[-1].strip()
                    # Check for weights
                    weights_dir = folder / "weights"
                    if weights_dir.exists():
                        info["weights"] = [w.name for w in weights_dir.glob("*.pt")]
                    results["training"].append(info)
                elif folder.name.startswith("predict") or folder.name.startswith("detect"):
                    results["detection"].append({
                        "name": folder.name,
                        "path": str(folder),
                        "images": len(list(folder.glob("*.jpg"))) + len(list(folder.glob("*.png")))
                    })
    return results


results = scan_results()
print(json.dumps(results, indent=2))
`;

/**
 * Build the Python script that summarises one training run.
 *
 * @param {string} trainingName - Folder name under /ultralytics/runs/detect.
 * @returns {string} Python source.
 */
const buildAnalyzeScript = (trainingName) => `
import json
from pathlib import Path

import pandas as pd


def analyze_training(train_name):
    base_path = Path(f"/ultralytics/runs/detect/{train_name}")
    analysis = {"training_name": train_name, "found": False}
    if not base_path.exists():
        return analysis
    analysis["found"] = True
    analysis["path"] = str(base_path)
    # Read results.csv for metrics
    results_file = base_path / "results.csv"
    if results_file.exists():
        df = pd.read_csv(results_file)
        # NOTE(review): some Ultralytics releases appear to pad header names
        # with spaces; stripping is harmless when they are not padded.
        df.columns = df.columns.str.strip()
        analysis["epochs"] = len(df)
        if len(df) > 0:
            final_row = df.iloc[-1]
            analysis["final_metrics"] = {
                "epoch": int(final_row.get("epoch", 0)),
                "train_box_loss": float(final_row.get("train/box_loss", 0)),
                "train_cls_loss": float(final_row.get("train/cls_loss", 0)),
                "train_dfl_loss": float(final_row.get("train/dfl_loss", 0)),
                "val_box_loss": float(final_row.get("val/box_loss", 0)),
                "val_cls_loss": float(final_row.get("val/cls_loss", 0)),
                "val_dfl_loss": float(final_row.get("val/dfl_loss", 0)),
                "precision": float(final_row.get("metrics/precision(B)", 0)),
                "recall": float(final_row.get("metrics/recall(B)", 0)),
                "mAP50": float(final_row.get("metrics/mAP50(B)", 0)),
                "mAP50_95": float(final_row.get("metrics/mAP50-95(B)", 0))
            }
            # Calculate improvement
            if len(df) > 1:
                first_row = df.iloc[0]
                analysis["improvement"] = {
                    "mAP50_change": float(final_row.get("metrics/mAP50(B)", 0)) - float(first_row.get("metrics/mAP50(B)", 0)),
                    "loss_reduction": float(first_row.get("train/box_loss", 0)) - float(final_row.get("train/box_loss", 0))
                }
    # Check available files
    analysis["files"] = {
        "weights": [p.name for p in (base_path / "weights").glob("*.pt")] if (base_path / "weights").exists() else [],
        "plots": [p.name for p in base_path.glob("*.png")],
        "has_confusion_matrix": (base_path / "confusion_matrix.png").exists(),
        "has_results_plot": (base_path / "results.png").exists()
    }
    return analysis


result = analyze_training(${toPythonString(trainingName)})
print(json.dumps(result, indent=2))
`;

/** Python script that reports TensorBoard status and starts it when logs exist. */
const TENSORBOARD_CHECK_SCRIPT = `
import subprocess
from pathlib import Path

import psutil


def check_tensorboard():
    # Check if TensorBoard process is running
    tb_running = False
    for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
        try:
            if 'tensorboard' in (proc.info['name'] or '').lower():
                tb_running = True
                break
        except Exception:
            continue
    # Check for event files
    runs_path = Path("/ultralytics/runs")
    event_files = []
    if runs_path.exists():
        event_files = list(runs_path.rglob("events.out.tfevents.*"))
    print(f"TensorBoard Process: {'✅ Running' if tb_running else '❌ Not running'}")
    print(f"Event Files Found: {len(event_files)}")
    if event_files:
        print("Recent training sessions with TensorBoard logs:")
        for f in event_files[-5:]:  # Show last 5
            print(f"  📁 {f.parent.name}: {f.name[:20]}...")
    if not tb_running and event_files:
        print("\\n🚀 Starting TensorBoard server...")
        # NOTE(review): --bind_all was dropped because TensorBoard is believed
        # to reject --host combined with --bind_all; confirm against the CLI docs.
        cmd = ["tensorboard", "--logdir", "/ultralytics/runs", "--host", "0.0.0.0", "--port", "6006"]
        subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        print("✅ TensorBoard started at http://localhost:6006")


check_tensorboard()
`;

/**
 * Build the Python script that (re)starts the Streamlit interface.
 *
 * @param {string} model - Default YOLO model name written into the generated app.
 * @returns {string} Python source.
 */
const buildStreamlitLaunchScript = (model) => `
import subprocess
import sys
import time

# Model requested by the MCP client, embedded as an escaped string literal.
model = ${toPythonString(model)}


def launch_streamlit():
    # Kill any existing Streamlit processes
    try:
        subprocess.run(["pkill", "-f", "streamlit"], check=False)
        time.sleep(2)
    except Exception:
        # Best effort only: pkill may be missing from the image.
        pass
    # Create Streamlit app script
    streamlit_script = '''
import sys
sys.path.append("/ultralytics")
sys.path.append("/workspace")
from enhanced_streamlit_inference import EnhancedInference
if __name__ == "__main__":
    model = __MODEL__
    app = EnhancedInference(model=model)
    app.inference()
'''.replace("__MODEL__", repr(model))
    # Write the script to a file
    with open("/workspace/streamlit_app.py", "w") as f:
        f.write(streamlit_script)
    # Launch Enhanced Streamlit
    # NOTE(review): this runs enhanced_streamlit_inference.py directly, not the
    # streamlit_app.py written above - confirm which entry point is intended.
    try:
        cmd = [
            sys.executable, "-m", "streamlit", "run",
            "/workspace/enhanced_streamlit_inference.py",
            "--server.address", "0.0.0.0",
            "--server.port", "8501",
            "--server.headless", "true",
            "--server.enableCORS", "false",
            "--server.enableXsrfProtection", "false",
            "--server.maxUploadSize", "200"
        ]
        # DEVNULL instead of PIPE: nothing reads the pipes once this launcher exits.
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            cwd="/ultralytics"
        )
        time.sleep(5)  # Wait for startup
        if process.poll() is not None:
            print(f"❌ Failed to launch Streamlit: process exited with code {process.returncode}")
            return {"error": f"exit code {process.returncode}"}
        print("✅ Enhanced Streamlit Interface launched successfully!")
        print("🌐 Access URL: http://localhost:8501")
        print("📱 Enhanced Features available:")
        print("   • 🏷️ Pre-trained YOLO models")
        print("   • 🎯 Custom trained models from your runs")
        print("   • 📹 Webcam, video & image inference")
        print("   • 🔍 Real-time object tracking")
        print("   • 🎛️ Advanced threshold controls")
        print("   • 🎨 Enhanced UI with model details")
        print("   • 📊 Detection statistics")
        print(f"🆔 Process ID: {process.pid}")
        print(f"📦 Default Model: {model}")
        print("🎯 Custom Models: Auto-scanned from /ultralytics/runs")
        return {"success": True, "port": 8501, "pid": process.pid}
    except Exception as e:
        print(f"❌ Failed to launch Streamlit: {e}")
        return {"error": str(e)}


result = launch_streamlit()
`;

/** Python script that prints runtime facts and interface process status as JSON. */
const SYSTEM_INFO_SCRIPT = `
import json
import subprocess
import sys

import torch


def is_running(pattern):
    # None means the status could not be determined (e.g. pgrep missing).
    try:
        return subprocess.run(["pgrep", "-f", pattern], capture_output=True).returncode == 0
    except Exception:
        return None


info = {
    "system": {
        "python_version": sys.version,
        "torch_version": torch.__version__,
        "cuda_available": torch.cuda.is_available(),
        "gpu_count": torch.cuda.device_count() if torch.cuda.is_available() else 0
    },
    "processes": {
        "streamlit": is_running("streamlit"),
        "tensorboard": is_running("tensorboard")
    }
}
print(json.dumps(info))
`;

/** Tools registered by createUltralyticsServer, as listed in the system overview. */
const TOOL_CATALOG = [
  ['execute_python_code', 'Run Python code with Ultralytics libraries'],
  ['yolo_operation', 'Detect, predict or train with YOLO models'],
  ['list_training_results', 'Browse training and detection outputs'],
  ['analyze_training_results', 'Extract performance metrics for a run'],
  ['view_tensorboard', 'Check and start TensorBoard'],
  ['launch_streamlit_interface', 'Start web UI'],
  ['get_system_info', 'System overview (this tool)'],
];

/** Render a tri-state process flag (true / false / unknown) for display. */
const describeProcess = (active) => {
  if (active === true) return '✅ Running';
  if (active === false) return '❌ Not running';
  return '❔ Unknown';
};

/**
 * Create the MCP Server with Ultralytics tools.
 *
 * Registers seven tools. Every tool except none of them works locally: each
 * builds a Python script and runs it through `executeInUltralyticsContainer`.
 * Handlers never throw; failures are returned as results with `isError: true`.
 *
 * @returns {McpServer} A new server instance with all tools registered.
 */
const createUltralyticsServer = () => {
  const server = new McpServer(
    {
      name: 'ultralytics_mcp',
      version: '1.0.0',
    },
    {
      capabilities: {
        tools: {},
        logging: {},
      },
    },
  );

  // Tool 1: Python Code Execution
  server.tool(
    'execute_python_code',
    'Execute Python code with Ultralytics libraries',
    {
      code: z.string().describe('Python code to execute'),
    },
    async ({ code }) => {
      try {
        console.log('🐍 Executing Python code:', code.substring(0, 50) + '...');
        const result = await executeInUltralyticsContainer(code);
        return textResult(`✅ Python code executed!\nResult:\n${result}`);
      } catch (error) {
        return errorResult(`❌ Python execution failed: ${error.message}`);
      }
    },
  );

  // Tool 2: YOLO Operations
  server.tool(
    'yolo_operation',
    'Perform YOLO operations',
    {
      operation: z.enum(['detect', 'train', 'predict']).describe('YOLO operation'),
      model: z.string().default('yolo11n.pt').describe('YOLO model'),
      source: z.string().optional().describe('Input source (image/video path)'),
    },
    async ({ operation, model, source }) => {
      try {
        console.log(`🎯 YOLO ${operation} with ${model}`);
        const result = await executeInUltralyticsContainer(
          buildYoloScript(operation, model, source),
        );
        return textResult(`✅ YOLO ${operation} completed with ${model}!\nResult:\n${result}`);
      } catch (error) {
        return errorResult(`❌ YOLO operation failed: ${error.message}`);
      }
    },
  );

  // Tool 3: List Training Results
  server.tool(
    'list_training_results',
    'List all available training and detection results',
    {},
    async () => {
      try {
        console.log('📋 Listing training results...');
        const result = await executeInUltralyticsContainer(LIST_RESULTS_SCRIPT);
        return textResult(`📋 Available Training & Detection Results:\n\n${result}`);
      } catch (error) {
        return errorResult(`❌ Failed to list results: ${error.message}`);
      }
    },
  );

  // Tool 4: Analyze Training Results
  server.tool(
    'analyze_training_results',
    'Analyze specific training results with detailed metrics',
    {
      training_name: z.string().describe('Training folder name (e.g., train, train2)'),
    },
    async ({ training_name }) => {
      try {
        console.log(`📊 Analyzing training results: ${training_name}`);
        const result = await executeInUltralyticsContainer(buildAnalyzeScript(training_name));
        return textResult(`📊 Training Analysis for "${training_name}":\n\n${result}`);
      } catch (error) {
        return errorResult(`❌ Analysis failed: ${error.message}`);
      }
    },
  );

  // Tool 5: View TensorBoard
  server.tool(
    'view_tensorboard',
    'Access native TensorBoard visualization (TensorBoard is automatically enabled)',
    {
      training_name: z.string().optional().describe('Specific training to focus on (optional)'),
    },
    async ({ training_name }) => {
      try {
        console.log('📈 Accessing TensorBoard...');
        const info = [
          '📈 **Native TensorBoard is automatically enabled!**',
          '',
          '🌐 **Access URL**: http://localhost:6006',
          '📁 **Log Directory**: /ultralytics/runs',
          '✅ **Status**: TensorBoard logs are automatically created during training',
          '',
        ];
        if (training_name) {
          info.push(
            `🎯 **Focused on**: ${training_name}`,
            `📂 **Specific logs**: /ultralytics/runs/detect/${training_name}`,
            '',
          );
        }
        info.push(
          '💡 **How it works**:',
          '• TensorBoard is enabled by default (tensorboard=True)',
          '• Logs are automatically created during training',
          '• Real-time metrics: loss, accuracy, mAP, learning rate',
          '• No manual setup required!',
          '',
        );
        // Check if the TensorBoard process is running (and start it if logs exist).
        const result = await executeInUltralyticsContainer(TENSORBOARD_CHECK_SCRIPT);
        return textResult(`${info.join('\n')}\n📊 **Current Status**:\n${result}`);
      } catch (error) {
        return errorResult(`❌ TensorBoard check failed: ${error.message}`);
      }
    },
  );

  // Tool 6: Launch Streamlit Interface
  server.tool(
    'launch_streamlit_interface',
    'Launch Streamlit web interface for interactive YOLO inference',
    {
      model: z.string().optional().describe('YOLO model to use (default: yolo11n.pt)'),
    },
    async ({ model = 'yolo11n.pt' }) => {
      try {
        console.log('🚀 Launching Streamlit Interface...');
        const result = await executeInUltralyticsContainer(buildStreamlitLaunchScript(model));
        const report = [
          '🚀 **Enhanced Streamlit Interface Status:**',
          '',
          result,
          '',
          '🎯 **Enhanced Features:**',
          '• 🏷️ **Pre-trained Models**: Official YOLO models (COCO dataset)',
          '• 🎯 **Custom Models**: Auto-scanned from your training runs',
          '• 📹 **Multi-Source**: Webcam, video files, and images',
          '• 🎛️ **Advanced Controls**: Confidence, IoU, class selection',
          '• 🔍 **Object Tracking**: Multi-object tracking support',
          '• 📊 **Statistics**: Real-time detection metrics',
          '• 🎨 **Enhanced UI**: Modern interface with model details',
          '',
          '🚀 **Quick Start:**',
          '1. Open http://localhost:8501',
          '2. Choose Model Category (Pre-trained or Custom)',
          '3. Select your model',
          '4. Choose source and upload files',
          '5. Adjust settings and click "Start Inference"',
          '',
          '💡 **Custom Models**: Automatically scans /ultralytics/runs for your trained models!',
        ];
        return textResult(report.join('\n'));
      } catch (error) {
        return errorResult(`❌ Failed to launch Streamlit interface: ${error.message}`);
      }
    },
  );

  // Tool 7: System Information & Interface URLs
  server.tool(
    'get_system_info',
    'Get comprehensive system information including all available interfaces and their URLs',
    {},
    async () => {
      try {
        console.log('🔍 Getting system information...');
        const output = await executeInUltralyticsContainer(SYSTEM_INFO_SCRIPT);
        const systemData = parseContainerJson(output);
        const system = systemData.system ?? {};
        const processes = systemData.processes ?? {};

        // Keycap digits: the digit followed by U+FE0F and U+20E3.
        const toolLines = TOOL_CATALOG.map(
          ([name, summary], index) => `${index + 1}\uFE0F\u20E3 ${name} - ${summary}`,
        );

        const overview = [
          '🖥️ **ULTRALYTICS AI SYSTEM OVERVIEW**',
          '=============================================',
          '',
          '🌐 **ACTIVE INTERFACES:**',
          '• 🚀 **Streamlit UI**: http://localhost:8501',
          '  → Interactive AI inference with webcam & file upload',
          '• 📊 **TensorBoard**: http://localhost:6006',
          '  → Training metrics visualization & monitoring',
          '• 📓 **Jupyter Lab**: http://localhost:8888',
          '  → Development environment & experimentation',
          '• 🔌 **MCP Server**: http://localhost:8092',
          `  → N8N integration with ${TOOL_CATALOG.length} AI tools`,
          '',
          '🛠️ **AVAILABLE MCP TOOLS:**',
          ...toolLines,
          '',
          '💾 **STORAGE PATHS:**',
          '• Models: /ultralytics/runs',
          '• Results: /ultralytics/runs/detect',
          '• Workspace: /workspace',
          '',
          '🎯 **QUICK ACCESS URLS:**',
          '• Visual AI Interface: http://localhost:8501',
          '• Training Metrics: http://localhost:6006',
          '• Development: http://localhost:8888',
          '• N8N Automation: http://localhost:5678',
          '',
          '⚡ **SYSTEM STATUS:**',
          `• Python: ${system.python_version?.split(' ')[0] || 'Active'}`,
          `• PyTorch: ${system.torch_version || 'Active'}`,
          `• CUDA: ${system.cuda_available ? '✅ Available' : '❌ Not Available'}`,
          `• GPU Count: ${system.gpu_count || 0}`,
          `• Streamlit process: ${describeProcess(processes.streamlit)}`,
          `• TensorBoard process: ${describeProcess(processes.tensorboard)}`,
          '',
          '🔥 **HYBRID WORKFLOW:**',
          'N8N (Automation) ↔ MCP Tools ↔ Streamlit (Visual Interface)',
        ];
        return textResult(overview.join('\n'));
      } catch (error) {
        return errorResult(`❌ Failed to get system information: ${error.message}`);
      }
    },
  );

  return server;
};
// Create the Express application; JSON bodies are parsed up front because
// the /messages handler hands `req.body` to the transport.
const app = express();
app.use(express.json());

// Live SSE transports keyed by session ID. Keys are looked up with values
// taken from the query string, so the registry has no prototype: names such
// as "constructor" can never resolve to an inherited property.
const transports = Object.create(null);

console.log('🚀 Starting Ultralytics MCP Server...');
//=============================================================================
// SSE TRANSPORT - N8N COMPATIBLE
//=============================================================================
/**
 * GET /sse - open a server-sent-events stream for one MCP client.
 *
 * Creates a transport that tells the client to POST to /messages, registers
 * it under its session ID, and connects a fresh MCP server instance to it.
 * The registration is removed when the response closes or when connecting
 * fails, so /messages never routes to a transport that was never connected.
 */
app.get('/sse', async (req, res) => {
  console.log('📡 GET /sse - Establishing SSE connection');
  let transport;
  try {
    transport = new SSEServerTransport('/messages', res);
    transports[transport.sessionId] = transport;

    res.on('close', () => {
      console.log(`❌ SSE connection closed for session ${transport.sessionId}`);
      delete transports[transport.sessionId];
    });

    const server = createUltralyticsServer();
    await server.connect(transport);
    console.log(`✅ SSE transport connected: ${transport.sessionId}`);
  } catch (error) {
    console.error('❌ SSE connection error:', error);
    if (transport) {
      delete transports[transport.sessionId];
    }
    if (!res.headersSent) {
      res.status(500).send('Internal server error');
    }
  }
});
/**
 * POST /messages?sessionId=... - deliver one client message to its SSE session.
 *
 * Responds 400 with a JSON-RPC error when the session ID is missing, is not a
 * plain string (e.g. a repeated query parameter), or names no live transport.
 * Responds 500 with a JSON-RPC error when the transport throws before any
 * response headers were sent.
 */
app.post('/messages', async (req, res) => {
  const { sessionId } = req.query;
  console.log(`📡 POST /messages: ${sessionId}`);
  try {
    // The query parser can yield arrays/objects; only a string is a valid key.
    const transport = typeof sessionId === 'string' ? transports[sessionId] : undefined;
    if (transport instanceof SSEServerTransport) {
      await transport.handlePostMessage(req, res, req.body);
      return;
    }
    res.status(400).json({
      jsonrpc: '2.0',
      error: {
        code: -32000,
        message: 'No transport found for sessionId',
      },
      id: null,
    });
  } catch (error) {
    console.error('❌ POST message error:', error);
    if (!res.headersSent) {
      res.status(500).json({
        jsonrpc: '2.0',
        error: {
          code: -32603,
          message: 'Internal server error',
        },
        id: null,
      });
    }
  }
});
// Health check: reports static server identity, the number of registered
// SSE sessions and the current time.
app.get('/health', (_req, res) => {
  const activeSessions = Object.keys(transports).length;
  const payload = {
    status: 'healthy',
    server: 'ultralytics_mcp',
    version: '1.0.0',
    activeSessions,
    timestamp: new Date().toISOString(),
  };
  res.json(payload);
});
// Start server. The port comes from the PORT environment variable and falls
// back to 8092; binding to 0.0.0.0 accepts connections on every interface.
const PORT = process.env.PORT || 8092;
app.listen(PORT, '0.0.0.0', () => {
  console.log(`
🎯 Ultralytics MCP Server Running on Port ${PORT}
=================================================
📡 SSE Endpoint: http://localhost:${PORT}/sse
📨 Messages: http://localhost:${PORT}/messages
❤️ Health: http://localhost:${PORT}/health
✅ N8N MCP Client Compatible
=================================================
`);
});
// Graceful shutdown
let isShuttingDown = false;

/**
 * Close every registered SSE transport, then exit with status 0.
 *
 * A transport that fails to close is logged and left in the registry; the
 * remaining transports are still closed. Repeated signals while a shutdown
 * is in progress are ignored.
 *
 * @param {string} signal - Name of the signal that triggered the shutdown.
 */
async function shutdown(signal) {
  if (isShuttingDown) {
    return;
  }
  isShuttingDown = true;
  console.log(`\n🛑 Shutting down (${signal})...`);
  for (const sessionId of Object.keys(transports)) {
    try {
      await transports[sessionId].close();
      delete transports[sessionId];
    } catch (error) {
      console.error(`Error closing session ${sessionId}:`, error);
    }
  }
  console.log('✅ Shutdown complete');
  process.exit(0);
}

// SIGINT covers Ctrl+C; SIGTERM is what `docker stop` sends to the container.
process.on('SIGINT', () => {
  void shutdown('SIGINT');
});
process.on('SIGTERM', () => {
  void shutdown('SIGTERM');
});