import { MCPTool, MCPInput } from "mcp-framework";
import { z } from "zod";
import RabbitMQConnection from "./RabbitMQConnection.js";
const PublishSchema = z.object({
exchange: z.string().optional().describe("Exchange name to publish to (default: empty string for default exchange)"),
routing_key: z.string().describe("Routing key or queue name for direct publishing"),
message: z.string().describe("Message content to publish"),
options: z.object({
persistent: z.boolean().optional().describe("Whether message should persist"),
priority: z.number().optional().describe("Message priority"),
expiration: z.string().optional().describe("Message expiration"),
messageId: z.string().optional().describe("Message ID"),
timestamp: z.number().optional().describe("Message timestamp"),
type: z.string().optional().describe("Message type"),
userId: z.string().optional().describe("User ID"),
appId: z.string().optional().describe("Application ID"),
clusterId: z.string().optional().describe("Cluster ID"),
headers: z.record(z.any()).optional().describe("Message headers"),
}).optional().describe("Message publishing options"),
});
class MessagePublisher extends MCPTool {
name = "message_publisher";
description = "Publish messages to RabbitMQ exchanges or queues";
schema = PublishSchema;
async execute(input: MCPInput<this>) {
const channel = RabbitMQConnection.getChannel();
if (!channel) {
return "Error: No active RabbitMQ connection. Please connect first.";
}
try {
const exchange = input.exchange ?? "";
const messageBuffer = Buffer.from(input.message);
const publishOptions = {
persistent: input.options?.persistent ?? true,
priority: input.options?.priority,
expiration: input.options?.expiration,
messageId: input.options?.messageId,
timestamp: input.options?.timestamp ?? Date.now(),
type: input.options?.type,
userId: input.options?.userId,
appId: input.options?.appId,
clusterId: input.options?.clusterId,
headers: input.options?.headers,
};
const success = channel.publish(
exchange,
input.routing_key,
messageBuffer,
publishOptions
);
if (success) {
return {
message: "Message published successfully",
exchange: exchange || "(default)",
routing_key: input.routing_key,
content: input.message,
options: publishOptions
};
} else {
return "Warning: Message may not have been published (channel buffer full)";
}
} catch (error) {
return `Error: ${error instanceof Error ? error.message : 'Unknown error'}`;
}
}
}
export default MessagePublisher;