"""
Betaflight MCP Server
An MCP server that provides tools for configuring Betaflight flight controllers
through Claude and other MCP-compatible AI assistants.
"""
from __future__ import annotations
import asyncio
import json
import sys
from contextlib import asynccontextmanager
from dataclasses import asdict
from typing import Any, Optional
import serial.tools.list_ports
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from .msp import (
MSPConnection,
MSPError,
PIDSettings,
FEATURE_FLAGS,
FLIGHT_MODES,
ModeRange,
FailsafeConfig,
PreflightResult,
)
# Global connection state
_connection: Optional[MSPConnection] = None
def get_connection() -> MSPConnection:
"""Get the current MSP connection or raise an error."""
if _connection is None or not _connection.is_connected():
raise MSPError(
"Not connected to flight controller. "
"Use 'connect_flight_controller' tool first."
)
return _connection
# PID validation ranges (Betaflight typical safe ranges)
PID_RANGES = {
'p': (0, 200), # Proportional: 0-200
'i': (0, 200), # Integral: 0-200
'd': (0, 200), # Derivative: 0-200
}
def validate_pid_value(name: str, value: int, component: str) -> None:
"""Validate a PID value is within safe range."""
min_val, max_val = PID_RANGES[component.lower()]
if not min_val <= value <= max_val:
raise ValueError(
f"{name} {component.upper()} value {value} is outside safe range "
f"[{min_val}-{max_val}]"
)
# Create the MCP server
app = Server("betaflight-mcp")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List all available Betaflight tools."""
return [
Tool(
name="list_serial_ports",
description="List available serial ports for connecting to flight controllers",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="connect_flight_controller",
description=(
"Connect to a Betaflight flight controller via serial port. "
"Must be called before using other flight controller tools."
),
inputSchema={
"type": "object",
"properties": {
"port": {
"type": "string",
"description": (
"Serial port path (e.g., '/dev/tty.usbmodem0001' on Mac, "
"'/dev/ttyACM0' on Linux, 'COM3' on Windows)"
),
},
"baudrate": {
"type": "integer",
"description": "Serial baudrate (default: 115200)",
"default": 115200,
},
},
"required": ["port"],
},
),
Tool(
name="disconnect_flight_controller",
description="Disconnect from the flight controller",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_flight_controller_info",
description=(
"Get flight controller information including firmware version, "
"board type, and craft name"
),
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_flight_controller_status",
description=(
"Get current flight controller status including arming state, "
"sensors, CPU load, and error counts"
),
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_pid_settings",
description="Get current PID controller settings for roll, pitch, and yaw",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="set_pid_settings",
description=(
"Set PID controller settings. Only specified values will be changed. "
"Values are validated against safe ranges before applying. "
"Use save_settings tool after to persist changes."
),
inputSchema={
"type": "object",
"properties": {
"roll_p": {"type": "integer", "description": "Roll P gain (0-200)"},
"roll_i": {"type": "integer", "description": "Roll I gain (0-200)"},
"roll_d": {"type": "integer", "description": "Roll D gain (0-200)"},
"pitch_p": {"type": "integer", "description": "Pitch P gain (0-200)"},
"pitch_i": {"type": "integer", "description": "Pitch I gain (0-200)"},
"pitch_d": {"type": "integer", "description": "Pitch D gain (0-200)"},
"yaw_p": {"type": "integer", "description": "Yaw P gain (0-200)"},
"yaw_i": {"type": "integer", "description": "Yaw I gain (0-200)"},
"yaw_d": {"type": "integer", "description": "Yaw D gain (0-200)"},
},
"required": [],
},
),
Tool(
name="get_rc_tuning",
description="Get RC rates, expo, and throttle settings",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="save_settings",
description=(
"Save current settings to EEPROM. Call this after making changes "
"to persist them across power cycles."
),
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="backup_configuration",
description=(
"Backup flight controller configuration. Returns CLI commands "
"that can be used to restore the configuration."
),
inputSchema={
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": ["diff", "dump"],
"description": (
"'diff' returns only changed settings (recommended), "
"'dump' returns all settings"
),
"default": "diff",
},
},
"required": [],
},
),
Tool(
name="send_cli_command",
description=(
"Send a raw CLI command to the flight controller. "
"Use with caution - some commands can affect flight safety. "
"The FC must not be armed."
),
inputSchema={
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "CLI command to send (e.g., 'get pid', 'status')",
},
},
"required": ["command"],
},
),
# Sensor Data Tools
Tool(
name="get_attitude",
description="Get current aircraft attitude (roll, pitch, yaw angles in degrees)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_imu_data",
description="Get raw IMU sensor data (accelerometer, gyroscope, magnetometer)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_battery_status",
description="Get battery voltage, current draw, mAh consumed, and RSSI",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_motor_values",
description="Get current motor output values (PWM/DSHOT values)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_rc_channels",
description="Get current RC channel values from the receiver",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
# Configuration Tools
Tool(
name="get_filter_config",
description="Get gyro and D-term filter settings (lowpass, notch filters)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_battery_config",
description="Get battery configuration (voltage limits, capacity, meter sources)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_motor_config",
description="Get motor configuration (throttle limits, protocol, poles)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_vtx_config",
description="Get VTX (video transmitter) settings (band, channel, power)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_osd_config",
description="Get OSD (on-screen display) configuration",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_blackbox_config",
description="Get blackbox data logging configuration",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_dataflash_summary",
description="Get onboard dataflash memory status (used/total space for blackbox)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_features",
description="Get list of enabled/disabled features (AIRMODE, OSD, GPS, etc.)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="set_feature",
description=(
"Enable or disable a feature. "
f"Available features: {', '.join(FEATURE_FLAGS.values())}"
),
inputSchema={
"type": "object",
"properties": {
"feature": {
"type": "string",
"description": "Feature name (e.g., 'AIRMODE', 'OSD', 'GPS')",
},
"enabled": {
"type": "boolean",
"description": "True to enable, False to disable",
},
},
"required": ["feature", "enabled"],
},
),
# Motor Testing Tools
Tool(
name="test_motor",
description=(
"DANGER: Spin a single motor for testing. REMOVE PROPS FIRST! "
"Only works when disarmed. Motor will spin at specified throttle."
),
inputSchema={
"type": "object",
"properties": {
"motor": {
"type": "integer",
"description": "Motor number (1-8)",
"minimum": 1,
"maximum": 8,
},
"throttle_percent": {
"type": "integer",
"description": "Throttle percentage (0-100). Start low (5-10%)!",
"minimum": 0,
"maximum": 100,
},
},
"required": ["motor", "throttle_percent"],
},
),
Tool(
name="stop_motors",
description="Stop all motors immediately",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
# Calibration Tools
Tool(
name="calibrate_accelerometer",
description=(
"Calibrate the accelerometer. The drone must be on a level surface "
"and completely still during calibration (~2 seconds)."
),
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="calibrate_magnetometer",
description=(
"Start magnetometer (compass) calibration. Rotate the drone through "
"all orientations (360 degrees on each axis) during calibration."
),
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
# Utility Tools
Tool(
name="beep",
description="Make the flight controller beep (useful for finding lost drone)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="reboot_flight_controller",
description="Reboot the flight controller. Connection will be lost.",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="erase_blackbox_logs",
description="Erase all blackbox logs from onboard flash memory",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="set_craft_name",
description="Set the craft name (max 16 characters)",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "New craft name (max 16 chars)",
"maxLength": 16,
},
},
"required": ["name"],
},
),
# ===== MODES & AUX CHANNELS =====
Tool(
name="get_available_modes",
description="List all available flight modes that can be assigned to aux channels",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="get_aux_modes",
description="Get all aux channel to flight mode mappings (which switch activates which mode)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="set_aux_mode",
description=(
"Map a flight mode to an aux channel range. "
"Example: ARM on AUX1 when switch is HIGH (1800-2100)"
),
inputSchema={
"type": "object",
"properties": {
"mode": {
"type": "string",
"description": "Mode name (e.g., 'ARM', 'ANGLE', 'HORIZON', 'AIRMODE', 'BEEPERON')",
},
"aux_channel": {
"type": "integer",
"description": "Aux channel number (1-8)",
"minimum": 1,
"maximum": 8,
},
"range_start": {
"type": "integer",
"description": "Start of activation range (900-2100)",
"minimum": 900,
"maximum": 2100,
},
"range_end": {
"type": "integer",
"description": "End of activation range (900-2100)",
"minimum": 900,
"maximum": 2100,
},
},
"required": ["mode", "aux_channel", "range_start", "range_end"],
},
),
Tool(
name="clear_aux_mode",
description="Remove a mode mapping by its slot index",
inputSchema={
"type": "object",
"properties": {
"index": {
"type": "integer",
"description": "Slot index to clear (from get_aux_modes)",
},
},
"required": ["index"],
},
),
# ===== PROFILES =====
Tool(
name="get_current_profile",
description="Get current PID profile (1-3) and rate profile (1-6)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="set_pid_profile",
description="Switch to a different PID profile",
inputSchema={
"type": "object",
"properties": {
"profile": {
"type": "integer",
"description": "Profile number (1-3)",
"minimum": 1,
"maximum": 3,
},
},
"required": ["profile"],
},
),
Tool(
name="set_rate_profile",
description="Switch to a different rate profile",
inputSchema={
"type": "object",
"properties": {
"profile": {
"type": "integer",
"description": "Profile number (1-6)",
"minimum": 1,
"maximum": 6,
},
},
"required": ["profile"],
},
),
Tool(
name="copy_pid_profile",
description="Copy PID settings from one profile to another",
inputSchema={
"type": "object",
"properties": {
"source": {
"type": "integer",
"description": "Source profile number (1-3)",
"minimum": 1,
"maximum": 3,
},
"destination": {
"type": "integer",
"description": "Destination profile number (1-3)",
"minimum": 1,
"maximum": 3,
},
},
"required": ["source", "destination"],
},
),
# ===== FAILSAFE & SAFETY =====
Tool(
name="get_failsafe_config",
description="Get failsafe configuration (procedure, delays, throttle settings)",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="set_failsafe_config",
description="Configure failsafe behavior",
inputSchema={
"type": "object",
"properties": {
"procedure": {
"type": "string",
"enum": ["AUTO_LAND", "DROP", "GPS_RESCUE"],
"description": "Failsafe procedure",
},
"delay": {
"type": "number",
"description": "Delay before failsafe activates (seconds)",
},
},
"required": [],
},
),
Tool(
name="get_arming_disable_flags",
description="Get detailed reasons why the drone cannot arm",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
Tool(
name="preflight_check",
description=(
"Run comprehensive preflight checks: battery, sensors, RC link, "
"arming status, failsafe, CPU load"
),
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
# ===== VTX CONTROL =====
Tool(
name="set_vtx_channel",
description="Change VTX channel (1-8)",
inputSchema={
"type": "object",
"properties": {
"channel": {
"type": "integer",
"description": "Channel number (1-8)",
"minimum": 1,
"maximum": 8,
},
},
"required": ["channel"],
},
),
Tool(
name="set_vtx_band",
description="Change VTX band",
inputSchema={
"type": "object",
"properties": {
"band": {
"type": "string",
"enum": ["A", "B", "E", "F", "R"],
"description": "Band letter",
},
},
"required": ["band"],
},
),
Tool(
name="set_vtx_power",
description="Change VTX power level",
inputSchema={
"type": "object",
"properties": {
"power": {
"type": "integer",
"description": "Power level (0-4, varies by VTX)",
"minimum": 0,
"maximum": 4,
},
},
"required": ["power"],
},
),
Tool(
name="set_vtx_pit_mode",
description="Enable or disable VTX pit mode (low power when disarmed)",
inputSchema={
"type": "object",
"properties": {
"enabled": {
"type": "boolean",
"description": "True to enable pit mode",
},
},
"required": ["enabled"],
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
"""Handle tool calls."""
global _connection
try:
result = await _handle_tool(name, arguments)
return [TextContent(type="text", text=result)]
except MSPError as e:
return [TextContent(type="text", text=f"Error: {e}")]
except Exception as e:
return [TextContent(type="text", text=f"Unexpected error: {type(e).__name__}: {e}")]
async def _handle_tool(name: str, arguments: dict[str, Any]) -> str:
"""Process a tool call and return the result."""
global _connection
if name == "list_serial_ports":
ports = list(serial.tools.list_ports.comports())
if not ports:
return "No serial ports found."
result = "Available serial ports:\n"
for port in ports:
result += f"\n- {port.device}"
if port.description:
result += f"\n Description: {port.description}"
if port.manufacturer:
result += f"\n Manufacturer: {port.manufacturer}"
if port.vid and port.pid:
result += f"\n VID:PID: {port.vid:04X}:{port.pid:04X}"
# Add hint for Betaflight
result += "\n\nTip: Betaflight FCs typically show as 'STM32' or contain 'usbmodem' in the name."
return result
elif name == "connect_flight_controller":
port = arguments["port"]
baudrate = arguments.get("baudrate", 115200)
# Disconnect existing connection if any
if _connection is not None:
_connection.disconnect()
_connection = MSPConnection(port, baudrate)
_connection.connect()
# Verify connection by reading FC info
try:
info = _connection.get_fc_info()
return (
f"Connected to flight controller!\n\n"
f"Firmware: {info.variant} {info.version}\n"
f"API Version: {info.api_version}\n"
f"Board: {info.board_name}\n"
f"Target: {info.target_name}\n"
f"Craft Name: {info.craft_name or '(not set)'}"
)
except Exception as e:
_connection.disconnect()
_connection = None
raise MSPError(f"Connected but failed to read FC info: {e}")
elif name == "disconnect_flight_controller":
if _connection is None:
return "Not connected to any flight controller."
_connection.disconnect()
_connection = None
return "Disconnected from flight controller."
elif name == "get_flight_controller_info":
conn = get_connection()
info = conn.get_fc_info()
return json.dumps(asdict(info), indent=2)
elif name == "get_flight_controller_status":
conn = get_connection()
status = conn.get_status()
return json.dumps(asdict(status), indent=2)
elif name == "get_pid_settings":
conn = get_connection()
pids = conn.get_pid_settings()
return (
"Current PID Settings:\n\n"
" P I D\n"
f"Roll: {pids.roll_p:3d} {pids.roll_i:3d} {pids.roll_d:3d}\n"
f"Pitch: {pids.pitch_p:3d} {pids.pitch_i:3d} {pids.pitch_d:3d}\n"
f"Yaw: {pids.yaw_p:3d} {pids.yaw_i:3d} {pids.yaw_d:3d}"
)
elif name == "set_pid_settings":
conn = get_connection()
# Get current settings
current = conn.get_pid_settings()
# Build new settings with validation
new_pids = PIDSettings(
roll_p=arguments.get("roll_p", current.roll_p),
roll_i=arguments.get("roll_i", current.roll_i),
roll_d=arguments.get("roll_d", current.roll_d),
pitch_p=arguments.get("pitch_p", current.pitch_p),
pitch_i=arguments.get("pitch_i", current.pitch_i),
pitch_d=arguments.get("pitch_d", current.pitch_d),
yaw_p=arguments.get("yaw_p", current.yaw_p),
yaw_i=arguments.get("yaw_i", current.yaw_i),
yaw_d=arguments.get("yaw_d", current.yaw_d),
)
# Validate all values
validate_pid_value("Roll", new_pids.roll_p, "p")
validate_pid_value("Roll", new_pids.roll_i, "i")
validate_pid_value("Roll", new_pids.roll_d, "d")
validate_pid_value("Pitch", new_pids.pitch_p, "p")
validate_pid_value("Pitch", new_pids.pitch_i, "i")
validate_pid_value("Pitch", new_pids.pitch_d, "d")
validate_pid_value("Yaw", new_pids.yaw_p, "p")
validate_pid_value("Yaw", new_pids.yaw_i, "i")
validate_pid_value("Yaw", new_pids.yaw_d, "d")
# Apply settings
conn.set_pid_settings(new_pids)
# Show changes
changes = []
for axis in ['roll', 'pitch', 'yaw']:
for comp in ['p', 'i', 'd']:
old_val = getattr(current, f"{axis}_{comp}")
new_val = getattr(new_pids, f"{axis}_{comp}")
if old_val != new_val:
changes.append(f" {axis.capitalize()} {comp.upper()}: {old_val} -> {new_val}")
if changes:
return (
"PID settings updated:\n" +
"\n".join(changes) +
"\n\nNote: Use 'save_settings' to persist changes to EEPROM."
)
else:
return "No changes made (all values already match)."
elif name == "get_rc_tuning":
conn = get_connection()
tuning = conn.get_rc_tuning()
return json.dumps(asdict(tuning), indent=2)
elif name == "save_settings":
conn = get_connection()
conn.save_settings()
return "Settings saved to EEPROM successfully."
elif name == "backup_configuration":
conn = get_connection()
mode = arguments.get("mode", "diff")
if mode == "diff":
config = conn.get_diff()
header = "Configuration Diff (changes from defaults)"
else:
config = conn.get_dump()
header = "Full Configuration Dump"
return f"{header}:\n\n{config}"
elif name == "send_cli_command":
conn = get_connection()
command = arguments["command"]
# Basic safety checks
dangerous_commands = ['bl', 'dfu', 'exit']
cmd_lower = command.lower().split()[0] if command.split() else ""
if cmd_lower in dangerous_commands:
return (
f"Command '{cmd_lower}' is blocked for safety. "
"This command could put the FC in an unrecoverable state."
)
response = conn.send_cli_command(command)
return f"CLI Response:\n{response}"
# ----- Sensor Data Tools -----
elif name == "get_attitude":
conn = get_connection()
att = conn.get_attitude()
return (
f"Aircraft Attitude:\n"
f" Roll: {att.roll:+7.1f}°\n"
f" Pitch: {att.pitch:+7.1f}°\n"
f" Yaw: {att.yaw:+7.1f}° (heading)"
)
elif name == "get_imu_data":
conn = get_connection()
imu = conn.get_imu_data()
return (
f"IMU Sensor Data:\n\n"
f"Accelerometer:\n"
f" X: {imu.acc_x:6d} Y: {imu.acc_y:6d} Z: {imu.acc_z:6d}\n\n"
f"Gyroscope:\n"
f" X: {imu.gyro_x:6d} Y: {imu.gyro_y:6d} Z: {imu.gyro_z:6d}\n\n"
f"Magnetometer:\n"
f" X: {imu.mag_x:6d} Y: {imu.mag_y:6d} Z: {imu.mag_z:6d}"
)
elif name == "get_battery_status":
conn = get_connection()
analog = conn.get_analog_data()
return (
f"Battery Status:\n"
f" Voltage: {analog.voltage:.1f}V\n"
f" Current: {analog.amperage:.2f}A\n"
f" Consumed: {analog.mah_drawn} mAh\n"
f" RSSI: {analog.rssi}"
)
elif name == "get_motor_values":
conn = get_connection()
motors = conn.get_motor_values()
lines = ["Motor Output Values:"]
for i, val in enumerate(motors.motors, 1):
# Visual bar representation
bar_len = (val - 1000) // 50 # 0-20 chars for 1000-2000
bar = "█" * bar_len + "░" * (20 - bar_len)
lines.append(f" Motor {i}: {val:4d} [{bar}]")
return "\n".join(lines)
elif name == "get_rc_channels":
conn = get_connection()
rc = conn.get_rc_channels()
channel_names = ["Roll", "Pitch", "Yaw", "Throttle", "Aux1", "Aux2", "Aux3", "Aux4"]
lines = ["RC Channel Values:"]
for i, val in enumerate(rc.channels):
name = channel_names[i] if i < len(channel_names) else f"Ch{i+1}"
# Center at 1500, show deviation
deviation = val - 1500
lines.append(f" {name:8s}: {val:4d} ({deviation:+4d})")
return "\n".join(lines)
# ----- Configuration Tools -----
elif name == "get_filter_config":
conn = get_connection()
filt = conn.get_filter_config()
filter_types = {0: "PT1", 1: "BIQUAD", 2: "PT2", 3: "PT3"}
return (
f"Filter Configuration:\n\n"
f"Gyro Lowpass:\n"
f" Lowpass 1: {filt.gyro_lowpass_hz} Hz ({filter_types.get(filt.gyro_lowpass_type, 'Unknown')})\n"
f" Lowpass 2: {filt.gyro_lowpass2_hz} Hz ({filter_types.get(filt.gyro_lowpass2_type, 'Unknown')})\n\n"
f"Gyro Notch:\n"
f" Center: {filt.gyro_notch_hz} Hz, Cutoff: {filt.gyro_notch_cutoff} Hz\n\n"
f"D-Term Lowpass:\n"
f" Lowpass 1: {filt.dterm_lowpass_hz} Hz ({filter_types.get(filt.dterm_lowpass_type, 'Unknown')})\n"
f" Lowpass 2: {filt.dterm_lowpass2_hz} Hz ({filter_types.get(filt.dterm_lowpass2_type, 'Unknown')})\n\n"
f"D-Term Notch:\n"
f" Center: {filt.dterm_notch_hz} Hz, Cutoff: {filt.dterm_notch_cutoff} Hz"
)
elif name == "get_battery_config":
conn = get_connection()
batt = conn.get_battery_config()
meter_sources = {0: "None", 1: "ADC", 2: "ESC"}
return (
f"Battery Configuration:\n"
f" Min Cell Voltage: {batt.vbat_min_cell_voltage:.1f}V\n"
f" Max Cell Voltage: {batt.vbat_max_cell_voltage:.1f}V\n"
f" Warning Cell Voltage: {batt.vbat_warning_cell_voltage:.1f}V\n"
f" Capacity: {batt.capacity} mAh\n"
f" Voltage Meter: {meter_sources.get(batt.voltage_meter_source, 'Unknown')}\n"
f" Current Meter: {meter_sources.get(batt.current_meter_source, 'Unknown')}"
)
elif name == "get_motor_config":
conn = get_connection()
motor = conn.get_motor_config()
protocols = {
0: "PWM", 1: "ONESHOT125", 2: "ONESHOT42", 3: "MULTISHOT",
4: "BRUSHED", 5: "DSHOT150", 6: "DSHOT300", 7: "DSHOT600",
8: "DSHOT1200", 9: "PROSHOT1000"
}
return (
f"Motor Configuration:\n"
f" Min Throttle: {motor.min_throttle}\n"
f" Max Throttle: {motor.max_throttle}\n"
f" Min Command: {motor.min_command}\n"
f" Motor Poles: {motor.motor_poles}\n"
f" Protocol: {protocols.get(motor.motor_pwm_protocol, 'Unknown')}\n"
f" DSHOT Telemetry: {'Enabled' if motor.use_dshot_telemetry else 'Disabled'}"
)
elif name == "get_vtx_config":
conn = get_connection()
vtx = conn.get_vtx_config()
bands = {0: "N/A", 1: "A", 2: "B", 3: "E", 4: "F", 5: "R"}
vtx_types = {0: "None", 1: "RTC6705", 2: "SmartAudio", 3: "Tramp", 4: "Table"}
return (
f"VTX Configuration:\n"
f" Type: {vtx_types.get(vtx.vtx_type, 'Unknown')}\n"
f" Band: {bands.get(vtx.band, 'Unknown')}\n"
f" Channel: {vtx.channel}\n"
f" Power: {vtx.power}\n"
f" Frequency: {vtx.frequency} MHz\n"
f" Pit Mode: {'On' if vtx.pit_mode else 'Off'}\n"
f" Low Power Disarm: {'On' if vtx.low_power_disarm else 'Off'}"
)
elif name == "get_osd_config":
conn = get_connection()
osd = conn.get_osd_config()
video_systems = {0: "AUTO", 1: "PAL", 2: "NTSC", 3: "HD"}
units = {0: "Imperial", 1: "Metric"}
return (
f"OSD Configuration:\n"
f" Video System: {video_systems.get(osd.video_system, 'Unknown')}\n"
f" Units: {units.get(osd.units, 'Unknown')}\n"
f" RSSI Alarm: {osd.rssi_alarm}%\n"
f" Capacity Alarm: {osd.cap_alarm} mAh"
)
elif name == "get_blackbox_config":
conn = get_connection()
bb = conn.get_blackbox_config()
devices = {0: "None", 1: "Flash", 2: "SD Card", 3: "Serial"}
return (
f"Blackbox Configuration:\n"
f" Device: {devices.get(bb.device, 'Unknown')}\n"
f" Rate: 1/{bb.rate_denom} ({100/bb.rate_denom:.0f}%)\n"
f" P Ratio: {bb.p_ratio}"
)
elif name == "get_dataflash_summary":
conn = get_connection()
df = conn.get_dataflash_summary()
if not df.supported:
return "Dataflash not supported on this board."
used_pct = (df.used_size / df.total_size * 100) if df.total_size > 0 else 0
return (
f"Dataflash Summary:\n"
f" Status: {'Ready' if df.ready else 'Not Ready'}\n"
f" Sectors: {df.sectors}\n"
f" Total Size: {df.total_size / 1024:.0f} KB\n"
f" Used: {df.used_size / 1024:.0f} KB ({used_pct:.1f}%)\n"
f" Free: {(df.total_size - df.used_size) / 1024:.0f} KB"
)
elif name == "get_features":
conn = get_connection()
features = conn.get_features()
enabled = [name for name, on in features.features.items() if on]
disabled = [name for name, on in features.features.items() if not on]
return (
f"Enabled Features:\n " +
(", ".join(sorted(enabled)) if enabled else "(none)") +
f"\n\nDisabled Features:\n " +
(", ".join(sorted(disabled)) if disabled else "(none)")
)
elif name == "set_feature":
conn = get_connection()
feature = arguments["feature"]
enabled = arguments["enabled"]
conn.set_feature(feature, enabled)
action = "enabled" if enabled else "disabled"
return (
f"Feature '{feature}' {action}.\n\n"
"Note: Use 'save_settings' to persist changes to EEPROM."
)
# ----- Motor Testing Tools -----
elif name == "test_motor":
conn = get_connection()
motor_num = arguments["motor"]
throttle = arguments["throttle_percent"]
# Safety check - verify not armed
status = conn.get_status()
if status.armed:
return "ERROR: Cannot test motors while armed! Disarm first."
# Convert 1-8 to 0-7 index
motor_index = motor_num - 1
# Convert percentage to PWM value (1000-2000)
pwm_value = 1000 + (throttle * 10)
conn.set_motor(motor_index, pwm_value)
return (
f"Motor {motor_num} spinning at {throttle}% (PWM: {pwm_value})\n\n"
"WARNING: Motor is now spinning! Use 'stop_motors' to stop.\n"
"SAFETY: Ensure propellers are removed!"
)
elif name == "stop_motors":
conn = get_connection()
conn.stop_all_motors()
return "All motors stopped."
# ----- Calibration Tools -----
elif name == "calibrate_accelerometer":
conn = get_connection()
# Safety check
status = conn.get_status()
if status.armed:
return "ERROR: Cannot calibrate while armed! Disarm first."
conn.calibrate_accelerometer()
return (
"Accelerometer calibration complete!\n\n"
"The drone should now read level when on a flat surface.\n"
"Use 'save_settings' to persist the calibration."
)
elif name == "calibrate_magnetometer":
conn = get_connection()
# Safety check
status = conn.get_status()
if status.armed:
return "ERROR: Cannot calibrate while armed! Disarm first."
conn.calibrate_magnetometer()
return (
"Magnetometer calibration started!\n\n"
"Rotate the drone through all orientations (360° on each axis).\n"
"Calibration will complete automatically after ~30 seconds."
)
# ----- Utility Tools -----
elif name == "beep":
conn = get_connection()
conn.beep()
return "Beep! (If you didn't hear it, check the buzzer connection)"
elif name == "reboot_flight_controller":
conn = get_connection()
conn.reboot()
_connection = None
return (
"Flight controller is rebooting...\n\n"
"Connection has been closed. Use 'connect_flight_controller' "
"to reconnect after the FC finishes booting (~3-5 seconds)."
)
elif name == "erase_blackbox_logs":
conn = get_connection()
conn.erase_dataflash()
return (
"Blackbox logs erased!\n\n"
"The onboard flash memory has been cleared."
)
elif name == "set_craft_name":
conn = get_connection()
new_name = arguments["name"]
conn.set_name(new_name)
return (
f"Craft name set to: {new_name}\n\n"
"Note: Use 'save_settings' to persist changes to EEPROM."
)
# ===== MODES & AUX CHANNELS =====
elif name == "get_available_modes":
modes = [(id, name) for id, name in FLIGHT_MODES.items()]
lines = ["Available Flight Modes:\n"]
for mode_id, mode_name in sorted(modes):
lines.append(f" {mode_id:2d}: {mode_name}")
lines.append("\nCommon modes: ARM, ANGLE, HORIZON, AIRMODE, BEEPERON, FLIP_OVER_AFTER_CRASH")
return "\n".join(lines)
elif name == "get_aux_modes":
conn = get_connection()
ranges = conn.get_mode_ranges()
if not ranges:
return "No aux mode mappings configured."
lines = ["Aux Channel Mode Mappings:\n"]
lines.append("Slot Mode AUX Range")
lines.append("-" * 40)
for r in ranges:
mode_name = FLIGHT_MODES.get(r.mode_id, f"Unknown({r.mode_id})")
lines.append(
f"{r.index:3d} {mode_name:15s} AUX{r.aux_channel+1} {r.range_start}-{r.range_end}"
)
return "\n".join(lines)
elif name == "set_aux_mode":
conn = get_connection()
mode_name = arguments["mode"].upper()
aux_channel = arguments["aux_channel"] - 1 # Convert to 0-indexed
range_start = arguments["range_start"]
range_end = arguments["range_end"]
# Find mode ID
mode_id = None
for mid, mname in FLIGHT_MODES.items():
if mname == mode_name:
mode_id = mid
break
if mode_id is None:
return f"Unknown mode: {mode_name}. Use 'get_available_modes' to see valid modes."
# Find empty slot or slot with same mode
ranges = conn.get_mode_ranges()
slot_index = None
# First check if this mode already has a mapping
for r in ranges:
if r.mode_id == mode_id:
slot_index = r.index
break
# Otherwise find first empty slot
if slot_index is None:
used_slots = {r.index for r in ranges}
for i in range(20):
if i not in used_slots:
slot_index = i
break
if slot_index is None:
return "No empty mode slots available (max 20 mappings)."
conn.set_mode_range(slot_index, mode_id, aux_channel, range_start, range_end)
return (
f"Mode '{mode_name}' mapped to AUX{aux_channel+1} range {range_start}-{range_end}\n\n"
"Note: Use 'save_settings' to persist changes."
)
elif name == "clear_aux_mode":
conn = get_connection()
index = arguments["index"]
conn.clear_mode_range(index)
return f"Mode mapping in slot {index} cleared.\n\nUse 'save_settings' to persist."
# ===== PROFILES =====
elif name == "get_current_profile":
conn = get_connection()
pid_profile, rate_profile = conn.get_current_profile()
return (
f"Current Profiles:\n"
f" PID Profile: {pid_profile + 1} (of 3)\n"
f" Rate Profile: {rate_profile + 1} (of 6)"
)
elif name == "set_pid_profile":
conn = get_connection()
profile = arguments["profile"] - 1 # Convert to 0-indexed
conn.set_pid_profile(profile)
return (
f"Switched to PID profile {profile + 1}.\n\n"
"Note: Profile change is temporary. Use 'save_settings' to make it default."
)
elif name == "set_rate_profile":
conn = get_connection()
profile = arguments["profile"] - 1 # Convert to 0-indexed
conn.set_rate_profile(profile)
return (
f"Switched to rate profile {profile + 1}.\n\n"
"Note: Profile change is temporary. Use 'save_settings' to make it default."
)
elif name == "copy_pid_profile":
conn = get_connection()
source = arguments["source"] - 1 # Convert to 0-indexed
destination = arguments["destination"] - 1
conn.copy_profile(source, destination)
return (
f"Copied PID settings from profile {source + 1} to profile {destination + 1}.\n\n"
"Use 'save_settings' to persist."
)
# ===== FAILSAFE & SAFETY =====
elif name == "get_failsafe_config":
conn = get_connection()
fs = conn.get_failsafe_config()
procedures = {0: "AUTO_LAND", 1: "DROP", 2: "GPS_RESCUE"}
return (
f"Failsafe Configuration:\n"
f" Procedure: {procedures.get(fs.procedure, 'Unknown')}\n"
f" Delay: {fs.delay * 0.1:.1f} seconds\n"
f" Motor Off Delay: {fs.off_delay * 0.1:.1f} seconds\n"
f" Throttle: {fs.throttle}\n"
f" Switch Mode: {'Stage 2' if fs.switch_mode else 'Stage 1'}"
)
elif name == "set_failsafe_config":
conn = get_connection()
procedure = arguments.get("procedure")
delay = arguments.get("delay")
proc_map = {"AUTO_LAND": 0, "DROP": 1, "GPS_RESCUE": 2}
proc_int = proc_map.get(procedure) if procedure else None
# Convert delay from seconds to 0.1s units
delay_int = int(delay * 10) if delay else None
conn.set_failsafe_config(
procedure=proc_int,
delay=delay_int,
)
return (
"Failsafe configuration updated.\n\n"
"Use 'save_settings' to persist changes."
)
elif name == "get_arming_disable_flags":
conn = get_connection()
flags = conn.get_arming_disable_flags()
if not flags:
return "No arming disable flags - drone is ready to arm!"
lines = ["Arming is blocked by:\n"]
for flag in flags:
lines.append(f" - {flag}")
# Add helpful hints
hints = {
"RXLOSS": "Radio receiver not connected or no signal",
"FAILSAFE": "Failsafe is active",
"BOXFAILSAFE": "Failsafe switch is on",
"THROTTLE": "Throttle is not at minimum",
"ANGLE": "Drone is not level",
"BOOT_GRACE_TIME": "Wait a few seconds after boot",
"NOPREARM": "PREARM switch not activated",
"LOAD": "CPU load too high",
"CALIB": "Calibration in progress",
"CLI": "Currently in CLI mode",
"MSP": "MSP connection preventing arm",
"PARALYZE": "Paralyze mode active",
"RPMFILTER": "RPM filter not ready",
}
lines.append("\nHints:")
for flag in flags:
if flag in hints:
lines.append(f" {flag}: {hints[flag]}")
return "\n".join(lines)
elif name == "preflight_check":
conn = get_connection()
result = conn.run_preflight_check()
lines = ["PREFLIGHT CHECK\n" + "=" * 40 + "\n"]
for check in result.checks:
status = "✓" if check.passed else "✗"
lines.append(f"{status} {check.name}: {check.message}")
lines.append("\n" + "=" * 40)
if result.all_passed:
lines.append("RESULT: ALL CHECKS PASSED - Ready to fly!")
else:
lines.append("RESULT: SOME CHECKS FAILED - Review issues above")
return "\n".join(lines)
# ===== VTX CONTROL =====
elif name == "set_vtx_channel":
conn = get_connection()
channel = arguments["channel"]
# Get current band
vtx = conn.get_vtx_config()
conn.set_vtx_band_channel(vtx.band, channel)
return (
f"VTX channel set to {channel}.\n\n"
"Use 'save_settings' to persist."
)
elif name == "set_vtx_band":
conn = get_connection()
band_letter = arguments["band"].upper()
band_map = {"A": 1, "B": 2, "E": 3, "F": 4, "R": 5}
band = band_map.get(band_letter, 1)
# Get current channel
vtx = conn.get_vtx_config()
conn.set_vtx_band_channel(band, vtx.channel)
return (
f"VTX band set to {band_letter}.\n\n"
"Use 'save_settings' to persist."
)
elif name == "set_vtx_power":
conn = get_connection()
power = arguments["power"]
conn.set_vtx_power(power)
return (
f"VTX power set to level {power}.\n\n"
"Use 'save_settings' to persist."
)
elif name == "set_vtx_pit_mode":
conn = get_connection()
enabled = arguments["enabled"]
conn.set_vtx_pit_mode(enabled)
status = "enabled" if enabled else "disabled"
return (
f"VTX pit mode {status}.\n\n"
"Use 'save_settings' to persist."
)
else:
return f"Unknown tool: {name}"
def main():
"""Main entry point for the MCP server."""
async def run():
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options(),
)
asyncio.run(run())
if __name__ == "__main__":
main()