Skip to main content
Glama

MCP PII Tools

by czangyeob
pii_crypto.py20.4 kB
""" PII 암호화 모듈 - 결정론적 암호화: 이름, 주소 등 (검색 가능, 복원 가능) - FPE (Format Preserving Encryption): 전화번호, 카드번호, 여권번호 등 (형식 유지) """ import os import base64 import hashlib from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend import secrets import re from dotenv import load_dotenv load_dotenv() @dataclass class CryptoConfig: """암호화 설정""" # 결정론적 암호화용 키 (환경변수에서 가져오거나 생성) deterministic_key: bytes # FPE용 키 fpe_key: bytes # 솔트 salt: bytes class PIICrypto: """PII 암호화/복호화 클래스""" def __init__(self, master_key: Optional[str] = None): """ Args: master_key: 마스터 키 (환경변수 PII_MASTER_KEY에서 가져오거나 제공) """ if master_key is None: master_key = os.environ.get("PII_MASTER_KEY", "pii_master_key_1!smartmind") self.config = self._generate_keys(master_key) # PII 유형별 처리 방식 정의 self.deterministic_types = {"이름", "주소", "이메일"} self.fpe_types = {"전화번호", "신용카드번호", "은행계좌번호", "여권번호", "주민등록번호"} def _generate_keys(self, master_key: str) -> CryptoConfig: """마스터 키로부터 각종 암호화 키들을 생성""" master_bytes = master_key.encode('utf-8') # 고정 솔트 (결정론적 암호화를 위해) salt = hashlib.sha256(b"pii_crypto_salt_" + master_bytes).digest()[:16] # 키 유도 함수 kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend() ) # 결정론적 암호화용 키 det_kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt + b"deterministic", iterations=100000, backend=default_backend() ) deterministic_key = det_kdf.derive(master_bytes) # FPE용 키 fpe_kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt + b"fpe", iterations=100000, backend=default_backend() ) fpe_key = fpe_kdf.derive(master_bytes) return CryptoConfig( deterministic_key=deterministic_key, fpe_key=fpe_key, salt=salt ) def deterministic_encrypt(self, plaintext: str, context: str = "") -> str: """ 결정론적 암호화 (AES-SIV 유사) 같은 입력 → 같은 암호문 (검색 가능) Args: plaintext: 암호화할 평문 context: 추가 컨텍스트 (PII 유형 등) Returns: Base64 인코딩된 암호문 """ if not plaintext: return "" try: # 컨텍스트를 포함한 고정 IV 생성 (결정론적) iv_source = plaintext + context + "deterministic_iv" iv = hashlib.sha256(iv_source.encode('utf-8')).digest()[:16] # AES-CBC 암호화 cipher = Cipher( algorithms.AES(self.config.deterministic_key), modes.CBC(iv), backend=default_backend() ) encryptor = cipher.encryptor() # PKCS7 패딩 padded_data = self._pkcs7_pad(plaintext.encode('utf-8')) ciphertext = encryptor.update(padded_data) + encryptor.finalize() # IV + 암호문을 Base64로 인코딩 encrypted_data = iv + ciphertext return base64.b64encode(encrypted_data).decode('utf-8') except Exception as e: print(f"결정론적 암호화 실패: {e}") return f"[암호화실패:{plaintext[:10]}...]" def deterministic_decrypt(self, ciphertext: str, context: str = "") -> str: """ 결정론적 복호화 Args: ciphertext: Base64 인코딩된 암호문 context: 암호화 시 사용한 컨텍스트 Returns: 복호화된 평문 """ if not ciphertext or ciphertext.startswith("[암호화실패:"): return ciphertext try: # Base64 디코딩 encrypted_data = base64.b64decode(ciphertext.encode('utf-8')) # IV와 암호문 분리 iv = encrypted_data[:16] actual_ciphertext = encrypted_data[16:] # AES-CBC 복호화 cipher = Cipher( algorithms.AES(self.config.deterministic_key), modes.CBC(iv), backend=default_backend() ) decryptor = cipher.decryptor() padded_plaintext = decryptor.update(actual_ciphertext) + decryptor.finalize() # PKCS7 패딩 제거 plaintext = self._pkcs7_unpad(padded_plaintext) return plaintext.decode('utf-8') except Exception as e: print(f"결정론적 복호화 실패: {e}") return f"[복호화실패:{ciphertext[:20]}...]" def fpe_encrypt(self, plaintext: str, pii_type: str) -> str: """ Format Preserving Encryption 형식을 유지하면서 암호화 (숫자는 숫자로, 하이픈은 하이픈으로 유지) Args: plaintext: 암호화할 평문 pii_type: PII 유형 (전화번호, 신용카드번호 등) Returns: 형식이 보존된 암호문 """ if not plaintext: return "" try: # 유형별 패턴 추출 및 암호화 if pii_type == "전화번호": return self._fpe_phone_number(plaintext, encrypt=True) elif pii_type == "신용카드번호": return self._fpe_credit_card(plaintext, encrypt=True) elif pii_type == "은행계좌번호": return self._fpe_bank_account(plaintext, encrypt=True) elif pii_type == "여권번호": return self._fpe_passport(plaintext, encrypt=True) elif pii_type == "주민등록번호": return self._fpe_ssn(plaintext, encrypt=True) else: # 기본적으로 숫자만 FPE 처리 return self._fpe_digits_only(plaintext, encrypt=True) except Exception as e: print(f"FPE 암호화 실패 ({pii_type}): {e}") return f"[FPE암호화실패:{plaintext[:10]}...]" def fpe_decrypt(self, ciphertext: str, pii_type: str) -> str: """ Format Preserving Decryption Args: ciphertext: FPE 암호문 pii_type: PII 유형 Returns: 복호화된 평문 """ if not ciphertext or ciphertext.startswith("[FPE암호화실패:"): return ciphertext try: # 유형별 복호화 if pii_type == "전화번호": return self._fpe_phone_number(ciphertext, encrypt=False) elif pii_type == "신용카드번호": return self._fpe_credit_card(ciphertext, encrypt=False) elif pii_type == "은행계좌번호": return self._fpe_bank_account(ciphertext, encrypt=False) elif pii_type == "여권번호": return self._fpe_passport(ciphertext, encrypt=False) elif pii_type == "주민등록번호": return self._fpe_ssn(ciphertext, encrypt=False) else: return self._fpe_digits_only(ciphertext, encrypt=False) except Exception as e: print(f"FPE 복호화 실패 ({pii_type}): {e}") return f"[FPE복호화실패:{ciphertext[:20]}...]" def _fpe_phone_number(self, text: str, encrypt: bool = True) -> str: """전화번호 FPE 처리 (010-1234-5678 형식 유지)""" # 숫자만 추출 digits = re.sub(r'\D', '', text) if len(digits) < 8: return text # 너무 짧으면 그대로 반환 # 숫자 부분만 FPE 처리 if encrypt: encrypted_digits = self._simple_fpe_digits(digits, "phone") else: encrypted_digits = self._simple_fpe_digits(text.replace('-', '').replace(' ', ''), "phone", decrypt=True) # 원본 형식에 맞춰 재조합 if '-' in text: if len(encrypted_digits) == 11: # 010-1234-5678 return f"{encrypted_digits[:3]}-{encrypted_digits[3:7]}-{encrypted_digits[7:]}" elif len(encrypted_digits) == 10: # 02-1234-5678 return f"{encrypted_digits[:2]}-{encrypted_digits[2:6]}-{encrypted_digits[6:]}" return encrypted_digits def _fpe_credit_card(self, text: str, encrypt: bool = True) -> str: """신용카드번호 FPE 처리""" digits = re.sub(r'\D', '', text) if len(digits) < 13: return text if encrypt: encrypted_digits = self._simple_fpe_digits(digits, "card") else: encrypted_digits = self._simple_fpe_digits(text.replace('-', '').replace(' ', ''), "card", decrypt=True) # 4자리씩 하이픈으로 구분 if '-' in text or ' ' in text: separator = '-' if '-' in text else ' ' return separator.join([encrypted_digits[i:i+4] for i in range(0, len(encrypted_digits), 4)]) return encrypted_digits def _fpe_bank_account(self, text: str, encrypt: bool = True) -> str: """은행계좌번호 FPE 처리""" digits = re.sub(r'\D', '', text) if len(digits) < 8: return text if encrypt: encrypted_digits = self._simple_fpe_digits(digits, "bank") else: encrypted_digits = self._simple_fpe_digits(text.replace('-', ''), "bank", decrypt=True) # 하이픈 형식 복원 if '-' in text: parts = text.split('-') if len(parts) == 3: # 123-456-789012 형식 return f"{encrypted_digits[:len(parts[0])]}-{encrypted_digits[len(parts[0]):len(parts[0])+len(parts[1])]}-{encrypted_digits[len(parts[0])+len(parts[1]):]}" return encrypted_digits def _fpe_passport(self, text: str, encrypt: bool = True) -> str: """여권번호 FPE 처리 (M12345678 → X87654321 형식)""" if len(text) < 2: return text # 첫 글자(알파벳)와 숫자 분리 alpha_part = text[0] if text[0].isalpha() else "" digit_part = text[1:] if text[0].isalpha() else text if encrypt: # 알파벳 치환 if alpha_part: alpha_encrypted = self._simple_fpe_alpha(alpha_part, "passport") else: alpha_encrypted = "" # 숫자 FPE digit_encrypted = self._simple_fpe_digits(digit_part, "passport") return alpha_encrypted + digit_encrypted else: # 복호화 if alpha_part: alpha_decrypted = self._simple_fpe_alpha(alpha_part, "passport", decrypt=True) else: alpha_decrypted = "" digit_decrypted = self._simple_fpe_digits(digit_part, "passport", decrypt=True) return alpha_decrypted + digit_decrypted def _fpe_ssn(self, text: str, encrypt: bool = True) -> str: """주민등록번호 FPE 처리 (123456-1234567)""" digits = re.sub(r'\D', '', text) if len(digits) != 13: return text if encrypt: encrypted_digits = self._simple_fpe_digits(digits, "ssn") else: encrypted_digits = self._simple_fpe_digits(text.replace('-', ''), "ssn", decrypt=True) # 하이픈 형식 복원 return f"{encrypted_digits[:6]}-{encrypted_digits[6:]}" def _fpe_digits_only(self, text: str, encrypt: bool = True) -> str: """일반적인 숫자 FPE 처리""" digits = re.sub(r'\D', '', text) if not digits: return text if encrypt: return self._simple_fpe_digits(digits, "general") else: return self._simple_fpe_digits(digits, "general", decrypt=True) def _simple_fpe_digits(self, digits: str, context: str, decrypt: bool = False) -> str: """간단한 숫자 FPE 구현 - 컨텍스트만으로 결정론적 변환""" if not digits: return digits # 컨텍스트만으로 결정론적 시드 생성 (암호화/복호화 모두 동일) seed = hashlib.sha256(context.encode()).digest() # 숫자 배열로 변환 digit_list = [int(d) for d in digits] # 컨텍스트 기반 결정론적 변환 테이블 생성 (0-9 매핑) transform_table = list(range(10)) # 시드를 기반으로 변환 테이블 셔플 (Fisher-Yates 알고리즘) for i in range(9, 0, -1): seed_byte = seed[i % len(seed)] j = seed_byte % (i + 1) transform_table[i], transform_table[j] = transform_table[j], transform_table[i] # 역변환 테이블 생성 reverse_table = [0] * 10 for i, val in enumerate(transform_table): reverse_table[val] = i # 변환 적용 if not decrypt: # 암호화: transform_table 사용 result = [transform_table[digit] for digit in digit_list] else: # 복호화: reverse_table 사용 result = [reverse_table[digit] for digit in digit_list] return ''.join(map(str, result)) def _simple_fpe_alpha(self, alpha: str, context: str, decrypt: bool = False) -> str: """간단한 알파벳 FPE 구현 - 컨텍스트만으로 결정론적 변환""" if not alpha: return alpha # 컨텍스트만으로 시드 생성 seed = hashlib.sha256(context.encode()).digest() # 대문자 변환 테이블 생성 upper_table = list(range(26)) for i in range(25, 0, -1): seed_byte = seed[i % len(seed)] j = seed_byte % (i + 1) upper_table[i], upper_table[j] = upper_table[j], upper_table[i] # 소문자 변환 테이블 생성 (다른 시드 사용) lower_seed = hashlib.sha256((context + "_lower").encode()).digest() lower_table = list(range(26)) for i in range(25, 0, -1): seed_byte = lower_seed[i % len(lower_seed)] j = seed_byte % (i + 1) lower_table[i], lower_table[j] = lower_table[j], lower_table[i] # 역변환 테이블 생성 upper_reverse = [0] * 26 lower_reverse = [0] * 26 for i in range(26): upper_reverse[upper_table[i]] = i lower_reverse[lower_table[i]] = i result = "" for char in alpha: if char.isupper(): char_idx = ord(char) - ord('A') if not decrypt: new_idx = upper_table[char_idx] else: new_idx = upper_reverse[char_idx] result += chr(new_idx + ord('A')) elif char.islower(): char_idx = ord(char) - ord('a') if not decrypt: new_idx = lower_table[char_idx] else: new_idx = lower_reverse[char_idx] result += chr(new_idx + ord('a')) else: result += char return result def _pkcs7_pad(self, data: bytes) -> bytes: """PKCS7 패딩 추가""" pad_len = 16 - (len(data) % 16) return data + bytes([pad_len] * pad_len) def _pkcs7_unpad(self, data: bytes) -> bytes: """PKCS7 패딩 제거""" pad_len = data[-1] return data[:-pad_len] def encrypt_pii_item(self, pii_value: str, pii_type: str) -> str: """ PII 항목을 유형에 따라 적절한 방식으로 암호화 Args: pii_value: PII 값 pii_type: PII 유형 Returns: 암호화된 값 """ if not pii_value: return pii_value if pii_type in self.deterministic_types: return self.deterministic_encrypt(pii_value, pii_type) elif pii_type in self.fpe_types: return self.fpe_encrypt(pii_value, pii_type) else: # 기본적으로 결정론적 암호화 사용 return self.deterministic_encrypt(pii_value, pii_type) def decrypt_pii_item(self, encrypted_value: str, pii_type: str) -> str: """ 암호화된 PII 항목을 복호화 Args: encrypted_value: 암호화된 값 pii_type: PII 유형 Returns: 복호화된 값 """ if not encrypted_value: return encrypted_value if pii_type in self.deterministic_types: return self.deterministic_decrypt(encrypted_value, pii_type) elif pii_type in self.fpe_types: return self.fpe_decrypt(encrypted_value, pii_type) else: # 기본적으로 결정론적 복호화 사용 return self.deterministic_decrypt(encrypted_value, pii_type) if __name__ == "__main__": # 테스트 코드 crypto = PIICrypto() print("=== PII 암호화 모듈 테스트 ===\n") # 결정론적 암호화 테스트 print("1. 결정론적 암호화 (이름, 주소, 이메일)") test_data = [ ("김철수", "이름"), ("서울시 강남구 테헤란로 123", "주소"), ("kim@example.com", "이메일") ] for original, pii_type in test_data: encrypted = crypto.encrypt_pii_item(original, pii_type) decrypted = crypto.decrypt_pii_item(encrypted, pii_type) print(f" {pii_type}:") print(f" 원본: {original}") print(f" 암호화: {encrypted}") print(f" 복호화: {decrypted}") print(f" 일치: {'✅' if original == decrypted else '❌'}") # 결정론적 테스트 (같은 입력 → 같은 암호문) encrypted2 = crypto.encrypt_pii_item(original, pii_type) print(f" 결정론적: {'✅' if encrypted == encrypted2 else '❌'}") print() # FPE 테스트 print("2. FPE (Format Preserving Encryption)") fpe_test_data = [ ("010-1234-5678", "전화번호"), ("1234-5678-9012-3456", "신용카드번호"), ("123-456-789012", "은행계좌번호"), ("M12345678", "여권번호"), ("123456-1234567", "주민등록번호") ] for original, pii_type in fpe_test_data: encrypted = crypto.encrypt_pii_item(original, pii_type) decrypted = crypto.decrypt_pii_item(encrypted, pii_type) print(f" {pii_type}:") print(f" 원본: {original}") print(f" FPE: {encrypted}") print(f" 복호화: {decrypted}") print(f" 일치: {'✅' if original == decrypted else '❌'}") print(f" 형식유지: {'✅' if len(original) == len(encrypted) else '❌'}") print() print("✅ 테스트 완료!")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/czangyeob/mcp-pii-tools'

If you have feedback or need assistance with the MCP directory API, please join our Discord server