aggregate_logs
Aggregate log data from JSON log files by grouping entries on a chosen field (level, module, function, or hour) to reveal patterns and trends.
Instructions
Aggregate log data by specified criteria
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| files | No | Log files to analyze (default: all files) | |
| group_by | No | Field to group by | level |
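
For example, a tool call that analyzes two specific files and groups their entries by module might pass arguments like the following. This is only a sketch: the file names are hypothetical, and omitting `files` makes the tool analyze every known log file.

```python
# Illustrative arguments for an aggregate_logs tool call.
arguments = {
    "files": ["app.log.json", "worker.log.json"],  # hypothetical file names
    "group_by": "module",                          # one of: level, module, function, hour
}
```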
Implementation Reference
- json_logs_mcp_server.py:165-220 (handler) — the core handler for the aggregate_logs tool. It reads the specified log files, groups entries by the given field (level, module, function, or hour), and computes per-group statistics such as count, percentage, and first/last seen timestamps (a sketch of the returned shape follows this list).

  ```python
  def aggregate_logs(self, files: Optional[List[str]] = None, group_by: str = "level") -> Dict[str, Any]:
      """Aggregate log data by specified criteria"""
      if files is None:
          files = list(self.log_files_cache.keys())

      all_entries = []
      for filename in files:
          try:
              entries = self.read_log_file(filename)
              all_entries.extend(entries)
          except (FileNotFoundError, RuntimeError):
              continue

      # Group entries
      groups = {}
      for entry in all_entries:
          if group_by == "level":
              key = entry.get("level", "UNKNOWN")
          elif group_by == "module":
              key = entry.get("module", "UNKNOWN")
          elif group_by == "function":
              key = entry.get("function", "UNKNOWN")
          elif group_by == "hour":
              timestamp = entry.get("parsed_timestamp")
              if timestamp:
                  key = timestamp.strftime("%Y-%m-%d %H:00")
              else:
                  key = "UNKNOWN"
          else:
              key = entry.get(group_by, "UNKNOWN")

          if key not in groups:
              groups[key] = []
          groups[key].append(entry)

      # Calculate statistics
      result = {
          "group_by": group_by,
          "total_entries": len(all_entries),
          "groups": {}
      }

      for key, entries in groups.items():
          result["groups"][key] = {
              "count": len(entries),
              "percentage": round((len(entries) / len(all_entries)) * 100, 2) if all_entries else 0,
              "first_seen": min(
                  e.get("parsed_timestamp", datetime.max) for e in entries
              ).isoformat() if entries else None,
              "last_seen": max(
                  e.get("parsed_timestamp", datetime.min) for e in entries
              ).isoformat() if entries else None
          }

      return result
  ```
- json_logs_mcp_server.py:389-408 (registration) — registration of the aggregate_logs tool in list_tools(), including its name, description, and input schema definition.

  ```python
  types.Tool(
      name="aggregate_logs",
      description="Aggregate log data by specified criteria",
      inputSchema={
          "type": "object",
          "properties": {
              "files": {
                  "type": "array",
                  "items": {"type": "string"},
                  "description": "Log files to analyze (default: all files)"
              },
              "group_by": {
                  "type": "string",
                  "enum": ["level", "module", "function", "hour"],
                  "default": "level",
                  "description": "Field to group by"
              }
          }
      }
  ),
  ```
- json_logs_mcp_server.py:450-457 (dispatch) — tool dispatch logic in call_tool() that routes aggregate_logs calls to the analyzer method and returns the result as JSON text (a client-side call sketch follows this list).

  ```python
  elif name == "aggregate_logs":
      results = log_analyzer.aggregate_logs(**arguments)
      return [
          types.TextContent(
              type="text",
              text=json.dumps(results, indent=2, default=str)
          )
      ]
  ```
- json_logs_mcp_server.py:392-406 (schema) — JSON Schema input definition for the aggregate_logs tool, covering the files parameter (array of strings) and group_by (enum with default "level").

  ```python
  inputSchema={
      "type": "object",
      "properties": {
          "files": {
              "type": "array",
              "items": {"type": "string"},
              "description": "Log files to analyze (default: all files)"
          },
          "group_by": {
              "type": "string",
              "enum": ["level", "module", "function", "hour"],
              "default": "level",
              "description": "Field to group by"
          }
      }
  }
  ```
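
Based on the handler above, the return value is a dictionary with the grouping field, the total entry count, and one statistics entry per group. A sketch of that shape, with made-up values, looks like this:

```python
# Illustrative result shape produced by aggregate_logs (all values are made up).
example_result = {
    "group_by": "level",
    "total_entries": 1200,
    "groups": {
        "INFO": {
            "count": 900,
            "percentage": 75.0,
            "first_seen": "2024-01-01T00:00:03",
            "last_seen": "2024-01-01T23:59:41",
        },
        "ERROR": {
            "count": 300,
            "percentage": 25.0,
            "first_seen": "2024-01-01T01:12:10",
            "last_seen": "2024-01-01T22:03:55",
        },
    },
}
```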
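
A minimal client-side sketch using the MCP Python SDK, assuming the server is launched over stdio with `python json_logs_mcp_server.py` (the launch command is an assumption, not taken from the source), could invoke the tool like this:

```python
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main() -> None:
    # Assumed launch command for the server; adjust to the actual entry point.
    params = StdioServerParameters(command="python", args=["json_logs_mcp_server.py"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Call the tool under the name registered in list_tools().
            result = await session.call_tool("aggregate_logs", {"group_by": "hour"})
            for item in result.content:
                print(item.text)  # the JSON string produced by call_tool()


if __name__ == "__main__":
    asyncio.run(main())
```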