Skip to main content
Glama

MCP PII Tools

by czangyeob
mcp_pii_tools.py49.2 kB
#!/usr/bin/env python3 """ MCP (Model Context Protocol) Tools for PII Detection and Processing PII 탐지 및 처리 기능을 MCP Tool로 제공 """ import os import re import sys import time from typing import Dict, List, Optional, Any from dataclasses import dataclass, asdict # from dotenv import load_dotenv # Claude Desktop에서 환경변수를 직접 설정하므로 불필요 import langextract as lx from pii_crypto import PIICrypto from openai import OpenAI from vllm_provider import VLLMProvider, ModelProviderFactory, get_provider_config from mcp.server.fastmcp import FastMCP import logging # 로깅 설정 logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) mcp = FastMCP("hello-mcp") # 전역 PII 탐지기 인스턴스 (서버 시작 시 초기화) _global_detector = None def get_detector(): """전역 PII 탐지기 인스턴스를 반환합니다.""" global _global_detector if _global_detector is None: _global_detector = MCPPIIDetector() return _global_detector # 환경 변수 로드 (Claude Desktop에서 직접 설정됨) # load_dotenv() @dataclass class PIIItem: """PII 항목을 나타내는 데이터 클래스""" type: str # PII 유형 (이름, 전화번호, 이메일 등) value: str # 추출된 값 confidence: float # 신뢰도 (0.0-1.0) start_pos: int # 텍스트 내 시작 위치 end_pos: int # 텍스트 내 끝 위치 @dataclass class ProcessedPII: """처리된 PII 정보""" original_text: str # 원본 텍스트 encrypted_text: str # 암호화된 텍스트 (검색용) anonymized_text: str # 완전 익명화된 텍스트 (표시용) pii_items: List[PIIItem] # 탐지된 PII 항목들 encrypted_items: Dict[str, str] # 암호화된 PII 매핑 class MCPPIIDetector: """MCP용 PII 탐지기 - langextract 기반""" def __init__(self): """PII 탐지기 초기화""" # Provider 설정 로드 self.provider_config = get_provider_config() self.provider_type = self.provider_config["provider_type"] self.model_id = self.provider_config["model_id"] self.api_key = self.provider_config["api_key"] # MCP 서버 시작 시 Provider 정보를 stderr로 출력 print(f"🚀 MCP PII Tools 시작 - Provider: {self.provider_type.upper()}, Model: {self.model_id}", file=sys.stderr) if not self.api_key: raise ValueError(f"{self.provider_type.upper()} API 키가 설정되지 않았습니다. 환경변수를 확인하세요.") # Provider 인스턴스 생성 self.provider = ModelProviderFactory.create_provider( provider_type=self.provider_type, model_id=self.model_id, api_key=self.api_key ) logger.info(f"PII 탐지기 초기화 완료 - Provider: {self.provider_type}, Model: {self.model_id}") # langextract 예제 데이터 설정 self.examples = [ lx.data.ExampleData( text="오늘은 Rachel Lim이(가) 방문했다. 이메일 주소는 rachel.lim@gmail.com이다. 여권번호는 M31143886이다.", extractions=[ lx.data.Extraction(extraction_class="name", extraction_text="Rachel Lim"), lx.data.Extraction(extraction_class="email", extraction_text="rachel.lim@gmail.com"), lx.data.Extraction(extraction_class="passport_number", extraction_text="M31143886"), ], ), lx.data.ExampleData( text="김철수 씨가 테이블에 앉았다. 연락처는 010-1234-5678이고 이메일은 kim@example.com입니다.", extractions=[ lx.data.Extraction(extraction_class="name", extraction_text="김철수"), lx.data.Extraction(extraction_class="phone", extraction_text="010-1234-5678"), lx.data.Extraction(extraction_class="email", extraction_text="kim@example.com"), ], ), lx.data.ExampleData( text="박지현 님이 딜러와 대화했다. 주소는 서울시 강남구 테헤란로 123입니다.", extractions=[ lx.data.Extraction(extraction_class="name", extraction_text="박지현"), lx.data.Extraction(extraction_class="address", extraction_text="서울시 강남구 테헤란로 123"), ], ), lx.data.ExampleData( text="이름은 김창엽이고 주소는 분당구야. 전화번호는 010-9876-5432야.", extractions=[ lx.data.Extraction(extraction_class="name", extraction_text="김창엽"), lx.data.Extraction(extraction_class="address", extraction_text="분당구"), lx.data.Extraction(extraction_class="phone", extraction_text="010-9876-5432"), ], ), lx.data.ExampleData( text="사는 곳은 강남구이고 이름은 박민수입니다. 연락처는 010-5555-1234입니다.", extractions=[ lx.data.Extraction(extraction_class="address", extraction_text="강남구"), lx.data.Extraction(extraction_class="name", extraction_text="박민수"), lx.data.Extraction(extraction_class="phone", extraction_text="010-5555-1234"), ], ), lx.data.ExampleData( text="이름은 김창엽이고 주소는 경기도 성남시 분당구야. 전화번호는 010-9876-5432야 여권번호는 M3234234 으로 확인되.", extractions=[ lx.data.Extraction(extraction_class="name", extraction_text="김창엽"), lx.data.Extraction(extraction_class="address", extraction_text="경기도 성남시 분당구"), lx.data.Extraction(extraction_class="phone", extraction_text="010-9876-5432"), lx.data.Extraction(extraction_class="passport_number", extraction_text="M3234234"), ], ), ] self.prompt = """주어진 텍스트에서 PII(개인정보)를 추출하세요. 다음 항목들을 찾아주세요: - 이름: 사람의 이름 (김철수, 박지현, Rachel Lim 등) - 이메일: 이메일 주소 (example@email.com 등) - 전화번호: 전화번호 (010-1234-5678, 02-123-4567 등) - 여권번호: 여권 번호 (M31143886, M3234234 등) - 주소: 주소 정보 (서울시 강남구, 분당구, 강남구, 서초구, 경기도 성남시 분당구 등) 중요: 주소의 경우 다음을 모두 주소로 인식해주세요: - "분당구", "강남구", "서초구" 같은 지역명 - "경기도 성남시 분당구" 같은 구체적인 주소 - "주소는 분당구야"에서 "분당구"는 주소입니다. - "주소는 경기도 성남시 분당구야"에서 "경기도 성남시 분당구"는 주소입니다. 응답은 반드시 ```json 코드 블록으로 감싸서 반환하세요.""" print("✅ MCP PII Detector 초기화 완료") def detect_pii(self, text: str) -> Dict[str, Any]: """ 텍스트에서 PII를 탐지 (MCP Tool용) Args: text (str): 분석할 텍스트 Returns: Dict[str, Any]: MCP Tool 응답 형식 """ try: start_time = time.time() # Provider에 따른 langextract 호출 if self.provider_type == "vllm": # vLLM Provider 사용 result = lx.extract( text_or_documents=text, prompt_description=self.prompt, examples=self.examples, model=self.provider, # 커스텀 Provider 인스턴스 사용 use_schema_constraints=False ) else: # OpenAI Provider 사용 (기본) os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1" result = lx.extract( text_or_documents=text, prompt_description=self.prompt, examples=self.examples, model_id=self.model_id, api_key=self.api_key, fence_output=True ) # 결과를 PIIItem 리스트로 변환 pii_items = [] logger.info(f"탐지된 extraction 수: {len(result.extractions)}") for i, extraction in enumerate(result.extractions): logger.info(f"Extraction {i+1}: class='{extraction.extraction_class}', text='{extraction.extraction_text}'") # char_interval이 없으면 텍스트에서 직접 위치 찾기 start_pos = 0 end_pos = 0 if extraction.char_interval: start_pos = extraction.char_interval.start_pos end_pos = extraction.char_interval.end_pos logger.info(f" char_interval 사용: {start_pos}-{end_pos}") else: # 텍스트에서 직접 위치 찾기 (대소문자 구분 없이) search_text = extraction.extraction_text start_pos = text.find(search_text) # 대소문자 구분 없이 찾기 if start_pos == -1: start_pos = text.lower().find(search_text.lower()) if start_pos != -1: end_pos = start_pos + len(search_text) logger.info(f" 텍스트에서 직접 찾음: {start_pos}-{end_pos}") # 실제 찾은 텍스트와 원본이 일치하는지 확인 actual_found = text[start_pos:end_pos] if actual_found != search_text: logger.warning(f" 대소문자 차이: 찾은='{actual_found}', 원본='{search_text}'") else: start_pos = -1 end_pos = -1 logger.warning(f" 텍스트에서 찾을 수 없음: '{extraction.extraction_text}'") mapped_type = self._map_extraction_class(extraction.extraction_class) logger.info(f" 매핑된 타입: '{extraction.extraction_class}' -> '{mapped_type}'") pii_items.append(PIIItem( type=mapped_type, value=extraction.extraction_text, confidence=0.9, # langextract는 confidence를 제공하지 않으므로 기본값 start_pos=start_pos, end_pos=end_pos )) processing_time = time.time() - start_time return { "success": True, "pii_items": [asdict(item) for item in pii_items], "count": len(pii_items), "processing_time": processing_time, "summary": self._get_pii_summary(pii_items) } except Exception as e: return { "success": False, "error": str(e), "pii_items": [], "count": 0, "processing_time": 0, "summary": {} } def anonymize_text(self, text: str, pii_items: List[PIIItem]) -> str: """텍스트에서 PII를 익명화 처리""" if not pii_items: return text logger.info(f"익명화 시작: 원본 텍스트 길이={len(text)}") logger.info(f"익명화할 PII 항목 수: {len(pii_items)}") # PII 항목들을 위치별로 정렬하고 중복 제거 valid_items = [] for item in pii_items: if item.start_pos != -1 and item.end_pos != -1 and item.start_pos < len(text): # 실제 텍스트에서 해당 위치의 내용이 일치하는지 확인 actual_text = text[item.start_pos:item.end_pos] if actual_text == item.value: valid_items.append(item) logger.info(f"유효한 PII: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") else: # 주소의 경우 위치 불일치를 무시하고 유효한 것으로 처리 if item.type == "주소": valid_items.append(item) logger.info(f"주소 위치 불일치 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") else: logger.warning(f"위치 불일치: 예상='{item.value}', 실제='{actual_text}' at {item.start_pos}-{item.end_pos}") else: # 주소의 경우 위치가 유효하지 않아도 처리 if item.type == "주소": valid_items.append(item) logger.info(f"주소 위치 무효 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") else: logger.warning(f"유효하지 않은 위치: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") # 뒤에서부터 치환 (인덱스 변화 방지) sorted_items = sorted(valid_items, key=lambda x: x.start_pos, reverse=True) anonymized_text = text # 주소의 경우 특별 처리: langextract 위치 정보 무시하고 직접 찾기 address_items = [item for item in sorted_items if item.type == "주소"] other_items = [item for item in sorted_items if item.type != "주소"] # 주소 먼저 처리 for item in address_items: logger.info(f"주소 특별 처리: '{item.value}' ({item.type})") # PII 유형에 따른 익명화 anonymized_value = "[주소]" # 직접 텍스트에서 찾기 if item.value in anonymized_text: anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1) logger.info(f"주소 직접 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 부분 매치 시도 keywords = [word for word in item.value.split() if len(word) > 1] for keyword in reversed(keywords): # 뒤에서부터 시도 if keyword in anonymized_text: anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1) logger.info(f"주소 키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')") break else: logger.warning(f"주소 '{item.value}'를 찾을 수 없음") # 나머지 PII 처리 for item in other_items: logger.info(f"익명화 처리 중: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") # PII 유형에 따른 익명화 if item.type == "이름": anonymized_value = "[이름]" elif item.type == "전화번호": anonymized_value = "[전화번호]" elif item.type == "이메일": anonymized_value = "[이메일]" elif item.type == "주소": anonymized_value = "[주소]" elif item.type == "여권번호": anonymized_value = "[여권번호]" else: anonymized_value = f"[{item.type}]" # 텍스트에서 치환 (강화된 문자열 치환 사용) try: # 현재 텍스트에서 해당 값이 있는지 확인 if item.value in anonymized_text: # 문자열 치환 수행 anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1) # 첫 번째 매치만 치환 logger.info(f"익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 대소문자 무시하고 찾기 import re pattern = re.escape(item.value) match = re.search(pattern, anonymized_text, re.IGNORECASE) if match: start, end = match.span() anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:] logger.info(f"대소문자 무시 익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 부분 매치 시도 (주소의 경우) if item.type == "주소" and len(item.value) > 3: # 주소의 마지막 부분으로 찾기 last_part = item.value.split()[-1] if ' ' in item.value else item.value[-3:] if last_part in anonymized_text: anonymized_text = anonymized_text.replace(last_part, anonymized_value, 1) logger.info(f"부분 매치 익명화 완료: '{last_part}' -> '{anonymized_value}' (원본: '{item.value}')") else: # 더 강력한 주소 매칭: 정규식으로 찾기 import re # 주소 패턴을 정규식으로 변환 address_pattern = re.escape(item.value).replace(r'\ ', r'\s+') match = re.search(address_pattern, anonymized_text, re.IGNORECASE) if match: start, end = match.span() anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:] logger.info(f"정규식 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 마지막 시도: 주소의 핵심 키워드로 찾기 keywords = [word for word in item.value.split() if len(word) > 1] for keyword in reversed(keywords): # 뒤에서부터 시도 if keyword in anonymized_text: anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1) logger.info(f"키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')") break else: logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음") else: logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음") except Exception as e: logger.error(f"익명화 실패: {e}, item: {item}") logger.info(f"익명화 완료: 결과 텍스트 길이={len(anonymized_text)}") return anonymized_text def _map_extraction_class(self, extraction_class: str) -> str: """langextract 클래스를 한국어 PII 유형으로 매핑""" mapping = { "name": "이름", "email": "이메일", "phone": "전화번호", "passport_number": "여권번호", "address": "주소" } return mapping.get(extraction_class, extraction_class) def _get_pii_summary(self, pii_items: List[PIIItem]) -> Dict[str, int]: """PII 항목들의 요약 통계 반환""" summary = {} for item in pii_items: if item.type in summary: summary[item.type] += 1 else: summary[item.type] = 1 return summary class MCPPIIProcessor: """MCP용 PII 처리기 - 탐지 + 암호화""" def __init__(self): """PII 처리기 초기화""" self.detector = get_detector() self.openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) print("✅ MCP PII Processor 초기화 완료") def process_text(self, text: str) -> Dict[str, Any]: """ 텍스트에서 PII를 탐지하고 처리 (MCP Tool용) Args: text (str): 처리할 텍스트 Returns: Dict[str, Any]: MCP Tool 응답 형식 """ try: start_time = time.time() if not text: return { "success": True, "original_text": "", "anonymized_text": "", "pii_items": [], "count": 0, "processing_time": 0, "summary": {} } # 1. PII 탐지 detection_result = self.detector.detect_pii(text) if not detection_result["success"]: return detection_result # PIIItem 객체로 변환 pii_items = [PIIItem(**item) for item in detection_result["pii_items"]] # 2. 익명화 처리 anonymized_text = self.detector.anonymize_text(text, pii_items) processing_time = time.time() - start_time return { "success": True, "original_text": text, "anonymized_text": anonymized_text, "pii_items": [asdict(item) for item in pii_items], "count": len(pii_items), "processing_time": processing_time, "summary": detection_result["summary"] } except Exception as e: return { "success": False, "error": str(e), "original_text": text, "anonymized_text": text, "pii_items": [], "count": 0, "processing_time": 0, "summary": {} } def batch_process(self, texts: List[str]) -> Dict[str, Any]: """ 여러 텍스트를 일괄 처리 (MCP Tool용) Args: texts (List[str]): 처리할 텍스트 리스트 Returns: Dict[str, Any]: MCP Tool 응답 형식 """ try: start_time = time.time() results = [] for i, text in enumerate(texts): result = self.process_text(text) result["index"] = i results.append(result) processing_time = time.time() - start_time # 전체 통계 total_pii = sum(result["count"] for result in results if result["success"]) successful_count = sum(1 for result in results if result["success"]) return { "success": True, "results": results, "total_texts": len(texts), "successful_count": successful_count, "total_pii_detected": total_pii, "processing_time": processing_time, "average_time_per_text": processing_time / len(texts) if texts else 0 } except Exception as e: return { "success": False, "error": str(e), "results": [], "total_texts": len(texts), "successful_count": 0, "total_pii_detected": 0, "processing_time": 0, "average_time_per_text": 0 } # MCP Tool 함수들 @mcp.tool() def mcp_detect_pii(text: str) -> Dict[str, Any]: """ MCP Tool: 텍스트에서 PII 탐지 Args: text (str): 분석할 텍스트 Returns: Dict[str, Any]: 탐지 결과 """ detector = get_detector() return detector.detect_pii(text) @mcp.tool() def mcp_process_text(text: str) -> Dict[str, Any]: """ MCP Tool: 텍스트 PII 처리 (탐지 + 익명화) Args: text (str): 처리할 텍스트 Returns: Dict[str, Any]: 처리 결과 """ processor = MCPPIIProcessor() return processor.process_text(text) @mcp.tool() def mcp_batch_process(texts: List[str]) -> Dict[str, Any]: """ MCP Tool: 여러 텍스트 일괄 처리 Args: texts (List[str]): 처리할 텍스트 리스트 Returns: Dict[str, Any]: 일괄 처리 결과 """ processor = MCPPIIProcessor() return processor.batch_process(texts) @mcp.tool() def mcp_anonymize_text(text: str, pii_items: List[Dict[str, Any]]) -> str: """ MCP Tool: 텍스트 익명화 Args: text (str): 원본 텍스트 pii_items (List[Dict[str, Any]]): PII 항목들 Returns: str: 익명화된 텍스트 """ detector = get_detector() pii_objects = [PIIItem(**item) for item in pii_items] return detector.anonymize_text(text, pii_objects) @mcp.tool() def mcp_encrypt_pii_item(pii_value: str, pii_type: str) -> Dict[str, Any]: """ MCP Tool: PII 항목 암호화 Args: pii_value (str): 암호화할 PII 값 pii_type (str): PII 유형 (이름, 전화번호, 이메일 등) Returns: Dict[str, Any]: 암호화 결과 """ try: crypto = PIICrypto() encrypted_value = crypto.encrypt_pii_item(pii_value, pii_type) return { "success": True, "original_value": pii_value, "encrypted_value": encrypted_value, "pii_type": pii_type, "encryption_method": "deterministic" if pii_type in crypto.deterministic_types else "fpe" } except Exception as e: return { "success": False, "error": str(e), "original_value": pii_value, "encrypted_value": "", "pii_type": pii_type } @mcp.tool() def mcp_decrypt_pii_item(encrypted_value: str, pii_type: str) -> Dict[str, Any]: """ MCP Tool: PII 항목 복호화 Args: encrypted_value (str): 복호화할 암호화된 값 pii_type (str): PII 유형 Returns: Dict[str, Any]: 복호화 결과 """ try: crypto = PIICrypto() decrypted_value = crypto.decrypt_pii_item(encrypted_value, pii_type) return { "success": True, "encrypted_value": encrypted_value, "decrypted_value": decrypted_value, "pii_type": pii_type, "decryption_method": "deterministic" if pii_type in crypto.deterministic_types else "fpe" } except Exception as e: return { "success": False, "error": str(e), "encrypted_value": encrypted_value, "decrypted_value": "", "pii_type": pii_type } @mcp.tool() def mcp_encrypt_text_pii(text: str) -> Dict[str, Any]: """ MCP Tool: 텍스트에서 PII를 탐지하고 암호화 Args: text (str): 처리할 텍스트 Returns: Dict[str, Any]: 암호화 처리 결과 """ try: start_time = time.time() # 1. PII 탐지 detection_result = mcp_detect_pii(text) if not detection_result["success"]: return detection_result if not detection_result["pii_items"]: return { "success": True, "original_text": text, "encrypted_text": text, "pii_items": [], "encrypted_items": {}, "count": 0, "processing_time": time.time() - start_time, "summary": {} } # 2. PII 항목들 암호화 crypto = PIICrypto() encrypted_items = {} encrypted_text = text for item in detection_result["pii_items"]: pii_type = item["type"] pii_value = item["value"] # 암호화 encrypted_value = crypto.encrypt_pii_item(pii_value, pii_type) encrypted_items[pii_value] = encrypted_value # 텍스트에서 치환 encrypted_text = encrypted_text.replace(pii_value, encrypted_value) processing_time = time.time() - start_time return { "success": True, "original_text": text, "encrypted_text": encrypted_text, "pii_items": detection_result["pii_items"], "encrypted_items": encrypted_items, "count": len(detection_result["pii_items"]), "processing_time": processing_time, "summary": detection_result["summary"] } except Exception as e: return { "success": False, "error": str(e), "original_text": text, "encrypted_text": text, "pii_items": [], "encrypted_items": {}, "count": 0, "processing_time": 0, "summary": {} } @mcp.tool() def mcp_decrypt_text_pii(encrypted_text: str, encrypted_items: Dict[str, str]) -> Dict[str, Any]: """ MCP Tool: 암호화된 텍스트에서 PII 복호화 Args: encrypted_text (str): 암호화된 텍스트 encrypted_items (Dict[str, str]): 원본값 -> 암호화값 매핑 Returns: Dict[str, Any]: 복호화 결과 """ try: start_time = time.time() # 암호화값 -> 원본값 매핑 생성 decrypted_items = {} decrypted_text = encrypted_text crypto = PIICrypto() for original_value, encrypted_value in encrypted_items.items(): # PII 유형 추정 (암호화된 값을 기준으로) pii_type = _guess_pii_type(encrypted_value) # Base64 패턴인 경우 (결정론적 암호화) 여러 유형 시도 if re.match(r'^[A-Za-z0-9+/=]+$', encrypted_value) and len(encrypted_value) > 20: # 이름, 주소, 이메일 순서로 시도 decrypted_value = None for try_type in ["이름", "주소", "이메일"]: try: result = crypto.decrypt_pii_item(encrypted_value, try_type) if not (result.startswith("[복호화실패:") or result.startswith("[FPE복호화실패:")): decrypted_value = result pii_type = try_type break except: continue if decrypted_value is None: decrypted_value = f"[복호화실패:{encrypted_value[:20]}...]" else: # FPE 암호화 또는 다른 패턴 decrypted_value = crypto.decrypt_pii_item(encrypted_value, pii_type) decrypted_items[encrypted_value] = decrypted_value # 텍스트에서 복호화 decrypted_text = decrypted_text.replace(encrypted_value, decrypted_value) processing_time = time.time() - start_time return { "success": True, "encrypted_text": encrypted_text, "decrypted_text": decrypted_text, "decrypted_items": decrypted_items, "count": len(encrypted_items), "processing_time": processing_time } except Exception as e: return { "success": False, "error": str(e), "encrypted_text": encrypted_text, "decrypted_text": encrypted_text, "decrypted_items": {}, "count": 0, "processing_time": 0 } def _guess_pii_type(value: str) -> str: """PII 값으로부터 유형을 추정하는 헬퍼 함수 (암호화된 값도 고려)""" import re # 이메일 패턴 if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value): return "이메일" # 전화번호 패턴 (한국) - 하이픈 포함 if re.match(r'^\d{3}-\d{4}-\d{4}$', value) or re.match(r'^\d{2,3}-\d{3,4}-\d{4}$', value): return "전화번호" # 신용카드번호 패턴 - 하이픈 포함 if re.match(r'^\d{4}-\d{4}-\d{4}-\d{4}$', value): return "신용카드번호" # 여권번호 패턴 - 알파벳 + 숫자 (암호화된 경우도 고려) if re.match(r'^[A-Z]\d{8,9}$', value) or re.match(r'^[A-Z]\d{7,10}$', value): return "여권번호" # 주민등록번호 패턴 - 하이픈 포함 if re.match(r'^\d{6}-\d{7}$', value): return "주민등록번호" # 주소 패턴 (한국) - 한글 키워드 포함 if any(keyword in value for keyword in ["시", "구", "로", "동", "번지"]): return "주소" # Base64 패턴 (결정론적 암호화 결과) - 이름, 주소, 이메일만 사용 if re.match(r'^[A-Za-z0-9+/=]+$', value): # Base64는 결정론적 암호화 결과이므로 이름, 주소, 이메일 중 하나 # 길이로 대략적인 구분 (정확하지 않을 수 있음) if len(value) < 50: return "이름" # 짧은 Base64는 보통 이름 elif len(value) < 70: return "이메일" # 중간 길이 Base64는 보통 이메일 else: return "주소" # 긴 Base64는 보통 주소 # 기본적으로 이름으로 추정 return "이름" # MCP Tool 메타데이터 MCP_TOOLS = { "detect_pii": { "name": "detect_pii", "description": "텍스트에서 PII(개인정보)를 탐지합니다. 이름, 이메일, 전화번호, 여권번호, 주소 등을 찾습니다.", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "분석할 텍스트" } }, "required": ["text"] } }, "process_text": { "name": "process_text", "description": "텍스트에서 PII(개인 정보) 를 탐지하고 익명화 처리합니다.", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "처리할 텍스트" } }, "required": ["text"] } }, "batch_process": { "name": "batch_process", "description": "여러 텍스트를 일괄적으로 PII(개인 정보) 처리합니다.", "parameters": { "type": "object", "properties": { "texts": { "type": "array", "items": {"type": "string"}, "description": "처리할 텍스트 리스트" } }, "required": ["texts"] } }, "anonymize_text": { "name": "anonymize_text", "description": "PII(개인 정보) 항목들을 사용하여 텍스트를 익명화합니다.", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "원본 텍스트" }, "pii_items": { "type": "array", "items": { "type": "object", "properties": { "type": {"type": "string"}, "value": {"type": "string"}, "confidence": {"type": "number"}, "start_pos": {"type": "number"}, "end_pos": {"type": "number"} } }, "description": "PII 항목들" } }, "required": ["text", "pii_items"] } }, "encrypt_pii_item": { "name": "encrypt_pii_item", "description": "개별 PII 항목을 암호화합니다. 이름, 주소, 이메일은 결정론적 암호화, 전화번호, 신용카드번호 등은 FPE(형식 유지 암호화)를 사용합니다.", "parameters": { "type": "object", "properties": { "pii_value": { "type": "string", "description": "암호화할 PII 값" }, "pii_type": { "type": "string", "description": "PII 유형 (이름, 전화번호, 이메일, 주소, 신용카드번호, 여권번호, 주민등록번호 등)" } }, "required": ["pii_value", "pii_type"] } }, "decrypt_pii_item": { "name": "decrypt_pii_item", "description": "암호화된 PII 항목을 복호화합니다.", "parameters": { "type": "object", "properties": { "encrypted_value": { "type": "string", "description": "복호화할 암호화된 값" }, "pii_type": { "type": "string", "description": "PII 유형 (암호화 시 사용한 유형과 동일해야 함)" } }, "required": ["encrypted_value", "pii_type"] } }, "encrypt_text_pii": { "name": "encrypt_text_pii", "description": "텍스트에서 PII를 탐지하고 모든 PII를 암호화합니다. 검색 가능한 암호화된 텍스트를 생성합니다.", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "처리할 텍스트" } }, "required": ["text"] } }, "decrypt_text_pii": { "name": "decrypt_text_pii", "description": "암호화된 텍스트에서 PII를 복호화합니다.", "parameters": { "type": "object", "properties": { "encrypted_text": { "type": "string", "description": "암호화된 텍스트" }, "encrypted_items": { "type": "object", "description": "원본값 -> 암호화값 매핑 딕셔너리" } }, "required": ["encrypted_text", "encrypted_items"] } } } def main(): """테스트 함수""" print("=== MCP PII Tools 테스트 ===\n") # 테스트 텍스트 test_text = "오늘은 Rachel Lim이(가) 방문했다. 이메일 주소는 rachel.lim@gmail.com이고 전화번호는 010-1234-5678입니다. 여권번호는 M31143886입니다." print(f"테스트 텍스트: {test_text}\n") # 1. PII 탐지 테스트 print("1. PII 탐지 테스트:") detection_result = mcp_detect_pii(test_text) print(f"성공: {detection_result['success']}") print(f"탐지된 PII: {detection_result['count']}개") print(f"처리 시간: {detection_result['processing_time']:.3f}초") print(f"요약: {detection_result['summary']}") if detection_result['pii_items']: print("탐지된 PII 상세:") for item in detection_result['pii_items']: print(f" - {item['type']}: {item['value']}") print() # 2. 텍스트 처리 테스트 print("2. 텍스트 처리 테스트:") process_result = mcp_process_text(test_text) print(f"성공: {process_result['success']}") print(f"원본: {process_result['original_text']}") print(f"익명화: {process_result['anonymized_text']}") print() # 3. 일괄 처리 테스트 print("3. 일괄 처리 테스트:") batch_texts = [ "김철수 씨가 방문했다. 연락처는 010-1234-5678입니다.", "박지현 님의 이메일은 park@test.com입니다.", "John Smith의 주소는 서울시 강남구입니다." ] batch_result = mcp_batch_process(batch_texts) print(f"성공: {batch_result['success']}") print(f"총 텍스트: {batch_result['total_texts']}개") print(f"성공한 처리: {batch_result['successful_count']}개") print(f"총 탐지된 PII: {batch_result['total_pii_detected']}개") print(f"평균 처리 시간: {batch_result['average_time_per_text']:.3f}초") print() # 4. 개별 PII 항목 암호화/복호화 테스트 print("4. 개별 PII 항목 암호화/복호화 테스트:") # 결정론적 암호화 테스트 (이름, 주소, 이메일) print("4-1. 결정론적 암호화 (검색 가능):") deterministic_tests = [ ("김철수", "이름"), ("서울시 강남구 테헤란로 123", "주소"), ("kim@example.com", "이메일") ] for original, pii_type in deterministic_tests: # 암호화 encrypt_result = mcp_encrypt_pii_item(original, pii_type) if encrypt_result['success']: encrypted = encrypt_result['encrypted_value'] print(f" {pii_type}: {original} → {encrypted[:50]}...") # 복호화 decrypt_result = mcp_decrypt_pii_item(encrypted, pii_type) if decrypt_result['success']: decrypted = decrypt_result['decrypted_value'] match = "✅" if original == decrypted else "❌" print(f" 복호화: {decrypted} {match}") # 결정론적 테스트 (같은 입력 → 같은 암호문) encrypt_result2 = mcp_encrypt_pii_item(original, pii_type) if encrypt_result2['success']: encrypted2 = encrypt_result2['encrypted_value'] deterministic = "✅" if encrypted == encrypted2 else "❌" print(f" 결정론적: {deterministic}") print() # FPE 암호화 테스트 (전화번호, 카드번호, 여권번호 등) print("4-2. FPE 암호화 (형식 유지):") fpe_tests = [ ("010-1234-5678", "전화번호"), ("1234-5678-9012-3456", "신용카드번호"), ("M12345678", "여권번호"), ("123456-1234567", "주민등록번호") ] for original, pii_type in fpe_tests: # 암호화 encrypt_result = mcp_encrypt_pii_item(original, pii_type) if encrypt_result['success']: encrypted = encrypt_result['encrypted_value'] print(f" {pii_type}: {original} → {encrypted}") # 복호화 decrypt_result = mcp_decrypt_pii_item(encrypted, pii_type) if decrypt_result['success']: decrypted = decrypt_result['decrypted_value'] match = "✅" if original == decrypted else "❌" format_preserved = "✅" if len(original) == len(encrypted) else "❌" print(f" 복호화: {decrypted} {match}") print(f" 형식유지: {format_preserved}") print() # 5. 텍스트 전체 PII 암호화/복호화 테스트 print("5. 텍스트 전체 PII 암호화/복호화 테스트:") # 암호화된 텍스트 생성 encrypt_text_result = mcp_encrypt_text_pii(test_text) if encrypt_text_result['success']: print(f"원본 텍스트: {encrypt_text_result['original_text']}") print(f"암호화 텍스트: {encrypt_text_result['encrypted_text']}") print(f"암호화된 PII: {encrypt_text_result['count']}개") print(f"처리 시간: {encrypt_text_result['processing_time']:.3f}초") # 암호화된 항목들 표시 if encrypt_text_result['encrypted_items']: print("암호화된 항목들:") for original, encrypted in encrypt_text_result['encrypted_items'].items(): print(f" {original} → {encrypted}") print() # 복호화 테스트 decrypt_text_result = mcp_decrypt_text_pii( encrypt_text_result['encrypted_text'], encrypt_text_result['encrypted_items'] ) if decrypt_text_result['success']: print(f"복호화 텍스트: {decrypt_text_result['decrypted_text']}") print(f"복호화된 항목: {decrypt_text_result['count']}개") print(f"복호화 시간: {decrypt_text_result['processing_time']:.3f}초") # 원본과 복호화 결과 비교 original_match = "✅" if test_text == decrypt_text_result['decrypted_text'] else "❌" print(f"원본 일치: {original_match}") print() # 6. 암호화 방식 비교 테스트 print("6. 암호화 방식 비교 테스트:") # 같은 이름을 여러 번 암호화해서 결정론적 암호화 확인 test_name = "김철수" print(f"테스트 이름: {test_name}") encrypt1 = mcp_encrypt_pii_item(test_name, "이름") encrypt2 = mcp_encrypt_pii_item(test_name, "이름") if encrypt1['success'] and encrypt2['success']: same_encryption = "✅" if encrypt1['encrypted_value'] == encrypt2['encrypted_value'] else "❌" print(f"결정론적 암호화 (같은 입력 → 같은 암호문): {same_encryption}") print(f" 암호문1: {encrypt1['encrypted_value'][:50]}...") print(f" 암호문2: {encrypt2['encrypted_value'][:50]}...") print() # 7. 다양한 PII 유형별 암호화 테스트 print("7. 다양한 PII 유형별 암호화 테스트:") comprehensive_tests = [ ("홍길동", "이름"), ("서울시 강남구 역삼동 123-45", "주소"), ("hong@company.co.kr", "이메일"), ("010-9876-5432", "전화번호"), ("02-123-4567", "전화번호"), ("4532-1234-5678-9012", "신용카드번호"), ("123-456-789012", "은행계좌번호"), ("M98765432", "여권번호"), ("A12345678", "여권번호"), ("901234-1234567", "주민등록번호") ] for original, pii_type in comprehensive_tests: encrypt_result = mcp_encrypt_pii_item(original, pii_type) if encrypt_result['success']: encrypted = encrypt_result['encrypted_value'] method = encrypt_result['encryption_method'] # 복호화로 검증 decrypt_result = mcp_decrypt_pii_item(encrypted, pii_type) if decrypt_result['success']: decrypted = decrypt_result['decrypted_value'] match = "✅" if original == decrypted else "❌" print(f" {pii_type:8} | {original:20} → {encrypted:30} | {method:12} | {match}") print() print("✅ MCP PII Tools 테스트 완료!") if __name__ == "__main__": # 서버 정보 출력 logger.info("Starting MCP server: pii-tools-mcp") logger.info("Available tools: detect_pii, process_text, batch_process, anonymize_text, encrypt_pii_item, decrypt_pii_item, encrypt_text_pii, decrypt_text_pii") # Provider 정보를 stderr로 출력 try: provider_config = get_provider_config() print(f"🔧 MCP PII Tools 설정 - Provider: {provider_config['provider_type'].upper()}, Model: {provider_config['model_id']}", file=sys.stderr) except Exception as e: print(f"⚠️ Provider 설정 로드 실패: {e}", file=sys.stderr) # PII 탐지기 초기화 (Provider 정보 출력을 위해) try: detector = get_detector() print(f"✅ MCP PII Tools 준비 완료 - Provider: {detector.provider_type.upper()}, Model: {detector.model_id}", file=sys.stderr) except Exception as e: print(f"❌ MCP PII Tools 초기화 실패: {e}", file=sys.stderr) # STDIO 전송으로 서버 실행 (Claude/Inspector가 붙음) mcp.run(transport="stdio")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/czangyeob/mcp-pii-tools'

If you have feedback or need assistance with the MCP directory API, please join our Discord server