import os
import sys
import json
import time
import asyncio
import sqlite3
import logging
import mimetypes
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Union, Any
# Third-party libraries
import nest_asyncio
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from telethon import TelegramClient, functions, utils
from telethon.sessions import StringSession
from telethon.tl.types import (
User,
Chat,
Channel,
ChatAdminRights,
ChatBannedRights,
ChannelParticipantsKicked,
ChannelParticipantsAdmins,
InputChatPhoto,
InputChatUploadedPhoto,
InputChatPhotoEmpty,
InputPeerUser,
InputPeerChat,
InputPeerChannel,
)
import telethon.errors.rpcerrorlist
def json_serializer(obj):
"""Helper function to convert non-serializable objects for JSON serialization."""
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, bytes):
return obj.decode("utf-8", errors="replace")
# Add other non-serializable types as needed
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
load_dotenv()
TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID"))
TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")
TELEGRAM_SESSION_NAME = os.getenv("TELEGRAM_SESSION_NAME")
# Check if a string session exists in environment, otherwise use file-based session
SESSION_STRING = os.getenv("TELEGRAM_SESSION_STRING")
mcp = FastMCP("telegram")
if SESSION_STRING:
# Use the string session if available
client = TelegramClient(StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH)
else:
# Use file-based session
client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH)
# Setup robust logging with both file and console output
logger = logging.getLogger("telegram_mcp")
logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
# Create console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
# Create file handler with absolute path
script_dir = os.path.dirname(os.path.abspath(__file__))
log_file_path = os.path.join(script_dir, "mcp_errors.log")
try:
file_handler = logging.FileHandler(log_file_path, mode="a") # Append mode
file_handler.setLevel(logging.ERROR)
# Create formatter and add to handlers
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# Add handlers to logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.info(f"Logging initialized to {log_file_path}")
except Exception as log_error:
print(f"WARNING: Error setting up log file: {log_error}")
# Fallback to console-only logging
logger.addHandler(console_handler)
logger.error(f"Failed to set up log file handler: {log_error}")
# Error code prefix mapping for better error tracing
ERROR_PREFIXES = {
"chat": "CHAT",
"msg": "MSG",
"contact": "CONTACT",
"group": "GROUP",
"media": "MEDIA",
"profile": "PROFILE",
"auth": "AUTH",
"admin": "ADMIN",
}
def log_and_format_error(
function_name: str, error: Exception, prefix: str = None, **kwargs
) -> str:
"""
Centralized error handling function that logs the error and returns a formatted user-friendly message.
Args:
function_name: Name of the function where error occurred
error: The exception that was raised
prefix: Error code prefix (e.g., "CHAT", "MSG") - if None, will be derived from function_name
**kwargs: Additional context parameters to include in log
Returns:
A user-friendly error message with error code
"""
# Generate a consistent error code
if prefix is None:
# Try to derive prefix from function name
for key, value in ERROR_PREFIXES.items():
if key in function_name.lower():
prefix = value
break
if prefix is None:
prefix = "GEN" # Generic prefix if none matches
error_code = f"{prefix}-ERR-{abs(hash(function_name)) % 1000:03d}"
# Format the additional context parameters
context = ", ".join(f"{k}={v}" for k, v in kwargs.items())
# Log the full technical error
logger.exception(f"{function_name} failed ({context}): {error}")
# Return a user-friendly message
return f"An error occurred (code: {error_code}). Check mcp_errors.log for details."
def format_entity(entity) -> Dict[str, Any]:
"""Helper function to format entity information consistently."""
result = {"id": entity.id}
if hasattr(entity, "title"):
result["name"] = entity.title
result["type"] = "group" if isinstance(entity, Chat) else "channel"
elif hasattr(entity, "first_name"):
name_parts = []
if entity.first_name:
name_parts.append(entity.first_name)
if hasattr(entity, "last_name") and entity.last_name:
name_parts.append(entity.last_name)
result["name"] = " ".join(name_parts)
result["type"] = "user"
if hasattr(entity, "username") and entity.username:
result["username"] = entity.username
if hasattr(entity, "phone") and entity.phone:
result["phone"] = entity.phone
return result
def format_message(message) -> Dict[str, Any]:
"""Helper function to format message information consistently."""
result = {
"id": message.id,
"date": message.date.isoformat(),
"text": message.message or "",
}
if message.from_id:
result["from_id"] = utils.get_peer_id(message.from_id)
if message.media:
result["has_media"] = True
result["media_type"] = type(message.media).__name__
return result
@mcp.tool()
async def get_chats(page: int = 1, page_size: int = 20) -> str:
"""
Get a paginated list of chats.
Args:
page: Page number (1-indexed).
page_size: Number of chats per page.
"""
try:
dialogs = await client.get_dialogs()
start = (page - 1) * page_size
end = start + page_size
if start >= len(dialogs):
return "Page out of range."
chats = dialogs[start:end]
lines = []
for dialog in chats:
entity = dialog.entity
chat_id = entity.id
title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown")
lines.append(f"Chat ID: {chat_id}, Title: {title}")
return "\n".join(lines)
except Exception as e:
return log_and_format_error("get_chats", e)
@mcp.tool()
async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str:
"""
Get paginated messages from a specific chat.
Args:
chat_id: The ID of the chat.
page: Page number (1-indexed).
page_size: Number of messages per page.
"""
try:
entity = await client.get_entity(chat_id)
offset = (page - 1) * page_size
messages = await client.get_messages(entity, limit=page_size, add_offset=offset)
if not messages:
return "No messages found for this page."
lines = []
for msg in messages:
lines.append(f"ID: {msg.id} | Date: {msg.date} | Message: {msg.message}")
return "\n".join(lines)
except Exception as e:
return log_and_format_error(
"get_messages", e, chat_id=chat_id, page=page, page_size=page_size
)
@mcp.tool()
async def send_message(chat_id: int, message: str) -> str:
"""
Send a message to a specific chat.
Args:
chat_id: The ID of the chat.
message: The message content to send.
"""
try:
entity = await client.get_entity(chat_id)
await client.send_message(entity, message)
return "Message sent successfully."
except Exception as e:
return log_and_format_error("send_message", e, chat_id=chat_id)
@mcp.tool()
async def list_contacts() -> str:
"""
List all contacts in your Telegram account.
"""
try:
result = await client(functions.contacts.GetContactsRequest(hash=0))
users = result.users
if not users:
return "No contacts found."
lines = []
for user in users:
name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip()
username = getattr(user, "username", "")
phone = getattr(user, "phone", "")
contact_info = f"ID: {user.id}, Name: {name}"
if username:
contact_info += f", Username: @{username}"
if phone:
contact_info += f", Phone: {phone}"
lines.append(contact_info)
return "\n".join(lines)
except Exception as e:
return log_and_format_error("list_contacts", e)
@mcp.tool()
async def search_contacts(query: str) -> str:
"""
Search for contacts by name, username, or phone number using Telethon's SearchRequest.
Args:
query: The search term to look for in contact names, usernames, or phone numbers.
"""
try:
result = await client(functions.contacts.SearchRequest(q=query, limit=50))
users = result.users
if not users:
return f"No contacts found matching '{query}'."
lines = []
for user in users:
name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip()
username = getattr(user, "username", "")
phone = getattr(user, "phone", "")
contact_info = f"ID: {user.id}, Name: {name}"
if username:
contact_info += f", Username: @{username}"
if phone:
contact_info += f", Phone: {phone}"
lines.append(contact_info)
return "\n".join(lines)
except Exception as e:
return log_and_format_error("search_contacts", e, query=query)
@mcp.tool()
async def get_contact_ids() -> str:
"""
Get all contact IDs in your Telegram account.
"""
try:
result = await client(functions.contacts.GetContactIDsRequest(hash=0))
if not result:
return "No contact IDs found."
return "Contact IDs: " + ", ".join(str(cid) for cid in result)
except Exception as e:
return log_and_format_error("get_contact_ids", e)
@mcp.tool()
async def list_messages(
chat_id: int,
limit: int = 20,
search_query: str = None,
from_date: str = None,
to_date: str = None,
) -> str:
"""
Retrieve messages with optional filters.
Args:
chat_id: The ID of the chat to get messages from.
limit: Maximum number of messages to retrieve.
search_query: Filter messages containing this text.
from_date: Filter messages starting from this date (format: YYYY-MM-DD).
to_date: Filter messages until this date (format: YYYY-MM-DD).
"""
try:
entity = await client.get_entity(chat_id)
# Parse date filters if provided
from_date_obj = None
to_date_obj = None
if from_date:
try:
from_date_obj = datetime.strptime(from_date, "%Y-%m-%d")
# Make it timezone aware by adding UTC timezone info
# Use datetime.timezone.utc for Python 3.9+ or import timezone directly for 3.13+
try:
# For Python 3.9+
from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc)
except AttributeError:
# For Python 3.13+
from datetime import timezone
from_date_obj = from_date_obj.replace(tzinfo=timezone.utc)
except ValueError:
return f"Invalid from_date format. Use YYYY-MM-DD."
if to_date:
try:
to_date_obj = datetime.strptime(to_date, "%Y-%m-%d")
# Set to end of day and make timezone aware
to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1)
# Add timezone info
try:
to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc)
except AttributeError:
from datetime import timezone
to_date_obj = to_date_obj.replace(tzinfo=timezone.utc)
except ValueError:
return f"Invalid to_date format. Use YYYY-MM-DD."
# Prepare filter parameters
params = {}
if search_query:
params["search"] = search_query
messages = await client.get_messages(entity, limit=limit, **params)
# Apply date filters (Telethon doesn't support date filtering in get_messages directly)
if from_date_obj or to_date_obj:
filtered_messages = []
for msg in messages:
if from_date_obj and msg.date < from_date_obj:
continue
if to_date_obj and msg.date > to_date_obj:
continue
filtered_messages.append(msg)
messages = filtered_messages
if not messages:
return "No messages found matching the criteria."
lines = []
for msg in messages:
sender = ""
if msg.sender:
sender_name = getattr(msg.sender, "first_name", "") or getattr(
msg.sender, "title", "Unknown"
)
sender = f"{sender_name} | "
lines.append(
f"ID: {msg.id} | {sender}Date: {msg.date} | Message: {msg.message or '[Media/No text]'}"
)
return "\n".join(lines)
except Exception as e:
return log_and_format_error("list_messages", e, chat_id=chat_id)
@mcp.tool()
async def list_chats(chat_type: str = None, limit: int = 20) -> str:
"""
List available chats with metadata.
Args:
chat_type: Filter by chat type ('user', 'group', 'channel', or None for all)
limit: Maximum number of chats to retrieve.
"""
try:
dialogs = await client.get_dialogs(limit=limit)
results = []
for dialog in dialogs:
entity = dialog.entity
# Filter by type if requested
current_type = None
if isinstance(entity, User):
current_type = "user"
elif isinstance(entity, Chat):
current_type = "group"
elif isinstance(entity, Channel):
if getattr(entity, "broadcast", False):
current_type = "channel"
else:
current_type = "group" # Supergroup
if chat_type and current_type != chat_type.lower():
continue
# Format chat info
chat_info = f"Chat ID: {entity.id}"
if hasattr(entity, "title"):
chat_info += f", Title: {entity.title}"
elif hasattr(entity, "first_name"):
name = f"{entity.first_name}"
if hasattr(entity, "last_name") and entity.last_name:
name += f" {entity.last_name}"
chat_info += f", Name: {name}"
chat_info += f", Type: {current_type}"
if hasattr(entity, "username") and entity.username:
chat_info += f", Username: @{entity.username}"
# Add unread count if available
if hasattr(dialog, "unread_count") and dialog.unread_count > 0:
chat_info += f", Unread: {dialog.unread_count}"
results.append(chat_info)
if not results:
return f"No chats found matching the criteria."
return "\n".join(results)
except Exception as e:
return log_and_format_error("list_chats", e, chat_type=chat_type, limit=limit)
@mcp.tool()
async def get_chat(chat_id: int) -> str:
"""
Get detailed information about a specific chat.
Args:
chat_id: The ID of the chat.
"""
try:
entity = await client.get_entity(chat_id)
result = []
result.append(f"ID: {entity.id}")
is_channel = isinstance(entity, Channel)
is_chat = isinstance(entity, Chat)
is_user = isinstance(entity, User)
if hasattr(entity, "title"):
result.append(f"Title: {entity.title}")
chat_type = (
"Channel" if is_channel and getattr(entity, "broadcast", False) else "Group"
)
if is_channel and getattr(entity, "megagroup", False):
chat_type = "Supergroup"
elif is_chat:
chat_type = "Group (Basic)"
result.append(f"Type: {chat_type}")
if hasattr(entity, "username") and entity.username:
result.append(f"Username: @{entity.username}")
# Fetch participants count reliably
try:
participants_count = (await client.get_participants(entity, limit=0)).total
result.append(f"Participants: {participants_count}")
except Exception as pe:
result.append(f"Participants: Error fetching ({pe})")
elif is_user:
name = f"{entity.first_name}"
if entity.last_name:
name += f" {entity.last_name}"
result.append(f"Name: {name}")
result.append(f"Type: User")
if entity.username:
result.append(f"Username: @{entity.username}")
if entity.phone:
result.append(f"Phone: {entity.phone}")
result.append(f"Bot: {'Yes' if entity.bot else 'No'}")
result.append(f"Verified: {'Yes' if entity.verified else 'No'}")
# Get last activity if it's a dialog
try:
# Using get_dialogs might be slow if there are many dialogs
# Alternative: Get entity again via get_dialogs if needed for unread count
dialog = await client.get_dialogs(limit=1, offset_id=0, offset_peer=entity)
if dialog:
dialog = dialog[0]
result.append(f"Unread Messages: {dialog.unread_count}")
if dialog.message:
last_msg = dialog.message
sender_name = "Unknown"
if last_msg.sender:
sender_name = getattr(last_msg.sender, "first_name", "") or getattr(
last_msg.sender, "title", "Unknown"
)
if hasattr(last_msg.sender, "last_name") and last_msg.sender.last_name:
sender_name += f" {last_msg.sender.last_name}"
sender_name = sender_name.strip() or "Unknown"
result.append(f"Last Message: From {sender_name} at {last_msg.date}")
result.append(f"Message: {last_msg.message or '[Media/No text]'}")
except Exception as diag_ex:
logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}")
pass
return "\n".join(result)
except Exception as e:
return log_and_format_error("get_chat", e, chat_id=chat_id)
@mcp.tool()
async def get_direct_chat_by_contact(contact_query: str) -> str:
"""
Find a direct chat with a specific contact by name, username, or phone.
Args:
contact_query: Name, username, or phone number to search for.
"""
try:
# Fetch all contacts using the correct Telethon method
result = await client(functions.contacts.GetContactsRequest(hash=0))
contacts = result.users
found_contacts = []
for contact in contacts:
if not contact:
continue
name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
username = getattr(contact, "username", "")
phone = getattr(contact, "phone", "")
if (
contact_query.lower() in name.lower()
or (username and contact_query.lower() in username.lower())
or (phone and contact_query in phone)
):
found_contacts.append(contact)
if not found_contacts:
return f"No contacts found matching '{contact_query}'."
# If we found contacts, look for direct chats with them
results = []
dialogs = await client.get_dialogs()
for contact in found_contacts:
contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
for dialog in dialogs:
if isinstance(dialog.entity, User) and dialog.entity.id == contact.id:
chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}"
if getattr(contact, "username", ""):
chat_info += f", Username: @{contact.username}"
if dialog.unread_count:
chat_info += f", Unread: {dialog.unread_count}"
results.append(chat_info)
break
if not results:
found_names = ", ".join(
[f"{c.first_name} {c.last_name}".strip() for c in found_contacts]
)
return f"Found contacts: {found_names}, but no direct chats were found with them."
return "\n".join(results)
except Exception as e:
return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query)
@mcp.tool()
async def get_contact_chats(contact_id: int) -> str:
"""
List all chats involving a specific contact.
Args:
contact_id: The ID of the contact.
"""
try:
# Get contact info
contact = await client.get_entity(contact_id)
if not isinstance(contact, User):
return f"ID {contact_id} is not a user/contact."
contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
# Find direct chat
direct_chat = None
dialogs = await client.get_dialogs()
results = []
# Look for direct chat
for dialog in dialogs:
if isinstance(dialog.entity, User) and dialog.entity.id == contact_id:
chat_info = f"Direct Chat ID: {dialog.entity.id}, Type: Private"
if dialog.unread_count:
chat_info += f", Unread: {dialog.unread_count}"
results.append(chat_info)
break
# Look for common groups/channels
common_chats = []
try:
common = await client.get_common_chats(contact)
for chat in common:
chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group"
chat_info = f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}"
results.append(chat_info)
except:
results.append("Could not retrieve common groups.")
if not results:
return f"No chats found with {contact_name} (ID: {contact_id})."
return f"Chats with {contact_name} (ID: {contact_id}):\n" + "\n".join(results)
except Exception as e:
return log_and_format_error("get_contact_chats", e, contact_id=contact_id)
@mcp.tool()
async def get_last_interaction(contact_id: int) -> str:
"""
Get the most recent message with a contact.
Args:
contact_id: The ID of the contact.
"""
try:
# Get contact info
contact = await client.get_entity(contact_id)
if not isinstance(contact, User):
return f"ID {contact_id} is not a user/contact."
contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
# Get the last few messages
messages = await client.get_messages(contact, limit=5)
if not messages:
return f"No messages found with {contact_name} (ID: {contact_id})."
results = [f"Last interactions with {contact_name} (ID: {contact_id}):"]
for msg in messages:
sender = "You" if msg.out else contact_name
message_text = msg.message or "[Media/No text]"
results.append(f"Date: {msg.date}, From: {sender}, Message: {message_text}")
return "\n".join(results)
except Exception as e:
return log_and_format_error("get_last_interaction", e, contact_id=contact_id)
@mcp.tool()
async def get_message_context(chat_id: int, message_id: int, context_size: int = 3) -> str:
"""
Retrieve context around a specific message.
Args:
chat_id: The ID of the chat.
message_id: The ID of the central message.
context_size: Number of messages before and after to include.
"""
try:
chat = await client.get_entity(chat_id)
# Get messages around the specified message
messages_before = await client.get_messages(chat, limit=context_size, max_id=message_id)
central_message = await client.get_messages(chat, ids=message_id)
# Fix: get_messages(ids=...) returns a single Message, not a list
if central_message is not None and not isinstance(central_message, list):
central_message = [central_message]
elif central_message is None:
central_message = []
messages_after = await client.get_messages(
chat, limit=context_size, min_id=message_id, reverse=True
)
if not central_message:
return f"Message with ID {message_id} not found in chat {chat_id}."
# Combine messages in chronological order
all_messages = list(messages_before) + list(central_message) + list(messages_after)
all_messages.sort(key=lambda m: m.id)
results = [f"Context for message {message_id} in chat {chat_id}:"]
for msg in all_messages:
sender_name = "Unknown"
if msg.sender:
sender_name = getattr(msg.sender, "first_name", "") or getattr(
msg.sender, "title", "Unknown"
)
highlight = " [THIS MESSAGE]" if msg.id == message_id else ""
results.append(
f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}\n{msg.message or '[Media/No text]'}\n"
)
return "\n".join(results)
except Exception as e:
return log_and_format_error(
"get_message_context",
e,
chat_id=chat_id,
message_id=message_id,
context_size=context_size,
)
@mcp.tool()
async def add_contact(phone: str, first_name: str, last_name: str = "") -> str:
"""
Add a new contact to your Telegram account.
Args:
phone: The phone number of the contact (with country code).
first_name: The contact's first name.
last_name: The contact's last name (optional).
"""
try:
# Try to import the required types first
from telethon.tl.types import InputPhoneContact
result = await client(
functions.contacts.ImportContactsRequest(
contacts=[
InputPhoneContact(
client_id=0, phone=phone, first_name=first_name, last_name=last_name
)
]
)
)
if result.imported:
return f"Contact {first_name} {last_name} added successfully."
else:
return f"Contact not added. Response: {str(result)}"
except (ImportError, AttributeError) as type_err:
# Try alternative approach using raw API
try:
result = await client(
functions.contacts.ImportContactsRequest(
contacts=[
{
"client_id": 0,
"phone": phone,
"first_name": first_name,
"last_name": last_name,
}
]
)
)
if hasattr(result, "imported") and result.imported:
return f"Contact {first_name} {last_name} added successfully (alt method)."
else:
return f"Contact not added. Alternative method response: {str(result)}"
except Exception as alt_e:
logger.exception(f"add_contact (alt method) failed (phone={phone})")
return log_and_format_error("add_contact", alt_e, phone=phone)
except Exception as e:
logger.exception(f"add_contact failed (phone={phone})")
return log_and_format_error("add_contact", e, phone=phone)
@mcp.tool()
async def delete_contact(user_id: int) -> str:
"""
Delete a contact by user ID.
Args:
user_id: The Telegram user ID of the contact to delete.
"""
try:
user = await client.get_entity(user_id)
await client(functions.contacts.DeleteContactsRequest(id=[user]))
return f"Contact with user ID {user_id} deleted."
except Exception as e:
return log_and_format_error("delete_contact", e, user_id=user_id)
@mcp.tool()
async def block_user(user_id: int) -> str:
"""
Block a user by user ID.
Args:
user_id: The Telegram user ID to block.
"""
try:
user = await client.get_entity(user_id)
await client(functions.contacts.BlockRequest(id=user))
return f"User {user_id} blocked."
except Exception as e:
return log_and_format_error("block_user", e, user_id=user_id)
@mcp.tool()
async def unblock_user(user_id: int) -> str:
"""
Unblock a user by user ID.
Args:
user_id: The Telegram user ID to unblock.
"""
try:
user = await client.get_entity(user_id)
await client(functions.contacts.UnblockRequest(id=user))
return f"User {user_id} unblocked."
except Exception as e:
return log_and_format_error("unblock_user", e, user_id=user_id)
@mcp.tool()
async def get_me() -> str:
"""
Get your own user information.
"""
try:
me = await client.get_me()
return json.dumps(format_entity(me), indent=2)
except Exception as e:
return log_and_format_error("get_me", e)
@mcp.tool()
async def create_group(title: str, user_ids: list) -> str:
"""
Create a new group or supergroup and add users.
Args:
title: Title for the new group
user_ids: List of user IDs to add to the group
"""
try:
# Convert user IDs to entities
users = []
for user_id in user_ids:
try:
user = await client.get_entity(user_id)
users.append(user)
except Exception as e:
logger.error(f"Failed to get entity for user ID {user_id}: {e}")
return f"Error: Could not find user with ID {user_id}"
if not users:
return "Error: No valid users provided"
# Create the group with the users
try:
# Create a new chat with selected users
result = await client(functions.messages.CreateChatRequest(users=users, title=title))
# Check what type of response we got
if hasattr(result, "chats") and result.chats:
created_chat = result.chats[0]
return f"Group created with ID: {created_chat.id}"
elif hasattr(result, "chat") and result.chat:
return f"Group created with ID: {result.chat.id}"
elif hasattr(result, "chat_id"):
return f"Group created with ID: {result.chat_id}"
else:
# If we can't determine the chat ID directly from the result
# Try to find it in recent dialogs
await asyncio.sleep(1) # Give Telegram a moment to register the new group
dialogs = await client.get_dialogs(limit=5) # Get recent dialogs
for dialog in dialogs:
if dialog.title == title:
return f"Group created with ID: {dialog.id}"
# If we still can't find it, at least return success
return f"Group created successfully. Please check your recent chats for '{title}'."
except Exception as create_err:
if "PEER_FLOOD" in str(create_err):
return "Error: Cannot create group due to Telegram limits. Try again later."
else:
raise # Let the outer exception handler catch it
except Exception as e:
logger.exception(f"create_group failed (title={title}, user_ids={user_ids})")
return log_and_format_error("create_group", e, title=title, user_ids=user_ids)
@mcp.tool()
async def invite_to_group(group_id: int, user_ids: list) -> str:
"""
Invite users to a group or channel.
Args:
group_id: The ID of the group/channel.
user_ids: List of user IDs to invite.
"""
try:
entity = await client.get_entity(group_id)
users_to_add = []
for user_id in user_ids:
try:
user = await client.get_entity(user_id)
users_to_add.append(user)
except ValueError as e:
return f"Error: User with ID {user_id} could not be found. {e}"
try:
result = await client(
functions.channels.InviteToChannelRequest(channel=entity, users=users_to_add)
)
invited_count = 0
if hasattr(result, "users") and result.users:
invited_count = len(result.users)
elif hasattr(result, "count"):
invited_count = result.count
return f"Successfully invited {invited_count} users to {entity.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back."
except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError:
return (
"Error: One or more users have privacy settings that prevent you from adding them."
)
except Exception as e:
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
except Exception as e:
logger.error(
f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})",
exc_info=True,
)
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
@mcp.tool()
async def leave_chat(chat_id: int) -> str:
"""
Leave a group or channel by chat ID.
Args:
chat_id: The chat ID to leave.
"""
try:
entity = await client.get_entity(chat_id)
# Check the entity type carefully
if isinstance(entity, Channel):
# Handle both channels and supergroups (which are also channels in Telegram)
try:
await client(functions.channels.LeaveChannelRequest(channel=entity))
chat_name = getattr(entity, "title", str(chat_id))
return f"Left channel/supergroup {chat_name} (ID: {chat_id})."
except Exception as chan_err:
return log_and_format_error("leave_chat", chan_err, chat_id=chat_id)
elif isinstance(entity, Chat):
# Traditional basic groups (not supergroups)
try:
# First try with InputPeerUser
me = await client.get_me(input_peer=True)
await client(
functions.messages.DeleteChatUserRequest(
chat_id=entity.id, user_id=me # Use the entity ID directly
)
)
chat_name = getattr(entity, "title", str(chat_id))
return f"Left basic group {chat_name} (ID: {chat_id})."
except Exception as chat_err:
# If the above fails, try the second approach
logger.warning(
f"First leave attempt failed: {chat_err}, trying alternative method"
)
try:
# Alternative approach - sometimes this works better
me_full = await client.get_me()
await client(
functions.messages.DeleteChatUserRequest(
chat_id=entity.id, user_id=me_full.id
)
)
chat_name = getattr(entity, "title", str(chat_id))
return f"Left basic group {chat_name} (ID: {chat_id})."
except Exception as alt_err:
return log_and_format_error("leave_chat", alt_err, chat_id=chat_id)
else:
# Cannot leave a user chat this way
entity_type = type(entity).__name__
return log_and_format_error(
"leave_chat",
Exception(
f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."
),
chat_id=chat_id,
)
except Exception as e:
logger.exception(f"leave_chat failed (chat_id={chat_id})")
# Provide helpful hint for common errors
error_str = str(e).lower()
if "invalid" in error_str and "chat" in error_str:
return log_and_format_error(
"leave_chat",
Exception(
f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again."
),
chat_id=chat_id,
)
return log_and_format_error("leave_chat", e, chat_id=chat_id)
@mcp.tool()
async def get_participants(chat_id: int) -> str:
"""
List all participants in a group or channel.
Args:
chat_id: The group or channel ID.
"""
try:
participants = await client.get_participants(chat_id)
lines = [
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}"
for p in participants
]
return "\n".join(lines)
except Exception as e:
return log_and_format_error("get_participants", e, chat_id=chat_id)
@mcp.tool()
async def send_file(chat_id: int, file_path: str, caption: str = None) -> str:
"""
Send a file to a chat.
Args:
chat_id: The chat ID.
file_path: Absolute path to the file to send (must exist and be readable).
caption: Optional caption for the file.
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, caption=caption)
return f"File sent to chat {chat_id}."
except Exception as e:
return log_and_format_error(
"send_file", e, chat_id=chat_id, file_path=file_path, caption=caption
)
@mcp.tool()
async def download_media(chat_id: int, message_id: int, file_path: str) -> str:
"""
Download media from a message in a chat.
Args:
chat_id: The chat ID.
message_id: The message ID containing the media.
file_path: Absolute path to save the downloaded file (must be writable).
"""
try:
entity = await client.get_entity(chat_id)
msg = await client.get_messages(entity, ids=message_id)
if not msg or not msg.media:
return "No media found in the specified message."
# Check if directory is writable
dir_path = os.path.dirname(file_path) or "."
if not os.access(dir_path, os.W_OK):
return f"Directory not writable: {dir_path}"
await client.download_media(msg, file=file_path)
if not os.path.isfile(file_path):
return f"Download failed: file not created at {file_path}"
return f"Media downloaded to {file_path}."
except Exception as e:
return log_and_format_error(
"download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path
)
@mcp.tool()
async def update_profile(first_name: str = None, last_name: str = None, about: str = None) -> str:
"""
Update your profile information (name, bio).
"""
try:
await client(
functions.account.UpdateProfileRequest(
first_name=first_name, last_name=last_name, about=about
)
)
return "Profile updated."
except Exception as e:
return log_and_format_error(
"update_profile", e, first_name=first_name, last_name=last_name, about=about
)
@mcp.tool()
async def set_profile_photo(file_path: str) -> str:
"""
Set a new profile photo.
"""
try:
await client(
functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path))
)
return "Profile photo updated."
except Exception as e:
return log_and_format_error("set_profile_photo", e, file_path=file_path)
@mcp.tool()
async def delete_profile_photo() -> str:
"""
Delete your current profile photo.
"""
try:
photos = await client(
functions.photos.GetUserPhotosRequest(user_id="me", offset=0, max_id=0, limit=1)
)
if not photos.photos:
return "No profile photo to delete."
await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id]))
return "Profile photo deleted."
except Exception as e:
return log_and_format_error("delete_profile_photo", e)
@mcp.tool()
async def get_privacy_settings() -> str:
"""
Get your privacy settings for last seen status.
"""
try:
# Import needed types directly
from telethon.tl.types import InputPrivacyKeyStatusTimestamp
try:
settings = await client(
functions.account.GetPrivacyRequest(key=InputPrivacyKeyStatusTimestamp())
)
return str(settings)
except TypeError as e:
if "TLObject was expected" in str(e):
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
else:
raise
except Exception as e:
logger.exception("get_privacy_settings failed")
return log_and_format_error("get_privacy_settings", e)
@mcp.tool()
async def set_privacy_settings(
key: str, allow_users: list = None, disallow_users: list = None
) -> str:
"""
Set privacy settings (e.g., last seen, phone, etc.).
Args:
key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.)
allow_users: List of user IDs to allow
disallow_users: List of user IDs to disallow
"""
try:
# Import needed types
from telethon.tl.types import (
InputPrivacyKeyStatusTimestamp,
InputPrivacyKeyPhoneNumber,
InputPrivacyKeyProfilePhoto,
InputPrivacyValueAllowUsers,
InputPrivacyValueDisallowUsers,
InputPrivacyValueAllowAll,
InputPrivacyValueDisallowAll,
)
# Map the simplified keys to their corresponding input types
key_mapping = {
"status": InputPrivacyKeyStatusTimestamp,
"phone": InputPrivacyKeyPhoneNumber,
"profile_photo": InputPrivacyKeyProfilePhoto,
}
# Get the appropriate key class
if key not in key_mapping:
return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}"
privacy_key = key_mapping[key]()
# Prepare the rules
rules = []
# Process allow rules
if allow_users is None or len(allow_users) == 0:
# If no specific users to allow, allow everyone by default
rules.append(InputPrivacyValueAllowAll())
else:
# Convert user IDs to InputUser entities
try:
allow_entities = []
for user_id in allow_users:
try:
user = await client.get_entity(user_id)
allow_entities.append(user)
except Exception as user_err:
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
if allow_entities:
rules.append(InputPrivacyValueAllowUsers(users=allow_entities))
except Exception as allow_err:
logger.error(f"Error processing allowed users: {allow_err}")
return log_and_format_error("set_privacy_settings", allow_err, key=key)
# Process disallow rules
if disallow_users and len(disallow_users) > 0:
try:
disallow_entities = []
for user_id in disallow_users:
try:
user = await client.get_entity(user_id)
disallow_entities.append(user)
except Exception as user_err:
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
if disallow_entities:
rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities))
except Exception as disallow_err:
logger.error(f"Error processing disallowed users: {disallow_err}")
return log_and_format_error("set_privacy_settings", disallow_err, key=key)
# Apply the privacy settings
try:
result = await client(
functions.account.SetPrivacyRequest(key=privacy_key, rules=rules)
)
return f"Privacy settings for {key} updated successfully."
except TypeError as type_err:
if "TLObject was expected" in str(type_err):
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
else:
raise
except Exception as e:
logger.exception(f"set_privacy_settings failed (key={key})")
return log_and_format_error("set_privacy_settings", e, key=key)
@mcp.tool()
async def import_contacts(contacts: list) -> str:
"""
Import a list of contacts. Each contact should be a dict with phone, first_name, last_name.
"""
try:
input_contacts = [
functions.contacts.InputPhoneContact(
client_id=i,
phone=c["phone"],
first_name=c["first_name"],
last_name=c.get("last_name", ""),
)
for i, c in enumerate(contacts)
]
result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts))
return f"Imported {len(result.imported)} contacts."
except Exception as e:
return log_and_format_error("import_contacts", e, contacts=contacts)
@mcp.tool()
async def export_contacts() -> str:
"""
Export all contacts as a JSON string.
"""
try:
result = await client(functions.contacts.GetContactsRequest(hash=0))
users = result.users
return json.dumps([format_entity(u) for u in users], indent=2)
except Exception as e:
return log_and_format_error("export_contacts", e)
@mcp.tool()
async def get_blocked_users() -> str:
"""
Get a list of blocked users.
"""
try:
result = await client(functions.contacts.GetBlockedRequest(offset=0, limit=100))
return json.dumps([format_entity(u) for u in result.users], indent=2)
except Exception as e:
return log_and_format_error("get_blocked_users", e)
@mcp.tool()
async def create_channel(title: str, about: str = "", megagroup: bool = False) -> str:
"""
Create a new channel or supergroup.
"""
try:
result = await client(
functions.channels.CreateChannelRequest(title=title, about=about, megagroup=megagroup)
)
return f"Channel '{title}' created with ID: {result.chats[0].id}"
except Exception as e:
return log_and_format_error(
"create_channel", e, title=title, about=about, megagroup=megagroup
)
@mcp.tool()
async def edit_chat_title(chat_id: int, title: str) -> str:
"""
Edit the title of a chat, group, or channel.
"""
try:
entity = await client.get_entity(chat_id)
if isinstance(entity, Channel):
await client(functions.channels.EditTitleRequest(channel=entity, title=title))
elif isinstance(entity, Chat):
await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title))
else:
return f"Cannot edit title for this entity type ({type(entity)})."
return f"Chat {chat_id} title updated to '{title}'."
except Exception as e:
logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')")
return log_and_format_error("edit_chat_title", e, chat_id=chat_id, title=title)
@mcp.tool()
async def edit_chat_photo(chat_id: int, file_path: str) -> str:
"""
Edit the photo of a chat, group, or channel. Requires a file path to an image.
"""
try:
if not os.path.isfile(file_path):
return f"Photo file not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"Photo file not readable: {file_path}"
entity = await client.get_entity(chat_id)
uploaded_file = await client.upload_file(file_path)
if isinstance(entity, Channel):
# For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto
input_photo = InputChatUploadedPhoto(file=uploaded_file)
await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo))
elif isinstance(entity, Chat):
# For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto
input_photo = InputChatUploadedPhoto(file=uploaded_file)
await client(
functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo)
)
else:
return f"Cannot edit photo for this entity type ({type(entity)})."
return f"Chat {chat_id} photo updated."
except Exception as e:
logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')")
return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
async def delete_chat_photo(chat_id: int) -> str:
"""
Delete the photo of a chat, group, or channel.
"""
try:
entity = await client.get_entity(chat_id)
if isinstance(entity, Channel):
# Use InputChatPhotoEmpty for channels/supergroups
await client(
functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty())
)
elif isinstance(entity, Chat):
# Use None (or InputChatPhotoEmpty) for basic groups
await client(
functions.messages.EditChatPhotoRequest(
chat_id=chat_id, photo=InputChatPhotoEmpty()
)
)
else:
return f"Cannot delete photo for this entity type ({type(entity)})."
return f"Chat {chat_id} photo deleted."
except Exception as e:
logger.exception(f"delete_chat_photo failed (chat_id={chat_id})")
return log_and_format_error("delete_chat_photo", e, chat_id=chat_id)
@mcp.tool()
async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str:
"""
Promote a user to admin in a group/channel.
Args:
group_id: ID of the group/channel
user_id: User ID to promote
rights: Admin rights to give (optional)
"""
try:
chat = await client.get_entity(group_id)
user = await client.get_entity(user_id)
# Set default admin rights if not provided
if not rights:
rights = {
"change_info": True,
"post_messages": True,
"edit_messages": True,
"delete_messages": True,
"ban_users": True,
"invite_users": True,
"pin_messages": True,
"add_admins": False,
"anonymous": False,
"manage_call": True,
"other": True,
}
admin_rights = ChatAdminRights(
change_info=rights.get("change_info", True),
post_messages=rights.get("post_messages", True),
edit_messages=rights.get("edit_messages", True),
delete_messages=rights.get("delete_messages", True),
ban_users=rights.get("ban_users", True),
invite_users=rights.get("invite_users", True),
pin_messages=rights.get("pin_messages", True),
add_admins=rights.get("add_admins", False),
anonymous=rights.get("anonymous", False),
manage_call=rights.get("manage_call", True),
other=rights.get("other", True),
)
try:
result = await client(
functions.channels.EditAdminRequest(
channel=chat, user_id=user, admin_rights=admin_rights, rank="Admin"
)
)
return f"Successfully promoted user {user_id} to admin in {chat.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
except Exception as e:
logger.error(
f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})",
exc_info=True,
)
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
@mcp.tool()
async def demote_admin(group_id: int, user_id: int) -> str:
"""
Demote a user from admin in a group/channel.
Args:
group_id: ID of the group/channel
user_id: User ID to demote
"""
try:
chat = await client.get_entity(group_id)
user = await client.get_entity(user_id)
# Create empty admin rights (regular user)
admin_rights = ChatAdminRights(
change_info=False,
post_messages=False,
edit_messages=False,
delete_messages=False,
ban_users=False,
invite_users=False,
pin_messages=False,
add_admins=False,
anonymous=False,
manage_call=False,
other=False,
)
try:
result = await client(
functions.channels.EditAdminRequest(
channel=chat, user_id=user, admin_rights=admin_rights, rank=""
)
)
return f"Successfully demoted user {user_id} from admin in {chat.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
except Exception as e:
logger.error(
f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})",
exc_info=True,
)
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
@mcp.tool()
async def ban_user(chat_id: int, user_id: int) -> str:
"""
Ban a user from a group or channel.
Args:
chat_id: ID of the group/channel
user_id: User ID to ban
"""
try:
chat = await client.get_entity(chat_id)
user = await client.get_entity(user_id)
# Create banned rights (all restrictions enabled)
banned_rights = ChatBannedRights(
until_date=None, # Ban forever
view_messages=True,
send_messages=True,
send_media=True,
send_stickers=True,
send_gifs=True,
send_games=True,
send_inline=True,
embed_links=True,
send_polls=True,
change_info=True,
invite_users=True,
pin_messages=True,
)
try:
await client(
functions.channels.EditBannedRequest(
channel=chat, participant=user, banned_rights=banned_rights
)
)
return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})."
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
except Exception as e:
logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})")
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
@mcp.tool()
async def unban_user(chat_id: int, user_id: int) -> str:
"""
Unban a user from a group or channel.
Args:
chat_id: ID of the group/channel
user_id: User ID to unban
"""
try:
chat = await client.get_entity(chat_id)
user = await client.get_entity(user_id)
# Create unbanned rights (no restrictions)
unbanned_rights = ChatBannedRights(
until_date=None,
view_messages=False,
send_messages=False,
send_media=False,
send_stickers=False,
send_gifs=False,
send_games=False,
send_inline=False,
embed_links=False,
send_polls=False,
change_info=False,
invite_users=False,
pin_messages=False,
)
try:
await client(
functions.channels.EditBannedRequest(
channel=chat, participant=user, banned_rights=unbanned_rights
)
)
return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})."
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
except Exception as e:
logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})")
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
@mcp.tool()
async def get_admins(chat_id: int) -> str:
"""
Get all admins in a group or channel.
"""
try:
# Fix: Use the correct filter type ChannelParticipantsAdmins
participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins())
lines = [
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip()
for p in participants
]
return "\n".join(lines) if lines else "No admins found."
except Exception as e:
logger.exception(f"get_admins failed (chat_id={chat_id})")
return log_and_format_error("get_admins", e, chat_id=chat_id)
@mcp.tool()
async def get_banned_users(chat_id: int) -> str:
"""
Get all banned users in a group or channel.
"""
try:
# Fix: Use the correct filter type ChannelParticipantsKicked
participants = await client.get_participants(
chat_id, filter=ChannelParticipantsKicked(q="")
)
lines = [
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip()
for p in participants
]
return "\n".join(lines) if lines else "No banned users found."
except Exception as e:
logger.exception(f"get_banned_users failed (chat_id={chat_id})")
return log_and_format_error("get_banned_users", e, chat_id=chat_id)
@mcp.tool()
async def get_invite_link(chat_id: int) -> str:
"""
Get the invite link for a group or channel.
"""
try:
entity = await client.get_entity(chat_id)
# Try using ExportChatInviteRequest first
try:
from telethon.tl import functions
result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
return result.link
except AttributeError:
# If the function doesn't exist in the current Telethon version
logger.warning("ExportChatInviteRequest not available, using alternative method")
except Exception as e1:
# If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}")
# Alternative approach using client.export_chat_invite_link
try:
invite_link = await client.export_chat_invite_link(entity)
return invite_link
except Exception as e2:
logger.warning(f"export_chat_invite_link failed: {e2}")
# Last resort: Try directly fetching chat info
try:
if isinstance(entity, (Chat, Channel)):
full_chat = await client(functions.messages.GetFullChatRequest(chat_id=entity.id))
if hasattr(full_chat, "full_chat") and hasattr(full_chat.full_chat, "invite_link"):
return full_chat.full_chat.invite_link or "No invite link available."
except Exception as e3:
logger.warning(f"GetFullChatRequest failed: {e3}")
return "Could not retrieve invite link for this chat."
except Exception as e:
logger.exception(f"get_invite_link failed (chat_id={chat_id})")
return log_and_format_error("get_invite_link", e, chat_id=chat_id)
@mcp.tool()
async def join_chat_by_link(link: str) -> str:
"""
Join a chat by invite link.
"""
try:
# Extract the hash from the invite link
if "/" in link:
hash_part = link.split("/")[-1]
if hash_part.startswith("+"):
hash_part = hash_part[1:] # Remove the '+' if present
else:
hash_part = link
# Try checking the invite before joining
try:
from telethon.errors import (
InviteHashExpiredError,
InviteHashInvalidError,
UserAlreadyParticipantError,
ChatAdminRequiredError,
UsersTooMuchError,
)
# Try to check invite info first (will often fail if not a member)
invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part))
if hasattr(invite_info, "chat") and invite_info.chat:
# If we got chat info, we're already a member
chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
return f"You are already a member of this chat: {chat_title}"
except Exception as check_err:
# This often fails if not a member - just continue
pass
# Join the chat using the hash
try:
result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part))
if result and hasattr(result, "chats") and result.chats:
chat_title = getattr(result.chats[0], "title", "Unknown Chat")
return f"Successfully joined chat: {chat_title}"
return f"Joined chat via invite hash."
except Exception as join_err:
err_str = str(join_err).lower()
if "expired" in err_str:
return "The invite hash has expired and is no longer valid."
elif "invalid" in err_str:
return "The invite hash is invalid or malformed."
elif "already" in err_str and "participant" in err_str:
return "You are already a member of this chat."
elif "admin" in err_str:
return "Cannot join this chat - requires admin approval."
elif "too much" in err_str or "too many" in err_str:
return "Cannot join this chat - it has reached maximum number of participants."
else:
raise # Re-raise to be caught by the outer exception handler
except Exception as e:
logger.exception(f"join_chat_by_link failed (link={link})")
return log_and_format_error("join_chat_by_link", e, link=link)
@mcp.tool()
async def export_chat_invite(chat_id: int) -> str:
"""
Export a chat invite link.
"""
try:
entity = await client.get_entity(chat_id)
# Try using ExportChatInviteRequest first
try:
from telethon.tl import functions
result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
return result.link
except AttributeError:
# If the function doesn't exist in the current Telethon version
logger.warning("ExportChatInviteRequest not available, using alternative method")
except Exception as e1:
# If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}")
# Alternative approach using client.export_chat_invite_link
try:
invite_link = await client.export_chat_invite_link(entity)
return invite_link
except Exception as e2:
logger.warning(f"export_chat_invite_link failed: {e2}")
return log_and_format_error("export_chat_invite", e2, chat_id=chat_id)
except Exception as e:
logger.exception(f"export_chat_invite failed (chat_id={chat_id})")
return log_and_format_error("export_chat_invite", e, chat_id=chat_id)
@mcp.tool()
async def import_chat_invite(hash: str) -> str:
"""
Import a chat invite by hash.
"""
try:
# Remove any prefixes like '+' if present
if hash.startswith("+"):
hash = hash[1:]
# Try checking the invite before joining
try:
from telethon.errors import (
InviteHashExpiredError,
InviteHashInvalidError,
UserAlreadyParticipantError,
ChatAdminRequiredError,
UsersTooMuchError,
)
# Try to check invite info first (will often fail if not a member)
invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash))
if hasattr(invite_info, "chat") and invite_info.chat:
# If we got chat info, we're already a member
chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
return f"You are already a member of this chat: {chat_title}"
except Exception as check_err:
# This often fails if not a member - just continue
pass
# Join the chat using the hash
try:
result = await client(functions.messages.ImportChatInviteRequest(hash=hash))
if result and hasattr(result, "chats") and result.chats:
chat_title = getattr(result.chats[0], "title", "Unknown Chat")
return f"Successfully joined chat: {chat_title}"
return f"Joined chat via invite hash."
except Exception as join_err:
err_str = str(join_err).lower()
if "expired" in err_str:
return "The invite hash has expired and is no longer valid."
elif "invalid" in err_str:
return "The invite hash is invalid or malformed."
elif "already" in err_str and "participant" in err_str:
return "You are already a member of this chat."
elif "admin" in err_str:
return "Cannot join this chat - requires admin approval."
elif "too much" in err_str or "too many" in err_str:
return "Cannot join this chat - it has reached maximum number of participants."
else:
raise # Re-raise to be caught by the outer exception handler
except Exception as e:
logger.exception(f"import_chat_invite failed (hash={hash})")
return log_and_format_error("import_chat_invite", e, hash=hash)
@mcp.tool()
async def send_voice(chat_id: int, file_path: str) -> str:
"""
Send a voice message to a chat. File must be an OGG/OPUS voice note.
Args:
chat_id: The chat ID.
file_path: Absolute path to the OGG/OPUS file.
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
mime, _ = mimetypes.guess_type(file_path)
if not (
mime
and (
mime == "audio/ogg"
or file_path.lower().endswith(".ogg")
or file_path.lower().endswith(".opus")
)
):
return "Voice file must be .ogg or .opus format."
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, voice_note=True)
return f"Voice message sent to chat {chat_id}."
except Exception as e:
return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
async def forward_message(from_chat_id: int, message_id: int, to_chat_id: int) -> str:
"""
Forward a message from one chat to another.
"""
try:
from_entity = await client.get_entity(from_chat_id)
to_entity = await client.get_entity(to_chat_id)
await client.forward_messages(to_entity, message_id, from_entity)
return f"Message {message_id} forwarded from {from_chat_id} to {to_chat_id}."
except Exception as e:
return log_and_format_error(
"forward_message",
e,
from_chat_id=from_chat_id,
message_id=message_id,
to_chat_id=to_chat_id,
)
@mcp.tool()
async def edit_message(chat_id: int, message_id: int, new_text: str) -> str:
"""
Edit a message you sent.
"""
try:
entity = await client.get_entity(chat_id)
await client.edit_message(entity, message_id, new_text)
return f"Message {message_id} edited."
except Exception as e:
return log_and_format_error(
"edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text
)
@mcp.tool()
async def delete_message(chat_id: int, message_id: int) -> str:
"""
Delete a message by ID.
"""
try:
entity = await client.get_entity(chat_id)
await client.delete_messages(entity, message_id)
return f"Message {message_id} deleted."
except Exception as e:
return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def pin_message(chat_id: int, message_id: int) -> str:
"""
Pin a message in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.pin_message(entity, message_id)
return f"Message {message_id} pinned in chat {chat_id}."
except Exception as e:
return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def unpin_message(chat_id: int, message_id: int) -> str:
"""
Unpin a message in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.unpin_message(entity, message_id)
return f"Message {message_id} unpinned in chat {chat_id}."
except Exception as e:
return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def mark_as_read(chat_id: int) -> str:
"""
Mark all messages as read in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.send_read_acknowledge(entity)
return f"Marked all messages as read in chat {chat_id}."
except Exception as e:
return log_and_format_error("mark_as_read", e, chat_id=chat_id)
@mcp.tool()
async def reply_to_message(chat_id: int, message_id: int, text: str) -> str:
"""
Reply to a specific message in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.send_message(entity, text, reply_to=message_id)
return f"Replied to message {message_id} in chat {chat_id}."
except Exception as e:
return log_and_format_error(
"reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text
)
@mcp.tool()
async def get_media_info(chat_id: int, message_id: int) -> str:
"""
Get info about media in a message.
Args:
chat_id: The chat ID.
message_id: The message ID.
"""
try:
entity = await client.get_entity(chat_id)
msg = await client.get_messages(entity, ids=message_id)
if not msg or not msg.media:
return "No media found in the specified message."
return str(msg.media)
except Exception as e:
return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def search_public_chats(query: str) -> str:
"""
Search for public chats, channels, or bots by username or title.
"""
try:
result = await client(functions.contacts.SearchRequest(q=query, limit=20))
return json.dumps([format_entity(u) for u in result.users], indent=2)
except Exception as e:
return log_and_format_error("search_public_chats", e, query=query)
@mcp.tool()
async def search_messages(chat_id: int, query: str, limit: int = 20) -> str:
"""
Search for messages in a chat by text.
"""
try:
entity = await client.get_entity(chat_id)
messages = await client.get_messages(entity, limit=limit, search=query)
return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages])
except Exception as e:
return log_and_format_error(
"search_messages", e, chat_id=chat_id, query=query, limit=limit
)
@mcp.tool()
async def resolve_username(username: str) -> str:
"""
Resolve a username to a user or chat ID.
"""
try:
result = await client(functions.contacts.ResolveUsernameRequest(username=username))
return str(result)
except Exception as e:
return log_and_format_error("resolve_username", e, username=username)
@mcp.tool()
async def mute_chat(chat_id: int) -> str:
"""
Mute notifications for a chat.
"""
try:
from telethon.tl.types import InputPeerNotifySettings
peer = await client.get_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer, settings=InputPeerNotifySettings(mute_until=2**31 - 1)
)
)
return f"Chat {chat_id} muted."
except (ImportError, AttributeError) as type_err:
try:
# Alternative approach directly using raw API
peer = await client.get_input_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer,
settings={
"mute_until": 2**31 - 1, # Far future
"show_previews": False,
"silent": True,
},
)
)
return f"Chat {chat_id} muted (using alternative method)."
except Exception as alt_e:
logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})")
return log_and_format_error("mute_chat", alt_e, chat_id=chat_id)
except Exception as e:
logger.exception(f"mute_chat failed (chat_id={chat_id})")
return log_and_format_error("mute_chat", e, chat_id=chat_id)
@mcp.tool()
async def unmute_chat(chat_id: int) -> str:
"""
Unmute notifications for a chat.
"""
try:
from telethon.tl.types import InputPeerNotifySettings
peer = await client.get_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer, settings=InputPeerNotifySettings(mute_until=0)
)
)
return f"Chat {chat_id} unmuted."
except (ImportError, AttributeError) as type_err:
try:
# Alternative approach directly using raw API
peer = await client.get_input_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer,
settings={
"mute_until": 0, # Unmute (current time)
"show_previews": True,
"silent": False,
},
)
)
return f"Chat {chat_id} unmuted (using alternative method)."
except Exception as alt_e:
logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})")
return log_and_format_error("unmute_chat", alt_e, chat_id=chat_id)
except Exception as e:
logger.exception(f"unmute_chat failed (chat_id={chat_id})")
return log_and_format_error("unmute_chat", e, chat_id=chat_id)
@mcp.tool()
async def archive_chat(chat_id: int) -> str:
"""
Archive a chat.
"""
try:
await client(
functions.messages.ToggleDialogPinRequest(
peer=await client.get_entity(chat_id), pinned=True
)
)
return f"Chat {chat_id} archived."
except Exception as e:
return log_and_format_error("archive_chat", e, chat_id=chat_id)
@mcp.tool()
async def unarchive_chat(chat_id: int) -> str:
"""
Unarchive a chat.
"""
try:
await client(
functions.messages.ToggleDialogPinRequest(
peer=await client.get_entity(chat_id), pinned=False
)
)
return f"Chat {chat_id} unarchived."
except Exception as e:
return log_and_format_error("unarchive_chat", e, chat_id=chat_id)
@mcp.tool()
async def get_sticker_sets() -> str:
"""
Get all sticker sets.
"""
try:
result = await client(functions.messages.GetAllStickersRequest(hash=0))
return json.dumps([s.title for s in result.sets], indent=2)
except Exception as e:
return log_and_format_error("get_sticker_sets", e)
@mcp.tool()
async def send_sticker(chat_id: int, file_path: str) -> str:
"""
Send a sticker to a chat. File must be a valid .webp sticker file.
Args:
chat_id: The chat ID.
file_path: Absolute path to the .webp sticker file.
"""
try:
if not os.path.isfile(file_path):
return f"Sticker file not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"Sticker file is not readable: {file_path}"
if not file_path.lower().endswith(".webp"):
return "Sticker file must be a .webp file."
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, force_document=False)
return f"Sticker sent to chat {chat_id}."
except Exception as e:
return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
async def get_gif_search(query: str, limit: int = 10) -> str:
"""
Search for GIFs by query. Returns a list of Telegram document IDs (not file paths).
Args:
query: Search term for GIFs.
limit: Max number of GIFs to return.
"""
try:
# Try approach 1: SearchGifsRequest
try:
result = await client(
functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit)
)
if not result.gifs:
return "[]"
return json.dumps(
[g.document.id for g in result.gifs], indent=2, default=json_serializer
)
except (AttributeError, ImportError):
# Fallback approach: Use SearchRequest with GIF filter
try:
from telethon.tl.types import InputMessagesFilterGif
result = await client(
functions.messages.SearchRequest(
peer="gif",
q=query,
filter=InputMessagesFilterGif(),
min_date=None,
max_date=None,
offset_id=0,
add_offset=0,
limit=limit,
max_id=0,
min_id=0,
hash=0,
)
)
if not result or not hasattr(result, "messages") or not result.messages:
return "[]"
# Extract document IDs from any messages with media
gif_ids = []
for msg in result.messages:
if hasattr(msg, "media") and msg.media and hasattr(msg.media, "document"):
gif_ids.append(msg.media.document.id)
return json.dumps(gif_ids, default=json_serializer)
except Exception as inner_e:
# Last resort: Try to fetch from a public bot
return f"Could not search GIFs using available methods: {inner_e}"
except Exception as e:
logger.exception(f"get_gif_search failed (query={query}, limit={limit})")
return log_and_format_error("get_gif_search", e, query=query, limit=limit)
@mcp.tool()
async def send_gif(chat_id: int, gif_id: int) -> str:
"""
Send a GIF to a chat by Telegram GIF document ID (not a file path).
Args:
chat_id: The chat ID.
gif_id: Telegram document ID for the GIF (from get_gif_search).
"""
try:
if not isinstance(gif_id, int):
return "gif_id must be a Telegram document ID (integer), not a file path. Use get_gif_search to find IDs."
entity = await client.get_entity(chat_id)
await client.send_file(entity, gif_id)
return f"GIF sent to chat {chat_id}."
except Exception as e:
return log_and_format_error("send_gif", e, chat_id=chat_id, gif_id=gif_id)
@mcp.tool()
async def get_bot_info(bot_username: str) -> str:
"""
Get information about a bot by username.
"""
try:
entity = await client.get_entity(bot_username)
if not entity:
return f"Bot with username {bot_username} not found."
result = await client(functions.users.GetFullUserRequest(id=entity))
# Create a more structured, serializable response
if hasattr(result, "to_dict"):
# Use custom serializer to handle non-serializable types
return json.dumps(result.to_dict(), indent=2, default=json_serializer)
else:
# Fallback if to_dict is not available
info = {
"bot_info": {
"id": entity.id,
"username": entity.username,
"first_name": entity.first_name,
"last_name": getattr(entity, "last_name", ""),
"is_bot": getattr(entity, "bot", False),
"verified": getattr(entity, "verified", False),
}
}
if hasattr(result, "full_user") and hasattr(result.full_user, "about"):
info["bot_info"]["about"] = result.full_user.about
return json.dumps(info, indent=2)
except Exception as e:
logger.exception(f"get_bot_info failed (bot_username={bot_username})")
return log_and_format_error("get_bot_info", e, bot_username=bot_username)
@mcp.tool()
async def set_bot_commands(bot_username: str, commands: list) -> str:
"""
Set bot commands for a bot you own.
Note: This function can only be used if the Telegram client is a bot account.
Regular user accounts cannot set bot commands.
Args:
bot_username: The username of the bot to set commands for.
commands: List of command dictionaries with 'command' and 'description' keys.
"""
try:
# First check if the current client is a bot
me = await client.get_me()
if not getattr(me, "bot", False):
return "Error: This function can only be used by bot accounts. Your current Telegram account is a regular user account, not a bot."
# Import required types
from telethon.tl.types import BotCommand, BotCommandScopeDefault
from telethon.tl.functions.bots import SetBotCommandsRequest
# Create BotCommand objects from the command dictionaries
bot_commands = [
BotCommand(command=c["command"], description=c["description"]) for c in commands
]
# Get the bot entity
bot = await client.get_entity(bot_username)
# Set the commands with proper scope
await client(
SetBotCommandsRequest(
scope=BotCommandScopeDefault(),
lang_code="en", # Default language code
commands=bot_commands,
)
)
return f"Bot commands set for {bot_username}."
except ImportError as ie:
logger.exception(f"set_bot_commands failed - ImportError: {ie}")
return log_and_format_error("set_bot_commands", ie)
except Exception as e:
logger.exception(f"set_bot_commands failed (bot_username={bot_username})")
return log_and_format_error("set_bot_commands", e, bot_username=bot_username)
@mcp.tool()
async def get_history(chat_id: int, limit: int = 100) -> str:
"""
Get full chat history (up to limit).
"""
try:
entity = await client.get_entity(chat_id)
messages = await client.get_messages(entity, limit=limit)
return "\n".join([f"ID: {m.id} | {m.date} | {m.message}" for m in messages])
except Exception as e:
return log_and_format_error("get_history", e, chat_id=chat_id, limit=limit)
@mcp.tool()
async def get_user_photos(user_id: int, limit: int = 10) -> str:
"""
Get profile photos of a user.
"""
try:
user = await client.get_entity(user_id)
photos = await client(
functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit)
)
return json.dumps([p.id for p in photos.photos], indent=2)
except Exception as e:
return log_and_format_error("get_user_photos", e, user_id=user_id, limit=limit)
@mcp.tool()
async def get_user_status(user_id: int) -> str:
"""
Get the online status of a user.
"""
try:
user = await client.get_entity(user_id)
return str(user.status)
except Exception as e:
return log_and_format_error("get_user_status", e, user_id=user_id)
@mcp.tool()
async def get_recent_actions(chat_id: int) -> str:
"""
Get recent admin actions (admin log) in a group or channel.
"""
try:
result = await client(
functions.channels.GetAdminLogRequest(
channel=chat_id, q="", events_filter=None, admins=[], max_id=0, min_id=0, limit=20
)
)
if not result or not result.events:
return "No recent admin actions found."
# Use the custom serializer to handle datetime objects
return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer)
except Exception as e:
logger.exception(f"get_recent_actions failed (chat_id={chat_id})")
return log_and_format_error("get_recent_actions", e, chat_id=chat_id)
@mcp.tool()
async def get_pinned_messages(chat_id: int) -> str:
"""
Get all pinned messages in a chat.
"""
try:
entity = await client.get_entity(chat_id)
# Use correct filter based on Telethon version
try:
# Try newer Telethon approach
from telethon.tl.types import InputMessagesFilterPinned
messages = await client.get_messages(entity, filter=InputMessagesFilterPinned())
except (ImportError, AttributeError):
# Fallback - try without filter and manually filter pinned
all_messages = await client.get_messages(entity, limit=50)
messages = [m for m in all_messages if getattr(m, "pinned", False)]
if not messages:
return "No pinned messages found in this chat."
return "\n".join(
[f"ID: {m.id} | {m.date} | {m.message or '[Media/No text]'}" for m in messages]
)
except Exception as e:
logger.exception(f"get_pinned_messages failed (chat_id={chat_id})")
return log_and_format_error("get_pinned_messages", e, chat_id=chat_id)
if __name__ == "__main__":
nest_asyncio.apply()
async def main() -> None:
try:
# Start the Telethon client non-interactively
print("Starting Telegram client...")
await client.start()
print("Telegram client started. Running MCP server...")
# Use the asynchronous entrypoint instead of mcp.run()
await mcp.run_stdio_async()
except Exception as e:
print(f"Error starting client: {e}", file=sys.stderr)
if isinstance(e, sqlite3.OperationalError) and "database is locked" in str(e):
print(
"Database lock detected. Please ensure no other instances are running.",
file=sys.stderr,
)
sys.exit(1)
asyncio.run(main())