Skip to main content
Glama

Agent MCP

agentCommunication.ts15.8 kB
// Agent communication tools for Agent-MCP Node.js // Ported from Python agent_communication_tools.py import { z } from 'zod'; import { randomBytes } from 'crypto'; import { registerTool } from './registry.js'; import { getDbConnection } from '../db/connection.js'; import { getAgentId, verifyToken } from '../core/auth.js'; import { globalState } from '../core/globals.js'; import { sessionExists, sanitizeSessionName, sendCommandToSession, sendPromptToSession } from '../utils/tmux.js'; import { MCP_DEBUG } from '../core/config.js'; /** * Generate a unique message ID */ function generateMessageId(): string { return `msg_${randomBytes(8).toString('hex')}`; } /** * Check if two agents are allowed to communicate */ function canAgentsCommunicate(senderId: string, recipientId: string, isAdmin: boolean): [boolean, string] { // Admin can always communicate if (isAdmin) { return [true, "Admin privileges"]; } // Check if both agents are active if (globalState.activeAgents.has(senderId) && globalState.activeAgents.has(recipientId)) { return [true, "Both agents are active"]; } // Could be extended with more sophisticated permission system return [false, "Communication not permitted between these agents"]; } /** * Tool for sending messages between agents */ registerTool( 'send_agent_message', 'Send a message to another agent with permission checks and delivery options.', z.object({ token: z.string().describe('Sender\'s authentication token'), recipient_id: z.string().describe('ID of the agent to send message to'), message: z.string().max(4000).describe('Message content (max 4000 characters)'), message_type: z.enum(['text', 'assistance_request', 'task_update', 'notification', 'stop_command']) .optional().default('text').describe('Type of message'), priority: z.enum(['low', 'normal', 'high', 'urgent']) .optional().default('normal').describe('Message priority'), deliver_method: z.enum(['tmux', 'store', 'both']) .optional().default('tmux').describe('How to deliver the message') }), async (args, context) => { try { const { token, recipient_id, message, message_type = 'text', priority = 'normal', deliver_method = 'tmux' } = args; // Authentication const senderId = getAgentId(token); if (!senderId) { return { content: [{ type: 'text' as const, text: 'Unauthorized: Valid token required' }], isError: true }; } // Validation if (!recipient_id || !message) { return { content: [{ type: 'text' as const, text: 'Error: recipient_id and message are required' }], isError: true }; } // Admin-only check for stop commands const isAdmin = verifyToken(token, 'admin'); if (message_type === 'stop_command' && !isAdmin) { return { content: [{ type: 'text' as const, text: 'Error: Only admin can send stop commands' }], isError: true }; } // Permission check const [canCommunicate, reason] = canAgentsCommunicate(senderId, recipient_id, isAdmin); if (!canCommunicate) { return { content: [{ type: 'text' as const, text: `Communication denied: ${reason}` }], isError: true }; } // Create message data const messageId = generateMessageId(); const timestamp = new Date().toISOString(); const db = getDbConnection(); // Store message in database const stmt = db.prepare(` INSERT INTO agent_messages (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `); stmt.run(messageId, senderId, recipient_id, message, message_type, priority, timestamp, 0, 0); let deliveryStatus = 'stored'; // Attempt delivery based on method if (deliver_method === 'tmux' || deliver_method === 'both') { // Try to deliver to recipient's tmux session const recipientSession = globalState.agentTmuxSessions.get(recipient_id); if (recipientSession && await sessionExists(recipientSession)) { try { if (message_type === 'stop_command') { // Send escape sequences to interrupt current operation const cleanSession = sanitizeSessionName(recipientSession); for (let i = 0; i < 4; i++) { await sendCommandToSession(cleanSession, 'Escape'); await new Promise(resolve => setTimeout(resolve, 1000)); } deliveryStatus = 'delivered_via_tmux'; } else { // Format and send the message const formattedMessage = ` 📨 **Message from ${senderId}** (${priority.toUpperCase()}) Type: ${message_type} Time: ${new Date(timestamp).toLocaleString()} ${message} ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ `; const success = await sendPromptToSession(recipientSession, formattedMessage, 3); if (success) { deliveryStatus = 'delivered_via_tmux'; // Mark as delivered in database const updateStmt = db.prepare('UPDATE agent_messages SET delivered = 1 WHERE message_id = ?'); updateStmt.run(messageId); } } } catch (error) { console.error(`Failed to deliver message to ${recipient_id}:`, error); } } } if (MCP_DEBUG) { console.log(`📨 Message sent: ${senderId} → ${recipient_id} (${message_type}, ${deliveryStatus})`); } return { content: [{ type: 'text' as const, text: `✅ Message sent successfully to ${recipient_id}\n- Message ID: ${messageId}\n- Delivery: ${deliveryStatus}\n- Type: ${message_type}\n- Priority: ${priority}` }], isError: false }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); console.error('Error sending agent message:', error); return { content: [{ type: 'text' as const, text: `❌ Failed to send message: ${errorMessage}` }], isError: true }; } } ); /** * Tool for retrieving agent messages */ registerTool( 'get_agent_messages', 'Retrieve messages for the current agent.', z.object({ token: z.string().describe('Agent\'s authentication token'), include_sent: z.boolean().optional().default(false).describe('Include messages sent by this agent'), include_received: z.boolean().optional().default(true).describe('Include messages received by this agent'), mark_as_read: z.boolean().optional().default(true).describe('Mark retrieved messages as read'), limit: z.number().int().min(1).max(100).optional().default(20).describe('Maximum number of messages to retrieve'), message_type: z.enum(['text', 'assistance_request', 'task_update', 'notification', 'stop_command']) .optional().describe('Filter by message type'), unread_only: z.boolean().optional().default(false).describe('Only show unread messages') }), async (args, context) => { try { const { token, include_sent = false, include_received = true, mark_as_read = true, limit = 20, message_type, unread_only = false } = args; // Authentication const agentId = getAgentId(token); if (!agentId) { return { content: [{ type: 'text' as const, text: 'Unauthorized: Valid token required' }], isError: true }; } const db = getDbConnection(); // Build query conditions const conditions: string[] = []; const params: any[] = []; if (include_sent && include_received) { conditions.push('(sender_id = ? OR recipient_id = ?)'); params.push(agentId, agentId); } else if (include_sent) { conditions.push('sender_id = ?'); params.push(agentId); } else if (include_received) { conditions.push('recipient_id = ?'); params.push(agentId); } if (message_type) { conditions.push('message_type = ?'); params.push(message_type); } if (unread_only) { conditions.push('read = 0'); } const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; const query = ` SELECT message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read FROM agent_messages ${whereClause} ORDER BY timestamp DESC LIMIT ? `; params.push(limit); const stmt = db.prepare(query); const messages = stmt.all(...params) as any[]; // Mark messages as read if requested if (mark_as_read && messages.length > 0) { const messageIds = messages .filter(msg => msg.recipient_id === agentId && !msg.read) .map(msg => msg.message_id); if (messageIds.length > 0) { const placeholders = messageIds.map(() => '?').join(','); const updateStmt = db.prepare(`UPDATE agent_messages SET read = 1 WHERE message_id IN (${placeholders})`); updateStmt.run(...messageIds); } } // Format response if (messages.length === 0) { return { content: [{ type: 'text' as const, text: '📭 No messages found matching your criteria.' }], isError: false }; } const formattedMessages = messages.map((msg, index) => { const isReceived = msg.recipient_id === agentId; const direction = isReceived ? '📥 RECEIVED' : '📤 SENT'; const otherParty = isReceived ? msg.sender_id : msg.recipient_id; const readStatus = msg.read ? '✓' : '●'; const deliveryStatus = msg.delivered ? '✓ delivered' : '○ stored'; return `${index + 1}. ${direction} ${readStatus} ${msg.message_type.toUpperCase()} | ${msg.priority.toUpperCase()} | ${deliveryStatus} ${isReceived ? 'From' : 'To'}: ${otherParty} Time: ${new Date(msg.timestamp).toLocaleString()} ID: ${msg.message_id} ${msg.message_content}`; }).join('\n\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n'); return { content: [{ type: 'text' as const, text: `📬 **Agent Messages for ${agentId}**\nShowing ${messages.length} message(s)\n\n${formattedMessages}` }], isError: false }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); console.error('Error retrieving agent messages:', error); return { content: [{ type: 'text' as const, text: `❌ Failed to retrieve messages: ${errorMessage}` }], isError: true }; } } ); /** * Tool for admin to broadcast messages to all active agents */ registerTool( 'broadcast_admin_message', 'Admin-only tool to broadcast a message to all active agents.', z.object({ token: z.string().describe('Admin authentication token'), message: z.string().max(4000).describe('Message content to broadcast'), message_type: z.enum(['broadcast', 'announcement', 'system_alert']) .optional().default('broadcast').describe('Type of broadcast message'), priority: z.enum(['low', 'normal', 'high', 'urgent']) .optional().default('high').describe('Message priority') }), async (args, context) => { try { const { token, message, message_type = 'broadcast', priority = 'high' } = args; // Admin authentication required if (!verifyToken(token, 'admin')) { return { content: [{ type: 'text' as const, text: 'Unauthorized: Admin privileges required' }], isError: true }; } // Validation if (!message) { return { content: [{ type: 'text' as const, text: 'Error: message is required' }], isError: true }; } const db = getDbConnection(); const adminId = 'admin'; const timestamp = new Date().toISOString(); // Get all active agents const activeAgents = Array.from(globalState.activeAgents.keys()); if (activeAgents.length === 0) { return { content: [{ type: 'text' as const, text: '📭 No active agents to broadcast to.' }], isError: false }; } let deliveredCount = 0; const results: string[] = []; // Send to each active agent for (const recipientId of activeAgents) { try { const messageId = generateMessageId(); // Store in database const stmt = db.prepare(` INSERT INTO agent_messages (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) `); stmt.run(messageId, adminId, recipientId, message, message_type, priority, timestamp, 0, 0); // Try to deliver via tmux const recipientSession = globalState.agentTmuxSessions.get(recipientId); if (recipientSession && await sessionExists(recipientSession)) { const formattedMessage = ` 🔔 **ADMIN BROADCAST** (${priority.toUpperCase()}) Type: ${message_type.toUpperCase()} Time: ${new Date(timestamp).toLocaleString()} ${message} ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ `; const success = await sendPromptToSession(recipientSession, formattedMessage, 3); if (success) { deliveredCount++; // Mark as delivered const updateStmt = db.prepare('UPDATE agent_messages SET delivered = 1 WHERE message_id = ?'); updateStmt.run(messageId); results.push(`✓ ${recipientId}: delivered via tmux`); } else { results.push(`○ ${recipientId}: stored only (tmux delivery failed)`); } } else { results.push(`○ ${recipientId}: stored only (no tmux session)`); } } catch (error) { results.push(`✗ ${recipientId}: failed (${error instanceof Error ? error.message : String(error)})`); } } if (MCP_DEBUG) { console.log(`📢 Admin broadcast sent to ${activeAgents.length} agents, ${deliveredCount} delivered via tmux`); } return { content: [{ type: 'text' as const, text: `📢 **Broadcast Complete**\n\nMessage sent to ${activeAgents.length} active agent(s)\nDelivered via tmux: ${deliveredCount}\n\n**Delivery Results:**\n${results.join('\n')}` }], isError: false }; } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); console.error('Error broadcasting admin message:', error); return { content: [{ type: 'text' as const, text: `❌ Failed to broadcast message: ${errorMessage}` }], isError: true }; } } ); console.log('✅ Agent communication tools loaded');

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rinadelph/Agent-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server