New Relic MCP Server

by piekstra

generate_log_parsing_rule

Creates log parsing rules from an NRQL query or sample log messages, generating a GROK pattern, an NRQL LIKE pattern, and an analysis for extracting structured data from New Relic logs.

Instructions

Generate a log parsing rule from either a query or provided samples.

Args:
    log_query: Optional NRQL WHERE clause to fetch logs (e.g., "service = 'api'")
    log_samples: Optional list of log message samples
    time_range: Time range for log query (default: "1 hour ago")
    field_hints: Optional hints for field types (e.g., {"user_id": "UUID"})
    account_id: Optional account ID (uses default if not provided)

Returns:
    Generated GROK pattern, NRQL pattern, and analysis

Input Schema

Name         Required  Default         Description
log_query    No        null            NRQL WHERE clause used to fetch logs (e.g. "service = 'api'")
log_samples  No        null            List of log message samples
time_range   No        "1 hour ago"    Time range for the log query
field_hints  No        null            Hints for field types (e.g. {"user_id": "UUID"})
account_id   No        null            Account ID; uses the configured default if not provided

Input Schema (JSON Schema)

{ "properties": { "account_id": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Account Id" }, "field_hints": { "anyOf": [ { "additionalProperties": { "type": "string" }, "type": "object" }, { "type": "null" } ], "default": null, "title": "Field Hints" }, "log_query": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Log Query" }, "log_samples": { "anyOf": [ { "items": { "type": "string" }, "type": "array" }, { "type": "null" } ], "default": null, "title": "Log Samples" }, "time_range": { "default": "1 hour ago", "title": "Time Range", "type": "string" } }, "type": "object" }

Implementation Reference

  • MCP tool handler for 'generate_log_parsing_rule'. Registers the tool via the @mcp.tool() decorator, validates the client and account ID, and delegates the core logic to log_parsing.generate_parsing_rule_from_logs.
    @mcp.tool()
    async def generate_log_parsing_rule(
        log_query: Optional[str] = None,
        log_samples: Optional[List[str]] = None,
        time_range: str = "1 hour ago",
        field_hints: Optional[Dict[str, str]] = None,
        account_id: Optional[str] = None,
    ) -> str:
        """
        Generate a log parsing rule from either a query or provided samples.

        Args:
            log_query: Optional NRQL WHERE clause to fetch logs (e.g., "service = 'api'")
            log_samples: Optional list of log message samples
            time_range: Time range for log query (default: "1 hour ago")
            field_hints: Optional hints for field types (e.g., {"user_id": "UUID"})
            account_id: Optional account ID (uses default if not provided)

        Returns:
            Generated GROK pattern, NRQL pattern, and analysis
        """
        if not client:
            return json.dumps({"error": "New Relic client not initialized"})

        acct_id = account_id or client.account_id
        if not acct_id:
            return json.dumps({"error": "Account ID required but not provided"})

        try:
            result = await log_parsing.generate_parsing_rule_from_logs(
                client, acct_id, log_query, log_samples, time_range, field_hints
            )
            return json.dumps(result, indent=2)
        except Exception as e:
            return json.dumps({"error": str(e)}, indent=2)
  • Core implementation of parsing-rule generation. Fetches log samples via NRQL when none are provided, uses GrokPatternGenerator for multiple samples or the single-sample helper for one, and returns the generated GROK/NRQL patterns and analysis (a usage sketch follows this list).
    async def generate_parsing_rule_from_logs(
        client,
        account_id: str,
        log_query: Optional[str] = None,
        log_samples: Optional[List[str]] = None,
        time_range: str = "1 hour ago",
        field_hints: Optional[Dict[str, str]] = None,
    ) -> Dict[str, Any]:
        """
        Generate a log parsing rule from either a query or provided samples

        Args:
            client: New Relic client
            account_id: Account ID
            log_query: Optional NRQL query to fetch logs
            log_samples: Optional list of log message samples
            time_range: Time range for log query (default: "1 hour ago")
            field_hints: Optional hints for field types

        Returns:
            Dict containing the generated GROK pattern, NRQL pattern, and analysis
        """
        samples = log_samples or []

        # If no samples provided, fetch from New Relic
        if not samples and log_query:
            query = f"""
            SELECT message FROM Log
            WHERE {log_query}
            SINCE {time_range}
            LIMIT 10
            """
            result = await client.query_nrql(account_id, query)
            if result and "results" in result:
                samples = [
                    r.get("message", "")
                    for r in result["results"]
                    if r.get("message")
                ]

        if not samples:
            raise ValueError("No log samples available to generate pattern")

        # Use improved pattern generation for single samples
        if len(samples) == 1:
            grok_pattern, nrql_pattern = generate_grok_pattern_for_log(samples[0])
            # Create a simple analysis for single sample
            analysis = {"patterns_found": [], "samples_analyzed": 1}
            suggested_desc = "Auto-generated parsing rule for single log sample"
        else:
            generator = GrokPatternGenerator()
            analysis = generator.analyze_log_samples(samples)
            grok_pattern, nrql_pattern = generator.generate_grok_pattern(
                samples, field_hints
            )
            suggested_desc = (
                f"Auto-generated parsing rule for {analysis['patterns_found']}"
                if analysis["patterns_found"]
                else "Auto-generated parsing rule"
            )

        return {
            "grok_pattern": grok_pattern,
            "nrql_pattern": f"SELECT * FROM Log WHERE message LIKE '{nrql_pattern}'",
            "analysis": analysis,
            "samples_used": len(samples),
            "suggested_description": suggested_desc,
        }
  • GrokPatternGenerator class: analyzes multiple log samples, detects common patterns (UUIDs, IPs, timestamps, and so on), and generates GROK patterns by identifying the parts that vary between samples and assigning semantic field names and types (see the sketch after this list).
    class GrokPatternGenerator:
        """Generates GROK patterns from log samples"""

        COMMON_PATTERNS = {
            # Basic patterns
            "WORD": r"[A-Za-z0-9_-]+",
            "INT": r"[0-9]+",
            "NUMBER": r"[0-9]+(?:\.[0-9]+)?",
            "GREEDYDATA": r".*",
            "DATA": r".*?",
            "SPACE": r"\s+",
            "NOTSPACE": r"\S+",
            # Common log patterns
            "TIMESTAMP_ISO8601": r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?",
            "LOGLEVEL": r"(?:DEBUG|INFO|WARN(?:ING)?|ERROR|FATAL|TRACE)",
            "UUID": r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}",
            "IPV4": r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}",
            "HOSTNAME": r"[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*",
            "PATH": r"(?:/[^/\s]*)+",
            "EMAIL": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            "URL": r"https?://[^\s]+",
            "JAVACLASS": r"(?:[a-zA-Z_][a-zA-Z0-9_]*\.)*[a-zA-Z_][a-zA-Z0-9_]*",
            "JAVAMETHOD": r"[a-zA-Z_][a-zA-Z0-9_]*",
            "JAVAFILE": r"[a-zA-Z_][a-zA-Z0-9_]*\.java",
            "QUOTEDSTRING": r'"(?:[^"\\]|\\.)*"|\'(?:[^\'\\]|\\.)*\'',
        }

        def analyze_log_samples(self, samples: List[str]) -> Dict[str, Any]:
            """
            Analyze log samples to identify common patterns and extractable fields
            """
            analysis = {
                "common_prefix": self._find_common_prefix(samples),
                "common_suffix": self._find_common_suffix(samples),
                "variable_parts": [],
                "suggested_fields": [],
                "patterns_found": [],
            }

            # Find variable parts between samples
            if len(samples) > 1:
                analysis["variable_parts"] = self._find_variable_parts(samples)

            # Detect common patterns in the logs
            for sample in samples:
                # Check for timestamps
                if re.search(self.COMMON_PATTERNS["TIMESTAMP_ISO8601"], sample):
                    analysis["patterns_found"].append("timestamp")
                # Check for log levels
                if re.search(self.COMMON_PATTERNS["LOGLEVEL"], sample):
                    analysis["patterns_found"].append("loglevel")
                # Check for UUIDs
                if re.search(self.COMMON_PATTERNS["UUID"], sample):
                    analysis["patterns_found"].append("uuid")
                # Check for IPs
                if re.search(self.COMMON_PATTERNS["IPV4"], sample):
                    analysis["patterns_found"].append("ipv4")
                # Check for URLs
                if re.search(self.COMMON_PATTERNS["URL"], sample):
                    analysis["patterns_found"].append("url")
                # Check for Java stack traces
                if (
                    re.search(self.COMMON_PATTERNS["JAVACLASS"], sample)
                    and "Exception" in sample
                ):
                    analysis["patterns_found"].append("java_stacktrace")
                # Check for numeric values (potential metrics)
                numbers = re.findall(r"\b\d+(?:\.\d+)?\b", sample)
                if numbers:
                    analysis["patterns_found"].append("numeric_values")

            return analysis

        def _find_common_prefix(self, samples: List[str]) -> str:
            """Find the longest common prefix among samples"""
            if not samples:
                return ""
            prefix = samples[0]
            for sample in samples[1:]:
                while not sample.startswith(prefix):
                    prefix = prefix[:-1]
                    if not prefix:
                        return ""
            return prefix

        def _find_common_suffix(self, samples: List[str]) -> str:
            """Find the longest common suffix among samples"""
            if not samples:
                return ""
            suffix = samples[0]
            for sample in samples[1:]:
                while not sample.endswith(suffix):
                    suffix = suffix[1:]
                    if not suffix:
                        return ""
            return suffix

        def _find_variable_parts(self, samples: List[str]) -> List[Dict[str, Any]]:
            """Identify parts that vary between samples"""
            if len(samples) < 2:
                return []

            variable_parts = []
            # Simple approach: find differences between first two samples
            s1, s2 = samples[0], samples[1]

            # Find all differences
            import difflib

            matcher = difflib.SequenceMatcher(None, s1, s2)
            for tag, i1, i2, j1, j2 in matcher.get_opcodes():
                if tag == "replace":
                    part1 = s1[i1:i2]
                    part2 = s2[j1:j2]
                    # Try to identify what type of data this is
                    field_type = self._identify_field_type(part1, part2)
                    variable_parts.append(
                        {
                            "position": i1,
                            "sample_values": [part1, part2],
                            "suggested_type": field_type,
                        }
                    )

            return variable_parts

        def _identify_field_type(self, val1: str, val2: str) -> str:
            """Identify the type of field based on sample values"""
            # Check if numeric
            if val1.isdigit() and val2.isdigit():
                return "INT"
            try:
                float(val1)
                float(val2)
                return "NUMBER"
            except ValueError:
                pass

            # Check if UUID
            uuid_pattern = re.compile(
                r"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
            )
            if uuid_pattern.match(val1) and uuid_pattern.match(val2):
                return "UUID"

            # Check if looks like an ID (alphanumeric)
            if re.match(r"^[A-Za-z0-9_-]+$", val1) and re.match(r"^[A-Za-z0-9_-]+$", val2):
                if len(val1) == len(val2):
                    return "ID"
                return "WORD"

            # Default to GREEDYDATA for complex strings
            return "GREEDYDATA"

        def generate_grok_pattern(
            self, samples: List[str], field_hints: Optional[Dict[str, str]] = None
        ) -> Tuple[str, str]:
            """
            Generate a GROK pattern from log samples

            Returns:
                Tuple of (grok_pattern, nrql_like_pattern)
            """
            if not samples:
                return "", ""

            analysis = self.analyze_log_samples(samples)

            # Start with the first sample as a template
            template = samples[0]
            grok_pattern = re.escape(template)
            nrql_pattern = template

            # Replace variable parts with GROK patterns
            for var_part in sorted(
                analysis["variable_parts"], key=lambda x: x["position"], reverse=True
            ):
                pos = var_part["position"]
                sample_val = var_part["sample_values"][0]
                field_type = var_part["suggested_type"]

                # Generate field name from context or use generic name
                field_name = self._generate_field_name(template, pos, field_type)

                # Apply field hints if provided
                if field_hints and field_name in field_hints:
                    field_type = field_hints[field_name]

                # Create the GROK capture group
                if field_type == "INT":
                    grok_replacement = f"%{{INT:{field_name}:long}}"
                elif field_type == "NUMBER":
                    grok_replacement = f"%{{NUMBER:{field_name}:float}}"
                elif field_type == "UUID":
                    grok_replacement = f"%{{UUID:{field_name}:string}}"
                elif field_type == "WORD":
                    grok_replacement = f"%{{WORD:{field_name}:string}}"
                elif field_type == "ID":
                    grok_replacement = f"%{{NOTSPACE:{field_name}:string}}"
                else:
                    grok_replacement = f"%{{GREEDYDATA:{field_name}:string}}"

                # Replace in the pattern
                escaped_val = re.escape(sample_val)
                grok_pattern = (
                    grok_pattern[:pos]
                    + grok_replacement
                    + grok_pattern[pos + len(escaped_val) :]
                )

                # Create NRQL LIKE pattern
                nrql_pattern = (
                    nrql_pattern[:pos] + "%" + nrql_pattern[pos + len(sample_val) :]
                )

            # Unescape the static parts for readability
            # grok_pattern = grok_pattern.replace("\\", "")

            return grok_pattern, nrql_pattern

        def _generate_field_name(
            self, template: str, position: int, field_type: str
        ) -> str:
            """Generate a meaningful field name based on context"""
            # Look at surrounding words for context
            before = template[:position].split()[-1] if position > 0 else ""
            after = template[position:].split()[0] if position < len(template) else ""

            # Common patterns
            if (
                "time" in before.lower()
                or "duration" in before.lower()
                or "ms" in after.lower()
            ):
                return "duration_ms" if "ms" in after else "timestamp"
            elif "id" in before.lower() or "id" in after.lower():
                return (
                    before.lower().replace(":", "").replace("-", "_") + "_id"
                    if before
                    else "id"
                )
            elif "user" in before.lower():
                return "user_id"
            elif "account" in before.lower():
                return "account_id"
            elif "company" in before.lower():
                return "company_id"
            elif "bytes" in after.lower():
                return "bytes"
            elif field_type == "INT" or field_type == "NUMBER":
                return "value"
            elif field_type == "UUID":
                return "uuid"
            else:
                return "field"
  • Helper for generating a GROK pattern from a single log sample, detecting and replacing UUIDs, email addresses, row counts, booleans, and name fields with appropriate GROK captures (see the sketch after this list).
    def generate_grok_pattern_for_log(log_sample: str) -> Tuple[str, str]:
        """
        Generate a GROK pattern from a single log sample by identifying known patterns

        This is an improved version that handles your log format better
        """
        grok_pattern = log_sample
        nrql_pattern = log_sample

        # Replace UUIDs
        uuid_pattern = re.compile(
            r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
        )
        for match in uuid_pattern.finditer(log_sample):
            uuid_str = match.group()
            # Determine context for field name
            before_text = log_sample[: match.start()].rstrip()
            if "user" in before_text.lower()[-20:]:
                field_name = "user_id"
            else:
                field_name = "uuid"
            grok_pattern = grok_pattern.replace(
                uuid_str, f"%{{UUID:{field_name}:string}}", 1
            )
            nrql_pattern = nrql_pattern.replace(uuid_str, "%", 1)

        # Replace email addresses
        email_pattern = re.compile(r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b")
        for match in email_pattern.finditer(log_sample):
            email_str = match.group()
            grok_pattern = grok_pattern.replace(email_str, "%{DATA:email:string}", 1)
            nrql_pattern = nrql_pattern.replace(email_str, "%", 1)

        # Replace integers in specific contexts (e.g., "updated X rows")
        # Look for patterns like "updated N rows, expected M"
        rows_pattern = re.compile(
            r"(updated|deleted|inserted|affected)\s+(\d+)\s+rows?,\s+expected\s+(\d+)"
        )
        for match in rows_pattern.finditer(log_sample):
            full_match = match.group()
            action = match.group(1)
            replacement = f"{action} %{{INT:rows_affected:long}} rows, expected %{{INT:rows_expected:long}}"
            grok_pattern = grok_pattern.replace(full_match, replacement, 1)
            nrql_pattern = nrql_pattern.replace(
                full_match, f"{action} % rows, expected %", 1
            )

        # Replace boolean values in key-value pairs
        bool_pattern = re.compile(r"(\w+)\s+(True|False|true|false)")
        for match in bool_pattern.finditer(log_sample):
            full_match = match.group()
            key = match.group(1)
            # Convert key to snake_case field name
            field_name = key.lower()
            replacement = f"{key} %{{WORD:{field_name}:string}}"
            grok_pattern = grok_pattern.replace(full_match, replacement, 1)
            nrql_pattern = nrql_pattern.replace(full_match, f"{key} %", 1)

        # Replace name patterns (FirstName X, LastName Y)
        name_pattern = re.compile(r"(FirstName|LastName)\s+([A-Za-z]+)")
        for match in name_pattern.finditer(log_sample):
            full_match = match.group()
            label = match.group(1)
            field_name = "first_name" if label == "FirstName" else "last_name"
            replacement = f"{label} %{{WORD:{field_name}:string}}"
            grok_pattern = grok_pattern.replace(full_match, replacement, 1)
            nrql_pattern = nrql_pattern.replace(full_match, f"{label} %", 1)

        # Escape special regex characters for GROK
        # Parentheses need to be escaped
        grok_pattern = grok_pattern.replace("(", "\\(").replace(")", "\\)")

        return grok_pattern, nrql_pattern
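The sketches below are illustrative only; they assume the functions above are importable from the server's log_parsing module, and every log line, account ID, and expected output shown is invented. First, the core function: when log_samples are supplied directly, the NRQL-fetch branch is never taken, so no client is needed.

    import asyncio
    import json

    async def demo_parsing_rule():
        # client=None is safe in this sketch because samples are provided,
        # so generate_parsing_rule_from_logs never queries New Relic.
        result = await generate_parsing_rule_from_logs(
            client=None,
            account_id="1234567",  # invented account ID
            log_samples=[
                "user login took 120 ms",
                "user login took 87 ms",
            ],
        )
        # The MCP handler would return json.dumps(result, indent=2); the dict
        # carries grok_pattern, nrql_pattern, analysis, samples_used, and
        # suggested_description.
        print(json.dumps(result, indent=2))

    asyncio.run(demo_parsing_rule())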
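Calling GrokPatternGenerator directly works the same way. With the two invented samples above, which differ only in the numeric value, the multi-sample path should produce roughly the pattern shown in the comments; the exact result depends on how difflib aligns the samples.

    # Hypothetical multi-sample sketch; assumes GrokPatternGenerator is in scope.
    generator = GrokPatternGenerator()
    samples = [
        "user login took 120 ms",
        "user login took 87 ms",
    ]
    analysis = generator.analyze_log_samples(samples)  # flags numeric_values and one variable part
    grok, nrql_like = generator.generate_grok_pattern(samples)
    # Roughly expected (not guaranteed):
    #   grok      == "user login took %{INT:value:long} ms"
    #   nrql_like == "user login took % ms"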
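For a single sample, the helper substitutes known token types in place. The log line below is likewise invented, and the expected output is an estimate based on the code above.

    # Hypothetical single-sample sketch; assumes generate_grok_pattern_for_log is in scope.
    grok, nrql_like = generate_grok_pattern_for_log(
        "Updated user 123e4567-e89b-12d3-a456-426614174000, updated 1 rows, expected 1"
    )
    # Roughly expected (not guaranteed):
    #   grok      == ("Updated user %{UUID:user_id:string}, "
    #                 "updated %{INT:rows_affected:long} rows, "
    #                 "expected %{INT:rows_expected:long}")
    #   nrql_like == "Updated user %, updated % rows, expected %"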

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/piekstra/newrelic-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.