mcp_server_nordic_thingy.py•14.8 kB
#!/usr/bin/env python3
"""
Nordic Thingy:52 MCP Server
Model Context Protocol server for controlling Nordic Thingy:52 IoT devices via Bluetooth LE
Version: 1.0.0
Author: Community Contributors
License: MIT
Repository: https://github.com/yourusername/mcp-server-nordic-thingy
"""
import asyncio
import json
import logging
from typing import Any, Optional
from contextlib import asynccontextmanager
from mcp.server import Server
from mcp.types import Tool, TextContent, ImageContent, EmbeddedResource
from pydantic import Field
from bleak import BleakClient, BleakScanner
from bleak.backends.device import BLEDevice
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("nordic-thingy-mcp")
# Nordic Thingy:52 UUIDs
THINGY_SERVICE_UUID = "ef680100-9b35-4933-9b10-52ffa9740042"
LED_CHAR_UUID = "ef680301-9b35-4933-9b10-52ffa9740042"
SOUND_CHAR_UUID = "ef680502-9b35-4933-9b10-52ffa9740042"
SPEAKER_CHAR_UUID = "ef680503-9b35-4933-9b10-52ffa9740042"
TEMP_CHAR_UUID = "ef680201-9b35-4933-9b10-52ffa9740042"
PRESSURE_CHAR_UUID = "ef680202-9b35-4933-9b10-52ffa9740042"
HUMIDITY_CHAR_UUID = "ef680203-9b35-4933-9b10-52ffa9740042"
GAS_CHAR_UUID = "ef680204-9b35-4933-9b10-52ffa9740042"
COLOR_CHAR_UUID = "ef680205-9b35-4933-9b10-52ffa9740042"
# Global state
current_client: Optional[BleakClient] = None
current_device: Optional[BLEDevice] = None
# MCP Server instance
app = Server("nordic-thingy")
async def scan_for_thingys(timeout: float = 5.0) -> list[dict[str, Any]]:
"""Scan for Nordic Thingy devices"""
logger.info(f"Scanning for Thingy devices (timeout: {timeout}s)...")
devices = await BleakScanner.discover(timeout=timeout)
thingys = []
for device in devices:
# Nordic Thingy devices typically have "Thingy" in their name
if device.name and "Thingy" in device.name:
thingys.append({
"name": device.name,
"address": device.address,
"rssi": device.rssi if hasattr(device, 'rssi') else None
})
logger.info(f"Found {len(thingys)} Thingy device(s)")
return thingys
async def connect_to_thingy(address: str) -> bool:
"""Connect to a Thingy device"""
global current_client, current_device
try:
logger.info(f"Connecting to {address}...")
# Disconnect if already connected
if current_client and current_client.is_connected:
await current_client.disconnect()
# Connect to new device
current_client = BleakClient(address)
await current_client.connect()
current_device = current_client._device
logger.info(f"Connected successfully to {address}")
return True
except Exception as e:
logger.error(f"Connection failed: {e}")
current_client = None
current_device = None
return False
async def disconnect_thingy():
"""Disconnect from current Thingy"""
global current_client, current_device
if current_client and current_client.is_connected:
await current_client.disconnect()
logger.info("Disconnected from Thingy")
current_client = None
current_device = None
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools for Nordic Thingy control"""
return [
Tool(
name="thingy_scan",
description="Scan for nearby Nordic Thingy:52 devices. Returns list of discovered devices with name, address, and signal strength.",
inputSchema={
"type": "object",
"properties": {
"timeout": {
"type": "number",
"description": "Scan timeout in seconds (default: 5)",
"default": 5.0
}
}
}
),
Tool(
name="thingy_connect",
description="Connect to a Nordic Thingy:52 device by Bluetooth address. Must be called before using control commands.",
inputSchema={
"type": "object",
"properties": {
"address": {
"type": "string",
"description": "Bluetooth MAC address of the Thingy device (e.g., 'XX:XX:XX:XX:XX:XX')"
}
},
"required": ["address"]
}
),
Tool(
name="thingy_disconnect",
description="Disconnect from the currently connected Nordic Thingy device.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="thingy_set_led",
description="Set the RGB LED color on the Thingy. Values range from 0-255 for each color channel.",
inputSchema={
"type": "object",
"properties": {
"r": {
"type": "integer",
"description": "Red value (0-255)",
"minimum": 0,
"maximum": 255
},
"g": {
"type": "integer",
"description": "Green value (0-255)",
"minimum": 0,
"maximum": 255
},
"b": {
"type": "integer",
"description": "Blue value (0-255)",
"minimum": 0,
"maximum": 255
}
},
"required": ["r", "g", "b"]
}
),
Tool(
name="thingy_led_off",
description="Turn off the Thingy LED (set to black/0,0,0)",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="thingy_play_sound",
description="Play a pre-programmed sound on the Thingy speaker. Sound IDs: 1-8 (different tones and effects)",
inputSchema={
"type": "object",
"properties": {
"sound_id": {
"type": "integer",
"description": "Sound ID to play (1-8)",
"minimum": 1,
"maximum": 8
}
},
"required": ["sound_id"]
}
),
Tool(
name="thingy_beep",
description="Play a quick beep sound (equivalent to sound ID 1). Quick way to get audio feedback.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="thingy_get_sensors",
description="Read all sensor values from the Thingy: temperature, humidity, pressure, air quality (CO2/TVOC), and color sensor. Returns comprehensive environmental data.",
inputSchema={
"type": "object",
"properties": {}
}
),
Tool(
name="thingy_get_status",
description="Get current connection status and device information",
inputSchema={
"type": "object",
"properties": {}
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool calls"""
try:
if name == "thingy_scan":
timeout = arguments.get("timeout", 5.0)
devices = await scan_for_thingys(timeout)
if not devices:
return [TextContent(
type="text",
text="No Nordic Thingy devices found. Make sure your Thingy is powered on and nearby."
)]
result = "Found Nordic Thingy devices:\n\n"
for i, device in enumerate(devices, 1):
result += f"{i}. {device['name']}\n"
result += f" Address: {device['address']}\n"
if device['rssi']:
result += f" Signal: {device['rssi']} dBm\n"
result += "\n"
return [TextContent(type="text", text=result)]
elif name == "thingy_connect":
address = arguments["address"]
success = await connect_to_thingy(address)
if success:
return [TextContent(
type="text",
text=f"✅ Successfully connected to Thingy at {address}\n\nYou can now use control commands like setting LED colors, playing sounds, and reading sensors."
)]
else:
return [TextContent(
type="text",
text=f"❌ Failed to connect to {address}. Please check the address and try again."
)]
elif name == "thingy_disconnect":
await disconnect_thingy()
return [TextContent(
type="text",
text="Disconnected from Thingy device"
)]
elif name == "thingy_set_led":
if not current_client or not current_client.is_connected:
return [TextContent(
type="text",
text="❌ Not connected to any Thingy device. Use thingy_connect first."
)]
r = arguments["r"]
g = arguments["g"]
b = arguments["b"]
# LED mode: 0x01 = constant, then RGB values
data = bytes([0x01, r, g, b])
await current_client.write_gatt_char(LED_CHAR_UUID, data)
return [TextContent(
type="text",
text=f"✅ LED set to RGB({r}, {g}, {b})"
)]
elif name == "thingy_led_off":
if not current_client or not current_client.is_connected:
return [TextContent(
type="text",
text="❌ Not connected. Use thingy_connect first."
)]
data = bytes([0x01, 0, 0, 0])
await current_client.write_gatt_char(LED_CHAR_UUID, data)
return [TextContent(type="text", text="✅ LED turned off")]
elif name == "thingy_play_sound":
if not current_client or not current_client.is_connected:
return [TextContent(
type="text",
text="❌ Not connected. Use thingy_connect first."
)]
sound_id = arguments["sound_id"]
data = bytes([sound_id])
await current_client.write_gatt_char(SOUND_CHAR_UUID, data)
return [TextContent(
type="text",
text=f"🔊 Playing sound {sound_id}"
)]
elif name == "thingy_beep":
if not current_client or not current_client.is_connected:
return [TextContent(
type="text",
text="❌ Not connected. Use thingy_connect first."
)]
data = bytes([0x01])
await current_client.write_gatt_char(SOUND_CHAR_UUID, data)
return [TextContent(type="text", text="🔊 Beep!")]
elif name == "thingy_get_sensors":
if not current_client or not current_client.is_connected:
return [TextContent(
type="text",
text="❌ Not connected. Use thingy_connect first."
)]
# Read temperature
temp_data = await current_client.read_gatt_char(TEMP_CHAR_UUID)
temp_int = int.from_bytes(temp_data[0:1], byteorder='little', signed=True)
temp_dec = int.from_bytes(temp_data[1:2], byteorder='little')
temperature = temp_int + temp_dec / 100.0
# Read humidity
humidity_data = await current_client.read_gatt_char(HUMIDITY_CHAR_UUID)
humidity = int.from_bytes(humidity_data, byteorder='little')
# Read pressure
pressure_data = await current_client.read_gatt_char(PRESSURE_CHAR_UUID)
pressure_int = int.from_bytes(pressure_data[0:4], byteorder='little')
pressure_dec = int.from_bytes(pressure_data[4:5], byteorder='little')
pressure = pressure_int + pressure_dec / 100.0
# Read gas (air quality)
gas_data = await current_client.read_gatt_char(GAS_CHAR_UUID)
co2_ppm = int.from_bytes(gas_data[0:2], byteorder='little')
tvoc_ppb = int.from_bytes(gas_data[2:4], byteorder='little')
result = f"""📊 Thingy Sensor Readings:
🌡️ Temperature: {temperature:.2f}°C
💧 Humidity: {humidity}%
🔽 Pressure: {pressure:.2f} hPa
🌫️ CO2: {co2_ppm} ppm
🌬️ TVOC: {tvoc_ppb} ppb
Air Quality: {"Good" if co2_ppm < 800 else "Moderate" if co2_ppm < 1200 else "Poor"}
"""
return [TextContent(type="text", text=result)]
elif name == "thingy_get_status":
if current_client and current_client.is_connected:
status = f"""✅ Connected to Thingy
Device: {current_device.name if current_device else "Unknown"}
Address: {current_device.address if current_device else "Unknown"}
Available commands:
- Set LED colors
- Play sounds
- Read sensors
- Control hardware
"""
else:
status = """❌ Not connected to any Thingy device
Use 'thingy_scan' to find devices, then 'thingy_connect' to connect.
"""
return [TextContent(type="text", text=status)]
else:
return [TextContent(
type="text",
text=f"Unknown tool: {name}"
)]
except Exception as e:
logger.error(f"Tool execution error: {e}")
return [TextContent(
type="text",
text=f"❌ Error: {str(e)}"
)]
async def main():
"""Main entry point"""
from mcp.server.stdio import stdio_server
logger.info("Starting Nordic Thingy MCP Server...")
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())