Skip to main content
Glama

Gemini MCP Server

pii_sanitizer.pyโ€ข11.2 kB
#!/usr/bin/env python3 """ PII (Personally Identifiable Information) Sanitizer for HTTP recordings. This module provides comprehensive sanitization of sensitive data in HTTP request/response recordings to prevent accidental exposure of API keys, tokens, personal information, and other sensitive data. """ import logging import re from copy import deepcopy from dataclasses import dataclass from re import Pattern from typing import Any, Optional logger = logging.getLogger(__name__) @dataclass class PIIPattern: """Defines a pattern for detecting and sanitizing PII.""" name: str pattern: Pattern[str] replacement: str description: str @classmethod def create(cls, name: str, pattern: str, replacement: str, description: str) -> "PIIPattern": """Create a PIIPattern with compiled regex.""" return cls(name=name, pattern=re.compile(pattern), replacement=replacement, description=description) class PIISanitizer: """Sanitizes PII from various data structures while preserving format.""" def __init__(self, patterns: Optional[list[PIIPattern]] = None): """Initialize with optional custom patterns.""" self.patterns: list[PIIPattern] = patterns or [] self.sanitize_enabled = True # Add default patterns if none provided if not patterns: self._add_default_patterns() def _add_default_patterns(self): """Add comprehensive default PII patterns.""" default_patterns = [ # API Keys - Core patterns (Bearer tokens handled in sanitize_headers) PIIPattern.create( name="openai_api_key_proj", pattern=r"sk-proj-[A-Za-z0-9\-_]{48,}", replacement="sk-proj-SANITIZED", description="OpenAI project API keys", ), PIIPattern.create( name="openai_api_key", pattern=r"sk-[A-Za-z0-9]{48,}", replacement="sk-SANITIZED", description="OpenAI API keys", ), PIIPattern.create( name="anthropic_api_key", pattern=r"sk-ant-[A-Za-z0-9\-_]{48,}", replacement="sk-ant-SANITIZED", description="Anthropic API keys", ), PIIPattern.create( name="google_api_key", pattern=r"AIza[A-Za-z0-9\-_]{35,}", replacement="AIza-SANITIZED", description="Google API keys", ), PIIPattern.create( name="github_tokens", pattern=r"gh[psr]_[A-Za-z0-9]{36}", replacement="gh_SANITIZED", description="GitHub tokens (all types)", ), # JWT tokens PIIPattern.create( name="jwt_token", pattern=r"eyJ[A-Za-z0-9\-_]+\.eyJ[A-Za-z0-9\-_]+\.[A-Za-z0-9\-_]+", replacement="eyJ-SANITIZED", description="JSON Web Tokens", ), # Personal Information PIIPattern.create( name="email_address", pattern=r"[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}", replacement="user@example.com", description="Email addresses", ), PIIPattern.create( name="ipv4_address", pattern=r"\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b", replacement="0.0.0.0", description="IPv4 addresses", ), PIIPattern.create( name="ssn", pattern=r"\b\d{3}-\d{2}-\d{4}\b", replacement="XXX-XX-XXXX", description="Social Security Numbers", ), PIIPattern.create( name="credit_card", pattern=r"\b\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}\b", replacement="XXXX-XXXX-XXXX-XXXX", description="Credit card numbers", ), PIIPattern.create( name="phone_number", pattern=r"(?:\+\d{1,3}[\s\-]?)?\(?\d{3}\)?[\s\-]?\d{3}[\s\-]?\d{4}\b(?![\d\.\,\]\}])", replacement="(XXX) XXX-XXXX", description="Phone numbers (all formats)", ), # AWS PIIPattern.create( name="aws_access_key", pattern=r"AKIA[0-9A-Z]{16}", replacement="AKIA-SANITIZED", description="AWS access keys", ), # Other common patterns PIIPattern.create( name="slack_token", pattern=r"xox[baprs]-[0-9]{10,13}-[0-9]{10,13}-[a-zA-Z0-9]{24,34}", replacement="xox-SANITIZED", description="Slack tokens", ), PIIPattern.create( name="stripe_key", pattern=r"(?:sk|pk)_(?:test|live)_[0-9a-zA-Z]{24,99}", replacement="sk_SANITIZED", description="Stripe API keys", ), ] self.patterns.extend(default_patterns) def add_pattern(self, pattern: PIIPattern): """Add a custom PII pattern.""" self.patterns.append(pattern) logger.info(f"Added PII pattern: {pattern.name}") def sanitize_string(self, text: str) -> str: """Apply all patterns to sanitize a string.""" if not self.sanitize_enabled or not isinstance(text, str): return text sanitized = text for pattern in self.patterns: if pattern.pattern.search(sanitized): sanitized = pattern.pattern.sub(pattern.replacement, sanitized) logger.debug(f"Applied {pattern.name} sanitization") return sanitized def sanitize_headers(self, headers: dict[str, str]) -> dict[str, str]: """Special handling for HTTP headers.""" if not self.sanitize_enabled: return headers sanitized_headers = {} for key, value in headers.items(): # Special case for Authorization headers to preserve auth type if key.lower() == "authorization" and " " in value: auth_type = value.split(" ", 1)[0] if auth_type in ("Bearer", "Basic"): sanitized_headers[key] = f"{auth_type} SANITIZED" else: sanitized_headers[key] = self.sanitize_string(value) else: # Apply standard sanitization to all other headers sanitized_headers[key] = self.sanitize_string(value) return sanitized_headers def sanitize_value(self, value: Any) -> Any: """Recursively sanitize any value (string, dict, list, etc).""" if not self.sanitize_enabled: return value if isinstance(value, str): return self.sanitize_string(value) elif isinstance(value, dict): return {k: self.sanitize_value(v) for k, v in value.items()} elif isinstance(value, list): return [self.sanitize_value(item) for item in value] elif isinstance(value, tuple): return tuple(self.sanitize_value(item) for item in value) else: # For other types (int, float, bool, None), return as-is return value def sanitize_url(self, url: str) -> str: """Sanitize sensitive data from URLs (query params, etc).""" if not self.sanitize_enabled: return url # First apply general string sanitization url = self.sanitize_string(url) # Parse and sanitize query parameters if "?" in url: base, query = url.split("?", 1) params = [] for param in query.split("&"): if "=" in param: key, value = param.split("=", 1) # Sanitize common sensitive parameter names sensitive_params = {"key", "token", "api_key", "secret", "password"} if key.lower() in sensitive_params: params.append(f"{key}=SANITIZED") else: # Still sanitize the value for PII params.append(f"{key}={self.sanitize_string(value)}") else: params.append(param) return f"{base}?{'&'.join(params)}" return url def sanitize_request(self, request_data: dict[str, Any]) -> dict[str, Any]: """Sanitize a complete request dictionary.""" sanitized = deepcopy(request_data) # Sanitize headers if "headers" in sanitized: sanitized["headers"] = self.sanitize_headers(sanitized["headers"]) # Sanitize URL if "url" in sanitized: sanitized["url"] = self.sanitize_url(sanitized["url"]) # Sanitize content if "content" in sanitized: sanitized["content"] = self.sanitize_value(sanitized["content"]) return sanitized def sanitize_response(self, response_data: dict[str, Any]) -> dict[str, Any]: """Sanitize a complete response dictionary.""" sanitized = deepcopy(response_data) # Sanitize headers if "headers" in sanitized: sanitized["headers"] = self.sanitize_headers(sanitized["headers"]) # Sanitize content if "content" in sanitized: # Handle base64 encoded content specially if isinstance(sanitized["content"], dict) and sanitized["content"].get("encoding") == "base64": if "data" in sanitized["content"]: import base64 try: # Decode, sanitize, and re-encode the actual response body decoded_bytes = base64.b64decode(sanitized["content"]["data"]) # Attempt to decode as UTF-8 for sanitization. If it fails, it's likely binary. try: decoded_str = decoded_bytes.decode("utf-8") sanitized_str = self.sanitize_string(decoded_str) sanitized["content"]["data"] = base64.b64encode(sanitized_str.encode("utf-8")).decode( "utf-8" ) except UnicodeDecodeError: # Content is not text, leave as is. pass except (base64.binascii.Error, TypeError): # Handle cases where data is not valid base64 pass # Sanitize other metadata fields for key, value in sanitized["content"].items(): if key != "data": sanitized["content"][key] = self.sanitize_value(value) else: sanitized["content"] = self.sanitize_value(sanitized["content"]) return sanitized # Global instance for convenience default_sanitizer = PIISanitizer()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/BeehiveInnovations/gemini-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server