#!/usr/bin/env python3
"""
AAP Controller Workflow & Automation Management Tool
"""
from typing import Any, Dict, Optional, Union, List
from fastmcp import FastMCP
from pydantic import Field
from connectors.aap_connector import get_aap_connector
def register_workflow_tools(mcp: FastMCP):
"""Register workflow management tools with the MCP server"""
@mcp.tool()
def workflow_automation_management(
action: str = Field(description="Action: list_workflows, create_workflow, launch_workflow, list_workflow_nodes, create_workflow_node, link_workflow_nodes, unlink_workflow_nodes, delete_workflow_node, create_sequential_workflow, create_complete_workflow, list_schedules, create_schedule, update_schedule, list_notifications"),
workflow_id: Optional[Union[int, float]] = Field(None, description="Workflow job template ID"),
workflow_job_id: Optional[Union[int, float]] = Field(None, description="Workflow job ID"),
node_id: Optional[Union[int, float]] = Field(None, description="Source workflow job template node ID"),
target_node_id: Optional[Union[int, float]] = Field(None, description="Target workflow job template node ID to link"),
link_type: Optional[str] = Field(None, description="Link type: success, failure, always"),
schedule_id: Optional[Union[int, float]] = Field(None, description="Schedule ID"),
notification_id: Optional[Union[int, float]] = Field(None, description="Notification template ID"),
workflow_data: Optional[Dict[str, Any]] = Field(None, description="Workflow data"),
node_data: Optional[Dict[str, Any]] = Field(None, description="Workflow node data"),
job_template_ids: Optional[List[int]] = Field(None, description="List of job template IDs for sequential workflow"),
schedule_data: Optional[Dict[str, Any]] = Field(None, description="Schedule data"),
notification_data: Optional[Dict[str, Any]] = Field(None, description="Notification data"),
survey_data: Optional[Dict[str, Any]] = Field(None, description="Survey specification data for complete workflow creation"),
filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing")
) -> Dict[str, Any]:
"""
Workflow and automation management tool.
Handles workflow templates, nodes, schedules, and notifications.
"""
try:
# Workflow Operations
if action == "list_workflows":
params = filters or {}
return get_aap_connector().get("workflow_job_templates/", params)
elif action == "create_workflow":
if not workflow_data:
return {"error": "workflow_data is required"}
return get_aap_connector().post("workflow_job_templates/", workflow_data)
elif action == "launch_workflow":
if not workflow_id:
return {"error": "workflow_id is required"}
data = workflow_data or {}
return get_aap_connector().post(f"workflow_job_templates/{workflow_id}/launch/", data)
# Workflow Node Operations
elif action == "list_workflow_nodes":
if not workflow_id:
return {"error": "workflow_id is required"}
params = filters or {}
return get_aap_connector().get(f"workflow_job_templates/{workflow_id}/workflow_nodes/", params)
elif action == "create_workflow_node":
if not workflow_id or not node_data:
return {"error": "workflow_id and node_data are required"}
return get_aap_connector().post(f"workflow_job_templates/{workflow_id}/workflow_nodes/", node_data)
elif action == "delete_workflow_node":
if not node_id:
return {"error": "node_id is required"}
return get_aap_connector().delete(f"workflow_job_template_nodes/{node_id}/")
# Workflow Node Linking Operations
elif action == "link_workflow_nodes":
if not node_id or not target_node_id or not link_type:
return {"error": "node_id, target_node_id, and link_type are required"}
# Validate link_type
if link_type not in ["success", "failure", "always"]:
return {"error": "link_type must be one of: success, failure, always"}
# Map to the correct endpoint
endpoint_map = {
"success": f"workflow_job_template_nodes/{node_id}/success_nodes/",
"failure": f"workflow_job_template_nodes/{node_id}/failure_nodes/",
"always": f"workflow_job_template_nodes/{node_id}/always_nodes/"
}
endpoint = endpoint_map[link_type]
# Try multiple methods to ensure reliable linking
try:
# Method 1: Simple ID
link_data = {"id": target_node_id}
result = get_aap_connector().post(endpoint, link_data)
if "error" not in result:
return result
except Exception:
pass
try:
# Method 2: With associate flag
link_data = {"id": target_node_id, "associate": True}
result = get_aap_connector().post(endpoint, link_data)
if "error" not in result:
return result
except Exception:
pass
try:
# Method 3: Direct association endpoint
result = get_aap_connector().post(f"{endpoint}{target_node_id}/")
if "error" not in result:
return result
except Exception:
pass
return {"error": f"Failed to link nodes {node_id} -> {target_node_id} with all methods"}
elif action == "unlink_workflow_nodes":
if not node_id or not target_node_id or not link_type:
return {"error": "node_id, target_node_id, and link_type are required"}
# Validate link_type
if link_type not in ["success", "failure", "always"]:
return {"error": "link_type must be one of: success, failure, always"}
# Map to the correct endpoint for unlinking (DELETE to association)
endpoint_map = {
"success": f"workflow_job_template_nodes/{node_id}/success_nodes/",
"failure": f"workflow_job_template_nodes/{node_id}/failure_nodes/",
"always": f"workflow_job_template_nodes/{node_id}/always_nodes/"
}
endpoint = endpoint_map[link_type]
try:
# Method 1: POST with disassociate
unlink_data = {"id": target_node_id, "disassociate": True}
result = get_aap_connector().post(endpoint, unlink_data)
if "error" not in result:
return result
except Exception:
pass
try:
# Method 2: DELETE to specific endpoint
result = get_aap_connector().delete(f"{endpoint}{target_node_id}/")
return result
except Exception as e:
return {"error": f"Failed to unlink nodes: {str(e)}"}
# High-level Workflow Creation
elif action == "create_sequential_workflow":
if not workflow_id or not job_template_ids:
return {"error": "workflow_id and job_template_ids are required"}
if len(job_template_ids) < 2:
return {"error": "At least 2 job templates required for sequential workflow"}
results = {
"workflow_id": workflow_id,
"created_nodes": [],
"created_links": [],
"errors": []
}
# Create nodes for each job template
node_ids = []
for i, job_template_id in enumerate(job_template_ids):
try:
node_data_seq = {
"unified_job_template": job_template_id,
"identifier": f"node_{i+1}_{job_template_id}"
}
node_result = get_aap_connector().post(f"workflow_job_templates/{workflow_id}/workflow_nodes/", node_data_seq)
if "id" in node_result:
node_ids.append(node_result["id"])
results["created_nodes"].append({
"node_id": node_result["id"],
"job_template_id": job_template_id,
"position": i + 1
})
else:
results["errors"].append(f"Failed to create node for job template {job_template_id}: {node_result}")
except Exception as e:
results["errors"].append(f"Error creating node for job template {job_template_id}: {str(e)}")
# Link nodes in sequence (success chain) using multiple attempts
for i in range(len(node_ids) - 1):
source_node_id = node_ids[i]
target_node_id = node_ids[i + 1]
linked = False
# Try multiple linking methods
for method_name, link_data in [
("simple", {"id": target_node_id}),
("associate", {"id": target_node_id, "associate": True})
]:
try:
link_result = get_aap_connector().post(f"workflow_job_template_nodes/{source_node_id}/success_nodes/", link_data)
if "error" not in str(link_result).lower():
results["created_links"].append({
"source_node": source_node_id,
"target_node": target_node_id,
"link_type": "success",
"method": method_name,
"result": link_result
})
linked = True
break
except Exception as e:
continue
if not linked:
results["errors"].append(f"Failed to link node {source_node_id} to {target_node_id} with all methods")
return results
# Complete Workflow Creation (All-in-One)
elif action == "create_complete_workflow":
"""Create a complete workflow with nodes, links, and survey in one operation"""
if not workflow_data:
return {"error": "workflow_data is required"}
if not job_template_ids or len(job_template_ids) < 1:
return {"error": "At least 1 job template ID required"}
results = {
"workflow_created": False,
"workflow_id": None,
"nodes_created": [],
"links_created": [],
"survey_created": False,
"errors": []
}
try:
# Step 1: Create the workflow template
workflow_result = get_aap_connector().post("workflow_job_templates/", workflow_data)
if "id" not in workflow_result:
return {"error": f"Failed to create workflow template: {workflow_result}"}
workflow_id = workflow_result["id"]
results["workflow_created"] = True
results["workflow_id"] = workflow_id
# Step 2: Create nodes for each job template
node_ids = []
for i, job_template_id in enumerate(job_template_ids):
try:
node_data_complete = {
"unified_job_template": job_template_id,
"identifier": f"node_{i+1}_{job_template_id}"
}
node_result = get_aap_connector().post(f"workflow_job_templates/{workflow_id}/workflow_nodes/", node_data_complete)
if "id" in node_result:
node_ids.append(node_result["id"])
results["nodes_created"].append({
"node_id": node_result["id"],
"job_template_id": job_template_id,
"position": i + 1,
"identifier": f"node_{i+1}_{job_template_id}"
})
else:
results["errors"].append(f"Failed to create node for job template {job_template_id}: {node_result}")
except Exception as e:
results["errors"].append(f"Error creating node for job template {job_template_id}: {str(e)}")
# Step 3: Link nodes in sequence if more than one node
if len(node_ids) > 1:
for i in range(len(node_ids) - 1):
source_node_id = node_ids[i]
target_node_id = node_ids[i + 1]
linked = False
# Try multiple linking methods
for method_name, link_data in [
("simple", {"id": target_node_id}),
("associate", {"id": target_node_id, "associate": True})
]:
try:
link_result = get_aap_connector().post(f"workflow_job_template_nodes/{source_node_id}/success_nodes/", link_data)
# Check if linking was successful
if "id" in str(link_result) or "message" in str(link_result) or link_result == {}:
results["links_created"].append({
"source_node": source_node_id,
"target_node": target_node_id,
"link_type": "success",
"method": method_name,
"result": link_result
})
linked = True
break
except Exception as e:
continue
if not linked:
results["errors"].append(f"Failed to link node {source_node_id} to {target_node_id}")
# Step 4: Create survey if provided
if survey_data:
try:
# Enable survey on the workflow template
get_aap_connector().patch(f"workflow_job_templates/{workflow_id}/", {"survey_enabled": True})
# Create the survey
survey_result = get_aap_connector().post(f"workflow_job_templates/{workflow_id}/survey_spec/", survey_data)
if "error" not in str(survey_result).lower():
results["survey_created"] = True
else:
results["errors"].append(f"Failed to create survey: {survey_result}")
except Exception as e:
results["errors"].append(f"Error creating survey: {str(e)}")
return results
except Exception as e:
results["errors"].append(f"Critical error in workflow creation: {str(e)}")
return results
# Schedule Operations
elif action == "list_schedules":
params = filters or {}
return get_aap_connector().get("schedules/", params)
elif action == "create_schedule":
if not schedule_data:
return {"error": "schedule_data is required"}
return get_aap_connector().post("schedules/", schedule_data)
elif action == "update_schedule":
if not schedule_id or not schedule_data:
return {"error": "schedule_id and schedule_data are required"}
return get_aap_connector().patch(f"schedules/{schedule_id}/", schedule_data)
# Notification Operations
elif action == "list_notifications":
params = filters or {}
return get_aap_connector().get("notification_templates/", params)
else:
return {"error": f"Unknown action: {action}"}
except Exception as e:
return {"error": f"Workflow/Automation management failed: {str(e)}"}