telegram_tools.py•13.8 kB
"""
Telegram integration tools for the MCP server.
"""
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
import requests
from config import ADMIN_ID, DOWNLOADS_BASE_PATH, TG_TOKEN
from . import mcp
def _merge_metadata(data, metadata, allowed_keys):
if metadata:
for k in allowed_keys:
if k in metadata:
data[k] = metadata[k]
def _append_album_to_caption(data, album):
if album:
if "caption" in data and data["caption"]:
data["caption"] += f"\nAlbum: {album}"
else:
data["caption"] = f"Album: {album}"
def _append_extra_lines_to_caption(data, extra_lines):
if extra_lines:
if "caption" in data and data["caption"]:
data["caption"] += "\n" + "\n".join(extra_lines)
else:
data["caption"] = "\n".join(extra_lines)
def _send_telegram_file(url, data, files, file_key, target_chat_id, file_type):
try:
response = requests.post(url, data=data, files=files, timeout=60)
files[file_key].close()
if response.status_code == 200:
logging.info("Sent Telegram %s to %s", file_type, target_chat_id)
return f"Sent Telegram {file_type} to {target_chat_id}"
logging.error("Failed to send Telegram %s: %s", file_type, response.text)
return f"Failed to send Telegram {file_type}: {response.text}"
except Exception as e:
logging.error("Exception sending Telegram %s: %s", file_type, e)
return f"Exception sending Telegram {file_type}: {e}"
def _prepare_audio_payload(audio_path, text, target_chat_id, audio_metadata):
url = f"https://api.telegram.org/bot{TG_TOKEN}/sendAudio"
if not os.path.isfile(audio_path):
return None, f"Audio file not found: {audio_path}"
files = {"audio": open(audio_path, "rb")}
data = {"chat_id": target_chat_id}
if text:
data["caption"] = text
allowed_audio_keys = {
"performer",
"title",
"duration",
"caption",
"parse_mode",
"thumbnail",
}
_merge_metadata(data, audio_metadata, allowed_audio_keys)
if audio_metadata and "album" in audio_metadata:
_append_album_to_caption(data, audio_metadata["album"])
return (url, data, files, "audio", "audio"), None
def _prepare_video_payload(video_path, text, target_chat_id, video_metadata):
url = f"https://api.telegram.org/bot{TG_TOKEN}/sendVideo"
if not os.path.isfile(video_path):
return None, f"Video file not found: {video_path}"
files = {"video": open(video_path, "rb")}
data = {"chat_id": target_chat_id}
if text:
data["caption"] = text
allowed_video_keys = {
"duration",
"width",
"height",
"supports_streaming",
"caption",
"parse_mode",
"thumbnail",
}
_merge_metadata(data, video_metadata, allowed_video_keys)
extra_lines = []
if video_metadata:
if "creator" in video_metadata:
extra_lines.append(f"Creator: {video_metadata['creator']}")
if "album" in video_metadata:
extra_lines.append(f"Album: {video_metadata['album']}")
_append_extra_lines_to_caption(data, extra_lines)
return (url, data, files, "video", "video"), None
def _prepare_document_payload(document_path, text, target_chat_id, document_metadata):
url = f"https://api.telegram.org/bot{TG_TOKEN}/sendDocument"
if not os.path.isfile(document_path):
return None, f"Document file not found: {document_path}"
files = {"document": open(document_path, "rb")}
data = {"chat_id": target_chat_id}
if text:
data["caption"] = text
allowed_doc_keys = {
"caption",
"parse_mode",
"disable_content_type_detection",
"thumbnail",
"file_name",
}
_merge_metadata(data, document_metadata, allowed_doc_keys)
extra_lines = []
if document_metadata:
if "creator" in document_metadata:
extra_lines.append(f"Creator: {document_metadata['creator']}")
if "album" in document_metadata:
extra_lines.append(f"Album: {document_metadata['album']}")
_append_extra_lines_to_caption(data, extra_lines)
return (url, data, files, "document", "document"), None
def _send_plain_text_message(token, target_chat_id, text):
url = f"https://api.telegram.org/bot{token}/sendMessage"
payload = {"chat_id": target_chat_id, "text": text, "parse_mode": "HTML"}
try:
response = requests.post(url, data=payload, timeout=10)
if response.status_code == 200:
logging.info("Sent Telegram message to %s", target_chat_id)
return f"Sent Telegram message to {target_chat_id}"
logging.error("Failed to send Telegram message: %s", response.text)
return f"Failed to send Telegram message: {response.text}"
except Exception as e:
logging.error("Exception sending Telegram message: %s", e)
return f"Exception sending Telegram message: {e}"
@mcp.tool()
async def send_telegram_message(
text: Optional[str] = None,
chat_id: Optional[str] = None,
audio_path: Optional[str] = None,
video_path: Optional[str] = None,
document_path: Optional[str] = None,
audio_metadata: Optional[dict] = None,
video_metadata: Optional[dict] = None,
document_metadata: Optional[dict] = None,
) -> str:
"""
Send a message, audio, video, or document to a Telegram user or chat using the bot token from environment variables.
Optionally, include metadata (such as performer, title, album for audio) if sending a file.
Args:
text: The message text to send (optional if sending file).
chat_id: The Telegram chat ID to send to. If not provided, uses ADMIN_ID from env.
audio_path: Path to an audio file to send (optional).
video_path: Path to a video file to send (optional).
document_path: Path to a document file to send (optional).
audio_metadata: Dict of extra metadata for audio (e.g. {"performer": "...", "title": "...", "album": "..."}).
video_metadata: Dict of extra metadata for video (e.g. {"supports_streaming": True, ...}).
document_metadata: Dict of extra metadata for document (e.g. {"file_name": "..."}).
Returns:
Success or error message.
"""
token = TG_TOKEN
target_chat_id = chat_id or ADMIN_ID
if not token or not target_chat_id:
logging.error("TG_TOKEN or ADMIN_ID not set in environment variables.")
return "Error: TG_TOKEN or ADMIN_ID not set in environment variables."
# Handle file sending
file_payloads = [
(audio_path, _prepare_audio_payload, audio_metadata),
(video_path, _prepare_video_payload, video_metadata),
(document_path, _prepare_document_payload, document_metadata),
]
for path, prepare_func, metadata in file_payloads:
if path:
result, error = prepare_func(path, text, target_chat_id, metadata)
if error:
return error
url, data, files, file_key, file_type = result
return _send_telegram_file(
url, data, files, file_key, target_chat_id, file_type
)
# Send plain text message
if text:
return _send_plain_text_message(token, target_chat_id, text)
return "Error: No text or file (audio, video, document) provided to send."
def _sanitize_filename(name):
name = re.sub(r"[\\/*?\"<>|]", "_", name)
return name.strip()
def _build_filename(message, file_id, ext, default_prefix):
date_unix = message.get("date")
if date_unix:
dt = datetime.utcfromtimestamp(date_unix)
date_str = dt.strftime("%Y-%m-%d_%H-%M-%S")
else:
date_str = datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S")
caption = message.get("caption")
if caption:
base = _sanitize_filename(caption)
filename = f"{base}_{date_str}{ext}"
else:
filename = f"{default_prefix}_{date_str}{ext}"
return filename
def _download_telegram_file(
file_id,
message,
ext,
default_prefix,
telegram_downloads_dir,
token,
suggested_filename=None,
):
file_info_url = f"https://api.telegram.org/bot{token}/getFile"
resp = requests.get(file_info_url, params={"file_id": file_id}, timeout=10)
if resp.status_code != 200 or not resp.json().get("ok"):
logging.error("Failed to get file info for file_id %s: %s", file_id, resp.text)
return None
file_path = resp.json()["result"]["file_path"]
file_url = f"https://api.telegram.org/file/bot{token}/{file_path}"
if suggested_filename:
local_filename = _sanitize_filename(suggested_filename)
else:
local_filename = _build_filename(message, file_id, ext, default_prefix)
local_path = os.path.join(telegram_downloads_dir, local_filename)
base, ext2 = os.path.splitext(local_path)
counter = 1
while os.path.exists(local_path):
local_path = f"{base}_{counter}{ext2}"
counter += 1
try:
with requests.get(file_url, stream=True, timeout=60) as r:
r.raise_for_status()
with open(local_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
logging.info("Downloaded Telegram file to %s", local_path)
return local_path
except Exception as e:
logging.error("Exception downloading Telegram file: %s", e)
return None
@mcp.tool()
async def receive_telegram_updates(
offset: Optional[int] = None,
limit: Optional[int] = 10,
timeout: Optional[int] = 0,
) -> List[Dict[str, Any]]:
"""
Receive updates (messages, etc.) sent to the Telegram bot.
If any type of file is present in the update, download it to the download folder under telegram_downloads.
If there is a caption, use it as the name plus the date the message has been sent; else just use the date up to the seconds.
Args:
offset: Identifier of the first update to be returned. (Optional)
limit: Limits the number of updates to be retrieved. (Default: 10)
timeout: Timeout in seconds for long polling. (Default: 0)
Returns:
A list of update dicts received from Telegram.
"""
token = TG_TOKEN
if not token:
logging.error("TG_TOKEN not set in environment variables.")
return [{"error": "TG_TOKEN not set in environment variables."}]
url = f"https://api.telegram.org/bot{token}/getUpdates"
params = {
"limit": limit,
"timeout": timeout,
}
if offset is not None:
params["offset"] = offset
telegram_downloads_dir = os.path.join(DOWNLOADS_BASE_PATH, "telegram_downloads")
os.makedirs(telegram_downloads_dir, exist_ok=True)
try:
response = requests.get(url, params=params, timeout=timeout + 10)
if response.status_code != 200:
logging.error("Failed to receive Telegram updates: %s", response.text)
return [{"error": f"Failed to receive Telegram updates: {response.text}"}]
data = response.json()
if not data.get("ok"):
logging.error("Failed to receive Telegram updates: %r", data)
return [{"error": f"Failed to receive Telegram updates: {data!r}"}]
updates = data.get("result", [])
logging.info("Received %d Telegram updates", len(updates))
file_types = [
("document", "file_name", ".bin", "document"),
("audio", "file_name", ".mp3", "audio"),
("video", "file_name", ".mp4", "video"),
("voice", None, ".ogg", "voice"),
("photo", None, ".jpg", "photo"),
("sticker", None, ".webp", "sticker"),
("video_note", None, ".mp4", "video_note"),
("animation", "file_name", ".mp4", "animation"),
]
for update in updates:
message = update.get("message") or update.get("edited_message")
if not message:
continue
for file_type, name_key, default_ext, default_prefix in file_types:
if file_type in message:
file_obj = message[file_type]
if file_type == "photo" and isinstance(file_obj, list) and file_obj:
file_obj = max(file_obj, key=lambda x: x.get("file_size", 0))
file_id = file_obj.get("file_id")
if not file_id:
continue
ext = default_ext
suggested_filename = None
if name_key and file_obj.get(name_key):
suggested_filename = file_obj[name_key]
_, ext_from_name = os.path.splitext(suggested_filename)
if ext_from_name:
ext = ext_from_name
local_path = _download_telegram_file(
file_id,
message,
ext,
default_prefix,
telegram_downloads_dir,
token,
suggested_filename=None,
)
if local_path:
if "downloaded_files" not in update:
update["downloaded_files"] = []
update["downloaded_files"].append(local_path)
return updates
except Exception as e:
logging.error("Exception receiving Telegram updates: %s", e)
return [{"error": f"Exception receiving Telegram updates: {e}"}]