# YOLO MCP Server
# by GongRzhe
# Verified
# server.py - CLI version
import fnmatch
import os
import base64
import cv2
import time
import threading
import subprocess
import json
import tempfile
import platform
from io import BytesIO
from typing import List, Dict, Any, Optional, Union
import numpy as np
from PIL import Image
from mcp.server.fastmcp import FastMCP
# Set up logging configuration
import os.path
import sys
import logging
import contextlib
import signal
import atexit
# Configure the root logger: INFO and above go both to a log file and to stderr.
# "yolo_service.log" is a relative path, so it resolves against the process
# working directory.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("yolo_service.log"),
logging.StreamHandler(sys.stderr)
]
)
# Set by camera_detection_thread to a dict with "success", "error" and
# "timestamp" keys when camera startup fails (e.g. the model is not found).
camera_startup_status = None # Will store error details if startup fails
# NOTE(review): presumably the most recent camera error; declared global in
# camera_detection_thread but never assigned in the visible code — confirm.
camera_last_error = None
# Logger shared by every function in this module.
logger = logging.getLogger('yolo_service')
# Global variables for camera control
# True while camera detection is considered active; polled by the watchdog.
camera_running = False
# NOTE(review): presumably the threading.Thread running camera detection — confirm.
camera_thread = None
# Status/detection records appended by camera_detection_thread.
detection_results = []
# time.time() timestamp the watchdog compares against to measure inactivity.
camera_last_access_time = 0
CAMERA_INACTIVITY_TIMEOUT = 60 # Auto-shutdown after 60 seconds of inactivity
def camera_watchdog_thread():
    """Watch the camera and stop it once it has been idle for too long.

    Runs forever: wakes every 5 seconds, and while the camera is running
    compares the time since ``camera_last_access_time`` against
    ``CAMERA_INACTIVITY_TIMEOUT``. When the camera is not running it waits
    an extra 10 seconds before polling again.
    """
    global camera_running, camera_last_access_time
    logger.info("Camera watchdog thread started")
    while True:
        # Base polling interval keeps CPU usage negligible.
        time.sleep(5)
        if not camera_running:
            # Nothing to supervise, so back off before the next poll.
            time.sleep(10)
            continue
        idle_seconds = time.time() - camera_last_access_time
        if idle_seconds > CAMERA_INACTIVITY_TIMEOUT:
            logger.info(f"Auto-stopping camera after {idle_seconds:.1f} seconds of inactivity")
            stop_camera_detection()
def load_image(image_source, is_path=False):
    """
    Load image from file path or base64 data

    Args:
        image_source: File path or base64 encoded image data
        is_path: Whether image_source is a file path

    Returns:
        PIL Image object

    Raises:
        ValueError: If the file does not exist, the base64 data cannot be
            decoded, or the image cannot be opened. The underlying exception
            is attached as ``__cause__``.
    """
    try:
        if is_path:
            # Load image from file path
            if not os.path.exists(image_source):
                raise FileNotFoundError(f"Image file not found: {image_source}")
            return Image.open(image_source)
        # Load image from base64 data
        image_bytes = base64.b64decode(image_source)
        return Image.open(BytesIO(image_bytes))
    except Exception as e:
        # Callers only ever see ValueError; chain the original so the real
        # failure (missing file, bad padding, unknown format) stays visible
        # in tracebacks.
        raise ValueError(f"Failed to load image: {str(e)}") from e
# New function to run YOLO CLI commands
def run_yolo_cli(command_args, capture_output=True, timeout=60):
    """
    Invoke the ``yolo`` executable and report the outcome as a dictionary

    Args:
        command_args: List of command arguments to pass to yolo CLI
        capture_output: Whether to capture and return command output
        timeout: Command timeout in seconds

    Returns:
        Dictionary with a boolean "success" key and the "command" string.
        Successful captured runs add "stdout" and "stderr"; a non-zero exit
        adds "error" (the process stderr) and "returncode"; a timeout or any
        other exception adds "error" with a description.
    """
    cmd = ["yolo"] + command_args
    command_line = " ".join(cmd)
    logger.info(f"Running YOLO CLI command: {command_line}")
    try:
        # check=False: a non-zero exit is reported in the result, not raised.
        completed = subprocess.run(
            cmd,
            capture_output=capture_output,
            text=True,
            check=False,
            timeout=timeout,
        )
        if completed.returncode != 0:
            logger.error(f"YOLO CLI command failed with code {completed.returncode}")
            logger.error(f"stderr: {completed.stderr}")
            return {
                "success": False,
                "error": completed.stderr,
                "command": command_line,
                "returncode": completed.returncode,
            }
        outcome = {"success": True}
        if capture_output:
            outcome["stdout"] = completed.stdout
            outcome["stderr"] = completed.stderr
        outcome["command"] = command_line
        return outcome
    except subprocess.TimeoutExpired:
        logger.error(f"YOLO CLI command timed out after {timeout} seconds")
        return {
            "success": False,
            "error": f"Command timed out after {timeout} seconds",
            "command": command_line,
        }
    except Exception as e:
        # Covers, among others, the executable not being found.
        logger.error(f"Error running YOLO CLI command: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "command": command_line,
        }
# Create MCP server
# The tool functions in this module register themselves on this instance
# through the @mcp.tool() decorator.
mcp = FastMCP("YOLO_Service")
# Global configuration
# "model_dirs" lists the directories searched, in order, when a model file
# name has to be resolved to a path; the first directory containing the file
# wins. The first two entries are relative to the process working directory,
# the third is the "models" folder next to this source file.
CONFIG = {
"model_dirs": [
".", # Current directory
"./models", # Models subdirectory
os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"),
]
}
# Function to save base64 data to temp file
def save_base64_to_temp(base64_data, prefix="image", suffix=".jpg"):
    """Save base64 encoded data to a temporary file and return the path

    Args:
        base64_data: Base64 encoded payload
        prefix: File name prefix for the temporary file
        suffix: File name suffix (extension) for the temporary file

    Returns:
        Path of the newly created temporary file. The caller is responsible
        for deleting it.

    Raises:
        ValueError: If the payload cannot be decoded or the file cannot be
            written. No temporary file is left behind in either case.
    """
    try:
        # Decode before touching the filesystem so malformed input never
        # leaves an open descriptor or an empty temp file behind.
        image_data = base64.b64decode(base64_data)
        fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
        try:
            with os.fdopen(fd, 'wb') as temp_file:
                temp_file.write(image_data)
        except Exception:
            # Best-effort removal of the partially written file; the original
            # write error is the one worth reporting.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise
        return temp_path
    except Exception as e:
        logger.error(f"Error saving base64 to temp file: {str(e)}")
        raise ValueError(f"Failed to save base64 data: {str(e)}") from e
@mcp.tool()
def get_model_directories() -> Dict[str, Any]:
    """Report each configured model directory and the ``.pt`` files it holds.

    Returns:
        Dictionary with the configured directory list, one detail record per
        directory (path, existence, directory flag, ``.pt`` file names), the
        output of ``list_available_models()``, and an always-empty
        "loaded_models" list.
    """
    def describe(path):
        # Summarise one configured directory; only real directories are listed.
        exists = os.path.exists(path)
        is_directory = exists and os.path.isdir(path)
        models = (
            [entry for entry in os.listdir(path) if entry.endswith(".pt")]
            if is_directory
            else []
        )
        return {
            "path": path,
            "exists": exists,
            "is_directory": is_directory,
            "models": models,
        }

    details = [describe(path) for path in CONFIG["model_dirs"]]
    return {
        "configured_directories": CONFIG["model_dirs"],
        "directory_details": details,
        "available_models": list_available_models(),
        "loaded_models": []  # No longer track loaded models with CLI approach
    }
@mcp.tool()
def detect_objects(
    image_data: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Detect objects in an image using YOLO CLI

    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path

    Returns:
        Dictionary containing detection results ("results", "model_used",
        "total_detections", "source", "save_dir"). On failure the dictionary
        carries an "error" key instead; exceptions are never propagated.
    """
    # Label echoed back to the caller; raw base64 payloads are never returned.
    source_label = image_data if is_path else "base64_image"
    temp_path = None
    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # Save base64 data to temp file
            temp_path = save_base64_to_temp(image_data)
            source_path = temp_path

        try:
            # Determine full model path: first configured directory wins
            model_path = None
            for directory in CONFIG["model_dirs"]:
                potential_path = os.path.join(directory, model_name)
                if os.path.exists(potential_path):
                    model_path = potential_path
                    break
            if model_path is None:
                available = list_available_models()
                available_str = ", ".join(available) if available else "none"
                return {
                    "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                    "source": source_label
                }

            # Setup output directory if saving results
            output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
            if save_results and not os.path.exists(output_dir):
                # exist_ok tolerates another request creating it concurrently
                os.makedirs(output_dir, exist_ok=True)

            # Build YOLO CLI command
            cmd_args = [
                "detect",  # Task
                "predict",  # Mode
                f"model={model_path}",
                f"source={source_path}",
                f"conf={confidence}",
                "format=json",  # Request JSON output for parsing
            ]
            if save_results:
                cmd_args.append(f"project={output_dir}")
                cmd_args.append("save=True")
            else:
                cmd_args.append("save=False")

            # Run YOLO CLI command
            result = run_yolo_cli(cmd_args)
        finally:
            # Remove the temp image on every exit path, including the
            # "model not found" early return above.
            if temp_path is not None:
                try:
                    os.remove(temp_path)
                except OSError as e:
                    logger.warning(f"Failed to clean up temp file {temp_path}: {str(e)}")

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
                "source": source_label
            }

        # Parse JSON output from stdout
        try:
            # Take the span from the first "{" to the last "}" as the payload.
            # NOTE(review): assumes the CLI prints a single JSON object with a
            # "predictions" list — confirm against the installed yolo version.
            stdout = result["stdout"]
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")
            if not (json_start >= 0 and json_end > json_start):
                # If no JSON found, create a basic response with info from stderr
                return {
                    "results": [],
                    "model_used": model_name,
                    "total_detections": 0,
                    "source": source_label,
                    "command_output": result["stderr"]
                }
            detection_data = json.loads(stdout[json_start:json_end + 1])

            # Format results
            formatted_results = []
            if "predictions" in detection_data:
                detections = []
                for pred in detection_data["predictions"]:
                    box = pred.get("box", {})
                    detections.append({
                        "box": [
                            box.get("x1", 0),
                            box.get("y1", 0),
                            box.get("x2", 0),
                            box.get("y2", 0),
                        ],
                        # read per prediction; the threshold argument is left untouched
                        "confidence": pred.get("confidence", 0),
                        "class_id": pred.get("class", -1),
                        "class_name": pred.get("name", "unknown")
                    })
                formatted_results.append({
                    "detections": detections,
                    # Image dimensions if available, as [width, height]
                    "image_shape": [
                        detection_data.get("width", 0),
                        detection_data.get("height", 0)
                    ]
                })
            return {
                "results": formatted_results,
                "model_used": model_name,
                "total_detections": sum(len(r["detections"]) for r in formatted_results),
                "source": source_label,
                "save_dir": output_dir if save_results else None
            }
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")
            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "source": source_label,
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }
    except Exception as e:
        # Tool boundary: report any failure to the MCP client as data.
        logger.error(f"Error in detect_objects: {str(e)}")
        return {
            "error": f"Failed to detect objects: {str(e)}",
            "source": source_label
        }
@mcp.tool()
def segment_objects(
    image_data: str,
    model_name: str = "yolov11n-seg.pt",
    confidence: float = 0.25,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Perform instance segmentation on an image using YOLO CLI

    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO segmentation model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path

    Returns:
        Dictionary containing segmentation results ("results", "model_used",
        "total_segments", "source", "save_dir"). On failure the dictionary
        carries an "error" key instead; exceptions are never propagated.
    """
    # Label echoed back to the caller; raw base64 payloads are never returned.
    source_label = image_data if is_path else "base64_image"
    temp_path = None
    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # Save base64 data to temp file
            temp_path = save_base64_to_temp(image_data)
            source_path = temp_path

        try:
            # Determine full model path: first configured directory wins
            model_path = None
            for directory in CONFIG["model_dirs"]:
                potential_path = os.path.join(directory, model_name)
                if os.path.exists(potential_path):
                    model_path = potential_path
                    break
            if model_path is None:
                available = list_available_models()
                available_str = ", ".join(available) if available else "none"
                return {
                    "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                    "source": source_label
                }

            # Setup output directory if saving results
            output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
            if save_results and not os.path.exists(output_dir):
                # exist_ok tolerates another request creating it concurrently
                os.makedirs(output_dir, exist_ok=True)

            # Build YOLO CLI command
            cmd_args = [
                "segment",  # Task
                "predict",  # Mode
                f"model={model_path}",
                f"source={source_path}",
                f"conf={confidence}",
                "format=json",  # Request JSON output for parsing
            ]
            if save_results:
                cmd_args.append(f"project={output_dir}")
                cmd_args.append("save=True")
            else:
                cmd_args.append("save=False")

            # Run YOLO CLI command
            result = run_yolo_cli(cmd_args)
        finally:
            # Remove the temp image on every exit path, including the
            # "model not found" early return above.
            if temp_path is not None:
                try:
                    os.remove(temp_path)
                except OSError as e:
                    logger.warning(f"Failed to clean up temp file {temp_path}: {str(e)}")

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
                "source": source_label
            }

        # Parse JSON output from stdout
        try:
            # Take the span from the first "{" to the last "}" as the payload.
            # NOTE(review): assumes the CLI prints a single JSON object with a
            # "predictions" list — confirm against the installed yolo version.
            stdout = result["stdout"]
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")
            if not (json_start >= 0 and json_end > json_start):
                # If no JSON found, create a basic response with info from stderr
                return {
                    "results": [],
                    "model_used": model_name,
                    "total_segments": 0,
                    "source": source_label,
                    "command_output": result["stderr"]
                }
            segmentation_data = json.loads(stdout[json_start:json_end + 1])

            # Format results
            formatted_results = []
            if "predictions" in segmentation_data:
                segments = []
                for pred in segmentation_data["predictions"]:
                    box = pred.get("box", {})
                    segment = {
                        "box": [
                            box.get("x1", 0),
                            box.get("y1", 0),
                            box.get("x2", 0),
                            box.get("y2", 0),
                        ],
                        # read per prediction; the threshold argument is left untouched
                        "confidence": pred.get("confidence", 0),
                        "class_id": pred.get("class", -1),
                        "class_name": pred.get("name", "unknown")
                    }
                    # The mask is passed through unchanged when present
                    if "mask" in pred:
                        segment["mask"] = pred["mask"]
                    segments.append(segment)
                formatted_results.append({
                    "segments": segments,
                    # Image dimensions if available, as [width, height]
                    "image_shape": [
                        segmentation_data.get("width", 0),
                        segmentation_data.get("height", 0)
                    ]
                })
            return {
                "results": formatted_results,
                "model_used": model_name,
                "total_segments": sum(len(r["segments"]) for r in formatted_results),
                "source": source_label,
                "save_dir": output_dir if save_results else None
            }
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")
            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "source": source_label,
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }
    except Exception as e:
        # Tool boundary: report any failure to the MCP client as data.
        logger.error(f"Error in segment_objects: {str(e)}")
        return {
            "error": f"Failed to segment objects: {str(e)}",
            "source": source_label
        }
@mcp.tool()
def classify_image(
    image_data: str,
    model_name: str = "yolov11n-cls.pt",
    top_k: int = 5,
    save_results: bool = False,
    is_path: bool = False
) -> Dict[str, Any]:
    """
    Classify an image using YOLO classification model via CLI

    Args:
        image_data: Base64 encoded image or file path (if is_path=True)
        model_name: YOLO classification model name
        top_k: Number of top categories to return
        save_results: Whether to save results to disk
        is_path: Whether image_data is a file path

    Returns:
        Dictionary containing classification results ("results",
        "model_used", "top_k", "source", "save_dir"). On failure the
        dictionary carries an "error" key instead; exceptions are never
        propagated.
    """
    # Label echoed back to the caller; raw base64 payloads are never returned.
    source_label = image_data if is_path else "base64_image"
    temp_path = None
    try:
        # Determine source path
        if is_path:
            source_path = image_data
            if not os.path.exists(source_path):
                return {
                    "error": f"Image file not found: {source_path}",
                    "source": source_path
                }
        else:
            # Save base64 data to temp file
            temp_path = save_base64_to_temp(image_data)
            source_path = temp_path

        try:
            # Determine full model path: first configured directory wins
            model_path = None
            for directory in CONFIG["model_dirs"]:
                potential_path = os.path.join(directory, model_name)
                if os.path.exists(potential_path):
                    model_path = potential_path
                    break
            if model_path is None:
                available = list_available_models()
                available_str = ", ".join(available) if available else "none"
                return {
                    "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}",
                    "source": source_label
                }

            # Setup output directory if saving results
            output_dir = os.path.join(tempfile.gettempdir(), "yolo_results")
            if save_results and not os.path.exists(output_dir):
                # exist_ok tolerates another request creating it concurrently
                os.makedirs(output_dir, exist_ok=True)

            # Build YOLO CLI command
            cmd_args = [
                "classify",  # Task
                "predict",  # Mode
                f"model={model_path}",
                f"source={source_path}",
                "format=json",  # Request JSON output for parsing
            ]
            if save_results:
                cmd_args.append(f"project={output_dir}")
                cmd_args.append("save=True")
            else:
                cmd_args.append("save=False")

            # Run YOLO CLI command
            result = run_yolo_cli(cmd_args)
        finally:
            # Remove the temp image on every exit path, including the
            # "model not found" early return above.
            if temp_path is not None:
                try:
                    os.remove(temp_path)
                except OSError as e:
                    logger.warning(f"Failed to clean up temp file {temp_path}: {str(e)}")

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
                "source": source_label
            }

        # Parse JSON output from stdout
        try:
            # Take the span from the first "{" to the last "}" as the payload.
            # NOTE(review): assumes the CLI prints a single JSON object with a
            # "predictions" list — confirm against the installed yolo version.
            stdout = result["stdout"]
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")
            if not (json_start >= 0 and json_end > json_start):
                # If no JSON found, create a basic response with info from stderr
                return {
                    "results": [],
                    "model_used": model_name,
                    "top_k": top_k,
                    "source": source_label,
                    "command_output": result["stderr"]
                }
            classification_data = json.loads(stdout[json_start:json_end + 1])

            # Format results
            formatted_results = []
            if "predictions" in classification_data:
                # Only the first top_k entries are reported; "class_id" is the
                # entry's position in that list, not a model class index.
                classifications = [
                    {
                        "class_id": i,
                        "class_name": pred.get("name", f"class_{i}"),
                        "probability": pred.get("confidence", 0)
                    }
                    for i, pred in enumerate(classification_data["predictions"][:top_k])
                ]
                formatted_results.append({
                    "classifications": classifications,
                    # Image dimensions if available, as [width, height]
                    "image_shape": [
                        classification_data.get("width", 0),
                        classification_data.get("height", 0)
                    ]
                })
            return {
                "results": formatted_results,
                "model_used": model_name,
                "top_k": top_k,
                "source": source_label,
                "save_dir": output_dir if save_results else None
            }
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")
            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "source": source_label,
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }
    except Exception as e:
        # Tool boundary: report any failure to the MCP client as data.
        logger.error(f"Error in classify_image: {str(e)}")
        return {
            "error": f"Failed to classify image: {str(e)}",
            "source": source_label
        }
@mcp.tool()
def track_objects(
    image_data: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    tracker: str = "bytetrack.yaml",
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Track objects in an image sequence using YOLO CLI

    Args:
        image_data: Base64 encoded image
        model_name: YOLO model name
        confidence: Detection confidence threshold
        tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
        save_results: Whether to save results to disk

    Returns:
        Dictionary containing tracking results ("results", "model_used",
        "tracker", "total_tracks", "save_dir"). On failure the dictionary
        carries an "error" key instead; exceptions are never propagated.
    """
    try:
        # Save base64 data to temp file
        source_path = save_base64_to_temp(image_data)
        try:
            # Determine full model path: first configured directory wins
            model_path = None
            for directory in CONFIG["model_dirs"]:
                potential_path = os.path.join(directory, model_name)
                if os.path.exists(potential_path):
                    model_path = potential_path
                    break
            if model_path is None:
                available = list_available_models()
                available_str = ", ".join(available) if available else "none"
                return {
                    "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
                }

            # Setup output directory if saving results
            output_dir = os.path.join(tempfile.gettempdir(), "yolo_track_results")
            if save_results and not os.path.exists(output_dir):
                # exist_ok tolerates another request creating it concurrently
                os.makedirs(output_dir, exist_ok=True)

            # Build YOLO CLI command
            cmd_args = [
                "track",  # Combined task and mode for tracking
                f"model={model_path}",
                f"source={source_path}",
                f"conf={confidence}",
                f"tracker={tracker}",
                "format=json",  # Request JSON output for parsing
            ]
            if save_results:
                cmd_args.append(f"project={output_dir}")
                cmd_args.append("save=True")
            else:
                cmd_args.append("save=False")

            # Run YOLO CLI command
            result = run_yolo_cli(cmd_args)
        finally:
            # Remove the temp image on every exit path, including the
            # "model not found" early return above.
            try:
                os.remove(source_path)
            except OSError as e:
                logger.warning(f"Failed to clean up temp file {source_path}: {str(e)}")

        # Check for command success
        if not result["success"]:
            return {
                "error": f"YOLO CLI command failed: {result.get('error', 'Unknown error')}",
                "command": result.get("command", ""),
            }

        # Parse JSON output from stdout
        try:
            # Take the span from the first "{" to the last "}" as the payload.
            # NOTE(review): assumes the CLI prints a single JSON object with a
            # "predictions" list — confirm against the installed yolo version.
            stdout = result["stdout"]
            json_start = stdout.find("{")
            json_end = stdout.rfind("}")
            if not (json_start >= 0 and json_end > json_start):
                # If no JSON found, create a basic response
                return {
                    "results": [],
                    "model_used": model_name,
                    "tracker": tracker,
                    "total_tracks": 0,
                    "command_output": result["stderr"]
                }
            tracking_data = json.loads(stdout[json_start:json_end + 1])

            # Format results
            formatted_results = []
            if "predictions" in tracking_data:
                tracks = []
                for pred in tracking_data["predictions"]:
                    box = pred.get("box", {})
                    tracks.append({
                        "box": [
                            box.get("x1", 0),
                            box.get("y1", 0),
                            box.get("x2", 0),
                            box.get("y2", 0),
                        ],
                        # read per prediction; the threshold argument is left untouched
                        "confidence": pred.get("confidence", 0),
                        "class_id": pred.get("class", -1),
                        "class_name": pred.get("name", "unknown"),
                        # -1 when the prediction carries no track id
                        "track_id": pred.get("id", -1)
                    })
                formatted_results.append({
                    "tracks": tracks,
                    # Image dimensions if available, as [width, height]
                    "image_shape": [
                        tracking_data.get("width", 0),
                        tracking_data.get("height", 0)
                    ]
                })
            return {
                "results": formatted_results,
                "model_used": model_name,
                "tracker": tracker,
                "total_tracks": sum(len(r["tracks"]) for r in formatted_results),
                "save_dir": output_dir if save_results else None
            }
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from YOLO output: {e}")
            logger.error(f"Output: {result['stdout']}")
            return {
                "error": f"Failed to parse YOLO results: {str(e)}",
                "command": result.get("command", ""),
                "stdout": result.get("stdout", ""),
                "stderr": result.get("stderr", "")
            }
    except Exception as e:
        # Tool boundary: report any failure to the MCP client as data.
        logger.error(f"Error in track_objects: {str(e)}")
        return {
            "error": f"Failed to track objects: {str(e)}"
        }
@mcp.tool()
def train_model(
    dataset_path: str,
    model_name: str = "yolov8n.pt",
    epochs: int = 100,
    imgsz: int = 640,
    batch: int = 16,
    name: str = "yolo_custom_model",
    project: str = "runs/train"
) -> Dict[str, Any]:
    """
    Train a YOLO model on a custom dataset using CLI

    Args:
        dataset_path: Path to YOLO format dataset
        model_name: Base model to start with
        epochs: Number of training epochs
        imgsz: Image size for training
        batch: Batch size
        name: Name for the training run
        project: Project directory

    Returns:
        Dictionary containing training results ("status", "model_path",
        "epochs_completed", "final_metrics", "training_log_sample"), or a
        dictionary with an "error" key when validation or training fails.
    """
    # Validate dataset path
    if not os.path.exists(dataset_path):
        return {"error": f"Dataset not found: {dataset_path}"}

    # Determine full model path: first configured directory wins
    model_path = None
    for directory in CONFIG["model_dirs"]:
        potential_path = os.path.join(directory, model_name)
        if os.path.exists(potential_path):
            model_path = potential_path
            break
    if model_path is None:
        available = list_available_models()
        available_str = ", ".join(available) if available else "none"
        return {
            "error": f"Model '{model_name}' not found in any configured directories. Available models: {available_str}"
        }

    # Create project directory if it doesn't exist; exist_ok keeps a
    # concurrent creation between the check and the call from raising.
    if not os.path.exists(project):
        os.makedirs(project, exist_ok=True)

    # Determine task type from the model name; the first matching marker
    # wins and anything unmatched is treated as detection.
    task_markers = (
        ("seg", "segment"),
        ("pose", "pose"),
        ("cls", "classify"),
        ("obb", "obb"),
    )
    task = next(
        (task_name for marker, task_name in task_markers if marker in model_name),
        "detect",
    )

    # Build YOLO CLI command
    cmd_args = [
        task,  # Task
        "train",  # Mode
        f"model={model_path}",
        f"data={dataset_path}",
        f"epochs={epochs}",
        f"imgsz={imgsz}",
        f"batch={batch}",
        f"name={name}",
        f"project={project}"
    ]

    # Run YOLO CLI command - with longer timeout
    logger.info(f"Starting model training with {epochs} epochs - this may take a while...")
    result = run_yolo_cli(cmd_args, timeout=epochs * 300)  # 5 minutes per epoch

    # Check for command success
    if not result["success"]:
        return {
            "error": f"Training failed: {result.get('error', 'Unknown error')}",
            "command": result.get("command", ""),
            "stderr": result.get("stderr", "")
        }

    # Path where the best weights are expected; existence is not verified.
    best_model_path = os.path.join(project, name, "weights", "best.pt")

    # Determine metrics from stdout if possible
    stdout = result.get("stdout", "")
    metrics = {}
    try:
        import re
        metric_patterns = (
            ("precision", r"Precision: ([\d\.]+)"),
            ("recall", r"Recall: ([\d\.]+)"),
            ("mAP50", r"mAP50: ([\d\.]+)"),
            ("mAP50-95", r"mAP50-95: ([\d\.]+)"),
        )
        for metric_name, pattern in metric_patterns:
            match = re.search(pattern, stdout)
            if match:
                metrics[metric_name] = float(match.group(1))
    except Exception as e:
        # Metrics are optional; a parse problem must not fail the run.
        logger.warning(f"Failed to parse metrics from training output: {str(e)}")

    return {
        "status": "success",
        "model_path": best_model_path,
        "epochs_completed": epochs,
        "final_metrics": metrics,
        # Only the first 1000 characters of the log are returned
        "training_log_sample": stdout[:1000] + "..." if len(stdout) > 1000 else stdout
    }
@mcp.tool()
def validate_model(
    model_path: str,
    data_path: str,
    imgsz: int = 640,
    batch: int = 16
) -> Dict[str, Any]:
    """
    Run YOLO CLI validation for a model against a dataset

    Args:
        model_path: Path to YOLO model (.pt file)
        data_path: Path to YOLO format validation dataset
        imgsz: Image size for validation
        batch: Batch size

    Returns:
        Dictionary with "status", parsed "metrics" and a truncated
        "validation_output", or a dictionary with an "error" key when a path
        is missing or the CLI run fails.
    """
    # Both inputs must exist before the CLI is invoked
    if not os.path.exists(model_path):
        return {"error": f"Model file not found: {model_path}"}
    if not os.path.exists(data_path):
        return {"error": f"Dataset not found: {data_path}"}

    # The task is guessed from the weights file name; first marker wins and
    # anything unmatched is treated as detection.
    weights_file = os.path.basename(model_path)
    task_markers = (
        ("seg", "segment"),
        ("pose", "pose"),
        ("cls", "classify"),
        ("obb", "obb"),
    )
    task = next(
        (task_name for marker, task_name in task_markers if marker in weights_file),
        "detect",
    )

    # Run the CLI with a five minute timeout
    result = run_yolo_cli(
        [
            task,
            "val",
            f"model={model_path}",
            f"data={data_path}",
            f"imgsz={imgsz}",
            f"batch={batch}",
        ],
        timeout=300,
    )
    if not result["success"]:
        return {
            "error": f"Validation failed: {result.get('error', 'Unknown error')}",
            "command": result.get("command", ""),
            "stderr": result.get("stderr", "")
        }

    # Scrape whichever metrics appear in the captured output
    stdout = result.get("stdout", "")
    metrics = {}
    try:
        import re
        metric_patterns = (
            ("precision", r"Precision: ([\d\.]+)"),
            ("recall", r"Recall: ([\d\.]+)"),
            ("mAP50", r"mAP50: ([\d\.]+)"),
            ("mAP50-95", r"mAP50-95: ([\d\.]+)"),
        )
        for metric_name, pattern in metric_patterns:
            match = re.search(pattern, stdout)
            if match:
                metrics[metric_name] = float(match.group(1))
    except Exception as e:
        logger.warning(f"Failed to parse metrics from validation output: {str(e)}")

    # Only the first 1000 characters of the output are returned
    if len(stdout) > 1000:
        output_sample = stdout[:1000] + "..."
    else:
        output_sample = stdout
    return {
        "status": "success",
        "metrics": metrics,
        "validation_output": output_sample
    }
@mcp.tool()
def export_model(
    model_path: str,
    format: str = "onnx",
    imgsz: int = 640
) -> Dict[str, Any]:
    """
    Convert a YOLO model to another format through the CLI

    Args:
        model_path: Path to YOLO model (.pt file)
        format: Export format (onnx, torchscript, openvino, etc.)
        imgsz: Image size for export

    Returns:
        Dictionary with "status", "export_path" (None when the expected
        artifact is not found next to the model), "format" and a truncated
        "export_output", or a dictionary with an "error" key on failure.
    """
    if not os.path.exists(model_path):
        return {"error": f"Model file not found: {model_path}"}

    # Formats this tool accepts
    valid_formats = [
        "torchscript", "onnx", "openvino", "engine", "coreml", "saved_model",
        "pb", "tflite", "edgetpu", "tfjs", "paddle"
    ]
    if format not in valid_formats:
        return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}

    # Run the CLI with a five minute timeout
    result = run_yolo_cli(
        [
            "export",
            f"model={model_path}",
            f"format={format}",
            f"imgsz={imgsz}",
        ],
        timeout=300,
    )
    if not result["success"]:
        return {
            "error": f"Export failed: {result.get('error', 'Unknown error')}",
            "command": result.get("command", ""),
            "stderr": result.get("stderr", "")
        }

    # Guess where the artifact landed: the model path with its extension
    # swapped for the suffix expected for the requested format.
    export_path = None
    try:
        suffix_by_format = {
            "torchscript": ".torchscript",
            "onnx": ".onnx",
            "openvino": "_openvino_model",
            "engine": ".engine",
            "coreml": ".mlmodel",
            "saved_model": "_saved_model",
            "pb": ".pb",
            "tflite": ".tflite",
            "edgetpu": "_edgetpu.tflite",
            "tfjs": "_web_model",
            "paddle": "_paddle_model"
        }
        stem, _extension = os.path.splitext(model_path)
        candidate = stem + suffix_by_format.get(format, "")
        # exists() is true for files and directories alike
        if os.path.exists(candidate):
            export_path = candidate
    except Exception as e:
        logger.warning(f"Failed to determine export path: {str(e)}")

    # Only the first 1000 characters of the output are returned
    stdout = result.get("stdout", "")
    if len(stdout) > 1000:
        output_sample = stdout[:1000] + "..."
    else:
        output_sample = stdout
    return {
        "status": "success",
        "export_path": export_path,
        "format": format,
        "export_output": output_sample
    }
@mcp.tool()
def list_available_models() -> List[str]:
    """List available YOLO models that actually exist on disk in any configured directory

    Returns:
        Sorted, de-duplicated model file names. When nothing is found the
        list holds a single human-readable hint naming the configured
        directories, so the result is never empty.
    """
    # Common YOLO model patterns
    # NOTE(review): official YOLO11 weights appear to be published as
    # "yolo11*.pt" (no "v"), which these patterns would not match — confirm
    # whether such files should be listed too.
    model_patterns = (
        "yolov11*.pt",
        "yolov8*.pt",
    )
    # Find all existing models in all configured directories
    available_models = set()
    for directory in CONFIG["model_dirs"]:
        # isdir rather than exists: a configured entry that is a regular
        # file would make os.listdir raise.
        if not os.path.isdir(directory):
            continue
        try:
            filenames = os.listdir(directory)
        except OSError as e:
            # An unreadable directory should not hide models found elsewhere
            logger.warning(f"Could not list model directory {directory}: {e}")
            continue
        available_models.update(
            filename
            for filename in filenames
            if filename.endswith(".pt")
            and any(fnmatch.fnmatch(filename, pattern) for pattern in model_patterns)
        )
    # Convert to sorted list
    result = sorted(available_models)
    if not result:
        logger.warning("No model files found in configured directories.")
        return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
    return result
# Camera detection functions using CLI instead of Python API
def _camera_error_entry(error_msg, stderr_output=None):
    """Build the ``detection_results`` record that reports a camera failure."""
    entry = {
        "timestamp": time.time(),
        "error": error_msg,
        "camera_status": "error",
    }
    if stderr_output is not None:
        entry["stderr"] = stderr_output
    entry["detections"] = []
    return entry


def _fail_camera_startup(error_msg, stderr_output=None, result_error=None):
    """Mark the camera as stopped and publish a startup failure.

    ``result_error`` overrides the message stored in ``detection_results``
    when it should differ from the one stored in ``camera_startup_status``.
    """
    global camera_running, camera_startup_status
    camera_running = False
    status = {"success": False, "error": error_msg}
    if stderr_output is not None:
        status["stderr"] = stderr_output
    status["timestamp"] = time.time()
    camera_startup_status = status
    detection_results.append(
        _camera_error_entry(
            error_msg if result_error is None else result_error,
            stderr_output,
        )
    )


def _read_captured_stderr(stderr_log):
    """Return everything the YOLO process wrote to its stderr capture file."""
    try:
        stderr_log.seek(0)
        return stderr_log.read()
    except (OSError, ValueError) as e:
        logger.warning(f"Could not read captured YOLO stderr: {str(e)}")
        return ""


def _parse_camera_predictions(predictions):
    """Convert the CLI's prediction records into the detection dicts we publish."""
    detections = []
    for pred in predictions:
        box = pred.get("box", {})
        detections.append({
            "box": [box.get("x1", 0), box.get("y1", 0), box.get("x2", 0), box.get("y2", 0)],
            "confidence": pred.get("confidence", 0),
            "class_id": pred.get("class", -1),
            "class_name": pred.get("name", "unknown"),
        })
    return detections


def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
    """Background thread for camera detection using YOLO CLI.

    Resolves ``model_name`` inside ``CONFIG["model_dirs"]``, checks that the
    ``yolo`` executable responds, launches ``yolo <task> predict ...`` on the
    camera and then reads the process's stdout line by line for as long as the
    global ``camera_running`` flag is true. Every line containing ``{`` is
    parsed as JSON; records with a ``"predictions"`` key are converted and
    appended to the global ``detection_results`` buffer, which is trimmed so
    it holds roughly the last 10 entries.

    Failures are published through ``camera_startup_status`` /
    ``camera_last_error`` and as error entries in ``detection_results``; the
    function itself never raises.

    Args:
        model_name: Model file name, looked up in the configured directories.
            The task (detect/segment/pose/classify) is inferred from it.
        confidence: Confidence threshold passed to the CLI as ``conf=``.
        fps_limit: Currently unused; kept so existing callers keep working.
        camera_id: Camera source passed to the CLI as ``source=``.

    NOTE(review): the loop assumes the CLI prints one JSON object per line on
    stdout - confirm against the installed Ultralytics version. ``readline``
    blocks, so a stop request is only noticed once the next line arrives or
    the process exits.
    """
    global camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error

    # Bound before the try so the finally clause can always inspect them,
    # including on the early-return paths below.
    process = None
    stderr_log = None
    try:
        # Determine full model path
        model_path = None
        for directory in CONFIG["model_dirs"]:
            potential_path = os.path.join(directory, model_name)
            if os.path.exists(potential_path):
                model_path = potential_path
                break
        if model_path is None:
            error_msg = f"Model {model_name} not found in any configured directories"
            logger.error(error_msg)
            _fail_camera_startup(
                error_msg,
                result_error="Failed to load model: Model not found",
            )
            return

        # Log camera start
        logger.info(f"Starting camera detection with model {model_name}, camera ID {camera_id}")
        detection_results.append({
            "timestamp": time.time(),
            "system_info": {
                "os": platform.system() if 'platform' in globals() else "Unknown",
                "camera_id": camera_id
            },
            "camera_status": "starting",
            "detections": []
        })

        # Determine task type based on model name
        task = "detect"  # Default task
        if "seg" in model_name:
            task = "segment"
        elif "pose" in model_name:
            task = "pose"
        elif "cls" in model_name:
            task = "classify"

        # Build YOLO CLI command
        base_cmd_args = [
            task,  # Task
            "predict",  # Mode
            f"model={model_path}",
            f"source={camera_id}",  # Camera source ID
            f"conf={confidence}",
            "format=json",
            "save=False",  # Don't save frames by default
            "show=False"  # Don't show GUI window
        ]

        # First verify YOLO command is available
        logger.info("Verifying YOLO CLI availability before starting camera...")
        try:
            check_result = subprocess.run(
                ["yolo", "--version"],
                capture_output=True,
                text=True,
                check=False,
                timeout=10
            )
        except Exception as e:
            error_msg = f"Error checking YOLO CLI: {str(e)}"
            logger.error(error_msg)
            _fail_camera_startup(error_msg)
            return
        if check_result.returncode != 0:
            error_msg = f"YOLO CLI check failed with code {check_result.returncode}: {check_result.stderr}"
            logger.error(error_msg)
            _fail_camera_startup(error_msg)
            return
        logger.info(f"YOLO CLI is available: {check_result.stdout.strip()}")

        # Set up subprocess for ongoing camera capture
        frame_count = 0
        error_count = 0
        start_time = time.time()

        # Start YOLO CLI process
        cmd_str = "yolo " + " ".join(base_cmd_args)
        logger.info(f"Starting YOLO CLI process: {cmd_str}")
        try:
            # stderr goes to a temp file rather than a pipe: nothing reads it
            # while frames are streaming, and an undrained pipe would block the
            # child once the OS pipe buffer fills up.
            stderr_log = tempfile.TemporaryFile(mode="w+", errors="replace")
            process = subprocess.Popen(
                ["yolo"] + base_cmd_args,
                stdin=subprocess.DEVNULL,  # Nothing is ever sent to the child
                stdout=subprocess.PIPE,
                stderr=stderr_log,
                text=True,
                bufsize=1,  # Line buffered
            )

            # Wait a moment to check if the process immediately fails
            time.sleep(1)
            if process.poll() is not None:
                error_msg = f"YOLO process failed to start (exit code {process.returncode})"
                stderr_output = _read_captured_stderr(stderr_log)
                logger.error(f"{error_msg} - STDERR: {stderr_output}")
                _fail_camera_startup(error_msg, stderr_output)
                return

            # Process started successfully
            camera_startup_status = {
                "success": True,
                "timestamp": time.time()
            }

            # Handle camera stream
            while camera_running:
                stdout_line = process.stdout.readline().strip()
                if not stdout_line:
                    # Empty read: either a blank line or the process has ended
                    if process.poll() is not None:
                        error_msg = f"YOLO process ended unexpectedly with code {process.returncode}"
                        stderr_output = _read_captured_stderr(stderr_log)
                        logger.error(f"{error_msg} - STDERR: {stderr_output}")
                        camera_running = False
                        camera_last_error = {
                            "error": error_msg,
                            "stderr": stderr_output,
                            "timestamp": time.time()
                        }
                        detection_results.append(_camera_error_entry(error_msg, stderr_output))
                        break
                    time.sleep(0.1)  # Short sleep to avoid CPU spin
                    continue

                # Try to parse JSON output from YOLO
                try:
                    json_start = stdout_line.find("{")
                    if json_start >= 0:
                        detection_data = json.loads(stdout_line[json_start:])
                        frame_count += 1
                        if "predictions" in detection_data:
                            detections = _parse_camera_predictions(detection_data["predictions"])

                            # Update detection results (keep only the last 10)
                            if len(detection_results) >= 10:
                                detection_results.pop(0)
                            detection_results.append({
                                "timestamp": time.time(),
                                "frame_count": frame_count,
                                "detections": detections,
                                "camera_status": "running",
                                # Image dimensions if the record carries them
                                "image_shape": [
                                    detection_data.get("width", 0),
                                    detection_data.get("height", 0)
                                ]
                            })

                            # Update last access time when processing frames
                            camera_last_access_time = time.time()

                            # Log occasional status
                            if frame_count % 30 == 0:
                                fps = frame_count / (time.time() - start_time)
                                logger.info(f"Camera running: processed {frame_count} frames ({fps:.1f} FPS)")
                                detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
                                logger.info(f"Total detections in current buffer: {detection_count}")
                except json.JSONDecodeError:
                    # Not all lines will be valid JSON, that's normal
                    pass
                except Exception as e:
                    error_msg = f"Error processing camera output: {str(e)}"
                    logger.warning(error_msg)
                    error_count += 1
                    if error_count > 10:
                        logger.error("Too many processing errors, stopping camera")
                        camera_running = False
                        camera_last_error = {
                            "error": "Too many processing errors",
                            "timestamp": time.time()
                        }
                        break
        except Exception as e:
            error_msg = f"Error in camera process management: {str(e)}"
            logger.error(error_msg)
            _fail_camera_startup(error_msg)
            return
    except Exception as e:
        error_msg = f"Error in camera thread: {str(e)}"
        logger.error(error_msg)
        _fail_camera_startup(error_msg)
    finally:
        # Clean up
        logger.info("Shutting down camera...")
        camera_running = False
        if process is not None:
            if process.poll() is None:
                try:
                    process.terminate()
                    try:
                        process.wait(timeout=5)
                    except subprocess.TimeoutExpired:
                        process.kill()  # Force kill if terminate doesn't work
                        process.wait()  # Reap the killed child
                except Exception as e:
                    logger.error(f"Error terminating YOLO process: {str(e)}")
            if process.stdout is not None:
                try:
                    process.stdout.close()
                except OSError as e:
                    logger.warning(f"Error closing YOLO stdout pipe: {str(e)}")
        if stderr_log is not None:
            stderr_log.close()
        logger.info("Camera detection stopped")
@mcp.tool()
def start_camera_detection(
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    camera_id: int = 0
) -> Dict[str, Any]:
    """
    Start realtime object detection using the computer's camera via YOLO CLI

    If detection is already running, only the inactivity timer is refreshed.
    Otherwise the YOLO CLI is probed, the background worker is started, and
    this call waits (bounded) until the worker has published whether startup
    succeeded, so launch failures are reported instead of being missed.

    Args:
        model_name: YOLO model name to use
        confidence: Detection confidence threshold
        camera_id: Camera device ID (0 is usually the default camera)
    Returns:
        Status of camera detection: a dict with "status" ("success" or
        "error") and a "message", plus details about the request or failure
    """
    global camera_thread, camera_running, detection_results, camera_last_access_time, camera_startup_status, camera_last_error

    # Check if already running; leave the live worker's status untouched
    if camera_running:
        camera_last_access_time = time.time()
        return {"status": "success", "message": "Camera detection is already running"}

    # Reset status variables and clear previous results for the new run
    camera_startup_status = None
    camera_last_error = None
    detection_results = []

    # First check if YOLO CLI is available
    try:
        version_check = run_yolo_cli(["--version"], timeout=10)
        if not version_check["success"]:
            return {
                "status": "error",
                "message": "YOLO CLI not available or not properly installed",
                "details": version_check.get("error", "Unknown error"),
                "solution": "Please make sure the 'yolo' command is in your PATH"
            }
    except Exception as e:
        error_msg = f"Error checking YOLO CLI: {str(e)}"
        logger.error(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "solution": "Please make sure the 'yolo' command is in your PATH"
        }

    # Start detection thread
    camera_running = True
    camera_last_access_time = time.time()  # Update access time
    camera_thread = threading.Thread(
        target=camera_detection_thread,
        args=(model_name, confidence, 30, camera_id),
        daemon=True
    )
    camera_thread.start()

    # Wait until the worker publishes its startup outcome. The worker runs its
    # own CLI check (up to 10 s) and pauses 1 s after launching the process, so
    # a short fixed sleep would almost always return before the outcome exists.
    startup_wait_seconds = 15.0
    deadline = time.monotonic() + startup_wait_seconds
    while (
        camera_startup_status is None
        and camera_thread.is_alive()
        and time.monotonic() < deadline
    ):
        time.sleep(0.1)

    # Check if the thread has reported any startup issues
    if camera_startup_status and not camera_startup_status.get("success", False):
        return {
            "status": "error",
            "message": "Camera detection failed to start",
            "details": camera_startup_status,
            "solution": "Check logs for detailed error information"
        }

    # The worker is gone without ever reporting an outcome
    if camera_startup_status is None and not camera_thread.is_alive():
        return {
            "status": "error",
            "message": "Camera detection failed to start",
            "details": camera_last_error,
            "solution": "Check logs for detailed error information"
        }

    # Thread is running, camera should be starting
    return {
        "status": "success",
        "message": f"Started camera detection using model {model_name}",
        "model": model_name,
        "confidence": confidence,
        "camera_id": camera_id,
        "auto_shutdown": f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity",
        "note": "If camera doesn't work, try different camera_id values (0, 1, or 2)"
    }
@mcp.tool()
def stop_camera_detection() -> Dict[str, Any]:
    """
    Stop realtime camera detection

    Clears the global ``camera_running`` flag and waits up to two seconds for
    the camera thread to finish.

    Returns:
        Status message: "success" when a running camera was asked to stop,
        "error" when no camera detection was running
    """
    global camera_running

    if camera_running:
        logger.info("Stopping camera detection by user request")
        # The worker loop polls this flag and exits once it sees False
        camera_running = False

        # Give the worker a bounded amount of time to wind down
        if camera_thread and camera_thread.is_alive():
            camera_thread.join(timeout=2.0)

        return {"status": "success", "message": "Stopped camera detection"}

    return {"status": "error", "message": "Camera detection is not running"}
@mcp.tool()
def get_camera_detections() -> Dict[str, Any]:
    """
    Get the latest detections from the camera

    Calling this while the camera is running counts as activity and resets
    the inactivity timer.

    Returns:
        Dictionary with recent detections, or an error/stopped report when the
        camera is not delivering results
    """
    global camera_last_access_time

    # Any poll while the camera is running counts as activity
    if camera_running:
        camera_last_access_time = time.time()

    thread_alive = camera_thread is not None and camera_thread.is_alive()

    if not camera_running:
        # Prefer the recorded startup failure over a plain "stopped" report
        startup_failed = camera_startup_status and not camera_startup_status.get("success", False)
        if startup_failed:
            return {
                "status": "error",
                "message": "Camera detection failed to start",
                "is_running": False,
                "camera_status": "error",
                "startup_error": camera_startup_status,
                "solution": "Check logs for detailed error information"
            }
        return {
            "status": "error",
            "message": "Camera detection is not running",
            "is_running": False,
            "camera_status": "stopped"
        }

    # Flag says running but the worker is gone: report the inconsistency
    if not thread_alive:
        return {
            "status": "error",
            "message": "Camera thread has stopped unexpectedly",
            "is_running": False,
            "camera_status": "error",
            "thread_alive": thread_alive,
            "last_error": camera_last_error,
            "detections": detection_results,
            "count": len(detection_results),
            "solution": "Please try restart the camera with a different camera_id"
        }

    # Up to five most recent error messages found in the buffer
    recent_errors = [entry.get("error") for entry in detection_results if "error" in entry][-5:]

    # Number of individual detections across all buffered entries
    total_detections = sum(
        len(entry["detections"]) for entry in detection_results if "detections" in entry
    )

    seconds_idle = time.time() - camera_last_access_time
    return {
        "status": "success",
        "is_running": camera_running,
        "thread_alive": thread_alive,
        "detections": detection_results,
        "count": len(detection_results),
        "total_detections": total_detections,
        "recent_errors": recent_errors or None,
        "camera_status": "error" if recent_errors else "running",
        "inactivity_timeout": {
            "seconds_remaining": int(CAMERA_INACTIVITY_TIMEOUT - seconds_idle),
            "last_access": camera_last_access_time
        }
    }
@mcp.tool()
def comprehensive_image_analysis(
    image_path: str,
    confidence: float = 0.25,
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Perform comprehensive analysis on an image by combining multiple CLI model results

    Runs object detection, image classification and - when the pose model
    file is present in one of the configured model directories - pose
    estimation, then derives a list of human-readable "tasks" from the
    combined results.

    Args:
        image_path: Path to the image file
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk (object detection only)
    Returns:
        Dictionary containing comprehensive analysis results. On success it has
        "status", "image_path", "analysis" and "summary"; a missing image
        yields {"error": ...}; any other failure yields a "status": "error"
        dictionary.
    """
    try:
        if not os.path.exists(image_path):
            return {"error": f"Image file not found: {image_path}"}

        analysis_results = {}

        # 1. Object detection
        logger.info("Running object detection for comprehensive analysis")
        object_result = detect_objects(
            image_data=image_path,
            model_name="yolov11n.pt",
            confidence=confidence,
            save_results=save_results,
            is_path=True
        )
        detected_objects = []
        if "results" in object_result and object_result["results"]:
            for result in object_result["results"]:
                for obj in result.get("detections", []):
                    detected_objects.append({
                        "class_name": obj.get("class_name", "unknown"),
                        "confidence": obj.get("confidence", 0)
                    })
        analysis_results["objects"] = detected_objects

        # 2. Scene classification (best effort: a failure is recorded, not raised)
        try:
            logger.info("Running classification for comprehensive analysis")
            cls_result = classify_image(
                image_data=image_path,
                model_name="yolov8n-cls.pt",
                top_k=3,
                save_results=False,
                is_path=True
            )
            scene_classifications = []
            if "results" in cls_result and cls_result["results"]:
                for result in cls_result["results"]:
                    for cls in result.get("classifications", []):
                        scene_classifications.append({
                            "class_name": cls.get("class_name", "unknown"),
                            "probability": cls.get("probability", 0)
                        })
            analysis_results["scene"] = scene_classifications
        except Exception as e:
            logger.error(f"Error during scene classification: {str(e)}")
            analysis_results["scene_error"] = str(e)

        # 3. Human pose detection (if pose model is available)
        try:
            # Locate the pose model and remember where it was found, so the CLI
            # is pointed at that exact file rather than a bare file name.
            pose_model_path = None
            for directory in CONFIG["model_dirs"]:
                candidate = os.path.join(directory, "yolov8n-pose.pt")
                if os.path.exists(candidate):
                    pose_model_path = candidate
                    break

            if pose_model_path is not None:
                logger.info("Running pose detection for comprehensive analysis")
                cmd_args = [
                    "pose",  # Task
                    "predict",  # Mode
                    f"model={pose_model_path}",
                    f"source={image_path}",
                    f"conf={confidence}",
                    "format=json",
                ]
                result = run_yolo_cli(cmd_args)
                if result["success"]:
                    # Parse the outermost {...} span of stdout as JSON
                    json_start = result["stdout"].find("{")
                    json_end = result["stdout"].rfind("}")
                    if json_start >= 0 and json_end > json_start:
                        pose_data = json.loads(result["stdout"][json_start:json_end + 1])
                        detected_poses = []
                        if "predictions" in pose_data:
                            for pred in pose_data["predictions"]:
                                keypoints = pred.get("keypoints", [])
                                detected_poses.append({
                                    "person_confidence": pred.get("confidence", 0),
                                    "has_keypoints": len(keypoints) if keypoints else 0
                                })
                        analysis_results["poses"] = detected_poses
                else:
                    # Surface a failed CLI run instead of dropping it silently
                    analysis_results["pose_error"] = f"Pose detection failed: {result.get('error', 'Unknown error')}"
            else:
                analysis_results["pose_error"] = "Pose model not available"
        except Exception as e:
            logger.error(f"Error during pose detection: {str(e)}")
            analysis_results["pose_error"] = str(e)

        # 4. Comprehensive task description
        tasks = []

        # Only reasonably confident objects count as "main" objects
        main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
        if "person" in main_objects:
            tasks.append("Person Detection")

        # Check for weapon objects (substring match on the class name)
        weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
        weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
        if weapons:
            tasks.append(f"Weapon Detection ({', '.join(weapons)})")

        # Count people
        person_count = main_objects.count("person")
        if person_count > 0:
            tasks.append(f"Person Count ({person_count} people)")

        # Pose analysis
        if "poses" in analysis_results and analysis_results["poses"]:
            tasks.append("Human Pose Analysis")

        # Scene classification: mention the first two reported classes
        if "scene" in analysis_results and analysis_results["scene"]:
            scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
            tasks.append(f"Scene Classification ({', '.join(scene_types)})")

        analysis_results["identified_tasks"] = tasks

        # Return comprehensive results
        return {
            "status": "success",
            "image_path": image_path,
            "analysis": analysis_results,
            "summary": ("Tasks identified in the image: " + ", ".join(tasks)) if tasks else "No clear tasks identified"
        }
    except Exception as e:
        return {
            "status": "error",
            "image_path": image_path,
            "error": f"Comprehensive analysis failed: {str(e)}"
        }
@mcp.tool()
def analyze_image_from_path(
    image_path: str,
    model_name: str = "yolov8n.pt",
    confidence: float = 0.25,
    save_results: bool = False
) -> Dict[str, Any]:
    """
    Analyze image from file path using YOLO CLI

    Thin convenience wrapper: forwards to ``detect_objects`` with
    ``is_path=True`` and converts any exception into an error dictionary.

    Args:
        image_path: Path to the image file
        model_name: YOLO model name
        confidence: Detection confidence threshold
        save_results: Whether to save results to disk
    Returns:
        Dictionary containing detection results, or a dictionary with "error"
        and "image_path" when the call raised
    """
    # The image is referenced by path, never as encoded data
    detect_kwargs = {
        "image_data": image_path,
        "model_name": model_name,
        "confidence": confidence,
        "save_results": save_results,
        "is_path": True,
    }
    try:
        outcome = detect_objects(**detect_kwargs)
    except Exception as e:
        outcome = {
            "error": f"Failed to analyze image: {str(e)}",
            "image_path": image_path
        }
    return outcome
@mcp.tool()
def test_connection() -> Dict[str, Any]:
    """
    Test if YOLO CLI service is running properly

    Probes the YOLO CLI for its version and reports it together with the
    models found on disk and the tools/features this server exposes.

    Returns:
        Status information and available tools
    """
    # Probe the CLI; every failure mode is folded into the version string
    try:
        version_result = run_yolo_cli(["--version"], timeout=10)
        if version_result.get("success"):
            reported = version_result.get("stdout", "Unknown")
        else:
            reported = "Not available"

        # Only output that mentions "ultralytics" is accepted as a version
        if "ultralytics" in reported.lower():
            yolo_version = reported.strip()
        else:
            yolo_version = "YOLO CLI not found or not responding correctly"
    except Exception as e:
        yolo_version = f"Error checking YOLO CLI: {str(e)}"

    models_on_disk = list_available_models()

    tool_names = [
        "list_available_models", "detect_objects", "segment_objects",
        "classify_image", "track_objects", "train_model", "validate_model",
        "export_model", "start_camera_detection", "stop_camera_detection",
        "get_camera_detections", "test_connection",
        # Additional tools
        "analyze_image_from_path",
        "comprehensive_image_analysis"
    ]
    feature_notes = [
        "All detection functions use YOLO CLI rather than Python API",
        "Support for loading images directly from file paths",
        "Support for comprehensive image analysis with task identification",
        "Support for camera detection using YOLO CLI"
    ]

    return {
        "status": "YOLO CLI service is running normally",
        "yolo_version": yolo_version,
        "available_models": models_on_disk,
        "available_tools": tool_names,
        "features": feature_notes
    }
def cleanup_resources():
    """Release the camera (if active) while the server shuts down.

    Clears the global ``camera_running`` flag and waits up to two seconds
    for the camera thread to finish. Does nothing beyond logging when the
    camera is not running.
    """
    global camera_running

    logger.info("Cleaning up resources...")

    camera_was_active = camera_running
    if camera_was_active:
        logger.info("Shutting down camera during server exit")
        camera_running = False

        # Bounded wait so shutdown cannot hang on the worker
        worker = camera_thread
        if worker and worker.is_alive():
            worker.join(timeout=2.0)

    logger.info("Cleanup complete")
def signal_handler(sig, frame):
    """Shut the service down cleanly when a termination signal arrives.

    Args:
        sig: Number of the received signal (used for logging only)
        frame: Stack frame supplied with the signal; not used
    """
    logger.info(f"Received signal {sig}, shutting down...")
    cleanup_resources()
    # Exit with status 0, as sys.exit(0) would
    raise SystemExit(0)
def start_watchdog():
    """Launch the camera inactivity watchdog in the background.

    Returns:
        The started daemon thread running ``camera_watchdog_thread``
    """
    # Daemon thread: it must never keep the process alive on its own
    monitor = threading.Thread(target=camera_watchdog_thread, daemon=True)
    monitor.start()
    return monitor
# Run cleanup on normal interpreter exit and on SIGINT / SIGTERM
atexit.register(cleanup_resources)
for _termination_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_termination_signal, signal_handler)
# Script entry point: probe the YOLO CLI, start the watchdog, run the server
if __name__ == "__main__":
    import platform

    logger.info("Starting YOLO CLI service")
    logger.info(f"Platform: {platform.system()} {platform.release()}")

    # A missing or broken CLI is reported but does not prevent the server
    # from starting.
    degraded_notice = "Service may not function correctly without YOLO CLI available"
    try:
        test_result = run_yolo_cli(["--version"], timeout=10)
        if not test_result["success"]:
            logger.warning(f"YOLO CLI test failed: {test_result.get('stderr', '')}")
            logger.warning(degraded_notice)
        else:
            logger.info(f"YOLO CLI available: {test_result.get('stdout', '').strip()}")
    except Exception as e:
        logger.error(f"Error testing YOLO CLI: {str(e)}")
        logger.warning(degraded_notice)

    # Start the camera watchdog thread
    watchdog_thread = start_watchdog()

    # Initialize and run server
    logger.info("Starting MCP server...")
    mcp.run(transport='stdio')