ServiceNow MCP Server

by osomai
Verified
""" Authentication manager for the ServiceNow MCP server. """ import base64 import logging from typing import Dict, Optional import requests from requests.auth import HTTPBasicAuth from servicenow_mcp.utils.config import AuthConfig, AuthType logger = logging.getLogger(__name__) class AuthManager: """ Authentication manager for ServiceNow API. This class handles authentication with the ServiceNow API using different authentication methods. """ def __init__(self, config: AuthConfig): """ Initialize the authentication manager. Args: config: Authentication configuration. """ self.config = config self.token: Optional[str] = None self.token_type: Optional[str] = None def get_headers(self) -> Dict[str, str]: """ Get the authentication headers for API requests. Returns: Dict[str, str]: Headers to include in API requests. """ headers = { "Accept": "application/json", "Content-Type": "application/json", } if self.config.type == AuthType.BASIC: if not self.config.basic: raise ValueError("Basic auth configuration is required") auth_str = f"{self.config.basic.username}:{self.config.basic.password}" encoded = base64.b64encode(auth_str.encode()).decode() headers["Authorization"] = f"Basic {encoded}" elif self.config.type == AuthType.OAUTH: if not self.token: self._get_oauth_token() headers["Authorization"] = f"{self.token_type} {self.token}" elif self.config.type == AuthType.API_KEY: if not self.config.api_key: raise ValueError("API key configuration is required") headers[self.config.api_key.header_name] = self.config.api_key.api_key return headers def _get_oauth_token(self): """ Get an OAuth token from ServiceNow. Raises: ValueError: If OAuth configuration is missing or token request fails. """ if not self.config.oauth: raise ValueError("OAuth configuration is required") oauth_config = self.config.oauth # Determine token URL token_url = oauth_config.token_url if not token_url: # Extract instance name from instance URL instance_parts = oauth_config.instance_url.split(".") if len(instance_parts) < 2: raise ValueError(f"Invalid instance URL: {oauth_config.instance_url}") instance_name = instance_parts[0].split("//")[-1] token_url = f"https://{instance_name}.service-now.com/oauth_token.do" # Request token data = { "grant_type": "password", "client_id": oauth_config.client_id, "client_secret": oauth_config.client_secret, "username": oauth_config.username, "password": oauth_config.password, } try: response = requests.post(token_url, data=data) response.raise_for_status() token_data = response.json() self.token = token_data.get("access_token") self.token_type = token_data.get("token_type", "Bearer") if not self.token: raise ValueError("No access token in response") except requests.RequestException as e: logger.error(f"Failed to get OAuth token: {e}") raise ValueError(f"Failed to get OAuth token: {e}") def refresh_token(self): """Refresh the OAuth token if using OAuth authentication.""" if self.config.type == AuthType.OAUTH: self._get_oauth_token()
ID: wfdzusqbvb