pii_crypto.py•20.4 kB
"""
PII 암호화 모듈
- 결정론적 암호화: 이름, 주소 등 (검색 가능, 복원 가능)
- FPE (Format Preserving Encryption): 전화번호, 카드번호, 여권번호 등 (형식 유지)
"""
import os
import base64
import hashlib
from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import secrets
import re
from dotenv import load_dotenv
load_dotenv()
@dataclass
class CryptoConfig:
"""암호화 설정"""
# 결정론적 암호화용 키 (환경변수에서 가져오거나 생성)
deterministic_key: bytes
# FPE용 키
fpe_key: bytes
# 솔트
salt: bytes
class PIICrypto:
"""PII 암호화/복호화 클래스"""
def __init__(self, master_key: Optional[str] = None):
"""
Args:
master_key: 마스터 키 (환경변수 PII_MASTER_KEY에서 가져오거나 제공)
"""
if master_key is None:
master_key = os.environ.get("PII_MASTER_KEY", "pii_master_key_1!smartmind")
self.config = self._generate_keys(master_key)
# PII 유형별 처리 방식 정의
self.deterministic_types = {"이름", "주소", "이메일"}
self.fpe_types = {"전화번호", "신용카드번호", "은행계좌번호", "여권번호", "주민등록번호"}
def _generate_keys(self, master_key: str) -> CryptoConfig:
"""마스터 키로부터 각종 암호화 키들을 생성"""
master_bytes = master_key.encode('utf-8')
# 고정 솔트 (결정론적 암호화를 위해)
salt = hashlib.sha256(b"pii_crypto_salt_" + master_bytes).digest()[:16]
# 키 유도 함수
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend()
)
# 결정론적 암호화용 키
det_kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt + b"deterministic",
iterations=100000,
backend=default_backend()
)
deterministic_key = det_kdf.derive(master_bytes)
# FPE용 키
fpe_kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt + b"fpe",
iterations=100000,
backend=default_backend()
)
fpe_key = fpe_kdf.derive(master_bytes)
return CryptoConfig(
deterministic_key=deterministic_key,
fpe_key=fpe_key,
salt=salt
)
def deterministic_encrypt(self, plaintext: str, context: str = "") -> str:
"""
결정론적 암호화 (AES-SIV 유사)
같은 입력 → 같은 암호문 (검색 가능)
Args:
plaintext: 암호화할 평문
context: 추가 컨텍스트 (PII 유형 등)
Returns:
Base64 인코딩된 암호문
"""
if not plaintext:
return ""
try:
# 컨텍스트를 포함한 고정 IV 생성 (결정론적)
iv_source = plaintext + context + "deterministic_iv"
iv = hashlib.sha256(iv_source.encode('utf-8')).digest()[:16]
# AES-CBC 암호화
cipher = Cipher(
algorithms.AES(self.config.deterministic_key),
modes.CBC(iv),
backend=default_backend()
)
encryptor = cipher.encryptor()
# PKCS7 패딩
padded_data = self._pkcs7_pad(plaintext.encode('utf-8'))
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
# IV + 암호문을 Base64로 인코딩
encrypted_data = iv + ciphertext
return base64.b64encode(encrypted_data).decode('utf-8')
except Exception as e:
print(f"결정론적 암호화 실패: {e}")
return f"[암호화실패:{plaintext[:10]}...]"
def deterministic_decrypt(self, ciphertext: str, context: str = "") -> str:
"""
결정론적 복호화
Args:
ciphertext: Base64 인코딩된 암호문
context: 암호화 시 사용한 컨텍스트
Returns:
복호화된 평문
"""
if not ciphertext or ciphertext.startswith("[암호화실패:"):
return ciphertext
try:
# Base64 디코딩
encrypted_data = base64.b64decode(ciphertext.encode('utf-8'))
# IV와 암호문 분리
iv = encrypted_data[:16]
actual_ciphertext = encrypted_data[16:]
# AES-CBC 복호화
cipher = Cipher(
algorithms.AES(self.config.deterministic_key),
modes.CBC(iv),
backend=default_backend()
)
decryptor = cipher.decryptor()
padded_plaintext = decryptor.update(actual_ciphertext) + decryptor.finalize()
# PKCS7 패딩 제거
plaintext = self._pkcs7_unpad(padded_plaintext)
return plaintext.decode('utf-8')
except Exception as e:
print(f"결정론적 복호화 실패: {e}")
return f"[복호화실패:{ciphertext[:20]}...]"
def fpe_encrypt(self, plaintext: str, pii_type: str) -> str:
"""
Format Preserving Encryption
형식을 유지하면서 암호화 (숫자는 숫자로, 하이픈은 하이픈으로 유지)
Args:
plaintext: 암호화할 평문
pii_type: PII 유형 (전화번호, 신용카드번호 등)
Returns:
형식이 보존된 암호문
"""
if not plaintext:
return ""
try:
# 유형별 패턴 추출 및 암호화
if pii_type == "전화번호":
return self._fpe_phone_number(plaintext, encrypt=True)
elif pii_type == "신용카드번호":
return self._fpe_credit_card(plaintext, encrypt=True)
elif pii_type == "은행계좌번호":
return self._fpe_bank_account(plaintext, encrypt=True)
elif pii_type == "여권번호":
return self._fpe_passport(plaintext, encrypt=True)
elif pii_type == "주민등록번호":
return self._fpe_ssn(plaintext, encrypt=True)
else:
# 기본적으로 숫자만 FPE 처리
return self._fpe_digits_only(plaintext, encrypt=True)
except Exception as e:
print(f"FPE 암호화 실패 ({pii_type}): {e}")
return f"[FPE암호화실패:{plaintext[:10]}...]"
def fpe_decrypt(self, ciphertext: str, pii_type: str) -> str:
"""
Format Preserving Decryption
Args:
ciphertext: FPE 암호문
pii_type: PII 유형
Returns:
복호화된 평문
"""
if not ciphertext or ciphertext.startswith("[FPE암호화실패:"):
return ciphertext
try:
# 유형별 복호화
if pii_type == "전화번호":
return self._fpe_phone_number(ciphertext, encrypt=False)
elif pii_type == "신용카드번호":
return self._fpe_credit_card(ciphertext, encrypt=False)
elif pii_type == "은행계좌번호":
return self._fpe_bank_account(ciphertext, encrypt=False)
elif pii_type == "여권번호":
return self._fpe_passport(ciphertext, encrypt=False)
elif pii_type == "주민등록번호":
return self._fpe_ssn(ciphertext, encrypt=False)
else:
return self._fpe_digits_only(ciphertext, encrypt=False)
except Exception as e:
print(f"FPE 복호화 실패 ({pii_type}): {e}")
return f"[FPE복호화실패:{ciphertext[:20]}...]"
def _fpe_phone_number(self, text: str, encrypt: bool = True) -> str:
"""전화번호 FPE 처리 (010-1234-5678 형식 유지)"""
# 숫자만 추출
digits = re.sub(r'\D', '', text)
if len(digits) < 8:
return text # 너무 짧으면 그대로 반환
# 숫자 부분만 FPE 처리
if encrypt:
encrypted_digits = self._simple_fpe_digits(digits, "phone")
else:
encrypted_digits = self._simple_fpe_digits(text.replace('-', '').replace(' ', ''), "phone", decrypt=True)
# 원본 형식에 맞춰 재조합
if '-' in text:
if len(encrypted_digits) == 11: # 010-1234-5678
return f"{encrypted_digits[:3]}-{encrypted_digits[3:7]}-{encrypted_digits[7:]}"
elif len(encrypted_digits) == 10: # 02-1234-5678
return f"{encrypted_digits[:2]}-{encrypted_digits[2:6]}-{encrypted_digits[6:]}"
return encrypted_digits
def _fpe_credit_card(self, text: str, encrypt: bool = True) -> str:
"""신용카드번호 FPE 처리"""
digits = re.sub(r'\D', '', text)
if len(digits) < 13:
return text
if encrypt:
encrypted_digits = self._simple_fpe_digits(digits, "card")
else:
encrypted_digits = self._simple_fpe_digits(text.replace('-', '').replace(' ', ''), "card", decrypt=True)
# 4자리씩 하이픈으로 구분
if '-' in text or ' ' in text:
separator = '-' if '-' in text else ' '
return separator.join([encrypted_digits[i:i+4] for i in range(0, len(encrypted_digits), 4)])
return encrypted_digits
def _fpe_bank_account(self, text: str, encrypt: bool = True) -> str:
"""은행계좌번호 FPE 처리"""
digits = re.sub(r'\D', '', text)
if len(digits) < 8:
return text
if encrypt:
encrypted_digits = self._simple_fpe_digits(digits, "bank")
else:
encrypted_digits = self._simple_fpe_digits(text.replace('-', ''), "bank", decrypt=True)
# 하이픈 형식 복원
if '-' in text:
parts = text.split('-')
if len(parts) == 3: # 123-456-789012 형식
return f"{encrypted_digits[:len(parts[0])]}-{encrypted_digits[len(parts[0]):len(parts[0])+len(parts[1])]}-{encrypted_digits[len(parts[0])+len(parts[1]):]}"
return encrypted_digits
def _fpe_passport(self, text: str, encrypt: bool = True) -> str:
"""여권번호 FPE 처리 (M12345678 → X87654321 형식)"""
if len(text) < 2:
return text
# 첫 글자(알파벳)와 숫자 분리
alpha_part = text[0] if text[0].isalpha() else ""
digit_part = text[1:] if text[0].isalpha() else text
if encrypt:
# 알파벳 치환
if alpha_part:
alpha_encrypted = self._simple_fpe_alpha(alpha_part, "passport")
else:
alpha_encrypted = ""
# 숫자 FPE
digit_encrypted = self._simple_fpe_digits(digit_part, "passport")
return alpha_encrypted + digit_encrypted
else:
# 복호화
if alpha_part:
alpha_decrypted = self._simple_fpe_alpha(alpha_part, "passport", decrypt=True)
else:
alpha_decrypted = ""
digit_decrypted = self._simple_fpe_digits(digit_part, "passport", decrypt=True)
return alpha_decrypted + digit_decrypted
def _fpe_ssn(self, text: str, encrypt: bool = True) -> str:
"""주민등록번호 FPE 처리 (123456-1234567)"""
digits = re.sub(r'\D', '', text)
if len(digits) != 13:
return text
if encrypt:
encrypted_digits = self._simple_fpe_digits(digits, "ssn")
else:
encrypted_digits = self._simple_fpe_digits(text.replace('-', ''), "ssn", decrypt=True)
# 하이픈 형식 복원
return f"{encrypted_digits[:6]}-{encrypted_digits[6:]}"
def _fpe_digits_only(self, text: str, encrypt: bool = True) -> str:
"""일반적인 숫자 FPE 처리"""
digits = re.sub(r'\D', '', text)
if not digits:
return text
if encrypt:
return self._simple_fpe_digits(digits, "general")
else:
return self._simple_fpe_digits(digits, "general", decrypt=True)
def _simple_fpe_digits(self, digits: str, context: str, decrypt: bool = False) -> str:
"""간단한 숫자 FPE 구현 - 컨텍스트만으로 결정론적 변환"""
if not digits:
return digits
# 컨텍스트만으로 결정론적 시드 생성 (암호화/복호화 모두 동일)
seed = hashlib.sha256(context.encode()).digest()
# 숫자 배열로 변환
digit_list = [int(d) for d in digits]
# 컨텍스트 기반 결정론적 변환 테이블 생성 (0-9 매핑)
transform_table = list(range(10))
# 시드를 기반으로 변환 테이블 셔플 (Fisher-Yates 알고리즘)
for i in range(9, 0, -1):
seed_byte = seed[i % len(seed)]
j = seed_byte % (i + 1)
transform_table[i], transform_table[j] = transform_table[j], transform_table[i]
# 역변환 테이블 생성
reverse_table = [0] * 10
for i, val in enumerate(transform_table):
reverse_table[val] = i
# 변환 적용
if not decrypt:
# 암호화: transform_table 사용
result = [transform_table[digit] for digit in digit_list]
else:
# 복호화: reverse_table 사용
result = [reverse_table[digit] for digit in digit_list]
return ''.join(map(str, result))
def _simple_fpe_alpha(self, alpha: str, context: str, decrypt: bool = False) -> str:
"""간단한 알파벳 FPE 구현 - 컨텍스트만으로 결정론적 변환"""
if not alpha:
return alpha
# 컨텍스트만으로 시드 생성
seed = hashlib.sha256(context.encode()).digest()
# 대문자 변환 테이블 생성
upper_table = list(range(26))
for i in range(25, 0, -1):
seed_byte = seed[i % len(seed)]
j = seed_byte % (i + 1)
upper_table[i], upper_table[j] = upper_table[j], upper_table[i]
# 소문자 변환 테이블 생성 (다른 시드 사용)
lower_seed = hashlib.sha256((context + "_lower").encode()).digest()
lower_table = list(range(26))
for i in range(25, 0, -1):
seed_byte = lower_seed[i % len(lower_seed)]
j = seed_byte % (i + 1)
lower_table[i], lower_table[j] = lower_table[j], lower_table[i]
# 역변환 테이블 생성
upper_reverse = [0] * 26
lower_reverse = [0] * 26
for i in range(26):
upper_reverse[upper_table[i]] = i
lower_reverse[lower_table[i]] = i
result = ""
for char in alpha:
if char.isupper():
char_idx = ord(char) - ord('A')
if not decrypt:
new_idx = upper_table[char_idx]
else:
new_idx = upper_reverse[char_idx]
result += chr(new_idx + ord('A'))
elif char.islower():
char_idx = ord(char) - ord('a')
if not decrypt:
new_idx = lower_table[char_idx]
else:
new_idx = lower_reverse[char_idx]
result += chr(new_idx + ord('a'))
else:
result += char
return result
def _pkcs7_pad(self, data: bytes) -> bytes:
"""PKCS7 패딩 추가"""
pad_len = 16 - (len(data) % 16)
return data + bytes([pad_len] * pad_len)
def _pkcs7_unpad(self, data: bytes) -> bytes:
"""PKCS7 패딩 제거"""
pad_len = data[-1]
return data[:-pad_len]
def encrypt_pii_item(self, pii_value: str, pii_type: str) -> str:
"""
PII 항목을 유형에 따라 적절한 방식으로 암호화
Args:
pii_value: PII 값
pii_type: PII 유형
Returns:
암호화된 값
"""
if not pii_value:
return pii_value
if pii_type in self.deterministic_types:
return self.deterministic_encrypt(pii_value, pii_type)
elif pii_type in self.fpe_types:
return self.fpe_encrypt(pii_value, pii_type)
else:
# 기본적으로 결정론적 암호화 사용
return self.deterministic_encrypt(pii_value, pii_type)
def decrypt_pii_item(self, encrypted_value: str, pii_type: str) -> str:
"""
암호화된 PII 항목을 복호화
Args:
encrypted_value: 암호화된 값
pii_type: PII 유형
Returns:
복호화된 값
"""
if not encrypted_value:
return encrypted_value
if pii_type in self.deterministic_types:
return self.deterministic_decrypt(encrypted_value, pii_type)
elif pii_type in self.fpe_types:
return self.fpe_decrypt(encrypted_value, pii_type)
else:
# 기본적으로 결정론적 복호화 사용
return self.deterministic_decrypt(encrypted_value, pii_type)
if __name__ == "__main__":
# 테스트 코드
crypto = PIICrypto()
print("=== PII 암호화 모듈 테스트 ===\n")
# 결정론적 암호화 테스트
print("1. 결정론적 암호화 (이름, 주소, 이메일)")
test_data = [
("김철수", "이름"),
("서울시 강남구 테헤란로 123", "주소"),
("kim@example.com", "이메일")
]
for original, pii_type in test_data:
encrypted = crypto.encrypt_pii_item(original, pii_type)
decrypted = crypto.decrypt_pii_item(encrypted, pii_type)
print(f" {pii_type}:")
print(f" 원본: {original}")
print(f" 암호화: {encrypted}")
print(f" 복호화: {decrypted}")
print(f" 일치: {'✅' if original == decrypted else '❌'}")
# 결정론적 테스트 (같은 입력 → 같은 암호문)
encrypted2 = crypto.encrypt_pii_item(original, pii_type)
print(f" 결정론적: {'✅' if encrypted == encrypted2 else '❌'}")
print()
# FPE 테스트
print("2. FPE (Format Preserving Encryption)")
fpe_test_data = [
("010-1234-5678", "전화번호"),
("1234-5678-9012-3456", "신용카드번호"),
("123-456-789012", "은행계좌번호"),
("M12345678", "여권번호"),
("123456-1234567", "주민등록번호")
]
for original, pii_type in fpe_test_data:
encrypted = crypto.encrypt_pii_item(original, pii_type)
decrypted = crypto.decrypt_pii_item(encrypted, pii_type)
print(f" {pii_type}:")
print(f" 원본: {original}")
print(f" FPE: {encrypted}")
print(f" 복호화: {decrypted}")
print(f" 일치: {'✅' if original == decrypted else '❌'}")
print(f" 형식유지: {'✅' if len(original) == len(encrypted) else '❌'}")
print()
print("✅ 테스트 완료!")