---
description: FastMCP Python v2.0 context
globs:
alwaysApply: false
---
> You are an expert in FastMCP Python v2.0 context management, dependency injection, session handling, and MCP protocol capabilities. You focus on creating robust, stateful applications with proper context isolation, lifecycle management, and capability negotiation.
## FastMCP Context Architecture Flow
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Request │ │ Context │ │ Session │
│ Handler │───▶│ Creation │───▶│ Management │
│ │ │ │ │ │
│ - Tool call │ │ - Dependency │ │ - State │
│ - Resource req │ │ injection │ │ - Capabilities │
│ - Prompt call │ │ - Logging setup │ │ - Lifecycle │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Context │ │ Progress │ │ Cleanup │
│ Execution │ │ Reporting │ │ & Resource │
│ & Isolation │ │ & Logging │ │ Management │
└─────────────────┘ └──────────────────┘ └─────────────────┘
```
## Project Structure
```
fastmcp_context_project/
├── src/
│ ├── my_mcp_server/
│ │ ├── __init__.py # Package initialization
│ │ ├── server.py # Main server with context setup
│ │ └── lifecycle.py # Server lifecycle management
│ ├── context/
│ │ ├── __init__.py # Context exports
│ │ ├── providers.py # Context providers
│ │ ├── session.py # Session management
│ │ ├── capabilities.py # MCP capabilities
│ │ └── injection.py # Dependency injection
│ ├── services/
│ │ ├── __init__.py # Service exports
│ │ ├── database.py # Database services
│ │ ├── external_api.py # External API services
│ │ └── cache.py # Cache services
│ ├── middleware/
│ │ ├── __init__.py # Middleware exports
│ │ ├── logging.py # Logging middleware
│ │ ├── auth.py # Authentication middleware
│ │ └── monitoring.py # Monitoring middleware
│ └── utils/
│ ├── __init__.py # Utility exports
│ ├── state.py # State management
│ └── isolation.py # Context isolation
├── tests/
│ ├── test_context/
│ │ ├── test_injection.py
│ │ ├── test_session.py
│ │ └── test_capabilities.py
│ └── test_integration.py
└── config/
├── context.yaml # Context configuration
└── capabilities.yaml # MCP capabilities config
```
## Core Implementation Patterns
### Context Injection and Dependency Management
```python
# ✅ DO: Proper context injection with comprehensive dependency management
from fastmcp import FastMCP
from fastmcp.server import Context
from fastmcp.server.dependencies import Depends
from typing import Dict, Any, Optional, Protocol
from contextlib import asynccontextmanager
import asyncio
from datetime import datetime
import logging
server = FastMCP(name="context-server")
# Define service interfaces
class DatabaseService(Protocol):
async def query(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
...
async def execute(self, sql: str, params: tuple = ()) -> int:
...
class CacheService(Protocol):
async def get(self, key: str) -> Optional[Any]:
...
async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
...
class NotificationService(Protocol):
async def send(self, recipient: str, message: str) -> bool:
...
# Service implementations
class PostgreSQLService:
def __init__(self, database_url: str):
self.database_url = database_url
self.pool = None
async def initialize(self):
"""Initialize database connection pool."""
import asyncpg
self.pool = await asyncpg.create_pool(self.database_url)
async def cleanup(self):
"""Clean up database connections."""
if self.pool:
await self.pool.close()
async def query(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
async with self.pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
return [dict(row) for row in rows]
async def execute(self, sql: str, params: tuple = ()) -> int:
async with self.pool.acquire() as conn:
return await conn.execute(sql, *params)
class RedisCache:
def __init__(self, redis_url: str):
self.redis_url = redis_url
self.client = None
async def initialize(self):
"""Initialize Redis connection."""
import redis.asyncio as redis
self.client = redis.from_url(self.redis_url)
async def cleanup(self):
"""Clean up Redis connections."""
if self.client:
await self.client.close()
async def get(self, key: str) -> Optional[Any]:
import json
value = await self.client.get(key)
return json.loads(value) if value else None
async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
import json
return await self.client.setex(key, ttl, json.dumps(value))
# Dependency providers
async def get_database_service() -> DatabaseService:
"""Get database service instance."""
# In production, this would be managed by the server lifecycle
if not hasattr(server, '_db_service'):
server._db_service = PostgreSQLService("postgresql://...")
await server._db_service.initialize()
return server._db_service
async def get_cache_service() -> CacheService:
"""Get cache service instance."""
if not hasattr(server, '_cache_service'):
server._cache_service = RedisCache("redis://...")
await server._cache_service.initialize()
return server._cache_service
async def get_user_from_context(context: Context) -> Optional[Dict[str, Any]]:
"""Extract user information from context."""
# Access context metadata
user_id = context.meta.get("user_id")
if not user_id:
return None
# Get user from database
db_service = await get_database_service()
users = await db_service.query(
"SELECT id, username, email, roles FROM users WHERE id = $1",
(user_id,)
)
return users[0] if users else None
# Tools with dependency injection
@server.tool()
async def get_user_data(
user_id: int,
context: Context,
db_service: DatabaseService = Depends(get_database_service),
cache_service: CacheService = Depends(get_cache_service)
) -> Dict[str, Any]:
"""Get user data with caching and comprehensive logging."""
await context.info(f"Retrieving user data for ID: {user_id}")
# Check cache first
cache_key = f"user:{user_id}"
cached_user = await cache_service.get(cache_key)
if cached_user:
await context.debug("User data found in cache")
return {**cached_user, "source": "cache"}
# Query database
await context.debug("Querying database for user data")
try:
users = await db_service.query(
"""
SELECT id, username, email, created_at, last_login,
array_agg(r.name) as roles
FROM users u
LEFT JOIN user_roles ur ON u.id = ur.user_id
LEFT JOIN roles r ON ur.role_id = r.id
WHERE u.id = $1 AND u.deleted_at IS NULL
GROUP BY u.id, u.username, u.email, u.created_at, u.last_login
""",
(user_id,)
)
if not users:
await context.warning(f"User {user_id} not found")
raise ValueError(f"User {user_id} not found")
user_data = users[0]
# Cache the result
await cache_service.set(cache_key, user_data, ttl=1800) # 30 minutes
await context.info(f"User data retrieved successfully for {user_data['username']}")
return {**user_data, "source": "database"}
except Exception as e:
await context.error(f"Failed to retrieve user data: {e}")
raise
# Context-aware operations
@server.tool()
async def update_user_profile(
profile_data: Dict[str, Any],
context: Context,
db_service: DatabaseService = Depends(get_database_service),
current_user: Optional[Dict[str, Any]] = Depends(get_user_from_context)
) -> Dict[str, Any]:
"""Update user profile with authorization and context."""
if not current_user:
await context.error("No authenticated user in context")
raise PermissionError("Authentication required")
user_id = current_user["id"]
await context.info(f"Updating profile for user {current_user['username']}")
# Validate allowed fields
allowed_fields = {"bio", "location", "website", "display_name"}
update_fields = set(profile_data.keys())
if not update_fields.issubset(allowed_fields):
invalid_fields = update_fields - allowed_fields
await context.warning(f"Invalid fields in update: {invalid_fields}")
raise ValueError(f"Invalid fields: {invalid_fields}")
# Build update query
set_clauses = []
params = []
param_count = 1
for field, value in profile_data.items():
set_clauses.append(f"{field} = ${param_count}")
params.append(value)
param_count += 1
# Add user_id and updated_at
set_clauses.append(f"updated_at = ${param_count}")
params.append(datetime.now())
param_count += 1
params.append(user_id) # WHERE clause parameter
query = f"""
UPDATE users
SET {', '.join(set_clauses)}
WHERE id = ${param_count}
RETURNING id, username, updated_at
"""
try:
result = await db_service.query(query, tuple(params))
if result:
await context.info("Profile updated successfully")
return {
"success": True,
"updated_fields": list(profile_data.keys()),
"timestamp": result[0]["updated_at"].isoformat()
}
else:
await context.error("Profile update failed - no rows affected")
raise RuntimeError("Update operation failed")
except Exception as e:
await context.error(f"Database error during profile update: {e}")
raise
# ❌ DON'T: Use global state or skip dependency injection
global_db_connection = None # Bad: global state
@server.tool()
async def bad_user_query(user_id: int):
# No context, no dependency injection, no error handling
global global_db_connection
if not global_db_connection:
global_db_connection = create_connection() # Shared state
return global_db_connection.query(f"SELECT * FROM users WHERE id = {user_id}") # SQL injection risk
```
### Session Management and State
```python
# ✅ DO: Implement comprehensive session management
from fastmcp.server.session import SessionManager
from typing import Dict, Any, Optional
import uuid
from datetime import datetime, timedelta
class SessionData:
"""Session data container with type safety."""
def __init__(self, session_id: str, user_id: Optional[str] = None):
self.session_id = session_id
self.user_id = user_id
self.created_at = datetime.now()
self.last_accessed = datetime.now()
self.data: Dict[str, Any] = {}
self.temp_data: Dict[str, Any] = {} # Cleared on session end
def touch(self):
"""Update last accessed timestamp."""
self.last_accessed = datetime.now()
def is_expired(self, max_age: timedelta = timedelta(hours=24)) -> bool:
"""Check if session is expired."""
return datetime.now() - self.last_accessed > max_age
def get(self, key: str, default: Any = None) -> Any:
"""Get session data with default."""
self.touch()
return self.data.get(key, default)
def set(self, key: str, value: Any) -> None:
"""Set session data."""
self.touch()
self.data[key] = value
def set_temp(self, key: str, value: Any) -> None:
"""Set temporary data (cleared on session end)."""
self.touch()
self.temp_data[key] = value
def clear_temp(self) -> None:
"""Clear temporary data."""
self.temp_data.clear()
class EnhancedSessionManager:
"""Enhanced session manager with persistence and cleanup."""
def __init__(self, storage_service: Optional[Any] = None):
self.storage_service = storage_service
self.sessions: Dict[str, SessionData] = {}
self.cleanup_interval = timedelta(hours=1)
self._cleanup_task: Optional[asyncio.Task] = None
async def initialize(self):
"""Initialize session manager."""
# Start cleanup task
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
# Restore sessions from storage if available
if self.storage_service:
await self._restore_sessions()
async def cleanup(self):
"""Clean up session manager."""
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
# Persist sessions if storage available
if self.storage_service:
await self._persist_sessions()
async def create_session(self, user_id: Optional[str] = None) -> SessionData:
"""Create a new session."""
session_id = str(uuid.uuid4())
session = SessionData(session_id, user_id)
self.sessions[session_id] = session
if self.storage_service:
await self._persist_session(session)
return session
async def get_session(self, session_id: str) -> Optional[SessionData]:
"""Get session by ID."""
session = self.sessions.get(session_id)
if session and session.is_expired():
await self.destroy_session(session_id)
return None
if session:
session.touch()
if self.storage_service:
await self._persist_session(session)
return session
async def destroy_session(self, session_id: str) -> bool:
"""Destroy a session."""
session = self.sessions.pop(session_id, None)
if session:
session.clear_temp()
if self.storage_service:
await self._remove_session(session_id)
return True
return False
async def _cleanup_loop(self):
"""Periodic cleanup of expired sessions."""
while True:
try:
await asyncio.sleep(self.cleanup_interval.total_seconds())
await self._cleanup_expired_sessions()
except asyncio.CancelledError:
break
except Exception as e:
# Log error but continue cleanup
logging.error(f"Session cleanup error: {e}")
async def _cleanup_expired_sessions(self):
"""Remove expired sessions."""
expired_sessions = [
session_id for session_id, session in self.sessions.items()
if session.is_expired()
]
for session_id in expired_sessions:
await self.destroy_session(session_id)
if expired_sessions:
logging.info(f"Cleaned up {len(expired_sessions)} expired sessions")
async def _persist_session(self, session: SessionData):
"""Persist session to storage."""
if self.storage_service:
await self.storage_service.store_session(session)
async def _restore_sessions(self):
"""Restore sessions from storage."""
if self.storage_service:
sessions = await self.storage_service.load_sessions()
self.sessions.update(sessions)
async def _persist_sessions(self):
"""Persist all sessions to storage."""
if self.storage_service:
await self.storage_service.store_all_sessions(self.sessions)
async def _remove_session(self, session_id: str):
"""Remove session from storage."""
if self.storage_service:
await self.storage_service.remove_session(session_id)
# Session-aware context provider
session_manager = EnhancedSessionManager()
async def get_session_from_context(context: Context) -> Optional[SessionData]:
"""Get session from context metadata."""
session_id = context.meta.get("session_id")
if not session_id:
return None
return await session_manager.get_session(session_id)
# Tools with session management
@server.tool()
async def store_user_preference(
key: str,
value: Any,
context: Context,
session: Optional[SessionData] = Depends(get_session_from_context)
) -> Dict[str, Any]:
"""Store user preference in session."""
if not session:
await context.warning("No session available for preference storage")
# Create anonymous session
session = await session_manager.create_session()
await context.info(f"Created anonymous session: {session.session_id}")
await context.info(f"Storing preference '{key}' in session {session.session_id}")
# Validate key format
if not key.replace("_", "").replace("-", "").isalnum():
await context.error(f"Invalid preference key format: {key}")
raise ValueError("Preference key must be alphanumeric with underscores/hyphens")
# Store in session
session.set(f"pref_{key}", value)
await context.debug(f"Preference stored: {key} = {value}")
return {
"success": True,
"session_id": session.session_id,
"key": key,
"stored_at": datetime.now().isoformat()
}
@server.tool()
async def get_user_preferences(
context: Context,
session: Optional[SessionData] = Depends(get_session_from_context)
) -> Dict[str, Any]:
"""Get all user preferences from session."""
if not session:
await context.info("No session available, returning empty preferences")
return {"preferences": {}, "session_id": None}
await context.info(f"Retrieving preferences from session {session.session_id}")
# Extract preference keys
preferences = {}
for key, value in session.data.items():
if key.startswith("pref_"):
pref_key = key[5:] # Remove "pref_" prefix
preferences[pref_key] = value
await context.debug(f"Found {len(preferences)} preferences")
return {
"preferences": preferences,
"session_id": session.session_id,
"session_age": (datetime.now() - session.created_at).total_seconds()
}
# ❌ DON'T: Use stateless operations when state is needed
@server.tool()
async def bad_store_preference(key: str, value: Any):
# No session management, no persistence
global_prefs[key] = value # Global state - bad
return {"stored": True}
```
### MCP Capabilities and Protocol Handling
```python
# ✅ DO: Implement comprehensive MCP capabilities management
from fastmcp.server.capabilities import ServerCapabilities, ToolCapability, ResourceCapability
from mcp.types import Implementation, ServerCapabilities as MCPCapabilities
class CapabilityManager:
"""Manage MCP server capabilities dynamically."""
def __init__(self):
self.capabilities: Dict[str, Any] = {}
self.dynamic_capabilities: Dict[str, Any] = {}
self.capability_handlers: Dict[str, callable] = {}
def register_capability(self, name: str, capability: Any, handler: callable = None):
"""Register a capability with optional handler."""
self.capabilities[name] = capability
if handler:
self.capability_handlers[name] = handler
def register_dynamic_capability(self, name: str, provider: callable):
"""Register a dynamic capability provider."""
self.dynamic_capabilities[name] = provider
async def get_capabilities(self, context: Context) -> MCPCapabilities:
"""Get current server capabilities."""
await context.debug("Building server capabilities")
# Base capabilities
capabilities = MCPCapabilities(
tools={},
resources={},
prompts={},
logging={}
)
# Add static capabilities
for name, capability in self.capabilities.items():
if isinstance(capability, ToolCapability):
capabilities.tools[name] = capability
elif isinstance(capability, ResourceCapability):
capabilities.resources[name] = capability
# Add dynamic capabilities
for name, provider in self.dynamic_capabilities.items():
try:
dynamic_cap = await provider(context)
if dynamic_cap:
capabilities.tools[name] = dynamic_cap
except Exception as e:
await context.warning(f"Failed to load dynamic capability {name}: {e}")
await context.info(f"Capabilities built: {len(capabilities.tools)} tools, {len(capabilities.resources)} resources")
return capabilities
async def handle_capability_request(self, capability_name: str, context: Context) -> Any:
"""Handle capability-specific requests."""
handler = self.capability_handlers.get(capability_name)
if not handler:
await context.error(f"No handler for capability: {capability_name}")
raise ValueError(f"Capability {capability_name} not supported")
await context.debug(f"Handling capability request: {capability_name}")
try:
result = await handler(context)
await context.info(f"Capability {capability_name} handled successfully")
return result
except Exception as e:
await context.error(f"Capability handler failed for {capability_name}: {e}")
raise
# Initialize capability manager
capability_manager = CapabilityManager()
# Register server capabilities
@server.on_startup
async def setup_capabilities():
"""Setup server capabilities on startup."""
# Register logging capability
capability_manager.register_capability(
"advanced_logging",
{
"levels": ["debug", "info", "warning", "error"],
"formats": ["text", "json", "structured"],
"persistence": True
}
)
# Register file access capability
capability_manager.register_capability(
"file_access",
{
"read": True,
"write": False,
"allowed_extensions": [".txt", ".json", ".csv", ".md"],
"max_file_size": 10485760 # 10MB
}
)
# Register database capability
async def database_capability_provider(context: Context):
"""Provide database capabilities based on context."""
# Check if user has database permissions
user = await get_user_from_context(context)
if not user:
return None
user_roles = user.get("roles", [])
if "admin" in user_roles:
return {
"read": True,
"write": True,
"admin": True,
"tables": ["users", "orders", "products", "logs"]
}
elif "user" in user_roles:
return {
"read": True,
"write": False,
"admin": False,
"tables": ["users", "orders"]
}
return None
capability_manager.register_dynamic_capability(
"database_access",
database_capability_provider
)
# Context with capability checking
async def check_capability(capability_name: str, context: Context) -> bool:
"""Check if context has required capability."""
try:
capabilities = await capability_manager.get_capabilities(context)
return capability_name in capabilities.tools
except Exception as e:
await context.error(f"Capability check failed: {e}")
return False
# Capability-aware tools
@server.tool()
async def capability_aware_operation(
operation: str,
context: Context
) -> Dict[str, Any]:
"""Perform operation based on available capabilities."""
await context.info(f"Checking capabilities for operation: {operation}")
# Define required capabilities for operations
capability_requirements = {
"read_file": "file_access",
"query_database": "database_access",
"admin_operation": "admin_access"
}
required_capability = capability_requirements.get(operation)
if not required_capability:
await context.error(f"Unknown operation: {operation}")
raise ValueError(f"Operation {operation} not recognized")
# Check capability
has_capability = await check_capability(required_capability, context)
if not has_capability:
await context.warning(f"Insufficient capabilities for {operation}")
raise PermissionError(f"Operation {operation} requires {required_capability} capability")
await context.info(f"Capability check passed for {operation}")
# Perform operation based on capability
if operation == "read_file":
return await perform_file_operation(context)
elif operation == "query_database":
return await perform_database_operation(context)
elif operation == "admin_operation":
return await perform_admin_operation(context)
return {"success": True, "operation": operation, "capability": required_capability}
async def perform_file_operation(context: Context) -> Dict[str, Any]:
"""Perform file operation with capability constraints."""
await context.info("Performing file operation")
# Get file capability details
capabilities = await capability_manager.get_capabilities(context)
file_cap = capabilities.tools.get("file_access", {})
allowed_extensions = file_cap.get("allowed_extensions", [])
max_size = file_cap.get("max_file_size", 1048576) # 1MB default
return {
"operation": "file_read",
"allowed_extensions": allowed_extensions,
"max_file_size": max_size,
"timestamp": datetime.now().isoformat()
}
async def perform_database_operation(context: Context) -> Dict[str, Any]:
"""Perform database operation with capability constraints."""
await context.info("Performing database operation")
# Get dynamic database capability
db_capability = await capability_manager.dynamic_capabilities["database_access"](mdc:context)
if not db_capability:
raise PermissionError("Database access not available")
return {
"operation": "database_query",
"permissions": db_capability,
"timestamp": datetime.now().isoformat()
}
async def perform_admin_operation(context: Context) -> Dict[str, Any]:
"""Perform admin operation with elevated privileges."""
await context.info("Performing admin operation")
# Additional admin checks
user = await get_user_from_context(context)
if not user or "admin" not in user.get("roles", []):
raise PermissionError("Admin role required")
return {
"operation": "admin_function",
"admin_user": user["username"],
"timestamp": datetime.now().isoformat()
}
# ❌ DON'T: Skip capability management or use hardcoded permissions
@server.tool()
async def bad_operation(operation: str):
# No capability checking, no context awareness
if operation == "admin":
return perform_admin() # No permission check
return {"result": "executed"}
```
## Best Practices Summary
### Context Management
- Use dependency injection for service access
- Implement proper context isolation
- Handle context lifecycle appropriately
- Provide comprehensive logging and progress reporting
### Session Management
- Implement session persistence and cleanup
- Use proper session expiration policies
- Handle anonymous and authenticated sessions
- Provide session state isolation
### Capability Management
- Register capabilities dynamically
- Check permissions before operations
- Provide meaningful capability metadata
- Handle capability negotiation properly
### Dependency Injection
- Define clear service interfaces
- Use proper lifecycle management
- Handle dependency resolution gracefully
- Implement proper cleanup procedures
## References
- [FastMCP Context Documentation](mdc:https:/gofastmcp.com/servers/context)
- [MCP Capability Specification](mdc:https:/spec.modelcontextprotocol.io/specification/basic/capabilities)
- [Python Dependency Injection Patterns](mdc:https:/python-dependency-injector.ets-labs.org)
- [Session Management Best Practices](mdc:https:/cheatsheetseries.owasp.org/cheatsheets/Session_Management_Cheat_Sheet.html)