"""参数验证服务"""
import os
from typing import Any, List
from ..models import DataStruct
class ParameterValidator:
"""参数验证服务类"""
def validate_data_struct(self, data_struct: DataStruct, data_format: str) -> None:
"""验证数据结构与格式匹配
允许field和index都为空,表示该列不需要数据绑定
"""
if data_format == "json":
for field in data_struct.dataFields:
# JSON格式时,如果提供了index但没有field,则报错
if field.index is not None and not field.field:
raise ValueError(
f"JSON格式时字段'{field.name}'不应提供index属性,"
f"请使用field属性或留空表示不绑定数据"
)
elif data_format == "array":
for field in data_struct.dataFields:
# 数组格式时,如果提供了field但没有index,则报错
if field.field and field.index is None:
raise ValueError(
f"数组格式时字段'{field.name}'不应提供field属性,"
f"请使用index属性或留空表示不绑定数据"
)
# 检查index的有效性(如果提供了index)
if field.index is not None and field.index < 0:
raise ValueError(f"数组格式时字段'{field.name}'的index必须大于等于0")
# 检查字段名称唯一性
field_names = [field.name for field in data_struct.dataFields]
if len(field_names) != len(set(field_names)):
raise ValueError("字段名称必须唯一")
# 检查数组格式时索引唯一性
if data_format == "array":
indices = [field.index for field in data_struct.dataFields if field.index is not None]
if len(indices) != len(set(indices)):
raise ValueError("数组格式时字段索引必须唯一")
def validate_sample_data(
self,
sample_data: List[Any],
data_struct: DataStruct,
data_format: str
) -> None:
"""验证示例数据格式"""
if not sample_data:
return
for i, item in enumerate(sample_data):
if data_format == "json":
if not isinstance(item, dict):
raise ValueError(f"示例数据第{i + 1}项必须是对象类型")
# 检查必需字段是否存在(只检查有数据绑定的字段)
for field in data_struct.dataFields:
if field.field and field.field not in item:
raise ValueError(f"示例数据第{i + 1}项缺少字段: {field.field}")
elif data_format == "array":
if not isinstance(item, list):
raise ValueError(f"示例数据第{i + 1}项必须是数组类型")
# 检查数组长度是否足够(只检查有index的字段)
indices = [field.index for field in data_struct.dataFields if field.index is not None]
if indices: # 只有存在需要绑定的字段时才检查
max_index = max(indices)
if len(item) <= max_index:
raise ValueError(f"示例数据第{i + 1}项数组长度不足,至少需要{max_index + 1}个元素")
def validate_template_name(self, template_name: str) -> None:
"""验证模板名称安全性"""
if not template_name:
raise ValueError("模板名称不能为空")
if len(template_name) > 100:
raise ValueError("模板名称长度不能超过100个字符")
# 检查不安全字符
unsafe_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|']
for char in unsafe_chars:
if char in template_name:
raise ValueError(f"模板名称不能包含字符: {char}")
# 检查保留名称
reserved_names = ['CON', 'PRN', 'AUX', 'NUL'] + [f'COM{i}' for i in range(1, 10)] + [f'LPT{i}' for i in
range(1, 10)]
if template_name.upper() in reserved_names:
raise ValueError(f"模板名称不能使用保留名称: {template_name}")
def validate_output_path(self, output_path: str) -> None:
"""验证输出路径安全性"""
if not output_path:
return
# 规范化路径
normalized_path = os.path.normpath(output_path)
# 检查路径遍历攻击
if '..' in normalized_path:
raise ValueError("输出路径不能包含'..'")
# 检查目录是否存在(如果指定了目录)
directory = os.path.dirname(normalized_path)
if directory and not os.path.exists(directory):
try:
os.makedirs(directory, exist_ok=True)
except OSError as e:
raise ValueError(f"无法创建输出目录: {e}")
def validate_variable_names(self, data_struct: DataStruct) -> None:
"""验证变量名称的有效性"""
# 检查集合名称
if not self._is_valid_identifier(data_struct.collectName):
raise ValueError(f"集合名称'{data_struct.collectName}'不是有效的标识符")
# 检查循环变量名称
if not self._is_valid_identifier(data_struct.itemVariable):
raise ValueError(f"循环变量名称'{data_struct.itemVariable}'不是有效的标识符")
# 检查JSON格式的字段名称
for field in data_struct.dataFields:
if field.field and not self._is_valid_identifier(field.field):
raise ValueError(f"字段名称'{field.field}'不是有效的标识符")
def _is_valid_identifier(self, name: str) -> bool:
"""检查是否是有效的标识符"""
if not name:
return False
# 简单的标识符检查:字母、数字、下划线,不能以数字开头
if not name[0].isalpha() and name[0] != '_':
return False
for char in name[1:]:
if not (char.isalnum() or char == '_'):
return False
return True