query_logs
Search and filter JSON log entries by level, module, function, message content, or time range across specified log files to extract precise data efficiently.
Instructions
Search and filter log entries across log files
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| end_time | No | End time filter (ISO format) | |
| files | No | Log files to search (default: all files) | |
| function | No | Filter by function name | |
| level | No | Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) | |
| limit | No | Maximum number of results | 100 |
| message_contains | No | Filter by message content (case-insensitive) | |
| module | No | Filter by module name | |
| start_time | No | Start time filter (ISO format) | |
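
As a rough illustration of how these parameters combine, the sketch below builds an arguments payload for a `query_logs` call. The file name `app.log` and the filter values are invented for illustration; only the parameter names come from the table.

```python
import json
from datetime import datetime

# Hypothetical arguments for a query_logs call; every field is optional.
arguments = {
    "files": ["app.log"],               # assumed file name, for illustration only
    "level": "ERROR",
    "message_contains": "timeout",      # matched case-insensitively
    "start_time": "2024-01-01T00:00:00",
    "end_time": "2024-01-01T23:59:59",
    "limit": 20,
}

# Check up front that both time bounds parse as ISO-format strings.
for key in ("start_time", "end_time"):
    datetime.fromisoformat(arguments[key])

payload = json.dumps(arguments, indent=2)
print(payload)
```

Omitting `files` searches all known log files, and omitting `limit` falls back to the default of 100 results.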
Implementation Reference
- json_logs_mcp_server.py:99-163 (handler) Core handler function in JsonLogAnalyzer that implements the query_logs tool logic: loads log entries from specified files, applies multiple filters (level, module, function, message_contains, time range), sorts by newest timestamp first, limits results, and returns filtered log entries.

      def query_logs(self,
                     files: Optional[List[str]] = None,
                     level: Optional[str] = None,
                     module: Optional[str] = None,
                     function: Optional[str] = None,
                     message_contains: Optional[str] = None,
                     start_time: Optional[str] = None,
                     end_time: Optional[str] = None,
                     limit: int = 100) -> List[Dict[str, Any]]:
          """Query logs with various filters"""
          # Determine which files to search
          if files is None:
              files = list(self.log_files_cache.keys())

          all_entries = []
          for filename in files:
              try:
                  entries = self.read_log_file(filename)
                  all_entries.extend(entries)
              except (FileNotFoundError, RuntimeError):
                  continue

          # Apply filters
          filtered_entries = []
          for entry in all_entries:
              # Level filter
              if level and entry.get("level", "").upper() != level.upper():
                  continue

              # Module filter
              if module and entry.get("module", "") != module:
                  continue

              # Function filter
              if function and entry.get("function", "") != function:
                  continue

              # Message contains filter
              if message_contains and message_contains.lower() not in entry.get("message", "").lower():
                  continue

              # Time range filters
              timestamp = entry.get("parsed_timestamp")
              if start_time:
                  try:
                      start_dt = datetime.fromisoformat(start_time)
                      if timestamp and timestamp < start_dt:
                          continue
                  except ValueError:
                      pass

              if end_time:
                  try:
                      end_dt = datetime.fromisoformat(end_time)
                      if timestamp and timestamp > end_dt:
                          continue
                  except ValueError:
                      pass

              filtered_entries.append(entry)

          # Sort by timestamp (newest first) and limit
          filtered_entries.sort(key=lambda x: x.get("parsed_timestamp", datetime.min), reverse=True)
          return filtered_entries[:limit]
- json_logs_mcp_server.py:349-387 (schema) Input schema for the query_logs tool, defining optional parameters for filtering logs: files list, level, module, function, message_contains, start_time, end_time, and limit.

      inputSchema={
          "type": "object",
          "properties": {
              "files": {
                  "type": "array",
                  "items": {"type": "string"},
                  "description": "Log files to search (default: all files)"
              },
              "level": {
                  "type": "string",
                  "description": "Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)"
              },
              "module": {
                  "type": "string",
                  "description": "Filter by module name"
              },
              "function": {
                  "type": "string",
                  "description": "Filter by function name"
              },
              "message_contains": {
                  "type": "string",
                  "description": "Filter by message content (case-insensitive)"
              },
              "start_time": {
                  "type": "string",
                  "description": "Start time filter (ISO format)"
              },
              "end_time": {
                  "type": "string",
                  "description": "End time filter (ISO format)"
              },
              "limit": {
                  "type": "integer",
                  "default": 100,
                  "description": "Maximum number of results"
              }
          }
      }
- json_logs_mcp_server.py:437-448 (registration) MCP server tool dispatch in call_tool method: handles query_logs calls by invoking log_analyzer.query_logs, cleans parsed_timestamp, serializes to JSON, and returns as TextContent.

      if name == "query_logs":
          results = log_analyzer.query_logs(**arguments)
          # Remove parsed_timestamp for JSON serialization
          for entry in results:
              entry.pop("parsed_timestamp", None)
          return [
              types.TextContent(
                  type="text",
                  text=json.dumps(results, indent=2, default=str)
              )
          ]
- json_logs_mcp_server.py:346-388 (registration) Registration of the query_logs tool in the MCP server's list_tools method, specifying name, description, and input schema.

      types.Tool(
          name="query_logs",
          description="Search and filter log entries across log files",
          inputSchema={
              "type": "object",
              "properties": {
                  "files": {
                      "type": "array",
                      "items": {"type": "string"},
                      "description": "Log files to search (default: all files)"
                  },
                  "level": {
                      "type": "string",
                      "description": "Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)"
                  },
                  "module": {
                      "type": "string",
                      "description": "Filter by module name"
                  },
                  "function": {
                      "type": "string",
                      "description": "Filter by function name"
                  },
                  "message_contains": {
                      "type": "string",
                      "description": "Filter by message content (case-insensitive)"
                  },
                  "start_time": {
                      "type": "string",
                      "description": "Start time filter (ISO format)"
                  },
                  "end_time": {
                      "type": "string",
                      "description": "End time filter (ISO format)"
                  },
                  "limit": {
                      "type": "integer",
                      "default": 100,
                      "description": "Maximum number of results"
                  }
              }
          }
      ),
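
The filter, sort, and limit flow of the handler, followed by the serialization step of the dispatch, can be sketched on in-memory data as follows. This is a simplified stand-in, not the server's code: `filter_entries` and `SAMPLE_ENTRIES` are hypothetical names, the entries are invented, and only three of the filters are reproduced. Unlike the handler, which silently ignores an unparseable time bound, this sketch lets `datetime.fromisoformat` raise.

```python
import json
from datetime import datetime
from typing import Any, Dict, List, Optional

# Invented sample data; real entries would come from the server's log files.
SAMPLE_ENTRIES: List[Dict[str, Any]] = [
    {"level": "INFO", "module": "api", "message": "Request started",
     "parsed_timestamp": datetime(2024, 1, 1, 9, 0, 0)},
    {"level": "ERROR", "module": "db", "message": "Connection timeout",
     "parsed_timestamp": datetime(2024, 1, 1, 10, 0, 0)},
    {"level": "ERROR", "module": "api", "message": "Upstream Timeout",
     "parsed_timestamp": datetime(2024, 1, 1, 11, 0, 0)},
]


def filter_entries(entries: List[Dict[str, Any]],
                   level: Optional[str] = None,
                   message_contains: Optional[str] = None,
                   start_time: Optional[str] = None,
                   limit: int = 100) -> List[Dict[str, Any]]:
    """Hypothetical helper mirroring the filter, sort, and limit steps."""
    start_dt = datetime.fromisoformat(start_time) if start_time else None
    kept = []
    for entry in entries:
        # Level comparison is case-insensitive
        if level and entry.get("level", "").upper() != level.upper():
            continue
        # Substring match on the message, also case-insensitive
        if message_contains and message_contains.lower() not in entry.get("message", "").lower():
            continue
        # Entries without a timestamp pass the time filter
        ts = entry.get("parsed_timestamp")
        if start_dt and ts and ts < start_dt:
            continue
        kept.append(entry)
    # Newest first; entries lacking a timestamp sort last
    kept.sort(key=lambda e: e.get("parsed_timestamp") or datetime.min, reverse=True)
    return kept[:limit]


results = filter_entries(SAMPLE_ENTRIES, level="error",
                         message_contains="timeout",
                         start_time="2024-01-01T09:30:00")

# Drop parsed_timestamp before serializing; copies are used here so the
# sample data stays intact, whereas the dispatch code pops the key in place.
serializable = [{k: v for k, v in e.items() if k != "parsed_timestamp"} for e in results]
output = json.dumps(serializable, indent=2, default=str)
print(output)
```

One design point worth noting: because the limit is applied after sorting, a small `limit` returns the most recent matches rather than the first ones read from disk.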