"""Audit logger that writes JSONL entries for every guarded operation."""
from __future__ import annotations
import json
import re
import time
from datetime import datetime, timezone
from typing import Any
import config
_SENSITIVE_PATTERN = re.compile(
r"(Bearer\s+|token[\"']?\s*[:=]\s*[\"']?)[A-Za-z0-9/+_\-]{10,}",
re.IGNORECASE,
)
def _mask_secrets(obj: Any) -> Any:
"""Recursively mask sensitive values in dicts/lists/strings."""
if isinstance(obj, str):
return _SENSITIVE_PATTERN.sub(r"\1***REDACTED***", obj)
if isinstance(obj, dict):
return {k: _mask_secrets(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_mask_secrets(v) for v in obj]
return obj
def log_operation(
*,
tool: str,
params: dict[str, Any],
asana_gid: str | None = None,
result: str | None = None,
error: str | None = None,
duration_ms: float | None = None,
) -> None:
"""Append one audit entry to today's JSONL file."""
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"user": "local",
"tool": tool,
"params": _mask_secrets(params),
"asana_gid": asana_gid,
"result": result,
"error": error,
"duration_ms": round(duration_ms, 1) if duration_ms is not None else None,
}
log_file = config.LOG_DIR / f"audit_{datetime.now(timezone.utc).strftime('%Y%m%d')}.jsonl"
with open(log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
class AuditTimer:
"""Context manager that measures elapsed time in milliseconds."""
def __init__(self) -> None:
self.start = 0.0
self.elapsed_ms = 0.0
def __enter__(self) -> "AuditTimer":
self.start = time.perf_counter()
return self
def __exit__(self, *_: Any) -> None:
self.elapsed_ms = (time.perf_counter() - self.start) * 1000