query_history.py (13.8 kB)
from __future__ import annotations

import hashlib
import json
import logging
import os
from pathlib import Path
from threading import Lock
from typing import Any, Iterable, Optional, TypedDict, cast

from ..path_utils import (
    DEFAULT_HISTORY_PATH,
    apply_namespacing,
    find_repo_root,
    get_global_base,
    resolve_history_path,
)

logger = logging.getLogger(__name__)


class Insight(TypedDict, total=False):
    """Normalized insight structure for post-query analysis."""

    summary: str
    key_metrics: list[str]
    business_impact: str
    follow_up_needed: bool
    source: str


def normalize_insight(value: str | dict[str, Any]) -> Insight:
    """Normalize insight value to structured Insight format.

    Args:
        value: Either a string summary or a dict with insight fields

    Returns:
        Normalized Insight dict with all required fields set
    """
    if isinstance(value, str):
        return {
            "summary": value,
            "key_metrics": [],
            "business_impact": "",
            "follow_up_needed": False,
        }

    norm = cast(Insight, dict(value))
    norm.setdefault("summary", "")
    norm.setdefault("key_metrics", [])
    norm.setdefault("business_impact", "")
    norm.setdefault("follow_up_needed", False)
    return norm


def truncate_insight_for_storage(insight: Insight, max_bytes: int = 16384) -> Insight:
    """Truncate insight to fit within storage size limit.

    Args:
        insight: Normalized insight dict
        max_bytes: Maximum size in bytes (default: 16KB)

    Returns:
        Truncated insight dict (or original if within limit)
    """
    serialized = json.dumps(insight, ensure_ascii=False)
    if len(serialized.encode("utf-8")) <= max_bytes:
        return insight

    # Truncate summary field if present and too large
    truncated = dict(insight)
    summary = str(truncated.get("summary", ""))
    if summary:
        # Estimate bytes and truncate with ellipsis
        summary_bytes = summary.encode("utf-8")
        if len(summary_bytes) > max_bytes - 100:  # Reserve space for other fields
            max_summary_bytes = max_bytes - 100
            truncated_summary = summary_bytes[:max_summary_bytes].decode(
                "utf-8", errors="ignore"
            )
            # Try to avoid cutting in the middle of a multi-byte character
            while len(truncated_summary.encode("utf-8")) > max_summary_bytes - 3:
                truncated_summary = truncated_summary[:-1]
            truncated["summary"] = truncated_summary + "..."

    return cast(Insight, truncated)


class QueryHistory:
    """Lightweight JSONL history writer for queries.

    Enabled when IGLOO_MCP_QUERY_HISTORY is set to a writable file path.
    Writes one JSON object per line with minimal fields for auditing.
    """

    _DISABLE_SENTINELS = {"", "disabled", "off", "false", "0"}

    def __init__(
        self,
        path: Optional[Path],
        *,
        fallbacks: Optional[Iterable[Path]] = None,
        disabled: bool = False,
    ) -> None:
        self._path: Optional[Path] = None
        self._lock = Lock()
        self._enabled = False
        self._disabled = disabled
        self._warnings: list[str] = []

        if self._disabled:
            return

        candidates: list[Path] = []
        if path is not None:
            candidates.append(path)
        if fallbacks:
            for candidate in fallbacks:
                if candidate not in candidates:
                    candidates.append(candidate)

        for index, candidate in enumerate(candidates):
            try:
                candidate.parent.mkdir(parents=True, exist_ok=True)
                self._path = candidate
                self._enabled = True
                if index > 0:
                    warning = (
                        "Query history path unavailable; using fallback: "
                        f"{candidate}"
                    )
                    self._warnings.append(warning)
                    logger.warning(warning)
                break
            except Exception as exc:
                warning = "Failed to initialise query history path %s: %s" % (
                    candidate,
                    exc,
                )
                self._warnings.append(warning)
                logger.warning(warning)

        if not self._enabled:
            if candidates:
                warning = (
                    "Query history disabled because no writable path was available."
                )
                self._warnings.append(warning)
                logger.warning(warning)
            else:
                # No candidates means caller explicitly passed None; stay silent.
                pass

    @classmethod
    def from_env(cls) -> "QueryHistory":
        """Create QueryHistory instance from environment configuration.

        Uses resolve_history_path() exclusively for path resolution.
        Honors IGLOO_MCP_LOG_SCOPE and IGLOO_MCP_NAMESPACED_LOGS.
        """
        raw = os.environ.get("IGLOO_MCP_QUERY_HISTORY")
        raw_clean = raw.strip() if raw is not None else None

        disabled = False
        if raw_clean is not None and raw_clean.lower() in cls._DISABLE_SENTINELS:
            disabled = True

        if disabled:
            return cls(None, disabled=True)

        def _fallback_candidates() -> list[Path]:
            candidates: list[Path] = []
            try:
                global_path = (
                    get_global_base() / apply_namespacing(DEFAULT_HISTORY_PATH)
                ).resolve()
                candidates.append(global_path)
            except Exception:
                pass
            try:
                repo_root = find_repo_root()
                repo_path = (
                    repo_root / apply_namespacing(DEFAULT_HISTORY_PATH)
                ).resolve()
                if repo_path not in candidates:
                    candidates.append(repo_path)
            except Exception:
                pass
            return candidates

        fallbacks = _fallback_candidates()

        try:
            path = resolve_history_path(raw=raw)
        except Exception:
            warning = "Unable to resolve query history path; attempting fallback"
            logger.warning(warning, exc_info=True)
            if not fallbacks:
                warning = "History disabled; no fallback paths available"
                logger.warning(warning)
                return cls(None, disabled=False)
            primary = fallbacks[0]
            remaining = [
                candidate for candidate in fallbacks[1:] if candidate != primary
            ]
            return cls(primary, fallbacks=remaining or None)

        path = path.resolve()
        remaining_fallbacks = [
            candidate for candidate in fallbacks if candidate != path
        ]
        return cls(path, fallbacks=remaining_fallbacks or None)

    @property
    def enabled(self) -> bool:
        return self._enabled

    @property
    def path(self) -> Optional[Path]:
        return self._path

    @property
    def disabled(self) -> bool:
        return self._disabled

    def pop_warnings(self) -> list[str]:
        warnings = list(self._warnings)
        self._warnings.clear()
        return warnings

    def record(self, payload: dict[str, Any]) -> None:
        """Record a query execution to the JSONL history file.

        Args:
            payload: Query execution payload with standard fields
        """
        if self._path is None or self._disabled:
            return

        # Ensure ISO timestamp format for better readability
        if "ts" in payload and isinstance(payload["ts"], (int, float)):
            import datetime

            payload["timestamp"] = datetime.datetime.fromtimestamp(
                payload["ts"]
            ).isoformat()

        try:
            line = json.dumps(payload, ensure_ascii=False)
        except (TypeError, ValueError) as e:
            # Fallback: convert to string representation
            line = json.dumps(
                {
                    "error": f"Serialization failed: {str(e)}",
                    "original_preview": str(payload)[:200],
                },
                ensure_ascii=False,
            )

        with self._lock:
            try:
                with self._path.open("a", encoding="utf-8") as fh:
                    fh.write(line)
                    fh.write("\n")
            except Exception:
                warning = "Failed to append query history entry to %s" % (self._path,)
                self._warnings.append(warning)
                logger.warning(warning, exc_info=True)

    def record_insight(
        self,
        execution_id: str,
        post_query_insight: str | dict[str, Any],
        *,
        source: Optional[str] = None,
    ) -> dict[str, Any]:
        """Record a post-hoc insight for a prior query execution.

        Args:
            execution_id: Execution ID from execute_query.audit_info.execution_id
            post_query_insight: LLM-provided post-query insight (str or dict)
            source: Optional source identifier (e.g., "human", "agent:claude")

        Returns:
            Dict with execution_id, content_sha256, and deduped flag
        """
        if self._path is None or self._disabled:
            return {
                "execution_id": execution_id,
                "deduped": False,
                "content_sha256": None,
            }

        # Normalize insight
        normalized = normalize_insight(post_query_insight)
        truncated = truncate_insight_for_storage(normalized)

        # Compute content hash for deduplication
        content_json = json.dumps(truncated, ensure_ascii=False, sort_keys=True)
        content_sha256 = hashlib.sha256(content_json.encode("utf-8")).hexdigest()

        # Check for duplicate (execution_id, content_sha256)
        deduped = False
        if self._path.exists():
            try:
                with self._path.open("r", encoding="utf-8") as fh:
                    for line in fh:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            entry = json.loads(line)
                            if (
                                entry.get("execution_id") == execution_id
                                and entry.get("status") == "insight_recorded"
                                and entry.get("content_sha256") == content_sha256
                            ):
                                deduped = True
                                break
                        except Exception:
                            continue
            except Exception:
                pass

        if not deduped:
            # Append new history entry
            import time
            from datetime import datetime, timezone

            payload = {
                "ts": time.time(),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "execution_id": execution_id,
                "status": "insight_recorded",
                "post_query_insight": truncated,
                "content_sha256": content_sha256,
            }
            if source:
                payload["source"] = source

            try:
                line = json.dumps(payload, ensure_ascii=False)
            except Exception:
                # Fallback serialization
                payload_fallback = {
                    "execution_id": execution_id,
                    "status": "insight_recorded",
                    "error": "Serialization failed",
                }
                line = json.dumps(payload_fallback, ensure_ascii=False)

            with self._lock:
                try:
                    with self._path.open("a", encoding="utf-8") as fh:
                        fh.write(line)
                        fh.write("\n")
                except Exception:
                    warning = "Failed to append insight record to %s" % (self._path,)
                    self._warnings.append(warning)
                    logger.warning(warning, exc_info=True)

        return {
            "execution_id": execution_id,
            "deduped": deduped,
            "content_sha256": content_sha256,
        }


def update_cache_manifest_insight(
    manifest_path: Path, post_query_insight: str | dict[str, Any]
) -> bool:
    """Atomically update cache manifest with post_query_insight.

    Args:
        manifest_path: Path to cache manifest.json
        post_query_insight: LLM-provided insight (str or dict)

    Returns:
        True if update succeeded, False otherwise
    """
    if not manifest_path.exists():
        return False

    try:
        # Load existing manifest
        manifest_data = json.loads(manifest_path.read_text(encoding="utf-8"))

        # Normalize and truncate insight
        normalized = normalize_insight(post_query_insight)
        truncated = truncate_insight_for_storage(normalized)

        # Update manifest
        manifest_data["post_query_insight"] = truncated

        # Atomic write: temp file + os.replace
        temp_path = manifest_path.with_suffix(".tmp.json")
        try:
            temp_path.write_text(
                json.dumps(manifest_data, ensure_ascii=False, indent=2) + "\n",
                encoding="utf-8",
            )
            temp_path.replace(manifest_path)
            return True
        except Exception:
            # Clean up temp file on failure
            try:
                temp_path.unlink(missing_ok=True)
            except Exception:
                pass
            return False

    except Exception:
        logger.debug("Failed to update cache manifest", exc_info=True)
        return False
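
For orientation, a minimal usage sketch follows. The import path `igloo_mcp.query_history` is an assumption about the package layout, and the payload keys are illustrative rather than a required schema:

import os

# Hypothetical import path; adjust to wherever query_history.py lives in the igloo-mcp package.
from igloo_mcp.query_history import QueryHistory

# Enable history by pointing IGLOO_MCP_QUERY_HISTORY at a writable JSONL file.
os.environ["IGLOO_MCP_QUERY_HISTORY"] = "/tmp/igloo_query_history.jsonl"

history = QueryHistory.from_env()
if history.enabled:
    # One JSON object is appended per call; a numeric "ts" also gains an ISO "timestamp" field.
    history.record({"ts": 1700000000.0, "execution_id": "exec-001", "statement": "SELECT 1"})

    # Attach a post-hoc insight; identical (execution_id, content) pairs are deduplicated.
    result = history.record_insight(
        "exec-001",
        {"summary": "Single-row sanity check.", "follow_up_needed": False},
        source="human",
    )
    print(result["deduped"], result["content_sha256"])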
