Skip to main content
Glama
query_history.py13.9 kB
from __future__ import annotations import contextlib import hashlib import json import logging import os from collections.abc import Iterable from datetime import UTC from pathlib import Path from threading import Lock from typing import Any, ClassVar, TypedDict, cast from igloo_mcp.path_utils import ( DEFAULT_HISTORY_PATH, apply_namespacing, find_repo_root, get_global_base, resolve_history_path, ) logger = logging.getLogger(__name__) class Insight(TypedDict, total=False): """Normalized insight structure for post-query analysis.""" summary: str key_metrics: list[str] business_impact: str follow_up_needed: bool source: str def normalize_insight(value: str | dict[str, Any]) -> Insight: """Normalize insight value to structured Insight format. Args: value: Either a string summary or a dict with insight fields Returns: Normalized Insight dict with all required fields set """ if isinstance(value, str): return { "summary": value, "key_metrics": [], "business_impact": "", "follow_up_needed": False, } norm = cast("Insight", dict(value)) norm.setdefault("summary", "") norm.setdefault("key_metrics", []) norm.setdefault("business_impact", "") norm.setdefault("follow_up_needed", False) return norm def truncate_insight_for_storage(insight: Insight, max_bytes: int = 16384) -> Insight: """Truncate insight to fit within storage size limit. Args: insight: Normalized insight dict max_bytes: Maximum size in bytes (default: 16KB) Returns: Truncated insight dict (or original if within limit) """ serialized = json.dumps(insight, ensure_ascii=False) if len(serialized.encode("utf-8")) <= max_bytes: return insight # Truncate summary field if present and too large truncated = dict(insight) summary = str(truncated.get("summary", "")) if summary: # Estimate bytes and truncate with ellipsis summary_bytes = summary.encode("utf-8") if len(summary_bytes) > max_bytes - 100: # Reserve space for other fields max_summary_bytes = max_bytes - 100 truncated_summary = summary_bytes[:max_summary_bytes].decode("utf-8", errors="ignore") # Try to avoid cutting in the middle of a multi-byte character while len(truncated_summary.encode("utf-8")) > max_summary_bytes - 3: truncated_summary = truncated_summary[:-1] truncated["summary"] = truncated_summary + "..." return cast("Insight", truncated) class QueryHistory: """Lightweight JSONL history writer for queries. Enabled when IGLOO_MCP_QUERY_HISTORY is set to a writable file path. Writes one JSON object per line with minimal fields for auditing. """ _DISABLE_SENTINELS: ClassVar[set[str]] = {"", "disabled", "off", "false", "0"} def __init__( self, path: Path | None, *, fallbacks: Iterable[Path] | None = None, disabled: bool = False, ) -> None: self._path: Path | None = None self._lock = Lock() self._enabled = False self._disabled = disabled self._warnings: list[str] = [] if self._disabled: return candidates: list[Path] = [] if path is not None: candidates.append(path) if fallbacks: for candidate in fallbacks: if candidate not in candidates: candidates.append(candidate) for index, candidate in enumerate(candidates): try: candidate.parent.mkdir(parents=True, exist_ok=True) self._path = candidate self._enabled = True if index > 0: warning = f"Query history path unavailable; using fallback: {candidate}" self._warnings.append(warning) logger.warning(warning) break except Exception as exc: warning = f"Failed to initialise query history path {candidate}: {exc}" self._warnings.append(warning) logger.warning(warning) if not self._enabled: if candidates: warning = "Query history disabled because no writable path was available." self._warnings.append(warning) logger.warning(warning) else: # No candidates means caller explicitly passed None; stay silent. pass @classmethod def from_env(cls) -> QueryHistory: """Create QueryHistory instance from environment configuration. Uses resolve_history_path() exclusively for path resolution. Honors IGLOO_MCP_LOG_SCOPE and IGLOO_MCP_NAMESPACED_LOGS. """ raw = os.environ.get("IGLOO_MCP_QUERY_HISTORY") raw_clean = raw.strip() if raw is not None else None disabled = False if raw_clean is not None and raw_clean.lower() in cls._DISABLE_SENTINELS: disabled = True if disabled: return cls(None, disabled=True) def _fallback_candidates() -> list[Path]: candidates: list[Path] = [] try: global_path = (get_global_base() / apply_namespacing(DEFAULT_HISTORY_PATH)).resolve() candidates.append(global_path) except Exception: pass try: repo_root = find_repo_root() repo_path = (repo_root / apply_namespacing(DEFAULT_HISTORY_PATH)).resolve() if repo_path not in candidates: candidates.append(repo_path) except Exception: pass return candidates fallbacks = _fallback_candidates() try: path = resolve_history_path(raw=raw) except Exception: warning = "Unable to resolve query history path; attempting fallback" logger.warning(warning, exc_info=True) if not fallbacks: warning = "History disabled; no fallback paths available" logger.warning(warning) return cls(None, disabled=False) primary = fallbacks[0] remaining = [candidate for candidate in fallbacks[1:] if candidate != primary] return cls(primary, fallbacks=remaining or None) path = path.resolve() remaining_fallbacks = [candidate for candidate in fallbacks if candidate != path] return cls(path, fallbacks=remaining_fallbacks or None) @property def enabled(self) -> bool: return self._enabled @property def path(self) -> Path | None: return self._path @property def disabled(self) -> bool: return self._disabled def pop_warnings(self) -> list[str]: warnings = list(self._warnings) self._warnings.clear() return warnings def record(self, payload: dict[str, Any]) -> None: """Record a query execution to the JSONL history file. Args: payload: Query execution payload with standard fields """ if self._path is None or self._disabled: return # Ensure ISO timestamp format for better readability if "ts" in payload and isinstance(payload["ts"], (int, float)): import datetime # Use UTC timezone to ensure consistent timestamps across environments payload["timestamp"] = datetime.datetime.fromtimestamp(payload["ts"], tz=datetime.UTC).isoformat() try: line = json.dumps(payload, ensure_ascii=False) except (TypeError, ValueError) as e: # Fallback: convert to string representation line = json.dumps( { "error": f"Serialization failed: {e!s}", "original_preview": str(payload)[:200], }, ensure_ascii=False, ) with self._lock: try: with self._path.open("a", encoding="utf-8") as fh: fh.write(line) fh.write("\n") except Exception: warning = f"Failed to append query history entry to {self._path}" self._warnings.append(warning) logger.warning(warning, exc_info=True) def record_insight( self, execution_id: str, post_query_insight: str | dict[str, Any], *, source: str | None = None, ) -> dict[str, Any]: """Record a post-hoc insight for a prior query execution. Args: execution_id: Execution ID from execute_query.audit_info.execution_id post_query_insight: LLM-provided post-query insight (str or dict) source: Optional source identifier (e.g., "human", "agent:claude") Returns: Dict with execution_id, content_sha256, and deduped flag """ if self._path is None or self._disabled: return { "execution_id": execution_id, "deduped": False, "content_sha256": None, } # Normalize insight normalized = normalize_insight(post_query_insight) truncated = truncate_insight_for_storage(normalized) # Compute content hash for deduplication content_json = json.dumps(truncated, ensure_ascii=False, sort_keys=True) content_sha256 = hashlib.sha256(content_json.encode("utf-8")).hexdigest() # Check for duplicate (execution_id, content_sha256) deduped = False if self._path.exists(): try: with self._path.open("r", encoding="utf-8") as fh: for line in fh: line = line.strip() if not line: continue try: entry = json.loads(line) if ( entry.get("execution_id") == execution_id and entry.get("status") == "insight_recorded" and entry.get("content_sha256") == content_sha256 ): deduped = True break except (ValueError, KeyError, TypeError) as e: # Skip malformed history entries logger.debug(f"Skipping malformed history entry during dedup check: {e}") continue except (OSError, ValueError) as e: # Best effort - if history file is corrupt, continue anyway logger.debug(f"Error reading history during dedup check: {e}") if not deduped: # Append new history entry import time from datetime import datetime payload = { "ts": time.time(), "timestamp": datetime.now(UTC).isoformat(), "execution_id": execution_id, "status": "insight_recorded", "post_query_insight": truncated, "content_sha256": content_sha256, } if source: payload["source"] = source try: line = json.dumps(payload, ensure_ascii=False) except Exception: # Fallback serialization payload_fallback = { "execution_id": execution_id, "status": "insight_recorded", "error": "Serialization failed", } line = json.dumps(payload_fallback, ensure_ascii=False) with self._lock: try: with self._path.open("a", encoding="utf-8") as fh: fh.write(line) fh.write("\n") except Exception: warning = f"Failed to append insight record to {self._path}" self._warnings.append(warning) logger.warning(warning, exc_info=True) return { "execution_id": execution_id, "deduped": deduped, "content_sha256": content_sha256, } def update_cache_manifest_insight(manifest_path: Path, post_query_insight: str | dict[str, Any]) -> bool: """Atomically update cache manifest with post_query_insight. Args: manifest_path: Path to cache manifest.json post_query_insight: LLM-provided insight (str or dict) Returns: True if update succeeded, False otherwise """ if not manifest_path.exists(): return False try: # Load existing manifest manifest_data = json.loads(manifest_path.read_text(encoding="utf-8")) # Normalize and truncate insight normalized = normalize_insight(post_query_insight) truncated = truncate_insight_for_storage(normalized) # Update manifest manifest_data["post_query_insight"] = truncated # Atomic write: temp file + os.replace temp_path = manifest_path.with_suffix(".tmp.json") try: temp_path.write_text( json.dumps(manifest_data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8", ) temp_path.replace(manifest_path) return True except Exception: # Clean up temp file on failure with contextlib.suppress(Exception): temp_path.unlink(missing_ok=True) return False except Exception: logger.debug("Failed to update cache manifest", exc_info=True) return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Evan-Kim2028/igloo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server