Skip to main content
Glama
hieuttmmo

EntraID MCP Server

by hieuttmmo
groups.py20.3 kB
"""Groups resource module for Microsoft Graph. This module provides access to Microsoft Graph group resources. """ import logging from typing import Dict, List, Any, Optional from msgraph.generated.groups.groups_request_builder import GroupsRequestBuilder from msgraph.generated.models.group import Group from msgraph.generated.models.directory_object import DirectoryObject from utils.graph_client import GraphClient logger = logging.getLogger(__name__) async def get_all_groups(graph_client: GraphClient, limit: int = 100) -> List[Dict[str, Any]]: """Get all groups (up to the specified limit, with paging).""" try: client = graph_client.get_client() query_params = GroupsRequestBuilder.GroupsRequestBuilderGetQueryParameters(top=limit) request_configuration = GroupsRequestBuilder.GroupsRequestBuilderGetRequestConfiguration(query_parameters=query_params) response = await client.groups.get(request_configuration=request_configuration) groups = [] if response and response.value: groups.extend(response.value) # Paging: fetch more if odata_next_link is present while response is not None and getattr(response, 'odata_next_link', None): response = await client.groups.with_url(response.odata_next_link).get() if response and response.value: groups.extend(response.value) # Format output formatted_groups = [] for group in groups[:limit]: group_data = { 'id': group.id, 'displayName': group.display_name, 'mail': group.mail, 'mailNickname': group.mail_nickname, 'description': group.description, 'groupTypes': group.group_types, 'securityEnabled': group.security_enabled, 'mailEnabled': group.mail_enabled, 'createdDateTime': group.created_date_time.isoformat() if group.created_date_time else None } formatted_groups.append(group_data) return formatted_groups except Exception as e: logger.error(f"Error fetching all groups: {str(e)}") raise async def get_group_by_id(graph_client: GraphClient, group_id: str) -> Optional[Dict[str, Any]]: """Get a specific group by ID.""" try: client = graph_client.get_client() group = await client.groups.by_group_id(group_id).get() if group: group_data = { 'id': group.id, 'displayName': group.display_name, 'mail': group.mail, 'mailNickname': group.mail_nickname, 'description': group.description, 'groupTypes': group.group_types, 'securityEnabled': group.security_enabled, 'mailEnabled': group.mail_enabled, 'visibility': group.visibility, 'createdDateTime': group.created_date_time.isoformat() if group.created_date_time else None } return group_data return None except Exception as e: logger.error(f"Error fetching group {group_id}: {str(e)}") raise async def search_groups_by_name(graph_client: GraphClient, name: str, limit: int = 50) -> List[Dict[str, Any]]: """Search for groups by display name (case-insensitive, partial match, with paging).""" try: client = graph_client.get_client() filter_query = f"startswith(displayName,'{name}')" query_params = GroupsRequestBuilder.GroupsRequestBuilderGetQueryParameters( filter=filter_query, top=limit ) request_configuration = GroupsRequestBuilder.GroupsRequestBuilderGetRequestConfiguration(query_parameters=query_params) response = await client.groups.get(request_configuration=request_configuration) groups = [] if response and response.value: groups.extend(response.value) # Paging while response is not None and getattr(response, 'odata_next_link', None): response = await client.groups.with_url(response.odata_next_link).get() if response and response.value: groups.extend(response.value) formatted_groups = [] for group in groups[:limit]: group_data = { 'id': group.id, 'displayName': group.display_name, 'mail': group.mail, 'mailNickname': group.mail_nickname, 'description': group.description, 'groupTypes': group.group_types, 'securityEnabled': group.security_enabled, 'mailEnabled': group.mail_enabled, 'createdDateTime': group.created_date_time.isoformat() if group.created_date_time else None } formatted_groups.append(group_data) return formatted_groups except Exception as e: logger.error(f"Error searching groups by name: {str(e)}") raise async def get_group_members(graph_client: GraphClient, group_id: str, limit: int = 100) -> List[Dict[str, Any]]: """Get members of a group by group ID (up to the specified limit, with paging).""" try: client = graph_client.get_client() members_response = await client.groups.by_group_id(group_id).members.get() members = [] if members_response and members_response.value: members.extend(members_response.value) # Paging while members_response is not None and getattr(members_response, 'odata_next_link', None): members_response = await client.groups.by_group_id(group_id).members.with_url(members_response.odata_next_link).get() if members_response and members_response.value: members.extend(members_response.value) formatted_members = [] for member in members[:limit]: member_data = { 'id': getattr(member, 'id', None), 'displayName': getattr(member, 'display_name', None), 'mail': getattr(member, 'mail', None), 'userPrincipalName': getattr(member, 'user_principal_name', None), 'givenName': getattr(member, 'given_name', None), 'surname': getattr(member, 'surname', None), 'jobTitle': getattr(member, 'job_title', None), 'officeLocation': getattr(member, 'office_location', None), 'businessPhones': getattr(member, 'business_phones', None), 'mobilePhone': getattr(member, 'mobile_phone', None), 'type': getattr(member, 'odata_type', None) } formatted_members.append(member_data) return formatted_members except Exception as e: logger.error(f"Error fetching group members for group {group_id}: {str(e)}") raise async def create_group(graph_client: GraphClient, group_data: Dict[str, Any]) -> Dict[str, Any]: """Create a new group in Microsoft Graph. Args: graph_client: GraphClient instance group_data: Dictionary containing group properties Returns: The created group data """ try: client = graph_client.get_client() # Check if group already exists with the same display name or mail nickname display_name = group_data.get('displayName') mail_nickname = group_data.get('mailNickname') if display_name: # Check if a group with the same display name already exists filter_query = f"displayName eq '{display_name}'" query_params = GroupsRequestBuilder.GroupsRequestBuilderGetQueryParameters( filter=filter_query ) request_configuration = GroupsRequestBuilder.GroupsRequestBuilderGetRequestConfiguration(query_parameters=query_params) response = await client.groups.get(request_configuration=request_configuration) if response and response.value and len(response.value) > 0: logger.info(f"Group with display name '{display_name}' already exists") # Return the existing group existing_group = response.value[0] return { 'id': existing_group.id, 'displayName': existing_group.display_name, 'mail': existing_group.mail, 'mailNickname': existing_group.mail_nickname, 'description': existing_group.description, 'groupTypes': existing_group.group_types, 'securityEnabled': existing_group.security_enabled, 'mailEnabled': existing_group.mail_enabled, 'visibility': existing_group.visibility, 'createdDateTime': existing_group.created_date_time.isoformat() if existing_group.created_date_time else None, 'status': 'already_exists' } # Create a group object with the provided data group = Group() # Set required properties if 'displayName' in group_data: group.display_name = group_data['displayName'] else: raise ValueError("displayName is required for creating a group") if 'mailNickname' in group_data: group.mail_nickname = group_data['mailNickname'] else: raise ValueError("mailNickname is required for creating a group") # Set optional properties if 'description' in group_data: group.description = group_data['description'] # Handle group types and dynamic membership is_dynamic = False if 'groupTypes' in group_data: group_types = group_data['groupTypes'] # Check if DynamicMembership is in the group types if 'DynamicMembership' in group_types: is_dynamic = True # For dynamic groups, membershipRule and membershipRuleProcessingState are required if 'membershipRule' not in group_data: raise ValueError("membershipRule is required for dynamic membership groups") group.membership_rule = group_data['membershipRule'] group.membership_rule_processing_state = group_data.get('membershipRuleProcessingState', 'On') group.group_types = group_types if 'mailEnabled' in group_data: group.mail_enabled = group_data['mailEnabled'] if 'securityEnabled' in group_data: group.security_enabled = group_data['securityEnabled'] if 'visibility' in group_data: group.visibility = group_data['visibility'] if 'owners' in group_data: if not isinstance(group_data['owners'], list): raise ValueError("owners must be a list of user IDs") if 'members' in group_data and not is_dynamic: # Members cannot be added during creation for dynamic groups if not isinstance(group_data['members'], list): raise ValueError("members must be a list of user IDs") # Create the group new_group = await client.groups.post(group) # Add owners if provided if 'owners' in group_data and new_group and new_group.id: for owner_id in group_data['owners']: await add_group_owner(graph_client, new_group.id, owner_id) # Add members if provided and not dynamic membership if 'members' in group_data and new_group and new_group.id and not is_dynamic: for member_id in group_data['members']: await add_group_member(graph_client, new_group.id, member_id) # Return the created group if new_group: created_group = { 'id': new_group.id, 'displayName': new_group.display_name, 'mail': new_group.mail, 'mailNickname': new_group.mail_nickname, 'description': new_group.description, 'groupTypes': new_group.group_types, 'securityEnabled': new_group.security_enabled, 'mailEnabled': new_group.mail_enabled, 'visibility': new_group.visibility, 'createdDateTime': new_group.created_date_time.isoformat() if new_group.created_date_time else None } # Add dynamic membership properties if applicable if is_dynamic: created_group['membershipRule'] = new_group.membership_rule created_group['membershipRuleProcessingState'] = new_group.membership_rule_processing_state return created_group raise Exception("Failed to create group") except Exception as e: logger.error(f"Error creating group: {str(e)}") raise async def update_group(graph_client: GraphClient, group_id: str, group_data: Dict[str, Any]) -> Dict[str, Any]: """Update an existing group in Microsoft Graph. Args: graph_client: GraphClient instance group_id: ID of the group to update group_data: Dictionary containing group properties to update Returns: The updated group data """ try: client = graph_client.get_client() # Create a group object with the provided update data group = Group() # Set properties to update if 'displayName' in group_data: group.display_name = group_data['displayName'] if 'mailNickname' in group_data: group.mail_nickname = group_data['mailNickname'] if 'description' in group_data: group.description = group_data['description'] if 'visibility' in group_data: group.visibility = group_data['visibility'] # Update the group await client.groups.by_group_id(group_id).patch(group) # Get the updated group to return updated_group = await get_group_by_id(graph_client, group_id) if not updated_group: raise Exception(f"Failed to retrieve updated group with ID {group_id}") return updated_group except Exception as e: logger.error(f"Error updating group {group_id}: {str(e)}") raise async def delete_group(graph_client: GraphClient, group_id: str) -> bool: """Delete a group from Microsoft Graph. Args: graph_client: GraphClient instance group_id: ID of the group to delete Returns: True if successful, raises an exception otherwise """ try: client = graph_client.get_client() # Delete the group await client.groups.by_group_id(group_id).delete() return True except Exception as e: logger.error(f"Error deleting group {group_id}: {str(e)}") raise async def add_group_member(graph_client: GraphClient, group_id: str, member_id: str) -> bool: """Add a member to a group. Args: graph_client: GraphClient instance group_id: ID of the group member_id: ID of the member (user, group, device, etc.) to add Returns: True if successful, raises an exception otherwise """ try: client = graph_client.get_client() # First, check if the group is dynamic - can't add members to dynamic groups group = await client.groups.by_group_id(group_id).get() if group and group.group_types and 'DynamicMembership' in group.group_types: logger.warning(f"Cannot add members to dynamic group {group_id}") raise ValueError("Cannot add members to a dynamic membership group. Members are determined by the membership rule.") # Check if member is already in the group try: # This will raise an exception if member is not found existing_member = await client.groups.by_group_id(group_id).members.by_directory_object_id(member_id).get() if existing_member: logger.info(f"Member {member_id} is already in group {group_id}") return True except Exception: # Member is not in the group, continue with adding pass # Create a reference to the directory object (member) directory_object = DirectoryObject() directory_object.id = member_id # Add the member to the group await client.groups.by_group_id(group_id).members.ref.post(directory_object) return True except Exception as e: logger.error(f"Error adding member {member_id} to group {group_id}: {str(e)}") raise async def remove_group_member(graph_client: GraphClient, group_id: str, member_id: str) -> bool: """Remove a member from a group. Args: graph_client: GraphClient instance group_id: ID of the group member_id: ID of the member to remove Returns: True if successful, raises an exception otherwise """ try: client = graph_client.get_client() # First, check if the group is dynamic - can't remove members from dynamic groups group = await client.groups.by_group_id(group_id).get() if group and group.group_types and 'DynamicMembership' in group.group_types: logger.warning(f"Cannot remove members from dynamic group {group_id}") raise ValueError("Cannot remove members from a dynamic membership group. Members are determined by the membership rule.") # Check if member exists in the group try: # This will raise an exception if member is not found await client.groups.by_group_id(group_id).members.by_directory_object_id(member_id).get() except Exception as e: logger.info(f"Member {member_id} not found in group {group_id}: {str(e)}") return True # Already not a member, so removal "succeeded" # Remove the member from the group await client.groups.by_group_id(group_id).members.by_directory_object_id(member_id).ref.delete() return True except Exception as e: logger.error(f"Error removing member {member_id} from group {group_id}: {str(e)}") raise async def add_group_owner(graph_client: GraphClient, group_id: str, owner_id: str) -> bool: """Add an owner to a group. Args: graph_client: GraphClient instance group_id: ID of the group owner_id: ID of the user to add as owner Returns: True if successful, raises an exception otherwise """ try: client = graph_client.get_client() # Create a reference to the directory object (owner) directory_object = DirectoryObject() directory_object.id = owner_id # Add the owner to the group await client.groups.by_group_id(group_id).owners.ref.post(directory_object) return True except Exception as e: logger.error(f"Error adding owner {owner_id} to group {group_id}: {str(e)}") raise async def remove_group_owner(graph_client: GraphClient, group_id: str, owner_id: str) -> bool: """Remove an owner from a group. Args: graph_client: GraphClient instance group_id: ID of the group owner_id: ID of the owner to remove Returns: True if successful, raises an exception otherwise """ try: client = graph_client.get_client() # Remove the owner from the group await client.groups.by_group_id(group_id).owners.by_directory_object_id(owner_id).ref.delete() return True except Exception as e: logger.error(f"Error removing owner {owner_id} from group {group_id}: {str(e)}") raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hieuttmmo/entraid-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server