"""
Test script for search_users MCP tool.
Tests the new /user/search endpoint against the real Microsoft Graph API.
"""
import json
import httpx
import asyncio
from pathlib import Path
def _display(user, key):
    """Return ``user[key]`` for printing, or ``'N/A'`` when it is missing or empty."""
    # A JSON null decodes to None, and dict.get's default only applies when
    # the key is absent, so fall back on any falsy value instead.
    return user.get(key) or 'N/A'


def _print_search_results(data):
    """Print the user list contained in a decoded /user/search response body.

    Args:
        data: Decoded JSON body; read for the ``users`` list and the
            optional ``count`` field.
    """
    # Tolerate a missing or null 'users' / 'count' rather than raising KeyError.
    users = data.get('users') or []
    count = data.get('count', len(users))
    print("\n✅ Search successful!")
    print(f" Found {count} users:\n")
    for i, user in enumerate(users, 1):
        print(f" {i}. {_display(user, 'displayName')}")
        print(f" Email: {_display(user, 'mail')}")
        print(f" Job: {_display(user, 'jobTitle')}")
        print(f" Dept: {_display(user, 'department')}")
        print(f" Office: {_display(user, 'officeLocation')}")
        print()


async def _mcp_server_is_up(mcp_url):
    """Call ``{mcp_url}/health`` and report the outcome; return True on HTTP 200."""
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(f"{mcp_url}/health", timeout=5.0)
        except Exception as e:
            # Broad on purpose: this is the script's top-level boundary and
            # any failure here means the server cannot be reached.
            print(f"❌ Cannot connect to MCP server: {e}")
            print(" Make sure server is running: python -m uvicorn src.mcp_servers.microsoft.server:app --port 8001")
            return False
    if response.status_code == 200:
        print("✅ MCP Server is running")
        return True
    print(f"❌ Health check failed: {response.status_code}")
    return False


async def test_search_users():
    """Interactively exercise the MCP server's /user/search endpoint.

    Steps:
        1. GET ``/health`` on the local MCP server; stop if it is not
           reachable or does not answer with HTTP 200.
        2. Prompt on stdin for a search query; stop if it is empty.
        3. GET ``/user/search`` with that query (``max_results=10``) and
           print each returned user, or the error status and body.

    All results are reported via ``print``; nothing is returned.
    """
    # MCP server should be running on localhost:8001
    mcp_url = "http://localhost:8001"

    print("=" * 60)
    print("TEST: Search Users MCP Tool")
    print("=" * 60)

    # Test 1: Health check
    print("\n1. Health check...")
    if not await _mcp_server_is_up(mcp_url):
        return

    # Test 2: Search users
    print("\n2. Searching users...")
    query = input("Enter search query (e.g., 'Giovanni'): ").strip()
    if not query:
        print("❌ No query provided")
        return

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                f"{mcp_url}/user/search",
                params={"query": query, "max_results": 10},
                timeout=10.0,
            )
            if response.status_code == 200:
                _print_search_results(response.json())
            else:
                print(f"❌ Search failed: {response.status_code}")
                print(f" Response: {response.text}")
        except Exception as e:
            # Covers transport errors as well as a malformed/non-JSON body.
            print(f"❌ Search error: {e}")

    print("=" * 60)
    print("✅ Test completed!")
    print("=" * 60)
# Script entry point: run the interactive check only when executed directly,
# never as a side effect of importing this module.
if __name__ == "__main__":
    search_check = test_search_users()
    asyncio.run(search_check)