# blinds.py • 5.2 kB
import asyncio
import json
import os
import sys
import time

import aiohttp
from mcp.server.fastmcp import FastMCP
# Create the FastMCP server instance under the name "control_blinds"
mcp = FastMCP("control_blinds")
# Connection settings taken from the environment (None when a variable is unset)
MY_TOKEN = os.environ.get("MY_TOKEN")
MY_IP = os.environ.get("MY_IP")
def get_devices(
    blind_name: str = "all",
    blind_location: str = "all",
    blind_row: str = "all",
) -> list[dict]:
    """
    Load the device inventory and return the records matching the filters.

    The inventory is read from ``data/devices.json`` (relative to the current
    working directory) on every call. Each filter is compared for equality
    against the corresponding key of a device record; the value "all"
    disables that filter.

    :param blind_name: Device name or "all" to match any name
    :param blind_location: Device location or "all" to match any location
    :param blind_row: Device row or "all" to match any row
    :return: List of device records (the full entries loaded from the JSON
        file, not just their IDs) matching all criteria; empty if none match
    :raises AssertionError: If the inventory file contains no devices
    :raises KeyError: If a device record lacks a key that an active filter
        needs to compare against
    """
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open('data/devices.json', 'r', encoding='utf-8') as f:
        devices = json.load(f)

    # Raised explicitly rather than via `assert` so the check still runs
    # under `python -O`; the exception type is kept for existing callers.
    if not devices:
        raise AssertionError(
            f"Expected at least one devices, got {len(devices)}"
        )

    return [
        device
        for device in devices
        if (blind_name == "all" or device["name"] == blind_name)
        and (blind_location == "all" or device["location"] == blind_location)
        and (blind_row == "all" or device["row"] == blind_row)
    ]
@mcp.tool()
async def set_blinds_state(
    action: str,
    blind_name: str = "all",
    blind_location: str = "all",
    blind_row: str = "all",
) -> str:
    """
    Control window blinds by sending commands to API. It supports opening, closing, or stopping blinds in motion,
    with filtering capabilities to target specific blinds by name, location, or row.

    Args:
        action (str): The command to send to the blinds. Must be one of:
            - "Open": Fully open the blinds
            - "Close": Fully close the blinds
            - "Hold": Stop blinds movement at current position
        blind_name (str, optional): Filter by blind name. Options:
            - "all" (default): Target all blinds regardless of name
            - "front": Target only the front blind
            - "sunrise": Target only the sunrise blind
            - "sunset": Target only the sunset blind
        blind_location (str, optional): Filter by blind location. Options:
            - "all" (default): Target all blinds regardless of location
            - "foyer": Target only blinds in the foyer
            - "office": Target only blinds in the office
        blind_row (str, optional): Filter by blind row position. Options:
            - "all" (default): Target all blinds regardless of row
            - "top": Target only top row blinds
            - "bottom": Target only bottom row blinds

    Returns:
        str: A text report. It starts with a header echoing the request
            (action, filters, number of matched devices) followed by one line
            per matched device containing either the HTTP status and response
            body from the API, or "Error:<message>" if that request failed.

    Raises:
        ValueError: If any parameter contains an invalid value not in the allowed options.

    Examples:
        # Open all blinds
        await set_blinds_state("Open")
        # Close only office blinds
        await set_blinds_state("Close", blind_location="office")
        # Stop movement of sunset blinds
        await set_blinds_state("Hold", blind_name="sunset")
        # Close bottom row blinds in office
        await set_blinds_state("Close", blind_location="office", blind_row="bottom")

    Notes:
        - Devices are commanded one at a time with a one-second pause after each
        - A failure on one device is reported in the result and does not stop the remaining devices
    """
    if action not in {"Open", "Close", "Hold"}:
        raise ValueError("Invalid action. Must be 'Open', 'Close', or 'Hold'.")
    if blind_name not in {"all", "front", "sunrise", "sunset"}:
        raise ValueError('Invalid blind_name. Must be "all", "front", "sunrise", "sunset"')
    if blind_location not in {"all", "foyer", "office"}:
        raise ValueError('Invalid blind_location. Must be "all", "foyer", "office"')
    if blind_row not in {"all", "top", "bottom"}:
        raise ValueError('Invalid blind_row. Must be "all", "top", "bottom"')

    devices = get_devices(blind_name, blind_location, blind_row)

    # Header of the report: echo the request and how many devices it matched.
    ret = [
        f"action={action}\n",
        f"blind_name: {blind_name}\n",
        f"blind_location: {blind_location}\n",
        f"blind_row: {blind_row}\n",
        f"devices: {len(devices)}\n----------------\n",
    ]

    # Headers and payload are the same for every device, so build them once.
    headers = {
        "BOND-Token": MY_TOKEN,
        "Content-Type": "application/json"
    }
    data = {"open": 0}

    # One session is shared by all requests instead of opening one per device.
    async with aiohttp.ClientSession() as session:
        for device in devices:
            url = f"http://{MY_IP}/v2/devices/{device['id']}/actions/{action}"
            try:
                async with session.put(url, headers=headers, json=data) as response:
                    text = await response.text()
                    # Report the awaited body; `response.text` itself is a method.
                    ret.append(
                        f"{device['name']} {device['location']} {action} → {response.status}, {text}\n"
                    )
            except Exception as e:
                # Best effort: record the failure and continue with the next device.
                # Diagnostics go to stderr because the server is run with the
                # stdio transport, where stdout carries protocol traffic.
                print(f"Error sending {action} command: {e}", file=sys.stderr)
                ret.append("Error:" + str(e) + "\n")
            # Pause between commands without blocking the event loop.
            await asyncio.sleep(1)

    print(ret, file=sys.stderr)
    return "".join(ret)
if __name__ == "__main__":
    # Script entry point: start the MCP server on the stdio transport
    mcp.run(transport="stdio")