search_around
Fetch records surrounding a specific log entry using its timestamp. Supports Unix timestamps in seconds, milliseconds, microseconds, or nanoseconds. Returns nearby rows with configurable output format and record profile.
Instructions
Fetch records around a specific log entry. key accepts Unix timestamps in seconds, milliseconds, microseconds, or nanoseconds for convenience, but the best input is the exact _timestamp returned by search_logs; otherwise OpenObserve may return no nearby rows. output_format can be 'records' or 'columns' for a more token-efficient table shape. record_profile can be 'generic' or 'kubernetes_compact'.
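To illustrate how a single `key` argument can accept several timestamp units, here is a minimal sketch of magnitude-based unit detection. The real `_normalize_unix_timestamp` helper is not shown on this page, so the function name `normalize_to_microseconds`, the thresholds, and the choice of microseconds as the target unit are all assumptions made for illustration.

```python
def normalize_to_microseconds(value: int) -> int:
    """Hypothetical sketch: guess a Unix timestamp's unit from its magnitude.

    Assumes timestamps near the present era and a microsecond target unit;
    the real helper may use different rules.
    """
    if value <= 0:
        raise ValueError("timestamp must be a positive integer")
    if value < 10**11:       # about 10 digits today: seconds
        return value * 1_000_000
    if value < 10**14:       # about 13 digits: milliseconds
        return value * 1_000
    if value < 10**17:       # about 16 digits: already microseconds
        return value
    return value // 1_000    # about 19 digits: nanoseconds


if __name__ == "__main__":
    for raw in (1_700_000_000, 1_700_000_000_123, 1_700_000_000_123_456):
        print(raw, "->", normalize_to_microseconds(raw))
```

Under these assumptions, a value given in seconds expands to a microsecond value ending in six zeros, which will rarely match a stored `_timestamp` exactly. That may be one reason the instructions recommend passing the exact `_timestamp` returned by search_logs.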
Input Schema
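| Name | Required | Description | Default |
|---|---|---|---|
| stream_name | Yes | Name of the stream to query. | |
| key | Yes | Unix timestamp of the target entry (seconds, milliseconds, microseconds, or nanoseconds); ideally the exact `_timestamp` returned by search_logs. | |
| size | No | Number of nearby records to request. | 20 |
| regions | No | Forwarded to the OpenObserve API as the `regions` query parameter when set. | |
| timeout | No | Forwarded to the OpenObserve API as the `timeout` query parameter when set. | |
| output_format | No | 'records' or 'columns'. | records |
| record_profile | No | 'generic' or 'kubernetes_compact'. | generic |
| include_raw | No | Whether to include the raw API response in the result. | False |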
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
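The instructions describe the 'columns' output format as a more token-efficient table shape. As a rough illustration of that idea, the sketch below pivots a list of record dicts into one shared list of column names plus one row per record, so field names are not repeated for every row. The exact payload produced by the server is not documented on this page; the function name `records_to_columns` and the `columns`/`rows` keys are assumptions.

```python
from __future__ import annotations

from typing import Any


def records_to_columns(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical sketch: pivot record dicts into a columns-plus-rows table."""
    columns: list[str] = []
    for record in records:
        for name in record:
            if name not in columns:  # keep first-seen column order
                columns.append(name)
    # Fields missing from a record are filled with None so rows stay aligned.
    rows = [[record.get(name) for name in columns] for record in records]
    return {"columns": columns, "rows": rows}


if __name__ == "__main__":
    sample = [
        {"_timestamp": 1, "message": "a"},
        {"_timestamp": 2, "level": "warn"},
    ]
    print(records_to_columns(sample))
```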
Implementation Reference
- openobserve_mcp/server.py:153-182 (registration) The MCP tool 'search_around' is registered via the @server.tool() decorator on the search_around function in create_server(). It accepts the parameters stream_name, key, size, regions, timeout, output_format, record_profile, and include_raw.

  ```python
  @server.tool()
  def search_around(
      stream_name: str,
      key: int,
      size: int = 20,
      regions: str | None = None,
      timeout: int | None = None,
      output_format: str = "records",
      record_profile: str = "generic",
      include_raw: bool = False,
  ) -> dict[str, Any]:
      """Fetch records around a specific log entry.

      key accepts Unix timestamps in seconds, milliseconds, microseconds, or
      nanoseconds for convenience, but the best input is the exact `_timestamp`
      returned by search_logs; otherwise OpenObserve may return no nearby rows.
      output_format can be 'records' or 'columns' for a more token-efficient
      table shape. record_profile can be 'generic' or 'kubernetes_compact'."""
      client = client_provider.get()
      key = _normalize_unix_timestamp(key, field_name="key")
      raw = client.search_around(
          stream_name=stream_name,
          key=key,
          size=size,
          regions=regions,
          timeout=timeout,
      )
      return build_search_around_result(
          org_id=client.resolve_org_id(),
          stream_name=stream_name,
          size=size,
          raw=raw,
          output_format=output_format,
          record_profile=record_profile,
          include_raw=include_raw,
      )
  ```

- openobserve_mcp/server.py:154-182 (handler) The handler function that executes the tool logic: it normalizes the key timestamp, calls client.search_around(), and returns build_search_around_result().

  ```python
  def search_around(
      stream_name: str,
      key: int,
      size: int = 20,
      regions: str | None = None,
      timeout: int | None = None,
      output_format: str = "records",
      record_profile: str = "generic",
      include_raw: bool = False,
  ) -> dict[str, Any]:
      """Fetch records around a specific log entry.

      key accepts Unix timestamps in seconds, milliseconds, microseconds, or
      nanoseconds for convenience, but the best input is the exact `_timestamp`
      returned by search_logs; otherwise OpenObserve may return no nearby rows.
      output_format can be 'records' or 'columns' for a more token-efficient
      table shape. record_profile can be 'generic' or 'kubernetes_compact'."""
      client = client_provider.get()
      key = _normalize_unix_timestamp(key, field_name="key")
      raw = client.search_around(
          stream_name=stream_name,
          key=key,
          size=size,
          regions=regions,
          timeout=timeout,
      )
      return build_search_around_result(
          org_id=client.resolve_org_id(),
          stream_name=stream_name,
          size=size,
          raw=raw,
          output_format=output_format,
          record_profile=record_profile,
          include_raw=include_raw,
      )
  ```

- Client method search_around() that makes an HTTP GET request to the OpenObserve API endpoint /api/{org_id}/{stream_name}/_around with the query parameters key, size, regions, and timeout.

  ```python
  def search_around(
      self,
      *,
      stream_name: str,
      key: int,
      size: int = 20,
      regions: str | None = None,
      timeout: int | None = None,
  ) -> Any:
      query: dict[str, str | int | float | bool] = {
          "key": key,
          "size": size,
      }
      if regions:
          query["regions"] = regions
      if timeout is not None:
          query["timeout"] = timeout
      return self.request_json(
          "GET",
          self._org_path("/api/{org_id}/{stream_name}/_around", stream_name=stream_name),
          query=query,
      )
  ```

- Result builder function build_search_around_result() that formats the API response into a structured dict with org_id, stream_name, requested_size, hit_count, output_format, record_profile, and the record or columnar payload.

  ```python
  def build_search_around_result(
      *,
      org_id: str,
      stream_name: str,
      size: int,
      raw: Any,
      output_format: str,
      record_profile: str,
      include_raw: bool,
  ) -> dict[str, Any]:
      hits = raw.get("hits", []) if isinstance(raw, dict) else []
      records = [
          _apply_record_profile(summarize_search_record(hit), record_profile=record_profile)
          for hit in hits
          if isinstance(hit, dict)
      ]
      result: dict[str, Any] = {
          "org_id": org_id,
          "stream_name": stream_name,
          "requested_size": size,
          "hit_count": len(hits),
          "output_format": _normalize_output_format(output_format),
          "record_profile": _normalize_record_profile(record_profile),
      }
      _attach_record_payload(result, records, output_format=output_format)
      return maybe_include_raw(result, raw, include_raw)
  ```

- openobserve_mcp/server.py:154-162 (schema) Input parameters defined as function arguments to the search_around tool: stream_name (str), key (int), size (int=20), regions (str|None), timeout (int|None), output_format (str='records'), record_profile (str='generic'), include_raw (bool=False).

  ```python
  def search_around(
      stream_name: str,
      key: int,
      size: int = 20,
      regions: str | None = None,
      timeout: int | None = None,
      output_format: str = "records",
      record_profile: str = "generic",
      include_raw: bool = False,
  ```
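The client method in this reference sends a GET request to /api/{org_id}/{stream_name}/_around and only includes `regions` and `timeout` in the query string when they are set. The sketch below shows what the resulting URL could look like, using only the standard library. The function name `build_around_url`, the base URL, and the org ID `default` are hypothetical; the real client builds the path through its own `_org_path` and `request_json` helpers, which are not shown here.

```python
from __future__ import annotations

from urllib.parse import quote, urlencode


def build_around_url(
    base_url: str,
    org_id: str,
    stream_name: str,
    key: int,
    size: int = 20,
    regions: str | None = None,
    timeout: int | None = None,
) -> str:
    """Hypothetical sketch: assemble the _around URL described in this reference."""
    query: dict[str, str | int] = {"key": key, "size": size}
    if regions:                 # omitted when None or empty
        query["regions"] = regions
    if timeout is not None:     # 0 would still be sent
        query["timeout"] = timeout
    path = f"/api/{quote(org_id, safe='')}/{quote(stream_name, safe='')}/_around"
    return f"{base_url.rstrip('/')}{path}?{urlencode(query)}"


if __name__ == "__main__":
    # Placeholder host and names, for illustration only.
    print(build_around_url("http://localhost:5080", "default", "app_logs", 1700000000123456))
```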