def write_file(path: str, content: str, ctx: Context | None = None) -> str:
context_tokens = activate_runtime_context(ctx)
path = str(pathlib.Path(WORKSPACE_ROOT) / path) if not os.path.isabs(path) else path
try:
path_check = check_path_policy(path, tool="write_file")
if path_check:
result = PolicyResult(allowed=False, reason=path_check[0], decision_tier="blocked", matched_rule=path_check[1])
else:
result = PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None)
budget_fields: dict = {}
if result.allowed:
budget_allowed, budget_reason, budget_rule, budget_fields = check_and_record_cumulative_budget(
tool="write_file",
command=None,
affected_paths=[path],
operation_count=1,
bytes_estimate=len(content.encode()),
)
if not budget_allowed:
result = PolicyResult(
allowed=False,
reason=budget_reason or "Cumulative blast-radius budget exceeded for current scope.",
decision_tier="blocked",
matched_rule=budget_rule or "requires_simulation.cumulative_budget_exceeded",
)
log_entry = build_log_entry("write_file", result, path=path, **budget_fields)
append_log_entry(log_entry)
if not result.allowed:
return f"[POLICY BLOCK] {result.reason}"
backup_location = None
backup_enabled = bool(POLICY.get("audit", {}).get("backup_enabled", True))
if backup_enabled and os.path.exists(path):
backup_location = backup_paths([path])
if backup_location:
append_log_entry(
{
**log_entry,
"source": "mcp-server",
"backup_location": backup_location,
"event": "backup_created",
}
)
try:
with open(path, "w") as f:
f.write(content)
except OSError as e:
return f"Error writing file: {e}"
msg = f"Successfully wrote {len(content)} characters to {path}"
if backup_location:
msg += f" (previous version backed up to {backup_location})"