Skip to main content
Glama

Agent MCP

task_tools.py162 kB
# Agent-MCP/mcp_template/mcp_server_src/tools/task_tools.py import json import datetime import secrets # For task_id generation import os # For request_assistance (notifications path) import sqlite3 # For database operations from pathlib import Path # For request_assistance from typing import List, Dict, Any, Optional import mcp.types as mcp_types from .registry import register_tool from ..core.config import logger, ENABLE_TASK_PLACEMENT_RAG, ALLOW_RAG_OVERRIDE from ..core import globals as g from ..core.auth import verify_token, get_agent_id from ..utils.audit_utils import log_audit from ..db.connection import get_db_connection, execute_db_write from ..db.actions.agent_actions_db import log_agent_action_to_db from ..features.task_placement.validator import validate_task_placement from ..features.task_placement.suggestions import ( format_suggestions_for_agent, format_override_reason, should_escalate_to_admin, ) from ..features.rag.indexing import index_task_data # For request_assistance, generate_id was used. Let's use secrets.token_hex for consistency. # from main.py:1191 (generate_id - not present, assuming secrets.token_hex was intended) from .agent_communication_tools import send_agent_message_tool_impl # For testing agent auto-launch from ..core.auth import generate_token from ..core.config import AGENT_COLORS from ..utils.tmux_utils import ( create_tmux_session, send_prompt_async, send_command_to_session, sanitize_session_name, ) from ..utils.prompt_templates import build_agent_prompt def estimate_tokens(text: str) -> int: """Accurate token estimation using tiktoken for GPT-4""" try: import tiktoken encoding = tiktoken.encoding_for_model("gpt-4") return len(encoding.encode(text)) except ImportError: # Fallback to rough estimation if tiktoken not available return len(text) // 4 except Exception: # Fallback for any other tiktoken errors return len(text) // 4 def _generate_task_id() -> str: """Generates a unique task ID.""" return f"task_{secrets.token_hex(6)}" def _generate_notification_id() -> str: """Generates a unique notification ID.""" return f"notification_{secrets.token_hex(8)}" async def _send_escape_to_agent(agent_id: str) -> bool: """Send escape sequences to pause an agent's tmux session.""" try: if agent_id in g.agent_tmux_sessions: session_name = g.agent_tmux_sessions[agent_id] logger.info( f"Sending escape sequences to pause agent {agent_id} in session {session_name}" ) # Send 4 escape sequences with 1 second intervals to stop current operation import subprocess import time clean_session_name = sanitize_session_name(session_name) for i in range(4): result = subprocess.run( ["tmux", "send-keys", "-t", clean_session_name, "Escape"], capture_output=True, text=True, timeout=5, ) if result.returncode != 0: logger.error( f"Failed to send Escape {i+1}/4 to agent {agent_id}: {result.stderr}" ) return False logger.debug(f"Sent Escape {i+1}/4 to agent {agent_id}") if i < 3: # Don't sleep after the last one time.sleep(1) logger.info(f"Successfully paused agent {agent_id}") return True else: logger.warning(f"Agent {agent_id} has no active tmux session to pause") return False except Exception as e: logger.error(f"Error sending escape to agent {agent_id}: {e}") return False async def _launch_testing_agent_for_completed_task( cursor, completed_task_id: str, completed_by_agent: str ) -> bool: """Launch testing agent when task completes.""" try: # 1. Send Escape sequences to pause completing agent await _send_escape_to_agent(completed_by_agent) # 2. Get task details for context cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (completed_task_id,)) task_row = cursor.fetchone() if not task_row: logger.error(f"Cannot find completed task {completed_task_id} for testing") return False task_data = dict(task_row) # 3. Generate testing agent ID testing_agent_id = f"test-{completed_task_id[-6:]}" # 4. Kill existing testing agent if it exists (task re-completed after fixes) from ..utils.tmux_utils import kill_tmux_session # Check if testing agent already exists existing_agent = None cursor.execute( "SELECT agent_id FROM agents WHERE agent_id = ?", (testing_agent_id,) ) existing_agent = cursor.fetchone() if existing_agent or testing_agent_id in g.agent_working_dirs: logger.info( f"Task {completed_task_id} re-completed - killing existing testing agent {testing_agent_id} to launch fresh one" ) # Kill tmux session if it exists if testing_agent_id in g.agent_tmux_sessions: session_name = g.agent_tmux_sessions[testing_agent_id] kill_tmux_session(session_name) del g.agent_tmux_sessions[testing_agent_id] # Clean up global tracking if testing_agent_id in g.agent_working_dirs: del g.agent_working_dirs[testing_agent_id] if testing_agent_id in g.active_agents: del g.active_agents[testing_agent_id] # Remove from database cursor.execute("DELETE FROM agents WHERE agent_id = ?", (testing_agent_id,)) logger.info(f"Cleaned up existing testing agent {testing_agent_id}") # 5. Create testing agent token and database entry testing_token = generate_token() created_at_iso = datetime.datetime.now().isoformat() # Assign a color for the testing agent agent_color = AGENT_COLORS[g.agent_color_index % len(AGENT_COLORS)] g.agent_color_index += 1 # Get project directory project_dir_env = os.environ.get("MCP_PROJECT_DIR") if not project_dir_env: logger.error("MCP_PROJECT_DIR not set, cannot launch testing agent") return False # Insert testing agent into database cursor.execute( """ INSERT INTO agents (token, agent_id, capabilities, created_at, status, current_task, working_directory, color) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, ( testing_token, testing_agent_id, json.dumps(["testing", "validation", "criticism"]), created_at_iso, "created", completed_task_id, # Set the completed task as current task project_dir_env, agent_color, ), ) # 6. Build enriched prompt for testing agent prompt = build_agent_prompt( agent_id=testing_agent_id, agent_token=testing_token, admin_token=g.admin_token, template_name="testing_agent", completed_by_agent=completed_by_agent, completed_task_id=completed_task_id, completed_task_title=task_data.get("title", "Unknown"), completed_task_description=task_data.get("description", "No description"), ) if not prompt: logger.error(f"Failed to build prompt for testing agent {testing_agent_id}") return False # 7. Create tmux session for testing agent def get_admin_token_suffix(admin_token: str) -> str: """Extract last 4 chars from admin token for session naming.""" if not admin_token or len(admin_token) < 4: return "0000" return admin_token[-4:].lower() suffix = get_admin_token_suffix(g.admin_token) clean_agent_id = sanitize_session_name(testing_agent_id) session_name = f"{clean_agent_id}-{suffix}" # Set up environment variables for the testing agent env_vars = { "MCP_AGENT_ID": testing_agent_id, "MCP_AGENT_TOKEN": testing_token, "MCP_SERVER_URL": f"http://localhost:{os.environ.get('PORT', '8080')}", "MCP_WORKING_DIR": project_dir_env, } # Create tmux session with environment variables success = create_tmux_session( session_name=session_name, working_dir=project_dir_env, command=None, # Don't start Claude immediately env_vars=env_vars, ) if not success: logger.error( f"Failed to create tmux session for testing agent {testing_agent_id}" ) return False # 8. Store session mapping and update in-memory state g.agent_tmux_sessions[testing_agent_id] = session_name g.agent_working_dirs[testing_agent_id] = project_dir_env g.active_agents[testing_token] = { "agent_id": testing_agent_id, "created_at": created_at_iso, "last_activity": created_at_iso, } # 9. Follow proper agent setup procedure try: import time def wait_for_command_completion(delay: float = 1.0): """Smart delay system - wait for command completion or timeout""" time.sleep(delay) setup_delay = 1.0 # 1 second delay between setup commands # Welcome message welcome_message = f"echo '=== Testing Agent {testing_agent_id} initialization starting ==='" if send_command_to_session(session_name, welcome_message): logger.info( f"✅ Sent welcome message to testing agent '{testing_agent_id}'" ) else: logger.error( f"❌ Failed to send welcome message to testing agent '{testing_agent_id}'" ) wait_for_command_completion(setup_delay) # Verify working directory verify_command = f"echo 'Working directory:' && pwd" if send_command_to_session(session_name, verify_command): logger.info( f"✅ Sent directory verification to testing agent '{testing_agent_id}'" ) wait_for_command_completion(setup_delay) # Get server port for MCP registration server_port = os.environ.get("PORT", "8080") mcp_server_url = f"http://localhost:{server_port}/sse" # Log MCP server info mcp_info_command = f"echo 'MCP Server URL: {mcp_server_url}'" send_command_to_session(session_name, mcp_info_command) wait_for_command_completion(setup_delay) # Register MCP server connection mcp_add_command = f"claude mcp add -t sse AgentMCP {mcp_server_url}" logger.info( f"Registering MCP server for testing agent '{testing_agent_id}': {mcp_add_command}" ) if not send_command_to_session(session_name, mcp_add_command): logger.error( f"Failed to register MCP server for testing agent '{testing_agent_id}'" ) return False # Add delay to ensure MCP registration completes wait_for_command_completion(setup_delay) # Verify MCP registration verify_mcp_command = "claude mcp list" logger.info( f"Verifying MCP registration for testing agent '{testing_agent_id}'" ) send_command_to_session(session_name, verify_mcp_command) wait_for_command_completion(setup_delay) # Start Claude start_claude_message = "echo '--- Starting Claude with MCP ---'" send_command_to_session(session_name, start_claude_message) wait_for_command_completion(setup_delay) claude_command = "claude --dangerously-skip-permissions" logger.info( f"Starting Claude for testing agent '{testing_agent_id}': {claude_command}" ) if not send_command_to_session(session_name, claude_command): logger.error( f"Failed to start Claude for testing agent '{testing_agent_id}'" ) return False # Log completion message completion_message = f"echo '=== Testing Agent {testing_agent_id} setup complete - Claude starting ==='" send_command_to_session(session_name, completion_message) # Send enriched prompt with delay send_prompt_async(session_name, prompt, delay_seconds=5) logger.info( f"Testing agent {testing_agent_id} launched successfully for task {completed_task_id}" ) # Log the testing agent creation log_agent_action_to_db( cursor, "admin", "create_testing_agent", details={ "testing_agent_id": testing_agent_id, "completed_task_id": completed_task_id, "completed_by_agent": completed_by_agent, }, ) return True except Exception as e: logger.error( f"Failed to send prompt to testing agent {testing_agent_id}: {e}" ) return False except Exception as e: logger.error(f"Error launching testing agent for task {completed_task_id}: {e}") return False async def _update_single_task( cursor, task_id: str, new_status: str, requesting_agent_id: str, is_admin_request: bool, notes_content: Optional[str] = None, new_title: Optional[str] = None, new_description: Optional[str] = None, new_priority: Optional[str] = None, new_assigned_to: Optional[str] = None, new_depends_on_tasks: Optional[List[str]] = None, ) -> Dict[str, Any]: """Helper function to update a single task with smart features""" # Fetch task current data cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)) task_db_row = cursor.fetchone() if not task_db_row: return {"success": False, "error": f"Task '{task_id}' not found"} task_current_data = dict(task_db_row) # Verify permissions if ( task_current_data.get("assigned_to") != requesting_agent_id and not is_admin_request ): return { "success": False, "error": f"Unauthorized: Cannot update task '{task_id}' assigned to {task_current_data.get('assigned_to')}", } updated_at_iso = datetime.datetime.now().isoformat() # Build update query update_fields_sql = ["status = ?", "updated_at = ?"] update_params = [new_status, updated_at_iso] # Handle notes current_notes_list = json.loads(task_current_data.get("notes") or "[]") if notes_content: current_notes_list.append( { "timestamp": updated_at_iso, "author": requesting_agent_id, "content": notes_content, } ) update_fields_sql.append("notes = ?") update_params.append(json.dumps(current_notes_list)) # Admin-only field updates if is_admin_request: if new_title is not None: update_fields_sql.append("title = ?") update_params.append(new_title) if new_description is not None: update_fields_sql.append("description = ?") update_params.append(new_description) if new_priority is not None: update_fields_sql.append("priority = ?") update_params.append(new_priority) if new_assigned_to is not None: update_fields_sql.append("assigned_to = ?") update_params.append(new_assigned_to) if new_depends_on_tasks is not None: update_fields_sql.append("depends_on_tasks = ?") update_params.append(json.dumps(new_depends_on_tasks)) update_params.append(task_id) # Execute update with validated field assignments if update_fields_sql: # Validate all field assignments are safe (contain only expected patterns) allowed_field_patterns = [ "status = ?", "updated_at = ?", "notes = ?", "title = ?", "description = ?", "priority = ?", "assigned_to = ?", "depends_on_tasks = ?", ] safe_fields = [] for field in update_fields_sql: if field in allowed_field_patterns: safe_fields.append(field) else: logger.warning( f"Task update: Skipping unsafe field assignment: {field}" ) if safe_fields: set_clause = ", ".join(safe_fields) update_sql = f"UPDATE tasks SET {set_clause} WHERE task_id = ?" cursor.execute(update_sql, tuple(update_params)) # Update in-memory cache if task_id in g.tasks: g.tasks[task_id]["status"] = new_status g.tasks[task_id]["updated_at"] = updated_at_iso g.tasks[task_id]["notes"] = current_notes_list if is_admin_request: if new_title is not None: g.tasks[task_id]["title"] = new_title if new_description is not None: g.tasks[task_id]["description"] = new_description if new_priority is not None: g.tasks[task_id]["priority"] = new_priority if new_assigned_to is not None: g.tasks[task_id]["assigned_to"] = new_assigned_to if new_depends_on_tasks is not None: g.tasks[task_id]["depends_on_tasks"] = new_depends_on_tasks # Handle parent task notifications if new_status in ["completed", "cancelled", "failed"] and task_current_data.get( "parent_task" ): parent_task_id = task_current_data["parent_task"] cursor.execute("SELECT notes FROM tasks WHERE task_id = ?", (parent_task_id,)) parent_row = cursor.fetchone() if parent_row: parent_notes_list = json.loads(parent_row["notes"] or "[]") parent_notes_list.append( { "timestamp": updated_at_iso, "author": "system", "content": f"Subtask '{task_id}' ({task_current_data.get('title', '')}) status changed to: {new_status}", } ) cursor.execute( "UPDATE tasks SET notes = ?, updated_at = ? WHERE task_id = ?", (json.dumps(parent_notes_list), updated_at_iso, parent_task_id), ) if parent_task_id in g.tasks: g.tasks[parent_task_id]["notes"] = parent_notes_list g.tasks[parent_task_id]["updated_at"] = updated_at_iso return { "success": True, "task_id": task_id, "old_status": task_current_data.get("status"), "new_status": new_status, "child_tasks": json.loads(task_current_data.get("child_tasks") or "[]"), "depends_on_tasks": json.loads( task_current_data.get("depends_on_tasks") or "[]" ), } def _analyze_task_dependencies( task: Dict[str, Any], all_tasks: Dict[str, Dict[str, Any]] ) -> Dict[str, Any]: """Analyze task dependencies and blocking conditions""" task_id = task.get("task_id") status = task.get("status") depends_on = task.get("depends_on_tasks", []) if isinstance(depends_on, str): try: depends_on = json.loads(depends_on) except: depends_on = [] analysis = { "is_blocked": False, "blocking_dependencies": [], "completed_dependencies": [], "missing_dependencies": [], "can_start": True, "blocks_tasks": [], "dependency_health": "healthy", } # Check dependencies for dep_id in depends_on: if dep_id in all_tasks: dep_task = all_tasks[dep_id] dep_status = dep_task.get("status") if dep_status == "completed": analysis["completed_dependencies"].append(dep_id) elif dep_status in ["failed", "cancelled"]: analysis["blocking_dependencies"].append(dep_id) analysis["is_blocked"] = True analysis["can_start"] = False elif dep_status in ["pending", "in_progress"]: analysis["blocking_dependencies"].append(dep_id) if status == "pending": analysis["can_start"] = False else: analysis["missing_dependencies"].append(dep_id) analysis["is_blocked"] = True analysis["can_start"] = False # Find tasks that depend on this one for other_id, other_task in all_tasks.items(): other_deps = other_task.get("depends_on_tasks", []) if isinstance(other_deps, str): try: other_deps = json.loads(other_deps) except: other_deps = [] if task_id in other_deps: analysis["blocks_tasks"].append(other_id) # Determine health if analysis["missing_dependencies"]: analysis["dependency_health"] = "critical" elif analysis["is_blocked"] and status == "in_progress": analysis["dependency_health"] = "warning" elif not analysis["can_start"] and status == "pending": analysis["dependency_health"] = "waiting" return analysis def _calculate_task_health_metrics(tasks: List[Dict[str, Any]]) -> Dict[str, Any]: """Calculate overall task health metrics""" if not tasks: return {"total": 0, "status": "no_data"} total = len(tasks) status_counts = {} priority_counts = {} blocked_count = 0 overdue_count = 0 stale_count = 0 current_time = datetime.datetime.now() for task in tasks: # Status distribution status = task.get("status", "unknown") status_counts[status] = status_counts.get(status, 0) + 1 # Priority distribution priority = task.get("priority", "medium") priority_counts[priority] = priority_counts.get(priority, 0) + 1 # Check for blocked tasks (has dependencies but can't proceed) deps = task.get("depends_on_tasks", []) if isinstance(deps, str): try: deps = json.loads(deps) except: deps = [] if deps and status == "pending": blocked_count += 1 # Check for stale tasks (no updates in 7+ days) updated_at = task.get("updated_at") if updated_at: try: updated_time = datetime.datetime.fromisoformat( updated_at.replace("Z", "+00:00").replace("+00:00", "") ) days_since_update = (current_time - updated_time).days if days_since_update > 7 and status in ["in_progress", "pending"]: stale_count += 1 except: pass # Calculate health score (0-100) completed_ratio = status_counts.get("completed", 0) / total active_ratio = ( status_counts.get("in_progress", 0) + status_counts.get("pending", 0) ) / total blocked_ratio = blocked_count / total if total > 0 else 0 stale_ratio = stale_count / total if total > 0 else 0 health_score = max( 0, min( 100, completed_ratio * 30 # 30% weight for completion + active_ratio * 40 # 40% weight for active work + (1 - blocked_ratio) * 20 # 20% penalty for blocked tasks + (1 - stale_ratio) * 10, # 10% penalty for stale tasks ), ) return { "total": total, "status_distribution": status_counts, "priority_distribution": priority_counts, "blocked_tasks": blocked_count, "stale_tasks": stale_count, "health_score": round(health_score, 1), "health_status": ( "excellent" if health_score >= 80 else ( "good" if health_score >= 60 else "needs_attention" if health_score >= 40 else "critical" ) ), } # --- Helper functions for assign_task modes --- async def _create_unassigned_tasks( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: """Mode 0: Create unassigned tasks (assigned_to = NULL)""" task_title = arguments.get("task_title") task_description = arguments.get("task_description") tasks = arguments.get("tasks") priority = arguments.get("priority", "medium") parent_task_id_arg = arguments.get("parent_task_id") # Define the write operation as an async function async def write_operation(): conn = None try: conn = get_db_connection() cursor = conn.cursor() created_tasks = [] created_at = datetime.datetime.now().isoformat() if tasks: # Multiple unassigned task creation for i, task in enumerate(tasks): task_id = ( f"task_{int(datetime.datetime.now().timestamp() * 1000)}_{i}" ) title = task["title"] description = task["description"] task_priority = task.get("priority", "medium") parent_task = task.get("parent_task_id") # Create unassigned task task_data = { "task_id": task_id, "title": title, "description": description, "assigned_to": None, # UNASSIGNED "created_by": "admin", "status": "unassigned", "priority": task_priority, "created_at": created_at, "updated_at": created_at, "parent_task": parent_task, "child_tasks": json.dumps([]), "depends_on_tasks": json.dumps([]), "notes": json.dumps([]), } cursor.execute( """ INSERT INTO tasks (task_id, title, description, assigned_to, created_by, status, priority, created_at, updated_at, parent_task, child_tasks, depends_on_tasks, notes) VALUES (:task_id, :title, :description, :assigned_to, :created_by, :status, :priority, :created_at, :updated_at, :parent_task, :child_tasks, :depends_on_tasks, :notes) """, task_data, ) log_agent_action_to_db( cursor, "admin", "created_unassigned_task", task_id=task_id, details={"title": title, "mode": "unassigned_multiple"}, ) # Add to global cache g.tasks[task_id] = task_data created_tasks.append( {"task_id": task_id, "title": title, "priority": task_priority} ) elif task_title and task_description: # Single unassigned task creation task_id = f"task_{int(datetime.datetime.now().timestamp() * 1000)}" task_data = { "task_id": task_id, "title": task_title, "description": task_description, "assigned_to": None, # UNASSIGNED "created_by": "admin", "status": "unassigned", "priority": priority, "created_at": created_at, "updated_at": created_at, "parent_task": parent_task_id_arg, "child_tasks": json.dumps([]), "depends_on_tasks": json.dumps([]), "notes": json.dumps([]), } cursor.execute( """ INSERT INTO tasks (task_id, title, description, assigned_to, created_by, status, priority, created_at, updated_at, parent_task, child_tasks, depends_on_tasks, notes) VALUES (:task_id, :title, :description, :assigned_to, :created_by, :status, :priority, :created_at, :updated_at, :parent_task, :child_tasks, :depends_on_tasks, :notes) """, task_data, ) log_agent_action_to_db( cursor, "admin", "created_unassigned_task", task_id=task_id, details={"title": task_title, "mode": "unassigned_single"}, ) # Add to global cache g.tasks[task_id] = task_data created_tasks.append( {"task_id": task_id, "title": task_title, "priority": priority} ) else: raise ValueError( "Error: Provide either 'task_title' and 'task_description' for single task, or 'tasks' array for multiple tasks." ) conn.commit() return created_tasks except Exception as e: if conn: conn.rollback() logger.error(f"Error creating unassigned tasks: {e}", exc_info=True) raise e finally: if conn: conn.close() # Execute the write operation through the queue try: created_tasks = await execute_db_write(write_operation) # Build response response_parts = [ f"✅ **Unassigned Tasks Created**", f" Tasks Created: {len(created_tasks)}", f" Status: Unassigned", "", ] for i, task in enumerate(created_tasks, 1): response_parts.append( f" {i}. {task['task_id']}: {task['title']} (Priority: {task['priority']})" ) response_parts.append( "\n💡 Use assign_task with task_ids parameter to assign these tasks to agents." ) return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] except ValueError as e: return [mcp_types.TextContent(type="text", text=str(e))] except Exception as e: return [ mcp_types.TextContent( type="text", text=f"Error creating unassigned tasks: {e}" ) ] async def _assign_to_existing_tasks( arguments: Dict[str, Any], target_agent_id: str, task_ids: List[str], validate_agent_workload: bool, coordination_notes: str, ) -> List[mcp_types.TextContent]: """Mode 3: Assign agent to existing unassigned tasks""" conn = None try: conn = get_db_connection() cursor = conn.cursor() # Validate that all tasks exist and are unassigned placeholders = ",".join(["?" for _ in task_ids]) cursor.execute( f"SELECT task_id, title, assigned_to FROM tasks WHERE task_id IN ({placeholders})", task_ids, ) found_tasks = cursor.fetchall() if len(found_tasks) != len(task_ids): found_ids = [task["task_id"] for task in found_tasks] missing_ids = [tid for tid in task_ids if tid not in found_ids] return [ mcp_types.TextContent( type="text", text=f"Error: Tasks not found: {', '.join(missing_ids)}", ) ] # Check for already assigned tasks assigned_tasks = [ task for task in found_tasks if task["assigned_to"] is not None ] if assigned_tasks: assigned_list = [ f"{task['task_id']} (assigned to {task['assigned_to']})" for task in assigned_tasks ] return [ mcp_types.TextContent( type="text", text=f"Error: Some tasks are already assigned: {', '.join(assigned_list)}", ) ] # Validate agent exists cursor.execute( "SELECT agent_id FROM agents WHERE agent_id = ?", (target_agent_id,) ) if not cursor.fetchone(): return [ mcp_types.TextContent( type="text", text=f"Error: Agent '{target_agent_id}' not found." ) ] # Assign all tasks to the agent updated_at = datetime.datetime.now().isoformat() for task_id in task_ids: cursor.execute( "UPDATE tasks SET assigned_to = ?, updated_at = ? WHERE task_id = ?", (target_agent_id, updated_at, task_id), ) # Log the assignment log_agent_action_to_db( cursor, "admin", "assigned_task", task_id=task_id, details={ "agent_id": target_agent_id, "mode": "existing_task_assignment", }, ) # Update agent's current task if they don't have one (use first task) cursor.execute( "SELECT current_task FROM agents WHERE agent_id = ?", (target_agent_id,) ) agent_row = cursor.fetchone() if agent_row and agent_row["current_task"] is None: cursor.execute( "UPDATE agents SET current_task = ?, updated_at = ? WHERE agent_id = ?", (task_ids[0], updated_at, target_agent_id), ) conn.commit() # Build response task_titles = [task["title"] for task in found_tasks] response_parts = [ f"✅ **Tasks Assigned Successfully**", f" Agent: {target_agent_id}", f" Tasks Assigned: {len(task_ids)}", "", ] for i, (task_id, title) in enumerate(zip(task_ids, task_titles), 1): response_parts.append(f" {i}. {task_id}: {title}") if coordination_notes: response_parts.append(f"\n📋 **Coordination Notes:** {coordination_notes}") return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] except Exception as e: if conn: conn.rollback() logger.error(f"Error assigning existing tasks: {e}", exc_info=True) return [mcp_types.TextContent(type="text", text=f"Error assigning tasks: {e}")] finally: if conn: conn.close() async def _create_and_assign_multiple_tasks( arguments: Dict[str, Any], target_agent_id: str, tasks: List[Dict[str, Any]], auto_suggest_parent: bool, validate_agent_workload: bool, coordination_notes: str, ) -> List[mcp_types.TextContent]: """Mode 2: Create multiple tasks and assign to agent""" conn = None try: conn = get_db_connection() cursor = conn.cursor() # Validate agent exists cursor.execute( "SELECT agent_id FROM agents WHERE agent_id = ?", (target_agent_id,) ) if not cursor.fetchone(): return [ mcp_types.TextContent( type="text", text=f"Error: Agent '{target_agent_id}' not found." ) ] created_tasks = [] created_at = datetime.datetime.now().isoformat() # Create each task for i, task in enumerate(tasks): task_id = f"task_{int(datetime.datetime.now().timestamp() * 1000)}_{i}" title = task["title"] description = task["description"] priority = task.get("priority", "medium") parent_task = task.get("parent_task_id") # Create task data task_data = { "task_id": task_id, "title": title, "description": description, "assigned_to": target_agent_id, "created_by": "admin", "status": "pending", "priority": priority, "created_at": created_at, "updated_at": created_at, "parent_task": parent_task, "child_tasks": json.dumps([]), "depends_on_tasks": json.dumps([]), "notes": json.dumps([]), } # Insert task cursor.execute( """ INSERT INTO tasks (task_id, title, description, assigned_to, created_by, status, priority, created_at, updated_at, parent_task, child_tasks, depends_on_tasks, notes) VALUES (:task_id, :title, :description, :assigned_to, :created_by, :status, :priority, :created_at, :updated_at, :parent_task, :child_tasks, :depends_on_tasks, :notes) """, task_data, ) # Log the creation log_agent_action_to_db( cursor, "admin", "assigned_task", task_id=task_id, details={ "agent_id": target_agent_id, "title": title, "mode": "multiple_task_creation", }, ) created_tasks.append( {"task_id": task_id, "title": title, "priority": priority} ) # Update agent's current task if they don't have one (use first task) cursor.execute( "SELECT current_task FROM agents WHERE agent_id = ?", (target_agent_id,) ) agent_row = cursor.fetchone() if agent_row and agent_row["current_task"] is None and created_tasks: cursor.execute( "UPDATE agents SET current_task = ?, updated_at = ? WHERE agent_id = ?", (created_tasks[0]["task_id"], created_at, target_agent_id), ) conn.commit() # Build response response_parts = [ f"✅ **Multiple Tasks Created and Assigned**", f" Agent: {target_agent_id}", f" Tasks Created: {len(created_tasks)}", "", ] for i, task in enumerate(created_tasks, 1): response_parts.append( f" {i}. {task['task_id']}: {task['title']} (Priority: {task['priority']})" ) if coordination_notes: response_parts.append(f"\n📋 **Coordination Notes:** {coordination_notes}") return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] except Exception as e: if conn: conn.rollback() logger.error(f"Error creating multiple tasks: {e}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Error creating multiple tasks: {e}" ) ] finally: if conn: conn.close() # --- assign_task tool --- # Original logic from main.py: lines 1319-1384 (assign_task_tool function) async def assign_task_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: admin_auth_token = arguments.get("token") target_agent_token = arguments.get("agent_token") # Mode 1: Single task creation (existing behavior) task_title = arguments.get("task_title") task_description = arguments.get("task_description") priority = arguments.get("priority", "medium") # Default from schema depends_on_tasks_list = arguments.get("depends_on_tasks") # List[str] or None parent_task_id_arg = arguments.get("parent_task_id") # Optional str # Mode 2: Multiple task creation (new) tasks = arguments.get("tasks") # List[Dict] with task details # Mode 3: Existing task assignment (new) task_ids = arguments.get("task_ids") # List[str] of existing task IDs # Smart coordination features auto_suggest_parent = arguments.get( "auto_suggest_parent", True ) # Auto-suggest parent tasks validate_agent_workload = arguments.get( "validate_agent_workload", True ) # Check agent capacity auto_schedule = arguments.get( "auto_schedule", False ) # Auto-schedule based on dependencies coordination_notes = arguments.get( "coordination_notes" ) # Optional coordination context estimated_hours = arguments.get("estimated_hours") # Optional workload estimation if not verify_token(admin_auth_token, "admin"): # main.py:1326 return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] # Handle unassigned task creation (agent_token is optional) if not target_agent_token: # Mode 0: Create unassigned tasks return await _create_unassigned_tasks(arguments) # Convert agent_token to agent_id and validate agent conn = get_db_connection() cursor = conn.cursor() try: # Find agent by token cursor.execute( "SELECT agent_id FROM agents WHERE token = ?", (target_agent_token,) ) agent_row = cursor.fetchone() if not agent_row: conn.close() return [ mcp_types.TextContent( type="text", text="Error: Agent token not found. Agent may not exist or token is invalid.", ) ] target_agent_id = agent_row["agent_id"] # Prevent admin agents from being assigned tasks if target_agent_id.lower().startswith("admin"): conn.close() return [ mcp_types.TextContent( type="text", text="Error: Admin agents cannot be assigned tasks. Admin agents are for coordination and management only.", ) ] except Exception as e: conn.close() logger.error(f"Error validating agent token: {e}", exc_info=True) return [mcp_types.TextContent(type="text", text=f"Error validating agent: {e}")] finally: conn.close() # Determine operation mode and validate parameters (when agent_token provided) if task_ids: # Mode 3: Assign to existing tasks operation_mode = "existing" if not isinstance(task_ids, list) or not task_ids: return [ mcp_types.TextContent( type="text", text="Error: task_ids must be a non-empty list of task IDs.", ) ] elif tasks: # Mode 2: Create multiple tasks + assign operation_mode = "multiple" if not isinstance(tasks, list) or not tasks: return [ mcp_types.TextContent( type="text", text="Error: tasks must be a non-empty list of task objects.", ) ] # Validate each task object for i, task in enumerate(tasks): if not isinstance(task, dict) or not all( [task.get("title"), task.get("description")] ): return [ mcp_types.TextContent( type="text", text=f"Error: Task {i+1} must have 'title' and 'description' fields.", ) ] else: # Mode 1: Single task creation (existing behavior) operation_mode = "single" if not all([task_title, task_description]): return [ mcp_types.TextContent( type="text", text="Error: task_title and task_description are required for single task creation, or provide 'tasks' array for multiple tasks, or 'task_ids' for existing task assignment.", ) ] # Route to appropriate handler based on operation mode if operation_mode == "existing": return await _assign_to_existing_tasks( arguments, target_agent_id, task_ids, validate_agent_workload, coordination_notes, ) elif operation_mode == "multiple": return await _create_and_assign_multiple_tasks( arguments, target_agent_id, tasks, auto_suggest_parent, validate_agent_workload, coordination_notes, ) else: # operation_mode == "single" - continue with existing logic pass # Enforce single root task rule BEFORE any processing (Mode 1: Single task) if parent_task_id_arg is None: conn = get_db_connection() cursor = conn.cursor() cursor.execute( "SELECT COUNT(*) as count, GROUP_CONCAT(task_id) as root_ids FROM tasks WHERE parent_task IS NULL" ) result = cursor.fetchone() root_count = result["count"] root_ids = result["root_ids"] if root_count > 0: if auto_suggest_parent: # Use smart parent suggestion parent_suggestions = _suggest_optimal_parent_task( cursor, target_agent_id, task_description ) suggestion_text = "\n🧠 **Smart Parent Suggestions:**\n" if parent_suggestions["has_suggestions"]: for i, suggestion in enumerate( parent_suggestions["suggestions"], 1 ): suggestion_text += ( f" {i}. {suggestion['task_id']}: {suggestion['title']}\n" ) suggestion_text += f" Status: {suggestion['status']} | Priority: {suggestion['priority']} | {suggestion['reason']}\n" else: # Fallback to basic suggestions cursor.execute( """ SELECT task_id, title, status FROM tasks WHERE status IN ('pending', 'in_progress') AND assigned_to = ? ORDER BY updated_at DESC LIMIT 3 """, (target_agent_id,), ) basic_suggestions = cursor.fetchall() if basic_suggestions: suggestion_text += " Based on agent's recent tasks:\n" for task in basic_suggestions: suggestion_text += f" - {task['task_id']}: {task['title']} (status: {task['status']})\n" else: suggestion_text += ( " No suitable parent tasks found for this agent.\n" ) suggestion_text += " Consider assigning to a different agent with active tasks.\n" else: # Basic suggestion fallback cursor.execute( """ SELECT task_id, title, status FROM tasks WHERE status IN ('pending', 'in_progress') ORDER BY CASE WHEN assigned_to = ? THEN 0 ELSE 1 END, created_at DESC LIMIT 5 """, (target_agent_id,), ) suggestions = cursor.fetchall() suggestion_text = "\nSuggested parent tasks:\n" for task in suggestions: suggestion_text += f" - {task['task_id']}: {task['title']} (status: {task['status']})\n" conn.close() return [ mcp_types.TextContent( type="text", text=f"ERROR: Cannot create task without parent. {root_count} root task(s) already exist: {root_ids}\n\n" f"You MUST specify a parent_task_id. Every task except the first must have a parent.\n" f"{suggestion_text}\n" f"💡 Use auto_suggest_parent=true for smarter suggestions based on task content.\n" f"Use 'view_tasks' for complete task list, or use one of the suggestions above.", ) ] conn.close() conn = None try: conn = get_db_connection() cursor = conn.cursor() # Check if agent exists (in memory or DB) - main.py:1331-1346 agent_exists_in_memory = target_agent_id in g.agent_working_dirs assigned_agent_active_token: Optional[str] = None if agent_exists_in_memory: for tkn, data in g.active_agents.items(): if data.get("agent_id") == target_agent_id: assigned_agent_active_token = tkn break if not agent_exists_in_memory: cursor.execute( "SELECT token FROM agents WHERE agent_id = ? AND status != ?", (target_agent_id, "terminated"), ) row = cursor.fetchone() if not row: return [ mcp_types.TextContent( type="text", text=f"Agent '{target_agent_id}' not found or is terminated.", ) ] # Agent exists in DB but not memory, can still assign task. logger.warning( f"Assigning task to agent {target_agent_id} found in DB but not active memory." ) # assigned_agent_active_token remains None if not in active_agents # Generate task ID and timestamps first new_task_id = _generate_task_id() created_at_iso = datetime.datetime.now().isoformat() status = "pending" # Check single root task rule if parent_task_id_arg is None: cursor.execute( "SELECT COUNT(*) as count, MIN(task_id) as root_id FROM tasks WHERE parent_task IS NULL" ) result = cursor.fetchone() root_count = result["count"] existing_root_id = result["root_id"] if root_count > 0: logger.error( f"Attempt to create second root task. Existing root: {existing_root_id}" ) return [ mcp_types.TextContent( type="text", text=f"ERROR: Cannot create root task. A root task already exists ({existing_root_id}). All new tasks must have a parent.", ) ] # Smart workload validation workload_analysis = None workload_warnings = [] if validate_agent_workload: workload_analysis = _analyze_agent_workload(cursor, target_agent_id) if not workload_analysis["can_take_new_task"]: warning_msg = ( f"⚠️ Agent workload warning: {workload_analysis['capacity_status']} " ) warning_msg += ( f"({workload_analysis['total_active_tasks']} active tasks, " ) warning_msg += ( f"{workload_analysis['high_priority_tasks']} high priority)" ) workload_warnings.append(warning_msg) if workload_analysis["recommendations"]: workload_warnings.extend( [ f" 💡 {rec}" for rec in workload_analysis["recommendations"][:2] ] ) # System 8: RAG Pre-Check for Task Placement final_parent_task_id = parent_task_id_arg final_depends_on_tasks = depends_on_tasks_list validation_performed = False validation_message = "" if ENABLE_TASK_PLACEMENT_RAG: validation_performed = True validation_result = await validate_task_placement( title=task_title, description=task_description, parent_task_id=parent_task_id_arg, depends_on_tasks=depends_on_tasks_list, created_by="admin", auth_token=admin_auth_token, ) suggestion_message = format_suggestions_for_agent( validation_result, parent_task_id_arg, depends_on_tasks_list ) # For denied status, block creation unless override is allowed if validation_result["status"] == "denied" and not ALLOW_RAG_OVERRIDE: return [ mcp_types.TextContent( type="text", text=f"Task creation BLOCKED by RAG validation:\n{suggestion_message}", ) ] # Process suggestions - always apply them unless explicitly overridden if validation_result["status"] != "approved": validation_message = f"\nRAG Validation ({validation_result['status']}):\n{suggestion_message}\n" # Apply suggestions by default (agent should see this behavior) suggestions = validation_result["suggestions"] if suggestions.get("parent_task") is not None: final_parent_task_id = suggestions["parent_task"] validation_message += ( f"✓ Applied suggested parent: {final_parent_task_id}\n" ) if suggestions.get("dependencies"): final_depends_on_tasks = suggestions["dependencies"] validation_message += ( f"✓ Applied suggested dependencies: {final_depends_on_tasks}\n" ) logger.info( f"RAG suggestions automatically applied for task {new_task_id}" ) else: validation_message = "\n✓ RAG validation approved placement\n" # Build initial notes with coordination information initial_notes = [] # Add coordination notes if provided if coordination_notes: initial_notes.append( { "timestamp": created_at_iso, "author": "admin", "content": f"📋 Coordination: {coordination_notes}", } ) # Add workload information if workload_analysis: workload_note = ( f"👤 Agent workload: {workload_analysis['capacity_status']} " ) workload_note += f"({workload_analysis['total_active_tasks']} active tasks)" if estimated_hours: workload_note += f" | Estimated: {estimated_hours}h" initial_notes.append( { "timestamp": created_at_iso, "author": "system", "content": workload_note, } ) # Add smart parent suggestion note if used if auto_suggest_parent and final_parent_task_id: initial_notes.append( { "timestamp": created_at_iso, "author": "system", "content": f"🧠 Smart assignment: Parent task suggested based on content similarity", } ) task_data_for_db = { # main.py:1354-1367 "task_id": new_task_id, "title": task_title, "description": task_description, "assigned_to": target_agent_id, "created_by": "admin", # Admin is assigning "status": status, "priority": priority, "created_at": created_at_iso, "updated_at": created_at_iso, "parent_task": final_parent_task_id, # Use validated value "child_tasks": json.dumps([]), "depends_on_tasks": json.dumps( final_depends_on_tasks or [] ), # Use validated value "notes": json.dumps(initial_notes), } # Save task to database (main.py:1370-1373) cursor.execute( """ INSERT INTO tasks (task_id, title, description, assigned_to, created_by, status, priority, created_at, updated_at, parent_task, child_tasks, depends_on_tasks, notes) VALUES (:task_id, :title, :description, :assigned_to, :created_by, :status, :priority, :created_at, :updated_at, :parent_task, :child_tasks, :depends_on_tasks, :notes) """, task_data_for_db, ) # Update agent's current task in DB if they don't have one (main.py:1376-1387) should_update_agent_current_task = False if ( assigned_agent_active_token and assigned_agent_active_token in g.active_agents ): if g.active_agents[assigned_agent_active_token].get("current_task") is None: should_update_agent_current_task = True else: # Agent not in active memory, check DB cursor.execute( "SELECT current_task FROM agents WHERE agent_id = ?", (target_agent_id,) ) agent_row = cursor.fetchone() if agent_row and agent_row["current_task"] is None: should_update_agent_current_task = True if should_update_agent_current_task: cursor.execute( "UPDATE agents SET current_task = ?, updated_at = ? WHERE agent_id = ?", (new_task_id, created_at_iso, target_agent_id), ) log_agent_action_to_db( cursor, "admin", "assigned_task", task_id=new_task_id, details={"agent_id": target_agent_id, "title": task_title}, ) conn.commit() # Update agent's current task in memory if needed (main.py:1390-1391) if ( should_update_agent_current_task and assigned_agent_active_token and assigned_agent_active_token in g.active_agents ): g.active_agents[assigned_agent_active_token]["current_task"] = new_task_id # Add task to in-memory tasks dictionary (main.py:1394-1398) # Convert JSON strings back for in-memory representation task_data_for_memory = task_data_for_db.copy() task_data_for_memory["child_tasks"] = [] task_data_for_memory["depends_on_tasks"] = ( final_depends_on_tasks or [] ) # Use validated value task_data_for_memory["notes"] = [] g.tasks[new_task_id] = task_data_for_memory # System 8: Index the new task for RAG # Convert database format to the format expected by indexing index_data = task_data_for_memory.copy() index_data["depends_on_tasks"] = final_depends_on_tasks or [] # Start indexing asynchronously (fire and forget) import asyncio asyncio.create_task(index_task_data(new_task_id, index_data)) log_audit( "admin", "assign_task", {"task_id": new_task_id, "agent_id": target_agent_id, "title": task_title}, ) # main.py:1404 logger.info( f"Task '{new_task_id}' ({task_title}) assigned to agent '{target_agent_id}'." ) # Build comprehensive response response_parts = [f"✅ **Task Assigned Successfully**"] response_parts.append(f" Task ID: {new_task_id}") response_parts.append(f" Title: {task_title}") response_parts.append(f" Agent: {target_agent_id}") response_parts.append(f" Priority: {priority}") if final_parent_task_id: response_parts.append(f" Parent: {final_parent_task_id}") if final_depends_on_tasks: response_parts.append( f" Dependencies: {', '.join(final_depends_on_tasks)}" ) if estimated_hours: response_parts.append(f" Estimated: {estimated_hours} hours") # Add workload analysis if workload_analysis: response_parts.append("") capacity_icon = ( "🟢" if workload_analysis["capacity_status"] == "available" else "🟡" if workload_analysis["capacity_status"] == "busy" else "🔴" ) response_parts.append( f"👤 **Agent Workload:** {capacity_icon} {workload_analysis['capacity_status'].title()}" ) response_parts.append( f" Active Tasks: {workload_analysis['total_active_tasks']} ({workload_analysis['high_priority_tasks']} high priority)" ) if workload_warnings: response_parts.extend(workload_warnings) # Add RAG validation info if validation_performed and validation_message: response_parts.append(validation_message) # Add coordination info if coordination_notes: response_parts.append(f"\n📋 **Coordination Notes:** {coordination_notes}") # Add smart feature usage tips response_parts.append("\n💡 **Smart Features Used:**") if auto_suggest_parent: response_parts.append( "• Smart parent suggestion based on content similarity" ) if validate_agent_workload: response_parts.append("• Agent workload analysis and capacity checking") if coordination_notes: response_parts.append("• Coordination context captured for team awareness") return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error( f"Database error assigning task to agent {target_agent_id}: {e_sql}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Database error assigning task: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error( f"Unexpected error assigning task to agent {target_agent_id}: {e}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Unexpected error assigning task: {e}" ) ] finally: if conn: conn.close() # --- create_self_task tool --- # Original logic from main.py: lines 1409-1474 (create_self_task_tool function) async def create_self_task_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: agent_auth_token = arguments.get("token") task_title = arguments.get("task_title") task_description = arguments.get("task_description") priority = arguments.get("priority", "medium") depends_on_tasks_list = arguments.get("depends_on_tasks") parent_task_id_arg = arguments.get("parent_task_id") requesting_agent_id = get_agent_id(agent_auth_token) # main.py:1415 if not requesting_agent_id: return [ mcp_types.TextContent( type="text", text="Unauthorized: Valid token required" ) ] if not all([task_title, task_description]): return [ mcp_types.TextContent( type="text", text="Error: task_title and task_description are required." ) ] # Determine actual parent task ID (main.py:1419-1423) actual_parent_task_id = parent_task_id_arg if actual_parent_task_id is None and agent_auth_token in g.active_agents: actual_parent_task_id = g.active_agents[agent_auth_token].get("current_task") conn = None try: conn = get_db_connection() cursor = conn.cursor() # Hierarchy Validation - Agents can NEVER create root tasks if requesting_agent_id != "admin" and actual_parent_task_id is None: logger.error( f"Agent '{requesting_agent_id}' attempted to create a root task" ) # Find a suitable parent task for the agent cursor.execute( """ SELECT task_id, title FROM tasks WHERE assigned_to = ? OR created_by = ? ORDER BY created_at DESC LIMIT 1 """, (requesting_agent_id, requesting_agent_id), ) suggested_parent = cursor.fetchone() suggestion_text = "" if suggested_parent: suggestion_text = f"\nSuggested parent: {suggested_parent['task_id']} ({suggested_parent['title']})" return [ mcp_types.TextContent( type="text", text=f"ERROR: Agents cannot create root tasks. Every task must have a parent.{suggestion_text}\nPlease specify a parent_task_id.", ) ] # Additional check for single root rule even for admin if actual_parent_task_id is None: cursor.execute( "SELECT COUNT(*) as count, MIN(task_id) as root_id FROM tasks WHERE parent_task IS NULL" ) result = cursor.fetchone() root_count = result["count"] existing_root_id = result["root_id"] if root_count > 0: logger.error( f"Attempt to create second root task. Existing root: {existing_root_id}" ) return [ mcp_types.TextContent( type="text", text=f"ERROR: Cannot create root task. A root task already exists ({existing_root_id}). All new tasks must have a parent.", ) ] # Generate task ID and timestamps first new_task_id = _generate_task_id() created_at_iso = datetime.datetime.now().isoformat() status = "pending" # System 8: RAG Pre-Check for Task Placement final_parent_task_id = actual_parent_task_id final_depends_on_tasks = depends_on_tasks_list validation_message = "" if ENABLE_TASK_PLACEMENT_RAG: validation_result = await validate_task_placement( title=task_title, description=task_description, parent_task_id=actual_parent_task_id, depends_on_tasks=depends_on_tasks_list, created_by=requesting_agent_id, auth_token=agent_auth_token, ) suggestion_message = format_suggestions_for_agent( validation_result, actual_parent_task_id, depends_on_tasks_list ) # Check for denial if validation_result["status"] == "denied": return [ mcp_types.TextContent( type="text", text=f"Task creation BLOCKED by RAG validation:\n{suggestion_message}", ) ] # Process validation results if validation_result["status"] != "approved": validation_message = f"\nRAG Validation ({validation_result['status']}):\n{suggestion_message}\n" # For agents, always accept suggestions automatically suggestions = validation_result["suggestions"] if suggestions.get("parent_task") is not None: final_parent_task_id = suggestions["parent_task"] validation_message += ( f"✓ Applied suggested parent: {final_parent_task_id}\n" ) if suggestions.get("dependencies"): final_depends_on_tasks = suggestions["dependencies"] validation_message += ( f"✓ Applied suggested dependencies: {final_depends_on_tasks}\n" ) logger.info( f"Agent {requesting_agent_id} automatically accepted RAG suggestions" ) # Check if escalation is needed if should_escalate_to_admin(validation_result, requesting_agent_id): logger.warning( f"Task {new_task_id} flagged for admin review: {validation_result.get('message')}" ) validation_message += "⚠️ Task flagged for admin review\n" else: validation_message = "\n✓ RAG validation approved placement\n" task_data_for_db = { # main.py:1439-1452 "task_id": new_task_id, "title": task_title, "description": task_description, "assigned_to": requesting_agent_id, "created_by": requesting_agent_id, # Agent creates for self "status": status, "priority": priority, "created_at": created_at_iso, "updated_at": created_at_iso, "parent_task": final_parent_task_id, # Use validated value "child_tasks": json.dumps([]), "depends_on_tasks": json.dumps( final_depends_on_tasks or [] ), # Use validated value "notes": json.dumps([]), } cursor.execute( """ INSERT INTO tasks (task_id, title, description, assigned_to, created_by, status, priority, created_at, updated_at, parent_task, child_tasks, depends_on_tasks, notes) VALUES (:task_id, :title, :description, :assigned_to, :created_by, :status, :priority, :created_at, :updated_at, :parent_task, :child_tasks, :depends_on_tasks, :notes) """, task_data_for_db, ) # Update agent's current task in DB if they don't have one (main.py:1455-1469) should_update_agent_current_task = False if agent_auth_token in g.active_agents: # Check memory first if g.active_agents[agent_auth_token].get("current_task") is None: should_update_agent_current_task = True elif ( requesting_agent_id != "admin" ): # If not admin and not in active_agents (e.g. loaded from DB only) cursor.execute( "SELECT current_task FROM agents WHERE agent_id = ?", (requesting_agent_id,), ) agent_row = cursor.fetchone() if agent_row and agent_row["current_task"] is None: should_update_agent_current_task = True # Admin agents don't have a persistent 'current_task' in the agents table. if should_update_agent_current_task and requesting_agent_id != "admin": cursor.execute( "UPDATE agents SET current_task = ?, updated_at = ? WHERE agent_id = ?", (new_task_id, created_at_iso, requesting_agent_id), ) log_agent_action_to_db( cursor, requesting_agent_id, "created_self_task", task_id=new_task_id, details={"title": task_title}, ) conn.commit() if should_update_agent_current_task and agent_auth_token in g.active_agents: g.active_agents[agent_auth_token]["current_task"] = new_task_id task_data_for_memory = task_data_for_db.copy() task_data_for_memory["child_tasks"] = [] task_data_for_memory["depends_on_tasks"] = ( final_depends_on_tasks or [] ) # Use validated value task_data_for_memory["notes"] = [] g.tasks[new_task_id] = task_data_for_memory # System 8: Index the new task for RAG # Convert database format to the format expected by indexing index_data = task_data_for_memory.copy() # No need to override depends_on_tasks again, it's already the validated value # Start indexing asynchronously (fire and forget) import asyncio asyncio.create_task(index_task_data(new_task_id, index_data)) log_audit( requesting_agent_id, "create_self_task", {"task_id": new_task_id, "title": task_title}, ) # main.py:1485 logger.info( f"Agent '{requesting_agent_id}' created self-task '{new_task_id}' ({task_title})." ) response_text = ( f"Self-assigned task '{new_task_id}' created.\nTitle: {task_title}" ) if validation_message: response_text += validation_message return [mcp_types.TextContent(type="text", text=response_text)] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error( f"Database error creating self task for agent {requesting_agent_id}: {e_sql}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Database error creating self task: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error( f"Unexpected error creating self task for agent {requesting_agent_id}: {e}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Unexpected error creating self task: {e}" ) ] finally: if conn: conn.close() # --- update_task_status tool --- # Original logic from main.py: lines 1477-1583 (update_task_status_tool function) async def update_task_status_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: agent_auth_token = arguments.get("token") task_id_to_update = arguments.get("task_id") task_ids_bulk = arguments.get( "task_ids" ) # NEW: List of task IDs for bulk operations new_status = arguments.get("status") notes_content = arguments.get("notes") # Optional string for new note # Admin-only fields for full task update new_title = arguments.get("title") new_description = arguments.get("description") new_priority = arguments.get("priority") new_assigned_to = arguments.get("assigned_to") new_depends_on_tasks = arguments.get("depends_on_tasks") # List[str] or None # Smart features auto_update_dependencies = arguments.get( "auto_update_dependencies", True ) # Auto-update dependent tasks cascade_to_children = arguments.get( "cascade_to_children", False ) # Cascade status to child tasks validate_dependencies = arguments.get( "validate_dependencies", True ) # Validate dependency constraints requesting_agent_id = get_agent_id(agent_auth_token) if not requesting_agent_id: return [ mcp_types.TextContent( type="text", text="Unauthorized: Valid token required" ) ] # Determine if this is bulk or single operation task_ids_to_process = [] if task_ids_bulk: task_ids_to_process = task_ids_bulk elif task_id_to_update: task_ids_to_process = [task_id_to_update] else: return [ mcp_types.TextContent( type="text", text="Error: Either task_id or task_ids is required." ) ] if not new_status: return [mcp_types.TextContent(type="text", text="Error: status is required.")] valid_statuses = ["pending", "in_progress", "completed", "cancelled", "failed"] if new_status not in valid_statuses: return [ mcp_types.TextContent( type="text", text=f"Invalid status: {new_status}. Valid: {', '.join(valid_statuses)}", ) ] is_admin_request = verify_token(agent_auth_token, "admin") conn = None try: conn = get_db_connection() cursor = conn.cursor() # Process tasks (bulk or single) results = [] tasks_to_cascade = [] # Phase 1: Update primary tasks for task_id in task_ids_to_process: result = await _update_single_task( cursor, task_id, new_status, requesting_agent_id, is_admin_request, notes_content, new_title, new_description, new_priority, new_assigned_to, new_depends_on_tasks, ) results.append(result) if result["success"] and cascade_to_children: tasks_to_cascade.extend(result["child_tasks"]) # Log individual task action if result["success"]: log_details = {"status": new_status, "old_status": result["old_status"]} if notes_content: log_details["notes_added"] = True log_agent_action_to_db( cursor, requesting_agent_id, "update_task_status", task_id=task_id, details=log_details, ) # Phase 2: Smart cascade to children if requested cascade_results = [] if cascade_to_children and tasks_to_cascade: for child_task_id in tasks_to_cascade: # Only cascade certain status changes to avoid breaking workflows if new_status in ["cancelled", "failed"]: # Cascade blocking states child_result = await _update_single_task( cursor, child_task_id, new_status, requesting_agent_id, is_admin_request, f"Auto-cascaded from parent task status change", None, None, None, None, None, ) cascade_results.append(child_result) # Phase 3: Smart dependency updates if requested dependency_updates = [] if auto_update_dependencies: for result in results: if result["success"] and new_status == "completed": # Find tasks that depend on this completed task cursor.execute("SELECT task_id, depends_on_tasks FROM tasks") all_tasks = cursor.fetchall() for task_row in all_tasks: task_deps = json.loads(task_row["depends_on_tasks"] or "[]") if result["task_id"] in task_deps: # Check if all dependencies are now completed all_deps_completed = True for dep_id in task_deps: if ( dep_id != result["task_id"] ): # Skip the one we just completed cursor.execute( "SELECT status FROM tasks WHERE task_id = ?", (dep_id,), ) dep_row = cursor.fetchone() if not dep_row or dep_row["status"] != "completed": all_deps_completed = False break if all_deps_completed: # Auto-update dependent task to in_progress if it's pending cursor.execute( "SELECT status FROM tasks WHERE task_id = ?", (task_row["task_id"],), ) dependent_task = cursor.fetchone() if ( dependent_task and dependent_task["status"] == "pending" ): dep_result = await _update_single_task( cursor, task_row["task_id"], "in_progress", requesting_agent_id, is_admin_request, f"Auto-advanced: all dependencies completed", None, None, None, None, None, ) dependency_updates.append(dep_result) # Phase 3.5: Auto-launch testing agents for completed tasks testing_agent_launches = [] for result in results: if result["success"] and new_status == "completed": try: testing_success = await _launch_testing_agent_for_completed_task( cursor, result["task_id"], requesting_agent_id ) testing_agent_launches.append( { "task_id": result["task_id"], "testing_agent_launched": testing_success, } ) logger.info( f"Testing agent launch for task {result['task_id']}: {'SUCCESS' if testing_success else 'FAILED'}" ) except Exception as e: logger.error( f"Failed to launch testing agent for task {result['task_id']}: {e}" ) testing_agent_launches.append( { "task_id": result["task_id"], "testing_agent_launched": False, "error": str(e), } ) # Commit all changes conn.commit() # Phase 4: Re-index updated tasks import asyncio for result in results + cascade_results + dependency_updates: if result.get("success"): task_id = result["task_id"] if task_id in g.tasks: asyncio.create_task( index_task_data(task_id, g.tasks[task_id].copy()) ) # Build comprehensive response successful_updates = [r for r in results if r.get("success")] failed_updates = [r for r in results if not r.get("success")] response_parts = [] if len(task_ids_to_process) == 1: # Single task response if successful_updates: response_parts.append( f"Task {successful_updates[0]['task_id']} status updated to {new_status}." ) else: response_parts.append( f"Failed to update task: {failed_updates[0]['error']}" ) else: # Bulk operation response response_parts.append( f"Bulk update completed: {len(successful_updates)}/{len(task_ids_to_process)} tasks updated." ) if failed_updates: response_parts.append(f"Failed updates:") for fail in failed_updates[:3]: # Limit to first 3 failures response_parts.append(f" - {fail['error']}") if len(failed_updates) > 3: response_parts.append( f" ... and {len(failed_updates) - 3} more failures" ) # Add smart feature results if cascade_results: successful_cascades = [r for r in cascade_results if r.get("success")] response_parts.append( f"Cascaded to {len(successful_cascades)} child tasks." ) if dependency_updates: successful_deps = [r for r in dependency_updates if r.get("success")] response_parts.append( f"Auto-advanced {len(successful_deps)} dependent tasks." ) log_audit( requesting_agent_id, "update_task_status", { "task_count": len(task_ids_to_process), "successful": len(successful_updates), "failed": len(failed_updates), "status": new_status, "cascade_count": len(cascade_results), "dependency_updates": len(dependency_updates), }, ) return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error(f"Database error updating tasks: {e_sql}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Database error updating tasks: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error(f"Unexpected error updating tasks: {e}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Unexpected error updating tasks: {e}" ) ] finally: if conn: conn.close() # --- view_tasks tool --- # Original logic from main.py: lines 1586-1655 (view_tasks_tool function) async def view_tasks_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: agent_auth_token = arguments.get("token") filter_agent_id = arguments.get("agent_id") # Optional agent_id to filter by filter_status = arguments.get("status") # Optional status to filter by max_tokens = arguments.get( "max_tokens", 25000 ) # Maximum response tokens (default: 25k) start_after = arguments.get( "start_after" ) # Task ID to start after (for pagination) summary_mode = arguments.get( "summary_mode", False ) # If True, show only summary info # Smart filtering and analysis options show_dependencies = arguments.get( "show_dependencies", False ) # Show dependency graph info show_health_analysis = arguments.get( "show_health_analysis", False ) # Show task health metrics filter_priority = arguments.get("filter_priority") # Filter by priority filter_parent_task = arguments.get("filter_parent_task") # Filter by parent task show_blocked_tasks = arguments.get( "show_blocked_tasks", False ) # Show only blocked tasks sort_by = arguments.get( "sort_by", "created_at" ) # Sort by: created_at, updated_at, priority, status requesting_agent_id = get_agent_id(agent_auth_token) if not requesting_agent_id: return [ mcp_types.TextContent( type="text", text="Unauthorized: Valid token required" ) ] is_admin_request = verify_token(agent_auth_token, "admin") # Permission check target_agent_id_for_filter = filter_agent_id if not is_admin_request: if filter_agent_id is None: target_agent_id_for_filter = requesting_agent_id elif filter_agent_id != requesting_agent_id: return [ mcp_types.TextContent( type="text", text="Unauthorized: Non-admin agents can only view their own tasks or all tasks assigned to them if no agent_id filter is specified.", ) ] # Advanced filtering with dependency analysis tasks_to_display: List[Dict[str, Any]] = [] # Pre-analyze all tasks for dependency checking all_tasks_dict = dict(g.tasks) for task_id, task_data in g.tasks.items(): # Basic permission filtering matches_agent = True if ( target_agent_id_for_filter and task_data.get("assigned_to") != target_agent_id_for_filter ): matches_agent = False # Status filtering matches_status = True if filter_status and task_data.get("status") != filter_status: matches_status = False # Priority filtering matches_priority = True if filter_priority and task_data.get("priority") != filter_priority: matches_priority = False # Parent task filtering matches_parent = True if filter_parent_task and task_data.get("parent_task") != filter_parent_task: matches_parent = False # Blocked tasks filtering matches_blocked = True if show_blocked_tasks: dependency_analysis = _analyze_task_dependencies(task_data, all_tasks_dict) matches_blocked = ( dependency_analysis["is_blocked"] or not dependency_analysis["can_start"] ) if ( matches_agent and matches_status and matches_priority and matches_parent and matches_blocked ): # Add dependency analysis if requested if show_dependencies: task_data_copy = task_data.copy() task_data_copy["_dependency_analysis"] = _analyze_task_dependencies( task_data, all_tasks_dict ) tasks_to_display.append(task_data_copy) else: tasks_to_display.append(task_data) # Smart sorting def get_sort_key(task): if sort_by == "priority": priority_order = {"high": 3, "medium": 2, "low": 1} return priority_order.get(task.get("priority", "medium"), 2) elif sort_by == "status": status_order = { "failed": 5, "in_progress": 4, "pending": 3, "completed": 2, "cancelled": 1, } return status_order.get(task.get("status", "pending"), 3) elif sort_by == "updated_at": return task.get("updated_at", "") else: # created_at (default) return task.get("created_at", "") reverse_sort = sort_by in ["created_at", "updated_at", "priority", "status"] tasks_to_display.sort(key=get_sort_key, reverse=reverse_sort) # Handle pagination with start_after if start_after: start_index = 0 for i, task in enumerate(tasks_to_display): if task.get("task_id") == start_after: start_index = i + 1 break tasks_to_display = tasks_to_display[start_index:] if not tasks_to_display: response_text = "No tasks found matching the criteria." else: # Generate health analysis if requested health_analysis = None if show_health_analysis: health_analysis = _calculate_task_health_metrics(tasks_to_display) # Build response with smart headers filter_info = [] if filter_status: filter_info.append(f"status={filter_status}") if filter_priority: filter_info.append(f"priority={filter_priority}") if filter_agent_id: filter_info.append(f"agent={filter_agent_id}") if filter_parent_task: filter_info.append(f"parent={filter_parent_task}") if show_blocked_tasks: filter_info.append("blocked_only=true") header = f"Tasks ({len(tasks_to_display)} found" if filter_info: header += f", filtered by: {', '.join(filter_info)}" header += f", sorted by: {sort_by})" response_parts = [header + "\n"] # Add health analysis at the top if requested if health_analysis: health_status = health_analysis["health_status"] health_score = health_analysis["health_score"] health_icon = ( "🟢" if health_status == "excellent" else ( "🟡" if health_status == "good" else "🟠" if health_status == "needs_attention" else "🔴" ) ) response_parts.append( f"📊 **Health Analysis:** {health_icon} {health_status.title()} ({health_score}/100)" ) response_parts.append( f" Status: {health_analysis['status_distribution']}" ) response_parts.append( f" Issues: {health_analysis['blocked_tasks']} blocked, {health_analysis['stale_tasks']} stale" ) response_parts.append("") current_tokens = estimate_tokens("\n".join(response_parts)) tasks_included = 0 last_task_id = None truncated = False for task in tasks_to_display: # Format task with dependency info if requested if show_dependencies and "_dependency_analysis" in task: task_text = _format_task_with_dependencies(task) elif summary_mode: task_text = _format_task_summary(task) else: task_text = _format_task_detailed(task) task_tokens = estimate_tokens(task_text) # Check token limit with safety buffer safety_buffer = 1000 if ( current_tokens + task_tokens > (max_tokens - safety_buffer) and tasks_included > 0 ): truncated = True break response_parts.append(f"{task_text}\n") current_tokens += task_tokens tasks_included += 1 last_task_id = task.get("task_id") # Add smart pagination and usage tips if truncated: remaining_count = len(tasks_to_display) - tasks_included response_parts.append( f"--- Response truncated to stay under {max_tokens} tokens ---" ) response_parts.append( f"Showing {tasks_included} of {len(tasks_to_display)} tasks ({remaining_count} remaining)" ) response_parts.append( f"Continue: view_tasks(start_after='{last_task_id}', max_tokens={max_tokens})" ) if not summary_mode: response_parts.append(f"Overview: view_tasks(summary_mode=true)") else: response_parts.append(f"--- All {tasks_included} matching tasks shown ---") # Add smart usage tips response_parts.append("\n💡 Smart Tips:") if not show_dependencies: response_parts.append( "• Add show_dependencies=true to see dependency chains" ) if not show_health_analysis: response_parts.append("• Add show_health_analysis=true for health metrics") if not show_blocked_tasks: response_parts.append( "• Add show_blocked_tasks=true to see only blocked tasks" ) response_parts.append( "• Use sort_by=[priority|status|updated_at] for different sorting" ) response_text = "\n".join(response_parts) log_audit( requesting_agent_id, "view_tasks", {"filter_agent_id": filter_agent_id, "filter_status": filter_status}, ) return [mcp_types.TextContent(type="text", text=response_text)] def _format_task_summary(task: Dict[str, Any]) -> str: """Format task in summary mode (minimal tokens)""" task_id = task.get("task_id", "N/A") title = task.get("title", "N/A") status = task.get("status", "N/A") priority = task.get("priority", "medium") assigned_to = task.get("assigned_to", "Unassigned") # Truncate description description = task.get("description", "No description") if len(description) > 100: description = description[:100] + "..." return f"""ID: {task_id} Title: {title} Status: {status} | Priority: {priority} Assigned to: {assigned_to} Description: {description}""" def _format_task_detailed(task: Dict[str, Any]) -> str: """Format task in detailed mode (includes notes, full description)""" parts = [] parts.append(f"ID: {task.get('task_id', 'N/A')}") parts.append(f"Title: {task.get('title', 'N/A')}") parts.append(f"Description: {task.get('description', 'No description')}") parts.append(f"Status: {task.get('status', 'N/A')}") parts.append(f"Priority: {task.get('priority', 'medium')}") parts.append(f"Assigned to: {task.get('assigned_to', 'None')}") parts.append(f"Created by: {task.get('created_by', 'N/A')}") parts.append(f"Created: {task.get('created_at', 'N/A')}") parts.append(f"Updated: {task.get('updated_at', 'N/A')}") if task.get("parent_task"): parts.append(f"Parent task: {task['parent_task']}") child_tasks_val = task.get("child_tasks", []) if isinstance(child_tasks_val, str): try: child_tasks_val = json.loads(child_tasks_val or "[]") except: child_tasks_val = ["Error decoding child_tasks"] if child_tasks_val: parts.append(f"Child tasks: {', '.join(child_tasks_val)}") notes_val = task.get("notes", []) if isinstance(notes_val, str): try: notes_val = json.loads(notes_val or "[]") except: notes_val = [{"author": "System", "content": "Error decoding notes"}] if notes_val: parts.append("Notes:") # Limit notes to prevent token explosion recent_notes = notes_val[-5:] if len(notes_val) > 5 else notes_val for note in recent_notes: if isinstance(note, dict): ts = note.get("timestamp", "Unknown time") auth = note.get("author", "Unknown") cont = note.get("content", "No content") parts.append(f" - [{ts}] {auth}: {cont}") else: parts.append(f" - [Invalid Note Format: {str(note)}]") if len(notes_val) > 5: parts.append(f" ... and {len(notes_val) - 5} more notes") return "\n".join(parts) def _format_task_with_dependencies(task: Dict[str, Any]) -> str: """Format task with dependency analysis information""" # Start with detailed format task_text = _format_task_detailed(task) # Add dependency analysis dep_analysis = task.get("_dependency_analysis", {}) if dep_analysis: dep_parts = ["\n🔗 Dependency Analysis:"] # Health status health = dep_analysis.get("dependency_health", "unknown") health_icon = ( "🟢" if health == "healthy" else "🟡" if health == "waiting" else "🟠" if health == "warning" else "🔴" ) dep_parts.append(f" Status: {health_icon} {health}") # Blocking info if dep_analysis.get("is_blocked"): dep_parts.append(" ⚠️ BLOCKED - Cannot proceed") elif not dep_analysis.get("can_start"): dep_parts.append(" ⏳ WAITING - Dependencies not ready") else: dep_parts.append(" ✅ READY - Can proceed") # Dependencies details completed_deps = dep_analysis.get("completed_dependencies", []) blocking_deps = dep_analysis.get("blocking_dependencies", []) missing_deps = dep_analysis.get("missing_dependencies", []) if completed_deps: dep_parts.append( f" ✅ Completed: {', '.join(completed_deps[:3])}" + ( f" (+{len(completed_deps)-3} more)" if len(completed_deps) > 3 else "" ) ) if blocking_deps: dep_parts.append( f" 🔴 Blocking: {', '.join(blocking_deps[:3])}" + (f" (+{len(blocking_deps)-3} more)" if len(blocking_deps) > 3 else "") ) if missing_deps: dep_parts.append( f" ❌ Missing: {', '.join(missing_deps[:3])}" + (f" (+{len(missing_deps)-3} more)" if len(missing_deps) > 3 else "") ) # What this task blocks blocks_tasks = dep_analysis.get("blocks_tasks", []) if blocks_tasks: dep_parts.append( f" 🔒 Blocks: {', '.join(blocks_tasks[:3])}" + (f" (+{len(blocks_tasks)-3} more)" if len(blocks_tasks) > 3 else "") ) task_text += "\n".join(dep_parts) return task_text def _analyze_agent_workload(cursor, agent_id: str) -> Dict[str, Any]: """Analyze agent's current workload and capacity""" # Get agent's current tasks cursor.execute( """ SELECT task_id, title, status, priority, created_at, updated_at FROM tasks WHERE assigned_to = ? AND status IN ('pending', 'in_progress') ORDER BY priority DESC, created_at ASC """, (agent_id,), ) active_tasks = [dict(row) for row in cursor.fetchall()] # Calculate workload metrics total_tasks = len(active_tasks) high_priority_tasks = len([t for t in active_tasks if t.get("priority") == "high"]) in_progress_tasks = len( [t for t in active_tasks if t.get("status") == "in_progress"] ) pending_tasks = len([t for t in active_tasks if t.get("status") == "pending"]) # Calculate "staleness" - tasks not updated recently current_time = datetime.datetime.now() stale_tasks = 0 for task in active_tasks: if task.get("updated_at"): try: updated_time = datetime.datetime.fromisoformat( task["updated_at"].replace("Z", "+00:00").replace("+00:00", "") ) days_stale = (current_time - updated_time).days if days_stale > 3: # No update in 3+ days stale_tasks += 1 except: pass # Simple capacity assessment capacity_status = "available" if total_tasks >= 8: capacity_status = "overloaded" elif total_tasks >= 5: capacity_status = "busy" elif high_priority_tasks >= 3: capacity_status = "busy" return { "agent_id": agent_id, "total_active_tasks": total_tasks, "high_priority_tasks": high_priority_tasks, "in_progress_tasks": in_progress_tasks, "pending_tasks": pending_tasks, "stale_tasks": stale_tasks, "capacity_status": capacity_status, "workload_score": min( 100, total_tasks * 10 + high_priority_tasks * 5 ), # 0-100+ "can_take_new_task": capacity_status in ["available", "busy"] and high_priority_tasks < 4, "recommendations": _generate_workload_recommendations( capacity_status, total_tasks, stale_tasks ), } def _generate_workload_recommendations( capacity_status: str, total_tasks: int, stale_tasks: int ) -> List[str]: """Generate workload management recommendations""" recommendations = [] if capacity_status == "overloaded": recommendations.append("Consider redistributing some tasks to other agents") recommendations.append("Focus on completing high-priority tasks first") if stale_tasks > 0: recommendations.append( f"Review {stale_tasks} stale tasks that haven't been updated recently" ) if total_tasks > 6: recommendations.append( "Consider breaking down large tasks into smaller subtasks" ) if not recommendations: recommendations.append("Workload appears manageable") return recommendations def _suggest_optimal_parent_task( cursor, agent_id: str, task_description: str ) -> Dict[str, Any]: """Suggest optimal parent task based on context and agent workload""" # Get agent's current tasks that could be parents cursor.execute( """ SELECT task_id, title, description, status, priority FROM tasks WHERE assigned_to = ? AND status IN ('pending', 'in_progress') ORDER BY CASE WHEN status = 'in_progress' THEN 1 ELSE 2 END, CASE priority WHEN 'high' THEN 3 WHEN 'medium' THEN 2 ELSE 1 END DESC, updated_at DESC LIMIT 10 """, (agent_id,), ) agent_tasks = [dict(row) for row in cursor.fetchall()] # Simple text similarity scoring (could be enhanced with embeddings) def similarity_score(text1: str, text2: str) -> float: text1_words = set(text1.lower().split()) text2_words = set(text2.lower().split()) if not text1_words or not text2_words: return 0.0 intersection = text1_words.intersection(text2_words) union = text1_words.union(text2_words) return len(intersection) / len(union) if union else 0.0 suggestions = [] for task in agent_tasks: # Score based on title and description similarity title_sim = similarity_score(task_description, task.get("title", "")) desc_sim = similarity_score(task_description, task.get("description", "")) combined_score = (title_sim * 0.6) + (desc_sim * 0.4) # Boost score for in-progress tasks (more likely to be good parents) if task.get("status") == "in_progress": combined_score *= 1.2 if combined_score > 0.1: # Only suggest if there's some relevance suggestions.append( { "task_id": task["task_id"], "title": task["title"], "status": task["status"], "priority": task["priority"], "similarity_score": round(combined_score, 3), "reason": f"Similar content ({int(combined_score*100)}% match)", } ) # Sort by score and take top 3 suggestions.sort(key=lambda x: x["similarity_score"], reverse=True) return { "agent_id": agent_id, "suggestions": suggestions[:3], "has_suggestions": len(suggestions) > 0, } # --- request_assistance tool --- # Original logic from main.py: lines 1658-1763 (request_assistance_tool function) # This tool had file-based notification system. We'll replicate that for 1-to-1. async def request_assistance_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: agent_auth_token = arguments.get("token") parent_task_id = arguments.get("task_id") # Task ID needing assistance assistance_description = arguments.get("description") requesting_agent_id = get_agent_id(agent_auth_token) # main.py:1666 if not requesting_agent_id: return [ mcp_types.TextContent( type="text", text="Unauthorized: Valid token required" ) ] if not parent_task_id or not assistance_description: return [ mcp_types.TextContent( type="text", text="Error: task_id (for parent) and description are required.", ) ] # Fetch parent task data (original used in-memory g.tasks, main.py:1674) # For robustness, let's fetch from DB, then update g.tasks. conn = None try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (parent_task_id,)) parent_task_db_row = cursor.fetchone() if not parent_task_db_row: return [ mcp_types.TextContent( type="text", text=f"Parent task '{parent_task_id}' not found." ) ] parent_task_current_data = dict(parent_task_db_row) # Verify ownership or admin (main.py:1688-1691) is_admin_request = verify_token(agent_auth_token, "admin") if ( parent_task_current_data.get("assigned_to") != requesting_agent_id and not is_admin_request ): return [ mcp_types.TextContent( type="text", text="Unauthorized: You can only request assistance for tasks assigned to you, or use an admin token.", ) ] # Create child assistance task (main.py:1694-1696) child_task_id = _generate_task_id() child_task_title = f"Assistance for {parent_task_id}: {parent_task_current_data.get('title', 'Untitled Task')}" timestamp_iso = datetime.datetime.now().isoformat() # Create notification for admin (main.py:1699-1710) # This part used file system notifications. notification_id = _generate_notification_id() notification_data = { "id": notification_id, "type": "assistance_request", "source_agent_id": requesting_agent_id, "task_id": parent_task_id, # Parent task "child_task_id": child_task_id, # The new assistance task "timestamp": timestamp_iso, "description": assistance_description, "status": "pending", # Notification status } # Save notification file (main.py:1713-1718) project_dir_env = os.environ.get("MCP_PROJECT_DIR") if not project_dir_env: logger.error( "MCP_PROJECT_DIR not set. Cannot save assistance notification file." ) # Decide if this is critical enough to stop. Original didn't explicitly stop. else: try: notifications_pending_dir = ( Path(project_dir_env) / ".agent" / "notifications" / "pending" ) notifications_pending_dir.mkdir(parents=True, exist_ok=True) notification_file_path = ( notifications_pending_dir / f"{notification_id}.json" ) with open(notification_file_path, "w", encoding="utf-8") as f: json.dump(notification_data, f, indent=2) logger.info( f"Assistance request notification saved to {notification_file_path}" ) except Exception as e_notify: logger.error( f"Failed to save assistance notification file: {e_notify}", exc_info=True, ) # Insert the child (assistance) task into DB (main.py:1722-1734) child_task_db_data = { "task_id": child_task_id, "title": child_task_title, "description": assistance_description, "status": "pending", "assigned_to": None, "priority": "high", # Assistance tasks are high priority "created_at": timestamp_iso, "updated_at": timestamp_iso, "parent_task": parent_task_id, "depends_on_tasks": json.dumps([]), "created_by": requesting_agent_id, # The agent who requested assistance "child_tasks": json.dumps([]), "notes": json.dumps([]), } cursor.execute( """ INSERT INTO tasks (task_id, title, description, status, assigned_to, priority, created_at, updated_at, parent_task, depends_on_tasks, created_by, child_tasks, notes) VALUES (:task_id, :title, :description, :status, :assigned_to, :priority, :created_at, :updated_at, :parent_task, :depends_on_tasks, :created_by, :child_tasks, :notes) """, child_task_db_data, ) # Update parent task's child_tasks field and notes (main.py:1737-1764) parent_child_tasks_list = json.loads( parent_task_current_data.get("child_tasks") or "[]" ) parent_child_tasks_list.append(child_task_id) parent_notes_list = json.loads(parent_task_current_data.get("notes") or "[]") parent_notes_list.append( { "timestamp": timestamp_iso, "author": requesting_agent_id, "content": f"Requested assistance: {assistance_description}. Assistance task created: {child_task_id}", } ) cursor.execute( "UPDATE tasks SET child_tasks = ?, notes = ?, updated_at = ? WHERE task_id = ?", ( json.dumps(parent_child_tasks_list), json.dumps(parent_notes_list), timestamp_iso, parent_task_id, ), ) log_agent_action_to_db( cursor, requesting_agent_id, "request_assistance", task_id=parent_task_id, details={ "description": assistance_description, "child_task_id": child_task_id, }, ) conn.commit() # Update in-memory caches (g.tasks) # Parent task if parent_task_id in g.tasks: g.tasks[parent_task_id]["child_tasks"] = parent_child_tasks_list g.tasks[parent_task_id]["notes"] = parent_notes_list g.tasks[parent_task_id]["updated_at"] = timestamp_iso # New child task child_task_mem_data = child_task_db_data.copy() child_task_mem_data["depends_on_tasks"] = [] # from json.dumps([]) child_task_mem_data["child_tasks"] = [] child_task_mem_data["notes"] = [] g.tasks[child_task_id] = child_task_mem_data # Send direct message to admin via new communication system try: admin_message = ( f"🚨 Assistance Request from {requesting_agent_id}\n\n" f"Task: {parent_task_id} - {parent_task_current_data.get('title', 'Untitled Task')}\n" f"Description: {assistance_description}\n\n" f"Child assistance task created: {child_task_id}\n" f"Notification ID: {notification_id}" ) # Send message to admin using the new communication system message_result = await send_agent_message_tool_impl( { "token": agent_auth_token, "recipient_id": "admin", "message": admin_message, "message_type": "assistance_request", "priority": "high", "deliver_method": "both", } ) logger.info(f"Assistance request message sent to admin: {message_result}") except Exception as e_msg: logger.error( f"Failed to send assistance request message to admin: {e_msg}", exc_info=True, ) # Don't fail the entire operation if messaging fails # Original code also wrote parent and child task JSON files (main.py:1766-1771) # This was part of an older file-based task system. We are now DB-centric. # For 1-to-1, if those files are still used by something, they'd need to be written. # However, the primary task store is now the DB. # We will skip writing these individual task JSON files as they are redundant with the DB. # If get_task_file_path was used elsewhere, that system needs re-evaluation. log_audit( requesting_agent_id, "request_assistance", { "parent_task_id": parent_task_id, "child_task_id": child_task_id, "description": assistance_description, }, ) logger.info( f"Agent '{requesting_agent_id}' requested assistance for task '{parent_task_id}'. Child task '{child_task_id}' created." ) return [ mcp_types.TextContent( type="text", text=f"Assistance requested for task {parent_task_id}. Child assistance task {child_task_id} created. Admin notified via file notification and direct message.", ) ] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error( f"Database error requesting assistance for task {parent_task_id}: {e_sql}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Database error requesting assistance: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error( f"Unexpected error requesting assistance for task {parent_task_id}: {e}", exc_info=True, ) return [ mcp_types.TextContent( type="text", text=f"Unexpected error requesting assistance: {e}" ) ] finally: if conn: conn.close() # --- bulk_task_operations tool --- async def bulk_task_operations_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: agent_auth_token = arguments.get("token") operations = arguments.get("operations", []) # List of operation objects requesting_agent_id = get_agent_id(agent_auth_token) if not requesting_agent_id: return [ mcp_types.TextContent( type="text", text="Unauthorized: Valid token required" ) ] if not operations or not isinstance(operations, list): return [ mcp_types.TextContent( type="text", text="Error: operations list is required and must be a non-empty array", ) ] is_admin_request = verify_token(agent_auth_token, "admin") # Process operations in a single transaction conn = None try: conn = get_db_connection() cursor = conn.cursor() results = [] updated_at_iso = datetime.datetime.now().isoformat() for i, op in enumerate(operations): if not isinstance(op, dict): results.append( f"Operation {i+1}: Invalid operation format (must be object)" ) continue operation_type = op.get("type") task_id = op.get("task_id") if not task_id or not operation_type: results.append( f"Operation {i+1}: Missing required fields 'type' and 'task_id'" ) continue # Verify task exists and permissions cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)) task_row = cursor.fetchone() if not task_row: results.append(f"Operation {i+1}: Task '{task_id}' not found") continue task_data = dict(task_row) # Permission check if ( task_data.get("assigned_to") != requesting_agent_id and not is_admin_request ): results.append( f"Operation {i+1}: Unauthorized - can only modify own tasks" ) continue try: if operation_type == "update_status": new_status = op.get("status") notes_content = op.get("notes") if not new_status: results.append( f"Operation {i+1}: Missing 'status' for update_status operation" ) continue valid_statuses = [ "pending", "in_progress", "completed", "cancelled", "failed", ] if new_status not in valid_statuses: results.append( f"Operation {i+1}: Invalid status '{new_status}'" ) continue # Update status update_fields = ["status = ?", "updated_at = ?"] update_params = [new_status, updated_at_iso] # Handle notes current_notes = json.loads(task_data.get("notes") or "[]") if notes_content: current_notes.append( { "timestamp": updated_at_iso, "author": requesting_agent_id, "content": notes_content, } ) update_fields.append("notes = ?") update_params.append(json.dumps(current_notes)) update_params.append(task_id) # Validate field assignments for security allowed_bulk_fields = ["status = ?", "updated_at = ?", "notes = ?"] safe_fields = [ field for field in update_fields if field in allowed_bulk_fields ] if safe_fields: set_clause = ", ".join(safe_fields) bulk_update_sql = ( f"UPDATE tasks SET {set_clause} WHERE task_id = ?" ) cursor.execute(bulk_update_sql, tuple(update_params)) # Update in-memory cache if task_id in g.tasks: g.tasks[task_id]["status"] = new_status g.tasks[task_id]["updated_at"] = updated_at_iso g.tasks[task_id]["notes"] = current_notes results.append( f"Operation {i+1}: Task '{task_id}' status updated to '{new_status}'" ) elif operation_type == "update_priority": new_priority = op.get("priority") if not new_priority or new_priority not in [ "low", "medium", "high", ]: results.append( f"Operation {i+1}: Invalid priority '{new_priority}'" ) continue cursor.execute( "UPDATE tasks SET priority = ?, updated_at = ? WHERE task_id = ?", (new_priority, updated_at_iso, task_id), ) if task_id in g.tasks: g.tasks[task_id]["priority"] = new_priority g.tasks[task_id]["updated_at"] = updated_at_iso results.append( f"Operation {i+1}: Task '{task_id}' priority updated to '{new_priority}'" ) elif operation_type == "add_note": note_content = op.get("content") if not note_content: results.append( f"Operation {i+1}: Missing 'content' for add_note operation" ) continue current_notes = json.loads(task_data.get("notes") or "[]") current_notes.append( { "timestamp": updated_at_iso, "author": requesting_agent_id, "content": note_content, } ) cursor.execute( "UPDATE tasks SET notes = ?, updated_at = ? WHERE task_id = ?", (json.dumps(current_notes), updated_at_iso, task_id), ) if task_id in g.tasks: g.tasks[task_id]["notes"] = current_notes g.tasks[task_id]["updated_at"] = updated_at_iso results.append(f"Operation {i+1}: Note added to task '{task_id}'") elif operation_type == "reassign" and is_admin_request: new_assigned_to = op.get("assigned_to") if not new_assigned_to: results.append( f"Operation {i+1}: Missing 'assigned_to' for reassign operation" ) continue cursor.execute( "UPDATE tasks SET assigned_to = ?, updated_at = ? WHERE task_id = ?", (new_assigned_to, updated_at_iso, task_id), ) if task_id in g.tasks: g.tasks[task_id]["assigned_to"] = new_assigned_to g.tasks[task_id]["updated_at"] = updated_at_iso results.append( f"Operation {i+1}: Task '{task_id}' reassigned to '{new_assigned_to}'" ) else: if operation_type == "reassign" and not is_admin_request: results.append( f"Operation {i+1}: Reassign operation requires admin privileges" ) else: results.append( f"Operation {i+1}: Unknown operation type '{operation_type}'" ) except Exception as e: results.append(f"Operation {i+1}: Error processing - {str(e)}") logger.error(f"Error in bulk operation {i+1}: {e}", exc_info=True) # Log the bulk operation log_agent_action_to_db( cursor, requesting_agent_id, "bulk_task_operations", details={ "operations_count": len(operations), "success_count": len([r for r in results if "Error" not in r]), }, ) conn.commit() response_text = ( f"Bulk Task Operations Results ({len(operations)} operations):\n\n" + "\n".join(results) ) log_audit( requesting_agent_id, "bulk_task_operations", {"operations_count": len(operations)}, ) return [mcp_types.TextContent(type="text", text=response_text)] except sqlite3.Error as e_sql: if conn: conn.rollback() logger.error(f"Database error in bulk task operations: {e_sql}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Database error in bulk operations: {e_sql}" ) ] except Exception as e: if conn: conn.rollback() logger.error(f"Unexpected error in bulk task operations: {e}", exc_info=True) return [ mcp_types.TextContent( type="text", text=f"Unexpected error in bulk operations: {e}" ) ] finally: if conn: conn.close() # --- search_tasks tool --- async def search_tasks_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: agent_auth_token = arguments.get("token") search_query = arguments.get("search_query") status_filter = arguments.get("status_filter") max_results = arguments.get("max_results", 20) include_notes = arguments.get("include_notes", True) requesting_agent_id = get_agent_id(agent_auth_token) if not requesting_agent_id: return [ mcp_types.TextContent( type="text", text="Unauthorized: Valid token required" ) ] if not search_query or not search_query.strip(): return [ mcp_types.TextContent( type="text", text="Error: search_query is required and cannot be empty." ) ] is_admin_request = verify_token(agent_auth_token, "admin") # Prepare search terms search_terms = [ term.strip().lower() for term in search_query.split() if len(term.strip()) > 2 ] if not search_terms: return [ mcp_types.TextContent( type="text", text="Error: Search query must contain terms longer than 2 characters.", ) ] # Get tasks user can see candidate_tasks = [] for task_id, task_data in g.tasks.items(): # Permission check if not is_admin_request and task_data.get("assigned_to") != requesting_agent_id: continue # Status filter if status_filter and task_data.get("status") != status_filter: continue candidate_tasks.append(task_data) if not candidate_tasks: return [ mcp_types.TextContent( type="text", text="No tasks found matching the criteria." ) ] # Score tasks by relevance scored_results = [] for task in candidate_tasks: score = 0.0 matched_fields = [] # Search in title (highest weight) title = (task.get("title") or "").lower() title_matches = sum(1 for term in search_terms if term in title) if title_matches > 0: score += title_matches * 3.0 matched_fields.append(f"title ({title_matches} terms)") # Search in description (medium weight) description = (task.get("description") or "").lower() desc_matches = sum(1 for term in search_terms if term in description) if desc_matches > 0: score += desc_matches * 2.0 matched_fields.append(f"description ({desc_matches} terms)") # Search in notes (lower weight) if include_notes: notes = task.get("notes", []) if isinstance(notes, str): try: notes = json.loads(notes) except: notes = [] notes_content = " ".join( [note.get("content", "") for note in notes if isinstance(note, dict)] ).lower() notes_matches = sum(1 for term in search_terms if term in notes_content) if notes_matches > 0: score += notes_matches * 1.0 matched_fields.append(f"notes ({notes_matches} terms)") # Exact phrase bonus full_text = f"{title} {description}".lower() if search_query.lower() in full_text: score += 2.0 matched_fields.append("exact phrase") if score > 0: scored_results.append((task, score, matched_fields)) if not scored_results: return [ mcp_types.TextContent( type="text", text=f"No tasks found containing '{search_query}'." ) ] # Sort by relevance (score descending, then by updated_at descending) scored_results.sort(key=lambda x: (x[1], x[0].get("updated_at", "")), reverse=True) # Limit results scored_results = scored_results[:max_results] # Format response with token awareness response_parts = [ f"Search Results for '{search_query}' ({len(scored_results)} found):\n" ] current_tokens = len("\n".join(response_parts)) // 4 # Simple token estimation for i, (task, score, matched_fields) in enumerate(scored_results): if current_tokens >= 20000: # Leave room for truncation message remaining = len(scored_results) - i response_parts.append( f"\n⚠️ Response truncated - {remaining} more results available" ) response_parts.append( "Use max_results parameter or refine search to see more" ) break # Format task result task_text = f"\n{i+1}. **{task.get('title', 'Untitled')}** (ID: {task.get('task_id', 'N/A')})" task_text += f"\n Status: {task.get('status', 'N/A')} | Priority: {task.get('priority', 'medium')} | Assigned: {task.get('assigned_to', 'None')}" task_text += ( f"\n Relevance Score: {score:.1f} | Matched: {', '.join(matched_fields)}" ) # Add truncated description desc = task.get("description", "No description") if len(desc) > 200: desc = desc[:200] + "..." task_text += f"\n Description: {desc}" # Check token limit with safety buffer task_tokens = estimate_tokens(task_text) safety_buffer = 1000 if current_tokens + task_tokens <= (20000 - safety_buffer): response_parts.append(task_text) current_tokens += task_tokens else: remaining = len(scored_results) - i response_parts.append( f"\n⚠️ Response truncated - {remaining} more results available" ) break # Add usage tips response_parts.append(f"\n\n💡 Tips:") response_parts.append("• Use view_tasks(task_id='ID') for full task details") response_parts.append("• Add status_filter to narrow results") response_parts.append("• Use max_results to control response size") log_audit( requesting_agent_id, "search_tasks", {"query": search_query, "results": len(scored_results)}, ) return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] # --- Register all task tools --- def register_task_tools(): register_tool( name="assign_task", description="Multi-mode task assignment tool. Mode 1: Create single task + assign agent. Mode 2: Create multiple tasks + assign agent. Mode 3: Assign agent to existing unassigned tasks. Includes workload analysis, intelligent parent suggestions, and coordination features.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token", }, "agent_token": { "type": "string", "description": "Agent token to assign the task(s) to (optional - if not provided, creates unassigned tasks)", }, # Mode 1: Single task creation (existing behavior) "task_title": { "type": "string", "description": "Title of the task (Mode 1: single task creation)", }, "task_description": { "type": "string", "description": "Detailed description of the task (Mode 1: single task creation)", }, "priority": { "type": "string", "description": "Task priority (low, medium, high) - for single task mode", "enum": ["low", "medium", "high"], "default": "medium", }, "depends_on_tasks": { "type": "array", "description": "List of task IDs this task depends on (Mode 1 only)", "items": {"type": "string"}, }, "parent_task_id": { "type": "string", "description": "ID of the parent task (Mode 1 only)", }, # Mode 2: Multiple task creation "tasks": { "type": "array", "description": "Array of tasks to create and assign (Mode 2: multiple task creation)", "items": { "type": "object", "properties": { "title": {"type": "string", "description": "Task title"}, "description": { "type": "string", "description": "Task description", }, "priority": { "type": "string", "description": "Task priority", "enum": ["low", "medium", "high"], "default": "medium", }, "parent_task_id": { "type": "string", "description": "Parent task ID for this task", }, }, "required": ["title", "description"], "additionalProperties": False, }, }, # Mode 3: Existing task assignment "task_ids": { "type": "array", "description": "Array of existing task IDs to assign to agent (Mode 3: existing task assignment)", "items": {"type": "string"}, }, # Smart coordination features (apply to all modes) "auto_suggest_parent": { "type": "boolean", "description": "Use AI to suggest optimal parent task based on content similarity (default: true)", "default": True, }, "validate_agent_workload": { "type": "boolean", "description": "Analyze agent capacity and provide workload warnings (default: true)", "default": True, }, "auto_schedule": { "type": "boolean", "description": "Auto-schedule task based on dependencies and agent availability (default: false)", "default": False, }, "coordination_notes": { "type": "string", "description": "Optional coordination context for team awareness and handoffs", }, "estimated_hours": { "type": "number", "description": "Optional workload estimation in hours for capacity planning", }, # RAG validation options "override_rag": { "type": "boolean", "description": "Override RAG validation suggestions (optional, defaults to false - accepts suggestions)", "default": False, }, "override_reason": { "type": "string", "description": "Reason for overriding RAG validation (required if override_rag is true)", }, }, "required": ["token"], "additionalProperties": False, }, implementation=assign_task_tool_impl, ) register_tool( name="create_self_task", # main.py:1726 description="Agent tool to create a task for themselves. IMPORTANT: parent_task_id is REQUIRED - agents cannot create root tasks.", input_schema={ # From main.py:1727-1750 "type": "object", "properties": { "token": { "type": "string", "description": "Agent authentication token", }, "task_title": {"type": "string", "description": "Title of the task"}, "task_description": { "type": "string", "description": "Detailed description of the task", }, "priority": { "type": "string", "description": "Task priority (low, medium, high)", "enum": ["low", "medium", "high"], "default": "medium", }, "depends_on_tasks": { "type": "array", "description": "List of task IDs this task depends on (optional)", "items": {"type": "string"}, }, "parent_task_id": { "type": "string", "description": "ID of the parent task (defaults to agent's current task if not specified, but MUST have a parent)", }, }, "required": ["token", "task_title", "task_description"], "additionalProperties": False, }, implementation=create_self_task_tool_impl, ) register_tool( name="update_task_status", description="Smart task status update tool with bulk operations, dependency management, and cascade features. Supports single task or bulk updates with intelligent automation.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Authentication token (agent or admin)", }, "task_id": { "type": "string", "description": "ID of the task to update (for single task operations)", }, "task_ids": { "type": "array", "description": "List of task IDs for bulk operations (alternative to task_id)", "items": {"type": "string"}, }, "status": { "type": "string", "description": "New status for the task(s)", "enum": [ "pending", "in_progress", "completed", "cancelled", "failed", ], }, "notes": { "type": "string", "description": "Optional notes about the status update to be appended.", }, # Admin Only Optional Fields "title": { "type": "string", "description": "(Admin Only) New title for the task", }, "description": { "type": "string", "description": "(Admin Only) New description for the task", }, "priority": { "type": "string", "description": "(Admin Only) New priority", "enum": ["low", "medium", "high"], }, "assigned_to": { "type": "string", "description": "(Admin Only) New agent ID to assign the task to", }, "depends_on_tasks": { "type": "array", "description": "(Admin Only) New list of task IDs this task depends on", "items": {"type": "string"}, }, # Smart Features "auto_update_dependencies": { "type": "boolean", "description": "Automatically advance dependent tasks when their dependencies are completed (default: true)", "default": True, }, "cascade_to_children": { "type": "boolean", "description": "Cascade status changes to child tasks (only for failed/cancelled states, default: false)", "default": False, }, "validate_dependencies": { "type": "boolean", "description": "Validate dependency constraints before updating (default: true)", "default": True, }, }, "required": ["token", "status"], "additionalProperties": False, }, implementation=update_task_status_tool_impl, ) register_tool( name="view_tasks", description="Smart task viewer with dependency analysis, health metrics, and advanced filtering. Provides comprehensive task insights with intelligent pagination.", input_schema={ "type": "object", "properties": { "token": {"type": "string", "description": "Authentication token"}, "agent_id": { "type": "string", "description": "Filter tasks by agent ID (optional). If non-admin, can only be self.", }, "status": { "type": "string", "description": "Filter tasks by status (optional)", "enum": [ "pending", "in_progress", "completed", "cancelled", "failed", ], }, "max_tokens": { "type": "integer", "description": "Maximum response tokens (default: 25000)", "minimum": 1000, "maximum": 25000, }, "start_after": { "type": "string", "description": "Task ID to start after (for pagination)", }, "summary_mode": { "type": "boolean", "description": "If true, show only summary info to fit more tasks (default: false)", }, # Smart filtering options "filter_priority": { "type": "string", "description": "Filter by priority level", "enum": ["low", "medium", "high"], }, "filter_parent_task": { "type": "string", "description": "Filter by parent task ID", }, "show_blocked_tasks": { "type": "boolean", "description": "Show only blocked/waiting tasks (default: false)", }, # Analysis and insights "show_dependencies": { "type": "boolean", "description": "Include dependency chain analysis for each task (default: false)", }, "show_health_analysis": { "type": "boolean", "description": "Include overall task health metrics and analysis (default: false)", }, # Sorting options "sort_by": { "type": "string", "description": "Sort tasks by specified field (default: created_at)", "enum": ["created_at", "updated_at", "priority", "status"], "default": "created_at", }, }, "required": ["token"], "additionalProperties": False, }, implementation=view_tasks_tool_impl, ) register_tool( name="search_tasks", description="Full-text search across task titles, descriptions, and notes. Critical for finding related work and avoiding duplication.", input_schema={ "type": "object", "properties": { "token": {"type": "string", "description": "Authentication token"}, "search_query": { "type": "string", "description": "Search terms to find in tasks", }, "status_filter": { "type": "string", "description": "Optional status filter", "enum": [ "pending", "in_progress", "completed", "cancelled", "failed", ], }, "max_results": { "type": "integer", "description": "Maximum results to return (default: 20)", "minimum": 1, "maximum": 100, }, "include_notes": { "type": "boolean", "description": "Include notes content in search (default: true)", }, }, "required": ["token", "search_query"], "additionalProperties": False, }, implementation=search_tasks_tool_impl, ) register_tool( name="request_assistance", # main.py:1808 description="Request assistance with a task. This creates a child task assigned to 'None' and notifies admin.", input_schema={ # From main.py:1809-1823 "type": "object", "properties": { "token": { "type": "string", "description": "Agent authentication token", }, "task_id": { "type": "string", "description": "ID of the task for which assistance is needed (parent task).", }, "description": { "type": "string", "description": "Description of the assistance required.", }, }, "required": ["token", "task_id", "description"], "additionalProperties": False, }, implementation=request_assistance_tool_impl, ) register_tool( name="bulk_task_operations", description="Perform multiple task operations in a single atomic transaction. Supports update_status, update_priority, add_note, and reassign (admin only) operations. Critical for efficient batch task management.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Authentication token (agent or admin)", }, "operations": { "type": "array", "description": "List of operations to perform", "items": { "type": "object", "properties": { "type": { "type": "string", "description": "Operation type", "enum": [ "update_status", "update_priority", "add_note", "reassign", ], }, "task_id": { "type": "string", "description": "Task ID to operate on", }, "status": { "type": "string", "description": "New status for update_status operation", "enum": [ "pending", "in_progress", "completed", "cancelled", "failed", ], }, "priority": { "type": "string", "description": "New priority for update_priority operation", "enum": ["low", "medium", "high"], }, "content": { "type": "string", "description": "Note content for add_note operation", }, "notes": { "type": "string", "description": "Notes for update_status operation", }, "assigned_to": { "type": "string", "description": "New assignee for reassign operation (admin only)", }, }, "required": ["type", "task_id"], "additionalProperties": False, }, "minItems": 1, }, }, "required": ["token", "operations"], "additionalProperties": False, }, implementation=bulk_task_operations_tool_impl, ) register_tool( name="delete_task", description="Delete a task permanently with cascade handling for related tasks. Admin-only operation with comprehensive safety checks.", input_schema={ "type": "object", "properties": { "token": { "type": "string", "description": "Admin authentication token", }, "task_id": { "type": "string", "description": "ID of the task to delete", }, "force_delete": { "type": "boolean", "description": "Force deletion even if task has children or dependencies (default: false)", "default": False, }, }, "required": ["token", "task_id"], "additionalProperties": False, }, implementation=delete_task_tool_impl, ) async def delete_task_tool_impl( arguments: Dict[str, Any], ) -> List[mcp_types.TextContent]: """ Delete a task permanently with cascade handling for related tasks. Admin-only operation with comprehensive safety checks. """ admin_token = arguments.get("token") task_id = arguments.get("task_id") force_delete = arguments.get("force_delete", False) # Verify admin permissions if not verify_token(admin_token, "admin"): return [ mcp_types.TextContent( type="text", text="Unauthorized: Admin token required" ) ] if not task_id: return [mcp_types.TextContent(type="text", text="Error: task_id is required")] conn = None try: conn = get_db_connection() cursor = conn.cursor() # Check if task exists cursor.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)) task_row = cursor.fetchone() if not task_row: return [ mcp_types.TextContent( type="text", text=f"Error: Task '{task_id}' not found" ) ] task_data = dict(task_row) # Parse relationships child_tasks = json.loads(task_data.get("child_tasks", "[]")) depends_on_tasks = json.loads(task_data.get("depends_on_tasks", "[]")) # Check for child tasks if child_tasks and not force_delete: return [ mcp_types.TextContent( type="text", text=f"Error: Task '{task_id}' has {len(child_tasks)} child tasks: {child_tasks}. Use force_delete=true to cascade delete.", ) ] # Check for tasks that depend on this one cursor.execute( "SELECT task_id, title FROM tasks WHERE json_extract(depends_on_tasks, '$') LIKE ?", (f'%"{task_id}"%',), ) dependent_tasks = cursor.fetchall() if dependent_tasks and not force_delete: dependent_list = [ f"{row['task_id']} ({row['title']})" for row in dependent_tasks ] return [ mcp_types.TextContent( type="text", text=f"Error: {len(dependent_tasks)} tasks depend on '{task_id}': {dependent_list}. Use force_delete=true to cascade delete.", ) ] # Begin cascade deletion operations cascade_operations = [] # Update parent task to remove this child if task_data.get("parent_task"): parent_id = task_data["parent_task"] cursor.execute( "SELECT child_tasks FROM tasks WHERE task_id = ?", (parent_id,) ) parent_row = cursor.fetchone() if parent_row: parent_children = json.loads(parent_row["child_tasks"] or "[]") if task_id in parent_children: parent_children.remove(task_id) cursor.execute( "UPDATE tasks SET child_tasks = ?, updated_at = ? WHERE task_id = ?", ( json.dumps(parent_children), datetime.datetime.now().isoformat(), parent_id, ), ) cascade_operations.append( f"Updated parent task '{parent_id}' to remove child reference" ) # Handle child tasks if child_tasks and force_delete: for child_id in child_tasks: cursor.execute("DELETE FROM tasks WHERE task_id = ?", (child_id,)) if cursor.rowcount > 0: cascade_operations.append(f"Deleted child task '{child_id}'") # Handle dependent tasks if dependent_tasks and force_delete: for dep_row in dependent_tasks: dep_id = dep_row["task_id"] cursor.execute( "SELECT depends_on_tasks FROM tasks WHERE task_id = ?", (dep_id,) ) dep_task_row = cursor.fetchone() if dep_task_row: dep_dependencies = json.loads( dep_task_row["depends_on_tasks"] or "[]" ) if task_id in dep_dependencies: dep_dependencies.remove(task_id) cursor.execute( "UPDATE tasks SET depends_on_tasks = ?, updated_at = ? WHERE task_id = ?", ( json.dumps(dep_dependencies), datetime.datetime.now().isoformat(), dep_id, ), ) cascade_operations.append( f"Updated task '{dep_id}' to remove dependency on '{task_id}'" ) # Delete the main task cursor.execute("DELETE FROM tasks WHERE task_id = ?", (task_id,)) if cursor.rowcount == 0: return [ mcp_types.TextContent( type="text", text=f"Error: Failed to delete task '{task_id}'" ) ] # Log the deletion action log_agent_action_to_db( cursor=cursor, agent_id="admin", action_type="deleted_task", task_id=task_id, details={ "task_title": task_data.get("title"), "force_delete": force_delete, "cascade_operations": cascade_operations, }, ) conn.commit() # Prepare response response_parts = [ f"Task '{task_id}' ({task_data.get('title', 'Untitled')}) deleted successfully." ] if cascade_operations: response_parts.append("\nCascade Operations:") for op in cascade_operations: response_parts.append(f" • {op}") response_parts.append( f"\nDeletion completed at: {datetime.datetime.now().isoformat()}" ) return [mcp_types.TextContent(type="text", text="\n".join(response_parts))] except Exception as e: if conn: conn.rollback() logger.error(f"Error in delete_task_tool_impl: {e}", exc_info=True) return [ mcp_types.TextContent(type="text", text=f"Error deleting task: {str(e)}") ] finally: if conn: conn.close() # Call registration when this module is imported register_task_tools()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/rinadelph/Agent-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server