aggregate_logs
Aggregate log entries by specified field and compute metrics such as count, average, sum, or percentile over a defined time range and streams.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| aggregation | Yes |
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
No arguments | |||
Implementation Reference
- mcp_graylog/server.py:43-44 (handler)The 'aggregate_logs' tool handler function in ToolHandlers class. It delegates to GraylogClient.aggregate() passing the typed AggregateLogsInput model.
def aggregate_logs(self, aggregation: AggregateLogsInput) -> dict[str, Any]: return self.graylog.aggregate(aggregation) - mcp_graylog/models.py:78-143 (schema)The AggregateLogsInput Pydantic model - defines schema/validation for the aggregate_logs tool input, including query, timerange, streams, field, metric, metric_field, percentile, and limit fields with validation logic.
class AggregateLogsInput(BaseModel): query: str = Field("*", min_length=1) timerange: TimeRange = Field( default_factory=lambda: RelativeTimeRange.model_validate({}) ) streams: list[str] = Field(default_factory=list) field: str = Field(..., min_length=1) metric: Literal[ "average", "avg", "count", "latest", "max", "min", "percentile", "stdDev", "sum", "sumOfSquares", "variance", ] = "count" metric_field: str | None = Field( None, description="Target field for non-count Graylog aggregation metrics.", ) percentile: float | None = Field( None, ge=0, le=100, description="Percentile value used when metric is percentile.", ) limit: int = Field(10, ge=1, le=100) @field_validator("query") @classmethod def strip_query(cls, value: str) -> str: stripped = value.strip() if not stripped: raise ValueError("query must not be empty") return stripped @model_validator(mode="after") def validate_metric_configuration(self) -> "AggregateLogsInput": if self.metric != "count" and not self.metric_field: raise ValueError("metric_field is required for non-count metrics") if self.metric == "percentile" and self.percentile is None: raise ValueError("percentile is required for percentile metrics") if self.metric != "percentile" and self.percentile is not None: raise ValueError("percentile is only valid for percentile metrics") return self def to_graylog_payload(self) -> dict[str, object]: metric: dict[str, object] = {"function": self.metric} if self.metric_field: metric["field"] = self.metric_field if self.percentile is not None: metric["configuration"] = {"percentile": self.percentile} payload: dict[str, object] = { "query": self.query, "timerange": self.timerange.to_graylog(), "group_by": [{"field": self.field, "limit": self.limit}], "metrics": [metric], } if self.streams: payload["streams"] = list(self.streams) return payload - mcp_graylog/server.py:128-130 (registration)Registration of aggregate_logs as an MCP tool on the FastMCP server. Line 130: mcp.tool()(handlers.aggregate_logs)
mcp.tool()(handlers.search_logs) mcp.tool()(handlers.search_stream_logs) mcp.tool()(handlers.aggregate_logs) - mcp_graylog/graylog_client.py:67-72 (helper)The GraylogClient.aggregate() method that makes the actual HTTP POST request to Graylog's /api/search/aggregate endpoint using the payload built by AggregateLogsInput.to_graylog_payload().
def aggregate(self, aggregation: AggregateLogsInput) -> dict[str, Any]: return self._request( "POST", "/api/search/aggregate", json=aggregation.to_graylog_payload(), )