# -*- coding: utf-8 -*-
"""Location: ./mcpgateway/common/validators.py
Copyright 2025
SPDX-License-Identifier: Apache-2.0
Authors: Mihai Criveti, Madhav Kandukuri
SecurityValidator for MCP Gateway
This module defines the `SecurityValidator` class, which provides centralized, configurable
validation logic for user-generated content in MCP-based applications.
The validator enforces strict security and structural rules across common input types such as:
- Display text (e.g., names, descriptions)
- Identifiers and tool names
- URIs and URLs
- JSON object depth
- Templates (including limited HTML/Jinja2)
- MIME types
Key Features:
- Pattern-based validation using settings-defined regex for HTML/script safety
- Configurable max lengths and depth limits
- Whitelist-based URL scheme and MIME type validation
- Safe escaping of user-visible text fields
- Reusable static/class methods for field-level and form-level validation
Intended to be used with Pydantic or similar schema-driven systems to validate and sanitize
user input in a consistent, centralized way.
Dependencies:
- Standard Library: re, html, logging, urllib.parse
- First-party: `settings` from `mcpgateway.config`
Example usage:
SecurityValidator.validate_name("my_tool", field_name="Tool Name")
SecurityValidator.validate_url("https://example.com")
SecurityValidator.validate_json_depth({...})
Examples:
>>> from mcpgateway.common.validators import SecurityValidator
>>> SecurityValidator.sanitize_display_text('<b>Test</b>', 'test')
'&lt;b&gt;Test&lt;/b&gt;'
>>> SecurityValidator.validate_name('valid_name-123', 'test')
'valid_name-123'
>>> SecurityValidator.validate_identifier('my.test.id_123', 'test')
'my.test.id_123'
>>> SecurityValidator.validate_json_depth({'a': {'b': 1}})
>>> SecurityValidator.validate_json_depth({'a': 1})
"""
# Standard
import html
import ipaddress
import logging
from pathlib import Path
import re
import shlex
import socket
from typing import Any, Iterable, List, Optional, Pattern
from urllib.parse import urlparse
import uuid
# First-Party
from mcpgateway.config import settings
logger = logging.getLogger(__name__)
# ============================================================================
# Precompiled regex patterns (compiled once at module load for performance)
# ============================================================================
# Note: Settings-based patterns (DANGEROUS_HTML_PATTERN, DANGEROUS_JS_PATTERN,
# NAME_PATTERN, IDENTIFIER_PATTERN, etc.) are NOT precompiled here because tests
# override the class attributes at runtime. Only truly static patterns are
# precompiled at module level.
# Static inline patterns used multiple times
# Matches any one of the characters < > " ' anywhere in a string.
_HTML_SPECIAL_CHARS_RE: Pattern[str] = re.compile(r'[<>"\']')  # / removed per SEP-986
# Matches the opening of a <script>, <iframe>, <object>, <embed>, <link>, <meta>,
# <base> or <form> tag (case-insensitive); validate_template rejects on a hit.
_DANGEROUS_TEMPLATE_TAGS_RE: Pattern[str] = re.compile(r"<(script|iframe|object|embed|link|meta|base|form)\b", re.IGNORECASE)
# Matches "on" + one or more word characters + optional whitespace + "=".
# NOTE(review): there is no word boundary before "on", so the tail of an ordinary
# word followed by "=" (e.g. "content=" or "condition =") also matches - confirm
# that this strictness is intended.
_EVENT_HANDLER_RE: Pattern[str] = re.compile(r"on\w+\s*=", re.IGNORECASE)
# Matches a "type/subtype" pair in which both parts start with an ASCII letter or digit.
_MIME_TYPE_RE: Pattern[str] = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*\/[a-zA-Z0-9][a-zA-Z0-9!#$&\-\^_+\.]*$")
# Matches a leading "<scheme>://" where the scheme starts with a letter.
_URI_SCHEME_RE: Pattern[str] = re.compile(r"^[a-zA-Z][a-zA-Z0-9+\-.]*://")
# Matches any one of the characters ; & | ` $ ( ) { } [ ] < >
_SHELL_DANGEROUS_CHARS_RE: Pattern[str] = re.compile(r"[;&|`$(){}\[\]<>]")
# Matches ESC "[" followed by digits/semicolons and a single terminating letter.
_ANSI_ESCAPE_RE: Pattern[str] = re.compile(r"\x1B\[[0-9;]*[A-Za-z]")
# Matches \x00-\x1f except tab (\x09), LF (\x0a) and CR (\x0d), plus \x7f-\x9f.
_CONTROL_CHARS_RE: Pattern[str] = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]")
# Polyglot attack patterns (precompiled with IGNORECASE).
# sanitize_display_text rejects a value as soon as any one of these is found in it.
_POLYGLOT_PATTERNS: List[Pattern[str]] = [
    re.compile(r"['\"];.*alert\s*\(", re.IGNORECASE),  # quote, ";", then a later "alert("
    re.compile(r"-->\s*<[^>]+>", re.IGNORECASE),  # "-->" followed by a complete tag
    re.compile(r"['\"].*//['\"]", re.IGNORECASE),  # quote ... "//" directly followed by a quote
    re.compile(r"<<[A-Z]+>", re.IGNORECASE),  # "<<" + letters + ">"
    re.compile(r"String\.fromCharCode", re.IGNORECASE),  # literal "String.fromCharCode"
    re.compile(r"javascript:.*\(", re.IGNORECASE),  # "javascript:" followed later by "("
]
# SSTI prevention - safe scanning without regex backtracking.
# Substrings that cause validate_template to reject a {{ ... }} or {% ... %}
# expression. They are compared against the expression after it has been
# lower-cased and had whitespace around "|" and "=" removed, so every entry
# here is written in lower case and without such whitespace.
_SSTI_DANGEROUS_SUBSTRINGS: tuple[str, ...] = (
    "__",
    ".",
    "config",
    "self",
    "request",
    "application",
    "globals",
    "builtins",
    "import",
    "getattr",  # Python getattr function
    "|attr",  # Jinja2 attr filter (checked after whitespace normalization)
    "|selectattr",  # Jinja2 selectattr filter (takes attribute name as arg)
    "|sort",  # Jinja2 sort filter with attribute parameter
    "|map",  # Jinja2 map filter with attribute parameter
    "attribute=",  # Jinja2 filters: map(attribute=...), selectattr, sort(attribute=...)
    "\\x",  # Hex escape sequences (e.g., \x5f for underscore)
    "\\u",  # Unicode escape sequences (e.g., \u005f for underscore)
    "\\n{",  # Named unicode escapes (e.g., \N{LOW LINE}); lower case because the expression is lower-cased first
    "\\0",
    "\\1",
    "\\2",
    "\\3",
    "\\4",
    "\\5",
    "\\6",
    "\\7",  # Octal escapes
)
# Operators that enable code execution or dynamic construction.
# validate_template looks for these in the raw (not normalized) expression text.
_SSTI_DANGEROUS_OPERATORS: tuple[str, ...] = (
    "*",
    "/",
    "+",
    "-",
    "~",  # Jinja2 string concatenation (can build dunder names dynamically)
    "[",  # Bracket notation for dynamic attribute access
    "%",  # Python string formatting (e.g., '%c' % 95 produces '_')
)
# Opening delimiters of other "${...}"-style expression syntaxes; validate_template
# rejects a template in which one of them is followed somewhere by a "}".
_SSTI_SIMPLE_TEMPLATE_PREFIXES: tuple[str, ...] = ("${", "#{", "%{")
def _iter_template_expressions(value: str, start: str, end: str) -> Iterable[str]:
    """Lazily extract the text found between each ``start``/``end`` delimiter pair.

    The scan is quote-aware: once a single or double quote opens inside an
    expression, a closing delimiter is ignored until the matching quote is seen.
    A backslash always causes the character after it to be skipped, whether or
    not a quote is currently open.

    Args:
        value (str): Template text to scan.
        start (str): Opening delimiter.
        end (str): Closing delimiter.

    Yields:
        str: The raw text between an opening delimiter and its closing delimiter.

    Raises:
        ValueError: If an opening delimiter is never closed (fail-closed behavior).
    """
    opener_len = len(start)
    closer_len = len(end)
    # Highest index at which a complete closing delimiter could still begin.
    last_closer_pos = len(value) - closer_len

    opener_pos = value.find(start)
    while opener_pos != -1:
        body_begin = opener_pos + opener_len
        open_quote: Optional[str] = None
        skip_next = False
        closer_pos = -1

        for pos in range(body_begin, last_closer_pos + 1):
            char = value[pos]
            if skip_next:
                # Character directly after a backslash: never interpreted.
                skip_next = False
                continue
            if char == "\\":
                skip_next = True
                continue
            if open_quote:
                # Inside a quoted string only the matching quote is significant.
                if char == open_quote:
                    open_quote = None
                continue
            if char in ("'", '"'):
                open_quote = char
                continue
            if value.startswith(end, pos):
                closer_pos = pos
                break

        if closer_pos == -1:
            # Ran out of text (or stayed inside a quote) before the expression closed.
            raise ValueError("Template contains potentially dangerous expressions")

        yield value[body_begin:closer_pos]
        # Resume the search for the next opening delimiter after this expression.
        opener_pos = value.find(start, closer_pos + closer_len)
def _has_simple_template_expression(value: str, start: str) -> bool:
    """Report whether ``start`` occurs in ``value`` with a ``}`` somewhere after it.

    Only two searches are needed: the first occurrence of ``start`` and the last
    occurrence of ``}``. If the earliest opener does not precede the final
    closing brace, no opener does.

    Args:
        value (str): Template text to scan.
        start (str): Opening delimiter.

    Returns:
        bool: True if a closing brace exists after the delimiter.
    """
    first_opener = value.find(start)
    if first_opener < 0:
        # The delimiter never appears, so there is nothing to close.
        return False
    # rfind() yields -1 when there is no "}", which can never exceed first_opener.
    return value.rfind("}") > first_opener
# Dangerous URL protocol patterns (precompiled with IGNORECASE)
# Scheme-like markers that validate_url rejects. The patterns are unanchored and
# are applied with search(), so a marker is detected anywhere in the URL (path
# and query string included), not only at the start.
_DANGEROUS_URL_PATTERNS: List[Pattern[str]] = [
    re.compile(r"javascript:", re.IGNORECASE),
    re.compile(r"data:", re.IGNORECASE),
    re.compile(r"vbscript:", re.IGNORECASE),
    re.compile(r"about:", re.IGNORECASE),
    re.compile(r"chrome:", re.IGNORECASE),
    re.compile(r"file:", re.IGNORECASE),
    re.compile(r"ftp:", re.IGNORECASE),
    re.compile(r"mailto:", re.IGNORECASE),
]
# SQL injection patterns (precompiled with IGNORECASE)
_SQL_PATTERNS: List[Pattern[str]] = [
    re.compile(r"[';\"\\]", re.IGNORECASE),  # single quote, semicolon, double quote or backslash
    re.compile(r"--", re.IGNORECASE),  # two consecutive hyphens
    re.compile(r"/\*.*?\*/", re.IGNORECASE),  # "/* ... */" on a single line ("." does not match newlines here)
    re.compile(r"\b(union|select|insert|update|delete|drop|exec|execute)\b", re.IGNORECASE),  # whole-word SQL keywords
]
class SecurityValidator:
    """Configurable validation with MCP-compliant limits.

    Every pattern and limit below is copied from ``settings`` once, when the
    class body executes, and stored as a class attribute. The validation
    methods read these attributes through ``cls`` on each call, so replacing
    an attribute on the class changes the behaviour of later calls.
    """

    # Configurable patterns (from settings)
    # Regex searched case-insensitively to detect HTML tags in display text and URLs.
    DANGEROUS_HTML_PATTERN = (
        settings.validation_dangerous_html_pattern
    )  # Default: '<(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)\b|</*(script|iframe|object|embed|link|meta|base|form|img|svg|video|audio|source|track|area|map|canvas|applet|frame|frameset|html|head|body|style)>'
    # Regex searched case-insensitively to detect script-like content.
    DANGEROUS_JS_PATTERN = settings.validation_dangerous_js_pattern  # Default: javascript:|vbscript:|on\w+\s*=|data:.*script
    # Prefixes a URL must start with (compared case-insensitively by validate_url).
    ALLOWED_URL_SCHEMES = settings.validation_allowed_url_schemes  # Default: ["http://", "https://", "ws://", "wss://"]
    # Character type patterns
    NAME_PATTERN = settings.validation_name_pattern  # Default: ^[a-zA-Z0-9_\-\s]+$
    IDENTIFIER_PATTERN = settings.validation_identifier_pattern  # Default: ^[a-zA-Z0-9_\-\.]+$
    VALIDATION_SAFE_URI_PATTERN = settings.validation_safe_uri_pattern  # Default: ^[a-zA-Z0-9_\-.:/?=&%]+$
    VALIDATION_UNSAFE_URI_PATTERN = settings.validation_unsafe_uri_pattern  # Default: [<>"\'\\]
    TOOL_NAME_PATTERN = settings.validation_tool_name_pattern  # Default: ^[a-zA-Z0-9_][a-zA-Z0-9._/-]*$ (SEP-986)
    # MCP-compliant limits (configurable)
    # MAX_NAME_LENGTH is the limit applied by the name, identifier, URI and tool-name validators.
    MAX_NAME_LENGTH = settings.validation_max_name_length  # Default: 255
    MAX_DESCRIPTION_LENGTH = settings.validation_max_description_length  # Default: 8192 (8KB)
    MAX_TEMPLATE_LENGTH = settings.validation_max_template_length  # Default: 65536
    MAX_CONTENT_LENGTH = settings.validation_max_content_length  # Default: 1048576 (1MB)
    MAX_JSON_DEPTH = settings.validation_max_json_depth  # Default: 30
    MAX_URL_LENGTH = settings.validation_max_url_length  # Default: 2048
@classmethod
def sanitize_display_text(cls, value: str, field_name: str) -> str:
    """Reject script-like content, then HTML-escape the text for display in the UI.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: ``value`` with ``&``, ``<``, ``>``, ``"`` and ``'`` replaced by HTML
        entities. A falsy ``value`` (empty string or None) is returned unchanged.

    Raises:
        ValueError: When the value matches the configured HTML pattern, the
            configured script pattern, or one of the polyglot patterns.

    Examples:
        Basic HTML escaping:

        >>> SecurityValidator.sanitize_display_text('Hello World', 'test')
        'Hello World'
        >>> SecurityValidator.sanitize_display_text('Hello <b>World</b>', 'test')
        'Hello &lt;b&gt;World&lt;/b&gt;'

        Empty/None handling:

        >>> SecurityValidator.sanitize_display_text('', 'test')
        ''
        >>> SecurityValidator.sanitize_display_text(None, 'test') #doctest: +SKIP

        Dangerous script patterns:

        >>> SecurityValidator.sanitize_display_text('alert();', 'test')
        'alert();'
        >>> SecurityValidator.sanitize_display_text('javascript:alert(1)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains script patterns that may cause display issues

        Polyglot attack patterns:

        >>> SecurityValidator.sanitize_display_text('"; alert()', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences
        >>> SecurityValidator.sanitize_display_text('-->test', 'test')
        '--&gt;test'
        >>> SecurityValidator.sanitize_display_text('--><script>', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains HTML tags that may cause display issues
        >>> SecurityValidator.sanitize_display_text('String.fromCharCode(65)', 'test')
        Traceback (most recent call last):
            ...
        ValueError: test contains potentially dangerous character sequences

        Safe character escaping:

        >>> SecurityValidator.sanitize_display_text('User & Admin', 'test')
        'User &amp; Admin'
        >>> SecurityValidator.sanitize_display_text('Quote: "Hello"', 'test')
        'Quote: &quot;Hello&quot;'
        >>> SecurityValidator.sanitize_display_text("Quote: 'Hello'", 'test')
        'Quote: &#x27;Hello&#x27;'
    """
    # Nothing to inspect or escape for an empty/None value.
    if not value:
        return value

    # The HTML check runs first, so a value that is both tag-like and
    # script-like reports the HTML message.
    html_hit = re.compile(cls.DANGEROUS_HTML_PATTERN, re.IGNORECASE).search(value)
    if html_hit is not None:
        raise ValueError(f"{field_name} contains HTML tags that may cause display issues")

    script_hit = re.compile(cls.DANGEROUS_JS_PATTERN, re.IGNORECASE).search(value)
    if script_hit is not None:
        raise ValueError(f"{field_name} contains script patterns that may cause display issues")

    # Module-level precompiled list; one hit is enough to reject.
    if any(polyglot.search(value) for polyglot in _POLYGLOT_PATTERNS):
        raise ValueError(f"{field_name} contains potentially dangerous character sequences")

    # quote=True also converts " and ' so the result is safe inside attributes.
    escaped_text = html.escape(value, quote=True)
    return escaped_text
@classmethod
def validate_name(cls, value: str, field_name: str = "Name") -> str:
    """Check a display name against the configured name pattern and length limit.

    The checks run in a fixed order and the first failure determines the error
    message: empty value, configured pattern, HTML special characters, length.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: ``value`` unchanged when every check passes

    Raises:
        ValueError: When the value is empty, does not match ``NAME_PATTERN``,
            contains ``<``, ``>``, ``"`` or ``'``, or is longer than
            ``MAX_NAME_LENGTH``

    Examples:
        >>> SecurityValidator.validate_name('valid_name')
        'valid_name'
        >>> SecurityValidator.validate_name('valid_name-123')
        'valid_name-123'
        >>> SecurityValidator.validate_name('valid_name_test')
        'valid_name_test'
        >>> SecurityValidator.validate_name('Test Name')
        'Test Name'
        >>> try:
        ...     SecurityValidator.validate_name('Invalid Name!')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name<script>')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        Length limit:

        >>> long_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_name(long_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True

        Quotes and slashes are rejected by the pattern:

        >>> try:
        ...     SecurityValidator.validate_name('name"test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name("name'test")
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_name('name/test')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
    """
    # Work out the first applicable complaint, then raise once at the end.
    complaint: Optional[str] = None
    if not value:
        complaint = f"{field_name} cannot be empty"
    elif re.match(cls.NAME_PATTERN, value) is None:
        complaint = f"{field_name} can only contain letters, numbers, underscore, and hyphen. Special characters like <, >, quotes are not allowed."
    elif _HTML_SPECIAL_CHARS_RE.search(value) is not None:
        # Independent of NAME_PATTERN, which may have been reconfigured.
        complaint = f"{field_name} cannot contain HTML special characters"
    elif len(value) > cls.MAX_NAME_LENGTH:
        complaint = f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}"

    if complaint is not None:
        raise ValueError(complaint)
    return value
@classmethod
def validate_identifier(cls, value: str, field_name: str) -> str:
    """Validate identifiers (IDs) - MCP compliant

    The whole value must match ``IDENTIFIER_PATTERN``. ``re.fullmatch`` is used
    rather than ``re.match`` because ``$`` in the configured pattern also
    matches just before a trailing newline, which would let ``'valid_id\\n'``
    through.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: ``value`` unchanged when every check passes

    Raises:
        ValueError: When the value is empty, is not fully matched by
            ``IDENTIFIER_PATTERN``, contains ``<``, ``>``, ``"`` or ``'``, or is
            longer than ``MAX_NAME_LENGTH``

    Examples:
        >>> SecurityValidator.validate_identifier('valid_id', 'ID')
        'valid_id'
        >>> SecurityValidator.validate_identifier('valid.id.123', 'ID')
        'valid.id.123'
        >>> SecurityValidator.validate_identifier('valid-id_test', 'ID')
        'valid-id_test'
        >>> SecurityValidator.validate_identifier('test123', 'ID')
        'test123'
        >>> try:
        ...     SecurityValidator.validate_identifier('Invalid/ID', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('', 'ID')
        ... except ValueError as e:
        ...     'cannot be empty' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('id<script>', 'ID')
        ... except ValueError as e:
        ...     'HTML special characters' in str(e) or 'can only contain' in str(e)
        True

        A trailing newline is not silently accepted:

        >>> try:
        ...     SecurityValidator.validate_identifier('valid_id\\n', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        Quotes and slashes are rejected by the pattern:

        >>> try:
        ...     SecurityValidator.validate_identifier('id"test', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier("id'test", 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_identifier('id/test', 'ID')
        ... except ValueError as e:
        ...     'can only contain' in str(e)
        True

        Length limit:

        >>> long_id = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_identifier(long_id, 'ID')
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # MCP spec: identifiers should be alphanumeric + limited special chars.
    # fullmatch() requires the pattern to consume the entire string, closing the
    # "$ matches before a final newline" loophole of match().
    if not re.fullmatch(cls.IDENTIFIER_PATTERN, value):
        raise ValueError(f"{field_name} can only contain letters, numbers, underscore, hyphen, and dots")

    # Block HTML-like patterns (uses precompiled regex); kept as a separate check
    # because IDENTIFIER_PATTERN is configurable.
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value
@classmethod
def validate_uri(cls, value: str, field_name: str = "URI") -> str:
    """Validate URIs - MCP compliant

    Checks run in this order, and the first failure determines the message:
    empty value, unsafe characters, directory traversal, allowed character
    set, length. The allowed-character check uses ``re.fullmatch`` so the
    configured pattern has to cover the entire value; with ``re.search`` a
    ``$`` anchor also matches just before a trailing newline and
    ``'/valid/uri\\n'`` would be accepted.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: ``value`` unchanged when every check passes

    Raises:
        ValueError: When the value is empty, matches
            ``VALIDATION_UNSAFE_URI_PATTERN``, contains ``..``, is not fully
            matched by ``VALIDATION_SAFE_URI_PATTERN``, or is longer than
            ``MAX_NAME_LENGTH``

    Examples:
        >>> SecurityValidator.validate_uri('/valid/uri', 'URI')
        '/valid/uri'
        >>> SecurityValidator.validate_uri('..', 'URI')
        Traceback (most recent call last):
            ...
        ValueError: URI cannot contain directory traversal sequences ('..')
        >>> SecurityValidator.validate_uri('/valid/uri\\n', 'URI')
        Traceback (most recent call last):
            ...
        ValueError: URI contains invalid characters
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Block HTML-like patterns
    if re.search(cls.VALIDATION_UNSAFE_URI_PATTERN, value):
        raise ValueError(f"{field_name} cannot contain HTML special characters")

    if ".." in value:
        raise ValueError(f"{field_name} cannot contain directory traversal sequences ('..')")

    # Allow-list check: every character of the value must be covered by the pattern.
    if not re.fullmatch(cls.VALIDATION_SAFE_URI_PATTERN, value):
        raise ValueError(f"{field_name} contains invalid characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value
@classmethod
def validate_tool_name(cls, value: str) -> str:
    """Special validation for MCP tool names

    The whole name must match ``TOOL_NAME_PATTERN``. ``re.fullmatch`` is used
    rather than ``re.match`` because ``$`` in the configured pattern also
    matches just before a trailing newline, which would let ``'tool_1\\n'``
    through.

    Args:
        value (str): Value to validate

    Returns:
        str: ``value`` unchanged when every check passes

    Raises:
        ValueError: When the name is empty, is not fully matched by
            ``TOOL_NAME_PATTERN``, contains ``<``, ``>``, ``"`` or ``'``, or is
            longer than ``MAX_NAME_LENGTH``

    Examples:
        >>> SecurityValidator.validate_tool_name('tool_1')
        'tool_1'
        >>> SecurityValidator.validate_tool_name('_5gpt_query')
        '_5gpt_query'
        >>> SecurityValidator.validate_tool_name('1tool')
        '1tool'

        Invalid characters (rejected by pattern):

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool<script>')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_tool_name('tool"test')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_tool_name("tool'test")
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True

        A trailing newline is not silently accepted:

        >>> try:
        ...     SecurityValidator.validate_tool_name('tool_1\\n')
        ... except ValueError as e:
        ...     'must start with a letter, number, or underscore' in str(e)
        True

        >>> # Slashes are allowed per SEP-986
        >>> SecurityValidator.validate_tool_name('tool/test')
        'tool/test'
        >>> SecurityValidator.validate_tool_name('namespace/subtool')
        'namespace/subtool'

        Length limit:

        >>> long_tool_name = 'a' * 256
        >>> try:
        ...     SecurityValidator.validate_tool_name(long_tool_name)
        ... except ValueError as e:
        ...     'exceeds maximum length' in str(e)
        True
    """
    if not value:
        raise ValueError("Tool name cannot be empty")

    # MCP tools have specific naming requirements. fullmatch() requires the
    # pattern to consume the entire string, closing the "$ matches before a
    # final newline" loophole of match().
    if not re.fullmatch(cls.TOOL_NAME_PATTERN, value):
        raise ValueError("Tool name must start with a letter, number, or underscore and contain only letters, numbers, periods, underscores, hyphens, and slashes")

    # Ensure no HTML-like content (uses precompiled regex); kept as a separate
    # check because TOOL_NAME_PATTERN is configurable.
    if _HTML_SPECIAL_CHARS_RE.search(value):
        raise ValueError("Tool name cannot contain HTML special characters")

    if len(value) > cls.MAX_NAME_LENGTH:
        raise ValueError(f"Tool name exceeds maximum length of {cls.MAX_NAME_LENGTH}")

    return value
@classmethod
def validate_uuid(cls, value: str, field_name: str = "UUID") -> str:
    """Validate UUID format and return it in normalized form

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error and log messages)

    Returns:
        str: The UUID as 32 lowercase hexadecimal digits without hyphens.
        A falsy ``value`` (empty string or None) is returned unchanged.

    Raises:
        ValueError: When value is not a valid UUID. The original parsing error
            is attached as ``__cause__``.

    Examples:
        >>> SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('invalid-uuid')
        Traceback (most recent call last):
            ...
        ValueError: UUID must be a valid UUID format

        Empty UUID:

        >>> SecurityValidator.validate_uuid('')
        ''

        Normalized UUID format:

        >>> SecurityValidator.validate_uuid('550E8400-E29B-41D4-A716-446655440000')
        '550e8400e29b41d4a716446655440000'
        >>> SecurityValidator.validate_uuid('550e8400e29b41d4a716446655440000')
        '550e8400e29b41d4a716446655440000'

        Various invalid UUID formats:

        >>> try:
        ...     SecurityValidator.validate_uuid('not-a-uuid')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('550e8400-e29b-41d4-a716-446655440000-extra')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
        >>> try:
        ...     SecurityValidator.validate_uuid('gggggggg-gggg-gggg-gggg-gggggggggggg')
        ... except ValueError as e:
        ...     'valid UUID format' in str(e)
        True
    """
    if not value:
        return value

    try:
        # Validate UUID format by attempting to parse it
        parsed = uuid.UUID(value)
    except (ValueError, AttributeError, TypeError) as exc:
        # AttributeError/TypeError are what uuid.UUID raises for non-string
        # input; report them the same way as a malformed string.
        # %r keeps control characters in the rejected value from forging extra
        # log lines, and the lazy arguments avoid formatting when the record
        # is filtered out.
        logger.error("Invalid UUID format for %s: %r", field_name, value)
        raise ValueError(f"{field_name} must be a valid UUID format") from exc

    # UUID.hex is the 32-digit lowercase form, i.e. str(parsed) without hyphens.
    return parsed.hex
@classmethod
def validate_template(cls, value: str) -> str:
    """Validate a template: plain Jinja2 placeholders pass, SSTI-prone content does not

    Checks run in this order: length limit, dangerous HTML tags, event-handler
    attributes, the contents of every ``{{ ... }}`` and ``{% ... %}``
    expression, and finally ``${...}`` / ``#{...}`` / ``%{...}`` style
    expressions.

    Args:
        value (str): Value to validate

    Returns:
        str: ``value`` unchanged when every check passes. A falsy ``value``
        (empty string or None) is returned unchanged without any checks.

    Raises:
        ValueError: When the template is too long, contains a blocked tag or
            event handler, contains an unterminated expression, or contains an
            expression with a blocked token or operator

    Examples:
        Empty template handling:

        >>> SecurityValidator.validate_template('')
        ''
        >>> SecurityValidator.validate_template(None) #doctest: +SKIP

        Safe Jinja2 templates:

        >>> SecurityValidator.validate_template('Hello {{ name }}')
        'Hello {{ name }}'
        >>> SecurityValidator.validate_template('{% if condition %}text{% endif %}')
        '{% if condition %}text{% endif %}'
        >>> SecurityValidator.validate_template('{{ username }}')
        '{{ username }}'

        Dangerous HTML tags blocked:

        >>> SecurityValidator.validate_template('Hello <script>alert(1)</script>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display
        >>> SecurityValidator.validate_template('Test <iframe src="evil.com"></iframe>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display
        >>> SecurityValidator.validate_template('<form action="/evil"></form>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains HTML tags that may interfere with proper display

        Event handlers blocked:

        >>> SecurityValidator.validate_template('<div onclick="evil()">Test</div>')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues
        >>> SecurityValidator.validate_template('onload = "alert(1)"')
        Traceback (most recent call last):
            ...
        ValueError: Template contains event handlers that may cause display issues

        SSTI prevention patterns:

        >>> SecurityValidator.validate_template('{{ __import__ }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ config }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{% import os %}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 7*7 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 10/2 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 5+5 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('{{ 10-5 }}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Other template injection patterns:

        >>> SecurityValidator.validate_template('${evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('#{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions
        >>> SecurityValidator.validate_template('%{evil}')
        Traceback (most recent call last):
            ...
        ValueError: Template contains potentially dangerous expressions

        Length limit testing:

        >>> long_template = 'a' * 65537
        >>> SecurityValidator.validate_template(long_template)
        Traceback (most recent call last):
            ...
        ValueError: Template exceeds maximum length of 65536
    """
    if not value:
        return value

    if len(value) > cls.MAX_TEMPLATE_LENGTH:
        raise ValueError(f"Template exceeds maximum length of {cls.MAX_TEMPLATE_LENGTH}")

    # Tag and event-handler screening applies to the whole template text
    # (precompiled module-level patterns).
    if _DANGEROUS_TEMPLATE_TAGS_RE.search(value):
        raise ValueError("Template contains HTML tags that may interfere with proper display")
    if _EVENT_HANDLER_RE.search(value):
        raise ValueError("Template contains event handlers that may cause display issues")

    # SSTI prevention: "{{ }}" expressions are scanned first, then "{% %}"
    # blocks, both with the same rules. The scanner raises ValueError itself
    # when an expression is never closed.
    for opener, closer in (("{{", "}}"), ("{%", "%}")):
        for expression in _iter_template_expressions(value, opener, closer):
            # Tokens are matched on a lower-cased copy with whitespace around
            # "|" and "=" squeezed out, so "x | attr" and "attribute = y" are
            # caught just like their compact spellings.
            squeezed = re.sub(r"\s*\|\s*", "|", expression.lower())
            squeezed = re.sub(r"\s*=\s*", "=", squeezed)
            # Operators are looked up in the untouched expression text.
            if any(token in squeezed for token in _SSTI_DANGEROUS_SUBSTRINGS) or any(op in expression for op in _SSTI_DANGEROUS_OPERATORS):
                raise ValueError("Template contains potentially dangerous expressions")

    # "${...}", "#{...}" and "%{...}" style expressions are rejected outright.
    for prefix in _SSTI_SIMPLE_TEMPLATE_PREFIXES:
        if _has_simple_template_expression(value, prefix):
            raise ValueError("Template contains potentially dangerous expressions")

    return value
@classmethod
def validate_url(cls, value: str, field_name: str = "URL") -> str:
    """Validate URLs for allowed schemes and safe display

    The raw string is screened first (length, scheme prefix, dangerous
    protocol markers, brackets, line breaks, control characters, spaces) and
    is only then parsed with ``urlparse`` for structural checks (host, SSRF
    rules, port, credentials) and XSS pattern checks.

    Tabs and other ASCII control characters are rejected explicitly:
    ``urlparse`` silently strips tabs before parsing, so without this check
    the host that gets validated could differ from the host contained in the
    string that is returned to the caller.

    Args:
        value (str): Value to validate
        field_name (str): Name of field being validated (used in error messages)

    Returns:
        str: ``value`` unchanged when every check passes

    Raises:
        ValueError: When input is not acceptable

    Examples:
        Valid URLs:

        >>> SecurityValidator.validate_url('https://example.com')
        'https://example.com'
        >>> SecurityValidator.validate_url('http://example.com')
        'http://example.com'
        >>> SecurityValidator.validate_url('ws://example.com')
        'ws://example.com'
        >>> SecurityValidator.validate_url('wss://example.com')
        'wss://example.com'
        >>> SecurityValidator.validate_url('https://example.com:8080/path')
        'https://example.com:8080/path'
        >>> SecurityValidator.validate_url('https://example.com/path?query=value')
        'https://example.com/path?query=value'

        Empty URL handling:

        >>> SecurityValidator.validate_url('')
        Traceback (most recent call last):
            ...
        ValueError: URL cannot be empty

        Length validation:

        >>> long_url = 'https://example.com/' + 'a' * 2100
        >>> SecurityValidator.validate_url(long_url)
        Traceback (most recent call last):
            ...
        ValueError: URL exceeds maximum length of 2048

        Scheme validation:

        >>> SecurityValidator.validate_url('ftp://example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('file:///etc/passwd')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('data:text/plain,hello')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('vbscript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('about:blank')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('chrome://settings')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://
        >>> SecurityValidator.validate_url('mailto:test@example.com')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        IPv6 URL blocking:

        >>> SecurityValidator.validate_url('https://[::1]:8080/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported
        >>> SecurityValidator.validate_url('https://[2001:db8::1]/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains IPv6 address which is not supported

        Protocol-relative URL blocking:

        >>> SecurityValidator.validate_url('//example.com/path')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Line break injection:

        >>> SecurityValidator.validate_url('https://example.com\\rHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed
        >>> SecurityValidator.validate_url('https://example.com\\nHost: evil.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains line breaks which are not allowed

        Tabs and other control characters:

        >>> SecurityValidator.validate_url('https://exam\\tple.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains control characters which are not allowed

        Space validation:

        >>> SecurityValidator.validate_url('https://exam ple.com')
        Traceback (most recent call last):
            ...
        ValueError: URL contains spaces which are not allowed in URLs
        >>> SecurityValidator.validate_url('https://example.com/path?query=hello world')
        'https://example.com/path?query=hello world'

        Malformed URLs:

        >>> SecurityValidator.validate_url('https://')
        Traceback (most recent call last):
            ...
        ValueError: URL is not a valid URL
        >>> SecurityValidator.validate_url('not-a-url')
        Traceback (most recent call last):
            ...
        ValueError: URL must start with one of: http://, https://, ws://, wss://

        Restricted IP addresses:

        >>> SecurityValidator.validate_url('https://0.0.0.0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid IP address (0.0.0.0)
        >>> SecurityValidator.validate_url('https://169.254.169.254/')  # doctest: +ELLIPSIS
        Traceback (most recent call last):
            ...
        ValueError: URL contains IP address blocked by SSRF protection ...

        Invalid port numbers:

        >>> SecurityValidator.validate_url('https://example.com:0/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains invalid port number
        >>> try:
        ...     SecurityValidator.validate_url('https://example.com:65536/')
        ... except ValueError as e:
        ...     'Port out of range' in str(e) or 'invalid port' in str(e)
        True

        Credentials in URL:

        >>> SecurityValidator.validate_url('https://user:pass@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed
        >>> SecurityValidator.validate_url('https://user@example.com/')
        Traceback (most recent call last):
            ...
        ValueError: URL contains credentials which are not allowed

        XSS patterns in URLs:

        >>> SecurityValidator.validate_url('https://example.com/<script>')
        Traceback (most recent call last):
            ...
        ValueError: URL contains HTML tags that may cause security issues
        >>> SecurityValidator.validate_url('https://example.com?param=javascript:alert(1)')
        Traceback (most recent call last):
            ...
        ValueError: URL contains unsupported or potentially dangerous protocol
    """
    if not value:
        raise ValueError(f"{field_name} cannot be empty")

    # Length check
    if len(value) > cls.MAX_URL_LENGTH:
        raise ValueError(f"{field_name} exceeds maximum length of {cls.MAX_URL_LENGTH}")

    # Check allowed schemes (case-insensitive prefix comparison; the URL is
    # lower-cased once rather than once per scheme).
    allowed_schemes = cls.ALLOWED_URL_SCHEMES
    lowered = value.lower()
    if not any(lowered.startswith(scheme.lower()) for scheme in allowed_schemes):
        raise ValueError(f"{field_name} must start with one of: {', '.join(allowed_schemes)}")

    # Block dangerous URL patterns anywhere in the URL (uses precompiled regex list)
    for pattern in _DANGEROUS_URL_PATTERNS:
        if pattern.search(value):
            raise ValueError(f"{field_name} contains unsupported or potentially dangerous protocol")

    # Block IPv6 URLs (URLs with square brackets)
    if "[" in value or "]" in value:
        raise ValueError(f"{field_name} contains IPv6 address which is not supported")

    # Block protocol-relative URLs. With the default allowed schemes the scheme
    # check above already rejects these; the check still applies if the
    # configured scheme list would let such a value through.
    if value.startswith("//"):
        raise ValueError(f"{field_name} contains protocol-relative URL which is not supported")

    # Check for CRLF injection
    if "\r" in value or "\n" in value:
        raise ValueError(f"{field_name} contains line breaks which are not allowed")

    # Reject tab and every other ASCII control character (CR/LF were handled
    # above). urlparse() drops tabs before parsing, so letting them through
    # would validate a different host than the one in the returned string.
    if any(ch < " " or ch == "\x7f" for ch in value):
        raise ValueError(f"{field_name} contains control characters which are not allowed")

    # Check for spaces in domain
    if " " in value.split("?")[0]:  # Check only in the URL part, not query string
        raise ValueError(f"{field_name} contains spaces which are not allowed in URLs")

    # Basic URL structure validation
    try:
        result = urlparse(value)
        if not all([result.scheme, result.netloc]):
            raise ValueError(f"{field_name} is not a valid URL")

        # Additional validation: ensure netloc doesn't contain brackets (double-check)
        if "[" in result.netloc or "]" in result.netloc:
            raise ValueError(f"{field_name} contains IPv6 address which is not supported")

        # SSRF Protection: Block dangerous IP addresses and hostnames
        hostname = result.hostname
        if hostname:
            # Always block 0.0.0.0 (all interfaces) regardless of SSRF settings
            if hostname == "0.0.0.0":  # nosec B104 - we're blocking this for security
                raise ValueError(f"{field_name} contains invalid IP address (0.0.0.0)")

            # Apply SSRF protection if enabled
            if settings.ssrf_protection_enabled:
                cls._validate_ssrf(hostname, field_name)

        # Validate port number. Accessing result.port can itself raise
        # ValueError for a malformed or out-of-range port; that error is
        # propagated unchanged by the handler below.
        if result.port is not None:
            if result.port < 1 or result.port > 65535:
                raise ValueError(f"{field_name} contains invalid port number")

        # Check for credentials in URL
        if result.username or result.password:
            raise ValueError(f"{field_name} contains credentials which are not allowed")

        # Check for XSS patterns in the entire URL
        if re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains HTML tags that may cause security issues")

        if re.search(cls.DANGEROUS_JS_PATTERN, value, re.IGNORECASE):
            raise ValueError(f"{field_name} contains script patterns that may cause security issues")

    except ValueError:
        # Re-raise ValueError as-is
        raise
    except Exception as exc:
        # Any other parsing failure is reported as an invalid URL, keeping the
        # original error attached as the cause.
        raise ValueError(f"{field_name} is not a valid URL") from exc

    return value
@classmethod
def _validate_ssrf(cls, hostname: str, field_name: str) -> None:
    """Validate hostname/IP against SSRF protection rules.

    This method implements configurable SSRF (Server-Side Request Forgery) protection
    to prevent the gateway from being used to access internal resources or cloud
    metadata services.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped to their embedded
    IPv4 address before any check, so an AAAA record or literal in that form is
    evaluated against the same IPv4 rules as the plain address.

    Args:
        hostname (str): The hostname or IP address to validate.
        field_name (str): Name of field being validated (for error messages).

    Raises:
        ValueError: If the hostname/IP is blocked by SSRF protection rules, or if
            DNS resolution fails/returns nothing while ``ssrf_dns_fail_closed`` is set.

    Configuration (via settings):
        - ssrf_protection_enabled: Master switch (must be True for this to be called)
        - ssrf_blocked_networks: CIDR ranges always blocked (e.g., cloud metadata)
        - ssrf_blocked_hosts: Hostnames always blocked
        - ssrf_allow_localhost: If False, blocks loopback addresses and localhost
        - ssrf_allow_private_networks: If False, blocks non-loopback private addresses
        - ssrf_dns_fail_closed: If True, reject hostnames that cannot be resolved

    Examples:
        Cloud metadata (always blocked):

        >>> from unittest.mock import patch, MagicMock
        >>> mock_settings = MagicMock()
        >>> mock_settings.ssrf_protection_enabled = True
        >>> mock_settings.ssrf_blocked_networks = ["169.254.169.254/32"]
        >>> mock_settings.ssrf_blocked_hosts = ["metadata.google.internal"]
        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('169.254.169.254', 'URL')
        ...     except ValueError as e:
        ...         'blocked by SSRF protection' in str(e)
        True

        The IPv4-mapped IPv6 spelling of the same address is blocked too:

        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('::ffff:169.254.169.254', 'URL')
        ...     except ValueError as e:
        ...         'blocked by SSRF protection' in str(e)
        True

        Localhost (configurable):

        >>> mock_settings.ssrf_allow_localhost = False
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     try:
        ...         SecurityValidator._validate_ssrf('127.0.0.1', 'URL')
        ...     except ValueError as e:
        ...         'localhost' in str(e).lower()
        True

        Public IPs (always allowed):

        >>> mock_settings.ssrf_allow_localhost = True
        >>> mock_settings.ssrf_allow_private_networks = True
        >>> with patch('mcpgateway.common.validators.settings', mock_settings):
        ...     SecurityValidator._validate_ssrf('8.8.8.8', 'URL')  # Should not raise
    """

    def _unwrap_mapped(addr):
        """Return the embedded IPv4 address of an IPv4-mapped IPv6 address, else ``addr``."""
        # Only IPv6Address exposes ``ipv4_mapped``; it is None for ordinary IPv6.
        mapped = getattr(addr, "ipv4_mapped", None)
        return addr if mapped is None else mapped

    # Normalize hostname: lowercase, strip trailing dots (DNS FQDN notation)
    hostname_normalized = hostname.lower().rstrip(".")

    # Check blocked hostnames (case-insensitive, normalized)
    for blocked_host in settings.ssrf_blocked_hosts:
        if hostname_normalized == blocked_host.lower().rstrip("."):
            raise ValueError(f"{field_name} contains blocked hostname '{hostname}' (SSRF protection)")

    # Resolve hostname to IP for network-based checks.
    # Uses getaddrinfo to check ALL resolved addresses (A and AAAA records).
    ip_addresses: list = []
    try:
        # Try to parse as IP address directly
        ip_addresses = [_unwrap_mapped(ipaddress.ip_address(hostname))]
    except ValueError:
        # It's a hostname, resolve ALL addresses (IPv4 and IPv6)
        try:
            addr_info = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
            for _, _, _, _, sockaddr in addr_info:
                try:
                    ip_addresses.append(_unwrap_mapped(ipaddress.ip_address(sockaddr[0])))
                except ValueError:
                    # Unparseable entry from the resolver - ignore it
                    continue
        except (socket.gaierror, socket.herror) as exc:
            # DNS resolution failed
            if settings.ssrf_dns_fail_closed:
                raise ValueError(f"{field_name} DNS resolution failed and SSRF_DNS_FAIL_CLOSED is enabled") from exc
            # Fail open: allow through (hostname blocking above catches known dangerous hostnames)
            return

    if not ip_addresses:
        if settings.ssrf_dns_fail_closed:
            raise ValueError(f"{field_name} DNS resolution returned no addresses and SSRF_DNS_FAIL_CLOSED is enabled")
        return

    # Parse the configured CIDRs once instead of once per resolved address.
    blocked_networks = []
    for network_str in settings.ssrf_blocked_networks:
        try:
            blocked_networks.append((network_str, ipaddress.ip_network(network_str, strict=False)))
        except ValueError:
            # Invalid network in config - log and skip
            logger.warning("Invalid CIDR in ssrf_blocked_networks: %s", network_str)

    # Check ALL resolved addresses - if ANY is blocked, reject the request
    for ip_addr in ip_addresses:
        # Blocked networks are always enforced regardless of other settings
        for network_str, network in blocked_networks:
            if ip_addr in network:
                raise ValueError(f"{field_name} contains IP address blocked by SSRF protection (network: {network_str})")

        # Check localhost/loopback (if not allowed)
        if not settings.ssrf_allow_localhost:
            if ip_addr.is_loopback or hostname_normalized in ("localhost", "localhost.localdomain"):
                raise ValueError(f"{field_name} contains localhost address which is blocked by SSRF protection")

        # Check private networks (if not allowed); loopback is governed by the setting above
        if not settings.ssrf_allow_private_networks:
            if ip_addr.is_private and not ip_addr.is_loopback:
                raise ValueError(f"{field_name} contains private network address which is blocked by SSRF protection")
@classmethod
def validate_no_xss(cls, value: str, field_name: str) -> None:
"""
Validate that a string does not contain XSS patterns.
Args:
value (str): Value to validate.
field_name (str): Name of the field being validated.
Raises:
ValueError: If the value contains XSS patterns.
Examples:
Safe strings pass validation:
>>> SecurityValidator.validate_no_xss('Hello World', 'test_field')
>>> SecurityValidator.validate_no_xss('User: admin@example.com', 'email')
>>> SecurityValidator.validate_no_xss('Price: $10.99', 'price')
Empty/None strings are considered safe:
>>> SecurityValidator.validate_no_xss('', 'empty_field')
>>> SecurityValidator.validate_no_xss(None, 'none_field') #doctest: +SKIP
Dangerous HTML tags trigger validation errors:
>>> SecurityValidator.validate_no_xss('<script>alert(1)</script>', 'test_field')
Traceback (most recent call last):
...
ValueError: test_field contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<iframe src="evil.com"></iframe>', 'content')
Traceback (most recent call last):
...
ValueError: content contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<object data="malware.swf"></object>', 'data')
Traceback (most recent call last):
...
ValueError: data contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<embed src="evil.swf">', 'embed')
Traceback (most recent call last):
...
ValueError: embed contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<link rel="stylesheet" href="evil.css">', 'style')
Traceback (most recent call last):
...
ValueError: style contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<meta http-equiv="refresh" content="0;url=evil.com">', 'meta')
Traceback (most recent call last):
...
ValueError: meta contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<base href="http://evil.com">', 'base')
Traceback (most recent call last):
...
ValueError: base contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<form action="evil.php">', 'form')
Traceback (most recent call last):
...
ValueError: form contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<img src="x" onerror="alert(1)">', 'image')
Traceback (most recent call last):
...
ValueError: image contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<svg onload="alert(1)"></svg>', 'svg')
Traceback (most recent call last):
...
ValueError: svg contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<video src="x" onerror="alert(1)"></video>', 'video')
Traceback (most recent call last):
...
ValueError: video contains HTML tags that may cause security issues
>>> SecurityValidator.validate_no_xss('<audio src="x" onerror="alert(1)"></audio>', 'audio')
Traceback (most recent call last):
...
ValueError: audio contains HTML tags that may cause security issues
"""
if not value:
return # Empty values are considered safe
# Check for dangerous HTML tags
if re.search(cls.DANGEROUS_HTML_PATTERN, value, re.IGNORECASE):
raise ValueError(f"{field_name} contains HTML tags that may cause security issues")
@classmethod
def validate_json_depth(
cls,
obj: object,
max_depth: int | None = None,
current_depth: int = 0,
) -> None:
"""Validate that a JSON‑like structure does not exceed a depth limit.
A *depth* is counted **only** when we enter a container (`dict` or
`list`). Primitive values (`str`, `int`, `bool`, `None`, etc.) do not
increase the depth, but an *empty* container still counts as one level.
Args:
obj: Any Python object to inspect recursively.
max_depth: Maximum allowed depth (defaults to
:pyattr:`SecurityValidator.MAX_JSON_DEPTH`).
current_depth: Internal recursion counter. **Do not** set this
from user code.
Raises:
ValueError: If the nesting level exceeds *max_depth*.
Examples:
Simple flat dictionary – depth 1: ::
>>> SecurityValidator.validate_json_depth({'name': 'Alice'})
Nested dict – depth 2: ::
>>> SecurityValidator.validate_json_depth(
... {'user': {'name': 'Alice'}}
... )
Mixed dict/list – depth 3: ::
>>> SecurityValidator.validate_json_depth(
... {'users': [{'name': 'Alice', 'meta': {'age': 30}}]}
... )
At 10 levels of nesting – allowed: ::
>>> deep_10 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8':
... {'9': {'10': 'end'}}}}}}}}}}
>>> SecurityValidator.validate_json_depth(deep_10)
At new default limit (30) – allowed: ::
>>> deep_30 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8':
... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16':
... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24':
... {'25': {'26': {'27': {'28': {'29': {'30': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
>>> SecurityValidator.validate_json_depth(deep_30)
One level deeper – rejected: ::
>>> deep_31 = {'1': {'2': {'3': {'4': {'5': {'6': {'7': {'8':
... {'9': {'10': {'11': {'12': {'13': {'14': {'15': {'16':
... {'17': {'18': {'19': {'20': {'21': {'22': {'23': {'24':
... {'25': {'26': {'27': {'28': {'29': {'30': {'31': 'end'}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}}
>>> SecurityValidator.validate_json_depth(deep_31)
Traceback (most recent call last):
...
ValueError: JSON structure exceeds maximum depth of 30
"""
if max_depth is None:
max_depth = cls.MAX_JSON_DEPTH
# Only containers count toward depth; primitives are ignored
if not isinstance(obj, (dict, list)):
return
next_depth = current_depth + 1
if next_depth > max_depth:
raise ValueError(f"JSON structure exceeds maximum depth of {max_depth}")
if isinstance(obj, dict):
for value in obj.values():
cls.validate_json_depth(value, max_depth, next_depth)
else: # obj is a list
for item in obj:
cls.validate_json_depth(item, max_depth, next_depth)
@classmethod
def validate_mime_type(cls, value: str) -> str:
"""Validate MIME type format
Args:
value (str): Value to validate
Returns:
str: Value if acceptable
Raises:
ValueError: When input is not acceptable
Examples:
Empty/None handling:
>>> SecurityValidator.validate_mime_type('')
''
>>> SecurityValidator.validate_mime_type(None) #doctest: +SKIP
Valid standard MIME types:
>>> SecurityValidator.validate_mime_type('text/plain')
'text/plain'
>>> SecurityValidator.validate_mime_type('application/json')
'application/json'
>>> SecurityValidator.validate_mime_type('image/jpeg')
'image/jpeg'
>>> SecurityValidator.validate_mime_type('text/html')
'text/html'
>>> SecurityValidator.validate_mime_type('application/pdf')
'application/pdf'
Valid vendor-specific MIME types:
>>> SecurityValidator.validate_mime_type('application/x-custom')
'application/x-custom'
>>> SecurityValidator.validate_mime_type('text/x-log')
'text/x-log'
Valid MIME types with suffixes:
>>> SecurityValidator.validate_mime_type('application/vnd.api+json')
'application/vnd.api+json'
>>> SecurityValidator.validate_mime_type('image/svg+xml')
'image/svg+xml'
Invalid MIME type formats:
>>> SecurityValidator.validate_mime_type('invalid')
Traceback (most recent call last):
...
ValueError: Invalid MIME type format
>>> SecurityValidator.validate_mime_type('text/')
Traceback (most recent call last):
...
ValueError: Invalid MIME type format
>>> SecurityValidator.validate_mime_type('/plain')
Traceback (most recent call last):
...
ValueError: Invalid MIME type format
>>> SecurityValidator.validate_mime_type('text//plain')
Traceback (most recent call last):
...
ValueError: Invalid MIME type format
>>> SecurityValidator.validate_mime_type('text/plain/extra')
Traceback (most recent call last):
...
ValueError: Invalid MIME type format
>>> SecurityValidator.validate_mime_type('text plain')
Traceback (most recent call last):
...
ValueError: Invalid MIME type format
>>> SecurityValidator.validate_mime_type('<text/plain>')
Traceback (most recent call last):
...
ValueError: Invalid MIME type format
Disallowed MIME types (not in whitelist - line 620):
>>> try:
... SecurityValidator.validate_mime_type('application/evil')
... except ValueError as e:
... 'not in the allowed list' in str(e)
True
>>> try:
... SecurityValidator.validate_mime_type('text/evil')
... except ValueError as e:
... 'not in the allowed list' in str(e)
True
Test MIME type with parameters (line 618):
>>> try:
... SecurityValidator.validate_mime_type('application/evil; charset=utf-8')
... except ValueError as e:
... 'Invalid MIME type format' in str(e)
True
"""
if not value:
return value
# Basic MIME type pattern (uses precompiled regex)
if not _MIME_TYPE_RE.match(value):
raise ValueError("Invalid MIME type format")
# Common safe MIME types
safe_mime_types = settings.validation_allowed_mime_types
if value not in safe_mime_types:
# Allow x- vendor types and + suffixes
base_type = value.split(";")[0].strip()
if not (base_type.startswith("application/x-") or base_type.startswith("text/x-") or "+" in base_type):
raise ValueError(f"MIME type '{value}' is not in the allowed list")
return value
@classmethod
def validate_shell_parameter(cls, value: str) -> str:
"""Validate and escape shell parameters to prevent command injection.
Args:
value (str): Shell parameter to validate
Returns:
str: Validated/escaped parameter
Raises:
ValueError: If parameter contains dangerous characters in strict mode
Examples:
>>> SecurityValidator.validate_shell_parameter('safe_param')
'safe_param'
>>> SecurityValidator.validate_shell_parameter('param with spaces')
'param with spaces'
"""
if not isinstance(value, str):
raise ValueError("Parameter must be string")
# Check for dangerous patterns (uses precompiled regex)
if _SHELL_DANGEROUS_CHARS_RE.search(value):
# Check if validation is strict
strict_mode = getattr(settings, "validation_strict", True)
if strict_mode:
raise ValueError("Parameter contains shell metacharacters")
# In non-strict mode, escape using shlex
return shlex.quote(value)
return value
@classmethod
def validate_path(cls, path: str, allowed_roots: Optional[List[str]] = None) -> str:
"""Validate and normalize file paths to prevent directory traversal.
Args:
path (str): File path to validate
allowed_roots (Optional[List[str]]): List of allowed root directories
Returns:
str: Validated and normalized path
Raises:
ValueError: If path contains traversal attempts or is outside allowed roots
Examples:
>>> SecurityValidator.validate_path('/safe/path')
'/safe/path'
>>> SecurityValidator.validate_path('http://example.com/file')
'http://example.com/file'
"""
if not isinstance(path, str):
raise ValueError("Path must be string")
# Skip validation for URI schemes (http://, plugin://, etc.) (uses precompiled regex)
if _URI_SCHEME_RE.match(path):
return path
try:
p = Path(path)
# Check for path traversal
if ".." in p.parts:
raise ValueError("Path traversal detected")
resolved_path = p.resolve()
# Check against allowed roots
if allowed_roots:
allowed = any(str(resolved_path).startswith(str(Path(root).resolve())) for root in allowed_roots)
if not allowed:
raise ValueError("Path outside allowed roots")
return str(resolved_path)
except (OSError, ValueError) as e:
raise ValueError(f"Invalid path: {e}")
@classmethod
def validate_sql_parameter(cls, value: str) -> str:
"""Validate SQL parameters to prevent SQL injection attacks.
Args:
value (str): SQL parameter to validate
Returns:
str: Validated/escaped parameter
Raises:
ValueError: If parameter contains SQL injection patterns in strict mode
Examples:
>>> SecurityValidator.validate_sql_parameter('safe_value')
'safe_value'
>>> SecurityValidator.validate_sql_parameter('123')
'123'
"""
if not isinstance(value, str):
return value
# Check for SQL injection patterns (uses precompiled regex list)
for pattern in _SQL_PATTERNS:
if pattern.search(value):
if getattr(settings, "validation_strict", True):
raise ValueError("Parameter contains SQL injection patterns")
# Basic escaping
value = value.replace("'", "''").replace('"', '""')
return value
@classmethod
def validate_parameter_length(cls, value: str, max_length: Optional[int] = None) -> str:
"""Validate parameter length against configured limits.
Args:
value (str): Parameter to validate
max_length (int): Maximum allowed length
Returns:
str: Parameter if within length limits
Raises:
ValueError: If parameter exceeds maximum length
Examples:
>>> SecurityValidator.validate_parameter_length('short', 10)
'short'
"""
max_len = max_length or getattr(settings, "max_param_length", 10000)
if len(value) > max_len:
raise ValueError(f"Parameter exceeds maximum length of {max_len}")
return value
@classmethod
def sanitize_text(cls, text: str) -> str:
"""Remove control characters and ANSI escape sequences from text.
Args:
text (str): Text to sanitize
Returns:
str: Sanitized text with control characters removed
Examples:
>>> SecurityValidator.sanitize_text('Hello World')
'Hello World'
>>> SecurityValidator.sanitize_text('Text\x1b[31mwith\x1b[0mcolors')
'Textwithcolors'
"""
if not isinstance(text, str):
return text
# Remove ANSI escape sequences (uses precompiled regex)
text = _ANSI_ESCAPE_RE.sub("", text)
# Remove control characters except newlines and tabs (uses precompiled regex)
sanitized = _CONTROL_CHARS_RE.sub("", text)
return sanitized
@classmethod
def sanitize_json_response(cls, data: Any) -> Any:
"""Recursively sanitize JSON response data by removing control characters.
Args:
data (Any): JSON data structure to sanitize
Returns:
Any: Sanitized data structure with same type as input
Examples:
>>> SecurityValidator.sanitize_json_response('clean text')
'clean text'
>>> SecurityValidator.sanitize_json_response({'key': 'value'})
{'key': 'value'}
>>> SecurityValidator.sanitize_json_response(['item1', 'item2'])
['item1', 'item2']
"""
if isinstance(data, str):
return cls.sanitize_text(data)
if isinstance(data, dict):
return {k: cls.sanitize_json_response(v) for k, v in data.items()}
if isinstance(data, list):
return [cls.sanitize_json_response(item) for item in data]
return data