"""
dynamic format handler for credential-free mcp scanner
classifies files by format and routes to appropriate handlers
"""
from __future__ import annotations
import gzip
import io
import json
import logging
import tarfile
import zipfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional
# optional dependencies
try:
import yaml
YAML_AVAILABLE = True
except ImportError:
YAML_AVAILABLE = False
try:
import tree_sitter
import tree_sitter_python
import tree_sitter_javascript
TREE_SITTER_AVAILABLE = True
except ImportError:
TREE_SITTER_AVAILABLE = False
if TYPE_CHECKING:
from .scanner import Finding, SecretScanner
logger = logging.getLogger(__name__)
# scan profiles
ScanProfile = Literal["fast", "balanced", "deep"]
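# "fast" skips yaml structure parsing, "deep" enables tree-sitter ast scanning
# for code files; "balanced" sits in between (see the handlers below)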
# ==================== FORMAT DEFINITIONS ====================
@dataclass
class FormatInfo:
"""metadata about a detected format"""
format_id: str # e.g. "text:plain", "text:json", "archive:zip"
category: str # "text", "archive", "binary"
subtype: str # "plain", "json", "yaml", "zip", etc.
confidence: float = 1.0 # 0.0-1.0 how sure we are
metadata: dict = field(default_factory=dict)
@dataclass
class FileContext:
"""context passed to format handlers"""
path: str
virtual_path: str # for archives: "archive.zip:inner/file.js"
raw_bytes: bytes
format_info: FormatInfo
scanner: "SecretScanner"
# scan profile
profile: ScanProfile = "balanced"
# limits
max_file_size: int = 5 * 1024 * 1024 # 5mb default
max_archive_members: int = 1000
max_member_size: int = 1024 * 1024 # 1mb per archive member
# ==================== FORMAT CLASSIFIER ====================
# extension to format mapping
EXTENSION_MAP: dict[str, str] = {
# text: plain
".txt": "text:plain",
".log": "text:plain",
".md": "text:plain",
".csv": "text:plain",
".sql": "text:plain",
".graphql": "text:plain",
# text: json
".json": "text:json",
# text: yaml
".yaml": "text:yaml",
".yml": "text:yaml",
# text: config
".env": "text:env",
".ini": "text:ini",
".cfg": "text:ini",
".conf": "text:ini",
".toml": "text:toml",
# code (treat as text with language hint)
".py": "code:python",
".js": "code:javascript",
".ts": "code:typescript",
".jsx": "code:javascript",
".tsx": "code:typescript",
".mjs": "code:javascript",
".cjs": "code:javascript",
".go": "code:go",
".rs": "code:rust",
".rb": "code:ruby",
".php": "code:php",
".java": "code:java",
".kt": "code:kotlin",
".swift": "code:swift",
".c": "code:c",
".cpp": "code:cpp",
".h": "code:c",
".sh": "code:shell",
".bash": "code:shell",
".zsh": "code:shell",
".ps1": "code:powershell",
".bat": "code:batch",
".cmd": "code:batch",
# markup
".xml": "text:xml",
".html": "text:html",
".htm": "text:html",
".vue": "code:vue",
".svelte": "code:svelte",
# infra
".tf": "code:terraform",
".tfvars": "text:tfvars",
".dockerfile": "code:dockerfile",
# archives
".zip": "archive:zip",
".tar": "archive:tar",
".gz": "archive:gzip",
".tgz": "archive:targz",
".tar.gz": "archive:targz",
# binary (skip)
".exe": "binary:executable",
".dll": "binary:library",
".so": "binary:library",
".dylib": "binary:library",
".bin": "binary:data",
".pyc": "binary:bytecode",
".pyo": "binary:bytecode",
".class": "binary:bytecode",
".o": "binary:object",
".a": "binary:archive",
# media (skip)
".jpg": "binary:image",
".jpeg": "binary:image",
".png": "binary:image",
".gif": "binary:image",
".bmp": "binary:image",
".ico": "binary:image",
".svg": "text:svg",
".pdf": "binary:document",
".doc": "binary:document",
".docx": "binary:document",
".xls": "binary:document",
".xlsx": "binary:document",
".mp3": "binary:audio",
".mp4": "binary:video",
".avi": "binary:video",
".mov": "binary:video",
".wav": "binary:audio",
".rar": "archive:rar",
".7z": "archive:7z",
}
# magic bytes for content sniffing
MAGIC_BYTES: list[tuple[bytes, str]] = [
(b"PK\x03\x04", "archive:zip"),
(b"\x1f\x8b", "archive:gzip"),
(b"Rar!", "archive:rar"),
(b"7z\xbc\xaf", "archive:7z"),
(b"\x89PNG", "binary:image"),
(b"\xff\xd8\xff", "binary:image"),
(b"GIF8", "binary:image"),
(b"%PDF", "binary:document"),
]
def classify_format(
path: str,
first_bytes: Optional[bytes] = None,
hint: Optional[str] = None,
) -> FormatInfo:
"""
classify file format based on extension, content, and optional hint
args:
path: file path (uses extension)
first_bytes: optional first bytes for content sniffing
hint: optional format hint to override detection
returns:
FormatInfo with detected format
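    examples (illustrative):
        >>> classify_format("backup.tar.gz").format_id
        'archive:targz'
        >>> classify_format("notes.json", first_bytes=b'{"a": 1}').confidence
        0.8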
"""
# if caller provides hint, use it
if hint:
cat, sub = _parse_format_id(hint)
return FormatInfo(format_id=hint, category=cat, subtype=sub, confidence=1.0)
    p = Path(path)
    ext = p.suffix.lower()
    # dotfiles like ".env" have an empty suffix under pathlib, so fall back to
    # the full filename to let the EXTENSION_MAP entry match
    if not ext and p.name.lower() in EXTENSION_MAP:
        ext = p.name.lower()
# check for double extensions like .tar.gz
if len(p.suffixes) >= 2:
double_ext = "".join(p.suffixes[-2:]).lower()
if double_ext in EXTENSION_MAP:
format_id = EXTENSION_MAP[double_ext]
cat, sub = _parse_format_id(format_id)
return FormatInfo(format_id=format_id, category=cat, subtype=sub, confidence=0.9)
# check magic bytes if available
if first_bytes:
for magic, format_id in MAGIC_BYTES:
if first_bytes.startswith(magic):
cat, sub = _parse_format_id(format_id)
return FormatInfo(format_id=format_id, category=cat, subtype=sub, confidence=0.95)
# check extension map
if ext in EXTENSION_MAP:
format_id = EXTENSION_MAP[ext]
cat, sub = _parse_format_id(format_id)
return FormatInfo(format_id=format_id, category=cat, subtype=sub, confidence=0.8)
# content sniff for text vs binary
if first_bytes:
if _looks_like_text(first_bytes):
# try to detect json/yaml from content
if _looks_like_json(first_bytes):
return FormatInfo(format_id="text:json", category="text", subtype="json", confidence=0.7)
if _looks_like_yaml(first_bytes):
return FormatInfo(format_id="text:yaml", category="text", subtype="yaml", confidence=0.6)
return FormatInfo(format_id="text:plain", category="text", subtype="plain", confidence=0.5)
else:
return FormatInfo(format_id="binary:unknown", category="binary", subtype="unknown", confidence=0.5)
# default: assume text
return FormatInfo(format_id="text:plain", category="text", subtype="plain", confidence=0.3)
def _parse_format_id(format_id: str) -> tuple[str, str]:
"""parse format_id into (category, subtype)"""
if ":" in format_id:
parts = format_id.split(":", 1)
return parts[0], parts[1]
return format_id, "unknown"
def _looks_like_text(data: bytes) -> bool:
    """check if bytes look like text content"""
    if not data:
        return True
    sample = data[:1024]
    # a strict utf-8 decode succeeding is a strong signal of text
    try:
        sample.decode("utf-8", errors="strict")
        return True
    except UnicodeDecodeError:
        pass
    # fallback: binary formats tend to contain null bytes, text rarely does
    return sample.count(b"\x00") < len(sample) * 0.1
def _looks_like_json(data: bytes) -> bool:
"""check if content looks like json"""
try:
text = data[:256].decode("utf-8", errors="ignore").strip()
        return text.startswith(("{", "["))
except Exception:
return False
def _looks_like_yaml(data: bytes) -> bool:
"""check if content looks like yaml"""
try:
text = data[:256].decode("utf-8", errors="ignore").strip()
# yaml often starts with --- or key:
if text.startswith("---"):
return True
lines = text.split("\n")[:5]
for line in lines:
if ":" in line and not line.strip().startswith("#"):
return True
return False
except Exception:
return False
# ==================== FORMAT HANDLERS ====================
# handler type: takes FileContext, returns list of findings
FormatHandler = Callable[["FileContext"], list["Finding"]]
# handler registry
_handlers: dict[str, FormatHandler] = {}
def register_handler(format_pattern: str) -> Callable[[FormatHandler], FormatHandler]:
"""decorator to register a format handler"""
def decorator(func: FormatHandler) -> FormatHandler:
_handlers[format_pattern] = func
return func
return decorator
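# example: registering a custom handler (a hypothetical sketch; unregistered
# text subtypes already fall through to the "text:*" handler, so an override
# like this is only needed for format-specific logic):
#
#     @register_handler("text:toml")
#     def handle_toml(ctx: FileContext) -> list["Finding"]:
#         text = ctx.raw_bytes.decode("utf-8", errors="ignore")
#         return ctx.scanner.scan_content(text, ctx.virtual_path)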
def get_handler(format_id: str) -> Optional[FormatHandler]:
"""get handler for a format, with fallback to category handler"""
# exact match
if format_id in _handlers:
return _handlers[format_id]
# category match (e.g. "text:*" matches "text:json")
cat, _ = _parse_format_id(format_id)
category_key = f"{cat}:*"
if category_key in _handlers:
return _handlers[category_key]
return None
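# e.g. get_handler("text:json") returns the exact handle_json registration,
# while get_handler("code:rust") falls back to the "code:*" category handler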
# ==================== BUILT-IN HANDLERS ====================
@register_handler("text:*")
def handle_text(ctx: FileContext) -> list["Finding"]:
"""handle plain text files"""
try:
text = ctx.raw_bytes.decode("utf-8", errors="ignore")
return ctx.scanner.scan_content(text, ctx.virtual_path)
except Exception as e:
logger.error(f"error handling text file {ctx.path}: {e}")
return []
@register_handler("code:*")
def handle_code(ctx: FileContext) -> list["Finding"]:
"""handle code files with optional language-aware scanning"""
try:
text = ctx.raw_bytes.decode("utf-8", errors="ignore")
findings = ctx.scanner.scan_content(text, ctx.virtual_path)
# deep profile: use tree-sitter for language-aware scanning
if ctx.profile == "deep" and TREE_SITTER_AVAILABLE:
lang = ctx.format_info.subtype
if lang in ("python", "javascript", "typescript"):
ast_findings = _scan_code_ast(text, ctx.virtual_path, ctx.scanner, lang)
findings.extend(ast_findings)
return findings
except Exception as e:
logger.error(f"error handling code file {ctx.path}: {e}")
return []
# tree-sitter parsers (lazy init)
_ts_parsers: dict[str, Any] = {}
def _get_ts_parser(lang: str) -> Optional[Any]:
"""get or create tree-sitter parser for language"""
if not TREE_SITTER_AVAILABLE:
return None
if lang in _ts_parsers:
return _ts_parsers[lang]
try:
parser = tree_sitter.Parser()
if lang == "python":
parser.language = tree_sitter.Language(tree_sitter_python.language())
elif lang in ("javascript", "typescript"):
parser.language = tree_sitter.Language(tree_sitter_javascript.language())
else:
return None
_ts_parsers[lang] = parser
return parser
except Exception as e:
logger.debug(f"failed to init tree-sitter for {lang}: {e}")
return None
def _scan_code_ast(
code: str,
file_path: str,
scanner: "SecretScanner",
lang: str,
) -> list["Finding"]:
"""scan code using tree-sitter ast to find strings in sensitive contexts"""
findings: list["Finding"] = []
parser = _get_ts_parser(lang)
if not parser:
return findings
try:
tree = parser.parse(code.encode("utf-8"))
root = tree.root_node
# find string literals in assignments that look like secrets
sensitive_patterns = [
"api_key", "apikey", "secret", "password", "token",
"credential", "auth", "private_key", "access_key",
]
def visit(node: Any, context: str = ""):
# track assignment context
if node.type in ("assignment", "variable_declarator", "pair"):
# get the left side (variable name)
for child in node.children:
if child.type in ("identifier", "property_identifier"):
context = child.text.decode("utf-8", errors="ignore").lower()
break
# check string literals
if node.type in ("string", "string_literal", "template_string"):
value = node.text.decode("utf-8", errors="ignore")
                # strip matching quotes (backticks included, for template strings)
                if len(value) >= 2 and value[0] in "\"'`" and value[-1] == value[0]:
                    value = value[1:-1]
# if in sensitive context, scan the value
if any(p in context for p in sensitive_patterns) and len(value) > 8:
line = code[:node.start_byte].count("\n") + 1
virtual_path = f"{file_path}:ast:{context}:L{line}"
findings.extend(scanner.scan_content(value, virtual_path))
# recurse
for child in node.children:
visit(child, context)
visit(root)
except Exception as e:
logger.debug(f"tree-sitter scan failed for {file_path}: {e}")
return findings
@register_handler("text:json")
def handle_json(ctx: FileContext) -> list["Finding"]:
"""handle json files with structure-aware scanning"""
findings: list["Finding"] = []
try:
text = ctx.raw_bytes.decode("utf-8", errors="ignore")
# first do normal text scan
findings.extend(ctx.scanner.scan_content(text, ctx.virtual_path))
# then do structure-aware scan on string values
try:
data = json.loads(text)
findings.extend(_scan_json_structure(data, ctx.virtual_path, ctx.scanner))
except json.JSONDecodeError:
pass # already scanned as text
except Exception as e:
logger.error(f"error handling json file {ctx.path}: {e}")
return findings
def _scan_json_structure(
data: Any,
base_path: str,
scanner: "SecretScanner",
current_path: str = "",
depth: int = 0,
max_depth: int = 20,
) -> list["Finding"]:
"""recursively scan json structure for secrets in string values"""
if depth > max_depth:
return []
findings: list["Finding"] = []
if isinstance(data, dict):
for key, value in data.items():
key_path = f"{current_path}.{key}" if current_path else key
if isinstance(value, str) and len(value) > 8:
# scan the value with path context
virtual_path = f"{base_path}:{key_path}"
findings.extend(scanner.scan_content(value, virtual_path))
elif isinstance(value, (dict, list)):
findings.extend(_scan_json_structure(value, base_path, scanner, key_path, depth + 1, max_depth))
elif isinstance(data, list):
for idx, item in enumerate(data):
item_path = f"{current_path}[{idx}]"
if isinstance(item, str) and len(item) > 8:
virtual_path = f"{base_path}:{item_path}"
findings.extend(scanner.scan_content(item, virtual_path))
elif isinstance(item, (dict, list)):
findings.extend(_scan_json_structure(item, base_path, scanner, item_path, depth + 1, max_depth))
return findings
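# e.g. a hit inside {"db": {"password": "..."}} in config.json is reported
# with virtual path "config.json:db.password"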
@register_handler("text:yaml")
def handle_yaml(ctx: FileContext) -> list["Finding"]:
"""handle yaml files with structure-aware scanning"""
findings: list["Finding"] = []
try:
text = ctx.raw_bytes.decode("utf-8", errors="ignore")
# always do text scan first
findings.extend(ctx.scanner.scan_content(text, ctx.virtual_path))
# structure-aware scan if yaml available and not in fast mode
if YAML_AVAILABLE and ctx.profile != "fast":
try:
data = yaml.safe_load(text)
if isinstance(data, (dict, list)):
findings.extend(_scan_yaml_structure(data, ctx.virtual_path, ctx.scanner))
except yaml.YAMLError:
pass # already scanned as text
except Exception as e:
logger.error(f"error handling yaml file {ctx.path}: {e}")
return findings
def _scan_yaml_structure(
data: Any,
base_path: str,
scanner: "SecretScanner",
current_path: str = "",
depth: int = 0,
max_depth: int = 20,
) -> list["Finding"]:
"""recursively scan yaml structure for secrets in string values"""
if depth > max_depth:
return []
findings: list["Finding"] = []
if isinstance(data, dict):
for key, value in data.items():
key_path = f"{current_path}.{key}" if current_path else str(key)
if isinstance(value, str) and len(value) > 8:
virtual_path = f"{base_path}:{key_path}"
findings.extend(scanner.scan_content(value, virtual_path))
elif isinstance(value, (dict, list)):
findings.extend(_scan_yaml_structure(value, base_path, scanner, key_path, depth + 1, max_depth))
elif isinstance(data, list):
for idx, item in enumerate(data):
item_path = f"{current_path}[{idx}]"
if isinstance(item, str) and len(item) > 8:
virtual_path = f"{base_path}:{item_path}"
findings.extend(scanner.scan_content(item, virtual_path))
elif isinstance(item, (dict, list)):
findings.extend(_scan_yaml_structure(item, base_path, scanner, item_path, depth + 1, max_depth))
return findings
@register_handler("text:env")
def handle_env(ctx: FileContext) -> list["Finding"]:
"""handle .env files with key=value awareness"""
try:
text = ctx.raw_bytes.decode("utf-8", errors="ignore")
# normal text scan catches most patterns
return ctx.scanner.scan_content(text, ctx.virtual_path)
except Exception as e:
logger.error(f"error handling env file {ctx.path}: {e}")
return []
@register_handler("archive:zip")
def handle_zip(ctx: FileContext) -> list["Finding"]:
"""handle zip archives by scanning text files inside"""
from .utils import is_scannable_file
findings: list["Finding"] = []
try:
        with zipfile.ZipFile(io.BytesIO(ctx.raw_bytes), "r") as zf:
for idx, info in enumerate(zf.infolist()):
if idx >= ctx.max_archive_members:
logger.warning(f"zip archive {ctx.path} has too many members, stopping at {idx}")
break
if info.is_dir():
continue
if info.file_size > ctx.max_member_size:
logger.debug(f"skipping large file {info.filename} in {ctx.path}")
continue
if not is_scannable_file(info.filename):
continue
try:
with zf.open(info, "r") as fp:
member_bytes = fp.read()
except Exception as e:
logger.debug(f"skipping {info.filename} in {ctx.path}: {e}")
continue
# classify and handle the member
member_format = classify_format(info.filename, member_bytes[:512])
virtual_path = f"{ctx.path}:{info.filename}"
                member_ctx = FileContext(
                    path=ctx.path,
                    virtual_path=virtual_path,
                    raw_bytes=member_bytes,
                    format_info=member_format,
                    scanner=ctx.scanner,
                    profile=ctx.profile,
                    max_file_size=ctx.max_file_size,
                    max_archive_members=ctx.max_archive_members,
                    max_member_size=ctx.max_member_size,
                )
handler = get_handler(member_format.format_id)
if handler and member_format.category != "archive": # no nested archives
findings.extend(handler(member_ctx))
                elif member_format.category in ("text", "code"):
findings.extend(handle_text(member_ctx))
except zipfile.BadZipFile:
logger.error(f"invalid zip file: {ctx.path}")
except Exception as e:
logger.error(f"error handling zip archive {ctx.path}: {e}")
return findings
@register_handler("archive:tar")
def handle_tar(ctx: FileContext) -> list["Finding"]:
"""handle tar archives by scanning text files inside"""
from .utils import is_scannable_file
findings: list["Finding"] = []
try:
with tarfile.open(fileobj=io.BytesIO(ctx.raw_bytes), mode="r:*") as tf:
for idx, member in enumerate(tf.getmembers()):
if idx >= ctx.max_archive_members:
logger.warning(f"tar archive {ctx.path} has too many members, stopping at {idx}")
break
if not member.isfile():
continue
if member.size > ctx.max_member_size:
logger.debug(f"skipping large file {member.name} in {ctx.path}")
continue
if not is_scannable_file(member.name):
continue
try:
fp = tf.extractfile(member)
if fp is None:
continue
member_bytes = fp.read()
except Exception as e:
logger.debug(f"skipping {member.name} in {ctx.path}: {e}")
continue
member_format = classify_format(member.name, member_bytes[:512])
virtual_path = f"{ctx.path}:{member.name}"
member_ctx = FileContext(
path=ctx.path,
virtual_path=virtual_path,
raw_bytes=member_bytes,
format_info=member_format,
scanner=ctx.scanner,
profile=ctx.profile,
max_file_size=ctx.max_file_size,
max_archive_members=ctx.max_archive_members,
max_member_size=ctx.max_member_size,
)
handler = get_handler(member_format.format_id)
if handler and member_format.category != "archive":
findings.extend(handler(member_ctx))
elif member_format.category in ("text", "code"):
findings.extend(handle_text(member_ctx))
except tarfile.TarError as e:
logger.error(f"invalid tar file {ctx.path}: {e}")
except Exception as e:
logger.error(f"error handling tar archive {ctx.path}: {e}")
return findings
@register_handler("archive:gzip")
def handle_gzip(ctx: FileContext) -> list["Finding"]:
"""handle gzip files by decompressing and scanning content"""
findings: list["Finding"] = []
try:
decompressed = gzip.decompress(ctx.raw_bytes)
        # check if it's a tar inside (tar.gz): the ustar magic sits at offset
        # 257 of the first 512-byte header block, never at the start
        if len(decompressed) > 262 and decompressed[257:262] == b"ustar":
# it's a tar.gz, handle as tar
inner_ctx = FileContext(
path=ctx.path,
virtual_path=ctx.virtual_path,
raw_bytes=decompressed,
format_info=FormatInfo("archive:tar", "archive", "tar"),
scanner=ctx.scanner,
profile=ctx.profile,
max_file_size=ctx.max_file_size,
max_archive_members=ctx.max_archive_members,
max_member_size=ctx.max_member_size,
)
return handle_tar(inner_ctx)
# otherwise scan as text
inner_name = Path(ctx.path).stem # remove .gz
inner_format = classify_format(inner_name, decompressed[:512])
virtual_path = f"{ctx.path}:{inner_name}"
inner_ctx = FileContext(
path=ctx.path,
virtual_path=virtual_path,
raw_bytes=decompressed,
format_info=inner_format,
scanner=ctx.scanner,
profile=ctx.profile,
max_file_size=ctx.max_file_size,
max_archive_members=ctx.max_archive_members,
max_member_size=ctx.max_member_size,
)
handler = get_handler(inner_format.format_id)
if handler:
findings.extend(handler(inner_ctx))
elif inner_format.category != "binary":
findings.extend(handle_text(inner_ctx))
except gzip.BadGzipFile:
logger.error(f"invalid gzip file: {ctx.path}")
except Exception as e:
logger.error(f"error handling gzip file {ctx.path}: {e}")
return findings
@register_handler("archive:targz")
def handle_targz(ctx: FileContext) -> list["Finding"]:
"""handle .tar.gz files"""
return handle_gzip(ctx)
@register_handler("archive:*")
def handle_archive_fallback(ctx: FileContext) -> list["Finding"]:
"""fallback for unsupported archive formats"""
logger.info(f"archive format {ctx.format_info.format_id} not supported yet: {ctx.path}")
return []
@register_handler("binary:*")
def handle_binary(ctx: FileContext) -> list["Finding"]:
"""skip binary files"""
logger.debug(f"skipping binary file: {ctx.path}")
return []
# ==================== MAIN ENTRY POINT ====================
def scan_file_dynamic(
path: str,
scanner: "SecretScanner",
format_hint: Optional[str] = None,
profile: ScanProfile = "balanced",
max_file_size: int = 5 * 1024 * 1024,
max_archive_members: int = 1000,
max_member_size: int = 1024 * 1024,
) -> list["Finding"]:
"""
scan a file using dynamic format detection and handlers
args:
path: file path to scan
scanner: SecretScanner instance
format_hint: optional format hint to override detection
profile: scan profile - "fast" (text only), "balanced" (default), "deep" (ast)
max_file_size: max file size to read
max_archive_members: max members to scan in archives
max_member_size: max size per archive member
returns:
list of findings
"""
p = Path(path)
if not p.exists():
logger.error(f"file not found: {path}")
return []
if not p.is_file():
logger.error(f"not a file: {path}")
return []
# check file size
try:
size = p.stat().st_size
if size > max_file_size:
logger.warning(f"file too large ({size} bytes), skipping: {path}")
return []
except OSError as e:
logger.error(f"cannot stat file {path}: {e}")
return []
# read file
try:
with open(p, "rb") as f:
raw_bytes = f.read()
except Exception as e:
logger.error(f"cannot read file {path}: {e}")
return []
# classify format
format_info = classify_format(path, raw_bytes[:512], format_hint)
logger.debug(f"classified {path} as {format_info.format_id} (confidence {format_info.confidence})")
# build context
ctx = FileContext(
path=str(p),
virtual_path=str(p),
raw_bytes=raw_bytes,
format_info=format_info,
scanner=scanner,
profile=profile,
max_file_size=max_file_size,
max_archive_members=max_archive_members,
max_member_size=max_member_size,
)
# get handler and run
handler = get_handler(format_info.format_id)
if handler:
return handler(ctx)
# fallback to text handler for unknown formats
if format_info.category != "binary":
return handle_text(ctx)
return []
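if __name__ == "__main__":
    # minimal smoke test for the classifier only; the full scan path needs a
    # SecretScanner instance from .scanner, which is out of scope here
    import sys
    for arg in sys.argv[1:]:
        head: Optional[bytes] = None
        try:
            with open(arg, "rb") as f:
                head = f.read(512)
        except OSError as e:
            logger.warning(f"cannot read {arg}: {e}")
        info = classify_format(arg, head)
        print(f"{arg}: {info.format_id} (confidence {info.confidence:.2f})")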