Skip to main content
Glama
auth.py4.2 kB
""" 认证中间件 提供用户认证、权限验证和安全检查功能。 """ import os from pathlib import Path from typing import Optional, List, Set from datetime import datetime, timedelta import hashlib from data_access.security import SecurityValidator class AuthMiddleware: """认证中间件类""" def __init__(self, allowed_paths: Optional[List[str]] = None): """ 初始化认证中间件 Args: allowed_paths: 允许访问的路径列表,为空时允许所有路径 """ self.allowed_paths = set(allowed_paths) if allowed_paths else set() self.security_validator = SecurityValidator() self.session_cache = {} # 简单的内存缓存,生产环境建议使用Redis def validate_path(self, path: str) -> tuple[bool, str]: """ 验证路径安全性 Args: path: 要验证的路径 Returns: (是否安全, 错误信息) """ try: # 基础安全验证 if not self.security_validator.is_safe_path(path): return False, "路径包含不安全字符" # 规范化路径 normalized_path = Path(path).resolve() # 检查是否在允许的路径范围内 if self.allowed_paths: if not any(normalized_path.is_relative_to(Path(allowed).resolve()) for allowed in self.allowed_paths): return False, f"路径 {path} 不在允许的访问范围内" # 检查文件/目录是否存在且可访问 if not normalized_path.exists(): return False, f"路径 {path} 不存在" # 检查读取权限 if not os.access(normalized_path, os.R_OK): return False, f"路径 {path} 没有读取权限" return True, "" except (RuntimeError, ValueError) as e: return False, f"路径验证失败: {str(e)}" def create_session(self, user_id: str, permissions: Set[str]) -> str: """ 创建用户会话 Args: user_id: 用户ID permissions: 用户权限集合 Returns: 会话ID """ session_id = hashlib.sha256(f"{user_id}{datetime.now()}".encode()).hexdigest() self.session_cache[session_id] = { "user_id": user_id, "permissions": permissions, "created_at": datetime.now(), "last_accessed": datetime.now() } return session_id def validate_session(self, session_id: str, required_permission: str) -> tuple[bool, str]: """ 验证会话和权限 Args: session_id: 会话ID required_permission: 需要的权限 Returns: (是否有效, 错误信息) """ session = self.session_cache.get(session_id) if not session: return False, "会话无效或已过期" # 检查会话是否过期(24小时) if datetime.now() - session["last_accessed"] > timedelta(hours=24): del self.session_cache[session_id] return False, "会话已过期" # 检查权限 if required_permission not in session["permissions"]: return False, f"权限不足,需要权限: {required_permission}" # 更新最后访问时间 session["last_accessed"] = datetime.now() return True, "" def cleanup_expired_sessions(self): """清理过期会话""" current_time = datetime.now() expired_sessions = [ session_id for session_id, session in self.session_cache.items() if current_time - session["last_accessed"] > timedelta(hours=24) ] for session_id in expired_sessions: del self.session_cache[session_id] # 全局认证中间件实例 auth_middleware = AuthMiddleware()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kscz0000/Zhiwen-Assistant-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server