"""
MCP Client for IRIS Bot Integration.
Provides a unified interface to interact with MCP Server v2 (area-based architecture).
Handles all HTTP communication, error management, and response parsing.
"""
import logging
import aiohttp
from typing import Dict, Any, Optional, List
from datetime import datetime
logger = logging.getLogger(__name__)
class MCPError(Exception):
"""Base exception for MCP client errors."""
def __init__(self, message: str, code: str = None, details: Dict = None):
super().__init__(message)
self.code = code
self.details = details or {}
class MCPClient:
"""
Client for Microsoft MCP Server v2.
Provides methods to interact with 6 area-based MCP tools:
- user_operations (5 actions)
- calendar_operations (7 actions)
- email_operations (10 actions)
- teams_operations (7 actions)
- file_operations (8 actions)
- bookings_operations (19 actions: 13 Microsoft Bookings + 6 Hybrid Booking)
"""
def __init__(self, base_url: str = "http://localhost:8001"):
"""
Initialize MCP Client.
Args:
base_url: Base URL of MCP Server v2 (default: http://localhost:8001)
"""
self.base_url = base_url.rstrip('/')
self.timeout = aiohttp.ClientTimeout(total=30)
logger.info(f"MCP Client initialized with base URL: {self.base_url}")
async def _call_mcp(
self,
area: str,
action: str,
params: Dict[str, Any]
) -> Dict[str, Any]:
"""
Internal method to call MCP endpoint.
Args:
area: Area name (user, calendar, email, teams, file)
action: Action name
params: Action parameters
Returns:
Response data dict
Raises:
MCPError: If request fails or API returns error
"""
url = f"{self.base_url}/mcp/{area}_operations"
payload = {
"action": action,
"params": params
}
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.post(url, json=payload) as response:
response_data = await response.json()
# Handle HTTP error status codes
if response.status >= 400:
error_detail = response_data.get('detail', {})
if isinstance(error_detail, dict):
error_info = error_detail.get('error', {})
raise MCPError(
message=error_info.get('message', f"HTTP {response.status}"),
code=error_info.get('code', 'HTTP_ERROR'),
details=error_info.get('details', {})
)
else:
raise MCPError(
message=str(error_detail),
code='HTTP_ERROR'
)
# Check success flag
if not response_data.get('success'):
error_info = response_data.get('error', {})
raise MCPError(
message=error_info.get('message', 'Unknown error'),
code=error_info.get('code', 'UNKNOWN_ERROR'),
details=error_info.get('details', {})
)
return response_data.get('data')
except aiohttp.ClientError as e:
logger.error(f"MCP HTTP error: {e}")
raise MCPError(f"Failed to connect to MCP server: {e}", code='CONNECTION_ERROR')
except MCPError:
raise
except Exception as e:
logger.error(f"MCP unexpected error: {e}")
raise MCPError(f"Unexpected error: {e}", code='UNEXPECTED_ERROR')
# ========================================
# USER OPERATIONS (5 actions)
# ========================================
async def search_users(
self,
query: str,
max_results: int = 10
) -> List[Dict[str, Any]]:
"""
Search for users in the organization.
Args:
query: Search query (searches displayName, email, jobTitle)
max_results: Maximum number of results (default: 10)
Returns:
List of user objects with id, displayName, email, jobTitle
"""
result = await self._call_mcp('user', 'search', {
'query': query,
'max_results': max_results
})
# Extract users list from response
return result.get('users', []) if result else []
async def get_user(self, user_id: str) -> Dict[str, Any]:
"""
Get detailed information about a specific user.
Args:
user_id: User ID or email
Returns:
User object with full details
"""
return await self._call_mcp('user', 'get', {'user_id': user_id})
async def get_user_presence(self, user_id: str) -> Dict[str, Any]:
"""
Get user's presence status (available, busy, away, etc.).
Args:
user_id: User ID or email
Returns:
Presence object with availability and activity status
"""
return await self._call_mcp('user', 'get_presence', {'user_id': user_id})
async def get_authenticated_user(self) -> Dict[str, Any]:
"""
Get information about the authenticated user (token owner).
This is THE definitive method to determine who the organizer is.
The user who performed OAuth authentication IS the organizer.
Returns:
Dict with authenticated user information:
- user: User object with id, displayName, mail, etc.
- is_authenticated: True
- message: Confirmation message
Example:
user_info = await client.get_authenticated_user()
organizer_email = user_info['user']['mail']
organizer_name = user_info['user']['displayName']
"""
return await self._call_mcp('user', 'get_me', {})
# ========================================
# CALENDAR OPERATIONS (7 actions)
# ========================================
async def list_calendar_events(
self,
start_datetime: Optional[str] = None,
end_datetime: Optional[str] = None,
max_results: int = 50,
user_email: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
List calendar events.
Args:
start_datetime: Start datetime in ISO format (default: now)
end_datetime: End datetime in ISO format (default: +7 days)
max_results: Maximum results (default: 50)
user_email: User email (default: authenticated user)
Returns:
List of event objects
"""
params = {'max_results': max_results}
if start_datetime:
params['start_datetime'] = start_datetime
if end_datetime:
params['end_datetime'] = end_datetime
if user_email:
params['user_email'] = user_email
result = await self._call_mcp('calendar', 'list', params)
# Extract events list from response
return result.get('events', []) if result else []
async def get_calendar_event(self, event_id: str) -> Dict[str, Any]:
"""
Get detailed information about a calendar event.
Args:
event_id: Event ID
Returns:
Event object with full details
"""
return await self._call_mcp('calendar', 'get', {'event_id': event_id})
async def create_calendar_event(
self,
subject: str,
start_datetime: str,
end_datetime: str,
attendees: Optional[List[str]] = None,
body: Optional[str] = None,
location: Optional[str] = None,
is_online_meeting: bool = False
) -> Dict[str, Any]:
"""
Create a new calendar event.
Args:
subject: Event title
start_datetime: Start datetime in ISO format
end_datetime: End datetime in ISO format
attendees: List of attendee emails
body: Event description
location: Physical location
is_online_meeting: Create Teams meeting link
Returns:
Created event object with ID and details
"""
params = {
'subject': subject,
'start_datetime': start_datetime,
'end_datetime': end_datetime,
'is_online_meeting': is_online_meeting
}
if attendees:
params['attendees'] = attendees
if body:
params['body'] = body
if location:
params['location'] = location
return await self._call_mcp('calendar', 'create', params)
async def find_meeting_times(
self,
attendees: List[str],
duration_minutes: int = 60,
start_datetime: Optional[str] = None,
end_datetime: Optional[str] = None,
max_results: int = 5
) -> List[Dict[str, Any]]:
"""
Find available meeting times for attendees.
Args:
attendees: List of attendee emails
duration_minutes: Meeting duration in minutes
start_datetime: Search start (default: now)
end_datetime: Search end (default: +7 days)
max_results: Max suggestions (default: 5)
Returns:
List of suggested time slots with availability scores
"""
params = {
'attendees': attendees,
'duration_minutes': duration_minutes,
'max_results': max_results
}
if start_datetime:
params['start_datetime'] = start_datetime
if end_datetime:
params['end_datetime'] = end_datetime
result = await self._call_mcp('calendar', 'find_times', params)
# Extract suggestions list from response
return result.get('suggestions', []) if result else []
async def delete_calendar_event(self, event_id: str) -> Dict[str, Any]:
"""
Delete a calendar event.
Args:
event_id: Event ID to delete
Returns:
Success confirmation
"""
return await self._call_mcp('calendar', 'delete', {'event_id': event_id})
# ========================================
# EMAIL OPERATIONS (10 actions)
# ========================================
async def list_emails(
self,
folder: str = "inbox",
max_results: int = 50,
filter_query: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
List emails from a folder.
Args:
folder: Folder name (inbox, sent, drafts, etc.)
max_results: Maximum results (default: 50)
filter_query: OData filter query
Returns:
List of email objects
"""
params = {'folder': folder, 'max_results': max_results}
if filter_query:
params['filter'] = filter_query
return await self._call_mcp('email', 'list', params)
async def send_email(
self,
to: List[str],
subject: str,
body: str,
cc: Optional[List[str]] = None,
bcc: Optional[List[str]] = None,
body_type: str = "text"
) -> Dict[str, Any]:
"""
Send a new email.
Args:
to: List of recipient emails
subject: Email subject
body: Email body content
cc: CC recipients (optional)
bcc: BCC recipients (optional)
body_type: "text" or "html" (default: text)
Returns:
Success confirmation with message ID
"""
params = {
'to': to,
'subject': subject,
'body': body,
'body_type': body_type
}
if cc:
params['cc'] = cc
if bcc:
params['bcc'] = bcc
return await self._call_mcp('email', 'send', params)
# ========================================
# TEAMS OPERATIONS (7 actions)
# ========================================
async def send_teams_message(
self,
chat_id: str,
content: str,
content_type: str = "text"
) -> Dict[str, Any]:
"""
Send a message to a Teams chat or channel.
Args:
chat_id: Chat or channel ID
content: Message content
content_type: "text" or "html" (default: text)
Returns:
Sent message object
"""
return await self._call_mcp('teams', 'send_message', {
'chat_id': chat_id,
'content': content,
'content_type': content_type
})
async def list_teams_chats(
self,
max_results: int = 50
) -> List[Dict[str, Any]]:
"""
List user's Teams chats.
Args:
max_results: Maximum results (default: 50)
Returns:
List of chat objects
"""
return await self._call_mcp('teams', 'list_chats', {'max_results': max_results})
# ========================================
# FILE OPERATIONS (8 actions)
# ========================================
async def search_files(
self,
query: str,
max_results: int = 20
) -> List[Dict[str, Any]]:
"""
Search for files in OneDrive/SharePoint.
Args:
query: Search query
max_results: Maximum results (default: 20)
Returns:
List of file objects
"""
return await self._call_mcp('file', 'search', {
'query': query,
'max_results': max_results
})
async def share_file(
self,
file_id: str,
permission_type: str = "view",
scope: str = "organization"
) -> Dict[str, Any]:
"""
Create a sharing link for a file.
Args:
file_id: File ID
permission_type: "view" or "edit" (default: view)
scope: "anonymous", "organization" (default: organization)
Returns:
Sharing link object
"""
return await self._call_mcp('file', 'share', {
'file_id': file_id,
'permission_type': permission_type,
'scope': scope
})
# ========================================
# UTILITY METHODS
# ========================================
async def health_check(self) -> Dict[str, Any]:
"""
Check MCP server health status.
Returns:
Health status object
"""
url = f"{self.base_url}/health"
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(url) as response:
return await response.json()
async def get_mcp_info(self) -> Dict[str, Any]:
"""
Get MCP server info with all available operations.
Returns:
Server info with areas and actions
"""
url = f"{self.base_url}/mcp/info"
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(url) as response:
return await response.json()
# ========================================
# HYBRID BOOKING METHODS
# ========================================
async def create_booking_session(
self,
organizer_email: str,
organizer_name: str,
internal_attendees: List[Dict[str, str]],
external_email: str,
proposed_slots: List[Dict[str, str]],
external_name: Optional[str] = None,
meeting_subject: Optional[str] = None,
meeting_duration: int = 30
) -> Dict[str, Any]:
"""
Create hybrid booking session for external user.
Args:
organizer_email: Organizer's email address
organizer_name: Organizer's display name
internal_attendees: List of internal attendees [{"email": "...", "name": "..."}]
external_email: External attendee's email
proposed_slots: 3 available time slots [{"start": "...", "end": "..."}]
external_name: Optional external attendee's name
meeting_subject: Optional meeting subject
meeting_duration: Meeting duration in minutes (default: 30)
Returns:
Session object with session_id and booking_url
Example:
```python
session = await client.create_booking_session(
organizer_email="filippo.savarese@infocert.it",
organizer_name="Filippo Savarese",
internal_attendees=[
{"email": "giovanni.rossi@infocert.it", "name": "Giovanni Rossi"}
],
external_email="mario.rossi@acme.com",
external_name="Mario Rossi",
proposed_slots=[
{"start": "2025-10-16T14:00:00", "end": "2025-10-16T15:00:00"},
{"start": "2025-10-16T16:00:00", "end": "2025-10-16T17:00:00"},
{"start": "2025-10-17T10:00:00", "end": "2025-10-17T11:00:00"}
],
meeting_subject="Product Demo",
meeting_duration=60
)
# Returns: {"session_id": "...", "booking_url": "https://...", ...}
```
"""
return await self._call_mcp(
area="bookings",
action="create_hybrid_session",
params={
"organizer_email": organizer_email,
"organizer_name": organizer_name,
"internal_attendees": internal_attendees,
"external_email": external_email,
"proposed_slots": proposed_slots,
"external_name": external_name,
"meeting_subject": meeting_subject,
"meeting_duration": meeting_duration
}
)
async def send_booking_invitation(self, session_id: str) -> Dict[str, Any]:
"""
Send branded booking invitation email from organizer's Microsoft account.
Args:
session_id: Booking session ID
Returns:
Email sent status
Example:
```python
result = await client.send_booking_invitation("abc-123-def")
# Returns: {"status": "sent", "recipient": "...", "session_id": "..."}
```
"""
return await self._call_mcp(
area="bookings",
action="send_hybrid_invitation",
params={"session_id": session_id}
)
async def get_booking_session(self, session_id: str) -> Dict[str, Any]:
"""
Get booking session details with expiry check.
Args:
session_id: Booking session ID
Returns:
Session object with all details
Example:
```python
session = await client.get_booking_session("abc-123-def")
# Returns: {"id": "...", "status": "pending", "organizer_email": "...", ...}
```
"""
return await self._call_mcp(
area="bookings",
action="get_hybrid_session",
params={"session_id": session_id}
)
async def check_slot_availability(
self,
session_id: str,
slot_index: int
) -> Dict[str, Any]:
"""
Check if specific slot is still available for all attendees.
Args:
session_id: Booking session ID
slot_index: Index of slot to check (0-2)
Returns:
Availability status and slot details
Example:
```python
result = await client.check_slot_availability("abc-123-def", 0)
# Returns: {"available": true, "slot_index": 0, "slot": {...}, "confidence": 100}
```
"""
return await self._call_mcp(
area="bookings",
action="check_hybrid_slot",
params={
"session_id": session_id,
"slot_index": slot_index
}
)
async def confirm_booking(
self,
session_id: str,
selected_slot_index: int
) -> Dict[str, Any]:
"""
Confirm booking and create calendar event for all attendees.
Args:
session_id: Booking session ID
selected_slot_index: Index of selected slot (0-2)
Returns:
Created event details with event_id and meeting links
Example:
```python
result = await client.confirm_booking("abc-123-def", 1)
# Returns: {
# "status": "confirmed",
# "event_id": "...",
# "selected_slot": {...},
# "web_link": "...",
# "online_meeting_url": "..."
# }
```
"""
return await self._call_mcp(
area="bookings",
action="confirm_hybrid_booking",
params={
"session_id": session_id,
"selected_slot_index": selected_slot_index
}
)
async def find_new_booking_slots(self, session_id: str) -> Dict[str, Any]:
"""
Find new available slots if original ones are taken.
Args:
session_id: Booking session ID
Returns:
New list of 3 available slots
Example:
```python
result = await client.find_new_booking_slots("abc-123-def")
# Returns: {"new_slots": [{...}, {...}, {...}], "session_id": "..."}
```
"""
return await self._call_mcp(
area="bookings",
action="find_new_hybrid_slots",
params={"session_id": session_id}
)