Skip to main content
Glama

Agent MCP

admin_tools.py60.9 kB
# Agent-MCP/mcp_template/mcp_server_src/tools/admin_tools.py import json import datetime import subprocess # For launching Cursor (will be commented out) import os import sqlite3 from typing import List, Dict, Any, Optional import mcp.types as mcp_types # Assuming this is your mcp.types path from .registry import register_tool from ..core.config import logger, AGENT_COLORS # AGENT_COLORS for create_agent from ..core import globals as g from ..core.auth import ( verify_token, generate_token, ) # For create_agent, terminate_agent from ..utils.audit_utils import log_audit from ..utils.project_utils import generate_system_prompt # For create_agent from ..utils.tmux_utils import ( is_tmux_available, create_tmux_session, kill_tmux_session, session_exists, sanitize_session_name, list_tmux_sessions, send_prompt_async, send_command_to_session, ) from ..utils.prompt_templates import build_agent_prompt from ..db.connection import get_db_connection, execute_db_write from ..db.actions.agent_actions_db import log_agent_action_to_db # For DB logging def get_admin_token_suffix(admin_token: str) -> str: """ Extract the last 4 characters from admin token for session naming. Args: admin_token: The admin authentication token Returns: Last 4 characters of the token in lowercase """ if not admin_token or len(admin_token) < 4: return "0000" # Fallback for invalid tokens return admin_token[-4:].lower() def create_agent_session_name(agent_id: str, admin_token: str) -> str: """ Create agent session name in format: agent_id-suffix where suffix is the last 4 characters of the admin token. Args: agent_id: The agent identifier admin_token: The admin authentication token Returns: Session name in format "agent_id-def2" where def2 is from admin token """ suffix = get_admin_token_suffix(admin_token) clean_agent_id = sanitize_session_name(agent_id) return f"{clean_agent_id}-{suffix}" # --- create_agent tool --- # Original logic from main.py: lines 1060-1203 (create_agent_tool function) async def create_agent_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: token = arguments.get("token") agent_id = arguments.get("agent_id") capabilities = arguments.get("capabilities") # This was List[str] task_ids = arguments.get("task_ids") # Required list of task IDs # New prompt-related parameters prompt_template = arguments.get( "prompt_template", "worker_with_rag" ) # Default to RAG worker custom_prompt = arguments.get("custom_prompt") # Custom prompt text send_prompt = arguments.get("send_prompt", True) # Default to auto-send prompt prompt_delay = arguments.get("prompt_delay", 5) # Default 5 second delay if not verify_token(token, "admin"): # main.py:1066 return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] if not agent_id or not isinstance(agent_id, str): return [ mcp_types.TextContent( type="text", text="Error: agent_id is required and must be a string." ) ] # Validate task_ids parameter if not task_ids: return [ mcp_types.TextContent( type="text", text="Error: task_ids is required and must be a non-empty list.", ) ] if not isinstance(task_ids, list) or not task_ids: return [ mcp_types.TextContent( type="text", text="Error: task_ids must be a non-empty list of task IDs.", ) ] # Validate each task_id is a string for task_id in task_ids: if not isinstance(task_id, str): return [ mcp_types.TextContent( type="text", text=f"Error: All task IDs must be strings. Found: {type(task_id).__name__}", ) ] # Check in-memory map first (main.py:1072) if agent_id in g.agent_working_dirs: return [ mcp_types.TextContent( type="text", text=f"Agent '{agent_id}' already exists (in active memory).", ) ] conn = None try: conn = get_db_connection() cursor = conn.cursor() # Double check in DB (main.py:1077-1081) cursor.execute("SELECT agent_id FROM agents WHERE agent_id = ?", (agent_id,)) if cursor.fetchone(): return [ mcp_types.TextContent( type="text", text=f"Agent '{agent_id}' already exists (in database).", ) ] # Validate task existence and availability for task_id in task_ids: cursor.execute( "SELECT task_id, assigned_to, status FROM tasks WHERE task_id = ?", (task_id,), ) task_row = cursor.fetchone() if not task_row: return [ mcp_types.TextContent( type="text", text=f"Error: Task '{task_id}' not found in database.", ) ] task_data = dict(task_row) # Check if task is already assigned if task_data.get("assigned_to") is not None: return [ mcp_types.TextContent( type="text", text=f"Error: Task '{task_id}' is already assigned to agent '{task_data['assigned_to']}'.", ) ] # Check if task is in a valid state for assignment task_status = task_data.get("status", "").lower() if task_status not in ["created", "unassigned"]: return [ mcp_types.TextContent( type="text", text=f"Error: Task '{task_id}' has status '{task_status}' and cannot be assigned. Only tasks with status 'created' or 'unassigned' can be assigned.", ) ] # Generate token and prepare data (main.py:1089-1092) new_agent_token = generate_token() created_at_iso = datetime.datetime.now().isoformat() capabilities_json = json.dumps(capabilities or []) status = "created" # Or "active" immediately? Original used "created". # Assign a color (main.py:1095-1097) agent_color = AGENT_COLORS[g.agent_color_index % len(AGENT_COLORS)] g.agent_color_index += 1 # Determine working directory - all agents use shared project directory # MCP_PROJECT_DIR is set by cli.py or server startup. project_dir_env = os.environ.get("MCP_PROJECT_DIR") if not project_dir_env: logger.error( "MCP_PROJECT_DIR environment variable not set. Cannot determine agent working directory." ) return [ mcp_types.TextContent( type="text", text="Server configuration error: MCP_PROJECT_DIR not set.", ) ] # All agents work in the same shared directory with file-level locking agent_working_dir_abs = os.path.abspath(project_dir_env) # Ensure the working directory exists try: os.makedirs(agent_working_dir_abs, exist_ok=True) except OSError as e: logger.error( f"Failed to create working directory {agent_working_dir_abs} for agent {agent_id}: {e}" ) return [ mcp_types.TextContent( type="text", text=f"Error creating working directory: {e}" ) ] # Insert into Database (main.py:1107-1117) cursor.execute( """ INSERT INTO agents (token, agent_id, capabilities, created_at, status, working_directory, color, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( new_agent_token, agent_id, capabilities_json, created_at_iso, status, agent_working_dir_abs, agent_color, created_at_iso, # updated_at initially same as created_at ), ) # Log action to agent_actions table (main.py:1119) log_agent_action_to_db( cursor, "admin", "created_agent", details={ "agent_id": agent_id, "color": agent_color, "wd": agent_working_dir_abs, }, ) # Assign tasks to the agent atomically assigned_tasks = [] for task_id in task_ids: # Update task assignment cursor.execute( "UPDATE tasks SET assigned_to = ?, status = 'pending', updated_at = ? WHERE task_id = ?", (agent_id, created_at_iso, task_id), ) if cursor.rowcount == 0: # This should not happen since we validated earlier, but let's be safe raise Exception( f"Failed to assign task '{task_id}' to agent '{agent_id}'" ) assigned_tasks.append(task_id) # Update the in-memory global cache (g.tasks) to reflect the assignment if task_id in g.tasks: g.tasks[task_id]["assigned_to"] = agent_id g.tasks[task_id]["status"] = "pending" g.tasks[task_id]["updated_at"] = created_at_iso else: # If task not in cache, fetch from database and add to cache cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)) task_row = cursor.fetchone() if task_row: task_data = dict(task_row) task_data["assigned_to"] = ( agent_id # Ensure assignment is reflected ) task_data["status"] = "pending" task_data["updated_at"] = created_at_iso g.tasks[task_id] = task_data # Log task assignment action log_agent_action_to_db( cursor, "admin", "assigned_task", details={ "agent_id": agent_id, "task_id": task_id, "assignment_mode": "agent_creation", }, ) # Update agent with current task (set to first task if multiple) if assigned_tasks: cursor.execute( "UPDATE agents SET current_task = ? WHERE agent_id = ?", (assigned_tasks[0], agent_id), ) # Commit the transaction (agent creation + task assignments) conn.commit() # Update in-memory state (main.py:1126-1133) g.active_agents[new_agent_token] = { "agent_id": agent_id, "capabilities": capabilities or [], "created_at": created_at_iso, "status": status, "current_task": assigned_tasks[0] if assigned_tasks else None, "color": agent_color, } g.agent_working_dirs[agent_id] = agent_working_dir_abs # Log to audit log file (main.py:1136-1144) log_audit( "admin", "create_agent", { "agent_id": agent_id, "capabilities": capabilities or [], "working_directory": agent_working_dir_abs, "assigned_color": agent_color, "assigned_tasks": assigned_tasks, "current_task": assigned_tasks[0] if assigned_tasks else None, }, ) # Generate system prompt (main.py:1147) # The original passed `token` (admin token) if agent_id started with "admin". # `generate_system_prompt` now takes `admin_token_runtime`. system_prompt_str = generate_system_prompt( agent_id, new_agent_token, g.admin_token ) # Launch tmux session with Claude launch_status = "tmux session launching disabled - tmux not available." tmux_session_name = None if is_tmux_available(): try: # Create sanitized session name tmux_session_name = create_agent_session_name(agent_id, token) # Set up environment variables for the agent env_vars = { "MCP_AGENT_ID": agent_id, "MCP_AGENT_TOKEN": new_agent_token, "MCP_SERVER_URL": f"http://localhost:{os.environ.get('PORT', '8080')}", "MCP_WORKING_DIR": agent_working_dir_abs, } # Add admin token if this is an admin agent if ( agent_id.lower().startswith("admin") and new_agent_token == g.admin_token ): env_vars["MCP_ADMIN_TOKEN"] = g.admin_token # Create the tmux session (without immediate command) if create_tmux_session( session_name=tmux_session_name, working_dir=agent_working_dir_abs, command=None, # Don't start Claude immediately env_vars=env_vars, ): # Track the tmux session in globals g.agent_tmux_sessions[agent_id] = tmux_session_name # Initial setup commands for visibility in tmux session welcome_message = ( f"echo '=== Agent {agent_id} initialization starting ==='" ) if send_command_to_session(tmux_session_name, welcome_message): logger.info(f"✅ Sent welcome message to agent '{agent_id}'") else: logger.error( f"❌ Failed to send welcome message to agent '{agent_id}'" ) # Add setup delay to ensure commands execute properly import time def wait_for_command_completion( session_name: str, delay: float = 1.0 ): """Smart delay system - wait for command completion or timeout""" time.sleep(delay) # Could add tmux pane monitoring here in future for true completion detection setup_delay = 1.0 # 1 second delay between setup commands wait_for_command_completion(tmux_session_name, setup_delay) # Verify we're in the correct working directory verify_command = f"echo 'Working directory:' && pwd" if send_command_to_session(tmux_session_name, verify_command): logger.info( f"✅ Sent directory verification to agent '{agent_id}'" ) else: logger.error( f"❌ Failed to send directory verification to agent '{agent_id}'" ) wait_for_command_completion(tmux_session_name, setup_delay) # Get server port for MCP registration server_port = os.environ.get("PORT", "8080") mcp_server_url = f"http://localhost:{server_port}/sse" # Log MCP server info mcp_info_command = f"echo 'MCP Server URL: {mcp_server_url}'" send_command_to_session(tmux_session_name, mcp_info_command) wait_for_command_completion(tmux_session_name, setup_delay) # Register MCP server connection mcp_add_command = f"claude mcp add -t sse AgentMCP {mcp_server_url}" logger.info( f"Registering MCP server for agent '{agent_id}': {mcp_add_command}" ) if not send_command_to_session(tmux_session_name, mcp_add_command): logger.error( f"Failed to register MCP server for agent '{agent_id}'" ) base_status = ( f"❌ Failed to register MCP server for agent '{agent_id}'." ) else: # Add delay to ensure MCP registration completes wait_for_command_completion(tmux_session_name, setup_delay) # Verify MCP registration verify_mcp_command = "claude mcp list" logger.info( f"Verifying MCP registration for agent '{agent_id}'" ) send_command_to_session(tmux_session_name, verify_mcp_command) wait_for_command_completion(tmux_session_name, setup_delay) # Start Claude start_claude_message = "echo '--- Starting Claude with MCP ---'" send_command_to_session(tmux_session_name, start_claude_message) wait_for_command_completion(tmux_session_name, setup_delay) claude_command = "claude --dangerously-skip-permissions" logger.info( f"Starting Claude for agent '{agent_id}': {claude_command}" ) if not send_command_to_session( tmux_session_name, claude_command ): logger.error( f"Failed to start Claude for agent '{agent_id}'" ) base_status = f"❌ Failed to start Claude for agent '{agent_id}' after MCP registration." else: base_status = f"✅ tmux session '{tmux_session_name}' created for agent '{agent_id}' with MCP registration and Claude." # Log completion message to tmux session (will appear before Claude starts) completion_message = f"echo '=== Agent {agent_id} setup complete - Claude starting ==='" send_command_to_session( tmux_session_name, completion_message ) # Send prompt if requested prompt_status = "" if send_prompt: try: # Build the prompt using the template system agent_prompt = build_agent_prompt( agent_id=agent_id, agent_token=new_agent_token, admin_token=g.admin_token, template_name=prompt_template, custom_prompt=custom_prompt, ) if agent_prompt: # Send prompt asynchronously send_prompt_async( tmux_session_name, agent_prompt, prompt_delay ) prompt_status = f" Prompt will be sent in {prompt_delay} seconds using '{prompt_template}' template." logger.info( f"Scheduled prompt delivery for agent '{agent_id}' using template '{prompt_template}'" ) else: prompt_status = f" ❌ Failed to build prompt using template '{prompt_template}'." logger.error( f"Failed to build prompt for agent '{agent_id}' using template '{prompt_template}'" ) except Exception as e_prompt: prompt_status = ( f" ❌ Error setting up prompt: {str(e_prompt)}" ) logger.error( f"Error setting up prompt for agent '{agent_id}': {e_prompt}" ) launch_status = base_status + prompt_status logger.info( f"tmux session '{tmux_session_name}' launched for agent '{agent_id}'" ) else: launch_status = ( f"❌ Failed to create tmux session for agent '{agent_id}'." ) logger.error(launch_status) except Exception as e_launch: launch_status = f"❌ Failed to launch tmux session: {str(e_launch)}" logger.error(launch_status, exc_info=True) else: logger.warning( "tmux is not available - agent session cannot be launched automatically" ) launch_status = "⚠️ tmux not available - manual agent setup required." # All agents work in shared project directory with file-level locking # Log to console (main.py:1169-1174) console_output = ( f"\n=== Agent '{agent_id}' Created ===\n" f"Token: {new_agent_token}\n" f"Assigned Color: {agent_color}\n" f"Working Directory: {agent_working_dir_abs}\n" f"Assigned Tasks: {', '.join(assigned_tasks)}\n" f"Current Task: {assigned_tasks[0] if assigned_tasks else 'None'}\n" f"{launch_status}\n" f"=========================\n" f"=== System Prompt for {agent_id} ===\n{system_prompt_str}\n" f"=========================" ) logger.info( f"Agent '{agent_id}' created. Token: {new_agent_token}, Color: {agent_color}, WD: {agent_working_dir_abs}" ) print(console_output) # For direct CLI feedback return [ mcp_types.TextContent( type="text", text=f"Agent '{agent_id}' created successfully.\n" f"Token: {new_agent_token}\n" f"Assigned Color: {agent_color}\n" f"Working Directory: {agent_working_dir_abs}\n" f"Assigned Tasks: {', '.join(assigned_tasks)}\n" f"Current Task: {assigned_tasks[0] if assigned_tasks else 'None'}\n" f"{launch_status}\n\n" f"System Prompt:\n{system_prompt_str}", ) ] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error( f"Database error creating agent {agent_id}: {e_sql}", exc_info=True ) return [ mcp_types.TextContent( type="text", text=f"Database error creating agent: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error(f"Unexpected error creating agent {agent_id}: {e}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Unexpected error creating agent: {e}" ) ] finally: if conn: conn.close() # --- view_status tool --- # Original logic from main.py: lines 1242-1268 (view_status_tool function) async def view_status_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: token = arguments.get("token") if not verify_token(token, "admin"): # main.py:1244 return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] log_audit("admin", "view_status", {}) # main.py:1249 # Build agent status from g.active_agents and g.agent_working_dirs (main.py:1251-1259) agent_status_dict = {} for agent_tkn, agent_data in g.active_agents.items(): agent_id = agent_data.get("agent_id") if agent_id: # Should always be present if agent_data is valid agent_status_dict[agent_id] = { "status": agent_data.get("status", "unknown"), "current_task": agent_data.get("current_task"), "capabilities": agent_data.get("capabilities", []), "working_directory": g.agent_working_dirs.get(agent_id, "N/A"), "color": agent_data.get( "color", "N/A" ), # Added color from active_agents } # Server uptime was N/A in original (main.py:1264) # We need a server start time global to calculate this, or pass it from app lifecycle. # For now, keeping it N/A for 1-to-1. server_start_time_iso = ( g.server_start_time if hasattr(g, "server_start_time") else None ) uptime_str = "N/A" if server_start_time_iso: uptime_delta = datetime.datetime.now() - datetime.datetime.fromisoformat( server_start_time_iso ) uptime_str = str(uptime_delta) # Get tmux session information tmux_info = { "tmux_available": is_tmux_available(), "tracked_sessions": len(g.agent_tmux_sessions), "active_sessions": [], "session_details": {}, } if is_tmux_available(): tmux_sessions = list_tmux_sessions() tmux_info["active_sessions"] = [s["name"] for s in tmux_sessions] tmux_info["session_details"] = {s["name"]: s for s in tmux_sessions} # Add tmux session info to agent details for agent_id, agent_data in agent_status_dict.items(): if agent_id in g.agent_tmux_sessions: session_name = g.agent_tmux_sessions[agent_id] agent_data["tmux_session"] = session_name agent_data["session_active"] = ( session_name in tmux_info["active_sessions"] ) else: agent_data["tmux_session"] = None agent_data["session_active"] = False status_payload = { # main.py:1260-1266 "active_connections": len( g.connections ), # g.connections might be managed by SSE transport layer "active_agents_count": len(g.active_agents), "agents_details": agent_status_dict, "server_uptime": uptime_str, "file_map_size": len(g.file_map), "file_map_preview": { k: v for i, (k, v) in enumerate(g.file_map.items()) if i < 5 }, # Preview first 5 "tmux_info": tmux_info, # Consider adding task counts, DB status, RAG index status etc. } try: status_json = json.dumps(status_payload, indent=2) except TypeError as e: logger.error(f"Error serializing server status to JSON: {e}") status_json = f"Error creating status JSON: {e}" return [ mcp_types.TextContent(type="text", text=f"MCP Server Status:\n{status_json}") ] # --- terminate_agent tool --- # Original logic from main.py: lines 1270-1316 (terminate_agent_tool function) async def terminate_agent_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: token = arguments.get("token") agent_id_to_terminate = arguments.get("agent_id") if not verify_token(token, "admin"): # main.py:1274 return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] if not agent_id_to_terminate or not isinstance(agent_id_to_terminate, str): return [ mcp_types.TextContent( type="text", text="Error: agent_id to terminate is required." ) ] # Find agent token from in-memory map (main.py:1279-1283) found_agent_token: Optional[str] = None for tkn, data in g.active_agents.items(): if data.get("agent_id") == agent_id_to_terminate: found_agent_token = tkn break conn = None try: conn = get_db_connection() cursor = conn.cursor() if not found_agent_token: # Check DB if not found in memory (main.py:1285-1290) cursor.execute( "SELECT token FROM agents WHERE agent_id = ? AND status != ?", (agent_id_to_terminate, "terminated"), ) row = cursor.fetchone() if row: # Agent exists in DB but not active memory. Proceed to terminate in DB. logger.warning( f"Agent {agent_id_to_terminate} found in DB (token: {row['token']}) but not in active memory. Proceeding with DB termination." ) # We don't have its token to remove from g.active_agents if it's not there. else: return [ mcp_types.TextContent( type="text", text=f"Agent '{agent_id_to_terminate}' not found or already terminated.", ) ] # Update agent status in Database (main.py:1295-1302) terminated_at_iso = datetime.datetime.now().isoformat() cursor.execute( """ UPDATE agents SET status = ?, terminated_at = ?, updated_at = ?, current_task = NULL WHERE agent_id = ? AND status != ? """, ( "terminated", terminated_at_iso, terminated_at_iso, agent_id_to_terminate, "terminated", ), ) # Set current_task to NULL as well. if ( cursor.rowcount == 0 and not found_agent_token ): # If DB check didn't find it initially and update affected 0 rows return [ mcp_types.TextContent( type="text", text=f"Agent '{agent_id_to_terminate}' not found in DB or already terminated.", ) ] log_agent_action_to_db( cursor, "admin", "terminated_agent", details={"agent_id": agent_id_to_terminate}, ) conn.commit() # Remove from active in-memory state if present (main.py:1309-1311) if found_agent_token and found_agent_token in g.active_agents: del g.active_agents[found_agent_token] if agent_id_to_terminate in g.agent_working_dirs: del g.agent_working_dirs[agent_id_to_terminate] # Release any files held by this agent from g.file_map files_released_count = 0 for filepath, info in list(g.file_map.items()): # Iterate over a copy if info.get("agent_id") == agent_id_to_terminate: del g.file_map[filepath] files_released_count += 1 if files_released_count > 0: logger.info( f"Released {files_released_count} files held by terminated agent {agent_id_to_terminate}." ) # Kill tmux session if it exists tmux_kill_status = "" if agent_id_to_terminate in g.agent_tmux_sessions: session_name = g.agent_tmux_sessions[agent_id_to_terminate] if kill_tmux_session(session_name): tmux_kill_status = f" Killed tmux session '{session_name}'." logger.info( f"Killed tmux session '{session_name}' for agent '{agent_id_to_terminate}'" ) else: tmux_kill_status = f" Failed to kill tmux session '{session_name}'." logger.warning( f"Failed to kill tmux session '{session_name}' for agent '{agent_id_to_terminate}'" ) # Remove from tracking regardless of kill success del g.agent_tmux_sessions[agent_id_to_terminate] else: # Try to kill session by agent_id in case tracking is out of sync sanitized_name = sanitize_session_name(agent_id_to_terminate) if session_exists(sanitized_name): if kill_tmux_session(sanitized_name): tmux_kill_status = ( f" Killed orphaned tmux session '{sanitized_name}'." ) logger.info( f"Killed orphaned tmux session '{sanitized_name}' for agent '{agent_id_to_terminate}'" ) log_audit( "admin", "terminate_agent", {"agent_id": agent_id_to_terminate} ) # main.py:1313 logger.info(f"Agent '{agent_id_to_terminate}' terminated successfully.") return [ mcp_types.TextContent( type="text", text=f"Agent '{agent_id_to_terminate}' terminated.{tmux_kill_status}", ) ] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error( f"Database error terminating agent {agent_id_to_terminate}: {e_sql}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Database error terminating agent: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error( f"Unexpected error terminating agent {agent_id_to_terminate}: {e}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Unexpected error terminating agent: {e}" ) ] finally: if conn: conn.close() # --- view_audit_log tool --- # Original logic from main.py: lines 1387-1408 (view_audit_log_tool function) async def view_audit_log_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: token = arguments.get("token") filter_agent_id = arguments.get("agent_id") # Optional filter filter_action = arguments.get("action") # Optional filter limit = arguments.get("limit", 50) # Default limit 50 if not verify_token(token, "admin"): # main.py:1389 return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] # Validate limit try: limit = int(limit) if not (1 <= limit <= 200): # Max 200 for safety limit = 50 except ValueError: limit = 50 # Filter the in-memory audit log (g.audit_log) (main.py:1394-1400) # For a more complete audit log, one might query the agent_actions table from DB. # The original tool only viewed the in-memory `audit_log`. # The original `audit_log` was a global list. # The `log_audit` function in `utils/audit_utils.py` appends to `g.audit_log`. # So, we read from `g.audit_log`. # Create a working copy for filtering current_audit_log_snapshot = list(g.audit_log) # Filter from a snapshot filtered_log_entries = current_audit_log_snapshot if filter_agent_id: filtered_log_entries = [ entry for entry in filtered_log_entries if entry.get("agent_id") == filter_agent_id ] if filter_action: filtered_log_entries = [ entry for entry in filtered_log_entries if entry.get("action") == filter_action ] # Get the most recent entries up to the limit (main.py:1403) # Slicing from the end gives recent entries. limited_log_entries = filtered_log_entries[-limit:] # Log this action itself (main.py:1405) log_audit( "admin", "view_audit_log", { "filter_agent_id": filter_agent_id, "filter_action": filter_action, "limit": limit, }, ) try: log_json = json.dumps(limited_log_entries, indent=2) except TypeError as e: logger.error(f"Error serializing audit log to JSON: {e}") log_json = f"Error creating audit log JSON: {e}" return [ mcp_types.TextContent( type="text", text=f"Audit Log ({len(limited_log_entries)} entries displayed, filtered by agent: {filter_agent_id or 'Any'}, action: {filter_action or 'Any'}):\n{log_json}", ) ] # --- get_agent_tokens tool --- async def get_agent_tokens_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: """ Retrieve agent tokens with advanced filtering capabilities. Supports filtering by status, agent_id pattern, creation date range, and more. """ token = arguments.get("token") # Authentication if not verify_token(token, "admin"): return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] # Extract and validate filter parameters filter_status = arguments.get( "filter_status" ) # e.g., "active", "terminated", "created" filter_agent_id_pattern = arguments.get( "filter_agent_id_pattern" ) # SQL LIKE pattern filter_created_after = arguments.get("filter_created_after") # ISO format date filter_created_before = arguments.get("filter_created_before") # ISO format date include_terminated = arguments.get("include_terminated", False) # Boolean include_sensitive_data = arguments.get("include_sensitive_data", True) # Boolean limit = arguments.get("limit", 50) # Default limit offset = arguments.get("offset", 0) # Pagination offset sort_by = arguments.get("sort_by", "created_at") # Sort field sort_order = arguments.get("sort_order", "DESC") # ASC or DESC # Validate parameters try: limit = int(limit) if not (1 <= limit <= 500): # Max 500 for safety limit = 50 except (ValueError, TypeError): limit = 50 try: offset = int(offset) if offset < 0: offset = 0 except (ValueError, TypeError): offset = 0 # Validate sort parameters allowed_sort_fields = ["created_at", "updated_at", "agent_id", "status"] if sort_by not in allowed_sort_fields: sort_by = "created_at" if sort_order.upper() not in ["ASC", "DESC"]: sort_order = "DESC" conn = None try: conn = get_db_connection() cursor = conn.cursor() # Build dynamic query base_query = """ SELECT token, agent_id, status, created_at FROM agents WHERE 1=1 """ query_params = [] # Apply filters if filter_status: base_query += " AND status = ?" query_params.append(filter_status) if filter_agent_id_pattern: base_query += " AND agent_id LIKE ?" query_params.append(filter_agent_id_pattern) if not include_terminated: base_query += " AND status != ?" query_params.append("terminated") if filter_created_after: base_query += " AND created_at >= ?" query_params.append(filter_created_after) if filter_created_before: base_query += " AND created_at <= ?" query_params.append(filter_created_before) # Add sorting and pagination base_query += f" ORDER BY {sort_by} {sort_order}" base_query += " LIMIT ? OFFSET ?" query_params.extend([limit, offset]) # Execute query cursor.execute(base_query, query_params) rows = cursor.fetchall() # Process results agents_data = [] for row in rows: agent_data = dict(row) # Handle sensitive data if not include_sensitive_data: # Mask the token for security if "token" in agent_data: token_value = agent_data["token"] if token_value and len(token_value) > 8: agent_data["token"] = token_value[:4] + "..." + token_value[-4:] else: agent_data["token"] = "***" agents_data.append(agent_data) # Get total count for pagination info count_query = """ SELECT COUNT(*) as total FROM agents WHERE 1=1 """ count_params = [] if filter_status: count_query += " AND status = ?" count_params.append(filter_status) if filter_agent_id_pattern: count_query += " AND agent_id LIKE ?" count_params.append(filter_agent_id_pattern) if not include_terminated: count_query += " AND status != ?" count_params.append("terminated") if filter_created_after: count_query += " AND created_at >= ?" count_params.append(filter_created_after) if filter_created_before: count_query += " AND created_at <= ?" count_params.append(filter_created_before) cursor.execute(count_query, count_params) total_count = cursor.fetchone()[0] # Log this access log_audit( "admin", "get_agent_tokens", { "filter_status": filter_status, "filter_agent_id_pattern": filter_agent_id_pattern, "agents_returned": len(agents_data), "total_matching": total_count, "include_sensitive_data": include_sensitive_data, }, ) # Build response response_data = { "agents": agents_data, "pagination": { "offset": offset, "limit": limit, "total_count": total_count, "returned_count": len(agents_data), "has_more": offset + len(agents_data) < total_count, }, "filters_applied": { "filter_status": filter_status, "filter_agent_id_pattern": filter_agent_id_pattern, "filter_created_after": filter_created_after, "filter_created_before": filter_created_before, "include_terminated": include_terminated, "include_sensitive_data": include_sensitive_data, }, "sort": {"sort_by": sort_by, "sort_order": sort_order}, } try: response_json = json.dumps(response_data, indent=2) except TypeError as e: logger.error(f"Error serializing agent tokens response to JSON: {e}") response_json = f"Error creating response JSON: {e}" return [ mcp_types.TextContent( type="text", text=f"Agent Tokens ({len(agents_data)} of {total_count} total):\n{response_json}", ) ] except sqlite3.Error as e_sql: logger.error(f"Database error retrieving agent tokens: {e_sql}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Database error retrieving agent tokens: {e_sql}" ) ] except Exception as e: logger.error(f"Unexpected error retrieving agent tokens: {e}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Unexpected error retrieving agent tokens: {e}" ) ] finally: if conn: conn.close() # --- relaunch_agent tool --- async def relaunch_agent_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: """ Relaunch an existing agent by reusing its tmux session. Only works for agents with status: terminated, completed, failed, cancelled. Sends /clear to reset the session and sends a new prompt. """ admin_token = arguments.get("token") agent_id = arguments.get("agent_id") generate_new_token = arguments.get("generate_new_token", False) custom_prompt = arguments.get("custom_prompt") prompt_template = arguments.get("prompt_template", "worker_with_rag") # Admin authentication if not verify_token(admin_token, "admin"): return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] if not agent_id: return [mcp_types.TextContent(type="text", text="Error: agent_id is required")] conn = None try: conn = get_db_connection() cursor = conn.cursor() # Check if agent exists and get current status cursor.execute("SELECT * FROM agents WHERE agent_id = ?", (agent_id,)) agent_row = cursor.fetchone() if not agent_row: return [ mcp_types.TextContent(type="text", text=f"Agent '{agent_id}' not found") ] agent_data = dict(agent_row) current_status = agent_data.get("status") # Only allow relaunch for certain statuses allowed_statuses = ["terminated", "completed", "failed", "cancelled"] if current_status not in allowed_statuses: return [ mcp_types.TextContent( type="text", text=f"Cannot relaunch agent with status '{current_status}'. Allowed statuses: {', '.join(allowed_statuses)}", ) ] # Check if tmux session still exists if agent_id not in g.agent_tmux_sessions: return [ mcp_types.TextContent( type="text", text=f"Agent '{agent_id}' has no active tmux session to relaunch. Use create_agent instead.", ) ] session_name = g.agent_tmux_sessions[agent_id] if not session_exists(session_name): # Clean up the dead session reference del g.agent_tmux_sessions[agent_id] return [ mcp_types.TextContent( type="text", text=f"Tmux session '{session_name}' for agent '{agent_id}' no longer exists. Use create_agent instead.", ) ] # Send /clear command to reset the session clear_success = send_command_to_session(session_name, "/clear") if not clear_success: return [ mcp_types.TextContent( type="text", text=f"Failed to send /clear command to session '{session_name}'", ) ] # Generate new token if requested agent_token = agent_data.get("token") if generate_new_token: agent_token = generate_token() cursor.execute( "UPDATE agents SET token = ? WHERE agent_id = ?", (agent_token, agent_id), ) # Update agent status to active updated_at_iso = datetime.datetime.now().isoformat() cursor.execute( "UPDATE agents SET status = ?, updated_at = ? WHERE agent_id = ?", ("active", updated_at_iso, agent_id), ) # Build and send new prompt try: if custom_prompt: prompt_to_send = custom_prompt else: prompt_to_send = build_agent_prompt(prompt_template, admin_token) # Send the new prompt to restart the agent send_prompt_async(session_name, prompt_to_send, delay_seconds=2) except Exception as e_prompt: logger.error(f"Failed to build or send prompt for relaunch: {e_prompt}") # Revert status change cursor.execute( "UPDATE agents SET status = ? WHERE agent_id = ?", (current_status, agent_id), ) conn.commit() return [ mcp_types.TextContent( type="text", text=f"Failed to send restart prompt: {e_prompt}" ) ] # Update in-memory state if agent_token in g.active_agents: g.active_agents[agent_token]["status"] = "active" g.active_agents[agent_token]["updated_at"] = updated_at_iso else: # Add to active agents if not already there g.active_agents[agent_token] = { "agent_id": agent_id, "status": "active", "token": agent_token, "working_directory": agent_data.get("working_directory"), "capabilities": json.loads(agent_data.get("capabilities", "[]")), "updated_at": updated_at_iso, } # Log the action log_agent_action_to_db( cursor, "admin", "relaunch_agent", details={ "agent_id": agent_id, "session_name": session_name, "previous_status": current_status, "new_token_generated": generate_new_token, "prompt_template": prompt_template, }, ) conn.commit() log_audit( "admin", "relaunch_agent", { "agent_id": agent_id, "previous_status": current_status, "session_name": session_name, "new_token": generate_new_token, }, ) response_parts = [ f"Agent '{agent_id}' successfully relaunched in session '{session_name}'", f"Previous status: {current_status} → active", f"Session cleared and new prompt sent", ] if generate_new_token: response_parts.append(f"New token generated: {agent_token}") else: response_parts.append(f"Using existing token: {agent_token}") return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error( f"Database error relaunching agent {agent_id}: {e_sql}", exc_info=True ) return [ mcp_types.TextContent( type="text", text=f"Database error relaunching agent: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error( f"Unexpected error relaunching agent {agent_id}: {e}", exc_info=True ) return [ mcp_types.TextContent( type="text", text=f"Unexpected error relaunching agent: {e}" ) ] finally: if conn: conn.close() # --- Register all admin tools --- def register_admin_tools(): register_tool( name="create_agent", description="Create a new agent with the specified ID, capabilities, and prompt configuration. The agent will be assigned the specified tasks upon creation. Agents work in the shared project directory with file-level locking for coordination.", input_schema={ # Enhanced with prompt template support and required task_ids "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token", }, "agent_id": { "type": "string", "description": "Unique identifier for the agent", }, "task_ids": { "type": "array", "description": "List of task IDs to assign to the agent (required). Tasks must exist and be unassigned.", "items": {"type": "string"}, "minItems": 1, }, "capabilities": { "type": "array", "description": "List of agent capabilities (e.g., 'code_edit', 'file_read')", "items": {"type": "string"}, "default": [], }, "prompt_template": { "type": "string", "description": "Prompt template to use ('worker_with_rag', 'basic_worker', 'frontend_worker', 'admin_agent', 'custom')", "enum": [ "worker_with_rag", "basic_worker", "frontend_worker", "admin_agent", "custom", ], "default": "worker_with_rag", }, "custom_prompt": { "type": "string", "description": "Custom prompt text (required if prompt_template is 'custom')", }, "send_prompt": { "type": "boolean", "description": "Whether to automatically send the prompt to the tmux session after launch", "default": True, }, "prompt_delay": { "type": "integer", "description": "Seconds to wait before sending prompt (allows Claude to start up)", "default": 5, "minimum": 1, "maximum": 30, }, }, "required": ["token", "agent_id", "task_ids"], "additionalProperties": False, }, implementation=create_agent_tool_impl, ) register_tool( name="view_status", description="View the status of all agents, connections, and the MCP server.", input_schema={ # From main.py:1663-1674 "type": "object", "properties": { "token": {"type": "string", "description": "Admin authentication token"} }, "required": ["token"], "additionalProperties": False, }, implementation=view_status_tool_impl, ) register_tool( name="terminate_agent", description="Terminate an active agent with the given ID.", input_schema={ # From main.py:1676-1689 "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token", }, "agent_id": { "type": "string", "description": "Unique identifier for the agent to terminate", }, }, "required": ["token", "agent_id"], "additionalProperties": False, }, implementation=terminate_agent_tool_impl, ) register_tool( name="view_audit_log", description="View the in-memory audit log, optionally filtered by agent ID or action, with a limit.", input_schema={ # From main.py:1788-1810 "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token", }, "agent_id": { "type": "string", "description": "Filter audit log by agent ID (optional)", }, "action": { "type": "string", "description": "Filter audit log by action (e.g., 'create_agent') (optional)", }, "limit": { "type": "integer", "description": "Maximum number of entries to return (default 50, max 200)", "default": 50, "minimum": 1, "maximum": 200, }, }, "required": ["token"], "additionalProperties": False, }, implementation=view_audit_log_tool_impl, ) register_tool( name="get_agent_tokens", description="Retrieve agent tokens with advanced filtering capabilities. Supports filtering by status, agent_id pattern, creation date range, and more.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token", }, "filter_status": { "type": "string", "description": "Filter by agent status (e.g., 'active', 'terminated', 'created')", }, "filter_agent_id_pattern": { "type": "string", "description": "Filter by agent ID using SQL LIKE pattern (e.g., 'test_%', '%prod%')", }, "filter_created_after": { "type": "string", "description": "Filter agents created after this date (ISO format: YYYY-MM-DDTHH:MM:SS)", }, "filter_created_before": { "type": "string", "description": "Filter agents created before this date (ISO format: YYYY-MM-DDTHH:MM:SS)", }, "include_terminated": { "type": "boolean", "description": "Include terminated agents in results (default: false)", "default": False, }, "include_sensitive_data": { "type": "boolean", "description": "Include full tokens in response (default: true). If false, tokens will be masked for security.", "default": True, }, "limit": { "type": "integer", "description": "Maximum number of agents to return (default: 50, max: 500)", "default": 50, "minimum": 1, "maximum": 500, }, "offset": { "type": "integer", "description": "Number of agents to skip for pagination (default: 0)", "default": 0, "minimum": 0, }, "sort_by": { "type": "string", "description": "Field to sort by (default: 'created_at')", "enum": ["created_at", "updated_at", "agent_id", "status"], "default": "created_at", }, "sort_order": { "type": "string", "description": "Sort order (default: 'DESC')", "enum": ["ASC", "DESC"], "default": "DESC", }, }, "required": ["token"], "additionalProperties": False, }, implementation=get_agent_tokens_tool_impl, ) register_tool( name="relaunch_agent", description="Relaunch an existing terminated/completed/failed/cancelled agent by reusing its tmux session.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token", }, "agent_id": { "type": "string", "description": "ID of the agent to relaunch", }, "generate_new_token": { "type": "boolean", "description": "Generate a new token for the relaunched agent (default: false)", "default": False, }, "custom_prompt": { "type": "string", "description": "Custom prompt to send instead of template prompt", }, "prompt_template": { "type": "string", "description": "Prompt template to use (default: 'worker_with_rag')", "default": "worker_with_rag", }, }, "required": ["token", "agent_id"], "additionalProperties": False, }, implementation=relaunch_agent_tool_impl, ) # Call registration when this module is imported register_admin_tools()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rinadelph/Agent-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server