Skip to main content
Glama

Telegram MCP Server

by chigwell
import os import sys import json import time import asyncio import sqlite3 import logging import mimetypes from datetime import datetime, timedelta from typing import List, Dict, Optional, Union, Any # Third-party libraries import nest_asyncio from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP from telethon import TelegramClient, functions, utils from telethon.sessions import StringSession from telethon.tl.types import ( User, Chat, Channel, ChatAdminRights, ChatBannedRights, ChannelParticipantsKicked, ChannelParticipantsAdmins, InputChatPhoto, InputChatUploadedPhoto, InputChatPhotoEmpty, InputPeerUser, InputPeerChat, InputPeerChannel, ) import telethon.errors.rpcerrorlist def json_serializer(obj): """Helper function to convert non-serializable objects for JSON serialization.""" if isinstance(obj, datetime): return obj.isoformat() if isinstance(obj, bytes): return obj.decode("utf-8", errors="replace") # Add other non-serializable types as needed raise TypeError(f"Object of type {type(obj)} is not JSON serializable") load_dotenv() TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID")) TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH") TELEGRAM_SESSION_NAME = os.getenv("TELEGRAM_SESSION_NAME") # Check if a string session exists in environment, otherwise use file-based session SESSION_STRING = os.getenv("TELEGRAM_SESSION_STRING") mcp = FastMCP("telegram") if SESSION_STRING: # Use the string session if available client = TelegramClient(StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH) else: # Use file-based session client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH) # Setup robust logging with both file and console output logger = logging.getLogger("telegram_mcp") logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging # Create console handler console_handler = logging.StreamHandler() console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging # Create file handler with absolute path script_dir = os.path.dirname(os.path.abspath(__file__)) log_file_path = os.path.join(script_dir, "mcp_errors.log") try: file_handler = logging.FileHandler(log_file_path, mode="a") # Append mode file_handler.setLevel(logging.ERROR) # Create formatter and add to handlers formatter = logging.Formatter( "%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d" ) console_handler.setFormatter(formatter) file_handler.setFormatter(formatter) # Add handlers to logger logger.addHandler(console_handler) logger.addHandler(file_handler) logger.info(f"Logging initialized to {log_file_path}") except Exception as log_error: print(f"WARNING: Error setting up log file: {log_error}") # Fallback to console-only logging logger.addHandler(console_handler) logger.error(f"Failed to set up log file handler: {log_error}") # Error code prefix mapping for better error tracing ERROR_PREFIXES = { "chat": "CHAT", "msg": "MSG", "contact": "CONTACT", "group": "GROUP", "media": "MEDIA", "profile": "PROFILE", "auth": "AUTH", "admin": "ADMIN", } def log_and_format_error( function_name: str, error: Exception, prefix: str = None, **kwargs ) -> str: """ Centralized error handling function that logs the error and returns a formatted user-friendly message. Args: function_name: Name of the function where error occurred error: The exception that was raised prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name **kwargs: Additional context parameters to include in log Returns: A user-friendly error message with error code """ # Generate a consistent error code if prefix is None: # Try to derive prefix from function name for key, value in ERROR_PREFIXES.items(): if key in function_name.lower(): prefix = value break if prefix is None: prefix = "GEN" # Generic prefix if none matches error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}" # Format the additional context parameters context = ", ".join(f"{k}={v}" for k, v in kwargs.items()) # Log the full technical error logger.exception(f"{function_name} failed ({context}): {error}") # Return a user-friendly message return f"An error occurred (code: {error_code}). Check mcp_errors.log for details." def format_entity(entity) -> Dict[str, Any]: """Helper function to format entity information consistently.""" result = {"id": entity.id} if hasattr(entity, "title"): result["name"] = entity.title result["type"] = "group" if isinstance(entity, Chat) else "channel" elif hasattr(entity, "first_name"): name_parts = [] if entity.first_name: name_parts.append(entity.first_name) if hasattr(entity, "last_name") and entity.last_name: name_parts.append(entity.last_name) result["name"] = " ".join(name_parts) result["type"] = "user" if hasattr(entity, "username") and entity.username: result["username"] = entity.username if hasattr(entity, "phone") and entity.phone: result["phone"] = entity.phone return result def format_message(message) -> Dict[str, Any]: """Helper function to format message information consistently.""" result = { "id": message.id, "date": message.date.isoformat(), "text": message.message or "", } if message.from_id: result["from_id"] = utils.get_peer_id(message.from_id) if message.media: result["has_media"] = True result["media_type"] = type(message.media).__name__ return result @mcp.tool() async def get_chats(page: int = 1, page_size: int = 20) -> str: """ Get a paginated list of chats. Args: page: Page number (1-indexed). page_size: Number of chats per page. """ try: dialogs = await client.get_dialogs() start = (page - 1) * page_size end = start + page_size if start >= len(dialogs): return "Page out of range." chats = dialogs[start:end] lines = [] for dialog in chats: entity = dialog.entity chat_id = entity.id title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown") lines.append(f"Chat ID: {chat_id}, Title: {title}") return "\n".join(lines) except Exception as e: return log_and_format_error("get_chats", e) @mcp.tool() async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str: """ Get paginated messages from a specific chat. Args: chat_id: The ID of the chat. page: Page number (1-indexed). page_size: Number of messages per page. """ try: entity = await client.get_entity(chat_id) offset = (page - 1) * page_size messages = await client.get_messages(entity, limit=page_size, add_offset=offset) if not messages: return "No messages found for this page." lines = [] for msg in messages: lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}") return "\n".join(lines) except Exception as e: return log_and_format_error( "get_messages", e, chat_id=chat_id, page=page, page_size=page_size ) @mcp.tool() async def send_message(chat_id: int, message: str) -> str: """ Send a message to a specific chat. Args: chat_id: The ID of the chat. message: The message content to send. """ try: entity = await client.get_entity(chat_id) await client.send_message(entity, message) return "Message sent successfully." except Exception as e: return log_and_format_error("send_message", e, chat_id=chat_id) @mcp.tool() async def list_contacts() -> str: """ List all contacts in your Telegram account. """ try: result = await client(functions.contacts.GetContactsRequest(hash=0)) users = result.users if not users: return "No contacts found." lines = [] for user in users: name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip() username = getattr(user, "username", "") phone = getattr(user, "phone", "") contact_info = f"ID: {user.id}, Name: {name}" if username: contact_info += f", Username: @{username}" if phone: contact_info += f", Phone: {phone}" lines.append(contact_info) return "\n".join(lines) except Exception as e: return log_and_format_error("list_contacts", e) @mcp.tool() async def search_contacts(query: str) -> str: """ Search for contacts by name, username, or phone number using Telethon's SearchRequest. Args: query: The search term to look for in contact names, usernames, or phone numbers. """ try: result = await client(functions.contacts.SearchRequest(q=query, limit=50)) users = result.users if not users: return f"No contacts found matching '{query}'." lines = [] for user in users: name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip() username = getattr(user, "username", "") phone = getattr(user, "phone", "") contact_info = f"ID: {user.id}, Name: {name}" if username: contact_info += f", Username: @{username}" if phone: contact_info += f", Phone: {phone}" lines.append(contact_info) return "\n".join(lines) except Exception as e: return log_and_format_error("search_contacts", e, query=query) @mcp.tool() async def get_contact_ids() -> str: """ Get all contact IDs in your Telegram account. """ try: result = await client(functions.contacts.GetContactIDsRequest(hash=0)) if not result: return "No contact IDs found." return "Contact IDs: " + ", ".join(str(cid) for cid in result) except Exception as e: return log_and_format_error("get_contact_ids", e) @mcp.tool() async def list_messages( chat_id: int, limit: int = 20, search_query: str = None, from_date: str = None, to_date: str = None, ) -> str: """ Retrieve messages with optional filters. Args: chat_id: The ID of the chat to get messages from. limit: Maximum number of messages to retrieve. search_query: Filter messages containing this text. from_date: Filter messages starting from this date (format: YYYY-MM-DD). to_date: Filter messages until this date (format: YYYY-MM-DD). """ try: entity = await client.get_entity(chat_id) # Parse date filters if provided from_date_obj = None to_date_obj = None if from_date: try: from_date_obj = datetime.strptime(from_date, "%Y-%m-%d") # Make it timezone aware by adding UTC timezone info # Use datetime.timezone.utc for Python 3.9+ or import timezone directly for 3.13+ try: # For Python 3.9+ from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc) except AttributeError: # For Python 3.13+ from datetime import timezone from_date_obj = from_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid from_date format. Use YYYY-MM-DD." if to_date: try: to_date_obj = datetime.strptime(to_date, "%Y-%m-%d") # Set to end of day and make timezone aware to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1) # Add timezone info try: to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc) except AttributeError: from datetime import timezone to_date_obj = to_date_obj.replace(tzinfo=timezone.utc) except ValueError: return f"Invalid to_date format. Use YYYY-MM-DD." # Prepare filter parameters params = {} if search_query: params["search"] = search_query messages = await client.get_messages(entity, limit=limit, **params) # Apply date filters (Telethon doesn't support date filtering in get_messages directly) if from_date_obj or to_date_obj: filtered_messages = [] for msg in messages: if from_date_obj and msg.date < from_date_obj: continue if to_date_obj and msg.date > to_date_obj: continue filtered_messages.append(msg) messages = filtered_messages if not messages: return "No messages found matching the criteria." lines = [] for msg in messages: sender = "" if msg.sender: sender_name = getattr(msg.sender, "first_name", "") or getattr( msg.sender, "title", "Unknown" ) sender = f"{sender_name} | " lines.append( f"ID: {msg.id} | {sender}Date: {msg.date} | Message: {msg.message or '[Media/No text]'}" ) return "\n".join(lines) except Exception as e: return log_and_format_error("list_messages", e, chat_id=chat_id) @mcp.tool() async def list_chats(chat_type: str = None, limit: int = 20) -> str: """ List available chats with metadata. Args: chat_type: Filter by chat type ('user', 'group', 'channel', or None for all) limit: Maximum number of chats to retrieve. """ try: dialogs = await client.get_dialogs(limit=limit) results = [] for dialog in dialogs: entity = dialog.entity # Filter by type if requested current_type = None if isinstance(entity, User): current_type = "user" elif isinstance(entity, Chat): current_type = "group" elif isinstance(entity, Channel): if getattr(entity, "broadcast", False): current_type = "channel" else: current_type = "group" # Supergroup if chat_type and current_type != chat_type.lower(): continue # Format chat info chat_info = f"Chat ID: {entity.id}" if hasattr(entity, "title"): chat_info += f", Title: {entity.title}" elif hasattr(entity, "first_name"): name = f"{entity.first_name}" if hasattr(entity, "last_name") and entity.last_name: name += f" {entity.last_name}" chat_info += f", Name: {name}" chat_info += f", Type: {current_type}" if hasattr(entity, "username") and entity.username: chat_info += f", Username: @{entity.username}" # Add unread count if available if hasattr(dialog, "unread_count") and dialog.unread_count > 0: chat_info += f", Unread: {dialog.unread_count}" results.append(chat_info) if not results: return f"No chats found matching the criteria." return "\n".join(results) except Exception as e: return log_and_format_error("list_chats", e, chat_type=chat_type, limit=limit) @mcp.tool() async def get_chat(chat_id: int) -> str: """ Get detailed information about a specific chat. Args: chat_id: The ID of the chat. """ try: entity = await client.get_entity(chat_id) result = [] result.append(f"ID: {entity.id}") is_channel = isinstance(entity, Channel) is_chat = isinstance(entity, Chat) is_user = isinstance(entity, User) if hasattr(entity, "title"): result.append(f"Title: {entity.title}") chat_type = ( "Channel" if is_channel and getattr(entity, "broadcast", False) else "Group" ) if is_channel and getattr(entity, "megagroup", False): chat_type = "Supergroup" elif is_chat: chat_type = "Group (Basic)" result.append(f"Type: {chat_type}") if hasattr(entity, "username") and entity.username: result.append(f"Username: @{entity.username}") # Fetch participants count reliably try: participants_count = (await client.get_participants(entity, limit=0)).total result.append(f"Participants: {participants_count}") except Exception as pe: result.append(f"Participants: Error fetching ({pe})") elif is_user: name = f"{entity.first_name}" if entity.last_name: name += f" {entity.last_name}" result.append(f"Name: {name}") result.append(f"Type: User") if entity.username: result.append(f"Username: @{entity.username}") if entity.phone: result.append(f"Phone: {entity.phone}") result.append(f"Bot: {'Yes' if entity.bot else 'No'}") result.append(f"Verified: {'Yes' if entity.verified else 'No'}") # Get last activity if it's a dialog try: # Using get_dialogs might be slow if there are many dialogs # Alternative: Get entity again via get_dialogs if needed for unread count dialog = await client.get_dialogs(limit=1, offset_id=0, offset_peer=entity) if dialog: dialog = dialog[0] result.append(f"Unread Messages: {dialog.unread_count}") if dialog.message: last_msg = dialog.message sender_name = "Unknown" if last_msg.sender: sender_name = getattr(last_msg.sender, "first_name", "") or getattr( last_msg.sender, "title", "Unknown" ) if hasattr(last_msg.sender, "last_name") and last_msg.sender.last_name: sender_name += f" {last_msg.sender.last_name}" sender_name = sender_name.strip() or "Unknown" result.append(f"Last Message: From {sender_name} at {last_msg.date}") result.append(f"Message: {last_msg.message or '[Media/No text]'}") except Exception as diag_ex: logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}") pass return "\n".join(result) except Exception as e: return log_and_format_error("get_chat", e, chat_id=chat_id) @mcp.tool() async def get_direct_chat_by_contact(contact_query: str) -> str: """ Find a direct chat with a specific contact by name, username, or phone. Args: contact_query: Name, username, or phone number to search for. """ try: # Fetch all contacts using the correct Telethon method result = await client(functions.contacts.GetContactsRequest(hash=0)) contacts = result.users found_contacts = [] for contact in contacts: if not contact: continue name = ( f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() ) username = getattr(contact, "username", "") phone = getattr(contact, "phone", "") if ( contact_query.lower() in name.lower() or (username and contact_query.lower() in username.lower()) or (phone and contact_query in phone) ): found_contacts.append(contact) if not found_contacts: return f"No contacts found matching '{contact_query}'." # If we found contacts, look for direct chats with them results = [] dialogs = await client.get_dialogs() for contact in found_contacts: contact_name = ( f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() ) for dialog in dialogs: if isinstance(dialog.entity, User) and dialog.entity.id == contact.id: chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}" if getattr(contact, "username", ""): chat_info += f", Username: @{contact.username}" if dialog.unread_count: chat_info += f", Unread: {dialog.unread_count}" results.append(chat_info) break if not results: found_names = ", ".join( [f"{c.first_name} {c.last_name}".strip() for c in found_contacts] ) return f"Found contacts: {found_names}, but no direct chats were found with them." return "\n".join(results) except Exception as e: return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query) @mcp.tool() async def get_contact_chats(contact_id: int) -> str: """ List all chats involving a specific contact. Args: contact_id: The ID of the contact. """ try: # Get contact info contact = await client.get_entity(contact_id) if not isinstance(contact, User): return f"ID {contact_id} is not a user/contact." contact_name = ( f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() ) # Find direct chat direct_chat = None dialogs = await client.get_dialogs() results = [] # Look for direct chat for dialog in dialogs: if isinstance(dialog.entity, User) and dialog.entity.id == contact_id: chat_info = f"Direct Chat ID: {dialog.entity.id}, Type: Private" if dialog.unread_count: chat_info += f", Unread: {dialog.unread_count}" results.append(chat_info) break # Look for common groups/channels common_chats = [] try: common = await client.get_common_chats(contact) for chat in common: chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group" chat_info = f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}" results.append(chat_info) except: results.append("Could not retrieve common groups.") if not results: return f"No chats found with {contact_name} (ID: {contact_id})." return f"Chats with {contact_name} (ID: {contact_id}):\n" + "\n".join(results) except Exception as e: return log_and_format_error("get_contact_chats", e, contact_id=contact_id) @mcp.tool() async def get_last_interaction(contact_id: int) -> str: """ Get the most recent message with a contact. Args: contact_id: The ID of the contact. """ try: # Get contact info contact = await client.get_entity(contact_id) if not isinstance(contact, User): return f"ID {contact_id} is not a user/contact." contact_name = ( f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip() ) # Get the last few messages messages = await client.get_messages(contact, limit=5) if not messages: return f"No messages found with {contact_name} (ID: {contact_id})." results = [f"Last interactions with {contact_name} (ID: {contact_id}):"] for msg in messages: sender = "You" if msg.out else contact_name message_text = msg.message or "[Media/No text]" results.append(f"Date: {msg.date}, From: {sender}, Message: {message_text}") return "\n".join(results) except Exception as e: return log_and_format_error("get_last_interaction", e, contact_id=contact_id) @mcp.tool() async def get_message_context(chat_id: int, message_id: int, context_size: int = 3) -> str: """ Retrieve context around a specific message. Args: chat_id: The ID of the chat. message_id: The ID of the central message. context_size: Number of messages before and after to include. """ try: chat = await client.get_entity(chat_id) # Get messages around the specified message messages_before = await client.get_messages(chat, limit=context_size, max_id=message_id) central_message = await client.get_messages(chat, ids=message_id) # Fix: get_messages(ids=...) returns a single Message, not a list if central_message is not None and not isinstance(central_message, list): central_message = [central_message] elif central_message is None: central_message = [] messages_after = await client.get_messages( chat, limit=context_size, min_id=message_id, reverse=True ) if not central_message: return f"Message with ID {message_id} not found in chat {chat_id}." # Combine messages in chronological order all_messages = list(messages_before) + list(central_message) + list(messages_after) all_messages.sort(key=lambda m: m.id) results = [f"Context for message {message_id} in chat {chat_id}:"] for msg in all_messages: sender_name = "Unknown" if msg.sender: sender_name = getattr(msg.sender, "first_name", "") or getattr( msg.sender, "title", "Unknown" ) highlight = " [THIS MESSAGE]" if msg.id == message_id else "" results.append( f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}\n{msg.message or '[Media/No text]'}\n" ) return "\n".join(results) except Exception as e: return log_and_format_error( "get_message_context", e, chat_id=chat_id, message_id=message_id, context_size=context_size, ) @mcp.tool() async def add_contact(phone: str, first_name: str, last_name: str = "") -> str: """ Add a new contact to your Telegram account. Args: phone: The phone number of the contact (with country code). first_name: The contact's first name. last_name: The contact's last name (optional). """ try: # Try to import the required types first from telethon.tl.types import InputPhoneContact result = await client( functions.contacts.ImportContactsRequest( contacts=[ InputPhoneContact( client_id=0, phone=phone, first_name=first_name, last_name=last_name ) ] ) ) if result.imported: return f"Contact {first_name} {last_name} added successfully." else: return f"Contact not added. Response: {str(result)}" except (ImportError, AttributeError) as type_err: # Try alternative approach using raw API try: result = await client( functions.contacts.ImportContactsRequest( contacts=[ { "client_id": 0, "phone": phone, "first_name": first_name, "last_name": last_name, } ] ) ) if hasattr(result, "imported") and result.imported: return f"Contact {first_name} {last_name} added successfully (alt method)." else: return f"Contact not added. Alternative method response: {str(result)}" except Exception as alt_e: logger.exception(f"add_contact (alt method) failed (phone={phone})") return log_and_format_error("add_contact", alt_e, phone=phone) except Exception as e: logger.exception(f"add_contact failed (phone={phone})") return log_and_format_error("add_contact", e, phone=phone) @mcp.tool() async def delete_contact(user_id: int) -> str: """ Delete a contact by user ID. Args: user_id: The Telegram user ID of the contact to delete. """ try: user = await client.get_entity(user_id) await client(functions.contacts.DeleteContactsRequest(id=[user])) return f"Contact with user ID {user_id} deleted." except Exception as e: return log_and_format_error("delete_contact", e, user_id=user_id) @mcp.tool() async def block_user(user_id: int) -> str: """ Block a user by user ID. Args: user_id: The Telegram user ID to block. """ try: user = await client.get_entity(user_id) await client(functions.contacts.BlockRequest(id=user)) return f"User {user_id} blocked." except Exception as e: return log_and_format_error("block_user", e, user_id=user_id) @mcp.tool() async def unblock_user(user_id: int) -> str: """ Unblock a user by user ID. Args: user_id: The Telegram user ID to unblock. """ try: user = await client.get_entity(user_id) await client(functions.contacts.UnblockRequest(id=user)) return f"User {user_id} unblocked." except Exception as e: return log_and_format_error("unblock_user", e, user_id=user_id) @mcp.tool() async def get_me() -> str: """ Get your own user information. """ try: me = await client.get_me() return json.dumps(format_entity(me), indent=2) except Exception as e: return log_and_format_error("get_me", e) @mcp.tool() async def create_group(title: str, user_ids: list) -> str: """ Create a new group or supergroup and add users. Args: title: Title for the new group user_ids: List of user IDs to add to the group """ try: # Convert user IDs to entities users = [] for user_id in user_ids: try: user = await client.get_entity(user_id) users.append(user) except Exception as e: logger.error(f"Failed to get entity for user ID {user_id}: {e}") return f"Error: Could not find user with ID {user_id}" if not users: return "Error: No valid users provided" # Create the group with the users try: # Create a new chat with selected users result = await client(functions.messages.CreateChatRequest(users=users, title=title)) # Check what type of response we got if hasattr(result, "chats") and result.chats: created_chat = result.chats[0] return f"Group created with ID: {created_chat.id}" elif hasattr(result, "chat") and result.chat: return f"Group created with ID: {result.chat.id}" elif hasattr(result, "chat_id"): return f"Group created with ID: {result.chat_id}" else: # If we can't determine the chat ID directly from the result # Try to find it in recent dialogs await asyncio.sleep(1) # Give Telegram a moment to register the new group dialogs = await client.get_dialogs(limit=5) # Get recent dialogs for dialog in dialogs: if dialog.title == title: return f"Group created with ID: {dialog.id}" # If we still can't find it, at least return success return f"Group created successfully. Please check your recent chats for '{title}'." except Exception as create_err: if "PEER_FLOOD" in str(create_err): return "Error: Cannot create group due to Telegram limits. Try again later." else: raise # Let the outer exception handler catch it except Exception as e: logger.exception(f"create_group failed (title={title}, user_ids={user_ids})") return log_and_format_error("create_group", e, title=title, user_ids=user_ids) @mcp.tool() async def invite_to_group(group_id: int, user_ids: list) -> str: """ Invite users to a group or channel. Args: group_id: The ID of the group/channel. user_ids: List of user IDs to invite. """ try: entity = await client.get_entity(group_id) users_to_add = [] for user_id in user_ids: try: user = await client.get_entity(user_id) users_to_add.append(user) except ValueError as e: return f"Error: User with ID {user_id} could not be found. {e}" try: result = await client( functions.channels.InviteToChannelRequest(channel=entity, users=users_to_add) ) invited_count = 0 if hasattr(result, "users") and result.users: invited_count = len(result.users) elif hasattr(result, "count"): invited_count = result.count return f"Successfully invited {invited_count} users to {entity.title}" except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back." except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError: return ( "Error: One or more users have privacy settings that prevent you from adding them." ) except Exception as e: return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) except Exception as e: logger.error( f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})", exc_info=True, ) return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids) @mcp.tool() async def leave_chat(chat_id: int) -> str: """ Leave a group or channel by chat ID. Args: chat_id: The chat ID to leave. """ try: entity = await client.get_entity(chat_id) # Check the entity type carefully if isinstance(entity, Channel): # Handle both channels and supergroups (which are also channels in Telegram) try: await client(functions.channels.LeaveChannelRequest(channel=entity)) chat_name = getattr(entity, "title", str(chat_id)) return f"Left channel/supergroup {chat_name} (ID: {chat_id})." except Exception as chan_err: return log_and_format_error("leave_chat", chan_err, chat_id=chat_id) elif isinstance(entity, Chat): # Traditional basic groups (not supergroups) try: # First try with InputPeerUser me = await client.get_me(input_peer=True) await client( functions.messages.DeleteChatUserRequest( chat_id=entity.id, user_id=me # Use the entity ID directly ) ) chat_name = getattr(entity, "title", str(chat_id)) return f"Left basic group {chat_name} (ID: {chat_id})." except Exception as chat_err: # If the above fails, try the second approach logger.warning( f"First leave attempt failed: {chat_err}, trying alternative method" ) try: # Alternative approach - sometimes this works better me_full = await client.get_me() await client( functions.messages.DeleteChatUserRequest( chat_id=entity.id, user_id=me_full.id ) ) chat_name = getattr(entity, "title", str(chat_id)) return f"Left basic group {chat_name} (ID: {chat_id})." except Exception as alt_err: return log_and_format_error("leave_chat", alt_err, chat_id=chat_id) else: # Cannot leave a user chat this way entity_type = type(entity).__name__ return log_and_format_error( "leave_chat", Exception( f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only." ), chat_id=chat_id, ) except Exception as e: logger.exception(f"leave_chat failed (chat_id={chat_id})") # Provide helpful hint for common errors error_str = str(e).lower() if "invalid" in error_str and "chat" in error_str: return log_and_format_error( "leave_chat", Exception( f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again." ), chat_id=chat_id, ) return log_and_format_error("leave_chat", e, chat_id=chat_id) @mcp.tool() async def get_participants(chat_id: int) -> str: """ List all participants in a group or channel. Args: chat_id: The group or channel ID. """ try: participants = await client.get_participants(chat_id) lines = [ f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}" for p in participants ] return "\n".join(lines) except Exception as e: return log_and_format_error("get_participants", e, chat_id=chat_id) @mcp.tool() async def send_file(chat_id: int, file_path: str, caption: str = None) -> str: """ Send a file to a chat. Args: chat_id: The chat ID. file_path: Absolute path to the file to send (must exist and be readable). caption: Optional caption for the file. """ try: if not os.path.isfile(file_path): return f"File not found: {file_path}" if not os.access(file_path, os.R_OK): return f"File is not readable: {file_path}" entity = await client.get_entity(chat_id) await client.send_file(entity, file_path, caption=caption) return f"File sent to chat {chat_id}." except Exception as e: return log_and_format_error( "send_file", e, chat_id=chat_id, file_path=file_path, caption=caption ) @mcp.tool() async def download_media(chat_id: int, message_id: int, file_path: str) -> str: """ Download media from a message in a chat. Args: chat_id: The chat ID. message_id: The message ID containing the media. file_path: Absolute path to save the downloaded file (must be writable). """ try: entity = await client.get_entity(chat_id) msg = await client.get_messages(entity, ids=message_id) if not msg or not msg.media: return "No media found in the specified message." # Check if directory is writable dir_path = os.path.dirname(file_path) or "." if not os.access(dir_path, os.W_OK): return f"Directory not writable: {dir_path}" await client.download_media(msg, file=file_path) if not os.path.isfile(file_path): return f"Download failed: file not created at {file_path}" return f"Media downloaded to {file_path}." except Exception as e: return log_and_format_error( "download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path ) @mcp.tool() async def update_profile(first_name: str = None, last_name: str = None, about: str = None) -> str: """ Update your profile information (name, bio). """ try: await client( functions.account.UpdateProfileRequest( first_name=first_name, last_name=last_name, about=about ) ) return "Profile updated." except Exception as e: return log_and_format_error( "update_profile", e, first_name=first_name, last_name=last_name, about=about ) @mcp.tool() async def set_profile_photo(file_path: str) -> str: """ Set a new profile photo. """ try: await client( functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path)) ) return "Profile photo updated." except Exception as e: return log_and_format_error("set_profile_photo", e, file_path=file_path) @mcp.tool() async def delete_profile_photo() -> str: """ Delete your current profile photo. """ try: photos = await client( functions.photos.GetUserPhotosRequest(user_id="me", offset=0, max_id=0, limit=1) ) if not photos.photos: return "No profile photo to delete." await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id])) return "Profile photo deleted." except Exception as e: return log_and_format_error("delete_profile_photo", e) @mcp.tool() async def get_privacy_settings() -> str: """ Get your privacy settings for last seen status. """ try: # Import needed types directly from telethon.tl.types import InputPrivacyKeyStatusTimestamp try: settings = await client( functions.account.GetPrivacyRequest(key=InputPrivacyKeyStatusTimestamp()) ) return str(settings) except TypeError as e: if "TLObject was expected" in str(e): return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon." else: raise except Exception as e: logger.exception("get_privacy_settings failed") return log_and_format_error("get_privacy_settings", e) @mcp.tool() async def set_privacy_settings( key: str, allow_users: list = None, disallow_users: list = None ) -> str: """ Set privacy settings (e.g., last seen, phone, etc.). Args: key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.) allow_users: List of user IDs to allow disallow_users: List of user IDs to disallow """ try: # Import needed types from telethon.tl.types import ( InputPrivacyKeyStatusTimestamp, InputPrivacyKeyPhoneNumber, InputPrivacyKeyProfilePhoto, InputPrivacyValueAllowUsers, InputPrivacyValueDisallowUsers, InputPrivacyValueAllowAll, InputPrivacyValueDisallowAll, ) # Map the simplified keys to their corresponding input types key_mapping = { "status": InputPrivacyKeyStatusTimestamp, "phone": InputPrivacyKeyPhoneNumber, "profile_photo": InputPrivacyKeyProfilePhoto, } # Get the appropriate key class if key not in key_mapping: return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}" privacy_key = key_mapping[key]() # Prepare the rules rules = [] # Process allow rules if allow_users is None or len(allow_users) == 0: # If no specific users to allow, allow everyone by default rules.append(InputPrivacyValueAllowAll()) else: # Convert user IDs to InputUser entities try: allow_entities = [] for user_id in allow_users: try: user = await client.get_entity(user_id) allow_entities.append(user) except Exception as user_err: logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") if allow_entities: rules.append(InputPrivacyValueAllowUsers(users=allow_entities)) except Exception as allow_err: logger.error(f"Error processing allowed users: {allow_err}") return log_and_format_error("set_privacy_settings", allow_err, key=key) # Process disallow rules if disallow_users and len(disallow_users) > 0: try: disallow_entities = [] for user_id in disallow_users: try: user = await client.get_entity(user_id) disallow_entities.append(user) except Exception as user_err: logger.warning(f"Could not get entity for user ID {user_id}: {user_err}") if disallow_entities: rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities)) except Exception as disallow_err: logger.error(f"Error processing disallowed users: {disallow_err}") return log_and_format_error("set_privacy_settings", disallow_err, key=key) # Apply the privacy settings try: result = await client( functions.account.SetPrivacyRequest(key=privacy_key, rules=rules) ) return f"Privacy settings for {key} updated successfully." except TypeError as type_err: if "TLObject was expected" in str(type_err): return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon." else: raise except Exception as e: logger.exception(f"set_privacy_settings failed (key={key})") return log_and_format_error("set_privacy_settings", e, key=key) @mcp.tool() async def import_contacts(contacts: list) -> str: """ Import a list of contacts. Each contact should be a dict with phone, first_name, last_name. """ try: input_contacts = [ functions.contacts.InputPhoneContact( client_id=i, phone=c["phone"], first_name=c["first_name"], last_name=c.get("last_name", ""), ) for i, c in enumerate(contacts) ] result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts)) return f"Imported {len(result.imported)} contacts." except Exception as e: return log_and_format_error("import_contacts", e, contacts=contacts) @mcp.tool() async def export_contacts() -> str: """ Export all contacts as a JSON string. """ try: result = await client(functions.contacts.GetContactsRequest(hash=0)) users = result.users return json.dumps([format_entity(u) for u in users], indent=2) except Exception as e: return log_and_format_error("export_contacts", e) @mcp.tool() async def get_blocked_users() -> str: """ Get a list of blocked users. """ try: result = await client(functions.contacts.GetBlockedRequest(offset=0, limit=100)) return json.dumps([format_entity(u) for u in result.users], indent=2) except Exception as e: return log_and_format_error("get_blocked_users", e) @mcp.tool() async def create_channel(title: str, about: str = "", megagroup: bool = False) -> str: """ Create a new channel or supergroup. """ try: result = await client( functions.channels.CreateChannelRequest(title=title, about=about, megagroup=megagroup) ) return f"Channel '{title}' created with ID: {result.chats[0].id}" except Exception as e: return log_and_format_error( "create_channel", e, title=title, about=about, megagroup=megagroup ) @mcp.tool() async def edit_chat_title(chat_id: int, title: str) -> str: """ Edit the title of a chat, group, or channel. """ try: entity = await client.get_entity(chat_id) if isinstance(entity, Channel): await client(functions.channels.EditTitleRequest(channel=entity, title=title)) elif isinstance(entity, Chat): await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title)) else: return f"Cannot edit title for this entity type ({type(entity)})." return f"Chat {chat_id} title updated to '{title}'." except Exception as e: logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')") return log_and_format_error("edit_chat_title", e, chat_id=chat_id, title=title) @mcp.tool() async def edit_chat_photo(chat_id: int, file_path: str) -> str: """ Edit the photo of a chat, group, or channel. Requires a file path to an image. """ try: if not os.path.isfile(file_path): return f"Photo file not found: {file_path}" if not os.access(file_path, os.R_OK): return f"Photo file not readable: {file_path}" entity = await client.get_entity(chat_id) uploaded_file = await client.upload_file(file_path) if isinstance(entity, Channel): # For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto input_photo = InputChatUploadedPhoto(file=uploaded_file) await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo)) elif isinstance(entity, Chat): # For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto input_photo = InputChatUploadedPhoto(file=uploaded_file) await client( functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo) ) else: return f"Cannot edit photo for this entity type ({type(entity)})." return f"Chat {chat_id} photo updated." except Exception as e: logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')") return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path) @mcp.tool() async def delete_chat_photo(chat_id: int) -> str: """ Delete the photo of a chat, group, or channel. """ try: entity = await client.get_entity(chat_id) if isinstance(entity, Channel): # Use InputChatPhotoEmpty for channels/supergroups await client( functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty()) ) elif isinstance(entity, Chat): # Use None (or InputChatPhotoEmpty) for basic groups await client( functions.messages.EditChatPhotoRequest( chat_id=chat_id, photo=InputChatPhotoEmpty() ) ) else: return f"Cannot delete photo for this entity type ({type(entity)})." return f"Chat {chat_id} photo deleted." except Exception as e: logger.exception(f"delete_chat_photo failed (chat_id={chat_id})") return log_and_format_error("delete_chat_photo", e, chat_id=chat_id) @mcp.tool() async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str: """ Promote a user to admin in a group/channel. Args: group_id: ID of the group/channel user_id: User ID to promote rights: Admin rights to give (optional) """ try: chat = await client.get_entity(group_id) user = await client.get_entity(user_id) # Set default admin rights if not provided if not rights: rights = { "change_info": True, "post_messages": True, "edit_messages": True, "delete_messages": True, "ban_users": True, "invite_users": True, "pin_messages": True, "add_admins": False, "anonymous": False, "manage_call": True, "other": True, } admin_rights = ChatAdminRights( change_info=rights.get("change_info", True), post_messages=rights.get("post_messages", True), edit_messages=rights.get("edit_messages", True), delete_messages=rights.get("delete_messages", True), ban_users=rights.get("ban_users", True), invite_users=rights.get("invite_users", True), pin_messages=rights.get("pin_messages", True), add_admins=rights.get("add_admins", False), anonymous=rights.get("anonymous", False), manage_call=rights.get("manage_call", True), other=rights.get("other", True), ) try: result = await client( functions.channels.EditAdminRequest( channel=chat, user_id=user, admin_rights=admin_rights, rank="Admin" ) ) return f"Successfully promoted user {user_id} to admin in {chat.title}" except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) except Exception as e: logger.error( f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True, ) return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id) @mcp.tool() async def demote_admin(group_id: int, user_id: int) -> str: """ Demote a user from admin in a group/channel. Args: group_id: ID of the group/channel user_id: User ID to demote """ try: chat = await client.get_entity(group_id) user = await client.get_entity(user_id) # Create empty admin rights (regular user) admin_rights = ChatAdminRights( change_info=False, post_messages=False, edit_messages=False, delete_messages=False, ban_users=False, invite_users=False, pin_messages=False, add_admins=False, anonymous=False, manage_call=False, other=False, ) try: result = await client( functions.channels.EditAdminRequest( channel=chat, user_id=user, admin_rights=admin_rights, rank="" ) ) return f"Successfully demoted user {user_id} from admin in {chat.title}" except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) except Exception as e: logger.error( f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})", exc_info=True, ) return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id) @mcp.tool() async def ban_user(chat_id: int, user_id: int) -> str: """ Ban a user from a group or channel. Args: chat_id: ID of the group/channel user_id: User ID to ban """ try: chat = await client.get_entity(chat_id) user = await client.get_entity(user_id) # Create banned rights (all restrictions enabled) banned_rights = ChatBannedRights( until_date=None, # Ban forever view_messages=True, send_messages=True, send_media=True, send_stickers=True, send_gifs=True, send_games=True, send_inline=True, embed_links=True, send_polls=True, change_info=True, invite_users=True, pin_messages=True, ) try: await client( functions.channels.EditBannedRequest( channel=chat, participant=user, banned_rights=banned_rights ) ) return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})." except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id) except Exception as e: logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})") return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id) @mcp.tool() async def unban_user(chat_id: int, user_id: int) -> str: """ Unban a user from a group or channel. Args: chat_id: ID of the group/channel user_id: User ID to unban """ try: chat = await client.get_entity(chat_id) user = await client.get_entity(user_id) # Create unbanned rights (no restrictions) unbanned_rights = ChatBannedRights( until_date=None, view_messages=False, send_messages=False, send_media=False, send_stickers=False, send_gifs=False, send_games=False, send_inline=False, embed_links=False, send_polls=False, change_info=False, invite_users=False, pin_messages=False, ) try: await client( functions.channels.EditBannedRequest( channel=chat, participant=user, banned_rights=unbanned_rights ) ) return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})." except telethon.errors.rpcerrorlist.UserNotMutualContactError: return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back." except Exception as e: return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id) except Exception as e: logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})") return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id) @mcp.tool() async def get_admins(chat_id: int) -> str: """ Get all admins in a group or channel. """ try: # Fix: Use the correct filter type ChannelParticipantsAdmins participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins()) lines = [ f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants ] return "\n".join(lines) if lines else "No admins found." except Exception as e: logger.exception(f"get_admins failed (chat_id={chat_id})") return log_and_format_error("get_admins", e, chat_id=chat_id) @mcp.tool() async def get_banned_users(chat_id: int) -> str: """ Get all banned users in a group or channel. """ try: # Fix: Use the correct filter type ChannelParticipantsKicked participants = await client.get_participants( chat_id, filter=ChannelParticipantsKicked(q="") ) lines = [ f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip() for p in participants ] return "\n".join(lines) if lines else "No banned users found." except Exception as e: logger.exception(f"get_banned_users failed (chat_id={chat_id})") return log_and_format_error("get_banned_users", e, chat_id=chat_id) @mcp.tool() async def get_invite_link(chat_id: int) -> str: """ Get the invite link for a group or channel. """ try: entity = await client.get_entity(chat_id) # Try using ExportChatInviteRequest first try: from telethon.tl import functions result = await client(functions.messages.ExportChatInviteRequest(peer=entity)) return result.link except AttributeError: # If the function doesn't exist in the current Telethon version logger.warning("ExportChatInviteRequest not available, using alternative method") except Exception as e1: # If that fails, log and try alternative approach logger.warning(f"ExportChatInviteRequest failed: {e1}") # Alternative approach using client.export_chat_invite_link try: invite_link = await client.export_chat_invite_link(entity) return invite_link except Exception as e2: logger.warning(f"export_chat_invite_link failed: {e2}") # Last resort: Try directly fetching chat info try: if isinstance(entity, (Chat, Channel)): full_chat = await client(functions.messages.GetFullChatRequest(chat_id=entity.id)) if hasattr(full_chat, "full_chat") and hasattr(full_chat.full_chat, "invite_link"): return full_chat.full_chat.invite_link or "No invite link available." except Exception as e3: logger.warning(f"GetFullChatRequest failed: {e3}") return "Could not retrieve invite link for this chat." except Exception as e: logger.exception(f"get_invite_link failed (chat_id={chat_id})") return log_and_format_error("get_invite_link", e, chat_id=chat_id) @mcp.tool() async def join_chat_by_link(link: str) -> str: """ Join a chat by invite link. """ try: # Extract the hash from the invite link if "/" in link: hash_part = link.split("/")[-1] if hash_part.startswith("+"): hash_part = hash_part[1:] # Remove the '+' if present else: hash_part = link # Try checking the invite before joining try: from telethon.errors import ( InviteHashExpiredError, InviteHashInvalidError, UserAlreadyParticipantError, ChatAdminRequiredError, UsersTooMuchError, ) # Try to check invite info first (will often fail if not a member) invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part)) if hasattr(invite_info, "chat") and invite_info.chat: # If we got chat info, we're already a member chat_title = getattr(invite_info.chat, "title", "Unknown Chat") return f"You are already a member of this chat: {chat_title}" except Exception as check_err: # This often fails if not a member - just continue pass # Join the chat using the hash try: result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part)) if result and hasattr(result, "chats") and result.chats: chat_title = getattr(result.chats[0], "title", "Unknown Chat") return f"Successfully joined chat: {chat_title}" return f"Joined chat via invite hash." except Exception as join_err: err_str = str(join_err).lower() if "expired" in err_str: return "The invite hash has expired and is no longer valid." elif "invalid" in err_str: return "The invite hash is invalid or malformed." elif "already" in err_str and "participant" in err_str: return "You are already a member of this chat." elif "admin" in err_str: return "Cannot join this chat - requires admin approval." elif "too much" in err_str or "too many" in err_str: return "Cannot join this chat - it has reached maximum number of participants." else: raise # Re-raise to be caught by the outer exception handler except Exception as e: logger.exception(f"join_chat_by_link failed (link={link})") return log_and_format_error("join_chat_by_link", e, link=link) @mcp.tool() async def export_chat_invite(chat_id: int) -> str: """ Export a chat invite link. """ try: entity = await client.get_entity(chat_id) # Try using ExportChatInviteRequest first try: from telethon.tl import functions result = await client(functions.messages.ExportChatInviteRequest(peer=entity)) return result.link except AttributeError: # If the function doesn't exist in the current Telethon version logger.warning("ExportChatInviteRequest not available, using alternative method") except Exception as e1: # If that fails, log and try alternative approach logger.warning(f"ExportChatInviteRequest failed: {e1}") # Alternative approach using client.export_chat_invite_link try: invite_link = await client.export_chat_invite_link(entity) return invite_link except Exception as e2: logger.warning(f"export_chat_invite_link failed: {e2}") return log_and_format_error("export_chat_invite", e2, chat_id=chat_id) except Exception as e: logger.exception(f"export_chat_invite failed (chat_id={chat_id})") return log_and_format_error("export_chat_invite", e, chat_id=chat_id) @mcp.tool() async def import_chat_invite(hash: str) -> str: """ Import a chat invite by hash. """ try: # Remove any prefixes like '+' if present if hash.startswith("+"): hash = hash[1:] # Try checking the invite before joining try: from telethon.errors import ( InviteHashExpiredError, InviteHashInvalidError, UserAlreadyParticipantError, ChatAdminRequiredError, UsersTooMuchError, ) # Try to check invite info first (will often fail if not a member) invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash)) if hasattr(invite_info, "chat") and invite_info.chat: # If we got chat info, we're already a member chat_title = getattr(invite_info.chat, "title", "Unknown Chat") return f"You are already a member of this chat: {chat_title}" except Exception as check_err: # This often fails if not a member - just continue pass # Join the chat using the hash try: result = await client(functions.messages.ImportChatInviteRequest(hash=hash)) if result and hasattr(result, "chats") and result.chats: chat_title = getattr(result.chats[0], "title", "Unknown Chat") return f"Successfully joined chat: {chat_title}" return f"Joined chat via invite hash." except Exception as join_err: err_str = str(join_err).lower() if "expired" in err_str: return "The invite hash has expired and is no longer valid." elif "invalid" in err_str: return "The invite hash is invalid or malformed." elif "already" in err_str and "participant" in err_str: return "You are already a member of this chat." elif "admin" in err_str: return "Cannot join this chat - requires admin approval." elif "too much" in err_str or "too many" in err_str: return "Cannot join this chat - it has reached maximum number of participants." else: raise # Re-raise to be caught by the outer exception handler except Exception as e: logger.exception(f"import_chat_invite failed (hash={hash})") return log_and_format_error("import_chat_invite", e, hash=hash) @mcp.tool() async def send_voice(chat_id: int, file_path: str) -> str: """ Send a voice message to a chat. File must be an OGG/OPUS voice note. Args: chat_id: The chat ID. file_path: Absolute path to the OGG/OPUS file. """ try: if not os.path.isfile(file_path): return f"File not found: {file_path}" if not os.access(file_path, os.R_OK): return f"File is not readable: {file_path}" mime, _ = mimetypes.guess_type(file_path) if not ( mime and ( mime == "audio/ogg" or file_path.lower().endswith(".ogg") or file_path.lower().endswith(".opus") ) ): return "Voice file must be .ogg or .opus format." entity = await client.get_entity(chat_id) await client.send_file(entity, file_path, voice_note=True) return f"Voice message sent to chat {chat_id}." except Exception as e: return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path) @mcp.tool() async def forward_message(from_chat_id: int, message_id: int, to_chat_id: int) -> str: """ Forward a message from one chat to another. """ try: from_entity = await client.get_entity(from_chat_id) to_entity = await client.get_entity(to_chat_id) await client.forward_messages(to_entity, message_id, from_entity) return f"Message {message_id} forwarded from {from_chat_id} to {to_chat_id}." except Exception as e: return log_and_format_error( "forward_message", e, from_chat_id=from_chat_id, message_id=message_id, to_chat_id=to_chat_id, ) @mcp.tool() async def edit_message(chat_id: int, message_id: int, new_text: str) -> str: """ Edit a message you sent. """ try: entity = await client.get_entity(chat_id) await client.edit_message(entity, message_id, new_text) return f"Message {message_id} edited." except Exception as e: return log_and_format_error( "edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text ) @mcp.tool() async def delete_message(chat_id: int, message_id: int) -> str: """ Delete a message by ID. """ try: entity = await client.get_entity(chat_id) await client.delete_messages(entity, message_id) return f"Message {message_id} deleted." except Exception as e: return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id) @mcp.tool() async def pin_message(chat_id: int, message_id: int) -> str: """ Pin a message in a chat. """ try: entity = await client.get_entity(chat_id) await client.pin_message(entity, message_id) return f"Message {message_id} pinned in chat {chat_id}." except Exception as e: return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id) @mcp.tool() async def unpin_message(chat_id: int, message_id: int) -> str: """ Unpin a message in a chat. """ try: entity = await client.get_entity(chat_id) await client.unpin_message(entity, message_id) return f"Message {message_id} unpinned in chat {chat_id}." except Exception as e: return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id) @mcp.tool() async def mark_as_read(chat_id: int) -> str: """ Mark all messages as read in a chat. """ try: entity = await client.get_entity(chat_id) await client.send_read_acknowledge(entity) return f"Marked all messages as read in chat {chat_id}." except Exception as e: return log_and_format_error("mark_as_read", e, chat_id=chat_id) @mcp.tool() async def reply_to_message(chat_id: int, message_id: int, text: str) -> str: """ Reply to a specific message in a chat. """ try: entity = await client.get_entity(chat_id) await client.send_message(entity, text, reply_to=message_id) return f"Replied to message {message_id} in chat {chat_id}." except Exception as e: return log_and_format_error( "reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text ) @mcp.tool() async def get_media_info(chat_id: int, message_id: int) -> str: """ Get info about media in a message. Args: chat_id: The chat ID. message_id: The message ID. """ try: entity = await client.get_entity(chat_id) msg = await client.get_messages(entity, ids=message_id) if not msg or not msg.media: return "No media found in the specified message." return str(msg.media) except Exception as e: return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id) @mcp.tool() async def search_public_chats(query: str) -> str: """ Search for public chats, channels, or bots by username or title. """ try: result = await client(functions.contacts.SearchRequest(q=query, limit=20)) return json.dumps([format_entity(u) for u in result.users], indent=2) except Exception as e: return log_and_format_error("search_public_chats", e, query=query) @mcp.tool() async def search_messages(chat_id: int, query: str, limit: int = 20) -> str: """ Search for messages in a chat by text. """ try: entity = await client.get_entity(chat_id) messages = await client.get_messages(entity, limit=limit, search=query) return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages]) except Exception as e: return log_and_format_error( "search_messages", e, chat_id=chat_id, query=query, limit=limit ) @mcp.tool() async def resolve_username(username: str) -> str: """ Resolve a username to a user or chat ID. """ try: result = await client(functions.contacts.ResolveUsernameRequest(username=username)) return str(result) except Exception as e: return log_and_format_error("resolve_username", e, username=username) @mcp.tool() async def mute_chat(chat_id: int) -> str: """ Mute notifications for a chat. """ try: from telethon.tl.types import InputPeerNotifySettings peer = await client.get_entity(chat_id) await client( functions.account.UpdateNotifySettingsRequest( peer=peer, settings=InputPeerNotifySettings(mute_until=2**31 - 1) ) ) return f"Chat {chat_id} muted." except (ImportError, AttributeError) as type_err: try: # Alternative approach directly using raw API peer = await client.get_input_entity(chat_id) await client( functions.account.UpdateNotifySettingsRequest( peer=peer, settings={ "mute_until": 2**31 - 1, # Far future "show_previews": False, "silent": True, }, ) ) return f"Chat {chat_id} muted (using alternative method)." except Exception as alt_e: logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})") return log_and_format_error("mute_chat", alt_e, chat_id=chat_id) except Exception as e: logger.exception(f"mute_chat failed (chat_id={chat_id})") return log_and_format_error("mute_chat", e, chat_id=chat_id) @mcp.tool() async def unmute_chat(chat_id: int) -> str: """ Unmute notifications for a chat. """ try: from telethon.tl.types import InputPeerNotifySettings peer = await client.get_entity(chat_id) await client( functions.account.UpdateNotifySettingsRequest( peer=peer, settings=InputPeerNotifySettings(mute_until=0) ) ) return f"Chat {chat_id} unmuted." except (ImportError, AttributeError) as type_err: try: # Alternative approach directly using raw API peer = await client.get_input_entity(chat_id) await client( functions.account.UpdateNotifySettingsRequest( peer=peer, settings={ "mute_until": 0, # Unmute (current time) "show_previews": True, "silent": False, }, ) ) return f"Chat {chat_id} unmuted (using alternative method)." except Exception as alt_e: logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})") return log_and_format_error("unmute_chat", alt_e, chat_id=chat_id) except Exception as e: logger.exception(f"unmute_chat failed (chat_id={chat_id})") return log_and_format_error("unmute_chat", e, chat_id=chat_id) @mcp.tool() async def archive_chat(chat_id: int) -> str: """ Archive a chat. """ try: await client( functions.messages.ToggleDialogPinRequest( peer=await client.get_entity(chat_id), pinned=True ) ) return f"Chat {chat_id} archived." except Exception as e: return log_and_format_error("archive_chat", e, chat_id=chat_id) @mcp.tool() async def unarchive_chat(chat_id: int) -> str: """ Unarchive a chat. """ try: await client( functions.messages.ToggleDialogPinRequest( peer=await client.get_entity(chat_id), pinned=False ) ) return f"Chat {chat_id} unarchived." except Exception as e: return log_and_format_error("unarchive_chat", e, chat_id=chat_id) @mcp.tool() async def get_sticker_sets() -> str: """ Get all sticker sets. """ try: result = await client(functions.messages.GetAllStickersRequest(hash=0)) return json.dumps([s.title for s in result.sets], indent=2) except Exception as e: return log_and_format_error("get_sticker_sets", e) @mcp.tool() async def send_sticker(chat_id: int, file_path: str) -> str: """ Send a sticker to a chat. File must be a valid .webp sticker file. Args: chat_id: The chat ID. file_path: Absolute path to the .webp sticker file. """ try: if not os.path.isfile(file_path): return f"Sticker file not found: {file_path}" if not os.access(file_path, os.R_OK): return f"Sticker file is not readable: {file_path}" if not file_path.lower().endswith(".webp"): return "Sticker file must be a .webp file." entity = await client.get_entity(chat_id) await client.send_file(entity, file_path, force_document=False) return f"Sticker sent to chat {chat_id}." except Exception as e: return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path) @mcp.tool() async def get_gif_search(query: str, limit: int = 10) -> str: """ Search for GIFs by query. Returns a list of Telegram document IDs (not file paths). Args: query: Search term for GIFs. limit: Max number of GIFs to return. """ try: # Try approach 1: SearchGifsRequest try: result = await client( functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit) ) if not result.gifs: return "[]" return json.dumps( [g.document.id for g in result.gifs], indent=2, default=json_serializer ) except (AttributeError, ImportError): # Fallback approach: Use SearchRequest with GIF filter try: from telethon.tl.types import InputMessagesFilterGif result = await client( functions.messages.SearchRequest( peer="gif", q=query, filter=InputMessagesFilterGif(), min_date=None, max_date=None, offset_id=0, add_offset=0, limit=limit, max_id=0, min_id=0, hash=0, ) ) if not result or not hasattr(result, "messages") or not result.messages: return "[]" # Extract document IDs from any messages with media gif_ids = [] for msg in result.messages: if hasattr(msg, "media") and msg.media and hasattr(msg.media, "document"): gif_ids.append(msg.media.document.id) return json.dumps(gif_ids, default=json_serializer) except Exception as inner_e: # Last resort: Try to fetch from a public bot return f"Could not search GIFs using available methods: {inner_e}" except Exception as e: logger.exception(f"get_gif_search failed (query={query}, limit={limit})") return log_and_format_error("get_gif_search", e, query=query, limit=limit) @mcp.tool() async def send_gif(chat_id: int, gif_id: int) -> str: """ Send a GIF to a chat by Telegram GIF document ID (not a file path). Args: chat_id: The chat ID. gif_id: Telegram document ID for the GIF (from get_gif_search). """ try: if not isinstance(gif_id, int): return "gif_id must be a Telegram document ID (integer), not a file path. Use get_gif_search to find IDs." entity = await client.get_entity(chat_id) await client.send_file(entity, gif_id) return f"GIF sent to chat {chat_id}." except Exception as e: return log_and_format_error("send_gif", e, chat_id=chat_id, gif_id=gif_id) @mcp.tool() async def get_bot_info(bot_username: str) -> str: """ Get information about a bot by username. """ try: entity = await client.get_entity(bot_username) if not entity: return f"Bot with username {bot_username} not found." result = await client(functions.users.GetFullUserRequest(id=entity)) # Create a more structured, serializable response if hasattr(result, "to_dict"): # Use custom serializer to handle non-serializable types return json.dumps(result.to_dict(), indent=2, default=json_serializer) else: # Fallback if to_dict is not available info = { "bot_info": { "id": entity.id, "username": entity.username, "first_name": entity.first_name, "last_name": getattr(entity, "last_name", ""), "is_bot": getattr(entity, "bot", False), "verified": getattr(entity, "verified", False), } } if hasattr(result, "full_user") and hasattr(result.full_user, "about"): info["bot_info"]["about"] = result.full_user.about return json.dumps(info, indent=2) except Exception as e: logger.exception(f"get_bot_info failed (bot_username={bot_username})") return log_and_format_error("get_bot_info", e, bot_username=bot_username) @mcp.tool() async def set_bot_commands(bot_username: str, commands: list) -> str: """ Set bot commands for a bot you own. Note: This function can only be used if the Telegram client is a bot account. Regular user accounts cannot set bot commands. Args: bot_username: The username of the bot to set commands for. commands: List of command dictionaries with 'command' and 'description' keys. """ try: # First check if the current client is a bot me = await client.get_me() if not getattr(me, "bot", False): return "Error: This function can only be used by bot accounts. Your current Telegram account is a regular user account, not a bot." # Import required types from telethon.tl.types import BotCommand, BotCommandScopeDefault from telethon.tl.functions.bots import SetBotCommandsRequest # Create BotCommand objects from the command dictionaries bot_commands = [ BotCommand(command=c["command"], description=c["description"]) for c in commands ] # Get the bot entity bot = await client.get_entity(bot_username) # Set the commands with proper scope await client( SetBotCommandsRequest( scope=BotCommandScopeDefault(), lang_code="en", # Default language code commands=bot_commands, ) ) return f"Bot commands set for {bot_username}." except ImportError as ie: logger.exception(f"set_bot_commands failed - ImportError: {ie}") return log_and_format_error("set_bot_commands", ie) except Exception as e: logger.exception(f"set_bot_commands failed (bot_username={bot_username})") return log_and_format_error("set_bot_commands", e, bot_username=bot_username) @mcp.tool() async def get_history(chat_id: int, limit: int = 100) -> str: """ Get full chat history (up to limit). """ try: entity = await client.get_entity(chat_id) messages = await client.get_messages(entity, limit=limit) return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages]) except Exception as e: return log_and_format_error("get_history", e, chat_id=chat_id, limit=limit) @mcp.tool() async def get_user_photos(user_id: int, limit: int = 10) -> str: """ Get profile photos of a user. """ try: user = await client.get_entity(user_id) photos = await client( functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit) ) return json.dumps([p.id for p in photos.photos], indent=2) except Exception as e: return log_and_format_error("get_user_photos", e, user_id=user_id, limit=limit) @mcp.tool() async def get_user_status(user_id: int) -> str: """ Get the online status of a user. """ try: user = await client.get_entity(user_id) return str(user.status) except Exception as e: return log_and_format_error("get_user_status", e, user_id=user_id) @mcp.tool() async def get_recent_actions(chat_id: int) -> str: """ Get recent admin actions (admin log) in a group or channel. """ try: result = await client( functions.channels.GetAdminLogRequest( channel=chat_id, q="", events_filter=None, admins=[], max_id=0, min_id=0, limit=20 ) ) if not result or not result.events: return "No recent admin actions found." # Use the custom serializer to handle datetime objects return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer) except Exception as e: logger.exception(f"get_recent_actions failed (chat_id={chat_id})") return log_and_format_error("get_recent_actions", e, chat_id=chat_id) @mcp.tool() async def get_pinned_messages(chat_id: int) -> str: """ Get all pinned messages in a chat. """ try: entity = await client.get_entity(chat_id) # Use correct filter based on Telethon version try: # Try newer Telethon approach from telethon.tl.types import InputMessagesFilterPinned messages = await client.get_messages(entity, filter=InputMessagesFilterPinned()) except (ImportError, AttributeError): # Fallback - try without filter and manually filter pinned all_messages = await client.get_messages(entity, limit=50) messages = [m for m in all_messages if getattr(m, "pinned", False)] if not messages: return "No pinned messages found in this chat." return "\n".join( [f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages] ) except Exception as e: logger.exception(f"get_pinned_messages failed (chat_id={chat_id})") return log_and_format_error("get_pinned_messages", e, chat_id=chat_id) if __name__ == "__main__": nest_asyncio.apply() async def main() -> None: try: # Start the Telethon client non-interactively print("Starting Telegram client...") await client.start() print("Telegram client started. Running MCP server...") # Use the asynchronous entrypoint instead of mcp.run() await mcp.run_stdio_async() except Exception as e: print(f"Error starting client: {e}", file=sys.stderr) if isinstance(e, sqlite3.OperationalError) and "database is locked" in str(e): print( "Database lock detected. Please ensure no other instances are running.", file=sys.stderr, ) sys.exit(1) asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/chigwell/telegram-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server