"""
MCP Presidio Server
Provides PII detection and anonymization capabilities using Microsoft Presidio
through the Model Context Protocol (MCP).
"""
import json
from typing import List, Dict, Any, Optional
from mcp.server.fastmcp import FastMCP
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
# Initialize MCP server
mcp = FastMCP("Presidio PII Detection and Anonymization")
# Initialize Presidio engines (lazy initialization)
# Cache analyzer engines per language to support multi-language requests
_analyzer_engines: Dict[str, AnalyzerEngine] = {}
_anonymizer_engine: Optional[AnonymizerEngine] = None
def get_analyzer_engine(language: str = "en") -> AnalyzerEngine:
"""Get or create the analyzer engine with specified language."""
global _analyzer_engines
# Return cached engine if already initialized for this language
if language in _analyzer_engines:
return _analyzer_engines[language]
    # Create an NLP engine provider with spaCy for this language.
    # spaCy model naming: the English large model is "en_core_web_lg";
    # most other languages ship "<lang>_core_news_lg" instead.
    model_name = "en_core_web_lg" if language == "en" else f"{language}_core_news_lg"
    try:
        provider = NlpEngineProvider(nlp_configuration={
            "nlp_engine_name": "spacy",
            "models": [{"lang_code": language, "model_name": model_name}]
        })
nlp_engine = provider.create_engine()
_analyzer_engines[language] = AnalyzerEngine(nlp_engine=nlp_engine)
    except (OSError, ImportError, ValueError):
        # Fall back to the default configuration if the model is not available.
        # This keeps the server usable without spaCy models installed, but the
        # default AnalyzerEngine only supports English, so analysis requests
        # for other languages will fail against this fallback engine.
        _analyzer_engines[language] = AnalyzerEngine()
return _analyzer_engines[language]
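# Illustrative usage (hypothetical values): get_analyzer_engine("en") loads
# "en_core_web_lg" on first use and returns the cached engine on subsequent
# calls; get_analyzer_engine("es") would attempt "es_core_news_lg" and fall
# back to the English-only default engine if that model is not installed.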
def get_anonymizer_engine() -> AnonymizerEngine:
"""Get or create the anonymizer engine."""
global _anonymizer_engine
if _anonymizer_engine is None:
_anonymizer_engine = AnonymizerEngine()
return _anonymizer_engine
@mcp.tool()
def analyze_text(
text: str,
language: str = "en",
entities: Optional[List[str]] = None,
score_threshold: float = 0.0,
return_decision_process: bool = False
) -> str:
"""
Analyze text to detect PII entities.
Args:
text: The text to analyze for PII
language: Language code (default: "en")
entities: List of entity types to detect (default: all). Examples: PERSON, EMAIL_ADDRESS,
PHONE_NUMBER, CREDIT_CARD, LOCATION, DATE_TIME, etc.
score_threshold: Minimum confidence score (0.0-1.0) for detection (default: 0.0)
return_decision_process: Include detailed decision process in results (default: False)
Returns:
JSON string with detected PII entities including type, location, and confidence score
"""
analyzer = get_analyzer_engine(language)
results = analyzer.analyze(
text=text,
language=language,
entities=entities,
score_threshold=score_threshold,
return_decision_process=return_decision_process
)
# Convert results to serializable format
output = []
for result in results:
item = {
"entity_type": result.entity_type,
"start": result.start,
"end": result.end,
"score": result.score,
"text": text[result.start:result.end]
}
        if return_decision_process and getattr(result, "analysis_explanation", None):
item["analysis_explanation"] = str(result.analysis_explanation)
output.append(item)
return json.dumps(output, indent=2)
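# Illustrative example (hypothetical scores; actual values depend on the
# installed recognizers and spaCy model):
#   analyze_text("My name is John and my email is john@example.com")
#   -> '[{"entity_type": "PERSON", "start": 11, "end": 15, "score": 0.85,
#         "text": "John"},
#        {"entity_type": "EMAIL_ADDRESS", "start": 32, "end": 48, "score": 1.0,
#         "text": "john@example.com"}]'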
@mcp.tool()
def anonymize_text(
text: str,
language: str = "en",
operator: str = "replace",
entities: Optional[List[str]] = None,
score_threshold: float = 0.0,
operator_params: Optional[Dict[str, Any]] = None
) -> str:
"""
Anonymize PII in text using various operators.
Args:
text: The text to anonymize
language: Language code (default: "en")
        operator: Anonymization operator - "replace", "redact", "hash", "mask", "encrypt", or "keep" (default: "replace")
entities: List of entity types to anonymize (default: all)
score_threshold: Minimum confidence score for detection (default: 0.0)
operator_params: Additional parameters for the operator (e.g., {"new_value": "ANONYMIZED"})
Returns:
JSON string with anonymized text and list of anonymized entities
"""
analyzer = get_analyzer_engine(language)
anonymizer = get_anonymizer_engine()
# Analyze text
analyzer_results = analyzer.analyze(
text=text,
language=language,
entities=entities,
score_threshold=score_threshold
)
    # Build per-entity operator configs. For "replace" without an explicit
    # new_value, substitute a placeholder derived from each entity type,
    # e.g. "<PERSON>" or "<EMAIL_ADDRESS>". Copy operator_params per entity
    # so the caller's dict is never mutated.
    if operator_params is None:
        operator_params = {}
    operators = {}
    for entity_type in (entities or {result.entity_type for result in analyzer_results}):
        params = dict(operator_params)
        if operator == "replace" and "new_value" not in params:
            params["new_value"] = f"<{entity_type}>"
        operators[entity_type] = OperatorConfig(operator, params)
# Anonymize
anonymized_result = anonymizer.anonymize(
text=text,
analyzer_results=analyzer_results,
operators=operators
)
output = {
"anonymized_text": anonymized_result.text,
"entities": [
{
"entity_type": item.entity_type,
"start": item.start,
"end": item.end,
"operator": item.operator
}
for item in anonymized_result.items
]
}
return json.dumps(output, indent=2)
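# Illustrative examples (hypothetical output):
#   anonymize_text("My name is John") with the default "replace" operator
#   -> anonymized_text like "My name is <PERSON>"
#   anonymize_text("Card 4111111111111111", operator="mask",
#                  operator_params={"chars_to_mask": 12, "masking_char": "*",
#                                   "from_end": False})
#   -> the detected card number is masked, e.g. "Card ************1111"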
@mcp.tool()
def get_supported_entities(language: str = "en") -> str:
"""
Get list of all supported PII entity types for a language.
Args:
language: Language code (default: "en")
Returns:
JSON string with list of supported entity types and their descriptions
"""
analyzer = get_analyzer_engine(language)
supported = analyzer.get_supported_entities(language=language)
# Get more details about each entity type
entity_details = []
for entity in supported:
entity_details.append({
"entity_type": entity,
"description": _get_entity_description(entity)
})
return json.dumps(entity_details, indent=2)
def _get_entity_description(entity_type: str) -> str:
"""Get human-readable description for entity type."""
descriptions = {
"PERSON": "Person names (first, last, full names)",
"EMAIL_ADDRESS": "Email addresses",
"PHONE_NUMBER": "Phone numbers (various formats)",
"CREDIT_CARD": "Credit card numbers",
"IBAN_CODE": "International Bank Account Numbers",
"US_SSN": "US Social Security Numbers",
"US_DRIVER_LICENSE": "US Driver's License numbers",
"US_PASSPORT": "US Passport numbers",
"LOCATION": "Geographic locations (cities, countries, addresses)",
"DATE_TIME": "Dates and times",
"URL": "URLs and web addresses",
"IP_ADDRESS": "IP addresses (IPv4 and IPv6)",
"CRYPTO": "Cryptocurrency wallet addresses",
"UK_NHS": "UK National Health Service numbers",
"NRP": "Spanish National ID",
"SG_NRIC_FIN": "Singapore National Registration ID",
"AU_ABN": "Australian Business Number",
"AU_ACN": "Australian Company Number",
"AU_TFN": "Australian Tax File Number",
"AU_MEDICARE": "Australian Medicare number",
"IN_PAN": "Indian Permanent Account Number",
"IN_AADHAAR": "Indian Aadhaar number",
"IN_VEHICLE_REGISTRATION": "Indian Vehicle Registration",
"MEDICAL_LICENSE": "Medical license numbers",
"US_BANK_NUMBER": "US bank account and routing numbers",
}
return descriptions.get(entity_type, f"PII entity of type {entity_type}")
@mcp.tool()
def add_custom_recognizer(
name: str,
entity_type: str,
patterns: List[Dict[str, Any]],
context: Optional[List[str]] = None,
supported_language: str = "en"
) -> str:
"""
Add a custom PII recognizer with regex patterns.
Args:
name: Unique name for this recognizer
entity_type: The entity type this recognizer detects
patterns: List of pattern dicts with 'name', 'regex', and 'score' (0.0-1.0)
Example: [{"name": "weak", "regex": "\\d{3}", "score": 0.3}]
context: Optional context words that increase confidence
supported_language: Language code (default: "en")
Returns:
JSON string confirming the recognizer was added
"""
analyzer = get_analyzer_engine(supported_language)
# Validate and convert pattern dicts to Pattern objects
pattern_objects = []
for idx, p in enumerate(patterns):
if not isinstance(p, dict):
return json.dumps({
"status": "error",
"message": f"Pattern at index {idx} is not a dictionary: {p!r}"
})
for key in ("name", "regex", "score"):
if key not in p:
return json.dumps({
"status": "error",
"message": f"Pattern at index {idx} missing required key '{key}': {p!r}"
})
score = p["score"]
try:
score_float = float(score)
except (TypeError, ValueError):
return json.dumps({
"status": "error",
"message": f"Pattern at index {idx} has non-numeric score: {score!r}"
})
if not (0.0 <= score_float <= 1.0):
return json.dumps({
"status": "error",
"message": f"Pattern at index {idx} has score out of range [0.0, 1.0]: {score!r}"
})
pattern_objects.append(Pattern(name=p["name"], regex=p["regex"], score=score_float))
# Create custom recognizer
custom_recognizer = PatternRecognizer(
supported_entity=entity_type,
name=name,
patterns=pattern_objects,
context=context,
supported_language=supported_language
)
# Add to registry
analyzer.registry.add_recognizer(custom_recognizer)
return json.dumps({
"status": "success",
"message": f"Custom recognizer '{name}' added for entity type '{entity_type}'"
})
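# Illustrative example (hypothetical entity type and pattern): register an
# employee-ID recognizer, after which analyze_text can detect it:
#   add_custom_recognizer(
#       name="employee_id_recognizer",
#       entity_type="EMPLOYEE_ID",
#       patterns=[{"name": "emp_id", "regex": "EMP-\\d{6}", "score": 0.8}],
#       context=["employee", "badge"],
#   )
#   analyze_text("Badge EMP-123456", entities=["EMPLOYEE_ID"])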
@mcp.tool()
def batch_analyze(
texts: List[str],
language: str = "en",
entities: Optional[List[str]] = None,
score_threshold: float = 0.0
) -> str:
"""
Analyze multiple texts in batch for PII detection.
Args:
texts: List of texts to analyze
language: Language code (default: "en")
entities: List of entity types to detect (default: all)
score_threshold: Minimum confidence score (default: 0.0)
Returns:
JSON string with results for each text indexed by position
"""
analyzer = get_analyzer_engine(language)
results = []
for idx, text in enumerate(texts):
text_results = analyzer.analyze(
text=text,
language=language,
entities=entities,
score_threshold=score_threshold
)
results.append({
"index": idx,
"text_preview": text[:100] + "..." if len(text) > 100 else text,
"entities_found": [
{
"entity_type": result.entity_type,
"start": result.start,
"end": result.end,
"score": result.score,
"text": text[result.start:result.end]
}
for result in text_results
]
})
return json.dumps(results, indent=2)
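# Illustrative output shape (hypothetical values):
#   batch_analyze(["Email jane@example.com", "No PII here"])
#   -> '[{"index": 0, "text_preview": "Email jane@example.com",
#         "entities_found": [{"entity_type": "EMAIL_ADDRESS", ...}]},
#        {"index": 1, "text_preview": "No PII here", "entities_found": []}]'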
@mcp.tool()
def batch_anonymize(
texts: List[str],
language: str = "en",
operator: str = "replace",
entities: Optional[List[str]] = None,
score_threshold: float = 0.0
) -> str:
"""
Anonymize multiple texts in batch.
Args:
texts: List of texts to anonymize
language: Language code (default: "en")
operator: Anonymization operator (default: "replace")
entities: List of entity types to anonymize (default: all)
score_threshold: Minimum confidence score (default: 0.0)
Returns:
JSON string with anonymized results for each text
"""
analyzer = get_analyzer_engine(language)
anonymizer = get_anonymizer_engine()
results = []
for idx, text in enumerate(texts):
# Analyze
analyzer_results = analyzer.analyze(
text=text,
language=language,
entities=entities,
score_threshold=score_threshold
)
        # Anonymize; "<ENTITY_TYPE>" placeholders apply only to "replace",
        # other operators fall back to their own default parameters
        operators = {
            entity_type: OperatorConfig(
                operator,
                {"new_value": f"<{entity_type}>"} if operator == "replace" else {}
            )
            for entity_type in (entities or {result.entity_type for result in analyzer_results})
        }
anonymized_result = anonymizer.anonymize(
text=text,
analyzer_results=analyzer_results,
operators=operators
)
results.append({
"index": idx,
"original_preview": text[:100] + "..." if len(text) > 100 else text,
"anonymized_text": anonymized_result.text,
"entities_count": len(anonymized_result.items)
})
return json.dumps(results, indent=2)
@mcp.tool()
def get_anonymization_operators() -> str:
"""
Get list of available anonymization operators and their descriptions.
Returns:
JSON string with operator names, descriptions, and example parameters
"""
operators = [
{
"operator": "replace",
"description": "Replace PII with a placeholder string",
"example_params": {"new_value": "<ANONYMIZED>"}
},
{
"operator": "redact",
"description": "Remove PII entirely from text",
"example_params": {}
},
{
"operator": "hash",
"description": "Replace PII with a hash value",
"example_params": {"hash_type": "sha256"}
},
{
"operator": "mask",
"description": "Mask PII with a character",
"example_params": {"chars_to_mask": 4, "masking_char": "*", "from_end": True}
},
{
"operator": "encrypt",
"description": "Encrypt PII using AES",
"example_params": {"key": "WmZq4t7w!z%C*F-J"}
},
{
"operator": "keep",
"description": "Keep the PII as-is (useful for selective anonymization)",
"example_params": {}
}
]
return json.dumps(operators, indent=2)
@mcp.tool()
def analyze_structured_data(
data: str,
language: str = "en",
entities: Optional[List[str]] = None,
score_threshold: float = 0.0
) -> str:
"""
Analyze structured data (JSON/dict) for PII.
Args:
data: JSON string representing structured data
language: Language code (default: "en")
entities: List of entity types to detect (default: all)
score_threshold: Minimum confidence score (default: 0.0)
Returns:
JSON string with PII findings organized by data structure path
"""
try:
data_obj = json.loads(data)
except json.JSONDecodeError:
return json.dumps({"error": "Invalid JSON data provided"})
analyzer = get_analyzer_engine(language)
def analyze_recursive(obj: Any, path: str = "", depth: int = 0) -> List[Dict[str, Any]]:
"""Recursively analyze nested data structures with depth limit."""
# Limit recursion depth to prevent stack overflow
if depth > 100:
return []
results = []
if isinstance(obj, dict):
for key, value in obj.items():
new_path = f"{path}.{key}" if path else key
results.extend(analyze_recursive(value, new_path, depth + 1))
elif isinstance(obj, list):
for idx, item in enumerate(obj):
new_path = f"{path}[{idx}]"
results.extend(analyze_recursive(item, new_path, depth + 1))
elif isinstance(obj, str):
# Analyze string values
text_results = analyzer.analyze(
text=obj,
language=language,
entities=entities,
score_threshold=score_threshold
)
if text_results:
results.append({
"path": path,
"value": obj,
"entities": [
{
"entity_type": r.entity_type,
"start": r.start,
"end": r.end,
"score": r.score,
"text": obj[r.start:r.end]
}
for r in text_results
]
})
return results
findings = analyze_recursive(data_obj)
return json.dumps({
"total_fields_with_pii": len(findings),
"findings": findings
}, indent=2)
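# Illustrative example (hypothetical input): nested values are addressed by
# path, so for
#   data = '{"user": {"email": "jane@example.com"}, "notes": ["call 212-555-0123"]}'
# the findings would report paths like "user.email" and "notes[0]".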
@mcp.tool()
def anonymize_structured_data(
data: str,
language: str = "en",
operator: str = "replace",
entities: Optional[List[str]] = None,
score_threshold: float = 0.0
) -> str:
"""
Anonymize PII in structured data (JSON/dict).
Args:
data: JSON string representing structured data
language: Language code (default: "en")
operator: Anonymization operator (default: "replace")
entities: List of entity types to anonymize (default: all)
score_threshold: Minimum confidence score (default: 0.0)
Returns:
JSON string with anonymized structured data
"""
try:
data_obj = json.loads(data)
except json.JSONDecodeError:
return json.dumps({
"anonymized_data": None,
"error": "Invalid JSON data provided"
}, indent=2)
analyzer = get_analyzer_engine(language)
anonymizer = get_anonymizer_engine()
def anonymize_recursive(obj: Any, depth: int = 0) -> Any:
"""Recursively anonymize nested data structures with depth limit."""
# Limit recursion depth to prevent stack overflow
if depth > 100:
return obj
if isinstance(obj, dict):
return {k: anonymize_recursive(v, depth + 1) for k, v in obj.items()}
elif isinstance(obj, list):
return [anonymize_recursive(item, depth + 1) for item in obj]
elif isinstance(obj, str):
# Analyze and anonymize string values
analyzer_results = analyzer.analyze(
text=obj,
language=language,
entities=entities,
score_threshold=score_threshold
)
if analyzer_results:
                operators = {
                    entity_type: OperatorConfig(
                        operator,
                        {"new_value": f"<{entity_type}>"} if operator == "replace" else {}
                    )
                    for entity_type in {result.entity_type for result in analyzer_results}
                }
anonymized = anonymizer.anonymize(
text=obj,
analyzer_results=analyzer_results,
operators=operators
)
return anonymized.text
return obj
else:
return obj
anonymized_data = anonymize_recursive(data_obj)
return json.dumps({
"anonymized_data": anonymized_data
}, indent=2)
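# Illustrative round-trip (hypothetical output): anonymize_structured_data on
#   '{"user": {"email": "jane@example.com"}}'
# would return something like
#   '{"anonymized_data": {"user": {"email": "<EMAIL_ADDRESS>"}}}'
# with the original structure preserved and only string leaves rewritten.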
@mcp.tool()
def validate_detection(
text: str,
expected_entities: List[Dict[str, Any]],
language: str = "en"
) -> str:
"""
Validate PII detection against expected results (useful for testing).
Args:
text: The text to analyze
expected_entities: List of expected entities with 'entity_type', 'start', 'end'
language: Language code (default: "en")
Returns:
JSON string with validation results including precision, recall, and F1 score
"""
analyzer = get_analyzer_engine(language)
# Detect entities
detected = analyzer.analyze(text=text, language=language)
# Convert to sets for comparison
detected_set = {
(r.entity_type, r.start, r.end) for r in detected
}
expected_set = {
(e["entity_type"], e["start"], e["end"]) for e in expected_entities
}
# Calculate metrics
true_positives = len(detected_set & expected_set)
false_positives = len(detected_set - expected_set)
false_negatives = len(expected_set - detected_set)
    precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0.0
    recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0.0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
return json.dumps({
"metrics": {
"precision": round(precision, 3),
"recall": round(recall, 3),
"f1_score": round(f1, 3),
"true_positives": true_positives,
"false_positives": false_positives,
"false_negatives": false_negatives
},
"detected": [
{"entity_type": r.entity_type, "start": r.start, "end": r.end, "text": text[r.start:r.end]}
for r in detected
],
"expected": expected_entities,
"missing": [
{"entity_type": e[0], "start": e[1], "end": e[2]}
for e in (expected_set - detected_set)
],
"unexpected": [
{"entity_type": e[0], "start": e[1], "end": e[2]}
for e in (detected_set - expected_set)
]
}, indent=2)
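# Illustrative example (offsets computed for this exact string): a detection
# counts as a true positive only if (entity_type, start, end) match exactly.
#   validate_detection(
#       "Email jane@example.com",
#       expected_entities=[{"entity_type": "EMAIL_ADDRESS", "start": 6, "end": 22}],
#   )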
def main():
"""Run the MCP server."""
# Use stdio transport for MCP
mcp.run(transport="stdio")
if __name__ == "__main__":
main()