aggregate_logs

Group and analyze JSON log files by level, module, function, or hour to identify patterns and insights efficiently.

Instructions

Aggregate log data by specified criteria

Input Schema

Name     | Required | Description                                | Default
files    | No       | Log files to analyze (default: all files)  | (none)
group_by | No       | Field to group by                          | level
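
A request to this tool supplies arguments matching the schema above; for example (the file names here are placeholders, not files from the source project):

    {
      "files": ["app.log", "worker.log"],
      "group_by": "hour"
    }

Omitting files analyzes every log file the server knows about, and omitting group_by falls back to grouping by level.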

Implementation Reference

  • The core handler method in the JsonLogAnalyzer class. It aggregates log entries from the specified files, groups them by the given criterion (level, module, function, hour, or any other field), and computes per-group statistics: count, percentage, first_seen, and last_seen.
    def aggregate_logs(self, files: Optional[List[str]] = None, group_by: str = "level") -> Dict[str, Any]:
        """Aggregate log data by specified criteria"""
        if files is None:
            files = list(self.log_files_cache.keys())

        all_entries = []
        for filename in files:
            try:
                entries = self.read_log_file(filename)
                all_entries.extend(entries)
            except (FileNotFoundError, RuntimeError):
                continue

        # Group entries
        groups = {}
        for entry in all_entries:
            if group_by == "level":
                key = entry.get("level", "UNKNOWN")
            elif group_by == "module":
                key = entry.get("module", "UNKNOWN")
            elif group_by == "function":
                key = entry.get("function", "UNKNOWN")
            elif group_by == "hour":
                timestamp = entry.get("parsed_timestamp")
                if timestamp:
                    key = timestamp.strftime("%Y-%m-%d %H:00")
                else:
                    key = "UNKNOWN"
            else:
                key = entry.get(group_by, "UNKNOWN")

            if key not in groups:
                groups[key] = []
            groups[key].append(entry)

        # Calculate statistics
        result = {
            "group_by": group_by,
            "total_entries": len(all_entries),
            "groups": {}
        }

        for key, entries in groups.items():
            result["groups"][key] = {
                "count": len(entries),
                "percentage": round((len(entries) / len(all_entries)) * 100, 2) if all_entries else 0,
                "first_seen": min(
                    e.get("parsed_timestamp", datetime.max) for e in entries).isoformat() if entries else None,
                "last_seen": max(
                    e.get("parsed_timestamp", datetime.min) for e in entries).isoformat() if entries else None
            }

        return result
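    For illustration, a result grouped by level has roughly the shape below (the counts, percentages, and timestamps are invented, not taken from the project):
    {
      "group_by": "level",
      "total_entries": 1200,
      "groups": {
        "INFO": {"count": 1116, "percentage": 93.0, "first_seen": "2024-01-15T08:00:02", "last_seen": "2024-01-15T17:59:41"},
        "ERROR": {"count": 84, "percentage": 7.0, "first_seen": "2024-01-15T08:03:27", "last_seen": "2024-01-15T17:55:10"}
      }
    }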
  • The input schema for the aggregate_logs tool, defining two optional parameters: files (array of strings) and group_by (string enum: level/module/function/hour, defaulting to level).
    "type": "object", "properties": { "files": { "type": "array", "items": {"type": "string"}, "description": "Log files to analyze (default: all files)" }, "group_by": { "type": "string", "enum": ["level", "module", "function", "hour"], "default": "level", "description": "Field to group by" } } }
  • Registration of the aggregate_logs tool in the list_tools() handler, providing name, description, and input schema.
    name="aggregate_logs", description="Aggregate log data by specified criteria", inputSchema={ "type": "object", "properties": { "files": { "type": "array", "items": {"type": "string"}, "description": "Log files to analyze (default: all files)" }, "group_by": { "type": "string", "enum": ["level", "module", "function", "hour"], "default": "level", "description": "Field to group by" } } } ), types.Tool(
  • Dispatch logic in the call_tool() handler that matches the tool name, invokes the aggregate_logs method with the supplied arguments, and returns the result serialized as JSON text.
    elif name == "aggregate_logs":
        results = log_analyzer.aggregate_logs(**arguments)
        return [
            types.TextContent(
                type="text",
                text=json.dumps(results, indent=2, default=str)
            )
        ]
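
From the client side, calling the tool and decoding the JSON text content might look like the sketch below (a minimal example assuming an already-connected ClientSession from the mcp Python SDK):

    import json

    async def print_hourly_summary(session):
        # `session` is assumed to be a connected mcp ClientSession.
        result = await session.call_tool("aggregate_logs", {"group_by": "hour"})
        # The handler above returns a single TextContent block whose text is the JSON summary.
        summary = json.loads(result.content[0].text)
        for hour, stats in sorted(summary["groups"].items()):
            print(hour, stats["count"], f"({stats['percentage']}%)")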

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mfreeman451/json-logs-mcp-server'
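
The same lookup can be scripted; here is a minimal sketch using the requests library (the fields printed at the end are illustrative, since the exact response shape is defined by the API, not by this page):

    import requests

    # Fetch this server's entry from the Glama MCP directory API.
    url = "https://glama.ai/api/mcp/v1/servers/mfreeman451/json-logs-mcp-server"
    resp = requests.get(url)
    resp.raise_for_status()
    server_info = resp.json()
    # Inspect the returned JSON to see the actual fields; these keys are assumptions.
    print(server_info.get("name"), "-", server_info.get("description"))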

If you have feedback or need assistance with the MCP directory API, please join our Discord server.