Skip to main content
Glama
hieuttmmo

EntraID MCP Server

by hieuttmmo
audit_logs.py (5.54 kB)
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta, timezone

from msgraph.generated.audit_logs.directory_audits.directory_audits_request_builder import DirectoryAuditsRequestBuilder
from kiota_abstractions.base_request_configuration import RequestConfiguration

from utils.graph_client import GraphClient

logger = logging.getLogger(__name__)


def _format_initiated_by(initiated_by: Any) -> Dict[str, Any]:
    """Convert an audit log's ``initiated_by`` object into a plain dict."""
    user = getattr(initiated_by, 'user', None)
    app = getattr(initiated_by, 'app', None)
    return {
        "user": {
            "id": getattr(user, 'id', None),
            "displayName": getattr(user, 'display_name', None),
            "userPrincipalName": getattr(user, 'user_principal_name', None),
        } if user else None,
        "app": {
            "appId": getattr(app, 'app_id', None),
            "displayName": getattr(app, 'display_name', None),
        } if app else None,
    }


def _format_target_resource(target: Any) -> Dict[str, Any]:
    """Convert one target resource (and its modified properties) into a plain dict."""
    modified_properties = getattr(target, 'modified_properties', None) or []
    return {
        "id": getattr(target, 'id', None),
        "displayName": getattr(target, 'display_name', None),
        "type": getattr(target, 'type', None),
        "userPrincipalName": getattr(target, 'user_principal_name', None),
        "modifiedProperties": [
            {
                "displayName": getattr(mp, 'display_name', None),
                "oldValue": getattr(mp, 'old_value', None),
                "newValue": getattr(mp, 'new_value', None),
            }
            for mp in modified_properties
        ],
    }


def _format_audit_log(log: Any) -> Dict[str, Any]:
    """Flatten one directory audit entry into a dict with camelCase keys."""
    activity_time = getattr(log, "activity_date_time", None)
    additional_details = getattr(log, 'additional_details', None) or []
    initiated_by = getattr(log, 'initiated_by', None)
    target_resources = getattr(log, 'target_resources', None)

    return {
        "id": getattr(log, "id", None),
        "activityDateTime": activity_time.isoformat() if activity_time else None,
        "activityDisplayName": getattr(log, "activity_display_name", None),
        "category": getattr(log, "category", None),
        "operationType": getattr(log, "operation_type", None),
        "result": getattr(log, "result", None),
        "resultReason": getattr(log, "result_reason", None),
        # Stays None when the entry carries no initiator / no targets.
        "initiatedBy": _format_initiated_by(initiated_by) if initiated_by else None,
        "targetResources": (
            [_format_target_resource(tr) for tr in target_resources]
            if target_resources else None
        ),
        "loggedByService": getattr(log, "logged_by_service", None),
        "correlationId": getattr(log, "correlation_id", None),
        "additionalDetails": [
            {"key": getattr(kv, 'key', None), "value": getattr(kv, 'value', None)}
            for kv in additional_details
        ],
    }


async def get_user_audit_logs(graph_client: GraphClient, user_id: str, days: int = 30) -> List[Dict[str, Any]]:
    """Get all relevant directory audit logs for a user by user_id within the last N days (default 30), with paging support.

    Args:
        graph_client: Wrapper whose ``get_client()`` returns the Graph SDK client.
        user_id: ID matched against ``initiatedBy/user/id`` in the filter.
        days: Size of the look-back window, counted back from the current UTC time.

    Returns:
        One dict per audit entry (camelCase keys), in the order the pages
        were returned by the service (requested as ``activityDateTime desc``).

    Raises:
        Exception: Any error raised while querying or formatting is logged
            and re-raised unchanged.
    """
    try:
        client = graph_client.get_client()

        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(days=days)
        start_date_str = start_date.strftime('%Y-%m-%dT%H:%M:%SZ')
        end_date_str = end_date.strftime('%Y-%m-%dT%H:%M:%SZ')

        # OData string literals escape a single quote by doubling it; without
        # this a quote inside user_id would terminate the literal early and
        # break (or alter) the filter expression.
        escaped_user_id = user_id.replace("'", "''")

        # Filter: initiatedBy/user/id eq '{user_id}' and activityDateTime in range
        filter_query = (
            f"initiatedBy/user/id eq '{escaped_user_id}' "
            f"and activityDateTime ge {start_date_str} "
            f"and activityDateTime le {end_date_str}"
        )

        logger.info("Fetching directory audit logs for user ID: %s", user_id)
        logger.info("Date range: %s to %s", start_date_str, end_date_str)
        logger.info("Filter query: %s", filter_query)

        query_params = DirectoryAuditsRequestBuilder.DirectoryAuditsRequestBuilderGetQueryParameters(
            filter=filter_query,
            orderby=["activityDateTime desc"],
            top=1000
        )
        request_configuration = RequestConfiguration(query_parameters=query_params)
        request_configuration.headers.add("ConsistencyLevel", "eventual")

        response = await client.audit_logs.directory_audits.get(request_configuration=request_configuration)

        logs = []
        if response and response.value:
            logs.extend(response.value)

        # Follow the next-page link until the service stops returning one.
        while response is not None and getattr(response, 'odata_next_link', None):
            response = await client.audit_logs.directory_audits.with_url(
                response.odata_next_link
            ).get(request_configuration=request_configuration)
            if response and response.value:
                logs.extend(response.value)

        return [_format_audit_log(log) for log in logs]
    except Exception as e:
        logger.error("Error fetching directory audit logs for user %s: %s", user_id, e)
        raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hieuttmmo/entraid-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.