aggregate_logs

Combine log data from JSON files by grouping entries based on criteria like level, module, function, or hour to analyze patterns and trends.

Instructions

Aggregate log data by specified criteria

Input Schema

Name      Required  Description                                 Default
files     No        Log files to analyze (default: all files)  —
group_by  No        Field to group by                           level
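
For example, a request that groups two specific files by module could pass arguments like the following. The file names here are hypothetical, and both parameters may be omitted to fall back to the defaults above.

    # Hypothetical arguments for an aggregate_logs call; both keys are optional.
    arguments = {
        "files": ["app.log.json", "worker.log.json"],  # hypothetical file names
        "group_by": "module"  # one of: level, module, function, hour
    }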

Implementation Reference

  • The core handler function for the aggregate_logs tool. It reads the specified log files, groups entries by the given field (level, module, function, or hour), and computes statistics such as count, percentage, and first/last seen timestamps for each group. A sample of the returned structure appears after this list.
    def aggregate_logs(self, files: Optional[List[str]] = None, group_by: str = "level") -> Dict[str, Any]:
        """Aggregate log data by specified criteria"""
        if files is None:
            files = list(self.log_files_cache.keys())

        all_entries = []
        for filename in files:
            try:
                entries = self.read_log_file(filename)
                all_entries.extend(entries)
            except (FileNotFoundError, RuntimeError):
                continue

        # Group entries
        groups = {}
        for entry in all_entries:
            if group_by == "level":
                key = entry.get("level", "UNKNOWN")
            elif group_by == "module":
                key = entry.get("module", "UNKNOWN")
            elif group_by == "function":
                key = entry.get("function", "UNKNOWN")
            elif group_by == "hour":
                timestamp = entry.get("parsed_timestamp")
                if timestamp:
                    key = timestamp.strftime("%Y-%m-%d %H:00")
                else:
                    key = "UNKNOWN"
            else:
                key = entry.get(group_by, "UNKNOWN")

            if key not in groups:
                groups[key] = []
            groups[key].append(entry)

        # Calculate statistics
        result = {
            "group_by": group_by,
            "total_entries": len(all_entries),
            "groups": {}
        }

        for key, entries in groups.items():
            result["groups"][key] = {
                "count": len(entries),
                "percentage": round((len(entries) / len(all_entries)) * 100, 2) if all_entries else 0,
                "first_seen": min(
                    e.get("parsed_timestamp", datetime.max) for e in entries
                ).isoformat() if entries else None,
                "last_seen": max(
                    e.get("parsed_timestamp", datetime.min) for e in entries
                ).isoformat() if entries else None
            }

        return result
  • Registration of the aggregate_logs tool in the list_tools() function, including its name, description, and input schema definition.
    types.Tool(
        name="aggregate_logs",
        description="Aggregate log data by specified criteria",
        inputSchema={
            "type": "object",
            "properties": {
                "files": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Log files to analyze (default: all files)"
                },
                "group_by": {
                    "type": "string",
                    "enum": ["level", "module", "function", "hour"],
                    "default": "level",
                    "description": "Field to group by"
                }
            }
        }
    ),
  • Tool dispatch logic in call_tool() that handles calls to aggregate_logs by invoking the analyzer method and returning JSON-formatted results. A sketch of how registration and dispatch fit together appears after this list.
    elif name == "aggregate_logs":
        results = log_analyzer.aggregate_logs(**arguments)
        return [
            types.TextContent(
                type="text",
                text=json.dumps(results, indent=2, default=str)
            )
        ]
  • JSON Schema for the aggregate_logs tool's input, defining the parameters files (an array of strings) and group_by (an enum defaulting to level).
    inputSchema={
        "type": "object",
        "properties": {
            "files": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Log files to analyze (default: all files)"
            },
            "group_by": {
                "type": "string",
                "enum": ["level", "module", "function", "hour"],
                "default": "level",
                "description": "Field to group by"
            }
        }
    }
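
For orientation, the handler above returns a structure shaped roughly like the following for a group_by of "level". All counts and timestamps below are invented for illustration.

    # Illustrative result shape only; the numbers and timestamps are made up.
    example_result = {
        "group_by": "level",
        "total_entries": 1250,
        "groups": {
            "INFO": {
                "count": 1175,
                "percentage": 94.0,
                "first_seen": "2024-01-15T08:00:00",
                "last_seen": "2024-01-15T17:59:58"
            },
            "ERROR": {
                "count": 75,
                "percentage": 6.0,
                "first_seen": "2024-01-15T08:02:11",
                "last_seen": "2024-01-15T17:45:03"
            }
        }
    }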

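To show how the registration and dispatch snippets fit together, here is a minimal, hypothetical sketch of an MCP server exposing only aggregate_logs, written against the Python MCP SDK. The server name is made up, and LogAnalyzer stands in for the analyzer class whose aggregate_logs method is shown above.

    # Minimal sketch, not the project's actual layout. LogAnalyzer is assumed to
    # provide the aggregate_logs method from the handler above.
    import asyncio
    import json

    import mcp.types as types
    from mcp.server import Server
    from mcp.server.stdio import stdio_server

    server = Server("json-logs")   # hypothetical server name
    log_analyzer = LogAnalyzer()   # assumed analyzer class

    @server.list_tools()
    async def list_tools() -> list[types.Tool]:
        # Advertise the aggregate_logs tool with the input schema shown above.
        return [
            types.Tool(
                name="aggregate_logs",
                description="Aggregate log data by specified criteria",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "files": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "Log files to analyze (default: all files)"
                        },
                        "group_by": {
                            "type": "string",
                            "enum": ["level", "module", "function", "hour"],
                            "default": "level",
                            "description": "Field to group by"
                        }
                    }
                }
            )
        ]

    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
        # Dispatch to the analyzer and return the aggregation as JSON text.
        if name == "aggregate_logs":
            results = log_analyzer.aggregate_logs(**(arguments or {}))
            return [
                types.TextContent(
                    type="text",
                    text=json.dumps(results, indent=2, default=str)
                )
            ]
        raise ValueError(f"Unknown tool: {name}")

    async def main() -> None:
        # Serve over stdio, the usual transport for local MCP servers.
        async with stdio_server() as (read_stream, write_stream):
            await server.run(read_stream, write_stream,
                             server.create_initialization_options())

    if __name__ == "__main__":
        asyncio.run(main())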