"""参数验证和错误处理模块
这个模块提供了参数验证和错误处理的工具函数。
"""
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
import logging
logger = logging.getLogger(__name__)
class ValidationError(Exception):
"""参数验证错误"""
pass
class FileOperationError(Exception):
"""文件操作错误"""
pass
def validate_path(path: str, must_exist: bool = False, must_be_file: bool = False, must_be_dir: bool = False) -> Path:
"""验证路径参数
Args:
path: 路径字符串
must_exist: 路径必须存在
must_be_file: 路径必须是文件
must_be_dir: 路径必须是目录
Returns:
验证后的Path对象
Raises:
ValidationError: 验证失败
"""
if not path or not isinstance(path, str):
raise ValidationError("路径不能为空且必须是字符串")
# 检查路径长度
if len(path) > 4096: # 大多数文件系统的路径长度限制
raise ValidationError("路径长度超过限制(4096字符)")
# 检查非法字符(Windows和Unix通用)
illegal_chars = ['<', '>', ':', '"', '|', '?', '*']
if any(char in path for char in illegal_chars):
raise ValidationError(f"路径包含非法字符: {illegal_chars}")
# 检查相对路径攻击
if '..' in path:
raise ValidationError("路径不能包含'..'(相对路径攻击防护)")
try:
path_obj = Path(path).resolve()
except (OSError, ValueError) as e:
raise ValidationError(f"无效的路径格式: {str(e)}")
if must_exist and not path_obj.exists():
raise ValidationError(f"路径不存在: {path}")
if must_be_file and path_obj.exists() and not path_obj.is_file():
raise ValidationError(f"路径不是文件: {path}")
if must_be_dir and path_obj.exists() and not path_obj.is_dir():
raise ValidationError(f"路径不是目录: {path}")
return path_obj
def validate_encoding(encoding: str) -> str:
"""验证编码参数
Args:
encoding: 编码字符串
Returns:
验证后的编码字符串
Raises:
ValidationError: 验证失败
"""
if not encoding or not isinstance(encoding, str):
raise ValidationError("编码不能为空且必须是字符串")
# 支持的编码列表
supported_encodings = [
'utf-8', 'utf-16', 'utf-32', 'ascii', 'latin-1', 'cp1252',
'gbk', 'gb2312', 'big5', 'shift_jis', 'euc-jp', 'euc-kr'
]
encoding_lower = encoding.lower().replace('_', '-')
if encoding_lower not in [enc.replace('_', '-') for enc in supported_encodings]:
raise ValidationError(f"不支持的编码: {encoding}。支持的编码: {', '.join(supported_encodings)}")
return encoding
def validate_file_content(content: str, max_size: int = 10 * 1024 * 1024) -> str:
"""验证文件内容
Args:
content: 文件内容
max_size: 最大文件大小(字节)
Returns:
验证后的内容
Raises:
ValidationError: 验证失败
"""
if not isinstance(content, str):
raise ValidationError("文件内容必须是字符串")
# 检查内容大小
content_size = len(content.encode('utf-8'))
if content_size > max_size:
raise ValidationError(f"文件内容过大: {content_size} 字节,最大允许: {max_size} 字节")
return content
def validate_search_pattern(pattern: str) -> str:
"""验证搜索模式
Args:
pattern: 搜索模式
Returns:
验证后的模式
Raises:
ValidationError: 验证失败
"""
if not pattern or not isinstance(pattern, str):
raise ValidationError("搜索模式不能为空且必须是字符串")
# 检查模式长度
if len(pattern) > 255:
raise ValidationError("搜索模式长度超过限制(255字符)")
# 检查非法字符
illegal_chars = ['<', '>', ':', '"', '|']
if any(char in pattern for char in illegal_chars):
raise ValidationError(f"搜索模式包含非法字符: {illegal_chars}")
return pattern
def safe_file_operation(operation_name: str):
"""安全文件操作装饰器
Args:
operation_name: 操作名称
Returns:
装饰器函数
"""
def decorator(func):
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except ValidationError as e:
error_msg = f"参数验证失败 ({operation_name}): {str(e)}"
logger.warning(error_msg)
return f"错误: {str(e)}"
except PermissionError as e:
error_msg = f"权限错误 ({operation_name}): {str(e)}"
logger.warning(error_msg)
return f"错误: 没有权限执行此操作"
except FileNotFoundError as e:
error_msg = f"文件未找到 ({operation_name}): {str(e)}"
logger.warning(error_msg)
return f"错误: 文件或目录不存在"
except OSError as e:
error_msg = f"系统错误 ({operation_name}): {str(e)}"
logger.error(error_msg)
return f"错误: 系统操作失败: {str(e)}"
except Exception as e:
error_msg = f"未知错误 ({operation_name}): {str(e)}"
logger.error(error_msg)
return f"错误: 操作失败: {str(e)}"
return wrapper
return decorator
def validate_tool_arguments(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""验证工具参数
Args:
tool_name: 工具名称
arguments: 参数字典
Returns:
验证后的参数字典
Raises:
ValidationError: 验证失败
"""
if not isinstance(arguments, dict):
raise ValidationError("参数必须是字典类型")
validated_args = arguments.copy()
if tool_name == "list_directory":
if "path" not in arguments:
raise ValidationError("缺少必需参数: path")
validated_args["path"] = str(validate_path(arguments["path"], must_exist=True, must_be_dir=True))
validated_args["show_hidden"] = bool(arguments.get("show_hidden", False))
elif tool_name == "read_file":
if "path" not in arguments:
raise ValidationError("缺少必需参数: path")
validated_args["path"] = str(validate_path(arguments["path"], must_exist=True, must_be_file=True))
validated_args["encoding"] = validate_encoding(arguments.get("encoding", "utf-8"))
elif tool_name == "create_file":
if "path" not in arguments:
raise ValidationError("缺少必需参数: path")
validated_args["path"] = str(validate_path(arguments["path"]))
validated_args["content"] = validate_file_content(arguments.get("content", ""))
validated_args["encoding"] = validate_encoding(arguments.get("encoding", "utf-8"))
elif tool_name == "delete_file":
if "path" not in arguments:
raise ValidationError("缺少必需参数: path")
validated_args["path"] = str(validate_path(arguments["path"], must_exist=True))
validated_args["recursive"] = bool(arguments.get("recursive", False))
elif tool_name == "search_files":
if "directory" not in arguments:
raise ValidationError("缺少必需参数: directory")
if "pattern" not in arguments:
raise ValidationError("缺少必需参数: pattern")
validated_args["directory"] = str(validate_path(arguments["directory"], must_exist=True, must_be_dir=True))
validated_args["pattern"] = validate_search_pattern(arguments["pattern"])
validated_args["recursive"] = bool(arguments.get("recursive", True))
else:
raise ValidationError(f"未知的工具: {tool_name}")
return validated_args