Skip to main content
Glama

Google Chat MCP Sever (Extendable to Teams,Slack.)

messages.pyโ€ข21.4 kB
import logging from typing import List, Dict, Optional from datetime import datetime, timezone, timedelta from googleapiclient.discovery import build from src.providers.google_chat.api.auth import get_credentials, get_user_info_by_id from src.providers.google_chat.utils import create_date_filter # Set up logging logger = logging.getLogger("messages") async def list_space_messages(space_name: str, include_sender_info: bool = False, page_size: int = 25, page_token: Optional[str] = None, filter_str: Optional[str] = None, order_by: Optional[str] = None, show_deleted: bool = False, days_window: int = 3, offset: int = 0) -> Dict: """Lists messages from a specific Google Chat space with optional time filtering. Args: space_name: The name/identifier of the space to fetch messages from include_sender_info: Whether to include sender info in the results (default: False) page_size: Maximum number of messages to return (default: 25) page_token: Optional page token for pagination filter_str: Optional filter string in the Google Chat API format (see API reference for format) order_by: How to order the messages, e.g., "createTime desc" show_deleted: Whether to include deleted messages (default: False) days_window: Number of days to look back (default: 3) offset: Number of days to offset the end date from today (default: 0) Returns: Dictionary containing messages and other metadata """ # Validate parameters if days_window <= 0: raise ValueError("days_window must be positive") if offset < 0: raise ValueError("offset cannot be negative") # Calculate date range today = datetime.now(timezone.utc) # Calculate end date by subtracting offset days from today end_date = today - timedelta(days=offset) end_date_str = end_date.strftime('%Y-%m-%d') # Calculate start date by going back days_window days from the end date start_date = end_date - timedelta(days=days_window) start_date_str = start_date.strftime('%Y-%m-%d') logger.info(f"Using calculated date range: {start_date_str} to {end_date_str} " + f"(window: {days_window} days, offset: {offset} days)") try: # Get credentials creds = get_credentials() service = build('chat', 'v1', credentials=creds) # Create date filter try: # Format dates for the Google Chat API date_filter = create_date_filter(start_date_str, end_date_str) logger.info(f"Using date filter: {date_filter}") # If we already have a filter, append the date filter if filter_str: filter_str = f"{filter_str} AND ({date_filter})" else: filter_str = date_filter except ValueError as e: logger.error(f"Invalid date format: {str(e)}") raise ValueError(f"Invalid date format: {str(e)}") # Prepare request parameters request_params = {'parent': space_name, 'pageSize': page_size} # No longer enforcing 1000 message limit # Add optional parameters if provided if filter_str: request_params['filter'] = filter_str if page_token: request_params['pageToken'] = page_token if order_by: request_params['orderBy'] = order_by else: # Default to newest messages first if not specified request_params['orderBy'] = 'createTime desc' if show_deleted: request_params['showDeleted'] = show_deleted # Make API request logger.info(f"Making API request with params: {request_params}") response = service.spaces().messages().list(**request_params).execute() # Extract messages and next page token messages = response.get('messages', []) next_page_token = response.get('nextPageToken') # Log timestamp details of retrieved messages for debugging date filter issues if start_date_str and len(messages) > 0: logger.info(f"Date filtering: First message createTime: {messages[0].get('createTime', 'unknown')}") logger.info(f"Date filtering: Looking for messages on/after: {start_date_str}") if end_date_str: logger.info(f"Date filtering: Looking for messages on/before: {end_date_str}") elif start_date_str and len(messages) == 0: logger.warning(f"Date filtering: No messages found for filter: {filter_str}") logger.warning(f"API response keys: {list(response.keys())}") logger.warning(f"API response snippet: {str(response)[:200]}...") logger.info(f"Retrieved {len(messages)} messages from space {space_name}") # Add sender information if requested if include_sender_info: for message in messages: if "sender" in message and "name" in message["sender"]: sender_id = message["sender"]["name"] try: sender_info = await get_user_info_by_id(sender_id) message["sender_info"] = sender_info except Exception: # If we fail to get sender info, continue with basic info message["sender_info"] = { "id": sender_id, "display_name": f"User {sender_id.split('/')[-1]}" } return { 'messages': messages, 'nextPageToken': next_page_token } except Exception as e: if isinstance(e, ValueError): raise logger.error(f"Failed to list messages in space: {str(e)}") raise Exception(f"Failed to list messages in space: {str(e)}") async def create_message(space_name: str, text: str, cards_v2=None) -> Dict: """Creates a new message in a Google Chat space. Args: space_name: The name/identifier of the space to send the message to text: Text content of the message cards_v2: Optional card content for the message (list of card objects) Returns: The created message object Raises: Exception: If authentication fails or message creation fails """ try: creds = get_credentials() if not creds: raise Exception("No valid credentials found. Please authenticate first.") service = build('chat', 'v1', credentials=creds) # Build message body message_body = {"text": text} if cards_v2: message_body["cardsV2"] = cards_v2 # Make API request response = service.spaces().messages().create( parent=space_name, body=message_body ).execute() return response except Exception as e: raise Exception(f"Failed to create message: {str(e)}") async def update_message(message_name: str, text: str = None, cards_v2=None) -> Dict: """Updates an existing message in a Google Chat space. Args: message_name: The resource name of the message to update (spaces/*/messages/*) text: New text content for the message (optional) cards_v2: New card content for the message (optional) Returns: The updated message object Raises: Exception: If authentication fails or message update fails """ try: creds = get_credentials() if not creds: raise Exception("No valid credentials found. Please authenticate first.") service = build('chat', 'v1', credentials=creds) # Build message and update mask message_body = {"name": message_name} update_mask = [] if text is not None: message_body["text"] = text update_mask.append("text") if cards_v2 is not None: message_body["cardsV2"] = cards_v2 update_mask.append("cardsV2") if not update_mask: raise ValueError("At least one of text or cards_v2 must be provided") # Make API request response = service.spaces().messages().patch( name=message_name, updateMask=','.join(update_mask), body=message_body ).execute() return response except ValueError: raise except Exception as e: raise Exception(f"Failed to update message: {str(e)}") async def batch_send_messages(messages: List[Dict]) -> Dict: """Send multiple messages in batch to different spaces. Args: messages: List of message objects, each containing: - space_name: The name/identifier of the space to send to - text: Text content of the message - thread_key: Optional thread key to reply to - cards_v2: Optional card content Returns: Dictionary with results for each message Raises: Exception: If authentication fails """ try: results = { "successful": [], "failed": [] } for idx, msg in enumerate(messages): space_name = msg.get("space_name") text = msg.get("text", "") thread_key = msg.get("thread_key") cards_v2 = msg.get("cards_v2") if not space_name: results["failed"].append({ "index": idx, "error": "Missing space_name" }) continue try: if thread_key: # Reply to thread response = await reply_to_thread(space_name, thread_key, text, cards_v2) else: # Create new message response = await create_message(space_name, text, cards_v2) results["successful"].append({ "index": idx, "message_name": response.get("name"), "space_name": space_name }) except Exception as e: results["failed"].append({ "index": idx, "space_name": space_name, "error": str(e) }) return results except Exception as e: raise Exception(f"Failed to batch send messages: {str(e)}") async def reply_to_thread(space_name: str, thread_key: str, text: str, cards_v2=None, file_path=None) -> Dict: """Replies to a thread in a Google Chat space. Args: space_name: The name/identifier of the space containing the thread thread_key: The thread key to reply to. Can be a simple ID, a threadKey, or a full thread name text: Text content of the reply cards_v2: Optional card content for the reply (list of card objects) file_path: Optional path to a file to attach to the reply. If provided, the file will be read and its contents included in the message. For text files, the content will be included directly. For binary files, a message indicating it's a binary file will be included. Returns: The created message object Raises: Exception: If authentication fails or message creation fails """ try: creds = get_credentials() if not creds: raise Exception("No valid credentials found. Please authenticate first.") service = build('chat', 'v1', credentials=creds) # If a file path is provided, read the file and include its contents in the message if file_path: from pathlib import Path # Validate file exists file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") # Read file contents (limit to first 5000 characters) try: with open(file_path, 'r') as f: file_contents = f.read(5000) if len(file_contents) >= 5000: file_contents += "\n... [content truncated] ..." except UnicodeDecodeError: # Handle binary files file_contents = "[Binary file content not shown]" # Build message text full_message = "" if text: full_message += f"{text}\n\n" full_message += f"๐Ÿ“„ *File: {file_path.name}*\n" full_message += "```\n" full_message += file_contents full_message += "\n```" # Use the full message as the text text = full_message # Build message body message_body = { "text": text } # Try multiple approaches for thread identification to improve reliability # This uses a tiered approach to thread identification if thread_key.startswith("spaces/") and "/threads/" in thread_key: # Full thread name provided (spaces/*/threads/*) - use it directly message_body["thread"] = {"name": thread_key} elif thread_key.startswith("threads/"): # Thread key starts with "threads/" - extract the ID thread_id = thread_key.replace("threads/", "") message_body["thread"] = {"threadKey": thread_id} else: # Simple thread key or ID - try to use it directly message_body["thread"] = {"threadKey": thread_key} # Additionally try to find the original message to get its thread name try: # Try to get the message directly first direct_msg = None try: direct_msg = service.spaces().messages().get( name=f"{space_name}/messages/{thread_key}.{thread_key}" ).execute() except Exception: pass # If direct lookup failed, try finding from space messages if not direct_msg: space_messages = service.spaces().messages().list( parent=space_name, pageSize=100 ).execute().get('messages', []) # Look for messages with matching thread name or threadKey for msg in space_messages: if msg.get("name", "").endswith(thread_key): direct_msg = msg break if "thread" in msg and msg["thread"].get("name", "").endswith(thread_key): direct_msg = msg break # If we found a message, use its thread information if direct_msg and "thread" in direct_msg and "name" in direct_msg["thread"]: message_body["thread"] = {"name": direct_msg["thread"]["name"]} except Exception as e: # If thread lookup fails, continue with the simple threadKey approach print(f"Thread lookup failed: {str(e)}") if cards_v2: message_body["cardsV2"] = cards_v2 # Make API request with appropriate thread options response = service.spaces().messages().create( parent=space_name, messageReplyOption="REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD", body=message_body ).execute() return response except Exception as e: raise Exception(f"Failed to reply to thread: {str(e)}") async def get_message(message_name: str, include_sender_info: bool = False) -> Dict: """Gets a specific message by its resource name. Args: message_name: The resource name of the message (spaces/*/messages/*) include_sender_info: Whether to include sender information in the returned message (default: False) Returns: The message object Raises: Exception: If authentication fails or message retrieval fails """ try: creds = get_credentials() if not creds: raise Exception("No valid credentials found. Please authenticate first.") service = build('chat', 'v1', credentials=creds) # Make API request message = service.spaces().messages().get(name=message_name).execute() # Add sender information if requested if include_sender_info and "sender" in message and "name" in message["sender"]: sender_id = message["sender"]["name"] try: sender_info = await get_user_info_by_id(sender_id) message["sender_info"] = sender_info except Exception: # If we fail to get sender info, continue with basic info message["sender_info"] = { "id": sender_id, "display_name": f"User {sender_id.split('/')[-1]}" } return message except Exception as e: raise Exception(f"Failed to get message: {str(e)}") async def delete_message(message_name: str) -> Dict: """Deletes a message by its resource name. Args: message_name: The resource name of the message (spaces/*/messages/*) Returns: Empty response on success Raises: Exception: If authentication fails or message deletion fails """ try: creds = get_credentials() if not creds: raise Exception("No valid credentials found. Please authenticate first.") service = build('chat', 'v1', credentials=creds) # Make API request response = service.spaces().messages().delete(name=message_name).execute() return response except Exception as e: raise Exception(f"Failed to delete message: {str(e)}") async def add_emoji_reaction(message_name: str, emoji: str) -> Dict: """Add an emoji reaction to a message. Args: message_name: The resource name of the message (spaces/*/messages/*) emoji: The emoji to add as a reaction (e.g. '๐Ÿ‘', 'โค๏ธ', etc.) Returns: Response with information about the added reaction Raises: Exception: If authentication fails or reaction fails """ try: creds = get_credentials() if not creds: raise Exception("No valid credentials found. Please authenticate first.") service = build('chat', 'v1', credentials=creds) if not message_name.startswith('spaces/'): raise ValueError("message_name must be a full resource name (spaces/*/messages/*)") # Add reaction reaction_body = { "emoji": { "unicode": emoji } } response = service.spaces().messages().reactions().create( parent=message_name, body=reaction_body ).execute() return response except Exception as e: raise Exception(f"Failed to add emoji reaction: {str(e)}") async def get_message_with_sender_info(message_name: str) -> Dict: """Gets a specific message by its resource name and adds sender information. Args: message_name: The resource name of the message (spaces/*/messages/*) Returns: The message object with additional sender information Raises: Exception: If authentication fails or message retrieval fails """ return await get_message(message_name, include_sender_info=True) async def list_messages_with_sender_info(space_name: str, limit: int = 10, page_token: Optional[str] = None, days_window: int = 3, offset: int = 0) -> Dict: """Lists messages from a specific Google Chat space with sender information. Args: space_name: The name/identifier of the space to fetch messages from limit: Maximum number of messages to return (default: 10) page_token: Optional page token for pagination days_window: Number of days to look back for messages (default: 3) This parameter controls the date range for message retrieval. For example, if days_window=3, messages from the last 3 days will be retrieved. offset: Number of days to offset the end date from today (default: 0). For example, if offset=3, the end date will be 3 days before today, and with days_window=3, messages from 6 to 3 days ago will be retrieved. Returns: Dictionary with 'messages' list (with sender info) and optional 'nextPageToken' Raises: Exception: If authentication fails or message retrieval fails """ result = await list_space_messages( space_name, include_sender_info=True, page_size=limit, page_token=page_token, order_by="createTime desc", # Default to newest first days_window=days_window, offset=offset ) return result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/siva010928/multi-chat-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server