Skip to main content
Glama

Nordic Thingy:52 MCP Server

by karthiksuku
mcp_server_nordic_thingy.py14.8 kB
#!/usr/bin/env python3 """ Nordic Thingy:52 MCP Server Model Context Protocol server for controlling Nordic Thingy:52 IoT devices via Bluetooth LE Version: 1.0.0 Author: Community Contributors License: MIT Repository: https://github.com/yourusername/mcp-server-nordic-thingy """ import asyncio import json import logging from typing import Any, Optional from contextlib import asynccontextmanager from mcp.server import Server from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource from pydantic import Field from bleak import BleakClient, BleakScanner from bleak.backends.device import BLEDevice # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("nordic-thingy-mcp") # Nordic Thingy:52 UUIDs THINGY_SERVICE_UUID = "ef680100-9b35-4933-9b10-52ffa9740042" LED_CHAR_UUID = "ef680301-9b35-4933-9b10-52ffa9740042" SOUND_CHAR_UUID = "ef680502-9b35-4933-9b10-52ffa9740042" SPEAKER_CHAR_UUID = "ef680503-9b35-4933-9b10-52ffa9740042" TEMP_CHAR_UUID = "ef680201-9b35-4933-9b10-52ffa9740042" PRESSURE_CHAR_UUID = "ef680202-9b35-4933-9b10-52ffa9740042" HUMIDITY_CHAR_UUID = "ef680203-9b35-4933-9b10-52ffa9740042" GAS_CHAR_UUID = "ef680204-9b35-4933-9b10-52ffa9740042" COLOR_CHAR_UUID = "ef680205-9b35-4933-9b10-52ffa9740042" # Global state current_client: Optional[BleakClient] = None current_device: Optional[BLEDevice] = None # MCP Server instance app = Server("nordic-thingy") async def scan_for_thingys(timeout: float = 5.0) -> list[dict[str, Any]]: """Scan for Nordic Thingy devices""" logger.info(f"Scanning for Thingy devices (timeout: {timeout}s)...") devices = await BleakScanner.discover(timeout=timeout) thingys = [] for device in devices: # Nordic Thingy devices typically have "Thingy" in their name if device.name and "Thingy" in device.name: thingys.append({ "name": device.name, "address": device.address, "rssi": device.rssi if hasattr(device, 'rssi') else None }) logger.info(f"Found {len(thingys)} Thingy device(s)") return thingys async def connect_to_thingy(address: str) -> bool: """Connect to a Thingy device""" global current_client, current_device try: logger.info(f"Connecting to {address}...") # Disconnect if already connected if current_client and current_client.is_connected: await current_client.disconnect() # Connect to new device current_client = BleakClient(address) await current_client.connect() current_device = current_client._device logger.info(f"Connected successfully to {address}") return True except Exception as e: logger.error(f"Connection failed: {e}") current_client = None current_device = None return False async def disconnect_thingy(): """Disconnect from current Thingy""" global current_client, current_device if current_client and current_client.is_connected: await current_client.disconnect() logger.info("Disconnected from Thingy") current_client = None current_device = None @app.list_tools() async def list_tools() -> list[Tool]: """List available tools for Nordic Thingy control""" return [ Tool( name="thingy_scan", description="Scan for nearby Nordic Thingy:52 devices. Returns list of discovered devices with name, address, and signal strength.", inputSchema={ "type": "object", "properties": { "timeout": { "type": "number", "description": "Scan timeout in seconds (default: 5)", "default": 5.0 } } } ), Tool( name="thingy_connect", description="Connect to a Nordic Thingy:52 device by Bluetooth address. Must be called before using control commands.", inputSchema={ "type": "object", "properties": { "address": { "type": "string", "description": "Bluetooth MAC address of the Thingy device (e.g., 'XX:XX:XX:XX:XX:XX')" } }, "required": ["address"] } ), Tool( name="thingy_disconnect", description="Disconnect from the currently connected Nordic Thingy device.", inputSchema={ "type": "object", "properties": {} } ), Tool( name="thingy_set_led", description="Set the RGB LED color on the Thingy. Values range from 0-255 for each color channel.", inputSchema={ "type": "object", "properties": { "r": { "type": "integer", "description": "Red value (0-255)", "minimum": 0, "maximum": 255 }, "g": { "type": "integer", "description": "Green value (0-255)", "minimum": 0, "maximum": 255 }, "b": { "type": "integer", "description": "Blue value (0-255)", "minimum": 0, "maximum": 255 } }, "required": ["r", "g", "b"] } ), Tool( name="thingy_led_off", description="Turn off the Thingy LED (set to black/0,0,0)", inputSchema={ "type": "object", "properties": {} } ), Tool( name="thingy_play_sound", description="Play a pre-programmed sound on the Thingy speaker. Sound IDs: 1-8 (different tones and effects)", inputSchema={ "type": "object", "properties": { "sound_id": { "type": "integer", "description": "Sound ID to play (1-8)", "minimum": 1, "maximum": 8 } }, "required": ["sound_id"] } ), Tool( name="thingy_beep", description="Play a quick beep sound (equivalent to sound ID 1). Quick way to get audio feedback.", inputSchema={ "type": "object", "properties": {} } ), Tool( name="thingy_get_sensors", description="Read all sensor values from the Thingy: temperature, humidity, pressure, air quality (CO2/TVOC), and color sensor. Returns comprehensive environmental data.", inputSchema={ "type": "object", "properties": {} } ), Tool( name="thingy_get_status", description="Get current connection status and device information", inputSchema={ "type": "object", "properties": {} } ) ] @app.call_tool() async def call_tool(name: str, arguments: Any) -> list[TextContent]: """Handle tool calls""" try: if name == "thingy_scan": timeout = arguments.get("timeout", 5.0) devices = await scan_for_thingys(timeout) if not devices: return [TextContent( type="text", text="No Nordic Thingy devices found. Make sure your Thingy is powered on and nearby." )] result = "Found Nordic Thingy devices:\n\n" for i, device in enumerate(devices, 1): result += f"{i}. {device['name']}\n" result += f" Address: {device['address']}\n" if device['rssi']: result += f" Signal: {device['rssi']} dBm\n" result += "\n" return [TextContent(type="text", text=result)] elif name == "thingy_connect": address = arguments["address"] success = await connect_to_thingy(address) if success: return [TextContent( type="text", text=f"✅ Successfully connected to Thingy at {address}\n\nYou can now use control commands like setting LED colors, playing sounds, and reading sensors." )] else: return [TextContent( type="text", text=f"❌ Failed to connect to {address}. Please check the address and try again." )] elif name == "thingy_disconnect": await disconnect_thingy() return [TextContent( type="text", text="Disconnected from Thingy device" )] elif name == "thingy_set_led": if not current_client or not current_client.is_connected: return [TextContent( type="text", text="❌ Not connected to any Thingy device. Use thingy_connect first." )] r = arguments["r"] g = arguments["g"] b = arguments["b"] # LED mode: 0x01 = constant, then RGB values data = bytes([0x01, r, g, b]) await current_client.write_gatt_char(LED_CHAR_UUID, data) return [TextContent( type="text", text=f"✅ LED set to RGB({r}, {g}, {b})" )] elif name == "thingy_led_off": if not current_client or not current_client.is_connected: return [TextContent( type="text", text="❌ Not connected. Use thingy_connect first." )] data = bytes([0x01, 0, 0, 0]) await current_client.write_gatt_char(LED_CHAR_UUID, data) return [TextContent(type="text", text="✅ LED turned off")] elif name == "thingy_play_sound": if not current_client or not current_client.is_connected: return [TextContent( type="text", text="❌ Not connected. Use thingy_connect first." )] sound_id = arguments["sound_id"] data = bytes([sound_id]) await current_client.write_gatt_char(SOUND_CHAR_UUID, data) return [TextContent( type="text", text=f"🔊 Playing sound {sound_id}" )] elif name == "thingy_beep": if not current_client or not current_client.is_connected: return [TextContent( type="text", text="❌ Not connected. Use thingy_connect first." )] data = bytes([0x01]) await current_client.write_gatt_char(SOUND_CHAR_UUID, data) return [TextContent(type="text", text="🔊 Beep!")] elif name == "thingy_get_sensors": if not current_client or not current_client.is_connected: return [TextContent( type="text", text="❌ Not connected. Use thingy_connect first." )] # Read temperature temp_data = await current_client.read_gatt_char(TEMP_CHAR_UUID) temp_int = int.from_bytes(temp_data[0:1], byteorder='little', signed=True) temp_dec = int.from_bytes(temp_data[1:2], byteorder='little') temperature = temp_int + temp_dec / 100.0 # Read humidity humidity_data = await current_client.read_gatt_char(HUMIDITY_CHAR_UUID) humidity = int.from_bytes(humidity_data, byteorder='little') # Read pressure pressure_data = await current_client.read_gatt_char(PRESSURE_CHAR_UUID) pressure_int = int.from_bytes(pressure_data[0:4], byteorder='little') pressure_dec = int.from_bytes(pressure_data[4:5], byteorder='little') pressure = pressure_int + pressure_dec / 100.0 # Read gas (air quality) gas_data = await current_client.read_gatt_char(GAS_CHAR_UUID) co2_ppm = int.from_bytes(gas_data[0:2], byteorder='little') tvoc_ppb = int.from_bytes(gas_data[2:4], byteorder='little') result = f"""📊 Thingy Sensor Readings: 🌡️ Temperature: {temperature:.2f}°C 💧 Humidity: {humidity}% 🔽 Pressure: {pressure:.2f} hPa 🌫️ CO2: {co2_ppm} ppm 🌬️ TVOC: {tvoc_ppb} ppb Air Quality: {"Good" if co2_ppm < 800 else "Moderate" if co2_ppm < 1200 else "Poor"} """ return [TextContent(type="text", text=result)] elif name == "thingy_get_status": if current_client and current_client.is_connected: status = f"""✅ Connected to Thingy Device: {current_device.name if current_device else "Unknown"} Address: {current_device.address if current_device else "Unknown"} Available commands: - Set LED colors - Play sounds - Read sensors - Control hardware """ else: status = """❌ Not connected to any Thingy device Use 'thingy_scan' to find devices, then 'thingy_connect' to connect. """ return [TextContent(type="text", text=status)] else: return [TextContent( type="text", text=f"Unknown tool: {name}" )] except Exception as e: logger.error(f"Tool execution error: {e}") return [TextContent( type="text", text=f"❌ Error: {str(e)}" )] async def main(): """Main entry point""" from mcp.server.stdio import stdio_server logger.info("Starting Nordic Thingy MCP Server...") async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) if __name__ == "__main__": asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/karthiksuku/nordic-thingy-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server