Skip to main content
Glama

send_platform_message

Send messages via AstrBot's Web Chat API, supporting text, images, files, and multimedia content across platforms.

Instructions

通过 AstrBot 的 Web Chat API 发送消息链(支持文本、图片、文件等)。

参数:

  • platform_id: 平台 ID,例如 "webchat" 或配置中的平台 ID。

  • message_chain: 消息链,由 MessagePart 列表组成。

    • 文本: {"type": "plain", "text": "..."}

    • 回复: {"type": "reply", "message_id": "..."}

    • 图片/文件/语音/视频: {"type": "image"|"file"|"record"|"video", "file_path": "本地路径或URL"} 或 {"type": "...", "url": "http(s) URL"}

  • message / images / files / videos / records: 可选便捷参数;当未传 message_chain 时,会自动拼成消息链。

  • session_id: 可选的平台会话 ID;如果为空,会自动为该平台创建新会话。

  • selected_provider / selected_model: 可选,指定 AstrBot 内部的 provider/model。

  • enable_streaming: 是否启用流式回复(影响 AstrBot 返回的 SSE 事件类型)。

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
platform_idYes
message_chainNo
messageNo
imagesNo
filesNo
videosNo
recordsNo
target_idNo
message_typeNoGroupMessage
session_idNo
conversation_idNo
use_last_sessionNo
new_sessionNo
reply_to_message_idNo
reply_to_last_saved_messageNo
reply_to_last_user_messageNo
selected_providerNo
selected_modelNo
enable_streamingNo

Implementation Reference

  • The core handler function implementing the 'send_platform_message' tool logic. It sends messages to AstrBot's Web Chat API, managing sessions, message chains (text, media, replies), attachment uploads, provider/model selection, and processes SSE response events.
    async def send_platform_message(
        platform_id: str,
        message_chain: Optional[List[MessagePart]] = None,
        message: Optional[str] = None,
        images: Optional[List[str]] = None,
        files: Optional[List[str]] = None,
        videos: Optional[List[str]] = None,
        records: Optional[List[str]] = None,
        target_id: Optional[str] = None,
        message_type: Literal["GroupMessage", "FriendMessage"] = "GroupMessage",
        session_id: Optional[str] = None,
        conversation_id: Optional[str] = None,
        use_last_session: bool = True,
        new_session: bool = False,
        reply_to_message_id: Optional[str] = None,
        reply_to_last_saved_message: bool = False,
        reply_to_last_user_message: bool = False,
        selected_provider: Optional[str] = None,
        selected_model: Optional[str] = None,
        enable_streaming: bool = True,
    ) -> Dict[str, Any]:
        """
        通过 AstrBot 的 Web Chat API 发送消息链(支持文本、图片、文件等)。
    
        参数:
          - platform_id: 平台 ID,例如 "webchat" 或配置中的平台 ID。
          - message_chain: 消息链,由 MessagePart 列表组成。
            - 文本:  {"type": "plain", "text": "..."}
            - 回复:  {"type": "reply", "message_id": "..."}
            - 图片/文件/语音/视频: {"type": "image"|"file"|"record"|"video", "file_path": "本地路径或URL"} 或 {"type": "...", "url": "http(s) URL"}
          - message / images / files / videos / records: 可选便捷参数;当未传 message_chain 时,会自动拼成消息链。
          - session_id: 可选的平台会话 ID;如果为空,会自动为该平台创建新会话。
          - selected_provider / selected_model: 可选,指定 AstrBot 内部的 provider/model。
          - enable_streaming: 是否启用流式回复(影响 AstrBot 返回的 SSE 事件类型)。
        """
        client = AstrBotClient.from_env()
    
        if target_id:
            direct_result = await send_platform_message_direct(
                platform_id=platform_id,
                target_id=str(target_id),
                message_chain=message_chain,
                message=message,
                images=images,
                files=files,
                videos=videos,
                records=records,
                message_type=message_type,
            )
            if isinstance(direct_result, dict):
                direct_result.setdefault("mode", "direct")
            return direct_result
    
        mode = "webchat"
        session_platform_id = "webchat"
        routing_debug: Dict[str, Any] = {}
        send_started_at = datetime.now(timezone.utc)
    
        if message_chain is None:
            message_chain = []
            if message:
                message_chain.append({"type": "plain", "text": message})
            for src in images or []:
                message_chain.append({"type": "image", "file_path": src})
            for src in files or []:
                message_chain.append({"type": "file", "file_path": src})
            for src in records or []:
                message_chain.append({"type": "record", "file_path": src})
            for src in videos or []:
                message_chain.append({"type": "video", "file_path": src})
    
        # 1. 确保有 session_id
        explicit_session_id = session_id or conversation_id
        used_session_id: str | None = None
        session_reused = False
    
        if (
            explicit_session_id
            and isinstance(explicit_session_id, str)
            and explicit_session_id.strip()
        ):
            used_session_id = explicit_session_id.strip()
            async with _SESSION_CACHE_LOCK:
                _SESSION_CACHE[_session_cache_key(client, session_platform_id)] = used_session_id
        elif use_last_session and not new_session:
            async with _SESSION_CACHE_LOCK:
                cached = _SESSION_CACHE.get(_session_cache_key(client, session_platform_id))
            if cached:
                used_session_id = cached
                session_reused = True
    
        if new_session or not used_session_id:
            try:
                session_resp = await client.create_platform_session(
                    platform_id=session_platform_id
                )
            except Exception as e:
                return {
                    "status": "error",
                    "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                    "mode": mode,
                    "platform_id": session_platform_id,
                    "requested_platform_id": platform_id,
                    "base_url": client.base_url,
                    "detail": _httpx_error_detail(e),
                }
            if session_resp.get("status") != "ok":
                return {
                    "status": session_resp.get("status"),
                    "message": session_resp.get("message"),
                    "raw": session_resp,
                }
            data = session_resp.get("data") or {}
            used_session_id = data.get("session_id")
            if not used_session_id:
                return {
                    "status": "error",
                    "message": "Failed to create platform session: missing session_id",
                    "raw": session_resp,
                }
            used_session_id = str(used_session_id)
            async with _SESSION_CACHE_LOCK:
                _SESSION_CACHE[_session_cache_key(client, session_platform_id)] = used_session_id
            session_reused = False
    
        used_session_id = str(used_session_id)
    
        if client.settings.username:
            username = client.settings.username.strip() or "astrbot"
            umo = f"webchat:FriendMessage:webchat!{username}!{used_session_id}"
            routing_debug["umo"] = umo
    
            # 1) Ensure UMO -> abconf route exists (the dashboard does this automatically).
            try:
                ucr_resp = await client.get_umo_abconf_routes()
                routing_debug["ucr_get"] = ucr_resp if ucr_resp.get("status") != "ok" else None
                if ucr_resp.get("status") == "ok":
                    routing = (ucr_resp.get("data") or {}).get("routing") or {}
                    if isinstance(routing, dict):
                        if umo in routing:
                            routing_debug["ucr_has_route"] = True
                        else:
                            routing_debug["ucr_has_route"] = False
                            prefix = f"webchat:FriendMessage:webchat!{username}!"
                            conf_id: str | None = None
                            for k, v in routing.items():
                                if isinstance(k, str) and k.startswith(prefix):
                                    conf_id = str(v)
                                    break
    
                            if not conf_id:
                                abconfs = await client.get_abconf_list()
                                info_list = (abconfs.get("data") or {}).get("info_list") or []
                                if isinstance(info_list, list):
                                    # Prefer an active/current config if present.
                                    for item in info_list:
                                        if not isinstance(item, dict):
                                            continue
                                        if item.get("active") or item.get("current") or item.get("is_current"):
                                            cid = item.get("id") or item.get("conf_id")
                                            if cid:
                                                conf_id = str(cid)
                                                break
                                    if not conf_id:
                                        for item in info_list:
                                            if not isinstance(item, dict):
                                                continue
                                            cid = item.get("id") or item.get("conf_id")
                                            if cid:
                                                conf_id = str(cid)
                                                break
                                routing_debug["abconf_pick"] = conf_id
    
                            if conf_id:
                                upd = await client.update_umo_abconf_route(umo=umo, conf_id=conf_id)
                                routing_debug["ucr_update"] = upd
            except Exception as e:
                routing_debug["ucr_exception"] = str(e)
    
            # 2) Copy provider_perf rule from an existing webchat UMO (avoids "no provider supported" on fresh sessions).
            try:
                rules_resp = await client.list_session_rules(
                    page=1, page_size=100, search=f"webchat!{username}!"
                )
                routing_debug["session_rules_get"] = (
                    rules_resp if rules_resp.get("status") != "ok" else None
                )
                if rules_resp.get("status") == "ok":
                    data = rules_resp.get("data") or {}
                    rules_list = data.get("rules") or []
                    if isinstance(rules_list, list):
                        source_umo = None
                        source_key = None
                        source_val = None
                        for item in rules_list:
                            if not isinstance(item, dict):
                                continue
                            rules = item.get("rules") or {}
                            if not isinstance(rules, dict):
                                continue
                            for k, v in rules.items():
                                if isinstance(k, str) and k.startswith("provider_perf_") and "chat" in k:
                                    source_umo = item.get("umo")
                                    source_key = k
                                    source_val = v
                                    break
                            if source_key:
                                break
    
                        if source_key and source_val is not None:
                            upd = await client.update_session_rule(
                                umo=umo, rule_key=source_key, rule_value=source_val
                            )
                            routing_debug["provider_rule_copied_from"] = source_umo
                            routing_debug["provider_rule_key"] = source_key
                            routing_debug["provider_rule_update"] = upd
            except Exception as e:
                routing_debug["session_rules_exception"] = str(e)
        else:
            routing_debug["skipped"] = "No ASTRBOT_USERNAME configured; cannot mirror dashboard session routing."
    
        if reply_to_last_user_message and not reply_to_message_id:
            async with _LAST_USER_MESSAGE_ID_LOCK:
                reply_to_message_id = _LAST_USER_MESSAGE_ID_BY_SESSION.get(
                    _last_saved_key(client, used_session_id)
                )
    
        # reply_to_last_saved_message historically points to the last saved bot message (message_saved.id).
        # With user_message_saved supported, callers can prefer last_user_message_id from the response.
        if reply_to_last_saved_message and not reply_to_message_id:
            async with _LAST_SAVED_MESSAGE_ID_LOCK:
                reply_to_message_id = _LAST_SAVED_MESSAGE_ID_BY_SESSION.get(
                    _last_saved_key(client, used_session_id)
                )
    
        # 2. 把 message_chain 转成 AstrBot chat/send 需要的 message_parts
        explicit_reply_present = False
        for part in message_chain:
            if not isinstance(part, dict):
                continue
            if part.get("type") in ("reply", "quote", "reference"):
                msg_id = part.get("message_id") or part.get("id")
                if msg_id is not None and str(msg_id).strip():
                    explicit_reply_present = True
                    break
    
        message_parts: List[Dict[str, Any]] = []
        reply_ids: List[str] = []
        if reply_to_message_id and not explicit_reply_present:
            message_parts.append(
                {
                    "type": "reply",
                    "message_id": _normalize_history_message_id(reply_to_message_id),
                }
            )
            reply_ids.append(str(reply_to_message_id))
    
        quote_debug: Dict[str, Any] | None = None
        uploaded_attachments: List[Dict[str, Any]] = []
    
        for part in message_chain:
            p_type = part.get("type")
    
            if p_type == "plain":
                text = part.get("text", "")
                message_parts.append({"type": "plain", "text": text})
            elif p_type in ("reply", "quote", "reference"):
                msg_id = part.get("message_id") or part.get("id")
                if msg_id is None:
                    continue
                msg_id_str = str(msg_id).strip()
                if not msg_id_str:
                    continue
                message_parts.append(
                    {"type": "reply", "message_id": _normalize_history_message_id(msg_id)}
                )
                reply_ids.append(msg_id_str)
            elif p_type in ("image", "file", "record", "video"):
                file_path = part.get("file_path")
                url = part.get("url")
                file_name = part.get("file_name")
                mime_type = part.get("mime_type")
    
                src = url or file_path
                if not src:
                    continue
    
                if isinstance(src, str) and src.startswith(("http://", "https://")):
                    if not file_name:
                        from urllib.parse import urlparse
                        parsed = urlparse(src)
                        file_name = os.path.basename(parsed.path) or None
                    try:
                        attach_resp = await client.post_attachment_url(
                            src,
                            file_name=file_name,
                            mime_type=mime_type,
                        )
                    except Exception as e:
                        return {
                            "status": "error",
                            "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                            "platform_id": platform_id,
                            "session_id": used_session_id,
                            "base_url": client.base_url,
                            "detail": _httpx_error_detail(e),
                        }
                else:
                    if not isinstance(src, str):
                        return {
                            "status": "error",
                            "message": f"Invalid local file_path: {src!r}",
                            "platform_id": platform_id,
                            "session_id": used_session_id,
                            "part": dict(part),
                        }
                    try:
                        src = _resolve_local_file_path(client, src)
                    except ValueError as e:
                        return {
                            "status": "error",
                            "message": str(e),
                            "platform_id": platform_id,
                            "session_id": used_session_id,
                            "part": dict(part),
                            "hint": "Set ASTRBOTMCP_FILE_ROOT to control how relative paths are resolved.",
                        }
                    except FileNotFoundError:
                        return {
                            "status": "error",
                            "message": f"Local file_path does not exist: {src!r}",
                            "platform_id": platform_id,
                            "session_id": used_session_id,
                            "part": dict(part),
                            "hint": "If you passed a relative path, set ASTRBOTMCP_FILE_ROOT (or run the server in the correct working directory).",
                        }
                    try:
                        attach_resp = await client.post_attachment_file(
                            src,
                            file_name=file_name,
                            mime_type=mime_type,
                        )
                    except Exception as e:
                        return {
                            "status": "error",
                            "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                            "platform_id": platform_id,
                            "session_id": used_session_id,
                            "base_url": client.base_url,
                            "detail": _httpx_error_detail(e),
                        }
                if attach_resp.get("status") != "ok":
                    return {
                        "status": attach_resp.get("status"),
                        "message": attach_resp.get("message"),
                        "raw": attach_resp,
                    }
                attach_data = attach_resp.get("data") or {}
                attachment_id = attach_data.get("attachment_id")
                if not attachment_id:
                    return {
                        "status": "error",
                        "message": "Attachment upload succeeded but attachment_id is missing",
                        "raw": attach_resp,
                    }
                # /api/chat/send has a pre-check based on `type`, so include media type
                # alongside attachment_id (otherwise it may treat the message as empty).
                attachment_type = attach_data.get("type") or p_type
                message_parts.append({"type": attachment_type, "attachment_id": attachment_id})
                uploaded_attachments.append(attach_data)
            else:
                # 忽略未知类型
                continue
    
        if reply_ids:
            try:
                _ignored, quote_debug = await _resolve_webchat_quotes(
                    client, session_id=used_session_id, reply_ids=reply_ids
                )
            except Exception as e:
                quote_debug = {"error": str(e), "resolved": {}, "missing": reply_ids}
    
        if not message_parts:
            return {
                "status": "error",
                "message": "message_chain did not produce any valid message parts",
                "mode": mode,
                "platform_id": session_platform_id,
                "requested_platform_id": platform_id,
                "quote_debug": quote_debug,
                "routing_debug": routing_debug,
            }
    
        # 3. 调用 /api/chat/send 并消费 SSE 回复
        # Mirror dashboard behavior: prefer session rules and UMO routing.
        # If we cannot infer/copy provider rules for a brand-new session, fall back to env defaults.
        effective_provider = selected_provider
        effective_model = selected_model
        if (
            effective_provider is None
            and effective_model is None
            and not routing_debug.get("provider_rule_key")
        ):
            effective_provider = client.settings.default_provider
            effective_model = client.settings.default_model
    
        try:
            events = await client.send_chat_message_sse(
                session_id=used_session_id,
                message_parts=message_parts,
                selected_provider=effective_provider,
                selected_model=effective_model,
                enable_streaming=enable_streaming,
            )
        except Exception as e:
            status_code = getattr(getattr(e, "response", None), "status_code", None)
            return {
                "status": "error",
                "message": (
                    f"AstrBot API error: HTTP {status_code}"
                    if status_code is not None
                    else f"AstrBot API error: {e}"
                ),
                "mode": mode,
                "platform_id": session_platform_id,
                "requested_platform_id": platform_id,
                "session_id": used_session_id,
                "selected_provider": effective_provider,
                "selected_model": effective_model,
                "request_message_parts": message_parts,
                "detail": _httpx_error_detail(e),
                "astrbot_logs_tail": await _get_astrbot_log_tail(client),
                "hint": (
                    "If you see 'has no provider supported' in AstrBot logs, "
                    "set selected_provider/selected_model (or env ASTRBOT_DEFAULT_PROVIDER/ASTRBOT_DEFAULT_MODEL)."
                ),
                "quote_debug": quote_debug,
                "routing_debug": routing_debug,
            }
    
        # 简单聚合文本回复(仅供参考,保留原始事件)
        reply_text_chunks: List[str] = []
        saved_message_ids: List[str] = []
        user_message_ids: List[str] = []
        if not events:
            return {
                "status": "error",
                "message": "AstrBot returned no SSE events for /api/chat/send",
                "mode": mode,
                "platform_id": session_platform_id,
                "requested_platform_id": platform_id,
                "session_id": used_session_id,
                "selected_provider": effective_provider,
                "selected_model": effective_model,
                "request_message_parts": message_parts,
                "astrbot_logs_tail": await _get_astrbot_log_tail(client),
                "hint": "Check AstrBot logs for the root cause (often provider/model config).",
                "quote_debug": quote_debug,
                "routing_debug": routing_debug,
            }
    
        # If we only got bookkeeping events (e.g., user_message_saved) but no response stream at all,
        # treat it as an error while still returning useful ids.
        response_types = {
            "plain",
            "complete",
            "image",
            "record",
            "file",
            "message_saved",
            "end",
            "break",
            "raw",
        }
        has_response = any(ev.get("type") in response_types for ev in events if isinstance(ev, dict))
        if not has_response:
            user_ids = []
            for ev in events:
                if not isinstance(ev, dict):
                    continue
                if ev.get("type") == "user_message_saved":
                    data = ev.get("data") or {}
                    mid = data.get("id")
                    if mid is not None:
                        user_ids.append(str(mid))
            # Some plugins reply by side effects (e.g., sending messages via adapters) and may only
            # emit bookkeeping events on the WebChat SSE stream. Treat as ok but include a warning.
            return {
                "status": "ok",
                "warning": "No reply events were observed on the /api/chat/send SSE stream; check AstrBot logs if you expected an LLM reply.",
                "mode": mode,
                "platform_id": session_platform_id,
                "requested_platform_id": platform_id,
                "session_id": used_session_id,
                "selected_provider": effective_provider,
                "selected_model": effective_model,
                "request_message_parts": message_parts,
                "user_message_ids": user_ids,
                "last_user_message_id": (user_ids[-1] if user_ids else None),
                "quote_debug": quote_debug,
                "routing_debug": routing_debug,
                "reply_events": events,
                "astrbot_logs_tail": await _get_astrbot_log_tail(client),
            }
    
        for ev in events:
            if ev.get("type") == "user_message_saved":
                data = ev.get("data") or {}
                saved_id = data.get("id")
                if saved_id is not None:
                    user_message_ids.append(str(saved_id))
                    async with _LAST_USER_MESSAGE_ID_LOCK:
                        _LAST_USER_MESSAGE_ID_BY_SESSION[_last_saved_key(client, used_session_id)] = str(
                            saved_id
                        )
            if ev.get("type") == "message_saved":
                data = ev.get("data") or {}
                saved_id = data.get("id")
                if saved_id is not None:
                    saved_message_ids.append(str(saved_id))
            if ev.get("type") in ("plain", "complete"):
                data = ev.get("data")
                if isinstance(data, str):
                    reply_text_chunks.append(data)
    
        # Fallback: some AstrBot versions do not emit `user_message_saved`.
        # Try to infer the latest user message id by fetching /api/chat/get_session and scanning history.
        if not user_message_ids:
            match_hint = None
            for part in message_parts:
                if not isinstance(part, dict):
                    continue
                if part.get("type") == "plain":
                    txt = part.get("text")
                    if isinstance(txt, str) and txt.strip():
                        match_hint = txt.strip()
                        break
    
            try:
                sess = await client.get_platform_session(session_id=used_session_id)
                if sess.get("status") == "ok":
                    history = (sess.get("data") or {}).get("history") or []
                    if isinstance(history, list):
                        expected_sender = (client.settings.username or "").strip() or None
    
                        def is_recent_user_record(item: Dict[str, Any]) -> bool:
                            if not isinstance(item, dict):
                                return False
                            content = item.get("content") or {}
                            if not isinstance(content, dict) or content.get("type") != "user":
                                return False
                            if expected_sender and item.get("sender_name") != expected_sender:
                                return False
                            if match_hint:
                                extracted = _extract_plain_text_from_history_item(item)
                                if match_hint[:32] not in extracted:
                                    return False
                            created_at = item.get("created_at")
                            if isinstance(created_at, str) and created_at:
                                try:
                                    # e.g. 2025-12-18T21:47:07.684801+08:00
                                    dt = datetime.fromisoformat(created_at)
                                    if dt.tzinfo is None:
                                        dt = dt.replace(tzinfo=timezone.utc)
                                    # accept a small clock skew window
                                    return dt.astimezone(timezone.utc) >= send_started_at.replace(
                                        microsecond=0
                                    ) - timedelta(seconds=5)
                                except Exception:
                                    pass
                            return True
    
                        # Look from newest to oldest for a likely match.
                        for item in reversed(history):
                            if not isinstance(item, dict):
                                continue
                            if not is_recent_user_record(item):
                                continue
                            mid = item.get("id")
                            if mid is None:
                                continue
                            user_message_ids.append(str(mid))
                            async with _LAST_USER_MESSAGE_ID_LOCK:
                                _LAST_USER_MESSAGE_ID_BY_SESSION[_last_saved_key(client, used_session_id)] = str(
                                    mid
                                )
                            break
            except Exception as e:
                routing_debug["user_id_fallback_exception"] = str(e)
    
        last_saved_message_id: str | None = (
            saved_message_ids[-1] if saved_message_ids else None
        )
        last_user_message_id: str | None = (
            user_message_ids[-1] if user_message_ids else None
        )
        if last_saved_message_id:
            async with _LAST_SAVED_MESSAGE_ID_LOCK:
                _LAST_SAVED_MESSAGE_ID_BY_SESSION[_last_saved_key(client, used_session_id)] = (
                    last_saved_message_id
                )
    
        return {
            "status": "ok",
            "mode": mode,
            "platform_id": session_platform_id,
            "requested_platform_id": platform_id,
            "session_id": used_session_id,
            "conversation_id": used_session_id,
            "session_reused": session_reused,
            "selected_provider": effective_provider,
            "selected_model": effective_model,
            "request_message_parts": message_parts,
            "uploaded_attachments": uploaded_attachments,
            "reply_events": events,
            "reply_text": "".join(reply_text_chunks),
            "user_message_ids": user_message_ids,
            "last_user_message_id": last_user_message_id,
            "saved_message_ids": saved_message_ids,
            "last_saved_message_id": last_saved_message_id,
            "quote_debug": quote_debug,
            "routing_debug": routing_debug,
        }
  • Registers the send_platform_message handler with the FastMCP server under the tool name 'send_platform_message'.
    server.tool(astrbot_tools.send_platform_message, name="send_platform_message")
  • TypedDict schema defining the structure of individual parts in the message_chain parameter for the tool.
    class MessagePart(TypedDict, total=False):
        """
        A single message part for send_platform_message.
    
        Types:
          - plain:  {"type": "plain", "text": "..."}
          - reply:  {"type": "reply", "message_id": "..."}
          - quote:  {"type": "quote", "message_id": "..."}  (alias of reply)
          - reference: {"type": "reference", "message_id": "..."}  (alias of reply)
          - image:  {"type": "image", "file_path": "..."} or {"type": "image", "url": "https://..."}
          - file:   {"type": "file", "file_path": "..."}  or {"type": "file", "url": "https://..."}
          - record: {"type": "record", "file_path": "..."} or {"type": "record", "url": "https://..."}
          - video:  {"type": "video", "file_path": "..."} or {"type": "video", "url": "https://..."}
        """
    
        type: Literal[
            "plain",
            "reply",
            "quote",
            "reference",
            "image",
            "file",
            "record",
            "video",
        ]
        text: str
        message_id: str
        id: str
        file_path: str
        url: str
        file_name: str
        mime_type: str
  • The tool is listed in the astrbot://info resource, advertising it to MCP hosts.
    "send_platform_message",
    "send_platform_message_direct",

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xunxiing/astrbotmcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server