"""
认证中间件
提供用户认证、权限验证和安全检查功能。
"""
import os
from pathlib import Path
from typing import Optional, List, Set
from datetime import datetime, timedelta
import hashlib
from data_access.security import SecurityValidator
class AuthMiddleware:
"""认证中间件类"""
def __init__(self, allowed_paths: Optional[List[str]] = None):
"""
初始化认证中间件
Args:
allowed_paths: 允许访问的路径列表,为空时允许所有路径
"""
self.allowed_paths = set(allowed_paths) if allowed_paths else set()
self.security_validator = SecurityValidator()
self.session_cache = {} # 简单的内存缓存,生产环境建议使用Redis
def validate_path(self, path: str) -> tuple[bool, str]:
"""
验证路径安全性
Args:
path: 要验证的路径
Returns:
(是否安全, 错误信息)
"""
try:
# 基础安全验证
if not self.security_validator.is_safe_path(path):
return False, "路径包含不安全字符"
# 规范化路径
normalized_path = Path(path).resolve()
# 检查是否在允许的路径范围内
if self.allowed_paths:
if not any(normalized_path.is_relative_to(Path(allowed).resolve())
for allowed in self.allowed_paths):
return False, f"路径 {path} 不在允许的访问范围内"
# 检查文件/目录是否存在且可访问
if not normalized_path.exists():
return False, f"路径 {path} 不存在"
# 检查读取权限
if not os.access(normalized_path, os.R_OK):
return False, f"路径 {path} 没有读取权限"
return True, ""
except (RuntimeError, ValueError) as e:
return False, f"路径验证失败: {str(e)}"
def create_session(self, user_id: str, permissions: Set[str]) -> str:
"""
创建用户会话
Args:
user_id: 用户ID
permissions: 用户权限集合
Returns:
会话ID
"""
session_id = hashlib.sha256(f"{user_id}{datetime.now()}".encode()).hexdigest()
self.session_cache[session_id] = {
"user_id": user_id,
"permissions": permissions,
"created_at": datetime.now(),
"last_accessed": datetime.now()
}
return session_id
def validate_session(self, session_id: str, required_permission: str) -> tuple[bool, str]:
"""
验证会话和权限
Args:
session_id: 会话ID
required_permission: 需要的权限
Returns:
(是否有效, 错误信息)
"""
session = self.session_cache.get(session_id)
if not session:
return False, "会话无效或已过期"
# 检查会话是否过期(24小时)
if datetime.now() - session["last_accessed"] > timedelta(hours=24):
del self.session_cache[session_id]
return False, "会话已过期"
# 检查权限
if required_permission not in session["permissions"]:
return False, f"权限不足,需要权限: {required_permission}"
# 更新最后访问时间
session["last_accessed"] = datetime.now()
return True, ""
def cleanup_expired_sessions(self):
"""清理过期会话"""
current_time = datetime.now()
expired_sessions = [
session_id for session_id, session in self.session_cache.items()
if current_time - session["last_accessed"] > timedelta(hours=24)
]
for session_id in expired_sessions:
del self.session_cache[session_id]
# 全局认证中间件实例
auth_middleware = AuthMiddleware()