opal_validation.py
""" OPAL Query Validation and Auto-Fixing Provides comprehensive validation for OPAL queries to catch errors early and prevent common mistakes. Automatically fixes common patterns and provides educational feedback to help LLMs learn correct syntax over time. """ import re from typing import Tuple, Optional, List from dataclasses import dataclass @dataclass class ValidationResult: """Result of OPAL query validation and transformation.""" is_valid: bool transformed_query: Optional[str] = None # None if no transformations applied transformations: List[str] = None # Descriptions of transformations error_message: Optional[str] = None # Error if validation failed def __post_init__(self): if self.transformations is None: self.transformations = [] def transform_multi_term_angle_brackets(query: str) -> Tuple[str, List[str]]: """ Auto-fix multi-term angle bracket syntax by converting to explicit OR logic. LLMs often write: filter body ~ <error exception fail> Assuming OR semantics (any term matches). But OPAL interprets this as AND (all terms must match). This transformation converts to the assumed OR intent. Examples: Input: filter body ~ <error exception> Output: filter contains(body, "error") or contains(body, "exception") Input: filter message ~ <fail fatal panic> Output: filter contains(message, "fail") or contains(message, "fatal") or contains(message, "panic") Returns: Tuple of (transformed_query, list_of_transformation_descriptions) """ transformations = [] # Pattern to match: fieldname ~ <term1 term2 term3 ...> # We need to capture: # - The field (which could be complex like string(field) or resource_attributes.name) # - The operator (~) # - The multi-term angle bracket content # Match pattern: field name followed by ~ followed by <multiple terms> # Field name can be: word, dotted path, or function call like string(field) # Terms inside <> cannot contain angle brackets or pipes (to avoid matching across multiple patterns) # Use [^<>|]+ to match any chars except < > | , and make it non-greedy with +? # Then require at least one space and more content to ensure multi-term pattern = r'([\w.()\"]+)\s+~\s+<([^<>|]+)>' def replace_func(match): field = match.group(1) terms_str = match.group(2).strip() terms = terms_str.split() # Only transform if there are multiple terms (single-term angle brackets are fine in OPAL) if len(terms) <= 1: return match.group(0) # Return original, no transformation # Build the contains() or chain contains_exprs = [f'contains({field}, "{term}")' for term in terms] or_chain = ' or '.join(contains_exprs) # Wrap in parentheses to ensure correct precedence with pipeline operator replacement = f'({or_chain})' # Create educational feedback terms_preview = ' '.join(terms[:3]) if len(terms) > 3: terms_preview += f" ... 


def transform_redundant_time_filters(query: str, time_range: Optional[str] = None) -> Tuple[str, List[str]]:
    """
    Auto-remove redundant timestamp filters when the time_range parameter is set.

    LLMs often add explicit timestamp filters even when the time_range parameter
    already constrains the query time window. This creates redundancy and confusion.

    Examples:
        Input:   filter timestamp > @"1 hour ago" | filter body ~ error
        Context: time_range="1h" is set
        Output:  filter body ~ error

        Input:   filter body ~ error | filter BUNDLE_TIMESTAMP >= @"2024-01-01" | limit 10
        Context: time_range="7d" is set
        Output:  filter body ~ error | limit 10

    Only removes filters when:
        1. time_range parameter is explicitly set
        2. Filter uses common timestamp field names
        3. Filter uses @"..." time expression syntax

    Returns:
        Tuple of (transformed_query, list_of_transformation_descriptions)
    """
    transformations = []

    # Only transform if time_range is actually set
    if not time_range:
        return query, []

    # Common timestamp field names in OPAL/OpenTelemetry
    TIME_FIELDS = [
        'timestamp', 'BUNDLE_TIMESTAMP', 'time', '@timestamp',
        'event_time', 'eventTime', 'observedTimestamp', 'OBSERVATION_TIME',
        'start_time',  # OpenTelemetry span start time
        'end_time'     # OpenTelemetry span end time
    ]

    # Pattern to match timestamp filters with @"..." syntax
    # Matches: filter <time_field> <operator> @"..."
    # Captures the entire filter operation including surrounding whitespace/pipes
    time_field_pattern = '|'.join(TIME_FIELDS)

    # Match patterns:
    #   1. 'filter FIELD OPERATOR @"..." |'   (filter at start/middle with following pipe)
    #   2. '| filter FIELD OPERATOR @"..."'   (filter at end or middle with preceding pipe)
    # Use alternation to handle both cases
    pattern = rf'(?:^\s*|\|\s*)filter\s+({time_field_pattern})\s*([><=!]+)\s*@"[^"]+"\s*(?:\||$)'

    matches = list(re.finditer(pattern, query))
    if not matches:
        return query, []

    # Process matches in reverse order to preserve positions during removal
    transformed_query = query
    for match in reversed(matches):
        field_name = match.group(1)
        operator = match.group(2)
        original_filter = match.group(0)

        # Determine how to remove the filter cleanly.
        # Need to handle pipes properly to avoid leaving "| |" or starting with "|"
        start_pos = match.start()
        end_pos = match.end()

        # Check what comes before and after
        before = transformed_query[:start_pos]
        after = transformed_query[end_pos:]

        # Clean up pipes:
        #   - If filter starts with "|" and ends with "|", keep one "|" between before/after
        #   - If filter is at start (no "|" before), remove trailing "|" if present
        #   - If filter is at end (no "|" after), remove preceding "|" if present
        if before.rstrip().endswith('|') or original_filter.lstrip().startswith('|'):
            # Has preceding pipe
            if after.lstrip().startswith('|') or original_filter.rstrip().endswith('|'):
                # Has following pipe - remove filter and keep one pipe
                before_trimmed = before.rstrip()
                if before_trimmed.endswith('|'):
                    before_trimmed = before_trimmed[:-1].rstrip()
                after_trimmed = after.lstrip()
                if after_trimmed.startswith('|'):
                    after_trimmed = after_trimmed[1:].lstrip()
                if before_trimmed and after_trimmed:
                    # Both sides have content, need a pipe between them
                    transformed_query = before_trimmed + ' | ' + after_trimmed
                else:
                    # One side is empty, just concatenate
                    transformed_query = before_trimmed + after_trimmed
            else:
                # No following pipe - just remove filter and preceding pipe
                before_trimmed = before.rstrip()
                if before_trimmed.endswith('|'):
                    before_trimmed = before_trimmed[:-1].rstrip()
                transformed_query = before_trimmed + ' ' + after.lstrip()
        else:
            # No preceding pipe (filter at start)
            after_trimmed = after.lstrip()
            if after_trimmed.startswith('|'):
                after_trimmed = after_trimmed[1:].lstrip()
            transformed_query = before + after_trimmed

        # Create feedback for this removal
        transformations.append(
            f"✓ Auto-fix applied: Redundant timestamp filter removed\n"
            f"   Removed: filter {field_name} {operator} @\"...\"\n"
            f"   Reason: The time_range=\"{time_range}\" parameter already constrains the query time window.\n"
            f"           Explicit timestamp filters are redundant and can cause confusion.\n"
            f"   Note: To narrow the time window beyond time_range, you can still add timestamp filters,\n"
            f"         but in most cases the time_range parameter is sufficient."
        )

    return transformed_query, transformations


def transform_nested_field_quoting(query: str) -> Tuple[str, List[str]]:
    """
    Auto-quote nested field names that contain dots.

    LLMs often write:          resource_attributes.k8s.namespace.name
    Which OPAL interprets as:  resource_attributes → k8s → namespace → name (4 levels)
    But actually means:        resource_attributes → "k8s.namespace.name" (2 levels)

    This is the #1 cause of "field not found" errors in OpenTelemetry data.

    Examples:
        Input:  resource_attributes.k8s.namespace.name
        Output: resource_attributes."k8s.namespace.name"

        Input:  attributes.http.status_code
        Output: attributes."http.status_code"

        Input:  attributes.custom_field
        Output: attributes.custom_field (no change - no known dotted prefix)

    Detection strategy:
        - Look for common parent fields (resource_attributes, attributes, fields)
        - Followed by known dotted prefixes (k8s., http., service., net., etc.)
        - Group and quote the dotted portion

    Returns:
        Tuple of (transformed_query, list_of_transformation_descriptions)
    """
    transformations = []

    # Common parent field names in OpenTelemetry
    PARENT_FIELDS = [
        'resource_attributes',
        'attributes',
        'fields',
        'span_attributes',
        'resource',
    ]

    # Known OpenTelemetry attribute prefixes that use dots
    # See: https://opentelemetry.io/docs/specs/semconv/
    DOTTED_PREFIXES = [
        'k8s',         # k8s.namespace.name, k8s.pod.name, etc.
        'http',        # http.status_code, http.method, etc.
        'service',     # service.instance.id, service.namespace, etc.
        'net',         # net.host.name, net.peer.name, etc.
        'db',          # db.system, db.connection_string, etc.
        'messaging',   # messaging.system, messaging.destination, etc.
        'rpc',         # rpc.system, rpc.service, etc.
        'code',        # code.function, code.namespace, etc.
        'enduser',     # enduser.id, enduser.role, etc.
        'thread',      # thread.id, thread.name, etc.
        'faas',        # faas.execution, faas.document, etc.
        'peer',        # peer.service, etc.
        'host',        # host.name, host.type, etc.
        'container',   # container.id, container.name, etc.
        'deployment',  # deployment.environment, etc.
        'telemetry',   # telemetry.sdk.name, etc.
        'cloud',       # cloud.provider, cloud.region, etc.
        'aws',         # aws.ecs.task.arn, etc.
        'gcp',         # gcp.gce.instance.name, etc.
        'azure',       # azure.vm.scaleset.name, etc.
    ]

    # Pattern to match field access that might need quoting
    # Matches: parent_field.prefix.more.stuff
    # We'll use a callback to inspect and transform each match
    parent_pattern = '|'.join(PARENT_FIELDS)
    prefix_pattern = '|'.join(DOTTED_PREFIXES)

    # Match: (parent_field).(dotted_prefix).rest.of.path
    # Capture groups: (1) parent field, (2) the FULL dotted path from prefix onward
    # Use negative lookahead to avoid already-quoted fields: (?!")
    # IMPORTANT: Wrap prefix_pattern in (?:...) so the dot applies to all alternatives
    pattern = rf'\b({parent_pattern})\.(?!")((?:{prefix_pattern})\.[a-zA-Z0-9_.]+)'

    def replace_func(match):
        parent = match.group(1)
        dotted_path = match.group(2)

        # Check if this is already quoted (should be caught by negative lookahead, but extra safety)
        full_match = match.group(0)
        match_start = match.start()
        if match_start > 0 and query[match_start - 1] == '"':
            return full_match  # Already quoted

        # Build the quoted version
        replacement = f'{parent}."{dotted_path}"'

        # Create educational feedback
        transformations.append(
            f"✓ Auto-fix applied: Nested field name auto-quoted\n"
            f"   Original: {full_match}\n"
            f"   Fixed: {replacement}\n"
            f"   Reason: Field names containing dots must be quoted in OPAL.\n"
            f"           Without quotes, '{dotted_path}' is interpreted as nested object access.\n"
            f"   Note: OpenTelemetry attributes like 'k8s.namespace.name' are single field names,\n"
            f"         not nested paths. Always quote them: \"{dotted_path}\""
        )
        return replacement

    transformed_query = re.sub(pattern, replace_func, query)

    # Check if any transformations were made
    if transformed_query != query:
        return transformed_query, transformations
    else:
        return query, []


def transform_sort_syntax(query: str) -> Tuple[str, List[str]]:
    """
    Auto-fix SQL-style sort syntax: sort -field → sort desc(field)

    LLMs often write:    sort -field_name
    Which is SQL/shell syntax, not OPAL. OPAL requires:
        sort desc(field_name)  or  sort asc(field_name)

    Examples:
        Input:  sort -count
        Output: sort desc(count)

        Input:  filter error ~ true | sort -timestamp | limit 10
        Output: filter error ~ true | sort desc(timestamp) | limit 10

    Returns:
        Tuple of (transformed_query, list_of_transformation_descriptions)
    """
    transformations = []

    # Pattern to match: sort -field_name
    # Captures the field name after the minus sign.
    # Field name can be simple (a word) or a dotted path.
    pattern = r'\bsort\s+-(\w+(?:\.\w+)*)'

    def replace_func(match):
        field_name = match.group(1)
        original = match.group(0)
        replacement = f'sort desc({field_name})'

        transformations.append(
            f"✓ Auto-fix applied: Sort syntax corrected\n"
            f"   Original: {original}\n"
            f"   Fixed: {replacement}\n"
            f"   Reason: OPAL doesn't support SQL/shell-style 'sort -field' syntax.\n"
            f"           Use 'sort desc(field)' for descending or 'sort asc(field)' for ascending.\n"
            f"   Note: The minus prefix (-) has no meaning in OPAL sort operations."
        )
        return replacement

    transformed_query = re.sub(pattern, replace_func, query)

    if transformed_query != query:
        return transformed_query, transformations
    else:
        return query, []
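

# --- Minimal self-check sketch (illustrative addition) for the two rewrites
# above: a known OpenTelemetry dotted attribute gets quoted, and SQL-style
# 'sort -field' becomes 'sort desc(field)'.
def _demo_field_quoting_and_sort() -> None:
    fixed, notes = transform_nested_field_quoting(
        'filter resource_attributes.k8s.namespace.name = "prod"')
    assert fixed == 'filter resource_attributes."k8s.namespace.name" = "prod"'
    assert len(notes) == 1

    fixed, notes = transform_sort_syntax('filter error ~ true | sort -timestamp | limit 10')
    assert fixed == 'filter error ~ true | sort desc(timestamp) | limit 10'
    assert len(notes) == 1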


def transform_count_if(query: str) -> Tuple[str, List[str]]:
    """
    Auto-fix count_if() function calls with the proper OPAL pattern.

    LLMs often write:    statsby count_if(condition)
    Which doesn't exist in OPAL. Proper OPAL pattern:
        make_col flag:if(condition,1,0) | statsby sum(flag)

    Examples:
        Input:  statsby error_count:count_if(severity="error")
        Output: make_col __count_if_error_count:if(severity="error",1,0) | statsby error_count:sum(__count_if_error_count)

        Input:  statsby errors:count_if(status_code >= 500), total:count()
        Output: make_col __count_if_errors:if(status_code >= 500,1,0) | statsby errors:sum(__count_if_errors), total:count()

    Returns:
        Tuple of (transformed_query, list_of_transformation_descriptions)
    """
    transformations = []

    # Pattern to match: label:count_if(condition)
    # Captures: (1) the label before the colon, (2) the condition inside count_if()
    # We need to handle this inside statsby or aggregate contexts
    pattern = r'\b(\w+):count_if\(([^)]+)\)'

    matches = list(re.finditer(pattern, query))
    if not matches:
        return query, []

    # We're in a statsby or aggregate context:
    # inject a make_col before the statsby/aggregate
    transformed_query = query

    # Process each count_if occurrence
    for match in reversed(matches):  # Reverse to preserve positions
        label = match.group(1)
        condition = match.group(2)
        original_expr = match.group(0)

        # Generate a unique temp field name
        temp_field = f'__count_if_{label}'

        # Replace count_if(condition) with sum(temp_field)
        replacement_agg = f'{label}:sum({temp_field})'
        transformed_query = transformed_query.replace(original_expr, replacement_agg, 1)

        # Now we need to inject make_col BEFORE the statsby/aggregate.
        # Find the statsby or aggregate stage that contains this expression.

        # Split query into pipeline stages
        stages = [s.strip() for s in transformed_query.split('|')]

        # Find which stage contains our aggregation
        agg_stage_idx = None
        for idx, stage in enumerate(stages):
            if replacement_agg in stage and (stage.startswith('statsby') or stage.startswith('aggregate')):
                agg_stage_idx = idx
                break

        if agg_stage_idx is not None:
            # Insert make_col stage before the aggregation
            make_col_stage = f'make_col {temp_field}:if({condition},1,0)'

            # Check if there's already a make_col in this position
            if agg_stage_idx > 0 and stages[agg_stage_idx - 1].startswith('make_col'):
                # Append to existing make_col
                stages[agg_stage_idx - 1] += f', {temp_field}:if({condition},1,0)'
            else:
                # Insert new make_col stage
                stages.insert(agg_stage_idx, make_col_stage)

            transformed_query = ' | '.join(stages)

            transformations.append(
                f"✓ Auto-fix applied: count_if() converted to OPAL pattern\n"
                f"   Original: {original_expr}\n"
                f"   Fixed: Added 'make_col {temp_field}:if({condition},1,0)' + '{replacement_agg}'\n"
                f"   Reason: OPAL doesn't have count_if() function.\n"
                f"           Use make_col with if() to create a flag, then sum() in aggregation.\n"
                f"   Note: Pattern is: make_col flag:if(condition,1,0) | statsby sum(flag)"
            )

    if transformed_query != query:
        return transformed_query, transformations
    else:
        return query, []
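

# --- Minimal self-check sketch (illustrative addition): a single count_if()
# aggregation should be rewritten into the make_col + sum() pipeline shown in
# the docstring above.
def _demo_count_if() -> None:
    fixed, notes = transform_count_if('statsby error_count:count_if(severity="error")')
    assert fixed == ('make_col __count_if_error_count:if(severity="error",1,0)'
                     ' | statsby error_count:sum(__count_if_error_count)')
    assert len(notes) == 1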


def transform_metric_pipeline(query: str) -> Tuple[str, List[str]]:
    """
    Auto-fix metric queries missing the required align verb.

    LLMs often write:    filter m("metric_name") > 0
    Or:                  statsby sum(m("metric_name"))
    But metrics REQUIRE the align + m() + aggregate pattern.

    Examples:
        Input:  filter m("k8s_pod_cpu_usage") > 1000000
        Output: align 5m, metric_value:max(m("k8s_pod_cpu_usage")) | filter metric_value > 1000000

        Input:  statsby errors:sum(m("error_count")), group_by(service_name)
        Output: align 5m, errors:sum(m("error_count")) | statsby errors:sum(errors), group_by(service_name)

    Returns:
        Tuple of (transformed_query, list_of_transformation_descriptions)
    """
    transformations = []

    # First, check if query contains m() or m_tdigest() calls
    has_metric_function = bool(re.search(r'\bm(?:_tdigest)?\s*\(', query))
    if not has_metric_function:
        return query, []

    # Check if query already has an align verb
    has_align = bool(re.search(r'\balign\s+', query))
    if has_align:
        # Already has align, no transformation needed
        return query, []

    # Query has m() but no align - need to inject align

    # Pattern 1: filter m("metric") OPERATOR value
    # Example: filter m("metric_name") > 0
    filter_pattern = r'\bfilter\s+m(?:_tdigest)?\s*\([^)]+\)\s*([><=!]+)\s*([^\s|]+)'
    filter_match = re.search(filter_pattern, query)

    if filter_match:
        # Extract the full m() call
        m_call_pattern = r'm(?:_tdigest)?\s*\([^)]+\)'
        m_call = re.search(m_call_pattern, query).group(0)
        operator = filter_match.group(1)
        threshold = filter_match.group(2)

        # Determine aggregation function - default to avg for filter context
        agg_func = 'avg'
        if operator in ['>', '>=']:
            agg_func = 'max'  # If filtering for high values, use max
        elif operator in ['<', '<=']:
            agg_func = 'min'  # If filtering for low values, use min

        # Build the align stage
        temp_field = 'metric_value'
        align_stage = f'align 5m, {temp_field}:{agg_func}({m_call})'

        # Replace the filter with a field reference
        new_filter = f'filter {temp_field} {operator} {threshold}'
        original_filter = filter_match.group(0)

        # Build transformed query
        rest_of_query = query.replace(original_filter, new_filter, 1)
        transformed_query = f'{align_stage} | {rest_of_query}'

        transformations.append(
            f"✓ Auto-fix applied: Metric query missing align verb\n"
            f"   Original: {original_filter}\n"
            f"   Fixed: {align_stage} | {new_filter}\n"
            f"   Reason: Metrics require the align+m()+aggregate pattern.\n"
            f"           The m() function only works inside the align verb.\n"
            f"   Note: align [interval], field:aggregation(m(\"metric_name\"))\n"
            f"         Common intervals: 1m, 5m, 15m, 1h"
        )
        return transformed_query, transformations

    # Pattern 2: statsby/aggregate with m() calls
    # Example: statsby errors:sum(m("error_count"))
    agg_pattern = r'\b(statsby|aggregate)\s+.*?m(?:_tdigest)?\s*\([^)]+\)'
    agg_match = re.search(agg_pattern, query)

    if agg_match:
        # Find all metric aggregations like label:agg_func(m("metric"))
        metric_agg_pattern = r'(\w+):(sum|avg|min|max|count|tdigest_combine)\s*\(\s*m(?:_tdigest)?\s*\(([^)]+)\)\s*\)'
        metric_aggs = list(re.finditer(metric_agg_pattern, query))

        if not metric_aggs:
            return query, []

        # Build align stage with all metric aggregations
        align_parts = []
        replacements = []

        for match in metric_aggs:
            label = match.group(1)
            agg_func = match.group(2)
            metric_name = match.group(3)

            # Create align aggregation
            if agg_func == 'tdigest_combine':
                # Special handling for tdigest
                align_parts.append(f'{label}:tdigest_combine(m_tdigest({metric_name}))')
            else:
                align_parts.append(f'{label}:{agg_func}(m({metric_name}))')

            # Track replacement: m("metric") -> field_name
            original = match.group(0)
            # In statsby, keep the same label but reference the aligned field
            replacement = f'{label}:{agg_func}({label})'
            replacements.append((original, replacement))

        # Build align stage
        align_stage = 'align 5m, ' + ', '.join(align_parts)

        # Replace m() calls in the rest of the query
        transformed_query = query
        for original, replacement in replacements:
            transformed_query = transformed_query.replace(original, replacement, 1)

        # Prepend align stage
        transformed_query = f'{align_stage} | {transformed_query}'

        transformations.append(
            f"✓ Auto-fix applied: Metric aggregation missing align verb\n"
            f"   Original: {agg_match.group(0)[:80]}...\n"
            f"   Fixed: {align_stage} | ...\n"
            f"   Reason: Metric queries require align before aggregation.\n"
            f"           Pattern: align [interval], field:agg(m(\"metric\")) | aggregate/statsby\n"
            f"   Note: The align stage time-buckets metrics, then you aggregate across dimensions."
        )
        return transformed_query, transformations

    # If we have m() but didn't match known patterns, return unchanged.
    # The query will likely fail validation, but we don't want to make incorrect assumptions.
    return query, []
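

# --- Minimal self-check sketch (illustrative addition): filtering on a raw
# m() call should gain an align stage; note the code uses max() for '>'
# comparisons and the generic field name 'metric_value'.
def _demo_metric_pipeline() -> None:
    fixed, notes = transform_metric_pipeline('filter m("k8s_pod_cpu_usage") > 1000000')
    assert fixed == ('align 5m, metric_value:max(m("k8s_pod_cpu_usage"))'
                     ' | filter metric_value > 1000000')
    assert len(notes) == 1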


def _split_pipeline_safely(query: str) -> List[str]:
    """
    Split an OPAL query into pipeline operations, respecting regex delimiters.

    The pipe character '|' is used both as:
        1. Pipeline operator between OPAL verbs
        2. Regex OR operator inside regex patterns (e.g., /error|exception|fail/)

    This function correctly handles the latter case by tracking regex context.

    Args:
        query: The OPAL query string to split

    Returns:
        List of pipeline operations (strings)

    Example:
        >>> _split_pipeline_safely('filter body ~ /error|exception/i | make_col x:1')
        ['filter body ~ /error|exception/i', 'make_col x:1']
    """
    operations = []
    current_op = []
    in_regex = False
    in_double_quote = False
    in_single_quote = False
    escape_next = False

    i = 0
    while i < len(query):
        char = query[i]

        # Handle escape sequences
        if escape_next:
            current_op.append(char)
            escape_next = False
            i += 1
            continue

        if char == '\\':
            current_op.append(char)
            escape_next = True
            i += 1
            continue

        # Track string contexts (strings can contain / that aren't regex delimiters)
        if char == '"' and not in_single_quote:
            in_double_quote = not in_double_quote
            current_op.append(char)
            i += 1
            continue

        if char == "'" and not in_double_quote:
            in_single_quote = not in_single_quote
            current_op.append(char)
            i += 1
            continue

        # Don't process special characters inside strings
        if in_double_quote or in_single_quote:
            current_op.append(char)
            i += 1
            continue

        # Track regex context.
        # Regex patterns in OPAL are delimited by / ... / with optional flags like /i
        if char == '/':
            # Check if this is a regex delimiter (not a division operator).
            # Heuristic: preceded by operators like ~, =, != (or at start of query)
            if i > 0:
                prev_chars = query[max(0, i - 3):i].strip()
                # Common patterns before a regex: "~ /", "= /", "!= /"
                if prev_chars.endswith('~') or prev_chars.endswith('=') or prev_chars.endswith('!='):
                    in_regex = not in_regex
                elif in_regex:
                    # Already in a regex: this closes it (with possible flags after)
                    in_regex = False
                    current_op.append(char)
                    # Consume any regex flags (i, g, m, etc.)
                    i += 1
                    while i < len(query) and query[i] in 'igmsuy':
                        current_op.append(query[i])
                        i += 1
                    continue
            else:
                # At start of query, assume it's a regex delimiter
                in_regex = not in_regex
            current_op.append(char)
            i += 1
            continue

        # Handle pipe character
        if char == '|':
            if in_regex:
                # Inside regex, | is the OR operator, not a pipeline separator
                current_op.append(char)
            else:
                # Outside regex, | separates pipeline operations
                op_str = ''.join(current_op).strip()
                if op_str:
                    operations.append(op_str)
                current_op = []
            i += 1
            continue

        # Regular character
        current_op.append(char)
        i += 1

    # Add the final operation
    op_str = ''.join(current_op).strip()
    if op_str:
        operations.append(op_str)

    return operations
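

# --- Minimal self-check sketch (illustrative addition): the pipe inside the
# regex literal must not split the pipeline, and the '/i' flag must stay
# attached to the regex.
def _demo_split_pipeline_safely() -> None:
    ops = _split_pipeline_safely('filter body ~ /error|exception/i | make_col x:1')
    assert ops == ['filter body ~ /error|exception/i', 'make_col x:1']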


def validate_opal_query_structure(query: str, time_range: Optional[str] = None) -> ValidationResult:
    """
    Validate OPAL query structure and apply auto-fix transformations.

    This performs:
        1. Auto-fix transformations for common LLM mistakes
        2. Structural validation without full semantic parsing
        3. Educational feedback on what was changed and why

    Transformations applied:
        - Multi-term angle brackets: <error exception> → contains() OR logic
        - Redundant time filters: removes timestamp filters when time_range is set
        - Nested field quoting: resource_attributes.k8s.namespace.name →
          resource_attributes."k8s.namespace.name"
        - Sort syntax: sort -field → sort desc(field)
        - count_if() function: label:count_if(cond) → make_col flag:if(cond,1,0) | statsby label:sum(flag)
        - Metric pipeline: m() outside align → align + m() + aggregate pattern

    Validation checks:
        - Validates all verbs in piped sequences against a whitelist
        - Checks balanced delimiters (prevents malformed queries)
        - Enforces complexity limits (prevents DoS)

    The Observe API remains the authoritative validator for full semantics.

    Args:
        query: OPAL query string to validate and transform
        time_range: Optional time range parameter (e.g., "1h", "24h") - if set,
            redundant time filters will be removed

    Returns:
        ValidationResult with:
            - is_valid: Whether query passed validation
            - transformed_query: Auto-fixed query (None if no changes)
            - transformations: List of transformation descriptions
            - error_message: Error details if validation failed
    """
    all_transformations = []

    # Apply transformations before validation

    # Transform 1: Metric pipeline detection (structural - do this first, before field quoting)
    # This must come first because it restructures the entire query
    query, metric_pipeline_transforms = transform_metric_pipeline(query)
    all_transformations.extend(metric_pipeline_transforms)

    # Transform 2: Nested field quoting (structural fix)
    query, field_quoting_transforms = transform_nested_field_quoting(query)
    all_transformations.extend(field_quoting_transforms)

    # Transform 3: Multi-term angle brackets
    query, angle_bracket_transforms = transform_multi_term_angle_brackets(query)
    all_transformations.extend(angle_bracket_transforms)

    # Transform 4: Redundant time filters (when time_range is set)
    query, time_filter_transforms = transform_redundant_time_filters(query, time_range)
    all_transformations.extend(time_filter_transforms)

    # Transform 5: Sort syntax (SQL-style to OPAL)
    query, sort_transforms = transform_sort_syntax(query)
    all_transformations.extend(sort_transforms)

    # Transform 6: count_if() function (doesn't exist in OPAL)
    query, count_if_transforms = transform_count_if(query)
    all_transformations.extend(count_if_transforms)

    # Complete list of OPAL functions (476 functions across 11 categories)
    ALLOWED_FUNCTIONS = {
        'abs', 'any', 'any_not_null', 'append_item', 'arccos_deg', 'arccos_rad',
        'asc', 'desc',  # Sort direction functions
        'arcsin_deg', 'arcsin_rad', 'arctan2_deg', 'arctan2_rad', 'arctan_deg',
        'arctan_rad', 'array', 'array_agg', 'array_agg_distinct', 'array_concat',
        'array_contains', 'array_flatten', 'array_length', 'array_null',
        'array_to_string', 'array_union_agg', 'atoi', 'avg', 'base64decode',
        'base64encode', 'bin_end_time', 'bin_size', 'bin_start_time', 'bool',
        'bool_null', 'cast', 'cbrt', 'ceil', 'check_json', 'coalesce', 'concat',
        'concat_arrays', 'concat_strings', 'contains', 'cos_deg', 'cos_rad',
        'cosh', 'count', 'count_distinct', 'count_distinct_exact',
        'count_regex_matches', 'decode_base64', 'decode_uri',
        'decode_uri_component', 'degrees', 'delta', 'delta_monotonic',
        'dense_rank', 'deriv', 'desc', 'detect_browser', 'distinct_count',
        'drop_fields', 'duration', 'duration_hr', 'duration_min', 'duration_ms',
        'duration_ns', 'duration_null', 'duration_sec', 'duration_us',
        'editdistance', 'embed_sql_params', 'encode_base64', 'encode_uri',
        'encode_uri_component', 'ends_with', 'eq', 'ewma', 'exp',
        'exponential_histogram_null', 'extract_all', 'filter_index',
        'find_index', 'first', 'first_not_null', 'firstnotnull', 'float64',
        'float64_null', 'floor', 'fold_any', 'fold_interval', 'format_date',
        'format_duration', 'format_time', 'format_url', 'frac', 'frame',
        'frame_exact', 'frame_following', 'frame_preceding', 'from_base64',
        'from_epochms', 'from_epochns', 'from_hex', 'from_json',
        'from_milliseconds', 'from_nanoseconds', 'from_proto_timestamp',
        'from_seconds', 'from_url', 'get_field', 'get_item', 'get_jmespath',
        'get_regex', 'get_regex_all', 'group_by', 'gt', 'gte', 'hash',
        'hash_agg', 'hash_agg_distinct', 'haversine_distance_km',
        'histogram_combine', 'histogram_fraction', 'histogram_null',
        'histogram_quantile', 'host', 'if', 'if_null', 'in', 'index_of_item',
        'insert_item', 'int', 'int64', 'int64_null', 'int64_to_ipv4', 'int_div',
        'intersect_arrays', 'ipaddr', 'ipsubnet', 'ipv4',
        'ipv4_address_in_network', 'ipv4_network_int64', 'ipv4_to_int64',
        'is_ipv4', 'is_ipv6', 'is_null', 'is_private_ip', 'is_url',
        'json_extract', 'json_group_object', 'label', 'lag', 'lag_not_null',
        'last', 'last_not_null', 'lastnotnull', 'lead', 'lead_not_null', 'left',
        'left_pad', 'length', 'like', 'ln', 'log', 'log10', 'log2', 'lower',
        'lt', 'lte', 'ltrim', 'm', 'm_exponential_histogram', 'm_histogram',
        'm_object', 'm_tdigest', 'make_array', 'make_array_range', 'make_col',
        'make_col_aggregated', 'make_fields', 'make_object', 'make_resource',
        'make_set_col', 'map_get', 'map_keys', 'map_values', 'match',
        'match_regex', 'max', 'md5', 'median', 'median_exact', 'merge_objects',
        'metric', 'min', 'mod', 'mode', 'nanoseconds',
        'nanoseconds_to_milliseconds', 'nanoseconds_to_seconds', 'ne',
        'not_null', 'now', 'nth', 'null_if', 'nullsfirst', 'nullslast',
        'num_bytes', 'num_codepoints', 'numeric_null', 'object', 'object_agg',
        'object_keys', 'object_null', 'on', 'order_by',
        'otel_exponential_histogram_quantile', 'otel_exponential_histogram_sum',
        'otel_histogram_quantile', 'otel_histogram_sum', 'object_delete',
        'options', 'parse_csv', 'parse_duration', 'parse_hex', 'parse_ip',
        'parse_isotime', 'parse_json', 'parse_key_value', 'parse_kvs',
        'parse_time', 'parse_timestamp', 'parse_url', 'parse_user_agent',
        'path', 'path_exists', 'percentile', 'percentile_cont',
        'percentile_disc', 'pi', 'pick_fields', 'pivot_array', 'position',
        'pow', 'prepend_item', 'primary_key', 'prom_quantile', 'protocol',
        'query_end_time', 'query_param', 'query_params', 'query_start_time',
        'radians', 'radians_to_degrees', 'rand', 'rank', 'rate', 'regex',
        'regextract', 'regexmatch', 'replace', 'replace_regex', 'right',
        'right_pad', 'round', 'row_end_time', 'row_number', 'row_timestamp',
        'rpad', 'rtrim', 'same', 'search', 'sha1', 'sha2', 'sha256', 'sign',
        'sin_deg', 'sin_rad', 'sinh', 'slice', 'slice_array', 'sort_array',
        'split', 'split_part', 'sqrt', 'starts_with', 'stddev', 'string',
        'string_agg', 'string_agg_distinct', 'string_null',
        'strip_null_columns', 'strip_prefix', 'strip_suffix', 'strlen',
        'strpos', 'substr', 'sum', 'tags', 'tan_deg', 'tan_rad', 'tanh',
        'tdigest', 'tdigest_agg', 'tdigest_combine', 'tdigest_merge',
        'tdigest_null', 'tdigest_quantile', 'time_bucket', 'timestamp',
        'timestamp_ms', 'timestamp_ns', 'timestamp_null', 'timestamp_sec',
        'timestamp_us', 'to_base64', 'to_days', 'to_hex', 'to_hours', 'to_json',
        'to_lowercase', 'to_milliseconds', 'to_minutes', 'to_nanoseconds',
        'to_proto_timestamp', 'to_seconds', 'to_uppercase', 'to_url',
        'to_weeks', 'tokenize', 'tokenize_part', 'tokenize_pattern', 'top',
        'topk_agg', 'trim', 'trunc', 'typeof', 'uniform', 'unnest',
        'unnest_cols', 'unpivot_array', 'upper', 'url_encode', 'urlparse',
        'valid_for', 'value_counts', 'variant_null', 'variant_type_name',
        'variance', 'width_bucket', 'window', 'zipf',
        # Additional window/analytic functions (some already listed above in a different context)
        'first_value', 'last_value', 'nth_value', 'percent_rank', 'cume_dist',
        'ntile', 'group_by_all', 'get_field', 'set_field', 'delete_field',
        'at', 'every', 'extract_values', 'is_array', 'is_bool', 'is_int',
        'is_float', 'is_object', 'is_string', 'get_or_default', 'exists',
        'not_exists', 'keys', 'values', 'entries', 'merge_objects',
        'parse_xml', 'case', 'when', 'otherwise', 'split_to_array',
        'array_join', 'slice_array', 'reverse_array', 'sort_array', 'map',
        'filter', 'reduce', 'zip', 'unzip', 'transpose', 'pivot', 'unpivot',
        'cross_join', 'lateral_view', 'explode', 'explode_outer', 'posexplode',
        'posexplode_outer', 'inline', 'inline_outer', 'stack', 'sequence',
        'array_repeat', 'array_position', 'array_remove', 'array_distinct',
        'array_intersect', 'array_union', 'array_except', 'shuffle',
        'arrays_overlap', 'array_sort', 'array_max', 'array_min', 'flatten',
        'array_compact', 'element_at', 'cardinality', 'size', 'sort_by',
        'aggregate', 'transform', 'exists_any', 'forall', 'zip_with',
        'map_filter', 'map_zip_with', 'transform_keys', 'transform_values',
        'map_from_arrays', 'map_from_entries', 'map_concat', 'str_to_map',
        'from_csv', 'schema_of_csv', 'schema_of_json', 'to_csv', 'sentences',
        'named_struct', 'bit_length', 'char_length', 'character_length',
        'ascii', 'base64', 'unbase64', 'decode', 'encode', 'format_number',
        'format_string', 'initcap', 'lcase', 'locate', 'lpad', 'octet_length',
        'overlay', 'position', 'printf', 'regexp_extract',
        'regexp_extract_all', 'regexp_replace', 'repeat', 'reverse', 'right',
        'rpad', 'soundex', 'space', 'split_string', 'substring',
        'substring_index', 'translate', 'ucase', 'unhex', 'xpath',
        'xpath_boolean', 'xpath_double', 'xpath_float', 'xpath_int',
        'xpath_long', 'xpath_number', 'xpath_short', 'xpath_string'
    }

    # SQL→OPAL translation hints for common mistakes
    SQL_FUNCTION_HINTS = {
        'count_if': "OPAL doesn't have count_if(). Use: make_col flag:if(condition,1,0) | statsby sum(flag)",
        'len': "OPAL doesn't have len(). Use: strlen(string) or array_length(array)",
        'length': "In OPAL, use: strlen(string) or array_length(array)",
        'isnull': "OPAL doesn't have isnull(). Use: is_null(field)",
        'ifnull': "OPAL doesn't have ifnull(). Use: coalesce(field, default_value)",
        'nvl': "OPAL doesn't have nvl(). Use: coalesce(field, default_value)",
        'decode': "OPAL doesn't have SQL-style decode(). Use: if() or case expressions",
        'to_char': "OPAL doesn't have to_char(). Use: format_date() or string()",
        'to_date': "OPAL doesn't have to_date(). Use: parse_time() or parse_timestamp()",
        'sysdate': "OPAL doesn't have sysdate. Use: now()",
        'getdate': "OPAL doesn't have getdate(). Use: now()"
    }

    # Complete list of OPAL verbs (108 verbs across 6 categories)
    ALLOWED_VERBS = {
        # Aggregate verbs
        'aggregate', 'align', 'always', 'bottomk', 'dedup', 'distinct', 'ever',
        'fill', 'histogram', 'make_event', 'merge_events', 'never', 'rollup',
        'statsby', 'timechart', 'timeshift', 'timestats', 'top', 'topk',
        'top_logsources', 'window', 'bottom',

        # Filter verbs
        'filter', 'filter_last', 'filter_null', 'filter_repeated_source',
        'filter_repeated_value', 'flatten_all', 'flatten_leaves',
        'flatten_single', 'limit', 'sample', 'search', 'tail', 'union',
        'unsort', 'where',

        # Join verbs
        'join', 'join_lookup', 'lookup', 'lookup_add', 'lookup_ip_info',
        'set_col_visible', 'set_link', 'set_metric', 'top_grouping', 'follow',
        'leftjoin', 'fulljoin', 'follow_not', 'not_exists', 'surrounding',
        'update_resource',

        # Metadata verbs
        'add_key', 'drop_col', 'drop_interface', 'extract_regex', 'interface',
        'make_col', 'make_interval', 'make_metric', 'make_reference',
        'make_resource', 'make_session', 'make_set_col', 'make_table', 'pick',
        'pick_col', 'remove_col', 'rename_col', 'set_col', 'set_col_enum',
        'set_col_immutable', 'set_col_searchable', 'set_col_tag', 'set_label',
        'set_metadata', 'set_metric_metadata', 'set_primary_key',
        'set_severity', 'set_timestamp', 'set_type', 'set_valid_from',
        'set_valid_to', 'unset_all_links', 'unset_keys', 'unset_link',
        'unwrap',

        # Projection verbs
        'colcount', 'columns', 'exists', 'fields', 'head', 'metadata',
        'sample_distinct', 'set_dataset_metadata',

        # Semistructured verbs
        'expand', 'flatten', 'make_fields', 'parse_csv', 'parse_kvs',
        'parse_xml', 'pivot', 'unflatten', 'unnest', 'unpivot', 'make_object',

        # Sort verbs
        'sort'
    }

    # 1. Check for balanced parentheses, brackets, and braces
    if query.count('(') != query.count(')'):
        return ValidationResult(
            is_valid=False,
            transformed_query=query if all_transformations else None,
            transformations=all_transformations,
            error_message="Unbalanced parentheses in OPAL query"
        )

    if query.count('[') != query.count(']'):
        return ValidationResult(
            is_valid=False,
            transformed_query=query if all_transformations else None,
            transformations=all_transformations,
            error_message="Unbalanced brackets in OPAL query"
        )

    if query.count('{') != query.count('}'):
        return ValidationResult(
            is_valid=False,
            transformed_query=query if all_transformations else None,
            transformations=all_transformations,
            error_message="Unbalanced braces in OPAL query"
        )

    # 2. Check for balanced quotes (simplified - just count double quotes)
    # More sophisticated quote handling would require a state machine
    double_quote_count = query.count('"')
    if double_quote_count % 2 != 0:
        return ValidationResult(
            is_valid=False,
            transformed_query=query if all_transformations else None,
            transformations=all_transformations,
            error_message="Unbalanced double quotes in OPAL query"
        )

    # 3. Check query complexity (prevent DoS)
    MAX_OPERATIONS = 20
    operations = _split_pipeline_safely(query)
    if len(operations) > MAX_OPERATIONS:
        return ValidationResult(
            is_valid=False,
            transformed_query=query if all_transformations else None,
            transformations=all_transformations,
            error_message=f"Query too complex: {len(operations)} operations (max {MAX_OPERATIONS})"
        )

    # 4. Check nesting depth (prevent stack overflow)
    MAX_NESTING = 10
    max_depth = 0
    current_depth = 0
    for char in query:
        if char in '({[':
            current_depth += 1
            max_depth = max(max_depth, current_depth)
        elif char in ')}]':
            current_depth -= 1
    if max_depth > MAX_NESTING:
        return ValidationResult(
            is_valid=False,
            transformed_query=query if all_transformations else None,
            transformations=all_transformations,
            error_message=f"Query nesting too deep: {max_depth} levels (max {MAX_NESTING})"
        )

    # 5. Validate all verbs in the pipeline (not just the first one)
    for i, operation in enumerate(operations, 1):
        # Extract the first word (the verb).
        # Use regex to extract just the verb name (alphanumeric + underscore);
        # this handles cases like "union(" where there's no space before the parenthesis
        verb_match = re.match(r'^([a-zA-Z_][a-zA-Z0-9_]*)', operation.strip())
        if not verb_match:
            continue
        first_word = verb_match.group(1)

        # Check if it's a valid OPAL verb
        if first_word not in ALLOWED_VERBS:
            similar_verbs = [v for v in ALLOWED_VERBS if v.startswith(first_word[:3])][:5]
            suggestion = f" Similar verbs: {', '.join(similar_verbs)}" if similar_verbs else ""
            return ValidationResult(
                is_valid=False,
                transformed_query=query if all_transformations else None,
                transformations=all_transformations,
                error_message=(
                    f"Unknown OPAL verb '{first_word}' at position {i} in pipeline. "
                    f"Valid verbs include: filter, make_col, statsby, timechart, sort, etc.{suggestion} "
                    f"(see https://docs.observeinc.com/en/latest/content/query-language-reference/ListOfOPALVerbs.html)"
                )
            )

    # 6. Validate function calls (including nested functions)
    # Use regex to find all function-like patterns: word followed by (
    function_pattern = r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*\('
    function_matches = re.findall(function_pattern, query)

    # Check each function against the whitelist.
    # Skip verbs that happen to have parentheses (like union(...), pivot(...))
    for func_name in set(function_matches):
        # Skip if it's actually a verb, not a function
        if func_name in ALLOWED_VERBS:
            continue

        if func_name not in ALLOWED_FUNCTIONS:
            # Check if it's a common SQL function with a hint
            if func_name in SQL_FUNCTION_HINTS:
                return ValidationResult(
                    is_valid=False,
                    transformed_query=query if all_transformations else None,
                    transformations=all_transformations,
                    error_message=f"Unknown function '{func_name}()'. {SQL_FUNCTION_HINTS[func_name]}"
                )
            else:
                # Provide helpful similar-function suggestions
                similar_funcs = [f for f in ALLOWED_FUNCTIONS if f.startswith(func_name[:3])][:5]
                suggestion = f" Similar functions: {', '.join(similar_funcs)}" if similar_funcs else ""
                return ValidationResult(
                    is_valid=False,
                    transformed_query=query if all_transformations else None,
                    transformations=all_transformations,
                    error_message=(
                        f"Unknown function '{func_name}()'. "
                        f"Valid OPAL functions: count, sum, avg, if, contains, string, parse_json, etc.{suggestion} "
                        f"(see https://docs.observeinc.com/en/latest/content/query-language-reference/ListOfOPALFunctions.html)"
                    )
                )

    # NOTE: Common syntax issues are now AUTO-FIXED above:
    #   - Multi-term angle bracket syntax → contains() OR logic (transform_multi_term_angle_brackets)
    #   - SQL-style sort syntax → sort desc(field) (transform_sort_syntax)
    #   - count_if() function → make_col + if() + sum() (transform_count_if)
    # No longer blocking - we automatically convert to correct OPAL syntax

    # All validations passed
    return ValidationResult(
        is_valid=True,
        transformed_query=query if all_transformations else None,
        transformations=all_transformations,
        error_message=None
    )
