Skip to main content
Glama

Telegram MCP Server

by batianVolyc
bot.py18.7 kB
"""Telegram Bot handlers""" import os import logging from datetime import datetime from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes from .session import registry from .message_queue import message_queue logger = logging.getLogger(__name__) def format_time_ago(iso_time: str) -> str: """Format ISO timestamp as human-readable time ago""" try: dt = datetime.fromisoformat(iso_time) now = datetime.now() diff = (now - dt).total_seconds() if diff < 60: return f"{int(diff)}秒前" elif diff < 3600: return f"{int(diff / 60)}分钟前" elif diff < 86400: return f"{int(diff / 3600)}小时前" else: return f"{int(diff / 86400)}天前" except: return iso_time async def cmd_sessions(update: Update, context: ContextTypes.DEFAULT_TYPE): """List all active sessions""" sessions = registry.list_all() if not sessions: await update.message.reply_text("没有活跃会话") return text = "📋 活跃会话:\n\n" for i, (sid, session) in enumerate(sessions.items(), 1): status_emoji = { "running": "▶️", "waiting": "⏸️", "idle": "⏹️" }.get(session.status, "❓") text += ( f"{i}️⃣ {status_emoji} `{sid}`\n" f" 📁 `{session.project_path}`\n" f" 🕐 {format_time_ago(session.last_active)}\n\n" ) await update.message.reply_text(text, parse_mode="Markdown") async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show session status""" if not context.args: await update.message.reply_text("用法: /status <session_id>") return session_id = context.args[0] session = registry.get(session_id) if not session: await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return status_emoji = { "running": "▶️ 运行中", "waiting": "⏸️ 等待回复", "idle": "⏹️ 空闲" }.get(session.status, "❓ 未知") status_text = f"""📊 会话状态 ━━━━━━━━━━━━━━━━ 🆔 ID: `{session.session_id}` 📁 项目: `{session.project_path}` ⏱️ 状态: {status_emoji} 🕐 最后活动: {format_time_ago(session.last_active)} """ if session.last_message: status_text += f"💬 最后消息: {session.last_message[:100]}" await update.message.reply_text(status_text, parse_mode="Markdown") async def cmd_to(update: Update, context: ContextTypes.DEFAULT_TYPE): """Send message to specific session""" if len(context.args) < 2: await update.message.reply_text("用法: /to <session_id> <消息>") return session_id = context.args[0] message = " ".join(context.args[1:]) if not registry.exists(session_id): await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return # Add message to queue message_queue.push(session_id, message) # Silent confirmation (no message) - user will see Claude's response directly # Only send confirmation if session is not waiting (might be delayed) session = registry.get(session_id) if not session.pending_reply: await update.message.reply_text( f"✅ 已加入队列 `{session_id}` (会话当前不在等待状态)", parse_mode="Markdown" ) async def cmd_file(update: Update, context: ContextTypes.DEFAULT_TYPE): """View or download project file""" if len(context.args) < 2: await update.message.reply_text( "用法: /file <session_id> <file_path> [download]\n\n" "示例:\n" "/file testtg src/main.py - 查看文件内容\n" "/file testtg config.json download - 下载文件" ) return session_id = context.args[0] file_path = context.args[1] download_mode = len(context.args) > 2 and context.args[2] == "download" session = registry.get(session_id) if not session: await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return full_path = os.path.join(session.project_path, file_path) if not os.path.exists(full_path): await update.message.reply_text(f"❌ 文件不存在: `{file_path}`", parse_mode="Markdown") return # Check if it's a file (not directory) if not os.path.isfile(full_path): await update.message.reply_text(f"❌ 不是文件: `{file_path}`", parse_mode="Markdown") return # If download mode, always send as document if download_mode: try: with open(full_path, 'rb') as f: await update.message.reply_document( document=f, filename=os.path.basename(file_path), caption=f"📄 {file_path}" ) except Exception as e: await update.message.reply_text(f"❌ 发送文件失败: {str(e)}") return # Otherwise, try to display content try: with open(full_path, 'r', encoding='utf-8') as f: content = f.read() # If file is too large, send as document if len(content) > 4000: with open(full_path, 'rb') as f: await update.message.reply_document( document=f, filename=os.path.basename(file_path), caption=f"📄 {file_path} (文件过大,作为附件发送)\n💡 提示: 可直接下载查看" ) else: # Detect language for syntax highlighting ext = os.path.splitext(file_path)[1] lang_map = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".go": "go", ".rs": "rust", ".java": "java", ".c": "c", ".cpp": "cpp", ".h": "c", ".hpp": "cpp", ".sh": "bash", ".json": "json", ".yaml": "yaml", ".yml": "yaml", ".xml": "xml", ".html": "html", ".css": "css", ".md": "markdown" } lang = lang_map.get(ext, "") # Truncate if still too long for Telegram if len(content) > 3800: content = content[:3800] + "\n\n... (已截断)" # Try to send with Markdown try: await update.message.reply_text( f"📄 `{file_path}`\n\n```{lang}\n{content}\n```", parse_mode="Markdown" ) except Exception: # Fallback: send as plain text if Markdown fails await update.message.reply_text( f"📄 {file_path}\n\n{content}" ) except UnicodeDecodeError: # Binary file, send as document try: with open(full_path, 'rb') as f: await update.message.reply_document( document=f, filename=os.path.basename(file_path), caption=f"📄 {file_path} (二进制文件)" ) except Exception as e: await update.message.reply_text(f"❌ 发送文件失败: {str(e)}") except Exception as e: await update.message.reply_text(f"❌ 读取文件失败: {str(e)}") async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle photo messages""" # Get largest photo photo = update.message.photo[-1] caption = update.message.caption or "用户发送了一张图片" # Download photo try: file = await context.bot.get_file(photo.file_id) file_path = f"/tmp/telegram_photo_{photo.file_id}.jpg" await file.download_to_drive(file_path) # Format message with image info photo_message = f"[图片] {caption}\n文件路径: {file_path}" # Check if this is a reply to a message (priority) if update.message.reply_to_message: replied_text = update.message.reply_to_message.text or update.message.reply_to_message.caption or "" import re session_id = None # Try to extract session_id match = re.search(r'`([a-zA-Z0-9_-]+)`', replied_text) if match and registry.exists(match.group(1)): session_id = match.group(1) if not session_id: all_sessions = registry.list_all() for sid in all_sessions.keys(): if sid in replied_text: session_id = sid break if session_id: message_queue.push(session_id, photo_message) try: await update.message.set_reaction("👍") except: pass return # No reply, or couldn't extract session - find waiting sessions waiting_sessions = registry.list_waiting() if len(waiting_sessions) == 0: await update.message.reply_text("没有会话在等待。图片已保存到:\n" + file_path) elif len(waiting_sessions) == 1: session_id = list(waiting_sessions.keys())[0] message_queue.push(session_id, photo_message) try: await update.message.set_reaction("👍") except: pass else: # Store photo data first import json import hashlib # Use hash to create short ID photo_id = hashlib.md5(photo.file_id.encode()).hexdigest()[:8] photo_data = { 'file_path': file_path, 'caption': caption, 'message': photo_message } with open(f"/tmp/photo_{photo_id}.json", 'w') as f: json.dump(photo_data, f) # Create buttons with short callback_data keyboard = [] for sid in waiting_sessions.keys(): keyboard.append([ InlineKeyboardButton( f"📤 发送到 {sid}", callback_data=f"photo_{photo_id}_{sid}" # Short format: photo_HASH_SESSION ) ]) reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"多个会话在等待,请选择:\n\n📷 {caption}", reply_markup=reply_markup ) except Exception as e: await update.message.reply_text(f"❌ 处理图片失败: {str(e)}") async def handle_plain_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle plain text messages (auto-route to waiting sessions)""" message = update.message.text # Check if this is a reply to a previous message if update.message.reply_to_message: # Try to extract session_id from the replied message replied_text = update.message.reply_to_message.text or update.message.reply_to_message.caption or "" # Extract session_id using multiple patterns import re session_id = None # Pattern 1: [`session-id`] match = re.search(r'\[`([a-zA-Z0-9_-]+)`\]', replied_text) if match: session_id = match.group(1) # Pattern 2: [session-id] if not session_id: match = re.search(r'\[([a-zA-Z0-9_-]+)\]', replied_text) if match: session_id = match.group(1) # Pattern 3: `session-id` (without brackets) - like: ✅ `test-new` if not session_id: match = re.search(r'`([a-zA-Z0-9_-]+)`', replied_text) if match: candidate = match.group(1) if registry.exists(candidate): session_id = candidate # Pattern 4: Search for any known session ID in the text if not session_id: all_sessions = registry.list_all() for sid in all_sessions.keys(): if sid in replied_text: session_id = sid break if session_id and registry.exists(session_id): message_queue.push(session_id, message) # Silent - user will see Claude's response # Add a subtle reaction to confirm try: await update.message.set_reaction("👍") except: pass # Reaction might not be supported return # If we couldn't extract session_id, show available sessions all_sessions = registry.list_all() text = "⚠️ 无法识别回复的会话。\n\n" if all_sessions: text += "可用会话:\n" for sid in all_sessions.keys(): text += f" /to {sid} {message}\n" else: text += "当前没有活跃会话。" await update.message.reply_text(text) return # Normal message routing (not a reply) # Find sessions waiting for reply waiting_sessions = registry.list_waiting() if len(waiting_sessions) == 0: await update.message.reply_text( "没有会话在等待回复。\n" "使用 /to <session_id> <消息> 向指定会话发送消息。" ) elif len(waiting_sessions) == 1: # Auto-route to the only waiting session session_id = list(waiting_sessions.keys())[0] message_queue.push(session_id, message) # Silent confirmation - user will see Claude's response directly # No need to confirm, reduces noise else: # Multiple sessions waiting, ask user to choose text = "多个会话在等待回复,请选择:\n\n" for sid in waiting_sessions.keys(): text += f"/to {sid} {message}\n" await update.message.reply_text(text) async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle callback queries from inline keyboards""" query = update.callback_query await query.answer() data = query.data # Handle photo callback: photo_{photo_id}_{session_id} if data.startswith("photo_"): import json parts = data.split("_") # photo, photo_id, session_id if len(parts) >= 3: photo_id = parts[1] session_id = parts[2] # Load photo data try: with open(f"/tmp/photo_{photo_id}.json", 'r') as f: photo_data = json.load(f) message_queue.push(session_id, photo_data['message']) await query.edit_message_text( f"✅ 图片已发送到会话 `{session_id}`", parse_mode="Markdown" ) # Clean up temp file try: os.remove(f"/tmp/photo_{photo_id}.json") except: pass except Exception as e: await query.edit_message_text(f"❌ 发送失败: {str(e)}") async def cmd_delete(update: Update, context: ContextTypes.DEFAULT_TYPE): """Delete a session""" if not context.args: await update.message.reply_text("用法: /delete <session_id>") return session_id = context.args[0] session = registry.get(session_id) if not session: await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return # Try to send exit command message_queue.push(session_id, "exit") await update.message.reply_text( f"🗑️ 正在删除会话 `{session_id}`...\n\n" f"已发送退出命令。\n" f"- 如果 AI 助手正在运行,它会收到退出指令\n" f"- 如果进程已关闭,会话将被清理\n\n" f"会话将在 10 秒后从列表中移除。", parse_mode="Markdown" ) # Wait a bit for the session to process exit await asyncio.sleep(10) # Remove session from registry if registry.exists(session_id): registry.sessions.pop(session_id, None) registry._save_to_file() await update.message.reply_text( f"✅ 会话 `{session_id}` 已删除", parse_mode="Markdown" ) async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show help message""" help_text = """🤖 Telegram MCP Server 使用帮助 📋 会话管理 /sessions - 列出所有活跃会话 /status <session_id> - 查看会话状态 /delete <session_id> - 删除会话(发送退出命令) 💬 消息发送 /to <session_id> <消息> - 向指定会话发送消息 直接发送消息 - 自动发送到唯一等待的会话 📄 文件操作 /file <session_id> <file_path> - 查看文件内容(带语法高亮) /file <session_id> <file_path> download - 下载文件 💬 自然语言请求(在无人值守模式下) "查看 src/main.py" - AI 会自动发送文件 "发送 config.json 给我" - AI 会自动发送 "展示刚才修改的代码" - AI 会智能发送代码段 ❓ 帮助 /help - 显示此帮助信息 💡 提示 - 如果只有一个会话在等待回复,直接发送消息即可 - 会话 ID 通常是项目目录名 - 使用 TELEGRAM_SESSION 环境变量自定义会话名 - 在无人值守模式下,AI 会智能判断何时发送代码/文件 """ await update.message.reply_text(help_text) def setup_bot(token: str) -> Application: """Setup and configure bot""" application = Application.builder().token(token).build() # Add command handlers application.add_handler(CommandHandler("sessions", cmd_sessions)) application.add_handler(CommandHandler("status", cmd_status)) application.add_handler(CommandHandler("to", cmd_to)) application.add_handler(CommandHandler("file", cmd_file)) application.add_handler(CommandHandler("delete", cmd_delete)) application.add_handler(CommandHandler("help", cmd_help)) application.add_handler(CommandHandler("start", cmd_help)) # Add callback query handler (for inline keyboard buttons) application.add_handler(CallbackQueryHandler(handle_callback)) # Add photo handler application.add_handler( MessageHandler(filters.PHOTO, handle_photo) ) # Add plain message handler application.add_handler( MessageHandler(filters.TEXT & ~filters.COMMAND, handle_plain_message) ) return application

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/batianVolyc/telegram-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server