# get_platform_session_messages
Retrieve recent chat messages from AstrBot logs for debugging and monitoring, using log history as the reliable source when conversation data is unavailable.
## Instructions
Get a platform target's recent messages from AstrBot logs.
This tool intentionally uses AstrBot's log broker history (/api/log-history) as the source of truth, since some AstrBot builds do not persist platform conversation history under /api/conversation/detail for group/user targets.
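For orientation, the sketch below shows roughly what fetching that endpoint looks like by hand with httpx. The environment variable names and the Bearer auth scheme are assumptions for illustration only; in the actual implementation this is wrapped by AstrBotClient.from_env() and AstrBotClient.get_log_history().

```python
# Hedged sketch only: a hand-rolled fetch of /api/log-history.
# ASTRBOT_BASE_URL / ASTRBOT_TOKEN and the Bearer scheme are assumptions;
# the real code goes through AstrBotClient.from_env() / get_log_history().
import os

import httpx


async def fetch_log_history() -> dict:
    base_url = os.environ["ASTRBOT_BASE_URL"]  # assumed variable name
    token = os.environ.get("ASTRBOT_TOKEN")    # assumed variable name
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    async with httpx.AsyncClient(base_url=base_url, headers=headers) as http:
        resp = await http.get("/api/log-history")
        resp.raise_for_status()
        return resp.json()
```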
Args:

- target_id: Platform target ID (e.g. a group_id like "1030223077").
- platform_id: Optional platform ID (e.g. "napcat"). If omitted, the first enabled platform is used.
- message_type: "GroupMessage" or "FriendMessage" (default: "GroupMessage").
- wait_seconds: If > 0, poll and return SSE-like events for up to this many seconds.
- max_messages: Maximum number of history items to return (from the tail).
- poll_interval_seconds: Poll interval in seconds when wait_seconds > 0.
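Internally, the first three identifiers are joined into AstrBot's unified message origin (umo) string. A minimal sketch of that composition, using the same format the handler below uses:

```python
# How the handler derives the session identifier (umo) from its inputs.
# Values are illustrative.
platform_id = "napcat"
message_type = "GroupMessage"
target_id = "1030223077"

umo = f"{platform_id}:{message_type}:{target_id}"
print(umo)  # napcat:GroupMessage:1030223077
```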
## Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| target_id | Yes | Platform target ID (e.g. a group_id like "1030223077"). | |
| platform_id | No | Platform ID (e.g. "napcat"). If omitted, the first enabled platform is used. | |
| message_type | No | "GroupMessage" or "FriendMessage". | GroupMessage |
| wait_seconds | No | If > 0, poll and return SSE-like events for up to this many seconds. | 0 |
| max_messages | No | Maximum number of history items to return (from the tail). | 50 |
| poll_interval_seconds | No | Poll interval in seconds when wait_seconds > 0. | 1.0 |
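As a usage sketch, the handler can also be awaited directly, since it is re-exported from astrbot_mcp.tools (see the registration entries below). This assumes the AstrBot connection settings that AstrBotClient.from_env() reads are present in the environment:

```python
# Hedged usage sketch: call the tool handler directly as a coroutine.
import asyncio

from astrbot_mcp.tools import get_platform_session_messages


async def main() -> None:
    # Snapshot the last 20 messages, then poll ~10s for new ones.
    result = await get_platform_session_messages(
        target_id="1030223077",
        platform_id="napcat",
        message_type="GroupMessage",
        wait_seconds=10,
        max_messages=20,
    )
    print(result.get("status"), result.get("umo"))
    for event in result.get("sse_events") or []:
        print(event["type"], len(event["data"]))


asyncio.run(main())
```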
## Implementation Reference
- The core async handler function implementing the tool logic: it fetches recent platform session messages from AstrBot's log history (/api/log-history) with fallback parsing, optional live polling, and compaction/deduping.

```python
async def get_platform_session_messages(
    target_id: str,
    platform_id: Optional[str] = None,
    message_type: str = "GroupMessage",
    wait_seconds: int = 0,
    max_messages: int = 50,
    poll_interval_seconds: float = 1.0,
) -> Dict[str, Any]:
    """
    Get a platform target's recent messages from AstrBot logs.

    This tool intentionally uses AstrBot's log broker history (/api/log-history)
    as the source of truth, since some AstrBot builds do not persist platform
    conversation history under /api/conversation/detail for group/user targets.

    Args:
    - target_id: Platform target ID (e.g. group_id like "1030223077").
    - platform_id: Optional platform id (e.g. "napcat"). If omitted, use the first enabled platform.
    - message_type: "GroupMessage" or "FriendMessage" (default: "GroupMessage").
    - wait_seconds: If > 0, poll and return SSE-like events for up to this many seconds.
    - max_messages: Max number of history items to return (from the tail).
    - poll_interval_seconds: Poll interval when wait_seconds > 0.
    """
    client = AstrBotClient.from_env()
    if not target_id or not str(target_id).strip():
        return {"status": "error", "message": "Missing key: target_id"}
    if max_messages <= 0:
        return {"status": "error", "message": "max_messages must be > 0"}
    if max_messages > 5000:
        max_messages = 5000
    if wait_seconds < 0:
        wait_seconds = 0
    if poll_interval_seconds <= 0:
        poll_interval_seconds = 1.0

    resolved_platform_id = platform_id
    if not resolved_platform_id:
        try:
            plist = await client.get_platform_list()
        except Exception as e:
            return {
                "status": "error",
                "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                "base_url": client.base_url,
                "detail": _httpx_error_detail(e),
                "hint": _astrbot_connect_hint(client),
            }
        if plist.get("status") != "ok":
            return {
                "status": plist.get("status"),
                "message": plist.get("message"),
                "raw": plist,
            }
        platforms = (plist.get("data") or {}).get("platforms", [])
        enabled = [p for p in platforms if p.get("enable") is True]
        if not enabled:
            return {
                "status": "error",
                "message": "No enabled platforms found; pass platform_id explicitly.",
                "platforms": platforms,
            }
        resolved_platform_id = str(enabled[0].get("id") or "").strip() or None
        if not resolved_platform_id:
            return {
                "status": "error",
                "message": "Failed to resolve platform_id from AstrBot platform list.",
                "platforms": platforms,
            }

    # Primary: use /api/log-history
    history_source = "astrbot_log"
    log_fallback_used = True
    cid: str | None = None
    umo = f"{resolved_platform_id}:{message_type}:{target_id}"
    history, history_meta = await _fallback_history_from_logs(
        client,
        platform_id=resolved_platform_id,
        message_type=message_type,
        target_id=target_id,
        max_messages=max_messages,
    )
    log_fallback_error: Dict[str, Any] | None = None
    log_fallback_compaction: Dict[str, Any] | None = None
    if history_meta and history_meta.get("status") == "ok":
        log_fallback_compaction = (history_meta.get("compaction") or None)
    elif history_meta:
        log_fallback_error = history_meta

    # Optional: wait for new logs and return as delta
    live_error: Dict[str, Any] | None = None
    delta_items: List[Dict[str, Any]] = []
    if wait_seconds > 0:
        try:
            live_events = await client.get_live_logs(
                wait_seconds=wait_seconds,
                max_events=min(max_messages * 50, 2000),
            )
            # Reuse compaction by treating the live events list as a log tail.
            # Seed with existing keys to avoid duplicates.
            seed_seen: set[tuple] = set()
            seed_ltm: set[tuple] = set()
            seed_raw_sig_to_idx: Dict[tuple, int] = {}
            for it in history:
                t = it.get("time")
                s = it.get("sender")
                c = (it.get("content") or "").strip()
                seed_seen.add((umo, t, s, c))
                if it.get("kind") == "ltm":
                    seed_ltm.add((umo, t, s))

            # Inline compaction of live events (reverse scan to prefer ltm).
            scan_limit = len(live_events)
            kept: List[Dict[str, Any] | None] = []
            raw_sig_to_idx = dict(seed_raw_sig_to_idx)
            ltm_sig = set(seed_ltm)
            seen_keys = set(seed_seen)
            dropped_duplicates = 0
            dropped_raw_due_to_ltm = 0
            target_id_int: int | None = int(target_id) if str(target_id).isdigit() else None
            for entry in reversed(live_events):
                if not _log_entry_matches(
                    entry,
                    platform_id=resolved_platform_id,
                    message_type=message_type,
                    target_id=target_id,
                    umo=umo,
                ):
                    continue
                text = _strip_ansi(_extract_log_text(entry))
                entry_time = _extract_log_time(entry) or _extract_time_from_text(text)
                level = _extract_log_level(entry)
                kind = "log"
                sender: str | None = None
                content: str | None = None
                message_id: int | None = None
                user_id: int | None = None
                group_id: int | None = None
                ltm_parsed = _parse_ltm_line(text)
                if ltm_parsed and ltm_parsed.get("umo") == umo:
                    kind = "ltm"
                    sender = ltm_parsed.get("sender")
                    content = ltm_parsed.get("content")
                    entry_time = entry_time or ltm_parsed.get("time")
                else:
                    raw_parsed = _parse_aiocqhttp_rawmessage(text)
                    if (
                        raw_parsed
                        and target_id_int is not None
                        and raw_parsed.get("group_id") == target_id_int
                    ):
                        kind = "raw_message"
                        sender = raw_parsed.get("sender")
                        content = raw_parsed.get("content")
                        message_id = raw_parsed.get("message_id")
                        user_id = raw_parsed.get("user_id")
                        group_id = raw_parsed.get("group_id")
                sig = (umo, entry_time, sender)
                if kind == "ltm":
                    ltm_sig.add(sig)
                    raw_idx = raw_sig_to_idx.pop(sig, None)
                    if raw_idx is not None and 0 <= raw_idx < len(kept) and kept[raw_idx] is not None:
                        kept[raw_idx] = None
                        dropped_raw_due_to_ltm += 1
                elif kind == "raw_message":
                    if sig in ltm_sig:
                        dropped_raw_due_to_ltm += 1
                        continue
                content_norm = (content or "").strip()
                key = (umo, entry_time, sender, content_norm)
                if key in seen_keys:
                    dropped_duplicates += 1
                    continue
                seen_keys.add(key)
                item: Dict[str, Any] = {
                    "kind": kind,
                    "time": entry_time,
                    "sender": sender,
                    "content": content,
                    "text": text,
                    "raw": entry,
                }
                if level:
                    item["level"] = level
                if message_id is not None:
                    item["message_id"] = message_id
                if user_id is not None:
                    item["user_id"] = user_id
                if group_id is not None:
                    item["group_id"] = group_id
                if kind == "raw_message":
                    raw_sig_to_idx[sig] = len(kept)
                kept.append(item)
            delta_items = [x for x in reversed(kept) if x is not None]
            if log_fallback_compaction is not None:
                log_fallback_compaction = {
                    **log_fallback_compaction,
                    "live_scan_limit": scan_limit,
                    "live_dropped_duplicates": dropped_duplicates,
                    "live_dropped_raw_due_to_ltm": dropped_raw_due_to_ltm,
                }
        except Exception as e:
            live_error = {
                "status": "error",
                "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
                "base_url": client.base_url,
                "detail": _httpx_error_detail(e),
                "hint": "Failed to fetch /api/live-log for wait_seconds mode.",
            }

    merged_history = (history + delta_items)[-max_messages:]
    levels = {it.get("level") for it in merged_history if it.get("level")}
    log_level: str | None = None
    if len(levels) == 1:
        log_level = next(iter(levels))
    for it in merged_history:
        it.pop("level", None)
    for it in history:
        it.pop("level", None)
    for it in delta_items:
        it.pop("level", None)

    sse_events: List[Dict[str, Any]] = [{"type": "snapshot", "data": history}]
    if delta_items:
        sse_events.append({"type": "delta", "data": delta_items})
    return {
        "status": "ok",
        "platform_id": resolved_platform_id,
        "message_type": message_type,
        "target_id": target_id,
        "umo": umo,
        "cid": cid,
        "wait_seconds": wait_seconds,
        "max_messages": max_messages,
        "poll_interval_seconds": poll_interval_seconds,
        "history_source": history_source,
        "log_fallback_used": log_fallback_used,
        "log_fallback_error": log_fallback_error,
        "log_fallback_compaction": log_fallback_compaction,
        "log_level": log_level,
        "live_log_error": live_error,
        "sse_events": sse_events,
        "history": merged_history,
    }
```
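For reference, a successful response resembles the sketch below; the values are invented for illustration, and the full field list is the return statement above. The snapshot/delta split in sse_events mirrors SSE-style streaming: a caller can render the initial history once and then append only the new items.

```python
# Illustrative shape of a successful response (values invented for the example).
example_response = {
    "status": "ok",
    "umo": "napcat:GroupMessage:1030223077",
    "history_source": "astrbot_log",
    "sse_events": [
        {"type": "snapshot", "data": ["<history items>"]},  # always present
        {"type": "delta", "data": ["<new items>"]},  # only when wait_seconds > 0 found new logs
    ],
    "history": [
        {
            "kind": "ltm",  # "ltm", "raw_message", or "log"
            "time": "2024-05-01 12:34:56",
            "sender": "Alice",
            "content": "hello",
            "text": "<original log line>",
            "raw": "<log entry as received>",
        },
    ],
}
```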
- astrbot_mcp/server.py:29-32 (registration): FastMCP server.tool() registration of the get_platform_session_messages handler from the astrbot_tools module.

```python
server.tool(
    astrbot_tools.get_platform_session_messages,
    name="get_platform_session_messages",
)
```
- astrbot_mcp/tools/__init__.py:26 (registration): Import and re-export of the tool handler in the astrbot_tools namespace used by server.py.

```python
from .session_tools import get_platform_session_messages
```
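Putting the two registration points together, a minimal server setup might look like the sketch below. Only the server.tool(...) call is taken from the source (astrbot_mcp/server.py:29-32); the FastMCP import path, server name, and run() invocation are assumptions:

```python
# Hedged wiring sketch; only the server.tool(...) call mirrors the source.
from fastmcp import FastMCP  # assumed import path

from astrbot_mcp import tools as astrbot_tools

server = FastMCP("astrbot-mcp")  # assumed server name

server.tool(
    astrbot_tools.get_platform_session_messages,
    name="get_platform_session_messages",
)

if __name__ == "__main__":
    server.run()  # assumed entry point; FastMCP defaults to stdio transport
```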
- Key helper function called by the handler to parse and extract relevant log entries for the target UMO.

```python
async def _fallback_history_from_logs(
    client: AstrBotClient,
    *,
    platform_id: str,
    message_type: str,
    target_id: str,
    max_messages: int,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any] | None]:
    """
    Best-effort fallback: derive recent messages from AstrBot log broker history.

    This is a pragmatic workaround for cases where AstrBot does not persist
    platform conversations (conversation.history is empty) but logs still
    contain message events.
    """
    try:
        log_resp = await client.get_log_history()
    except Exception as e:
        return [], {
            "status": "error",
            "message": f"AstrBot API error: {e.response.status_code if hasattr(e, 'response') else 'Unknown'}",
            "base_url": client.base_url,
            "detail": _httpx_error_detail(e),
            "hint": "Failed to fetch /api/log-history for log fallback.",
        }
    if log_resp.get("status") != "ok":
        return [], {
            "status": log_resp.get("status"),
            "message": log_resp.get("message"),
            "raw": log_resp,
        }
    logs = (log_resp.get("data") or {}).get("logs", [])
    if not isinstance(logs, list) or not logs:
        return [], None

    umo = f"{platform_id}:{message_type}:{target_id}"
    target_id_int: int | None = int(target_id) if str(target_id).isdigit() else None
    scan_limit = min(len(logs), max(max_messages * 50, max_messages))
    tail = logs[-scan_limit:]
    kept: List[Dict[str, Any] | None] = []
    seen_keys: set[tuple] = set()
    raw_sig_to_idx: Dict[tuple, int] = {}
    ltm_sig: set[tuple] = set()
    dropped_duplicates = 0
    dropped_raw_due_to_ltm = 0
    for entry in reversed(tail):
        if not _log_entry_matches(
            entry,
            platform_id=platform_id,
            message_type=message_type,
            target_id=target_id,
            umo=umo,
        ):
            continue
        text = _strip_ansi(_extract_log_text(entry))
        entry_time = _extract_log_time(entry) or _extract_time_from_text(text)
        level = _extract_log_level(entry)
        kind = "log"
        sender: str | None = None
        content: str | None = None
        message_id: int | None = None
        user_id: int | None = None
        group_id: int | None = None
        ltm_parsed = _parse_ltm_line(text)
        if ltm_parsed and ltm_parsed.get("umo") == umo:
            kind = "ltm"
            sender = ltm_parsed.get("sender")
            content = ltm_parsed.get("content")
            entry_time = entry_time or ltm_parsed.get("time")
        else:
            raw_parsed = _parse_aiocqhttp_rawmessage(text)
            if raw_parsed and target_id_int is not None and raw_parsed.get("group_id") == target_id_int:
                kind = "raw_message"
                sender = raw_parsed.get("sender")
                content = raw_parsed.get("content")
                message_id = raw_parsed.get("message_id")
                user_id = raw_parsed.get("user_id")
                group_id = raw_parsed.get("group_id")
        # Dedup heuristics:
        # - Prefer ltm over raw_message when same (umo, time, sender).
        sig = (umo, entry_time, sender)
        if kind == "ltm":
            ltm_sig.add(sig)
            raw_idx = raw_sig_to_idx.pop(sig, None)
            if raw_idx is not None and 0 <= raw_idx < len(kept) and kept[raw_idx] is not None:
                kept[raw_idx] = None
                dropped_raw_due_to_ltm += 1
        elif kind == "raw_message":
            if sig in ltm_sig:
                dropped_raw_due_to_ltm += 1
                continue
        content_norm = (content or "").strip()
        key = (umo, entry_time, sender, content_norm)
        if key in seen_keys:
            dropped_duplicates += 1
            continue
        seen_keys.add(key)
        item: Dict[str, Any] = {
            "kind": kind,
            "time": entry_time,
            "sender": sender,
            "content": content,
            "text": text,
            "raw": entry,
        }
        if level:
            item["level"] = level
        if message_id is not None:
            item["message_id"] = message_id
        if user_id is not None:
            item["user_id"] = user_id
        if group_id is not None:
            item["group_id"] = group_id
        if kind == "raw_message":
            raw_sig_to_idx[sig] = len(kept)
        kept.append(item)

    matched = [x for x in reversed(kept) if x is not None]
    compacted = matched[-max_messages:]
    compaction = {
        "scan_limit": scan_limit,
        "dropped_duplicates": dropped_duplicates,
        "dropped_raw_due_to_ltm": dropped_raw_due_to_ltm,
    }
    return compacted, {"status": "ok", "compaction": compaction}
```
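The dedup heuristic is easiest to see on toy data. Below is a self-contained, simplified re-implementation (items arrive pre-classified instead of being parsed out of log lines) showing why the reverse scan lets an ltm entry displace a raw_message with the same (umo, time, sender) signature:

```python
# Simplified sketch of the compaction heuristic above, on pre-parsed items.
from typing import Any, Dict, List, Optional


def compact(items: List[Dict[str, Any]], umo: str) -> List[Dict[str, Any]]:
    kept: List[Optional[Dict[str, Any]]] = []
    raw_sig_to_idx: Dict[tuple, int] = {}
    ltm_sig: set[tuple] = set()
    seen_keys: set[tuple] = set()
    for it in reversed(items):  # newest first, so ltm entries win over raw ones
        sig = (umo, it["time"], it["sender"])
        if it["kind"] == "ltm":
            ltm_sig.add(sig)
            idx = raw_sig_to_idx.pop(sig, None)
            if idx is not None:
                kept[idx] = None  # drop the raw duplicate we already kept
        elif it["kind"] == "raw_message" and sig in ltm_sig:
            continue  # an ltm entry for this message was already kept
        key = sig + ((it["content"] or "").strip(),)
        if key in seen_keys:
            continue  # exact duplicate line
        seen_keys.add(key)
        if it["kind"] == "raw_message":
            raw_sig_to_idx[sig] = len(kept)
        kept.append(it)
    return [x for x in reversed(kept) if x is not None]


items = [
    {"kind": "raw_message", "time": "12:00", "sender": "alice", "content": "hi"},
    {"kind": "ltm", "time": "12:00", "sender": "alice", "content": "hi"},
]
print(compact(items, "napcat:GroupMessage:1030223077"))
# -> only the "ltm" item survives; the raw_message shares its signature.
```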