query_logs
Search and filter JSON log entries by level, module, function, time range, or message content to analyze application logs.
Instructions
Search and filter log entries across log files
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| files | No | Log files to search (default: all files) | |
| level | No | Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) | |
| module | No | Filter by module name | |
| function | No | Filter by function name | |
| message_contains | No | Filter by message content (case-insensitive) | |
| start_time | No | Start time filter (ISO format) | |
| end_time | No | End time filter (ISO format) | |
| limit | No | Maximum number of results |
Implementation Reference
- json_logs_mcp_server.py:99-163 (handler)Core handler function in JsonLogAnalyzer that reads log files, parses entries, applies filters (level, module, function, message, time range), sorts by timestamp, and limits results.def query_logs(self, files: Optional[List[str]] = None, level: Optional[str] = None, module: Optional[str] = None, function: Optional[str] = None, message_contains: Optional[str] = None, start_time: Optional[str] = None, end_time: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]: """Query logs with various filters""" # Determine which files to search if files is None: files = list(self.log_files_cache.keys()) all_entries = [] for filename in files: try: entries = self.read_log_file(filename) all_entries.extend(entries) except (FileNotFoundError, RuntimeError): continue # Apply filters filtered_entries = [] for entry in all_entries: # Level filter if level and entry.get("level", "").upper() != level.upper(): continue # Module filter if module and entry.get("module", "") != module: continue # Function filter if function and entry.get("function", "") != function: continue # Message contains filter if message_contains and message_contains.lower() not in entry.get("message", "").lower(): continue # Time range filters timestamp = entry.get("parsed_timestamp") if start_time: try: start_dt = datetime.fromisoformat(start_time) if timestamp and timestamp < start_dt: continue except ValueError: pass if end_time: try: end_dt = datetime.fromisoformat(end_time) if timestamp and timestamp > end_dt: continue except ValueError: pass filtered_entries.append(entry) # Sort by timestamp (newest first) and limit filtered_entries.sort(key=lambda x: x.get("parsed_timestamp", datetime.min), reverse=True) return filtered_entries[:limit]
- json_logs_mcp_server.py:437-448 (handler)MCP tool dispatch handler in call_tool that invokes the query_logs method and formats results as JSON text content.if name == "query_logs": results = log_analyzer.query_logs(**arguments) # Remove parsed_timestamp for JSON serialization for entry in results: entry.pop("parsed_timestamp", None) return [ types.TextContent( type="text", text=json.dumps(results, indent=2, default=str) ) ]
- json_logs_mcp_server.py:349-387 (schema)Input schema defining parameters for the query_logs tool, including optional filters and limit.inputSchema={ "type": "object", "properties": { "files": { "type": "array", "items": {"type": "string"}, "description": "Log files to search (default: all files)" }, "level": { "type": "string", "description": "Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)" }, "module": { "type": "string", "description": "Filter by module name" }, "function": { "type": "string", "description": "Filter by function name" }, "message_contains": { "type": "string", "description": "Filter by message content (case-insensitive)" }, "start_time": { "type": "string", "description": "Start time filter (ISO format)" }, "end_time": { "type": "string", "description": "End time filter (ISO format)" }, "limit": { "type": "integer", "default": 100, "description": "Maximum number of results" } } }
- json_logs_mcp_server.py:346-388 (registration)Tool registration in list_tools() where query_logs is defined with name, description, and schema.types.Tool( name="query_logs", description="Search and filter log entries across log files", inputSchema={ "type": "object", "properties": { "files": { "type": "array", "items": {"type": "string"}, "description": "Log files to search (default: all files)" }, "level": { "type": "string", "description": "Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)" }, "module": { "type": "string", "description": "Filter by module name" }, "function": { "type": "string", "description": "Filter by function name" }, "message_contains": { "type": "string", "description": "Filter by message content (case-insensitive)" }, "start_time": { "type": "string", "description": "Start time filter (ISO format)" }, "end_time": { "type": "string", "description": "End time filter (ISO format)" }, "limit": { "type": "integer", "default": 100, "description": "Maximum number of results" } } } ),