#!/usr/bin/env python3
"""
Microsoft Graph MCP Server
Provides read-only access to M365 suite via code execution pattern:
- SharePoint: sites, files, search
- OneDrive: user files
- Outlook: emails, search
- Teams: channels, messages
- Calendar: events
"""
import asyncio
import logging
import sys
import os
# Add src to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from mcp.server import Server
from mcp.types import Tool, TextContent
import mcp.server.stdio
from src.sandbox import execute_code
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
stream=sys.stderr
)
logger = logging.getLogger("msgraph-mcp")
# Create MCP server
app = Server("msgraph-mcp")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""Return available tools."""
return [
Tool(
name="execute_code",
description=(
"Execute Python code with Microsoft Graph API access for M365 queries. "
"BEST FOR: Searching SharePoint files, OneDrive, Outlook emails, Teams messages, Calendar events. "
"USE WHEN: Need to query UNO Nutrition M365 tenant content. "
"NOT FOR: Write operations (read-only access).\n\n"
"Available via 'msgraph' module:\n\n"
"CONNECTION:\n"
" msgraph.test_connection() - Test API connectivity\n"
" msgraph.list_users(query=None, limit=50) - List/search users\n"
" msgraph.get_user(user_email) - Get user details\n\n"
"SHAREPOINT:\n"
" msgraph.search_sites(query, limit=10) - Search SharePoint sites\n"
" msgraph.get_site(site_id) - Get site details\n"
" msgraph.list_site_contents(site_id, path='/') - List files/folders\n"
" msgraph.search_files(query, site_id=None, limit=20) - Search files\n"
" msgraph.get_file_metadata(site_id, item_id) - File metadata\n"
" msgraph.get_file_content(site_id, item_id) - File content (text only)\n\n"
"ONEDRIVE:\n"
" msgraph.list_user_files(user_email, path='/') - User's OneDrive\n"
" msgraph.search_user_files(user_email, query, limit=20) - Search OneDrive\n\n"
"OUTLOOK:\n"
" msgraph.search_emails(query, user_email, limit=20) - Search emails\n"
" msgraph.list_recent_emails(user_email, limit=20, folder='inbox') - Recent emails\n"
" msgraph.get_email(user_email, message_id) - Full email content\n\n"
"TEAMS:\n"
" msgraph.list_teams(limit=50) - List accessible teams\n"
" msgraph.list_channels(team_id) - Channels in a team\n"
" msgraph.get_channel_messages(team_id, channel_id, limit=50) - Messages\n"
" msgraph.search_channel_messages(team_id, channel_id, query) - Search\n\n"
"CALENDAR:\n"
" msgraph.list_events(user_email, days_ahead=7) - Upcoming events\n"
" msgraph.search_events(user_email, query, limit=20) - Search events\n"
" msgraph.get_event(user_email, event_id) - Event details\n\n"
"Allowed imports: msgraph, json, re, datetime, timedelta\n\n"
"Example:\n"
" result = msgraph.search_files('formulation xlsx', limit=10)\n"
" print(json.dumps(result, indent=2))"
),
inputSchema={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": (
"Python code to execute. Should set 'result' variable "
"or use print() for output. Access M365 via 'msgraph' module."
)
},
"timeout": {
"type": "integer",
"default": 30,
"description": "Execution timeout in seconds (max 60)",
"minimum": 5,
"maximum": 60
}
},
"required": ["code"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool calls."""
try:
if name == "execute_code":
code = arguments.get("code", "")
timeout = min(arguments.get("timeout", 30), 60)
if not code.strip():
return [TextContent(
type="text",
text='{"error": "No code provided"}'
)]
# Execute in sandbox
result = execute_code(code, timeout=timeout)
# Format response
import json
return [TextContent(
type="text",
text=json.dumps(result, indent=2, default=str)
)]
else:
return [TextContent(
type="text",
text=f'{{"error": "Unknown tool: {name}"}}'
)]
except Exception as e:
logger.exception(f"Error in tool {name}")
return [TextContent(
type="text",
text=f'{{"error": "{str(e)}"}}'
)]
async def main():
"""Run the MCP server."""
logger.info("Starting Microsoft Graph MCP server...")
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())