"""
Data Privacy and Security Module for AI Integration
This module provides comprehensive data protection mechanisms to ensure
sensitive data never leaves your local environment when using AI models.
"""
import re
import hashlib
import logging
from typing import Dict, List, Any, Optional, Set, Tuple
from dataclasses import dataclass, field
from enum import Enum
import json
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
class PrivacyLevel(Enum):
"""Privacy protection levels."""
NONE = "none" # No protection (for public data)
BASIC = "basic" # Basic sanitization (remove obvious PII)
MODERATE = "moderate" # Moderate protection (mask sensitive patterns)
STRICT = "strict" # Strict protection (only metadata/schema)
PARANOID = "paranoid" # Maximum protection (no actual data)
@dataclass
class SensitivePattern:
"""Represents a pattern for detecting sensitive data."""
name: str
pattern: str
replacement: str = "[REDACTED]"
category: str = "general"
severity: int = 1 # 1-5, higher = more sensitive
@dataclass
class DataMaskingConfig:
"""Configuration for data masking operations."""
privacy_level: PrivacyLevel = PrivacyLevel.MODERATE
allowed_columns: Set[str] = field(default_factory=set)
blocked_columns: Set[str] = field(default_factory=set)
max_rows: int = 100
max_string_length: int = 50
enable_schema_only: bool = False
custom_patterns: List[SensitivePattern] = field(default_factory=list)
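# Illustrative sketch (not part of the shipped API): building a custom
# DataMaskingConfig. The "employee_id" pattern and its EMP-NNNNNN format are
# hypothetical; adapt the regex and column names to your own data.
def _example_custom_config() -> DataMaskingConfig:
    """Build a sample config that masks a hypothetical EMP-NNNNNN employee ID."""
    employee_id = SensitivePattern(
        name="employee_id",            # hypothetical pattern name
        pattern=r'\bEMP-\d{6}\b',      # hypothetical ID format
        replacement="[EMPLOYEE_ID]",
        category="pii",
        severity=3,
    )
    return DataMaskingConfig(
        privacy_level=PrivacyLevel.MODERATE,
        blocked_columns={"password", "api_key"},  # compared via column.lower()
        custom_patterns=[employee_id],
    )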
class DataPrivacyManager:
"""Manages data privacy and sanitization for AI interactions."""
    def __init__(self, config: Optional[DataMaskingConfig] = None):
self.config = config or DataMaskingConfig()
self.sensitive_patterns = self._load_default_patterns()
self.sensitive_patterns.extend(self.config.custom_patterns)
def _load_default_patterns(self) -> List[SensitivePattern]:
"""Load default sensitive data patterns."""
return [
# Email addresses
SensitivePattern(
name="email",
                pattern=r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
replacement="[EMAIL]",
category="pii",
severity=3
),
# Phone numbers (various formats)
SensitivePattern(
name="phone",
pattern=r'(\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}',
replacement="[PHONE]",
category="pii",
severity=3
),
# Social Security Numbers
SensitivePattern(
name="ssn",
                pattern=r'\b\d{3}-\d{2}-\d{4}\b|\b\d{9}\b',  # bare 9-digit alternative is deliberately broad
replacement="[SSN]",
category="pii",
severity=5
),
# Credit card numbers
SensitivePattern(
name="credit_card",
pattern=r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
replacement="[CARD]",
category="financial",
severity=5
),
# IP addresses
SensitivePattern(
name="ip_address",
pattern=r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',
replacement="[IP]",
category="technical",
severity=2
),
# API keys and tokens (generic patterns)
SensitivePattern(
name="api_key",
pattern=r'["\']?(?:api[_-]?key|token|secret)["\']?\s*[:=]\s*["\']?[A-Za-z0-9_-]{20,}["\']?',
replacement="[API_KEY]",
category="credentials",
severity=5
),
# Passwords
SensitivePattern(
name="password",
pattern=r'["\']?password["\']?\s*[:=]\s*["\']?[^\s"\']{8,}["\']?',
replacement="[PASSWORD]",
category="credentials",
severity=5
),
# Database connection strings
SensitivePattern(
name="db_connection",
pattern=r'(?:postgresql|mysql|mongodb)://[^\s]+',
replacement="[DB_CONNECTION]",
category="credentials",
severity=4
),
]
    def sanitize_text(self, text: str) -> str:
        """Sanitize text content based on the configured privacy level."""
        if self.config.privacy_level == PrivacyLevel.NONE:
            return text
        sanitized = text
        # Apply pattern-based sanitization; every level above NONE gets it.
        for pattern in self.sensitive_patterns:
            sanitized = re.sub(pattern.pattern, pattern.replacement, sanitized, flags=re.IGNORECASE)
        # Truncate long strings to limit accidental disclosure.
        if len(sanitized) > self.config.max_string_length:
            sanitized = sanitized[:self.config.max_string_length] + "..."
        return sanitized
def sanitize_query_result(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Sanitize database query results."""
if self.config.privacy_level == PrivacyLevel.PARANOID:
return []
if not data:
return data
# Limit number of rows
limited_data = data[:self.config.max_rows]
        if self.config.privacy_level == PrivacyLevel.STRICT:
            # Return only schema information. data is non-empty here, and we
            # read columns from data[0] so this works even when max_rows == 0.
            return [{"columns": list(data[0].keys()), "row_count": len(data)}]
sanitized_data = []
for row in limited_data:
sanitized_row = {}
for column, value in row.items():
# Check if column is explicitly blocked
if column.lower() in self.config.blocked_columns:
sanitized_row[column] = "[BLOCKED]"
continue
                # If an allow-list is set, mask every column not on it
                # (allowed/blocked column names are expected in lowercase).
                if self.config.allowed_columns and column.lower() not in self.config.allowed_columns:
                    sanitized_row[column] = "[NOT_ALLOWED]"
                    continue
# Sanitize the value
if isinstance(value, str):
sanitized_row[column] = self.sanitize_text(value)
elif isinstance(value, (int, float)) and self._is_sensitive_number(column, value):
sanitized_row[column] = "[REDACTED_NUMBER]"
else:
sanitized_row[column] = value
sanitized_data.append(sanitized_row)
return sanitized_data
def _is_sensitive_number(self, column: str, value: Any) -> bool:
"""Check if a numeric value might be sensitive based on column name and value."""
sensitive_column_patterns = [
r'.*(?:ssn|social|security).*',
r'.*(?:credit|card|account).*',
r'.*(?:phone|mobile|tel).*',
r'.*(?:salary|wage|income|pay).*',
r'.*(?:price|cost|amount|balance).*'
]
column_lower = column.lower()
for pattern in sensitive_column_patterns:
if re.match(pattern, column_lower):
return True
        # Digit-count heuristic: 9 digits may be an SSN, 10 a phone number.
        if isinstance(value, int):
            digits = len(str(abs(value)))  # abs() so a leading '-' is not counted
            if digits in (9, 10):
                return True
return False
def create_safe_schema_summary(self, table_info: Dict[str, Any]) -> Dict[str, Any]:
"""Create a safe summary of table schema without sensitive data."""
return {
"table_name": table_info.get("table_name", "unknown"),
"column_count": len(table_info.get("columns", [])),
"column_names": [col.get("name", "") for col in table_info.get("columns", [])],
"column_types": [col.get("type", "") for col in table_info.get("columns", [])],
"estimated_rows": "available" if table_info.get("row_count") else "unknown",
"privacy_note": f"Data protected at {self.config.privacy_level.value} level"
}
def validate_ai_prompt(self, prompt: str) -> Tuple[bool, str]:
"""Validate that an AI prompt doesn't contain sensitive data."""
issues = []
# Check for patterns that might leak sensitive info
for pattern in self.sensitive_patterns:
if pattern.severity >= 4: # High severity patterns
matches = re.findall(pattern.pattern, prompt, re.IGNORECASE)
if matches:
issues.append(f"Potential {pattern.category} data detected: {pattern.name}")
# Check for SQL injection patterns
sql_injection_patterns = [
r';\s*drop\s+table',
r';\s*delete\s+from',
r';\s*update\s+.*\s+set',
r'union\s+select',
r';\s*insert\s+into'
]
for pattern in sql_injection_patterns:
if re.search(pattern, prompt, re.IGNORECASE):
issues.append("Potential SQL injection pattern detected")
is_safe = len(issues) == 0
message = "Prompt is safe" if is_safe else f"Issues found: {'; '.join(issues)}"
return is_safe, message
def log_ai_interaction(self, interaction_type: str, sanitized_input: str,
model_used: str, response_summary: str):
"""Log AI interactions for audit purposes (with sanitized data only)."""
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
"interaction_type": interaction_type,
"model": model_used,
"input_summary": sanitized_input[:100] + "..." if len(sanitized_input) > 100 else sanitized_input,
"response_summary": response_summary,
"privacy_level": self.config.privacy_level.value
}
logger.info(f"AI Interaction: {json.dumps(log_entry)}")
class SchemaOnlyMode:
"""Provides schema and metadata information without actual data."""
@staticmethod
def get_table_schema_summary(columns: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Get table schema without data."""
return {
"columns": [
{
"name": col.get("column_name", ""),
"type": col.get("data_type", ""),
"nullable": col.get("is_nullable", ""),
"has_default": bool(col.get("column_default"))
}
for col in columns
],
"total_columns": len(columns),
"data_access": "schema_only_mode"
}
@staticmethod
def get_safe_sample_query(table_name: str, columns: List[str]) -> str:
"""Generate a safe sample query that shows structure without data."""
safe_columns = ", ".join(columns[:5]) # Limit to first 5 columns
return f"SELECT {safe_columns} FROM {table_name} LIMIT 0" # Returns structure, no data
# Pre-configured privacy managers for different use cases
PRIVACY_CONFIGS = {
"development": DataMaskingConfig(
privacy_level=PrivacyLevel.BASIC,
max_rows=50,
max_string_length=100
),
"production": DataMaskingConfig(
privacy_level=PrivacyLevel.STRICT,
max_rows=10,
max_string_length=50,
blocked_columns={"password", "ssn", "credit_card", "api_key", "secret"}
),
"demo": DataMaskingConfig(
privacy_level=PrivacyLevel.MODERATE,
max_rows=20,
max_string_length=30
),
"high_security": DataMaskingConfig(
privacy_level=PrivacyLevel.PARANOID,
enable_schema_only=True,
max_rows=0
)
}
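# Illustrative entry point: pick a preset by environment name. The keys mirror
# PRIVACY_CONFIGS above; "production" is shown here with an invented row.
if __name__ == "__main__":
    manager = DataPrivacyManager(PRIVACY_CONFIGS["production"])
    # The STRICT level collapses query results to schema metadata only.
    sample_rows = [{"id": 1, "email": "jane.doe@example.com"}]
    print(manager.sanitize_query_result(sample_rows))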