Skip to main content
Glama
bot.py (27.2 kB)
"""Telegram Bot handlers""" import os import logging from datetime import datetime from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes from .session import registry from .message_queue import message_queue from . import config logger = logging.getLogger(__name__) # User context management user_contexts = {} # {user_id: {"active_session": session_id}} pending_messages = {} # {user_id: message_text} def format_time_ago(iso_time: str) -> str: """Format ISO timestamp as human-readable time ago""" try: dt = datetime.fromisoformat(iso_time) now = datetime.now() diff = (now - dt).total_seconds() if diff < 60: return f"{int(diff)}秒前" elif diff < 3600: return f"{int(diff / 60)}分钟前" elif diff < 86400: return f"{int(diff / 3600)}小时前" else: return f"{int(diff / 86400)}天前" except: return iso_time async def cmd_sessions(update: Update, context: ContextTypes.DEFAULT_TYPE): """List all active sessions""" sessions = registry.list_all() if not sessions: await update.message.reply_text("没有活跃会话") return text = "📋 活跃会话:\n\n" for i, (sid, session) in enumerate(sessions.items(), 1): status_emoji = { "running": "▶️", "waiting": "⏸️", "idle": "⏹️" }.get(session.status, "❓") text += ( f"{i}️⃣ {status_emoji} `{sid}`\n" f" 📁 `{session.project_path}`\n" f" 🕐 {format_time_ago(session.last_active)}\n\n" ) await update.message.reply_text(text, parse_mode="Markdown") async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show session status""" if not context.args: await update.message.reply_text("用法: /status <session_id>") return session_id = context.args[0] session = registry.get(session_id) if not session: await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return status_emoji = { "running": "▶️ 运行中", "waiting": "⏸️ 等待回复", "idle": "⏹️ 空闲" }.get(session.status, "❓ 未知") status_text = f"""📊 会话状态 ━━━━━━━━━━━━━━━━ 🆔 
ID: `{session.session_id}` 📁 项目: `{session.project_path}` ⏱️ 状态: {status_emoji} 🕐 最后活动: {format_time_ago(session.last_active)} """ if session.last_message: status_text += f"💬 最后消息: {session.last_message[:100]}" await update.message.reply_text(status_text, parse_mode="Markdown") async def cmd_to(update: Update, context: ContextTypes.DEFAULT_TYPE): """Send message to specific session (improved with session locking)""" user_id = update.effective_user.id if not context.args: await update.message.reply_text("用法: /to <session_id> [消息]") return session_id = context.args[0] # Check if session exists if not registry.exists(session_id): await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return # If no message, set as active session if len(context.args) == 1: if user_id not in user_contexts: user_contexts[user_id] = {} user_contexts[user_id]['active_session'] = session_id await update.message.reply_text( f"📌 已切换到会话: `{session_id}`\n\n" f"✅ 后续消息将自动发送到此会话\n" f"💡 使用 `/keep off` 取消锁定", parse_mode="Markdown" ) return # Has message, send and set as active session message = " ".join(context.args[1:]) message_queue.push(session_id, message) # Also set as active session if user_id not in user_contexts: user_contexts[user_id] = {} user_contexts[user_id]['active_session'] = session_id await update.message.reply_text( f"✅ 消息已发送到 `{session_id}`\n\n" f"💬 {message}\n\n" f"📌 已锁定此会话,后续消息将自动发送到这里", parse_mode="Markdown" ) async def cmd_file(update: Update, context: ContextTypes.DEFAULT_TYPE): """View or download project file""" if len(context.args) < 2: await update.message.reply_text( "用法: /file <session_id> <file_path> [download]\n\n" "示例:\n" "/file testtg src/main.py - 查看文件内容\n" "/file testtg config.json download - 下载文件" ) return session_id = context.args[0] file_path = context.args[1] download_mode = len(context.args) > 2 and context.args[2] == "download" session = registry.get(session_id) if not session: await update.message.reply_text(f"❌ 会话 `{session_id}` 
不存在", parse_mode="Markdown") return full_path = os.path.join(session.project_path, file_path) if not os.path.exists(full_path): await update.message.reply_text(f"❌ 文件不存在: `{file_path}`", parse_mode="Markdown") return # Check if it's a file (not directory) if not os.path.isfile(full_path): await update.message.reply_text(f"❌ 不是文件: `{file_path}`", parse_mode="Markdown") return # If download mode, always send as document if download_mode: try: with open(full_path, 'rb') as f: await update.message.reply_document( document=f, filename=os.path.basename(file_path), caption=f"📄 {file_path}" ) except Exception as e: await update.message.reply_text(f"❌ 发送文件失败: {str(e)}") return # Otherwise, try to display content try: with open(full_path, 'r', encoding='utf-8') as f: content = f.read() # If file is too large, send as document if len(content) > 4000: with open(full_path, 'rb') as f: await update.message.reply_document( document=f, filename=os.path.basename(file_path), caption=f"📄 {file_path} (文件过大,作为附件发送)\n💡 提示: 可直接下载查看" ) else: # Detect language for syntax highlighting ext = os.path.splitext(file_path)[1] lang_map = { ".py": "python", ".js": "javascript", ".ts": "typescript", ".go": "go", ".rs": "rust", ".java": "java", ".c": "c", ".cpp": "cpp", ".h": "c", ".hpp": "cpp", ".sh": "bash", ".json": "json", ".yaml": "yaml", ".yml": "yaml", ".xml": "xml", ".html": "html", ".css": "css", ".md": "markdown" } lang = lang_map.get(ext, "") # Truncate if still too long for Telegram if len(content) > 3800: content = content[:3800] + "\n\n... 
(已截断)" # Try to send with Markdown try: await update.message.reply_text( f"📄 `{file_path}`\n\n```{lang}\n{content}\n```", parse_mode="Markdown" ) except Exception: # Fallback: send as plain text if Markdown fails await update.message.reply_text( f"📄 {file_path}\n\n{content}" ) except UnicodeDecodeError: # Binary file, send as document try: with open(full_path, 'rb') as f: await update.message.reply_document( document=f, filename=os.path.basename(file_path), caption=f"📄 {file_path} (二进制文件)" ) except Exception as e: await update.message.reply_text(f"❌ 发送文件失败: {str(e)}") except Exception as e: await update.message.reply_text(f"❌ 读取文件失败: {str(e)}") async def handle_photo(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle photo messages""" # Get largest photo photo = update.message.photo[-1] caption = update.message.caption or "用户发送了一张图片" # Download photo try: file = await context.bot.get_file(photo.file_id) file_path = f"/tmp/telegram_photo_{photo.file_id}.jpg" await file.download_to_drive(file_path) # Format message with image info photo_message = f"[图片] {caption}\n文件路径: {file_path}" # Check if this is a reply to a message (priority) if update.message.reply_to_message: replied_text = update.message.reply_to_message.text or update.message.reply_to_message.caption or "" import re session_id = None # Try to extract session_id match = re.search(r'`([a-zA-Z0-9_-]+)`', replied_text) if match and registry.exists(match.group(1)): session_id = match.group(1) if not session_id: all_sessions = registry.list_all() for sid in all_sessions.keys(): if sid in replied_text: session_id = sid break if session_id: message_queue.push(session_id, photo_message) try: await update.message.set_reaction("👍") except: pass return # No reply, or couldn't extract session - find waiting sessions waiting_sessions = registry.list_waiting() if len(waiting_sessions) == 0: await update.message.reply_text("没有会话在等待。图片已保存到:\n" + file_path) elif len(waiting_sessions) == 1: session_id = 
list(waiting_sessions.keys())[0] message_queue.push(session_id, photo_message) try: await update.message.set_reaction("👍") except: pass else: # Store photo data first import json import hashlib # Use hash to create short ID photo_id = hashlib.md5(photo.file_id.encode()).hexdigest()[:8] photo_data = { 'file_path': file_path, 'caption': caption, 'message': photo_message } with open(f"/tmp/photo_{photo_id}.json", 'w') as f: json.dump(photo_data, f) # Create buttons with short callback_data keyboard = [] for sid in waiting_sessions.keys(): keyboard.append([ InlineKeyboardButton( f"📤 发送到 {sid}", callback_data=f"photo_{photo_id}_{sid}" # Short format: photo_HASH_SESSION ) ]) reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_text( f"多个会话在等待,请选择:\n\n📷 {caption}", reply_markup=reply_markup ) except Exception as e: await update.message.reply_text(f"❌ 处理图片失败: {str(e)}") async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle non-command messages with smart routing""" user_id = update.effective_user.id message_text = update.message.text # Check if user has active session context if user_id in user_contexts and 'active_session' in user_contexts[user_id]: # Has active session, send directly session_id = user_contexts[user_id]['active_session'] await send_to_session(update, session_id, message_text) return # No active session, check how many sessions exist sessions = registry.list_all() if not sessions: await update.message.reply_text( "❌ 没有活跃会话\n\n" "请先在 AI 编程工具中启动会话,或使用命令:\n" "• `/sessions` - 查看会话\n" "• `/help` - 查看帮助" ) return if len(sessions) == 1: # Only one session, send directly session_id = list(sessions.keys())[0] await send_to_session(update, session_id, message_text) return # Multiple sessions, show selection buttons pending_messages[user_id] = message_text keyboard = [] for sid, session in sessions.items(): status_emoji = { "running": "▶️", "waiting": "⏸️", "idle": "⏹️" }.get(session.status, "❓") button_text = 
f"{status_emoji} {sid}" keyboard.append([InlineKeyboardButton( button_text, callback_data=f"send_to:{sid}" )]) # Add cancel button keyboard.append([InlineKeyboardButton("❌ 取消", callback_data="cancel")]) reply_markup = InlineKeyboardMarkup(keyboard) # Truncate message for display display_message = message_text[:100] + "..." if len(message_text) > 100 else message_text await update.message.reply_text( f"📨 你的消息:\n\n{display_message}\n\n" f"请选择要发送到的会话:", reply_markup=reply_markup ) async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle button callbacks""" query = update.callback_query await query.answer() user_id = update.effective_user.id data = query.data if data == "cancel": await query.edit_message_text("❌ 已取消") pending_messages.pop(user_id, None) return if data.startswith("send_to:"): session_id = data.split(":", 1)[1] message_text = pending_messages.get(user_id) if not message_text: await query.edit_message_text("❌ 消息已过期,请重新发送") return # Send message if not registry.exists(session_id): await query.edit_message_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return message_queue.push(session_id, message_text) await query.edit_message_text( f"✅ 消息已发送到 `{session_id}`\n\n" f"💬 {message_text}", parse_mode="Markdown" ) pending_messages.pop(user_id, None) return if data.startswith("exec:"): # Handle action button clicks action_id = data.split(":", 1)[1] await handle_action_execution(query, action_id) return async def handle_action_execution(query, action_id: str): """Handle execution of action buttons""" try: import json from pathlib import Path # Load actions from file actions_file = Path.home() / ".telegram-mcp-actions.json" if not actions_file.exists(): await query.edit_message_text("❌ 操作已过期") return with open(actions_file, 'r') as f: actions = json.load(f) if action_id not in actions: await query.edit_message_text("❌ 操作已过期") return action_data = actions[action_id] session_id = action_data["session_id"] command = 
action_data["command"] # Check if session still exists if not registry.exists(session_id): await query.edit_message_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return # Send command to session message_queue.push(session_id, command) # Update message to show execution await query.edit_message_text( f"✅ 已执行操作\n\n" f"📤 发送到: `{session_id}`\n" f"💬 指令: {command}", parse_mode="Markdown" ) # Remove executed action actions.pop(action_id, None) with open(actions_file, 'w') as f: json.dump(actions, f, indent=2) except Exception as e: logger.error(f"Failed to execute action {action_id}: {e}") await query.edit_message_text(f"❌ 执行失败: {str(e)}") async def send_to_session(update: Update, session_id: str, message: str): """Send message to session""" if not registry.exists(session_id): await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return message_queue.push(session_id, message) await update.message.reply_text( f"✅ 消息已发送到 `{session_id}`\n\n" f"💬 {message}", parse_mode="Markdown" ) async def handle_plain_message(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle plain text messages (auto-route to waiting sessions)""" message = update.message.text # Check if this is a reply to a previous message if update.message.reply_to_message: # Try to extract session_id from the replied message replied_text = update.message.reply_to_message.text or update.message.reply_to_message.caption or "" # Extract session_id using multiple patterns import re session_id = None # Pattern 1: [`session-id`] match = re.search(r'\[`([a-zA-Z0-9_-]+)`\]', replied_text) if match: session_id = match.group(1) # Pattern 2: [session-id] if not session_id: match = re.search(r'\[([a-zA-Z0-9_-]+)\]', replied_text) if match: session_id = match.group(1) # Pattern 3: `session-id` (without brackets) - like: ✅ `test-new` if not session_id: match = re.search(r'`([a-zA-Z0-9_-]+)`', replied_text) if match: candidate = match.group(1) if registry.exists(candidate): 
session_id = candidate # Pattern 4: Search for any known session ID in the text if not session_id: all_sessions = registry.list_all() for sid in all_sessions.keys(): if sid in replied_text: session_id = sid break if session_id and registry.exists(session_id): message_queue.push(session_id, message) # Silent - user will see Claude's response # Add a subtle reaction to confirm try: await update.message.set_reaction("👍") except: pass # Reaction might not be supported return # If we couldn't extract session_id, show available sessions all_sessions = registry.list_all() text = "⚠️ 无法识别回复的会话。\n\n" if all_sessions: text += "可用会话:\n" for sid in all_sessions.keys(): text += f" /to {sid} {message}\n" else: text += "当前没有活跃会话。" await update.message.reply_text(text) return # Normal message routing (not a reply) # Find sessions waiting for reply waiting_sessions = registry.list_waiting() if len(waiting_sessions) == 0: await update.message.reply_text( "没有会话在等待回复。\n" "使用 /to <session_id> <消息> 向指定会话发送消息。" ) elif len(waiting_sessions) == 1: # Auto-route to the only waiting session session_id = list(waiting_sessions.keys())[0] message_queue.push(session_id, message) # Silent confirmation - user will see Claude's response directly # No need to confirm, reduces noise else: # Multiple sessions waiting, ask user to choose text = "多个会话在等待回复,请选择:\n\n" for sid in waiting_sessions.keys(): text += f"/to {sid} {message}\n" await update.message.reply_text(text) async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle callback queries from inline keyboards""" query = update.callback_query await query.answer() data = query.data # Handle photo callback: photo_{photo_id}_{session_id} if data.startswith("photo_"): import json parts = data.split("_") # photo, photo_id, session_id if len(parts) >= 3: photo_id = parts[1] session_id = parts[2] # Load photo data try: with open(f"/tmp/photo_{photo_id}.json", 'r') as f: photo_data = json.load(f) 
message_queue.push(session_id, photo_data['message']) await query.edit_message_text( f"✅ 图片已发送到会话 `{session_id}`", parse_mode="Markdown" ) # Clean up temp file try: os.remove(f"/tmp/photo_{photo_id}.json") except: pass except Exception as e: await query.edit_message_text(f"❌ 发送失败: {str(e)}") async def cmd_delete(update: Update, context: ContextTypes.DEFAULT_TYPE): """Delete a session""" if not context.args: await update.message.reply_text("用法: /delete <session_id>") return session_id = context.args[0] session = registry.get(session_id) if not session: await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return # Try to send exit command message_queue.push(session_id, "exit") await update.message.reply_text( f"🗑️ 正在删除会话 `{session_id}`...\n\n" f"已发送退出命令。\n" f"- 如果 AI 助手正在运行,它会收到退出指令\n" f"- 如果进程已关闭,会话将被清理\n\n" f"会话将在 10 秒后从列表中移除。", parse_mode="Markdown" ) # Wait a bit for the session to process exit await asyncio.sleep(10) # Remove session from registry if registry.exists(session_id): registry.sessions.pop(session_id, None) registry._save_to_file() await update.message.reply_text( f"✅ 会话 `{session_id}` 已删除", parse_mode="Markdown" ) async def cmd_keep(update: Update, context: ContextTypes.DEFAULT_TYPE): """Keep sending messages to a specific session""" user_id = update.effective_user.id if not context.args: # Show current active session if user_id in user_contexts and 'active_session' in user_contexts[user_id]: session_id = user_contexts[user_id]['active_session'] await update.message.reply_text( f"📌 当前活跃会话: `{session_id}`\n\n" f"使用 `/keep off` 取消锁定\n" f"使用 `/keep <session_id>` 切换会话", parse_mode="Markdown" ) else: await update.message.reply_text( "❌ 没有活跃会话\n\n" "使用 `/keep <session_id>` 设置活跃会话" ) return session_id = context.args[0] # Special command: cancel lock if session_id.lower() in ['off', 'cancel', 'clear']: if user_id in user_contexts: user_contexts[user_id].pop('active_session', None) await update.message.reply_text("✅ 已取消会话锁定") 
return # Check if session exists if not registry.exists(session_id): await update.message.reply_text(f"❌ 会话 `{session_id}` 不存在", parse_mode="Markdown") return # Set active session if user_id not in user_contexts: user_contexts[user_id] = {} user_contexts[user_id]['active_session'] = session_id await update.message.reply_text( f"📌 已锁定会话: `{session_id}`\n\n" f"✅ 后续消息将自动发送到此会话\n" f"💡 使用 `/keep off` 取消锁定", parse_mode="Markdown" ) async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE): """Show help message""" help_text = """🤖 Telegram MCP Server 使用帮助 📋 会话管理 /sessions - 列出所有活跃会话 /status <session_id> - 查看会话状态 /delete <session_id> - 删除会话(发送退出命令) 💬 消息发送 /to <session_id> [消息] - 向指定会话发送消息(或锁定会话) /keep <session_id> - 锁定会话(后续消息自动发送) /keep off - 取消会话锁定 直接发送消息 - 自动发送到锁定的会话或唯一等待的会话 📄 文件操作 /file <session_id> <file_path> - 查看文件内容(带语法高亮) /file <session_id> <file_path> download - 下载文件 💬 自然语言请求(在无人值守模式下) "查看 src/main.py" - AI 会自动发送文件 "发送 config.json 给我" - AI 会自动发送 "展示刚才修改的代码" - AI 会智能发送代码段 ❓ 帮助 /help - 显示此帮助信息 💡 提示 - 如果只有一个会话在等待回复,直接发送消息即可 - 会话 ID 通常是项目目录名 - 使用 TELEGRAM_SESSION 环境变量自定义会话名 - 在无人值守模式下,AI 会智能判断何时发送代码/文件 """ await update.message.reply_text(help_text) def setup_bot(token: str) -> Application: """Setup and configure bot""" application = Application.builder().token(token).build() # Add command handlers application.add_handler(CommandHandler("sessions", cmd_sessions)) application.add_handler(CommandHandler("status", cmd_status)) application.add_handler(CommandHandler("to", cmd_to)) application.add_handler(CommandHandler("keep", cmd_keep)) application.add_handler(CommandHandler("file", cmd_file)) application.add_handler(CommandHandler("delete", cmd_delete)) application.add_handler(CommandHandler("help", cmd_help)) application.add_handler(CommandHandler("start", cmd_help)) # Add callback query handler (for inline keyboard buttons) - must be before other handlers application.add_handler(CallbackQueryHandler(button_callback)) # Add photo handler 
application.add_handler( MessageHandler(filters.PHOTO, handle_photo) ) # Add smart message handler (with session context and selection) application.add_handler( MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message) ) return application

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/batianVolyc/telegram-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.