---
description: FastMCP Python v2.0 Deployment & Production
globs:
alwaysApply: false
---
> You are an expert in FastMCP Python v2.0 Deployment & Production.
# FastMCP Python Deployment & Production
## Overview
FastMCP Python v2.0 supports multiple deployment patterns for production environments, covering transport selection (stdio, streamable HTTP, SSE), ASGI deployment, authentication configuration, environment management, health monitoring, and performance optimization.
## Transport Selection & Configuration
### **Transport Types Overview**
- **stdio**: Direct process communication (CLI, subprocess)
- **streamable-http**: HTTP-based with streaming support
- **sse**: Server-Sent Events for real-time communication
```python
# ✅ DO: Choose transport based on deployment scenario
from fastmcp import FastMCP

app = FastMCP(name="production-server")

# environment/use_case are illustrative deployment flags; the awaits below
# assume an async entry point

# Development: stdio for testing
if environment == "development":
    await app.run_async(transport="stdio")

# Production: HTTP for web deployment
elif environment == "production":
    await app.run_async(
        transport="streamable-http",
        host="0.0.0.0",
        port=8000
    )

# Real-time: SSE for event-driven applications
elif use_case == "real-time":
    await app.run_async(
        transport="sse",
        host="0.0.0.0",
        port=8000
    )
```
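For simple scripts and stdio deployments, FastMCP also provides a synchronous `run()` entry point that manages the event loop itself. A minimal sketch:

```python
# Minimal sketch: synchronous entry point for simple deployments.
# run() starts and manages the event loop, so no asyncio boilerplate is needed.
from fastmcp import FastMCP

app = FastMCP(name="simple-server")

if __name__ == "__main__":
    app.run(transport="stdio")
```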
### **Environment-Based Transport Configuration**
```python
# ✅ DO: Configure transport via environment variables
from typing import Literal

from pydantic_settings import SettingsConfigDict

from fastmcp import FastMCP
from fastmcp.settings import ServerSettings

class ProductionSettings(ServerSettings):
    model_config = SettingsConfigDict(env_prefix="FASTMCP_")

    # Transport configuration
    transport: Literal["stdio", "streamable-http", "sse"] = "streamable-http"
    host: str = "127.0.0.1"
    port: int = 8000

    # Path configuration
    sse_path: str = "/sse"
    message_path: str = "/messages/"
    streamable_http_path: str = "/mcp"

    # Production settings
    debug: bool = False
    log_level: str = "INFO"
    workers: int = 1

async def run_server():
    """Run server with environment configuration."""
    settings = ProductionSettings()
    app = FastMCP(name="production-app")
    await app.run_async(
        transport=settings.transport,
        host=settings.host,
        port=settings.port,
        log_level=settings.log_level
    )
```
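Because of the `FASTMCP_` prefix, deployments can switch transports without code changes. A sketch of how the mapping works (the in-code `os.environ` assignments stand in for variables normally set by the deployment environment):

```python
# Sketch: FASTMCP_-prefixed environment variables map onto the fields above,
# e.g. `FASTMCP_TRANSPORT=sse FASTMCP_PORT=9000 python server.py`
import os

os.environ["FASTMCP_TRANSPORT"] = "sse"  # normally set by the deployment, not in code
os.environ["FASTMCP_PORT"] = "9000"

settings = ProductionSettings()
assert settings.transport == "sse"
assert settings.port == 9000
```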
## ASGI Deployment Patterns
### **Uvicorn Production Deployment**
```python
# ✅ DO: Create ASGI application for production
from fastmcp import FastMCP
import uvicorn

app = FastMCP(name="production-server")

@app.tool()
async def health_check() -> str:
    """Health check endpoint."""
    return "OK"

# Create ASGI app
asgi_app = app.streamable_http_app(path="/mcp")

# Production uvicorn configuration
if __name__ == "__main__":
    uvicorn.run(
        "main:asgi_app",
        host="0.0.0.0",
        port=8000,
        workers=4,
        log_level="info",
        access_log=True,
        loop="asyncio"
    )
```
### **Gunicorn + Uvicorn Deployment**
```python
# gunicorn_config.py
# ✅ DO: Configure Gunicorn for production
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 10000
max_requests_jitter = 1000
preload_app = True
keepalive = 5
# Logging
accesslog = "/var/log/fastmcp/access.log"
errorlog = "/var/log/fastmcp/error.log"
loglevel = "info"
# Security
limit_request_line = 4094
limit_request_fields = 100
limit_request_field_size = 8190

# main.py
from fastmcp import FastMCP

app = FastMCP(name="gunicorn-server")
asgi_app = app.streamable_http_app()

# Run with: gunicorn -c gunicorn_config.py main:asgi_app
```
### **Docker Deployment**
```dockerfile
# ✅ DO: Create optimized Docker image
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user
RUN useradd --create-home --shell /bin/bash app \
&& chown -R app:app /app
USER app
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
EXPOSE 8000
CMD ["uvicorn", "main:asgi_app", "--host", "0.0.0.0", "--port", "8000"]
```
### **Kubernetes Deployment**
```yaml
# ✅ DO: Configure Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastmcp-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastmcp-server
  template:
    metadata:
      labels:
        app: fastmcp-server
    spec:
      containers:
        - name: fastmcp-server
          image: your-registry/fastmcp-server:latest
          ports:
            - containerPort: 8000
          env:
            - name: FASTMCP_LOG_LEVEL
              value: "INFO"
            - name: FASTMCP_HOST
              value: "0.0.0.0"
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: fastmcp-service
spec:
  selector:
    app: fastmcp-server
  ports:
    - port: 80
      targetPort: 8000
  type: ClusterIP
```
## Authentication & Security
### **OAuth Provider Configuration**
```python
# ✅ DO: Configure OAuth authentication
from fastmcp import FastMCP
from fastmcp.server.auth import OAuthProvider

# Create OAuth provider
oauth_provider = OAuthProvider(
    issuer_url="https://auth.example.com",
    service_documentation_url="https://docs.example.com/oauth",
    required_scopes=["mcp:read", "mcp:write"]
)

# Create server with authentication
app = FastMCP(
    name="secure-server",
    auth=oauth_provider
)

@app.tool()
async def protected_tool(data: str) -> str:
    """Tool that requires authentication."""
    return f"Processed: {data}"

# Run with authentication
asgi_app = app.streamable_http_app()
```
### **Custom Authentication**
```python
# ✅ DO: Implement custom authentication
from typing import Dict

from fastmcp import FastMCP
from fastmcp.server.auth import OAuthProvider

class CustomOAuthProvider(OAuthProvider):
    def __init__(self, api_key_header: str = "X-API-Key"):
        super().__init__(
            issuer_url="https://internal-auth.company.com",
            service_documentation_url="https://docs.company.com/auth"
        )
        self.api_key_header = api_key_header
        self.valid_keys = self._load_valid_keys()

    def _load_valid_keys(self) -> set[str]:
        """Load valid API keys from environment or database."""
        # Implementation
        return {"key1", "key2", "key3"}

    async def authenticate_request(self, headers: Dict[str, str]) -> bool:
        """Custom authentication logic."""
        api_key = headers.get(self.api_key_header)
        return api_key in self.valid_keys

# Use custom auth
auth_provider = CustomOAuthProvider()
app = FastMCP(name="custom-auth-server", auth=auth_provider)
```
### **Environment-Based Security Configuration**
```python
# ✅ DO: Configure security via environment
from pydantic_settings import BaseSettings, SettingsConfigDict

from fastmcp import FastMCP
from fastmcp.server.auth import OAuthProvider

class SecuritySettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="SECURITY_")

    # OAuth configuration
    oauth_issuer_url: str | None = None
    oauth_client_id: str | None = None
    oauth_client_secret: str | None = None
    required_scopes: list[str] = ["mcp:read"]

    # API key configuration
    api_key_header: str = "X-API-Key"
    valid_api_keys: list[str] = []

    # Security settings
    enable_cors: bool = True
    cors_origins: list[str] = ["*"]
    enable_rate_limiting: bool = True
    rate_limit_requests: int = 100
    rate_limit_window: int = 60  # seconds

def create_secure_app(security_settings: SecuritySettings) -> FastMCP:
    """Create application with security configuration."""
    auth_provider = None
    if security_settings.oauth_issuer_url:
        auth_provider = OAuthProvider(
            issuer_url=security_settings.oauth_issuer_url,
            required_scopes=security_settings.required_scopes
        )
    app = FastMCP(
        name="secure-production-app",
        auth=auth_provider
    )
    return app
```
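A minimal sketch of wiring these settings into an ASGI entry point (assumes the `SECURITY_`-prefixed variables are set in the environment):

```python
# Sketch: build the secured app from SECURITY_-prefixed environment variables,
# e.g. SECURITY_OAUTH_ISSUER_URL=https://auth.example.com
security_settings = SecuritySettings()
app = create_secure_app(security_settings)
asgi_app = app.streamable_http_app()
```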
## Environment Variable Management
### **Comprehensive Environment Configuration**
```python
# ✅ DO: Use comprehensive environment configuration
from typing import Literal

from pydantic_settings import BaseSettings, SettingsConfigDict

class AppSettings(BaseSettings):
    """Application settings with environment variable support."""
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        env_nested_delimiter="__",
        case_sensitive=False
    )

    # Application settings
    app_name: str = "FastMCP Production Server"
    app_version: str = "1.0.0"
    debug: bool = False

    # Server settings
    host: str = "127.0.0.1"
    port: int = 8000
    workers: int = 1

    # Transport settings
    transport: Literal["stdio", "streamable-http", "sse"] = "streamable-http"
    path_prefix: str = "/mcp"

    # Database settings
    database_url: str | None = None
    database_pool_size: int = 5
    database_max_overflow: int = 10

    # Redis settings
    redis_url: str | None = None
    redis_timeout: int = 30

    # Logging settings
    log_level: str = "INFO"
    log_format: str = "json"
    log_file: str | None = None

    # Security settings
    secret_key: str | None = None
    cors_origins: list[str] = []
    trusted_hosts: list[str] = []

    # External service settings
    external_api_url: str | None = None
    external_api_key: str | None = None
    external_api_timeout: int = 30

    # Feature flags
    enable_metrics: bool = True
    enable_tracing: bool = False
    enable_caching: bool = True

# Environment file (.env)
"""
APP_NAME=My Production FastMCP Server
APP_VERSION=2.1.0
DEBUG=false
HOST=0.0.0.0
PORT=8000
WORKERS=4
TRANSPORT=streamable-http
PATH_PREFIX=/api/mcp
DATABASE_URL=postgresql://user:pass@localhost/mydb
DATABASE_POOL_SIZE=10
REDIS_URL=redis://localhost:6379/0
LOG_LEVEL=INFO
LOG_FORMAT=json
LOG_FILE=/var/log/fastmcp/app.log
SECRET_KEY=your-secret-key-here
CORS_ORIGINS=["https://app.example.com", "https://admin.example.com"]
EXTERNAL_API_URL=https://api.external.com
EXTERNAL_API_KEY=your-api-key
"""
```
### **Environment Validation**
```python
# ✅ DO: Validate environment configuration
import logging

from pydantic import ValidationError, field_validator
from pydantic_settings import BaseSettings

logger = logging.getLogger(__name__)

class ProductionSettings(BaseSettings):
    database_url: str
    secret_key: str
    external_api_key: str

    @field_validator("database_url")
    @classmethod
    def validate_database_url(cls, v: str) -> str:
        """Validate database URL format."""
        if not v.startswith(("postgresql://", "mysql://", "sqlite://")):
            raise ValueError("Invalid database URL format")
        return v

    @field_validator("secret_key")
    @classmethod
    def validate_secret_key(cls, v: str) -> str:
        """Validate secret key strength."""
        if len(v) < 32:
            raise ValueError("Secret key must be at least 32 characters")
        return v

def load_validated_settings() -> ProductionSettings:
    """Load and validate production settings."""
    try:
        return ProductionSettings()
    except ValidationError as e:
        logger.error(f"Configuration validation failed: {e}")
        raise SystemExit(1)
```
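A minimal startup sketch that fails fast on invalid configuration before the server ever binds a port:

```python
# Sketch: validate configuration first, then start the server.
import asyncio

from fastmcp import FastMCP

async def main() -> None:
    load_validated_settings()  # raises SystemExit on invalid configuration
    app = FastMCP(name="validated-server")
    await app.run_async(transport="streamable-http", host="0.0.0.0", port=8000)

if __name__ == "__main__":
    asyncio.run(main())
```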
## Health Checks & Monitoring
### **Health Check Implementation**
```python
# ✅ DO: Implement comprehensive health checks
import asyncio
from datetime import datetime

import asyncpg
import redis.asyncio as redis

from fastmcp import FastMCP

app = FastMCP(name="monitored-server")

class HealthChecker:
    def __init__(self, db_pool: asyncpg.Pool, redis_client: redis.Redis):
        self.db_pool = db_pool
        self.redis_client = redis_client
        self.start_time = datetime.utcnow()

    async def check_database(self) -> dict:
        """Check database connectivity."""
        try:
            async with self.db_pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
            return {"status": "healthy", "response_time_ms": 0}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    async def check_redis(self) -> dict:
        """Check Redis connectivity."""
        try:
            await self.redis_client.ping()
            return {"status": "healthy"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

    async def check_external_apis(self) -> dict:
        """Check external API dependencies."""
        # Implementation for external API health checks
        return {"status": "healthy"}

# db_pool and redis_client are assumed to be created during application startup
health_checker = HealthChecker(db_pool, redis_client)

@app.tool()
async def health() -> dict:
    """Comprehensive health check."""
    checks = await asyncio.gather(
        health_checker.check_database(),
        health_checker.check_redis(),
        health_checker.check_external_apis(),
        return_exceptions=True
    )
    # return_exceptions=True can yield exception objects; normalize them
    # into unhealthy results before inspecting statuses
    checks = [
        c if isinstance(c, dict) else {"status": "unhealthy", "error": str(c)}
        for c in checks
    ]
    return {
        "status": "healthy" if all(
            check.get("status") == "healthy" for check in checks
        ) else "unhealthy",
        "timestamp": datetime.utcnow().isoformat(),
        "uptime_seconds": (datetime.utcnow() - health_checker.start_time).total_seconds(),
        "checks": {
            "database": checks[0],
            "redis": checks[1],
            "external_apis": checks[2]
        }
    }

@app.tool()
async def readiness() -> dict:
    """Readiness probe for Kubernetes."""
    # Check if application is ready to serve requests
    return {"status": "ready", "timestamp": datetime.utcnow().isoformat()}
```
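Note that the Docker `HEALTHCHECK` and Kubernetes probes above expect a plain HTTP `/health` path, while `health` and `readiness` here are MCP tools. A sketch of bridging the two, assuming the installed FastMCP version supports registering custom HTTP routes via `custom_route`:

```python
# Sketch: expose a plain HTTP /health endpoint for Docker/Kubernetes probes.
# Assumes this FastMCP version supports the custom_route decorator.
from starlette.requests import Request
from starlette.responses import JSONResponse

@app.custom_route("/health", methods=["GET"])
async def http_health(request: Request) -> JSONResponse:
    db = await health_checker.check_database()
    cache = await health_checker.check_redis()
    healthy = db.get("status") == "healthy" and cache.get("status") == "healthy"
    return JSONResponse(
        {"status": "healthy" if healthy else "unhealthy"},
        status_code=200 if healthy else 503,
    )
```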
### **Metrics Collection**
```python
# ✅ DO: Implement metrics collection
import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Metrics
REQUEST_COUNT = Counter(
    'fastmcp_requests_total',
    'Total number of requests',
    ['method', 'endpoint', 'status']
)
REQUEST_DURATION = Histogram(
    'fastmcp_request_duration_seconds',
    'Request duration in seconds',
    ['method', 'endpoint']
)
ACTIVE_CONNECTIONS = Gauge(
    'fastmcp_active_connections',
    'Number of active connections'
)

class MetricsMiddleware:
    def __init__(self, app):
        # app is the wrapped ASGI application (e.g. app.streamable_http_app())
        self.app = app
        self._setup_metrics()

    def _setup_metrics(self):
        """Setup metrics collection."""
        # Start Prometheus metrics server on a separate port
        start_http_server(9090)

    async def __call__(self, scope, receive, send):
        """ASGI middleware that records request metrics."""
        start_time = time.time()
        try:
            ACTIVE_CONNECTIONS.inc()
            await self.app(scope, receive, send)
            # Record successful request
            duration = time.time() - start_time
            REQUEST_DURATION.labels(
                method=scope.get("method", "unknown"),
                endpoint=scope.get("path", "unknown")
            ).observe(duration)
            REQUEST_COUNT.labels(
                method=scope.get("method", "unknown"),
                endpoint=scope.get("path", "unknown"),
                status="success"
            ).inc()
        except Exception:
            # Record failed request
            REQUEST_COUNT.labels(
                method=scope.get("method", "unknown"),
                endpoint=scope.get("path", "unknown"),
                status="error"
            ).inc()
            raise
        finally:
            ACTIVE_CONNECTIONS.dec()
```
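A sketch of wiring the middleware around the ASGI app (names carried over from the earlier examples):

```python
# Sketch: wrap the FastMCP ASGI app with the metrics middleware,
# then serve it with e.g. `uvicorn main:asgi_app`.
from fastmcp import FastMCP

app = FastMCP(name="metered-server")
asgi_app = MetricsMiddleware(app.streamable_http_app())
```

Note that `start_http_server(9090)` binds one metrics port per process; with multiple Gunicorn/uvicorn workers, use prometheus_client's multiprocess mode instead.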
## Performance Optimization
### **Connection Pooling**
```python
# ✅ DO: Implement proper connection pooling
from contextlib import asynccontextmanager

import asyncpg
import redis.asyncio as redis

from fastmcp import FastMCP

class ConnectionManager:
    def __init__(self, settings: AppSettings):
        self.settings = settings
        self.db_pool: asyncpg.Pool | None = None
        self.redis_client: redis.Redis | None = None

    async def startup(self):
        """Initialize connections on startup."""
        # Database pool
        if self.settings.database_url:
            self.db_pool = await asyncpg.create_pool(
                self.settings.database_url,
                min_size=self.settings.database_pool_size,
                max_size=self.settings.database_pool_size + self.settings.database_max_overflow,
                command_timeout=30,
                max_queries=50000,
                max_inactive_connection_lifetime=300
            )
        # Redis client
        if self.settings.redis_url:
            self.redis_client = redis.from_url(
                self.settings.redis_url,
                socket_timeout=self.settings.redis_timeout,
                socket_connect_timeout=5,
                health_check_interval=30
            )

    async def shutdown(self):
        """Clean up connections on shutdown."""
        if self.db_pool:
            await self.db_pool.close()
        if self.redis_client:
            await self.redis_client.close()

    @asynccontextmanager
    async def get_db_connection(self):
        """Get database connection from pool."""
        async with self.db_pool.acquire() as conn:
            yield conn

@asynccontextmanager
async def lifespan(app: FastMCP):
    """Application lifespan management."""
    # settings: AppSettings instance from the environment configuration section
    connection_manager = ConnectionManager(settings)
    await connection_manager.startup()
    # Make connection manager available to the app
    app.state.connections = connection_manager
    yield
    await connection_manager.shutdown()

app = FastMCP(name="optimized-server", lifespan=lifespan)
```
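A sketch of using the pooled connection inside a tool (the `users` table is illustrative; assumes the lifespan handler above has populated `app.state`):

```python
# Sketch: query through the shared pool from a tool.
# Assumes app.state.connections was set by the lifespan handler above.
@app.tool()
async def count_users() -> int:
    """Count rows in a hypothetical users table via the shared pool."""
    manager = app.state.connections
    async with manager.get_db_connection() as conn:
        return await conn.fetchval("SELECT COUNT(*) FROM users")
```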
### **Caching Strategy**
```python
# ✅ DO: Implement intelligent caching
import hashlib
import json
from functools import wraps
from typing import Any

import redis.asyncio as redis

class CacheManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = 300  # 5 minutes

    def _generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate cache key from function arguments."""
        key_data = f"{prefix}:{args}:{sorted(kwargs.items())}"
        return hashlib.md5(key_data.encode()).hexdigest()

    async def get(self, key: str) -> Any:
        """Get value from cache."""
        try:
            value = await self.redis.get(key)
            return json.loads(value) if value else None
        except Exception:
            return None

    async def set(self, key: str, value: Any, ttl: int | None = None) -> None:
        """Set value in cache."""
        try:
            ttl = ttl or self.default_ttl
            await self.redis.setex(key, ttl, json.dumps(value, default=str))
        except Exception:
            pass  # Fail silently for cache errors

def cached(ttl: int = 300, prefix: str = "fastmcp"):
    """Decorator for caching function results."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Assumes a CacheManager was attached to app.state during startup
            cache = app.state.cache_manager
            cache_key = cache._generate_cache_key(prefix, func.__name__, *args, **kwargs)
            # Try to get from cache
            cached_result = await cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            # Execute function and cache result
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator

@app.tool()
@cached(ttl=600, prefix="weather")
async def get_weather_data(location: str) -> dict:
    """Get weather data with caching."""
    # Expensive API call
    return {"location": location, "temperature": 72}
```
### **Async Optimization**
```python
# ✅ DO: Optimize async operations
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

class AsyncOptimizer:
    def __init__(self, max_workers: int = 10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def run_in_thread(self, func, *args, **kwargs):
        """Run CPU-bound function in thread pool."""
        loop = asyncio.get_running_loop()
        # run_in_executor does not accept keyword arguments, so bind them first
        return await loop.run_in_executor(self.executor, partial(func, *args, **kwargs))

    async def gather_with_semaphore(self, semaphore: asyncio.Semaphore, tasks: list):
        """Run tasks with concurrency limiting."""
        async def limited_task(task):
            async with semaphore:
                return await task
        return await asyncio.gather(*[limited_task(task) for task in tasks])

optimizer = AsyncOptimizer()

@app.tool()
async def process_bulk_data(items: list[dict]) -> list[dict]:
    """Process multiple items concurrently with limits."""
    # Limit concurrent operations
    semaphore = asyncio.Semaphore(5)

    async def process_item(item):
        # Simulate processing
        await asyncio.sleep(0.1)
        return {"processed": item}

    tasks = [process_item(item) for item in items]
    return await optimizer.gather_with_semaphore(semaphore, tasks)
```
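A sketch of offloading a CPU-bound call from a tool (`hash_password` is a hypothetical blocking function):

```python
# Sketch: offload a blocking, CPU-bound call so the event loop stays responsive.
# hash_password is hypothetical; any synchronous function works the same way.
@app.tool()
async def secure_hash(data: str) -> str:
    """Hash data in the thread pool instead of blocking the event loop."""
    return await optimizer.run_in_thread(hash_password, data, rounds=12)
```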
## Logging & Observability
### **Structured Logging**
```python
# ✅ DO: Implement structured logging
import logging.config

import structlog

# Logging configuration (settings: AppSettings from the environment section)
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "()": structlog.stdlib.ProcessorFormatter,
            "processor": structlog.dev.ConsoleRenderer() if settings.debug
            else structlog.processors.JSONRenderer(),
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
        },
        "file": {
            "class": "logging.handlers.RotatingFileHandler",
            "filename": "/var/log/fastmcp/app.log",
            "maxBytes": 10485760,  # 10MB
            "backupCount": 5,
            "formatter": "json",
        },
    },
    "loggers": {
        "": {
            "handlers": ["console", "file"],
            "level": settings.log_level,
        },
    },
}
logging.config.dictConfig(LOGGING_CONFIG)

# Configure structlog
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

@app.tool()
async def logged_operation(data: str) -> str:
    """Operation with structured logging."""
    logger.info("Starting operation", operation="logged_operation", input_size=len(data))
    try:
        result = f"Processed: {data}"
        logger.info("Operation completed", operation="logged_operation", success=True)
        return result
    except Exception as e:
        logger.error("Operation failed", operation="logged_operation", error=str(e))
        raise
```
## Deployment Checklist
### **Pre-Deployment Validation**
```python
# ✅ DO: Create deployment validation script
import sys
from pathlib import Path

class DeploymentValidator:
    def __init__(self, settings: AppSettings):
        self.settings = settings
        self.errors: list[str] = []

    def validate_environment(self):
        """Validate environment configuration."""
        if not self.settings.secret_key:
            self.errors.append("SECRET_KEY not configured")
        if self.settings.database_url and not self.settings.database_url.startswith(("postgresql://", "mysql://")):
            self.errors.append("Invalid DATABASE_URL format")
        if not Path("/var/log/fastmcp").exists():
            self.errors.append("Log directory does not exist")

    def validate_dependencies(self):
        """Validate external dependencies."""
        # Check database connectivity
        # Check Redis connectivity
        # Check external API endpoints
        pass

    def validate_security(self):
        """Validate security configuration."""
        if self.settings.debug and "production" in self.settings.app_name.lower():
            self.errors.append("DEBUG mode enabled in production")
        if not self.settings.cors_origins and self.settings.enable_cors:
            self.errors.append("CORS enabled but no origins configured")

    def run_validation(self) -> bool:
        """Run all validations."""
        self.validate_environment()
        self.validate_dependencies()
        self.validate_security()
        if self.errors:
            print("Deployment validation failed:")
            for error in self.errors:
                print(f"  - {error}")
            return False
        print("Deployment validation passed")
        return True

if __name__ == "__main__":
    # settings: AppSettings instance from the environment configuration section,
    # extended with the security flags referenced above
    validator = DeploymentValidator(settings)
    if not validator.run_validation():
        sys.exit(1)
```
## Best Practices Summary
1. **Transport Selection**: Choose appropriate transport based on deployment scenario
2. **Environment Configuration**: Use environment variables for all configuration
3. **Security**: Implement proper authentication and security measures
4. **Health Checks**: Add comprehensive health checks and monitoring
5. **Performance**: Implement connection pooling, caching, and async optimization
6. **Logging**: Use structured logging for better observability
7. **Validation**: Validate configuration and dependencies before deployment
8. **Scaling**: Design for horizontal scaling from the start
## References
- [Deployment Documentation](mdc:fastmcp_py/docs/deployment/running-server.mdx)
- [ASGI Deployment](mdc:fastmcp_py/docs/deployment/asgi.mdx)
- [Authentication Documentation](mdc:fastmcp_py/docs/deployment/authentication.mdx)
- [CLI Documentation](mdc:fastmcp_py/docs/deployment/cli.mdx)