list_streams
List and filter streams in your OpenObserve organization. Specify stream type, keyword, pagination, and sorting to retrieve streams for monitoring or analysis.
Instructions
List streams available in the current organization.
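As a rough illustration of how a client might invoke this tool, the sketch below builds a JSON-RPC 2.0 `tools/call` request of the kind MCP clients send. The helper name `build_tools_call` and the argument values (such as the keyword `nginx`) are hypothetical; in practice an MCP client library would normally construct and send this message for you.

```python
import json
from typing import Any


def build_tools_call(arguments: dict[str, Any], request_id: int = 1) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 `tools/call` request targeting the list_streams tool.

    `build_tools_call` is a hypothetical helper used only for illustration.
    """
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "list_streams", "arguments": arguments},
    }


# Hypothetical arguments: any parameter left out falls back to the tool's default.
request = build_tools_call({"stream_type": "logs", "keyword": "nginx", "limit": 10})
print(json.dumps(request, indent=2))
```

All parameters are optional, so an empty `arguments` object should also be a valid call.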
Input Schema
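| Name | Required | Description | Default |
|---|---|---|---|
| stream_type | No | Stream type to list; sent to the API as the `type` query parameter. | logs |
| keyword | No | Keyword used to filter streams. | "" (empty string) |
| offset | No | Pagination offset. | 0 |
| limit | No | Pagination limit. | 50 |
| sort | No | Sort key for the returned streams. | name |
| include_raw | No | When true, the raw API response is included in the result. | false |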
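To make the parameter mapping concrete, here is a minimal sketch of how these inputs could translate into the `GET /api/{org_id}/streams` request described under Implementation Reference. The function `build_list_streams_url`, the base URL, and the org ID `default` are assumptions for illustration; the real client sends the request through its own `request_json` helper, whose encoding details are not shown in this document.

```python
from urllib.parse import urlencode


def build_list_streams_url(
    base_url: str,
    org_id: str,
    *,
    stream_type: str = "logs",
    keyword: str = "",
    offset: int = 0,
    limit: int = 50,
    sort: str = "name",
) -> str:
    """Hypothetical helper: compose the list-streams URL with its query string."""
    query = {
        "type": stream_type,  # note: stream_type is sent under the name "type"
        "keyword": keyword,
        "offset": offset,
        "limit": limit,
        "sort": sort,
    }
    return f"{base_url.rstrip('/')}/api/{org_id}/streams?{urlencode(query)}"


# Assumed base URL and org ID, for illustration only.
url = build_list_streams_url("http://localhost:5080", "default", keyword="nginx", limit=10)
print(url)
```

Note that `include_raw` does not appear in the query: according to the handler, it only controls how the result is shaped after the API responds.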
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| No arguments | | | |
Implementation Reference
- openobserve_mcp/server.py:79-101 (handler) The MCP tool handler for 'list_streams'. It is registered as a FastMCP tool, accepts parameters (stream_type, keyword, offset, limit, sort, include_raw), calls client.list_streams(), and formats the result via build_list_streams_result().

  ```python
  def list_streams(
      stream_type: str = "logs",
      keyword: str = "",
      offset: int = 0,
      limit: int = 50,
      sort: str = "name",
      include_raw: bool = False,
  ) -> dict[str, Any]:
      """List streams available in the current organization."""
      client = client_provider.get()
      raw = client.list_streams(
          stream_type=stream_type,
          keyword=keyword,
          offset=offset,
          limit=limit,
          sort=sort,
      )
      return build_list_streams_result(
          org_id=client.resolve_org_id(),
          stream_type=stream_type,
          raw=raw,
          include_raw=include_raw,
      )
  ```

- The client-side method that sends the HTTP GET request to /api/{org_id}/streams with query parameters (type, keyword, offset, limit, sort). This is where the actual API call is made.

  ```python
  def list_streams(
      self,
      *,
      stream_type: str,
      keyword: str = "",
      offset: int = 0,
      limit: int = 50,
      sort: str = "name",
  ) -> Any:
      return self.request_json(
          "GET",
          self._org_path("/api/{org_id}/streams"),
          query={
              "type": stream_type,
              "keyword": keyword,
              "offset": offset,
              "limit": limit,
              "sort": sort,
          },
      )
  ```

- Helper function that shapes the raw API response into a compact result dict with org_id, stream_type, total, streams list (name, stream_type, storage_type, doc_num, doc_time_min, doc_time_max), and optionally raw data.

  ```python
  def build_list_streams_result(
      *,
      org_id: str,
      stream_type: str,
      raw: Any,
      include_raw: bool,
  ) -> dict[str, Any]:
      items = raw.get("list", []) if isinstance(raw, dict) else []
      result: dict[str, Any] = {
          "org_id": org_id,
          "stream_type": stream_type,
          "total": raw.get("total") if isinstance(raw, dict) else None,
          "streams": [
              {
                  "name": item.get("name"),
                  "stream_type": item.get("stream_type"),
                  "storage_type": item.get("storage_type"),
                  "doc_num": item.get("stats", {}).get("doc_num") if isinstance(item, dict) else None,
                  "doc_time_min": item.get("stats", {}).get("doc_time_min") if isinstance(item, dict) else None,
                  "doc_time_max": item.get("stats", {}).get("doc_time_max") if isinstance(item, dict) else None,
              }
              for item in items
          ],
      }
      return maybe_include_raw(result, raw, include_raw)
  ```

- openobserve_mcp/server.py:78-81 (registration) The tool is registered with FastMCP via the @server.tool() decorator on line 78, which makes 'list_streams' available as an MCP tool.

  ```python
  @server.tool()
  def list_streams(
      stream_type: str = "logs",
      keyword: str = "",
  ```
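The shaping step performed by the helper can be tried in isolation. The sketch below is a deliberately more defensive variant, not the project's code: `shape_streams` and the sample payload are hypothetical, and unlike the quoted helper it skips non-dict items and tolerates a null `stats` field (the quoted helper calls `item.get("name")` before its `isinstance` check, so a non-dict item would raise there).

```python
from typing import Any


def shape_streams(raw: Any) -> list[dict[str, Any]]:
    """Hypothetical, defensive variant of the stream-shaping step."""
    items = (raw.get("list") or []) if isinstance(raw, dict) else []
    shaped: list[dict[str, Any]] = []
    for item in items:
        if not isinstance(item, dict):
            continue  # assumption: silently skip malformed entries
        stats = item.get("stats") or {}  # treat a missing or null stats object as empty
        shaped.append(
            {
                "name": item.get("name"),
                "stream_type": item.get("stream_type"),
                "storage_type": item.get("storage_type"),
                "doc_num": stats.get("doc_num"),
                "doc_time_min": stats.get("doc_time_min"),
                "doc_time_max": stats.get("doc_time_max"),
            }
        )
    return shaped


# Made-up payload for illustration; field values are not real API output.
sample_raw = {
    "list": [
        {
            "name": "app_logs",
            "stream_type": "logs",
            "storage_type": "disk",
            "stats": {"doc_num": 120, "doc_time_min": 1, "doc_time_max": 2},
        },
        {"name": "no_stats", "stream_type": "logs", "storage_type": "disk", "stats": None},
        "unexpected",
    ],
    "total": 3,
}

streams = shape_streams(sample_raw)
for stream in streams:
    print(stream["name"], stream["doc_num"])
```

Whether malformed entries should be skipped, as here, or surfaced as errors is a design choice; skipping keeps the tool output usable when the API returns something unexpected, at the cost of hiding the anomaly.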