Skip to main content
Glama
graph.py10 kB
import httpx import time from typing import Any, Iterator from .auth import get_token BASE_URL = "https://graph.microsoft.com/v1.0" # 15 x 320 KiB = 4,915,200 bytes UPLOAD_CHUNK_SIZE = 15 * 320 * 1024 _client = httpx.Client(timeout=30.0, follow_redirects=True) def request( method: str, path: str, account_id: str | None = None, params: dict[str, Any] | None = None, json: dict[str, Any] | None = None, data: bytes | None = None, max_retries: int = 3, ) -> dict[str, Any] | None: headers = { "Authorization": f"Bearer {get_token(account_id)}", } if method == "GET": if "$search" in (params or {}): headers["Prefer"] = 'outlook.body-content-type="text"' elif "body" in (params or {}).get("$select", ""): headers["Prefer"] = 'outlook.body-content-type="text"' else: headers["Content-Type"] = ( "application/json" if json else "application/octet-stream" ) if params and ( "$search" in params or "contains(" in params.get("$filter", "") or "/any(" in params.get("$filter", "") ): headers["ConsistencyLevel"] = "eventual" params.setdefault("$count", "true") retry_count = 0 while retry_count <= max_retries: try: response = _client.request( method=method, url=f"{BASE_URL}{path}", headers=headers, params=params, json=json, content=data, ) if response.status_code == 429: retry_after = int(response.headers.get("Retry-After", "5")) if retry_count < max_retries: time.sleep(min(retry_after, 60)) retry_count += 1 continue if response.status_code >= 500 and retry_count < max_retries: wait_time = (2**retry_count) * 1 time.sleep(wait_time) retry_count += 1 continue response.raise_for_status() if response.content: return response.json() return None except httpx.HTTPStatusError as e: if retry_count < max_retries and e.response.status_code >= 500: wait_time = (2**retry_count) * 1 time.sleep(wait_time) retry_count += 1 continue raise return None def request_paginated( path: str, account_id: str | None = None, params: dict[str, Any] | None = None, limit: int | None = None, ) -> Iterator[dict[str, Any]]: """Make paginated requests following @odata.nextLink""" items_returned = 0 next_link = None while True: if next_link: result = request("GET", next_link.replace(BASE_URL, ""), account_id) else: result = request("GET", path, account_id, params=params) if not result: break if "value" in result: for item in result["value"]: if limit and items_returned >= limit: return yield item items_returned += 1 next_link = result.get("@odata.nextLink") if not next_link: break def download_raw( path: str, account_id: str | None = None, max_retries: int = 3 ) -> bytes: headers = {"Authorization": f"Bearer {get_token(account_id)}"} retry_count = 0 while retry_count <= max_retries: try: response = _client.get(f"{BASE_URL}{path}", headers=headers) if response.status_code == 429: retry_after = int(response.headers.get("Retry-After", "5")) if retry_count < max_retries: time.sleep(min(retry_after, 60)) retry_count += 1 continue if response.status_code >= 500 and retry_count < max_retries: wait_time = (2**retry_count) * 1 time.sleep(wait_time) retry_count += 1 continue response.raise_for_status() return response.content except httpx.HTTPStatusError as e: if retry_count < max_retries and e.response.status_code >= 500: wait_time = (2**retry_count) * 1 time.sleep(wait_time) retry_count += 1 continue raise raise ValueError("Failed to download file after all retries") def _do_chunked_upload( upload_url: str, data: bytes, headers: dict[str, str], ) -> dict[str, Any]: """Internal helper for chunked uploads""" file_size = len(data) for i in range(0, file_size, UPLOAD_CHUNK_SIZE): chunk_start = i chunk_end = min(i + UPLOAD_CHUNK_SIZE, file_size) chunk = data[chunk_start:chunk_end] chunk_headers = headers.copy() chunk_headers["Content-Length"] = str(len(chunk)) chunk_headers["Content-Range"] = ( f"bytes {chunk_start}-{chunk_end - 1}/{file_size}" ) retry_count = 0 while retry_count <= 3: try: response = _client.put(upload_url, content=chunk, headers=chunk_headers) if response.status_code == 429: retry_after = int(response.headers.get("Retry-After", "5")) if retry_count < 3: time.sleep(min(retry_after, 60)) retry_count += 1 continue response.raise_for_status() if response.status_code in (200, 201): return response.json() break except httpx.HTTPStatusError as e: if retry_count < 3 and e.response.status_code >= 500: time.sleep((2**retry_count) * 1) retry_count += 1 continue raise raise ValueError("Upload completed but no final response received") def create_upload_session( path: str, account_id: str | None = None, item_properties: dict[str, Any] | None = None, ) -> dict[str, Any]: """Create an upload session for large files""" payload = {"item": item_properties or {}} result = request("POST", f"{path}/createUploadSession", account_id, json=payload) if not result: raise ValueError("Failed to create upload session") return result def upload_large_file( path: str, data: bytes, account_id: str | None = None, item_properties: dict[str, Any] | None = None, ) -> dict[str, Any]: """Upload a large file using upload sessions""" file_size = len(data) if file_size <= UPLOAD_CHUNK_SIZE: result = request("PUT", f"{path}/content", account_id, data=data) if not result: raise ValueError("Failed to upload file") return result session = create_upload_session(path, account_id, item_properties) upload_url = session["uploadUrl"] headers = {"Authorization": f"Bearer {get_token(account_id)}"} return _do_chunked_upload(upload_url, data, headers) def create_mail_upload_session( message_id: str, attachment_item: dict[str, Any], account_id: str | None = None, ) -> dict[str, Any]: """Create an upload session for large mail attachments""" result = request( "POST", f"/me/messages/{message_id}/attachments/createUploadSession", account_id, json={"AttachmentItem": attachment_item}, ) if not result: raise ValueError("Failed to create mail attachment upload session") return result def upload_large_mail_attachment( message_id: str, name: str, data: bytes, account_id: str | None = None, content_type: str = "application/octet-stream", ) -> dict[str, Any]: """Upload a large mail attachment using upload sessions""" file_size = len(data) attachment_item = { "attachmentType": "file", "name": name, "size": file_size, "contentType": content_type, } session = create_mail_upload_session(message_id, attachment_item, account_id) upload_url = session["uploadUrl"] headers = {"Authorization": f"Bearer {get_token(account_id)}"} return _do_chunked_upload(upload_url, data, headers) def search_query( query: str, entity_types: list[str], account_id: str | None = None, limit: int = 50, fields: list[str] | None = None, ) -> Iterator[dict[str, Any]]: """Use the modern /search/query API endpoint""" payload = { "requests": [ { "entityTypes": entity_types, "query": {"queryString": query}, "size": min(limit, 25), "from": 0, } ] } if fields: payload["requests"][0]["fields"] = fields items_returned = 0 while True: result = request("POST", "/search/query", account_id, json=payload) if not result or "value" not in result: break for response in result["value"]: if "hitsContainers" in response: for container in response["hitsContainers"]: if "hits" in container: for hit in container["hits"]: if limit and items_returned >= limit: return yield hit["resource"] items_returned += 1 if "@odata.nextLink" in result: break has_more = False for response in result.get("value", []): for container in response.get("hitsContainers", []): if container.get("moreResultsAvailable"): has_more = True break if not has_more: break payload["requests"][0]["from"] += payload["requests"][0]["size"]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/purva-kashyap/microsoft-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server