"""
Core Secret Scanner Engine
Context-aware secret detection with pattern matching and entropy analysis
"""
import logging
import os
import re
import zipfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from .entropy import analyze_string_randomness, calculate_shannon_entropy
from .patterns import SECRET_PATTERNS
from .utils import count_by_severity, count_by_category, is_scannable_file
from .formats import scan_file_dynamic, classify_format, FormatInfo, ScanProfile
# Use module-level logger (configured in __init__.py)
logger = logging.getLogger(__name__)
@dataclass
class Finding:
"""A secret detection finding"""
type: str
value: str
file_path: str
line_number: int
line_content: str
severity: str
category: str
description: str
entropy: float = 0.0
confidence: float = 0.0
is_false_positive: bool = False
context: dict = field(default_factory=dict)
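    # Illustrative instance (all values hypothetical):
    #   Finding(type="generic_api_key", value="sk_live_...", file_path="app/config.py",
    #           line_number=12, line_content='API_KEY = "sk_live_..."',
    #           severity="high", category="api", description="Generic API key",
    #           entropy=4.71, confidence=0.9)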
class SecretScanner:
"""Main secret scanner with context-aware detection"""
    # Files/dirs to ignore by default; directory patterns are anchored to a
    # path boundary so e.g. 'rebuild/' is not caught by the 'build/' rule
    IGNORE_PATTERNS = [
        r'(^|/)\.git/',
        r'(^|/)\.venv/',
        r'(^|/)\.env/',
        r'(^|/)venv/',
        r'(^|/)node_modules/',
        r'(^|/)__pycache__/',
        r'(^|/)\.pytest_cache/',
        r'(^|/)\.idea/',
        r'(^|/)\.vscode/',
        r'(^|/)dist/',
        r'(^|/)build/',
        r'\.egg-info/',
        r'\.min\.js$',
        r'\.min\.css$',
    ]
# Context patterns that indicate false positives
FALSE_POSITIVE_CONTEXTS = [
# Test/example values
r'test[_-]?(?:key|token|secret|password)',
r'example[_-]?(?:key|token|secret)',
r'demo[_-]?(?:key|token|secret)',
r'sample[_-]?(?:key|token|secret)',
r'fake[_-]?(?:key|token|secret)',
r'mock[_-]?(?:key|token|secret)',
r'dummy[_-]?(?:key|token|secret)',
r'placeholder',
r'your[_-]?(?:key|token|here|secret)',
# Template/placeholder patterns
r'<[A-Z_]+>', # Placeholder like <API_KEY>
r'\$\{[A-Z_]+\}', # Template variable like ${API_KEY}
r'\{\{[A-Z_]+\}\}', # Mustache template
r'%[A-Z_]+%', # Windows env var style
# Environment variable references
r'process\.env\.', # Node.js env
r'os\.environ', # Python env
r'getenv\(', # C/Go getenv
r'ENV\[', # Ruby env
# Common false positive strings
r'xxxx+', # Redacted
r'\*\*\*+', # Starred out
r'your[_-]?api',
r'insert[_-]?(?:key|token|here)',
r'replace[_-]?(?:with|this)',
# Example file patterns
r'\.example',
r'\.sample',
r'\.template',
r'config\.example',
# Inline allowlist (like detect-secrets)
r'pragma:\s*allowlist',
r'nosec',
r'noqa',
]
    # Maximum line length to scan in one pass; longer lines are chunked
    MAX_LINE_LENGTH = 10000
def __init__(self, entropy_threshold: float = 4.5):
"""
Initialize scanner
Args:
entropy_threshold: Minimum entropy for detection
"""
self.entropy_threshold = entropy_threshold
self.patterns = SECRET_PATTERNS
logger.info(f"Scanner initialized with {len(self.patterns)} patterns")
def should_ignore_path(self, path: str) -> bool:
"""Check if path should be ignored"""
# Normalize path separators for cross-platform regex matching
normalized_path = path.replace('\\', '/')
for pattern in self.IGNORE_PATTERNS:
if re.search(pattern, normalized_path):
return True
return False
def is_likely_false_positive(self, value: str, context: str) -> bool:
"""
Check if finding is likely a false positive based on context
Args:
value: The detected secret value
context: Surrounding code context
Returns:
True if likely false positive
"""
        combined = f"{value} {context}"  # case handled by re.IGNORECASE below
for pattern in self.FALSE_POSITIVE_CONTEXTS:
if re.search(pattern, combined, re.IGNORECASE):
logger.debug(f"False positive detected: {pattern}")
return True
return False
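        # For example, is_likely_false_positive("abc123", 'token = "<API_KEY>"')
        # returns True via the '<[A-Z_]+>' placeholder pattern.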
def scan_content(self, content: str, file_path: str = "content") -> list[Finding]:
"""
Scan text content for secrets
Args:
content: Text content to scan
file_path: Virtual file path for reporting
Returns:
List of findings
"""
findings = []
lines = content.split('\n')
for line_num, line in enumerate(lines, 1):
            # Handle very long lines by chunking so a single pathological line
            # cannot stall pattern matching (stdlib re has no timeout)
            if len(line) > self.MAX_LINE_LENGTH:
                # Scan in overlapping chunks
chunk_findings = self._scan_long_line(line, line_num, file_path, lines)
findings.extend(chunk_findings)
continue
# Check each pattern
for pattern_info in self.patterns:
matches = pattern_info.pattern.finditer(line)
for match in matches:
secret_value = match.group()
# Calculate entropy
entropy = calculate_shannon_entropy(secret_value)
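                    # (Shannon entropy H = -sum(p_i * log2(p_i)) over the character
                    # distribution; random hex tops out at 4 bits/char and random
                    # base64 near 6, so high values suggest generated keys.)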
                    # Get context (3 lines before and after; line_num is 1-based,
                    # so the current line sits at index line_num - 1)
                    context_start = max(0, line_num - 4)
                    context_end = min(len(lines), line_num + 3)
context_lines = lines[context_start:context_end]
context_str = '\n'.join(context_lines)
# Check if false positive
is_fp = self.is_likely_false_positive(secret_value, context_str)
# Analyze randomness
analysis = analyze_string_randomness(secret_value)
finding = Finding(
type=pattern_info.name,
value=secret_value,
file_path=file_path,
line_number=line_num,
line_content=line.strip(),
severity=pattern_info.severity,
category=pattern_info.category,
description=pattern_info.description,
entropy=round(entropy, 2),
confidence=analysis['confidence'],
is_false_positive=is_fp,
context={
'entropy_category': analysis['category'],
'has_repeating_chars': analysis['has_repeating_chars'],
'has_common_words': analysis['has_common_words'],
}
)
findings.append(finding)
logger.debug(f"Found {finding.type} at line {line_num}")
return findings
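        # Typical call (values illustrative):
        #   findings = scanner.scan_content('key = "AKIA..."', "config.py")
        #   live = [f for f in findings if not f.is_false_positive]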
def _scan_long_line(
self,
line: str,
line_num: int,
file_path: str,
all_lines: list[str],
) -> list[Finding]:
"""
Scan a very long line by breaking it into overlapping chunks.
This prevents regex timeout on lines > MAX_LINE_LENGTH.
"""
findings = []
chunk_size = self.MAX_LINE_LENGTH
        # Overlap catches secrets at chunk boundaries; a secret longer than
        # the overlap that straddles a boundary can still be missed
        overlap = 200
seen_values = set() # avoid duplicate findings
pos = 0
while pos < len(line):
# get chunk with overlap
chunk_end = min(pos + chunk_size, len(line))
chunk = line[pos:chunk_end]
# scan chunk
for pattern_info in self.patterns:
matches = pattern_info.pattern.finditer(chunk)
for match in matches:
secret_value = match.group()
# skip duplicates
if secret_value in seen_values:
continue
seen_values.add(secret_value)
entropy = calculate_shannon_entropy(secret_value)
# get context
context_start = max(0, line_num - 4)
context_end = min(len(all_lines), line_num + 3)
context_lines = all_lines[context_start:context_end]
context_str = '\n'.join(context_lines)
is_fp = self.is_likely_false_positive(secret_value, context_str)
analysis = analyze_string_randomness(secret_value)
finding = Finding(
type=pattern_info.name,
value=secret_value,
file_path=file_path,
line_number=line_num,
line_content=f"[long line: {len(line)} chars]",
severity=pattern_info.severity,
category=pattern_info.category,
description=pattern_info.description,
entropy=round(entropy, 2),
confidence=analysis['confidence'],
is_false_positive=is_fp,
context={
'entropy_category': analysis['category'],
'has_repeating_chars': analysis['has_repeating_chars'],
'has_common_words': analysis['has_common_words'],
'chunk_position': pos,
}
)
findings.append(finding)
            # Move to the next chunk, stepping back by the overlap
            pos += chunk_size - overlap
            # Stop once the remaining tail was already covered by the previous chunk
            if pos >= len(line) - overlap:
                break
return findings
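        # E.g. with chunk_size=10000 and overlap=200, a 25,000-char line is
        # scanned as [0:10000], [9800:19800], [19600:25000]; each boundary
        # region is covered twice.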
def scan_file(
self,
file_path: str,
format_hint: Optional[str] = None,
profile: ScanProfile = "balanced",
use_dynamic: bool = True,
) -> list[Finding]:
"""
Scan a single file for secrets using dynamic format detection
Args:
file_path: Path to file to scan
format_hint: Optional format hint (e.g. "text:json", "archive:zip")
profile: Scan profile - "fast", "balanced", or "deep"
use_dynamic: Use dynamic format handler (default True)
Returns:
List of findings
"""
path = Path(file_path)
if not path.exists():
logger.error(f"File not found: {file_path}")
return []
if not path.is_file():
logger.error(f"Not a file: {file_path}")
return []
if self.should_ignore_path(str(path)):
logger.debug(f"Ignoring path: {file_path}")
return []
# use dynamic format handler
if use_dynamic:
logger.info(f"Scanning file (dynamic): {file_path}")
findings = scan_file_dynamic(
path=str(path),
scanner=self,
format_hint=format_hint,
profile=profile,
)
logger.info(f"Found {len(findings)} potential secrets in {file_path}")
return findings
# fallback: legacy text-only scan
try:
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
logger.info(f"Scanning file: {file_path}")
findings = self.scan_content(content, str(path))
logger.info(f"Found {len(findings)} potential secrets in {file_path}")
return findings
except Exception as e:
logger.error(f"Error reading {file_path}: {e}")
return []
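        # E.g. scanner.scan_file("deploy/app.env", profile="deep") runs a single
        # file through the dynamic format handlers (path illustrative).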
def scan_directory(
self,
directory_path: str,
exclude_patterns: Optional[list[str]] = None,
max_depth: Optional[int] = None,
profile: ScanProfile = "balanced",
) -> dict:
"""
Recursively scan directory for secrets
Args:
directory_path: Path to directory
exclude_patterns: Additional patterns to exclude
max_depth: Maximum directory depth
profile: Scan profile - "fast", "balanced", or "deep"
Returns:
Dict with findings and statistics
"""
path = Path(directory_path)
if not path.exists():
logger.error(f"Directory not found: {directory_path}")
return {'findings': [], 'stats': {}}
if not path.is_dir():
logger.error(f"Not a directory: {directory_path}")
return {'findings': [], 'stats': {}}
logger.info(f"Scanning directory: {directory_path}")
all_findings = []
files_scanned = 0
files_skipped = 0
# Walk directory
for root, dirs, files in os.walk(path):
            # Enforce max depth and stop descending past it
            if max_depth is not None:
                depth = len(Path(root).relative_to(path).parts)
                if depth > max_depth:
                    dirs[:] = []  # prune the walk below this directory
                    continue
            # Remove ignored directories from the walk; append '/' so the
            # directory patterns in IGNORE_PATTERNS can match
            dirs[:] = [
                d for d in dirs
                if not self.should_ignore_path(os.path.join(root, d) + '/')
            ]
for file in files:
file_path = os.path.join(root, file)
if self.should_ignore_path(file_path):
files_skipped += 1
continue
# Additional exclusions
if exclude_patterns:
if any(re.search(p, file_path) for p in exclude_patterns):
files_skipped += 1
continue
# use dynamic format handler for all files
findings = self.scan_file(file_path, profile=profile)
all_findings.extend(findings)
files_scanned += 1
stats = {
'files_scanned': files_scanned,
'files_skipped': files_skipped,
'total_findings': len(all_findings),
'findings_by_severity': self._count_by_severity(all_findings),
'findings_by_category': self._count_by_category(all_findings),
}
logger.info(f"Scan complete: {files_scanned} files, {len(all_findings)} findings")
return {
'findings': all_findings,
'stats': stats
}
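        # Shape of the result (counts illustrative):
        #   {"findings": [Finding, ...],
        #    "stats": {"files_scanned": 120, "files_skipped": 14,
        #              "total_findings": 3, "findings_by_severity": {...},
        #              "findings_by_category": {...}}}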
def _scan_zip_file(
self,
zip_path: str,
max_members: int = 1000,
max_member_size: int = 1024 * 1024,
) -> list[Finding]:
"""scan text files inside a zip archive"""
findings: list[Finding] = []
try:
with zipfile.ZipFile(zip_path, "r") as zf:
for idx, info in enumerate(zf.infolist()):
if idx >= max_members:
break
if info.is_dir():
continue
if info.file_size > max_member_size:
continue
if not is_scannable_file(info.filename):
continue
try:
with zf.open(info, "r") as fp:
data = fp.read()
except Exception as e:
logger.debug(f"skipping {info.filename} in {zip_path}: {e}")
continue
                    # errors="ignore" means this decode cannot raise
                    text = data.decode("utf-8", errors="ignore")
virtual_path = f"{zip_path}:{info.filename}"
findings.extend(self.scan_content(text, virtual_path))
except Exception as e:
logger.error(f"error scanning zip archive {zip_path}: {e}")
return findings
def _count_by_severity(self, findings: list[Finding]) -> dict:
"""Count findings by severity (uses shared utility)"""
return count_by_severity(findings)
def _count_by_category(self, findings: list[Finding]) -> dict:
"""Count findings by category (uses shared utility)"""
return count_by_category(findings)
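

if __name__ == "__main__":
    # Minimal usage sketch; the target path and output format here are
    # illustrative, not part of the scanner's API contract. Run as a module
    # so the relative imports at the top of this file resolve.
    import sys

    scanner = SecretScanner()
    target = sys.argv[1] if len(sys.argv) > 1 else "."
    result = scanner.scan_directory(target)
    for f in result["findings"]:
        if not f.is_false_positive:
            print(f"{f.file_path}:{f.line_number} {f.type} ({f.severity})")
    print(result["stats"])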