"""
telegram-mcp server — MCP tools for Telegram bot interaction via MTProto.
Exposes tools to send messages to Telegram bots and retrieve chat history,
using your authenticated user account (not the Bot API).
"""
import os
import asyncio
import json
from pathlib import Path
from datetime import datetime, timezone
from dotenv import load_dotenv
from telethon import TelegramClient as TelethonClient
from mcp.server.fastmcp import FastMCP
# --- Configuration ---
_env_path = os.environ.get("TELEGRAM_ENV_PATH")
if _env_path:
load_dotenv(_env_path)
else:
# Try common locations
for candidate in [Path.cwd() / ".env", Path.home() / ".claude" / ".env"]:
if candidate.exists():
load_dotenv(candidate)
break
API_ID = os.environ.get("TELEGRAM_API_ID")
API_HASH = os.environ.get("TELEGRAM_API_HASH")
SESSION_DIR = os.environ.get("TELEGRAM_SESSION_DIR", str(Path.cwd()))
SESSION_NAME = os.path.join(SESSION_DIR, "telegram_user")
# --- MCP Server ---
mcp = FastMCP("telegram")
def _check_config() -> str | None:
"""Return an error message if config is missing, else None."""
if not API_ID or not API_HASH:
return (
"Missing TELEGRAM_API_ID or TELEGRAM_API_HASH. "
"Set them as environment variables or in your .env file. "
"See README for setup instructions."
)
session_file = f"{SESSION_NAME}.session"
if not Path(session_file).exists():
return (
f"Session file not found at {session_file}. "
"Run 'telegram-mcp-auth' to authenticate first."
)
return None
async def _get_client() -> TelethonClient:
"""Create and connect a Telethon client."""
client = TelethonClient(SESSION_NAME, int(API_ID), API_HASH)
await client.connect()
if not await client.is_user_authorized():
await client.disconnect()
raise RuntimeError(
"Telegram session expired. Run 'telegram-mcp-auth' to re-authenticate."
)
return client
@mcp.tool()
async def send_message(bot: str, message: str, timeout: int = 30) -> str:
"""Send a message to a Telegram bot and wait for its reply.
Args:
bot: Bot username (e.g. '@BotFather' or 'BotFather')
message: The message text to send
timeout: Seconds to wait for a reply (default 30)
"""
err = _check_config()
if err:
return json.dumps({"status": "error", "error": err})
client = await _get_client()
try:
bot_username = bot.lstrip("@")
entity = await client.get_entity(bot_username)
sent_msg = await client.send_message(entity, message)
# Poll for bot reply
reply_text = None
start = asyncio.get_event_loop().time()
while (asyncio.get_event_loop().time() - start) < timeout:
messages = await client.get_messages(entity, limit=5, min_id=sent_msg.id)
bot_replies = [m for m in messages if m.sender_id == entity.id]
if bot_replies:
reply_text = bot_replies[0].text
break
await asyncio.sleep(1)
result = {
"status": "ok" if reply_text else "timeout",
"bot": f"@{bot_username}",
"sent": message,
"reply": reply_text,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
if not reply_text:
result["timeout_seconds"] = timeout
return json.dumps(result, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"bot": f"@{bot.lstrip('@')}",
"error": str(e),
"timestamp": datetime.now(timezone.utc).isoformat(),
})
finally:
await client.disconnect()
@mcp.tool()
async def get_history(bot: str, limit: int = 20) -> str:
"""Get message history with a Telegram bot.
Args:
bot: Bot username (e.g. '@BotFather' or 'BotFather')
limit: Number of messages to retrieve (default 20)
"""
err = _check_config()
if err:
return json.dumps({"status": "error", "error": err})
client = await _get_client()
try:
bot_username = bot.lstrip("@")
entity = await client.get_entity(bot_username)
messages = await client.get_messages(entity, limit=limit)
history = []
for msg in reversed(messages):
history.append({
"id": msg.id,
"from": "you" if msg.out else f"@{bot_username}",
"text": msg.text,
"date": msg.date.isoformat(),
})
return json.dumps({
"status": "ok",
"bot": f"@{bot_username}",
"messages": history,
"count": len(history),
"timestamp": datetime.now(timezone.utc).isoformat(),
}, indent=2)
except Exception as e:
return json.dumps({
"status": "error",
"error": str(e),
"timestamp": datetime.now(timezone.utc).isoformat(),
})
finally:
await client.disconnect()
def main():
"""Run the MCP server."""
transport = os.environ.get("TELEGRAM_TRANSPORT", "stdio")
mcp.run(transport=transport)