# semgrep_scanner.py (3.31 kB)
# semgrep_scanner.py
import logging
import subprocess
import os
import shlex
from datetime import datetime
from .utils import parse_json_file # Relative import
SEMGREP_TIMEOUT_SECONDS = 600  # Default scan timeout in seconds (10 minutes)


def run_semgrep(code_path: str, config: str = "auto", output_dir: str = "results",
                timeout: float = SEMGREP_TIMEOUT_SECONDS) -> list:
    """Run the Semgrep CLI on a directory and return its findings.

    Semgrep is invoked as ``semgrep scan --config <config> --json -o <file>
    <code_path>``; the JSON report is written to a timestamped file inside
    ``output_dir`` and then read back with ``parse_json_file``.

    Args:
        code_path: Directory to scan.
        config: Value passed to Semgrep's ``--config`` option.
        output_dir: Directory that receives the JSON report; created if it
            does not exist.
        timeout: Maximum number of seconds the Semgrep process may run.

    Returns:
        * ``[]`` when ``code_path`` is not an existing directory.
        * On success, the report's ``"results"`` list. Each finding is
          updated in place with ``tool`` (always ``'Semgrep'``) and with
          ``message``, ``severity`` and ``code_snippet`` copied from the
          finding's ``extra`` mapping (``None`` when absent).
        * On any failure (output directory cannot be created, non-zero exit
          status, unreadable report, timeout, missing ``semgrep`` executable,
          or an unexpected exception), a one-element list holding a
          human-readable error message string.
    """
    if not os.path.isdir(code_path):
        logging.error("Semgrep target path is not a valid directory: %s", code_path)
        return []

    logging.info("Starting Semgrep scan for codebase: %s using config: %s", code_path, config)

    # exist_ok=True avoids the check-then-create race, and a creation failure
    # is reported like every other failure instead of escaping as an exception.
    # This lives in its own try so that an OSError here is never mistaken for
    # the "semgrep executable not found" case handled further down.
    try:
        os.makedirs(output_dir, exist_ok=True)
    except OSError as e:
        message = f"Could not create Semgrep output directory {output_dir}: {e}"
        logging.error(message)
        return [message]

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_filepath = os.path.join(output_dir, f"semgrep_output_{timestamp}.json")

    # --json gives machine-readable output; -o writes it to the report file.
    command = ["semgrep", "scan", "--config", config, "--json", "-o", output_filepath, code_path]
    logging.debug("Executing Semgrep command: %s", " ".join(shlex.quote(part) for part in command))

    try:
        # check=False: the exit status is inspected explicitly below.
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, check=False)
        logging.info("Semgrep process finished.")
        logging.debug("Semgrep stdout:\n%s", result.stdout)

        if result.returncode != 0:
            logging.warning("Semgrep exited with non-zero status code: %s", result.returncode)
            if result.stderr:
                # Surface Semgrep's own diagnostics so the failure can be investigated.
                logging.warning("Semgrep stderr:\n%s", result.stderr)
            return [f"Semgrep exited with non-zero status code: {result.returncode}"]

        # NOTE(review): parse_json_file comes from .utils; presumably it returns
        # the decoded JSON document, or a falsy value on failure - confirm.
        report_data = parse_json_file(output_filepath)
        if not (isinstance(report_data, dict) and report_data and "results" in report_data):
            logging.warning("Could not parse findings from Semgrep output file: %s", output_filepath)
            return [f"Could not parse findings from Semgrep output file: {output_filepath}"]

        findings = report_data["results"]
        logging.info("Successfully parsed %s findings from Semgrep output.", len(findings))
        for finding in findings:
            # A missing or null "extra" is treated as empty so that a single
            # sparse finding does not abort the whole result set.
            extra = finding.get("extra") or {}
            finding["tool"] = "Semgrep"
            # Hoist the commonly used fields to the top level of the finding.
            finding["message"] = extra.get("message")
            finding["severity"] = extra.get("severity")
            finding["code_snippet"] = extra.get("lines")
        return findings
    except subprocess.TimeoutExpired:
        logging.error("Semgrep scan timed out after %s seconds.", timeout)
        return [f"Semgrep scan timed out after {timeout} seconds."]
    except FileNotFoundError:
        logging.error("Semgrep command not found. Is Semgrep installed and in PATH?")
        return ["Semgrep command not found. Is Semgrep installed and in PATH?"]
    except Exception as e:
        # logging.exception keeps the traceback, which a bare error message would lose.
        logging.exception("An unexpected error occurred while running Semgrep: %s", e)
        return [f"An unexpected error occurred while running Semgrep: {e}"]