auth.py•5.17 kB
import os
import sys
import msal
import pathlib as pl
from typing import NamedTuple
from dotenv import load_dotenv
load_dotenv()
CACHE_FILE = pl.Path.home() / ".microsoft_mcp_token_cache.json"
# Use .default scope which includes all permissions granted to the app in Azure AD
# This works better with token caching across different API calls
SCOPES = ["https://graph.microsoft.com/.default"]
class Account(NamedTuple):
username: str
account_id: str
def _read_cache() -> str | None:
try:
return CACHE_FILE.read_text()
except FileNotFoundError:
return None
def _write_cache(content: str) -> None:
CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
CACHE_FILE.write_text(content)
def get_app() -> msal.PublicClientApplication:
client_id = os.getenv("MICROSOFT_MCP_CLIENT_ID")
if not client_id:
raise ValueError("MICROSOFT_MCP_CLIENT_ID environment variable is required")
# Use 'common' for multi-tenant (personal, work, school accounts)
# Use 'organizations' for work/school accounts only
# Use 'consumers' for personal Microsoft accounts only
# Or use a specific tenant ID
tenant_id = os.getenv("MICROSOFT_MCP_TENANT_ID", "common")
authority = f"https://login.microsoftonline.com/{tenant_id}"
cache = msal.SerializableTokenCache()
cache_content = _read_cache()
if cache_content:
cache.deserialize(cache_content)
app = msal.PublicClientApplication(
client_id, authority=authority, token_cache=cache
)
return app
def get_token(account_id: str | None = None) -> str:
app = get_app()
accounts = app.get_accounts()
account = None
if account_id:
account = next(
(a for a in accounts if a["home_account_id"] == account_id), None
)
if not account:
raise Exception(
f"Account with ID {account_id} not found. Please authenticate first using the authenticate_account tool."
)
elif accounts:
account = accounts[0]
else:
raise Exception(
"No authenticated accounts found. Please authenticate first using the authenticate_account tool."
)
print(f"[DEBUG] Requesting token with scopes: {SCOPES}", file=sys.stderr)
print(f"[DEBUG] Account details: {account}", file=sys.stderr)
result = app.acquire_token_silent(SCOPES, account=account)
print(f"[DEBUG] Token result: {result is not None}, has error: {'error' in result if result else 'N/A'}", file=sys.stderr)
if not result:
# Token acquisition failed - need to re-authenticate
# Don't start an interactive flow here - raise an error instead
raise Exception(
f"Failed to acquire token for account {account.get('username', 'unknown')}. "
"The token may have expired. Please re-authenticate using the authenticate_account tool."
)
if "error" in result:
raise Exception(
f"Auth failed: {result.get('error_description', result['error'])}"
)
cache = app.token_cache
if isinstance(cache, msal.SerializableTokenCache) and cache.has_state_changed:
_write_cache(cache.serialize())
return result["access_token"]
def list_accounts() -> list[Account]:
app = get_app()
return [
Account(username=a["username"], account_id=a["home_account_id"])
for a in app.get_accounts()
]
def authenticate_new_account() -> Account | None:
"""Authenticate a new account interactively"""
app = get_app()
flow = app.initiate_device_flow(scopes=SCOPES)
if "user_code" not in flow:
raise Exception(
f"Failed to get device code: {flow.get('error_description', 'Unknown error')}"
)
print("\nTo authenticate:")
print(
f"1. Visit: {flow.get('verification_uri', flow.get('verification_url', 'https://microsoft.com/devicelogin'))}"
)
print(f"2. Enter code: {flow['user_code']}")
print("3. Sign in with your Microsoft account")
print("\nWaiting for authentication...")
result = app.acquire_token_by_device_flow(flow)
if "error" in result:
raise Exception(
f"Auth failed: {result.get('error_description', result['error'])}"
)
cache = app.token_cache
if isinstance(cache, msal.SerializableTokenCache) and cache.has_state_changed:
_write_cache(cache.serialize())
# Get the newly added account
accounts = app.get_accounts()
if accounts:
# Find the account that matches the token we just got
for account in accounts:
if (
account.get("username", "").lower()
== result.get("id_token_claims", {})
.get("preferred_username", "")
.lower()
):
return Account(
username=account["username"], account_id=account["home_account_id"]
)
# If exact match not found, return the last account
account = accounts[-1]
return Account(
username=account["username"], account_id=account["home_account_id"]
)
return None