Skip to main content
Glama
server2.py‒33.6 kB
#!/usr/bin/env python3
"""
Enhanced Meshtastic MCP Server

Provides comprehensive tools for interacting with Meshtastic mesh networks
via the Model Context Protocol.
"""

import asyncio
import contextlib
import io
import json
import logging
from typing import Any, Optional
from datetime import datetime

import meshtastic
import meshtastic.serial_interface
import meshtastic.tcp_interface
import meshtastic.ble_interface
from meshtastic.protobuf import portnums_pb2
from mcp.server import Server
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    LoggingLevel
)
from mcp.server.stdio import stdio_server

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("meshtastic-mcp")

# Global interface - will be initialized on first use
interface: Optional[Any] = None
message_history = []
traceroute_results = {}
telemetry_results = {}

# Upper bound on retained messages so a long-running server cannot grow
# message_history without limit; the oldest entries are dropped first.
MAX_MESSAGE_HISTORY = 1000


def get_interface(connection_type: str = "serial", device: Optional[str] = None):
    """Get or create the Meshtastic interface.

    The interface is created once and cached in the module-level
    ``interface`` global; when one already exists it is returned as-is and
    both arguments are ignored.

    Args:
        connection_type: "serial", "tcp", "bluetooth" or "ble".
        device: Serial device path, TCP hostname (falls back to
            "meshtastic.local" when empty) or BLE address.

    Raises:
        ValueError: If ``connection_type`` is not one of the known values.
    """
    global interface
    if interface is None:
        logger.info(f"Initializing Meshtastic interface: {connection_type}")
        if connection_type == "serial":
            interface = meshtastic.serial_interface.SerialInterface(devPath=device)
        elif connection_type == "tcp":
            interface = meshtastic.tcp_interface.TCPInterface(
                hostname=device or "meshtastic.local"
            )
        elif connection_type in ["bluetooth", "ble"]:
            interface = meshtastic.ble_interface.BLEInterface(address=device)
        else:
            raise ValueError(f"Unknown connection type: {connection_type}")
    return interface


def on_receive(packet, interface):
    """Callback for received messages.

    Appends a summary of every packet that has a truthy 'decoded' entry to
    ``message_history`` and trims the history to ``MAX_MESSAGE_HISTORY``.
    Errors are logged rather than raised.
    """
    try:
        decoded = packet.get('decoded')
        if not decoded:
            return
        message_history.append({
            'from': packet.get('fromId', 'unknown'),
            'to': packet.get('toId', 'broadcast'),
            'text': decoded.get('text', ''),
            'time': datetime.now().isoformat(),
            'portnum': decoded.get('portnum', 'UNKNOWN')
        })
        # Drop the oldest entries once the cap is exceeded.
        if len(message_history) > MAX_MESSAGE_HISTORY:
            del message_history[:-MAX_MESSAGE_HISTORY]
        logger.info(f"Received message from {packet.get('fromId')}: {decoded.get('text')}")
    except Exception as e:
        logger.error(f"Error processing received packet: {e}")


def on_traceroute_response(packet):
    """Callback for traceroute responses.

    Stores the route and SNR lists found under decoded.routeDiscovery in
    ``traceroute_results``, keyed by the packet's 'fromId'.
    """
    try:
        route = packet.get('decoded', {}).get('routeDiscovery', {})
        node_id = packet.get('fromId', 'unknown')
        traceroute_results[node_id] = {
            'route': route.get('route', []),
            'time': datetime.now().isoformat(),
            'snr_towards': route.get('snrTowards', []),
            'snr_back': route.get('snrBack', [])
        }
        logger.info(f"Received traceroute response from {node_id}")
    except Exception as e:
        logger.error(f"Error processing traceroute response: {e}")


def on_telemetry_response(packet):
    """Callback for telemetry responses.

    Stores decoded.telemetry in ``telemetry_results``, keyed by the packet's
    'fromId'; a later response from the same node replaces the earlier one.
    """
    try:
        telemetry = packet.get('decoded', {}).get('telemetry', {})
        node_id = packet.get('fromId', 'unknown')
        telemetry_results[node_id] = {
            'data': telemetry,
            'time': datetime.now().isoformat()
        }
        logger.info(f"Received telemetry from {node_id}")
    except Exception as e:
        logger.error(f"Error processing telemetry: {e}")


def _text(message: str) -> list[TextContent]:
    """Wrap *message* in the single-item TextContent list every tool returns."""
    return [TextContent(type="text", text=message)]


def _as_int(value: Any, default: Optional[int] = None) -> Optional[int]:
    """Coerce a JSON "number" argument to int, or return *default* for None.

    The tool schemas declare integer-valued arguments as "number", so a
    client may legitimately send 0.0 instead of 0.
    """
    if value is None:
        return default
    return int(value)


def _recent_messages(limit: Any = 10) -> list:
    """Return the newest *limit* entries of message_history (oldest first).

    A zero or negative limit yields an empty list; a plain ``[-limit:]``
    slice would return the whole history for 0.
    """
    count = _as_int(limit, 10)
    if count <= 0:
        return []
    return message_history[-count:]


def _call_capturing_stdout(func, *args, **kwargs):
    """Call *func* while diverting anything printed to stdout.

    This server speaks MCP over stdio (see stdio_server in main), so stray
    prints from library code must not reach stdout.

    Returns:
        A ``(result, printed_text)`` tuple.
    """
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        result = func(*args, **kwargs)
    return result, buffer.getvalue()


def _empty_schema() -> dict:
    """Return the input schema shared by tools that take no arguments."""
    return {"type": "object", "properties": {}}


def _channel_index_property() -> dict:
    """Return the schema fragment shared by every 'channel_index' argument."""
    return {
        "type": "number",
        "description": "Channel index (0-7, default: 0)",
        "default": 0
    }


async def main():
    """Main entry point for the MCP server.

    Registers the tool listing and tool-call handlers, then serves MCP over
    stdio until the streams close.
    """
    server = Server("meshtastic-mcp-enhanced")

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available Meshtastic tools."""
        return [
            # Connection tools
            Tool(
                name="connect",
                description="Connect to a Meshtastic device via serial, TCP/IP, or Bluetooth",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "connection_type": {
                            "type": "string",
                            "enum": ["serial", "tcp", "bluetooth"],
                            "description": "Connection type: 'serial' for USB, 'tcp' for network, or 'bluetooth' for BLE",
                            "default": "serial"
                        },
                        "device": {
                            "type": "string",
                            "description": "Device path (e.g., /dev/ttyUSB0), hostname for TCP, or BLE MAC address",
                        }
                    },
                    "required": []
                }
            ),
            Tool(
                name="scan_bluetooth",
                description="Scan for nearby Meshtastic devices via Bluetooth (useful before connecting). Returns devices that are properly advertising the Meshtastic service.",
                inputSchema=_empty_schema()
            ),
            # Messaging tools
            Tool(
                name="send_message",
                description="Send a text message to a node or channel on the mesh network",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "The message text to send"
                        },
                        "destination": {
                            "type": "string",
                            "description": "Node ID or channel name (use '^all' for broadcast)",
                            "default": "^all"
                        },
                        "channel_index": _channel_index_property(),
                        "want_ack": {
                            "type": "boolean",
                            "description": "Request acknowledgment from recipient",
                            "default": False
                        }
                    },
                    "required": ["text"]
                }
            ),
            Tool(
                name="send_alert",
                description="Send an alert message (higher priority than regular messages, generates special notifications)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {
                            "type": "string",
                            "description": "The alert text to send"
                        },
                        "destination": {
                            "type": "string",
                            "description": "Node ID or channel name (use '^all' for broadcast)",
                            "default": "^all"
                        },
                        "channel_index": _channel_index_property()
                    },
                    "required": ["text"]
                }
            ),
            Tool(
                name="get_messages",
                description="Get recent messages received from the mesh network",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "limit": {
                            "type": "number",
                            "description": "Maximum number of recent messages to return",
                            "default": 10
                        }
                    }
                }
            ),
            # Network discovery tools
            Tool(
                name="get_nodes",
                description="Get information about all nodes in the mesh network",
                inputSchema=_empty_schema()
            ),
            Tool(
                name="get_node_info",
                description="Get detailed information about a specific node",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "node_id": {
                            "type": "string",
                            "description": "The node ID to query"
                        }
                    },
                    "required": ["node_id"]
                }
            ),
            Tool(
                name="get_local_node",
                description="Get information about the locally connected Meshtastic device",
                inputSchema=_empty_schema()
            ),
            Tool(
                name="show_network_info",
                description="Get formatted, human-readable information about the mesh network and all nodes",
                inputSchema=_empty_schema()
            ),
            # Traceroute tool
            Tool(
                name="send_traceroute",
                description="Send a traceroute request to discover the path to a destination node. Returns the route, hop count, and SNR at each hop.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "destination": {
                            "type": "string",
                            "description": "Node ID or node number of the destination"
                        },
                        "hop_limit": {
                            "type": "number",
                            "description": "Maximum number of hops (default: 7)",
                            "default": 7
                        },
                        "channel_index": _channel_index_property()
                    },
                    "required": ["destination"]
                }
            ),
            Tool(
                name="get_traceroute_results",
                description="Get the results of recent traceroute requests",
                inputSchema=_empty_schema()
            ),
            # Telemetry tools
            Tool(
                name="send_telemetry_request",
                description="Request telemetry data from a node (device metrics, environment sensors, power stats)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "destination": {
                            "type": "string",
                            "description": "Node ID (use '^all' for broadcast)",
                            "default": "^all"
                        },
                        "telemetry_type": {
                            "type": "string",
                            "enum": ["device_metrics", "environment_metrics", "power_metrics", "air_quality_metrics"],
                            "description": "Type of telemetry to request",
                            "default": "device_metrics"
                        },
                        "channel_index": _channel_index_property()
                    }
                }
            ),
            Tool(
                name="get_telemetry_results",
                description="Get recent telemetry data received from nodes",
                inputSchema=_empty_schema()
            ),
            # Position tools
            Tool(
                name="send_position",
                description="Send GPS position to the mesh network",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "latitude": {
                            "type": "number",
                            "description": "Latitude in degrees"
                        },
                        "longitude": {
                            "type": "number",
                            "description": "Longitude in degrees"
                        },
                        "altitude": {
                            "type": "number",
                            "description": "Altitude in meters (optional)",
                            "default": 0
                        },
                        "destination": {
                            "type": "string",
                            "description": "Node ID (use '^all' for broadcast)",
                            "default": "^all"
                        }
                    },
                    "required": ["latitude", "longitude"]
                }
            ),
            # Waypoint tools
            Tool(
                name="send_waypoint",
                description="Send a waypoint (point of interest) to the mesh network",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "name": {
                            "type": "string",
                            "description": "Name of the waypoint"
                        },
                        "description": {
                            "type": "string",
                            "description": "Description of the waypoint"
                        },
                        "latitude": {
                            "type": "number",
                            "description": "Latitude in degrees"
                        },
                        "longitude": {
                            "type": "number",
                            "description": "Longitude in degrees"
                        },
                        "icon": {
                            "type": "number",
                            "description": "Icon ID for the waypoint (0-63)",
                            "default": 0
                        },
                        "expire": {
                            "type": "number",
                            "description": "Expiration time in seconds (0 for never)",
                            "default": 0
                        },
                        "waypoint_id": {
                            "type": "number",
                            "description": "Optional waypoint ID (auto-generated if not provided)"
                        },
                        "destination": {
                            "type": "string",
                            "description": "Node ID (use '^all' for broadcast)",
                            "default": "^all"
                        }
                    },
                    "required": ["name", "latitude", "longitude"]
                }
            ),
            Tool(
                name="delete_waypoint",
                description="Delete a waypoint from the mesh network",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "waypoint_id": {
                            "type": "number",
                            "description": "ID of the waypoint to delete"
                        },
                        "destination": {
                            "type": "string",
                            "description": "Node ID (use '^all' for broadcast)",
                            "default": "^all"
                        }
                    },
                    "required": ["waypoint_id"]
                }
            ),
            # Channel tools
            Tool(
                name="get_channels",
                description="Get list of configured channels on the device",
                inputSchema=_empty_schema()
            ),
            # Device management tools
            Tool(
                name="get_device_metadata",
                description="Get device hardware metadata (firmware version, hardware model, etc.)",
                inputSchema=_empty_schema()
            ),
            Tool(
                name="set_owner",
                description="Set the device owner name (long name and short name)",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "long_name": {
                            "type": "string",
                            "description": "Long name for the device (e.g., 'John's Radio')"
                        },
                        "short_name": {
                            "type": "string",
                            "description": "Short name for the device (e.g., 'JN', max 4 chars)"
                        },
                        "is_licensed": {
                            "type": "boolean",
                            "description": "Whether the operator is licensed (ham radio)",
                            "default": False
                        }
                    }
                }
            ),
            Tool(
                name="reboot_node",
                description="Reboot the connected Meshtastic device",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "seconds": {
                            "type": "number",
                            "description": "Delay in seconds before reboot (default: 10)",
                            "default": 10
                        }
                    }
                }
            ),
            Tool(
                name="shutdown_node",
                description="Shutdown the connected Meshtastic device",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "seconds": {
                            "type": "number",
                            "description": "Delay in seconds before shutdown (default: 10)",
                            "default": 10
                        }
                    }
                }
            )
        ]

    @server.call_tool()
    async def call_tool(name: str, arguments: Any) -> list[TextContent]:
        """Handle tool calls for Meshtastic operations.

        Any exception raised while executing a tool is logged and reported to
        the caller as an "Error: ..." text result instead of propagating.
        """
        # Tolerate a missing arguments object for tools that take none.
        arguments = arguments or {}
        try:
            # Connection tools
            if name == "connect":
                connection_type = arguments.get("connection_type", "serial")
                device = arguments.get("device")
                iface = get_interface(connection_type, device)
                # Set up callbacks
                # NOTE(review): the Meshtastic library appears to deliver
                # packets via pubsub ("meshtastic.receive") rather than an
                # onReceive attribute, so this hook may never fire - confirm.
                iface.onReceive = lambda packet, interface=iface: on_receive(packet, interface)
                return _text(f"Successfully connected to Meshtastic device via {connection_type}")

            elif name == "scan_bluetooth":
                logger.info("Scanning for Meshtastic Bluetooth devices...")
                devices = meshtastic.ble_interface.BLEInterface.scan()
                if not devices:
                    return _text(
                        "No Meshtastic devices found in Bluetooth scan.\nMake sure your device has Bluetooth enabled and is advertising."
                    )
                meshtastic_devices = [
                    {'name': found.name or "Unknown", 'address': found.address}
                    for found in devices
                ]
                return _text(
                    f"Found {len(meshtastic_devices)} Meshtastic device(s):\n{json.dumps(meshtastic_devices, indent=2)}\n\nUse the address to connect via Bluetooth."
                )

            # Messaging tools
            elif name == "send_message":
                iface = get_interface()
                text = arguments["text"]
                destination = arguments.get("destination", "^all")
                channel_index = _as_int(arguments.get("channel_index"), 0)
                want_ack = arguments.get("want_ack", False)
                iface.sendText(
                    text=text,
                    destinationId=destination,
                    channelIndex=channel_index,
                    wantAck=want_ack
                )
                return _text(f"Message sent to {destination} on channel {channel_index}: {text}")

            elif name == "send_alert":
                iface = get_interface()
                text = arguments["text"]
                destination = arguments.get("destination", "^all")
                channel_index = _as_int(arguments.get("channel_index"), 0)
                iface.sendAlert(
                    text=text,
                    destinationId=destination,
                    channelIndex=channel_index
                )
                return _text(f"Alert sent to {destination}: {text}")

            elif name == "get_messages":
                recent_messages = _recent_messages(arguments.get("limit", 10))
                if not recent_messages:
                    return _text("No messages received yet")
                return _text(
                    f"Recent messages:\n{json.dumps(recent_messages, indent=2, default=str)}"
                )

            # Network discovery tools
            elif name == "get_nodes":
                iface = get_interface()
                # iface.nodes may be unset before the node database has loaded.
                nodes = iface.nodes or {}
                nodes_info = [
                    {
                        'id': node_id,
                        'user': node.get('user', {}),
                        'position': node.get('position', {}),
                        'snr': node.get('snr'),
                        'lastHeard': node.get('lastHeard'),
                    }
                    for node_id, node in nodes.items()
                ]
                return _text(
                    f"Mesh nodes ({len(nodes_info)} total):\n{json.dumps(nodes_info, indent=2, default=str)}"
                )

            elif name == "get_node_info":
                iface = get_interface()
                node_id = arguments["node_id"]
                nodes = iface.nodes or {}
                if node_id not in nodes:
                    return _text(f"Node {node_id} not found in network")
                return _text(
                    f"Node info:\n{json.dumps(nodes[node_id], indent=2, default=str)}"
                )

            elif name == "get_local_node":
                iface = get_interface()
                local_node = iface.getMyNodeInfo()
                return _text(
                    f"Local node info:\n{json.dumps(local_node, indent=2, default=str)}"
                )

            elif name == "show_network_info":
                iface = get_interface()
                # showNodes() prints its table as well as returning it; keep
                # that output off stdout, which carries the MCP protocol.
                info, printed = _call_capturing_stdout(iface.showNodes)
                if info is None:
                    info = printed
                return _text(f"Network information:\n{info}")

            # Traceroute tool
            elif name == "send_traceroute":
                iface = get_interface()
                destination = arguments["destination"]
                hop_limit = _as_int(arguments.get("hop_limit"), 7)
                channel_index = _as_int(arguments.get("channel_index"), 0)
                # Set up traceroute callback
                iface.onResponseTraceRoute = on_traceroute_response
                iface.sendTraceRoute(
                    dest=destination,
                    hopLimit=hop_limit,
                    channelIndex=channel_index
                )
                return _text(
                    f"Traceroute request sent to {destination} with hop limit {hop_limit}\nUse 'get_traceroute_results' to see the response."
                )

            elif name == "get_traceroute_results":
                if not traceroute_results:
                    return _text("No traceroute results available yet")
                return _text(
                    f"Traceroute results:\n{json.dumps(traceroute_results, indent=2, default=str)}"
                )

            # Telemetry tools
            elif name == "send_telemetry_request":
                iface = get_interface()
                destination = arguments.get("destination", "^all")
                telemetry_type = arguments.get("telemetry_type", "device_metrics")
                channel_index = _as_int(arguments.get("channel_index"), 0)
                # Set up telemetry callback
                iface.onResponseTelemetry = on_telemetry_response
                iface.sendTelemetry(
                    destinationId=destination,
                    wantResponse=True,
                    channelIndex=channel_index,
                    telemetryType=telemetry_type
                )
                return _text(
                    f"Telemetry request ({telemetry_type}) sent to {destination}\nUse 'get_telemetry_results' to see responses."
                )

            elif name == "get_telemetry_results":
                if not telemetry_results:
                    return _text("No telemetry data available yet")
                return _text(
                    f"Telemetry data:\n{json.dumps(telemetry_results, indent=2, default=str)}"
                )

            # Position tools
            elif name == "send_position":
                iface = get_interface()
                lat = arguments["latitude"]
                lon = arguments["longitude"]
                alt = arguments.get("altitude", 0)
                destination = arguments.get("destination", "^all")
                iface.sendPosition(
                    latitude=lat,
                    longitude=lon,
                    altitude=alt,
                    destinationId=destination
                )
                return _text(f"Position sent to {destination}: {lat}, {lon}, {alt}m")

            # Waypoint tools
            elif name == "send_waypoint":
                iface = get_interface()
                name_val = arguments["name"]
                description = arguments.get("description", "")
                lat = arguments["latitude"]
                lon = arguments["longitude"]
                icon = _as_int(arguments.get("icon"), 0)
                expire = _as_int(arguments.get("expire"), 0)
                # Stays None when omitted so the library can pick an ID.
                waypoint_id = _as_int(arguments.get("waypoint_id"))
                destination = arguments.get("destination", "^all")
                iface.sendWaypoint(
                    name=name_val,
                    description=description,
                    latitude=lat,
                    longitude=lon,
                    icon=icon,
                    expire=expire,
                    waypoint_id=waypoint_id,
                    destinationId=destination
                )
                return _text(f"Waypoint '{name_val}' sent to {destination} at ({lat}, {lon})")

            elif name == "delete_waypoint":
                iface = get_interface()
                waypoint_id = _as_int(arguments["waypoint_id"])
                destination = arguments.get("destination", "^all")
                iface.deleteWaypoint(
                    waypoint_id=waypoint_id,
                    destinationId=destination
                )
                return _text(f"Waypoint {waypoint_id} deletion request sent to {destination}")

            # Channel tools
            elif name == "get_channels":
                iface = get_interface()
                channels = iface.localNode.channels or []
                # Only channels with a non-empty name are reported.
                channel_info = [
                    {
                        'index': i,
                        'name': channel.settings.name,
                        'role': str(channel.role)
                    }
                    for i, channel in enumerate(channels)
                    if channel.settings.name
                ]
                return _text(f"Channels:\n{json.dumps(channel_info, indent=2)}")

            # Device management tools
            # These act on iface.localNode directly (as get_channels does)
            # instead of building a second Node via getNode(nodeNum).
            elif name == "get_device_metadata":
                iface = get_interface()
                node = iface.localNode
                metadata, printed = _call_capturing_stdout(node.getMetadata)
                # NOTE(review): getMetadata() appears to print the response
                # rather than return it in some library versions - confirm.
                # Fall back to the captured output when nothing is returned.
                if metadata is None and printed.strip():
                    return _text(f"Device metadata:\n{printed.strip()}")
                return _text(
                    f"Device metadata:\n{json.dumps(metadata, indent=2, default=str)}"
                )

            elif name == "set_owner":
                iface = get_interface()
                long_name = arguments.get("long_name")
                short_name = arguments.get("short_name")
                is_licensed = arguments.get("is_licensed", False)
                iface.localNode.setOwner(
                    long_name=long_name,
                    short_name=short_name,
                    is_licensed=is_licensed
                )
                return _text(f"Owner set: {long_name} ({short_name})")

            elif name == "reboot_node":
                iface = get_interface()
                seconds = _as_int(arguments.get("seconds"), 10)
                iface.localNode.reboot(secs=seconds)
                return _text(
                    f"Reboot command sent. Device will reboot in {seconds} seconds."
                )

            elif name == "shutdown_node":
                iface = get_interface()
                seconds = _as_int(arguments.get("seconds"), 10)
                iface.localNode.shutdown(secs=seconds)
                return _text(
                    f"Shutdown command sent. Device will shutdown in {seconds} seconds."
                )

            else:
                return _text(f"Unknown tool: {name}")

        except Exception as e:
            # logger.exception keeps the traceback in the server log.
            logger.exception(f"Error executing tool {name}: {e}")
            return _text(f"Error: {str(e)}")

    # Run the server
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ConsentirDev/meshtastic.mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.