search_around
Retrieve log entries surrounding a specific timestamp in OpenObserve logs to analyze context and patterns.
Instructions
Fetch records around a specific log entry. key must be the target record's _timestamp value in microseconds. output_format can be 'records' or 'columns' for a more token-efficient table shape. record_profile can be 'generic' or 'kubernetes_compact'.
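Because key has to be the record's _timestamp in microseconds, a caller that starts from a wall-clock time needs to convert it first. The sketch below shows one way to do that with the standard library; the helper name `to_microseconds` is hypothetical and not part of the server.

```python
from datetime import datetime, timezone


def to_microseconds(moment: datetime) -> int:
    """Convert a timezone-aware datetime to integer microseconds since the Unix epoch.

    Hypothetical helper for illustration; it is not part of the MCP server.
    """
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    delta = moment - epoch
    # Integer arithmetic avoids the float rounding that timestamp() * 1e6 can introduce.
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


key = to_microseconds(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(key)  # 1704067200000000
```

In practice the most reliable key is the _timestamp copied from a record that an earlier search returned, since that is the exact value stored for the target entry.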
Input Schema
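| Name | Required | Description | Default |
|---|---|---|---|
| stream_name | Yes | | |
| key | Yes | Target record's `_timestamp` value in microseconds | |
| size | No | | 20 |
| regions | No | | |
| timeout | No | | |
| output_format | No | `records` or `columns` (a more token-efficient table shape) | records |
| record_profile | No | `generic` or `kubernetes_compact` | generic |
| include_raw | No | | False |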
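The exact payload that the server emits for output_format 'columns' is not shown on this page. As a rough illustration of why a column-oriented shape tends to be more token-efficient than a list of records, the sketch below lists field names once and stores each record as a row of values. The function `records_to_columns` and the `columns`/`rows` keys are assumptions made for this example, not the server's actual layout.

```python
from typing import Any


def records_to_columns(records: list[dict[str, Any]]) -> dict[str, list[Any]]:
    """Hypothetical helper: list field names once and store each record as a row."""
    columns: list[str] = []
    for record in records:
        for name in record:
            if name not in columns:
                columns.append(name)  # keep first-seen order of field names
    # Fields missing from a record are filled with None so every row has the same width.
    rows = [[record.get(name) for name in columns] for record in records]
    return {"columns": columns, "rows": rows}


sample = [
    {"_timestamp": 1, "message": "a"},
    {"_timestamp": 2, "message": "b", "level": "warn"},
]
table = records_to_columns(sample)
print(table["columns"])  # ['_timestamp', 'message', 'level']
print(table["rows"])     # [[1, 'a', None], [2, 'b', 'warn']]
```

The saving comes from not repeating every field name in every record, which matters more as size grows.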
Implementation Reference
- The core client method that performs the API request to fetch records around a specific log entry.

      def search_around(
          self,
          *,
          stream_name: str,
          key: int,
          size: int = 20,
          regions: str | None = None,
          timeout: int | None = None,
      ) -> Any:
          query: dict[str, str | int | float | bool] = {
              "key": key,
              "size": size,
          }
          if regions:
              query["regions"] = regions
          if timeout is not None:
              query["timeout"] = timeout
          return self.request_json(
              "GET",
              self._org_path("/api/{org_id}/{stream_name}/_around", stream_name=stream_name),
              query=query,
          )

- openobserve_mcp/server.py:123-146 (handler) The MCP tool registration and handler that interfaces with the client and formats the response.

      @server.tool()
      def search_around(
          stream_name: str,
          key: int,
          size: int = 20,
          regions: str | None = None,
          timeout: int | None = None,
          output_format: str = "records",
          record_profile: str = "generic",
          include_raw: bool = False,
      ) -> dict[str, Any]:
          """Fetch records around a specific log entry. key must be the target record's _timestamp value in microseconds. output_format can be 'records' or 'columns' for a more token-efficient table shape. record_profile can be 'generic' or 'kubernetes_compact'."""
          client = client_provider.get()
          raw = client.search_around(
              stream_name=stream_name,
              key=key,
              size=size,
              regions=regions,
              timeout=timeout,
          )
          return build_search_around_result(
              org_id=client.resolve_org_id(),
              stream_name=stream_name,
              size=size,

- Helper function that processes and formats the raw API results into a response compatible with MCP tool outputs.

      def build_search_around_result(
          *,
          org_id: str,
          stream_name: str,
          size: int,
          raw: Any,
          output_format: str,
          record_profile: str,
          include_raw: bool,
      ) -> dict[str, Any]:
          hits = raw.get("hits", []) if isinstance(raw, dict) else []
          records = [
              _apply_record_profile(summarize_search_record(hit), record_profile=record_profile)
              for hit in hits
              if isinstance(hit, dict)
          ]
          result: dict[str, Any] = {
              "org_id": org_id,
              "stream_name": stream_name,
              "requested_size": size,
              "hit_count": len(hits),
              "output_format": _normalize_output_format(output_format),
              "record_profile": _normalize_record_profile(record_profile),
          }
          _attach_record_payload(result, records, output_format=output_format)
          return maybe_include_raw(result, raw, include_raw)