utils.py•2.16 kB
from __future__ import annotations
import json
import os
import time
from functools import lru_cache
from pathlib import Path
from typing import Any, Callable
from cachetools import TTLCache
# cache for expensive calls – default ttl 30s, configurable via env
_DEFAULT_TTL = int(os.getenv("MCP_CACHE_TTL", "30"))
_CACHE: TTLCache = TTLCache(maxsize=2048, ttl=_DEFAULT_TTL)
def cache_call(key: tuple[Any, ...], fn: Callable[[], Any], ttl: int | None = None) -> Any:
"""return cached result if present else compute and store."""
if ttl is None:
ttl = _DEFAULT_TTL
if ttl == 0:
return fn()
if key in _CACHE:
return _CACHE[key]
res = fn()
_CACHE[key] = res
# override ttl per-call
if ttl != _DEFAULT_TTL:
_CACHE.expire(time.time() + ttl, key)
return res
# ---------------------------------------------------------------------
# audit logging
# ---------------------------------------------------------------------
_LOG_DIR = Path(os.getenv("MCP_LOG_DIR", "logs"))
_LOG_DIR.mkdir(exist_ok=True)
def write_audit(record: dict[str, Any]) -> None: # noqa: d401
ts = time.strftime("%Y-%m-%d")
file = _LOG_DIR / f"mcp-{ts}.jsonl"
record["ts"] = time.time()
with file.open("a") as fp:
fp.write(json.dumps(record, default=str) + "\n")
# ---------------------------------------------------------------------
# rbac policy – very simple allow/deny matcher
# ---------------------------------------------------------------------
@lru_cache(maxsize=1)
def _load_policy() -> dict[str, str]:
policy_path = os.getenv("MCP_POLICY", "policies/default.yaml")
try:
import yaml # type: ignore
except ImportError:
return {}
p_file = Path(policy_path)
if not p_file.exists():
return {}
data = yaml.safe_load(p_file.read_text()) or {}
return data.get("rules", {})
def check_permission(fq_name: str) -> bool:
rules = _load_policy()
# exact match or wildcard '*'
if fq_name in rules:
return rules[fq_name].lower() == "allow"
if "*" in rules:
return rules["*"].lower() == "allow"
return True