"""
Test MCP Client Integration with Bot.
This script tests the MCP client in isolation before full bot integration.
"""
import asyncio
import sys
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent / 'src'))
from mcp_servers.mcp_client import MCPClient, MCPError
async def test_mcp_client():
"""Test MCP client basic functionality."""
print("=" * 60)
print("π§ͺ TEST MCP CLIENT INTEGRATION")
print("=" * 60)
# Initialize client
client = MCPClient(base_url="http://localhost:8001")
print(f"\nβ
MCP Client initialized: {client.base_url}")
# Test 1: Health Check
print("\n" + "β" * 60)
print("TEST 1: Health Check")
print("β" * 60)
try:
health = await client.health_check()
print(f"β
Health: {health.get('status')}")
print(f" Service: {health.get('service')}")
print(f" Version: {health.get('version')}")
except Exception as e:
print(f"β Health check failed: {e}")
return False
# Test 2: Get MCP Info
print("\n" + "β" * 60)
print("TEST 2: MCP Info")
print("β" * 60)
try:
info = await client.get_mcp_info()
areas = info.get('areas', {})
print(f"β
MCP has {len(areas)} areas:")
for area_name, area_info in areas.items():
actions_count = len(area_info.get('actions', []))
print(f" β’ {area_name}: {actions_count} actions")
except Exception as e:
print(f"β MCP info failed: {e}")
return False
# Test 3: Search Users
print("\n" + "β" * 60)
print("TEST 3: Search Users (Giovanni)")
print("β" * 60)
try:
results = await client.search_users(query="Giovanni", max_results=5)
if results:
print(f"β
Found {len(results)} users:")
for idx, user in enumerate(results[:3], 1):
name = user.get('displayName', 'N/A')
email = user.get('mail', user.get('userPrincipalName', 'N/A'))
print(f" {idx}. {name} ({email})")
else:
print("β οΈ No users found")
except MCPError as e:
print(f"β MCP Error: {e.code} - {e}")
print(f" Details: {e.details}")
except Exception as e:
print(f"β Unexpected error: {e}")
# Test 4: Search Users (Daniele)
print("\n" + "β" * 60)
print("TEST 4: Search Users (Daniele)")
print("β" * 60)
try:
results = await client.search_users(query="Daniele", max_results=5)
if results:
print(f"β
Found {len(results)} users:")
for idx, user in enumerate(results[:3], 1):
name = user.get('displayName', 'N/A')
email = user.get('mail', user.get('userPrincipalName', 'N/A'))
print(f" {idx}. {name} ({email})")
else:
print("β οΈ No users found")
except MCPError as e:
print(f"β MCP Error: {e.code} - {e}")
print(f" Details: {e.details}")
except Exception as e:
print(f"β Unexpected error: {e}")
# Test 5: List Calendar Events
print("\n" + "β" * 60)
print("TEST 5: List Calendar Events (next 7 days)")
print("β" * 60)
try:
events = await client.list_calendar_events(max_results=5)
if events:
print(f"β
Found {len(events)} events:")
for idx, event in enumerate(events[:3], 1):
subject = event.get('subject', 'No subject')
start = event.get('start', {}).get('dateTime', 'N/A')
print(f" {idx}. {subject}")
print(f" Start: {start}")
else:
print("β οΈ No events found")
except MCPError as e:
print(f"β MCP Error: {e.code} - {e}")
print(f" Details: {e.details}")
except Exception as e:
print(f"β Unexpected error: {e}")
# Test 6: Find Meeting Times
print("\n" + "β" * 60)
print("TEST 6: Find Meeting Times (Giovanni & Daniele)")
print("β" * 60)
try:
# Get Giovanni and Daniele emails from previous searches
giovanni_results = await client.search_users(query="Giovanni", max_results=1)
daniele_results = await client.search_users(query="Daniele", max_results=1)
if giovanni_results and daniele_results:
giovanni_email = giovanni_results[0].get('mail') or giovanni_results[0].get('userPrincipalName')
daniele_email = daniele_results[0].get('mail') or daniele_results[0].get('userPrincipalName')
print(f" Checking availability for:")
print(f" β’ {giovanni_email}")
print(f" β’ {daniele_email}")
suggestions = await client.find_meeting_times(
attendees=[giovanni_email, daniele_email],
duration_minutes=30,
max_results=3
)
print(f"\nβ
Found {len(suggestions)} available slots:")
for idx, slot in enumerate(suggestions[:3], 1):
start = slot.get('start', 'N/A')
end = slot.get('end', 'N/A')
score = slot.get('confidence', 0)
print(f" {idx}. {start} - {end} (score: {score})")
else:
print("β οΈ Could not find Giovanni or Daniele")
except MCPError as e:
print(f"β MCP Error: {e.code} - {e}")
print(f" Details: {e.details}")
except Exception as e:
print(f"β Unexpected error: {e}")
print("\n" + "=" * 60)
print("β
MCP CLIENT INTEGRATION TEST COMPLETE")
print("=" * 60)
return True
if __name__ == "__main__":
print("\nπ Starting MCP Integration Tests...\n")
success = asyncio.run(test_mcp_client())
if success:
print("\nβ
All tests completed!")
sys.exit(0)
else:
print("\nβ Tests failed!")
sys.exit(1)