---
description: FastMCP Python v2.0 validation patterns
globs:
alwaysApply: false
---
> You are an expert in FastMCP Python v2.0 validation patterns, type safety, Pydantic integration, and schema enforcement. You focus on creating robust, type-safe applications with comprehensive input validation, error handling, and runtime type checking.
## FastMCP Validation Architecture Flow
```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Input           │    │ Schema           │    │ Validated       │
│ Data            │───▶│ Validation       │───▶│ Output          │
│                 │    │                  │    │                 │
│ - Tool args     │    │ - Pydantic       │    │ - Type-safe     │
│ - Resource data │    │   models         │    │ - Sanitized     │
│ - Responses     │    │ - Custom         │    │ - Normalized    │
└─────────────────┘    └──────────────────┘    └─────────────────┘
         │                      │                       │
         ▼                      ▼                       ▼
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│ Error           │    │ Runtime          │    │ Response        │
│ Handling        │    │ Type Check       │    │ Validation      │
│ & Recovery      │    │ & Coercion       │    │ & Serialization │
└─────────────────┘    └──────────────────┘    └─────────────────┘
```
## Project Structure
```
fastmcp_validation_project/
├── src/
│   ├── my_mcp_server/
│   │   ├── __init__.py          # Package initialization
│   │   ├── server.py            # Main server with validation
│   │   └── models.py            # Pydantic models
│   ├── validation/
│   │   ├── __init__.py          # Validation exports
│   │   ├── schemas.py           # Pydantic schemas
│   │   ├── validators.py        # Custom validators
│   │   ├── types.py             # Custom types
│   │   └── sanitizers.py        # Data sanitization
│   ├── tools/
│   │   ├── __init__.py          # Tool exports
│   │   ├── models.py            # Tool-specific models
│   │   ├── validators.py        # Tool validators
│   │   └── handlers.py          # Tool handlers
│   ├── resources/
│   │   ├── __init__.py          # Resource exports
│   │   ├── models.py            # Resource models
│   │   └── validators.py        # Resource validators
│   └── utils/
│       ├── __init__.py          # Utility exports
│       ├── type_guards.py       # Type guard functions
│       └── converters.py        # Type converters
├── tests/
│   ├── test_validation/
│   │   ├── test_schemas.py
│   │   ├── test_validators.py
│   │   └── test_types.py
│   └── test_integration.py
└── config/
    └── validation.yaml          # Validation configuration
```
## Core Implementation Patterns
### Pydantic Model Integration
```python
# ✅ DO: Comprehensive Pydantic model integration with FastMCP
from fastmcp import FastMCP
# FastMCP 2.x runs on Pydantic v2. `validator` and `root_validator` are the
# v1-style decorators: they still work under v2 but emit deprecation warnings
# (`field_validator` and `model_validator` are the v2 replacements).
from pydantic import BaseModel, ConfigDict, Field, validator, root_validator, ValidationError
from typing import Dict, Any, List, Optional, Union, Literal
from datetime import datetime, date
from enum import Enum
import re
from decimal import Decimal

server = FastMCP(name="validation-server")

# Base models for common patterns
class BaseValidatedModel(BaseModel):
    """Base model with common validation patterns."""

    # JSON schema generation works out of the box and needs no config flag.
    model_config = ConfigDict(
        # Validate assignment to fields
        validate_assignment=True,
        # Allow population by field name or alias
        populate_by_name=True,
        # Use enum values in validation
        use_enum_values=True,
        # Additional validation options
        arbitrary_types_allowed=False,
        extra="forbid",  # Forbid extra fields
    )

# Enums for controlled vocabularies
class UserRole(str, Enum):
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"
    GUEST = "guest"

class Priority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class Status(str, Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

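# Custom types with validation
from typing import Annotated
from pydantic import AfterValidator

EMAIL_REGEX = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

def _validate_email(v: str) -> str:
    """Normalize an email address, then check its format."""
    # Normalize first so surrounding whitespace does not fail the regex
    v = v.strip().lower()
    if not EMAIL_REGEX.match(v):
        raise ValueError('invalid email format')
    return v

# Email string with validation. Pydantic v2 attaches validators to a type via
# Annotated; the v1 `__get_validators__` hook is deprecated. Non-string input
# is rejected by the underlying `str` type before the validator runs.
EmailStr = Annotated[str, AfterValidator(_validate_email)]
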
# User model with comprehensive validation
class User(BaseValidatedModel):
    """User model with comprehensive validation."""

    id: Optional[int] = Field(None, ge=1, description="User ID")
    username: str = Field(
        ...,
        min_length=3,
        max_length=50,
        pattern=r'^[a-zA-Z0-9_]+$',  # Pydantic v2 renamed `regex` to `pattern`
        description="Username (alphanumeric and underscore only)"
    )
    email: EmailStr = Field(..., description="User email address")
    full_name: str = Field(
        ...,
        min_length=1,
        max_length=100,
        description="User's full name"
    )
    age: Optional[int] = Field(
        None,
        ge=13,
        le=120,
        description="User age (13-120)"
    )
    role: UserRole = Field(
        UserRole.USER,
        description="User role in the system"
    )
    is_active: bool = Field(True, description="Whether user is active")
    created_at: Optional[datetime] = Field(
        None,
        description="User creation timestamp"
    )
    metadata: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional user metadata"
    )

    @validator('username')
    def validate_username(cls, v):
        """Reject reserved usernames."""
        if v.lower() in ['admin', 'root', 'system']:
            raise ValueError('Username not allowed')
        return v

    @validator('full_name')
    def validate_full_name(cls, v):
        """Validate full name format."""
        v = v.strip()
        if not v:
            raise ValueError('Full name cannot be empty')
        # Check for special characters
        if re.search(r'[<>@!#$%^&*()+=\[\]{}|;:,.?]', v):
            raise ValueError('Full name contains invalid characters')
        return v

    @validator('metadata')
    def validate_metadata(cls, v):
        """Validate metadata structure."""
        if not isinstance(v, dict):
            raise ValueError('Metadata must be a dictionary')
        # Limit metadata size
        if len(str(v)) > 1000:
            raise ValueError('Metadata too large (max 1000 chars)')
        return v

    # Pydantic v2 requires skip_on_failure=True on post root validators
    @root_validator(skip_on_failure=True)
    def validate_user_consistency(cls, values):
        """Cross-field validation."""
        role = values.get('role')
        age = values.get('age')
        # Admin users must be at least 18
        if role == UserRole.ADMIN and age is not None and age < 18:
            raise ValueError('Admin users must be at least 18 years old')
        return values

# Task model with complex validation
class Task(BaseValidatedModel):
    """Task model with comprehensive validation."""

    id: Optional[int] = Field(None, ge=1)
    title: str = Field(
        ...,
        min_length=1,
        max_length=200,
        description="Task title"
    )
    description: Optional[str] = Field(
        None,
        max_length=2000,
        description="Task description"
    )
    priority: Priority = Field(
        Priority.MEDIUM,
        description="Task priority level"
    )
    status: Status = Field(
        Status.PENDING,
        description="Current task status"
    )
    assignee_id: Optional[int] = Field(
        None,
        ge=1,
        description="ID of assigned user"
    )
    due_date: Optional[date] = Field(
        None,
        description="Task due date"
    )
    tags: List[str] = Field(
        default_factory=list,
        description="Task tags"
    )
    estimated_hours: Optional[Decimal] = Field(
        None,
        ge=Decimal('0.1'),
        le=Decimal('1000.0'),
        decimal_places=1,
        description="Estimated hours to complete"
    )

    @validator('title')
    def validate_title(cls, v):
        """Validate task title."""
        v = v.strip()
        if not v:
            raise ValueError('Title cannot be empty')
        # Check for HTML tags
        if re.search(r'<[^>]+>', v):
            raise ValueError('Title cannot contain HTML tags')
        return v

    @validator('tags')
    def validate_tags(cls, v):
        """Validate task tags."""
        if len(v) > 10:
            raise ValueError('Maximum 10 tags allowed')
        validated_tags = []
        for tag in v:
            if not isinstance(tag, str):
                raise ValueError('All tags must be strings')
            tag = tag.strip().lower()
            if not tag:
                continue
            if len(tag) > 50:
                raise ValueError('Tag length cannot exceed 50 characters')
            if not re.match(r'^[a-zA-Z0-9_-]+$', tag):
                raise ValueError('Tags can only contain letters, numbers, hyphens, and underscores')
            validated_tags.append(tag)
        # Remove duplicates while keeping the original order
        return list(dict.fromkeys(validated_tags))

    @validator('due_date')
    def validate_due_date(cls, v):
        """Validate due date is not in the past."""
        if v and v < date.today():
            raise ValueError('Due date cannot be in the past')
        return v

# Tools with validated models
@server.tool()
async def create_user(user_data: User) -> Dict[str, Any]:
    """Create a new user with validated data."""
    # user_data is already validated by Pydantic.
    # Additional business logic validation can be added here.
    # Simulate user creation
    user_dict = user_data.model_dump()  # `.dict()` is deprecated in Pydantic v2
    user_dict['id'] = 12345  # Simulated ID assignment
    user_dict['created_at'] = datetime.now()
    return {
        "success": True,
        "user": user_dict,
        "message": f"User '{user_data.username}' created successfully"
    }

@server.tool()
async def update_user(
    user_id: int = Field(..., ge=1, description="User ID to update"),
    updates: Dict[str, Any] = Field(..., description="Fields to update")
) -> Dict[str, Any]:
    """Update user with partial validation."""
    # Only these fields may be updated
    allowed_fields = {'username', 'email', 'full_name', 'age', 'role', 'is_active', 'metadata'}
    invalid_fields = set(updates.keys()) - allowed_fields
    if invalid_fields:
        raise ValueError(f"Invalid fields for update: {sorted(invalid_fields)}")
    try:
        # Get existing user (simulated)
        existing_user = User(
            id=user_id,
            username="existing_user",
            email="existing@example.com",
            full_name="Existing User"
        )
        # Merge the updates into the existing data
        updated_data = existing_user.model_dump()
        updated_data.update(updates)
        # Validate the complete updated model
        validated_user = User(**updated_data)
        return {
            "success": True,
            "user": validated_user.model_dump(),
            "updated_fields": list(updates.keys())
        }
    except ValidationError as e:
        raise ValueError(f"Validation failed: {e}") from e

@server.tool()
async def create_task(task_data: Task) -> Dict[str, Any]:
    """Create a task with comprehensive validation."""
    # Additional business validation
    if task_data.assignee_id:
        # Simulate checking if assignee exists
        if task_data.assignee_id not in [1, 2, 3, 4, 5]:  # Mock valid IDs
            raise ValueError(f"Assignee {task_data.assignee_id} not found")
    # Create task
    task_dict = task_data.model_dump()
    task_dict['id'] = 67890  # Simulated ID
    return {
        "success": True,
        "task": task_dict,
        "message": f"Task '{task_data.title}' created successfully"
    }

# ❌ DON'T: Use unvalidated models or skip type checking
@server.tool()
async def bad_create_user(user_data: Dict[str, Any]) -> Dict[str, Any]:
    # No validation, no type safety
    username = user_data.get("username")  # Could be None or any type
    email = user_data.get("email")  # No email format validation
    # Direct database insertion without validation - dangerous!
    return {"id": 123, "username": username, "email": email}
```
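
When a tool builds a model by hand (as `update_user` does), the caught `ValidationError` carries structured detail that is more useful to a client than its string form. The sketch below groups messages by field path using `ValidationError.errors()`. `SignupRequest` and `collect_field_errors` are hypothetical names introduced only for this illustration.

```python
from typing import Dict, List, Optional

from pydantic import BaseModel, Field, ValidationError


class SignupRequest(BaseModel):  # hypothetical model, for illustration only
    username: str = Field(..., min_length=3, max_length=50)
    age: Optional[int] = Field(None, ge=13, le=120)


def collect_field_errors(exc: ValidationError) -> Dict[str, List[str]]:
    """Group error messages by the dotted path of the field that failed."""
    grouped: Dict[str, List[str]] = {}
    for err in exc.errors():
        # `loc` is a tuple such as ("username",) or ("items", 0, "name")
        path = ".".join(str(part) for part in err["loc"]) or "__root__"
        grouped.setdefault(path, []).append(err["msg"])
    return grouped


try:
    SignupRequest(username="ab", age=7)  # both fields violate a constraint
    field_errors: Dict[str, List[str]] = {}
except ValidationError as exc:
    field_errors = collect_field_errors(exc)
```

Returning a mapping like this lets the caller show each message next to the offending field. The exact message wording differs between Pydantic releases, so it is safer to branch on the field path than on the text.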
### Custom Validators and Sanitizers
```python
# ✅ DO: Implement comprehensive custom validators and sanitizers
from pydantic import validator, Field
from typing import Any, Dict, List, Optional, Union
import html
import os
import string
import bleach
import phonenumbers
from urllib.parse import urlparse

class ValidationUtils:
    """Utility class for common validation patterns."""

    @staticmethod
    def sanitize_html(value: str, allowed_tags: Optional[List[str]] = None) -> str:
        """Sanitize HTML content."""
        if allowed_tags is None:
            allowed_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
        # Use bleach to sanitize HTML
        return bleach.clean(value, tags=allowed_tags, strip=True)

    @staticmethod
    def validate_phone_number(value: str, country_code: str = "US") -> str:
        """Validate and format phone number."""
        try:
            parsed = phonenumbers.parse(value, country_code)
        except phonenumbers.NumberParseException:
            raise ValueError("Invalid phone number format")
        if not phonenumbers.is_valid_number(parsed):
            raise ValueError("Invalid phone number")
        return phonenumbers.format_number(
            parsed,
            phonenumbers.PhoneNumberFormat.INTERNATIONAL
        )

    @staticmethod
    def validate_url(value: str, allowed_schemes: Optional[List[str]] = None) -> str:
        """Validate URL format and scheme."""
        if allowed_schemes is None:
            allowed_schemes = ['http', 'https']
        value = value.strip()
        try:
            parsed = urlparse(value)
        except ValueError as e:
            raise ValueError(f"Invalid URL: {e}")
        if not parsed.scheme:
            raise ValueError("URL must include scheme (http/https)")
        # urlparse lowercases the scheme, so this comparison is case-insensitive
        if parsed.scheme not in allowed_schemes:
            raise ValueError(f"URL scheme must be one of: {allowed_schemes}")
        if not parsed.netloc:
            raise ValueError("URL must include domain")
        # Do not lowercase the whole URL: paths and queries are case-sensitive
        return value

    @staticmethod
    def sanitize_filename(value: str) -> str:
        """Sanitize filename for safe filesystem usage."""
        # Keep only characters from a conservative allow-list
        valid_chars = f"-_.() {string.ascii_letters}{string.digits}"
        sanitized = ''.join(c for c in value if c in valid_chars)
        # Remove leading/trailing spaces and dots
        sanitized = sanitized.strip('. ')
        # Limit length while preserving the extension
        if len(sanitized) > 255:
            name, ext = os.path.splitext(sanitized)
            sanitized = name[:255 - len(ext)] + ext
        if not sanitized:
            raise ValueError("Filename cannot be empty after sanitization")
        return sanitized

# Advanced model with custom validators
class ContentItem(BaseValidatedModel):
    """Content item with advanced validation and sanitization."""

    title: str = Field(..., min_length=1, max_length=200)
    content: str = Field(..., min_length=1, max_length=10000)
    url: Optional[str] = Field(None, description="Associated URL")
    phone: Optional[str] = Field(None, description="Contact phone number")
    filename: Optional[str] = Field(None, description="Associated filename")
    # Pydantic v2 uses `max_length` for collections (`max_items` is deprecated)
    tags: List[str] = Field(default_factory=list, max_length=20)
    metadata: Dict[str, Union[str, int, float, bool]] = Field(default_factory=dict)

    @validator('title', pre=True)
    def sanitize_title(cls, v):
        """Sanitize and validate title."""
        if not isinstance(v, str):
            raise TypeError('Title must be a string')
        # HTML escape and strip
        v = html.escape(v.strip())
        if not v:
            raise ValueError('Title cannot be empty')
        # Collapse runs of whitespace
        v = re.sub(r'\s+', ' ', v)
        return v

    @validator('content', pre=True)
    def sanitize_content(cls, v):
        """Sanitize HTML content."""
        if not isinstance(v, str):
            raise TypeError('Content must be a string')
        # Allow basic HTML tags
        allowed_tags = ['p', 'br', 'b', 'i', 'u', 'em', 'strong', 'ul', 'ol', 'li']
        return ValidationUtils.sanitize_html(v, allowed_tags)

    @validator('url', pre=True)
    def validate_url(cls, v):
        """Validate URL if provided."""
        if v is None or v == '':
            return None
        return ValidationUtils.validate_url(v, ['http', 'https', 'ftp'])

    @validator('phone', pre=True)
    def validate_phone(cls, v):
        """Validate phone number if provided."""
        if v is None or v == '':
            return None
        return ValidationUtils.validate_phone_number(v)

    @validator('filename', pre=True)
    def sanitize_filename(cls, v):
        """Sanitize filename if provided."""
        if v is None or v == '':
            return None
        return ValidationUtils.sanitize_filename(v)

    @validator('tags', pre=True)
    def process_tags(cls, v):
        """Process and validate tags."""
        if not v:
            return []
        if isinstance(v, str):
            # Split string by commas
            v = [tag.strip() for tag in v.split(',')]
        if not isinstance(v, list):
            raise TypeError('Tags must be a list or comma-separated string')
        processed_tags = []
        for tag in v:
            if not isinstance(tag, str):
                continue
            tag = tag.strip().lower()
            if not tag:
                continue
            # Validate tag format
            if not re.match(r'^[a-z0-9_-]+$', tag):
                continue  # Skip invalid tags
            if len(tag) > 30:
                tag = tag[:30]  # Truncate long tags
            processed_tags.append(tag)
        # Remove duplicates while keeping the original order
        return list(dict.fromkeys(processed_tags))

    @validator('metadata')
    def validate_metadata(cls, v):
        """Validate metadata structure and types."""
        if not isinstance(v, dict):
            raise TypeError('Metadata must be a dictionary')
        # Validate keys and values; entries that fail a check are dropped
        validated_metadata = {}
        for key, value in v.items():
            # Validate key
            if not isinstance(key, str):
                continue
            key = key.strip()
            if not key or len(key) > 50:
                continue
            # Validate value types
            if isinstance(value, (str, int, float, bool)):
                if isinstance(value, str) and len(value) > 500:
                    value = value[:500]  # Truncate long strings
                validated_metadata[key] = value
        return validated_metadata

# Tools with advanced validation
@server.tool()
async def create_content(content: ContentItem) -> Dict[str, Any]:
    """Create content with comprehensive validation and sanitization."""
    # Additional business validation
    if content.url and content.url.startswith('file://'):
        raise ValueError("File URLs are not allowed")
    # Check for spam patterns
    spam_patterns = ['free money', 'click here', 'urgent']
    content_lower = content.content.lower()
    for pattern in spam_patterns:
        if pattern in content_lower:
            raise ValueError(f"Content appears to be spam (contains: {pattern})")
    # Create content
    content_dict = content.model_dump()  # `.dict()` is deprecated in Pydantic v2
    content_dict['id'] = 98765
    content_dict['created_at'] = datetime.now().isoformat()
    return {
        "success": True,
        "content": content_dict,
        "sanitized_fields": ["title", "content", "filename"],
        "processed_tags": len(content.tags)
    }

# ❌ DON'T: Skip sanitization or use unsafe validation
@server.tool()
async def bad_create_content(data: Dict[str, Any]) -> Dict[str, Any]:
    # No sanitization - XSS vulnerability
    title = data.get("title", "")
    content = data.get("content", "")  # Could contain malicious HTML
    # No validation of URLs - could be malicious
    url = data.get("url")
    # Direct usage without validation
    return {
        "title": title,  # Unsafe
        "content": content,  # Unsafe
        "url": url  # Unsafe
    }
```
### Runtime Type Checking and Validation
```python
# ✅ DO: Implement comprehensive runtime type checking and validation
from typing import get_type_hints, get_origin, get_args, Any, Union
import inspect
from functools import wraps

class TypeValidator:
    """Runtime type validation utility."""

    @staticmethod
    def validate_type(value: Any, expected_type: Any) -> bool:
        """Validate if value matches expected type."""
        # Any accepts every value
        if expected_type is Any:
            return True
        # Handle Union types (including Optional)
        origin = get_origin(expected_type)
        if origin is Union:
            return any(TypeValidator.validate_type(value, arg) for arg in get_args(expected_type))
        # Handle None values: outside a Union only NoneType accepts None
        if value is None:
            return expected_type is type(None)
        # Handle List, Dict, etc.
        if origin is not None:
            if origin is list:
                if not isinstance(value, list):
                    return False
                args = get_args(expected_type)
                if args:
                    return all(TypeValidator.validate_type(item, args[0]) for item in value)
                return True
            elif origin is dict:
                if not isinstance(value, dict):
                    return False
                args = get_args(expected_type)
                if len(args) == 2:
                    key_type, value_type = args
                    return all(
                        TypeValidator.validate_type(k, key_type) and
                        TypeValidator.validate_type(v, value_type)
                        for k, v in value.items()
                    )
                return True
        # Handle basic types
        if isinstance(expected_type, type):
            return isinstance(value, expected_type)
        return False

    @staticmethod
    def coerce_type(value: Any, expected_type: Any) -> Any:
        """Attempt to coerce value to expected type."""
        # Handle None
        if value is None:
            return None
        # Handle Union types
        origin = get_origin(expected_type)
        if origin is Union:
            args = get_args(expected_type)
            # Try each type in order
            for arg in args:
                if arg is type(None):
                    continue  # value is not None here
                try:
                    return TypeValidator.coerce_type(value, arg)
                except (ValueError, TypeError):
                    continue
            raise ValueError(f"Cannot coerce {type(value)} to any of {args}")
        # Handle basic type coercion
        if expected_type is str:
            return str(value)
        elif expected_type is int:
            if isinstance(value, str):
                # int() raises ValueError for non-numeric text and accepts a sign
                return int(value.strip())
            elif isinstance(value, (int, float)):
                return int(value)  # Note: truncates floats
            else:
                raise ValueError(f"Cannot coerce {value} to int")
        elif expected_type is float:
            return float(value)
        elif expected_type is bool:
            if isinstance(value, str):
                return value.lower() in ('true', '1', 'yes', 'on')
            return bool(value)
        return value

def validate_inputs(strict: bool = True, coerce: bool = False):
    """Decorator to validate function inputs at runtime."""
    def decorator(func):
        type_hints = get_type_hints(func)
        sig = inspect.signature(func)

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Bind arguments to parameters
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            # Validate each parameter
            for param_name, value in bound.arguments.items():
                # Skip 'self' and parameters without a type hint
                if param_name == 'self' or param_name not in type_hints:
                    continue
                expected_type = type_hints[param_name]
                # Validate type
                if not TypeValidator.validate_type(value, expected_type):
                    if coerce:
                        try:
                            bound.arguments[param_name] = TypeValidator.coerce_type(value, expected_type)
                            continue
                        except (ValueError, TypeError):
                            pass
                    if strict:
                        raise TypeError(
                            f"Parameter '{param_name}' expected {expected_type}, "
                            f"got {type(value).__name__}: {value}"
                        )
            # Call function with validated/coerced arguments
            return await func(*bound.args, **bound.kwargs)
        return wrapper
    return decorator

# Example usage with runtime validation
@server.tool()
@validate_inputs(strict=True, coerce=True)
async def calculate_score(
    base_score: float,
    multiplier: int = 1,
    bonus_points: Optional[List[float]] = None,
    metadata: Optional[Dict[str, Union[str, int]]] = None
) -> Dict[str, Any]:
    """Calculate score with runtime type validation."""
    if bonus_points is None:
        bonus_points = []
    if metadata is None:
        metadata = {}
    # Calculate final score
    total_bonus = sum(bonus_points)
    final_score = (base_score * multiplier) + total_bonus
    return {
        "base_score": base_score,
        "multiplier": multiplier,
        "bonus_points": bonus_points,
        "total_bonus": total_bonus,
        "final_score": final_score,
        "metadata": metadata
    }

# Validation with custom error messages
class ValidationConfig:
    """Configuration for validation behavior."""

    def __init__(self):
        self.strict_mode = True
        self.allow_coercion = False
        self.max_string_length = 1000
        self.max_list_length = 100
        self.max_dict_size = 50
        self.custom_validators = {}

    def add_validator(self, type_name: str, validator_func):
        """Add custom validator for a type."""
        self.custom_validators[type_name] = validator_func

# Global validation configuration
validation_config = ValidationConfig()

def validate_with_config(func):
    """Decorator that uses global validation configuration."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # Validate according to configuration
        if validation_config.strict_mode:
            # Perform strict validation
            pass
        return await func(*args, **kwargs)
    return wrapper

# ❌ DON'T: Skip runtime validation or use unsafe type assumptions
@server.tool()
async def bad_calculation(data: Any) -> Any:
    # No type validation - could receive anything
    score = data["score"]  # Could be string, None, etc.
    multiplier = data["multiplier"]  # No type checking
    # Unsafe operations without validation
    return score * multiplier  # Could fail at runtime
```
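
The hand-written `TypeValidator` above covers only a few container types. As an alternative worth considering (not something the original section uses), Pydantic v2 ships `TypeAdapter`, which validates and coerces a value against an arbitrary type hint without defining a model. This is a minimal sketch; `bonus_adapter`, `metadata_adapter`, and `is_valid_metadata` are hypothetical names.

```python
from typing import Dict, List, Union

from pydantic import TypeAdapter, ValidationError

# Build adapters once at import time and reuse them: construction compiles
# the validation schema for the type hint.
bonus_adapter = TypeAdapter(List[float])
metadata_adapter = TypeAdapter(Dict[str, Union[str, int]])

# Lax mode (the default) coerces compatible input such as numeric strings.
bonus_points = bonus_adapter.validate_python(["1.5", 2])


def is_valid_metadata(value: object) -> bool:
    """Return True when value conforms to Dict[str, Union[str, int]]."""
    try:
        metadata_adapter.validate_python(value)
        return True
    except ValidationError:
        return False
```

Passing `strict=True` to `validate_python` turns coercion off, which gives roughly the same split as the `strict` and `coerce` flags of `validate_inputs`.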
## Advanced Validation Patterns
```python
# ✅ DO: Implement advanced validation patterns for complex scenarios
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
from abc import ABC, abstractmethod

@runtime_checkable
class Validatable(Protocol):
    """Protocol for objects that can be validated."""

    def validate(self) -> bool:
        """Validate the object."""
        ...

    def get_validation_errors(self) -> List[str]:
        """Get list of validation errors."""
        ...

class BaseValidator(ABC):
    """Base class for custom validators."""

    @abstractmethod
    async def validate(self, value: Any, context: Optional[Dict[str, Any]] = None) -> bool:
        """Validate a value."""
        pass

    @abstractmethod
    def get_error_message(self, value: Any) -> str:
        """Get error message for invalid value."""
        pass

class UniqueUsernameValidator(BaseValidator):
    """Validator to check username uniqueness."""

    def __init__(self, existing_usernames: Optional[set] = None):
        self.existing_usernames = existing_usernames or set()

    async def validate(self, value: Any, context: Optional[Dict[str, Any]] = None) -> bool:
        """Check if username is unique."""
        if not isinstance(value, str):
            return False
        # Simulate database check
        return value.lower() not in self.existing_usernames

    def get_error_message(self, value: Any) -> str:
        """Get error message for duplicate username."""
        return f"Username '{value}' is already taken"

class DatabaseConstraintValidator(BaseValidator):
    """Validator for database constraints."""

    def __init__(self, constraint_type: str, table: str, column: str):
        self.constraint_type = constraint_type
        self.table = table
        self.column = column

    async def validate(self, value: Any, context: Optional[Dict[str, Any]] = None) -> bool:
        """Validate database constraint."""
        if self.constraint_type == "foreign_key":
            # Simulate foreign key check
            valid_ids = {1, 2, 3, 4, 5}  # Mock valid IDs
            return value in valid_ids
        elif self.constraint_type == "unique":
            # Simulate uniqueness check
            existing_values = {"admin", "root", "system"}  # Mock existing values
            return value not in existing_values
        return True

    def get_error_message(self, value: Any) -> str:
        """Get error message for constraint violation."""
        if self.constraint_type == "foreign_key":
            return f"Referenced {self.table}.{self.column} does not exist: {value}"
        elif self.constraint_type == "unique":
            return f"Value already exists in {self.table}.{self.column}: {value}"
        return f"Constraint violation: {value}"

# Validation context for complex scenarios
@dataclass
class ValidationContext:
    """Context information for validation."""
    user_id: Optional[int] = None
    request_ip: Optional[str] = None
    user_role: Optional[str] = None
    session_data: Optional[Dict[str, Any]] = None
    operation_type: Optional[str] = None

class ContextAwareValidator:
    """Validator that considers context information."""

    def __init__(self):
        self.validators = {}

    def register_validator(self, field_name: str, validator: BaseValidator):
        """Register a validator for a field."""
        if field_name not in self.validators:
            self.validators[field_name] = []
        self.validators[field_name].append(validator)

    async def validate_with_context(
        self,
        data: Dict[str, Any],
        context: ValidationContext
    ) -> Dict[str, List[str]]:
        """Validate data with context information."""
        errors = {}
        for field_name, value in data.items():
            field_errors = []
            # Get validators for this field
            field_validators = self.validators.get(field_name, [])
            for validator in field_validators:
                try:
                    is_valid = await validator.validate(
                        value,
                        context.__dict__
                    )
                    if not is_valid:
                        field_errors.append(validator.get_error_message(value))
                except Exception as e:
                    field_errors.append(f"Validation error: {e}")
            if field_errors:
                errors[field_name] = field_errors
        return errors

# Setup context-aware validation
context_validator = ContextAwareValidator()
context_validator.register_validator(
    "username",
    UniqueUsernameValidator({"admin", "root", "system"})
)
context_validator.register_validator(
    "assignee_id",
    DatabaseConstraintValidator("foreign_key", "users", "id")
)

# Advanced tool with context-aware validation
@server.tool()
async def create_user_advanced(
    user_data: Dict[str, Any],
    context_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Create user with advanced context-aware validation."""
    # Create validation context
    context = ValidationContext(
        user_id=context_data.get("user_id") if context_data else None,
        request_ip=context_data.get("request_ip") if context_data else None,
        user_role=context_data.get("user_role") if context_data else None,
        operation_type="create_user"
    )
    # Perform context-aware validation
    validation_errors = await context_validator.validate_with_context(
        user_data,
        context
    )
    if validation_errors:
        # pydantic.ValidationError cannot be constructed from a plain message,
        # so raise ValueError for errors collected outside Pydantic
        raise ValueError(
            f"Validation failed: {validation_errors}"
        )
    # Additional role-based validation
    if context.user_role != "admin":
        # Non-admin users cannot create admin users
        if user_data.get("role") == "admin":
            raise ValueError("Insufficient permissions to create admin user")
    # Validate with Pydantic model
    try:
        user = User(**user_data)
    except ValidationError as e:
        raise ValueError(f"User data validation failed: {e}") from e
    # Create user
    user_dict = user.model_dump()  # `.dict()` is deprecated in Pydantic v2
    user_dict['id'] = 12345
    user_dict['created_by'] = context.user_id
    user_dict['created_at'] = datetime.now()
    return {
        "success": True,
        "user": user_dict,
        "validation_context": {
            "performed_by": context.user_id,
            "operation": context.operation_type
        }
    }

# ❌ DON'T: Use simple validation without context or proper error handling
@server.tool()
async def bad_create_user_simple(user_data: Dict[str, Any]) -> Dict[str, Any]:
    # No context validation
    # No consideration of user permissions
    # No comprehensive error handling
    if "username" not in user_data:
        raise ValueError("Username required")  # Too simple
    return {"id": 123, "username": user_data["username"]}
```
## Best Practices Summary
### Pydantic Integration
- Use comprehensive BaseModel configurations
- Implement custom validators for business logic
- Use type annotations consistently
- Handle validation errors gracefully
### Type Safety
- Implement runtime type checking for critical paths
- Use type guards and protocols where appropriate
- Provide meaningful error messages
- Consider type coercion for user input
### Validation Patterns
- Separate validation logic from business logic
- Use context-aware validation for complex scenarios
- Implement custom validators for domain-specific rules
- Provide comprehensive error reporting
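
One way to keep validation separate from business logic, sketched here with plain functions: each rule takes the input and returns an error message or `None`, and the handler runs every rule before doing any work, so the caller gets all failures at once. The rule names, `run_rules`, and `create_user_record` are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional

# A rule inspects the input and returns an error message, or None if it passes.
Rule = Callable[[Dict[str, Any]], Optional[str]]


def require_username(data: Dict[str, Any]) -> Optional[str]:
    return None if data.get("username") else "username is required"


def admin_needs_adult(data: Dict[str, Any]) -> Optional[str]:
    age = data.get("age")
    if data.get("role") == "admin" and age is not None and age < 18:
        return "admin users must be at least 18"
    return None


USER_RULES: List[Rule] = [require_username, admin_needs_adult]


def run_rules(data: Dict[str, Any], rules: List[Rule]) -> List[str]:
    """Run every rule and collect all failures instead of stopping at the first."""
    errors: List[str] = []
    for rule in rules:
        message = rule(data)
        if message is not None:
            errors.append(message)
    return errors


def create_user_record(data: Dict[str, Any]) -> Dict[str, Any]:
    """Business logic: runs only after every rule has passed."""
    errors = run_rules(data, USER_RULES)
    if errors:
        raise ValueError("; ".join(errors))
    return {"id": 1, **data}
```

Because the rules are ordinary functions, they can be unit-tested without a server and reused across tools.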
### Data Sanitization
- Sanitize all user input before processing
- Use established libraries for common patterns
- Implement custom sanitizers for domain-specific needs
- Always validate after sanitization
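
The last point matters because sanitization can change the value being checked: HTML escaping, for instance, lengthens a string. A minimal standard-library sketch follows, with `clean_title` as a hypothetical helper that mirrors the title handling in `ContentItem`.

```python
import html
import re


def clean_title(raw: str, max_length: int = 200) -> str:
    """Sanitize a title, then validate the sanitized result."""
    # 1. Sanitize: trim, collapse whitespace runs, escape HTML metacharacters.
    collapsed = re.sub(r"\s+", " ", raw.strip())
    sanitized = html.escape(collapsed)
    # 2. Validate the sanitized value, since escaping can lengthen the string.
    if not sanitized:
        raise ValueError("title cannot be empty")
    if len(sanitized) > max_length:
        raise ValueError(f"title exceeds {max_length} characters after sanitization")
    return sanitized
```

A 60-character input made only of `<` passes a length check on the raw string but fails here, because each `<` becomes the four-character entity `&lt;`.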
## References
- [Pydantic Documentation](https://docs.pydantic.dev)
- [FastMCP Validation Guide](https://gofastmcp.com/servers/validation)
- [Python Type Hints](https://docs.python.org/3/library/typing.html)
- [Data Validation Best Practices](https://owasp.org/www-project-cheat-sheets/cheatsheets/Input_Validation_Cheat_Sheet.html)