Skip to main content
Glama

Dual MCP Server for IoT & Memory

by jordy33
light_bulb_simulator.py7.6 kB
from flask import Flask, render_template, request, jsonify import paho.mqtt.client as mqtt import json import threading import logging import os import time from dotenv import load_dotenv # Load environment variables load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) app = Flask(__name__) # MQTT Settings MQTT_BROKER = os.getenv("MQTT_BROKER", "localhost") MQTT_PORT = int(os.getenv("MQTT_PORT", "1883")) DEVICE_ID = "light_bulb_001" COMMAND_TOPIC = f"devices/{DEVICE_ID}/command" STATE_TOPIC = f"devices/{DEVICE_ID}/state" STATE_REQUEST_TOPIC = f"devices/{DEVICE_ID}/state/request" # Global device state device_state = { "device_id": DEVICE_ID, "type": "light", # Add device type "online": True, "last_seen": None, "properties": { "power": False, # False is off, True is on "brightness": 100, "color": "#FFFF00" # Yellow light } } # Initialize MQTT client with protocol version parameter to address deprecation warning mqtt_client = mqtt.Client(protocol=mqtt.MQTTv311) # Set up MQTT callbacks def on_connect(client, userdata, flags, rc): if rc == 0: logger.info(f"Connected to MQTT broker at {MQTT_BROKER}:{MQTT_PORT}") # Subscribe to command topic client.subscribe(COMMAND_TOPIC) client.subscribe(STATE_REQUEST_TOPIC) # Subscribe to broadcast requests client.subscribe("devices/broadcast/request") # Publish initial state immediately after connection publish_state() else: logger.error(f"Failed to connect to MQTT broker with result code {rc}") def on_message(client, userdata, message): topic = message.topic payload = message.payload.decode("utf-8") logger.info(f"Received message on {topic}: {payload}") try: # Handle command messages if topic == COMMAND_TOPIC: handle_command(payload) # Handle state request messages elif topic == STATE_REQUEST_TOPIC: publish_state() # Handle broadcast requests elif topic == "devices/broadcast/request": # Parse the message msg_data = json.loads(payload) action = msg_data.get("action") # If action is to report state, publish our state if action == "report_state": logger.info(f"Responding to broadcast request: {msg_data.get('request_id')}") publish_state() # Also publish to the broadcast response topic response = { "device_id": DEVICE_ID, "response_to": msg_data.get("request_id"), "state": device_state } mqtt_client.publish("devices/broadcast/response", json.dumps(response)) except Exception as e: logger.error(f"Error processing message: {str(e)}") def handle_command(payload_str): try: payload = json.loads(payload_str) command = payload.get("command") logger.info(f"Processing command: {command} with payload: {payload_str}") if command == "toggle": # Toggle power state device_state["properties"]["power"] = not device_state["properties"]["power"] logger.info(f"Toggled light bulb power to: {device_state['properties']['power']}") elif command == "set_power" and "payload" in payload: # Set power state directly power_state = payload["payload"].get("power", False) old_state = device_state["properties"]["power"] device_state["properties"]["power"] = bool(power_state) logger.info(f"Set light bulb power from {old_state} to {device_state['properties']['power']}") elif command == "set_brightness" and "payload" in payload: # Set brightness brightness = payload["payload"].get("brightness", 100) device_state["properties"]["brightness"] = max(0, min(100, int(brightness))) logger.info(f"Set light bulb brightness to: {device_state['properties']['brightness']}") elif command == "set_color" and "payload" in payload: # Set color color = payload["payload"].get("color") if color: device_state["properties"]["color"] = color logger.info(f"Set light bulb color to: {device_state['properties']['color']}") else: logger.warning(f"Unknown command or missing payload: {command}") # Publish updated state publish_state() except json.JSONDecodeError: logger.error(f"Invalid JSON payload: {payload_str}") except Exception as e: logger.error(f"Error handling command: {str(e)}") import traceback logger.error(traceback.format_exc()) def publish_state(): """Publish current state to MQTT""" try: # Update last_seen timestamp device_state["last_seen"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) mqtt_client.publish(STATE_TOPIC, json.dumps(device_state)) logger.info(f"Published state: {json.dumps(device_state)}") except Exception as e: logger.error(f"Error publishing state: {str(e)}") # Heartbeat function to periodically publish state def heartbeat(): while True: try: publish_state() except Exception as e: logger.error(f"Error in heartbeat: {str(e)}") time.sleep(60) # Publish state every 60 seconds # Routes @app.route('/') def index(): return render_template('index.html', device_state=device_state) @app.route('/api/state', methods=['GET']) def get_state(): return jsonify(device_state) @app.route('/api/toggle', methods=['POST']) def toggle_light(): device_state["properties"]["power"] = not device_state["properties"]["power"] publish_state() return jsonify({"success": True, "power": device_state["properties"]["power"]}) @app.route('/api/brightness', methods=['POST']) def set_brightness(): data = request.json brightness = data.get('brightness', 100) device_state["properties"]["brightness"] = max(0, min(100, int(brightness))) publish_state() return jsonify({"success": True, "brightness": device_state["properties"]["brightness"]}) @app.route('/api/color', methods=['POST']) def set_color(): data = request.json color = data.get('color', "#FFFF00") device_state["properties"]["color"] = color publish_state() return jsonify({"success": True, "color": device_state["properties"]["color"]}) def start_mqtt(): mqtt_client.on_connect = on_connect mqtt_client.on_message = on_message try: mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60) mqtt_client.loop_start() except Exception as e: logger.error(f"Failed to connect to MQTT broker: {str(e)}") if __name__ == '__main__': # Start MQTT client in a separate thread mqtt_thread = threading.Thread(target=start_mqtt) mqtt_thread.daemon = True mqtt_thread.start() # Start heartbeat in a separate thread heartbeat_thread = threading.Thread(target=heartbeat) heartbeat_thread.daemon = True heartbeat_thread.start() # Give MQTT client time to connect and publish initial state time.sleep(2) # Start Flask web server app.run(host='0.0.0.0', port=7003, debug=True, use_reloader=False)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jordy33/iot_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server