TOKEN_VALIDATION_TECH_DETAILS.md•7.11 kB
# Token校验技术实现细节
## 概述
本文档详细说明了SOAR MCP服务器中token校验机制的技术实现,包括从手动验证升级到基于FastMCP AuthProvider的自动化认证系统。
## 技术架构
### 1. FastMCP框架升级
- **原版本**: FastMCP 2.8.0 (功能有限的认证支持)
- **升级版本**: FastMCP 2.12.3 (完整的认证架构)
- **关键功能增强**:
- `AuthProvider` 基类和抽象接口
- `TokenVerifier` 协议接口
- `JWTVerifier` 和 `StaticTokenVerifier` 实现
- HTTP传输层的stateless_http支持
- 增强的中间件和上下文管理
### 2. 自定义认证提供者 - SOARAuthProvider
#### 核心类定义
```python
class SOARAuthProvider(AuthProvider):
"""
SOAR自定义认证提供者
基于URL参数中的token进行认证
"""
```
#### 认证流程
1. **Token提取**: 从URL查询参数中获取token (`?token=xxx`)
2. **数据库验证**: 调用现有的`db_manager.get_token_by_value()`获取token信息
3. **有效性检查**: 使用`db_manager.verify_token()`验证token并更新使用记录
4. **AccessToken创建**: 构造FastMCP标准的AccessToken对象
5. **用户信息存储**: 将认证信息存储到线程本地存储供审计使用
#### 关键实现代码
```python
async def authenticate(self, request: Request) -> Optional[AccessToken]:
token_value = request.query_params.get('token')
if not token_value:
return None
token_info = db_manager.get_token_by_value(token_value)
if not token_info:
return None
is_valid = db_manager.verify_token(token_value)
if not is_valid:
return None
# 创建标准AccessToken对象
access_token = AccessToken(
token=token_value,
scopes=self.required_scopes,
expires_at=None,
user_id=str(token_info['id']),
username=token_info['name']
)
return access_token
```
### 3. 受保护的MCP端点
#### 端点保护机制
通过`get_routes()`方法创建受保护的MCP路由:
```python
async def protected_mcp_endpoint(request: Request):
# 1. 先进行认证
access_token = await self.authenticate(request)
if not access_token:
return JSONResponse(status_code=401, content={
"error": "unauthorized",
"message": "Invalid or missing token. Please add ?token=your_token to the URL."
})
# 2. 存储认证信息到请求状态
request.state.access_token = access_token
request.state.user_id = access_token.user_id
request.state.username = access_token.username
# 3. 存储到线程本地存储供审计使用
token_info = db_manager.get_token_by_value(access_token.token)
simple_mcp_server.set_current_user_info(access_token.token, token_info)
# 4. 调用原始MCP端点
return await mcp_endpoint(request)
```
### 4. 线程本地存储机制
#### 用户信息管理
为了在认证层和审计层之间传递用户信息,实现了线程本地存储:
```python
# 线程本地存储
request_context = threading.local()
def set_current_user_info(token: str, token_info: dict):
"""设置当前请求的用户信息到线程本地存储"""
request_context.token = token
request_context.token_info = token_info
request_context.user_id = token_info.get('id') if token_info else None
request_context.username = token_info.get('name') if token_info else None
def get_current_user_info() -> dict:
"""从线程本地存储获取当前请求的用户信息"""
return {
'token': getattr(request_context, 'token', None),
'token_info': getattr(request_context, 'token_info', None),
'user_id': getattr(request_context, 'user_id', None),
'username': getattr(request_context, 'username', None)
}
```
#### 向后兼容性
保留了原有的Flask context获取机制作为fallback:
```python
def get_token_from_context() -> Optional[str]:
"""从线程本地存储中获取token"""
try:
# 首先尝试从线程本地存储获取
user_info = get_current_user_info()
if user_info['token']:
return user_info['token']
# Fallback到Flask context(保持向后兼容)
from flask import g
if hasattr(g, 'current_token'):
return g.current_token
except Exception:
pass
return None
```
### 5. 审计日志集成
#### 错误修复
原系统尝试使用不存在的FastMCP Context API,已修复为线程本地存储方案:
```python
# 修复前(错误)
# user_info = Context.current() # ❌ Context.current()方法不存在
# 修复后(正确)
user_info = get_current_user_info()
if user_info['token_info']:
db_manager.log_audit_event(
action=action,
resource=resource,
parameters=parameters,
result=result,
error_message=error_message,
token_info=user_info['token_info'] # ✅ 使用正确的参数名
)
```
### 6. 错误处理和日志优化
#### 调试信息优化
将可能引起误解的错误样式日志改为信息样式:
```python
# 优化前
logger.debug("❌ 没有Flask请求上下文")
logger.debug("❌ 最终未找到token")
# 优化后
logger.debug("ℹ️ 没有Flask请求上下文(正常,MCP请求通过FastMCP处理)")
logger.debug("ℹ️ 审计系统未找到token(MCP请求认证已在FastMCP层完成)")
```
## 技术优势
### 1. 标准化
- 使用FastMCP官方AuthProvider架构
- 符合MCP协议标准
- 与Cherry Studio等客户端完全兼容
### 2. 安全性
- 在HTTP传输层进行认证
- 统一的错误处理和响应格式
- 完整的审计日志记录
### 3. 可维护性
- 清晰的认证流程分离
- 向后兼容现有database token系统
- 模块化设计便于扩展
### 4. 性能
- 异步认证处理
- 线程本地存储减少上下文传递开销
- 高效的token验证缓存
## 部署说明
### 1. 依赖升级
```bash
pip install --upgrade fastmcp==2.12.3
```
### 2. 配置文件
服务器自动加载`SOARAuthProvider`,无需额外配置。
### 3. 客户端访问
MCP客户端需要在URL中添加token参数:
```
http://localhost:12345/mcp?token=your_token_here
```
### 4. 健康检查
新增健康检查端点:
```
GET http://localhost:12345/health
```
## 已知问题和解决方案
### 1. 循环导入
**问题**: `auth_provider.py`无法直接导入`simple_mcp_server`
**解决**: 使用动态导入在运行时加载
### 2. API兼容性
**问题**: FastMCP.run()方法不接受auth参数
**解决**: 通过AuthProvider的get_routes()方法注册受保护路由
### 3. 上下文传递
**问题**: FastMCP Context API功能有限
**解决**: 实现自定义线程本地存储机制
## 后续优化建议
1. **性能监控**: 添加认证性能指标收集
2. **缓存机制**: 实现token验证结果缓存
3. **多租户支持**: 扩展为支持多租户的认证体系
4. **JWT集成**: 考虑集成JWT token支持
5. **权限系统**: 基于token实现细粒度权限控制
---
*文档最后更新: 2025-09-19*
*技术栈: FastMCP 2.12.3, Python 3.11, SQLite*