def read_file(path: str, ctx: Context | None = None) -> str:
context_tokens = activate_runtime_context(ctx)
path = str(pathlib.Path(WORKSPACE_ROOT) / path) if not os.path.isabs(path) else path
try:
path_check = check_path_policy(path, tool="read_file")
if path_check:
result = PolicyResult(allowed=False, reason=path_check[0], decision_tier="blocked", matched_rule=path_check[1])
else:
result = PolicyResult(allowed=True, reason="allowed", decision_tier="allowed", matched_rule=None)
if result.allowed:
max_mb = POLICY.get("allowed", {}).get("max_file_size_mb", 10)
try:
size_mb = os.path.getsize(path) / (1024 * 1024)
if size_mb > max_mb:
result = PolicyResult(
allowed=False,
reason=f"File size {size_mb:.1f} MB exceeds the policy limit of {max_mb} MB (allowed.max_file_size_mb)",
decision_tier="blocked",
matched_rule="allowed.max_file_size_mb",
)
except (FileNotFoundError, OSError):
pass
append_log_entry(build_log_entry("read_file", result, path=path))
if not result.allowed:
return f"[POLICY BLOCK] {result.reason}"
try:
with open(path, "r", errors="replace") as f:
return f.read()
except FileNotFoundError:
return f"Error: file not found: {path}"
except OSError as e:
return f"Error reading file: {e}"
finally:
reset_runtime_context(context_tokens)