Skip to main content
Glama
graph_client.py (17.7 kB)
""" Microsoft Graph API client for full M365 suite access. Supports: SharePoint, OneDrive, Outlook, Teams, Calendar """ import os import msal import requests from typing import Dict, List, Optional, Any from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) class GraphClient: """ Microsoft Graph API client for M365 read-only operations. Authentication uses client credentials flow (app-only). Requires Azure AD app registration with appropriate permissions. """ BASE_URL = "https://graph.microsoft.com/v1.0" SCOPES = ["https://graph.microsoft.com/.default"] def __init__( self, client_id: str = None, tenant_id: str = None, client_secret: str = None ): """Initialize Graph client with Azure credentials.""" self.client_id = client_id or os.getenv("AZURE_CLIENT_ID", "") self.tenant_id = tenant_id or os.getenv("AZURE_TENANT_ID", "") self.client_secret = client_secret or os.getenv("AZURE_CLIENT_SECRET", "") self._access_token = None self._token_expires = None self._msal_app = None def is_configured(self) -> bool: """Check if Azure credentials are configured.""" return bool(self.client_id and self.tenant_id and self.client_secret) def _get_msal_app(self) -> msal.ConfidentialClientApplication: """Get or create MSAL client application.""" if not self._msal_app: if not self.is_configured(): raise ValueError( "Azure credentials not configured. 
" "Set AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET" ) self._msal_app = msal.ConfidentialClientApplication( client_id=self.client_id, client_credential=self.client_secret, authority=f"https://login.microsoftonline.com/{self.tenant_id}", ) return self._msal_app def _get_access_token(self) -> str: """Get or refresh access token.""" if self._access_token and self._token_expires: if datetime.utcnow() < self._token_expires - timedelta(minutes=5): return self._access_token app = self._get_msal_app() result = app.acquire_token_for_client(scopes=self.SCOPES) if "access_token" in result: self._access_token = result["access_token"] self._token_expires = datetime.utcnow() + timedelta( seconds=result.get("expires_in", 3600) ) logger.info(f"Token acquired, expires in {result.get('expires_in')}s") return self._access_token else: error = result.get("error_description", result.get("error", "Unknown")) logger.error(f"Token acquisition failed: {error}") raise ValueError(f"Failed to acquire token: {error}") def _request( self, method: str, endpoint: str, params: Dict = None, **kwargs ) -> requests.Response: """Make authenticated request to Graph API.""" token = self._get_access_token() headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", } headers.update(kwargs.pop("headers", {})) url = f"{self.BASE_URL}{endpoint}" response = requests.request( method, url, headers=headers, params=params, **kwargs ) if response.status_code == 401: self._access_token = None token = self._get_access_token() headers["Authorization"] = f"Bearer {token}" response = requests.request( method, url, headers=headers, params=params, **kwargs ) return response def test_connection(self) -> Dict: """Test Graph API connectivity.""" try: # Try to get organization info (works with app permissions) response = self._request("GET", "/organization") if response.status_code == 200: org = response.json().get("value", [{}])[0] return { "success": True, "organization": 
org.get("displayName", "Unknown"), "tenant_id": org.get("id", self.tenant_id) } return {"success": False, "error": f"HTTP {response.status_code}"} except Exception as e: return {"success": False, "error": str(e)} # ======================================== # User Operations # ======================================== def list_users(self, query: str = None, limit: int = 50) -> List[Dict]: """List users in the organization.""" params = {"$top": limit, "$select": "id,displayName,mail,jobTitle,department"} if query: params["$filter"] = f"startswith(displayName,'{query}') or startswith(mail,'{query}')" response = self._request("GET", "/users", params=params) response.raise_for_status() return response.json().get("value", []) def get_user(self, user_id_or_email: str) -> Dict: """Get user by ID or email.""" response = self._request("GET", f"/users/{user_id_or_email}") response.raise_for_status() return response.json() # ======================================== # SharePoint Site Operations # ======================================== def search_sites(self, query: str, limit: int = 10) -> List[Dict]: """Search for SharePoint sites.""" response = self._request("GET", f"/sites?search={query}") response.raise_for_status() sites = response.json().get("value", []) return sites[:limit] def get_site(self, site_id: str) -> Dict: """Get SharePoint site info.""" response = self._request("GET", f"/sites/{site_id}") response.raise_for_status() return response.json() def list_site_drives(self, site_id: str) -> List[Dict]: """List document libraries in a site.""" response = self._request("GET", f"/sites/{site_id}/drives") response.raise_for_status() return response.json().get("value", []) # ======================================== # SharePoint/OneDrive File Operations # ======================================== def search_files( self, query: str, site_id: str = None, limit: int = 20 ) -> List[Dict]: """ Search for files across SharePoint or a specific site. 
Args: query: Search query site_id: Optional site ID to scope search limit: Max results """ if site_id: endpoint = f"/sites/{site_id}/drive/root/search(q='{query}')" else: # Search across all sites the app has access to endpoint = f"/search/query" # Use search API for cross-site search body = { "requests": [{ "entityTypes": ["driveItem"], "query": {"queryString": query}, "from": 0, "size": limit }] } response = self._request("POST", endpoint, json=body) if response.status_code == 200: hits = response.json().get("value", [{}])[0].get("hitsContainers", [{}])[0].get("hits", []) return [hit.get("resource", {}) for hit in hits][:limit] # Fallback to simpler search if search API fails endpoint = f"/sites/root/drive/root/search(q='{query}')" response = self._request("GET", endpoint) response.raise_for_status() return response.json().get("value", [])[:limit] def list_drive_items( self, site_id: str, path: str = "/", drive_id: str = None ) -> List[Dict]: """List items in a SharePoint folder.""" if drive_id: if path == "/" or not path: endpoint = f"/drives/{drive_id}/root/children" else: path = path.lstrip("/") endpoint = f"/drives/{drive_id}/root:/{path}:/children" else: if path == "/" or not path: endpoint = f"/sites/{site_id}/drive/root/children" else: path = path.lstrip("/") endpoint = f"/sites/{site_id}/drive/root:/{path}:/children" response = self._request("GET", endpoint) if response.status_code == 404: return [] response.raise_for_status() return response.json().get("value", []) def get_file_content( self, site_id: str, item_id: str, max_size_mb: int = 10 ) -> Optional[bytes]: """ Get file content by item ID. Only for text/small files. Returns None if too large. 
""" # First check file size meta_response = self._request( "GET", f"/sites/{site_id}/drive/items/{item_id}" ) if meta_response.status_code != 200: return None size = meta_response.json().get("size", 0) if size > max_size_mb * 1024 * 1024: logger.warning(f"File too large: {size} bytes") return None # Download content response = self._request( "GET", f"/sites/{site_id}/drive/items/{item_id}/content" ) if response.status_code == 200: return response.content return None def get_file_metadata(self, site_id: str, item_id: str) -> Dict: """Get file metadata by item ID.""" response = self._request( "GET", f"/sites/{site_id}/drive/items/{item_id}" ) response.raise_for_status() return response.json() # ======================================== # OneDrive (User-scoped) # ======================================== def list_user_files( self, user_id_or_email: str, path: str = "/" ) -> List[Dict]: """List files in user's OneDrive.""" if path == "/" or not path: endpoint = f"/users/{user_id_or_email}/drive/root/children" else: path = path.lstrip("/") endpoint = f"/users/{user_id_or_email}/drive/root:/{path}:/children" response = self._request("GET", endpoint) if response.status_code == 404: return [] response.raise_for_status() return response.json().get("value", []) def search_user_files( self, user_id_or_email: str, query: str, limit: int = 20 ) -> List[Dict]: """Search files in user's OneDrive.""" endpoint = f"/users/{user_id_or_email}/drive/root/search(q='{query}')" response = self._request("GET", endpoint) response.raise_for_status() return response.json().get("value", [])[:limit] # ======================================== # Outlook Mail Operations # ======================================== def list_recent_emails( self, user_id_or_email: str, limit: int = 20, folder: str = "inbox" ) -> List[Dict]: """List recent emails from a user's mailbox.""" params = { "$top": limit, "$select": "id,subject,from,receivedDateTime,bodyPreview,hasAttachments", "$orderby": "receivedDateTime desc" 
} endpoint = f"/users/{user_id_or_email}/mailFolders/{folder}/messages" response = self._request("GET", endpoint, params=params) response.raise_for_status() return response.json().get("value", []) def search_emails( self, query: str, user_id_or_email: str = None, limit: int = 20 ) -> List[Dict]: """ Search emails using KQL syntax. Args: query: Search query (KQL syntax) user_id_or_email: User to search (None = all accessible) limit: Max results """ params = { "$top": limit, "$search": f'"{query}"', "$select": "id,subject,from,receivedDateTime,bodyPreview" } if user_id_or_email: endpoint = f"/users/{user_id_or_email}/messages" else: # Would need to iterate users or use search API # For now, require user specification raise ValueError("user_id_or_email required for email search") response = self._request("GET", endpoint, params=params) response.raise_for_status() return response.json().get("value", []) def get_email( self, user_id_or_email: str, message_id: str, include_body: bool = True ) -> Dict: """Get full email by ID.""" select = "id,subject,from,toRecipients,ccRecipients,receivedDateTime,hasAttachments" if include_body: select += ",body" params = {"$select": select} endpoint = f"/users/{user_id_or_email}/messages/{message_id}" response = self._request("GET", endpoint, params=params) response.raise_for_status() return response.json() # ======================================== # Teams Operations # ======================================== def list_teams(self, limit: int = 50) -> List[Dict]: """List all teams the app can access.""" params = {"$top": limit} response = self._request("GET", "/groups", params={ "$filter": "resourceProvisioningOptions/Any(x:x eq 'Team')", "$top": limit, "$select": "id,displayName,description,mail" }) response.raise_for_status() return response.json().get("value", []) def get_team(self, team_id: str) -> Dict: """Get team details.""" response = self._request("GET", f"/teams/{team_id}") response.raise_for_status() return response.json() def 
list_channels(self, team_id: str) -> List[Dict]: """List channels in a team.""" response = self._request("GET", f"/teams/{team_id}/channels") response.raise_for_status() return response.json().get("value", []) def get_channel_messages( self, team_id: str, channel_id: str, limit: int = 50 ) -> List[Dict]: """Get recent messages from a channel.""" params = {"$top": limit} endpoint = f"/teams/{team_id}/channels/{channel_id}/messages" response = self._request("GET", endpoint, params=params) response.raise_for_status() return response.json().get("value", []) def search_channel_messages( self, team_id: str, channel_id: str, query: str, limit: int = 20 ) -> List[Dict]: """ Search messages in a channel. Note: Graph API search for channel messages is limited. This fetches messages and filters client-side. """ messages = self.get_channel_messages(team_id, channel_id, limit=100) query_lower = query.lower() matching = [] for msg in messages: body = msg.get("body", {}).get("content", "").lower() subject = msg.get("subject", "").lower() if query_lower in body or query_lower in subject: matching.append(msg) if len(matching) >= limit: break return matching # ======================================== # Calendar Operations # ======================================== def list_events( self, user_id_or_email: str, days_ahead: int = 7, days_back: int = 0 ) -> List[Dict]: """List calendar events for a user.""" now = datetime.utcnow() start = (now - timedelta(days=days_back)).isoformat() + "Z" end = (now + timedelta(days=days_ahead)).isoformat() + "Z" params = { "startDateTime": start, "endDateTime": end, "$select": "id,subject,start,end,location,organizer,attendees,bodyPreview", "$orderby": "start/dateTime" } endpoint = f"/users/{user_id_or_email}/calendarView" response = self._request("GET", endpoint, params=params) response.raise_for_status() return response.json().get("value", []) def search_events( self, user_id_or_email: str, query: str, limit: int = 20 ) -> List[Dict]: """Search 
calendar events by subject.""" params = { "$filter": f"contains(subject,'{query}')", "$top": limit, "$select": "id,subject,start,end,location,organizer", "$orderby": "start/dateTime desc" } endpoint = f"/users/{user_id_or_email}/events" response = self._request("GET", endpoint, params=params) response.raise_for_status() return response.json().get("value", []) def get_event(self, user_id_or_email: str, event_id: str) -> Dict: """Get full event details.""" endpoint = f"/users/{user_id_or_email}/events/{event_id}" response = self._request("GET", endpoint) response.raise_for_status() return response.json() # Singleton instance _client: Optional[GraphClient] = None def get_client() -> GraphClient: """Get or create the Graph client singleton.""" global _client if _client is None: _client = GraphClient() return _client

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eesb99/msgraph-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.