Skip to main content
Glama
controller_workflow_management.py (18.8 kB)
#!/usr/bin/env python3
"""
AAP Controller Workflow & Automation Management Tool
"""

from typing import Any, Dict, Optional, Union, List
from fastmcp import FastMCP
from pydantic import Field
from connectors.aap_connector import get_aap_connector

# Association kinds accepted by the workflow-node link/unlink actions.
_LINK_TYPES = ("success", "failure", "always")


def _as_id(value: Any) -> Any:
    """Return integral floats (e.g. ``5.0``) as ``int``; pass anything else through.

    The tool accepts IDs typed as ``Union[int, float]``. Interpolating ``5.0``
    into an f-string yields ``"5.0"``, which produces a malformed resource URL.
    """
    if isinstance(value, float) and value.is_integer():
        return int(value)
    return value


def _is_error(result: Any) -> bool:
    """Return True when a connector response signals a failure.

    Dict responses are judged by a top-level ``"error"`` key, the same
    convention this tool uses for its own error returns. Any other payload
    falls back to a case-insensitive text check.
    NOTE(review): assumes the connector reports failures as ``{"error": ...}``
    -- confirm against connectors.aap_connector.
    """
    if isinstance(result, dict):
        return "error" in result
    return "error" in str(result).lower()


def _link_endpoint(node_id: Any, link_type: str) -> str:
    """Build the association endpoint for ``link_type`` on node ``node_id``."""
    return f"workflow_job_template_nodes/{node_id}/{link_type}_nodes/"


def _validate_link_args(node_id: Any, target_node_id: Any,
                        link_type: Optional[str]) -> Optional[Dict[str, Any]]:
    """Return an error dict for missing/invalid link arguments, else None."""
    if not node_id or not target_node_id or not link_type:
        return {"error": "node_id, target_node_id, and link_type are required"}
    if link_type not in _LINK_TYPES:
        return {"error": "link_type must be one of: success, failure, always"}
    return None


def _link_nodes(node_id: Any, target_node_id: Any, link_type: str) -> Any:
    """Associate ``target_node_id`` with ``node_id``, trying three request shapes.

    Returns the first non-error connector response. When every attempt fails,
    returns an error dict whose ``attempts`` list records why each one failed.
    """
    endpoint = _link_endpoint(node_id, link_type)
    attempts = (
        ("simple", (endpoint, {"id": target_node_id})),
        ("associate", (endpoint, {"id": target_node_id, "associate": True})),
        ("direct", (f"{endpoint}{target_node_id}/",)),
    )
    failures: List[Dict[str, Any]] = []
    for method_name, post_args in attempts:
        try:
            result = get_aap_connector().post(*post_args)
        except Exception as exc:
            # Keep going: the next request shape may be accepted.
            failures.append({"method": method_name, "exception": str(exc)})
            continue
        if not _is_error(result):
            return result
        failures.append({"method": method_name, "response": result})
    return {
        "error": f"Failed to link nodes {node_id} -> {target_node_id} with all methods",
        "attempts": failures,
    }


def _unlink_nodes(node_id: Any, target_node_id: Any, link_type: str) -> Any:
    """Remove the association, first via POST/disassociate, then via DELETE."""
    endpoint = _link_endpoint(node_id, link_type)
    try:
        result = get_aap_connector().post(
            endpoint, {"id": target_node_id, "disassociate": True}
        )
        if not _is_error(result):
            return result
    except Exception:
        # Deliberate best effort: fall through to the DELETE fallback below.
        pass
    try:
        return get_aap_connector().delete(f"{endpoint}{target_node_id}/")
    except Exception as e:
        return {"error": f"Failed to unlink nodes: {str(e)}"}


def _create_nodes(workflow_id: Any, job_template_ids: List[int],
                  records: List[Dict[str, Any]], errors: List[str],
                  include_identifier: bool) -> List[Any]:
    """Create one workflow node per job template, in order.

    Appends a record per created node to ``records`` and a message per failure
    to ``errors`` (both mutated in place so partial progress survives).
    Returns the IDs of the nodes that were created.
    """
    node_ids: List[Any] = []
    for position, job_template_id in enumerate(job_template_ids, start=1):
        identifier = f"node_{position}_{job_template_id}"
        try:
            node_result = get_aap_connector().post(
                f"workflow_job_templates/{workflow_id}/workflow_nodes/",
                {"unified_job_template": job_template_id, "identifier": identifier},
            )
        except Exception as e:
            errors.append(
                f"Error creating node for job template {job_template_id}: {str(e)}"
            )
            continue
        if isinstance(node_result, dict) and "id" in node_result:
            node_ids.append(node_result["id"])
            record = {
                "node_id": node_result["id"],
                "job_template_id": job_template_id,
                "position": position,
            }
            if include_identifier:
                record["identifier"] = identifier
            records.append(record)
        else:
            errors.append(
                f"Failed to create node for job template {job_template_id}: {node_result}"
            )
    return node_ids


def _link_success_chain(node_ids: List[Any], links: List[Dict[str, Any]],
                        errors: List[str], failure_suffix: str) -> None:
    """Link consecutive nodes with ``success`` edges (node[i] -> node[i+1]).

    Each pair is tried with the "simple" and then the "associate" payload.
    Successful links are appended to ``links``; pairs that could not be linked
    add a message (ending in ``failure_suffix``) to ``errors``.
    """
    for source_id, target_id in zip(node_ids, node_ids[1:]):
        endpoint = f"workflow_job_template_nodes/{source_id}/success_nodes/"
        payloads = (
            ("simple", {"id": target_id}),
            ("associate", {"id": target_id, "associate": True}),
        )
        for method_name, payload in payloads:
            try:
                link_result = get_aap_connector().post(endpoint, payload)
            except Exception:
                # Try the next payload shape before giving up on this pair.
                continue
            if not _is_error(link_result):
                links.append({
                    "source_node": source_id,
                    "target_node": target_id,
                    "link_type": "success",
                    "method": method_name,
                    "result": link_result,
                })
                break
        else:
            errors.append(
                f"Failed to link node {source_id} to {target_id}{failure_suffix}"
            )


def _create_sequential_workflow(workflow_id: Any,
                                job_template_ids: List[int]) -> Dict[str, Any]:
    """Add a success-linked chain of nodes to an existing workflow template."""
    results: Dict[str, Any] = {
        "workflow_id": workflow_id,
        "created_nodes": [],
        "created_links": [],
        "errors": [],
    }
    node_ids = _create_nodes(
        workflow_id, job_template_ids,
        results["created_nodes"], results["errors"], include_identifier=False,
    )
    _link_success_chain(
        node_ids, results["created_links"], results["errors"], " with all methods"
    )
    return results


def _create_complete_workflow(workflow_data: Dict[str, Any],
                              job_template_ids: List[int],
                              survey_data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Create a workflow template, its node chain and (optionally) its survey.

    Returns a progress report; an unexpected failure is recorded in
    ``errors`` and whatever was completed so far is still returned.
    """
    results: Dict[str, Any] = {
        "workflow_created": False,
        "workflow_id": None,
        "nodes_created": [],
        "links_created": [],
        "survey_created": False,
        "errors": [],
    }
    try:
        # Step 1: create the workflow template.
        workflow_result = get_aap_connector().post("workflow_job_templates/", workflow_data)
        if not isinstance(workflow_result, dict) or "id" not in workflow_result:
            return {"error": f"Failed to create workflow template: {workflow_result}"}
        new_workflow_id = workflow_result["id"]
        results["workflow_created"] = True
        results["workflow_id"] = new_workflow_id

        # Step 2: one node per job template.
        node_ids = _create_nodes(
            new_workflow_id, job_template_ids,
            results["nodes_created"], results["errors"], include_identifier=True,
        )

        # Step 3: chain the nodes (a no-op when fewer than two were created).
        _link_success_chain(node_ids, results["links_created"], results["errors"], "")

        # Step 4: optional survey.
        if survey_data:
            try:
                # The survey is enabled on the template before the spec is posted.
                get_aap_connector().patch(
                    f"workflow_job_templates/{new_workflow_id}/", {"survey_enabled": True}
                )
                survey_result = get_aap_connector().post(
                    f"workflow_job_templates/{new_workflow_id}/survey_spec/", survey_data
                )
                if not _is_error(survey_result):
                    results["survey_created"] = True
                else:
                    results["errors"].append(f"Failed to create survey: {survey_result}")
            except Exception as e:
                results["errors"].append(f"Error creating survey: {str(e)}")

        return results
    except Exception as e:
        results["errors"].append(f"Critical error in workflow creation: {str(e)}")
        return results


def register_workflow_tools(mcp: FastMCP):
    """Register workflow management tools with the MCP server"""

    # NOTE: the tool's docstring and Field descriptions are kept verbatim;
    # they are presumably surfaced to MCP clients as the tool/parameter help.
    @mcp.tool()
    def workflow_automation_management(
        action: str = Field(description="Action: list_workflows, create_workflow, launch_workflow, list_workflow_nodes, create_workflow_node, link_workflow_nodes, unlink_workflow_nodes, delete_workflow_node, create_sequential_workflow, create_complete_workflow, list_schedules, create_schedule, update_schedule, list_notifications"),
        workflow_id: Optional[Union[int, float]] = Field(None, description="Workflow job template ID"),
        workflow_job_id: Optional[Union[int, float]] = Field(None, description="Workflow job ID"),
        node_id: Optional[Union[int, float]] = Field(None, description="Source workflow job template node ID"),
        target_node_id: Optional[Union[int, float]] = Field(None, description="Target workflow job template node ID to link"),
        link_type: Optional[str] = Field(None, description="Link type: success, failure, always"),
        schedule_id: Optional[Union[int, float]] = Field(None, description="Schedule ID"),
        notification_id: Optional[Union[int, float]] = Field(None, description="Notification template ID"),
        workflow_data: Optional[Dict[str, Any]] = Field(None, description="Workflow data"),
        node_data: Optional[Dict[str, Any]] = Field(None, description="Workflow node data"),
        job_template_ids: Optional[List[int]] = Field(None, description="List of job template IDs for sequential workflow"),
        schedule_data: Optional[Dict[str, Any]] = Field(None, description="Schedule data"),
        notification_data: Optional[Dict[str, Any]] = Field(None, description="Notification data"),
        survey_data: Optional[Dict[str, Any]] = Field(None, description="Survey specification data for complete workflow creation"),
        filters: Optional[Dict[str, Any]] = Field(None, description="Filters for listing")
    ) -> Dict[str, Any]:
        """
        Workflow and automation management tool.
        Handles workflow templates, nodes, schedules, and notifications.
        """
        try:
            # IDs may arrive as floats (5.0); normalise before building URLs.
            workflow_id = _as_id(workflow_id)
            node_id = _as_id(node_id)
            target_node_id = _as_id(target_node_id)
            schedule_id = _as_id(schedule_id)

            # Workflow Operations
            if action == "list_workflows":
                return get_aap_connector().get("workflow_job_templates/", filters or {})

            if action == "create_workflow":
                if not workflow_data:
                    return {"error": "workflow_data is required"}
                return get_aap_connector().post("workflow_job_templates/", workflow_data)

            if action == "launch_workflow":
                if not workflow_id:
                    return {"error": "workflow_id is required"}
                return get_aap_connector().post(
                    f"workflow_job_templates/{workflow_id}/launch/", workflow_data or {}
                )

            # Workflow Node Operations
            if action == "list_workflow_nodes":
                if not workflow_id:
                    return {"error": "workflow_id is required"}
                return get_aap_connector().get(
                    f"workflow_job_templates/{workflow_id}/workflow_nodes/", filters or {}
                )

            if action == "create_workflow_node":
                if not workflow_id or not node_data:
                    return {"error": "workflow_id and node_data are required"}
                return get_aap_connector().post(
                    f"workflow_job_templates/{workflow_id}/workflow_nodes/", node_data
                )

            if action == "delete_workflow_node":
                if not node_id:
                    return {"error": "node_id is required"}
                return get_aap_connector().delete(f"workflow_job_template_nodes/{node_id}/")

            # Workflow Node Linking Operations
            if action == "link_workflow_nodes":
                invalid = _validate_link_args(node_id, target_node_id, link_type)
                if invalid is not None:
                    return invalid
                return _link_nodes(node_id, target_node_id, link_type)

            if action == "unlink_workflow_nodes":
                invalid = _validate_link_args(node_id, target_node_id, link_type)
                if invalid is not None:
                    return invalid
                return _unlink_nodes(node_id, target_node_id, link_type)

            # High-level Workflow Creation
            if action == "create_sequential_workflow":
                if not workflow_id or not job_template_ids:
                    return {"error": "workflow_id and job_template_ids are required"}
                if len(job_template_ids) < 2:
                    return {"error": "At least 2 job templates required for sequential workflow"}
                return _create_sequential_workflow(workflow_id, job_template_ids)

            # Complete Workflow Creation (All-in-One): template, nodes, links
            # and survey in one operation.
            if action == "create_complete_workflow":
                if not workflow_data:
                    return {"error": "workflow_data is required"}
                if not job_template_ids:
                    return {"error": "At least 1 job template ID required"}
                return _create_complete_workflow(workflow_data, job_template_ids, survey_data)

            # Schedule Operations
            if action == "list_schedules":
                return get_aap_connector().get("schedules/", filters or {})

            if action == "create_schedule":
                if not schedule_data:
                    return {"error": "schedule_data is required"}
                return get_aap_connector().post("schedules/", schedule_data)

            if action == "update_schedule":
                if not schedule_id or not schedule_data:
                    return {"error": "schedule_id and schedule_data are required"}
                return get_aap_connector().patch(f"schedules/{schedule_id}/", schedule_data)

            # Notification Operations
            if action == "list_notifications":
                return get_aap_connector().get("notification_templates/", filters or {})

            return {"error": f"Unknown action: {action}"}

        except Exception as e:
            # Top-level boundary: the tool reports failures as an error dict
            # instead of raising to the MCP caller.
            return {"error": f"Workflow/Automation management failed: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/anshulbehl/aap-mcp-pilot'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.