server.py•34.6 kB
"""MCP Server implementation"""
import os
import time
import logging
import asyncio
from typing import Optional
from mcp.server import Server
from mcp.types import Tool, TextContent
from .session import registry
from .message_queue import message_queue
from . import config
logger = logging.getLogger(__name__)
# Global telegram bot instance (set by main)
telegram_bot = None
def get_session_id() -> str:
"""
Get session ID for current Claude Code instance
Priority: TELEGRAM_SESSION env var > current directory name
"""
if config.TELEGRAM_SESSION_ID:
return config.TELEGRAM_SESSION_ID
# Use current working directory name as session ID
cwd = os.getcwd()
session_id = os.path.basename(cwd)
return session_id
def get_project_path() -> str:
"""Get absolute path of current project"""
return os.getcwd()
async def ensure_session_registered(session_id: str) -> None:
"""
Ensure session is registered
Lazy registration: only register when first tool is called
"""
if not registry.exists(session_id):
project_path = get_project_path()
chat_id = config.TELEGRAM_CHAT_ID
session = registry.register(session_id, project_path, chat_id)
logger.info(f"Registered session: {session_id} at {project_path}")
# Send notification to Telegram
if telegram_bot:
try:
message = (
f"✅ 新会话已启动\n"
f"🆔 `{session_id}`\n"
f"📁 `{project_path}`\n"
f"使用 /to {session_id} <消息> 与之交互"
)
await send_telegram_message(chat_id, message)
except Exception as e:
logger.error(f"Failed to send registration notification: {e}")
def escape_markdown(text: str) -> str:
"""Escape special characters for Telegram Markdown"""
# Characters that need escaping in Telegram Markdown
special_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!']
for char in special_chars:
text = text.replace(char, f'\\{char}')
return text
async def send_telegram_message(chat_id: str, message: str, parse_mode: str = "Markdown") -> None:
"""Send message to Telegram (async) using HTTP API"""
import httpx
url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage"
payload = {
"chat_id": chat_id,
"text": message
}
# Only add parse_mode if it's not None
if parse_mode:
payload["parse_mode"] = parse_mode
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, timeout=10.0)
response.raise_for_status()
except Exception as e:
logger.warning(f"Failed to send with {parse_mode}, retrying as plain text: {e}")
try:
# Retry without parse_mode
payload.pop("parse_mode", None)
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, timeout=10.0)
response.raise_for_status()
except Exception as e2:
logger.error(f"Failed to send Telegram message: {e2}")
raise
def get_poll_interval(elapsed_seconds: float) -> int:
"""
Get polling interval based on elapsed time
Progressive slowdown: 30s -> 60s -> 120s
"""
if elapsed_seconds < config.POLL_THRESHOLDS[0]: # < 10 minutes
return config.POLL_INTERVALS[0] # 30 seconds
elif elapsed_seconds < config.POLL_THRESHOLDS[1]: # < 1 hour
return config.POLL_INTERVALS[1] # 60 seconds
else:
return config.POLL_INTERVALS[2] # 120 seconds
# Create MCP server
server = Server("telegram")
@server.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools"""
return [
Tool(
name="telegram_notify",
description="""
发送结构化通知到 Telegram
参数:
- event: 事件类型(completed/error/question/progress)
- summary: 简短总结,必填,限制200字以内
- details: 详细信息,可选
最佳实践:
1. summary 必须简洁明了(1-2句话),说明做了什么、结果如何
2. 不要包含思考过程、不要包含代码片段
3. 需要用户决策时,清晰说明选项
示例:
telegram_notify(
event="completed",
summary="修复了 auth.py:45 的空指针异常,所有测试通过",
details="修改文件: auth.py, test_auth.py\\n测试: 12/12 passed"
)
telegram_notify(
event="question",
summary="发现3种修复方案:1)添加空值检查 2)使用Optional类型 3)重构整个模块。推荐方案1,是否继续?"
)
""",
inputSchema={
"type": "object",
"properties": {
"event": {
"type": "string",
"enum": ["completed", "error", "question", "progress"],
"description": "事件类型"
},
"summary": {
"type": "string",
"description": "简短总结(必填,200字以内)",
"maxLength": 200
},
"details": {
"type": "string",
"description": "详细信息(可选)"
}
},
"required": ["event", "summary"]
}
),
Tool(
name="telegram_wait_reply",
description="""
等待用户回复(阻塞式轮询)
参数:
- max_wait: 最长等待时间(秒),默认604800(7天/1周)
行为:
- 前10分钟:每30秒检查一次
- 10分钟-1小时:每60秒检查一次
- 1小时以上:每120秒检查一次
- 用户可以按 Ctrl+C 中断等待
- 超时返回 timeout: true
返回:
- reply: 用户回复内容
- timeout: 是否超时
- interrupted: 是否被用户中断
""",
inputSchema={
"type": "object",
"properties": {
"max_wait": {
"type": "integer",
"description": "最长等待时间(秒),默认604800(7天)",
"default": 604800
}
}
}
),
Tool(
name="telegram_send",
description="""
发送自由格式消息到 Telegram(不推荐,请优先使用 telegram_notify)
自动处理:
- 超过300字自动截断
- 会提示使用 telegram_notify 发送结构化消息
""",
inputSchema={
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "消息内容"
}
},
"required": ["message"]
}
),
Tool(
name="telegram_send_code",
description="""
发送代码段到 Telegram(带语法高亮)
⚠️ 使用场景(仅在必要时使用):
- 遇到关键错误需要展示问题代码
- 修复了重要 bug,需要展示修复方案
- 用户明确要求查看某段代码
- 需要用户 review 关键代码片段
❌ 不要使用的场景:
- 一般性任务完成(使用 telegram_notify)
- 创建了新文件(使用 telegram_send_file)
- 例行操作(使用 telegram_notify 总结即可)
参数:
- code: 代码内容(建议不超过50行)
- language: 编程语言(python/javascript/go/rust/bash/json/yaml等)
- caption: 可选说明文字(建议填写,解释发送这段代码的原因)
示例:
telegram_send_code(
code="def hello():\\n print('Hello')",
language="python",
caption="修复了空指针异常的关键函数"
)
""",
inputSchema={
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "代码内容"
},
"language": {
"type": "string",
"description": "编程语言(python/javascript/go/rust/bash/json/yaml等)",
"default": ""
},
"caption": {
"type": "string",
"description": "可选说明文字"
}
},
"required": ["code"]
}
),
Tool(
name="telegram_send_image",
description="""
发送图片到 Telegram
⚠️ 使用场景:
- 生成了图表、可视化结果
- 创建了截图、示意图
- 需要用户查看图片内容
- 图片格式:PNG, JPG, GIF, WebP 等
参数:
- image_path: 图片文件路径(相对于项目目录或绝对路径)
- caption: 可选说明文字
示例:
telegram_send_image(
image_path="output/chart.png",
caption="性能测试结果图表"
)
""",
inputSchema={
"type": "object",
"properties": {
"image_path": {
"type": "string",
"description": "图片文件路径"
},
"caption": {
"type": "string",
"description": "可选说明文字"
}
},
"required": ["image_path"]
}
),
Tool(
name="telegram_send_file",
description="""
发送文件到 Telegram
⚠️ 使用场景(仅在必要时使用):
- 用户明确要求查看某个文件:"查看 config.json"、"发送 main.py 给我"
- 创建了重要的输出文件需要用户确认(如配置文件、报告等)
- 生成了图表、图片等需要展示的文件
- 需要用户下载某个文件进行后续操作
❌ 不要使用的场景:
- 创建普通代码文件(用 telegram_notify 总结即可)
- 修改了文件但不需要用户查看内容
- 例行的文件操作
优先级:
1. 优先使用 telegram_notify 总结文件变更
2. 如果用户明确要求,或确实需要查看,才发送文件
3. 对于代码文件,如果只需要展示关键片段,优先使用 telegram_send_code
参数:
- file_path: 文件路径(相对于项目目录或绝对路径)
- caption: 可选说明文字
支持:
- 文本文件、代码文件
- 图片、PDF、文档等任意文件类型
- 自动处理大文件
示例:
telegram_send_file(
file_path="src/main.py",
caption="修复后的主文件(用户要求查看)"
)
""",
inputSchema={
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "文件路径"
},
"caption": {
"type": "string",
"description": "可选说明文字"
}
},
"required": ["file_path"]
}
),
Tool(
name="telegram_get_context_info",
description="""
获取并发送当前会话的上下文信息到 Telegram
⚠️ 重要:调用此工具时,你应该:
1. 检查系统提供的 token 使用量信息(<budget:token_budget> 标签)
2. 将 token 使用情况包含在发送的信息中
3. 发送完整的上下文信息(会话信息 + token 使用情况)
返回信息应包括:
- 会话 ID 和项目路径
- Token 使用量(已使用 / 总量 / 剩余 / 使用率)
- 会话运行时间
- 系统信息
- Telegram 配置
示例输出格式:
📊 会话上下文信息
🆔 会话: testtg
📁 项目: /path/to/project
💾 Token 使用:
- 已使用: 41,853 tokens
- 总容量: 1,000,000 tokens
- 剩余: 958,147 tokens
- 使用率: 4.2%
⏱️ 运行时间: 15 分钟
🖥️ 系统: Darwin 24.6.0
🐍 Python: 3.14.0
""",
inputSchema={
"type": "object",
"properties": {
"token_used": {
"type": "integer",
"description": "已使用的 token 数量(从系统预算信息中获取)"
},
"token_total": {
"type": "integer",
"description": "总 token 容量(从系统预算信息中获取)",
"default": 1000000
}
}
}
),
Tool(
name="telegram_unattended_mode",
description="""
进入无人值守模式 - 智能远程任务循环
工作流程:
1. 执行当前任务
2. 根据情况智能选择通知方式:
- 默认:使用 telegram_notify 发送总结
- 遇到关键问题/错误:使用 telegram_send_code 展示问题代码
- 用户明确要求:使用 telegram_send_file 发送文件
3. 调用 telegram_unattended_mode 等待下一步指令(静默等待,不发送额外提示)
4. 收到指令后执行,重复循环
⚠️ 重要:
- 完成任务后必须调用 telegram_notify 发送结果
- telegram_unattended_mode 本身不发送消息,只等待
- 这样用户每次只收到任务结果,不会有重复的等待提示
📋 通知内容最佳实践:
✅ 优先发送总结:
- "修复了 auth.py 的空指针异常,测试通过"
- "创建了 3 个文件:main.py, utils.py, test.py"
- "代码重构完成,性能提升 30%"
⚠️ 仅在必要时发送代码:
- 遇到无法自动修复的错误,需要展示错误代码
- 修复了关键 bug,展示修复前后对比
- 用户明确要求:"查看 main.py"、"发送代码给我"
🎯 智能判断示例:
- 创建新文件 → telegram_notify("创建了 config.json")
- 修复 bug → telegram_notify("修复了登录异常") + 如果复杂就 telegram_send_code
- 用户问"文件内容是什么" → telegram_send_file
退出方式:
- Telegram 发送 "退出" 或 "exit"
- Claude Code 按 Ctrl+C 或 ESC
轮询策略:
- 前10分钟:每30秒检查一次
- 10分钟-1小时:每60秒检查一次
- 1小时以上:每120秒检查一次
参数:
- current_status: 当前任务状态的简短总结(1-2句话)
- max_wait: 每次等待的最长时间(秒),默认604800(7天)
- silent: 静默模式(不发送等待提示,默认 false)
- 首次进入时使用 false(发送提示)
- 后续循环使用 true(减少噪音)
返回:
- next_instruction: 用户的下一步指令
- should_exit: 是否应该退出无人值守模式
- interrupted: 是否被用户中断(Ctrl+C/ESC)
""",
inputSchema={
"type": "object",
"properties": {
"current_status": {
"type": "string",
"description": "当前任务状态描述"
},
"max_wait": {
"type": "integer",
"description": "最长等待时间(秒),默认604800(7天)",
"default": 604800
}
},
"required": []
}
)
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""Handle tool calls"""
# Validate configuration
try:
config.validate_config()
except ValueError as e:
return [TextContent(type="text", text=f"配置错误: {str(e)}")]
session_id = get_session_id()
# Ensure session is registered (lazy registration)
await ensure_session_registered(session_id)
session = registry.get(session_id)
if name == "telegram_notify":
return await handle_telegram_notify(session, arguments)
elif name == "telegram_wait_reply":
return await handle_telegram_wait_reply(session, arguments)
elif name == "telegram_send":
return await handle_telegram_send(session, arguments)
elif name == "telegram_send_code":
return await handle_telegram_send_code(session, arguments)
elif name == "telegram_send_image":
return await handle_telegram_send_image(session, arguments)
elif name == "telegram_send_file":
return await handle_telegram_send_file(session, arguments)
elif name == "telegram_get_context_info":
return await handle_telegram_get_context_info(session, arguments)
elif name == "telegram_unattended_mode":
return await handle_telegram_unattended_mode(session, arguments)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
async def handle_telegram_notify(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_notify tool"""
event = arguments.get("event")
summary = arguments.get("summary", "")
details = arguments.get("details", "")
# Validate summary length
if len(summary) > 200:
return [TextContent(
type="text",
text="错误: summary 过长,请精炼到200字以内"
)]
# Format message
emoji = {
"completed": "✅",
"error": "❌",
"question": "❓",
"progress": "⏳"
}
message = f"{emoji.get(event, '🔔')} [`{session.session_id}`]\n{summary}"
if details:
message += f"\n\n━━━━━━━━━━━━\n📝 详情:\n{details}"
# Update session
session.last_message = summary
session.update_activity()
registry.update_session(session) # Save to shared storage
# Send to Telegram
try:
await send_telegram_message(session.chat_id, message)
return [TextContent(
type="text",
text=f"✅ 已发送通知到 Telegram (会话: {session.session_id})"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 发送失败: {str(e)}"
)]
async def handle_telegram_wait_reply(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_wait_reply tool"""
max_wait = arguments.get("max_wait", config.TELEGRAM_MAX_WAIT)
logger.info(f"Session {session.session_id} waiting for reply (max {max_wait}s)")
# Mark session as waiting
session.set_waiting()
registry.update_session(session) # Save to shared storage
# Poll for messages
start_time = time.time()
try:
while True:
elapsed = time.time() - start_time
# Check timeout
if elapsed >= max_wait:
session.set_running()
registry.update_session(session) # Save to shared storage
logger.info(f"Session {session.session_id} wait timeout")
return [TextContent(
type="text",
text=f"超时: 等待了 {int(elapsed)} 秒未收到回复"
)]
# Check message queue
if message_queue.has_messages(session.session_id):
reply = message_queue.pop(session.session_id)
session.set_running()
registry.update_session(session) # Save to shared storage
logger.info(f"Session {session.session_id} received reply: {reply}")
return [TextContent(
type="text",
text=f"用户回复: {reply}"
)]
# Progressive polling
interval = get_poll_interval(elapsed)
logger.debug(f"Session {session.session_id} polling (interval={interval}s, elapsed={int(elapsed)}s)")
await asyncio.sleep(interval)
except (KeyboardInterrupt, asyncio.CancelledError):
session.set_running()
registry.update_session(session) # Save to shared storage
logger.info(f"Session {session.session_id} wait interrupted by user")
return [TextContent(
type="text",
text=f"⚠️ 等待被用户中断 (Ctrl+C)\n\n已等待: {int(time.time() - start_time)} 秒\n\n你可以继续正常对话。"
)]
async def handle_telegram_send(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_send tool"""
message = arguments.get("message", "")
# Auto-truncate if too long
if len(message) > 300:
message = message[:280] + "\n\n... [消息过长已截断,建议使用 telegram_notify]"
# Format message
formatted = f"🤖 [`{session.session_id}`]\n{message}"
# Update session
session.last_message = message
session.update_activity()
registry.update_session(session) # Save to shared storage
# Send to Telegram
try:
await send_telegram_message(session.chat_id, formatted)
return [TextContent(
type="text",
text=f"✅ 已发送消息到 Telegram (会话: {session.session_id})"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 发送失败: {str(e)}"
)]
async def handle_telegram_send_image(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_send_image tool"""
image_path = arguments.get("image_path", "")
caption = arguments.get("caption", "")
if not image_path:
return [TextContent(type="text", text="错误: image_path 参数不能为空")]
# Resolve image path (relative to project or absolute)
if not os.path.isabs(image_path):
full_path = os.path.join(session.project_path, image_path)
else:
full_path = image_path
# Check if file exists
if not os.path.exists(full_path):
return [TextContent(
type="text",
text=f"❌ 图片文件不存在: {image_path}"
)]
if not os.path.isfile(full_path):
return [TextContent(
type="text",
text=f"❌ 不是文件(可能是目录): {image_path}"
)]
# Build caption
if not caption:
caption = f"🖼️ [{session.session_id}] {image_path}"
else:
caption = f"🖼️ [{session.session_id}] {caption}"
# Update session
session.update_activity()
# Send image to Telegram using HTTP API
try:
import httpx
url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendPhoto"
with open(full_path, 'rb') as f:
files = {'photo': (os.path.basename(image_path), f, 'image/jpeg')}
data = {
'chat_id': session.chat_id,
'caption': caption
}
async with httpx.AsyncClient() as client:
response = await client.post(url, files=files, data=data, timeout=30.0)
response.raise_for_status()
return [TextContent(
type="text",
text=f"✅ 已发送图片到 Telegram (会话: {session.session_id}, 图片: {image_path})"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 发送图片失败: {str(e)}"
)]
async def handle_telegram_send_code(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_send_code tool"""
code = arguments.get("code", "")
language = arguments.get("language", "")
caption = arguments.get("caption", "")
if not code:
return [TextContent(type="text", text="错误: code 参数不能为空")]
# Build message
if caption:
message = f"📝 [`{session.session_id}`] {caption}\n\n"
else:
message = f"💻 [`{session.session_id}`] 代码段\n\n"
# Add code block with syntax highlighting
message += f"```{language}\n{code}\n```"
# Update session
session.update_activity()
# Send to Telegram
try:
await send_telegram_message(session.chat_id, message)
return [TextContent(
type="text",
text=f"✅ 已发送代码段到 Telegram (会话: {session.session_id}, 语言: {language or '未指定'})"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 发送代码段失败: {str(e)}"
)]
async def handle_telegram_get_context_info(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_get_context_info tool"""
import platform
from datetime import datetime
token_used = arguments.get("token_used", 0)
token_total = arguments.get("token_total", 1000000)
# Gather context information
info_parts = []
info_parts.append("📊 会话上下文信息")
info_parts.append("━━━━━━━━━━━━━━━━")
info_parts.append(f"🆔 会话 ID: {session.session_id}")
info_parts.append(f"📁 项目路径: {session.project_path}")
info_parts.append(f"📂 当前目录: {os.getcwd()}")
# Token usage (if provided)
if token_used > 0:
token_remaining = token_total - token_used
usage_percent = (token_used / token_total) * 100
info_parts.append("")
info_parts.append("💾 Token 使用情况:")
info_parts.append(f"- 已使用: {token_used:,} tokens")
info_parts.append(f"- 总容量: {token_total:,} tokens")
info_parts.append(f"- 剩余: {token_remaining:,} tokens")
info_parts.append(f"- 使用率: {usage_percent:.1f}%")
# Session timing
created = datetime.fromisoformat(session.created_at)
last_active = datetime.fromisoformat(session.last_active)
uptime = (datetime.now() - created).total_seconds()
info_parts.append("")
info_parts.append("⏱️ 会话时间:")
info_parts.append(f"- 创建时间: {created.strftime('%Y-%m-%d %H:%M:%S')}")
if uptime < 60:
info_parts.append(f"- 运行时长: {int(uptime)} 秒")
elif uptime < 3600:
info_parts.append(f"- 运行时长: {int(uptime / 60)} 分钟")
elif uptime < 86400:
info_parts.append(f"- 运行时长: {int(uptime / 3600)} 小时")
else:
info_parts.append(f"- 运行时长: {int(uptime / 86400)} 天")
# System info
info_parts.append("")
info_parts.append("🖥️ 系统环境:")
info_parts.append(f"- 操作系统: {platform.system()} {platform.release()}")
info_parts.append(f"- Python: {platform.python_version()}")
info_parts.append(f"- 状态: {session.status}")
# Telegram config
info_parts.append("")
info_parts.append("📱 Telegram 配置:")
info_parts.append(f"- 最长等待: {config.TELEGRAM_MAX_WAIT // 86400} 天")
info_parts.append(f"- 轮询: {config.POLL_INTERVALS[0]}s → {config.POLL_INTERVALS[1]}s → {config.POLL_INTERVALS[2]}s")
message = "\n".join(info_parts)
# Update session
session.update_activity()
# Send to Telegram
try:
await send_telegram_message(session.chat_id, message)
return [TextContent(
type="text",
text=f"✅ 上下文信息已发送到 Telegram (会话: {session.session_id})\n\n💡 提示:下次调用时传入 token_used 参数可显示 token 使用量"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 发送失败: {str(e)}"
)]
async def handle_telegram_send_file(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_send_file tool"""
file_path = arguments.get("file_path", "")
caption = arguments.get("caption", "")
if not file_path:
return [TextContent(type="text", text="错误: file_path 参数不能为空")]
# Resolve file path (relative to project or absolute)
if not os.path.isabs(file_path):
full_path = os.path.join(session.project_path, file_path)
else:
full_path = file_path
# Check if file exists
if not os.path.exists(full_path):
return [TextContent(
type="text",
text=f"❌ 文件不存在: {file_path}"
)]
if not os.path.isfile(full_path):
return [TextContent(
type="text",
text=f"❌ 不是文件(可能是目录): {file_path}"
)]
# Build caption
if not caption:
caption = f"📄 [{session.session_id}] {file_path}"
else:
caption = f"📄 [{session.session_id}] {caption}"
# Update session
session.update_activity()
# Send file to Telegram using HTTP API
try:
import httpx
url = f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendDocument"
with open(full_path, 'rb') as f:
files = {'document': (os.path.basename(file_path), f, 'application/octet-stream')}
data = {
'chat_id': session.chat_id,
'caption': caption
}
async with httpx.AsyncClient() as client:
response = await client.post(url, files=files, data=data, timeout=60.0)
response.raise_for_status()
return [TextContent(
type="text",
text=f"✅ 已发送文件到 Telegram (会话: {session.session_id}, 文件: {file_path})"
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 发送文件失败: {str(e)}"
)]
async def handle_telegram_unattended_mode(session, arguments: dict) -> list[TextContent]:
"""Handle telegram_unattended_mode tool"""
current_status = arguments.get("current_status", "")
max_wait = arguments.get("max_wait", config.TELEGRAM_MAX_WAIT)
# Update session state
session.last_message = current_status
session.update_activity()
session.set_waiting()
registry.update_session(session) # Save to shared storage
# Silent waiting - no notification sent
# User should call telegram_notify before calling this tool
logger.info(f"Session {session.session_id} in unattended mode, waiting for instruction (silent)")
start_time = time.time()
try:
while True:
elapsed = time.time() - start_time
# Check timeout
if elapsed >= max_wait:
session.set_running()
registry.update_session(session) # Save to shared storage
logger.info(f"Session {session.session_id} unattended mode timeout")
return [TextContent(
type="text",
text=f"⏱️ 超时: 等待了 {int(elapsed)} 秒未收到指令\n\n建议:可以继续调用此工具重新进入等待,或者退出无人值守模式。"
)]
# Check message queue
if message_queue.has_messages(session.session_id):
reply = message_queue.pop(session.session_id)
session.set_running()
registry.update_session(session) # Save to shared storage
logger.info(f"Session {session.session_id} received instruction: {reply}")
# Check if user wants to exit
if reply.lower() in ['退出', 'exit', 'quit', '结束']:
return [TextContent(
type="text",
text=f"🚪 已退出无人值守模式\n\n用户指令: {reply}\n\n你可以继续正常对话,不再自动循环。"
)]
# Return the instruction
return [TextContent(
type="text",
text=f"📨 收到新指令: {reply}\n\n请执行此指令,完成后再次调用 telegram_unattended_mode 继续循环。"
)]
# Progressive polling
interval = get_poll_interval(elapsed)
logger.debug(f"Session {session.session_id} unattended mode polling (interval={interval}s, elapsed={int(elapsed)}s)")
await asyncio.sleep(interval)
except (KeyboardInterrupt, asyncio.CancelledError):
session.set_running()
registry.update_session(session) # Save to shared storage
logger.info(f"Session {session.session_id} unattended mode interrupted by user")
return [TextContent(
type="text",
text=f"⚠️ 无人值守模式被用户中断 (Ctrl+C)\n\n已运行: {int(time.time() - start_time)} 秒\n\n已退出无人值守模式,你可以继续正常对话。"
)]