search_logs
Search OpenObserve logs using full SQL queries. Specify the time range, limit, offset, and output format. Optionally reduce token usage with columnar output, or trim Kubernetes metadata with the compact record profile.
Instructions
Run a full SQL search against OpenObserve logs.

Example row query: `SELECT _timestamp, message FROM "my_stream" ORDER BY _timestamp DESC LIMIT 20`. Example aggregate query: `SELECT level, count(*) AS cnt FROM "my_stream" GROUP BY level ORDER BY cnt DESC LIMIT 20`. Prefer double quotes around stream names in SQL when in doubt, and confirm actual field names with get_stream_schema instead of assuming a `log` column.

start_time and end_time accept Unix timestamps in seconds, milliseconds, microseconds, or nanoseconds and are normalized to microseconds.

The limit parameter sets the API page size; if your OpenObserve/DataFusion setup still complains about ORDER BY without a SQL LIMIT, add an explicit LIMIT to the SQL as well.

output_format can be 'records' or 'columns'; 'columns' is especially useful for wide SELECT * queries and can save roughly 35-40% tokens. record_profile can be 'generic' or 'kubernetes_compact'; the Kubernetes compact profile trims common noisy metadata fields such as pod labels and pod IP metadata.
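The unit handling for start_time and end_time can be sketched as below. The server's own `_normalize_time_range` helper is not shown on this page, so the function names and the magnitude thresholds here are assumptions for illustration only, not the actual implementation.

```python
def to_microseconds(ts: int) -> int:
    """Guess the unit of a Unix timestamp from its magnitude and convert it.

    Assumed thresholds (hypothetical, not taken from the server code):
    below 1e11 -> seconds, below 1e14 -> milliseconds,
    below 1e17 -> microseconds, otherwise nanoseconds.
    """
    if ts < 0:
        raise ValueError("timestamp must be non-negative")
    if ts < 10**11:
        return ts * 1_000_000  # seconds -> microseconds
    if ts < 10**14:
        return ts * 1_000  # milliseconds -> microseconds
    if ts < 10**17:
        return ts  # already microseconds
    return ts // 1_000  # nanoseconds -> microseconds


def normalize_time_range(start_time: int, end_time: int) -> tuple[int, int]:
    """Convert both bounds to microseconds and check their order."""
    start_us = to_microseconds(start_time)
    end_us = to_microseconds(end_time)
    if start_us > end_us:
        raise ValueError("start_time must not be after end_time")
    return start_us, end_us
```

Under these assumed thresholds, a caller can pass whichever unit is at hand and the range still ends up in microseconds.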
Input Schema
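| Name | Required | Description | Default |
|---|---|---|---|
| sql | Yes | Full SQL query to run, e.g. `SELECT _timestamp, message FROM "my_stream" ORDER BY _timestamp DESC LIMIT 20`. | |
| start_time | Yes | Start of the time range as a Unix timestamp (seconds, milliseconds, microseconds, or nanoseconds); normalized to microseconds. | |
| end_time | Yes | End of the time range as a Unix timestamp (seconds, milliseconds, microseconds, or nanoseconds); normalized to microseconds. | |
| limit | No | API page size (sent as `size` in the search request). | 100 |
| offset | No | Pagination offset (sent as `from` in the search request). | 0 |
| use_cache | No | Forwarded to the search API as `use_cache`. | false |
| timeout | No | Optional timeout forwarded to the search API; omitted from the request when unset. | |
| output_format | No | `records` or `columns`. | records |
| record_profile | No | `generic` or `kubernetes_compact`. | generic |
| include_raw | No | Whether to include the raw API response in the result. | false |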
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
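No output schema is declared, but the instructions describe two payload shapes selected by output_format. The sketch below shows why a columnar layout tends to be smaller: field names appear once rather than in every record. The `{"columns": [...], "rows": [...]}` layout and the function name are assumptions for illustration; the exact shape the server emits is not shown on this page.

```python
import json
from typing import Any


def records_to_columns(records: list[dict[str, Any]]) -> dict[str, Any]:
    """Convert a list of record dicts into a columnar layout.

    Column order follows first appearance; missing fields become None.
    """
    columns: list[str] = []
    for record in records:
        for key in record:
            if key not in columns:
                columns.append(key)
    rows = [[record.get(col) for col in columns] for record in records]
    return {"columns": columns, "rows": rows}


sample = [
    {"_timestamp": 1, "level": "info", "message": "a"},
    {"_timestamp": 2, "level": "warn", "message": "b"},
    {"_timestamp": 3, "level": "error", "message": "c"},
]
columnar = records_to_columns(sample)

# Compare serialized sizes as a rough proxy for token usage.
print(len(json.dumps(sample)), len(json.dumps(columnar)))
```

The saving grows with the number of fields per record, which is consistent with the note that 'columns' helps most on wide SELECT * queries.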
Implementation Reference
- openobserve_mcp/server.py:120-151 (handler) The search_logs tool handler function. It is registered as a FastMCP tool, accepts the SQL query, time range, limit, offset, output_format, record_profile, and related options, normalizes the timestamps, calls client.search_sql(), and returns the result via build_search_logs_result().

```python
@server.tool()
def search_logs(
    sql: str,
    start_time: int,
    end_time: int,
    limit: int = 100,
    offset: int = 0,
    use_cache: bool = False,
    timeout: int | None = None,
    output_format: str = "records",
    record_profile: str = "generic",
    include_raw: bool = False,
) -> dict[str, Any]:
    """Run a full SQL search against OpenObserve logs.
    Example row query: SELECT _timestamp, message FROM "my_stream" ORDER BY _timestamp DESC LIMIT 20.
    Example aggregate query: SELECT level, count(*) AS cnt FROM "my_stream" GROUP BY level ORDER BY cnt DESC LIMIT 20.
    Prefer double quotes around stream names in SQL when in doubt, and confirm actual field names
    with get_stream_schema instead of assuming a `log` column.
    start_time and end_time accept Unix timestamps in seconds, milliseconds, microseconds, or nanoseconds
    and are normalized to microseconds.
    The limit parameter sets the API page size; if your OpenObserve/DataFusion setup still complains about
    ORDER BY without a SQL LIMIT, add an explicit LIMIT to the SQL as well.
    output_format can be 'records' or 'columns'; 'columns' is especially useful for wide SELECT * queries
    and can save roughly 35-40% tokens.
    record_profile can be 'generic' or 'kubernetes_compact'; the Kubernetes compact profile trims common
    noisy metadata fields such as pod labels and pod IP metadata."""
    client = client_provider.get()
    start_time, end_time = _normalize_time_range(start_time, end_time)
    raw = client.search_sql(
        sql=sql,
        start_time=start_time,
        end_time=end_time,
        offset=offset,
        limit=limit,
        use_cache=use_cache,
        timeout=timeout,
    )
    return build_search_logs_result(
        org_id=client.resolve_org_id(),
        raw=raw,
        output_format=output_format,
        record_profile=record_profile,
        include_raw=include_raw,
    )
```

- The build_search_logs_result() helper that shapes the API response into a compact result dict with metadata (took, total, scan_records, cached_ratio, hit_count) and a records/columns payload, applying record profile and output format normalization.

```python
def build_search_logs_result(
    *,
    org_id: str,
    raw: Any,
    output_format: str,
    record_profile: str,
    include_raw: bool,
) -> dict[str, Any]:
    hits = raw.get("hits", []) if isinstance(raw, dict) else []
    records = [
        _apply_record_profile(summarize_search_record(hit), record_profile=record_profile)
        for hit in hits
        if isinstance(hit, dict)
    ]
    result: dict[str, Any] = {
        "org_id": org_id,
        "took": raw.get("took") if isinstance(raw, dict) else None,
        "total": raw.get("total") if isinstance(raw, dict) else None,
        "scan_records": raw.get("scan_records") if isinstance(raw, dict) else None,
        "cached_ratio": raw.get("cached_ratio") if isinstance(raw, dict) else None,
        "hit_count": len(hits),
        "output_format": _normalize_output_format(output_format),
        "record_profile": _normalize_record_profile(record_profile),
    }
    _attach_record_payload(result, records, output_format=output_format)
    return maybe_include_raw(result, raw, include_raw)
```

- openobserve_mcp/server.py:120-151 (registration) The search_logs tool is registered via the @server.tool() decorator on the search_logs function (the same function shown under the handler entry above), at line 78 (server creation block) within create_server().
- The search_sql() method on OpenObserveClient that makes the POST request to /api/{org_id}/_search with the SQL query, time range, pagination, and cache settings.

```python
def search_sql(
    self,
    *,
    sql: str,
    start_time: int,
    end_time: int,
    offset: int = 0,
    limit: int = 100,
    use_cache: bool = False,
    timeout: int | None = None,
) -> Any:
    body: dict[str, Any] = {
        "query": {
            "sql": sql,
            "start_time": start_time,
            "end_time": end_time,
            "from": offset,
            "size": limit,
        },
        "use_cache": use_cache,
    }
    if timeout is not None:
        body["timeout"] = timeout
    return self.request_json(
        "POST",
        self._org_path("/api/{org_id}/_search"),
        query={
            "is_ui_histogram": "false",
            "is_multi_stream_search": "false",
            "validate": "false",
        },
        json_body=body,
    )
```

- The _apply_record_profile() helper used within build_search_logs_result to trim noisy Kubernetes metadata fields when record_profile='kubernetes_compact'.

```python
def _apply_record_profile(record: dict[str, Any], *, record_profile: str) -> dict[str, Any]:
    normalized = _normalize_record_profile(record_profile)
    if normalized == "generic":
        return record
    compact_record: dict[str, Any] = {}
    for key, value in record.items():
        if key in _KUBERNETES_COMPACT_DROP_FIELDS:
            continue
        if any(key.startswith(prefix) for prefix in _KUBERNETES_COMPACT_DROP_PREFIXES):
            continue
        compact_record[key] = value
    return compact_record
```
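
To see the effect of the compact profile on a single record, here is a self-contained sketch of the same drop-by-name and drop-by-prefix filtering. The contents of `_KUBERNETES_COMPACT_DROP_FIELDS` and `_KUBERNETES_COMPACT_DROP_PREFIXES` are not shown on this page, so the field names and prefixes below are hypothetical stand-ins.

```python
from typing import Any

# Hypothetical stand-ins: the server's real drop lists are not shown here.
DROP_FIELDS = frozenset({"kubernetes_pod_ip", "kubernetes_host"})
DROP_PREFIXES = ("kubernetes_labels_", "kubernetes_annotations_")


def compact_record(record: dict[str, Any]) -> dict[str, Any]:
    """Drop fields matched by exact name or by prefix; keep everything else."""
    return {
        key: value
        for key, value in record.items()
        if key not in DROP_FIELDS and not key.startswith(DROP_PREFIXES)
    }


record = {
    "_timestamp": 1700000000000000,
    "message": "request served",
    "kubernetes_pod_name": "api-7d9f",
    "kubernetes_pod_ip": "10.0.0.12",
    "kubernetes_labels_app": "api",
    "kubernetes_annotations_checksum": "abc123",
}
compacted = compact_record(record)
```

With these assumed lists, `compacted` keeps `_timestamp`, `message`, and `kubernetes_pod_name` and drops the other three fields. `str.startswith` accepts a tuple of prefixes, which replaces the explicit `any(...)` loop without changing the result.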