mcp_pii_tools.py•49.2 kB
#!/usr/bin/env python3
"""
MCP (Model Context Protocol) Tools for PII Detection and Processing
PII 탐지 및 처리 기능을 MCP Tool로 제공
"""
import os
import re
import sys
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
# from dotenv import load_dotenv # Claude Desktop에서 환경변수를 직접 설정하므로 불필요
import langextract as lx
from pii_crypto import PIICrypto
from openai import OpenAI
from vllm_provider import VLLMProvider, ModelProviderFactory, get_provider_config
from mcp.server.fastmcp import FastMCP
import logging
# 로깅 설정
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
mcp = FastMCP("hello-mcp")
# 전역 PII 탐지기 인스턴스 (서버 시작 시 초기화)
_global_detector = None
def get_detector():
"""전역 PII 탐지기 인스턴스를 반환합니다."""
global _global_detector
if _global_detector is None:
_global_detector = MCPPIIDetector()
return _global_detector
# 환경 변수 로드 (Claude Desktop에서 직접 설정됨)
# load_dotenv()
@dataclass
class PIIItem:
"""PII 항목을 나타내는 데이터 클래스"""
type: str # PII 유형 (이름, 전화번호, 이메일 등)
value: str # 추출된 값
confidence: float # 신뢰도 (0.0-1.0)
start_pos: int # 텍스트 내 시작 위치
end_pos: int # 텍스트 내 끝 위치
@dataclass
class ProcessedPII:
"""처리된 PII 정보"""
original_text: str # 원본 텍스트
encrypted_text: str # 암호화된 텍스트 (검색용)
anonymized_text: str # 완전 익명화된 텍스트 (표시용)
pii_items: List[PIIItem] # 탐지된 PII 항목들
encrypted_items: Dict[str, str] # 암호화된 PII 매핑
class MCPPIIDetector:
"""MCP용 PII 탐지기 - langextract 기반"""
def __init__(self):
"""PII 탐지기 초기화"""
# Provider 설정 로드
self.provider_config = get_provider_config()
self.provider_type = self.provider_config["provider_type"]
self.model_id = self.provider_config["model_id"]
self.api_key = self.provider_config["api_key"]
# MCP 서버 시작 시 Provider 정보를 stderr로 출력
print(f"🚀 MCP PII Tools 시작 - Provider: {self.provider_type.upper()}, Model: {self.model_id}", file=sys.stderr)
if not self.api_key:
raise ValueError(f"{self.provider_type.upper()} API 키가 설정되지 않았습니다. 환경변수를 확인하세요.")
# Provider 인스턴스 생성
self.provider = ModelProviderFactory.create_provider(
provider_type=self.provider_type,
model_id=self.model_id,
api_key=self.api_key
)
logger.info(f"PII 탐지기 초기화 완료 - Provider: {self.provider_type}, Model: {self.model_id}")
# langextract 예제 데이터 설정
self.examples = [
lx.data.ExampleData(
text="오늘은 Rachel Lim이(가) 방문했다. 이메일 주소는 rachel.lim@gmail.com이다. 여권번호는 M31143886이다.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="Rachel Lim"),
lx.data.Extraction(extraction_class="email", extraction_text="rachel.lim@gmail.com"),
lx.data.Extraction(extraction_class="passport_number", extraction_text="M31143886"),
],
),
lx.data.ExampleData(
text="김철수 씨가 테이블에 앉았다. 연락처는 010-1234-5678이고 이메일은 kim@example.com입니다.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="김철수"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-1234-5678"),
lx.data.Extraction(extraction_class="email", extraction_text="kim@example.com"),
],
),
lx.data.ExampleData(
text="박지현 님이 딜러와 대화했다. 주소는 서울시 강남구 테헤란로 123입니다.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="박지현"),
lx.data.Extraction(extraction_class="address", extraction_text="서울시 강남구 테헤란로 123"),
],
),
lx.data.ExampleData(
text="이름은 김창엽이고 주소는 분당구야. 전화번호는 010-9876-5432야.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="김창엽"),
lx.data.Extraction(extraction_class="address", extraction_text="분당구"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-9876-5432"),
],
),
lx.data.ExampleData(
text="사는 곳은 강남구이고 이름은 박민수입니다. 연락처는 010-5555-1234입니다.",
extractions=[
lx.data.Extraction(extraction_class="address", extraction_text="강남구"),
lx.data.Extraction(extraction_class="name", extraction_text="박민수"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-5555-1234"),
],
),
lx.data.ExampleData(
text="이름은 김창엽이고 주소는 경기도 성남시 분당구야. 전화번호는 010-9876-5432야 여권번호는 M3234234 으로 확인되.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="김창엽"),
lx.data.Extraction(extraction_class="address", extraction_text="경기도 성남시 분당구"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-9876-5432"),
lx.data.Extraction(extraction_class="passport_number", extraction_text="M3234234"),
],
),
]
self.prompt = """주어진 텍스트에서 PII(개인정보)를 추출하세요. 다음 항목들을 찾아주세요:
- 이름: 사람의 이름 (김철수, 박지현, Rachel Lim 등)
- 이메일: 이메일 주소 (example@email.com 등)
- 전화번호: 전화번호 (010-1234-5678, 02-123-4567 등)
- 여권번호: 여권 번호 (M31143886, M3234234 등)
- 주소: 주소 정보 (서울시 강남구, 분당구, 강남구, 서초구, 경기도 성남시 분당구 등)
중요: 주소의 경우 다음을 모두 주소로 인식해주세요:
- "분당구", "강남구", "서초구" 같은 지역명
- "경기도 성남시 분당구" 같은 구체적인 주소
- "주소는 분당구야"에서 "분당구"는 주소입니다.
- "주소는 경기도 성남시 분당구야"에서 "경기도 성남시 분당구"는 주소입니다.
응답은 반드시 ```json 코드 블록으로 감싸서 반환하세요."""
print("✅ MCP PII Detector 초기화 완료")
def detect_pii(self, text: str) -> Dict[str, Any]:
"""
텍스트에서 PII를 탐지 (MCP Tool용)
Args:
text (str): 분석할 텍스트
Returns:
Dict[str, Any]: MCP Tool 응답 형식
"""
try:
start_time = time.time()
# Provider에 따른 langextract 호출
if self.provider_type == "vllm":
# vLLM Provider 사용
result = lx.extract(
text_or_documents=text,
prompt_description=self.prompt,
examples=self.examples,
model=self.provider, # 커스텀 Provider 인스턴스 사용
use_schema_constraints=False
)
else:
# OpenAI Provider 사용 (기본)
os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"
result = lx.extract(
text_or_documents=text,
prompt_description=self.prompt,
examples=self.examples,
model_id=self.model_id,
api_key=self.api_key,
fence_output=True
)
# 결과를 PIIItem 리스트로 변환
pii_items = []
logger.info(f"탐지된 extraction 수: {len(result.extractions)}")
for i, extraction in enumerate(result.extractions):
logger.info(f"Extraction {i+1}: class='{extraction.extraction_class}', text='{extraction.extraction_text}'")
# char_interval이 없으면 텍스트에서 직접 위치 찾기
start_pos = 0
end_pos = 0
if extraction.char_interval:
start_pos = extraction.char_interval.start_pos
end_pos = extraction.char_interval.end_pos
logger.info(f" char_interval 사용: {start_pos}-{end_pos}")
else:
# 텍스트에서 직접 위치 찾기 (대소문자 구분 없이)
search_text = extraction.extraction_text
start_pos = text.find(search_text)
# 대소문자 구분 없이 찾기
if start_pos == -1:
start_pos = text.lower().find(search_text.lower())
if start_pos != -1:
end_pos = start_pos + len(search_text)
logger.info(f" 텍스트에서 직접 찾음: {start_pos}-{end_pos}")
# 실제 찾은 텍스트와 원본이 일치하는지 확인
actual_found = text[start_pos:end_pos]
if actual_found != search_text:
logger.warning(f" 대소문자 차이: 찾은='{actual_found}', 원본='{search_text}'")
else:
start_pos = -1
end_pos = -1
logger.warning(f" 텍스트에서 찾을 수 없음: '{extraction.extraction_text}'")
mapped_type = self._map_extraction_class(extraction.extraction_class)
logger.info(f" 매핑된 타입: '{extraction.extraction_class}' -> '{mapped_type}'")
pii_items.append(PIIItem(
type=mapped_type,
value=extraction.extraction_text,
confidence=0.9, # langextract는 confidence를 제공하지 않으므로 기본값
start_pos=start_pos,
end_pos=end_pos
))
processing_time = time.time() - start_time
return {
"success": True,
"pii_items": [asdict(item) for item in pii_items],
"count": len(pii_items),
"processing_time": processing_time,
"summary": self._get_pii_summary(pii_items)
}
except Exception as e:
return {
"success": False,
"error": str(e),
"pii_items": [],
"count": 0,
"processing_time": 0,
"summary": {}
}
def anonymize_text(self, text: str, pii_items: List[PIIItem]) -> str:
"""텍스트에서 PII를 익명화 처리"""
if not pii_items:
return text
logger.info(f"익명화 시작: 원본 텍스트 길이={len(text)}")
logger.info(f"익명화할 PII 항목 수: {len(pii_items)}")
# PII 항목들을 위치별로 정렬하고 중복 제거
valid_items = []
for item in pii_items:
if item.start_pos != -1 and item.end_pos != -1 and item.start_pos < len(text):
# 실제 텍스트에서 해당 위치의 내용이 일치하는지 확인
actual_text = text[item.start_pos:item.end_pos]
if actual_text == item.value:
valid_items.append(item)
logger.info(f"유효한 PII: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
else:
# 주소의 경우 위치 불일치를 무시하고 유효한 것으로 처리
if item.type == "주소":
valid_items.append(item)
logger.info(f"주소 위치 불일치 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
else:
logger.warning(f"위치 불일치: 예상='{item.value}', 실제='{actual_text}' at {item.start_pos}-{item.end_pos}")
else:
# 주소의 경우 위치가 유효하지 않아도 처리
if item.type == "주소":
valid_items.append(item)
logger.info(f"주소 위치 무효 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
else:
logger.warning(f"유효하지 않은 위치: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
# 뒤에서부터 치환 (인덱스 변화 방지)
sorted_items = sorted(valid_items, key=lambda x: x.start_pos, reverse=True)
anonymized_text = text
# 주소의 경우 특별 처리: langextract 위치 정보 무시하고 직접 찾기
address_items = [item for item in sorted_items if item.type == "주소"]
other_items = [item for item in sorted_items if item.type != "주소"]
# 주소 먼저 처리
for item in address_items:
logger.info(f"주소 특별 처리: '{item.value}' ({item.type})")
# PII 유형에 따른 익명화
anonymized_value = "[주소]"
# 직접 텍스트에서 찾기
if item.value in anonymized_text:
anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1)
logger.info(f"주소 직접 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 부분 매치 시도
keywords = [word for word in item.value.split() if len(word) > 1]
for keyword in reversed(keywords): # 뒤에서부터 시도
if keyword in anonymized_text:
anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1)
logger.info(f"주소 키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')")
break
else:
logger.warning(f"주소 '{item.value}'를 찾을 수 없음")
# 나머지 PII 처리
for item in other_items:
logger.info(f"익명화 처리 중: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
# PII 유형에 따른 익명화
if item.type == "이름":
anonymized_value = "[이름]"
elif item.type == "전화번호":
anonymized_value = "[전화번호]"
elif item.type == "이메일":
anonymized_value = "[이메일]"
elif item.type == "주소":
anonymized_value = "[주소]"
elif item.type == "여권번호":
anonymized_value = "[여권번호]"
else:
anonymized_value = f"[{item.type}]"
# 텍스트에서 치환 (강화된 문자열 치환 사용)
try:
# 현재 텍스트에서 해당 값이 있는지 확인
if item.value in anonymized_text:
# 문자열 치환 수행
anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1) # 첫 번째 매치만 치환
logger.info(f"익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 대소문자 무시하고 찾기
import re
pattern = re.escape(item.value)
match = re.search(pattern, anonymized_text, re.IGNORECASE)
if match:
start, end = match.span()
anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:]
logger.info(f"대소문자 무시 익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 부분 매치 시도 (주소의 경우)
if item.type == "주소" and len(item.value) > 3:
# 주소의 마지막 부분으로 찾기
last_part = item.value.split()[-1] if ' ' in item.value else item.value[-3:]
if last_part in anonymized_text:
anonymized_text = anonymized_text.replace(last_part, anonymized_value, 1)
logger.info(f"부분 매치 익명화 완료: '{last_part}' -> '{anonymized_value}' (원본: '{item.value}')")
else:
# 더 강력한 주소 매칭: 정규식으로 찾기
import re
# 주소 패턴을 정규식으로 변환
address_pattern = re.escape(item.value).replace(r'\ ', r'\s+')
match = re.search(address_pattern, anonymized_text, re.IGNORECASE)
if match:
start, end = match.span()
anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:]
logger.info(f"정규식 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 마지막 시도: 주소의 핵심 키워드로 찾기
keywords = [word for word in item.value.split() if len(word) > 1]
for keyword in reversed(keywords): # 뒤에서부터 시도
if keyword in anonymized_text:
anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1)
logger.info(f"키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')")
break
else:
logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음")
else:
logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음")
except Exception as e:
logger.error(f"익명화 실패: {e}, item: {item}")
logger.info(f"익명화 완료: 결과 텍스트 길이={len(anonymized_text)}")
return anonymized_text
def _map_extraction_class(self, extraction_class: str) -> str:
"""langextract 클래스를 한국어 PII 유형으로 매핑"""
mapping = {
"name": "이름",
"email": "이메일",
"phone": "전화번호",
"passport_number": "여권번호",
"address": "주소"
}
return mapping.get(extraction_class, extraction_class)
def _get_pii_summary(self, pii_items: List[PIIItem]) -> Dict[str, int]:
"""PII 항목들의 요약 통계 반환"""
summary = {}
for item in pii_items:
if item.type in summary:
summary[item.type] += 1
else:
summary[item.type] = 1
return summary
class MCPPIIProcessor:
"""MCP용 PII 처리기 - 탐지 + 암호화"""
def __init__(self):
"""PII 처리기 초기화"""
self.detector = get_detector()
self.openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
print("✅ MCP PII Processor 초기화 완료")
def process_text(self, text: str) -> Dict[str, Any]:
"""
텍스트에서 PII를 탐지하고 처리 (MCP Tool용)
Args:
text (str): 처리할 텍스트
Returns:
Dict[str, Any]: MCP Tool 응답 형식
"""
try:
start_time = time.time()
if not text:
return {
"success": True,
"original_text": "",
"anonymized_text": "",
"pii_items": [],
"count": 0,
"processing_time": 0,
"summary": {}
}
# 1. PII 탐지
detection_result = self.detector.detect_pii(text)
if not detection_result["success"]:
return detection_result
# PIIItem 객체로 변환
pii_items = [PIIItem(**item) for item in detection_result["pii_items"]]
# 2. 익명화 처리
anonymized_text = self.detector.anonymize_text(text, pii_items)
processing_time = time.time() - start_time
return {
"success": True,
"original_text": text,
"anonymized_text": anonymized_text,
"pii_items": [asdict(item) for item in pii_items],
"count": len(pii_items),
"processing_time": processing_time,
"summary": detection_result["summary"]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"original_text": text,
"anonymized_text": text,
"pii_items": [],
"count": 0,
"processing_time": 0,
"summary": {}
}
def batch_process(self, texts: List[str]) -> Dict[str, Any]:
"""
여러 텍스트를 일괄 처리 (MCP Tool용)
Args:
texts (List[str]): 처리할 텍스트 리스트
Returns:
Dict[str, Any]: MCP Tool 응답 형식
"""
try:
start_time = time.time()
results = []
for i, text in enumerate(texts):
result = self.process_text(text)
result["index"] = i
results.append(result)
processing_time = time.time() - start_time
# 전체 통계
total_pii = sum(result["count"] for result in results if result["success"])
successful_count = sum(1 for result in results if result["success"])
return {
"success": True,
"results": results,
"total_texts": len(texts),
"successful_count": successful_count,
"total_pii_detected": total_pii,
"processing_time": processing_time,
"average_time_per_text": processing_time / len(texts) if texts else 0
}
except Exception as e:
return {
"success": False,
"error": str(e),
"results": [],
"total_texts": len(texts),
"successful_count": 0,
"total_pii_detected": 0,
"processing_time": 0,
"average_time_per_text": 0
}
# MCP Tool 함수들
@mcp.tool()
def mcp_detect_pii(text: str) -> Dict[str, Any]:
"""
MCP Tool: 텍스트에서 PII 탐지
Args:
text (str): 분석할 텍스트
Returns:
Dict[str, Any]: 탐지 결과
"""
detector = get_detector()
return detector.detect_pii(text)
@mcp.tool()
def mcp_process_text(text: str) -> Dict[str, Any]:
"""
MCP Tool: 텍스트 PII 처리 (탐지 + 익명화)
Args:
text (str): 처리할 텍스트
Returns:
Dict[str, Any]: 처리 결과
"""
processor = MCPPIIProcessor()
return processor.process_text(text)
@mcp.tool()
def mcp_batch_process(texts: List[str]) -> Dict[str, Any]:
"""
MCP Tool: 여러 텍스트 일괄 처리
Args:
texts (List[str]): 처리할 텍스트 리스트
Returns:
Dict[str, Any]: 일괄 처리 결과
"""
processor = MCPPIIProcessor()
return processor.batch_process(texts)
@mcp.tool()
def mcp_anonymize_text(text: str, pii_items: List[Dict[str, Any]]) -> str:
"""
MCP Tool: 텍스트 익명화
Args:
text (str): 원본 텍스트
pii_items (List[Dict[str, Any]]): PII 항목들
Returns:
str: 익명화된 텍스트
"""
detector = get_detector()
pii_objects = [PIIItem(**item) for item in pii_items]
return detector.anonymize_text(text, pii_objects)
@mcp.tool()
def mcp_encrypt_pii_item(pii_value: str, pii_type: str) -> Dict[str, Any]:
"""
MCP Tool: PII 항목 암호화
Args:
pii_value (str): 암호화할 PII 값
pii_type (str): PII 유형 (이름, 전화번호, 이메일 등)
Returns:
Dict[str, Any]: 암호화 결과
"""
try:
crypto = PIICrypto()
encrypted_value = crypto.encrypt_pii_item(pii_value, pii_type)
return {
"success": True,
"original_value": pii_value,
"encrypted_value": encrypted_value,
"pii_type": pii_type,
"encryption_method": "deterministic" if pii_type in crypto.deterministic_types else "fpe"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"original_value": pii_value,
"encrypted_value": "",
"pii_type": pii_type
}
@mcp.tool()
def mcp_decrypt_pii_item(encrypted_value: str, pii_type: str) -> Dict[str, Any]:
"""
MCP Tool: PII 항목 복호화
Args:
encrypted_value (str): 복호화할 암호화된 값
pii_type (str): PII 유형
Returns:
Dict[str, Any]: 복호화 결과
"""
try:
crypto = PIICrypto()
decrypted_value = crypto.decrypt_pii_item(encrypted_value, pii_type)
return {
"success": True,
"encrypted_value": encrypted_value,
"decrypted_value": decrypted_value,
"pii_type": pii_type,
"decryption_method": "deterministic" if pii_type in crypto.deterministic_types else "fpe"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"encrypted_value": encrypted_value,
"decrypted_value": "",
"pii_type": pii_type
}
@mcp.tool()
def mcp_encrypt_text_pii(text: str) -> Dict[str, Any]:
"""
MCP Tool: 텍스트에서 PII를 탐지하고 암호화
Args:
text (str): 처리할 텍스트
Returns:
Dict[str, Any]: 암호화 처리 결과
"""
try:
start_time = time.time()
# 1. PII 탐지
detection_result = mcp_detect_pii(text)
if not detection_result["success"]:
return detection_result
if not detection_result["pii_items"]:
return {
"success": True,
"original_text": text,
"encrypted_text": text,
"pii_items": [],
"encrypted_items": {},
"count": 0,
"processing_time": time.time() - start_time,
"summary": {}
}
# 2. PII 항목들 암호화
crypto = PIICrypto()
encrypted_items = {}
encrypted_text = text
for item in detection_result["pii_items"]:
pii_type = item["type"]
pii_value = item["value"]
# 암호화
encrypted_value = crypto.encrypt_pii_item(pii_value, pii_type)
encrypted_items[pii_value] = encrypted_value
# 텍스트에서 치환
encrypted_text = encrypted_text.replace(pii_value, encrypted_value)
processing_time = time.time() - start_time
return {
"success": True,
"original_text": text,
"encrypted_text": encrypted_text,
"pii_items": detection_result["pii_items"],
"encrypted_items": encrypted_items,
"count": len(detection_result["pii_items"]),
"processing_time": processing_time,
"summary": detection_result["summary"]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"original_text": text,
"encrypted_text": text,
"pii_items": [],
"encrypted_items": {},
"count": 0,
"processing_time": 0,
"summary": {}
}
@mcp.tool()
def mcp_decrypt_text_pii(encrypted_text: str, encrypted_items: Dict[str, str]) -> Dict[str, Any]:
"""
MCP Tool: 암호화된 텍스트에서 PII 복호화
Args:
encrypted_text (str): 암호화된 텍스트
encrypted_items (Dict[str, str]): 원본값 -> 암호화값 매핑
Returns:
Dict[str, Any]: 복호화 결과
"""
try:
start_time = time.time()
# 암호화값 -> 원본값 매핑 생성
decrypted_items = {}
decrypted_text = encrypted_text
crypto = PIICrypto()
for original_value, encrypted_value in encrypted_items.items():
# PII 유형 추정 (암호화된 값을 기준으로)
pii_type = _guess_pii_type(encrypted_value)
# Base64 패턴인 경우 (결정론적 암호화) 여러 유형 시도
if re.match(r'^[A-Za-z0-9+/=]+$', encrypted_value) and len(encrypted_value) > 20:
# 이름, 주소, 이메일 순서로 시도
decrypted_value = None
for try_type in ["이름", "주소", "이메일"]:
try:
result = crypto.decrypt_pii_item(encrypted_value, try_type)
if not (result.startswith("[복호화실패:") or result.startswith("[FPE복호화실패:")):
decrypted_value = result
pii_type = try_type
break
except:
continue
if decrypted_value is None:
decrypted_value = f"[복호화실패:{encrypted_value[:20]}...]"
else:
# FPE 암호화 또는 다른 패턴
decrypted_value = crypto.decrypt_pii_item(encrypted_value, pii_type)
decrypted_items[encrypted_value] = decrypted_value
# 텍스트에서 복호화
decrypted_text = decrypted_text.replace(encrypted_value, decrypted_value)
processing_time = time.time() - start_time
return {
"success": True,
"encrypted_text": encrypted_text,
"decrypted_text": decrypted_text,
"decrypted_items": decrypted_items,
"count": len(encrypted_items),
"processing_time": processing_time
}
except Exception as e:
return {
"success": False,
"error": str(e),
"encrypted_text": encrypted_text,
"decrypted_text": encrypted_text,
"decrypted_items": {},
"count": 0,
"processing_time": 0
}
def _guess_pii_type(value: str) -> str:
"""PII 값으로부터 유형을 추정하는 헬퍼 함수 (암호화된 값도 고려)"""
import re
# 이메일 패턴
if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
return "이메일"
# 전화번호 패턴 (한국) - 하이픈 포함
if re.match(r'^\d{3}-\d{4}-\d{4}$', value) or re.match(r'^\d{2,3}-\d{3,4}-\d{4}$', value):
return "전화번호"
# 신용카드번호 패턴 - 하이픈 포함
if re.match(r'^\d{4}-\d{4}-\d{4}-\d{4}$', value):
return "신용카드번호"
# 여권번호 패턴 - 알파벳 + 숫자 (암호화된 경우도 고려)
if re.match(r'^[A-Z]\d{8,9}$', value) or re.match(r'^[A-Z]\d{7,10}$', value):
return "여권번호"
# 주민등록번호 패턴 - 하이픈 포함
if re.match(r'^\d{6}-\d{7}$', value):
return "주민등록번호"
# 주소 패턴 (한국) - 한글 키워드 포함
if any(keyword in value for keyword in ["시", "구", "로", "동", "번지"]):
return "주소"
# Base64 패턴 (결정론적 암호화 결과) - 이름, 주소, 이메일만 사용
if re.match(r'^[A-Za-z0-9+/=]+$', value):
# Base64는 결정론적 암호화 결과이므로 이름, 주소, 이메일 중 하나
# 길이로 대략적인 구분 (정확하지 않을 수 있음)
if len(value) < 50:
return "이름" # 짧은 Base64는 보통 이름
elif len(value) < 70:
return "이메일" # 중간 길이 Base64는 보통 이메일
else:
return "주소" # 긴 Base64는 보통 주소
# 기본적으로 이름으로 추정
return "이름"
# MCP Tool 메타데이터
MCP_TOOLS = {
"detect_pii": {
"name": "detect_pii",
"description": "텍스트에서 PII(개인정보)를 탐지합니다. 이름, 이메일, 전화번호, 여권번호, 주소 등을 찾습니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "분석할 텍스트"
}
},
"required": ["text"]
}
},
"process_text": {
"name": "process_text",
"description": "텍스트에서 PII(개인 정보) 를 탐지하고 익명화 처리합니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "처리할 텍스트"
}
},
"required": ["text"]
}
},
"batch_process": {
"name": "batch_process",
"description": "여러 텍스트를 일괄적으로 PII(개인 정보) 처리합니다.",
"parameters": {
"type": "object",
"properties": {
"texts": {
"type": "array",
"items": {"type": "string"},
"description": "처리할 텍스트 리스트"
}
},
"required": ["texts"]
}
},
"anonymize_text": {
"name": "anonymize_text",
"description": "PII(개인 정보) 항목들을 사용하여 텍스트를 익명화합니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "원본 텍스트"
},
"pii_items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {"type": "string"},
"value": {"type": "string"},
"confidence": {"type": "number"},
"start_pos": {"type": "number"},
"end_pos": {"type": "number"}
}
},
"description": "PII 항목들"
}
},
"required": ["text", "pii_items"]
}
},
"encrypt_pii_item": {
"name": "encrypt_pii_item",
"description": "개별 PII 항목을 암호화합니다. 이름, 주소, 이메일은 결정론적 암호화, 전화번호, 신용카드번호 등은 FPE(형식 유지 암호화)를 사용합니다.",
"parameters": {
"type": "object",
"properties": {
"pii_value": {
"type": "string",
"description": "암호화할 PII 값"
},
"pii_type": {
"type": "string",
"description": "PII 유형 (이름, 전화번호, 이메일, 주소, 신용카드번호, 여권번호, 주민등록번호 등)"
}
},
"required": ["pii_value", "pii_type"]
}
},
"decrypt_pii_item": {
"name": "decrypt_pii_item",
"description": "암호화된 PII 항목을 복호화합니다.",
"parameters": {
"type": "object",
"properties": {
"encrypted_value": {
"type": "string",
"description": "복호화할 암호화된 값"
},
"pii_type": {
"type": "string",
"description": "PII 유형 (암호화 시 사용한 유형과 동일해야 함)"
}
},
"required": ["encrypted_value", "pii_type"]
}
},
"encrypt_text_pii": {
"name": "encrypt_text_pii",
"description": "텍스트에서 PII를 탐지하고 모든 PII를 암호화합니다. 검색 가능한 암호화된 텍스트를 생성합니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "처리할 텍스트"
}
},
"required": ["text"]
}
},
"decrypt_text_pii": {
"name": "decrypt_text_pii",
"description": "암호화된 텍스트에서 PII를 복호화합니다.",
"parameters": {
"type": "object",
"properties": {
"encrypted_text": {
"type": "string",
"description": "암호화된 텍스트"
},
"encrypted_items": {
"type": "object",
"description": "원본값 -> 암호화값 매핑 딕셔너리"
}
},
"required": ["encrypted_text", "encrypted_items"]
}
}
}
def main():
"""테스트 함수"""
print("=== MCP PII Tools 테스트 ===\n")
# 테스트 텍스트
test_text = "오늘은 Rachel Lim이(가) 방문했다. 이메일 주소는 rachel.lim@gmail.com이고 전화번호는 010-1234-5678입니다. 여권번호는 M31143886입니다."
print(f"테스트 텍스트: {test_text}\n")
# 1. PII 탐지 테스트
print("1. PII 탐지 테스트:")
detection_result = mcp_detect_pii(test_text)
print(f"성공: {detection_result['success']}")
print(f"탐지된 PII: {detection_result['count']}개")
print(f"처리 시간: {detection_result['processing_time']:.3f}초")
print(f"요약: {detection_result['summary']}")
if detection_result['pii_items']:
print("탐지된 PII 상세:")
for item in detection_result['pii_items']:
print(f" - {item['type']}: {item['value']}")
print()
# 2. 텍스트 처리 테스트
print("2. 텍스트 처리 테스트:")
process_result = mcp_process_text(test_text)
print(f"성공: {process_result['success']}")
print(f"원본: {process_result['original_text']}")
print(f"익명화: {process_result['anonymized_text']}")
print()
# 3. 일괄 처리 테스트
print("3. 일괄 처리 테스트:")
batch_texts = [
"김철수 씨가 방문했다. 연락처는 010-1234-5678입니다.",
"박지현 님의 이메일은 park@test.com입니다.",
"John Smith의 주소는 서울시 강남구입니다."
]
batch_result = mcp_batch_process(batch_texts)
print(f"성공: {batch_result['success']}")
print(f"총 텍스트: {batch_result['total_texts']}개")
print(f"성공한 처리: {batch_result['successful_count']}개")
print(f"총 탐지된 PII: {batch_result['total_pii_detected']}개")
print(f"평균 처리 시간: {batch_result['average_time_per_text']:.3f}초")
print()
# 4. 개별 PII 항목 암호화/복호화 테스트
print("4. 개별 PII 항목 암호화/복호화 테스트:")
# 결정론적 암호화 테스트 (이름, 주소, 이메일)
print("4-1. 결정론적 암호화 (검색 가능):")
deterministic_tests = [
("김철수", "이름"),
("서울시 강남구 테헤란로 123", "주소"),
("kim@example.com", "이메일")
]
for original, pii_type in deterministic_tests:
# 암호화
encrypt_result = mcp_encrypt_pii_item(original, pii_type)
if encrypt_result['success']:
encrypted = encrypt_result['encrypted_value']
print(f" {pii_type}: {original} → {encrypted[:50]}...")
# 복호화
decrypt_result = mcp_decrypt_pii_item(encrypted, pii_type)
if decrypt_result['success']:
decrypted = decrypt_result['decrypted_value']
match = "✅" if original == decrypted else "❌"
print(f" 복호화: {decrypted} {match}")
# 결정론적 테스트 (같은 입력 → 같은 암호문)
encrypt_result2 = mcp_encrypt_pii_item(original, pii_type)
if encrypt_result2['success']:
encrypted2 = encrypt_result2['encrypted_value']
deterministic = "✅" if encrypted == encrypted2 else "❌"
print(f" 결정론적: {deterministic}")
print()
# FPE 암호화 테스트 (전화번호, 카드번호, 여권번호 등)
print("4-2. FPE 암호화 (형식 유지):")
fpe_tests = [
("010-1234-5678", "전화번호"),
("1234-5678-9012-3456", "신용카드번호"),
("M12345678", "여권번호"),
("123456-1234567", "주민등록번호")
]
for original, pii_type in fpe_tests:
# 암호화
encrypt_result = mcp_encrypt_pii_item(original, pii_type)
if encrypt_result['success']:
encrypted = encrypt_result['encrypted_value']
print(f" {pii_type}: {original} → {encrypted}")
# 복호화
decrypt_result = mcp_decrypt_pii_item(encrypted, pii_type)
if decrypt_result['success']:
decrypted = decrypt_result['decrypted_value']
match = "✅" if original == decrypted else "❌"
format_preserved = "✅" if len(original) == len(encrypted) else "❌"
print(f" 복호화: {decrypted} {match}")
print(f" 형식유지: {format_preserved}")
print()
# 5. 텍스트 전체 PII 암호화/복호화 테스트
print("5. 텍스트 전체 PII 암호화/복호화 테스트:")
# 암호화된 텍스트 생성
encrypt_text_result = mcp_encrypt_text_pii(test_text)
if encrypt_text_result['success']:
print(f"원본 텍스트: {encrypt_text_result['original_text']}")
print(f"암호화 텍스트: {encrypt_text_result['encrypted_text']}")
print(f"암호화된 PII: {encrypt_text_result['count']}개")
print(f"처리 시간: {encrypt_text_result['processing_time']:.3f}초")
# 암호화된 항목들 표시
if encrypt_text_result['encrypted_items']:
print("암호화된 항목들:")
for original, encrypted in encrypt_text_result['encrypted_items'].items():
print(f" {original} → {encrypted}")
print()
# 복호화 테스트
decrypt_text_result = mcp_decrypt_text_pii(
encrypt_text_result['encrypted_text'],
encrypt_text_result['encrypted_items']
)
if decrypt_text_result['success']:
print(f"복호화 텍스트: {decrypt_text_result['decrypted_text']}")
print(f"복호화된 항목: {decrypt_text_result['count']}개")
print(f"복호화 시간: {decrypt_text_result['processing_time']:.3f}초")
# 원본과 복호화 결과 비교
original_match = "✅" if test_text == decrypt_text_result['decrypted_text'] else "❌"
print(f"원본 일치: {original_match}")
print()
# 6. 암호화 방식 비교 테스트
print("6. 암호화 방식 비교 테스트:")
# 같은 이름을 여러 번 암호화해서 결정론적 암호화 확인
test_name = "김철수"
print(f"테스트 이름: {test_name}")
encrypt1 = mcp_encrypt_pii_item(test_name, "이름")
encrypt2 = mcp_encrypt_pii_item(test_name, "이름")
if encrypt1['success'] and encrypt2['success']:
same_encryption = "✅" if encrypt1['encrypted_value'] == encrypt2['encrypted_value'] else "❌"
print(f"결정론적 암호화 (같은 입력 → 같은 암호문): {same_encryption}")
print(f" 암호문1: {encrypt1['encrypted_value'][:50]}...")
print(f" 암호문2: {encrypt2['encrypted_value'][:50]}...")
print()
# 7. 다양한 PII 유형별 암호화 테스트
print("7. 다양한 PII 유형별 암호화 테스트:")
comprehensive_tests = [
("홍길동", "이름"),
("서울시 강남구 역삼동 123-45", "주소"),
("hong@company.co.kr", "이메일"),
("010-9876-5432", "전화번호"),
("02-123-4567", "전화번호"),
("4532-1234-5678-9012", "신용카드번호"),
("123-456-789012", "은행계좌번호"),
("M98765432", "여권번호"),
("A12345678", "여권번호"),
("901234-1234567", "주민등록번호")
]
for original, pii_type in comprehensive_tests:
encrypt_result = mcp_encrypt_pii_item(original, pii_type)
if encrypt_result['success']:
encrypted = encrypt_result['encrypted_value']
method = encrypt_result['encryption_method']
# 복호화로 검증
decrypt_result = mcp_decrypt_pii_item(encrypted, pii_type)
if decrypt_result['success']:
decrypted = decrypt_result['decrypted_value']
match = "✅" if original == decrypted else "❌"
print(f" {pii_type:8} | {original:20} → {encrypted:30} | {method:12} | {match}")
print()
print("✅ MCP PII Tools 테스트 완료!")
if __name__ == "__main__":
# 서버 정보 출력
logger.info("Starting MCP server: pii-tools-mcp")
logger.info("Available tools: detect_pii, process_text, batch_process, anonymize_text, encrypt_pii_item, decrypt_pii_item, encrypt_text_pii, decrypt_text_pii")
# Provider 정보를 stderr로 출력
try:
provider_config = get_provider_config()
print(f"🔧 MCP PII Tools 설정 - Provider: {provider_config['provider_type'].upper()}, Model: {provider_config['model_id']}", file=sys.stderr)
except Exception as e:
print(f"⚠️ Provider 설정 로드 실패: {e}", file=sys.stderr)
# PII 탐지기 초기화 (Provider 정보 출력을 위해)
try:
detector = get_detector()
print(f"✅ MCP PII Tools 준비 완료 - Provider: {detector.provider_type.upper()}, Model: {detector.model_id}", file=sys.stderr)
except Exception as e:
print(f"❌ MCP PII Tools 초기화 실패: {e}", file=sys.stderr)
# STDIO 전송으로 서버 실행 (Claude/Inspector가 붙음)
mcp.run(transport="stdio")