Skip to main content
Glama
hieuttmmo

EntraID MCP Server

by hieuttmmo
conditional_access.py (13.9 kB)
"""Conditional Access resource module for Microsoft Graph. This module provides access to Microsoft Graph conditional access policy resources. """ import logging from typing import Dict, List, Any from msgraph.generated.identity.conditional_access.policies.policies_request_builder import PoliciesRequestBuilder from utils.graph_client import GraphClient logger = logging.getLogger(__name__) def format_list_for_csv(lst): if not lst: return "" return "; ".join(str(item) for item in lst) async def get_group_details(client, group_ids): group_details = {} for group_id in group_ids: if not group_id: continue if group_id in ['All', 'None', 'GuestsOrExternalUsers', 'GuestOrExternalUserTypes']: group_details[group_id] = group_id continue try: group = await client.groups.by_group_id(group_id).get() group_details[group_id] = f"{getattr(group, 'display_name', group_id)} ({group_id})" except Exception as e: logger.warning(f"Could not fetch details for group {group_id}: {str(e)}") group_details[group_id] = f"Unknown Group ({group_id})" return group_details async def parse_conditions(client, conditions): parsed = { 'Users_Include': [], 'Users_Exclude': [], 'Groups_Include': [], 'Groups_Include_Names': [], 'Groups_Exclude': [], 'Groups_Exclude_Names': [], 'Roles_Include': [], 'Roles_Exclude': [], 'Include_Guest_Or_External_Users': '', 'Exclude_Guest_Or_External_Users': '', 'Apps_Include': [], 'Apps_Exclude': [], 'User_Actions': [], 'Authentication_Context_References': [], 'Application_Filter': '', 'User_Risk_Levels': [], 'Sign_In_Risk_Levels': [], 'Service_Principal_Risk_Levels': [], 'Insider_Risk_Levels': '', 'Client_App_Types': [], 'Platforms': '', 'Locations': '', 'Devices': '', 'Client_Applications': '' } try: if hasattr(conditions, 'user_risk_levels'): parsed['User_Risk_Levels'] = conditions.user_risk_levels or [] if hasattr(conditions, 'sign_in_risk_levels'): parsed['Sign_In_Risk_Levels'] = conditions.sign_in_risk_levels or [] if hasattr(conditions, 
'service_principal_risk_levels'): parsed['Service_Principal_Risk_Levels'] = conditions.service_principal_risk_levels or [] if hasattr(conditions, 'insider_risk_levels'): parsed['Insider_Risk_Levels'] = conditions.insider_risk_levels or '' if hasattr(conditions, 'client_app_types'): parsed['Client_App_Types'] = conditions.client_app_types or [] if hasattr(conditions, 'applications'): if hasattr(conditions.applications, 'include_applications'): parsed['Apps_Include'] = conditions.applications.include_applications or [] if hasattr(conditions.applications, 'exclude_applications'): parsed['Apps_Exclude'] = conditions.applications.exclude_applications or [] if hasattr(conditions.applications, 'include_user_actions'): parsed['User_Actions'] = conditions.applications.include_user_actions or [] if hasattr(conditions.applications, 'include_authentication_context_class_references'): parsed['Authentication_Context_References'] = conditions.applications.include_authentication_context_class_references or [] if hasattr(conditions.applications, 'application_filter'): parsed['Application_Filter'] = conditions.applications.application_filter or '' if hasattr(conditions, 'users'): if hasattr(conditions.users, 'include_users'): parsed['Users_Include'] = conditions.users.include_users or [] if hasattr(conditions.users, 'exclude_users'): parsed['Users_Exclude'] = conditions.users.exclude_users or [] if hasattr(conditions.users, 'include_groups'): parsed['Groups_Include'] = conditions.users.include_groups or [] if hasattr(conditions.users, 'exclude_groups'): parsed['Groups_Exclude'] = conditions.users.exclude_groups or [] if hasattr(conditions.users, 'include_roles'): parsed['Roles_Include'] = conditions.users.include_roles or [] if hasattr(conditions.users, 'exclude_roles'): parsed['Roles_Exclude'] = conditions.users.exclude_roles or [] if hasattr(conditions.users, 'include_guests_or_external_users'): parsed['Include_Guest_Or_External_Users'] = 
str(conditions.users.include_guests_or_external_users or '') if hasattr(conditions.users, 'exclude_guests_or_external_users'): parsed['Exclude_Guest_Or_External_Users'] = str(conditions.users.exclude_guests_or_external_users or '') all_groups = list(set(parsed['Groups_Include'] + parsed['Groups_Exclude'])) if all_groups: group_details = await get_group_details(client, all_groups) parsed['Groups_Include_Names'] = [group_details.get(group_id, f"Unknown Group ({group_id})") for group_id in parsed['Groups_Include'] if group_id] parsed['Groups_Exclude_Names'] = [group_details.get(group_id, f"Unknown Group ({group_id})") for group_id in parsed['Groups_Exclude'] if group_id] parsed['Platforms'] = str(conditions.platforms or '') parsed['Locations'] = str(conditions.locations or '') parsed['Devices'] = str(conditions.devices or '') parsed['Client_Applications'] = str(conditions.client_applications or '') except Exception as e: logger.warning(f"Error parsing conditions: {str(e)}") return {k: format_list_for_csv(v) if isinstance(v, list) else v for k, v in parsed.items()} def parse_grant_controls(grant_controls): try: if not grant_controls: return { 'Operator': '', 'Built_In_Controls': '', 'Custom_Authentication_Factors': '', 'Terms_Of_Use': '', 'Auth_Strength_Id': '', 'Auth_Strength_DisplayName': '', 'Auth_Strength_Description': '', 'Auth_Strength_PolicyType': '', 'Auth_Strength_Requirements': '', 'Auth_Strength_Combinations': '' } parsed = {} parsed['Operator'] = grant_controls.operator if hasattr(grant_controls, 'operator') else '' parsed['Built_In_Controls'] = format_list_for_csv(grant_controls.built_in_controls) if hasattr(grant_controls, 'built_in_controls') else '' parsed['Custom_Authentication_Factors'] = format_list_for_csv(grant_controls.custom_authentication_factors) if hasattr(grant_controls, 'custom_authentication_factors') else '' parsed['Terms_Of_Use'] = format_list_for_csv(grant_controls.terms_of_use) if hasattr(grant_controls, 'terms_of_use') else '' if 
hasattr(grant_controls, 'authentication_strength'): auth_strength = grant_controls.authentication_strength parsed['Auth_Strength_Id'] = getattr(auth_strength, 'id', '') parsed['Auth_Strength_DisplayName'] = getattr(auth_strength, 'display_name', '') parsed['Auth_Strength_Description'] = getattr(auth_strength, 'description', '') parsed['Auth_Strength_PolicyType'] = getattr(auth_strength, 'policy_type', '') parsed['Auth_Strength_Requirements'] = getattr(auth_strength, 'requirements_satisfied', '') parsed['Auth_Strength_Combinations'] = format_list_for_csv(getattr(auth_strength, 'allowed_combinations', [])) else: parsed.update({ 'Auth_Strength_Id': '', 'Auth_Strength_DisplayName': '', 'Auth_Strength_Description': '', 'Auth_Strength_PolicyType': '', 'Auth_Strength_Requirements': '', 'Auth_Strength_Combinations': '' }) return parsed except Exception as e: logger.warning(f"Error parsing grant controls: {str(e)}") return { 'Operator': '', 'Built_In_Controls': '', 'Custom_Authentication_Factors': '', 'Terms_Of_Use': '', 'Auth_Strength_Id': '', 'Auth_Strength_DisplayName': '', 'Auth_Strength_Description': '', 'Auth_Strength_PolicyType': '', 'Auth_Strength_Requirements': '', 'Auth_Strength_Combinations': '' } def parse_session_controls(session_controls): try: if not session_controls: return { 'Disable_Resilience_Defaults': '', 'Application_Enforced_Restrictions': '', 'Cloud_App_Security': '', 'Persistent_Browser': '', 'Sign_In_Frequency_Value': '', 'Sign_In_Frequency_Type': '', 'Sign_In_Frequency_Auth_Type': '', 'Sign_In_Frequency_Interval': '', 'Sign_In_Frequency_IsEnabled': '' } parsed = {} parsed['Disable_Resilience_Defaults'] = str(getattr(session_controls, 'disable_resilience_defaults', '')) parsed['Application_Enforced_Restrictions'] = str(getattr(session_controls, 'application_enforced_restrictions', '')) parsed['Cloud_App_Security'] = str(getattr(session_controls, 'cloud_app_security', '')) parsed['Persistent_Browser'] = str(getattr(session_controls, 
'persistent_browser', '')) if hasattr(session_controls, 'sign_in_frequency'): sign_in_freq = session_controls.sign_in_frequency parsed['Sign_In_Frequency_Value'] = str(getattr(sign_in_freq, 'value', '')) parsed['Sign_In_Frequency_Type'] = str(getattr(sign_in_freq, 'type', '')) parsed['Sign_In_Frequency_Auth_Type'] = str(getattr(sign_in_freq, 'authentication_type', '')) parsed['Sign_In_Frequency_Interval'] = str(getattr(sign_in_freq, 'frequency_interval', '')) parsed['Sign_In_Frequency_IsEnabled'] = 'Yes' if getattr(sign_in_freq, 'is_enabled', False) else 'No' else: parsed.update({ 'Sign_In_Frequency_Value': '', 'Sign_In_Frequency_Type': '', 'Sign_In_Frequency_Auth_Type': '', 'Sign_In_Frequency_Interval': '', 'Sign_In_Frequency_IsEnabled': '' }) return parsed except Exception as e: logger.warning(f"Error parsing session controls: {str(e)}") return { 'Disable_Resilience_Defaults': '', 'Application_Enforced_Restrictions': '', 'Cloud_App_Security': '', 'Persistent_Browser': '', 'Sign_In_Frequency_Value': '', 'Sign_In_Frequency_Type': '', 'Sign_In_Frequency_Auth_Type': '', 'Sign_In_Frequency_Interval': '', 'Sign_In_Frequency_IsEnabled': '' } async def get_conditional_access_policies(graph_client: GraphClient) -> List[Dict[str, Any]]: """Get all conditional access policies with comprehensive details.""" try: client = graph_client.get_client() policies_response = await client.identity.conditional_access.policies.get() policies = [] if policies_response and policies_response.value: for policy in policies_response.value: # Parse conditions conditions = await parse_conditions(client, getattr(policy, 'conditions', None)) # Parse grant controls grant_controls = parse_grant_controls(getattr(policy, 'grant_controls', None)) # Parse session controls session_controls = parse_session_controls(getattr(policy, 'session_controls', None)) # Compose policy data policy_data = { 'id': getattr(policy, 'id', None), 'displayName': getattr(policy, 'display_name', None), 'state': 
getattr(policy, 'state', None).value if getattr(policy, 'state', None) else None, 'createdDateTime': policy.created_date_time.isoformat() if getattr(policy, 'created_date_time', None) else None, 'modifiedDateTime': policy.modified_date_time.isoformat() if getattr(policy, 'modified_date_time', None) else None, **conditions, **{f'Grant_{k}': v for k, v in grant_controls.items()}, **session_controls } policies.append(policy_data) return policies except Exception as e: logger.error(f"Error fetching conditional access policies: {str(e)}") raise async def get_conditional_access_policy_by_id(graph_client: GraphClient, policy_id: str) -> Dict[str, Any]: """Get a single conditional access policy by its ID with comprehensive details.""" try: client = graph_client.get_client() policy = await client.identity.conditional_access.policies.by_conditional_access_policy_id(policy_id).get() if not policy: return {} # Parse conditions conditions = await parse_conditions(client, getattr(policy, 'conditions', None)) # Parse grant controls grant_controls = parse_grant_controls(getattr(policy, 'grant_controls', None)) # Parse session controls session_controls = parse_session_controls(getattr(policy, 'session_controls', None)) # Compose policy data policy_data = { 'id': getattr(policy, 'id', None), 'displayName': getattr(policy, 'display_name', None), 'state': getattr(policy, 'state', None).value if getattr(policy, 'state', None) else None, 'createdDateTime': policy.created_date_time.isoformat() if getattr(policy, 'created_date_time', None) else None, 'modifiedDateTime': policy.modified_date_time.isoformat() if getattr(policy, 'modified_date_time', None) else None, **conditions, **{f'Grant_{k}': v for k, v in grant_controls.items()}, **session_controls } return policy_data except Exception as e: logger.error(f"Error fetching conditional access policy by ID {policy_id}: {str(e)}") raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hieuttmmo/entraid-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.