Skip to main content
Glama

Vivint Security System MCP Server

by bradmb
template_free_oauth_provider.py23.5 kB
#!/usr/bin/env python3 """Template-free Vivint OAuth provider that generates HTML directly.""" import secrets import time import logging from typing import Dict, Optional, Any, List from starlette.requests import Request from starlette.responses import HTMLResponse, RedirectResponse from fastmcp.server.auth.providers.in_memory import InMemoryOAuthProvider from mcp.server.auth.provider import AuthorizationParams, AuthorizeError, construct_redirect_uri from mcp.shared.auth import OAuthClientInformationFull try: from .vivint_client import VivintMCPClient, VivintAuthenticationError, VivintMfaRequiredError from .config import config except ImportError: # Handle case when run directly import sys import os sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from vivint_client import VivintMCPClient, VivintAuthenticationError, VivintMfaRequiredError from config import config logger = logging.getLogger(__name__) class TemplateFreeOAuthProvider(InMemoryOAuthProvider): """OAuth provider that generates HTML directly without template dependencies.""" def __init__(self, base_url: str): """Initialize the Vivint OAuth provider.""" super().__init__(base_url=base_url) logger.info(f"TemplateFreeOAuthProvider initialized with base_url={base_url}") self._authenticated_sessions: Dict[str, str] = {} # session_id -> username def get_authorize_url_params(self, request: Request) -> Dict[str, Any]: """Extract OAuth parameters from authorization request.""" params = { "response_type": request.query_params.get("response_type"), "client_id": request.query_params.get("client_id"), "redirect_uri": request.query_params.get("redirect_uri"), "scope": request.query_params.get("scope"), "state": request.query_params.get("state"), "code_challenge": request.query_params.get("code_challenge"), "code_challenge_method": request.query_params.get("code_challenge_method"), } logger.info( "OAuth authorize GET params: client_id=%s redirect_uri=%s scope=%s state=%s code_challenge_method=%s", params.get("client_id"), params.get("redirect_uri"), params.get("scope"), params.get("state"), params.get("code_challenge_method") ) return params async def get_authorize_params_from_form(self, request: Request) -> Dict[str, Any]: """Extract OAuth parameters from form data (for POST requests).""" form_data = await request.form() params = { "response_type": form_data.get("response_type"), "client_id": form_data.get("client_id"), "redirect_uri": form_data.get("redirect_uri"), "scope": form_data.get("scope"), "state": form_data.get("state"), "code_challenge": form_data.get("code_challenge"), "code_challenge_method": form_data.get("code_challenge_method"), } logger.info( "OAuth authorize POST params: client_id=%s redirect_uri=%s scope=%s state=%s code_challenge_method=%s", params.get("client_id"), params.get("redirect_uri"), params.get("scope"), params.get("state"), params.get("code_challenge_method") ) return params async def handle_authorization_request(self, request: Request) -> str: """Handle GET /authorize - show login form.""" auth_params = self.get_authorize_url_params(request) client_id = auth_params.get("client_id") redirect_uri = auth_params.get("redirect_uri") # Auto-register Claude Desktop client if it doesn't exist if client_id and client_id not in self.clients: logger.info(f"Auto-registering client: {client_id}") # Import here to avoid circular imports from mcp.shared.auth import OAuthClientInformationFull from pydantic import AnyHttpUrl # Default redirect URIs for Claude Desktop and testing default_redirect_uris = [ "https://claude.ai/api/mcp/auth_callback", "http://localhost:3000/callback", "http://localhost:8080/callback" ] # Add the requested redirect URI if provided if redirect_uri and redirect_uri not in default_redirect_uris: default_redirect_uris.append(redirect_uri) client_info = OAuthClientInformationFull( client_id=client_id, client_secret="auto-generated-secret", # Simplified for testing client_name="Claude Desktop (Auto-registered)", redirect_uris=[AnyHttpUrl(uri) for uri in default_redirect_uris], grant_types=["authorization_code", "refresh_token"], response_types=["code"], scope="claudeai vivint:read" ) self.clients[client_id] = client_info logger.info(f"Auto-registered client {client_id} with redirect URIs: {default_redirect_uris}") # Validate client if not client_id or client_id not in self.clients: logger.warning("OAuth authorize: unknown client_id=%s. Available=%s", client_id, list(self.clients.keys())) return self.render_error_page( "Invalid Client", f"Unknown client ID: {client_id}. Available clients: {list(self.clients.keys())}" ) client_info = self.clients[client_id] logger.info("OAuth authorize: found client_id=%s with %d redirect URIs", client_id, len(client_info.redirect_uris)) # Validate redirect URI if redirect_uri: registered_uris = [str(uri) for uri in client_info.redirect_uris] logger.info("OAuth authorize: requested redirect_uri=%s; registered=%s", redirect_uri, registered_uris) if redirect_uri not in registered_uris: logger.warning(f"Redirect URI mismatch. Requested: {redirect_uri}, Registered: {registered_uris}") return self.render_error_page( "Invalid Redirect URI", f"Redirect URI '{redirect_uri}' not registered for client. Registered URIs: {', '.join(registered_uris)}" ) logger.info("OAuth authorize: redirect URI accepted; rendering login page") # Show login form return self.render_login_page(client_info, auth_params) async def handle_authorization_post( self, request: Request, username: str, password: str, mfa_code: Optional[str] = None, action: str = "login" ): """Handle POST /authorize - process login or consent.""" auth_params = await self.get_authorize_params_from_form(request) client_id = auth_params.get("client_id") redirect_uri = auth_params.get("redirect_uri") # Auto-register client if needed (same as GET handler) if client_id and client_id not in self.clients: logger.info(f"Auto-registering client in POST: {client_id}") from mcp.shared.auth import OAuthClientInformationFull from pydantic import AnyHttpUrl default_redirect_uris = [ "https://claude.ai/api/mcp/auth_callback", "http://localhost:3000/callback", "http://localhost:8080/callback" ] if redirect_uri and redirect_uri not in default_redirect_uris: default_redirect_uris.append(redirect_uri) client_info = OAuthClientInformationFull( client_id=client_id, client_secret="auto-generated-secret", client_name="Claude Desktop (Auto-registered)", redirect_uris=[AnyHttpUrl(uri) for uri in default_redirect_uris], grant_types=["authorization_code", "refresh_token"], response_types=["code"], scope="claudeai vivint:read" ) self.clients[client_id] = client_info if not client_id or client_id not in self.clients: return HTMLResponse( content=self.render_error_page( "Invalid Client", f"Client {client_id} not found. Available: {list(self.clients.keys())}" ), status_code=400 ) client_info = self.clients[client_id] if action == "login": # Authenticate with Vivint success, message = await self.authenticate_with_vivint(username, password, mfa_code) if success: # Generate authorization code immediately (simplified flow) try: logger.info("OAuth authorize: authentication successful for user; issuing code") redirect_url = await self.issue_authorization_code(client_info, auth_params) logger.info("OAuth authorize: redirecting to %s", redirect_url) return RedirectResponse(url=redirect_url, status_code=302) except Exception as e: logger.error(f"Failed to issue authorization code: {e}") return HTMLResponse( content=self.render_error_page( "Authorization Error", str(e) ), status_code=500 ) else: # Show error return HTMLResponse( content=self.render_login_page( client_info, auth_params, error=message ) ) else: return HTMLResponse( content=self.render_error_page( "Invalid Request", f"Unknown action: {action}" ), status_code=400 ) async def authenticate_with_vivint( self, username: str, password: str, mfa_code: Optional[str] = None ) -> tuple[bool, str]: """Simple Vivint authentication check.""" # For testing OAuth flow, allow a test user if username == "test@example.com" and password == "test123": logger.info(f"✅ Test authentication successful for: {username}") return True, "Authentication successful" # Check if credentials match environment (if set) if config.username and config.password: if username == config.username and password == config.password: logger.info(f"✅ Vivint authentication successful for: {username}") return True, "Authentication successful" logger.warning(f"❌ Vivint authentication failed for: {username}") return False, "Invalid Vivint credentials" async def issue_authorization_code( self, client_info: OAuthClientInformationFull, auth_params: Dict[str, Any] ) -> str: """Issue an authorization code after successful authentication.""" # Create authorization parameters object scopes = auth_params.get("scope", "").split() if auth_params.get("scope") else [] # Handle PKCE parameters - use defaults if not provided code_challenge = auth_params.get("code_challenge") code_challenge_method = auth_params.get("code_challenge_method") # If code_challenge is "None" string, None, or empty, use a default if code_challenge in [None, "None", ""]: code_challenge = "placeholder_challenge" # Required by MCP but not used if code_challenge_method in [None, "None", ""]: code_challenge_method = "plain" # Simple method for testing params = AuthorizationParams( response_type=auth_params["response_type"], client_id=auth_params["client_id"], redirect_uri=auth_params["redirect_uri"], redirect_uri_provided_explicitly=True, scopes=scopes, state=auth_params.get("state"), code_challenge=code_challenge ) logger.info( "OAuth code: client_id=%s redirect_uri=%s scopes=%s state=%s", params.client_id, params.redirect_uri, scopes, params.state ) # Generate authorization code auth_code_value = f"vivint_auth_{secrets.token_hex(32)}" expires_at = time.time() + 300 # 5 minute expiry # Validate and filter scopes scopes_list = params.scopes if params.scopes is not None else [] if client_info.scope: client_allowed_scopes = set(client_info.scope.split()) scopes_list = [s for s in scopes_list if s in client_allowed_scopes] # Store authorization code from mcp.server.auth.provider import AuthorizationCode auth_code = AuthorizationCode( code=auth_code_value, client_id=client_info.client_id, redirect_uri=params.redirect_uri, redirect_uri_provided_explicitly=params.redirect_uri_provided_explicitly, scopes=scopes_list, expires_at=expires_at, code_challenge=params.code_challenge, ) self.auth_codes[auth_code_value] = auth_code logger.info(f"✅ Issued authorization code for client {client_info.client_id}") # Build redirect URL return construct_redirect_uri( str(params.redirect_uri), code=auth_code_value, state=params.state ) def render_login_page( self, client_info: OAuthClientInformationFull, auth_params: Dict[str, Any], error: Optional[str] = None ) -> str: """Render the login page as HTML string.""" error_html = "" if error: error_html = f""" <div style="background: #fed7d7; border: 1px solid #feb2b2; color: #c53030; padding: 12px; border-radius: 6px; margin-bottom: 20px;"> <strong>Error:</strong> {error} </div> """ scopes = auth_params.get("scope", "").split() if auth_params.get("scope") else [] scopes_html = "" for scope in scopes: if scope == "claudeai": scope_desc = "🤖 <strong>Claude AI Integration</strong> - Allow Claude Desktop to access your Vivint system" elif scope == "vivint:read": scope_desc = "📖 <strong>Read Access</strong> - View security system status and device information" else: scope_desc = f"🔧 <strong>{scope}</strong> - Custom scope access" scopes_html += f"<div style='background: white; padding: 10px 15px; margin-bottom: 8px; border-radius: 6px; border-left: 3px solid #4299e1;'>{scope_desc}</div>" return f"""<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Vivint Login - {client_info.client_name}</title> <style> * {{ box-sizing: border-box; margin: 0; padding: 0; }} body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); min-height: 100vh; display: flex; align-items: center; justify-content: center; padding: 20px; }} .auth-container {{ background: white; border-radius: 12px; box-shadow: 0 20px 40px rgba(0,0,0,0.1); max-width: 450px; width: 100%; overflow: hidden; }} .auth-header {{ background: #2d3748; color: white; padding: 30px; text-align: center; }} .auth-header h1 {{ font-size: 24px; margin-bottom: 8px; }} .auth-header p {{ opacity: 0.8; font-size: 14px; }} .auth-body {{ padding: 30px; }} .client-info {{ background: #f7fafc; border-left: 4px solid #4299e1; padding: 15px; margin-bottom: 25px; border-radius: 0 6px 6px 0; }} .client-info h3 {{ color: #2d3748; margin-bottom: 5px; }} .client-info p {{ color: #718096; font-size: 14px; }} .form-group {{ margin-bottom: 20px; }} .form-group label {{ display: block; margin-bottom: 8px; font-weight: 500; color: #2d3748; }} .form-group input {{ width: 100%; padding: 12px; border: 2px solid #e2e8f0; border-radius: 6px; font-size: 16px; transition: border-color 0.2s; }} .form-group input:focus {{ outline: none; border-color: #4299e1; }} .auth-button {{ width: 100%; background: #4299e1; color: white; border: none; padding: 15px; border-radius: 6px; font-size: 16px; font-weight: 600; cursor: pointer; transition: background-color 0.2s; }} .auth-button:hover {{ background: #3182ce; }} .security-notice {{ background: #fef5e7; border: 1px solid #f6d55c; color: #744210; padding: 12px; border-radius: 6px; margin-bottom: 20px; font-size: 12px; text-align: center; }} .vivint-logo {{ width: 40px; height: 40px; background: #4299e1; border-radius: 50%; display: inline-flex; align-items: center; justify-content: center; margin-bottom: 10px; font-weight: bold; font-size: 18px; }} .cancel-link {{ display: block; text-align: center; margin-top: 15px; color: #718096; text-decoration: none; font-size: 14px; }} .cancel-link:hover {{ color: #4a5568; }} </style> </head> <body> <div class="auth-container"> <div class="auth-header"> <div class="vivint-logo">V</div> <h1>Vivint Login</h1> <p>Authenticate with your Vivint security system credentials</p> </div> <div class="auth-body"> <div class="client-info"> <h3>{client_info.client_name}</h3> <p>Client ID: {client_info.client_id}</p> <p>Redirect: {auth_params.get('redirect_uri', 'N/A')}</p> </div> {error_html} <form method="post" action="/oauth/authorize"> <div class="form-group"> <label for="username">Vivint Username</label> <input type="email" id="username" name="username" required placeholder="your.email@example.com"> </div> <div class="form-group"> <label for="password">Vivint Password</label> <input type="password" id="password" name="password" required placeholder="Your Vivint password"> </div> <div class="security-notice"> 🛡️ Your credentials are used only for authentication and are not stored. This ensures only you can authorize access to your Vivint system. </div> <input type="hidden" name="action" value="login"> <input type="hidden" name="response_type" value="{auth_params.get('response_type', '')}"> <input type="hidden" name="client_id" value="{auth_params.get('client_id', '')}"> <input type="hidden" name="redirect_uri" value="{auth_params.get('redirect_uri', '')}"> <input type="hidden" name="scope" value="{auth_params.get('scope', '')}"> <input type="hidden" name="state" value="{auth_params.get('state', '')}"> <input type="hidden" name="code_challenge" value="{auth_params.get('code_challenge') or ''}"> <input type="hidden" name="code_challenge_method" value="{auth_params.get('code_challenge_method') or ''}"> <button type="submit" class="auth-button">Sign In</button> </form> <a href="{auth_params.get('redirect_uri', '#')}?error=access_denied&state={auth_params.get('state', '')}" class="cancel-link">Cancel</a> </div> </div> <script> document.addEventListener('DOMContentLoaded', function() {{ document.getElementById('username').focus(); }}); </script> </body> </html>""" def render_error_page(self, title: str, message: str) -> str: """Render an error page as HTML string.""" return f"""<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>{title} - Vivint OAuth</title> <style> * {{ box-sizing: border-box; margin: 0; padding: 0; }} body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; background: linear-gradient(135deg, #f56565 0%, #c53030 100%); min-height: 100vh; display: flex; align-items: center; justify-content: center; padding: 20px; }} .error-container {{ background: white; border-radius: 12px; box-shadow: 0 20px 40px rgba(0,0,0,0.2); max-width: 500px; width: 100%; overflow: hidden; }} .error-header {{ background: #c53030; color: white; padding: 30px; text-align: center; }} .error-icon {{ width: 60px; height: 60px; background: rgba(255,255,255,0.2); border-radius: 50%; display: inline-flex; align-items: center; justify-content: center; margin-bottom: 15px; font-size: 28px; }} .error-header h1 {{ font-size: 24px; margin-bottom: 8px; }} .error-header p {{ opacity: 0.9; font-size: 16px; }} .error-body {{ padding: 30px; text-align: center; }} .error-message {{ background: #fed7d7; border: 2px solid #feb2b2; color: #c53030; padding: 20px; border-radius: 8px; margin-bottom: 25px; font-size: 16px; line-height: 1.5; }} .btn {{ padding: 12px 24px; border-radius: 6px; text-decoration: none; font-weight: 600; font-size: 14px; transition: all 0.2s; border: none; cursor: pointer; display: inline-flex; align-items: center; gap: 8px; }} .btn-primary {{ background: #4299e1; color: white; }} .btn-primary:hover {{ background: #3182ce; }} </style> </head> <body> <div class="error-container"> <div class="error-header"> <div class="error-icon">⚠️</div> <h1>{title}</h1> <p>Something went wrong with your authorization request</p> </div> <div class="error-body"> <div class="error-message"> {message} </div> <button onclick="history.back()" class="btn btn-primary"> 🔄 Try Again </button> </div> </div> </body> </html>"""

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/bradmb/vivint-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server