import os
import sys
import json
import time
import asyncio
import sqlite3
import logging
import mimetypes
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Dict, Optional, Union, Any
# Third-party libraries
import nest_asyncio
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from telethon import TelegramClient, functions, utils
from telethon.sessions import StringSession
from telethon.tl.types import (
User,
Chat,
Channel,
ChatAdminRights,
ChatBannedRights,
ChannelParticipantsKicked,
ChannelParticipantsAdmins,
InputChatPhoto,
InputChatUploadedPhoto,
InputChatPhotoEmpty,
InputPeerUser,
InputPeerChat,
InputPeerChannel,
)
import telethon.errors.rpcerrorlist
def json_serializer(obj):
"""Helper function to convert non-serializable objects for JSON serialization."""
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, bytes):
return obj.decode("utf-8", errors="replace")
# Add other non-serializable types as needed
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
load_dotenv()
TELEGRAM_API_ID = int(os.getenv("TELEGRAM_API_ID"))
TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")
TELEGRAM_SESSION_NAME = os.getenv("TELEGRAM_SESSION_NAME")
# Check if a string session exists in environment, otherwise use file-based session
SESSION_STRING = os.getenv("TELEGRAM_SESSION_STRING")
mcp = FastMCP("telegram")
if SESSION_STRING:
# Use the string session if available
client = TelegramClient(StringSession(SESSION_STRING), TELEGRAM_API_ID, TELEGRAM_API_HASH)
else:
# Use file-based session
client = TelegramClient(TELEGRAM_SESSION_NAME, TELEGRAM_API_ID, TELEGRAM_API_HASH)
# Setup robust logging with both file and console output
logger = logging.getLogger("telegram_mcp")
logger.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
# Create console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.ERROR) # Set to ERROR for production, INFO for debugging
# Create file handler with absolute path
script_dir = os.path.dirname(os.path.abspath(__file__))
log_file_path = os.path.join(script_dir, "mcp_errors.log")
try:
file_handler = logging.FileHandler(log_file_path, mode="a") # Append mode
file_handler.setLevel(logging.ERROR)
# Create formatter and add to handlers
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s - %(message)s - %(filename)s:%(lineno)d"
)
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)
# Add handlers to logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.info(f"Logging initialized to {log_file_path}")
except Exception as log_error:
print(f"WARNING: Error setting up log file: {log_error}")
# Fallback to console-only logging
logger.addHandler(console_handler)
logger.error(f"Failed to set up log file handler: {log_error}")
# Error code prefix mapping for better error tracing
class ErrorCategory(str, Enum):
CHAT = "CHAT"
MSG = "MSG"
CONTACT = "CONTACT"
GROUP = "GROUP"
MEDIA = "MEDIA"
PROFILE = "PROFILE"
AUTH = "AUTH"
ADMIN = "ADMIN"
def log_and_format_error(
function_name: str,
error: Exception,
prefix: Optional[ErrorCategory] = None,
**kwargs,
) -> str:
"""
Centralized error handling function.
Logs an error and returns a formatted, user-friendly message.
Args:
function_name: Name of the function where the error occurred.
error: The exception that was raised.
prefix: Error code prefix (e.g., "CHAT", "MSG").
If None, it will be derived from the function_name.
**kwargs: Additional context parameters to include in the log.
Returns:
A user-friendly error message with an error code.
"""
# Generate a consistent error code
if prefix is None:
# Try to derive prefix from function name
for category in ErrorCategory:
if category.name.lower() in function_name.lower():
prefix = category
break
prefix_str = prefix.value if prefix else "GEN"
error_code = f"{prefix_str}-ERR-{abs(hash(function_name)) % 1000:03d}"
# Format the additional context parameters
context = ", ".join(f"{k}={v}" for k, v in kwargs.items())
# Log the full technical error
logger.exception(f"{function_name} failed ({context}): {error}")
# Return a user-friendly message
return f"An error occurred (code: {error_code}). " f"Check mcp_errors.log for details."
def format_entity(entity) -> Dict[str, Any]:
"""Helper function to format entity information consistently."""
result = {"id": entity.id}
if hasattr(entity, "title"):
result["name"] = entity.title
result["type"] = "group" if isinstance(entity, Chat) else "channel"
elif hasattr(entity, "first_name"):
name_parts = []
if entity.first_name:
name_parts.append(entity.first_name)
if hasattr(entity, "last_name") and entity.last_name:
name_parts.append(entity.last_name)
result["name"] = " ".join(name_parts)
result["type"] = "user"
if hasattr(entity, "username") and entity.username:
result["username"] = entity.username
if hasattr(entity, "phone") and entity.phone:
result["phone"] = entity.phone
return result
def format_message(message) -> Dict[str, Any]:
"""Helper function to format message information consistently."""
result = {
"id": message.id,
"date": message.date.isoformat(),
"text": message.message or "",
}
if message.from_id:
result["from_id"] = utils.get_peer_id(message.from_id)
if message.media:
result["has_media"] = True
result["media_type"] = type(message.media).__name__
return result
def get_sender_name(message) -> str:
"""Helper function to get sender name from a message."""
if not message.sender:
return "Unknown"
# Check for group/channel title first
if hasattr(message.sender, "title") and message.sender.title:
return message.sender.title
elif hasattr(message.sender, "first_name"):
# User sender
first_name = getattr(message.sender, "first_name", "") or ""
last_name = getattr(message.sender, "last_name", "") or ""
full_name = f"{first_name} {last_name}".strip()
return full_name if full_name else "Unknown"
else:
return "Unknown"
@mcp.tool()
async def get_chats(page: int = 1, page_size: int = 20) -> str:
"""
Get a paginated list of chats.
Args:
page: Page number (1-indexed).
page_size: Number of chats per page.
"""
try:
dialogs = await client.get_dialogs()
start = (page - 1) * page_size
end = start + page_size
if start >= len(dialogs):
return "Page out of range."
chats = dialogs[start:end]
lines = []
for dialog in chats:
entity = dialog.entity
chat_id = entity.id
title = getattr(entity, "title", None) or getattr(entity, "first_name", "Unknown")
lines.append(f"Chat ID: {chat_id}, Title: {title}")
return "\n".join(lines)
except Exception as e:
return log_and_format_error("get_chats", e)
@mcp.tool()
async def get_messages(chat_id: int, page: int = 1, page_size: int = 20) -> str:
"""
Get paginated messages from a specific chat.
Args:
chat_id: The ID of the chat.
page: Page number (1-indexed).
page_size: Number of messages per page.
"""
try:
entity = await client.get_entity(chat_id)
offset = (page - 1) * page_size
messages = await client.get_messages(entity, limit=page_size, add_offset=offset)
if not messages:
return "No messages found for this page."
lines = []
for msg in messages:
sender_name = get_sender_name(msg)
reply_info = ""
if msg.reply_to and msg.reply_to.reply_to_msg_id:
reply_info = f" | reply to {msg.reply_to.reply_to_msg_id}"
lines.append(
f"ID: {msg.id} | {sender_name} | Date: {msg.date}{reply_info} | Message: {msg.message}"
)
return "\n".join(lines)
except Exception as e:
return log_and_format_error(
"get_messages", e, chat_id=chat_id, page=page, page_size=page_size
)
@mcp.tool()
async def send_message(chat_id: int, message: str) -> str:
"""
Send a message to a specific chat.
Args:
chat_id: The ID of the chat.
message: The message content to send.
"""
try:
entity = await client.get_entity(chat_id)
await client.send_message(entity, message)
return "Message sent successfully."
except Exception as e:
return log_and_format_error("send_message", e, chat_id=chat_id)
@mcp.tool()
async def list_contacts() -> str:
"""
List all contacts in your Telegram account.
"""
try:
result = await client(functions.contacts.GetContactsRequest(hash=0))
users = result.users
if not users:
return "No contacts found."
lines = []
for user in users:
name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip()
username = getattr(user, "username", "")
phone = getattr(user, "phone", "")
contact_info = f"ID: {user.id}, Name: {name}"
if username:
contact_info += f", Username: @{username}"
if phone:
contact_info += f", Phone: {phone}"
lines.append(contact_info)
return "\n".join(lines)
except Exception as e:
return log_and_format_error("list_contacts", e)
@mcp.tool()
async def search_contacts(query: str) -> str:
"""
Search for contacts by name, username, or phone number using Telethon's SearchRequest.
Args:
query: The search term to look for in contact names, usernames, or phone numbers.
"""
try:
result = await client(functions.contacts.SearchRequest(q=query, limit=50))
users = result.users
if not users:
return f"No contacts found matching '{query}'."
lines = []
for user in users:
name = f"{getattr(user, 'first_name', '')} {getattr(user, 'last_name', '')}".strip()
username = getattr(user, "username", "")
phone = getattr(user, "phone", "")
contact_info = f"ID: {user.id}, Name: {name}"
if username:
contact_info += f", Username: @{username}"
if phone:
contact_info += f", Phone: {phone}"
lines.append(contact_info)
return "\n".join(lines)
except Exception as e:
return log_and_format_error("search_contacts", e, query=query)
@mcp.tool()
async def get_contact_ids() -> str:
"""
Get all contact IDs in your Telegram account.
"""
try:
result = await client(functions.contacts.GetContactIDsRequest(hash=0))
if not result:
return "No contact IDs found."
return "Contact IDs: " + ", ".join(str(cid) for cid in result)
except Exception as e:
return log_and_format_error("get_contact_ids", e)
@mcp.tool()
async def list_messages(
chat_id: int,
limit: int = 20,
search_query: str = None,
from_date: str = None,
to_date: str = None,
) -> str:
"""
Retrieve messages with optional filters.
Args:
chat_id: The ID of the chat to get messages from.
limit: Maximum number of messages to retrieve.
search_query: Filter messages containing this text.
from_date: Filter messages starting from this date (format: YYYY-MM-DD).
to_date: Filter messages until this date (format: YYYY-MM-DD).
"""
try:
entity = await client.get_entity(chat_id)
# Parse date filters if provided
from_date_obj = None
to_date_obj = None
if from_date:
try:
from_date_obj = datetime.strptime(from_date, "%Y-%m-%d")
# Make it timezone aware by adding UTC timezone info
# Use datetime.timezone.utc for Python 3.9+ or import timezone directly for 3.13+
try:
# For Python 3.9+
from_date_obj = from_date_obj.replace(tzinfo=datetime.timezone.utc)
except AttributeError:
# For Python 3.13+
from datetime import timezone
from_date_obj = from_date_obj.replace(tzinfo=timezone.utc)
except ValueError:
return f"Invalid from_date format. Use YYYY-MM-DD."
if to_date:
try:
to_date_obj = datetime.strptime(to_date, "%Y-%m-%d")
# Set to end of day and make timezone aware
to_date_obj = to_date_obj + timedelta(days=1, microseconds=-1)
# Add timezone info
try:
to_date_obj = to_date_obj.replace(tzinfo=datetime.timezone.utc)
except AttributeError:
from datetime import timezone
to_date_obj = to_date_obj.replace(tzinfo=timezone.utc)
except ValueError:
return f"Invalid to_date format. Use YYYY-MM-DD."
# Prepare filter parameters
params = {}
if search_query:
params["search"] = search_query
messages = await client.get_messages(entity, limit=limit, **params)
# Apply date filters (Telethon doesn't support date filtering in get_messages directly)
if from_date_obj or to_date_obj:
filtered_messages = []
for msg in messages:
if from_date_obj and msg.date < from_date_obj:
continue
if to_date_obj and msg.date > to_date_obj:
continue
filtered_messages.append(msg)
messages = filtered_messages
if not messages:
return "No messages found matching the criteria."
lines = []
for msg in messages:
sender_name = get_sender_name(msg)
message_text = msg.message or "[Media/No text]"
reply_info = ""
if msg.reply_to and msg.reply_to.reply_to_msg_id:
reply_info = f" | reply to {msg.reply_to.reply_to_msg_id}"
lines.append(
f"ID: {msg.id} | {sender_name} | Date: {msg.date}{reply_info} | Message: {message_text}"
)
return "\n".join(lines)
except Exception as e:
return log_and_format_error("list_messages", e, chat_id=chat_id)
@mcp.tool()
async def list_chats(chat_type: str = None, limit: int = 20) -> str:
"""
List available chats with metadata.
Args:
chat_type: Filter by chat type ('user', 'group', 'channel', or None for all)
limit: Maximum number of chats to retrieve.
"""
try:
dialogs = await client.get_dialogs(limit=limit)
results = []
for dialog in dialogs:
entity = dialog.entity
# Filter by type if requested
current_type = None
if isinstance(entity, User):
current_type = "user"
elif isinstance(entity, Chat):
current_type = "group"
elif isinstance(entity, Channel):
if getattr(entity, "broadcast", False):
current_type = "channel"
else:
current_type = "group" # Supergroup
if chat_type and current_type != chat_type.lower():
continue
# Format chat info
chat_info = f"Chat ID: {entity.id}"
if hasattr(entity, "title"):
chat_info += f", Title: {entity.title}"
elif hasattr(entity, "first_name"):
name = f"{entity.first_name}"
if hasattr(entity, "last_name") and entity.last_name:
name += f" {entity.last_name}"
chat_info += f", Name: {name}"
chat_info += f", Type: {current_type}"
if hasattr(entity, "username") and entity.username:
chat_info += f", Username: @{entity.username}"
# Add unread count if available
if hasattr(dialog, "unread_count") and dialog.unread_count > 0:
chat_info += f", Unread: {dialog.unread_count}"
results.append(chat_info)
if not results:
return f"No chats found matching the criteria."
return "\n".join(results)
except Exception as e:
return log_and_format_error("list_chats", e, chat_type=chat_type, limit=limit)
@mcp.tool()
async def get_chat(chat_id: int) -> str:
"""
Get detailed information about a specific chat.
Args:
chat_id: The ID of the chat.
"""
try:
entity = await client.get_entity(chat_id)
result = []
result.append(f"ID: {entity.id}")
is_channel = isinstance(entity, Channel)
is_chat = isinstance(entity, Chat)
is_user = isinstance(entity, User)
if hasattr(entity, "title"):
result.append(f"Title: {entity.title}")
chat_type = (
"Channel" if is_channel and getattr(entity, "broadcast", False) else "Group"
)
if is_channel and getattr(entity, "megagroup", False):
chat_type = "Supergroup"
elif is_chat:
chat_type = "Group (Basic)"
result.append(f"Type: {chat_type}")
if hasattr(entity, "username") and entity.username:
result.append(f"Username: @{entity.username}")
# Fetch participants count reliably
try:
participants_count = (await client.get_participants(entity, limit=0)).total
result.append(f"Participants: {participants_count}")
except Exception as pe:
result.append(f"Participants: Error fetching ({pe})")
elif is_user:
name = f"{entity.first_name}"
if entity.last_name:
name += f" {entity.last_name}"
result.append(f"Name: {name}")
result.append(f"Type: User")
if entity.username:
result.append(f"Username: @{entity.username}")
if entity.phone:
result.append(f"Phone: {entity.phone}")
result.append(f"Bot: {'Yes' if entity.bot else 'No'}")
result.append(f"Verified: {'Yes' if entity.verified else 'No'}")
# Get last activity if it's a dialog
try:
# Using get_dialogs might be slow if there are many dialogs
# Alternative: Get entity again via get_dialogs if needed for unread count
dialog = await client.get_dialogs(limit=1, offset_id=0, offset_peer=entity)
if dialog:
dialog = dialog[0]
result.append(f"Unread Messages: {dialog.unread_count}")
if dialog.message:
last_msg = dialog.message
sender_name = "Unknown"
if last_msg.sender:
sender_name = getattr(last_msg.sender, "first_name", "") or getattr(
last_msg.sender, "title", "Unknown"
)
if hasattr(last_msg.sender, "last_name") and last_msg.sender.last_name:
sender_name += f" {last_msg.sender.last_name}"
sender_name = sender_name.strip() or "Unknown"
result.append(f"Last Message: From {sender_name} at {last_msg.date}")
result.append(f"Message: {last_msg.message or '[Media/No text]'}")
except Exception as diag_ex:
logger.warning(f"Could not get dialog info for {chat_id}: {diag_ex}")
pass
return "\n".join(result)
except Exception as e:
return log_and_format_error("get_chat", e, chat_id=chat_id)
@mcp.tool()
async def get_direct_chat_by_contact(contact_query: str) -> str:
"""
Find a direct chat with a specific contact by name, username, or phone.
Args:
contact_query: Name, username, or phone number to search for.
"""
try:
# Fetch all contacts using the correct Telethon method
result = await client(functions.contacts.GetContactsRequest(hash=0))
contacts = result.users
found_contacts = []
for contact in contacts:
if not contact:
continue
name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
username = getattr(contact, "username", "")
phone = getattr(contact, "phone", "")
if (
contact_query.lower() in name.lower()
or (username and contact_query.lower() in username.lower())
or (phone and contact_query in phone)
):
found_contacts.append(contact)
if not found_contacts:
return f"No contacts found matching '{contact_query}'."
# If we found contacts, look for direct chats with them
results = []
dialogs = await client.get_dialogs()
for contact in found_contacts:
contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
for dialog in dialogs:
if isinstance(dialog.entity, User) and dialog.entity.id == contact.id:
chat_info = f"Chat ID: {dialog.entity.id}, Contact: {contact_name}"
if getattr(contact, "username", ""):
chat_info += f", Username: @{contact.username}"
if dialog.unread_count:
chat_info += f", Unread: {dialog.unread_count}"
results.append(chat_info)
break
if not results:
found_names = ", ".join(
[f"{c.first_name} {c.last_name}".strip() for c in found_contacts]
)
return f"Found contacts: {found_names}, but no direct chats were found with them."
return "\n".join(results)
except Exception as e:
return log_and_format_error("get_direct_chat_by_contact", e, contact_query=contact_query)
@mcp.tool()
async def get_contact_chats(contact_id: int) -> str:
"""
List all chats involving a specific contact.
Args:
contact_id: The ID of the contact.
"""
try:
# Get contact info
contact = await client.get_entity(contact_id)
if not isinstance(contact, User):
return f"ID {contact_id} is not a user/contact."
contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
# Find direct chat
direct_chat = None
dialogs = await client.get_dialogs()
results = []
# Look for direct chat
for dialog in dialogs:
if isinstance(dialog.entity, User) and dialog.entity.id == contact_id:
chat_info = f"Direct Chat ID: {dialog.entity.id}, Type: Private"
if dialog.unread_count:
chat_info += f", Unread: {dialog.unread_count}"
results.append(chat_info)
break
# Look for common groups/channels
common_chats = []
try:
common = await client.get_common_chats(contact)
for chat in common:
chat_type = "Channel" if getattr(chat, "broadcast", False) else "Group"
chat_info = f"Chat ID: {chat.id}, Title: {chat.title}, Type: {chat_type}"
results.append(chat_info)
except:
results.append("Could not retrieve common groups.")
if not results:
return f"No chats found with {contact_name} (ID: {contact_id})."
return f"Chats with {contact_name} (ID: {contact_id}):\n" + "\n".join(results)
except Exception as e:
return log_and_format_error("get_contact_chats", e, contact_id=contact_id)
@mcp.tool()
async def get_last_interaction(contact_id: int) -> str:
"""
Get the most recent message with a contact.
Args:
contact_id: The ID of the contact.
"""
try:
# Get contact info
contact = await client.get_entity(contact_id)
if not isinstance(contact, User):
return f"ID {contact_id} is not a user/contact."
contact_name = (
f"{getattr(contact, 'first_name', '')} {getattr(contact, 'last_name', '')}".strip()
)
# Get the last few messages
messages = await client.get_messages(contact, limit=5)
if not messages:
return f"No messages found with {contact_name} (ID: {contact_id})."
results = [f"Last interactions with {contact_name} (ID: {contact_id}):"]
for msg in messages:
sender = "You" if msg.out else contact_name
message_text = msg.message or "[Media/No text]"
results.append(f"Date: {msg.date}, From: {sender}, Message: {message_text}")
return "\n".join(results)
except Exception as e:
return log_and_format_error("get_last_interaction", e, contact_id=contact_id)
@mcp.tool()
async def get_message_context(chat_id: int, message_id: int, context_size: int = 3) -> str:
"""
Retrieve context around a specific message.
Args:
chat_id: The ID of the chat.
message_id: The ID of the central message.
context_size: Number of messages before and after to include.
"""
try:
chat = await client.get_entity(chat_id)
# Get messages around the specified message
messages_before = await client.get_messages(chat, limit=context_size, max_id=message_id)
central_message = await client.get_messages(chat, ids=message_id)
# Fix: get_messages(ids=...) returns a single Message, not a list
if central_message is not None and not isinstance(central_message, list):
central_message = [central_message]
elif central_message is None:
central_message = []
messages_after = await client.get_messages(
chat, limit=context_size, min_id=message_id, reverse=True
)
if not central_message:
return f"Message with ID {message_id} not found in chat {chat_id}."
# Combine messages in chronological order
all_messages = list(messages_before) + list(central_message) + list(messages_after)
all_messages.sort(key=lambda m: m.id)
results = [f"Context for message {message_id} in chat {chat_id}:"]
for msg in all_messages:
sender_name = get_sender_name(msg)
highlight = " [THIS MESSAGE]" if msg.id == message_id else ""
# Check if this message is a reply and get the replied message
reply_content = ""
if msg.reply_to and msg.reply_to.reply_to_msg_id:
try:
replied_msg = await client.get_messages(chat, ids=msg.reply_to.reply_to_msg_id)
if replied_msg:
replied_sender = "Unknown"
if replied_msg.sender:
replied_sender = getattr(
replied_msg.sender, "first_name", ""
) or getattr(replied_msg.sender, "title", "Unknown")
reply_content = f" | reply to {msg.reply_to.reply_to_msg_id}\n → Replied message: [{replied_sender}] {replied_msg.message or '[Media/No text]'}"
except Exception:
reply_content = (
f" | reply to {msg.reply_to.reply_to_msg_id} (original message not found)"
)
results.append(
f"ID: {msg.id} | {sender_name} | {msg.date}{highlight}{reply_content}\n{msg.message or '[Media/No text]'}\n"
)
return "\n".join(results)
except Exception as e:
return log_and_format_error(
"get_message_context",
e,
chat_id=chat_id,
message_id=message_id,
context_size=context_size,
)
@mcp.tool()
async def add_contact(phone: str, first_name: str, last_name: str = "") -> str:
"""
Add a new contact to your Telegram account.
Args:
phone: The phone number of the contact (with country code).
first_name: The contact's first name.
last_name: The contact's last name (optional).
"""
try:
# Try to import the required types first
from telethon.tl.types import InputPhoneContact
result = await client(
functions.contacts.ImportContactsRequest(
contacts=[
InputPhoneContact(
client_id=0, phone=phone, first_name=first_name, last_name=last_name
)
]
)
)
if result.imported:
return f"Contact {first_name} {last_name} added successfully."
else:
return f"Contact not added. Response: {str(result)}"
except (ImportError, AttributeError) as type_err:
# Try alternative approach using raw API
try:
result = await client(
functions.contacts.ImportContactsRequest(
contacts=[
{
"client_id": 0,
"phone": phone,
"first_name": first_name,
"last_name": last_name,
}
]
)
)
if hasattr(result, "imported") and result.imported:
return f"Contact {first_name} {last_name} added successfully (alt method)."
else:
return f"Contact not added. Alternative method response: {str(result)}"
except Exception as alt_e:
logger.exception(f"add_contact (alt method) failed (phone={phone})")
return log_and_format_error("add_contact", alt_e, phone=phone)
except Exception as e:
logger.exception(f"add_contact failed (phone={phone})")
return log_and_format_error("add_contact", e, phone=phone)
@mcp.tool()
async def delete_contact(user_id: int) -> str:
"""
Delete a contact by user ID.
Args:
user_id: The Telegram user ID of the contact to delete.
"""
try:
user = await client.get_entity(user_id)
await client(functions.contacts.DeleteContactsRequest(id=[user]))
return f"Contact with user ID {user_id} deleted."
except Exception as e:
return log_and_format_error("delete_contact", e, user_id=user_id)
@mcp.tool()
async def block_user(user_id: int) -> str:
"""
Block a user by user ID.
Args:
user_id: The Telegram user ID to block.
"""
try:
user = await client.get_entity(user_id)
await client(functions.contacts.BlockRequest(id=user))
return f"User {user_id} blocked."
except Exception as e:
return log_and_format_error("block_user", e, user_id=user_id)
@mcp.tool()
async def unblock_user(user_id: int) -> str:
"""
Unblock a user by user ID.
Args:
user_id: The Telegram user ID to unblock.
"""
try:
user = await client.get_entity(user_id)
await client(functions.contacts.UnblockRequest(id=user))
return f"User {user_id} unblocked."
except Exception as e:
return log_and_format_error("unblock_user", e, user_id=user_id)
@mcp.tool()
async def get_me() -> str:
"""
Get your own user information.
"""
try:
me = await client.get_me()
return json.dumps(format_entity(me), indent=2)
except Exception as e:
return log_and_format_error("get_me", e)
@mcp.tool()
async def create_group(title: str, user_ids: list) -> str:
"""
Create a new group or supergroup and add users.
Args:
title: Title for the new group
user_ids: List of user IDs to add to the group
"""
try:
# Convert user IDs to entities
users = []
for user_id in user_ids:
try:
user = await client.get_entity(user_id)
users.append(user)
except Exception as e:
logger.error(f"Failed to get entity for user ID {user_id}: {e}")
return f"Error: Could not find user with ID {user_id}"
if not users:
return "Error: No valid users provided"
# Create the group with the users
try:
# Create a new chat with selected users
result = await client(functions.messages.CreateChatRequest(users=users, title=title))
# Check what type of response we got
if hasattr(result, "chats") and result.chats:
created_chat = result.chats[0]
return f"Group created with ID: {created_chat.id}"
elif hasattr(result, "chat") and result.chat:
return f"Group created with ID: {result.chat.id}"
elif hasattr(result, "chat_id"):
return f"Group created with ID: {result.chat_id}"
else:
# If we can't determine the chat ID directly from the result
# Try to find it in recent dialogs
await asyncio.sleep(1) # Give Telegram a moment to register the new group
dialogs = await client.get_dialogs(limit=5) # Get recent dialogs
for dialog in dialogs:
if dialog.title == title:
return f"Group created with ID: {dialog.id}"
# If we still can't find it, at least return success
return f"Group created successfully. Please check your recent chats for '{title}'."
except Exception as create_err:
if "PEER_FLOOD" in str(create_err):
return "Error: Cannot create group due to Telegram limits. Try again later."
else:
raise # Let the outer exception handler catch it
except Exception as e:
logger.exception(f"create_group failed (title={title}, user_ids={user_ids})")
return log_and_format_error("create_group", e, title=title, user_ids=user_ids)
@mcp.tool()
async def invite_to_group(group_id: int, user_ids: list) -> str:
"""
Invite users to a group or channel.
Args:
group_id: The ID of the group/channel.
user_ids: List of user IDs to invite.
"""
try:
entity = await client.get_entity(group_id)
users_to_add = []
for user_id in user_ids:
try:
user = await client.get_entity(user_id)
users_to_add.append(user)
except ValueError as e:
return f"Error: User with ID {user_id} could not be found. {e}"
try:
result = await client(
functions.channels.InviteToChannelRequest(channel=entity, users=users_to_add)
)
invited_count = 0
if hasattr(result, "users") and result.users:
invited_count = len(result.users)
elif hasattr(result, "count"):
invited_count = result.count
return f"Successfully invited {invited_count} users to {entity.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot invite users who are not mutual contacts. Please ensure the users are in your contacts and have added you back."
except telethon.errors.rpcerrorlist.UserPrivacyRestrictedError:
return (
"Error: One or more users have privacy settings that prevent you from adding them."
)
except Exception as e:
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
except Exception as e:
logger.error(
f"telegram_mcp invite_to_group failed (group_id={group_id}, user_ids={user_ids})",
exc_info=True,
)
return log_and_format_error("invite_to_group", e, group_id=group_id, user_ids=user_ids)
@mcp.tool()
async def leave_chat(chat_id: int) -> str:
"""
Leave a group or channel by chat ID.
Args:
chat_id: The chat ID to leave.
"""
try:
entity = await client.get_entity(chat_id)
# Check the entity type carefully
if isinstance(entity, Channel):
# Handle both channels and supergroups (which are also channels in Telegram)
try:
await client(functions.channels.LeaveChannelRequest(channel=entity))
chat_name = getattr(entity, "title", str(chat_id))
return f"Left channel/supergroup {chat_name} (ID: {chat_id})."
except Exception as chan_err:
return log_and_format_error("leave_chat", chan_err, chat_id=chat_id)
elif isinstance(entity, Chat):
# Traditional basic groups (not supergroups)
try:
# First try with InputPeerUser
me = await client.get_me(input_peer=True)
await client(
functions.messages.DeleteChatUserRequest(
chat_id=entity.id, user_id=me # Use the entity ID directly
)
)
chat_name = getattr(entity, "title", str(chat_id))
return f"Left basic group {chat_name} (ID: {chat_id})."
except Exception as chat_err:
# If the above fails, try the second approach
logger.warning(
f"First leave attempt failed: {chat_err}, trying alternative method"
)
try:
# Alternative approach - sometimes this works better
me_full = await client.get_me()
await client(
functions.messages.DeleteChatUserRequest(
chat_id=entity.id, user_id=me_full.id
)
)
chat_name = getattr(entity, "title", str(chat_id))
return f"Left basic group {chat_name} (ID: {chat_id})."
except Exception as alt_err:
return log_and_format_error("leave_chat", alt_err, chat_id=chat_id)
else:
# Cannot leave a user chat this way
entity_type = type(entity).__name__
return log_and_format_error(
"leave_chat",
Exception(
f"Cannot leave chat ID {chat_id} of type {entity_type}. This function is for groups and channels only."
),
chat_id=chat_id,
)
except Exception as e:
logger.exception(f"leave_chat failed (chat_id={chat_id})")
# Provide helpful hint for common errors
error_str = str(e).lower()
if "invalid" in error_str and "chat" in error_str:
return log_and_format_error(
"leave_chat",
Exception(
f"Error leaving chat: This appears to be a channel/supergroup. Please check the chat ID and try again."
),
chat_id=chat_id,
)
return log_and_format_error("leave_chat", e, chat_id=chat_id)
@mcp.tool()
async def get_participants(chat_id: int) -> str:
"""
List all participants in a group or channel.
Args:
chat_id: The group or channel ID.
"""
try:
participants = await client.get_participants(chat_id)
lines = [
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}"
for p in participants
]
return "\n".join(lines)
except Exception as e:
return log_and_format_error("get_participants", e, chat_id=chat_id)
@mcp.tool()
async def send_file(chat_id: int, file_path: str, caption: str = None) -> str:
"""
Send a file to a chat.
Args:
chat_id: The chat ID.
file_path: Absolute path to the file to send (must exist and be readable).
caption: Optional caption for the file.
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, caption=caption)
return f"File sent to chat {chat_id}."
except Exception as e:
return log_and_format_error(
"send_file", e, chat_id=chat_id, file_path=file_path, caption=caption
)
@mcp.tool()
async def download_media(chat_id: int, message_id: int, file_path: str) -> str:
"""
Download media from a message in a chat.
Args:
chat_id: The chat ID.
message_id: The message ID containing the media.
file_path: Absolute path to save the downloaded file (must be writable).
"""
try:
entity = await client.get_entity(chat_id)
msg = await client.get_messages(entity, ids=message_id)
if not msg or not msg.media:
return "No media found in the specified message."
# Check if directory is writable
dir_path = os.path.dirname(file_path) or "."
if not os.access(dir_path, os.W_OK):
return f"Directory not writable: {dir_path}"
await client.download_media(msg, file=file_path)
if not os.path.isfile(file_path):
return f"Download failed: file not created at {file_path}"
return f"Media downloaded to {file_path}."
except Exception as e:
return log_and_format_error(
"download_media", e, chat_id=chat_id, message_id=message_id, file_path=file_path
)
@mcp.tool()
async def update_profile(first_name: str = None, last_name: str = None, about: str = None) -> str:
"""
Update your profile information (name, bio).
"""
try:
await client(
functions.account.UpdateProfileRequest(
first_name=first_name, last_name=last_name, about=about
)
)
return "Profile updated."
except Exception as e:
return log_and_format_error(
"update_profile", e, first_name=first_name, last_name=last_name, about=about
)
@mcp.tool()
async def set_profile_photo(file_path: str) -> str:
"""
Set a new profile photo.
"""
try:
await client(
functions.photos.UploadProfilePhotoRequest(file=await client.upload_file(file_path))
)
return "Profile photo updated."
except Exception as e:
return log_and_format_error("set_profile_photo", e, file_path=file_path)
@mcp.tool()
async def delete_profile_photo() -> str:
"""
Delete your current profile photo.
"""
try:
photos = await client(
functions.photos.GetUserPhotosRequest(user_id="me", offset=0, max_id=0, limit=1)
)
if not photos.photos:
return "No profile photo to delete."
await client(functions.photos.DeletePhotosRequest(id=[photos.photos[0].id]))
return "Profile photo deleted."
except Exception as e:
return log_and_format_error("delete_profile_photo", e)
@mcp.tool()
async def get_privacy_settings() -> str:
"""
Get your privacy settings for last seen status.
"""
try:
# Import needed types directly
from telethon.tl.types import InputPrivacyKeyStatusTimestamp
try:
settings = await client(
functions.account.GetPrivacyRequest(key=InputPrivacyKeyStatusTimestamp())
)
return str(settings)
except TypeError as e:
if "TLObject was expected" in str(e):
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
else:
raise
except Exception as e:
logger.exception("get_privacy_settings failed")
return log_and_format_error("get_privacy_settings", e)
@mcp.tool()
async def set_privacy_settings(
key: str, allow_users: list = None, disallow_users: list = None
) -> str:
"""
Set privacy settings (e.g., last seen, phone, etc.).
Args:
key: The privacy setting to modify ('status' for last seen, 'phone', 'profile_photo', etc.)
allow_users: List of user IDs to allow
disallow_users: List of user IDs to disallow
"""
try:
# Import needed types
from telethon.tl.types import (
InputPrivacyKeyStatusTimestamp,
InputPrivacyKeyPhoneNumber,
InputPrivacyKeyProfilePhoto,
InputPrivacyValueAllowUsers,
InputPrivacyValueDisallowUsers,
InputPrivacyValueAllowAll,
InputPrivacyValueDisallowAll,
)
# Map the simplified keys to their corresponding input types
key_mapping = {
"status": InputPrivacyKeyStatusTimestamp,
"phone": InputPrivacyKeyPhoneNumber,
"profile_photo": InputPrivacyKeyProfilePhoto,
}
# Get the appropriate key class
if key not in key_mapping:
return f"Error: Unsupported privacy key '{key}'. Supported keys: {', '.join(key_mapping.keys())}"
privacy_key = key_mapping[key]()
# Prepare the rules
rules = []
# Process allow rules
if allow_users is None or len(allow_users) == 0:
# If no specific users to allow, allow everyone by default
rules.append(InputPrivacyValueAllowAll())
else:
# Convert user IDs to InputUser entities
try:
allow_entities = []
for user_id in allow_users:
try:
user = await client.get_entity(user_id)
allow_entities.append(user)
except Exception as user_err:
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
if allow_entities:
rules.append(InputPrivacyValueAllowUsers(users=allow_entities))
except Exception as allow_err:
logger.error(f"Error processing allowed users: {allow_err}")
return log_and_format_error("set_privacy_settings", allow_err, key=key)
# Process disallow rules
if disallow_users and len(disallow_users) > 0:
try:
disallow_entities = []
for user_id in disallow_users:
try:
user = await client.get_entity(user_id)
disallow_entities.append(user)
except Exception as user_err:
logger.warning(f"Could not get entity for user ID {user_id}: {user_err}")
if disallow_entities:
rules.append(InputPrivacyValueDisallowUsers(users=disallow_entities))
except Exception as disallow_err:
logger.error(f"Error processing disallowed users: {disallow_err}")
return log_and_format_error("set_privacy_settings", disallow_err, key=key)
# Apply the privacy settings
try:
result = await client(
functions.account.SetPrivacyRequest(key=privacy_key, rules=rules)
)
return f"Privacy settings for {key} updated successfully."
except TypeError as type_err:
if "TLObject was expected" in str(type_err):
return "Error: Privacy settings API call failed due to type mismatch. This is likely a version compatibility issue with Telethon."
else:
raise
except Exception as e:
logger.exception(f"set_privacy_settings failed (key={key})")
return log_and_format_error("set_privacy_settings", e, key=key)
@mcp.tool()
async def import_contacts(contacts: list) -> str:
"""
Import a list of contacts. Each contact should be a dict with phone, first_name, last_name.
"""
try:
input_contacts = [
functions.contacts.InputPhoneContact(
client_id=i,
phone=c["phone"],
first_name=c["first_name"],
last_name=c.get("last_name", ""),
)
for i, c in enumerate(contacts)
]
result = await client(functions.contacts.ImportContactsRequest(contacts=input_contacts))
return f"Imported {len(result.imported)} contacts."
except Exception as e:
return log_and_format_error("import_contacts", e, contacts=contacts)
@mcp.tool()
async def export_contacts() -> str:
"""
Export all contacts as a JSON string.
"""
try:
result = await client(functions.contacts.GetContactsRequest(hash=0))
users = result.users
return json.dumps([format_entity(u) for u in users], indent=2)
except Exception as e:
return log_and_format_error("export_contacts", e)
@mcp.tool()
async def get_blocked_users() -> str:
"""
Get a list of blocked users.
"""
try:
result = await client(functions.contacts.GetBlockedRequest(offset=0, limit=100))
return json.dumps([format_entity(u) for u in result.users], indent=2)
except Exception as e:
return log_and_format_error("get_blocked_users", e)
@mcp.tool()
async def create_channel(title: str, about: str = "", megagroup: bool = False) -> str:
"""
Create a new channel or supergroup.
"""
try:
result = await client(
functions.channels.CreateChannelRequest(title=title, about=about, megagroup=megagroup)
)
return f"Channel '{title}' created with ID: {result.chats[0].id}"
except Exception as e:
return log_and_format_error(
"create_channel", e, title=title, about=about, megagroup=megagroup
)
@mcp.tool()
async def edit_chat_title(chat_id: int, title: str) -> str:
"""
Edit the title of a chat, group, or channel.
"""
try:
entity = await client.get_entity(chat_id)
if isinstance(entity, Channel):
await client(functions.channels.EditTitleRequest(channel=entity, title=title))
elif isinstance(entity, Chat):
await client(functions.messages.EditChatTitleRequest(chat_id=chat_id, title=title))
else:
return f"Cannot edit title for this entity type ({type(entity)})."
return f"Chat {chat_id} title updated to '{title}'."
except Exception as e:
logger.exception(f"edit_chat_title failed (chat_id={chat_id}, title='{title}')")
return log_and_format_error("edit_chat_title", e, chat_id=chat_id, title=title)
@mcp.tool()
async def edit_chat_photo(chat_id: int, file_path: str) -> str:
"""
Edit the photo of a chat, group, or channel. Requires a file path to an image.
"""
try:
if not os.path.isfile(file_path):
return f"Photo file not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"Photo file not readable: {file_path}"
entity = await client.get_entity(chat_id)
uploaded_file = await client.upload_file(file_path)
if isinstance(entity, Channel):
# For channels/supergroups, use EditPhotoRequest with InputChatUploadedPhoto
input_photo = InputChatUploadedPhoto(file=uploaded_file)
await client(functions.channels.EditPhotoRequest(channel=entity, photo=input_photo))
elif isinstance(entity, Chat):
# For basic groups, use EditChatPhotoRequest with InputChatUploadedPhoto
input_photo = InputChatUploadedPhoto(file=uploaded_file)
await client(
functions.messages.EditChatPhotoRequest(chat_id=chat_id, photo=input_photo)
)
else:
return f"Cannot edit photo for this entity type ({type(entity)})."
return f"Chat {chat_id} photo updated."
except Exception as e:
logger.exception(f"edit_chat_photo failed (chat_id={chat_id}, file_path='{file_path}')")
return log_and_format_error("edit_chat_photo", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
async def delete_chat_photo(chat_id: int) -> str:
"""
Delete the photo of a chat, group, or channel.
"""
try:
entity = await client.get_entity(chat_id)
if isinstance(entity, Channel):
# Use InputChatPhotoEmpty for channels/supergroups
await client(
functions.channels.EditPhotoRequest(channel=entity, photo=InputChatPhotoEmpty())
)
elif isinstance(entity, Chat):
# Use None (or InputChatPhotoEmpty) for basic groups
await client(
functions.messages.EditChatPhotoRequest(
chat_id=chat_id, photo=InputChatPhotoEmpty()
)
)
else:
return f"Cannot delete photo for this entity type ({type(entity)})."
return f"Chat {chat_id} photo deleted."
except Exception as e:
logger.exception(f"delete_chat_photo failed (chat_id={chat_id})")
return log_and_format_error("delete_chat_photo", e, chat_id=chat_id)
@mcp.tool()
async def promote_admin(group_id: int, user_id: int, rights: dict = None) -> str:
"""
Promote a user to admin in a group/channel.
Args:
group_id: ID of the group/channel
user_id: User ID to promote
rights: Admin rights to give (optional)
"""
try:
chat = await client.get_entity(group_id)
user = await client.get_entity(user_id)
# Set default admin rights if not provided
if not rights:
rights = {
"change_info": True,
"post_messages": True,
"edit_messages": True,
"delete_messages": True,
"ban_users": True,
"invite_users": True,
"pin_messages": True,
"add_admins": False,
"anonymous": False,
"manage_call": True,
"other": True,
}
admin_rights = ChatAdminRights(
change_info=rights.get("change_info", True),
post_messages=rights.get("post_messages", True),
edit_messages=rights.get("edit_messages", True),
delete_messages=rights.get("delete_messages", True),
ban_users=rights.get("ban_users", True),
invite_users=rights.get("invite_users", True),
pin_messages=rights.get("pin_messages", True),
add_admins=rights.get("add_admins", False),
anonymous=rights.get("anonymous", False),
manage_call=rights.get("manage_call", True),
other=rights.get("other", True),
)
try:
result = await client(
functions.channels.EditAdminRequest(
channel=chat, user_id=user, admin_rights=admin_rights, rank="Admin"
)
)
return f"Successfully promoted user {user_id} to admin in {chat.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot promote users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
except Exception as e:
logger.error(
f"telegram_mcp promote_admin failed (group_id={group_id}, user_id={user_id})",
exc_info=True,
)
return log_and_format_error("promote_admin", e, group_id=group_id, user_id=user_id)
@mcp.tool()
async def demote_admin(group_id: int, user_id: int) -> str:
"""
Demote a user from admin in a group/channel.
Args:
group_id: ID of the group/channel
user_id: User ID to demote
"""
try:
chat = await client.get_entity(group_id)
user = await client.get_entity(user_id)
# Create empty admin rights (regular user)
admin_rights = ChatAdminRights(
change_info=False,
post_messages=False,
edit_messages=False,
delete_messages=False,
ban_users=False,
invite_users=False,
pin_messages=False,
add_admins=False,
anonymous=False,
manage_call=False,
other=False,
)
try:
result = await client(
functions.channels.EditAdminRequest(
channel=chat, user_id=user, admin_rights=admin_rights, rank=""
)
)
return f"Successfully demoted user {user_id} from admin in {chat.title}"
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify admin status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
except Exception as e:
logger.error(
f"telegram_mcp demote_admin failed (group_id={group_id}, user_id={user_id})",
exc_info=True,
)
return log_and_format_error("demote_admin", e, group_id=group_id, user_id=user_id)
@mcp.tool()
async def ban_user(chat_id: int, user_id: int) -> str:
"""
Ban a user from a group or channel.
Args:
chat_id: ID of the group/channel
user_id: User ID to ban
"""
try:
chat = await client.get_entity(chat_id)
user = await client.get_entity(user_id)
# Create banned rights (all restrictions enabled)
banned_rights = ChatBannedRights(
until_date=None, # Ban forever
view_messages=True,
send_messages=True,
send_media=True,
send_stickers=True,
send_gifs=True,
send_games=True,
send_inline=True,
embed_links=True,
send_polls=True,
change_info=True,
invite_users=True,
pin_messages=True,
)
try:
await client(
functions.channels.EditBannedRequest(
channel=chat, participant=user, banned_rights=banned_rights
)
)
return f"User {user_id} banned from chat {chat.title} (ID: {chat_id})."
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot ban users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
except Exception as e:
logger.exception(f"ban_user failed (chat_id={chat_id}, user_id={user_id})")
return log_and_format_error("ban_user", e, chat_id=chat_id, user_id=user_id)
@mcp.tool()
async def unban_user(chat_id: int, user_id: int) -> str:
"""
Unban a user from a group or channel.
Args:
chat_id: ID of the group/channel
user_id: User ID to unban
"""
try:
chat = await client.get_entity(chat_id)
user = await client.get_entity(user_id)
# Create unbanned rights (no restrictions)
unbanned_rights = ChatBannedRights(
until_date=None,
view_messages=False,
send_messages=False,
send_media=False,
send_stickers=False,
send_gifs=False,
send_games=False,
send_inline=False,
embed_links=False,
send_polls=False,
change_info=False,
invite_users=False,
pin_messages=False,
)
try:
await client(
functions.channels.EditBannedRequest(
channel=chat, participant=user, banned_rights=unbanned_rights
)
)
return f"User {user_id} unbanned from chat {chat.title} (ID: {chat_id})."
except telethon.errors.rpcerrorlist.UserNotMutualContactError:
return "Error: Cannot modify status of users who are not mutual contacts. Please ensure the user is in your contacts and has added you back."
except Exception as e:
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
except Exception as e:
logger.exception(f"unban_user failed (chat_id={chat_id}, user_id={user_id})")
return log_and_format_error("unban_user", e, chat_id=chat_id, user_id=user_id)
@mcp.tool()
async def get_admins(chat_id: int) -> str:
"""
Get all admins in a group or channel.
"""
try:
# Fix: Use the correct filter type ChannelParticipantsAdmins
participants = await client.get_participants(chat_id, filter=ChannelParticipantsAdmins())
lines = [
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip()
for p in participants
]
return "\n".join(lines) if lines else "No admins found."
except Exception as e:
logger.exception(f"get_admins failed (chat_id={chat_id})")
return log_and_format_error("get_admins", e, chat_id=chat_id)
@mcp.tool()
async def get_banned_users(chat_id: int) -> str:
"""
Get all banned users in a group or channel.
"""
try:
# Fix: Use the correct filter type ChannelParticipantsKicked
participants = await client.get_participants(
chat_id, filter=ChannelParticipantsKicked(q="")
)
lines = [
f"ID: {p.id}, Name: {getattr(p, 'first_name', '')} {getattr(p, 'last_name', '')}".strip()
for p in participants
]
return "\n".join(lines) if lines else "No banned users found."
except Exception as e:
logger.exception(f"get_banned_users failed (chat_id={chat_id})")
return log_and_format_error("get_banned_users", e, chat_id=chat_id)
@mcp.tool()
async def get_invite_link(chat_id: int) -> str:
"""
Get the invite link for a group or channel.
"""
try:
entity = await client.get_entity(chat_id)
# Try using ExportChatInviteRequest first
try:
from telethon.tl import functions
result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
return result.link
except AttributeError:
# If the function doesn't exist in the current Telethon version
logger.warning("ExportChatInviteRequest not available, using alternative method")
except Exception as e1:
# If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}")
# Alternative approach using client.export_chat_invite_link
try:
invite_link = await client.export_chat_invite_link(entity)
return invite_link
except Exception as e2:
logger.warning(f"export_chat_invite_link failed: {e2}")
# Last resort: Try directly fetching chat info
try:
if isinstance(entity, (Chat, Channel)):
full_chat = await client(functions.messages.GetFullChatRequest(chat_id=entity.id))
if hasattr(full_chat, "full_chat") and hasattr(full_chat.full_chat, "invite_link"):
return full_chat.full_chat.invite_link or "No invite link available."
except Exception as e3:
logger.warning(f"GetFullChatRequest failed: {e3}")
return "Could not retrieve invite link for this chat."
except Exception as e:
logger.exception(f"get_invite_link failed (chat_id={chat_id})")
return log_and_format_error("get_invite_link", e, chat_id=chat_id)
@mcp.tool()
async def join_chat_by_link(link: str) -> str:
"""
Join a chat by invite link.
"""
try:
# Extract the hash from the invite link
if "/" in link:
hash_part = link.split("/")[-1]
if hash_part.startswith("+"):
hash_part = hash_part[1:] # Remove the '+' if present
else:
hash_part = link
# Try checking the invite before joining
try:
from telethon.errors import (
InviteHashExpiredError,
InviteHashInvalidError,
UserAlreadyParticipantError,
ChatAdminRequiredError,
UsersTooMuchError,
)
# Try to check invite info first (will often fail if not a member)
invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash_part))
if hasattr(invite_info, "chat") and invite_info.chat:
# If we got chat info, we're already a member
chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
return f"You are already a member of this chat: {chat_title}"
except Exception as check_err:
# This often fails if not a member - just continue
pass
# Join the chat using the hash
try:
result = await client(functions.messages.ImportChatInviteRequest(hash=hash_part))
if result and hasattr(result, "chats") and result.chats:
chat_title = getattr(result.chats[0], "title", "Unknown Chat")
return f"Successfully joined chat: {chat_title}"
return f"Joined chat via invite hash."
except Exception as join_err:
err_str = str(join_err).lower()
if "expired" in err_str:
return "The invite hash has expired and is no longer valid."
elif "invalid" in err_str:
return "The invite hash is invalid or malformed."
elif "already" in err_str and "participant" in err_str:
return "You are already a member of this chat."
elif "admin" in err_str:
return "Cannot join this chat - requires admin approval."
elif "too much" in err_str or "too many" in err_str:
return "Cannot join this chat - it has reached maximum number of participants."
else:
raise # Re-raise to be caught by the outer exception handler
except Exception as e:
logger.exception(f"join_chat_by_link failed (link={link})")
return log_and_format_error("join_chat_by_link", e, link=link)
@mcp.tool()
async def export_chat_invite(chat_id: int) -> str:
"""
Export a chat invite link.
"""
try:
entity = await client.get_entity(chat_id)
# Try using ExportChatInviteRequest first
try:
from telethon.tl import functions
result = await client(functions.messages.ExportChatInviteRequest(peer=entity))
return result.link
except AttributeError:
# If the function doesn't exist in the current Telethon version
logger.warning("ExportChatInviteRequest not available, using alternative method")
except Exception as e1:
# If that fails, log and try alternative approach
logger.warning(f"ExportChatInviteRequest failed: {e1}")
# Alternative approach using client.export_chat_invite_link
try:
invite_link = await client.export_chat_invite_link(entity)
return invite_link
except Exception as e2:
logger.warning(f"export_chat_invite_link failed: {e2}")
return log_and_format_error("export_chat_invite", e2, chat_id=chat_id)
except Exception as e:
logger.exception(f"export_chat_invite failed (chat_id={chat_id})")
return log_and_format_error("export_chat_invite", e, chat_id=chat_id)
@mcp.tool()
async def import_chat_invite(hash: str) -> str:
"""
Import a chat invite by hash.
"""
try:
# Remove any prefixes like '+' if present
if hash.startswith("+"):
hash = hash[1:]
# Try checking the invite before joining
try:
from telethon.errors import (
InviteHashExpiredError,
InviteHashInvalidError,
UserAlreadyParticipantError,
ChatAdminRequiredError,
UsersTooMuchError,
)
# Try to check invite info first (will often fail if not a member)
invite_info = await client(functions.messages.CheckChatInviteRequest(hash=hash))
if hasattr(invite_info, "chat") and invite_info.chat:
# If we got chat info, we're already a member
chat_title = getattr(invite_info.chat, "title", "Unknown Chat")
return f"You are already a member of this chat: {chat_title}"
except Exception as check_err:
# This often fails if not a member - just continue
pass
# Join the chat using the hash
try:
result = await client(functions.messages.ImportChatInviteRequest(hash=hash))
if result and hasattr(result, "chats") and result.chats:
chat_title = getattr(result.chats[0], "title", "Unknown Chat")
return f"Successfully joined chat: {chat_title}"
return f"Joined chat via invite hash."
except Exception as join_err:
err_str = str(join_err).lower()
if "expired" in err_str:
return "The invite hash has expired and is no longer valid."
elif "invalid" in err_str:
return "The invite hash is invalid or malformed."
elif "already" in err_str and "participant" in err_str:
return "You are already a member of this chat."
elif "admin" in err_str:
return "Cannot join this chat - requires admin approval."
elif "too much" in err_str or "too many" in err_str:
return "Cannot join this chat - it has reached maximum number of participants."
else:
raise # Re-raise to be caught by the outer exception handler
except Exception as e:
logger.exception(f"import_chat_invite failed (hash={hash})")
return log_and_format_error("import_chat_invite", e, hash=hash)
@mcp.tool()
async def send_voice(chat_id: int, file_path: str) -> str:
"""
Send a voice message to a chat. File must be an OGG/OPUS voice note.
Args:
chat_id: The chat ID.
file_path: Absolute path to the OGG/OPUS file.
"""
try:
if not os.path.isfile(file_path):
return f"File not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"File is not readable: {file_path}"
mime, _ = mimetypes.guess_type(file_path)
if not (
mime
and (
mime == "audio/ogg"
or file_path.lower().endswith(".ogg")
or file_path.lower().endswith(".opus")
)
):
return "Voice file must be .ogg or .opus format."
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, voice_note=True)
return f"Voice message sent to chat {chat_id}."
except Exception as e:
return log_and_format_error("send_voice", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
async def forward_message(from_chat_id: int, message_id: int, to_chat_id: int) -> str:
"""
Forward a message from one chat to another.
"""
try:
from_entity = await client.get_entity(from_chat_id)
to_entity = await client.get_entity(to_chat_id)
await client.forward_messages(to_entity, message_id, from_entity)
return f"Message {message_id} forwarded from {from_chat_id} to {to_chat_id}."
except Exception as e:
return log_and_format_error(
"forward_message",
e,
from_chat_id=from_chat_id,
message_id=message_id,
to_chat_id=to_chat_id,
)
@mcp.tool()
async def edit_message(chat_id: int, message_id: int, new_text: str) -> str:
"""
Edit a message you sent.
"""
try:
entity = await client.get_entity(chat_id)
await client.edit_message(entity, message_id, new_text)
return f"Message {message_id} edited."
except Exception as e:
return log_and_format_error(
"edit_message", e, chat_id=chat_id, message_id=message_id, new_text=new_text
)
@mcp.tool()
async def delete_message(chat_id: int, message_id: int) -> str:
"""
Delete a message by ID.
"""
try:
entity = await client.get_entity(chat_id)
await client.delete_messages(entity, message_id)
return f"Message {message_id} deleted."
except Exception as e:
return log_and_format_error("delete_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def pin_message(chat_id: int, message_id: int) -> str:
"""
Pin a message in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.pin_message(entity, message_id)
return f"Message {message_id} pinned in chat {chat_id}."
except Exception as e:
return log_and_format_error("pin_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def unpin_message(chat_id: int, message_id: int) -> str:
"""
Unpin a message in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.unpin_message(entity, message_id)
return f"Message {message_id} unpinned in chat {chat_id}."
except Exception as e:
return log_and_format_error("unpin_message", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def mark_as_read(chat_id: int) -> str:
"""
Mark all messages as read in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.send_read_acknowledge(entity)
return f"Marked all messages as read in chat {chat_id}."
except Exception as e:
return log_and_format_error("mark_as_read", e, chat_id=chat_id)
@mcp.tool()
async def reply_to_message(chat_id: int, message_id: int, text: str) -> str:
"""
Reply to a specific message in a chat.
"""
try:
entity = await client.get_entity(chat_id)
await client.send_message(entity, text, reply_to=message_id)
return f"Replied to message {message_id} in chat {chat_id}."
except Exception as e:
return log_and_format_error(
"reply_to_message", e, chat_id=chat_id, message_id=message_id, text=text
)
@mcp.tool()
async def get_media_info(chat_id: int, message_id: int) -> str:
"""
Get info about media in a message.
Args:
chat_id: The chat ID.
message_id: The message ID.
"""
try:
entity = await client.get_entity(chat_id)
msg = await client.get_messages(entity, ids=message_id)
if not msg or not msg.media:
return "No media found in the specified message."
return str(msg.media)
except Exception as e:
return log_and_format_error("get_media_info", e, chat_id=chat_id, message_id=message_id)
@mcp.tool()
async def search_public_chats(query: str) -> str:
"""
Search for public chats, channels, or bots by username or title.
"""
try:
result = await client(functions.contacts.SearchRequest(q=query, limit=20))
return json.dumps([format_entity(u) for u in result.users], indent=2)
except Exception as e:
return log_and_format_error("search_public_chats", e, query=query)
@mcp.tool()
async def search_messages(chat_id: int, query: str, limit: int = 20) -> str:
"""
Search for messages in a chat by text.
"""
try:
entity = await client.get_entity(chat_id)
messages = await client.get_messages(entity, limit=limit, search=query)
lines = []
for msg in messages:
sender_name = get_sender_name(msg)
reply_info = ""
if msg.reply_to and msg.reply_to.reply_to_msg_id:
reply_info = f" | reply to {msg.reply_to.reply_to_msg_id}"
lines.append(
f"ID: {msg.id} | {sender_name} | Date: {msg.date}{reply_info} | Message: {msg.message}"
)
return "\n".join(lines)
except Exception as e:
return log_and_format_error(
"search_messages", e, chat_id=chat_id, query=query, limit=limit
)
@mcp.tool()
async def resolve_username(username: str) -> str:
"""
Resolve a username to a user or chat ID.
"""
try:
result = await client(functions.contacts.ResolveUsernameRequest(username=username))
return str(result)
except Exception as e:
return log_and_format_error("resolve_username", e, username=username)
@mcp.tool()
async def mute_chat(chat_id: int) -> str:
"""
Mute notifications for a chat.
"""
try:
from telethon.tl.types import InputPeerNotifySettings
peer = await client.get_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer, settings=InputPeerNotifySettings(mute_until=2**31 - 1)
)
)
return f"Chat {chat_id} muted."
except (ImportError, AttributeError) as type_err:
try:
# Alternative approach directly using raw API
peer = await client.get_input_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer,
settings={
"mute_until": 2**31 - 1, # Far future
"show_previews": False,
"silent": True,
},
)
)
return f"Chat {chat_id} muted (using alternative method)."
except Exception as alt_e:
logger.exception(f"mute_chat (alt method) failed (chat_id={chat_id})")
return log_and_format_error("mute_chat", alt_e, chat_id=chat_id)
except Exception as e:
logger.exception(f"mute_chat failed (chat_id={chat_id})")
return log_and_format_error("mute_chat", e, chat_id=chat_id)
@mcp.tool()
async def unmute_chat(chat_id: int) -> str:
"""
Unmute notifications for a chat.
"""
try:
from telethon.tl.types import InputPeerNotifySettings
peer = await client.get_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer, settings=InputPeerNotifySettings(mute_until=0)
)
)
return f"Chat {chat_id} unmuted."
except (ImportError, AttributeError) as type_err:
try:
# Alternative approach directly using raw API
peer = await client.get_input_entity(chat_id)
await client(
functions.account.UpdateNotifySettingsRequest(
peer=peer,
settings={
"mute_until": 0, # Unmute (current time)
"show_previews": True,
"silent": False,
},
)
)
return f"Chat {chat_id} unmuted (using alternative method)."
except Exception as alt_e:
logger.exception(f"unmute_chat (alt method) failed (chat_id={chat_id})")
return log_and_format_error("unmute_chat", alt_e, chat_id=chat_id)
except Exception as e:
logger.exception(f"unmute_chat failed (chat_id={chat_id})")
return log_and_format_error("unmute_chat", e, chat_id=chat_id)
@mcp.tool()
async def archive_chat(chat_id: int) -> str:
"""
Archive a chat.
"""
try:
await client(
functions.messages.ToggleDialogPinRequest(
peer=await client.get_entity(chat_id), pinned=True
)
)
return f"Chat {chat_id} archived."
except Exception as e:
return log_and_format_error("archive_chat", e, chat_id=chat_id)
@mcp.tool()
async def unarchive_chat(chat_id: int) -> str:
"""
Unarchive a chat.
"""
try:
await client(
functions.messages.ToggleDialogPinRequest(
peer=await client.get_entity(chat_id), pinned=False
)
)
return f"Chat {chat_id} unarchived."
except Exception as e:
return log_and_format_error("unarchive_chat", e, chat_id=chat_id)
@mcp.tool()
async def get_sticker_sets() -> str:
"""
Get all sticker sets.
"""
try:
result = await client(functions.messages.GetAllStickersRequest(hash=0))
return json.dumps([s.title for s in result.sets], indent=2)
except Exception as e:
return log_and_format_error("get_sticker_sets", e)
@mcp.tool()
async def send_sticker(chat_id: int, file_path: str) -> str:
"""
Send a sticker to a chat. File must be a valid .webp sticker file.
Args:
chat_id: The chat ID.
file_path: Absolute path to the .webp sticker file.
"""
try:
if not os.path.isfile(file_path):
return f"Sticker file not found: {file_path}"
if not os.access(file_path, os.R_OK):
return f"Sticker file is not readable: {file_path}"
if not file_path.lower().endswith(".webp"):
return "Sticker file must be a .webp file."
entity = await client.get_entity(chat_id)
await client.send_file(entity, file_path, force_document=False)
return f"Sticker sent to chat {chat_id}."
except Exception as e:
return log_and_format_error("send_sticker", e, chat_id=chat_id, file_path=file_path)
@mcp.tool()
async def get_gif_search(query: str, limit: int = 10) -> str:
"""
Search for GIFs by query. Returns a list of Telegram document IDs (not file paths).
Args:
query: Search term for GIFs.
limit: Max number of GIFs to return.
"""
try:
# Try approach 1: SearchGifsRequest
try:
result = await client(
functions.messages.SearchGifsRequest(q=query, offset_id=0, limit=limit)
)
if not result.gifs:
return "[]"
return json.dumps(
[g.document.id for g in result.gifs], indent=2, default=json_serializer
)
except (AttributeError, ImportError):
# Fallback approach: Use SearchRequest with GIF filter
try:
from telethon.tl.types import InputMessagesFilterGif
result = await client(
functions.messages.SearchRequest(
peer="gif",
q=query,
filter=InputMessagesFilterGif(),
min_date=None,
max_date=None,
offset_id=0,
add_offset=0,
limit=limit,
max_id=0,
min_id=0,
hash=0,
)
)
if not result or not hasattr(result, "messages") or not result.messages:
return "[]"
# Extract document IDs from any messages with media
gif_ids = []
for msg in result.messages:
if hasattr(msg, "media") and msg.media and hasattr(msg.media, "document"):
gif_ids.append(msg.media.document.id)
return json.dumps(gif_ids, default=json_serializer)
except Exception as inner_e:
# Last resort: Try to fetch from a public bot
return f"Could not search GIFs using available methods: {inner_e}"
except Exception as e:
logger.exception(f"get_gif_search failed (query={query}, limit={limit})")
return log_and_format_error("get_gif_search", e, query=query, limit=limit)
@mcp.tool()
async def send_gif(chat_id: int, gif_id: int) -> str:
"""
Send a GIF to a chat by Telegram GIF document ID (not a file path).
Args:
chat_id: The chat ID.
gif_id: Telegram document ID for the GIF (from get_gif_search).
"""
try:
if not isinstance(gif_id, int):
return "gif_id must be a Telegram document ID (integer), not a file path. Use get_gif_search to find IDs."
entity = await client.get_entity(chat_id)
await client.send_file(entity, gif_id)
return f"GIF sent to chat {chat_id}."
except Exception as e:
return log_and_format_error("send_gif", e, chat_id=chat_id, gif_id=gif_id)
@mcp.tool()
async def get_bot_info(bot_username: str) -> str:
"""
Get information about a bot by username.
"""
try:
entity = await client.get_entity(bot_username)
if not entity:
return f"Bot with username {bot_username} not found."
result = await client(functions.users.GetFullUserRequest(id=entity))
# Create a more structured, serializable response
if hasattr(result, "to_dict"):
# Use custom serializer to handle non-serializable types
return json.dumps(result.to_dict(), indent=2, default=json_serializer)
else:
# Fallback if to_dict is not available
info = {
"bot_info": {
"id": entity.id,
"username": entity.username,
"first_name": entity.first_name,
"last_name": getattr(entity, "last_name", ""),
"is_bot": getattr(entity, "bot", False),
"verified": getattr(entity, "verified", False),
}
}
if hasattr(result, "full_user") and hasattr(result.full_user, "about"):
info["bot_info"]["about"] = result.full_user.about
return json.dumps(info, indent=2)
except Exception as e:
logger.exception(f"get_bot_info failed (bot_username={bot_username})")
return log_and_format_error("get_bot_info", e, bot_username=bot_username)
@mcp.tool()
async def set_bot_commands(bot_username: str, commands: list) -> str:
"""
Set bot commands for a bot you own.
Note: This function can only be used if the Telegram client is a bot account.
Regular user accounts cannot set bot commands.
Args:
bot_username: The username of the bot to set commands for.
commands: List of command dictionaries with 'command' and 'description' keys.
"""
try:
# First check if the current client is a bot
me = await client.get_me()
if not getattr(me, "bot", False):
return "Error: This function can only be used by bot accounts. Your current Telegram account is a regular user account, not a bot."
# Import required types
from telethon.tl.types import BotCommand, BotCommandScopeDefault
from telethon.tl.functions.bots import SetBotCommandsRequest
# Create BotCommand objects from the command dictionaries
bot_commands = [
BotCommand(command=c["command"], description=c["description"]) for c in commands
]
# Get the bot entity
bot = await client.get_entity(bot_username)
# Set the commands with proper scope
await client(
SetBotCommandsRequest(
scope=BotCommandScopeDefault(),
lang_code="en", # Default language code
commands=bot_commands,
)
)
return f"Bot commands set for {bot_username}."
except ImportError as ie:
logger.exception(f"set_bot_commands failed - ImportError: {ie}")
return log_and_format_error("set_bot_commands", ie)
except Exception as e:
logger.exception(f"set_bot_commands failed (bot_username={bot_username})")
return log_and_format_error("set_bot_commands", e, bot_username=bot_username)
@mcp.tool()
async def get_history(chat_id: int, limit: int = 100) -> str:
"""
Get full chat history (up to limit).
"""
try:
entity = await client.get_entity(chat_id)
messages = await client.get_messages(entity, limit=limit)
lines = []
for msg in messages:
sender_name = get_sender_name(msg)
reply_info = ""
if msg.reply_to and msg.reply_to.reply_to_msg_id:
reply_info = f" | reply to {msg.reply_to.reply_to_msg_id}"
lines.append(
f"ID: {msg.id} | {sender_name} | Date: {msg.date}{reply_info} | Message: {msg.message}"
)
return "\n".join(lines)
except Exception as e:
return log_and_format_error("get_history", e, chat_id=chat_id, limit=limit)
@mcp.tool()
async def get_user_photos(user_id: int, limit: int = 10) -> str:
"""
Get profile photos of a user.
"""
try:
user = await client.get_entity(user_id)
photos = await client(
functions.photos.GetUserPhotosRequest(user_id=user, offset=0, max_id=0, limit=limit)
)
return json.dumps([p.id for p in photos.photos], indent=2)
except Exception as e:
return log_and_format_error("get_user_photos", e, user_id=user_id, limit=limit)
@mcp.tool()
async def get_user_status(user_id: int) -> str:
"""
Get the online status of a user.
"""
try:
user = await client.get_entity(user_id)
return str(user.status)
except Exception as e:
return log_and_format_error("get_user_status", e, user_id=user_id)
@mcp.tool()
async def get_recent_actions(chat_id: int) -> str:
"""
Get recent admin actions (admin log) in a group or channel.
"""
try:
result = await client(
functions.channels.GetAdminLogRequest(
channel=chat_id, q="", events_filter=None, admins=[], max_id=0, min_id=0, limit=20
)
)
if not result or not result.events:
return "No recent admin actions found."
# Use the custom serializer to handle datetime objects
return json.dumps([e.to_dict() for e in result.events], indent=2, default=json_serializer)
except Exception as e:
logger.exception(f"get_recent_actions failed (chat_id={chat_id})")
return log_and_format_error("get_recent_actions", e, chat_id=chat_id)
@mcp.tool()
async def get_pinned_messages(chat_id: int) -> str:
"""
Get all pinned messages in a chat.
"""
try:
entity = await client.get_entity(chat_id)
# Use correct filter based on Telethon version
try:
# Try newer Telethon approach
from telethon.tl.types import InputMessagesFilterPinned
messages = await client.get_messages(entity, filter=InputMessagesFilterPinned())
except (ImportError, AttributeError):
# Fallback - try without filter and manually filter pinned
all_messages = await client.get_messages(entity, limit=50)
messages = [m for m in all_messages if getattr(m, "pinned", False)]
if not messages:
return "No pinned messages found in this chat."
lines = []
for msg in messages:
sender_name = get_sender_name(msg)
reply_info = ""
if msg.reply_to and msg.reply_to.reply_to_msg_id:
reply_info = f" | reply to {msg.reply_to.reply_to_msg_id}"
lines.append(
f"ID: {msg.id} | {sender_name} | Date: {msg.date}{reply_info} | Message: {msg.message or '[Media/No text]'}"
)
return "\n".join(lines)
except Exception as e:
logger.exception(f"get_pinned_messages failed (chat_id={chat_id})")
return log_and_format_error("get_pinned_messages", e, chat_id=chat_id)
if __name__ == "__main__":
nest_asyncio.apply()
async def main() -> None:
try:
# Start the Telethon client non-interactively
print("Starting Telegram client...")
await client.start()
print("Telegram client started. Running MCP server...")
# Use the asynchronous entrypoint instead of mcp.run()
await mcp.run_stdio_async()
except Exception as e:
print(f"Error starting client: {e}", file=sys.stderr)
if isinstance(e, sqlite3.OperationalError) and "database is locked" in str(e):
print(
"Database lock detected. Please ensure no other instances are running.",
file=sys.stderr,
)
sys.exit(1)
asyncio.run(main())