Skip to main content
Glama
server.py • 14 kB
#!/usr/bin/env python3
"""
Meshtastic MCP Server

Provides tools for interacting with Meshtastic mesh networks via the
Model Context Protocol.
"""

import asyncio
import json
import logging
from typing import Any, Optional
from datetime import datetime

import meshtastic
import meshtastic.serial_interface
import meshtastic.tcp_interface
import meshtastic.ble_interface
from meshtastic.protobuf import portnums_pb2

from mcp.server import Server
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
    LoggingLevel
)
from mcp.server.stdio import stdio_server

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("meshtastic-mcp")

# Global interface - will be initialized on first use
interface: Optional[Any] = None
message_history = []

# Upper bound on retained packets so a long-running server does not grow
# ``message_history`` without limit.
MAX_MESSAGE_HISTORY = 1000

# Number of messages returned by ``get_messages`` when no limit is supplied.
DEFAULT_MESSAGE_LIMIT = 10

# (connection_type, device) the cached ``interface`` was opened with, or None
# when nothing is connected. Lets ``connect`` detect a request for a different
# target instead of silently reusing the old connection.
_interface_target: Optional[tuple] = None


def get_interface(connection_type: str = "serial", device: Optional[str] = None):
    """Get or create the Meshtastic interface.

    The interface is created once and cached in the module-level
    ``interface``; later calls return the cached object and ignore their
    arguments.

    Args:
        connection_type: One of "serial", "tcp", "bluetooth" or "ble".
        device: Passed as ``devPath`` (serial), ``hostname`` (tcp, falling
            back to "meshtastic.local" when empty) or ``address`` (BLE).

    Returns:
        The cached or newly created interface object.

    Raises:
        ValueError: If ``connection_type`` is not a recognised value.
    """
    global interface, _interface_target
    if interface is None:
        logger.info(f"Initializing Meshtastic interface: {connection_type}")
        if connection_type == "serial":
            interface = meshtastic.serial_interface.SerialInterface(devPath=device)
        elif connection_type == "tcp":
            interface = meshtastic.tcp_interface.TCPInterface(
                hostname=device or "meshtastic.local"
            )
        elif connection_type in ["bluetooth", "ble"]:
            interface = meshtastic.ble_interface.BLEInterface(address=device)
        else:
            raise ValueError(f"Unknown connection type: {connection_type}")
        _interface_target = (connection_type, device)
    return interface


def _close_interface() -> None:
    """Close and forget the cached interface, if any (best effort)."""
    global interface, _interface_target
    stale, interface, _interface_target = interface, None, None
    if stale is None:
        return
    try:
        stale.close()
    except Exception:
        # A failed close must not prevent opening the replacement connection.
        logger.exception("Error closing previous Meshtastic interface")


def on_receive(packet, interface):
    """Callback for received messages.

    Appends a summary of every packet that has a truthy ``decoded`` entry to
    ``message_history`` (trimmed to ``MAX_MESSAGE_HISTORY`` entries). Errors
    are logged, never raised, so a malformed packet cannot break the caller.

    Args:
        packet: Packet mapping; ``fromId``, ``toId`` and ``decoded`` are read.
        interface: The interface the packet arrived on (unused here).
    """
    try:
        if packet.get('decoded'):
            message_history.append({
                'from': packet.get('fromId', 'unknown'),
                'to': packet.get('toId', 'broadcast'),
                'text': packet['decoded'].get('text', ''),
                'time': datetime.now().isoformat(),
                'portnum': packet['decoded'].get('portnum', 'UNKNOWN')
            })
            # Drop the oldest entries in place so other references to the
            # list keep seeing the same object.
            if len(message_history) > MAX_MESSAGE_HISTORY:
                del message_history[:-MAX_MESSAGE_HISTORY]
            logger.info(
                f"Received message from {packet.get('fromId')}: "
                f"{packet['decoded'].get('text')}"
            )
    except Exception as e:
        logger.exception(f"Error processing received packet: {e}")


def _text(message: str) -> list[TextContent]:
    """Wrap a string in the single-element text response every tool returns."""
    return [TextContent(type="text", text=message)]


def _coerce_limit(raw: Any, default: int = DEFAULT_MESSAGE_LIMIT) -> int:
    """Convert a client-supplied ``limit`` into a non-negative int.

    The tool schema declares ``limit`` as a JSON "number", so floats such as
    ``10.0`` are legal input; a float cannot be used as a slice index.

    Raises:
        ValueError: If ``raw`` is not a finite number.
    """
    if raw is None:
        return default
    if isinstance(raw, bool):
        raise ValueError(f"Invalid limit: {raw!r}")
    try:
        limit = int(raw)
    except (TypeError, ValueError, OverflowError) as err:
        raise ValueError(f"Invalid limit: {raw!r}") from err
    return max(limit, 0)


def _tool_definitions() -> list[Tool]:
    """Build the static list of tools advertised to MCP clients."""
    return [
        Tool(
            name="connect",
            description="Connect to a Meshtastic device via serial, TCP/IP, or Bluetooth",
            inputSchema={
                "type": "object",
                "properties": {
                    "connection_type": {
                        "type": "string",
                        "enum": ["serial", "tcp", "bluetooth"],
                        "description": "Connection type: 'serial' for USB, 'tcp' for network, or 'bluetooth' for BLE",
                        "default": "serial"
                    },
                    "device": {
                        "type": "string",
                        "description": "Device path (e.g., /dev/ttyUSB0), hostname for TCP, or BLE MAC address",
                    }
                },
                "required": []
            }
        ),
        Tool(
            name="send_message",
            description="Send a text message to a node or channel on the mesh network",
            inputSchema={
                "type": "object",
                "properties": {
                    "text": {
                        "type": "string",
                        "description": "The message text to send"
                    },
                    "destination": {
                        "type": "string",
                        "description": "Node ID or channel name (use '^all' for broadcast)",
                        "default": "^all"
                    }
                },
                "required": ["text"]
            }
        ),
        Tool(
            name="get_messages",
            description="Get recent messages received from the mesh network",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "number",
                        "description": "Maximum number of recent messages to return",
                        "default": 10
                    }
                }
            }
        ),
        Tool(
            name="get_nodes",
            description="Get information about all nodes in the mesh network",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="get_node_info",
            description="Get detailed information about a specific node",
            inputSchema={
                "type": "object",
                "properties": {
                    "node_id": {
                        "type": "string",
                        "description": "The node ID to query"
                    }
                },
                "required": ["node_id"]
            }
        ),
        Tool(
            name="get_local_node",
            description="Get information about the locally connected Meshtastic device",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="send_position",
            description="Send GPS position to the mesh network",
            inputSchema={
                "type": "object",
                "properties": {
                    "latitude": {
                        "type": "number",
                        "description": "Latitude in degrees"
                    },
                    "longitude": {
                        "type": "number",
                        "description": "Longitude in degrees"
                    },
                    "altitude": {
                        "type": "number",
                        "description": "Altitude in meters (optional)"
                    }
                },
                "required": ["latitude", "longitude"]
            }
        ),
        Tool(
            name="get_channels",
            description="Get list of configured channels on the device",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        ),
        Tool(
            name="scan_bluetooth",
            description="Scan for nearby Meshtastic devices via Bluetooth (useful before connecting). Returns devices that are properly advertising the Meshtastic service.",
            inputSchema={
                "type": "object",
                "properties": {}
            }
        )
    ]


def _handle_connect(arguments: dict) -> list[TextContent]:
    """Open (or reuse) the device connection requested by the client."""
    connection_type = arguments.get("connection_type", "serial")
    device = arguments.get("device")

    # Reusing a cached interface for a *different* target would report a
    # successful connection that never happened, so replace it instead.
    if interface is not None and _interface_target != (connection_type, device):
        _close_interface()

    iface = get_interface(connection_type, device)

    # Set up message callback
    # NOTE(review): the meshtastic Python library appears to deliver packets
    # through pubsub (topic "meshtastic.receive") rather than an ``onReceive``
    # attribute, so this assignment may never be invoked - confirm, and
    # subscribe via pubsub if so.
    iface.onReceive = lambda packet, interface=iface: on_receive(packet, interface)

    return _text(f"Successfully connected to Meshtastic device via {connection_type}")


def _handle_send_message(arguments: dict) -> list[TextContent]:
    """Send a text message to ``destination`` (default '^all')."""
    iface = get_interface()
    text = arguments["text"]
    destination = arguments.get("destination", "^all")
    iface.sendText(text=text, destinationId=destination)
    return _text(f"Message sent to {destination}: {text}")


def _handle_get_messages(arguments: dict) -> list[TextContent]:
    """Return up to ``limit`` of the most recently recorded messages."""
    limit = _coerce_limit(arguments.get("limit"))
    if not message_history:
        return _text("No messages received yet")

    # ``[-0:]`` would return the whole list, so a zero limit is special-cased.
    recent_messages = message_history[-limit:] if limit > 0 else []
    messages_text = json.dumps(recent_messages, indent=2)
    return _text(f"Recent messages:\n{messages_text}")


def _handle_get_nodes(arguments: dict) -> list[TextContent]:
    """Summarise every node known to the connected interface."""
    iface = get_interface()
    # Guard against ``nodes`` being None (e.g. not populated yet).
    nodes = iface.nodes or {}
    nodes_info = [
        {
            'id': node_id,
            'user': node.get('user', {}),
            'position': node.get('position', {}),
            'snr': node.get('snr'),
            'lastHeard': node.get('lastHeard'),
        }
        for node_id, node in nodes.items()
    ]
    return _text(f"Mesh nodes:\n{json.dumps(nodes_info, indent=2, default=str)}")


def _handle_get_node_info(arguments: dict) -> list[TextContent]:
    """Return the full stored record for one node ID."""
    iface = get_interface()
    node_id = arguments["node_id"]
    nodes = iface.nodes or {}
    if node_id not in nodes:
        return _text(f"Node {node_id} not found in network")
    return _text(f"Node info:\n{json.dumps(nodes[node_id], indent=2, default=str)}")


def _handle_get_local_node(arguments: dict) -> list[TextContent]:
    """Return the interface's own node record."""
    iface = get_interface()
    local_node = iface.getMyNodeInfo()
    return _text(f"Local node info:\n{json.dumps(local_node, indent=2, default=str)}")


def _handle_send_position(arguments: dict) -> list[TextContent]:
    """Send a position; altitude defaults to 0 when omitted."""
    iface = get_interface()
    lat = arguments["latitude"]
    lon = arguments["longitude"]
    alt = arguments.get("altitude", 0)
    iface.sendPosition(latitude=lat, longitude=lon, altitude=alt)
    return _text(f"Position sent: {lat}, {lon}, {alt}m")


def _handle_get_channels(arguments: dict) -> list[TextContent]:
    """List the channels on the local node that have a non-empty name."""
    iface = get_interface()
    # Guard against ``channels`` being None (e.g. not loaded yet).
    channels = iface.localNode.channels or []
    channel_info = [
        {
            'index': i,
            'name': channel.settings.name,
            'role': str(channel.role)
        }
        for i, channel in enumerate(channels)
        if channel.settings.name
    ]
    return _text(f"Channels:\n{json.dumps(channel_info, indent=2)}")


def _handle_scan_bluetooth(arguments: dict) -> list[TextContent]:
    """Scan for Meshtastic BLE devices and list their names and addresses."""
    logger.info("Scanning for Meshtastic Bluetooth devices...")

    # Use meshtastic's built-in scan which filters by service UUID
    devices = meshtastic.ble_interface.BLEInterface.scan()

    if not devices:
        return _text(
            "No Meshtastic devices found in Bluetooth scan.\n"
            "Make sure your device has Bluetooth enabled and is advertising."
        )

    # Format device info
    meshtastic_devices = [
        {'name': device.name or "Unknown", 'address': device.address}
        for device in devices
    ]
    return _text(
        f"Found {len(meshtastic_devices)} Meshtastic device(s):\n"
        f"{json.dumps(meshtastic_devices, indent=2)}\n\n"
        "Use the address/UUID to connect via Bluetooth with the 'connect' tool."
    )


# Tool name -> handler. Every handler takes the (never-None) arguments dict
# and returns the text response for the client.
_TOOL_HANDLERS = {
    "connect": _handle_connect,
    "send_message": _handle_send_message,
    "get_messages": _handle_get_messages,
    "get_nodes": _handle_get_nodes,
    "get_node_info": _handle_get_node_info,
    "get_local_node": _handle_get_local_node,
    "send_position": _handle_send_position,
    "get_channels": _handle_get_channels,
    "scan_bluetooth": _handle_scan_bluetooth,
}


async def main():
    """Main entry point for the MCP server.

    Registers the tool listing and tool-call handlers on a new ``Server`` and
    serves it over stdio until the streams close.
    """
    server = Server("meshtastic-mcp")

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """List available Meshtastic tools."""
        return _tool_definitions()

    @server.call_tool()
    async def call_tool(name: str, arguments: Any) -> list[TextContent]:
        """Handle tool calls for Meshtastic operations.

        Failures are reported to the client as an "Error: ..." text response
        rather than raised.
        """
        handler = _TOOL_HANDLERS.get(name)
        if handler is None:
            return _text(f"Unknown tool: {name}")
        try:
            # Clients may omit arguments entirely for parameterless tools.
            return handler(arguments or {})
        except Exception as e:
            logger.exception(f"Error executing tool {name}: {e}")
            return _text(f"Error: {str(e)}")

    # Run the server
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ConsentirDev/meshtastic.mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.