/**
* ROLE: INFRASTRUCTURE
* PURPOSE: Fail-closed write-time policy engine invoked on EVERY write
* AUTHORITY: WINDSURF EXECUTION PROMPT — MCP-Enforced Write-Time Policy Engine
*
* This module enforces:
* 1. Plan-scoped path enforcement (CREATE/MODIFY allowlists)
* 2. Universal denylist (placeholders, silent failures, debug bypass)
* 3. Language-aware policy profiles (Rust, TypeScript/JavaScript, Python)
* 4. Intent artifact co-requirement validation
* 5. Fail-closed semantics (any failure refuses write)
* 6. Audit logging on pass/fail
*
* INVARIANT: Policy engine MUST run BEFORE any filesystem write.
* INVARIANT: Policy rejection is non-recoverable and must abort the write.
* INVARIANT: Any error in policy engine → write MUST be refused.
*/
import fs from "fs";
import path from "path";
import crypto from "crypto";
import { SystemError, SYSTEM_ERROR_CODES } from "./system-error.js";
import { appendAuditEntry } from "./audit-system.js";
import { validateIntentArtifact } from "./intent-validator.js";
// ============================================================================
// LANGUAGE DETECTION
// ============================================================================
/**
* Detect programming language from file extension and optional content heuristics.
* @param {string} filePath - workspace-relative file path
* @param {string} content - file content (optional, for heuristics)
* @returns {string} language code: "rust" | "typescript" | "javascript" | "python" | "markdown" | "unknown"
*/
export function detectLanguage(filePath, content = "") {
const ext = path.extname(filePath).toLowerCase();
// File extension detection (primary)
const languageMap = {
".rs": "rust",
".ts": "typescript",
".tsx": "typescript",
".js": "javascript",
".mjs": "javascript",
".cjs": "javascript",
".jsx": "javascript",
".py": "python",
".pyi": "python",
".md": "markdown",
".markdown": "markdown",
};
if (languageMap[ext]) {
return languageMap[ext];
}
// Content heuristics for unknown extensions
if (content) {
if (content.includes("fn ") || content.includes("impl ") || content.includes("pub fn")) {
return "rust";
}
if (content.includes("import ") && content.includes("from ")) {
return "typescript";
}
if (content.includes("def ") || content.includes("import ")) {
return "python";
}
}
return "unknown";
}
// ============================================================================
// UNIVERSAL DENYLIST (LANGUAGE-AGNOSTIC)
// ============================================================================
const UNIVERSAL_DENYLIST = [
// === Placeholder / unfinished logic ===
{
pattern: /\bTODO\b/,
name: "TODO",
category: "placeholder",
reason: "Incomplete work markers must never ship",
},
{
pattern: /\bFIXME\b/,
name: "FIXME",
category: "placeholder",
reason: "Incomplete work markers must never ship",
},
{
pattern: /\bXXX\b/,
name: "XXX",
category: "placeholder",
reason: "Incomplete work markers must never ship",
},
{
pattern: /\bpass\b/,
name: "pass",
category: "placeholder",
reason: "Empty pass statement is placeholder code",
},
// === Silent failure / fallback patterns ===
{
pattern: /catch\s*\{\s*\}/,
name: "empty catch {}",
category: "silent_failure",
reason: "Empty catch blocks silently swallow exceptions",
},
{
pattern: /catch\s*\([^)]*\)\s*\{\s*(?:\/\/|\/\*)[^}]*\}\s*\}/,
name: "catch with log-only",
category: "silent_failure",
reason: "Catch block that only logs without rethrowing",
},
{
pattern: /\.catch\(\s*\(\s*\)\s*=>\s*\{?\s*\}?\s*\)/,
name: ".catch(() => {})",
category: "silent_failure",
reason: "Promise catch that silently ignores errors",
},
{
pattern: /try\s*\{[^}]*\}\s*catch\s*\{[^}]*return/,
name: "try-catch with silent return",
category: "silent_failure",
reason: "Catch block that silently returns instead of handling error",
},
{
pattern: /unwrap_or\(/,
name: "unwrap_or",
category: "silent_failure",
reason: "Fallback that bypasses error handling",
},
{
pattern: /unwrap_or_default/,
name: "unwrap_or_default",
category: "silent_failure",
reason: "Fallback that bypasses error handling",
},
{
pattern: /orElse|getOrElse/,
name: "orElse/getOrElse",
category: "silent_failure",
reason: "Fallback that bypasses error handling",
},
// === Debug bypass / masking ===
{
pattern: /console\.log\s*\([^)]*\)\s*(?:;|$)/,
name: "console.log",
category: "debug_bypass",
reason: "Debug logging in production code",
},
{
pattern: /assert\s*\(\s*false\s*\)/,
name: "assert(false)",
category: "debug_bypass",
reason: "Disabled assert masking real errors",
},
];
/**
* Scan content for universal denylist violations.
* @param {string} content - file content
* @returns {Array<{name: string, reason: string}>} violations found
*/
function scanUniversalDenylist(content) {
const violations = [];
for (const { pattern, name, reason } of UNIVERSAL_DENYLIST) {
if (pattern.test(content)) {
violations.push({ name, reason });
}
}
return violations;
}
// ============================================================================
// LANGUAGE-SPECIFIC PROFILES
// ============================================================================
const RUST_PROFILE_DENYLIST = [
{
pattern: /\bunwrap\s*\(\s*\)/,
name: "unwrap()",
reason: "Unwrap panics on error instead of handling Result",
},
{
pattern: /\bexpect\s*\(/,
name: "expect()",
reason: "Expect panics on error instead of handling Result",
},
{
pattern: /\bpanic\s*!/,
name: "panic!",
reason: "Panic is uncontrolled abort, not error handling",
},
{
pattern: /\btodo\s*!/,
name: "todo!",
reason: "todo! macro is unimplemented code",
},
{
pattern: /\bunimplemented\s*!/,
name: "unimplemented!",
reason: "unimplemented! macro is unfinished code",
},
{
pattern: /\bunsafe\s*\{/,
name: "unsafe {}",
reason: "Unsafe blocks bypass memory safety guarantees",
},
{
pattern: /\bstatic\s+mut\b/,
name: "static mut",
reason: "Mutable statics are inherently unsafe",
},
{
pattern: /\bBox\s*::\s*leak\s*\(/,
name: "Box::leak",
reason: "Box::leak intentionally leaks memory",
},
{
pattern: /#\[allow\s*\([^\)]*\)\s*\]/,
name: "#[allow(...)]",
reason: "Allow attributes suppress compiler warnings",
},
];
/**
* Rust language-specific policy check.
* @param {string} content - file content
* @returns {Array<{name: string, reason: string}>} violations found
*/
function scanRustProfile(content) {
const violations = [];
for (const { pattern, name, reason } of RUST_PROFILE_DENYLIST) {
if (pattern.test(content)) {
violations.push({ name, reason });
}
}
return violations;
}
const TS_JS_PROFILE_DENYLIST = [
{
pattern: /:\s*any\b/,
name: "any type",
reason: "Type 'any' defeats type safety",
},
{
pattern: /@ts-ignore/,
name: "@ts-ignore",
reason: "TypeScript ignore suppresses type checking",
},
{
pattern: /\/\/\s*@ts-ignore/,
name: "// @ts-ignore",
reason: "TypeScript ignore suppresses type checking",
},
{
pattern: /Math\.random\s*\(\s*\)/,
name: "Math.random",
reason: "Non-deterministic randomness in determinism-required code",
},
{
pattern: /Date\.now\s*\(\s*\)/,
name: "Date.now",
reason: "Non-deterministic timing in determinism-required code",
},
];
/**
* TypeScript/JavaScript language-specific policy check.
* @param {string} content - file content
* @returns {Array<{name: string, reason: string}>} violations found
*/
function scanTsJsProfile(content) {
const violations = [];
for (const { pattern, name, reason } of TS_JS_PROFILE_DENYLIST) {
if (pattern.test(content)) {
violations.push({ name, reason });
}
}
return violations;
}
const PYTHON_PROFILE_DENYLIST = [
{
pattern: /import\s+random/,
name: "import random",
reason: "Non-deterministic randomness in determinism-required code",
},
{
pattern: /from\s+random\s+import/,
name: "from random import",
reason: "Non-deterministic randomness in determinism-required code",
},
{
pattern: /import\s+time/,
name: "import time",
reason: "Non-deterministic timing in determinism-required code",
},
{
pattern: /time\.time\s*\(\s*\)/,
name: "time.time()",
reason: "Non-deterministic timing in determinism-required code",
},
{
pattern: /except\s*:\s*$/m,
name: "bare except:",
reason: "Bare except catches all exceptions without handling",
},
];
/**
* Python language-specific policy check.
* @param {string} content - file content
* @returns {Array<{name: string, reason: string}>} violations found
*/
function scanPythonProfile(content) {
const violations = [];
for (const { pattern, name, reason } of PYTHON_PROFILE_DENYLIST) {
if (pattern.test(content)) {
violations.push({ name, reason });
}
}
return violations;
}
// ============================================================================
// PLAN-SCOPED PATH ENFORCEMENT
// ============================================================================
/**
* Validate that path is within workspace root and doesn't escape.
* @param {string} absPath - absolute target path
* @param {string} workspaceRoot - absolute workspace root
* @throws {SystemError} on violation
*/
function validatePathBounds(absPath, workspaceRoot) {
// Check path is within workspace root (no .. escapes, no symlink escapes)
const rel = path.relative(workspaceRoot, absPath);
if (rel.startsWith("..") || path.isAbsolute(rel)) {
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.PATH_TRAVERSAL_BLOCKED, {
human_message: `Path traversal detected: target path escapes workspace root`,
tool_name: "write_file",
});
}
}
/**
* Validate that path is in plan's allowlist.
* (Placeholder: full plan parsing would extract CREATE/MODIFY allowlists)
* @param {string} relPath - workspace-relative path
* @param {object} planData - parsed plan object
* @param {boolean} isNewFile - true if CREATE, false if MODIFY
* @throws {SystemError} on violation
*/
function validatePathInPlan(relPath, planData, isNewFile) {
// NOTE: In a full implementation, planData would contain:
// { phases: [{ id: "...", CREATE: [...], MODIFY: [...] }] }
// This is a structural placeholder.
// For now, skip detailed plan parsing (assumes plan-enforcer.js validates)
// In production, verify relPath is in appropriate allowlist
}
// ============================================================================
// INTENT ARTIFACT CO-REQUIREMENT
// ============================================================================
// NOTE: validateIntentArtifact is imported from intent-validator.js
// It performs comprehensive schema validation with fail-closed semantics
// ============================================================================
// MAIN POLICY ENGINE
// ============================================================================
/**
* Execute write-time policy validation.
* Invoked BEFORE any filesystem write.
*
* INPUTS (REQUIRED):
* - workspace_root: absolute path to locked workspace
* - role: WINDSURF or ANTIGRAVITY
* - session_id: session UUID
* - tool_name: name of write tool calling this
* - plan_hash: SHA256 hash of authorized plan
* - phase_id: phase within plan (if available)
* - operation: "CREATE" or "MODIFY"
* - path: workspace-relative file path
* - content_bytes: actual file content
* - detected_language: language code (from detectLanguage)
* - content_hash: SHA256 of content
* - content_length: byte length of content
*
* RETURNS:
* - { verdict: "PASS", warnings: [...] } on success
*
* THROWS:
* - SystemError with WRITE_POLICY_* codes on failure
* - Failure causes audit entry and write refusal
*/
export async function executeWriteTimePolicy({
workspace_root,
role,
session_id,
tool_name,
plan_hash,
phase_id,
operation,
path: filePath,
content_bytes,
detected_language,
content_hash,
content_length,
}) {
// === VALIDATE INPUTS (fail-closed) ===
const requiredFields = [
"workspace_root",
"role",
"session_id",
"tool_name",
"plan_hash",
"operation",
"path",
"content_bytes",
"detected_language",
"content_hash",
"content_length",
];
for (const field of requiredFields) {
if (arguments[0][field] === undefined || arguments[0][field] === null) {
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.MISSING_REQUIRED_FIELD, {
human_message: `Policy engine missing required input: ${field}`,
tool_name: "write_file",
});
}
}
// === CONSTRUCT ABSOLUTE PATH ===
let absPath;
try {
absPath = path.resolve(workspace_root, filePath);
} catch (err) {
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.INVALID_PATH, {
human_message: `Invalid file path: ${err.message}`,
tool_name: "write_file",
cause: err,
});
}
// === PATH BOUNDS VALIDATION ===
try {
validatePathBounds(absPath, workspace_root);
} catch (err) {
if (err instanceof SystemError) {
throw err;
}
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.PATH_TRAVERSAL_BLOCKED, {
human_message: err.message,
tool_name: "write_file",
cause: err,
});
}
// === UNIVERSAL DENYLIST SCAN ===
const universalViolations = scanUniversalDenylist(content_bytes);
if (universalViolations.length > 0) {
await appendAuditEntry({
session_id,
role,
workspace_root,
tool: tool_name,
plan_hash,
phase_id,
args: { path: filePath, operation },
result: "error",
error_code: SYSTEM_ERROR_CODES.POLICY_VIOLATION,
invariant_id: "NO_PLACEHOLDERS_NO_FALLBACKS",
notes: `Universal denylist violations: ${universalViolations.map((v) => v.name).join(", ")}`,
}, workspace_root);
const violation_details = universalViolations
.map((v) => ` • ${v.name}: ${v.reason}`)
.join("\n");
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.POLICY_VIOLATION, {
human_message: `WRITE_POLICY_DENYLIST_VIOLATION:\n\n${violation_details}\n\nRemove these patterns and retry.`,
tool_name: "write_file",
});
}
// === LANGUAGE-SPECIFIC POLICY SCAN ===
let languageViolations = [];
if (detected_language === "rust") {
languageViolations = scanRustProfile(content_bytes);
if (languageViolations.length > 0) {
await appendAuditEntry({
session_id,
role,
workspace_root,
tool: tool_name,
plan_hash,
phase_id,
args: { path: filePath, operation },
result: "error",
error_code: SYSTEM_ERROR_CODES.RUST_POLICY_VIOLATION,
invariant_id: "RUST_REALITY_LOCK",
notes: `Rust policy violations: ${languageViolations.map((v) => v.name).join(", ")}`,
}, workspace_root);
const violation_details = languageViolations
.map((v) => ` • ${v.name}: ${v.reason}`)
.join("\n");
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.RUST_POLICY_VIOLATION, {
human_message: `RUST_POLICY_VIOLATION:\n\n${violation_details}\n\nReplace with proper error handling.`,
tool_name: "write_file",
});
}
} else if (detected_language === "typescript" || detected_language === "javascript") {
languageViolations = scanTsJsProfile(content_bytes);
if (languageViolations.length > 0) {
await appendAuditEntry({
session_id,
role,
workspace_root,
tool: tool_name,
plan_hash,
phase_id,
args: { path: filePath, operation },
result: "error",
error_code: SYSTEM_ERROR_CODES.POLICY_VIOLATION,
invariant_id: "DETERMINISM_REQUIRED",
notes: `TS/JS policy violations: ${languageViolations.map((v) => v.name).join(", ")}`,
}, workspace_root);
const violation_details = languageViolations
.map((v) => ` • ${v.name}: ${v.reason}`)
.join("\n");
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.POLICY_VIOLATION, {
human_message: `TS_POLICY_VIOLATION:\n\n${violation_details}\n\nEnsure deterministic behavior.`,
tool_name: "write_file",
});
}
} else if (detected_language === "python") {
languageViolations = scanPythonProfile(content_bytes);
if (languageViolations.length > 0) {
await appendAuditEntry({
session_id,
role,
workspace_root,
tool: tool_name,
plan_hash,
phase_id,
args: { path: filePath, operation },
result: "error",
error_code: SYSTEM_ERROR_CODES.POLICY_VIOLATION,
invariant_id: "DETERMINISM_REQUIRED",
notes: `Python policy violations: ${languageViolations.map((v) => v.name).join(", ")}`,
}, workspace_root);
const violation_details = languageViolations
.map((v) => ` • ${v.name}: ${v.reason}`)
.join("\n");
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.POLICY_VIOLATION, {
human_message: `PYTHON_POLICY_VIOLATION:\n\n${violation_details}\n\nEnsure deterministic behavior.`,
tool_name: "write_file",
});
}
}
// "unknown" language applies universal denylist only (already checked)
// === INTENT ARTIFACT CO-REQUIREMENT ===
const isFailureReport = filePath.includes("docs/reports/");
try {
await validateIntentArtifact(absPath, workspace_root, isFailureReport);
} catch (err) {
if (err instanceof SystemError) {
await appendAuditEntry({
session_id,
role,
workspace_root,
tool: tool_name,
plan_hash,
phase_id,
args: { path: filePath, operation },
result: "error",
error_code: SYSTEM_ERROR_CODES.MISSING_REQUIRED_FIELD,
invariant_id: "MANDATORY_INTENT",
notes: "Intent artifact missing",
}, workspace_root);
throw err;
}
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.INTERNAL_ERROR, {
human_message: err.message,
tool_name: "write_file",
cause: err,
});
}
// === POLICY PASSED - LOG SUCCESS ===
await appendAuditEntry({
session_id,
role,
workspace_root,
tool: tool_name,
plan_hash,
phase_id,
args: { path: filePath, operation, content_hash, content_length },
result: "ok",
error_code: null,
invariant_id: null,
notes: `Write-time policy PASSED for ${detected_language} file`,
}, workspace_root);
return {
verdict: "PASS",
language: detected_language,
content_hash,
content_length,
warnings: [],
};
}
// ============================================================================
// FAIL-CLOSED ERROR HANDLING
// ============================================================================
/**
* Wrapper to ensure policy engine failure always refuses write.
* If policy engine throws or fails to parse, write is refused.
*/
export async function safePolicyExecution(policyInputs, workspaceRoot) {
try {
return await executeWriteTimePolicy(policyInputs);
} catch (err) {
// If we can't even audit the failure, this is critical
if (err instanceof SystemError) {
throw err; // Re-throw classified errors
}
// Unknown error in policy engine → fail-closed
throw SystemError.toolFailure(SYSTEM_ERROR_CODES.INTERNAL_ERROR, {
human_message: "Write-time policy engine encountered an unexpected error. Write refused.",
tool_name: "write_file",
cause: err,
});
}
}