Skip to main content
Glama

MCP-Ambari-API

functions.pyβ€’69.8 kB
""" Utility functions for Ambari API operations. This module contains helper functions used by the main Ambari API tools. """ import os import aiohttp import json import datetime import logging import time from datetime import timedelta from typing import Dict, Optional, Tuple, List, Any, Iterable, Set import re from base64 import b64encode from functools import wraps # Set up logging logger = logging.getLogger("AmbariService") # ----------------------------------------------------------------------------- # Decorator for uniform tool call logging # ----------------------------------------------------------------------------- def log_tool(func): """Decorator for uniform tool call logging with timing and result categorization.""" tool_name = func.__name__ @wraps(func) async def wrapper(*args, **kwargs): start = time.monotonic() # Avoid logging huge argument content; simple key=... pairs try: arg_preview = [] if kwargs: for k, v in kwargs.items(): if v is None: continue sv = str(v) if len(sv) > 120: sv = sv[:117] + '…' arg_preview.append(f"{k}={sv}") logger.info(f"TOOL START {tool_name} {' '.join(arg_preview)}") result = await func(*args, **kwargs) duration_ms = (time.monotonic() - start) * 1000 # Categorize result if isinstance(result, str) and result.startswith("Error:"): logger.warning(f"TOOL ERROR_RETURN {tool_name} took={duration_ms:.1f}ms len={len(result)}") elif isinstance(result, str) and result.startswith("[ERROR]"): logger.warning(f"TOOL ERROR_RETURN {tool_name} took={duration_ms:.1f}ms len={len(result)}") else: logger.info(f"TOOL SUCCESS {tool_name} took={duration_ms:.1f}ms len={len(result) if hasattr(result,'__len__') else 'NA'}") return result except Exception: duration_ms = (time.monotonic() - start) * 1000 logger.exception(f"TOOL EXCEPTION {tool_name} failed after {duration_ms:.1f}ms") raise return wrapper # Ambari API connection information environment variable settings AMBARI_HOST = os.environ.get("AMBARI_HOST", "localhost") AMBARI_PORT = os.environ.get("AMBARI_PORT", "8080") AMBARI_USER = os.environ.get("AMBARI_USER", "admin") AMBARI_PASS = os.environ.get("AMBARI_PASS", "admin") AMBARI_CLUSTER_NAME = os.environ.get("AMBARI_CLUSTER_NAME", "c1") # AMBARI API base URL configuration AMBARI_API_BASE_URL = f"http://{AMBARI_HOST}:{AMBARI_PORT}/api/v1" # Ambari Metrics (AMS) connection settings AMBARI_METRICS_HOST = os.environ.get("AMBARI_METRICS_HOST", AMBARI_HOST) AMBARI_METRICS_PORT = os.environ.get("AMBARI_METRICS_PORT", os.environ.get("AMBARI_METRICS_COLLECTOR_PORT", "6188")) AMBARI_METRICS_PROTOCOL = os.environ.get("AMBARI_METRICS_PROTOCOL", "http") AMBARI_METRICS_BASE_URL = f"{AMBARI_METRICS_PROTOCOL}://{AMBARI_METRICS_HOST}:{AMBARI_METRICS_PORT}/ws/v1/timeline" AMBARI_METRICS_TIMEOUT = float(os.environ.get("AMBARI_METRICS_TIMEOUT", "10")) PRECISION_RULES_MS = ( (6 * 3600 * 1000, "seconds"), # up to ~6 hours β†’ seconds resolution (7 * 24 * 3600 * 1000, "minutes"), # up to ~7 days β†’ minutes (31 * 24 * 3600 * 1000, "hours"), # up to ~1 month β†’ hours (float("inf"), "days"), # beyond β†’ days ) # Cached metadata settings AMBARI_METRICS_METADATA_TTL = float(os.environ.get("AMBARI_METRICS_METADATA_TTL", "300")) # Default appIds we prioritise when metadata discovery yields no entries. CURATED_METRIC_APP_IDS = [ "ambari_server", "namenode", "datanode", "nodemanager", "resourcemanager", ] # Synonyms used to resolve user-provided appId hints to canonical AMS identifiers. APP_SYNONYMS: Dict[str, Tuple[str, ...]] = { "HOST": ("host", "hardware", "system"), "ambari_server": ("ambari", "server", "ambari_server"), "namenode": ("namenode", "hdfs", "nn", "name node"), "datanode": ("datanode", "dn", "data node"), "nodemanager": ("nodemanager", "nm", "node manager"), "resourcemanager": ("resourcemanager", "rm", "resource manager", "yarn"), } # AppIds that should not surface in catalog listings (internal collectors, smoke tests, etc.). EXCLUDED_APP_IDS = {"ams-hbase", "amssmoketestfake"} # Cache grouping of appId β†’ metric name list built from AMS metadata responses. _DYNAMIC_CATALOG_CACHE: Dict[str, Any] = { "timestamp": 0.0, "catalog": {}, # str (appId as returned by AMS) -> List[str] "lookup": {}, # str (lowercase appId) -> canonical appId } # Private caches for metadata responses (key β†’ {timestamp, entries}) _METRICS_METADATA_CACHE: Dict[str, Dict[str, Any]] = {} # Tokenization helper (split on non-alphanumeric) _TOKEN_SPLIT_RE = re.compile(r"[^a-z0-9]+") def format_timestamp(timestamp, is_milliseconds=True): """Convert timestamp to human readable format with original value in parentheses""" if not timestamp: return "N/A" try: # Handle string timestamps by converting to int first if isinstance(timestamp, str): try: timestamp = int(timestamp) except ValueError: return f"{timestamp} (Invalid timestamp format)" # If timestamp is in milliseconds, divide by 1000 if is_milliseconds: dt = datetime.datetime.fromtimestamp(timestamp / 1000, tz=datetime.timezone.utc) else: dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) formatted_time = dt.strftime('%Y-%m-%d %H:%M:%S UTC') return f"{timestamp} ({formatted_time})" except (ValueError, OSError, TypeError) as e: return f"{timestamp} (Invalid timestamp)" def safe_timestamp_compare(timestamp, threshold, operator='>='): """ Safe timestamp comparison handling both str and int types. Args: timestamp: The timestamp to compare (can be str or int) threshold: The threshold to compare against (can be str or int) operator: Comparison operator ('>', '>=', '<', '<=') Returns: bool: Result of comparison, False if conversion fails """ try: # Convert string to int if needed if isinstance(timestamp, str): timestamp = int(timestamp) if isinstance(threshold, str): threshold = int(threshold) if operator == '>': return timestamp > threshold elif operator == '>=': return timestamp >= threshold elif operator == '<': return timestamp < threshold elif operator == '<=': return timestamp <= threshold else: return False except (ValueError, TypeError): return False async def make_ambari_request(endpoint: str, method: str = "GET", data: Optional[Dict] = None) -> Dict: """ Sends HTTP requests to Ambari API. Args: endpoint: API endpoint (e.g., "/clusters/c1/services") method: HTTP method (default: "GET") data: Request payload for PUT/POST requests Returns: API response data (JSON format) or {"error": "error_message"} on error """ start = time.monotonic() logger.debug(f"AMBARI_REQ start method={method} endpoint={endpoint} payload_keys={list(data.keys()) if data else []}") try: auth_string = f"{AMBARI_USER}:{AMBARI_PASS}" auth_bytes = auth_string.encode('ascii') auth_b64 = b64encode(auth_bytes).decode('ascii') headers = { 'Authorization': f'Basic {auth_b64}', 'Content-Type': 'application/json' } url = f"{AMBARI_API_BASE_URL}{endpoint}" async with aiohttp.ClientSession() as session: kwargs = {'headers': headers} if data: kwargs['data'] = json.dumps(data) async with session.request(method, url, **kwargs) as response: elapsed = (time.monotonic() - start) * 1000 if response.status in [200, 202]: # Accept both OK and Accepted try: js = await response.json() except Exception as je: text_body = await response.text() logger.warning(f"AMBARI_REQ json-parse-fallback status={response.status} took={elapsed:.1f}ms endpoint={endpoint} err={je}") return {"error": f"JSON_PARSE: {je}", "raw": text_body} size_hint = len(str(js)) if js else 0 logger.debug(f"AMBARI_REQ success status={response.status} took={elapsed:.1f}ms size={size_hint}") return js else: error_text = await response.text() logger.warning(f"AMBARI_REQ http-error status={response.status} took={elapsed:.1f}ms endpoint={endpoint} body_len={len(error_text)}") return {"error": f"HTTP {response.status}: {error_text}"} except Exception as e: elapsed = (time.monotonic() - start) * 1000 logger.exception(f"AMBARI_REQ exception took={elapsed:.1f}ms endpoint={endpoint}") return {"error": f"Request failed: {str(e)}"} def _normalize_epoch_ms(value: float) -> int: """Normalize a timestamp (seconds or milliseconds) to milliseconds.""" try: ts = float(value) except (TypeError, ValueError): raise ValueError("Invalid timestamp value") if ts >= 1_000_000_000_000: return int(ts) return int(ts * 1000) def parse_epoch_millis(value: Optional[str]) -> Optional[int]: """Parse various timestamp formats into epoch milliseconds.""" if value is None: return None raw = str(value).strip() if not raw: return None if re.fullmatch(r"[-+]?\d+(?:\.\d+)?", raw): try: num = float(raw) except ValueError: return None return _normalize_epoch_ms(num) iso_candidate = raw if iso_candidate.endswith('Z'): iso_candidate = iso_candidate[:-1] + '+00:00' try: dt = datetime.datetime.fromisoformat(iso_candidate) except ValueError: try: dt = datetime.datetime.strptime(raw, "%Y-%m-%d %H:%M:%S") dt = dt.replace(tzinfo=datetime.timezone.utc) except ValueError: return None if dt.tzinfo is None: dt = dt.replace(tzinfo=datetime.timezone.utc) return int(dt.timestamp() * 1000) _DURATION_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*(milliseconds?|ms|seconds?|secs?|s|minutes?|mins?|m|hours?|hrs?|h|days?|d)") def parse_duration_to_millis(duration: Optional[str]) -> Optional[int]: """Parse human-friendly duration strings into milliseconds.""" if duration is None: return None text = duration.strip().lower() if not text: return None matches = list(_DURATION_PATTERN.finditer(text)) if not matches: try: numeric = float(text) except ValueError: return None return int(numeric * 1000) total_seconds = 0.0 for match in matches: amount = float(match.group(1)) unit = match.group(2) if unit.startswith('ms'): total_seconds += amount / 1000.0 elif unit.startswith('s') or unit.startswith('sec'): total_seconds += amount elif unit.startswith('m') and not unit.startswith('ms'): total_seconds += amount * 60 elif unit.startswith('h') or unit.startswith('hr'): total_seconds += amount * 3600 elif unit.startswith('d'): total_seconds += amount * 86400 return int(total_seconds * 1000) def resolve_metrics_time_range( duration: Optional[str], start_time: Optional[str], end_time: Optional[str], ) -> Tuple[Optional[int], Optional[int], str]: """Resolve start/end epoch milliseconds and describe the window.""" now_ms = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000) end_ms = parse_epoch_millis(end_time) if end_time else now_ms duration_ms = parse_duration_to_millis(duration) start_ms = parse_epoch_millis(start_time) if start_time else None if start_ms is None and duration_ms is not None: start_ms = end_ms - duration_ms if start_ms is None and end_time and duration_ms is None: start_ms = end_ms - 3600_000 if start_ms is None and duration_ms is None: start_ms = end_ms - 3600_000 if start_ms is not None and end_ms is not None and start_ms > end_ms: start_ms, end_ms = end_ms, start_ms desc_parts = [] if start_ms is not None: desc_parts.append(f"from {format_timestamp(start_ms)}") if end_ms is not None: desc_parts.append(f"to {format_timestamp(end_ms)}") if not desc_parts: desc_parts.append("time window not specified") return start_ms, end_ms, " ".join(desc_parts) def metrics_map_to_series(metrics_map: Dict[str, float]) -> List[Dict[str, float]]: """Convert AMS metrics dict to sorted timestamp/value pairs.""" if not metrics_map: return [] points: List[Dict[str, float]] = [] for raw_ts, raw_value in metrics_map.items(): ts_ms = parse_epoch_millis(raw_ts) if ts_ms is None: try: ts_ms = _normalize_epoch_ms(float(raw_ts)) except (TypeError, ValueError): continue try: value = float(raw_value) except (TypeError, ValueError): continue points.append({"timestamp": ts_ms, "value": value}) points.sort(key=lambda item: item["timestamp"]) return points def summarize_metric_series(points: List[Dict[str, float]]) -> Dict[str, float]: """Compute simple statistics for a metric series.""" if not points: return {} values = [float(p["value"]) for p in points] if not values: return {} first = values[0] last = values[-1] summary = { "count": len(values), "min": min(values), "max": max(values), "avg": sum(values) / len(values), "first": first, "last": last, "delta": last - first, "start_timestamp": points[0]["timestamp"], "end_timestamp": points[-1]["timestamp"], } summary["duration_ms"] = summary["end_timestamp"] - summary["start_timestamp"] if summary["count"] > 1 else 0 return summary async def make_ambari_metrics_request(endpoint: str, params: Optional[Dict[str, str]] = None, method: str = "GET") -> Dict: """Perform an HTTP request against the Ambari Metrics API.""" start = time.monotonic() param_preview = [] if params: for key, value in params.items(): if value is None: continue value_str = str(value) if len(value_str) > 80: value_str = value_str[:77] + '…' param_preview.append(f"{key}={value_str}") logger.debug( "AMS_REQ start method=%s endpoint=%s params=%s", method, endpoint, " ".join(param_preview), ) try: timeout = aiohttp.ClientTimeout(total=AMBARI_METRICS_TIMEOUT) url = f"{AMBARI_METRICS_BASE_URL}{endpoint}" async with aiohttp.ClientSession(timeout=timeout) as session: async with session.request(method, url, params=params) as response: elapsed = (time.monotonic() - start) * 1000 if response.status == 200: try: data = await response.json() except Exception as json_err: text_body = await response.text() logger.warning( "AMS_REQ json-parse-failure status=%s took=%.1fms endpoint=%s err=%s", response.status, elapsed, endpoint, json_err, ) return {"error": f"JSON_PARSE: {json_err}", "raw": text_body} logger.debug( "AMS_REQ success status=%s took=%.1fms size_hint=%s", response.status, elapsed, len(data) if hasattr(data, "__len__") else "NA", ) return data error_text = await response.text() logger.warning( "AMS_REQ http-error status=%s took=%.1fms endpoint=%s body_len=%s", response.status, elapsed, endpoint, len(error_text), ) return {"error": f"HTTP {response.status}: {error_text}"} except Exception as exc: elapsed = (time.monotonic() - start) * 1000 logger.exception("AMS_REQ exception took=%.1fms endpoint=%s", elapsed, endpoint) return {"error": f"Metrics request failed: {exc}"} def parse_metrics_metadata(response_obj: Any) -> List[Dict[str, Any]]: """Parse AMS metadata response into a flat list of metric dictionaries.""" if response_obj is None: return [] if isinstance(response_obj, dict): if response_obj.get("error"): return [] section = ( response_obj.get("metrics") or response_obj.get("Metrics") or response_obj.get("items") or response_obj.get("MetricsCollection") ) if section is None: section = response_obj elif isinstance(response_obj, list): section = response_obj else: return [] entries: List[Dict[str, Any]] = [] if isinstance(section, dict): for key, value in section.items(): if isinstance(value, list): for item in value: if not isinstance(item, dict): continue entry = dict(item) if isinstance(key, str): entry.setdefault("appid", key) entry.setdefault("appId", key) entries.append(entry) continue if not isinstance(value, dict): continue # Direct metric metadata dictionary. if any(field in value for field in ("metricname", "metricName", "metric_name")): entry = dict(value) metric_name = ( entry.get("metricname") or entry.get("metricName") or entry.get("metric_name") ) if metric_name is None and isinstance(key, str): entry["metricname"] = key entries.append(entry) continue # Nested structure such as {"metrics": {...}}; flatten individual metrics. nested_metrics = value.get("metrics") if isinstance(value, dict) else None if isinstance(nested_metrics, dict): for metric_name, meta in nested_metrics.items(): if not isinstance(meta, dict): continue entry = dict(meta) entry.setdefault("metricname", metric_name) if isinstance(key, str): entry.setdefault("appid", key) entry.setdefault("appId", key) entries.append(entry) continue entry = dict(value) if isinstance(key, str) and not entry.get("metricname"): entry["metricname"] = key entries.append(entry) elif isinstance(section, list): for item in section: if isinstance(item, dict): entries.append(dict(item)) return entries def _metadata_cache_key(app_id: Optional[str]) -> str: return (app_id or "__all__").strip().lower() async def get_metrics_metadata(app_id: Optional[str] = None, use_cache: bool = True) -> List[Dict[str, Any]]: """Return metric metadata for a specific appId (or all apps if None).""" cache_key = _metadata_cache_key(app_id) now = time.monotonic() if use_cache: cached = _METRICS_METADATA_CACHE.get(cache_key) if cached and now - cached.get("timestamp", 0) < AMBARI_METRICS_METADATA_TTL: return cached.get("entries", []) params: Dict[str, str] = {} if app_id: params["appId"] = app_id response = await make_ambari_metrics_request("/metrics/metadata", params=params or None) entries = parse_metrics_metadata(response) provided_app = app_id.strip() if isinstance(app_id, str) and app_id.strip() else None for entry in entries: app_hint = ( entry.get("appid") or entry.get("appId") or entry.get("application") or provided_app ) if app_hint: app_str = str(app_hint) entry["appid"] = app_str entry["appId"] = app_str metric_hint = ( entry.get("metricname") or entry.get("metricName") or entry.get("metric_name") ) if metric_hint: entry["metricname"] = str(metric_hint) _METRICS_METADATA_CACHE[cache_key] = { "timestamp": now, "entries": entries, } return entries def _normalize_app_key(app_id: str) -> str: return app_id.strip().lower() def _is_excluded_app(app_id: Optional[str]) -> bool: if not app_id: return False return app_id.strip().lower() in EXCLUDED_APP_IDS async def ensure_metric_catalog(use_cache: bool = True) -> Tuple[Dict[str, List[str]], Dict[str, str]]: """Return (catalog, lookup) built from AMS metadata, refreshing when cache expires.""" global _DYNAMIC_CATALOG_CACHE now = time.monotonic() cached_catalog = _DYNAMIC_CATALOG_CACHE.get("catalog") or {} cached_lookup = _DYNAMIC_CATALOG_CACHE.get("lookup") or {} cached_timestamp = float(_DYNAMIC_CATALOG_CACHE.get("timestamp", 0.0)) if use_cache and cached_catalog and now - cached_timestamp < AMBARI_METRICS_METADATA_TTL: return cached_catalog, cached_lookup entries = await get_metrics_metadata(None, use_cache=use_cache) # Fallback: probe common apps individually if the bulk metadata query returns nothing. if not entries: aggregated: List[Dict[str, Any]] = [] for fallback_app in CURATED_METRIC_APP_IDS: if _is_excluded_app(fallback_app): continue fallback_entries = await get_metrics_metadata(fallback_app, use_cache=use_cache) if fallback_entries: aggregated.extend(fallback_entries) entries = aggregated metrics_by_app: Dict[str, Set[str]] = {} lookup: Dict[str, str] = {} for entry in entries: app_hint = entry.get("appid") or entry.get("appId") or entry.get("application") metric_hint = entry.get("metricname") or entry.get("metricName") or entry.get("metric_name") if not app_hint or not metric_hint: continue app_name = str(app_hint).strip() metric_name = str(metric_hint).strip() if not app_name or not metric_name: continue if _is_excluded_app(app_name): continue metrics_by_app.setdefault(app_name, set()).add(metric_name) lookup.setdefault(_normalize_app_key(app_name), app_name) catalog = {app: sorted(values) for app, values in metrics_by_app.items()} # Ensure canonical entries exist even when AMS metadata is sparse. for canonical in APP_SYNONYMS.keys(): if _is_excluded_app(canonical): continue lower = _normalize_app_key(canonical) lookup.setdefault(lower, canonical) catalog.setdefault(canonical, []) for excluded in list(catalog.keys()): if _is_excluded_app(excluded): catalog.pop(excluded, None) lookup.pop(_normalize_app_key(excluded), None) _DYNAMIC_CATALOG_CACHE = { "timestamp": now, "catalog": catalog, "lookup": lookup, } return catalog, lookup def canonicalize_app_id(app_id: Optional[str], lookup: Optional[Dict[str, str]] = None) -> Optional[str]: """Resolve user-provided appId into an AMS-recognised identifier.""" if not app_id or not isinstance(app_id, str): return None normalized = app_id.strip() if not normalized: return None lowered = normalized.lower() if lookup and lowered in lookup: candidate = lookup[lowered] if _is_excluded_app(candidate): return None return candidate for canonical, synonyms in APP_SYNONYMS.items(): candidates = {canonical.lower(), *(syn.lower() for syn in synonyms)} if lowered in candidates: if lookup and canonical.lower() in lookup: resolved = lookup[canonical.lower()] if _is_excluded_app(resolved): return None return resolved if _is_excluded_app(canonical): return None return canonical if _is_excluded_app(normalized): return None return normalized async def get_metric_catalog(use_cache: bool = True) -> Dict[str, List[str]]: catalog, _ = await ensure_metric_catalog(use_cache=use_cache) return catalog async def get_available_app_ids(use_cache: bool = True) -> List[str]: catalog, _ = await ensure_metric_catalog(use_cache=use_cache) return sorted(catalog.keys()) async def get_metrics_for_app(app_id: Optional[str], use_cache: bool = True) -> List[str]: catalog, lookup = await ensure_metric_catalog(use_cache=use_cache) if not app_id: return [] resolved = canonicalize_app_id(app_id, lookup) if not resolved: return [] # Prefer resolved key from lookup when available. lower = resolved.lower() if lookup and lower in lookup: resolved = lookup[lower] return catalog.get(resolved, []) async def metric_supported_for_app(app_id: Optional[str], metric_name: Optional[str], use_cache: bool = True) -> bool: if not metric_name: return False metrics = await get_metrics_for_app(app_id, use_cache=use_cache) return metric_name in metrics async def collect_metadata_entries( app_ids: Optional[Iterable[str]] = None, prefer_app: Optional[str] = None, use_cache: bool = True, ) -> List[Dict[str, Any]]: """Gather metadata entries for given app IDs (defaults to curated list).""" entries: List[Dict[str, Any]] = [] seen_keys = set() candidate_app_ids: List[str] = [] candidate_seen = set() if app_ids: for app in app_ids: if not app: continue normalized = app.strip() if not normalized or normalized.lower() in candidate_seen: continue if _is_excluded_app(normalized): continue candidate_seen.add(normalized.lower()) candidate_app_ids.append(normalized) elif prefer_app: normalized = prefer_app.strip() if normalized and normalized.lower() not in candidate_seen: candidate_seen.add(normalized.lower()) if not _is_excluded_app(normalized): candidate_app_ids.append(normalized) if not candidate_app_ids: catalog, _ = await ensure_metric_catalog(use_cache=use_cache) for app_name in catalog.keys(): lowered = app_name.lower() if lowered in candidate_seen: continue if _is_excluded_app(app_name): continue candidate_seen.add(lowered) candidate_app_ids.append(app_name) if not candidate_app_ids: for curated_app in CURATED_METRIC_APP_IDS: if curated_app.lower() in candidate_seen: continue if _is_excluded_app(curated_app): continue candidate_seen.add(curated_app.lower()) candidate_app_ids.append(curated_app) for candidate in candidate_app_ids: normalized = candidate.strip() if isinstance(candidate, str) else "" if not normalized: continue if _is_excluded_app(normalized): continue metadata = await get_metrics_metadata(normalized, use_cache=use_cache) for entry in metadata: metric_name = str(entry.get("metricname") or entry.get("metricName") or entry.get("name") or "") app_name = str(entry.get("appid") or entry.get("appId") or entry.get("application") or "") if _is_excluded_app(app_name): continue cache_key = (metric_name.lower(), app_name.lower()) if cache_key in seen_keys: continue seen_keys.add(cache_key) entries.append(entry) return entries def tokenize_metric_queries(queries: Iterable[str]) -> List[str]: """Tokenize user-provided metric strings into normalized search tokens.""" tokens: List[str] = [] seen = set() for raw in queries: if not raw: continue lowered = str(raw).strip().lower() if not lowered: continue for candidate in { lowered, lowered.replace('.', ' ').replace(':', ' ').replace('/', ' '), lowered.replace('.', ''), lowered.replace(':', ''), lowered.replace('/', ''), }: for token in _TOKEN_SPLIT_RE.split(candidate): if not token: continue if token not in seen: seen.add(token) tokens.append(token) return tokens def score_metric_entry( entry: Dict[str, Any], query_tokens: Iterable[str], prefer_app: Optional[str] = None, ) -> int: """Return a heuristic score describing how well a metadata entry matches tokens.""" metric_name = str(entry.get("metricname") or entry.get("metricName") or entry.get("name") or "") if not metric_name: return 0 metric_lower = metric_name.lower() compact = re.sub(r"[^a-z0-9]", "", metric_lower) segments = [seg for seg in _TOKEN_SPLIT_RE.split(metric_lower) if seg] score = 0 token_list = list(query_tokens) if not token_list: return 0 for token in token_list: if token == metric_lower: score += 60 elif metric_lower.endswith(token): score += 18 elif token in metric_lower: score += 10 elif token in segments: score += 6 elif token in compact: score += 5 description = str(entry.get("description") or entry.get("desc") or "").lower() if description: for token in token_list: if token in description: score += 3 units = str(entry.get("units") or entry.get("unit") or "").lower() if units: for token in token_list: if token in units: score += 2 entry_app = str(entry.get("appid") or entry.get("appId") or entry.get("application") or "") if prefer_app and entry_app: pref_lower = prefer_app.lower() app_lower = entry_app.lower() if app_lower == pref_lower: score += 8 elif pref_lower in app_lower: score += 4 if len(metric_name) <= 20: score += 1 return score def build_metric_suggestions( query_tokens: Iterable[str], metadata_entries: Iterable[Dict[str, Any]], prefer_app: Optional[str] = None, limit: int = 8, min_score: int = 6, ) -> List[Dict[str, Any]]: """Return best matching metric metadata entries for given tokens.""" suggestions: List[Dict[str, Any]] = [] seen = set() for entry in metadata_entries: metric_name = str(entry.get("metricname") or entry.get("metricName") or entry.get("name") or "") app_name = str(entry.get("appid") or entry.get("appId") or entry.get("application") or "") cache_key = (metric_name.lower(), app_name.lower()) if cache_key in seen: continue score = score_metric_entry(entry, query_tokens, prefer_app=prefer_app) if score < min_score: continue record = { "metricname": metric_name, "appid": app_name, "score": score, "units": entry.get("units") or entry.get("unit"), "description": entry.get("description") or entry.get("desc"), } suggestions.append(record) seen.add(cache_key) suggestions.sort(key=lambda item: (-item["score"], item["metricname"])) return suggestions[:limit] def infer_precision_for_window(duration_ms: Optional[int]) -> Optional[str]: """Infer AMS precision parameter from duration (milliseconds).""" if not duration_ms or duration_ms <= 0: return None for boundary, precision in PRECISION_RULES_MS: if duration_ms <= boundary: return precision return None async def fetch_metric_series( metric_name: str, app_id: Optional[str] = None, hostnames: Optional[str] = None, duration_ms: int = 10 * 60 * 1000, ) -> List[Dict[str, float]]: """Fetch a time-series for the given metric within the specified lookback window.""" now_ms = int(datetime.datetime.now(datetime.timezone.utc).timestamp() * 1000) start_ms = max(0, now_ms - duration_ms) _, lookup = await ensure_metric_catalog() canonical_app = canonicalize_app_id(app_id, lookup) params: Dict[str, Any] = { "metricNames": metric_name, "startTime": start_ms, "endTime": now_ms, } if canonical_app: params["appId"] = canonical_app if hostnames: params["hostname"] = hostnames response = await make_ambari_metrics_request("/metrics", params=params) if response is None or isinstance(response, dict) and response.get("error"): return [] # fallback to empty series metrics_section = [] if isinstance(response, dict): metrics_section = response.get("metrics") or response.get("Metrics") or [] elif isinstance(response, list): metrics_section = response if not metrics_section: return [] series_container = metrics_section[0] if isinstance(metrics_section, list) else metrics_section if not isinstance(series_container, dict): return [] return metrics_map_to_series(series_container.get("metrics", {})) async def fetch_latest_metric_value( metric_name: str, app_id: Optional[str] = None, hostnames: Optional[str] = None, duration_ms: int = 10 * 60 * 1000, ) -> Optional[float]: """Fetch the latest datapoint for a metric within the lookback window.""" series = await fetch_metric_series(metric_name, app_id=app_id, hostnames=hostnames, duration_ms=duration_ms) if not series: return None return series[-1]["value"] async def get_component_hostnames( component_name: str, cluster_name: Optional[str] = None, ) -> List[str]: """Return sorted hostnames that run the specified Ambari component.""" if not component_name: return [] target_cluster = cluster_name or AMBARI_CLUSTER_NAME endpoint = ( f"/clusters/{target_cluster}/hosts" "?fields=Hosts/host_name,Hosts/public_host_name,Hosts/ip," "host_components/HostRoles/component_name" ) try: response = await make_ambari_request(endpoint) except Exception: logger.exception("Failed to retrieve hosts for component %s", component_name) return [] if not response or response.get("error"): return [] hostnames: List[str] = [] for item in response.get("items", []): if not isinstance(item, dict): continue host_components = item.get("host_components", []) or [] component_matches = False for host_component in host_components: roles = host_component.get("HostRoles") if isinstance(host_component, dict) else None role_name = roles.get("component_name") if isinstance(roles, dict) else None if role_name and role_name.upper() == component_name.upper(): component_matches = True break if not component_matches: continue host_info = item.get("Hosts", {}) if isinstance(item, dict) else {} host_name = host_info.get("host_name") or host_info.get("public_host_name") or host_info.get("ip") if host_name and host_name not in hostnames: hostnames.append(host_name) hostnames.sort() return hostnames async def format_single_host_details(host_name: str, cluster_name: str, show_header: bool = True) -> str: """ Format detailed information for a single host. Args: host_name: Name of the host to retrieve details for cluster_name: Name of the cluster show_header: Whether to show the host header information Returns: Formatted host details string """ try: # Include host component states and service names in the request endpoint = f"/clusters/{cluster_name}/hosts/{host_name}?fields=Hosts,host_components/HostRoles/state,host_components/HostRoles/service_name,host_components/HostRoles/component_name,host_components/HostRoles/actual_configs,metrics,alerts_summary,kerberos_identities" response_data = await make_ambari_request(endpoint) if response_data is None or "error" in response_data: return f"Error: Unable to retrieve details for host '{host_name}' in cluster '{cluster_name}'." host_info = response_data.get("Hosts", {}) host_components = response_data.get("host_components", []) metrics = response_data.get("metrics", {}) result_lines = [] if show_header: result_lines.extend([ f"Host Details for '{host_name}':", "=" * 50 ]) # Basic host information result_lines.append(f"Host Name: {host_info.get('host_name', host_name)}") result_lines.append(f"Cluster: {host_info.get('cluster_name', cluster_name)}") result_lines.append(f"Host State: {host_info.get('host_state', 'Unknown')}") result_lines.append(f"Host Status: {host_info.get('host_status', 'Unknown')}") result_lines.append(f"Public Host Name: {host_info.get('public_host_name', 'N/A')}") result_lines.append(f"IP Address: {host_info.get('ip', 'N/A')}") result_lines.append(f"Maintenance State: {host_info.get('maintenance_state', 'N/A')}") result_lines.append(f"OS Type: {host_info.get('os_type', 'N/A')}") result_lines.append(f"OS Family: {host_info.get('os_family', 'N/A')}") result_lines.append(f"OS Architecture: {host_info.get('os_arch', 'N/A')}") result_lines.append(f"Rack Info: {host_info.get('rack_info', 'N/A')}") result_lines.append("") # Timing and status information result_lines.append("Status Information:") last_heartbeat = host_info.get('last_heartbeat_time', 0) last_registration = host_info.get('last_registration_time', 0) if last_heartbeat: result_lines.append(f" Last Heartbeat: {format_timestamp(last_heartbeat)}") if last_registration: result_lines.append(f" Last Registration: {format_timestamp(last_registration)}") # Health report health_report = host_info.get('host_health_report', '') if health_report: result_lines.append(f" Health Report: {health_report}") else: result_lines.append(f" Health Report: No issues reported") # Recovery information recovery_summary = host_info.get('recovery_summary', 'N/A') recovery_report = host_info.get('recovery_report', {}) result_lines.append(f" Recovery Status: {recovery_summary}") if recovery_report: component_reports = recovery_report.get('component_reports', []) result_lines.append(f" Recovery Components: {len(component_reports)} components") result_lines.append("") # Agent environment information last_agent_env = host_info.get('last_agent_env', {}) if last_agent_env: result_lines.append("Agent Environment:") # Host health from agent host_health = last_agent_env.get('hostHealth', {}) if host_health: live_services = host_health.get('liveServices', []) active_java_procs = host_health.get('activeJavaProcs', []) agent_timestamp = host_health.get('agentTimeStampAtReporting', 0) server_timestamp = host_health.get('serverTimeStampAtReporting', 0) result_lines.append(f" Live Services: {len(live_services)}") for service in live_services[:5]: # Show first 5 services svc_name = service.get('name', 'Unknown') svc_status = service.get('status', 'Unknown') svc_desc = service.get('desc', '') result_lines.append(f" - {svc_name}: {svc_status} {svc_desc}".strip()) if len(live_services) > 5: result_lines.append(f" ... and {len(live_services) - 5} more services") result_lines.append(f" Active Java Processes: {len(active_java_procs)}") if agent_timestamp: result_lines.append(f" Agent Timestamp: {format_timestamp(agent_timestamp)}") if server_timestamp: result_lines.append(f" Server Timestamp: {format_timestamp(server_timestamp)}") # System information umask = last_agent_env.get('umask', 'N/A') firewall_running = last_agent_env.get('firewallRunning', False) firewall_name = last_agent_env.get('firewallName', 'N/A') has_unlimited_jce = last_agent_env.get('hasUnlimitedJcePolicy', False) reverse_lookup = last_agent_env.get('reverseLookup', False) transparent_huge_page = last_agent_env.get('transparentHugePage', '') result_lines.append(f" Umask: {umask}") result_lines.append(f" Firewall: {firewall_name} ({'Running' if firewall_running else 'Stopped'})") result_lines.append(f" JCE Policy: {'Unlimited' if has_unlimited_jce else 'Limited'}") result_lines.append(f" Reverse Lookup: {'Enabled' if reverse_lookup else 'Disabled'}") if transparent_huge_page: result_lines.append(f" Transparent Huge Page: {transparent_huge_page}") # Package and repository information installed_packages = last_agent_env.get('installedPackages', []) existing_repos = last_agent_env.get('existingRepos', []) existing_users = last_agent_env.get('existingUsers', []) alternatives = last_agent_env.get('alternatives', []) stack_folders = last_agent_env.get('stackFoldersAndFiles', []) result_lines.append(f" Installed Packages: {len(installed_packages)}") result_lines.append(f" Existing Repositories: {len(existing_repos)}") result_lines.append(f" Existing Users: {len(existing_users)}") result_lines.append(f" Alternatives: {len(alternatives)}") result_lines.append(f" Stack Folders: {len(stack_folders)}") result_lines.append("") # Alerts Summary alerts_summary = response_data.get('alerts_summary', {}) if alerts_summary: result_lines.append("Alerts Summary:") critical = alerts_summary.get('CRITICAL', 0) warning = alerts_summary.get('WARNING', 0) ok = alerts_summary.get('OK', 0) unknown = alerts_summary.get('UNKNOWN', 0) maintenance = alerts_summary.get('MAINTENANCE', 0) total_alerts = critical + warning + ok + unknown + maintenance result_lines.append(f" Total Alerts: {total_alerts}") result_lines.append(f" Critical: {critical}") result_lines.append(f" Warning: {warning}") result_lines.append(f" OK: {ok}") result_lines.append(f" Unknown: {unknown}") result_lines.append(f" Maintenance: {maintenance}") result_lines.append("") # Performance Metrics if metrics: result_lines.append("Performance Metrics:") # Boot time boottime = metrics.get('boottime', 0) if boottime: boot_dt = datetime.datetime.fromtimestamp(boottime/1000, tz=datetime.timezone.utc) result_lines.append(f" Boot Time: {boottime} ({boot_dt.strftime('%Y-%m-%d %H:%M:%S UTC')})") # Hardware information (CPU and Memory from metrics) cpu_metrics = metrics.get('cpu', {}) if cpu_metrics: cpu_count = cpu_metrics.get('cpu_num', host_info.get('cpu_count', 'N/A')) ph_cpu_count = host_info.get('ph_cpu_count', 'N/A') result_lines.append(f" CPU Count: {cpu_count} (Physical: {ph_cpu_count})") result_lines.append(" CPU Usage:") result_lines.append(f" Idle: {cpu_metrics.get('cpu_idle', 0)}%") result_lines.append(f" User: {cpu_metrics.get('cpu_user', 0)}%") result_lines.append(f" System: {cpu_metrics.get('cpu_system', 0)}%") result_lines.append(f" Nice: {cpu_metrics.get('cpu_nice', 0)}%") result_lines.append(f" I/O Wait: {cpu_metrics.get('cpu_wio', 0)}%") # Memory metrics memory_metrics = metrics.get('memory', {}) if memory_metrics: mem_total = memory_metrics.get('mem_total', 0) mem_free = memory_metrics.get('mem_free', 0) mem_cached = memory_metrics.get('mem_cached', 0) mem_shared = memory_metrics.get('mem_shared', 0) swap_total = memory_metrics.get('swap_total', 0) swap_free = memory_metrics.get('swap_free', 0) mem_used = mem_total - mem_free swap_used = swap_total - swap_free result_lines.append(" Memory Usage:") result_lines.append(f" Total: {mem_total/1024/1024:.1f} GB") result_lines.append(f" Used: {mem_used/1024/1024:.1f} GB ({(mem_used/mem_total)*100:.1f}%)") result_lines.append(f" Free: {mem_free/1024/1024:.1f} GB") result_lines.append(f" Cached: {mem_cached/1024/1024:.1f} GB") if mem_shared > 0: result_lines.append(f" Shared: {mem_shared/1024/1024:.1f} GB") result_lines.append(f" Swap Total: {swap_total/1024/1024:.1f} GB") result_lines.append(f" Swap Used: {swap_used/1024/1024:.1f} GB ({(swap_used/swap_total)*100 if swap_total > 0 else 0:.1f}%)") # Load average load_metrics = metrics.get('load', {}) if load_metrics: result_lines.append(" Load Average:") result_lines.append(f" 1 minute: {load_metrics.get('load_one', 0)}") result_lines.append(f" 5 minutes: {load_metrics.get('load_five', 0)}") result_lines.append(f" 15 minutes: {load_metrics.get('load_fifteen', 0)}") # Disk metrics and detailed disk information combined disk_metrics = metrics.get('disk', {}) disk_info = host_info.get('disk_info', []) if disk_metrics or disk_info: result_lines.append(" Disk Information:") # Show I/O metrics if available if disk_metrics: disk_total = disk_metrics.get('disk_total', 0) disk_free = disk_metrics.get('disk_free', 0) read_bytes = disk_metrics.get('read_bytes', 0) write_bytes = disk_metrics.get('write_bytes', 0) read_count = disk_metrics.get('read_count', 0) write_count = disk_metrics.get('write_count', 0) result_lines.append(f" Total Space: {disk_total:.1f} GB") result_lines.append(f" Free Space: {disk_free:.1f} GB") result_lines.append(f" Used Space: {disk_total - disk_free:.1f} GB ({((disk_total - disk_free)/disk_total)*100:.1f}%)") result_lines.append(f" Read: {read_bytes/1024/1024/1024:.2f} GB ({read_count:,.0f} operations)") result_lines.append(f" Write: {write_bytes/1024/1024/1024:.2f} GB ({write_count:,.0f} operations)") # Show detailed disk info if available if disk_info: result_lines.append(f" Disk Details ({len(disk_info)} disks):") total_size = 0 total_used = 0 total_available = 0 for i, disk in enumerate(disk_info, 1): size = int(disk.get('size', 0)) if disk.get('size', '0').isdigit() else 0 used = int(disk.get('used', 0)) if disk.get('used', '0').isdigit() else 0 available = int(disk.get('available', 0)) if disk.get('available', '0').isdigit() else 0 total_size += size total_used += used total_available += available result_lines.append(f" Disk {i} ({disk.get('device', 'Unknown')}): {disk.get('mountpoint', 'N/A')}") result_lines.append(f" Size: {size/1024/1024:.1f} GB, Used: {used/1024/1024:.1f} GB ({disk.get('percent', 'N/A')})") # Summary only if multiple disks if len(disk_info) > 1: result_lines.append(f" Total Summary: {total_size/1024/1024:.1f} GB total, {total_used/1024/1024:.1f} GB used") # Network metrics network_metrics = metrics.get('network', {}) if network_metrics: result_lines.append(" Network I/O:") result_lines.append(f" Bytes In: {network_metrics.get('bytes_in', 0):.2f} KB/s") result_lines.append(f" Bytes Out: {network_metrics.get('bytes_out', 0):.2f} KB/s") result_lines.append(f" Packets In: {network_metrics.get('pkts_in', 0):.2f} pkt/s") result_lines.append(f" Packets Out: {network_metrics.get('pkts_out', 0):.2f} pkt/s") # Process metrics process_metrics = metrics.get('process', {}) if process_metrics: result_lines.append(" Process Information:") result_lines.append(f" Total Processes: {process_metrics.get('proc_total', 0)}") result_lines.append(f" Running Processes: {process_metrics.get('proc_run', 0)}") result_lines.append("") else: # Fallback to basic hardware info if no metrics available cpu_count = host_info.get('cpu_count', 'N/A') ph_cpu_count = host_info.get('ph_cpu_count', 'N/A') total_mem_kb = host_info.get('total_mem', 0) if cpu_count != 'N/A' or total_mem_kb > 0: result_lines.append("Hardware Information:") if cpu_count != 'N/A': result_lines.append(f" CPU Count: {cpu_count} (Physical: {ph_cpu_count})") if total_mem_kb > 0: total_mem_gb = total_mem_kb / 1024 / 1024 result_lines.append(f" Total Memory: {total_mem_gb:.1f} GB ({total_mem_kb} KB)") result_lines.append("") # Host components if host_components: result_lines.append(f"Host Components ({len(host_components)} components):") # Group components by service for better organization components_by_service = {} for component in host_components: host_roles = component.get("HostRoles", {}) comp_name = host_roles.get("component_name", "Unknown") service_name = host_roles.get("service_name", "Unknown") comp_state = host_roles.get("state", "Unknown") actual_configs = host_roles.get("actual_configs", {}) if service_name not in components_by_service: components_by_service[service_name] = [] components_by_service[service_name].append({ "name": comp_name, "state": comp_state, "configs": len(actual_configs), "href": component.get("href", "") }) for service_name, components in components_by_service.items(): result_lines.append(f" Service: {service_name}") for comp in components: state_indicator = "[STARTED]" if comp["state"] == "STARTED" else "[STOPPED]" if comp["state"] in ["INSTALLED", "STOPPED"] else "[UNKNOWN]" result_lines.append(f" {comp['name']} {state_indicator}") if comp["configs"] > 0: result_lines.append(f" Configurations: {comp['configs']} config types") result_lines.append(f" API: {comp['href']}") result_lines.append("") # Summary by state states = {} for component in host_components: state = component.get("HostRoles", {}).get("state", "Unknown") states[state] = states.get(state, 0) + 1 result_lines.append(" Component State Summary:") for state, count in states.items(): result_lines.append(f" {state}: {count} components") result_lines.append("") else: result_lines.append("Host Components: None assigned") result_lines.append("") # Kerberos Information kerberos_identities = response_data.get('kerberos_identities', []) if kerberos_identities: result_lines.append("Kerberos Information:") result_lines.append(f" Identities: {len(kerberos_identities)} configured") for i, identity in enumerate(kerberos_identities[:3], 1): # Show first 3 result_lines.append(f" {i}. {identity}") if len(kerberos_identities) > 3: result_lines.append(f" ... and {len(kerberos_identities) - 3} more identities") result_lines.append("") else: result_lines.append("Kerberos: No identities configured") result_lines.append("") if show_header: result_lines.append(f"API Endpoint: {response_data.get('href', 'Not available')}") return "\n".join(result_lines) except Exception as e: return f"Error: Exception occurred while retrieving host details for '{host_name}' - {str(e)}" # ----------------------------------------------------------------------------- # Alert formatting utility functions # ----------------------------------------------------------------------------- def format_alerts_output(items, mode, cluster, format_type, host_name, service_name, state_filter, **kwargs): """ Unified alert output formatting function for both current alerts and alert history. Args: items: List of alert items from API response mode: "current" or "history" cluster: Cluster name format_type: "detailed", "summary", "compact", etc. host_name: Optional host name filter service_name: Optional service name filter state_filter: Optional state filter **kwargs: Additional parameters (limit, from_timestamp, to_timestamp, etc.) Returns: Formatted string output """ if not items: scope_desc = f"host '{host_name}'" if host_name else f"service '{service_name}'" if service_name else f"cluster '{cluster}'" return f"No {mode} alerts found for {scope_desc}" # Common header scope = f"host '{host_name}'" if host_name else f"service '{service_name}'" if service_name else f"cluster '{cluster}'" title = "Current Alerts" if mode == "current" else "Alert History" result_lines = [ f"{title} for {scope}", "=" * 60, f"Found {len(items)} alerts" ] # Add filter information if any filters = [] if state_filter: filters.append(f"State: {state_filter}") if kwargs.get('definition_name'): filters.append(f"Definition: {kwargs['definition_name']}") if mode == "current" and kwargs.get('maintenance_state'): filters.append(f"Maintenance: {kwargs['maintenance_state']}") if mode == "history": if kwargs.get('from_timestamp'): filters.append(f"From: {format_timestamp(kwargs['from_timestamp'])}") if kwargs.get('to_timestamp'): filters.append(f"To: {format_timestamp(kwargs['to_timestamp'])}") if filters: result_lines.append(f"Filters: {', '.join(filters)}") result_lines.append("") # Mode-specific field mapping field_prefix = "Alert" if mode == "current" else "AlertHistory" timestamp_field = "latest_timestamp" if mode == "current" else "timestamp" # Format based on type if format_type == "summary": result_lines.extend(format_alerts_summary(items, mode, field_prefix)) elif format_type == "compact": result_lines.extend(format_alerts_compact(items, field_prefix, timestamp_field, mode, kwargs.get('limit'))) else: # detailed result_lines.extend(format_alerts_detailed(items, field_prefix, timestamp_field, mode, kwargs.get('limit'))) return "\n".join(result_lines) def format_alerts_summary(items, mode, field_prefix): """Format alerts in summary mode - grouped by state and definition.""" result_lines = [] # Group by state and definition state_counts = {} definition_counts = {} service_counts = {} for item in items: alert_data = item.get(field_prefix, {}) state = alert_data.get("state", "UNKNOWN") definition = alert_data.get("definition_name", "Unknown") service = alert_data.get("service_name", "Unknown") state_counts[state] = state_counts.get(state, 0) + 1 definition_counts[definition] = definition_counts.get(definition, 0) + 1 service_counts[service] = service_counts.get(service, 0) + 1 result_lines.append("Summary by State:") for state in ["CRITICAL", "WARNING", "OK", "UNKNOWN"]: count = state_counts.get(state, 0) if count > 0: result_lines.append(f" {state}: {count}") result_lines.append("\nTop Alert Definitions:") sorted_definitions = sorted(definition_counts.items(), key=lambda x: x[1], reverse=True)[:10] for definition, count in sorted_definitions: result_lines.append(f" {definition}: {count}") result_lines.append("\nTop Services:") sorted_services = sorted(service_counts.items(), key=lambda x: x[1], reverse=True)[:10] for service, count in sorted_services: result_lines.append(f" {service}: {count}") return result_lines def format_alerts_compact(items, field_prefix, timestamp_field, mode, limit=None): """Format alerts in compact mode - one line per alert.""" result_lines = [] # Treat limit=0 as no limit if limit == 0: limit = None if mode == "current": result_lines.append("Current Alerts (compact):") result_lines.append("State | Maint | Service | Host | Definition") result_lines.append("-" * 85) for item in items: alert = item.get(field_prefix, {}) state = alert.get("state", "UNKNOWN") maintenance = alert.get("maintenance_state", "OFF") service = alert.get("service_name", "N/A") host = alert.get("host_name", "N/A") definition = alert.get("definition_name", "Unknown") state_padded = state.ljust(9) maint_padded = maintenance[:5].ljust(5) service_padded = service[:11].ljust(11) host_padded = host[:23].ljust(23) definition_short = definition[:25] + "..." if len(definition) > 25 else definition result_lines.append(f"{state_padded} | {maint_padded} | {service_padded} | {host_padded} | {definition_short}") else: # history result_lines.append("Alert History Entries (compact):") result_lines.append("Timestamp | State | Service | Host | Definition") result_lines.append("-" * 100) count = 0 for item in items: alert = item.get(field_prefix, {}) timestamp = alert.get(timestamp_field, 0) state = alert.get("state", "UNKNOWN") service = alert.get("service_name", "N/A") host = alert.get("host_name", "N/A") definition = alert.get("definition_name", "Unknown") time_formatted = format_timestamp(timestamp) time_str = time_formatted.split(" (")[1].rstrip(")") if " (" in time_formatted else time_formatted state_padded = state.ljust(9) service_padded = service[:11].ljust(11) host_padded = host[:23].ljust(23) definition_short = definition[:40] + "..." if len(definition) > 40 else definition result_lines.append(f"{time_str} | {state_padded} | {service_padded} | {host_padded} | {definition_short}") count += 1 if limit and count >= limit: result_lines.append(f"... (showing first {limit} entries)") break return result_lines def format_alerts_detailed(items, field_prefix, timestamp_field, mode, limit=None): """Format alerts in detailed mode - full information per alert.""" result_lines = [] # Treat limit=0 as no limit if limit == 0: limit = None if mode == "current": result_lines.append("Current Alerts (detailed):") result_lines.append("") # Group by state for better organization alerts_by_state = {"CRITICAL": [], "WARNING": [], "UNKNOWN": [], "OK": []} for item in items: alert = item.get(field_prefix, {}) state = alert.get("state", "UNKNOWN") if state not in alerts_by_state: alerts_by_state[state] = [] alerts_by_state[state].append(item) count = 0 for state in ["CRITICAL", "WARNING", "UNKNOWN", "OK"]: alerts = alerts_by_state.get(state, []) if not alerts: continue if count > 0: result_lines.append("") result_lines.append(f"=== {state} ALERTS ({len(alerts)}) ===") result_lines.append("") for item in alerts: alert = item.get(field_prefix, {}) count += 1 result_lines.extend(format_single_alert_detailed(alert, count, mode, timestamp_field)) result_lines.append("") if limit and count >= limit: result_lines.append(f"... (showing first {limit} entries)") break if limit and count >= limit: break else: # history result_lines.append("Alert History Entries (detailed):") result_lines.append("") count = 0 for item in items: alert = item.get(field_prefix, {}) count += 1 result_lines.extend(format_single_alert_detailed(alert, count, mode, timestamp_field)) result_lines.append("") if limit and count >= limit: result_lines.append(f"... (showing first {limit} entries)") break return result_lines def format_single_alert_detailed(alert, count, mode, timestamp_field): """Format a single alert in detailed view.""" result_lines = [] # Basic information alert_id = alert.get("id", "Unknown") state = alert.get("state", "UNKNOWN") definition_name = alert.get("definition_name", "Unknown") definition_id = alert.get("definition_id", "Unknown") service_name = alert.get("service_name", "Unknown") component_name = alert.get("component_name", "Unknown") host_name = alert.get("host_name", "N/A") label = alert.get("label", "No label") text = alert.get("text", "No description") instance = alert.get("instance", None) result_lines.extend([ f"[{count}] Alert ID: {alert_id}", f" State: {state}", f" Service: {service_name}", f" Component: {component_name}", f" Host: {host_name}", f" Definition: {definition_name} (ID: {definition_id})", f" Label: {label}", ]) if instance: result_lines.append(f" Instance: {instance}") # Mode-specific fields if mode == "current": maintenance_state = alert.get("maintenance_state", "OFF") scope = alert.get("scope", "Unknown") result_lines.append(f" Maintenance: {maintenance_state}") result_lines.append(f" Scope: {scope}") # Timestamps for current alerts latest_timestamp = alert.get("latest_timestamp", 0) original_timestamp = alert.get("original_timestamp", 0) if latest_timestamp: result_lines.append(f" Latest Update: {format_timestamp(latest_timestamp)}") if original_timestamp and original_timestamp != latest_timestamp: result_lines.append(f" First Occurrence: {format_timestamp(original_timestamp)}") else: # history timestamp = alert.get(timestamp_field, 0) if timestamp: result_lines.append(f" Timestamp: {format_timestamp(timestamp)}") # Format alert text if text: if "\n" in text: result_lines.append(" Text:") for line in text.split("\n"): result_lines.append(f" {line}") else: text_display = text if len(text) <= 100 else text[:97] + "..." result_lines.append(f" Text: {text_display}") return result_lines def get_current_time_context() -> str: """ Returns the current time context for accurate relative date calculations. This utility function provides current date and time information for reference in timestamp calculations. Returns: Current date/time context with calculation examples (success: formatted context info, failure: error message) """ try: current_time = datetime.datetime.now(datetime.timezone.utc) current_date_str = current_time.strftime('%Y-%m-%d') current_time_str = current_time.strftime('%Y-%m-%d %H:%M:%S UTC') current_time_ms = int(current_time.timestamp() * 1000) current_time_context = f""" CURRENT TIME CONTEXT FOR LLM CALCULATIONS: Current Date: {current_date_str} Current Time: {current_time_str} Current Timestamp (ms): {current_time_ms} Current Year: {current_time.year} Current Month: {current_time.month} Current Day: {current_time.day} INSTRUCTIONS FOR LLM: - Calculate your desired time range based on the current time above - Convert your calculated datetime to Unix epoch milliseconds (multiply by 1000) - Use the calculated timestamps in from_timestamp and to_timestamp parameters EXAMPLE CALCULATIONS: - "yesterday": Calculate start and end of {(current_time - timedelta(days=1)).strftime('%Y-%m-%d')} - "last week": Calculate 7 days ago ({(current_time - timedelta(days=7)).strftime('%Y-%m-%d')}) to yesterday - "last year": Calculate start and end of {current_time.year - 1} - "10 years ago": Calculate around {current_time.year - 10} - Any natural language time expression can be calculated from the current time above """ return current_time_context.strip() except Exception as e: return f"Error: Exception occurred while getting current time context - {str(e)}"

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/call518/MCP-Ambari-API'

If you have feedback or need assistance with the MCP directory API, please join our Discord server