Skip to main content
Glama

Vivint Security System MCP Server

by bradmb
server.py48.8 kB
#!/usr/bin/env python3 """Vivint Security System MCP Server. Provides read-only access to Vivint security system data through MCP tools. """ import os import sys import asyncio import logging import secrets import time from typing import Dict, List, Any, Optional from datetime import datetime, timedelta from fastmcp import FastMCP # Import authentication from fastmcp.server.auth.providers.jwt import JWTVerifier from fastmcp.server.auth.providers.in_memory import InMemoryOAuthProvider # Import our custom persistent OAuth provider try: from .persistent_oauth_provider import PersistentOAuthProvider except ImportError: # Handle case when run directly from persistent_oauth_provider import PersistentOAuthProvider from mcp.shared.auth import OAuthClientInformationFull from mcp.server.auth.provider import AuthorizationParams from pydantic import AnyHttpUrl import json # Import Starlette for OAuth routes from starlette.requests import Request from starlette.responses import HTMLResponse, RedirectResponse # Import our custom modules try: from .config import config from .vivint_client import vivint_client, VivintClientError, VivintMfaRequiredError, VivintAuthenticationError from .template_free_oauth_provider import TemplateFreeOAuthProvider except ImportError: # Handle case when run directly import sys import os sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from config import config from vivint_client import vivint_client, VivintClientError, VivintMfaRequiredError, VivintAuthenticationError from template_free_oauth_provider import TemplateFreeOAuthProvider # Configure logging logging.basicConfig( level=getattr(logging, config.log_level), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Authentication setup def setup_authentication(): """Setup authentication provider based on configuration.""" if not config.auth_enabled: logger.info("Authentication disabled") return None logger.info(f"Setting up {config.auth_type.upper()} authentication") try: if config.auth_type == "jwt": if config.jwt_algorithm.startswith("HS"): # HMAC-based JWT (symmetric key) if not config.auth_secret: raise ValueError("AUTH_SECRET required for HMAC JWT algorithms") auth = JWTVerifier( public_key=config.auth_secret, algorithm=config.jwt_algorithm, issuer=config.jwt_issuer, audience=config.jwt_audience, base_url=f"http://{config.host}:{config.port}" ) logger.info(f"✅ HMAC JWT authentication configured ({config.jwt_algorithm})") elif config.jwt_algorithm.startswith("RS"): # RSA-based JWT (asymmetric key) if not config.jwt_public_key: raise ValueError("JWT_PUBLIC_KEY required for RSA JWT algorithms") auth = JWTVerifier( public_key=config.jwt_public_key, algorithm=config.jwt_algorithm, issuer=config.jwt_issuer, audience=config.jwt_audience, base_url=f"http://{config.host}:{config.port}" ) logger.info(f"✅ RSA JWT authentication configured ({config.jwt_algorithm})") else: raise ValueError(f"Unsupported JWT algorithm: {config.jwt_algorithm}") return auth elif config.auth_type == "api_token": # Simple API token authentication from fastmcp.server.auth import StaticTokenVerifier # StaticTokenVerifier expects a dict mapping token -> metadata tokens = { config.api_token: { "client_id": "api_client", "scopes": ["execute"] } } auth = StaticTokenVerifier(tokens=tokens) logger.info("✅ API token authentication configured") return auth elif config.auth_type == "bearer": # Bearer token authentication using AUTH_SECRET from fastmcp.server.auth import StaticTokenVerifier # StaticTokenVerifier expects a dict mapping token -> metadata tokens = { config.auth_secret: { "client_id": "bearer_client", "scopes": ["execute"] } } auth = StaticTokenVerifier(tokens=tokens) logger.info("✅ Bearer token authentication configured") return auth elif config.auth_type == "oauth": # Set up OAuth provider using FastMCP's built-in InMemoryOAuthProvider # Use Cloudflare tunnel URL if configured, otherwise use localhost if config.cloudflare_tunnel_url: base_url = config.cloudflare_tunnel_url logger.info(f"Using Cloudflare tunnel URL: {base_url}") elif config.is_development: base_url = f"https://localhost:{config.port}" else: base_url = f"https://{config.host}:{config.port}" # Create a custom OAuth provider that extends PersistentOAuthProvider # This ensures /authorize works with FastMCP's built-in routing and persists tokens class VivintOAuthProvider(PersistentOAuthProvider): def __init__(self, base_url: str): super().__init__(base_url=base_url) # Store pending authorization requests self.pending_auth = {} # Store authenticated sessions (in production, use a proper session store) self.authenticated_sessions = {} async def register_client(self, client_info: OAuthClientInformationFull) -> None: """Register a client with optional restriction for new clients.""" # Check if new client registration is disabled if config.oauth_disable_new_clients: # Check if this is an existing client if client_info.client_id not in self.clients: logger.warning(f"🚫 New client registration blocked: {client_info.client_id}") raise ValueError("New client registration is currently disabled. Only existing clients can authenticate.") else: logger.info(f"✅ Existing client allowed: {client_info.client_id}") # Proceed with registration (existing clients can re-register) await super().register_client(client_info) logger.info(f"📝 Client registered: {client_info.client_id}") async def authorize(self, client: OAuthClientInformationFull, params: AuthorizationParams) -> str: """FastMCP-compatible OAuth authorization with user login.""" logger.info(f"🔍 VivintOAuthProvider.authorize() called for client {client.client_id}") logger.info(f"🔍 Redirect URI requested: {params.redirect_uri}") # Check if new client authorization is disabled if config.oauth_disable_new_clients: logger.warning(f"🚫 Authorization blocked - new client access disabled: {client.client_id}") raise ValueError("New client authorization is currently disabled. The server is locked to existing authenticated clients only.") # Check if user has an authenticated session # For now, we'll check if environment credentials are available (proof of concept) # In a full implementation, you'd check for a session cookie or token session_id = None # In real implementation, extract from request cookies if session_id and session_id in self.authenticated_sessions: # User is already authenticated, issue authorization code logger.info("✅ User session authenticated, issuing authorization code") return await self._issue_authorization_code(client, params) # For testing purposes, you can uncomment this to skip login form: # if self._validate_environment_credentials(): # logger.info("✅ Environment credentials validated, issuing authorization code") # return await self._issue_authorization_code(client, params) # User not authenticated - redirect to login page logger.info("❌ User not authenticated, redirecting to login page") # Generate a temporary auth request ID auth_request_id = secrets.token_urlsafe(32) # Store the authorization request parameters self.pending_auth[auth_request_id] = { 'client': client, 'params': params, 'created_at': time.time() } # FastMCP expects this method to return a redirect URL # We redirect to our custom login page with the auth request ID login_url = f"{base_url}/oauth/login?request_id={auth_request_id}" logger.info(f"🔗 Redirecting to login page: {login_url}") return login_url def _validate_environment_credentials(self) -> bool: """Validate that required Vivint credentials are present in environment.""" try: # Check if credentials exist in config if not (config.username and config.password and len(config.username.strip()) > 0 and len(config.password.strip()) > 0): logger.debug("❌ Vivint credentials not found in environment") return False logger.debug("✅ Vivint credentials found in environment") return True except Exception as e: logger.error(f"Error validating credentials: {e}") return False def _validate_user_credentials(self, username: str, password: str) -> bool: """Validate user-provided credentials against environment.""" try: # Validate against environment credentials return (username == config.username and password == config.password) except Exception as e: logger.error(f"Error validating user credentials: {e}") return False async def _issue_authorization_code(self, client: OAuthClientInformationFull, params: AuthorizationParams) -> str: """Issue an authorization code after successful authentication.""" from mcp.server.auth.provider import AuthorizationCode, construct_redirect_uri # Generate authorization code auth_code_value = f"vivint_auth_{secrets.token_hex(32)}" expires_at = time.time() + 300 # 5 minute expiry # Store authorization code auth_code = AuthorizationCode( code=auth_code_value, client_id=client.client_id, redirect_uri=params.redirect_uri, redirect_uri_provided_explicitly=params.redirect_uri_provided_explicitly, scopes=params.scopes or [], expires_at=expires_at, code_challenge=params.code_challenge, ) self.auth_codes[auth_code_value] = auth_code logger.info(f"✅ Issued authorization code for client {client.client_id}") # Build redirect URL return construct_redirect_uri( str(params.redirect_uri), code=auth_code_value, state=params.state ) oauth_provider = VivintOAuthProvider(base_url=base_url) # Register the configured client if credentials exist if config.oauth_client_id and config.oauth_client_secret: # Convert configured redirect URIs to AnyHttpUrl objects # Store redirect URIs as strings to match HTTP request parameter types redirect_uris = [uri.strip() for uri in config.oauth_redirect_uris if uri.strip()] client_info = OAuthClientInformationFull( client_id=config.oauth_client_id, client_secret=config.oauth_client_secret, client_name="Claude Desktop", redirect_uris=redirect_uris, grant_types=["authorization_code", "refresh_token"], response_types=["code"], scope="claudeai vivint:read" # Support both claudeai and custom scopes ) # Register the client directly (register_client is async, we're in sync context) # This is exactly what InMemoryOAuthProvider.register_client() does internally oauth_provider.clients[config.oauth_client_id] = client_info logger.info(f"✅ Vivint OAuth client registered with client ID: {config.oauth_client_id}") logger.info(f"✅ Vivint OAuth authentication configured with client ID: {config.oauth_client_id}") logger.info(f"OAuth redirect URIs from config: {config.oauth_redirect_uris}") # Debug: verify client is actually registered logger.info(f"🔍 Provider clients after registration: {list(oauth_provider.clients.keys())}") if config.oauth_client_id in oauth_provider.clients: registered_client = oauth_provider.clients[config.oauth_client_id] logger.info(f"🔍 Registered client redirect URIs: {[str(uri) for uri in registered_client.redirect_uris]}") logger.info(f"🔍 Redirect URI types: {[type(uri).__name__ for uri in registered_client.redirect_uris]}") # Test exact matching against Claude's callback test_uri = "https://claude.ai/api/mcp/auth_callback" logger.info(f"🔍 Testing exact match for: '{test_uri}'") for i, uri in enumerate(registered_client.redirect_uris): logger.info(f"🔍 [{i}] '{uri}' == '{test_uri}': {str(uri) == test_uri}") logger.info(f"🔍 [{i}] repr: {repr(str(uri))} vs {repr(test_uri)}") # Test the EXACT validation that's failing: "if redirect_uri not in self.redirect_uris" logger.info(f"🔍 Testing 'in' operator (the actual validation):") logger.info(f"🔍 '{test_uri}' in redirect_uris: {test_uri in registered_client.redirect_uris}") # Let's also try with AnyHttpUrl objects try: from pydantic import AnyHttpUrl as PydanticAnyHttpUrl test_url_obj = PydanticAnyHttpUrl(test_uri) logger.info(f"🔍 AnyHttpUrl('{test_uri}') in redirect_uris: {test_url_obj in registered_client.redirect_uris}") except Exception as e: logger.warning(f"Could not test AnyHttpUrl object: {e}") else: logger.error(f"❌ Client {config.oauth_client_id} NOT found in provider after registration!") return oauth_provider else: raise ValueError("OAuth client credentials not found - run: python src/generate_oauth_credentials.py") else: raise ValueError(f"Unsupported auth type: {config.auth_type}") except Exception as e: logger.error(f"❌ Authentication setup failed: {str(e)}") import traceback logger.error(f"Full error traceback: {traceback.format_exc()}") if config.is_production: raise # Fail hard in production else: logger.warning("⚠️ Continuing without authentication in development mode") return None # Setup authentication auth_provider = setup_authentication() # Initialize MCP server with authentication mcp = FastMCP("Vivint Security System MCP Server", auth=auth_provider) # Add custom OAuth login routes if using OAuth provider if auth_provider and hasattr(auth_provider, 'pending_auth'): from starlette.responses import HTMLResponse, RedirectResponse @mcp.custom_route("/oauth/login", methods=["GET", "POST"]) async def oauth_login_handler(request): """Handle OAuth login form display and submission.""" if request.method == "GET": # Show login form request_id = request.query_params.get("request_id") if not request_id or request_id not in auth_provider.pending_auth: return HTMLResponse("Invalid or expired authorization request", status_code=400) # Check if login is locked out locked_out, seconds_remaining = is_login_locked_out() if locked_out: minutes_remaining = seconds_remaining // 60 seconds_part = seconds_remaining % 60 time_display = f"{minutes_remaining}m {seconds_part}s" if minutes_remaining > 0 else f"{seconds_part}s" lockout_html = f""" <!DOCTYPE html> <html> <head> <title>Login Temporarily Locked</title> <style> body {{ font-family: Arial, sans-serif; max-width: 400px; margin: 50px auto; padding: 20px; }} .error {{ color: red; text-align: center; }} .lockout {{ background: #ffe6e6; border: 1px solid #ff9999; padding: 20px; border-radius: 8px; }} .timer {{ font-size: 24px; font-weight: bold; color: #d00; }} </style> <meta http-equiv="refresh" content="10"> </head> <body> <div class="lockout"> <h2>🔒 Login Temporarily Locked</h2> <p>Too many failed login attempts have been detected.</p> <p>Please wait <span class="timer">{time_display}</span> before trying again.</p> <p><small>This page will refresh automatically every 10 seconds.</small></p> </div> </body> </html> """ return HTMLResponse(lockout_html, status_code=429) # Create simple login form HTML html_content = f""" <!DOCTYPE html> <html> <head> <title>Vivint MCP Server - Login</title> <style> body {{ font-family: Arial, sans-serif; max-width: 400px; margin: 50px auto; padding: 20px; }} .form-group {{ margin-bottom: 15px; }} label {{ display: block; margin-bottom: 5px; font-weight: bold; }} input[type="text"], input[type="password"] {{ width: 100%; padding: 8px; border: 1px solid #ddd; border-radius: 4px; }} button {{ background-color: #007cba; color: white; padding: 10px 20px; border: none; border-radius: 4px; cursor: pointer; width: 100%; }} button:hover {{ background-color: #005a87; }} .error {{ color: red; margin-top: 10px; }} .header {{ text-align: center; margin-bottom: 30px; }} </style> </head> <body> <div class="header"> <h2>🏠 Vivint MCP Server</h2> <p>Please enter your Vivint credentials to authorize access</p> </div> <form method="post" action="/oauth/login"> <input type="hidden" name="request_id" value="{request_id}"> <div class="form-group"> <label for="username">Vivint Username:</label> <input type="text" id="username" name="username" required> </div> <div class="form-group"> <label for="password">Vivint Password:</label> <input type="password" id="password" name="password" required> </div> <button type="submit">Login & Authorize</button> </form> <p style="font-size: 12px; color: #666; margin-top: 20px; text-align: center;"> Your credentials are used only to verify your Vivint account access. </p> </body> </html> """ return HTMLResponse(html_content) elif request.method == "POST": # Handle login form submission form_data = await request.form() request_id = form_data.get("request_id") username = form_data.get("username") password = form_data.get("password") if not request_id or request_id not in auth_provider.pending_auth: return HTMLResponse("Invalid or expired authorization request", status_code=400) # Check if login is locked out locked_out, seconds_remaining = is_login_locked_out() if locked_out: minutes_remaining = seconds_remaining // 60 seconds_part = seconds_remaining % 60 time_display = f"{minutes_remaining}m {seconds_part}s" if minutes_remaining > 0 else f"{seconds_part}s" lockout_html = f""" <!DOCTYPE html> <html> <head><title>Login Locked</title><style>body{{font-family: Arial, sans-serif; max-width: 400px; margin: 50px auto; padding: 20px;}} .error{{color: red; text-align: center;}} .lockout{{background: #ffe6e6; border: 1px solid #ff9999; padding: 20px; border-radius: 8px;}} .timer{{font-size: 18px; font-weight: bold; color: #d00;}}</style></head> <body> <div class="lockout"> <h2>🔒 Login Locked</h2> <p>Authentication is temporarily locked due to failed attempts.</p> <p>Time remaining: <span class="timer">{time_display}</span></p> </div> </body> </html> """ return HTMLResponse(lockout_html, status_code=429) # Validate credentials if not auth_provider._validate_user_credentials(username, password): # Record failed login attempt and check for lockout lockout_triggered = record_failed_login() if lockout_triggered: # Return lockout page lockout_html = f""" <!DOCTYPE html> <html> <head><title>Login Locked</title><style>body{{font-family: Arial, sans-serif; max-width: 400px; margin: 50px auto; padding: 20px;}} .error{{color: red; text-align: center;}} .lockout{{background: #ffe6e6; border: 1px solid #ff9999; padding: 20px; border-radius: 8px;}} .timer{{font-size: 18px; font-weight: bold; color: #d00;}}</style></head> <body> <div class="lockout"> <h2>🔒 Login Locked</h2> <p>Too many failed login attempts. Authentication is now locked for {config.rate_limit_lockout_minutes} minutes.</p> <p><strong>This affects all users until the lockout expires.</strong></p> </div> </body> </html> """ return HTMLResponse(lockout_html, status_code=429) else: # Return regular error page html_content = f""" <!DOCTYPE html> <html> <head><title>Login Error</title><style>body{{font-family: Arial, sans-serif; max-width: 400px; margin: 50px auto; padding: 20px;}}</style></head> <body> <h2>❌ Authentication Failed</h2> <p>Invalid Vivint credentials. Please check your username and password.</p> <p><small>⚠️ Warning: Failed attempts may result in temporary lockout.</small></p> <a href="/oauth/login?request_id={request_id}">← Try Again</a> </body> </html> """ return HTMLResponse(html_content, status_code=401) # Login successful - reset rate limiting reset_login_attempts() # Get stored authorization request auth_request = auth_provider.pending_auth.pop(request_id) client = auth_request['client'] params = auth_request['params'] # Issue authorization code redirect_url = await auth_provider._issue_authorization_code(client, params) # Redirect to client with authorization code return RedirectResponse(redirect_url, status_code=302) # Session middleware will be added when we create custom routes below # Global connection state _connection_initialized = False # Rate limiting state (in-memory) _login_lockout_until = 0 # Timestamp when lockout expires _failed_login_count = 0 # Counter for failed login attempts def is_login_locked_out() -> tuple[bool, int]: """Check if login is currently locked out. Returns: tuple: (is_locked_out, seconds_remaining) """ if not config.rate_limit_enabled: return False, 0 current_time = time.time() if current_time < _login_lockout_until: seconds_remaining = int(_login_lockout_until - current_time) return True, seconds_remaining return False, 0 def record_failed_login() -> bool: """Record a failed login attempt and check if lockout should be triggered. Returns: bool: True if lockout was triggered, False otherwise """ global _failed_login_count, _login_lockout_until if not config.rate_limit_enabled: return False _failed_login_count += 1 logger.warning(f"🚨 Failed login attempt #{_failed_login_count}") if _failed_login_count >= config.rate_limit_max_attempts: lockout_duration = config.rate_limit_lockout_minutes * 60 # Convert to seconds _login_lockout_until = time.time() + lockout_duration logger.error(f"🔒 LOGIN LOCKOUT TRIGGERED - Locked for {config.rate_limit_lockout_minutes} minutes") return True return False def reset_login_attempts(): """Reset failed login counter after successful login.""" global _failed_login_count, _login_lockout_until if _failed_login_count > 0: logger.info("✅ Login successful - resetting failed attempt counter") _failed_login_count = 0 _login_lockout_until = 0 async def ensure_vivint_connection(): """Ensure Vivint connection is established.""" global _connection_initialized if not _connection_initialized: try: await vivint_client.connect() _connection_initialized = True logger.info("Vivint connection established") except VivintMfaRequiredError as e: logger.error(f"MFA required for Vivint connection: {str(e)}") raise ValueError( f"MFA authentication required: {str(e)} " "Please set VIVINT_MFA_CODE environment variable with your current 2FA code." ) except VivintAuthenticationError as e: logger.error(f"Vivint authentication failed: {str(e)}") raise ValueError(f"Authentication failed: {str(e)}") except Exception as e: logger.error(f"Failed to establish Vivint connection: {str(e)}") raise ValueError(f"Connection failed: {str(e)}") @mcp.tool(description="Get current security system status including armed state, mode, and alerts") async def get_system_status() -> Dict[str, Any]: """Get the current status of the Vivint security system.""" try: await ensure_vivint_connection() system_info = await vivint_client.get_system() # Get system object for more detailed info system_obj = next(s for s in vivint_client.client.systems if s.id == system_info["id"]) return { "armed": not system_info.get("is_disarmed", True), # True if not disarmed "arm_state": system_info.get("arm_state", "unknown"), "is_disarmed": system_info.get("is_disarmed", True), "is_armed_stay": system_info.get("is_armed_stay", False), "is_armed_away": system_info.get("is_armed_away", False), "panel_id": system_info.get("panel_id", system_info["id"]), "panel_name": system_info.get("panel_name", system_info["name"]), "system_name": system_info["name"], "system_id": system_info["id"], "panel_count": system_info.get("panel_count", 1), "user_count": system_info.get("user_count", 0), "is_admin": system_info.get("is_admin", False), "timestamp": datetime.now().isoformat() } except VivintClientError as e: logger.error(f"Vivint client error in get_system_status: {str(e)}") return {"error": str(e), "timestamp": datetime.now().isoformat()} except Exception as e: logger.error(f"Unexpected error in get_system_status: {str(e)}") return {"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()} @mcp.tool(description="Get complete inventory of all devices including type, location, and status") async def get_all_devices() -> List[Dict[str, Any]]: """Get comprehensive list of all devices in the system.""" try: await ensure_vivint_connection() devices = await vivint_client.get_all_devices() # Enhance device information for device in devices: device["timestamp"] = datetime.now().isoformat() # Add health status battery_level = device.get("battery_level") if battery_level is not None: try: # Convert battery level to int if it's a string battery_int = int(float(str(battery_level))) if battery_level != "None" else None if battery_int is not None: device["battery_level"] = battery_int if battery_int < 20: device["health_status"] = "low_battery" elif battery_int < 50: device["health_status"] = "medium_battery" else: device["health_status"] = "good" else: device["health_status"] = "no_battery" except (ValueError, TypeError): device["health_status"] = "battery_unknown" else: device["health_status"] = "unknown" if not device.get("is_online", True) else "good" return devices except VivintClientError as e: logger.error(f"Vivint client error in get_all_devices: {str(e)}") return [{"error": str(e), "timestamp": datetime.now().isoformat()}] except Exception as e: logger.error(f"Unexpected error in get_all_devices: {str(e)}") return [{"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()}] @mcp.tool(description="Get status of security sensors (motion, door/window, glass break, smoke, etc.)") async def get_security_sensors() -> List[Dict[str, Any]]: """Get status of all security-related sensors.""" try: await ensure_vivint_connection() all_devices = await vivint_client.get_all_devices() # Filter for security sensors security_types = ['sensor', 'motion', 'door', 'window', 'glass', 'smoke', 'co', 'flood'] security_sensors = [] for device in all_devices: device_type_lower = device["type"].lower() if any(sensor_type in device_type_lower for sensor_type in security_types): sensor_info = { **device, "sensor_type": device["type"], "triggered": device.get("state", "").lower() in ['triggered', 'open', 'motion', 'alarm'], "bypassed": device.get('bypassed', False), "zone_id": device.get('zone_id', None), "timestamp": datetime.now().isoformat() } security_sensors.append(sensor_info) return security_sensors except VivintClientError as e: logger.error(f"Vivint client error in get_security_sensors: {str(e)}") return [{"error": str(e), "timestamp": datetime.now().isoformat()}] except Exception as e: logger.error(f"Unexpected error in get_security_sensors: {str(e)}") return [{"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()}] @mcp.tool(description="Get status and information about cameras in the system") async def get_cameras() -> List[Dict[str, Any]]: """Get information about all cameras in the system.""" try: await ensure_vivint_connection() camera_devices = await vivint_client.get_devices_by_type('camera') cameras = [] for device in camera_devices: camera_info = { **device, "recording": device.get('recording', False), "motion_detection": device.get('motion_detection_enabled', True), "resolution": device.get('resolution', 'Unknown'), "has_night_vision": device.get('night_vision', False), "rtsp_available": 'rtsp_url' in device, "timestamp": datetime.now().isoformat() } cameras.append(camera_info) return cameras except VivintClientError as e: logger.error(f"Vivint client error in get_cameras: {str(e)}") return [{"error": str(e), "timestamp": datetime.now().isoformat()}] except Exception as e: logger.error(f"Unexpected error in get_cameras: {str(e)}") return [{"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()}] @mcp.tool(description="Get status of smart locks including lock state and battery level") async def get_locks() -> List[Dict[str, Any]]: """Get information about all smart locks in the system.""" try: await ensure_vivint_connection() lock_devices = await vivint_client.get_devices_by_type('lock') locks = [] for device in lock_devices: lock_info = { **device, "locked": device.get("state", "unknown").lower() == 'locked', "last_operated_by": device.get('last_operated_by', 'Unknown'), "last_operated_at": device.get('last_operated_time', None), "auto_lock_enabled": device.get('auto_lock', True), "tamper_status": device.get('tamper', 'normal'), "timestamp": datetime.now().isoformat() } locks.append(lock_info) return locks except VivintClientError as e: logger.error(f"Vivint client error in get_locks: {str(e)}") return [{"error": str(e), "timestamp": datetime.now().isoformat()}] except Exception as e: logger.error(f"Unexpected error in get_locks: {str(e)}") return [{"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()}] @mcp.tool(description="Get thermostat data including current temperature, settings, and mode") async def get_thermostats() -> List[Dict[str, Any]]: """Get information about all thermostats in the system.""" try: await ensure_vivint_connection() thermostat_devices = await vivint_client.get_devices_by_type('thermostat') thermostats = [] for device in thermostat_devices: thermostat_info = { **device, "current_temperature": device.get('current_temperature', None), "target_temperature": device.get('target_temperature', None), "heat_setpoint": device.get('heat_setpoint', None), "cool_setpoint": device.get('cool_setpoint', None), "mode": device.get('mode', 'unknown'), "fan_mode": device.get('fan_mode', 'auto'), "humidity": device.get('humidity', None), "schedule_active": device.get('schedule_active', False), "timestamp": datetime.now().isoformat() } thermostats.append(thermostat_info) return thermostats except VivintClientError as e: logger.error(f"Vivint client error in get_thermostats: {str(e)}") return [{"error": str(e), "timestamp": datetime.now().isoformat()}] except Exception as e: logger.error(f"Unexpected error in get_thermostats: {str(e)}") return [{"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()}] @mcp.tool(description="Get recent system events and activity log from the last 24 hours") async def get_recent_events(hours: int = 24) -> List[Dict[str, Any]]: """Get recent events from the system activity log.""" try: await ensure_vivint_connection() # Note: This is a placeholder implementation as vivintpy may not have direct event access # In a real implementation, you would access the event log through the API events = [] # Get current system state as an "event" system_info = await vivint_client.get_system() current_event = { "id": f"status_{datetime.now().strftime('%Y%m%d_%H%M%S')}", "type": "system_status", "description": f"System is {'disarmed' if system_info.get('is_disarmed', True) else 'armed'} ({system_info.get('arm_state', 'unknown')})", "timestamp": datetime.now().isoformat(), "device_id": system_info["panel_id"], "device_name": "Security Panel" } events.append(current_event) # Add recent device state changes (simulated - would need real event API) devices = await vivint_client.get_all_devices() for device in devices[:5]: # Limit to recent devices if device.get("last_update"): event = { "id": f"device_{device['id']}_{device['last_update']}", "type": "device_update", "description": f"{device['name']} state: {device['state']}", "timestamp": device["last_update"] or datetime.now().isoformat(), "device_id": device["id"], "device_name": device["name"] } events.append(event) # Sort by timestamp (newest first) events.sort(key=lambda x: x["timestamp"], reverse=True) return events[:50] # Limit to 50 most recent events except VivintClientError as e: logger.error(f"Vivint client error in get_recent_events: {str(e)}") return [{"error": str(e), "timestamp": datetime.now().isoformat()}] except Exception as e: logger.error(f"Unexpected error in get_recent_events: {str(e)}") return [{"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()}] @mcp.tool(description="Get device health status including battery levels and connectivity issues") async def get_device_health() -> Dict[str, Any]: """Get comprehensive device health status.""" try: await ensure_vivint_connection() all_devices = await vivint_client.get_all_devices() health_summary = { "total_devices": len(all_devices), "online_devices": 0, "offline_devices": 0, "low_battery_devices": [], "offline_device_list": [], "devices_needing_attention": [], "timestamp": datetime.now().isoformat() } for device in all_devices: # Check online status if device.get("online", True): health_summary["online_devices"] += 1 else: health_summary["offline_devices"] += 1 health_summary["offline_device_list"].append({ "id": device["id"], "name": device["name"], "type": device["type"], "last_seen": device.get("last_update") }) # Check battery levels battery_level = device.get("battery_level") if battery_level is not None: try: battery_int = int(float(str(battery_level))) if str(battery_level).lower() != "none" else None if battery_int is not None and battery_int < 20: health_summary["low_battery_devices"].append({ "id": device["id"], "name": device["name"], "type": device["type"], "battery_level": battery_int }) except (ValueError, TypeError): pass # Skip invalid battery levels # Check for devices needing attention needs_attention = False reasons = [] if not device.get("online", True): needs_attention = True reasons.append("offline") try: battery_val = device.get("battery_level", 100) battery_check = int(float(str(battery_val))) if str(battery_val).lower() != "none" else 100 except (ValueError, TypeError): battery_check = 100 if battery_check < 20: needs_attention = True reasons.append("low_battery") if device.get('tamper', False): needs_attention = True reasons.append("tamper_alert") if needs_attention: health_summary["devices_needing_attention"].append({ "id": device["id"], "name": device["name"], "type": device["type"], "reasons": reasons, "battery_level": device.get("battery_level") }) # Sort low battery devices by battery level health_summary["low_battery_devices"].sort(key=lambda x: x["battery_level"]) return health_summary except VivintClientError as e: logger.error(f"Vivint client error in get_device_health: {str(e)}") return {"error": str(e), "timestamp": datetime.now().isoformat()} except Exception as e: logger.error(f"Unexpected error in get_device_health: {str(e)}") return {"error": f"Unexpected error: {str(e)}", "timestamp": datetime.now().isoformat()} # Debug endpoint to inspect OAuth state if config.auth_type == "oauth": @mcp.custom_route("/debug/oauth", methods=["GET"]) async def debug_oauth(request: Request): """Debug endpoint to inspect registered OAuth clients and redirect URIs.""" try: from starlette.responses import JSONResponse # Check if debug mode is enabled if not config.debug_mode: return JSONResponse( {"error": "Debug endpoints are disabled. Set DEBUG_MODE=true in .env to enable."}, status_code=403 ) if not auth_provider or not hasattr(auth_provider, 'clients'): return JSONResponse({"error": "No OAuth provider configured"}, status_code=500) data = { "auth_type": config.auth_type, "base_url": config.cloudflare_tunnel_url or f"http://{config.host}:{config.port}", "clients": [ { "client_id": cid, "redirect_uris": [str(u) for u in info.redirect_uris], "scope": info.scope, } for cid, info in auth_provider.clients.items() ], } logger.info(f"/debug/oauth -> {data}") return JSONResponse(data) except Exception as e: from starlette.responses import JSONResponse logger.error(f"/debug/oauth error: {e}") return JSONResponse({"error": str(e)}, status_code=500) # Cleanup function async def cleanup(): """Cleanup resources on shutdown.""" try: await vivint_client.disconnect() logger.info("Vivint client disconnected") except Exception as e: logger.error(f"Error during cleanup: {str(e)}") if __name__ == "__main__": port = config.port host = config.host logger.info(f"Starting Vivint MCP Server on {host}:{port}") logger.info(f"Environment: {config.environment}") logger.info(f"Debug mode: {config.debug_mode}") # Authentication status logging if auth_provider: if config.auth_type == "oauth": logger.info("🔐 OAuth authentication enabled - server supports OAuth client flow") else: logger.info("🔐 Authentication enabled - server requires valid tokens") else: logger.warning("⚠️ Authentication disabled - server is PUBLICLY accessible!") try: # Use configurable transport from environment transport_type = config.transport logger.info(f"🚀 Starting MCP server with {transport_type.upper()} transport on {host}:{port}") if transport_type == "stdio": # STDIO transport doesn't use host/port mcp.run(transport="stdio") else: # HTTP and SSE transports use host/port mcp.run( transport=transport_type, host=host, port=port ) except KeyboardInterrupt: logger.info("Received interrupt signal, shutting down...") except Exception as e: logger.error(f"Server error: {str(e)}") sys.exit(1) finally: # Run cleanup try: asyncio.run(cleanup()) except Exception as e: logger.error(f"Cleanup error: {str(e)}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bradmb/vivint-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server