MCP Personal Assistant Agent
- modules
from typing import List, Dict, Any, Optional
import os
import logging
import httpx
import json
import re
# Import the MCP server instance from the main file
from mcp_server import mcp, Context
logger = logging.getLogger("mcp-pa-agent.smarthome")
# Helper functions
async def get_home_assistant_client():
"""Get a client for Home Assistant if configured."""
base_url = os.getenv("HOME_ASSISTANT_URL")
token = os.getenv("HOME_ASSISTANT_TOKEN")
if not base_url or not token:
return None
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
return httpx.AsyncClient(base_url=base_url, headers=headers, timeout=10.0)
# Prompts
@mcp.prompt()
def create_scene_prompt(name: str) -> str:
"""Create a prompt for defining a smart home scene"""
return f"Please help me create a smart home scene called '{name}'. What devices should be included and what states should they have? List specific devices and their desired settings."
@mcp.prompt()
def automation_prompt(trigger: str) -> str:
"""Create a prompt for defining a smart home automation"""
return f"Please help me create a smart home automation with the trigger '{trigger}'. What actions should occur when this trigger happens?"
# Resources
@mcp.resource("smarthome://devices")
async def all_devices_resource() -> str:
"""Provide all smart home devices as a resource"""
client = await get_home_assistant_client()
if not client:
return "Smart home integration is not available. Please configure HOME_ASSISTANT_URL and HOME_ASSISTANT_TOKEN."
try:
# In a real implementation, you would make an actual API call
# This is a demonstration response
mock_devices = [
{"entity_id": "light.living_room", "state": "on", "attributes": {"brightness": 255, "friendly_name": "Living Room Light"}},
{"entity_id": "light.kitchen", "state": "off", "attributes": {"friendly_name": "Kitchen Light"}},
{"entity_id": "switch.office_fan", "state": "on", "attributes": {"friendly_name": "Office Fan"}},
{"entity_id": "climate.thermostat", "state": "heat", "attributes": {"temperature": 72, "friendly_name": "Thermostat"}}
]
return json.dumps(mock_devices, indent=2)
except Exception as e:
logger.error(f"Error listing devices: {str(e)}")
return f"Error listing devices: {str(e)}"
finally:
await client.aclose()
@mcp.resource("smarthome://device/{entity_id}")
async def device_resource(entity_id: str) -> str:
"""Provide details about a specific device"""
client = await get_home_assistant_client()
if not client:
return "Smart home integration is not available. Please configure HOME_ASSISTANT_URL and HOME_ASSISTANT_TOKEN."
try:
# Mock responses for different device types
mock_states = {
"light.living_room": {
"state": "on",
"attributes": {
"brightness": 255,
"friendly_name": "Living Room Light",
"color_temp": 300
}
},
"climate.thermostat": {
"state": "heat",
"attributes": {
"temperature": 72,
"current_temperature": 70,
"hvac_modes": ["auto", "heat", "cool", "off"],
"friendly_name": "Thermostat"
}
}
}
if entity_id not in mock_states:
return f"Device {entity_id} not found."
return json.dumps(mock_states[entity_id], indent=2)
except Exception as e:
logger.error(f"Error getting device state: {str(e)}")
return f"Error getting device state: {str(e)}"
finally:
await client.aclose()
# Tool functions
@mcp.tool()
async def list_devices(ctx: Context = None) -> str:
"""List all available smart home devices."""
if ctx:
ctx.info("Listing all smart home devices")
client = await get_home_assistant_client()
if not client:
error_msg = "Smart home integration is not available. Please configure HOME_ASSISTANT_URL and HOME_ASSISTANT_TOKEN."
if ctx:
ctx.error(error_msg)
return error_msg
try:
if ctx:
ctx.info("Fetching devices from Home Assistant")
# In a real implementation, you would make an actual API call
# This is a demonstration response
mock_devices = [
{"entity_id": "light.living_room", "state": "on", "attributes": {"brightness": 255, "friendly_name": "Living Room Light"}},
{"entity_id": "light.kitchen", "state": "off", "attributes": {"friendly_name": "Kitchen Light"}},
{"entity_id": "switch.office_fan", "state": "on", "attributes": {"friendly_name": "Office Fan"}},
{"entity_id": "climate.thermostat", "state": "heat", "attributes": {"temperature": 72, "friendly_name": "Thermostat"}}
]
if not mock_devices:
return "No smart home devices found."
formatted_devices = []
for i, device in enumerate(mock_devices):
if ctx:
await ctx.report_progress(i, len(mock_devices))
attributes = device.get("attributes", {})
friendly_name = attributes.get("friendly_name", device.get("entity_id", "Unknown"))
device_info = f"Device: {friendly_name}\nID: {device.get('entity_id', 'Unknown')}\nState: {device.get('state', 'Unknown')}"
# Add additional attributes based on device type
if "brightness" in attributes:
device_info += f"\nBrightness: {attributes['brightness']}"
if "temperature" in attributes:
device_info += f"\nTemperature: {attributes['temperature']}°F"
formatted_devices.append(device_info)
return "\n---\n".join(formatted_devices)
except Exception as e:
error_msg = f"Error listing devices: {str(e)}"
if ctx:
ctx.error(error_msg)
return error_msg
finally:
await client.aclose()
@mcp.tool()
async def control_device(entity_id: str, action: str, value: Optional[int] = None, ctx: Context = None) -> str:
"""Control a smart home device.
Args:
entity_id: The entity ID of the device to control
action: The action to perform (on, off, set_temperature, brightness, etc.)
value: Optional value for the action (e.g., brightness level, temperature)
"""
if ctx:
if value is not None:
ctx.info(f"Controlling device {entity_id}: {action} = {value}")
else:
ctx.info(f"Controlling device {entity_id}: {action}")
if not entity_id or len(entity_id.strip()) == 0:
error_msg = "Entity ID cannot be empty."
if ctx:
ctx.error(error_msg)
return error_msg
if not action or len(action.strip()) == 0:
error_msg = "Action cannot be empty."
if ctx:
ctx.error(error_msg)
return error_msg
# Validate entity_id format
entity_pattern = re.compile(r'^[a-z0-9_]+\.[a-z0-9_]+$')
if not entity_pattern.match(entity_id):
error_msg = f"Invalid entity ID format: {entity_id}. Expected format: domain.entity (e.g., light.living_room)"
if ctx:
ctx.error(error_msg)
return error_msg
client = await get_home_assistant_client()
if not client:
error_msg = "Smart home integration is not available. Please configure HOME_ASSISTANT_URL and HOME_ASSISTANT_TOKEN."
if ctx:
ctx.error(error_msg)
return error_msg
try:
# Validate action based on entity type
valid_actions = {
"light": ["on", "off", "brightness"],
"switch": ["on", "off"],
"climate": ["set_temperature", "set_mode"]
}
entity_type = entity_id.split(".")[0] if "." in entity_id else None
if not entity_type:
error_msg = f"Invalid entity ID: {entity_id}. Missing domain prefix."
if ctx:
ctx.error(error_msg)
return error_msg
if entity_type not in valid_actions:
error_msg = f"Unsupported entity type: {entity_type}. Supported types are: {', '.join(valid_actions.keys())}"
if ctx:
ctx.error(error_msg)
return error_msg
if action.lower() not in valid_actions[entity_type] and action.lower() not in ["on", "off"]:
error_msg = f"Invalid action for {entity_type}: {action}. Supported actions are: {', '.join(valid_actions[entity_type])}"
if ctx:
ctx.error(error_msg)
return error_msg
# In a real implementation, you would make an actual API call
# This is a demonstration response
# Build response based on action
if ctx:
ctx.info(f"Sending {action} command to {entity_id}")
if action.lower() in ["on", "off"]:
service = f"{entity_type}.turn_{action.lower()}"
return f"Successfully turned {action.lower()} {entity_id}"
elif action.lower() == "brightness" and entity_type == "light":
if value is None or not (0 <= value <= 255):
error_msg = "Brightness value must be between 0 and 255."
if ctx:
ctx.error(error_msg)
return error_msg
return f"Successfully set brightness of {entity_id} to {value}"
elif action.lower() == "set_temperature" and entity_type == "climate":
if value is None:
error_msg = "Temperature value is required."
if ctx:
ctx.error(error_msg)
return error_msg
return f"Successfully set temperature of {entity_id} to {value}°F"
elif action.lower() == "set_mode" and entity_type == "climate":
if not value and isinstance(value, str):
error_msg = "Mode value is required."
if ctx:
ctx.error(error_msg)
return error_msg
return f"Successfully set mode of {entity_id} to {value}"
else:
error_msg = f"Action {action} not implemented for {entity_type}"
if ctx:
ctx.error(error_msg)
return error_msg
except Exception as e:
error_msg = f"Error controlling device: {str(e)}"
if ctx:
ctx.error(error_msg)
return error_msg
finally:
await client.aclose()
@mcp.tool()
async def get_device_state(entity_id: str, ctx: Context = None) -> str:
"""Get the current state of a smart home device.
Args:
entity_id: The entity ID of the device
"""
if ctx:
ctx.info(f"Getting state of device: {entity_id}")
if not entity_id or len(entity_id.strip()) == 0:
error_msg = "Entity ID cannot be empty."
if ctx:
ctx.error(error_msg)
return error_msg
# Validate entity_id format
entity_pattern = re.compile(r'^[a-z0-9_]+\.[a-z0-9_]+$')
if not entity_pattern.match(entity_id):
error_msg = f"Invalid entity ID format: {entity_id}. Expected format: domain.entity (e.g., light.living_room)"
if ctx:
ctx.error(error_msg)
return error_msg
client = await get_home_assistant_client()
if not client:
error_msg = "Smart home integration is not available. Please configure HOME_ASSISTANT_URL and HOME_ASSISTANT_TOKEN."
if ctx:
ctx.error(error_msg)
return error_msg
try:
if ctx:
ctx.info(f"Fetching state for {entity_id}")
# In a real implementation, you would make an actual API call
# This is a demonstration response
# Mock responses for different device types
mock_states = {
"light.living_room": {
"state": "on",
"attributes": {
"brightness": 255,
"friendly_name": "Living Room Light",
"color_temp": 300
}
},
"light.kitchen": {
"state": "off",
"attributes": {
"friendly_name": "Kitchen Light"
}
},
"switch.office_fan": {
"state": "on",
"attributes": {
"friendly_name": "Office Fan"
}
},
"climate.thermostat": {
"state": "heat",
"attributes": {
"temperature": 72,
"current_temperature": 70,
"hvac_modes": ["auto", "heat", "cool", "off"],
"friendly_name": "Thermostat"
}
}
}
if entity_id not in mock_states:
error_msg = f"Device {entity_id} not found."
if ctx:
ctx.error(error_msg)
return error_msg
device = mock_states[entity_id]
attributes = device.get("attributes", {})
friendly_name = attributes.get("friendly_name", entity_id)
state_info = f"Device: {friendly_name}\nID: {entity_id}\nState: {device.get('state', 'Unknown')}"
# Add specific attributes based on entity type
entity_type = entity_id.split(".")[0] if "." in entity_id else ""
if entity_type == "light":
if "brightness" in attributes:
state_info += f"\nBrightness: {attributes['brightness']}"
if "color_temp" in attributes:
state_info += f"\nColor Temperature: {attributes['color_temp']}K"
elif entity_type == "climate":
if "temperature" in attributes:
state_info += f"\nSet Temperature: {attributes['temperature']}°F"
if "current_temperature" in attributes:
state_info += f"\nCurrent Temperature: {attributes['current_temperature']}°F"
if "hvac_modes" in attributes:
state_info += f"\nAvailable Modes: {', '.join(attributes['hvac_modes'])}"
return state_info
except Exception as e:
error_msg = f"Error getting device state: {str(e)}"
if ctx:
ctx.error(error_msg)
return error_msg
finally:
await client.aclose()
@mcp.tool()
async def create_scene(name: str, device_states: Dict[str, Any], ctx: Context = None) -> str:
"""Create a new scene with defined device states.
Args:
name: Name for the scene
device_states: Dictionary of device entity_ids and their target states
"""
if ctx:
ctx.info(f"Creating scene: {name}")
if not name or len(name.strip()) == 0:
error_msg = "Scene name cannot be empty."
if ctx:
ctx.error(error_msg)
return error_msg
if not device_states or not isinstance(device_states, dict) or len(device_states) == 0:
error_msg = "Device states must be a non-empty dictionary."
if ctx:
ctx.error(error_msg)
return error_msg
client = await get_home_assistant_client()
if not client:
error_msg = "Smart home integration is not available. Please configure HOME_ASSISTANT_URL and HOME_ASSISTANT_TOKEN."
if ctx:
ctx.error(error_msg)
return error_msg
try:
if ctx:
ctx.info(f"Creating scene with {len(device_states)} device states")
# In a real implementation, you would make an actual API call
# This is a demonstration response
scene_entities = []
for entity_id, state in device_states.items():
if isinstance(state, dict):
scene_entities.append(f"{entity_id}: {json.dumps(state)}")
else:
scene_entities.append(f"{entity_id}: {state}")
return f"Scene '{name}' created successfully with the following device states:\n\n" + "\n".join(scene_entities)
except Exception as e:
error_msg = f"Error creating scene: {str(e)}"
if ctx:
ctx.error(error_msg)
return error_msg
finally:
await client.aclose()