aggregate_logs
Group and analyze JSON log files by level, module, function, or hour to identify patterns and insights efficiently.
Instructions
Aggregate log data by specified criteria
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| files | No | Log files to analyze (default: all files) | |
| group_by | No | Field to group by: one of level, module, function, or hour | level |
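
As an illustration of the parameters in the table above, the sketch below builds an arguments object and applies the documented default. The file name `app.log` is a made-up placeholder, and `check_arguments` is a hypothetical helper that is not part of the server; it only mirrors the constraints stated in the schema.

```python
from typing import Any, Dict

# Allowed values for group_by, copied from the tool's input schema.
GROUP_BY_VALUES = ("level", "module", "function", "hour")


def check_arguments(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: apply the schema default and check basic types."""
    checked = dict(arguments)
    # group_by is optional and defaults to "level".
    checked.setdefault("group_by", "level")
    if checked["group_by"] not in GROUP_BY_VALUES:
        raise ValueError(f"group_by must be one of {GROUP_BY_VALUES}")
    # files is optional; when given it must be a list of strings.
    files = checked.get("files")
    if files is not None and not (
        isinstance(files, list) and all(isinstance(f, str) for f in files)
    ):
        raise ValueError("files must be a list of strings")
    return checked


# Placeholder file name; omit "files" entirely to analyze all files.
example = check_arguments({"files": ["app.log"], "group_by": "hour"})
defaults = check_arguments({})
```

Both parameters are optional, so an empty arguments object is valid and groups every available file by level.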
Implementation Reference
- json_logs_mcp_server.py:165-220 (handler)
  The core handler method of the JsonLogAnalyzer class. It collects log entries from the specified files, groups them by the given criterion (level, module, function, hour, or any other entry field), and computes count, percentage, first_seen, and last_seen for each group.

      def aggregate_logs(self, files: Optional[List[str]] = None, group_by: str = "level") -> Dict[str, Any]:
          """Aggregate log data by specified criteria"""
          if files is None:
              files = list(self.log_files_cache.keys())

          all_entries = []
          for filename in files:
              try:
                  entries = self.read_log_file(filename)
                  all_entries.extend(entries)
              except (FileNotFoundError, RuntimeError):
                  continue

          # Group entries
          groups = {}
          for entry in all_entries:
              if group_by == "level":
                  key = entry.get("level", "UNKNOWN")
              elif group_by == "module":
                  key = entry.get("module", "UNKNOWN")
              elif group_by == "function":
                  key = entry.get("function", "UNKNOWN")
              elif group_by == "hour":
                  timestamp = entry.get("parsed_timestamp")
                  if timestamp:
                      key = timestamp.strftime("%Y-%m-%d %H:00")
                  else:
                      key = "UNKNOWN"
              else:
                  key = entry.get(group_by, "UNKNOWN")

              if key not in groups:
                  groups[key] = []
              groups[key].append(entry)

          # Calculate statistics
          result = {
              "group_by": group_by,
              "total_entries": len(all_entries),
              "groups": {}
          }

          for key, entries in groups.items():
              result["groups"][key] = {
                  "count": len(entries),
                  "percentage": round((len(entries) / len(all_entries)) * 100, 2) if all_entries else 0,
                  "first_seen": min(
                      e.get("parsed_timestamp", datetime.max) for e in entries).isoformat() if entries else None,
                  "last_seen": max(
                      e.get("parsed_timestamp", datetime.min) for e in entries).isoformat() if entries else None
              }

          return result
- json_logs_mcp_server.py:393-407 (schema)
  The input schema for the aggregate_logs tool defining parameters: files (array of strings, optional) and group_by (string enum: level/module/function/hour, default level).

      "type": "object",
      "properties": {
          "files": {
              "type": "array",
              "items": {"type": "string"},
              "description": "Log files to analyze (default: all files)"
          },
          "group_by": {
              "type": "string",
              "enum": ["level", "module", "function", "hour"],
              "default": "level",
              "description": "Field to group by"
          }
      }
      }
- json_logs_mcp_server.py:390-409 (registration)
  Registration of the aggregate_logs tool in the list_tools() handler, providing name, description, and input schema.

      name="aggregate_logs",
      description="Aggregate log data by specified criteria",
      inputSchema={
          "type": "object",
          "properties": {
              "files": {
                  "type": "array",
                  "items": {"type": "string"},
                  "description": "Log files to analyze (default: all files)"
              },
              "group_by": {
                  "type": "string",
                  "enum": ["level", "module", "function", "hour"],
                  "default": "level",
                  "description": "Field to group by"
              }
          }
      }
      ),
      types.Tool(
- json_logs_mcp_server.py:450-457 (registration)
  Dispatch logic in the call_tool() handler that matches the tool name and invokes the aggregate_logs method with arguments, returning JSON serialized results.

      elif name == "aggregate_logs":
          results = log_analyzer.aggregate_logs(**arguments)
          return [
              types.TextContent(
                  type="text",
                  text=json.dumps(results, indent=2, default=str)
              )
          ]
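
To make the shape of the returned result concrete, here is a minimal, self-contained sketch of the same grouping and statistics applied to in-memory entries. `aggregate_entries` and the sample entries are hypothetical and not part of the server. The sketch also makes one deliberate change, labeled in the code: it skips entries without a parsed timestamp when computing first_seen and last_seen, instead of falling back to `datetime.max` and `datetime.min` as the handler does.

```python
import json
from collections import defaultdict
from datetime import datetime
from typing import Any, Dict, List


def aggregate_entries(entries: List[Dict[str, Any]], group_by: str = "level") -> Dict[str, Any]:
    """Hypothetical stand-in for the handler, operating on a list of entries."""
    groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for entry in entries:
        if group_by == "hour":
            # Bucket by calendar hour; entries without a timestamp go to UNKNOWN.
            ts = entry.get("parsed_timestamp")
            key = ts.strftime("%Y-%m-%d %H:00") if ts else "UNKNOWN"
        else:
            key = entry.get(group_by, "UNKNOWN")
        groups[key].append(entry)

    result: Dict[str, Any] = {"group_by": group_by, "total_entries": len(entries), "groups": {}}
    for key, members in groups.items():
        # Assumption: ignore entries without a timestamp instead of using sentinels.
        stamps = [e["parsed_timestamp"] for e in members if e.get("parsed_timestamp")]
        result["groups"][key] = {
            "count": len(members),
            "percentage": round(len(members) / len(entries) * 100, 2),
            "first_seen": min(stamps).isoformat() if stamps else None,
            "last_seen": max(stamps).isoformat() if stamps else None,
        }
    return result


# Made-up sample data for illustration only.
sample = [
    {"level": "INFO", "parsed_timestamp": datetime(2024, 1, 1, 9, 5)},
    {"level": "ERROR", "parsed_timestamp": datetime(2024, 1, 1, 9, 40)},
    {"level": "INFO", "parsed_timestamp": datetime(2024, 1, 1, 10, 15)},
    {"message": "no level or timestamp"},
]

summary = aggregate_entries(sample, group_by="level")
# Serialize the same way the dispatch code does.
print(json.dumps(summary, indent=2, default=str))
```

With this sample, the INFO group holds two of the four entries (50.0 percent), and the entry that has neither a level nor a timestamp lands in an UNKNOWN group whose first_seen and last_seen are null. In the handler as listed, an entry lacking the `parsed_timestamp` key would instead contribute the `datetime.max` or `datetime.min` sentinel to those fields, which is worth keeping in mind when reading its output.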