# server2.py • 33.6 kB
#!/usr/bin/env python3
"""
Enhanced Meshtastic MCP Server
Provides comprehensive tools for interacting with Meshtastic mesh networks via the Model Context Protocol.
"""
import asyncio
import json
import logging
from typing import Any, Optional
from datetime import datetime
import meshtastic
import meshtastic.serial_interface
import meshtastic.tcp_interface
import meshtastic.ble_interface
from meshtastic.protobuf import portnums_pb2
from mcp.server import Server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
LoggingLevel
)
from mcp.server.stdio import stdio_server
# Logging: emit INFO and above through the root logger's default handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("meshtastic-mcp")

# Shared module state.
# `interface` starts out unset and is filled in the first time a connection
# is requested; the three containers below are written by the packet
# callbacks and read back by the corresponding "get_*" tools.
interface: Optional[Any] = None
message_history: list = []
traceroute_results: dict = {}
telemetry_results: dict = {}
def get_interface(connection_type: str = "serial", device: Optional[str] = None):
    """Return the shared Meshtastic interface, creating it on first use.

    Args:
        connection_type: "serial", "tcp", or "bluetooth"/"ble". Only consulted
            when no interface exists yet.
        device: Serial device path, TCP hostname (falls back to
            "meshtastic.local" when omitted), or BLE address, depending on
            ``connection_type``.

    Returns:
        The module-level interface object.

    Raises:
        ValueError: If ``connection_type`` is not one of the supported values.

    Note:
        Once an interface has been created, both arguments are ignored and
        the existing object is returned unchanged.
    """
    global interface

    # Fast path: a connection already exists, so reuse it as-is.
    if interface is not None:
        return interface

    logger.info(f"Initializing Meshtastic interface: {connection_type}")

    if connection_type == "serial":
        interface = meshtastic.serial_interface.SerialInterface(devPath=device)
        return interface

    if connection_type == "tcp":
        host = device or "meshtastic.local"
        interface = meshtastic.tcp_interface.TCPInterface(hostname=host)
        return interface

    if connection_type in ["bluetooth", "ble"]:
        interface = meshtastic.ble_interface.BLEInterface(address=device)
        return interface

    raise ValueError(f"Unknown connection type: {connection_type}")
# Upper bound on retained packets so a long-running server cannot grow
# its in-memory history without limit.
_MAX_MESSAGE_HISTORY = 1000


def on_receive(packet, interface):
    """Record a received, decoded packet in ``message_history``.

    Packets without a truthy ``'decoded'`` entry are ignored. For every other
    packet a summary dict (sender, recipient, text, receive time, port name)
    is appended; the oldest entries are dropped once the history exceeds
    ``_MAX_MESSAGE_HISTORY``.

    Args:
        packet: Packet dictionary; read via ``fromId``, ``toId`` and
            ``decoded`` (``text``, ``portnum``).
        interface: The interface the packet arrived on (unused here).

    Any exception is logged and swallowed so a malformed packet cannot break
    the caller's receive path.
    """
    try:
        decoded = packet.get('decoded')
        if not decoded:
            return

        message_history.append({
            'from': packet.get('fromId', 'unknown'),
            'to': packet.get('toId', 'broadcast'),
            'text': decoded.get('text', ''),
            'time': datetime.now().isoformat(),
            'portnum': decoded.get('portnum', 'UNKNOWN'),
        })

        # Trim in place: other code reads this same list object, so it must
        # not be rebound to a new list.
        overflow = len(message_history) - _MAX_MESSAGE_HISTORY
        if overflow > 0:
            del message_history[:overflow]

        logger.info(f"Received message from {packet.get('fromId')}: {decoded.get('text')}")
    except Exception as e:
        # logger.exception keeps the traceback, which the plain error log lost.
        logger.exception(f"Error processing received packet: {e}")
def on_traceroute_response(packet):
    """Store the route carried by a traceroute response packet.

    The result is written to ``traceroute_results`` keyed by the responding
    node's ``fromId`` (``'unknown'`` when absent), replacing any earlier
    result for that node.

    Args:
        packet: Packet dictionary. The route is read from
            ``decoded['traceroute']`` and, for backward compatibility, from
            ``decoded['routeDiscovery']`` when the former is missing.

    Any exception is logged and swallowed.
    """
    try:
        decoded = packet.get('decoded', {})
        # NOTE(review): the meshtastic library appears to publish the decoded
        # RouteDiscovery under 'traceroute'; the original 'routeDiscovery'
        # key is still honoured so existing producers keep working.
        route = decoded.get('traceroute') or decoded.get('routeDiscovery') or {}
        node_id = packet.get('fromId', 'unknown')
        traceroute_results[node_id] = {
            'route': route.get('route', []),
            'time': datetime.now().isoformat(),
            'snr_towards': route.get('snrTowards', []),
            'snr_back': route.get('snrBack', []),
            # Return path, when the responder included one.
            'route_back': route.get('routeBack', []),
        }
        logger.info(f"Received traceroute response from {node_id}")
    except Exception as e:
        logger.exception(f"Error processing traceroute response: {e}")
def on_telemetry_response(packet):
    """Store the telemetry payload of a response packet.

    The payload found at ``decoded['telemetry']`` (an empty dict when absent)
    is saved in ``telemetry_results`` under the sender's ``fromId``
    (``'unknown'`` when absent) together with the local receive time.
    Failures are logged and otherwise ignored.
    """
    try:
        payload = packet.get('decoded', {}).get('telemetry', {})
        sender = packet.get('fromId', 'unknown')
        received_at = datetime.now().isoformat()
        entry = {'data': payload, 'time': received_at}
        telemetry_results[sender] = entry
        logger.info(f"Received telemetry from {sender}")
    except Exception as e:
        logger.error(f"Error processing telemetry: {e}")
def _int_arg(arguments, key, default=None):
    """Return ``arguments[key]`` coerced to ``int``.

    JSON clients may send integers as floats (e.g. ``0.0``); values that are
    missing or ``None`` yield ``default`` unchanged.
    """
    value = arguments.get(key)
    if value is None:
        return default
    return int(value)


def _call_capturing_stdout(func, *args, **kwargs):
    """Call ``func`` and return ``(result, text_printed_to_stdout)``.

    The server speaks MCP over stdio, so anything a library call prints to
    stdout must be kept off the real stream.
    """
    import contextlib
    import io

    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        result = func(*args, **kwargs)
    return result, buffer.getvalue()


def _chain_response_handler(iface, attr_name, callback):
    """Install ``callback`` as ``iface.<attr_name>`` without losing the class's handler.

    The installed function first calls ``callback(packet)`` and then the
    handler defined on the interface's class (if any). Looking the handler up
    on the class, not the instance, keeps repeated installs from nesting.
    """
    library_handler = getattr(type(iface), attr_name, None)

    def _handler(packet):
        callback(packet)
        if library_handler is not None:
            library_handler(iface, packet)

    setattr(iface, attr_name, _handler)


def _prop(json_type, description, **extra):
    """Build one JSON-schema property dict."""
    return {"type": json_type, "description": description, **extra}


def _channel_index_prop():
    """Schema for the channel index argument shared by several tools."""
    return _prop("integer", "Channel index (0-7, default: 0)", default=0, minimum=0, maximum=7)


def _object_schema(properties=None, required=None):
    """Build an object input schema; ``required`` is emitted only when given."""
    schema = {"type": "object", "properties": properties or {}}
    if required is not None:
        schema["required"] = required
    return schema


async def main():
    """Create the MCP server, register the tool handlers, and serve over stdio."""
    server = Server("meshtastic-mcp-enhanced")

    def reply(text: str) -> list[TextContent]:
        """Wrap ``text`` in the single-item response list every tool returns."""
        return [TextContent(type="text", text=text)]

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available Meshtastic tools.

        Arguments that are forwarded to integer fields are declared as JSON
        ``integer`` so clients do not send fractional values.
        """
        return [
            # Connection tools
            Tool(
                name="connect",
                description="Connect to a Meshtastic device via serial, TCP/IP, or Bluetooth",
                inputSchema=_object_schema(
                    {
                        "connection_type": _prop(
                            "string",
                            "Connection type: 'serial' for USB, 'tcp' for network, or 'bluetooth' for BLE",
                            enum=["serial", "tcp", "bluetooth"],
                            default="serial",
                        ),
                        "device": _prop(
                            "string",
                            "Device path (e.g., /dev/ttyUSB0), hostname for TCP, or BLE MAC address",
                        ),
                    },
                    required=[],
                ),
            ),
            Tool(
                name="scan_bluetooth",
                description="Scan for nearby Meshtastic devices via Bluetooth (useful before connecting). Returns devices that are properly advertising the Meshtastic service.",
                inputSchema=_object_schema(),
            ),
            # Messaging tools
            Tool(
                name="send_message",
                description="Send a text message to a node or channel on the mesh network",
                inputSchema=_object_schema(
                    {
                        "text": _prop("string", "The message text to send"),
                        "destination": _prop(
                            "string",
                            "Node ID or channel name (use '^all' for broadcast)",
                            default="^all",
                        ),
                        "channel_index": _channel_index_prop(),
                        "want_ack": _prop(
                            "boolean",
                            "Request acknowledgment from recipient",
                            default=False,
                        ),
                    },
                    required=["text"],
                ),
            ),
            Tool(
                name="send_alert",
                description="Send an alert message (higher priority than regular messages, generates special notifications)",
                inputSchema=_object_schema(
                    {
                        "text": _prop("string", "The alert text to send"),
                        "destination": _prop(
                            "string",
                            "Node ID or channel name (use '^all' for broadcast)",
                            default="^all",
                        ),
                        "channel_index": _channel_index_prop(),
                    },
                    required=["text"],
                ),
            ),
            Tool(
                name="get_messages",
                description="Get recent messages received from the mesh network",
                inputSchema=_object_schema(
                    {
                        "limit": _prop(
                            "integer",
                            "Maximum number of recent messages to return",
                            default=10,
                            minimum=0,
                        ),
                    }
                ),
            ),
            # Network discovery tools
            Tool(
                name="get_nodes",
                description="Get information about all nodes in the mesh network",
                inputSchema=_object_schema(),
            ),
            Tool(
                name="get_node_info",
                description="Get detailed information about a specific node",
                inputSchema=_object_schema(
                    {"node_id": _prop("string", "The node ID to query")},
                    required=["node_id"],
                ),
            ),
            Tool(
                name="get_local_node",
                description="Get information about the locally connected Meshtastic device",
                inputSchema=_object_schema(),
            ),
            Tool(
                name="show_network_info",
                description="Get formatted, human-readable information about the mesh network and all nodes",
                inputSchema=_object_schema(),
            ),
            # Traceroute tool
            Tool(
                name="send_traceroute",
                description="Send a traceroute request to discover the path to a destination node. Returns the route, hop count, and SNR at each hop.",
                inputSchema=_object_schema(
                    {
                        "destination": _prop("string", "Node ID or node number of the destination"),
                        "hop_limit": _prop(
                            "integer",
                            "Maximum number of hops (default: 7)",
                            default=7,
                            minimum=0,
                        ),
                        "channel_index": _channel_index_prop(),
                    },
                    required=["destination"],
                ),
            ),
            Tool(
                name="get_traceroute_results",
                description="Get the results of recent traceroute requests",
                inputSchema=_object_schema(),
            ),
            # Telemetry tools
            Tool(
                name="send_telemetry_request",
                description="Request telemetry data from a node (device metrics, environment sensors, power stats)",
                inputSchema=_object_schema(
                    {
                        "destination": _prop(
                            "string",
                            "Node ID (use '^all' for broadcast)",
                            default="^all",
                        ),
                        "telemetry_type": _prop(
                            "string",
                            "Type of telemetry to request",
                            enum=["device_metrics", "environment_metrics", "power_metrics", "air_quality_metrics"],
                            default="device_metrics",
                        ),
                        "channel_index": _channel_index_prop(),
                    }
                ),
            ),
            Tool(
                name="get_telemetry_results",
                description="Get recent telemetry data received from nodes",
                inputSchema=_object_schema(),
            ),
            # Position tools
            Tool(
                name="send_position",
                description="Send GPS position to the mesh network",
                inputSchema=_object_schema(
                    {
                        "latitude": _prop("number", "Latitude in degrees"),
                        "longitude": _prop("number", "Longitude in degrees"),
                        "altitude": _prop("number", "Altitude in meters (optional)", default=0),
                        "destination": _prop(
                            "string",
                            "Node ID (use '^all' for broadcast)",
                            default="^all",
                        ),
                    },
                    required=["latitude", "longitude"],
                ),
            ),
            # Waypoint tools
            Tool(
                name="send_waypoint",
                description="Send a waypoint (point of interest) to the mesh network",
                inputSchema=_object_schema(
                    {
                        "name": _prop("string", "Name of the waypoint"),
                        "description": _prop("string", "Description of the waypoint"),
                        "latitude": _prop("number", "Latitude in degrees"),
                        "longitude": _prop("number", "Longitude in degrees"),
                        "icon": _prop("integer", "Icon ID for the waypoint (0-63)", default=0),
                        "expire": _prop(
                            "integer",
                            "Expiration time in seconds (0 for never)",
                            default=0,
                        ),
                        "waypoint_id": _prop(
                            "integer",
                            "Optional waypoint ID (auto-generated if not provided)",
                        ),
                        "destination": _prop(
                            "string",
                            "Node ID (use '^all' for broadcast)",
                            default="^all",
                        ),
                    },
                    required=["name", "latitude", "longitude"],
                ),
            ),
            Tool(
                name="delete_waypoint",
                description="Delete a waypoint from the mesh network",
                inputSchema=_object_schema(
                    {
                        "waypoint_id": _prop("integer", "ID of the waypoint to delete"),
                        "destination": _prop(
                            "string",
                            "Node ID (use '^all' for broadcast)",
                            default="^all",
                        ),
                    },
                    required=["waypoint_id"],
                ),
            ),
            # Channel tools
            Tool(
                name="get_channels",
                description="Get list of configured channels on the device",
                inputSchema=_object_schema(),
            ),
            # Device management tools
            Tool(
                name="get_device_metadata",
                description="Get device hardware metadata (firmware version, hardware model, etc.)",
                inputSchema=_object_schema(),
            ),
            Tool(
                name="set_owner",
                description="Set the device owner name (long name and short name)",
                inputSchema=_object_schema(
                    {
                        "long_name": _prop("string", "Long name for the device (e.g., 'John's Radio')"),
                        "short_name": _prop("string", "Short name for the device (e.g., 'JN', max 4 chars)"),
                        "is_licensed": _prop(
                            "boolean",
                            "Whether the operator is licensed (ham radio)",
                            default=False,
                        ),
                    }
                ),
            ),
            Tool(
                name="reboot_node",
                description="Reboot the connected Meshtastic device",
                inputSchema=_object_schema(
                    {
                        "seconds": _prop(
                            "integer",
                            "Delay in seconds before reboot (default: 10)",
                            default=10,
                            minimum=0,
                        ),
                    }
                ),
            ),
            Tool(
                name="shutdown_node",
                description="Shutdown the connected Meshtastic device",
                inputSchema=_object_schema(
                    {
                        "seconds": _prop(
                            "integer",
                            "Delay in seconds before shutdown (default: 10)",
                            default=10,
                            minimum=0,
                        ),
                    }
                ),
            ),
        ]

    @server.call_tool()
    async def call_tool(name: str, arguments: Any) -> list[TextContent]:
        """Handle tool calls for Meshtastic operations.

        Args:
            name: Tool name as advertised by ``list_tools``.
            arguments: Tool arguments; ``None`` is treated as no arguments.

        Returns:
            A single-item list with the textual result. Failures are reported
            as an ``"Error: ..."`` text rather than raised.
        """
        try:
            # A client may omit the arguments object entirely.
            arguments = arguments or {}

            # Connection tools
            if name == "connect":
                connection_type = arguments.get("connection_type", "serial")
                device = arguments.get("device")
                iface = get_interface(connection_type, device)
                # NOTE(review): the meshtastic library is understood to deliver
                # packets through its pubsub topic "meshtastic.receive" rather
                # than an ``onReceive`` attribute - confirm that this hook is
                # actually invoked; if not, subscribe on_receive via pubsub.
                iface.onReceive = lambda packet, interface=iface: on_receive(packet, interface)
                return reply(f"Successfully connected to Meshtastic device via {connection_type}")

            elif name == "scan_bluetooth":
                logger.info("Scanning for Meshtastic Bluetooth devices...")
                devices = meshtastic.ble_interface.BLEInterface.scan()
                if not devices:
                    return reply(
                        "No Meshtastic devices found in Bluetooth scan.\nMake sure your device has Bluetooth enabled and is advertising."
                    )
                meshtastic_devices = [
                    {'name': found.name or "Unknown", 'address': found.address}
                    for found in devices
                ]
                return reply(
                    f"Found {len(meshtastic_devices)} Meshtastic device(s):\n{json.dumps(meshtastic_devices, indent=2)}\n\nUse the address to connect via Bluetooth."
                )

            # Messaging tools
            elif name == "send_message":
                iface = get_interface()
                text = arguments["text"]
                destination = arguments.get("destination", "^all")
                channel_index = _int_arg(arguments, "channel_index", 0)
                want_ack = arguments.get("want_ack", False)
                iface.sendText(
                    text=text,
                    destinationId=destination,
                    channelIndex=channel_index,
                    wantAck=want_ack,
                )
                return reply(f"Message sent to {destination} on channel {channel_index}: {text}")

            elif name == "send_alert":
                iface = get_interface()
                text = arguments["text"]
                destination = arguments.get("destination", "^all")
                channel_index = _int_arg(arguments, "channel_index", 0)
                iface.sendAlert(
                    text=text,
                    destinationId=destination,
                    channelIndex=channel_index,
                )
                return reply(f"Alert sent to {destination}: {text}")

            elif name == "get_messages":
                limit = max(_int_arg(arguments, "limit", 10), 0)
                # ``history[-0:]`` is the whole list, so zero needs its own case.
                recent_messages = message_history[-limit:] if limit else []
                if not recent_messages:
                    return reply("No messages received yet")
                return reply(f"Recent messages:\n{json.dumps(recent_messages, indent=2)}")

            # Network discovery tools
            elif name == "get_nodes":
                iface = get_interface()
                # The node table may not be populated yet.
                nodes = iface.nodes or {}
                nodes_info = [
                    {
                        'id': node_id,
                        'user': node.get('user', {}),
                        'position': node.get('position', {}),
                        'snr': node.get('snr'),
                        'lastHeard': node.get('lastHeard'),
                    }
                    for node_id, node in nodes.items()
                ]
                return reply(
                    f"Mesh nodes ({len(nodes_info)} total):\n{json.dumps(nodes_info, indent=2, default=str)}"
                )

            elif name == "get_node_info":
                iface = get_interface()
                node_id = arguments["node_id"]
                nodes = iface.nodes or {}
                if node_id in nodes:
                    return reply(f"Node info:\n{json.dumps(nodes[node_id], indent=2, default=str)}")
                return reply(f"Node {node_id} not found in network")

            elif name == "get_local_node":
                iface = get_interface()
                local_node = iface.getMyNodeInfo()
                return reply(f"Local node info:\n{json.dumps(local_node, indent=2, default=str)}")

            elif name == "show_network_info":
                iface = get_interface()
                # showNodes is understood to print the table as well as return
                # it; keep the printed copy off the stdio transport.
                info, printed = _call_capturing_stdout(iface.showNodes)
                if info is None:
                    info = printed
                return reply(f"Network information:\n{info}")

            # Traceroute tool
            elif name == "send_traceroute":
                iface = get_interface()
                destination = arguments["destination"]
                # The schema allows a plain node number; pass it on as an int.
                if isinstance(destination, str) and destination.isdigit():
                    destination = int(destination)
                hop_limit = _int_arg(arguments, "hop_limit", 7)
                channel_index = _int_arg(arguments, "channel_index", 0)
                # Record the response ourselves but still run the library's
                # handler: sendTraceRoute is understood to block until that
                # handler marks the response as received.
                _chain_response_handler(iface, "onResponseTraceRoute", on_traceroute_response)
                _call_capturing_stdout(
                    iface.sendTraceRoute,
                    dest=destination,
                    hopLimit=hop_limit,
                    channelIndex=channel_index,
                )
                return reply(
                    f"Traceroute request sent to {destination} with hop limit {hop_limit}\nUse 'get_traceroute_results' to see the response."
                )

            elif name == "get_traceroute_results":
                if not traceroute_results:
                    return reply("No traceroute results available yet")
                return reply(f"Traceroute results:\n{json.dumps(traceroute_results, indent=2)}")

            # Telemetry tools
            elif name == "send_telemetry_request":
                iface = get_interface()
                destination = arguments.get("destination", "^all")
                telemetry_type = arguments.get("telemetry_type", "device_metrics")
                channel_index = _int_arg(arguments, "channel_index", 0)
                # Same chaining as for traceroute: keep the library handler so
                # its wait-for-response bookkeeping still runs.
                _chain_response_handler(iface, "onResponseTelemetry", on_telemetry_response)
                _call_capturing_stdout(
                    iface.sendTelemetry,
                    destinationId=destination,
                    wantResponse=True,
                    channelIndex=channel_index,
                    telemetryType=telemetry_type,
                )
                return reply(
                    f"Telemetry request ({telemetry_type}) sent to {destination}\nUse 'get_telemetry_results' to see responses."
                )

            elif name == "get_telemetry_results":
                if not telemetry_results:
                    return reply("No telemetry data available yet")
                return reply(f"Telemetry data:\n{json.dumps(telemetry_results, indent=2, default=str)}")

            # Position tools
            elif name == "send_position":
                iface = get_interface()
                lat = arguments["latitude"]
                lon = arguments["longitude"]
                alt = arguments.get("altitude", 0)
                destination = arguments.get("destination", "^all")
                iface.sendPosition(
                    latitude=lat,
                    longitude=lon,
                    altitude=alt,
                    destinationId=destination,
                )
                return reply(f"Position sent to {destination}: {lat}, {lon}, {alt}m")

            # Waypoint tools
            elif name == "send_waypoint":
                iface = get_interface()
                name_val = arguments["name"]
                description = arguments.get("description", "")
                lat = arguments["latitude"]
                lon = arguments["longitude"]
                icon = _int_arg(arguments, "icon", 0)
                expire = _int_arg(arguments, "expire", 0)
                # Stays None when omitted so the library can generate an ID.
                waypoint_id = _int_arg(arguments, "waypoint_id")
                destination = arguments.get("destination", "^all")
                iface.sendWaypoint(
                    name=name_val,
                    description=description,
                    latitude=lat,
                    longitude=lon,
                    icon=icon,
                    expire=expire,
                    waypoint_id=waypoint_id,
                    destinationId=destination,
                )
                return reply(f"Waypoint '{name_val}' sent to {destination} at ({lat}, {lon})")

            elif name == "delete_waypoint":
                iface = get_interface()
                waypoint_id = int(arguments["waypoint_id"])
                destination = arguments.get("destination", "^all")
                iface.deleteWaypoint(
                    waypoint_id=waypoint_id,
                    destinationId=destination,
                )
                return reply(f"Waypoint {waypoint_id} deletion request sent to {destination}")

            # Channel tools
            elif name == "get_channels":
                from meshtastic.protobuf import channel_pb2

                iface = get_interface()
                role_enum = channel_pb2.Channel.Role
                channels = iface.localNode.channels or []
                # Select by role rather than by name: a channel using the
                # default name has an empty ``settings.name`` and was
                # previously dropped from the listing.
                channel_info = [
                    {
                        'index': i,
                        'name': channel.settings.name,
                        'role': role_enum.Name(channel.role),
                    }
                    for i, channel in enumerate(channels)
                    if channel.role != role_enum.DISABLED
                ]
                return reply(f"Channels:\n{json.dumps(channel_info, indent=2)}")

            # Device management tools
            elif name == "get_device_metadata":
                iface = get_interface()
                # Use the local node object directly; asking getNode() for the
                # local node *number* builds a remote-style node instead.
                metadata, printed = _call_capturing_stdout(iface.localNode.getMetadata)
                if metadata is not None:
                    body = json.dumps(metadata, indent=2, default=str)
                else:
                    # NOTE(review): getMetadata is understood to print the
                    # response instead of returning it - confirm for the
                    # installed library version.
                    body = printed.strip() or "null"
                return reply(f"Device metadata:\n{body}")

            elif name == "set_owner":
                iface = get_interface()
                long_name = arguments.get("long_name")
                short_name = arguments.get("short_name")
                is_licensed = arguments.get("is_licensed", False)
                iface.localNode.setOwner(
                    long_name=long_name,
                    short_name=short_name,
                    is_licensed=is_licensed,
                )
                return reply(f"Owner set: {long_name} ({short_name})")

            elif name == "reboot_node":
                iface = get_interface()
                seconds = _int_arg(arguments, "seconds", 10)
                iface.localNode.reboot(secs=seconds)
                return reply(f"Reboot command sent. Device will reboot in {seconds} seconds.")

            elif name == "shutdown_node":
                iface = get_interface()
                seconds = _int_arg(arguments, "seconds", 10)
                iface.localNode.shutdown(secs=seconds)
                return reply(f"Shutdown command sent. Device will shutdown in {seconds} seconds.")

            else:
                return reply(f"Unknown tool: {name}")

        except Exception as e:
            # Top-level boundary: report the failure to the client and keep
            # the traceback in the server log.
            logger.exception(f"Error executing tool {name}: {e}")
            return reply(f"Error: {str(e)}")

    # Run the server
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options(),
        )
# Script entry point: start the server only when this file is run directly.
if __name__ == "__main__":
    asyncio.run(main())