file_system_exposure_engine.py (15.9 kB)
from engines.base_engine import BaseEngine
from typing import Any
import re
from utils import safe_print


class FileSystemExposureEngine(BaseEngine):
    """
    YARA-rule-based File System Exposure detection engine.

    Detection criteria:
    1. System path keywords (Windows/Linux/Mac)
    2. Dangerous file extensions
    3. Path depth
    4. Sensitive keywords
    """

    def __init__(self, db):
        super().__init__(
            db=db,
            name='FileSystemExposureEngine',
            event_types=['MCP'],
            producers=['local', 'remote']
        )

        # ========== YARA-style Rules ==========

        # Rule 1: Critical system paths (highest priority)
        self.critical_system_paths = {
            # Windows
            'windows': [
                r'C:\\Windows\\System32',
                r'C:\\Windows\\SysWOW64',
                r'C:\\Windows\\system\.ini',
                r'C:\\Windows\\win\.ini',
                r'C:\\boot\.ini',
            ],
            # Linux/Unix
            'linux': [
                r'/etc/passwd',
                r'/etc/shadow',
                r'/etc/sudoers',
                r'/etc/hosts',
                r'/root/',
                r'/proc/',
                r'/sys/',
                r'/boot/',
                r'/var/log/',
            ],
            # Mac
            'mac': [
                r'/Library/Preferences/',
                r'/System/Library/',
                r'/private/var/',
                r'/private/etc/',
            ],
            # SSH/Credentials (cross-platform)
            'credentials': [
                r'\.ssh/id_rsa',
                r'\.ssh/id_dsa',
                r'\.ssh/id_ecdsa',
                r'\.ssh/id_ed25519',
                r'\.ssh/authorized_keys',
                r'\.ssh/known_hosts',
                r'\.aws/credentials',
                r'\.azure/',
                r'\.kube/config',
                r'\.docker/config\.json',
            ]
        }

        # Rule 2: System directory keywords (score based)
        self.system_keywords = {
            'critical': [  # +40 points
                'system32', 'syswow64', 'etc/passwd', 'etc/shadow',
                '.ssh/', '.aws/', '.azure/', '.kube/'
            ],
            'high': [  # +30 points
                'windows', 'program files', 'programdata', 'appdata',
                '/etc/', '/root/', '/proc/', '/sys/', '/boot/',
                '/var/log/', '/usr/bin/', '/usr/sbin/',
                'library/preferences', 'system/library'
            ],
            'medium': [  # +20 points
                'users/', 'home/', 'documents/', 'desktop/',
                '/tmp/', '/var/', '/opt/', '/usr/',
                'local/', 'roaming/'
            ]
        }

        # Rule 3: Dangerous file extensions
        self.dangerous_extensions = {
            'critical': [  # +55 points
                '.pem', '.key', '.crt', '.pfx', '.p12',
                '.keystore', '.jks', '.der',
                'id_rsa', 'id_dsa', 'id_ecdsa', 'id_ed25519'
            ],
            'high': [  # +35 points
                '.env', '.htpasswd', '.htaccess',
                '.bashrc', '.bash_profile', '.zshrc',
                '.npmrc', '.pypirc', '.netrc',
                '.gitconfig', '.git-credentials',
                'credentials', 'secrets'
            ],
            'medium': [  # +15 points
                '.conf', '.config', '.ini', '.cfg',
                '.yaml', '.yml', '.json', '.xml',
                '.log', '.bak', '.old', '.backup'
            ]
        }

        # Rule 4: Path-related field names to check
        self.path_field_names = [
            'path', 'file', 'filepath', 'filename',
            'dir', 'directory', 'folder', 'location',
            'source', 'destination', 'target',
            'url', 'uri', 'endpoint'  # URLs are also checked for path traversal
        ]

        # Rule 5: Path Traversal patterns
        self.path_traversal_patterns = [
            (r'\.\./', 30, 'Parent directory traversal'),
            (r'\.\.\\', 30, 'Parent directory traversal (Windows)'),
            (r'%2e%2e%2f', 35, 'URL encoded traversal'),
            (r'%2e%2e/', 35, 'URL encoded traversal'),
            (r'\.\.%2f', 35, 'Mixed encoded traversal'),
            (r'%252e%252e%252f', 40, 'Double URL encoded traversal'),
            (r'\.\.%255c', 40, 'Double encoded backslash traversal'),
        ]

        # Compile traversal patterns
        self.traversal_regex = [
            (re.compile(p, re.IGNORECASE), score, reason)
            for p, score, reason in self.path_traversal_patterns
        ]

        # Compile regex
        self._compile_patterns()

    def _compile_patterns(self):
        """Compile all regex patterns"""
        # Critical paths
        self.critical_path_regex = {}
        for category, patterns in self.critical_system_paths.items():
            self.critical_path_regex[category] = [
                re.compile(p, re.IGNORECASE) for p in patterns
            ]

    def process(self, data: Any) -> Any:
        safe_print(f"[FileSystemExposureEngine] Processing event")

        # Extract paths from specific fields only
        paths = self._extract_paths_from_fields(data)

        if not paths:
            safe_print(f"[FileSystemExposureEngine] No paths to analyze, skipping\n")
            return None

        safe_print(f"[FileSystemExposureEngine] Extracted {len(paths)} paths: {paths}")

        findings = []
        total_score = 0

        for path in paths:
            path_score = 0
            path_findings = []

            # Check 1: Critical system paths
            critical_match = self._check_critical_paths(path)
            if critical_match:
                path_score += 50
                path_findings.append({
                    'rule': 'critical_system_path',
                    'category': critical_match['category'],
                    'matched': critical_match['matched'],
                    'score': 50
                })

            # Check 2: System keywords
            keyword_score, keyword_matches = self._check_system_keywords(path)
            if keyword_score > 0:
                path_score += keyword_score
                for match in keyword_matches:
                    path_findings.append({
                        'rule': 'system_keyword',
                        'keyword': match['keyword'],
                        'severity': match['severity'],
                        'score': match['score']
                    })

            # Check 3: Dangerous extensions
            ext_score, ext_match = self._check_dangerous_extensions(path)
            if ext_score > 0:
                path_score += ext_score
                path_findings.append({
                    'rule': 'dangerous_extension',
                    'extension': ext_match['extension'],
                    'severity': ext_match['severity'],
                    'score': ext_score
                })

            # Check 4: Path depth bonus
            depth_score = self._calculate_depth_score(path)
            if depth_score > 0:
                path_score += depth_score
                path_findings.append({
                    'rule': 'path_depth',
                    'depth': path.count('/') + path.count('\\'),
                    'score': depth_score
                })

            # Check 5: Path Traversal patterns
            traversal_score, traversal_match = self._check_path_traversal(path)
            if traversal_score > 0:
                path_score += traversal_score
                path_findings.append({
                    'rule': 'path_traversal',
                    'pattern': traversal_match['pattern'],
                    'reason': traversal_match['reason'],
                    'score': traversal_score
                })

            # Add findings if score > 0
            if path_score > 0:
                # Convert to UI-compatible format
                for detail in path_findings:
                    # Determine category based on score
                    if detail.get('score', 0) >= 35:
                        category = 'critical'
                    elif detail.get('score', 0) >= 25:
                        category = 'high'
                    else:
                        category = 'medium'

                    # Build reason string
                    rule = detail.get('rule', '')
                    if rule == 'critical_system_path':
                        reason = f"Critical system path detected: {detail.get('matched', '')}"
                    elif rule == 'system_keyword':
                        reason = f"System keyword '{detail.get('keyword', '')}' in path"
                    elif rule == 'dangerous_extension':
                        reason = f"Dangerous extension '{detail.get('extension', '')}' detected"
                    elif rule == 'path_depth':
                        reason = f"Deep path access (depth: {detail.get('depth', 0)})"
                    elif rule == 'path_traversal':
                        reason = detail.get('reason', 'Path traversal detected')
                    else:
                        reason = f"File system exposure: {rule}"

                    findings.append({
                        'category': category,
                        'pattern': detail.get('pattern', detail.get('keyword', '')),
                        'matched_text': path,
                        'full_path': path,
                        'reason': reason
                    })

                total_score = max(total_score, path_score)

        # No findings
        if not findings:
            safe_print(f"[FileSystemExposureEngine] No issues found\n")
            return None

        # Determine severity based on score
        if total_score >= 70:
            severity = 'high'
        elif total_score >= 40:
            severity = 'medium'
        else:
            severity = 'low'

        # Build result
        references = []
        if 'ts' in data:
            references.append(f"id-{data['ts']}")

        result = {
            'reference': references,
            'result': {
                'detector': 'FileSystemExposure',
                'severity': severity,
                'evaluation': min(total_score, 100),
                'findings': findings,
                'event_type': data.get('eventType', 'Unknown'),
                'producer': data.get('producer', 'unknown'),
                'original_event': data
            }
        }

        safe_print(f"[FileSystemExposureEngine] Detection: severity={severity}, score={min(total_score, 100)}")
        safe_print(f"[FileSystemExposureEngine] {len(findings)} paths flagged\n")

        return result

    def _extract_paths_from_fields(self, data: dict) -> list[str]:
        """
        Extract paths only from specific field names
        (path, file, directory, etc.)
        """
        paths = []

        producer = data.get('producer', '')
        if producer not in ['local', 'remote']:
            return paths

        if 'data' not in data or not isinstance(data['data'], dict):
            return paths

        mcp_data = data['data']

        if 'message' not in mcp_data or not isinstance(mcp_data['message'], dict):
            return paths

        message = mcp_data['message']

        # Only check params.arguments (request) - not all text
        if 'params' in message and isinstance(message['params'], dict):
            params = message['params']
            if 'arguments' in params and isinstance(params['arguments'], dict):
                arguments = params['arguments']
                self._extract_from_dict(arguments, paths)

        return list(set(paths))

    def _extract_from_dict(self, obj: dict, paths: list, depth: int = 0):
        """
        Extract values only from path-related field names
        """
        if depth > 5:  # Prevent deep recursion
            return

        if isinstance(obj, dict):
            for key, value in obj.items():
                key_lower = key.lower()

                # Only extract from path-related fields
                if any(field in key_lower for field in self.path_field_names):
                    if isinstance(value, str) and len(value) > 1:
                        paths.append(value)
                    elif isinstance(value, list):
                        for item in value:
                            if isinstance(item, str) and len(item) > 1:
                                paths.append(item)

                # Recurse into nested objects
                if isinstance(value, dict):
                    self._extract_from_dict(value, paths, depth + 1)
                elif isinstance(value, list):
                    for item in value:
                        if isinstance(item, dict):
                            self._extract_from_dict(item, paths, depth + 1)

    def _check_critical_paths(self, path: str) -> dict | None:
        """Check against critical system paths"""
        for category, patterns in self.critical_path_regex.items():
            for pattern in patterns:
                match = pattern.search(path)
                if match:
                    return {
                        'category': category,
                        'matched': match.group(0)
                    }
        return None

    def _check_system_keywords(self, path: str) -> tuple[int, list]:
        """Check for system directory keywords"""
        path_lower = path.lower()
        total_score = 0
        matches = []

        scores = {'critical': 40, 'high': 30, 'medium': 20}

        for severity, keywords in self.system_keywords.items():
            for keyword in keywords:
                if keyword in path_lower:
                    score = scores[severity]
                    total_score += score
                    matches.append({
                        'keyword': keyword,
                        'severity': severity,
                        'score': score
                    })
                    break  # Only count one match per severity level

        return total_score, matches

    def _check_dangerous_extensions(self, path: str) -> tuple[int, dict]:
        """Check for dangerous file extensions"""
        path_lower = path.lower()

        scores = {'critical': 55, 'high': 35, 'medium': 15}

        # First check: actual extension (endswith) - highest priority
        for severity, extensions in self.dangerous_extensions.items():
            for ext in extensions:
                if path_lower.endswith(ext):
                    return scores[severity], {
                        'extension': ext,
                        'severity': severity
                    }

        # Second check: extension appears anywhere in path (lower priority)
        for severity, extensions in self.dangerous_extensions.items():
            for ext in extensions:
                if ext in path_lower:
                    return scores[severity], {
                        'extension': ext,
                        'severity': severity
                    }

        return 0, {}

    def _calculate_depth_score(self, path: str) -> int:
        """
        Calculate score based on path depth
        Deeper paths = higher score (more specific targeting)
        """
        # Count separators
        depth = path.count('/') + path.count('\\')

        # Score: 2 points per level after 3
        if depth > 3:
            return min((depth - 3) * 2, 10)  # Max 10 points

        return 0

    def _check_path_traversal(self, path: str) -> tuple[int, dict]:
        """Check for path traversal patterns"""
        for pattern, score, reason in self.traversal_regex:
            if pattern.search(path):
                return score, {
                    'pattern': pattern.pattern,
                    'reason': reason
                }

        return 0, {}
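Example usage

The following is a minimal sketch of how the engine might be driven in isolation, not part of the original file. It assumes the module is importable as file_system_exposure_engine and that BaseEngine tolerates db=None outside the real pipeline (both assumptions; in the project a database handle is supplied and events are dispatched by event_types/producers). The event shape mirrors the fields process() actually reads: producer, eventType, ts, and data.message.params.arguments.

# Hypothetical offline driver for FileSystemExposureEngine (not from the source repo).
from file_system_exposure_engine import FileSystemExposureEngine

engine = FileSystemExposureEngine(db=None)  # assumption: db=None is accepted for testing

# Event shaped the way process() expects: producer, eventType, ts, and
# data.message.params.arguments carrying path-like fields.
sample_event = {
    'producer': 'local',
    'eventType': 'MCP',
    'ts': 1700000000,
    'data': {
        'message': {
            'params': {
                'arguments': {
                    'path': '../../etc/passwd'  # trips traversal, keyword, and critical-path rules
                }
            }
        }
    }
}

result = engine.process(sample_event)
if result:
    # evaluation is capped at 100; this input should score as 'high'
    print(result['result']['severity'], result['result']['evaluation'])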
