Skip to main content
Glama
hieuttmmo

EntraID MCP Server

by hieuttmmo
applications.py10.1 kB
"""Applications resource module for Microsoft Graph. This module provides access to Microsoft Graph application resources (app registrations). """ import logging from typing import Dict, List, Any, Optional from utils.graph_client import GraphClient from msgraph.generated.models.application import Application from .service_principals import get_service_principal_by_app_id logger = logging.getLogger(__name__) async def list_applications(graph_client: GraphClient, limit: int = 100) -> List[Dict[str, Any]]: """List all applications (app registrations) in the tenant, with paging.""" try: client = graph_client.get_client() response = await client.applications.get() applications = [] if response and response.value: applications.extend(response.value) # Paging: fetch more if odata_next_link is present while response is not None and getattr(response, 'odata_next_link', None) and len(applications) < limit: response = await client.applications.with_url(response.odata_next_link).get() if response and response.value: applications.extend(response.value) formatted_apps = [] for app in applications[:limit]: app_data = { 'id': getattr(app, 'id', None), 'appId': getattr(app, 'app_id', None), 'displayName': getattr(app, 'display_name', None), 'createdDateTime': app.created_date_time.isoformat() if getattr(app, 'created_date_time', None) else None, 'signInAudience': getattr(app, 'sign_in_audience', None), 'publisherDomain': getattr(app, 'publisher_domain', None), 'tags': getattr(app, 'tags', None), } formatted_apps.append(app_data) return formatted_apps except Exception as e: logger.error(f"Error listing applications: {str(e)}") raise async def get_application_by_id(graph_client: GraphClient, app_id: str) -> Optional[Dict[str, Any]]: """Get a specific application by its object ID, including appRoleAssignments and oauth2PermissionGrants from the corresponding service principal.""" try: client = graph_client.get_client() app = await client.applications.by_application_id(app_id).get() if app: app_data = { 'id': getattr(app, 'id', None), 'appId': getattr(app, 'app_id', None), 'displayName': getattr(app, 'display_name', None), 'createdDateTime': app.created_date_time.isoformat() if getattr(app, 'created_date_time', None) else None, 'signInAudience': getattr(app, 'sign_in_audience', None), 'publisherDomain': getattr(app, 'publisher_domain', None), 'tags': getattr(app, 'tags', None), } # Find the corresponding service principal by appId sp = await get_service_principal_by_app_id(graph_client, getattr(app, 'app_id', None)) if sp: sp_id = getattr(sp, 'id', None) # Fetch appRoleAssignments and oauth2PermissionGrants using the same logic as in service_principals.py # Fetch appRoleAssignments app_role_assignments = [] try: response = await client.service_principals.by_service_principal_id(sp_id).app_role_assignments.get() while response: if response.value: for assignment in response.value: app_role_assignments.append({ 'id': getattr(assignment, 'id', None), 'createdDateTime': getattr(assignment, 'created_date_time', None), 'appRoleId': getattr(assignment, 'app_role_id', None), 'principalDisplayName': getattr(assignment, 'principal_display_name', None), 'principalId': getattr(assignment, 'principal_id', None), 'principalType': getattr(assignment, 'principal_type', None), 'resourceDisplayName': getattr(assignment, 'resource_display_name', None), 'resourceId': getattr(assignment, 'resource_id', None), }) if getattr(response, 'odata_next_link', None): response = await client.service_principals.by_service_principal_id(sp_id).app_role_assignments.with_url(response.odata_next_link).get() else: break except Exception as e: logger.warning(f"Error fetching appRoleAssignments for service principal {sp_id}: {str(e)}") app_data['appRoleAssignments'] = app_role_assignments # Fetch oauth2PermissionGrants oauth2_permission_grants = [] try: response = await client.service_principals.by_service_principal_id(sp_id).oauth2_permission_grants.get() while response: if response.value: for grant in response.value: oauth2_permission_grants.append({ 'id': getattr(grant, 'id', None), 'clientId': getattr(grant, 'client_id', None), 'consentType': getattr(grant, 'consent_type', None), 'principalId': getattr(grant, 'principal_id', None), 'resourceId': getattr(grant, 'resource_id', None), 'scope': getattr(grant, 'scope', None), }) if getattr(response, 'odata_next_link', None): response = await client.service_principals.by_service_principal_id(sp_id).oauth2_permission_grants.with_url(response.odata_next_link).get() else: break except Exception as e: logger.warning(f"Error fetching oauth2PermissionGrants for service principal {sp_id}: {str(e)}") app_data['oauth2PermissionGrants'] = oauth2_permission_grants else: app_data['appRoleAssignments'] = [] app_data['oauth2PermissionGrants'] = [] return app_data return None except Exception as e: logger.error(f"Error getting application {app_id}: {str(e)}") raise async def create_application(graph_client: GraphClient, app_data: Dict[str, Any]) -> Dict[str, Any]: """Create a new application (app registration).""" try: client = graph_client.get_client() app = Application() # Set properties from app_data if 'displayName' in app_data: app.display_name = app_data['displayName'] if 'signInAudience' in app_data: app.sign_in_audience = app_data['signInAudience'] if 'tags' in app_data: app.tags = app_data['tags'] if 'identifierUris' in app_data: app.identifier_uris = app_data['identifierUris'] if 'web' in app_data: app.web = app_data['web'] if 'api' in app_data: app.api = app_data['api'] if 'requiredResourceAccess' in app_data: app.required_resource_access = app_data['requiredResourceAccess'] new_app = await client.applications.post(app) if new_app: return { 'id': getattr(new_app, 'id', None), 'appId': getattr(new_app, 'app_id', None), 'displayName': getattr(new_app, 'display_name', None), 'createdDateTime': new_app.created_date_time.isoformat() if getattr(new_app, 'created_date_time', None) else None, 'signInAudience': getattr(new_app, 'sign_in_audience', None), 'publisherDomain': getattr(new_app, 'publisher_domain', None), 'tags': getattr(new_app, 'tags', None), } raise Exception("Failed to create application") except Exception as e: logger.error(f"Error creating application: {str(e)}") raise async def update_application(graph_client: GraphClient, app_id: str, app_data: Dict[str, Any]) -> Dict[str, Any]: """Update an existing application (app registration).""" try: client = graph_client.get_client() app = Application() # Set updatable properties from app_data if 'displayName' in app_data: app.display_name = app_data['displayName'] if 'signInAudience' in app_data: app.sign_in_audience = app_data['signInAudience'] if 'tags' in app_data: app.tags = app_data['tags'] if 'identifierUris' in app_data: app.identifier_uris = app_data['identifierUris'] if 'web' in app_data: app.web = app_data['web'] if 'api' in app_data: app.api = app_data['api'] if 'requiredResourceAccess' in app_data: app.required_resource_access = app_data['requiredResourceAccess'] await client.applications.by_application_id(app_id).patch(app) # Return the updated application return await get_application_by_id(graph_client, app_id) except Exception as e: logger.error(f"Error updating application {app_id}: {str(e)}") raise async def delete_application(graph_client: GraphClient, app_id: str) -> bool: """Delete an application (app registration) by its object ID.""" try: client = graph_client.get_client() await client.applications.by_application_id(app_id).delete() return True except Exception as e: logger.error(f"Error deleting application {app_id}: {str(e)}") raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hieuttmmo/entraid-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server