Skip to main content
Glama

Telegram MCP Server

by batianVolyc
server.py34.6 kB
"""MCP Server implementation""" import os import time import logging import asyncio from typing import Optional from mcp.server import Server from mcp.types import Tool, TextContent from .session import registry from .message_queue import message_queue from . import config logger = logging.getLogger(__name__) # Global telegram bot instance (set by main) telegram_bot = None def get_session_id() -> str: """ Get session ID for current Claude Code instance Priority: TELEGRAM_SESSION env var > current directory name """ if config.TELEGRAM_SESSION_ID: return config.TELEGRAM_SESSION_ID # Use current working directory name as session ID cwd = os.getcwd() session_id = os.path.basename(cwd) return session_id def get_project_path() -> str: """Get absolute path of current project""" return os.getcwd() async def ensure_session_registered(session_id: str) -> None: """ Ensure session is registered Lazy registration: only register when first tool is called """ if not registry.exists(session_id): project_path = get_project_path() chat_id = config.TELEGRAM_CHAT_ID session = registry.register(session_id, project_path, chat_id) logger.info(f"Registered session: {session_id} at {project_path}") # Send notification to Telegram if telegram_bot: try: message = ( f"✅ 新会话已启动\n" f"🆔 `{session_id}`\n" f"📁 `{project_path}`\n" f"使用 /to {session_id} <消息> 与之交互" ) await send_telegram_message(chat_id, message) except Exception as e: logger.error(f"Failed to send registration notification: {e}") def escape_markdown(text: str) -> str: """Escape special characters for Telegram Markdown""" # Characters that need escaping in Telegram Markdown special_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'] for char in special_chars: text = text.replace(char, f'\\{char}') return text async def send_telegram_message(chat_id: str, message: str, parse_mode: str = "Markdown") -> None: """Send message to Telegram (async) using HTTP API""" import httpx url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage" payload = { "chat_id": chat_id, "text": message } # Only add parse_mode if it's not None if parse_mode: payload["parse_mode"] = parse_mode try: async with httpx.AsyncClient() as client: response = await client.post(url, json=payload, timeout=10.0) response.raise_for_status() except Exception as e: logger.warning(f"Failed to send with {parse_mode}, retrying as plain text: {e}") try: # Retry without parse_mode payload.pop("parse_mode", None) async with httpx.AsyncClient() as client: response = await client.post(url, json=payload, timeout=10.0) response.raise_for_status() except Exception as e2: logger.error(f"Failed to send Telegram message: {e2}") raise def get_poll_interval(elapsed_seconds: float) -> int: """ Get polling interval based on elapsed time Progressive slowdown: 30s -> 60s -> 120s """ if elapsed_seconds < config.POLL_THRESHOLDS[0]: # < 10 minutes return config.POLL_INTERVALS[0] # 30 seconds elif elapsed_seconds < config.POLL_THRESHOLDS[1]: # < 1 hour return config.POLL_INTERVALS[1] # 60 seconds else: return config.POLL_INTERVALS[2] # 120 seconds # Create MCP server server = Server("telegram") @server.list_tools() async def list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="telegram_notify", description=""" 发送结构化通知到 Telegram 参数: - event: 事件类型(completed/error/question/progress) - summary: 简短总结,必填,限制200字以内 - details: 详细信息,可选 最佳实践: 1. summary 必须简洁明了(1-2句话),说明做了什么、结果如何 2. 不要包含思考过程、不要包含代码片段 3. 需要用户决策时,清晰说明选项 示例: telegram_notify( event="completed", summary="修复了 auth.py:45 的空指针异常,所有测试通过", details="修改文件: auth.py, test_auth.py\\n测试: 12/12 passed" ) telegram_notify( event="question", summary="发现3种修复方案:1)添加空值检查 2)使用Optional类型 3)重构整个模块。推荐方案1,是否继续?" ) """, inputSchema={ "type": "object", "properties": { "event": { "type": "string", "enum": ["completed", "error", "question", "progress"], "description": "事件类型" }, "summary": { "type": "string", "description": "简短总结(必填,200字以内)", "maxLength": 200 }, "details": { "type": "string", "description": "详细信息(可选)" } }, "required": ["event", "summary"] } ), Tool( name="telegram_wait_reply", description=""" 等待用户回复(阻塞式轮询) 参数: - max_wait: 最长等待时间(秒),默认604800(7天/1周) 行为: - 前10分钟:每30秒检查一次 - 10分钟-1小时:每60秒检查一次 - 1小时以上:每120秒检查一次 - 用户可以按 Ctrl+C 中断等待 - 超时返回 timeout: true 返回: - reply: 用户回复内容 - timeout: 是否超时 - interrupted: 是否被用户中断 """, inputSchema={ "type": "object", "properties": { "max_wait": { "type": "integer", "description": "最长等待时间(秒),默认604800(7天)", "default": 604800 } } } ), Tool( name="telegram_send", description=""" 发送自由格式消息到 Telegram(不推荐,请优先使用 telegram_notify) 自动处理: - 超过300字自动截断 - 会提示使用 telegram_notify 发送结构化消息 """, inputSchema={ "type": "object", "properties": { "message": { "type": "string", "description": "消息内容" } }, "required": ["message"] } ), Tool( name="telegram_send_code", description=""" 发送代码段到 Telegram(带语法高亮) ⚠️ 使用场景(仅在必要时使用): - 遇到关键错误需要展示问题代码 - 修复了重要 bug,需要展示修复方案 - 用户明确要求查看某段代码 - 需要用户 review 关键代码片段 ❌ 不要使用的场景: - 一般性任务完成(使用 telegram_notify) - 创建了新文件(使用 telegram_send_file) - 例行操作(使用 telegram_notify 总结即可) 参数: - code: 代码内容(建议不超过50行) - language: 编程语言(python/javascript/go/rust/bash/json/yaml等) - caption: 可选说明文字(建议填写,解释发送这段代码的原因) 示例: telegram_send_code( code="def hello():\\n print('Hello')", language="python", caption="修复了空指针异常的关键函数" ) """, inputSchema={ "type": "object", "properties": { "code": { "type": "string", "description": "代码内容" }, "language": { "type": "string", "description": "编程语言(python/javascript/go/rust/bash/json/yaml等)", "default": "" }, "caption": { "type": "string", "description": "可选说明文字" } }, "required": ["code"] } ), Tool( name="telegram_send_image", description=""" 发送图片到 Telegram ⚠️ 使用场景: - 生成了图表、可视化结果 - 创建了截图、示意图 - 需要用户查看图片内容 - 图片格式:PNG, JPG, GIF, WebP 等 参数: - image_path: 图片文件路径(相对于项目目录或绝对路径) - caption: 可选说明文字 示例: telegram_send_image( image_path="output/chart.png", caption="性能测试结果图表" ) """, inputSchema={ "type": "object", "properties": { "image_path": { "type": "string", "description": "图片文件路径" }, "caption": { "type": "string", "description": "可选说明文字" } }, "required": ["image_path"] } ), Tool( name="telegram_send_file", description=""" 发送文件到 Telegram ⚠️ 使用场景(仅在必要时使用): - 用户明确要求查看某个文件:"查看 config.json"、"发送 main.py 给我" - 创建了重要的输出文件需要用户确认(如配置文件、报告等) - 生成了图表、图片等需要展示的文件 - 需要用户下载某个文件进行后续操作 ❌ 不要使用的场景: - 创建普通代码文件(用 telegram_notify 总结即可) - 修改了文件但不需要用户查看内容 - 例行的文件操作 优先级: 1. 优先使用 telegram_notify 总结文件变更 2. 如果用户明确要求,或确实需要查看,才发送文件 3. 对于代码文件,如果只需要展示关键片段,优先使用 telegram_send_code 参数: - file_path: 文件路径(相对于项目目录或绝对路径) - caption: 可选说明文字 支持: - 文本文件、代码文件 - 图片、PDF、文档等任意文件类型 - 自动处理大文件 示例: telegram_send_file( file_path="src/main.py", caption="修复后的主文件(用户要求查看)" ) """, inputSchema={ "type": "object", "properties": { "file_path": { "type": "string", "description": "文件路径" }, "caption": { "type": "string", "description": "可选说明文字" } }, "required": ["file_path"] } ), Tool( name="telegram_get_context_info", description=""" 获取并发送当前会话的上下文信息到 Telegram ⚠️ 重要:调用此工具时,你应该: 1. 检查系统提供的 token 使用量信息(<budget:token_budget> 标签) 2. 将 token 使用情况包含在发送的信息中 3. 发送完整的上下文信息(会话信息 + token 使用情况) 返回信息应包括: - 会话 ID 和项目路径 - Token 使用量(已使用 / 总量 / 剩余 / 使用率) - 会话运行时间 - 系统信息 - Telegram 配置 示例输出格式: 📊 会话上下文信息 🆔 会话: testtg 📁 项目: /path/to/project 💾 Token 使用: - 已使用: 41,853 tokens - 总容量: 1,000,000 tokens - 剩余: 958,147 tokens - 使用率: 4.2% ⏱️ 运行时间: 15 分钟 🖥️ 系统: Darwin 24.6.0 🐍 Python: 3.14.0 """, inputSchema={ "type": "object", "properties": { "token_used": { "type": "integer", "description": "已使用的 token 数量(从系统预算信息中获取)" }, "token_total": { "type": "integer", "description": "总 token 容量(从系统预算信息中获取)", "default": 1000000 } } } ), Tool( name="telegram_unattended_mode", description=""" 进入无人值守模式 - 智能远程任务循环 工作流程: 1. 执行当前任务 2. 根据情况智能选择通知方式: - 默认:使用 telegram_notify 发送总结 - 遇到关键问题/错误:使用 telegram_send_code 展示问题代码 - 用户明确要求:使用 telegram_send_file 发送文件 3. 调用 telegram_unattended_mode 等待下一步指令(静默等待,不发送额外提示) 4. 收到指令后执行,重复循环 ⚠️ 重要: - 完成任务后必须调用 telegram_notify 发送结果 - telegram_unattended_mode 本身不发送消息,只等待 - 这样用户每次只收到任务结果,不会有重复的等待提示 📋 通知内容最佳实践: ✅ 优先发送总结: - "修复了 auth.py 的空指针异常,测试通过" - "创建了 3 个文件:main.py, utils.py, test.py" - "代码重构完成,性能提升 30%" ⚠️ 仅在必要时发送代码: - 遇到无法自动修复的错误,需要展示错误代码 - 修复了关键 bug,展示修复前后对比 - 用户明确要求:"查看 main.py"、"发送代码给我" 🎯 智能判断示例: - 创建新文件 → telegram_notify("创建了 config.json") - 修复 bug → telegram_notify("修复了登录异常") + 如果复杂就 telegram_send_code - 用户问"文件内容是什么" → telegram_send_file 退出方式: - Telegram 发送 "退出" 或 "exit" - Claude Code 按 Ctrl+C 或 ESC 轮询策略: - 前10分钟:每30秒检查一次 - 10分钟-1小时:每60秒检查一次 - 1小时以上:每120秒检查一次 参数: - current_status: 当前任务状态的简短总结(1-2句话) - max_wait: 每次等待的最长时间(秒),默认604800(7天) - silent: 静默模式(不发送等待提示,默认 false) - 首次进入时使用 false(发送提示) - 后续循环使用 true(减少噪音) 返回: - next_instruction: 用户的下一步指令 - should_exit: 是否应该退出无人值守模式 - interrupted: 是否被用户中断(Ctrl+C/ESC) """, inputSchema={ "type": "object", "properties": { "current_status": { "type": "string", "description": "当前任务状态描述" }, "max_wait": { "type": "integer", "description": "最长等待时间(秒),默认604800(7天)", "default": 604800 } }, "required": [] } ) ] @server.call_tool() async def call_tool(name: str, arguments: dict) -> list[TextContent]: """Handle tool calls""" # Validate configuration try: config.validate_config() except ValueError as e: return [TextContent(type="text", text=f"配置错误: {str(e)}")] session_id = get_session_id() # Ensure session is registered (lazy registration) await ensure_session_registered(session_id) session = registry.get(session_id) if name == "telegram_notify": return await handle_telegram_notify(session, arguments) elif name == "telegram_wait_reply": return await handle_telegram_wait_reply(session, arguments) elif name == "telegram_send": return await handle_telegram_send(session, arguments) elif name == "telegram_send_code": return await handle_telegram_send_code(session, arguments) elif name == "telegram_send_image": return await handle_telegram_send_image(session, arguments) elif name == "telegram_send_file": return await handle_telegram_send_file(session, arguments) elif name == "telegram_get_context_info": return await handle_telegram_get_context_info(session, arguments) elif name == "telegram_unattended_mode": return await handle_telegram_unattended_mode(session, arguments) else: return [TextContent(type="text", text=f"Unknown tool: {name}")] async def handle_telegram_notify(session, arguments: dict) -> list[TextContent]: """Handle telegram_notify tool""" event = arguments.get("event") summary = arguments.get("summary", "") details = arguments.get("details", "") # Validate summary length if len(summary) > 200: return [TextContent( type="text", text="错误: summary 过长,请精炼到200字以内" )] # Format message emoji = { "completed": "✅", "error": "❌", "question": "❓", "progress": "⏳" } message = f"{emoji.get(event, '🔔')} [`{session.session_id}`]\n{summary}" if details: message += f"\n\n━━━━━━━━━━━━\n📝 详情:\n{details}" # Update session session.last_message = summary session.update_activity() registry.update_session(session) # Save to shared storage # Send to Telegram try: await send_telegram_message(session.chat_id, message) return [TextContent( type="text", text=f"✅ 已发送通知到 Telegram (会话: {session.session_id})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送失败: {str(e)}" )] async def handle_telegram_wait_reply(session, arguments: dict) -> list[TextContent]: """Handle telegram_wait_reply tool""" max_wait = arguments.get("max_wait", config.TELEGRAM_MAX_WAIT) logger.info(f"Session {session.session_id} waiting for reply (max {max_wait}s)") # Mark session as waiting session.set_waiting() registry.update_session(session) # Save to shared storage # Poll for messages start_time = time.time() try: while True: elapsed = time.time() - start_time # Check timeout if elapsed >= max_wait: session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} wait timeout") return [TextContent( type="text", text=f"超时: 等待了 {int(elapsed)} 秒未收到回复" )] # Check message queue if message_queue.has_messages(session.session_id): reply = message_queue.pop(session.session_id) session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} received reply: {reply}") return [TextContent( type="text", text=f"用户回复: {reply}" )] # Progressive polling interval = get_poll_interval(elapsed) logger.debug(f"Session {session.session_id} polling (interval={interval}s, elapsed={int(elapsed)}s)") await asyncio.sleep(interval) except (KeyboardInterrupt, asyncio.CancelledError): session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} wait interrupted by user") return [TextContent( type="text", text=f"⚠️ 等待被用户中断 (Ctrl+C)\n\n已等待: {int(time.time() - start_time)} 秒\n\n你可以继续正常对话。" )] async def handle_telegram_send(session, arguments: dict) -> list[TextContent]: """Handle telegram_send tool""" message = arguments.get("message", "") # Auto-truncate if too long if len(message) > 300: message = message[:280] + "\n\n... [消息过长已截断,建议使用 telegram_notify]" # Format message formatted = f"🤖 [`{session.session_id}`]\n{message}" # Update session session.last_message = message session.update_activity() registry.update_session(session) # Save to shared storage # Send to Telegram try: await send_telegram_message(session.chat_id, formatted) return [TextContent( type="text", text=f"✅ 已发送消息到 Telegram (会话: {session.session_id})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送失败: {str(e)}" )] async def handle_telegram_send_image(session, arguments: dict) -> list[TextContent]: """Handle telegram_send_image tool""" image_path = arguments.get("image_path", "") caption = arguments.get("caption", "") if not image_path: return [TextContent(type="text", text="错误: image_path 参数不能为空")] # Resolve image path (relative to project or absolute) if not os.path.isabs(image_path): full_path = os.path.join(session.project_path, image_path) else: full_path = image_path # Check if file exists if not os.path.exists(full_path): return [TextContent( type="text", text=f"❌ 图片文件不存在: {image_path}" )] if not os.path.isfile(full_path): return [TextContent( type="text", text=f"❌ 不是文件(可能是目录): {image_path}" )] # Build caption if not caption: caption = f"🖼️ [{session.session_id}] {image_path}" else: caption = f"🖼️ [{session.session_id}] {caption}" # Update session session.update_activity() # Send image to Telegram using HTTP API try: import httpx url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendPhoto" with open(full_path, 'rb') as f: files = {'photo': (os.path.basename(image_path), f, 'image/jpeg')} data = { 'chat_id': session.chat_id, 'caption': caption } async with httpx.AsyncClient() as client: response = await client.post(url, files=files, data=data, timeout=30.0) response.raise_for_status() return [TextContent( type="text", text=f"✅ 已发送图片到 Telegram (会话: {session.session_id}, 图片: {image_path})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送图片失败: {str(e)}" )] async def handle_telegram_send_code(session, arguments: dict) -> list[TextContent]: """Handle telegram_send_code tool""" code = arguments.get("code", "") language = arguments.get("language", "") caption = arguments.get("caption", "") if not code: return [TextContent(type="text", text="错误: code 参数不能为空")] # Build message if caption: message = f"📝 [`{session.session_id}`] {caption}\n\n" else: message = f"💻 [`{session.session_id}`] 代码段\n\n" # Add code block with syntax highlighting message += f"```{language}\n{code}\n```" # Update session session.update_activity() # Send to Telegram try: await send_telegram_message(session.chat_id, message) return [TextContent( type="text", text=f"✅ 已发送代码段到 Telegram (会话: {session.session_id}, 语言: {language or '未指定'})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送代码段失败: {str(e)}" )] async def handle_telegram_get_context_info(session, arguments: dict) -> list[TextContent]: """Handle telegram_get_context_info tool""" import platform from datetime import datetime token_used = arguments.get("token_used", 0) token_total = arguments.get("token_total", 1000000) # Gather context information info_parts = [] info_parts.append("📊 会话上下文信息") info_parts.append("━━━━━━━━━━━━━━━━") info_parts.append(f"🆔 会话 ID: {session.session_id}") info_parts.append(f"📁 项目路径: {session.project_path}") info_parts.append(f"📂 当前目录: {os.getcwd()}") # Token usage (if provided) if token_used > 0: token_remaining = token_total - token_used usage_percent = (token_used / token_total) * 100 info_parts.append("") info_parts.append("💾 Token 使用情况:") info_parts.append(f"- 已使用: {token_used:,} tokens") info_parts.append(f"- 总容量: {token_total:,} tokens") info_parts.append(f"- 剩余: {token_remaining:,} tokens") info_parts.append(f"- 使用率: {usage_percent:.1f}%") # Session timing created = datetime.fromisoformat(session.created_at) last_active = datetime.fromisoformat(session.last_active) uptime = (datetime.now() - created).total_seconds() info_parts.append("") info_parts.append("⏱️ 会话时间:") info_parts.append(f"- 创建时间: {created.strftime('%Y-%m-%d %H:%M:%S')}") if uptime < 60: info_parts.append(f"- 运行时长: {int(uptime)} 秒") elif uptime < 3600: info_parts.append(f"- 运行时长: {int(uptime / 60)} 分钟") elif uptime < 86400: info_parts.append(f"- 运行时长: {int(uptime / 3600)} 小时") else: info_parts.append(f"- 运行时长: {int(uptime / 86400)} 天") # System info info_parts.append("") info_parts.append("🖥️ 系统环境:") info_parts.append(f"- 操作系统: {platform.system()} {platform.release()}") info_parts.append(f"- Python: {platform.python_version()}") info_parts.append(f"- 状态: {session.status}") # Telegram config info_parts.append("") info_parts.append("📱 Telegram 配置:") info_parts.append(f"- 最长等待: {config.TELEGRAM_MAX_WAIT // 86400} 天") info_parts.append(f"- 轮询: {config.POLL_INTERVALS[0]}s → {config.POLL_INTERVALS[1]}s → {config.POLL_INTERVALS[2]}s") message = "\n".join(info_parts) # Update session session.update_activity() # Send to Telegram try: await send_telegram_message(session.chat_id, message) return [TextContent( type="text", text=f"✅ 上下文信息已发送到 Telegram (会话: {session.session_id})\n\n💡 提示:下次调用时传入 token_used 参数可显示 token 使用量" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送失败: {str(e)}" )] async def handle_telegram_send_file(session, arguments: dict) -> list[TextContent]: """Handle telegram_send_file tool""" file_path = arguments.get("file_path", "") caption = arguments.get("caption", "") if not file_path: return [TextContent(type="text", text="错误: file_path 参数不能为空")] # Resolve file path (relative to project or absolute) if not os.path.isabs(file_path): full_path = os.path.join(session.project_path, file_path) else: full_path = file_path # Check if file exists if not os.path.exists(full_path): return [TextContent( type="text", text=f"❌ 文件不存在: {file_path}" )] if not os.path.isfile(full_path): return [TextContent( type="text", text=f"❌ 不是文件(可能是目录): {file_path}" )] # Build caption if not caption: caption = f"📄 [{session.session_id}] {file_path}" else: caption = f"📄 [{session.session_id}] {caption}" # Update session session.update_activity() # Send file to Telegram using HTTP API try: import httpx url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendDocument" with open(full_path, 'rb') as f: files = {'document': (os.path.basename(file_path), f, 'application/octet-stream')} data = { 'chat_id': session.chat_id, 'caption': caption } async with httpx.AsyncClient() as client: response = await client.post(url, files=files, data=data, timeout=60.0) response.raise_for_status() return [TextContent( type="text", text=f"✅ 已发送文件到 Telegram (会话: {session.session_id}, 文件: {file_path})" )] except Exception as e: return [TextContent( type="text", text=f"❌ 发送文件失败: {str(e)}" )] async def handle_telegram_unattended_mode(session, arguments: dict) -> list[TextContent]: """Handle telegram_unattended_mode tool""" current_status = arguments.get("current_status", "") max_wait = arguments.get("max_wait", config.TELEGRAM_MAX_WAIT) # Update session state session.last_message = current_status session.update_activity() session.set_waiting() registry.update_session(session) # Save to shared storage # Silent waiting - no notification sent # User should call telegram_notify before calling this tool logger.info(f"Session {session.session_id} in unattended mode, waiting for instruction (silent)") start_time = time.time() try: while True: elapsed = time.time() - start_time # Check timeout if elapsed >= max_wait: session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} unattended mode timeout") return [TextContent( type="text", text=f"⏱️ 超时: 等待了 {int(elapsed)} 秒未收到指令\n\n建议:可以继续调用此工具重新进入等待,或者退出无人值守模式。" )] # Check message queue if message_queue.has_messages(session.session_id): reply = message_queue.pop(session.session_id) session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} received instruction: {reply}") # Check if user wants to exit if reply.lower() in ['退出', 'exit', 'quit', '结束']: return [TextContent( type="text", text=f"🚪 已退出无人值守模式\n\n用户指令: {reply}\n\n你可以继续正常对话,不再自动循环。" )] # Return the instruction return [TextContent( type="text", text=f"📨 收到新指令: {reply}\n\n请执行此指令,完成后再次调用 telegram_unattended_mode 继续循环。" )] # Progressive polling interval = get_poll_interval(elapsed) logger.debug(f"Session {session.session_id} unattended mode polling (interval={interval}s, elapsed={int(elapsed)}s)") await asyncio.sleep(interval) except (KeyboardInterrupt, asyncio.CancelledError): session.set_running() registry.update_session(session) # Save to shared storage logger.info(f"Session {session.session_id} unattended mode interrupted by user") return [TextContent( type="text", text=f"⚠️ 无人值守模式被用户中断 (Ctrl+C)\n\n已运行: {int(time.time() - start_time)} 秒\n\n已退出无人值守模式,你可以继续正常对话。" )]

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/batianVolyc/telegram-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server