send_platform_message
Send messages via AstrBot's Web Chat API, supporting text, images, files, and multimedia content across platforms.
Instructions
通过 AstrBot 的 Web Chat API 发送消息链(支持文本、图片、文件等)。
参数:
platform_id: 平台 ID,例如 "webchat" 或配置中的平台 ID。
message_chain: 消息链,由 MessagePart 列表组成。
文本: {"type": "plain", "text": "..."}
回复: {"type": "reply", "message_id": "..."}
图片/文件/语音/视频: {"type": "image"|"file"|"record"|"video", "file_path": "本地路径或URL"} 或 {"type": "...", "url": "http(s) URL"}
message / images / files / videos / records: 可选便捷参数;当未传 message_chain 时,会自动拼成消息链。
session_id: 可选的平台会话 ID;如果为空,会自动为该平台创建新会话。
selected_provider / selected_model: 可选,指定 AstrBot 内部的 provider/model。
enable_streaming: 是否启用流式回复(影响 AstrBot 返回的 SSE 事件类型)。
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| platform_id | Yes | ||
| message_chain | No | ||
| message | No | ||
| images | No | ||
| files | No | ||
| videos | No | ||
| records | No | ||
| target_id | No | ||
| message_type | No | GroupMessage | |
| session_id | No | ||
| conversation_id | No | ||
| use_last_session | No | ||
| new_session | No | ||
| reply_to_message_id | No | ||
| reply_to_last_saved_message | No | ||
| reply_to_last_user_message | No | ||
| selected_provider | No | ||
| selected_model | No | ||
| enable_streaming | No |
Implementation Reference
- The core handler function implementing the 'send_platform_message' tool logic. It sends messages to AstrBot's Web Chat API, managing sessions, message chains (text, media, replies), attachment uploads, provider/model selection, and processes SSE response events.async def send_platform_message( platform_id: str, message_chain: Optional[List[MessagePart]] = None, message: Optional[str] = None, images: Optional[List[str]] = None, files: Optional[List[str]] = None, videos: Optional[List[str]] = None, records: Optional[List[str]] = None, target_id: Optional[str] = None, message_type: Literal["GroupMessage", "FriendMessage"] = "GroupMessage", session_id: Optional[str] = None, conversation_id: Optional[str] = None, use_last_session: bool = True, new_session: bool = False, reply_to_message_id: Optional[str] = None, reply_to_last_saved_message: bool = False, reply_to_last_user_message: bool = False, selected_provider: Optional[str] = None, selected_model: Optional[str] = None, enable_streaming: bool = True, ) -> Dict[str, Any]: """ 通过 AstrBot 的 Web Chat API 发送消息链(支持文本、图片、文件等)。 参数: - platform_id: 平台 ID,例如 "webchat" 或配置中的平台 ID。 - message_chain: 消息链,由 MessagePart 列表组成。 - 文本: {"type": "plain", "text": "..."} - 回复: {"type": "reply", "message_id": "..."} - 图片/文件/语音/视频: {"type": "image"|"file"|"record"|"video", "file_path": "本地路径或URL"} 或 {"type": "...", "url": "http(s) URL"} - message / images / files / videos / records: 可选便捷参数;当未传 message_chain 时,会自动拼成消息链。 - session_id: 可选的平台会话 ID;如果为空,会自动为该平台创建新会话。 - selected_provider / selected_model: 可选,指定 AstrBot 内部的 provider/model。 - enable_streaming: 是否启用流式回复(影响 AstrBot 返回的 SSE 事件类型)。 """ client = AstrBotClient.from_env() if target_id: direct_result = await send_platform_message_direct( platform_id=platform_id, target_id=str(target_id), message_chain=message_chain, message=message, images=images, files=files, videos=videos, records=records, message_type=message_type, ) if isinstance(direct_result, dict): direct_result.setdefault("mode", "direct") return direct_result mode = "webchat" session_platform_id = "webchat" routing_debug: Dict[str, Any] = {} send_started_at = datetime.now(timezone.utc) if message_chain is None: message_chain = [] if message: message_chain.append({"type": "plain", "text": message}) for src in images or []: message_chain.append({"type": "image", "file_path": src}) for src in files or []: message_chain.append({"type": "file", "file_path": src}) for src in records or []: message_chain.append({"type": "record", "file_path": src}) for src in videos or []: message_chain.append({"type": "video", "file_path": src}) # 1. 确保有 session_id explicit_session_id = session_id or conversation_id used_session_id: str | None = None session_reused = False if ( explicit_session_id and isinstance(explicit_session_id, str) and explicit_session_id.strip() ): used_session_id = explicit_session_id.strip() async with _SESSION_CACHE_LOCK: _SESSION_CACHE[_session_cache_key(client, session_platform_id)] = used_session_id elif use_last_session and not new_session: async with _SESSION_CACHE_LOCK: cached = _SESSION_CACHE.get(_session_cache_key(client, session_platform_id)) if cached: used_session_id = cached session_reused = True if new_session or not used_session_id: try: session_resp = await client.create_platform_session( platform_id=session_platform_id ) except Exception as e: return { "status": "error", "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}", "mode": mode, "platform_id": session_platform_id, "requested_platform_id": platform_id, "base_url": client.base_url, "detail": _httpx_error_detail(e), } if session_resp.get("status") != "ok": return { "status": session_resp.get("status"), "message": session_resp.get("message"), "raw": session_resp, } data = session_resp.get("data") or {} used_session_id = data.get("session_id") if not used_session_id: return { "status": "error", "message": "Failed to create platform session: missing session_id", "raw": session_resp, } used_session_id = str(used_session_id) async with _SESSION_CACHE_LOCK: _SESSION_CACHE[_session_cache_key(client, session_platform_id)] = used_session_id session_reused = False used_session_id = str(used_session_id) if client.settings.username: username = client.settings.username.strip() or "astrbot" umo = f"webchat:FriendMessage:webchat!{username}!{used_session_id}" routing_debug["umo"] = umo # 1) Ensure UMO -> abconf route exists (the dashboard does this automatically). try: ucr_resp = await client.get_umo_abconf_routes() routing_debug["ucr_get"] = ucr_resp if ucr_resp.get("status") != "ok" else None if ucr_resp.get("status") == "ok": routing = (ucr_resp.get("data") or {}).get("routing") or {} if isinstance(routing, dict): if umo in routing: routing_debug["ucr_has_route"] = True else: routing_debug["ucr_has_route"] = False prefix = f"webchat:FriendMessage:webchat!{username}!" conf_id: str | None = None for k, v in routing.items(): if isinstance(k, str) and k.startswith(prefix): conf_id = str(v) break if not conf_id: abconfs = await client.get_abconf_list() info_list = (abconfs.get("data") or {}).get("info_list") or [] if isinstance(info_list, list): # Prefer an active/current config if present. for item in info_list: if not isinstance(item, dict): continue if item.get("active") or item.get("current") or item.get("is_current"): cid = item.get("id") or item.get("conf_id") if cid: conf_id = str(cid) break if not conf_id: for item in info_list: if not isinstance(item, dict): continue cid = item.get("id") or item.get("conf_id") if cid: conf_id = str(cid) break routing_debug["abconf_pick"] = conf_id if conf_id: upd = await client.update_umo_abconf_route(umo=umo, conf_id=conf_id) routing_debug["ucr_update"] = upd except Exception as e: routing_debug["ucr_exception"] = str(e) # 2) Copy provider_perf rule from an existing webchat UMO (avoids "no provider supported" on fresh sessions). try: rules_resp = await client.list_session_rules( page=1, page_size=100, search=f"webchat!{username}!" ) routing_debug["session_rules_get"] = ( rules_resp if rules_resp.get("status") != "ok" else None ) if rules_resp.get("status") == "ok": data = rules_resp.get("data") or {} rules_list = data.get("rules") or [] if isinstance(rules_list, list): source_umo = None source_key = None source_val = None for item in rules_list: if not isinstance(item, dict): continue rules = item.get("rules") or {} if not isinstance(rules, dict): continue for k, v in rules.items(): if isinstance(k, str) and k.startswith("provider_perf_") and "chat" in k: source_umo = item.get("umo") source_key = k source_val = v break if source_key: break if source_key and source_val is not None: upd = await client.update_session_rule( umo=umo, rule_key=source_key, rule_value=source_val ) routing_debug["provider_rule_copied_from"] = source_umo routing_debug["provider_rule_key"] = source_key routing_debug["provider_rule_update"] = upd except Exception as e: routing_debug["session_rules_exception"] = str(e) else: routing_debug["skipped"] = "No ASTRBOT_USERNAME configured; cannot mirror dashboard session routing." if reply_to_last_user_message and not reply_to_message_id: async with _LAST_USER_MESSAGE_ID_LOCK: reply_to_message_id = _LAST_USER_MESSAGE_ID_BY_SESSION.get( _last_saved_key(client, used_session_id) ) # reply_to_last_saved_message historically points to the last saved bot message (message_saved.id). # With user_message_saved supported, callers can prefer last_user_message_id from the response. if reply_to_last_saved_message and not reply_to_message_id: async with _LAST_SAVED_MESSAGE_ID_LOCK: reply_to_message_id = _LAST_SAVED_MESSAGE_ID_BY_SESSION.get( _last_saved_key(client, used_session_id) ) # 2. 把 message_chain 转成 AstrBot chat/send 需要的 message_parts explicit_reply_present = False for part in message_chain: if not isinstance(part, dict): continue if part.get("type") in ("reply", "quote", "reference"): msg_id = part.get("message_id") or part.get("id") if msg_id is not None and str(msg_id).strip(): explicit_reply_present = True break message_parts: List[Dict[str, Any]] = [] reply_ids: List[str] = [] if reply_to_message_id and not explicit_reply_present: message_parts.append( { "type": "reply", "message_id": _normalize_history_message_id(reply_to_message_id), } ) reply_ids.append(str(reply_to_message_id)) quote_debug: Dict[str, Any] | None = None uploaded_attachments: List[Dict[str, Any]] = [] for part in message_chain: p_type = part.get("type") if p_type == "plain": text = part.get("text", "") message_parts.append({"type": "plain", "text": text}) elif p_type in ("reply", "quote", "reference"): msg_id = part.get("message_id") or part.get("id") if msg_id is None: continue msg_id_str = str(msg_id).strip() if not msg_id_str: continue message_parts.append( {"type": "reply", "message_id": _normalize_history_message_id(msg_id)} ) reply_ids.append(msg_id_str) elif p_type in ("image", "file", "record", "video"): file_path = part.get("file_path") url = part.get("url") file_name = part.get("file_name") mime_type = part.get("mime_type") src = url or file_path if not src: continue if isinstance(src, str) and src.startswith(("http://", "https://")): if not file_name: from urllib.parse import urlparse parsed = urlparse(src) file_name = os.path.basename(parsed.path) or None try: attach_resp = await client.post_attachment_url( src, file_name=file_name, mime_type=mime_type, ) except Exception as e: return { "status": "error", "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}", "platform_id": platform_id, "session_id": used_session_id, "base_url": client.base_url, "detail": _httpx_error_detail(e), } else: if not isinstance(src, str): return { "status": "error", "message": f"Invalid local file_path: {src!r}", "platform_id": platform_id, "session_id": used_session_id, "part": dict(part), } try: src = _resolve_local_file_path(client, src) except ValueError as e: return { "status": "error", "message": str(e), "platform_id": platform_id, "session_id": used_session_id, "part": dict(part), "hint": "Set ASTRBOTMCP_FILE_ROOT to control how relative paths are resolved.", } except FileNotFoundError: return { "status": "error", "message": f"Local file_path does not exist: {src!r}", "platform_id": platform_id, "session_id": used_session_id, "part": dict(part), "hint": "If you passed a relative path, set ASTRBOTMCP_FILE_ROOT (or run the server in the correct working directory).", } try: attach_resp = await client.post_attachment_file( src, file_name=file_name, mime_type=mime_type, ) except Exception as e: return { "status": "error", "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}", "platform_id": platform_id, "session_id": used_session_id, "base_url": client.base_url, "detail": _httpx_error_detail(e), } if attach_resp.get("status") != "ok": return { "status": attach_resp.get("status"), "message": attach_resp.get("message"), "raw": attach_resp, } attach_data = attach_resp.get("data") or {} attachment_id = attach_data.get("attachment_id") if not attachment_id: return { "status": "error", "message": "Attachment upload succeeded but attachment_id is missing", "raw": attach_resp, } # /api/chat/send has a pre-check based on `type`, so include media type # alongside attachment_id (otherwise it may treat the message as empty). attachment_type = attach_data.get("type") or p_type message_parts.append({"type": attachment_type, "attachment_id": attachment_id}) uploaded_attachments.append(attach_data) else: # 忽略未知类型 continue if reply_ids: try: _ignored, quote_debug = await _resolve_webchat_quotes( client, session_id=used_session_id, reply_ids=reply_ids ) except Exception as e: quote_debug = {"error": str(e), "resolved": {}, "missing": reply_ids} if not message_parts: return { "status": "error", "message": "message_chain did not produce any valid message parts", "mode": mode, "platform_id": session_platform_id, "requested_platform_id": platform_id, "quote_debug": quote_debug, "routing_debug": routing_debug, } # 3. 调用 /api/chat/send 并消费 SSE 回复 # Mirror dashboard behavior: prefer session rules and UMO routing. # If we cannot infer/copy provider rules for a brand-new session, fall back to env defaults. effective_provider = selected_provider effective_model = selected_model if ( effective_provider is None and effective_model is None and not routing_debug.get("provider_rule_key") ): effective_provider = client.settings.default_provider effective_model = client.settings.default_model try: events = await client.send_chat_message_sse( session_id=used_session_id, message_parts=message_parts, selected_provider=effective_provider, selected_model=effective_model, enable_streaming=enable_streaming, ) except Exception as e: status_code = getattr(getattr(e, "response", None), "status_code", None) return { "status": "error", "message": ( f"AstrBot API error: HTTP {status_code}" if status_code is not None else f"AstrBot API error: {e}" ), "mode": mode, "platform_id": session_platform_id, "requested_platform_id": platform_id, "session_id": used_session_id, "selected_provider": effective_provider, "selected_model": effective_model, "request_message_parts": message_parts, "detail": _httpx_error_detail(e), "astrbot_logs_tail": await _get_astrbot_log_tail(client), "hint": ( "If you see 'has no provider supported' in AstrBot logs, " "set selected_provider/selected_model (or env ASTRBOT_DEFAULT_PROVIDER/ASTRBOT_DEFAULT_MODEL)." ), "quote_debug": quote_debug, "routing_debug": routing_debug, } # 简单聚合文本回复(仅供参考,保留原始事件) reply_text_chunks: List[str] = [] saved_message_ids: List[str] = [] user_message_ids: List[str] = [] if not events: return { "status": "error", "message": "AstrBot returned no SSE events for /api/chat/send", "mode": mode, "platform_id": session_platform_id, "requested_platform_id": platform_id, "session_id": used_session_id, "selected_provider": effective_provider, "selected_model": effective_model, "request_message_parts": message_parts, "astrbot_logs_tail": await _get_astrbot_log_tail(client), "hint": "Check AstrBot logs for the root cause (often provider/model config).", "quote_debug": quote_debug, "routing_debug": routing_debug, } # If we only got bookkeeping events (e.g., user_message_saved) but no response stream at all, # treat it as an error while still returning useful ids. response_types = { "plain", "complete", "image", "record", "file", "message_saved", "end", "break", "raw", } has_response = any(ev.get("type") in response_types for ev in events if isinstance(ev, dict)) if not has_response: user_ids = [] for ev in events: if not isinstance(ev, dict): continue if ev.get("type") == "user_message_saved": data = ev.get("data") or {} mid = data.get("id") if mid is not None: user_ids.append(str(mid)) # Some plugins reply by side effects (e.g., sending messages via adapters) and may only # emit bookkeeping events on the WebChat SSE stream. Treat as ok but include a warning. return { "status": "ok", "warning": "No reply events were observed on the /api/chat/send SSE stream; check AstrBot logs if you expected an LLM reply.", "mode": mode, "platform_id": session_platform_id, "requested_platform_id": platform_id, "session_id": used_session_id, "selected_provider": effective_provider, "selected_model": effective_model, "request_message_parts": message_parts, "user_message_ids": user_ids, "last_user_message_id": (user_ids[-1] if user_ids else None), "quote_debug": quote_debug, "routing_debug": routing_debug, "reply_events": events, "astrbot_logs_tail": await _get_astrbot_log_tail(client), } for ev in events: if ev.get("type") == "user_message_saved": data = ev.get("data") or {} saved_id = data.get("id") if saved_id is not None: user_message_ids.append(str(saved_id)) async with _LAST_USER_MESSAGE_ID_LOCK: _LAST_USER_MESSAGE_ID_BY_SESSION[_last_saved_key(client, used_session_id)] = str( saved_id ) if ev.get("type") == "message_saved": data = ev.get("data") or {} saved_id = data.get("id") if saved_id is not None: saved_message_ids.append(str(saved_id)) if ev.get("type") in ("plain", "complete"): data = ev.get("data") if isinstance(data, str): reply_text_chunks.append(data) # Fallback: some AstrBot versions do not emit `user_message_saved`. # Try to infer the latest user message id by fetching /api/chat/get_session and scanning history. if not user_message_ids: match_hint = None for part in message_parts: if not isinstance(part, dict): continue if part.get("type") == "plain": txt = part.get("text") if isinstance(txt, str) and txt.strip(): match_hint = txt.strip() break try: sess = await client.get_platform_session(session_id=used_session_id) if sess.get("status") == "ok": history = (sess.get("data") or {}).get("history") or [] if isinstance(history, list): expected_sender = (client.settings.username or "").strip() or None def is_recent_user_record(item: Dict[str, Any]) -> bool: if not isinstance(item, dict): return False content = item.get("content") or {} if not isinstance(content, dict) or content.get("type") != "user": return False if expected_sender and item.get("sender_name") != expected_sender: return False if match_hint: extracted = _extract_plain_text_from_history_item(item) if match_hint[:32] not in extracted: return False created_at = item.get("created_at") if isinstance(created_at, str) and created_at: try: # e.g. 2025-12-18T21:47:07.684801+08:00 dt = datetime.fromisoformat(created_at) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) # accept a small clock skew window return dt.astimezone(timezone.utc) >= send_started_at.replace( microsecond=0 ) - timedelta(seconds=5) except Exception: pass return True # Look from newest to oldest for a likely match. for item in reversed(history): if not isinstance(item, dict): continue if not is_recent_user_record(item): continue mid = item.get("id") if mid is None: continue user_message_ids.append(str(mid)) async with _LAST_USER_MESSAGE_ID_LOCK: _LAST_USER_MESSAGE_ID_BY_SESSION[_last_saved_key(client, used_session_id)] = str( mid ) break except Exception as e: routing_debug["user_id_fallback_exception"] = str(e) last_saved_message_id: str | None = ( saved_message_ids[-1] if saved_message_ids else None ) last_user_message_id: str | None = ( user_message_ids[-1] if user_message_ids else None ) if last_saved_message_id: async with _LAST_SAVED_MESSAGE_ID_LOCK: _LAST_SAVED_MESSAGE_ID_BY_SESSION[_last_saved_key(client, used_session_id)] = ( last_saved_message_id ) return { "status": "ok", "mode": mode, "platform_id": session_platform_id, "requested_platform_id": platform_id, "session_id": used_session_id, "conversation_id": used_session_id, "session_reused": session_reused, "selected_provider": effective_provider, "selected_model": effective_model, "request_message_parts": message_parts, "uploaded_attachments": uploaded_attachments, "reply_events": events, "reply_text": "".join(reply_text_chunks), "user_message_ids": user_message_ids, "last_user_message_id": last_user_message_id, "saved_message_ids": saved_message_ids, "last_saved_message_id": last_saved_message_id, "quote_debug": quote_debug, "routing_debug": routing_debug, }
- astrbot_mcp/server.py:27-27 (registration)Registers the send_platform_message handler with the FastMCP server under the tool name 'send_platform_message'.server.tool(astrbot_tools.send_platform_message, name="send_platform_message")
- astrbot_mcp/tools/types.py:12-44 (schema)TypedDict schema defining the structure of individual parts in the message_chain parameter for the tool.class MessagePart(TypedDict, total=False): """ A single message part for send_platform_message. Types: - plain: {"type": "plain", "text": "..."} - reply: {"type": "reply", "message_id": "..."} - quote: {"type": "quote", "message_id": "..."} (alias of reply) - reference: {"type": "reference", "message_id": "..."} (alias of reply) - image: {"type": "image", "file_path": "..."} or {"type": "image", "url": "https://..."} - file: {"type": "file", "file_path": "..."} or {"type": "file", "url": "https://..."} - record: {"type": "record", "file_path": "..."} or {"type": "record", "url": "https://..."} - video: {"type": "video", "file_path": "..."} or {"type": "video", "url": "https://..."} """ type: Literal[ "plain", "reply", "quote", "reference", "image", "file", "record", "video", ] text: str message_id: str id: str file_path: str url: str file_name: str mime_type: str
- astrbot_mcp/server.py:51-52 (registration)The tool is listed in the astrbot://info resource, advertising it to MCP hosts."send_platform_message", "send_platform_message_direct",