terraform-cloud-mcp

by severity1 (verified publisher)
  • tools
"""Run management tools for Terraform Cloud MCP""" from typing import Optional, List, Dict, Any, Union from api.client import make_api_request from utils.validators import validate_organization # Run operation constants RUN_OPERATIONS = ["plan_only", "plan_and_apply", "save_plan", "refresh_only", "destroy", "empty_apply"] # Run status constants RUN_STATUSES = ["pending", "fetching", "fetching_completed", "pre_plan_running", "pre_plan_completed", "queuing", "plan_queued", "planning", "planned", "cost_estimating", "cost_estimated", "policy_checking", "policy_override", "policy_soft_failed", "policy_checked", "confirmed", "post_plan_running", "post_plan_completed", "planned_and_finished", "planned_and_saved", "apply_queued", "applying", "applied", "discarded", "errored", "canceled", "force_canceled"] # Run sources RUN_SOURCES = ["tfe-ui", "tfe-api", "tfe-configuration-version"] # Run status groups RUN_STATUS_GROUPS = ["non_final", "final", "discardable"] async def create_run( organization: str, workspace_name: str, message: str = "", auto_apply: Optional[bool] = None, is_destroy: bool = False, refresh: bool = True, refresh_only: bool = False, plan_only: bool = False, allow_empty_apply: bool = False, allow_config_generation: bool = False, target_addrs: List[str] = [], replace_addrs: List[str] = [], variables: List[Dict[str, str]] = [], terraform_version: str = "", save_plan: bool = False, configuration_version_id: str = "", debugging_mode: bool = False ) -> dict: """ Create a run in a workspace Args: organization: The organization name (required) workspace_name: The name of the workspace to run in (required) message: Message to include with the run (default: "Queued manually via Terraform Cloud MCP") auto_apply: Whether to auto-apply the run when planned (defaults to workspace setting) is_destroy: Whether this run should destroy all resources (default: false) refresh: Whether to refresh state before plan (default: true) refresh_only: Whether this is a refresh-only run 
(default: false) plan_only: Whether this is a speculative, plan-only run (default: false) allow_empty_apply: Whether to allow apply when there are no changes (default: false) allow_config_generation: Whether to allow generating config for imports (default: false) target_addrs: Resource addresses to target (optional) replace_addrs: Resource addresses to replace (optional) variables: Run-specific variables [{"key": "name", "value": "value"}] (optional) terraform_version: Specific Terraform version (only valid for plan-only runs) save_plan: Whether to save the plan without becoming the current run (default: false) configuration_version_id: The configuration version ID to use (defaults to workspace's latest) debugging_mode: Enable debug logging for this run (default: false) Returns: The created run details """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} # Validate mutually exclusive options if is_destroy and refresh_only: return {"error": "is_destroy and refresh_only are mutually exclusive"} # First, get the workspace ID (required for creating a run) success, workspace_data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}" ) if not success: return workspace_data # Return error from workspace lookup # Extract the workspace ID try: workspace_id = workspace_data["data"]["id"] except (KeyError, TypeError): return {"error": "Failed to get workspace ID"} # Build the request payload payload: Dict[str, Any] = { "data": { "type": "runs", "relationships": { "workspace": { "data": { "type": "workspaces", "id": workspace_id } } } } } # Add configuration version if specified if configuration_version_id: payload["data"]["relationships"]["configuration-version"] = { "data": { "type": "configuration-versions", "id": configuration_version_id } } # Add attributes payload["data"]["attributes"] = 
{} # Add message if specified (with default if not provided) if message: payload["data"]["attributes"]["message"] = message else: payload["data"]["attributes"]["message"] = "Queued manually via Terraform Cloud MCP" # Add boolean attributes if auto_apply is not None: payload["data"]["attributes"]["auto-apply"] = auto_apply if is_destroy: payload["data"]["attributes"]["is-destroy"] = True if not refresh: # Only add if different from default (true) payload["data"]["attributes"]["refresh"] = False if refresh_only: payload["data"]["attributes"]["refresh-only"] = True if plan_only: payload["data"]["attributes"]["plan-only"] = True if allow_empty_apply: payload["data"]["attributes"]["allow-empty-apply"] = True if allow_config_generation: payload["data"]["attributes"]["allow-config-generation"] = True if save_plan: payload["data"]["attributes"]["save-plan"] = True if debugging_mode: payload["data"]["attributes"]["debugging-mode"] = True # Add terraform version (only valid for plan-only runs) if terraform_version: if not plan_only: return {"error": "terraform_version can only be set for plan-only runs"} payload["data"]["attributes"]["terraform-version"] = terraform_version # Add target and replace addresses if specified if target_addrs: payload["data"]["attributes"]["target-addrs"] = target_addrs if replace_addrs: payload["data"]["attributes"]["replace-addrs"] = replace_addrs # Add run variables if specified if variables: payload["data"]["attributes"]["variables"] = variables # Make the API request success, data = await make_api_request( "runs", method="POST", data=payload ) if success: return data else: return data # Error info is already in the data dictionary async def list_runs_in_workspace( organization: str, workspace_name: str, page_number: int = 1, page_size: int = 20, filter_operation: str = "", filter_status: str = "", filter_source: str = "", filter_status_group: str = "", filter_timeframe: str = "", filter_agent_pool_names: str = "", search_user: str = "", 
search_commit: str = "", search_basic: str = "" ) -> dict: """ List runs in a workspace with comprehensive filtering and pagination Args: organization: The organization name (required) workspace_name: The workspace name to list runs for (required) page_number: Page number to fetch (default: 1) page_size: Number of results per page (default: 20) filter_operation: Filter runs by operation type, comma-separated (e.g. "plan_only,plan_and_apply") filter_status: Filter runs by status, comma-separated (e.g. "planned,applied,errored") filter_source: Filter runs by source, comma-separated (e.g. "tfe-api,tfe-ui") filter_status_group: Filter runs by status group (e.g. "non_final", "final", "discardable") filter_timeframe: Filter runs by timeframe ("year" or specific year) filter_agent_pool_names: Filter runs by agent pool names, comma-separated search_user: Search for runs by VCS username search_commit: Search for runs by commit SHA search_basic: Basic search across run ID, message, commit SHA, and username Returns: List of runs with pagination information """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} # First, get the workspace ID (required for listing runs) success, workspace_data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}" ) if not success: return workspace_data # Return error from workspace lookup # Extract the workspace ID try: workspace_id = workspace_data["data"]["id"] except (KeyError, TypeError): return {"error": "Failed to get workspace ID"} # Build query parameters params: Dict[str, Union[str, int]] = { "page[number]": page_number, "page[size]": page_size } # Add filters if specified if filter_operation: params["filter[operation]"] = filter_operation if filter_status: params["filter[status]"] = filter_status if filter_source: params["filter[source]"] = filter_source 
if filter_status_group: params["filter[status_group]"] = filter_status_group if filter_timeframe: params["filter[timeframe]"] = filter_timeframe if filter_agent_pool_names: params["filter[agent_pool_names]"] = filter_agent_pool_names # Add search parameters if specified if search_user: params["search[user]"] = search_user if search_commit: params["search[commit]"] = search_commit if search_basic: params["search[basic]"] = search_basic # Make the API request success, data = await make_api_request( f"workspaces/{workspace_id}/runs", method="GET", params=params ) if success: return data else: return data # Error info is already in the data dictionary async def list_runs_in_organization( organization: str, page_number: int = 1, page_size: int = 20, filter_operation: str = "", filter_status: str = "", filter_source: str = "", filter_status_group: str = "", filter_timeframe: str = "", filter_agent_pool_names: str = "", filter_workspace_names: str = "", search_user: str = "", search_commit: str = "", search_basic: str = "" ) -> dict: """ List runs in an organization with comprehensive filtering and pagination Args: organization: The organization name (required) page_number: Page number to fetch (default: 1) page_size: Number of results per page (default: 20) filter_operation: Filter runs by operation type, comma-separated (e.g. "plan_only,plan_and_apply") filter_status: Filter runs by status, comma-separated (e.g. "planned,applied,errored") filter_source: Filter runs by source, comma-separated (e.g. "tfe-api,tfe-ui") filter_status_group: Filter runs by status group (e.g. 
"non_final", "final", "discardable") filter_timeframe: Filter runs by timeframe ("year" or specific year) filter_agent_pool_names: Filter runs by agent pool names, comma-separated filter_workspace_names: Filter runs by workspace names, comma-separated search_user: Search for runs by VCS username search_commit: Search for runs by commit SHA search_basic: Basic search across run ID, message, commit SHA, and username Returns: List of runs with pagination information """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} # Build query parameters params: Dict[str, Union[str, int]] = { "page[number]": page_number, "page[size]": page_size } # Add filters if specified if filter_operation: params["filter[operation]"] = filter_operation if filter_status: params["filter[status]"] = filter_status if filter_source: params["filter[source]"] = filter_source if filter_status_group: params["filter[status_group]"] = filter_status_group if filter_timeframe: params["filter[timeframe]"] = filter_timeframe if filter_agent_pool_names: params["filter[agent_pool_names]"] = filter_agent_pool_names if filter_workspace_names: params["filter[workspace_names]"] = filter_workspace_names # Add search parameters if specified if search_user: params["search[user]"] = search_user if search_commit: params["search[commit]"] = search_commit if search_basic: params["search[basic]"] = search_basic # Make the API request success, data = await make_api_request( f"organizations/{organization}/runs", method="GET", params=params ) if success: return data else: return data # Error info is already in the data dictionary async def get_run_details( run_id: str ) -> dict: """ Get detailed information about a specific run Args: run_id: The ID of the run to retrieve details for (required) Returns: The run details """ if not run_id: return {"error": "Run ID is required"} # Make the API request success, data = await make_api_request( 
f"runs/{run_id}", method="GET" ) if success: return data else: return data # Error info is already in the data dictionary async def apply_run( run_id: str, comment: str = "" ) -> dict: """ Apply a run that is paused waiting for confirmation after a plan This applies runs in the "needs confirmation" and "policy checked" states. Args: run_id: The ID of the run to apply (required) comment: An optional comment about the run Returns: Success message or error details """ if not run_id: return {"error": "Run ID is required"} # Build the request payload with optional comment payload = {} if comment: payload = {"comment": comment} # Make the API request success, data = await make_api_request( f"runs/{run_id}/actions/apply", method="POST", data=payload ) if success: return {"status": "success", "message": "Run apply has been queued"} else: return data # Error info is already in the data dictionary async def discard_run( run_id: str, comment: str = "" ) -> dict: """ Discard a run that is paused waiting for confirmation This discards runs in the "pending", "needs confirmation", "policy checked", and "policy override" states. Args: run_id: The ID of the run to discard (required) comment: An optional explanation for why the run was discarded Returns: Success message or error details """ if not run_id: return {"error": "Run ID is required"} # Build the request payload with optional comment payload = {} if comment: payload = {"comment": comment} # Make the API request success, data = await make_api_request( f"runs/{run_id}/actions/discard", method="POST", data=payload ) if success: return {"status": "success", "message": "Run discard has been queued"} else: return data # Error info is already in the data dictionary async def cancel_run( run_id: str, comment: str = "" ) -> dict: """ Cancel a run that is currently planning or applying This is equivalent to hitting ctrl+c during a Terraform plan or apply on the CLI. 
The running Terraform process is sent an INT signal to end its work safely. Args: run_id: The ID of the run to cancel (required) comment: An optional explanation for why the run was canceled Returns: Success message or error details """ if not run_id: return {"error": "Run ID is required"} # Build the request payload with optional comment payload = {} if comment: payload = {"comment": comment} # Make the API request success, data = await make_api_request( f"runs/{run_id}/actions/cancel", method="POST", data=payload ) if success: return {"status": "success", "message": "Run cancel has been queued"} else: return data # Error info is already in the data dictionary async def force_cancel_run( run_id: str, comment: str = "" ) -> dict: """ Forcefully cancel a run immediately Unlike cancel_run, this action ends the run immediately and places it into a canceled state. The workspace is immediately unlocked. Note: This endpoint requires that a normal cancel is performed first, and a cool-off period has elapsed. Args: run_id: The ID of the run to force cancel (required) comment: An optional explanation for why the run was force canceled Returns: Success message or error details """ if not run_id: return {"error": "Run ID is required"} # Build the request payload with optional comment payload = {} if comment: payload = {"comment": comment} # Make the API request success, data = await make_api_request( f"runs/{run_id}/actions/force-cancel", method="POST", data=payload ) if success: return {"status": "success", "message": "Run has been force canceled"} else: return data # Error info is already in the data dictionary async def force_execute_run( run_id: str ) -> dict: """ Forcefully execute a run by canceling all prior runs This action cancels all prior runs that are not already complete, unlocking the run's workspace and allowing the run to be executed. This is the same as clicking "Run this plan now" in the UI. 
Prerequisites: - The target run must be in the "pending" state - The workspace must be locked by another run - The run locking the workspace must be in a discardable state Args: run_id: The ID of the run to execute (required) Returns: Success message or error details """ if not run_id: return {"error": "Run ID is required"} # Make the API request (no payload) success, data = await make_api_request( f"runs/{run_id}/actions/force-execute", method="POST" ) if success: return {"status": "success", "message": "Run force-execution has been initiated"} else: return data # Error info is already in the data dictionary