Skip to main content
Glama

Agent MCP

agent_communication_tools.py•22.5 kB
# Agent-MCP/agent_mcp/tools/agent_communication_tools.py import json import datetime import secrets import sqlite3 from typing import List, Dict, Any, Optional from pathlib import Path import os import mcp.types as mcp_types from .registry import register_tool from ..core.config import logger from ..core import globals as g from ..core.auth import verify_token, get_agent_id from ..utils.audit_utils import log_audit from ..db.connection import get_db_connection from ..db.actions.agent_actions_db import log_agent_action_to_db from ..utils.tmux_utils import send_prompt_async, session_exists, sanitize_session_name, send_command_to_session def _generate_message_id() -> str: """Generate a unique message ID.""" return f"msg_{secrets.token_hex(8)}" def _can_agents_communicate(sender_id: str, recipient_id: str, is_admin: bool) -> tuple[bool, str]: """ Check if two agents are allowed to communicate. Args: sender_id: ID of the sending agent recipient_id: ID of the receiving agent is_admin: Whether the sender has admin privileges Returns: Tuple of (allowed: bool, reason: str) """ # Admin can always communicate with anyone if is_admin: return True, "Admin privileges" # Self-communication not allowed (use internal methods) if sender_id == recipient_id: return False, "Self-communication not allowed" # Admin agent can always be contacted if recipient_id == "admin" or recipient_id.lower().startswith("admin"): return True, "Admin agent always contactable" # Check if recipient allows communication from sender # This could be extended with a permission system in the database # For now, we'll use a simple rule: agents can communicate if they're both active if sender_id in g.active_agents and recipient_id in g.active_agents: return True, "Both agents are active" # Check if either agent is in the same task context # (This would require additional task relationship checking) return False, "Communication not permitted between these agents" async def send_agent_message_tool_impl(arguments: Dict[str, Any]) -> List[mcp_types.TextContent]: """ Send a message from one agent to another with permission checks. Messages can be delivered via tmux session or stored for later retrieval. """ sender_token = arguments.get("token") recipient_id = arguments.get("recipient_id") message_content = arguments.get("message") message_type = arguments.get("message_type", "text") # text, assistance_request, task_update priority = arguments.get("priority", "normal") # low, normal, high, urgent deliver_method = arguments.get("deliver_method", "tmux") # tmux, store, both # Authentication sender_id = get_agent_id(sender_token) if not sender_id: return [mcp_types.TextContent(type="text", text="Unauthorized: Valid token required")] # Validation if not recipient_id or not message_content: return [mcp_types.TextContent(type="text", text="Error: recipient_id and message are required")] if len(message_content) > 4000: # Reasonable message size limit return [mcp_types.TextContent(type="text", text="Error: Message too long (max 4000 characters)")] # Admin-only check for stop commands is_admin = verify_token(sender_token, "admin") if message_type == "stop_command" and not is_admin: return [mcp_types.TextContent(type="text", text="Error: Only admin can send stop commands")] # Permission check can_communicate, reason = _can_agents_communicate(sender_id, recipient_id, is_admin) if not can_communicate: return [mcp_types.TextContent(type="text", text=f"Communication denied: {reason}")] # Create message data message_id = _generate_message_id() timestamp = datetime.datetime.now().isoformat() message_data = { "message_id": message_id, "sender_id": sender_id, "recipient_id": recipient_id, "message_content": message_content, "message_type": message_type, "priority": priority, "timestamp": timestamp, "delivered": False, "read": False } conn = None try: conn = get_db_connection() cursor = conn.cursor() # Store message in database cursor.execute(""" INSERT INTO agent_messages (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, False, False)) # Attempt delivery based on method delivery_status = "stored" if deliver_method in ["tmux", "both"]: # Try to deliver to recipient's tmux session if recipient_id in g.agent_tmux_sessions: session_name = g.agent_tmux_sessions[recipient_id] if session_exists(session_name): # Handle stop commands differently if message_type == "stop_command": # Send control sequence to interrupt the agent try: import subprocess clean_session_name = sanitize_session_name(session_name) # Send Escape 4 times with 1 second intervals to stop current operation import time success = True for i in range(4): result = subprocess.run(['tmux', 'send-keys', '-t', clean_session_name, 'Escape'], capture_output=True, text=True, timeout=5) if result.returncode != 0: success = False break logger.debug(f"Sent Escape {i+1}/4 to agent {recipient_id}") if i < 3: # Don't sleep after the last one time.sleep(1) if success: delivery_status = "delivered_stop_command" logger.info(f"Stop command (4x Escape) sent to agent {recipient_id} in session {session_name}") else: delivery_status = "stop_command_failed" logger.error(f"Failed to send stop command: {result.stderr}") # Mark as delivered in database cursor.execute("UPDATE agent_messages SET delivered = ? WHERE message_id = ?", (success, message_id)) except Exception as e: logger.error(f"Failed to send stop command to tmux session '{session_name}': {e}") delivery_status = "stop_command_failed" else: # Format regular message for delivery formatted_message = f"\nšŸ’¬ Message from {sender_id} ({priority}): {message_content}\n" # Send message to tmux session try: send_prompt_async(session_name, formatted_message, delay_seconds=1) delivery_status = "delivered_tmux" # Mark as delivered in database cursor.execute("UPDATE agent_messages SET delivered = ? WHERE message_id = ?", (True, message_id)) except Exception as e: logger.error(f"Failed to deliver message to tmux session '{session_name}': {e}") delivery_status = "delivery_failed" else: delivery_status = "session_not_found" else: delivery_status = "no_session" # Log the communication log_agent_action_to_db(cursor, sender_id, "send_message", details={ "recipient": recipient_id, "message_type": message_type, "priority": priority, "delivery_status": delivery_status }) conn.commit() # Audit log log_audit(sender_id, "send_agent_message", { "recipient": recipient_id, "message_type": message_type, "priority": priority, "delivery_status": delivery_status, "message_id": message_id }) # Build response status_messages = { "stored": "Message stored for recipient", "delivered_tmux": "Message delivered to recipient's session", "delivery_failed": "Message stored but delivery failed", "session_not_found": "Message stored; recipient session not active", "no_session": "Message stored; recipient has no active session", "delivered_stop_command": "Stop command sent to recipient's session", "stop_command_failed": "Stop command failed to send" } response_text = f"Message sent to {recipient_id}. {status_messages.get(delivery_status, 'Unknown status')}" if delivery_status not in ["delivered_tmux", "delivered_stop_command"]: response_text += f" (Message ID: {message_id})" return [mcp_types.TextContent(type="text", text=response_text)] except sqlite3.Error as e: if conn: conn.rollback() logger.error(f"Database error sending message: {e}", exc_info=True) return [mcp_types.TextContent(type="text", text=f"Database error sending message: {e}")] except Exception as e: if conn: conn.rollback() logger.error(f"Unexpected error sending message: {e}", exc_info=True) return [mcp_types.TextContent(type="text", text=f"Unexpected error sending message: {e}")] finally: if conn: conn.close() async def get_agent_messages_tool_impl(arguments: Dict[str, Any]) -> List[mcp_types.TextContent]: """ Retrieve messages for an agent. """ agent_token = arguments.get("token") include_sent = arguments.get("include_sent", False) include_received = arguments.get("include_received", True) mark_as_read = arguments.get("mark_as_read", True) limit = arguments.get("limit", 20) message_type_filter = arguments.get("message_type") unread_only = arguments.get("unread_only", False) # Authentication agent_id = get_agent_id(agent_token) if not agent_id: return [mcp_types.TextContent(type="text", text="Unauthorized: Valid token required")] # Validation try: limit = int(limit) if not (1 <= limit <= 100): limit = 20 except (ValueError, TypeError): limit = 20 conn = None try: conn = get_db_connection() cursor = conn.cursor() # Build query query_conditions = [] query_params = [] if include_received and include_sent: query_conditions.append("(recipient_id = ? OR sender_id = ?)") query_params.extend([agent_id, agent_id]) elif include_received: query_conditions.append("recipient_id = ?") query_params.append(agent_id) elif include_sent: query_conditions.append("sender_id = ?") query_params.append(agent_id) else: return [mcp_types.TextContent(type="text", text="Error: Must include sent or received messages")] if message_type_filter: query_conditions.append("message_type = ?") query_params.append(message_type_filter) if unread_only: query_conditions.append("read = ?") query_params.append(False) where_clause = " AND ".join(query_conditions) query = f""" SELECT message_id, sender_id, recipient_id, message_content, message_type, priority, timestamp, delivered, read FROM agent_messages WHERE {where_clause} ORDER BY timestamp DESC LIMIT ? """ query_params.append(limit) cursor.execute(query, query_params) messages = cursor.fetchall() # Mark received messages as read if requested if mark_as_read and include_received: message_ids_to_mark = [msg["message_id"] for msg in messages if msg["recipient_id"] == agent_id and not msg["read"]] if message_ids_to_mark: placeholders = ",".join("?" * len(message_ids_to_mark)) cursor.execute(f"UPDATE agent_messages SET read = ? WHERE message_id IN ({placeholders})", [True] + message_ids_to_mark) conn.commit() # Format response if not messages: return [mcp_types.TextContent(type="text", text="No messages found")] response_lines = [f"Messages for {agent_id} (showing {len(messages)} of max {limit}):"] response_lines.append("") for msg in messages: direction = "āž”ļø" if msg["sender_id"] == agent_id else "ā¬…ļø" other_agent = msg["recipient_id"] if msg["sender_id"] == agent_id else msg["sender_id"] read_status = "šŸ“–" if msg["read"] else "šŸ“©" priority_icon = {"low": "šŸ”µ", "normal": "⚪", "high": "🟔", "urgent": "šŸ”“"}.get(msg["priority"], "⚪") response_lines.append(f"{direction} {read_status} {priority_icon} [{msg['message_type']}] {other_agent}") response_lines.append(f" {msg['timestamp']}") response_lines.append(f" {msg['message_content']}") response_lines.append("") log_audit(agent_id, "get_agent_messages", { "messages_retrieved": len(messages), "include_sent": include_sent, "include_received": include_received }) return [mcp_types.TextContent(type="text", text="\n".join(response_lines))] except sqlite3.Error as e: logger.error(f"Database error retrieving messages: {e}", exc_info=True) return [mcp_types.TextContent(type="text", text=f"Database error retrieving messages: {e}")] except Exception as e: logger.error(f"Unexpected error retrieving messages: {e}", exc_info=True) return [mcp_types.TextContent(type="text", text=f"Unexpected error retrieving messages: {e}")] finally: if conn: conn.close() async def broadcast_admin_message_tool_impl(arguments: Dict[str, Any]) -> List[mcp_types.TextContent]: """ Admin-only tool to broadcast a message to all active agents. """ admin_token = arguments.get("token") message_content = arguments.get("message") message_type = arguments.get("message_type", "broadcast") priority = arguments.get("priority", "high") # Authentication (admin only) if not verify_token(admin_token, "admin"): return [mcp_types.TextContent(type="text", text="Unauthorized: Admin token required")] if not message_content: return [mcp_types.TextContent(type="text", text="Error: message is required")] # Get all active agents active_agents = list(g.active_agents.keys()) if not active_agents: return [mcp_types.TextContent(type="text", text="No active agents to broadcast to")] # Send to each agent sent_count = 0 failed_count = 0 for agent_token in active_agents: agent_data = g.active_agents[agent_token] recipient_id = agent_data.get("agent_id") if recipient_id and recipient_id != "admin": # Don't send to admin itself try: # Use the send message function result = await send_agent_message_tool_impl({ "token": admin_token, "recipient_id": recipient_id, "message": message_content, "message_type": message_type, "priority": priority, "deliver_method": "both" }) sent_count += 1 except Exception as e: logger.error(f"Failed to send broadcast to {recipient_id}: {e}") failed_count += 1 log_audit("admin", "broadcast_message", { "message_type": message_type, "priority": priority, "sent_count": sent_count, "failed_count": failed_count }) return [mcp_types.TextContent( type="text", text=f"Broadcast sent to {sent_count} agents. {failed_count} failed." )] def register_agent_communication_tools(): """Register agent communication tools.""" register_tool( name="send_agent_message", description="Send a message to another agent with permission checks and delivery options.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Sender's authentication token" }, "recipient_id": { "type": "string", "description": "ID of the agent to send message to" }, "message": { "type": "string", "description": "Message content (max 4000 characters)" }, "message_type": { "type": "string", "description": "Type of message", "enum": ["text", "assistance_request", "task_update", "notification", "stop_command"], "default": "text" }, "priority": { "type": "string", "description": "Message priority", "enum": ["low", "normal", "high", "urgent"], "default": "normal" }, "deliver_method": { "type": "string", "description": "How to deliver the message", "enum": ["tmux", "store", "both"], "default": "tmux" } }, "required": ["token", "recipient_id", "message"], "additionalProperties": False }, implementation=send_agent_message_tool_impl ) register_tool( name="get_agent_messages", description="Retrieve messages for the current agent.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Agent's authentication token" }, "include_sent": { "type": "boolean", "description": "Include messages sent by this agent", "default": False }, "include_received": { "type": "boolean", "description": "Include messages received by this agent", "default": True }, "mark_as_read": { "type": "boolean", "description": "Mark retrieved messages as read", "default": True }, "limit": { "type": "integer", "description": "Maximum number of messages to retrieve", "default": 20, "minimum": 1, "maximum": 100 }, "message_type": { "type": "string", "description": "Filter by message type", "enum": ["text", "assistance_request", "task_update", "notification", "stop_command"] }, "unread_only": { "type": "boolean", "description": "Only show unread messages", "default": False } }, "required": ["token"], "additionalProperties": False }, implementation=get_agent_messages_tool_impl ) register_tool( name="broadcast_admin_message", description="Admin-only tool to broadcast a message to all active agents.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token" }, "message": { "type": "string", "description": "Message content to broadcast" }, "message_type": { "type": "string", "description": "Type of broadcast message", "enum": ["broadcast", "announcement", "system_alert"], "default": "broadcast" }, "priority": { "type": "string", "description": "Message priority", "enum": ["low", "normal", "high", "urgent"], "default": "high" } }, "required": ["token", "message"], "additionalProperties": False }, implementation=broadcast_admin_message_tool_impl ) # Auto-register when imported register_agent_communication_tools()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rinadelph/Agent-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server