Skip to main content
Glama

MCP Microsoft Teams Server

by InditexTech
Apache 2.0
318
  • Linux
  • Apple
teams.py16.6 kB
# SPDX-FileCopyrightText: 2025 INDUSTRIA DE DISEÑO TEXTIL, S.A. (INDITEX, S.A.) # SPDX-License-Identifier: Apache-2.0 import logging from botbuilder.core import BotAdapter, TurnContext from botbuilder.core.teams import TeamsInfo from botbuilder.integration.aiohttp import CloudAdapter from botbuilder.schema import ( Activity, ActivityTypes, ChannelAccount, ConversationAccount, ConversationReference, Mention, TextFormatTypes, ) from botbuilder.schema.teams import TeamsChannelAccount from botframework.connector.aio.operations_async import ConversationsOperations from kiota_abstractions.base_request_configuration import RequestConfiguration from msgraph.generated.models.chat_message import ChatMessage from msgraph.generated.teams.item.channels.item.messages.item.chat_message_item_request_builder import ( ChatMessageItemRequestBuilder, ) from msgraph.generated.teams.item.channels.item.messages.item.replies.replies_request_builder import ( RepliesRequestBuilder, ) from msgraph.generated.teams.item.channels.item.messages.messages_request_builder import ( MessagesRequestBuilder, ) from msgraph.graph_service_client import GraphServiceClient from pydantic import BaseModel, Field LOGGER = logging.getLogger(__name__) class TeamsThread(BaseModel): thread_id: str = Field( description="Thread ID as a string in the format '1743086901347'" ) title: str = Field(description="Message title") content: str = Field(description="Message content") class TeamsMessage(BaseModel): thread_id: str = Field( description="Thread ID as a string in the format '1743086901347'" ) message_id: str = Field(description="Message ID") content: str = Field(description="Message content") class TeamsMember(BaseModel): name: str = Field( description="Member name used in mentions and user information cards" ) email: str = Field(description="Member email") class PagedTeamsMessages(BaseModel): cursor: str | None = Field( description="Cursor to retrieve the next page of messages." ) limit: int = Field(description="Page limit, maximum number of items to retrieve") total: int = Field(description="Total items available for retrieval") items: list[TeamsMessage] = Field(description="List of channel messages or threads") class TeamsClient: def __init__( self, adapter: CloudAdapter, graph_client: GraphServiceClient, teams_app_id: str, team_id: str, teams_channel_id: str, ): self.adapter = adapter self.graph_client = graph_client self.teams_app_id = teams_app_id self.team_id = team_id self.teams_channel_id = teams_channel_id self.service_url = None self.adapter.on_turn_error = self.on_turn_error def get_team_id(self): return self.team_id @staticmethod async def on_turn_error(context: TurnContext, error: Exception): LOGGER.error(f"Error {str(error)}") # await context.send_activity("An error occurred in the bot, please try again later") def _create_conversation_reference(self) -> ConversationReference: service_url = "https://smba.trafficmanager.net/emea/" if self.service_url is not None: service_url = self.service_url return ConversationReference( bot=TeamsChannelAccount(id=self.teams_app_id, name="MCP Bot"), channel_id=self.teams_channel_id, service_url=service_url, conversation=ConversationAccount( id=self.teams_channel_id, is_group=True, conversation_type="channel", name="Teams channel", ), ) async def _initialize(self): if not self.service_url: def context_callback(context: TurnContext): self.service_url = context.activity.service_url await self.adapter.continue_conversation( bot_app_id=self.teams_app_id, reference=self._create_conversation_reference(), callback=context_callback, ) async def start_thread( self, title: str, content: str, member_name: str | None = None ) -> TeamsThread: """Start a new thread in a channel. Args: title: Thread title content: Initial thread content member_name: Member name to mention in content Returns: Created thread details including ID """ try: await self._initialize() result = TeamsThread(title=title, content=content, thread_id="") async def start_thread_callback(context: TurnContext): mention_member = None if member_name is not None: members = await TeamsInfo.get_team_members(context, self.team_id) for member in members: if member.name == member_name: mention_member = member mentions = [] if mention_member is not None: result.content = ( f"# **{title}**\n<at>{mention_member.name}</at> {content}" ) mention = Mention( text=f"<at>{mention_member.name}</at>", type="mention", mentioned=ChannelAccount( id=mention_member.id, name=mention_member.name ), ) mentions.append(mention) response = await context.send_activity( activity_or_text=Activity( type=ActivityTypes.message, topic_name=title, text=result.content, text_format=TextFormatTypes.markdown, entities=mentions, ) ) if response is not None: result.thread_id = response.id await self.adapter.continue_conversation( bot_app_id=self.teams_app_id, reference=self._create_conversation_reference(), callback=start_thread_callback, ) return result except Exception as e: LOGGER.error(f"Error creating thread: {str(e)}") raise @staticmethod def _get_conversation_operations(context: TurnContext) -> ConversationsOperations: # Hack to get the connector client and reply to an existing activity connector_client = context.turn_state[BotAdapter.BOT_CONNECTOR_CLIENT_KEY] return connector_client.conversations # pyright: ignore async def update_thread( self, thread_id: str, content: str, member_name: str | None = None ) -> TeamsMessage: """Add a message to an existing thread, mentioning a user optionally. Args: thread_id: Thread ID to update content: Message content to add member_name: Member name to mention (optional) Returns: Updated thread details """ try: await self._initialize() result = TeamsMessage(thread_id=thread_id, content=content, message_id="") async def update_thread_callback(context: TurnContext): mention_member = None if member_name is not None: members = await TeamsInfo.get_team_members(context, self.team_id) for member in members: if member.name == member_name: mention_member = member mentions = [] if mention_member is not None: result.content = f"<at>{mention_member.name}</at> {content}" mention = Mention( text=f"<at>{mention_member.name}</at>", type="mention", mentioned=ChannelAccount( id=mention_member.id, name=mention_member.name ), ) mentions.append(mention) reply = Activity( type=ActivityTypes.message, text=result.content, from_property=TeamsChannelAccount( id=self.teams_app_id, name="MCP Bot" ), conversation=ConversationAccount(id=thread_id), entities=mentions, ) # # Hack to get the connector client and reply to an existing activity # conversations = TeamsClient._get_conversation_operations(context) # # Hack to reply to conversation https://github.com/microsoft/botframework-sdk/issues/6626 # conversation_id = ( f"{context.activity.conversation.id};messageid={thread_id}" # pyright: ignore ) response = await conversations.send_to_conversation( conversation_id=conversation_id, activity=reply ) if response is not None: result.message_id = response.id # pyright: ignore await self.adapter.continue_conversation( bot_app_id=self.teams_app_id, reference=self._create_conversation_reference(), callback=update_thread_callback, ) return result except Exception as e: LOGGER.error(f"Error updating thread: {str(e)}") raise async def get_member_by_id(self, member_id: str) -> TeamsMember: try: await self._initialize() result = TeamsMember(name="", email="") async def get_member_by_id_callback(context: TurnContext): member = await TeamsInfo.get_team_member( context, self.team_id, member_id ) result.name = member.name result.email = member.email await self.adapter.continue_conversation( bot_app_id=self.teams_app_id, reference=self._create_conversation_reference(), callback=get_member_by_id_callback, ) return result except Exception as e: LOGGER.error(f"Error updating thread: {str(e)}") raise async def read_threads( self, limit: int = 50, cursor: str | None = None ) -> PagedTeamsMessages: """Read all threads in configured teams channel. Args: cursor: The pagination cursor. limit: The pagination page size Returns: Paged team channel messages containing """ try: query = MessagesRequestBuilder.MessagesRequestBuilderGetQueryParameters( top=limit ) request = RequestConfiguration(query_parameters=query) if cursor is not None: response = ( await self.graph_client.teams.by_team_id(self.team_id) .channels.by_channel_id(self.teams_channel_id) .messages.with_url(cursor) .get(request_configuration=request) ) else: response = ( await self.graph_client.teams.by_team_id(self.team_id) .channels.by_channel_id(self.teams_channel_id) .messages.get(request_configuration=request) ) result = PagedTeamsMessages( cursor=response.odata_next_link, # pyright: ignore limit=limit, total=response.odata_count, # pyright: ignore items=[], ) if response.value is not None: # pyright: ignore for message in response.value: # pyright: ignore result.items.append( TeamsMessage( message_id=message.id, # pyright: ignore content=message.body.content, # pyright: ignore thread_id=message.id, # pyright: ignore ) ) return result except Exception as e: LOGGER.error(f"Error reading thread: {str(e)}") raise async def read_thread_replies( self, thread_id: str, limit: int = 50, cursor: str | None = None ) -> PagedTeamsMessages: """Read all replies in a thread. Args: thread_id: Thread ID to read cursor: The pagination cursor limit: The pagination page size Returns: List of thread messages """ try: params = RepliesRequestBuilder.RepliesRequestBuilderGetQueryParameters( top=limit ) request = RequestConfiguration(query_parameters=params) if cursor is not None: replies = ( await self.graph_client.teams.by_team_id(self.team_id) .channels.by_channel_id(self.teams_channel_id) .messages.by_chat_message_id(thread_id) .replies.with_url(cursor) .get(request_configuration=request) ) else: replies = ( await self.graph_client.teams.by_team_id(self.team_id) .channels.by_channel_id(self.teams_channel_id) .messages.by_chat_message_id(thread_id) .replies.get(request_configuration=request) ) result = PagedTeamsMessages( cursor=cursor, limit=limit, total=replies.odata_count, # pyright: ignore items=[], ) if replies is not None and replies.value is not None: for reply in replies.value: result.items.append( TeamsMessage( message_id=reply.id, # pyright: ignore content=reply.body.content, # pyright: ignore thread_id=reply.reply_to_id, # pyright: ignore ) ) return result except Exception as e: LOGGER.error(f"Error reading thread: {str(e)}") raise async def read_message(self, message_id: str) -> ChatMessage | None: try: query = ChatMessageItemRequestBuilder.ChatMessageItemRequestBuilderGetQueryParameters() request = RequestConfiguration(query_parameters=query) response = ( await self.graph_client.teams.by_team_id(self.team_id) .channels.by_channel_id(self.teams_channel_id) .messages.by_chat_message_id(chat_message_id=message_id) .get(request_configuration=request) ) return response except Exception as e: LOGGER.error(f"Error reading thread: {str(e)}") raise async def list_members(self) -> list[TeamsMember]: """List all members in the configured team. Returns: List of team member details """ try: await self._initialize() result = [] async def list_members_callback(context: TurnContext): members = await TeamsInfo.get_team_members(context, self.team_id) for member in members: result.append(TeamsMember(name=member.name, email=member.email)) await self.adapter.continue_conversation( bot_app_id=self.teams_app_id, reference=self._create_conversation_reference(), callback=list_members_callback, ) return result except Exception as e: LOGGER.error(f"Error listing members: {str(e)}") raise async def get_member_by_name(self, name: str) -> TeamsMember | None: members = await self.list_members() for member in members: if member.name == name: return member

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/InditexTech/mcp-teams-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server