Skip to main content
Glama
semgrep_scanner.py (7.89 kB)
"""Semgrep scanner integration.""" import json import logging import subprocess import tempfile from pathlib import Path from typing import Optional from ..asvs.mapper import get_asvs_mapper from ..config import settings from ..models import Location, ScanResult, SeverityLevel from .base import BaseScanner logger = logging.getLogger(__name__) class SemgrepScanner(BaseScanner): """Security scanner using Semgrep.""" # Language support SUPPORTED_LANGUAGES = { "python": "py", "py": "py", "javascript": "js", "js": "js", "typescript": "ts", "ts": "ts", "java": "java", "go": "go", "ruby": "rb", "php": "php", "c": "c", "cpp": "cpp", "csharp": "cs", "rust": "rs", } # Map Semgrep severity to our severity levels SEVERITY_MAP = { "ERROR": SeverityLevel.HIGH, "WARNING": SeverityLevel.MEDIUM, "INFO": SeverityLevel.LOW, } def __init__(self, rules_dir: Optional[Path] = None) -> None: """ Initialize Semgrep scanner. Args: rules_dir: Directory containing custom Semgrep rules """ self.rules_dir = rules_dir or settings.rules_path self.mapper = get_asvs_mapper() async def scan( self, code: str, language: str, filename: Optional[str] = None, ) -> list[ScanResult]: """ Scan code using Semgrep. 
Args: code: Source code to scan language: Programming language filename: Optional filename for context Returns: List of security findings """ if not self.supports_language(language): logger.warning(f"Language not supported by Semgrep: {language}") return [] # Determine file extension extension = self._get_file_extension(language) if not filename: filename = f"input.{extension}" # Create temporary file with the code with tempfile.TemporaryDirectory() as tmpdir: tmpdir_path = Path(tmpdir) code_file = tmpdir_path / filename # Create parent directories if filename has path components code_file.parent.mkdir(parents=True, exist_ok=True) code_file.write_text(code, encoding="utf-8") # Run Semgrep results = await self._run_semgrep(code_file, tmpdir_path) return results def supports_language(self, language: str) -> bool: """Check if Semgrep supports this language.""" return language.lower() in self.SUPPORTED_LANGUAGES async def _run_semgrep(self, code_file: Path, working_dir: Path) -> list[ScanResult]: """ Run Semgrep on a file. 
Args: code_file: Path to code file working_dir: Working directory Returns: List of scan results """ try: # Build Semgrep command cmd = [ "semgrep", "--config=auto", # Use Semgrep Registry rules "--json", "--no-git-ignore", "--timeout", str(settings.scan_timeout), ] # Add custom rules if available if self.rules_dir.exists(): cmd.extend(["--config", str(self.rules_dir)]) logger.debug(f"Using custom rules from: {self.rules_dir}") cmd.append(str(code_file)) # Run Semgrep logger.debug(f"Running: {' '.join(cmd)}") result = subprocess.run( cmd, cwd=working_dir, capture_output=True, text=True, timeout=settings.scan_timeout, ) # Parse JSON output if result.stdout: output = json.loads(result.stdout) return self._parse_semgrep_output(output, code_file.name) else: logger.warning("Semgrep returned no output") return [] except subprocess.TimeoutExpired: logger.error(f"Semgrep scan timed out after {settings.scan_timeout}s") return [] except FileNotFoundError: logger.error("Semgrep not found - please install: pip install semgrep") return [] except json.JSONDecodeError as e: logger.error(f"Failed to parse Semgrep output: {e}") return [] except Exception as e: logger.error(f"Semgrep scan failed: {e}", exc_info=True) return [] def _parse_semgrep_output(self, output: dict, filename: str) -> list[ScanResult]: """ Parse Semgrep JSON output into ScanResult objects. 
Args: output: Semgrep JSON output filename: Filename that was scanned Returns: List of scan results """ results = [] for finding in output.get("results", []): try: # Extract rule metadata rule_id = finding.get("check_id", "") message = finding.get("extra", {}).get("message", finding.get("message", "")) severity = finding.get("extra", {}).get("severity", "WARNING") # Extract location info start = finding.get("start", {}) end = finding.get("end", {}) line = start.get("line") column = start.get("col") end_line = end.get("line") # Extract code snippet code_snippet = finding.get("extra", {}).get("lines", "") # Extract metadata metadata = finding.get("extra", {}).get("metadata", {}) vulnerability_type = metadata.get("vulnerability", rule_id.split(".")[-1]) cwe = metadata.get("cwe") asvs_id = metadata.get("asvs_id") # Map to ASVS requirements asvs_requirements = [] if asvs_id: asvs_requirements.append(asvs_id) else: # Try to map based on vulnerability type and CWE asvs_requirements = self.mapper.map_finding_to_asvs( vulnerability_type=vulnerability_type, cwe=cwe, ) # Build remediation message remediation = metadata.get("remediation", "Review and fix this security issue.") # Create scan result result = ScanResult( severity=self.SEVERITY_MAP.get(severity.upper(), SeverityLevel.MEDIUM), vulnerability_type=vulnerability_type, asvs_requirements=asvs_requirements, location=Location( line=line, column=column, end_line=end_line, filename=filename, code_snippet=code_snippet, ), message=message, remediation=remediation, scanner="semgrep", metadata={ "rule_id": rule_id, "cwe": cwe, }, ) results.append(result) except Exception as e: logger.error(f"Error parsing Semgrep finding: {e}", exc_info=True) logger.debug(f"Finding data: {finding}") continue return results def _get_file_extension(self, language: str) -> str: """Get file extension for a language.""" return self.SUPPORTED_LANGUAGES.get(language.lower(), "txt")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/harn1shmodi/vsguard-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.