post_query_insights.py (8.52 kB)
"""Lightweight heuristics for deriving post-query insights from returned rows.""" from __future__ import annotations from collections import Counter from collections.abc import Sequence from datetime import date, datetime from decimal import Decimal from typing import Any MAX_SAMPLE_ROWS = 2_000 TOP_VALUES_LIMIT = 5 TIME_HINT_KEYWORDS = ("timestamp", "_ts", "_time", "time", "date", "_dt", "at_ts") def _normalize_row(row: Any, existing_columns: Sequence[str] | None) -> tuple[dict[str, Any], list[str] | None]: """Return a mapping representation of a row and inferred column names.""" if isinstance(row, dict): mapping = dict(row) inferred: list[str] | None = None if not existing_columns: inferred = list(mapping.keys()) return mapping, inferred if isinstance(row, (list, tuple)): names: list[str] names = list(existing_columns) if existing_columns else [f"column_{idx}" for idx in range(len(row))] mapping = {names[idx] if idx < len(names) else f"column_{idx}": value for idx, value in enumerate(row)} inferred = None if existing_columns else list(mapping.keys()) return mapping, inferred # Scalar fallback return {"value": row}, ([] if existing_columns else ["value"]) def _coerce_numeric(value: Any) -> float | None: if isinstance(value, bool): return None if isinstance(value, (int, float)): return float(value) if isinstance(value, Decimal): try: return float(value) except (OverflowError, ValueError): # Decimal too large or invalid for float conversion return None return None def _coerce_datetime(value: Any) -> datetime | None: if isinstance(value, datetime): return value if isinstance(value, date): return datetime.combine(value, datetime.min.time()) return None def _is_time_hint(column_name: str) -> bool: lower = column_name.lower() return any(hint in lower for hint in TIME_HINT_KEYWORDS) def _stringify(value: Any) -> str: if isinstance(value, str): return value if isinstance(value, (datetime, date)): return value.isoformat() return str(value) def _summarize_column( column: str, rows: list[dict[str, Any]], sample_size: int, ) -> dict[str, Any] | None: if sample_size == 0: return None non_null = 0 numeric_count = 0 numeric_sum = 0.0 numeric_min: float | None = None numeric_max: float | None = None time_count = 0 time_min: datetime | None = None time_max: datetime | None = None categorical_counter: Counter[str] = Counter() for row in rows: value = row.get(column) if value is None: continue non_null += 1 number = _coerce_numeric(value) if number is not None: numeric_count += 1 numeric_sum += number numeric_min = number if numeric_min is None else min(numeric_min, number) numeric_max = number if numeric_max is None else max(numeric_max, number) moment = _coerce_datetime(value) if moment is not None: time_count += 1 time_min = moment if time_min is None else min(time_min, moment) time_max = moment if time_max is None else max(time_max, moment) categorical_counter[_stringify(value)] += 1 if non_null == 0: return None non_null_ratio = round(non_null / sample_size, 3) preferred_kind = "categorical" kind_score = sum(categorical_counter.values()) if numeric_count >= max(time_count, kind_score) and numeric_count >= 2: preferred_kind = "numeric" kind_score = numeric_count if (_is_time_hint(column) and time_count > 0) or (time_count >= max(numeric_count, kind_score) and time_count >= 2): preferred_kind = "time" kind_score = time_count if preferred_kind == "numeric" and numeric_min is not None and numeric_max is not None: avg = numeric_sum / max(numeric_count, 1) return { "name": column, "kind": "numeric", 
"non_null_ratio": non_null_ratio, "min": round(numeric_min, 6), "max": round(numeric_max, 6), "avg": round(avg, 6), } if preferred_kind == "time" and time_min is not None and time_max is not None: span = (time_max - time_min).total_seconds() * 1000 return { "name": column, "kind": "time", "non_null_ratio": non_null_ratio, "min_ts": time_min.isoformat(), "max_ts": time_max.isoformat(), "span_ms": int(span), } if categorical_counter: total = sum(categorical_counter.values()) or 1 top_values = [] for value, count in categorical_counter.most_common(TOP_VALUES_LIMIT): top_values.append( { "value": value[:120], "count": count, "ratio": round(count / total, 3), } ) return { "name": column, "kind": "categorical", "non_null_ratio": non_null_ratio, "top_values": top_values, "distinct_values": len(categorical_counter), } return None def _compose_insights(key_metrics: dict[str, Any]) -> list[str]: insights: list[str] = [] total_rows = key_metrics.get("total_rows") sampled_rows = key_metrics.get("sampled_rows") num_columns = key_metrics.get("num_columns") truncated = key_metrics.get("truncated_output", False) if isinstance(total_rows, int) and isinstance(num_columns, int): if isinstance(sampled_rows, int) and truncated and sampled_rows < total_rows: insights.append(f"Analyzed first {sampled_rows:,} of {total_rows:,} rows across {num_columns} columns.") else: insights.append(f"Returned {total_rows:,} rows across {num_columns} columns.") for column in key_metrics.get("columns", []): if len(insights) >= 5: break kind = column.get("kind") name = column.get("name", "column") if kind == "numeric" and column.get("min") is not None and column.get("max") is not None: insights.append(f"{name} spans {column['min']} → {column['max']} (avg {column.get('avg')}).") elif kind == "categorical" and column.get("top_values"): top = column["top_values"][0] pct = round(top.get("ratio", 0) * 100, 1) insights.append(f"{name} most frequent value '{top.get('value')}' (~{pct}% of sampled rows).") elif kind == "time" and column.get("min_ts") and column.get("max_ts"): span_ms = column.get("span_ms") or 0 if span_ms >= 3_600_000: span_text = f"{round(span_ms / 3_600_000, 2)}h" elif span_ms >= 60_000: span_text = f"{round(span_ms / 60_000, 2)}m" else: span_text = f"{round(span_ms / 1000, 2)}s" insights.append(f"{name} covers {column['min_ts']} → {column['max_ts']} ({span_text}).") return insights def build_default_insights( rows: Sequence[Any] | None, *, columns: Sequence[str] | None, total_rows: int | None, truncated: bool, ) -> tuple[dict[str, Any] | None, list[str]]: if not rows: return None, [] sampled_rows: list[dict[str, Any]] = [] column_names = list(columns) if columns else [] for entry in rows[:MAX_SAMPLE_ROWS]: normalized, inferred = _normalize_row(entry, column_names or None) sampled_rows.append(normalized) if not column_names and inferred: column_names = inferred sample_size = len(sampled_rows) if sample_size == 0: return None, [] metrics_columns: list[dict[str, Any]] = [] for column in column_names: summary = _summarize_column(column, sampled_rows, sample_size) if summary: metrics_columns.append(summary) if not metrics_columns: return None, [] key_metrics: dict[str, Any] = { "total_rows": total_rows if total_rows is not None else sample_size, "sampled_rows": sample_size, "num_columns": len(column_names), "columns": metrics_columns, "truncated_output": bool(truncated), } insights = _compose_insights(key_metrics) return key_metrics, insights
