mcp_process_text
Detect and anonymize personally identifiable information in text using GPT-4o-based detection and cryptographic methods for data protection.
Instructions
MCP Tool: 텍스트 PII 처리 (탐지 + 익명화)
Args:
text (str): 처리할 텍스트
Returns:
Dict[str, Any]: 처리 결과
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| text | Yes |
Implementation Reference
- mcp_pii_tools.py:542-555 (handler)The primary handler function for the 'mcp_process_text' MCP tool. Decorated with @mcp.tool() for automatic registration with the FastMCP server. It creates an MCPPIIProcessor instance and calls its process_text method to perform PII detection and anonymization.@mcp.tool() def mcp_process_text(text: str) -> Dict[str, Any]: """ MCP Tool: 텍스트 PII 처리 (탐지 + 익명화) Args: text (str): 처리할 텍스트 Returns: Dict[str, Any]: 처리 결과 """ processor = MCPPIIProcessor() return processor.process_text(text)
- mcp_pii_tools.py:855-868 (schema)JSON schema definition for the 'process_text' tool, specifying input parameters (text: string) and metadata. Note: Tool name in schema is 'process_text', corresponding to the 'mcp_process_text' function."process_text": { "name": "process_text", "description": "텍스트에서 PII(개인 정보) 를 탐지하고 익명화 처리합니다.", "parameters": { "type": "object", "properties": { "text": { "type": "string", "description": "처리할 텍스트" } }, "required": ["text"] } },
- mcp_pii_tools.py:420-479 (helper)Core helper method in MCPPIIProcessor class that implements the PII processing logic: detects PII using the detector, anonymizes the text, and returns a structured result dictionary. Called directly by the tool handler.def process_text(self, text: str) -> Dict[str, Any]: """ 텍스트에서 PII를 탐지하고 처리 (MCP Tool용) Args: text (str): 처리할 텍스트 Returns: Dict[str, Any]: MCP Tool 응답 형식 """ try: start_time = time.time() if not text: return { "success": True, "original_text": "", "anonymized_text": "", "pii_items": [], "count": 0, "processing_time": 0, "summary": {} } # 1. PII 탐지 detection_result = self.detector.detect_pii(text) if not detection_result["success"]: return detection_result # PIIItem 객체로 변환 pii_items = [PIIItem(**item) for item in detection_result["pii_items"]] # 2. 익명화 처리 anonymized_text = self.detector.anonymize_text(text, pii_items) processing_time = time.time() - start_time return { "success": True, "original_text": text, "anonymized_text": anonymized_text, "pii_items": [asdict(item) for item in pii_items], "count": len(pii_items), "processing_time": processing_time, "summary": detection_result["summary"] } except Exception as e: return { "success": False, "error": str(e), "original_text": text, "anonymized_text": text, "pii_items": [], "count": 0, "processing_time": 0, "summary": {} }
- mcp_pii_tools.py:257-388 (helper)Supporting helper method in MCPPIIDetector class that performs text anonymization by replacing detected PII items with placeholders like [이름], handling edge cases especially for addresses.def anonymize_text(self, text: str, pii_items: List[PIIItem]) -> str: """텍스트에서 PII를 익명화 처리""" if not pii_items: return text logger.info(f"익명화 시작: 원본 텍스트 길이={len(text)}") logger.info(f"익명화할 PII 항목 수: {len(pii_items)}") # PII 항목들을 위치별로 정렬하고 중복 제거 valid_items = [] for item in pii_items: if item.start_pos != -1 and item.end_pos != -1 and item.start_pos < len(text): # 실제 텍스트에서 해당 위치의 내용이 일치하는지 확인 actual_text = text[item.start_pos:item.end_pos] if actual_text == item.value: valid_items.append(item) logger.info(f"유효한 PII: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") else: # 주소의 경우 위치 불일치를 무시하고 유효한 것으로 처리 if item.type == "주소": valid_items.append(item) logger.info(f"주소 위치 불일치 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") else: logger.warning(f"위치 불일치: 예상='{item.value}', 실제='{actual_text}' at {item.start_pos}-{item.end_pos}") else: # 주소의 경우 위치가 유효하지 않아도 처리 if item.type == "주소": valid_items.append(item) logger.info(f"주소 위치 무효 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") else: logger.warning(f"유효하지 않은 위치: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") # 뒤에서부터 치환 (인덱스 변화 방지) sorted_items = sorted(valid_items, key=lambda x: x.start_pos, reverse=True) anonymized_text = text # 주소의 경우 특별 처리: langextract 위치 정보 무시하고 직접 찾기 address_items = [item for item in sorted_items if item.type == "주소"] other_items = [item for item in sorted_items if item.type != "주소"] # 주소 먼저 처리 for item in address_items: logger.info(f"주소 특별 처리: '{item.value}' ({item.type})") # PII 유형에 따른 익명화 anonymized_value = "[주소]" # 직접 텍스트에서 찾기 if item.value in anonymized_text: anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1) logger.info(f"주소 직접 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 부분 매치 시도 keywords = [word for word in item.value.split() if len(word) > 1] for keyword in reversed(keywords): # 뒤에서부터 시도 if keyword in anonymized_text: anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1) logger.info(f"주소 키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')") break else: logger.warning(f"주소 '{item.value}'를 찾을 수 없음") # 나머지 PII 처리 for item in other_items: logger.info(f"익명화 처리 중: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}") # PII 유형에 따른 익명화 if item.type == "이름": anonymized_value = "[이름]" elif item.type == "전화번호": anonymized_value = "[전화번호]" elif item.type == "이메일": anonymized_value = "[이메일]" elif item.type == "주소": anonymized_value = "[주소]" elif item.type == "여권번호": anonymized_value = "[여권번호]" else: anonymized_value = f"[{item.type}]" # 텍스트에서 치환 (강화된 문자열 치환 사용) try: # 현재 텍스트에서 해당 값이 있는지 확인 if item.value in anonymized_text: # 문자열 치환 수행 anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1) # 첫 번째 매치만 치환 logger.info(f"익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 대소문자 무시하고 찾기 import re pattern = re.escape(item.value) match = re.search(pattern, anonymized_text, re.IGNORECASE) if match: start, end = match.span() anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:] logger.info(f"대소문자 무시 익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 부분 매치 시도 (주소의 경우) if item.type == "주소" and len(item.value) > 3: # 주소의 마지막 부분으로 찾기 last_part = item.value.split()[-1] if ' ' in item.value else item.value[-3:] if last_part in anonymized_text: anonymized_text = anonymized_text.replace(last_part, anonymized_value, 1) logger.info(f"부분 매치 익명화 완료: '{last_part}' -> '{anonymized_value}' (원본: '{item.value}')") else: # 더 강력한 주소 매칭: 정규식으로 찾기 import re # 주소 패턴을 정규식으로 변환 address_pattern = re.escape(item.value).replace(r'\ ', r'\s+') match = re.search(address_pattern, anonymized_text, re.IGNORECASE) if match: start, end = match.span() anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:] logger.info(f"정규식 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'") else: # 마지막 시도: 주소의 핵심 키워드로 찾기 keywords = [word for word in item.value.split() if len(word) > 1] for keyword in reversed(keywords): # 뒤에서부터 시도 if keyword in anonymized_text: anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1) logger.info(f"키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')") break else: logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음") else: logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음") except Exception as e: logger.error(f"익명화 실패: {e}, item: {item}") logger.info(f"익명화 완료: 결과 텍스트 길이={len(anonymized_text)}") return anonymized_text