RunCheckovScan
Scan Terraform code for security vulnerabilities and compliance issues using Checkov static analysis to identify misconfigurations in AWS infrastructure.
Instructions
Run Checkov security scan on Terraform code.
This tool runs Checkov to scan Terraform code for security and compliance issues,
identifying potential vulnerabilities and misconfigurations according to best practices.
Checkov (https://www.checkov.io/) is an open-source static code analysis tool that
can detect hundreds of security and compliance issues in infrastructure-as-code.
Parameters:
working_directory: Directory containing Terraform files to scan
framework: Framework to scan (default: terraform)
check_ids: Optional list of specific check IDs to run
skip_check_ids: Optional list of check IDs to skip
output_format: Format for scan results (default: json)
Returns:
A CheckovScanResult object containing scan results and identified vulnerabilities
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| working_directory | Yes | Directory containing Terraform files | |
| framework | No | Framework to scan (terraform, cloudformation, etc.) | terraform |
| check_ids | No | Specific check IDs to run | |
| skip_check_ids | No | Check IDs to skip | |
| output_format | No | Output format (json, cli, etc.) | json |
Implementation Reference
- The core handler function that performs the Checkov scan: installs Checkov if needed, validates inputs for security, builds and executes the checkov subprocess command, parses JSON or CLI output into structured vulnerabilities, and returns CheckovScanResult.async def run_checkov_scan_impl(request: CheckovScanRequest) -> CheckovScanResult: """Run Checkov scan on Terraform code. Args: request: Details about the Checkov scan to execute Returns: A CheckovScanResult object containing scan results and vulnerabilities """ logger.info(f'Running Checkov scan in {request.working_directory}') # Ensure Checkov is installed if not _ensure_checkov_installed(): return CheckovScanResult( status='error', working_directory=request.working_directory, error_message='Failed to install Checkov. Please install it manually with: pip install checkov', vulnerabilities=[], summary={}, raw_output=None, ) # Security checks for parameters # Check framework parameter for allowed values allowed_frameworks = ['terraform', 'cloudformation', 'kubernetes', 'dockerfile', 'arm', 'all'] if request.framework not in allowed_frameworks: logger.error(f'Security violation: Invalid framework: {request.framework}') return CheckovScanResult( status='error', working_directory=request.working_directory, error_message=f"Security violation: Invalid framework '{request.framework}'. Allowed frameworks are: {', '.join(allowed_frameworks)}", vulnerabilities=[], summary={}, raw_output=None, ) # Check output_format parameter for allowed values allowed_output_formats = [ 'cli', 'csv', 'cyclonedx', 'cyclonedx_json', 'spdx', 'json', 'junitxml', 'github_failed_only', 'gitlab_sast', 'sarif', ] if request.output_format not in allowed_output_formats: logger.error(f'Security violation: Invalid output format: {request.output_format}') return CheckovScanResult( status='error', working_directory=request.working_directory, error_message=f"Security violation: Invalid output format '{request.output_format}'. Allowed formats are: {', '.join(allowed_output_formats)}", vulnerabilities=[], summary={}, raw_output=None, ) # Check for command injection patterns in check_ids and skip_check_ids dangerous_patterns = get_dangerous_patterns() logger.debug(f'Checking for {len(dangerous_patterns)} dangerous patterns') if request.check_ids: for check_id in request.check_ids: for pattern in dangerous_patterns: if pattern in check_id: logger.error( f"Security violation: Potentially dangerous pattern '{pattern}' in check_id: {check_id}" ) return CheckovScanResult( status='error', working_directory=request.working_directory, error_message=f"Security violation: Potentially dangerous pattern '{pattern}' detected in check_id", vulnerabilities=[], summary={}, raw_output=None, ) if request.skip_check_ids: for skip_id in request.skip_check_ids: for pattern in dangerous_patterns: if pattern in skip_id: logger.error( f"Security violation: Potentially dangerous pattern '{pattern}' in skip_check_id: {skip_id}" ) return CheckovScanResult( status='error', working_directory=request.working_directory, error_message=f"Security violation: Potentially dangerous pattern '{pattern}' detected in skip_check_id", vulnerabilities=[], summary={}, raw_output=None, ) # Build the command # Convert working_directory to absolute path if it's not already working_dir = request.working_directory if not os.path.isabs(working_dir): # Get the current working directory of the MCP server current_dir = os.getcwd() # Go up to the project root directory (assuming we're in src/terraform-mcp-server/awslabs/terraform_mcp_server) project_root = os.path.abspath(os.path.join(current_dir, '..', '..', '..', '..')) # Join with the requested working directory working_dir = os.path.abspath(os.path.join(project_root, working_dir)) logger.info(f'Using absolute working directory: {working_dir}') cmd = ['checkov', '--quiet', '-d', working_dir] # Add framework if specified if request.framework: cmd.extend(['--framework', request.framework]) # Add specific check IDs if provided if request.check_ids: cmd.extend(['--check', ','.join(request.check_ids)]) # Add skip check IDs if provided if request.skip_check_ids: cmd.extend(['--skip-check', ','.join(request.skip_check_ids)]) # Set output format cmd.extend(['--output', request.output_format]) # Execute command try: logger.info(f'Executing command: {" ".join(cmd)}') process = subprocess.run( cmd, capture_output=True, text=True, ) # Clean output text stdout = _clean_output_text(process.stdout) stderr = _clean_output_text(process.stderr) # Debug logging logger.info(f'Checkov return code: {process.returncode}') logger.info(f'Checkov stdout: {stdout}') logger.info(f'Checkov stderr: {stderr}') # Parse results if JSON output was requested vulnerabilities = [] summary = {} if request.output_format == 'json' and stdout: vulnerabilities, summary = _parse_checkov_json_output(stdout) # For non-JSON output, try to parse vulnerabilities from the text output elif stdout and process.returncode == 1: # Return code 1 means vulnerabilities were found # Simple regex to extract failed checks from CLI output failed_checks = re.findall( r'Check: (CKV\w*_\d+).*?FAILED for resource: ([\w\.]+).*?File: ([\w\/\.-]+):(\d+)', stdout, re.DOTALL, ) for check_id, resource, file_path, line in failed_checks: vuln = CheckovVulnerability( id=check_id, type='terraform', resource=resource, file_path=file_path, line=int(line), description=f'Failed check: {check_id}', guideline=None, severity='MEDIUM', fixed=False, fix_details=None, ) vulnerabilities.append(vuln) # Extract summary counts passed_match = re.search(r'Passed checks: (\d+)', stdout) failed_match = re.search(r'Failed checks: (\d+)', stdout) skipped_match = re.search(r'Skipped checks: (\d+)', stdout) summary = { 'passed': int(passed_match.group(1)) if passed_match else 0, 'failed': int(failed_match.group(1)) if failed_match else 0, 'skipped': int(skipped_match.group(1)) if skipped_match else 0, } # Prepare the result - consider it a success even if vulnerabilities were found # A return code of 1 from Checkov means vulnerabilities were found, not an error is_error = process.returncode not in [0, 1] result = CheckovScanResult( status='error' if is_error else 'success', return_code=process.returncode, working_directory=request.working_directory, vulnerabilities=vulnerabilities, summary=summary, raw_output=stdout, ) return result except Exception as e: logger.error(f'Error running Checkov scan: {e}') return CheckovScanResult( status='error', working_directory=request.working_directory, error_message=str(e), vulnerabilities=[], summary={}, raw_output=None, )
- Pydantic models defining the input schema (CheckovScanRequest), output schema (CheckovScanResult), and vulnerability details (CheckovVulnerability) for type validation and serialization.class CheckovVulnerability(BaseModel): """Model representing a security vulnerability found by Checkov. Attributes: id: The Checkov check ID (e.g., CKV_AWS_1). type: The type of check (e.g., terraform_aws). resource: The resource identifier where the vulnerability was found. file_path: Path to the file containing the vulnerability. line: Line number where the vulnerability was found. description: Description of the vulnerability. guideline: Recommended fix or security guideline. severity: Severity level of the vulnerability. fixed: Whether the vulnerability has been fixed. fix_details: Details about how the vulnerability was fixed (if applicable). """ id: str = Field(..., description='Checkov check ID') type: str = Field(..., description='Type of security check') resource: str = Field(..., description='Resource identifier') file_path: str = Field(..., description='Path to the file with the vulnerability') line: int = Field(..., description='Line number of the vulnerability') description: str = Field(..., description='Description of the vulnerability') guideline: Optional[str] = Field(None, description='Recommended fix or guideline') severity: str = Field('MEDIUM', description='Severity level (HIGH, MEDIUM, LOW)') fixed: bool = Field(False, description='Whether the vulnerability has been fixed') fix_details: Optional[str] = Field(None, description='Details about the fix applied') class CheckovScanRequest(BaseModel): """Request model for Checkov scan execution. Attributes: working_directory: Directory containing Terraform files to scan. framework: Framework to scan (default: terraform). check_ids: Optional list of specific check IDs to run. skip_check_ids: Optional list of check IDs to skip. output_format: Format for the scan results output. """ working_directory: str = Field(..., description='Directory containing Terraform files') framework: str = Field( 'terraform', description='Framework to scan (terraform, cloudformation, etc.)' ) check_ids: Optional[List[str]] = Field(None, description='Specific check IDs to run') skip_check_ids: Optional[List[str]] = Field(None, description='Check IDs to skip') output_format: str = Field('json', description='Output format (json, cli, etc.)') class CheckovScanResult(BaseModel): """Result model for Checkov scan execution. Attributes: status: Execution status (success/error). return_code: The command's return code (0 for success). working_directory: Directory where the scan was executed. error_message: Optional error message if execution failed. vulnerabilities: List of vulnerabilities found by the scan. summary: Summary of the scan results. raw_output: Raw output from the Checkov command. """ status: Literal['success', 'error'] return_code: Optional[int] = None working_directory: str error_message: Optional[str] = None vulnerabilities: List[CheckovVulnerability] = Field( [], description='List of found vulnerabilities' ) summary: Dict[str, Any] = Field({}, description='Summary of scan results') raw_output: Optional[str] = Field(None, description='Raw output from Checkov')
- awslabs/terraform_mcp_server/server.py:303-338 (registration)MCP tool registration using @mcp.tool decorator, defining the tool name, input parameters via Field descriptions (which match the schema), and delegating to the impl function.@mcp.tool(name='RunCheckovScan') async def run_checkov_scan( working_directory: str = Field(..., description='Directory containing Terraform files'), framework: str = Field( 'terraform', description='Framework to scan (terraform, cloudformation, etc.)' ), check_ids: Optional[List[str]] = Field(None, description='Specific check IDs to run'), skip_check_ids: Optional[List[str]] = Field(None, description='Check IDs to skip'), output_format: str = Field('json', description='Output format (json, cli, etc.)'), ) -> CheckovScanResult: """Run Checkov security scan on Terraform code. This tool runs Checkov to scan Terraform code for security and compliance issues, identifying potential vulnerabilities and misconfigurations according to best practices. Checkov (https://www.checkov.io/) is an open-source static code analysis tool that can detect hundreds of security and compliance issues in infrastructure-as-code. Parameters: working_directory: Directory containing Terraform files to scan framework: Framework to scan (default: terraform) check_ids: Optional list of specific check IDs to run skip_check_ids: Optional list of check IDs to skip output_format: Format for scan results (default: json) Returns: A CheckovScanResult object containing scan results and identified vulnerabilities """ request = CheckovScanRequest( working_directory=working_directory, framework=framework, check_ids=check_ids, skip_check_ids=skip_check_ids, output_format=output_format, ) return await run_checkov_scan_impl(request)
- Helper function to clean Checkov output by removing ANSI escapes, control characters, and replacing Unicode/box-drawing chars with ASCII equivalents for safe handling.def _clean_output_text(text: str) -> str: """Clean output text by removing or replacing problematic Unicode characters. Args: text: The text to clean Returns: Cleaned text with ASCII-friendly replacements """ if not text: return text # First remove ANSI escape sequences (color codes, cursor movement) ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') text = ansi_escape.sub('', text) # Remove C0 and C1 control characters (except common whitespace) control_chars = re.compile(r'[\x00-\x08\x0B-\x0C\x0E-\x1F\x7F-\x9F]') text = control_chars.sub('', text) # Replace HTML entities html_entities = { '->': '->', # Replace HTML arrow '<': '<', # Less than '>': '>', # Greater than '&': '&', # Ampersand } for entity, replacement in html_entities.items(): text = text.replace(entity, replacement) # Replace box-drawing and other special Unicode characters with ASCII equivalents unicode_chars = { '\u2500': '-', # Horizontal line '\u2502': '|', # Vertical line '\u2514': '+', # Up and right '\u2518': '+', # Up and left '\u2551': '|', # Double vertical '\u2550': '-', # Double horizontal '\u2554': '+', # Double down and right '\u2557': '+', # Double down and left '\u255a': '+', # Double up and right '\u255d': '+', # Double up and left '\u256c': '+', # Double cross '\u2588': '#', # Full block '\u25cf': '*', # Black circle '\u2574': '-', # Left box drawing '\u2576': '-', # Right box drawing '\u2577': '|', # Down box drawing '\u2575': '|', # Up box drawing } for char, replacement in unicode_chars.items(): text = text.replace(char, replacement) return text
- Helper function to parse Checkov JSON output into list of vulnerabilities and summary dict.def _parse_checkov_json_output(output: str) -> Tuple[List[CheckovVulnerability], Dict[str, Any]]: """Parse Checkov JSON output into structured vulnerability data. Args: output: JSON output from Checkov scan Returns: Tuple of (list of vulnerabilities, summary dictionary) """ try: data = json.loads(output) vulnerabilities = [] summary = { 'passed': 0, 'failed': 0, 'skipped': 0, 'parsing_errors': 0, 'resource_count': 0, } # Extract summary information if 'summary' in data: summary = data['summary'] # Process check results if 'results' in data and 'failed_checks' in data['results']: for check in data['results']['failed_checks']: vuln = CheckovVulnerability( id=check.get('check_id', 'UNKNOWN'), type=check.get('check_type', 'terraform'), resource=check.get('resource', 'UNKNOWN'), file_path=check.get('file_path', 'UNKNOWN'), line=check.get('file_line_range', [0, 0])[0], description=check.get('check_name', 'UNKNOWN'), guideline=check.get('guideline', None), severity=(check.get('severity', 'MEDIUM') or 'MEDIUM').upper(), fixed=False, fix_details=None, ) vulnerabilities.append(vuln) return vulnerabilities, summary except json.JSONDecodeError as e: logger.error(f'Failed to parse Checkov JSON output: {e}') return [], {'error': 'Failed to parse JSON output'}