Skip to main content
Glama
pii_leak_engine.py8.53 kB
from engines.base_engine import BaseEngine from typing import Any from utils import safe_print import yara import os import asyncio class PIILeakEngine(BaseEngine): """Detection engine for Personal Identifiable Information (PII) leaks using YARA rules.""" def __init__(self, db): super().__init__( db=db, name='PIILeakEngine', event_types=['MCP'], producers=['local', 'remote'] ) self.db = db self.rules = None self.custom_rules = None # Load YARA rules from external file self._load_builtin_rules() # Load custom rules from database (async) asyncio.create_task(self._load_custom_rules()) def _load_builtin_rules(self): """Load built-in YARA rules from external file.""" try: current_dir = os.path.dirname(os.path.abspath(__file__)) project_root = os.path.dirname(current_dir) yara_file = os.path.join(project_root, 'yara_rules', 'pii_leak_rules.yar') if not os.path.exists(yara_file): raise FileNotFoundError(f"YARA rules file not found: {yara_file}") self.rules = yara.compile(filepath=yara_file) safe_print(f"[PIILeakEngine] Built-in YARA rules loaded from {yara_file}") except Exception as e: safe_print(f"[PIILeakEngine] Failed to load built-in YARA rules: {e}") self.rules = None async def _load_custom_rules(self): """Load custom YARA rules from database.""" try: custom_content = await self.db.get_custom_rules_content('pii_leak_engine') if custom_content: try: self.custom_rules = yara.compile(source=custom_content) safe_print(f"[PIILeakEngine] Custom YARA rules loaded from database") except Exception as e: safe_print(f"[PIILeakEngine] Failed to compile custom rules: {e}") self.custom_rules = None else: safe_print(f"[PIILeakEngine] No custom rules found in database") self.custom_rules = None except Exception as e: safe_print(f"[PIILeakEngine] Failed to load custom rules: {e}") self.custom_rules = None async def reload_rules(self): """Reload custom rules from database.""" safe_print(f"[PIILeakEngine] Reloading custom rules...") await self._load_custom_rules() def should_process(self, data: dict) -> bool: """Check if this event should be processed (tools/call requests and responses).""" if not super().should_process(data): return False mcp_data = data.get('data', {}) message = mcp_data.get('message', {}) if not isinstance(message, dict): return False # Process tools/call Request if message.get('method') == 'tools/call': return True # Process tools/call Response if mcp_data.get('task') == 'RECV' and 'result' in message: return True return False def process(self, data: Any) -> Any: """Process event and detect PII leaks.""" if not self.rules: safe_print("[PIILeakEngine] YARA rules not loaded, skipping") return None # Extract and analyze text analysis_text = self._extract_analysis_text(data) if not analysis_text: return None safe_print(f"[PIILeakEngine] Analyzing: {analysis_text[:100]}...") # Detect PII using YARA pii_matches = self._detect_pii(analysis_text) if not pii_matches: return None # Calculate severity and score severity = self._calculate_severity(pii_matches) score = self._calculate_score(severity, len(pii_matches)) safe_print(f"[PIILeakEngine] Detected {len(pii_matches)} PII(s): {severity} (score={score})") # Build result return { 'reference': [f"id-{data['ts']}"] if 'ts' in data else [], 'result': { 'detector': 'PIIFilter', 'severity': severity, 'evaluation': score, 'findings': pii_matches, 'event_type': data.get('eventType', 'Unknown'), 'original_event': data } } def _detect_pii(self, text: str) -> list[dict]: """Detect PII using YARA rules (built-in + custom).""" findings = [] # Run built-in rules if self.rules: try: matches = self.rules.match(data=text) for match in matches: for string_match in match.strings: instance = string_match.instances[0] findings.append({ 'rule': match.rule, 'category': match.meta.get('category', 'Unknown'), 'description': match.meta.get('description', match.rule), 'matched_text': instance.matched_data.decode('utf-8', errors='ignore'), 'reason': f"{match.meta.get('description', match.rule)}: {instance.matched_data.decode('utf-8', errors='ignore')}" }) except Exception as e: safe_print(f"[PIILeakEngine] Built-in YARA matching error: {e}") # Run custom rules if self.custom_rules: try: custom_matches = self.custom_rules.match(data=text) for match in custom_matches: for string_match in match.strings: instance = string_match.instances[0] findings.append({ 'rule': f"{match.rule} (custom)", 'category': match.meta.get('category', 'Custom'), 'description': match.meta.get('description', match.rule), 'matched_text': instance.matched_data.decode('utf-8', errors='ignore'), 'reason': f"[Custom Rule] {match.meta.get('description', match.rule)}: {instance.matched_data.decode('utf-8', errors='ignore')}" }) except Exception as e: safe_print(f"[PIILeakEngine] Custom YARA matching error: {e}") return findings def _extract_analysis_text(self, data: dict) -> str: """Extract text from MCP request and response for analysis.""" mcp_data = data.get('data', {}) message = mcp_data.get('message', {}) if not isinstance(message, dict): return '' texts = [] # Extract from Request params params = message.get('params', {}) if isinstance(params, dict): if 'arguments' in params: texts.append(str(params['arguments'])) # Extract from Response result result = message.get('result', {}) if isinstance(result, dict): # Extract content[].text content = result.get('content', []) if isinstance(content, list): for item in content: if isinstance(item, dict) and 'text' in item: texts.append(str(item['text'])) # Extract structuredContent if 'structuredContent' in result: texts.append(str(result['structuredContent'])) return ' '.join(texts) def _calculate_severity(self, matches: list[dict]) -> str: """Calculate severity based on PII category (Financial/Medical/Custom = high, PII = medium).""" categories = {m['category'] for m in matches} # Check if any match is from a custom rule has_custom = any('(custom)' in m.get('rule', '') for m in matches) if has_custom or categories & {'Financial PII', 'Medical PII'}: return 'high' elif 'PII' in categories or 'Custom' in categories: return 'medium' else: return 'low' def _calculate_score(self, severity: str, findings_count: int) -> int: """Calculate risk score (0-100) based on severity and findings count.""" base_scores = {'high': 85, 'medium': 50, 'low': 20} base_score = base_scores.get(severity, 0) findings_bonus = min(findings_count * 5, 15) # Max +15 return min(base_score + findings_bonus, 100)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/seungwon9201/MCP-Dandan'

If you have feedback or need assistance with the MCP directory API, please join our Discord server