"""
Shared Utilities for Secret Scanner
Provides common functions used across modules
"""
import logging
import os
from collections import OrderedDict
from pathlib import Path
from typing import Any, Optional
# ==================== LOGGING SETUP ====================
def setup_logging(level: int = logging.INFO) -> logging.Logger:
    """
    Configure application logging; intended to be called once at startup

    The root logger is only configured when it has no handlers yet, so a
    repeated call (or a host that already set up logging) is left untouched.

    Args:
        level: Logging level passed to the root configuration
    Returns:
        The logger named after this module (``__name__``)
    """
    root_is_configured = bool(logging.root.handlers)
    if not root_is_configured:
        # StreamHandler() without a stream argument writes to stderr,
        # which keeps stdout clean (MCP compliance).
        stderr_handler = logging.StreamHandler()
        logging.basicConfig(
            level=level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[stderr_handler],
        )
    return logging.getLogger(__name__)
# ==================== STATISTICS ====================
def count_by_severity(findings: list, severity_key: str = 'severity') -> dict:
    """
    Count findings by severity level, skipping false positives

    Each finding is read either through attributes (``getattr``) or through
    ``.get``. Attribute access is used when the finding has an
    ``is_false_positive`` attribute, and also as the fallback for objects
    that offer no ``.get`` method.

    Args:
        findings: List of Finding objects or dicts
        severity_key: Key/attribute name for severity
    Returns:
        Dict with counts per severity. CRITICAL/HIGH/MEDIUM/LOW are always
        present; any other severity value is added when encountered.
        A finding without a severity counts as MEDIUM.
    """
    counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
    for finding in findings:
        # Objects lacking both the flag attribute and ``.get`` used to raise
        # AttributeError; they are now read via getattr with defaults.
        use_attributes = (
            hasattr(finding, 'is_false_positive')
            or not hasattr(finding, 'get')
        )
        if use_attributes:
            if getattr(finding, 'is_false_positive', False):
                continue
            severity = getattr(finding, severity_key, 'MEDIUM')
        else:
            if finding.get('is_false_positive', False):
                continue
            severity = finding.get(severity_key, 'MEDIUM')
        counts[severity] = counts.get(severity, 0) + 1
    return counts
def count_by_category(findings: list, category_key: str = 'category') -> dict:
    """
    Count findings by category, skipping false positives

    Each finding is read either through attributes (``getattr``) or through
    ``.get``. Attribute access is used when the finding has an
    ``is_false_positive`` attribute, and also as the fallback for objects
    that offer no ``.get`` method.

    Args:
        findings: List of Finding objects or dicts
        category_key: Key/attribute name for category
    Returns:
        Dict with counts per category; a finding without a category is
        counted under 'unknown'
    """
    counts = {}
    for finding in findings:
        # Objects lacking both the flag attribute and ``.get`` used to raise
        # AttributeError; they are now read via getattr with defaults.
        use_attributes = (
            hasattr(finding, 'is_false_positive')
            or not hasattr(finding, 'get')
        )
        if use_attributes:
            if getattr(finding, 'is_false_positive', False):
                continue
            category = getattr(finding, category_key, 'unknown')
        else:
            if finding.get('is_false_positive', False):
                continue
            category = finding.get(category_key, 'unknown')
        counts[category] = counts.get(category, 0) + 1
    return counts
# ==================== PATH VALIDATION ====================
def validate_path(path: str, must_exist: bool = True, allow_relative: bool = False) -> tuple[bool, str]:
    """
    Validate a file/directory path for security

    Args:
        path: Path to validate
        must_exist: Check if path exists
        allow_relative: When False, reject any path containing a ``..``
            segment. Relative paths without ``..`` are accepted either way.
    Returns:
        Tuple of (is_valid, error_message); error_message is "" when valid
    """
    if not path:
        return False, "Path cannot be empty"

    p = Path(path)

    # resolve() is called only to surface malformed paths; its result is not
    # needed. RuntimeError covers symlink loops on Python versions before 3.13.
    try:
        p.resolve()
    except (ValueError, OSError, RuntimeError) as e:
        return False, f"Invalid path: {e}"

    if not allow_relative:
        # Compare whole segments, not substrings, so names such as
        # "report..final.txt" are not mistaken for traversal. Both separator
        # styles are split so "..\\x" is still rejected on every platform.
        segments = str(path).replace('\\', '/').split('/')
        if '..' in segments:
            return False, "Path traversal not allowed"

    if must_exist and not p.exists():
        return False, f"Path does not exist: {path}"

    return True, ""
def is_safe_path(path: str, base_dir: Optional[str] = None) -> bool:
    """
    Check if a path is safe (no traversal attacks)

    Both paths are resolved (``..`` segments and symlinks collapsed) before
    comparison, so the check applies to the final location.

    Args:
        path: Path to check
        base_dir: Optional base directory to restrict to; when omitted or
            empty, any resolvable path is considered safe
    Returns:
        True if the path resolves and, when base_dir is given, lies at or
        below base_dir; False otherwise or when a path cannot be resolved
    """
    try:
        resolved = Path(path).resolve()
        if not base_dir:
            return True
        base_resolved = Path(base_dir).resolve()
    except (ValueError, OSError, RuntimeError):
        # RuntimeError: symlink loops on Python versions before 3.13
        return False
    # Component-wise containment. A plain string prefix test would accept
    # siblings such as "/srv/app-secrets" for base "/srv/app".
    return resolved.is_relative_to(base_resolved)
# ==================== BOUNDED CACHE ====================
class BoundedCache(OrderedDict):
    """
    An ordered dict with a maximum size

    When a write pushes the size above ``max_size``, entries are evicted from
    the front of the order. Writing a key (new or existing) moves it to the
    end; reads do not change the order, so eviction follows the
    least-recently-written key rather than strict LRU.

    Usage:
        cache = BoundedCache(max_size=100)
        cache['key'] = 'value'
    """

    def __init__(self, max_size: int = 100, *args, **kwargs):
        # Set before super().__init__ because populating initial items may go
        # through __setitem__, which reads max_size.
        self.max_size = max_size
        super().__init__(*args, **kwargs)

    def __setitem__(self, key: Any, value: Any):
        # Re-writing an existing key refreshes its position
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        # Evict from the front while over the limit. The emptiness guard
        # keeps a negative max_size from popping an empty cache.
        while self and len(self) > self.max_size:
            self.popitem(last=False)

    def copy(self):
        """Return a shallow copy that keeps the same max_size"""
        # The inherited copy() does not pass max_size to the new instance.
        return type(self)(self.max_size, self)

    def get_stats(self) -> dict:
        """Get cache statistics (size, max_size, utilization as a 0-1 ratio)"""
        current_size = len(self)
        return {
            'size': current_size,
            'max_size': self.max_size,
            'utilization': current_size / self.max_size if self.max_size > 0 else 0
        }
# ==================== SECRET MASKING ====================
def mask_secret(value: str, visible_chars: int = 4) -> str:
    """
    Mask a secret value for safe display

    Args:
        value: Secret value to mask
        visible_chars: Number of chars to show at start/end; a value of
            zero or less shows nothing
    Returns:
        Masked string. Empty values, values of 4 chars or fewer within the
        short-secret range, and non-positive visible_chars all yield "****".
    """
    if not value or visible_chars <= 0:
        # A non-positive count must never reach the slicing below:
        # value[-0:] is the whole string and would leak the entire secret.
        return "****"
    if len(value) <= visible_chars * 2:
        # Short secret: showing visible_chars per side would reveal all of
        # it, so show 2 chars per side, or nothing for 4 chars or fewer.
        # NOTE(review): a 5-char secret still exposes 4 of its 5 chars here;
        # kept as-is for output compatibility.
        if len(value) > 4:
            return value[:2] + "****" + value[-2:]
        return "****"
    return value[:visible_chars] + "****" + value[-visible_chars:]
# ==================== FILE TYPE DETECTION ====================
# Extensions treated as text that is worth scanning.
# NOTE(review): multi-dot entries such as '.env.local' cannot equal a
# pathlib ``Path.suffix`` value (which holds only the last dot-component) —
# confirm how callers match against this set.
SCANNABLE_EXTENSIONS = {
    # Python / JavaScript / TypeScript
    '.py',
    '.js', '.ts', '.jsx', '.tsx', '.mjs', '.cjs',
    # Structured config
    '.json', '.yml', '.yaml', '.toml',
    '.ini', '.cfg', '.conf',
    # Environment files
    '.env',
    '.env.local', '.env.development', '.env.production',
    # Markup and front-end components
    '.xml', '.html', '.htm',
    '.vue', '.svelte',
    # Shell and batch scripts
    '.sh', '.bash', '.zsh', '.fish',
    '.ps1', '.bat', '.cmd',
    # Other programming languages
    '.rb', '.php', '.go', '.java', '.kt', '.swift', '.rs',
    '.c', '.cpp', '.h',
    # Documents, data and queries
    '.md', '.txt', '.csv',
    '.sql', '.graphql',
    # Infrastructure
    '.dockerfile', '.tf', '.tfvars',
}

# Extensions that are never scanned.
BINARY_EXTENSIONS = {
    # Executables and native libraries
    '.exe', '.dll', '.so', '.dylib', '.bin',
    # Images
    '.jpg', '.jpeg', '.png', '.gif',
    '.bmp', '.ico', '.svg',
    # Office documents
    '.pdf',
    '.doc', '.docx', '.xls', '.xlsx',
    # Archives
    '.zip', '.tar', '.gz', '.rar', '.7z',
    # Audio / video
    '.mp3', '.mp4', '.avi', '.mov', '.wav',
    # Compiled artifacts
    '.pyc', '.pyo', '.class',
    '.o', '.a',
}
def is_scannable_file(path: str) -> bool:
    """
    Decide from the file extension whether a file should be scanned

    Args:
        path: File path
    Returns:
        False for a known binary extension, True for everything else
    """
    suffix = Path(path).suffix.lower()
    # Only the binary list can exclude a file: known text extensions,
    # unknown extensions and extension-less names are all scanned.
    return suffix not in BINARY_EXTENSIONS