YOLO MCP Server
by GongRzhe
# server.py
import fnmatch
import os
import base64
import cv2
import time
import threading
from io import BytesIO
from typing import List, Dict, Any, Optional, Union
import numpy as np
from PIL import Image
from mcp.server.fastmcp import FastMCP
from ultralytics import YOLO
# Additional imports for logging, shutdown handling, and system info
import sys
import logging
import contextlib
import signal
import atexit
import platform
# Logging configuration: log to a file and to stderr so stdout stays free for the MCP stdio transport
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("yolo_service.log"),
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger('yolo_service')
# Global variables for camera control
camera_running = False
camera_thread = None
detection_results = []
camera_last_access_time = 0
CAMERA_INACTIVITY_TIMEOUT = 60 # Auto-shutdown after 60 seconds of inactivity
@contextlib.contextmanager
def redirect_stdout_to_stderr():
old_stdout = sys.stdout
sys.stdout = sys.stderr
try:
yield
finally:
sys.stdout = old_stdout
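# Why the redirection matters: with the stdio transport, the MCP protocol itself
# is carried over stdout, so any stray print from Ultralytics would corrupt the
# JSON-RPC stream. Illustrative sketch (not used by the server) of wrapping an
# arbitrary noisy callable; `noisy_call` is a stand-in name, not a real API.
def _example_quiet_call(noisy_call, *args, **kwargs):
    """Run a callable with its stdout diverted to stderr (illustrative only)."""
    with redirect_stdout_to_stderr():
        return noisy_call(*args, **kwargs)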
def camera_watchdog_thread():
"""Monitor thread that auto-stops the camera after inactivity"""
global camera_running, camera_last_access_time
logger.info("Camera watchdog thread started")
while True:
# Sleep for a short time to avoid excessive CPU usage
time.sleep(5)
# Check if camera is running
if camera_running:
current_time = time.time()
elapsed_time = current_time - camera_last_access_time
# If no access for more than the timeout, auto-stop
if elapsed_time > CAMERA_INACTIVITY_TIMEOUT:
logger.info(f"Auto-stopping camera after {elapsed_time:.1f} seconds of inactivity")
stop_camera_detection()
else:
# If camera is not running, no need to check frequently
time.sleep(10)
def load_image(image_source, is_path=False):
"""
Load image from file path or base64 data
Args:
image_source: File path or base64 encoded image data
is_path: Whether image_source is a file path
Returns:
PIL Image object
"""
try:
if is_path:
# Load image from file path
if os.path.exists(image_source):
return Image.open(image_source)
else:
raise FileNotFoundError(f"Image file not found: {image_source}")
else:
# Load image from base64 data
image_bytes = base64.b64decode(image_source)
return Image.open(BytesIO(image_bytes))
except Exception as e:
raise ValueError(f"Failed to load image: {str(e)}")
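# Illustrative helper (not registered as a tool): shows how a caller could
# produce the base64 payload that load_image and the detection tools expect.
# The path argument is whatever image the caller has on disk.
def _example_encode_image_base64(path: str) -> str:
    """Read an image file and return it as a base64 string (illustrative only)."""
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")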
# Create MCP server
mcp = FastMCP("YOLO_Service")
# Global model cache
models = {}
def get_model(model_name: str = "yolov8n.pt") -> YOLO:
"""Get or load YOLO model from any of the configured model directories"""
if model_name in models:
return models[model_name]
# Try to find the model in any of the configured directories
model_path = None
for directory in CONFIG["model_dirs"]:
potential_path = os.path.join(directory, model_name)
if os.path.exists(potential_path):
model_path = potential_path
break
if model_path is None:
available = list_available_models()
available_str = ", ".join(available) if available else "none"
raise FileNotFoundError(f"Model '{model_name}' not found in any configured directories. Available models: {available_str}")
# Load and cache the model - with stdout redirected
logger.info(f"Loading model: {model_name} from {model_path}")
with redirect_stdout_to_stderr():
models[model_name] = YOLO(model_path)
return models[model_name]
# Global configuration
CONFIG = {
"model_dirs": [
".", # Current directory
"./models", # Models subdirectory
os.path.join(os.path.dirname(os.path.abspath(__file__)), "models"), # Absolute path to models
# Add any other potential model directories here
]
}
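# Illustrative extension point (not part of the server as written): extra model
# directories could be registered at runtime, for example from a hypothetical
# YOLO_MODEL_DIR environment variable, before get_model() is first called.
def _example_register_model_dir(path: str) -> None:
    """Append an additional model directory to CONFIG (illustrative only)."""
    if path and path not in CONFIG["model_dirs"]:
        CONFIG["model_dirs"].append(path)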
# Add a new tool to get information about model directories
@mcp.tool()
def get_model_directories() -> Dict[str, Any]:
"""Get information about configured model directories and available models"""
directories = []
for directory in CONFIG["model_dirs"]:
dir_info = {
"path": directory,
"exists": os.path.exists(directory),
"is_directory": os.path.isdir(directory) if os.path.exists(directory) else False,
"models": []
}
if dir_info["exists"] and dir_info["is_directory"]:
for filename in os.listdir(directory):
if filename.endswith(".pt"):
dir_info["models"].append(filename)
directories.append(dir_info)
return {
"configured_directories": CONFIG["model_dirs"],
"directory_details": directories,
"available_models": list_available_models(),
"loaded_models": list(models.keys())
}
@mcp.tool()
def detect_objects(
image_data: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Detect objects in an image using YOLO
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing detection results
"""
try:
# Load image (supports path or base64)
image = load_image(image_data, is_path=is_path)
# Load model and perform detection - with stdout redirected
model = get_model(model_name)
with redirect_stdout_to_stderr(): # Ensure all YOLO outputs go to stderr
results = model.predict(image, conf=confidence, save=save_results)
# Format results
formatted_results = []
for result in results:
boxes = result.boxes
detections = []
for i in range(len(boxes)):
box = boxes[i]
x1, y1, x2, y2 = box.xyxy[0].tolist()
confidence = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
detections.append({
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name
})
formatted_results.append({
"detections": detections,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"total_detections": sum(len(r["detections"]) for r in formatted_results),
"source": image_data if is_path else "base64_image"
}
except Exception as e:
logger.error(f"Error in detect_objects: {str(e)}")
return {
"error": f"Failed to detect objects: {str(e)}",
"source": image_data if is_path else "base64_image"
}
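# Illustrative post-processing (not a tool): flatten a detect_objects() result
# into human-readable "class (confidence)" strings, e.g. for client-side logging.
# Assumes the result dict shape produced above.
def _example_summarize_detections(result: Dict[str, Any]) -> List[str]:
    """Return one "class (confidence)" string per detection (illustrative only)."""
    summary = []
    for r in result.get("results", []):
        for det in r.get("detections", []):
            summary.append(f'{det["class_name"]} ({det["confidence"]:.2f})')
    return summary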
@mcp.tool()
def segment_objects(
image_data: str,
model_name: str = "yolov11n-seg.pt",
confidence: float = 0.25,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Perform instance segmentation on an image using YOLO
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO segmentation model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing segmentation results
"""
try:
# Load image (supports path or base64)
image = load_image(image_data, is_path=is_path)
# Load model and perform segmentation
model = get_model(model_name)
        with redirect_stdout_to_stderr():  # keep YOLO output off stdout
results = model.predict(image, conf=confidence, save=save_results)
# Format results
formatted_results = []
for result in results:
if not hasattr(result, 'masks') or result.masks is None:
continue
boxes = result.boxes
masks = result.masks
segments = []
for i in range(len(boxes)):
box = boxes[i]
mask = masks[i].data[0].cpu().numpy() if masks else None
x1, y1, x2, y2 = box.xyxy[0].tolist()
confidence = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
segment = {
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name
}
if mask is not None:
# Convert binary mask to simplified format for API response
segment["mask"] = mask.tolist()
segments.append(segment)
formatted_results.append({
"segments": segments,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"total_segments": sum(len(r["segments"]) for r in formatted_results),
"source": image_data if is_path else "base64_image"
}
except Exception as e:
return {
"error": f"Failed to segment objects: {str(e)}",
"source": image_data if is_path else "base64_image"
}
@mcp.tool()
def classify_image(
image_data: str,
model_name: str = "yolov11n-cls.pt",
top_k: int = 5,
save_results: bool = False,
is_path: bool = False
) -> Dict[str, Any]:
"""
Classify an image using YOLO classification model
Args:
image_data: Base64 encoded image or file path (if is_path=True)
model_name: YOLO classification model name
top_k: Number of top categories to return
save_results: Whether to save results to disk
is_path: Whether image_data is a file path
Returns:
Dictionary containing classification results
"""
try:
# Load image (supports path or base64)
image = load_image(image_data, is_path=is_path)
# Load model and perform classification
model = get_model(model_name)
        with redirect_stdout_to_stderr():  # keep YOLO output off stdout
results = model.predict(image, save=save_results)
# Format results
formatted_results = []
for result in results:
if not hasattr(result, 'probs') or result.probs is None:
continue
probs = result.probs
top_indices = probs.top5
top_probs = probs.top5conf.tolist()
top_classes = [result.names[idx] for idx in top_indices]
classifications = [
{"class_id": int(idx), "class_name": name, "probability": float(prob)}
for idx, name, prob in zip(top_indices[:top_k], top_classes[:top_k], top_probs[:top_k])
]
formatted_results.append({
"classifications": classifications,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"top_k": top_k,
"source": image_data if is_path else "base64_image"
}
except Exception as e:
return {
"error": f"Failed to classify image: {str(e)}",
"source": image_data if is_path else "base64_image"
}
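# Illustrative helper (not a tool): pull the single most likely label out of a
# classify_image() result, assuming the result dict shape produced above.
def _example_top1_label(result: Dict[str, Any]) -> Optional[str]:
    """Return the top-1 class name from a classify_image result, if any (illustrative only)."""
    for r in result.get("results", []):
        if r.get("classifications"):
            return r["classifications"][0]["class_name"]
    return None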
@mcp.tool()
def track_objects(
image_data: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
tracker: str = "bytetrack.yaml",
save_results: bool = False
) -> Dict[str, Any]:
"""
    Track objects with YOLO's tracking mode; call once per frame of an image sequence
Args:
image_data: Base64 encoded image
model_name: YOLO model name
confidence: Detection confidence threshold
tracker: Tracker name to use (e.g., 'bytetrack.yaml', 'botsort.yaml')
save_results: Whether to save results to disk
Returns:
Dictionary containing tracking results
"""
# Decode Base64 image
image_bytes = base64.b64decode(image_data)
image = Image.open(BytesIO(image_bytes))
# Load model and perform tracking
model = get_model(model_name)
    # Redirect stdout so Ultralytics output stays off the MCP stdio channel
with redirect_stdout_to_stderr():
results = model.track(image, conf=confidence, tracker=tracker, save=save_results)
# Format results
formatted_results = []
for result in results:
if not hasattr(result, 'boxes') or result.boxes is None:
continue
boxes = result.boxes
tracks = []
for i in range(len(boxes)):
box = boxes[i]
x1, y1, x2, y2 = box.xyxy[0].tolist()
confidence = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
# Extract track ID (if any)
track_id = int(box.id[0]) if box.id is not None else None
track = {
"box": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id,
"class_name": class_name,
"track_id": track_id
}
tracks.append(track)
formatted_results.append({
"tracks": tracks,
"image_shape": result.orig_shape
})
return {
"results": formatted_results,
"model_used": model_name,
"tracker": tracker,
"total_tracks": sum(len(r["tracks"]) for r in formatted_results)
}
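# Illustrative helper (not a tool): collect the distinct track IDs seen in one
# track_objects() result. Whether IDs persist across frames depends on how the
# tracker is driven; this only inspects a single returned payload.
def _example_track_ids(result: Dict[str, Any]) -> List[int]:
    """Return the sorted set of non-null track IDs in one result (illustrative only)."""
    ids = set()
    for r in result.get("results", []):
        for t in r.get("tracks", []):
            if t.get("track_id") is not None:
                ids.add(t["track_id"])
    return sorted(ids)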
# train_model wraps the Ultralytics training call with stdout redirection
@mcp.tool()
def train_model(
dataset_path: str,
model_name: str = "yolov8n.pt",
epochs: int = 100,
imgsz: int = 640,
batch: int = 16,
name: str = "yolo_custom_model",
project: str = "runs/train"
) -> Dict[str, Any]:
"""
Train a YOLO model on a custom dataset
Args:
dataset_path: Path to YOLO format dataset
model_name: Base model to start with
epochs: Number of training epochs
imgsz: Image size for training
batch: Batch size
name: Name for the training run
project: Project directory
Returns:
Dictionary containing training results
"""
# Validate dataset path
if not os.path.exists(dataset_path):
return {"error": f"Dataset not found: {dataset_path}"}
# Initialize model
model = get_model(model_name)
# Train model
try:
        # Redirect stdout so training output goes to stderr
with redirect_stdout_to_stderr():
results = model.train(
data=dataset_path,
epochs=epochs,
imgsz=imgsz,
batch=batch,
name=name,
project=project
)
# Get best model path
best_model_path = os.path.join(project, name, "weights", "best.pt")
return {
"status": "success",
"model_path": best_model_path,
"epochs_completed": epochs,
"final_metrics": {
"precision": float(results.results_dict.get("metrics/precision(B)", 0)),
"recall": float(results.results_dict.get("metrics/recall(B)", 0)),
"mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
"mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
}
}
except Exception as e:
return {"error": f"Training failed: {str(e)}"}
# validate_model also redirects stdout during validation
@mcp.tool()
def validate_model(
model_path: str,
data_path: str,
imgsz: int = 640,
batch: int = 16
) -> Dict[str, Any]:
"""
Validate a YOLO model on a dataset
Args:
model_path: Path to YOLO model (.pt file)
data_path: Path to YOLO format validation dataset
imgsz: Image size for validation
batch: Batch size
Returns:
Dictionary containing validation results
"""
# Validate model path
if not os.path.exists(model_path):
return {"error": f"Model file not found: {model_path}"}
# Validate dataset path
if not os.path.exists(data_path):
return {"error": f"Dataset not found: {data_path}"}
# Load model
try:
model = get_model(model_path)
except Exception as e:
return {"error": f"Failed to load model: {str(e)}"}
# Validate model
try:
        # Redirect stdout so validation output goes to stderr
with redirect_stdout_to_stderr():
results = model.val(data=data_path, imgsz=imgsz, batch=batch)
return {
"status": "success",
"metrics": {
"precision": float(results.results_dict.get("metrics/precision(B)", 0)),
"recall": float(results.results_dict.get("metrics/recall(B)", 0)),
"mAP50": float(results.results_dict.get("metrics/mAP50(B)", 0)),
"mAP50-95": float(results.results_dict.get("metrics/mAP50-95(B)", 0))
}
}
except Exception as e:
return {"error": f"Validation failed: {str(e)}"}
# export_model also redirects stdout during export
@mcp.tool()
def export_model(
model_path: str,
format: str = "onnx",
imgsz: int = 640
) -> Dict[str, Any]:
"""
Export a YOLO model to different formats
Args:
model_path: Path to YOLO model (.pt file)
format: Export format (onnx, torchscript, openvino, etc.)
imgsz: Image size for export
Returns:
Dictionary containing export results
"""
# Validate model path
if not os.path.exists(model_path):
return {"error": f"Model file not found: {model_path}"}
# Valid export formats
valid_formats = [
"torchscript", "onnx", "openvino", "engine", "coreml", "saved_model",
"pb", "tflite", "edgetpu", "tfjs", "paddle"
]
if format not in valid_formats:
return {"error": f"Invalid export format: {format}. Valid formats include: {', '.join(valid_formats)}"}
# Load model
try:
model = get_model(model_path)
except Exception as e:
return {"error": f"Failed to load model: {str(e)}"}
# Export model
try:
        # Redirect stdout so export output goes to stderr
with redirect_stdout_to_stderr():
export_path = model.export(format=format, imgsz=imgsz)
return {
"status": "success",
"export_path": str(export_path),
"format": format
}
except Exception as e:
return {"error": f"Export failed: {str(e)}"}
# get_model_info wraps model property access with stdout redirection
@mcp.resource("model_info/{model_name}")
def get_model_info(model_name: str) -> Dict[str, Any]:
"""
Get information about a YOLO model
Args:
model_name: YOLO model name
Returns:
Dictionary containing model information
"""
try:
model = get_model(model_name)
# Get model task
task = 'detect' # Default task
if 'seg' in model_name:
task = 'segment'
elif 'pose' in model_name:
task = 'pose'
elif 'cls' in model_name:
task = 'classify'
elif 'obb' in model_name:
task = 'obb'
# Make sure any model property access that might trigger output is wrapped
with redirect_stdout_to_stderr():
yaml_str = str(model.yaml)
pt_path = str(model.pt_path) if hasattr(model, 'pt_path') else None
class_names = model.names
# Get model info
return {
"model_name": model_name,
"task": task,
"yaml": yaml_str,
"pt_path": pt_path,
"class_names": class_names
}
except Exception as e:
return {"error": f"Failed to get model info: {str(e)}"}
# list_available_models logs warnings via logger rather than print so stdout stays clean
@mcp.tool()
def list_available_models() -> List[str]:
"""List available YOLO models that actually exist on disk in any configured directory"""
# Common YOLO model patterns
model_patterns = [
"yolov11*.pt",
"yolov8*.pt"
]
# Find all existing models in all configured directories
available_models = set()
for directory in CONFIG["model_dirs"]:
if not os.path.exists(directory):
continue
# Check for model files directly
for filename in os.listdir(directory):
if filename.endswith(".pt") and any(
fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
):
available_models.add(filename)
# Convert to sorted list
result = sorted(list(available_models))
if not result:
# Replace print with logger
logger.warning("No model files found in configured directories.")
return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
return result
@mcp.resource("model_info/{model_name}")
def get_model_info(model_name: str) -> Dict[str, Any]:
"""
Get information about a YOLO model
Args:
model_name: YOLO model name
Returns:
Dictionary containing model information
"""
try:
model = get_model(model_name)
# Get model task
task = 'detect' # Default task
if 'seg' in model_name:
task = 'segment'
elif 'pose' in model_name:
task = 'pose'
elif 'cls' in model_name:
task = 'classify'
elif 'obb' in model_name:
task = 'obb'
# Get model info
return {
"model_name": model_name,
"task": task,
"yaml": str(model.yaml),
"pt_path": str(model.pt_path) if hasattr(model, 'pt_path') else None,
"class_names": model.names
}
except Exception as e:
return {"error": f"Failed to get model info: {str(e)}"}
@mcp.tool()
def list_available_models() -> List[str]:
"""List available YOLO models that actually exist on disk in any configured directory"""
# Common YOLO model patterns
model_patterns = [
"yolov11*.pt",
"yolov8*.pt"
]
# Find all existing models in all configured directories
available_models = set()
for directory in CONFIG["model_dirs"]:
if not os.path.exists(directory):
continue
# Check for model files directly
for filename in os.listdir(directory):
if filename.endswith(".pt") and any(
fnmatch.fnmatch(filename, pattern) for pattern in model_patterns
):
available_models.add(filename)
# Convert to sorted list
result = sorted(list(available_models))
if not result:
print("Warning: No model files found in configured directories.")
return ["No models available - download models to any of these directories: " + ", ".join(CONFIG["model_dirs"])]
return result
# Camera detection background thread (camera globals are declared near the top of the file)
def camera_detection_thread(model_name, confidence, fps_limit=30, camera_id=0):
"""Background thread for camera detection"""
global camera_running, detection_results
# Load model
try:
with redirect_stdout_to_stderr():
model = get_model(model_name)
logger.info(f"Model {model_name} loaded successfully")
except Exception as e:
logger.error(f"Error loading model: {str(e)}")
camera_running = False
detection_results.append({
"timestamp": time.time(),
"error": f"Failed to load model: {str(e)}",
"detections": []
})
return
# Try to open camera with multiple attempts and multiple camera IDs if necessary
cap = None
error_message = ""
# Try camera IDs from 0 to 2
for cam_id in range(3):
try:
logger.info(f"Attempting to open camera with ID {cam_id}...")
cap = cv2.VideoCapture(cam_id)
if cap.isOpened():
logger.info(f"Successfully opened camera {cam_id}")
break
except Exception as e:
error_message = f"Error opening camera {cam_id}: {str(e)}"
logger.error(error_message)
# Check if any camera was successfully opened
if cap is None or not cap.isOpened():
logger.error("Error: Could not open any camera.")
camera_running = False
detection_results.append({
"timestamp": time.time(),
"error": "Failed to open camera. Make sure camera is connected and not in use by another application.",
"camera_status": "unavailable",
"detections": []
})
return
# Get camera properties for diagnostics
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
fps = cap.get(cv2.CAP_PROP_FPS)
logger.info(f"Camera properties: {width}x{height} at {fps} FPS")
# Calculate frame interval based on fps_limit
frame_interval = 1.0 / fps_limit
frame_count = 0
error_count = 0
while camera_running:
start_time = time.time()
try:
ret, frame = cap.read()
if not ret:
logger.warning(f"Error: Failed to capture frame (attempt {error_count+1}).")
error_count += 1
# Add error to detection results
detection_results.append({
"timestamp": time.time(),
"error": f"Failed to capture frame (attempt {error_count})",
"camera_status": "error",
"detections": []
})
# If we have consistent failures, try to restart the camera
if error_count >= 5:
logger.warning("Too many frame capture errors, attempting to restart camera...")
cap.release()
time.sleep(1)
cap = cv2.VideoCapture(camera_id)
error_count = 0
if not cap.isOpened():
logger.error("Failed to reopen camera after errors.")
break
time.sleep(1) # Wait before trying again
continue
# Reset error count on successful frame capture
error_count = 0
frame_count += 1
# Perform detection on frame
            with redirect_stdout_to_stderr():  # keep YOLO output off stdout
results = model.predict(frame, conf=confidence)
# Update detection results (only keep the last 10)
if len(detection_results) >= 10:
detection_results.pop(0)
# Format results
for result in results:
boxes = result.boxes
detections = []
for i in range(len(boxes)):
box = boxes[i]
x1, y1, x2, y2 = box.xyxy[0].tolist()
conf = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
detections.append({
"box": [x1, y1, x2, y2],
"confidence": conf,
"class_id": class_id,
"class_name": class_name
})
detection_results.append({
"timestamp": time.time(),
"frame_count": frame_count,
"detections": detections,
"camera_status": "running",
"image_shape": result.orig_shape
})
# Log occasional status
if frame_count % 30 == 0:
logger.info(f"Camera running: processed {frame_count} frames")
detection_count = sum(len(r.get("detections", [])) for r in detection_results if "detections" in r)
logger.info(f"Total detections in current buffer: {detection_count}")
# Limit FPS by waiting if necessary
elapsed = time.time() - start_time
if elapsed < frame_interval:
time.sleep(frame_interval - elapsed)
except Exception as e:
logger.error(f"Error in camera thread: {str(e)}")
detection_results.append({
"timestamp": time.time(),
"error": f"Exception in camera processing: {str(e)}",
"camera_status": "error",
"detections": []
})
time.sleep(1) # Wait before continuing
# Clean up
logger.info("Shutting down camera...")
if cap is not None:
cap.release()
@mcp.tool()
def start_camera_detection(
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
camera_id: int = 0
) -> Dict[str, Any]:
"""
Start realtime object detection using the computer's camera
Args:
model_name: YOLO model name to use
confidence: Detection confidence threshold
camera_id: Camera device ID (0 is usually the default camera)
Returns:
Status of camera detection
"""
global camera_thread, camera_running, detection_results, camera_last_access_time
# Check if already running
if camera_running:
# Update last access time
camera_last_access_time = time.time()
return {"status": "success", "message": "Camera detection is already running"}
# Clear previous results
detection_results = []
# First, try to check if OpenCV is properly installed
try:
cv2_version = cv2.__version__
logger.info(f"OpenCV version: {cv2_version}")
except Exception as e:
logger.error(f"OpenCV not properly installed: {str(e)}")
return {
"status": "error",
"message": f"OpenCV not properly installed: {str(e)}",
"solution": "Please check OpenCV installation"
}
# Start detection thread
camera_running = True
camera_last_access_time = time.time() # Update access time
camera_thread = threading.Thread(
target=camera_detection_thread,
args=(model_name, confidence, 30, camera_id),
daemon=True
)
camera_thread.start()
# Add initial status to detection results
detection_results.append({
"timestamp": time.time(),
"system_info": {
"os": platform.system() if 'platform' in globals() else "Unknown",
"opencv_version": cv2.__version__,
"camera_id": camera_id
},
"camera_status": "starting",
"detections": []
})
return {
"status": "success",
"message": f"Started camera detection using model {model_name}",
"model": model_name,
"confidence": confidence,
"camera_id": camera_id,
"auto_shutdown": f"Camera will auto-shutdown after {CAMERA_INACTIVITY_TIMEOUT} seconds of inactivity",
"note": "If camera doesn't work, try different camera_id values (0, 1, or 2)"
}
@mcp.tool()
def stop_camera_detection() -> Dict[str, Any]:
"""
Stop realtime camera detection
Returns:
Status message
"""
global camera_running
if not camera_running:
return {"status": "error", "message": "Camera detection is not running"}
logger.info("Stopping camera detection by user request")
camera_running = False
# Wait for thread to terminate
if camera_thread and camera_thread.is_alive():
camera_thread.join(timeout=2.0)
return {
"status": "success",
"message": "Stopped camera detection"
}
@mcp.tool()
def get_camera_detections() -> Dict[str, Any]:
"""
Get the latest detections from the camera
Returns:
Dictionary with recent detections
"""
global detection_results, camera_thread, camera_last_access_time
# Update the last access time whenever this function is called
if camera_running:
camera_last_access_time = time.time()
# Check if thread is alive
thread_alive = camera_thread is not None and camera_thread.is_alive()
# If camera_running is True but thread is dead, there's an issue
if camera_running and not thread_alive:
return {
"status": "error",
"message": "Camera thread has stopped unexpectedly",
"is_running": False,
"camera_status": "error",
"thread_alive": thread_alive,
"detections": detection_results,
"count": len(detection_results),
"solution": "Please try restart the camera with a different camera_id"
}
if not camera_running:
return {
"status": "error",
"message": "Camera detection is not running",
"is_running": False,
"camera_status": "stopped"
}
# Check for errors in detection results
errors = [result.get("error") for result in detection_results if "error" in result]
recent_errors = errors[-5:] if errors else []
# Count actual detections
detection_count = sum(len(result.get("detections", [])) for result in detection_results if "detections" in result)
return {
"status": "success",
"is_running": camera_running,
"thread_alive": thread_alive,
"detections": detection_results,
"count": len(detection_results),
"total_detections": detection_count,
"recent_errors": recent_errors if recent_errors else None,
"camera_status": "error" if recent_errors else "running",
"inactivity_timeout": {
"seconds_remaining": int(CAMERA_INACTIVITY_TIMEOUT - (time.time() - camera_last_access_time)),
"last_access": camera_last_access_time
}
}
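# Illustrative in-process polling loop (not a tool): a real MCP client would
# call the get_camera_detections tool over stdio instead. Shown only to make
# the access-time / auto-shutdown interaction above concrete.
def _example_poll_camera(duration: float = 5.0, interval: float = 1.0) -> Dict[str, Any]:
    """Poll get_camera_detections for a few seconds and return the last payload (illustrative only)."""
    deadline = time.time() + duration
    latest: Dict[str, Any] = {}
    while time.time() < deadline:
        latest = get_camera_detections()
        time.sleep(interval)
    return latest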
def cleanup_resources():
"""Clean up resources when the server is shutting down"""
global camera_running
logger.info("Cleaning up resources...")
# Stop camera if it's running
if camera_running:
logger.info("Shutting down camera during server exit")
camera_running = False
# Give the camera thread a moment to clean up
if camera_thread and camera_thread.is_alive():
camera_thread.join(timeout=2.0)
logger.info("Cleanup complete")
atexit.register(cleanup_resources)
def signal_handler(sig, frame):
"""Handle termination signals"""
logger.info(f"Received signal {sig}, shutting down...")
cleanup_resources()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def start_watchdog():
"""Start the camera watchdog thread"""
watchdog = threading.Thread(
target=camera_watchdog_thread,
daemon=True
)
watchdog.start()
return watchdog
@mcp.tool()
def comprehensive_image_analysis(
image_path: str,
confidence: float = 0.25,
save_results: bool = False
) -> Dict[str, Any]:
"""
Perform comprehensive analysis on an image by combining multiple model results
Args:
image_path: Path to the image file
confidence: Detection confidence threshold
save_results: Whether to save results to disk
Returns:
Dictionary containing comprehensive analysis results
"""
try:
if not os.path.exists(image_path):
return {"error": f"Image file not found: {image_path}"}
# Load image
image = load_image(image_path, is_path=True)
analysis_results = {}
# 1. Object detection
object_model = get_model("yolov11n.pt")
        with redirect_stdout_to_stderr():  # keep YOLO output off stdout
object_results = object_model.predict(image, conf=confidence, save=save_results)
# Process object detection results
detected_objects = []
for result in object_results:
boxes = result.boxes
for i in range(len(boxes)):
box = boxes[i]
conf = float(box.conf[0])
class_id = int(box.cls[0])
class_name = result.names[class_id]
detected_objects.append({
"class_name": class_name,
"confidence": conf
})
analysis_results["objects"] = detected_objects
# 2. Scene classification
try:
cls_model = get_model("yolov8n-cls.pt")
            with redirect_stdout_to_stderr():  # keep YOLO output off stdout
cls_results = cls_model.predict(image, save=False)
scene_classifications = []
for result in cls_results:
if hasattr(result, 'probs') and result.probs is not None:
probs = result.probs
top_indices = probs.top5
top_probs = probs.top5conf.tolist()
top_classes = [result.names[idx] for idx in top_indices]
for idx, name, prob in zip(top_indices[:3], top_classes[:3], top_probs[:3]):
scene_classifications.append({
"class_name": name,
"probability": float(prob)
})
analysis_results["scene"] = scene_classifications
except Exception as e:
analysis_results["scene_error"] = str(e)
# 3. Human pose detection
try:
pose_model = get_model("yolov8n-pose.pt")
            with redirect_stdout_to_stderr():  # keep YOLO output off stdout
pose_results = pose_model.predict(image, conf=confidence, save=False)
detected_poses = []
for result in pose_results:
if hasattr(result, 'keypoints') and result.keypoints is not None:
boxes = result.boxes
keypoints = result.keypoints
for i in range(len(boxes)):
box = boxes[i]
conf = float(box.conf[0])
detected_poses.append({
"person_confidence": conf,
"has_keypoints": keypoints[i].data.shape[1] if keypoints else 0
})
analysis_results["poses"] = detected_poses
except Exception as e:
analysis_results["pose_error"] = str(e)
# 4. Comprehensive task description
tasks = []
# Detect main objects
main_objects = [obj["class_name"] for obj in detected_objects if obj["confidence"] > 0.5]
if "person" in main_objects:
tasks.append("Person Detection")
# Check for weapon objects
weapon_objects = ["sword", "knife", "katana", "gun", "pistol", "rifle"]
weapons = [obj for obj in main_objects if any(weapon in obj.lower() for weapon in weapon_objects)]
if weapons:
tasks.append(f"Weapon Detection ({', '.join(weapons)})")
# Count people
person_count = main_objects.count("person")
if person_count > 0:
tasks.append(f"Person Count ({person_count} people)")
# Pose analysis
if "poses" in analysis_results and analysis_results["poses"]:
tasks.append("Human Pose Analysis")
# Scene classification
if "scene" in analysis_results and analysis_results["scene"]:
scene_types = [scene["class_name"] for scene in analysis_results["scene"][:2]]
tasks.append(f"Scene Classification ({', '.join(scene_types)})")
analysis_results["identified_tasks"] = tasks
# Return comprehensive results
return {
"status": "success",
"image_path": image_path,
"analysis": analysis_results,
"summary": "Tasks identified in the image: " + ", ".join(tasks) if tasks else "No clear tasks identified"
}
except Exception as e:
return {
"status": "error",
"image_path": image_path,
"error": f"Comprehensive analysis failed: {str(e)}"
}
@mcp.tool()
def analyze_image_from_path(
image_path: str,
model_name: str = "yolov8n.pt",
confidence: float = 0.25,
save_results: bool = False
) -> Dict[str, Any]:
"""
Analyze image from file path using YOLO
Args:
image_path: Path to the image file
model_name: YOLO model name
confidence: Detection confidence threshold
save_results: Whether to save results to disk
Returns:
Dictionary containing detection results
"""
try:
# Call detect_objects function with is_path=True
return detect_objects(
image_data=image_path,
model_name=model_name,
confidence=confidence,
save_results=save_results,
is_path=True
)
except Exception as e:
return {
"error": f"Failed to analyze image: {str(e)}",
"image_path": image_path
}
@mcp.tool()
def test_connection() -> Dict[str, Any]:
"""
Test if YOLO MCP service is running properly
Returns:
Status information and available tools
"""
return {
"status": "YOLO MCP service is running normally",
"available_models": list_available_models(),
"available_tools": [
"list_available_models", "detect_objects", "segment_objects",
"classify_image", "detect_poses", "detect_oriented_objects",
"track_objects", "train_model", "validate_model",
"export_model", "start_camera_detection", "stop_camera_detection",
"get_camera_detections", "test_connection",
# Additional tools
"analyze_image_from_path",
"comprehensive_image_analysis"
],
"new_features": [
"Support for loading images directly from file paths",
"Support for comprehensive image analysis with task identification",
"All detection functions support both file paths and base64 data"
]
}
# Modify the main execution section
if __name__ == "__main__":
logger.info("Starting YOLO MCP service")
# Start the camera watchdog thread
watchdog_thread = start_watchdog()
# Initialize and run server
mcp.run(transport='stdio')