# safety.py
"""safety layer: dry-run, secret redaction, destructive op blocking."""
from __future__ import annotations
import re
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
@dataclass
class SafetyConfig:
    """safety configuration for execution."""
    # when False, mutating (destructive) tool calls are rejected by the
    # validator with an error message
    allow_mutating: bool = False
    # when True, mutating calls are not executed; a synthetic
    # "would execute" response is returned instead
    dry_run: bool = False
    # when True, known secret field names / token patterns are scrubbed
    # from data passed through redact_secrets()
    redact_secrets: bool = True
    # NOTE(review): max_retries and timeout_seconds are not consumed by
    # any code visible in this file — presumably read by the executor
    # that owns this config; confirm against callers
    max_retries: int = 3
    timeout_seconds: float = 30.0
class SecretRedactor:
    """Redacts secrets from logs and responses.

    Two complementary strategies:

    * key-based: any dict value whose (lowercased) key contains one of
      ``SECRET_FIELDS`` is replaced wholesale with ``***REDACTED***``.
    * pattern-based: free-form strings are scanned for common secret
      formats (``key=value`` pairs, bearer tokens, provider-specific
      key prefixes) and each match is replaced.
    """

    # patterns for common secret formats, precompiled once at class
    # definition time so redact() does not pay per-call pattern lookups.
    # IGNORECASE is applied to every pattern to preserve the original
    # matching behavior (including the provider prefixes).
    SECRET_PATTERNS = [
        (re.compile(raw, re.IGNORECASE), label)
        for raw, label in [
            (r"['\"]?api[-_]?key['\"]?\s*[:=]\s*['\"]([^'\"]+)['\"]", "api_key"),
            (r"['\"]?token['\"]?\s*[:=]\s*['\"]([^'\"]+)['\"]", "token"),
            (r"['\"]?password['\"]?\s*[:=]\s*['\"]([^'\"]+)['\"]", "password"),
            (r"['\"]?secret['\"]?\s*[:=]\s*['\"]([^'\"]+)['\"]", "secret"),
            (r"Bearer\s+([A-Za-z0-9\-._~+/]+)", "bearer_token"),
            (r"ghp_[A-Za-z0-9]{36}", "github_token"),
            (r"sk-[A-Za-z0-9]{48}", "openai_key"),
            (r"AKIA[0-9A-Z]{16}", "aws_key"),
        ]
    ]
    # substrings that mark a dict key as secret-bearing
    SECRET_FIELDS = {
        "password", "secret", "api_key", "token", "auth",
        "credential", "private_key", "access_key", "secret_key"
    }

    def redact(self, data: Any) -> Any:
        """Recursively redact secrets from a nested data structure.

        dicts are redacted by key, lists and tuples element-wise
        (tuples stay tuples), strings are pattern-scanned; any other
        type passes through unchanged.
        """
        if isinstance(data, dict):
            return {k: self._redact_value(k, v) for k, v in data.items()}
        if isinstance(data, list):
            return [self.redact(item) for item in data]
        if isinstance(data, tuple):
            # previously tuples passed through unredacted, leaking any
            # secret strings they contained; recurse and rebuild
            return tuple(self.redact(item) for item in data)
        if isinstance(data, str):
            return self._redact_string(data)
        return data

    def _redact_value(self, key: str, value: Any) -> Any:
        """Redact value entirely if the key looks like a secret field,
        otherwise recurse into it."""
        key_lower = key.lower()
        # substring match: e.g. "authorization" is caught by "auth"
        if any(field in key_lower for field in self.SECRET_FIELDS):
            return "***REDACTED***"
        return self.redact(value)

    def _redact_string(self, text: str) -> str:
        """Replace every pattern match with '<label>=***REDACTED***'."""
        for pattern, name in self.SECRET_PATTERNS:
            text = pattern.sub(f"{name}=***REDACTED***", text)
        return text
class DryRunInterceptor:
    """Intercepts calls in dry-run mode.

    Decides whether a call may really run and fabricates a placeholder
    response for calls that are suppressed.
    """

    def should_execute(self, is_mutating: bool, dry_run: bool) -> bool:
        """Return True when the call may actually run.

        Only the combination "dry-run active AND the call mutates"
        suppresses execution; everything else proceeds.
        """
        return not (dry_run and is_mutating)

    def get_dry_run_response(self, tool_name: str, args: tuple, kwargs: dict) -> Dict[str, Any]:
        """Build the synthetic payload returned in place of execution."""
        response: Dict[str, Any] = dict(
            dry_run=True,
            message=f"would execute {tool_name}",
            args=args,
            kwargs=kwargs,
        )
        response["note"] = "call not executed due to dry_run=true"
        return response
class SafetyValidator:
"""validates calls against safety rules."""
def __init__(self, config: SafetyConfig):
self.config = config
self.redactor = SecretRedactor()
self.dry_run = DryRunInterceptor()
def validate_call(
self,
tool_name: str,
is_mutating: bool,
args: tuple,
kwargs: dict
) -> Optional[str]:
"""
validate if a call is safe to execute.
returns:
error message if unsafe, None if ok
"""
if is_mutating and not self.config.allow_mutating:
return f"destructive operation '{tool_name}' blocked by safety policy. set allow_mutating=true to enable."
return None
def should_execute(self, is_mutating: bool) -> bool:
"""check if call should actually execute."""
return self.dry_run.should_execute(is_mutating, self.config.dry_run)
def create_dry_run_response(
self,
tool_name: str,
args: tuple,
kwargs: dict
) -> Dict[str, Any]:
"""create dry-run response."""
return self.dry_run.get_dry_run_response(tool_name, args, kwargs)
def redact_secrets(self, data: Any) -> Any:
"""redact secrets from data."""
if self.config.redact_secrets:
return self.redactor.redact(data)
return data