Skip to main content
Glama

get_platform_session_messages

Retrieve recent chat messages from AstrBot logs for debugging and monitoring, using log history as the reliable source when conversation data is unavailable.

Instructions

Get a platform target's recent messages from AstrBot logs.

This tool intentionally uses AstrBot's log broker history (/api/log-history) as the source of truth, since some AstrBot builds do not persist platform conversation history under /api/conversation/detail for group/user targets.

Args:

  • target_id: Platform target ID (e.g. group_id like "1030223077").

  • platform_id: Optional platform id (e.g. "napcat"). If omitted, use the first enabled platform.

  • message_type: "GroupMessage" or "FriendMessage" (default: "GroupMessage").

  • wait_seconds: If > 0, poll and return SSE-like events for up to this many seconds.

  • max_messages: Max number of history items to return (from the tail).

  • poll_interval_seconds: Poll interval when wait_seconds > 0.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
target_idYes
platform_idNo
message_typeNoGroupMessage
wait_secondsNo
max_messagesNo
poll_interval_secondsNo

Implementation Reference

  • The core async handler function implementing the tool logic: fetches recent platform session messages from AstrBot's log history (/api/log-history) with fallback parsing, optional live polling, and compaction/deduping.
    async def get_platform_session_messages(
        target_id: str,
        platform_id: Optional[str] = None,
        message_type: str = "GroupMessage",
        wait_seconds: int = 0,
        max_messages: int = 50,
        poll_interval_seconds: float = 1.0,
    ) -> Dict[str, Any]:
        """
        Get a platform target's recent messages from AstrBot logs.
    
        This tool intentionally uses AstrBot's log broker history (/api/log-history) as the
        source of truth, since some AstrBot builds do not persist platform conversation history
        under /api/conversation/detail for group/user targets.
    
        Args:
          - target_id: Platform target ID (e.g. group_id like "1030223077").
          - platform_id: Optional platform id (e.g. "napcat"). If omitted, use the first enabled platform.
          - message_type: "GroupMessage" or "FriendMessage" (default: "GroupMessage").
          - wait_seconds: If > 0, poll and return SSE-like events for up to this many seconds.
          - max_messages: Max number of history items to return (from the tail).
          - poll_interval_seconds: Poll interval when wait_seconds > 0.
        """
        client = AstrBotClient.from_env()
    
        if not target_id or not str(target_id).strip():
            return {"status": "error", "message": "Missing key: target_id"}
    
        if max_messages <= 0:
            return {"status": "error", "message": "max_messages must be > 0"}
        if max_messages > 5000:
            max_messages = 5000
    
        if wait_seconds < 0:
            wait_seconds = 0
        if poll_interval_seconds <= 0:
            poll_interval_seconds = 1.0
    
        resolved_platform_id = platform_id
        if not resolved_platform_id:
            try:
                plist = await client.get_platform_list()
            except Exception as e:
                return {
                    "status": "error",
                    "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                    "base_url": client.base_url,
                    "detail": _httpx_error_detail(e),
                    "hint": _astrbot_connect_hint(client),
                }
            if plist.get("status") != "ok":
                return {
                    "status": plist.get("status"),
                    "message": plist.get("message"),
                    "raw": plist,
                }
            platforms = (plist.get("data") or {}).get("platforms", [])
            enabled = [p for p in platforms if p.get("enable") is True]
            if not enabled:
                return {
                    "status": "error",
                    "message": "No enabled platforms found; pass platform_id explicitly.",
                    "platforms": platforms,
                }
            resolved_platform_id = str(enabled[0].get("id") or "").strip() or None
            if not resolved_platform_id:
                return {
                    "status": "error",
                    "message": "Failed to resolve platform_id from AstrBot platform list.",
                    "platforms": platforms,
                }
    
        # Primary: use /api/log-history
        history_source = "astrbot_log"
        log_fallback_used = True
        cid: str | None = None
        umo = f"{resolved_platform_id}:{message_type}:{target_id}"
    
        history, history_meta = await _fallback_history_from_logs(
            client,
            platform_id=resolved_platform_id,
            message_type=message_type,
            target_id=target_id,
            max_messages=max_messages,
        )
    
        log_fallback_error: Dict[str, Any] | None = None
        log_fallback_compaction: Dict[str, Any] | None = None
        if history_meta and history_meta.get("status") == "ok":
            log_fallback_compaction = (history_meta.get("compaction") or None)
        elif history_meta:
            log_fallback_error = history_meta
    
        # Optional: wait for new logs and return as delta
        live_error: Dict[str, Any] | None = None
        delta_items: List[Dict[str, Any]] = []
        if wait_seconds > 0:
            try:
                live_events = await client.get_live_logs(
                    wait_seconds=wait_seconds,
                    max_events=min(max_messages * 50, 2000),
                )
                # Reuse compaction by treating the live events list as a log tail.
                # Seed with existing keys to avoid duplicates.
                seed_seen: set[tuple] = set()
                seed_ltm: set[tuple] = set()
                seed_raw_sig_to_idx: Dict[tuple, int] = {}
                for it in history:
                    t = it.get("time")
                    s = it.get("sender")
                    c = (it.get("content") or "").strip()
                    seed_seen.add((umo, t, s, c))
                    if it.get("kind") == "ltm":
                        seed_ltm.add((umo, t, s))
                # Inline-compaction of live events (reverse scan to prefer ltm).
                scan_limit = len(live_events)
                kept: List[Dict[str, Any] | None] = []
                raw_sig_to_idx = dict(seed_raw_sig_to_idx)
                ltm_sig = set(seed_ltm)
                seen_keys = set(seed_seen)
                dropped_duplicates = 0
                dropped_raw_due_to_ltm = 0
    
                target_id_int: int | None = int(target_id) if str(target_id).isdigit() else None
                for entry in reversed(live_events):
                    if not _log_entry_matches(
                        entry,
                        platform_id=resolved_platform_id,
                        message_type=message_type,
                        target_id=target_id,
                        umo=umo,
                    ):
                        continue
    
                    text = _strip_ansi(_extract_log_text(entry))
                    entry_time = _extract_log_time(entry) or _extract_time_from_text(text)
                    level = _extract_log_level(entry)
    
                    kind = "log"
                    sender: str | None = None
                    content: str | None = None
                    message_id: int | None = None
                    user_id: int | None = None
                    group_id: int | None = None
    
                    ltm_parsed = _parse_ltm_line(text)
                    if ltm_parsed and ltm_parsed.get("umo") == umo:
                        kind = "ltm"
                        sender = ltm_parsed.get("sender")
                        content = ltm_parsed.get("content")
                        entry_time = entry_time or ltm_parsed.get("time")
                    else:
                        raw_parsed = _parse_aiocqhttp_rawmessage(text)
                        if (
                            raw_parsed
                            and target_id_int is not None
                            and raw_parsed.get("group_id") == target_id_int
                        ):
                            kind = "raw_message"
                            sender = raw_parsed.get("sender")
                            content = raw_parsed.get("content")
                            message_id = raw_parsed.get("message_id")
                            user_id = raw_parsed.get("user_id")
                            group_id = raw_parsed.get("group_id")
    
                    sig = (umo, entry_time, sender)
                    if kind == "ltm":
                        ltm_sig.add(sig)
                        raw_idx = raw_sig_to_idx.pop(sig, None)
                        if raw_idx is not None and 0 <= raw_idx < len(kept) and kept[raw_idx] is not None:
                            kept[raw_idx] = None
                            dropped_raw_due_to_ltm += 1
                    elif kind == "raw_message":
                        if sig in ltm_sig:
                            dropped_raw_due_to_ltm += 1
                            continue
    
                    content_norm = (content or "").strip()
                    key = (umo, entry_time, sender, content_norm)
                    if key in seen_keys:
                        dropped_duplicates += 1
                        continue
                    seen_keys.add(key)
    
                    item: Dict[str, Any] = {"kind": kind, "time": entry_time, "sender": sender, "content": content, "text": text, "raw": entry}
                    if level:
                        item["level"] = level
                    if message_id is not None:
                        item["message_id"] = message_id
                    if user_id is not None:
                        item["user_id"] = user_id
                    if group_id is not None:
                        item["group_id"] = group_id
    
                    if kind == "raw_message":
                        raw_sig_to_idx[sig] = len(kept)
                    kept.append(item)
    
                delta_items = [x for x in reversed(kept) if x is not None]
                if log_fallback_compaction is not None:
                    log_fallback_compaction = {
                        **log_fallback_compaction,
                        "live_scan_limit": scan_limit,
                        "live_dropped_duplicates": dropped_duplicates,
                        "live_dropped_raw_due_to_ltm": dropped_raw_due_to_ltm,
                    }
            except Exception as e:
                live_error = {
                    "status": "error",
                    "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                    "base_url": client.base_url,
                    "detail": _httpx_error_detail(e),
                    "hint": "Failed to fetch /api/live-log for wait_seconds mode.",
                }
    
        merged_history = (history + delta_items)[-max_messages:]
        levels = {it.get("level") for it in merged_history if it.get("level")}
        log_level: str | None = None
        if len(levels) == 1:
            log_level = next(iter(levels))
            for it in merged_history:
                it.pop("level", None)
            for it in history:
                it.pop("level", None)
            for it in delta_items:
                it.pop("level", None)
    
        sse_events: List[Dict[str, Any]] = [{"type": "snapshot", "data": history}]
        if delta_items:
            sse_events.append({"type": "delta", "data": delta_items})
    
        return {
            "status": "ok",
            "platform_id": resolved_platform_id,
            "message_type": message_type,
            "target_id": target_id,
            "umo": umo,
            "cid": cid,
            "wait_seconds": wait_seconds,
            "max_messages": max_messages,
            "poll_interval_seconds": poll_interval_seconds,
            "history_source": history_source,
            "log_fallback_used": log_fallback_used,
            "log_fallback_error": log_fallback_error,
            "log_fallback_compaction": log_fallback_compaction,
            "log_level": log_level,
            "live_log_error": live_error,
            "sse_events": sse_events,
            "history": merged_history,
        }
  • FastMCP server.tool() registration of the get_platform_session_messages handler from astrbot_tools module.
    server.tool(
        astrbot_tools.get_platform_session_messages,
        name="get_platform_session_messages",
    )
  • Import and re-export of the tool handler in the astrbot_tools namespace used by server.py.
    from .session_tools import get_platform_session_messages
  • Key helper function called by the handler to parse and extract relevant log entries for the target UMO.
    async def _fallback_history_from_logs(
        client: AstrBotClient,
        *,
        platform_id: str,
        message_type: str,
        target_id: str,
        max_messages: int,
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any] | None]:
        """
        Best-effort fallback: derive recent messages from AstrBot log broker history.
    
        This is a pragmatic workaround for cases where AstrBot does not persist platform
        conversations (conversation.history is empty) but logs still contain message events.
        """
        try:
            log_resp = await client.get_log_history()
        except Exception as e:
            return [], {
                "status": "error",
                "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                "base_url": client.base_url,
                "detail": _httpx_error_detail(e),
                "hint": "Failed to fetch /api/log-history for log fallback.",
            }
    
        if log_resp.get("status") != "ok":
            return [], {
                "status": log_resp.get("status"),
                "message": log_resp.get("message"),
                "raw": log_resp,
            }
    
        logs = (log_resp.get("data") or {}).get("logs", [])
        if not isinstance(logs, list) or not logs:
            return [], None
    
        umo = f"{platform_id}:{message_type}:{target_id}"
        target_id_int: int | None = int(target_id) if str(target_id).isdigit() else None
        scan_limit = min(len(logs), max(max_messages * 50, max_messages))
        tail = logs[-scan_limit:]
    
        kept: List[Dict[str, Any] | None] = []
        seen_keys: set[tuple] = set()
        raw_sig_to_idx: Dict[tuple, int] = {}
        ltm_sig: set[tuple] = set()
        dropped_duplicates = 0
        dropped_raw_due_to_ltm = 0
    
        for entry in reversed(tail):
            if not _log_entry_matches(
                entry,
                platform_id=platform_id,
                message_type=message_type,
                target_id=target_id,
                umo=umo,
            ):
                continue
    
            text = _strip_ansi(_extract_log_text(entry))
            entry_time = _extract_log_time(entry) or _extract_time_from_text(text)
            level = _extract_log_level(entry)
    
            kind = "log"
            sender: str | None = None
            content: str | None = None
            message_id: int | None = None
            user_id: int | None = None
            group_id: int | None = None
    
            ltm_parsed = _parse_ltm_line(text)
            if ltm_parsed and ltm_parsed.get("umo") == umo:
                kind = "ltm"
                sender = ltm_parsed.get("sender")
                content = ltm_parsed.get("content")
                entry_time = entry_time or ltm_parsed.get("time")
            else:
                raw_parsed = _parse_aiocqhttp_rawmessage(text)
                if raw_parsed and target_id_int is not None and raw_parsed.get("group_id") == target_id_int:
                    kind = "raw_message"
                    sender = raw_parsed.get("sender")
                    content = raw_parsed.get("content")
                    message_id = raw_parsed.get("message_id")
                    user_id = raw_parsed.get("user_id")
                    group_id = raw_parsed.get("group_id")
    
            # Dedup heuristics:
            # - Prefer ltm over raw_message when same (umo,time,sender).
            sig = (umo, entry_time, sender)
            if kind == "ltm":
                ltm_sig.add(sig)
                raw_idx = raw_sig_to_idx.pop(sig, None)
                if raw_idx is not None and 0 <= raw_idx < len(kept) and kept[raw_idx] is not None:
                    kept[raw_idx] = None
                    dropped_raw_due_to_ltm += 1
            elif kind == "raw_message":
                if sig in ltm_sig:
                    dropped_raw_due_to_ltm += 1
                    continue
    
            content_norm = (content or "").strip()
            key = (umo, entry_time, sender, content_norm)
            if key in seen_keys:
                dropped_duplicates += 1
                continue
            seen_keys.add(key)
    
            item: Dict[str, Any] = {"kind": kind, "time": entry_time, "sender": sender, "content": content, "text": text, "raw": entry}
            if level:
                item["level"] = level
            if message_id is not None:
                item["message_id"] = message_id
            if user_id is not None:
                item["user_id"] = user_id
            if group_id is not None:
                item["group_id"] = group_id
    
            if kind == "raw_message":
                raw_sig_to_idx[sig] = len(kept)
    
            kept.append(item)
    
        matched = [x for x in reversed(kept) if x is not None]
        compacted = matched[-max_messages:]
        compaction = {
            "scan_limit": scan_limit,
            "dropped_duplicates": dropped_duplicates,
            "dropped_raw_due_to_ltm": dropped_raw_due_to_ltm,
        }
        return compacted, {"status": "ok", "compaction": compaction}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xunxiing/astrbotmcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server