metrics_intelligence.py
#!/usr/bin/env python3
"""
Metrics Intelligence Script

This script analyzes metrics datasets in Observe to create rich metadata for semantic search.
It focuses specifically on datasets with 'metric' interface and extracts detailed information
about individual metrics including their dimensions, value patterns, and usage contexts.

Usage:
    python metrics_intelligence.py --help
"""

import asyncio
import json
import logging
import os
import sys
import argparse
import time
import statistics
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Any, Tuple, Set

import asyncpg
import httpx
from dotenv import load_dotenv


# ANSI color codes for terminal output
class Colors:
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    MAGENTA = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    RESET = '\033[0m'

    # Background colors
    BG_RED = '\033[101m'
    BG_GREEN = '\033[102m'
    BG_YELLOW = '\033[103m'
    BG_BLUE = '\033[104m'


class ColoredFormatter(logging.Formatter):
    """Custom formatter that adds colors to log messages based on level and content."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Color mapping for log levels
        self.level_colors = {
            logging.DEBUG: Colors.CYAN,
            logging.INFO: Colors.WHITE,
            logging.WARNING: Colors.YELLOW,
            logging.ERROR: Colors.RED,
            logging.CRITICAL: Colors.RED + Colors.BOLD
        }

        # Special patterns for highlighting key events
        self.event_patterns = {
            'unchanged - skipping': Colors.GREEN + Colors.BOLD,
            'has changed - performing': Colors.BLUE + Colors.BOLD,
            'not found in database': Colors.MAGENTA + Colors.BOLD,
            'Excluding metric': Colors.YELLOW,
            'Successfully analyzed': Colors.GREEN,
            'Progress:': Colors.CYAN + Colors.BOLD,
            'Analysis Statistics': Colors.YELLOW + Colors.BOLD + Colors.UNDERLINE,
            'Database connection established': Colors.GREEN + Colors.BOLD,
            'Metrics analysis completed': Colors.GREEN + Colors.BOLD + Colors.UNDERLINE,
            'Discovered': Colors.CYAN,
            'Failed': Colors.RED + Colors.BOLD,
            'Error': Colors.RED + Colors.BOLD,
            'has no metrics data': Colors.RED,
            'checking for metrics': Colors.CYAN,
            'has metrics - analyzing': Colors.GREEN + Colors.BOLD,
            'High cardinality': Colors.YELLOW + Colors.BOLD,
        }

    def format(self, record):
        # Format the message first
        message = super().format(record)

        # Apply level-based coloring
        level_color = self.level_colors.get(record.levelno, Colors.WHITE)

        # Check for special event patterns and apply specific colors
        colored_message = message
        for pattern, color in self.event_patterns.items():
            if pattern in message:
                colored_message = colored_message.replace(
                    pattern, f"{color}{pattern}{Colors.RESET}"
                )

        # Apply level color to the entire message if no special pattern matched
        if colored_message == message:
            colored_message = f"{level_color}{message}{Colors.RESET}"

        return colored_message


# Add src directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MetricsIntelligenceAnalyzer:
    """Analyzes metrics datasets and generates intelligence for semantic search."""

    def __init__(self):
        # Database connection
        self.db_pool = None

        # Observe API configuration
        self.observe_customer_id = os.getenv('OBSERVE_CUSTOMER_ID')
        self.observe_token = os.getenv('OBSERVE_TOKEN')
        self.observe_domain = os.getenv('OBSERVE_DOMAIN', 'observe-staging.com')

        # HTTP client for Observe API
        self.http_client = httpx.AsyncClient(
            timeout=httpx.Timeout(60.0, connect=10.0, read=60.0),
            headers={
                'Authorization': f'Bearer {self.observe_customer_id} {self.observe_token}',
                'Content-Type': 'application/json'
            }
        )

        # Statistics
        self.stats = {
            'datasets_processed': 0,
            'datasets_skipped': 0,
            'datasets_failed': 0,
            'metrics_discovered': 0,
            'metrics_processed': 0,
            'metrics_skipped': 0,
            'metrics_excluded': 0,
            'high_cardinality_metrics': 0
        }

        # Rate limiting configuration
        self.last_observe_call = 0
        self.observe_delay = 0.2  # 200ms between Observe API calls
        self.max_retries = 3
        self.base_retry_delay = 1.0

        # Cardinality thresholds
        self.HIGH_CARDINALITY_THRESHOLD = 1000  # Warn about dimensions with >1000 unique values

        # Force mode flag
        self.force_mode = False

    async def rate_limit_observe(self) -> None:
        """Apply rate limiting for Observe API calls."""
        elapsed = time.time() - self.last_observe_call
        if elapsed < self.observe_delay:
            await asyncio.sleep(self.observe_delay - elapsed)
        self.last_observe_call = time.time()

    async def retry_with_backoff(self, func, *args, **kwargs):
        """Retry a function with exponential backoff."""
        for attempt in range(self.max_retries):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise e
                wait_time = self.base_retry_delay * (2 ** attempt)
                if "timeout" in str(e).lower() or "429" in str(e):
                    wait_time *= 2
                elif "502" in str(e) or "503" in str(e) or "504" in str(e):
                    wait_time *= 1.5
                logger.warning(f"Attempt {attempt + 1}/{self.max_retries} failed: {e}. Retrying in {wait_time:.1f}s...")
                await asyncio.sleep(wait_time)

    async def initialize_database(self) -> None:
        """Initialize database connection and ensure schema exists."""
        db_password = os.getenv('SEMANTIC_GRAPH_PASSWORD')
        if not db_password:
            raise ValueError("SEMANTIC_GRAPH_PASSWORD environment variable must be set")

        db_config = {
            'host': os.getenv('POSTGRES_HOST', 'localhost'),
            'port': int(os.getenv('POSTGRES_PORT', '5432')),
            'database': os.getenv('POSTGRES_DB', 'semantic_graph'),
            'user': os.getenv('POSTGRES_USER', 'semantic_graph'),
            'password': db_password
        }

        try:
            self.db_pool = await asyncpg.create_pool(**db_config, min_size=1, max_size=5)
            logger.info("Database connection established")

            # Create schema if it doesn't exist
            await self.ensure_schema_exists()
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise

    async def ensure_schema_exists(self) -> None:
        """Ensure the metrics intelligence schema exists."""
        # Get the absolute path to the schema file
        script_dir = os.path.dirname(os.path.abspath(__file__))
        schema_path = os.path.join(script_dir, '..', 'sql', 'metrics_intelligence_schema.sql')

        with open(schema_path, 'r') as f:
            schema_sql = f.read()

        async with self.db_pool.acquire() as conn:
            await conn.execute(schema_sql)
            logger.info("Metrics intelligence schema created/verified")

    async def get_metrics_with_data(self, dataset_id: str) -> Dict[str, int]:
        """
        Get all metrics that have data in the last 24 hours with their counts.
        More efficient than checking each metric individually.
Returns: Dict mapping metric_name -> count for metrics with data """ validation_query = """ statsby count(), group_by(metric) """ try: result = await self.execute_opal_query(dataset_id, validation_query, "240m") if not result: logger.debug(f"No metric data found in dataset {dataset_id} in last 240m") return {} metrics_with_data = {} for row in result: metric_name = row.get('metric', '') count_value = row.get('count()', row.get('count', 0)) if metric_name and count_value: try: count_int = int(str(count_value)) if count_int > 0: metrics_with_data[metric_name] = count_int logger.debug(f"Metric {metric_name} has {count_int} data points in last 24h") except (ValueError, TypeError) as e: logger.debug(f"Could not parse count {count_value} for metric {metric_name}: {e}") # If we can't parse but got result, include it metrics_with_data[metric_name] = 1 # Default to 1 to include logger.info(f"Found {len(metrics_with_data)} metrics with recent data (240m) in dataset {dataset_id}") return metrics_with_data except Exception as e: logger.warning(f"Failed to get metric data counts for dataset {dataset_id}: {e}") # If validation fails completely, return empty dict (fail open at individual level) return {} async def has_recent_data(self, dataset_id: str, metric_name: str) -> bool: """ Check if a metric has any data in the last 24 hours. Note: This is the individual fallback method. Use get_metrics_with_data() for bulk validation when possible. Returns: True if metric has recent data, False if empty/stale """ # Query to check for any data points in the last 24 hours validation_query = f""" filter metric = "{metric_name}" | statsby count() """ try: result = await self.execute_opal_query(dataset_id, validation_query, "240m") if not result or len(result) == 0: logger.debug(f"No data found for metric {metric_name} in last 240m") return False # Check if we got any count > 0 for row in result: count_value = row.get('count()', row.get('count', 0)) if count_value: try: # Convert to int in case it's returned as string count_int = int(str(count_value)) if count_int > 0: logger.debug(f"Metric {metric_name} has {count_int} data points in last 240m") return True except (ValueError, TypeError) as e: logger.debug(f"Could not parse count value {count_value} for metric {metric_name}: {e}") # If we can't parse the count but got a result, assume it has data return True logger.debug(f"Metric {metric_name} has no data points in last 240m") return False except Exception as e: logger.warning(f"Failed to validate data for metric {metric_name}: {e}") # If validation fails, include the metric (fail open) return True async def store_excluded_metric(self, dataset_id: str, dataset_name: str, metric_name: str, exclusion_type: str, exclusion_reason: str) -> None: """Store an excluded metric in the database for tracking purposes.""" try: query = """ INSERT INTO metrics_intelligence ( dataset_id, metric_name, dataset_name, dataset_type, workspace_id, metric_type, unit, description, common_dimensions, dimension_cardinality, sample_dimensions, value_type, value_range, sample_values, data_frequency, last_seen, first_seen, inferred_purpose, typical_usage, business_categories, technical_category, common_fields, nested_field_paths, nested_field_analysis, excluded, exclusion_reason, confidence_score, last_analyzed ) VALUES ( $1, $2, $3, 'unknown', NULL, 'unknown', NULL, NULL, '{}', '{}', '{}', 'unknown', '{}', '{}', 'none', NULL, NULL, 'Excluded metric', 'Not analyzed due to exclusion', '["Unknown"]', 'Unknown', '{}', '{}', '{}', TRUE, $4, 0.0, 
NOW() ) ON CONFLICT (dataset_id, metric_name) DO UPDATE SET excluded = TRUE, exclusion_reason = EXCLUDED.exclusion_reason, last_analyzed = NOW() """ async with self.db_pool.acquire() as conn: await conn.execute(query, dataset_id, metric_name, dataset_name, exclusion_reason) logger.debug(f"Stored excluded metric: {metric_name} - {exclusion_reason}") except Exception as e: logger.error(f"Failed to store excluded metric {metric_name}: {e}") def should_exclude_metric(self, metric_name: str, labels_or_attributes: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """ Determine if a metric should be excluded from analysis based on static criteria. Note: This is for static exclusions only. Data validation happens separately. Returns: (exclude: bool, reason: str) """ # No static exclusions - analyze all metrics comprehensively # Data validation (empty metrics) is handled separately in has_recent_data() return False, None async def check_metric_needs_update(self, dataset_id: str, metric_name: str, sample_data_count: int) -> bool: """ Check if a metric needs to be updated based on existing database record. Returns True if metric needs analysis, False if it can be skipped. Criteria for skipping: - Metric exists in database with same dataset_id and metric_name - Last analyzed within the last 24 hours - Sample data count is similar (within 20% difference) """ # Force mode: always analyze if self.force_mode: return True try: async with self.db_pool.acquire() as conn: # Check if metric exists and when it was last analyzed result = await conn.fetchrow(""" SELECT last_analyzed, confidence_score FROM metrics_intelligence WHERE dataset_id = $1 AND metric_name = $2 """, dataset_id, metric_name) if not result: # Metric doesn't exist, needs full analysis logger.debug(f"Metric {metric_name} not found in database, needs analysis") return True last_analyzed = result['last_analyzed'] # Check if analyzed within last 24 hours if last_analyzed: # Use timezone-naive datetime for comparison (database stores timezone-naive) hours_since_analysis = (datetime.now() - last_analyzed).total_seconds() / 3600 if hours_since_analysis < 24: logger.info(f"Skipping {metric_name} - analyzed {hours_since_analysis:.1f} hours ago") self.stats['metrics_skipped'] += 1 return False # If we get here, metric exists but needs refresh logger.debug(f"Metric {metric_name} needs refresh - last analyzed {hours_since_analysis:.1f} hours ago") return True except Exception as e: logger.warning(f"Error checking metric update status for {metric_name}: {e}") # If we can't check, err on the side of updating return True async def fetch_metrics_datasets(self) -> List[Dict[str, Any]]: """Fetch all datasets with metric interface from Observe API.""" url = f"https://{self.observe_customer_id}.{self.observe_domain}/v1/dataset" async def _fetch(): await self.rate_limit_observe() response = await self.http_client.get(url) response.raise_for_status() data = response.json() all_datasets = data.get('data', []) # Filter for datasets with metric interface metrics_datasets = [] for dataset in all_datasets: state = dataset.get('state', {}) interfaces = state.get('interfaces', []) # Skip if interfaces is None or not a list if not interfaces or not isinstance(interfaces, list): continue # Check if metric interface exists has_metric_interface = False for iface in interfaces: if isinstance(iface, dict) and iface.get('path') == 'metric': has_metric_interface = True break if has_metric_interface: metrics_datasets.append(dataset) logger.info(f"Discovered {len(metrics_datasets)} 
datasets with metric interface out of {len(all_datasets)} total") return metrics_datasets try: return await self.retry_with_backoff(_fetch) except Exception as e: logger.error(f"Failed to fetch metrics datasets after retries: {e}") raise async def execute_opal_query(self, dataset_id: str, query: str, time_range: str = "1h") -> Optional[List[Dict[str, Any]]]: """Execute an OPAL query and return parsed results.""" url = f"https://{self.observe_customer_id}.{self.observe_domain}/v1/meta/export/query" payload = { "query": { "stages": [ { "input": [ { "inputName": "main", "datasetId": dataset_id } ], "stageID": "query_stage", "pipeline": query } ] }, "rowCount": "10000" # Allow larger result sets for metric discovery } params = {"interval": time_range} async def _execute_and_parse(): await self.rate_limit_observe() response = await self.http_client.post(url, json=payload, params=params) if response.status_code != 200: logger.warning(f"Query failed for dataset {dataset_id}: {response.status_code}") return None content_type = response.headers.get('content-type', '') logger.debug(f"Response content-type: {content_type} for dataset {dataset_id}") if 'text/csv' in content_type: csv_data = response.text.strip() if not csv_data: logger.debug(f"Empty CSV response for dataset {dataset_id}") return None lines = csv_data.split('\n') logger.debug(f"CSV response has {len(lines)} lines for dataset {dataset_id}") if len(lines) <= 1: logger.debug(f"Only header line found for dataset {dataset_id}") return None # Parse CSV into list of dictionaries header = [col.strip('"') for col in lines[0].split(',')] results = [] for line in lines[1:]: if not line.strip(): continue # Simple CSV parsing (handles quoted fields) values = [] current_value = "" in_quotes = False for char in line: if char == '"' and not in_quotes: in_quotes = True elif char == '"' and in_quotes: in_quotes = False elif char == ',' and not in_quotes: values.append(current_value.strip('"')) current_value = "" else: current_value += char # Don't forget the last value values.append(current_value.strip('"')) # Create row dict row = {} for i, col in enumerate(header): value = values[i] if i < len(values) else '' # Parse JSON fields if col in ['labels', 'attributes', 'resource_attributes', 'meta'] and value: try: row[col] = json.loads(value) except json.JSONDecodeError: row[col] = {} else: row[col] = value results.append(row) return results elif 'application/x-ndjson' in content_type or 'application/json' in content_type: # Handle NDJSON (newline-delimited JSON) format ndjson_data = response.text.strip() if not ndjson_data: logger.debug(f"Empty NDJSON response for dataset {dataset_id}") return None results = [] for line in ndjson_data.split('\n'): if not line.strip(): continue try: row = json.loads(line) results.append(row) except json.JSONDecodeError as e: logger.debug(f"Failed to parse JSON line for dataset {dataset_id}: {e}") continue logger.debug(f"NDJSON response parsed {len(results)} rows for dataset {dataset_id}") return results if results else None logger.debug(f"Unsupported content type received for dataset {dataset_id}: {content_type}") return None try: return await self.retry_with_backoff(_execute_and_parse) except Exception as e: logger.warning(f"Failed to execute query for dataset {dataset_id} after retries: {e}") return None def extract_nested_fields(self, data: Any, parent_path: str = "", max_depth: int = 4) -> Dict[str, Any]: """Recursively extract nested field paths and their value samples.""" if max_depth <= 0: return {} nested_fields = 
{} if isinstance(data, dict): for key, value in data.items(): current_path = f"{parent_path}.{key}" if parent_path else key # Store this field path if not isinstance(value, (dict, list)): nested_fields[current_path] = { 'type': type(value).__name__, 'sample_value': str(value)[:100] if value is not None else None } # Recurse into nested structures if isinstance(value, dict) and len(value) <= 20: # Limit expansion for large objects nested_fields.update(self.extract_nested_fields(value, current_path, max_depth - 1)) elif isinstance(value, list) and len(value) > 0 and isinstance(value[0], dict): # Analyze first list item for structure nested_fields.update(self.extract_nested_fields(value[0], f"{current_path}[]", max_depth - 1)) return nested_fields def analyze_nested_field_patterns(self, metric_data: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze patterns in nested JSON fields across the dataset.""" all_nested_fields = {} field_occurrence_count = {} field_cardinality = {} # Analyze nested structures in key JSON fields json_fields_to_analyze = ['labels', 'attributes', 'resource_attributes', 'meta', 'properties', 'context'] for row in metric_data: for field_name in json_fields_to_analyze: if field_name in row and isinstance(row[field_name], dict): nested_fields = self.extract_nested_fields(row[field_name], field_name) for field_path, field_info in nested_fields.items(): # Track occurrence if field_path not in field_occurrence_count: field_occurrence_count[field_path] = 0 field_cardinality[field_path] = set() field_occurrence_count[field_path] += 1 if field_info['sample_value']: field_cardinality[field_path].add(field_info['sample_value']) # Store field info all_nested_fields[field_path] = field_info # Calculate field importance scores total_rows = len(metric_data) important_nested_fields = {} for field_path, occurrence_count in field_occurrence_count.items(): presence_rate = occurrence_count / total_rows if total_rows > 0 else 0 cardinality = len(field_cardinality[field_path]) # Consider field important if present in >5% of records and has reasonable cardinality if presence_rate > 0.05 and cardinality < 1000: important_nested_fields[field_path] = { 'presence_rate': presence_rate, 'cardinality': cardinality, 'sample_values': list(field_cardinality[field_path])[:10], 'field_type': all_nested_fields[field_path]['type'] } return { 'nested_fields': important_nested_fields, 'total_nested_fields_found': len(all_nested_fields), 'important_fields_count': len(important_nested_fields) } async def analyze_metric_dimensions(self, metric_data: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze dimensions/labels/attributes of a metric with enhanced nested field analysis.""" dimension_values = {} dimension_keys = set() # Enhanced: Analyze nested JSON field patterns nested_analysis = self.analyze_nested_field_patterns(metric_data) for row in metric_data: # Handle different dataset structures dimensions = {} # Prometheus-style labels if 'labels' in row and isinstance(row['labels'], dict): dimensions.update(row['labels']) # OpenTelemetry attributes if 'attributes' in row and isinstance(row['attributes'], dict): dimensions.update(row['attributes']) # Resource attributes if 'resource_attributes' in row and isinstance(row['resource_attributes'], dict): dimensions.update(row['resource_attributes']) # Top-level dimension fields (for ServiceExplorer and other flat-structure metrics) # These datasets have dimensions as top-level fields instead of nested in labels/attributes excluded_top_level_fields = { 'metric', 
'value', 'timestamp', 'time', 'metricType', 'tdigestValue', 'meta', 'tags', 'labels', 'attributes', 'resource_attributes' } for key, value in row.items(): # Skip fields we've already processed or that aren't dimensions if key in dimensions or key in excluded_top_level_fields: continue # Skip link fields (link_42160969_2, etc.) - these are internal references if key.startswith('link_'): continue # Add non-null, non-empty top-level fields as dimensions if value is not None and value != '': dimensions[key] = value # Enhanced: Include important nested fields as dimensions for field_path, field_info in nested_analysis['nested_fields'].items(): # Convert nested field path to value if it exists in this row try: parts = field_path.split('.') current_value = row for part in parts: if part.endswith('[]'): part = part[:-2] if isinstance(current_value.get(part), list) and len(current_value[part]) > 0: current_value = current_value[part][0] else: current_value = None break else: current_value = current_value.get(part) if isinstance(current_value, dict) else None if current_value is None: break if current_value is not None: dimensions[field_path] = current_value except (KeyError, TypeError, AttributeError): continue # Collect dimension keys and values for key, value in dimensions.items(): if key not in dimension_values: dimension_values[key] = set() dimension_values[key].add(str(value)) dimension_keys.add(key) # Calculate cardinalities and create summaries common_dimensions = {} dimension_cardinality = {} sample_dimensions = {} for key in dimension_keys: values = dimension_values.get(key, set()) cardinality = len(values) dimension_cardinality[key] = cardinality # Sample up to 10 values sample_values = list(values)[:10] sample_dimensions[key] = sample_values # Consider common if present in >10% of data points and not too high cardinality if len(metric_data) > 0: # Check presence in both nested structures and top-level fields presence_count = 0 for row in metric_data: # Check nested structures found_in_nested = any(key in dims for dims in [ row.get('labels', {}), row.get('attributes', {}), row.get('resource_attributes', {}) ] if isinstance(dims, dict)) # Check top-level (for flat-structure metrics) found_in_top_level = key in row and row[key] is not None and row[key] != '' if found_in_nested or found_in_top_level: presence_count += 1 presence_rate = presence_count / len(metric_data) if presence_rate > 0.1 and cardinality < self.HIGH_CARDINALITY_THRESHOLD: common_dimensions[key] = { 'presence_rate': presence_rate, 'cardinality': cardinality, 'sample_values': sample_values } return { 'common_dimensions': common_dimensions, 'dimension_cardinality': dimension_cardinality, 'sample_dimensions': sample_dimensions, 'nested_field_analysis': nested_analysis } async def analyze_metric_values(self, metric_data: List[Dict[str, Any]]) -> Dict[str, Any]: """Analyze value patterns of a metric.""" values = [] value_type = "unknown" for row in metric_data: if 'value' in row and row['value'] is not None and row['value'] != '': try: val = float(row['value']) values.append(val) except (ValueError, TypeError): continue if not values: return { 'value_type': 'unknown', 'value_range': {}, 'sample_values': [], 'data_frequency': 'unknown' } # Determine value type if all(val == int(val) for val in values[:100]): # Check first 100 value_type = "integer" else: value_type = "float" # Calculate statistics value_range = { 'min': min(values), 'max': max(values), 'avg': statistics.mean(values), 'count': len(values) } if len(values) > 1: 
value_range['std'] = statistics.stdev(values) # Sample values sample_values = values[:20] # First 20 values # Estimate data frequency based on number of data points if len(values) > 1000: data_frequency = "high" elif len(values) > 100: data_frequency = "medium" else: data_frequency = "low" return { 'value_type': value_type, 'value_range': value_range, 'sample_values': sample_values, 'data_frequency': data_frequency } def detect_metric_type(self, metric_name: str, metric_data: List[Dict[str, Any]], first_row: Dict[str, Any]) -> str: """Detect metric type based on multiple indicators.""" # Check explicit type field first explicit_type = first_row.get('metricType') or first_row.get('type', '') if explicit_type and explicit_type != 'unknown': return explicit_type.lower() # Check for tdigest field (indicates histogram/percentile metric) if any('tdigestValue' in row and row['tdigestValue'] for row in metric_data[:5]): return 'tdigest' # Pattern-based detection from metric name metric_lower = metric_name.lower() # Histogram patterns if any(pattern in metric_lower for pattern in ['_bucket', '_histogram', 'duration_', 'latency_', '_lg_']): return 'histogram' # Counter patterns if any(pattern in metric_lower for pattern in ['_total', '_count', '_sum', 'requests_', 'errors_']): return 'counter' # Gauge patterns if any(pattern in metric_lower for pattern in ['_current', '_usage', '_utilization', 'memory_', 'cpu_']): return 'gauge' # Default to gauge for unknown patterns return 'gauge' def get_metric_type_info(self, metric_type: str) -> Dict[str, str]: """Get metadata about the metric type for analysis context.""" type_info = { 'tdigest': { 'description': 'Distribution metric (latency, duration) - use tdigest functions', 'typical_aggregations': 'tdigest_quantile() for percentiles, tdigest_combine() with align', 'common_use_cases': 'Latency analysis, performance monitoring, SLA tracking' }, 'histogram': { 'description': 'Histogram metric - use tdigest functions for percentiles', 'typical_aggregations': 'tdigest_quantile() for percentiles, tdigest_combine() with align', 'common_use_cases': 'Response time distributions, resource usage patterns' }, 'counter': { 'description': 'Counter metric (monotonically increasing) - use rate() function', 'typical_aggregations': 'rate() for rate calculation, sum() for totals', 'common_use_cases': 'Request counts, error counts, throughput analysis' }, 'gauge': { 'description': 'Gauge metric (point-in-time values) - use avg() or latest values', 'typical_aggregations': 'avg(), min(), max() for current values', 'common_use_cases': 'Resource utilization, queue lengths, current state' } } return type_info.get(metric_type, { 'description': f'Unknown metric type: {metric_type}', 'typical_aggregations': 'sum(), avg(), count() based on use case', 'common_use_cases': 'General metric analysis' }) def expand_metric_keywords(self, metric_name: str) -> Set[str]: """Expand keywords from metric name for better matching.""" expanded_keywords = set() # Add original words name_lower = metric_name.lower() words = name_lower.replace('/', ' ').replace('-', ' ').replace('_', ' ').replace('.', ' ').split() expanded_keywords.update(words) # Add common metric abbreviations and expansions keyword_expansions = { 'cpu': ['cpu', 'processor', 'compute', 'utilization'], 'mem': ['memory', 'mem', 'ram'], 'memory': ['memory', 'mem', 'ram'], 'disk': ['disk', 'storage', 'volume', 'filesystem'], 'net': ['network', 'net', 'bandwidth', 'traffic'], 'network': ['network', 'net', 'bandwidth', 'traffic'], 'req': 
['request', 'req', 'http'], 'request': ['request', 'req', 'http'], 'resp': ['response', 'resp', 'http'], 'response': ['response', 'resp', 'http'], 'err': ['error', 'err', 'failure', 'exception'], 'error': ['error', 'err', 'failure', 'exception'], 'latency': ['latency', 'duration', 'time', 'delay'], 'duration': ['duration', 'latency', 'time', 'delay'], 'throughput': ['throughput', 'rate', 'qps', 'rps'], 'rate': ['rate', 'throughput', 'qps', 'rps'], 'qps': ['qps', 'queries', 'rate', 'throughput'], 'rps': ['rps', 'requests', 'rate', 'throughput'], 'http': ['http', 'web', 'request', 'response'], 'web': ['web', 'http', 'request', 'response'], 'db': ['database', 'db', 'sql', 'query'], 'database': ['database', 'db', 'sql', 'query'], 'k8s': ['kubernetes', 'k8s', 'container', 'pod'], 'kubernetes': ['kubernetes', 'k8s', 'container', 'pod'], 'container': ['container', 'pod', 'docker', 'k8s'], 'pod': ['pod', 'container', 'kubernetes', 'k8s'], 'svc': ['service', 'svc', 'app'], 'service': ['service', 'svc', 'app', 'application'], 'app': ['application', 'app', 'service'], 'application': ['application', 'app', 'service'], 'total': ['total', 'sum', 'count', 'aggregate'], 'count': ['count', 'total', 'number'], 'sum': ['sum', 'total', 'aggregate'], 'avg': ['average', 'avg', 'mean'], 'average': ['average', 'avg', 'mean'], 'min': ['minimum', 'min', 'lowest'], 'max': ['maximum', 'max', 'highest'], 'p95': ['percentile95', 'p95', '95th'], 'p99': ['percentile99', 'p99', '99th'], } # Apply expansions for word in list(expanded_keywords): if word in keyword_expansions: expanded_keywords.update(keyword_expansions[word]) return expanded_keywords def expand_keywords(self, name_lower: str) -> Set[str]: """Expand keywords from metric name for better matching.""" expanded_keywords = set() # Add original words words = name_lower.replace('/', ' ').replace('-', ' ').replace('_', ' ').split() expanded_keywords.update(words) # Add common abbreviations and expansions keyword_expansions = { 'k8s': ['kubernetes', 'k8s', 'kube'], 'kubernetes': ['kubernetes', 'k8s', 'kube', 'container', 'pod'], 'app': ['application', 'app', 'service'], 'application': ['application', 'app', 'service'], 'db': ['database', 'db', 'sql'], 'database': ['database', 'db', 'sql'], 'auth': ['authentication', 'auth', 'security'], 'svc': ['service', 'svc', 'app'], 'service': ['service', 'svc', 'app', 'application'], 'otel': ['opentelemetry', 'otel', 'trace', 'span'], 'opentelemetry': ['opentelemetry', 'otel', 'trace', 'span'], 'log': ['logs', 'log', 'logging'], 'logs': ['logs', 'log', 'logging'], 'metric': ['metrics', 'metric', 'monitoring'], 'metrics': ['metrics', 'metric', 'monitoring'], 'infra': ['infrastructure', 'infra', 'system'], 'infrastructure': ['infrastructure', 'infra', 'system'], 'host': ['host', 'node', 'server', 'machine'], 'node': ['node', 'host', 'server', 'machine'], 'network': ['network', 'net', 'connection', 'traffic'], 'net': ['network', 'net', 'connection'], 'user': ['user', 'customer', 'session'], 'error': ['error', 'exception', 'failure', 'issue'], 'perf': ['performance', 'perf', 'latency', 'speed'], 'performance': ['performance', 'perf', 'latency', 'speed'], } # Apply expansions for word in list(expanded_keywords): if word in keyword_expansions: expanded_keywords.update(keyword_expansions[word]) return expanded_keywords def categorize_metric_with_enhanced_matching(self, metric_name: str, expanded_keywords: Set[str], metric_type: str, dimensions: Dict[str, Any]) -> Tuple[List[str], str]: """Enhanced metric categorization using expanded 
keywords.""" # Enhanced business category matching business_patterns = { "Infrastructure": { 'primary': ['cpu', 'memory', 'disk', 'filesystem', 'host', 'node', 'server', 'system'], 'secondary': ['hardware', 'vm', 'container', 'pod', 'cluster'], 'weight': 3 }, "Application": { 'primary': ['service', 'application', 'app', 'http', 'request', 'response', 'endpoint'], 'secondary': ['api', 'web', 'microservice', 'frontend', 'backend'], 'weight': 3 }, "Database": { 'primary': ['database', 'db', 'sql', 'query', 'transaction', 'connection'], 'secondary': ['table', 'index', 'postgres', 'mysql', 'redis'], 'weight': 3 }, "Network": { 'primary': ['network', 'net', 'bandwidth', 'traffic', 'connection', 'tcp'], 'secondary': ['packet', 'protocol', 'dns', 'load', 'proxy'], 'weight': 2 }, "Storage": { 'primary': ['storage', 'volume', 'disk', 'filesystem', 'file', 'io'], 'secondary': ['backup', 'archive', 'blob', 'object', 'bucket'], 'weight': 2 }, "Monitoring": { 'primary': ['monitor', 'health', 'status', 'check', 'probe'], 'secondary': ['alert', 'notification', 'threshold', 'sla', 'slo'], 'weight': 1 } } business_scores = {} for category, pattern in business_patterns.items(): score = 0 # Primary keywords get higher weight for keyword in pattern['primary']: if keyword in expanded_keywords or keyword in metric_name.lower(): score += pattern['weight'] * 2 # Secondary keywords get lower weight for keyword in pattern['secondary']: if keyword in expanded_keywords or keyword in metric_name.lower(): score += pattern['weight'] business_scores[category] = score # Get business categories - include multiple if they have significant scores business_categories = [] max_score = max(business_scores.values()) if business_scores.values() else 0 if max_score > 0: # Include primary category (highest score) primary_category = max(business_scores, key=business_scores.get) business_categories.append(primary_category) # Include additional categories if they score >= 50% of max score threshold = max_score * 0.5 for category, score in business_scores.items(): if category != primary_category and score >= threshold: business_categories.append(category) # Default to Application if no matches if not business_categories: business_categories = ["Application"] # Enhanced technical category matching technical_patterns = { "Error": { 'keywords': ['error', 'err', 'failure', 'exception', 'fault'], 'metric_types': ['counter'], 'weight': 4 }, "Latency": { 'keywords': ['latency', 'duration', 'time', 'delay', 'response_time'], 'metric_types': ['histogram', 'tdigest', 'gauge'], 'weight': 4 }, "Performance": { 'keywords': ['cpu', 'memory', 'utilization', 'usage', 'load', 'throughput'], 'metric_types': ['gauge', 'counter'], 'weight': 3 }, "Count": { 'keywords': ['count', 'total', 'number', 'requests', 'connections'], 'metric_types': ['counter'], 'weight': 3 }, "Resource": { 'keywords': ['disk', 'storage', 'filesystem', 'volume', 'capacity'], 'metric_types': ['gauge'], 'weight': 2 }, "Throughput": { 'keywords': ['rate', 'qps', 'rps', 'throughput', 'bandwidth'], 'metric_types': ['gauge', 'counter'], 'weight': 3 }, "Availability": { 'keywords': ['health', 'status', 'up', 'down', 'available'], 'metric_types': ['gauge'], 'weight': 2 } } technical_scores = {} for category, pattern in technical_patterns.items(): score = 0 # Keyword matching for keyword in pattern['keywords']: if keyword in expanded_keywords or keyword in metric_name.lower(): score += pattern['weight'] # Metric type matching if metric_type in pattern['metric_types']: score += 
pattern['weight'] technical_scores[category] = score # Get technical category with highest score technical_category = max(technical_scores, key=technical_scores.get) if max(technical_scores.values()) > 0 else "Performance" return business_categories, technical_category def extract_common_fields(self, metric_data: List[Dict[str, Any]]) -> List[str]: """Extract commonly available fields for grouping.""" common_fields = [] if not metric_data: return common_fields # Check for standard service fields sample_row = metric_data[0] if 'service_name' in sample_row: common_fields.append('service_name') if 'span_name' in sample_row: common_fields.append('span_name') if 'environment' in sample_row: common_fields.append('environment') # Check common label fields labels = sample_row.get('labels', {}) if isinstance(labels, dict): for key in ['service', 'instance', 'job', 'method', 'status_code']: if key in labels: common_fields.append(f'labels.{key}') return common_fields async def generate_metric_analysis(self, metric_name: str, metric_data: List[Dict[str, Any]], dataset_name: str, dimensions: Dict[str, Any], values: Dict[str, Any]) -> Dict[str, Any]: """Generate rule-based analysis of the metric for fast processing.""" # Extract any available metadata first_row = metric_data[0] if metric_data else {} metric_type = self.detect_metric_type(metric_name, metric_data, first_row) unit = first_row.get('unit', '') description = first_row.get('description', '') # Rule-based categorization based on metric name patterns metric_lower = metric_name.lower() # Use enhanced categorization with multi-category support expanded_keywords = self.expand_keywords(metric_lower) business_categories, technical_category = self.categorize_metric_with_enhanced_matching( metric_name, expanded_keywords, metric_type, dimensions ) # Generate simple purpose and usage text purpose = f"Tracks {metric_name.replace('_', ' ')} metrics for {dataset_name}" if description: purpose = description primary_business = business_categories[0] if business_categories else "system" usage = f"Monitor {technical_category.lower()} issues, analyze trends, set alerts, troubleshoot {primary_business.lower()} problems" return { "inferred_purpose": purpose, "typical_usage": usage, "business_categories": business_categories, "technical_category": technical_category, "metric_type": metric_type, "metric_type_info": self.get_metric_type_info(metric_type), "common_fields": self.extract_common_fields(metric_data) } async def store_metric_intelligence(self, metric_data: Dict[str, Any]) -> None: """Store metric intelligence in the database.""" async with self.db_pool.acquire() as conn: await conn.execute(""" INSERT INTO metrics_intelligence ( dataset_id, metric_name, dataset_name, dataset_type, workspace_id, metric_type, unit, description, common_dimensions, dimension_cardinality, sample_dimensions, value_type, value_range, sample_values, data_frequency, last_seen, first_seen, inferred_purpose, typical_usage, business_categories, technical_category, common_fields, nested_field_paths, nested_field_analysis, excluded, exclusion_reason, confidence_score, last_analyzed ) VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28 ) ON CONFLICT (dataset_id, metric_name) DO UPDATE SET dataset_name = EXCLUDED.dataset_name, dataset_type = EXCLUDED.dataset_type, workspace_id = EXCLUDED.workspace_id, metric_type = EXCLUDED.metric_type, unit = EXCLUDED.unit, description = EXCLUDED.description, 
common_dimensions = EXCLUDED.common_dimensions, dimension_cardinality = EXCLUDED.dimension_cardinality, sample_dimensions = EXCLUDED.sample_dimensions, value_type = EXCLUDED.value_type, value_range = EXCLUDED.value_range, sample_values = EXCLUDED.sample_values, data_frequency = EXCLUDED.data_frequency, last_seen = EXCLUDED.last_seen, inferred_purpose = EXCLUDED.inferred_purpose, typical_usage = EXCLUDED.typical_usage, business_categories = EXCLUDED.business_categories, technical_category = EXCLUDED.technical_category, common_fields = EXCLUDED.common_fields, nested_field_paths = EXCLUDED.nested_field_paths, nested_field_analysis = EXCLUDED.nested_field_analysis, excluded = EXCLUDED.excluded, exclusion_reason = EXCLUDED.exclusion_reason, confidence_score = EXCLUDED.confidence_score, last_analyzed = EXCLUDED.last_analyzed """, *[ metric_data['dataset_id'], metric_data['metric_name'], metric_data['dataset_name'], metric_data['dataset_type'], metric_data['workspace_id'], metric_data['metric_type'], metric_data['unit'], metric_data['description'], json.dumps(metric_data['common_dimensions']), json.dumps(metric_data['dimension_cardinality']), json.dumps(metric_data['sample_dimensions']), metric_data['value_type'], json.dumps(metric_data['value_range']), list(metric_data['sample_values']) if metric_data['sample_values'] else [], metric_data['data_frequency'], metric_data['last_seen'], metric_data['first_seen'], metric_data['inferred_purpose'], metric_data['typical_usage'], json.dumps(metric_data['business_categories']), metric_data['technical_category'], metric_data['common_fields'], json.dumps(metric_data['nested_field_paths']) if metric_data.get('nested_field_paths') else None, json.dumps(metric_data['nested_field_analysis']) if metric_data.get('nested_field_analysis') else None, metric_data['excluded'], metric_data['exclusion_reason'], metric_data['confidence_score'], datetime.now() ]) async def check_dataset_has_data(self, dataset_id: str, dataset_type: str) -> bool: """Check if a dataset has any data over the last 24 hours.""" # Simple query to check for any data query = "limit 1" url = f"https://{self.observe_customer_id}.{self.observe_domain}/v1/meta/export/query" payload = { "query": { "stages": [ { "input": [ { "inputName": "main", "datasetId": dataset_id } ], "stageID": "query_stage", "pipeline": query } ] }, "rowCount": "1" } # Check 24-hour window for data params = {"interval": "24h"} async def _check_data(): logger.debug(f"checking for data in dataset {dataset_id} (type: {dataset_type}) over 24h") await self.rate_limit_observe() response = await self.http_client.post(url, json=payload, params=params) if response.status_code != 200: logger.debug(f"Data check failed for dataset {dataset_id}: {response.status_code}") return False # Check if response has any data content_type = response.headers.get('content-type', '') response_text = response.text if not response_text or len(response_text.strip()) == 0: logger.debug(f"Dataset {dataset_id} has no data: empty response") return False # Check for permission/access errors in response body # Observe API returns 200 with error messages for inaccessible datasets error_indicators = [ 'cannot be bound', 'is not accessible', 'permission denied', 'not authorized' ] response_lower = response_text.lower() for indicator in error_indicators: if indicator.lower() in response_lower: logger.warning(f"Dataset {dataset_id} access denied (check API token permissions): {response_text[:200]}") return False if 'text/csv' in content_type: csv_data = 
response_text.strip() lines = csv_data.split('\n') # Has data if more than just header has_data = len(lines) > 1 and len(lines[1].strip()) > 0 logger.debug(f"Dataset {dataset_id} has data: {has_data} (CSV lines: {len(lines)})") return has_data elif 'application/x-ndjson' in content_type or 'application/json' in content_type: # Handle NDJSON format ndjson_data = response_text.strip() lines = [line.strip() for line in ndjson_data.split('\n') if line.strip()] if lines: # Check if we have actual data try: first_obj = json.loads(lines[0]) has_data = len(lines) > 0 logger.debug(f"Dataset {dataset_id} has data: {has_data} (NDJSON lines: {len(lines)})") return has_data except json.JSONDecodeError as e: logger.debug(f"Failed to parse NDJSON for dataset {dataset_id}: {e}") return False else: logger.debug(f"Dataset {dataset_id} has no data: empty NDJSON response") return False else: logger.debug(f"Dataset {dataset_id} unexpected content type: {content_type}") return False try: return await self.retry_with_backoff(_check_data) except Exception as e: logger.warning(f"Failed to check data for dataset {dataset_id} after retries: {e}") return False # Default to excluding dataset if we can't verify it has data async def analyze_dataset(self, dataset: Dict[str, Any]) -> None: """Analyze a single metrics dataset and discover all metrics within it.""" # Extract dataset information meta = dataset.get('meta', {}) config = dataset.get('config', {}) state = dataset.get('state', {}) dataset_id = meta.get('id', '').replace('o::', '').split(':')[-1] dataset_name = config.get('name', '') dataset_type = state.get('kind', '') workspace_id = meta.get('workspaceId', '') if not dataset_id or not dataset_name: logger.warning(f"Skipping dataset with empty ID or name: id='{dataset_id}', name='{dataset_name}'") self.stats['datasets_failed'] += 1 return logger.info(f"Analyzing metrics dataset: {dataset_name} ({dataset_id})") try: # Check if dataset has any data before proceeding logger.info(f"Dataset {dataset_name} checking for data availability") has_data = await self.check_dataset_has_data(dataset_id, dataset_type) if not has_data: logger.info(f"Dataset {dataset_name} has no data - skipping") self.stats['datasets_skipped'] += 1 return logger.info(f"Dataset {dataset_name} has data - analyzing metrics") # Discover all unique metrics in this dataset # Try multiple query variations to handle different dataset structures metrics_queries = [ # Primary: Standard metrics dataset with timestamp field """ filter metric != "" | statsby count:count(), latest_timestamp:max(timestamp), group_by(metric) | sort asc(metric) """, # Fallback 1: Dataset with 'time' field instead of 'timestamp' """ filter metric != "" | statsby count:count(), latest_time:max(time), group_by(metric) | sort asc(metric) """, # Fallback 2: Try '__metric' field (some datasets use this) """ filter __metric != "" | statsby count:count(), group_by(__metric) | sort asc(__metric) """, # Fallback 3: Try 'name' field """ filter name != "" | statsby count:count(), group_by(name) | sort asc(name) """, # Fallback 4: Simple grouping without filtering """ statsby count:count(), group_by(metric) | sort asc(metric) | filter metric != "" """, # Fallback 5: Most basic - just group by any metric-like field """ limit 1000 """ ] metrics_list = None for i, metrics_query in enumerate(metrics_queries): logger.debug(f"Trying metrics query variation {i+1} for dataset {dataset_id}") metrics_list = await self.execute_opal_query(dataset_id, metrics_query, "24h") if metrics_list: 
logger.debug(f"Query variation {i+1} succeeded for dataset {dataset_id}") # If this was the raw data fallback, extract unique metrics manually if i == len(metrics_queries) - 1: # Last fallback query unique_metrics = set() for row in metrics_list: for field in ['metric', '__metric', 'name', 'metricName']: if field in row and row[field]: unique_metrics.add(row[field]) # Convert to expected format if unique_metrics: metrics_list = [{'metric': m} for m in sorted(unique_metrics)] else: metrics_list = None continue break else: logger.debug(f"Query variation {i+1} failed for dataset {dataset_id}") # Small delay between attempts await asyncio.sleep(0.5) if not metrics_list: logger.warning(f"No metrics found in dataset {dataset_name}") self.stats['datasets_skipped'] += 1 return logger.info(f"Discovered {len(metrics_list)} unique metrics in {dataset_name}") self.stats['metrics_discovered'] += len(metrics_list) # Bulk validate metrics with recent data (more efficient) logger.info(f"Validating data availability for all metrics in {dataset_name}") metrics_with_data = await self.get_metrics_with_data(dataset_id) # Filter metrics list to only those with recent data valid_metrics = [] for metric_info in metrics_list: metric_name = metric_info.get('metric', '') if metric_name in metrics_with_data: valid_metrics.append(metric_info) elif metric_name: # Metric discovered but has no recent data logger.info(f"Excluding metric {metric_name}: no data in last 240m ({metrics_with_data.get(metric_name, 0)} data points)") self.stats['metrics_excluded'] += 1 # Store the exclusion in database await self.store_excluded_metric(dataset_id, dataset_name, metric_name, "no recent data", "No data points in last 240m") if not valid_metrics: logger.warning(f"No metrics with recent data found in {dataset_name}") self.stats['datasets_skipped'] += 1 return logger.info(f"Processing {len(valid_metrics)} metrics with recent data (excluded {len(metrics_list) - len(valid_metrics)} empty metrics)") # OPTIMIZATION: Fetch sample data for all metrics using statsby to ensure we get all metrics logger.info(f"Fetching sample data for all valid metrics (much faster than individual queries)") # Use make_table + dedup to get one representative row per metric with all fields preserved # This is much better than statsby which drops fields with any() functions bulk_query = """ filter metric != "" | make_table | dedup metric """ logger.debug(f"Executing bulk dedup query to get sample data for all metrics") all_metric_samples = await self.execute_opal_query(dataset_id, bulk_query, "240m") if not all_metric_samples: logger.warning(f"No bulk data found for any metrics in {dataset_name}") self.stats['datasets_skipped'] += 1 return logger.info(f"Retrieved sample data for {len(all_metric_samples)} metrics via bulk query") # Filter to only metrics that passed our validation valid_metric_names_set = {m.get('metric', '') for m in valid_metrics if m.get('metric')} valid_samples = [] for sample in all_metric_samples: metric_name = sample.get('metric', '') if metric_name in valid_metric_names_set: valid_samples.append(sample) logger.info(f"Found {len(valid_samples)} valid metrics in bulk sample data") if not valid_samples: logger.warning(f"No valid metrics found in bulk sample data for {dataset_name}") self.stats['datasets_skipped'] += 1 return # For metrics that need detailed analysis, fetch more data efficiently # Use a smaller batch approach for metrics that need full analysis batch_size = 20 # Process metrics in batches to balance efficiency vs. 
query size for batch_start in range(0, len(valid_samples), batch_size): batch_samples = valid_samples[batch_start:batch_start + batch_size] batch_metrics = [s['metric'] for s in batch_samples] logger.info(f"Processing batch {batch_start//batch_size + 1}/{(len(valid_samples) + batch_size - 1)//batch_size} ({len(batch_metrics)} metrics)") # Fetch detailed data for this batch if len(batch_metrics) <= 10: # Safe size for filter metric in batch_query = f""" filter metric in ({', '.join([f'"{name}"' for name in batch_metrics])}) | limit 1000 """ else: # Fall back to individual queries for large batches batch_query = None if batch_query: batch_detailed_data = await self.execute_opal_query(dataset_id, batch_query, "240m") # Group batch data by metric batch_data_grouped = {} if batch_detailed_data: for row in batch_detailed_data: metric_name = row.get('metric', '') if metric_name in batch_metrics: if metric_name not in batch_data_grouped: batch_data_grouped[metric_name] = [] batch_data_grouped[metric_name].append(row) else: batch_data_grouped = {} # Process each metric in this batch for i, sample in enumerate(batch_samples): metric_name = sample.get('metric', '') if not metric_name: continue global_index = batch_start + i + 1 logger.info(f"Processing metric {global_index}/{len(valid_samples)}: {metric_name}") # Check if metric needs update (skip if analyzed recently) needs_update = await self.check_metric_needs_update(dataset_id, metric_name, 0) if not needs_update: continue # Use batch data if available, otherwise fall back to sample data metric_data = batch_data_grouped.get(metric_name, []) # If no detailed data available, use the full sample row (dedup preserves all fields) if not metric_data: metric_data = [sample] # sample already has all original fields intact if not metric_data: logger.debug(f"No data found for metric {metric_name}") continue logger.debug(f"Analyzing {len(metric_data)} data points for {metric_name}") # Analyze dimensions and values dimensions = await self.analyze_metric_dimensions(metric_data) values = await self.analyze_metric_values(metric_data) # Check for high cardinality dimensions high_card_dims = [k for k, v in dimensions['dimension_cardinality'].items() if v > self.HIGH_CARDINALITY_THRESHOLD] if high_card_dims: logger.warning(f"High cardinality dimensions in {metric_name}: {high_card_dims}") self.stats['high_cardinality_metrics'] += 1 # Generate rule-based analysis logger.debug(f"Generating analysis for {metric_name}") analysis = await self.generate_metric_analysis( metric_name, metric_data, dataset_name, dimensions, values ) # Determine timestamps (handle both ISO format and nanoseconds since epoch) timestamps = [] for row in metric_data: if 'timestamp' not in row: continue try: ts = row['timestamp'] if isinstance(ts, str): # Try ISO format first if 'T' in ts or 'Z' in ts: timestamps.append(datetime.fromisoformat(ts.replace('Z', '+00:00'))) else: # Try parsing as nanoseconds since epoch timestamps.append(datetime.fromtimestamp(int(ts) / 1_000_000_000)) else: # Assume it's nanoseconds since epoch timestamps.append(datetime.fromtimestamp(int(ts) / 1_000_000_000)) except (ValueError, TypeError) as e: logger.debug(f"Failed to parse timestamp {row.get('timestamp')}: {e}") continue first_seen = min(timestamps) if timestamps else datetime.now() last_seen = max(timestamps) if timestamps else datetime.now() # Store metric intelligence await self.store_metric_intelligence({ 'dataset_id': dataset_id, 'metric_name': metric_name, 'dataset_name': dataset_name, 'dataset_type': 
dataset_type, 'workspace_id': workspace_id, 'metric_type': analysis['metric_type'], 'unit': metric_data[0].get('unit', ''), 'description': metric_data[0].get('description', ''), 'common_dimensions': dimensions['common_dimensions'], 'dimension_cardinality': dimensions['dimension_cardinality'], 'sample_dimensions': dimensions['sample_dimensions'], 'value_type': values['value_type'], 'value_range': values['value_range'], 'sample_values': values['sample_values'], 'data_frequency': values['data_frequency'], 'last_seen': last_seen, 'first_seen': first_seen, 'inferred_purpose': analysis['inferred_purpose'], 'typical_usage': analysis['typical_usage'], 'business_categories': analysis['business_categories'], 'technical_category': analysis['technical_category'], 'excluded': False, 'exclusion_reason': None, 'confidence_score': 1.0, 'common_fields': analysis['common_fields'] }) self.stats['metrics_processed'] += 1 logger.info(f"Successfully analyzed metric: {metric_name}") # Brief pause for database operations await asyncio.sleep(0.01) self.stats['datasets_processed'] += 1 logger.info(f"Successfully analyzed dataset: {dataset_name}") except Exception as e: logger.error(f"Failed to analyze dataset {dataset_name}: {e}") self.stats['datasets_failed'] += 1 async def analyze_all_metrics(self, limit: Optional[int] = None) -> None: """Analyze all metrics datasets from Observe.""" # Print startup banner logger.info("") logger.info("╔═══════════════════════════════════════════════════════════════╗") logger.info("║ 📊 Metrics Intelligence Analyzer ║") logger.info("║ ║") logger.info("║ Analyzing Observe metrics for semantic search discovery ║") if self.force_mode: logger.info("║ 🧹 FORCE MODE ENABLED 🧹 ║") logger.info("╚═══════════════════════════════════════════════════════════════╝") logger.info("") logger.info("🚀 Starting metrics analysis...") # Fetch all metrics datasets datasets = await self.fetch_metrics_datasets() if limit: datasets = datasets[:limit] logger.info(f"Limited analysis to {limit} datasets") # Process datasets for i, dataset in enumerate(datasets): logger.info(f"Progress: {i+1}/{len(datasets)}") await self.analyze_dataset(dataset) # Add delay to avoid overwhelming APIs await asyncio.sleep(1.0) logger.info("Metrics analysis completed") self.print_statistics() def print_statistics(self) -> None: """Print analysis statistics.""" logger.info("") logger.info("╔═══════════════════════════════════════════════════════════════╗") logger.info("║ 📈 Metrics Analysis Statistics ║") logger.info("╠═══════════════════════════════════════════════════════════════╣") logger.info(f"║ Datasets processed: {self.stats['datasets_processed']:>35} ║") logger.info(f"║ Datasets skipped: {self.stats['datasets_skipped']:>37} ║") logger.info(f"║ Datasets failed: {self.stats['datasets_failed']:>38} ║") logger.info(f"║ Metrics discovered: {self.stats['metrics_discovered']:>35} ║") logger.info(f"║ Metrics processed: {self.stats['metrics_processed']:>36} ║") logger.info(f"║ Metrics skipped: {self.stats['metrics_skipped']:>38} ║") logger.info(f"║ Metrics excluded: {self.stats['metrics_excluded']:>37} ║") logger.info(f"║ High cardinality metrics: {self.stats['high_cardinality_metrics']:>27} ║") logger.info("╚═══════════════════════════════════════════════════════════════╝") logger.info("") async def clear_database(self) -> None: """Clear all data from metrics_intelligence table for fresh start.""" async with self.db_pool.acquire() as conn: # Clear the main table and reset sequence (TRUNCATE is faster and resets SERIAL sequences) 
result = await conn.execute("TRUNCATE TABLE metrics_intelligence RESTART IDENTITY CASCADE") logger.info(f"🧹 Cleared all records from metrics_intelligence table and reset ID sequence") # Force refresh of materialized views and indexes to clear any cached data # This ensures search functions return fresh results try: await conn.execute("VACUUM ANALYZE metrics_intelligence") await conn.execute("REINDEX TABLE metrics_intelligence") logger.info("🧹 Refreshed indexes and statistics") except Exception as e: logger.warning(f"Failed to refresh indexes: {e} (non-critical)") # Clear any potential connection-level query cache await conn.execute("DISCARD ALL") logger.info("🧹 Cleared connection cache") async def cleanup(self) -> None: """Cleanup resources.""" if self.http_client: await self.http_client.aclose() if self.db_pool: await self.db_pool.close() async def main(): parser = argparse.ArgumentParser(description="Analyze Observe metrics for semantic search") parser.add_argument('--limit', type=int, help='Limit number of datasets to analyze') parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') parser.add_argument('--force', action='store_true', help='Force clean database and reprocess all metrics from scratch') args = parser.parse_args() # Configure colored logging handler = logging.StreamHandler() formatter = ColoredFormatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) handler.setFormatter(formatter) # Set up root logger root_logger = logging.getLogger() root_logger.handlers.clear() root_logger.addHandler(handler) root_logger.setLevel(logging.INFO) if args.verbose: root_logger.setLevel(logging.DEBUG) # Reduce noise from HTTP libraries unless in verbose mode if not args.verbose: logging.getLogger('httpx').setLevel(logging.WARNING) logging.getLogger('httpcore').setLevel(logging.WARNING) analyzer = MetricsIntelligenceAnalyzer() try: await analyzer.initialize_database() # Set force mode analyzer.force_mode = args.force # Clear database if force mode is enabled if args.force: logger.info("🧹 Force mode enabled - clearing metrics database...") await analyzer.clear_database() await analyzer.analyze_all_metrics(limit=args.limit) except KeyboardInterrupt: logger.info("Analysis interrupted by user") except Exception as e: logger.error(f"Analysis failed: {e}") raise finally: await analyzer.cleanup() if __name__ == "__main__": asyncio.run(main())
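Below is a minimal, hypothetical sketch of driving the analyzer programmatically instead of via the CLI; it simply mirrors what main() does above. It assumes the script is importable as metrics_intelligence from the working directory, and that the environment variables the script reads via os.getenv() (OBSERVE_CUSTOMER_ID, OBSERVE_TOKEN, OBSERVE_DOMAIN, SEMANTIC_GRAPH_PASSWORD, and optionally POSTGRES_HOST/PORT/DB/USER) are already set, for example in a .env file picked up by load_dotenv().

# Hypothetical driver script; uses only methods defined in metrics_intelligence.py.
import asyncio

from metrics_intelligence import MetricsIntelligenceAnalyzer


async def run_sample_analysis():
    analyzer = MetricsIntelligenceAnalyzer()
    try:
        # Connects to Postgres and applies sql/metrics_intelligence_schema.sql if needed.
        await analyzer.initialize_database()

        # Optional: behave like --force (wipe metrics_intelligence and re-analyze everything).
        # analyzer.force_mode = True
        # await analyzer.clear_database()

        # Analyze only the first few metric datasets, equivalent to --limit 5 on the CLI.
        await analyzer.analyze_all_metrics(limit=5)
    finally:
        # Closes the shared httpx client and the asyncpg connection pool.
        await analyzer.cleanup()


if __name__ == "__main__":
    asyncio.run(run_sample_analysis())

The command-line equivalent is python metrics_intelligence.py --limit 5, with --verbose for debug logging or --force to clear the table and reprocess everything from scratch.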
