get_platform_session_messages

Retrieve recent chat messages from AstrBot logs for debugging and monitoring, using log history as the reliable source when conversation data is unavailable.

Instructions

Get a platform target's recent messages from AstrBot logs.

This tool intentionally uses AstrBot's log broker history (/api/log-history) as the source of truth, since some AstrBot builds do not persist platform conversation history under /api/conversation/detail for group/user targets.

Args:

  • target_id: Platform target ID (e.g. group_id like "1030223077").

  • platform_id: Optional platform id (e.g. "napcat"). If omitted, use the first enabled platform.

  • message_type: "GroupMessage" or "FriendMessage" (default: "GroupMessage").

  • wait_seconds: If > 0, poll and return SSE-like events for up to this many seconds.

  • max_messages: Max number of history items to return (from the tail).

  • poll_interval_seconds: Poll interval when wait_seconds > 0.
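
The bounds on these arguments are easier to see in code. The following is a minimal sketch of the checks that the handler in the Implementation Reference applies, plus the `platform_id:message_type:target_id` key (the "UMO") it uses to match log lines. The function names `normalize_args` and `build_umo` are hypothetical, and raising `ValueError` is this sketch's choice; the handler itself returns an error dictionary instead.

```python
from typing import Tuple


def normalize_args(
    max_messages: int = 50,
    wait_seconds: int = 0,
    poll_interval_seconds: float = 1.0,
) -> Tuple[int, int, float]:
    """Apply the same clamping rules the handler applies (hypothetical helper)."""
    if max_messages <= 0:
        # The handler returns {"status": "error", ...} here; the sketch raises instead.
        raise ValueError("max_messages must be > 0")
    max_messages = min(max_messages, 5000)  # values above 5000 are capped
    wait_seconds = max(wait_seconds, 0)  # negative values become 0 (no waiting)
    if poll_interval_seconds <= 0:
        poll_interval_seconds = 1.0  # non-positive values fall back to 1.0
    return max_messages, wait_seconds, poll_interval_seconds


def build_umo(platform_id: str, message_type: str, target_id: str) -> str:
    """Compose the key used to match log entries for one target (hypothetical helper)."""
    return f"{platform_id}:{message_type}:{target_id}"
```

For example, `normalize_args(10000, -5, 0)` yields `(5000, 0, 1.0)`, and `build_umo("napcat", "GroupMessage", "1030223077")` yields `"napcat:GroupMessage:1030223077"`.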

Input Schema

Name                    Required   Description   Default
target_id               Yes        -             -
platform_id             No         -             -
message_type            No         -             GroupMessage
wait_seconds            No         -             -
max_messages            No         -             -
poll_interval_seconds   No         -             -

Output Schema

No arguments
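
No output fields are listed here, but the handler in the Implementation Reference returns a dictionary with keys such as `status`, `history`, and `sse_events`, where each event has a `type` of `"snapshot"` or `"delta"` and a `data` list. The sketch below shows one way a caller might group those events. The function name `split_events` and the sample payload are hypothetical and only illustrate the shape; they are not real AstrBot output.

```python
from typing import Any, Dict, List


def split_events(result: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
    """Group the items of each SSE-like event by its "type" field."""
    if result.get("status") != "ok":
        raise RuntimeError(result.get("message") or "tool call failed")
    grouped: Dict[str, List[Dict[str, Any]]] = {"snapshot": [], "delta": []}
    for event in result.get("sse_events", []):
        grouped.setdefault(event["type"], []).extend(event.get("data") or [])
    return grouped


# Hypothetical payload shaped like the handler's return value.
sample = {
    "status": "ok",
    "sse_events": [
        {"type": "snapshot", "data": [{"kind": "ltm", "sender": "alice", "content": "hi"}]},
        {"type": "delta", "data": [{"kind": "log", "sender": None, "content": None}]},
    ],
}
grouped = split_events(sample)
```

A delta event is only present when `wait_seconds > 0` and new matching log entries arrived, so callers should treat an empty `"delta"` list as normal.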

Implementation Reference

  • The core async handler function implementing the tool logic: fetches recent platform session messages from AstrBot's log history (/api/log-history) with fallback parsing, optional live polling, and compaction/deduping.
    async def get_platform_session_messages(
        target_id: str,
        platform_id: Optional[str] = None,
        message_type: str = "GroupMessage",
        wait_seconds: int = 0,
        max_messages: int = 50,
        poll_interval_seconds: float = 1.0,
    ) -> Dict[str, Any]:
        """
        Get a platform target's recent messages from AstrBot logs.
    
        This tool intentionally uses AstrBot's log broker history (/api/log-history) as the
        source of truth, since some AstrBot builds do not persist platform conversation history
        under /api/conversation/detail for group/user targets.
    
        Args:
          - target_id: Platform target ID (e.g. group_id like "1030223077").
          - platform_id: Optional platform id (e.g. "napcat"). If omitted, use the first enabled platform.
          - message_type: "GroupMessage" or "FriendMessage" (default: "GroupMessage").
          - wait_seconds: If > 0, poll and return SSE-like events for up to this many seconds.
          - max_messages: Max number of history items to return (from the tail).
          - poll_interval_seconds: Poll interval when wait_seconds > 0.
        """
        client = AstrBotClient.from_env()
    
        if not target_id or not str(target_id).strip():
            return {"status": "error", "message": "Missing key: target_id"}
    
        if max_messages <= 0:
            return {"status": "error", "message": "max_messages must be > 0"}
        if max_messages > 5000:
            max_messages = 5000
    
        if wait_seconds < 0:
            wait_seconds = 0
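        if poll_interval_seconds <= 0:
            poll_interval_seconds = 1.0
        # Note: poll_interval_seconds is only validated here and echoed in the response;
        # this handler does not pass it to client.get_live_logs().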
    
        resolved_platform_id = platform_id
        if not resolved_platform_id:
            try:
                plist = await client.get_platform_list()
            except Exception as e:
                return {
                    "status": "error",
                    "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                    "base_url": client.base_url,
                    "detail": _httpx_error_detail(e),
                    "hint": _astrbot_connect_hint(client),
                }
            if plist.get("status") != "ok":
                return {
                    "status": plist.get("status"),
                    "message": plist.get("message"),
                    "raw": plist,
                }
            platforms = (plist.get("data") or {}).get("platforms", [])
            enabled = [p for p in platforms if p.get("enable") is True]
            if not enabled:
                return {
                    "status": "error",
                    "message": "No enabled platforms found; pass platform_id explicitly.",
                    "platforms": platforms,
                }
            resolved_platform_id = str(enabled[0].get("id") or "").strip() or None
            if not resolved_platform_id:
                return {
                    "status": "error",
                    "message": "Failed to resolve platform_id from AstrBot platform list.",
                    "platforms": platforms,
                }
    
        # Primary: use /api/log-history
        history_source = "astrbot_log"
        log_fallback_used = True
        cid: str | None = None
        umo = f"{resolved_platform_id}:{message_type}:{target_id}"
    
        history, history_meta = await _fallback_history_from_logs(
            client,
            platform_id=resolved_platform_id,
            message_type=message_type,
            target_id=target_id,
            max_messages=max_messages,
        )
    
        log_fallback_error: Dict[str, Any] | None = None
        log_fallback_compaction: Dict[str, Any] | None = None
        if history_meta and history_meta.get("status") == "ok":
            log_fallback_compaction = (history_meta.get("compaction") or None)
        elif history_meta:
            log_fallback_error = history_meta
    
        # Optional: wait for new logs and return as delta
        live_error: Dict[str, Any] | None = None
        delta_items: List[Dict[str, Any]] = []
        if wait_seconds > 0:
            try:
                live_events = await client.get_live_logs(
                    wait_seconds=wait_seconds,
                    max_events=min(max_messages * 50, 2000),
                )
                # Reuse compaction by treating the live events list as a log tail.
                # Seed with existing keys to avoid duplicates.
                seed_seen: set[tuple] = set()
                seed_ltm: set[tuple] = set()
                seed_raw_sig_to_idx: Dict[tuple, int] = {}
                for it in history:
                    t = it.get("time")
                    s = it.get("sender")
                    c = (it.get("content") or "").strip()
                    seed_seen.add((umo, t, s, c))
                    if it.get("kind") == "ltm":
                        seed_ltm.add((umo, t, s))
                # Inline-compaction of live events (reverse scan to prefer ltm).
                scan_limit = len(live_events)
                kept: List[Dict[str, Any] | None] = []
                raw_sig_to_idx = dict(seed_raw_sig_to_idx)
                ltm_sig = set(seed_ltm)
                seen_keys = set(seed_seen)
                dropped_duplicates = 0
                dropped_raw_due_to_ltm = 0
    
                target_id_int: int | None = int(target_id) if str(target_id).isdigit() else None
                for entry in reversed(live_events):
                    if not _log_entry_matches(
                        entry,
                        platform_id=resolved_platform_id,
                        message_type=message_type,
                        target_id=target_id,
                        umo=umo,
                    ):
                        continue
    
                    text = _strip_ansi(_extract_log_text(entry))
                    entry_time = _extract_log_time(entry) or _extract_time_from_text(text)
                    level = _extract_log_level(entry)
    
                    kind = "log"
                    sender: str | None = None
                    content: str | None = None
                    message_id: int | None = None
                    user_id: int | None = None
                    group_id: int | None = None
    
                    ltm_parsed = _parse_ltm_line(text)
                    if ltm_parsed and ltm_parsed.get("umo") == umo:
                        kind = "ltm"
                        sender = ltm_parsed.get("sender")
                        content = ltm_parsed.get("content")
                        entry_time = entry_time or ltm_parsed.get("time")
                    else:
                        raw_parsed = _parse_aiocqhttp_rawmessage(text)
                        if (
                            raw_parsed
                            and target_id_int is not None
                            and raw_parsed.get("group_id") == target_id_int
                        ):
                            kind = "raw_message"
                            sender = raw_parsed.get("sender")
                            content = raw_parsed.get("content")
                            message_id = raw_parsed.get("message_id")
                            user_id = raw_parsed.get("user_id")
                            group_id = raw_parsed.get("group_id")
    
                    sig = (umo, entry_time, sender)
                    if kind == "ltm":
                        ltm_sig.add(sig)
                        raw_idx = raw_sig_to_idx.pop(sig, None)
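                        if raw_idx is not None and 0 <= raw_idx < len(kept) and kept[raw_idx] is not None:
                            # Release the superseded raw item's dedup key; otherwise an ltm entry
                            # with identical content would be dropped below as a duplicate.
                            superseded = kept[raw_idx]
                            seen_keys.discard((umo, entry_time, sender, (superseded.get("content") or "").strip()))
                            kept[raw_idx] = None
                            dropped_raw_due_to_ltm += 1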
                    elif kind == "raw_message":
                        if sig in ltm_sig:
                            dropped_raw_due_to_ltm += 1
                            continue
    
                    content_norm = (content or "").strip()
                    key = (umo, entry_time, sender, content_norm)
                    if key in seen_keys:
                        dropped_duplicates += 1
                        continue
                    seen_keys.add(key)
    
                    item: Dict[str, Any] = {"kind": kind, "time": entry_time, "sender": sender, "content": content, "text": text, "raw": entry}
                    if level:
                        item["level"] = level
                    if message_id is not None:
                        item["message_id"] = message_id
                    if user_id is not None:
                        item["user_id"] = user_id
                    if group_id is not None:
                        item["group_id"] = group_id
    
                    if kind == "raw_message":
                        raw_sig_to_idx[sig] = len(kept)
                    kept.append(item)
    
                delta_items = [x for x in reversed(kept) if x is not None]
                if log_fallback_compaction is not None:
                    log_fallback_compaction = {
                        **log_fallback_compaction,
                        "live_scan_limit": scan_limit,
                        "live_dropped_duplicates": dropped_duplicates,
                        "live_dropped_raw_due_to_ltm": dropped_raw_due_to_ltm,
                    }
            except Exception as e:
                live_error = {
                    "status": "error",
                    "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                    "base_url": client.base_url,
                    "detail": _httpx_error_detail(e),
                    "hint": "Failed to fetch /api/live-log for wait_seconds mode.",
                }
    
        merged_history = (history + delta_items)[-max_messages:]
        levels = {it.get("level") for it in merged_history if it.get("level")}
        log_level: str | None = None
        if len(levels) == 1:
            log_level = next(iter(levels))
            for it in merged_history:
                it.pop("level", None)
            for it in history:
                it.pop("level", None)
            for it in delta_items:
                it.pop("level", None)
    
        sse_events: List[Dict[str, Any]] = [{"type": "snapshot", "data": history}]
        if delta_items:
            sse_events.append({"type": "delta", "data": delta_items})
    
        return {
            "status": "ok",
            "platform_id": resolved_platform_id,
            "message_type": message_type,
            "target_id": target_id,
            "umo": umo,
            "cid": cid,
            "wait_seconds": wait_seconds,
            "max_messages": max_messages,
            "poll_interval_seconds": poll_interval_seconds,
            "history_source": history_source,
            "log_fallback_used": log_fallback_used,
            "log_fallback_error": log_fallback_error,
            "log_fallback_compaction": log_fallback_compaction,
            "log_level": log_level,
            "live_log_error": live_error,
            "sse_events": sse_events,
            "history": merged_history,
        }
  • FastMCP server.tool() registration of the get_platform_session_messages handler from astrbot_tools module.
    server.tool(
        astrbot_tools.get_platform_session_messages,
        name="get_platform_session_messages",
    )
  • Import and re-export of the tool handler in the astrbot_tools namespace used by server.py.
    from .session_tools import get_platform_session_messages
  • Key helper function called by the handler to parse and extract relevant log entries for the target UMO.
    async def _fallback_history_from_logs(
        client: AstrBotClient,
        *,
        platform_id: str,
        message_type: str,
        target_id: str,
        max_messages: int,
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any] | None]:
        """
        Best-effort fallback: derive recent messages from AstrBot log broker history.
    
        This is a pragmatic workaround for cases where AstrBot does not persist platform
        conversations (conversation.history is empty) but logs still contain message events.
        """
        try:
            log_resp = await client.get_log_history()
        except Exception as e:
            return [], {
                "status": "error",
                "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                "base_url": client.base_url,
                "detail": _httpx_error_detail(e),
                "hint": "Failed to fetch /api/log-history for log fallback.",
            }
    
        if log_resp.get("status") != "ok":
            return [], {
                "status": log_resp.get("status"),
                "message": log_resp.get("message"),
                "raw": log_resp,
            }
    
        logs = (log_resp.get("data") or {}).get("logs", [])
        if not isinstance(logs, list) or not logs:
            return [], None
    
        umo = f"{platform_id}:{message_type}:{target_id}"
        target_id_int: int | None = int(target_id) if str(target_id).isdigit() else None
        scan_limit = min(len(logs), max(max_messages * 50, max_messages))
        tail = logs[-scan_limit:]
    
        kept: List[Dict[str, Any] | None] = []
        seen_keys: set[tuple] = set()
        raw_sig_to_idx: Dict[tuple, int] = {}
        ltm_sig: set[tuple] = set()
        dropped_duplicates = 0
        dropped_raw_due_to_ltm = 0
    
        for entry in reversed(tail):
            if not _log_entry_matches(
                entry,
                platform_id=platform_id,
                message_type=message_type,
                target_id=target_id,
                umo=umo,
            ):
                continue
    
            text = _strip_ansi(_extract_log_text(entry))
            entry_time = _extract_log_time(entry) or _extract_time_from_text(text)
            level = _extract_log_level(entry)
    
            kind = "log"
            sender: str | None = None
            content: str | None = None
            message_id: int | None = None
            user_id: int | None = None
            group_id: int | None = None
    
            ltm_parsed = _parse_ltm_line(text)
            if ltm_parsed and ltm_parsed.get("umo") == umo:
                kind = "ltm"
                sender = ltm_parsed.get("sender")
                content = ltm_parsed.get("content")
                entry_time = entry_time or ltm_parsed.get("time")
            else:
                raw_parsed = _parse_aiocqhttp_rawmessage(text)
                if raw_parsed and target_id_int is not None and raw_parsed.get("group_id") == target_id_int:
                    kind = "raw_message"
                    sender = raw_parsed.get("sender")
                    content = raw_parsed.get("content")
                    message_id = raw_parsed.get("message_id")
                    user_id = raw_parsed.get("user_id")
                    group_id = raw_parsed.get("group_id")
    
            # Dedup heuristics:
            # - Prefer ltm over raw_message when same (umo,time,sender).
            sig = (umo, entry_time, sender)
            if kind == "ltm":
                ltm_sig.add(sig)
                raw_idx = raw_sig_to_idx.pop(sig, None)
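                if raw_idx is not None and 0 <= raw_idx < len(kept) and kept[raw_idx] is not None:
                    # Release the superseded raw item's dedup key; otherwise an ltm entry
                    # with identical content would be dropped below as a duplicate.
                    superseded = kept[raw_idx]
                    seen_keys.discard((umo, entry_time, sender, (superseded.get("content") or "").strip()))
                    kept[raw_idx] = None
                    dropped_raw_due_to_ltm += 1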
            elif kind == "raw_message":
                if sig in ltm_sig:
                    dropped_raw_due_to_ltm += 1
                    continue
    
            content_norm = (content or "").strip()
            key = (umo, entry_time, sender, content_norm)
            if key in seen_keys:
                dropped_duplicates += 1
                continue
            seen_keys.add(key)
    
            item: Dict[str, Any] = {"kind": kind, "time": entry_time, "sender": sender, "content": content, "text": text, "raw": entry}
            if level:
                item["level"] = level
            if message_id is not None:
                item["message_id"] = message_id
            if user_id is not None:
                item["user_id"] = user_id
            if group_id is not None:
                item["group_id"] = group_id
    
            if kind == "raw_message":
                raw_sig_to_idx[sig] = len(kept)
    
            kept.append(item)
    
        matched = [x for x in reversed(kept) if x is not None]
        compacted = matched[-max_messages:]
        compaction = {
            "scan_limit": scan_limit,
            "dropped_duplicates": dropped_duplicates,
            "dropped_raw_due_to_ltm": dropped_raw_due_to_ltm,
        }
        return compacted, {"status": "ok", "compaction": compaction}
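
The dedup heuristic in the helper can be hard to follow inside the full parsing loop. Below is a minimal, standalone sketch of the same idea applied to already-parsed items: scan newest first, drop exact duplicates, and prefer an `ltm` item over a `raw_message` item that shares the same time and sender. The names `compact` and `_key` are hypothetical, and the sketch leaves out the UMO component of the keys because it handles a single target. It also releases the raw item's duplicate key when an `ltm` entry replaces it, so the `ltm` entry survives even if its content is identical.

```python
from typing import Any, Dict, List, Optional, Set, Tuple


def _key(item: Dict[str, Any]) -> Tuple:
    """Exact-duplicate key: time, sender, and whitespace-trimmed content."""
    return (item.get("time"), item.get("sender"), (item.get("content") or "").strip())


def compact(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop duplicates and prefer "ltm" over "raw_message" per (time, sender)."""
    kept: List[Optional[Dict[str, Any]]] = []
    seen: Set[Tuple] = set()
    ltm_sigs: Set[Tuple] = set()
    raw_idx_by_sig: Dict[Tuple, int] = {}

    for item in reversed(items):  # newest first
        kind = item.get("kind")
        sig = (item.get("time"), item.get("sender"))
        if kind == "ltm":
            ltm_sigs.add(sig)
            idx = raw_idx_by_sig.pop(sig, None)
            if idx is not None and kept[idx] is not None:
                seen.discard(_key(kept[idx]))  # let the ltm entry take its place
                kept[idx] = None
        elif kind == "raw_message" and sig in ltm_sigs:
            continue  # an ltm entry for this message was already kept
        key = _key(item)
        if key in seen:
            continue  # exact duplicate
        seen.add(key)
        if kind == "raw_message":
            raw_idx_by_sig[sig] = len(kept)
        kept.append(item)

    # Restore chronological order and skip superseded slots.
    return [x for x in reversed(kept) if x is not None]
```

Given an `ltm` and a `raw_message` item for the same time and sender, in either order, only the `ltm` item remains in the result.
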
Behavior: 4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries full burden and does well by explaining the data source rationale, polling behavior when wait_seconds > 0, and that it returns 'history items from the tail.' It doesn't mention rate limits, authentication needs, or error conditions, but provides substantial behavioral context beyond basic functionality.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is efficiently structured with a clear purpose statement, usage rationale, and well-organized parameter documentation. Every sentence earns its place, with no redundant information. The parameter explanations are front-loaded with the most critical information first.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 5/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (6 parameters, polling behavior, multiple data sources) and the presence of an output schema (which handles return values), the description provides excellent context. It covers the why (source rationale), when (build-specific usage), how (polling behavior), and what (parameter semantics) without needing to duplicate output schema information.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 5/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

With 0% schema description coverage for 6 parameters, the description fully compensates by providing clear semantic explanations for all parameters: target_id examples, platform_id default behavior, message_type options and default, wait_seconds polling behavior, max_messages scope, and poll_interval_seconds usage context. This adds significant value beyond the bare schema.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the specific action ('Get'), resource ('platform target's recent messages'), and source ('from AstrBot logs'). It explicitly distinguishes this tool from alternatives by explaining it uses '/api/log-history' instead of '/api/conversation/detail' for certain builds, making it distinct from any potential sibling tools that might use different endpoints.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides explicit guidance on when to use this tool versus alternatives: 'This tool intentionally uses AstrBot's log broker history... since some AstrBot builds do not persist platform conversation history under /api/conversation/detail.' This clearly indicates this is the preferred tool for certain build configurations and implicitly suggests alternatives might exist for other scenarios.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xunxiing/astrbotmcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.