Skip to main content
Glama
server.py51.2 kB
""" Betaflight MCP Server An MCP server that provides tools for configuring Betaflight flight controllers through Claude and other MCP-compatible AI assistants. """ from __future__ import annotations import asyncio import json import sys from contextlib import asynccontextmanager from dataclasses import asdict from typing import Any, Optional import serial.tools.list_ports from mcp.server import Server from mcp.server.stdio import stdio_server from mcp.types import TextContent, Tool from .msp import ( MSPConnection, MSPError, PIDSettings, FEATURE_FLAGS, FLIGHT_MODES, ModeRange, FailsafeConfig, PreflightResult, ) # Global connection state _connection: Optional[MSPConnection] = None def get_connection() -> MSPConnection: """Get the current MSP connection or raise an error.""" if _connection is None or not _connection.is_connected(): raise MSPError( "Not connected to flight controller. " "Use 'connect_flight_controller' tool first." ) return _connection # PID validation ranges (Betaflight typical safe ranges) PID_RANGES = { 'p': (0, 200), # Proportional: 0-200 'i': (0, 200), # Integral: 0-200 'd': (0, 200), # Derivative: 0-200 } def validate_pid_value(name: str, value: int, component: str) -> None: """Validate a PID value is within safe range.""" min_val, max_val = PID_RANGES[component.lower()] if not min_val <= value <= max_val: raise ValueError( f"{name} {component.upper()} value {value} is outside safe range " f"[{min_val}-{max_val}]" ) # Create the MCP server app = Server("betaflight-mcp") @app.list_tools() async def list_tools() -> list[Tool]: """List all available Betaflight tools.""" return [ Tool( name="list_serial_ports", description="List available serial ports for connecting to flight controllers", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="connect_flight_controller", description=( "Connect to a Betaflight flight controller via serial port. " "Must be called before using other flight controller tools." ), inputSchema={ "type": "object", "properties": { "port": { "type": "string", "description": ( "Serial port path (e.g., '/dev/tty.usbmodem0001' on Mac, " "'/dev/ttyACM0' on Linux, 'COM3' on Windows)" ), }, "baudrate": { "type": "integer", "description": "Serial baudrate (default: 115200)", "default": 115200, }, }, "required": ["port"], }, ), Tool( name="disconnect_flight_controller", description="Disconnect from the flight controller", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_flight_controller_info", description=( "Get flight controller information including firmware version, " "board type, and craft name" ), inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_flight_controller_status", description=( "Get current flight controller status including arming state, " "sensors, CPU load, and error counts" ), inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_pid_settings", description="Get current PID controller settings for roll, pitch, and yaw", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="set_pid_settings", description=( "Set PID controller settings. Only specified values will be changed. " "Values are validated against safe ranges before applying. " "Use save_settings tool after to persist changes." ), inputSchema={ "type": "object", "properties": { "roll_p": {"type": "integer", "description": "Roll P gain (0-200)"}, "roll_i": {"type": "integer", "description": "Roll I gain (0-200)"}, "roll_d": {"type": "integer", "description": "Roll D gain (0-200)"}, "pitch_p": {"type": "integer", "description": "Pitch P gain (0-200)"}, "pitch_i": {"type": "integer", "description": "Pitch I gain (0-200)"}, "pitch_d": {"type": "integer", "description": "Pitch D gain (0-200)"}, "yaw_p": {"type": "integer", "description": "Yaw P gain (0-200)"}, "yaw_i": {"type": "integer", "description": "Yaw I gain (0-200)"}, "yaw_d": {"type": "integer", "description": "Yaw D gain (0-200)"}, }, "required": [], }, ), Tool( name="get_rc_tuning", description="Get RC rates, expo, and throttle settings", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="save_settings", description=( "Save current settings to EEPROM. Call this after making changes " "to persist them across power cycles." ), inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="backup_configuration", description=( "Backup flight controller configuration. Returns CLI commands " "that can be used to restore the configuration." ), inputSchema={ "type": "object", "properties": { "mode": { "type": "string", "enum": ["diff", "dump"], "description": ( "'diff' returns only changed settings (recommended), " "'dump' returns all settings" ), "default": "diff", }, }, "required": [], }, ), Tool( name="send_cli_command", description=( "Send a raw CLI command to the flight controller. " "Use with caution - some commands can affect flight safety. " "The FC must not be armed." ), inputSchema={ "type": "object", "properties": { "command": { "type": "string", "description": "CLI command to send (e.g., 'get pid', 'status')", }, }, "required": ["command"], }, ), # Sensor Data Tools Tool( name="get_attitude", description="Get current aircraft attitude (roll, pitch, yaw angles in degrees)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_imu_data", description="Get raw IMU sensor data (accelerometer, gyroscope, magnetometer)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_battery_status", description="Get battery voltage, current draw, mAh consumed, and RSSI", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_motor_values", description="Get current motor output values (PWM/DSHOT values)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_rc_channels", description="Get current RC channel values from the receiver", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), # Configuration Tools Tool( name="get_filter_config", description="Get gyro and D-term filter settings (lowpass, notch filters)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_battery_config", description="Get battery configuration (voltage limits, capacity, meter sources)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_motor_config", description="Get motor configuration (throttle limits, protocol, poles)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_vtx_config", description="Get VTX (video transmitter) settings (band, channel, power)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_osd_config", description="Get OSD (on-screen display) configuration", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_blackbox_config", description="Get blackbox data logging configuration", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_dataflash_summary", description="Get onboard dataflash memory status (used/total space for blackbox)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_features", description="Get list of enabled/disabled features (AIRMODE, OSD, GPS, etc.)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="set_feature", description=( "Enable or disable a feature. " f"Available features: {', '.join(FEATURE_FLAGS.values())}" ), inputSchema={ "type": "object", "properties": { "feature": { "type": "string", "description": "Feature name (e.g., 'AIRMODE', 'OSD', 'GPS')", }, "enabled": { "type": "boolean", "description": "True to enable, False to disable", }, }, "required": ["feature", "enabled"], }, ), # Motor Testing Tools Tool( name="test_motor", description=( "DANGER: Spin a single motor for testing. REMOVE PROPS FIRST! " "Only works when disarmed. Motor will spin at specified throttle." ), inputSchema={ "type": "object", "properties": { "motor": { "type": "integer", "description": "Motor number (1-8)", "minimum": 1, "maximum": 8, }, "throttle_percent": { "type": "integer", "description": "Throttle percentage (0-100). Start low (5-10%)!", "minimum": 0, "maximum": 100, }, }, "required": ["motor", "throttle_percent"], }, ), Tool( name="stop_motors", description="Stop all motors immediately", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), # Calibration Tools Tool( name="calibrate_accelerometer", description=( "Calibrate the accelerometer. The drone must be on a level surface " "and completely still during calibration (~2 seconds)." ), inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="calibrate_magnetometer", description=( "Start magnetometer (compass) calibration. Rotate the drone through " "all orientations (360 degrees on each axis) during calibration." ), inputSchema={ "type": "object", "properties": {}, "required": [], }, ), # Utility Tools Tool( name="beep", description="Make the flight controller beep (useful for finding lost drone)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="reboot_flight_controller", description="Reboot the flight controller. Connection will be lost.", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="erase_blackbox_logs", description="Erase all blackbox logs from onboard flash memory", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="set_craft_name", description="Set the craft name (max 16 characters)", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "New craft name (max 16 chars)", "maxLength": 16, }, }, "required": ["name"], }, ), # ===== MODES & AUX CHANNELS ===== Tool( name="get_available_modes", description="List all available flight modes that can be assigned to aux channels", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="get_aux_modes", description="Get all aux channel to flight mode mappings (which switch activates which mode)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="set_aux_mode", description=( "Map a flight mode to an aux channel range. " "Example: ARM on AUX1 when switch is HIGH (1800-2100)" ), inputSchema={ "type": "object", "properties": { "mode": { "type": "string", "description": "Mode name (e.g., 'ARM', 'ANGLE', 'HORIZON', 'AIRMODE', 'BEEPERON')", }, "aux_channel": { "type": "integer", "description": "Aux channel number (1-8)", "minimum": 1, "maximum": 8, }, "range_start": { "type": "integer", "description": "Start of activation range (900-2100)", "minimum": 900, "maximum": 2100, }, "range_end": { "type": "integer", "description": "End of activation range (900-2100)", "minimum": 900, "maximum": 2100, }, }, "required": ["mode", "aux_channel", "range_start", "range_end"], }, ), Tool( name="clear_aux_mode", description="Remove a mode mapping by its slot index", inputSchema={ "type": "object", "properties": { "index": { "type": "integer", "description": "Slot index to clear (from get_aux_modes)", }, }, "required": ["index"], }, ), # ===== PROFILES ===== Tool( name="get_current_profile", description="Get current PID profile (1-3) and rate profile (1-6)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="set_pid_profile", description="Switch to a different PID profile", inputSchema={ "type": "object", "properties": { "profile": { "type": "integer", "description": "Profile number (1-3)", "minimum": 1, "maximum": 3, }, }, "required": ["profile"], }, ), Tool( name="set_rate_profile", description="Switch to a different rate profile", inputSchema={ "type": "object", "properties": { "profile": { "type": "integer", "description": "Profile number (1-6)", "minimum": 1, "maximum": 6, }, }, "required": ["profile"], }, ), Tool( name="copy_pid_profile", description="Copy PID settings from one profile to another", inputSchema={ "type": "object", "properties": { "source": { "type": "integer", "description": "Source profile number (1-3)", "minimum": 1, "maximum": 3, }, "destination": { "type": "integer", "description": "Destination profile number (1-3)", "minimum": 1, "maximum": 3, }, }, "required": ["source", "destination"], }, ), # ===== FAILSAFE & SAFETY ===== Tool( name="get_failsafe_config", description="Get failsafe configuration (procedure, delays, throttle settings)", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="set_failsafe_config", description="Configure failsafe behavior", inputSchema={ "type": "object", "properties": { "procedure": { "type": "string", "enum": ["AUTO_LAND", "DROP", "GPS_RESCUE"], "description": "Failsafe procedure", }, "delay": { "type": "number", "description": "Delay before failsafe activates (seconds)", }, }, "required": [], }, ), Tool( name="get_arming_disable_flags", description="Get detailed reasons why the drone cannot arm", inputSchema={ "type": "object", "properties": {}, "required": [], }, ), Tool( name="preflight_check", description=( "Run comprehensive preflight checks: battery, sensors, RC link, " "arming status, failsafe, CPU load" ), inputSchema={ "type": "object", "properties": {}, "required": [], }, ), # ===== VTX CONTROL ===== Tool( name="set_vtx_channel", description="Change VTX channel (1-8)", inputSchema={ "type": "object", "properties": { "channel": { "type": "integer", "description": "Channel number (1-8)", "minimum": 1, "maximum": 8, }, }, "required": ["channel"], }, ), Tool( name="set_vtx_band", description="Change VTX band", inputSchema={ "type": "object", "properties": { "band": { "type": "string", "enum": ["A", "B", "E", "F", "R"], "description": "Band letter", }, }, "required": ["band"], }, ), Tool( name="set_vtx_power", description="Change VTX power level", inputSchema={ "type": "object", "properties": { "power": { "type": "integer", "description": "Power level (0-4, varies by VTX)", "minimum": 0, "maximum": 4, }, }, "required": ["power"], }, ), Tool( name="set_vtx_pit_mode", description="Enable or disable VTX pit mode (low power when disarmed)", inputSchema={ "type": "object", "properties": { "enabled": { "type": "boolean", "description": "True to enable pit mode", }, }, "required": ["enabled"], }, ), ] @app.call_tool() async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: """Handle tool calls.""" global _connection try: result = await _handle_tool(name, arguments) return [TextContent(type="text", text=result)] except MSPError as e: return [TextContent(type="text", text=f"Error: {e}")] except Exception as e: return [TextContent(type="text", text=f"Unexpected error: {type(e).__name__}: {e}")] async def _handle_tool(name: str, arguments: dict[str, Any]) -> str: """Process a tool call and return the result.""" global _connection if name == "list_serial_ports": ports = list(serial.tools.list_ports.comports()) if not ports: return "No serial ports found." result = "Available serial ports:\n" for port in ports: result += f"\n- {port.device}" if port.description: result += f"\n Description: {port.description}" if port.manufacturer: result += f"\n Manufacturer: {port.manufacturer}" if port.vid and port.pid: result += f"\n VID:PID: {port.vid:04X}:{port.pid:04X}" # Add hint for Betaflight result += "\n\nTip: Betaflight FCs typically show as 'STM32' or contain 'usbmodem' in the name." return result elif name == "connect_flight_controller": port = arguments["port"] baudrate = arguments.get("baudrate", 115200) # Disconnect existing connection if any if _connection is not None: _connection.disconnect() _connection = MSPConnection(port, baudrate) _connection.connect() # Verify connection by reading FC info try: info = _connection.get_fc_info() return ( f"Connected to flight controller!\n\n" f"Firmware: {info.variant} {info.version}\n" f"API Version: {info.api_version}\n" f"Board: {info.board_name}\n" f"Target: {info.target_name}\n" f"Craft Name: {info.craft_name or '(not set)'}" ) except Exception as e: _connection.disconnect() _connection = None raise MSPError(f"Connected but failed to read FC info: {e}") elif name == "disconnect_flight_controller": if _connection is None: return "Not connected to any flight controller." _connection.disconnect() _connection = None return "Disconnected from flight controller." elif name == "get_flight_controller_info": conn = get_connection() info = conn.get_fc_info() return json.dumps(asdict(info), indent=2) elif name == "get_flight_controller_status": conn = get_connection() status = conn.get_status() return json.dumps(asdict(status), indent=2) elif name == "get_pid_settings": conn = get_connection() pids = conn.get_pid_settings() return ( "Current PID Settings:\n\n" " P I D\n" f"Roll: {pids.roll_p:3d} {pids.roll_i:3d} {pids.roll_d:3d}\n" f"Pitch: {pids.pitch_p:3d} {pids.pitch_i:3d} {pids.pitch_d:3d}\n" f"Yaw: {pids.yaw_p:3d} {pids.yaw_i:3d} {pids.yaw_d:3d}" ) elif name == "set_pid_settings": conn = get_connection() # Get current settings current = conn.get_pid_settings() # Build new settings with validation new_pids = PIDSettings( roll_p=arguments.get("roll_p", current.roll_p), roll_i=arguments.get("roll_i", current.roll_i), roll_d=arguments.get("roll_d", current.roll_d), pitch_p=arguments.get("pitch_p", current.pitch_p), pitch_i=arguments.get("pitch_i", current.pitch_i), pitch_d=arguments.get("pitch_d", current.pitch_d), yaw_p=arguments.get("yaw_p", current.yaw_p), yaw_i=arguments.get("yaw_i", current.yaw_i), yaw_d=arguments.get("yaw_d", current.yaw_d), ) # Validate all values validate_pid_value("Roll", new_pids.roll_p, "p") validate_pid_value("Roll", new_pids.roll_i, "i") validate_pid_value("Roll", new_pids.roll_d, "d") validate_pid_value("Pitch", new_pids.pitch_p, "p") validate_pid_value("Pitch", new_pids.pitch_i, "i") validate_pid_value("Pitch", new_pids.pitch_d, "d") validate_pid_value("Yaw", new_pids.yaw_p, "p") validate_pid_value("Yaw", new_pids.yaw_i, "i") validate_pid_value("Yaw", new_pids.yaw_d, "d") # Apply settings conn.set_pid_settings(new_pids) # Show changes changes = [] for axis in ['roll', 'pitch', 'yaw']: for comp in ['p', 'i', 'd']: old_val = getattr(current, f"{axis}_{comp}") new_val = getattr(new_pids, f"{axis}_{comp}") if old_val != new_val: changes.append(f" {axis.capitalize()} {comp.upper()}: {old_val} -> {new_val}") if changes: return ( "PID settings updated:\n" + "\n".join(changes) + "\n\nNote: Use 'save_settings' to persist changes to EEPROM." ) else: return "No changes made (all values already match)." elif name == "get_rc_tuning": conn = get_connection() tuning = conn.get_rc_tuning() return json.dumps(asdict(tuning), indent=2) elif name == "save_settings": conn = get_connection() conn.save_settings() return "Settings saved to EEPROM successfully." elif name == "backup_configuration": conn = get_connection() mode = arguments.get("mode", "diff") if mode == "diff": config = conn.get_diff() header = "Configuration Diff (changes from defaults)" else: config = conn.get_dump() header = "Full Configuration Dump" return f"{header}:\n\n{config}" elif name == "send_cli_command": conn = get_connection() command = arguments["command"] # Basic safety checks dangerous_commands = ['bl', 'dfu', 'exit'] cmd_lower = command.lower().split()[0] if command.split() else "" if cmd_lower in dangerous_commands: return ( f"Command '{cmd_lower}' is blocked for safety. " "This command could put the FC in an unrecoverable state." ) response = conn.send_cli_command(command) return f"CLI Response:\n{response}" # ----- Sensor Data Tools ----- elif name == "get_attitude": conn = get_connection() att = conn.get_attitude() return ( f"Aircraft Attitude:\n" f" Roll: {att.roll:+7.1f}°\n" f" Pitch: {att.pitch:+7.1f}°\n" f" Yaw: {att.yaw:+7.1f}° (heading)" ) elif name == "get_imu_data": conn = get_connection() imu = conn.get_imu_data() return ( f"IMU Sensor Data:\n\n" f"Accelerometer:\n" f" X: {imu.acc_x:6d} Y: {imu.acc_y:6d} Z: {imu.acc_z:6d}\n\n" f"Gyroscope:\n" f" X: {imu.gyro_x:6d} Y: {imu.gyro_y:6d} Z: {imu.gyro_z:6d}\n\n" f"Magnetometer:\n" f" X: {imu.mag_x:6d} Y: {imu.mag_y:6d} Z: {imu.mag_z:6d}" ) elif name == "get_battery_status": conn = get_connection() analog = conn.get_analog_data() return ( f"Battery Status:\n" f" Voltage: {analog.voltage:.1f}V\n" f" Current: {analog.amperage:.2f}A\n" f" Consumed: {analog.mah_drawn} mAh\n" f" RSSI: {analog.rssi}" ) elif name == "get_motor_values": conn = get_connection() motors = conn.get_motor_values() lines = ["Motor Output Values:"] for i, val in enumerate(motors.motors, 1): # Visual bar representation bar_len = (val - 1000) // 50 # 0-20 chars for 1000-2000 bar = "█" * bar_len + "░" * (20 - bar_len) lines.append(f" Motor {i}: {val:4d} [{bar}]") return "\n".join(lines) elif name == "get_rc_channels": conn = get_connection() rc = conn.get_rc_channels() channel_names = ["Roll", "Pitch", "Yaw", "Throttle", "Aux1", "Aux2", "Aux3", "Aux4"] lines = ["RC Channel Values:"] for i, val in enumerate(rc.channels): name = channel_names[i] if i < len(channel_names) else f"Ch{i+1}" # Center at 1500, show deviation deviation = val - 1500 lines.append(f" {name:8s}: {val:4d} ({deviation:+4d})") return "\n".join(lines) # ----- Configuration Tools ----- elif name == "get_filter_config": conn = get_connection() filt = conn.get_filter_config() filter_types = {0: "PT1", 1: "BIQUAD", 2: "PT2", 3: "PT3"} return ( f"Filter Configuration:\n\n" f"Gyro Lowpass:\n" f" Lowpass 1: {filt.gyro_lowpass_hz} Hz ({filter_types.get(filt.gyro_lowpass_type, 'Unknown')})\n" f" Lowpass 2: {filt.gyro_lowpass2_hz} Hz ({filter_types.get(filt.gyro_lowpass2_type, 'Unknown')})\n\n" f"Gyro Notch:\n" f" Center: {filt.gyro_notch_hz} Hz, Cutoff: {filt.gyro_notch_cutoff} Hz\n\n" f"D-Term Lowpass:\n" f" Lowpass 1: {filt.dterm_lowpass_hz} Hz ({filter_types.get(filt.dterm_lowpass_type, 'Unknown')})\n" f" Lowpass 2: {filt.dterm_lowpass2_hz} Hz ({filter_types.get(filt.dterm_lowpass2_type, 'Unknown')})\n\n" f"D-Term Notch:\n" f" Center: {filt.dterm_notch_hz} Hz, Cutoff: {filt.dterm_notch_cutoff} Hz" ) elif name == "get_battery_config": conn = get_connection() batt = conn.get_battery_config() meter_sources = {0: "None", 1: "ADC", 2: "ESC"} return ( f"Battery Configuration:\n" f" Min Cell Voltage: {batt.vbat_min_cell_voltage:.1f}V\n" f" Max Cell Voltage: {batt.vbat_max_cell_voltage:.1f}V\n" f" Warning Cell Voltage: {batt.vbat_warning_cell_voltage:.1f}V\n" f" Capacity: {batt.capacity} mAh\n" f" Voltage Meter: {meter_sources.get(batt.voltage_meter_source, 'Unknown')}\n" f" Current Meter: {meter_sources.get(batt.current_meter_source, 'Unknown')}" ) elif name == "get_motor_config": conn = get_connection() motor = conn.get_motor_config() protocols = { 0: "PWM", 1: "ONESHOT125", 2: "ONESHOT42", 3: "MULTISHOT", 4: "BRUSHED", 5: "DSHOT150", 6: "DSHOT300", 7: "DSHOT600", 8: "DSHOT1200", 9: "PROSHOT1000" } return ( f"Motor Configuration:\n" f" Min Throttle: {motor.min_throttle}\n" f" Max Throttle: {motor.max_throttle}\n" f" Min Command: {motor.min_command}\n" f" Motor Poles: {motor.motor_poles}\n" f" Protocol: {protocols.get(motor.motor_pwm_protocol, 'Unknown')}\n" f" DSHOT Telemetry: {'Enabled' if motor.use_dshot_telemetry else 'Disabled'}" ) elif name == "get_vtx_config": conn = get_connection() vtx = conn.get_vtx_config() bands = {0: "N/A", 1: "A", 2: "B", 3: "E", 4: "F", 5: "R"} vtx_types = {0: "None", 1: "RTC6705", 2: "SmartAudio", 3: "Tramp", 4: "Table"} return ( f"VTX Configuration:\n" f" Type: {vtx_types.get(vtx.vtx_type, 'Unknown')}\n" f" Band: {bands.get(vtx.band, 'Unknown')}\n" f" Channel: {vtx.channel}\n" f" Power: {vtx.power}\n" f" Frequency: {vtx.frequency} MHz\n" f" Pit Mode: {'On' if vtx.pit_mode else 'Off'}\n" f" Low Power Disarm: {'On' if vtx.low_power_disarm else 'Off'}" ) elif name == "get_osd_config": conn = get_connection() osd = conn.get_osd_config() video_systems = {0: "AUTO", 1: "PAL", 2: "NTSC", 3: "HD"} units = {0: "Imperial", 1: "Metric"} return ( f"OSD Configuration:\n" f" Video System: {video_systems.get(osd.video_system, 'Unknown')}\n" f" Units: {units.get(osd.units, 'Unknown')}\n" f" RSSI Alarm: {osd.rssi_alarm}%\n" f" Capacity Alarm: {osd.cap_alarm} mAh" ) elif name == "get_blackbox_config": conn = get_connection() bb = conn.get_blackbox_config() devices = {0: "None", 1: "Flash", 2: "SD Card", 3: "Serial"} return ( f"Blackbox Configuration:\n" f" Device: {devices.get(bb.device, 'Unknown')}\n" f" Rate: 1/{bb.rate_denom} ({100/bb.rate_denom:.0f}%)\n" f" P Ratio: {bb.p_ratio}" ) elif name == "get_dataflash_summary": conn = get_connection() df = conn.get_dataflash_summary() if not df.supported: return "Dataflash not supported on this board." used_pct = (df.used_size / df.total_size * 100) if df.total_size > 0 else 0 return ( f"Dataflash Summary:\n" f" Status: {'Ready' if df.ready else 'Not Ready'}\n" f" Sectors: {df.sectors}\n" f" Total Size: {df.total_size / 1024:.0f} KB\n" f" Used: {df.used_size / 1024:.0f} KB ({used_pct:.1f}%)\n" f" Free: {(df.total_size - df.used_size) / 1024:.0f} KB" ) elif name == "get_features": conn = get_connection() features = conn.get_features() enabled = [name for name, on in features.features.items() if on] disabled = [name for name, on in features.features.items() if not on] return ( f"Enabled Features:\n " + (", ".join(sorted(enabled)) if enabled else "(none)") + f"\n\nDisabled Features:\n " + (", ".join(sorted(disabled)) if disabled else "(none)") ) elif name == "set_feature": conn = get_connection() feature = arguments["feature"] enabled = arguments["enabled"] conn.set_feature(feature, enabled) action = "enabled" if enabled else "disabled" return ( f"Feature '{feature}' {action}.\n\n" "Note: Use 'save_settings' to persist changes to EEPROM." ) # ----- Motor Testing Tools ----- elif name == "test_motor": conn = get_connection() motor_num = arguments["motor"] throttle = arguments["throttle_percent"] # Safety check - verify not armed status = conn.get_status() if status.armed: return "ERROR: Cannot test motors while armed! Disarm first." # Convert 1-8 to 0-7 index motor_index = motor_num - 1 # Convert percentage to PWM value (1000-2000) pwm_value = 1000 + (throttle * 10) conn.set_motor(motor_index, pwm_value) return ( f"Motor {motor_num} spinning at {throttle}% (PWM: {pwm_value})\n\n" "WARNING: Motor is now spinning! Use 'stop_motors' to stop.\n" "SAFETY: Ensure propellers are removed!" ) elif name == "stop_motors": conn = get_connection() conn.stop_all_motors() return "All motors stopped." # ----- Calibration Tools ----- elif name == "calibrate_accelerometer": conn = get_connection() # Safety check status = conn.get_status() if status.armed: return "ERROR: Cannot calibrate while armed! Disarm first." conn.calibrate_accelerometer() return ( "Accelerometer calibration complete!\n\n" "The drone should now read level when on a flat surface.\n" "Use 'save_settings' to persist the calibration." ) elif name == "calibrate_magnetometer": conn = get_connection() # Safety check status = conn.get_status() if status.armed: return "ERROR: Cannot calibrate while armed! Disarm first." conn.calibrate_magnetometer() return ( "Magnetometer calibration started!\n\n" "Rotate the drone through all orientations (360° on each axis).\n" "Calibration will complete automatically after ~30 seconds." ) # ----- Utility Tools ----- elif name == "beep": conn = get_connection() conn.beep() return "Beep! (If you didn't hear it, check the buzzer connection)" elif name == "reboot_flight_controller": conn = get_connection() conn.reboot() _connection = None return ( "Flight controller is rebooting...\n\n" "Connection has been closed. Use 'connect_flight_controller' " "to reconnect after the FC finishes booting (~3-5 seconds)." ) elif name == "erase_blackbox_logs": conn = get_connection() conn.erase_dataflash() return ( "Blackbox logs erased!\n\n" "The onboard flash memory has been cleared." ) elif name == "set_craft_name": conn = get_connection() new_name = arguments["name"] conn.set_name(new_name) return ( f"Craft name set to: {new_name}\n\n" "Note: Use 'save_settings' to persist changes to EEPROM." ) # ===== MODES & AUX CHANNELS ===== elif name == "get_available_modes": modes = [(id, name) for id, name in FLIGHT_MODES.items()] lines = ["Available Flight Modes:\n"] for mode_id, mode_name in sorted(modes): lines.append(f" {mode_id:2d}: {mode_name}") lines.append("\nCommon modes: ARM, ANGLE, HORIZON, AIRMODE, BEEPERON, FLIP_OVER_AFTER_CRASH") return "\n".join(lines) elif name == "get_aux_modes": conn = get_connection() ranges = conn.get_mode_ranges() if not ranges: return "No aux mode mappings configured." lines = ["Aux Channel Mode Mappings:\n"] lines.append("Slot Mode AUX Range") lines.append("-" * 40) for r in ranges: mode_name = FLIGHT_MODES.get(r.mode_id, f"Unknown({r.mode_id})") lines.append( f"{r.index:3d} {mode_name:15s} AUX{r.aux_channel+1} {r.range_start}-{r.range_end}" ) return "\n".join(lines) elif name == "set_aux_mode": conn = get_connection() mode_name = arguments["mode"].upper() aux_channel = arguments["aux_channel"] - 1 # Convert to 0-indexed range_start = arguments["range_start"] range_end = arguments["range_end"] # Find mode ID mode_id = None for mid, mname in FLIGHT_MODES.items(): if mname == mode_name: mode_id = mid break if mode_id is None: return f"Unknown mode: {mode_name}. Use 'get_available_modes' to see valid modes." # Find empty slot or slot with same mode ranges = conn.get_mode_ranges() slot_index = None # First check if this mode already has a mapping for r in ranges: if r.mode_id == mode_id: slot_index = r.index break # Otherwise find first empty slot if slot_index is None: used_slots = {r.index for r in ranges} for i in range(20): if i not in used_slots: slot_index = i break if slot_index is None: return "No empty mode slots available (max 20 mappings)." conn.set_mode_range(slot_index, mode_id, aux_channel, range_start, range_end) return ( f"Mode '{mode_name}' mapped to AUX{aux_channel+1} range {range_start}-{range_end}\n\n" "Note: Use 'save_settings' to persist changes." ) elif name == "clear_aux_mode": conn = get_connection() index = arguments["index"] conn.clear_mode_range(index) return f"Mode mapping in slot {index} cleared.\n\nUse 'save_settings' to persist." # ===== PROFILES ===== elif name == "get_current_profile": conn = get_connection() pid_profile, rate_profile = conn.get_current_profile() return ( f"Current Profiles:\n" f" PID Profile: {pid_profile + 1} (of 3)\n" f" Rate Profile: {rate_profile + 1} (of 6)" ) elif name == "set_pid_profile": conn = get_connection() profile = arguments["profile"] - 1 # Convert to 0-indexed conn.set_pid_profile(profile) return ( f"Switched to PID profile {profile + 1}.\n\n" "Note: Profile change is temporary. Use 'save_settings' to make it default." ) elif name == "set_rate_profile": conn = get_connection() profile = arguments["profile"] - 1 # Convert to 0-indexed conn.set_rate_profile(profile) return ( f"Switched to rate profile {profile + 1}.\n\n" "Note: Profile change is temporary. Use 'save_settings' to make it default." ) elif name == "copy_pid_profile": conn = get_connection() source = arguments["source"] - 1 # Convert to 0-indexed destination = arguments["destination"] - 1 conn.copy_profile(source, destination) return ( f"Copied PID settings from profile {source + 1} to profile {destination + 1}.\n\n" "Use 'save_settings' to persist." ) # ===== FAILSAFE & SAFETY ===== elif name == "get_failsafe_config": conn = get_connection() fs = conn.get_failsafe_config() procedures = {0: "AUTO_LAND", 1: "DROP", 2: "GPS_RESCUE"} return ( f"Failsafe Configuration:\n" f" Procedure: {procedures.get(fs.procedure, 'Unknown')}\n" f" Delay: {fs.delay * 0.1:.1f} seconds\n" f" Motor Off Delay: {fs.off_delay * 0.1:.1f} seconds\n" f" Throttle: {fs.throttle}\n" f" Switch Mode: {'Stage 2' if fs.switch_mode else 'Stage 1'}" ) elif name == "set_failsafe_config": conn = get_connection() procedure = arguments.get("procedure") delay = arguments.get("delay") proc_map = {"AUTO_LAND": 0, "DROP": 1, "GPS_RESCUE": 2} proc_int = proc_map.get(procedure) if procedure else None # Convert delay from seconds to 0.1s units delay_int = int(delay * 10) if delay else None conn.set_failsafe_config( procedure=proc_int, delay=delay_int, ) return ( "Failsafe configuration updated.\n\n" "Use 'save_settings' to persist changes." ) elif name == "get_arming_disable_flags": conn = get_connection() flags = conn.get_arming_disable_flags() if not flags: return "No arming disable flags - drone is ready to arm!" lines = ["Arming is blocked by:\n"] for flag in flags: lines.append(f" - {flag}") # Add helpful hints hints = { "RXLOSS": "Radio receiver not connected or no signal", "FAILSAFE": "Failsafe is active", "BOXFAILSAFE": "Failsafe switch is on", "THROTTLE": "Throttle is not at minimum", "ANGLE": "Drone is not level", "BOOT_GRACE_TIME": "Wait a few seconds after boot", "NOPREARM": "PREARM switch not activated", "LOAD": "CPU load too high", "CALIB": "Calibration in progress", "CLI": "Currently in CLI mode", "MSP": "MSP connection preventing arm", "PARALYZE": "Paralyze mode active", "RPMFILTER": "RPM filter not ready", } lines.append("\nHints:") for flag in flags: if flag in hints: lines.append(f" {flag}: {hints[flag]}") return "\n".join(lines) elif name == "preflight_check": conn = get_connection() result = conn.run_preflight_check() lines = ["PREFLIGHT CHECK\n" + "=" * 40 + "\n"] for check in result.checks: status = "✓" if check.passed else "✗" lines.append(f"{status} {check.name}: {check.message}") lines.append("\n" + "=" * 40) if result.all_passed: lines.append("RESULT: ALL CHECKS PASSED - Ready to fly!") else: lines.append("RESULT: SOME CHECKS FAILED - Review issues above") return "\n".join(lines) # ===== VTX CONTROL ===== elif name == "set_vtx_channel": conn = get_connection() channel = arguments["channel"] # Get current band vtx = conn.get_vtx_config() conn.set_vtx_band_channel(vtx.band, channel) return ( f"VTX channel set to {channel}.\n\n" "Use 'save_settings' to persist." ) elif name == "set_vtx_band": conn = get_connection() band_letter = arguments["band"].upper() band_map = {"A": 1, "B": 2, "E": 3, "F": 4, "R": 5} band = band_map.get(band_letter, 1) # Get current channel vtx = conn.get_vtx_config() conn.set_vtx_band_channel(band, vtx.channel) return ( f"VTX band set to {band_letter}.\n\n" "Use 'save_settings' to persist." ) elif name == "set_vtx_power": conn = get_connection() power = arguments["power"] conn.set_vtx_power(power) return ( f"VTX power set to level {power}.\n\n" "Use 'save_settings' to persist." ) elif name == "set_vtx_pit_mode": conn = get_connection() enabled = arguments["enabled"] conn.set_vtx_pit_mode(enabled) status = "enabled" if enabled else "disabled" return ( f"VTX pit mode {status}.\n\n" "Use 'save_settings' to persist." ) else: return f"Unknown tool: {name}" def main(): """Main entry point for the MCP server.""" async def run(): async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options(), ) asyncio.run(run()) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jir13/MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server