import httpx
import logging
import sys
import os
import time
from mcp_app import mcp
# Bot credentials and API root, read from the environment once at import time.
# NOTE(review): if TELEGRAM_BOT_TOKEN is unset, BASE_URL ends in "botNone".
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
BASE_URL = "https://api.telegram.org/bot{}".format(BOT_TOKEN)

# Setup Logging
# INFO and above go to stderr with a "[LOG] " prefix; force=True replaces any
# handlers that were already installed on the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="[LOG] %(message)s",
    stream=sys.stderr,
    force=True,
)
@mcp.tool()
async def send_telegram_message(message: str) -> str:
    """
    Send a notification to the configured Telegram Chat.

    The message is first sent with Markdown formatting. If Telegram rejects it
    because the text is not valid Markdown (for example an unbalanced "_" or
    "*"), it is sent once more as plain text so the notification is not lost.

    Args:
        message: The text to send to your phone.

    Returns:
        A status string: "Message sent successfully!", a "Telegram Error: ..."
        with the HTTP status and response body, a "Connection Error: ..." if
        the request could not be completed, or an "Error: ..." if the
        TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID environment variables are unset.
    """
    token = os.getenv("TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("TELEGRAM_CHAT_ID")
    if not token or not chat_id:
        return "Error: Missing TELEGRAM_BOT_TOKEN or TELEGRAM_CHAT_ID in .env file."

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    payload = {"chat_id": chat_id, "text": message, "parse_mode": "Markdown"}
    logging.info("Sending Telegram message to %s...", chat_id)

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=payload)
            # Telegram answers 400 "can't parse entities" when the text is not
            # well-formed Markdown; fall back to sending it unformatted.
            if response.status_code == 400 and "parse entities" in response.text:
                logging.info("Markdown rejected by Telegram; retrying as plain text.")
                plain_payload = {"chat_id": chat_id, "text": message}
                response = await client.post(url, json=plain_payload)
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        return f"Connection Error: {str(e)}"

    if response.status_code == 200:
        return "Message sent successfully!"
    return f"Telegram Error: {response.status_code} - {response.text}"
async def make_request(
    endpoint: str, method: str = "GET", params: dict = None, data: dict = None
) -> dict:
    """
    Helper to handle Telegram API calls.

    Args:
        endpoint: Bot API method name appended to BASE_URL, e.g. "getUpdates".
        method: "POST" sends ``data`` as a JSON body; any other value issues a
            GET request with ``params`` as the query string.
        params: Query parameters for GET requests.
        data: JSON payload for POST requests.

    Returns:
        The decoded JSON body of the response. If the bot token is not
        configured, the request cannot be completed, or the body is not JSON,
        a dict of the form {"ok": False, "description": "..."} is returned
        instead, so callers can treat every failure like a Telegram error.
    """
    if not BOT_TOKEN:
        # Without a token BASE_URL points at ".../botNone", which can only fail.
        return {"ok": False, "description": "Missing TELEGRAM_BOT_TOKEN in .env file."}

    url = f"{BASE_URL}/{endpoint}"
    try:
        async with httpx.AsyncClient() as client:
            if method == "POST":
                response = await client.post(url, json=data)
            else:
                response = await client.get(url, params=params)
    except httpx.HTTPError as exc:
        return {"ok": False, "description": f"Connection Error: {exc}"}

    try:
        return response.json()
    except ValueError:
        # json.JSONDecodeError subclasses ValueError; e.g. an HTML error page.
        return {
            "ok": False,
            "description": f"Unexpected non-JSON response (HTTP {response.status_code})",
        }
@mcp.tool()
async def telegram_read_messages(limit: int = 5) -> str:
    """
    Reads the last few messages sent to the bot by the user.
    Useful for checking if the user has replied or asked a question.

    Args:
        limit: How many of the most recent updates to fetch. Values outside
            1-100 (the range getUpdates accepts) are clamped into that range.

    Returns:
        One line per message in the form "[ChatID] User: Message",
        "No new messages found." when no update carries a message, or
        "Error reading messages: ..." when the API response is not ok.
    """
    # Clamping also keeps the offset below negative: a limit of 0 or less
    # would otherwise turn "-limit" into a zero or positive update id.
    limit = max(1, min(limit, 100))
    # A negative offset asks for updates counted from the end of the queue.
    data = await make_request(
        "getUpdates", method="GET", params={"limit": limit, "offset": -limit}
    )
    if not data.get("ok"):
        return f"Error reading messages: {data.get('description')}"

    messages = []
    for update in data.get("result", []):
        message = update.get("message")
        if not message:
            # Other update kinds (edits, callbacks, ...) carry no "message".
            continue
        # "from" is optional in the Bot API, so do not index it directly.
        sender = message.get("from") or {}
        user = sender.get("first_name", "Unknown")
        text = message.get("text", "[No Text]")
        chat_id = message["chat"]["id"]
        # Format: [ChatID] User: Message
        messages.append(f"[{chat_id}] {user}: {text}")

    if not messages:
        return "No new messages found."
    return "\n".join(messages)
@mcp.tool()
async def telegram_reply(chat_id: int, text: str) -> str:
    """
    Sends a reply to a specific user/chat ID.

    Args:
        chat_id: The ID of the user to reply to.
        text: The message to send back.

    Returns:
        "Reply sent successfully!" when the API response has a truthy "ok"
        field, otherwise "Failed to send: <description>".
    """
    body = {"chat_id": chat_id, "text": text}
    result = await make_request("sendMessage", method="POST", data=body)
    # Guard clause: surface the API's own description on failure.
    if not result.get("ok"):
        return f"Failed to send: {result.get('description')}"
    return "Reply sent successfully!"
@mcp.tool()
def wait_for_seconds(seconds: int) -> str:
    """
    Pauses execution for a set number of seconds.
    REQUIRED for running automated loops to reply messages.

    Args:
        seconds: How long to pause. Must be zero or greater.

    Returns:
        "Waited for <seconds> seconds." once the pause is over, or an
        "Error: ..." string (without pausing) if ``seconds`` is negative.
    """
    # time.sleep() raises ValueError on negative input; report it as a string
    # the way the other tools in this file report their errors.
    if seconds < 0:
        return f"Error: seconds must be non-negative, got {seconds}."
    # NOTE(review): time.sleep blocks the calling thread. If the MCP server
    # runs sync tools on its event loop, consider an async asyncio.sleep.
    time.sleep(seconds)
    return f"Waited for {seconds} seconds."