Skip to main content
Glama
server.py47.8 kB
"""MCP Server implementation""" import os import time import logging import asyncio from typing import Optional from mcp.server import Server from mcp.types import Tool, TextContent from .session import registry from .message_queue import message_queue from . import config logger = logging.getLogger(__name__) # Global telegram bot instance (set by main) telegram_bot = None def get_session_id() -> str: """ Get session ID for current Claude Code instance Priority: TELEGRAM_SESSION env var > current directory name """ if config.TELEGRAM_SESSION_ID: return config.TELEGRAM_SESSION_ID # Use current working directory name as session ID cwd = os.getcwd() session_id = os.path.basename(cwd) return session_id def get_project_path() -> str: """Get absolute path of current project""" return os.getcwd() async def ensure_session_registered(session_id: str) -> None: """ Ensure session is registered Lazy registration: only register when first tool is called """ if not registry.exists(session_id): project_path = get_project_path() chat_id = config.TELEGRAM_CHAT_ID session = registry.register(session_id, project_path, chat_id) logger.info(f"Registered session: {session_id} at {project_path}") # Send notification to Telegram if telegram_bot: try: message = ( f"✅ 新会话已启动\n" f"🆔 `{session_id}`\n" f"📁 `{project_path}`\n" f"使用 /to {session_id} <消息> 与之交互" ) await send_telegram_message(chat_id, message) except Exception as e: logger.error(f"Failed to send registration notification: {e}") def escape_markdown(text: str) -> str: """Escape special characters for Telegram Markdown""" # Characters that need escaping in Telegram Markdown special_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'] for char in special_chars: text = text.replace(char, f'\\{char}') return text async def send_telegram_message(chat_id: str, message: str, parse_mode: str = "Markdown") -> None: """Send message to Telegram (async) using HTTP API""" import httpx url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": chat_id, "text": message } # Only add parse_mode if it's not None if parse_mode: payload["parse_mode"] = parse_mode try: async with httpx.AsyncClient() as client: response = await client.post(url, json=payload, timeout=10.0) response.raise_for_status() except httpx.HTTPStatusError as e: if e.response.status_code == 400 and parse_mode: # Markdown parsing failed, retry without parse_mode logger.warning(f"Markdown parsing failed (400 Bad Request), retrying as plain text") payload.pop("parse_mode", None) async with httpx.AsyncClient() as client: response = await client.post(url, json=payload, timeout=10.0) response.raise_for_status() else: logger.error(f"Failed to send Telegram message: {e}") raise except Exception as e: logger.error(f"Failed to send Telegram message: {e}") raise def get_poll_interval(elapsed_seconds: float) -> int: """ Get polling interval based on elapsed time Progressive slowdown: 30s -> 60s -> 120s """ if elapsed_seconds < config.POLL_THRESHOLDS[0]: # < 10 minutes return config.POLL_INTERVALS[0] # 30 seconds elif elapsed_seconds < config.POLL_THRESHOLDS[1]: # < 1 hour return config.POLL_INTERVALS[1] # 60 seconds else: return config.POLL_INTERVALS[2] # 120 seconds # Create MCP server server = Server("telegram") @server.list_tools() async def list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="telegram_notify", description=""" ⚠️ 这是 Telegram MCP Server 的通知工具 用于向用户的 Telegram Bot 发送任务进度通知 ❌ 这不是通用的 Telegram 消息发送工具 ❌ 不能发送消息到任意 Telegram 用户或群组 ✅ 只能发送通知到配置的 Telegram Bot(用户会在 Telegram 中收到) 💡 推荐使用 telegram_notify_with_actions 代替此工具 telegram_notify_with_actions 提供动态按钮,用户体验更好 此工具适用于: - 简单的状态更新(不需要用户交互) - 快速通知(无需提供下一步建议) - 向后兼容旧代码 参数: - event: 事件类型(completed/error/question/progress) - summary: 简短总结(必填,200字以内) - details: 详细信息(可选) 示例: telegram_notify( event="completed", summary="修复了 auth.py:45 的空指针异常,所有测试通过" ) 💡 更好的选择:使用 telegram_notify_with_actions 提供智能建议按钮 """, inputSchema={ "type": "object", "properties": { "event": { "type": "string", "enum": ["completed", "error", "question", "progress"], "description": "事件类型" }, "summary": { "type": "string", "description": "简短总结(必填,200字以内)", "maxLength": 200 }, "details": { "type": "string", "description": "详细信息(可选)" } }, "required": ["event", "summary"] } ), Tool( name="telegram_notify_with_actions", description=""" ⭐ 推荐:发送带有动态操作按钮的智能通知到 Telegram ⚠️ 这是 Telegram MCP Server 的通知工具(增强版) 用于向用户的 Telegram Bot 发送任务进度通知,并提供智能操作建议 ❌ 这不是通用的 Telegram 消息发送工具 ❌ 不能发送消息到任意 Telegram 用户或群组 ✅ 只能发送通知到配置的 Telegram Bot(用户会在 Telegram 中收到) 这是 telegram_notify 的增强版本,可以根据当前情况为用户提供智能的下一步操作建议。 优势: - ✅ 提供 2-4 个智能操作按钮,用户一键执行 - ✅ 按钮是建议,不强制,用户可以忽略 - ✅ 自动添加提示:"💡 这些是建议的下一步,你也可以直接发送其他指令" - ✅ 即使不提供按钮(actions=[]),也可以正常使用 参数: - event: 事件类型(completed/error/question/progress) - summary: 简短总结(必填,200字以内) - details: 详细信息(可选,建议填写) - actions: 操作按钮列表(可选,最多 4 个) actions 格式: [ { "text": "按钮显示文字", "action": "用户点击后发送的指令", "emoji": "可选的 emoji" } ] 使用场景: 1. 任务完成 - 提供下一步建议: telegram_notify_with_actions( event="completed", summary="✅ 完成用户认证模块\\n- 实现登录/注册\\n- JWT验证\\n- 15个测试通过", details="修改文件:auth.py, user.py\\n测试覆盖率:95%", actions=[ {"text": "实现权限管理", "action": "继续实现权限管理模块,包括角色和权限分配", "emoji": "💡"}, {"text": "优化性能", "action": "优化数据库查询性能,添加缓存层", "emoji": "⚡"} ] ) 2. 遇到错误 - 提供解决方案: telegram_notify_with_actions( event="error", summary="❌ 导入错误\\nModuleNotFoundError: No module named 'jwt'", details="缺少 PyJWT 依赖包", actions=[ {"text": "自动修复", "action": "运行 pip install PyJWT 并重试", "emoji": "🔧"}, {"text": "添加到依赖", "action": "将 PyJWT 添加到 requirements.txt", "emoji": "📝"}, {"text": "显示错误代码", "action": "显示出错位置的代码", "emoji": "🔍"} ] ) 3. 需要决策 - 提供选项: telegram_notify_with_actions( event="question", summary="❓ 数据库选择\\n需要选择数据库方案", details="方案A:PostgreSQL - 功能强大\\n方案B:SQLite - 简单轻量", actions=[ {"text": "PostgreSQL(推荐)", "action": "使用 PostgreSQL,我会配置 docker-compose", "emoji": "1️⃣"}, {"text": "SQLite", "action": "使用 SQLite,适合小型项目", "emoji": "2️⃣"} ] ) 按钮设计原则: - 明确具体:"💡 优化这 3 处性能瓶颈" 而不是 "优化" - 标记推荐:用 💡 标记推荐选项,但不强迫用户选择 - 数量适中:最多 4 个按钮,避免选择困难 - 可选性:用户可以忽略按钮,直接发送其他指令 注意: - 按钮是建议,不是强制选择 - 如果没有明确的下一步,可以不提供按钮(actions=[]) - 消息末尾会自动添加提示:"💡 这些是建议的下一步,你也可以直接发送其他指令" - 简单确认或自动进行的过程不需要按钮 """, inputSchema={ "type": "object", "properties": { "event": { "type": "string", "enum": ["completed", "error", "question", "progress"], "description": "事件类型" }, "summary": { "type": "string", "description": "简短总结(必填,200字以内)", "maxLength": 200 }, "details": { "type": "string", "description": "详细信息(可选,建议填写)" }, "actions": { "type": "array", "description": "操作按钮列表(可选,最多 4 个)", "items": { "type": "object", "properties": { "text": { "type": "string", "description": "按钮显示文字" }, "action": { "type": "string", "description": "用户点击后发送的指令" }, "emoji": { "type": "string", "description": "可选的 emoji" } }, "required": ["text", "action"] }, "maxItems": 4 } }, "required": ["event", "summary"] } ), Tool( name="telegram_wait_reply", description=""" 等待用户回复(阻塞式轮询) 参数: - max_wait: 最长等待时间(秒),默认604800(7天/1周) 行为: - 前10分钟:每30秒检查一次 - 10分钟-1小时:每60秒检查一次 - 1小时以上:每120秒检查一次 - 用户可以按 Ctrl+C 中断等待 - 超时返回 timeout: true 返回: - reply: 用户回复内容 - timeout: 是否超时 - interrupted: 是否被用户中断 """, inputSchema={ "type": "object", "properties": { "max_wait": { "type": "integer", "description": "最长等待时间(秒),默认604800(7天)", "default": 604800 } } } ), Tool( name="telegram_send", description=""" 发送自由格式消息到 Telegram(不推荐,请优先使用 telegram_notify) 自动处理: - 超过300字自动截断 - 会提示使用 telegram_notify 发送结构化消息 """, inputSchema={ "type": "object", "properties": { "message": { "type": "string", "description": "消息内容" } }, "required": ["message"] } ), Tool( name="telegram_send_code", description=""" 发送代码段到 Telegram(带语法高亮) ⚠️ 使用场景(仅在必要时使用): - 遇到关键错误需要展示问题代码 - 修复了重要 bug,需要展示修复方案 - 用户明确要求查看某段代码 - 需要用户 review 关键代码片段 ❌ 不要使用的场景: - 一般性任务完成(使用 telegram_notify) - 创建了新文件(使用 telegram_send_file) - 例行操作(使用 telegram_notify 总结即可) 参数: - code: 代码内容(建议不超过50行) - language: 编程语言(python/javascript/go/rust/bash/json/yaml等) - caption: 可选说明文字(建议填写,解释发送这段代码的原因) 示例: telegram_send_code( code="def hello():\\n print('Hello')", language="python", caption="修复了空指针异常的关键函数" ) """, inputSchema={ "type": "object", "properties": { "code": { "type": "string", "description": "代码内容" }, "language": { "type": "string", "description": "编程语言(python/javascript/go/rust/bash/json/yaml等)", "default": "" }, "caption": { "type": "string", "description": "可选说明文字" } }, "required": ["code"] } ), Tool( name="telegram_send_image", description=""" 发送图片到 Telegram ⚠️ 使用场景: - 生成了图表、可视化结果 - 创建了截图、示意图 - 需要用户查看图片内容 - 图片格式:PNG, JPG, GIF, WebP 等 参数: - image_path: 图片文件路径(相对于项目目录或绝对路径) - caption: 可选说明文字 示例: telegram_send_image( image_path="output/chart.png", caption="性能测试结果图表" ) """, inputSchema={ "type": "object", "properties": { "image_path": { "type": "string", "description": "图片文件路径" }, "caption": { "type": "string", "description": "可选说明文字" } }, "required": ["image_path"] } ), Tool( name="telegram_send_file", description=""" 发送文件到 Telegram ⚠️ 使用场景(仅在必要时使用): - 用户明确要求查看某个文件:"查看 config.json"、"发送 main.py 给我" - 创建了重要的输出文件需要用户确认(如配置文件、报告等) - 生成了图表、图片等需要展示的文件 - 需要用户下载某个文件进行后续操作 ❌ 不要使用的场景: - 创建普通代码文件(用 telegram_notify 总结即可) - 修改了文件但不需要用户查看内容 - 例行的文件操作 优先级: 1. 优先使用 telegram_notify 总结文件变更 2. 如果用户明确要求,或确实需要查看,才发送文件 3. 对于代码文件,如果只需要展示关键片段,优先使用 telegram_send_code 参数: - file_path: 文件路径(相对于项目目录或绝对路径) - caption: 可选说明文字 支持: - 文本文件、代码文件 - 图片、PDF、文档等任意文件类型 - 自动处理大文件 示例: telegram_send_file( file_path="src/main.py", caption="修复后的主文件(用户要求查看)" ) """, inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "文件路径" }, "caption": { "type": "string", "description": "可选说明文字" } }, "required": ["file_path"] } ), Tool( name="telegram_get_context_info", description=""" 获取并发送当前会话的上下文信息到 Telegram ⚠️ 重要:调用此工具时,你应该: 1. 检查系统提供的 token 使用量信息(<budget:token_budget> 标签) 2. 将 token 使用情况包含在发送的信息中 3. 发送完整的上下文信息(会话信息 + token 使用情况) 返回信息应包括: - 会话 ID 和项目路径 - Token 使用量(已使用 / 总量 / 剩余 / 使用率) - 会话运行时间 - 系统信息 - Telegram 配置 示例输出格式: 📊 会话上下文信息 🆔 会话: testtg 📁 项目: /path/to/project 💾 Token 使用: - 已使用: 41,853 tokens - 总容量: 1,000,000 tokens - 剩余: 958,147 tokens - 使用率: 4.2% ⏱️ 运行时间: 15 分钟 🖥️ 系统: Darwin 24.6.0 🐍 Python: 3.14.0 """, inputSchema={ "type": "object", "properties": { "token_used": { "type": "integer", "description": "已使用的 token 数量(从系统预算信息中获取)" }, "token_total": { "type": "integer", "description": "总 token 容量(从系统预算信息中获取)", "default": 1000000 } } } ), Tool( name="telegram_unattended_mode", description=""" ⚠️ 这是 Telegram MCP Server 的无人值守模式工具 用于等待用户通过 Telegram Bot 发送的下一步指令 ❌ 这不是通用的 Telegram 操作工具 ❌ 不用于发送 Telegram 消息(使用 telegram_notify 系列工具) ❌ 不用于管理 Telegram 群组或频道 ✅ 正确用途:远程任务循环 - 等待用户通过 Telegram 发送指令 工作流程: 1. 执行当前任务 2. 使用 telegram_notify_with_actions 发送结果(带智能按钮) 3. 调用 telegram_unattended_mode 等待用户通过 Telegram 发送的下一步指令 4. 收到指令后执行,重复循环 示例场景: 用户说:"进入无人值守模式,任务:分析项目结构" 你应该: 1. 分析项目结构 2. 调用 telegram_notify_with_actions 发送分析结果 3. 调用 telegram_unattended_mode 等待下一步指令 4. 用户在 Telegram 中发送"优化性能" 5. 你收到指令,执行优化 6. 重复步骤 2-5 ⚠️ 重要: - 完成任务后必须调用通知工具发送结果 - telegram_unattended_mode 本身不发送消息,只等待 - 这样用户每次只收到任务结果,不会有重复的等待提示 📋 推荐使用 telegram_notify_with_actions 发送结果: ⭐ 最佳实践(带智能按钮): telegram_notify_with_actions( event="completed", summary="✅ 完成代码审查\\n- 发现 3 个可优化点\\n- 代码质量:B+", actions=[ {"text": "💡 优化这 3 处", "action": "自动优化发现的问题"}, {"text": "📊 查看详情", "action": "显示详细的优化建议"} ] ) ✅ 简单通知(无按钮): telegram_notify_with_actions( event="completed", summary="修复了 auth.py 的空指针异常,测试通过", actions=[] # 不提供按钮 ) 或使用基础版本: telegram_notify( event="completed", summary="创建了 3 个文件:main.py, utils.py, test.py" ) ⚠️ 仅在必要时发送代码/文件: - 遇到无法自动修复的错误 → telegram_send_code 展示错误代码 - 用户明确要求 → telegram_send_file 发送文件 - 修复关键 bug → telegram_send_code 展示修复对比 🎯 智能判断示例: - 任务完成 → telegram_notify_with_actions(带下一步建议按钮) - 遇到错误 → telegram_notify_with_actions(带修复方案按钮) - 需要决策 → telegram_notify_with_actions(带选项按钮) - 简单更新 → telegram_notify(无按钮) 退出方式: - Telegram 发送 "退出" 或 "exit" - Claude Code 按 Ctrl+C 或 ESC 轮询策略: - 前10分钟:每30秒检查一次 - 10分钟-1小时:每60秒检查一次 - 1小时以上:每120秒检查一次 参数: - current_status: 当前任务状态的简短总结(1-2句话) - max_wait: 每次等待的最长时间(秒),默认604800(7天) - silent: 静默模式(不发送等待提示,默认 false) - 首次进入时使用 false(发送提示) - 后续循环使用 true(减少噪音) 返回: - next_instruction: 用户的下一步指令 - should_exit: 是否应该退出无人值守模式 - interrupted: 是否被用户中断(Ctrl+C/ESC) """, inputSchema={ "type": "object", "properties": { "current_status": { "type": "string", "description": "当前任务状态描述" }, "max_wait": { "type": "integer", "description": "最长等待时间(秒),默认604800(7天)", "default": 604800 } }, "required": [] } ) ] @server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" # Validate configuration try: config.validate_config() except ValueError as e: return [TextContent(type="text", text=f"配置错误: {str(e)}")] session_id = get_session_id() # Ensure session is registered (lazy registration) await ensure_session_registered(session_id) session = registry.get(session_id) if name == "telegram_notify": return await handle_telegram_notify(session, arguments) elif name == "telegram_notify_with_actions": return await handle_telegram_notify_with_actions(session, arguments) elif name == "telegram_wait_reply": return await handle_telegram_wait_reply(session, arguments) elif name == "telegram_send": return await handle_telegram_send(session, arguments) elif name == "telegram_send_code": return await handle_telegram_send_code(session, arguments) elif name == "telegram_send_image": return await handle_telegram_send_image(session, arguments) elif name == "telegram_send_file": return await handle_telegram_send_file(session, arguments) elif name == "telegram_get_context_info": return await handle_telegram_get_context_info(session, arguments) elif name == "telegram_unattended_mode": return await handle_telegram_unattended_mode(session, arguments) else: return [TextContent(type="text", text=f"Unknown tool: {name}")] async def handle_telegram_notify(session, arguments: dict) -> list[TextContent]: """Handle telegram_notify tool""" event = arguments.get("event") summary = arguments.get("summary", "") details = arguments.get("details", "") # Validate summary length if len(summary) > 200: return [TextContent( type="text", text="错误: summary 过长,请精炼到200字以内" )] # Format message emoji = { "completed": "✅", "error": "❌", "question": "❓", "progress": "⏳" } message = f"{emoji.get(event, '🔔')} [`{session.session_id}`]\n{summary}" if details: message += f"\n\n━━━━━━━━━━━━\n📝 详情:\n{details}" # Update session session.last_message = summary session.update_activity() registry.update_session(session) # Save to shared storage # Send to Telegram try: await send_telegram_message(session.chat_id, message) return [TextContent( type="text", text=f"✅ 已发送通知到 Telegram (会话: {session.session_id})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送失败: {str(e)}" )] async def handle_telegram_notify_with_actions(session, arguments: dict) -> list[TextContent]: """Handle telegram_notify_with_actions tool""" event = arguments.get("event") summary = arguments.get("summary", "") details = arguments.get("details", "") actions = arguments.get("actions", []) # Validate summary length if len(summary) > 200: return [TextContent( type="text", text="错误: summary 过长,请精炼到200字以内" )] # Validate actions count if len(actions) > 4: return [TextContent( type="text", text="错误: 最多只能提供 4 个操作按钮" )] # Format message emoji_map = { "completed": "✅", "error": "❌", "question": "❓", "progress": "⏳" } message = f"{emoji_map.get(event, '🔔')} [`{session.session_id}`]\n{summary}" if details: message += f"\n\n━━━━━━━━━━━━\n📝 详情:\n{details}" # Add hint about buttons if actions: message += "\n\n💡 这些是建议的下一步,你也可以直接发送其他指令" # Update session session.last_message = summary session.update_activity() registry.update_session(session) # Send to Telegram with buttons try: import httpx import json import hashlib import time from pathlib import Path # Create inline keyboard keyboard = [] action_store = {} for idx, action in enumerate(actions): emoji_prefix = action.get("emoji", "") text = f"{emoji_prefix} {action['text']}" if emoji_prefix else action['text'] # Generate unique action ID action_id = hashlib.md5( f"{session.session_id}:{time.time()}:{idx}".encode() ).hexdigest()[:16] # Store action command action_store[action_id] = { "session_id": session.session_id, "command": action["action"], "timestamp": time.time() } keyboard.append([{ "text": text, "callback_data": f"exec:{action_id}" }]) # Save action store to a temporary file if action_store: actions_file = Path.home() / ".telegram-mcp-actions.json" # Load existing actions existing_actions = {} if actions_file.exists(): try: with open(actions_file, 'r') as f: existing_actions = json.load(f) except Exception: pass # Merge and save existing_actions.update(action_store) # Clean old actions (older than 1 hour) current_time = time.time() existing_actions = { k: v for k, v in existing_actions.items() if current_time - v.get("timestamp", 0) < 3600 } with open(actions_file, 'w') as f: json.dump(existing_actions, f, indent=2) # Send message with inline keyboard url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": session.chat_id, "text": message, "parse_mode": "Markdown" } if keyboard: payload["reply_markup"] = {"inline_keyboard": keyboard} async with httpx.AsyncClient() as client: try: # Try with Markdown first response = await client.post(url, json=payload, timeout=10.0) response.raise_for_status() except httpx.HTTPStatusError as e: if e.response.status_code == 400: # Markdown parsing failed, retry without parse_mode logger.warning(f"Markdown parsing failed, retrying as plain text") payload.pop("parse_mode", None) response = await client.post(url, json=payload, timeout=10.0) response.raise_for_status() else: raise return [TextContent( type="text", text=f"✅ 已发送通知到 Telegram (会话: {session.session_id}, 包含 {len(actions)} 个操作按钮)" )] except Exception as e: logger.error(f"Failed to send notification with actions: {e}") return [TextContent( type="text", text=f"❌ 发送失败: {str(e)}" )] async def handle_telegram_wait_reply(session, arguments: dict) -> list[TextContent]: """Handle telegram_wait_reply tool""" max_wait = arguments.get("max_wait", config.TELEGRAM_MAX_WAIT) logger.info(f"Session {session.session_id} waiting for reply (max {max_wait}s)") # Mark session as waiting session.set_waiting() registry.update_session(session) # Save to shared storage # Poll for messages start_time = time.time() try: while True: elapsed = time.time() - start_time # Check timeout if elapsed >= max_wait: session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} wait timeout") return [TextContent( type="text", text=f"超时: 等待了 {int(elapsed)} 秒未收到回复" )] # Check message queue if message_queue.has_messages(session.session_id): reply = message_queue.pop(session.session_id) session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} received reply: {reply}") return [TextContent( type="text", text=f"用户回复: {reply}" )] # Progressive polling interval = get_poll_interval(elapsed) logger.debug(f"Session {session.session_id} polling (interval={interval}s, elapsed={int(elapsed)}s)") await asyncio.sleep(interval) except (KeyboardInterrupt, asyncio.CancelledError): session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} wait interrupted by user") return [TextContent( type="text", text=f"⚠️ 等待被用户中断 (Ctrl+C)\n\n已等待: {int(time.time() - start_time)} 秒\n\n你可以继续正常对话。" )] async def handle_telegram_send(session, arguments: dict) -> list[TextContent]: """Handle telegram_send tool""" message = arguments.get("message", "") # Auto-truncate if too long if len(message) > 300: message = message[:280] + "\n\n... [消息过长已截断,建议使用 telegram_notify]" # Format message formatted = f"🤖 [`{session.session_id}`]\n{message}" # Update session session.last_message = message session.update_activity() registry.update_session(session) # Save to shared storage # Send to Telegram try: await send_telegram_message(session.chat_id, formatted) return [TextContent( type="text", text=f"✅ 已发送消息到 Telegram (会话: {session.session_id})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送失败: {str(e)}" )] async def handle_telegram_send_image(session, arguments: dict) -> list[TextContent]: """Handle telegram_send_image tool""" image_path = arguments.get("image_path", "") caption = arguments.get("caption", "") if not image_path: return [TextContent(type="text", text="错误: image_path 参数不能为空")] # Resolve image path (relative to project or absolute) if not os.path.isabs(image_path): full_path = os.path.join(session.project_path, image_path) else: full_path = image_path # Check if file exists if not os.path.exists(full_path): return [TextContent( type="text", text=f"❌ 图片文件不存在: {image_path}" )] if not os.path.isfile(full_path): return [TextContent( type="text", text=f"❌ 不是文件(可能是目录): {image_path}" )] # Build caption if not caption: caption = f"🖼️ [{session.session_id}] {image_path}" else: caption = f"🖼️ [{session.session_id}] {caption}" # Update session session.update_activity() # Send image to Telegram using HTTP API try: import httpx url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendPhoto" with open(full_path, 'rb') as f: files = {'photo': (os.path.basename(image_path), f, 'image/jpeg')} data = { 'chat_id': session.chat_id, 'caption': caption } async with httpx.AsyncClient() as client: response = await client.post(url, files=files, data=data, timeout=30.0) response.raise_for_status() return [TextContent( type="text", text=f"✅ 已发送图片到 Telegram (会话: {session.session_id}, 图片: {image_path})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送图片失败: {str(e)}" )] async def handle_telegram_send_code(session, arguments: dict) -> list[TextContent]: """Handle telegram_send_code tool""" code = arguments.get("code", "") language = arguments.get("language", "") caption = arguments.get("caption", "") if not code: return [TextContent(type="text", text="错误: code 参数不能为空")] # Build message if caption: message = f"📝 [`{session.session_id}`] {caption}\n\n" else: message = f"💻 [`{session.session_id}`] 代码段\n\n" # Add code block with syntax highlighting message += f"```{language}\n{code}\n```" # Update session session.update_activity() # Send to Telegram try: await send_telegram_message(session.chat_id, message) return [TextContent( type="text", text=f"✅ 已发送代码段到 Telegram (会话: {session.session_id}, 语言: {language or '未指定'})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送代码段失败: {str(e)}" )] async def handle_telegram_get_context_info(session, arguments: dict) -> list[TextContent]: """Handle telegram_get_context_info tool""" import platform from datetime import datetime token_used = arguments.get("token_used", 0) token_total = arguments.get("token_total", 1000000) # Gather context information info_parts = [] info_parts.append("📊 会话上下文信息") info_parts.append("━━━━━━━━━━━━━━━━") info_parts.append(f"🆔 会话 ID: {session.session_id}") info_parts.append(f"📁 项目路径: {session.project_path}") info_parts.append(f"📂 当前目录: {os.getcwd()}") # Token usage (if provided) if token_used > 0: token_remaining = token_total - token_used usage_percent = (token_used / token_total) * 100 info_parts.append("") info_parts.append("💾 Token 使用情况:") info_parts.append(f"- 已使用: {token_used:,} tokens") info_parts.append(f"- 总容量: {token_total:,} tokens") info_parts.append(f"- 剩余: {token_remaining:,} tokens") info_parts.append(f"- 使用率: {usage_percent:.1f}%") # Session timing created = datetime.fromisoformat(session.created_at) last_active = datetime.fromisoformat(session.last_active) uptime = (datetime.now() - created).total_seconds() info_parts.append("") info_parts.append("⏱️ 会话时间:") info_parts.append(f"- 创建时间: {created.strftime('%Y-%m-%d %H:%M:%S')}") if uptime < 60: info_parts.append(f"- 运行时长: {int(uptime)} 秒") elif uptime < 3600: info_parts.append(f"- 运行时长: {int(uptime / 60)} 分钟") elif uptime < 86400: info_parts.append(f"- 运行时长: {int(uptime / 3600)} 小时") else: info_parts.append(f"- 运行时长: {int(uptime / 86400)} 天") # System info info_parts.append("") info_parts.append("🖥️ 系统环境:") info_parts.append(f"- 操作系统: {platform.system()} {platform.release()}") info_parts.append(f"- Python: {platform.python_version()}") info_parts.append(f"- 状态: {session.status}") # Telegram config info_parts.append("") info_parts.append("📱 Telegram 配置:") info_parts.append(f"- 最长等待: {config.TELEGRAM_MAX_WAIT // 86400} 天") info_parts.append(f"- 轮询: {config.POLL_INTERVALS[0]}s → {config.POLL_INTERVALS[1]}s → {config.POLL_INTERVALS[2]}s") message = "\n".join(info_parts) # Update session session.update_activity() # Send to Telegram try: await send_telegram_message(session.chat_id, message) return [TextContent( type="text", text=f"✅ 上下文信息已发送到 Telegram (会话: {session.session_id})\n\n💡 提示:下次调用时传入 token_used 参数可显示 token 使用量" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送失败: {str(e)}" )] async def handle_telegram_send_file(session, arguments: dict) -> list[TextContent]: """Handle telegram_send_file tool""" file_path = arguments.get("file_path", "") caption = arguments.get("caption", "") if not file_path: return [TextContent(type="text", text="错误: file_path 参数不能为空")] # Resolve file path (relative to project or absolute) if not os.path.isabs(file_path): full_path = os.path.join(session.project_path, file_path) else: full_path = file_path # Check if file exists if not os.path.exists(full_path): return [TextContent( type="text", text=f"❌ 文件不存在: {file_path}" )] if not os.path.isfile(full_path): return [TextContent( type="text", text=f"❌ 不是文件(可能是目录): {file_path}" )] # Build caption if not caption: caption = f"📄 [{session.session_id}] {file_path}" else: caption = f"📄 [{session.session_id}] {caption}" # Update session session.update_activity() # Send file to Telegram using HTTP API try: import httpx url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendDocument" with open(full_path, 'rb') as f: files = {'document': (os.path.basename(file_path), f, 'application/octet-stream')} data = { 'chat_id': session.chat_id, 'caption': caption } async with httpx.AsyncClient() as client: response = await client.post(url, files=files, data=data, timeout=60.0) response.raise_for_status() return [TextContent( type="text", text=f"✅ 已发送文件到 Telegram (会话: {session.session_id}, 文件: {file_path})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送文件失败: {str(e)}" )] async def handle_telegram_unattended_mode(session, arguments: dict) -> list[TextContent]: """Handle telegram_unattended_mode tool""" current_status = arguments.get("current_status", "") max_wait = arguments.get("max_wait", config.TELEGRAM_MAX_WAIT) # Update session state session.last_message = current_status session.update_activity() session.set_waiting() registry.update_session(session) # Save to shared storage # Silent waiting - no notification sent # User should call telegram_notify before calling this tool logger.info(f"Session {session.session_id} in unattended mode, waiting for instruction (silent)") start_time = time.time() try: while True: elapsed = time.time() - start_time # Check timeout if elapsed >= max_wait: session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} unattended mode timeout") return [TextContent( type="text", text=f"⏱️ 超时: 等待了 {int(elapsed)} 秒未收到指令\n\n建议:可以继续调用此工具重新进入等待,或者退出无人值守模式。" )] # Check message queue if message_queue.has_messages(session.session_id): reply = message_queue.pop(session.session_id) session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} received instruction: {reply}") # Check if user wants to exit if reply.lower() in ['退出', 'exit', 'quit', '结束']: return [TextContent( type="text", text=f"🚪 已退出无人值守模式\n\n用户指令: {reply}\n\n你可以继续正常对话,不再自动循环。" )] # Return the instruction return [TextContent( type="text", text=f"📨 收到新指令: {reply}\n\n请执行此指令,完成后再次调用 telegram_unattended_mode 继续循环。" )] # Progressive polling interval = get_poll_interval(elapsed) logger.debug(f"Session {session.session_id} unattended mode polling (interval={interval}s, elapsed={int(elapsed)}s)") await asyncio.sleep(interval) except (KeyboardInterrupt, asyncio.CancelledError): session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} unattended mode interrupted by user") return [TextContent( type="text", text=f"⚠️ 无人值守模式被用户中断 (Ctrl+C)\n\n已运行: {int(time.time() - start_time)} 秒\n\n已退出无人值守模式,你可以继续正常对话。" )]

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/batianVolyc/telegram-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server