/**
* MySQL Security - Audit and Firewall Tools
*
* Tools for security auditing, firewall monitoring, and compliance.
*/
import { z } from "zod";
import type { MySQLAdapter } from "../../MySQLAdapter.js";
import type {
ToolDefinition,
RequestContext,
} from "../../../../types/index.js";
// =============================================================================
// Zod Schemas
// =============================================================================
/**
 * Input schema for the `mysql_security_audit` tool.
 *
 * Every filter is optional; `limit` falls back to 100 when omitted.
 */
const AuditLogSchema = z.object({
// Row cap. The audit handler embeds this value in the LIMIT clause of its
// query; the schema itself only requires it to be a number.
limit: z.number().default(100).describe("Maximum number of records"),
// Matched as a substring: the handler wraps the value in `%...%` for LIKE.
user: z.string().optional().describe("Filter by username"),
// Compared by equality against `event_type`; only applied when the handler
// reads from `mysql.audit_log` (not in the performance_schema fallback).
eventType: z
.string()
.optional()
.describe('Filter by event type (e.g., "CONNECT", "QUERY")'),
// Passed to the query unchanged as the lower bound for `timestamp`; only
// applied when reading from `mysql.audit_log`. The format is not validated here.
startTime: z.string().optional().describe("Start time filter (ISO 8601)"),
});
/**
 * Input schema for the `mysql_security_firewall_rules` tool.
 *
 * Both filters are optional; with neither set the handler lists everything.
 */
const FirewallRulesSchema = z.object({
// Matched as a substring against USERHOST (the handler wraps it in `%...%`)
// in both the firewall users query and the allowlist rules query.
user: z.string().optional().describe("Filter by username"),
// Restricted to the four listed values. The handler applies it to the
// firewall users query only; the allowlist rules query is not filtered by mode.
mode: z
.enum(["RECORDING", "PROTECTING", "DETECTING", "OFF"])
.optional()
.describe("Filter by mode"),
});
// =============================================================================
// Tool Creation Functions
// =============================================================================
/**
 * Convert a requested row count into a value that is safe to embed in a SQL
 * `LIMIT` clause.
 *
 * Fractions are truncated and the result is clamped to
 * `[0, Number.MAX_SAFE_INTEGER]` so the rendered text is always a plain
 * non-negative integer literal. `NaN` falls back to 100, the schema default.
 */
function toAuditRowLimit(requested: number): number {
  if (Number.isNaN(requested)) {
    return 100;
  }
  return Math.min(Math.max(Math.trunc(requested), 0), Number.MAX_SAFE_INTEGER);
}

/**
 * Create the `mysql_security_audit` tool, which returns recent audit events.
 *
 * The handler looks up `mysql.audit_log` in `information_schema.TABLES`:
 * - when the table exists, it is queried with the optional `user` (substring),
 *   `eventType` (equality) and `startTime` (lower bound) filters, newest first;
 * - otherwise recent statements are read from
 *   `performance_schema.events_statements_history`, joined to
 *   `performance_schema.threads` to obtain the issuing user and host. Only the
 *   `user` filter is applied on this path.
 *
 * Query failures are not thrown; they are reported as an `available: false`
 * result. Validation errors raised by `AuditLogSchema.parse` do propagate.
 *
 * @param adapter - MySQL adapter used to run the queries.
 * @returns The tool definition (admin scope, read-only, idempotent).
 */
export function createSecurityAuditTool(adapter: MySQLAdapter): ToolDefinition {
  return {
    name: "mysql_security_audit",
    title: "MySQL Security Audit Log",
    description:
      "Query the MySQL audit log (requires Enterprise Audit or compatible plugin).",
    group: "security",
    inputSchema: AuditLogSchema,
    requiredScopes: ["admin"],
    annotations: {
      readOnlyHint: true,
      idempotentHint: true,
    },
    handler: async (params: unknown, _context: RequestContext) => {
      const { limit, user, eventType, startTime } =
        AuditLogSchema.parse(params);

      // LIMIT is written into the SQL text rather than bound, so it must be a
      // plain non-negative integer; anything else would make the statement
      // fail and be misreported below as "audit logging is not enabled".
      const rowLimit = toAuditRowLimit(limit);

      try {
        // First check if audit log table exists
        const checkResult = await adapter.executeQuery(`
          SELECT TABLE_NAME
          FROM information_schema.TABLES
          WHERE TABLE_SCHEMA = 'mysql'
            AND TABLE_NAME = 'audit_log'
        `);

        if (!checkResult.rows || checkResult.rows.length === 0) {
          // Fall back to the statement history. That table carries no user or
          // host columns of its own, so they come from the owning thread.
          // LEFT JOIN keeps events whose thread row cannot be matched.
          let query = `
            SELECT
              esh.EVENT_NAME AS event,
              esh.OBJECT_TYPE AS objectType,
              esh.OBJECT_NAME AS objectName,
              thr.PROCESSLIST_USER AS user,
              thr.PROCESSLIST_HOST AS host,
              esh.TIMER_START AS startTime
            FROM performance_schema.events_statements_history AS esh
            LEFT JOIN performance_schema.threads AS thr
              ON thr.THREAD_ID = esh.THREAD_ID
          `;
          const conditions: string[] = [];
          const queryParams: unknown[] = [];

          if (user) {
            conditions.push("thr.PROCESSLIST_USER LIKE ?");
            queryParams.push(`%${user}%`);
          }
          if (conditions.length > 0) {
            query += " WHERE " + conditions.join(" AND ");
          }
          query += ` ORDER BY esh.TIMER_START DESC LIMIT ${String(rowLimit)}`;

          const result = await adapter.executeQuery(query, queryParams);
          return {
            source: "performance_schema",
            message: "Using performance_schema as audit log is not available",
            events: result.rows ?? [],
            count: result.rows?.length ?? 0,
          };
        }

        // Query actual audit log
        let query = `
          SELECT *
          FROM mysql.audit_log
        `;
        const conditions: string[] = [];
        const queryParams: unknown[] = [];

        if (user) {
          conditions.push("user LIKE ?");
          queryParams.push(`%${user}%`);
        }
        if (eventType) {
          conditions.push("event_type = ?");
          queryParams.push(eventType);
        }
        if (startTime) {
          conditions.push("timestamp >= ?");
          queryParams.push(startTime);
        }
        if (conditions.length > 0) {
          query += " WHERE " + conditions.join(" AND ");
        }
        query += ` ORDER BY timestamp DESC LIMIT ${String(rowLimit)}`;

        const result = await adapter.executeQuery(query, queryParams);
        return {
          source: "mysql.audit_log",
          events: result.rows ?? [],
          count: result.rows?.length ?? 0,
        };
      } catch {
        // Best effort: any query failure is surfaced as "not available".
        return {
          available: false,
          message:
            "Audit logging is not enabled. Install MySQL Enterprise Audit or Percona Audit plugin.",
          suggestion:
            'Install audit plugin with: INSTALL PLUGIN audit_log SONAME "audit_log.so"',
        };
      }
    },
  };
}
/**
 * Create the `mysql_security_firewall_status` tool.
 *
 * The handler lists plugins whose name contains "firewall" from
 * `information_schema.PLUGINS`. If none are found it reports
 * `installed: false` with an install hint; otherwise it also reads the
 * `mysql_firewall%` server variables and returns them as a name/value map
 * next to the plugin rows. A failure in either query is reported as
 * `installed: false` instead of being thrown.
 *
 * @param adapter - MySQL adapter used to run the queries.
 * @returns The tool definition (read scope, read-only, idempotent).
 */
export function createSecurityFirewallStatusTool(
  adapter: MySQLAdapter,
): ToolDefinition {
  // Joined line-by-line; the empty first/last entries give the statement its
  // leading and trailing newline.
  const pluginLookupSql = [
    "",
    "SELECT PLUGIN_NAME, PLUGIN_STATUS",
    "FROM information_schema.PLUGINS",
    "WHERE PLUGIN_NAME LIKE '%firewall%'",
    "",
  ].join("\n");

  return {
    name: "mysql_security_firewall_status",
    title: "MySQL Firewall Status",
    description: "Get MySQL Enterprise Firewall plugin status.",
    group: "security",
    inputSchema: z.object({}),
    requiredScopes: ["read"],
    annotations: {
      readOnlyHint: true,
      idempotentHint: true,
    },
    handler: async (_params: unknown, _context: RequestContext) => {
      try {
        // Step 1: is any firewall plugin registered?
        const pluginLookup = await adapter.executeQuery(pluginLookupSql);
        const installedPlugins = pluginLookup.rows;
        const nothingInstalled =
          !installedPlugins || installedPlugins.length === 0;

        if (nothingInstalled) {
          return {
            installed: false,
            message: "MySQL Enterprise Firewall is not installed",
            suggestion:
              'Install with: INSTALL PLUGIN mysql_firewall SONAME "firewall.so"',
          };
        }

        // Step 2: collect the firewall server variables.
        const variableLookup = await adapter.executeQuery(
          "SHOW VARIABLES LIKE 'mysql_firewall%'",
        );
        const pairs: Array<[string, unknown]> = [];
        for (const entry of variableLookup.rows ?? []) {
          const rawName = entry["Variable_name"];
          // Rows without a string name are filed under the empty key.
          const key = typeof rawName === "string" ? rawName : "";
          pairs.push([key, entry["Value"]]);
        }
        const configuration: Record<string, unknown> =
          Object.fromEntries(pairs);

        return {
          installed: true,
          plugins: installedPlugins,
          configuration,
        };
      } catch {
        // Either query failing is reported as "not installed" with a hint.
        return {
          installed: false,
          message: "Firewall plugin check failed",
          suggestion: "Ensure you have privileges to view plugin information",
        };
      }
    },
  };
}
/**
 * Create the `mysql_security_firewall_rules` tool.
 *
 * The handler runs two queries in sequence:
 * 1. `mysql.firewall_users`, filtered by the optional `user` substring and
 *    the optional `mode`;
 * 2. `mysql.firewall_whitelist`, filtered by the optional `user` substring
 *    only (`mode` is not applied to this query).
 *
 * It returns both row sets together with their counts. If either query
 * throws, an `available: false` result is returned instead. Validation errors
 * from `FirewallRulesSchema.parse` propagate to the caller.
 *
 * @param adapter - MySQL adapter used to run the queries.
 * @returns The tool definition (admin scope, read-only, idempotent).
 */
export function createSecurityFirewallRulesTool(
  adapter: MySQLAdapter,
): ToolDefinition {
  // Base statements, joined line-by-line; the empty first/last entries give
  // each statement its leading and trailing newline.
  const usersBaseSql = [
    "",
    "SELECT USERHOST, MODE",
    "FROM mysql.firewall_users",
    "",
  ].join("\n");
  const rulesBaseSql = [
    "",
    "SELECT USERHOST, RULE",
    "FROM mysql.firewall_whitelist",
    "",
  ].join("\n");

  return {
    name: "mysql_security_firewall_rules",
    title: "MySQL Firewall Rules",
    description: "List MySQL Enterprise Firewall allowlist rules.",
    group: "security",
    inputSchema: FirewallRulesSchema,
    requiredScopes: ["admin"],
    annotations: {
      readOnlyHint: true,
      idempotentHint: true,
    },
    handler: async (params: unknown, _context: RequestContext) => {
      const filters = FirewallRulesSchema.parse(params);
      // An empty or missing user means "no user filter".
      const userPattern = filters.user ? `%${filters.user}%` : undefined;

      try {
        // Firewall users: each active filter contributes a clause and a value.
        const userFilters: Array<{ clause: string; value: unknown }> = [];
        if (userPattern !== undefined) {
          userFilters.push({ clause: "USERHOST LIKE ?", value: userPattern });
        }
        if (filters.mode) {
          userFilters.push({ clause: "MODE = ?", value: filters.mode });
        }
        const usersSql =
          userFilters.length === 0
            ? usersBaseSql
            : `${usersBaseSql} WHERE ${userFilters
                .map((f) => f.clause)
                .join(" AND ")}`;
        const usersOutcome = await adapter.executeQuery(
          usersSql,
          userFilters.map((f) => f.value),
        );

        // Allowlist rules: only the user filter applies here.
        const hasUserFilter = userPattern !== undefined;
        const rulesSql = hasUserFilter
          ? `${rulesBaseSql} WHERE USERHOST LIKE ?`
          : rulesBaseSql;
        const rulesOutcome = await adapter.executeQuery(
          rulesSql,
          hasUserFilter ? [userPattern] : [],
        );

        const userRows = usersOutcome.rows ?? [];
        const ruleRows = rulesOutcome.rows ?? [];
        return {
          users: userRows,
          rules: ruleRows,
          userCount: userRows.length,
          ruleCount: ruleRows.length,
        };
      } catch {
        // Either query failing is reported as "not available".
        return {
          available: false,
          message:
            "Firewall tables not accessible. Ensure MySQL Enterprise Firewall is installed and you have appropriate privileges.",
        };
      }
    },
  };
}