import { MCPTool, MCPInput } from "mcp-framework";
import { z } from "zod";
import RabbitMQConnection from "./RabbitMQConnection.js";
/** Builds an optional boolean schema field that carries the given description. */
const optionalFlag = (description: string) =>
  z.boolean().optional().describe(description);

/**
 * Input shape for the queue management tool: the action to run, the target
 * queue name, and three optional boolean queue flags.
 *
 * The schema itself sets no defaults; the "(default: ...)" hints in the
 * descriptions only document what the consumer of this input is expected to
 * apply when a flag is omitted.
 */
const QueueSchema = z.object({
  action: z
    .enum(["create", "delete", "purge", "info"])
    .describe("Action to perform on the queue"),
  queue_name: z.string().describe("Name of the queue"),
  durable: optionalFlag(
    "Whether the queue should survive server restarts (default: true)"
  ),
  exclusive: optionalFlag(
    "Whether the queue should be exclusive to this connection (default: false)"
  ),
  auto_delete: optionalFlag(
    "Whether the queue should be deleted when no consumers (default: false)"
  ),
});
/**
 * MCP tool that creates, deletes, purges, or inspects a RabbitMQ queue using
 * the channel handed out by `RabbitMQConnection.getChannel()`.
 */
class QueueManagement extends MCPTool {
  name = "queue_management";
  description = "Manage RabbitMQ queues - create, delete, purge, or get info";
  schema = QueueSchema;

  /**
   * Runs the requested queue action.
   *
   * @param input - Tool input shaped by `QueueSchema`: `action`, `queue_name`,
   *   and the optional `durable` / `exclusive` / `auto_delete` flags (only
   *   read by the `create` action).
   * @returns For `create` and `info`, an object with the queue name, message
   *   count and consumer count (`create` also echoes a message and the options
   *   it used). For `delete` and `purge`, a status string. Failures are never
   *   thrown: a missing channel or a rejected channel call is reported as a
   *   string starting with `Error: `.
   */
  async execute(input: MCPInput<this>) {
    const channel = RabbitMQConnection.getChannel();
    if (!channel) {
      return "Error: No active RabbitMQ connection. Please connect first.";
    }

    try {
      // Each case has its own braces so its `const` bindings are scoped to
      // that case instead of leaking across the whole switch.
      switch (input.action) {
        case "create": {
          // Omitted flags fall back to: durable, non-exclusive, no auto-delete.
          const options = {
            durable: input.durable ?? true,
            exclusive: input.exclusive ?? false,
            autoDelete: input.auto_delete ?? false,
          };
          const result = await channel.assertQueue(input.queue_name, options);
          return {
            message: `Queue '${input.queue_name}' created successfully`,
            queue: result.queue,
            message_count: result.messageCount,
            consumer_count: result.consumerCount,
            options,
          };
        }

        case "delete": {
          await channel.deleteQueue(input.queue_name);
          return `Queue '${input.queue_name}' deleted successfully`;
        }

        case "purge": {
          const purgeResult = await channel.purgeQueue(input.queue_name);
          return `Queue '${input.queue_name}' purged. ${purgeResult.messageCount} messages removed`;
        }

        case "info": {
          // NOTE(review): the amqplib docs say checkQueue errors the channel
          // when the queue does not exist — confirm RabbitMQConnection hands
          // out a usable channel on the next call after such a failure.
          const queueInfo = await channel.checkQueue(input.queue_name);
          return {
            queue: queueInfo.queue,
            message_count: queueInfo.messageCount,
            consumer_count: queueInfo.consumerCount,
          };
        }

        default:
          // Runtime guard for callers that bypass schema validation.
          return "Invalid action. Use 'create', 'delete', 'purge', or 'info'";
      }
    } catch (error: unknown) {
      // Keep whatever detail is available: Error messages and plain string
      // rejections are reported verbatim; anything else stays generic.
      let detail = "Unknown error";
      if (error instanceof Error) {
        detail = error.message;
      } else if (typeof error === "string") {
        detail = error;
      }
      return `Error: ${detail}`;
    }
  }
}
// Default export of the queue management tool class.
export default QueueManagement;