"""
安全验证模块
提供路径安全验证、输入验证和安全检查功能。
"""
import os
import re
from pathlib import Path
from typing import List, Set, Optional, Tuple
import string
import unicodedata
class SecurityValidator:
"""安全验证器类"""
def __init__(self):
# 危险字符和模式
self.dangerous_patterns = [
r'\.\./', # 路径遍历
r'\.\.\\', # Windows路径遍历
r'^\.\.', # 以..开头
r'[<>:"|?*]', # Windows非法文件名字符
r'[\x00-\x1f\x7f]', # 控制字符
]
# 敏感文件扩展名
self.sensitive_extensions = {
'.key', '.pem', '.crt', '.p12', '.pfx', # 证书
'.env', '.config', '.ini', '.conf', # 配置
'.db', '.sqlite', '.sqlite3', # 数据库
'.log', # 日志
'.bak', '.backup', '.tmp', '.temp', # 备份和临时
}
# 敏感文件名模式
self.sensitive_filename_patterns = [
r'password',
r'secret',
r'key',
r'token',
r'credential',
r'auth',
r'private',
r'confidential',
]
# 最大文件大小限制(字节)
self.max_file_size = 50 * 1024 * 1024 # 50MB
# 最大路径长度
self.max_path_length = 260 # Windows默认限制
def is_safe_path(self, path: str) -> bool:
"""
检查路径是否安全
Args:
path: 要检查的路径
Returns:
路径是否安全
"""
if not path or not isinstance(path, str):
return False
# 检查路径长度
if len(path) > self.max_path_length:
return False
# 检查危险模式
normalized_path = os.path.normpath(path)
for pattern in self.dangerous_patterns:
if re.search(pattern, normalized_path, re.IGNORECASE):
return False
# 检查是否为绝对路径(在某些场景下可能需要)
# if os.path.isabs(normalized_path):
# return False
return True
def secure_path_join(self, base_path: str, *paths: str) -> str:
"""
安全的路径连接,防止路径遍历攻击
Args:
base_path: 基础路径
*paths: 要连接的路径部分
Returns:
安全的绝对路径
Raises:
ValueError: 如果检测到路径遍历尝试
"""
try:
base = Path(base_path).resolve()
target = base.joinpath(*paths).resolve()
# 确保目标路径在基础路径内
if not str(target).startswith(str(base)):
raise ValueError(f"路径遍历攻击检测: {paths} 尝试访问基础路径之外的位置")
return str(target)
except (OSError, ValueError) as e:
raise ValueError(f"路径安全验证失败: {str(e)}")
def validate_path_boundaries(self, base_path: str, target_path: str) -> Tuple[bool, str]:
"""
验证目标路径是否在允许的边界内
Args:
base_path: 允许的基础路径
target_path: 要验证的目标路径
Returns:
(是否安全, 错误信息)
"""
try:
base = Path(base_path).resolve()
target = Path(target_path).resolve()
# 检查目标路径是否在基础路径内
if not str(target).startswith(str(base)):
return False, f"目标路径 {target} 超出允许的基础路径 {base}"
return True, ""
except (SecurityError, ValueError) as e:
return False, f"路径边界验证失败: {str(e)}"
def is_safe_filename(self, filename: str) -> bool:
"""
检查文件名是否安全
Args:
filename: 文件名
Returns:
文件名是否安全
"""
if not filename or not isinstance(filename, str):
return False
# 检查文件名长度
if len(filename) > 255:
return False
# 检查危险字符
for char in '<>:"|?*':
if char in filename:
return False
# 检查控制字符
for char in filename:
if unicodedata.category(char) == 'Cc':
return False
# 检查是否为Windows保留名称
reserved_names = {
'CON', 'PRN', 'AUX', 'NUL',
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
}
base_name = os.path.splitext(filename)[0].upper()
if base_name in reserved_names:
return False
return True
def is_sensitive_file(self, file_path: str) -> bool:
"""
检查是否为敏感文件
Args:
file_path: 文件路径
Returns:
是否为敏感文件
"""
file_path = file_path.lower()
# 检查扩展名
_, ext = os.path.splitext(file_path)
if ext in self.sensitive_extensions:
return True
# 检查文件名模式
filename = os.path.basename(file_path)
for pattern in self.sensitive_filename_patterns:
if re.search(pattern, filename, re.IGNORECASE):
return True
return False
def validate_file_size(self, file_path: str) -> Tuple[bool, str]:
"""
验证文件大小
Args:
file_path: 文件路径
Returns:
(是否有效, 错误信息)
"""
try:
if not os.path.exists(file_path):
return False, "文件不存在"
size = os.path.getsize(file_path)
if size > self.max_file_size:
return False, f"文件大小 {size} 字节超过限制 {self.max_file_size} 字节"
return True, ""
except (SecurityError, ValueError) as e:
return False, f"无法获取文件大小: {str(e)}"
def validate_directory_access(self, dir_path: str, required_permissions: Set[str] = None) -> Tuple[bool, str]:
"""
验证目录访问权限
Args:
dir_path: 目录路径
required_permissions: 需要的权限集合 {'read', 'write', 'execute'}
Returns:
(是否有效, 错误信息)
"""
if required_permissions is None:
required_permissions = {'read'}
try:
if not os.path.exists(dir_path):
return False, "目录不存在"
if not os.path.isdir(dir_path):
return False, "路径不是目录"
# 检查权限
if 'read' in required_permissions and not os.access(dir_path, os.R_OK):
return False, "目录没有读取权限"
if 'write' in required_permissions and not os.access(dir_path, os.W_OK):
return False, "目录没有写入权限"
if 'execute' in required_permissions and not os.access(dir_path, os.X_OK):
return False, "目录没有执行权限"
return True, ""
except (SecurityError, ValueError) as e:
return False, f"权限验证失败: {str(e)}"
def sanitize_path(self, path: str) -> str:
"""
清理和标准化路径
Args:
path: 原始路径
Returns:
清理后的路径
"""
if not path:
return ""
# 规范化路径分隔符
normalized = path.replace('\\', '/')
# 移除多余的斜杠
normalized = re.sub(r'/+', '/', normalized)
# 规范化路径
normalized = os.path.normpath(normalized)
return normalized
def extract_sensitive_info(self, text: str) -> List[str]:
"""
从文本中提取潜在的敏感信息
Args:
text: 要分析的文本
Returns:
发现的敏感信息列表
"""
sensitive_patterns = [
r'password["\']?\s*[:=]\s*["\']?([^"\'\s]+)',
r'api[_-]?key["\']?\s*[:=]\s*["\']?([^"\'\s]+)',
r'token["\']?\s*[:=]\s*["\']?([^"\'\s]+)',
r'secret["\']?\s*[:=]\s*["\']?([^"\'\s]+)',
r'(?:\d{3}[-.\s]?){2}\d{4}', # 社会安全号码格式
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', # 邮箱
]
found = []
for pattern in sensitive_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
found.extend(matches)
return list(set(found)) # 去重
def validate_input_text(self, text: str, max_length: int = 1000) -> Tuple[bool, str]:
"""
验证输入文本安全性
Args:
text: 输入文本
max_length: 最大长度
Returns:
(是否有效, 错误信息)
"""
if not isinstance(text, str):
return False, "输入必须是字符串"
if len(text) > max_length:
return False, f"输入长度超过限制 {max_length}"
# 检查是否包含潜在的脚本注入
script_patterns = [
r'<script[^>]*>.*?</script>',
r'javascript:',
r'vbscript:',
r'on\w+\s*=', # onclick=, onload=等
]
for pattern in script_patterns:
if re.search(pattern, text, re.IGNORECASE):
return False, "输入包含不安全的内容"
return True, ""
def create_secure_temp_file(self, prefix: str = "mcp_", suffix: str = ".tmp") -> str:
"""
创建安全的临时文件
Args:
prefix: 文件名前缀
suffix: 文件名后缀
Returns:
临时文件路径
"""
import tempfile
import uuid
# 使用UUID确保文件名唯一性
unique_id = str(uuid.uuid4())[:8]
safe_prefix = re.sub(r'[^\w\-_]', '', prefix)[:8]
safe_suffix = re.sub(r'[^\w\-_]', '', suffix)[:8]
filename = f"{safe_prefix}_{unique_id}{safe_suffix}"
# 创建临时文件
temp_file = tempfile.NamedTemporaryFile(
prefix=filename,
suffix=suffix,
delete=False
)
temp_file.close()
return temp_file.name