We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/harn1shmodi/vsguard-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""Semgrep scanner integration."""
import json
import logging
import subprocess
import tempfile
from pathlib import Path
from typing import Optional
from ..asvs.mapper import get_asvs_mapper
from ..config import settings
from ..models import Location, ScanResult, SeverityLevel
from .base import BaseScanner
logger = logging.getLogger(__name__)
class SemgrepScanner(BaseScanner):
"""Security scanner using Semgrep."""
# Language support
SUPPORTED_LANGUAGES = {
"python": "py",
"py": "py",
"javascript": "js",
"js": "js",
"typescript": "ts",
"ts": "ts",
"java": "java",
"go": "go",
"ruby": "rb",
"php": "php",
"c": "c",
"cpp": "cpp",
"csharp": "cs",
"rust": "rs",
}
# Map Semgrep severity to our severity levels
SEVERITY_MAP = {
"ERROR": SeverityLevel.HIGH,
"WARNING": SeverityLevel.MEDIUM,
"INFO": SeverityLevel.LOW,
}
def __init__(self, rules_dir: Optional[Path] = None) -> None:
"""
Initialize Semgrep scanner.
Args:
rules_dir: Directory containing custom Semgrep rules
"""
self.rules_dir = rules_dir or settings.rules_path
self.mapper = get_asvs_mapper()
async def scan(
self,
code: str,
language: str,
filename: Optional[str] = None,
) -> list[ScanResult]:
"""
Scan code using Semgrep.
Args:
code: Source code to scan
language: Programming language
filename: Optional filename for context
Returns:
List of security findings
"""
if not self.supports_language(language):
logger.warning(f"Language not supported by Semgrep: {language}")
return []
# Determine file extension
extension = self._get_file_extension(language)
if not filename:
filename = f"input.{extension}"
# Create temporary file with the code
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir_path = Path(tmpdir)
code_file = tmpdir_path / filename
# Create parent directories if filename has path components
code_file.parent.mkdir(parents=True, exist_ok=True)
code_file.write_text(code, encoding="utf-8")
# Run Semgrep
results = await self._run_semgrep(code_file, tmpdir_path)
return results
def supports_language(self, language: str) -> bool:
"""Check if Semgrep supports this language."""
return language.lower() in self.SUPPORTED_LANGUAGES
async def _run_semgrep(self, code_file: Path, working_dir: Path) -> list[ScanResult]:
"""
Run Semgrep on a file.
Args:
code_file: Path to code file
working_dir: Working directory
Returns:
List of scan results
"""
try:
# Build Semgrep command
cmd = [
"semgrep",
"--config=auto", # Use Semgrep Registry rules
"--json",
"--no-git-ignore",
"--timeout", str(settings.scan_timeout),
]
# Add custom rules if available
if self.rules_dir.exists():
cmd.extend(["--config", str(self.rules_dir)])
logger.debug(f"Using custom rules from: {self.rules_dir}")
cmd.append(str(code_file))
# Run Semgrep
logger.debug(f"Running: {' '.join(cmd)}")
result = subprocess.run(
cmd,
cwd=working_dir,
capture_output=True,
text=True,
timeout=settings.scan_timeout,
)
# Parse JSON output
if result.stdout:
output = json.loads(result.stdout)
return self._parse_semgrep_output(output, code_file.name)
else:
logger.warning("Semgrep returned no output")
return []
except subprocess.TimeoutExpired:
logger.error(f"Semgrep scan timed out after {settings.scan_timeout}s")
return []
except FileNotFoundError:
logger.error("Semgrep not found - please install: pip install semgrep")
return []
except json.JSONDecodeError as e:
logger.error(f"Failed to parse Semgrep output: {e}")
return []
except Exception as e:
logger.error(f"Semgrep scan failed: {e}", exc_info=True)
return []
def _parse_semgrep_output(self, output: dict, filename: str) -> list[ScanResult]:
"""
Parse Semgrep JSON output into ScanResult objects.
Args:
output: Semgrep JSON output
filename: Filename that was scanned
Returns:
List of scan results
"""
results = []
for finding in output.get("results", []):
try:
# Extract rule metadata
rule_id = finding.get("check_id", "")
message = finding.get("extra", {}).get("message", finding.get("message", ""))
severity = finding.get("extra", {}).get("severity", "WARNING")
# Extract location info
start = finding.get("start", {})
end = finding.get("end", {})
line = start.get("line")
column = start.get("col")
end_line = end.get("line")
# Extract code snippet
code_snippet = finding.get("extra", {}).get("lines", "")
# Extract metadata
metadata = finding.get("extra", {}).get("metadata", {})
vulnerability_type = metadata.get("vulnerability", rule_id.split(".")[-1])
cwe = metadata.get("cwe")
asvs_id = metadata.get("asvs_id")
# Map to ASVS requirements
asvs_requirements = []
if asvs_id:
asvs_requirements.append(asvs_id)
else:
# Try to map based on vulnerability type and CWE
asvs_requirements = self.mapper.map_finding_to_asvs(
vulnerability_type=vulnerability_type,
cwe=cwe,
)
# Build remediation message
remediation = metadata.get("remediation", "Review and fix this security issue.")
# Create scan result
result = ScanResult(
severity=self.SEVERITY_MAP.get(severity.upper(), SeverityLevel.MEDIUM),
vulnerability_type=vulnerability_type,
asvs_requirements=asvs_requirements,
location=Location(
line=line,
column=column,
end_line=end_line,
filename=filename,
code_snippet=code_snippet,
),
message=message,
remediation=remediation,
scanner="semgrep",
metadata={
"rule_id": rule_id,
"cwe": cwe,
},
)
results.append(result)
except Exception as e:
logger.error(f"Error parsing Semgrep finding: {e}", exc_info=True)
logger.debug(f"Finding data: {finding}")
continue
return results
def _get_file_extension(self, language: str) -> str:
"""Get file extension for a language."""
return self.SUPPORTED_LANGUAGES.get(language.lower(), "txt")