search_stream_logs
Search and filter log messages within specific Graylog streams using Elasticsearch query syntax, time ranges, and field selections to quickly find relevant log data.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| stream_id | Yes | ||
| search | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- mcp_graylog/server.py:33-41 (handler)The `search_stream_logs` handler function. It takes a stream_id and a MessageSearchInput, validates the stream_id, creates a copy of the search with the stream_id set, and delegates to graylog.search_messages().
def search_stream_logs( self, stream_id: str, search: MessageSearchInput ) -> dict[str, Any]: clean_stream_id = stream_id.strip() if not clean_stream_id: raise ValueError("stream_id must not be empty") stream_search = search.model_copy(update={"streams": [clean_stream_id]}) return self.graylog.search_messages(stream_search) - mcp_graylog/models.py:44-75 (schema)The `MessageSearchInput` Pydantic model used as input schema for search_stream_logs. Defines fields: query, timerange, streams, fields, limit, offset.
class MessageSearchInput(BaseModel): query: str = Field("*", min_length=1) timerange: TimeRange = Field( default_factory=lambda: RelativeTimeRange.model_validate({}) ) streams: list[str] = Field(default_factory=list) fields: list[str] = Field( default_factory=lambda: ["timestamp", "source", "level", "message"] ) limit: int = Field(50, ge=1, le=1000) offset: int = Field(0, ge=0) @field_validator("query") @classmethod def strip_query(cls, value: str) -> str: stripped = value.strip() if not stripped: raise ValueError("query must not be empty") return stripped def to_graylog_payload(self) -> dict[str, object]: payload: dict[str, object] = { "query": self.query, "timerange": self.timerange.to_graylog(), "size": self.limit, "from": self.offset, } if self.streams: payload["streams"] = list(self.streams) if self.fields: payload["fields"] = list(self.fields) return payload - mcp_graylog/server.py:129-129 (registration)Registration of `search_stream_logs` as an MCP tool via `mcp.tool()(handlers.search_stream_logs)`.
mcp.tool()(handlers.search_stream_logs) - mcp_graylog/graylog_client.py:60-60 (helper)The `search_messages` method on GraylogClient that the handler ultimately delegates to for executing the search.
def search_messages(self, search: MessageSearchInput) -> dict[str, Any]: - tests/test_server_tools.py:61-78 (helper)Test verifying that search_stream_logs correctly adds a stream without mutating the original search input.
def test_search_stream_logs_adds_stream_without_mutating_original_search() -> None: fake = FakeGraylogClient() handlers = create_tool_handlers(fake) search = MessageSearchInput( query="source:api", streams=["original"], limit=50, offset=0, ) result = handlers.search_stream_logs("stream-1", search) assert result["total_results"] == 1 assert search.streams == ["original"] assert fake.search_input is not search assert fake.search_input is not None assert fake.search_input.streams == ["stream-1"] assert fake.search_input.query == "source:api"