"""
Microsoft Graph API client for full M365 suite access.
Supports: SharePoint, OneDrive, Outlook, Teams, Calendar
"""
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any
from urllib.parse import quote

import msal
import requests
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class GraphClient:
    """
    Microsoft Graph API client for M365 read-only operations.

    Authentication uses client credentials flow (app-only).
    Requires Azure AD app registration with appropriate permissions.

    Credentials come from the constructor arguments or, when those are
    omitted, from the AZURE_CLIENT_ID, AZURE_TENANT_ID and
    AZURE_CLIENT_SECRET environment variables.
    """

    BASE_URL = "https://graph.microsoft.com/v1.0"
    SCOPES = ["https://graph.microsoft.com/.default"]
    # Seconds before an HTTP call is abandoned. Callers can override it per
    # request by passing ``timeout=...`` through ``_request``.
    DEFAULT_TIMEOUT = 30
    # A cached token is refreshed this long before its stated expiry.
    TOKEN_REFRESH_MARGIN = timedelta(minutes=5)
    # Graph documents a maximum ``$top`` of 50 for channel messages.
    MAX_CHANNEL_PAGE_SIZE = 50

    def __init__(
        self,
        client_id: Optional[str] = None,
        tenant_id: Optional[str] = None,
        client_secret: Optional[str] = None,
    ):
        """
        Initialize Graph client with Azure credentials.

        Args:
            client_id: App (client) ID; falls back to AZURE_CLIENT_ID.
            tenant_id: Directory (tenant) ID; falls back to AZURE_TENANT_ID.
            client_secret: Client secret; falls back to AZURE_CLIENT_SECRET.
        """
        self.client_id = client_id or os.getenv("AZURE_CLIENT_ID", "")
        self.tenant_id = tenant_id or os.getenv("AZURE_TENANT_ID", "")
        self.client_secret = client_secret or os.getenv("AZURE_CLIENT_SECRET", "")
        self._access_token = None
        self._token_expires = None
        self._msal_app = None

    def is_configured(self) -> bool:
        """Check if Azure credentials are configured (all three non-empty)."""
        return bool(self.client_id and self.tenant_id and self.client_secret)

    # ========================================
    # Internal helpers
    # ========================================
    @staticmethod
    def _odata_quote(value: str) -> str:
        """Escape text for an OData single-quoted literal (``'`` -> ``''``)."""
        return str(value).replace("'", "''")

    @staticmethod
    def _search_segment(query: str) -> str:
        """
        Build the ``search(q='...')`` URL path segment for a drive search.

        The query is OData-escaped and then percent-encoded, so characters
        such as ``#``, ``?``, ``/`` or ``'`` cannot alter the request URL.
        """
        escaped = str(query).replace("'", "''")
        return "search(q='{}')".format(quote(escaped, safe=""))

    @staticmethod
    def _search_phrase(query: str) -> str:
        """Wrap text in double quotes for ``$search``, escaping ``\\`` and ``"``."""
        escaped = str(query).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _children_endpoint(drive_root: str, path: Optional[str]) -> str:
        """
        Build the endpoint listing the children of ``path`` under a drive.

        Args:
            drive_root: Endpoint prefix addressing the drive itself.
            path: Folder path; empty, None or only slashes means the root.
        """
        # Strip both ends: a trailing slash would otherwise produce
        # ``root:/folder/:/children``.
        relative = (path or "").strip("/")
        if not relative:
            return f"{drive_root}/root/children"
        return f"{drive_root}/root:/{relative}:/children"

    @staticmethod
    def _extract_search_hits(payload: Dict) -> List[Dict]:
        """Collect the ``resource`` of every hit in a /search/query response."""
        return [
            hit.get("resource", {})
            for result in (payload.get("value") or [])
            for container in (result.get("hitsContainers") or [])
            for hit in (container.get("hits") or [])
        ]

    @staticmethod
    def _format_utc(moment: datetime) -> str:
        """Format an aware datetime as ``YYYY-MM-DDTHH:MM:SSZ`` in UTC."""
        return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    def _get_json(self, endpoint: str, params: Optional[Dict] = None) -> Dict:
        """GET ``endpoint`` and return the decoded JSON body; raises on HTTP errors."""
        response = self._request("GET", endpoint, params=params)
        response.raise_for_status()
        return response.json()

    def _get_list(
        self,
        endpoint: str,
        params: Optional[Dict] = None,
        missing_ok: bool = False,
    ) -> List[Dict]:
        """
        GET ``endpoint`` and return the ``value`` array of the response.

        Args:
            endpoint: Path relative to BASE_URL.
            params: Optional query parameters.
            missing_ok: When True, an HTTP 404 yields an empty list
                instead of raising.
        """
        response = self._request("GET", endpoint, params=params)
        if missing_ok and response.status_code == 404:
            return []
        response.raise_for_status()
        return response.json().get("value", [])

    # ========================================
    # Authentication / transport
    # ========================================
    def _get_msal_app(self) -> "msal.ConfidentialClientApplication":
        """
        Get or create MSAL client application.

        Raises:
            ValueError: If any of the three credentials is missing.
        """
        if not self._msal_app:
            if not self.is_configured():
                raise ValueError(
                    "Azure credentials not configured. "
                    "Set AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET"
                )
            self._msal_app = msal.ConfidentialClientApplication(
                client_id=self.client_id,
                client_credential=self.client_secret,
                authority=f"https://login.microsoftonline.com/{self.tenant_id}",
            )
        return self._msal_app

    def _get_access_token(self) -> str:
        """
        Get or refresh access token.

        The cached token is reused until TOKEN_REFRESH_MARGIN before its
        expiry; otherwise a new one is requested through MSAL.

        Raises:
            ValueError: If credentials are missing or MSAL returns no token.
        """
        now = datetime.now(timezone.utc)
        if self._access_token and self._token_expires:
            if now < self._token_expires - self.TOKEN_REFRESH_MARGIN:
                return self._access_token

        result = self._get_msal_app().acquire_token_for_client(scopes=self.SCOPES)
        if "access_token" not in result:
            error = result.get("error_description", result.get("error", "Unknown"))
            logger.error("Token acquisition failed: %s", error)
            raise ValueError(f"Failed to acquire token: {error}")

        self._access_token = result["access_token"]
        self._token_expires = datetime.now(timezone.utc) + timedelta(
            seconds=result.get("expires_in", 3600)
        )
        logger.info("Token acquired, expires in %ss", result.get("expires_in"))
        return self._access_token

    def _request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        **kwargs
    ) -> "requests.Response":
        """
        Make authenticated request to Graph API.

        Args:
            method: HTTP method name.
            endpoint: Path appended to BASE_URL (must start with "/").
            params: Optional query parameters (encoded by ``requests``).
            **kwargs: Passed to ``requests.request``. ``headers`` are merged
                over the default ones; ``timeout`` defaults to
                DEFAULT_TIMEOUT.

        Returns:
            The response, unchecked. Callers decide how to treat the status
            code. A 401 triggers one retry with a newly requested token.
        """
        extra_headers = kwargs.pop("headers", None) or {}
        # Without a timeout ``requests`` waits indefinitely on a stalled peer.
        kwargs.setdefault("timeout", self.DEFAULT_TIMEOUT)

        headers = {
            "Authorization": f"Bearer {self._get_access_token()}",
            "Content-Type": "application/json",
        }
        headers.update(extra_headers)
        url = f"{self.BASE_URL}{endpoint}"

        response = requests.request(
            method, url, headers=headers, params=params, **kwargs
        )
        if response.status_code == 401:
            # Release the rejected response's connection, drop the cached
            # token and retry once.
            response.close()
            self._access_token = None
            headers["Authorization"] = f"Bearer {self._get_access_token()}"
            response = requests.request(
                method, url, headers=headers, params=params, **kwargs
            )
        return response

    def test_connection(self) -> Dict:
        """
        Test Graph API connectivity.

        Returns:
            ``{"success": True, "organization": ..., "tenant_id": ...}`` on
            HTTP 200, otherwise ``{"success": False, "error": ...}``. This
            method never raises; any exception is reported in ``error``.
        """
        try:
            # Try to get organization info (works with app permissions)
            response = self._request("GET", "/organization")
            if response.status_code != 200:
                return {"success": False, "error": f"HTTP {response.status_code}"}
            # An empty "value" list must not raise IndexError.
            organizations = response.json().get("value") or [{}]
            org = organizations[0]
            return {
                "success": True,
                "organization": org.get("displayName", "Unknown"),
                "tenant_id": org.get("id", self.tenant_id),
            }
        except Exception as e:  # deliberate catch-all: this is a health probe
            return {"success": False, "error": str(e)}

    # ========================================
    # User Operations
    # ========================================
    def list_users(self, query: str = None, limit: int = 50) -> List[Dict]:
        """
        List users in the organization.

        Args:
            query: Optional prefix matched against displayName or mail.
            limit: Value sent as ``$top``.
        """
        params = {"$top": limit, "$select": "id,displayName,mail,jobTitle,department"}
        if query:
            literal = self._odata_quote(query)
            params["$filter"] = (
                f"startswith(displayName,'{literal}') or startswith(mail,'{literal}')"
            )
        return self._get_list("/users", params=params)

    def get_user(self, user_id_or_email: str) -> Dict:
        """Get user by ID or email."""
        return self._get_json(f"/users/{user_id_or_email}")

    # ========================================
    # SharePoint Site Operations
    # ========================================
    def search_sites(self, query: str, limit: int = 10) -> List[Dict]:
        """Search for SharePoint sites; at most ``limit`` results are returned."""
        # Passing the query through ``params`` lets ``requests`` encode it.
        sites = self._get_list("/sites", params={"search": query})
        return sites[:limit]

    def get_site(self, site_id: str) -> Dict:
        """Get SharePoint site info."""
        return self._get_json(f"/sites/{site_id}")

    def list_site_drives(self, site_id: str) -> List[Dict]:
        """List document libraries in a site."""
        return self._get_list(f"/sites/{site_id}/drives")

    # ========================================
    # SharePoint/OneDrive File Operations
    # ========================================
    def search_files(
        self,
        query: str,
        site_id: str = None,
        limit: int = 20
    ) -> List[Dict]:
        """
        Search for files across SharePoint or a specific site.

        Args:
            query: Search query
            site_id: Optional site ID to scope search
            limit: Max results

        With ``site_id`` the site's default drive is searched. Without it
        the cross-site search API is tried first and, if that does not
        return HTTP 200, the root site's default drive is searched instead.
        """
        segment = self._search_segment(query)
        if site_id:
            endpoint = f"/sites/{site_id}/drive/root/{segment}"
        else:
            # Use search API for cross-site search.
            # NOTE(review): with application permissions the search API
            # reportedly requires a "region" field - confirm for this tenant.
            body = {
                "requests": [{
                    "entityTypes": ["driveItem"],
                    "query": {"queryString": query},
                    "from": 0,
                    "size": limit
                }]
            }
            response = self._request("POST", "/search/query", json=body)
            if response.status_code == 200:
                return self._extract_search_hits(response.json())[:limit]
            # Fallback to simpler search if search API fails
            endpoint = f"/sites/root/drive/root/{segment}"
        return self._get_list(endpoint)[:limit]

    def list_drive_items(
        self,
        site_id: str,
        path: str = "/",
        drive_id: str = None
    ) -> List[Dict]:
        """
        List items in a SharePoint folder.

        Args:
            site_id: Site whose default drive is used when ``drive_id``
                is not given.
            path: Folder path relative to the drive root ("/" = root).
            drive_id: Optional explicit drive; takes precedence over site_id.

        Returns:
            The folder's children, or an empty list on HTTP 404.
        """
        drive_root = f"/drives/{drive_id}" if drive_id else f"/sites/{site_id}/drive"
        return self._get_list(
            self._children_endpoint(drive_root, path), missing_ok=True
        )

    def get_file_content(
        self,
        site_id: str,
        item_id: str,
        max_size_mb: int = 10
    ) -> Optional[bytes]:
        """
        Get file content by item ID.

        Only for text/small files. Returns None if the file is larger than
        ``max_size_mb`` or if either the metadata or the download request
        does not return HTTP 200.
        """
        item_endpoint = f"/sites/{site_id}/drive/items/{item_id}"

        # First check file size
        meta_response = self._request("GET", item_endpoint)
        if meta_response.status_code != 200:
            return None
        size = meta_response.json().get("size") or 0
        if size > max_size_mb * 1024 * 1024:
            logger.warning("File too large: %s bytes", size)
            return None

        # Download content
        response = self._request("GET", f"{item_endpoint}/content")
        if response.status_code == 200:
            return response.content
        return None

    def get_file_metadata(self, site_id: str, item_id: str) -> Dict:
        """Get file metadata by item ID."""
        return self._get_json(f"/sites/{site_id}/drive/items/{item_id}")

    # ========================================
    # OneDrive (User-scoped)
    # ========================================
    def list_user_files(
        self,
        user_id_or_email: str,
        path: str = "/"
    ) -> List[Dict]:
        """List files in user's OneDrive; returns an empty list on HTTP 404."""
        drive_root = f"/users/{user_id_or_email}/drive"
        return self._get_list(
            self._children_endpoint(drive_root, path), missing_ok=True
        )

    def search_user_files(
        self,
        user_id_or_email: str,
        query: str,
        limit: int = 20
    ) -> List[Dict]:
        """Search files in user's OneDrive; at most ``limit`` results."""
        endpoint = (
            f"/users/{user_id_or_email}/drive/root/{self._search_segment(query)}"
        )
        return self._get_list(endpoint)[:limit]

    # ========================================
    # Outlook Mail Operations
    # ========================================
    def list_recent_emails(
        self,
        user_id_or_email: str,
        limit: int = 20,
        folder: str = "inbox"
    ) -> List[Dict]:
        """List recent emails from a user's mailbox, newest first."""
        params = {
            "$top": limit,
            "$select": "id,subject,from,receivedDateTime,bodyPreview,hasAttachments",
            "$orderby": "receivedDateTime desc"
        }
        endpoint = f"/users/{user_id_or_email}/mailFolders/{folder}/messages"
        return self._get_list(endpoint, params=params)

    def search_emails(
        self,
        query: str,
        user_id_or_email: str = None,
        limit: int = 20
    ) -> List[Dict]:
        """
        Search emails using KQL syntax.

        Args:
            query: Search query (KQL syntax)
            user_id_or_email: User whose mailbox is searched (required)
            limit: Max results

        Raises:
            ValueError: If ``user_id_or_email`` is not given.
        """
        if not user_id_or_email:
            # Would need to iterate users or use search API
            # For now, require user specification
            raise ValueError("user_id_or_email required for email search")
        params = {
            "$top": limit,
            "$search": self._search_phrase(query),
            "$select": "id,subject,from,receivedDateTime,bodyPreview"
        }
        return self._get_list(f"/users/{user_id_or_email}/messages", params=params)

    def get_email(
        self,
        user_id_or_email: str,
        message_id: str,
        include_body: bool = True
    ) -> Dict:
        """Get full email by ID; the body is requested unless include_body is False."""
        select = "id,subject,from,toRecipients,ccRecipients,receivedDateTime,hasAttachments"
        if include_body:
            select += ",body"
        endpoint = f"/users/{user_id_or_email}/messages/{message_id}"
        return self._get_json(endpoint, params={"$select": select})

    # ========================================
    # Teams Operations
    # ========================================
    def list_teams(self, limit: int = 50) -> List[Dict]:
        """List teams: groups whose resourceProvisioningOptions include 'Team'."""
        return self._get_list("/groups", params={
            "$filter": "resourceProvisioningOptions/Any(x:x eq 'Team')",
            "$top": limit,
            "$select": "id,displayName,description,mail"
        })

    def get_team(self, team_id: str) -> Dict:
        """Get team details."""
        return self._get_json(f"/teams/{team_id}")

    def list_channels(self, team_id: str) -> List[Dict]:
        """List channels in a team."""
        return self._get_list(f"/teams/{team_id}/channels")

    def get_channel_messages(
        self,
        team_id: str,
        channel_id: str,
        limit: int = 50
    ) -> List[Dict]:
        """
        Get recent messages from a channel.

        Only one page is fetched, so at most MAX_CHANNEL_PAGE_SIZE messages
        are returned even when ``limit`` is larger.
        """
        params = {"$top": min(limit, self.MAX_CHANNEL_PAGE_SIZE)}
        endpoint = f"/teams/{team_id}/channels/{channel_id}/messages"
        return self._get_list(endpoint, params=params)

    def search_channel_messages(
        self,
        team_id: str,
        channel_id: str,
        query: str,
        limit: int = 20
    ) -> List[Dict]:
        """
        Search messages in a channel.

        Note: Graph API search for channel messages is limited.
        This fetches one page of messages and filters client-side with a
        case-insensitive substring match on body content and subject.
        """
        messages = self.get_channel_messages(
            team_id, channel_id, limit=self.MAX_CHANNEL_PAGE_SIZE
        )
        needle = query.lower()
        matching = []
        for msg in messages:
            # Fields may be present but null, so ``.get(key, "")`` alone is
            # not enough to guarantee a string.
            body = ((msg.get("body") or {}).get("content") or "").lower()
            subject = (msg.get("subject") or "").lower()
            if needle in body or needle in subject:
                matching.append(msg)
                if len(matching) >= limit:
                    break
        return matching

    # ========================================
    # Calendar Operations
    # ========================================
    def list_events(
        self,
        user_id_or_email: str,
        days_ahead: int = 7,
        days_back: int = 0
    ) -> List[Dict]:
        """
        List calendar events for a user.

        The window runs from ``days_back`` days before now to ``days_ahead``
        days after now (UTC), ordered by start time.
        """
        now = datetime.now(timezone.utc)
        params = {
            "startDateTime": self._format_utc(now - timedelta(days=days_back)),
            "endDateTime": self._format_utc(now + timedelta(days=days_ahead)),
            "$select": "id,subject,start,end,location,organizer,attendees,bodyPreview",
            "$orderby": "start/dateTime"
        }
        return self._get_list(f"/users/{user_id_or_email}/calendarView", params=params)

    def search_events(
        self,
        user_id_or_email: str,
        query: str,
        limit: int = 20
    ) -> List[Dict]:
        """Search calendar events by subject (substring match), newest first."""
        params = {
            "$filter": f"contains(subject,'{self._odata_quote(query)}')",
            "$top": limit,
            "$select": "id,subject,start,end,location,organizer",
            "$orderby": "start/dateTime desc"
        }
        return self._get_list(f"/users/{user_id_or_email}/events", params=params)

    def get_event(self, user_id_or_email: str, event_id: str) -> Dict:
        """Get full event details."""
        return self._get_json(f"/users/{user_id_or_email}/events/{event_id}")
# Shared GraphClient instance; stays None until get_client() is first called.
_client: Optional[GraphClient] = None


def get_client() -> GraphClient:
    """Return the module-wide GraphClient, constructing it on first use."""
    global _client
    if _client is not None:
        return _client
    _client = GraphClient()
    return _client