terraform-cloud-mcp

by severity1
Verified
  • tools
"""Workspace management tools for Terraform Cloud MCP""" from typing import Optional, List, Dict from api.client import make_api_request from utils.validators import validate_organization async def list_workspaces( organization: str, page_number: int = 1, page_size: int = 20, all_pages: bool = False, search_name: str = "", search_tags: str = "", search_exclude_tags: str = "", search_wildcard_name: str = "", sort: str = "", filter_project_id: str = "", filter_current_run_status: str = "", filter_tagged_key: str = "", filter_tagged_value: str = "" ) -> dict: """ List workspaces in an organization with comprehensive filtering and pagination Args: organization: The organization name (required) page_number: Page number to fetch (default: 1) page_size: Number of results per page (default: 20, max: 100) all_pages: If True, fetch all pages and combine results (default: False) search_name: Filter workspaces by name using fuzzy search search_tags: Filter workspaces with specific tags (comma separated) search_exclude_tags: Exclude workspaces with specific tags (comma separated) search_wildcard_name: Filter workspaces by name with wildcard matching (e.g., "prod-*", "*-test") sort: Sort workspaces by "name", "current-run.created-at", or "latest-change-at" (prefix with "-" for descending) filter_project_id: Filter workspaces belonging to a specific project filter_current_run_status: Filter workspaces by current run status filter_tagged_key: Filter workspaces by tag key filter_tagged_value: Filter workspaces by tag value (used with filter_tagged_key) Returns: List of workspaces with pagination information """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} # Validate pagination parameters if page_number < 1: return {"error": "Page number must be at least 1"} if page_size < 1 or page_size > 100: return {"error": "Page size must be between 1 and 100"} # Build query parameters params = { "page[number]": str(page_number), "page[size]": str(page_size) } # Add optional search and filter parameters if search_name: params["search[name]"] = search_name if search_tags: params["search[tags]"] = search_tags if search_exclude_tags: params["search[exclude-tags]"] = search_exclude_tags if search_wildcard_name: params["search[wildcard-name]"] = search_wildcard_name if sort: params["sort"] = sort if filter_project_id: params["filter[project][id]"] = filter_project_id if filter_current_run_status: params["filter[current-run][status]"] = filter_current_run_status if filter_tagged_key: params["filter[tagged][0][key]"] = filter_tagged_key if filter_tagged_value: params["filter[tagged][0][value]"] = filter_tagged_value # For all_pages mode, we'll start collecting all results if all_pages: all_workspaces = {"data": []} current_page = 1 while True: params["page[number]"] = str(current_page) success, page_data = await make_api_request( f"organizations/{organization}/workspaces", params=params ) if not success: return page_data # Return error info # Add this page's workspaces to our collection all_workspaces["data"].extend(page_data.get("data", [])) # Check if there's a next page meta = page_data.get("meta", {}) pagination = meta.get("pagination", {}) current_page = pagination.get("current-page", 1) total_pages = pagination.get("total-pages", 1) # Add pagination metadata if "meta" not in all_workspaces: all_workspaces["meta"] = meta # If we're on the last page, stop if current_page >= total_pages: break # Otherwise, continue to the next page current_page += 1 # Update the pagination info in the response if "meta" in all_workspaces: meta = all_workspaces["meta"] if isinstance(meta, dict) and "pagination" in meta: pagination = meta["pagination"] if isinstance(pagination, dict): pagination["current-page"] = 1 pagination["prev-page"] = None pagination["next-page"] = None return all_workspaces # Standard single-page mode else: success, data = await make_api_request( f"organizations/{organization}/workspaces", params=params ) if success: return data else: return data # Error info is already in the data dictionary async def get_workspace_details(organization: str, workspace_name: str) -> dict: """ Get details for a specific workspace Args: organization: The organization name (required) workspace_name: The workspace name (required) Returns: Workspace details """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} success, data = await make_api_request(f"organizations/{organization}/workspaces/{workspace_name}") if success: return data else: return data # Error info is already in the data dictionary async def create_workspace( organization: str, name: str, description: str = "", terraform_version: str = "", working_directory: str = "", auto_apply: bool = False, file_triggers_enabled: bool = True, trigger_prefixes: list = [], trigger_patterns: list = [], queue_all_runs: bool = False, speculative_enabled: bool = True, global_remote_state: bool = False, execution_mode: str = "remote", allow_destroy_plan: bool = True, auto_apply_run_trigger: bool = False, project_id: str = "", vcs_repo: dict = {}, tags: list = [] ) -> dict: """ Create a new workspace in an organization Args: organization: The organization name (required) name: The name of the workspace (required) description: A description for the workspace terraform_version: Specific Terraform version to use (default: latest) working_directory: Relative path that Terraform will execute within auto_apply: Automatically apply changes when a Terraform plan is successful file_triggers_enabled: Whether to filter runs based on changed files in VCS trigger_prefixes: List of path prefixes that will trigger runs trigger_patterns: List of glob patterns that Terraform monitors for changes queue_all_runs: Whether runs should be queued immediately after workspace creation speculative_enabled: Whether this workspace allows automatic speculative plans global_remote_state: Whether all workspaces in the organization can access this workspace's state execution_mode: Which execution mode to use: "remote", "local", or "agent" allow_destroy_plan: Whether destroy plans can be queued on the workspace auto_apply_run_trigger: Whether to automatically apply changes from run triggers project_id: The ID of the project to create the workspace in vcs_repo: Settings for the workspace's VCS repository (optional) tags: List of tags to attach to the workspace Returns: The created workspace details """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not name: return {"error": "Workspace name is required"} # Build the request payload payload = { "data": { "type": "workspaces", "attributes": { "name": name } } } # Add optional attributes if provided if description: payload["data"]["attributes"]["description"] = description if terraform_version: payload["data"]["attributes"]["terraform-version"] = terraform_version if working_directory: payload["data"]["attributes"]["working-directory"] = working_directory # Add boolean attributes (only those that differ from API defaults need to be explicitly set) # Auto-apply defaults to false in API, so only set if true if auto_apply: payload["data"]["attributes"]["auto-apply"] = auto_apply # File-triggers-enabled defaults to true in API if file_triggers_enabled is False: payload["data"]["attributes"]["file-triggers-enabled"] = False # Queue-all-runs defaults to false in API if queue_all_runs: payload["data"]["attributes"]["queue-all-runs"] = queue_all_runs # Speculative-enabled defaults to true in API if speculative_enabled is False: payload["data"]["attributes"]["speculative-enabled"] = False # Global-remote-state defaults to false in API if global_remote_state: payload["data"]["attributes"]["global-remote-state"] = global_remote_state # Allow-destroy-plan defaults to true in API if allow_destroy_plan is False: payload["data"]["attributes"]["allow-destroy-plan"] = False # Auto-apply-run-trigger defaults to false in API if auto_apply_run_trigger: payload["data"]["attributes"]["auto-apply-run-trigger"] = auto_apply_run_trigger # Add trigger lists if provided if trigger_prefixes: payload["data"]["attributes"]["trigger-prefixes"] = trigger_prefixes if trigger_patterns: payload["data"]["attributes"]["trigger-patterns"] = trigger_patterns # Add execution mode if execution_mode not in ["remote", "local", "agent"]: return {"error": "Invalid execution mode. Must be one of: 'remote', 'local', 'agent'"} payload["data"]["attributes"]["execution-mode"] = execution_mode # Add VCS repository settings if provided if vcs_repo: if not isinstance(vcs_repo, dict): return {"error": "vcs_repo must be a dictionary"} payload["data"]["attributes"]["vcs-repo"] = vcs_repo # Add project relationship if provided if project_id: if "relationships" not in payload["data"]: payload["data"]["relationships"] = {} payload["data"]["relationships"]["project"] = { "data": { "id": project_id, "type": "projects" } } # Add tags if provided if tags: if not isinstance(tags, list): return {"error": "tags must be a list"} if "relationships" not in payload["data"]: payload["data"]["relationships"] = {} tag_bindings_data = [] for tag in tags: if not isinstance(tag, dict) or "key" not in tag or "value" not in tag: return {"error": "Each tag must be a dictionary with 'key' and 'value' fields"} tag_bindings_data.append({ "type": "tag-bindings", "attributes": { "key": tag["key"], "value": tag["value"] } }) if tag_bindings_data: payload["data"]["relationships"]["tag-bindings"] = { "data": tag_bindings_data } # Make the API request success, data = await make_api_request( f"organizations/{organization}/workspaces", method="POST", data=payload ) if success: return data else: return data # Error info is already in the data dictionary async def update_workspace( organization: str, workspace_name: str, name: str = "", description: str = "", terraform_version: str = "", working_directory: str = "", auto_apply: Optional[bool] = None, file_triggers_enabled: Optional[bool] = None, trigger_prefixes: list = [], trigger_patterns: list = [], queue_all_runs: Optional[bool] = None, speculative_enabled: Optional[bool] = None, global_remote_state: Optional[bool] = None, execution_mode: str = "", allow_destroy_plan: Optional[bool] = None, auto_apply_run_trigger: Optional[bool] = None, project_id: str = "", vcs_repo: dict = {}, tags: list = [] ) -> dict: """ Update an existing workspace in an organization Args: organization: The organization name (required) workspace_name: Current name of the workspace to update (required) name: New name for the workspace (optional) description: A description for the workspace terraform_version: Specific Terraform version to use working_directory: Relative path that Terraform will execute within auto_apply: Automatically apply changes when a Terraform plan is successful file_triggers_enabled: Whether to filter runs based on changed files in VCS trigger_prefixes: List of path prefixes that will trigger runs trigger_patterns: List of glob patterns that Terraform monitors for changes queue_all_runs: Whether runs should be queued immediately after workspace creation speculative_enabled: Whether this workspace allows automatic speculative plans global_remote_state: Whether all workspaces in the organization can access this workspace's state execution_mode: Which execution mode to use: "remote", "local", or "agent" allow_destroy_plan: Whether destroy plans can be queued on the workspace auto_apply_run_trigger: Whether to automatically apply changes from run triggers project_id: The ID of the project to move the workspace to vcs_repo: Settings for the workspace's VCS repository, or null to remove VCS tags: List of tags to attach to the workspace Returns: The updated workspace details """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Current workspace name is required"} # Build the request payload payload = { "data": { "type": "workspaces", "attributes": {} } } # Add new name if provided if name: payload["data"]["attributes"]["name"] = name # Add optional attributes if provided if description: payload["data"]["attributes"]["description"] = description if terraform_version: payload["data"]["attributes"]["terraform-version"] = terraform_version if working_directory: payload["data"]["attributes"]["working-directory"] = working_directory # Add boolean attributes if specified if auto_apply is not None: payload["data"]["attributes"]["auto-apply"] = auto_apply if file_triggers_enabled is not None: payload["data"]["attributes"]["file-triggers-enabled"] = file_triggers_enabled if queue_all_runs is not None: payload["data"]["attributes"]["queue-all-runs"] = queue_all_runs if speculative_enabled is not None: payload["data"]["attributes"]["speculative-enabled"] = speculative_enabled if global_remote_state is not None: payload["data"]["attributes"]["global-remote-state"] = global_remote_state if allow_destroy_plan is not None: payload["data"]["attributes"]["allow-destroy-plan"] = allow_destroy_plan if auto_apply_run_trigger is not None: payload["data"]["attributes"]["auto-apply-run-trigger"] = auto_apply_run_trigger # Add trigger lists if provided if trigger_prefixes: payload["data"]["attributes"]["trigger-prefixes"] = trigger_prefixes if trigger_patterns: payload["data"]["attributes"]["trigger-patterns"] = trigger_patterns # Add execution mode if provided if execution_mode: if execution_mode not in ["remote", "local", "agent"]: return {"error": "Invalid execution mode. Must be one of: 'remote', 'local', 'agent'"} payload["data"]["attributes"]["execution-mode"] = execution_mode # Add VCS repository settings if provided if vcs_repo is not None: if not isinstance(vcs_repo, dict): return {"error": "vcs_repo must be a dictionary or null"} payload["data"]["attributes"]["vcs-repo"] = vcs_repo # Add project relationship if provided if project_id: if "relationships" not in payload["data"]: payload["data"]["relationships"] = {} payload["data"]["relationships"]["project"] = { "data": { "id": project_id, "type": "projects" } } # Add tags if provided if tags: if not isinstance(tags, list): return {"error": "tags must be a list"} if "relationships" not in payload["data"]: payload["data"]["relationships"] = {} tag_bindings_data = [] for tag in tags: if not isinstance(tag, dict) or "key" not in tag or "value" not in tag: return {"error": "Each tag must be a dictionary with 'key' and 'value' fields"} tag_bindings_data.append({ "type": "tag-bindings", "attributes": { "key": tag["key"], "value": tag["value"] } }) payload["data"]["relationships"]["tag-bindings"] = { "data": tag_bindings_data } # Make the API request success, data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}", method="PATCH", data=payload ) if success: return data else: return data # Error info is already in the data dictionary async def delete_workspace(organization: str, workspace_name: str) -> dict: """ Delete a workspace from an organization Args: organization: The organization name (required) workspace_name: The name of the workspace to delete (required) Returns: Success message or error details """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} # Make the API request success, data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}", method="DELETE" ) if success: return {"status": "success", "message": f"Workspace '{workspace_name}' deleted successfully"} else: return data # Error info is already in the data dictionary async def safe_delete_workspace(organization: str, workspace_name: str) -> dict: """ Safely delete a workspace, but only if it's not managing any resources When you delete a Terraform workspace with resources, Terraform can no longer track or manage that infrastructure. During a safe delete, Terraform only deletes the workspace if it is not managing any resources. Args: organization: The organization name (required) workspace_name: The name of the workspace to delete (required) Returns: Success message or error details """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} # Make the safe-delete API request success, data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}/actions/safe-delete", method="POST" ) if success: return {"status": "success", "message": f"Workspace '{workspace_name}' safely deleted successfully"} else: return data # Error info is already in the data dictionary async def lock_workspace(organization: str, workspace_name: str, reason: str = "") -> dict: """ Lock a workspace to prevent Terraform runs Args: organization: The organization name (required) workspace_name: The name of the workspace to lock (required) reason: Optional reason for locking the workspace Returns: Updated workspace details or error message """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} # Build the request payload payload = { "data": { "reason": reason if reason else "Locked via Terraform Cloud MCP Server" } } # First, get the workspace ID id_success, id_data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}" ) if not id_success: return id_data # Return error from workspace lookup # Extract the workspace ID try: workspace_id = id_data["data"]["id"] except (KeyError, TypeError): return {"error": "Failed to get workspace ID"} # Make the API request success, data = await make_api_request( f"workspaces/{workspace_id}/actions/lock", method="POST", data=payload ) if success: return data else: return data # Error info is already in the data dictionary async def unlock_workspace(organization: str, workspace_name: str) -> dict: """ Unlock a workspace to allow Terraform runs Args: organization: The organization name (required) workspace_name: The name of the workspace to unlock (required) Returns: Updated workspace details or error message """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} # First, get the workspace ID id_success, id_data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}" ) if not id_success: return id_data # Return error from workspace lookup # Extract the workspace ID try: workspace_id = id_data["data"]["id"] except (KeyError, TypeError): return {"error": "Failed to get workspace ID"} # Make the API request success, data = await make_api_request( f"workspaces/{workspace_id}/actions/unlock", method="POST" ) if success: return data else: return data # Error info is already in the data dictionary async def force_unlock_workspace(organization: str, workspace_name: str) -> dict: """ Force unlock a workspace that may be locked by another user or process Args: organization: The organization name (required) workspace_name: The name of the workspace to force unlock (required) Returns: Updated workspace details or error message """ # Validate organization name valid, error_message = validate_organization(organization) if not valid: return {"error": error_message} if not workspace_name: return {"error": "Workspace name is required"} # First, get the workspace ID id_success, id_data = await make_api_request( f"organizations/{organization}/workspaces/{workspace_name}" ) if not id_success: return id_data # Return error from workspace lookup # Extract the workspace ID try: workspace_id = id_data["data"]["id"] except (KeyError, TypeError): return {"error": "Failed to get workspace ID"} # Make the API request success, data = await make_api_request( f"workspaces/{workspace_id}/actions/force-unlock", method="POST" ) if success: return data else: return data # Error info is already in the data dictionary